|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
过滤消息
在某些场景中,例如重新平衡,一个已经处理过的消息可能会被重新投递。 框架无法知道该消息是否已被处理。 这是应用程序级别的功能。 这被称为 幂等接收器 模式,Spring Integration 提供了它的 实现。
The Spring for Apache Kafka 项目也通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装你的 MessageListener。
此类接受一个 RecordFilterStrategy 的实现,其中你实现 filter 方法以指示消息是重复的并应被丢弃。
此类还有一个名为 ackDiscarded 的额外属性,用于指示适配器是否应确认丢弃的记录。
它默认值为 false。
当您使用 @KafkaListener 时,请在容器工厂上设置 RecordFilterStrategy(可选地设置 ackDiscarded),以便将监听器包装在适当的过滤适配器中。
此外,还提供一个 FilteringBatchMessageListenerAdapter,用于当您使用一批量 消息监听器 时。
当 @KafkaListener 接收到一个 List<ConsumerRecord<?, ?>> 而不是 ConsumerRecords 时,FilteringBatchMessageListenerAdapter 会被忽略,因为 ConsumerRecords 是不可变的。 |
从 2.8.4 版本开始,您可以使用在监听器注解上的 filter 属性来覆盖监听器容器工厂的默认 RecordFilterStrategy。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
从 3.3 版本开始,忽略由按 RecordFilterStrategy 过滤产生的空批次是支持的。
在实现 RecordFilterStrategy 时,可以通过 ignoreEmptyBatch() 进行配置。
默认设置为 false,表示即使所有 ConsumerRecord 都被过滤掉,KafkaListener 也会被调用。
如果返回 true,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 将不会被调用。
然而,提交到经纪人,执行仍然会进行。
如果返回 false,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 将被调用。
这里有的一些示例。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
}
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 总是返回空列表,而 true 是 ignoreEmptyBatch() 的结果。
因此 KafkaListener#listen(…) 从头到尾都不会被调用。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
}
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
然而在这种情况下,NotIgnoreEmptyBatchRecordFilterStrategy 总是返回空列表,ignoreEmptyBatch() 返回 false 作为结果。因此 KafkaListener#listen(…) 总是会被调用。