此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6spring-doc.cadn.net.cn

请求/回复消息传递

AmqpTemplate还提供了多种sendAndReceive接受前面描述的单向发送作 (exchange,routingKeyMessage). 这些方法对于请求-回复方案非常有用,因为它们处理了必要reply-to属性,并且可以在内部为此目的创建的独占队列上侦听回复消息。spring-doc.cadn.net.cn

类似的请求-回复方法也可用,其中MessageConverter应用于请求和回复。 这些方法被命名为convertSendAndReceive. 请参阅Javadoc 的AmqpTemplate了解更多详情。spring-doc.cadn.net.cn

从 1.5.0 版本开始,每个sendAndReceive方法 variants 有一个重载版本,它采用CorrelationData. 与正确配置的连接工厂一起,这可以接收作的发送端的发布者确认。 请参阅相关发布者确认和退货Javadoc 的RabbitOperations了解更多信息。spring-doc.cadn.net.cn

从 2.0 版开始,这些方法有变体 (convertSendAndReceiveAsType) 需要额外的ParameterizedTypeReference参数来转换复杂的返回类型。 模板必须配置SmartMessageConverter. 看MessageRabbitTemplate了解更多信息。spring-doc.cadn.net.cn

从 2.1 版开始,您可以配置RabbitTemplate使用noLocalReplyConsumer控制noLocal标记回复消费者。 这是false默认情况下。spring-doc.cadn.net.cn

回复超时

默认情况下,发送和接收方法在五秒后超时并返回 null。 您可以通过将replyTimeout财产。 从 1.5 版开始,如果您将mandatory属性设置为true(或mandatory-expression评估为true对于特定消息),如果消息无法传递到队列,则AmqpMessageReturnedException被抛出。 此异常具有returnedMessage,replyCodereplyText属性,以及exchangeroutingKey用于发送。spring-doc.cadn.net.cn

此功能使用发布者退货。 您可以通过设置publisherReturnstrueCachingConnectionFactory(请参阅出版商确认和退货)。 此外,您不得注册自己的ReturnCallback使用RabbitTemplate.

从 2.1.2 版开始,一个replyTimedOut方法,让子类被告知超时,以便它们可以清理任何保留的状态。spring-doc.cadn.net.cn

从版本 2.0.11 和 2.1.3 开始,当您使用默认的DirectReplyToMessageListenerContainer,您可以通过设置模板的replyErrorHandler财产。 对于任何失败的传递,例如延迟回复和收到没有相关标头的消息,都会调用此错误处理程序。 传入的异常是ListenerExecutionFailedException,它有一个failedMessage财产。spring-doc.cadn.net.cn

RabbitMQ 直接回复

从 3.4.0 版本开始,RabbitMQ 服务器支持直接回复。 这消除了固定应答队列的主要原因(以避免需要为每个请求创建临时队列)。 从 Spring AMQP 版本 1.4.1 开始,默认情况下使用直接回复(如果服务器支持),而不是创建临时回复队列。 当没有时replyQueue(或设置为amq.rabbitmq.reply-to)、RabbitTemplate自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。 使用直接回复时,一个reply-listener不是必需的,也不应配置。

命名队列仍支持应答侦听器(除了amq.rabbitmq.reply-to),允许控制回复并发等。spring-doc.cadn.net.cn

从 1.6 版开始,如果您希望对每个 reply,将useTemporaryReplyQueues属性设置为true. 如果将replyAddress.spring-doc.cadn.net.cn

您可以通过子类化来更改指示是否使用直接回复的条件RabbitTemplate和覆盖useDirectReplyTo()以检查不同的标准。 发送第一个请求时,该方法仅被调用一次。spring-doc.cadn.net.cn

在 2.0 版之前,RabbitTemplate为每个请求创建一个新的使用者,并在收到回复(或超时)时取消使用者。 现在,模板使用DirectReplyToMessageListenerContainer相反,让消费者被重复使用。 模板仍然负责关联回复,因此不存在延迟回复给其他发件人的危险。 如果要恢复到以前的行为,请将useDirectReplyToContainer (direct-reply-to-container使用 XML 配置时)属性设置为 false。spring-doc.cadn.net.cn

AsyncRabbitTemplate没有这样的选择。 它总是使用DirectReplyToContainer用于使用直接回复时的回复。spring-doc.cadn.net.cn

从版本 2.3.7 开始,模板有一个新属性useChannelForCorrelation. 当这是true,则服务器不必将相关 ID 从请求消息头复制到回复消息。 相反,用于发送请求的通道用于将回复与请求相关联。spring-doc.cadn.net.cn

消息与应答队列的关联

使用固定应答队列时(amq.rabbitmq.reply-to),您必须提供相关数据,以便回复可以与请求相关联。 请参阅 RabbitMQ 远程过程调用 (RPC)。 默认情况下,标准correlationId属性用于保存相关数据。 但是,如果您希望使用自定义属性来保存相关数据,则可以将correlation-key><属性。 显式将属性设置为correlationId与省略属性相同。 客户端和服务器必须对相关数据使用相同的标头。spring-doc.cadn.net.cn

Spring AMQP 版本 1.1 使用了一个名为spring_reply_correlation对于这些数据。 如果您希望在当前版本中恢复到此行为(可能是为了保持与使用 1.1 的另一个应用程序的兼容性),则必须将属性设置为spring_reply_correlation.

默认情况下,模板会生成自己的相关 ID(忽略任何用户提供的值)。 如果您希望使用自己的相关 ID,请将RabbitTemplate实例的userCorrelationId属性设置为true.spring-doc.cadn.net.cn

关联标识必须是唯一的,以避免为请求返回错误的回复。

回复侦听器容器

当使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。 但是,可以在模板上配置单个回复队列,这样可以更高效,还可以在该队列上设置参数。 但是,在这种情况下,您还必须提供 <reply-listener/> 子元素。 此元素为应答队列提供侦听器容器,模板是侦听器。 该元素允许在<listener-container/>上允许的所有消息侦听器容器配置属性,但connection-factorymessage-converter,这些都是从模板的配置继承而来的。spring-doc.cadn.net.cn

如果您运行应用程序的多个实例或使用RabbitTemplate实例,您必须为每个实例使用唯一的回复队列。 RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会争夺回复,而不一定收到自己的回复。

以下示例定义了具有连接工厂的兔子模板:spring-doc.cadn.net.cn

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

虽然容器和模板共享连接工厂,但它们不共享通道。 因此,请求和回复不会在同一事务中执行(如果是事务)。spring-doc.cadn.net.cn

在 1.5.0 版本之前,reply-address属性不可用。 回复始终使用默认交换和reply-queuename 作为路由键。 这仍然是默认设置,但您现在可以指定新的reply-address属性。 这reply-address可以包含具有<exchange>/<routingKey>并且应答将路由到指定的交换,并路由到与路由键绑定的队列。 这reply-address优先于reply-queue. 仅当reply-address正在使用中,则<reply-listener>必须配置为单独的<listener-container>元件。 这reply-addressreply-queue(或queues属性<listener-container>)必须在逻辑上引用同一个队列。

使用此配置,一个SimpleListenerContainer用于接收回复,其中RabbitTemplate作为MessageListener. 使用<rabbit:template/>namespace 元素,如前面的示例所示,解析器将模板中的容器和线定义为监听器。spring-doc.cadn.net.cn

当模板不使用固定的replyQueue(或使用直接回复 - 参见 RabbitMQ 直接回复),则不需要侦听器容器。 直接reply-to是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。

如果您定义了RabbitTemplate作为<bean/>或使用@Configuration类将其定义为@Bean或者,以编程方式创建模板时,需要自行定义和连接回复侦听器容器。 如果不执行此作,模板将永远不会收到回复,最终会超时并返回 null 作为对sendAndReceive方法。spring-doc.cadn.net.cn

从 1.5 版开始,RabbitTemplate检测它是否已经 配置为MessageListener接收回复。 如果没有,则尝试发送和接收带有回复地址的邮件 fail 时IllegalStateException(因为永远不会收到回复)。spring-doc.cadn.net.cn

此外,如果简单的replyAddress(队列名称),则应答侦听器容器验证它是否正在侦听 到具有相同名称的队列。 如果回复地址是交换和路由密钥,并且写入了调试日志消息,则无法执行此检查。spring-doc.cadn.net.cn

在自己连接回复侦听器和模板时,重要的是要确保模板的replyAddress和容器的queues(或queueNames) 属性引用相同的队列。 模板将回复地址插入到出站邮件中replyTo财产。

以下列表显示了如何手动连接 Bean 的示例:spring-doc.cadn.net.cn

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

一个完整的示例RabbitTemplate与固定的应答队列连接,以及处理请求并返回应答的“远程”侦听器容器,在此测试用例中显示。spring-doc.cadn.net.cn

当回复超时 (replyTimeout)、sendAndReceive()方法返回 null。

在 1.3.6 版之前,仅记录超时消息的延迟回复。 现在,如果收到延迟回复,则会拒绝它(模板会抛出AmqpRejectAndDontRequeueException). 如果回复队列配置为将被拒绝的邮件发送到死信交换,则可以检索回复以供以后分析。 为此,请使用与回复队列名称相等的路由密钥将队列绑定到配置的死信交换。spring-doc.cadn.net.cn

有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。 您还可以查看FixedReplyQueueDeadLetterTests测试用例作为示例。spring-doc.cadn.net.cn

异步兔子模板

1.6 版引入了AsyncRabbitTemplate. 这有类似的sendAndReceive(和convertSendAndReceive) 方法添加到AmqpTemplate. 但是,它们不是阻塞,而是返回CompletableFuture.spring-doc.cadn.net.cn

sendAndReceive方法返回一个RabbitMessageFuture. 这convertSendAndReceive方法返回一个RabbitConverterFuture.spring-doc.cadn.net.cn

您可以稍后通过调用get(),或者您可以注册一个与结果异步调用的回调。 以下列表显示了这两种方法:spring-doc.cadn.net.cn

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果mandatory设置了消息,并且无法传递消息,则 future 会抛出一个ExecutionException原因为AmqpMessageReturnedException,它封装了返回的消息和有关返回的信息。spring-doc.cadn.net.cn

如果enableConfirms设置时,未来有一个名为confirm,它本身就是一个CompletableFuture<Boolean>true表示发布成功。 如果确认未来是falseRabbitFuture还有一个名为nackCause,其中包含失败的原因(如果可用)。spring-doc.cadn.net.cn

如果在回复后收到发布者确认,则该确认将被丢弃,因为该回复意味着成功发布。

您可以将receiveTimeout属性以超时回复(默认为30000- 30 秒)。 如果发生超时,则未来将以AmqpReplyTimeoutException.spring-doc.cadn.net.cn

模板实现SmartLifecycle. 在有待处理的回复时停止模板会导致Future要取消的实例。spring-doc.cadn.net.cn

从 2.0 版开始,异步模板现在支持直接回复,而不是配置的回复队列。 若要启用此功能,请使用以下构造函数之一:spring-doc.cadn.net.cn

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

请参阅 RabbitMQ 直接回复,将直接回复与同步RabbitTemplate.spring-doc.cadn.net.cn

2.0 版引入了这些方法的变体 (convertSendAndReceiveAsType) 需要额外的ParameterizedTypeReference参数来转换复杂的返回类型。 您必须配置基础RabbitTemplate使用SmartMessageConverter. 看MessageRabbitTemplate了解更多信息。spring-doc.cadn.net.cn

使用 AMQP 进行春季远程处理

不再支持 Spring 远程处理,因为该功能已从 Spring Framework 中删除。spring-doc.cadn.net.cn

sendAndReceive使用RabbitTemplate(客户端)和@RabbitListener相反。spring-doc.cadn.net.cn