此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

集装箱工厂

@KafkaListener注解一个ConcurrentKafkaListenerContainerFactory用于为带注释的方法创建容器。spring-doc.cadn.net.cn

从 2.2 版开始,您可以使用相同的工厂创建任何ConcurrentMessageListenerContainer. 如果您想创建多个具有相似属性的容器,或者您希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能很有用。 创建容器后,您可以进一步修改其属性,其中许多属性是通过使用container.getContainerProperties(). 以下示例配置ConcurrentMessageListenerContainer:spring-doc.cadn.net.cn

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
以这种方式创建的容器不会添加到终结点注册表中。 它们应该创建为@Bean定义,以便它们向应用程序上下文注册。

从 2.3.4 版本开始,您可以添加ContainerCustomizer到工厂,以便在创建和配置每个容器后进一步配置它。spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

从 3.1 版开始,还可以通过在 KafkaListener 注释上指定“ContainerPostProcessor”的 bean 名称,在单个侦听器上应用相同类型的自定义。spring-doc.cadn.net.cn

@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
    return container -> { /* customize the container */ };
}

...

@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)