此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

更改历史

自 3.2 以来 3.3 的新增功能

本节介绍从 3.2 版到 3.3 版所做的更改。 有关早期版本的更改,请参阅更改历史记录spring-doc.cadn.net.cn

DLT 主题命名约定

DLT 主题的命名约定已标准化,可始终使用“-dlt”后缀。此更改可确保兼容性并避免在不同重试解决方案之间转换时发生冲突。希望保留“.DLT“后缀行为需要通过设置适当的 DLT 名称属性来显式选择加入。spring-doc.cadn.net.cn

增强消费者群体的 Seek作

一种新方法getGroupId(),已添加到ConsumerSeekCallback接口。 此方法允许通过仅针对所需的使用者组来进行更具选择性的查找作。 这AbstractConsumerSeekAware现在还可以在多组侦听器场景中注册、检索和删除每个主题分区的所有回调,而不会遗漏任何回调。 请参阅新的 API (getSeekCallbacksFor(TopicPartition topicPartition),getTopicsAndCallbacks())了解更多详情。 有关更多详细信息,请参阅 Seek API 文档spring-doc.cadn.net.cn

使用 RecordFilterStrategy 在 Kafka 侦听器中可配置地处理空批次

RecordFilterStrategy现在支持忽略过滤产生的空批次。 这可以通过覆盖默认方法进行配置ignoreEmptyBatch(),默认为 false,确保KafkaListener即使所有ConsumerRecords被过滤掉。 有关详细信息,请参阅消息接收过滤文档spring-doc.cadn.net.cn

ConcurrentContainerStoppedEvent

ConcurentContainerMessageListenerContainer现在发出一个ConcurrentContainerStoppedEvent当其所有子容器都停止时。 有关更多详细信息,请参阅应用程序事件和ConcurrentContainerStoppedEventJavadocs。spring-doc.cadn.net.cn

原始记录密钥回复

使用时ReplyingKafkaTemplate,如果请求中的原始记录包含键,则同一键也将成为回复的一部分。 有关更多详细信息,请参阅参考文档的发送消息部分。spring-doc.cadn.net.cn

在 DeadLetterPublishingRecovererFactory 中自定义日志记录

使用时DeadLetterPublishingRecovererFactory,则用户应用程序可以覆盖maybeLogListenerException方法来自定义日志记录行为。spring-doc.cadn.net.cn

在 KafkaAdmin 中自定义管理客户端

扩展时KafkaAdmin,用户应用程序可能会覆盖createAdmin自定义管理员客户端创建的方法。spring-doc.cadn.net.cn

自定义 Kafka 流的实现

使用时KafkaStreamsCustomizer现在可以返回KafkaStreams对象通过覆盖initKafkaStreams方法。spring-doc.cadn.net.cn

批量侦听器的KafkaHeaders.DELIVERY_ATTEMPT

使用BatchListenerConsumerRecord可以有KafkaHeaders.DELIVERY_ATTMPTheader 在其 headers 字段中。 如果DeliveryAttemptAwareRetryListener设置为错误处理程序作为重试侦听器,每个ConsumerRecordhas delivery attempt 标头。 有关更多详细信息,请参阅 Kafka Headers for Batch Listenerspring-doc.cadn.net.cn

Kafka 指标侦听器和TaskScheduler

MicrometerProducerListener,MicrometerConsumerListenerKafkaStreamsMicrometerListener现在可以使用TaskScheduler. 看KafkaMetricsSupportJavaDocs 和 Micrometer 支持了解更多信息。spring-doc.cadn.net.cn

3.2 自 3.1 以来的新增功能

本节介绍从 3.1 版到 3.2 版所做的更改。 有关早期版本的更改,请参阅更改历史记录spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要 3.7.0kafka-clients. Kafka 客户端 3.7.0 版本引入了新的消费者组协议。 有关更多详细信息及其局限性,请参阅 KIP-848。 新的消费者组协议是抢先体验版本,不打算在生产中使用。 在此版本中仅建议用于测试目的。 因此,Spring for Apache Kafka 仅在kafka-client本身。 默认情况下,Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过group.protocol消费者的财产。spring-doc.cadn.net.cn

测试支持更改

kraft模式在EmbeddedKafka默认情况下,想要使用kraftmode 必须启用它。 这是由于在使用EmbeddedKafkakraft模式,尤其是在测试新的消费者组协议时。 新的使用者组协议仅在以下情况下受支持kraft模式,因此,在测试新协议时,需要针对真实的 Kafka 集群而不是基于KafkaClusterTestKitEmbeddedKafka是基于的。 此外,在运行多个KafkaListener方法EmbeddedKafkakraft模式。 在这些问题得到解决之前,kraft默认值EmbeddedKafka将保持为false.spring-doc.cadn.net.cn

Kafka Streams 交互式查询支持

新 APIKafkaStreamsInteractiveQuerySupport用于访问 Kafka Streams 交互式查询中使用的可查询存储。 有关更多详细信息,请参阅 Kafka Streams 交互式支持spring-doc.cadn.net.cn

TransactionId后缀策略

一个新的TransactionIdSuffixStrategy引入界面来管理transactional.id后缀。 默认实现是DefaultTransactionIdSuffixStrategy设置时maxCache大于零可以重用transactional.id在特定范围内,否则将通过递增计数器动态生成后缀。 有关详细信息,请参阅 Fixed TransactionIdSuffixspring-doc.cadn.net.cn

异步@KafkaListener返回

@KafkaListener(和@KafkaHandler) 方法现在可以返回异步返回类型包括CompletableFuture<?>,Mono<?>和 Kotlinsuspend功能。 有关详细信息,请参阅异步返回。spring-doc.cadn.net.cn

根据引发的异常将消息路由到自定义 DLT

现在可以根据消息处理期间引发的异常类型将消息重定向到自定义 DLT。 重定向规则通过RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules. 自定义 DLT 以及其他重试和死信主题是自动创建的。 有关详细信息,请参阅 根据引发的异常将消息路由到自定义 DLTspring-doc.cadn.net.cn

弃用 ContainerProperties transactionManager 属性

弃用transactionManager属性ContainerProperties赞成KafkaAwareTransactionManager,与一般相比,这是一种更窄的类型PlatformTransactionManager.请参阅 ContainerProperties事务同步spring-doc.cadn.net.cn

回滚处理后

一个新的AfterRollbackProcessor应用程序接口processBatch被提供。 有关详细信息,请参阅回滚后处理器spring-doc.cadn.net.cn

更改 @RetryableTopic SameIntervalTopicReuseStrategy 默认值

改变@RetryableTopic属性SameIntervalTopicReuseStrategy默认值为SINGLE_TOPIC. 有关 maxInterval 指数延迟,请参阅单个主题spring-doc.cadn.net.cn

非阻塞重试支持类级@KafkaListener

支持进程@RetryableTopic RetryTopicConfigurationProvider 中的类。

提供新的公共 API 来查找RetryTopicConfiguration. 请参阅查找 RetryTopicConfigurationspring-doc.cadn.net.cn

RetryTopicConfigurer 支持进程 MultiMethodKafkaListenerEndpoint。

RetryTopicConfigurer支持流程和注册MultiMethodKafkaListenerEndpoint. 这MultiMethodKafkaListenerEndpoint提供getter/setter对于属性defaultMethodmethods. 修改EndpointCustomizer严格来说MethodKafkaListenerEndpoint类型。 这EndpointHandlerMethod添加新的构造函数为提供的 bean 构造一个实例。提供新类EndpointHandlerMultiMethod处理用于重试端点的多方法。spring-doc.cadn.net.cn

新的 API 方法,用于根据用户提供的函数搜索偏移量

ConsumerCallback提供了一个新的 API 来根据用户定义的函数搜索偏移量,该函数将消费者中的当前偏移量作为参数。有关更多详细信息,请参阅 Seek API 文档spring-doc.cadn.net.cn

@PartitionOffset对 SeekPosition 的支持

添加seekPosition属性设置为@PartitionOffset支持TopicPartitionOffset.SeekPosition. 有关更多详细信息,请参阅手动分配spring-doc.cadn.net.cn

TopicPartitionOffset 中的新构造函数,接受一个函数来计算要搜索的偏移量

TopicPartitionOffset有一个新的构造函数,该构造函数采用用户提供的函数来计算要搜索的偏移量。 使用此构造函数时,框架使用当前使用者偏移位置的输入参数调用函数。 有关更多详细信息,请参阅 Seek API 文档spring-doc.cadn.net.cn

Spring Boot 应用程序名称作为默认客户端 ID 前缀

对于定义应用程序名称的 Spring Boot 应用程序,现在使用此名称 作为某些客户端类型的自动生成客户端 ID 的默认前缀。 有关更多详细信息,请参阅默认客户端 ID 前缀。spring-doc.cadn.net.cn

增强了 MessageListenerContainers 的检索

ListenerContainerRegistry提供了两个新的 API 动态查找和过滤MessageListenerContainer实例。getListenerContainersMatching(Predicate<String> idMatcher)按 ID 过滤,另一个是getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)按 ID 和容器属性进行筛选。spring-doc.cadn.net.cn

通过提供更多跟踪标签来增强观察

KafkaTemplateObservation提供更多的跟踪标签(低基数)。KafkaListenerObservation提供了一个新的 API 来查找高基数键名称和更多跟踪标签(高基数或低基数)。 参见千分尺观察spring-doc.cadn.net.cn

自 3.0 以来 3.1 的新增功能

本节介绍从 3.0 版到 3.1 版所做的更改。 有关早期版本的更改,请参阅更改历史记录spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要 3.6.0kafka-clients.spring-doc.cadn.net.cn

嵌入式KafkaBroker

现在提供了一个额外的实现来使用Kraft而不是 Zookeeper。 有关更多信息,请参阅嵌入式 Kafka 代理spring-doc.cadn.net.cn

Json反序列化器

当发生反序列化异常时,SerializationExceptionmessage 不再包含表单Can’t deserialize data [[123, 34, 98, 97, 122, …​;每个数据字节的数值数组没有用处,对于大数据可能会很冗长。 当与ErrorHandlingDeserializerDeserializationException发送到错误处理程序包含data包含无法反序列化的原始数据的属性。 不与ErrorHandlingDeserializerKafkaConsumer将不断为显示主题/分区/偏移量和 Jackson 抛出的原因的同一记录发出异常。spring-doc.cadn.net.cn

容器后处理器

可以通过指定ContainerPostProcessor@KafkaListener注解。 这发生在创建容器后以及任何配置的ContainerCustomizer在容器工厂上配置。 有关更多信息,请参阅容器工厂spring-doc.cadn.net.cn

ErrorHandlingDeserializer

您现在可以添加Validator到这个解串器;如果委托Deserializer成功反序列化对象,但该对象未通过验证,则引发异常,类似于发生反序列化异常。 这允许将原始原始数据传递给错误处理程序。 看ErrorHandlingDeserializer了解更多信息。spring-doc.cadn.net.cn

可重试主题

更改后缀-retry-5000-retry什么时候@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC). 如果要保留后缀-retry-5000@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2"). 有关更多信息,请参阅主题命名spring-doc.cadn.net.cn

侦听器容器更改

手动分配分区时,使用null消费者group.idAckMode现在会自动强制MANUAL. 有关详细信息,请参阅手动分配所有分区spring-doc.cadn.net.cn

自 2.9 以来 3.0 的新增功能

Kafka 客户端版本

此版本需要 3.3.1kafka-clients.spring-doc.cadn.net.cn

恰好一次语义

EOSMode.V1(又名ALPHA) 不再受支持。spring-doc.cadn.net.cn

使用事务时,最低代理版本为 2.5。

有关详细信息,请参阅 Exactly Once 语义KIP-447spring-doc.cadn.net.cn

观察

现在支持使用微分尺启用定时器和跟踪的观察。 有关更多信息,请参阅观察spring-doc.cadn.net.cn

原生图像

支持创建本机映像。 有关详细信息,请参阅本机映像spring-doc.cadn.net.cn

全局单嵌入 Kafka

嵌入式 Kafka (EmbeddedKafkaBroker) 现在可以作为整个测试计划的单个全局实例启动。 有关更多信息,请参阅对多个测试类使用相同的代理spring-doc.cadn.net.cn

可重试主题更改

此功能不再被视为实验性功能(就其 API 而言),该功能本身从 2.7 开始就得到了支持,但破坏 API 更改的可能性比正常情况更大。spring-doc.cadn.net.cn

在此版本中,Non-Blocking Retries 基础架构 Bean 的引导已更改,以避免某些应用程序中出现的有关应用程序初始化的一些计时问题。spring-doc.cadn.net.cn

您现在可以设置不同的concurrency对于重试容器;默认情况下,并发与主容器相同。spring-doc.cadn.net.cn

@RetryableTopic现在可以用作自定义注释的元注释,包括对@AliasFor性能。spring-doc.cadn.net.cn

有关详细信息,请参阅配置spring-doc.cadn.net.cn

重试主题的默认复制因子现在是-1(使用代理默认值)。 如果您的代理版本低于 2.4 版,您现在需要显式设置该属性。spring-doc.cadn.net.cn

您现在可以配置多个@RetryableTopic同一应用程序上下文中同一主题的侦听器。 以前,这是不可能的。 有关更多信息,请参阅多个侦听器,同一主题spring-doc.cadn.net.cn

有重大 API 更改RetryTopicConfigurationSupport;具体来说,如果您覆盖destinationTopicResolver,kafkaConsumerBackoffManager和/或retryTopicConfigurer; 这些方法现在需要ObjectProvider<RetryTopicComponentFactory>参数。spring-doc.cadn.net.cn

侦听器容器更改

与使用者身份验证和授权失败相关的事件现在由容器发布。 有关详细信息,请参阅应用程序事件。spring-doc.cadn.net.cn

现在,您可以自定义使用者线程使用的线程名称。 有关更多信息,请参阅容器线程命名spring-doc.cadn.net.cn

容器属性restartAfterAuthException已被添加。 有关更多信息,请参阅侦听器容器属性spring-doc.cadn.net.cn

KafkaTemplate变化

该类返回的合约现在是CompletableFutures 而不是ListenableFutures. 看KafkaTemplate.spring-doc.cadn.net.cn

ReplyingKafkaTemplate变化

该类返回的合约现在是CompletableFutures 而不是ListenableFutures. 看ReplyingKafkaTemplate请求/回复Message<?>s.spring-doc.cadn.net.cn

@KafkaListener变化

现在可以使用自定义关联标头,该标头将在任何回复消息中回显。 请参阅末尾的注释ReplyingKafkaTemplate了解更多信息。spring-doc.cadn.net.cn

现在,您可以在处理整个批次之前手动提交批次的各个部分。 有关更多信息,请参阅提交偏移量。spring-doc.cadn.net.cn

KafkaHeaders变化

四个常量KafkaHeaders在 2.9.x 中被弃用的现已删除。spring-doc.cadn.net.cn

同样地RECEIVED_MESSAGE_KEY替换为RECEIVED_KEYRECEIVED_PARTITION_ID替换为RECEIVED_PARTITION.spring-doc.cadn.net.cn

测试更改

3.0.7 版引入了MockConsumerFactoryMockProducerFactory. 有关更多信息,请参阅模拟消费者和生产者spring-doc.cadn.net.cn

从版本 3.0.10 开始,默认情况下,嵌入式 Kafka 代理设置 Spring Boot 属性spring.kafka.bootstrap-servers到嵌入式代理的地址。spring-doc.cadn.net.cn

自 2.8 以来 2.9 的新增功能

Kafka 客户端版本

此版本需要 3.2.0kafka-clients.spring-doc.cadn.net.cn

错误处理程序更改

DefaultErrorHandler现在可以配置为暂停容器进行一次轮询,并使用上一次轮询的剩余结果,而不是查找剩余记录的偏移量。 有关详细信息,请参阅 DefaultErrorHandlerspring-doc.cadn.net.cn

DefaultErrorHandler现在有一个BackOffHandler财产。 有关更多信息,请参阅退后处理程序spring-doc.cadn.net.cn

侦听器容器更改

interceptBeforeTx现在适用于所有事务管理器(以前它仅在KafkaAwareTransactionManager被使用)。 请参阅 [interceptBeforeTx]。spring-doc.cadn.net.cn

新的容器属性pauseImmediate,允许容器在处理当前记录后暂停使用者,而不是在处理完上一次轮询中的所有记录后暂停使用者。 请参阅 [pauseImmediate]。spring-doc.cadn.net.cn

与消费者身份验证和授权相关的事件spring-doc.cadn.net.cn

标头映射器更改

您现在可以配置应映射哪些入站标头。 也可在 2.8.8 或更高版本中使用。 有关详细信息,请参阅消息头spring-doc.cadn.net.cn

KafkaTemplate变化

在 3.0 中,该类返回的 future 将为CompletableFutures 而不是ListenableFutures. 看KafkaTemplate以获取使用此版本时过渡的帮助。spring-doc.cadn.net.cn

ReplyingKafkaTemplate变化

该模板现在提供了一种方法来等待回复容器上的分配,以避免在初始化回复容器之前发送请求时出现争用。 也可在 2.8.8 或更高版本中使用。 看ReplyingKafkaTemplate.spring-doc.cadn.net.cn

在 3.0 中,该类返回的 future 将为CompletableFutures 而不是ListenableFutures. 看ReplyingKafkaTemplate请求/回复Message<?>s以获取使用此版本时过渡的帮助。spring-doc.cadn.net.cn

2.8 自 2.7 以来的新增功能

本节介绍从 2.7 版到 2.8 版所做的更改。 有关早期版本的更改,请参阅更改历史记录spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要 3.0.0kafka-clientsspring-doc.cadn.net.cn

包更改

与类型映射相关的类和接口已从…​support.converter…​support.mapping.spring-doc.cadn.net.cn

无序手动提交

侦听器容器现在可以配置为接受无序的手动偏移提交(通常是异步的)。 容器将延迟提交,直到确认丢失的偏移量。 有关详细信息,请参阅手动提交偏移量spring-doc.cadn.net.cn

@KafkaListener变化

现在可以指定侦听器方法是否是方法本身的批处理侦听器。 这允许将同一个容器工厂用于记录和批处理侦听器。spring-doc.cadn.net.cn

有关更多信息,请参阅 [batch-listeners]。spring-doc.cadn.net.cn

批处理侦听器现在可以处理转换异常。spring-doc.cadn.net.cn

RecordFilterStrategy,当与批处理侦听器一起使用时,现在可以在一次调用中筛选整个批处理。 有关更多信息,请参阅 [batch-listeners] 末尾的注释。spring-doc.cadn.net.cn

@KafkaListener注释现在具有filter属性,以覆盖容器工厂的RecordFilterStrategy只为这个听众。spring-doc.cadn.net.cn

@KafkaListener注释现在具有info属性;这用于填充新的侦听器容器属性listenerInfo. 然后,它用于填充KafkaHeaders.LISTENER_INFO每个记录中的标头,可用于RecordInterceptor,RecordFilterStrategy,或听者本身。 有关更多信息,请参阅 Listener Info HeaderAbstractMessageListenerContainer 属性spring-doc.cadn.net.cn

KafkaTemplate变化

您现在可以接收一条记录,给定主题、分区和偏移量。 看KafkaTemplate接收了解更多信息。spring-doc.cadn.net.cn

CommonErrorHandler添加

遗产GenericErrorHandler及其用于记录批处理侦听器的子接口层次结构已被新的单个接口所取代CommonErrorHandler与大多数旧版实现相对应的实现GenericErrorHandler. 请参阅容器错误处理程序将自定义旧版错误处理程序实现迁移到CommonErrorHandler了解更多信息。spring-doc.cadn.net.cn

侦听器容器更改

interceptBeforeTxcontainer 属性现在是true默认情况下。spring-doc.cadn.net.cn

authorizationExceptionRetryInterval属性已重命名为authExceptionRetryInterval现在适用于AuthenticationExceptions 除了AuthorizationExceptions 之前。 这两个异常都被视为致命异常,除非设置了此属性,否则容器将默认停止。spring-doc.cadn.net.cn

序列化器/解串化器更改

DelegatingByTopicSerializerDelegatingByTopicDeserializer现在提供了。 有关详细信息,请参阅委托序列化器和反序列化器spring-doc.cadn.net.cn

DeadLetterPublishingRecover变化

该物业stripPreviousExceptionHeaders现在true默认情况下。spring-doc.cadn.net.cn

现在有几种技术可以自定义将哪些标头添加到输出记录中。spring-doc.cadn.net.cn

有关详细信息,请参阅管理死信记录标题spring-doc.cadn.net.cn

可重试主题更改

现在,可以将同一工厂用于可重试和不可重试的主题。 有关更多信息,请参阅指定 ListenerContainerFactoryspring-doc.cadn.net.cn

现在有一个可管理的致命异常全球列表,这些异常将使失败的记录直接进入 DLT。 请参阅异常分类器以了解如何管理它。spring-doc.cadn.net.cn

您现在可以结合使用阻止和非阻止重试。 有关详细信息,请参阅组合阻塞和非阻塞重试。spring-doc.cadn.net.cn

使用可重试主题功能时抛出的 KafkaBackOffException 现在记录在 DEBUG 级别。 如果您需要将日志记录级别更改回 WARN 或将其设置为任何其他级别,请参阅更改 KafkaBackOffException 日志记录级别spring-doc.cadn.net.cn

2.6 和 2.7 之间的更改

Kafka 客户端版本

此版本需要 2.7.0kafka-clients. 从版本 2.7.1 开始,它还与 2.8.0 客户端兼容;请参阅覆盖 Spring Boot 依赖项spring-doc.cadn.net.cn

使用主题的非阻塞延迟重试

此版本中添加了这一重要的新功能。当严格排序不重要时,可以将失败的投放发送到另一个主题以供以后使用。可以配置一系列此类重试主题,但延迟会增加。请参阅 非阻塞重试 有关更多信息。spring-doc.cadn.net.cn

侦听器容器更改

onlyLogRecordMetadatacontainer 属性现在是true默认情况下。spring-doc.cadn.net.cn

新的容器属性stopImmediate现已推出。spring-doc.cadn.net.cn

有关更多信息,请参阅侦听器容器属性spring-doc.cadn.net.cn

使用BackOff在投递尝试之间(例如SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor) 现在将在容器停止后不久退出回退间隔,而不是延迟停止。spring-doc.cadn.net.cn

错误处理程序和扩展的回滚处理器FailedRecordProcessor现在可以配置一个或多个RetryListener以接收有关重试和恢复进度的信息。spring-doc.cadn.net.cn

RecordInterceptor现在在侦听器返回后调用了其他方法(通常,或通过抛出异常)。 它还有一个子界面ConsumerAwareRecordInterceptor. 此外,现在还有一个BatchInterceptor用于批量侦听器。 有关详细信息,请参阅消息侦听器容器spring-doc.cadn.net.cn

@KafkaListener变化

您现在可以验证@KafkaHandler方法(类级侦听器)。 看@KafkaListener @Payload验证了解更多信息。spring-doc.cadn.net.cn

现在,您可以将rawRecordHeader属性MessagingMessageConverterBatchMessagingMessageConverter这导致原始的ConsumerRecord添加到转换后的Message<?>. 例如,如果您希望使用DeadLetterPublishingRecoverer在侦听器错误处理程序中。 有关更多信息,请参阅侦听器错误处理程序spring-doc.cadn.net.cn

您现在可以修改@KafkaListener应用程序初始化期间的注释。 看@KafkaListener属性修改了解更多信息。spring-doc.cadn.net.cn

DeadLetterPublishingRecover变化

现在,如果键和值都失败反序列化,则原始值将发布到 DLT。 以前,已填充值,但键DeserializationException留在标题中。 如果您对恢复器进行了子类化并覆盖了createProducerRecord方法。spring-doc.cadn.net.cn

此外,恢复程序在发布到目标解析程序之前验证目标解析器选择的分区是否实际存在。spring-doc.cadn.net.cn

有关详细信息,请参阅发布死信记录spring-doc.cadn.net.cn

ChainedKafkaTransactionManager已弃用

有关详细信息,请参阅事务。spring-doc.cadn.net.cn

ReplyingKafkaTemplate变化

现在有一种机制可以检查回复,如果存在某些条件,则未来异常失败。spring-doc.cadn.net.cn

支持发送和接收spring-messaging Message<?>s 已被添加。spring-doc.cadn.net.cn

Kafka 流更改

默认情况下,StreamsBuilderFactoryBean现在配置为不清理本地状态。 有关详细信息,请参阅配置spring-doc.cadn.net.cn

KafkaAdmin变化

新方法createOrModifyTopicsdescribeTopics已被添加。KafkaAdmin.NewTopics已添加以方便在单个 Bean 中配置多个主题。有关更多信息,请参阅 [configuring-topics]。spring-doc.cadn.net.cn

MessageConverter变化

现在可以添加一个spring-messaging SmartMessageConverterMessagingMessageConverter,允许基于contentType页眉。 有关更多信息,请参阅 Spring Messaging Message Conversionspring-doc.cadn.net.cn

测 序@KafkaListeners

ExponentialBackOffWithMaxRetries

一个新的BackOff提供了实现,可以更方便地配置最大重试次数。 看ExponentialBackOffWithMaxRetries实现了解更多信息。spring-doc.cadn.net.cn

条件委托错误处理程序

这些新的错误处理程序可以配置为委托给不同的错误处理程序,具体取决于异常类型。 有关详细信息,请参阅委派错误处理程序。spring-doc.cadn.net.cn

2.5 和 2.6 之间的更改

Kafka 客户端版本

此版本需要 2.6.0kafka-clients.spring-doc.cadn.net.cn

侦听器容器更改

默认值EOSMode现在BETA. 有关详细信息,请参阅 Exactly Once 语义spring-doc.cadn.net.cn

各种错误处理程序(扩展FailedRecordProcessor) 和DefaultAfterRollbackProcessor现在重置BackOff如果恢复失败。 此外,您现在可以选择BackOff根据失败的记录和/或异常使用。spring-doc.cadn.net.cn

您现在可以配置adviceChain在容器属性中。 有关更多信息,请参阅侦听器容器属性spring-doc.cadn.net.cn

将容器配置为发布时ListenerContainerIdleEvents,它现在发布一个ListenerContainerNoLongerIdleEvent发布空闲事件后收到记录时。 有关详细信息,请参阅应用程序事件检测空闲和无响应使用者spring-doc.cadn.net.cn

@KafkaListener变化

使用手动分区分配时,您现在可以指定一个通配符来确定哪些分区应重置为初始偏移量。此外,如果侦听器实现ConsumerSeekAware,onPartitionsAssigned()在手动分配之后调用。(在版本 2.5.5 中也添加了)。有关更多信息,请参阅显式分区分配spring-doc.cadn.net.cn

新增便捷方法AbstractConsumerSeekAware使寻找更容易。 有关更多信息,请参阅 [seek]。spring-doc.cadn.net.cn

ErrorHandler 更改

的子类FailedRecordProcessor(例如SeekToCurrentErrorHandler,DefaultAfterRollbackProcessor,RecoveringBatchErrorHandler) 现在可以配置为重置重试状态,如果异常类型与此记录之前发生的异常类型不同。spring-doc.cadn.net.cn

生产者工厂变更

您现在可以为生产者设置最大年龄,之后它们将被关闭并重新创建。 有关详细信息,请参阅事务。spring-doc.cadn.net.cn

您现在可以在DefaultKafkaProducerFactory已被创建。 例如,如果必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。 看DefaultKafkaProducerFactory了解更多信息。spring-doc.cadn.net.cn

2.4 和 2.5 之间的更改

本节介绍从 2.4 版到 2.5 版所做的更改。 有关早期版本的更改,请参阅更改历史记录spring-doc.cadn.net.cn

消费者/生产者工厂变更

默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。 提供了原生 Micrometer 指标的实现。 有关更多信息,请参阅工厂监听器。spring-doc.cadn.net.cn

现在,您可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 集群。 有关更多信息,请参阅连接到 Kafkaspring-doc.cadn.net.cn

StreamsBuilderFactoryBean变化

工厂 bean 现在可以在KafkaStreams创建或销毁。 提供了原生千分尺度量的实现。 有关更多信息,请参阅 KafkaStreams Micrometer 支持spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要 2.5.0kafka-clients.spring-doc.cadn.net.cn

类/包更改

SeekUtils已从o.s.k.supportpackage 到o.s.k.listener.spring-doc.cadn.net.cn

Delivery Attempts 标头

现在有一个选项可以添加一个标头,用于在使用某些错误处理程序和回滚处理器后跟踪传递尝试。 有关更多信息,请参阅 Delivery Attempts Headerspring-doc.cadn.net.cn

@KafkaListener变化

如果需要,现在将自动填充默认回复标头,当@KafkaListener返回类型为Message<?>. 有关详细信息,请参阅回复类型消息<?>spring-doc.cadn.net.cn

KafkaHeaders.RECEIVED_MESSAGE_KEY不再填充null当传入记录具有null钥匙;标题被完全省略。spring-doc.cadn.net.cn

@KafkaListener方法现在可以指定一个ConsumerRecordMetadata参数,而不是对元数据(如主题、分区等)使用离散标头。 有关更多信息,请参阅消费者记录元数据spring-doc.cadn.net.cn

侦听器容器更改

assignmentCommitOptioncontainer 属性现在是LATEST_ONLY_NO_TX默认情况下。 有关更多信息,请参阅侦听器容器属性spring-doc.cadn.net.cn

subBatchPerPartitioncontainer 属性现在是true默认情况下,使用事务时。 有关详细信息,请参阅事务。spring-doc.cadn.net.cn

一个新的RecoveringBatchErrorHandler现在提供了。spring-doc.cadn.net.cn

现在支持静态组成员身份。 有关详细信息,请参阅消息侦听器容器spring-doc.cadn.net.cn

配置增量/协作重新平衡时,如果偏移量未能提交,则非致命RebalanceInProgressException,则容器将尝试在重新平衡完成后重新提交仍分配给此实例的分区的偏移量。spring-doc.cadn.net.cn

默认错误处理程序现在是SeekToCurrentErrorHandler用于唱片听众和RecoveringBatchErrorHandler用于批量侦听器。 有关更多信息,请参阅容器错误处理程序spring-doc.cadn.net.cn

现在,您可以控制记录标准错误处理程序故意抛出的异常的级别。 有关更多信息,请参阅容器错误处理程序spring-doc.cadn.net.cn

getAssignmentsByClientId()方法,可以更轻松地确定并发容器中的哪些使用者被分配了哪些分区。 有关更多信息,请参阅侦听器容器属性spring-doc.cadn.net.cn

您现在可以禁止整个日志记录ConsumerRecords 出错,调试日志等。 看onlyLogRecordMetadata侦听器容器属性中spring-doc.cadn.net.cn

KafkaTemplate 更改

KafkaTemplate现在可以维护千分尺计时器。有关更多信息,请参阅监视spring-doc.cadn.net.cn

KafkaTemplate现在可以配置ProducerConfig属性来覆盖生产者工厂中的属性。 看KafkaTemplate了解更多信息。spring-doc.cadn.net.cn

一个RoutingKafkaTemplate现已提供。 看RoutingKafkaTemplate了解更多信息。spring-doc.cadn.net.cn

您现在可以使用KafkaSendCallback而不是ListenerFutureCallback以获得更窄的异常,从而更容易提取失败的异常ProducerRecord. 看KafkaTemplate了解更多信息。spring-doc.cadn.net.cn

Kafka 字符串序列化器/反序列化器

新增功能ToStringSerializer/StringDeserializers 以及关联的SerDe现在提供了。 有关详细信息,请参阅字符串序列化spring-doc.cadn.net.cn

Json反序列化器

JsonDeserializer现在可以更灵活地确定反序列化类型。 有关详细信息,请参阅使用方法确定类型spring-doc.cadn.net.cn

委托序列化器/解串化器

DelegatingSerializer现在可以处理“标准”类型,当出站记录没有标头时。 有关详细信息,请参阅委托序列化器和反序列化器spring-doc.cadn.net.cn

测试更改

KafkaTestUtils.consumerProps()辅助记录现在设置ConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliest默认情况下。 有关更多信息,请参阅 JUnitspring-doc.cadn.net.cn

2.3 和 2.4 之间的更改

Kafka 客户端版本

此版本需要 2.4.0kafka-clients或更高,并支持新的增量再平衡功能。spring-doc.cadn.net.cn

ConsumerAwareRebalanceListener

喜欢ConsumerRebalanceListener,这个接口现在有一个额外的方法onPartitionsLost. 有关更多信息,请参阅 Apache Kafka 文档。spring-doc.cadn.net.cn

ConsumerRebalanceListener,默认实现不会调用onPartitionsRevoked. 相反,侦听器容器将在调用onPartitionsLost;因此,在实现时不应执行相同的作ConsumerAwareRebalanceListener.spring-doc.cadn.net.cn

有关更多信息,请参阅重新平衡侦听器末尾的重要说明。spring-doc.cadn.net.cn

通用错误处理程序

isAckAfterHandle()默认实现现在默认返回 true。spring-doc.cadn.net.cn

卡夫卡模板

KafkaTemplate现在支持非事务发布和事务发布。 看KafkaTemplate事务性和非事务性发布了解更多信息。spring-doc.cadn.net.cn

聚合回复卡夫卡模板

releaseStrategy现在是一个BiConsumer. 现在在超时后(以及记录到达时)调用它;第二个参数是true在超时后调用的情况下。spring-doc.cadn.net.cn

有关详细信息,请参阅聚合多个回复spring-doc.cadn.net.cn

侦听器容器

ContainerProperties提供authorizationExceptionRetryInterval选项,让侦听器容器在任何AuthorizationException是由KafkaConsumer. 请参阅其 JavaDocs 和KafkaMessageListenerContainer了解更多信息。spring-doc.cadn.net.cn

@KafkaListener

@KafkaListener注释具有新属性splitIterables;默认为 true。 当回复侦听器返回Iterable此属性控制返回结果是作为单个记录发送还是发送每个元素的记录。 看转发侦听器结果@SendTo更多信息spring-doc.cadn.net.cn

批处理侦听器现在可以使用BatchToRecordAdapter;例如,这允许在事务中处理批处理,而侦听器一次获取一条记录。 在默认实现中,一个ConsumerRecordRecoverer可用于处理批处理中的错误,而无需停止整个批处理 - 这在使用事务时可能很有用。 有关更多信息,请参阅使用批处理侦听器的事务spring-doc.cadn.net.cn

Kafka 流

StreamsBuilderFactoryBean接受新属性KafkaStreamsInfrastructureCustomizer. 这允许在创建流之前配置构建器和/或拓扑。 有关更多信息,请参阅 Spring Managementspring-doc.cadn.net.cn

2.2 和 2.3 之间的更改

本节介绍从 2.2 版到 2.3 版所做的更改。spring-doc.cadn.net.cn

提示、技巧和示例

添加了一个新章节提示、技巧和示例。 请提交 GitHub 问题和/或拉取请求以获取该章中的其他条目。spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要 2.3.0kafka-clients或更高。spring-doc.cadn.net.cn

类/包更改

TopicPartitionInitialOffset被弃用,取而代之的是TopicPartitionOffset.spring-doc.cadn.net.cn

配置更改

从 2.3.4 版本开始,missingTopicsFatalcontainer 属性默认为 false。 当这种情况为 true 时,如果代理关闭,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见的用例。spring-doc.cadn.net.cn

生产者和消费者工厂的变化

DefaultKafkaProducerFactory现在可以配置为为每个线程创建一个生产者。 您还可以提供Supplier<Serializer>实例作为配置类(需要无参数构造函数)的替代方法,或使用Serializer实例,然后在所有生产者之间共享。 看DefaultKafkaProducerFactory了解更多信息。spring-doc.cadn.net.cn

同样的选项可用于Supplier<Deserializer>实例中的实例DefaultKafkaConsumerFactory. 看KafkaMessageListenerContainer了解更多信息。spring-doc.cadn.net.cn

侦听器容器更改

以前,错误处理程序收到ListenerExecutionFailedException(实际侦听器例外为cause) 当使用侦听器适配器(例如@KafkaListeners). 本机引发的异常GenericMessageListeners 被原封不动地传递给错误处理程序。 现在一个ListenerExecutionFailedException始终是参数(实际侦听器异常为cause),它提供对容器的group.id财产。spring-doc.cadn.net.cn

因为侦听器容器有自己的提交偏移量机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG成为false. 现在,它会自动将其设置为 false,除非在使用者工厂中特别设置或容器的使用者属性覆盖。spring-doc.cadn.net.cn

ackOnError属性现在是false默认情况下。spring-doc.cadn.net.cn

现在可以获得消费者的group.idlistener 方法中的属性。 看获取消费者group.id了解更多信息。spring-doc.cadn.net.cn

容器具有新属性recordInterceptor允许在调用侦听器之前检查或修改记录。 一个CompositeRecordInterceptor还提供了,以防您需要调用多个拦截器。 有关详细信息,请参阅消息侦听器容器spring-doc.cadn.net.cn

ConsumerSeekAware具有新方法,允许您相对于开始、结束或当前位置执行搜索,并搜索到大于或等于时间戳的第一个偏移量。 有关更多信息,请参阅 [seek]。spring-doc.cadn.net.cn

便利班AbstractConsumerSeekAware现在提供以简化搜索。 有关更多信息,请参阅 [seek]。spring-doc.cadn.net.cn

ContainerProperties提供idleBetweenPolls选项,让监听器容器中的主循环在KafkaConsumer.poll()调用。 请参阅其 JavaDocs 和KafkaMessageListenerContainer了解更多信息。spring-doc.cadn.net.cn

使用时AckMode.MANUAL(或MANUAL_IMMEDIATE),您现在可以通过调用nackAcknowledgment. 有关更多信息,请参阅提交偏移量。spring-doc.cadn.net.cn

现在可以使用 Micrometer 监控监听器性能Timers. 有关详细信息,请参阅监视spring-doc.cadn.net.cn

容器现在发布与启动相关的其他使用者生命周期事件。 有关详细信息,请参阅应用程序事件。spring-doc.cadn.net.cn

事务性批处理侦听器现在可以支持僵尸隔离。有关更多信息,请参阅事务spring-doc.cadn.net.cn

侦听器容器工厂现在可以配置ContainerCustomizer在创建和配置每个容器后进一步配置每个容器。有关更多信息,请参阅容器工厂spring-doc.cadn.net.cn

ErrorHandler 更改

SeekToCurrentErrorHandler现在将某些异常视为致命异常,并禁用这些异常的重试,在第一次失败时调用恢复器。spring-doc.cadn.net.cn

SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandler现在可以配置为应用BackOff(线程睡眠)。spring-doc.cadn.net.cn

从版本 2.3.2 开始,当错误处理程序在恢复失败的记录后返回时,将提交恢复记录的偏移量。spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer,当与ErrorHandlingDeserializer,现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。 以前,它是null以及提取DeserializationException从邮件头。 有关详细信息,请参阅发布死信记录spring-doc.cadn.net.cn

主题生成器

新职业TopicBuilder提供更方便的创建NewTopic @Beans 用于自动主题配置。 有关更多信息,请参阅 [configuring-topics]。spring-doc.cadn.net.cn

Kafka 流更改

您现在可以对StreamsBuilderFactoryBean创建者@EnableKafkaStreams. 有关更多信息,请参阅 Streams 配置spring-doc.cadn.net.cn

一个RecoveringDeserializationExceptionHandler现在提供了允许恢复具有反序列化错误的记录。 它可以与DeadLetterPublishingRecoverer将这些记录发送到死信主题。 有关详细信息,请参阅从反序列化异常中恢复spring-doc.cadn.net.cn

HeaderEnricher提供了 transformer,使用 SpEL 生成标头值。 有关详细信息,请参阅标头扩充器spring-doc.cadn.net.cn

MessagingTransformer已提供。 这允许 Kafka 流拓扑与 spring-messaging 组件(例如 Spring Integration 流)交互。 看MessagingProcessor和 SeeKStream了解更多信息。spring-doc.cadn.net.cn

JSON 组件更改

现在,所有 JSON 感知组件默认配置为 JacksonObjectMapperJacksonUtils.enhancedObjectMapper(). 这JsonDeserializer现在提供TypeReference基于构造函数,以便更好地处理目标通用容器类型。 还有一个JacksonMimeTypeModule已引入序列化org.springframework.util.MimeType到纯字符串。有关更多信息,请参阅其 JavaDocs 和 Serialization, Deserialization, and Message Conversionspring-doc.cadn.net.cn

一个ByteArrayJsonMessageConverter以及为所有 Json 转换器提供的新超类,JsonMessageConverter. 此外,一个StringOrBytesSerializer现已可用;它可以序列化byte[],BytesStringProducerRecords. 有关更多信息,请参阅 Spring Messaging Message Conversionspring-doc.cadn.net.cn

JsonSerializer,JsonDeserializerJsonSerde现在拥有流畅的 API,使编程配置更简单。有关更多信息,请参阅 javadocs、序列化、反序列化和消息转换以及 Streams JSON 序列化和反序列化。spring-doc.cadn.net.cn

回复卡夫卡模板

当回复超时时,未来将异常完成,并使用KafkaReplyTimeoutException而不是KafkaException.spring-doc.cadn.net.cn

此外,重载的sendAndReceive现在提供了允许在每条消息的基础上指定回复超时的方法。spring-doc.cadn.net.cn

聚合回复卡夫卡模板

扩展ReplyingKafkaTemplate通过聚合来自多个接收者的回复。 有关详细信息,请参阅聚合多个回复spring-doc.cadn.net.cn

交易变更

您现在可以覆盖生产者工厂的transactionIdPrefixKafkaTemplateKafkaTransactionManager. 看transactionIdPrefix了解更多信息。spring-doc.cadn.net.cn

新委派序列化器/反序列化器

该框架现在提供了一个委托SerializerDeserializer,利用标头来生成和使用具有多种键/值类型的记录。 有关详细信息,请参阅委托序列化器和反序列化器spring-doc.cadn.net.cn

新的重试解序列化器

该框架现在提供了一个委托RetryingDeserializer,以便在可能发生暂时性错误(如网络问题)时重试序列化。 有关详细信息,请参阅重试反序列化程序spring-doc.cadn.net.cn

2.1 和 2.2 之间的更改

Kafka 客户端版本

此版本需要 2.0.0kafka-clients或更高。spring-doc.cadn.net.cn

类和包更改

ContainerProperties类已从org.springframework.kafka.listener.configorg.springframework.kafka.listener.spring-doc.cadn.net.cn

AckMode枚举已从AbstractMessageListenerContainerContainerProperties.spring-doc.cadn.net.cn

setBatchErrorHandler()setErrorHandler()方法已从ContainerProperties对两者AbstractMessageListenerContainerAbstractKafkaListenerContainerFactory.spring-doc.cadn.net.cn

回滚处理后

一个新的AfterRollbackProcessor提供了策略。 有关详细信息,请参阅回滚后处理器spring-doc.cadn.net.cn

ConcurrentKafkaListenerContainerFactory变化

您现在可以使用ConcurrentKafkaListenerContainerFactory创建和配置任何ConcurrentMessageListenerContainer,不仅是那些@KafkaListener附注。 有关详细信息,请参阅容器工厂spring-doc.cadn.net.cn

侦听器容器更改

新的容器属性 (missingTopicsFatal)已被添加。 看KafkaMessageListenerContainer了解更多信息。spring-doc.cadn.net.cn

一个ConsumerStoppedEvent现在在使用者停止时发出。 有关详细信息,请参阅线程安全spring-doc.cadn.net.cn

批处理侦听器可以选择接收完整的ConsumerRecords<?, ?>对象而不是List<ConsumerRecord<?, ?>. 有关更多信息,请参阅 [batch-listeners]。spring-doc.cadn.net.cn

DefaultAfterRollbackProcessorSeekToCurrentErrorHandler现在可以恢复(跳过)不断失败的记录,默认情况下,在 10 次失败后恢复(跳过)。 可以将它们配置为将失败的记录发布到死信主题。spring-doc.cadn.net.cn

从 2.2.4 版本开始,在选择死信主题名称时可以使用消费者的组 ID。spring-doc.cadn.net.cn

ConsumerStoppingEvent已被添加。 有关详细信息,请参阅应用程序事件。spring-doc.cadn.net.cn

SeekToCurrentErrorHandler现在可以配置为在容器配置为AckMode.MANUAL_IMMEDIATE(自 2.2.4 起)。spring-doc.cadn.net.cn

@KafkaListener变化

您现在可以覆盖concurrencyautoStartup通过在注释上设置属性来获取侦听器容器工厂的属性。 现在,您可以添加配置以确定将哪些标头(如果有)复制到回复消息中。 看@KafkaListener注解了解更多信息。spring-doc.cadn.net.cn

您现在可以使用@KafkaListener作为你自己注解的元注解。 看@KafkaListener作为元注释了解更多信息。spring-doc.cadn.net.cn

现在可以更轻松地配置Validator@Payload验证。 看@KafkaListener @Payload验证了解更多信息。spring-doc.cadn.net.cn

您现在可以直接在 Comments 上指定 kafka 消费者属性;这些属性将覆盖消费者工厂中定义的具有相同名称的任何属性(自版本 2.2.4 起)。有关更多信息,请参阅 Annotation 属性。spring-doc.cadn.net.cn

标头映射更改

类型的标题MimeTypeMediaType现在在RecordHeader价值。 以前,它们被映射为 JSON,并且仅MimeType被解码。MediaType无法解码。它们现在是用于互作性的简单字符串。spring-doc.cadn.net.cn

此外,JsonKafkaHeaderMapper有一个新的addToStringClasses方法,允许指定应通过使用toString()而不是 JSON。有关更多信息,请参阅消息头spring-doc.cadn.net.cn

嵌入式 Kafka 更改

KafkaEmbeddedclass 及其KafkaRule接口已被弃用,取而代之的是EmbeddedKafkaBroker及其 JUnit 4EmbeddedKafkaRule包装纸。 这@EmbeddedKafka注释现在填充EmbeddedKafkaBrokerbean 而不是已弃用的KafkaEmbedded. 此更改允许使用@EmbeddedKafka在 JUnit 5 测试中。 这@EmbeddedKafka注释现在具有属性ports指定填充EmbeddedKafkaBroker. 有关详细信息,请参阅测试应用程序。spring-doc.cadn.net.cn

JsonSerializer/Deserializer 增强功能

现在,您可以使用生产者和使用者属性来提供类型映射信息。spring-doc.cadn.net.cn

反序列化程序上提供了新的构造函数,以允许使用提供的目标类型重写类型标头信息。spring-doc.cadn.net.cn

JsonDeserializer现在默认删除任何类型信息标头。spring-doc.cadn.net.cn

您现在可以配置JsonDeserializer使用 Kafka 属性忽略类型信息标头(从 2.2.3 开始)。spring-doc.cadn.net.cn

Kafka 流更改

流配置 bean 现在必须是KafkaStreamsConfiguration对象而不是StreamsConfig对象。spring-doc.cadn.net.cn

StreamsBuilderFactoryBean已从包中移动…​core…​config.spring-doc.cadn.net.cn

KafkaStreamBrancher引入是为了在条件分支构建在KStream实例。spring-doc.cadn.net.cn

有关更多信息,请参阅 Apache Kafka Streams 支持配置spring-doc.cadn.net.cn

事务 ID

当侦听器容器启动事务时,transactional.id现在是transactionIdPrefix附加<group.id>.<topic>.<partition>. 此更改允许对僵尸进行适当的围栏,如此处所述spring-doc.cadn.net.cn

2.0 和 2.1 之间的更改

Kafka 客户端版本

此版本需要 1.0.0kafka-clients或更高。spring-doc.cadn.net.cn

1.1.x 客户端在 2.2 版中原生受支持。spring-doc.cadn.net.cn

JSON 改进

StringJsonMessageConverterJsonSerializer现在在Headers,让转换器和JsonDeserializer在接收时创建特定类型,基于消息本身而不是固定的配置类型。 有关详细信息,请参阅序列化、反序列化和消息转换。spring-doc.cadn.net.cn

容器停止错误处理程序

现在为记录和批处理侦听器提供了容器错误处理程序,这些侦听器将侦听器抛出的任何异常视为致命/ 他们停止了容器。 有关详细信息,请参阅处理异常spring-doc.cadn.net.cn

暂停和恢复容器

侦听器容器现在具有pause()resume()方法(自 2.1.3 版起)。 有关更多信息,请参阅暂停和恢复侦听器容器spring-doc.cadn.net.cn

有状态重试

从版本 2.1.3 开始,您可以配置有状态重试。 有关详细信息,请参阅有状态重试spring-doc.cadn.net.cn

客户端 ID

从 2.1.1 版本开始,您现在可以将client.id前缀@KafkaListener. 以前,要自定义客户端 ID,您需要为每个侦听器设置一个单独的使用者工厂(和容器工厂)。 前缀以-n在使用并发时提供唯一的客户端 ID。spring-doc.cadn.net.cn

记录偏移量提交

默认情况下,主题偏移量提交的日志记录是使用DEBUG日志记录级别。从 2.1.2 版开始,中的一个新属性ContainerPropertiescommitLogLevel允许您指定这些消息的日志级别。 看KafkaMessageListenerContainer了解更多信息。spring-doc.cadn.net.cn

默认@KafkaHandler

从版本 2.1.3 开始,您可以指定@KafkaHandler类级别的注释@KafkaListener作为默认值。 看@KafkaListener在类上了解更多信息。spring-doc.cadn.net.cn

回复卡夫卡模板

从 2.1.3 版开始,一个KafkaTemplate提供给支持请求/回复语义。 看ReplyingKafkaTemplate了解更多信息。spring-doc.cadn.net.cn

ChainedKafkaTransactionManager

2.1.3 版本引入了ChainedKafkaTransactionManager. (现已弃用)。spring-doc.cadn.net.cn

2.0 的迁移指南

1.3 和 2.0 之间的更改

Spring 框架和 Java 版本

Spring for Apache Kafka 项目现在需要 Spring Framework 5.0 和 Java 8。spring-doc.cadn.net.cn

@KafkaListener变化

您现在可以注释@KafkaListener方法(以及类和@KafkaHandler方法)替换为@SendTo. 如果该方法返回结果,则将其转发到指定的主题。 看转发侦听器结果@SendTo了解更多信息。spring-doc.cadn.net.cn

消息侦听器

消息侦听器现在可以知道Consumer对象。 有关详细信息,请参阅 [message-listeners]。spring-doc.cadn.net.cn

ConsumerAwareRebalanceListener

Rebalance 监听器现在可以访问Consumer对象。 有关更多信息,请参阅重新平衡侦听器。spring-doc.cadn.net.cn

1.2 和 1.3 之间的更改

对事务的支持

0.11.0.0 客户端库添加了对事务的支持。 这KafkaTransactionManager并添加了对事务的其他支持。 有关详细信息,请参阅事务。spring-doc.cadn.net.cn

支持标头

0.11.0.0 客户端库添加了对消息头的支持。 这些现在可以映射到spring-messaging MessageHeaders. 有关详细信息,请参阅消息头spring-doc.cadn.net.cn

创建主题

0.11.0.0 客户端库提供了一个AdminClient,您可以使用它来创建主题。 这KafkaAdmin使用此客户端自动添加定义为@Bean实例。spring-doc.cadn.net.cn

支持 Kafka 时间戳

KafkaTemplate现在支持添加带有时间戳的记录的 API。 新增功能KafkaHeaders已介绍有关timestamp支持。 此外,新的KafkaConditions.timestamp()KafkaMatchers.hasTimestamp()添加了测试实用程序。 看KafkaTemplate,@KafkaListener注解测试应用程序了解更多详细信息。spring-doc.cadn.net.cn

@KafkaListener变化

您现在可以配置KafkaListenerErrorHandler处理异常。 有关详细信息,请参阅处理异常spring-doc.cadn.net.cn

默认情况下,@KafkaListener id属性现在用作group.id属性,覆盖消费者工厂中配置的属性(如果存在)。 此外,您可以显式配置groupId在注释上。 以前,您需要一个单独的容器工厂(和消费者工厂)才能使用不同的group.id值。 要恢复以前使用出厂配置的行为group.id,将idIsGroup属性设置为false.spring-doc.cadn.net.cn

@EmbeddedKafka注解

为方便起见,测试类级别@EmbeddedKafka提供注释,以注册KafkaEmbedded作为豆子。 有关详细信息,请参阅测试应用程序。spring-doc.cadn.net.cn

Kerberos 配置

现在支持配置 Kerberos。 有关更多信息,请参阅 JAAS 和 Kerberosspring-doc.cadn.net.cn

1.1 和 1.2 之间的更改

此版本使用 0.10.2.x 客户端。spring-doc.cadn.net.cn

1.0 和 1.1 之间的更改

Kafka 客户端

此版本使用 Apache Kafka 0.10.x.x 客户端。spring-doc.cadn.net.cn

批处理侦听器

侦听器可以配置为接收consumer.poll()作,而不是一次一个。spring-doc.cadn.net.cn

空有效负载

使用日志压缩时,空有效负载用于“删除”密钥。spring-doc.cadn.net.cn

初始偏移(Initial Offset)

显式分配分区时,您现在可以配置相对于使用者组的当前位置的初始偏移量,而不是绝对或相对于当前端的初始偏移量。spring-doc.cadn.net.cn

寻求

您现在可以查找每个主题或分区的位置。您可以使用它来设置初始化期间的初始位置,当组管理正在使用中并且 Kafka 分配分区时。您还可以在检测到空闲容器时或在应用程序执行中的任何任意点进行搜索。有关更多信息,请参阅 [seek]。spring-doc.cadn.net.cn