|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.5! |
筛选消息
Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter类,它可以包装您的MessageListener.
此类采用RecordFilterStrategy其中,您可以实现filter方法来表示消息是重复的,应该被丢弃。
这有一个名为ackDiscarded,它指示适配器是否应确认丢弃的记录。
是的false默认情况下。
当您使用@KafkaListener中,将RecordFilterStrategy(并且可选ackDiscarded),以便将侦听器包装在适当的筛选适配器中。
此外,一个FilteringBatchMessageListenerAdapter,用于使用批处理消息侦听器。
这FilteringBatchMessageListenerAdapter如果您的@KafkaListener接收一个ConsumerRecords<?, ?>而不是List<ConsumerRecord<?, ?>>因为ConsumerRecords是不可变的。 |
从版本 2.8.4 开始,您可以覆盖侦听器容器工厂的默认值RecordFilterStrategy通过使用filter属性。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
从版本 3.3 开始,忽略筛选依据RecordFilterStrategy受支持。
实现RecordFilterStrategy,它可以通过ignoreEmptyBatch().
默认设置为false指示KafkaListener将调用,即使所有ConsumerRecord被过滤掉。
如果true返回KafkaListener 不会在调用所有ConsumerRecord被过滤掉。
但是,commit to broker 仍将被执行。
如果false返回KafkaListener 将在所有ConsumerRecord被过滤掉。
以下是一些示例。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
};
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在这种情况下,IgnoreEmptyBatchRecordFilterStrategy始终返回空列表并返回true作为ignoreEmptyBatch().
因此KafkaListener#listen(…)永远不会被调用。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
};
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
但是,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy始终返回空列表并返回false作为ignoreEmptyBatch().
因此KafkaListener#listen(…)always 将被调用。