过滤消息

在某些场景中,例如重新平衡,一个已经处理过的消息可能会被重新投递。 框架无法知道该消息是否已被处理。 这是应用程序级别的功能。 这被称为 幂等接收器 模式,Spring Integration 提供了它的 实现spring-doc.cadn.net.cn

The Spring for Apache Kafka 项目也通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装你的 MessageListener。 此类接受一个 RecordFilterStrategy 的实现,其中你实现 filter 方法以指示消息是重复的并应被丢弃。 此类还有一个名为 ackDiscarded 的额外属性,用于指示适配器是否应确认丢弃的记录。 它默认值为 falsespring-doc.cadn.net.cn

当您使用 @KafkaListener 时,请在容器工厂上设置 RecordFilterStrategy(可选地设置 ackDiscarded),以便将监听器包装在适当的过滤适配器中。spring-doc.cadn.net.cn

此外,还提供一个 FilteringBatchMessageListenerAdapter,用于当您使用一批量 消息监听器 时。spring-doc.cadn.net.cn

@KafkaListener 接收到一个 List<ConsumerRecord<?, ?>> 而不是 ConsumerRecords 时,FilteringBatchMessageListenerAdapter 会被忽略,因为 ConsumerRecords 是不可变的。

从 2.8.4 版本开始,您可以使用在监听器注解上的 filter 属性来覆盖监听器容器工厂的默认 RecordFilterStrategyspring-doc.cadn.net.cn

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}

从 3.3 版本开始,忽略由按 RecordFilterStrategy 过滤产生的空批次是支持的。 在实现 RecordFilterStrategy 时,可以通过 ignoreEmptyBatch() 进行配置。 默认设置为 false,表示即使所有 ConsumerRecord 都被过滤掉,KafkaListener 也会被调用。spring-doc.cadn.net.cn

如果返回 true,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 将不会被调用。 然而,提交到经纪人,执行仍然会进行。spring-doc.cadn.net.cn

如果返回 false,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 将被调用spring-doc.cadn.net.cn

这里有的一些示例。spring-doc.cadn.net.cn

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
}

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 总是返回空列表,而 trueignoreEmptyBatch() 的结果。 因此 KafkaListener#listen(…​) 从头到尾都不会被调用。spring-doc.cadn.net.cn

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return false;
    }
}

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

然而在这种情况下,NotIgnoreEmptyBatchRecordFilterStrategy 总是返回空列表,ignoreEmptyBatch() 返回 false 作为结果。因此 KafkaListener#listen(…​) 总是会被调用。spring-doc.cadn.net.cn