绑定属性是使用 的格式提供的。 表示正在配置的绑定的名称。spring.cloud.stream.bindings.<bindingName>.<property>=<value><bindingName>

例如,对于以下函数

@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}

有两个绑定分别用于输入和输出。有关详细信息,请参阅 [绑定和绑定名称]。uppercase-in-0uppercase-out-0

为了避免重复,Spring Cloud Stream 支持以常见绑定属性的格式设置所有绑定的值。spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value>

在避免扩展绑定属性的重复时,应使用 - 格式。spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>

为了避免重复,Spring Cloud Stream 支持以常见绑定属性的格式设置所有绑定的值。spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value>

常见绑定属性

这些属性通过以下方式公开org.springframework.cloud.stream.config.BindingProperties

以下绑定属性可用于输入和输出绑定,并且必须以 (例如,) 为前缀。spring.cloud.stream.bindings.<bindingName>.spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock

可以使用前缀设置默认值(例如 )。spring.cloud.stream.defaultspring.cloud.stream.default.contentType=application/json

目的地

绑定中间件上绑定的目标目标(例如,RabbitMQ 交换或 Kafka 主题)。 如果 binding 表示使用者绑定(输入),则它可以绑定到多个目标,并且可以将目标名称指定为逗号分隔值。 否则,将改用实际的绑定名称。 无法重写此属性的默认值。String

绑定的使用者组。 仅适用于入站绑定。 请参阅使用者组

默认值:(表示匿名使用者)。null

内容类型

此绑定的内容类型。 看。Content Type Negotiation

违约:。application/json

粘结 剂

此绑定使用的活页夹。 有关详细信息,请参阅。Multiple Binders on the Classpath

默认值:(如果存在,则使用默认活页夹)。null

消费者属性

这些属性通过以下方式公开org.springframework.cloud.stream.binder.ConsumerProperties

以下绑定属性仅适用于输入绑定,并且必须以 (例如,) 为前缀。spring.cloud.stream.bindings.<bindingName>.consumer.spring.cloud.stream.bindings.input.consumer.concurrency=3

可以使用前缀设置默认值(例如,)。spring.cloud.stream.default.consumerspring.cloud.stream.default.consumer.headerMode=none

自动启动

指示此使用者是否需要自动启动

违约:。true

并发

入站使用者的并发性。

违约:。1

分区

使用者是否从分区生产者接收数据。

违约:。false

标题模式

设置为 时,禁用输入上的标头解析。 仅对本机不支持消息头且需要嵌入消息头的消息中间件有效。 当不支持本机标头时,使用来自非 Spring Cloud Stream 应用程序的数据时,此选项非常有用。 设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。noneheadersembeddedHeaders

默认值:取决于 binder 实现。

最大尝试次数

如果处理失败,则为尝试处理消息的次数(包括第一次)。 设置为禁用重试。1

违约:。3

backOffInitialInterval

重试时的回退初始间隔。

违约:。1000

backOffMaxInterval

最大回退间隔。

违约:。10000

backOff乘法器

退避乘数。

违约:。2.0

default可重试

侦听器引发的未在 中列出的异常是否可重试。retryableExceptions

违约:。true

实例计数

当设置为大于等于零的值时,它允许自定义此使用者的实例计数(如果与 不同)。 设置为负值时,默认为 。 有关详细信息,请参阅。spring.cloud.stream.instanceCountspring.cloud.stream.instanceCountInstance Index and Instance Count

违约:。-1

实例索引

当设置为大于等于零的值时,它允许自定义此使用者的实例索引(如果与 不同)。 设置为负值时,默认为 。 如果提供,则忽略。 有关详细信息,请参阅。spring.cloud.stream.instanceIndexspring.cloud.stream.instanceIndexinstanceIndexListInstance Index and Instance Count

违约:。-1

instanceIndexList

与不支持本机分区的绑定器(如 RabbitMQ)一起使用;允许应用程序实例从多个分区使用。

默认值:空。

retryableExceptions

键中 Throwable 类名的映射和值中的布尔值的映射。 指定将要或不会重试的异常(和子类)。 另请参见。 例:。defaultRetriablespring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认值:空。

使用NativeDecoding

设置为 时,入站消息由客户端库直接反序列化,必须相应地配置该库(例如,设置适当的 Kafka 生产者值反序列化程序)。 使用此配置时,入站消息取消编组不基于绑定。 使用本机解码时,生产者负责使用适当的编码器(例如,Kafka 生产者值序列化程序)来序列化出站消息。 此外,当使用本机编码和解码时,将忽略该属性,并且不会在消息中嵌入标头。 请参见 producer 属性 。truecontentTypeheaderMode=embeddedHeadersuseNativeEncoding

违约:。false

多重

设置为 true 时,基础绑定器将在同一输入绑定上本机多路复用目标。

违约:。false

高级消费者配置

要为消息驱动的使用者配置底层消息侦听器容器的高级配置,请将单个 Bean 添加到应用程序上下文中。 它将在应用上述属性后调用,并可用于设置其他属性。 同样,对于轮询的消费者,添加一个 bean。ListenerContainerCustomizerMessageSourceCustomizer

以下是 RabbitMQ 活页夹的示例:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
    return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}

@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
    return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}

生产者属性

这些属性通过以下方式公开org.springframework.cloud.stream.binder.ProducerProperties

以下绑定属性仅适用于输出绑定,并且必须以 (例如,) 为前缀。spring.cloud.stream.bindings.<bindingName>.producer.spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id

可以使用前缀设置默认值(例如,)。spring.cloud.stream.default.producerspring.cloud.stream.default.producer.partitionKeyExpression=headers.id

自动启动

指示此使用者是否需要自动启动

违约:。true

partitionKey表达式

一个 SpEL 表达式,用于确定如何对出站数据进行分区。 如果设置,则对此绑定上的出站数据进行分区。 必须设置为大于 1 的值才能生效。 看。partitionCountPartitioning

默认值:null。

partitionKeyExtractorName

实现 的 Bean 的名称。用于提取用于计算的密钥 分区 ID(请参阅“partitionSelector*”)。与“partitionKeyExpression”互斥。PartitionKeyExtractorStrategy

默认值:null。

partitionSelectorName

实现 的 Bean 的名称。用于确定基于分区 ID 在分区键上(请参阅“partitionKeyExtractor*”)。与“partitionSelectorExpression”互斥。PartitionSelectorStrategy

默认值:null。

partitionSelectorExpression

用于自定义分区选择的 SpEL 表达式。 如果两者都未设置,则选择分区作为 ,其中通过任一 .hashCode(key) % partitionCountkeypartitionKeyExpression

违约:。null

分区计数

数据的目标分区数(如果启用了分区)。 如果对生产者进行了分区,则必须将其设置为大于 1 的值。 在卡夫卡上,它被解释为一种暗示。改用目标主题的分区计数和分区计数中的较大者。

违约:。1

必需组

一个以逗号分隔的组列表,生产者必须确保消息传递到这些组,即使它们在创建消息后启动(例如,通过在 RabbitMQ 中预先创建持久队列)。

标题模式

设置为 时,它将禁用输出上的标头嵌入。 它仅对本机不支持消息头且需要嵌入消息头的消息中间件有效。 当不支持本机标头时,为非 Spring Cloud Stream 应用程序生成数据时,此选项非常有用。 设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。noneheadersembeddedHeaders

默认值:取决于绑定器实现。

使用NativeEncoding

当设置为 时,出站消息由客户端库直接序列化,必须相应地配置该库(例如,设置适当的 Kafka 生产者值序列化程序)。 使用此配置时,出站消息封送处理不基于绑定。 使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka 使用者值反序列化器)来反序列化入站消息。 此外,当使用本机编码和解码时,将忽略该属性,并且不会在消息中嵌入标头。 请参见 consumer 属性 。truecontentTypeheaderMode=embeddedHeadersuseNativeDecoding

违约:。false

errorChannelEnabled

设置为 true 时,如果绑定程序支持异步发送结果,则发送失败将发送到目标的错误通道。有关详细信息,请参阅错误处理。

默认值:false。

高级生产者配置

在某些情况下,生产者属性不足以在活页夹中正确配置生成 MessageHandler,或者您可能更喜欢编程方法 同时配置此类生成 MessageHandler。无论出于何种原因,spring-cloud-stream 都可以完成它。ProducerMessageHandlerCustomizer

@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {

	/**
	 * Configure a {@link MessageHandler} that is being created by the binder for the
	 * provided destination name.
	 * @param handler the {@link MessageHandler} from the binder.
	 * @param destinationName the bound destination name.
	 */
	void configure(H handler, String destinationName);

}

如您所见,它使您可以访问生产的实际实例,您可以根据需要对其进行配置。 您需要做的就是提供此策略的实现,并将其配置为 .MessageHandler@Bean