这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

重新平衡监听器

ContainerProperties有一个名为consumerRebalanceListener的属性,该属性采用Kafka客户端的ConsumerRebalanceListener接口的实现。 如果未提供此属性,容器会配置一个日志监听器,将重平衡事件记录在INFO级别日志中。 框架还添加了一个子接口ConsumerAwareRebalanceListener。 以下代码片段显示了ConsumerAwareRebalanceListener接口的定义:spring-doc.cadn.net.cn

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

注意,当分区被回收时,有两个回调:第一个会立即调用。 第二个会在任何待处理的偏移量提交之后调用。 这在你希望将偏移量维护在某个外部存储库时很有用,如下例所示:spring-doc.cadn.net.cn

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
从 2.4 版本开始,新增了一个方法 onPartitionsLost()(与在 ConsumerRebalanceLister 中同名的方法类似)。 在 ConsumerRebalanceLister 上的默认实现简单地调用 onPartitionsRevoked。 在 ConsumerAwareRebalanceListener 上的默认实现什么都不做。 当向监听器容器提供自定义监听器(无论是哪种类型)时,重要的是你的实现不要从 onPartitionsLost 调用 onPartitionsRevoked。 如果你实现了 ConsumerRebalanceListener,你应该覆盖默认方法。 这是因为监听器容器会在调用其实现的 9》 上的 onPartitionsRevoked 方法后,再调用你实现上的该方法。 如果你的实现委托到默认行为,onPartitionsRevoked 每次 Consumer 调用该方法时都会被调用两次。

Kafka 4.0 消费者重平衡协议

Spring for Apache Kafka 4.0 支持 Apache Kafka 4.0 的新消费者重新平衡协议(KIP-848),该协议通过服务器驱动的、增量的分区分配来提升性能。 这减少了消费者组的重新平衡停机时间。spring-doc.cadn.net.cn

启用新协议,配置 group.protocol 属性:spring-doc.cadn.net.cn

spring.kafka.consumer.properties.group.protocol=consumer

请注意,上述属性是 Spring Boot 的属性。 如果你没有使用 Spring Boot,你可能需要像下面这样手动设置。spring-doc.cadn.net.cn

或者,通过程序设置:spring-doc.cadn.net.cn

Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

新的协议与 ConsumerAwareRebalanceListener 工作无缝衔接。 由于增量重平衡,onPartitionsAssigned 可能会使用较小的分区集合被多次调用,这与传统协议中单次回调的模式不同。spring-doc.cadn.net.cn

新的协议使用服务器端的分区分配,忽略通过spring.kafka.consumer.partition-assignment-strategy设置的客户端自定义分配器。 如果检测到自定义分配器,会记录警告。 要使用自定义分配器,请设置group.protocol=classic(如果未指定group.protocol的值则默认为此设置)。spring-doc.cadn.net.cn