配置选项
本节包含 Apache Kafka 活页夹使用的配置选项。
有关与 Binder 相关的常见配置选项和属性,请参阅核心文档中的 binding 属性。
Kafka Binder 属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka Binder 连接到的代理列表。
违约:
localhost. - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers允许指定带或不带端口信息的主机(例如host1,host2:port2). 这将在 broker 列表中未配置端口时设置默认端口。违约:
9092. - spring.cloud.stream.kafka.binder.configuration
-
传递给 Binder 创建的所有客户端的客户端属性(生产者和使用者)的键/值映射。 由于这些属性由生产者和使用者都使用,因此应仅限于通用属性(例如,安全性设置)。 通过此配置提供的未知 Kafka 生产者或使用者属性将被过滤掉,不允许传播。 这里的 properties 取代了 boot 中设置的任何 properties。
默认值:空地图。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端使用者属性的键/值映射。 除了支持已知的 Kafka 使用者属性外,这里还允许未知的使用者属性。 这里的属性会取代在 boot 和
configuration属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.headers
-
由 Binder 传输的自定义标头的列表。 仅当⇐使用
kafka-clients版本 < 0.11.0.0。较新版本本身支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.healthTimeout的
-
等待获取分区信息的时间,以秒为单位。 如果此计时器过期,则运行状况报告为 down。
默认值:60。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的 ack 数。 请参阅创建者的 Kafka 文档
acks财产。违约:
1. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在以下情况下有效
autoCreateTopics或autoAddPartitions已设置。 Binder 在其生成或使用数据的主题上配置的全局最小分区数。 它可以被partitionCount设置或instanceCount * concurrency创建器的设置(如果其中一个更大)。违约:
1. - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里还允许未知的生产者属性。 这里的属性会取代在 boot 和
configuration属性。默认值:空地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics处于活动状态。 可以在每个绑定上覆盖。如果您使用的是 2.4 之前的 Kafka 代理版本,则应至少将此值设置为 1. 从版本 3.0.8 开始,Binder 使用-1作为默认值,这表示代理 'default.replication.factor' 属性将用于确定副本数。 请与您的 Kafka 代理管理员联系,了解是否有需要最小复制因子的策略,如果是这种情况,则通常会使用default.replication.factor将匹配该值,而-1,除非您需要大于最小值的复制因子。违约:
-1. - spring.cloud.stream.kafka.binder.autoCreate主题
-
如果设置为
true,Binder 会自动创建新主题。 如果设置为false,则 Binder 依赖于已配置的主题。 在后一种情况下,如果主题不存在,则 Binder 无法启动。此设置独立于 auto.create.topics.enable设置,并且不会影响它。 如果服务器设置为自动创建主题,则可以使用默认代理设置将它们作为元数据检索请求的一部分创建。违约:
true. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true,Binder 会根据需要创建新分区。 如果设置为false,则 Binder 依赖于已配置的主题的分区大小。 如果目标主题的分区计数小于预期值,则 Binder 无法启动。违约:
false. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在 Binder 中启用事务。看
transaction.id和 Kafka 文档中的 Transactionsspring-kafka文档。 启用事务后,单个producer属性将被忽略,并且所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*性能。默认值
null(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务 Binder 中生成者的全局生产者属性。 看
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix以及 Kafka Producer Properties 以及所有 Binders 支持的常规 producer 属性。默认值:请参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
一个 bean 的 bean 名称
KafkaHeaderMapper用于映射spring-messaging标头与 Kafka 标头之间的匹配。 例如,如果您希望在BinderHeaderMapper对 Headers 使用 JSON 反序列化的 bean。 如果此自定义BinderHeaderMapperBean 不可用于使用此属性的 Binder,则 Binder 将查找名称为kafkaBinderHeaderMapper那是BinderHeaderMapper在回退到默认值之前BinderHeaderMapper由 Binder 创建。默认值:none。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
用于将 Binder 运行状况设置为
down时,无论从哪个使用者接收数据,都发现该主题上的任何分区没有领导者。违约:
true. - spring.cloud.stream.kafka.binder.certificateStore目录
-
当信任库或密钥库证书位置作为非本地文件系统资源(org.springframework.core.io.Resource支持的资源,例如CLASSPATH、HTTP等)给出时, Binder 将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。 对于代理级别证书 (
ssl.truststore.location和ssl.keystore.location) 和用于 Schema 注册表的证书 (schema.registry.ssl.truststore.location和schema.registry.ssl.keystore.location). 请记住,truststore 和 keystore 位置路径必须在spring.cloud.stream.kafka.binder.configuration…. 例如spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location,spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。 该文件将被复制到指定为此属性值的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。 如果未设置此值,并且证书文件是非本地文件系统资源,则它将被复制到 System 的临时目录,由System.getProperty("java.io.tmpdir"). 如果存在此值,但在文件系统上找不到该目录或该目录不可写,则也是如此。默认值:none。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
当设置为 true 时,每当访问该指标时,都会计算每个使用者主题的偏移滞后指标。 当设置为 false 时,仅使用定期计算的偏移滞后。
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个使用者主题的偏移滞后的间隔。 每当
metrics.defaultOffsetLagMetricsEnabled已禁用或其 计算时间过长。默认值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此 Binder 中的所有绑定上启用 Micrometer observation 注册表。
默认值:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator元数据使用者group.id. 此使用者由HealthIndicator查询有关正在使用的主题的元数据。默认值:none。
Kafka Consumer 属性
以下属性仅适用于 Kafka 使用者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer..
为避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.consumer.<property>=<value>. |
- admin.configuration 的
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties,并且在未来版本中将删除对它的支持。 - admin.replicas-分配
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment,并且在未来版本中将删除对它的支持。 - admin.replication-factor (管理员复制因子)
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor,并且在未来版本中将删除对它的支持。 - autoRebalance已启用
-
什么时候
true,主题分区将在使用者组的成员之间自动重新平衡。 什么时候false,每个使用者都会根据spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex. 这需要spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex属性。 的spring.cloud.stream.instanceCount在这种情况下,property 通常必须大于 1。违约:
true. - ackEachRecord 的
-
什么时候
autoCommitOffset是true,此设置指示是否在处理每条记录后提交偏移量。 默认情况下,偏移量在consumer.poll()已处理。 轮询返回的记录数可以通过max.poll.recordsKafka 属性,该属性通过使用者configuration财产。 将此项设置为true可能会导致性能下降,但这样做会降低发生故障时重新传送记录的可能性。 另请参阅 BinderrequiredAcksproperty,这也会影响 commit offset 的性能。 此属性从 3.1 开始已弃用,取而代之的是ackMode. 如果ackMode未设置且未启用批处理模式,RECORDackMode 的调用。违约:
false. - autoCommitOffset
-
从版本 3.1 开始,此属性已弃用。 看
ackMode有关替代方案的更多详细信息。 是否在处理消息时自动提交偏移量。 如果设置为false、带有键kafka_acknowledgment的类型org.springframework.kafka.support.Acknowledgmentheader 出现在入站邮件中。 应用程序可以使用此标头来确认消息。 有关详细信息,请参阅 examples 部分。 当此属性设置为false,Kafka Binder 将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL应用程序负责确认记录。 另请参阅ackEachRecord.违约:
true. - ackMode
-
指定容器确认模式。 这是基于 Spring Kafka 中定义的 AckMode 枚举。 如果
ackEachRecord属性设置为true并且 consumer 不是 batch 模式,那么这将使用RECORD,否则,请使用此属性提供的 ACK 模式。 - autoCommitOnError
-
在可轮询使用者中,如果设置为
true,它总是在出错时自动提交。 如果未设置(默认值)或 false,则不会在可轮询的使用者中自动提交。 请注意,此属性仅适用于可轮询的使用者。默认值:未设置。
- resetOffsets
-
是否将 Consumer 的 Offset 重置为 startOffset 提供的值。 如果
KafkaBindingRebalanceListener提供;请参阅 rebalance listener 有关此属性的更多信息,请参阅 reset-offsets。违约:
false. - startOffset
-
新组的起始偏移量。 允许的值:
earliest和latest. 如果为使用者 'binding' 显式设置了使用者组(通过spring.cloud.stream.bindings.<channelName>.group),则 'startOffset' 设置为earliest.否则,它设置为latest对于anonymousConsumer 组。 有关此属性的更多信息,请参阅 reset-offsets。默认值:null(相当于
earliest). - enableDlq
-
当设置为 true 时,它将为使用者启用 DLQ 行为。 默认情况下,导致错误的消息将转发到名为
error.<destination>.<group>. DLQ 主题名称可以通过设置dlqName属性或通过定义@Bean的类型DlqDestinationResolver. 这为更常见的 Kafka 重放场景提供了一个替代选项,用于错误数量相对较少并且重放整个原始主题可能太麻烦的情况。 有关更多信息,请参阅 kafka dlq 处理。 从版本 2.0 开始,发送到 DLQ 主题的消息通过以下标头进行了增强:x-original-topic,x-exception-message和x-exception-stacktrace如byte[]. 默认情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。 有关如何更改该行为,请参阅 dlq partition selection。不允许destinationIsPattern是true.违约:
false. - dlq分区
-
什么时候
enableDlq为 true,并且未设置此属性,则会创建一个分区数与主主题相同数量的死信主题。 通常,死信记录会发送到与原始记录相同的死信主题中的分区。 此行为可以更改;请参阅 DLQ 分区选择。 如果此属性设置为1并且没有DqlPartitionFunctionbean,则所有死信记录都将写入 partition0. 如果此属性大于1,您必须提供DlqPartitionFunction豆。 请注意,实际的分区计数受 Binder 的minPartitionCount财产。违约:
none - 配置
-
使用包含通用 Kafka 使用者属性的键/值对进行映射。 除了具有 Kafka 使用者属性之外,还可以在此处传递其他配置属性。 例如,应用程序所需的一些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar. 这bootstrap.serversproperty 不能在此处设置;如果您需要连接到多个集群,请使用 Multi-Binder 支持。默认值:空地图。
- dlq名称
-
用于接收错误消息的 DLQ 主题的名称。
默认值:null(如果未指定,则导致错误的消息将转发到名为
error.<destination>.<group>). - dlqProducerProperties
-
使用此功能,可以设置特定于 DLQ 的创建者属性。 通过 kafka producer 属性提供的所有属性都可以通过此属性进行设置。 当 consumer 上启用了原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以
dlqProducerProperties.configuration.key.serializer和dlqProducerProperties.configuration.value.serializer.默认值:默认 Kafka 生产者属性。
- standardHeaders 的
-
指示入站通道适配器填充哪些标准头。 允许的值:
none,id,timestamp或both. 如果使用本机反序列化并且接收消息的第一个组件需要id(例如配置为使用 JDBC 消息存储的聚合器)。违约:
none - converterBean名称
-
实现
RecordMessageConverter.在入站通道适配器中使用,以替换默认的MessagingMessageConverter.违约:
null - idleEventInterval
-
指示最近未收到任何消息的事件之间的时间间隔(以毫秒为单位)。 使用
ApplicationListener<ListenerContainerIdleEvent>以接收这些事件。 有关使用示例,请参阅 pause-resume。违约:
30000 - destinationIsPattern
-
如果为 true,则目标被视为正则表达式
Pattern用于匹配 broker 的主题名称。 如果为 true,则不预置主题,并且enableDlq是不允许的,因为 Binder 在预置阶段不知道主题名称。 请注意,检测与模式匹配的新主题所花费的时间由 consumer 属性metadata.max.age.ms,该指标(在撰写本文时)默认为 300000 毫秒(5 分钟)。 这可以使用configuration属性。违约:
false - topic.properties
-
一个
Map预置新主题时使用的 Kafka 主题属性,例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0默认值:none。
- topic.replicas-assignment
-
副本分配的 Map<Integer、List<Integer>>,键是分区,值是分配。 在预置新主题时使用。 请参阅
NewTopicJavadocs 中的kafka-clients罐。默认值:none。
- topic.replication-factor
-
预置主题时要使用的复制因子。覆盖 binder 范围的设置。 如果 Ignored
replicas-assignments存在。默认值:none (使用 binder 范围的默认值 -1)。
- 轮询超时
-
用于轮询使用者中的轮询的超时。
默认值:5 秒。
- 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager用于覆盖此绑定的 Binder 的事务管理器。 如果要将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager. 要实现记录的 Just Once 使用和生成,Consumer 和 Producer 绑定都必须使用相同的事务管理器进行配置。默认值:none。
- txCommitRecovered
-
使用事务 Binders 时,默认情况下,将通过新事务提交已恢复记录的偏移量(例如,当重试用尽并且记录被发送到死信主题时)。 将此属性设置为
false禁止提交已恢复记录的偏移量。默认值:true。
- commonErrorHandlerBeanName
-
CommonErrorHandler每个使用者绑定使用的 Bean 名称。 如果存在,此用户提供CommonErrorHandler优先于 Binder 定义的任何其他错误处理程序。 这是表示错误处理程序的便捷方式,如果应用程序不想使用ListenerContainerCustomizer,然后检查 Destination/Group 组合以设置错误处理程序。默认值:none。
Kafka 生产者属性
以下属性仅适用于 Kafka 生产者,并且
必须以spring.cloud.stream.kafka.bindings.<channelName>.producer..
为避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.producer.<property>=<value>. |
- admin.configuration 的
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties,并且在未来版本中将删除对它的支持。 - admin.replicas-分配
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment,并且在未来版本中将删除对它的支持。 - admin.replication-factor (管理员复制因子)
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor,并且在未来版本中将删除对它的支持。 - 缓冲区大小
-
Kafka 创建者在发送之前尝试批处理的数据量的上限(以字节为单位)。
违约:
16384. - 同步
-
生产者是否同步。
违约:
false. - 发送超时表达式
-
根据传出消息评估的 SPEL 表达式,用于评估启用同步发布时等待确认的时间,例如,
headers['mySendTimeout']. 超时的值以毫秒为单位。 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]. 现在,在转换有效负载之前计算表达式。违约:
none. - batch超时
-
在发送消息之前,创建者等待多长时间以允许更多消息在同一批次中累积。 (通常,创建者根本不等待,而只是发送在上一次发送过程中累积的所有消息。非零值可能会以延迟为代价增加吞吐量。
违约:
0. - messageKey表达式
-
根据用于填充生成的 Kafka 消息的键的传出消息计算的 SpEL 表达式,例如,
headers['myKey']. 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]. 现在,在转换有效负载之前计算表达式。 对于常规处理器 (Function<String, String>或Function<Message<?>, Message<?>),如果生成的 key 需要与来自 topic 的传入 key 相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']对于 reactive 函数,有一个重要的警告需要牢记。 在这种情况下,由应用程序手动将标头从传入消息复制到出站消息。 您可以设置标头,例如myKey并使用headers['myKey']如上所述,或者为方便起见,只需设置KafkaHeaders.MESSAGE_KEYheader,则完全不需要设置此属性。违约:
none. - headerPatterns 的
-
一个逗号分隔的简单模式列表,用于匹配要映射到 Kafka 的 Spring 消息传递标头
Headers在ProducerRecord. 模式可以以通配符(星号)开头或结尾。 模式可以通过在!. 匹配在第一个匹配项 (正或负) 之后停止。 例如!ask,as*将通过ash但不是ask.id和timestamp永远不会映射。默认值:(所有标头 - 除了
*id和timestamp) - 配置
-
Map 包含通用 Kafka 生产者属性的键/值对。 这
bootstrap.serversproperty 不能在此处设置;如果您需要连接到多个集群,请使用 Multi-Binder 支持。默认值:空地图。
- topic.properties
-
一个
Map预置新主题时使用的 Kafka 主题属性,例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0 - topic.replicas-assignment
-
副本分配的 Map<Integer、List<Integer>>,键是分区,值是分配。 在预置新主题时使用。 请参阅
NewTopicJavadocs 中的kafka-clients罐。默认值:none。
- topic.replication-factor
-
预置主题时要使用的复制因子。覆盖 binder 范围的设置。 如果 Ignored
replicas-assignments存在。默认值:none (使用 binder 范围的默认值 -1)。
- useTopicHeader
-
设置为
true要使用KafkaHeaders.TOPICmessage 标头。 如果标头不存在,则使用默认绑定目标。违约:
false. - recordMetadataChannel
-
一个 bean 的 bean 名称
MessageChannel应将成功的发送结果发送到哪个位置;该 Bean 必须存在于应用程序上下文中。 发送到通道的消息是带有附加标头的已发送消息(转换后,如果有)KafkaHeaders.RECORD_METADATA. 标头包含一个RecordMetadataKafka 客户端提供的 object;它包括主题中写入记录的 partition 和 offset。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)失败的发送将转到 producer 错误通道(如果已配置);请参阅 Kafka 错误通道。
默认值:null。
Kafka Binder 使用partitionCount设置创建具有给定分区计数的主题(与minPartitionCount,两者中的最大值是正在使用的值)。
在配置两者时要小心minPartitionCount对于活页夹和partitionCount对于应用程序,因为使用了较大的值。
如果已存在分区计数较小的主题,并且autoAddPartitions已禁用(默认值),则 Binder 无法启动。
如果已存在分区计数较小的主题,并且autoAddPartitions,则会添加新的分区。
如果已存在分区数大于最大值 (minPartitionCount或partitionCount),则使用现有分区计数。 |
- 压缩
-
将
compression.typeproducer 属性。 支持的值包括none,gzip,snappy,lz4和zstd. 如果您覆盖kafka-clientsjar 更改为 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用zstd压缩, 使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd.违约:
none. - 事务管理器
-
的 Bean 名称
KafkaAwareTransactionManager用于覆盖此绑定的 Binder 的事务管理器。 如果要将另一个事务与 Kafka 事务同步,通常需要使用ChainedKafkaTransactionManaager. 要实现记录的 Just Once 使用和生成,Consumer 和 Producer 绑定都必须使用相同的事务管理器进行配置。默认值:none。
- closeTimeout 超时
-
关闭创建器时要等待的超时(以秒数为单位)。
违约:
30 - allowNonTransactional (允许非事务性)
-
通常,与事务 Binder 关联的所有输出绑定都将在新事务中发布(如果尚未进行)。 此属性允许您覆盖该行为。 如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已在处理中。
违约:
false