此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
更改历史
自 3.2 以来 3.3 的新增功能
本节介绍从 3.2 版到 3.3 版所做的更改。 有关早期版本的更改,请参阅更改历史记录。
DLT 主题命名约定
DLT 主题的命名约定已标准化,可始终使用“-dlt”后缀。此更改可确保兼容性并避免在不同重试解决方案之间转换时发生冲突。希望保留“.DLT“后缀行为需要通过设置适当的 DLT 名称属性来显式选择加入。
增强消费者群体的 Seek作
一种新方法getGroupId()
,已添加到ConsumerSeekCallback
接口。
此方法允许通过仅针对所需的使用者组来进行更具选择性的查找作。
这AbstractConsumerSeekAware
现在还可以在多组侦听器场景中注册、检索和删除每个主题分区的所有回调,而不会遗漏任何回调。
请参阅新的 API (getSeekCallbacksFor(TopicPartition topicPartition)
,getTopicsAndCallbacks()
)了解更多详情。
有关更多详细信息,请参阅 Seek API 文档。
使用 RecordFilterStrategy 在 Kafka 侦听器中可配置地处理空批次
RecordFilterStrategy
现在支持忽略过滤产生的空批次。
这可以通过覆盖默认方法进行配置ignoreEmptyBatch()
,默认为 false,确保KafkaListener
即使所有ConsumerRecords
被过滤掉。
有关详细信息,请参阅消息接收过滤文档。
ConcurrentContainerStoppedEvent
这ConcurentContainerMessageListenerContainer
现在发出一个ConcurrentContainerStoppedEvent
当其所有子容器都停止时。
有关更多详细信息,请参阅应用程序事件和ConcurrentContainerStoppedEvent
Javadocs。
原始记录密钥回复
使用时ReplyingKafkaTemplate
,如果请求中的原始记录包含键,则同一键也将成为回复的一部分。
有关更多详细信息,请参阅参考文档的发送消息部分。
在 DeadLetterPublishingRecovererFactory 中自定义日志记录
使用时DeadLetterPublishingRecovererFactory
,则用户应用程序可以覆盖maybeLogListenerException
方法来自定义日志记录行为。
批量侦听器的KafkaHeaders.DELIVERY_ATTEMPT
使用BatchListener
这ConsumerRecord
可以有KafkaHeaders.DELIVERY_ATTMPT
header 在其 headers 字段中。
如果DeliveryAttemptAwareRetryListener
设置为错误处理程序作为重试侦听器,每个ConsumerRecord
has delivery attempt 标头。
有关更多详细信息,请参阅 Kafka Headers for Batch Listener。
Kafka 指标侦听器和TaskScheduler
这MicrometerProducerListener
,MicrometerConsumerListener
和KafkaStreamsMicrometerListener
现在可以使用TaskScheduler
.
看KafkaMetricsSupport
JavaDocs 和 Micrometer 支持了解更多信息。
3.2 自 3.1 以来的新增功能
本节介绍从 3.1 版到 3.2 版所做的更改。 有关早期版本的更改,请参阅更改历史记录。
Kafka 客户端版本
此版本需要 3.7.0kafka-clients
.
Kafka 客户端 3.7.0 版本引入了新的消费者组协议。
有关更多详细信息及其局限性,请参阅 KIP-848。
新的消费者组协议是抢先体验版本,不打算在生产中使用。
在此版本中仅建议用于测试目的。
因此,Spring for Apache Kafka 仅在kafka-client
本身。
默认情况下,Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过group.protocol
消费者的财产。
测试支持更改
这kraft
模式在EmbeddedKafka
默认情况下,想要使用kraft
mode 必须启用它。
这是由于在使用EmbeddedKafka
在kraft
模式,尤其是在测试新的消费者组协议时。
新的使用者组协议仅在以下情况下受支持kraft
模式,因此,在测试新协议时,需要针对真实的 Kafka 集群而不是基于KafkaClusterTestKit
哪EmbeddedKafka
是基于的。
此外,在运行多个KafkaListener
方法EmbeddedKafka
在kraft
模式。
在这些问题得到解决之前,kraft
默认值EmbeddedKafka
将保持为false
.
Kafka Streams 交互式查询支持
新 APIKafkaStreamsInteractiveQuerySupport
用于访问 Kafka Streams 交互式查询中使用的可查询存储。
有关更多详细信息,请参阅 Kafka Streams 交互式支持。
TransactionId后缀策略
一个新的TransactionIdSuffixStrategy
引入界面来管理transactional.id
后缀。
默认实现是DefaultTransactionIdSuffixStrategy
设置时maxCache
大于零可以重用transactional.id
在特定范围内,否则将通过递增计数器动态生成后缀。
有关详细信息,请参阅 Fixed TransactionIdSuffix。
异步@KafkaListener返回
@KafkaListener
(和@KafkaHandler
) 方法现在可以返回异步返回类型包括CompletableFuture<?>
,Mono<?>
和 Kotlinsuspend
功能。
有关详细信息,请参阅异步返回。
根据引发的异常将消息路由到自定义 DLT
现在可以根据消息处理期间引发的异常类型将消息重定向到自定义 DLT。
重定向规则通过RetryableTopic.exceptionBasedDltRouting
或RetryTopicConfigurationBuilder.dltRoutingRules
.
自定义 DLT 以及其他重试和死信主题是自动创建的。
有关详细信息,请参阅 根据引发的异常将消息路由到自定义 DLT。
弃用 ContainerProperties transactionManager 属性
弃用transactionManager
属性ContainerProperties
赞成KafkaAwareTransactionManager
,与一般相比,这是一种更窄的类型PlatformTransactionManager
.请参阅 ContainerProperties 和事务同步。
回滚处理后
一个新的AfterRollbackProcessor
应用程序接口processBatch
被提供。
有关详细信息,请参阅回滚后处理器。
更改 @RetryableTopic SameIntervalTopicReuseStrategy 默认值
改变@RetryableTopic
属性SameIntervalTopicReuseStrategy
默认值为SINGLE_TOPIC
.
有关 maxInterval 指数延迟,请参阅单个主题。
非阻塞重试支持类级@KafkaListener
非阻塞重试支持对类进行@KafkaListener。 请参阅非阻塞重试。
支持进程@RetryableTopic RetryTopicConfigurationProvider 中的类。
提供新的公共 API 来查找RetryTopicConfiguration
. 请参阅查找 RetryTopicConfiguration
RetryTopicConfigurer 支持进程 MultiMethodKafkaListenerEndpoint。
这RetryTopicConfigurer
支持流程和注册MultiMethodKafkaListenerEndpoint
. 这MultiMethodKafkaListenerEndpoint
提供getter/setter
对于属性defaultMethod
和methods
. 修改EndpointCustomizer
严格来说MethodKafkaListenerEndpoint
类型。 这EndpointHandlerMethod
添加新的构造函数为提供的 bean 构造一个实例。提供新类EndpointHandlerMultiMethod
处理用于重试端点的多方法。
新的 API 方法,用于根据用户提供的函数搜索偏移量
ConsumerCallback
提供了一个新的 API 来根据用户定义的函数搜索偏移量,该函数将消费者中的当前偏移量作为参数。有关更多详细信息,请参阅 Seek API 文档。
@PartitionOffset对 SeekPosition 的支持
添加seekPosition
属性设置为@PartitionOffset
支持TopicPartitionOffset.SeekPosition
. 有关更多详细信息,请参阅手动分配。
TopicPartitionOffset 中的新构造函数,接受一个函数来计算要搜索的偏移量
TopicPartitionOffset
有一个新的构造函数,该构造函数采用用户提供的函数来计算要搜索的偏移量。
使用此构造函数时,框架使用当前使用者偏移位置的输入参数调用函数。
有关更多详细信息,请参阅 Seek API 文档。
Spring Boot 应用程序名称作为默认客户端 ID 前缀
对于定义应用程序名称的 Spring Boot 应用程序,现在使用此名称 作为某些客户端类型的自动生成客户端 ID 的默认前缀。 有关更多详细信息,请参阅默认客户端 ID 前缀。
增强了 MessageListenerContainers 的检索
ListenerContainerRegistry
提供了两个新的 API 动态查找和过滤MessageListenerContainer
实例。getListenerContainersMatching(Predicate<String> idMatcher)
按 ID 过滤,另一个是getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
按 ID 和容器属性进行筛选。
看@KafkaListener
生命周期管理的 API 文档了解更多信息。
通过提供更多跟踪标签来增强观察
KafkaTemplateObservation
提供更多的跟踪标签(低基数)。KafkaListenerObservation
提供了一个新的 API 来查找高基数键名称和更多跟踪标签(高基数或低基数)。
参见千分尺观察
自 3.0 以来 3.1 的新增功能
本节介绍从 3.0 版到 3.1 版所做的更改。 有关早期版本的更改,请参阅更改历史记录。
嵌入式KafkaBroker
现在提供了一个额外的实现来使用Kraft
而不是 Zookeeper。
有关更多信息,请参阅嵌入式 Kafka 代理。
Json反序列化器
当发生反序列化异常时,SerializationException
message 不再包含表单Can’t deserialize data [[123, 34, 98, 97, 122, …
;每个数据字节的数值数组没有用处,对于大数据可能会很冗长。
当与ErrorHandlingDeserializer
这DeserializationException
发送到错误处理程序包含data
包含无法反序列化的原始数据的属性。
不与ErrorHandlingDeserializer
这KafkaConsumer
将不断为显示主题/分区/偏移量和 Jackson 抛出的原因的同一记录发出异常。
容器后处理器
可以通过指定ContainerPostProcessor
在@KafkaListener
注解。
这发生在创建容器后以及任何配置的ContainerCustomizer
在容器工厂上配置。
有关更多信息,请参阅容器工厂。
ErrorHandlingDeserializer
您现在可以添加Validator
到这个解串器;如果委托Deserializer
成功反序列化对象,但该对象未通过验证,则引发异常,类似于发生反序列化异常。
这允许将原始原始数据传递给错误处理程序。
看用ErrorHandlingDeserializer
了解更多信息。
可重试主题
更改后缀-retry-5000
自-retry
什么时候@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
.
如果要保留后缀-retry-5000
用@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
.
有关更多信息,请参阅主题命名。
侦听器容器更改
手动分配分区时,使用null
消费者group.id
这AckMode
现在会自动强制MANUAL
. 有关详细信息,请参阅手动分配所有分区。
自 2.9 以来 3.0 的新增功能
观察
现在支持使用微分尺启用定时器和跟踪的观察。 有关更多信息,请参阅观察。
原生图像
支持创建本机映像。 有关详细信息,请参阅本机映像。
全局单嵌入 Kafka
嵌入式 Kafka (EmbeddedKafkaBroker
) 现在可以作为整个测试计划的单个全局实例启动。
有关更多信息,请参阅对多个测试类使用相同的代理。
可重试主题更改
此功能不再被视为实验性功能(就其 API 而言),该功能本身从 2.7 开始就得到了支持,但破坏 API 更改的可能性比正常情况更大。
在此版本中,Non-Blocking Retries 基础架构 Bean 的引导已更改,以避免某些应用程序中出现的有关应用程序初始化的一些计时问题。
您现在可以设置不同的concurrency
对于重试容器;默认情况下,并发与主容器相同。
@RetryableTopic
现在可以用作自定义注释的元注释,包括对@AliasFor
性能。
有关详细信息,请参阅配置。
重试主题的默认复制因子现在是-1
(使用代理默认值)。
如果您的代理版本低于 2.4 版,您现在需要显式设置该属性。
您现在可以配置多个@RetryableTopic
同一应用程序上下文中同一主题的侦听器。
以前,这是不可能的。
有关更多信息,请参阅多个侦听器,同一主题。
有重大 API 更改RetryTopicConfigurationSupport
;具体来说,如果您覆盖destinationTopicResolver
,kafkaConsumerBackoffManager
和/或retryTopicConfigurer
;
这些方法现在需要ObjectProvider<RetryTopicComponentFactory>
参数。
侦听器容器更改
与使用者身份验证和授权失败相关的事件现在由容器发布。 有关详细信息,请参阅应用程序事件。
现在,您可以自定义使用者线程使用的线程名称。 有关更多信息,请参阅容器线程命名。
容器属性restartAfterAuthException
已被添加。
有关更多信息,请参阅侦听器容器属性。
KafkaTemplate
变化
该类返回的合约现在是CompletableFuture
s 而不是ListenableFuture
s.
看用KafkaTemplate
.
ReplyingKafkaTemplate
变化
该类返回的合约现在是CompletableFuture
s 而不是ListenableFuture
s.
看用ReplyingKafkaTemplate
和请求/回复Message<?>
s.
@KafkaListener
变化
现在可以使用自定义关联标头,该标头将在任何回复消息中回显。
请参阅末尾的注释用ReplyingKafkaTemplate
了解更多信息。
现在,您可以在处理整个批次之前手动提交批次的各个部分。 有关更多信息,请参阅提交偏移量。
KafkaHeaders
变化
四个常量KafkaHeaders
在 2.9.x 中被弃用的现已删除。
-
而不是
MESSAGE_KEY
用KEY
. -
而不是
PARTITION_ID
用PARTITION
同样地RECEIVED_MESSAGE_KEY
替换为RECEIVED_KEY
和RECEIVED_PARTITION_ID
替换为RECEIVED_PARTITION
.
测试更改
3.0.7 版引入了MockConsumerFactory
和MockProducerFactory
.
有关更多信息,请参阅模拟消费者和生产者。
从版本 3.0.10 开始,默认情况下,嵌入式 Kafka 代理设置 Spring Boot 属性spring.kafka.bootstrap-servers
到嵌入式代理的地址。
自 2.8 以来 2.9 的新增功能
错误处理程序更改
这DefaultErrorHandler
现在可以配置为暂停容器进行一次轮询,并使用上一次轮询的剩余结果,而不是查找剩余记录的偏移量。
有关详细信息,请参阅 DefaultErrorHandler。
这DefaultErrorHandler
现在有一个BackOffHandler
财产。
有关更多信息,请参阅退后处理程序。
侦听器容器更改
interceptBeforeTx
现在适用于所有事务管理器(以前它仅在KafkaAwareTransactionManager
被使用)。
请参阅 [interceptBeforeTx]。
新的容器属性pauseImmediate
,允许容器在处理当前记录后暂停使用者,而不是在处理完上一次轮询中的所有记录后暂停使用者。
请参阅 [pauseImmediate]。
与消费者身份验证和授权相关的事件
标头映射器更改
您现在可以配置应映射哪些入站标头。 也可在 2.8.8 或更高版本中使用。 有关详细信息,请参阅消息头。
KafkaTemplate
变化
在 3.0 中,该类返回的 future 将为CompletableFuture
s 而不是ListenableFuture
s.
看用KafkaTemplate
以获取使用此版本时过渡的帮助。
ReplyingKafkaTemplate
变化
该模板现在提供了一种方法来等待回复容器上的分配,以避免在初始化回复容器之前发送请求时出现争用。
也可在 2.8.8 或更高版本中使用。
看用ReplyingKafkaTemplate
.
在 3.0 中,该类返回的 future 将为CompletableFuture
s 而不是ListenableFuture
s.
看用ReplyingKafkaTemplate
和请求/回复Message<?>
s以获取使用此版本时过渡的帮助。
2.8 自 2.7 以来的新增功能
本节介绍从 2.7 版到 2.8 版所做的更改。 有关早期版本的更改,请参阅更改历史记录。
包更改
与类型映射相关的类和接口已从…support.converter
自…support.mapping
.
-
AbstractJavaTypeMapper
-
ClassMapper
-
DefaultJackson2JavaTypeMapper
-
Jackson2JavaTypeMapper
无序手动提交
侦听器容器现在可以配置为接受无序的手动偏移提交(通常是异步的)。 容器将延迟提交,直到确认丢失的偏移量。 有关详细信息,请参阅手动提交偏移量。
@KafkaListener
变化
现在可以指定侦听器方法是否是方法本身的批处理侦听器。 这允许将同一个容器工厂用于记录和批处理侦听器。
有关更多信息,请参阅 [batch-listeners]。
批处理侦听器现在可以处理转换异常。
有关详细信息,请参阅使用批处理错误处理程序的转换错误。
RecordFilterStrategy
,当与批处理侦听器一起使用时,现在可以在一次调用中筛选整个批处理。
有关更多信息,请参阅 [batch-listeners] 末尾的注释。
这@KafkaListener
注释现在具有filter
属性,以覆盖容器工厂的RecordFilterStrategy
只为这个听众。
这@KafkaListener
注释现在具有info
属性;这用于填充新的侦听器容器属性listenerInfo
.
然后,它用于填充KafkaHeaders.LISTENER_INFO
每个记录中的标头,可用于RecordInterceptor
,RecordFilterStrategy
,或听者本身。
有关更多信息,请参阅 Listener Info Header 和 AbstractMessageListenerContainer 属性。
KafkaTemplate
变化
您现在可以接收一条记录,给定主题、分区和偏移量。
看用KafkaTemplate
接收了解更多信息。
CommonErrorHandler
添加
遗产GenericErrorHandler
及其用于记录批处理侦听器的子接口层次结构已被新的单个接口所取代CommonErrorHandler
与大多数旧版实现相对应的实现GenericErrorHandler
.
请参阅容器错误处理程序和将自定义旧版错误处理程序实现迁移到CommonErrorHandler
了解更多信息。
侦听器容器更改
这interceptBeforeTx
container 属性现在是true
默认情况下。
这authorizationExceptionRetryInterval
属性已重命名为authExceptionRetryInterval
现在适用于AuthenticationException
s 除了AuthorizationException
s 之前。
这两个异常都被视为致命异常,除非设置了此属性,否则容器将默认停止。
看用KafkaMessageListenerContainer
和侦听器容器属性以获取更多信息。
序列化器/解串化器更改
这DelegatingByTopicSerializer
和DelegatingByTopicDeserializer
现在提供了。
有关详细信息,请参阅委托序列化器和反序列化器。
DeadLetterPublishingRecover
变化
该物业stripPreviousExceptionHeaders
现在true
默认情况下。
现在有几种技术可以自定义将哪些标头添加到输出记录中。
有关详细信息,请参阅管理死信记录标题。
可重试主题更改
现在,可以将同一工厂用于可重试和不可重试的主题。 有关更多信息,请参阅指定 ListenerContainerFactory。
现在有一个可管理的致命异常全球列表,这些异常将使失败的记录直接进入 DLT。 请参阅异常分类器以了解如何管理它。
您现在可以结合使用阻止和非阻止重试。 有关详细信息,请参阅组合阻塞和非阻塞重试。
使用可重试主题功能时抛出的 KafkaBackOffException 现在记录在 DEBUG 级别。 如果您需要将日志记录级别更改回 WARN 或将其设置为任何其他级别,请参阅更改 KafkaBackOffException 日志记录级别。
2.6 和 2.7 之间的更改
Kafka 客户端版本
此版本需要 2.7.0kafka-clients
. 从版本 2.7.1 开始,它还与 2.8.0 客户端兼容;请参阅覆盖 Spring Boot 依赖项。
使用主题的非阻塞延迟重试
此版本中添加了这一重要的新功能。当严格排序不重要时,可以将失败的投放发送到另一个主题以供以后使用。可以配置一系列此类重试主题,但延迟会增加。请参阅 非阻塞重试 有关更多信息。
侦听器容器更改
这onlyLogRecordMetadata
container 属性现在是true
默认情况下。
新的容器属性stopImmediate
现已推出。
有关更多信息,请参阅侦听器容器属性。
使用BackOff
在投递尝试之间(例如SeekToCurrentErrorHandler
和DefaultAfterRollbackProcessor
) 现在将在容器停止后不久退出回退间隔,而不是延迟停止。
错误处理程序和扩展的回滚处理器FailedRecordProcessor
现在可以配置一个或多个RetryListener
以接收有关重试和恢复进度的信息。
这RecordInterceptor
现在在侦听器返回后调用了其他方法(通常,或通过抛出异常)。
它还有一个子界面ConsumerAwareRecordInterceptor
.
此外,现在还有一个BatchInterceptor
用于批量侦听器。
有关详细信息,请参阅消息侦听器容器。
@KafkaListener
变化
您现在可以验证@KafkaHandler
方法(类级侦听器)。
看@KafkaListener
@Payload
验证了解更多信息。
现在,您可以将rawRecordHeader
属性MessagingMessageConverter
和BatchMessagingMessageConverter
这导致原始的ConsumerRecord
添加到转换后的Message<?>
.
例如,如果您希望使用DeadLetterPublishingRecoverer
在侦听器错误处理程序中。
有关更多信息,请参阅侦听器错误处理程序。
您现在可以修改@KafkaListener
应用程序初始化期间的注释。
看@KafkaListener
属性修改了解更多信息。
DeadLetterPublishingRecover
变化
现在,如果键和值都失败反序列化,则原始值将发布到 DLT。
以前,已填充值,但键DeserializationException
留在标题中。
如果您对恢复器进行了子类化并覆盖了createProducerRecord
方法。
此外,恢复程序在发布到目标解析程序之前验证目标解析器选择的分区是否实际存在。
有关详细信息,请参阅发布死信记录。
ChainedKafkaTransactionManager
已弃用
有关详细信息,请参阅事务。
ReplyingKafkaTemplate
变化
现在有一种机制可以检查回复,如果存在某些条件,则未来异常失败。
支持发送和接收spring-messaging
Message<?>
s 已被添加。
看用ReplyingKafkaTemplate
了解更多信息。
Kafka 流更改
默认情况下,StreamsBuilderFactoryBean
现在配置为不清理本地状态。
有关详细信息,请参阅配置。
KafkaAdmin
变化
新方法createOrModifyTopics
和describeTopics
已被添加。KafkaAdmin.NewTopics
已添加以方便在单个 Bean 中配置多个主题。有关更多信息,请参阅 [configuring-topics]。
MessageConverter
变化
现在可以添加一个spring-messaging
SmartMessageConverter
到MessagingMessageConverter
,允许基于contentType
页眉。 有关更多信息,请参阅 Spring Messaging Message Conversion。
测 序@KafkaListener
s
看开始@KafkaListener
s 在序列中了解更多信息。
ExponentialBackOffWithMaxRetries
一个新的BackOff
提供了实现,可以更方便地配置最大重试次数。
看ExponentialBackOffWithMaxRetries
实现了解更多信息。
条件委托错误处理程序
这些新的错误处理程序可以配置为委托给不同的错误处理程序,具体取决于异常类型。 有关详细信息,请参阅委派错误处理程序。
2.5 和 2.6 之间的更改
侦听器容器更改
默认值EOSMode
现在BETA
.
有关详细信息,请参阅 Exactly Once 语义。
各种错误处理程序(扩展FailedRecordProcessor
) 和DefaultAfterRollbackProcessor
现在重置BackOff
如果恢复失败。
此外,您现在可以选择BackOff
根据失败的记录和/或异常使用。
您现在可以配置adviceChain
在容器属性中。
有关更多信息,请参阅侦听器容器属性。
将容器配置为发布时ListenerContainerIdleEvent
s,它现在发布一个ListenerContainerNoLongerIdleEvent
发布空闲事件后收到记录时。
有关详细信息,请参阅应用程序事件和检测空闲和无响应使用者。
@KafkaListener变化
使用手动分区分配时,您现在可以指定一个通配符来确定哪些分区应重置为初始偏移量。此外,如果侦听器实现ConsumerSeekAware
,onPartitionsAssigned()
在手动分配之后调用。(在版本 2.5.5 中也添加了)。有关更多信息,请参阅显式分区分配。
新增便捷方法AbstractConsumerSeekAware
使寻找更容易。
有关更多信息,请参阅 [seek]。
ErrorHandler 更改
的子类FailedRecordProcessor
(例如SeekToCurrentErrorHandler
,DefaultAfterRollbackProcessor
,RecoveringBatchErrorHandler
) 现在可以配置为重置重试状态,如果异常类型与此记录之前发生的异常类型不同。
生产者工厂变更
您现在可以为生产者设置最大年龄,之后它们将被关闭并重新创建。 有关详细信息,请参阅事务。
您现在可以在DefaultKafkaProducerFactory
已被创建。
例如,如果必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。
看用DefaultKafkaProducerFactory
了解更多信息。
2.4 和 2.5 之间的更改
本节介绍从 2.4 版到 2.5 版所做的更改。 有关早期版本的更改,请参阅更改历史记录。
消费者/生产者工厂变更
默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。 提供了原生 Micrometer 指标的实现。 有关更多信息,请参阅工厂监听器。
现在,您可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 集群。 有关更多信息,请参阅连接到 Kafka。
StreamsBuilderFactoryBean
变化
工厂 bean 现在可以在KafkaStreams
创建或销毁。
提供了原生千分尺度量的实现。
有关更多信息,请参阅 KafkaStreams Micrometer 支持。
Delivery Attempts 标头
现在有一个选项可以添加一个标头,用于在使用某些错误处理程序和回滚处理器后跟踪传递尝试。 有关更多信息,请参阅 Delivery Attempts Header。
@KafkaListener变化
如果需要,现在将自动填充默认回复标头,当@KafkaListener
返回类型为Message<?>
.
有关详细信息,请参阅回复类型消息<?>。
这KafkaHeaders.RECEIVED_MESSAGE_KEY
不再填充null
当传入记录具有null
钥匙;标题被完全省略。
@KafkaListener
方法现在可以指定一个ConsumerRecordMetadata
参数,而不是对元数据(如主题、分区等)使用离散标头。
有关更多信息,请参阅消费者记录元数据。
侦听器容器更改
这assignmentCommitOption
container 属性现在是LATEST_ONLY_NO_TX
默认情况下。
有关更多信息,请参阅侦听器容器属性。
这subBatchPerPartition
container 属性现在是true
默认情况下,使用事务时。
有关详细信息,请参阅事务。
一个新的RecoveringBatchErrorHandler
现在提供了。
现在支持静态组成员身份。 有关详细信息,请参阅消息侦听器容器。
配置增量/协作重新平衡时,如果偏移量未能提交,则非致命RebalanceInProgressException
,则容器将尝试在重新平衡完成后重新提交仍分配给此实例的分区的偏移量。
默认错误处理程序现在是SeekToCurrentErrorHandler
用于唱片听众和RecoveringBatchErrorHandler
用于批量侦听器。
有关更多信息,请参阅容器错误处理程序。
现在,您可以控制记录标准错误处理程序故意抛出的异常的级别。 有关更多信息,请参阅容器错误处理程序。
这getAssignmentsByClientId()
方法,可以更轻松地确定并发容器中的哪些使用者被分配了哪些分区。
有关更多信息,请参阅侦听器容器属性。
您现在可以禁止整个日志记录ConsumerRecord
s 出错,调试日志等。
看onlyLogRecordMetadata
在侦听器容器属性中。
KafkaTemplate 更改
这KafkaTemplate
现在可以维护千分尺计时器。有关更多信息,请参阅监视。
这KafkaTemplate
现在可以配置ProducerConfig
属性来覆盖生产者工厂中的属性。 看用KafkaTemplate
了解更多信息。
一个RoutingKafkaTemplate
现已提供。 看用RoutingKafkaTemplate
了解更多信息。
您现在可以使用KafkaSendCallback
而不是ListenerFutureCallback
以获得更窄的异常,从而更容易提取失败的异常ProducerRecord
.
看用KafkaTemplate
了解更多信息。
Kafka 字符串序列化器/反序列化器
新增功能ToStringSerializer
/StringDeserializer
s 以及关联的SerDe
现在提供了。
有关详细信息,请参阅字符串序列化。
Json反序列化器
这JsonDeserializer
现在可以更灵活地确定反序列化类型。
有关详细信息,请参阅使用方法确定类型。
委托序列化器/解串化器
这DelegatingSerializer
现在可以处理“标准”类型,当出站记录没有标头时。
有关详细信息,请参阅委托序列化器和反序列化器。
测试更改
这KafkaTestUtils.consumerProps()
辅助记录现在设置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
自earliest
默认情况下。
有关更多信息,请参阅 JUnit。
2.3 和 2.4 之间的更改
ConsumerAwareRebalanceListener
喜欢ConsumerRebalanceListener
,这个接口现在有一个额外的方法onPartitionsLost
.
有关更多信息,请参阅 Apache Kafka 文档。
与ConsumerRebalanceListener
,默认实现不会调用onPartitionsRevoked
.
相反,侦听器容器将在调用onPartitionsLost
;因此,在实现时不应执行相同的作ConsumerAwareRebalanceListener
.
有关更多信息,请参阅重新平衡侦听器末尾的重要说明。
卡夫卡模板
这KafkaTemplate
现在支持非事务发布和事务发布。
看KafkaTemplate
事务性和非事务性发布了解更多信息。
聚合回复卡夫卡模板
这releaseStrategy
现在是一个BiConsumer
.
现在在超时后(以及记录到达时)调用它;第二个参数是true
在超时后调用的情况下。
有关详细信息,请参阅聚合多个回复。
侦听器容器
这ContainerProperties
提供authorizationExceptionRetryInterval
选项,让侦听器容器在任何AuthorizationException
是由KafkaConsumer
.
请参阅其 JavaDocs 和用KafkaMessageListenerContainer
了解更多信息。
@KafkaListener
这@KafkaListener
注释具有新属性splitIterables
;默认为 true。
当回复侦听器返回Iterable
此属性控制返回结果是作为单个记录发送还是发送每个元素的记录。
看转发侦听器结果@SendTo
更多信息
批处理侦听器现在可以使用BatchToRecordAdapter
;例如,这允许在事务中处理批处理,而侦听器一次获取一条记录。
在默认实现中,一个ConsumerRecordRecoverer
可用于处理批处理中的错误,而无需停止整个批处理 - 这在使用事务时可能很有用。
有关更多信息,请参阅使用批处理侦听器的事务。
Kafka 流
这StreamsBuilderFactoryBean
接受新属性KafkaStreamsInfrastructureCustomizer
.
这允许在创建流之前配置构建器和/或拓扑。
有关更多信息,请参阅 Spring Management。
2.2 和 2.3 之间的更改
本节介绍从 2.2 版到 2.3 版所做的更改。
提示、技巧和示例
添加了一个新章节提示、技巧和示例。 请提交 GitHub 问题和/或拉取请求以获取该章中的其他条目。
配置更改
从 2.3.4 版本开始,missingTopicsFatal
container 属性默认为 false。
当这种情况为 true 时,如果代理关闭,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见的用例。
生产者和消费者工厂的变化
这DefaultKafkaProducerFactory
现在可以配置为为每个线程创建一个生产者。
您还可以提供Supplier<Serializer>
实例作为配置类(需要无参数构造函数)的替代方法,或使用Serializer
实例,然后在所有生产者之间共享。
看用DefaultKafkaProducerFactory
了解更多信息。
同样的选项可用于Supplier<Deserializer>
实例中的实例DefaultKafkaConsumerFactory
.
看用KafkaMessageListenerContainer
了解更多信息。
侦听器容器更改
以前,错误处理程序收到ListenerExecutionFailedException
(实际侦听器例外为cause
) 当使用侦听器适配器(例如@KafkaListener
s).
本机引发的异常GenericMessageListener
s 被原封不动地传递给错误处理程序。
现在一个ListenerExecutionFailedException
始终是参数(实际侦听器异常为cause
),它提供对容器的group.id
财产。
因为侦听器容器有自己的提交偏移量机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
成为false
.
现在,它会自动将其设置为 false,除非在使用者工厂中特别设置或容器的使用者属性覆盖。
这ackOnError
属性现在是false
默认情况下。
现在可以获得消费者的group.id
listener 方法中的属性。
看获取消费者group.id
了解更多信息。
容器具有新属性recordInterceptor
允许在调用侦听器之前检查或修改记录。
一个CompositeRecordInterceptor
还提供了,以防您需要调用多个拦截器。
有关详细信息,请参阅消息侦听器容器。
这ConsumerSeekAware
具有新方法,允许您相对于开始、结束或当前位置执行搜索,并搜索到大于或等于时间戳的第一个偏移量。
有关更多信息,请参阅 [seek]。
便利班AbstractConsumerSeekAware
现在提供以简化搜索。
有关更多信息,请参阅 [seek]。
这ContainerProperties
提供idleBetweenPolls
选项,让监听器容器中的主循环在KafkaConsumer.poll()
调用。
请参阅其 JavaDocs 和用KafkaMessageListenerContainer
了解更多信息。
使用时AckMode.MANUAL
(或MANUAL_IMMEDIATE
),您现在可以通过调用nack
在Acknowledgment
.
有关更多信息,请参阅提交偏移量。
现在可以使用 Micrometer 监控监听器性能Timer
s.
有关详细信息,请参阅监视。
容器现在发布与启动相关的其他使用者生命周期事件。 有关详细信息,请参阅应用程序事件。
事务性批处理侦听器现在可以支持僵尸隔离。有关更多信息,请参阅事务。
侦听器容器工厂现在可以配置ContainerCustomizer
在创建和配置每个容器后进一步配置每个容器。有关更多信息,请参阅容器工厂。
ErrorHandler 更改
这SeekToCurrentErrorHandler
现在将某些异常视为致命异常,并禁用这些异常的重试,在第一次失败时调用恢复器。
这SeekToCurrentErrorHandler
和SeekToCurrentBatchErrorHandler
现在可以配置为应用BackOff
(线程睡眠)。
从版本 2.3.2 开始,当错误处理程序在恢复失败的记录后返回时,将提交恢复记录的偏移量。
这DeadLetterPublishingRecoverer
,当与ErrorHandlingDeserializer
,现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。
以前,它是null
以及提取DeserializationException
从邮件头。
有关详细信息,请参阅发布死信记录。
主题生成器
新职业TopicBuilder
提供更方便的创建NewTopic
@Bean
s 用于自动主题配置。
有关更多信息,请参阅 [configuring-topics]。
Kafka 流更改
您现在可以对StreamsBuilderFactoryBean
创建者@EnableKafkaStreams
.
有关更多信息,请参阅 Streams 配置。
一个RecoveringDeserializationExceptionHandler
现在提供了允许恢复具有反序列化错误的记录。
它可以与DeadLetterPublishingRecoverer
将这些记录发送到死信主题。
有关详细信息,请参阅从反序列化异常中恢复。
这HeaderEnricher
提供了 transformer,使用 SpEL 生成标头值。
有关详细信息,请参阅标头扩充器。
这MessagingTransformer
已提供。
这允许 Kafka 流拓扑与 spring-messaging 组件(例如 Spring Integration 流)交互。
看MessagingProcessor
和 See从KStream
了解更多信息。
JSON 组件更改
现在,所有 JSON 感知组件默认配置为 JacksonObjectMapper
由JacksonUtils.enhancedObjectMapper()
. 这JsonDeserializer
现在提供TypeReference
基于构造函数,以便更好地处理目标通用容器类型。
还有一个JacksonMimeTypeModule
已引入序列化org.springframework.util.MimeType
到纯字符串。有关更多信息,请参阅其 JavaDocs 和 Serialization, Deserialization, and Message Conversion。
一个ByteArrayJsonMessageConverter
以及为所有 Json 转换器提供的新超类,JsonMessageConverter
. 此外,一个StringOrBytesSerializer
现已可用;它可以序列化byte[]
,Bytes
和String
值ProducerRecord
s. 有关更多信息,请参阅 Spring Messaging Message Conversion。
这JsonSerializer
,JsonDeserializer
和JsonSerde
现在拥有流畅的 API,使编程配置更简单。有关更多信息,请参阅 javadocs、序列化、反序列化和消息转换以及 Streams JSON 序列化和反序列化。
回复卡夫卡模板
当回复超时时,未来将异常完成,并使用KafkaReplyTimeoutException
而不是KafkaException
.
此外,重载的sendAndReceive
现在提供了允许在每条消息的基础上指定回复超时的方法。
聚合回复卡夫卡模板
扩展ReplyingKafkaTemplate
通过聚合来自多个接收者的回复。
有关详细信息,请参阅聚合多个回复。
交易变更
您现在可以覆盖生产者工厂的transactionIdPrefix
在KafkaTemplate
和KafkaTransactionManager
.
看transactionIdPrefix
了解更多信息。
新委派序列化器/反序列化器
该框架现在提供了一个委托Serializer
和Deserializer
,利用标头来生成和使用具有多种键/值类型的记录。
有关详细信息,请参阅委托序列化器和反序列化器。
新的重试解序列化器
该框架现在提供了一个委托RetryingDeserializer
,以便在可能发生暂时性错误(如网络问题)时重试序列化。
有关详细信息,请参阅重试反序列化程序。
2.1 和 2.2 之间的更改
类和包更改
这ContainerProperties
类已从org.springframework.kafka.listener.config
自org.springframework.kafka.listener
.
这AckMode
枚举已从AbstractMessageListenerContainer
自ContainerProperties
.
这setBatchErrorHandler()
和setErrorHandler()
方法已从ContainerProperties
对两者AbstractMessageListenerContainer
和AbstractKafkaListenerContainerFactory
.
回滚处理后
一个新的AfterRollbackProcessor
提供了策略。
有关详细信息,请参阅回滚后处理器。
ConcurrentKafkaListenerContainerFactory
变化
您现在可以使用ConcurrentKafkaListenerContainerFactory
创建和配置任何ConcurrentMessageListenerContainer
,不仅是那些@KafkaListener
附注。
有关详细信息,请参阅容器工厂。
侦听器容器更改
新的容器属性 (missingTopicsFatal
)已被添加。
看用KafkaMessageListenerContainer
了解更多信息。
一个ConsumerStoppedEvent
现在在使用者停止时发出。
有关详细信息,请参阅线程安全。
批处理侦听器可以选择接收完整的ConsumerRecords<?, ?>
对象而不是List<ConsumerRecord<?, ?>
.
有关更多信息,请参阅 [batch-listeners]。
这DefaultAfterRollbackProcessor
和SeekToCurrentErrorHandler
现在可以恢复(跳过)不断失败的记录,默认情况下,在 10 次失败后恢复(跳过)。
可以将它们配置为将失败的记录发布到死信主题。
从 2.2.4 版本开始,在选择死信主题名称时可以使用消费者的组 ID。
这ConsumerStoppingEvent
已被添加。
有关详细信息,请参阅应用程序事件。
这SeekToCurrentErrorHandler
现在可以配置为在容器配置为AckMode.MANUAL_IMMEDIATE
(自 2.2.4 起)。
@KafkaListener变化
您现在可以覆盖concurrency
和autoStartup
通过在注释上设置属性来获取侦听器容器工厂的属性。
现在,您可以添加配置以确定将哪些标头(如果有)复制到回复消息中。
看@KafkaListener
注解了解更多信息。
您现在可以使用@KafkaListener
作为你自己注解的元注解。
看@KafkaListener
作为元注释了解更多信息。
现在可以更轻松地配置Validator
为@Payload
验证。 看@KafkaListener
@Payload
验证了解更多信息。
您现在可以直接在 Comments 上指定 kafka 消费者属性;这些属性将覆盖消费者工厂中定义的具有相同名称的任何属性(自版本 2.2.4 起)。有关更多信息,请参阅 Annotation 属性。
标头映射更改
类型的标题MimeType
和MediaType
现在在RecordHeader
价值。 以前,它们被映射为 JSON,并且仅MimeType
被解码。MediaType
无法解码。它们现在是用于互作性的简单字符串。
此外,JsonKafkaHeaderMapper
有一个新的addToStringClasses
方法,允许指定应通过使用toString()
而不是 JSON。有关更多信息,请参阅消息头。
嵌入式 Kafka 更改
这KafkaEmbedded
class 及其KafkaRule
接口已被弃用,取而代之的是EmbeddedKafkaBroker
及其 JUnit 4EmbeddedKafkaRule
包装纸。 这@EmbeddedKafka
注释现在填充EmbeddedKafkaBroker
bean 而不是已弃用的KafkaEmbedded
. 此更改允许使用@EmbeddedKafka
在 JUnit 5 测试中。 这@EmbeddedKafka
注释现在具有属性ports
指定填充EmbeddedKafkaBroker
. 有关详细信息,请参阅测试应用程序。
JsonSerializer/Deserializer 增强功能
现在,您可以使用生产者和使用者属性来提供类型映射信息。
反序列化程序上提供了新的构造函数,以允许使用提供的目标类型重写类型标头信息。
这JsonDeserializer
现在默认删除任何类型信息标头。
您现在可以配置JsonDeserializer
使用 Kafka 属性忽略类型信息标头(从 2.2.3 开始)。
有关详细信息,请参阅序列化、反序列化和消息转换。
Kafka 流更改
流配置 bean 现在必须是KafkaStreamsConfiguration
对象而不是StreamsConfig
对象。
这StreamsBuilderFactoryBean
已从包中移动…core
自…config
.
这KafkaStreamBrancher
引入是为了在条件分支构建在KStream
实例。
有关更多信息,请参阅 Apache Kafka Streams 支持和配置。
事务 ID
当侦听器容器启动事务时,transactional.id
现在是transactionIdPrefix
附加<group.id>.<topic>.<partition>
. 此更改允许对僵尸进行适当的围栏,如此处所述。
2.0 和 2.1 之间的更改
JSON 改进
这StringJsonMessageConverter
和JsonSerializer
现在在Headers
,让转换器和JsonDeserializer
在接收时创建特定类型,基于消息本身而不是固定的配置类型。
有关详细信息,请参阅序列化、反序列化和消息转换。
容器停止错误处理程序
现在为记录和批处理侦听器提供了容器错误处理程序,这些侦听器将侦听器抛出的任何异常视为致命/ 他们停止了容器。 有关详细信息,请参阅处理异常。
暂停和恢复容器
侦听器容器现在具有pause()
和resume()
方法(自 2.1.3 版起)。
有关更多信息,请参阅暂停和恢复侦听器容器。
有状态重试
从版本 2.1.3 开始,您可以配置有状态重试。 有关详细信息,请参阅有状态重试。
客户端 ID
从 2.1.1 版本开始,您现在可以将client.id
前缀@KafkaListener
.
以前,要自定义客户端 ID,您需要为每个侦听器设置一个单独的使用者工厂(和容器工厂)。
前缀以-n
在使用并发时提供唯一的客户端 ID。
记录偏移量提交
默认情况下,主题偏移量提交的日志记录是使用DEBUG
日志记录级别。从 2.1.2 版开始,中的一个新属性ContainerProperties
叫commitLogLevel
允许您指定这些消息的日志级别。
看用KafkaMessageListenerContainer
了解更多信息。
默认@KafkaHandler
从版本 2.1.3 开始,您可以指定@KafkaHandler
类级别的注释@KafkaListener
作为默认值。
看@KafkaListener
在类上了解更多信息。
回复卡夫卡模板
从 2.1.3 版开始,一个KafkaTemplate
提供给支持请求/回复语义。
看用ReplyingKafkaTemplate
了解更多信息。
2.0 的迁移指南
请参阅 2.0 到 2.1 迁移指南。
1.3 和 2.0 之间的更改
@KafkaListener
变化
您现在可以注释@KafkaListener
方法(以及类和@KafkaHandler
方法)替换为@SendTo
.
如果该方法返回结果,则将其转发到指定的主题。
看转发侦听器结果@SendTo
了解更多信息。
消息侦听器
消息侦听器现在可以知道Consumer
对象。
有关详细信息,请参阅 [message-listeners]。
用ConsumerAwareRebalanceListener
Rebalance 监听器现在可以访问Consumer
对象。
有关更多信息,请参阅重新平衡侦听器。
1.2 和 1.3 之间的更改
对事务的支持
0.11.0.0 客户端库添加了对事务的支持。
这KafkaTransactionManager
并添加了对事务的其他支持。
有关详细信息,请参阅事务。
支持标头
0.11.0.0 客户端库添加了对消息头的支持。
这些现在可以映射到spring-messaging
MessageHeaders
.
有关详细信息,请参阅消息头。
支持 Kafka 时间戳
KafkaTemplate
现在支持添加带有时间戳的记录的 API。
新增功能KafkaHeaders
已介绍有关timestamp
支持。
此外,新的KafkaConditions.timestamp()
和KafkaMatchers.hasTimestamp()
添加了测试实用程序。
看用KafkaTemplate
,@KafkaListener
注解和测试应用程序了解更多详细信息。
@KafkaListener
变化
您现在可以配置KafkaListenerErrorHandler
处理异常。
有关详细信息,请参阅处理异常。
默认情况下,@KafkaListener
id
属性现在用作group.id
属性,覆盖消费者工厂中配置的属性(如果存在)。
此外,您可以显式配置groupId
在注释上。
以前,您需要一个单独的容器工厂(和消费者工厂)才能使用不同的group.id
值。
要恢复以前使用出厂配置的行为group.id
,将idIsGroup
属性设置为false
.
@EmbeddedKafka
注解
为方便起见,测试类级别@EmbeddedKafka
提供注释,以注册KafkaEmbedded
作为豆子。
有关详细信息,请参阅测试应用程序。
Kerberos 配置
现在支持配置 Kerberos。 有关更多信息,请参阅 JAAS 和 Kerberos。
1.0 和 1.1 之间的更改
寻求
您现在可以查找每个主题或分区的位置。您可以使用它来设置初始化期间的初始位置,当组管理正在使用中并且 Kafka 分配分区时。您还可以在检测到空闲容器时或在应用程序执行中的任何任意点进行搜索。有关更多信息,请参阅 [seek]。