|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6! |
更改历史记录
自 3.0 以来 3.1 中的新增功能
本节介绍从版本 3.0 到版本 3.1 所做的更改。 有关早期版本的更改,请参阅更改历史记录。
嵌入 KafkaBroker
现在提供了一个额外的实现供使用Kraft而不是 Zookeeper。
有关更多信息,请参阅嵌入式 Kafka 代理。
JsonDeserializer
当反序列化异常发生时,SerializationExceptionmessage 不再包含格式为Can’t deserialize data [[123, 34, 98, 97, 122, …;每个数据字节的数值数组没有用,对于大数据来说可能很详细。
当与ErrorHandlingDeserializer这DeserializationExceptionsent to 错误处理程序包含data属性,其中包含无法反序列化的原始数据。
当不与ErrorHandlingDeserializer这KafkaConsumer将持续为同一记录发出异常,显示 topic/partition/offset 和 Jackson 抛出的原因。
容器后处理器
可以通过指定 bean 名称ContainerPostProcessor在@KafkaListener注解。
这发生在创建容器之后以及任何配置的ContainerCustomizer在 Container Factory 上配置。
有关更多信息,请参阅 Container Factory。
ErrorHandlingDeserializer
您现在可以添加Validator到这个反序列化器;如果委托的Deserializer成功反序列化对象,但该对象未通过验证,则会引发异常,类似于发生的反序列化异常。
这允许将原始原始数据传递给错误处理程序。
看用ErrorHandlingDeserializer了解更多信息。
可重试主题
更改后缀-retry-5000自-retry什么时候@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC).
如果要保留后缀-retry-5000用@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2").
有关更多信息,请参阅主题命名。
侦听器容器更改
手动分配分区时,使用null消费者group.id这AckMode现在会自动强制为MANUAL.
有关更多信息,请参阅手动分配所有分区。
自 2.9 以来 3.0 中的新增功能
观察
现在支持使用 Micrometer 对计时器启用观察和跟踪。 有关更多信息,请参阅 Observation (观察)。
本机映像
支持创建本机映像。 有关更多信息,请参阅本机映像。
全局单嵌入式 Kafka
嵌入式 Kafka (EmbeddedKafkaBroker) 现在可以作为整个测试计划的单个全局实例启动。
有关更多信息,请参阅对多个测试类使用相同的 Broker(s)。
可重试主题更改
此功能不再被视为实验性功能(就其 API 而言),该功能本身自 2.7 以来一直受支持,但破坏 API 更改的可能性比正常情况更大。
在此版本中,非阻塞重试基础结构 bean 的引导已更改,以避免在某些应用程序中发生的有关应用程序初始化的一些计时问题。
您现在可以设置不同的concurrency对于重试容器;默认情况下,并发性与主容器相同。
@RetryableTopic现在可以用作自定义注释上的元注释,包括对@AliasFor性能。
有关更多信息,请参阅配置。
重试主题的默认复制因子现在为-1(使用 broker default)。
如果您的代理版本低于 2.4,则现在需要显式设置该属性。
您现在可以配置多个@RetryableTopic同一应用程序上下文中同一主题的侦听器。
以前,这是不可能的。
有关更多信息,请参阅多个侦听器,相同主题。
中存在重大 API 更改RetryTopicConfigurationSupport;具体来说,如果你覆盖了destinationTopicResolver,kafkaConsumerBackoffManager和/或retryTopicConfigurer;
这些方法现在需要一个ObjectProvider<RetryTopicComponentFactory>参数。
侦听器容器更改
与使用者身份验证和授权失败相关的事件现在由容器发布。 有关更多信息,请参阅应用程序事件。
您现在可以自定义使用者线程使用的线程名称。 有关更多信息,请参阅容器线程命名。
container 属性restartAfterAuthException已添加。
有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。
KafkaTemplate变化
该类返回的 future 现在是CompletableFutures 而不是ListenableFutures.
看用KafkaTemplate.
ReplyingKafkaTemplate变化
该类返回的 future 现在是CompletableFutures 而不是ListenableFutures.
看用ReplyingKafkaTemplate和请求/回复Message<?>s.
@KafkaListener变化
您现在可以使用自定义关联标头,该标头将在任何回复消息中回显。
请参阅末尾的注释用ReplyingKafkaTemplate了解更多信息。
现在,您可以在处理整个批次之前手动提交批次的各个部分。 有关更多信息,请参阅提交偏移量。
KafkaHeaders变化
4 个常量KafkaHeaders在 2.9.x 中弃用的 现在已被删除。
-
而不是
MESSAGE_KEY用KEY. -
而不是
PARTITION_ID用PARTITION
同样地RECEIVED_MESSAGE_KEY替换为RECEIVED_KEY和RECEIVED_PARTITION_ID替换为RECEIVED_PARTITION.
测试更改
版本 3.0.7 引入了MockConsumerFactory和MockProducerFactory.
有关更多信息,请参阅 Mock Consumer 和 Producer。
从版本 3.0.10 开始,默认情况下,嵌入式 Kafka 代理会设置 Spring Boot 属性spring.kafka.bootstrap-servers到嵌入式经纪人的地址。
自 2.8 以来 2.9 中的新增功能
错误处理程序更改
这DefaultErrorHandler现在可以配置为暂停容器进行一次轮询,并使用上一次轮询的剩余结果,而不是查找剩余记录的偏移量。
有关更多信息,请参阅 DefaultErrorHandler。
这DefaultErrorHandler现在有一个BackOffHandler财产。
有关更多信息,请参见 Back Off Handlers。
侦听器容器更改
interceptBeforeTx现在适用于所有事务管理器(以前,它仅在KafkaAwareTransactionManager被使用)。
参见 [interceptBeforeTx]。
新的 container 属性pauseImmediate,这允许容器在处理当前记录后暂停使用者,而不是在处理完上一次轮询中的所有记录之后。
请参阅 [pauseImmediate]。
与消费者身份验证和授权相关的事件
Header Mapper 更改
您现在可以配置应映射的入站标头。 在版本 2.8.8 或更高版本中也可用。 有关更多信息,请参阅 Message Headers 。
KafkaTemplate变化
在 3.0 中,该类返回的 future 将为CompletableFutures 而不是ListenableFutures.
看用KafkaTemplate有关使用此版本时进行过渡的帮助。
ReplyingKafkaTemplate变化
该模板现在提供了一种在回复容器上等待分配的方法,以避免在初始化回复容器之前发送请求时发生争用。
在版本 2.8.8 或更高版本中也可用。
看用ReplyingKafkaTemplate.
在 3.0 中,该类返回的 future 将为CompletableFutures 而不是ListenableFutures.
看用ReplyingKafkaTemplate和请求/回复Message<?>s有关使用此版本时进行过渡的帮助。
自 2.7 以来 2.8 中的新增功能
本节介绍从版本 2.7 到版本 2.8 所做的更改。 有关早期版本的更改,请参阅更改历史记录。
套件更改
与类型映射相关的类和接口已从…support.converter自…support.mapping.
-
AbstractJavaTypeMapper -
ClassMapper -
DefaultJackson2JavaTypeMapper -
Jackson2JavaTypeMapper
无序手动提交
现在可以将侦听器容器配置为接受不按顺序(通常是异步)的手动偏移提交。 容器将延迟提交,直到确认缺少的偏移量。 有关更多信息,请参阅手动提交偏移量。
@KafkaListener变化
现在可以指定侦听器方法是否是方法本身的批处理侦听器。 这允许将同一容器工厂用于记录侦听器和批处理侦听器。
有关更多信息,请参阅 [batch-listeners]。
批处理侦听器现在可以处理转换异常。
有关更多信息,请参阅使用批处理错误处理程序的转换错误。
RecordFilterStrategy与批处理侦听器一起使用时,现在可以在一次调用中筛选整个批处理。
有关更多信息,请参阅 [batch-listeners] 末尾的注释。
这@KafkaListener注解现在具有filter属性来覆盖容器工厂的RecordFilterStrategy对于这个侦听器。
这@KafkaListener注解现在具有info属性;这用于填充 New Listener Container 属性listenerInfo.
然后,它用于填充KafkaHeaders.LISTENER_INFOheader 中可用于RecordInterceptor,RecordFilterStrategy或侦听器本身。
有关更多信息,请参阅 Listener Info Header 和 Abstract Listener Container Properties 。
KafkaTemplate变化
您现在可以接收给定主题、分区和偏移量的单个记录。
看用KafkaTemplate接收了解更多信息。
CommonErrorHandler添加
遗产GenericErrorHandler其用于 Record an Batch 侦听器的子接口层次结构已替换为新的单个接口CommonErrorHandler与 大多数遗留 implementations 对应的GenericErrorHandler.
请参阅 容器错误处理程序 和将自定义旧版错误处理程序实现迁移到CommonErrorHandler了解更多信息。
侦听器容器更改
这interceptBeforeTxcontainer 属性现在是true默认情况下。
这authorizationExceptionRetryIntervalproperty 已重命名为authExceptionRetryInterval,现在适用于AuthenticationExceptions 除了AuthorizationException之前。
这两个异常都被视为致命的,除非设置了此属性,否则容器将默认停止。
Serializer/Deserializer 更改
这DelegatingByTopicSerializer和DelegatingByTopicDeserializer现在提供。
有关更多信息,请参见Delegating Serializer 和 Deserializer。
DeadLetterPublishingRecover变化
物业stripPreviousExceptionHeaders现在是true默认情况下。
现在有几种技术可以自定义将哪些标头添加到输出记录中。
有关更多信息,请参阅 管理死信记录标题 。
可重试主题更改
现在,您可以对可重试和不可重试的主题使用相同的工厂。 有关更多信息,请参阅指定 ListenerContainerFactory。
现在有一个可管理的致命异常全局列表,这些异常将使失败的记录直接进入 DLT。 请参阅 Exception Classifier 以了解如何管理它。
现在,您可以结合使用阻塞和非阻塞重试。 有关更多信息,请参阅组合阻塞和非阻塞重试。
使用可重试主题功能时引发的 KafkaBackOffException 现在记录在 DEBUG 级别。 如果您需要将日志记录级别更改回 WARN 或将其设置为任何其他级别,请参阅更改 KafkaBackOffException 日志记录级别。
2.6 和 2.7 之间的更改
Kafka 客户端版本
此版本需要 2.7.0kafka-clients.
它还与 2.8.0 客户端兼容,从 2.7.1 版本开始;请参见覆盖 Spring Boot 依赖项。
使用主题的非阻塞延迟重试
此版本中添加了这一重要的新功能。 当严格排序不重要时,失败的投放可以发送到另一个主题以供以后使用。 可以配置一系列此类重试主题,但延迟会增加。 有关更多信息,请参阅Non-Blocking Retries 。
侦听器容器更改
这onlyLogRecordMetadatacontainer 属性现在是true默认情况下。
新的 container 属性stopImmediate现已推出。
有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。
使用BackOff在两次投放尝试之间(例如SeekToCurrentErrorHandler和DefaultAfterRollbackProcessor) 现在将在容器停止后不久退出 Back off 间隔,而不是延迟停止。
错误处理程序和回滚后扩展FailedRecordProcessor现在可以配置一个或多个RetryListeners 接收有关重试和恢复进度的信息。
这RecordInterceptor现在在侦听器返回后调用了其他方法(通常,或通过引发异常)。
它还有一个子接口ConsumerAwareRecordInterceptor.
此外,现在还有一个BatchInterceptor对于批处理侦听器。
有关更多信息,请参阅 Message Listener Containers 。
@KafkaListener变化
您现在可以验证@KafkaHandler方法(类级侦听器)。
看@KafkaListener @Payload验证了解更多信息。
您现在可以设置rawRecordHeader属性MessagingMessageConverter和BatchMessagingMessageConverter这会导致 RAWConsumerRecord添加到已转换的Message<?>.
这很有用,例如,如果您希望使用DeadLetterPublishingRecoverer在侦听器错误处理程序中。
有关更多信息,请参阅 Listener Error Handlers 。
您现在可以修改@KafkaListenerannotations 的 Comments。
看@KafkaListener属性修改了解更多信息。
DeadLetterPublishingRecover变化
现在,如果 key 和 value 都失败了,则原始值将发布到 DLT。
以前,该值已填充,但键DeserializationException留在标头中。
如果存在中断性 API 更改,如果将 recoverer 子类化并覆盖createProducerRecord方法。
此外,recoverer 会在发布到目标 resolver 选择的分区之前验证该分区是否确实存在。
有关更多信息,请参阅发布死信记录。
ChainedKafkaTransactionManager已弃用
有关更多信息,请参阅 事务 。
ReplyingKafkaTemplate变化
现在有一种机制可以检查回复,如果存在某些条件,则异常地使 future 失败。
支持发送和接收spring-messaging Message<?>s 已添加。
看用ReplyingKafkaTemplate了解更多信息。
Kafka Streams 更改
默认情况下,StreamsBuilderFactoryBean现在配置为不清理本地状态。
有关更多信息,请参阅配置。
KafkaAdmin变化
新方法createOrModifyTopics和describeTopics已添加。KafkaAdmin.NewTopics已添加以方便在单个 bean 中配置多个主题。
有关更多信息,请参见 [configuring-topics]。
MessageConverter变化
现在可以添加spring-messaging SmartMessageConverter到MessagingMessageConverter,允许基于contentType页眉。
有关更多信息,请参见Spring Messaging Message Conversion。
测 序@KafkaListeners
看开始@KafkaListener序列中的 s了解更多信息。
ExponentialBackOffWithMaxRetries
新的BackOffimplementation 的配置,可以更方便地配置 Max Retreries。
看ExponentialBackOffWithMaxRetries实现了解更多信息。
条件委托错误处理程序
这些新的错误处理程序可以配置为委托给不同的错误处理程序,具体取决于异常类型。 有关更多信息,请参阅委派错误处理程序。
2.5 和 2.6 之间的变化
侦听器容器更改
默认的EOSMode现在是BETA.
有关更多信息,请参阅 Exactly Once 语义。
各种错误处理程序(扩展FailedRecordProcessor) 和DefaultAfterRollbackProcessor现在重置BackOff如果恢复失败。
此外,您现在可以选择BackOff以根据失败的记录和/或异常使用。
现在,您可以配置adviceChain在 Container Properties (容器属性) 中。
有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。
当容器配置为发布时ListenerContainerIdleEvents 中,它现在会发布一个ListenerContainerNoLongerIdleEvent发布空闲事件后收到记录时。
有关更多信息,请参阅应用程序事件和检测空闲和无响应的使用者。
@KafkaListener 更改
使用手动分区分配时,您现在可以指定通配符来确定哪些分区应重置为初始偏移量。
此外,如果监听器实现了ConsumerSeekAware,onPartitionsAssigned()在手动分配后调用。
(也在版本 2.5.5 中添加)。
有关更多信息,请参阅 Explicit Partition Assignment。
添加了便捷的方法AbstractConsumerSeekAware使查找更容易。
有关更多信息,请参见 [seek]。
ErrorHandler 更改
的子类FailedRecordProcessor(例如SeekToCurrentErrorHandler,DefaultAfterRollbackProcessor,RecoveringBatchErrorHandler) 现在可以配置为在异常类型与之前此记录发生的异常类型不同时重置重试状态。
Producer Factory 更改
现在,您可以为创建者设置一个最长期限,超过该期限后,它们将被关闭并重新创建。 有关更多信息,请参阅 事务 。
现在,您可以在DefaultKafkaProducerFactory已创建。
这可能很有用,例如,如果您必须在凭证更改后更新 SSL 密钥/信任存储位置。
看用DefaultKafkaProducerFactory了解更多信息。
2.4 和 2.5 之间的变化
本节介绍从版本 2.4 到版本 2.5 所做的更改。 有关早期版本的更改,请参阅更改历史记录。
消费者/生产者出厂设置更改
现在,每当创建或关闭 Consumer 或 Producer 时,默认的 consumer 和 producer 工厂都可以调用回调。 提供了本机 Micrometer 度量的实现。 有关更多信息,请参阅 Factory Listeners 。
现在,您可以在运行时更改引导服务器属性,从而能够故障转移到另一个 Kafka 集群。 有关更多信息,请参阅连接到 Kafka。
StreamsBuilderFactoryBean变化
工厂 Bean 现在可以在某个KafkaStreamscreated 或 destroyed。
提供了本机 Micrometer 度量的实现。
有关更多信息,请参阅 KafkaStreams Micrometer Support 。
Delivery Attempts 标头
现在有一个选项可以添加一个 headers,该 headers 在使用某些错误处理程序时和回滚处理器之后跟踪传递尝试。 有关更多信息,请参阅 Delivery Attempts Header。
@KafkaListener 更改
现在,如果需要,默认回复标头将在@KafkaListenerreturn 类型为Message<?>.
有关更多信息,请参阅 Reply Type Message<?> 。
这KafkaHeaders.RECEIVED_MESSAGE_KEY不再填充null值,当传入记录具有null钥匙;标题被完全省略。
@KafkaListener方法现在可以指定ConsumerRecordMetadata参数,而不是对元数据(如 Topic、Partition 等)使用离散标头。
有关更多信息,请参阅 Consumer Record Metadata。
侦听器容器更改
这assignmentCommitOptioncontainer 属性现在是LATEST_ONLY_NO_TX默认情况下。
有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。
这subBatchPerPartitioncontainer 属性现在是true默认情况下,使用 transactions 时。
有关更多信息,请参阅 事务 。
新的RecoveringBatchErrorHandler。
现在支持静态组成员资格。 有关更多信息,请参阅 Message Listener Containers 。
当配置了增量/协作再平衡时,如果偏移量无法提交,并且使用非致命的RebalanceInProgressException,容器将尝试在再平衡完成后重新提交仍分配给此实例的分区的偏移量。
默认错误处理程序现在是SeekToCurrentErrorHandlerfor record listener 和RecoveringBatchErrorHandler对于批处理侦听器。
有关更多信息,请参阅容器错误处理程序。
现在,您可以控制记录标准错误处理程序有意引发的异常的级别。 有关更多信息,请参阅容器错误处理程序。
这getAssignmentsByClientId()方法,从而更容易确定并发容器中的哪些使用者被分配了哪些分区。
有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。
您现在可以禁止整个日志记录ConsumerRecord错误、调试日志等。
看onlyLogRecordMetadata在 Listener Container Properties 中。
KafkaTemplate 更改
这KafkaTemplate现在可以保持千分尺计时器。
有关更多信息,请参阅监控。
这KafkaTemplate现在可以配置ProducerConfig属性来覆盖 Producer Factory 中的属性。
看用KafkaTemplate了解更多信息。
一个RoutingKafkaTemplate现在已经提供了。
看用RoutingKafkaTemplate了解更多信息。
您现在可以使用KafkaSendCallback而不是ListenerFutureCallback来获得范围更窄的异常,从而更容易提取失败的ProducerRecord.
看用KafkaTemplate了解更多信息。
Kafka 字符串序列化器/反序列化器
新增功能ToStringSerializer/StringDeserializers 以及关联的SerDe现在提供。
有关更多信息,请参阅字符串序列化。
JsonDeserializer
这JsonDeserializer现在可以更灵活地确定反序列化类型。
有关更多信息,请参阅使用方法确定类型。
委托序列化器/反序列化器
这DelegatingSerializer现在可以处理 “standard” 类型,当出站记录没有 Headers 时。
有关更多信息,请参见Delegating Serializer 和 Deserializer。
测试更改
这KafkaTestUtils.consumerProps()帮助程序记录现在设置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG自earliest默认情况下。
有关更多信息,请参阅 JUnit。
2.3 和 2.4 之间的更改
ConsumerAwareRebalanceListener 的
喜欢ConsumerRebalanceListener,此接口现在具有一个额外的方法onPartitionsLost.
有关更多信息,请参阅 Apache Kafka 文档。
与ConsumerRebalanceListener,默认实现不会调用onPartitionsRevoked.
相反,侦听器容器将在调用onPartitionsLost;因此,在实现ConsumerAwareRebalanceListener.
有关更多信息,请参阅 Rebalancing Listeners 末尾的重要说明。
Kafka模板
这KafkaTemplate现在支持非事务性发布和事务性发布。
看KafkaTemplate事务性和非事务性发布了解更多信息。
AggregatingReplyingKafka模板
这releaseStrategy现在是BiConsumer.
现在,在超时后(以及记录到达时)调用它;第二个参数是true在超时后调用。
有关更多信息,请参阅 聚合多个回复 。
侦听器容器
这ContainerProperties提供了一个authorizationExceptionRetryInterval选项,让侦听器容器在任何AuthorizationException由KafkaConsumer.
请参阅其 JavaDocs 和用KafkaMessageListenerContainer了解更多信息。
@KafkaListener
这@KafkaListenerannotation 具有新属性splitIterables;默认为 true。
当回复侦听器返回Iterable此属性控制是将返回结果作为单个记录发送,还是发送每个元素的一条记录。
看转发侦听器结果@SendTo了解更多信息
批处理侦听器现在可以使用BatchToRecordAdapter;例如,这允许在事务中处理 BATCH,而侦听器一次获取一条记录。
在默认实现中,ConsumerRecordRecoverer可用于处理批处理中的错误,而无需停止整个批处理的处理 - 这在使用事务时可能很有用。
有关更多信息,请参阅使用 Batch Listeners 的事务。
Kafka 流
这StreamsBuilderFactoryBean接受新属性KafkaStreamsInfrastructureCustomizer.
这允许在创建流之前配置生成器和/或拓扑。
有关更多信息,请参见 Spring Management。
2.2 和 2.3 之间的变化
本节介绍从版本 2.2 到版本 2.3 所做的更改。
提示、技巧和示例
添加了新章节 提示、技巧和示例。 请提交 GitHub 问题和/或拉取请求以获取该章节中的其他条目。
配置更改
从版本 2.3.4 开始,missingTopicsFatalcontainer 属性默认为 false。
如果为 true,则如果代理已关闭,则应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见的用例。
生产者和消费者工厂更改
这DefaultKafkaProducerFactory现在可以配置为为每个线程创建一个 Producer。
您还可以提供Supplier<Serializer>实例作为已配置类(需要无 arg 构造函数)的替代方法,或使用Serializer实例,然后在所有 Producer 之间共享。
看用DefaultKafkaProducerFactory了解更多信息。
相同的选项可用于Supplier<Deserializer>中的 实例DefaultKafkaConsumerFactory.
看用KafkaMessageListenerContainer了解更多信息。
侦听器容器更改
以前,收到 error handlersListenerExecutionFailedException(实际的侦听器异常作为cause),当使用侦听器适配器(如@KafkaListeners).
本机引发的异常GenericMessageListeners 被原封不动地传递给错误处理程序。
现在ListenerExecutionFailedException始终是参数(实际的侦听器异常作为cause),它提供对容器的group.id财产。
因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG成为false.
它现在会自动将其设置为 false,除非在 Consumer Factory 中专门设置或容器的 consumer 属性覆盖。
这ackOnErrorproperty 现在是false默认情况下。
现在可以获取消费者的group.id属性。
看获取 Consumergroup.id了解更多信息。
容器具有新属性recordInterceptor允许在调用侦听器之前检查或修改记录。
一个CompositeRecordInterceptor在您需要调用多个拦截器的情况下也提供了。
有关更多信息,请参阅 Message Listener Containers 。
这ConsumerSeekAware具有新方法,允许您相对于开始、结束或当前位置执行查找,并查找大于或等于时间戳的第一个偏移量。
有关更多信息,请参见 [seek]。
便利类AbstractConsumerSeekAware现在提供以简化查找。
有关更多信息,请参见 [seek]。
这ContainerProperties提供了一个idleBetweenPolls选项,让侦听器容器中的 main 循环在KafkaConsumer.poll()调用。
请参阅其 JavaDocs 和用KafkaMessageListenerContainer了解更多信息。
使用AckMode.MANUAL(或MANUAL_IMMEDIATE),现在,您可以通过调用nack在Acknowledgment.
有关更多信息,请参阅提交偏移量。
现在可以使用 Micrometer 监控侦听器性能Timers.
有关更多信息,请参阅监控。
容器现在发布与启动相关的其他使用者生命周期事件。 有关更多信息,请参阅应用程序事件。
事务性批处理侦听器现在可以支持僵尸屏蔽。 有关更多信息,请参阅 事务 。
侦听器容器工厂现在可以使用ContainerCustomizer在创建和配置每个容器后进一步配置每个容器。
有关更多信息,请参阅 Container factory。
ErrorHandler 更改
这SeekToCurrentErrorHandler现在将某些异常视为 Fatal 并禁用这些异常的重试,并在第一次失败时调用 Recoverer。
这SeekToCurrentErrorHandler和SeekToCurrentBatchErrorHandler现在可以配置为应用BackOff(线程休眠)。
从版本 2.3.2 开始,当错误处理程序在恢复失败的记录后返回时,将提交已恢复记录的偏移量。
这DeadLetterPublishingRecoverer与ErrorHandlingDeserializer现在,将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。
以前,它是null以及提取DeserializationException从消息标头。
有关更多信息,请参阅发布死信记录。
主题生成器
新类TopicBuilder为了更方便地创建NewTopic @Beans 进行自动主题配置。
有关更多信息,请参见 [configuring-topics]。
Kafka Streams 更改
现在,您可以执行StreamsBuilderFactoryBean创建者@EnableKafkaStreams.
有关更多信息,请参阅 Streams Configuration 。
一个RecoveringDeserializationExceptionHandler,这允许恢复具有反序列化错误的记录。
它可以与DeadLetterPublishingRecoverer将这些记录发送到死信主题。
有关更多信息,请参见Recovery from Deserialization Exceptions。
这HeaderEnrichertransformer,使用 SpEL 生成 Headers 值。
有关更多信息,请参阅 Header Enricher 。
这MessagingTransformer已提供。
这允许 Kafka 流拓扑与 spring-messaging 组件(例如 Spring Integration 流)进行交互。
看MessagingProcessor和 See[从KStream] 了解更多信息。
JSON 组件更改
现在,默认情况下,所有 JSON 感知组件都使用 Jackson 进行配置ObjectMapper制作人JacksonUtils.enhancedObjectMapper().
这JsonDeserializer现在提供TypeReference的构造函数,以便更好地处理目标泛型容器类型。
也是一个JacksonMimeTypeModule已引入序列化org.springframework.util.MimeType转换为纯字符串。
有关更多信息,请参阅其 JavaDocs 和 Serialization, Deserialization, and Message Conversion。
一个ByteArrayJsonMessageConverter以及所有 Json 转换器的新超类,JsonMessageConverter.
此外,一个StringOrBytesSerializer现已推出;它可以序列化byte[],Bytes和String的值ProducerRecords.
有关更多信息,请参见Spring Messaging Message Conversion。
这JsonSerializer,JsonDeserializer和JsonSerde现在拥有 Fluent API,使编程配置更简单。
请参阅 javadocs、 序列化、反序列化和消息转换 以及 Streams JSON 序列化和反序列化 以获取更多信息。
回复KafkaTemplate
当回复超时时,future 异常地以KafkaReplyTimeoutException而不是KafkaException.
此外,重载的sendAndReceive现在提供了允许基于每条消息指定回复超时的方法。
AggregatingReplyingKafka模板
扩展ReplyingKafkaTemplate通过聚合来自多个接收者的回复。
有关更多信息,请参阅 聚合多个回复 。
交易变更
您现在可以覆盖 producer 工厂的transactionIdPrefix在KafkaTemplate和KafkaTransactionManager.
看transactionIdPrefix了解更多信息。
新的 Delegating Serializer/Deserializer
该框架现在提供了一个委托Serializer和Deserializer,利用标头来启用生成和使用具有多个键/值类型的记录。
有关更多信息,请参见Delegating Serializer 和 Deserializer。
新的 Retrying Deserializer
该框架现在提供了一个委托RetryingDeserializer,以便在可能发生暂时性错误(如网络问题)时重试序列化。
有关更多信息,请参阅 Retrying Deserializer 。
2.1 和 2.2 之间的更改
类和软件包更改
这ContainerPropertiesclass 已从org.springframework.kafka.listener.config自org.springframework.kafka.listener.
这AckModeenum 已从AbstractMessageListenerContainer自ContainerProperties.
这setBatchErrorHandler()和setErrorHandler()方法已从ContainerProperties到两者AbstractMessageListenerContainer和AbstractKafkaListenerContainerFactory.
回滚处理后
新的AfterRollbackProcessor策略。
有关更多信息,请参阅 After-rollback Processor 。
ConcurrentKafkaListenerContainerFactory变化
您现在可以使用ConcurrentKafkaListenerContainerFactory创建和配置任何ConcurrentMessageListenerContainer,而不仅仅是那些@KafkaListener附注。
有关更多信息,请参阅 Container factory。
侦听器容器更改
新的 container 属性 (missingTopicsFatal) 已添加。
看用KafkaMessageListenerContainer了解更多信息。
一个ConsumerStoppedEvent现在,当使用者停止时发出。
有关更多信息,请参阅 线程安全 。
Batch 侦听器可以选择接收完整的ConsumerRecords<?, ?>object 而不是List<ConsumerRecord<?, ?>.
有关更多信息,请参阅 [batch-listeners]。
这DefaultAfterRollbackProcessor和SeekToCurrentErrorHandler现在可以恢复(跳过)不断失败的记录,并且默认情况下,在 10 次失败后恢复。
它们可以配置为将失败的记录发布到死信主题。
从版本 2.2.4 开始,可以在选择死信主题名称时使用使用者的组 ID。
这ConsumerStoppingEvent已添加。
有关更多信息,请参阅应用程序事件。
这SeekToCurrentErrorHandler现在可以配置为在容器配置了AckMode.MANUAL_IMMEDIATE(自 2.2.4 起)。
@KafkaListener 更改
您现在可以覆盖concurrency和autoStartupproperties 的 Comments。
现在,您可以添加配置以确定将哪些标头(如果有)复制到回复消息中。
看@KafkaListener注解了解更多信息。
您现在可以使用@KafkaListener作为你自己的注解上的元注解。
看@KafkaListener作为元注释了解更多信息。
现在,可以更轻松地配置Validator为@Payload验证。
看@KafkaListener @Payload验证了解更多信息。
现在,您可以直接在 Comments 上指定 kafka 使用者属性;这些将覆盖 Consumer Factory 中定义的任何具有相同名称的属性(自版本 2.2.4 起)。 有关更多信息,请参阅 Annotation 属性。
标头映射更改
类型的标头MimeType和MediaType现在映射为 Simpler Strings 中的RecordHeader价值。
以前,它们被映射为 JSON,并且只MimeType被解码。MediaType无法解码。
它们现在是用于互作性的简单字符串。
此外,DefaultKafkaHeaderMapper具有新的addToStringClasses方法,允许使用toString()而不是 JSON。
有关更多信息,请参阅 Message Headers 。
嵌入式 Kafka 更改
这KafkaEmbedded类及其KafkaRule接口已被弃用,取而代的EmbeddedKafkaBroker及其 JUnit 4EmbeddedKafkaRule包装纸。
这@EmbeddedKafka注解现在会填充EmbeddedKafkaBrokerbean 而不是已弃用的KafkaEmbedded.
此更改允许使用@EmbeddedKafka在 JUnit 5 测试中。
这@EmbeddedKafka注解现在具有属性ports要指定填充EmbeddedKafkaBroker.
有关更多信息,请参阅测试应用程序。
JsonSerializer/Deserializer 增强功能
现在,您可以使用 producer 和 consumer 属性提供类型映射信息。
反序列化器上提供了新的构造函数,以允许使用提供的目标类型覆盖类型 Headers 信息。
这JsonDeserializer现在默认删除任何类型信息标头。
您现在可以配置JsonDeserializer使用 Kafka 属性忽略类型信息标头(自 2.2.3 起)。
有关更多信息,请参见 序列化、反序列化和消息转换。
Kafka Streams 更改
流配置 Bean 现在必须是一个KafkaStreamsConfigurationobject 而不是StreamsConfig对象。
这StreamsBuilderFactoryBean已从包…core自…config.
这KafkaStreamBrancher,以便在条件分支构建在KStream实例。
有关更多信息,请参阅 Apache Kafka Streams 支持和配置。
事务 ID
当侦听器容器启动事务时,transactional.id现在是transactionIdPrefix附加<group.id>.<topic>.<partition>.
此更改允许对僵尸进行适当的围栏,如此处所述。
2.0 和 2.1 之间的变化
JSON 改进
这StringJsonMessageConverter和JsonSerializer现在在Headers,让转换器和JsonDeserializer根据消息本身而不是固定的配置类型,在 Reception 时创建特定类型。
有关更多信息,请参见 序列化、反序列化和消息转换。
容器停止错误处理程序
现在为记录和批处理侦听器提供了容器错误处理程序,这些侦听器将侦听器抛出的任何异常视为 fatal/ 他们停止了容器。 有关更多信息,请参阅处理异常。
暂停和恢复容器
侦听器容器现在具有pause()和resume()方法(自版本 2.1.3 起)。
有关更多信息,请参阅暂停和恢复侦听器容器。
状态重试
从版本 2.1.3 开始,您可以配置有状态重试。 有关更多信息,请参阅有状态重试。
客户端 ID
从版本 2.1.1 开始,您现在可以将client.id前缀@KafkaListener.
以前,要自定义客户端 ID,您需要为每个侦听器提供单独的使用者工厂(和容器工厂)。
前缀后缀为-n以在使用并发时提供唯一的客户端 ID。
日志记录偏移提交
默认情况下,主题偏移提交的日志记录是使用DEBUG日志记录级别。
从版本 2.1.2 开始,在ContainerProperties叫commitLogLevel用于指定这些消息的日志级别。
看用KafkaMessageListenerContainer了解更多信息。
默认@KafkaHandler
从版本 2.1.3 开始,您可以指定一个@KafkaHandler类级别的注释@KafkaListener作为默认值。
看@KafkaListener在类上了解更多信息。
回复KafkaTemplate
从版本 2.1.3 开始,一个KafkaTemplate以支持请求/回复语义。
看用ReplyingKafkaTemplate了解更多信息。
2.0 版的迁移指南
请参阅 2.0 到 2.1 迁移指南。
1.3 和 2.0 之间的变化
@KafkaListener变化
您现在可以批注@KafkaListener方法(以及类和@KafkaHandler方法)与@SendTo.
如果该方法返回结果,则会将其转发到指定的 Topic。
看转发侦听器结果@SendTo了解更多信息。
消息侦听器
消息侦听器现在可以了解Consumer对象。
有关更多信息,请参阅 [message-listeners]。
用ConsumerAwareRebalanceListener
再平衡侦听器现在可以访问Consumer对象。
有关更多信息,请参阅重新平衡侦听器。
1.2 和 1.3 之间的变化
对事务的支持
0.11.0.0 客户端库添加了对事务的支持。
这KafkaTransactionManager以及对事务的其他支持。
有关更多信息,请参阅 事务 。
对标头的支持
0.11.0.0 客户端库添加了对消息标头的支持。
现在可以将这些 Map 映射到spring-messaging MessageHeaders.
有关更多信息,请参阅 Message Headers 。
支持 Kafka 时间戳
KafkaTemplate现在支持 API 添加带有时间戳的记录。
新增功能KafkaHeaders已介绍timestamp支持。
此外,新的KafkaConditions.timestamp()和KafkaMatchers.hasTimestamp()添加了 testing 实用程序。
看用KafkaTemplate,@KafkaListener注解和 Testing Applications 了解更多详情。
@KafkaListener变化
您现在可以配置KafkaListenerErrorHandler处理异常。
有关更多信息,请参阅处理异常。
默认情况下,@KafkaListener id属性现在用作group.id属性,覆盖在 Consumer Factory 中配置的属性(如果存在)。
此外,您可以显式配置groupId在注释上。
以前,您需要一个单独的容器工厂(和消费者工厂)来使用不同的group.id侦听器的值。
要恢复以前使用出厂配置的行为group.id中,将idIsGroup属性设置为false.
@EmbeddedKafka注解
为方便起见,测试类级别的@EmbeddedKafkaannotation 来注册KafkaEmbedded作为 Bean 进行。
有关更多信息,请参阅测试应用程序。
Kerberos 配置
现在提供对配置 Kerberos 的支持。 有关更多信息,请参阅 JAAS 和 Kerberos。
1.0 和 1.1 之间的更改
寻求
您现在可以查找每个主题或分区的位置。 当使用组管理并且 Kafka 分配分区时,您可以使用此选项在初始化期间设置初始位置。 您还可以在检测到空闲容器时或在应用程序执行中的任何任意点进行查找。 有关更多信息,请参见 [seek]。