此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
过滤消息
Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter
类,它可以将MessageListener
. 此类采用RecordFilterStrategy
在其中实现filter
方法来表示消息是重复的,应该被丢弃。这有一个名为ackDiscarded
,指示适配器是否应确认已丢弃的记录。 是的false
默认情况下。
当您使用@KafkaListener
,将RecordFilterStrategy
(并且可选ackDiscarded
) ,以便侦听器被包装在适当的过滤适配器中。
此外,一个FilteringBatchMessageListenerAdapter
,用于使用批处理消息侦听器时。
这FilteringBatchMessageListenerAdapter 如果您的@KafkaListener 收到一个ConsumerRecords<?, ?> 而不是List<ConsumerRecord<?, ?>> 因为ConsumerRecords 是不可变的。 |
从 2.8.4 版本开始,您可以覆盖侦听器容器工厂的默认值RecordFilterStrategy
通过使用filter
属性。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
从版本 3.3 开始,忽略过滤结果的空批次RecordFilterStrategy
被支持。实现时RecordFilterStrategy
,可以通过ignoreEmptyBatch()
. 默认设置为false
指示KafkaListener
即使所有ConsumerRecord
s 被过滤掉。
如果true
返回时,KafkaListener
当所有ConsumerRecord
被过滤掉。但是,提交给代理,仍将被执行。
如果false
返回时,KafkaListener
当所有ConsumerRecord
被过滤掉。
这里有些例子。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
}
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在这种情况下,IgnoreEmptyBatchRecordFilterStrategy
总是返回空列表并返回true
由于ignoreEmptyBatch()
. 因此KafkaListener#listen(…)
永远不会被调用。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
}
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
但是,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy
总是返回空列表并返回false
由于ignoreEmptyBatch()
. 因此KafkaListener#listen(…)
always 将被调用。