参考指南

本指南介绍了RabbitMQ对Spring Cloud StreamBinder的实现。它包含有关其设计、使用和配置选项的信息,以及有关StreamCloudStream概念如何映射到RabbitMQ特定构造的信息。spring-doc.cadn.net.cn

1. 用法

要使用RabbitMQ绑定器,您可以将其添加到您的Spring Cloud流应用程序中,方法是使用以下Maven坐标:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

相反,您可以使用Spring Cloud Stream RabbitMQStarters,如下所示:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. RabbitMQ Binder 概述

以下简化示意图展示了RabbitMQ binder的运行方式:spring-doc.cadn.net.cn

rabbit binder
图1. RabbitMQ 绑定器

默认情况下,RabbitMQ 绑定实现将每个目标映射到TopicExchange。对于每个消费者组,Queue绑定到该TopicExchange。每个消费者实例都有一个对应的RabbitMQConsumer实例,用于其组的Queue。对于分区生产者和使用者,队列使用分区索引后缀,并将分区索引作为路由密钥。对于匿名使用者(没有group属性),将使用自动删除队列(带随机唯一名称)。spring-doc.cadn.net.cn

通过可选的autoBindDlq选项,您可以配置绑定器创建并配置死信队列(DLQs)(以及一个死信交换DLX,以及路由基础设施)。默认情况下,死信队列的名称为目的地名称加上 .dlq。如果启用了重试(maxAttempts > 1),则在重试用尽后,会将失败的消息传递到死信队列。如果重试被禁用(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便失败的消息被路由到DLQ,而不是重新排队。
另外,republishToDlq 会导致绑定程序将失败的消息发布到 DLQ(而不是拒绝它)。This feature lets additional information (such as the stack trace in the x-exception-stacktrace header) be added to the message in headers.查看frameMaxHeadroom属性,了解有关截断堆栈跟踪的信息。这个选项不需要启用重试。您可以仅在一次尝试后重新发布失败的消息。spring-doc.cadn.net.cn

Starting with version 1.spring-doc.cadn.net.cn

2,您可以配置重新发布消息的交付模式。参见republishDeliveryMode属性

spring-doc.cadn.net.cn

如果流侦听器抛出一个ImmediateAcknowledgeAmqpException,则会跳过DLQ,并且消息将被简单地丢弃。从版本2.1开始,无论republishToDlq如何设置,这都是正确的;在以前,只有当republishToDlqfalse时才成立。spring-doc.cadn.net.cn

requeueRejected 设置为 true(使用 republishToDlq=false )会导致消息被重新排队并不断重新传递,除非故障是瞬态的,否则这可能不是您想要的操作。 通常,您应通过将 maxAttempts 设置为大于一或将 republishToDlq 设置为 true 来在绑定器中启用重试。

有关这些属性的更多信息,请参阅RabbitMQ绑定器属性spring-doc.cadn.net.cn

该框架不提供任何标准机制来处理死信消息(或将其重新路由回主队列)。死信队列处理中描述了一些选项。spring-doc.cadn.net.cn

当在Spring Cloud Stream应用程序中使用多个RabbitMQ绑定器时,禁用'RabbitAutoConfiguration'很重要,以避免相同的配置从RabbitAutoConfiguration应用于两个绑定器。
您可以使用@SpringBootApplication注解来排除该类。

从版本2.0开始,RabbitMessageChannelBinderRabbitTemplate.userPublisherConnection 属性设置为 true,以便非事务性生产者避免死锁,这可能会导致由于代理上的内存警报而导致缓存连接被阻塞。spring-doc.cadn.net.cn

当前,仅支持一个消费者(一个消费者监听多个队列)用于消息驱动式消费者;轮询式消费者只能从一个队列中获取消息。

3. 配置选项

本节包含与 RabbitMQ 绑定程序和绑定通道相关的设置。spring-doc.cadn.net.cn

有关通用绑定配置选项和属性的信息,请参阅Spring Cloud Stream核心文档spring-doc.cadn.net.cn

3.1. RabbitMQ Binder 属性

默认情况下,RabbitMQ 绑定器使用 Spring Boot 的 ConnectionFactory。因此,它支持 RabbitMQ 的所有 Spring Boot 配置选项。(有关参考,请参阅 Spring Boot 文档)。RabbitMQ 配置选项使用 spring.rabbitmq 前缀。spring-doc.cadn.net.cn

除了Spring Boot选项之外,RabbitMQ绑定器还支持以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.rabbit.binder.adminAddresses

一个逗号分隔的 RabbitMQ 管理插件 URL 列表。仅在 nodes 包含多个条目时使用。spring.rabbitmq.addresses 中必须有对应的条目。如果您使用 RabbitMQ 集群并希望从托管队列的节点消费,则需要此列表。有关更多信息,请参阅队列亲和性和 LocalizedQueueConnectionFactoryspring-doc.cadn.net.cn

默认:空。spring-doc.cadn.net.cn

spring.cloud.stream.rabbit.binder.nodes

用逗号分隔的 RabbitMQ 节点名称列表。
当有多个条目时,用于定位队列所在的服务器地址。
此列表中的每个条目都必须在spring.rabbitmq.addresses中有相应的条目。
只有在使用 RabbitMQ 集群并且希望从托管队列的节点消费时才需要。
有关更多信息,请参阅队列关联性和本地化队列连接工厂spring-doc.cadn.net.cn

默认:空。spring-doc.cadn.net.cn

spring.cloud.stream.rabbit.binder.compressionLevel

压缩绑定的压缩级别。
参见 java.util.zip.Deflaterspring-doc.cadn.net.cn

默认:1(BEST_LEVEL)。spring-doc.cadn.net.cn

spring.cloud.stream.binder.connection-name-prefix

此绑定器创建的连接使用的连接名称前缀。 每次打开新的连接时,#n会递增。
名称为此前缀后跟 #n,其中 n 每次打开新连接时都会递增。spring-doc.cadn.net.cn

默认值:无(Spring AMQP 默认值)。spring-doc.cadn.net.cn

3.2. RabbitMQ 消费者属性

以下属性仅适用于 Rabbit 消费者,并且必须以 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. 为前缀。spring-doc.cadn.net.cn

但是,如果需要将同一组属性应用于大多数绑定,为了防止重复,Spring Cloud Stream 支持在格式为 spring.cloud.stream.rabbit.default.<property>=<value> 的所有通道上设置值。spring-doc.cadn.net.cn

也请记住,绑定特定属性会将其默认值覆盖。spring-doc.cadn.net.cn

确认模式

确认模式。spring-doc.cadn.net.cn

默认值: AUTOspring-doc.cadn.net.cn

匿名组前缀

当绑定没有 group 属性时,会将一个匿名、自动删除的队列绑定到目标交换机。
此类队列的默认命名策略会导致队列被命名为 anonymous.<base64 representation of a UUID>
设置此属性可更改为除默认值以外的前缀。spring-doc.cadn.net.cn

默认值: anonymous.spring-doc.cadn.net.cn

自动绑定死信队列

是否自动声明死信队列(DLQ)并将其绑定到绑定器的死信交换(DLX)。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

绑定路由键

用于将队列绑定到交换机的路由键(如果bindQueuetrue)。可以有多个键——参见bindingRoutingKeyDelimiter。对于分区目的地,将在每个键后追加-<instanceIndex>spring-doc.cadn.net.cn

默认值: #spring-doc.cadn.net.cn

绑定路由键分隔符

当此值不为 null 时,将把 'bindingRoutingKey' 视为由该值分隔的键列表;通常使用逗号作为分隔符。spring-doc.cadn.net.cn

默认值: nullspring-doc.cadn.net.cn

绑定队列

是否声明队列并将其绑定到目标交换机。
如果已设置自己的基础设施并且先前创建并绑定了队列,请将其设置为falsespring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

消费者标记前缀

用于创建消费者标签;每次创建一个消费者时,将会附加#n
示例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}spring-doc.cadn.net.cn

默认值:无 - 代理将生成随机的消费者标签。spring-doc.cadn.net.cn

容器类型

选择要使用的监听器容器类型。 见 在 Spring AMQP 文档中的“选择容器” 以获取更多信息。spring-doc.cadn.net.cn

默认值: simplespring-doc.cadn.net.cn

死信队列名称

死信队列的名称spring-doc.cadn.net.cn

默认值: prefix+destination.dlqspring-doc.cadn.net.cn

死信交换

一个DLX分配到队列。 仅当autoBindDlqtrue时相关。spring-doc.cadn.net.cn

默认值:'前缀+死信交换'spring-doc.cadn.net.cn

死信交换机类型

要分配给队列的DLX的类型。
仅当autoBindDlqtrue时相关。spring-doc.cadn.net.cn

默认值:'direct'spring-doc.cadn.net.cn

死信路由键

分配给队列的死信路由密钥。仅在 autoBindDlqtrue 时相关。spring-doc.cadn.net.cn

默认值: destinationspring-doc.cadn.net.cn

声明死信交换器

是否为目的地声明死信交换? 相关仅当autoBindDlqtrue时。 若已预配置DLX,请设为falsespring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

声明交换机

是否为目的地声明交换机。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

延迟交换

是否将交换声明为Delayed Message Exchange
需要在代理上安装延迟消息交换插件。
x-delayed-type参数设置为exchangeTypespring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

延迟队列绑定参数

绑定死信交换时应用于死信队列(dlq)的参数;与 headers deadLetterExchangeType 结合使用,用于指定要匹配的头部信息。例如 …​dlqBindingArguments.x-match=any, …​dlqBindingArguments.someHeader=someValuespring-doc.cadn.net.cn

死信队列死信交换机

如果声明了一个死信队列(DLQ),则需指定一个死信交换器(DLX)来分配给该队列。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

死信队列死信路由键

如果声明了死信队列,则需要指定一个分配给该队列的死信路由键。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

延迟队列到期

未使用的死信队列在被删除前的等待时间(以毫秒为单位)。spring-doc.cadn.net.cn

默认值: no expirationspring-doc.cadn.net.cn

懒加载

使用x-queue-mode=lazy参数声明死信队列。参见“延迟队列”。建议改用策略而不是此设置,因为策略允许在不删除队列的情况下更改该设置。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

最大dlq长度

死信队列中的最大消息数。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最大字节数

死信队列中所有消息的最大总字节数。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最大优先级队列

死信队列中消息的最大优先级(0-255)。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

队列溢出时的行为

当超过dlqMaxLengthdlqMaxLengthBytes时要采取的操作;目前为drop-headreject-publish,但请参考RabbitMQ文档。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

dlqQuorum.deliveryLimit

quorum.enabled=true 时,设置一个投递限制,超过该限制后消息将被丢弃或转为死信。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

dlqQuorum.enabled

当为 true 时,创建一个法定人数死信队列而不是经典队列。spring-doc.cadn.net.cn

(默认值:false)spring-doc.cadn.net.cn

dlqQuorum.initialQuorumSize

quorum.enabled=true 时,设置初始法定人数大小。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

单个活动消费者

设置为 true 可将 x-single-active-consumer 队列属性设为 true。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

dlqTtl

声明死信队列时应用的默认生存时间(以毫秒为单位)。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

持久化订阅

订阅是否应该持久化。只有在同时设置group时才有效。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

交换自动删除

如果 declareExchange 为真,则表示交换机是否应该自动删除(即在最后一个队列被删除后将其移除)。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

交换机持久化

如果 declareExchange 为真,则表示交换机是否应具有持久性(即,在代理重启后仍然存在)。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

交易类型

交换类型:directfanoutheaderstopic 表示非分区目的地,而 direct、标题或 topic 表示分区目的地。spring-doc.cadn.net.cn

默认值: topicspring-doc.cadn.net.cn

独家

是否创建独占消费者。 当此值为 true 时,并发应设置为 1。 通常在需要严格顺序但启用热备用实例以在发生故障后接管时使用。 参见 recoveryInterval,它控制备用实例尝试消费的频率。 当使用 RabbitMQ 3.8 或更高版本时,建议改用 singleActiveConsumerspring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

到期

多久以后未使用的队列会被删除(以毫秒为单位)。spring-doc.cadn.net.cn

默认值: no expirationspring-doc.cadn.net.cn

声明失败重试间隔

如果队列缺失,则尝试从队列中消耗之间的间隔(以毫秒为单位)。spring-doc.cadn.net.cn

默认值:5000spring-doc.cadn.net.cn

框架最大回程

将堆栈跟踪添加到死信队列(DLQ)消息头时,为其他标头保留的字节数。所有标头必须符合代理上配置的frame_max大小。
堆栈跟踪可能很大;如果此属性加上堆栈跟踪的大小超过frame_max,则堆栈跟踪将被截断。
将会记录一个警告日志;考虑增加frame_max或通过捕获异常并抛出自定义较小堆栈跟踪的异常来减少堆栈跟踪。spring-doc.cadn.net.cn

默认值:20000spring-doc.cadn.net.cn

headerPatterns

从入站消息映射标头的模式。spring-doc.cadn.net.cn

默认:['*'](所有头部)。spring-doc.cadn.net.cn

懒加载

使用 x-queue-mode=lazy 参数声明队列。
参见 “延迟队列”
建议改用策略,而不是此设置,因为策略允许在不删除队列的情况下更改设置。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

最大并发数

消费者的最大数量。
containerTypedirect时不支持。spring-doc.cadn.net.cn

默认值: 1spring-doc.cadn.net.cn

最大长度

队列中消息的最大数量。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最大长度字节

队列中所有消息的最大总字节数。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最高严重程度

队列中消息的最大优先级(0-255)。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

缺少队列致命

当找不到队列时,是否将此情况视为致命错误并停止监听器容器。默认值为false,因此容器会不断尝试从队列中消费消息——例如,在使用集群时,如果托管非高可用性队列的节点宕机。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

溢出行为

当超过maxLengthmaxLengthBytes时要采取的操作;目前为drop-headreject-publish,但请参考RabbitMQ文档。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

预取

预取计数。spring-doc.cadn.net.cn

默认值: 1spring-doc.cadn.net.cn

前缀

要添加到destination和队列名称前缀。spring-doc.cadn.net.cn

默认值:""。spring-doc.cadn.net.cn

队列绑定参数

绑定队列到交换时应用的参数;用于指定要匹配的标头,例如…​queueBindingArguments.x-match=any…​queueBindingArguments.someHeader=someValue
使用headersexchangeType来指定要匹配的标头。spring-doc.cadn.net.cn

队列声明重试次数

如果队列缺失,从队列中重新消费的重试次数。
仅当missingQueuesFataltrue时有效。
否则,容器将无限期地重试。
不支持当containerTypedirect的情况。spring-doc.cadn.net.cn

默认值: 3spring-doc.cadn.net.cn

仅队列名组

当为 true 时,从队列名称等于group的队列中消费。否则队列名称为destination.group。例如,在使用 Spring Cloud Stream 来从现有的 RabbitMQ 队列进行消费时,这很有用。spring-doc.cadn.net.cn

默认值:false。spring-doc.cadn.net.cn

quorum.deliveryLimit

quorum.enabled=true 时,设置一个投递限制,超过该限制后消息将被丢弃或转为死信。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

quorum.enabled

为真时,创建一个法定人数队列而不是经典队列。spring-doc.cadn.net.cn

(默认值:false)spring-doc.cadn.net.cn

quorum.initialQuorumSize

quorum.enabled=true 时,设置初始法定人数大小。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

恢复间隔

连接恢复尝试之间的间隔,单位为毫秒。spring-doc.cadn.net.cn

默认值: 5000spring-doc.cadn.net.cn

重新排队拒绝的

当重试被禁用或republishToDlq时,是否应重新排队交付失败的消息。false表示是。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

重新发布配送模式

republishToDlqtrue 时,指定重新发布的消息的传递模式。spring-doc.cadn.net.cn

默认值: DeliveryMode.PERSISTENTspring-doc.cadn.net.cn

转发到死信队列

默认情况下,重试次数用尽后仍失败的消息会被拒绝。如果配置了死信队列(DLQ),RabbitMQ会将失败的消息(未更改)路由到DLQ。如果设置为true,绑定器会将失败的消息重新发布到DLQ,并添加额外的标题,包括最终失败原因中的异常消息和堆栈跟踪。另请参阅frameMaxHeadroom属性spring-doc.cadn.net.cn

(默认值:false)spring-doc.cadn.net.cn

singleActiveConsumer

设置为 true 可将 x-single-active-consumer 队列属性设为 true。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

事务性

是否使用事务通道。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

TTL

声明队列时要应用的默认存活时间(以毫秒为单位)。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

字体大小

在确认之间交付的数量。
containerTypedirect时不支持。spring-doc.cadn.net.cn

默认值: 1spring-doc.cadn.net.cn

3.3. 高级监听器容器配置

要设置未通过绑定程序或绑定属性公开的侦听器容器属性,请向应用程序上下文中添加一个 0 类型的 bean。 将设置绑定程序和绑定属性,然后调用自定义程序。 自定义程序(1 方法)将提供队列名称以及消费者组作为参数。spring-doc.cadn.net.cn

3.4. 高级队列/交换器/绑定配置

每隔一段时间,RabbitMQ团队都会添加一些新功能,这些功能需要在声明时设置某些参数才能启用,例如,一个队列。通常,通过添加适当的属性来启用此类功能,但在当前版本中可能不会立即可用。从版本 3.0.1 开始,您可以现在向应用程序上下文添加 DeclarableCustomizer 个 bean(s),以便在声明之前进行修改。这允许您添加当前未直接由绑定器支持的参数。spring-doc.cadn.net.cn

3.5. 接收批量消息

通常,如果生产者绑定具有batch-enabled=true(参见Rabbit Producer Properties),或者消息是由BatchingRabbitTemplate创建的,则批次中的元素将作为对监听器方法的单独调用返回。
从版本3.0开始,如果将spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,则可以将此类任何批次呈现为对监听器方法的List<?>spring-doc.cadn.net.cn

3.6. Rabbit 生产者属性

Rabbit 生成器特有的以下属性可用,并且必须以“0”开头。spring-doc.cadn.net.cn

但是,如果需要将同一组属性应用于大多数绑定,为了防止重复,Spring Cloud Stream 支持在格式为 spring.cloud.stream.rabbit.default.<property>=<value> 的所有通道上设置值。spring-doc.cadn.net.cn

也请记住,绑定特定属性会将其默认值覆盖。spring-doc.cadn.net.cn

自动绑定死信队列

是否自动声明死信队列(DLQ)并将其绑定到绑定器的死信交换(DLX)。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

批量处理是否启用

是否启用生产者的消息批处理。消息会根据以下属性(在本列表中的接下来三个条目中描述)进行批处理:batchSizebatchBufferLimitbatchTimeout批量处理以获取更多信息。接收已批处理的消息spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

批处理大小

启用批处理时,缓冲的消息数量。spring-doc.cadn.net.cn

默认值: 100spring-doc.cadn.net.cn

批次缓冲限制

批量处理启用时的最大缓冲区大小。spring-doc.cadn.net.cn

默认值: 10000spring-doc.cadn.net.cn

批量超时

启用批处理时的批处理超时时间。spring-doc.cadn.net.cn

默认值: 5000spring-doc.cadn.net.cn

绑定路由键

绑定队列到交换机时使用的路由键(如果bindQueuetrue)。 可以有多个键 - 参见bindingRoutingKeyDelimiter。 对于分区目的地,每个键都会追加上-n。 仅在提供requiredGroups时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值: #spring-doc.cadn.net.cn

绑定路由键分隔符

当此值不为 null 时,'bindingRoutingKey' 被认为是由该值分隔的键列表;通常使用逗号作为分隔符。 仅在提供 requiredGroups 个值时适用,并且只适用于这些组。spring-doc.cadn.net.cn

默认值: nullspring-doc.cadn.net.cn

绑定队列

是否声明队列并将其绑定到目标交换机。如果已设置好自己的基础设施并且先前已创建并绑定了队列,则将其设置为false。仅在提供requiredGroups时适用,然后仅适用于这些组。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

压缩

数据发送时是否应进行压缩。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

确认消息通道

errorChannelEnabled 为 true 时,发送正向交付确认的通道(即发布者确认)。 如果通道不存在,将使用此名称注册一个 DirectChannel。 连接工厂必须配置以启用发布者确认。spring-doc.cadn.net.cn

默认:nullChannel(确认被丢弃)。spring-doc.cadn.net.cn

死信队列名称

死信队列的名称仅在提供requiredGroups时适用,且仅适用于这些组。spring-doc.cadn.net.cn

默认值: prefix+destination.dlqspring-doc.cadn.net.cn

死信交换

将DLX分配给队列。
仅在autoBindDlqtrue时相关。
仅当提供requiredGroups时适用,且仅对这些组有效。spring-doc.cadn.net.cn

默认值:'前缀+死信交换'spring-doc.cadn.net.cn

死信交换机类型

分配给队列的DLX类型。仅当autoBindDlqtrue时相关。
只有在提供requiredGroups时才适用,然后仅适用于这些组。spring-doc.cadn.net.cn

默认值:'direct'spring-doc.cadn.net.cn

死信路由键

分配给队列的死信路由密钥。 仅在 autoBindDlqtrue 时相关。 只有当提供requiredGroups时才适用,然后只适用于这些组。spring-doc.cadn.net.cn

默认值: destinationspring-doc.cadn.net.cn

声明死信交换器

是否为目的地声明死信交换。 仅在autoBindDlqtrue时相关。 如果已配置DLX,请设置为false。 仅当提供了requiredGroups时适用,且仅适用于这些组。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

声明交换机

是否为目的地声明交换机。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

延迟表达式

一个SpEL表达式,用于评估要应用到消息的延迟时间(x-delay标题)。如果交换不是延迟消息交换,则此操作无效。spring-doc.cadn.net.cn

默认情况下:未设置x-delay标题。spring-doc.cadn.net.cn

延迟交换

是否将交换声明为Delayed Message Exchange
需要在代理上安装延迟消息交换插件。
x-delayed-type参数设置为exchangeTypespring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

配送方式

交付模式。spring-doc.cadn.net.cn

默认值: PERSISTENTspring-doc.cadn.net.cn

延迟队列绑定参数

绑定死信队列(DLQ)到死信交换时应用的参数;与headers deadLetterExchangeType一起使用,用于指定要匹配的头部信息。
例如…​dlqBindingArguments.x-match=any, …​dlqBindingArguments.someHeader=someValue
仅在提供requiredGroups时适用,并且只对这些组有效。spring-doc.cadn.net.cn

死信队列死信交换机

当声明死信队列(DLQ)时,会分配一个死信交换器(DLX)给该队列。
仅在提供了requiredGroups的情况下适用,并且仅适用于这些组。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

死信队列死信路由键

声明死信队列 (DLQ) 后,将一个死信路由键分配给该队列。 仅在提供 requiredGroups 时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

延迟队列到期

闲置的死信队列在被删除之前保留的时间(以毫秒为单位)。仅当提供requiredGroups时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值: no expirationspring-doc.cadn.net.cn

懒加载

声明死信队列时使用 x-queue-mode=lazy 参数。 参见 “惰性队列”。 建议使用策略而不是此设置,因为使用策略可以在不删除队列的情况下更改设置。 仅在提供 requiredGroups 个参数时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

最大dlq长度

死信队列中的最大消息数。 仅在提供requiredGroups时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最大字节数

死信队列中所有消息的最大总字节数。仅在提供requiredGroups时适用,且仅对这些组有效。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最大优先级队列

死信队列中消息的最大优先级(0-255)
仅在提供 requiredGroups 时适用,且仅对这些组有效。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

dlqQuorum.deliveryLimit

quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转入死信队列。仅在提供requiredGroups时生效,并且仅适用于这些组。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

dlqQuorum.enabled

当为 true 时,创建一个法定人数死信队列而不是经典队列。
仅在提供 requiredGroups 时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

(默认值:false)spring-doc.cadn.net.cn

dlqQuorum.initialQuorumSize

quorum.enabled=true时,设置初始多数大小。
仅在提供requiredGroups时适用,并且仅适用于这些组。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

单个活动消费者

设置为 true 将x-single-active-consumer队列属性设为true。仅当提供requiredGroups时适用,且仅适用于这些组。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

dlqTtl

声明时应用于死信队列的默认存活时间(以毫秒为单位)。仅在提供requiredGroups时适用,且仅对这些组生效。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

交换自动删除

如果 declareExchangetrue,则交换是否应自动删除(在最后一个队列被移除后)。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

交换机持久化

如果 declareExchangetrue,则交换是否应该具有持久性(在代理重启后仍然存在)。spring-doc.cadn.net.cn

默认值: truespring-doc.cadn.net.cn

交易类型

交换类型:directfanoutheaderstopic用于非分区目的地,而directheaderstopic用于分区目的地。spring-doc.cadn.net.cn

默认值: topicspring-doc.cadn.net.cn

到期

闲置队列在被删除前等待的时间(以毫秒为单位)。仅当提供requiredGroups时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值: no expirationspring-doc.cadn.net.cn

headerPatterns

用于映射到传出消息的标题模式。spring-doc.cadn.net.cn

默认:['*'](所有头部)。spring-doc.cadn.net.cn

懒加载

使用x-queue-mode=lazy参数声明队列。查看“延迟队列”。考虑使用策略而不是此设置,因为使用策略可以在不删除队列的情况下更改设置。仅当提供了requiredGroups时才适用,然后只适用于这些组。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

最大长度

队列中的最大消息数。仅在提供 requiredGroups 时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最大长度字节

队列中所有消息的最大总字节数。仅在提供requiredGroups时适用,并且仅适用于这些组。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

最高严重程度

队列中消息的最大优先级(0-255)。仅当提供requiredGroups时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值: nonespring-doc.cadn.net.cn

前缀

要添加到destination交换机名称前的前缀。spring-doc.cadn.net.cn

默认值:""。spring-doc.cadn.net.cn

队列绑定参数

绑定队列到交换时应用的参数;用于headersexchangeType指定要匹配的标头。
例如…​queueBindingArguments.x-match=any…​queueBindingArguments.someHeader=someValue
仅在提供requiredGroups时适用,且仅适用于这些组。spring-doc.cadn.net.cn

仅队列名组

当为true时,从队列名称与group相等的队列中消费。否则队列名称为destination.group。例如,在使用Spring Cloud Stream消费现有的RabbitMQ队列时,这非常有用。仅在提供requiredGroups时适用,并且仅对这些组有效。spring-doc.cadn.net.cn

默认值:false。spring-doc.cadn.net.cn

quorum.deliveryLimit

quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转入死信队列。仅在提供requiredGroups时生效,并且仅适用于这些组。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

quorum.enabled

当为 true 时,创建一个法定人数队列而不是经典队列。
仅在提供requiredGroups时适用,然后仅适用于这些组。spring-doc.cadn.net.cn

(默认值:false)spring-doc.cadn.net.cn

quorum.initialQuorumSize

quorum.enabled=true时,设置初始多数大小。
仅在提供requiredGroups时适用,并且仅适用于这些组。spring-doc.cadn.net.cn

默认:无 - 将应用代理程序默认设置。spring-doc.cadn.net.cn

路由键表达式

一个用于确定在发布消息时使用的路由键的SpEL表达式。 对于固定的路由键,使用字面表达式,例如在属性文件中使用routingKeyExpression='my.routingKey',在YAML文件中使用routingKeyExpression: '''my.routingKey'''spring-doc.cadn.net.cn

默认值:destinationdestination-<partition>(针对分区目的地)。spring-doc.cadn.net.cn

singleActiveConsumer

设置为 true 将x-single-active-consumer队列属性设为true。仅当提供requiredGroups时适用,且仅适用于这些组。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

事务性

是否使用事务通道。spring-doc.cadn.net.cn

默认值: falsespring-doc.cadn.net.cn

TTL

声明队列时要应用的默认存活时间(以毫秒为单位)。仅当提供requiredGroups时才会对这些组生效。spring-doc.cadn.net.cn

默认值: no limitspring-doc.cadn.net.cn

在RabbitMQ的情况下,内容类型标头可以通过外部应用程序进行设置。</p><p>Spring Cloud Stream将其作为用于任何类型的传输的扩展内部协议的一部分来支持——包括Kafka(0.11之前版本)等不原生支持标头的传输方式。

4. 使用现有的队列/交换机

默认情况下,绑定器会自动 provision 具有与绑定属性 <prefix><destination> 的值同名的目标 topic exchange。目标默认为绑定名,如果未提供。当绑定使用者时,将自动 provision 名为 <prefix><destination>.<group> 的队列(如果指定了 group 绑定属性),或者没有 group 时使用匿名、自动删除队列。队列将使用“匹配全部”通配符路由键(#)进行非分区绑定或 <destination>-<instanceIndex> 进行分区绑定进行绑定。前缀默认为空 String。如果指定了带有 requiredGroups 的输出绑定,则为每个组 provision 队列/绑定。spring-doc.cadn.net.cn

有几种与兔子相关的绑定属性,允许您修改此默认行为。spring-doc.cadn.net.cn

如果您有一个想要使用的现有exchange/queue,您可以完全禁用自动配置,假设exchange名为myExchange,queue名为myQueue: \ " }spring-doc.cadn.net.cn

如果要让绑定程序为队列/交换机提供服务,但又想使用不同于此处讨论的默认值,请使用以下属性。 有关更多信息,请参阅上述属性文档。spring-doc.cadn.net.cn

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKeyspring-doc.cadn.net.cn

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>spring-doc.cadn.net.cn

  • spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'spring-doc.cadn.net.cn

有相似的属性在声明死信交换/队列时使用,当autoBindDlqtruespring-doc.cadn.net.cn

5. 使用 RabbitMQ 绑定重试

当在绑定程序中启用重试时,监听器容器线程会在配置的任何退避期间被暂停。这在单个消费者需要严格顺序时可能很重要。但是,在其他使用情况下,它会阻止该线程上的其他消息被处理。替代使用绑定程序重试的方法是设置死信队列(DLQ)的时间存活以及DLQ本身的死信配置,并结合使用时间到活期功能。有关此处讨论属性的更多信息,请参阅“RabbitMQ 绑定程序属性”。您可以使用以下示例配置来启用此功能:spring-doc.cadn.net.cn

  • 设置autoBindDlqtrue。绑定器创建一个DLQ。可选地,您可以在deadLetterQueueName中指定名称。spring-doc.cadn.net.cn

  • 设置要等待重新传递之间的时间段,您想要的退避时间。您可以选择从 0 到 1792 秒之间的任何值。spring-doc.cadn.net.cn

  • 0 设置为默认交换机。从死信队列中过期的消息被路由回原队列,因为默认值1 是队列名称(2 )。通过不设置属性值来设置为默认交换机,如下一个示例所示。spring-doc.cadn.net.cn

要强制消息进入死信队列,要么抛出AmqpRejectAndDontRequeueException要么设置requeueRejectedtrue(默认)并抛出任何异常。spring-doc.cadn.net.cn

循环不间断,这对临时问题来说没问题,但你可能希望在一定次数尝试后放弃。幸运的是,RabbitMQ 提供了 0 标头,允许你确定已发生多少个周期。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

在放弃后确认消息,抛出一个 ImmediateAcknowledgeAmqpExceptionspring-doc.cadn.net.cn

spring-doc.cadn.net.cn

5.1 将所有内容整合在一起

以下配置创建了一个交换 myDestination,该交换绑定到一个主题交换,并使用通配符路由键 # 绑定到队列 myDestination.consumerGroupspring-doc.cadn.net.cn

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此配置创建了一个与直连交换机(DLX)绑定的死信队列,路由键为 myDestination.consumerGroup。当消息被拒绝时,它们会被路由到死信队列。5秒后,消息过期并使用队列名称作为路由键,将消息重新路由回原始队列,如下例所示:spring-doc.cadn.net.cn

Spring Boot 应用程序
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-death 头部中的 count 属性是 Longspring-doc.cadn.net.cn

6. 错误通道

从版本 1.3 开始,绑定器会无条件地将每个消费者目标的异常发送到错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。
有关更多信息,请参阅“[spring-cloud-stream-overview-error-handling]”。spring-doc.cadn.net.cn

RabbitMQ有两种类型的发送失败:spring-doc.cadn.net.cn

后者较为罕见。 根据RabbitMQ文档,"[A nack] 只有在负责队列的Erlang进程发生内部错误时才会被投递"。spring-doc.cadn.net.cn

除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)外,RabbitMQ绑定器仅在连接工厂正确配置时才会向通道发送消息,如下所示:spring-doc.cadn.net.cn

使用Spring Boot配置连接工厂时,设置以下属性:<br>spring-doc.cadn.net.cn

返回消息的ErrorMessage负载是一个带有以下属性的ReturnedAmqpMessageExceptionspring-doc.cadn.net.cn

对于负确认,负载是一个具有以下属性的NackedAmqpMessageExceptionspring-doc.cadn.net.cn

这些异常没有自动处理方式(例如发送到死信队列)。
您可以使用自己的Spring集成流程来消费这些异常。
spring-doc.cadn.net.cn

7. Dead-Letter 队列处理

由于无法预知用户希望如何处理死信消息,框架不提供任何标准机制来处理这些消息。
如果导致消息进入死信队列的原因是暂时性的,您可能希望将这些消息重新路由回原始队列。
但是,如果问题是永久性的,则可能会导致无限循环。
下面的Spring Boot应用程序显示了如何将这些消息重新路由回原始队列的例子,但在三次尝试后会将它们移动到第三个“停车场”队列。
第二个例子使用RabbitMQ延迟消息交换为重新排队的消息引入延迟。
在这个例子中,每次尝试时延迟都会增加。
这些示例使用@RabbitListener从DLQ接收消息。
在批处理过程中也可以使用RabbitTemplate.receive()spring-doc.cadn.net.cn

示例假设原始目标为 so8400in,消费组为 so8400spring-doc.cadn.net.cn

7.1. 非分区目的地

前两个示例适用于目标位置未分区的情况:spring-doc.cadn.net.cn

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

7.2. 分区目的地

使用分区目标时,所有分区共用一个死信队列。我们通过消息头来确定原始队列。spring-doc.cadn.net.cn

7.2.1. republishToDlq=false

republishToDlqfalse 时,RabbitMQ 使用包含有关原始目标信息的 x-death 标头将消息发布到死信交换/队列(DLX/DLQ),如下例所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

7.2.2. republishToDlq=true

republishToDlqtrue 时,重新发布恢复程序会将原始交换机和路由键添加到头信息中,如下例所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

8. 使用 RabbitMQ 绑定器进行分区

RabbitMQ 不原生支持分区。spring-doc.cadn.net.cn

有时,向特定分区发送数据是有利的——例如,当您希望严格按顺序处理消息时,某个客户的全部消息应发送到同一个分区。spring-doc.cadn.net.cn

RabbitMessageChannelBinder 通过为每个分区绑定一个队列到目标交换器来提供分区。spring-doc.cadn.net.cn

以下 Java 和 YAML 示例显示了如何配置生产者:spring-doc.cadn.net.cn

生产者
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面示例中的配置使用了默认分区(key.hashCode() % partitionCount)。根据键值,这可能提供一个适当平衡的算法,也可能不提供。
您可以通过使用 partitionSelectorExpressionpartitionSelectorClass 属性来覆盖此默认设置。spring-doc.cadn.net.cn

required-groups属性仅在需要在生产者部署时为消费者队列配置时才需要。 否则,发送到分区的任何消息在相应的消费者部署之前都会丢失。spring-doc.cadn.net.cn

以下配置设置了一个主题交换:spring-doc.cadn.net.cn

part exchange

以下队列结维到该 Exchange:spring-doc.cadn.net.cn

part queues

以下绑定将队列与交换机关联:spring-doc.cadn.net.cn

part bindings

下面的 Java 和 YAML 示例继续了前面的例子,展示了如何配置消费者:spring-doc.cadn.net.cn

消费者
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

0不支持动态缩放。spring-doc.cadn.net.cn

每个分区必须至少有一个消费者。spring-doc.cadn.net.cn

消费者的1用于指示哪个分区被消费。spring-doc.cadn.net.cn

如Cloud Foundry等平台只能有一个实例带有2spring-doc.cadn.net.cn