|
请使用 Spring AMQP 4.0.2(最新稳定版本)! |
轮询消费者
代码AmqpTemplate本身可用于轮询Message接收。
默认情况下,如果没有消息可用,则立即返回null。
不会阻塞。
从版本1.5开始,您可以设置一个receiveTimeout(单位为毫秒),此时接收方法最多将阻塞该时长以等待消息到达。
负值表示无限期阻塞(或至少直到与代理的连接断开)。
版本1.6引入了receive方法的变体,允许在每次调用时传递超时参数。
由于接收操作为每条消息创建一个新的 QueueingConsumer,因此该技术在高流量环境中并不真正适用。对于这些用例,建议使用异步消费者或一个 receiveTimeout(零值)。 |
从版本 2.4.8 开始,当使用非零超时时间时,您可以指定传递给用于将消费者与通道关联的 basicConsume 方法的参数。例如:template.addConsumerArg("x-priority", 10)。
有四种简单的 receive 方法可用。与发送端的 Exchange 类似,其中一种方法要求在模板本身上直接设置一个默认队列属性,另一种方法则在运行时接受一个队列参数。1.6 版本引入了变体,以接受 timeoutMillis 的形式,在每次请求的基础上覆盖 receiveTimeout。以下列表展示了这四种方法的定义:
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
与发送消息的情况类似,AmqpTemplate 提供了一些便捷方法,可用于接收 POJO 而非 Message 实例;而实现类则提供了一种方式,用于自定义用于创建 MessageConverter 的 Object:以下列表展示了这些方法:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
从版本 2.0 开始,这些方法有变体,可额外接受一个 ParameterizedTypeReference 参数以转换复杂类型。
模板必须配置一个 SmartMessageConverter。
有关更多信息,请参阅 使用 RabbitTemplate 从 Message 转换。
与 sendAndReceive 方法类似,从 1.3 版本开始,AmqpTemplate 提供了若干便捷的 receiveAndReply 方法,用于同步接收、处理并回复消息。
以下列表展示了这些方法定义:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
实现 AmqpTemplate 负责处理 receive 和 reply 阶段。
在大多数情况下,您只需提供 ReceiveAndReplyCallback 的实现,以执行针对接收到的消息的业务逻辑,并构建一个回复对象或消息(如需要)。
注意,ReceiveAndReplyCallback 可能返回 null。
在此情况下,不会发送回复,且 receiveAndReply 的行为类似于 receive 方法。
这使得同一队列可同时用于混合类型的消息,其中部分消息可能不需要回复。
自动消息(请求与响应)转换仅在提供的回调不是 ReceiveAndReplyMessageCallback 的实例时应用,而 ReceiveAndReplyMessageCallback 提供了一个原始消息交换契约。
代码 ReplyToAddressCallback 在需要在运行时根据接收到的消息和来自 ReceiveAndReplyCallback 的回复,自定义逻辑来确定 replyTo 地址的场景中非常有用。默认情况下,会使用请求消息中的 replyTo 信息来路由回复。
以下列表显示了一个基于POJO的接收和回复示例:
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}