|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
容器工厂
如在 @KafkaListener 注解 中讨论的,使用 ConcurrentKafkaListenerContainerFactory 来创建带有注解方法的容器。
从 2.2 版本开始,您可以使用相同的工厂来创建任何 ConcurrentMessageListenerContainer。
这可能在您需要创建具有相似属性的多个容器,或者希望使用某些外部配置的工厂(如由 Spring Boot 自动配置提供的工厂)时很有用。
一旦创建了容器,您可以进一步修改其属性,其中许多属性是通过使用 container.getContainerProperties() 设置的。
以下示例配置了一个 ConcurrentMessageListenerContainer:
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
通过这种方式创建的容器不会被添加到端点注册表中。
它们应该作为@Bean定义创建,以便与应用程序上下文注册。 |
从 2.3.4 版本开始,你可以在工厂后添加一个 0 来进一步配置每个在创建和配置后生成的容器。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
从版本 3.1 开始,还可以通过在 KafkaListener 注解上指定 'ContainerPostProcessor' 的 bean 名称,在单个监听器上应用相同类型的自定义。
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)