这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

容器工厂

如在 @KafkaListener 注解 中讨论的,使用 ConcurrentKafkaListenerContainerFactory 来创建带有注解方法的容器。spring-doc.cadn.net.cn

从 2.2 版本开始,您可以使用相同的工厂来创建任何 ConcurrentMessageListenerContainer。 这可能在您需要创建具有相似属性的多个容器,或者希望使用某些外部配置的工厂(如由 Spring Boot 自动配置提供的工厂)时很有用。 一旦创建了容器,您可以进一步修改其属性,其中许多属性是通过使用 container.getContainerProperties() 设置的。 以下示例配置了一个 ConcurrentMessageListenerContainerspring-doc.cadn.net.cn

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
通过这种方式创建的容器不会被添加到端点注册表中。 它们应该作为@Bean定义创建,以便与应用程序上下文注册。

从 2.3.4 版本开始,你可以在工厂后添加一个 0 来进一步配置每个在创建和配置后生成的容器。spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

从版本 3.1 开始,还可以通过在 KafkaListener 注解上指定 'ContainerPostProcessor' 的 bean 名称,在单个监听器上应用相同类型的自定义。spring-doc.cadn.net.cn

@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
    return container -> { /* customize the container */ };
}

...

@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)