|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
变更历史
最新更新 in 3.2 自 3.1 起
本部分涵盖从版本 3.1 到版本 3.2 的更改。 更早版本的更改请参见 变更历史。
Kafka 客户端版本
该版本要求 3.7.0 kafka-clients。
3.7.0 版本的 Kafka 客户端引入了新的消费者组协议。
有关详细信息及其限制,请参见 KIP-848。
新的消费者组协议是一个早期访问版本,不建议用于生产环境。
在本版本中仅建议用于测试目的。
因此,Spring for Apache Kafka 仅在与 kafka-client 本身提供的测试级别支持范围内支持此新的消费者组协议。
默认情况下,Spring for Apache Kafka 使用经典的消费者组协议;在测试新的消费者组协议时,需要通过 group.protocol 属性进行启用。
测试支持变更
kraft 模式在 EmbeddedKafka 中默认被禁用,用户想要使用 kraft 模式必须启用它。
这是因为在 EmbeddedKafka 的 kraft 模式下观察到某些不稳定性,尤其是在测试新的消费者组协议时。
新的消费者组协议仅支持在 kraft 模式下,因此在测试新协议时,必须针对真实的 Kafka 集群,而不是基于 KafkaClusterTestKit 的那个,EmbeddedKafka 也是基于此。
此外,还观察到了一些在 KafkaListener 方法使用 EmbeddedKafka 的 kraft 模式时的竞态条件。
在这些问题解决之前,kraft 在 EmbeddedKafka 上的默认值将保持为 false。
Kafka Streams 交互式查询支持
一个新的API KafkaStreamsInteractiveQuerySupport,用于访问Kafka Streams交互查询中使用的可查询存储。
查看 Kafka Streams交互支持 了解更多信息。
交易ID后缀策略
新增了 TransactionIdSuffixStrategy 接口用于管理 transactional.id 后缀。
默认实现为 DefaultTransactionIdSuffixStrategy,当设置 maxCache 大于零时,可以在特定范围内复用 transactional.id,否则将通过递增计数器动态生成后缀。
有关更多信息,请参见 Fixed TransactionIdSuffix。
异步 @KafkaListener 返回
@KafkaListener(以及 @KafkaHandler)方法现在可以返回异步返回类型,包括 CompletableFuture<?>、Mono<?> 和 Kotlin 的 suspend 函数。
请参阅 异步返回 以获取更多信息。
基于抛出的异常将消息路由到自定义DLTs
现在可以在消息处理过程中根据抛出的异常类型将消息重定向到自定义死信队列(DLT)。
重定向规则可以通过RetryableTopic.exceptionBasedDltRouting或RetryTopicConfigurationBuilder.dltRoutingRules设置。
自定义死信队列以及其他重试和死信主题也会自动创建。
有关更多信息,请参阅基于抛出的异常将消息路由到自定义死信队列。
容器属性transactionManager属性弃用
transactionManager 属性在 ContainerProperties 中已被弃用,建议使用 KafkaAwareTransactionManager ,这是一个比通用的 PlatformTransactionManager 更窄的类型。参见 ContainerProperties 和 事务同步。
回滚处理后
一个新的AfterRollbackProcessor API processBatch 提供了。
请参阅 回滚处理器 以获取更多信息。
更改 @RetryableTopic 同一时隔策略默认值
将@RetryableTopic属性SameIntervalTopicReuseStrategy的默认值更改为SINGLE_TOPIC。
参见单主题最大间隔指数延迟.
非阻塞重试支持类级别@KafkaListener
非阻塞重试支持 @KafkaListener 在类上。 请参见 非阻塞重试.
在RetryTopicConfigurationProvider中对一个类应用@RetryableTopic过程。
提供新的公共API以查找RetryTopicConfiguration。
见Find RetryTopicConfiguration
RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。
The RetryTopicConfigurer支持过程和注册MultiMethodKafkaListenerEndpoint。
The MultiMethodKafkaListenerEndpoint为属性defaultMethod和methods提供getter/setter。
修改仅适用于MethodKafkaListenerEndpoint类型的EndpointCustomizer。
The EndpointHandlerMethod添加新构造函数以构建提供的bean实例。
提供新的类EndpointHandlerMultiMethod来处理多方法重试端点。
基于用户提供的函数寻求到一个偏移的新API方法
ConsumerCallback 提供了一种新的API,可以根据用户定义的函数定位到指定偏移量,该函数将消费者的当前偏移量作为参数。
更多信息,请参阅 Seek API 文档。
@PartitionOffset 支持 SeekPosition
添加了seekPosition属性到@PartitionOffset对TopicPartitionOffset.SeekPosition的支持。更多细节请参见手动分配。
新的 `TopicPartitionOffset` 构造函数接受一个计算要跳转的偏移量的功能参数
TopicPartitionOffset现在有一个新的构造函数,该构造函数接受一个用户提供的函数来计算跳转到的位置。
当使用此构造函数时,框架会用当前消费者偏移位置作为输入参数调用该函数。
有关更多详细信息,请参见Seek API 文档。
Spring Boot应用名称作为默认客户端ID前缀
对于定义了应用程序名称的Spring Boot应用,该名称现在被用作某些客户端类型自动生成客户端ID的默认前缀。 请参见默认客户端ID前缀以获取更多信息。
增强的消息监听容器检索
ListenerContainerRegistry 提供了两个新的 API 动态查找和过滤 MessageListenerContainer 实例。getListenerContainersMatching(Predicate<String> idMatcher) 用于按 ID 进行筛选,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 用于按 ID 和容器属性进行筛选。
见 @KafkaListener 生命周期管理的API文档 获取更多信息。
增强观察通过提供更多的跟踪标签
KafkaTemplateObservation 提供了更多的跟踪标签(低基数)。
KafkaListenerObservation 提供了一个新的 API 以查找高基数键名称以及更多跟踪标签(高或低基数)。请参见 Micrometer 观测
自 3.1 版本以来的新功能
此部分涵盖了从版本3.0到版本3.1所做的更改。 早期版本的变更请参见变更历史。
嵌入式Kafka Brokers
现在提供了一个额外的实现,可以使用Kraft而不是Zookeeper。
更多信息请参见嵌入式Kafka代理。
JsonDeserializer
当反序列化异常发生时,SerializationException消息不再包含具有形式Can’t deserialize data [[123, 34, 98, 97, 122, …的数据;每个数据字节的数值数组对于大数据来说可能没有用且很冗长。
使用ErrorHandlingDeserializer时,DeserializationException发送给错误处理程序包含data属性,该属性包含无法反序列化的原始数据。
如果不使用ErrorHandlingDeserializer,KafkaConsumer将继续为同一记录发出异常,显示主题/分区/偏移量以及由Jackson抛出的错误原因。
ContainerPostProcessor
Post-processing 可以通过在 @KafkaListener 注解中指定监听器容器的 bean 名称来应用。
这发生在容器已经创建并且在容器工厂上配置的任何 ContainerCustomizer 都已设置之后。
有关更多信息,请参阅 容器工厂。
错误处理反序列化器
您现在可以在这个反序列化器中添加一个Validator; 如果代理Deserializer成功地反序列化了对象,但该对象验证失败,则会抛出类似反序列化异常的错误。
这使得原始数据能够传递给错误处理器。参见使用ErrorHandlingDeserializer获取更多信息。
重试主题
将后缀-retry-5000改为-retry时,请在@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)处操作。
若要保持后缀-retry-5000,请使用@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")。
更多信息请参见主题命名。
监听器容器变更
当手动分配分区时,使用一个null消费者group.id,现在的AckMode会自动转换为MANUAL。
有关更多信息,请参阅手动分配所有分区。
自从2.9以来的3.0新功能
精确一次语义
EOSMode.V1 (aka ALPHA) 不再受支持。
| 在使用事务时,最小的broker版本是2.5。 |
见 Exactly Once Semantics 和 KIP-447 了解更多。
观察
启用 Micrometer 对定时器和跟踪的观察功能现在已受支持。请参阅观察获取更多信息。
原生镜像
支持创建原生映像。
请参阅 原生映像 了解更多信息。
全局单嵌入Kafka
The embedded Kafka (EmbeddedKafkaBroker) can now be start as a single global instance for the whole test plan.
See Using the Same Broker(s) for Multiple Test Classes for more information.
重试主题变更
此功能不再被视为实验性(就其API而言),该功能自2.7版本以来就已经支持,但可能会有大于正常的API变更可能性。
The bootstrapping of Non-Blocking Retries infrastructure beans has changed in this release to avoid some timing problems that occurred in some application regarding application initialization.
您现在可以为重试容器设置不同的concurrency;默认情况下,并发数与主容器相同。
@RetryableTopic 可以作为自定义注解上的元注解使用,包括支持 @AliasFor 属性。
见 配置 以获取更多信息。
The default replication factor for the retry topics is now -1 (use broker default).
If your broker is earlier that version 2.4, you will now need to explicitly set the property.
您现在可以在同一个应用上下文中为同一主题配置多个@RetryableTopic监听器。
在此之前,这并不是可能的。
更多详细信息请参阅多个监听器,同一主题(或多个)。
在RetryTopicConfigurationSupport版本中,存在打破API的更改;具体来说,如果你重写了bean定义方法对于destinationTopicResolver、kafkaConsumerBackoffManager和/或retryTopicConfigurer;这些方法现在需要一个ObjectProvider<RetryTopicComponentFactory>参数。
监听器容器变更
事件与消费者身份验证和授权失败相关现在由容器发布。 请参阅应用程序事件以获取更多信息。
您可以现在自定义消费者线程使用的线程名称。 请参阅 容器线程命名 以获取更多信息。
The container property restartAfterAuthException has been added.
See Listener Container Properties for more information.
KafkaTemplate变更
该类返回的未来值现在是CompletableFuture而不是ListenableFuture。
请参见使用KafkaTemplate.
ReplyingKafkaTemplate变更
此类别返回的未来对象现在是 CompletableFuture 秒,而不是 ListenableFuture 秒。 见 使用 ReplyingKafkaTemplate 和 带有 Message<?> 秒的请求/回复.
@KafkaListener变更
您现在可以使用自定义相关标头,该标头将在任何回复消息中重复显示。
请参阅使用ReplyingKafkaTemplate一文末尾的备注以获取更多信息。
你现在可以手动提交批处理中的部分消息,而无需等待整个批处理完成。更多详情请参阅提交偏移量。
KafkaHeaders变更
在2.9.x版本中被弃用的KafkaHeaders中的四个常量现在已被移除。
-
请将
MESSAGE_KEY改为KEY。 -
改用
PARTITION,而不是PARTITION_ID
同样地,RECEIVED_MESSAGE_KEY 被替换为 RECEIVED_KEY,而 RECEIVED_PARTITION_ID 被替换为 RECEIVED_PARTITION。
测试更改
Version 3.0.7 引入了 MockConsumerFactory 和 MockProducerFactory。
See 模拟消费者和生产者 以获取更多信息。
从版本 3.0.10 开始,嵌入式 Kafka 中继器,默认情况下会将 Spring Boot 属性 spring.kafka.bootstrap-servers 设置为嵌入式中继器的地址。
自2.8以来,2.9的新功能
错误处理器更改
The DefaultErrorHandler 现在可以配置为暂停容器一次拉取操作,并使用之前拉取操作剩余的结果,而不是跳转到剩余记录的偏移量。
有关更多信息,请参阅 DefaultErrorHandler。
The DefaultErrorHandler 现在有一个 BackOffHandler 属性。
See Back Off Handlers 以获取更多信息。
监听器容器变更
interceptBeforeTx 现在可以与所有事务管理器一起工作(之前它仅应用于 KafkaAwareTransactionManager)。
请参见 [前置事务拦截].
pauseImmediate 是一个新的容器属性,它使容器能够在处理完当前记录后暂停消费者,而不是在上一次轮询的所有记录都被处理完毕后再暂停。
请参阅 [pauseImmediate].
事件相关的消费者身份验证和授权
Header Mapper 更改
现在可以配置应该映射哪些入站标头。 也可以在版本 2.8.8 或更新的版本中使用。 有关更多信息,请参阅 消息标头。
KafkaTemplate变更
在3.0版本中,此类返回的未来将为CompletableFuture而不是ListenableFuture。
请参见使用KafkaTemplate以获取在此发布版本过渡时的帮助。
ReplyingKafkaTemplate变更
The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized.
Also available in version 2.8.8 or later.
See 使用ReplyingKafkaTemplate.
在3.0版本中,此类返回的未来将为CompletableFuture而不是ListenableFuture。有关在此版本下进行迁移的帮助,请参阅使用ReplyingKafkaTemplate和与Message<?>s进行请求/回复。
自2.7版本以来的新功能2.8
这个部分介绍了从版本 2.7 到版本 2.8 的更改。早期版本的更改,请参阅变更历史记录。
包变更
类型映射相关的类和接口已经从…support.converter移至…support.mapping。
-
AbstractJavaTypeMapper -
ClassMapper -
DefaultJackson2JavaTypeMapper -
Jackson2JavaTypeMapper
乱序的手动提交
The listener container can now be configured to accept manual offset commits out of order (usually asynchronously). The container will defer the commit until the missing offset is acknowledged. See Manually Committing Offsets for more information.
@KafkaListener变更
现在可以在方法本身上指定监听器方法是否为批量监听器。 这使得同一个容器工厂可以用于记录和批量监听器的配置。
See [批量监听器] for more information.
批处理监听器现在可以处理转换异常。
See 批量错误处理程序中的转换错误 以获取更多信息。
RecordFilterStrategy, 当与批处理监听器结合使用时,现在可以在一次调用中过滤整个批处理。
有关更多信息,请参阅[批处理监听器]一文末尾的说明。
The @KafkaListener 注解现在具有 filter 属性,用于覆盖此监听器容器工厂的 RecordFilterStrategy。
The @KafkaListener 注解现在具有 info 属性;此属性用于填充新的监听器容器属性 listenerInfo。
这然后被用来在每个记录中生成一个 KafkaHeaders.LISTENER_INFO 头部,该头部可以在 RecordInterceptor、RecordFilterStrategy 或者监听器本身中使用。
有关更多信息,请参阅 监听器信息头部 和 AbstractMessageListenerContainer 属性。
KafkaTemplate变更
你现在可以基于主题、分区和偏移量来接收单条记录。更多详情请参阅使用KafkaTemplate接收。
CommonErrorHandler添加
The legacy GenericErrorHandler 和其子接口层次结构用于记录和批处理监听器已被一个新的单一接口 CommonErrorHandler 取代,该新接口有对应于大多数遗留的 GenericErrorHandler 实现的实现。
请参阅 容器错误处理器 和 将自定义遗留错误处理程序实现迁移到 CommonErrorHandler 以获取更多信息。
监听器容器变更
The interceptBeforeTx 容器属性现在默认为 true。
The authorizationExceptionRetryInterval 属性已被重命名为 authExceptionRetryInterval,现在适用于除 AuthorizationException 之外的 AuthenticationException。
两种异常都被视为致命错误,默认情况下容器会停止运行,除非设置了该属性。
见 使用 KafkaMessageListenerContainer 和 侦听器容器属性 以获取更多信息。
序列化/反序列化更改
The DelegatingByTopicSerializer 和 DelegatingByTopicDeserializer 现在已提供。
See 委托序列化器和反序列化器 以获取更多详细信息.
DeadLetterPublishingRecover变更
The property stripPreviousExceptionHeaders is now true by default.
现在有几种技术可以自定义添加到输出记录中的哪些标头。
见 Managing Dead Letter Record Headers 的更多信息。
重试主题变更
现在您可以使用同一个工厂来处理可重试和不可重试的主题。查看指定ListenerContainerFactory的更多信息.
现在有一个可管理的全局致命异常列表,将会使失败记录直接进入DLT。 请参阅异常分类器以了解如何进行管理。
您现在可以将阻塞和非阻塞重试结合起来使用。
请参阅结合阻塞和非阻塞重试以获取更多信息。
The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. See Changing KafkaBackOffException Logging Level if you need to change the logging level back to WARN or set it to any other level.
变化从 2.6 到 2.7
Kafka 客户端版本
此版本需要2.7.0 kafka-clients。
它还与自2.7.1版本以来的2.8.0客户端兼容;请参阅覆盖Spring Boot依赖项。
非阻塞延迟重试机制使用主题
在本次发布中添加了这一重要新功能。 当严格顺序不重要时,失败的递送可以被发送到另一个主题供以后消费。 可以配置一系列这样的重试主题,带有不断增加的延迟。 有关更多信息,请参见非阻塞重试。
监听器容器变更
The onlyLogRecordMetadata 容器属性现在默认为 true。
一个新的容器属性stopImmediate现在可用。
See Listener Container Properties for more information.
使用一个BackOff在重试尝试之间的错误处理程序(例如:SeekToCurrentErrorHandler和DefaultAfterRollbackProcessor)现在会在容器停止后不久退出退避间隔,而不是延迟停止。
错误处理器和回滚后处理程序,如果它们扩展自FailedRecordProcessor,现在可以配置一个或多个RetryListener以接收关于重试和恢复进度的信息。
The RecordInterceptor 现在在监听器返回(正常或通过抛出异常)后还具有其他方法。
它还具有一個子接口 ConsumerAwareRecordInterceptor。
此外,现在为批处理监听器提供了一个 BatchInterceptor。
有关更多详细信息,请参阅 消息监听容器。
@KafkaListener变更
你现在可以验证@KafkaHandler方法(类级别侦听器)的参数负载。@KafkaListener @Payload 验证中包含更多信息。
您现在可以将rawRecordHeader属性设置在MessagingMessageConverter和BatchMessagingMessageConverter上,这会导致原始的ConsumerRecord被添加到转换后的Message<?>中。
这对于例如在监听器错误处理程序中使用DeadLetterPublishingRecoverer时非常有用。
请参阅监听器错误处理程序以获取更多信息。
您现在可以在应用程序初始化时修改@KafkaListener注解。
有关更多信息,请参见@KafkaListener 属性修改。
DeadLetterPublishingRecover变更
现在,如果键和值的反序列化都失败,则会将原始值发布到DLT。
此前,值会被填充但键DeserializationException仍然保留在头部。
存在打破API的更改,如果您继承了恢复器并重写了createProducerRecord方法。
此外,恢复器会在发布到目标解析器之前验证由目标解析器选择的分区是否确实存在。
See 发布死信记录 以获取更多信息。
ChainedKafkaTransactionManageris Deprecated
见 事务 了解更多信息。
ReplyingKafkaTemplate变更
现在有一个机制可以检查回复并在将来如果存在某些条件时异常失败。
已添加发送和接收spring-messagingMessage<?>的功能。
见 使用 ReplyingKafkaTemplate 以获得更多信息。
Kafka Streams 变更
默认情况下,StreamsBuilderFactoryBean现在配置为不清理本地状态。
请参阅配置以获取更多详细信息。
KafkaAdmin变更
新方法 createOrModifyTopics 和 describeTopics 已被添加。
KafkaAdmin.NewTopics 用于在单个 Bean 中配置多个主题,请参阅 [配置主题] 获取更多信息。
MessageConverter变更
现在可以向MessagingMessageConverter添加一个spring-messaging或SmartMessageConverter,从而根据contentType头部进行内容协商。
有关更多信息,请参阅Spring Messaging 消息转换。
序列化@KafkaListener s
查看 启动 @KafkaListeners 在序列中 以获取更多信息。
ExponentialBackOffWithMaxRetries
一个新的BackOff实现被提供出来,使得配置最大重试次数更加方便。
请参阅ExponentialBackOffWithMaxRetries 实现以获取更多信息。
条件委托错误处理器
这些新的错误处理器可以配置为根据异常类型委托给不同的错误处理器。 有关更多信息,请参阅 委托的错误处理器。
变更说明从 2.5 版到 2.6 版
监听器容器变更
各种错误处理器(继承自FailedRecordProcessor)和DefaultAfterRollbackProcessor现在在恢复失败时会重置BackOff。此外,您现在可以根据失败记录和/或异常来选择要使用的BackOff。
您现在可以在容器属性中配置一个adviceChain。
有关更多信息,请参阅监听器容器属性。
当容器配置为发布ListenerContainerIdleEvents时,现在在发送记录后会发布一个ListenerContainerNoLongerIdleEvent而不是空闲事件。
请参阅应用事件和检测空闲和非响应式消费者以获取更多信息。
@KafkaListener 变更
当使用手动分区分配时,现在可以指定通配符以确定应将哪些分区重置为初始偏移量。
此外,如果监听器实现ConsumerSeekAware,那么在手动分配后会调用onPartitionsAssigned()。
(此功能也在版本2.5.5中添加)。
请参阅显式分区分配以获取更多信息。
已向AbstractConsumerSeekAware添加了方便的方法,以使定位更加容易。
有关更多信息,请参阅[定位]。
错误处理器更改
子类的FailedRecordProcessor(例如:SeekToCurrentErrorHandler、DefaultAfterRollbackProcessor、RecoveringBatchErrorHandler)现在可以配置为如果异常类型与之前记录中的类型不同,则重置重试状态。
Producer Factory Changes
您现在可以设置生产者最大存活时间,在此之后,它们将被关闭并重新创建。
请参阅事务以获取更多信息。
您现在可以在创建了 `0` 之后更新配置映射。这在例如凭证更改后需要更新 SSL 密钥/信任库位置时可能会很有用。更多信息,请参阅 使用 `DefaultKafkaProducerFactory`.
Spring框架2.4与2.5之间的更改
此部分涵盖了从版本2.4到版本2.5所做的更改。对于早期版本的变更历史,请参见变更历史。
消费者/生产者工厂更改
The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed. Implementations for native Micrometer metrics are provided. See Factory Listeners for more information.
您现在可以在运行时更改 Bootstrap 服务器属性,从而启用向另一个 Kafka 集群的故障转移。了解连接到 Kafka 的更多信息。
StreamsBuilderFactoryBean变更
The factory bean can now invoke a callback whenever a KafkaStreams created or destroyed.
An Implementation for native Micrometer metrics is provided.
See KafkaStreams Micrometer 支持 for more information.
配送尝试标题
现在在使用某些错误处理器和事务回滚处理器时,可以添加一个跟踪投递尝试的头部选项。 有关更多信息,请参阅 投递尝试头部。
@KafkaListener 变更
默认情况下,如果返回类型为@KafkaListener且需要填充默认回复头,则会自动填充这些头。
请参见Reply Type Message<?>以获取更多信息。
The 0 是不再填充为 1 值的,当传入记录包含 2 键时;头部完全省略了。
@KafkaListener 方法现在可以指定一个 ConsumerRecordMetadata 参数,而不是使用主题、分区等元数据的独立头。
更多信息,请参见 消费者记录元数据。
监听器容器变更
The assignmentCommitOption容器属性现在默认为LATEST_ONLY_NO_TX。
请参阅监听器容器属性以获取更多信息。
The subBatchPerPartition容器属性现在默认为true当使用事务时。
更多详情请参见事务。
一个全新的RecoveringBatchErrorHandler现在提供了。
静态组成员资格现在受支持。 请参阅消息监听容器以获取更多信息。
当增量/协作重新平衡被配置时,如果偏移量因非致命错误RebalanceInProgressException而无法提交,在重新平衡完成后,容器将尝试为仍分配给此实例的分区重试提交偏移量。
The default error handler is now the SeekToCurrentErrorHandler for record listeners and RecoveringBatchErrorHandler for batch listeners.
See 容器错误处理器 for more information.
现在您可以控制标准错误处理程序故意抛出的异常的日志记录级别。更多信息请参阅容器错误处理程序。
The getAssignmentsByClientId()方法已被添加,使得在并发容器中更易于确定哪些消费者被分配了哪个分区。
请参阅监听器容器属性以获取更多信息。
您现在可以抑制在错误、调试日志等中记录整个ConsumerRecords。
参见监听器容器属性中的onlyLogRecordMetadata。
KafkaTemplate 变更
The KafkaTemplate 现在可以维护 micrometer 计时器。
见 监控 以获取更多详细信息.
The KafkaTemplate 现在可以配置 ProducerConfig 属性以覆盖生产者工厂中的那些属性。
参见 使用 KafkaTemplate 以获取更多详细信息。
一个 RoutingKafkaTemplate 现在已经被提供。
请参见 使用 RoutingKafkaTemplate 以获取更多信息。
您现在可以使用KafkaSendCallback而不是ListenerFutureCallback来获取更窄的异常,从而更容易提取失败的ProducerRecord。
请参阅使用KafkaTemplate以获取更多详细信息。
Kafka 字符串序列化器/反序列化器
New ToStringSerializer/StringDeserializers as well as an associated SerDe are now provided.
See 字符串序列化 for more information.
JsonDeserializer
The JsonDeserializer 现在更具灵活性,可以确定反序列化类型。
请参见 通过方法来确定类型 以获取更多信息。
委托序列化器/反序列化器
The DelegatingSerializer 现在可以处理“标准”类型,当传出记录没有头部时。
更多信息,请参见 委托序列化器和反序列化器。
测试更改
The KafkaTestUtils.consumerProps() helper record now sets ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest by default.
See JUnit for more information.
变化在2.3和2.4之间
ConsumerAwareRebalanceListener
像 ConsumerRebalanceListener 这个接口现在有了一个新的方法 onPartitionsLost。
请参阅 Apache Kafka 文档以获取更多信息。
不同于ConsumerRebalanceListener,默认实现不会调用onPartitionsRevoked。相反,监听器容器会在调用完onPartitionsLost之后再调用该方法;因此,在实现ConsumerAwareRebalanceListener时不应做同样的事情。
见 Rebalancing Listeners 部分末尾的重要说明以获得更多信息。
KafkaTemplate
The KafkaTemplate 现在支持非事务性发布与事务性发布并行。
请参阅 KafkaTemplate 事务性和非事务性发布 获取更多详细信息。
AggregatingReplyingKafkaTemplate
The releaseStrategy is now a BiConsumer.
It is now called after a timeout (as well as when records arrive); the second parameter is true in the case of a call after a timeout.
见聚合多个回复了解更多信息。
监听器容器
The 0 provides an 1 option to let the listener container to retry after any 2 is thrown by the 3.
See its JavaDocs and Using KafkaMessageListenerContainer for more information.
@KafkaListener
The @KafkaListener annotation has a new property splitIterables; default true.
When a replying listener returns an Iterable this property controls whether the return result is sent as a single record or a record for each element is sent.
See Forwarding Listener Results using @SendTo for more information
批处理监听器现在可以配置为BatchToRecordAdapter; 这意味着,例如,批处理可以在事务中进行处理,而监听器一次接收一条记录。
使用默认实现时,可以使用ConsumerRecordRecoverer来在批次内部处理错误,而不中断整个批次的处理 - 在使用事务的情况下这可能很有用。
请参阅 批处理监听器中的事务 以获取更多信息。
Kafka Streams
The StreamsBuilderFactoryBean 接受一个新属性 KafkaStreamsInfrastructureCustomizer。
这允许在创建流之前配置构建器和/或拓扑结构。有关更多信息,请参阅 Spring 管理。
变化概述
此部分涵盖了从版本 2.2 到版本 2.3 的更改。
提示, 技巧和示例
一个新章节 提示、技巧和示例 已添加。 请提交 GitHub 问题和/或拉取请求以在该章节中增加更多条目。
配置更改
自2.3.4版本起,missingTopicsFatal容器属性默认为false。
当此设置为true时,如果代理器宕机,则应用程序将无法启动;许多用户都受到了这一更改的影响;鉴于Kafka是一个高可用性平台,我们并未预料到没有活动代理器的情况下启动应用程序会是一种常见的用例。
Producer and Consumer Factory Changes
The 0可以现在被配置为每个线程创建一个生产者。你也可以在构造函数中提供1个实例,作为另一种选择,要么使用配置的类(这需要无参构造函数),或者使用2个实例进行构造,这些实例将在所有生产者之间共享。 更多信息请参阅使用0。
在带有Supplier<Deserializer>个实例的DefaultKafkaConsumerFactory中同样可以使用这个选项。
有关更多信息,请参阅KafkaMessageListenerContainer的用法。
监听器容器变更
此前,当使用监听器适配器(例如2s)调用监听器时,错误处理程序接收到的参数为ListenerExecutionFailedException(实际监听器异常作为cause)。
由原生GenericMessageListener抛出的异常未修改地传递给错误处理程序。
现在,错误处理程序始终接收一个ListenerExecutionFailedException参数(实际监听器异常作为cause),这提供了访问容器的group.id属性的能力。
因为监听容器有自己的提交偏移量的机制,它更偏好于Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG被设置为false。
现在,除非在消费者工厂或容器的消费者属性中明确设置,否则它会自动设置为false。
The ackOnError 属性现在默认为 false。
现在可以在监听器方法中获取消费者的group.id属性。
有关更多信息,请参阅取得消费者group.id。
The container has a new property recordInterceptor allowing records to be inspected or modified before invoking the listener.
A CompositeRecordInterceptor is also provided in case you need to invoke multiple interceptors.
See Message Listener Containers for more information.
The ConsumerSeekAware 有新的方法,允许你在开始、结束或当前位置进行跳转,并且可以跳转到大于或等于时间戳的第一个偏移量。
请参见 [seek] 获取更多详细信息。
一个方便类 AbstractConsumerSeekAware 现在提供了,用于简化查找操作。
参见 [查找] 以获取更多信息.
The ContainerProperties 提供了一个 idleBetweenPolls 选项,让监听器容器的主要循环在两次 KafkaConsumer.poll() 调用之间暂停。
请参阅其 JavaDocs 和 使用 KafkaMessageListenerContainer 获取更多详细信息。
当使用AckMode.MANUAL(或MANUAL_IMMEDIATE)时,你现在可以通过调用nack来触发重传,在Acknowledgment中可以找到更多相关信息。
提交偏移量
The containers now publish additional consumer lifecycle events relating to startup. See Application Events for more information.
事务性批处理监听器现在可以支持僵尸隔离。 更多详情请参见事务。
The listener container factory can now be configured with a ContainerCustomizer to further configure each container after it has been created and configured.
See Container factory for more information.
错误处理器更改
The SeekToCurrentErrorHandler 现在将某些异常视为致命的,并在首次失败时禁用重试,调用恢复器。
The SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler can now be configured to apply a BackOff (thread sleep) between delivery attempts.
从版本 2.3.2 开始,当错误处理程序在恢复失败记录后返回时,已恢复记录的偏移量将被提交。
The `DeadLetterPublishingRecoverer`,当与一个`1`结合使用时,现在设置了发送到死信主题的消息载荷为原本无法反序列化的原始值。此前,它是`2`,用户代码需要从消息头中提取`3`。
更多详细信息请参见发布死信记录。
TopicBuilder
一个新类TopicBuilder提供了一种更方便的方法来创建NewTopic@Beans,用于自动主题分配。
更多信息,请参阅[配置主题]。
Kafka Streams 变更
您现在可以对由@EnableKafkaStreams创建的StreamsBuilderFactoryBean进行额外的配置。
参见流配置以获取更多信息。
A RecoveringDeserializationExceptionHandler 现在提供了一个允许记录具有反序列化错误的状态,以便从这些错误中恢复。它可以用作将这些记录发送到死信主题的 DeadLetterPublishingRecoverer 的辅助工具。有关更多信息,请参阅 从反序列化异常恢复。
The HeaderEnricher 转换器已提供,使用SpEL生成头部值。
See Header Enricher for more information.
The MessagingTransformer 已被提供。
这使得 Kafka 流处理拓扑能够与一个 spring-messaging 组件进行交互,如 Spring Integration 流。
请参阅 MessagingProcessor 和 See [从 KStream 调用一个 Spring Integration 流] 以获取更多信息。
JSON组件更改
现在所有JSON感知组件默认使用Jackson生成的ObjectMapper进行配置。
JacksonUtils.enhancedObjectMapper() 现在提供了基于TypeReference 的构造函数,以便更好地处理目标泛型容器类型。
此外还引入了JacksonMimeTypeModule 用于将org.springframework.util.MimeType 序列化为普通字符串。
有关更多信息,请参见其Java文档和序列化、反序列化及消息转换。
一个 ByteArrayJsonMessageConverter 已提供,同时还有一个所有 Json 转换器的新超类,JsonMessageConverter。
此外,现在有一个 StringOrBytesSerializer;它可以将 byte[]、Bytes 和 String 值转换为 ProducerRecords。
有关更多信息,请参阅 Spring Messaging 消息转换。
The JsonSerializer, JsonDeserializer and JsonSerde now have fluent APIs to make programmatic configuration simpler.
See the javadocs, 序列化、反序列化和消息转换, and 流的JSON序列化和反序列化 for more information.
AggregatingReplyingKafkaTemplate
扩展ReplyingKafkaTemplate通过从多个接收者聚合回复。
参见聚合多个回复以获取更多信息。
事务更改
您现在可以在KafkaTemplate和KafkaTransactionManager上覆盖生产者工厂的transactionIdPrefix。
有关更多信息,请参阅:transactionIdPrefix。
新的委托序列化器/反序列化器
The framework now provides a delegating Serializer and Deserializer, utilizing a header to enable producing and consuming records with multiple key/value types.
See Delegating Serializer and Deserializer for more information.
新重试反序列化器
The framework now provides a delegating RetryingDeserializer, to retry serialization when transient errors such as network problems might occur.
See Retrying Deserializer for more information.
2.1 与 2.2 版本之间的变更
类和包的更改
The ContainerProperties class has been moved from org.springframework.kafka.listener.config to org.springframework.kafka.listener.
The AckMode 枚举已被从 AbstractMessageListenerContainer 移动到 ContainerProperties。
The setBatchErrorHandler() 和 setErrorHandler() 方法已被从 ContainerProperties 移动到 AbstractMessageListenerContainer 和 AbstractKafkaListenerContainerFactory。
回滚处理后
新的AfterRollbackProcessor策略已提供。
请参阅回滚后处理器以获取更多详细信息。
ConcurrentKafkaListenerContainerFactory变更
您现在可以使用ConcurrentKafkaListenerContainerFactory来创建和配置任何ConcurrentMessageListenerContainer,而不仅仅是针对@KafkaListener注解的。
有关更多信息,请参阅容器工厂。
监听器容器变更
一个新的容器属性(missingTopicsFatal)已被添加。
更多详情请参阅 使用 KafkaMessageListenerContainer。
一个 ConsumerStoppedEvent 现在会在消费者停止时发出。
有关更多信息,请参见 线程安全性。
批处理监听器可以选择接收完整的ConsumerRecords<?, ?>对象,而不是List<ConsumerRecord<?, ?>。
有关更多信息,请参阅[批量监听器]。
The DefaultAfterRollbackProcessor 和 SeekToCurrentErrorHandler 现在可以恢复(跳过)持续失败的记录,并且默认情况下会在 10 次失败后进行此操作。它们可以配置为将失败的记录发布到死信主题。
使用版本2.2.4后,可以在选择死信主题名称时使用消费者的组ID。
The ConsumerStoppingEvent 已经被添加。
See 应用事件 以获取更多信息。
The SeekToCurrentErrorHandler 现在可以配置为在容器使用 AckMode.MANUAL_IMMEDIATE 配置时提交恢复记录的偏移量(自 2.2.4 版起).
@KafkaListener 变更
您现在可以通过在注解上设置属性来覆盖监听器容器工厂的concurrency和autoStartup属性。
您可以添加配置以确定哪些(如果有的话)标头被复制到回复消息中。
有关更多信息,请参阅@KafkaListener注解。
您现在可以使用@KafkaListener作为元注解在自定义注解上。
请参阅@KafkaListener作为元注释以获取更多信息。
现在为Validator配置@Payload验证变得更加容易。
更多详情请参见@KafkaListener @Payload验证。
您现在可以直接在注解上指定 Kafka 消费者属性;这些属性将覆盖消费者工厂中同名定义的任何属性(自版本 2.2.4 开始)。 有关更多信息,请参阅 注解属性。
Header Mapping Changes
Headers of type MimeType 和 MediaType 现在映射为简单的字符串类型,在RecordHeader值中。
之前,它们被映射为 JSON,并且只有MimeType会被解码。MediaType无法被解码。
现在为了互操作性,它们是简单的字符串。
Also, the DefaultKafkaHeaderMapper has a new addToStringClasses method, allowing the specification of types that should be mapped by using toString() instead of JSON.
See Message Headers for more information.
嵌入式Kafka更改
The KafkaEmbedded 类及其 KafkaRule 接口已被弃用,改用了 EmbeddedKafkaBroker 和其 JUnit 4 EmbeddedKafkaRule 包装器。
The @EmbeddedKafka 注解现在填充一个 EmbeddedKafkaBroker 实例而不是弃用的 KafkaEmbedded 实例。
This change allows the use of @EmbeddedKafka 在 JUnit 5 测试中。
The @EmbeddedKafka 注解现在有一个属性 ports 来指定填充 EmbeddedKafkaBroker 的端口。请参阅 测试应用程序 获取更多信息。
Spring 框架中的 Java 开发者增强功能
您可以现在通过使用生产者和消费者属性来提供类型映射信息。
新的反序列化构造函数可供使用,允许通过提供的目标类型覆盖类型头部信息。
The JsonDeserializer 现在默认移除任何类型信息头部。
您现在可以通过使用一个Kafka属性来配置JsonDeserializer以忽略类型信息头(从2.2.3版本开始)。
See 序列化、反序列化和消息转换 以获取更多详细信息。
Kafka Streams 变更
Spring框架中,流配置bean现在必须是一个KafkaStreamsConfiguration对象,而不是一个StreamsConfig对象。
StreamsBuilderFactoryBean已被从包…core移动到…config。
当基于KStream实例构建条件分支时,为了提高最终用户体验,已经引入了KafkaStreamBrancher。
See Apache Kafka Streams 支持 and 配置 for more information.
事务标识
当监听容器启动事务时,transactional.id 现在是 transactionIdPrefix 附加上 <group.id>.<topic>.<partition>。
这个变化允许正确隔离僵尸对象,如这里所述。
2.0 与 2.1 版本之间的变更
JSON改进
The StringJsonMessageConverter 和 JsonSerializer 现在在 Headers 中添加类型信息,让转换器和 JsonDeserializer 在接收时根据消息本身创建特定的类型,而不是固定配置的类型。
更多详细信息,请参阅 序列化、反序列化和消息转换。
容器停止错误处理程序
容器错误处理程序现在为记录和批量侦听器提供,将任何由侦听器抛出的异常视为致命错误。它们会停止容器。 请参阅 处理异常 以获取更多信息。
暂停和恢复容器
The listener containers now have pause() and resume() methods (since version 2.1.3).
See Pausing and Resuming Listener Containers for more information.
有状态重试
从版本2.1.3开始,您可以配置状态化重试。 更多信息,请参见状态化重试。
客户端ID
从 2.1.1 版本开始,你现在可以在 @KafkaListener 前面设置 client.id 前缀。
此前,要自定义客户端 ID,你需要为每个监听器分别配置一个消费者工厂(和容器工厂)。
前缀会在使用并发时自动后接 -n 以提供唯一的客户端 ID。
日志偏移提交
默认情况下,主题偏移提交的日志记录使用DEBUG日志级别进行。
从2.1.2版本开始,在ContainerProperties中新增了一个名为commitLogLevel的属性,允许您指定这些消息的日志级别。
有关更多信息,请参阅使用KafkaMessageListenerContainer。
默认 @KafkaHandler
从版本 2.1.3 开始,您可以在类级别的注解中指定一个 @KafkaHandler 注解作为默认值。更多详细信息请参阅 @KafkaListener 在类级别上的使用。
ReplyingKafkaTemplate
自版本 2.1.3 起,提供了一个子类 KafkaTemplate 来支持请求/回复语义。
请参阅 使用 ReplyingKafkaTemplate 获取更多详细信息。
迁移指南从2.0版本
请参阅 从 2.0 到 2.1 的迁移 指南。
变化从1.3到2.0
@KafkaListener变更
您现在可以使用@KafkaListener注解方法(以及类和@KafkaHandler方法)来使用@SendTo。如果该方法返回结果,则会将其转发到指定的主题。有关更多信息,请参阅通过@SendTo转发监听器结果。
消息监听器
Message listeners can now be aware of the Consumer object.
See [message-listeners] for more information.
使用ConsumerAwareRebalanceListener
rebalance监听器现在可以在重新均衡通知期间访问Consumer对象。
有关更多信息,请参阅重新均衡监听器。
变化从1.2到1.3
支持事务处理
0.11.0.0 客户端库增加了对事务的支持。
KafkaTransactionManager 和其他事务支持也已添加。
有关更多信息,请参见 事务。
支持标头
0.11.0.0 客户端库增加了对消息头的支持。
这些内容现在可以映射到 spring-messaging 和 MessageHeaders 之间。
有关更多信息,请参阅 消息头。
Kafka 时间戳支持
KafkaTemplate 现支持添加带有时间戳的记录的 API。
新发布的 KafkaHeaders 包含了对于 timestamp 的支持。
此外,还新增了 KafkaConditions.timestamp() 和 KafkaMatchers.hasTimestamp() 测试工具。
请参阅 使用 KafkaTemplate、@KafkaListener 注解 以及 测试应用程序 以获取更多详情。
@KafkaListener变更
你现在已经可以配置一个KafkaListenerErrorHandler来处理异常。更多详情请参阅处理异常。
默认情况下,现在使用@KafkaListenerid属性作为group.id属性,覆盖在消费者工厂中配置的相应属性(如果存在的话)。
此外,您还可以显式地在注解上配置groupId。
以前,如果您需要使用不同的group.id值为监听器设置不同的容器工厂和消费者工厂。
为了恢复之前的使用在工厂中配置的group.id的行为,请将注解上的idIsGroup属性设置为false。
@EmbeddedKafka注解
为了方便,提供了一个测试类级别的@EmbeddedKafka注解,用于将KafkaEmbedded注册为一个bean。
有关更多信息,请参见测试应用程序。
Kerberos 配置
现在提供了配置Kerberos的支持。 请参阅JAAS和Kerberos以获取更多信息。
变化于1.0和1.1之间
寻求
您现在可以查找每个主题或分区的位置。 您可以使用此功能在初始化期间设置初始位置,当使用组管理且Kafka分配了分区时。 您也可以在检测到空闲容器时进行查找或在应用程序执行的任意一点。 有关更多信息,请参阅 [seek]。