参考指南
本指南介绍了 Spring Cloud Stream Binder 的 Apache Kafka 实现。 它包含有关其设计、用法和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定构造的信息。 此外,本指南还介绍了 Spring Cloud Stream 的 Kafka Streams 绑定能力。
1. Apache Kafka 绑定器
1.1. 用法
要使用 Apache Kafka 绑定器,您需要将spring-cloud-stream-binder-kafka
作为 Spring Cloud Stream 应用程序的依赖项,如以下 Maven 示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka Starter,如以下 Maven 示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图显示了 Apache Kafka 绑定程序如何运行的简化图:

Apache Kafka Binder 实现将每个目标映射到 Apache Kafka 主题。 消费者组直接映射到相同的 Apache Kafka 概念。 分区也直接映射到 Apache Kafka 分区。
该绑定器当前使用 Apache Kafkakafka-clients
版本2.3.1
.
此客户端可以与较旧的代理通信(请参阅 Kafka 文档),但某些功能可能不可用。
例如,对于低于 0.11.x.x 的版本,不支持本机标头。
此外,0.11.x.x 不支持autoAddPartitions
财产。
1.3. 配置选项
本节包含 Apache Kafka 绑定器使用的配置选项。
有关与 binder 相关的常见配置选项和属性,请参阅核心文档。
1.3.1. Kafka Binder 属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 绑定器连接到的代理列表。
违约:
localhost
. - spring.cloud.stream.kafka.binder.defaultBrokerPort 的
-
brokers
允许指定带或不带端口信息的主机(例如,host1,host2:port2
). 当代理列表中未配置端口时,这将设置默认端口。违约:
9092
. - spring.cloud.stream.kafka.binder.configuration
-
客户端属性(生产者和使用者)的键/值映射传递给绑定器创建的所有客户端。 由于生产者和使用者都使用这些属性,因此使用应限制为公共属性,例如安全设置。 通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。 此处的属性将取代启动中设置的任何属性。
默认值:空地图。
- spring.cloud.stream.kafka.binder.consumer属性
-
任意 Kafka 客户端使用者属性的键/值映射。 除了支持已知的 Kafka 使用者属性外,这里还允许未知的使用者属性。 此处的属性将取代在启动和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.headers
-
由活页夹传输的自定义标头列表。仅当与较旧的应用程序(⇐ 1.3.x)通信时才需要
kafka-clients
版本 < 0.11.0.0。较新的版本原生支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.health超时
-
等待获取分区信息的时间(以秒为单位)。如果此计时器过期,运行状况将报告为关闭。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的确认数。请参阅生产者的 Kafka 文档
acks
财产。违约:
1
. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在以下情况下有效
autoCreateTopics
或autoAddPartitions
已设置。绑定器在其生成或使用数据的主题上配置的全局最小分区数。它可以被partitionCount
的设置或通过instanceCount * concurrency
生产者的设置(如果其中任何一个较大)。违约:
1
. - spring.cloud.stream.kafka.binder.producer属性
-
任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性外,这里还允许未知的生产者属性。此处的属性取代在引导和
configuration
属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
自动创建主题的复制因子
autoCreateTopics
处于活动状态。可以在每个绑定上覆盖。违约:
1
. - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true
,则活页夹会自动创建新主题。如果设置为false
,则 Binder 依赖于已配置的主题。在后一种情况下,如果主题不存在,则 Binder 无法启动。此设置独立于 auto.create.topics.enable
代理的设置,并且不会影响它。如果服务器设置为自动创建主题,则可以使用默认代理设置将它们作为元数据检索请求的一部分创建。违约:
true
. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,则活页夹会根据需要创建新分区。如果设置为false
,则绑定程序依赖于已配置的主题的分区大小。如果目标主题的分区计数小于预期值,则绑定程序无法启动。违约:
false
. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在活页夹中启用事务。 看
transaction.id
在 Kafka 文档中,在spring-kafka
文档。 启用交易后,单个producer
属性被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能。默认值
null
(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务性活页夹中生产者的全局生产者属性。 看
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和 Kafka 生产者属性以及所有绑定器支持的通用生产者属性。默认值:查看各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
的 bean 名称
KafkaHeaderMapper
用于映射spring-messaging
header 进出 Kafka 标头。 例如,如果您希望自定义BinderHeaderMapper
对标头使用 JSON 反序列化的 bean。 如果此自定义BinderHeaderMapper
使用此属性的 Bean 不可供 Binder 使用,则 Binder 将查找名称为kafkaBinderHeaderMapper
即BinderHeaderMapper
在回退到默认值之前BinderHeaderMapper
由活页夹创建。默认值:无。
1.3.2. Kafka 消费者属性
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.consumer.<property>=<value> . |
以下属性仅适用于 Kafka 使用者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.
.
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且将在将来的版本中删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且将在将来的版本中删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且将在将来的版本中删除对它的支持。 - autoRebalance启用
-
什么时候
true
,主题分区将在消费者组的成员之间自动重新平衡。 什么时候false
,每个消费者都会根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
. 这需要同时spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。的值spring.cloud.stream.instanceCount
在这种情况下,属性通常必须大于 1。违约:
true
. - ackEach记录
-
什么时候
autoCommitOffset
是true
,则此设置规定是否在处理每条记录后提交偏移量。默认情况下,偏移量将在consumer.poll()
已处理。轮询返回的记录数可以通过max.poll.records
Kafka 属性,通过使用者设置configuration
财产。 将此设置为true
可能会导致性能下降,但这样做会降低发生故障时重新传递记录的可能性。 另请参阅活页夹requiredAcks
属性,这也会影响提交偏移量的性能。违约:
false
. - 自动提交偏移
-
是否在处理消息时自动提交偏移量。 如果设置为
false
,带有键kafka_acknowledgment
的类型org.springframework.kafka.support.Acknowledgment
标头存在于入站消息中。 应用程序可以使用此标头来确认消息。 有关详细信息,请参阅示例部分。 当此属性设置为false
,Kafka 绑定器将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
应用程序负责确认记录。 另请参阅ackEachRecord
.违约:
true
. - 自动提交错误
-
仅在以下情况下有效
autoCommitOffset
设置为true
. 如果设置为false
,它会抑制导致错误的消息的自动提交,并且仅对成功的消息进行提交。它允许流从上次成功处理的消息中自动重放,以防持续失败。 如果设置为true
,它总是自动提交(如果启用了自动提交)。 如果未设置(默认值),则它实际上具有与enableDlq
,如果错误消息被发送到 DLQ,则自动提交错误消息,否则不提交错误消息。默认值:未设置。
- 重置偏移量
-
是否将使用者上的偏移量重置为 startOffset 提供的值。 如果
KafkaRebalanceListener
提供;请参阅使用 KafkaRebalanceListener。违约:
false
. - 开始偏移量
-
新组的起始偏移量。 允许的值:
earliest
和latest
. 如果为消费者“绑定”显式设置了使用者组(通过spring.cloud.stream.bindings.<channelName>.group
),'startOffset' 设置为earliest
.否则,它设置为latest
对于anonymous
消费者群体。 另请参阅resetOffsets
(在此列表中的前面)。默认值:null(相当于
earliest
). - 启用Dlq
-
当设置为 true 时,它会为使用者启用 DLQ 行为。 默认情况下,导致错误的邮件将转发到名为
error.<destination>.<group>
. 可以通过设置dlqName
财产。 这为更常见的 Kafka 重放场景提供了另一种选择,适用于错误数量相对较少且重放整个原始主题可能过于繁琐的情况。 请参阅死信主题处理处理以获取更多信息。 从 V2.0 开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topic
,x-exception-message
和x-exception-stacktrace
如byte[]
. 缺省情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。 有关如何更改该行为,请参阅死信主题分区选择。不允许在以下情况下使用destinationIsPattern
是true
.违约:
false
. - dlq分区
-
什么时候
enableDlq
为 true,并且未设置此属性,则创建与主主题具有相同分区数的死信主题。 通常,死信记录将发送到死信主题中与原始记录相同的分区。 此行为可以更改;请参阅死信主题分区选择。 如果此属性设置为1
并且没有DqlPartitionFunction
bean,所有死信记录都将写入分区0
. 如果此属性大于1
,您必须提供一个DlqPartitionFunction
豆。 请注意,实际分区计数受活页夹的minPartitionCount
财产。违约:
none
- 配置
-
映射包含通用 Kafka 使用者属性的键/值对。 除了具有 Kafka 消费者属性外,还可以在此处传递其他配置属性。 例如,应用程序所需的某些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
.默认值:空地图。
- dlq名称
-
要接收错误消息的 DLQ 主题的名称。
默认值:null(如果未指定,则导致错误的消息将转发到名为
error.<destination>.<group>
). - dlqProducer属性
-
使用此功能,可以设置特定于 DLQ 的生产者属性。 通过 kafka 生产者属性提供的所有属性都可以通过此属性进行设置。 当在消费者上启用本机解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以以下形式提供
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
.默认值:默认 Kafka 生产者属性。
- 标准标头
-
指示入站通道适配器填充的标准标头。 允许的值:
none
,id
,timestamp
或both
. 如果使用本机反序列化并且接收消息的第一个组件需要id
(例如配置为使用 JDBC 消息存储的聚合器)。违约:
none
- 转换器BeanName
-
实现
RecordMessageConverter
.用于入站通道适配器以替换默认的MessagingMessageConverter
.违约:
null
- 空闲事件间隔
-
指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。 使用
ApplicationListener<ListenerContainerIdleEvent>
以接收这些事件。 有关使用示例,请参阅示例:暂停和恢复消费者。违约:
30000
- 目的地是模式
-
当 true 时,目标被视为正则表达式
Pattern
用于匹配代理的主题名称。 如果为 true,则不会预配主题,并且enableDlq
不允许,因为绑定程序在配置阶段不知道主题名称。 请注意,检测与模式匹配的新主题所花费的时间由消费者属性控制metadata.max.age.ms
,(在撰写本文时)默认为 300,000 毫秒(5 分钟)。 这可以使用configuration
属性。违约:
false
- topic.properties
-
一个
Map
配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:无。
- topic.replication-factor
-
预配主题时要使用的复制因子。覆盖活页夹范围的设置。 如果出现以下情况,则忽略
replicas-assignments
存在。默认值:无(使用活页夹范围的默认值 1)。
- pollTimeout
-
超时,用于在可轮询消费者中轮询。
默认值:5 秒。
- 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的绑定程序的事务管理器。 如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。默认值:无。
1.3.3. 使用批次
从 3.0 版本开始,当spring.cloud.stream.binding.<name>.consumer.batch-mode
设置为true
,通过轮询 Kafka 收到的所有记录Consumer
将作为List<?>
到 listener 方法。
否则,将一次调用一条记录。
批处理的大小由 Kafka 使用者属性控制max.poll.records
,min.fetch.bytes
,fetch.max.wait.ms
;有关更多信息,请参阅 Kafka 文档。
使用批处理模式时,不支持在绑定器中重试,因此maxAttempts 将被覆盖为 1。
您可以配置SeekToCurrentBatchErrorHandler (使用ListenerContainerCustomizer ) 以实现与在 Binder 中重试类似的功能。
您也可以使用手册AckMode 并调用Ackowledgment.nack(index, sleep) 提交部分批次的偏移量并重新传递剩余记录。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
1.3.4. Kafka 生产者属性
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.producer.<property>=<value> . |
以下属性仅适用于 Kafka 生产者,并且
必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.
.
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties
,并且将在将来的版本中删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment
,并且将在将来的版本中删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor
,并且将在将来的版本中删除对它的支持。 - 缓冲区大小
-
Kafka 生产者在发送前尝试批处理的数据量上限(以字节为单位)。
违约:
16384
. - 同步
-
生产者是否同步。
违约:
false
. - 发送超时表达式
-
根据传出消息计算的 SpEL 表达式,用于在启用同步发布时评估等待 ack 的时间,例如,
headers['mySendTimeout']
. 超时值以毫秒为单位。 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前对表达式进行计算。违约:
none
. - batchTimeout
-
生产者在发送消息之前等待多长时间,以允许更多消息在同一批次中累积。 (通常,生产者根本不会等待,而只是发送上一次发送过程中累积的所有消息。非零值可能会以延迟为代价来增加吞吐量。
违约:
0
. - messageKey表达式
-
根据用于填充生成的 Kafka 消息的密钥的传出消息进行评估的 SpEL 表达式,例如
headers['myKey']
. 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]
. 现在,在转换有效负载之前对表达式进行计算。违约:
none
. - headerPatterns
-
以逗号分隔的简单模式列表,用于匹配要映射到 Kafka 的 Spring 消息传递标头
Headers
在ProducerRecord
. 模式可以以通配符(星号)开头或结尾。模式可以通过前缀!
. 匹配在第一次匹配(正或负)后停止。 例如!ask,as*
将通过ash
但不是ask
.id
和timestamp
永远不会映射。默认值:(所有标头 - 除了
*
id
和timestamp
) - 配置
-
映射包含通用 Kafka 生产者属性的键/值对。
默认值:空地图。
- topic.properties
-
一个
Map
配置新主题时使用的 Kafka 主题属性——例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。 在预配新主题时使用。 请参阅
NewTopic
Javadocs 中的kafka-clients
罐。默认值:无。
- topic.replication-factor
-
预配主题时要使用的复制因子。覆盖活页夹范围的设置。 如果出现以下情况,则忽略
replicas-assignments
存在。默认值:无(使用活页夹范围的默认值 1)。
- 使用主题标头
-
设置为
true
将默认绑定目标(主题名称)替换为KafkaHeaders.TOPIC
出站邮件中的邮件头。 如果标头不存在,则使用默认绑定目标。 违约:false
. - 记录元数据通道
-
的 bean 名称
MessageChannel
应将成功的发送结果发送到哪个;Bean 必须存在于应用程序上下文中。 发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA
. 标头包含一个RecordMetadata
Kafka 客户端提供的对象;它包括在主题中写入记录的分区和偏移量。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送将进入生产者错误通道(如果已配置);请参阅错误通道。 默认值:null
+
Kafka 绑定器使用partitionCount 将生产者设置为提示,以创建具有给定分区计数的主题(结合minPartitionCount ,两者中的最大值是正在使用的值)。
配置两者时要小心minPartitionCount 用于活页夹和partitionCount 对于应用程序,因为使用较大的值。
如果主题已存在分区计数较小且autoAddPartitions 禁用(默认值),则活页夹无法启动。
如果主题已存在分区计数较小且autoAddPartitions 启用,则添加新分区。
如果主题已存在分区数大于 (minPartitionCount 或partitionCount ),则使用现有分区计数。 |
- 压缩
-
将
compression.type
producer 属性。 支持的值包括none
,gzip
,snappy
和lz4
. 如果您覆盖kafka-clients
jar 到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用zstd
压缩, 使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
.违约:
none
. - 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager
用于覆盖此绑定的绑定程序的事务管理器。 如果您想将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager
. 要实现记录的一次使用和生产,使用者和生产者绑定必须全部配置为相同的事务管理器。默认值:无。
1.3.5. 使用示例
在本部分中,我们将演示上述属性在特定方案中的用法。
示例:设置autoCommitOffset
自false
和依赖手动确认
此示例说明了如何在使用者应用程序中手动确认偏移量。
此示例要求spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset
设置为false
.
为您的示例使用相应的输入通道名称。
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
示例:安全配置
Apache Kafka 0.9 支持客户端和代理之间的安全连接。
要利用此功能,请遵循 Apache Kafka 文档中的指南以及 Confluent 文档中的 Kafka 0.9 安全指南。
使用spring.cloud.stream.kafka.binder.configuration
选项为活页夹创建的所有客户端设置安全属性。
例如,要将security.protocol
自SASL_SSL
,请设置以下属性:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全属性都可以以类似的方式设置。
使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。
Spring Cloud Stream 支持使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。
使用 JAAS 配置文件
可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。 以下示例显示如何使用 JAAS 配置文件启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为拥有 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种机制,用于使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。
以下属性可用于配置 Kafka 客户端的登录上下文:
- spring.cloud.stream.kafka.binder.jaas.login模块
-
登录模块名称。正常情况下无需设置。
违约:
com.sun.security.auth.module.Krb5LoginModule
. - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
违约:
required
. - spring.cloud.stream.kafka.binder.jaas.options
-
映射包含登录模块选项的键/值对。
默认值:空地图。
以下示例演示如何使用 Spring Boot 配置属性启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例表示与以下 JAAS 文件等效的内容:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果所需的主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。
不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。
如果-Djava.security.auth.login.config system 属性已经存在,则 Spring Cloud Stream 会忽略 Spring Boot 属性。 |
使用autoCreateTopics 和autoAddPartitions 使用 Kerberos。
通常,应用程序可能会使用在 Kafka 和 Zookeeper 中没有管理权限的主体。
因此,依赖 Spring Cloud Stream 创建/修改主题可能会失败。
在安全环境中,强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。 |
示例:暂停和恢复使用者
如果希望暂停使用但不导致分区重新平衡,则可以暂停和恢复使用。
这可以通过添加Consumer
作为参数@StreamListener
.
要恢复,您需要一个ApplicationListener
为ListenerContainerIdleEvent
实例。
事件发布的频率由idleEventInterval
财产。
由于使用者不是线程安全的,因此必须在调用线程上调用这些方法。
以下简单应用程序演示如何暂停和恢复:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
1.4. 事务性活页夹
通过设置启用事务spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值,例如tx-
.
当在处理器应用程序中使用时,消费者启动事务;在使用者线程上发送的任何记录都参与同一事务。
当监听器正常退出时,监听器容器会将偏移量发送给事务并提交。
一个通用的生产者工厂用于使用spring.cloud.stream.kafka.binder.transaction.producer.*
性能; 忽略单个绑定 Kafka 生产者属性。
事务不支持正常的活页夹重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也将回滚。启用重试时(公共属性maxAttempts 大于零),重试属性用于配置DefaultAfterRollbackProcessor 以启用容器级别的重试。
同样,此功能不是在事务中发布死信记录,而是通过DefaultAfterRollbackProcessor 在主事务回滚后运行。 |
如果您希望在源应用程序中使用事务,或者从某个任意线程中用于仅生产者事务(例如@Scheduled
方法),您必须获取对事务性生产者工厂的引用,并定义一个KafkaTransactionManager
bean 使用它。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用BinderFactory
;用null
在第一个参数中,当只有一个 Binder 配置时。
如果配置了多个活页夹,请使用活页夹名称获取引用。
一旦我们有了对 binder 的引用,我们就可以获得对ProducerFactory
并创建事务管理器。
然后您将使用正常的 Spring 事务支持,例如TransactionTemplate
或@Transactional
例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者事务与其他事务管理器中的事务同步,请使用ChainedTransactionManager
.
如果您部署应用程序的多个实例,则每个实例都需要一个唯一的transactionIdPrefix . |
1.5. 错误通道
从 1.3 版开始,绑定器无条件地将异常发送到每个使用者目标的错误通道,也可以配置为将异步生产者发送失败发送到错误通道。 有关更多信息,请参阅 [spring-cloud-stream-overview-error-handling]。
的有效负载ErrorMessage
对于发送失败,是KafkaSendFailureException
具有属性:
-
failedMessage
:春季消息Message<?>
未能发送。 -
record
:原始的ProducerRecord
这是从failedMessage
没有自动处理生产者异常(例如发送到死信队列)。 您可以在自己的 Spring Integration 流中使用这些异常。
1.6. Kafka 指标
Kafka binder 模块公开以下指标:
spring.cloud.stream.binder.kafka.offset
:此指标指示给定使用者组尚未从给定 Binder 的主题中使用多少消息。
提供的指标基于 Mircometer 指标库。该指标包含消费者组信息、主题以及与主题上最新偏移量的实际滞后。
此指标对于向 PaaS 平台提供自动缩放反馈特别有用。
1.7. 逻辑删除记录(空记录值)
使用压缩主题时,带有null
值(也称为逻辑删除记录)表示删除键。
要在@StreamListener
方法,则必须将参数标记为不需要接收null
value 参数。
@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
1.8. 使用 KafkaRebalanceListener
应用程序可能希望在最初分配分区时将主题/分区查找到任意偏移量,或者对使用者执行其他作。
从 2.1 版开始,如果您提供单个KafkaRebalanceListener
bean 中,它将连接到所有 Kafka 消费者绑定中。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
您无法将resetOffsets
consumer 属性设置为true
当您提供重新平衡侦听器时。
1.9. 死信主题处理
1.9.1. 死信主题分区选择
默认情况下,记录使用与原始记录相同的分区发布到死信主题。 这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请添加一个DlqPartitionFunction
实现为@Bean
到应用程序上下文。
只能存在一个这样的豆子。
该函数是随消费者组提供的,失败的ConsumerRecord
和例外。
例如,如果您始终想要路由到分区 0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions 属性设置为 1(以及活页夹的minPartitionCount 等于1 ),无需提供DlqPartitionFunction ;框架将始终使用分区 0。
如果将使用者绑定的dlqPartitions 属性设置为大于1 (或活页夹的minPartitionCount 大于1 ),您必须提供一个DlqPartitionFunction bean,即使分区计数与原始主题的分区计数相同。 |
1.9.2. 处理死信主题中的记录
由于该框架无法预测用户希望如何处理死信邮件,因此它没有提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将邮件路由回原始主题。 但是,如果问题是永久性问题,则可能会导致无限循环。 本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在尝试三次后将它们移动到“停车场”主题。 该应用程序是另一个从死信主题中读取的 spring-cloud-stream 应用程序。 当 5 秒内未收到消息时,它终止。
这些示例假定原始目标为so8400out
消费群体是so8400
.
有几种策略需要考虑:
-
考虑仅在主应用程序未运行时运行重新路由。 否则,暂时性错误的重试会很快用完。
-
或者,使用两阶段方法:使用此应用程序路由到第三个主题,另一个应用程序从那里路由回主主题。
以下代码列表显示了示例应用程序:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
1.10. 使用 Kafka Binder 进行分区
Apache Kafka 原生支持主题分区。
有时将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时(特定客户的所有消息都应转到同一分区)。
以下示例展示了如何配置生产者端和消费者端:
@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
必须预配主题以具有足够的分区,以便为所有使用者组实现所需的并发性。
上述配置最多支持 12 个消费者实例(如果其concurrency 是 2,如果它们的并发性为 3,则为 4,依此类推)。
通常最好“过度预配”分区,以允许将来增加使用者或并发性。 |
上述配置使用默认分区 (key.hashCode() % partitionCount ).
这可能会也可能不会提供适当平衡的算法,具体取决于键值。
您可以使用partitionSelectorExpression 或partitionSelectorClass 性能。 |
由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。 Kafka 在实例之间分配分区。
以下 Spring Boot 应用程序监听 Kafka 流并打印(到控制台)每条消息转到的分区 ID:
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + " received from partition " + partition);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。
Kafka 重新平衡分区分配。
如果实例计数(或instance count * concurrency
) 超过分区数,部分消费者处于空闲状态。
2. Kafka 流活页夹
2.1. 用法
要使用 Kafka Streams 绑定器,您只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
为 Kafka Streams 绑定器引导新项目的一种快速方法是使用 Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示

2.2. 概述
Spring Cloud Stream 包括一个专为 Apache Kafka Streams 绑定而设计的绑定器实现。 通过这种本机集成,Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。
Kafka Streams 绑定器实现建立在 Spring for Apache Kafka 项目提供的基础之上。
Kafka Streams 绑定器为 Kafka Streams 中的三种主要类型提供了绑定功能 -KStream
,KTable
和GlobalKTable
.
Kafka Streams 应用程序通常遵循从入站主题读取记录的模型,应用业务逻辑,然后将转换后的记录写入出站主题。 或者,也可以定义没有出站目标的处理器应用程序。
在以下部分中,我们将详细介绍 Spring Cloud Stream 与 Kafka Streams 的集成。
2.3. 编程模型
当使用 Kafka Streams binder 提供的编程模型时,高级 Streams DSL 和高级和低级 Processor-API 的混合都可以用作选项。
当混合更高级别和较低级别的 API 时,这通常是通过调用transform
或process
API 方法KStream
.
2.3.1. 函数式样式
从 Spring Cloud Stream 开始3.0.0
,Kafka Streams 绑定器允许使用 Java 8 中可用的函数式编程风格来设计和开发应用程序。
这意味着应用程序可以简洁地表示为类型的 lambda 表达式java.util.function.Function
或java.util.function.Consumer
.
让我们举一个非常基本的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然很简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。
这是一个没有出站绑定且只有一个入站绑定的使用者应用程序。
应用程序使用数据,它只是记录来自KStream
标准输出上的键和值。
该应用程序包含SpringBootApplication
注释和标记为Bean
.
bean 方法的类型为java.util.function.Consumer
其中参数化为KStream
.
然后在实现中,我们返回一个 Consumer 对象,该对象本质上是一个 lambda 表达式。
在 lambda 表达式中,提供了用于处理数据的代码。
在此应用程序中,有一个类型的单个输入绑定KStream
.
活页夹为应用程序创建此绑定,名称为process-in-0
,即函数 bean name 的名称后跟破折号字符 () 和文字-
in
后跟另一个破折号,然后是参数的序号位置。
使用此绑定名称可以设置其他属性,例如目标。
例如spring.cloud.stream.bindings.process-in-0.destination=my-topic
.
如果未在绑定上设置目标属性,那么将创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题应该已经可用。 |
一旦构建为 uber-jar(例如kstream-consumer-app.jar
),您可以像下面一样运行上面的示例。
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这是另一个示例,它是一个具有输入和输出绑定的完整处理器。 这是经典的字数计数示例,其中应用程序从主题接收数据,然后在翻转时间窗口中计算每个单词的出现次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
同样,这是一个完整的 Spring Boot 应用程序。这里与第一个应用程序的不同之处在于 bean 方法是java.util.function.Function
.
第一个参数化类型Function
用于输入KStream
第二个是输出。
在方法体中,提供了一个 lambda 表达式,该表达式类型为Function
作为实现,给出了实际的业务逻辑。
与前面讨论的基于消费者的应用程序类似,此处的输入绑定被命名为process-in-0
默认情况下。对于输出,绑定名称也会自动设置为process-out-0
.
一旦构建为 uber-jar(例如wordcount-processor.jar
),您可以像下面一样运行上面的示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将使用 Kafka 主题中的消息words
并将计算结果发布到输出
主题counts
.
Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写逻辑 处理器中需要。设置 Kafka Streams 基础架构所需的 Kafka Streams 特定配置 由框架自动处理。
我们在上面看到的两个示例有一个KStream
输入绑定。在这两种情况下,绑定都从单个主题接收记录。
如果要将多个主题多路复用为一个主题KStream
binding,则可以在下方提供逗号分隔的 Kafka 主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果要将主题与常规 Exresession 进行匹配,还可以将主题模式提供为目标。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多个输入绑定
许多重要的 Kafka Streams 应用程序通常通过多个绑定使用来自多个主题的数据。
例如,一个主题被消耗为Kstream
另一个作为KTable
或GlobalKTable
.
应用程序可能希望将数据作为表类型接收的原因有很多。
考虑一个用例,其中基础主题是通过数据库中的更改数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。
如果应用程序指定数据需要绑定为KTable
或GlobalKTable
,则 Kafka Streams 绑定器会正确地将目标绑定到KTable
或GlobalKTable
并使它们可供应用程序作。
我们将研究几种不同的场景,了解如何在 Kafka Streams 绑定器中处理多个输入绑定。
Kafka Streams Binder 中的 BiFunction
这是一个示例,我们有两个输入和一个输出。在这种情况下,应用程序可以利用java.util.function.BiFunction
.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
同样,基本主题与前面的示例相同,但这里我们有两个输入。
Java 的BiFunction
支持用于将输入绑定到所需的目标。
绑定器为输入生成的默认绑定名称为process-in-0
和process-in-1
分别。默认输出绑定为process-out-0
.
在此示例中,第一个参数BiFunction
被绑定为KStream
对于第一个输入,第二个参数被绑定为KTable
对于第二个输入。
Kafka Streams Binder 中的 BiConsumer
如果有两个输入,但没有输出,在这种情况下,我们可以使用java.util.function.BiConsumer
如下图所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果您有两个以上的输入怎么办? 在某些情况下,您需要两个以上的输入。在这种情况下,活页夹允许您链接部分函数。 在函数式编程术语中,这种技术通常称为柯里化。 随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里函数。 Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看看上面介绍的绑定模型的细节。
在此模型中,我们在入站上有 3 个部分应用的函数。让我们将它们称为f(x)
,f(y)
和f(z)
.
如果我们在真正的数学函数意义上扩展这些函数,它将如下所示:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
.
这x
变量代表KStream<Long, Order>
这y
变量代表GlobalKTable<Long, Customer>
和z
变量代表GlobalKTable<Long, Product>
.
第一个功能f(x)
具有应用程序的第一个输入绑定 (KStream<Long, Order>
),其输出是函数 f(y)。
功能f(y)
具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>
),其输出是另一个函数,f(z)
.
函数的输入f(z)
是应用程序的第三个输入 (GlobalKTable<Long, Product>
),其输出为KStream<Long, EnrichedOrder>
这是应用程序的最终输出绑定。
来自三个部分函数的输入,即KStream
,GlobalKTable
,GlobalKTable
分别在方法主体中可供您使用,用于将业务逻辑实现为 lambda 表达式的一部分。
输入绑定命名为enrichOrder-in-0
,enrichOrder-in-1
和enrichOrder-in-2
分别。输出绑定命名为enrichOrder-out-0
.
使用柯里函数,您几乎可以拥有任意数量的输入。但是,请记住,在 Java 中,超过较少数量的输入和部分应用的函数都可能导致代码不可读。 因此,如果您的 Kafka Streams 应用程序需要的输入绑定数量超过相当少的数量,并且您想使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。
多个输出绑定
Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。
使用多个输出绑定时,需要提供 KStream (KStream[]
) 作为出站返回类型。
这是一个例子:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
编程模型保持不变,但出站参数化类型为KStream[]
.
默认输出绑定名称为process-out-0
,process-out-1
,process-out-2
分别。
binder 之所以会生成三个输出绑定,是因为它检测了返回的KStream
数组。
Kafka 流的基于函数的编程风格摘要
总之,下表显示了可在函数范式中使用的各种选项。
输入数 | 输出数量 | 要使用的组件 |
---|---|---|
1 |
0 |
java.util.function.消费者 |
2 |
0 |
java.util.function.Bi消费者 |
1 |
1..n |
java.util.function.函数 |
2 |
1..n |
java.util.function.Bi函数 |
>= 3 |
0..n |
使用柯里化函数 |
-
如果此表中有多个输出,则类型将变为
KStream[]
.
2.3.2. 命令式编程模型。
尽管上面概述的函数式编程模型是首选方法,但您仍然可以使用经典的StreamListener
如果您愿意,可以基于方法。
这里有些例子。
下面是使用StreamListener
.
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
如您所见,这有点冗长,因为您需要提供EnableBinding
以及其他额外的注释,例如StreamListener
和SendTo
使其成为一个完整的应用程序。EnableBinding
是指定包含绑定的绑定接口的位置。
在这种情况下,我们使用的是库存KafkaStreamsProcessor
具有以下协定的绑定接口。
public interface KafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
@Output("output")
KStream<?, ?> output();
}
Binder 将为输入创建绑定KStream
和输出KStream
因为您使用的是包含这些声明的绑定接口。
除了函数式中提供的编程模型的明显差异之外,这里需要提到的一件特别的事情是,绑定名称是您在绑定接口中指定的名称。
例如,在上面的应用程序中,由于我们正在使用KafkaStreamsProcessor
,绑定名称为input
和output
.
绑定属性需要使用这些名称。例如spring.cloud.stream.bindings.input.destination
,spring.cloud.stream.bindings.output.destination
等。
请记住,这与函数式样式有根本的不同,因为活页夹会为应用程序生成绑定名称。
这是因为应用程序不在函数模型中使用EnableBinding
.
这是另一个接收器示例,其中我们有两个输入。
@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
@Input("inputTable") KTable<Long, Song> songTable) {
....
....
}
interface KStreamKTableBinding {
@Input("inputStream")
KStream<?, ?> inputStream();
@Input("inputTable")
KTable<?, ?> inputTable();
}
以下是StreamListener
等同于相同的BiFunction
基于我们在上面看到的处理器。
@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}
interface KStreamKTableBinding extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputTable();
}
最后,这是StreamListener
相当于具有三个输入和柯里化函数的应用程序。
@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(
@Input("input-1") KStream<Long, Order> ordersStream,
@Input("input-"2) GlobalKTable<Long, Customer> customers,
@Input("input-3") GlobalKTable<Long, Product> products) {
KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
customers, (orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
return customerOrdersStream.join(products,
(orderId, customerOrder) -> customerOrder.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
});
}
interface CustomGlobalKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
GlobalKTable<?, ?> input2();
@Input("input-3")
GlobalKTable<?, ?> input3();
@Output("output")
KStream<?, ?> output();
}
您可能会注意到,上述两个示例更加冗长,因为除了提供EnableBinding
,您还需要编写自己的自定义绑定接口。
使用功能模型,您可以避免所有这些仪式细节。
在我们继续查看 Kafka Streams 绑定器提供的通用编程模型之前,这里是StreamListener
多个输出绑定的版本。
EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
}
回顾一下,我们回顾了使用 Kafka Streams 绑定器时的各种编程模型选择。
活页夹提供KStream
,KTable
和GlobalKTable
在输入上。KTable
和GlobalKTable
绑定仅在输入上可用。
Binder 支持KStream
.
Kafka Streams 绑定器编程模型的结果是,绑定器为您提供了使用功能齐全的编程模型或使用StreamListener
基于命令式方法。
2.4. 编程模型的辅助
2.4.1. 单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以申请如下。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的单独 Kafka Streams 对象(更多内容见下文)。 但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 需要激活哪些功能。 以下是激活这些功能的方法。
spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess
如果您不希望立即激活某些功能,可以将其从此列表中删除。
当您拥有单个 Kafka Streams 处理器和其他类型的Function
同一应用程序中的 bean,通过不同的绑定器处理(例如,基于常规 Kafka 消息通道绑定器的函数 bean)
2.4.2. Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必填属性。 Spring Cloud Stream Kafka Streams 绑定器允许您以多种方式配置此应用程序 ID。
如果您只有一个处理器或StreamListener
在应用程序中,则可以使用以下属性在活页夹级别进行设置:
spring.cloud.stream.kafka.streams.binder.applicationId
.
为了方便起见,如果您只有一个处理器,您还可以使用spring.application.name
作为属性来委托应用程序 ID。
如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 对于函数模型,您可以将其作为属性附加到每个函数。
例如,假设您有以下功能。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下活页夹级别属性为每个应用程序设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
在以下情况下StreamListener
,您需要在处理器上的第一个输入绑定上设置此设置。
例如,假设您有以下两个StreamListener
基于处理器。
@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
...
}
@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
...
}
然后,必须使用以下绑定属性为此设置应用程序 ID。
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId
和
spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也将有效。但是,如果您使用的是功能模型,则在绑定器级别设置每个函数会容易得多,如我们在上面看到的那样。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您要自动扩展应用程序,这尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序没有提供应用程序 ID,那么在这种情况下,绑定器将为您自动生成静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 将在应用程序重新启动时是静态的。在功能模型的情况下,生成的应用程序 ID 将是函数 bean 名称,后跟文字applicationID
,例如:process-applicationID
如果process
如果函数 bean name 的 bean 名称。在StreamListener
,生成的应用程序 ID 将使用包含的类名,后跟方法名,后跟文字,而不是使用函数 bean 名称applicationId
.
设置应用程序 ID 的摘要
-
默认情况下,binder 将自动生成每个函数的应用程序 ID 或
StreamListener
方法。 -
如果您有单个处理器,则可以使用
spring.kafka.streams.applicationId
,spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
. -
如果您有多个处理器,则可以使用属性 -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
. 在以下情况下StreamListener
,这可以使用spring.cloud.stream.kafka.streams.bindings.input.applicationId
,假设输入绑定名称为input
.
2.4.3. 使用函数样式覆盖绑定器生成的默认绑定名称
默认情况下,绑定器在使用函数式样式时使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果要覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
. 默认绑定名称是活页夹生成的原始绑定名称。
例如,假设您有这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将生成带有名称的绑定,process-in-0
,process-in-1
和process-out-0
. 现在,如果您想将它们完全更改为其他内容,也许是更多特定于域的绑定名称,那么您可以按如下方式进行作。
springc.cloud.stream.function.bindings.process-in-0=users
springc.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,必须在这些新绑定名称上设置所有绑定级别属性。
请记住,对于上述函数式编程模型,在大多数情况下,遵守默认绑定名称是有意义的。您可能仍希望执行此重写的唯一原因是,当您具有大量配置属性并且想要将绑定映射到更域友好的内容时。
2.4.4. 设置引导服务器配置
运行 Kafka Streams 应用程序时,必须提供 Kafka 代理服务器信息。如果您不提供此信息,则 Binder 希望您以默认值运行代理localhost:9092
. 如果不是这种情况,那么您需要覆盖它。有几种方法可以做到这一点。
-
使用 boot 属性 -
spring.kafka.bootstrapServers
-
活页夹级属性 -
spring.cloud.stream.kafka.streams.binder.brokers
当涉及到 binder 级别属性时,是否使用通过常规 Kafka binder 提供的 broker 属性并不重要 -spring.cloud.stream.kafka.binder.brokers
.
Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers
.
2.5. 记录序列化和反序列化
Kafka Streams 绑定器允许您以两种方式序列化和反序列化记录。 一个是Kafka提供的原生序列化和反序列化工具,另一个是Spring Cloud Stream框架的消息转换能力。 让我们看看一些细节。
2.5.1. 入站反序列化
密钥始终使用本机 Serdes 进行反序列化。
对于值,默认情况下,入站的反序列化由 Kafka 本机执行。 请注意,这是对以前版本的 Kafka Streams 绑定程序的默认行为的重大更改,其中反序列化是由框架完成的。
Kafka Streams 绑定器将尝试推断匹配Serde
类型通过查看java.util.function.Function|Consumer
或StreamListener
.
这是它与 Serdes 匹配的顺序。
-
如果应用程序提供类型为
Serde
如果返回类型使用传入键或值类型的实际类型进行参数化,则它将使用Serde
用于入站反序列化。 例如,如果应用程序中有以下内容,则活页夹会检测到KStream
与在Serde
豆。 它将用于入站反序列化。
@Bean
public Serde<Foo() customSerde{
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型,看看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。 以下是绑定器将尝试从 Kafka Streams 匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假定类型是 JSON 友好的。 如果您有多个值对象作为输入,这很有用,因为绑定器会在内部将它们推断为正确的 Java 类型。 在回退到
JsonSerde
不过,活页夹会以默认值Serde`s set in the Kafka Streams configuration to see if it is a `Serde
它可以与传入的 KStream 类型匹配。
如果上述策略都不起作用,那么应用程序必须通过配置提供“Serde”。 这可以通过两种方式进行配置 - 绑定或默认。
首先,活页夹将查看Serde
在绑定级别提供。
例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以提供绑定级别Serde
使用以下内容:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您提供Serde 作为每个输入绑定的 abover,那么这将具有更高的优先级,并且绑定器将远离任何Serde 推理。 |
如果您希望将默认键/值 Serdes 用于入站反序列化,则可以在 Binder 级别执行此作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。 由于原生解码是默认的,为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用原生解码。
例如,如果您拥有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要单独禁用所有输入的本机解码。否则,本机解码仍将应用于您未禁用的那些。
默认情况下,Spring Cloud Stream 将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。
可以使用以下属性和适当的MessageConverter
豆。
spring.cloud.stream.bindings.process-in-0.contentType
2.5.2. 出站序列化
出站序列化几乎遵循与上述入站反序列化相同的规则。 与入站反序列化一样,与以前版本的 Spring Cloud Stream 相比,一个主要变化是出站的序列化由 Kafka 本机处理。 在 3.0 版本的 binder 之前,这是由框架本身完成的。
出站上的密钥始终由 Kafka 使用匹配Serde
这是由活页夹推断出来的。
如果它无法推断出键的类型,则需要使用配置来指定。
值 serde 使用用于入站反序列化的相同规则进行推断。
首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。
如果没有,它会检查它是否与Serde
被卡夫卡暴露,例如 -Integer
,Long
,Short
,Double
,Float
,byte[]
,UUID
和String
.
如果这不起作用,那么它就会回退到JsonSerde
由 Spring Kafka 项目提供,但先看默认的Serde
配置以查看是否存在匹配项。
请记住,所有这些都对应用程序透明地发生。
如果这些都不起作用,则用户必须提供Serde
按配置使用。
假设您正在使用相同的BiFunction
处理器,如上所述。然后,您可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推理失败,并且没有提供绑定级别 Serdes,则绑定器将回退到JsonSerde
,但请查看默认的 Serdes 以进行匹配。
默认 serde 的配置方式与上述反序列化下描述的相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。同样,如果绑定器能够推断Serde
类型,则无需执行此配置。
如果您不想要 Kafka 提供的本机编码,但想使用框架提供的消息转换,那么您需要显式禁用本机编码,因为本机编码是默认的。例如,如果您具有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,您需要单独禁用所有输出的本机编码。否则,本机编码仍将应用于您未禁用的那些。
当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json
作为内容类型,并使用适当的 JSON 消息转换器。可以使用以下属性和相应的MessageConverter
豆。
spring.cloud.stream.bindings.process-out-0.contentType
禁用本机编码/解码时,binder 不会像本机 Serdes 那样进行任何推理。应用程序需要显式提供所有配置选项。因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用程序时保留反序列化的默认选项,并坚持使用 Kafka Streams 提供的本机反序列化。您必须使用框架提供的消息转换功能的一种场景是,当您的上游生产者使用特定的序列化策略时。在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。当依赖默认Serde
机制,应用程序必须确保活页夹有一条前进的道路,以正确的方式正确映射入站和出站Serde
,否则事情可能会失败。
值得一提的是,上面概述的数据反序列化方法仅适用于处理器的边缘,即入站和出站。
您的业务逻辑可能仍需要调用显式需要的 Kafka Streams APISerde
对象。
这些仍然是应用程序的责任,必须由开发人员相应地处理。
2.6. 错误处理
Apache Kafka Streams 提供了本机处理反序列化错误异常的功能。
有关此支持的详细信息,请参阅此。
开箱即用,Apache Kafka Streams 提供了两种反序列化异常处理程序 -LogAndContinueExceptionHandler
和LogAndFailExceptionHandler
.
顾名思义,前者将记录错误并继续处理下一条记录,后者将记录错误并失败。LogAndFailExceptionHandler
是默认的反序列化异常处理程序。
2.6.1. 在 Binder 中处理反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序外,绑定器还提供了第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。 下面是启用此 DLQ 异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
设置上述属性后,反序列化错误中的所有记录都会自动发送到 DLQ 主题。
您可以设置发布 DLQ 消息的主题名称,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此设置,则错误记录将发送到主题custom-dlq
.
如果未设置此设置,则它将创建一个名称为error.<input-topic-name>.<application-id>
.
例如,如果绑定的目标主题是inputTopic
应用程序 ID 为process-applicationId
,则默认的 DLQ 主题为error.inputTopic.process-applicationId
.
如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建 DLQ 主题。
2.6.2. 每个输入消费者绑定的 DLQ
该物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用。
这意味着如果有多个函数或StreamListener
方法,则此属性将应用于所有方法。
但是,如果单个处理器中有多个处理器或多个输入绑定,则可以使用绑定程序为每个输入使用者绑定提供的更细粒度的 DLQ 控件。
如果您有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ,在第二个绑定上启用 logAndSkip,然后您可以在消费者上执行此作,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip
以这种方式设置反序列化异常处理程序的优先级高于在绑定程序级别设置的优先级。
2.6.3. DLQ 分区
默认情况下,记录使用与原始记录相同的分区发布到死信主题。 这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请添加一个DlqPartitionFunction
实现为@Bean
到应用程序上下文。只能存在一个这样的 bean。该函数与消费者组一起提供(在大多数情况下与应用程序 ID 相同),失败的ConsumerRecord
和例外。
例如,如果您始终想要路由到分区 0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions 属性设置为 1(以及活页夹的minPartitionCount 等于1 ),无需提供DlqPartitionFunction ;框架将始终使用分区 0。
如果将使用者绑定的dlqPartitions 属性设置为大于1 (或活页夹的minPartitionCount 大于1 ),您必须提供一个DlqPartitionFunction bean,即使分区计数与原始主题的分区计数相同。 |
在 Kafka Streams 绑定器中使用异常处理功能时,需要记住几件事。
-
该物业
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用。 这意味着如果有多个函数或StreamListener
方法,则此属性将应用于所有方法。 -
反序列化的异常处理与本机反序列化和框架提供的消息转换一致。
2.6.4. 在 Binder 中处理生产异常
与上述对反序列化异常处理程序的支持不同,绑定程序不提供用于处理生产异常的第一类机制。但是,您仍然可以使用StreamsBuilderFactoryBean
定制器,您可以在下面的后续部分中找到更多详细信息。
2.7. 状态商店
当使用高级 DSL 并进行适当的调用以触发状态存储时,Kafka Streams 会自动创建状态存储。
如果您想实现传入的KTable
绑定为命名状态存储,则可以使用以下策略来执行此作。
假设您有以下功能。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,传入的KTable
数据将具体化到命名状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在应用程序中将自定义状态存储定义为 bean,这些状态存储将被 binder 检测并添加到 Kafka Streams 构建器中。 特别是在使用处理器 API 时,需要手动注册状态存储。 为此,您可以在应用程序中将 StateStore 创建为 bean。 以下是定义此类 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然后,应用程序可以直接访问这些状态存储。
在引导过程中,上述 bean 将由绑定器处理并传递给 Streams 构建器对象。
访问状态存储:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
在注册全局状态存储时,这将不起作用。
要注册全局状态存储,请参阅下面有关自定义的部分StreamsBuilderFactoryBean
.
2.8. 交互式查询
Kafka Streams 绑定器 API 公开了一个名为InteractiveQueryService
以交互方式查询状态存储。
您可以在应用程序中以 Spring Bean 的形式访问它。从应用程序访问此 Bean 的一种简单方法是autowire
豆子。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您获得了对此 bean 的访问权限,您就可以查询您感兴趣的特定状态存储。见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,上述检索存储的方法调用可能会失败。 例如,它可能仍在初始化状态存储的过程中。 在这种情况下,重试此作会很有用。 Kafka Streams 绑定器提供了一个简单的重试机制来适应这一点。
下面是可用于控制此重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为
1
. -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为
1000
毫秒。
如果有多个 kafka 流应用程序实例正在运行,那么在以交互方式查询它们之前,您需要确定哪个应用程序实例托管了您要查询的特定密钥。InteractiveQueryService
API 提供了用于识别主机信息的方法。
为了使其正常工作,您必须配置属性application.server
如下:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
2.9. 健康指标
运行状况指示器需要依赖项spring-boot-starter-actuator
.对于 maven 使用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器来检查底层流线程的状态。
Spring Cloud Stream 定义了一个属性management.health.binders.enabled
以启用运行状况指示器。请参阅 Spring Cloud Stream 文档。
运行状况指示器为每个流线程的元数据提供以下详细信息:
-
线程名称
-
线程状态:
CREATED
,RUNNING
,PARTITIONS_REVOKED
,PARTITIONS_ASSIGNED
,PENDING_SHUTDOWN
或DEAD
-
活动任务:任务 ID 和分区
-
备用任务:任务 ID 和分区
默认情况下,只有全局状态可见 (UP
或DOWN
).要显示详细信息,属性management.endpoint.health.show-details
必须设置为ALWAYS
或WHEN_AUTHORIZED
.
有关运行状况信息的更多详细信息,请参阅 Spring Boot Actuator 文档。
运行状况指示器的状态为UP 如果注册的所有 Kafka 线程都在RUNNING 州。 |
由于 Kafka Streams 绑定器中有三个单独的绑定器 (KStream
,KTable
和GlobalKTable
),它们都将报告健康状态。
启用时show-details
,报告的一些信息可能是多余的。
当同一应用程序中存在多个 Kafka Streams 处理器时,将报告所有处理器的运行状况检查,并按 Kafka Streams 的应用程序 ID 进行分类。
2.10. 访问 Kafka Streams 指标
Spring Cloud Stream Kafka Streams 绑定器提供了一种基本机制,用于访问通过 Micrometer 导出的 Kafka Streams 指标MeterRegistry
.
Kafka Streams 指标可通过KafkaStreams#metrics()
由活页夹导出到此仪表注册表。
导出的指标来自使用者、生产者、管理员客户端和流本身。
活页夹导出的指标格式为指标组名称,后跟点,再按实际指标名称导出。 原始指标信息中的所有破折号都替换为点。
例如,指标名称network-io-total
从指标组consumer-metrics
在千分尺注册表中作为consumer.metrics.network.io.total
.
同样,指标commit-total
从stream-metrics
可作为stream.metrics.commit.total
.
如果同一应用程序中有多个 Kafka Streams 处理器,则指标名称将以 Kafka Streams 的相应应用程序 ID 为前缀。
在这种情况下,应用程序 ID 将按原样保留,即不会将破折号转换为点等。
例如,如果第一个处理器的应用程序 ID 是processor-1
,然后是指标名称network-io-total
从指标组consumer-metrics
在千分尺注册表中作为processor-1.consumer.metrics.network.io.total
.
您可以通过编程方式访问千分尺MeterRegistry
,然后遍历可用的仪表或使用 Spring Boot 执行器通过 REST 端点访问指标。
通过启动执行器端点访问时,请确保将metrics
前往酒店management.endpoints.web.exposure.include
.
然后你可以访问/acutator/metrics
以获取所有可用指标的列表,然后可以通过相同的 URI (/actuator/metrics/<metric-name>
).
超出信息级指标的任何内容KafkaStreams#metrics()
,(例如调试级别指标)仍然只能在您设置metrics.recording.level
自DEBUG
.
Kafka Streams 默认情况下,将此级别设置为INFO
.有关更多详细信息,请参阅 Kafka Streams 文档中的此部分。
在未来的版本中,binder 可能支持通过 Micrometer 导出这些 DEBUG 级别指标。
2.11. 混合使用高级 DSL 和低级处理器 API
Kafka Streams 提供了两种 API 变体。
它有一个更高级别的 DSL 类似 API,您可以在其中链接许多功能程序员可能熟悉的各种作。
Kafka Streams 还允许访问低级处理器 API。
处理器 API 虽然非常强大,并且能够在低得多的级别上控制事物,但本质上是势在必行的。
用于 Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级 DSL 或混合使用 DSL 和处理器 API。
混合使用这两种变体可以为您提供许多选项来控制应用程序中的各种用例。
应用程序可以使用transform
或process
方法 API 调用来访问处理器 API。
下面是如何使用process
应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
这是一个使用transform
应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
这process
API 方法调用是一个终端作,而transform
API 是非终端的,可为您提供潜在的转换KStream
使用它可以继续使用 DSL 或处理器 API 进行进一步处理。
2.12. 出站分区支持
Kafka Streams 处理器通常将处理后的输出发送到出站 Kafka 主题中。如果出站主题是分区的,并且处理器需要将传出数据发送到特定分区中,那么应用程序需要提供类型为StreamPartitioner
. 有关更多详细信息,请参阅 StreamPartitioner。让我们看一些示例。
这是我们已经多次看到的同一个处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
以下是输出绑定目标:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题outputTopic
有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认的分区策略,这可能不是您想要的结果,具体取决于特定用例。
假设,您想发送任何匹配的密钥spring
对 0 进行分区,cloud
到分区 1,stream
分区 2,其他所有内容都分区 3。
这是您需要在应用程序中执行的作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是,您可以访问记录的键/值、主题名称和分区总数。 因此,如果需要,您可以实现复杂的分区策略。
您还需要提供此 Bean 名称以及应用程序配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。
2.13. StreamsBuilderFactoryBean 定制器
通常需要自定义StreamsBuilderFactoryBean
这会创建KafkaStreams
对象。
基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean
.
您可以使用StreamsBuilderFactoryBeanCustomizer
自定义StreamsBuilderFactoryBean
本身。
然后,一旦您访问了StreamsBuilderFactoryBean
通过这个定制器,你可以自定义相应的KafkaStreams
用KafkaStreamsCustomzier
.
这两个定制器都是 Spring for Apache Kafka 项目的一部分。
下面是使用StreamsBuilderFactoryBeanCustomizer
.
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上面显示为您可以执行的作以自定义StreamsBuilderFactoryBean
.
您基本上可以从StreamsBuilderFactoryBean
以自定义它。
此定制器将在工厂 bean 启动之前由 binder 调用。
一旦您访问了StreamsBuilderFactoryBean
,您还可以自定义底层KafkaStreams
对象。
这是这样做的蓝图。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将被StreamsBuilderFactoryBeabn
就在基础KafkaStreams
开始。
只能有一个StreamsBuilderFactoryBeanCustomizer
在整个应用程序中。
那么我们如何考虑多个 Kafka Streams 处理器,因为它们中的每个处理器都由单独备份StreamsBuilderFactoryBean
对象?
在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些筛选器。
例如,
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
2.13.1. 使用定制器注册全局状态存储
如上所述,绑定器不提供将全局状态存储注册为功能的第一类方法。 为此,您需要使用定制器。 这是如何做到这一点的。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
同样,如果您有多个处理器,则需要将全局状态存储附加到右侧StreamsBuilder
通过过滤掉另一个StreamsBuilderFactoryBean
使用如上所述的应用程序 ID 的对象。
2.13.2. 使用定制器注册生产异常处理程序
在错误处理部分,我们指出 binder 没有提供处理生产异常的第一类方法。
尽管如此,您仍然可以使用StreamsBuilderFacotryBean
customizer 来注册生产异常处理程序。见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
同样,如果您有多个处理器,您可能需要根据正确的处理器将其适当地设置为StreamsBuilderFactoryBean
.
您也可以使用配置属性添加此类生产异常处理程序(有关更多信息,请参阅下文),但如果您选择使用编程方法,则这是一个选项。
2.14. 时间戳提取器
Kafka Streams 允许您根据各种时间戳概念控制使用者记录的处理。
默认情况下,Kafka Streams 会提取嵌入在使用者记录中的时间戳元数据。
您可以通过提供不同的TimestampExtractor
每个输入绑定的实现。
以下是有关如何做到这一点的一些详细信息。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后你设置上面的TimestampExtractor
每个消费者绑定的 Bean 名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果跳过输入使用者绑定来设置自定义时间戳提取器,则该使用者将使用默认设置。
2.15. 具有基于 Kafka Streams 的绑定器和常规 Kafka 绑定器的多绑定器
您可以拥有一个应用程序,其中既有基于常规 Kafka 绑定器的功能/使用者/提供商,又有基于 Kafka Streams 的处理器。 但是,您不能在单个函数或消费者中混合使用它们。
下面是一个示例,在同一应用程序中具有两个基于 Binder 的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置中的相关部分:
spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您拥有与上述相同的应用程序,但正在处理两个不同的 Kafka 集群,例如常规process
同时作用于 Kafka 集群 1 和集群 2(从 cluster-1 接收数据并发送到 cluster-2),并且 Kafka Streams 处理器作用于 Kafka 集群 2。
然后你必须使用 Spring Cloud Stream 提供的多绑定器工具。
下面是配置在这种情况下可能发生的变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意以上配置。
我们有两种绑定器,但总共有 3 个绑定器,第一种是基于集群 1 (kafka1
),然后是另一个基于集群 2 (kafka2
),最后是kstream
一个 (kafka3
). 应用程序中的第一个处理器从kafka1
并发布到kafka2
其中两个绑定器都基于常规的 Kafka 绑定器,但集群不同。第二个处理器是 Kafka Streams 处理器,它使用来自kafka3
与kafka2
,但粘合剂类型不同。
由于 Kafka Streams 系列绑定器中有三种不同的绑定器类型可用 -kstream
,ktable
和globalktable
- 如果您的应用程序具有基于这些绑定中的任何一个的多个绑定,则需要将其显式作为绑定器类型提供。
例如,如果您有如下处理器,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,必须在多活页夹方案中按如下方式配置。 请注意,仅当您有一个真正的多绑定器方案时,才需要这样做,其中有多个处理器在单个应用程序中处理多个集群。 在这种情况下,需要显式为绑定器提供绑定,以区别于其他处理器的绑定器类型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
2.16. 状态清理
默认情况下,Kafkastreams.cleanup()
当绑定停止时调用方法。
请参阅 Spring Kafka 文档。
要修改此行为,只需将单个CleanupConfig
@Bean
(配置为在启动、停止或两者都不清理)到应用程序上下文;将检测到 Bean 并将其连接到工厂 Bean。
2.17. Kafka Streams 拓扑可视化
Kafka Streams Binder 提供了以下执行器端点,用于检索拓扑描述,您可以使用这些端点使用外部工具可视化拓扑。
/actuator/topology
/actuator/topology/<applicaiton-id of the processor>
您需要包含 Spring Boot 中的执行器和 Web 依赖项才能访问这些端点。
此外,您还需要添加topology
自management.endpoints.web.exposure.include
财产。
默认情况下,topology
端点已禁用。
2.18. 配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关与 binder 相关的常见配置选项和属性,请参阅核心文档。
2.18.1. Kafka Streams Binder 属性
以下属性在活页夹级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.
- 配置
-
映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以
spring.cloud.stream.kafka.streams.binder.
. 以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfig
Apache Kafka Streams 文档中的 JavaDocs。
您可以从StreamsConfig
可以通过这个设置。
使用此属性时,它适用于整个应用程序,因为这是活页夹级别的属性。
如果应用程序中有多个处理器,则所有这些处理器都将获得这些属性。
对于像application.id
,这将成为问题,因此您必须仔细检查属性如何StreamsConfig
使用此活页夹级别进行映射configuration
财产。
- functions.<function-bean-name>.applicationId
-
仅适用于功能型处理器。 这可用于设置应用程序中每个功能的应用程序 ID。 在多个函数的情况下,这是设置应用程序 ID 的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能型处理器。 映射包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于活页夹级别
configuration
属性描述,但这个级别的configuration
属性仅针对命名函数进行限制。 当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能需要使用它。 都StreamsConfig
属性可以在此处使用。 - 经纪人
-
代理 URL
违约:
localhost
- zk节点
-
Zookeeper URL
违约:
localhost
- 反序列化ExceptionHandler
-
反序列化错误处理程序类型。 此处理程序在绑定程序级别应用,因此应用于应用程序中的所有输入绑定。 有一种方法可以在消费者绑定级别以更细粒度的方式控制它。 可能的值是 -
logAndContinue
,logAndFail
或sendToDlq
违约:
logAndFail
- 应用程序 Id
-
在绑定器级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个函数或
StreamListener
方法,则应用程序 ID 应以不同的方式设置。 请参阅上面详细讨论设置应用程序 ID。默认:应用程序将生成静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大尝试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
尝试在重试时连接到状态存储时的回退期。
默认值:1000 毫秒
2.18.2. Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为方便起见,如果有多个输出绑定并且它们都需要一个通用值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.
.
- keySerde
-
要使用的密钥 Serde
默认值:请参阅上面关于消息反序列化的讨论
- 值Serde
-
值 serde 使用
默认值:请参阅上面关于消息反序列化的讨论
- 使用原生编码
-
标志来启用/禁用本机编码
违约:
true
.
streamPartitionerBeanName:
要在使用者处使用的自定义出站分区器 Bean 名称。
应用程序可以提供自定义StreamPartitioner
作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者以代替默认名称。
+ 默认值:请参阅上面有关出站分区支持的讨论。
2.18.3. Kafka Streams 消费者属性
以下属性可用于 Kafka Streams 使用者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.
.
- 应用程序 Id
-
为每个输入绑定设置 application.id。这仅适用于
StreamListener
基于功能的处理器,请参阅上面概述的其他方法。默认值:见上文。
- keySerde
-
要使用的密钥 Serde
默认值:请参阅上面关于消息反序列化的讨论
- 值Serde
-
值 serde 使用
默认值:请参阅上面关于消息反序列化的讨论
- 具体化为
-
状态存储在使用传入的 KTable 类型时实现
违约:
none
. - 使用原生解码
-
标志来启用/禁用本机解码
违约:
true
. - dlq名称
-
DLQ 主题名称。
默认值:请参阅上文有关错误处理和 DLQ 的讨论。
- 开始偏移量
-
如果没有要使用的已提交偏移量,则从偏移量开始。 这主要用于消费者第一次使用某个主题时。 Kafka Streams 使用
earliest
作为默认策略,活页夹使用相同的默认值。 这可以重写为latest
使用此属性。违约:
earliest
.
注意:使用resetOffsets
对 Kafka Streams 绑定器没有任何影响。
与基于消息通道的绑定器不同,Kafka Streams 绑定器不会寻求按需开始或结束。
- 反序列化ExceptionHandler
-
反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面所述的绑定器级别属性。 可能的值是 -
logAndContinue
,logAndFail
或sendToDlq
违约:
logAndFail
- timestampExtractorBeanName
-
要在使用者处使用的特定时间戳提取器 Bean 名称。 应用程序可以提供
TimestampExtractor
作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认名称。默认值:请参阅上面关于时间戳提取器的讨论。
2.18.4. 关于并发的特别说明
在 Kafka Streams 中,您可以使用num.stream.threads
财产。
这,您可以使用各种configuration
上述 binder、functions、producer 或 consumer 级别下的选项。
您还可以使用concurrency
核心 Spring Cloud Stream 为此目的提供的属性。
使用此功能时,您需要在消费者上使用它。
当函数或StreamListener
,在第一个输入绑定上设置此值。
例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency
,它将被翻译为num.stream.threads
通过活页夹。