附录 D:更改历史记录
D.1. 2.9 自 2.8 以来的新增功能
D.1.2. 错误处理程序更改
这DefaultErrorHandler现在可以配置为暂停容器进行一次轮询,并使用上一次轮询的剩余结果,而不是查找剩余记录的偏移量。
有关详细信息,请参阅 DefaultErrorHandler。
这DefaultErrorHandler现在有一个BackOffHandler财产。
有关更多信息,请参阅退后处理程序。
D.1.3. 侦听器容器更改
interceptBeforeTx现在适用于所有事务管理器(以前它仅在KafkaAwareTransactionManager被使用)。
请参阅 [interceptBeforeTx]。
新的容器属性pauseImmediate,允许容器在处理当前记录后暂停使用者,而不是在处理完上一次轮询中的所有记录后暂停使用者。
请参阅 [pauseImmediate]。
与消费者身份验证和授权相关的事件
D.1.4. 标头映射器更改
您现在可以配置应映射哪些入站标头。 也可在 2.8.8 或更高版本中使用。 有关详细信息,请参阅消息头。
D.1.5.KafkaTemplate变化
在 3.0 中,该类返回的 future 将为CompletableFutures 而不是ListenableFutures.
看用KafkaTemplate以获取使用此版本时过渡的帮助。
D.1.6.ReplyingKafkaTemplate变化
该模板现在提供了一种方法来等待回复容器上的分配,以避免在初始化回复容器之前发送请求时出现争用。
也可在 2.8.8 或更高版本中使用。
看用ReplyingKafkaTemplate.
在 3.0 中,该类返回的 future 将为CompletableFutures 而不是ListenableFutures.
看用ReplyingKafkaTemplate和请求/回复Message<?>s以获取使用此版本时过渡的帮助。
D.2. 2.8 自 2.7 以来的新功能
本节介绍从 2.7 版到 2.8 版所做的更改。 有关早期版本的更改,请参阅更改历史记录。
D.2.2. 软件包更改
与类型映射相关的类和接口已从…support.converter自…support.mapping.
-
AbstractJavaTypeMapper -
ClassMapper -
DefaultJackson2JavaTypeMapper -
Jackson2JavaTypeMapper
D.2.3. 无序手动提交
侦听器容器现在可以配置为接受无序的手动偏移提交(通常是异步的)。 容器将延迟提交,直到确认丢失的偏移量。 有关详细信息,请参阅手动提交偏移量。
D.2.4.@KafkaListener变化
现在可以指定侦听器方法是否是方法本身的批处理侦听器。 这允许将同一个容器工厂用于记录和批处理侦听器。
有关更多信息,请参阅批处理侦听器。
批处理侦听器现在可以处理转换异常。
有关详细信息,请参阅使用批处理错误处理程序的转换错误。
RecordFilterStrategy,当与批处理侦听器一起使用时,现在可以在一次调用中筛选整个批处理。
有关更多信息,请参阅 Batch Listeners 末尾的注释。
这@KafkaListener注释现在具有filter属性,以覆盖容器工厂的RecordFilterStrategy只为这个听众。
D.2.5.KafkaTemplate变化
您现在可以接收一条记录,给定主题、分区和偏移量。
看用KafkaTemplate接收了解更多信息。
D.2.6.CommonErrorHandler添加
遗产GenericErrorHandler及其用于记录批处理侦听器的子接口层次结构已被新的单个接口所取代CommonErrorHandler与大多数旧版实现相对应的实现GenericErrorHandler.
请参阅容器错误处理程序和将自定义旧版错误处理程序实现迁移到CommonErrorHandler了解更多信息。
D.2.7. 侦听器容器更改
这interceptBeforeTxcontainer 属性现在是true默认情况下。
这authorizationExceptionRetryInterval属性已重命名为authExceptionRetryInterval现在适用于AuthenticationExceptions 除了AuthorizationExceptions 之前。
这两个异常都被视为致命异常,除非设置了此属性,否则容器将默认停止。
看用KafkaMessageListenerContainer和侦听器容器属性以获取更多信息。
D.2.8. 序列化器/解序列化器更改
这DelegatingByTopicSerializer和DelegatingByTopicDeserializer现在提供了。
有关详细信息,请参阅委托序列化器和反序列化器。
D.2.9.DeadLetterPublishingRecover变化
该物业stripPreviousExceptionHeaders现在true默认情况下。
现在有几种技术可以自定义将哪些标头添加到输出记录中。
有关详细信息,请参阅管理死信记录标题。
D.2.10. 可重试主题更改
现在,可以将同一工厂用于可重试和不可重试的主题。 有关更多信息,请参阅指定 ListenerContainerFactory。
现在有一个可管理的致命异常全球列表,这些异常将使失败的记录直接进入 DLT。 请参阅异常分类器以了解如何管理它。
您现在可以结合使用阻止和非阻止重试。 有关详细信息,请参阅组合阻塞和非阻塞重试。
使用可重试主题功能时抛出的 KafkaBackOffException 现在记录在 DEBUG 级别。 如果您需要将日志记录级别更改回 WARN 或将其设置为任何其他级别,请参阅更改 KafkaBackOffException 日志记录级别。
D.3. 2.6 和 2.7 之间的更改
D.3.1. Kafka 客户端版本
此版本需要 2.7.0kafka-clients. 从版本 2.7.1 开始,它还与 2.8.0 客户端兼容;请参阅覆盖 Spring Boot 依赖项。
D.3.2. 使用 Topics 的非阻塞延迟重试
此版本中添加了这一重要的新功能。当严格排序不重要时,可以将失败的投放发送到另一个主题以供以后使用。可以配置一系列此类重试主题,但延迟会增加。请参阅 非阻塞重试 有关更多信息。
D.3.3. 侦听器容器更改
这onlyLogRecordMetadatacontainer 属性现在是true默认情况下。
新的容器属性stopImmediate现已推出。
有关更多信息,请参阅侦听器容器属性。
使用BackOff在投递尝试之间(例如SeekToCurrentErrorHandler和DefaultAfterRollbackProcessor) 现在将在容器停止后不久退出回退间隔,而不是延迟停止。
错误处理程序和扩展的回滚处理器FailedRecordProcessor现在可以配置一个或多个RetryListener以接收有关重试和恢复进度的信息。
这RecordInterceptor现在在侦听器返回后调用了其他方法(通常,或通过抛出异常)。
它还有一个子界面ConsumerAwareRecordInterceptor.
此外,现在还有一个BatchInterceptor用于批量侦听器。
有关详细信息,请参阅消息侦听器容器。
D.3.4.@KafkaListener变化
您现在可以验证@KafkaHandler方法(类级侦听器)。
看@KafkaListener @Payload验证了解更多信息。
现在,您可以将rawRecordHeader属性MessagingMessageConverter和BatchMessagingMessageConverter这导致原始的ConsumerRecord添加到转换后的Message<?>.
例如,如果您希望使用DeadLetterPublishingRecoverer在侦听器错误处理程序中。
有关更多信息,请参阅侦听器错误处理程序。
您现在可以修改@KafkaListener应用程序初始化期间的注释。
看@KafkaListener属性修改了解更多信息。
D.3.5.DeadLetterPublishingRecover变化
现在,如果键和值都失败反序列化,则原始值将发布到 DLT。
以前,已填充值,但键DeserializationException留在标题中。
如果您对恢复器进行了子类化并覆盖了createProducerRecord方法。
此外,恢复程序在发布到目标解析程序之前验证目标解析器选择的分区是否实际存在。
有关详细信息,请参阅发布死信记录。
D.3.6.ChainedKafkaTransactionManager已弃用
有关详细信息,请参阅事务。
D.3.7.ReplyingKafkaTemplate变化
现在有一种机制可以检查回复,如果存在某些条件,则未来异常失败。
支持发送和接收spring-messaging Message<?>s 已被添加。
看用ReplyingKafkaTemplate了解更多信息。
D.3.8. Kafka 流更改
默认情况下,StreamsBuilderFactoryBean现在配置为不清理本地状态。
有关详细信息,请参阅配置。
D.3.9.KafkaAdmin变化
新方法createOrModifyTopics和describeTopics已被添加。KafkaAdmin.NewTopics已添加以方便在单个 Bean 中配置多个主题。
有关详细信息,请参阅配置主题。
D.3.10.MessageConverter变化
现在可以添加一个spring-messaging SmartMessageConverter到MessagingMessageConverter,允许基于contentType页眉。
有关更多信息,请参阅 Spring Messaging Message Conversion。
D.3.11. 测序@KafkaListeners
看开始@KafkaListeners 在序列中了解更多信息。
D.3.12.ExponentialBackOffWithMaxRetries
一个新的BackOff提供了实现,可以更方便地配置最大重试次数。
看ExponentialBackOffWithMaxRetries实现了解更多信息。
D.3.13. 条件委托错误处理程序
这些新的错误处理程序可以配置为委托给不同的错误处理程序,具体取决于异常类型。 有关详细信息,请参阅委派错误处理程序。
D.4. 2.5 和 2.6 之间的更改
D.4.2. 侦听器容器更改
默认值EOSMode现在BETA.
有关详细信息,请参阅 Exactly Once 语义。
各种错误处理程序(扩展FailedRecordProcessor) 和DefaultAfterRollbackProcessor现在重置BackOff如果恢复失败。
此外,您现在可以选择BackOff根据失败的记录和/或异常使用。
您现在可以配置adviceChain在容器属性中。
有关更多信息,请参阅侦听器容器属性。
将容器配置为发布时ListenerContainerIdleEvents,它现在发布一个ListenerContainerNoLongerIdleEvent发布空闲事件后收到记录时。
有关详细信息,请参阅应用程序事件和检测空闲和无响应使用者。
D.4.3. @KafkaListener更改
使用手动分区分配时,您现在可以指定通配符来确定哪些分区应重置为初始偏移量。
此外,如果监听器实现ConsumerSeekAware,onPartitionsAssigned()在手动分配后调用。
(2.5.5 版中也添加了)。
有关详细信息,请参阅显式分区分配。
新增便捷方法AbstractConsumerSeekAware使寻找更容易。
有关详细信息,请参阅搜索到特定偏移量。
D.4.4. ErrorHandler 更改
的子类FailedRecordProcessor(例如SeekToCurrentErrorHandler,DefaultAfterRollbackProcessor,RecoveringBatchErrorHandler) 现在可以配置为重置重试状态,如果异常类型与此记录之前发生的异常类型不同。
D.4.5. 生产者工厂变更
您现在可以为生产者设置最大年龄,之后它们将被关闭并重新创建。 有关详细信息,请参阅事务。
您现在可以在DefaultKafkaProducerFactory已被创建。
例如,如果必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。
看用DefaultKafkaProducerFactory了解更多信息。
D.5. 2.4 和 2.5 之间的更改
本节介绍从 2.4 版到 2.5 版所做的更改。 有关早期版本的更改,请参阅更改历史记录。
D.5.1. 消费者/生产者工厂变更
默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。 提供了原生 Micrometer 指标的实现。 有关更多信息,请参阅工厂监听器。
现在,您可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 集群。 有关更多信息,请参阅连接到 Kafka。
D.5.2.StreamsBuilderFactoryBean变化
工厂 bean 现在可以在KafkaStreams创建或销毁。
提供了原生千分尺度量的实现。
有关更多信息,请参阅 KafkaStreams Micrometer 支持。
D.5.5. Delivery Attempts 标头
现在有一个选项可以添加一个标头,用于在使用某些错误处理程序和回滚处理器后跟踪传递尝试。 有关更多信息,请参阅 Delivery Attempts Header。
D.5.6. @KafkaListener更改
如果需要,现在将自动填充默认回复标头,当@KafkaListener返回类型为Message<?>.
有关详细信息,请参阅回复类型消息<?>。
这KafkaHeaders.RECEIVED_MESSAGE_KEY不再填充null当传入记录具有null钥匙;标题被完全省略。
@KafkaListener方法现在可以指定一个ConsumerRecordMetadata参数,而不是对元数据(如主题、分区等)使用离散标头。
有关更多信息,请参阅消费者记录元数据。
D.5.7. 侦听器容器更改
这assignmentCommitOptioncontainer 属性现在是LATEST_ONLY_NO_TX默认情况下。
有关更多信息,请参阅侦听器容器属性。
这subBatchPerPartitioncontainer 属性现在是true默认情况下,使用事务时。
有关详细信息,请参阅事务。
一个新的RecoveringBatchErrorHandler现在提供了。
现在支持静态组成员身份。 有关详细信息,请参阅消息侦听器容器。
配置增量/协作重新平衡时,如果偏移量未能提交,则非致命RebalanceInProgressException,则容器将尝试在重新平衡完成后重新提交仍分配给此实例的分区的偏移量。
默认错误处理程序现在是SeekToCurrentErrorHandler用于唱片听众和RecoveringBatchErrorHandler用于批量侦听器。
有关更多信息,请参阅容器错误处理程序。
现在,您可以控制记录标准错误处理程序故意抛出的异常的级别。 有关更多信息,请参阅容器错误处理程序。
这getAssignmentsByClientId()方法,可以更轻松地确定并发容器中的哪些使用者被分配了哪些分区。
有关更多信息,请参阅侦听器容器属性。
您现在可以禁止整个日志记录ConsumerRecords 出错,调试日志等。
看onlyLogRecordMetadata在侦听器容器属性中。
D.5.8. KafkaTemplate 更改
这KafkaTemplate现在可以维护千分尺定时器。
有关详细信息,请参阅监视。
这KafkaTemplate现在可以配置ProducerConfig属性来覆盖生产者工厂中的属性。
看用KafkaTemplate了解更多信息。
一个RoutingKafkaTemplate现已提供。
看用RoutingKafkaTemplate了解更多信息。
您现在可以使用KafkaSendCallback而不是ListenerFutureCallback以获得更窄的异常,从而更容易提取失败的异常ProducerRecord. 看用KafkaTemplate了解更多信息。
D.5.9. Kafka 字符串序列化器/反序列化器
新增功能ToStringSerializer/StringDeserializers 以及关联的SerDe现在提供了。
有关详细信息,请参阅字符串序列化。
D.5.10. Json反序列化器
这JsonDeserializer现在可以更灵活地确定反序列化类型。
有关详细信息,请参阅使用方法确定类型。
D.5.11. 委托序列化器/反序列化器
这DelegatingSerializer现在可以处理“标准”类型,当出站记录没有标头时。
有关详细信息,请参阅委托序列化器和反序列化器。
D.5.12. 测试更改
这KafkaTestUtils.consumerProps()辅助记录现在设置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG自earliest默认情况下。
有关更多信息,请参阅 JUnit。
D.6. 2.3 和 2.4 之间的更改
D.6.2. ConsumerAwareRebalanceListener
喜欢ConsumerRebalanceListener,这个接口现在有一个额外的方法onPartitionsLost.
有关更多信息,请参阅 Apache Kafka 文档。
与ConsumerRebalanceListener,默认实现不会调用onPartitionsRevoked.
相反,侦听器容器将在调用onPartitionsLost;因此,在实现时不应执行相同的作ConsumerAwareRebalanceListener.
有关更多信息,请参阅重新平衡侦听器末尾的重要说明。
D.6.4. 卡夫卡模板
这KafkaTemplate现在支持非事务发布和事务发布。
看KafkaTemplate事务性和非事务性发布了解更多信息。
D.6.5. 聚合回复卡夫卡模板
这releaseStrategy现在是一个BiConsumer.
现在在超时后(以及记录到达时)调用它;第二个参数是true在超时后调用的情况下。
有关详细信息,请参阅聚合多个回复。
D.6.6. 侦听器容器
这ContainerProperties提供authorizationExceptionRetryInterval选项,让侦听器容器在任何AuthorizationException是由KafkaConsumer.
请参阅其 JavaDocs 和用KafkaMessageListenerContainer了解更多信息。
D.6.7. @KafkaListener
这@KafkaListener注释具有新属性splitIterables;默认为 true。
当回复侦听器返回Iterable此属性控制返回结果是作为单个记录发送还是发送每个元素的记录。
看转发侦听器结果@SendTo更多信息
批处理侦听器现在可以使用BatchToRecordAdapter;例如,这允许在事务中处理批处理,而侦听器一次获取一条记录。
在默认实现中,一个ConsumerRecordRecoverer可用于处理批处理中的错误,而无需停止整个批处理 - 这在使用事务时可能很有用。
有关更多信息,请参阅使用批处理侦听器的事务。
D.6.8. Kafka 流
这StreamsBuilderFactoryBean接受新属性KafkaStreamsInfrastructureCustomizer.
这允许在创建流之前配置构建器和/或拓扑。
有关更多信息,请参阅 Spring Management。
D.7. 2.2 和 2.3 之间的更改
本节介绍从 2.2 版到 2.3 版所做的更改。
D.7.1. 提示、技巧和示例
添加了一个新章节提示、技巧和示例。 请提交 GitHub 问题和/或拉取请求以获取该章中的其他条目。
D.7.4. 配置更改
从 2.3.4 版本开始,missingTopicsFatalcontainer 属性默认为 false。
当这种情况为 true 时,如果代理关闭,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见的用例。
D.7.5. 生产者和消费者工厂变更
这DefaultKafkaProducerFactory现在可以配置为为每个线程创建一个生产者。
您还可以提供Supplier<Serializer>实例作为配置类(需要无参数构造函数)的替代方法,或使用Serializer实例,然后在所有生产者之间共享。
看用DefaultKafkaProducerFactory了解更多信息。
同样的选项可用于Supplier<Deserializer>实例中的实例DefaultKafkaConsumerFactory. 看用KafkaMessageListenerContainer了解更多信息。
D.7.6. 侦听器容器更改
以前,错误处理程序收到ListenerExecutionFailedException(实际侦听器例外为cause) 当使用侦听器适配器(例如@KafkaListeners).
本机引发的异常GenericMessageListeners 被原封不动地传递给错误处理程序。
现在一个ListenerExecutionFailedException始终是参数(实际侦听器异常为cause),它提供对容器的group.id财产。
因为侦听器容器有自己的提交偏移量机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG成为false.
现在,它会自动将其设置为 false,除非在使用者工厂中特别设置或容器的使用者属性覆盖。
这ackOnError属性现在是false默认情况下。
现在可以获得消费者的group.idlistener 方法中的属性。
看获取消费者group.id了解更多信息。
容器具有新属性recordInterceptor允许在调用侦听器之前检查或修改记录。
一个CompositeRecordInterceptor还提供了,以防您需要调用多个拦截器。
有关详细信息,请参阅消息侦听器容器。
这ConsumerSeekAware具有新方法,允许您相对于开始、结束或当前位置执行搜索,并搜索到大于或等于时间戳的第一个偏移量。
有关详细信息,请参阅搜索到特定偏移量。
便利班AbstractConsumerSeekAware现在提供以简化搜索。
有关详细信息,请参阅搜索到特定偏移量。
这ContainerProperties提供idleBetweenPolls选项,让监听器容器中的主循环在KafkaConsumer.poll()调用。
请参阅其 JavaDocs 和用KafkaMessageListenerContainer了解更多信息。
使用时AckMode.MANUAL(或MANUAL_IMMEDIATE),您现在可以通过调用nack在Acknowledgment.
有关更多信息,请参阅提交偏移量。
现在可以使用 Micrometer 监控监听器性能Timers.
有关详细信息,请参阅监视。
容器现在发布与启动相关的其他使用者生命周期事件。 有关详细信息,请参阅应用程序事件。
事务性批处理侦听器现在可以支持僵尸隔离。 有关详细信息,请参阅事务。
侦听器容器工厂现在可以配置ContainerCustomizer在创建和配置每个容器后进一步配置它。
有关详细信息,请参阅容器工厂。
D.7.7. ErrorHandler 更改
这SeekToCurrentErrorHandler现在将某些异常视为致命异常,并禁用这些异常的重试,在第一次失败时调用恢复器。
这SeekToCurrentErrorHandler和SeekToCurrentBatchErrorHandler现在可以配置为应用BackOff(线程睡眠)。
从版本 2.3.2 开始,当错误处理程序在恢复失败的记录后返回时,将提交恢复记录的偏移量。
这DeadLetterPublishingRecoverer,当与ErrorHandlingDeserializer,现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。
以前,它是null以及提取DeserializationException从邮件头。
有关详细信息,请参阅发布死信记录。
D.7.8. 主题构建器
新职业TopicBuilder提供更方便的创建NewTopic @Beans 用于自动主题配置。
有关详细信息,请参阅配置主题。
D.7.9. Kafka 流更改
您现在可以对StreamsBuilderFactoryBean创建者@EnableKafkaStreams.
有关更多信息,请参阅 Streams 配置。
一个RecoveringDeserializationExceptionHandler现在提供了允许恢复具有反序列化错误的记录。
它可以与DeadLetterPublishingRecoverer将这些记录发送到死信主题。
有关详细信息,请参阅从反序列化异常中恢复。
这HeaderEnricher提供了 transformer,使用 SpEL 生成标头值。
有关详细信息,请参阅标头扩充器。
这MessagingTransformer已提供。这允许 Kafka 流拓扑与 spring-messaging 组件(例如 Spring Integration 流)交互。 看MessagingProcessor和 See[从KStream] 以获取更多信息。
D.7.10. JSON组件更改
现在,所有 JSON 感知组件默认配置为 JacksonObjectMapper由JacksonUtils.enhancedObjectMapper(). 这JsonDeserializer现在提供TypeReference基于构造函数,以便更好地处理目标通用容器类型。
还有一个JacksonMimeTypeModule已引入序列化org.springframework.util.MimeType到普通字符串。
有关更多信息,请参阅其 JavaDocs 和 Serialization, Deserialization, and Message Conversion。
一个ByteArrayJsonMessageConverter以及为所有 Json 转换器提供的新超类,JsonMessageConverter.
此外,一个StringOrBytesSerializer现已上市;它可以序列化byte[],Bytes和String值ProducerRecords.
有关更多信息,请参阅 Spring Messaging Message Conversion。
这JsonSerializer,JsonDeserializer和JsonSerde现在拥有流畅的 API,使编程配置更简单。
有关更多信息,请参阅 javadocs、序列化、反序列化和消息转换以及 Streams JSON 序列化和反序列化。
D.7.11. 回复 KafkaTemplate
当回复超时时,未来将异常完成,并使用KafkaReplyTimeoutException而不是KafkaException.
此外,重载的sendAndReceive现在提供了允许在每条消息的基础上指定回复超时的方法。
D.7.12. 聚合回复卡夫卡模板
扩展ReplyingKafkaTemplate通过聚合来自多个接收者的回复。
有关详细信息,请参阅聚合多个回复。
D.7.13. 事务更改
您现在可以覆盖生产者工厂的transactionIdPrefix在KafkaTemplate和KafkaTransactionManager. 看transactionIdPrefix了解更多信息。
D.7.14. 新的委托序列化器/反序列化器
该框架现在提供了一个委托Serializer和Deserializer,利用标头来生成和使用具有多种键/值类型的记录。
有关详细信息,请参阅委托序列化器和反序列化器。
D.7.15. 新的重试解序列化器
该框架现在提供了一个委托RetryingDeserializer,以便在可能发生暂时性错误(如网络问题)时重试序列化。
有关详细信息,请参阅重试反序列化程序。
D.8. 2.1 和 2.2 之间的更改
D.8.2. 类和包更改
这ContainerProperties类已从org.springframework.kafka.listener.config自org.springframework.kafka.listener.
这AckMode枚举已从AbstractMessageListenerContainer自ContainerProperties.
这setBatchErrorHandler()和setErrorHandler()方法已从ContainerProperties对两者AbstractMessageListenerContainer和AbstractKafkaListenerContainerFactory.
D.8.3. 回滚处理后
一个新的AfterRollbackProcessor提供了策略。
有关详细信息,请参阅回滚后处理器。
D.8.4.ConcurrentKafkaListenerContainerFactory变化
您现在可以使用ConcurrentKafkaListenerContainerFactory创建和配置任何ConcurrentMessageListenerContainer,不仅是那些@KafkaListener附注。
有关详细信息,请参阅容器工厂。
D.8.5. 侦听器容器更改
新的容器属性 (missingTopicsFatal)已被添加。
看用KafkaMessageListenerContainer了解更多信息。
一个ConsumerStoppedEvent现在在使用者停止时发出。
有关详细信息,请参阅线程安全。
批处理侦听器可以选择接收完整的ConsumerRecords<?, ?>对象而不是List<ConsumerRecord<?, ?>.
有关更多信息,请参阅批处理侦听器。
这DefaultAfterRollbackProcessor和SeekToCurrentErrorHandler现在可以恢复(跳过)不断失败的记录,默认情况下,在 10 次失败后恢复(跳过)。
可以将它们配置为将失败的记录发布到死信主题。
从 2.2.4 版本开始,在选择死信主题名称时可以使用消费者的组 ID。
这ConsumerStoppingEvent已被添加。
有关详细信息,请参阅应用程序事件。
这SeekToCurrentErrorHandler现在可以配置为在容器配置为AckMode.MANUAL_IMMEDIATE(自 2.2.4 起)。
D.8.6. @KafkaListener更改
您现在可以覆盖concurrency和autoStartup通过在注释上设置属性来获取侦听器容器工厂的属性。
现在,您可以添加配置以确定将哪些标头(如果有)复制到回复消息中。
看@KafkaListener注解了解更多信息。
您现在可以使用@KafkaListener作为你自己注解的元注解。
看@KafkaListener作为元注释了解更多信息。
现在可以更轻松地配置Validator为@Payload验证。
看@KafkaListener @Payload验证了解更多信息。
您现在可以直接在 Comments 上指定 kafka 消费者属性;这些将覆盖消费者工厂中定义的具有相同名称的任何属性(从版本 2.2.4 开始)。 有关详细信息,请参阅注释属性。
D.8.7. 标头映射更改
类型的标题MimeType和MediaType现在在RecordHeader价值。
以前,它们被映射为 JSON,并且仅MimeType被解码。MediaType无法解码。
它们现在是用于互作性的简单字符串。
此外,DefaultKafkaHeaderMapper有一个新的addToStringClasses方法,允许指定应通过使用toString()而不是 JSON。
有关详细信息,请参阅消息头。
D.8.8. 嵌入式 Kafka 更改
这KafkaEmbeddedclass 及其KafkaRule接口已被弃用,取而代之的是EmbeddedKafkaBroker及其 JUnit 4EmbeddedKafkaRule包装纸。
这@EmbeddedKafka注释现在填充EmbeddedKafkaBrokerbean 而不是已弃用的KafkaEmbedded.
此更改允许使用@EmbeddedKafka在 JUnit 5 测试中。
这@EmbeddedKafka注释现在具有属性ports指定填充EmbeddedKafkaBroker.
有关详细信息,请参阅测试应用程序。
D.8.9. JsonSerializer/Deserializer 增强功能
现在,您可以使用生产者和使用者属性来提供类型映射信息。
反序列化程序上提供了新的构造函数,以允许使用提供的目标类型重写类型标头信息。
这JsonDeserializer现在默认删除任何类型信息标头。
您现在可以配置JsonDeserializer使用 Kafka 属性忽略类型信息标头(从 2.2.3 开始)。
有关详细信息,请参阅序列化、反序列化和消息转换。
D.8.10. Kafka 流更改
流配置 bean 现在必须是KafkaStreamsConfiguration对象而不是StreamsConfig对象。
这StreamsBuilderFactoryBean已从包中移动…core自…config.
这KafkaStreamBrancher引入是为了在条件分支构建在KStream实例。
有关更多信息,请参阅 Apache Kafka Streams 支持和配置。
D.8.11. 事务 ID
当侦听器容器启动事务时,transactional.id现在是transactionIdPrefix附加<group.id>.<topic>.<partition>.
此更改允许对僵尸进行适当的围栏,如此处所述。
D.9. 2.0 和 2.1 之间的更改
D.9.2. JSON改进
这StringJsonMessageConverter和JsonSerializer现在在Headers,让转换器和JsonDeserializer在接收时创建特定类型,基于消息本身而不是固定配置的类型。有关详细信息,请参阅序列化、反序列化和消息转换。
D.9.3. 容器停止错误处理程序
现在为记录侦听器和批处理侦听器提供了容器错误处理程序,这些侦听器将侦听器抛出的任何异常视为致命/它们停止容器。有关更多信息,请参阅处理异常。
D.9.4. 暂停和恢复容器
侦听器容器现在具有pause()和resume()方法(自 2.1.3 版起)。
有关更多信息,请参阅暂停和恢复侦听器容器。
D.9.5. 有状态重试
从版本 2.1.3 开始,您可以配置有状态重试。 有关详细信息,请参阅有状态重试。
D.9.6. 客户端 ID
从 2.1.1 版本开始,您现在可以将client.id前缀@KafkaListener.
以前,要自定义客户端 ID,您需要为每个侦听器设置一个单独的使用者工厂(和容器工厂)。
前缀以-n在使用并发时提供唯一的客户端 ID。
D.9.7. 记录偏移量提交
默认情况下,主题偏移量提交的日志记录是使用DEBUG日志记录级别。
从 2.1.2 版开始,中的一个新属性ContainerProperties叫commitLogLevel允许您指定这些消息的日志级别。
看用KafkaMessageListenerContainer了解更多信息。
D.9.8. 默认@KafkaHandler
从版本 2.1.3 开始,您可以指定@KafkaHandler类级别的注释@KafkaListener作为默认值。
看@KafkaListener在类上了解更多信息。
D.9.9. 回复 KafkaTemplate
从 2.1.3 版开始,一个KafkaTemplate提供给支持请求/回复语义。
看用ReplyingKafkaTemplate了解更多信息。
D.9.11. 2.0 的迁移指南
请参阅 2.0 到 2.1 迁移指南。
D.10. 1.3 和 2.0 之间的更改
D.10.2.@KafkaListener变化
您现在可以注释@KafkaListener方法(以及类和@KafkaHandler方法)替换为@SendTo.
如果该方法返回结果,则将其转发到指定的主题。
看转发侦听器结果@SendTo了解更多信息。
D.10.3. 消息侦听器
消息侦听器现在可以知道Consumer对象。
有关详细信息,请参阅消息侦听器。
D.10.4. 使用ConsumerAwareRebalanceListener
Rebalance 监听器现在可以访问Consumer对象。
有关更多信息,请参阅重新平衡侦听器。
D.11. 1.2 和 1.3 之间的更改
D.11.1. 对事务的支持
0.11.0.0 客户端库添加了对事务的支持。
这KafkaTransactionManager并添加了对事务的其他支持。
有关详细信息,请参阅事务。
D.11.2. 对标头的支持
0.11.0.0 客户端库添加了对消息头的支持。
这些现在可以映射到spring-messaging MessageHeaders.
有关详细信息,请参阅消息头。
D.11.4. 对 Kafka 时间戳的支持
KafkaTemplate现在支持添加带有时间戳的记录的 API。
新增功能KafkaHeaders已介绍有关timestamp支持。
此外,新的KafkaConditions.timestamp()和KafkaMatchers.hasTimestamp()添加了测试实用程序。
看用KafkaTemplate,@KafkaListener注解和测试应用程序了解更多详细信息。
D.11.5.@KafkaListener变化
您现在可以配置KafkaListenerErrorHandler处理异常。
有关详细信息,请参阅处理异常。
默认情况下,@KafkaListener id属性现在用作group.id属性,覆盖消费者工厂中配置的属性(如果存在)。
此外,您可以显式配置groupId在注释上。
以前,您需要一个单独的容器工厂(和消费者工厂)才能使用不同的group.id值。
要恢复以前使用出厂配置的行为group.id,将idIsGroup属性设置为false.
D.11.6.@EmbeddedKafka注解
为方便起见,测试类级别@EmbeddedKafka提供注释,以注册KafkaEmbedded作为豆子。
有关详细信息,请参阅测试应用程序。
D.11.7. Kerberos 配置
现在支持配置 Kerberos。 有关更多信息,请参阅 JAAS 和 Kerberos。
D.13. 1.0 和 1.1 之间的更改
D.13.5. 搜索
您现在可以查找每个主题或分区的位置。您可以使用它来设置初始化期间的初始位置,当组管理正在使用中并且 Kafka 分配分区时。您还可以在检测到空闲容器时或在应用程序执行中的任何任意点进行查找。有关更多信息,请参阅 Seeking to a Specific Offset。