|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6! |
新增功能
自 3.1 以来 3.2 中的新增功能
本节介绍从版本 3.1 到版本 3.2 所做的更改。 有关早期版本的更改,请参阅更改历史记录。
Kafka 客户端版本
此版本需要 3.7.0kafka-clients.
3.7.0 版本的 Kafka 客户端引入了新的消费组协议。
有关更多详细信息及其限制,请参阅 KIP-848。
新的使用者组协议是早期访问版本,不打算在生产中使用。
在此版本中,仅建议用于测试目的。
因此, Spring for Apache Kafka 仅在kafka-client本身。
默认情况下, Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过group.protocol属性。
测试支持更改
这kraft模式在EmbeddedKafka默认情况下,想要使用kraftmode 必须启用它。
这是由于在使用EmbeddedKafka在kraft模式,尤其是在测试新的消费者组协议时。
新的消费组协议仅在kraft模式,因此,在测试新协议时,需要针对真实的 Kafka 集群进行,而不是基于KafkaClusterTestKit哪EmbeddedKafka所依据的。
此外,在运行多个KafkaListener方法与EmbeddedKafka在kraft模式。
在这些问题得到解决之前,kraft默认开启EmbeddedKafka将保持为false.
Kafka Streams 交互式查询支持
新的 APIKafkaStreamsInteractiveQuerySupport用于访问 Kafka Streams 交互式查询中使用的可查询存储。
有关更多详细信息,请参阅 Kafka Streams Interactive Support 。
TransactionIdSuffixStrategy
新的TransactionIdSuffixStrategy引入了 interface 来管理transactional.id后缀。
默认实现为DefaultTransactionIdSuffixStrategy当设置maxCache大于零可以重复使用transactional.id在特定范围内,否则将通过递增 counter 动态生成后缀。
有关更多信息,请参阅Fixed TransactionIdSuffix。
异步 @KafkaListener 返回
@KafkaListener(以及@KafkaHandler) 方法现在可以返回异步返回类型,包括CompletableFuture<?>,Mono<?>和 Kotlinsuspend功能。
有关更多信息,请参阅 Async Returns 。
根据引发的异常将消息路由到自定义 DLT
现在可以根据异常的类型将消息重定向到自定义 DLT,该异常在消息处理过程中引发。
重定向规则通过RetryableTopic.exceptionBasedDltRouting或RetryTopicConfigurationBuilder.dltRoutingRules.
自定义 DLT 以及其他重试和死信主题会自动创建。
有关更多信息,请参阅根据引发的异常将消息路由到自定义 DLT。
弃用 ContainerProperties transactionManager 属性
弃用transactionManagerproperty 中ContainerProperties赞成KafkaAwareTransactionManager,与常规PlatformTransactionManager.请参阅 ContainerProperties 和事务同步。
回滚处理后
新的AfterRollbackProcessor应用程序接口processBatch。
有关更多信息,请参阅 After-rollback Processor 。
更改 SameIntervalTopicReuseStrategy 默认值@RetryableTopic
改变@RetryableTopic属性SameIntervalTopicReuseStrategydefault 值设置为SINGLE_TOPIC.
请参阅 maxInterval Exponential Delay 的单个主题。
非阻塞重试支持类级别@KafkaListener
非阻塞重试支持对 Class 进行@KafkaListener。 请参阅 Non-blocking Retries。
支持 RetryTopicConfigurationProvider 中类的进程@RetryableTopic。
提供新的公共 API 以查找RetryTopicConfiguration.
请参阅 查找 RetryTopicConfiguration
RetryTopicConfigurer 支持进程 MultiMethodKafkaListenerEndpoint。
这RetryTopicConfigurer支持流程和注册MultiMethodKafkaListenerEndpoint.
这MultiMethodKafkaListenerEndpoint提供getter/setter对于属性defaultMethod和methods.
修改EndpointCustomizer严格来说MethodKafkaListenerEndpoint类型。
这EndpointHandlerMethod添加新构造函数为提供的 bean 构造一个实例。
提供 new classEndpointHandlerMultiMethod处理程序 multi 方法,用于重试端点。
新的 API 方法,用于根据用户提供的函数寻找偏移量
ConsumerCallback提供了一个新的 API,用于根据用户定义的函数查找偏移量,该函数将使用者中的当前偏移量作为参数。
有关更多详细信息,请参阅 Seek API Docs 。
@PartitionOffset SeekPosition 支持
添加seekPositionproperty 设置为@PartitionOffset支持TopicPartitionOffset.SeekPosition.
有关更多详细信息,请参阅 manual-assignment 。
TopicPartitionOffset 中的新构造函数,它接受一个函数来计算要查找的偏移量
TopicPartitionOffset具有一个新的构造函数,该构造函数采用用户提供的函数来计算要查找的偏移量。
使用此构造函数时,框架使用当前使用者偏移位置的 input 参数调用函数。
有关更多详细信息,请参阅 Seek API Docs 。
Spring Boot 应用程序名称作为默认客户端 ID 前缀
对于定义应用程序名称的 Spring Boot 应用程序,现在使用此名称 作为某些客户端类型的自动生成的客户端 ID 的默认前缀。 有关更多详细信息,请参阅默认客户端 ID 前缀。
增强了 MessageListenerContainers 的检索
ListenerContainerRegistry提供两个新的 API 动态查找和筛选MessageListenerContainer实例。getListenerContainersMatching(Predicate<String> idMatcher)按 ID 进行筛选,另一个是getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)按 ID 和容器属性进行筛选。
看@KafkaListener生命周期管理的 API 文档了解更多信息。
通过提供更多跟踪标签来增强观察
KafkaTemplateObservation提供更多的跟踪标签(低基数)。KafkaListenerObservation提供了新的 API 来查找高基数 Key 名称和更多跟踪标签(高基数或低基数)。
参见千分尺观察