参考指南
本指南介绍了RabbitMQ对Spring Cloud StreamBinder的实现。它包含有关其设计、使用和配置选项的信息,以及有关StreamCloudStream概念如何映射到RabbitMQ特定构造的信息。
1. 用法
要使用RabbitMQ绑定器,您可以将其添加到您的Spring Cloud流应用程序中,方法是使用以下Maven坐标:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
相反,您可以使用Spring Cloud Stream RabbitMQStarters,如下所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2. RabbitMQ Binder 概述
以下简化示意图展示了RabbitMQ binder的运行方式:
默认情况下,RabbitMQ 绑定实现将每个目标映射到TopicExchange。对于每个消费者组,Queue绑定到该TopicExchange。每个消费者实例都有一个对应的RabbitMQConsumer实例,用于其组的Queue。对于分区生产者和使用者,队列使用分区索引后缀,并将分区索引作为路由密钥。对于匿名使用者(没有group属性),将使用自动删除队列(带随机唯一名称)。
通过可选的autoBindDlq选项,您可以配置绑定器创建并配置死信队列(DLQs)(以及一个死信交换DLX,以及路由基础设施)。默认情况下,死信队列的名称为目的地名称加上 .dlq。如果启用了重试(maxAttempts > 1),则在重试用尽后,会将失败的消息传递到死信队列。如果重试被禁用(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便失败的消息被路由到DLQ,而不是重新排队。
另外,republishToDlq 会导致绑定程序将失败的消息发布到 DLQ(而不是拒绝它)。This feature lets additional information (such as the stack trace in the x-exception-stacktrace header) be added to the message in headers.查看frameMaxHeadroom属性,了解有关截断堆栈跟踪的信息。这个选项不需要启用重试。您可以仅在一次尝试后重新发布失败的消息。
Starting with version 1.
2,您可以配置重新发布消息的交付模式。参见republishDeliveryMode属性。
如果流侦听器抛出一个ImmediateAcknowledgeAmqpException,则会跳过DLQ,并且消息将被简单地丢弃。从版本2.1开始,无论republishToDlq如何设置,这都是正确的;在以前,只有当republishToDlq为false时才成立。
将 requeueRejected 设置为 true(使用 republishToDlq=false )会导致消息被重新排队并不断重新传递,除非故障是瞬态的,否则这可能不是您想要的操作。 通常,您应通过将 maxAttempts 设置为大于一或将 republishToDlq 设置为 true 来在绑定器中启用重试。 |
有关这些属性的更多信息,请参阅RabbitMQ绑定器属性。
该框架不提供任何标准机制来处理死信消息(或将其重新路由回主队列)。死信队列处理中描述了一些选项。
当在Spring Cloud Stream应用程序中使用多个RabbitMQ绑定器时,禁用'RabbitAutoConfiguration'很重要,以避免相同的配置从RabbitAutoConfiguration应用于两个绑定器。您可以使用 @SpringBootApplication注解来排除该类。 |
从版本2.0开始,RabbitMessageChannelBinder 将 RabbitTemplate.userPublisherConnection 属性设置为 true,以便非事务性生产者避免死锁,这可能会导致由于代理上的内存警报而导致缓存连接被阻塞。
| 当前,仅支持一个消费者(一个消费者监听多个队列)用于消息驱动式消费者;轮询式消费者只能从一个队列中获取消息。 |
3. 配置选项
本节包含与 RabbitMQ 绑定程序和绑定通道相关的设置。
有关通用绑定配置选项和属性的信息,请参阅Spring Cloud Stream核心文档。
3.1. RabbitMQ Binder 属性
默认情况下,RabbitMQ 绑定器使用 Spring Boot 的 ConnectionFactory。因此,它支持 RabbitMQ 的所有 Spring Boot 配置选项。(有关参考,请参阅 Spring Boot 文档)。RabbitMQ 配置选项使用 spring.rabbitmq 前缀。
除了Spring Boot选项之外,RabbitMQ绑定器还支持以下属性:
- spring.cloud.stream.rabbit.binder.adminAddresses
-
一个逗号分隔的 RabbitMQ 管理插件 URL 列表。仅在
nodes包含多个条目时使用。spring.rabbitmq.addresses中必须有对应的条目。如果您使用 RabbitMQ 集群并希望从托管队列的节点消费,则需要此列表。有关更多信息,请参阅队列亲和性和 LocalizedQueueConnectionFactory。默认:空。
- spring.cloud.stream.rabbit.binder.nodes
-
用逗号分隔的 RabbitMQ 节点名称列表。
当有多个条目时,用于定位队列所在的服务器地址。
此列表中的每个条目都必须在spring.rabbitmq.addresses中有相应的条目。
只有在使用 RabbitMQ 集群并且希望从托管队列的节点消费时才需要。
有关更多信息,请参阅队列关联性和本地化队列连接工厂。默认:空。
- spring.cloud.stream.rabbit.binder.compressionLevel
-
压缩绑定的压缩级别。
参见java.util.zip.Deflater。默认:
1(BEST_LEVEL)。 - spring.cloud.stream.binder.connection-name-prefix
-
此绑定器创建的连接使用的连接名称前缀。 每次打开新的连接时,
#n会递增。
名称为此前缀后跟#n,其中n每次打开新连接时都会递增。默认值:无(Spring AMQP 默认值)。
3.2. RabbitMQ 消费者属性
以下属性仅适用于 Rabbit 消费者,并且必须以 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. 为前缀。
但是,如果需要将同一组属性应用于大多数绑定,为了防止重复,Spring Cloud Stream 支持在格式为 spring.cloud.stream.rabbit.default.<property>=<value> 的所有通道上设置值。
也请记住,绑定特定属性会将其默认值覆盖。
- 确认模式
-
确认模式。
默认值:
AUTO。 - 匿名组前缀
-
当绑定没有
group属性时,会将一个匿名、自动删除的队列绑定到目标交换机。
此类队列的默认命名策略会导致队列被命名为anonymous.<base64 representation of a UUID>。
设置此属性可更改为除默认值以外的前缀。默认值:
anonymous.。 - 自动绑定死信队列
-
是否自动声明死信队列(DLQ)并将其绑定到绑定器的死信交换(DLX)。
默认值:
false。 - 绑定路由键
-
用于将队列绑定到交换机的路由键(如果
bindQueue是true)。可以有多个键——参见bindingRoutingKeyDelimiter。对于分区目的地,将在每个键后追加-<instanceIndex>。默认值:
#。 - 绑定路由键分隔符
-
当此值不为 null 时,将把 'bindingRoutingKey' 视为由该值分隔的键列表;通常使用逗号作为分隔符。
默认值:
null。 - 绑定队列
-
是否声明队列并将其绑定到目标交换机。
如果已设置自己的基础设施并且先前创建并绑定了队列,请将其设置为false。默认值:
true。 - 消费者标记前缀
-
用于创建消费者标签;每次创建一个消费者时,将会附加
#n。
示例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}。默认值:无 - 代理将生成随机的消费者标签。
- 容器类型
-
选择要使用的监听器容器类型。 见 在 Spring AMQP 文档中的“选择容器” 以获取更多信息。
默认值:
simple - 死信队列名称
-
死信队列的名称
默认值:
prefix+destination.dlq - 死信交换
-
一个DLX分配到队列。 仅当
autoBindDlq为true时相关。默认值:'前缀+死信交换'
- 死信交换机类型
-
要分配给队列的DLX的类型。
仅当autoBindDlq为true时相关。默认值:'direct'
- 死信路由键
-
分配给队列的死信路由密钥。仅在
autoBindDlq是true时相关。默认值:
destination - 声明死信交换器
-
是否为目的地声明死信交换? 相关仅当
autoBindDlq为true时。 若已预配置DLX,请设为false。默认值:
true。 - 声明交换机
-
是否为目的地声明交换机。
默认值:
true。 - 延迟交换
-
是否将交换声明为
Delayed Message Exchange。
需要在代理上安装延迟消息交换插件。
将x-delayed-type参数设置为exchangeType。默认值:
false。 - 延迟队列绑定参数
-
绑定死信交换时应用于死信队列(dlq)的参数;与
headersdeadLetterExchangeType结合使用,用于指定要匹配的头部信息。例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue。默认:空
- 死信队列死信交换机
-
如果声明了一个死信队列(DLQ),则需指定一个死信交换器(DLX)来分配给该队列。
默认值:
none - 死信队列死信路由键
-
如果声明了死信队列,则需要指定一个分配给该队列的死信路由键。
默认值:
none - 延迟队列到期
-
未使用的死信队列在被删除前的等待时间(以毫秒为单位)。
默认值:
no expiration - 懒加载
-
使用
x-queue-mode=lazy参数声明死信队列。参见“延迟队列”。建议改用策略而不是此设置,因为策略允许在不删除队列的情况下更改该设置。默认值:
false。 - 最大dlq长度
-
死信队列中的最大消息数。
默认值:
no limit - 最大字节数
-
死信队列中所有消息的最大总字节数。
默认值:
no limit - 最大优先级队列
-
死信队列中消息的最大优先级(0-255)。
默认值:
none - 队列溢出时的行为
-
当超过
dlqMaxLength或dlqMaxLengthBytes时要采取的操作;目前为drop-head或reject-publish,但请参考RabbitMQ文档。默认值:
none - dlqQuorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转为死信。默认:无 - 将应用代理程序默认设置。
- dlqQuorum.enabled
-
当为 true 时,创建一个法定人数死信队列而不是经典队列。
(默认值:false)
- dlqQuorum.initialQuorumSize
-
当
quorum.enabled=true时,设置初始法定人数大小。默认:无 - 将应用代理程序默认设置。
- 单个活动消费者
-
设置为 true 可将
x-single-active-consumer队列属性设为 true。默认值:
false - dlqTtl
-
声明死信队列时应用的默认生存时间(以毫秒为单位)。
默认值:
no limit - 持久化订阅
-
订阅是否应该持久化。只有在同时设置
group时才有效。默认值:
true。 - 交换自动删除
-
如果
declareExchange为真,则表示交换机是否应该自动删除(即在最后一个队列被删除后将其移除)。默认值:
true。 - 交换机持久化
-
如果
declareExchange为真,则表示交换机是否应具有持久性(即,在代理重启后仍然存在)。默认值:
true。 - 交易类型
-
交换类型:
direct、fanout、headers或topic表示非分区目的地,而direct、标题或topic表示分区目的地。默认值:
topic。 - 独家
-
是否创建独占消费者。 当此值为
true时,并发应设置为 1。 通常在需要严格顺序但启用热备用实例以在发生故障后接管时使用。 参见recoveryInterval,它控制备用实例尝试消费的频率。 当使用 RabbitMQ 3.8 或更高版本时,建议改用singleActiveConsumer。默认值:
false。 - 到期
-
多久以后未使用的队列会被删除(以毫秒为单位)。
默认值:
no expiration - 声明失败重试间隔
-
如果队列缺失,则尝试从队列中消耗之间的间隔(以毫秒为单位)。
默认值:5000
- 框架最大回程
-
将堆栈跟踪添加到死信队列(DLQ)消息头时,为其他标头保留的字节数。所有标头必须符合代理上配置的
frame_max大小。
堆栈跟踪可能很大;如果此属性加上堆栈跟踪的大小超过frame_max,则堆栈跟踪将被截断。
将会记录一个警告日志;考虑增加frame_max或通过捕获异常并抛出自定义较小堆栈跟踪的异常来减少堆栈跟踪。默认值:20000
- headerPatterns
-
从入站消息映射标头的模式。
默认:
['*'](所有头部)。 - 懒加载
-
使用
x-queue-mode=lazy参数声明队列。
参见 “延迟队列”。
建议改用策略,而不是此设置,因为策略允许在不删除队列的情况下更改设置。默认值:
false。 - 最大并发数
-
消费者的最大数量。
当containerType为direct时不支持。默认值:
1。 - 最大长度
-
队列中消息的最大数量。
默认值:
no limit - 最大长度字节
-
队列中所有消息的最大总字节数。
默认值:
no limit - 最高严重程度
-
队列中消息的最大优先级(0-255)。
默认值:
none - 缺少队列致命
-
当找不到队列时,是否将此情况视为致命错误并停止监听器容器。默认值为
false,因此容器会不断尝试从队列中消费消息——例如,在使用集群时,如果托管非高可用性队列的节点宕机。默认值:
false - 溢出行为
-
当超过
maxLength或maxLengthBytes时要采取的操作;目前为drop-head或reject-publish,但请参考RabbitMQ文档。默认值:
none - 预取
-
预取计数。
默认值:
1。 - 前缀
-
要添加到
destination和队列名称前缀。默认值:""。
- 队列绑定参数
-
绑定队列到交换时应用的参数;用于指定要匹配的标头,例如
…queueBindingArguments.x-match=any、…queueBindingArguments.someHeader=someValue。
使用headers或exchangeType来指定要匹配的标头。默认:空
- 队列声明重试次数
-
如果队列缺失,从队列中重新消费的重试次数。
仅当missingQueuesFatal是true时有效。
否则,容器将无限期地重试。
不支持当containerType是direct的情况。默认值:
3 - 仅队列名组
-
当为 true 时,从队列名称等于
group的队列中消费。否则队列名称为destination.group。例如,在使用 Spring Cloud Stream 来从现有的 RabbitMQ 队列进行消费时,这很有用。默认值:false。
- quorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转为死信。默认:无 - 将应用代理程序默认设置。
- quorum.enabled
-
为真时,创建一个法定人数队列而不是经典队列。
(默认值:false)
- quorum.initialQuorumSize
-
当
quorum.enabled=true时,设置初始法定人数大小。默认:无 - 将应用代理程序默认设置。
- 恢复间隔
-
连接恢复尝试之间的间隔,单位为毫秒。
默认值:
5000。 - 重新排队拒绝的
-
当重试被禁用或
republishToDlq时,是否应重新排队交付失败的消息。false表示是。默认值:
false。
- 重新发布配送模式
-
当
republishToDlq是true时,指定重新发布的消息的传递模式。默认值:
DeliveryMode.PERSISTENT - 转发到死信队列
-
默认情况下,重试次数用尽后仍失败的消息会被拒绝。如果配置了死信队列(DLQ),RabbitMQ会将失败的消息(未更改)路由到DLQ。如果设置为
true,绑定器会将失败的消息重新发布到DLQ,并添加额外的标题,包括最终失败原因中的异常消息和堆栈跟踪。另请参阅frameMaxHeadroom属性。(默认值:false)
- singleActiveConsumer
-
设置为 true 可将
x-single-active-consumer队列属性设为 true。默认值:
false - 事务性
-
是否使用事务通道。
默认值:
false。 - TTL
-
声明队列时要应用的默认存活时间(以毫秒为单位)。
默认值:
no limit - 字体大小
-
在确认之间交付的数量。
当containerType是direct时不支持。默认值:
1。
3.3. 高级监听器容器配置
要设置未通过绑定程序或绑定属性公开的侦听器容器属性,请向应用程序上下文中添加一个 0 类型的 bean。
将设置绑定程序和绑定属性,然后调用自定义程序。
自定义程序(1 方法)将提供队列名称以及消费者组作为参数。
3.4. 高级队列/交换器/绑定配置
每隔一段时间,RabbitMQ团队都会添加一些新功能,这些功能需要在声明时设置某些参数才能启用,例如,一个队列。通常,通过添加适当的属性来启用此类功能,但在当前版本中可能不会立即可用。从版本 3.0.1 开始,您可以现在向应用程序上下文添加 DeclarableCustomizer 个 bean(s),以便在声明之前进行修改。这允许您添加当前未直接由绑定器支持的参数。
3.5. 接收批量消息
通常,如果生产者绑定具有batch-enabled=true(参见Rabbit Producer Properties),或者消息是由BatchingRabbitTemplate创建的,则批次中的元素将作为对监听器方法的单独调用返回。
从版本3.0开始,如果将spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true,则可以将此类任何批次呈现为对监听器方法的List<?>。
3.6. Rabbit 生产者属性
Rabbit 生成器特有的以下属性可用,并且必须以“0”开头。
但是,如果需要将同一组属性应用于大多数绑定,为了防止重复,Spring Cloud Stream 支持在格式为 spring.cloud.stream.rabbit.default.<property>=<value> 的所有通道上设置值。
也请记住,绑定特定属性会将其默认值覆盖。
- 自动绑定死信队列
-
是否自动声明死信队列(DLQ)并将其绑定到绑定器的死信交换(DLX)。
默认值:
false。 - 批量处理是否启用
-
是否启用生产者的消息批处理。消息会根据以下属性(在本列表中的接下来三个条目中描述)进行批处理:
batchSize、batchBufferLimit和batchTimeout。批量处理以获取更多信息。接收已批处理的消息。默认值:
false。 - 批处理大小
-
启用批处理时,缓冲的消息数量。
默认值:
100。 - 批次缓冲限制
-
批量处理启用时的最大缓冲区大小。
默认值:
10000。 - 批量超时
-
启用批处理时的批处理超时时间。
默认值:
5000。 - 绑定路由键
-
绑定队列到交换机时使用的路由键(如果
bindQueue是true)。 可以有多个键 - 参见bindingRoutingKeyDelimiter。 对于分区目的地,每个键都会追加上-n。 仅在提供requiredGroups时适用,并且仅对这些组有效。默认值:
#。 - 绑定路由键分隔符
-
当此值不为 null 时,'bindingRoutingKey' 被认为是由该值分隔的键列表;通常使用逗号作为分隔符。 仅在提供
requiredGroups个值时适用,并且只适用于这些组。默认值:
null。 - 绑定队列
-
是否声明队列并将其绑定到目标交换机。如果已设置好自己的基础设施并且先前已创建并绑定了队列,则将其设置为
false。仅在提供requiredGroups时适用,然后仅适用于这些组。默认值:
true。 - 压缩
-
数据发送时是否应进行压缩。
默认值:
false。 - 确认消息通道
-
当
errorChannelEnabled为 true 时,发送正向交付确认的通道(即发布者确认)。 如果通道不存在,将使用此名称注册一个DirectChannel。 连接工厂必须配置以启用发布者确认。默认:
nullChannel(确认被丢弃)。 - 死信队列名称
-
死信队列的名称仅在提供
requiredGroups时适用,且仅适用于这些组。默认值:
prefix+destination.dlq - 死信交换
-
将DLX分配给队列。
仅在autoBindDlq为true时相关。
仅当提供requiredGroups时适用,且仅对这些组有效。默认值:'前缀+死信交换'
- 死信交换机类型
-
分配给队列的DLX类型。仅当
autoBindDlq为true时相关。
只有在提供requiredGroups时才适用,然后仅适用于这些组。默认值:'direct'
- 死信路由键
-
分配给队列的死信路由密钥。 仅在
autoBindDlq是true时相关。 只有当提供requiredGroups时才适用,然后只适用于这些组。默认值:
destination - 声明死信交换器
-
是否为目的地声明死信交换。 仅在
autoBindDlq是true时相关。 如果已配置DLX,请设置为false。 仅当提供了requiredGroups时适用,且仅适用于这些组。默认值:
true。 - 声明交换机
-
是否为目的地声明交换机。
默认值:
true。 - 延迟表达式
-
一个SpEL表达式,用于评估要应用到消息的延迟时间(
x-delay标题)。如果交换不是延迟消息交换,则此操作无效。默认情况下:未设置
x-delay标题。 - 延迟交换
-
是否将交换声明为
Delayed Message Exchange。
需要在代理上安装延迟消息交换插件。
将x-delayed-type参数设置为exchangeType。默认值:
false。 - 配送方式
-
交付模式。
默认值:
PERSISTENT。 - 延迟队列绑定参数
-
绑定死信队列(DLQ)到死信交换时应用的参数;与
headersdeadLetterExchangeType一起使用,用于指定要匹配的头部信息。
例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue。
仅在提供requiredGroups时适用,并且只对这些组有效。默认:空
- 死信队列死信交换机
-
当声明死信队列(DLQ)时,会分配一个死信交换器(DLX)给该队列。
仅在提供了requiredGroups的情况下适用,并且仅适用于这些组。默认值:
none - 死信队列死信路由键
-
声明死信队列 (DLQ) 后,将一个死信路由键分配给该队列。 仅在提供
requiredGroups时适用,并且仅对这些组有效。默认值:
none - 延迟队列到期
-
闲置的死信队列在被删除之前保留的时间(以毫秒为单位)。仅当提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no expiration - 懒加载
-
声明死信队列时使用
x-queue-mode=lazy参数。 参见 “惰性队列”。 建议使用策略而不是此设置,因为使用策略可以在不删除队列的情况下更改设置。 仅在提供requiredGroups个参数时适用,并且仅对这些组有效。 - 最大dlq长度
-
死信队列中的最大消息数。 仅在提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no limit - 最大字节数
-
死信队列中所有消息的最大总字节数。仅在提供
requiredGroups时适用,且仅对这些组有效。默认值:
no limit - 最大优先级队列
-
死信队列中消息的最大优先级(0-255)
仅在提供requiredGroups时适用,且仅对这些组有效。默认值:
none - dlqQuorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转入死信队列。仅在提供requiredGroups时生效,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- dlqQuorum.enabled
-
当为 true 时,创建一个法定人数死信队列而不是经典队列。
仅在提供requiredGroups时适用,并且仅对这些组有效。(默认值:false)
- dlqQuorum.initialQuorumSize
-
当
quorum.enabled=true时,设置初始多数大小。
仅在提供requiredGroups时适用,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- 单个活动消费者
-
设置为 true 将
x-single-active-consumer队列属性设为true。仅当提供requiredGroups时适用,且仅适用于这些组。默认值:
false - dlqTtl
-
声明时应用于死信队列的默认存活时间(以毫秒为单位)。仅在提供
requiredGroups时适用,且仅对这些组生效。默认值:
no limit - 交换自动删除
-
如果
declareExchange是true,则交换是否应自动删除(在最后一个队列被移除后)。默认值:
true。 - 交换机持久化
-
如果
declareExchange是true,则交换是否应该具有持久性(在代理重启后仍然存在)。默认值:
true。 - 交易类型
-
交换类型:
direct,fanout,headers或topic用于非分区目的地,而direct,headers或topic用于分区目的地。默认值:
topic。 - 到期
-
闲置队列在被删除前等待的时间(以毫秒为单位)。仅当提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no expiration - headerPatterns
-
用于映射到传出消息的标题模式。
默认:
['*'](所有头部)。 - 懒加载
-
使用
x-queue-mode=lazy参数声明队列。查看“延迟队列”。考虑使用策略而不是此设置,因为使用策略可以在不删除队列的情况下更改设置。仅当提供了requiredGroups时才适用,然后只适用于这些组。默认值:
false。 - 最大长度
-
队列中的最大消息数。仅在提供
requiredGroups时适用,并且仅对这些组有效。默认值:
no limit - 最大长度字节
-
队列中所有消息的最大总字节数。仅在提供
requiredGroups时适用,并且仅适用于这些组。默认值:
no limit - 最高严重程度
-
队列中消息的最大优先级(0-255)。仅当提供
requiredGroups时适用,并且仅对这些组有效。默认值:
none - 前缀
-
要添加到
destination交换机名称前的前缀。默认值:""。
- 队列绑定参数
-
绑定队列到交换时应用的参数;用于
headersexchangeType指定要匹配的标头。
例如…queueBindingArguments.x-match=any,…queueBindingArguments.someHeader=someValue。
仅在提供requiredGroups时适用,且仅适用于这些组。默认:空
- 仅队列名组
-
当为
true时,从队列名称与group相等的队列中消费。否则队列名称为destination.group。例如,在使用Spring Cloud Stream消费现有的RabbitMQ队列时,这非常有用。仅在提供requiredGroups时适用,并且仅对这些组有效。默认值:false。
- quorum.deliveryLimit
-
当
quorum.enabled=true时,设置一个投递限制,超过该限制后消息将被丢弃或转入死信队列。仅在提供requiredGroups时生效,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- quorum.enabled
-
当为 true 时,创建一个法定人数队列而不是经典队列。
仅在提供requiredGroups时适用,然后仅适用于这些组。(默认值:false)
- quorum.initialQuorumSize
-
当
quorum.enabled=true时,设置初始多数大小。
仅在提供requiredGroups时适用,并且仅适用于这些组。默认:无 - 将应用代理程序默认设置。
- 路由键表达式
-
一个用于确定在发布消息时使用的路由键的SpEL表达式。 对于固定的路由键,使用字面表达式,例如在属性文件中使用
routingKeyExpression='my.routingKey',在YAML文件中使用routingKeyExpression: '''my.routingKey'''。默认值:
destination或destination-<partition>(针对分区目的地)。 - singleActiveConsumer
-
设置为 true 将
x-single-active-consumer队列属性设为true。仅当提供requiredGroups时适用,且仅适用于这些组。默认值:
false - 事务性
-
是否使用事务通道。
默认值:
false。 - TTL
-
声明队列时要应用的默认存活时间(以毫秒为单位)。仅当提供
requiredGroups时才会对这些组生效。默认值:
no limit
| 在RabbitMQ的情况下,内容类型标头可以通过外部应用程序进行设置。</p><p>Spring Cloud Stream将其作为用于任何类型的传输的扩展内部协议的一部分来支持——包括Kafka(0.11之前版本)等不原生支持标头的传输方式。 |
4. 使用现有的队列/交换机
默认情况下,绑定器会自动 provision 具有与绑定属性 <prefix><destination> 的值同名的目标 topic exchange。目标默认为绑定名,如果未提供。当绑定使用者时,将自动 provision 名为 <prefix><destination>.<group> 的队列(如果指定了 group 绑定属性),或者没有 group 时使用匿名、自动删除队列。队列将使用“匹配全部”通配符路由键(#)进行非分区绑定或 <destination>-<instanceIndex> 进行分区绑定进行绑定。前缀默认为空 String。如果指定了带有 requiredGroups 的输出绑定,则为每个组 provision 队列/绑定。
有几种与兔子相关的绑定属性,允许您修改此默认行为。
如果您有一个想要使用的现有exchange/queue,您可以完全禁用自动配置,假设exchange名为myExchange,queue名为myQueue: \ "
}
-
spring.cloud.stream.bindings.<binding name>.destination=myExhange -
spring.cloud.stream.bindings.<binding name>.group=myQueue -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
如果要让绑定程序为队列/交换机提供服务,但又想使用不同于此处讨论的默认值,请使用以下属性。 有关更多信息,请参阅上述属性文档。
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type> -
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
有相似的属性在声明死信交换/队列时使用,当autoBindDlq是true。
5. 使用 RabbitMQ 绑定重试
当在绑定程序中启用重试时,监听器容器线程会在配置的任何退避期间被暂停。这在单个消费者需要严格顺序时可能很重要。但是,在其他使用情况下,它会阻止该线程上的其他消息被处理。替代使用绑定程序重试的方法是设置死信队列(DLQ)的时间存活以及DLQ本身的死信配置,并结合使用时间到活期功能。有关此处讨论属性的更多信息,请参阅“RabbitMQ 绑定程序属性”。您可以使用以下示例配置来启用此功能:
-
设置
autoBindDlq为true。绑定器创建一个DLQ。可选地,您可以在deadLetterQueueName中指定名称。 -
设置要等待重新传递之间的时间段,您想要的退避时间。您可以选择从 0 到 1792 秒之间的任何值。
-
将
0设置为默认交换机。从死信队列中过期的消息被路由回原队列,因为默认值1是队列名称(2)。通过不设置属性值来设置为默认交换机,如下一个示例所示。
要强制消息进入死信队列,要么抛出AmqpRejectAndDontRequeueException要么设置requeueRejected为true(默认)并抛出任何异常。
循环不间断,这对临时问题来说没问题,但你可能希望在一定次数尝试后放弃。幸运的是,RabbitMQ 提供了 0 标头,允许你确定已发生多少个周期。
在放弃后确认消息,抛出一个 ImmediateAcknowledgeAmqpException。
5.1 将所有内容整合在一起
以下配置创建了一个交换 myDestination,该交换绑定到一个主题交换,并使用通配符路由键 # 绑定到队列 myDestination.consumerGroup:
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
此配置创建了一个与直连交换机(DLX)绑定的死信队列,路由键为 myDestination.consumerGroup。当消息被拒绝时,它们会被路由到死信队列。5秒后,消息过期并使用队列名称作为路由键,将消息重新路由回原始队列,如下例所示:
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
}
}
请注意,x-death 头部中的 count 属性是 Long。
6. 错误通道
从版本 1.3 开始,绑定器会无条件地将每个消费者目标的异常发送到错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。
有关更多信息,请参阅“[spring-cloud-stream-overview-error-handling]”。
RabbitMQ有两种类型的发送失败:
-
返回的消息,
-
被否定确认发布者确认。
后者较为罕见。 根据RabbitMQ文档,"[A nack] 只有在负责队列的Erlang进程发生内部错误时才会被投递"。
除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)外,RabbitMQ绑定器仅在连接工厂正确配置时才会向通道发送消息,如下所示:
-
ccf.setPublisherConfirms(true); -
ccf.setPublisherReturns(true);
使用Spring Boot配置连接工厂时,设置以下属性:<br>
-
spring.rabbitmq.publisher-confirms -
spring.rabbitmq.publisher-returns
返回消息的ErrorMessage负载是一个带有以下属性的ReturnedAmqpMessageException:
-
failedMessage: 未能发送的 spring-messagingMessage<?>。 -
amqpMessage: 原生的 Spring AMQPMessage。 -
replyCode:一个整数值,表示失败的原因(例如,312-无路由)。 -
replyText: 失败的原因(例如,NO_ROUTE)。 -
exchange: 消息发布的交换机。 -
routingKey: 消息发布时使用的路由键。
对于负确认,负载是一个具有以下属性的NackedAmqpMessageException:
-
failedMessage: 未能发送的 spring-messagingMessage<?>。 -
nackReason: 如果有原因(您可能需要检查代理日志以获取更多信息)。
这些异常没有自动处理方式(例如发送到死信队列)。
您可以使用自己的Spring集成流程来消费这些异常。
7. Dead-Letter 队列处理
由于无法预知用户希望如何处理死信消息,框架不提供任何标准机制来处理这些消息。
如果导致消息进入死信队列的原因是暂时性的,您可能希望将这些消息重新路由回原始队列。
但是,如果问题是永久性的,则可能会导致无限循环。
下面的Spring Boot应用程序显示了如何将这些消息重新路由回原始队列的例子,但在三次尝试后会将它们移动到第三个“停车场”队列。
第二个例子使用RabbitMQ延迟消息交换为重新排队的消息引入延迟。
在这个例子中,每次尝试时延迟都会增加。
这些示例使用@RabbitListener从DLQ接收消息。
在批处理过程中也可以使用RabbitTemplate.receive()。
示例假设原始目标为 so8400in,消费组为 so8400。
7.1. 非分区目的地
前两个示例适用于目标位置未分区的情况:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2. 分区目的地
使用分区目标时,所有分区共用一个死信队列。我们通过消息头来确定原始队列。
7.2.1. republishToDlq=false
当 republishToDlq 是 false 时,RabbitMQ 使用包含有关原始目标信息的 x-death 标头将消息发布到死信交换/队列(DLX/DLQ),如下例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@SuppressWarnings("unchecked")
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2.2. republishToDlq=true
当 republishToDlq 是 true 时,重新发布恢复程序会将原始交换机和路由键添加到头信息中,如下例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8. 使用 RabbitMQ 绑定器进行分区
RabbitMQ 不原生支持分区。
有时,向特定分区发送数据是有利的——例如,当您希望严格按顺序处理消息时,某个客户的全部消息应发送到同一个分区。
该 RabbitMessageChannelBinder 通过为每个分区绑定一个队列到目标交换器来提供分区。
以下 Java 和 YAML 示例显示了如何配置生产者:
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
|
前面示例中的配置使用了默认分区(
|
以下配置设置了一个主题交换:
以下队列结维到该 Exchange:
以下绑定将队列与交换机关联:
下面的 Java 和 YAML 示例继续了前面的例子,展示了如何配置消费者:
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println(in + " received from queue " + queue);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
0不支持动态缩放。 每个分区必须至少有一个消费者。 消费者的1用于指示哪个分区被消费。 如Cloud Foundry等平台只能有一个实例带有2。 |