请使用 Spring AMQP 4.0.2(最新稳定版本)!spring-doc.cadn.net.cn

回复管理

现有对 MessageListenerAdapter 的支持已允许您的方法具有非 void 返回类型。在这种情况下,调用的结果将被封装为一条消息,并发送到原始消息中 ReplyToAddress 头部所指定的地址,或发送到监听器上配置的默认地址。您可以通过使用消息抽象的 @SendTo 注解来设置该默认地址。spring-doc.cadn.net.cn

假设我们的 processOrder 方法现在应返回一个 OrderStatus,我们可以按如下方式编写,以自动发送回复:spring-doc.cadn.net.cn

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果您需要以传输无关的方式设置额外的标头,可以返回 Message,例如以下内容:spring-doc.cadn.net.cn

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

或者,您可以在 beforeSendReplyMessagePostProcessors 容器工厂属性中使用 MessagePostProcessor 来添加更多标头。
从版本 2.2.3 开始,被调用的 Bean/方法会作为回复消息的一部分提供,该信息可用于消息后处理器中,以便将信息回传给调用方:spring-doc.cadn.net.cn

factory.setBeforeSendReplyPostProcessors(msg -> {
    msg.getMessageProperties().setHeader("calledBean",
            msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
    msg.getMessageProperties().setHeader("calledMethod",
            msg.getMessageProperties().getTargetMethod().getName());
    return m;
});

从版本 2.2.5 开始,您可以配置一个 ReplyPostProcessor 来在消息发送前修改回复内容;该配置在 correlationId 头部设置完毕以匹配请求后被调用。spring-doc.cadn.net.cn

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
    return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
    return (req, resp) -> {
        resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
        return resp;
    };
}

从版本 3.0 开始,您可以在容器工厂上配置后处理器,而不是在注解上配置。spring-doc.cadn.net.cn

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
    resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
    return resp;
});

参数 id 是监听器 ID。spring-doc.cadn.net.cn

注解上的设置将覆盖工厂设置。spring-doc.cadn.net.cn

@SendTo 被假定为遵循 exchange/routingKey 模式的一对回复 exchangeroutingKey,其中这些部分之一可以省略。有效的取值如下:spring-doc.cadn.net.cn

  • thing1/thing2: 交换机 replyTo 和交换机 routingKey
    thing1/: 交换机 replyTo 和默认(空)交换机 routingKey
    thing2/thing2: 交换机 replyTo routingKey 和默认(空)交换机。
    / 或空值:默认交换机 replyTo 和默认 routingKeyspring-doc.cadn.net.cn

此外,您可以不使用 @SendTo 属性而直接使用 value。这种情况等同于一个空的 sendTo 模式。@SendTo 仅在传入消息不具有 replyToAddress 属性时才被使用。spring-doc.cadn.net.cn

从版本 1.5 开始,@SendTo 值可以是一个 bean 初始化的 SpEL 表达式,如下例所示:spring-doc.cadn.net.cn

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

表达式必须计算为一个 String,它可以是一个简单的队列名称(发送到默认交换机)或采用 exchange/routingKey 的形式(如前文所述示例之前所讨论的那样)。spring-doc.cadn.net.cn

表达式 #{…​} 在初始化期间仅计算一次。

对于动态回复路由,消息发送者应包含 reply_to 消息属性,或使用后续示例之后描述的替代运行时 SpEL 表达式。spring-doc.cadn.net.cn

从版本 1.6 开始,@SendTo 可以是一个 SpEL 表达式,该表达式会在运行时针对请求和响应进行求值,如下例所示:spring-doc.cadn.net.cn

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

SpEL 表达式的运行时特性由 !{…​} 分隔符表示。该表达式的求值上下文 #root 对象具有三个属性:spring-doc.cadn.net.cn

上下文具有一个映射属性访问器、一个标准类型转换器和一个 Bean 解析器,这使得上下文中的其他 Bean 可以被引用(例如,@someBeanName.determineReplyQ(request, result))。spring-doc.cadn.net.cn

总之,#{…​} 在初始化期间仅计算一次,而 #root 对象即为应用上下文。 beans 通过其名称进行引用。!{…​} 在每次消息处理时于运行时进行计算,根对象具有前述列出的属性。beans 通过其名称进行引用,并以 @ 作为前缀。spring-doc.cadn.net.cn

从版本 2.1 开始,也支持简单的属性占位符(例如 ${some.reply.to})。对于早期版本,可以使用以下方法作为替代方案,如下例所示:spring-doc.cadn.net.cn

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
    ...
    return ...
}