绑定属性是使用 的格式提供的。
表示正在配置的绑定的名称。spring.cloud.stream.bindings.<bindingName>.<property>=<value>
<bindingName>
例如,对于以下函数
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
有两个绑定分别用于输入和输出。有关详细信息,请参阅 [绑定和绑定名称]。uppercase-in-0
uppercase-out-0
为了避免重复,Spring Cloud Stream 支持以常见绑定属性的格式设置所有绑定的值。spring.cloud.stream.default.<property>=<value> spring.cloud.stream.default.<producer|consumer>.<property>=<value> |
在避免扩展绑定属性的重复时,应使用 - 格式。spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
为了避免重复,Spring Cloud Stream 支持以常见绑定属性的格式设置所有绑定的值。spring.cloud.stream.default.<property>=<value> spring.cloud.stream.default.<producer|consumer>.<property>=<value> |
常见绑定属性
这些属性通过以下方式公开org.springframework.cloud.stream.config.BindingProperties
以下绑定属性可用于输入和输出绑定,并且必须以 (例如,) 为前缀。spring.cloud.stream.bindings.<bindingName>.
spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock
可以使用前缀设置默认值(例如 )。spring.cloud.stream.default
spring.cloud.stream.default.contentType=application/json
- 目的地
-
绑定中间件上绑定的目标目标(例如,RabbitMQ 交换或 Kafka 主题)。 如果 binding 表示使用者绑定(输入),则它可以绑定到多个目标,并且可以将目标名称指定为逗号分隔值。 否则,将改用实际的绑定名称。 无法重写此属性的默认值。
String
- 群
-
绑定的使用者组。 仅适用于入站绑定。 请参阅使用者组。
默认值:(表示匿名使用者)。
null
- 内容类型
-
此绑定的内容类型。 看。
Content Type Negotiation
违约:。
application/json
- 粘结 剂
-
此绑定使用的活页夹。 有关详细信息,请参阅。
Multiple Binders on the Classpath
默认值:(如果存在,则使用默认活页夹)。
null
消费者属性
这些属性通过以下方式公开org.springframework.cloud.stream.binder.ConsumerProperties
以下绑定属性仅适用于输入绑定,并且必须以 (例如,) 为前缀。spring.cloud.stream.bindings.<bindingName>.consumer.
spring.cloud.stream.bindings.input.consumer.concurrency=3
可以使用前缀设置默认值(例如,)。spring.cloud.stream.default.consumer
spring.cloud.stream.default.consumer.headerMode=none
- 自动启动
-
指示此使用者是否需要自动启动
违约:。
true
- 并发
-
入站使用者的并发性。
违约:。
1
- 分区
-
使用者是否从分区生产者接收数据。
违约:。
false
- 标题模式
-
设置为 时,禁用输入上的标头解析。 仅对本机不支持消息头且需要嵌入消息头的消息中间件有效。 当不支持本机标头时,使用来自非 Spring Cloud Stream 应用程序的数据时,此选项非常有用。 设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。
none
headers
embeddedHeaders
默认值:取决于 binder 实现。
- 最大尝试次数
-
如果处理失败,则为尝试处理消息的次数(包括第一次)。 设置为禁用重试。
1
违约:。
3
- backOffInitialInterval
-
重试时的回退初始间隔。
违约:。
1000
- backOffMaxInterval
-
最大回退间隔。
违约:。
10000
- backOff乘法器
-
退避乘数。
违约:。
2.0
- default可重试
-
侦听器引发的未在 中列出的异常是否可重试。
retryableExceptions
违约:。
true
- 实例计数
-
当设置为大于等于零的值时,它允许自定义此使用者的实例计数(如果与 不同)。 设置为负值时,默认为 。 有关详细信息,请参阅。
spring.cloud.stream.instanceCount
spring.cloud.stream.instanceCount
Instance Index and Instance Count
违约:。
-1
- 实例索引
-
当设置为大于等于零的值时,它允许自定义此使用者的实例索引(如果与 不同)。 设置为负值时,默认为 。 如果提供,则忽略。 有关详细信息,请参阅。
spring.cloud.stream.instanceIndex
spring.cloud.stream.instanceIndex
instanceIndexList
Instance Index and Instance Count
违约:。
-1
- instanceIndexList
-
与不支持本机分区的绑定器(如 RabbitMQ)一起使用;允许应用程序实例从多个分区使用。
默认值:空。
- retryableExceptions
-
键中 Throwable 类名的映射和值中的布尔值的映射。 指定将要或不会重试的异常(和子类)。 另请参见。 例:。
defaultRetriable
spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
默认值:空。
- 使用NativeDecoding
-
设置为 时,入站消息由客户端库直接反序列化,必须相应地配置该库(例如,设置适当的 Kafka 生产者值反序列化程序)。 使用此配置时,入站消息取消编组不基于绑定。 使用本机解码时,生产者负责使用适当的编码器(例如,Kafka 生产者值序列化程序)来序列化出站消息。 此外,当使用本机编码和解码时,将忽略该属性,并且不会在消息中嵌入标头。 请参见 producer 属性 。
true
contentType
headerMode=embeddedHeaders
useNativeEncoding
违约:。
false
- 多重
-
设置为 true 时,基础绑定器将在同一输入绑定上本机多路复用目标。
违约:。
false
高级消费者配置
要为消息驱动的使用者配置底层消息侦听器容器的高级配置,请将单个 Bean 添加到应用程序上下文中。
它将在应用上述属性后调用,并可用于设置其他属性。
同样,对于轮询的消费者,添加一个 bean。ListenerContainerCustomizer
MessageSourceCustomizer
以下是 RabbitMQ 活页夹的示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
生产者属性
这些属性通过以下方式公开org.springframework.cloud.stream.binder.ProducerProperties
以下绑定属性仅适用于输出绑定,并且必须以 (例如,) 为前缀。spring.cloud.stream.bindings.<bindingName>.producer.
spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
可以使用前缀设置默认值(例如,)。spring.cloud.stream.default.producer
spring.cloud.stream.default.producer.partitionKeyExpression=headers.id
- 自动启动
-
指示此使用者是否需要自动启动
违约:。
true
- partitionKey表达式
-
一个 SpEL 表达式,用于确定如何对出站数据进行分区。 如果设置,则对此绑定上的出站数据进行分区。 必须设置为大于 1 的值才能生效。 看。
partitionCount
Partitioning
默认值:null。
- partitionKeyExtractorName
-
实现 的 Bean 的名称。用于提取用于计算的密钥 分区 ID(请参阅“partitionSelector*”)。与“partitionKeyExpression”互斥。
PartitionKeyExtractorStrategy
默认值:null。
- partitionSelectorName
-
实现 的 Bean 的名称。用于确定基于分区 ID 在分区键上(请参阅“partitionKeyExtractor*”)。与“partitionSelectorExpression”互斥。
PartitionSelectorStrategy
默认值:null。
- partitionSelectorExpression
-
用于自定义分区选择的 SpEL 表达式。 如果两者都未设置,则选择分区作为 ,其中通过任一 .
hashCode(key) % partitionCount
key
partitionKeyExpression
违约:。
null
- 分区计数
-
数据的目标分区数(如果启用了分区)。 如果对生产者进行了分区,则必须将其设置为大于 1 的值。 在卡夫卡上,它被解释为一种暗示。改用目标主题的分区计数和分区计数中的较大者。
违约:。
1
- 必需组
-
一个以逗号分隔的组列表,生产者必须确保消息传递到这些组,即使它们在创建消息后启动(例如,通过在 RabbitMQ 中预先创建持久队列)。
- 标题模式
-
设置为 时,它将禁用输出上的标头嵌入。 它仅对本机不支持消息头且需要嵌入消息头的消息中间件有效。 当不支持本机标头时,为非 Spring Cloud Stream 应用程序生成数据时,此选项非常有用。 设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。
none
headers
embeddedHeaders
默认值:取决于绑定器实现。
- 使用NativeEncoding
-
当设置为 时,出站消息由客户端库直接序列化,必须相应地配置该库(例如,设置适当的 Kafka 生产者值序列化程序)。 使用此配置时,出站消息封送处理不基于绑定。 使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka 使用者值反序列化器)来反序列化入站消息。 此外,当使用本机编码和解码时,将忽略该属性,并且不会在消息中嵌入标头。 请参见 consumer 属性 。
true
contentType
headerMode=embeddedHeaders
useNativeDecoding
违约:。
false
- errorChannelEnabled
-
设置为 true 时,如果绑定程序支持异步发送结果,则发送失败将发送到目标的错误通道。有关详细信息,请参阅错误处理。
默认值:false。
高级生产者配置
在某些情况下,生产者属性不足以在活页夹中正确配置生成 MessageHandler,或者您可能更喜欢编程方法
同时配置此类生成 MessageHandler。无论出于何种原因,spring-cloud-stream 都可以完成它。ProducerMessageHandlerCustomizer
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
如您所见,它使您可以访问生产的实际实例,您可以根据需要对其进行配置。
您需要做的就是提供此策略的实现,并将其配置为 .MessageHandler
@Bean