此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
开始@KafkaListener
s 在序列中
一个常见的用例是在另一个侦听器使用了主题中的所有记录后启动侦听器。
例如,您可能希望在处理来自其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。
从 2.7.3 版本开始,一个新组件ContainerGroupSequencer
已被引入。
它使用@KafkaListener
的containerGroup
属性将容器分组在一起,并在当前组中的所有容器都空闲时启动下一组中的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在这里,我们有 4 个听众,分为两组,g1
和g2
.
在应用程序上下文初始化期间,排序器将autoStartup
属性设置为false
.
它还设置了idleEventInterval
对于任何容器(尚未设置一个容器)到提供的值(在本例中为 5000 毫秒)。
然后,当应用程序上下文启动排序器时,将启动第一组中的容器。
如ListenerContainerIdleEvent
收到 s,则每个容器中的每个单独的子容器都将停止。
当ConcurrentMessageListenerContainer
停止时,父容器将停止。
当一个组中的所有容器都已停止时,将启动下一个组中的容器。
组中的组或容器数没有限制。
默认情况下,最后一组中的容器 (g2
以上)在空闲时不会停止。
要修改该行为,请将stopLastGroupWhenIdle
自true
在音序器上。
顺便说一句,以前每个组中的容器都被添加到类型为Collection<MessageListenerContainer>
其中 bean 名称是containerGroup
.
这些集合现在已被弃用,取而代之的是类型为ContainerGroup
替换为 bean 名称,该名称是组名称,后缀为.group
;在上面的示例中,将有 2 个 beang1.group
和g2.group
.
这Collection
beans 将在将来的版本中删除。