此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

请求/回复消息

AmqpTemplate 还提供了多种 sendAndReceive 方法,这些方法接受与前述单向发送操作(exchangeroutingKeyMessage)相同的参数选项。
这些方法在请求-应答场景中非常有用,因为它们可在发送前自动配置必要的 reply-to 属性,并能在为该目的而内部创建的专用队列上监听应答消息。spring-doc.cadn.net.cn

类似的请求-回复方法也已提供,其中 MessageConverter 同时应用于请求和回复。这些方法被命名为 convertSendAndReceive。有关更多详情,请参阅 AmqpTemplate 的 Javadocspring-doc.cadn.net.cn

从版本 1.5.0 开始,每个 sendAndReceive 方法变体都提供了一个重载版本,该版本接受 CorrelationData。结合一个正确配置的连接工厂,这使得可以在操作的发送端接收发布确认(publisher confirms)。有关更多信息,请参阅 关联的发布者确认与返回 以及 RabbitOperations 的 Javadocspring-doc.cadn.net.cn

从版本 2.0 开始,这些方法有变体(convertSendAndReceiveAsType),可接受额外的 ParameterizedTypeReference 个参数以转换复杂返回类型。模板必须配置一个 SmartMessageConverter。有关更多信息,请参阅 使用 RabbitTemplateMessage 进行转换spring-doc.cadn.net.cn

从版本 2.1 开始,您可以使用 RabbitTemplate 配置 noLocalReplyConsumer 选项,以控制回复消费者的 noLocal 标志。这默认是 falsespring-doc.cadn.net.cn

回复超时

默认情况下,发送和接收方法在五秒后超时并返回 null。您可以通过设置 replyTimeout 属性来修改此行为。从版本 1.5 开始,如果将 mandatory 属性设置为 true(或针对特定消息时 mandatory-expression 的计算结果为 true),当消息无法投递到队列时,会抛出 AmqpMessageReturnedException 异常。该异常包含 returnedMessagereplyCodereplyText 等属性,以及用于发送的 exchangeroutingKeyspring-doc.cadn.net.cn

此功能使用发布者返回机制。您可以通过将 publisherReturns 设置为 true 来启用它,具体请参见 发布者确认与返回CachingConnectionFactory)。
此外,您不得向 RabbitTemplate 注册自定义的 ReturnCallback

从版本 2.1.2 开始,新增了一个 replyTimedOut 方法,使子类能够获知超时信息,从而可以清理任何保留的状态。spring-doc.cadn.net.cn

从版本 2.0.11 和 2.1.3 开始,当您使用默认值 DirectReplyToMessageListenerContainer 时,可以通过设置模板的 replyErrorHandler 属性来添加一个错误处理器。此错误处理器将用于处理任何失败的投递,例如延迟回复以及未带关联头(correlation header)的消息。传入的异常是一个 ListenerExecutionFailedException,它具有一个 failedMessage 属性。spring-doc.cadn.net.cn

RabbitMQ 直接回复-回复地址

从版本 3.4.0 开始,RabbitMQ 服务器支持 直接回复到。这消除了使用固定回复队列的主要原因(即避免为每个请求创建临时队列)。从 Spring AMQP 版本 1.4.1 开始,默认情况下(如果服务器支持)将使用直接回复到功能,而不是创建临时回复队列。当未提供 replyQueue(或其名称设置为 amq.rabbitmq.reply-to)时,RabbitTemplate 会自动检测是否支持直接回复到功能,并据此选择使用它或回退到使用临时回复队列。在使用直接回复到功能时,不需要配置 reply-listener,也不应对其进行配置。

回复监听器仍与命名队列(除了 amq.rabbitmq.reply-to)兼容,从而可控制回复的并发性等。spring-doc.cadn.net.cn

从版本 1.6 开始,如果您希望为每个回复使用一个临时、独占、自动删除的队列,请将 useTemporaryReplyQueues 属性设置为 true。如果设置了 replyAddress,则此属性将被忽略。spring-doc.cadn.net.cn

您可以更改决定是否使用直接回复地址的条件,方法是继承 RabbitTemplate 并重写 useDirectReplyTo() 方法以检查不同的条件。该方法仅在第一个请求发出时调用一次。spring-doc.cadn.net.cn

在 2.0 版本之前,RabbitTemplate 会在每次请求时创建一个新的消费者,并在收到回复(或超时)后取消该消费者。现在,模板改用 DirectReplyToMessageListenerContainer 来替代,从而允许复用消费者。模板依然负责将回复与对应的请求进行关联,因此不会出现延迟回复被错误地发送给其他发送方的情况。如果您希望恢复到之前的 Behavior,请将 useDirectReplyToContainer(使用 XML 配置时为 direct-reply-to-container)属性设置为 false。spring-doc.cadn.net.cn

代码 AsyncRabbitTemplate 没有此选项。
当直接使用回复地址时,它始终使用 DirectReplyToContainer 进行回复。spring-doc.cadn.net.cn

从版本 2.3.7 开始,模板新增了一个属性 useChannelForCorrelation。当该属性为 true 时,服务器无需将关联 ID(correlation id)从请求消息头复制到响应消息中。相反,发送请求所使用的通道将用于将响应与请求进行关联。spring-doc.cadn.net.cn

消息关联与回复队列

当使用固定回复队列(除 amq.rabbitmq.reply-to 外)时,您必须提供关联数据,以便将回复与请求进行匹配。参见 RabbitMQ 远程过程调用(RPC)。默认情况下,标准 correlationId 属性用于存储关联数据。然而,如果您希望使用自定义属性来保存关联数据,可以在 元素上设置 correlation-key 属性。显式地将该属性设置为 correlationId 等同于不设置该属性。客户端和服务器必须使用相同的头部字段来存放关联数据。spring-doc.cadn.net.cn

Spring AMQP 版本 1.1 曾使用一个名为 spring_reply_correlation 的自定义属性来存储此数据。如果希望在当前版本中恢复此行为(例如,为了与另一款使用 1.1 版本的应用程序保持兼容),必须将该属性设置为 spring_reply_correlation

默认情况下,模板会自动生成其自身的关联ID(忽略任何用户提供的值)。
如果您希望使用自己的关联ID,请将 RabbitTemplate 实例的 userCorrelationId 属性设置为 truespring-doc.cadn.net.cn

相关 ID 必须是唯一的,以避免为请求返回错误的响应。

回复监听器容器

在使用 RabbitMQ 3.4.0 之前的版本时,每次回复都会使用一个新的临时队列。然而,您可以在模板上配置单个回复队列,这可能更高效,并且还能为该队列设置参数。不过,在这种情况下,您还必须提供一个 子元素。该元素为回复队列提供了一个监听器容器,其中模板即作为监听器。所有允许在 元素上使用的 消息监听器容器配置 属性,均可在该元素上使用,但不包括 connection-factorymessage-converter,因为它们继承自模板的配置。spring-doc.cadn.net.cn

如果您运行应用程序的多个实例或使用多个 RabbitTemplate 实例,则必须为每个实例使用一个唯一的回复队列。

spring-doc.cadn.net.cn

RabbitMQ 无法从队列中选择消息,因此,如果所有实例都使用相同的队列,每个实例将竞争回复消息,且不一定能收到属于自己的回复。spring-doc.cadn.net.cn

以下示例定义了一个带有连接工厂的兔子模板:spring-doc.cadn.net.cn

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

虽然容器和模板共享一个连接工厂,但它们并不共享一个通道。因此,请求和回复不会在同一个事务中执行(如果启用了事务)。spring-doc.cadn.net.cn

在 1.5.0 版本之前,reply-address 属性不可用。回复始终通过默认交换机并以 reply-queue 名称作为路由键进行路由。这仍然是默认行为,但现在您可以指定新的 reply-address 属性。reply-address 可以包含一个格式为 <exchange>/<routingKey> 的地址,回复将被路由至指定的交换机,并进一步路由到与该路由键绑定的队列。reply-address 的优先级高于 reply-queue。当仅使用 reply-address 时,<reply-listener> 必须配置为一个独立的 <listener-container> 组件。reply-addressreply-queue(或 queues 属性在 <listener-container> 上)在逻辑上必须指向同一个队列。

在此配置中,使用 SimpleListenerContainer 接收回复,其中 RabbitTemplateMessageListener。在使用 <rabbit:template/> 命名空间元素定义模板时(如前面示例所示),解析器会将模板中的容器和连线定义为监听器。spring-doc.cadn.net.cn

当模板不使用固定的 replyQueue(或正在使用直接回复——参见 RabbitMQ 直接回复)时,不需要监听器容器。

spring-doc.cadn.net.cn

在使用 RabbitMQ 3.4.0 或更高版本时,直接 reply-to 是首选机制。spring-doc.cadn.net.cn

如果您将 RabbitTemplate 定义为 <bean/>,或使用 @Configuration 类来将其定义为 @Bean,或者在程序中动态创建模板时,您需要自行定义并配置回复监听器容器。
如果您未执行此操作,模板将永远不会接收到回复,最终会超时,并向 sendAndReceive 方法的调用返回 null 作为回复。spring-doc.cadn.net.cn

从版本 1.5 开始,RabbitTemplate 会检测其是否已配置为 MessageListener 以接收回复。如果未配置,则尝试发送并接收带有回复地址的消息时将失败,抛出 IllegalStateException(因为回复永远不会被接收)。spring-doc.cadn.net.cn

此外,如果使用简单的 replyAddress(队列名称),回复监听器容器会验证其正在监听一个具有相同名称的队列。如果回复地址是交换机和路由键,则无法执行此检查,并会记录一条调试日志消息。spring-doc.cadn.net.cn

当您自己配置回复监听器和模板时,重要的是要确保模板的 replyAddress 属性与容器的 queues(或 queueNames)属性指向同一个队列。

spring-doc.cadn.net.cn

模板会将回复地址插入到出站消息的 replyTo 属性中。spring-doc.cadn.net.cn

以下列表展示了如何手动配置 Bean 的示例:spring-doc.cadn.net.cn

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

一个完整的示例,其中包含一个与固定回复队列连接的 RabbitTemplate,以及一个“远程”监听容器,用于处理请求并返回回复,详见 此测试用例spring-doc.cadn.net.cn

当回复超时(replyTimeout)时,sendAndReceive() 方法返回 null。

在 1.3.6 版本之前,对于超时消息的延迟回复仅被记录日志。现在,若收到延迟回复,将予以拒绝(模板会抛出 AmqpRejectAndDontRequeueException)。如果回复队列配置为将被拒绝的消息发送至死信交换机,则可稍后检索该回复以供分析。为此,请将一个队列绑定至配置的死信交换机,并将路由键设置为回复队列的名称。spring-doc.cadn.net.cn

请参阅 RabbitMQ 死信文档,了解有关配置死信的更多信息。
您也可以查看 FixedReplyQueueDeadLetterTests 测试用例,以获取示例。spring-doc.cadn.net.cn

异步 Rabbit 模板

版本 1.6 引入了 AsyncRabbitTemplate。这具有与 AmqpTemplate 上的类似 sendAndReceive(以及 convertSendAndReceive)方法。然而,它们不阻塞,而是返回一个 CompletableFuturespring-doc.cadn.net.cn

方法 sendAndReceive 返回一个 RabbitMessageFuture
方法 convertSendAndReceive 返回一个 RabbitConverterFuturespring-doc.cadn.net.cn

您可以选择以同步方式稍后通过调用未来对象上的 get() 来获取结果,也可以注册一个回调函数,该回调函数在异步获取结果时被调用。以下列表展示了这两种方法:spring-doc.cadn.net.cn

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果设置为 mandatory,且消息无法投递,则该未来对象将抛出一个 ExecutionException 异常,其原因包含 AmqpMessageReturnedException,该异常封装了返回的消息及有关返回的详细信息。spring-doc.cadn.net.cn

如果设置为 enableConfirms,该异步结果对象具有一个名为 confirm 的属性,该属性本身是一个 CompletableFuture<Boolean> 对象,其中 true 表示发布是否成功。
如果确认异步结果为 false,则 RabbitFuture 具有另一个名为 nackCause 的属性,其中包含失败原因(如可用)。spring-doc.cadn.net.cn

如果确认(publisher confirm)在回复之后收到,则会被丢弃,因为回复意味着发布已成功。

您可以设置模板上的 receiveTimeout 属性以超时回复(默认值为 30000,即 30 秒)。如果发生超时,该未来对象将以 AmqpReplyTimeoutException 完成。spring-doc.cadn.net.cn

该模板实现了 SmartLifecycle。在存在待处理回复时停止模板,会导致待处理的 Future 实例被取消。spring-doc.cadn.net.cn

从版本 2.0 开始,异步模板现在支持 直接回复,而不是配置的回复队列。要启用此功能,请使用以下任一构造函数:spring-doc.cadn.net.cn

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

参见 RabbitMQ 直接回复,以在同步 RabbitTemplate 中使用直接回复功能。spring-doc.cadn.net.cn

版本 2.0 引入了这些方法的变体(convertSendAndReceiveAsType),它们接受一个额外的 ParameterizedTypeReference 参数,用于转换复杂返回类型。您必须将底层的 RabbitTemplate 配置为使用 SmartMessageConverter。有关更多信息,请参阅 使用 RabbitTemplateMessage 进行转换spring-doc.cadn.net.cn

Spring Remoting with AMQP

Spring 远程调用已不再受支持,因为该功能已从 Spring Framework 中移除。spring-doc.cadn.net.cn

使用 sendAndReceive 操作,采用 RabbitTemplate(客户端)和 @RabbitListener 代替。spring-doc.cadn.net.cn