|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
开始 @KafkaListeners 在序列中
一个常见的用例是在另一个监听器已消耗完主题中的所有记录后启动该监听器。
例如,您可能希望在处理来自其他主题的记录之前,先将一个或多个压缩主题的内容加载到内存中。
从版本2.7.3开始,引入了一个新组件ContainerGroupSequencer。
它使用@KafkaListener的containerGroup属性来对容器进行分组,并且当当前组中的所有容器都处于空闲状态时,启动下一组中的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在此,我们有两个组的四个监听器,g1 和 g2。
在应用程序上下文初始化期间,序列生成器会将所提供组中所有容器的autoStartup属性设置为false。它还会将任何未设置的容器(如果尚未设置)的idleEventInterval属性设置为提供的值(本例中为5000毫秒)。然后,当应用程序上下文启动序列生成器时,首先启动第一组中的容器。收到ListenerContainerIdleEvent信号后,每个容器中的各个子容器将停止运行。当某个ConcurrentMessageListenerContainer中的所有子容器都已停止时,父容器也将停止。当一组中的所有容器都已停止时,接下来开始启动下一组中的容器。组数和每组中的容器数量没有限制。
默认情况下,最终组(上例中的g2)中的容器在空闲时不会停止。要修改该行为,请将定时器上的stopLastGroupWhenIdle设置为true。
顺便说一下,以前每个组中的容器被添加到类型为Collection<MessageListenerContainer>的bean中,bean名称是containerGroup。现在这些集合已被弃用,取而代之的是类型为ContainerGroup的bean,其bean名称是组名后缀加上.group;在上面的例子中,会有两个bean g1.group和g2.group。Collection类型的beans将在未来版本中移除。