|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
@KafkaListener 注解
The @KafkaListener 注解用于将 bean 方法指定为监听器容器的监听器。
该 bean 会被包装在 MessagingMessageListenerAdapter 中,可根据需要配置各种特性,例如转换器以将数据转换为匹配方法参数。
You can configure most attributes on the annotation with SpEL by using #{…} or property placeholders (${…}).
See the Javadoc for more information.
记录监听器
@KafkaListener 注解为简单的 POJO 监听器提供了一种机制。
以下示例展示了如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
这种机制要求在您要使用的 @EnableKafka 类之一上使用 @Configuration 注解,并使用一个监听器容器工厂来配置底层的 ConcurrentMessageListenerContainer。
默认情况下,期望有一个名为 kafkaListenerContainerFactory 的 bean。
以下示例展示了如何使用 ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
}
请注意,要设置容器属性,必须使用在工厂上的getContainerProperties()方法。
它用作实际注入到容器中的属性的模板。
从版本 2.1.1 开始,您可以通过注解创建的消费者现在可以设置 client.id 属性。
clientIdPrefix 会附加 -n,其中 n 是在使用并发时表示容器编号的整数。
从 2.2 版本开始,您可以使用注解本身上的属性来覆盖容器工厂的 concurrency 和 autoStartup 属性。
这些属性可以是简单值、属性占位符或 SpEL 表达式。
以下示例展示了如何实现:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
主题分区分配
你可以通过三种方式为@KafkaListener配置主题。
你必须通过这三种方式中的一种来配置主题。
@KafkaListener(id = "myListener", topics = "myTopic")
public void listen(String data) {
...
}
@KafkaListener(id = "myListener", topicPattern = "my.*")
public void listen(String data) {
...
}
@KafkaListener(id = "myListener", topicPartitions = { @TopicPartition(topic = "myTopic", partitions = { "0", "1" })})
public void listen(String data) {
...
}
你可以通过直接使用名称来配置主题。
在这种情况下,你也可以像 topics = {"myTopic1", myTopic2"} 一样配置多个主题。
您也可以使用topicPattern进行配置,这使得可以根据正则表达式进行主题订阅。
当使用这两种方式(主题或主题模式)之一配置时,Kafka 会根据消费者组自动分配分区。 或者,你可以通过显式配置主题和分区(可选地,还有初始偏移量)来配置 POJO 监听器。 以下示例展示了如何进行配置:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以为每个分区在partitions或partitionOffsets属性中指定一个,但不能同时指定两者。
与大多数注解属性类似,您可以使用SpEL表达式;例如,要生成一个包含大量分区的大列表,请参见手动分配所有分区。
从版本2.5.5开始,您可以对所有分配的分区应用初始偏移:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
The * 通配符表示 partitions 属性中的所有分区。
在每个 @TopicPartition 中,@PartitionOffset 与通配符的组合只能出现一次。
此外,当监听器实现ConsumerSeekAware时,即使使用手动赋值,onPartitionsAssigned现在也会被调用,这允许在该时间点进行任意的跳转操作。
从2.6.4版本开始,您可以指定一个由逗号分隔的分区列表,或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
范围是包含在内的;上面的例子将分配分区 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15。
相同的技巧也可以用于指定初始偏移量:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移将应用于所有6个分区。
自 3.2 起,@PartitionOffset 支持 SeekPosition.END、SeekPosition.BEGINNING、SeekPosition.TIMESTAMP、seekPosition 匹配 SeekPosition 枚举名称:
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
如果 seekPosition 设置为 END 或 BEGINNING 会忽略 initialOffset 和 relativeToCurrent。
如果 seekPosition 设置为 TIMESTAMP,initialOffset 表示时间戳。
手动确认
当使用 manual AckMode 时,你也可以通过 Acknowledgment 提供监听器。
要激活 manual AckMode,需要将 ContainerProperties 中的 ack-mode 设置为适当的 manual 模式。
以下示例也展示了如何使用不同的容器工厂。
此自定义容器工厂必须通过调用 getContainerProperties() 将 AckMode 设置为 manual 类型,然后调用 setAckMode 在其上。
否则,Acknowledgment 对象将为 null
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
从 4.1 版本开始,你可以直接在 @KafkaListener 注解上使用 ackMode 属性来覆盖容器工厂的默认 AckMode:
@KafkaListener(id = "manual", topics = "myTopic", ackMode = "MANUAL")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
ackMode属性接受对应于ContainerProperties.AckMode枚举值的字符串值。
这消除了为不同的确认模式创建单独容器工厂的需要。
该属性也可以通过SpEL表达式(#{…})或属性占位符(${…})进行配置。
消费者记录元数据
最后,可以从消息头获取记录的元数据。 您可以使用以下标题名称来检索消息的标题:
-
KafkaHeaders.OFFSET -
KafkaHeaders.RECEIVED_KEY -
KafkaHeaders.RECEIVED_TOPIC -
KafkaHeaders.RECEIVED_PARTITION -
KafkaHeaders.RECEIVED_TIMESTAMP -
KafkaHeaders.TIMESTAMP_TYPE
从2.5版本开始,如果传入的记录具有null键,则不再包含RECEIVED_KEY;在此之前,标题会被填充为null值。
此更改是为了使框架与spring-messaging约定保持一致,其中null值的标题不存在。
以下示例显示了如何使用 headers:
@KafkaListener(id = "qux", topics = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
参数注解(@Payload, @Header) 必须指定在监听器方法的具体实现上;如果定义在接口上,将无法检测到。 |
从 2.5 版本开始,您可以使用一个 ConsumerRecordMetadata 参数来接收记录元数据,而不再使用单独的头信息。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含了来自ConsumerRecord的所有数据,但不包含键和值。
批次监听器
从 1.1 版本开始,您可以配置 0 个方法以接收从消费者 poll 中获取的整个消费者记录批次。
| 非阻塞重试 不支持批处理监听器。 |
要配置监听器容器工厂以创建批处理监听器,可以设置batchListener属性。
以下示例展示了如何进行配置:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
从 2.8 版本开始,您可以使用在 @KafkaListener 注解上的 batch 属性来覆盖工厂的 batchListener 属性。
这,结合对 容器错误处理程序 的更改,允许同一个工厂同时用于记录监听器和批处理监听器。 |
从 2.9.6 版本开始,容器工厂为 recordMessageConverter 和 batchMessageConverter 属性提供了单独的设置方法。
此前,只有一个 messageConverter 属性适用于记录监听器和批处理监听器。 |
以下示例显示如何接收一个payload列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
主题、分区、偏移量等信息在与负载相对应的头信息中可用。以下示例展示了如何使用这些头信息:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,你也可以通过每个偏移接收一个List或一个Message<?>个对象,以及消息中的其他详细信息,但该方法只能定义一个该参数(在使用手动提交时的额外Acknowledgment参数,以及/或Consumer<?, ?>参数除外)。
以下示例展示了如何实现:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,不会对有效负载执行转换。
如果将 BatchMessagingMessageConverter 配置为 RecordMessageConverter,还可以向 Message 参数添加泛型类型,并且有效负载会被转换。批处理监听器中的有效负载转换可获取更多详细信息。
您还可以接收一个ConsumerRecord<?, ?>对象列表,但必须是方法上唯一的参数(在使用手动提交和Consumer<?, ?>个参数时可选定义Acknowledgment)。下面的例子展示了如何实现:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从版本2.2开始,监听器可以接收ConsumerRecords<?, ?>方法返回的完整的poll()对象,使监听器能够访问其他额外的方法,例如partitions()(它返回列表中的TopicPartition实例)和records(TopicPartition)(获取选择性记录)。
同样,此参数必须是该方法上唯一的参数(除了使用手动提交或Acknowledgment参数时可选的Consumer<?, ?>)
下面的例子展示了如何实现:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工厂配置了 RecordFilterStrategy,则对于 ConsumerRecords<?, ?> 监听器会被忽略,并发出 WARN 日志消息。只有在使用 List<?> 形式的监听器时才能对记录进行过滤。默认情况下,每次只过滤一条记录;从版本 2.8 开始,您可以覆盖 filterBatch 来一次性调用过滤整个批次。 |
注解属性
从版本 2.0 开始,如果存在 id 属性,则将其用作 Kafka 消费者 group.id 属性,并覆盖消费者工厂中配置的属性(如果存在)。
您也可以显式设置 groupId 或将 idIsGroup 设置为 false 来恢复以前使用消费者工厂 group.id 的行为。
您可以在大多数注解属性中使用属性占位符或SpEL表达式,如下例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从版本2.1.2开始,SpEL表达式支持一个特殊的标记:__listener。它是一个伪bean名称,表示当前注解所在的bean实例。
考虑以下示例:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
在前面的示例中提供了这些bean后,我们可以使用以下内容:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果在极少数情况下您有一个实际名为__listener的bean,可以通过使用beanRef属性来更改表达式标记。以下示例显示了如何操作:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
从版本 2.2.4 开始,您可以在注解中直接指定 Kafka 消费者属性,这些属性将覆盖消费者工厂中配置的同名属性。不能以这种方式指定 group.id 和 client.id 属性;它们将被忽略;请使用 groupId 和 clientIdPrefix 注解属性来指定这些。
属性以单独的字符串形式指定,使用标准Java Properties文件格式:foo:bar、foo=bar或foo bar,如下例所示:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下为< a href="1" class="0">使用 < code >2 示例中对应的监听器示例。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}