此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
异步使用者
Spring AMQP 还通过使用@RabbitListener 注释,并提供一个开放的基础设施来以编程方式注册端点。
这是迄今为止设置异步消费者的最便捷方法。
有关更多详细信息,请参阅注释驱动的侦听器端点。 |
预取默认值过去为 1,这可能导致高效使用者的利用率不足。 从 2.0 版开始,默认预取值现在为 250,这应该会让消费者在大多数常见场景中保持忙碌,并且 从而提高吞吐量。 然而,在某些情况下,预取值应该很低:
此外,对于低容量消息传递和多个使用者(包括单个侦听器容器实例中的并发性),您可能希望减少预取,以便在使用者之间获得更均匀的消息分布。 请参阅消息侦听器容器配置。 有关预取的更多背景信息,请参阅这篇关于 RabbitMQ 中消费者利用率的文章和这篇关于排队理论的文章。 |
消息侦听器
对于异步Message
reception,一个专用组件(不是AmqpTemplate
)涉及。
该组件是Message
-consuming 回调。
我们将在本节后面讨论容器及其属性。
不过,首先,我们应该看看回调,因为这是您的应用程序代码与消息传递系统集成的地方。
回调有几个选项,首先是MessageListener
界面,如下表所示:
public interface MessageListener {
void onMessage(Message message);
}
如果您的回调逻辑出于任何原因依赖于 AMQP 通道实例,您可以改用ChannelAwareMessageListener
.
它看起来很相似,但有一个额外的参数。
以下列表显示了ChannelAwareMessageListener
接口定义:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版本中,此接口从 packageo.s.amqp.rabbit.core 自o.s.amqp.rabbit.listener.api . |
MessageListenerAdapter
如果您希望在应用程序逻辑和消息传递 API 之间保持更严格的分离,那么可以依赖框架提供的适配器实现。 这通常被称为“消息驱动的 POJO”支持。
1.5 版为 POJO 消息传递引入了一种更灵活的机制,即@RabbitListener 注解。
有关更多信息,请参阅注释驱动的侦听器端点。 |
使用适配器时,只需提供对适配器本身应调用的实例的引用。 以下示例显示了如何执行此作:
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以对适配器进行子类化,并提供getListenerMethodName()
根据消息动态选择不同的方法。
此方法有两个参数,originalMessage
和extractedMessage
,后者是任何转换的结果。
默认情况下,一个SimpleMessageConverter
已配置。
看SimpleMessageConverter
有关其他可用转换器的更多信息和信息。
从 1.4.2 版开始,原始消息具有consumerQueue
和consumerTag
属性,可用于确定从中接收消息的队列。
从 1.5 版本开始,您可以配置消费者队列或标签到方法名称的映射,以动态选择要调用的方法。
如果映射中没有条目,我们回退到默认的监听器方法。
默认侦听器方法(如果未设置)为handleMessage
.
从 2.0 版本开始,方便的FunctionalInterface
已提供。
以下列表显示了FunctionalInterface
:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
此接口有助于使用 Java 8 lambda 方便地配置适配器,如以下示例所示:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
从 2.2 版开始,buildListenerArguments(Object)
已被弃用和新增buildListenerArguments(Object, Channel, Message)
取而代之的是引入了一个。
新方法帮助听众获得Channel
和Message
参数来执行更多作,例如调用channel.basicReject(long, boolean)
在手动确认模式下。
以下列表显示了最基本的示例:
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
现在您可以配置ExtendedListenerAdapter
与MessageListenerAdapter
如果您需要接收“频道”和“消息”。
监听器的参数应设置为buildListenerArguments(Object, Channel, Message)
返回,如以下侦听器示例所示:
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
容器
现在您已经了解了Message
-listen 回调,我们可以将注意力转向容器。
基本上,容器处理“主动”职责,以便侦听器回调可以保持被动。
容器是“生命周期”组件的一个示例。
它提供了启动和停止的方法。
配置容器时,您基本上弥合了 AMQP 队列和MessageListener
实例。
您必须提供对ConnectionFactory
以及该侦听器应从中使用消息的队列名称或队列实例。
在 2.0 版本之前,有一个侦听器容器,即SimpleMessageListenerContainer
.
现在有第二个容器,即DirectMessageListenerContainer
.
选择要使用的容器和条件之间的差异在选择容器中进行了描述。
以下列表显示了最基本的示例,它的工作原理是使用SimpleMessageListenerContainer
:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作为“活动”组件,最常见的是创建带有 bean 定义的侦听器容器,以便它可以在后台运行。 以下示例显示了使用 XML 执行此作的一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下列表显示了使用 XML 执行此作的另一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
前面的两个示例都创建了一个DirectMessageListenerContainer
(请注意type
属性 — 它默认为simple
).
或者,您可能更喜欢使用 Java 配置,它看起来类似于前面的代码片段:
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
消费者优先
从 RabbitMQ 版本 3.2 开始,代理现在支持消费者优先级(请参阅将消费者优先级与 RabbitMQ 结合使用)。
这是通过设置x-priority
关于消费者的争论。
这SimpleMessageListenerContainer
现在支持设置使用者参数,如以下示例所示:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
为方便起见,命名空间提供了priority
属性listener
元素,如以下示例所示:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
从 1.3 版开始,您可以修改容器在运行时侦听的队列。 请参阅侦听器容器队列。
auto-delete
队列
当容器配置为侦听auto-delete
queues,队列有一个x-expires
选项,或者在 Broker 上配置了 Time-To-Live 策略,则当容器停止时(即,当最后一个使用者被取消时),Broker 将删除队列。
在 1.3 版本之前,由于缺少队列,无法重新启动容器。
这RabbitAdmin
仅在连接关闭或打开时自动重新声明队列等,当容器停止和启动时不会发生这种情况。
从 1.3 版开始,容器使用RabbitAdmin
在启动期间重新声明任何丢失的队列。
您还可以将条件声明(请参阅条件声明)与auto-startup="false"
admin 将队列声明推迟到容器启动。
以下示例显示了如何执行此作:
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在这种情况下,队列和交换由containerAdmin
,其中有auto-startup="false"
以便在上下文初始化期间不会声明元素。
此外,容器未启动的原因相同。
稍后启动容器时,它会将其引用用于containerAdmin
以声明元素。