|
此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2! |
弹性:从错误和代理故障中恢复
Spring AMQP 提供的一些关键(且最流行)的高层功能,主要涉及在协议错误或消息代理故障发生时的恢复与自动重连功能。我们已在本指南中介绍了所有相关组件,但在此将它们汇总起来并单独指出各项功能及恢复场景,应能有所帮助。
主要的重连功能由 CachingConnectionFactory 本身启用。此外,通常也建议使用 RabbitAdmin 的自动声明功能。另外,如果您关注确保消息送达,可能还需要在 RabbitTemplate 和 SimpleMessageListenerContainer 中使用 channelTransacted 标志,以及在 SimpleMessageListenerContainer 中使用 AcknowledgeMode.AUTO(或手动处理确认,若您自行处理确认机制)。
自动声明交换机、队列和绑定
组件 RabbitAdmin 可以在启动时声明交换机、队列和绑定。它通过 ConnectionListener 懒惰地完成此操作。因此,如果在启动时代理(broker)不存在,也无妨。第一次使用 Connection 时(例如,通过发送消息),监听器将被触发,并应用管理功能。在监听器中执行自动声明的另一大好处是,若因任何原因(例如,代理宕机、网络故障等)导致连接断开,当连接重新建立时,这些声明会再次生效。
以这种方式声明的队列必须具有固定名称——要么显式声明,要么由框架为 AnonymousQueue 个实例生成。匿名队列是非持久的、独占的,并且在使用后会自动删除。 |
自动声明仅在 CachingConnectionFactory 缓存模式为 CHANNEL(默认值)时执行。此限制存在,因为独占队列和自动删除队列绑定到连接上。 |
从版本 2.2.2 开始,RabbitAdmin 将检测类型为 DeclarableCustomizer 的 Bean,并在实际处理声明之前应用该函数。这很有用,例如,在框架中首次支持该功能之前,可设置新的参数(属性)。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
在那些不直接提供对 Declarable Bean 定义访问权限的项目中,这也非常有用。
请参阅 RabbitMQ 自动连接/拓扑恢复。
同步操作中的失败及重试选项
如果您在使用 RabbitTemplate(例如)时于同步序列中丢失了与代理服务器的连接,Spring AMQP 将抛出一个 AmqpException(通常但不总是 AmqpIOException)。
我们不会试图掩盖问题的发生,因此您必须能够捕获并响应该异常。
如果您怀疑连接已丢失(且并非由您造成),最简单的做法是重新尝试该操作。
您可以手动完成此操作,也可以考虑使用 Spring Retry 来处理重试(无论是以编程方式还是声明式方式)。
Spring Retry 提供了几种 AOP 拦截器,并具有极大的灵活性,可用于指定重试参数(包括重试次数、异常类型、退避算法等)。Spring AMQP 还提供了一些便捷的工厂 bean,用于以适合 AMQP 使用场景的便捷形式创建 Spring 重试拦截器,并提供强类型回调接口,您可使用这些接口实现自定义恢复逻辑。查看 StatefulRetryOperationsInterceptorFactoryBean 和 StatelessRetryOperationsInterceptorFactoryBean 的 Javadoc 及属性以获取更多详细信息。无状态重试适用于没有事务或在重试回调内部启动事务的情况。请注意,无状态重试比有状态重试更易于配置和分析,但若存在必须回滚或肯定将回滚的正在进行中的事务,则通常不适用。在事务进行过程中发生的连接中断,应产生与回滚相同的效果。因此,对于事务在调用栈更高处启动的重连场景,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一标识一条消息。最简单的做法是让发送方在 MessageId 消息属性中放入一个唯一值。提供的消息转换器提供了此选项:您可以将 createMessageIds 设置为 true。否则,您可以将 MessageKeyGenerator 实现注入到拦截器中。密钥生成器必须为每条消息返回一个唯一密钥。在2.0版本之前的版本中。0, 提供了一个 MissingMessageIdAdvice。它使得没有 messageId 属性的消息可以被精确重试一次(忽略重试设置)。此建议已不再提供,因为自 spring-retry 版本起已不再支持。2, 其功能已集成到拦截器和消息监听容器中。
| 为了向后兼容性,一条消息 ID 为 null 的消息默认情况下会被视为对消费者致命的(消费者将被停止),即在重试一次后。 要复现 在此设置下,消费者将继续运行,而消息会被拒绝(在重试一次后)。 该消息将被丢弃或路由到死信队列(如果已配置的话)。 |
从版本 1.3 开始,提供了一个构建器 API,用于辅助通过 Java(在 @Configuration 类中)组装这些拦截器。以下示例展示了如何实现此操作:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxRetries(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
仅能以这种方式配置一部分重试功能。更高级的功能则需要配置 RetryPolicy。有关可用策略及其配置的更多信息,请参阅 RetryPolicy Javadoc。
重试批处理监听器
不建议在批处理监听器中配置重试,除非该批处理由生产者创建,并且是单条记录。有关消费者和生产者创建的批处理的信息,请参阅批处理消息。对于由消费者创建的批处理,框架无法获知批处理中哪条消息导致了失败,因此在重试次数用尽后无法进行恢复。而对于由生产者创建的批处理,由于实际上只有单条消息失败,整个消息均可被恢复。应用程序可能希望通知自定义恢复器,以指明失败发生在批处理中的哪个位置,例如通过设置抛出异常的索引属性。
批处理监听器的重试恢复器必须实现 MessageBatchRecoverer。
消息监听器与异步情况
如果 MessageListener 因业务异常而失败,该异常由消息监听容器处理,随后容器会返回继续监听其他消息。
如果失败是由连接断开(非业务异常)导致的,则负责为监听器收集消息的消费者必须被取消并重新启动。
SimpleMessageListenerContainer 可无缝处理此情况,并在日志中记录监听器正在重启的信息。
实际上,它会无限循环,持续尝试重启消费者。
只有当消费者行为极其恶劣时,它才会放弃。
一个副作用是:如果在容器启动时消息代理(broker)不可用,容器将持续重试,直到成功建立连接为止。
业务异常处理(与协议错误和连接断开不同)可能需要更多的思考和一些自定义配置,特别是当使用事务或容器确认时。
在 2.8.x 版本之前,RabbitMQ 尚未定义死信(dead letter)行为。
因此,默认情况下,因业务异常被拒绝或回滚的消息可能会被无限次重新投递。
为限制客户端的重投递次数,一种选择是在监听器的建议链(advice chain)中设置一个 StatefulRetryOperationsInterceptor。
拦截器可包含一个恢复回调(recovery callback),用于实现自定义的死信处理动作——具体应根据您所处的特定环境进行调整。
另一种选择是将容器的 defaultRequeueRejected 属性设置为 false。这会导致所有失败的消息被丢弃。在使用 RabbitMQ 2.8.x 或更高版本时,这也便于将消息投递到死信交换器(dead letter exchange)。
或者,您可以抛出一个 AmqpRejectAndDontRequeueException。此举可防止消息重新入队,无论 defaultRequeueRejected 属性的设置如何。
从版本 2.1 开始,引入了一个 ImmediateRequeueAmqpException 来执行完全相反的逻辑:无论 defaultRequeueRejected 属性如何设置,消息都将被重新入队。
通常,会同时使用这两种技术的组合。
您可以在通知链中使用一个 StatefulRetryOperationsInterceptor,并配合一个抛出 AmqpRejectAndDontRequeueException 的 MessageRecoverer。
当所有重试均告失败时,MessageRecover 会被调用。RejectAndDontRequeueRecoverer 正是执行此操作的组件。
默认的 MessageRecoverer 会处理异常消息,并发出一个 WARN 消息。
从版本 1.3 开始,提供了一种新的 RepublishMessageRecoverer,以便在重试次数用尽后发布失败的消息。
当恢复器处理最后一个异常时,消息会被确认(ack),并且如果已配置,则不会由代理发送至死信交换机。
当在消费者端使用 RepublishMessageRecoverer 时,接收到的消息在 receivedDeliveryMode 消息属性中包含 deliveryMode。此时,deliveryMode 为 null。这意味着代理端采用 NON_PERSISTENT 传递模式。从 2.0 版本开始,您可以配置 RepublishMessageRecoverer 的 deliveryMode,以在消息被重新发布且满足 null 条件时将其设置到消息中。默认情况下,它使用 MessageProperties 默认值——MessageDeliveryMode.PERSISTENT。 |
以下示例展示了如何将 RepublishMessageRecoverer 设置为恢复器:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxRetries(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
该 RepublishMessageRecoverer 会以消息头中附加额外信息(如异常消息、堆栈跟踪、原始交换信息及路由键)的方式发布消息。可通过创建子类并重写 additionalHeaders() 来添加更多消息头。此外,deliveryMode(或其他任何属性)也可在 additionalHeaders() 中进行修改,如下例所示:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从版本 2.0.5 开始,如果堆栈跟踪过大,可能会被截断;这是因为所有头信息都必须容纳在一个单独的帧内。默认情况下,如果堆栈跟踪会导致其他头信息可用空间少于 20,000 字节(“预留空间”),则会进行截断。您可以通过设置恢复器的 frameMaxHeadroom 属性来调整此值,以根据需要为其他头信息分配更多或更少的空间。从版本 2.1.13 和 2.2.3 开始,异常消息已包含在此计算中,并且将使用以下算法最大化堆栈跟踪的长度:
-
如果堆栈跟踪本身超过限制,异常消息标题将被截断为97字节加上
…,同时堆栈跟踪也会被截断。 -
如果堆栈跟踪较短,消息将被截断(加上
…)以适应可用字节数(但堆栈跟踪内部的消息本身将被截断为 97 字节加…)。
无论发生何种截断,原始异常都会被记录下来,以保留完整信息。评估在头部增强之后进行,以便可将异常类型等信息用于表达式中。
从版本 2.4.8 开始,错误交换和路由键可作为 SpEL 表达式提供,其中 Message 是用于求值的根对象。
从版本 2.3.3 开始,提供了一个新的子类 RepublishMessageRecovererWithConfirms;该子类同时支持两种形式的发布者确认机制,并会在等待确认返回结果(或在未确认、消息被退回时抛出异常)之前进行阻塞。
如果确认类型为 CORRELATED,子类也会检测是否返回了消息,并抛出 AmqpMessageReturnedException;如果发布被否定确认,则抛出 AmqpNackReceivedException。
如果确认类型为 SIMPLE,子类将调用通道上的 waitForConfirmsOrDie 方法。
请参阅 发布者确认和返回 以了解有关确认和返回的更多信息。
从版本 2.1 开始,会向 ImmediateRequeueMessageRecoverer 添加一个以抛出 ImmediateRequeueAmqpException,从而通知监听器容器重新将当前失败的消息重新入队。
Spring 重试异常分类
Spring Retry 在确定哪些异常可以触发重试方面具有极大的灵活性。默认配置会对所有异常进行重试。由于用户异常会被包装在 ListenerExecutionFailedException 中,因此我们需要确保分类器检查异常的底层原因。默认的分类器仅检查最顶层的异常。
自 Spring Retry 1.0.3 起,BinaryExceptionClassifier 具有一个名为 traverseCauses 的属性(默认值为 false)。当 true 时,它会遍历异常原因,直到找到匹配项或无更多原因为止。
要使用此分类器进行重试,您可以使用一个通过接受最大尝试次数的构造函数创建的 SimpleRetryPolicy、Map(即 Exception 实例的 traverseCauses)以及布尔值(traverseCauses),并将此策略注入到 RetryTemplate 中。
重试通过代理
从队列中死信的消息可在通过 DLX 重新路由后,重新发布回此队列。此重试行为由代理端通过 x-death 头部进行控制。有关此方法的更多信息,请参阅官方 RabbitMQ 文档。
另一种方法是,从应用程序中手动将失败的消息重新发布回原始交换机。
从版本 4.0 开始,RabbitMQ 代理不再考虑客户端发送的 x-death 头部。
本质上,任何来自客户端的 x-* 头部均被忽略。
为缓解此 RabbitMQ 代理的新行为,Spring AMQP 在 3.2 版本中引入了 retry_count 头部。当该头部缺失且服务器端死信交换(DLX)正在生效时,x-death.count 属性会被映射到此头部。当失败的消息被手动重新发布以进行重试时,retry_count 头部的值必须手动递增。详情请参见 Javadoc。
以下示例总结了在代理上进行手动重试的算法:
@RabbitListener(queues = "some_queue")
public void rePublish(Message message) {
try {
// Process message
}
catch (Exception ex) {
Long retryCount = message.getMessageProperties().getRetryCount();
if (retryCount < 3) {
message.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send("", "some_queue", message);
}
else {
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
}
}