|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
重新平衡监听器
ContainerProperties有一个名为consumerRebalanceListener的属性,该属性采用Kafka客户端的ConsumerRebalanceListener接口的实现。
如果未提供此属性,容器会配置一个日志监听器,将重平衡事件记录在INFO级别日志中。
框架还添加了一个子接口ConsumerAwareRebalanceListener。
以下代码片段显示了ConsumerAwareRebalanceListener接口的定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
注意,当分区被回收时,有两个回调:第一个会立即调用。 第二个会在任何待处理的偏移量提交之后调用。 这在你希望将偏移量维护在某个外部存储库时很有用,如下例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从 2.4 版本开始,新增了一个方法 onPartitionsLost()(与在 ConsumerRebalanceLister 中同名的方法类似)。
在 ConsumerRebalanceLister 上的默认实现简单地调用 onPartitionsRevoked。
在 ConsumerAwareRebalanceListener 上的默认实现什么都不做。
当向监听器容器提供自定义监听器(无论是哪种类型)时,重要的是你的实现不要从 onPartitionsLost 调用 onPartitionsRevoked。
如果你实现了 ConsumerRebalanceListener,你应该覆盖默认方法。
这是因为监听器容器会在调用其实现的 9》 上的 |