Apache Kafka Binder
参考指南
本指南介绍了 Spring Cloud Stream Binder 的 Apache Kafka 实现。 它包含有关其设计、用法和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定构造的信息。 此外,本指南还介绍了 Spring Cloud Stream 的 Kafka Streams 绑定能力。
1. Apache Kafka 绑定器
1.1. 用法
要使用 Apache Kafka 绑定器,您需要将spring-cloud-stream-binder-kafka
作为 Spring Cloud Stream 应用程序的依赖项,如以下 Maven 示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka Starter,如以下 Maven 示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图显示了 Apache Kafka 绑定程序如何运行的简化图:

Apache Kafka Binder 实现将每个目标映射到 Apache Kafka 主题。 消费者组直接映射到相同的 Apache Kafka 概念。 分区也直接映射到 Apache Kafka 分区。
该绑定器当前使用 Apache Kafkakafka-clients
版本3.1.0
.
此客户端可以与较旧的代理通信(请参阅 Kafka 文档),但某些功能可能不可用。
例如,对于低于 0.11.x.x 的版本,不支持本机标头。
此外,0.11.x.x 不支持autoAddPartitions
财产。
1.3. 配置选项
本节包含 Apache Kafka 绑定器使用的配置选项。
有关与活页夹相关的常见配置选项和属性,请参阅核心文档中的绑定属性。
1.3.1. Kafka Binder 属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 绑定器连接到的代理列表。
违约:
localhost
. - spring.cloud.stream.kafka.binder.defaultBrokerPort 的
-
brokers
允许指定带或不带端口信息的主机(例如,host1,host2:port2
). 当代理列表中未配置端口时,这将设置默认端口。违约:
9092
. - spring.cloud.stream.kafka.binder.configuration
-
客户端属性(生产者和使用者)的键/值映射传递给绑定器创建的所有客户端。 由于生产者和使用者都使用这些属性,因此使用应限制为公共属性,例如安全设置。 通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。 此处的属性将取代启动中设置的任何属性。
默认值:空地图。
- spring.cloud.stream.kafka.binder.consumer属性
-
任意 Kafka 客户端使用者属性的键/值映射。 除了支持已知的 Kafka 使用者属性外,这里还允许未知的使用者属性。 此处的属性将取代在启动和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.headers
-
由活页夹传输的自定义标头列表。 仅当与旧应用程序(⇐ 1.3.x)通信时才需要
kafka-clients
版本 < 0.11.0.0。较新的版本本机支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.health超时
-
等待获取分区信息的时间,以秒为单位。 如果此计时器过期,运行状况将报告为关闭。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的确认数。 请参阅生产者的 Kafka 文档
acks
财产。违约:
1
. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在以下情况下有效
autoCreateTopics
或autoAddPartitions
已设置。 Binder 在生成或使用数据的主题上配置的全局最小分区数。 它可以被partitionCount
的设置或通过instanceCount * concurrency
生产者的设置(如果其中任何一个较大)。违约:
1
. - spring.cloud.stream.kafka.binder.producer属性
-
任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里还允许使用未知的生产者属性。 此处的属性将取代在启动和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
自动创建主题的复制因子
autoCreateTopics
处于活动状态。 可以在每个绑定上覆盖。如果您使用的是 2.4 之前的 Kafka 代理版本,则此值应至少设置为 1
. 从 3.0.8 版开始,绑定器使用-1
作为默认值,这表示代理“default.replication.factor”属性将用于确定副本数。 请咨询您的 Kafka 代理管理员,看看是否有需要最小复制因子的策略,如果是这种情况,则通常会default.replication.factor
将匹配该值,并且-1
应使用,除非您需要大于最小值的复制系数。违约:
-1
. - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true
,活页夹会自动创建新主题。 如果设置为false
,则活页夹依赖于已配置的主题。 在后一种情况下,如果主题不存在,则活页夹无法启动。此设置独立于 auto.create.topics.enable
经纪人的设置,不会影响它。 如果服务器设置为自动创建主题,则可以使用默认代理设置将其作为元数据检索请求的一部分创建。违约:
true
. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,如果需要,活页夹会创建新分区。 如果设置为false
,则 binder 依赖于已配置的主题的分区大小。 如果目标主题的分区计数小于预期值,则绑定程序无法启动。违约:
false
. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在活页夹中启用事务。看
transaction.id
在 Kafka 文档中,在spring-kafka
文档。 启用交易后,单个producer
属性被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能。默认值
null
(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务性活页夹中生产者的全局生产者属性。 看
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和 Kafka 生产者属性以及所有绑定器支持的通用生产者属性。默认值:查看各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
的 bean 名称
KafkaHeaderMapper
用于映射spring-messaging
headers 与 Kafka 标头之间的标头。例如,如果您希望自定义BinderHeaderMapper
对标头使用 JSON 反序列化的 bean。如果这个自定义BinderHeaderMapper
使用此属性的 Bean 不可供 Binder 使用,则 Binder 将查找名称为kafkaBinderHeaderMapper
即BinderHeaderMapper
在回退到默认值之前BinderHeaderMapper
由活页夹创建。默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
将绑定器运行状况设置为
down
,当找到主题上的任何分区时,无论从该分区接收数据的使用者如何,都找不到没有领导者。违约:
false
. - spring.cloud.stream.kafka.binder.certificateStore目录
-
当信任库或密钥库证书位置作为非本地文件系统资源(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)时,Binder 将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。对于两个代理级证书(
ssl.truststore.location
和ssl.keystore.location
) 和用于架构注册表的证书 (schema.registry.ssl.truststore.location
和schema.registry.ssl.keystore.location
). 请记住,信任库和密钥库位置路径必须在spring.cloud.stream.kafka.binder.configuration…
. 例如spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
,spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
等。 该文件将被复制到指定为此属性值的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值,并且证书文件是非本地文件系统资源,则它将复制到系统的临时目录,如System.getProperty("java.io.tmpdir")
. 如果存在此值,但在文件系统上找不到该目录或不可写,则也是如此。默认值:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetrics启用
-
当设置为 true 时,每当访问指标时,都会计算每个使用者主题的偏移滞后指标。 当设置为 false 时,仅使用定期计算的偏移滞后。
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个使用者主题的偏移滞后的时间间隔。 每当
metrics.defaultOffsetLagMetricsEnabled
被禁用或其 计算时间太长。默认值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此活页夹中的所有绑定上启用千分尺观察注册表。
默认值:false
1.3.2. Kafka 消费者属性
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.consumer.<property>=<value> . |
以下属性仅适用于 Kafka 使用者,并且
必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.
.
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且将在将来的版本中删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且将在将来的版本中删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且将在将来的版本中删除对它的支持。 - autoRebalance启用
-
什么时候
true
,主题分区将在消费者组的成员之间自动重新平衡。 什么时候false
,每个消费者都会根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
. 这需要同时spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
在每个启动的实例上适当设置的属性。 的值spring.cloud.stream.instanceCount
在这种情况下,属性通常必须大于 1。违约:
true
. - ackEach记录
-
什么时候
autoCommitOffset
是true
,则此设置规定是否在处理每条记录后提交偏移量。 默认情况下,偏移量将在consumer.poll()
已处理。 轮询返回的记录数可以通过max.poll.records
Kafka 属性,通过使用者设置configuration
财产。 将此设置为true
可能会导致性能下降,但这样做会降低发生故障时重新传递记录的可能性。 另请参阅活页夹requiredAcks
属性,这也会影响提交偏移量的性能。 此属性从 3.1 开始被弃用,转而使用ackMode
. 如果ackMode
未设置且未启用批处理模式,RECORD
ackMode 将被使用。违约:
false
. - 自动提交偏移
-
从版本 3.1 开始,此属性已弃用。 看
ackMode
有关替代方案的更多详细信息。 是否在处理消息时自动提交偏移量。 如果设置为false
,带有键kafka_acknowledgment
的类型org.springframework.kafka.support.Acknowledgment
标头存在于入站消息中。 应用程序可以使用此标头来确认消息。 有关详细信息,请参阅示例部分。 当此属性设置为false
,Kafka 绑定器将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
应用程序负责确认记录。 另请参阅ackEachRecord
.违约:
true
. - ack模式
-
指定容器确认模式。 这是基于 Spring Kafka 中定义的 AckMode 枚举。 如果
ackEachRecord
属性设置为true
并且消费者不处于批处理模式,则这将使用RECORD
,否则,请使用此属性使用提供的 ack 模式。 - 自动提交错误
-
在可轮询的消费者中,如果设置为
true
,它总是在错误时自动提交。 如果未设置(默认值)或 false,则不会在可轮询的消费者中自动提交。 请注意,此属性仅适用于可轮询的使用者。默认值:未设置。
- 重置偏移量
-
是否将使用者上的偏移量重置为 startOffset 提供的值。 如果
KafkaBindingRebalanceListener
提供;请参阅使用 KafkaBindingRebalanceListener。 有关此属性的更多信息,请参阅重置偏移。违约:
false
. - 开始偏移量
-
新组的起始偏移量。 允许的值:
earliest
和latest
. 如果为消费者“绑定”显式设置了使用者组(通过spring.cloud.stream.bindings.<channelName>.group
),'startOffset' 设置为earliest
.否则,它设置为latest
对于anonymous
消费者群体。 有关此属性的更多信息,请参阅重置偏移。默认值:null(相当于
earliest
). - 启用Dlq
-
当设置为 true 时,它会为使用者启用 DLQ 行为。 默认情况下,导致错误的邮件将转发到名为
error.<destination>.<group>
. 可以通过设置dlqName
属性或通过定义@Bean
类型DlqDestinationResolver
. 这为更常见的 Kafka 重放场景提供了另一种选择,适用于错误数量相对较少且重放整个原始主题可能过于繁琐的情况。 请参阅死信主题处理处理以获取更多信息。 从 V2.0 开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topic
,x-exception-message
和x-exception-stacktrace
如byte[]
. 缺省情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。 有关如何更改该行为,请参阅死信主题分区选择。不允许在以下情况下使用destinationIsPattern
是true
.违约:
false
. - dlq分区
-
什么时候
enableDlq
为 true,并且未设置此属性,则创建与主主题具有相同分区数的死信主题。 通常,死信记录将发送到死信主题中与原始记录相同的分区。 此行为可以更改;请参阅死信主题分区选择。 如果此属性设置为1
并且没有DqlPartitionFunction
bean,所有死信记录都将写入分区0
. 如果此属性大于1
,您必须提供一个DlqPartitionFunction
豆。 请注意,实际分区计数受活页夹的minPartitionCount
财产。违约:
none
- 配置
-
映射包含通用 Kafka 使用者属性的键/值对。 除了具有 Kafka 消费者属性外,还可以在此处传递其他配置属性。 例如,应用程序所需的某些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
. 这bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空地图。
- dlq名称
-
要接收错误消息的 DLQ 主题的名称。
默认值:null(如果未指定,则导致错误的消息将转发到名为
error.<destination>.<group>
). - dlqProducer属性
-
使用此功能,可以设置特定于 DLQ 的生产者属性。 通过 kafka 生产者属性提供的所有属性都可以通过此属性进行设置。 当在消费者上启用本机解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以以下形式提供
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
.默认值:默认 Kafka 生产者属性。
- 标准标头
-
指示入站通道适配器填充的标准标头。 允许的值:
none
,id
,timestamp
或both
. 如果使用本机反序列化并且接收消息的第一个组件需要id
(例如配置为使用 JDBC 消息存储的聚合器)。违约:
none
- 转换器BeanName
-
实现
RecordMessageConverter
.用于入站通道适配器以替换默认的MessagingMessageConverter
.违约:
null
- 空闲事件间隔
-
指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。 使用
ApplicationListener<ListenerContainerIdleEvent>
以接收这些事件。 有关使用示例,请参阅示例:暂停和恢复消费者。违约:
30000
- 目的地是模式
-
当 true 时,目标被视为正则表达式
Pattern
用于匹配代理的主题名称。 如果为 true,则不会预配主题,并且enableDlq
不允许,因为绑定程序在配置阶段不知道主题名称。 请注意,检测与模式匹配的新主题所花费的时间由消费者属性控制metadata.max.age.ms
,(在撰写本文时)默认为 300,000 毫秒(5 分钟)。 这可以使用configuration
属性。违约:
false
- topic.properties
-
一个
Map
配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:无。
- topic.replication-factor
-
预配主题时要使用的复制因子。覆盖活页夹范围的设置。如果出现以下情况,则忽略
replicas-assignments
存在。默认值:无(使用活页夹范围的默认值 -1)。
- pollTimeout
-
超时,用于在可轮询消费者中轮询。
默认值:5 秒。
- 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的绑定器的事务管理器。如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。默认值:无。
- txCommit已恢复
-
使用事务性绑定器时,默认情况下,恢复记录的偏移量(例如,当重试用尽并将记录发送到死信主题时)将通过新事务提交。 将此属性设置为
false
禁止提交已恢复记录的偏移量。默认值:true。
- commonErrorHandlerBeanName (常见错误处理程序)BeanName
-
CommonErrorHandler
每个消费者绑定使用的 bean 名称。 如果存在,此用户提供了CommonErrorHandler
优先于绑定程序定义的任何其他错误处理程序。 这是表达错误处理程序的便捷方法,如果应用程序不想使用ListenerContainerCustomizer
然后检查目标/组组合以设置错误处理程序。默认值:无。
1.3.3. 重置偏移量
当应用程序启动时,每个分配的分区中的初始位置取决于两个属性startOffset
和resetOffsets
.
如果resetOffsets
是false
,普通 Kafka 消费者auto.offset.reset
语义适用。
即,如果绑定的消费者组的分区没有提交的偏移量,则位置为earliest
或latest
.
默认情况下,具有显式group
用earliest
和匿名绑定(没有group
)使用latest
.
可以通过将startOffset
binding 属性。
第一次使用特定group
.
不存在已提交偏移量的另一种情况是偏移量已过期。
使用现代代理(自 2.1 起)和默认代理属性,偏移量将在最后一个成员离开组后 7 天过期。
请参阅offsets.retention.minutes
经纪人属性了解更多信息。
什么时候resetOffsets
是true
,则绑定器将与代理上没有提交偏移量时适用的语义类似的语义,就好像此绑定从未从主题中消耗过一样;即,任何当前提交的偏移量都将被忽略。
以下是可以使用此功能的两个用例。
-
从包含键/值对的压缩主题使用。 设置
resetOffsets
自true
和startOffset
自earliest
;绑定将执行seekToBeginning
在所有新分配的分区上。 -
从包含事件的主题使用,其中您只对此绑定运行时发生的事件感兴趣。 设置
resetOffsets
自true
和startOffset
自latest
;绑定将执行seekToEnd
在所有新分配的分区上。
如果在初始分配之后发生重新平衡,则仅对在初始分配期间未分配的任何新分配的分区执行搜索。 |
有关对主题偏移量的更多控制,请参阅使用 KafkaBindingRebalanceListener;当提供侦听器时,resetOffsets
不应设置为true
,否则会导致错误。
1.3.4. 使用批次
从 3.0 版本开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为true
,通过轮询 Kafka 收到的所有记录Consumer
将作为List<?>
到 listener 方法。
否则,将一次调用一条记录。
批处理的大小由 Kafka 使用者属性控制max.poll.records
,fetch.min.bytes
,fetch.max.wait.ms
;有关更多信息,请参阅 Kafka 文档。
从版本开始4.0.2
,则活页夹在批处理模式下使用时支持 DLQ 功能。
请记住,在处于批处理模式的使用者绑定上使用 DLQ 时,从上一次轮询接收的所有记录都将传递到 DLQ 主题。
使用批处理模式时,不支持在绑定器中重试,因此maxAttempts 将被覆盖为 1。
您可以配置DefaultErrorHandler (使用ListenerContainerCustomizer ) 以实现与在 Binder 中重试类似的功能。
您也可以使用手册AckMode 并调用Ackowledgment.nack(index, sleep) 提交部分批次的偏移量并重新传递剩余记录。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
1.3.5. Kafka 生产者属性
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.producer.<property>=<value> . |
以下属性仅适用于 Kafka 生产者,并且
必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.
.
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且将在将来的版本中删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且将在将来的版本中删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且将在将来的版本中删除对它的支持。 - 缓冲区大小
-
Kafka 生产者在发送前尝试批处理的数据量上限(以字节为单位)。
违约:
16384
. - 同步
-
生产者是否同步。
违约:
false
. - 发送超时表达式
-
根据传出消息计算的 SpEL 表达式,用于在启用同步发布时评估等待 ack 的时间,例如,
headers['mySendTimeout']
. 超时值以毫秒为单位。 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前对表达式进行计算。违约:
none
. - batchTimeout
-
生产者在发送消息之前等待多长时间,以允许更多消息在同一批次中累积。 (通常,生产者根本不会等待,而只是发送上一次发送过程中累积的所有消息。非零值可能会以延迟为代价来增加吞吐量。
违约:
0
. - messageKey表达式
-
根据用于填充生成的 Kafka 消息的密钥的传出消息进行评估的 SpEL 表达式,例如
headers['myKey']
. 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前对表达式进行计算。 对于常规处理器(Function<String, String>
或Function<Message<?>, Message<?>
),如果生成的密钥需要与来自主题的传入密钥相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
对于响应式函数,需要牢记一个重要的警告。 在这种情况下,由应用程序手动将标头从传入邮件复制到出站邮件。 您可以设置标题,例如myKey
并使用headers['myKey']
如上所述,或者为方便起见,只需将KafkaHeaders.MESSAGE_KEY
header,并且您根本不需要设置此属性。违约:
none
. - headerPatterns
-
以逗号分隔的简单模式列表,用于匹配要映射到 Kafka 的 Spring 消息传递标头
Headers
在ProducerRecord
. 模式可以以通配符(星号)开头或结尾。 模式可以通过前缀!
. 匹配在第一次匹配(正或负)后停止。 例如!ask,as*
将通过ash
但不是ask
.id
和timestamp
永远不会映射。默认值:(所有标头 - 除了
*
id
和timestamp
) - 配置
-
映射包含通用 Kafka 生产者属性的键/值对。 这
bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空地图。
- topic.properties
-
一个
Map
配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:无。
- topic.replication-factor
-
预配主题时要使用的复制因子。覆盖活页夹范围的设置。如果出现以下情况,则忽略
replicas-assignments
存在。默认值:无(使用活页夹范围的默认值 -1)。
- 使用主题标头
-
设置为
true
将默认绑定目标(主题名称)替换为KafkaHeaders.TOPIC
出站邮件中的邮件头。 如果标头不存在,则使用默认绑定目标。违约:
false
. - 记录元数据通道
-
的 bean 名称
MessageChannel
应将成功的发送结果发送到哪个;Bean 必须存在于应用程序上下文中。 发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA
. 标头包含一个RecordMetadata
Kafka 客户端提供的对象;它包括在主题中写入记录的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送将进入生产者错误通道(如果已配置);请参阅错误通道。
默认值:null。
Kafka 绑定器使用partitionCount 将生产者设置为提示,以创建具有给定分区计数的主题(结合minPartitionCount ,两者中的最大值是正在使用的值)。
配置两者时要小心minPartitionCount 用于活页夹和partitionCount 对于应用程序,因为使用较大的值。
如果主题已存在分区计数较小且autoAddPartitions 禁用(默认值),则活页夹无法启动。
如果主题已存在分区计数较小且autoAddPartitions 启用,则添加新分区。
如果主题已存在分区数大于 (minPartitionCount 或partitionCount ),则使用现有分区计数。 |
- 压缩
-
将
compression.type
producer 属性。 支持的值包括none
,gzip
,snappy
,lz4
和zstd
. 如果您覆盖kafka-clients
jar 到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用zstd
压缩, 使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
.违约:
none
. - 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的绑定器的事务管理器。如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。默认值:无。
- close超时
-
关闭生产者时等待的超时秒数。
违约:
30
- 允许非事务性
-
通常,与事务性绑定器关联的所有输出绑定都将在新事务中发布(如果尚未处理)。 此属性允许您覆盖该行为。 如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已在处理中。
违约:
false
1.3.6. 使用示例
在本部分中,我们将演示上述属性在特定方案中的用法。
示例:设置ackMode
自MANUAL
和依赖手动确认
此示例说明了如何在使用者应用程序中手动确认偏移量。
此示例要求spring.cloud.stream.kafka.bindings.input.consumer.ackMode
设置为MANUAL
.
为您的示例使用相应的输入通道名称。
@SpringBootApplication
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@Bean
public Consumer<Message<?>> process() {
return message -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
示例:安全配置
Apache Kafka 0.9 支持客户端和代理之间的安全连接。
要利用此功能,请遵循 Apache Kafka 文档中的指南以及 Confluent 文档中的 Kafka 0.9 安全指南。
使用spring.cloud.stream.kafka.binder.configuration
选项为活页夹创建的所有客户端设置安全属性。
例如,要将security.protocol
自SASL_SSL
,请设置以下属性:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全属性都可以以类似的方式设置。
使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。
Spring Cloud Stream 支持使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。
使用 JAAS 配置文件
可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例演示如何使用 JAAS 配置文件启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为拥有 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种机制,用于使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。
以下属性可用于配置 Kafka 客户端的登录上下文:
- spring.cloud.stream.kafka.binder.jaas.login模块
-
登录模块名称。正常情况下不需要设置。
违约:
com.sun.security.auth.module.Krb5LoginModule
. - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
违约:
required
. - spring.cloud.stream.kafka.binder.jaas.options
-
映射包含登录模块选项的键/值对。
默认值:空地图。
以下示例演示如何使用 Spring Boot 配置属性启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例表示与以下 JAAS 文件等效的内容:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果所需的主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。
不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。
如果-Djava.security.auth.login.config system 属性已经存在,则 Spring Cloud Stream 会忽略 Spring Boot 属性。 |
使用autoCreateTopics 和autoAddPartitions 使用 Kerberos。
通常,应用程序可能会使用在 Kafka 和 Zookeeper 中没有管理权限的主体。
因此,依赖 Spring Cloud Stream 创建/修改主题可能会失败。
在安全环境中,强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。 |
多绑定器配置和 JAAS
当连接到多个集群时,每个集群都需要单独的 JAAS 配置,请使用属性sasl.jaas.config
.
当应用程序中存在此属性时,它优先于上述其他策略。
有关更多详细信息,请参阅此 KIP-85。
例如,如果您的应用程序中有两个集群,具有单独的 JAAS 配置,那么您可以使用以下模板:
spring.cloud.stream:
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
kafka.binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
请注意,Kafka 集群和sasl.jaas.config
在上述配置中,它们中的每一个的值都不同。
有关如何设置和运行此类应用程序的更多详细信息,请参阅此示例应用程序。
示例:暂停和恢复使用者
如果希望暂停使用但不导致分区重新平衡,则可以暂停和恢复使用。
这可以通过管理绑定生命周期来促进,如 Spring Cloud Stream 文档中的绑定可视化和控制所示,使用State.PAUSED
和State.RESUMED
.
要恢复,您可以使用ApplicationListener
(或@EventListener
方法)接收ListenerContainerIdleEvent
实例。
事件发布的频率由idleEventInterval
财产。
1.4. 事务性活页夹
通过设置启用事务spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值,例如tx-
.
当在处理器应用程序中使用时,消费者启动事务;在使用者线程上发送的任何记录都参与同一事务。
当监听器正常退出时,监听器容器会将偏移量发送给事务并提交。
一个通用的生产者工厂用于使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能;忽略单个绑定 Kafka 生产者属性。
事务不支持正常的活页夹重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,任何已发布的记录也将回滚。
启用重试时(公共属性maxAttempts 大于零),重试属性用于配置DefaultAfterRollbackProcessor 以启用容器级别的重试。
同样,此功能不是在事务中发布死信记录,而是通过DefaultAfterRollbackProcessor 在主事务回滚后运行。 |
如果您希望在源应用程序中使用事务,或者从某个任意线程中用于仅生产者事务(例如@Scheduled
方法),您必须获取对事务性生产者工厂的引用,并定义一个KafkaTransactionManager
bean 使用它。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用BinderFactory
;用null
在第一个参数中,当只有一个 Binder 配置时。
如果配置了多个活页夹,请使用活页夹名称获取引用。
一旦我们有了对 binder 的引用,我们就可以获得对ProducerFactory
并创建事务管理器。
然后您将使用正常的 Spring 事务支持,例如TransactionTemplate
或@Transactional
例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者事务与其他事务管理器中的事务同步,请使用ChainedTransactionManager
.
如果您部署应用程序的多个实例,则每个实例都需要一个唯一的transactionIdPrefix . |
1.5. 错误通道
从 1.3 版开始,绑定器无条件地将异常发送到每个使用者目标的错误通道,也可以配置为将异步生产者发送失败发送到错误通道。 有关详细信息,请参阅此部分有关错误处理。
的有效负载ErrorMessage
对于发送失败,是KafkaSendFailureException
具有属性:
-
failedMessage
:春季消息Message<?>
未能发送。 -
record
:原始的ProducerRecord
这是从failedMessage
没有自动处理生产者异常(例如发送到死信队列)。 您可以在自己的 Spring Integration 流中使用这些异常。
1.6. Kafka 指标
Kafka binder 模块公开以下指标:
spring.cloud.stream.binder.kafka.offset
:此指标指示给定使用者组尚未从给定 Binder 的主题中使用多少消息。
提供的指标基于 Micrometer 库。
活页夹创建KafkaBinderMetrics
bean 如果 Micrometer 在类路径上并且应用程序没有提供其他此类 bean。
该指标包含消费者组信息、主题以及与主题上最新偏移量的实际滞后。
此指标对于向 PaaS 平台提供自动缩放反馈特别有用。
可以通过在spring.cloud.stream.kafka.binder.metrics
Namespace
有关更多信息,请参阅 Kafka Binder 属性部分。
您可以排除KafkaBinderMetrics
从创建必要的基础设施(如使用者)到通过在应用程序中提供以下组件来报告指标。
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
有关如何有选择地抑制仪表的更多详细信息,请参见此处。
1.7. 逻辑删除记录(空记录值)
使用压缩主题时,带有null
值(也称为逻辑删除记录)表示删除键。
要在 Spring Cloud Stream 函数中接收此类消息,您可以使用以下策略。
@Bean
public Function<Message<Person>, String> myFunction() {
return value -> {
Object v = value.getPayload();
String className = v.getClass().getName();
if (className.isEqualTo("org.springframework.kafka.support.KafkaNull")) {
// this is a tombstone record
}
else {
// continue with processing
}
};
}
1.8. 使用 KafkaBindingRebalanceListener
应用程序可能希望在最初分配分区时将主题/分区查找到任意偏移量,或者对使用者执行其他作。
从 2.1 版开始,如果您提供单个KafkaBindingRebalanceListener
bean 中,它将连接到所有 Kafka 消费者绑定中。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
您无法将resetOffsets
consumer 属性设置为true
当您提供重新平衡侦听器时。
1.9. 重试和死信处理
默认情况下,当您配置重试(例如maxAttemts
) 和enableDlq
在使用者绑定中,这些功能在绑定器中执行,侦听器容器或 Kafka 使用者不参与。
在某些情况下,最好将此功能移动到侦听器容器,例如:
-
重试和延迟的总和将超过消费者的
max.poll.interval.ms
属性,可能会导致分区重新平衡。 -
您希望将死信发布到不同的 Kafka 集群。
-
您希望将重试侦听器添加到错误处理程序中。
-
…
要配置将此功能从活页夹移动到容器,请定义一个@Bean
类型ListenerContainerWithDlqAndRetryCustomizer
. 该接口具有以下方法:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目标解析器和BackOff
是根据绑定属性(如果已配置)创建的。这KafkaTemplate
使用来自spring.kafka….
性能。然后,您可以使用它们创建自定义错误处理程序和死信发布器;例如:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
现在,只需一次重试延迟大于消费者的延迟max.poll.interval.ms
财产。
当使用多个 Binder 时,'ListenerContainerWithDlqAndRetryCustomizer' bean 将被 'DefaultBinderFactory' 覆盖。对于豆子 要应用,您需要使用 'BinderCustomizer' 来设置容器定制器(参见 [binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
1.10. 自定义消费者和生产者配置
如果要对用于创建的使用者和生产者配置进行高级定制ConsumerFactory
和ProducerFactory
在卡夫卡中,
您可以实现以下定制器。
-
消费者配置定制器
-
生产者配置定制器
这两个接口都提供了一种配置用于消费者和生产者属性的配置映射的方法。
例如,如果您想访问在应用程序级别定义的 Bean,则可以在configure
方法。
当 Binder 发现这些定制器可以作为 bean 使用时,它将调用configure
方法。
这两个接口还提供对绑定名称和目标名称的访问,以便在自定义生产者和使用者属性时可以访问它们。
1.11. 自定义 AdminClient 配置
与上面的消费者和生产者配置自定义一样,应用程序还可以通过提供AdminClientConfigCustomizer
.
AdminClientConfigCustomizer 的 configure 方法提供对管理客户端属性的访问,您可以使用它定义进一步的自定义。
Binder 的 Kafka 主题配置器为通过此定制器提供的属性提供了最高优先级。
下面是提供此定制器 bean 的示例。
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
1.12. 自定义 Kafka Binder 健康指标
当 Spring Boot 执行器位于类路径上时,Kafka binder 会激活默认的运行状况指示器。
此运行状况指示器检查绑定器的运行状况以及与 Kafka 代理的任何通信问题。
如果应用程序想要禁用此默认运行状况检查实现并包含自定义实现,则它可以提供KafkaBinderHealth
接口。KafkaBinderHealth
是一个标记接口,从HealthIndicator
.
在自定义实现中,它必须为health()
方法。
自定义实现必须作为 Bean 存在于应用程序配置中。
当 Binder 发现自定义实现时,它将使用它而不是默认实现。
以下是应用程序中此类自定义实现 bean 的示例。
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
1.13. 死信主题处理
1.13.1. 死信主题分区选择
默认情况下,记录使用与原始记录相同的分区发布到死信主题。 这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请添加一个DlqPartitionFunction
实现为@Bean
到应用程序上下文。
只能存在一个这样的豆子。
该函数是随消费者组提供的,失败的ConsumerRecord
和例外。
例如,如果您始终想要路由到分区 0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions 属性设置为 1(以及活页夹的minPartitionCount 等于1 ),无需提供DlqPartitionFunction ;框架将始终使用分区 0。
如果将使用者绑定的dlqPartitions 属性设置为大于1 (或活页夹的minPartitionCount 大于1 ),您必须提供一个DlqPartitionFunction bean,即使分区计数与原始主题的分区计数相同。 |
还可以为 DLQ 主题定义自定义名称。
为此,请创建DlqDestinationResolver
作为@Bean
到应用程序上下文。
当 binder 检测到这样的 bean 时,优先,否则它将使用dlqName
财产。
如果都未找到,则默认为error.<destination>.<group>
.
下面是一个示例DlqDestinationResolver
作为@Bean
.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供实现时要记住的一件重要事情DlqDestinationResolver
是 binder 中的 provisioner 不会为应用程序自动创建主题。
这是因为活页夹无法推断出实现可能发送到的所有 DLQ 主题的名称。
因此,如果使用此策略提供 DLQ 名称,则应用程序有责任确保事先创建这些主题。
1.13.2. 处理死信主题中的记录
由于该框架无法预测用户希望如何处理死信邮件,因此它没有提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将邮件路由回原始主题。 但是,如果问题是永久性问题,则可能会导致无限循环。 本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在尝试三次后将它们移动到“停车场”主题。 该应用程序是另一个从死信主题中读取的 spring-cloud-stream 应用程序。 当 5 秒内未收到消息时,它将退出。
这些示例假定原始目标为so8400out
消费群体是so8400
.
有几种策略需要考虑:
-
考虑仅在主应用程序未运行时运行重新路由。否则,暂时性错误的重试很快就会用完。
-
或者,使用两阶段方法:使用此应用程序路由到第三个主题,另一个应用程序从那里路由回主主题。
以下代码列表显示了示例应用程序:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}
1.14. 使用 Kafka Binder 进行分区
Apache Kafka 原生支持主题分区。
有时将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时(特定客户的所有消息都应转到同一分区)。
以下示例展示了如何配置生产者端和消费者端:
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
重要的是要记住,由于 Apache Kafka 本机支持分区,因此无需依赖上述 Binder 分区,除非您使用示例中的自定义分区键或涉及有效负载本身的表达式。
活页夹提供的分区选择适用于不支持本机分区的中间件技术。
请注意,我们使用的是一个名为partitionKey 在上面的示例中,这将是分区的决定因素,因此在这种情况下,使用活页夹分区是合适的。
使用本机 Kafka 分区时,即当您不提供partition-key-expression ,则 Apache Kafka 将选择一个分区,默认情况下,该分区将是可用分区数的记录键的哈希值。
要向出站记录添加键,请将KafkaHeaders.KEY header 添加到 spring-messaging 中所需的键值Message<?> .
默认情况下,当未提供记录键时,Apache Kafka 将根据 Apache Kafka 文档中描述的逻辑选择分区。 |
必须预配主题以具有足够的分区,以便为所有使用者组实现所需的并发性。
上述配置最多支持 12 个消费者实例(如果其concurrency 是 2,如果它们的并发性为 3,则为 4,依此类推)。
通常最好“过度预配”分区,以允许将来增加使用者或并发性。 |
上述配置使用默认分区 (key.hashCode() % partitionCount ).
这可能会也可能不会提供适当平衡的算法,具体取决于键值。特别请注意,此分区策略与独立 Kafka 生产者使用的默认策略(例如 Kafka Streams 使用的默认策略)不同,这意味着当这些客户端生成时,相同的键值可能会在分区之间以不同的方式平衡。
您可以使用partitionSelectorExpression 或partitionSelectorClass 性能。 |
由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。 Kafka 在实例之间分配分区。
kafka 主题的 partitionCount 可能会在运行时更改(例如,由于管理任务)。 之后计算的分区将有所不同(例如,届时将使用新分区)。 从 Spring Cloud Stream 的 4.0.3 开始,将支持分区计数的更改。 另请参阅参数“spring.kafka.producer.properties.metadata.max.age.ms”以配置更新间隔。 由于某些限制,无法使用引用消息的“有效负载”的“partition-key-expression”,在这种情况下,该机制将被禁用。 默认情况下,整体行为处于禁用状态,可以使用配置参数“producer.dynamicPartitionUpdatesEnabled=true”启用。 |
以下 Spring Boot 应用程序监听 Kafka 流并打印(到控制台)每条消息转到的分区 ID:
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。
Kafka 重新平衡分区分配。
如果实例计数(或instance count * concurrency
) 超过分区数,部分消费者处于空闲状态。
2. 响应式 Kafka 活页夹
Spring Cloud Stream 中的 Kafka 绑定器提供了一个基于 Reactor Kafka 项目的专用响应式绑定器。这个响应式 Kafka 绑定器在基于 Apache Kafka 的应用程序中启用了完整的端到端响应式功能,例如背压、响应式流等。当您的 Spring Cloud Stream Kafka 应用程序使用响应式类型(Flux
,Mono
等),建议使用这个响应式 Kafka 绑定器,而不是常规的基于消息通道的 Kafka 绑定器。
2.1. Maven 坐标
以下是响应式 Kafka 绑定器的 maven 坐标。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
2.2. 使用响应式 Kafka Binder 的基本示例
在本节中,我们将展示一些使用响应式绑定器编写响应式 Kafka 应用程序的基本代码片段以及围绕它们的详细信息。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以使用上述upppercase
函数与基于消息通道的 Kafka 绑定器 (spring-cloud-stream-binder-kafka
)以及反应性 Kafka 绑定剂 (spring-cloud-stream-binder-kafka-reactive
),本节讨论的主题。
当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中使用响应式类型(即,在uppercase
函数),您只能在函数执行过程中获得响应式流。
在函数的执行上下文之外,没有响应式好处,因为底层绑定器不是基于响应式堆栈。
因此,尽管这看起来像是带来了完整的端到端响应式堆栈,但此应用程序仅具有部分响应性。
现在假设您正在为 Kafka 使用适当的响应式绑定器 -spring-cloud-stream-binder-kafka-reactive
与上述函数的应用。
这种 binder 实现将提供从顶端消费到链底端发布的全部响应式好处。
这是因为底层绑定器是建立在 Reactor Kafka 的核心 API 之上的。
在消费者方面,它利用了 KafkaReceiver,它是 Kafka 消费者的响应式实现。
同样,在生产者端,它使用 KafkaSender API,它是 Kafka 生产者的响应式实现。
由于响应式 Kafka 绑定器的基础是建立在适当的响应式 Kafka API 之上的,因此应用程序可以获得使用响应式技术的全部好处。
使用这个响应式 Kafka 绑定器时,应用程序内置了自动背压等响应式功能。
从 4.0.2 版开始,您可以自定义ReceiverOptions
和SenderOptions
通过提供一个或多个ReceiverOptionsCustomizer
或SenderOptionsCustomizer
Beans。
他们是BiFunction
s 接收绑定名称和初始选项,返回自定义选项。
接口扩展Ordered
因此,当存在多个定制器时,将按所需的顺序应用定制器。
默认情况下,活页夹不提交偏移量。
从 4.0.2 版开始,KafkaHeaders.ACKNOWLEDGMENT 标头包含一个ReceiverOffset 对象,它允许您通过调用其acknowledge() 或commit() 方法。 |
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
请参阅reactor-kafka
文档和 JavaDocs 了解更多信息。
此外,从 4.0.3 版本开始,Kafka 消费者属性reactiveAtmostOnce
可以设置为true
并且 Binder 将在处理每次轮询返回的记录之前自动提交偏移量。
此外,从版本 4.0.3 开始,您可以设置 consumer 属性reactiveAutoCommit
自true
并且 binder 将在处理每次轮询返回的记录后自动提交偏移量。
在这些情况下,确认标头不存在。
4.0.2 也提供reactiveAutoCommit ,但实现不正确,它的行为类似于reactiveAtMostOnce . |
下面是一个如何使用的示例reaciveAutoCommit
.
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意reactor-kafka
返回一个Flux<Flux<ConsumerRecord<?, ?>>>
使用自动提交时。
鉴于 Spring 无法访问内部通量的内容,应用程序必须处理本机ConsumerRecord
;没有对内容应用消息转换或转换服务。
这需要使用本机解码(通过指定Deserializer
配置中适当类型的)返回所需类型的记录键/值。
2.3. 以原始格式使用记录
在上面upppercase
函数,我们将记录用作Flux<String>
然后将其生成为Flux<String>
.
在某些情况下,您可能需要以原始接收格式接收记录 -ReceiverRecord
.
这是这样一个功能。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
请注意,在此函数中,我们将记录用作Flux<ReceiverRecord<byte[], byte[]>>
然后将其生成为Flux<String>
.ReceiverRecord
是基本接收记录,它是专门的 KafkaConsumerRecord
在卡夫卡反应堆中。
当使用响应式 Kafka 绑定器时,上述函数将允许您访问ReceiverRecord
类型。
但是,在这种情况下,您需要为 RecordMessageConverter 提供自定义实现。
默认情况下,响应式 Kafka 绑定器使用 MessagingMessageConverter 将有效负载和标头从ConsumerRecord
.
因此,当您的处理程序方法收到它时,有效负载已经从接收到的记录中提取并传递到该方法中,就像我们上面看到的第一个函数一样。
通过提供自定义RecordMessageConverter
实现,您可以覆盖默认行为。
例如,如果要将记录作为原始记录使用Flux<ReceiverRecord<byte[], byte[]>>
,则可以在应用程序中提供以下 bean 定义。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然后,您需要指示框架使用此转换器进行所需的绑定。
这是一个基于我们的lowercase
功能。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0
是我们的lowercase
功能。
对于出站(lowecase-out-0
),我们仍然使用常规MessagingMessageConverter
.
在toMessage
实现上述,我们收到原始的ConsumerRecord
(ReceiverRecord
因为我们处于响应式 Binder 上下文中),然后将其包装在Message
.
然后,该消息有效负载是ReceiverRecord
提供给用户方法。
如果reactiveAutoCommit
是false
(默认),调用rec.receiverOffset().acknowledge()
(或commit()
) 导致偏移量被提交;如果reactiveAutoCommit
是true
,助焊剂供应ConsumerRecord
s 代替。
请参阅reactor-kafka
文档和 JavaDocs 了解更多信息。
2.4. 并发
将响应式函数与响应式 Kafka 绑定器一起使用时,如果您在使用者绑定上设置并发性,则绑定器会创建尽可能多的专用KafkaReceiver
并发值提供的对象。
换句话说,这会创建多个响应式流,并具有单独的Flux
实现。
当您使用分区主题中的记录时,这可能很有用。
例如,假设传入主题至少有三个分区。 然后,您可以设置以下属性。
spring.cloud.stream.bindings.lowercase-in-0.consumer.concurrency=3
这将创建三个专用的KafkaReceiver
生成三个单独的对象Flux
实现,然后将它们流式传输到处理程序方法。
2.5. 多路复用
从 4.0.3 版开始,通用使用者属性multiplex
现在,响应式绑定器支持,其中单个绑定可以从多个主题使用。
什么时候false
(默认值),则为公共destination
财产。
2.6. 目的地是模式
从 4.0.3 版本开始,destination-is-pattern
现在支持 Kafka 绑定消费者属性。
接收器选项使用正则表达式Pattern
,允许绑定从与模式匹配的任何主题中使用。
2.7. 发送方结果通道
从 4.0.3 版开始,您可以配置resultMetadataChannel
接收SenderResult<?>
s 来确定发送的成功/失败。
这SenderResult
包含correlationMetadata
允许您将结果与发送相关联;它还包含RecordMetadata
,表示TopicPartition
以及已发送记录的偏移量。
这resultMetadataChannel
必须是FluxMessageChannel
实例。
下面是如何使用此功能的示例,相关元数据类型为Integer
:
@Bean
FluxMessageChannel sendResults() {
return new FluxMessageChannel();
}
@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
if (result.exception() != null) {
failureFor(result);
}
else {
successFor(result);
}
}
要在输出记录上设置相关元数据,请将CORRELATION_ID
页眉:
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
将该功能与Function
,则函数输出类型必须是Message<?>
将相关 ID 标头设置为所需值。
元数据应是唯一的,至少在发送期间是唯一的。
3. Kafka Streams 活页夹
3.1. 用法
要使用 Kafka Streams 绑定器,您只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
为 Kafka Streams 绑定器引导新项目的一种快速方法是使用 Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示

3.2. 概述
Spring Cloud Stream 包括一个专为 Apache Kafka Streams 绑定而设计的绑定器实现。 通过这种本机集成,Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。
Kafka Streams 绑定器实现建立在 Spring for Apache Kafka 项目提供的基础之上。
Kafka Streams 绑定器为 Kafka Streams 中的三种主要类型提供了绑定功能 -KStream
,KTable
和GlobalKTable
.
Kafka Streams 应用程序通常遵循从入站主题读取记录的模型,应用业务逻辑,然后将转换后的记录写入出站主题。 或者,也可以定义没有出站目标的处理器应用程序。
在以下部分中,我们将详细介绍 Spring Cloud Stream 与 Kafka Streams 的集成。
3.3. 编程模型
当使用 Kafka Streams binder 提供的编程模型时,高级 Streams DSL 和高级和低级 Processor-API 的混合都可以用作选项。
当混合更高级别和较低级别的 API 时,这通常是通过调用transform
或process
API 方法KStream
.
3.3.1. 函数式样式
从 Spring Cloud Stream 开始3.0.0
,Kafka Streams 绑定器允许使用 Java 8 中可用的函数式编程风格来设计和开发应用程序。
这意味着应用程序可以简洁地表示为类型的 lambda 表达式java.util.function.Function
或java.util.function.Consumer
.
让我们举一个非常基本的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然很简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。
这是一个没有出站绑定且只有一个入站绑定的使用者应用程序。
应用程序使用数据,它只是记录来自KStream
标准输出上的键和值。
该应用程序包含SpringBootApplication
注释和标记为Bean
.
bean 方法的类型为java.util.function.Consumer
其中参数化为KStream
.
然后在实现中,我们返回一个 Consumer 对象,该对象本质上是一个 lambda 表达式。
在 lambda 表达式中,提供了用于处理数据的代码。
在此应用程序中,有一个类型的单个输入绑定KStream
.
活页夹为应用程序创建此绑定,名称为process-in-0
,即函数 bean name 的名称后跟破折号字符 () 和文字-
in
后跟另一个破折号,然后是参数的序号位置。
使用此绑定名称可以设置其他属性,例如目标。
例如spring.cloud.stream.bindings.process-in-0.destination=my-topic
.
如果未在绑定上设置目标属性,那么将创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题应该已经可用。 |
一旦构建为 uber-jar(例如kstream-consumer-app.jar
),您可以像下面一样运行上面的示例。
如果应用程序选择使用 Spring 的Component
注释,则活页夹也支持该模型。
上面的功能 bean 可以重写如下。
@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这是另一个示例,它是一个具有输入和输出绑定的完整处理器。 这是经典的字数计数示例,其中应用程序从主题接收数据,然后在翻转时间窗口中计算每个单词的出现次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
同样,这是一个完整的 Spring Boot 应用程序。这里与第一个应用程序的不同之处在于 bean 方法是java.util.function.Function
.
第一个参数化类型Function
用于输入KStream
第二个是输出。
在方法体中,提供了一个 lambda 表达式,该表达式类型为Function
作为实现,给出了实际的业务逻辑。
与前面讨论的基于消费者的应用程序类似,此处的输入绑定被命名为process-in-0
默认情况下。对于输出,绑定名称也会自动设置为process-out-0
.
一旦构建为 uber-jar(例如wordcount-processor.jar
),您可以像下面一样运行上面的示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将使用 Kafka 主题中的消息words
并将计算结果发布到输出
主题counts
.
Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写逻辑 处理器中需要。设置 Kafka Streams 基础架构所需的 Kafka Streams 特定配置 由框架自动处理。
我们在上面看到的两个示例有一个KStream
输入绑定。在这两种情况下,绑定都从单个主题接收记录。
如果要将多个主题多路复用为一个主题KStream
binding,则可以在下方提供逗号分隔的 Kafka 主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果要将主题与常规 Exresession 进行匹配,还可以将主题模式提供为目标。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多个输入绑定
许多重要的 Kafka Streams 应用程序通常通过多个绑定使用来自多个主题的数据。
例如,一个主题被消耗为Kstream
另一个作为KTable
或GlobalKTable
.
应用程序可能希望将数据作为表类型接收的原因有很多。
考虑一个用例,其中基础主题是通过数据库中的更改数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。
如果应用程序指定数据需要绑定为KTable
或GlobalKTable
,则 Kafka Streams 绑定器会正确地将目标绑定到KTable
或GlobalKTable
并使它们可供应用程序作。
我们将研究几种不同的场景,了解如何在 Kafka Streams 绑定器中处理多个输入绑定。
Kafka Streams Binder 中的 BiFunction
这是一个示例,我们有两个输入和一个输出。在这种情况下,应用程序可以利用java.util.function.BiFunction
.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
同样,基本主题与前面的示例相同,但这里我们有两个输入。
Java 的BiFunction
支持用于将输入绑定到所需的目标。绑定器为输入生成的默认绑定名称是process-in-0
和process-in-1
分别。 默认输出绑定为process-out-0
. 在此示例中,第一个参数BiFunction
被绑定为KStream
对于第一个输入,第二个参数被绑定为KTable
对于第二个输入。
Kafka Streams Binder 中的 BiConsumer
如果有两个输入,但没有输出,在这种情况下,我们可以使用java.util.function.BiConsumer
如下图所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果您有两个以上的输入怎么办? 在某些情况下,您需要两个以上的输入。在这种情况下,活页夹允许您链接部分函数。 在函数式编程术语中,这种技术通常称为柯里化。 随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里函数。 Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看看上面介绍的绑定模型的细节。在这个模型中,我们在入站上有 3 个部分应用的函数。让我们将它们称为f(x)
,f(y)
和f(z)
.
如果我们在真正的数学函数意义上扩展这些函数,它将如下所示:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
.
这x
变量代表KStream<Long, Order>
这y
变量代表GlobalKTable<Long, Customer>
和z
变量代表GlobalKTable<Long, Product>
.
第一个功能f(x)
具有应用程序的第一个输入绑定 (KStream<Long, Order>
),其输出是函数 f(y)。
功能f(y)
具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>
),其输出是另一个函数,f(z)
.
函数的输入f(z)
是应用程序的第三个输入 (GlobalKTable<Long, Product>
),其输出为KStream<Long, EnrichedOrder>
这是应用程序的最终输出绑定。
来自三个部分函数的输入,即KStream
,GlobalKTable
,GlobalKTable
分别在方法主体中可供您使用,用于将业务逻辑实现为 lambda 表达式的一部分。
输入绑定命名为enrichOrder-in-0
,enrichOrder-in-1
和enrichOrder-in-2
分别。输出绑定命名为enrichOrder-out-0
.
使用柯里函数,您几乎可以拥有任意数量的输入。但是,请记住,在 Java 中,超过较少数量的输入和部分应用的函数都可能导致代码不可读。 因此,如果您的 Kafka Streams 应用程序需要的输入绑定数量超过相当少的数量,并且您想使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。
输出绑定
Kafka Streams 绑定器允许KStream
或KTable
作为输出绑定。
在幕后,活页夹使用to
方法KStream
将生成的记录发送到输出主题。
如果应用程序提供KTable
作为函数中的输出,Binder 仍然通过委托给to
方法KStream
.
例如,以下两个功能都可以工作:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
多个输出绑定
Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。
使用多个输出绑定时,需要提供 KStream (KStream[]
) 作为出站返回类型。
这是一个例子:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
编程模型保持不变,但出站参数化类型为KStream[]
.
默认输出绑定名称为process-out-0
,process-out-1
,process-out-2
分别用于上述功能。
binder 之所以会生成三个输出绑定,是因为它检测了返回的KStream
数组为三个。
请注意,在此示例中,我们提供了一个noDefaultBranch()
;如果我们使用defaultBranch()
相反,这将需要额外的输出绑定,本质上返回一个KStream
长度为四的数组。
Kafka 流的基于函数的编程风格摘要
总之,下表显示了可在函数范式中使用的各种选项。
输入数 | 输出数量 | 要使用的组件 |
---|---|---|
1 |
0 |
java.util.function.消费者 |
2 |
0 |
java.util.function.Bi消费者 |
1 |
1..n |
java.util.function.函数 |
2 |
1..n |
java.util.function.Bi函数 |
>= 3 |
0..n |
使用柯里化函数 |
-
如果此表中有多个输出,则类型将变为
KStream[]
.
Kafka Streams 绑定器中的函数组合
Kafka Streams 绑定器支持线性拓扑的最小形式的功能组合。
使用 Java 函数 API 支持,您可以编写多个函数,然后使用andThen
方法。
例如,假设您有以下两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
即使活页夹中没有功能组合支持,也可以按如下方式组合这两个功能。
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
然后,您可以提供表单的定义spring.cloud.function.definition=foo;bar;composed
.
使用活页夹中的函数组合支持,您无需编写要执行显式函数组合的第三个函数。
您可以简单地执行以下作:
spring.cloud.function.definition=foo|bar
您甚至可以这样做:
spring.cloud.function.definition=foo|bar;foo;bar
在此示例中,组合函数的默认绑定名称将变为foobar-in-0
和foobar-out-0
.
Kafka Streams bincer 中功能组合的局限性
当你有java.util.function.Function
bean,可以由另一个函数或多个函数组成。
相同的函数 bean 可以用java.util.function.Consumer
也。在这种情况下,消费者是最后一个组成的组件。
一个函数可以由多个函数组成,然后以java.util.function.Consumer
豆子也是如此。
在组合类型java.util.function.BiFunction
这BiFunction
必须是定义中的第一个函数。
组成的实体必须是java.util.function.Function
或java.util.funciton.Consumer
.
换句话说,您不能将BiFunction
bean 然后与另一个BiFunction
.
不能使用BiConsumer
或定义,其中Consumer
是第一个组件。
您也不能使用输出为数组 (KStream[]
用于分支),除非这是定义中的最后一个组件。
第一个Function
之BiFunction
在函数定义中也可以使用柯里形式。
例如,以下情况是可能的。
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
函数定义可以是curriedFoo|bar
.
在后台,绑定器将为柯里函数创建两个输入绑定,以及基于定义中的最终函数的输出绑定。
在这种情况下,默认输入绑定将是curriedFoobar-in-0
和curriedFoobar-in-1
.
此示例的默认输出绑定将变为curriedFoobar-out-0
.
使用特别注意事项KTable
作为函数组合中的输出
假设您有以下两个功能。
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
您可以将它们组合为foo|bar
,但请记住,第二个函数 (bar
在这种情况下)必须有一个KTable
作为输入,因为第一个函数 (foo
) 有KTable
作为输出。
3.4. 编程模型的辅助工具
3.4.1. 单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以申请如下。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的单独 Kafka Streams 对象(更多内容见下文)。 但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 需要激活哪些功能。 以下是激活这些功能的方法。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果您不希望立即激活某些功能,可以将其从此列表中删除。
当您拥有单个 Kafka Streams 处理器和其他类型的Function
同一应用程序中的 bean,通过不同的绑定器处理(例如,基于常规 Kafka 消息通道绑定器的函数 bean)
3.4.2. Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必填属性。 Spring Cloud Stream Kafka Streams 绑定器允许您以多种方式配置此应用程序 ID。
如果应用程序中只有一个处理器,则可以使用以下属性在绑定器级别进行设置:
spring.cloud.stream.kafka.streams.binder.applicationId
.
为了方便起见,如果您只有一个处理器,您还可以使用spring.application.name
作为属性来委托应用程序 ID。
如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 对于函数模型,您可以将其作为属性附加到每个函数。
例如,假设您有以下功能。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下活页夹级别属性为每个应用程序设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也将有效。但是,如果您使用的是功能模型,则在绑定器级别设置每个函数会容易得多,如我们在上面看到的那样。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您要自动扩展应用程序,这尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序未提供应用程序 ID,则在这种情况下,活页夹将自动为您生成静态应用程序 ID。
这在开发方案中很方便,因为它避免了显式提供应用程序 ID 的需要。
以这种方式生成的应用程序 ID 将在应用程序重启时保持静态。
对于功能模型,生成的应用程序 ID 将是函数 bean 名称,后跟文字applicationID
,例如:process-applicationID
如果process
如果函数 bean 名称。
设置应用程序 ID 的摘要
-
默认情况下,binder 将根据函数方法自动生成应用程序 ID。
-
如果您有单个处理器,则可以使用
spring.kafka.streams.applicationId
,spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
. -
如果您有多个处理器,则可以使用属性 -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
.
3.4.3. 使用函数式样式覆盖绑定器生成的默认绑定名称
默认情况下,绑定器在使用函数式样式时使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。 如果要覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
.默认绑定名称是活页夹生成的原始绑定名称。
例如,假设您有这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将生成带有名称的绑定,process-in-0
,process-in-1
和process-out-0
.
现在,如果您想将它们完全更改为其他内容,也许是更多特定于域的绑定名称,那么您可以按如下方式进行作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,必须在这些新绑定名称上设置所有绑定级别属性。
请记住,使用上述函数式编程模型,在大多数情况下,遵循默认绑定名称是有意义的。 您可能仍希望执行此重写的唯一原因是,当您具有大量配置属性并且想要将绑定映射到更域友好的内容时。
3.4.4. 设置引导服务器配置
运行 Kafka Streams 应用程序时,必须提供 Kafka 代理服务器信息。
如果您不提供此信息,则 Binder 希望您以默认值运行代理localhost:9092
.
如果不是这种情况,那么您需要覆盖它。有几种方法可以做到这一点。
-
使用 boot 属性 -
spring.kafka.bootstrapServers
-
活页夹级属性 -
spring.cloud.stream.kafka.streams.binder.brokers
当涉及到 binder 级别属性时,是否使用通过常规 Kafka binder 提供的 broker 属性并不重要 -spring.cloud.stream.kafka.binder.brokers
.
Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers
.
3.5. 记录序列化和反序列化
Kafka Streams 绑定器允许您以两种方式序列化和反序列化记录。 一个是Kafka提供的原生序列化和反序列化工具,另一个是Spring Cloud Stream框架的消息转换能力。 让我们看看一些细节。
3.5.1. 入站反序列化
密钥始终使用本机 Serdes 进行反序列化。
对于值,默认情况下,入站的反序列化由 Kafka 本机执行。 请注意,这是对以前版本的 Kafka Streams 绑定程序的默认行为的重大更改,其中反序列化是由框架完成的。
Kafka Streams 绑定器将尝试推断匹配Serde
类型通过查看java.util.function.Function|Consumer
.
这是它与 Serdes 匹配的顺序。
-
如果应用程序提供类型为
Serde
如果返回类型使用传入键或值类型的实际类型进行参数化,则它将使用Serde
用于入站反序列化。 例如,如果应用程序中有以下内容,则活页夹会检测到KStream
与在Serde
豆。 它将用于入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型,看看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。 以下是绑定器将尝试从 Kafka Streams 匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假定类型是 JSON 友好的。 如果您有多个值对象作为输入,这很有用,因为绑定器会在内部将它们推断为正确的 Java 类型。 在回退到
JsonSerde
不过,活页夹会以默认值Serde
s 设置,以查看它是否是Serde
它可以与传入的 KStream 类型匹配。
如果上述策略均无效,则应用程序必须提供Serde
通过配置。
这可以通过两种方式进行配置 - 绑定或默认。
首先,活页夹将查看Serde
在绑定级别提供。
例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以提供绑定级别Serde
使用以下内容:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您提供Serde 作为每个输入绑定的 abover,那么这将具有更高的优先级,并且绑定器将远离任何Serde 推理。 |
如果您希望将默认键/值 Serdes 用于入站反序列化,则可以在 Binder 级别执行此作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认的,为了让 Spring Cloud Stream 对入站值对象进行反序列化,你需要显式禁用原生解码。
例如,如果您拥有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要单独禁用所有输入的本机解码。否则,本机解码仍将应用于您未禁用的输入。
默认情况下,Spring Cloud Stream 将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。
可以使用以下属性和适当的MessageConverter
豆。
spring.cloud.stream.bindings.process-in-0.contentType
3.5.2. 出站序列化
出站序列化几乎遵循与上述入站反序列化相同的规则。 与入站反序列化一样,与以前版本的 Spring Cloud Stream 相比,一个主要变化是出站的序列化由 Kafka 本机处理。 在 3.0 版本的 binder 之前,这是由框架本身完成的。
出站上的密钥始终由 Kafka 使用匹配Serde
这是由活页夹推断出来的。
如果它无法推断出键的类型,则需要使用配置来指定。
值 serde 使用用于入站反序列化的相同规则进行推断。
首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。
如果没有,它会检查它是否与Serde
被卡夫卡暴露,例如 -Integer
,Long
,Short
,Double
,Float
,byte[]
,UUID
和String
.
如果这不起作用,那么它就会回退到JsonSerde
由 Spring Kafka 项目提供,但先看默认的Serde
配置以查看是否存在匹配项。
请记住,所有这些都对应用程序透明地发生。
如果这些都不起作用,则用户必须提供Serde
按配置使用。
假设您正在使用相同的BiFunction
处理器,如上所述。然后,您可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推理失败,并且没有提供绑定级别 Serdes,则绑定器将回退到JsonSerde
,但请查看默认的 Serdes 以进行匹配。
默认 serde 的配置方式与上述反序列化下描述的相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果应用程序使用分支功能并具有多个输出绑定,则必须按绑定配置这些绑定。
同样,如果活页夹能够推断出Serde
类型,则无需执行此配置。
如果您不希望 Kafka 提供的本机编码,但想使用框架提供的消息转换,那么您需要显式禁用本机编码,因为本机编码是默认的。
例如,如果您拥有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,您需要单独禁用所有输出的本机编码。否则,本机编码仍将应用于您未禁用的那些。
当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。可以使用以下属性和相应的MessageConverter
豆。
spring.cloud.stream.bindings.process-out-0.contentType
禁用本机编码/解码时,binder 不会像本机 Serdes 那样进行任何推理。应用程序需要显式提供所有配置选项。因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用程序时保留反序列化的默认选项,并坚持使用 Kafka Streams 提供的本机反序列化。您必须使用框架提供的消息转换功能的一种场景是,当您的上游生产者使用特定的序列化策略时。在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。当依赖默认Serde
机制,应用程序必须确保活页夹有一条前进的道路,以正确的方式正确映射入站和出站Serde
,否则事情可能会失败。
值得一提的是,上面概述的数据反序列化方法仅适用于处理器的边缘,即入站和出站。
您的业务逻辑可能仍需要调用显式需要的 Kafka Streams APISerde
对象。
这些仍然是应用程序的责任,必须由开发人员相应地处理。
3.6. 错误处理
Apache Kafka Streams 提供了本机处理反序列化错误异常的功能。
有关此支持的详细信息,请参阅此。
开箱即用,Apache Kafka Streams 提供了两种反序列化异常处理程序 -LogAndContinueExceptionHandler
和LogAndFailExceptionHandler
.
顾名思义,前者将记录错误并继续处理下一条记录,后者将记录错误并失败。LogAndFailExceptionHandler
是默认的反序列化异常处理程序。
3.6.1. 在 Binder 中处理反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序外,绑定器还提供了第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。 下面是启用此 DLQ 异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
设置上述属性后,反序列化错误中的所有记录都会自动发送到 DLQ 主题。
您可以设置发布 DLQ 消息的主题名称,如下所示。
您可以为DlqDestinationResolver
这是一个功能接口。DlqDestinationResolver
需要ConsumerRecord
并将异常作为输入,然后允许指定主题名称作为输出。
通过访问 KafkaConsumerRecord
,则可以在BiFunction
.
下面是一个提供实现的示例DlqDestinationResolver
.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供实现时要记住的一件重要事情DlqDestinationResolver
是 binder 中的 provisioner 不会为应用程序自动创建主题。
这是因为活页夹无法推断出实现可能发送到的所有 DLQ 主题的名称。
因此,如果使用此策略提供 DLQ 名称,则应用程序有责任确保事先创建这些主题。
如果DlqDestinationResolver
作为 bean 存在于应用程序中,具有更高的优先级。
如果不想遵循此方法,而是使用配置提供静态 DLQ 名称,则可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此设置,则错误记录将发送到主题custom-dlq
. 如果应用程序未使用上述任何一种策略,则它将创建一个名称为error.<input-topic-name>.<application-id>
. 例如,如果绑定的目标主题是inputTopic
应用程序 ID 为process-applicationId
,则默认的 DLQ 主题为error.inputTopic.process-applicationId
. 如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建 DLQ 主题。
3.6.2. 每个输入消费者绑定的DLQ
该物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着,如果同一应用程序中有多个函数,则此属性将应用于所有函数。但是,如果单个处理器中有多个处理器或多个输入绑定,则可以使用绑定程序为每个输入使用者绑定提供的更细粒度的 DLQ 控件。
如果您有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ 并在第二个绑定上启用 skipAndContinue,然后您可以在使用者上执行以下作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以这种方式设置反序列化异常处理程序的优先级高于在绑定程序级别设置的优先级。
3.6.3. DLQ 分区
默认情况下,记录使用与原始记录相同的分区发布到死信主题。 这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请添加一个DlqPartitionFunction
实现为@Bean
到应用程序上下文。
只能存在一个这样的豆子。
该函数与消费者组一起提供(在大多数情况下与应用程序 ID 相同),失败ConsumerRecord
和例外。
例如,如果您始终想要路由到分区 0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions 属性设置为 1(以及活页夹的minPartitionCount 等于1 ),无需提供DlqPartitionFunction ;框架将始终使用分区 0。
如果将使用者绑定的dlqPartitions 属性设置为大于1 (或活页夹的minPartitionCount 大于1 ),您必须提供一个DlqPartitionFunction bean,即使分区计数与原始主题的分区计数相同。 |
在 Kafka Streams 绑定器中使用异常处理功能时,需要记住几件事。
-
该物业
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用。 这意味着如果同一应用程序中有多个函数,则此属性将应用于所有函数。 -
反序列化的异常处理与本机反序列化和框架提供的消息转换一致。
3.6.4. 在 Binder 中处理生产异常
与上述对反序列化异常处理程序的支持不同,绑定程序不提供用于处理生产异常的第一类机制。
但是,您仍然可以使用StreamsBuilderFactoryBean
定制器,您可以在下面的后续部分中找到更多详细信息。
3.7. 重试关键业务逻辑
在某些情况下,您可能希望重试业务逻辑中对应用程序至关重要的部分。
可能会对关系数据库进行外部调用,或者从 Kafka Streams 处理器调用 REST 端点。
这些调用可能会因各种原因而失败,例如网络问题或远程服务不可用。
更常见的是,如果您可以重试,这些故障可能会自行解决。
默认情况下,Kafka Streams 绑定器会创建RetryTemplate
所有输入绑定的 bean。
如果函数具有以下签名,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
和默认绑定名称,RetryTemplate
将注册为process-in-0-RetryTemplate
.
这遵循绑定名称 (process-in-0
)后跟文字-RetryTemplate
.
在多个输入绑定的情况下,将有一个单独的RetryTemplate
每个绑定可用的 bean。
如果有自定义RetryTemplate
应用程序中可用的 bean 并通过spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
,则该属性优先于任何输入绑定级别重试模板配置属性。
一旦RetryTemplate
从绑定注入到应用程序中,它可用于重试应用程序的任何关键部分。下面是一个示例:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者您可以使用自定义RetryTemplate
如下。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
请注意,当重试用尽时,默认情况下,将抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,则可以将 RecoveryCallback 添加到execute
方法: 这是一个例子。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关 RetryTemplate、重试策略、退避策略等的更多信息,请参阅 Spring Retry 项目。
3.8. 状态商店
当使用高级 DSL 并进行适当的调用以触发状态存储时,Kafka Streams 会自动创建状态存储。
如果您想实现传入的KTable
绑定为命名状态存储,则可以使用以下策略来执行此作。
假设您有以下功能。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,传入的KTable
数据将具体化到命名状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在应用程序中将自定义状态存储定义为 bean,这些状态存储将被 binder 检测并添加到 Kafka Streams 构建器中。 特别是在使用处理器 API 时,需要手动注册状态存储。 为此,您可以在应用程序中将 StateStore 创建为 bean。 以下是定义此类 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然后,应用程序可以直接访问这些状态存储。
在引导过程中,上述 bean 将由绑定器处理并传递给 Streams 构建器对象。
访问状态存储:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
在注册全局状态存储时,这将不起作用。
要注册全局状态存储,请参阅下面有关自定义的部分StreamsBuilderFactoryBean
.
3.9. 交互式查询
Kafka Streams 绑定器 API 公开了一个名为InteractiveQueryService
以交互方式查询状态存储。
您可以在应用程序中以 Spring Bean 的形式访问它。从应用程序访问此 Bean 的一种简单方法是autowire
豆子。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您获得了对此 bean 的访问权限,您就可以查询您感兴趣的特定状态存储。见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,上述检索存储的方法调用可能会失败。 例如,它可能仍在初始化状态存储的过程中。 在这种情况下,重试此作会很有用。 Kafka Streams 绑定器提供了一个简单的重试机制来适应这一点。
下面是可用于控制此重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为
1
. -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为
1000
毫秒。
如果有多个 kafka 流应用程序实例正在运行,那么在以交互方式查询它们之前,您需要确定哪个应用程序实例托管了您要查询的特定密钥。InteractiveQueryService
API 提供了用于识别主机信息的方法。
为了使其正常工作,您必须配置属性application.server
如下:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有关这些主机查找方法的更多信息,请参阅有关这些方法的 Javadoc。 对于这些方法,在启动期间,如果底层 KafkaStreams 对象尚未准备就绪,它们可能会抛出异常。 上述重试属性也适用于这些方法。
3.9.1. 通过 InteractiveQueryService 提供的其他 API 方法
使用以下 API 方法检索KeyQueryMetadata
与给定存储和键的组合相关联的对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法检索KakfaStreams
与给定存储和键的组合相关联的对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
3.9.2. 自定义存储查询参数
有时需要先微调 store 查询参数,然后才能通过InteractiveQueryService
.
为此,从4.0.1
版本的 Binder 中,你可以为StoreQueryParametersCustomizer
这是一个功能接口,具有customize
采用StoreQueryParameter
作为论据。
这是它的方法签名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
使用此方法,应用程序可以进一步自定义StoreQueryParameters
例如启用陈旧的商店。
当此 bean 存在于此应用程序中时,InteractiveQueryService
会调用其customize
方法。
请记住,必须有一个唯一的 beanStoreQueryParametersCustomizer 在应用程序中可用。 |
3.10. 健康指标
运行状况指示器需要依赖项spring-boot-starter-actuator
.对于 maven 使用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器来检查底层流线程的状态。
Spring Cloud Stream 定义了一个属性management.health.binders.enabled
以启用运行状况指示器。请参阅 Spring Cloud Stream 文档。
运行状况指示器为每个流线程的元数据提供以下详细信息:
-
线程名称
-
线程状态:
CREATED
,RUNNING
,PARTITIONS_REVOKED
,PARTITIONS_ASSIGNED
,PENDING_SHUTDOWN
或DEAD
-
活动任务:任务 ID 和分区
-
备用任务:任务 ID 和分区
默认情况下,只有全局状态可见 (UP
或DOWN
).要显示详细信息,属性management.endpoint.health.show-details
必须设置为ALWAYS
或WHEN_AUTHORIZED
.
有关运行状况信息的更多详细信息,请参阅 Spring Boot Actuator 文档。
运行状况指示器的状态为UP 如果注册的所有 Kafka 线程都在RUNNING 州。 |
由于 Kafka Streams 绑定器中有三个单独的绑定器 (KStream
,KTable
和GlobalKTable
),它们都将报告健康状态。
启用时show-details
,报告的一些信息可能是多余的。
当同一应用程序中存在多个 Kafka Streams 处理器时,将报告所有处理器的运行状况检查,并按 Kafka Streams 的应用程序 ID 进行分类。
3.11. 访问 Kafka Streams 指标
Spring Cloud Stream Kafka Streams 绑定器提供可以通过千分尺导出的 Kafka Streams 指标MeterRegistry
.
对于 Spring Boot 版本 2.2.x,度量支持是通过绑定程序的自定义 Micrometer 度量实现提供的。 对于 Spring Boot 版本 2.3.x,Kafka Streams 指标支持是通过 Micrometer 本机提供的。
通过启动执行器端点访问指标时,请确保将metrics
前往酒店management.endpoints.web.exposure.include
.
然后你可以访问/acutator/metrics
以获取所有可用指标的列表,然后可以通过相同的 URI (/actuator/metrics/<metric-name>
).
3.12. 混合使用高级 DSL 和低级处理器 API
Kafka Streams 提供了两种 API 变体。
它有一个更高级别的 DSL 类似 API,您可以在其中链接许多功能程序员可能熟悉的各种作。
Kafka Streams 还允许访问低级处理器 API。
处理器 API 虽然非常强大,并且能够在低得多的级别上控制事物,但本质上是势在必行的。
用于 Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级 DSL 或混合使用 DSL 和处理器 API。
混合使用这两种变体可以为您提供许多选项来控制应用程序中的各种用例。
应用程序可以使用transform
或process
方法 API 调用来访问处理器 API。
下面是如何使用process
应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
这是一个使用transform
应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
这process
API 方法调用是一个终端作,而transform
API 是非终端的,可为您提供潜在的转换KStream
使用它可以继续使用 DSL 或处理器 API 进行进一步处理。
3.13. 出站分区支持
Kafka Streams 处理器通常将处理后的输出发送到出站 Kafka 主题中。
如果出站主题是分区的,并且处理器需要将传出数据发送到特定分区中,那么应用程序需要提供类型为StreamPartitioner
.
有关更多详细信息,请参阅 StreamPartitioner。
让我们看一些例子。
这是我们已经多次看到的同一个处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
以下是输出绑定目标:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题outputTopic
有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认的分区策略,这可能不是您想要的结果,具体取决于特定用例。
假设,您想发送任何匹配的密钥spring
对 0 进行分区,cloud
到分区 1,stream
分区 2,其他所有内容都分区 3。
这是您需要在应用程序中执行的作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是,您可以访问记录的键/值、主题名称和分区总数。 因此,如果需要,您可以实现复杂的分区策略。
您还需要提供此 Bean 名称以及应用程序配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。
3.14. StreamsBuilderFactoryBean 其他自定义
通常需要自定义StreamsBuilderFactoryBean
这会创建KafkaStreams
对象。
基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean
.
您可以使用org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer
从 Spring for Apache Kafka 项目中自定义/配置StreamsBuilderFactoryBean
本身。
下面是使用StreamsBuilderFactoryBeanConfigurer
.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上面显示了您可以执行的作,以配置StreamsBuilderFactoryBean
.
您基本上可以从StreamsBuilderFactoryBean
以配置它。
此配置器将在工厂 Bean 启动之前由 Binder 调用。
一旦您访问了StreamsBuilderFactoryBean
,您还可以自定义底层KafkaStreams
对象通过KafkaStreamsCustomizer
.
这是这样做的蓝图。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将被StreamsBuilderFactoryBean
就在基础KafkaStreams
开始。
只能有一个StreamsBuilderFactoryBeanConfigurer
在整个应用程序中。
那么我们如何考虑多个 Kafka Streams 处理器,因为它们中的每个处理器都由单独备份StreamsBuilderFactoryBean
对象?
在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些筛选器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
3.14.1. 使用定制器注册全局状态存储
如上所述,绑定器不提供将全局状态存储注册为功能的第一类方法。 为此,您需要使用定制器。 这是如何做到这一点的。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
同样,如果您有多个处理器,则需要将全局状态存储附加到右侧StreamsBuilder
通过过滤掉另一个StreamsBuilderFactoryBean
使用如上所述的应用程序 ID 的对象。
3.14.2. 使用定制器注册生产异常处理程序
在错误处理部分,我们指出 binder 没有提供处理生产异常的第一类方法。
尽管如此,您仍然可以使用StreamsBuilderFacotryBean
customizer 来注册生产异常处理程序。见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
同样,如果您有多个处理器,您可能需要根据正确的处理器将其适当地设置为StreamsBuilderFactoryBean
.
您也可以使用配置属性添加此类生产异常处理程序(有关更多信息,请参阅下文),但如果您选择使用编程方法,则这是一个选项。
3.15. 时间戳提取器
Kafka Streams 允许您根据各种时间戳概念控制使用者记录的处理。
默认情况下,Kafka Streams 会提取嵌入在使用者记录中的时间戳元数据。
您可以通过提供不同的TimestampExtractor
每个输入绑定的实现。
以下是有关如何做到这一点的一些详细信息。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后你设置上面的TimestampExtractor
每个消费者绑定的 Bean 名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果跳过输入使用者绑定来设置自定义时间戳提取器,则该使用者将使用默认设置。
3.16. 具有基于 Kafka Streams 的绑定器和常规 Kafka 绑定器的多绑定器
您可以拥有一个应用程序,其中既有基于常规 Kafka 绑定器的功能/使用者/提供商,又有基于 Kafka Streams 的处理器。 但是,您不能在单个函数或消费者中混合使用它们。
下面是一个示例,在同一应用程序中具有两个基于 Binder 的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置中的相关部分:
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您拥有与上述相同的应用程序,但正在处理两个不同的 Kafka 集群,例如常规process
同时作用于 Kafka 集群 1 和集群 2(从 cluster-1 接收数据并发送到 cluster-2),并且 Kafka Streams 处理器作用于 Kafka 集群 2。
然后你必须使用 Spring Cloud Stream 提供的多绑定器工具。
下面是配置在这种情况下可能发生的变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意以上配置。
我们有两种绑定器,但总共有 3 个绑定器,第一种是基于集群 1 (kafka1
),然后是另一个基于集群 2 (kafka2
),最后是kstream
一个 (kafka3
).
应用程序中的第一个处理器从kafka1
并发布到kafka2
其中两个绑定器都基于常规的 Kafka 绑定器,但集群不同。
第二个处理器是 Kafka Streams 处理器,它使用kafka3
与kafka2
,但粘合剂类型不同。
由于 Kafka Streams 系列绑定器中有三种不同的绑定器类型可用 -kstream
,ktable
和globalktable
- 如果您的应用程序具有基于这些绑定中的任何一个的多个绑定,则需要将其显式作为绑定器类型提供。
例如,如果您有如下处理器,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,必须在多活页夹方案中按如下方式配置。 请注意,仅当您有一个真正的多绑定器方案时,才需要这样做,其中有多个处理器在单个应用程序中处理多个集群。 在这种情况下,需要显式为绑定器提供绑定,以区别于其他处理器的绑定器类型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
3.17. 状态清理
默认情况下,停止绑定时不会清理任何本地状态。
这与 Spring Kafka 2.7 版的行为相同。
有关更多详细信息,请参阅 Spring Kafka 文档。
要修改此行为,只需将单个CleanupConfig
@Bean
(配置为在启动、停止或两者都不清理)到应用程序上下文;将检测到 Bean 并将其连接到工厂 Bean。
3.18. Kafka Streams 拓扑可视化
Kafka Streams Binder 提供了以下执行器端点,用于检索拓扑描述,您可以使用这些端点使用外部工具可视化拓扑。
/actuator/kafkastreamstopology
/actuator/kafkastreamstopology/<application-id of the processor>
您需要包含 Spring Boot 中的执行器和 Web 依赖项才能访问这些端点。
此外,您还需要添加kafkastreamstopology
自management.endpoints.web.exposure.include
财产。
默认情况下,kafkastreamstopology
端点已禁用。
3.19. Kafka Streams 应用程序中基于事件类型的路由
Kafka Streams 绑定器不支持基于常规消息通道的绑定程序中可用的路由函数。 但是,Kafka Streams 绑定器仍然通过入站记录上的事件类型记录头提供路由功能。
若要启用基于事件类型的路由,应用程序必须提供以下属性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes
.
这可以是逗号分隔的值。
例如,假设我们有这个函数:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我们还假设,如果传入记录的事件类型为foo
或bar
.
这可以使用以下方式表示eventTypes
属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,活页夹会检查每个传入记录中的标头event_type
并查看它的值是否设置为foo
或bar
.
如果找不到其中任何一个,则将跳过函数执行。
默认情况下,活页夹期望记录头键为event_type
,但可以根据绑定进行更改。
例如,如果我们想将此绑定上的标头键更改为my_event
而不是默认值,可以按如下方式更改。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
.
在 Kafkfa Streams 绑定器中使用事件路由功能时,它使用 byte 数组Serde
反序列化所有传入记录。
如果记录头与事件类型匹配,则只有它使用实际的Serde
使用配置的或推断的Serde
.
如果在绑定上设置反序列化异常处理程序,则会引入问题,因为预期的反序列化仅发生在堆栈中,从而导致意外错误。
为了解决此问题,您可以在绑定上设置以下属性,以强制绑定器使用配置或推断的Serde
而不是字节数组Serde
.
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
这样,应用程序可以在使用事件路由功能时立即检测反序列化问题,并可以做出适当的处理决策。
3.20. Kafka Streams 绑定器中的绑定可视化和控制
从 3.1.2 版本开始,Kafka Streams 绑定器支持绑定可视化和控制。
仅支持的两个生命周期阶段是STOPPED
和STARTED
.
生命周期阶段PAUSED
和RESUMED
在 Kafka Streams 绑定器中不可用。
为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
如果您更喜欢使用 webflux,则可以包含spring-boot-starter-webflux
而不是标准的 Web 依赖项。
此外,您还需要设置以下属性:
management.endpoints.web.exposure.include=bindings
为了进一步说明此功能,让我们使用以下应用程序作为指南:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
正如我们所看到的,该应用程序有两个 Kafka Streams 函数 - 一个是消费者,另一个是函数。
默认情况下,使用者绑定命名为consumer-in-0
.
同样,对于函数,输入绑定为function-in-0
输出绑定为function-out-0
.
应用程序启动后,我们可以使用以下绑定端点找到有关绑定的详细信息。
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
有关所有三个绑定的详细信息可以在上面找到。
现在让我们停止 consumer-in-0 绑定。
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
此时,不会通过此绑定接收任何记录。
重新开始绑定。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
当单个函数上存在多个绑定时,对其中任何一个绑定调用这些作都将起作用。
这是因为单个函数上的所有绑定都由相同的StreamsBuilderFactoryBean
.
因此,对于上述函数,要么function-in-0
或function-out-0
会起作用。
3.21. 手动启动 Kafka Streams 处理器
Spring Cloud Stream Kafka Streams 绑定器提供了一个名为StreamsBuilderFactoryManager
在StreamsBuilderFactoryBean
来自 Apache Kafka 的 Spring。
此管理器 API 用于控制多个StreamsBuilderFactoryBean
基于活页夹的应用程序中的每个处理器。
因此,在使用活页夹时,如果要手动控制各种StreamsBuilderFactoryBean
对象,您需要使用StreamsBuilderFactoryManager
.
您可以使用该属性spring.kafka.streams.auto-startup
并将其设置为false
以关闭处理器的自动启动。
然后,在应用程序中,您可以使用以下内容来启动处理器StreamsBuilderFactoryManager
.
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
当您希望应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常方便。
例如,当有一个需要还原的大型状态存储时,如果处理器像默认情况一样正常启动,这可能会阻止应用程序启动。
如果您正在使用某种活跃度探测机制(例如在 Kubernetes 上),它可能会认为应用程序已关闭并尝试重新启动。
为了纠正此问题,您可以将spring.kafka.streams.auto-startup
自false
并按照上述方法进行作。
请记住,在使用 Spring Cloud Stream 绑定器时,您不会直接处理StreamsBuilderFactoryBean
来自 Apache Kafka 的 Spring,而不是StreamsBuilderFactoryManager
,作为StreamsBuilderFactoryBean
对象由活页夹在内部管理。
3.22. 有选择地手动启动 Kafka Streams 处理器
虽然上面列出的方法将无条件应用自动启动false
通过StreamsBuilderFactoryManager
,通常希望只有单独选择的 Kafka Streams 处理器不自动启动。
例如,假设您的应用程序中有三个不同的函数(处理器),并且对于其中一个处理器,您不希望将其作为应用程序启动的一部分来启动它。
这是这种情况的一个例子。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述方案中,如果将spring.kafka.streams.auto-startup
自false
,则在应用程序启动期间,没有任何处理器将自动启动。
在这种情况下,您必须按照上述方式以编程方式启动它们,方法是调用start()
在基础上StreamsBuilderFactoryManager
.
但是,如果我们有一个用例有选择地仅禁用一个处理器,那么您必须将auto-startup
在该处理器的单个绑定上。
让我们假设我们不想要我们的process3
自动启动功能。
这是一个BiFunction
具有两个输入绑定 -process3-in-0
和process3-in-1
.
为了避免此处理器自动启动,您可以选择这些输入绑定中的任何一个,并将auto-startup
在他们身上。
您选择哪种装订并不重要;如果您愿意,您可以设置auto-startup
自false
在他们两个上,但一个就足够了。
因为它们共享同一个工厂 bean,所以您不必在两个绑定上将 autoStartup 设置为 false,但为了清楚起见,这样做可能是有意义的。
这是可用于禁用此处理器的自动启动的 Spring Cloud Stream 属性。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然后,您可以使用 REST 端点或使用BindingsEndpoint
API 如下所示。
为此,您需要确保对类路径具有 Spring Boot 执行器依赖项。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
有关此机制的更多详细信息,请参阅参考文档中的此部分。
通过禁用来控制绑定时auto-startup 如本节所述,请注意,这仅适用于消费者绑定。换句话说,如果您使用生产者绑定,process3-out-0 ,这在禁用处理器的自动启动方面没有任何影响,尽管此生产者绑定使用相同的StreamsBuilderFactoryBean 作为消费者绑定。 |
3.23. 使用 Spring Cloud Sleuth 进行跟踪
当 Spring Cloud Sleuth 位于基于 Spring Cloud Stream Kafka Streams 绑定程序的应用程序的类路径上时,其使用者和生产者都会自动检测跟踪信息。
但是,为了跟踪任何特定于应用程序的作,需要由用户代码显式检测这些作。
这可以通过注入KafkaStreamsTracing
bean 来自应用程序中的 Spring Cloud Sleuth,然后通过这个注入的 bean 调用各种 Kafka Streams作。
以下是一些使用它的示例。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
在上面的示例中,有两个地方添加了显式跟踪检测。
首先,我们记录传入的键/值信息KStream
.
记录此信息时,也会记录关联的跨度和跟踪 ID,以便监控系统可以跟踪它们并与同一跨度 ID 相关联。
其次,当我们调用map
作,而不是直接在KStream
类,我们将其包装在一个transform
作,然后调用map
从KafkaStreamsTracing
.
在这种情况下,记录的消息也将包含跨度 ID 和跟踪 ID。
这是另一个示例,我们使用低级转换器 API 来访问各种 Kafka Streams 标头。 当 spring-cloud-sleuth 在类路径上时,也可以像这样访问所有跟踪标头。
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}
3.24. 配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关与 binder 相关的常见配置选项和属性,请参阅核心文档。
3.24.1. Kafka Streams Binder 属性
以下属性在活页夹级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.
在 Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性都必须以spring.cloud.stream.kafka.streams.binder
而不是spring.cloud.stream.kafka.binder
.
此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,任一前缀都有效。
- 配置
-
映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以
spring.cloud.stream.kafka.streams.binder.
. 以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfig
Apache Kafka Streams 文档中的 JavaDocs。
您可以从StreamsConfig
可以通过这个设置。
使用此属性时,它适用于整个应用程序,因为这是活页夹级别的属性。
如果应用程序中有多个处理器,则所有这些处理器都将获得这些属性。
对于像application.id
,这将成为问题,因此您必须仔细检查属性如何StreamsConfig
使用此活页夹级别进行映射configuration
财产。
- functions.<function-bean-name>.applicationId
-
仅适用于功能型处理器。 这可用于设置应用程序中每个功能的应用程序 ID。 在多个函数的情况下,这是设置应用程序 ID 的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能型处理器。 映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于活页夹级别
configuration
属性描述,但这个级别的configuration
属性仅针对命名函数进行限制。 当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能需要使用它。 都StreamsConfig
属性可以在此处使用。 - 经纪人
-
代理 URL
违约:
localhost
- zk节点
-
Zookeeper URL
违约:
localhost
- 反序列化ExceptionHandler
-
反序列化错误处理程序类型。 此处理程序在绑定程序级别应用,因此应用于应用程序中的所有输入绑定。 有一种方法可以在消费者绑定级别以更细粒度的方式控制它。 可能的值是 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
违约:
logAndFail
- 应用程序 Id
-
在绑定器级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个函数,则应用程序 ID 应以不同的方式设置。 请参阅上面详细讨论设置应用程序 ID。
默认:应用程序将生成静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大尝试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
尝试在重试时连接到状态存储时的回退期。
默认值:1000 毫秒
- consumer属性
-
活页夹级别的任意使用者属性。
- producer属性
-
绑定器级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当处理器的绑定通过执行器停止时,默认情况下,该处理器将不参与健康检查。 将此属性设置为
true
为所有处理器启用运行状况检查,包括当前通过绑定执行器端点停止的处理器。默认值:false
3.24.2. Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为方便起见,如果有多个输出绑定并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.
.
- keySerde
-
要使用的密钥 Serde
默认值:请参阅上面关于消息反序列化的讨论
- 值Serde
-
值 serde 使用
默认值:请参阅上面关于消息反序列化的讨论
- 使用原生编码
-
标志来启用/禁用本机编码
违约:
true
. - streamPartitionerBeanName (流分区器BeanName)
-
要在使用者处使用的自定义出站分区器 Bean 名称。 应用程序可以提供自定义
StreamPartitioner
作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者以代替默认名称。默认值:请参阅上面有关出站分区支持的讨论。
- 生产作为
-
处理器要生成到的接收器组件的自定义名称。
亲爱的:
none
(由 Kafka Streams 生成)
3.24.3. Kafka Streams 消费者属性
以下属性可用于 Kafka Streams 使用者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.
.
- 应用程序 Id
-
为每个输入绑定设置 application.id。
默认值:见上文。
- keySerde
-
要使用的密钥 Serde
默认值:请参阅上面关于消息反序列化的讨论
- 值Serde
-
值 serde 使用
默认值:请参阅上面关于消息反序列化的讨论
- 具体化为
-
状态存储在使用传入的 KTable 类型时实现
违约:
none
. - 使用原生解码
-
标志来启用/禁用本机解码
违约:
true
. - dlq名称
-
DLQ 主题名称。
默认值:请参阅上文有关错误处理和 DLQ 的讨论。
- 开始偏移量
-
如果没有要使用的已提交偏移量,则从偏移量开始。 这主要用于消费者第一次使用某个主题时。 Kafka Streams 使用
earliest
作为默认策略,活页夹使用相同的默认值。 这可以重写为latest
使用此属性。违约:
earliest
.
注意:使用resetOffsets
对 Kafka Streams 绑定器没有任何影响。
与基于消息通道的绑定器不同,Kafka Streams 绑定器不会寻求按需开始或结束。
- 反序列化ExceptionHandler
-
反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面所述的绑定器级别属性。 可能的值是 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
违约:
logAndFail
- timestampExtractorBeanName
-
要在使用者处使用的特定时间戳提取器 Bean 名称。 应用程序可以提供
TimestampExtractor
作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认名称。默认值:请参阅上面关于时间戳提取器的讨论。
- 事件类型
-
此绑定支持的事件类型的逗号分隔列表。
违约:
none
- eventTypeHeaderKey
-
事件类型标头键。
违约:
event_type
- consumedAs
-
处理器从中消费的源组件的自定义名称。
亲爱的:
none
(由 Kafka Streams 生成)
3.24.4. 关于并发的特别说明
在 Kafka Streams 中,您可以使用num.stream.threads
财产。
这,您可以使用各种configuration
上述 binder、functions、producer 或 consumer 级别下的选项。
您还可以使用concurrency
核心 Spring Cloud Stream 为此目的提供的属性。
使用此功能时,您需要在消费者上使用它。
当有多个输入绑定时,请在第一个输入绑定上设置此值。
例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency
,它将被翻译为num.stream.threads
通过活页夹。
如果您有多个处理器,并且一个处理器定义绑定级别并发,但不定义其他处理器,则那些没有绑定级别并发的处理器将默认返回通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
. 如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认设置。
4. 提示、技巧和Recipes
4.1. 使用 Kafka 的简单 DLQ
4.1.1. 问题陈述
作为开发人员,我想编写一个消费者应用程序来处理来自 Kafka 主题的记录。但是,如果在处理过程中出现一些错误,我不希望应用程序完全停止。相反,我想将错误的记录发送到 DLT(死信主题),然后继续处理新记录。
4.1.2. 解决方案
这个问题的解决方案是使用 Spring Cloud Stream 中的 DLQ 功能。出于本次讨论的目的,让我们假设以下是我们的处理器函数。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常微不足道的函数,它会为它处理的所有记录抛出异常,但您可以采用此函数并将其扩展到任何其他类似情况。
为了将错误的记录发送到 DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
为了激活 DLQ,应用程序必须提供组名称。
匿名使用者无法使用 DLQ 设施。
我们还需要通过设置enableDLQ
Kafka 消费者绑定到true
.
最后,我们可以选择通过提供dlqName
在 Kafka 消费者绑定上,否则默认为error.input-topic.my-group
在这种情况下。
请注意,在上面提供的示例消费者中,有效负载的类型是byte[]
.
默认情况下,Kafka binder 中的 DLQ 生产者需要byte[]
.
如果不是这种情况,那么我们需要提供正确序列化程序的配置。
例如,让我们重写消费者函数,如下所示:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉 Spring Cloud Stream,在写入 DLT 时我们希望如何序列化数据。 下面是此方案的修改配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
4.2. 具有高级重试选项的 DLQ
4.2.1. 问题陈述
这与上面的配方类似,但作为开发人员,我想配置重试的处理方式。
4.2.2. 解决方案
如果您遵循了上述配方,那么当处理遇到错误时,您将获得 Kafka 绑定器中内置的默认重试选项。
默认情况下,活页夹最多停用 3 次尝试,初始延迟为 1 秒,每次回退为 2.0 乘数,最大延迟为 10 秒。 您可以按如下方式更改所有这些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果需要,还可以通过提供布尔值映射来提供可重试异常的列表。 例如
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,将重试上面映射中未列出的任何异常。如果不需要,那么您可以通过提供
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您也可以提供自己的RetryTemplate
并将其标记为@StreamRetryTemplate
这将被活页夹扫描和使用。当您需要更复杂的重试策略和策略时,这很有用。
如果您有多个@StreamRetryTemplate
bean,那么你可以使用属性
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
4.3. 使用 DLQ 处理反序列化错误
4.3.1. 问题陈述
我有一个处理器在 Kafka 消费者中遇到反序列化异常。我希望 Spring Cloud Stream DLQ 机制会捕获这种情况,但它没有。我该如何处理这个问题?
4.3.2. 解决方案
当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的正常 DLQ 机制将无济于事。
这是因为,此异常甚至发生在消费者的poll()
方法返回。
Spring for Apache Kafka 项目提供了一些很好的方法来帮助 Binder 解决这种情况。
让我们来探讨一下。
假设这是我们的函数:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个简单的函数,需要一个String
参数。
我们想绕过 Spring Cloud Stream 提供的消息转换器,而是想使用本机反序列化器。
在以下情况下String
类型,这没有多大意义,但对于更复杂的类型,如 AVRO 等,您必须依赖外部解序列化器,因此希望将转换委托给 Kafka。
现在,当消费者收到数据时,让我们假设有一个导致反序列化错误的错误记录,也许有人传递了Integer
而不是String
例如。
在这种情况下,如果您不在应用程序中执行某些作,则异常将通过链传播,并且您的应用程序最终将退出。
为了处理这个问题,您可以添加一个ListenerContainerCustomizer
@Bean
配置DefaultErrorHandler
.
这DefaultErrorHandler
配置了DeadLetterPublishingRecoverer
.
我们还需要配置一个ErrorHandlingDeserializer
对于消费者。
这听起来像是很多复杂的事情,但实际上,在这种情况下,它归结为这 3 个豆子。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们逐一分析一下。
第一个是ListenerContainerCustomizer
需要DefaultErrorHandler
.
容器现在已使用该特定错误处理程序进行自定义。
您可以在此处了解有关容器定制的更多信息。
第二个 bean 是DefaultErrorHandler
配置了发布到DLT
.
有关更多详细信息,请参阅此处DefaultErrorHandler
.
第三个 bean 是DeadLetterPublishingRecoverer
最终负责发送到DLT
.
默认情况下,DLT
topic 被命名为ORIGINAL_TOPIC_NAME。分布式账本技术。
不过你可以改变这一点。
有关更多详细信息,请参阅文档。
我们还需要通过应用程序配置配置 ErrorHandlingDeserializer。
这ErrorHandlingDeserializer
委托给实际的解序列化程序。
如果出现错误,它会将记录的键/值设置为空,并包含消息的原始字节。
然后,它在标头中设置异常,并将此记录传递给侦听器,然后侦听器调用已注册的错误处理程序。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我们正在提供ErrorHandlingDeserializer
通过configuration
属性。
我们还指出,要委托的实际反序列化程序是StringDeserializer
.
请记住,上述 dlq 属性都与此配方中的讨论无关。 它们纯粹用于解决任何应用程序级错误。
4.4. Kafka Binder 中的基本偏移量管理
4.4.1. 问题陈述
我想编写一个 Spring Cloud Stream Kafka 消费者应用程序,但不确定它如何管理 Kafka 消费者偏移量。 你能解释一下吗?
4.4.2. 解决方案
我们鼓励您阅读有关此内容的文档部分以全面了解它。
这是它的要点:
Kafka 默认支持两种类型的偏移量开始 -earliest
和latest
.
从他们的名字来看,它们的语义是不言自明的。
假设您是第一次运行消费者。
如果您错过了 Spring Cloud Stream 应用程序中的 group.id,那么它就会成为匿名消费者。
无论何时,您有一个匿名消费者,在这种情况下,Spring Cloud Stream 应用程序默认将从latest
主题分区中的可用偏移量。
另一方面,如果显式指定 group.id,则默认情况下,Spring Cloud Stream 应用程序将从earliest
主题分区中的可用偏移量。
在上述两种情况下(具有显式组和匿名组的使用者),可以使用属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
并将其设置为earliest
或latest
.
现在,假设您之前已经运行了消费者,现在又开始了它。
在这种情况下,上述情况下的起始偏移量语义不适用,因为使用者会为使用者组找到已提交的偏移量(对于匿名使用者,尽管应用程序不提供 group.id,但绑定器会自动为您生成一个)。
它只是从最后一个承诺的偏移量开始。
这是真的,即使你有一个startOffset
提供的值。
但是,您可以使用resetOffsets
财产。
为此,请设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
自true
(即false
默认情况下)。
然后确保提供startOffset
值(earliest
或latest
).
当您执行此作,然后启动使用者应用程序时,每次启动时,它都会像第一次启动一样启动,并忽略分区的任何已提交偏移量。
4.5. 在 Kafka 中寻求任意偏移
4.5.1. 问题陈述
使用 Kafka 绑定器,我知道它可以将偏移量设置为earliest
或latest
,但我有一个要求,即寻求中间某物的偏移量,一个任意偏移量。
有没有办法使用 Spring Cloud Stream Kafka 绑定器来实现这一点?
4.5.2. 解决方案
之前我们了解了 Kafka binder 如何允许您处理基本的偏移量管理。 默认情况下,活页夹不允许您倒带到任意偏移量,至少通过我们在该配方中看到的机制是这样。 但是,活页夹提供了一些低级策略来实现此用例。 让我们来探讨一下它们。
首先,当您想要重置为任意偏移量时,除了earliest
或latest
,请务必将resetOffsets
配置设置为其默认值,即false
.
然后,您必须提供类型为KafkaBindingRebalanceListener
,这将被注入到所有消费者绑定中。
这是一个带有一些默认方法的界面,但这是我们感兴趣的方法:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们看看细节。
从本质上讲,每次在主题分区的初始分配期间或重新平衡之后都会调用此方法。
为了更好地说明,让我们假设我们的主题是foo
它有 4 个分区。
最初,我们只启动组中的单个使用者,并且该使用者将从所有分区中使用。
当使用者首次启动时,所有 4 个分区都会被初始分配。
但是,我们不想启动要以默认值 (earliest
由于我们定义了一个组),而不是对于每个分区,我们希望它们在寻求任意偏移量后使用。
假设您有一个业务案例要从某些偏移中使用,如下所示。
Partition start offset
0 1000
1 2000
2 2000
3 1000
这可以通过实现上述方法来实现,如下所示。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个基本的实现。
现实世界的用例比这复杂得多,您需要进行相应的调整,但这肯定会为您提供基本的草图。
当消费者seek
失败,它可能会抛出一些运行时异常,您需要决定在这些情况下该怎么做。
4.5.3. 如果我们启动具有相同组 ID 的第二个消费者会怎样?
当我们添加第二个消费者时,将发生重新平衡,并且一些分区将被移动。
假设新的使用者获得分区2
和3
.
当这个新的 Spring Cloud Stream 消费者调用onPartitionsAssigned
方法,它将看到这是分区的初始赋值2
和3
在这个消费者身上。
因此,它将执行寻道作,因为对initial
论点。
对于第一个消费者,它现在只有分区0
和1
但是,对于该消费者来说,这只是一个重新平衡事件,不被视为初始分配。
因此,它不会重新搜索给定的偏移量,因为对initial
论点。
4.6. 如何使用 Kafka binder 手动确认?
4.6.1. 问题陈述
使用 Kafka 绑定器,我想手动确认使用者中的消息。 我该怎么做?
4.6.2. 解决方案
默认情况下,Kafka 绑定器委托给 Spring for Apache Kafka 项目中的默认提交设置。
默认值ackMode
在Spring,卡夫卡是batch
.
有关这方面的更多详细信息,请参阅此处。
在某些情况下,您希望禁用此默认提交行为并依赖手动提交。 以下步骤允许您做到这一点。
设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode
设置为MANUAL
或MANUAL_IMMEDIATE
.
当它设置为这样时,就会有一个名为kafka_acknowledgment
(从KafkaHeaders.ACKNOWLEDGMENT
) 存在于消费者方法接收的消息中。
例如,将其想象为您的消费者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后,您将属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode
自MANUAL
或MANUAL_IMMEDIATE
.
4.7. 如何覆盖 Spring Cloud Stream 中的默认绑定名称?
4.7.1. 问题陈述
Spring Cloud Stream 根据函数定义和签名创建默认绑定,但是如何将它们覆盖为更友好的域名呢?
4.7.2. 解决方案
假设下面是函数签名。
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream 将创建如下绑定。
-
0 中的大写
-
大写-0
可以使用以下属性将这些绑定重写到某些内容。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
在此之后,必须对新名称创建所有绑定属性,my-transformer-in
和my-transformer-out
.
这是另一个使用 Kafka Streams 和多个输入的示例。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称。
-
流程订单 in 0
-
流程订购合1
-
流程订单出 0
每次要在这些绑定上设置一些配置时,都必须使用这些绑定名称。 您不喜欢这样,并且希望使用更对域友好且可读的绑定名称,例如,类似的东西。
-
订单
-
帐户
-
enrichedOrders
只需设置这三个属性即可轻松做到这一点
-
spring.cloud.stream.function.bindings.processOrder-in-0=订单
-
spring.cloud.stream.function.bindings.processOrder-in-1=账户
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
执行此作后,它将覆盖默认绑定名称,并且要在其上设置的任何属性都必须位于这些新绑定名称上。
4.8. 如何发送消息密钥作为记录的一部分?
4.8.1. 问题陈述
我需要将密钥与记录的有效负载一起发送,有没有办法在 Spring Cloud Stream 中做到这一点?
4.8.2. 解决方案
通常需要将关联数据结构(如地图)作为具有键和值的记录发送。 Spring Cloud Stream 允许您以简单的方式做到这一点。 以下是执行此作的基本蓝图,但您可能希望将其调整为特定用例。
这是示例生产者方法(又名Supplier
).
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
这是一个微不足道的函数,它发送一条带有String
有效载荷,但也带有密钥。
请注意,我们将键设置为消息头KafkaHeaders.MESSAGE_KEY
.
如果要更改默认键kafka_messageKey
,那么在配置中,我们需要指定这个属性:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称supplier-out-0
由于这是我们的函数名称,请相应更新。
然后,我们在生成消息时使用这个新键。
4.9. 如何使用本机序列化器和反序列化器而不是 Spring Cloud Stream 完成的消息转换?
4.9.1. 问题陈述
我想在 Kafka 中使用本机序列化器和反序列化器,而不是使用 Spring Cloud Stream 中的消息转换器。 默认情况下,Spring Cloud Stream 使用其内部内置消息转换器来处理此转换。 如何绕过这一点并将责任委托给 Kafka?
4.9.2. 解决方案
这真的很容易做到。
您所要做的就是提供以下属性以启用本机序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,您还需要设置序列化程序。 有几种方法可以做到这一点。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或使用活页夹配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用绑定方式时,它适用于所有绑定,而在绑定处设置它们是每个绑定。
在反序列化方面,您只需提供反序列化器作为配置。
例如
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您还可以在活页夹级别设置它们。
可以设置一个可选属性来强制本机解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
但是,对于 Kafka 绑定器,这是不必要的,因为当它到达绑定器时,Kafka 已经使用配置的反序列化器对它们进行了反序列化。
4.10. 解释偏移重置在 Kafka Streams 绑定器中的工作原理
4.10.1. 问题陈述
默认情况下,Kafka Streams 绑定器始终从新使用者的最早偏移量开始。 有时,应用程序从最新的偏移量开始是有益的或要求的。 Kafka Streams 绑定器允许您做到这一点。
4.10.2. 解决方案
在我们查看解决方案之前,让我们先看看以下场景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个BiConsumer
需要两个输入绑定的 bean。
在这种情况下,第一个绑定是针对KStream
第二个是针对KTable
.
首次运行此应用程序时,默认情况下,两个绑定都从earliest
抵消。
我想从latest
由于某些要求而抵消?
您可以通过启用以下属性来执行此作。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您只想从一个绑定开始latest
offset 和另一个与默认值的消费者earliest
,则将后者从配置中保留绑定。
请记住,一旦存在已提交偏移量,这些设置将不会被接受,并且已提交偏移量优先。
4.11. 跟踪 Kafka 中记录的成功发送(生成)
4.11.1. 问题陈述
我有一个 Kafka 生产者应用程序,我想跟踪我所有成功的发送。
4.11.2. 解决方案
让我们假设我们在应用程序中有以下提供商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的MessageChannel
bean 来捕获所有成功的发送信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用程序配置中定义此属性,以提供recordMetadataChannel
.
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,成功发送的信息将发送到fooRecordChannel
.
您可以编写一个IntegrationFlow
如下所示,以查看信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在handle
方法,有效负载是发送到 Kafka 的内容,消息头包含一个名为kafka_recordMetadata
.
它的值是RecordMetadata
包含有关主题分区、当前偏移量等的信息。
4.12. 在 Kafka 中添加自定义标头映射器
4.12.1. 问题陈述
我有一个 Kafka 生产者应用程序,它设置了一些标头,但消费者应用程序中缺少它们。为什么?
4.12.2. 解决方案
一般情况下,这应该没问题。
想象一下,你有以下生产商。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者方面,您仍然应该看到标头“foo”,并且以下内容应该不会给您带来任何问题。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在应用程序中提供自定义标头映射器,则这将不起作用。
假设您有一个空的KafkaHeaderMapper
在应用程序中。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果这是你的实现,那么你会错过foo
标头。
很有可能,你可能在其中有一些逻辑KafkaHeaderMapper
方法。
您需要以下内容来填充foo
页眉。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这将正确填充foo
标头。
4.12.3. 关于 id 标头的特别说明
在 Spring Cloud Stream 中,id
header 是一个特殊的标头,但某些应用程序可能希望具有特殊的自定义 ID 标头 - 类似于custom-id
或ID
或Id
. 第一个 (custom-id
) 将在没有任何自定义标头映射器的情况下从生产者传播到消费者。但是,如果您使用保留的框架变体进行生产id
header - 例如ID
,Id
,iD
等等。那么你会遇到框架内部的问题。请参阅此 StackOverflow 线程,了解有关此用例的更多上下文。在这种情况下,您必须使用自定义KafkaHeaderMapper
以映射区分大小写的 id 标头。例如,假设您有以下生产者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
标题Id
以上将从消费方消失,因为它与框架发生冲突id
页眉。 可以提供自定义KafkaHeaderMapper
来解决这个问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
通过这样做,两者id
和Id
标头将从生产者到消费者端可用。
4.13. 在事务中生成多个主题
4.13.2. 解决方案
在 Kafka 绑定器中使用事务支持,然后提供AfterRollbackProcessor
.
为了生成多个主题,请使用StreamBridge
应用程序接口。
以下是为此的代码片段:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
4.13.3. 所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
为了进行测试,您可以使用以下内容:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要注意事项:
请确保您在应用程序配置上没有任何 DLQ 设置,因为我们手动配置 DLT(默认情况下,它将发布到名为input.DLT
基于初始消费者函数)。
此外,重置maxAttempts
在消费者绑定到1
以避免绑定器重试。
在上面的示例中,它将最多尝试 3 次(初始尝试 + 在FixedBackoff
).
有关如何测试此代码的更多详细信息,请参阅 StackOverflow 线程。
如果您使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将isolation-level
在消费者绑定到read-committed
.
这个 StackOverflow 线程也与此讨论有关。
4.14. 运行多个可轮询消费者时要避免的陷阱
4.14.1. 问题陈述
如何运行可轮询消费者的多个实例并生成唯一的client.id
对于每个实例?
4.14.2. 解决方案
假设我有以下定义:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
运行应用程序时,Kafka 消费者会生成一个 client.id(类似于consumer-my-group-1
). 对于正在运行的应用程序的每个实例,此client.id
将是相同的,从而导致意外问题。
为了解决此问题,可以在应用程序的每个实例上添加以下属性:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有关更多详细信息,请参阅此 GitHub 问题。