暂停与恢复监听器容器

版本 2.1.3 为监听器容器新增了 pause()resume() 方法。 此前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过监听 ListenerContainerIdleEvent 事件来恢复,这提供了对 Consumer 对象的访问。 虽然可以通过事件监听器在空闲容器中暂停消费者,但在某些情况下这并非线程安全,因为无法保证事件监听器在消费者线程中被调用。 为了安全地暂停和恢复消费者,应使用监听器容器上的 pauseresume 方法。 一个 pause() 在下一次 poll() 之前生效;一个 resume() 在当前 poll() 返回后生效。 当容器被暂停时,它会继续 poll() 消费者,避免在使用组管理时发生重新平衡,但不会检索任何记录。 有关更多信息,请参阅 Kafka 文档。spring-doc.cadn.net.cn

从版本 2.1.5 开始,您可以调用 isPauseRequested() 来检查 pause() 是否已经调用。 然而,消费者可能尚未实际暂停。 isConsumerPaused() 返回 true 当所有 Consumer 实例实际上已暂停。spring-doc.cadn.net.cn

此外(自 2.1.5 起也包括),ConsumerPausedEventConsumerResumedEvent 实例会以 source 属性作为容器发布,而涉及在 partitions 属性中的 TopicPartition 实例也会被发布。spring-doc.cadn.net.cn

从2.9版本开始,一个新的容器属性pauseImmediate,当设置为true时,会在当前记录处理完成后生效。 默认情况下,暂停会在上一次poll的所有记录都处理完后生效。 参见pauseImmediatespring-doc.cadn.net.cn

以下这个简单的Spring Boot应用程序演示了如何使用容器注册表获取对某个方法的容器的引用,并暂停或恢复其消费者,以及接收相应的事件:spring-doc.cadn.net.cn

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下代码列表显示了前述示例的结果:spring-doc.cadn.net.cn

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2