此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

发送消息

发送消息时,您可以使用以下任意一种方法:spring-doc.cadn.net.cn

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

我们可以从前面列表中的最后一个方法开始讨论,因为它实际上是最明确的。它允许在运行时提供一个 AMQP 交换机名称(连同路由键)。最后一个参数是负责实际创建消息实例的回调函数。使用此方法发送消息的一个示例如下:以下示例展示了如何使用 send 方法发送消息:spring-doc.cadn.net.cn

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

您可以将 exchange 属性设置在模板本身上,如果您计划经常或大部分时间使用该模板实例向同一交换发送消息。在这种情况下,您可以使用前述列表中的第二种方法。以下示例在功能上等同于前一个示例:spring-doc.cadn.net.cn

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果在模板上同时设置了 exchangeroutingKey 属性,您可以使用仅接受 Message 的方法。以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

关于交换机和路由键属性的更好理解方式是:显式方法参数始终会覆盖模板的默认值。事实上,即使您没有在模板中显式设置这些属性,也始终存在默认值。在这两种情况下,默认值都是一个空的 String,但这个实际上是一个合理且明智的默认值。就路由键而言,起初并不总是必要的(例如,对于 Fanout 交换器)。此外,队列还可以与交换机绑定,且绑定键为空字符串 String。这两种情况都是合理场景,需要依赖模板的路由键属性的默认空值 String。就交换机名称而言,空字符串 String 通常被使用,因为 AMQP 规范定义了“默认交换机”没有名称。由于所有队列都自动绑定到该默认交换机(这是一个直接交换机),并使用其名称作为绑定值,因此可以使用前面列表中的第二种方法,通过默认交换机向任意队列发送简单的点对点消息。您可以将队列名称作为 routingKey 提供,方法是在运行时提供方法参数。下面的例子展示了如何做到这一点:spring-doc.cadn.net.cn

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

或者,您可以创建一个模板,该模板主要用于或 exclusively 发布到单个队列。以下示例展示了如何实现此操作:spring-doc.cadn.net.cn

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

消息构建器API

从版本 1.3 开始,MessageBuilderMessagePropertiesBuilder 提供了一个消息构建器 API。这些方法提供了一种便捷的“流畅式”方式来创建消息或消息属性。以下示例展示了流畅式 API 的实际应用:spring-doc.cadn.net.cn

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

MessageProperties 上定义的每个属性都可以进行设置。其他方法包括 setHeader(String key, String value)removeHeader(String key)removeHeaders()copyProperties(MessageProperties properties)。每个属性设置方法都有一个 set*IfAbsent() 变体。在存在默认初始值的情况下,该方法命名为 set*IfAbsentOrDefault()spring-doc.cadn.net.cn

提供了五种静态方法来创建初始消息构建器:spring-doc.cadn.net.cn

public static MessageBuilder withBody(byte[] body) (1)

public static MessageBuilder withClonedBody(byte[] body) (2)

public static MessageBuilder withBody(byte[] body, int from, int to) (3)

public static MessageBuilder fromMessage(Message message) (4)

public static MessageBuilder fromClonedMessage(Message message) (5)
1 构建器创建的消息具有一个正文,该正文直接引用了参数。
2 构建器创建的消息正文是一个新数组,其中包含参数中字节的副本。
3 构建器创建的消息正文是一个新数组,其中包含来自参数的字节范围。Arrays.copyOfRange() 以获取更多详细信息。
4 构建器创建的消息具有一个正文,该正文直接引用了参数的正文。

spring-doc.cadn.net.cn

参数的属性被复制到一个新的 MessageProperties 对象中。spring-doc.cadn.net.cn

5 构建器创建的消息具有一个正文,该正文是一个新数组,包含对参数正文的副本。参数的属性被复制到一个新的 MessageProperties 对象中。

提供了三种静态方法来创建一个 MessagePropertiesBuilder 实例:spring-doc.cadn.net.cn

public static MessagePropertiesBuilder newInstance() (1)

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 初始化一个新的消息属性对象,并使用默认值。
2 构建器以提供的属性对象进行初始化,并且 build() 将返回该属性对象。
3 MessageProperties对象的属性被复制到新的对象中。

带有RabbitTemplate实现的AmqpTemplate,每个send()方法都有一个额外的CorrelationData对象作为参数的重载版本。 当确认发布者时,此对象在AmqpTemplate中所描述的回调中返回。 这使发送方可以将确认(acknack)与已发送消息相关联。spring-doc.cadn.net.cn

从版本 1.6.7 开始,引入了CorrelationAwareMessagePostProcessor接口,允许在消息转换后修改关联数据。
下面的例子展示了如何使用它:
spring-doc.cadn.net.cn

Message postProcessMessage(Message message, Correlation correlation);

在版本 2.0 中,此接口已过时。该方法已移到带有默认实现的方法中,该实现将调用。spring-doc.cadn.net.cn

从1.6.7版本开始,提供了一个新的回调接口CorrelationDataPostProcessor。此方法在所有MessagePostProcessor实例(包括通过send()方法提供的以及在setBeforePublishPostProcessors()中提供的)之后被调用。实现可以更新或替换提供给send()方法的相关数据(如有需要)。提供了原始的MessageCorrelationData(如有需要)。下面的例子展示了如何使用postProcess方法:spring-doc.cadn.net.cn

CorrelationData postProcess(Message message, CorrelationData correlationData);

发布者返回

当模板的mandatory属性为true时,返回的消息由以下描述的回调提供,请参阅AmqpTemplatespring-doc.cadn.net.cn

从版本 1.4 开始,RabbitTemplate支持SpELmandatoryExpression属性,它将每个请求消息作为根评估对象进行评估,解析为boolean值。Bean引用,例如@myBean.isMandatory(#root),可以在表达式中使用。spring-doc.cadn.net.cn

发布者返回还可以在发送和接收操作中的RabbitTemplate内部使用。 有关更多信息,请参见回复超时spring-doc.cadn.net.cn

批次处理

版本 1.4.2 引入了 BatchingRabbitTemplatespring-doc.cadn.net.cn

spring-doc.cadn.net.cn

这是 RabbitTemplate 的子类,覆盖了 send 方法,该方法根据 BatchingStrategy 批处理消息。spring-doc.cadn.net.cn

只有当批处理完成时,消息才发送到 RabbitMQ。spring-doc.cadn.net.cn

下面的清单显示了 BatchingStrategy 接口定义:spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}
批处理数据保留在内存中。</p><p>未发送的消息在系统故障时可能会丢失。

提供了一个SimpleBatchingStrategy。 它支持向单个交换机或路由密钥发送消息。 它有以下属性:spring-doc.cadn.net.cn

  • batchSize: 批量发送之前的消息数量。spring-doc.cadn.net.cn

  • bufferLimit: 批次消息的最大大小。 这会预取batchSize,如果超出则会导致发送部分批次。spring-doc.cadn.net.cn

  • timeout: A time after which a partial batch is sent when there is no new activity adding messages to the batch.spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

SimpleBatchingStrategy 格式化批次,通过在每个嵌入消息前加上一个四字节的二进制长度。这通过将 springBatchFormat 消息属性设置为 lengthHeader4 传达给接收系统。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

批量消息默认由侦听器容器自动解批(通过使用 0 消息标头)。 任何拒绝来自批的消息都会导致整个批被拒绝。

然而,有关更多信息,请参见带有分批处理的spring-doc.cadn.net.cn