|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
强制消费者重平衡
Kafka 客户端现在支持触发一个 强制重平衡 的选项。
从版本 3.1.2 开始,Spring for Apache Kafka 提供了通过消息监听器容器在 Kafka 消费者上调用此 API 的选项。
调用此 API 仅仅是提示 Kafka 消费者触发强制重平衡;实际的重平衡将在下一个 poll() 操作期间发生。
如果正在进行重平衡,调用强制重平衡是无操作(NO-OP)。
调用方必须等待当前重平衡完成后再调用另一个。
有关更多信息,请参阅 enforceRebalance 的 javadocs。
以下代码片段展示了使用消息监听器容器强制执行重新平衡的要点。
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
正如上面的代码所示,应用程序使用 KafkaListenerEndpointRegistry 来获取消息监听器容器的访问权限,然后在该容器上调用 enforceRebalance API。
当在监听器容器上调用 enforceRebalance 时,它会将调用委托到底层的 Kafka 消费者。
Kafka 消费者会在接下来的 poll() 操作中触发重新平衡。