此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

消息侦听器容器

MessageListenerContainer提供实现:spring-doc.cadn.net.cn

KafkaMessageListenerContainer接收来自单个线程上所有主题或分区的所有消息。 这ConcurrentMessageListenerContainer一个或多个委托KafkaMessageListenerContainer实例来提供多线程消费。spring-doc.cadn.net.cn

从 2.2.7 版本开始,您可以添加RecordInterceptor到侦听器容器;在调用侦听器之前,将调用它,允许检查或修改记录。 如果拦截器返回 null,则不会调用侦听器。 从 2.7 版开始,它有额外的方法,这些方法在侦听器退出后调用(通常,或通过抛出异常)。 此外,从 2.7 版本开始,现在有一个BatchInterceptor,为批处理侦听器提供类似的功能。 此外,ConsumerAwareRecordInterceptor(和BatchInterceptor) 提供对Consumer<?, ?>. 例如,这可用于访问拦截器中的使用者指标。spring-doc.cadn.net.cn

您不应执行任何影响消费者在这些拦截器中的位置和/或提交偏移量的方法;容器需要管理此类信息。
如果拦截器更改记录(通过创建新记录),则topic,partitionoffset必须保持不变,以避免意外副作用,例如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor可用于调用多个拦截器。spring-doc.cadn.net.cn

从 4.0 版本开始,AbstractMessageListenerContainer公开getRecordInterceptor()作为公共方法。 如果返回的拦截器是CompositeRecordInterceptor附加RecordInterceptor即使在容器实例扩展之后,也可以向其中添加实例AbstractMessageListenerContainer已创建,并且RecordInterceptor已配置。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
    CompositeRecordInterceptor compositeInterceptor;

    RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
    if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
        compositeInterceptor = interceptor;
    } else {
        compositeInterceptor = new CompositeRecordInterceptor<>();
        container.setRecordInterceptor(compositeInterceptor);
    }

    if (previousInterceptor != null) {
        compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
    }

    RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
    RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};

    compositeInterceptor.addRecordInterceptor(recordInterceptor1);
    compositeInterceptor.addRecordInterceptor(recordInterceptor2);
}

默认情况下,从 2.8 版开始,在使用事务时,在事务开始之前调用拦截器。 您可以设置侦听器容器的interceptBeforeTx属性设置为false在事务开始后调用拦截器。 从 2.9 版开始,这将适用于任何事务管理器,而不仅仅是KafkaAwareTransactionManagers. 例如,这允许拦截器参与容器启动的 JDBC 事务。spring-doc.cadn.net.cn

从版本 2.3.8、2.4.6 开始,ConcurrentMessageListenerContainer现在支持并发大于 1 时的静态成员身份。 这group.instance.id后缀为-nn起价1. 这,加上增加的session.timeout.ms,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。spring-doc.cadn.net.cn

KafkaMessageListenerContainer

以下构造函数可用:spring-doc.cadn.net.cn

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它收到一个ConsumerFactory以及有关主题和分区以及其他配置的信息,在ContainerProperties对象。ContainerProperties具有以下构造函数:spring-doc.cadn.net.cn

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数采用TopicPartitionOffset参数来显式指示容器使用哪些分区(使用 consumerassign()方法),并具有可选的初始偏移量。 默认情况下,正值是绝对偏移量。 默认情况下,负值相对于分区内的当前最后一个偏移量。 的构造函数TopicPartitionOffset这需要额外的boolean参数。 如果这是true,初始偏移量(正或负)相对于此使用者的当前位置。 偏移量在容器启动时应用。 第二个采用主题数组,Kafka 根据group.idproperty — 在组中分配分区。 第三种使用正则表达式Pattern以选择主题。spring-doc.cadn.net.cn

要分配一个MessageListener到容器,您可以使用ContainerProps.setMessageListener方法。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,在创建DefaultKafkaConsumerFactory,使用仅采用上述属性的构造函数意味着键和值Deserializer类是从配置中选取的。 或者Deserializer实例可以传递给DefaultKafkaConsumerFactory键和/或值的构造函数,在这种情况下,所有使用者共享相同的实例。 另一种选择是提供Supplier<Deserializer>s(从 2.3 版开始),将用于获取单独的Deserializer每个实例Consumer:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请参阅 JavadocContainerProperties有关您可以设置的各种属性的更多信息。spring-doc.cadn.net.cn

从 2.1.1 版本开始,一个名为logContainerConfig可用。 什么时候trueINFO启用日志记录时,每个侦听器容器都会写入一条日志消息,汇总其配置属性。spring-doc.cadn.net.cn

默认情况下,主题偏移量提交的日志记录在DEBUG日志记录级别。 从 2.1.2 版开始,中的属性ContainerPropertiescommitLogLevel允许您指定这些消息的日志级别。 例如,要将日志级别更改为INFO,您可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.spring-doc.cadn.net.cn

从 2.2 版开始,一个名为missingTopicsFatal已添加(默认:false自 2.3.4 起)。 如果代理上不存在任何配置的主题,这可以防止容器启动。 如果容器配置为侦听主题模式 (regex),则不适用。 以前,容器线程在consumer.poll()方法等待主题出现,同时记录许多消息。 除了日志之外,没有迹象表明存在问题。spring-doc.cadn.net.cn

从 2.8 版本开始,新的容器属性authExceptionRetryInterval已被引入。 这会导致容器在获取任何消息后重试获取消息AuthenticationExceptionAuthorizationExceptionKafkaConsumer. 例如,当配置的用户被拒绝访问读取某个主题或凭据不正确时,就会发生这种情况。 定义authExceptionRetryInterval允许容器在授予适当的权限时恢复。spring-doc.cadn.net.cn

默认情况下,不配置任何间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。

从 2.8 版开始,在创建消费者工厂时,如果您将反序列化器作为对象提供(在构造函数中或通过 setter),工厂将调用configure()使用配置属性配置它们的方法。spring-doc.cadn.net.cn

ConcurrentMessageListenerContainer

单个构造函数类似于KafkaListenerContainer构造 函数。 以下列表显示了构造函数的签名:spring-doc.cadn.net.cn

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个concurrency财产。 例如container.setConcurrency(3)创建三个KafkaMessageListenerContainer实例。spring-doc.cadn.net.cn

如果为主题(或主题模式)配置了容器属性,则 Kafka 会使用其组管理功能在使用者之间分配分区。spring-doc.cadn.net.cn

当监听多个主题时,默认的分区分布可能不是你所期望的。 例如,如果您有三个主题,每个主题有五个分区,并且您想使用concurrency=15,则只看到 5 个活动使用者,每个使用者从每个主题分配一个分区,其他 10 个使用者处于空闲状态。 这是因为默认的 KafkaConsumerPartitionAssignorRangeAssignor(参见其 Javadoc)。 对于这种情况,您可能需要考虑使用RoundRobinAssignor相反,它将分区分布在所有使用者之间。 然后,为每个使用者分配一个主题或分区。 要更改ConsumerPartitionAssignor,您可以将partition.assignment.strategy消费者属性 (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) 在提供给DefaultKafkaConsumerFactory.spring-doc.cadn.net.cn

使用 Spring Boot 时,您可以分配设置策略,如下所示:spring-doc.cadn.net.cn

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置为TopicPartitionOffsets,则ConcurrentMessageListenerContainer分发TopicPartitionOffset跨委托的实例KafkaMessageListenerContainer实例。spring-doc.cadn.net.cn

如果,比如说,六个TopicPartitionOffset实例,并且concurrency3;每个容器都有两个分区。 五人份TopicPartitionOffset实例中,两个容器获得两个分区,第三个容器获得一个分区。 如果concurrency大于TopicPartitionsconcurrency向下调整,使每个容器都有一个分区。spring-doc.cadn.net.cn

client.id属性(如果设置)附加为-n哪里n是与并发相对应的使用者实例。 启用 JMX 时,这是为 MBean 提供唯一名称所必需的。

从 1.3 版本开始,MessageListenerContainer提供对基础指标的访问KafkaConsumer. 在以下情况下ConcurrentMessageListenerContainermetrics()方法返回所有目标的指标KafkaMessageListenerContainer实例。 这些指标被分组到Map<MetricName, ? extends Metric>通过client-id为标的KafkaConsumer.spring-doc.cadn.net.cn

从 2.3 版本开始,ContainerProperties提供idleBetweenPolls选项,让监听器容器中的主循环在KafkaConsumer.poll()调用。 从提供的选项中选择实际睡眠间隔作为最小值,并且max.poll.interval.ms消费者配置和当前记录的批处理时间。spring-doc.cadn.net.cn

提交偏移量

提供了几个用于提交偏移的选项。 如果enable.auto.commit消费者属性是true,Kafka 会根据其配置自动提交偏移量。 如果是false,容器支持多个AckMode设置(在下一个列表中描述)。 默认值AckModeBATCH. 从 2.3 版开始,框架集enable.auto.commitfalse除非在配置中显式设置。 以前,Kafka 默认值 (true如果未设置属性,则使用 )。spring-doc.cadn.net.cn

消费者poll()方法返回一个或多个ConsumerRecords. 这MessageListener为每条记录调用。 以下列表描述了容器对每个AckMode(当事务未被使用时):spring-doc.cadn.net.cn

使用事务时,偏移量被发送到事务,语义等同于RECORDBATCH,具体取决于侦听器类型(记录或批处理)。spring-doc.cadn.net.cn

MANUALMANUAL_IMMEDIATE要求监听器是AcknowledgingMessageListenerBatchAcknowledgingMessageListener. 请参阅消息侦听器

根据syncCommitscontainer 属性,则commitSync()commitAsync()方法。syncCommitstrue默认情况下;另请参阅setSyncCommitTimeout. 看setCommitCallback获取异步提交的结果;默认回调是LoggingCommitCallback它记录错误(并在调试级别成功)。spring-doc.cadn.net.cn

由于侦听器容器有自己的提交偏移量机制,因此它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG成为false. 从 2.3 版开始,它无条件地将其设置为 false,除非在消费者工厂中专门设置或容器的消费者属性覆盖。spring-doc.cadn.net.cn

Acknowledgment有以下方法:spring-doc.cadn.net.cn

public interface Acknowledgment {

    void acknowledge();

}

此方法使侦听器能够控制何时提交偏移量。spring-doc.cadn.net.cn

从 2.3 版本开始,Acknowledgment接口有两个附加方法nack(long sleep)nack(int index, long sleep). 第一个用于记录侦听器,第二个用于批处理侦听器。 为侦听器类型调用错误的方法将抛出IllegalStateException.spring-doc.cadn.net.cn

如果要提交部分批处理,请使用nack(),使用事务时,将AckModeMANUAL;调用nack()将成功处理的记录的偏移量发送到交易。
nack()只能在调用侦听器的使用者线程上调用。
nack()使用无序提交时不允许。

使用记录侦听器时,当nack()调用时,将提交任何挂起的偏移量,丢弃上次轮询中的剩余记录,并在其分区上执行搜索,以便在下一个分区重新传递失败的记录和未处理的记录poll(). 消费者可以在重新交付之前暂停,方法是将sleep论点。 这类似于在容器配置了DefaultErrorHandler.spring-doc.cadn.net.cn

nack()在指定的睡眠持续时间内暂停整个侦听器,包括所有分配的分区。

使用批处理侦听器时,可以在发生故障的批处理中指定索引。 什么时候nack()调用时,将在索引之前为记录提交偏移量,并在失败和丢弃的记录的分区上执行搜索,以便在下一个记录时重新传递它们poll().spring-doc.cadn.net.cn

有关更多信息,请参阅容器错误处理程序spring-doc.cadn.net.cn

消费者在睡眠期间暂停,以便我们继续轮询代理以保持消费者处于活动状态。 实际睡眠时间及其分辨率取决于容器的pollTimeout默认为 5 秒。 最短睡眠时间等于pollTimeout所有的睡眠时间都将是它的倍数。 对于较短的睡眠时间,或者为了提高其准确性,请考虑减少容器的pollTimeout.

从 3.0.10 版开始,批处理侦听器可以使用acknowledge(index)Acknowledgment论点。 调用此方法时,将提交索引处记录的偏移量(以及所有先前的记录)。 叫acknowledge()执行部分批处理提交后,将提交批处理其余部分的偏移量。 以下限制适用:spring-doc.cadn.net.cn

这些限制将被强制执行,该方法将抛出一个IllegalArgumentExceptionIllegalStateException,具体取决于违规情况。spring-doc.cadn.net.cn

侦听器容器自动启动

侦听器容器实现SmartLifecycleautoStartuptrue默认情况下。 容器在后期阶段启动 (Integer.MAX-VALUE - 100). 实现的其他组件SmartLifecycle,以处理来自侦听器的数据,应在较早的阶段启动。 这- 100为后续阶段留出了空间,使组件能够在容器之后自动启动。spring-doc.cadn.net.cn