对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

重新平衡监听器

ContainerProperties有一个名为consumerRebalanceListener,它采用 Kafka 客户端的ConsumerRebalanceListener接口。 如果未提供此属性,则容器将配置一个日志记录侦听器,该侦听器在INFO水平。 该框架还添加了一个子接口ConsumerAwareRebalanceListener. 以下列表显示了ConsumerAwareRebalanceListener接口定义:spring-doc.cadn.net.cn

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

请注意,撤销分区时有两个回调。 第一个立即调用。 第二个在提交任何挂起的偏移量后调用。 如果您希望在某个外部存储库中维护偏移量,这很有用,如以下示例所示:spring-doc.cadn.net.cn

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
从 2.4 版本开始,一种新的方法onPartitionsLost()已添加(类似于ConsumerRebalanceLister). 默认实现ConsumerRebalanceLister简单调用onPartionsRevoked. 默认实现ConsumerAwareRebalanceListener什么都不做。 当为侦听器容器提供自定义侦听器(任一类型)时,重要的是您的实现不要调用onPartitionsRevokedonPartitionsLost. 如果实现ConsumerRebalanceListener您应该覆盖默认方法。 这是因为侦听器容器将调用自己的onPartitionsRevoked从其实施onPartitionsLost在实现上调用该方法之后。 如果实现委托给默认行为,onPartitionsRevoked每次调用Consumer在容器的侦听器上调用该方法。