此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
重新平衡监听器
ContainerProperties
有一个名为consumerRebalanceListener
,它采用 Kafka 客户端的ConsumerRebalanceListener
接口。
如果未提供此属性,则容器将配置一个日志记录侦听器,该侦听器在INFO
水平。
该框架还添加了一个子接口ConsumerAwareRebalanceListener
.
以下列表显示了ConsumerAwareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,撤销分区时有两个回调。 第一个立即调用。 第二个在提交任何挂起的偏移量后调用。 如果您希望在某个外部存储库中维护偏移量,这很有用,如以下示例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从 2.4 版本开始,一种新的方法onPartitionsLost() 已添加(类似于ConsumerRebalanceLister ).
默认实现ConsumerRebalanceLister 简单调用onPartitionsRevoked .
默认实现ConsumerAwareRebalanceListener 什么都不做。
当为侦听器容器提供自定义侦听器(任一类型)时,重要的是您的实现不要调用onPartitionsRevoked 从onPartitionsLost .
如果实现ConsumerRebalanceListener 您应该覆盖默认方法。
这是因为侦听器容器将调用自己的onPartitionsRevoked 从其实施onPartitionsLost 在实现上调用该方法之后。
如果实现委托给默认行为,onPartitionsRevoked 每次调用Consumer 在容器的侦听器上调用该方法。 |
Kafka 4.0 消费者再平衡协议
Spring for Apache Kafka 4.0 支持 Apache Kafka 4.0 的新消费者重新平衡协议 (KIP-848),该协议通过服务器驱动的增量分区分配增强了性能。 这减少了消费者群体的再平衡停机时间。
要启用新协议,请配置group.protocol
财产:
spring.kafka.consumer.properties.group.protocol=consumer
请记住,上面的属性是 Spring Boot 属性。 如果您没有使用 Spring Boot,您可能需要手动设置它,如下所示。
或者,以编程方式设置它:
Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
新协议与ConsumerAwareRebalanceListener
.
由于增量再平衡,onPartitionsAssigned
可以使用较小的分区集多次调用,这与传统协议的典型单回调不同。
新协议使用服务器端分区分配,忽略通过spring.kafka.consumer.partition-assignment-strategy
.
如果检测到自定义分配器,则会记录警告。
要使用自定义分配器,请将group.protocol=classic
(如果您未指定group.protocol
).