此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

异步消费者

Spring AMQP 还通过使用 @RabbitListener 注解支持注解驱动的监听器端点,并提供了一个开放的基础设施,用于以编程方式注册端点。

spring-doc.cadn.net.cn

这是设置异步消费者最便捷的方式。spring-doc.cadn.net.cn

有关更多详情,请参阅 注解驱动的监听器端点spring-doc.cadn.net.cn

预取的默认值曾为1,这可能导致高效消费者未被充分利用。从2.0版本开始,预取的默认值现已更改为250,这在大多数常见场景中应能保持消费者持续忙碌,从而提升吞吐量。spring-doc.cadn.net.cn

然而,在某些情况下,预取值应保持较低:spring-doc.cadn.net.cn

此外,对于低流量消息以及多个消费者(包括单个监听容器实例内的并发),您可能希望减少预取数量,以实现消息在各消费者之间更均衡的分发。spring-doc.cadn.net.cn

有关预取的更多背景信息,请参阅这篇关于 RabbitMQ 中消费者利用率 的文章,以及这篇关于 排队论 的文章。spring-doc.cadn.net.cn

消息监听器

对于异步 Message 接收,会涉及一个专用组件(而非 AmqpTemplate)。该组件是用于容纳一个 Message-消费回调函数的容器。我们将在本节稍后部分讨论该容器及其属性。不过,首先我们应该查看回调函数,因为这是您的应用程序代码与消息系统进行集成的地方。回调函数有几种可选方案,其中一种是实现 MessageListener 接口,如下所示的列表:spring-doc.cadn.net.cn

public interface MessageListener {
    void onMessage(Message message);
}

如果您在回调逻辑中依赖于 AMQP Channel 实例,您可以改用 ChannelAwareMessageListener。它看起来相似,但多了一个参数。以下列表展示了 ChannelAwareMessageListener 接口的定义:spring-doc.cadn.net.cn

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}
在版本 2.1 中,此接口从包 o.s.amqp.rabbit.core 移动到了 o.s.amqp.rabbit.listener.api

MessageListenerAdapter

如果您更倾向于在应用逻辑与消息API之间保持更严格的分离,可以依赖框架提供的适配器实现。这通常被称为“基于消息的POJO”支持。spring-doc.cadn.net.cn

版本 1.5 引入了一种更灵活的 POJO 消息传递机制,即 @RabbitListener 注解。有关更多信息,请参阅 注解驱动的监听器端点

在使用适配器时,您只需提供一个对该适配器本身应调用的实例的引用即可。</p><p>以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

您可以继承该适配器并提供 getListenerMethodName() 的实现,以根据消息动态选择不同的方法。此方法有两个参数,originalMessageextractedMessage,其中后者是任何转换后的结果。默认情况下,配置了一个 SimpleMessageConverter。有关更多信息及其他可用转换器的详情,请参阅 SimpleMessageConverterspring-doc.cadn.net.cn

从版本 1.4.2 开始,原始消息具有 consumerQueueconsumerTag 属性,可用于确定消息来自哪个队列。spring-doc.cadn.net.cn

从版本 1.5 开始,您可以配置一个消费者队列或标签到方法名的映射,以动态选择要调用的方法。如果映射中没有匹配项,则回退到默认监听器方法。默认监听器方法(如未设置)为 handleMessagespring-doc.cadn.net.cn

从版本 2.0 开始,提供了一个便捷的 FunctionalInterface
以下列表显示了 FunctionalInterface 的定义:spring-doc.cadn.net.cn

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

该接口通过使用 Java 8 的 Lambda 表达式,实现了对适配器的便捷配置,如下例所示:spring-doc.cadn.net.cn

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));

从版本 2.2 开始,buildListenerArguments(Object) 已被弃用,取而代之的是新的 buildListenerArguments(Object, Channel, Message)。新的方法有助于监听器获取 ChannelMessage 参数以执行更多操作,例如在手动确认模式下调用 channel.basicReject(long, boolean)。以下列表展示了最基本的示例:spring-doc.cadn.net.cn

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

现在,如果您需要接收“channel”和“message”,可以将 ExtendedListenerAdapter 配置为与 MessageListenerAdapter 相同。监听器的参数应设置为 buildListenerArguments(Object, Channel, Message) 返回的内容,如下所示的监听器示例:spring-doc.cadn.net.cn

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

容器

现在您已经了解了 Message 监听回调的多种选项,接下来我们可以将注意力转向容器。基本上,容器负责处理“主动”职责,从而使监听器回调能够保持被动状态。容器是一个“生命周期”组件的示例。它提供了启动和停止的方法。在配置容器时,您本质上是在 AMQP 队列与 MessageListener 实例之间建立桥梁。您必须提供对 ConnectionFactory 的引用以及队列名称或队列实例,监听器应从这些队列中消费消息。spring-doc.cadn.net.cn

在 2.0 版本之前,只有一个监听器容器,即 SimpleMessageListenerContainer。现在新增了第二个容器,即 DirectMessageListenerContainer。这两个容器之间的差异以及您在选择使用哪一个时可能参考的标准,详见 选择容器spring-doc.cadn.net.cn

以下列表展示了最基本的示例,该示例通过使用 SimpleMessageListenerContainer 实现:spring-doc.cadn.net.cn

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

作为一个“活跃”的组件,最常见的方式是通过bean定义来创建监听器容器,以便它能在后台运行。以下示例展示了如何使用XML实现这一点:spring-doc.cadn.net.cn

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

以下列表展示了另一种使用 XML 实现的方法:spring-doc.cadn.net.cn

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

前面的两个示例均创建了一个 DirectMessageListenerContainer(注意 type 属性——它默认为 simple)。spring-doc.cadn.net.cn

或者,您可能更倾向于使用 Java 配置,其外观与前面的代码片段类似:spring-doc.cadn.net.cn

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

消费者优先级

从 RabbitMQ 版本 3.2 开始,代理现在支持消费者优先级(参见 在 RabbitMQ 中使用消费者优先级)。这是通过在消费者上设置 x-priority 参数来启用的。现在 SimpleMessageListenerContainer 支持设置消费者参数,如下例所示:spring-doc.cadn.net.cn

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

为了方便起见,该命名空间在 listener 元素上提供了 priority 属性,如下例所示:spring-doc.cadn.net.cn

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

从版本 1.3 开始,您可以在运行时修改容器监听的队列。请参阅 监听器容器队列spring-doc.cadn.net.cn

auto-delete队列

当容器被配置为监听 auto-delete 个队列时,如果队列具有 x-expires 选项,或者在 Broker 上配置了 生存时间(Time-To-Live) 策略,则在容器停止时(即最后一个消费者被取消时),Broker 会自动移除该队列。在 1.3 版本之前,由于队列缺失,容器无法重启。仅当连接关闭或重新打开时,RabbitAdmin 才会自动重新声明队列等资源,而容器停止和启动并不会触发此操作。spring-doc.cadn.net.cn

从版本 1.3 开始,容器在启动期间使用 RabbitAdmin 重新声明任何缺失的队列。spring-doc.cadn.net.cn

您还可以将条件声明(参见 条件声明)与一个 auto-startup="false" 管理员配合使用,以将队列声明推迟到容器启动之后进行。以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

在这种情况下,队列和交换机由 containerAdmin 声明,而 auto-startup="false" 使得元素不会在上下文初始化期间被声明。此外,由于相同的原因,容器也不会启动。当稍后启动容器时,它会使用对 containerAdmin 的引用以声明这些元素。spring-doc.cadn.net.cn