|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
配置选项
本节包含Apache Kafka绑定器所使用的配置选项。
关于绑定器的常见配置选项和属性,请参见核心文档中的绑定属性。
卡夫卡活页夹属性
- spring.cloud.stream.kafka.binder.brokers
-
卡夫卡装订器连接的经纪人列表。
违约:
本地主持. - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
经纪人允许指定带有或不包含端口信息的主机(例如,主持1,主持2:端口2). 当经纪商列表中未配置端口时,该程序设置默认端口。违约:
9092. - Spring.cloud.stream.kafka.binder.configuration
-
客户端属性(生产者和消费者)的键值映射传递给所有由绑定器创建的客户端。 由于这些属性被生产者和消费者共同使用,使用应限制在常见属性上——例如安全设置。 通过该配置提供的未知Kafka生产者或消费者属性会被过滤掉,不允许传播。 这里的属性优先于boot中设置的任何属性。
默认:空白地图。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端消费者属性的关键/值映射。 除了支持已知的卡夫卡消费者属性外,这里也允许使用未知的消费者属性。 这里的属性优先于boot中设置的所有属性,并且在
配置上方的财产。默认:空白地图。
- spring.cloud.stream.kafka.binder.headers
-
由活页夹传输的自定义头部列表。 仅在与较旧的应用程序(⇐ 1.3.x)通信时才需要
卡夫卡客户端版本< 0.11.0.0。新版本原生支持头部。默认:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
等待获取分区信息的时间只需几秒。 如果计时器到期,健康状况显示下降。
默认值:60。
- spring.cloud.stream.kafka.binder.requiredAcks
-
经纪人要求的按键数。 请参见卡夫卡关于制片人的文档
补丁财产。违约:
1. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在
自动创建主题或自动添加分区已设定。 装订器在其产生或消耗数据的主题上配置的全局最小分区数。 它可以被partitionCount由生产者设定,或以实例计数 * 并发制作者的设置(如果其中一个更大)。违约:
1. - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键值映射。 除了支持已知的卡夫卡制片人作品外,这里也允许使用未知制作人的作品。 这里的属性优先于boot中设置的所有属性,并且在
配置上方的财产。默认:空白地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
自动创建主题的复制因子如果
自动创建主题活跃。 每个绑定都可以被覆盖。如果你使用的是 Kafka 2.4 之前的 Broker 版本,那么这个值至少应该设置为 1. 从3.0.8版本开始,绑定器使用-1作为默认值,这表明代理的“default.replication.factor”属性将用于确定副本数量。 请咨询你的Kafka经纪商管理员,看看是否有政策要求最低复制因子,如果是这样,通常default.replication.factor将匹配该值,且-1应该使用,除非你需要比最小值更高的复制因子。违约:
-1. - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true,活页夹会自动创建新的主题。 如果设置为false,活页夹依赖于主题已经配置好。 在后一种情况下,如果不存在这些主题,活页夹将无法开始。该设置与 auto.create.topics.enable经纪人设定,不会影响其设置。 如果服务器设置为自动创建主题,这些主题可能会作为元数据检索请求的一部分创建,且默认为代理设置。违约:
true. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true,如果需要,绑页器会创建新的分区。 如果设置为false,该绑定器依赖于已配置的主题分区大小。 如果目标主题的分区计数小于预期值,绑定器将无法启动。违约:
false. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
使活页夹中的交易能够实现。看
transaction.id在卡夫卡文献和会议记录中春-卡夫卡文档。 当交易被启用时,个人制作人性质被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*性能。默认值
零(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
交易性活页夹中的全局生产者属性。 看
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix以及卡夫卡制片人和所有装订商支持的综合制片人产。默认:查看各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
豆子的名称
KafkaHeaderMapper用于制图春季消息卡夫卡头部的标题和字幕之间。 例如,如果你想自定义受信任的包,可以使用这个BinderHeaderMapper使用JSON反序列化来生成头部的豆子。 如果这是习俗BinderHeaderMapper使用该属性时,绑订器不会提供 Bean 的可用性,然后 Binder 会寻找带有名称的头部映射 BeankafkaBinderHeaderMapper该类型为BinderHeaderMapper然后又回到默认状态BinderHeaderMapper由活页夹创造。默认:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
用标志将活页夹的健康值设置为
下当该主题上的任何分区,无论接收数据的消费者如何,都没有引导者。违约:
true. - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当信任存储或密钥存储证书位置作为非本地文件系统资源(由 org.springframework.core.io.Resource 支持的资源,如 CLASSPATH、HTTP 等)提供时, 绑定器将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统中的某个位置。 这对两个经纪人级证书都适用(
SSL.truststore.location和ssl.keystore.location)以及用于模式注册表的证书(schema.registry.ssl.truststore.location和schema.registry.ssl.keystore.location). 请记住,truststore和keystore的位置路径必须在以下以下提供。spring.cloud.stream.kafka.binder.configuration.... 例如spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location,Spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。 文件将被复制到指定为该属性值的位置,该位置必须是文件系统中已有的目录,且该目录可被运行该应用程序的进程写入。 如果该值未被设置且证书文件是非本地文件系统资源,则将被复制到系统临时目录,返回如下System.getProperty(“java.io.tmpdir”). 如果该值存在,但该目录在文件系统中找不到或不可写,也同样适用。默认:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
当设置为true时,每个消费者主题的偏移滞后度量在访问该指标时计算出来。 当设置为false时,只使用周期计算的偏移延迟。
默认:真
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个消费者主题的偏移滞后的时间间隔。 该值用于
metrics.defaultOffsetLagMetricsEnabled是被禁用的,或者它的 计算太慢了。默认时间:60秒
- spring.cloud.stream.kafka.binder.enableObservation
-
启用该绑定器中所有绑定的微米观察登记。
默认:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator元数据消费者group.id. 该消费者被以下用户使用健康指标查询所使用主题的元数据。默认:无。
卡夫卡消费地产
以下属性仅供卡夫卡用户使用,且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer..
为避免重复,Spring Cloud Stream 支持为所有频道设置数值,格式为spring.cloud.stream.kafka.default.consumer.<property>=<value>. |
- admin.configuration
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.properties,未来版本中将取消对该支持。 - admin.replicas-assignment
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.replicas-assignment,未来版本中将取消对该支持。 - admin.replication-factor
-
自2.1.1版本起,该属性被弃用,取而代之的是
主题。复制因子,未来版本中将取消对该支持。 - 自动重新平衡启用
-
什么时候
true话题分区会自动在消费者组成员之间重新平衡。 什么时候false,每个消费者会被分配一组固定的分区,基于spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex. 这需要spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex每个启动实例都必须正确设置属性。 该spring.cloud.stream.instanceCount在这种情况下,财产通常必须大于1。违约:
true. - AckEach记录
-
什么时候
autoCommitOffset是true该设置决定了每条记录处理后是否提交偏移量。 默认情况下,偏移量在返回的记录批次中的所有记录之后提交consumer.poll()已经处理完毕。 轮询返回的记录数量可以通过以下方式控制Max.poll.recordsKafka 属性,由消费者设定配置财产。 将此设置为true这可能导致性能下降,但这会降低故障发生时记录被重新交付的可能性。 另外,看看活页夹requiredAcks属性,也影响提交偏移量的性能。 该特性自3.1版本起被弃用,改为使用ack模式. 如果ack模式未设置且批处理模式未启用,记录将使用ackMode。违约:
false. - autoCommitOffset
-
从3.1版本开始,该属性被弃用。 看
ack模式关于替代方案的更多细节。 是否在消息处理完毕后自动提交偏移量。 如果设置为false,一个带有密钥的头部kafka_acknowledgment该类型org.springframework.kafka.support.Acknowledgedment入站消息中包含了 头部。 应用程序可以使用该头来确认消息。 详情请参见示例部分。 当该属性被设置为false,Kafka 绑定器将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL应用程序负责确认记录。 另见AckEach记录.违约:
true. - ack模式
-
指定容器的 ack 模式。 这基于 Spring Kafka 定义的 AckMode 枚举。 如果
AckEach记录属性设置为true且消费者未处于批处理模式,则将使用ACK模式记录否则,使用该属性提供的 ack 模式。 - autoCommitOnError
-
在可轮询的消费者中,如果设置为
true,它总是自动提交错误。 如果未设置(默认)或为假,则不会在可轮询的消费者中自动提交。 请注意,这一特性仅适用于可投票消费者。默认:未设置。
- resetOffsets
-
是否将消费者的偏移量重置为startOffset提供的值。 如果 是
KafkaBindingRebalanceListener提供;参见rebalance lister,参见reset-offsets以了解更多关于该属性的信息。违约:
false. - startOffset
-
新组的起始偏移。 允许的数值:
最早和最近的. 如果消费者组被明确设置为消费者“绑定”(通过spring.cloud.stream.bindings.<channelName>.group),'startOffset' 被设置为最早.否则,它被设置为最近的对于匿名消费者集团。 有关该性质的更多信息,请参见重置偏移量。默认值:空(等价于
最早). - enableDlq
-
当设置为true时,它能为消费者实现DLQ行为。 默认情况下,导致错误的消息会被转发到名为
error.<destination>.<group>. DLQ主题名称可以通过设置dlqName性质或定义@Bean类型DlqDestinationResolver. 这为错误数量较少且重放整个原始主题过于繁琐的情况提供了替代卡夫卡回放场景的替代选择。 更多信息请参见Kafka DLQ处理。 从2.0版本开始,发送到DLQ主题的消息增加了以下头部:x-原始主题,x-异常-消息和x-exception-栈追踪如字节[]. 默认情况下,失败记录会被发送到与原始记录相同的分区号。 关于如何改变该行为,请参见 dlq 分区选择。不允许目的地是模式是true.违约:
false. - dlqPartitions
-
什么时候
enableDlq成立且该属性未被设置,因此创建了一个与主主题相同分区数的死字母主题。 通常,死信记录会发送到与原始记录相同的死信主题分区。 这种行为是可以改变的;参见DLQ划分选择。 如果该性质被设置为1而且没有DqlPartitionFunctionBean,所有死信记录都会写入分区0. 如果该性质大于1你必须提供一个DlqPartitionFunction豆。 注意实际分区计数受绑定器最小分区计数财产。违约:
没有 - 配置
-
映射为包含通用Kafka消费者属性的键值对。 除了具有 Kafka 的消费者属性外,还可以传递其他配置属性。 例如,应用程序需要的一些属性,例如
Spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar. 这bootstrap.servers这里不能设置属性;如果需要连接多个集群,使用多绑定器支持。默认:空白地图。
- dlqName
-
DLQ主题名称用于接收错误信息。
默认:空(如果未指定,导致错误的消息会转发到名为
error.<destination>.<group>). - dlqProducerProperties(开发者属性)
-
利用此,可以设置DLQ专属的生产者属性。 通过Kafka制作者属性可以设置所有属性。 当消费者启用原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供对应的键值串行器。 这必须以以下形式提供
dlqProducerProperties.configuration.key.serializer和dlqProducerProperties.configuration.value.serializer.默认:默认Kafka制作人属性。
- 标准头部
-
表示入站通道适配器填充的标准头部。 允许的数值:
没有,身份证,时间戳或双. 如果使用原生反序列化,且第一个接收消息的组件需要身份证(例如配置为使用 JDBC 消息存储的聚合器)。违约:
没有 - converterBeanName
-
一种实现
记录消息转换器.用于入站通道适配器,以替代默认接口消息信息转换器.违约:
零 - idleEventInterval
-
以毫秒为单位,表示最近未收到任何消息的事件间隔。 使用一个
ApplicationListener<ListenerContainerIdleEvent>接收这些事件。 请参见暂停-恢复的使用示例。违约:
30000 - 目的地是模式
-
当为真时,目标被视为正则表达式
模式经纪人用来匹配主题名称。 当属实时,主题不会被提供,且enableDlq不允许,因为活页夹在配置阶段不知道主题名称。 注意,检测与模式相符的新主题所花费的时间由消费者属性控制metadata.max.age.ms,在撰写时默认为300,000毫秒(5分钟)。 这可以通过以下方式进行配置配置上方的财产。违约:
false - topic.properties
-
一个
地图在为新主题提供时使用的Kafka主题属性——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0默认:无。
- topic.replicas-assignment
-
一个映射</整数、列表<整数>>副本赋值,键为分区,值为赋值。 用于配置新主题。 参见
新主题Javadocs 在卡夫卡客户端罐。默认:无。
- 主题。复制因子
-
在配置主题时,复制因素。覆盖整个活页夹的设置。 如果忽略
副本-赋值存在。默认值:无(使用整个绑定器默认的-1)。
- 投票时间
-
超时用于在可投票消费者中进行民调。
默认时间:5秒。
- transactionManager
-
Beans名称
KafkaAwareTransactionManager用于覆盖绑定器的交易管理器。 通常如果你想用 Kafka 交易同步另一笔交易,使用ChainedKafkaTransactionManaager. 为了实现记录的精确一次消费和生产,消费者和生产者绑定必须配置为同一个事务管理器。默认:无。
- txCommitRecovered
-
使用事务绑定器时,恢复记录的偏移量(例如重试用尽且记录被发送到死符主题时)默认通过新事务提交。 将该属性设置为
false抑制了对恢复记录的偏移量的提交。默认:真。
- commonErrorHandlerBeanName
-
通用错误处理器每个消费者绑定时使用的豆子名称。 在场时,该用户提供通用错误处理器优先于绑定器定义的其他错误处理程序。 如果应用程序不想使用ListenerContainerCustomizer然后检查目标/组的组合来设置错误处理程序。默认:无。
卡夫卡制片人版权
以下作品仅供卡夫卡制作人使用,
必须以spring.cloud.stream.kafka.bindings.<channelName>.producer..
为避免重复,Spring Cloud Stream 支持为所有频道设置数值,格式为spring.cloud.stream.kafka.default.producer.<property>=<value>. |
- admin.configuration
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.properties,未来版本中将取消对该支持。 - admin.replicas-assignment
-
自2.1.1版本起,该属性被弃用,取而代之的是
topic.replicas-assignment,未来版本中将取消对该支持。 - admin.replication-factor
-
自2.1.1版本起,该属性被弃用,取而代之的是
主题。复制因子,未来版本中将取消对该支持。 - 缓冲区大小
-
以字节计,卡夫卡制作者在发送前尝试批量处理的数据量上限。
违约:
16384. - 同步
-
制作人是否同步。
违约:
false. - sendTimeoutExpression
-
针对外发消息计算的SpEL表达式,用于评估启用同步发布时等待ack的时间——例如,
标题['mySendTimeout']. 超时值以毫秒为单位。 在3.0之前的版本中,除非使用本地编码,否则无法使用有效载荷,因为在该表达式被评估时,有效载荷已经以字节[]. 现在,在有效载荷转换之前,先计算该表达式。违约:
没有. - batchTimeout
-
生产者等待多长时间,允许同一批中更多消息积累后再发送消息。 (通常,生产者根本不等待,直接发送上一次发送过程中累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。
违约:
0. - messageKeyExpression
-
一个针对用于填充生成卡夫卡消息密钥的外发消息进行评估的 SpEL 表达式——例如,
头部['myKey']. 在3.0之前的版本中,除非使用本地编码,否则无法使用有效载荷,因为在该表达式被评估时,有效载荷已经以字节[]. 现在,在有效载荷转换之前,先计算该表达式。 对于普通处理器(函数<字符串,字符串>或Function<Message<?>,Message<?>),如果产生的密钥需要与主题的输入密钥相同,则该属性可设置如下。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']对于反应性函数,有一个重要的注意事项需要注意。 在这种情况下,应用程序需要手动将收到消息的头部复制到发出消息。 你可以设置头部,例如:我的钥匙以及使用头部['myKey']如上所述,或者为方便起见,简单地设置KafkaHeaders.MESSAGE_KEY你根本不需要设置这个属性。违约:
没有. - 头部模式
-
一个逗号分隔的简单模式列表,用于匹配 Spring 消息头,映射到 Kafka
头在制作人唱片. 模式可以以万用符(星号)开始或结束。 模式可以通过前缀 来否定!. 匹配在第一次匹配后停止(正或负)。 例如!问,像*会过去的灰但又不是问.身份证和时间戳从未被映射。默认:(所有头部 - 除了
*身份证和时间戳) - 配置
-
映射中包含包含通用Kafka制作者属性的键值对。 这
bootstrap.servers这里不能设置属性;如果需要连接多个集群,使用多绑定器支持。默认:空白地图。
- topic.properties
-
一个
地图在为新主题提供时使用的Kafka主题属性——例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0 - topic.replicas-assignment
-
一个映射</整数、列表<整数>>副本赋值,键为分区,值为赋值。 用于配置新主题。 参见
新主题Javadocs 在卡夫卡客户端罐。默认:无。
- 主题。复制因子
-
在配置主题时,复制因素。覆盖整个活页夹的设置。 如果忽略
副本-赋值存在。默认值:无(使用整个绑定器默认的-1)。
- useTopicHeader
-
设置为
true用 的值覆盖默认绑定目的地(主题名)卡夫卡标题。主题消息头在外发消息中。 如果没有头部,则使用默认绑定目的地。违约:
false. - 记录元数据通道
-
豆子的名称
消息频道成功发送结果应发送至该平台;豆子必须存在于应用上下文中。 发送到信道的消息是发送消息(转换后,如果有),并加一个额外的头部KafkaHeaders.RECORD_METADATA. 头部包含记录元数据由卡夫卡客户端提供的对象;它包括记录在主题中写入的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)发送失败则进入制作人错误通道(如果已配置);参见卡夫卡错误通道。
默认:无。
卡夫卡活页夹使用了partitionCount将生产者设置为提示,以创建具有给定分区计数的主题(配合最小分区计数,两者中的最大值即为所用值)。
配置两者时请谨慎最小分区计数对于一个活接器和partitionCount对于某个应用,因为使用较大的值。
如果已有一个分区数较小的主题,且自动添加分区被禁用(默认),绑定器无法启动。
如果已有一个分区数较小的主题,且自动添加分区启用后,会添加新的分区。
如果一个主题已经存在,其分区数超过最大值 (最小分区计数或partitionCount),使用现有的分区计数。 |
- 压缩
-
设置
压缩类型生产者财产。 支持的值有没有,吉普,活泼,LZ4和ZSTD. 如果你覆盖了卡夫卡客户端jar 到 2.1.0(或更高版本),如 Apache Kafka 春季文档中讨论的,并希望使用ZSTD压缩与使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd.违约:
没有. - transactionManager
-
Beans名称
KafkaAwareTransactionManager用于覆盖绑定器的交易管理器。 通常如果你想用 Kafka 交易同步另一笔交易,使用ChainedKafkaTransactionManaager. 为了实现记录的精确一次消费和生产,消费者和生产者绑定必须配置为同一个事务管理器。默认:无。
- 结束时间
-
关闭生产者时,等待的秒数是超时。
违约:
30 - 允许非交易
-
通常,所有与事务绑定器关联的输出绑定都会发布在新的事务中,前提是该事务尚未在处理中。 这个属性允许你覆盖该行为。 如果设置为 true,发布到该输出绑定的记录不会在事务中运行,除非事务已经处于进程中。
违约:
false