配置代理
AMQP 规范描述了如何使用该协议在代理上配置队列、交换机和绑定。这些操作(可从 0.8 规范及更高版本中移植)存在于 AmqpAdmin 接口中,位于 org.springframework.amqp.core 包内。RabbitMQ 对该类的实现是 RabbitAdmin,位于 org.springframework.amqp.rabbit.core 包中。
接口 AmqpAdmin 基于使用 Spring AMQP 领域抽象,如下所示:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
boolean deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
QueueInformation getQueueInfo(String queueName);
}
参见 作用域操作。
方法 getQueueProperties() 返回关于队列的有限信息(消息数量和消费者数量)。
返回属性的键可作为常量在 RabbitAdmin 中获取(QUEUE_NAME、QUEUE_MESSAGE_COUNT 和 QUEUE_CONSUMER_COUNT)。
方法 getQueueInfo() 返回一个便捷的 QueueInformation 数据对象。
无参数 declareQueue() 方法在代理上定义一个队列,其名称为自动生成的。此自动生成队列的附加属性为 exclusive=true、autoDelete=true 和 durable=false。
方法 declareQueue(Queue queue) 接受一个 Queue 对象,并返回所声明队列的名称。如果所提供的 Queue 的 name 属性为空的 String,则代理服务器会使用生成的名称来声明该队列。该名称将返回给调用方,同时也会添加到 Queue 的 actualName 属性中。您只能通过直接调用 RabbitAdmin 才能以编程方式使用此功能。在应用上下文中以声明方式定义队列时,若启用自动声明(由管理器完成),可将名称属性设置为 ""(空字符串)。此时,代理服务器将自动生成队列名称。从版本 2.1 开始,监听器容器可以使用此类队列。有关更多信息,请参见 容器与代理命名队列。
这与 AnonymousQueue 形成对比,后者中框架会生成一个唯一的 (UUID) 名称,并将 durable 设置为 false,将 exclusive、autoDelete 设置为 true。一个 <rabbit:queue/> 若其 name 属性为空(或缺失),则始终会创建一个 AnonymousQueue。
请参阅 AnonymousQueue 以了解为何 AnonymousQueue 比由代理生成的队列名称更受推荐,以及如何控制名称的格式。从版本 2.1 开始,匿名队列默认使用参数 Queue.X_QUEUE_LEADER_LOCATOR 设置为 client-local 进行声明。这可确保队列在应用程序所连接的节点上被声明。声明式队列必须具有固定名称,因为它们可能在上下文中的其他位置被引用——例如,在以下示例中的监听器中:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
参见 自动声明交换机、队列和绑定。
此接口的 RabbitMQ 实现为 RabbitAdmin,当通过 Spring XML 配置时,其示例如下:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
当缓存模式 CachingConnectionFactory 为 CHANNEL(默认值)时,RabbitAdmin 实现会自动延迟声明队列、交换机和绑定关系,这些组件均在同一 ApplicationContext 中声明。一旦打开与代理服务器的 Connection 连接,这些组件便会立即被声明。某些命名空间特性使得此操作极为便捷——例如,在 Stocks 示例应用程序中,我们有以下内容:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的例子中,我们使用了匿名队列(实际上,内部只是由框架生成名称的队列,而非由消息代理生成)并以 ID 引用它们。我们也可以声明具有显式名称的队列,这些名称同样作为其在上下文中的 Bean 定义标识符。以下示例配置了一个具有显式名称的队列:
<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供 id 和 name 属性。这使您能够通过一个与队列名称无关的 ID 来引用该队列(例如,在绑定中使用)。 同时,它还允许使用标准 Spring 功能(如属性占位符和用于队列名称的 SpEL 表达式)。 当使用名称作为 Bean 标识符时,这些功能将不可用。 |
队列可以配置额外的参数——例如,x-message-ttl。当您使用命名空间支持时,这些参数以 Map(即参数名/参数值对)的形式提供,这些参数是通过使用 <rabbit:queue-arguments> 元素来定义的。以下示例展示了如何实现这一点:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
默认情况下,参数被视为字符串。对于其他类型的参数,您必须提供其类型。以下示例展示了如何指定类型:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
在提供混合类型参数时,您必须为每个条目元素指定类型。以下示例展示了如何实现这一点:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
从 Spring Framework 3.2 及更高版本开始,可以更简洁地声明如下:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
当你使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR 参数可通过 Queue 类上的 setLeaderLocator() 方法作为一级属性支持。
从版本 2.1 开始,默认情况下,匿名队列会通过此属性设置为 client-local 进行声明。
这可确保队列在应用程序所连接的节点上进行声明。
RabbitMQ 代理不允许声明参数不匹配的队列。例如,如果已存在一个 queue(即无 time to live 参数),而您尝试以(例如)key="x-message-ttl" value="100" 的参数重新声明该队列,则会抛出异常。 |
默认情况下,RabbitAdmin 在发生任何异常时会立即停止处理所有声明。这可能导致下游问题,例如监听器容器因另一个队列(在出错队列之后定义)未被声明而无法初始化。
此行为可以通过将 ignore-declaration-exceptions 属性设置为 true 来修改,该属性属于 RabbitAdmin 实例。
此选项会指示 RabbitAdmin 记录异常并继续声明其他元素。
在使用 Java 配置 RabbitAdmin 时,此属性被称为 ignoreDeclarationExceptions。
这是一个全局设置,适用于所有元素。
队列、交换机和绑定也具有类似属性,但仅适用于这些特定元素。
在 1.6 版本之前,此属性仅在通道中发生 IOException 时生效,例如当前属性与期望属性不匹配时。现在,此属性对任何异常均生效,包括 TimeoutException 及其他异常。
此外,任何声明异常都会导致发布一个 DeclarationExceptionEvent,这是一个 ApplicationEvent,可被上下文中的任何 ApplicationListener 消费。该事件包含对管理员的引用、正在被声明的元素以及 Throwable。
头交换
从版本 1.3 开始,您可以将 HeadersExchange 配置为匹配多个请求头。您还可以指定是任意一个还是所有请求头都必须匹配。以下示例展示了如何实现这一点:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
从版本 1.6 开始,您可以将 Exchanges 配置为一个 internal 标志(默认值为 false),并且如果应用程序上下文中存在,则该 Exchange 会在 Broker 上通过一个 RabbitAdmin 正确配置。如果交换机的 internal 标志为 true,RabbitMQ 将不允许客户端使用该交换机。这在死信交换机或交换机到交换机的绑定中非常有用,此时您不希望交换机被发布者直接使用。
要了解如何使用 Java 配置 AMQP 基础设施,请查看 Stock 示例应用程序,其中包含类 @Configuration,该类又具有 RabbitClientConfiguration 和 RabbitServerConfiguration 子类。以下列表显示了 AbstractStockRabbitConfiguration 的代码:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在股票应用中,服务器是通过以下 @Configuration 类进行配置的:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
这是整个继承链的终点,共包含 @Configuration 个类。
最终结果是,在应用程序启动时,TopicExchange 和 Queue 已被声明给代理(broker)。
服务器配置中未将 TopicExchange 绑定到队列,该操作由客户端应用程序完成。
然而,股票请求队列会自动绑定到 AMQP 默认交换机。
此行为由规范所定义。
客户端 @Configuration 类要有趣一些。其声明如下:
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
客户端通过 declareQueue() 方法在 AmqpAdmin 上声明另一个队列。它将该队列绑定到市场数据交换机,绑定的路由模式存储在属性文件中。
队列和交换器的构建器API
版本 1.6 引入了一个便捷的流式 API,用于在使用 Java 配置时配置 Queue 和 Exchange 对象。以下示例展示了如何使用它:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
查看 org.springframework.amqp.core.QueueBuilder 和 org.springframework.amqp.core.ExchangeBuilder 的 Javadoc 文档以获取更多信息。
从版本 2.0 开始,ExchangeBuilder 默认创建持久化交换机(durable exchanges),以与各个 AbstractExchange 类中的简单构造函数保持一致。要使用构建器创建非持久化交换机,请在调用 .build() 之前使用 .durable(false)。不带参数的 durable() 方法已不再提供。
版本 2.2 引入了流畅式 API,用于添加“常见”的交换机和队列参数…
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
声明交换、队列和绑定的集合
您可以将 Declarable 个对象的集合(Queue、Exchange 和 Binding)包装在 Declarables 个对象中。
当连接建立时(初始连接及连接失败后),RabbitAdmin 会检测应用上下文中的此类 Bean(以及独立的 Declarable Bean),并在代理上声明其中包含的对象。
以下示例展示了如何实现此操作:
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在 2.1 版本之前的版本中,您可以通过定义类型为 Collection<Declarable> 的 bean 来声明多个 Declarable 实例。这在某些情况下可能引发不期望的副作用,因为管理员必须遍历所有 |
版本 2.2 添加了 getDeclarablesByType 方法到 Declarables;此方法可用作一种便捷方式,例如在声明监听器容器 bean 时使用。
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
条件声明
默认情况下,所有队列、交换机和绑定均由应用程序上下文中的所有 RabbitAdmin 实例声明(假设它们具有 auto-startup="true")。
从版本 2.1.9 开始,RabbitAdmin 新增了一个属性 explicitDeclarationsOnly(默认值为 false);当该属性设置为 true 时,管理员将仅声明那些显式配置为由该管理员声明的 Bean。
| 从 1.2 版本开始,您可以有条件地声明这些元素。这在应用程序连接到多个代理(broker)且需要指定某个特定元素应与哪些代理进行声明时特别有用。 |
表示这些元素的类实现了 Declarable,该接口包含两个方法:shouldDeclare() 和 getDeclaringAdmins()。RabbitAdmin 使用这些方法来确定特定实例是否应实际处理其 Connection 上的声明。
属性可在命名空间中作为属性使用,如下例所示:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare 属性为 true;如果未提供 declared-by(或为空),则所有 RabbitAdmin 实例均会声明该对象(前提是管理员的 auto-startup 属性为 true,即默认值,且管理员的 explicit-declarations-only 属性为 false)。 |
同样,您也可以使用基于 Java 的 @Configuration 来实现相同的效果。在以下示例中,组件由 admin1 声明,但并非由 admin2 声明:
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
关于这一点的说明id和name属性
属性 name 在 <rabbit:queue/> 和 <rabbit:exchange/> 元素上反映的是代理中实体的名称。对于队列,如果省略了 name,则会创建一个匿名队列(参见 AnonymousQueue)。
在 2.0 版本之前的版本中,name 也被注册为一个 Bean 名称别名(类似于 name 在 <bean/> 元素上的用法)。
这导致了两个问题:
-
它阻止了声明一个与现有队列或交换机同名的队列和交换机。
-
如果别名包含 SpEL 表达式(
#{…}),则无法解析该别名。
从版本 2.0 开始,如果您同时声明了带有 id 和 name 属性的这些元素之一,则该名称将不再被声明为一个 Bean 名称别名。如果希望声明一个队列和一个交换机并使用相同的 name,则必须提供一个 id。
如果元素仅包含一个 name 属性,则不会发生任何变化。
该 Bean 仍可通过 name 引用——例如,在绑定声明中。
然而,如果名称包含 SpEL 表达式,您仍然无法引用它——您必须为引用目的提供一个 id。
AnonymousQueue
一般来说,当您需要一个唯一命名、独占且自动删除的队列时,我们建议您使用 AnonymousQueue,而不是由代理服务器定义的队列名称(使用 "" 作为 Queue 名称会导致代理服务器生成队列名称)。
这是因为:
-
队列实际上是在与代理建立连接时声明的。这发生在 Bean 被创建并装配完成之后很久。</p><p>使用该队列的 Bean 需要知道其名称。</p><p>事实上,当应用程序启动时,代理可能甚至尚未运行。
-
如果由于某种原因与代理的连接丢失,管理员会重新声明名称相同的
AnonymousQueue。如果我们使用代理声明的队列,队列名称将会改变。
您可以控制由 AnonymousQueue 实例使用的队列名称的格式。
默认情况下,队列名称以 spring.gen- 开头,后接 UUID 的 Base64 编码表示形式——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。
您可以在构造函数参数中提供一个 AnonymousQueue.NamingStrategy 的实现。
以下示例展示了如何做到这一点:
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一个 Bean 生成一个以 spring.gen- 开头、后接 UUID 的 Base64 编码表示的队列名称——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。
第二个 Bean 生成一个以 something- 开头、后接 UUID 的 Base64 编码表示的队列名称。
第三个 Bean 仅使用 UUID(不进行 Base64 转换)来生成名称——例如,f20c818a-006b-4416-bf91-643590fedb0e。
Base64 编码使用 RFC 4648 中定义的“URL 和文件名安全字母表”。尾部填充字符(=)已被移除。
您可以提供自己的命名策略,从而在队列名称中包含其他信息(例如应用程序名称或客户端主机)。
在使用 XML 配置时,您可以指定命名策略。naming-strategy 属性存在于 <rabbit:queue> 元素上,该元素用于引用实现了 AnonymousQueue.NamingStrategy 的 Bean。以下示例展示了以各种方式指定命名策略的方法:
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一个示例创建诸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g 这样的名称。第二个示例使用 UUID 的字符串表示形式创建名称。第三个示例创建诸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g 这样的名称。
您还可以提供自己的命名策略bean。
从版本 2.1 开始,匿名队列默认通过设置参数 Queue.X_QUEUE_LEADER_LOCATOR 为 client-local 进行声明。这可确保队列在应用程序所连接的节点上被声明。您可以通过在构造实例后调用 queue.setLeaderLocator(null) 来恢复之前的行为。
恢复自动删除声明
通常情况下,RabbitAdmin(s)仅会恢复应用上下文中声明为 Bean 的队列/交换机/绑定;如果此类声明是自动删除的,当连接丢失时,代理服务器将将其移除。当连接重新建立后,管理员将重新声明这些实体。通常,通过调用 admin.declareQueue(…)、admin.declareExchange(…) 和 admin.declareBinding(…) 创建的实体不会被恢复。
从版本 2.4 开始,管理员新增了一个属性 redeclareManualDeclarations;当其值为 true 时,管理员将恢复这些实体,而不仅限于应用程序上下文中的 Bean。
如果调用 deleteQueue(…)、deleteExchange(…) 或 removeBinding(…),将不会对单个声明进行恢复。当队列和交换机被删除时,与可恢复实体关联的绑定会被移除。
最后,调用 resetAllManualDeclarations() 将防止恢复任何先前声明的实体。