此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
发送消息
发送消息时,可以使用以下任一方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面列表中的最后一个方法开始讨论,因为它实际上是最明确的。
它允许在运行时提供 AMQP 交换名称(以及路由密钥)。
最后一个参数是负责实际创建消息实例的回调。
使用此方法发送消息的示例可能如下所示:
以下示例演示如何使用send
发送消息的方法:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
您可以将exchange
属性,如果您计划在大部分时间或所有时间使用该模板实例发送到同一交易所。
在这种情况下,可以使用前面列表中的第二种方法。
以下示例在功能上等同于前面的示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果同时exchange
和routingKey
属性,则可以使用仅接受Message
.
以下示例显示了如何执行此作:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
考虑交换和路由键属性的更好方法是显式方法参数始终覆盖模板的默认值。
事实上,即使您没有在模板上显式设置这些属性,也始终存在默认值。
在这两种情况下,默认值都是空的String
,但这实际上是一个明智的默认值。
就路由键而言,它并不总是一开始就必需的(例如,对于
一个Fanout
交换)。
此外,队列可以绑定到具有空String
.
这些都是依赖默认空的合法场景String
模板的路由键属性的值。
就交易所名称而言,空的String
常用,因为 AMQP 规范将“默认交换”定义为没有名称。
由于所有队列都自动绑定到该默认交换(即直接交换),因此使用其名称作为绑定值,因此前面列表中的第二种方法可用于通过默认交换向任何队列进行简单的点对点消息传递。
您可以将队列名称作为routingKey
,通过在运行时提供方法参数。
以下示例显示了如何执行此作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以创建一个模板,该模板可用于主要或专门发布到单个队列。 以下示例显示了如何执行此作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息生成器 API
从 1.3 版开始,消息构建器 API 由MessageBuilder
和MessagePropertiesBuilder
.
这些方法提供了一种方便的“流畅”方法来创建消息或消息属性。
以下示例显示了 Fluent API 的实际应用:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
在MessageProperties
可以设置。
其他方法包括setHeader(String key, String value)
,removeHeader(String key)
,removeHeaders()
和copyProperties(MessageProperties properties)
.
每个属性设置方法都有一个set*IfAbsent()
变体。
在存在默认初始值的情况下,该方法名为set*IfAbsentOrDefault()
.
提供了五个静态方法来创建初始消息生成器:
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | 构建器创建的消息具有对参数的直接引用的正文。 |
2 | 构建器创建的消息有一个正文,该正文是一个新数组,其中包含参数中的字节副本。 |
3 | 构建器创建的消息有一个正文,该正文是一个新数组,其中包含参数中的字节范围。
看Arrays.copyOfRange() 了解更多详情。 |
4 | 构建器创建的消息具有对参数正文的直接引用的正文。
参数的属性被复制到新的MessageProperties 对象。 |
5 | 构建器创建的消息有一个正文,该正文是一个包含参数正文副本的新数组。
参数的属性被复制到新的MessageProperties 对象。 |
提供了三种静态方法来创建MessagePropertiesBuilder
实例:
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | 使用默认值初始化新的消息属性对象。 |
2 | 构建器初始化为 和build() 将返回提供的属性对象。 |
3 | 参数的属性被复制到新的MessageProperties 对象。 |
使用RabbitTemplate
实现AmqpTemplate
,每个send()
methods 有一个重载版本,它需要额外的CorrelationData
对象。
启用发布者确认后,此对象将在AmqpTemplate
.
这允许发件人关联确认 (ack
或nack
) 替换为已发送的消息。
从 1.6.7 版本开始,CorrelationAwareMessagePostProcessor
引入接口,允许在消息转换后修改相关数据。
以下示例演示如何使用它:
Message postProcessMessage(Message message, Correlation correlation);
在 2.0 版中,此接口已被弃用。
该方法已移至MessagePostProcessor
使用委托给postProcessMessage(Message message)
.
同样从 1.6.7 版本开始,一个名为CorrelationDataPostProcessor
被提供。
毕竟这是被调用的MessagePostProcessor
实例(在send()
方法以及setBeforePublishPostProcessors()
).
实现可以更新或替换send()
方法(如果有)。
这Message
和原创CorrelationData
(如果有)作为参数提供。
以下示例演示如何使用postProcess
方法:
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者退货
当模板的mandatory
属性是true
,返回的消息由AmqpTemplate
.
从 1.4 版开始,RabbitTemplate
支持 SpELmandatoryExpression
属性,该属性作为根评估对象针对每个请求消息进行评估,解析为boolean
价值。
Bean 引用,例如@myBean.isMandatory(#root)
,可用于表达式。
发布者返回也可以由RabbitTemplate
在发送和接收作中。
有关更多信息,请参阅回复超时。
配料
1.4.2 版引入了BatchingRabbitTemplate
.
这是RabbitTemplate
替换的send
根据BatchingStrategy
.
只有当批处理完成时,消息才会发送到 RabbitMQ。
以下列表显示了BatchingStrategy
接口定义:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批处理数据保存在内存中。 如果发生系统故障,未发送的消息可能会丢失。 |
一个SimpleBatchingStrategy
被提供。
它支持将消息发送到单个交换或路由密钥。
它具有以下属性:
-
batchSize
:发送之前批处理中的消息数。 -
bufferLimit
:批处理消息的最大大小。 这会抢占batchSize
,如果超过,则会导致发送部分批次。 -
timeout
:在没有向批处理添加消息的新活动时发送部分批处理的时间。
这SimpleBatchingStrategy
通过在每个嵌入的消息前面加上 4 字节的二进制长度来格式化批处理。
通过设置springBatchFormat
message 属性设置为lengthHeader4
.
默认情况下,侦听器容器会自动取消批处理的消息(通过使用springBatchFormat message header) 的 Message 标头)。
拒绝批处理中的任何邮件会导致整个批处理被拒绝。 |
但是,有关详细信息,请参阅@RabbitListener使用批处理。