Apache Kafka Binder

参考指南

本指南介绍了 Spring Cloud Stream Binder 的 Apache Kafka 实现。 它包含有关其设计、用法和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定构造的信息。 此外,本指南还介绍了 Spring Cloud Stream 的 Kafka Streams 绑定能力。spring-doc.cadn.net.cn

1. Apache Kafka 绑定器

1.1. 用法

要使用 Apache Kafka 绑定器,您需要将spring-cloud-stream-binder-kafka作为 Spring Cloud Stream 应用程序的依赖项,如以下 Maven 示例所示:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream Kafka Starter,如以下 Maven 示例所示:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

1.2. 概述

下图显示了 Apache Kafka 绑定程序如何运行的简化图:spring-doc.cadn.net.cn

卡夫卡活页夹
图 1.卡夫卡活页夹

Apache Kafka Binder 实现将每个目标映射到 Apache Kafka 主题。 消费者组直接映射到相同的 Apache Kafka 概念。 分区也直接映射到 Apache Kafka 分区。spring-doc.cadn.net.cn

该绑定器当前使用 Apache Kafkakafka-clients版本3.1.0. 此客户端可以与较旧的代理通信(请参阅 Kafka 文档),但某些功能可能不可用。 例如,对于低于 0.11.x.x 的版本,不支持本机标头。 此外,0.11.x.x 不支持autoAddPartitions财产。spring-doc.cadn.net.cn

1.3. 配置选项

本节包含 Apache Kafka 绑定器使用的配置选项。spring-doc.cadn.net.cn

有关与活页夹相关的常见配置选项和属性,请参阅核心文档中的绑定属性spring-doc.cadn.net.cn

1.3.1. Kafka Binder 属性

spring.cloud.stream.kafka.binder.brokers

Kafka 绑定器连接到的代理列表。spring-doc.cadn.net.cn

违约:localhost.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.defaultBrokerPort 的

brokers允许指定带或不带端口信息的主机(例如,host1,host2:port2). 当代理列表中未配置端口时,这将设置默认端口。spring-doc.cadn.net.cn

违约:9092.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.configuration

客户端属性(生产者和使用者)的键/值映射传递给绑定器创建的所有客户端。 由于生产者和使用者都使用这些属性,因此使用应限制为公共属性,例如安全设置。 通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。 此处的属性将取代启动中设置的任何属性。spring-doc.cadn.net.cn

默认值:空地图。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.consumer属性

任意 Kafka 客户端使用者属性的键/值映射。 除了支持已知的 Kafka 使用者属性外,这里还允许未知的使用者属性。 此处的属性将取代在启动和configuration属性。spring-doc.cadn.net.cn

默认值:空地图。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.headers

由活页夹传输的自定义标头列表。 仅当与旧应用程序(⇐ 1.3.x)通信时才需要kafka-clients版本 < 0.11.0.0。较新的版本本机支持标头。spring-doc.cadn.net.cn

默认值:空。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.health超时

等待获取分区信息的时间,以秒为单位。 如果此计时器过期,运行状况将报告为关闭。spring-doc.cadn.net.cn

默认值:10。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的确认数。 请参阅生产者的 Kafka 文档acks财产。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.minPartitionCount

仅在以下情况下有效autoCreateTopicsautoAddPartitions已设置。 Binder 在生成或使用数据的主题上配置的全局最小分区数。 它可以被partitionCount的设置或通过instanceCount * concurrency生产者的设置(如果其中任何一个较大)。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.producer属性

任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里还允许使用未知的生产者属性。 此处的属性将取代在启动和configuration属性。spring-doc.cadn.net.cn

默认值:空地图。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.replicationFactor

自动创建主题的复制因子autoCreateTopics处于活动状态。 可以在每个绑定上覆盖。spring-doc.cadn.net.cn

如果您使用的是 2.4 之前的 Kafka 代理版本,则此值应至少设置为1. 从 3.0.8 版开始,绑定器使用-1作为默认值,这表示代理“default.replication.factor”属性将用于确定副本数。 请咨询您的 Kafka 代理管理员,看看是否有需要最小复制因子的策略,如果是这种情况,则通常会default.replication.factor将匹配该值,并且-1应使用,除非您需要大于最小值的复制系数。
spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为true,活页夹会自动创建新主题。 如果设置为false,则活页夹依赖于已配置的主题。 在后一种情况下,如果主题不存在,则活页夹无法启动。spring-doc.cadn.net.cn

此设置独立于auto.create.topics.enable经纪人的设置,不会影响它。 如果服务器设置为自动创建主题,则可以使用默认代理设置将其作为元数据检索请求的一部分创建。

违约:true.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为true,如果需要,活页夹会创建新分区。 如果设置为false,则 binder 依赖于已配置的主题的分区大小。 如果目标主题的分区计数小于预期值,则绑定程序无法启动。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在活页夹中启用事务。看transaction.id在 Kafka 文档中,在spring-kafka文档。 启用交易后,单个producer属性被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*性能。spring-doc.cadn.net.cn

默认值null(无交易)spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.transaction.producer.*

事务性活页夹中生产者的全局生产者属性。 看spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性以及所有绑定器支持的通用生产者属性。spring-doc.cadn.net.cn

默认值:查看各个生产者属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.headerMapperBeanName

的 bean 名称KafkaHeaderMapper用于映射spring-messagingheaders 与 Kafka 标头之间的标头。例如,如果您希望自定义BinderHeaderMapper对标头使用 JSON 反序列化的 bean。如果这个自定义BinderHeaderMapper使用此属性的 Bean 不可供 Binder 使用,则 Binder 将查找名称为kafkaBinderHeaderMapperBinderHeaderMapper在回退到默认值之前BinderHeaderMapper由活页夹创建。spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

将绑定器运行状况设置为down,当找到主题上的任何分区时,无论从该分区接收数据的使用者如何,都找不到没有领导者。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.certificateStore目录

当信任库或密钥库证书位置作为非本地文件系统资源(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)时,Binder 将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。对于两个代理级证书(ssl.truststore.locationssl.keystore.location) 和用于架构注册表的证书 (schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location). 请记住,信任库和密钥库位置路径必须在spring.cloud.stream.kafka.binder.configuration…​. 例如spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location,spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。 该文件将被复制到指定为此属性值的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值,并且证书文件是非本地文件系统资源,则它将复制到系统的临时目录,如System.getProperty("java.io.tmpdir"). 如果存在此值,但在文件系统上找不到该目录或不可写,则也是如此。spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetrics启用

当设置为 true 时,每当访问指标时,都会计算每个使用者主题的偏移滞后指标。 当设置为 false 时,仅使用定期计算的偏移滞后。spring-doc.cadn.net.cn

默认值:truespring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

计算每个使用者主题的偏移滞后的时间间隔。 每当metrics.defaultOffsetLagMetricsEnabled被禁用或其 计算时间太长。spring-doc.cadn.net.cn

默认值:60 秒spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.enableObservation

在此活页夹中的所有绑定上启用千分尺观察注册表。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

1.3.2. Kafka 消费者属性

为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.consumer.<property>=<value>.

以下属性仅适用于 Kafka 使用者,并且 必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer..spring-doc.cadn.net.cn

admin.configuration

从版本 2.1.1 开始,此属性已被弃用,取而代之的是topic.properties,并且将在将来的版本中删除对它的支持。spring-doc.cadn.net.cn

admin.replicas-assignment

从版本 2.1.1 开始,此属性已被弃用,取而代之的是topic.replicas-assignment,并且将在将来的版本中删除对它的支持。spring-doc.cadn.net.cn

admin.replication-factor

从版本 2.1.1 开始,此属性已被弃用,取而代之的是topic.replication-factor,并且将在将来的版本中删除对它的支持。spring-doc.cadn.net.cn

autoRebalance启用

什么时候true,主题分区将在消费者组的成员之间自动重新平衡。 什么时候false,每个消费者都会根据spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex. 这需要同时spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex在每个启动的实例上适当设置的属性。 的值spring.cloud.stream.instanceCount在这种情况下,属性通常必须大于 1。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

ackEach记录

什么时候autoCommitOffsettrue,则此设置规定是否在处理每条记录后提交偏移量。 默认情况下,偏移量将在consumer.poll()已处理。 轮询返回的记录数可以通过max.poll.recordsKafka 属性,通过使用者设置configuration财产。 将此设置为true可能会导致性能下降,但这样做会降低发生故障时重新传递记录的可能性。 另请参阅活页夹requiredAcks属性,这也会影响提交偏移量的性能。 此属性从 3.1 开始被弃用,转而使用ackMode. 如果ackMode未设置且未启用批处理模式,RECORDackMode 将被使用。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

自动提交偏移

从版本 3.1 开始,此属性已弃用。 看ackMode有关替代方案的更多详细信息。 是否在处理消息时自动提交偏移量。 如果设置为false,带有键kafka_acknowledgment的类型org.springframework.kafka.support.Acknowledgment标头存在于入站消息中。 应用程序可以使用此标头来确认消息。 有关详细信息,请参阅示例部分。 当此属性设置为false,Kafka 绑定器将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL应用程序负责确认记录。 另请参阅ackEachRecord.spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

ack模式

指定容器确认模式。 这是基于 Spring Kafka 中定义的 AckMode 枚举。 如果ackEachRecord属性设置为true并且消费者不处于批处理模式,则这将使用RECORD,否则,请使用此属性使用提供的 ack 模式。spring-doc.cadn.net.cn

自动提交错误

在可轮询的消费者中,如果设置为true,它总是在错误时自动提交。 如果未设置(默认值)或 false,则不会在可轮询的消费者中自动提交。 请注意,此属性仅适用于可轮询的使用者。spring-doc.cadn.net.cn

默认值:未设置。spring-doc.cadn.net.cn

重置偏移量

是否将使用者上的偏移量重置为 startOffset 提供的值。 如果KafkaBindingRebalanceListener提供;请参阅使用 KafkaBindingRebalanceListener。 有关此属性的更多信息,请参阅重置偏移。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

开始偏移量

新组的起始偏移量。 允许的值:earliestlatest. 如果为消费者“绑定”显式设置了使用者组(通过spring.cloud.stream.bindings.<channelName>.group),'startOffset' 设置为earliest.否则,它设置为latest对于anonymous消费者群体。 有关此属性的更多信息,请参阅重置偏移。spring-doc.cadn.net.cn

默认值:null(相当于earliest).spring-doc.cadn.net.cn

启用Dlq

当设置为 true 时,它会为使用者启用 DLQ 行为。 默认情况下,导致错误的邮件将转发到名为error.<destination>.<group>. 可以通过设置dlqName属性或通过定义@Bean类型DlqDestinationResolver. 这为更常见的 Kafka 重放场景提供了另一种选择,适用于错误数量相对较少且重放整个原始主题可能过于繁琐的情况。 请参阅死信主题处理处理以获取更多信息。 从 V2.0 开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topic,x-exception-messagex-exception-stacktracebyte[]. 缺省情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。 有关如何更改该行为,请参阅死信主题分区选择不允许在以下情况下使用destinationIsPatterntrue.spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

dlq分区

什么时候enableDlq为 true,并且未设置此属性,则创建与主主题具有相同分区数的死信主题。 通常,死信记录将发送到死信主题中与原始记录相同的分区。 此行为可以更改;请参阅死信主题分区选择。 如果此属性设置为1并且没有DqlPartitionFunctionbean,所有死信记录都将写入分区0. 如果此属性大于1您必须提供一个DlqPartitionFunction豆。 请注意,实际分区计数受活页夹的minPartitionCount财产。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

配置

映射包含通用 Kafka 使用者属性的键/值对。 除了具有 Kafka 消费者属性外,还可以在此处传递其他配置属性。 例如,应用程序所需的某些属性,例如spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar. 这bootstrap.servers属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。spring-doc.cadn.net.cn

默认值:空地图。spring-doc.cadn.net.cn

dlq名称

要接收错误消息的 DLQ 主题的名称。spring-doc.cadn.net.cn

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>).spring-doc.cadn.net.cn

dlqProducer属性

使用此功能,可以设置特定于 DLQ 的生产者属性。 通过 kafka 生产者属性提供的所有属性都可以通过此属性进行设置。 当在消费者上启用本机解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以以下形式提供dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer.spring-doc.cadn.net.cn

默认值:默认 Kafka 生产者属性。spring-doc.cadn.net.cn

标准标头

指示入站通道适配器填充的标准标头。 允许的值:none,id,timestampboth. 如果使用本机反序列化并且接收消息的第一个组件需要id(例如配置为使用 JDBC 消息存储的聚合器)。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

转换器BeanName

实现RecordMessageConverter.用于入站通道适配器以替换默认的MessagingMessageConverter.spring-doc.cadn.net.cn

违约:nullspring-doc.cadn.net.cn

空闲事件间隔

指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。 使用ApplicationListener<ListenerContainerIdleEvent>以接收这些事件。 有关使用示例,请参阅示例:暂停和恢复消费者。spring-doc.cadn.net.cn

违约:30000spring-doc.cadn.net.cn

目的地是模式

当 true 时,目标被视为正则表达式Pattern用于匹配代理的主题名称。 如果为 true,则不会预配主题,并且enableDlq不允许,因为绑定程序在配置阶段不知道主题名称。 请注意,检测与模式匹配的新主题所花费的时间由消费者属性控制metadata.max.age.ms,(在撰写本文时)默认为 300,000 毫秒(5 分钟)。 这可以使用configuration属性。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

topic.properties

一个Map配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅NewTopicJavadocs 中的kafka-clients罐。spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

topic.replication-factor

预配主题时要使用的复制因子。覆盖活页夹范围的设置。如果出现以下情况,则忽略replicas-assignments存在。spring-doc.cadn.net.cn

默认值:无(使用活页夹范围的默认值 -1)。spring-doc.cadn.net.cn

pollTimeout

超时,用于在可轮询消费者中轮询。spring-doc.cadn.net.cn

默认值:5 秒。spring-doc.cadn.net.cn

事务管理器

的 Bean 名称KafkaAwareTransactionManager用于覆盖此绑定的绑定器的事务管理器。如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

txCommit已恢复

使用事务性绑定器时,默认情况下,恢复记录的偏移量(例如,当重试用尽并将记录发送到死信主题时)将通过新事务提交。 将此属性设置为false禁止提交已恢复记录的偏移量。spring-doc.cadn.net.cn

默认值:true。spring-doc.cadn.net.cn

commonErrorHandlerBeanName (常见错误处理程序)BeanName

CommonErrorHandler每个消费者绑定使用的 bean 名称。 如果存在,此用户提供了CommonErrorHandler优先于绑定程序定义的任何其他错误处理程序。 这是表达错误处理程序的便捷方法,如果应用程序不想使用ListenerContainerCustomizer然后检查目标/组组合以设置错误处理程序。spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

1.3.3. 重置偏移量

当应用程序启动时,每个分配的分区中的初始位置取决于两个属性startOffsetresetOffsets. 如果resetOffsetsfalse,普通 Kafka 消费者auto.offset.reset语义适用。 即,如果绑定的消费者组的分区没有提交的偏移量,则位置为earliestlatest. 默认情况下,具有显式groupearliest和匿名绑定(没有group)使用latest. 可以通过将startOffsetbinding 属性。 第一次使用特定group. 不存在已提交偏移量的另一种情况是偏移量已过期。 使用现代代理(自 2.1 起)和默认代理属性,偏移量将在最后一个成员离开组后 7 天过期。 请参阅offsets.retention.minutes经纪人属性了解更多信息。spring-doc.cadn.net.cn

什么时候resetOffsetstrue,则绑定器将与代理上没有提交偏移量时适用的语义类似的语义,就好像此绑定从未从主题中消耗过一样;即,任何当前提交的偏移量都将被忽略。spring-doc.cadn.net.cn

以下是可以使用此功能的两个用例。spring-doc.cadn.net.cn

  1. 从包含键/值对的压缩主题使用。 设置resetOffsetstruestartOffsetearliest;绑定将执行seekToBeginning在所有新分配的分区上。spring-doc.cadn.net.cn

  2. 从包含事件的主题使用,其中您只对此绑定运行时发生的事件感兴趣。 设置resetOffsetstruestartOffsetlatest;绑定将执行seekToEnd在所有新分配的分区上。spring-doc.cadn.net.cn

如果在初始分配之后发生重新平衡,则仅对在初始分配期间未分配的任何新分配的分区执行搜索。

有关对主题偏移量的更多控制,请参阅使用 KafkaBindingRebalanceListener;当提供侦听器时,resetOffsets不应设置为true,否则会导致错误。spring-doc.cadn.net.cn

1.3.4. 使用批次

从 3.0 版本开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,通过轮询 Kafka 收到的所有记录Consumer将作为List<?>到 listener 方法。 否则,将一次调用一条记录。 批处理的大小由 Kafka 使用者属性控制max.poll.records,fetch.min.bytes,fetch.max.wait.ms;有关更多信息,请参阅 Kafka 文档。spring-doc.cadn.net.cn

从版本开始4.0.2,则活页夹在批处理模式下使用时支持 DLQ 功能。 请记住,在处于批处理模式的使用者绑定上使用 DLQ 时,从上一次轮询接收的所有记录都将传递到 DLQ 主题。spring-doc.cadn.net.cn

使用批处理模式时,不支持在绑定器中重试,因此maxAttempts将被覆盖为 1。 您可以配置DefaultErrorHandler(使用ListenerContainerCustomizer) 以实现与在 Binder 中重试类似的功能。 您也可以使用手册AckMode并调用Ackowledgment.nack(index, sleep)提交部分批次的偏移量并重新传递剩余记录。 有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档

1.3.5. Kafka 生产者属性

为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.producer.<property>=<value>.

以下属性仅适用于 Kafka 生产者,并且 必须以spring.cloud.stream.kafka.bindings.<channelName>.producer..spring-doc.cadn.net.cn

admin.configuration

从版本 2.1.1 开始,此属性已被弃用,取而代之的是topic.properties,并且将在将来的版本中删除对它的支持。spring-doc.cadn.net.cn

admin.replicas-assignment

从版本 2.1.1 开始,此属性已被弃用,取而代之的是topic.replicas-assignment,并且将在将来的版本中删除对它的支持。spring-doc.cadn.net.cn

admin.replication-factor

从版本 2.1.1 开始,此属性已被弃用,取而代之的是topic.replication-factor,并且将在将来的版本中删除对它的支持。spring-doc.cadn.net.cn

缓冲区大小

Kafka 生产者在发送前尝试批处理的数据量上限(以字节为单位)。spring-doc.cadn.net.cn

违约:16384.spring-doc.cadn.net.cn

同步

生产者是否同步。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

发送超时表达式

根据传出消息计算的 SpEL 表达式,用于在启用同步发布时评估等待 ack 的时间,例如,headers['mySendTimeout']. 超时值以毫秒为单位。 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]. 现在,在转换有效负载之前对表达式进行计算。spring-doc.cadn.net.cn

违约:none.spring-doc.cadn.net.cn

batchTimeout

生产者在发送消息之前等待多长时间,以允许更多消息在同一批次中累积。 (通常,生产者根本不会等待,而只是发送上一次发送过程中累积的所有消息。非零值可能会以延迟为代价来增加吞吐量。spring-doc.cadn.net.cn

messageKey表达式

根据用于填充生成的 Kafka 消息的密钥的传出消息进行评估的 SpEL 表达式,例如headers['myKey']. 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]. 现在,在转换有效负载之前对表达式进行计算。 对于常规处理器(Function<String, String>Function<Message<?>, Message<?>),如果生成的密钥需要与来自主题的传入密钥相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']对于响应式函数,需要牢记一个重要的警告。 在这种情况下,由应用程序手动将标头从传入邮件复制到出站邮件。 您可以设置标题,例如myKey并使用headers['myKey']如上所述,或者为方便起见,只需将KafkaHeaders.MESSAGE_KEYheader,并且您根本不需要设置此属性。spring-doc.cadn.net.cn

违约:none.spring-doc.cadn.net.cn

headerPatterns

以逗号分隔的简单模式列表,用于匹配要映射到 Kafka 的 Spring 消息传递标头HeadersProducerRecord. 模式可以以通配符(星号)开头或结尾。 模式可以通过前缀!. 匹配在第一次匹配(正或负)后停止。 例如!ask,as*将通过ash但不是ask.idtimestamp永远不会映射。spring-doc.cadn.net.cn

默认值:(所有标头 - 除了*idtimestamp)spring-doc.cadn.net.cn

配置

映射包含通用 Kafka 生产者属性的键/值对。 这bootstrap.servers属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。spring-doc.cadn.net.cn

默认值:空地图。spring-doc.cadn.net.cn

topic.properties

一个Map配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0spring-doc.cadn.net.cn

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅NewTopicJavadocs 中的kafka-clients罐。spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

topic.replication-factor

预配主题时要使用的复制因子。覆盖活页夹范围的设置。如果出现以下情况,则忽略replicas-assignments存在。spring-doc.cadn.net.cn

默认值:无(使用活页夹范围的默认值 -1)。spring-doc.cadn.net.cn

使用主题标头

设置为true将默认绑定目标(主题名称)替换为KafkaHeaders.TOPIC出站邮件中的邮件头。 如果标头不存在,则使用默认绑定目标。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

记录元数据通道

的 bean 名称MessageChannel应将成功的发送结果发送到哪个;Bean 必须存在于应用程序上下文中。 发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA. 标头包含一个RecordMetadataKafka 客户端提供的对象;它包括在主题中写入记录的分区和偏移量。spring-doc.cadn.net.cn

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)spring-doc.cadn.net.cn

失败的发送将进入生产者错误通道(如果已配置);请参阅错误通道spring-doc.cadn.net.cn

默认值:null。spring-doc.cadn.net.cn

Kafka 绑定器使用partitionCount将生产者设置为提示,以创建具有给定分区计数的主题(结合minPartitionCount,两者中的最大值是正在使用的值)。 配置两者时要小心minPartitionCount用于活页夹和partitionCount对于应用程序,因为使用较大的值。 如果主题已存在分区计数较小且autoAddPartitions禁用(默认值),则活页夹无法启动。 如果主题已存在分区计数较小且autoAddPartitions启用,则添加新分区。 如果主题已存在分区数大于 (minPartitionCountpartitionCount),则使用现有分区计数。
压缩

compression.typeproducer 属性。 支持的值包括none,gzip,snappy,lz4zstd. 如果您覆盖kafka-clientsjar 到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用zstd压缩, 使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd.spring-doc.cadn.net.cn

违约:none.spring-doc.cadn.net.cn

事务管理器

的 Bean 名称KafkaAwareTransactionManager用于覆盖此绑定的绑定器的事务管理器。如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。spring-doc.cadn.net.cn

默认值:无。spring-doc.cadn.net.cn

close超时

关闭生产者时等待的超时秒数。spring-doc.cadn.net.cn

允许非事务性

通常,与事务性绑定器关联的所有输出绑定都将在新事务中发布(如果尚未处理)。 此属性允许您覆盖该行为。 如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已在处理中。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

1.3.6. 使用示例

在本部分中,我们将演示上述属性在特定方案中的用法。spring-doc.cadn.net.cn

示例:设置ackModeMANUAL和依赖手动确认

此示例说明了如何在使用者应用程序中手动确认偏移量。spring-doc.cadn.net.cn

此示例要求spring.cloud.stream.kafka.bindings.input.consumer.ackMode设置为MANUAL. 为您的示例使用相应的输入通道名称。spring-doc.cadn.net.cn

@SpringBootApplication
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @Bean
 public Consumer<Message<?>> process() {
    return message -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}
示例:安全配置

Apache Kafka 0.9 支持客户端和代理之间的安全连接。 要利用此功能,请遵循 Apache Kafka 文档中的指南以及 Confluent 文档中的 Kafka 0.9 安全指南。 使用spring.cloud.stream.kafka.binder.configuration选项为活页夹创建的所有客户端设置安全属性。spring-doc.cadn.net.cn

例如,要将security.protocolSASL_SSL,请设置以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全属性都可以以类似的方式设置。spring-doc.cadn.net.cn

使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。spring-doc.cadn.net.cn

Spring Cloud Stream 支持使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。spring-doc.cadn.net.cn

使用 JAAS 配置文件

可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例演示如何使用 JAAS 配置文件启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:spring-doc.cadn.net.cn

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性

作为拥有 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种机制,用于使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。spring-doc.cadn.net.cn

以下属性可用于配置 Kafka 客户端的登录上下文:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.jaas.login模块

登录模块名称。正常情况下不需要设置。spring-doc.cadn.net.cn

违约:com.sun.security.auth.module.Krb5LoginModule.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.jaas.controlFlag

登录模块的控制标志。spring-doc.cadn.net.cn

违约:required.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.jaas.options

映射包含登录模块选项的键/值对。spring-doc.cadn.net.cn

默认值:空地图。spring-doc.cadn.net.cn

以下示例演示如何使用 Spring Boot 配置属性启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:spring-doc.cadn.net.cn

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示与以下 JAAS 文件等效的内容:spring-doc.cadn.net.cn

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[email protected]";
};

如果所需的主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。spring-doc.cadn.net.cn

不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。 如果-Djava.security.auth.login.configsystem 属性已经存在,则 Spring Cloud Stream 会忽略 Spring Boot 属性。
使用autoCreateTopicsautoAddPartitions使用 Kerberos。 通常,应用程序可能会使用在 Kafka 和 Zookeeper 中没有管理权限的主体。 因此,依赖 Spring Cloud Stream 创建/修改主题可能会失败。 在安全环境中,强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。
多绑定器配置和 JAAS

当连接到多个集群时,每个集群都需要单独的 JAAS 配置,请使用属性sasl.jaas.config. 当应用程序中存在此属性时,它优先于上述其他策略。 有关更多详细信息,请参阅此 KIP-85spring-doc.cadn.net.cn

例如,如果您的应用程序中有两个集群,具有单独的 JAAS 配置,那么您可以使用以下模板:spring-doc.cadn.net.cn

spring.cloud.stream:
    binders:
        kafka1:
          type: kafka
          environment:
             spring:
               cloud:
                 stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
        kafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
    kafka.binder:
        configuration:
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: PLAIN

请注意,Kafka 集群和sasl.jaas.config在上述配置中,它们中的每一个的值都不同。spring-doc.cadn.net.cn

有关如何设置和运行此类应用程序的更多详细信息,请参阅此示例应用程序spring-doc.cadn.net.cn

示例:暂停和恢复使用者

如果希望暂停使用但不导致分区重新平衡,则可以暂停和恢复使用。 这可以通过管理绑定生命周期来促进,如 Spring Cloud Stream 文档中的绑定可视化和控制所示,使用State.PAUSEDState.RESUMED.spring-doc.cadn.net.cn

要恢复,您可以使用ApplicationListener(或@EventListener方法)接收ListenerContainerIdleEvent实例。 事件发布的频率由idleEventInterval财产。spring-doc.cadn.net.cn

1.4. 事务性活页夹

通过设置启用事务spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix设置为非空值,例如tx-. 当在处理器应用程序中使用时,消费者启动事务;在使用者线程上发送的任何记录都参与同一事务。 当监听器正常退出时,监听器容器会将偏移量发送给事务并提交。 一个通用的生产者工厂用于使用spring.cloud.stream.kafka.binder.transaction.producer.*性能;忽略单个绑定 Kafka 生产者属性。spring-doc.cadn.net.cn

事务不支持正常的活页夹重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,任何已发布的记录也将回滚。 启用重试时(公共属性maxAttempts大于零),重试属性用于配置DefaultAfterRollbackProcessor以启用容器级别的重试。 同样,此功能不是在事务中发布死信记录,而是通过DefaultAfterRollbackProcessor在主事务回滚后运行。

如果您希望在源应用程序中使用事务,或者从某个任意线程中用于仅生产者事务(例如@Scheduled方法),您必须获取对事务性生产者工厂的引用,并定义一个KafkaTransactionManagerbean 使用它。spring-doc.cadn.net.cn

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用BinderFactory;用null在第一个参数中,当只有一个 Binder 配置时。 如果配置了多个活页夹,请使用活页夹名称获取引用。 一旦我们有了对 binder 的引用,我们就可以获得对ProducerFactory并创建事务管理器。spring-doc.cadn.net.cn

然后您将使用正常的 Spring 事务支持,例如TransactionTemplate@Transactional例如:spring-doc.cadn.net.cn

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅生产者事务与其他事务管理器中的事务同步,请使用ChainedTransactionManager.spring-doc.cadn.net.cn

如果您部署应用程序的多个实例,则每个实例都需要一个唯一的transactionIdPrefix.

1.5. 错误通道

从 1.3 版开始,绑定器无条件地将异常发送到每个使用者目标的错误通道,也可以配置为将异步生产者发送失败发送到错误通道。 有关详细信息,请参阅此部分有关错误处理。spring-doc.cadn.net.cn

的有效负载ErrorMessage对于发送失败,是KafkaSendFailureException具有属性:spring-doc.cadn.net.cn

没有自动处理生产者异常(例如发送到死信队列)。 您可以在自己的 Spring Integration 流中使用这些异常。spring-doc.cadn.net.cn

1.6. Kafka 指标

Kafka binder 模块公开以下指标:spring-doc.cadn.net.cn

spring.cloud.stream.binder.kafka.offset:此指标指示给定使用者组尚未从给定 Binder 的主题中使用多少消息。 提供的指标基于 Micrometer 库。 活页夹创建KafkaBinderMetricsbean 如果 Micrometer 在类路径上并且应用程序没有提供其他此类 bean。 该指标包含消费者组信息、主题以及与主题上最新偏移量的实际滞后。 此指标对于向 PaaS 平台提供自动缩放反馈特别有用。spring-doc.cadn.net.cn

可以通过在spring.cloud.stream.kafka.binder.metricsNamespace 有关更多信息,请参阅 Kafka Binder 属性部分spring-doc.cadn.net.cn

您可以排除KafkaBinderMetrics从创建必要的基础设施(如使用者)到通过在应用程序中提供以下组件来报告指标。spring-doc.cadn.net.cn

@Component
class NoOpBindingMeters {
	NoOpBindingMeters(MeterRegistry registry) {
		registry.config().meterFilter(
				MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
	}
}

有关如何有选择地抑制仪表的更多详细信息,请参见此处spring-doc.cadn.net.cn

1.7. 逻辑删除记录(空记录值)

使用压缩主题时,带有null值(也称为逻辑删除记录)表示删除键。 要在 Spring Cloud Stream 函数中接收此类消息,您可以使用以下策略。spring-doc.cadn.net.cn

@Bean
public Function<Message<Person>, String> myFunction() {
    return value -> {
        Object v = value.getPayload();
        String className = v.getClass().getName();
        if (className.isEqualTo("org.springframework.kafka.support.KafkaNull")) {
            // this is a tombstone record
        }
        else {
            // continue with processing
        }
    };
}

1.8. 使用 KafkaBindingRebalanceListener

应用程序可能希望在最初分配分区时将主题/分区查找到任意偏移量,或者对使用者执行其他作。 从 2.1 版开始,如果您提供单个KafkaBindingRebalanceListenerbean 中,它将连接到所有 Kafka 消费者绑定中。spring-doc.cadn.net.cn

public interface KafkaBindingRebalanceListener {

	/**
	 * Invoked by the container before any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked by the container after any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked when partitions are initially assigned or after a rebalance.
	 * Applications might only want to perform seek operations on an initial assignment.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
			boolean initial) {

	}

}

您无法将resetOffsetsconsumer 属性设置为true当您提供重新平衡侦听器时。spring-doc.cadn.net.cn

1.9. 重试和死信处理

默认情况下,当您配置重试(例如maxAttemts) 和enableDlq在使用者绑定中,这些功能在绑定器中执行,侦听器容器或 Kafka 使用者不参与。spring-doc.cadn.net.cn

在某些情况下,最好将此功能移动到侦听器容器,例如:spring-doc.cadn.net.cn

要配置将此功能从活页夹移动到容器,请定义一个@Bean类型ListenerContainerWithDlqAndRetryCustomizer. 该接口具有以下方法:spring-doc.cadn.net.cn

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return false to disable retries and DLQ in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

目标解析器和BackOff是根据绑定属性(如果已配置)创建的。这KafkaTemplate使用来自spring.kafka…​.性能。然后,您可以使用它们创建自定义错误处理程序和死信发布器;例如:spring-doc.cadn.net.cn

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return !destinationName.contains("topicWithLongTotalRetryConfig");
        }

    };
}

现在,只需一次重试延迟大于消费者的延迟max.poll.interval.ms财产。spring-doc.cadn.net.cn

当使用多个 Binder 时,'ListenerContainerWithDlqAndRetryCustomizer' bean 将被 'DefaultBinderFactory' 覆盖。对于豆子 要应用,您需要使用 'BinderCustomizer' 来设置容器定制器(参见 [binder-customizer]):spring-doc.cadn.net.cn

@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}

1.10. 自定义消费者和生产者配置

如果要对用于创建的使用者和生产者配置进行高级定制ConsumerFactoryProducerFactory在卡夫卡中, 您可以实现以下定制器。spring-doc.cadn.net.cn

这两个接口都提供了一种配置用于消费者和生产者属性的配置映射的方法。 例如,如果您想访问在应用程序级别定义的 Bean,则可以在configure方法。 当 Binder 发现这些定制器可以作为 bean 使用时,它将调用configure方法。spring-doc.cadn.net.cn

这两个接口还提供对绑定名称和目标名称的访问,以便在自定义生产者和使用者属性时可以访问它们。spring-doc.cadn.net.cn

1.11. 自定义 AdminClient 配置

与上面的消费者和生产者配置自定义一样,应用程序还可以通过提供AdminClientConfigCustomizer. AdminClientConfigCustomizer 的 configure 方法提供对管理客户端属性的访问,您可以使用它定义进一步的自定义。 Binder 的 Kafka 主题配置器为通过此定制器提供的属性提供了最高优先级。 下面是提供此定制器 bean 的示例。spring-doc.cadn.net.cn

@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
    return props -> {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    };
}

1.12. 自定义 Kafka Binder 健康指标

当 Spring Boot 执行器位于类路径上时,Kafka binder 会激活默认的运行状况指示器。 此运行状况指示器检查绑定器的运行状况以及与 Kafka 代理的任何通信问题。 如果应用程序想要禁用此默认运行状况检查实现并包含自定义实现,则它可以提供KafkaBinderHealth接口。KafkaBinderHealth是一个标记接口,从HealthIndicator. 在自定义实现中,它必须为health()方法。 自定义实现必须作为 Bean 存在于应用程序配置中。 当 Binder 发现自定义实现时,它将使用它而不是默认实现。 以下是应用程序中此类自定义实现 bean 的示例。spring-doc.cadn.net.cn

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // custom implementation details.
        }
    };
}

1.13. 死信主题处理

1.13.1. 死信主题分区选择

默认情况下,记录使用与原始记录相同的分区发布到死信主题。 这意味着死信主题必须至少具有与原始记录一样多的分区。spring-doc.cadn.net.cn

要更改此行为,请添加一个DlqPartitionFunction实现为@Bean到应用程序上下文。 只能存在一个这样的豆子。 该函数是随消费者组提供的,失败的ConsumerRecord和例外。 例如,如果您始终想要路由到分区 0,则可以使用:spring-doc.cadn.net.cn

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions属性设置为 1(以及活页夹的minPartitionCount等于1),无需提供DlqPartitionFunction;框架将始终使用分区 0。 如果将使用者绑定的dlqPartitions属性设置为大于1(或活页夹的minPartitionCount大于1),您必须提供一个DlqPartitionFunctionbean,即使分区计数与原始主题的分区计数相同。

还可以为 DLQ 主题定义自定义名称。 为此,请创建DlqDestinationResolver作为@Bean到应用程序上下文。 当 binder 检测到这样的 bean 时,优先,否则它将使用dlqName财产。 如果都未找到,则默认为error.<destination>.<group>. 下面是一个示例DlqDestinationResolver作为@Bean.spring-doc.cadn.net.cn

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供实现时要记住的一件重要事情DlqDestinationResolver是 binder 中的 provisioner 不会为应用程序自动创建主题。 这是因为活页夹无法推断出实现可能发送到的所有 DLQ 主题的名称。 因此,如果使用此策略提供 DLQ 名称,则应用程序有责任确保事先创建这些主题。spring-doc.cadn.net.cn

1.13.2. 处理死信主题中的记录

由于该框架无法预测用户希望如何处理死信邮件,因此它没有提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将邮件路由回原始主题。 但是,如果问题是永久性问题,则可能会导致无限循环。 本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在尝试三次后将它们移动到“停车场”主题。 该应用程序是另一个从死信主题中读取的 spring-cloud-stream 应用程序。 当 5 秒内未收到消息时,它将退出。spring-doc.cadn.net.cn

这些示例假定原始目标为so8400out消费群体是so8400.spring-doc.cadn.net.cn

有几种策略需要考虑:spring-doc.cadn.net.cn

  • 考虑仅在主应用程序未运行时运行重新路由。否则,暂时性错误的重试很快就会用完。spring-doc.cadn.net.cn

  • 或者,使用两阶段方法:使用此应用程序路由到第三个主题,另一个应用程序从那里路由回主主题。spring-doc.cadn.net.cn

以下代码列表显示了示例应用程序:spring-doc.cadn.net.cn

应用程序属性
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
应用
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
            }
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }
}

1.14. 使用 Kafka Binder 进行分区

Apache Kafka 原生支持主题分区。spring-doc.cadn.net.cn

有时将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时(特定客户的所有消息都应转到同一分区)。spring-doc.cadn.net.cn

以下示例展示了如何配置生产者端和消费者端:spring-doc.cadn.net.cn

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
重要的是要记住,由于 Apache Kafka 本机支持分区,因此无需依赖上述 Binder 分区,除非您使用示例中的自定义分区键或涉及有效负载本身的表达式。 活页夹提供的分区选择适用于不支持本机分区的中间件技术。 请注意,我们使用的是一个名为partitionKey在上面的示例中,这将是分区的决定因素,因此在这种情况下,使用活页夹分区是合适的。 使用本机 Kafka 分区时,即当您不提供partition-key-expression,则 Apache Kafka 将选择一个分区,默认情况下,该分区将是可用分区数的记录键的哈希值。 要向出站记录添加键,请将KafkaHeaders.KEYheader 添加到 spring-messaging 中所需的键值Message<?>. 默认情况下,当未提供记录键时,Apache Kafka 将根据 Apache Kafka 文档中描述的逻辑选择分区。
必须预配主题以具有足够的分区,以便为所有使用者组实现所需的并发性。 上述配置最多支持 12 个消费者实例(如果其concurrency是 2,如果它们的并发性为 3,则为 4,依此类推)。 通常最好“过度预配”分区,以允许将来增加使用者或并发性。
上述配置使用默认分区 (key.hashCode() % partitionCount). 这可能会也可能不会提供适当平衡的算法,具体取决于键值。特别请注意,此分区策略与独立 Kafka 生产者使用的默认策略(例如 Kafka Streams 使用的默认策略)不同,这意味着当这些客户端生成时,相同的键值可能会在分区之间以不同的方式平衡。 您可以使用partitionSelectorExpressionpartitionSelectorClass性能。

由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。 Kafka 在实例之间分配分区。spring-doc.cadn.net.cn

kafka 主题的 partitionCount 可能会在运行时更改(例如,由于管理任务)。 之后计算的分区将有所不同(例如,届时将使用新分区)。 从 Spring Cloud Stream 的 4.0.3 开始,将支持分区计数的更改。 另请参阅参数“spring.kafka.producer.properties.metadata.max.age.ms”以配置更新间隔。 由于某些限制,无法使用引用消息的“有效负载”的“partition-key-expression”,在这种情况下,该机制将被禁用。 默认情况下,整体行为处于禁用状态,可以使用配置参数“producer.dynamicPartitionUpdatesEnabled=true”启用。

以下 Spring Boot 应用程序监听 Kafka 流并打印(到控制台)每条消息转到的分区 ID:spring-doc.cadn.net.cn

@SpringBootApplication
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。 Kafka 重新平衡分区分配。 如果实例计数(或instance count * concurrency) 超过分区数,部分消费者处于空闲状态。spring-doc.cadn.net.cn

2. 响应式 Kafka 活页夹

Spring Cloud Stream 中的 Kafka 绑定器提供了一个基于 Reactor Kafka 项目的专用响应式绑定器。这个响应式 Kafka 绑定器在基于 Apache Kafka 的应用程序中启用了完整的端到端响应式功能,例如背压、响应式流等。当您的 Spring Cloud Stream Kafka 应用程序使用响应式类型(Flux,Mono等),建议使用这个响应式 Kafka 绑定器,而不是常规的基于消息通道的 Kafka 绑定器。spring-doc.cadn.net.cn

2.1. Maven 坐标

以下是响应式 Kafka 绑定器的 maven 坐标。spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>

2.2. 使用响应式 Kafka Binder 的基本示例

在本节中,我们将展示一些使用响应式绑定器编写响应式 Kafka 应用程序的基本代码片段以及围绕它们的详细信息。spring-doc.cadn.net.cn

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

您可以使用上述upppercase函数与基于消息通道的 Kafka 绑定器 (spring-cloud-stream-binder-kafka)以及反应性 Kafka 绑定剂 (spring-cloud-stream-binder-kafka-reactive),本节讨论的主题。 当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中使用响应式类型(即,在uppercase函数),您只能在函数执行过程中获得响应式流。 在函数的执行上下文之外,没有响应式好处,因为底层绑定器不是基于响应式堆栈。 因此,尽管这看起来像是带来了完整的端到端响应式堆栈,但此应用程序仅具有部分响应性。spring-doc.cadn.net.cn

现在假设您正在为 Kafka 使用适当的响应式绑定器 -spring-cloud-stream-binder-kafka-reactive与上述函数的应用。 这种 binder 实现将提供从顶端消费到链底端发布的全部响应式好处。 这是因为底层绑定器是建立在 Reactor Kafka 的核心 API 之上的。 在消费者方面,它利用了 KafkaReceiver,它是 Kafka 消费者的响应式实现。 同样,在生产者端,它使用 KafkaSender API,它是 Kafka 生产者的响应式实现。 由于响应式 Kafka 绑定器的基础是建立在适当的响应式 Kafka API 之上的,因此应用程序可以获得使用响应式技术的全部好处。 使用这个响应式 Kafka 绑定器时,应用程序内置了自动背压等响应式功能。spring-doc.cadn.net.cn

从 4.0.2 版开始,您可以自定义ReceiverOptionsSenderOptions通过提供一个或多个ReceiverOptionsCustomizerSenderOptionsCustomizerBeans。 他们是BiFunctions 接收绑定名称和初始选项,返回自定义选项。 接口扩展Ordered因此,当存在多个定制器时,将按所需的顺序应用定制器。spring-doc.cadn.net.cn

默认情况下,活页夹不提交偏移量。 从 4.0.2 版开始,KafkaHeaders.ACKNOWLEDGMENT标头包含一个ReceiverOffset对象,它允许您通过调用其acknowledge()commit()方法。
@Bean
public Consumer<Flux<Message<String>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

请参阅reactor-kafka文档和 JavaDocs 了解更多信息。spring-doc.cadn.net.cn

此外,从 4.0.3 版本开始,Kafka 消费者属性reactiveAtmostOnce可以设置为true并且 Binder 将在处理每次轮询返回的记录之前自动提交偏移量。 此外,从版本 4.0.3 开始,您可以设置 consumer 属性reactiveAutoCommittrue并且 binder 将在处理每次轮询返回的记录后自动提交偏移量。 在这些情况下,确认标头不存在。spring-doc.cadn.net.cn

4.0.2 也提供reactiveAutoCommit,但实现不正确,它的行为类似于reactiveAtMostOnce.

下面是一个如何使用的示例reaciveAutoCommit.spring-doc.cadn.net.cn

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

请注意reactor-kafka返回一个Flux<Flux<ConsumerRecord<?, ?>>>使用自动提交时。 鉴于 Spring 无法访问内部通量的内容,应用程序必须处理本机ConsumerRecord;没有对内容应用消息转换或转换服务。 这需要使用本机解码(通过指定Deserializer配置中适当类型的)返回所需类型的记录键/值。spring-doc.cadn.net.cn

2.3. 以原始格式使用记录

在上面upppercase函数,我们将记录用作Flux<String>然后将其生成为Flux<String>. 在某些情况下,您可能需要以原始接收格式接收记录 -ReceiverRecord. 这是这样一个功能。spring-doc.cadn.net.cn

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

请注意,在此函数中,我们将记录用作Flux<ReceiverRecord<byte[], byte[]>>然后将其生成为Flux<String>.ReceiverRecord是基本接收记录,它是专门的 KafkaConsumerRecord在卡夫卡反应堆中。 当使用响应式 Kafka 绑定器时,上述函数将允许您访问ReceiverRecord类型。 但是,在这种情况下,您需要为 RecordMessageConverter 提供自定义实现。 默认情况下,响应式 Kafka 绑定器使用 MessagingMessageConverter 将有效负载和标头从ConsumerRecord. 因此,当您的处理程序方法收到它时,有效负载已经从接收到的记录中提取并传递到该方法中,就像我们上面看到的第一个函数一样。 通过提供自定义RecordMessageConverter实现,您可以覆盖默认行为。 例如,如果要将记录作为原始记录使用Flux<ReceiverRecord<byte[], byte[]>>,则可以在应用程序中提供以下 bean 定义。spring-doc.cadn.net.cn

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

然后,您需要指示框架使用此转换器进行所需的绑定。 这是一个基于我们的lowercase功能。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

lowercase-in-0是我们的lowercase功能。 对于出站(lowecase-out-0),我们仍然使用常规MessagingMessageConverter.spring-doc.cadn.net.cn

toMessage实现上述,我们收到原始的ConsumerRecord (ReceiverRecord因为我们处于响应式 Binder 上下文中),然后将其包装在Message. 然后,该消息有效负载是ReceiverRecord提供给用户方法。spring-doc.cadn.net.cn

如果reactiveAutoCommitfalse(默认),调用rec.receiverOffset().acknowledge()(或commit()) 导致偏移量被提交;如果reactiveAutoCommittrue,助焊剂供应ConsumerRecords 代替。 请参阅reactor-kafka文档和 JavaDocs 了解更多信息。spring-doc.cadn.net.cn

2.4. 并发

将响应式函数与响应式 Kafka 绑定器一起使用时,如果您在使用者绑定上设置并发性,则绑定器会创建尽可能多的专用KafkaReceiver并发值提供的对象。 换句话说,这会创建多个响应式流,并具有单独的Flux实现。 当您使用分区主题中的记录时,这可能很有用。spring-doc.cadn.net.cn

例如,假设传入主题至少有三个分区。 然后,您可以设置以下属性。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.lowercase-in-0.consumer.concurrency=3

这将创建三个专用的KafkaReceiver生成三个单独的对象Flux实现,然后将它们流式传输到处理程序方法。spring-doc.cadn.net.cn

2.5. 多路复用

从 4.0.3 版开始,通用使用者属性multiplex现在,响应式绑定器支持,其中单个绑定可以从多个主题使用。 什么时候false(默认值),则为公共destination财产。spring-doc.cadn.net.cn

2.6. 目的地是模式

从 4.0.3 版本开始,destination-is-pattern现在支持 Kafka 绑定消费者属性。 接收器选项使用正则表达式Pattern,允许绑定从与模式匹配的任何主题中使用。spring-doc.cadn.net.cn

2.7. 发送方结果通道

从 4.0.3 版开始,您可以配置resultMetadataChannel接收SenderResult<?>s 来确定发送的成功/失败。spring-doc.cadn.net.cn

SenderResult包含correlationMetadata允许您将结果与发送相关联;它还包含RecordMetadata,表示TopicPartition以及已发送记录的偏移量。spring-doc.cadn.net.cn

resultMetadataChannel 必须FluxMessageChannel实例。spring-doc.cadn.net.cn

下面是如何使用此功能的示例,相关元数据类型为Integer:spring-doc.cadn.net.cn

@Bean
FluxMessageChannel sendResults() {
    return new FluxMessageChannel();
}

@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
    if (result.exception() != null) {
        failureFor(result);
    }
    else {
        successFor(result);
    }
}

要在输出记录上设置相关元数据,请将CORRELATION_ID页眉:spring-doc.cadn.net.cn

streamBridge.send("words1", MessageBuilder.withPayload("foobar")
        .setCorrelationId(42)
        .build());

将该功能与Function,则函数输出类型必须是Message<?>将相关 ID 标头设置为所需值。spring-doc.cadn.net.cn

元数据应是唯一的,至少在发送期间是唯一的。spring-doc.cadn.net.cn

3. Kafka Streams 活页夹

3.1. 用法

要使用 Kafka Streams 绑定器,您只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

为 Kafka Streams 绑定器引导新项目的一种快速方法是使用 Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示spring-doc.cadn.net.cn

spring initializr kafka 流

3.2. 概述

Spring Cloud Stream 包括一个专为 Apache Kafka Streams 绑定而设计的绑定器实现。 通过这种本机集成,Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。spring-doc.cadn.net.cn

Kafka Streams 绑定器实现建立在 Spring for Apache Kafka 项目提供的基础之上。spring-doc.cadn.net.cn

Kafka Streams 绑定器为 Kafka Streams 中的三种主要类型提供了绑定功能 -KStream,KTableGlobalKTable.spring-doc.cadn.net.cn

Kafka Streams 应用程序通常遵循从入站主题读取记录的模型,应用业务逻辑,然后将转换后的记录写入出站主题。 或者,也可以定义没有出站目标的处理器应用程序。spring-doc.cadn.net.cn

在以下部分中,我们将详细介绍 Spring Cloud Stream 与 Kafka Streams 的集成。spring-doc.cadn.net.cn

3.3. 编程模型

当使用 Kafka Streams binder 提供的编程模型时,高级 Streams DSL 和高级和低级 Processor-API 的混合都可以用作选项。 当混合更高级别和较低级别的 API 时,这通常是通过调用transformprocessAPI 方法KStream.spring-doc.cadn.net.cn

3.3.1. 函数式样式

从 Spring Cloud Stream 开始3.0.0,Kafka Streams 绑定器允许使用 Java 8 中可用的函数式编程风格来设计和开发应用程序。 这意味着应用程序可以简洁地表示为类型的 lambda 表达式java.util.function.Functionjava.util.function.Consumer.spring-doc.cadn.net.cn

让我们举一个非常基本的例子。spring-doc.cadn.net.cn

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

虽然很简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。 这是一个没有出站绑定且只有一个入站绑定的使用者应用程序。 应用程序使用数据,它只是记录来自KStream标准输出上的键和值。 该应用程序包含SpringBootApplication注释和标记为Bean. bean 方法的类型为java.util.function.Consumer其中参数化为KStream. 然后在实现中,我们返回一个 Consumer 对象,该对象本质上是一个 lambda 表达式。 在 lambda 表达式中,提供了用于处理数据的代码。spring-doc.cadn.net.cn

在此应用程序中,有一个类型的单个输入绑定KStream. 活页夹为应用程序创建此绑定,名称为process-in-0,即函数 bean name 的名称后跟破折号字符 () 和文字-in后跟另一个破折号,然后是参数的序号位置。 使用此绑定名称可以设置其他属性,例如目标。 例如spring.cloud.stream.bindings.process-in-0.destination=my-topic.spring-doc.cadn.net.cn

如果未在绑定上设置目标属性,那么将创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题应该已经可用。

一旦构建为 uber-jar(例如kstream-consumer-app.jar),您可以像下面一样运行上面的示例。spring-doc.cadn.net.cn

如果应用程序选择使用 Spring 的Component注释,则活页夹也支持该模型。 上面的功能 bean 可以重写如下。spring-doc.cadn.net.cn

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这是另一个示例,它是一个具有输入和输出绑定的完整处理器。 这是经典的字数计数示例,其中应用程序从主题接收数据,然后在翻转时间窗口中计算每个单词的出现次数。spring-doc.cadn.net.cn

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

同样,这是一个完整的 Spring Boot 应用程序。这里与第一个应用程序的不同之处在于 bean 方法是java.util.function.Function. 第一个参数化类型Function用于输入KStream第二个是输出。 在方法体中,提供了一个 lambda 表达式,该表达式类型为Function作为实现,给出了实际的业务逻辑。 与前面讨论的基于消费者的应用程序类似,此处的输入绑定被命名为process-in-0默认情况下。对于输出,绑定名称也会自动设置为process-out-0.spring-doc.cadn.net.cn

一旦构建为 uber-jar(例如wordcount-processor.jar),您可以像下面一样运行上面的示例。spring-doc.cadn.net.cn

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将使用 Kafka 主题中的消息words并将计算结果发布到输出 主题counts.spring-doc.cadn.net.cn

Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写逻辑 处理器中需要。设置 Kafka Streams 基础架构所需的 Kafka Streams 特定配置 由框架自动处理。spring-doc.cadn.net.cn

我们在上面看到的两个示例有一个KStream输入绑定。在这两种情况下,绑定都从单个主题接收记录。 如果要将多个主题多路复用为一个主题KStreambinding,则可以在下方提供逗号分隔的 Kafka 主题作为目标。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3spring-doc.cadn.net.cn

此外,如果要将主题与常规 Exresession 进行匹配,还可以将主题模式提供为目标。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.destination=input.*spring-doc.cadn.net.cn

多个输入绑定

许多重要的 Kafka Streams 应用程序通常通过多个绑定使用来自多个主题的数据。 例如,一个主题被消耗为Kstream另一个作为KTableGlobalKTable. 应用程序可能希望将数据作为表类型接收的原因有很多。 考虑一个用例,其中基础主题是通过数据库中的更改数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。 如果应用程序指定数据需要绑定为KTableGlobalKTable,则 Kafka Streams 绑定器会正确地将目标绑定到KTableGlobalKTable并使它们可供应用程序作。 我们将研究几种不同的场景,了解如何在 Kafka Streams 绑定器中处理多个输入绑定。spring-doc.cadn.net.cn

Kafka Streams Binder 中的 BiFunction

这是一个示例,我们有两个输入和一个输出。在这种情况下,应用程序可以利用java.util.function.BiFunction.spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

同样,基本主题与前面的示例相同,但这里我们有两个输入。 Java 的BiFunction支持用于将输入绑定到所需的目标。绑定器为输入生成的默认绑定名称是process-in-0process-in-1分别。 默认输出绑定为process-out-0. 在此示例中,第一个参数BiFunction被绑定为KStream对于第一个输入,第二个参数被绑定为KTable对于第二个输入。spring-doc.cadn.net.cn

Kafka Streams Binder 中的 BiConsumer

如果有两个输入,但没有输出,在这种情况下,我们可以使用java.util.function.BiConsumer如下图所示。spring-doc.cadn.net.cn

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入

如果您有两个以上的输入怎么办? 在某些情况下,您需要两个以上的输入。在这种情况下,活页夹允许您链接部分函数。 在函数式编程术语中,这种技术通常称为柯里化。 随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里函数。 Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。spring-doc.cadn.net.cn

让我们看一个例子。spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看看上面介绍的绑定模型的细节。在这个模型中,我们在入站上有 3 个部分应用的函数。让我们将它们称为f(x),f(y)f(z). 如果我们在真正的数学函数意义上扩展这些函数,它将如下所示:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>. 这x变量代表KStream<Long, Order>y变量代表GlobalKTable<Long, Customer>z变量代表GlobalKTable<Long, Product>. 第一个功能f(x)具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是函数 f(y)。 功能f(y)具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>),其输出是另一个函数,f(z). 函数的输入f(z)是应用程序的第三个输入 (GlobalKTable<Long, Product>),其输出为KStream<Long, EnrichedOrder>这是应用程序的最终输出绑定。 来自三个部分函数的输入,即KStream,GlobalKTable,GlobalKTable分别在方法主体中可供您使用,用于将业务逻辑实现为 lambda 表达式的一部分。spring-doc.cadn.net.cn

输入绑定命名为enrichOrder-in-0,enrichOrder-in-1enrichOrder-in-2分别。输出绑定命名为enrichOrder-out-0.spring-doc.cadn.net.cn

使用柯里函数,您几乎可以拥有任意数量的输入。但是,请记住,在 Java 中,超过较少数量的输入和部分应用的函数都可能导致代码不可读。 因此,如果您的 Kafka Streams 应用程序需要的输入绑定数量超过相当少的数量,并且您想使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。spring-doc.cadn.net.cn

输出绑定

Kafka Streams 绑定器允许KStreamKTable作为输出绑定。 在幕后,活页夹使用to方法KStream将生成的记录发送到输出主题。 如果应用程序提供KTable作为函数中的输出,Binder 仍然通过委托给to方法KStream.spring-doc.cadn.net.cn

例如,以下两个功能都可以工作:spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
多个输出绑定

Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。 使用多个输出绑定时,需要提供 KStream (KStream[]) 作为出站返回类型。spring-doc.cadn.net.cn

这是一个例子:spring-doc.cadn.net.cn

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

编程模型保持不变,但出站参数化类型为KStream[]. 默认输出绑定名称为process-out-0,process-out-1,process-out-2分别用于上述功能。 binder 之所以会生成三个输出绑定,是因为它检测了返回的KStream数组为三个。 请注意,在此示例中,我们提供了一个noDefaultBranch();如果我们使用defaultBranch()相反,这将需要额外的输出绑定,本质上返回一个KStream长度为四的数组。spring-doc.cadn.net.cn

Kafka 流的基于函数的编程风格摘要

总之,下表显示了可在函数范式中使用的各种选项。spring-doc.cadn.net.cn

输入数 输出数量 要使用的组件

1spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.消费者spring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

java.util.function.Bi消费者spring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.函数spring-doc.cadn.net.cn

2spring-doc.cadn.net.cn

1..nspring-doc.cadn.net.cn

java.util.function.Bi函数spring-doc.cadn.net.cn

>= 3spring-doc.cadn.net.cn

0..nspring-doc.cadn.net.cn

使用柯里化函数spring-doc.cadn.net.cn

Kafka Streams 绑定器中的函数组合

Kafka Streams 绑定器支持线性拓扑的最小形式的功能组合。 使用 Java 函数 API 支持,您可以编写多个函数,然后使用andThen方法。 例如,假设您有以下两个函数。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使活页夹中没有功能组合支持,也可以按如下方式组合这两个功能。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然后,您可以提供表单的定义spring.cloud.function.definition=foo;bar;composed. 使用活页夹中的函数组合支持,您无需编写要执行显式函数组合的第三个函数。spring-doc.cadn.net.cn

您可以简单地执行以下作:spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar

您甚至可以这样做:spring-doc.cadn.net.cn

spring.cloud.function.definition=foo|bar;foo;bar

在此示例中,组合函数的默认绑定名称将变为foobar-in-0foobar-out-0.spring-doc.cadn.net.cn

Kafka Streams bincer 中功能组合的局限性

当你有java.util.function.Functionbean,可以由另一个函数或多个函数组成。 相同的函数 bean 可以用java.util.function.Consumer也。在这种情况下,消费者是最后一个组成的组件。 一个函数可以由多个函数组成,然后以java.util.function.Consumer豆子也是如此。spring-doc.cadn.net.cn

在组合类型java.util.function.BiFunctionBiFunction必须是定义中的第一个函数。 组成的实体必须是java.util.function.Functionjava.util.funciton.Consumer. 换句话说,您不能将BiFunctionbean 然后与另一个BiFunction.spring-doc.cadn.net.cn

不能使用BiConsumer或定义,其中Consumer是第一个组件。 您也不能使用输出为数组 (KStream[]用于分支),除非这是定义中的最后一个组件。spring-doc.cadn.net.cn

第一个FunctionBiFunction在函数定义中也可以使用柯里形式。 例如,以下情况是可能的。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函数定义可以是curriedFoo|bar. 在后台,绑定器将为柯里函数创建两个输入绑定,以及基于定义中的最终函数的输出绑定。 在这种情况下,默认输入绑定将是curriedFoobar-in-0curriedFoobar-in-1. 此示例的默认输出绑定将变为curriedFoobar-out-0.spring-doc.cadn.net.cn

使用特别注意事项KTable作为函数组合中的输出

假设您有以下两个功能。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

您可以将它们组合为foo|bar,但请记住,第二个函数 (bar在这种情况下)必须有一个KTable作为输入,因为第一个函数 (foo) 有KTable作为输出。spring-doc.cadn.net.cn

3.4. 编程模型的辅助工具

3.4.1. 单个应用程序中的多个 Kafka Streams 处理器

Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以申请如下。spring-doc.cadn.net.cn

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的单独 Kafka Streams 对象(更多内容见下文)。 但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 需要激活哪些功能。 以下是激活这些功能的方法。spring-doc.cadn.net.cn

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcessspring-doc.cadn.net.cn

如果您不希望立即激活某些功能,可以将其从此列表中删除。spring-doc.cadn.net.cn

当您拥有单个 Kafka Streams 处理器和其他类型的Function同一应用程序中的 bean,通过不同的绑定器处理(例如,基于常规 Kafka 消息通道绑定器的函数 bean)spring-doc.cadn.net.cn

3.4.2. Kafka Streams 应用程序 ID

应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必填属性。 Spring Cloud Stream Kafka Streams 绑定器允许您以多种方式配置此应用程序 ID。spring-doc.cadn.net.cn

如果应用程序中只有一个处理器,则可以使用以下属性在绑定器级别进行设置:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cadn.net.cn

为了方便起见,如果您只有一个处理器,您还可以使用spring.application.name作为属性来委托应用程序 ID。spring-doc.cadn.net.cn

如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 对于函数模型,您可以将其作为属性附加到每个函数。spring-doc.cadn.net.cn

例如,假设您有以下功能。spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后,您可以使用以下活页夹级别属性为每个应用程序设置应用程序 ID。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.process.applicationIdspring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationIdspring-doc.cadn.net.cn

对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也将有效。但是,如果您使用的是功能模型,则在绑定器级别设置每个函数会容易得多,如我们在上面看到的那样。spring-doc.cadn.net.cn

对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您要自动扩展应用程序,这尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。spring-doc.cadn.net.cn

如果应用程序未提供应用程序 ID,则在这种情况下,活页夹将自动为您生成静态应用程序 ID。 这在开发方案中很方便,因为它避免了显式提供应用程序 ID 的需要。 以这种方式生成的应用程序 ID 将在应用程序重启时保持静态。 对于功能模型,生成的应用程序 ID 将是函数 bean 名称,后跟文字applicationID,例如:process-applicationID如果process如果函数 bean 名称。spring-doc.cadn.net.cn

设置应用程序 ID 的摘要
  • 默认情况下,binder 将根据函数方法自动生成应用程序 ID。spring-doc.cadn.net.cn

  • 如果您有单个处理器,则可以使用spring.kafka.streams.applicationId,spring.application.namespring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cadn.net.cn

  • 如果您有多个处理器,则可以使用属性 -spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId.spring-doc.cadn.net.cn

3.4.3. 使用函数式样式覆盖绑定器生成的默认绑定名称

默认情况下,绑定器在使用函数式样式时使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。 如果要覆盖这些绑定名称,可以通过指定以下属性来实现。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.<default binding name>.默认绑定名称是活页夹生成的原始绑定名称。spring-doc.cadn.net.cn

例如,假设您有这个函数。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 将生成带有名称的绑定,process-in-0,process-in-1process-out-0. 现在,如果您想将它们完全更改为其他内容,也许是更多特定于域的绑定名称,那么您可以按如下方式进行作。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=usersspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=regionsspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-out-0=clicksspring-doc.cadn.net.cn

之后,必须在这些新绑定名称上设置所有绑定级别属性。spring-doc.cadn.net.cn

请记住,使用上述函数式编程模型,在大多数情况下,遵循默认绑定名称是有意义的。 您可能仍希望执行此重写的唯一原因是,当您具有大量配置属性并且想要将绑定映射到更域友好的内容时。spring-doc.cadn.net.cn

3.4.4. 设置引导服务器配置

运行 Kafka Streams 应用程序时,必须提供 Kafka 代理服务器信息。 如果您不提供此信息,则 Binder 希望您以默认值运行代理localhost:9092. 如果不是这种情况,那么您需要覆盖它。有几种方法可以做到这一点。spring-doc.cadn.net.cn

当涉及到 binder 级别属性时,是否使用通过常规 Kafka binder 提供的 broker 属性并不重要 -spring.cloud.stream.kafka.binder.brokers. Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers.spring-doc.cadn.net.cn

3.5. 记录序列化和反序列化

Kafka Streams 绑定器允许您以两种方式序列化和反序列化记录。 一个是Kafka提供的原生序列化和反序列化工具,另一个是Spring Cloud Stream框架的消息转换能力。 让我们看看一些细节。spring-doc.cadn.net.cn

3.5.1. 入站反序列化

密钥始终使用本机 Serdes 进行反序列化。spring-doc.cadn.net.cn

对于值,默认情况下,入站的反序列化由 Kafka 本机执行。 请注意,这是对以前版本的 Kafka Streams 绑定程序的默认行为的重大更改,其中反序列化是由框架完成的。spring-doc.cadn.net.cn

Kafka Streams 绑定器将尝试推断匹配Serde类型通过查看java.util.function.Function|Consumer. 这是它与 Serdes 匹配的顺序。spring-doc.cadn.net.cn

  • 如果应用程序提供类型为Serde如果返回类型使用传入键或值类型的实际类型进行参数化,则它将使用Serde用于入站反序列化。 例如,如果应用程序中有以下内容,则活页夹会检测到KStream与在Serde豆。 它将用于入站反序列化。spring-doc.cadn.net.cn

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下来,它查看类型,看看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。 以下是绑定器将尝试从 Kafka Streams 匹配的 Serde 类型。spring-doc.cadn.net.cn

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假定类型是 JSON 友好的。 如果您有多个值对象作为输入,这很有用,因为绑定器会在内部将它们推断为正确的 Java 类型。 在回退到JsonSerde不过,活页夹会以默认值Serdes 设置,以查看它是否是Serde它可以与传入的 KStream 类型匹配。spring-doc.cadn.net.cn

如果上述策略均无效,则应用程序必须提供Serde通过配置。 这可以通过两种方式进行配置 - 绑定或默认。spring-doc.cadn.net.cn

首先,活页夹将查看Serde在绑定级别提供。 例如,如果您有以下处理器,spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

然后,您可以提供绑定级别Serde使用以下内容:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您提供Serde作为每个输入绑定的 abover,那么这将具有更高的优先级,并且绑定器将远离任何Serde推理。

如果您希望将默认键/值 Serdes 用于入站反序列化,则可以在 Binder 级别执行此作。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果你不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认的,为了让 Spring Cloud Stream 对入站值对象进行反序列化,你需要显式禁用原生解码。spring-doc.cadn.net.cn

例如,如果您拥有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false您需要单独禁用所有输入的本机解码。否则,本机解码仍将应用于您未禁用的输入。spring-doc.cadn.net.cn

默认情况下,Spring Cloud Stream 将使用application/json作为内容类型,并使用适当的 JSON 消息转换器。 可以使用以下属性和适当的MessageConverter豆。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.contentType

3.5.2. 出站序列化

出站序列化几乎遵循与上述入站反序列化相同的规则。 与入站反序列化一样,与以前版本的 Spring Cloud Stream 相比,一个主要变化是出站的序列化由 Kafka 本机处理。 在 3.0 版本的 binder 之前,这是由框架本身完成的。spring-doc.cadn.net.cn

出站上的密钥始终由 Kafka 使用匹配Serde这是由活页夹推断出来的。 如果它无法推断出键的类型,则需要使用配置来指定。spring-doc.cadn.net.cn

值 serde 使用用于入站反序列化的相同规则进行推断。 首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。 如果没有,它会检查它是否与Serde被卡夫卡暴露,例如 -Integer,Long,Short,Double,Float,byte[],UUIDString. 如果这不起作用,那么它就会回退到JsonSerde由 Spring Kafka 项目提供,但先看默认的Serde配置以查看是否存在匹配项。 请记住,所有这些都对应用程序透明地发生。 如果这些都不起作用,则用户必须提供Serde按配置使用。spring-doc.cadn.net.cn

假设您正在使用相同的BiFunction处理器,如上所述。然后,您可以按如下方式配置出站键/值 Serdes。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果 Serde 推理失败,并且没有提供绑定级别 Serdes,则绑定器将回退到JsonSerde,但请查看默认的 Serdes 以进行匹配。spring-doc.cadn.net.cn

默认 serde 的配置方式与上述反序列化下描述的相同。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serdespring-doc.cadn.net.cn

如果应用程序使用分支功能并具有多个输出绑定,则必须按绑定配置这些绑定。 同样,如果活页夹能够推断出Serde类型,则无需执行此配置。spring-doc.cadn.net.cn

如果您不希望 Kafka 提供的本机编码,但想使用框架提供的消息转换,那么您需要显式禁用本机编码,因为本机编码是默认的。 例如,如果您拥有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false在分支的情况下,您需要单独禁用所有输出的本机编码。否则,本机编码仍将应用于您未禁用的那些。spring-doc.cadn.net.cn

当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json作为内容类型,并使用适当的 JSON 消息转换器。可以使用以下属性和相应的MessageConverter豆。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-out-0.contentType

禁用本机编码/解码时,binder 不会像本机 Serdes 那样进行任何推理。应用程序需要显式提供所有配置选项。因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用程序时保留反序列化的默认选项,并坚持使用 Kafka Streams 提供的本机反序列化。您必须使用框架提供的消息转换功能的一种场景是,当您的上游生产者使用特定的序列化策略时。在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。当依赖默认Serde机制,应用程序必须确保活页夹有一条前进的道路,以正确的方式正确映射入站和出站Serde,否则事情可能会失败。spring-doc.cadn.net.cn

值得一提的是,上面概述的数据反序列化方法仅适用于处理器的边缘,即入站和出站。 您的业务逻辑可能仍需要调用显式需要的 Kafka Streams APISerde对象。 这些仍然是应用程序的责任,必须由开发人员相应地处理。spring-doc.cadn.net.cn

3.6. 错误处理

Apache Kafka Streams 提供了本机处理反序列化错误异常的功能。 有关此支持的详细信息,请参阅。 开箱即用,Apache Kafka Streams 提供了两种反序列化异常处理程序 -LogAndContinueExceptionHandlerLogAndFailExceptionHandler. 顾名思义,前者将记录错误并继续处理下一条记录,后者将记录错误并失败。LogAndFailExceptionHandler是默认的反序列化异常处理程序。spring-doc.cadn.net.cn

3.6.1. 在 Binder 中处理反序列化异常

Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述两个反序列化异常处理程序外,绑定器还提供了第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。 下面是启用此 DLQ 异常处理程序的方法。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

设置上述属性后,反序列化错误中的所有记录都会自动发送到 DLQ 主题。spring-doc.cadn.net.cn

您可以设置发布 DLQ 消息的主题名称,如下所示。spring-doc.cadn.net.cn

您可以为DlqDestinationResolver这是一个功能接口。DlqDestinationResolver需要ConsumerRecord并将异常作为输入,然后允许指定主题名称作为输出。 通过访问 KafkaConsumerRecord,则可以在BiFunction.spring-doc.cadn.net.cn

下面是一个提供实现的示例DlqDestinationResolver.spring-doc.cadn.net.cn

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供实现时要记住的一件重要事情DlqDestinationResolver是 binder 中的 provisioner 不会为应用程序自动创建主题。 这是因为活页夹无法推断出实现可能发送到的所有 DLQ 主题的名称。 因此,如果使用此策略提供 DLQ 名称,则应用程序有责任确保事先创建这些主题。spring-doc.cadn.net.cn

如果DlqDestinationResolver作为 bean 存在于应用程序中,具有更高的优先级。 如果不想遵循此方法,而是使用配置提供静态 DLQ 名称,则可以设置以下属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果设置了此设置,则错误记录将发送到主题custom-dlq. 如果应用程序未使用上述任何一种策略,则它将创建一个名称为error.<input-topic-name>.<application-id>. 例如,如果绑定的目标主题是inputTopic应用程序 ID 为process-applicationId,则默认的 DLQ 主题为error.inputTopic.process-applicationId. 如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建 DLQ 主题。spring-doc.cadn.net.cn

3.6.2. 每个输入消费者绑定的DLQ

该物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着,如果同一应用程序中有多个函数,则此属性将应用于所有函数。但是,如果单个处理器中有多个处理器或多个输入绑定,则可以使用绑定程序为每个输入使用者绑定提供的更细粒度的 DLQ 控件。spring-doc.cadn.net.cn

如果您有以下处理器,spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

并且您只想在第一个输入绑定上启用 DLQ 并在第二个绑定上启用 skipAndContinue,然后您可以在使用者上执行以下作。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinuespring-doc.cadn.net.cn

以这种方式设置反序列化异常处理程序的优先级高于在绑定程序级别设置的优先级。spring-doc.cadn.net.cn

3.6.3. DLQ 分区

默认情况下,记录使用与原始记录相同的分区发布到死信主题。 这意味着死信主题必须至少具有与原始记录一样多的分区。spring-doc.cadn.net.cn

要更改此行为,请添加一个DlqPartitionFunction实现为@Bean到应用程序上下文。 只能存在一个这样的豆子。 该函数与消费者组一起提供(在大多数情况下与应用程序 ID 相同),失败ConsumerRecord和例外。 例如,如果您始终想要路由到分区 0,则可以使用:spring-doc.cadn.net.cn

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions属性设置为 1(以及活页夹的minPartitionCount等于1),无需提供DlqPartitionFunction;框架将始终使用分区 0。 如果将使用者绑定的dlqPartitions属性设置为大于1(或活页夹的minPartitionCount大于1),您必须提供一个DlqPartitionFunctionbean,即使分区计数与原始主题的分区计数相同。

在 Kafka Streams 绑定器中使用异常处理功能时,需要记住几件事。spring-doc.cadn.net.cn

  • 该物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用。 这意味着如果同一应用程序中有多个函数,则此属性将应用于所有函数。spring-doc.cadn.net.cn

  • 反序列化的异常处理与本机反序列化和框架提供的消息转换一致。spring-doc.cadn.net.cn

3.6.4. 在 Binder 中处理生产异常

与上述对反序列化异常处理程序的支持不同,绑定程序不提供用于处理生产异常的第一类机制。 但是,您仍然可以使用StreamsBuilderFactoryBean定制器,您可以在下面的后续部分中找到更多详细信息。spring-doc.cadn.net.cn

3.7. 重试关键业务逻辑

在某些情况下,您可能希望重试业务逻辑中对应用程序至关重要的部分。 可能会对关系数据库进行外部调用,或者从 Kafka Streams 处理器调用 REST 端点。 这些调用可能会因各种原因而失败,例如网络问题或远程服务不可用。 更常见的是,如果您可以重试,这些故障可能会自行解决。 默认情况下,Kafka Streams 绑定器会创建RetryTemplate所有输入绑定的 bean。spring-doc.cadn.net.cn

如果函数具有以下签名,spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

和默认绑定名称,RetryTemplate将注册为process-in-0-RetryTemplate. 这遵循绑定名称 (process-in-0)后跟文字-RetryTemplate. 在多个输入绑定的情况下,将有一个单独的RetryTemplate每个绑定可用的 bean。 如果有自定义RetryTemplate应用程序中可用的 bean 并通过spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName,则该属性优先于任何输入绑定级别重试模板配置属性。spring-doc.cadn.net.cn

一旦RetryTemplate从绑定注入到应用程序中,它可用于重试应用程序的任何关键部分。下面是一个示例:spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                    });
                }

                @Override
                public void close() {
                }
            });
}

或者您可以使用自定义RetryTemplate如下。spring-doc.cadn.net.cn

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

请注意,当重试用尽时,默认情况下,将抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,则可以将 RecoveryCallback 添加到execute方法: 这是一个例子。spring-doc.cadn.net.cn

retryTemplate.execute(context -> {
    //Critical business logic goes here.
    }, context -> {
       //Recovery logic goes here.
       return null;
    ));

有关 RetryTemplate、重试策略、退避策略等的更多信息,请参阅 Spring Retry 项目。spring-doc.cadn.net.cn

3.8. 状态商店

当使用高级 DSL 并进行适当的调用以触发状态存储时,Kafka Streams 会自动创建状态存储。spring-doc.cadn.net.cn

如果您想实现传入的KTable绑定为命名状态存储,则可以使用以下策略来执行此作。spring-doc.cadn.net.cn

假设您有以下功能。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后通过设置以下属性,传入的KTable数据将具体化到命名状态存储中。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在应用程序中将自定义状态存储定义为 bean,这些状态存储将被 binder 检测并添加到 Kafka Streams 构建器中。 特别是在使用处理器 API 时,需要手动注册状态存储。 为此,您可以在应用程序中将 StateStore 创建为 bean。 以下是定义此类 bean 的示例。spring-doc.cadn.net.cn

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

然后,应用程序可以直接访问这些状态存储。spring-doc.cadn.net.cn

在引导过程中,上述 bean 将由绑定器处理并传递给 Streams 构建器对象。spring-doc.cadn.net.cn

访问状态存储:spring-doc.cadn.net.cn

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

在注册全局状态存储时,这将不起作用。 要注册全局状态存储,请参阅下面有关自定义的部分StreamsBuilderFactoryBean.spring-doc.cadn.net.cn

3.9. 交互式查询

Kafka Streams 绑定器 API 公开了一个名为InteractiveQueryService以交互方式查询状态存储。 您可以在应用程序中以 Spring Bean 的形式访问它。从应用程序访问此 Bean 的一种简单方法是autowire豆子。spring-doc.cadn.net.cn

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦您获得了对此 bean 的访问权限,您就可以查询您感兴趣的特定状态存储。见下文。spring-doc.cadn.net.cn

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在启动期间,上述检索存储的方法调用可能会失败。 例如,它可能仍在初始化状态存储的过程中。 在这种情况下,重试此作会很有用。 Kafka Streams 绑定器提供了一个简单的重试机制来适应这一点。spring-doc.cadn.net.cn

下面是可用于控制此重试的两个属性。spring-doc.cadn.net.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为1.spring-doc.cadn.net.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为1000毫秒。spring-doc.cadn.net.cn

如果有多个 kafka 流应用程序实例正在运行,那么在以交互方式查询它们之前,您需要确定哪个应用程序实例托管了您要查询的特定密钥。InteractiveQueryServiceAPI 提供了用于识别主机信息的方法。spring-doc.cadn.net.cn

为了使其正常工作,您必须配置属性application.server如下:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段:spring-doc.cadn.net.cn

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有关这些主机查找方法的更多信息,请参阅有关这些方法的 Javadoc。 对于这些方法,在启动期间,如果底层 KafkaStreams 对象尚未准备就绪,它们可能会抛出异常。 上述重试属性也适用于这些方法。spring-doc.cadn.net.cn

3.9.1. 通过 InteractiveQueryService 提供的其他 API 方法

使用以下 API 方法检索KeyQueryMetadata与给定存储和键的组合相关联的对象。spring-doc.cadn.net.cn

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法检索KakfaStreams与给定存储和键的组合相关联的对象。spring-doc.cadn.net.cn

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

3.9.2. 自定义存储查询参数

有时需要先微调 store 查询参数,然后才能通过InteractiveQueryService. 为此,从4.0.1版本的 Binder 中,你可以为StoreQueryParametersCustomizer这是一个功能接口,具有customize采用StoreQueryParameter作为论据。 这是它的方法签名。spring-doc.cadn.net.cn

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

使用此方法,应用程序可以进一步自定义StoreQueryParameters例如启用陈旧的商店。spring-doc.cadn.net.cn

当此 bean 存在于此应用程序中时,InteractiveQueryService会调用其customize方法。spring-doc.cadn.net.cn

请记住,必须有一个唯一的 beanStoreQueryParametersCustomizer在应用程序中可用。

3.10. 健康指标

运行状况指示器需要依赖项spring-boot-starter-actuator.对于 maven 使用:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器来检查底层流线程的状态。 Spring Cloud Stream 定义了一个属性management.health.binders.enabled以启用运行状况指示器。请参阅 Spring Cloud Stream 文档spring-doc.cadn.net.cn

运行状况指示器为每个流线程的元数据提供以下详细信息:spring-doc.cadn.net.cn

默认情况下,只有全局状态可见 (UPDOWN).要显示详细信息,属性management.endpoint.health.show-details必须设置为ALWAYSWHEN_AUTHORIZED. 有关运行状况信息的更多详细信息,请参阅 Spring Boot Actuator 文档spring-doc.cadn.net.cn

运行状况指示器的状态为UP如果注册的所有 Kafka 线程都在RUNNING州。

由于 Kafka Streams 绑定器中有三个单独的绑定器 (KStream,KTableGlobalKTable),它们都将报告健康状态。 启用时show-details,报告的一些信息可能是多余的。spring-doc.cadn.net.cn

当同一应用程序中存在多个 Kafka Streams 处理器时,将报告所有处理器的运行状况检查,并按 Kafka Streams 的应用程序 ID 进行分类。spring-doc.cadn.net.cn

3.11. 访问 Kafka Streams 指标

Spring Cloud Stream Kafka Streams 绑定器提供可以通过千分尺导出的 Kafka Streams 指标MeterRegistry.spring-doc.cadn.net.cn

对于 Spring Boot 版本 2.2.x,度量支持是通过绑定程序的自定义 Micrometer 度量实现提供的。 对于 Spring Boot 版本 2.3.x,Kafka Streams 指标支持是通过 Micrometer 本机提供的。spring-doc.cadn.net.cn

通过启动执行器端点访问指标时,请确保将metrics前往酒店management.endpoints.web.exposure.include. 然后你可以访问/acutator/metrics以获取所有可用指标的列表,然后可以通过相同的 URI (/actuator/metrics/<metric-name>).spring-doc.cadn.net.cn

3.12. 混合使用高级 DSL 和低级处理器 API

Kafka Streams 提供了两种 API 变体。 它有一个更高级别的 DSL 类似 API,您可以在其中链接许多功能程序员可能熟悉的各种作。 Kafka Streams 还允许访问低级处理器 API。 处理器 API 虽然非常强大,并且能够在低得多的级别上控制事物,但本质上是势在必行的。 用于 Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级 DSL 或混合使用 DSL 和处理器 API。 混合使用这两种变体可以为您提供许多选项来控制应用程序中的各种用例。 应用程序可以使用transformprocess方法 API 调用来访问处理器 API。spring-doc.cadn.net.cn

下面是如何使用process应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

这是一个使用transform应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

processAPI 方法调用是一个终端作,而transformAPI 是非终端的,可为您提供潜在的转换KStream使用它可以继续使用 DSL 或处理器 API 进行进一步处理。spring-doc.cadn.net.cn

3.13. 出站分区支持

Kafka Streams 处理器通常将处理后的输出发送到出站 Kafka 主题中。 如果出站主题是分区的,并且处理器需要将传出数据发送到特定分区中,那么应用程序需要提供类型为StreamPartitioner. 有关更多详细信息,请参阅 StreamPartitioner。 让我们看一些例子。spring-doc.cadn.net.cn

这是我们已经多次看到的同一个处理器,spring-doc.cadn.net.cn

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

以下是输出绑定目标:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主题outputTopic有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认的分区策略,这可能不是您想要的结果,具体取决于特定用例。 假设,您想发送任何匹配的密钥spring对 0 进行分区,cloud到分区 1,stream分区 2,其他所有内容都分区 3。 这是您需要在应用程序中执行的作。spring-doc.cadn.net.cn

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

这是一个基本的实现,但是,您可以访问记录的键/值、主题名称和分区总数。 因此,如果需要,您可以实现复杂的分区策略。spring-doc.cadn.net.cn

您还需要提供此 Bean 名称以及应用程序配置。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

应用程序中的每个输出主题都需要像这样单独配置。spring-doc.cadn.net.cn

3.14. StreamsBuilderFactoryBean 其他自定义

通常需要自定义StreamsBuilderFactoryBean这会创建KafkaStreams对象。 基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean. 您可以使用org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer从 Spring for Apache Kafka 项目中自定义/配置StreamsBuilderFactoryBean本身。spring-doc.cadn.net.cn

下面是使用StreamsBuilderFactoryBeanConfigurer.spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上面显示了您可以执行的作,以配置StreamsBuilderFactoryBean. 您基本上可以从StreamsBuilderFactoryBean以配置它。 此配置器将在工厂 Bean 启动之前由 Binder 调用。spring-doc.cadn.net.cn

一旦您访问了StreamsBuilderFactoryBean,您还可以自定义底层KafkaStreams对象通过KafkaStreamsCustomizer. 这是这样做的蓝图。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer将被StreamsBuilderFactoryBean就在基础KafkaStreams开始。spring-doc.cadn.net.cn

只能有一个StreamsBuilderFactoryBeanConfigurer在整个应用程序中。 那么我们如何考虑多个 Kafka Streams 处理器,因为它们中的每个处理器都由单独备份StreamsBuilderFactoryBean对象? 在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些筛选器。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {

    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

3.14.1. 使用定制器注册全局状态存储

如上所述,绑定器不提供将全局状态存储注册为功能的第一类方法。 为此,您需要使用定制器。 这是如何做到这一点的。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

同样,如果您有多个处理器,则需要将全局状态存储附加到右侧StreamsBuilder通过过滤掉另一个StreamsBuilderFactoryBean使用如上所述的应用程序 ID 的对象。spring-doc.cadn.net.cn

3.14.2. 使用定制器注册生产异常处理程序

在错误处理部分,我们指出 binder 没有提供处理生产异常的第一类方法。 尽管如此,您仍然可以使用StreamsBuilderFacotryBeancustomizer 来注册生产异常处理程序。见下文。spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

同样,如果您有多个处理器,您可能需要根据正确的处理器将其适当地设置为StreamsBuilderFactoryBean. 您也可以使用配置属性添加此类生产异常处理程序(有关更多信息,请参阅下文),但如果您选择使用编程方法,则这是一个选项。spring-doc.cadn.net.cn

3.15. 时间戳提取器

Kafka Streams 允许您根据各种时间戳概念控制使用者记录的处理。 默认情况下,Kafka Streams 会提取嵌入在使用者记录中的时间戳元数据。 您可以通过提供不同的TimestampExtractor每个输入绑定的实现。 以下是有关如何做到这一点的一些详细信息。spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然后你设置上面的TimestampExtractor每个消费者绑定的 Bean 名称。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果跳过输入使用者绑定来设置自定义时间戳提取器,则该使用者将使用默认设置。spring-doc.cadn.net.cn

3.16. 具有基于 Kafka Streams 的绑定器和常规 Kafka 绑定器的多绑定器

您可以拥有一个应用程序,其中既有基于常规 Kafka 绑定器的功能/使用者/提供商,又有基于 Kafka Streams 的处理器。 但是,您不能在单个函数或消费者中混合使用它们。spring-doc.cadn.net.cn

下面是一个示例,在同一应用程序中具有两个基于 Binder 的组件。spring-doc.cadn.net.cn

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

这是配置中的相关部分:spring-doc.cadn.net.cn

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果您拥有与上述相同的应用程序,但正在处理两个不同的 Kafka 集群,例如常规process同时作用于 Kafka 集群 1 和集群 2(从 cluster-1 接收数据并发送到 cluster-2),并且 Kafka Streams 处理器作用于 Kafka 集群 2。 然后你必须使用 Spring Cloud Stream 提供的多绑定器工具。spring-doc.cadn.net.cn

下面是配置在这种情况下可能发生的变化。spring-doc.cadn.net.cn

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

注意以上配置。 我们有两种绑定器,但总共有 3 个绑定器,第一种是基于集群 1 (kafka1),然后是另一个基于集群 2 (kafka2),最后是kstream一个 (kafka3). 应用程序中的第一个处理器从kafka1并发布到kafka2其中两个绑定器都基于常规的 Kafka 绑定器,但集群不同。 第二个处理器是 Kafka Streams 处理器,它使用kafka3kafka2,但粘合剂类型不同。spring-doc.cadn.net.cn

由于 Kafka Streams 系列绑定器中有三种不同的绑定器类型可用 -kstream,ktableglobalktable- 如果您的应用程序具有基于这些绑定中的任何一个的多个绑定,则需要将其显式作为绑定器类型提供。spring-doc.cadn.net.cn

例如,如果您有如下处理器,spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

然后,必须在多活页夹方案中按如下方式配置。 请注意,仅当您有一个真正的多绑定器方案时,才需要这样做,其中有多个处理器在单个应用程序中处理多个集群。 在这种情况下,需要显式为绑定器提供绑定,以区别于其他处理器的绑定器类型和集群。spring-doc.cadn.net.cn

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.

3.17. 状态清理

默认情况下,停止绑定时不会清理任何本地状态。 这与 Spring Kafka 2.7 版的行为相同。 有关更多详细信息,请参阅 Spring Kafka 文档。 要修改此行为,只需将单个CleanupConfig @Bean(配置为在启动、停止或两者都不清理)到应用程序上下文;将检测到 Bean 并将其连接到工厂 Bean。spring-doc.cadn.net.cn

3.18. Kafka Streams 拓扑可视化

Kafka Streams Binder 提供了以下执行器端点,用于检索拓扑描述,您可以使用这些端点使用外部工具可视化拓扑。spring-doc.cadn.net.cn

/actuator/kafkastreamstopologyspring-doc.cadn.net.cn

/actuator/kafkastreamstopology/<application-id of the processor>spring-doc.cadn.net.cn

您需要包含 Spring Boot 中的执行器和 Web 依赖项才能访问这些端点。 此外,您还需要添加kafkastreamstopologymanagement.endpoints.web.exposure.include财产。 默认情况下,kafkastreamstopology端点已禁用。spring-doc.cadn.net.cn

3.19. Kafka Streams 应用程序中基于事件类型的路由

Kafka Streams 绑定器不支持基于常规消息通道的绑定程序中可用的路由函数。 但是,Kafka Streams 绑定器仍然通过入站记录上的事件类型记录头提供路由功能。spring-doc.cadn.net.cn

若要启用基于事件类型的路由,应用程序必须提供以下属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.spring-doc.cadn.net.cn

这可以是逗号分隔的值。spring-doc.cadn.net.cn

例如,假设我们有这个函数:spring-doc.cadn.net.cn

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

我们还假设,如果传入记录的事件类型为foobar. 这可以使用以下方式表示eventTypes属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,barspring-doc.cadn.net.cn

现在,当应用程序运行时,活页夹会检查每个传入记录中的标头event_type并查看它的值是否设置为foobar. 如果找不到其中任何一个,则将跳过函数执行。spring-doc.cadn.net.cn

默认情况下,活页夹期望记录头键为event_type,但可以根据绑定进行更改。 例如,如果我们想将此绑定上的标头键更改为my_event而不是默认值,可以按如下方式更改。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.spring-doc.cadn.net.cn

在 Kafkfa Streams 绑定器中使用事件路由功能时,它使用 byte 数组Serde反序列化所有传入记录。 如果记录头与事件类型匹配,则只有它使用实际的Serde使用配置的或推断的Serde. 如果在绑定上设置反序列化异常处理程序,则会引入问题,因为预期的反序列化仅发生在堆栈中,从而导致意外错误。 为了解决此问题,您可以在绑定上设置以下属性,以强制绑定器使用配置或推断的Serde而不是字节数组Serde.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEventsspring-doc.cadn.net.cn

这样,应用程序可以在使用事件路由功能时立即检测反序列化问题,并可以做出适当的处理决策。spring-doc.cadn.net.cn

3.20. Kafka Streams 绑定器中的绑定可视化和控制

从 3.1.2 版本开始,Kafka Streams 绑定器支持绑定可视化和控制。 仅支持的两个生命周期阶段是STOPPEDSTARTED. 生命周期阶段PAUSEDRESUMED在 Kafka Streams 绑定器中不可用。spring-doc.cadn.net.cn

为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

如果您更喜欢使用 webflux,则可以包含spring-boot-starter-webflux而不是标准的 Web 依赖项。spring-doc.cadn.net.cn

此外,您还需要设置以下属性:spring-doc.cadn.net.cn

management.endpoints.web.exposure.include=bindings

为了进一步说明此功能,让我们使用以下应用程序作为指南:spring-doc.cadn.net.cn

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}

正如我们所看到的,该应用程序有两个 Kafka Streams 函数 - 一个是消费者,另一个是函数。 默认情况下,使用者绑定命名为consumer-in-0. 同样,对于函数,输入绑定为function-in-0输出绑定为function-out-0.spring-doc.cadn.net.cn

应用程序启动后,我们可以使用以下绑定端点找到有关绑定的详细信息。spring-doc.cadn.net.cn

 curl http://localhost:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

有关所有三个绑定的详细信息可以在上面找到。spring-doc.cadn.net.cn

现在让我们停止 consumer-in-0 绑定。spring-doc.cadn.net.cn

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0

此时,不会通过此绑定接收任何记录。spring-doc.cadn.net.cn

重新开始绑定。spring-doc.cadn.net.cn

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0

当单个函数上存在多个绑定时,对其中任何一个绑定调用这些作都将起作用。 这是因为单个函数上的所有绑定都由相同的StreamsBuilderFactoryBean. 因此,对于上述函数,要么function-in-0function-out-0会起作用。spring-doc.cadn.net.cn

3.21. 手动启动 Kafka Streams 处理器

Spring Cloud Stream Kafka Streams 绑定器提供了一个名为StreamsBuilderFactoryManagerStreamsBuilderFactoryBean来自 Apache Kafka 的 Spring。 此管理器 API 用于控制多个StreamsBuilderFactoryBean基于活页夹的应用程序中的每个处理器。 因此,在使用活页夹时,如果要手动控制各种StreamsBuilderFactoryBean对象,您需要使用StreamsBuilderFactoryManager. 您可以使用该属性spring.kafka.streams.auto-startup并将其设置为false以关闭处理器的自动启动。 然后,在应用程序中,您可以使用以下内容来启动处理器StreamsBuilderFactoryManager.spring-doc.cadn.net.cn

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

当您希望应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常方便。 例如,当有一个需要还原的大型状态存储时,如果处理器像默认情况一样正常启动,这可能会阻止应用程序启动。 如果您正在使用某种活跃度探测机制(例如在 Kubernetes 上),它可能会认为应用程序已关闭并尝试重新启动。 为了纠正此问题,您可以将spring.kafka.streams.auto-startupfalse并按照上述方法进行作。spring-doc.cadn.net.cn

请记住,在使用 Spring Cloud Stream 绑定器时,您不会直接处理StreamsBuilderFactoryBean来自 Apache Kafka 的 Spring,而不是StreamsBuilderFactoryManager,作为StreamsBuilderFactoryBean对象由活页夹在内部管理。spring-doc.cadn.net.cn

3.22. 有选择地手动启动 Kafka Streams 处理器

虽然上面列出的方法将无条件应用自动启动false通过StreamsBuilderFactoryManager,通常希望只有单独选择的 Kafka Streams 处理器不自动启动。 例如,假设您的应用程序中有三个不同的函数(处理器),并且对于其中一个处理器,您不希望将其作为应用程序启动的一部分来启动它。 这是这种情况的一个例子。spring-doc.cadn.net.cn

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上述方案中,如果将spring.kafka.streams.auto-startupfalse,则在应用程序启动期间,没有任何处理器将自动启动。 在这种情况下,您必须按照上述方式以编程方式启动它们,方法是调用start()在基础上StreamsBuilderFactoryManager. 但是,如果我们有一个用例有选择地仅禁用一个处理器,那么您必须将auto-startup在该处理器的单个绑定上。 让我们假设我们不想要我们的process3自动启动功能。 这是一个BiFunction具有两个输入绑定 -process3-in-0process3-in-1. 为了避免此处理器自动启动,您可以选择这些输入绑定中的任何一个,并将auto-startup在他们身上。 您选择哪种装订并不重要;如果您愿意,您可以设置auto-startupfalse在他们两个上,但一个就足够了。 因为它们共享同一个工厂 bean,所以您不必在两个绑定上将 autoStartup 设置为 false,但为了清楚起见,这样做可能是有意义的。spring-doc.cadn.net.cn

这是可用于禁用此处理器的自动启动的 Spring Cloud Stream 属性。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

然后,您可以使用 REST 端点或使用BindingsEndpointAPI 如下所示。 为此,您需要确保对类路径具有 Spring Boot 执行器依赖项。spring-doc.cadn.net.cn

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

有关此机制的更多详细信息,请参阅参考文档中的此部分spring-doc.cadn.net.cn

通过禁用来控制绑定时auto-startup如本节所述,请注意,这仅适用于消费者绑定。换句话说,如果您使用生产者绑定,process3-out-0,这在禁用处理器的自动启动方面没有任何影响,尽管此生产者绑定使用相同的StreamsBuilderFactoryBean作为消费者绑定。

3.23. 使用 Spring Cloud Sleuth 进行跟踪

当 Spring Cloud Sleuth 位于基于 Spring Cloud Stream Kafka Streams 绑定程序的应用程序的类路径上时,其使用者和生产者都会自动检测跟踪信息。 但是,为了跟踪任何特定于应用程序的作,需要由用户代码显式检测这些作。 这可以通过注入KafkaStreamsTracingbean 来自应用程序中的 Spring Cloud Sleuth,然后通过这个注入的 bean 调用各种 Kafka Streams作。 以下是一些使用它的示例。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

在上面的示例中,有两个地方添加了显式跟踪检测。 首先,我们记录传入的键/值信息KStream. 记录此信息时,也会记录关联的跨度和跟踪 ID,以便监控系统可以跟踪它们并与同一跨度 ID 相关联。 其次,当我们调用map作,而不是直接在KStream类,我们将其包装在一个transform作,然后调用mapKafkaStreamsTracing. 在这种情况下,记录的消息也将包含跨度 ID 和跟踪 ID。spring-doc.cadn.net.cn

这是另一个示例,我们使用低级转换器 API 来访问各种 Kafka Streams 标头。 当 spring-cloud-sleuth 在类路径上时,也可以像这样访问所有跟踪标头。spring-doc.cadn.net.cn

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}

3.24. 配置选项

本节包含 Kafka Streams 绑定器使用的配置选项。spring-doc.cadn.net.cn

有关与 binder 相关的常见配置选项和属性,请参阅核心文档spring-doc.cadn.net.cn

3.24.1. Kafka Streams Binder 属性

以下属性在活页夹级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.在 Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性都必须以spring.cloud.stream.kafka.streams.binder而不是spring.cloud.stream.kafka.binder. 此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。spring-doc.cadn.net.cn

配置

映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以spring.cloud.stream.kafka.streams.binder.. 以下是使用此属性的一些示例。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfigApache Kafka Streams 文档中的 JavaDocs。 您可以从StreamsConfig可以通过这个设置。 使用此属性时,它适用于整个应用程序,因为这是活页夹级别的属性。 如果应用程序中有多个处理器,则所有这些处理器都将获得这些属性。 对于像application.id,这将成为问题,因此您必须仔细检查属性如何StreamsConfig使用此活页夹级别进行映射configuration财产。spring-doc.cadn.net.cn

functions.<function-bean-name>.applicationId

仅适用于功能型处理器。 这可用于设置应用程序中每个功能的应用程序 ID。 在多个函数的情况下,这是设置应用程序 ID 的便捷方法。spring-doc.cadn.net.cn

functions.<function-bean-name>.configuration

仅适用于功能型处理器。 映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于活页夹级别configuration属性描述,但这个级别的configuration属性仅针对命名函数进行限制。 当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能需要使用它。 都StreamsConfig属性可以在此处使用。spring-doc.cadn.net.cn

经纪人

代理 URLspring-doc.cadn.net.cn

违约:localhostspring-doc.cadn.net.cn

zk节点

Zookeeper URLspring-doc.cadn.net.cn

违约:localhostspring-doc.cadn.net.cn

反序列化ExceptionHandler

反序列化错误处理程序类型。 此处理程序在绑定程序级别应用,因此应用于应用程序中的所有输入绑定。 有一种方法可以在消费者绑定级别以更细粒度的方式控制它。 可能的值是 -logAndContinue,logAndFail,skipAndContinuesendToDlqspring-doc.cadn.net.cn

违约:logAndFailspring-doc.cadn.net.cn

应用程序 Id

在绑定器级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个函数,则应用程序 ID 应以不同的方式设置。 请参阅上面详细讨论设置应用程序 ID。spring-doc.cadn.net.cn

默认:应用程序将生成静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。spring-doc.cadn.net.cn

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大尝试次数。spring-doc.cadn.net.cn

默认值:1spring-doc.cadn.net.cn

stateStoreRetry.backoffPeriod

尝试在重试时连接到状态存储时的回退期。spring-doc.cadn.net.cn

默认值:1000 毫秒spring-doc.cadn.net.cn

consumer属性

活页夹级别的任意使用者属性。spring-doc.cadn.net.cn

producer属性

绑定器级别的任意生产者属性。spring-doc.cadn.net.cn

includeStoppedProcessorsForHealthCheck

当处理器的绑定通过执行器停止时,默认情况下,该处理器将不参与健康检查。 将此属性设置为true为所有处理器启用运行状况检查,包括当前通过绑定执行器端点停止的处理器。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

3.24.2. Kafka Streams 生产者属性

以下属性适用于 Kafka Streams 生产者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.为方便起见,如果有多个输出绑定并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer..spring-doc.cadn.net.cn

keySerde

要使用的密钥 Serdespring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

值Serde

值 serde 使用spring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

使用原生编码

标志来启用/禁用本机编码spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

streamPartitionerBeanName (流分区器BeanName)

要在使用者处使用的自定义出站分区器 Bean 名称。 应用程序可以提供自定义StreamPartitioner作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者以代替默认名称。spring-doc.cadn.net.cn

默认值:请参阅上面有关出站分区支持的讨论。spring-doc.cadn.net.cn

生产作为

处理器要生成到的接收器组件的自定义名称。spring-doc.cadn.net.cn

亲爱的:none(由 Kafka Streams 生成)spring-doc.cadn.net.cn

3.24.3. Kafka Streams 消费者属性

以下属性可用于 Kafka Streams 使用者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.为方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer..spring-doc.cadn.net.cn

应用程序 Id

为每个输入绑定设置 application.id。spring-doc.cadn.net.cn

默认值:见上文。spring-doc.cadn.net.cn

keySerde

要使用的密钥 Serdespring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

值Serde

值 serde 使用spring-doc.cadn.net.cn

默认值:请参阅上面关于消息反序列化的讨论spring-doc.cadn.net.cn

具体化为

状态存储在使用传入的 KTable 类型时实现spring-doc.cadn.net.cn

违约:none.spring-doc.cadn.net.cn

使用原生解码

标志来启用/禁用本机解码spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

dlq名称

DLQ 主题名称。spring-doc.cadn.net.cn

默认值:请参阅上文有关错误处理和 DLQ 的讨论。spring-doc.cadn.net.cn

开始偏移量

如果没有要使用的已提交偏移量,则从偏移量开始。 这主要用于消费者第一次使用某个主题时。 Kafka Streams 使用earliest作为默认策略,活页夹使用相同的默认值。 这可以重写为latest使用此属性。spring-doc.cadn.net.cn

违约:earliest.spring-doc.cadn.net.cn

注意:使用resetOffsets对 Kafka Streams 绑定器没有任何影响。 与基于消息通道的绑定器不同,Kafka Streams 绑定器不会寻求按需开始或结束。spring-doc.cadn.net.cn

反序列化ExceptionHandler

反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面所述的绑定器级别属性。 可能的值是 -logAndContinue,logAndFail,skipAndContinuesendToDlqspring-doc.cadn.net.cn

违约:logAndFailspring-doc.cadn.net.cn

timestampExtractorBeanName

要在使用者处使用的特定时间戳提取器 Bean 名称。 应用程序可以提供TimestampExtractor作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认名称。spring-doc.cadn.net.cn

默认值:请参阅上面关于时间戳提取器的讨论。spring-doc.cadn.net.cn

事件类型

此绑定支持的事件类型的逗号分隔列表。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

eventTypeHeaderKey

事件类型标头键。spring-doc.cadn.net.cn

违约:event_typespring-doc.cadn.net.cn

consumedAs

处理器从中消费的源组件的自定义名称。spring-doc.cadn.net.cn

亲爱的:none(由 Kafka Streams 生成)spring-doc.cadn.net.cn

3.24.4. 关于并发的特别说明

在 Kafka Streams 中,您可以使用num.stream.threads财产。 这,您可以使用各种configuration上述 binder、functions、producer 或 consumer 级别下的选项。 您还可以使用concurrency核心 Spring Cloud Stream 为此目的提供的属性。 使用此功能时,您需要在消费者上使用它。 当有多个输入绑定时,请在第一个输入绑定上设置此值。 例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency,它将被翻译为num.stream.threads通过活页夹。 如果您有多个处理器,并且一个处理器定义绑定级别并发,但不定义其他处理器,则那些没有绑定级别并发的处理器将默认返回通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads. 如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认设置。spring-doc.cadn.net.cn

4. 提示、技巧和Recipes

4.1. 使用 Kafka 的简单 DLQ

4.1.1. 问题陈述

作为开发人员,我想编写一个消费者应用程序来处理来自 Kafka 主题的记录。但是,如果在处理过程中出现一些错误,我不希望应用程序完全停止。相反,我想将错误的记录发送到 DLT(死信主题),然后继续处理新记录。spring-doc.cadn.net.cn

4.1.2. 解决方案

这个问题的解决方案是使用 Spring Cloud Stream 中的 DLQ 功能。出于本次讨论的目的,让我们假设以下是我们的处理器函数。spring-doc.cadn.net.cn

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

这是一个非常微不足道的函数,它会为它处理的所有记录抛出异常,但您可以采用此函数并将其扩展到任何其他类似情况。spring-doc.cadn.net.cn

为了将错误的记录发送到 DLT,我们需要提供以下配置。spring-doc.cadn.net.cn

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

为了激活 DLQ,应用程序必须提供组名称。 匿名使用者无法使用 DLQ 设施。 我们还需要通过设置enableDLQKafka 消费者绑定到true. 最后,我们可以选择通过提供dlqName在 Kafka 消费者绑定上,否则默认为error.input-topic.my-group在这种情况下。spring-doc.cadn.net.cn

请注意,在上面提供的示例消费者中,有效负载的类型是byte[]. 默认情况下,Kafka binder 中的 DLQ 生产者需要byte[]. 如果不是这种情况,那么我们需要提供正确序列化程序的配置。 例如,让我们重写消费者函数,如下所示:spring-doc.cadn.net.cn

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

现在,我们需要告诉 Spring Cloud Stream,在写入 DLT 时我们希望如何序列化数据。 下面是此方案的修改配置:spring-doc.cadn.net.cn

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

4.2. 具有高级重试选项的 DLQ

4.2.1. 问题陈述

这与上面的配方类似,但作为开发人员,我想配置重试的处理方式。spring-doc.cadn.net.cn

4.2.2. 解决方案

如果您遵循了上述配方,那么当处理遇到错误时,您将获得 Kafka 绑定器中内置的默认重试选项。spring-doc.cadn.net.cn

默认情况下,活页夹最多停用 3 次尝试,初始延迟为 1 秒,每次回退为 2.0 乘数,最大延迟为 10 秒。 您可以按如下方式更改所有这些配置:spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果需要,还可以通过提供布尔值映射来提供可重试异常的列表。 例如spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

默认情况下,将重试上面映射中未列出的任何异常。如果不需要,那么您可以通过提供spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您也可以提供自己的RetryTemplate并将其标记为@StreamRetryTemplate这将被活页夹扫描和使用。当您需要更复杂的重试策略和策略时,这很有用。spring-doc.cadn.net.cn

如果您有多个@StreamRetryTemplatebean,那么你可以使用属性spring-doc.cadn.net.cn

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

4.3. 使用 DLQ 处理反序列化错误

4.3.1. 问题陈述

我有一个处理器在 Kafka 消费者中遇到反序列化异常。我希望 Spring Cloud Stream DLQ 机制会捕获这种情况,但它没有。我该如何处理这个问题?spring-doc.cadn.net.cn

4.3.2. 解决方案

当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的正常 DLQ 机制将无济于事。 这是因为,此异常甚至发生在消费者的poll()方法返回。 Spring for Apache Kafka 项目提供了一些很好的方法来帮助 Binder 解决这种情况。 让我们来探讨一下。spring-doc.cadn.net.cn

假设这是我们的函数:spring-doc.cadn.net.cn

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

这是一个简单的函数,需要一个String参数。spring-doc.cadn.net.cn

我们想绕过 Spring Cloud Stream 提供的消息转换器,而是想使用本机反序列化器。 在以下情况下String类型,这没有多大意义,但对于更复杂的类型,如 AVRO 等,您必须依赖外部解序列化器,因此希望将转换委托给 Kafka。spring-doc.cadn.net.cn

现在,当消费者收到数据时,让我们假设有一个导致反序列化错误的错误记录,也许有人传递了Integer而不是String例如。 在这种情况下,如果您不在应用程序中执行某些作,则异常将通过链传播,并且您的应用程序最终将退出。spring-doc.cadn.net.cn

为了处理这个问题,您可以添加一个ListenerContainerCustomizer @Bean配置DefaultErrorHandler. 这DefaultErrorHandler配置了DeadLetterPublishingRecoverer. 我们还需要配置一个ErrorHandlingDeserializer对于消费者。 这听起来像是很多复杂的事情,但实际上,在这种情况下,它归结为这 3 个豆子。spring-doc.cadn.net.cn

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

让我们逐一分析一下。 第一个是ListenerContainerCustomizer需要DefaultErrorHandler. 容器现在已使用该特定错误处理程序进行自定义。 您可以在此处了解有关容器定制的更多信息。spring-doc.cadn.net.cn

第二个 bean 是DefaultErrorHandler配置了发布到DLT. 有关更多详细信息,请参阅此处DefaultErrorHandler.spring-doc.cadn.net.cn

第三个 bean 是DeadLetterPublishingRecoverer最终负责发送到DLT. 默认情况下,DLTtopic 被命名为ORIGINAL_TOPIC_NAME。分布式账本技术。 不过你可以改变这一点。 有关更多详细信息,请参阅文档spring-doc.cadn.net.cn

我们还需要通过应用程序配置配置 ErrorHandlingDeserializerspring-doc.cadn.net.cn

ErrorHandlingDeserializer委托给实际的解序列化程序。 如果出现错误,它会将记录的键/值设置为空,并包含消息的原始字节。 然后,它在标头中设置异常,并将此记录传递给侦听器,然后侦听器调用已注册的错误处理程序。spring-doc.cadn.net.cn

以下是所需的配置:spring-doc.cadn.net.cn

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我们正在提供ErrorHandlingDeserializer通过configuration属性。 我们还指出,要委托的实际反序列化程序是StringDeserializer.spring-doc.cadn.net.cn

请记住,上述 dlq 属性都与此配方中的讨论无关。 它们纯粹用于解决任何应用程序级错误。spring-doc.cadn.net.cn

4.4. Kafka Binder 中的基本偏移量管理

4.4.1. 问题陈述

我想编写一个 Spring Cloud Stream Kafka 消费者应用程序,但不确定它如何管理 Kafka 消费者偏移量。 你能解释一下吗?spring-doc.cadn.net.cn

4.4.2. 解决方案

我们鼓励您阅读有关此内容的文档部分以全面了解它。spring-doc.cadn.net.cn

这是它的要点:spring-doc.cadn.net.cn

Kafka 默认支持两种类型的偏移量开始 -earliestlatest. 从他们的名字来看,它们的语义是不言自明的。spring-doc.cadn.net.cn

假设您是第一次运行消费者。 如果您错过了 Spring Cloud Stream 应用程序中的 group.id,那么它就会成为匿名消费者。 无论何时,您有一个匿名消费者,在这种情况下,Spring Cloud Stream 应用程序默认将从latest主题分区中的可用偏移量。 另一方面,如果显式指定 group.id,则默认情况下,Spring Cloud Stream 应用程序将从earliest主题分区中的可用偏移量。spring-doc.cadn.net.cn

在上述两种情况下(具有显式组和匿名组的使用者),可以使用属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset并将其设置为earliestlatest.spring-doc.cadn.net.cn

现在,假设您之前已经运行了消费者,现在又开始了它。 在这种情况下,上述情况下的起始偏移量语义不适用,因为使用者会为使用者组找到已提交的偏移量(对于匿名使用者,尽管应用程序不提供 group.id,但绑定器会自动为您生成一个)。 它只是从最后一个承诺的偏移量开始。 这是真的,即使你有一个startOffset提供的值。spring-doc.cadn.net.cn

但是,您可以使用resetOffsets财产。 为此,请设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsetstrue(即false默认情况下)。 然后确保提供startOffset值(earliestlatest). 当您执行此作,然后启动使用者应用程序时,每次启动时,它都会像第一次启动一样启动,并忽略分区的任何已提交偏移量。spring-doc.cadn.net.cn

4.5. 在 Kafka 中寻求任意偏移

4.5.1. 问题陈述

使用 Kafka 绑定器,我知道它可以将偏移量设置为earliestlatest,但我有一个要求,即寻求中间某物的偏移量,一个任意偏移量。 有没有办法使用 Spring Cloud Stream Kafka 绑定器来实现这一点?spring-doc.cadn.net.cn

4.5.2. 解决方案

之前我们了解了 Kafka binder 如何允许您处理基本的偏移量管理。 默认情况下,活页夹不允许您倒带到任意偏移量,至少通过我们在该配方中看到的机制是这样。 但是,活页夹提供了一些低级策略来实现此用例。 让我们来探讨一下它们。spring-doc.cadn.net.cn

首先,当您想要重置为任意偏移量时,除了earliestlatest,请务必将resetOffsets配置设置为其默认值,即false. 然后,您必须提供类型为KafkaBindingRebalanceListener,这将被注入到所有消费者绑定中。 这是一个带有一些默认方法的界面,但这是我们感兴趣的方法:spring-doc.cadn.net.cn

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

让我们看看细节。spring-doc.cadn.net.cn

从本质上讲,每次在主题分区的初始分配期间或重新平衡之后都会调用此方法。 为了更好地说明,让我们假设我们的主题是foo它有 4 个分区。 最初,我们只启动组中的单个使用者,并且该使用者将从所有分区中使用。 当使用者首次启动时,所有 4 个分区都会被初始分配。 但是,我们不想启动要以默认值 (earliest由于我们定义了一个组),而不是对于每个分区,我们希望它们在寻求任意偏移量后使用。 假设您有一个业务案例要从某些偏移中使用,如下所示。spring-doc.cadn.net.cn

Partition   start offset

0           1000
1           2000
2           2000
3           1000

这可以通过实现上述方法来实现,如下所示。spring-doc.cadn.net.cn

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

这只是一个基本的实现。 现实世界的用例比这复杂得多,您需要进行相应的调整,但这肯定会为您提供基本的草图。 当消费者seek失败,它可能会抛出一些运行时异常,您需要决定在这些情况下该怎么做。spring-doc.cadn.net.cn

4.5.3. 如果我们启动具有相同组 ID 的第二个消费者会怎样?

当我们添加第二个消费者时,将发生重新平衡,并且一些分区将被移动。 假设新的使用者获得分区23. 当这个新的 Spring Cloud Stream 消费者调用onPartitionsAssigned方法,它将看到这是分区的初始赋值23在这个消费者身上。 因此,它将执行寻道作,因为对initial论点。 对于第一个消费者,它现在只有分区01但是,对于该消费者来说,这只是一个重新平衡事件,不被视为初始分配。 因此,它不会重新搜索给定的偏移量,因为对initial论点。spring-doc.cadn.net.cn

4.6. 如何使用 Kafka binder 手动确认?

4.6.1. 问题陈述

使用 Kafka 绑定器,我想手动确认使用者中的消息。 我该怎么做?spring-doc.cadn.net.cn

4.6.2. 解决方案

默认情况下,Kafka 绑定器委托给 Spring for Apache Kafka 项目中的默认提交设置。 默认值ackMode在Spring,卡夫卡是batch. 有关这方面的更多详细信息,请参阅此处spring-doc.cadn.net.cn

在某些情况下,您希望禁用此默认提交行为并依赖手动提交。 以下步骤允许您做到这一点。spring-doc.cadn.net.cn

设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode设置为MANUALMANUAL_IMMEDIATE. 当它设置为这样时,就会有一个名为kafka_acknowledgment(从KafkaHeaders.ACKNOWLEDGMENT) 存在于消费者方法接收的消息中。spring-doc.cadn.net.cn

例如,将其想象为您的消费者方法。spring-doc.cadn.net.cn

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然后,您将属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackModeMANUALMANUAL_IMMEDIATE.spring-doc.cadn.net.cn

4.7. 如何覆盖 Spring Cloud Stream 中的默认绑定名称?

4.7.1. 问题陈述

Spring Cloud Stream 根据函数定义和签名创建默认绑定,但是如何将它们覆盖为更友好的域名呢?spring-doc.cadn.net.cn

4.7.2. 解决方案

假设下面是函数签名。spring-doc.cadn.net.cn

@Bean
public Function<String, String> uppercase(){
...
}

默认情况下,Spring Cloud Stream 将创建如下绑定。spring-doc.cadn.net.cn

可以使用以下属性将这些绑定重写到某些内容。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

在此之后,必须对新名称创建所有绑定属性,my-transformer-inmy-transformer-out.spring-doc.cadn.net.cn

这是另一个使用 Kafka Streams 和多个输入的示例。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称。spring-doc.cadn.net.cn

  1. 流程订单 in 0spring-doc.cadn.net.cn

  2. 流程订购合1spring-doc.cadn.net.cn

  3. 流程订单出 0spring-doc.cadn.net.cn

每次要在这些绑定上设置一些配置时,都必须使用这些绑定名称。 您不喜欢这样,并且希望使用更对域友好且可读的绑定名称,例如,类似的东西。spring-doc.cadn.net.cn

只需设置这三个属性即可轻松做到这一点spring-doc.cadn.net.cn

  1. spring.cloud.stream.function.bindings.processOrder-in-0=订单spring-doc.cadn.net.cn

  2. spring.cloud.stream.function.bindings.processOrder-in-1=账户spring-doc.cadn.net.cn

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrdersspring-doc.cadn.net.cn

执行此作后,它将覆盖默认绑定名称,并且要在其上设置的任何属性都必须位于这些新绑定名称上。spring-doc.cadn.net.cn

4.8. 如何发送消息密钥作为记录的一部分?

4.8.1. 问题陈述

我需要将密钥与记录的有效负载一起发送,有没有办法在 Spring Cloud Stream 中做到这一点?spring-doc.cadn.net.cn

4.8.2. 解决方案

通常需要将关联数据结构(如地图)作为具有键和值的记录发送。 Spring Cloud Stream 允许您以简单的方式做到这一点。 以下是执行此作的基本蓝图,但您可能希望将其调整为特定用例。spring-doc.cadn.net.cn

这是示例生产者方法(又名Supplier).spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

这是一个微不足道的函数,它发送一条带有String有效载荷,但也带有密钥。 请注意,我们将键设置为消息头KafkaHeaders.MESSAGE_KEY.spring-doc.cadn.net.cn

如果要更改默认键kafka_messageKey,那么在配置中,我们需要指定这个属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

请注意,我们使用绑定名称supplier-out-0由于这是我们的函数名称,请相应更新。spring-doc.cadn.net.cn

然后,我们在生成消息时使用这个新键。spring-doc.cadn.net.cn

4.9. 如何使用本机序列化器和反序列化器而不是 Spring Cloud Stream 完成的消息转换?

4.9.1. 问题陈述

我想在 Kafka 中使用本机序列化器和反序列化器,而不是使用 Spring Cloud Stream 中的消息转换器。 默认情况下,Spring Cloud Stream 使用其内部内置消息转换器来处理此转换。 如何绕过这一点并将责任委托给 Kafka?spring-doc.cadn.net.cn

4.9.2. 解决方案

这真的很容易做到。spring-doc.cadn.net.cn

您所要做的就是提供以下属性以启用本机序列化。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然后,您还需要设置序列化程序。 有几种方法可以做到这一点。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或使用活页夹配置。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

使用绑定方式时,它适用于所有绑定,而在绑定处设置它们是每个绑定。spring-doc.cadn.net.cn

在反序列化方面,您只需提供反序列化器作为配置。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您还可以在活页夹级别设置它们。spring-doc.cadn.net.cn

可以设置一个可选属性来强制本机解码。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,对于 Kafka 绑定器,这是不必要的,因为当它到达绑定器时,Kafka 已经使用配置的反序列化器对它们进行了反序列化。spring-doc.cadn.net.cn

4.10. 解释偏移重置在 Kafka Streams 绑定器中的工作原理

4.10.1. 问题陈述

默认情况下,Kafka Streams 绑定器始终从新使用者的最早偏移量开始。 有时,应用程序从最新的偏移量开始是有益的或要求的。 Kafka Streams 绑定器允许您做到这一点。spring-doc.cadn.net.cn

4.10.2. 解决方案

在我们查看解决方案之前,让我们先看看以下场景。spring-doc.cadn.net.cn

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我们有一个BiConsumer需要两个输入绑定的 bean。 在这种情况下,第一个绑定是针对KStream第二个是针对KTable. 首次运行此应用程序时,默认情况下,两个绑定都从earliest抵消。 我想从latest由于某些要求而抵消? 您可以通过启用以下属性来执行此作。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想从一个绑定开始latestoffset 和另一个与默认值的消费者earliest,则将后者从配置中保留绑定。spring-doc.cadn.net.cn

请记住,一旦存在已提交偏移量,这些设置将不会被接受,并且已提交偏移量优先。spring-doc.cadn.net.cn

4.11. 跟踪 Kafka 中记录的成功发送(生成)

4.11.1. 问题陈述

我有一个 Kafka 生产者应用程序,我想跟踪我所有成功的发送。spring-doc.cadn.net.cn

4.11.2. 解决方案

让我们假设我们在应用程序中有以下提供商。spring-doc.cadn.net.cn

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然后,我们需要定义一个新的MessageChannelbean 来捕获所有成功的发送信息。spring-doc.cadn.net.cn

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下来,在应用程序配置中定义此属性,以提供recordMetadataChannel.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此时,成功发送的信息将发送到fooRecordChannel.spring-doc.cadn.net.cn

您可以编写一个IntegrationFlow如下所示,以查看信息。spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle方法,有效负载是发送到 Kafka 的内容,消息头包含一个名为kafka_recordMetadata. 它的值是RecordMetadata包含有关主题分区、当前偏移量等的信息。spring-doc.cadn.net.cn

4.12. 在 Kafka 中添加自定义标头映射器

4.12.1. 问题陈述

我有一个 Kafka 生产者应用程序,它设置了一些标头,但消费者应用程序中缺少它们。为什么?spring-doc.cadn.net.cn

4.12.2. 解决方案

一般情况下,这应该没问题。spring-doc.cadn.net.cn

想象一下,你有以下生产商。spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消费者方面,您仍然应该看到标头“foo”,并且以下内容应该不会给您带来任何问题。spring-doc.cadn.net.cn

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在应用程序中提供自定义标头映射器,则这将不起作用。 假设您有一个空的KafkaHeaderMapper在应用程序中。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果这是你的实现,那么你会错过foo标头。 很有可能,你可能在其中有一些逻辑KafkaHeaderMapper方法。 您需要以下内容来填充foo页眉。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

这将正确填充foo标头。spring-doc.cadn.net.cn

4.12.3. 关于 id 标头的特别说明

在 Spring Cloud Stream 中,idheader 是一个特殊的标头,但某些应用程序可能希望具有特殊的自定义 ID 标头 - 类似于custom-idIDId. 第一个 (custom-id) 将在没有任何自定义标头映射器的情况下从生产者传播到消费者。但是,如果您使用保留的框架变体进行生产idheader - 例如ID,Id,iD等等。那么你会遇到框架内部的问题。请参阅此 StackOverflow 线程,了解有关此用例的更多上下文。在这种情况下,您必须使用自定义KafkaHeaderMapper以映射区分大小写的 id 标头。例如,假设您有以下生产者。spring-doc.cadn.net.cn

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

标题Id以上将从消费方消失,因为它与框架发生冲突id页眉。 可以提供自定义KafkaHeaderMapper来解决这个问题。spring-doc.cadn.net.cn

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

通过这样做,两者idId标头将从生产者到消费者端可用。spring-doc.cadn.net.cn

4.13. 在事务中生成多个主题

4.13.1. 问题陈述

如何生成到多个 Kafka 主题的事务性消息?spring-doc.cadn.net.cn

有关更多上下文,请参阅此 StackOverflow 问题spring-doc.cadn.net.cn

4.13.2. 解决方案

在 Kafka 绑定器中使用事务支持,然后提供AfterRollbackProcessor. 为了生成多个主题,请使用StreamBridge应用程序接口。spring-doc.cadn.net.cn

以下是为此的代码片段:spring-doc.cadn.net.cn

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

4.13.3. 所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

为了进行测试,您可以使用以下内容:spring-doc.cadn.net.cn

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要注意事项:spring-doc.cadn.net.cn

请确保您在应用程序配置上没有任何 DLQ 设置,因为我们手动配置 DLT(默认情况下,它将发布到名为input.DLT基于初始消费者函数)。 此外,重置maxAttempts在消费者绑定到1以避免绑定器重试。 在上面的示例中,它将最多尝试 3 次(初始尝试 + 在FixedBackoff).spring-doc.cadn.net.cn

有关如何测试此代码的更多详细信息,请参阅 StackOverflow 线程。 如果您使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将isolation-level在消费者绑定到read-committed.spring-doc.cadn.net.cn

这个 StackOverflow 线程也与此讨论有关。spring-doc.cadn.net.cn

4.14. 运行多个可轮询消费者时要避免的陷阱

4.14.1. 问题陈述

如何运行可轮询消费者的多个实例并生成唯一的client.id对于每个实例?spring-doc.cadn.net.cn

4.14.2. 解决方案

假设我有以下定义:spring-doc.cadn.net.cn

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

运行应用程序时,Kafka 消费者会生成一个 client.id(类似于consumer-my-group-1). 对于正在运行的应用程序的每个实例,此client.id将是相同的,从而导致意外问题。spring-doc.cadn.net.cn

为了解决此问题,可以在应用程序的每个实例上添加以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有关更多详细信息,请参阅此 GitHub 问题spring-doc.cadn.net.cn