配置选项
本节包含Kafka Streams绑定器中使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档。
Kafka Streams Binder Properties
以下属性在活页夹层面可用,且必须以Spring.cloud.stream.kafka.streams.binder.任何在 Kafka Streams 活页夹中重复使用的 Kafka 提供的属性都必须在前缀spring.cloud.stream.kafka.streams.binder而不是Spring.cloud.stream.kafka.binder.
唯一的例外是定义 Kafka bootstrap 服务器属性时,任一前缀都适用。
- 配置
-
映射中包含与 Apache Kafka Streams API 相关的属性的键值对。 该性质必须以
Spring.cloud.stream.kafka.streams.binder.. 以下是使用该性质的一些例子。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
关于所有可能涉及流配置的属性的更多信息,请参见StreamsConfigJavaDocs in Apache Kafka Streams docs.
所有你可以从中设置的配置StreamsConfig可以通过这个设置。
使用该性质时,适用于整个应用程序,因为它是绑定器级别的性质。
如果你在应用中有多个处理器,它们都会获得这些属性。
在以下性质的情况下application.id这将成为问题,因此你必须仔细检查从中获得的性质StreamsConfig是用这个绑定器层级映射的配置财产。
- functions.<function-bean-name>.applicationId
-
仅适用于功能性型处理器。 这可以用来设置应用程序中的每个函数的应用ID。 对于多函数,这是一种方便设置应用ID的方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能性型处理器。 映射中包含与 Apache Kafka Streams API 相关的属性的键值对。 这与束缚剂级别类似
配置上述描述的性质,但该层级配置性质仅对命名函数受限。 当你有多个处理器,并且想根据特定功能限制对配置的访问时,你可能会想用这个方法。 都StreamsConfig这里可以使用属性。 - 经纪人
-
经纪商网址
违约:
本地主持 - zkNodes
-
Zookeeper 网址
违约:
本地主持 - deserializationExceptionHandler
-
反序列化错误处理类型。 该处理程序在绑定器层面应用,因此针对应用中所有输入绑定。 在消费者绑定层面,有一种更细致的方法来控制它。 可能的数值有——
logAndContinue,logAndFail,跳过并继续或sendToDlq违约:
logAndFail - applicationID
-
方便地在装订器层面为Kafka Streams应用设置全球的 Books(卡夫卡流)应用 application.id。 如果应用程序包含多个功能,那么应用 ID 应设置为不同设置。 详见上文,详细讨论了设置应用ID。
默认情况下:应用程序将生成一个静态应用ID。详情请参见申请ID部分。
- stateStoreRetry.maxAttempts
-
Max试图连接州商店。
默认值:1
- stateStoreRetry.backoffPeriod
-
尝试重试连接州商店时,需要有一段时间的退后期。
默认:1000毫秒
- consumerProperties
-
在装订者层面的任意消费者属性。
- 制作人属性
-
在粘合剂层面,生产者的属性是任意的。
- includeStoppedProcessorsForHealthCheck
-
当处理器绑定通过执行器停止时,该处理器默认不会参与健康检查。 将此属性设置为
true为所有处理器(包括目前通过绑定执行器终端被阻止的处理器)启用健康检查。默认:false
Kafka Streams 制片人
以下属性仅供Kafka Streams制作者使用,且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.为了方便起见,如果有多个输出绑定且它们都需要一个共同值,可以通过使用前缀配置spring.cloud.stream.kafka.streams.default.producer..
- keySerde
-
Key Serde to use
默认:参见上面关于消息去序列化的讨论
- valueSerde
-
value serde to use
默认:参见上面关于消息去序列化的讨论
- useNative编码
-
用于启用/禁用本地编码的标志
违约:
true. - streamPartitionerBeanName
-
用户可自定义的出站分区豆名。 应用程序可以提供定制服务
StreamPartitioner作为春季豆,且该豆名可以提供给生产者,代替默认名称。默认:请参见上面关于出站分区支持的讨论。
- 制作作品
-
处理器生产的汇组件的自定义名称。
违约:
没有(由Kafka Streams生成)
卡夫卡流媒体消费品
以下属性供Kafka Streams用户使用,且必须以前缀加spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.为了方便起见,如果有多个输入绑定且都需要一个共同值,可以通过使用前缀进行配置spring.cloud.stream.kafka.streams.default.consumer。.
- applicationID
-
设置每个输入绑定的 application.id。
默认:见上文。
- keySerde
-
Key Serde to use
默认:参见上面关于消息去序列化的讨论
- valueSerde
-
value serde to use
默认:参见上面关于消息去序列化的讨论
- 实现为
-
在使用入站KTable类型时实现状态存储
违约:
没有. - 缓存失效
-
关闭实体化KTable的缓存。 当设置为
true调用withCachingDisabled()在物质化物体上。 当设置为false调用withCachingEnabled()在物质化物体上。违约:
false. - 日志被禁用
-
禁用实体化KTable的日志记录。 当设置为
true调用withLoggingDisabled()在物质化物体上。违约:
false. - useNativeDecoding
-
启用/禁用原生解码的标志
违约:
true. - dlqName
-
DLQ主题名称。
默认:请参见上文关于错误处理和DLQ的讨论。
- startOffset
-
如果没有承诺的可消费的偏移量,则从起始点。 这通常用于消费者首次阅读某个主题时。 卡夫卡流的用途
最早作为默认策略,绑定者也使用相同的默认策略。 此值可覆盖为最近的利用该性质。违约:
最早.
注意:使用resetOffsets对消费者来说,这对Kafka Streams的活页夹没有任何影响。
与基于消息通道的活页夹不同,Kafka Streams 的活页夹不寻求按需开始或结束。
- deserializationExceptionHandler
-
反序列化错误处理类型。 该处理程序是按消费者绑定应用的,而非之前描述的绑定器级别属性。 可能的数值有——
logAndContinue,logAndFail,跳过并继续或sendToDlq违约:
logAndFail - 时间戳提取器BeanName。
-
消费者需使用特定的时间戳提取豆名称。 应用程序可以提供
时间戳提取器作为春豆,且该豆子的名称可以提供给消费者,以代替默认名称。默认情况下:请参见上面关于时间戳提取器的讨论。
- 事件类型
-
逗号分隔了该绑定支持的事件类型列表。
违约:
没有 - eventTypeHeaderKey
-
通过此绑定,每个入站记录的事件类型头键。
违约:
event_type - 被消费的As
-
为处理器从源组件自定义名称。
Deafult:
没有(由Kafka Streams生成)
关于并发的特别说明
在 Kafka Streams 中,你可以控制处理器通过数字.stream.threads财产。
你可以利用各种方法来实现配置上述选项分别在文件夹、功能、生产者或消费者层面描述。
你也可以使用并发这是核心 Spring Cloud Stream 为此目的提供的属性。
使用时,你需要对消费者使用。
当你有多个输入绑定时,把它设置在第一个输入绑定上。
例如,设置时Spring.cloud.stream.bindings.process-in-0.consumer.concurrency,翻译为数字.stream.threads在活页夹旁边。
如果你有多个处理器,其中一个处理器定义了绑定级别并发,而其他处理器没有,那么那些没有绑定级别并发的处理器将默认回到通过以下方式指定的绑定器范围属性spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads.
如果无法使用该绑定器配置,应用程序将使用 Kafka Streams 的默认设置。