此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
轮询消费者
这AmqpTemplate
本身可用于轮询Message
接待。 默认情况下,如果没有可用的消息,null
立即返回。没有阻塞。从 1.5 版本开始,您可以将receiveTimeout
,以毫秒为单位,接收方法阻塞长达该时间,等待消息。小于零的值表示无限期阻塞(或至少直到与代理的连接丢失)。1.6 版引入了receive
允许在每次调用时传入超时的方法。
由于接收作会创建一个新的QueueingConsumer 对于每条消息,此技术并不真正适用于大容量环境。考虑使用异步使用者或receiveTimeout 为零。 |
从版本 2.4.8 开始,当使用非零超时时,您可以指定传递给basicConsume
方法,用于将消费者与通道相关联。 例如:template.addConsumerArg("x-priority", 10)
.
有四个简单的receive
可用的方法。与Exchange
在发送端,有一个方法要求已设置默认队列属性直接在模板本身上,并且有一个方法在运行时接受队列参数。1.6 版引入了接受的变体timeoutMillis
覆盖receiveTimeout
基于每个请求。以下列表显示了四种方法的定义:
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
与发送消息一样,该AmqpTemplate
有一些接收 POJO 的便捷方法,而不是Message
实例和实现提供了一种自定义MessageConverter
用于创建Object
返回: 以下列表显示了这些方法:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
从 2.0 版开始,这些方法的变体需要额外的ParameterizedTypeReference
参数来转换复杂类型。模板必须配置SmartMessageConverter
. 看从Message
跟RabbitTemplate
了解更多信息。
似sendAndReceive
方法,从 1.3 版开始,AmqpTemplate
有几个便利receiveAndReply
用于同步接收、处理和回复消息的方法。以下列表显示了这些方法定义:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
这AmqpTemplate
实现负责receive
和reply
阶段。 在大多数情况下,您应该只提供ReceiveAndReplyCallback
为接收到的消息执行一些业务逻辑,并在需要时构建回复对象或消息。请注意,ReceiveAndReplyCallback
可能会返回null
. 在这种情况下,不会发送回复,并且receiveAndReply
工作方式类似于receive
方法。 这样,同一队列就可以用于混合消息,其中一些消息可能不需要回复。
仅当提供的回调不是ReceiveAndReplyMessageCallback
,它提供原始消息交换协定。
这ReplyToAddressCallback
对于需要自定义逻辑来确定replyTo
运行时针对收到的消息的地址,并从ReceiveAndReplyCallback
. 默认情况下,replyTo
请求消息中的信息用于路由回复。
以下列表显示了基于 POJO 的接收和回复示例:
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}