向侦听器容器添加了版本 2.1.3 和方法。
以前,您可以在 a 中暂停使用者,然后通过侦听 a 来恢复它,这会提供对对象的访问。
虽然可以使用事件侦听器暂停空闲容器中的使用者,但在某些情况下,这不是线程安全的,因为无法保证在使用者线程上调用事件侦听器。
若要安全地暂停和恢复使用者,应在侦听器容器上使用 and 方法。
A 在下一个之前生效;A 在当前返回后立即生效。
当容器暂停时,它会继续到使用者,从而避免在使用组管理时重新平衡,但它不会检索任何记录。
有关详细信息,请参阅 Kafka 文档。pause()resume()ConsumerAwareMessageListenerListenerContainerIdleEventConsumerpauseresumepause()poll()resume()poll()poll()
从版本 2.1.5 开始,您可以调用以查看是否已被调用。
但是,消费者可能还没有真正停下来。 如果所有实例实际上都已暂停,则返回 true。isPauseRequested()pause()isConsumerPaused()Consumer
此外(也是从 2.1.5 开始),实例以容器作为属性发布,属性中涉及的实例。ConsumerPausedEventConsumerResumedEventsourceTopicPartitionpartitions
从版本 2.9 开始,当新的容器属性设置为 true 时,暂停在处理当前记录后生效。
默认情况下,当处理完上一次轮询的所有记录时,暂停将生效。
请参阅 pauseImmediate。pauseImmediate
以下简单的 Spring Boot 应用程序演示了如何使用容器注册表获取对方法容器的引用,并暂停或恢复其使用者以及接收相应的事件:@KafkaListener
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
下面的列表显示了上述示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2