此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6spring-doc.cadn.net.cn

配置代理

AMQP 规范描述了如何使用该协议在代理上配置队列、交换和绑定。 这些作(从 0.8 规范及更高版本开始可移植)存在于AmqpAdmin接口中的org.springframework.amqp.core包。 该类的 RabbitMQ 实现是RabbitAdmin位于org.springframework.amqp.rabbit.core包。spring-doc.cadn.net.cn

AmqpAdmin接口基于使用 Spring AMQP 域抽象,如以下列表所示:spring-doc.cadn.net.cn

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    void deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

	QueueInformation getQueueInfo(String queueName);

}

getQueueProperties()方法返回有关队列的一些有限信息(消息计数和使用者计数)。 返回的属性的键可作为RabbitAdmin (QUEUE_NAME,QUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT). 这getQueueInfo()返回一个方便的QueueInformationdata 对象。spring-doc.cadn.net.cn

无参数declareQueue()方法在代理上定义一个队列,其名称为自动生成。 此自动生成队列的其他属性包括exclusive=true,autoDelete=truedurable=false.spring-doc.cadn.net.cn

declareQueue(Queue queue)方法采用Queue对象,并返回声明队列的名称。 如果name提供的属性Queue是一个空的String,代理使用生成的名称声明队列。 该名称将返回给调用方。 该名称也会添加到actualName属性的Queue. 只能通过调用RabbitAdmin径直。 在应用程序上下文中以声明方式定义队列时,管理员使用自动声明时,可以将 name 属性设置为(空字符串)。 然后,代理创建名称。 从 2.1 版开始,侦听器容器可以使用这种类型的队列。 有关更多信息,请参阅容器和代理命名队列。""spring-doc.cadn.net.cn

这与AnonymousQueue其中框架生成一个唯一的 (UUID) 名称和集合durablefalseexclusive,autoDeletetrue. 一个<rabbit:queue/>带有空(或缺失)name属性总是创建一个AnonymousQueue.spring-doc.cadn.net.cn

AnonymousQueue了解原因AnonymousQueue优于代理生成的队列名称以及 如何控制名称的格式。 从版本 2.1 开始,匿名队列使用 argumentQueue.X_QUEUE_LEADER_LOCATOR设置为client-local默认情况下。 这可确保在应用程序连接到的节点上声明队列。 声明性队列必须具有固定名称,因为它们可能会在上下文中的其他地方被引用,例如在 侦听器,如以下示例所示:spring-doc.cadn.net.cn

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

这个接口的RabbitMQ实现是RabbitAdmin,当使用 Spring XML 进行配置时,类似于以下示例:spring-doc.cadn.net.cn

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory缓存模式为CHANNEL(默认值)、RabbitAdmin实现自动延迟声明在同一个中声明的队列、交换和绑定ApplicationContext. 这些组件在Connection向经纪人开放。 有一些命名空间功能使这变得非常方便——例如, 在 Stocks 示例应用程序中,我们有以下内容:spring-doc.cadn.net.cn

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我们使用匿名队列(实际上,在内部,只是名称由框架而不是代理生成的队列)并通过 ID 引用它们。 我们还可以使用显式名称声明队列,这些队列也用作上下文中其 bean 定义的标识符。 以下示例配置具有显式名称的队列:spring-doc.cadn.net.cn

<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供idname属性。 这允许您通过独立于队列名称的 ID 引用队列(例如,在绑定中)。 它还允许标准 Spring 功能(例如队列名称的属性占位符和 SpEL 表达式)。 当您使用名称作为 Bean 标识符时,这些功能不可用。

可以使用其他参数配置队列 — 例如x-message-ttl. 当您使用命名空间支持时,它们以Map的参数名称/参数值对,这些对是通过使用<rabbit:queue-arguments>元素。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

默认情况下,参数假定为字符串。 对于其他类型的参数,必须提供类型。 以下示例演示如何指定类型:spring-doc.cadn.net.cn

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

提供混合类型的参数时,必须为每个条目元素提供类型。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

在 Spring Framework 3.2 及更高版本中,可以更简洁地声明这一点,如下所示:spring-doc.cadn.net.cn

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当您使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR参数通过setLeaderLocator()方法Queue类。 从版本 2.1 开始,匿名队列的声明将此属性设置为client-local默认情况下。 这可确保在应用程序连接到的节点上声明队列。spring-doc.cadn.net.cn

RabbitMQ 代理不允许声明参数不匹配的队列。 例如,如果queue已经存在,没有time to live参数,并且您尝试使用(例如)key="x-message-ttl" value="100",则引发异常。

默认情况下,RabbitAdmin发生任何异常时立即停止处理所有声明。 这可能会导致下游问题,例如侦听器容器无法初始化,因为未声明另一个队列(在错误队列之后定义)。spring-doc.cadn.net.cn

可以通过设置ignore-declaration-exceptions属性设置为trueRabbitAdmin实例。 此选项指示RabbitAdmin以记录异常并继续声明其他元素。 配置RabbitAdmin使用 Java,此属性称为ignoreDeclarationExceptions. 这是适用于所有元素的全局设置。 队列、交换和绑定具有仅适用于这些元素的类似属性。spring-doc.cadn.net.cn

在 1.6 版之前,此属性仅在IOException发生在通道上,例如当前属性和所需属性之间不匹配时。现在,此属性对任何异常生效,包括TimeoutException和其他人。spring-doc.cadn.net.cn

此外,任何声明异常都会导致发布DeclarationExceptionEvent,这是一个ApplicationEvent可以被任何ApplicationListener在上下文中。该事件包含对管理员的引用、正在声明的元素以及Throwable.spring-doc.cadn.net.cn

标头交换

从 1.3 版开始,您可以配置HeadersExchange以匹配多个标头。您还可以指定是否必须匹配任何或所有标头。以下示例演示如何执行此作:spring-doc.cadn.net.cn

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

从 1.6 版开始,您可以配置Exchanges使用internal标志(默认为false)和这样的Exchange通过RabbitAdmin(如果应用程序上下文中存在)。如果internalflag 是true对于交换,RabbitMQ 不允许客户端使用交换。这对于死信交换或交换到交换绑定很有用,您不希望交换被直接由发布者使用。spring-doc.cadn.net.cn

要了解如何使用 Java 配置 AMQP 基础设施,请查看 Stock 示例应用程序其中有@ConfigurationAbstractStockRabbitConfiguration,这反过来又有RabbitClientConfigurationRabbitServerConfiguration子。 以下列表显示了AbstractStockRabbitConfiguration:spring-doc.cadn.net.cn

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在 Stock 应用程序中,服务器是使用以下命令配置的@Configuration类:spring-doc.cadn.net.cn

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

这是整个传承链的终结@Configuration类。 最终结果是TopicExchangeQueue在应用程序启动时向代理声明。没有绑定TopicExchange到服务器配置中的队列,就像在客户端应用程序中完成的那样。但是,库存请求队列会自动绑定到 AMQP 默认交换。此行为由规范定义。spring-doc.cadn.net.cn

客户端@Configurationclass 更有趣一些。它的声明如下:spring-doc.cadn.net.cn

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

客户端通过declareQueue()方法AmqpAdmin. 它使用属性文件中外部化的路由模式将该队列绑定到市场数据交换。spring-doc.cadn.net.cn

用于队列和交换的构建器 API

1.6 版引入了一个方便的流畅 API 来配置QueueExchange对象。以下示例显示如何使用它:spring-doc.cadn.net.cn

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

从 2.0 版开始,ExchangeBuilder现在默认创建持久交换,以与单个AbstractExchange类。 要与构建器进行非持久交换,请使用.durable(false)在调用之前.build(). 这durable()不再提供没有参数的方法。spring-doc.cadn.net.cn

2.2 版引入了流畅的 API 来添加“众所周知”的交换和队列参数......spring-doc.cadn.net.cn

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

声明交换、队列和绑定的集合

您可以包装Declarable对象 (Queue,ExchangeBinding) 在Declarables对象。 这RabbitAdmin检测此类 bean(以及离散的Declarablebeans),并在建立连接时(最初和连接失败后)在代理上声明包含的对象。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在 2.1 之前的版本中,您可以声明多个Declarable实例,通过定义类型为Collection<Declarable>. 在某些情况下,这可能会导致不良副作用,因为管理员必须遍历所有Collection<?>豆。

2.2 版添加了getDeclarablesByTypemethod 设置为Declarables; 这可以作为方便使用,例如,在声明侦听器容器 bean 时。spring-doc.cadn.net.cn

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

有条件声明

默认情况下,所有队列、交换和绑定都由RabbitAdmin实例(假设它们有auto-startup="true") 在应用程序上下文中。spring-doc.cadn.net.cn

从 2.1.9 版本开始,RabbitAdmin有一个新属性explicitDeclarationsOnly(即false默认情况下);当将其设置为true,管理员将仅声明显式配置为由该管理员声明的 Bean。spring-doc.cadn.net.cn

从 1.2 版本开始,您可以有条件地声明这些元素。当应用程序连接到多个代理并需要指定应使用哪些代理声明特定元素时,这特别有用。

表示这些元素的类实现Declarable,它有两种方法:shouldDeclare()getDeclaringAdmins(). 这RabbitAdmin使用这些方法来确定特定实例是否应该实际处理其Connection.spring-doc.cadn.net.cn

这些属性在命名空间中作为属性提供,如以下示例所示:spring-doc.cadn.net.cn

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare属性为true并且,如果declared-by未提供(或为空),则所有RabbitAdmin实例声明对象(只要管理员的auto-startup属性为true、默认值和管理员的explicit-declarations-only属性为 false)。

同样,您可以使用基于 Java 的@Configuration以达到相同的效果。在下面的示例中,组件由admin1但不是admin2:spring-doc.cadn.net.cn

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

关于idname属性

name属性<rabbit:queue/><rabbit:exchange/>元素反映代理中实体的名称。 对于队列,如果name被省略,则创建一个匿名队列(请参阅AnonymousQueue).spring-doc.cadn.net.cn

在 2.0 之前的版本中,name也注册为 bean 名称别名(类似于name<bean/>元素)。spring-doc.cadn.net.cn

这导致了两个问题:spring-doc.cadn.net.cn

从 2.0 版开始,如果您声明其中一个元素同时使用id name属性,则该名称不再声明为 Bean 名称别名。 如果您希望声明队列并与相同的队列进行交换name,您必须提供id.spring-doc.cadn.net.cn

如果元素只有name属性。 bean 仍然可以被name——例如,在具有约束力的声明中。 但是,如果名称包含 SpEL,您仍然无法引用它 — 您必须提供id仅供参考。spring-doc.cadn.net.cn

AnonymousQueue

通常,当您需要一个唯一命名的独占自动删除队列时,我们建议您使用AnonymousQueue而不是代理定义的队列名称(用作""Queuename 导致代理生成队列 名称)。spring-doc.cadn.net.cn

这是因为:spring-doc.cadn.net.cn

  1. 队列实际上是在建立与代理的连接时声明的。 这是在创建 bean 并连接在一起很久之后。 使用队列的 Bean 需要知道它的名称。 事实上,在应用程序启动时,代理甚至可能没有运行。spring-doc.cadn.net.cn

  2. 如果由于某种原因与代理的连接丢失,管理员会重新声明AnonymousQueue同名。 如果我们使用代理声明的队列,队列名称将发生变化。spring-doc.cadn.net.cn

您可以控制AnonymousQueue实例。spring-doc.cadn.net.cn

默认情况下,队列名称的前缀为spring.gen-后跟 base64 表示UUID— 例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g.spring-doc.cadn.net.cn

您可以提供AnonymousQueue.NamingStrategy构造函数参数中的实现。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一个 bean 生成一个队列名称,前缀为spring.gen-后跟 base64 表示UUID— 用于 例:spring.gen-MRBv9sqISkuCiPfOYfpo4g. 第二个 bean 生成一个队列名称,前缀为something-后跟 base64 表示UUID. 第三个 bean 仅使用 UUID(无 base64 转换)生成名称,例如f20c818a-006b-4416-bf91-643590fedb0e.spring-doc.cadn.net.cn

base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。 尾随填充字符 () 被删除。=spring-doc.cadn.net.cn

您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(例如应用程序名称或客户端主机)。spring-doc.cadn.net.cn

使用 XML 配置时,可以指定命名策略。 这naming-strategy属性存在于<rabbit:queue>元素 对于实现AnonymousQueue.NamingStrategy. 以下示例演示如何以各种方式指定命名策略:spring-doc.cadn.net.cn

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一个示例创建的名称如下spring.gen-MRBv9sqISkuCiPfOYfpo4g. 第二个示例使用 UUID 的 String 表示形式创建名称。 第三个示例创建的名称如下custom.gen-MRBv9sqISkuCiPfOYfpo4g.spring-doc.cadn.net.cn

您还可以提供自己的命名策略 bean。spring-doc.cadn.net.cn

从版本 2.1 开始,匿名队列使用 argumentQueue.X_QUEUE_LEADER_LOCATOR设置为client-local默认情况下。 这可确保在应用程序连接到的节点上声明队列。 您可以通过调用queue.setLeaderLocator(null)在构造实例后。spring-doc.cadn.net.cn

恢复自动删除声明

通常,RabbitAdmin(s) 仅恢复在应用程序上下文中声明为 bean 的队列/交换/绑定;如果任何此类声明是自动删除的,则在连接丢失时代理将删除它们。 重新建立连接后,管理员将重新声明实体。 通常,通过调用admin.declareQueue(…​),admin.declareExchange(…​)admin.declareBinding(…​)将无法恢复。spring-doc.cadn.net.cn

从 2.4 版开始,管理员有一个新属性redeclareManualDeclarations;什么时候true,除了应用程序上下文中的 bean 之外,管理员还将恢复这些实体。spring-doc.cadn.net.cn

如果出现以下情况,将不会恢复个人声明deleteQueue(…​),deleteExchange(…​)removeBinding(…​)被称为。 删除队列和交换时,将从可恢复实体中删除关联的绑定。spring-doc.cadn.net.cn

最后,调用resetAllManualDeclarations()将阻止恢复任何先前申报的实体。spring-doc.cadn.net.cn