|
请使用 Spring AMQP 4.0.2(最新稳定版本)! |
发送消息
发送消息时,您可以使用以下任意一种方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面列表中的最后一个方法开始讨论,因为它实际上是最明确的。它允许在运行时提供一个 AMQP 交换机名称(连同路由键)。最后一个参数是负责实际创建消息实例的回调函数。使用此方法发送消息的一个示例如下:以下示例展示了如何使用 send 方法发送消息:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
您可以将 exchange 属性设置在模板本身上,如果您计划经常或大部分时间使用该模板实例向同一交换发送消息。在这种情况下,您可以使用前述列表中的第二种方法。以下示例在功能上等同于前一个示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果在模板上同时设置了 exchange 和 routingKey 属性,您可以使用仅接受 Message 的方法。以下示例展示了如何实现这一点:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
关于交换机和路由键属性的更好理解方式是:显式方法参数始终会覆盖模板的默认值。事实上,即使您没有在模板中显式设置这些属性,也始终存在默认值。在这两种情况下,默认值都是一个空的 String,但这个实际上是一个合理且明智的默认值。就路由键而言,起初并不总是必要的(例如,对于 Fanout 交换器)。此外,队列还可以与交换机绑定,且绑定键为空字符串 String。这两种情况都是合理场景,需要依赖模板的路由键属性的默认空值 String。就交换机名称而言,空字符串 String 通常被使用,因为 AMQP 规范定义了“默认交换机”没有名称。由于所有队列都自动绑定到该默认交换机(这是一个直接交换机),并使用其名称作为绑定值,因此可以使用前面列表中的第二种方法,通过默认交换机向任意队列发送简单的点对点消息。您可以将队列名称作为 routingKey 提供,方法是在运行时提供方法参数。下面的例子展示了如何做到这一点:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以创建一个模板,该模板主要用于或 exclusively 发布到单个队列。以下示例展示了如何实现此操作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息构建器API
从版本 1.3 开始,MessageBuilder 和 MessagePropertiesBuilder 提供了一个消息构建器 API。这些方法提供了一种便捷的“流畅式”方式来创建消息或消息属性。以下示例展示了流畅式 API 的实际应用:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
在 MessageProperties 上定义的每个属性都可以进行设置。其他方法包括 setHeader(String key, String value)、removeHeader(String key)、removeHeaders() 和 copyProperties(MessageProperties properties)。每个属性设置方法都有一个 set*IfAbsent() 变体。在存在默认初始值的情况下,该方法命名为 set*IfAbsentOrDefault()。
提供了五种静态方法来创建初始消息构建器:
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
| 1 | 构建器创建的消息具有一个正文,该正文直接引用了参数。 |
| 2 | 构建器创建的消息正文是一个新数组,其中包含参数中字节的副本。 |
| 3 | 构建器创建的消息正文是一个新数组,其中包含来自参数的字节范围。Arrays.copyOfRange() 以获取更多详细信息。 |
| 4 | 构建器创建的消息具有一个正文,该正文直接引用了参数的正文。 参数的属性被复制到一个新的 |
| 5 | 构建器创建的消息具有一个正文,该正文是一个新数组,包含对参数正文的副本。参数的属性被复制到一个新的 MessageProperties 对象中。 |
提供了三种静态方法来创建一个 MessagePropertiesBuilder 实例:
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
| 1 | 初始化一个新的消息属性对象,并使用默认值。 |
| 2 | 构建器以提供的属性对象进行初始化,并且 build() 将返回该属性对象。 |
| 3 | MessageProperties对象的属性被复制到新的对象中。 |
带有RabbitTemplate实现的AmqpTemplate,每个send()方法都有一个额外的CorrelationData对象作为参数的重载版本。 当确认发布者时,此对象在AmqpTemplate中所描述的回调中返回。 这使发送方可以将确认(ack或nack)与已发送消息相关联。
从版本 1.6.7 开始,引入了CorrelationAwareMessagePostProcessor接口,允许在消息转换后修改关联数据。
下面的例子展示了如何使用它:
Message postProcessMessage(Message message, Correlation correlation);
在版本 2.0 中,此接口已过时。该方法已移到带有默认实现的方法中,该实现将调用。
从1.6.7版本开始,提供了一个新的回调接口CorrelationDataPostProcessor。此方法在所有MessagePostProcessor实例(包括通过send()方法提供的以及在setBeforePublishPostProcessors()中提供的)之后被调用。实现可以更新或替换提供给send()方法的相关数据(如有需要)。提供了原始的Message和CorrelationData(如有需要)。下面的例子展示了如何使用postProcess方法:
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者返回
当模板的mandatory属性为true时,返回的消息由以下描述的回调提供,请参阅AmqpTemplate。
从版本 1.4 开始,RabbitTemplate支持SpELmandatoryExpression属性,它将每个请求消息作为根评估对象进行评估,解析为boolean值。Bean引用,例如@myBean.isMandatory(#root),可以在表达式中使用。
发布者返回还可以在发送和接收操作中的RabbitTemplate内部使用。 有关更多信息,请参见回复超时。
批次处理
版本 1.4.2 引入了 BatchingRabbitTemplate。
这是 RabbitTemplate 的子类,覆盖了 send 方法,该方法根据 BatchingStrategy 批处理消息。
只有当批处理完成时,消息才发送到 RabbitMQ。
下面的清单显示了 BatchingStrategy 接口定义:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
| 批处理数据保留在内存中。</p><p>未发送的消息在系统故障时可能会丢失。 |
提供了一个SimpleBatchingStrategy。 它支持向单个交换机或路由密钥发送消息。 它有以下属性:
-
batchSize: 批量发送之前的消息数量。 -
bufferLimit: 批次消息的最大大小。 这会预取batchSize,如果超出则会导致发送部分批次。 -
timeout: A time after which a partial batch is sent when there is no new activity adding messages to the batch.
SimpleBatchingStrategy 格式化批次,通过在每个嵌入消息前加上一个四字节的二进制长度。这通过将 springBatchFormat 消息属性设置为 lengthHeader4 传达给接收系统。
| 批量消息默认由侦听器容器自动解批(通过使用 0 消息标头)。 任何拒绝来自批的消息都会导致整个批被拒绝。 |
然而,有关更多信息,请参见带有分批处理的