参考指南
此指南描述了 Spring Cloud Stream 绑定器的 Apache Kafka 实现。 它包含了有关其设计、使用和配置选项的信息,以及 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定结构的信息。 此外,该指南还解释了 Spring Cloud Stream 的 Kafka Streams 绑定能力。
1. Apache Kafka 绑定器
1.1. 使用方式
要使用Apache Kafka绑定程序,您需要将spring-cloud-stream-binder-kafka添加为Spring Cloud Stream应用程序的依赖项,如以下Maven示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka Starter,如下例所示(Maven):
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图显示了Apache Kafka绑定程序如何操作的简化图:
The Apache Kafka Binder 实现将每个目的地映射到一个 Apache Kafka 主题。 The consumer group 直接映射到相同的 Apache Kafka 概念。 分区也直接映射到 Apache Kafka 分区。
绑定程序当前使用 Apache Kafka kafka-clients 版本 2.3.1。 此客户端可以与较旧的代理服务器通信(请参阅 Kafka 文档),但某些功能可能不可用。 例如,早于 0.11.x.x 的版本不支持原生标头。 还有,0.11.x.x 不支持 autoAddPartitions 属性。
1.3. 配置选项
本节包含Apache Kafka绑定器使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档。
1.3.1. Kafka 绑定属性
- spring.cloud.stream.kafka.binder.brokers
-
与Kafka绑定连接的经纪人的列表。
默认值:
localhost。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers允许不带端口信息的主机(例如,host1,host2:port2)。这设置了没有在代理列表中配置端口时的默认端口。默认值:
9092。 - spring.cloud.stream.kafka.binder.configuration
-
由绑定器创建的所有客户端传递的客户端属性(生产者和使用者)的键/值映射。 <br> 由于这些属性同时用于生产者和使用者,因此应将其限制使用于通用属性,例如安全性设置。 <br> 通过此配置提供的任何未知的Kafka生产者或使用者属性都会被筛选掉,不允许传播。 <br> 此处的属性会覆盖在启动中设置的任何属性。
默认值:空映射。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意Kafka客户端消费者属性的键值映射。除了支持已知的Kafka消费者属性外,还允许在此处设置未知的消费者属性。此处设置的属性会覆盖在boot和
configuration属性中设置的属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.headers
-
列表中包含传输程序运输的自定义标题。仅在与旧版应用程序(< 1.3.x)通信时需要(使用版本号< 0.11.0.0)。较新版本(版本号> 0.11.0.0)原生支持标头。
默认:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
要等待获取分区信息的时间,以秒为单位。健康报告在计时器到期时变为离线状态。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
在broker上所需确认的数量。 见producer的Kafka文档
acks属性。默认值:
1。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在设置为
autoCreateTopics或autoAddPartitions时有效。生产或消费数据的话题上配置绑定器的全局最小分区数。可以被partitionCount设置的生产者或instanceCount * concurrency设置的生产者(如果更大)所取代。默认值:
1。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里也允许使用未知的生产者属性。 这里的属性会覆盖在 boot 和上一个
configuration属性中设置的任何属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.replicationFactor
-
自动创建主题的复制因子,如果为
autoCreateTopics则有效。 可以在每个绑定上重写它。默认值:
1。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true,绑定器会自动创建新主题。
如果设置为false,绑定器依赖于主题已经配置好。
在后一种情况下,如果主题不存在,绑定器将无法启动。这个设置独立于代理的
auto.create.topics.enable设置,并不影响它。如果服务器设置为自动创建主题,它们可能作为元数据检索请求的一部分被创建,具有默认的代理设置。
默认值:
true。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true,则在必要时创建新分区。如果设置为false,则依赖于目标主题的分区大小已经配置。如果目标主题的分区数小于预期值,绑定程序将无法启动。默认值:
false。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在绑定程序中启用事务。请参阅Kafka文档中的
transaction.id和spring-kafka文档中的https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions。当启用事务时,单独的producer属性被忽略,所有生产者使用spring.cloud.stream.kafka.binder.transaction.producer.*属性。默认的
null(无事务) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务绑定程序中生产者的全局生产者属性。
有关所有绑定程序支持的通用生产者属性,请参阅
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix和Kafka Producer Properties。默认值:请参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用于将 Kafka 标头映射到和从 Kafka 标头进行映射的
KafkaHeaderMapper的 Bean 名称。
例如,如果您希望自定义使用 JSON 反序列化处理标头的BinderHeaderMapperbean 中的信任包集,则可以使用此功能。
如果未使用此属性提供此自定义BinderHeaderMapperbean,则绑定器将查找名称为kafkaBinderHeaderMapper且类型为BinderHeaderMapper的标头映射器 bean,然后回退到由绑定器创建的默认BinderHeaderMapper。默认值为 no.
1.3.2. Kafka消费者属性
避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.consumer.<property>=<value>。 |
以下属性仅适用于Kafka消费者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。
- admin.configuration
-
{ 'html':'***' }
- admin.replicas-assignment
-
{ 'html':'***' }
- admin.replication-factor
-
{ 'html':'***' }
- autoRebalanceEnabled
-
当为
0时,主题分区会自动在消费者组成员之间重新平衡。当为
1时,每个消费者根据2和3分配一组固定的分区。这需要在每个已启动的实例上适当地设置
4和5属性。6的值在这种情况下通常必须大于 1。默认值:
true。 - 处理每条记录
-
当
autoCommitOffset是true时,此设置决定了是否在处理完每个记录后提交偏移量。默认情况下,在处理完由consumer.poll()返回的批次中的所有记录后提交偏移量。可以通过Kafka属性max.poll.records控制poll返回的记录数,该属性通过消费者configuration属性进行设置。将此值设为true可能会导致性能下降,但这样可以减少在发生故障时重新传递记录的可能性。另请参阅绑定器requiredAcks属性,它也会影响提交偏移量的性能。默认值:
false。 - 自动提交偏移量
-
处理完消息后是否自动提交偏移量。
如果设置为false,则传入的消息中存在一个键为kafka_acknowledgment、类型为org.springframework.kafka.support.Acknowledgment的头信息。
应用程序可以使用此头信息来确认消息。
详情请参阅示例部分。
当此属性设置为false时,Kafka 绑定器将确认模式设为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,应用程序负责确认记录。
另请参阅ackEachRecord。默认值:
true。 - 自动提交错误
-
仅当设置为
true时有效。
如果设置为false,它会抑制导致错误的消息的自动提交,并且只对成功处理的消息进行提交。这允许流在发生持久性失败时从最后一次成功处理的消息处自动重新播放。
如果设置为true,则始终启用自动提交(如果启用了自动提交)。
如果没有设置(默认情况),它的效果与enableDlq相同,即把发送到死信队列(DLQ)的错误消息进行自动提交,而其他情况下不进行提交。默认值:未设置。
- 重置偏移量
-
是否重置消费者的偏移量为提供的 startOffset 值。
如果提供了KafkaRebalanceListener,则必须为 false;参见 使用 KafkaRebalanceListener。默认值:
false。 - startOffset
-
新组的起始偏移量。
允许值:earliest和latest。
如果消费者组通过spring.cloud.stream.bindings.<channelName>.group显式设置给消费者“绑定”,则将 'startOffset' 设置为earliest。否则,对于anonymous消费者组将其设置为latest。
另请参阅resetOffsets(本列表前面)。默认值:null(等同于
earliest)。 - 启用死信队列
-
设置为 true 时,它会为消费者启用死信队列(DLQ)行为。 默认情况下,导致错误的消息会被转发到名为
error.<destination>.<group>的主题。 可以通过设置dlqName属性来配置死信队列的主题名称。 当错误数量相对较少且重放整个原始主题可能过于繁琐时,这提供了与更常见的 Kafka 重放场景不同的选择。 有关更多信息,请参阅 死信主题处理。 从版本 2.0 开始,发送到死信队列主题的消息增强了以下标头:x-original-topic、x-exception-message和x-exception-stacktrace作为byte[]。 默认情况下,失败的记录会被发送到与原始记录相同的分区号中的死信队列主题。 请参阅 死信主题分区选择 来了解如何更改此行为。 当destinationIsPattern为true时不允许。默认值:
false。 - 分区
-
当
enableDlq为 true,且此属性未设置时,会创建一个与主主题分区数相同的死信主题。
通常情况下,死信记录会被发送到与原始记录相同分区的死信主题中。
可以更改这种行为;请参阅死信主题分区选择。
如果将此属性设置为1并且没有DqlPartitionFunction的 Bean,则所有死信记录都将写入分区0。
如果此属性大于1,则您必须提供一个DlqPartitionFunction的 Bean。
请注意,实际的分区数量受绑定器的minPartitionCount属性影响。默认值:
none - 配置
-
包含通用Kafka消费者属性的键/值对映射。除了具有Kafka消费者属性外,还可以在此处传递其他配置属性。例如应用程序所需的某些属性,如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。默认值:空映射。
- 死信队列名称
-
接收错误消息的死信队列(DLQ)主题的名称。
默认值:null(如果未指定,则导致错误的消息会转发到名为
error.<destination>.<group>的主题)。 - 延迟消息生产者属性
-
使用此功能,可以设置与死信队列(DLQ)相关的生产者属性。
所有可以通过 Kafka 生产者属性设置的属性都可以通过此属性进行设置。
当在消费者上启用原生解码时(即 useNativeDecoding: true),应用程序必须为 DLQ 提供相应的键/值序列化器。
这必须以dlqProducerProperties.configuration.key.serializer和dlqProducerProperties.configuration.value.serializer的形式提供。默认:Kafka 生产者默认属性。
- standardHeaders
-
指示入站通道适配器填充哪些标准标头。
允许值:none,id,timestamp,或both。
如果使用原生反序列化并且接收消息的第一个组件需要一个id(例如配置为使用JDBC消息存储的聚合器),则很有用。默认值:
none - 转换器Bean名称
-
实现
RecordMessageConverter的bean的名称。用于入站通道适配器中,以替换默认的MessagingMessageConverter。默认值:
null - 空闲事件间隔
-
表示没有最近收到消息的事件之间的时间间隔(单位:毫秒)。 使用
ApplicationListener<ListenerContainerIdleEvent>来接收这些事件。 参见消费者暂停与恢复示例,了解使用示例。默认值:
30000 - 目的地是模式
-
当为 true 时,目的地被视为正则表达式
Pattern,代理用其匹配主题名称。
当为 true 时,不会配置主题,并且不允许使用enableDlq,因为在配置阶段绑定器不知道主题名称。
请注意,检测与模式匹配的新主题所需的时间由消费者属性metadata.max.age.ms控制(在编写本文时),默认值为 300,000 毫秒(5 分钟)。
可以使用上面的configuration属性进行配置。默认值:
false - topic.properties
-
用于在配置新主题时使用的Kafka主题属性的
Map——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0默认值为 no.
- topic.replicas-assignment
-
一个<Integer, List<Integer>> 的副本分配映射,其中键为分区,值为分配。
用于配置新主题时使用。
参见 <code>0</code> Java 文档在 <code>1</code> jar 中的说明。
默认值为 no.
- topic.replication-factor
-
配置主题时要使用的复制因子。覆盖绑定器范围内的设置。
如果存在replicas-assignments,则忽略此设置。默认值:无(使用绑定器范围的默认值1)。
- 轮询超时
-
轮询中用于可轮询消费者超时时间。
默认值:5秒。
- 事务管理器
-
用于覆盖此绑定的Binder事务管理器的
KafkaAwareTransactionManager的Bean名称。如果要使用ChainedKafkaTransactionManaager将另一个事务与Kafka事务同步,则通常需要它。为了实现记录的精确一次消费和生产,必须为所有消费者和生产者绑定配置相同的事务管理器。默认值为 no.
1.3.3. 批量消费
从版本 3.0 开始,当spring.cloud.stream.binding.<name>.consumer.batch-mode设置为true时,轮询 Kafka Consumer接收的所有记录都将作为List<?>传递给监听器方法。否则,该方法将一个记录接一个地被调用。批处理的大小由 Kafka 消费者属性max.poll.records、min.fetch.bytes、fetch.max.wait.ms
在使用批处理模式时,绑定器内不支持重试,因此maxAttempts会被覆盖为1。您可以配置一个 您还可以手动使用 有关这些技术的更多信息,请参阅Spring for Apache Kafka 文档。 |
1.3.4. Kafka生产者属性
避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value>。 |
仅适用于Kafka生产者的以下属性可用,并且必须使用spring.cloud.stream.kafka.bindings.<channelName>.producer.作为前缀。
- admin.configuration
-
{ 'html':'***' }
- admin.replicas-assignment
-
{ 'html':'***' }
- admin.replication-factor
-
{ 'html':'***' }
- 缓冲区大小
-
Kafka 生产者在发送前尝试批处理数据的最大字节数上限。
默认值:
16384。 - 同步
-
生产者是否为同步的。
默认值:
false。 - 发送超时表达式
-
在启用同步发布时,针对传出消息评估的 SpEL 表达式用于确定等待确认的时间(以毫秒为单位)——例如,
headers['mySendTimeout']。
当版本低于 3.0 时,除非使用原生编码,否则无法使用负载,因为在评估此表达式时,负载已经是byte[]的形式。现在,在转换负载之前就评估该表达式。默认值:
none。 - 批量超时
-
生产者在发送同一批次中的消息之前等待多长时间以允许更多消息累积。(通常,生产者不会等待,并且会立即发送所有已累积的消息。非零值可能会增加吞吐量,但会增加延迟)。
默认值:
0。 - 消息键表达式
-
针对传出消息评估的 SpEL 表达式用于填充生成的 Kafka 消息的键——例如,
headers['myKey']。在 3.0 版本之前,除非使用原生编码,否则无法使用负载,因为在此表达式被评估时,负载已经是byte[]的形式。现在,在转换负载之前就评估该表达式。默认值:
none。 - headerPatterns
-
用于匹配Spring消息头并映射到Kafka
Headers的简单模式的逗号分隔列表,位于ProducerRecord中。
模式可以以通配符(星号)开头或结尾。
可以通过在前面加上!来否定模式。
匹配将在第一次匹配(正向或负向)后停止。
例如!ask,as*将通过ash但不通过ask。id和timestamp永远不会被映射。默认值:
*(所有请求头,除了id和timestamp) - 配置
-
包含通用 Kafka 生产者属性的键/值对映射。
默认值:空映射。
- topic.properties
-
用于在配置新主题时使用的Kafka主题属性的
Map——例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0 - topic.replicas-assignment
-
一个<Integer, List<Integer>> 的副本分配映射,其中键为分区,值为分配。
用于配置新主题时使用。
参见 <code>0</code> Java 文档在 <code>1</code> jar 中的说明。
默认值为 no.
- topic.replication-factor
-
配置主题时要使用的复制因子。覆盖绑定器范围内的设置。
如果存在replicas-assignments,则忽略此设置。默认值:无(使用绑定器范围的默认值1)。
- 使用主题标题
-
设置为
true,以使用传出消息中的KafkaHeaders.TOPIC消息头的值覆盖默认绑定目标(主题名称)。 如果未提供该标头,则使用默认绑定目标。 默认:false。 - 记录元数据通道
-
成功发送结果应被发送到的
MessageChannel的bean名称;该bean必须存在于应用程序上下文中。
发送到通道的消息是已转换(如果有)的已发送消息,并带有额外的标头KafkaHeaders.RECORD_METADATA。
此标头包含Kafka客户端提供的RecordMetadata对象;它包括记录在主题中写入的分区和偏移量。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败发送到生产者错误通道(如果已配置);参见错误通道。
默认值:null
+
Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以创建具有给定分区数目的主题(结合minPartitionCount,两者中的最大值被用作该值)。当同时为绑定器配置 如果一个已经存在的主题分区数目较小,并且 如果一个已经存在的主题分区数目较小,并且 如果一个已存在的主题分区数目大于 ( |
- 压缩
-
设置
compression.type生产者属性。支持的值为none、gzip、snappy和lz4。如果您覆盖kafka-clientsjar到2.1.0(或更高版本),如Spring for Apache Kafka文档中所述,并希望使用zstd压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd。默认值:
none。 - 事务管理器
-
用于覆盖此绑定的Binder事务管理器的
KafkaAwareTransactionManager的Bean名称。如果要使用ChainedKafkaTransactionManaager将另一个事务与Kafka事务同步,则通常需要它。为了实现记录的精确一次消费和生产,必须为所有消费者和生产者绑定配置相同的事务管理器。默认值为 no.
1.3.5. 使用示例
在本节中,我们展示了如何针对特定场景使用前面所述的属性。
示例:将autoCommitOffset设置为false并依赖手动确认
此示例说明了如何在使用者应用程序中手动确认偏移量。
此示例要求将spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为1 。使用相应的输入通道名称为您示例。
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
示例:安全配置
Apache Kafka 0.9 支持客户端和代理之间的安全连接。为了利用此功能,请遵循Apache Kafka 文档以及Kafka 0.9的Confluent文档中的安全性指南。使用spring.cloud.stream.kafka.binder.configuration选项为绑定器创建的所有客户端设置安全属性。
例如,要将security.protocol设置为SASL_SSL,请设置以下属性:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
其他所有安全性属性可以采用类似的方式设置。
使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。参考文档
Spring Cloud Stream 支持通过使用 JAAS 配置文件和使用 Spring Boot 属性来向应用程序传递 JAAS 配置信息。
使用 JAAS 配置文件
可以通过使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选地)krb5 文件位置。<br/>以下示例显示了如何通过使用 JAAS 配置文件来启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:<br/>
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为使用 JAAS 配置文件的替代方案,Spring Cloud Stream 提供了一种通过使用 Spring Boot 属性来为 Spring Cloud Stream 应用程序设置 JAAS 配置的机制。
可以使用以下属性来配置Kafka客户端的登录上下文:
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
登录模块名称。在正常情况下无需设置。
默认值:
com.sun.security.auth.module.Krb5LoginModule。 - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
默认值:
required。 - spring.cloud.stream.kafka.binder.jaas.options
-
包含登录模块选项的键/值对映射。
默认值:空映射。
以下示例显示了如何通过使用Spring Boot配置属性来启动带有SASL和Kerberos的Spring Cloud Stream应用程序:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例表示等效于以下JAAS文件:<br>
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果所需的主题已存在于代理上或由管理员创建,则可以关闭自动创建功能,只需发送客户端JAAS属性即可。
| 不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。 如果 |
使用autoCreateTopics和autoAddPartitions时请谨慎操作Kerberos。通常,应用程序可以使用在Kafka和Zookeeper中没有管理权限的主体。 因此,依赖Spring Cloud Stream创建/修改主题可能会失败。 在安全环境中,我们强烈建议通过使用Kafka工具手动创建主题并管理ACLs。 |
示例:暂停和恢复消费者
如果您希望暂停消费但不引起分区重新平衡,可以暂停和恢复消费者。这可以通过在您的@StreamListener中添加Consumer作为参数来实现。ApplicationListener用于ListenerContainerIdleEvent实例以恢复消费。事件发布的频率由idleEventInterval属性控制。由于消费者不是线程安全的,您必须在调用线程上调用这些方法。
以下简单的应用程序展示了如何暂停和恢复:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
1.4. 事务绑定器
通过将spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix设置为非空值(例如tx-)来启用事务。在处理器应用程序中,消费者启动事务;任何由消费者线程发送的记录都参与同一事务。当监听器正常退出时,监听器容器会向事务发送偏移量并提交该事务。使用公共生产者工厂用于所有使用spring.cloud.stream.kafka.binder.transaction.producer.*属性配置的生产者绑定;忽略单个绑定Kafka生产者属性。
普通绑定器重试(以及死信)在事务中不受支持,因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也会被回滚。当启用重试时(常见属性maxAttempts大于零),将使用重试属性来配置DefaultAfterRollbackProcessor以在容器级别启用重试。同样地,不是在事务内发布死信记录,而是通过DefaultAfterRollbackProcessor将此功能移至监听器容器,在主事务回滚之后运行。 |
如果要在源应用程序中使用事务,或从任意线程为仅生产者事务(例如 @Scheduled 方法)使用事务,则必须获取事务性生产者工厂的引用,并使用它定义一个 KafkaTransactionManager bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用BinderFactory获取绑定程序的引用;当只有一个绑定程序配置时,在第一个参数中使用null。
如果配置了多个绑定程序,请使用绑定程序名称来获取引用。
一旦我们有了绑定程序的引用,就可以获得ProducerFactory的引用并创建一个事务管理器。
然后,您将使用正常的Spring事务支持,例如TransactionTemplate或@Transactional,例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者事务与来自其他某些事务管理器的事务同步,请使用ChainedTransactionManager。
如果您部署了应用程序的多个实例,每个实例都需要一个唯一的transactionIdPrefix。 |
1.5. 错误通道
从版本 1.3 开始,绑定器会无条件地将每个消费者目标的异常发送到错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。 有关更多信息,请参阅[spring-cloud-stream-overview-error-handling]。
请求的 ErrorMessage 负载为带有属性的 KafkaSendFailureException:
-
failedMessage: Spring 消息Message<?>: 发送失败的消息。 -
record: The rawProducerRecordthat was created from thefailedMessage
没有对生产者异常(例如发送到死信队列)进行自动处理。 您可以使用自己的Spring Integration流程来消费这些异常。
1.6. Kafka指标
Kafka 绑定模块公开了以下指标:
spring.cloud.stream.binder.kafka.offset: 此指标表示给定绑定器的主题中,尚未被特定消费者组消费的消息数量。所提供的的指标基于Mircometer指标库。该指标包含消费者组信息、主题以及从最新偏移量到已提交偏移量的实际延迟。
此指标对于向PaaS平台提供自动扩展反馈特别有用。
1.7. 垃圾记录(空记录值)
使用压缩主题时,值为 null 的记录(也称为墓碑记录)表示某个键的删除。
要在 @StreamListener 方法中接收此类消息,则必须将参数标记为不需要以接收 null 值参数。
@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
1.8. 使用 KafkaRebalanceListener
应用程序可能希望在初始分配分区时寻找任意偏移量的主题/分区,或者对使用者执行其他操作。
从版本 2.1 开始,如果在应用程序上下文中提供一个单例 KafkaRebalanceListener bean,则将其注入所有 Kafka 消费器绑定中。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
不能将 resetOffsets 消费器属性设置为 true,同时提供平衡侦听器。
1.9. 死信主题处理
1.9.1. 死信主题分区选择
按默认值,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少具有与原始记录相同的分区数。
要更改此行为,请添加一个DlqPartitionFunction实现作为@Bean到应用程序上下文中。 只能有一个这样的Bean存在。 函数接收消费者组、失败的ConsumerRecord和异常。 例如,如果总是想路由到分区0,可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果您将消费者绑定的dlqPartitions属性设置为1(且绑定器的minPartitionCount等于1),则无需提供DlqPartitionFunction;框架将始终使用分区0。如果您将消费者绑定的 |
1.9.2. 处理死信主题中的记录
由于框架无法预测用户希望如何处理被死信的消息,因此它不提供任何标准机制来处理这些消息。如果导致消息进入死信队列的原因是暂时性的,您可能希望将这些消息重新路由回原始主题。但是,如果问题是永久性的,则可能导致无限循环。本主题中的Spring Boot应用程序示例展示了如何将这些消息重新路由回原始主题,但在三次尝试后会将它们移动到一个“停车场”主题中。该应用程序是另一个读取死信主题的spring-cloud-stream应用程序,并且在5秒内未收到任何消息时终止。
示例假设原始目标为 so8400out,消费组为 so8400。
有几种策略要考虑:
-
考虑仅在主应用程序未运行时运行重新路由。否则,非常快速地用完临时错误重试。
-
另外,可以使用两阶段的方法:使用此应用程序对第三个主题进行路由,并使用另一个对从那里返回到主主题。
下面的代码清单显示了示例应用程序:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
1.10. 使用Kafka绑定程序进行分区
Apache Kafka 原生支持主题分区。
有时将数据发送到特定分区是有利的——例如,当您希望严格按顺序处理消息时(针对特定客户的的所有消息应发送到同一分区)。
下面的例子展示了如何配置生产者和消费者端:
@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
主题必须配置足够的分区,以便为所有消费者组实现所需的并发性。
上述配置最多支持12个消费者实例(如果它们的concurrency是2,则最多支持6个;如果并发度是3,则最多支持4个,以此类推)。
通常最好“过度配置”分区,以允许未来增加消费者或提高并发度。 |
先前的配置使用默认分区(key.hashCode() % partitionCount)。
这可能提供也可能不提供适当平衡的算法,具体取决于键值。
您可以使用 partitionSelectorExpression 或 partitionSelectorClass 属性覆盖此默认设置。 |
由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。Kafka 会在实例之间分配分区。
以下 Spring Boot 应用程序侦听 Kafka 流,并将每个消息进入的分区 ID 打印到控制台(Console):
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + " received from partition " + partition);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.topic
group: myGroup
可以根据需要添加实例。
Kafka 会重新平衡分区分配。
如果实例数量(或 instance count * concurrency)超过分区数,则某些消费者处于空闲状态。
2. Kafka Streams绑定
2.1. 使用方法
对于使用 Kafka Streams 绑定器,您只需将其添加到 Spring Cloud Stream 应用程序中,使用以下 Maven 坐标:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
一个快速启动Kafka Streams绑定器的新项目的方法是使用Spring初始器,然后选择“云流”和“Spring for Kafka Streams”,如下面所示。
2.2. 概述
Spring Cloud Stream 包含一个专门设计用于绑定的 Apache Kafka 流库实现。使用此本机集成,Spring Cloud Stream 处理程序应用程序可以直接在核心业务逻辑中使用 Apache Kafka 流 API。
Kafka Streams binder实现建立在Spring for Apache Kafka项目提供的基础上。
Kafka 流式处理程序绑定器提供对 Kafka 流式处理中的三大主要类型的支持 - < code > 0 、 < code > 1 和 < code > 2 。
Kafka 流应用程序通常遵循这样的模型:从入站主题读取记录,应用业务逻辑,然后将转换后的记录写入出站主题。 也可以定义没有出站目标的处理器应用程序。
在下面的节中,我们将详细介绍Spring Cloud Stream与Kafka Streams的集成。
2.3. 编程模型
在使用 Kafka Streams 绑定器提供的编程模型时,可以使用高级 Streams DSL 和高级与低级 处理器-API 的混合作为选项。当混合使用高级和低级 API 时,通常通过调用 transform 或 process 上 KStream 的 API 方法来实现。
2.3.1. 功能样式
从 Spring Cloud Stream 3.0.0 开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中可用的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为类型 java.util.function.Function 或 java.util.function.Consumer 的 lambda 表达式。
让我们看一个非常基础的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然简单,但这是一个完整的独立Spring Boot应用程序,利用Kafka Streams进行流处理。这是一个没有出站绑定且只有一个入站绑定的消费者应用程序。该应用程序消耗数据,并将KStream键和值的信息记录到标准输出中。该应用程序包含SpringBootApplication注解和一个标记为Bean的方法。bean方法是类型java.util.function.Consumer,其参数化为KStream。然后在实现中,我们返回了一个Consumer对象,它基本上是一个lambda表达式。在lambda表达式内部提供了处理数据的代码。
在此应用程序中,有一个单一输入绑定,其类型为KStream。
绑定器为此应用程序创建此绑定,并为其分配名称process-in-0,即函数bean名称后跟连字符(-)和字面量in,再跟另一个连字符以及参数的序数位置。
您使用此绑定名称来设置其他属性,例如目标。
例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic。
| 如果绑定上未设置目标属性,则会创建一个与绑定同名的主题(如果有足够的权限),或者期望该主题已经存在。 |
构建为一个 uber-jar 后(例如,kstream-consumer-app.jar),您可以像下面这样运行上面的例子。
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这里是另一个完整的处理器示例,它同时具有输入和输出绑定。这是经典的单词计数示例,在该示例中,应用程序从主题接收数据,并在滚动时间窗口内计算每个单词出现的次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
这里再次是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,bean 方法的类型是 java.util.function.Function。
对于 Function 的第一个参数化类型用于输入 KStream,第二个类型用于输出。
在方法体中,提供了一个 lambda 表达式,其类型为 Function,并给出了实际业务逻辑作为实现。
类似于之前讨论过的基于 Consumer 的应用程序,这里的输入绑定默认命名为 process-in-0。对于输出,绑定名称也自动设置为 process-out-0。
构建为一个可执行 jar 文件(例如,wordcount-processor.jar)后,您可以像下面这样运行上面的例子。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将从 Kafka 主题 words 中获取消息,并将计算结果发布到输出主题 counts。
Spring Cloud Stream 将确保来自入站和出站主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器所需的逻辑。设置 Kafka Streams 基础设施所需的特定配置由框架自动处理。
我们上面看到的两个示例都有一个KStream输入绑定。在这两种情况下,这些绑定都从单个主题接收记录。
如果您希望将多个主题多路复用到单个KStream绑定中,则可以在下面提供逗号分隔的Kafka主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
另外,如果您希望将主题模式作为目标位置提供以与正则表达式匹配,则可以做到。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多重输入绑定
许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题中消费数据。例如,一个主题被作为 Kstream 消费,而另一个主题则被作为 KTable 或 GlobalKTable 消费。应用程序可能希望将数据以表格类型接收有许多原因。考虑一种使用场景,其中底层主题是通过数据库中的变更数据捕获(CDC)机制填充的,或者也许应用程序只关心下游处理的最新更新。如果应用程序指定数据需要绑定为KTable或GlobalKTable,那么Kafka Streams绑定器会正确地将目的地绑定到KTable或GlobalKTable,以便应用程序可以操作它们。我们将看看几种不同的场景,如何在Kafka Streams绑定器中处理多个输入绑定。
Kafka Streams绑定中的BiFunction
这里有一个包含两个输入和一个输出的示例。在这种情况下,应用程序可以利用java.util.function.BiFunction。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
再次,基本主题与前面的示例相同,但这里有两个输入。
Java 的 BiFunction 支持用于将输入绑定到所需的目标。
绑定器为输入生成的默认绑定名称分别为 process-in-0 和 process-in-1。默认输出绑定是 process-out-0。
在此示例中,BiFunction 的第一个参数被绑定为第一个输入的 KStream,第二个参数被绑定为第二个输入的 KTable。
BiConsumer 在 Kafka Streams 绑定中
如果有两个输入,但没有输出,则可以使用java.util.function.BiConsumer,如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果需要超过两个输入怎么办?<br/>在某些情况下,您可能需要使用多于两个的输入。在这种情况下,绑定器允许您链接部分函数。<br/>用函数式编程术语来说,这种技术通常被称为柯里化(currying)。<br/>随着Java 8添加了函数式编程支持,现在Java可以编写柯里化函数。<br/>Spring Cloud Stream Kafka Streams绑定器可以利用此功能来实现多个输入绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看看上面提出的绑定模型的详细信息。在此模型中,我们对传入的数据应用了三个部分函数。让我们将它们称为f(x)、f(y)和f(z)。如果我们将这些函数扩展为真正的数学函数,它们看起来会像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>。变量x代表KStream<Long, Order>,变量y代表GlobalKTable<Long, Customer>,而变量z则代表GlobalKTable<Long, Product>。第一个函数 f(x) 具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是该函数 f(y)。函数f(y)具有应用程序的第二个输入绑定(GlobalKTable<Long, Customer>),并且其输出是另一个函数,f(z)。应用程序的第三个输入(GlobalKTable<Long, Product>)是该函数 f(z) 的输入,其输出为 KStream<Long, EnrichedOrder>,即应用程序的最终输出绑定。三个部分函数的输入分别为KStream、GlobalKTable和GlobalKTable,您可以在方法体中使用它们来实现业务逻辑。
输入绑定分别命名为enrichOrder-in-0、enrichOrder-in-1和enrichOrder-in-2。输出绑定命名为enrichOrder-out-0。
使用柯里化函数,你可以几乎拥有任意数量的输入。然而,请记住,如果输入的数量超过一个较小的数字,并且如上所述在Java中部分应用这些函数可能会导致代码难以阅读。<br/>因此,如果你的Kafka Streams应用程序需要超过合理的小数量的输入绑定并且你想要使用这种功能模型,那么你可能需要重新考虑你的设计并适当地分解应用程序。
多个输出绑定
Kafka Streams 允许将输出数据写入多个主题。此功能在 Kafka Streams 中被称为分支。
当使用多个输出绑定时,需要提供一个 KStream 数组(KStream[])作为输出返回类型。
这是一个示例:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
编程模型保持不变,但输出参数化类型是KStream[]。
默认的输出绑定名称分别是process-out-0、process-out-1和process-out-2。
绑定器生成三个输出绑定的原因在于它检测到返回的KStream数组的长度。
关于Kafka Streams函数式编程风格的总结
总之,下表显示了在函数式编程中可以使用的各种选项。
| 输入数量 | 输出数量 | 要使用的组件 |
|---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函数 |
-
如果此表中有多个输出,则类型将简单地变为
KStream[]。
2.3.2. 命令式编程模型。
虽然上述的功能编程模型是首选方法,但如果您更喜欢,仍然可以使用基于StreamListener的经典方法。
这里有的一些示例。
以下是使用StreamListener的单词计数示例。
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
如你所见,这有点冗长,因为你需要提供EnableBinding和其他额外的注解,例如StreamListener和SendTo来使它成为一个完整应用程序。EnableBinding是你指定包含绑定接口的地方,在这种情况下,我们使用了带有以下契约的默认KafkaStreamsProcessor绑定接口。
public interface KafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
@Output("output")
KStream<?, ?> output();
}
Binder 将为输入 KStream 和输出 KStream 创建绑定,因为您正在使用包含这些声明的绑定接口。
除了函数式编程模型中明显不同的编程模型之外,这里需要特别提到的一点是绑定名称是由你在绑定接口中指定的内容。例如,在上述应用程序中,由于我们使用的是KafkaStreamsProcessor,所以绑定名称为input和output。绑定属性需要使用这些名称。例如spring.cloud.stream.bindings.input.destination、spring.cloud.stream.bindings.output.destination等。请记住,这与函数式风格有根本不同,因为在那种情况下绑定器会为应用程序生成绑定名称。这是因为应用程序在使用EnableBinding的函数模型中没有提供任何绑定接口。
这是另一个使用两个输入的数据接收器示例。
@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
@Input("inputTable") KTable<Long, Song> songTable) {
....
....
}
interface KStreamKTableBinding {
@Input("inputStream")
KStream<?, ?> inputStream();
@Input("inputTable")
KTable<?, ?> inputTable();
}
以下是上面所见基于StreamListener的相同BiFunction处理器的等效。
@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}
interface KStreamKTableBinding extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputTable();
}
最后,这是具有三个输入和柯里化函数的应用程序的StreamListener等效。
@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(
@Input("input-1") KStream<Long, Order> ordersStream,
@Input("input-"2) GlobalKTable<Long, Customer> customers,
@Input("input-3") GlobalKTable<Long, Product> products) {
KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
customers, (orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
return customerOrdersStream.join(products,
(orderId, customerOrder) -> customerOrder.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
});
}
interface CustomGlobalKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
GlobalKTable<?, ?> input2();
@Input("input-3")
GlobalKTable<?, ?> input3();
@Output("output")
KStream<?, ?> output();
}
您可能注意到,上述两个示例的代码量甚至更多,因为除了提供EnableBinding之外,还需要编写自己的自定义绑定接口。
使用函数式模型,您可以避免所有这些繁琐细节。
在我们继续讨论 Kafka Streams 绑定提供的通用编程模型之前,这里是多输出绑定的StreamListener版本。
EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
}
总结一下,我们回顾了使用Kafka Streams绑定器时的各种编程模型选择。
绑定器提供了对输入上的KStream、KTable和GlobalKTable的绑定功能。KTable和GlobalKTable绑定仅在输入上可用。对于KStream,绑定器既支持输入也支持输出绑定。
Kafka Streams 绑定器的编程模型的核心要点在于,该绑定器为您提供了灵活性,使您可以选择使用完全函数式的编程模型,或者采用基于StreamListener的命令式方法。
2.4. 编程模型的辅助功能
2.4.1. 在单个应用程序中使用多个 Kafka 流处理器
Binder 允许在一个单个的 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。</p><p>您可以拥有如下的应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在本例中,绑定器将创建具有不同应用ID(稍后详细介绍)的3个单独的Kafka Streams对象。<br>但是,如果您有多个处理器,则必须告诉Spring Cloud Stream,需要激活哪些函数。这是激活这些函数的方法。
spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess
如果您希望某些功能在一开始时不立即激活,可以从这个列表中移除它们。
这也同样适用于在同一应用程序中有一个单独的Kafka Streams处理程序,并且还有通过不同绑定器(例如基于常规Kafka消息通道绑定器的函数Bean)处理的其他类型为Function的Bean
2.4.2. Kafka流应用程序ID
应用程序 ID 是 Kafka 流应用程序所需属性之一。 Spring Cloud Stream Kafka 流绑定程序允许您通过多种方式配置此应用程序 ID。
如果应用程序中只有一个处理器或 StreamListener,则可以使用以下属性在绑定器级别进行设置:
spring.cloud.stream.kafka.streams.binder.applicationId.
作为一个方便的选项,如果你只有一台处理器,也可以使用 spring.application.name 作为属性将应用程序 id 委托出去。
如果您的应用程序中有多个 Kafka 流处理器,那么需要为每个处理器设置应用程序 ID。在函数式模型中,可以将其附加到每个函数作为属性。
例如,假设您有以下功能。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后您可以为每个使用以下绑定器级属性设置应用程序ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
在StreamListener的情况下,您需要在处理器的第一个输入绑定上设置此选项。
例如,假设您有两个基于0的处理器。StreamListener
@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
...
}
@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
...
}
然后,您必须使用以下绑定属性为此设置应用ID。
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId
和
spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId
对于基于函数的模型,也可以在此处设置应用程序 ID。如果使用的是函数式模型,则如上文所示在绑定程序级别设置每函数会更简单。
在生产部署中,强烈建议通过配置显式指定应用程序ID。 如果您正在自动扩展应用程序,则需要确保每个实例都使用相同的application ID进行部署,这尤其重要。
如果应用程序不提供应用程序 ID,则绑定器会为您自动生成一个静态的应用程序 ID。
在开发场景中,这样很方便,因为它避免了显式提供应用程序 ID 的需要。
以这种方式生成的应用程序 ID 在应用程序重启时将是静态的。
如果是函数模型,则生成的应用程序 ID 将是函数 Bean 名称后跟字面量applicationID,例如:process-applicationID 如果函数 Bean 名称为process;如果函数 Bean 名称为StreamListener,则生成的应用程序 ID 将使用包含类名后跟方法名再加字面量applicationId。
spring 概述
-
默认情况下,binder 将为每个函数或
StreamListener方法自动生成应用程序 ID。 -
如果您有单处理器,那么您可以使用
spring.kafka.streams.applicationId、spring.application.name或spring.cloud.stream.kafka.streams.binder.applicationId。 -
如果您的系统中存在多个处理器,则可以使用属性-
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId为每个函数设置应用程序ID。
在情况StreamListener下,可以通过spring.cloud.stream.kafka.streams.bindings.input.applicationId实现,假设输入绑定名称是input。
2.4.3 使用函数式风格覆盖绑定器生成的默认绑定名称
默认情况下,当使用函数式风格时,绑定器使用上述策略生成绑定名称,即 function-bean-name-in|-out-[0..n],例如 process-in-0、process-out-0 等。</p><p>如果您想覆盖这些绑定名称,可以这样做,通过指定以下属性。
0. 绑定的默认名称是绑定器生成的原始绑定名称。
例如,假设你有一个这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将生成具有名称的绑定,process-in-0、process-in-1 和 process-out-0。
现在,如果你希望将它们完全更改为其他不同的名称,比如更具体的领域绑定名称,你可以按下面的方式进行更改。
springc.cloud.stream.function.bindings.process-in-0=users
springc.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
在那之后,您必须在这些新的绑定名称上设置所有绑定级别属性。
请注意,即使使用上述功能性编程模型,大多数情况下遵循默认绑定名称也是有意义的。 如果您有大量配置属性,并且希望将绑定映射到更友好的领域,那么仍然可能需要这样做覆盖。
2.4.4. 设置启动服务器配置
当运行 Kafka 流应用程序时,您必须提供 Kafka broker 服务器信息。
如果您没有提供这些信息,那么 binder 将假定您正在运行 broker 的默认值为 localhost:9092。
如果情况并非如此,那么您需要进行重写。这里有几种方法可以做到这一点。
-
使用boot属性-
spring.kafka.bootstrapServers -
Binder级属性-<代码>0
当涉及到绑定程序级别属性时,无论您使用的是通过常规 Kafka 绑定程序提供的代理属性 spring.cloud.stream.kafka.binder.brokers。Kafka 流绑定程序将首先检查是否设置了 Kafka 流绑定程序特定代理属性 spring.cloud.stream.kafka.streams.binder.brokers,如果未找到,则查找 spring.cloud.stream.kafka.binder.brokers。
2.5. 记录序列化和反序列化
Kafka Streams binder 允许您以两种方式对记录进行序列化和反序列化。一个是 Kafka 提供的本机序列化和反序列化设施,另一个是 Spring Cloud Stream 框架的消息转换功能。让我们看看一些细节。
2.5.1. 入站反序列化
<keysAreAlwaysDeserializedUsingNativeSerdes/>
对于值,入站处的反序列化默认情况下由Kafka原生完成。请注意,这与Kafka Streams绑定器之前版本中的默认行为有重大变化,那时反序列化是由框架完成的。
Kafka Streams 绑定器将尝试通过查看 Serde 或 java.util.function.Function|Consumer 的类型签名来推断匹配的 StreamListener 类型。以下是它匹配 Serdes 的顺序。
-
如果应用程序提供了类型为
Serde的 bean,并且返回类型使用实际键或值类型参数化,那么它将为此使用该Serde进行反序列化。
例如,如果您在应用程序中具有以下内容,则检测器会发现KStream的传入值类型与Serdebean 的参数化类型匹配。
它将使用它进行反序列化。
@Bean
public Serde<Foo() customSerde{
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型,看看它们是否是Kafka Streams公开的其中一个类型。如果是,就使用它们。<br /> 下面是绑定程序将从Kafka Streams匹配的Serde类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果Kafka Streams提供的任何Serdes都不匹配类型,那么它将使用Spring Kafka提供的JsonSerde。在这种情况下,绑定器假定这些类型是JSON友好的。
如果您的输入包含多个值对象,则这很有用,因为绑定器会将它们内部推断为正确的Java类型。
在回退到JsonSerde之前,绑定器还会检查默认的Serde`s set in the Kafka Streams configuration to see if it is a `Serde是否可以与传入的KStream类型匹配。
如果上述任何一种策略都无效,那么应用程序必须通过配置提供 `Serde`。这可以通过两种方式配置——绑定或默认。
First the binder will look if a Serde is provided at the binding level. For e.g. if you have the following processor,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以使用以下内容提供级别Serde的绑定:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果为输入绑定提供如上所述的Serde,则该值将具有更高优先级,绑定器将避免任何Serde推断。 |
如果要在解压缩时使用默认密钥/值Serdes,可以在绑定程序级别执行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果不想使用Kafka提供的原生解码,可以依赖Spring Cloud Stream提供的消息转换功能。 由于原生解码是默认的,为了使Spring Cloud Stream对入站值对象进行反序列化,需要显式地禁用原生解码。
例如,如果你使用上面相同的BiFunction处理器,那么spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
你需要为所有输入单独禁用原生解码。否则,对于未禁用的输入仍然会应用原生解码。
默认情况下,Spring Cloud Stream 将使用 application/json 作为内容类型,并使用适当的 json 消息转换器。您可以通过使用以下属性和相应的 MessageConverter Bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
2.5.2. 出站序列化
外出序列化基本上遵循上述与 inbound 反序列化相同的规则。 与 inbound 反序列化类似,从 Spring Cloud Stream 3.0 版本之前的 binder 开始的一个重大变化是,outbound 的序列化现在由 Kafka 原生处理。 在 3.0 版本之前的 binder 中,这是由框架本身处理的。
Keys on the outbound are always serialized by Kafka using a matching Serde that is inferred by the binder. If it can’t infer the type of the key, then that needs to be specified using configuration.
值序列化器和反序列化器是使用与入站反序列化相同的规则推断出来的。
首先,它会匹配检查出站类型是否来自应用程序中提供的bean。
如果没有,则检查其是否与Kafka公开的Serde之一匹配,例如:Integer、Long、Short、Double、Float、byte[]、UUID和String。
如果这也不行,那么将回退到Spring Kafka项目提供的JsonSerde,但在那之前,请先查看默认的Serde配置以确定是否有匹配项。
请记住,所有这些操作对应用程序来说都是透明的。
如果以上都不起作用,用户必须通过配置提供所需的Serde。
假设您正在使用与上述相同的BiFunction处理器。那么,您可以按照以下方式配置出站键/值Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且未提供绑定级别的 Serdes,则 binder 将回退到JsonSerde,但会查找默认 Serdes 是否匹配。
默认的序列化/反序列化器配置方式与上文中描述的反序列化部分相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并且具有多个输出绑定,则必须为每个绑定进行配置。再次强调,如果绑定程序能够推断出Serde类型,则不需要进行此配置。
如果您不希望使用 Kafka 提供的原生编码,而想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认选项。
例如,如果您的处理器与上面相同,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,您需要为所有输出单独禁用原生编码。否则,对于未禁用的输出仍将应用原生编码。
当Spring Cloud Stream进行转换时,默认情况下,它将使用application/json作为内容类型,并使用适当的JSON消息转换器。
您可以使用以下属性和相应的MessageConverter Bean来使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用原生编码/解码时,binder 不会进行任何推断,这与使用原生序列化/反序列化的情况不同。
应用程序需要显式提供所有配置选项。
因此,通常建议对于序列化和反序列化的默认选项保持不变,并且在编写 Spring Cloud Stream Kafka Streams 应用程序时坚持使用 Kafka Streams 提供的原生反序列化。
你必须使用框架提供的消息转换功能的一个场景是,当你上游生产者正在使用特定的序列化策略时。
在这种情况下,你想使用匹配的反序列化策略,因为原生机制可能会失败。
当依赖于默认的Serde机制时,应用程序必须确保 binder 能够正确地将入站和出站映射到合适的Serde,否则事情可能会失败。
值得一提的是,上述数据序列化和反序列化的方案仅适用于处理器的边缘,即传入和传出。您的业务逻辑可能仍需要调用显式需要Serde对象的Kafka Streams API。这些仍然属于应用程序的责任,并且必须由开发人员相应地处理。
2.6. 错误处理
Apache Kafka Streams 提供了从反序列化错误处理异常的功能。
有关此支持的详细信息,请单击此处。
出站,Apache Kafka Streams 为反序列化异常提供两种类型的处理程序 - LogAndContinueExceptionHandler 和 LogAndFailExceptionHandler。
顾名思义,前者会记录错误并继续处理下一条记录,后者会记录错误并失败。 LogAndFailExceptionHandler 是默认的反序列化异常处理程序。
2.6.1. 处理绑定器中的反序列化异常
Kafka 流绑定器允许使用以下属性指定反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
or
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
在上面两个反序列化异常处理程序之外,绑定器还提供了一个第三个处理程序,用于将错误记录(毒药药丸)发送到 DLQ(死信队列)主题。 下面是如何启用此 DLQ 异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
当设置此属性时,反序列化错误中的所有记录都会自动发送到DLQ主题。
您可以在下面设置 DLQ 消息发布的话题名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此选项,则错误记录将发送到主题 custom-dlq。
如果没有设置,它将创建一个名为 error.<input-topic-name>.<application-id> 的死信队列(DLQ)主题。
例如,如果您的绑定的目标主题是 inputTopic 并且应用程序 ID 是 process-applicationId,则默认的 DLQ 主题为 error.inputTopic.process-applicationId。
建议显式地为每个输入绑定创建一个 DLQ 主题,如果您打算启用 DLQ。
2.6.2. 每个输入消费者绑定的死信队列(DLQ)
属性spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着,如果同一应用程序中有多个函数或StreamListener方法,则此属性将应用于所有这些函数或方法。然而,如果您在一个处理器中拥有多个处理器或者在单个处理器内的多个输入绑定,则可以使用粘贴器提供的更细粒度的DLQ控制来对每个输入消费者绑定进行控制。
如果拥有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且,如果你只想在第一个输入绑定上启用死信队列(DLQ),并在第二个绑定上使用logAndSkip,则可以在下面的消费者中进行设置。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip
以这种方式设置反序列化异常处理程序具有比在绑定器级别设置更高的优先级。
2.6.3. 死信队列分区
按默认值,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少具有与原始记录相同的分区数。
若要更改此行为,请将DlqPartitionFunction实现作为@Bean添加到应用程序上下文中。只能存在一个这样的bean。该函数会提供消费者组(在大多数情况下与应用ID相同)、失败的ConsumerRecord和异常。例如,如果您始终希望路由到分区0,则可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果您将消费者绑定的dlqPartitions属性设置为1(且绑定器的minPartitionCount等于1),则无需提供DlqPartitionFunction;框架将始终使用分区0。如果您将消费者绑定的 |
使用Kafka Streams绑定中的异常处理功能时需要注意的一些事项。
-
属性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着,如果同一应用程序中有多个函数或StreamListener方法,则此属性会应用于所有这些函数或方法。 -
反序列化时的异常处理与原生反序列化以及框架提供的消息转换一致。
2.6.4. 处理绑定器中的生产异常
与上述描述的反序列化异常处理器支持不同,绑定器不提供此类用于处理生产异常的一流机制。然而,您仍然可以使用StreamsBuilderFactoryBean自定义程序来配置生产异常处理器,有关更多信息,请参阅下面后续部分。
2.7. 状态存储
当使用高级 DSL 并执行相应调用触发状态存储时,Kafka Streams 将自动创建状态存储。
如果您希望将传入的KTable绑定作为命名状态存储进行实例化,可以使用以下策略。
假设你有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置下面的属性,传入的KTable数据将会在命名状态存储中进行重构。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
你可以在应用程序中定义自定义状态存储作为bean,这些存储将被绑定程序检测到,并添加到Kafka流生成器中。 特别是当使用处理器api时,您需要手动注册一个状态存储。 为此,您可以在应用程序中创建StateStore作为bean的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
这些状态存储可以直接被应用程序访问。
在引导过程中,将由绑定器处理这些bean,并传递给Streams builder对象。
访问状态存储:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
当涉及到注册全局状态存储时,这将无法工作。 要注册全局状态存储,请参阅下面关于自定义StreamsBuilderFactoryBean的部分。
2.8. 交互式查询
Kafka Streams 绑定器 API 暴露了一个名为 InteractiveQueryService 的类,用于与状态存储进行交互式查询。您可以在应用程序中将其作为 Spring Bean 访问。从应用程序中获取此 Bean 的简单方法是 autowire 这个 Bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
Once you gain access to this bean, then you can query for the particular state-store that you are interested. See below.
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,调用上述方法检索存储时可能会失败。例如,在初始化状态存储的过程中可能尚未完成。在这种情况下,重试此操作会很有帮助。Kafka Streams绑定提供了一个简单的重试机制来解决这个问题。
您可使用以下两个属性来控制重试行为。
-
默认值为 1。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认是
1000毫秒。
如果运行了多个 Kafka 流应用程序实例,那么在可以对其进行交互式查询之前,需要确定哪个应用实例托管要查询的特定密钥。
0 API 提供了用于确定主机信息的方法。
在进行此操作之前,必须按如下所示配置属性application.server:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
这里有一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
2.9. 健康指标
健康指示器需要依赖项 spring-boot-starter-actuator。对于 maven,请使用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器,用于检查底层流线程的状态。 Spring Cloud Stream 定义了属性 management.health.binders.enabled 来启用健康指示器。请参阅 Spring Cloud Stream 文档。
健康指标为每个流线程的元数据提供以下详细信息:
-
线程名称
-
线程状态:
CREATED,RUNNING,PARTITIONS_REVOKED,PARTITIONS_ASSIGNED,PENDING_SHUTDOWN或DEAD -
活动任务:任务 ID 和分区
-
待处理任务:任务 ID 和分区
默认情况下,只有全局状态可见(UP或DOWN)。要显示详细信息,属性management.endpoint.health.show-details必须设置为ALWAYS或WHEN_AUTHORIZED。
有关健康信息的更多详细信息,请参阅Spring Boot操作员文档。
健康指标的状态为 UP,如果所有Kafka线程注册状态为RUNNING。 |
由于Kafka Streams binder中有三个单独的Binder(KStream、KTable和GlobalKTable),所有这些Binder都会报告健康状态。
当启用show-details时,可能会报告一些冗余信息。
当同一应用中有多个Kafka Streams处理器存在时,将为所有处理器报告健康检查,并按Kafka Streams的应用程序ID进行分类。
2.10. 访问Kafka Streams指标
Spring Cloud Stream Kafka Streams binder 提供了一种基本机制,用于访问通过 Micrometer 导出的 Kafka Streams 指标。MeterRegistry。Kafka Streams 指标可通过 KafkaStreams#metrics() 访问,并由绑定器导出到此计量注册表中。导出的指标来自消费者、生产者、管理员客户端以及流本身。
绑定器导出的指标采用指标组名称后跟点号,再跟实际指标名称的格式。原始指标信息中的所有连字符均被替换为点号。
例如,指标组 consumer-metrics 中的指标名称network-io-total 在micrometer注册表中以 consumer.metrics.network.io.total 的形式提供。同样地,stream-metrics中的指标commit-total 可作为stream.metrics.commit.total获取。
如果在同一个应用程序中有多个 Kafka Streams 处理器,则指标名称将以对应的应用程序 ID 前缀。
在这种情况下,应用程序 ID 将按原样保留,即不会将破折号转换为点等。
例如,如果第一个处理器的应用程序 ID 是processor-1,则度量组consumer-metrics中的指标名network-io-total可以在 Micrometer 注册表中作为processor-1.consumer.metrics.network.io.total获取。
您可以选择在应用程序中以编程方式访问 Micrometer MeterRegistry,然后遍历所有可用的计数器,或者使用 Spring Boot Starters通过 REST 端点访问指标。
当通过Starters端点进行访问时,请确保将metrics添加到属性management.endpoints.web.exposure.include中。
然后您可以通过/acutator/metrics访问来获取所有可用指标的列表,这些指标可以通过同一URI(/actuator/metrics/<metric-name>)单独访问。
除了通过KafkaStreams#metrics()可以访问的信息级别指标外(例如调试级别的指标),在将metrics.recording.level设置为DEBUG之后,仍然只能通过JMX访问这些指标。
Kafka Streams默认将此级别设置为INFO。
请参阅Kafka Streams文档中的本节了解更多信息。
在未来版本中,binder可能支持通过Micrometer导出这些DEBUG级别的指标。
2.11. 高级DSL与低级处理器API的混合使用
Kafka Streams 提供两种变体的 API。它有一个类似高级 DSL 的 API,其中可以链接各种操作,这对许多函数式程序员来说可能是熟悉的。Kafka Streams 还提供了低级处理器 API。处理器 API 尽管非常强大,能够以更低级别的控制方式处理事物,但它是命令式的。Spring Cloud Stream Kafka 流绑定器允许您使用高级 DSL 或混合使用 DSL 和处理器 API。这两种变体的混合使用为您在应用程序中提供了很多选项来控制各种用例。应用程序可以调用 transform 或 process 方法调用来访问处理器 API。
让我们看看如何在一个Spring Cloud Stream应用程序中使用process API,组合DSL和处理器API。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
这是一个使用transformAPI的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process 方法调用是终端操作,而transform 方法是非终端,并提供了一个可能转换后的KStream,您可以使用它继续使用 DSL 或处理器 API 进行进一步处理。
2.12. 出站分区支持
一个Kafka Streams处理器通常将处理后的输出发送到出站Kafka主题。
如果出站主题被分区,并且处理器需要将传出数据发送到特定分区,则应用程序需要提供类型为StreamPartitioner的bean。
有关更多详细信息,请参阅StreamPartitioner。
让我们看一些示例。
这是我们已经多次看到的同一处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
这是输出绑定目标:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题 outputTopic 有4个分区,并且您没有提供一个分区策略,Kafka Streams 将使用默认的分区策略,这可能不是根据特定用例想要的结果。假设您希望将任何匹配到 spring 的键发送到分区 0,cloud 到分区 1,stream 到分区 2,而其他所有内容则发送到分区 3。这就是您需要在应用程序中执行的操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是您可以访问记录的关键字/值、主题名称以及分区总数。因此,如有需要,可以实现复杂的分区策略。
您还需要提供此 Bean 名称以及应用程序配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。
2.13. 流式工厂Bean构建器自定义
通常需要自定义创建 StreamsBuilderFactoryBean 的方式来生成 KafkaStreams 对象。 基于 Spring Kafka 提供的基础支持,绑定器允许您自定义 StreamsBuilderFactoryBean。 您可以使用 StreamsBuilderFactoryBeanCustomizer 来自定义 StreamsBuilderFactoryBean 本身。 然后,一旦通过此定制器获得对 StreamsBuilderFactoryBean 的访问权限后,就可以使用 KafkaStreamsCustomzier 来自定义相应的 KafkaStreams。 这两个定制器都是 Spring for Apache Kafka 项目的一部分。
Here is an example of using the StreamsBuilderFactoryBeanCustomizer.
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上面显示的是您可以对 StreamsBuilderFactoryBean 所能做的自定义说明。您可以从 StreamsBuilderFactoryBean 调用任何可用的变异操作来定制它。此自定义器将在工厂 bean 启动前由绑定器调用。
一旦你获取到 StreamsBuilderFactoryBean 的访问权限,你还可以自定义底层的 KafkaStreams 对象。 以下是实现此目的的蓝图。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer 将在底层 KafkaStreams 启动前由 StreamsBuilderFactoryBeabn 调用。
在整个应用程序中,只能有一个StreamsBuilderFactoryBeanCustomizer。
那么,如何处理多个Kafka Streams处理器(每个都由独立的StreamsBuilderFactoryBean对象进行后备)呢?在这种情况下,如果针对这些处理器需要不同的自定义设置,应用程序需要根据应用程序ID应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
2.13.1. 使用自定义器注册全局状态存储
如上所述,绑定器没有提供一种原生的方式来注册全局状态存储作为功能。为此,您需要使用自定义程序。以下是实现方法。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
再次强调,如果您有多个处理器,则需要将全局状态存储附加到正确的 StreamsBuilder 上,并使用上述应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean 对象。
2.13.2. 使用自定义器注册生产环境异常处理器
在错误处理部分,我们指出绑定器不提供处理生产异常的一流方式。
虽然这是事实,但您仍然可以使用StreamsBuilderFacotryBean定制器注册生产异常处理程序。请参见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再次,如果您的处理器有多个,您可能需要针对StreamsBuilderFactoryBean设置它。
您也可以使用配置属性(有关更多详细信息,请参阅下面),但如果您选择采用程序化方法,这是一项选择。
2.14. 时间戳提取器
Kafka 流允许您根据各种时间戳概念控制对使用者记录的处理。默认情况下,Kafka 流从使用者记录中提取嵌入式时间戳元数据。您可以通过为每个输入绑定提供不同的TimestampExtractor实现来更改此默认行为。下面是一些关于如何做到这一点的详细信息。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后你为每个消费者绑定设置上面的TimestampExtractorbean名字。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果跳过输入消费者绑定以设置自定义时间戳提取器,该消费者将使用默认设置。
2.15. 使用基于 Kafka Streams 的绑定器和常规 Kafka 绑定器进行多重绑定
你可以在一个应用程序中同时拥有基于常规Kafka绑定器的功能/消费者/提供商和基于Kafka Streams的处理器。 然而,你不能在同一功能或消费者中混合使用这两种方式。
此处是一个示例,其中在同一应用程序中同时存在基于绑定的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置的相关部分。
spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您的应用与上面的示例相似,但同时处理两个不同的Kafka集群,则事情会变得更加复杂,例如,process同时连接到Kafka集群1和集群2(从集群-1接收数据并发送到集群-2),而Kafka流处理器仅连接到Kafka集群2。
这时,您需要使用Spring Cloud Stream提供的多绑定器功能。
这是您在该场景中可能需要更改的配置方式。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意以上配置。我们有两种类型的绑定器,但实际上有 3 种绑定器,第一个是基于群集 1(kafka1)的常规 Kafka 绑定器,然后是基于群集 2(kafka2)的另一个 Kafka 绑定器,以及最后一个(kstream)(kafka3)。 第一个处理器从 kafka1 接收数据并发布到 kafka2,其中这两个绑定器都基于常规 Kafka 绑定器但不同的群集。 第二个处理器是一个 Kafka 流处理器,它从 kafka3 消费数据,该组与 kafka2 同一集群,但不同类型的绑定器。
因为Kafka Streams绑定器家族中有三种不同的绑定器类型可用——kstream、ktable和globalktable——如果您的应用基于这些绑定器中的任何一种有多个绑定,那么需要显式地提供作为绑定器类型。
对于例如,如果你有一个处理器如下,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,在多绑定程序的情况下,必须按如下所示进行配置。 请注意,只有在有真正的多绑定程序场景中,即在一个应用程序中有多个处理器处理多个群集时,才需要这样做。 在这种情况下,需要显式地向绑定器提供绑定,以区分其他处理器绑定器类型和群集。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
2.16. 状态清理
默认情况下,停止绑定时会调用Kafkastreams.cleanup()方法。参见Spring Kafka文档。要修改此行为,请简单地向应用上下文添加一个CleanupConfig@Bean(配置为在启动、停止或两者都不清理),该bean将被检测并连接到工厂bean。
2.17. Kafka Streams 拓扑可视化
Kafka Streams绑定器提供了以下用于检索拓扑描述的执行器端点,您可以使用外部工具对其进行可视化。
/actuator/topology
/actuator/topology/<applicaiton-id of the processor>
您需要包含来自 Spring Boot 的Starters和 Web 依赖项才能访问这些端点。另外,您还需要添加topology到management.endpoints.web.exposure.include属性。默认情况下,topology端点是禁用的。
2.18. 配置选项
本节包含Kafka Streams绑定器使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档。
2.18.1. Kafka Streams Binder 属性
以下属性可在绑定程序级别使用,并且必须加上前缀spring.cloud.stream.kafka.streams.binder.
- 配置
-
包含与 Apache Kafka Streams API 相关的属性键/值对映射。此属性必须使用
spring.cloud.stream.kafka.streams.binder.前缀。以下是一些使用此属性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关流配置中可能包含的所有属性的更多信息,请参阅Apache Kafka Streams文档中的StreamsConfig JavaDoc。所有可以从StreamsConfig设置的配置都可以通过这种方式进行设置。使用此属性时,由于这是绑定器级别的属性,因此适用于整个应用程序。如果您的应用程序中有多个处理器,则它们都将获取这些属性。对于像application.id这样的属性,这将成为一个问题,因此您必须仔细检查如何使用这个绑定器级别的configuration属性来映射StreamsConfig的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。 可以用于为应用程序中的每个功能设置应用ID。 在有多个功能的情况下,这是一种方便的方式来设置应用ID。
- functions.<function-bean-name>.configuration
-
仅适用于函数式风格处理器。
包含与Apache Kafka Streams API相关的属性的键/值对映射。
这类似于上述描述的绑定器级别的configuration属性,但这个configuration级别的属性仅限于命名函数使用。
当您有多个处理器并且希望根据特定函数限制配置访问时,可能需要使用此功能。
所有StreamsConfig属性都可以在此处使用。 - 经纪人
-
代理网址
默认值:
localhost - zk节点
-
Zookeeper 地址
默认值:
localhost - 反序列化异常处理程序
-
反序列化错误处理器类型。
此处理器应用于绑定器级别,因此对应用程序中的所有输入绑定应用。
在消费者绑定级别可以更细致地控制它。
可能的值为 -logAndContinue、logAndFail或sendToDlq默认值:
logAndFail - applicationId
-
在绑定器级别全局设置 Kafka Streams 应用程序的应用程序 ID 的便捷方法。
如果应用程序包含多个函数或StreamListener方法,则应以不同方式设置应用程序 ID。
参见上文,其中详细讨论了设置应用程序 ID 的问题。默认情况下,应用程序会生成一个静态的应用程序ID。有关更多详细信息,请参阅应用程序ID部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大重试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到状态存储的退避周期。
默认值:1000 毫秒
2.18.2. Kafka Streams 生产者属性
以下属性仅适用于Kafka Streams生产者,且必须使用spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.作为前缀。
为了方便起见,如果存在多个输出绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.进行配置。
- 密钥序列化器
-
键序列化器/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 值序列化器
-
序列化/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 使用原生编码
-
启用/禁用原生编码的标志
默认值:
true。
streamPartitionerBeanName:
自定义出站分区程序的bean名称,将在消费者处使用。
应用程序可以提供自定义 StreamPartitioner 作为Spring bean,并且可以将此bean的名称提供给生产者,以便替代默认值使用。
默认值:请参阅上述关于传出分区支持的讨论。
2.18.3. Kafka Streams 消费者属性
以下属性可用于Kafka Streams消费者,并且必须使用spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.作为前缀。为了方便起见,如果存在多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.进行配置。
- applicationId
-
根据输入绑定设置 application.id。这仅适用于基于
StreamListener的处理器,对于函数式处理器,请参阅上述其他方法。默认值:见上文。
- 密钥序列化器
-
键序列化器/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 值序列化器
-
序列化/反序列化器
默认:参见上述关于消息序列化/反序列化的讨论
- 物化为
-
使用传入的 KTable 类型时,用于生成状态存储
默认值:
none。 - 使用原生解码
-
启用/禁用原生解码的标志
默认值:
true。 - 死信队列名称
-
死信队列主题名称。
默定:请参考上面错误处理和DLQ的讨论。
- startOffset
-
如果消费者没有已提交的偏移量可以从中消费,则开始的位置偏移。 这主要用于当消费者第一次从主题中消费时的情况。 Kafka Streams 使用
earliest作为默认策略,绑定器也使用相同的默认值。 可以通过此属性将其覆盖为latest。默认值:
earliest。
注:在消费者中使用 resetOffsets 对 Kafka Streams 绑定器没有任何影响。
与基于消息通道的绑定器不同,Kafka Streams 绑定器不会根据需求寻求开始或结束位置。
- 反序列化异常处理程序
-
反序列化错误处理器类型。
此处理程序按每个消费者绑定应用,而不是前面所述的绑定器级别属性。
可能的值为 -logAndContinue、logAndFail或sendToDlq默认值:
logAndFail - 时间戳提取器Bean名称
-
消费者使用的特定时间戳提取器的bean名称。
应用程序可以提供TimestampExtractor作为Spring bean,并且此bean的名称可以提供给消费者,以代替默认值使用。默认值:参见上文关于时间戳提取器的讨论。
2.18.4. 并发的特殊注意事项
在 Kafka Streams 中,您可以使用 num.stream.threads 属性来控制处理器可以创建的线程数量。
您可以通过上述绑定器、函数、生产者或消费者级别的各种 configuration 选项实现此操作。
您还可以使用核心 Spring Cloud Stream 提供的 concurrency 属性来实现此目的。
使用时需要将其设置在消费者上。
当一个函数或 StreamListener 中有多个输入绑定时,请将此属性设置在第一个输入绑定上。
例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,绑定器会将其转换为 num.stream.threads。