Apache Kafka Binder
参考指南
本指南介绍了 Apache Kafka 实现的 Spring Cloud Stream Binder。 它包含了其设计、使用和配置选项的信息,以及 Stream Cloud Stream 概念如何映射到 Apache Kafka 的特定结构。 此外,本指南还解释了 Spring Cloud Streams 的 Kafka Streams 绑定功能。
1. Apache Kafka Binder
1.1. 使用情况
要使用 Apache Kafka 绑定器,你需要添加春-云-溪-绑定-卡夫卡作为对Spring Cloud Stream应用的依赖,如下一个Maven示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,你也可以使用Spring Cloud Stream KafkaStarters,如下例所示的Maven:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图展示了Apache Kafka绑定器的简化工作原理:
Apache Kafka Binder 实现会将每个目的地映射到 Apache Kafka 主题。 消费者群体直接对应到同样的Apache Kafka概念。 分区还直接映射到 Apache Kafka 分区。
该活页夹目前使用的是Apache Kafka卡夫卡客户端版本3.1.0.
该客户端可以与较旧的代理商通信(参见Kafka文档),但某些功能可能不可用。
例如,0.11.x.x 之前的版本不支持原生头部。
此外,0.11.x.x 不支持自动添加分区财产。
1.3. 配置选项
本节包含Apache Kafka绑定器所使用的配置选项。
关于绑定器的常见配置选项和属性,请参见核心文档中的绑定属性。
1.3.1. 卡夫卡绑订器属性
- spring.cloud.stream.kafka.binder.brokers
-
卡夫卡装订器连接的经纪人列表。
违约:
本地主持. - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
经纪人允许指定带有或不包含端口信息的主机(例如,主持1,主持2:端口2). 当经纪商列表中未配置端口时,该程序设置默认端口。违约:
9092. - Spring.cloud.stream.kafka.binder.configuration
-
客户端属性(生产者和消费者)的键值映射传递给所有由绑定器创建的客户端。 由于这些属性被生产者和消费者共同使用,使用应限制在常见属性上——例如安全设置。 通过该配置提供的未知Kafka生产者或消费者属性会被过滤掉,不允许传播。 这里的属性优先于boot中设置的任何属性。
默认:空白地图。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端消费者属性的关键/值映射。 除了支持已知的卡夫卡消费者属性外,这里也允许使用未知的消费者属性。 这里的属性优先于boot中设置的所有属性,并且在
配置上方的财产。默认:空白地图。
- spring.cloud.stream.kafka.binder.headers
-
由活页夹传输的自定义头部列表。 仅在与较旧的应用程序(⇐ 1.3.x)通信时才需要
卡夫卡客户端版本< 0.11.0.0。新版本原生支持头部。默认:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
等待获取分区信息的时间只需几秒。 如果计时器到期,健康状况显示下降。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
经纪人要求的按键数。 请参见卡夫卡关于制片人的文档
补丁财产。违约:
1. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在
自动创建主题或自动添加分区已设定。 装订器在其产生或消耗数据的主题上配置的全局最小分区数。 它可以被partitionCount由生产者设定,或以实例计数 * 并发制作者的设置(如果其中一个更大)。违约:
1. - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键值映射。 除了支持已知的卡夫卡制片人作品外,这里也允许使用未知制作人的作品。 这里的属性优先于boot中设置的所有属性,并且在
配置上方的财产。默认:空白地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
自动创建主题的复制因子如果
自动创建主题活跃。 每个绑定都可以被覆盖。如果你使用的是 Kafka 2.4 之前的 Broker 版本,那么这个值至少应该设置为 1. 从3.0.8版本开始,绑定器使用-1作为默认值,这表明代理的“default.replication.factor”属性将用于确定副本数量。 请咨询你的Kafka经纪商管理员,看看是否有政策要求最低复制因子,如果是这样,通常default.replication.factor将匹配该值,且-1应该使用,除非你需要比最小值更高的复制因子。违约:
-1. - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true,活页夹会自动创建新的主题。 如果设置为false,活页夹依赖于主题已经配置好。 在后一种情况下,如果不存在这些主题,活页夹将无法开始。该设置与 auto.create.topics.enable经纪人设定,不会影响其设置。 如果服务器设置为自动创建主题,这些主题可能会作为元数据检索请求的一部分创建,且默认为代理设置。违约:
true. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true,如果需要,绑页器会创建新的分区。 如果设置为false,该绑定器依赖于已配置的主题分区大小。 如果目标主题的分区计数小于预期值,绑定器将无法启动。违约:
false. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
使活页夹中的交易能够实现。看
transaction.id在卡夫卡文献和会议记录中春-卡夫卡文档。 当交易被启用时,个人制作人性质被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*性能。默认值
零(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
交易性活页夹中的全局生产者属性。 看
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix以及卡夫卡制片人和所有装订商支持的综合制片人产。默认:查看各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
豆子的名称
KafkaHeaderMapper用于制图春季消息卡夫卡头部的标题和字幕之间。 例如,如果你想自定义受信任的包,可以使用这个BinderHeaderMapper使用JSON反序列化来生成头部的豆子。 如果这是习俗BinderHeaderMapper使用该属性时,绑订器不会提供 Bean 的可用性,然后 Binder 会寻找带有名称的头部映射 BeankafkaBinderHeaderMapper该类型为BinderHeaderMapper然后又回到默认状态BinderHeaderMapper由活页夹创造。默认:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
用标志将活页夹的健康值设置为
下当该主题上的任何分区,无论接收数据的消费者如何,都没有引导者。违约:
false. - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当信任存储或密钥存储证书位置作为非本地文件系统资源(由 org.springframework.core.io.Resource 支持的资源,如 CLASSPATH、HTTP 等)提供时, 绑定器将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统中的某个位置。 这对两个经纪人级证书都适用(
SSL.truststore.location和ssl.keystore.location)以及用于模式注册表的证书(schema.registry.ssl.truststore.location和schema.registry.ssl.keystore.location). 请记住,truststore和keystore的位置路径必须在以下以下提供。spring.cloud.stream.kafka.binder.configuration.... 例如spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location,Spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。 文件将被复制到指定为该属性值的位置,该位置必须是文件系统中已有的目录,且该目录可被运行该应用程序的进程写入。 如果该值未被设置且证书文件是非本地文件系统资源,则将被复制到系统临时目录,返回如下System.getProperty(“java.io.tmpdir”). 如果该值存在,但该目录在文件系统中找不到或不可写,也同样适用。默认:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
当设置为true时,每个消费者主题的偏移滞后度量在访问该指标时计算出来。 当设置为false时,只使用周期计算的偏移延迟。
默认:真
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个消费者主题的偏移滞后的时间间隔。 该值用于
metrics.defaultOffsetLagMetricsEnabled是被禁用的,或者它的 计算太慢了。默认时间:60秒
- spring.cloud.stream.kafka.binder.enableObservation
-
启用该绑定器中所有绑定的微米观察登记。
默认:false
1.3.2. 卡夫卡消费者地产
为避免重复,Spring Cloud Stream 支持为所有频道设置数值,格式为spring.cloud.stream.kafka.default.consumer.<property>=<value>. |
以下房产仅供卡夫卡消费者使用,
必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer..
- admin.configuration
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.properties,未来版本中将取消对该支持。 - admin.replicas-assignment
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.replicas-assignment,未来版本中将取消对该支持。 - admin.replication-factor
-
自2.1.1版本起,该属性被弃用,取而代之的是
主题。复制因子,未来版本中将取消对该支持。 - 自动重新平衡启用
-
什么时候
true话题分区会自动在消费者组成员之间重新平衡。 什么时候false,每个消费者会被分配一组固定的分区,基于spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex. 这需要spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex每个启动实例都必须正确设置属性。 该spring.cloud.stream.instanceCount在这种情况下,财产通常必须大于1。违约:
true. - AckEach记录
-
什么时候
autoCommitOffset是true该设置决定了每条记录处理后是否提交偏移量。 默认情况下,偏移量在返回的记录批次中的所有记录之后提交consumer.poll()已经处理完毕。 轮询返回的记录数量可以通过以下方式控制Max.poll.recordsKafka 属性,由消费者设定配置财产。 将此设置为true这可能导致性能下降,但这会降低故障发生时记录被重新交付的可能性。 另外,看看活页夹requiredAcks属性,也影响提交偏移量的性能。 该特性自3.1版本起被弃用,改为使用ack模式. 如果ack模式未设置且批处理模式未启用,记录将使用ackMode。违约:
false. - autoCommitOffset
-
从3.1版本开始,该属性被弃用。 看
ack模式关于替代方案的更多细节。 是否在消息处理完毕后自动提交偏移量。 如果设置为false,一个带有密钥的头部kafka_acknowledgment该类型org.springframework.kafka.support.Acknowledgedment入站消息中包含了 头部。 应用程序可以使用该头来确认消息。 详情请参见示例部分。 当该属性被设置为false,Kafka 绑定器将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL应用程序负责确认记录。 另见AckEach记录.违约:
true. - ack模式
-
指定容器的 ack 模式。 这基于 Spring Kafka 定义的 AckMode 枚举。 如果
AckEach记录属性设置为true且消费者未处于批处理模式,则将使用ACK模式记录否则,使用该属性提供的 ack 模式。 - autoCommitOnError
-
在可轮询的消费者中,如果设置为
true,它总是自动提交错误。 如果未设置(默认)或为假,则不会在可轮询的消费者中自动提交。 请注意,这一特性仅适用于可投票消费者。默认:未设置。
- resetOffsets
-
是否将消费者的偏移量重置为startOffset提供的值。 如果 是
KafkaBindingRebalanceListener提供;参见使用 KafkaBindingRebalance Listener。 有关该属性的更多信息,请参见“重置偏移”。违约:
false. - startOffset
-
新组的起始偏移。 允许的数值:
最早和最近的. 如果消费者组被明确设置为消费者“绑定”(通过spring.cloud.stream.bindings.<channelName>.group),'startOffset' 被设置为最早.否则,它被设置为最近的对于匿名消费者集团。 有关该属性的更多信息,请参见“重置偏移”。默认值:空(等价于
最早). - enableDlq
-
当设置为true时,它能为消费者实现DLQ行为。 默认情况下,导致错误的消息会被转发到名为
error.<destination>.<group>. DLQ主题名称可以通过设置dlqName性质或定义@Bean类型DlqDestinationResolver. 这为错误数量较少且重放整个原始主题过于繁琐的情况提供了替代卡夫卡回放场景的替代选择。 更多信息请参见“死字母主题处理处理”。 从2.0版本开始,发送到DLQ主题的消息增加了以下头部:x-原始主题,x-异常-消息和x-exception-栈追踪如字节[]. 默认情况下,失败记录会被发送到与原始记录相同的分区号。 关于如何改变该行为,请参见死字母主题分区选择。不允许目的地是模式是true.违约:
false. - dlqPartitions
-
什么时候
enableDlq成立且该属性未被设置,因此创建了一个与主主题相同分区数的死字母主题。 通常,死信记录会发送到与原始记录相同的死信主题分区。 这种行为是可以改变的;参见死字母主题划分选择。 如果该性质被设置为1而且没有DqlPartitionFunctionBean,所有死信记录都会写入分区0. 如果该性质大于1你必须提供一个DlqPartitionFunction豆。 注意实际分区计数受绑定器最小分区计数财产。违约:
没有 - 配置
-
映射为包含通用Kafka消费者属性的键值对。 除了具有 Kafka 的消费者属性外,还可以传递其他配置属性。 例如,应用程序需要的一些属性,例如
Spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar. 这bootstrap.servers这里不能设置属性;如果需要连接多个集群,使用多绑定器支持。默认:空白地图。
- dlqName
-
DLQ主题名称用于接收错误信息。
默认:空(如果未指定,导致错误的消息会转发到名为
error.<destination>.<group>). - dlqProducerProperties(开发者属性)
-
利用此,可以设置DLQ专属的生产者属性。 通过Kafka制作者属性可以设置所有属性。 当消费者启用原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供对应的键值串行器。 这必须以以下形式提供
dlqProducerProperties.configuration.key.serializer和dlqProducerProperties.configuration.value.serializer.默认:默认Kafka制作人属性。
- 标准头部
-
表示入站通道适配器填充的标准头部。 允许的数值:
没有,身份证,时间戳或双. 如果使用原生反序列化,且第一个接收消息的组件需要身份证(例如配置为使用 JDBC 消息存储的聚合器)。违约:
没有 - converterBeanName
-
一种实现
记录消息转换器.用于入站通道适配器,以替代默认接口消息信息转换器.违约:
零 - idleEventInterval
-
以毫秒为单位,表示最近未收到任何消息的事件间隔。 使用一个
ApplicationListener<ListenerContainerIdleEvent>接收这些事件。 请参见示例:暂停和恢复消费者的使用示例。违约:
30000 - 目的地是模式
-
当为真时,目标被视为正则表达式
模式经纪人用来匹配主题名称。 当属实时,主题不会被提供,且enableDlq不允许,因为活页夹在配置阶段不知道主题名称。 注意,检测与模式相符的新主题所花费的时间由消费者属性控制metadata.max.age.ms,在撰写时默认为300,000毫秒(5分钟)。 这可以通过以下方式进行配置配置上方的财产。违约:
false - topic.properties
-
一个
地图在为新主题提供时使用的Kafka主题属性——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0默认:无。
- topic.replicas-assignment
-
一个映射</整数、列表<整数>>副本赋值,键为分区,值为赋值。 用于配置新主题。 参见
新主题Javadocs 在卡夫卡客户端罐。默认:无。
- 主题。复制因子
-
在配置主题时,复制因素。覆盖整个活页夹的设置。 如果忽略
副本-赋值存在。默认值:无(使用整个绑定器默认的-1)。
- 投票时间
-
超时用于在可投票消费者中进行民调。
默认时间:5秒。
- transactionManager
-
Beans名称
KafkaAwareTransactionManager用于覆盖绑定器的交易管理器。 通常如果你想用 Kafka 交易同步另一笔交易,使用ChainedKafkaTransactionManaager. 为了实现记录的精确一次消费和生产,消费者和生产者绑定必须配置为同一个事务管理器。默认:无。
- txCommitRecovered
-
使用事务绑定器时,恢复记录的偏移量(例如重试用尽且记录被发送到死符主题时)默认通过新事务提交。 将该属性设置为
false抑制了对恢复记录的偏移量的提交。默认:真。
- commonErrorHandlerBeanName
-
通用错误处理器每个消费者绑定时使用的豆子名称。 在场时,该用户提供通用错误处理器优先于绑定器定义的其他错误处理程序。 如果应用程序不想使用ListenerContainerCustomizer然后检查目标/组的组合来设置错误处理程序。默认:无。
1.3.3. 重置偏移量
应用程序启动时,每个分配分区的初始位置取决于两个属性startOffset和resetOffsets.
如果resetOffsets是false,普通卡夫卡用户自动.offset.reset语义问题是必要的。
即如果绑定的消费者组没有承诺的分区偏移量,则位置为最早或最近的.
默认情况下,带有显式群用最早,以及匿名绑定(没有群) 的使用最近的.
这些默认值可以通过设置startOffset具有约束力的属性。
第一次以特定绑定开始时,不会有承诺的偏移量群.
另一个不存在已承诺偏移量的条件是偏移量已过期。
现代经纪人(自2.1版本起)和默认经纪人属性中,抵消在最后一位成员离开组后7天到期。
参见偏移。留任。分钟数更多信息请咨询经纪房产。
什么时候resetOffsets是true,绑定器应用的语义类似于当中介没有承诺抵消时的语义,仿佛该约束从未从主题中消耗过;即当前承诺的偏移量被忽略。
以下是两种可能使用的用例。
-
从包含键值对的压缩主题中消费。 设置
resetOffsets自true和startOffset自最早;绑定将执行开始在所有新分配的分区上。 -
从包含事件的主题中消费,你只对绑定运行期间发生的事件感兴趣。 设置
resetOffsets自true和startOffset自最近的;绑定将执行seekToEnd在所有新分配的分区上。
| 如果在初始分配后发生重新平衡,寻道任务只会对未在初始分配时分配的新分区进行。 |
关于主题偏移的更多控制,请参见使用 KafkaBindingRebalanceListener;当提供听者时,resetOffsets不应设置为true否则,将引发错误。
1.3.4. 消耗批次
从3.0版本开始,当spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,所有通过投票卡夫卡获得的记录消费者将以名单<?>转向听者法。
否则,方法将一次调用一条记录。
批次的规模由Kafka消费者属性控制Max.poll.records,fetch.min.bytes,fetch.max.wait.ms;更多信息请参阅卡夫卡文献。
从版本开始4.0.2,该绑定器在批量处理时支持 DLQ 功能。
请记住,在批量处理消费者绑定时,之前轮询收到的所有记录都会被传递到DLQ主题。
使用批量模式时不支持在绑定器内重试,所以最大尝试次数将被覆盖为1。
你可以配置一个默认错误处理(使用ListenerContainerCustomizer)以实现类似的功能,以便在活页夹中重试。
你也可以用手动说明书AckMode并呼叫Ackowledgment.nack(索引,睡眠)提交部分批次的偏移量,并重新交付剩余记录。
有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
1.3.5. 卡夫卡制片人
为避免重复,Spring Cloud Stream 支持为所有频道设置数值,格式为spring.cloud.stream.kafka.default.producer.<property>=<value>. |
以下作品仅供卡夫卡制作人使用,
必须以spring.cloud.stream.kafka.bindings.<channelName>.producer..
- admin.configuration
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.properties,未来版本中将取消对该支持。 - admin.replicas-assignment
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.replicas-assignment,未来版本中将取消对该支持。 - admin.replication-factor
-
自2.1.1版本起,该属性被弃用,取而代之的是
主题。复制因子,未来版本中将取消对该支持。 - 缓冲区大小
-
以字节计,卡夫卡制作者在发送前尝试批量处理的数据量上限。
违约:
16384. - 同步
-
制作人是否同步。
违约:
false. - sendTimeoutExpression
-
针对外发消息计算的SpEL表达式,用于评估启用同步发布时等待ack的时间——例如,
标题['mySendTimeout']. 超时值以毫秒为单位。 在3.0之前的版本中,除非使用本地编码,否则无法使用有效载荷,因为在该表达式被评估时,有效载荷已经以字节[]. 现在,在有效载荷转换之前,先计算该表达式。违约:
没有. - batchTimeout
-
生产者等待多长时间,允许同一批中更多消息积累后再发送消息。 (通常,生产者根本不等待,直接发送上一次发送过程中累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。
违约:
0. - messageKeyExpression
-
一个针对用于填充生成卡夫卡消息密钥的外发消息进行评估的 SpEL 表达式——例如,
头部['myKey']. 在3.0之前的版本中,除非使用本地编码,否则无法使用有效载荷,因为在该表达式被评估时,有效载荷已经以字节[]. 现在,在有效载荷转换之前,先计算该表达式。 对于普通处理器(函数<字符串,字符串>或Function<Message<?>,Message<?>),如果产生的密钥需要与主题的输入密钥相同,则该属性可设置如下。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']对于反应性函数,有一个重要的注意事项需要注意。 在这种情况下,应用程序需要手动将收到消息的头部复制到发出消息。 你可以设置头部,例如:我的钥匙以及使用头部['myKey']如上所述,或者为方便起见,简单地设置KafkaHeaders.MESSAGE_KEY你根本不需要设置这个属性。违约:
没有. - 头部模式
-
一个逗号分隔的简单模式列表,用于匹配 Spring 消息头,映射到 Kafka
头在制作人唱片. 模式可以以万用符(星号)开始或结束。 模式可以通过前缀 来否定!. 匹配在第一次匹配后停止(正或负)。 例如!问,像*会过去的灰但又不是问.身份证和时间戳从未被映射。默认:(所有头部 - 除了
*身份证和时间戳) - 配置
-
映射中包含包含通用Kafka制作者属性的键值对。 这
bootstrap.servers这里不能设置属性;如果需要连接多个集群,使用多绑定器支持。默认:空白地图。
- topic.properties
-
一个
地图在为新主题提供时使用的Kafka主题属性——例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0 - topic.replicas-assignment
-
一个映射</整数、列表<整数>>副本赋值,键为分区,值为赋值。 用于配置新主题。 参见
新主题Javadocs 在卡夫卡客户端罐。默认:无。
- 主题。复制因子
-
在配置主题时,复制因素。覆盖整个活页夹的设置。 如果忽略
副本-赋值存在。默认值:无(使用整个绑定器默认的-1)。
- useTopicHeader
-
设置为
true用 的值覆盖默认绑定目的地(主题名)卡夫卡标题。主题消息头在外发消息中。 如果没有头部,则使用默认绑定目的地。违约:
false. - 记录元数据通道
-
豆子的名称
消息频道成功发送结果应发送至该平台;豆子必须存在于应用上下文中。 发送到信道的消息是发送消息(转换后,如果有),并加一个额外的头部KafkaHeaders.RECORD_METADATA. 头部包含记录元数据由卡夫卡客户端提供的对象;它包括记录在主题中写入的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)发送失败则进入制作人错误通道(如果已配置);参见错误信道。
默认:无。
卡夫卡活页夹使用了partitionCount将生产者设置为提示,以创建具有给定分区计数的主题(配合最小分区计数,两者中的最大值即为所用值)。
配置两者时请谨慎最小分区计数对于一个活接器和partitionCount对于某个应用,因为使用较大的值。
如果已有一个分区数较小的主题,且自动添加分区被禁用(默认),绑定器无法启动。
如果已有一个分区数较小的主题,且自动添加分区启用后,会添加新的分区。
如果一个主题已经存在,其分区数超过最大值 (最小分区计数或partitionCount),使用现有的分区计数。 |
- 压缩
-
设置
压缩类型生产者财产。 支持的值有没有,吉普,活泼,LZ4和ZSTD. 如果你覆盖了卡夫卡客户端jar 到 2.1.0(或更高版本),如 Apache Kafka 春季文档中讨论的,并希望使用ZSTD压缩与使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd.违约:
没有. - transactionManager
-
Beans名称
KafkaAwareTransactionManager用于覆盖绑定器的交易管理器。 通常如果你想用 Kafka 交易同步另一笔交易,使用ChainedKafkaTransactionManaager. 为了实现记录的精确一次消费和生产,消费者和生产者绑定必须配置为同一个事务管理器。默认:无。
- 结束时间
-
关闭生产者时,等待的秒数是超时。
违约:
30 - 允许非交易
-
通常,所有与事务绑定器关联的输出绑定都会发布在新的事务中,前提是该事务尚未在处理中。 这个属性允许你覆盖该行为。 如果设置为 true,发布到该输出绑定的记录不会在事务中运行,除非事务已经处于进程中。
违约:
false
1.3.6. 使用示例
本节展示了上述属性在特定场景中的应用。
示例:设定ack模式自手动以及依赖人工确认
这个例子说明了在消费者应用中如何手动确认偏移。
该示例要求spring.cloud.stream.kafka.bindings.input.consumer.ackMode设置为手动.
用对应的输入通道名称来描述你的例子。
@SpringBootApplication
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@Bean
public Consumer<Message<?>> process() {
return message -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
示例:安全配置
Apache Kafka 0.9 支持客户端与代理之间的安全连接。
要利用此功能,请遵循Apache Kafka文档中的指南以及Confluent文档中的Kafka 0.9安全指南。
使用该Spring.cloud.stream.kafka.binder.configuration为绑定器创建的所有客户端设置安全属性的选项。
例如,设security.protocol自SASL_SSL,设以下性质:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
其他所有安全属性也可以以类似方式设置。
使用Kerberos时,请按照参考文档中的说明创建和引用JAAS配置。
Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性向应用程序传递 JAAS 配置信息。
使用 JAAS 配置文件
通过系统属性,可以为 Spring Cloud Stream 应用设置 JAAS 和(可选)krb5 文件位置。 以下示例展示了如何通过使用 JAAS 配置文件启动 SASL 和 Kerberos 的 Spring Cloud Stream 应用:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为使用 JAAS 配置文件的替代方案,Spring Cloud Stream 提供了一种机制,通过使用 Spring Boot 属性来设置 Spring Cloud Stream 应用的 JAAS 配置。
以下属性可用于配置 Kafka 客户端的登录上下文:
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
登录模块名称。在正常情况下,不一定非得设置。
违约:
com.sun.security.auth.module.Krb5LoginModule. - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
违约:
必填. - spring.cloud.stream.kafka.binder.jaas.options
-
映射中包含登录模块选项的键值对。
默认:空白地图。
以下示例展示了如何利用 Spring Boot 配置属性启动 SASL 和 Kerberos 的 Spring Cloud Stream 应用:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
上述示例相当于以下JAAS文件:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果经纪人中已有所需的主题或管理员将创建,可以关闭自动创建,只需发送客户端的JAAS属性。
不要在同一应用程序中混合使用JAAS配置文件和Spring Boot属性。
如果-Djava.security.auth.login.configsystem 属性已经存在,Spring Cloud Stream 忽略了 Spring Boot 属性。 |
使用时要小心自动创建主题和自动添加分区与克尔伯洛斯。
通常,应用程序可以使用在 Kafka 和 Zookeeper 中没有管理权限的 Principal。
因此,依赖 Spring Cloud Stream 来创建或修改主题可能会失败。
在安全环境中,我们强烈建议通过使用 Kafka 工具创建主题并管理 ACL。 |
多束缚机配置与JAAS
当连接到多个集群时,每个集群都需要单独的 JAAS 配置,则使用以下属性设置sasl.jaas.config.
当该特性存在于应用中时,优先于上述其他策略。
详情请参见KIP-85。
例如,如果你的应用中有两个集群,且各自配置了不同的JAAS,那么你可以使用以下模板:
spring.cloud.stream:
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
kafka.binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
注意,卡夫卡星团和sasl.jaas.config上述配置中,每个的值都不同。
有关如何设置和运行此类应用的更多细节,请参见此示例应用。
示例:暂停并恢复消费者
如果你想暂停消费但不想导致分区重新平衡,可以暂停并恢复消费者。
这可以通过管理绑定生命周期实现,如 Spring Cloud Stream 文档中的绑定可视化和控制,使用状态。暂停和状态。恢复.
要继续,你可以使用ApplicationListener(或@EventListener方法)以接收ListenerContainerIdleEvent实例。
事件发布的频率由idleEventInterval财产。
1.4. 交易活页夹
通过设置来启用交易spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix变为非空值,例如:谢谢-.
在处理器应用中使用时,消费者开始交易;发送到消费者线程的任何记录都参与同一事务。
当监听器正常退出时,监听器容器会将偏移量发送到事务并提交。
所有生产者绑定都使用一个共同生产工厂,配置为spring.cloud.stream.kafka.binder.transaction.producer.*性能;忽略单个绑定卡夫卡生产者属性。
普通的活页夹重试(以及死字母)不支持交易,因为重试会在原始交易中运行,原始交易可能会被回滚,任何已发布的记录也会被回滚。
当重试被启用时(这是共同的特性)最大尝试次数大于 0)重试属性用于配置DefaultAfterRollback处理器以启用容器级别的重试。
同样,该功能不再在事务中发布死符记录,而是通过DefaultAfterRollback处理器该程序在主事务回滚后运行。 |
如果你想在源应用中使用事务,或者从某个任意线程中用于仅生产者事务(例如,@Scheduled方法),你必须获得交易生产者工厂的引用并定义一个KafkaTransactionManager豆子在用它。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
注意我们通过装订工坊;用零在第一个论元中,当只有一个活页夹配置时,
如果配置了多个活页夹,使用活页夹名称来获取引用。
一旦我们有了对活写器的引用,就可以获得对生产工厂并创建事务管理器。
然后你会使用正常的Spring事务支持,例如:交易模板或@Transactional例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果你想将仅生产者事务与其他事务管理器的事务同步,可以使用ChainedTransactionManager.
如果你部署多个应用程序实例,每个实例都需要一个唯一的实例transactionId前缀. |
1.5. 错误信道
从版本1.3开始,绑定器无条件向每个消费者目的地的错误通道发送异常,并且可以配置为向错误通道发送异步生产者发送失败。 更多信息请参见本节关于错误处理的部分。
有效载荷错误消息对于发送失败是KafkaSendFailureException其性质为:
-
失败消息:春季讯息留言<?>但那份文件未能发送。 -
记录:原始制作人唱片由失败消息
生产者异常(如发送至死信队列)没有自动处理。 你可以用自己的 Spring 集成流程来调用这些例外。
1.6. 卡夫卡度量
Kafka 活页夹模块揭示了以下指标:
spring.cloud.stream.binder.kafka.offset:该指标表示某一用户群体尚未从某一文件夹的主题中消费多少消息。
所提供的指标基于Micrometer库。
粘合剂创造了KafkaBinderMetrics如果Micrometer在类路径上,且应用程序没有提供其他类似的beans。
该指标包含消费者组信息、主题以及与主题最新偏移量的实际提交偏移延迟。
该指标对于为PaaS平台提供自动扩展反馈尤其有用。
度量收集行为可以通过在spring.cloud.stream.kafka.binder.metricsNamespace
更多信息请参阅卡夫卡活页夹属性部分。
你可以排除KafkaBinderMetrics从建立必要的基础设施如消费者,到通过在应用中提供以下组件来报告指标。
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
关于如何选择性抑制电表的更多细节,请点击这里。
1.7. Tombstone Records(无记录值)
使用压缩主题时,记录具有零值(也称为墓碑记录)表示键的删除。
要在春云流函数中接收此类消息,可以使用以下策略。
@Bean
public Function<Message<Person>, String> myFunction() {
return value -> {
Object v = value.getPayload();
String className = v.getClass().getName();
if (className.isEqualTo("org.springframework.kafka.support.KafkaNull")) {
// this is a tombstone record
}
else {
// continue with processing
}
};
}
1.8. 使用 KafkaBindingRebalanceListener
应用程序可能希望在分配分区时寻找任意偏移的主题/分区,或对消费者执行其他作。
从2.1版本开始,如果你提供一个KafkaBindingRebalanceListener在应用环境中,它将被连接到所有 Kafka 消费者绑定中。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
你不能设置resetOffsets消费者财产true当你提供一个重新平衡的听众时。
1.9. 重试与死信处理
默认情况下,当你配置重试(例如,maxAttemts)enableDlq在消费者绑定中,这些功能在绑定器内完成,监听器容器或 Kafka 消费者不参与。
在某些情况下,将此功能迁移到监听器容器会更为理想,例如:
-
重试和延误的总和将超过消费者
max.poll.interval.ms财产,可能导致分区重新平衡。 -
你想向另一个卡夫卡群发表死信。
-
你想向错误处理程序添加重试监听器。
-
…
要配置将此功能从绑定器迁移到容器,定义一个@Bean类型ListenerContainerWithDlqAndRetryCustomizer. 该接口具有以下方法:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目的解析器和退避如果配置了,则由绑定属性创建。 这卡夫卡模板使用来自的配置Spring。卡夫卡......性能。 你可以用这些工具创建自定义的错误处理程序和死符发布器; 例如:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
现在,只需一次重试延迟超过消费者的延迟max.poll.interval.ms财产。
在使用多个活页夹时,'ListenerContainerWithDlqAndRetryCustomizer' 豆子会被 'DefaultBinderFactory' 覆盖。对于豆子要应用,你需要使用“BinderCustomizer” 来设置容器自定义器(参见 [binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
1.10. 定制消费者和生产者配置
如果你想要对消费者和生产者配置进行高级定制,这些配置用于创作消费者工厂和生产工厂在卡夫卡中,你可以实现以下自定义工具。
-
ConsumerConfigCustomizer
-
ProducerConfigCustomizer
这两个接口都提供了配置用于消费者和生产者属性的配置映射的方式。例如,如果你想访问在应用层定义的豆子,可以在配置方法。 当活页夹发现这些定制工具以豆子形式存在时,它会调用配置在建立消费和生产工厂之前。
这两个接口还提供绑定和目的名称的访问,便于在定制生产者和消费者属性时访问它们。
1.11. 定制管理员客户端配置
与上述消费者和生产者配置自定义一样,应用程序也可以通过提供AdminClientConfigCustomizer. AdminClientConfigCustomizer 的 configure 方法提供了访问管理员客户端属性的权限,你可以用它来定义进一步的自定义。Binder 的 Kafka 主题 provisioner 为通过该定制器提供的属性提供了最高优先级。这里是一个提供该自定义工具 bean 的示例。
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
1.12. 定制卡夫卡活页夹健康指示器
当 Spring Boot 执行器位于类路径上时,Kafka 绑定器会激活默认健康指示器。该健康指示器检查绑定器的健康状况以及与 Kafka 代理之间的通信问题。如果应用程序希望禁用该默认健康检查实现并包含自定义实现,则可以提供卡夫卡·宾德健康接口。卡夫卡·宾德健康是一个从健康指标. 在自定义实现中,必须提供健康()方法。 自定义实现必须以豆子的形式存在于应用配置中。当绑定器发现自定义实现时,它会使用该实现代替默认实现。这里是一个应用程序中此类自定义实现豆的示例。
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
1.13. 死信话题处理
1.13.1. 死字主题划分选择
默认情况下,记录会发布到死信主题,使用与原始记录相同的分区。这意味着死信主题必须拥有与原始记录至少相同的分区数。
要改变这种行为,可以添加一个DlqPartitionFunction作为@Bean切换到应用上下文。只能存在一个这样的豆子。该函数由消费者组提供,失败消费者记录以及例外。例如,如果你总是想路由到分区0,你可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果你设置了消费者绑定dlqPartitions属性为1(以及绑定者的最小分区计数等于1),无需提供DlqPartitionFunction; 框架始终使用分区 0。如果你设置了消费者绑定dlqPartitions属性为大于1(或者说是活页夹的最小分区计数大于1),你必须提供一个DlqPartitionFunction豆子,即使分区计数和原主题相同。 |
也可以为DLQ主题定义自定义名称。
为此,创建一个实现DlqDestinationResolver作为@Bean切换到应用上下文。
当粘合剂检测到这样的豆子时,优先处理,否则它将使用dlqName财产。
如果这两个都找不到,则默认为error.<destination>.<group>.
这里有一个示例DlqDestinationResolver作为@Bean.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为 提供实现时,有一点很重要要记住DlqDestinationResolver就是 Binder 中的 provisioner 不会自动为应用创建主题。
这是因为绑定器无法推断实现可能发送的所有DLQ主题名称。
因此,如果你用这种策略提供DLQ名称,应用程序有责任确保这些主题事先被创建。
1.13.2. 死字母主题中的记录处理
由于该框架无法预见用户如何处理死字母消息,因此没有提供任何标准处理机制。 如果死字母的原因是暂时的,你可能需要将消息路由回原主题。 但如果问题是永久性的,可能会导致无限循环。 本主题中的示例 Spring Boot 应用展示了如何将这些消息路由回原始主题,但尝试三次后会将其移至“停车场”主题。 该应用是另一个春云流应用,读取死字母主题。 当5秒内无消息时,它会退出。
示例假设原始目的地为SO8400 出而消费者组为SO8400.
有几种策略需要考虑:
-
考虑只在主应用程序未运行时运行重定向。 否则,瞬态错误的重试次数会很快被用尽。
-
或者,采用两阶段方法:用该应用路由到第三个主题,再用另一个应用从那里路由回主主题。
以下代码列表展示了示例应用:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}
1.14. 用卡夫卡装订器进行分区
Apache Kafka 原生支持主题分区。
有时将数据发送到特定分区是有利的——例如,当你想严格订购消息处理时(特定客户的所有消息都应发送到同一分区)。
以下示例展示了如何配置生产者和消费者端:
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
需要注意的是,由于Apache Kafka原生支持分区,除非你使用了示例中的自定义分区键或涉及payload本身的表达式,否则无需依赖上述绑定器分区。
绑定器提供的分区选择本应适用于不支持原生分区的中间件技术。
注意我们使用的是一个名为partitionKey在上述例子中,该划分将成为划分的决定因素,因此在此情况下使用束缚器划分是合适的。
当使用原生Kafka分区时,即当你不提供分区键表达式然后Apache Kafka会选择一个分区,默认为记录键的哈希值除以可用分区数。
要向出站记录添加密钥,设置KafkaHeaders.KEY在 Spring-Messaging 中,将目标键值的头部留言<?>.
默认情况下,当没有提供记录键时,Apache Kafka 会根据 Apache Kafka 文档中描述的逻辑选择一个分区。 |
主题必须配置为拥有足够的分区,以实现所有消费者组所需的并发性。
上述配置支持最多12个消费者实例(如果是6个,则支持)并发是2,如果它们的并发是3,则为4,依此类推)。
通常最好“过度配置”分区,以便未来消费者或并发增加。 |
之前的配置使用默认的分区(key.hashCode() % partitionCount).
这可能提供或不合适的平衡算法,取决于关键值的不同。特别注意,这种分区策略不同于独立 Kafka 生产者的默认方式——如 Kafka Streams 所采用的,这意味着同一键值在客户端生成时可能在不同分区间的平衡不同。
你可以通过使用partitionSelectorExpression或partitionSelectorClass性能。 |
由于分区由Kafka原生处理,消费者端无需特殊配置。 Kafka 在实例之间分配分区。
| kafka 主题的 partitionCount 可能在运行时发生变化(例如因管理任务)。 计算后的划分会有所不同(例如,此时会使用新的划分)。 自 Spring Cloud Stream 4.0.3 版本起,将支持分区计数的更改。 另见参数“spring.kafka.producer.properties.metadata.max.age.ms”以配置更新间隔。 由于某些限制,无法使用引用消息“有效载荷”的“分区密钥表达式”,此时该机制将被禁用。 整体行为默认被禁用,可以通过配置参数 'producer.dynamicPartitionUpdatesEnabled=true' 来启用。 |
以下 Spring Boot 应用程序监听 Kafka 流,并打印(向控制台)每个消息所指向的分区 ID:
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
你可以根据需要添加实例。
卡夫卡会重新平衡分区分配。
如果实例计数(或实例计数 * 并发)超过分区数,部分消费者处于空闲状态。
2. 响应式卡夫卡活页夹
Spring Cloud Stream中的Kafka装订器提供了一个基于Reactor Kafka项目的专用响应式装订器。
该响应式Kafka绑定器使基于Apache Kafka的应用实现了完整的端到端反应功能,如背压、反应流等。
当你的 Spring Cloud Stream Kafka 应用程序使用响应式类型编写时(通量,单等等),建议使用这个响应式Kafka活页夹,而不是基于消息通道的普通Kafka活页夹。
2.1. Maven坐标
以下是响应式卡夫卡结合器的maven坐标。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
2.2. 使用响应式卡夫卡活页夹的基本示例
在本节中,我们展示了一些使用响应式绑定器编写响应式 Kafka 应用的基本代码片段及其相关细节。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
你可以用上面的大写函数同时支持基于消息通道的 Kafka 绑定器(春-云-溪-绑定-卡夫卡以及反应性卡夫卡结合剂(春云流束缚剂卡夫卡反应),本节讨论的话题。
在使用常规Kafka绑定器时,尽管你在应用程序中使用了反应类型(即在大写函数),你只会在函数执行时获得反应流。
在函数执行上下文之外,没有响应式益处,因为底层绑定器不基于响应式栈。
因此,尽管看起来像是带来了完整的端到端响应式堆栈,但该应用实际上只是部分被动式。
现在假设你用的是适合卡夫卡的反应性活页夹——春云流束缚剂卡夫卡反应上述函数的应用。
这种活页夹的实现将从高端消费到链底端发布,提供全方位的反应性效益。
这是因为底层的绑定器是建立在 Reactor Kafka 核心 API 之上。
在消费者端,它使用 KafkaReceiver,这是一种 Kafka 消费者的响应式实现。
同样,在生产者端,它使用 KafkaSender API,这是 Kafka 生产器的响应式实现。
由于响应式Kafka绑定器的基础建立在合适的响应式Kafka API之上,应用程序能够充分享受使用响应式技术的全部优势。
使用这种响应式卡夫卡活页夹时,应用内置了自动背压等反应功能。
从4.0.2版本开始,你可以自定义接收者选项和发件选项通过提供一个或多个接收器选项定制器或SenderOptionsCustomizer豆子。
它们是双功能接收绑定名称和初始选项,返回自定义选项。
接口延伸命令因此,当多个自定义器存在时,定制器将按要求顺序应用。
绑定器默认不会提交偏移量。
从4.0.2版本开始,KafkaHeaders.致谢首部包含一个接收机偏移量该对象允许你通过调用其来导致偏移量被提交确认()或commit()方法。 |
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
参见反应堆-卡夫卡更多信息请见文档和Javadocs。
此外,从4.0.3版本开始,Kafka的消费者属性reactiveAtmostOnce可以设置为true绑定器会在每次轮询返回的记录处理前自动提交偏移量。
另外,从4.0.3版本开始,你可以设置consumer 属性reactiveAutoCommit自true而在每次轮询返回的记录处理完后,绑页器会自动提交偏移量。
在这种情况下,确认头不存在。
还提供了4.0.2版本reactiveAutoCommit但实现不正确,表现类似于reactiveAtMostOnce. |
以下是如何使用的示例reaciveAutoCommit.
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
注意反应堆-卡夫卡返回 aFlux<Flux<ConsumerRecord<?, ?>>>使用自动提交时,
鉴于Spring无法访问内部通量的内容,应用程序必须处理原生流量消费者记录;内容没有消息转换或转换服务。
这需要使用本地译码(通过指定反串化器配置中相应类型的记录键/值。
2.3. 以原始格式消费唱片
在上述内容中大写函数,我们用通<弦>然后生成为通<弦>.
有时你可能需要以原始接收格式接收记录——接收记录.
这里有一个这样的函数。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在此函数中,注意我们消耗记录为Flux<ReceiverRecord<byte[], byte[]>>然后生成为通<弦>.接收记录是基本的接收记录,是专门的卡夫卡消费者记录在《反应堆卡夫卡》中。
使用响应式Kafka装订器时,上述函数将为你提供访问接收记录为每条进入记录输入。
不过,在这种情况下,你需要为 RecordMessageConverter 提供一个自定义实现。
默认情况下,响应式Kafka绑定器使用MessagingMessageConverter,将有效载荷和头部从以下消费者记录.
因此,当你的处理方法接收到有效载荷时,已从接收记录中提取并传递到方法,就像我们上面提到的第一个函数一样。
通过提供定制记录消息转换器在应用中实现时,你可以覆盖默认行为。
例如,如果你想以原始内容消费记录Flux<ReceiverRecord<byte[], byte[]>>那么你可以在应用中提供以下豆子定义。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然后,你需要指示框架使用该转换器来完成所需的绑定。
这里有一个基于我们小写功能。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
0 中小写是我们输入绑定的名称小写功能。
对于出站(低壳出零),我们仍然使用常规消息信息转换器.
在收件人消息在上述实现上,我们收到了原始文件消费者记录 (接收记录因为我们处于响应式结合器上下文中),然后将其包裹在消息.
然后该消息载荷为接收记录是提供给用户方法的。
如果reactiveAutoCommit是false(默认),呼叫rec.receiverOffset().acknowledge()(或commit())导致偏移量被提交;如果reactiveAutoCommit是true,通量供应消费者记录而不是S。
参见反应堆-卡夫卡更多信息请见文档和Javadocs。
2.4. 并发
在使用响应式函数配合响应式Kafka绑定器时,如果你在消费级绑定上设置并发,绑定器就会生成尽可能多的专用绑定卡夫卡接收器对象由并发值提供。
换句话说,这会产生多个响应式流,分别是独立的通量实现。
当你从分区主题中获取记录时,这可能非常有用。
例如,假设进入的主题至少有三个分区。 然后你可以设置以下属性。
spring.cloud.stream.bindings.lowercase-in-0.consumer.concurrency=3
这样就有三个专门的卡夫卡接收器生成三个独立对象通量实现,然后将它们流式传输到处理器方法。
2.5. 多路复用
从4.0.3版本开始,通用的消费者属性多重现在被响应式装帧器支持,单一绑定可以同时消耗多个主题。
什么时候false(默认),在通用中逗号分隔列表中,每个主题都会创建独立的绑定目的地财产。
2.6. 目的地是模式
从4.0.3版本开始,目的即图案卡夫卡绑定消费者属性现已支持。
接收选项通过正则表达式表示表示模式,允许绑定从任何符合模式的主题中取用。
2.7. 发送结果通道
从4.0.3版本开始,你可以配置结果元数据通道接收SenderResult<?>s 用于判定发送的成功/失败。
这发送结果包含相关元数据以便你将结果与发送数据关联;它还包含记录元数据,表示主题分区以及发送记录的偏移。
这结果元数据通道 必须是流信息频道实例。
这里是一个如何使用该功能的示例,关联元数据类型为整数:
@Bean
FluxMessageChannel sendResults() {
return new FluxMessageChannel();
}
@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
if (result.exception() != null) {
failureFor(result);
}
else {
successFor(result);
}
}
要在输出记录上设置相关元数据,请设置CORRELATION_ID页眉:
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
当使用带有功能,函数输出类型必须是留言<?>并将关联ID头设置为所需的值。
元数据至少在发送期间应保持唯一。
3. 卡夫卡流活页夹
3.1. 使用情况
使用 Kafka Streams 绑定器时,只需使用以下 maven 坐标将其添加到你的 Spring Cloud Stream 应用中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
快速启动 Kafka Streams 绑定器新项目的方法是使用 Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示
3.2. 概述
Spring Cloud Stream 包含一个专门为 Apache Kafka Streams 绑定设计的绑定器实现。 通过这种原生集成,Spring Cloud Stream “处理器”应用可以直接在核心业务逻辑中使用 Apache Kafka Streams API。
Kafka Streams 绑定器实现基于 Spring for Apache Kafka 项目提供的基础。
Kafka Streams 装订器为 Kafka Streams 的三种主要类型提供了绑定功能——KStream,KTable(英国可爱的)音乐和全球可爱.
Kafka Streams 应用通常遵循一种模式:从入站主题读取记录,应用业务逻辑,然后将转换后的记录写入出站主题。 或者,也可以定义一个没有出站目的地的处理器应用程序。
在接下来的章节中,我们将详细介绍 Spring Cloud Stream 与 Kafka Streams 的集成。
3.3. 编程模型
使用 Kafka Streams 绑定器提供的编程模型时,可以使用高级 Streams DSL 以及高层和低层 Processor-API 的混合选项。
当混合高层和低层 API 时,通常通过调用来实现变换或过程API 方法在KStream.
3.3.1. 功能风格
从春云溪开始3.0.0Kafka Streams 绑定器允许使用 Java 8 中可用的函数式编程风格设计和开发应用程序。
这意味着应用程序可以简明地表示为类型的λ表达式java.util.function.函数或java.util.function.Consumer.
我们来举一个非常基础的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然简单,但它是一个完整的独立 Spring Boot 应用程序,利用 Kafka Streams 进行流处理。
这是一个消费者应用,没有出站绑定,只有一个入站绑定。
应用程序消耗数据,并简单地记录来自KStream标准输出的键和值。
该应用程序包含SpringBootApplication注释和标记为豆.
豆子法属于java.util.function.Consumer参数化为KStream.
然后在实现中,我们返回一个本质上是λ表达式的消费者对象。
在λ表达式中,提供了处理数据的代码。
在该应用中,有一个类型的输入绑定KStream.
该绑定器为应用程序创建了带有名称的绑定process-in-0,即函数豆名后跟一个破折号字符()和字面-在再接一个破折号,最后是参数的序数位置。
你可以用这个绑定名来设置其他属性,比如目的地。
例如Spring.cloud.stream.bindings.process-in-0.destination=my-topic.
| 如果目标属性未在绑定上设置,则创建与绑定同名的主题(如果应用权限足够),或者该主题应已可用。 |
一旦被构建成超级罐子(例如,kstream-consumer-app.jar),你可以按照以下方式运行上述示例。
如果应用程序选择使用 Spring 来定义功能豆子元件注释,Binder 也支持该模型。
上述功能豆可以重写如下。
@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这里还有另一个例子,它是一个包含输入和输出绑定的完整处理器。 这是经典的字数示例,应用程序接收主题数据后,在一个滚动时间窗口内计算每个词的出现次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
这里同样是一个完整的 Spring Boot 应用。这里与第一个应用的不同之处在于豆子法属于java.util.function.函数.
第一个参数化类型功能是输入KStream第二个是输出。
在方法本体中,提供了一个类型为 的 λ 表达式功能作为实现,实际的业务逻辑也被给出。
类似于之前讨论的基于消费者的应用,这里的输入绑定命名为process-in-0默认。对于输出,绑定名也会自动设置为process-out-0.
一旦被构建成超级罐子(例如,wordcount-processor.jar),你可以按照以下方式运行上述示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
该应用程序将接收来自卡夫卡主题的消息的话计算结果会被发布到输出中
主题计数.
Spring Cloud Stream 将确保来自进出主题的消息自动绑定为 KStream对象。作为开发者,你可以专注于代码的业务层面,也就是编写逻辑 处理器中必须如此。设置Kafka Streams基础设施所需的特定配置 由框架自动处理。
我们上面看到的两个例子是单一的KStream输入绑定。在这两种情况下,装订者都接收了单一主题的记录。
如果你想把多个主题合并成一个KStream有约束力的话题,你可以在下方提供逗号分隔的卡夫卡主题作为目的地。
Spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果你想将主题与常规内容匹配,也可以提供主题模式作为目的地。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多输入绑定
许多非平凡的 Kafka Streams 应用程序经常通过多个绑定消耗多个主题的数据。
例如,一个主题被消费为克斯特里姆还有一个为KTable(英国可爱的)音乐或全球可爱.
应用程序希望以表类型接收数据的原因有很多。
想象一个用例:底层主题通过数据库中的变更数据捕获(CDC)机制填充,或者应用程序只关心最新更新以便下游处理。
如果应用程序指定数据需要绑定为KTable(英国可爱的)音乐或全球可爱那么Kafka Streams的绑定器就能正确绑定目标到一个KTable(英国可爱的)音乐或全球可爱并让这些信息可供应用程序运行。
我们将探讨 Kafka Streams 绑定器中多输入绑定的几种不同场景。
BiFunction in Kafka Streams Binder
这里有一个例子,我们有两个输入和一个输出。在这种情况下,应用可以利用java.util.function.BiFunction.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
这里的基本主题与前述例子相同,但这里有两个输入。
爪哇的双功能支持用于将输入绑定到目标。
绑定器为输入生成的默认绑定名称为process-in-0和process-in-1分别。默认输出绑定为process-out-0.
在这个例子中,第一个参数双功能被束缚为KStream对于第一个输入,第二个参数被绑定为KTable(英国可爱的)音乐对于第二个输入。
BiConsumer in Kafka Streams Binder
如果有两个输入但没有输出,那么我们可以使用java.util.function.BiConsumer如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果你有超过两个输入怎么办? 有些情况下你需要超过两个输入。在这种情况下,绑定器允许你串联部分函数。 在函数式编程术语中,这种技术通常被称为currying。 随着Java 8新增函数式编程支持,Java现在可以编写curri函数。 Spring Cloud Stream Kafka Streams 绑定器可以利用此功能实现多输入绑定。
让我们举个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们来看看上面介绍的绑定模型的细节。
在这个模型中,我们有3个部分应用的函数在入站线上。我们称它们为f(x),f(y)和f(z).
如果我们将这些函数展开为真正的数学函数,它会呈现如下:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>.
这x变量代表KStream<Long,Order>这y变量代表GlobalKTable<Long,客户>以及z变量代表GlobalKTable<Long,产品部>.
第一个功能f(x)具有应用的第一个输入绑定(KStream<Long,Order>)其输出为函数f(y)。
函数f(y)具有应用的第二个输入绑定(GlobalKTable<Long,客户>)而其输出又是另一个函数,f(z).
函数的输入f(z)是应用的第三个输入(GlobalKTable<Long,产品部>)其输出为Kstream<Long,EnrichedOrder>这是应用程序的最终输出绑定。
来自三个偏函数的输入,分别是KStream,全球可爱,全球可爱这些内容分别在实现业务逻辑作为 lambda 表达式一部分的方法体中可供你使用。
输入绑定被命名为enrichOrder-in-0,enrichOrder-in-1和enrichOrder-in-2分别。输出绑定命名为enrichOrder-out-0.
有了库里函数,你几乎可以有任意数量的输入。不过要记住,超过较少的输入和部分应用函数,就像上面 Java 里那样,可能会导致代码无法读取。 因此,如果你的 Kafka Streams 应用需要的输入绑定数量超过相对较少,并且你想使用这个功能模型,那么你可能需要重新考虑设计,并适当地分解应用程序。
输出绑定
Kafka Streams 活页夹支持以下两种类型KStream或KTable(英国可爱的)音乐作为输出绑定。
在幕后,活页夹使用了自方法KStream将结果记录发送到输出主题。
如果申请提供KTable(英国可爱的)音乐作为函数的输出,绑定器仍然通过委派给自方法KStream.
例如,下面的两个功能都适用:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
多输出绑定
Kafka Streams 允许将出站数据写入多个主题。这一特征被称为卡夫卡溪流中的分支。
使用多个输出绑定时,你需要提供 KStream 数组 (KStream[])作为出站返回类型。
这里有一个例子:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
编程模型保持不变,但出站参数类型为KStream[].
默认的输出绑定名称为process-out-0,流程出局1,流程出局二分别对上述函数。
结合器产生三个输出绑定的原因是它检测返回的长度KStream阵列为三。
注意在这个例子中,我们提供了noDefaultBranch();如果我们使用defaultBranch()相反,这需要额外的输出绑定,本质上返回一个KStream长度为四的数组。
Kafka 流函数式编程风格总结
总之,下表展示了功能范式中可用的各种选项。
| 输入数量 | 输出数量 | 组件 |
|---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1......n |
java.util.function.函数 |
2 |
1......n |
java.util.function.BiFunction |
>= 3 |
0......n |
使用curried函数 |
-
当表中输出多于一个时,类型简单变为
KStream[].
Function composition in Kafka Streams binder
Kafka Streams 绑针器支持线性拓扑的最小函数复合形式。
利用 Java 函数式 API 支持,你可以编写多个函数,然后自己用然后方法。
例如,假设你有以下两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
即使没有活页夹中的功能性作文支持,你也可以像下面这样组合这两个功能。
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
然后你可以给出 形式的定义spring.cloud.function.definition=foo;酒吧;由.
有了装订器中的函数组合支持,你就不需要写那个你在做显式函数组合的第三个函数。
你可以直接这样做:
spring.cloud.function.definition=foo|bar
你甚至可以这样做:
spring.cloud.function.definition=foo|bar;foo;bar
该组合函数的默认绑定名称在此示例中变为福巴尔-0和福巴尔出局-0.
Kafka Streams bincer 功能组合的局限性
当你有java.util.function.函数豆子,可以与其他函数或多个函数组合。
同一函数豆可以由java.util.function.Consumer也。在这种情况下,消费者是最后一个组成的组成部分。
一个函数可以由多个函数组合而成,然后以java.util.function.Consumer豆豆也是。
在合成字模时java.util.function.BiFunction这双功能必须是定义中的第一个函数。
组成的实体必须是类型java.util.function.函数或java.util.funciton.Consumer.
换句话说,你不能取双功能然后再和另一个人作曲双功能.
你无法用双消费者或定义消费者是第一个分量。
你也不能用输出为数组的函数来组合 (KStream[]对于分支)除非这是定义中的最后一个分量。
第一位功能之双功能在函数定义中,也可以使用curried形式。
例如,以下情况是可能的。
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
函数定义可以为咖喱Foo|bar.
在幕后,绑定器会为curreed函数创建两个输入绑定,以及基于定义中最终函数的输出绑定。
在这种情况下,默认的输入绑定将是咖喱福巴尔-0和咖喱Foobar-in-1.
本例的默认输出绑定为咖喱福巴尔出局-0.
关于使用KTable(英国可爱的)音乐作为函数合成中的输出
假设你有以下两个功能。
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
你可以将它们写成foo|bar,但请记住第二个函数(酒吧此时)必须有KTable(英国可爱的)音乐作为输入,因为第一个函数(福) 具有KTable(英国可爱的)音乐作为输出。
3.4. 编程模型的辅助
3.4.1. 单一应用中的多个Kafka Streams处理器
Binder 允许在单个 Spring Cloud Stream 应用中拥有多个 Kafka Streams 处理器。 你可以提交如下的申请表。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,绑定器会创建3个不同的应用ID的Kafka Streams对象(下面会详细说明)。 但是,如果你的应用程序中有多个处理器,你必须告诉 Spring Cloud Stream 需要激活哪些功能。 以下是如何激活这些功能的方法。
spring.cloud.function.definition: process;anotherProcess;然而另一个过程
如果你希望某些功能不立即激活,可以从列表中移除。
当你只有一个Kafka Streams处理器和其他类型的功能同一应用程序中的豆子,但通过不同的装订器处理(例如,基于普通卡夫卡消息通道装订器的功能豆)
3.4.2. Kafka Streams Application ID
申请ID是Kafka Streams申请必须提供的属性。 Spring Cloud Stream Kafka Streams 绑定器允许你以多种方式配置该应用 ID。
如果应用程序中只有一个处理器,那么你可以在绑定器层面使用以下属性设置:
spring.cloud.stream.kafka.streams.binder.applicationId.
为了方便,如果你只有一个处理器,也可以使用spring.application.name作为用于委托应用ID的属性。
如果你在应用程序中有多个 Kafka Streams 处理器,那么你需要为每个处理器设置应用 ID。 对于函数模型,你可以将其作为属性附加到每个函数上。
例如,假设你有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后你可以用以下绑定器级别的属性为每个应用设置应用ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,将应用ID设置为绑定级别的方法同样有效。 然而,如上所述,使用函数模型时,在绑定器层面设置每个函数会容易得多。
对于生产部署,强烈建议通过配置明确指定应用ID。 如果你是自动扩展应用,这一点尤其关键,这时你需要确保每个实例都部署相同的应用ID。
如果应用程序没有提供应用ID,那么绑定器会自动生成一个静态应用ID。
这在开发场景中非常方便,因为避免了明确提供应用 ID 的需求。
以这种方式生成的应用ID在应用重启时将保持静态。
在函数模型的情况下,生成的应用ID将是函数豆名,后跟文字applicationID,例如process-applicationID如果过程如果函数 BEAN 名称。
应用ID设置总结
-
默认情况下,Binder 会自动生成每个函数方法的应用 ID。
-
如果你只有一个处理器,那你可以使用
spring.kafka.streams.applicationId,spring.application.name或spring.cloud.stream.kafka.streams.binder.applicationId. -
如果你有多个处理器,那么可以用以下属性为每个函数设置应用 ID -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId.
3.4.3. 覆盖绑本器生成的默认绑定名称,函数式样式
默认情况下,绑定器在使用函数样式时采用上述策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n]),例如 process-in-0、process-out-0 等。 如果你想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>.默认绑定名是绑定器生成的原始绑定名。
比如说,你有这个函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 会生成带有名称的绑定,process-in-0,process-in-1和process-out-0.
如果你想完全改成别的,比如更专属领域的绑定名,可以按下面作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,你必须为这些新绑定名称设置所有绑定级别的属性。
请记住,在上述函数式编程模型中,在大多数情况下遵循默认的绑定名称是合理的。 你可能还想做覆盖的唯一原因是当配置属性更多,想把绑定映射到更适合域的配置时。
3.4.4. 设置引导服务器配置
运行 Kafka Streams 应用时,必须提供 Kafka 代理服务器信息。
如果你不提供这些信息,绑定者会期望你默认运行经纪人本地主机:9092.
如果不是这样,那你需要覆盖这个。有几种方法可以做到这一点。
-
利用boot特性——
spring.kafka.bootstrapServers -
活页夹级别属性——
spring.cloud.stream.kafka.streams.binder.brokers
关于活页夹层的属性,无论你是否使用普通卡夫卡活页夹提供的经纪人属性——spring.cloud.stream.kafka.binder.brokers.
Kafka Streams 绑定器首先会检查 Kafka Streams 绑订器的特定经纪人属性是否被设置 (spring.cloud.stream.kafka.streams.binder.brokers如果找不到,则寻找spring.cloud.stream.kafka.binder.brokers.
3.5. 记录序列化与反序列化
Kafka Streams 装订器允许你通过两种方式序列化和反序列化记录。 一是Kafka提供的原生序列化和反序列化功能,另一是Spring Cloud Stream框架的消息转换功能。 让我们来看一些细节。
3.5.1. 入站反序列化
密钥总是通过原生Serdes进行反序列化。
对于值,默认情况下,Kafka原生执行入站的反序列化。 请注意,这与之前版本的 Kafka Streams 绑装器相比,默认行为发生了重大变化,之前的反序列化由框架完成。
Kafka Streams 的活页夹会尝试推断匹配Serde通过观察 的类型签名来确定类型java.util.function.Function|消费者.
以下是它与Serdes匹配的顺序。
-
如果应用程序提供了 类型的豆子
Serde如果返回类型参数化为输入键或值的实际类型,则会使用Serde用于入站反序列化。 例如,如果你在应用程序中有以下内容,绑定器会检测到输入值的类型KStream与参数化在Serde豆。 它会用这些数据进行入库反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它会观察这些类型,看看它们是否是卡夫卡流(Kafka Streams)暴露的类型之一。如果有,就用它们。 以下是活页夹会尝试匹配的 Kafka Streams 中的 Serde 字体。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serde 都不符合类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设这些类型对JSON友好。 如果你有多个值对象作为输入,这很有用,因为绑定器会在内部推断它们以纠正 Java 类型。 在退回
JsonSerde不过,绑定器默认会检查Serde在 Kafka Streams 配置中设置了 s,以判断它是否是Serde它能与新来的KStream类型匹配。
如果上述策略均无效,应用程序必须提供Serde通过配置。
这可以通过两种方式配置——绑定或默认。
首先,活页夹会检查是否Serde在绑定层面提供。
例如,如果你有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,你可以提供一个绑定级别Serde使用以下方法:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果你提供Serde作为输入绑定的 abover,那么该绑定优先级更高,绑定器将避免使用任何Serde推理。 |
如果你想让默认的键值Serdes用于入站反序列化,可以在绑定器层面实现。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你不想要Kafka提供的原生解码,可以依赖Spring Cloud Stream提供的消息转换功能。 由于原生解码是默认,为了让 Spring Cloud Stream 反序列化入站值对象,你需要明确禁用原生解码。
例如,如果你使用的是与上面相同的 BiFunction 处理器,spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false你需要单独关闭所有输入的原生解码。否则,对于未禁用的部分,仍会应用原生解码。
默认情况下,Spring Cloud Stream 将使用application/json作为内容类型,并使用合适的 JSON 消息转换器。
你可以通过以下属性和适当的条件来使用自定义消息转换器消息转换器豆。
spring.cloud.stream.bindings.process-in-0.contentType
3.5.2. 出站序列化
出站序列化基本上遵循上述的入站反序列化规则。 与入站反序列化类似,与之前版本的 Spring Cloud Stream 有一个重大变化是出站的序列化由 Kafka 原生处理。 在3.0版本之前,这都是由框架本身完成的。
出境键总是由卡夫卡用匹配序列化Serde这由活页夹推断。
如果无法推断密钥的类型,那就需要用配置来指定。
值服务通过与进站反序列化相同的规则推断。
首先匹配的是外出类型是否来自应用中提供的豆子。
如果不匹配,它会检查是否与Serde卡夫卡揭露了这些作品,例如——整数,长,短,双,浮,字节[],UUID和字符串.
如果不行,那就退回去JsonSerde由Spring Kafka项目提供,但先看默认Serde配置以判断是否匹配。
请记住,所有这些都是对应用程序透明的。
如果这些都不行,用户必须提供Serde通过配置使用。
假设你用的是同样的双功能处理器如上所述。然后你可以配置出站键/值Serdes,具体如下。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果Serde推断失败,且没有提供绑定级别Serdes,则绑定器退回到JsonSerde但看看默认的Serdes匹配。
默认 serdes 的配置方式与上述相同,描述在反序列化中。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你的应用使用分支功能并且有多个输出绑定,那么这些绑定必须在每个绑定中配置。
同样,如果活页夹能够推断Serde类型,你不需要做这个配置。
如果你不想要Kafka提供的原生编码,但想使用框架提供的消息转换,那么你需要明确禁用原生编码,因为原生编码是默认的。
例如,如果你使用的是与上面相同的 BiFunction 处理器,spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false在分支情况下,你需要单独禁用所有输出的原生编码。否则,对于未禁用的部分,仍会应用原生编码。
当Spring Cloud Stream进行转换时,默认情况下,它会使用application/json作为内容类型,并使用合适的 JSON 消息转换器。
你可以通过以下属性和相应的条件使用自定义消息转换器消息转换器豆。
spring.cloud.stream.bindings.process-out-0.contentType
当本地编码/解码被禁用时,binder不会像原生Serdes那样进行任何推理。
应用程序需要明确提供所有配置选项。
因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用时,保持默认的反序列化选项,并坚持使用 Kafka Streams 提供的原生反序列化功能。
唯一必须使用框架提供的消息转换能力的情况是,当上游生产者使用特定的序列化策略时。
在这种情况下,你需要采用匹配的反序列化策略,因为原生机制可能会失败。
当依赖默认时Serde机制中,应用程序必须确保活页夹有前进路径,能够正确映射进出的进站和出站Serde否则可能会失败。
值得一提的是,上述数据反序列化方法仅适用于处理器的边缘,即进站和出站。
你的业务逻辑可能仍然需要调用明确需要的 Kafka Streams APISerde对象。
这些仍然是应用的责任,开发者必须相应处理。
3.6. 错误处理
Apache Kafka Streams 提供了原生处理反序列化错误异常的能力。
有关此支持的详细信息,请参见此处。
开箱即用,Apache Kafka Streams 提供了两种类型的反序列化异常处理程序——LogAndContinueExceptionHandler和LogAndFailExceptionHandler.
顾名思义,前者会记录错误并继续处理后续记录,后者则会记录错误并失败。LogAndFailExceptionHandler是默认的反序列化异常处理程序。
3.6.1. 在文件夹中处理反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序外,该绑定器还提供了第三个处理程序,用于将错误记录(毒丸)发送到DLQ(死信队列)主题。 以下是如何启用这个DLQ异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
当上述属性被设置时,所有处于反序列化错误中的记录都会自动发送到DLQ主题。
你可以设置DLQ消息发布的主题名称如下。
你可以提供一个实现DlqDestinationResolver这是一个功能性接口。DlqDestinationResolver需要消费者记录以及例外作为输入,然后允许将主题名称指定为输出。
通过获得卡夫卡的访问权消费者记录,头部记录可以在实现双功能.
这里有一个提供 的实现示例DlqDestinationResolver.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为 提供实现时,有一点很重要要记住DlqDestinationResolver就是 Binder 中的 provisioner 不会自动为应用创建主题。
这是因为绑定器无法推断实现可能发送的所有DLQ主题名称。
因此,如果你用这种策略提供DLQ名称,应用程序有责任确保这些主题事先被创建。
如果DlqDestinationResolver在应用中以豆子的形式存在,优先级更高。
如果你不想采用这种方法,而是通过配置提供静态的DLQ名称,可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了这个,那么错误记录会发送到主题Custom-DLQ.
如果应用程序没有使用上述任何一种策略,那么它会创建一个带有名称的 DLQ 主题error.<input-topic-name>.<application-id>.
例如,如果你的装订目标主题是输入主题应用程序ID为process-applicationID,则默认的DLQ主题为error.inputTopic.process-applicationID.
如果你打算启用DLQ,建议为每个输入绑定明确创建一个DLQ主题。
3.6.2. DLQ每输入消费者绑定
该物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用。
这意味着如果同一应用中有多个函数,该属性会应用到所有函数上。
然而,如果你在单个处理器内有多个处理器或多个输入绑定,那么你可以使用绑定器为每个输入消费者绑定提供的更细粒度的DLQ控制。
如果你有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
你只需要在第一个输入绑定上启用DLQ,在第二个绑定上启用skipAndContinue,然后你可以像下面那样在消费者端这样做。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以这种方式设置反序列化异常处理程序的优先级高于在绑定器层面设置。
3.6.3. DLQ 分区
默认情况下,记录会发布到死信主题,使用与原始记录相同的分区。这意味着死信主题必须拥有与原始记录至少相同的分区数。
要改变这种行为,可以添加一个DlqPartitionFunction作为@Bean切换到应用上下文。只能存在一个这样的 bean。该函数由消费者组(在大多数情况下与应用 ID 相同)提供,失败消费者记录以及例外。例如,如果你总是想路由到分区0,你可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果你设置了消费者绑定dlqPartitions属性为1(以及绑定者的最小分区计数等于1),无需提供DlqPartitionFunction; 框架始终使用分区 0。如果你设置了消费者绑定dlqPartitions属性为大于1(或者说是活页夹的最小分区计数大于1),你必须提供一个DlqPartitionFunction豆子,即使分区计数和原主题相同。 |
使用 Kafka Streams Binder 中的异常处理功能时,有几点需要注意。
-
该物业
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着如果同一应用程序中有多个函数,该性质会应用到所有函数。 -
反序列化的异常处理与原生反序列化和框架提供的消息转换保持一致。
3.6.4. 在活页夹中处理生产异常
与上述对反序列化异常处理程序的支持不同,该绑定器不提供处理生产异常的第一类机制。不过,你仍然可以通过以下方式配置生产异常处理程序StreamsBuilderFactoryBean你可以在下面下一节找到更多关于定制器的详细信息。
3.7. 重试关键业务逻辑
在某些情况下,你可能需要重试对应用至关重要的业务逻辑部分。比如外部调用关系数据库,或从 Kafka Streams 处理器调用 REST 端点。这些调用可能因网络问题或远程服务不可用等多种原因而失败。但更常见的是,如果你能再试一次,这些失败可能会自行解决。默认情况下,Kafka Streams 的绑定器会创建重试模板所有输入绑定都用豆子。
如果函数的签名如下,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
以及默认绑定名重试模板将注册为process-in-0-RetryTemplate. 这遵循了绑定名称的惯例(process-in-0) 后接字面- 重试模板. 在多个输入绑定的情况下,会有单独的绑定重试模板每个绑定都有豆子。如果有自定义的话重试模板申请中可通过以下方式获得 BEANspring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName那么,它优先于任何输入绑定层的重试模板配置属性。
一旦重试模板从注入到应用程序的绑定后,可以用来重试应用程序中的关键部分。这里有一个示例:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者你也可以用自定义重试模板如下所示。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
注意,当重试用尽时,默认情况下会抛出最后一个异常,导致处理器终止。如果你想处理该异常并继续处理,可以在执行方法: 这里有一个例子。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关RetryTemplate、重试政策、退回政策等的更多信息,请参阅春季重试项目。
3.8. 州商店
当使用高层DSL并进行相应调用时,Kafka Streams会自动创建状态存储。
如果你想实现一个新来的目标KTable(英国可爱的)音乐绑定为命名状态存储,然后你可以用以下策略实现。
假设你有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,即KTable(英国可爱的)音乐数据会被具体化到指定的状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
你可以在应用中将自定义状态存储定义为豆子,这些存储会被绑定器检测并添加到Kafka Streams构建器中。 尤其是使用处理器 API 时,你需要手动注册状态存储。 为此,你可以在应用程序中创建一个 StateStore 作为 bean。 以下是定义此类Beans的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
这些状态存储器随后可以被应用程序直接访问。
在引导过程中,上述豆子会被结合器处理,并传递给Streams构建对象。
进入州商店:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
但这在注册全球州商店时行不通。
如需注册全球状态商店,请参见下方关于定制的部分StreamsBuilderFactoryBean.
3.9. 交互式查询
Kafka Streams binder API 会暴露一个名为互动查询服务用于交互式查询状态存储。
你可以在申请中以春季豆的形式访问这些信息。通过你的申请获取这颗豆子的一个简单方法是自动线豆子。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦你获得了这个豆子的访问权限,就可以查询你感兴趣的特定州商店。见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
启动过程中,上述调用存储库的方法调用可能会失败。 比如,它可能还在初始化状态存储的过程中。 在这种情况下,重试该作会很有用。 Kafka Streams 绑定器提供了简单的重试机制来支持此功能。
以下是你可以用来控制这种重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认为
1. -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认是
1000毫秒。
如果有多个 kafka 流应用实例在运行,那么在你可以交互式查询它们之前,你需要确定你查询的具体密钥托管在哪个应用实例。互动查询服务API提供了识别主机信息的方法。
为了实现这一点,你必须配置该属性application.server如下:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有关这些主机查找方法的更多信息,请参见Javadoc上的相关方法。 对于这些方法,启动时如果底层 KafkaStreams 对象尚未准备好,可能会抛出异常。 上述重试特性同样适用于这些方法。
3.9.1. 通过InteractiveQueryService可用的其他API方法
请使用以下API方法检索KeyQueryMetadata与给定存储和键组合相关的对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
请使用以下API方法检索KakfaStreams与给定存储和键组合相关的对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
3.9.2. 自定义存储查询参数
有时候你需要在通过互动查询服务.
为此,从以下内容开始4.0.1结合剂版本,你可以提供一个豆子StoreQueryParametersCustomizer这是一个功能性接口,具有自定义方法StoreQueryParameter作为论点。
这是它的方法签名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
通过这种方法,应用程序可以进一步定制StoreQueryParameters(存储查询参数)比如让陈旧商店出现。
当这种豆子出现在应用中时,互动查询服务将称之为自定义在查询状态存储之前。
请记住,必须有一种独特的豆子StoreQueryParametersCustomizer申请表中提供。 |
3.10. 健康指标
健康指标需要依赖性Spring-启动-执行器.关于专家的使用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器,用于检查底层流线程的状态。
春云流定义了一个属性管理.健康.binders.enabled以启用健康指示器。请参阅春季云流文档。
健康指示器为每个流线程的元数据提供以下详细信息:
-
线程名称
-
线程状态:
创建,运行,PARTITIONS_REVOKED,PARTITIONS_ASSIGNED,PENDING_SHUTDOWN或死 -
活动任务:任务ID和分区
-
备用任务:任务ID和分区
默认情况下,只有全局状态可见(向上或下).为了说明细节,房产管理。端点。健康。显示-详情必须设置为总是或WHEN_AUTHORIZED.
有关健康信息的更多信息,请参见Spring靴执行器文档。
健康指标的状态为向上如果所有卡夫卡线程都位于运行州。 |
由于《Kafka Streams》活页夹中有三个独立活页夹(KStream,KTable(英国可爱的)音乐和全球可爱),所有这些都会报告健康状况。
启用时节目详情,报告的一些信息可能存在重复。
当同一应用程序中存在多个 Kafka Streams 处理器时,健康检查将报告所有处理器,并按 Kafka Streams 的应用 ID 进行分类。
3.11. 访问Kafka Streams Metrics
Spring Cloud Stream Kafka Streams 绑定器提供了 Kafka Streams 指标,可以通过 Micrometer 导出MeterRegistry.
对于 Spring Boot 2.2.x,度量支持通过绑定器自定义的微尺度量实现提供。 对于 Spring Boot 2.3.x,Kafka Streams 的度量支持是通过 Micrometer 原生提供的。
通过Boot执行器端点访问指标时,务必添加指标归属管理端点.web.exposure.include.
然后你就可以访问了/acutator/metrics以获取所有可用指标的列表,然后可以通过同一个URI单独访问这些指标(/执行器/度量/<度量名>).
3.12. 混合高级DSL和低级处理器API
Kafka Streams 提供了两种 API 变体。
它有更高级的类似DSL的API,可以串联各种作,很多函数式程序员可能都很熟悉。
Kafka Streams 还提供一个低级别的处理器 API。
处理器API虽然非常强大,并且能够在更低层次上控制事物,但本质上是必不可少的。
Kafka Streams 的 Spring Cloud Stream 绑定器允许你使用高级 DSL 或混合使用 DSL 和处理器 API。
混合这两种变体可以让你在应用中控制各种用例。
应用程序可以使用变换或过程方法API调用以访问处理器API。
以下是如何结合 DSL 和处理器 API 在 Spring Cloud Stream 应用中的应用,使用过程应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
这里有一个使用以下条件的例子变换应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
这过程API 方法调用是终端作,而变换API 是非终端的,可能会给你一个变换后的版本KStream你可以通过DSL或处理器API继续进行进一步处理。
3.13. 出站分区支持
Kafka Streams 处理器通常将处理后的输出发送到出站的 Kafka 主题中。
如果出站主题被分区,处理器需要将出站数据发送到特定分区,应用程序需要提供 类型的 beanStreamPartitioner.
详情请参见StreamPartitioner。
让我们看看一些例子。
这就是我们已经多次见过的同一个处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
以下是输出绑定的目的地:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果话题输出主题有4个分区,如果你不提供分区策略,Kafka Streams会使用默认的分区策略,这可能不是你想要的结果,具体取决于具体用例。
假设你想发送任何匹配的密钥到Spring划分到0,云到第1区划,流归入第2分区,其他所有内容归第3分区。
这就是你在申请中需要做的事情。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一种初步的实现,但你可以访问记录的键/值、主题名和分区总数。 因此,如果需要,你可以实现复杂的分区策略。
你还需要提供这个豆子名和应用配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用中的每个输出主题都需要像这样单独配置。
3.14. StreamsBuilderFactoryBean 额外自定义
通常需要对StreamsBuilderFactoryBean这就产生了卡夫卡流对象。
基于Spring Kafka提供的底层支持,该活页夹允许你自定义StreamsBuilderFactoryBean.
你可以使用org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer来自 Spring for Apache Kafka 项目以定制/配置StreamsBuilderFactoryBean本身。
这里有一个使用该条件的例子StreamsBuilderFactoryBeanConfigurer.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上面展示了你可以做的配置StreamsBuilderFactoryBean.
你基本上可以调用任何可用的突变作StreamsBuilderFactoryBean来配置它。
这个配置器会在开厂豆子前被活页夹调用。
一旦你获得了访问权限StreamsBuilderFactoryBean你也可以自定义底层卡夫卡流对象通过KafkaStreamsCustomizer.
这里有一个实现这一目标的蓝图。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer将被StreamsBuilderFactoryBean就在标的卡夫卡流开始吧。
只能有一个StreamsBuilderFactoryBeanConfigurer在整个申请中。
那么,当每个处理器分别备份时,我们如何考虑多个 Kafka Streams 处理器的情况StreamsBuilderFactoryBean对象?
在这种情况下,如果这些处理器需要自定义不同,应用程序需要基于应用ID应用某种过滤。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
3.14.1. 使用自定义器注册全局状态存储
如上所述,该活页夹并不提供一流的全局状态存储注册功能。 为此,你需要使用自定义工具。 以下是实现方式的方法。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
同样,如果你有多个处理器,你要把全局状态存储附加在右侧StreamsBuilder通过过滤掉对方StreamsBuilderFactoryBean如上所述,使用应用程序 ID 的对象。
3.14.2. 使用定制工具注册生产异常处理程序
在错误处理部分,我们指出该活页夹并未提供处理生产异常的第一流方法。
不过情况是这样,你仍然可以使用StreamsBuilderFacotryBean用于注册生产异常处理程序的定制器。见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
同样,如果你有多个处理器,可能需要根据正确的设置StreamsBuilderFactoryBean.
你也可以使用配置属性添加此类生产异常处理程序(详见下文),但如果你选择程序化方法,这也是一个选项。
3.15. 时间戳提取器
Kafka Streams 允许你根据各种时间戳概念控制消费者记录的处理。
默认情况下,Kafka Streams 会提取嵌入在消费者记录中的时间戳元数据。
你可以通过提供不同的方式来改变这种默认行为时间戳提取器实现按输入绑定。
以下是一些具体的实现方式。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后你设置上述时间戳提取器根据消费者装订的豆子名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果你跳过设置自定义时间戳提取器的输入消费者绑定,该消费者将使用默认设置。
3.16. 多重活页夹,基于 Kafka Streams 的活页夹和普通 Kafka 活页夹
你可以有一个应用程序,既有基于普通Kafka活页夹的功能/消费者/提供商,也有基于Kafka Streams的处理器。 然而,你不能在一个功能或消费者中同时使用这两者。
这里有个例子,你在同一个应用里有两个基于活页夹的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
以下是配置中相关的部分:
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果你的应用和上面一样,但涉及两个不同的Kafka簇,情况会更复杂,比如普通的过程同时作用于 Kafka 集群 1 和 cluster 2(接收来自 cluster-1 的数据并发送到 cluster-2),而 Kafka Streams 处理器则作用于 Kafka 集群 2。
然后你得用Spring Cloud Stream提供的多活页夹功能。
以下是你在那种情况下配置可能会发生变化的情况。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意上述配置。
我们有两种结合剂,但总共有三种,第一种是基于第1组的普通卡夫卡粘合剂(卡夫卡1),然后基于第2个簇的另一个卡夫卡活页夹(卡夫卡2) 最后Kstream一 (卡夫卡3).
应用程序中的第一个处理器接收来自卡夫卡1并出版至以下卡夫卡2其中两种结合剂都基于普通卡夫卡结合剂,但簇不同。
第二个处理器是Kafka Streams处理器,消耗来自以下的数据。卡夫卡3该簇与卡夫卡2但用的是不同类型的活页夹。
由于Kafka Streams系列活页夹有三种不同类型的活页夹——Kstream,ktable和全球克洛卡泰表- 如果你的应用程序基于这些绑定器中的任意一个有多个绑定,则需要明确提供该绑定器类型。
例如,如果你有一个像下面这样的处理器,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,在多活页夹场景中,必须配置如下。 请注意,只有在真正的多绑定器场景下才需要,即单个应用内有多个处理器处理多个集群。 在这种情况下,绑定器需要明确提供绑定,以区别于其他处理器的绑定器类型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
3.17. 州级清理
默认情况下,绑定停止时不会清理任何本地状态。
这与Spring Kafka 2.7版本相同的行为有效。
更多详情请参见Spring Kafka文档。
要修改此行为,只需添加一个CleanupConfig @Bean(配置为在启动、停止或两者皆不清理)到应用上下文;咖啡豆会被检测并接入原厂咖啡豆。
3.18. Kafka Streams 拓扑可视化
Kafka Streams 绑定器提供了以下执行器端点,用于检索拓扑描述,您可以通过外部工具对拓扑进行可视化。
/执行器/kafkastreamstopology
/执行器/kafkastreamstopology/<application-id处理器>
你需要包含 Spring Boot 中的执行器和 Web 依赖来访问这些端点。
此外,你还需要补充卡夫卡溪流止水学自管理端点.web.exposure.include财产。
默认情况下,卡夫卡溪流止水学端点已禁用。
3.19. Kafka Streams 应用中的基于事件类型的路由
基于常规消息通道的绑定器中可用的路由功能在 Kafka Streams 绑定器中不被支持。 然而,Kafka Streams 绑定器仍通过入站记录的事件类型记录头提供路由功能。
为了基于事件类型实现路由,应用程序必须提供以下属性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.
这可以是一个逗号分隔值。
例如,假设我们有这个函数:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我们也假设只有当输入记录的事件类型为福或酒吧.
这可以通过以下表达方式表示:事件类型绑定上的属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,绑定器会检查每个进入的记录是否有头部event_type并观察其值是否设为福或酒吧.
如果找不到这两个函数,则会跳过函数执行。
默认情况下,绑定器期望记录头键为event_type但这可以根据绑定进行调整。
例如,如果我们想将该绑定的头键更改为my_event默认设置可以更改如下。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.
使用 Kafkfa Streams 绑定器中的事件路由功能时,它使用字节数组Serde用来解序所有进来的记录。
如果记录头与事件类型匹配,则只有它使用实际事件Serde使用配置或推断的 进行适当的反序列化Serde. 如果你在绑定上设置了反序列化异常处理程序,这会带来问题,因为预期的反序列化只发生在栈的下游,导致意外错误。为了解决这个问题,你可以在绑定上设置以下属性,强制绑定器使用配置或推断的Serde而不是字节数组Serde.
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
这样,应用程序在使用事件路由功能时可以立即检测反序列化问题,并做出适当的处理决策。
3.20. Kafka Streams binder 中的绑定可视化与控制
从3.1.2版本开始,Kafka Streams 绑定器支持绑定可视化和控制。仅支持两个生命周期阶段停止和开始. 生命周期阶段暂停和恢复在Kafka Streams的活页夹中没有。
为了激活绑定可视化和控制,应用程序需要包含以下两个依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
如果你更喜欢用 webflux,也可以包含Spring BootStarters网流而不是标准的网络依赖。
此外,你还需要设置以下属性:
management.endpoints.web.exposure.include=bindings
为了进一步说明这一特性,我们以以下应用为参考:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
如我们所见,该应用有两个 Kafka Streams 函数——一个是消费者,另一个是函数。消费者绑定默认命名为消费者-0. 同样,对于函数,输入绑定为函数在0中输出绑定为函数输出-0.
应用程序启动后,我们可以通过以下绑定端点查找绑定的详细信息。
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
关于这三种装帧的详细信息可在上文找到。
让我们现在停止消费者对零的约束。
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
此时,不会通过这种装订收到任何记录。
重新绑定。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
当单个函数存在多个绑定时,调用这些作对任意绑定都有效。这是因为单个函数的所有绑定都由相同的绑定支持。StreamsBuilderFactoryBean. 因此,对于上述函数,以下函数在0中或函数输出-0会有问题的。
3.21. 手动启动Kafka Streams处理器
Spring Cloud Stream Kafka Streams 绑定器提供了一个抽象功能,称为StreamsBuilderFactoryManager在StreamsBuilderFactoryBean来自 Spring for Apache Kafka。该管理器 API 用于控制StreamsBuilderFactoryBean在基于Binder的应用中,每个处理器都会产生不同的影响。因此,使用Binder时,如果你想手动控制各种自动启动StreamsBuilderFactoryBean应用中的对象,你需要使用StreamsBuilderFactoryManager. 你可以使用这块地产Spring.kafka.streams.auto-startup并将此设为false为了关闭处理器的自动启动。然后,在应用程序中,你可以用以下方法启动处理器,使用StreamsBuilderFactoryManager.
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
当你希望应用程序在主线程启动,并让Kafka Streams处理器单独启动时,这个功能非常有用。例如,当你需要恢复一个大型状态存储时,如果处理器正常启动(默认情况),这可能会阻止你的应用程序启动。如果你使用某种活检定机制(例如在Kubernetes上),它可能会认为应用程序宕机并尝试重启。为了纠正这个问题,你可以设置Spring.kafka.streams.auto-startup自false并按照上面的方法进行。
请记住,使用Spring Cloud Stream绑定器时,你并不是直接处理StreamsBuilderFactoryBean更像是《阿帕奇·卡夫卡的Spring》StreamsBuilderFactoryManager,作为StreamsBuilderFactoryBean对象由绑定器内部管理。
3.22. 手动选择性启动Kafka Streams处理器
而上述方法则无条件适用自动启动false通过应用程序中的所有Kafka Streams处理器StreamsBuilderFactoryManager通常希望只有单独选定的Kafka Streams处理器不自动启动。例如,假设你的应用程序中有三个不同的功能(处理器),其中一个处理器你不想在启动应用时启动它。这里有一个这样的例子。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述情景中,如果你设定Spring.kafka.streams.auto-startup自false那么,应用程序启动时没有处理器会自动启动。
在这种情况下,你必须按照上面描述的方式,通过调用来启动它们开始()关于标的StreamsBuilderFactoryManager.
但是,如果我们有选择性地禁用一个处理器的用例,那么你必须设置自动启动在该处理器的单独绑定上。
假设我们不想要过程3自动启动功能。
这是一艘双功能带有两个输入绑定 -process3-in-0和process3-in-1.
为了避免该处理器自动启动,你可以选择这些输入绑定并设置自动启动在他们身上。
无论你选择哪种绑定;如果你愿意,可以设置自动启动自false两个人都来,但一个就足够了。
因为它们共用同一个出厂豆,所以你不必在两个绑定上都把自动启动设置为假,但为了清晰起见,这样做可能更合理。
这里有 Spring Cloud Stream 属性,可以用来禁用该处理器的自动启动功能。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然后,你可以手动启动处理器,要么用 REST 端点,要么使用Bindings终点如下所示的API。
为此,你需要确保 Spring Boot 执行器依赖于类路径。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
关于该机制,请参阅参考文献中的这一部分。
通过禁用绑定来控制绑定自动启动如本节所述,请注意,这仅适用于消费者装订。
换句话说,如果你使用生产者绑定,过程3出局0,虽然该生产者绑定使用相同的功能,但对关闭处理器自动启动没有任何影响StreamsBuilderFactoryBean作为消费者装订。 |
3.23. 使用春云侦探进行追踪
当 Spring Cloud Sleuth 位于基于 Spring Cloud Stream Kafka Streams 的绑定器应用的类路径上时,其消费者和生产者都会自动获得追踪信息的监测工具。
然而,为了追踪任何应用特定的作,这些作需要被用户代码显式进行表化。
这可以通过注入KafkaStreams追踪在应用程序中调用 Spring Cloud Sleuth 的 bean,然后通过这个注入的 bean 调用各种 Kafka Streams作。
以下是一些使用该法的例子。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
在上面的例子中,有两个地方它添加了显式追踪仪表。
首先,我们记录来自输入的密钥/值信息KStream.
当这些信息被记录时,相关的跨度和轨距ID也会被记录,以便监控系统跟踪并与相同的跨度ID进行关联。
其次,当我们调用地图作,而不是直接调用KStream类,我们把它包裹在一个变换作,然后调用地图从KafkaStreams追踪.
在这种情况下,日志消息也会包含span ID和trace ID。
这里还有一个例子,我们使用低级变换器 API 访问各种 Kafka Streams 头部。 当 spring-cloud-sleuth 在类路径上时,所有追踪头部也可以这样访问。
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}
3.24. 配置选项
本节包含Kafka Streams绑定器中使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档。
3.24.1. Kafka Streams Binder Properties(卡夫卡流 Binder Properties)
以下属性在活页夹层面可用,且必须以Spring.cloud.stream.kafka.streams.binder.任何在 Kafka Streams 活页夹中重复使用的 Kafka 提供的属性都必须在前缀spring.cloud.stream.kafka.streams.binder而不是Spring.cloud.stream.kafka.binder.
唯一的例外是定义 Kafka bootstrap 服务器属性时,任一前缀都适用。
- 配置
-
映射中包含与 Apache Kafka Streams API 相关的属性的键值对。 该性质必须以
Spring.cloud.stream.kafka.streams.binder.. 以下是使用该性质的一些例子。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
关于所有可能涉及流配置的属性的更多信息,请参见StreamsConfigJavaDocs in Apache Kafka Streams docs.
所有你可以从中设置的配置StreamsConfig可以通过这个设置。
使用该性质时,适用于整个应用程序,因为它是绑定器级别的性质。
如果你在应用中拥有多个处理器,所有处理器都会获得这些特性。
在以下性质的情况下application.id这将成为问题,因此你必须仔细检查从中获得的性质StreamsConfig是用这个绑定器层级映射的配置财产。
- functions.<function-bean-name>.applicationId
-
仅适用于功能性型处理器。 这可以用来设置应用程序中的每个函数的应用ID。 对于多函数,这是一种方便设置应用ID的方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能性型处理器。 映射中包含与 Apache Kafka Streams API 相关的属性的键值对。 这与束缚剂级别类似
配置上述描述的性质,但该层级配置性质仅对命名函数受限。 当你有多个处理器,并且想根据特定功能限制对配置的访问时,你可能会想用这个方法。 都StreamsConfig这里可以使用属性。 - 经纪人
-
经纪商网址
违约:
本地主持 - zkNodes
-
Zookeeper 网址
违约:
本地主持 - deserializationExceptionHandler
-
反序列化错误处理类型。 该处理程序在绑定器层面应用,因此针对应用中所有输入绑定。 在消费者绑定层面,有一种更细致的方法来控制它。 可能的数值有——
logAndContinue,logAndFail,跳过并继续或sendToDlq违约:
logAndFail - applicationID
-
方便地在装订器层面为Kafka Streams应用设置全球的 Books(卡夫卡流)应用 application.id。 如果应用程序包含多个功能,那么应用 ID 应设置为不同设置。 详见上文,详细讨论了设置应用ID。
默认情况下:应用程序将生成一个静态应用ID。详情请参见申请ID部分。
- stateStoreRetry.maxAttempts
-
Max试图连接州商店。
默认值:1
- stateStoreRetry.backoffPeriod
-
尝试重试连接州商店时,需要有一段时间的退后期。
默认:1000毫秒
- consumerProperties
-
在装订者层面的任意消费者属性。
- 制作人属性
-
在粘合剂层面,生产者的属性是任意的。
- includeStoppedProcessorsForHealthCheck
-
当处理器绑定通过执行器停止时,该处理器默认不会参与健康检查。 将此属性设置为
true为所有处理器(包括目前通过绑定执行器终端被阻止的处理器)启用健康检查。默认:false
3.24.2. 卡夫卡流 制片人
以下属性仅供Kafka Streams制作者使用,且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.为了方便起见,如果有多个输出绑定且它们都需要一个共同值,可以通过使用前缀配置spring.cloud.stream.kafka.streams.default.producer..
- keySerde
-
Key Serde to use
默认:参见上面关于消息去序列化的讨论
- valueSerde
-
value serde to use
默认:参见上面关于消息去序列化的讨论
- useNative编码
-
用于启用/禁用本地编码的标志
违约:
true. - streamPartitionerBeanName
-
用户可自定义的出站分区豆名。 应用程序可以提供定制服务
StreamPartitioner作为春季豆,且该豆名可以提供给生产者,代替默认名称。默认:请参见上面关于出站分区支持的讨论。
- 制作作品
-
处理器生产的汇组件的自定义名称。
Deafult:
没有(由Kafka Streams生成)
3.24.3. 卡夫卡流媒体消费品
以下属性供Kafka Streams用户使用,且必须以前缀加spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.为了方便起见,如果有多个输入绑定且都需要一个共同值,可以通过使用前缀进行配置spring.cloud.stream.kafka.streams.default.consumer。.
- applicationID
-
设置每个输入绑定的 application.id。
默认:见上文。
- keySerde
-
Key Serde to use
默认:参见上面关于消息去序列化的讨论
- valueSerde
-
value serde to use
默认:参见上面关于消息去序列化的讨论
- 实现为
-
在使用入站KTable类型时实现状态存储
违约:
没有. - useNativeDecoding
-
启用/禁用原生解码的标志
违约:
true. - dlqName
-
DLQ主题名称。
默认:请参见上文关于错误处理和DLQ的讨论。
- startOffset
-
如果没有承诺的可消费的偏移量,则从起始点。 这通常用于消费者首次阅读某个主题时。 卡夫卡流的用途
最早作为默认策略,绑定者也使用相同的默认策略。 此值可覆盖为最近的利用该性质。违约:
最早.
注意:使用resetOffsets对消费者来说,这对Kafka Streams的活页夹没有任何影响。
与基于消息通道的活页夹不同,Kafka Streams 的活页夹不寻求按需开始或结束。
- deserializationExceptionHandler
-
反序列化错误处理类型。 该处理程序是按消费者绑定应用的,而非之前描述的绑定器级别属性。 可能的数值有——
logAndContinue,logAndFail,跳过并继续或sendToDlq违约:
logAndFail - 时间戳提取器BeanName。
-
消费者需使用特定的时间戳提取豆名称。 应用程序可以提供
时间戳提取器作为春豆,且该豆子的名称可以提供给消费者,以代替默认名称。默认情况下:请参见上面关于时间戳提取器的讨论。
- 事件类型
-
逗号分隔了该绑定支持的事件类型列表。
违约:
没有 - eventTypeHeaderKey
-
通过此绑定,每个入站记录的事件类型头键。
违约:
event_type - 被消费的As
-
为处理器从源组件自定义名称。
Deafult:
没有(由Kafka Streams生成)
3.24.4. 关于并发的特别说明
在 Kafka Streams 中,你可以控制处理器通过数字.stream.threads财产。
你可以利用各种方法来实现配置上述选项分别在文件夹、功能、生产者或消费者层面描述。
你也可以使用并发这是核心 Spring Cloud Stream 为此目的提供的属性。
使用时,你需要对消费者使用。
当你有多个输入绑定时,把它设置在第一个输入绑定上。
例如,设置时Spring.cloud.stream.bindings.process-in-0.consumer.concurrency,翻译为数字.stream.threads在活页夹旁边。
如果你有多个处理器,其中一个处理器定义了绑定级别并发,而其他处理器没有,那么那些没有绑定级别并发的处理器将默认回到通过以下方式指定的绑定器范围属性spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads.
如果无法使用该绑定器配置,应用程序将使用 Kafka Streams 的默认设置。
4. 技巧、窍门和Recipes
4.1. 与卡夫卡的简单DLQ
4.1.1. 问题陈述
作为开发者,我想写一个消费者应用程序来处理来自卡夫卡主题的记录。 不过,如果处理过程中出现错误,我不希望应用程序完全停止。 相反,我想把错误记录发送到DLT(死信主题),然后继续处理新的记录。
4.1.2. 解决方案
解决这个问题的方法是使用 Spring Cloud Stream 中的 DLQ 功能。 为了讨论,我们假设以下是处理器函数。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常简单的函数,会对处理的所有记录抛出异常,但你可以将这个函数扩展到其他类似情况。
为了将错误记录发送到DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
为了激活DLQ,应用程序必须提供组名。
匿名消费者无法使用DLQ设施。
我们还需要通过设置enableDLQ卡夫卡消费者的属性绑定于true. 最后,我们可以通过提供dlqName在 Kafka 消费者绑定上,否则默认为error.input-topic.my-group在这种情况下。
注意,在上述示例消费者中,有效载荷类型为字节[]. 默认情况下,Kafka binder 中的 DLQ 生成器期望 为 的有效载荷字节[]. 如果不是这样,那么我们需要提供合适的串行器配置。例如,我们将消费者函数重写如下:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉 Spring Cloud Stream,写入 DLT 时我们想如何序列化数据。以下是该场景的修改配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
4.2. 带高级重试选项的DLQ
4.2.1. 问题陈述
这和上面的方案类似,但作为开发者,我想配置重试处理方式。
4.2.2. 解决方案
如果你按照上面的配方作,那么当处理遇到错误时,卡夫卡绑定器里会内置默认的重试选项。
默认情况下,绑定器最多退赛3次,初始延迟1秒,每次后退时2.0倍数,最大延迟10秒。你可以像下面那样更改所有这些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果你愿意,也可以通过提供布尔值的映射来提供可重试的异常列表。 例如
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,地图中未列出的任何例外都会被重试。如果不希望如此,你可以通过以下条件禁用,
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
你也可以提供自己的重试模板并标记为@StreamRetryTemplate这些信息会被扫描并被活页夹使用。当你需要更复杂的重试策略和策略时,这非常有用。
如果你有多重@StreamRetryTemplate然后你可以通过以下属性指定绑定想要的 Beans,
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
4.3. 处理使用DLQ的反序列化错误
4.3.1. 问题陈述
我有一个处理器在 Kafka Consumer 中遇到了反序列化异常。我本以为 Spring Cloud Stream DLQ 机制会捕捉到这种情况,但它没有。我该如何处理这个问题?
4.3.2. 解决方案
Spring Cloud Stream提供的正常DLQ机制在Kafka用户抛出不可恢复的反序列化异常时无法解决问题。这是因为,该异常甚至在消费者poll()方法返回。Spring for Apache Kafka 项目提供了一些很好的方法来帮助绑定器处理这种情况。让我们来探讨这些方法。
假设这是我们的函数:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个平凡函数,具有字符串参数。
我们想绕过Spring Cloud Stream提供的消息转换器,改用原生反串化器。在字符串类型方面,这不太合理,但对于像AVRO等更复杂的类型,你必须依赖外部解串器,因此需要委托转换到Kafka。
现在当消费者收到数据时,假设存在一个错误记录导致反序列化错误,可能有人传递了整数而不是字符串例如。 在这种情况下,如果你在应用里没有做某件事,异常会在链条中传播,最终你的应用会退出。
为了处理这个问题,你可以添加一个ListenerContainerCustomizer @Bean该配置为默认错误处理. 这默认错误处理配置为死信出版恢复者. 我们还需要配置一个ErrorHandlingDeserializer对消费者来说。这听起来很复杂,但实际上,归结为这三颗豆子。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们逐一分析。第一个是ListenerContainerCustomizer豆子需要一个默认错误处理. 容器现在已经用该错误处理程序进行了定制。你可以在这里了解更多关于容器自定义的信息。
第二颗豆子是默认错误处理该配置为发布到双重学习技术. 更多详情请见此处默认错误处理.
第三颗豆子是死信出版恢复者最终负责发送给双重学习技术. 默认情况下,双重学习技术主题被命名为ORIGINAL_TOPIC_NAME。DLT。不过你可以更改这个名。详情请参见文档。
我们还需要通过应用配置配置ErrorHandlingDe串行器。
这ErrorHandlingDeserializer授权到实际的反串化器。
如果出现错误,它会将记录的键/值设置为空,并包含消息的原始字节。
然后它在一个头部中设置异常,并将该记录传递给监听器,监听器再调用注册的错误处理程序。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我们正在提供ErrorHandlingDeserializer通过配置绑定上的属性。
我们还表示实际要委派的反串化器是字符串解串器.
请记住,上述DLQ属性与本配方讨论无关。 它们纯粹用于解决任何应用层面的错误。
4.4. Kafka 装订器中的基本偏移管理
4.4.1. 问题陈述
我想写一个 Spring Cloud Stream 的 Kafka 消费者应用,但不确定它是如何管理 Kafka 消费者的偏移量的。 你能解释一下吗?
4.4.2. 解决方案
我们鼓励你阅读相关文档部分,以全面理解。
大致内容如下:
Kafka 默认支持两种初始偏移类型——最早和最近的.
它们的语义从名字中就能理解。
假设你是第一次使用消费者。
如果你在Spring Cloud Stream申请中错过了 group.id,那么它就会变成匿名消费者。
无论哪里,只要你遇到匿名用户,Spring Cloud Stream 应用默认会从最近的主题分区中的可用偏移量。
另一方面,如果你明确指定了 group.id,那么默认情况下,Spring Cloud Stream 应用会从最早主题分区中的可用偏移量。
在上述两种情况(具有显式组和匿名组的消费者)中,起始偏移量可以通过使用以下属性进行交换spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset并设置为最早或最近的.
现在,假设你之前已经运行过消费者,现在重新开始。
在这种情况下,上述起始偏移语义不适用,因为消费者已经为消费者群体找到了已承诺的偏移量(对于匿名消费者,尽管应用程序未提供 group.id,活页夹会自动为你生成一个)。
它只是从上一个提交的偏移量开始。
即使你有startOffset价值被提供。
不过,你可以通过使用resetOffsets财产。
为此,设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets自true(即false默认情况下)。
然后确保你提供startOffset值(或最早或最近的).
当你这样做后再启动消费者应用程序时,每次启动时,它都像第一次启动一样开始,忽略分区的任何已提交的偏移量。
4.5. 寻求卡夫卡中的任意抵消
4.5.1. 问题陈述
用Kafka活页夹,我知道它可以把偏移设置成以下一种最早或最近的但我有个要求,必须寻找中间的某个偏移量,一个任意的偏移量。
有没有办法用Spring Cloud Stream Kafka活页夹实现这个功能?
4.5.2. 解决方案
之前我们看到Kafka活页夹如何帮助你处理基础的胶置管理。 默认情况下,活页夹不允许你倒带到任意偏移,至少通过我们在那个配方中看到的机制是这样。 然而,活页夹提供了一些低层策略来实现这一用例。 让我们来探索一下。
首先,当你想重置到任意偏移量时,除了最早或最近的,确保离开resetOffsets配置到其默认值,即false.
然后你必须提供一种定制的咖啡豆KafkaBindingRebalanceListener,将注入所有消费者绑定。
这是一个带有几种默认方法的界面,但我们感兴趣的是:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们来看看细节。
本质上,该方法将在初始任务分配时或重新平衡后调用。为更好地说明,假设我们的主题为福它有4个分区。最初,我们只在组中启动一个消费者,这个消费者会从所有分区中消耗。当消费者首次启动时,所有4个分区都会被初始分配。然而,我们不希望从默认值开始分区消费(最早由于我们定义了一个群),而是对于每个分区,我们希望它们在寻找任意偏移量后消耗。假设你有一个商业案例,需要从某些偏移量中消耗,如下所示。
Partition start offset
0 1000
1 2000
2 2000
3 1000
这可以通过实现上述方法实现。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个初步的实现。现实中的用例远比这复杂得多,你需要相应调整,但这确实给了你一个基本的草图。当消费者寻求如果失败,可能会触发运行时异常,你需要决定在这种情况下该怎么做。
4.5.3. 如果我们用相同的组ID启动第二个消费者呢?
当我们添加第二个消费者时,会发生重新平衡,一些分区会被移动。假设新的消费者获得了分区2和3. 当这个新的Spring Cloud Stream用户称呼时onPartitionsAssigned方法,它会看到这是划分的初始赋值2和3对该消费者。因此,它会执行寻道作,因为对初论点。 对于第一个消费者,它现在只有分区0和1然而,对于该消费者来说,这只是一次再平衡事件,并未被视为初始赋值。因此,由于对初论点。
4.6. 我如何手动确认使用 Kafka 活页夹?
4.6.1. 问题陈述
使用 Kafka Binder,我想手动在我的消费者中确认消息。我该怎么做?
4.6.2. 解决方案
默认情况下,Kafka 绑定器会委派给 Spring for Apache Kafka 项目中的默认提交设置。默认ack模式Spring的卡夫卡是Batch. 更多详情请见这里。
有些情况下你想禁用默认提交行为,依赖手动提交。按照步骤作就可以实现这一点。
设置属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode无论哪一手动或MANUAL_IMMEDIATE. 当它被这样设置时,会有一个叫做kafka_acknowledgment(摘自KafkaHeaders.致谢)存在于消费者方法接收到的消息中。
例如,把它想象成你的消费者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后你设置属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode自手动或MANUAL_IMMEDIATE.
4.7. 我如何覆盖 Spring Cloud Stream 中的默认绑定名称?
4.7.1. 问题陈述
Spring Cloud Stream 会根据函数定义和签名创建默认绑定,但我该如何将这些绑定覆盖到更适合域名的名称?
4.7.2. 解决方案
假设以下是你的函数签名。
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream会像下面那样创建绑定。
-
0大写
-
大写输出0
你可以通过以下属性覆盖这些绑定。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
之后,必须对新名称施加所有绑定性质,我的变形金刚和我的转换器输出.
这里还有另一个例子,使用Kafka Streams和多输入。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 会为该函数创建三个不同的绑定名称。
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
每次想为这些绑定设置时,都必须使用这些绑定名称。你不喜欢这样,你希望使用更适合域且易读的绑定名称,比如这样的。
-
订单
-
帐户
-
丰富勋章
你只需设置这三个属性即可轻松实现
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
一旦你这样做,它就会覆盖默认的绑定名,而你想设置的所有属性都必须放在这些新的绑定名上。
4.8. 我如何将消息密钥作为记录的一部分发送?
4.8.1. 问题陈述
我需要把密钥和记录的有效载荷一起发送,有没有办法在Spring Cloud Stream里做到这一点?
4.8.2. 解决方案
通常你需要发送关联数据结构,比如映射作为带有键和值的记录。 《春云流》让你以一种直接的方式实现这一点。 以下是实现这一点的基本蓝图,但你可能需要根据你的具体用例进行调整。
这里是采样生产者法(也称为提供商).
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
这是一个平凡函数,发送带有字符串有效载荷,但也有钥匙。
注意,我们将密钥设置为消息头部,使用KafkaHeaders.MESSAGE_KEY.
如果你想把密钥从默认更改kafka_messageKey那么在配置中,我们需要指定以下性质:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称提供商出局0既然这是我们的功能名称,请相应更新。
然后,我们在生成消息时使用这个新密钥。
4.9. 我如何使用原生串行器和反串行器来代替 Spring Cloud Stream 进行的消息转换?
4.9.1. 问题陈述
我想在 Kafka 中使用原生的串行器和解串器,而不是使用 Spring Cloud Stream 里的消息转换器。 默认情况下,Spring Cloud Stream 通过其内置的消息转换器来完成转换。 我怎样才能绕过这个,把责任交给卡夫卡?
4.9.2. 解决方案
这真的很简单。
你只需提供以下属性即可启用本地序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,你还需要设置串行器。 有几种方法可以做到这一点。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或者使用活页夹配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用束缚器时,它会对所有绑定施加,而在绑定处设置绑定则是每个绑定。
在反串出方面,你只需要提供解串器作为配置。
例如
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
你也可以把他们设定在活页夹层面。
有一个可选属性可以强制本地解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
然而,对于 Kafka 装帧器来说,这并非必要,因为当它到达装帧器时,Kafka 已经用配置好的反串化器将其反序列化。
4.10. 解释 Kafka Streams 绑定器中偏移重置的工作原理
4.10.1. 问题陈述
默认情况下,Kafka Streams 的活页夹总是从新用户最早的偏移开始。 有时,申请要求从最新的偏移量开始是有益的或必须的。 Kafka Streams 的活页夹可以让你做到这一点。
4.10.2. 解决方案
在我们看解决方案之前,先看看以下情景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个双消费者需要两个输入绑定的豆子。
在这种情况下,第一个绑定是KStream第二个是KTable(英国可爱的)音乐.
首次运行该应用程序时,默认情况下,两个绑定都从最早抵消。
那我想从这个开始呢?最近的是因为某些要求而抵消的吗?
你可以通过启用以下属性来实现这一点。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果你只想从一个绑定开始,最近的偏移量,另一个则从默认值转给消费者最早然后将后者从配置中剔除。
请记住,一旦有承诺的偏移量,这些设置就不会被尊重,已承诺的偏移优先。
4.11. 记录卡夫卡中成功发送(制作)记录的情况
4.11.1. 问题陈述
我有一个Kafka制作人申请,想记录我所有成功的发送记录。
4.11.2. 解决方案
假设申请中有以下提供商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的消息频道BEAN 用来捕捉所有成功发送的信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用配置中定义该属性,以提供记录元数据通道.
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,成功发送的信息将发送给fooRecord频道.
你可以写一个集成流程如下所示,您可以查看相关信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在处理方法,payload 是发送到 Kafka 的,消息头部包含一个名为kafka_recordMetadata.
其价值为记录元数据其中包含主题分区、当前偏移等信息。
4.12. 在 Kafka 中添加自定义头部映射器
4.12.1. 问题陈述
我有一个 Kafka producer 应用程序可以设置一些头部,但在消费者应用中缺少这些头部。为什么?
4.12.2. 解决方案
在正常情况下,这应该没问题。
想象一下,你有以下的制作人。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者端,你仍然应该看到“foo”这个头,以下内容应该不会有问题。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果你在应用中提供了自定义的头部映射器,那就无法实现。
假设你有一个空的KafkaHeaderMapper在申请表里。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果你的实现方式是这样的,那你就会错过福消费者的头部。
很可能你在这些里面有一些逻辑KafkaHeaderMapper方法。
你需要以下信息来填充福页眉。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这样才能正确填充福生产者向消费者的标题。
4.12.3. 关于 id 头部的特别说明
在春云溪中,身份证Header 是一个特殊的 header,但有些应用可能希望有特殊的自定义 ID header——类似这样的自定义ID或身份证或身份.
第一个(自定义ID)将无需任何自定义的头部映射器从生产者传播到消费者。
然而,如果你用框架的一个变体来生产,保留了身份证首部 - 例如身份证,身份,iD等等,你会遇到框架内部的问题。
想了解更多关于这个用例的背景,可以参考这个 StackOverflow 帖子。
那你必须用自定义软件KafkaHeaderMapper映射大小写区分ID头部。
例如,假设你有以下生产者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
头部身份以上内容将从消费端消失,因为它与框架冲突身份证页眉。
你可以提供定制KafkaHeaderMapper解决这个问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
这样做,两者都得身份证和身份头部将由生产商提供给消费者端。
4.13. 在交易中对多个主题进行产出
4.13.2. 解决方案
在 Kafka Binder 中使用事务支持进行交易,然后提供AfterRollback处理器.
为了生成多个主题,请使用流桥应用程序接口。
以下是该代码的摘要:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
4.13.3. 必要配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
为了测试,你可以使用以下工具:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要说明:
请确保您在应用配置中没有任何DLQ设置,因为我们会手动配置DLT(默认情况下,DLT会发布到一个名为输入。双重学习技术基于初始消费者函数)。
另外,重置最大尝试次数关于消费者绑定到1以避免活页夹重试。
在上述示例中,最多尝试3次(初始尝试+2次尝试固定后退).
关于如何测试这段代码,请参见 StackOverflow 讨论串。
如果你用 Spring Cloud Stream 测试,增加更多消费者函数,务必设置隔离层级在消费者绑定到已读提交.
这个StackOverflow帖子也与此讨论相关。
4.14. 运行多个可投票消费者时应避免的陷阱
4.14.1. 问题陈述
我如何运行多个可轮询的消费者实例并生成唯一client.id每一次?
4.14.2. 解决方案
假设我有以下定义:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
运行应用时,Kafka的消费者会生成一个 client.id(类似消费者-我的组-1).
对于每个正在运行的应用程序实例,以下client.id结果会一样,导致意想不到的问题。
为了解决这个问题,你可以在每个应用程序实例上添加以下属性:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
详情请参见GitHub相关问题。