这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

变更历史

4.0 有哪些新内容(自 3.3 起)

本部分涵盖从3.3版本到4.0版本的更改。 对于更早版本的更改,请参见 变更历史spring-doc.cadn.net.cn

Apache Kafka 4.0 客户端升级

Spring for Apache Kafka 已升级以使用 Apache Kafka 客户端版本 4.0.0。 此次升级带来了几个重要的更改:spring-doc.cadn.net.cn

嵌入式Kafka测试框架更改

测试基础设施已得到显著更新:spring-doc.cadn.net.cn

消费者记录构建器更改

The ConsumerRecords 构造函数现在需要一个额外的 Map 参数,这一变更已在整个框架中得到处理。 直接使用此构造函数的应用程序将需要更新其代码。spring-doc.cadn.net.cn

生产者接口更新

新的 Kafka 生产者接口方法已实现:spring-doc.cadn.net.cn

移除已弃用功能

已移除几个已弃用的项目:spring-doc.cadn.net.cn

Kafka Streams API 变更

  • KafkaStreamBrancher 已更新以使用新的 split()branch() 方法,而不是已弃用的 branch() 方法spring-doc.cadn.net.cn

  • The DeserializationExceptionHandler has been updated to use the new ErrorHandlerContextspring-doc.cadn.net.cn

内部 API 更新,与 Apache Kafka 4.0.0 相关

  • The BrokerAddress 类现在使用 org.apache.kafka.server.network.BrokerEndPoint 代替已弃用的 kafka.cluster.BrokerEndPointspring-doc.cadn.net.cn

  • The GlobalEmbeddedKafkaTestExecutionListener has been updated to work solely with KRaft modespring-doc.cadn.net.cn

新消费者重平衡协议

Spring for Apache Kafka 4.0 支持 Kafka 4.0 的新消费者重新平衡协议 - KIP-848。 对于详情,请参见 新消费者重新平衡协议文档spring-doc.cadn.net.cn

支持多值头

The JsonKafkaHeaderMapper and SimpleKafkaHeaderMapper 支持为 Kafka 记录的多值头映射。 更多详情请参阅 支持多值头映射spring-doc.cadn.net.cn

配置额外的RecordInterceptor

监听器容器现在支持通过 getRecordInterceptor() 进行拦截器自定义。 详见 消息监听器容器 部分的说明。spring-doc.cadn.net.cn

每条记录的批处理监听器观察

现在可以为使用批处理监听器时的每条记录获取观察值。 请参阅 批处理监听器的可观测性 以获取更多信息。spring-doc.cadn.net.cn

Kafka 队列(共享消费者)支持

Spring for Apache Kafka 现在通过 share consumers 提供 Kafka Queues 的早期访问支持,这些 share consumers 是 Apache Kafka 4.0.0 的一部分,实现了 KIP-932。 这使得多个消费者可以同时从相同的分区消费,相比传统的 consumer groups 提供了更好的负载分布。 见 Kafka Queues (Share Consumer) 了解更多信息。spring-doc.cadn.net.cn

Jackson 3 支持

Spring for Apache Kafka 现在提供了对 Jackson 3 的全面支持,同时保留对 Jackson 2 的支持。 当可用时,Jackson 3 会自动检测并优先使用,提供增强的性能和现代的 JSON 处理能力。spring-doc.cadn.net.cn

所有 Jackson 2 类现在都有与之对应的 Jackson 3 类,命名一致且类型安全性得到提升:spring-doc.cadn.net.cn

新的 Jackson 3 类使用 JsonMapper 代替泛型 ObjectMapper 以增强类型安全性,并利用了 Jackson 3 改进的模块系统和性能优化。spring-doc.cadn.net.cn

迁移路径: 现有应用程序在添加 Jackson 3 到类路径且更新类引用以使用新的 Jackson 3 等价项后将继续正常工作。 要迁移到 Jackson 3,只需将 Jackson 3 添加到您的类路径并更新类引用以使用新的 Jackson 3 等价项。 当两种版本同时存在时,框架会自动检测并优先选择 Jackson 3。spring-doc.cadn.net.cn

向后兼容性: 所有 Jackson 2 类都已弃用但仍然完全功能正常。 它们将在未来的一个重大版本中移除。spring-doc.cadn.net.cn

Spring 重试 依赖移除

Spring for Apache Kafka 已移除对其上 Spring Retry 的依赖,转而使用在 Spring Framework 7 中引入的核心重试支持。 这是一个影响整个框架重试配置和 API 的重大变更。spring-doc.cadn.net.cn

BackOffValuesGenerator 会在启动时预先生成所需的BackOff值,现在可以直接与 Spring Framework 的BackOff接口配合,而不再使用BackOffPolicy。 这些值由监听器基础设施进行管理,Spring Retry 也不再参与。spring-doc.cadn.net.cn

从配置的角度来看,Spring Kafka 严重依赖于 Spring Retry 的 @Backoff 注解。 由于 Spring 框架中没有等效的注解,该注解已移动到 Spring Kafka 作为 @BackOff,并带有以下改进:spring-doc.cadn.net.cn

迁移示例:spring-doc.cadn.net.cn

// Before
@RetryableTopic(backoff = @Backoff(delay = 2000, maxDelay = 10000, multiplier = 2))

// After
@RetryableTopic(backOff = @BackOff(delay = 2000, maxDelay = 10000, multiplier = 2))

// With new duration format support
@RetryableTopic(backOff = @BackOff(delayString = "2s", maxDelayString = "10s", multiplier = 2))

// With property placeholders
@RetryableTopic(backOff = @BackOff(delayString = "${retry.delay}", multiplierString = "${retry.multiplier}"))

RetryingDeserializer 不再提供 RecoveryCallback,而是提供一个以 RetryException 为输入的等效功能。 此内容包含抛出的异常以及重试尝试的次数:spring-doc.cadn.net.cn

// Before
retryingDeserializer.setRecoveryCallback(context -> {
    return fallbackValue;
});

// After
retryingDeserializer.setRecoveryCallback(retryException -> {
    return fallbackValue;
});

使用 BinaryExceptionClassifier 已被新引入的 ExceptionMatcher 取代,后者提供了一个更精致的 API。spring-doc.cadn.net.cn

额外的更改包括:spring-doc.cadn.net.cn

应用程序必须更新其配置以使用新的 Spring Framework 重试 API,但重试行为和功能保持不变。spring-doc.cadn.net.cn

最新更新 in 3.3 自 3.2 起

本部分涵盖从版本 3.2 到版本 3.3 的更改。 更早版本的更改请参见 变更历史spring-doc.cadn.net.cn

DLT 主题命名约定

DLT主题的命名约定已标准化为始终使用"-dlt"后缀。此更改确保了兼容性,并在在不同重试解决方案之间进行转换时避免冲突。希望保留".DLT"后缀行为的用户需要通过设置适当的DLT名称属性显式选择。spring-doc.cadn.net.cn

增强消费者组的查找操作

新增了一个方法,getGroupId(),已添加到ConsumerSeekCallback接口中。 该方法通过针对仅需的消费组,实现了更选择性的seek操作。 AbstractConsumerSeekAware现在也可以在多组监听器场景下,为每个主题分区注册、检索和移除所有回调,而不会遗漏任何。 了解更多详情,请查看新的API(getSeekCallbacksFor(TopicPartition topicPartition), getTopicsAndCallbacks())。 更多信息,请查看Seek API文档spring-doc.cadn.net.cn

可配置处理Kafka监听器中的空批次(使用RecordFilterStrategy)

RecordFilterStrategy 现在支持忽略由过滤操作产生的空批次。 这可以通过重写默认方法 ignoreEmptyBatch() 配置,该方法默认为 false,确保即使所有 ConsumerRecords 被过滤掉,KafkaListener 也会被调用。 了解更多详情,请参阅 消息接收过滤文档spring-doc.cadn.net.cn

并发容器停止事件

The ConcurentContainerMessageListenerContainer 现在在所有子容器停止时发出一个 ConcurrentContainerStoppedEvent。 了解更多详情,请参见 Application EventsConcurrentContainerStoppedEvent Javadocs。spring-doc.cadn.net.cn

原始回复记录键

当使用 ReplyingKafkaTemplate 时,如果请求中的原始记录包含一个键,那么回复中也会包含该相同键。 更多信息,请参见参考文档中 发送消息 部分。spring-doc.cadn.net.cn

定制 DeadLetterPublishingRecovererFactory 的日志记录

当使用 DeadLetterPublishingRecovererFactory 时,用户应用程序可以重写 maybeLogListenerException 方法以自定义日志记录行为。spring-doc.cadn.net.cn

定制 KafkaAdmin 管理客户端

当扩展 KafkaAdmin 时,用户应用程序可以重写 createAdmin 方法以自定义 Admin 客户端创建。spring-doc.cadn.net.cn

定制 Kafka Streams 的实现

当使用 KafkaStreamsCustomizer 时,现在可以通过重写 initKafkaStreams 方法返回 KafkaStreams 接口的自定义实现。spring-doc.cadn.net.cn

KafkaHeaders.DELIVERY_ATTEMPT 对于批处理监听器

当使用一个 BatchListener 时,ConsumerRecord 的 headers 字段可以包含 KafkaHeaders.DELIVERY_ATTMPT 头部。 如果将 DeliveryAttemptAwareRetryListener 设置为错误处理程序作为重试监听器,每个 ConsumerRecord 都会包含 delivery attempt 头部。 更多详情,请参见 批量监听器的 Kafka 头部spring-doc.cadn.net.cn

Kafka 指标监听器和TaskScheduler

The MicrometerProducerListener, MicrometerConsumerListener and KafkaStreamsMicrometerListener 现在可以使用 TaskScheduler 配置。 参见 KafkaMetricsSupport JavaDocs 和 Micrometer 支持 以获取更多信息。spring-doc.cadn.net.cn

最新更新 in 3.2 自 3.1 起

本部分涵盖从版本 3.1 到版本 3.2 的更改。 更早版本的更改请参见 变更历史spring-doc.cadn.net.cn

Kafka 客户端版本

该版本要求 3.7.0 kafka-clients。 3.7.0 版本的 Kafka 客户端引入了新的消费者组协议。 有关详细信息及其限制,请参见 KIP-848。 新的消费者组协议是一个早期访问版本,不建议用于生产环境。 在本版本中仅建议用于测试目的。 因此,Spring for Apache Kafka 仅在与 kafka-client 本身提供的测试级别支持范围内支持此新的消费者组协议。 默认情况下,Spring for Apache Kafka 使用经典的消费者组协议;在测试新的消费者组协议时,需要通过 group.protocol 属性进行启用。spring-doc.cadn.net.cn

测试支持变更

kraft 模式在 EmbeddedKafka 中默认被禁用,用户想要使用 kraft 模式必须启用它。 这是因为在 EmbeddedKafkakraft 模式下观察到某些不稳定性,尤其是在测试新的消费者组协议时。 新的消费者组协议仅支持在 kraft 模式下,因此在测试新协议时,必须针对真实的 Kafka 集群,而不是基于 KafkaClusterTestKit 的那个,EmbeddedKafka 也是基于此。 此外,还观察到了一些在 KafkaListener 方法使用 EmbeddedKafkakraft 模式时的竞态条件。 在这些问题解决之前,kraftEmbeddedKafka 上的默认值将保持为 falsespring-doc.cadn.net.cn

Kafka Streams 交互式查询支持

一个新的API KafkaStreamsInteractiveQuerySupport,用于访问Kafka Streams交互查询中使用的可查询存储。 查看 Kafka Streams交互支持 了解更多信息。spring-doc.cadn.net.cn

交易ID后缀策略

新增了 TransactionIdSuffixStrategy 接口用于管理 transactional.id 后缀。 默认实现为 DefaultTransactionIdSuffixStrategy,当设置 maxCache 大于零时,可以在特定范围内复用 transactional.id,否则将通过递增计数器动态生成后缀。 有关更多信息,请参见 Fixed TransactionIdSuffixspring-doc.cadn.net.cn

异步 @KafkaListener 返回

@KafkaListener(以及 @KafkaHandler)方法现在可以返回异步返回类型,包括 CompletableFuture<?>Mono<?> 和 Kotlin 的 suspend 函数。 请参阅 异步返回 以获取更多信息。spring-doc.cadn.net.cn

基于抛出的异常将消息路由到自定义DLTs

现在可以在消息处理过程中根据抛出的异常类型将消息重定向到自定义死信队列(DLT)。 重定向规则可以通过RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules设置。 自定义死信队列以及其他重试和死信主题也会自动创建。 有关更多信息,请参阅基于抛出的异常将消息路由到自定义死信队列spring-doc.cadn.net.cn

容器属性transactionManager属性弃用

transactionManager 属性在 ContainerProperties 中已被弃用,建议使用 KafkaAwareTransactionManager ,这是一个比通用的 PlatformTransactionManager 更窄的类型。参见 ContainerProperties事务同步spring-doc.cadn.net.cn

回滚处理后

一个新的AfterRollbackProcessor API processBatch 提供了。 请参阅 回滚处理器 以获取更多信息。spring-doc.cadn.net.cn

更改 @RetryableTopic 同一时隔策略默认值

@RetryableTopic属性SameIntervalTopicReuseStrategy的默认值更改为SINGLE_TOPIC。 参见单主题最大间隔指数延迟.spring-doc.cadn.net.cn

非阻塞重试支持类级别@KafkaListener

在RetryTopicConfigurationProvider中对一个类应用@RetryableTopic过程。

提供新的公共API以查找RetryTopicConfiguration。 见Find RetryTopicConfigurationspring-doc.cadn.net.cn

RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。

The RetryTopicConfigurer支持过程和注册MultiMethodKafkaListenerEndpoint。 The MultiMethodKafkaListenerEndpoint为属性defaultMethodmethods提供getter/setter。 修改仅适用于MethodKafkaListenerEndpoint类型的EndpointCustomizer。 The EndpointHandlerMethod添加新构造函数以构建提供的bean实例。 提供新的类EndpointHandlerMultiMethod来处理多方法重试端点。spring-doc.cadn.net.cn

基于用户提供的函数寻求到一个偏移的新API方法

ConsumerCallback 提供了一种新的API,可以根据用户定义的函数定位到指定偏移量,该函数将消费者的当前偏移量作为参数。 更多信息,请参阅 Seek API 文档spring-doc.cadn.net.cn

@PartitionOffset 支持 SeekPosition

添加了seekPosition属性到@PartitionOffsetTopicPartitionOffset.SeekPosition的支持。更多细节请参见手动分配spring-doc.cadn.net.cn

新的 `TopicPartitionOffset` 构造函数接受一个计算要跳转的偏移量的功能参数

TopicPartitionOffset现在有一个新的构造函数,该构造函数接受一个用户提供的函数来计算跳转到的位置。 当使用此构造函数时,框架会用当前消费者偏移位置作为输入参数调用该函数。 有关更多详细信息,请参见Seek API 文档spring-doc.cadn.net.cn

Spring Boot应用名称作为默认客户端ID前缀

对于定义了应用程序名称的Spring Boot应用,该名称现在被用作某些客户端类型自动生成客户端ID的默认前缀。 请参见默认客户端ID前缀以获取更多信息。spring-doc.cadn.net.cn

增强的消息监听容器检索

ListenerContainerRegistry 提供了两个新的 API 动态查找和过滤 MessageListenerContainer 实例。getListenerContainersMatching(Predicate<String> idMatcher) 用于按 ID 进行筛选,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 用于按 ID 和容器属性进行筛选。spring-doc.cadn.net.cn

增强观察通过提供更多的跟踪标签

KafkaTemplateObservation 提供了更多的跟踪标签(低基数)。 KafkaListenerObservation 提供了一个新的 API 以查找高基数键名称以及更多跟踪标签(高或低基数)。请参见 Micrometer 观测spring-doc.cadn.net.cn

自 3.1 版本以来的新功能

此部分涵盖了从版本3.0到版本3.1所做的更改。 早期版本的变更请参见变更历史spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要 3.6.0 kafka-clientsspring-doc.cadn.net.cn

嵌入式Kafka Brokers

现在提供了一个额外的实现,可以使用Kraft而不是Zookeeper。 更多信息请参见嵌入式Kafka代理spring-doc.cadn.net.cn

JsonDeserializer

当反序列化异常发生时,SerializationException消息不再包含具有形式Can’t deserialize data [[123, 34, 98, 97, 122, …​的数据;每个数据字节的数值数组对于大数据来说可能没有用且很冗长。 使用ErrorHandlingDeserializer时,DeserializationException发送给错误处理程序包含data属性,该属性包含无法反序列化的原始数据。 如果不使用ErrorHandlingDeserializerKafkaConsumer将继续为同一记录发出异常,显示主题/分区/偏移量以及由Jackson抛出的错误原因。spring-doc.cadn.net.cn

ContainerPostProcessor

Post-processing 可以通过在 @KafkaListener 注解中指定监听器容器的 bean 名称来应用。 这发生在容器已经创建并且在容器工厂上配置的任何 ContainerCustomizer 都已设置之后。 有关更多信息,请参阅 容器工厂spring-doc.cadn.net.cn

错误处理反序列化器

您现在可以在这个反序列化器中添加一个Validator; 如果代理Deserializer成功地反序列化了对象,但该对象验证失败,则会抛出类似反序列化异常的错误。 这使得原始数据能够传递给错误处理器。参见使用ErrorHandlingDeserializer获取更多信息。spring-doc.cadn.net.cn

重试主题

将后缀-retry-5000改为-retry时,请在@RetryableTopic(backOff = @BackOff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)处操作。 若要保持后缀-retry-5000,请使用@RetryableTopic(backOff = @BackOff(delay = 5000), attempts = "2")。 更多信息请参见主题命名spring-doc.cadn.net.cn

监听器容器变更

当手动分配分区时,使用一个null消费者group.id,现在的AckMode会自动转换为MANUAL。 有关更多信息,请参阅手动分配所有分区spring-doc.cadn.net.cn

自从2.9以来的3.0新功能

Kafka 客户端版本

此版本需要 3.3.1 kafka-clientsspring-doc.cadn.net.cn

精确一次语义

EOSMode.V1 (aka ALPHA) 不再受支持。spring-doc.cadn.net.cn

在使用事务时,最小的broker版本是2.5。

观察

启用 Micrometer 对定时器和跟踪的观察功能现在已受支持。请参阅观察获取更多信息。spring-doc.cadn.net.cn

原生镜像

支持创建原生映像。
请参阅 原生映像 了解更多信息。spring-doc.cadn.net.cn

全局单嵌入Kafka

The embedded Kafka (EmbeddedKafkaBroker) can now be start as a single global instance for the whole test plan. See Using the Same Broker(s) for Multiple Test Classes for more information.spring-doc.cadn.net.cn

重试主题变更

此功能不再被视为实验性(就其API而言),该功能自2.7版本以来就已经支持,但可能会有大于正常的API变更可能性。spring-doc.cadn.net.cn

The bootstrapping of Non-Blocking Retries infrastructure beans has changed in this release to avoid some timing problems that occurred in some application regarding application initialization.spring-doc.cadn.net.cn

您现在可以为重试容器设置不同的concurrency;默认情况下,并发数与主容器相同。spring-doc.cadn.net.cn

@RetryableTopic 可以作为自定义注解上的元注解使用,包括支持 @AliasFor 属性。spring-doc.cadn.net.cn

配置 以获取更多信息。spring-doc.cadn.net.cn

The default replication factor for the retry topics is now -1 (use broker default). If your broker is earlier that version 2.4, you will now need to explicitly set the property.spring-doc.cadn.net.cn

您现在可以在同一个应用上下文中为同一主题配置多个@RetryableTopic监听器。 在此之前,这并不是可能的。 更多详细信息请参阅多个监听器,同一主题(或多个)spring-doc.cadn.net.cn

RetryTopicConfigurationSupport版本中,存在打破API的更改;具体来说,如果你重写了bean定义方法对于destinationTopicResolverkafkaConsumerBackoffManager和/或retryTopicConfigurer;这些方法现在需要一个ObjectProvider<RetryTopicComponentFactory>参数。spring-doc.cadn.net.cn

监听器容器变更

事件与消费者身份验证和授权失败相关现在由容器发布。 请参阅应用程序事件以获取更多信息。spring-doc.cadn.net.cn

您可以现在自定义消费者线程使用的线程名称。 请参阅 容器线程命名 以获取更多信息。spring-doc.cadn.net.cn

The container property restartAfterAuthException has been added. See Listener Container Properties for more information.spring-doc.cadn.net.cn

KafkaTemplate变更

该类返回的未来值现在是CompletableFuture而不是ListenableFuture。 请参见使用KafkaTemplate.spring-doc.cadn.net.cn

ReplyingKafkaTemplate变更

spring-doc.cadn.net.cn

此类别返回的未来对象现在是 CompletableFuture 秒,而不是 ListenableFuture 秒。 见 使用 ReplyingKafkaTemplate带有 Message<?> 秒的请求/回复. spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

@KafkaListener变更

您现在可以使用自定义相关标头,该标头将在任何回复消息中重复显示。 请参阅使用ReplyingKafkaTemplate一文末尾的备注以获取更多信息。spring-doc.cadn.net.cn

你现在可以手动提交批处理中的部分消息,而无需等待整个批处理完成。更多详情请参阅提交偏移量spring-doc.cadn.net.cn

KafkaHeaders变更

在2.9.x版本中被弃用的KafkaHeaders中的四个常量现在已被移除。spring-doc.cadn.net.cn

同样地,RECEIVED_MESSAGE_KEY 被替换为 RECEIVED_KEY,而 RECEIVED_PARTITION_ID 被替换为 RECEIVED_PARTITIONspring-doc.cadn.net.cn

测试更改

Version 3.0.7 引入了 MockConsumerFactoryMockProducerFactory。 See 模拟消费者和生产者 以获取更多信息。spring-doc.cadn.net.cn

从版本 3.0.10 开始,嵌入式 Kafka 中继器,默认情况下会将 Spring Boot 属性 spring.kafka.bootstrap-servers 设置为嵌入式中继器的地址。spring-doc.cadn.net.cn

自2.8以来,2.9的新功能

Kafka 客户端版本

这个版本需要 3.2.0 kafka-clients 版本。spring-doc.cadn.net.cn

错误处理器更改

The DefaultErrorHandler 现在可以配置为暂停容器一次拉取操作,并使用之前拉取操作剩余的结果,而不是跳转到剩余记录的偏移量。 有关更多信息,请参阅 DefaultErrorHandlerspring-doc.cadn.net.cn

The DefaultErrorHandler 现在有一个 BackOffHandler 属性。 See Back Off Handlers 以获取更多信息。spring-doc.cadn.net.cn

监听器容器变更

interceptBeforeTx 现在可以与所有事务管理器一起工作(之前它仅应用于 KafkaAwareTransactionManager)。 请参见 [前置事务拦截].spring-doc.cadn.net.cn

pauseImmediate 是一个新的容器属性,它使容器能够在处理完当前记录后暂停消费者,而不是在上一次轮询的所有记录都被处理完毕后再暂停。 请参阅 [pauseImmediate].spring-doc.cadn.net.cn

事件相关的消费者身份验证和授权spring-doc.cadn.net.cn

Header Mapper 更改

现在可以配置应该映射哪些入站标头。 也可以在版本 2.8.8 或更新的版本中使用。 有关更多信息,请参阅 消息标头spring-doc.cadn.net.cn

KafkaTemplate变更

在3.0版本中,此类返回的未来将为CompletableFuture而不是ListenableFuture。 请参见使用KafkaTemplate以获取在此发布版本过渡时的帮助。spring-doc.cadn.net.cn

ReplyingKafkaTemplate变更

The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized. Also available in version 2.8.8 or later. See 使用ReplyingKafkaTemplate.spring-doc.cadn.net.cn

在3.0版本中,此类返回的未来将为CompletableFuture而不是ListenableFuture。有关在此版本下进行迁移的帮助,请参阅使用ReplyingKafkaTemplateMessage<?>s进行请求/回复spring-doc.cadn.net.cn

自2.7版本以来的新功能2.8

这个部分介绍了从版本 2.7 到版本 2.8 的更改。早期版本的更改,请参阅变更历史记录spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要 3.0.0 kafka-clientsspring-doc.cadn.net.cn

包变更

类型映射相关的类和接口已经从…​support.converter移至…​support.mappingspring-doc.cadn.net.cn

乱序的手动提交

The listener container can now be configured to accept manual offset commits out of order (usually asynchronously). The container will defer the commit until the missing offset is acknowledged. See Manually Committing Offsets for more information.spring-doc.cadn.net.cn

@KafkaListener变更

现在可以在方法本身上指定监听器方法是否为批量监听器。 这使得同一个容器工厂可以用于记录和批量监听器的配置。spring-doc.cadn.net.cn

See [批量监听器] for more information.spring-doc.cadn.net.cn

批处理监听器现在可以处理转换异常。spring-doc.cadn.net.cn

RecordFilterStrategy, 当与批处理监听器结合使用时,现在可以在一次调用中过滤整个批处理。 有关更多信息,请参阅[批处理监听器]一文末尾的说明。spring-doc.cadn.net.cn

The @KafkaListener 注解现在具有 filter 属性,用于覆盖此监听器容器工厂的 RecordFilterStrategyspring-doc.cadn.net.cn

The @KafkaListener 注解现在具有 info 属性;此属性用于填充新的监听器容器属性 listenerInfo。 这然后被用来在每个记录中生成一个 KafkaHeaders.LISTENER_INFO 头部,该头部可以在 RecordInterceptorRecordFilterStrategy 或者监听器本身中使用。 有关更多信息,请参阅 监听器信息头部AbstractMessageListenerContainer 属性spring-doc.cadn.net.cn

KafkaTemplate变更

你现在可以基于主题、分区和偏移量来接收单条记录。更多详情请参阅使用KafkaTemplate接收spring-doc.cadn.net.cn

CommonErrorHandler添加

The legacy GenericErrorHandler 和其子接口层次结构用于记录和批处理监听器已被一个新的单一接口 CommonErrorHandler 取代,该新接口有对应于大多数遗留的 GenericErrorHandler 实现的实现。 请参阅 容器错误处理器将自定义遗留错误处理程序实现迁移到 CommonErrorHandler 以获取更多信息。spring-doc.cadn.net.cn

监听器容器变更

The interceptBeforeTx 容器属性现在默认为 truespring-doc.cadn.net.cn

The authorizationExceptionRetryInterval 属性已被重命名为 authExceptionRetryInterval,现在适用于除 AuthorizationException 之外的 AuthenticationException。 两种异常都被视为致命错误,默认情况下容器会停止运行,除非设置了该属性。spring-doc.cadn.net.cn

序列化/反序列化更改

The DelegatingByTopicSerializerDelegatingByTopicDeserializer 现在已提供。 See 委托序列化器和反序列化器 以获取更多详细信息.spring-doc.cadn.net.cn

DeadLetterPublishingRecover变更

The property stripPreviousExceptionHeaders is now true by default.spring-doc.cadn.net.cn

现在有几种技术可以自定义添加到输出记录中的哪些标头。spring-doc.cadn.net.cn

重试主题变更

现在您可以使用同一个工厂来处理可重试和不可重试的主题。查看指定ListenerContainerFactory的更多信息.spring-doc.cadn.net.cn

现在有一个可管理的全局致命异常列表,将会使失败记录直接进入DLT。 请参阅异常分类器以了解如何进行管理。spring-doc.cadn.net.cn

您现在可以将阻塞和非阻塞重试结合起来使用。
请参阅结合阻塞和非阻塞重试以获取更多信息。spring-doc.cadn.net.cn

The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. See Changing KafkaBackOffException Logging Level if you need to change the logging level back to WARN or set it to any other level.spring-doc.cadn.net.cn

变化从 2.6 到 2.7

Kafka 客户端版本

此版本需要2.7.0 kafka-clients。 它还与自2.7.1版本以来的2.8.0客户端兼容;请参阅覆盖Spring Boot依赖项spring-doc.cadn.net.cn

非阻塞延迟重试机制使用主题

在本次发布中添加了这一重要新功能。 当严格顺序不重要时,失败的递送可以被发送到另一个主题供以后消费。 可以配置一系列这样的重试主题,带有不断增加的延迟。 有关更多信息,请参见非阻塞重试spring-doc.cadn.net.cn

监听器容器变更

The onlyLogRecordMetadata 容器属性现在默认为 truespring-doc.cadn.net.cn

一个新的容器属性stopImmediate现在可用。spring-doc.cadn.net.cn

使用一个BackOff在重试尝试之间的错误处理程序(例如:SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor)现在会在容器停止后不久退出退避间隔,而不是延迟停止。spring-doc.cadn.net.cn

错误处理器和回滚后处理程序,如果它们扩展自FailedRecordProcessor,现在可以配置一个或多个RetryListener以接收关于重试和恢复进度的信息。spring-doc.cadn.net.cn

The RecordInterceptor 现在在监听器返回(正常或通过抛出异常)后还具有其他方法。 它还具有一個子接口 ConsumerAwareRecordInterceptor。 此外,现在为批处理监听器提供了一个 BatchInterceptor。 有关更多详细信息,请参阅 消息监听容器spring-doc.cadn.net.cn

@KafkaListener变更

你现在可以验证@KafkaHandler方法(类级别侦听器)的参数负载。@KafkaListener @Payload 验证中包含更多信息。spring-doc.cadn.net.cn

您现在可以将rawRecordHeader属性设置在MessagingMessageConverterBatchMessagingMessageConverter上,这会导致原始的ConsumerRecord被添加到转换后的Message<?>中。 这对于例如在监听器错误处理程序中使用DeadLetterPublishingRecoverer时非常有用。 请参阅监听器错误处理程序以获取更多信息。spring-doc.cadn.net.cn

您现在可以在应用程序初始化时修改@KafkaListener注解。 有关更多信息,请参见@KafkaListener 属性修改spring-doc.cadn.net.cn

DeadLetterPublishingRecover变更

现在,如果键和值的反序列化都失败,则会将原始值发布到DLT。 此前,值会被填充但键DeserializationException仍然保留在头部。 存在打破API的更改,如果您继承了恢复器并重写了createProducerRecord方法。spring-doc.cadn.net.cn

此外,恢复器会在发布到目标解析器之前验证由目标解析器选择的分区是否确实存在。spring-doc.cadn.net.cn

See 发布死信记录 以获取更多信息。spring-doc.cadn.net.cn

ChainedKafkaTransactionManageris Deprecated

事务 了解更多信息。spring-doc.cadn.net.cn

ReplyingKafkaTemplate变更

现在有一个机制可以检查回复并在将来如果存在某些条件时异常失败。spring-doc.cadn.net.cn

已添加发送和接收spring-messagingMessage<?>的功能。spring-doc.cadn.net.cn

Kafka Streams 变更

默认情况下,StreamsBuilderFactoryBean现在配置为不清理本地状态。 请参阅配置以获取更多详细信息。spring-doc.cadn.net.cn

KafkaAdmin变更

新方法 createOrModifyTopicsdescribeTopics 已被添加。 KafkaAdmin.NewTopics 用于在单个 Bean 中配置多个主题,请参阅 [配置主题] 获取更多信息。spring-doc.cadn.net.cn

MessageConverter变更

现在可以向MessagingMessageConverter添加一个spring-messagingSmartMessageConverter,从而根据contentType头部进行内容协商。 有关更多信息,请参阅Spring Messaging 消息转换spring-doc.cadn.net.cn

序列化@KafkaListeners

ExponentialBackOffWithMaxRetries

一个新的BackOff实现被提供出来,使得配置最大重试次数更加方便。
请参阅ExponentialBackOffWithMaxRetries 实现以获取更多信息。spring-doc.cadn.net.cn

条件委托错误处理器

这些新的错误处理器可以配置为根据异常类型委托给不同的错误处理器。 有关更多信息,请参阅 委托的错误处理器spring-doc.cadn.net.cn

变更说明从 2.5 版到 2.6 版

Kafka 客户端版本

此版本需要 2.6.0 kafka-clientsspring-doc.cadn.net.cn

监听器容器变更

各种错误处理器(继承自FailedRecordProcessor)和DefaultAfterRollbackProcessor现在在恢复失败时会重置BackOff。此外,您现在可以根据失败记录和/或异常来选择要使用的BackOffspring-doc.cadn.net.cn

您现在可以在容器属性中配置一个adviceChain。 有关更多信息,请参阅监听器容器属性spring-doc.cadn.net.cn

当容器配置为发布ListenerContainerIdleEvents时,现在在发送记录后会发布一个ListenerContainerNoLongerIdleEvent而不是空闲事件。 请参阅应用事件检测空闲和非响应式消费者以获取更多信息。spring-doc.cadn.net.cn

@KafkaListener 变更

当使用手动分区分配时,现在可以指定通配符以确定应将哪些分区重置为初始偏移量。 此外,如果监听器实现ConsumerSeekAware,那么在手动分配后会调用onPartitionsAssigned()。 (此功能也在版本2.5.5中添加)。 请参阅显式分区分配以获取更多信息。spring-doc.cadn.net.cn

已向AbstractConsumerSeekAware添加了方便的方法,以使定位更加容易。 有关更多信息,请参阅[定位]spring-doc.cadn.net.cn

错误处理器更改

子类的FailedRecordProcessor(例如:SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler)现在可以配置为如果异常类型与之前记录中的类型不同,则重置重试状态。spring-doc.cadn.net.cn

Producer Factory Changes

您现在可以设置生产者最大存活时间,在此之后,它们将被关闭并重新创建。
请参阅事务以获取更多信息。spring-doc.cadn.net.cn

您现在可以在创建了 `0` 之后更新配置映射。这在例如凭证更改后需要更新 SSL 密钥/信任库位置时可能会很有用。更多信息,请参阅 使用 `DefaultKafkaProducerFactory`.spring-doc.cadn.net.cn

Spring框架2.4与2.5之间的更改

此部分涵盖了从版本2.4到版本2.5所做的更改。对于早期版本的变更历史,请参见变更历史spring-doc.cadn.net.cn

消费者/生产者工厂更改

The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed. Implementations for native Micrometer metrics are provided. See Factory Listeners for more information.spring-doc.cadn.net.cn

您现在可以在运行时更改 Bootstrap 服务器属性,从而启用向另一个 Kafka 集群的故障转移。了解连接到 Kafka 的更多信息spring-doc.cadn.net.cn

StreamsBuilderFactoryBean变更

The factory bean can now invoke a callback whenever a KafkaStreams created or destroyed. An Implementation for native Micrometer metrics is provided. See KafkaStreams Micrometer 支持 for more information.spring-doc.cadn.net.cn

Kafka 客户端版本

这个版本需要 2.5.0 版本的 kafka-clientsspring-doc.cadn.net.cn

类/包更改

SeekUtils 已从 o.s.k.support 包移动到 o.s.k.listenerspring-doc.cadn.net.cn

配送尝试标题

现在在使用某些错误处理器和事务回滚处理器时,可以添加一个跟踪投递尝试的头部选项。 有关更多信息,请参阅 投递尝试头部spring-doc.cadn.net.cn

@KafkaListener 变更

默认情况下,如果返回类型为@KafkaListener且需要填充默认回复头,则会自动填充这些头。 请参见Reply Type Message<?>以获取更多信息。spring-doc.cadn.net.cn

The 0 是不再填充为 1 值的,当传入记录包含 2 键时;头部完全省略了。spring-doc.cadn.net.cn

@KafkaListener 方法现在可以指定一个 ConsumerRecordMetadata 参数,而不是使用主题、分区等元数据的独立头。 更多信息,请参见 消费者记录元数据spring-doc.cadn.net.cn

监听器容器变更

The assignmentCommitOption容器属性现在默认为LATEST_ONLY_NO_TX。 请参阅监听器容器属性以获取更多信息。spring-doc.cadn.net.cn

The subBatchPerPartition容器属性现在默认为true当使用事务时。 更多详情请参见事务spring-doc.cadn.net.cn

一个全新的RecoveringBatchErrorHandler现在提供了。spring-doc.cadn.net.cn

静态组成员资格现在受支持。 请参阅消息监听容器以获取更多信息。spring-doc.cadn.net.cn

当增量/协作重新平衡被配置时,如果偏移量因非致命错误RebalanceInProgressException而无法提交,在重新平衡完成后,容器将尝试为仍分配给此实例的分区重试提交偏移量。spring-doc.cadn.net.cn

The default error handler is now the SeekToCurrentErrorHandler for record listeners and RecoveringBatchErrorHandler for batch listeners. See 容器错误处理器 for more information.spring-doc.cadn.net.cn

现在您可以控制标准错误处理程序故意抛出的异常的日志记录级别。更多信息请参阅容器错误处理程序spring-doc.cadn.net.cn

The getAssignmentsByClientId()方法已被添加,使得在并发容器中更易于确定哪些消费者被分配了哪个分区。 请参阅监听器容器属性以获取更多信息。spring-doc.cadn.net.cn

您现在可以抑制在错误、调试日志等中记录整个ConsumerRecords。 参见监听器容器属性中的onlyLogRecordMetadataspring-doc.cadn.net.cn

KafkaTemplate 变更

The KafkaTemplate 现在可以维护 micrometer 计时器。 见 监控 以获取更多详细信息.spring-doc.cadn.net.cn

The KafkaTemplate 现在可以配置 ProducerConfig 属性以覆盖生产者工厂中的那些属性。 参见 使用 KafkaTemplate 以获取更多详细信息。spring-doc.cadn.net.cn

一个 RoutingKafkaTemplate 现在已经被提供。 请参见 使用 RoutingKafkaTemplate 以获取更多信息。spring-doc.cadn.net.cn

您现在可以使用KafkaSendCallback而不是ListenerFutureCallback来获取更窄的异常,从而更容易提取失败的ProducerRecord。 请参阅使用KafkaTemplate以获取更多详细信息。spring-doc.cadn.net.cn

Kafka 字符串序列化器/反序列化器

New ToStringSerializer/StringDeserializers as well as an associated SerDe are now provided. See 字符串序列化 for more information.spring-doc.cadn.net.cn

JsonDeserializer

The JsonDeserializer 现在更具灵活性,可以确定反序列化类型。 请参见 通过方法来确定类型 以获取更多信息。spring-doc.cadn.net.cn

委托序列化器/反序列化器

The DelegatingSerializer 现在可以处理“标准”类型,当传出记录没有头部时。 更多信息,请参见 委托序列化器和反序列化器spring-doc.cadn.net.cn

测试更改

The KafkaTestUtils.consumerProps() helper record now sets ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest by default. See JUnit for more information.spring-doc.cadn.net.cn

变化在2.3和2.4之间

Kafka 客户端版本

此版本需要 2.4.0 kafka-clients 或更高版本,并支持新的增量重新平衡功能。spring-doc.cadn.net.cn

ConsumerAwareRebalanceListener

ConsumerRebalanceListener 这个接口现在有了一个新的方法 onPartitionsLost。 请参阅 Apache Kafka 文档以获取更多信息。spring-doc.cadn.net.cn

不同于ConsumerRebalanceListener,默认实现不会调用onPartitionsRevoked。相反,监听器容器会在调用完onPartitionsLost之后再调用该方法;因此,在实现ConsumerAwareRebalanceListener时不应做同样的事情。spring-doc.cadn.net.cn

Rebalancing Listeners 部分末尾的重要说明以获得更多信息。spring-doc.cadn.net.cn

GenericErrorHandler

The isAckAfterHandle() 默认实现现在默认返回true。spring-doc.cadn.net.cn

KafkaTemplate

The KafkaTemplate 现在支持非事务性发布与事务性发布并行。 请参阅 KafkaTemplate 事务性和非事务性发布 获取更多详细信息。spring-doc.cadn.net.cn

AggregatingReplyingKafkaTemplate

The releaseStrategy is now a BiConsumer. It is now called after a timeout (as well as when records arrive); the second parameter is true in the case of a call after a timeout.spring-doc.cadn.net.cn

监听器容器

The 0 provides an 1 option to let the listener container to retry after any 2 is thrown by the 3. See its JavaDocs and Using KafkaMessageListenerContainer for more information.spring-doc.cadn.net.cn

@KafkaListener

The @KafkaListener annotation has a new property splitIterables; default true. When a replying listener returns an Iterable this property controls whether the return result is sent as a single record or a record for each element is sent. See Forwarding Listener Results using @SendTo for more informationspring-doc.cadn.net.cn

批处理监听器现在可以配置为BatchToRecordAdapter; 这意味着,例如,批处理可以在事务中进行处理,而监听器一次接收一条记录。 使用默认实现时,可以使用ConsumerRecordRecoverer来在批次内部处理错误,而不中断整个批次的处理 - 在使用事务的情况下这可能很有用。 请参阅 批处理监听器中的事务 以获取更多信息。spring-doc.cadn.net.cn

Kafka Streams

The StreamsBuilderFactoryBean 接受一个新属性 KafkaStreamsInfrastructureCustomizer。 这允许在创建流之前配置构建器和/或拓扑结构。有关更多信息,请参阅 Spring 管理spring-doc.cadn.net.cn

变化概述

此部分涵盖了从版本 2.2 到版本 2.3 的更改。spring-doc.cadn.net.cn

提示, 技巧和示例

一个新章节 提示、技巧和示例 已添加。 请提交 GitHub 问题和/或拉取请求以在该章节中增加更多条目。spring-doc.cadn.net.cn

Kafka 客户端版本

此版本需要2.3.0 kafka-clients 或更高版本。spring-doc.cadn.net.cn

类/包更改

TopicPartitionInitialOffset 已被 TopicPartitionOffset 代替。spring-doc.cadn.net.cn

配置更改

自2.3.4版本起,missingTopicsFatal容器属性默认为false。 当此设置为true时,如果代理器宕机,则应用程序将无法启动;许多用户都受到了这一更改的影响;鉴于Kafka是一个高可用性平台,我们并未预料到没有活动代理器的情况下启动应用程序会是一种常见的用例。spring-doc.cadn.net.cn

Producer and Consumer Factory Changes

The 0可以现在被配置为每个线程创建一个生产者。你也可以在构造函数中提供1个实例,作为另一种选择,要么使用配置的类(这需要无参构造函数),或者使用2个实例进行构造,这些实例将在所有生产者之间共享。 更多信息请参阅使用0spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

在带有Supplier<Deserializer>个实例的DefaultKafkaConsumerFactory中同样可以使用这个选项。spring-doc.cadn.net.cn

有关更多信息,请参阅KafkaMessageListenerContainer的用法spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

监听器容器变更

此前,当使用监听器适配器(例如2s)调用监听器时,错误处理程序接收到的参数为ListenerExecutionFailedException(实际监听器异常作为cause)。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

由原生GenericMessageListener抛出的异常未修改地传递给错误处理程序。spring-doc.cadn.net.cn

现在,错误处理程序始终接收一个ListenerExecutionFailedException参数(实际监听器异常作为cause),这提供了访问容器的group.id属性的能力。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

因为监听容器有自己的提交偏移量的机制,它更偏好于Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG被设置为false。 现在,除非在消费者工厂或容器的消费者属性中明确设置,否则它会自动设置为false。spring-doc.cadn.net.cn

The ackOnError 属性现在默认为 falsespring-doc.cadn.net.cn

现在可以在监听器方法中获取消费者的group.id属性。 有关更多信息,请参阅取得消费者group.idspring-doc.cadn.net.cn

The container has a new property recordInterceptor allowing records to be inspected or modified before invoking the listener. A CompositeRecordInterceptor is also provided in case you need to invoke multiple interceptors. See Message Listener Containers for more information.spring-doc.cadn.net.cn

The ConsumerSeekAware 有新的方法,允许你在开始、结束或当前位置进行跳转,并且可以跳转到大于或等于时间戳的第一个偏移量。 请参见 [seek] 获取更多详细信息。spring-doc.cadn.net.cn

一个方便类 AbstractConsumerSeekAware 现在提供了,用于简化查找操作。 参见 [查找] 以获取更多信息.spring-doc.cadn.net.cn

The ContainerProperties 提供了一个 idleBetweenPolls 选项,让监听器容器的主要循环在两次 KafkaConsumer.poll() 调用之间暂停。 请参阅其 JavaDocs 和 使用 KafkaMessageListenerContainer 获取更多详细信息。spring-doc.cadn.net.cn

当使用AckMode.MANUAL(或MANUAL_IMMEDIATE)时,你现在可以通过调用nack来触发重传,在Acknowledgment中可以找到更多相关信息。 提交偏移量spring-doc.cadn.net.cn

现在可以使用Micrometer Timer来监控监听器性能。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

请参见监控以获取更多信息。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

The containers now publish additional consumer lifecycle events relating to startup. See Application Events for more information.spring-doc.cadn.net.cn

事务性批处理监听器现在可以支持僵尸隔离。 更多详情请参见事务spring-doc.cadn.net.cn

The listener container factory can now be configured with a ContainerCustomizer to further configure each container after it has been created and configured. See Container factory for more information.spring-doc.cadn.net.cn

错误处理器更改

The SeekToCurrentErrorHandler 现在将某些异常视为致命的,并在首次失败时禁用重试,调用恢复器。spring-doc.cadn.net.cn

The SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler can now be configured to apply a BackOff (thread sleep) between delivery attempts.spring-doc.cadn.net.cn

从版本 2.3.2 开始,当错误处理程序在恢复失败记录后返回时,已恢复记录的偏移量将被提交。spring-doc.cadn.net.cn

The `DeadLetterPublishingRecoverer`,当与一个`1`结合使用时,现在设置了发送到死信主题的消息载荷为原本无法反序列化的原始值。此前,它是`2`,用户代码需要从消息头中提取`3`。 更多详细信息请参见发布死信记录spring-doc.cadn.net.cn

TopicBuilder

一个新类TopicBuilder提供了一种更方便的方法来创建NewTopic@Beans,用于自动主题分配。 更多信息,请参阅[配置主题]spring-doc.cadn.net.cn

Kafka Streams 变更

您现在可以对由@EnableKafkaStreams创建的StreamsBuilderFactoryBean进行额外的配置。 参见流配置以获取更多信息。spring-doc.cadn.net.cn

A RecoveringDeserializationExceptionHandler 现在提供了一个允许记录具有反序列化错误的状态,以便从这些错误中恢复。它可以用作将这些记录发送到死信主题的 DeadLetterPublishingRecoverer 的辅助工具。有关更多信息,请参阅 从反序列化异常恢复spring-doc.cadn.net.cn

The HeaderEnricher 转换器已提供,使用SpEL生成头部值。 See Header Enricher for more information.spring-doc.cadn.net.cn

The MessagingTransformer 已被提供。 这允许 Kafka Streams 拓扑与一个 Spring Messaging 组件,例如一个 Spring Integration 流,进行交互。 请参阅 MessagingProcessor 和 See 从一个 KStream 调用一个 Spring Integration Flow 以获取更多信息。spring-doc.cadn.net.cn

JSON组件更改

现在所有JSON感知组件默认使用Jackson生成的ObjectMapper进行配置。 JacksonUtils.enhancedObjectMapper() 现在提供了基于TypeReference 的构造函数,以便更好地处理目标泛型容器类型。 此外还引入了JacksonMimeTypeModule 用于将org.springframework.util.MimeType 序列化为普通字符串。 有关更多信息,请参见其Java文档和序列化、反序列化及消息转换spring-doc.cadn.net.cn

一个 ByteArrayJsonMessageConverter 已提供,同时还有一个所有 Json 转换器的新超类,JsonMessageConverter。 此外,现在有一个 StringOrBytesSerializer;它可以将 byte[]BytesString 值转换为 ProducerRecords。 有关更多信息,请参阅 Spring Messaging 消息转换spring-doc.cadn.net.cn

The JsonSerializer, JsonDeserializer and JsonSerde now have fluent APIs to make programmatic configuration simpler. See the javadocs, 序列化、反序列化和消息转换, and 流的JSON序列化和反序列化 for more information.spring-doc.cadn.net.cn

ReplyingKafkaTemplate

当回复超时,future将以0而不是1完成异常。spring-doc.cadn.net.cn

也提供了带有消息级别的回复超时指定的重载sendAndReceive方法。spring-doc.cadn.net.cn

AggregatingReplyingKafkaTemplate

扩展ReplyingKafkaTemplate通过从多个接收者聚合回复。 参见聚合多个回复以获取更多信息。spring-doc.cadn.net.cn

事务更改

您现在可以在KafkaTemplateKafkaTransactionManager上覆盖生产者工厂的transactionIdPrefix。 有关更多信息,请参阅:transactionIdPrefixspring-doc.cadn.net.cn

新的委托序列化器/反序列化器

The framework now provides a delegating Serializer and Deserializer, utilizing a header to enable producing and consuming records with multiple key/value types. See Delegating Serializer and Deserializer for more information.spring-doc.cadn.net.cn

新重试反序列化器

The framework now provides a delegating RetryingDeserializer, to retry serialization when transient errors such as network problems might occur. See Retrying Deserializer for more information.spring-doc.cadn.net.cn

2.1 与 2.2 版本之间的变更

Kafka 客户端版本

此版本需要 2.0.0 kafka-clients 或更高版本。spring-doc.cadn.net.cn

类和包的更改

The ContainerProperties class has been moved from org.springframework.kafka.listener.config to org.springframework.kafka.listener.spring-doc.cadn.net.cn

The AckMode 枚举已被从 AbstractMessageListenerContainer 移动到 ContainerPropertiesspring-doc.cadn.net.cn

The setBatchErrorHandler()setErrorHandler() 方法已被从 ContainerProperties 移动到 AbstractMessageListenerContainerAbstractKafkaListenerContainerFactoryspring-doc.cadn.net.cn

回滚处理后

新的AfterRollbackProcessor策略已提供。 请参阅回滚后处理器以获取更多详细信息。spring-doc.cadn.net.cn

ConcurrentKafkaListenerContainerFactory变更

您现在可以使用ConcurrentKafkaListenerContainerFactory来创建和配置任何ConcurrentMessageListenerContainer,而不仅仅是针对@KafkaListener注解的。
有关更多信息,请参阅容器工厂spring-doc.cadn.net.cn

监听器容器变更

一个新的容器属性(missingTopicsFatal)已被添加。 更多详情请参阅 使用 KafkaMessageListenerContainerspring-doc.cadn.net.cn

一个 ConsumerStoppedEvent 现在会在消费者停止时发出。 有关更多信息,请参见 线程安全性spring-doc.cadn.net.cn

批处理监听器可以选择接收完整的ConsumerRecords<?, ?>对象,而不是List<ConsumerRecord<?, ?>。 有关更多信息,请参阅[批量监听器]spring-doc.cadn.net.cn

The DefaultAfterRollbackProcessorSeekToCurrentErrorHandler 现在可以恢复(跳过)持续失败的记录,并且默认情况下会在 10 次失败后进行此操作。它们可以配置为将失败的记录发布到死信主题。spring-doc.cadn.net.cn

使用版本2.2.4后,可以在选择死信主题名称时使用消费者的组ID。spring-doc.cadn.net.cn

The ConsumerStoppingEvent 已经被添加。 See 应用事件 以获取更多信息。spring-doc.cadn.net.cn

The SeekToCurrentErrorHandler 现在可以配置为在容器使用 AckMode.MANUAL_IMMEDIATE 配置时提交恢复记录的偏移量(自 2.2.4 版起).spring-doc.cadn.net.cn

@KafkaListener 变更

您现在可以通过在注解上设置属性来覆盖监听器容器工厂的concurrencyautoStartup属性。 您可以添加配置以确定哪些(如果有的话)标头被复制到回复消息中。 有关更多信息,请参阅@KafkaListener注解spring-doc.cadn.net.cn

您现在可以使用@KafkaListener作为元注解在自定义注解上。 请参阅@KafkaListener作为元注释以获取更多信息。spring-doc.cadn.net.cn

现在为Validator配置@Payload验证变得更加容易。 更多详情请参见@KafkaListener @Payload验证spring-doc.cadn.net.cn

您现在可以直接在注解上指定 Kafka 消费者属性;这些属性将覆盖消费者工厂中同名定义的任何属性(自版本 2.2.4 开始)。 有关更多信息,请参阅 注解属性spring-doc.cadn.net.cn

Header Mapping Changes

Headers of type MimeTypeMediaType 现在映射为简单的字符串类型,在RecordHeader值中。 之前,它们被映射为 JSON,并且只有MimeType会被解码。MediaType无法被解码。 现在为了互操作性,它们是简单的字符串。spring-doc.cadn.net.cn

Also, the JsonKafkaHeaderMapper has a new addToStringClasses method, allowing the specification of types that should be mapped by using toString() instead of JSON. See Message Headers for more information.spring-doc.cadn.net.cn

嵌入式Kafka更改

The KafkaEmbedded 类及其 KafkaRule 接口已被弃用,改用了 EmbeddedKafkaBroker 和其 JUnit 4 EmbeddedKafkaRule 包装器。
The @EmbeddedKafka 注解现在填充一个 EmbeddedKafkaBroker 实例而不是弃用的 KafkaEmbedded 实例。
This change allows the use of @EmbeddedKafka 在 JUnit 5 测试中。
The @EmbeddedKafka 注解现在有一个属性 ports 来指定填充 EmbeddedKafkaBroker 的端口。请参阅 测试应用程序 获取更多信息。spring-doc.cadn.net.cn

Spring 框架中的 Java 开发者增强功能

您可以现在通过使用生产者和消费者属性来提供类型映射信息。spring-doc.cadn.net.cn

新的反序列化构造函数可供使用,允许通过提供的目标类型覆盖类型头部信息。spring-doc.cadn.net.cn

The JsonDeserializer 现在默认移除任何类型信息头部。spring-doc.cadn.net.cn

您现在可以通过使用一个Kafka属性来配置JsonDeserializer以忽略类型信息头(从2.2.3版本开始)。spring-doc.cadn.net.cn

Kafka Streams 变更

Spring框架中,流配置bean现在必须是一个KafkaStreamsConfiguration对象,而不是一个StreamsConfig对象。spring-doc.cadn.net.cn

StreamsBuilderFactoryBean已被从包…​core移动到…​configspring-doc.cadn.net.cn

spring-doc.cadn.net.cn

当基于KStream实例构建条件分支时,为了提高最终用户体验,已经引入了KafkaStreamBrancherspring-doc.cadn.net.cn

spring-doc.cadn.net.cn

事务标识

当监听容器启动事务时,transactional.id 现在是 transactionIdPrefix 附加上 <group.id>.<topic>.<partition>。 这个变化允许正确隔离僵尸对象,如这里所述spring-doc.cadn.net.cn

2.0 与 2.1 版本之间的变更

Kafka 客户端版本

此版本需要1.0.0 kafka-clients 或更高版本。spring-doc.cadn.net.cn

1.1.x 版本客户端在 2.2 版本中得到了原生支持。spring-doc.cadn.net.cn

JSON改进

The StringJsonMessageConverterJsonSerializer 现在在 Headers 中添加类型信息,让转换器和 JsonDeserializer 在接收时根据消息本身创建特定的类型,而不是固定配置的类型。 更多详细信息,请参阅 序列化、反序列化和消息转换spring-doc.cadn.net.cn

容器停止错误处理程序

容器错误处理程序现在为记录和批量侦听器提供,将任何由侦听器抛出的异常视为致命错误。它们会停止容器。 请参阅 处理异常 以获取更多信息。spring-doc.cadn.net.cn

暂停和恢复容器

The listener containers now have pause() and resume() methods (since version 2.1.3). See Pausing and Resuming Listener Containers for more information.spring-doc.cadn.net.cn

有状态重试

从版本2.1.3开始,您可以配置状态化重试。 更多信息,请参见状态化重试spring-doc.cadn.net.cn

客户端ID

从 2.1.1 版本开始,你现在可以在 @KafkaListener 前面设置 client.id 前缀。 此前,要自定义客户端 ID,你需要为每个监听器分别配置一个消费者工厂(和容器工厂)。 前缀会在使用并发时自动后接 -n 以提供唯一的客户端 ID。spring-doc.cadn.net.cn

日志偏移提交

默认情况下,主题偏移提交的日志记录使用DEBUG日志级别进行。 从2.1.2版本开始,在ContainerProperties中新增了一个名为commitLogLevel的属性,允许您指定这些消息的日志级别。 有关更多信息,请参阅使用KafkaMessageListenerContainerspring-doc.cadn.net.cn

默认 @KafkaHandler

从版本 2.1.3 开始,您可以在类级别的注解中指定一个 @KafkaHandler 注解作为默认值。更多详细信息请参阅 @KafkaListener 在类级别上的使用spring-doc.cadn.net.cn

ReplyingKafkaTemplate

自版本 2.1.3 起,提供了一个子类 KafkaTemplate 来支持请求/回复语义。 请参阅 使用 ReplyingKafkaTemplate 获取更多详细信息。spring-doc.cadn.net.cn

ChainedKafkaTransactionManager

Version 2.1.3 引入了 ChainedKafkaTransactionManager。 (现在已弃用)spring-doc.cadn.net.cn

迁移指南从2.0版本

变化从1.3到2.0

Spring框架与Java版本

The Spring for Apache Kafka项目现在需要Spring框架5.0和Java 8。spring-doc.cadn.net.cn

@KafkaListener变更

您现在可以使用@KafkaListener注解方法(以及类和@KafkaHandler方法)来使用@SendTo。如果该方法返回结果,则会将其转发到指定的主题。有关更多信息,请参阅通过@SendTo转发监听器结果spring-doc.cadn.net.cn

消息监听器

Message listeners can now be aware of the Consumer object. See [message-listeners] for more information.spring-doc.cadn.net.cn

使用ConsumerAwareRebalanceListener

rebalance监听器现在可以在重新均衡通知期间访问Consumer对象。 有关更多信息,请参阅重新均衡监听器spring-doc.cadn.net.cn

变化从1.2到1.3

支持事务处理

0.11.0.0 客户端库增加了对事务的支持。 KafkaTransactionManager 和其他事务支持也已添加。 有关更多信息,请参见 事务spring-doc.cadn.net.cn

支持标头

0.11.0.0 客户端库增加了对消息头的支持。 这些内容现在可以映射到 spring-messagingMessageHeaders 之间。 有关更多信息,请参阅 消息头spring-doc.cadn.net.cn

创建主题

0.11.0.0 客户端库提供了一个 AdminClient,你可以使用它来创建主题。 KafkaAdmin 使用此客户端自动添加定义为 @Bean 实例的主题。spring-doc.cadn.net.cn

Kafka 时间戳支持

KafkaTemplate 现支持添加带有时间戳的记录的 API。 新发布的 KafkaHeaders 包含了对于 timestamp 的支持。 此外,还新增了 KafkaConditions.timestamp()KafkaMatchers.hasTimestamp() 测试工具。 请参阅 使用 KafkaTemplate@KafkaListener 注解 以及 测试应用程序 以获取更多详情。spring-doc.cadn.net.cn

@KafkaListener变更

你现在已经可以配置一个KafkaListenerErrorHandler来处理异常。更多详情请参阅处理异常spring-doc.cadn.net.cn

默认情况下,现在使用@KafkaListenerid属性作为group.id属性,覆盖在消费者工厂中配置的相应属性(如果存在的话)。 此外,您还可以显式地在注解上配置groupId。 以前,如果您需要使用不同的group.id值为监听器设置不同的容器工厂和消费者工厂。 为了恢复之前的使用在工厂中配置的group.id的行为,请将注解上的idIsGroup属性设置为falsespring-doc.cadn.net.cn

@EmbeddedKafka注解

为了方便,提供了一个测试类级别的@EmbeddedKafka注解,用于将KafkaEmbedded注册为一个bean。 有关更多信息,请参见测试应用程序spring-doc.cadn.net.cn

Kerberos 配置

现在提供了配置Kerberos的支持。 请参阅JAAS和Kerberos以获取更多信息。spring-doc.cadn.net.cn

变化从1.1到1.2

此版本使用的是0.10.2.x客户端。spring-doc.cadn.net.cn

变化于1.0和1.1之间

Kafka 客户端

此版本使用了 Apache Kafka 0.10.x.x 客户端。spring-doc.cadn.net.cn

批次监听器

Listeners可以配置为接收由consumer.poll()操作返回的整个消息批次,而不是逐个接收。spring-doc.cadn.net.cn

无数据载荷

null payload 被用于在使用日志压缩时“删除”键。spring-doc.cadn.net.cn

初始偏移量

当显式分配分区时,现在可以配置消费者组的初始偏移量相对于当前位置,而不仅仅是绝对位置或相对于当前末尾的位置。spring-doc.cadn.net.cn

寻求

您现在可以查找每个主题或分区的位置。 您可以使用此功能在初始化期间设置初始位置,当使用组管理且Kafka分配了分区时。 您也可以在检测到空闲容器时进行查找或在应用程序执行的任意一点。 有关更多信息,请参阅 [seek]spring-doc.cadn.net.cn