对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
强制消费者再平衡
Kafka 客户端现在支持触发强制重新平衡的选项。
从版本开始3.1.2
,Spring for Apache Kafka 提供了一个选项,可以通过消息侦听器容器在 Kafka 消费者上调用此 API。
调用此 API 时,它只是提醒 Kafka 消费者触发强制重新平衡;实际的再平衡只会作为下一个poll()
操作。
如果已经有重新平衡正在进行中,则调用强制重新平衡是 NO-OP。
调用方必须等待当前重新平衡完成,然后才能调用另一个重新平衡。
请参阅 javadocsenforceRebalance
了解更多详情。
以下代码片段显示了使用消息侦听器容器强制重新平衡的本质。
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
如上面的代码所示,应用程序使用KafkaListenerEndpointRegistry
以访问消息侦听器容器,然后调用enforceRebalnce
API 在上面。
调用enforceRebalance
在侦听器容器上,它将调用委托给底层 Kafka 消费者。
Kafka 消费者将触发重新平衡,作为下一个poll()
操作。