Kafka Binder Listener Container Customizers

Spring Cloud Stream 通过自定义工具为消息监听器容器提供了强大的自定义选项。 本节介绍了 Kafka 可用的自定义界面:ListenerContainerCustomizer,其卡夫卡特有的扩展KafkaListenerContainerCustomizer,以及专门化的ListenerContainerWithDlqAndRetryCustomizer.spring-doc.cadn.net.cn

ListenerContainerCustomizer

ListenerContainerCustomizer是 Spring Cloud Stream 中的一个通用接口,允许对消息监听器容器进行定制。spring-doc.cadn.net.cn

目的

当你需要修改监听器容器的行为时,可以使用这个自定义器。spring-doc.cadn.net.cn

用法

使用ListenerContainerCustomizer创建一个在你的配置中实现该接口的豆子:spring-doc.cadn.net.cn

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}

ListenerContainerCustomizer接口定义了以下方法:spring-doc.cadn.net.cn

void configure(C container, String destinationName, String group);

KafkaListenerContainerCustomizer

KafkaListenerContainerCustomizer接口扩展ListenerContainerCustomizer修改监听器容器的行为,并提供对绑定特定 Kafka 扩展消费者属性的访问。spring-doc.cadn.net.cn

目的

当你需要访问绑定专用的 Kafka 扩展消费者属性并自定义监听器容器时,可以使用这个自定义器。spring-doc.cadn.net.cn

用法

使用KafkaListenerContainerCustomizer创建一个在你的配置中实现该接口的豆子:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    return (container, destinationName, group, properties) -> {
        // Customize the Kafka container here
    };
}

KafkaListenerContainerCustomizer界面增加了以下方法:spring-doc.cadn.net.cn

default void configureKafkaListenerContainer(
    C container,
    String destinationName,
    String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        configure(container, destinationName, group);
}

该方法扩展了底线配置附加参数的方法:spring-doc.cadn.net.cn

ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerWithDlqAndRetryCustomizer界面为涉及死符队列(DLQ)和重试机制的场景提供了额外的自定义选项。spring-doc.cadn.net.cn

目的

当你需要微调DLQ行为或为Kafka用户实现自定义重试逻辑时,可以使用这个自定义器。spring-doc.cadn.net.cn

用法

使用ListenerContainerWithDlqAndRetryCustomizer创建一个在你的配置中实现该接口的豆子:spring-doc.cadn.net.cn

@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Access the container here with access to the extended consumer binding properties.
    };
}

ListenerContainerWithDlqAndRetryCustomizer接口定义了以下方法:spring-doc.cadn.net.cn

void configure(
    AbstractMessageListenerContainer<?, ?> container,
    String destinationName,
    String group,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
    BackOff backOff,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);

总结

这种分层方法允许在 Spring Cloud Stream 应用中灵活且具体地定制 Kafka 监听器容器。spring-doc.cadn.net.cn