此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 支持

版本 4.0 引入了 spring-rabbitmq-client 模块,用于在 RabbitMQ 上支持 AMQP 1.0 协议。spring-doc.cadn.net.cn

该构件基于 com.rabbitmq.client:amqp-client 库,因此仅能与 RabbitMQ 及其对 AMQP 1.0 协议的支持配合使用。它不能用于任何任意的 AMQP 1.0 消息代理。对于此目的,目前推荐使用 JMS 网桥 以及相应的 Spring JMS 集成。spring-doc.cadn.net.cn

此依赖项必须添加到项目中,才能与 RabbitMQ AMQP 1.0 支持进行交互:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbitmq-client</artifactId>
  <version>4.1.0-SNAPSHOT</version>
</dependency>
implementation 'org.springframework.amqp:spring-rabbitmq-client:4.1.0-SNAPSHOT'

代码 spring-rabbit(用于 AMQP 0.9.1 协议)作为传递依赖项引入,以便在新客户端中复用一些通用 API,例如异常处理及 @RabbitListener 支持。并非必须在目标项目中同时使用这两种功能,但 RabbitMQ 允许 AMQP 0.9.1 和 1.0 同时共存。spring-doc.cadn.net.cn

有关 RabbitMQ AMQP 1.0 Java 客户端的更多信息,请参阅其 文档spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 环境

代码 com.rabbitmq.client.amqp.Environment 是项目中必须首先添加的内容,用于连接管理及其他通用设置。它是一个节点或节点集群的入口点。环境允许创建连接。它可以包含在连接之间共享的与基础设施相关的配置设置,例如线程池、指标和/或监控:spring-doc.cadn.net.cn

@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}

同一个 Environment 实例可用于连接到不同的 RabbitMQ 代理服务器,此时必须在特定连接上提供连接设置。
请参见下方。spring-doc.cadn.net.cn

AMQP 连接工厂

抽象 org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory 引入的目的是用于管理 com.rabbitmq.client.amqp.Connection。不要将其与仅适用于 AMQP 0.9.1 协议的 org.springframework.amqp.rabbit.connection.ConnectionFactory 混淆。SingleAmqpConnectionFactory 的实现用于管理一个连接及其设置。相同的 Connection 可在多个生产者、消费者和管理组件之间共享。多路复用由链接抽象处理,该抽象在 AMQP 客户端库内部用于 AMQP 1.0 协议的实现。Connection 具有恢复能力,并且还负责处理拓扑结构。spring-doc.cadn.net.cn

在大多数情况下,只需将此 Bean 添加到项目中即可:spring-doc.cadn.net.cn

@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}

查看 SingleAmqpConnectionFactory 个用于所有连接特定设置的 setter。spring-doc.cadn.net.cn

RabbitMQ 拓扑管理

对于从应用角度管理拓扑结构(交换机、队列及它们之间的绑定关系),RabbitAmqpAdmin 存在,它是现有 AmqpAdmin 接口的一个实现:spring-doc.cadn.net.cn

@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}

ExchangeQueueBindingDeclarables实例相同的bean定义,如配置代理中所述,必须用于管理拓扑结构。 RabbitAdmin来自spring-rabbit也可以实现此功能,但这是在AMQP 0.9.1连接上进行的;而由于RabbitAmqpAdmin基于AMQP 1.0连接,因此拓扑恢复可在此处顺利进行,同时也会处理发布者和订阅者的恢复。spring-doc.cadn.net.cn

在它的 start() 生命周期回调中,RabbitAmqpAdmin 执行相应的 Bean 扫描。
在运行时,可以手动调用 initialize() 以及所有其他 RabbitMQ 实体管理方法。
内部,RabbitAmqpAdmin 使用 com.rabbitmq.client.amqp.Connection.management() API 执行相应的拓扑操作。spring-doc.cadn.net.cn

RabbitAmqpTemplate

RabbitAmqpTemplateAsyncAmqpTemplate 的一个实现,并使用 AMQP 1.0 协议执行各种发送/接收操作。需要一个 AmqpConnectionFactory,并且可以使用一些默认配置进行设置。即使 com.rabbitmq.client:amqp-client 库自带一个 com.rabbitmq.client.amqp.MessageRabbitAmqpTemplate 仍会基于广为人知的 org.springframework.amqp.core.Message 提供 API,包含诸如 MessagePropertiesMessageConverter 等所有支持类的抽象。在 com.rabbitmq.client.amqp.MessageRabbitAmqpTemplate 之间的转换由 RabbitAmqpTemplate 内部完成。所有方法均返回一个 CompletableFuture,以便最终获取操作结果。对于使用普通对象的操作,需要对消息体进行转换,而 SimpleMessageConverter 默认被用于此目的。有关转换的更多信息,请参阅 消息转换器spring-doc.cadn.net.cn

通常,仅一个这样的 Bean 就足以执行所有可能的模板模式操作:spring-doc.cadn.net.cn

@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}

它可以配置为某些默认交换机和路由键,或者仅配置队列。该 RabbitAmqpTemplate 具有一个用于接收操作的默认队列,以及另一个用于请求-响应操作的默认队列;如果客户端未提供临时队列,则由客户端创建临时队列以用于请求。spring-doc.cadn.net.cn

以下是一些 RabbitAmqpTemplate 操作的示例:spring-doc.cadn.net.cn

@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
	this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}

在这里,我们声明了一个 e1 交换机、q1 队列,并使用 k1 路由键将该队列绑定到此交换机上。随后,我们为 RabbitAmqpTemplate 使用默认设置,向指定的交换机发送消息(并使用相应的路由键),同时将 q1 作为接收操作的默认队列。这些方法有重载版本,可用于向特定交换机或队列发送(发送和接收)消息。执行 receiveAndConvert() 操作时,若涉及 ParameterizedTypeReference<T>,则需要注入一个 SmartMessageConverterRabbitAmqpTemplate 中。spring-doc.cadn.net.cn

下一个示例展示了与 RabbitAmqpTemplate 的 RPC 实现(假设与前一个示例中使用相同的 RabbitMQ 对象):spring-doc.cadn.net.cn

@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                     payload -> {
                         receivedRequest.set(payload);
                         return testReply;
                     });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}

关联和 replyTo 队列由内部进行管理。
服务器端可使用如下所述的 @RabbitListener POJO 方法实现。spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 消费者

与其他许多面向消费者的消息传递实现类似,spring-rabbitmq-client 模块附带了 RabbitAmqpListenerContainer,这本质上是广为人知的 org.springframework.amqp.core.MessageListenerContainer 的一个实现。它与 DirectMessageListenerContainer 完全相同,但用于支持 RabbitMQ AMQP 1.0。需要一个 AmqpConnectionFactory 以及至少一个待消费的消息队列。此外,还必须提供 MessageListener(或针对 AMQP 1.0 的特定 RabbitAmqpMessageListener)。可通过 autoSettle = false 进行配置,其含义为 AcknowledgeMode.MANUAL。在此情况下,传递给 MessageListenerMessage 中,在其 MessageProperties 内包含一个 AmqpAcknowledgment 回调,供目标逻辑处理使用。spring-doc.cadn.net.cn

接口 RabbitAmqpMessageListener 具有对 com.rabbitmq.client:amqp-client 个抽象的契约:spring-doc.cadn.net.cn

/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);

其中第一个参数是原生接收的 com.rabbitmq.client.amqp.Message,而 context 是用于消息结算的原生回调函数,类似于上述提到的 AmqpAcknowledgment 抽象。spring-doc.cadn.net.cn

当提供 batchSize 选项时,RabbitAmqpMessageListener 可以批量处理和解决消息。为此,必须实现 MessageListener.onMessageBatch() 合约。batchReceiveDuration 选项用于为未满批次安排强制释放,以避免内存和 消费者信用 耗尽。spring-doc.cadn.net.cn

通常,RabbitAmqpMessageListener 类在目标项目中不直接使用,而是选择通过 @RabbitListener 的 POJO 方法注解配置来实现声明式消费者配置。 RabbitAmqpListenerContainerFactory 必须注册在 RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME 下,而 @RabbitListener 注解处理过程会将 RabbitAmqpMessageListener 实例注册到 RabbitListenerEndpointRegistry 中。 目标 POJO 方法调用由特定的 RabbitAmqpMessageListenerAdapter 实现处理,该实现继承自 MessagingMessageListenerAdapter 并复用了其大量功能,包括请求-回复场景(同步或异步)。 因此,所有在 注解驱动的监听器端点 中描述的概念同样适用于此 RabbitAmqpMessageListenerspring-doc.cadn.net.cn

除了传统的消息传递 payloadheaders 外,@RabbitListener POJO 方法契约还可以包含以下参数:spring-doc.cadn.net.cn

  • com.rabbitmq.client.amqp.Message - 原生 AMQP 1.0 消息,不进行任何转换;spring-doc.cadn.net.cn

  • org.springframework.amqp.core.Message - Spring AMQP 消息抽象,作为从原生 AMQP 1.0 消息转换而来的结果;spring-doc.cadn.net.cn

  • org.springframework.messaging.Message - Spring 消息抽象作为来自 Spring AMQP 消息的转换结果;spring-doc.cadn.net.cn

  • Consumer.Context - RabbitMQ AMQP 客户端消费者结算 API;spring-doc.cadn.net.cn

  • org.springframework.amqp.core.AmqpAcknowledgment - Spring AMQP 确认抽象:委托给 Consumer.Contextspring-doc.cadn.net.cn

以下示例演示了一个简单的 @RabbitListener,用于与 RabbitMQ AMQP 1.0 进行交互,并采用手动确认机制:spring-doc.cadn.net.cn

@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}