|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
结合性质
绑定属性通过以下格式提供spring.cloud.stream.bindings.<bindingName>.<property>=<value>.
这<绑定名字>表示被配置的装订名称。
例如,对于以下函数
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
有两种装订,名为0大写对于输入和大写输出0用于输出。详情请参见[绑定和绑定名称]。
为避免重复,Spring Cloud Stream 支持为所有绑定设置值,格式为spring.cloud.stream.default.<property>=<value>和spring.cloud.stream.default.<producer|consumer>.<property>=<value>因为具有共同的结合性质。 |
为了避免重复以获得延长的结合特性,应采用这种格式——Spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>.
常见的结合性质
这些属性通过以下方式被揭示org.springframework.cloud.stream.config.BindingProperties
以下绑定属性适用于输入和输出绑定,且必须以spring.cloud.stream.bindings.<bindingName>.(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock).
默认值可以通过使用spring.cloud.stream.default前缀(例如)spring.cloud.stream.default.contentType=application/json).
- 目的地
-
绑定的目标目的地是绑定中间件上的目标(例如RabbitMQ交换或Kafka主题)。如果绑定代表消费者绑定(输入),它可以绑定到多个目的地,目的名称可以用逗号分隔表示
字符串值。 如果不行,则使用实际的绑定名称。该属性的默认值无法被覆盖。 - 群
-
绑定的消费者组。仅适用于入站绑定。参见消费者组。
违约:
零(表示匿名消费者)。 - 内容类型
-
这种装订内容类型。 看
内容类型协商.违约:
application/json. - 粘结 剂
-
这种装订所用的活页夹。 看
类路径上的多个活页夹细节。违约:
零(如果存在,则使用默认绑定器)。
消费物业
这些属性通过以下方式被揭示org.springframework.cloud.stream.binder.ConsumerProperties
以下绑定属性仅适用于输入绑定,且必须以spring.cloud.stream.bindings.<bindingName>.consumer.(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3).
默认值可以通过使用spring.cloud.stream.default.consumer前缀(例如,spring.cloud.stream.default.consumer.headerMode=none).
- 自动启动
-
如果需要启动该消费者,信号
违约:
true. - 并发
-
入站消费者的并发。
违约:
1. - 分区
-
消费者是否从分区生产者那里接收数据。
违约:
false. - 头部模式
-
当设置为
没有, 禁用输入的头部解析。仅适用于不原生支持消息头且需要头部嵌入的消息中间件。当非 Spring Cloud Stream 应用中不支持本地头部时,该选项非常有用。当设置为头,它使用了中间件的原生头部机制。 当设置为嵌入的头部它将报头嵌入到消息有效载荷中。默认:取决于绑定器的实现。
- 最大尝试次数
-
如果处理失败,处理该消息的次数(包括第一次)。设置为
1以禁用重试。违约:
3. - backOffInitialInterval
-
重试时的退回初始间隔。
违约:
1000. - backOffMaxInterval
-
最大后退间隔。
违约:
10000. - backOffMultiplier
-
退后倍数。
违约:
2.0. - default可重试
-
听者是否抛出未在
retryableExceptions可以重试。违约:
true. - 实例计数
-
当设置为大于零的值时,允许自定义该消费者的实例计数(如果不同于
spring.cloud.stream.instanceCount). 当设置为负值时,默认为spring.cloud.stream.instanceCount. 看实例索引与实例计数更多信息请见。违约:
-1. - 实例索引
-
当设置为大于零的值时,允许自定义该消费者的实例索引(如果与 不同于
spring.cloud.stream.instanceIndex). 当设置为负值时,默认为spring.cloud.stream.instanceIndex. 如果忽略实例索引列表提供。 看实例索引与实例计数更多信息请见。违约:
-1. - 实例索引列表
-
与不支持本地分区的绑定器(如RabbitMQ)一起使用;允许一个应用实例从多个分区中获取数据。
默认:空。
- retryableExceptions
-
键中包含可投掷类名的映射,值中包含布尔值。指定哪些例外(及子类)会被重试或不会重试。另见
default可重试. 例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false.默认:空。
- useNativeDecoding
-
当设置为
true,入站消息由客户端库直接反序列化,客户端库必须相应配置(例如设置合适的Kafka产出值反串化器)。当使用该配置时,入站消息的解封组并非基于内容类型绑定的序列。当使用原生解码时,生产者有责任使用合适的编码器(例如Kafka的生产值序列化器)来序列化出站消息。此外,当使用本地编码和解码时,headerMode=embeddedHeaders属性被忽略,且头部不嵌入消息中。参见生产者属性useNative编码.违约:
false. - 多重
-
当设置为 true,底层绑定器会在同一输入绑定上原生复用目的地。
违约:
false.
高级消费者配置
对于消息驱动消费者的底层消息监听器容器的高级配置,只需添加一个ListenerContainerCustomizer在应用上下文中调用 bean。应用上述属性后,它将被调用,并可用于设置更多属性。同样,对于被轮询的消费者,添加一个消息源定制器豆。
以下是RabbitMQ绑定器的示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
制作人属性
这些属性通过以下方式被揭示org.springframework.cloud.stream.binder.ProducerProperties
以下绑定属性仅适用于输出绑定,且必须以spring.cloud.stream.bindings.<bindingName>.producer.(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id).
默认值可以通过使用前缀 来设置spring.cloud.stream.default.producer(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id).
- 自动启动
-
如果需要启动该消费者,信号
违约:
true. - partitionKeyExpression
-
一个确定如何分区出站数据的 SpEL 表达式。 如果设置为,该绑定上的出站数据会被分区。
partitionCount必须设置为大于1的值才能有效。 看分区.默认:无。
- partitionKeyExtractorName
-
实现
PartitionKeyExtractorStrategy.用于提取用于计算的密钥 分区 ID(参见“partitionSelector*”)。与“partitionKeyExpression”互斥。默认:无。
- partitionSelectorName
-
实现
PartitionSelectorStrategy.用于确定基于分区ID的判定 在分区键上(参见“partitionKeyExtractor*”)。与“partitionSelectorExpression”互斥。默认:无。
- partitionSelectorExpression
-
用于自定义分区选择的SpEL表达式。 如果两者都未被设置,则选定该划分为
hashCode(key) % partitionCount哪里钥匙通过以下方式计算partitionKeyExpression.违约:
零. - partitionCount
-
如果启用分区,数据的目标分区数量。 如果生产者被分割,必须设置为大于1的值。 在卡夫卡中,这被解读为暗示。使用较大者和目标主题的分区计数。
违约:
1. - 必要组
-
一个逗号分隔的组列表,生产者必须确保消息在创建后才开始传递(例如,通过在RabbitMQ中预先创建持久队列)。
- 头部模式
-
当设置为
没有它在输出时禁用了头部嵌入。 它仅适用于那些不原生支持消息头且需要头嵌入的消息中间件。 当非 Spring Cloud Stream 应用不支持本地头部时,该选项非常有用。 当设置为头,它使用了中间件的原生头部机制。 当设置为嵌入的头部它将报头嵌入到消息有效载荷中。默认:取决于绑定器实现。
- useNative编码
-
当设置为
true,出境消息由客户端库直接序列化,客户端库必须相应配置(例如设置合适的Kafka产出值序列化器)。 当使用这种配置时,出站消息编组并非基于内容类型绑定的。 当使用原生编码时,消费者有责任使用合适的解码器(例如Kafka的消费者值反串行器)来反序列化入站消息。 此外,当使用原生编码和解码时,headerMode=embeddedHeaders属性被忽略,且报头不嵌入消息中。 参见消费物业useNativeDecoding.违约:
false. - errorChannelEnabled
-
当设置为true时,如果绑定器支持异步发送结果,发送失败会被发送到目的地的错误通道。更多信息请参见错误处理。
默认:false。
高级生产者配置
在某些情况下,生产者属性不足以在活页夹中正确配置生成的消息处理程序,或者你可能更倾向于程序化方法
同时配置这样一个生成MessageHandler的过程。无论原因如何,Spring-云-溪流都能带来ProducerMessageHandlerCustomizer去完成它。
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
如你所见,它让你能进入实际的制作实例消息处理器你可以根据需要配置。
你只需要实现这个策略,并配置为@Bean.