这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

消息监听器容器

两个 MessageListenerContainer 实现被提供:spring-doc.cadn.net.cn

The KafkaMessageListenerContainer 接收来自所有主题或分区的消息,所有消息都在单一线程上处理。 The ConcurrentMessageListenerContainer 将请求委托到一个或多个 KafkaMessageListenerContainer 实例以提供多线程消费。spring-doc.cadn.net.cn

从版本 2.2.7 开始,您可以向监听器容器添加一个 RecordInterceptor;在调用监听器之前会调用它,从而允许检查或修改记录。
如果拦截器返回 null,则不会调用监听器。
从版本 2.7 开始,它还有额外的方法,在监听器退出后被调用(正常情况下或抛出异常时)。
此外,从版本 2.7 开始,现在有一个 BatchInterceptor,为批处理监听器提供类似的功能。
另外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供了对 Consumer<?, ?> 的访问。
例如,可以在拦截器中访问消费者指标。spring-doc.cadn.net.cn

您不应在这些拦截器中执行任何影响消费者位置和/或已提交偏移的方法;容器需要管理此类信息。
如果拦截器修改了记录(通过创建新记录),则必须保持topicpartitionoffset不变,以免产生意外的副作用,例如记录丢失。

可以使用 CompositeRecordInterceptorCompositeBatchInterceptor 来调用多个拦截器。spring-doc.cadn.net.cn

从版本 2.8 开始,默认情况下,使用事务时,在事务启动之前会调用拦截器。
您可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便在事务启动后而不是之前调用拦截器。
从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager 的事务管理器。
这样允许拦截器参与由容器启动的 JDBC 事务。spring-doc.cadn.net.cn

从版本 2.3.8、2.4.6 开始,当并发度大于一时,ConcurrentMessageListenerContainer 现在支持静态成员资格group.instance.id1 处附加了 -nn。这与增加的 session.timeout.ms 结合使用,可以减少重新平衡事件,例如,在应用程序实例重启时。spring-doc.cadn.net.cn

使用KafkaMessageListenerContainer

以下构造函数可用:spring-doc.cadn.net.cn

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它接收一个 ConsumerFactory 和有关主题和分区以及其他配置的信息,在一个 ContainerProperties 对象中。 ContainerProperties 具有以下构造函数:spring-doc.cadn.net.cn

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数采用TopicPartitionOffset个参数的数组,明确指示容器使用哪些分区(使用消费者assign()方法),并可选择一个初始偏移量。
正数值默认为绝对偏移量。
负数值默认相对于当前分区中的最后一个偏移量。
提供了第二个构造函数TopicPartitionOffset,它额外接受一个boolean参数。
如果这个值是true,则初始偏移量(正或负)相对于此消费者的当前位置。
当容器启动时应用这些偏移量。
第二个构造函数采用话题数组,Kafka根据group.id属性分配分区——在组中分布分区。
第三个构造函数使用正则表达式Pattern来选择话题。spring-doc.cadn.net.cn

要向容器分配一个 MessageListener,可以在创建容器时使用 ContainerProps.setMessageListener 方法。 以下示例显示了如何操作:spring-doc.cadn.net.cn

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,当创建DefaultKafkaConsumerFactory时,使用仅包含上述属性的构造函数意味着从配置中选择键和值Deserializer类。
或者,可以将Deserializer实例传递给DefaultKafkaConsumerFactory构造函数来获取密钥和/或值,在这种情况下所有消费者共享相同的实例。
另一种选择是提供Supplier<Deserializer>(从版本2.3开始),这些将在每个Consumer中用于获得单独的Deserializer实例:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

refer to the javadoc for ContainerProperties for more information about the various properties that you can set.spring-doc.cadn.net.cn

从版本 2.1.1 开始,新增了一个名为 logContainerConfig 的属性。
当启用 trueINFO 日志记录时,每个监听器容器都会写入一条日志消息,总结其配置属性。spring-doc.cadn.net.cn

默认情况下,主题偏移量提交的记录级别为DEBUG。 从版本2.1.2开始,在ContainerProperties中有一个属性commitLogLevel可让您指定这些消息的日志级别。 例如,要将日志级别更改为INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);spring-doc.cadn.net.cn

从版本 2.2 开始,新增了一个名为 missingTopicsFatal 的容器属性(自 2.3.4 版本起默认为 false)。如果任何配置的主题在代理上不存在,则会阻止容器启动。如果容器配置为监听主题模式(正则表达式),则此设置不适用。
以前,容器线程会在 consumer.poll() 方法中循环等待主题出现,并记录大量日志消息。除了日志外,没有其他迹象表明存在问题。spring-doc.cadn.net.cn

从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval。当容器从 KafkaConsumer 获取到任何 AuthenticationExceptionAuthorizationException 后,此属性会导致容器重试获取消息。例如,配置的用户被拒绝访问读取某个主题或凭据不正确时就可能发生这种情况。定义 authExceptionRetryInterval 允许在授予适当权限后容器恢复。spring-doc.cadn.net.cn

默认情况下,未配置任何间隔时间 - 认证和授权错误被视为致命错误,这将导致容器停止。

从版本 2.8 开始,在创建消费者工厂时,如果您以对象形式提供反序列化器(在构造函数中或通过 setter 方法),则工厂将调用 configure() 方法使用配置属性来配置它们。spring-doc.cadn.net.cn

使用ConcurrentMessageListenerContainer

单个构造函数类似于KafkaListenerContainer构造函数。下列表示了构造函数的签名:spring-doc.cadn.net.cn

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个 concurrency 属性。 例如,container.setConcurrency(3) 创建了三个 KafkaMessageListenerContainer 实例。spring-doc.cadn.net.cn

如果容器属性配置为主题(或主题模式),Kafka 将使用其组管理功能在消费者之间分发分区。spring-doc.cadn.net.cn

当监听多个主题时,默认的分区分配可能不符合您的预期。 例如,如果您有三个主题,每个主题都有五个分区,并且您想使用 concurrency=15,则只会看到五个活动消费者,每个消费者从每个主题中被分配一个分区,其他10个消费者处于空闲状态。 这是因为默认Kafka ConsumerPartitionAssignorRangeAssignor(参见其Javadoc)。 对于这种场景,您可以考虑使用 RoundRobinAssignor,它会将分区跨所有消费者进行分布。 然后,每个消费者会被分配一个主题或分区。 要更改 ConsumerPartitionAssignor,可以在提供给 DefaultKafkaConsumerFactory 的属性中设置 partition.assignment.strategy 消费者属性 (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。spring-doc.cadn.net.cn

使用Spring Boot时,可以按如下方式设置策略:spring-doc.cadn.net.cn

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置为TopicPartitionOffset时,ConcurrentMessageListenerContainer会将TopicPartitionOffset实例分布到代理KafkaMessageListenerContainer实例上。spring-doc.cadn.net.cn

例如,如果提供了六个TopicPartitionOffset实例且concurrency3;每个容器获得两个分区。
对于五个TopicPartitionOffset实例,两个容器各获得两个分区,第三个容器获得一个分区。
如果concurrency大于TopicPartitions的数量,则调整concurrency使得每个容器获得一个分区。spring-doc.cadn.net.cn

如果设置了client.id属性,则会在其后面附加-n,其中n是与并发性相对应的消费者实例。当启用JMX时,需要此操作以提供唯一的MBean名称。

从版本 1.3 开始,MessageListenerContainer 提供了对底层 KafkaConsumer 的指标的访问。在 ConcurrentMessageListenerContainer 的情况下,metrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。Map<MetricName, ? extends Metric> 根据为底层 KafkaConsumer 提供的 client-id 对指标进行分组。spring-doc.cadn.net.cn

从版本2.3开始,ContainerProperties提供了一个idleBetweenPolls选项,允许监听器容器中的主循环在两次KafkaConsumer.poll()调用之间休眠。实际的睡眠间隔是从提供的选项和max.poll.interval.ms消费者配置与当前记录批处理时间之间的差值中选择最小值。spring-doc.cadn.net.cn

提交偏移量

提交偏移量有多种选择。
如果enable.auto.commit消费者属性为true,则Kafka会根据其配置自动提交偏移量。
如果是false,容器支持几种AckMode设置(将在下一列表中描述)。
默认的AckModeBATCH
从版本2.3开始,除非在配置中明确设置了框架,否则将enable.auto.commit设为false
以前,如果未设置该属性,则使用Kafka默认值(true)。spring-doc.cadn.net.cn

消费者 poll() 方法返回一个或多个 ConsumerRecords
对每个记录调用MessageListener
以下列表描述了容器针对每个 AckMode 执行的操作(当未使用事务时):
spring-doc.cadn.net.cn

  • RECORD: 记录监听器在处理完记录后返回时提交偏移量。spring-doc.cadn.net.cn

  • BATCH: 在处理完 poll() 返回的所有记录后提交偏移量。spring-doc.cadn.net.cn

  • TIME: 在处理完由 poll() 返回的所有记录时提交偏移量,只要自上次提交以来已超过 ackTimespring-doc.cadn.net.cn

  • COUNT: 确保自上次提交以来已收到 ackCount 条记录,并在由 poll() 返回的所有记录处理完成后提交偏移量。
    spring-doc.cadn.net.cn

  • COUNT_TIME: 类似于 TIMECOUNT,但如果任一条件为 true,则执行提交操作。spring-doc.cadn.net.cn

  • MANUAL: 消息监听器负责 acknowledge()Acknowledgment。在那之后,应用与BATCH相同的语义。spring-doc.cadn.net.cn

  • MANUAL_IMMEDIATE: 当监听器调用 Acknowledgment.acknowledge() 方法时,立即提交偏移量。spring-doc.cadn.net.cn

使用 事务时,偏移量会发送到事务中,并且语义等效于RECORDBATCH,具体取决于监听器类型(记录或批处理)。spring-doc.cadn.net.cn

MANUALMANUAL_IMMEDIATE 需要监听器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener 的实例。
请参阅消息监听器

根据 syncCommits 容器属性,使用消费者上的 commitSync()commitAsync() 方法。syncCommits 默认为 true;另请参阅 setSyncCommitTimeout。查看 setCommitCallback 获取异步提交的结果;默认回调是 LoggingCommitCallback,该回调会记录错误(在调试级别上记录成功)。spring-doc.cadn.net.cn

由于监听器容器具有自己的提交偏移量的机制,因此它更倾向于将 Kafka 的 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中明确设置,否则它会无条件地将其设为 false。spring-doc.cadn.net.cn

代码Acknowledgment具有以下方法:spring-doc.cadn.net.cn

public interface Acknowledgment {

    void acknowledge();

}

此方法使监听器能够控制何时提交偏移量。spring-doc.cadn.net.cn

从版本 2.3 开始,Acknowledgment 接口有两个额外的方法 nack(long sleep)nack(int index, long sleep)。第一个方法用于记录监听器,第二个方法用于批处理监听器。为您的监听器类型调用错误的方法将抛出一个 IllegalStateExceptionspring-doc.cadn.net.cn

如果您想提交一个部分批次,请使用 nack()。在使用事务时,设置AckModeMANUAL;调用nack()将成功处理的记录偏移量发送给事务。
nack()只能在调用监听器的消费者线程上调用。
nack() 在使用乱序提交时不允许。

使用记录监听器时,调用 nack() 会提交任何待处理的偏移量,丢弃上一次轮询中的剩余记录,并在其分区上执行查找操作,以便在下一次 poll() 中重新传递失败的记录和未处理的记录。可以通过设置 sleep 参数来暂停消费者在重新交付之前的运行。这与将容器配置为具有 DefaultErrorHandler 并抛出异常的功能类似。spring-doc.cadn.net.cn

nack() 在指定的休眠时间内暂停整个监听器,包括所有分配的分区。

使用批处理侦听器时,可以指定发生故障的批次中的索引。
当调用 nack() 时,将提交该索引之前记录的偏移量,并对失败和丢弃的记录所在的分区执行寻址操作,以便它们将在下一个poll()中重新交付。spring-doc.cadn.net.cn

有关更多信息,请参阅容器错误处理程序spring-doc.cadn.net.cn

消费者在睡眠期间被暂停,以便继续轮询代理以保持消费者的活跃状态。

spring-doc.cadn.net.cn

实际的睡眠时间及其分辨率取决于容器的pollTimeout,默认值为5秒。spring-doc.cadn.net.cn

最小睡眠时间等于pollTimeout,所有睡眠时间都是其倍数。spring-doc.cadn.net.cn

对于较小的睡眠时间或为了提高准确性,请考虑减少容器的pollTimeoutspring-doc.cadn.net.cn

从版本 3.0.10 开始,批处理监听器可以提交批次中部分记录的偏移量,使用 acknowledge(index) 作为 Acknowledgment 参数。
当调用此方法时,索引处(以及所有先前记录)的偏移量将被提交。
在执行了部分批次提交后,调用 acknowledge() 将提交剩余批次的偏移量。
以下限制适用:spring-doc.cadn.net.cn

这些限制会被强制执行,如果违反,该方法将抛出IllegalArgumentExceptionIllegalStateExceptionspring-doc.cadn.net.cn

监听器容器自动启动

监听器容器实现 SmartLifecycleautoStartup 默认为 true。 容器在较晚阶段启动(Integer.MAX-VALUE - 100)。 其他组件如果实现了 SmartLifecycle 来处理来自监听器的数据,则应在更早的阶段启动。 - 100 为后期阶段留出空间,以便组件可以在容器之后自动启动。spring-doc.cadn.net.cn