此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

过滤消息

在某些情况下,例如重新平衡,可能会重新传递已处理的消息。框架无法知道此类消息是否已被处理。这是一个应用程序级函数。这称为幂等接收器模式,Spring Integration 提供了它的实现spring-doc.cadn.net.cn

Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter类,它可以将MessageListener. 此类采用RecordFilterStrategy在其中实现filter方法来表示消息是重复的,应该被丢弃。这有一个名为ackDiscarded,指示适配器是否应确认已丢弃的记录。 是的false默认情况下。spring-doc.cadn.net.cn

当您使用@KafkaListener,将RecordFilterStrategy(并且可选ackDiscarded) ,以便侦听器被包装在适当的过滤适配器中。spring-doc.cadn.net.cn

此外,一个FilteringBatchMessageListenerAdapter,用于使用批处理消息侦听器时。spring-doc.cadn.net.cn

FilteringBatchMessageListenerAdapter如果您的@KafkaListener收到一个ConsumerRecords<?, ?>而不是List<ConsumerRecord<?, ?>>因为ConsumerRecords是不可变的。

从 2.8.4 版本开始,您可以覆盖侦听器容器工厂的默认值RecordFilterStrategy通过使用filter属性。spring-doc.cadn.net.cn

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}

从版本 3.3 开始,忽略过滤结果的空批次RecordFilterStrategy被支持。实现时RecordFilterStrategy,可以通过ignoreEmptyBatch(). 默认设置为false指示KafkaListener即使所有ConsumerRecords 被过滤掉。spring-doc.cadn.net.cn

如果true返回时,KafkaListener 所有ConsumerRecord被过滤掉。但是,提交给代理,仍将被执行。spring-doc.cadn.net.cn

如果false返回时,KafkaListener 所有ConsumerRecord被过滤掉。spring-doc.cadn.net.cn

这里有些例子。spring-doc.cadn.net.cn

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
}

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

在这种情况下,IgnoreEmptyBatchRecordFilterStrategy总是返回空列表并返回true由于ignoreEmptyBatch(). 因此KafkaListener#listen(…​)永远不会被调用。spring-doc.cadn.net.cn

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return false;
    }
}

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

但是,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy总是返回空列表并返回false由于ignoreEmptyBatch(). 因此KafkaListener#listen(…​)always 将被调用。spring-doc.cadn.net.cn