|
此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2! |
AmqpTemplate
与 Spring Framework 及相关项目提供的许多其他高级抽象一样,Spring AMQP 也提供了一个“模板”,其在其中起着核心作用。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的一般行为。换句话说,它们并非任何特定实现所独有的——因此名称中包含“AMQP”。另一方面,该接口存在一些实现,这些实现与 AMQP 协议的实现相关联。与JMS不同,JMS本身是一个接口级别的API,而AMQP是一种网络协议层面的协议。该协议的实现提供了其自身的客户端库,因此每个模板接口的实现都依赖于特定的客户端库。目前,仅有一个实现:RabbitTemplate。在下面的示例中,我们经常使用 AmqpTemplate。然而,当你查看配置示例或任何涉及模板实例化或调用setter方法的代码片段时,你可以看到实现类型(例如,RabbitTemplate)。
另请参阅 异步 Rabbit Template。
添加重试功能
从版本 1.3 开始,现在您可以将 RabbitTemplate 配置为使用 RetryTemplate,以帮助处理与代理(broker)连接相关的问题。有关完整信息,请参阅 spring-retry 项目。以下仅是一个示例,它使用指数退避策略和默认的 SimpleRetryPolicy,即在将异常抛给调用方之前尝试三次。
以下示例使用了 XML 命名空间:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下示例在 Java 中使用了 @Configuration 注解:
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
从版本 1.4 开始,除了 retryTemplate 属性外,recoveryCallback 选项也已在 RabbitTemplate 上得到支持。它被用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) 的第二个参数。
代码 RecoveryCallback 的功能 somewhat 有限,因为重试上下文仅包含 lastThrowable 字段。对于更复杂的使用场景,建议使用外部的 RetryTemplate,以便通过上下文的属性向 RecoveryCallback 传递更多信息。以下示例展示了如何实现这一点: |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,您 不会 将 RetryTemplate 注入到 RabbitTemplate 中。
发布是异步的——如何检测成功与失败
发布消息是一种异步机制,默认情况下,RabbitMQ 会丢弃无法路由的消息。为了确保成功发布,您可以接收异步确认,如 关联的发布者确认和返回 所述。请考虑以下两种故障场景:
-
发布到交换机,但没有匹配的目标队列。
-
发布到一个不存在的交换机。
第一种情况由出版商退货涵盖,如关联出版商确认和退货中所述。
对于第二种情况,消息会被丢弃,且不会生成返回结果。
底层通道会因异常而关闭。
默认情况下,该异常会被记录,但您可以在 ChannelListener 中注册一个 CachingConnectionFactory,以获取此类事件的通知。
以下示例展示了如何添加一个 ConnectionListener:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以检查信号的 reason 属性以确定发生的问题。
为了在发送线程上检测异常,您可以在 setChannelTransacted(true) 上 RabbitTemplate,异常将在 txCommit() 处被检测到。然而,事务会显著影响性能,因此在为这一单一用例启用事务之前,请慎重考虑。
关联的发布者确认和返回
《0》实现的《1》支持发布者确认和返回。
对于返回的消息,模板的 mandatory 属性必须设置为 true 或 mandatory-expression 必须计算为 true,才能使特定消息生效。此功能需要一个 CachingConnectionFactory,其 publisherReturns 属性必须设置为 true(参见 发布者确认与返回)。客户端通过注册一个 RabbitTemplate.ReturnsCallback(即调用 setReturnsCallback(ReturnsCallback callback))来接收返回消息。回调方法必须实现以下方法:
void returnedMessage(ReturnedMessage returned);
代码 ReturnedMessage 具有以下属性:
-
message- 返回的消息本身 -
replyCode- 表示返回原因的代码 -
replyText- 用于返回的文本原因 - 例如NO_ROUTE -
exchange- 消息发送至的交易所 -
routingKey- 用于的路由键
每个 RabbitTemplate 仅支持一个 ReturnsCallback。
另请参阅 回复超时。
对于发布者确认(也称为发布者确认),模板需要一个 CachingConnectionFactory,其 publisherConfirm 属性被设置为 ConfirmType.CORRELATED。确认信息由客户端通过调用 setConfirmCallback(ConfirmCallback callback) 注册一个 RabbitTemplate.ConfirmCallback 来发送给客户端。回调方法必须实现以下方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
客户端在发送原始消息时提供的对象是 CorrelationData。 ack 表示为 ack(即“失败”)时为真,而为 nack(即“成功”)时为假。 对于 nack 实例,如果在生成 nack 时可获取关闭原因,则 nack 中可能包含导致 nack 的原因。例如,在向不存在的交换机发送消息时,代理会关闭通道。关闭的原因将包含在 cause 中。cause 是在 1.4 版本中新增的。
一个 ConfirmCallback 仅由一个 RabbitTemplate 支持。
| 当兔子模板发送操作完成时,通道将被关闭。 这会阻止在连接工厂缓存已满时接收确认或返回(当缓存中有空间时,通道不会被物理关闭,确认和返回可正常进行)。 当缓存已满时,框架最多会延迟关闭5秒,以确保有足够时间接收确认和返回。 在使用确认机制时,通道会在收到最后一个确认时关闭。 仅使用返回机制时,通道将保持打开状态长达完整的5秒。 我们通常建议将连接工厂的 您可以通过使用 RabbitMQ 管理插件来监控通道使用情况。 如果您观察到通道被快速地打开和关闭,应考虑增大缓存大小,以减少对服务器的开销。 |
| 在 2.1 版本之前,启用发布者确认的通道会在收到确认前被放回缓存中。其他某个进程可能在此时获取该通道并执行某些操作,从而导致通道关闭——例如向一个不存在的交换机发送消息。这可能导致确认丢失。 2.1 版本及更高版本不再在确认未完成时将通道放回缓存。
通常情况下,这意味着一个通道上同时仅有一个确认处于待处理状态。 |
从版本 2.2 开始,回调将在连接工厂的 executor 线程之一上调用。此举旨在避免潜在的死锁:如果你在回调内部执行 Rabbit 操作,就可能出现死锁。在早期版本中,回调直接在 amqp-client 连接 I/O 线程上调用;如果此时执行某些 RPC 操作(例如打开新通道),则会发生死锁,因为 I/O 线程会阻塞等待结果,而该结果又需要由 I/O 线程自身进行处理。在那些旧版本中,必须在回调内将工作(如发送消息)移交至其他线程来完成。如今,由于框架已将回调调用交由执行器(executor)处理,这种操作已不再必要。 |
| 只要返回回调在60秒或更短时间内执行,仍会保持在确认(ack)之前收到返回消息的保证。</p><p>确认(confirm)将在返回回调执行完毕后或60秒后(以较早者为准)被调度发送。 |
对象 CorrelationData 具有一个 CompletableFuture,您可以使用它来获取结果,而无需在模板上使用 ConfirmCallback。以下示例展示了如何配置一个 CorrelationData 实例:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于这是一个 CompletableFuture<Confirm>,您可以在准备好时 get() 结果,或使用 whenComplete() 进行异步回调。
该 Confirm 对象是一个简单的 JavaBean,包含两个属性:ack 和 reason(用于 nack 实例)。
对于由代理生成的 nack 实例,该原因字段不会被填充。
但对于由框架生成的 nack 实例(例如,在存在未完成的 ack 实例时关闭连接),该字段会被填充。
此外,当同时启用确认和返回功能时,如果消息无法路由到任何队列,则 CorrelationData return 属性将填充返回的消息。可以保证在 Future 被设置为 ack 之前,已设置返回消息属性。CorrelationData.getReturn() 返回一个 ReturnMessage,其包含以下属性:
-
消息(返回的消息)
-
replyCode
-
replyText
-
交换
-
routingKey
参见作用域操作,以获取一种更简单的机制来等待发布者确认。
作用域操作
通常,使用模板时,会从缓存中检出一个 Channel(或创建它),用于该操作,然后将其返回到缓存中以供重用。在多线程环境中,无法保证下一次操作会使用相同的通道。然而,有时您可能希望对通道的使用拥有更多控制权,并确保若干操作均在同一通道上执行。
从版本 2.0 开始,提供了一种名为 invoke 的新方法,该方法带有一个 OperationsCallback。在回调作用域内执行的所有操作以及对提供的 RabbitOperations 参数执行的操作均使用相同的专用 Channel,该通道将在最后被关闭(不会返回到缓存中)。如果通道是 PublisherCallbackChannel,则在所有确认均已收到后将其返回到缓存中(参见 关联的发布者确认与返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
一个可能需要此功能的例子是,如果您希望在底层的 waitForConfirms() 方法上进行操作。该方法此前未通过 Spring API 暴露,因为通道通常被缓存并共享(如前所述)。现在,RabbitTemplate 提供了 waitForConfirms(long timeout) 和 waitForConfirmsOrDie(long timeout),它们会将调用委托给在 OperationsCallback 的作用域内使用的专用通道。这些方法仅限于该作用域内使用,原因显而易见。
请注意,更高层次的抽象(允许您将确认与请求关联)在其他地方提供(参见 关联发布者确认和返回)。
如果您只想等待直到代理确认已交付消息,您可以使用以下示例中所示的技术:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您希望在 RabbitAdmin 的作用域内,同一通道上不调用任何操作,则管理员必须使用与 invoke 操作相同的 RabbitTemplate 构建。
如果模板操作已经在现有事务的作用域内执行(例如,在已启用事务的监听容器线程上运行,并在已启用事务的模板上执行操作),那么前面的讨论就不再适用。在这种情况下,操作将在该通道上执行,并在该线程返回到容器时提交。在这种场景下,无需使用 invoke。 |
以这种方式使用确认时,用于将确认与请求关联的大部分基础设施实际上并不需要(除非也启用了返回)。
从版本 2.2 开始,连接工厂支持一个名为 publisherConfirmType 的新属性。
当将其设置为 ConfirmType.SIMPLE 时,可避免相关基础设施,从而提升确认处理的效率。
此外,RabbitTemplate 会设置发送消息中的 publisherSequenceNumber 属性 MessageProperties。如果您希望检查(或记录或以其他方式使用)特定的确认信息,可以使用重载的 invoke 方法,如下例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些 ConfirmCallback 对象(针对 ack 和 nack 实例)是 Rabbit 客户端回调,而非模板回调。 |
以下示例记录了 ack 和 nack 个实例:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
| 作用域操作与线程绑定。多线程环境中的严格消息排序 一文讨论了多线程环境中的严格排序问题。 |
多线程环境中的严格消息排序
在作用域操作中的讨论仅适用于在同一线程上执行的操作。
考虑以下情况:
-
thread-1向队列发送一条消息,并将工作交由thread-2处理 -
thread-2向同一队列发送一条消息
由于 RabbitMQ 的异步特性和缓存通道的使用,无法保证始终使用同一通道,因此消息到达队列的顺序无法得到保障。(在大多数情况下,消息会按顺序到达,但出现乱序传递的概率并非为零)。为解决此用例,可使用大小为 1 的有界通道缓存(配合 channelCheckoutTimeout)来确保所有消息始终通过同一通道发布,从而保证消息顺序。若您的连接工厂还有其他用途(如消费者),则应为模板单独配置一个连接工厂,或配置模板以使用主连接工厂中嵌入的发布者连接工厂(详见 使用独立连接)。
这最好通过一个简单的 Spring Boot 应用程序来说明:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
尽管发布是在两个不同的线程上执行的,但它们都将使用同一个通道,因为缓存最多只允许一个通道。
从版本 2.3.7 开始,ThreadChannelConnectionFactory 支持将线程的通道(channel)转移给另一线程,方法是使用 prepareContextSwitch 和 switchContext 方法。
第一种方法返回一个上下文,该上下文传递给第二个线程,由其调用第二种方法。
一个线程可以绑定非事务性通道或事务性通道(或两者兼有);除非您使用两个连接工厂,否则无法单独转移它们。
示例如下:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦调用 prepareSwitchContext,如果当前线程执行任何其他操作,它们都将在一个新的通道上执行。当不再需要时,务必关闭与线程绑定的通道。 |
消息集成
从版本 1.4 开始,RabbitMessagingTemplate(基于 RabbitTemplate 构建)提供了与 Spring Framework 消息抽象的集成——即,org.springframework.messaging.Message。这使您能够通过 spring-messaging Message<?> 抽象发送和接收消息。该抽象被其他 Spring 项目所使用,例如 Spring Integration 和 Spring 对 STOMP 的支持。涉及两种消息转换器:一种用于在 spring-messaging 的 Message<?> 与 Spring AMQP 的 Message 抽象之间进行转换;另一种用于在 Spring AMQP 的 Message 抽象与底层 RabbitMQ 客户端库所需的格式之间进行转换。默认情况下,消息负载由提供的 RabbitTemplate 实例的消息转换器进行转换。或者,您可以注入一个自定义的 MessagingMessageConverter 及其对应的其他负载转换器,如下例所示:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
已验证用户ID
从版本 1.6 开始,模板现在支持 user-id-expression(在使用 Java 配置时为 userIdExpression)。
如果发送了一条消息,则在评估此表达式后,会设置用户 ID 属性(若尚未设置)。
评估的根对象是待发送的消息。
以下示例展示了如何使用 user-id-expression 属性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是字面量表达式。
第二个示例从应用上下文中的连接工厂bean中获取username属性。
使用独立的连接
从版本 2.0.2 开始,您可以将 usePublisherConnection 属性设置为 true,以在可能的情况下使用与监听器容器所用连接不同的连接。此举旨在避免当生产者因任何原因被阻塞时,消费者也被阻塞。连接工厂为此目的维护一个内部的第二个连接工厂;默认情况下,它与主工厂类型相同,但如果您希望为发布使用不同的工厂类型,也可以显式进行设置。如果 Rabbit 模板在由监听器容器启动的事务中运行,则无论此设置如何,均使用容器的通道。
通常情况下,您不应将 RabbitAdmin 与一个将此设置为 true 的模板一起使用。请使用接受连接工厂参数的 如果您使用了另一个接受模板参数的构造函数,请确保该模板的属性设置为 这是因为,通常管理员会用于为监听器容器声明队列。 如果使用了一个将该属性设置为 在这种情况下,这些队列将无法被容器使用。 |