Kafka Binder Listener Container Customizers
Spring Cloud Stream 通过自定义工具为消息监听器容器提供了强大的自定义选项。
本节介绍了 Kafka 可用的自定义界面:ListenerContainerCustomizer,其卡夫卡特有的扩展KafkaListenerContainerCustomizer,以及专门化的ListenerContainerWithDlqAndRetryCustomizer.
ListenerContainerCustomizer
这ListenerContainerCustomizer是 Spring Cloud Stream 中的一个通用接口,允许对消息监听器容器进行定制。
用法
使用ListenerContainerCustomizer创建一个在你的配置中实现该接口的豆子:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
return (container, destinationName, group) -> {
// Customize the container here
};
}
这ListenerContainerCustomizer接口定义了以下方法:
void configure(C container, String destinationName, String group);
-
容器: 可自定义的消息监听器容器。 -
目的地名称:目的地名称(主题)。 -
群:消费者团体ID。
KafkaListenerContainerCustomizer
这KafkaListenerContainerCustomizer接口扩展ListenerContainerCustomizer修改监听器容器的行为,并提供对绑定特定 Kafka 扩展消费者属性的访问。
用法
使用KafkaListenerContainerCustomizer创建一个在你的配置中实现该接口的豆子:
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
return (container, destinationName, group, properties) -> {
// Customize the Kafka container here
};
}
这KafkaListenerContainerCustomizer界面增加了以下方法:
default void configureKafkaListenerContainer(
C container,
String destinationName,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
configure(container, destinationName, group);
}
该方法扩展了底线配置附加参数的方法:
-
扩展消费者属性:扩展消费性资产,包括卡夫卡专属作品。
ListenerContainerWithDlqAndRetryCustomizer
这ListenerContainerWithDlqAndRetryCustomizer界面为涉及死符队列(DLQ)和重试机制的场景提供了额外的自定义选项。
用法
使用ListenerContainerWithDlqAndRetryCustomizer创建一个在你的配置中实现该接口的豆子:
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
// Access the container here with access to the extended consumer binding properties.
};
}
这ListenerContainerWithDlqAndRetryCustomizer接口定义了以下方法:
void configure(
AbstractMessageListenerContainer<?, ?> container,
String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
-
容器:卡夫卡听众容器可自定义。 -
目的地名称:目的地名称(主题)。 -
群:消费者团体ID。 -
dlqDestinationResolver: 一个用于解析失败记录DLQ目的地的函数。 -
退避:重试时的退回政策。 -
扩展消费者属性:扩展消费性资产,包括卡夫卡专属作品。
总结
-
ListenerContainerWithDlqAndRetryCustomizer如果启用了DLQ,则使用。 -
KafkaListenerContainerCustomizer用于卡夫卡专属的自定义,无需DLQ。 -
基地
ListenerContainerCustomizer用于通用定制。
这种分层方法允许在 Spring Cloud Stream 应用中灵活且具体地定制 Kafka 监听器容器。