|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6! |
重新平衡侦听器
ContainerProperties具有一个名为consumerRebalanceListener,它采用 Kafka 客户端的ConsumerRebalanceListener接口。
如果未提供此属性,则容器会配置一个日志记录侦听器,该侦听器在INFO水平。
框架还添加了一个子接口ConsumerAwareRebalanceListener.
下面的清单显示了ConsumerAwareRebalanceListener接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,当 partitions 被撤销时,有两个回调。 第一个会立即调用。 第二个 is 在提交任何待处理的偏移量后调用。 如果您希望在某些外部存储库中维护偏移量,这非常有用,如下例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从版本 2.4 开始,新方法onPartitionsLost()已添加(类似于ConsumerRebalanceLister).
默认实现ConsumerRebalanceLister只需调用onPartionsRevoked.
默认实现ConsumerAwareRebalanceListener什么都不做。
当为侦听器容器提供自定义侦听器(任一类型)时,您的实现必须不要调用onPartitionsRevoked从onPartitionsLost.
如果您实现ConsumerRebalanceListener您应该覆盖 default 方法。
这是因为侦听器容器将调用自己的onPartitionsRevoked从其onPartitionsLost在对实现调用 方法后。
如果 implementation delegate to default 行为,onPartitionsRevoked将调用两次,每次Consumer在容器的侦听器上调用该方法。 |