|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0! |
弹性:从错误和代理故障中恢复
Spring AMQP 提供的一些关键(也是最流行的)高级功能与在发生协议错误或代理故障时的恢复和自动重新连接有关。 我们已经在本指南中看到了所有相关组件,但将它们全部汇集在这里并单独介绍功能和恢复场景应该会有所帮助。
主要的重新连接功能由CachingConnectionFactory本身。
使用RabbitAdmin自动声明功能。
此外,如果您关心保证送达,您可能还需要使用channelTransacted标志输入RabbitTemplate和SimpleMessageListenerContainer和AcknowledgeMode.AUTO(如果您自己执行 ACK,则为 MANUAL)SimpleMessageListenerContainer.
自动声明交换、队列和绑定
这RabbitAdmin组件可以在启动时声明 exchanges、queues 和 bindings。
它通过ConnectionListener.
因此,如果 broker 在启动时不存在,则无关紧要。
第一次Connection(例如,
通过发送消息)将触发侦听器并应用 Admin 功能。
在侦听器中执行 auto 声明的另一个好处是,如果由于任何原因(例如,
Broker Death、Network Glitch 等),则在重新建立连接时会再次应用它们。
以这种方式声明的队列必须具有固定名称 — 要么显式声明,要么由框架为AnonymousQueue实例。
匿名队列是非持久队列、独占队列和自动删除队列。 |
仅当CachingConnectionFactorycache 模式为CHANNEL(默认值)。
存在此限制是因为独占队列和自动删除队列绑定到连接。 |
从版本 2.2.2 开始,RabbitAdmin将检测DeclarableCustomizer并在实际处理声明之前应用该函数。
这很有用,例如,在框架中具有一等支持之前设置新参数 (property)。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
在不提供对Declarablebean 定义。
另请参阅 RabbitMQ 自动连接/拓扑恢复。
同步作失败和重试选项
如果在使用RabbitTemplate(例如),Spring AMQP 会抛出一个AmqpException(通常,但并非总是,AmqpIOException).
我们不会试图隐藏存在问题的事实,因此您必须能够捕获并响应异常。
如果您怀疑连接丢失(这不是您的错),最简单的方法是再次尝试该作。
你可以手动执行此作,也可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。
Spring Retry 提供了几个 AOP 拦截器,并且具有很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。
Spring AMQP 还提供了一些方便的工厂 bean,用于以方便的形式为 AMQP 用例创建 Spring 重试拦截器,并具有可用于实现自定义恢复逻辑的强类型回调接口。
请参阅 Javadoc 和StatefulRetryOperationsInterceptor和StatelessRetryOperationsInterceptor了解更多详情。
如果没有事务,或者事务是在重试回调中启动的,则无状态重试是合适的。
请注意,无状态重试比有状态重试更易于配置和分析,但如果必须回滚或肯定要回滚正在进行的事务,则通常不合适。
在事务中间丢弃的连接应具有与回滚相同的效果。
因此,对于事务在堆栈上层启动的重新连接,有状态重试通常是最佳选择。
有状态重试需要一种机制来唯一标识消息。
最简单的方法是让发送者在MessageIdmessage 属性。
提供的消息转换器提供了一个选项来执行此作:您可以将createMessageIds自true.
否则,您可以注入MessageKeyGeneratorimplementation 复制到拦截器中。
密钥生成器必须为每条消息返回一个唯一的密钥。
在版本 2.0 之前的版本中,MissingMessageIdAdvice。
它启用了没有messageId属性设置为仅重试一次(忽略重试设置)。
此建议不再提供,因为spring-retry版本 1.2 中,其功能内置于 Interceptor 和 Message Listener 容器中。
为了向后兼容,默认情况下(重试一次后),消息 ID 为 null 的消息被视为对使用者(使用者已停止)致命。
要复制MissingMessageIdAdvice中,您可以设置statefulRetryFatalWithNullMessageIdproperty 设置为false在 Listener 容器上。
使用该设置,使用者将继续运行,消息将被拒绝(重试一次后)。
它被丢弃或路由到死信队列(如果已配置)。 |
从版本 1.3 开始,提供了一个构建器 API 来帮助使用 Java 组装这些拦截器(在@Configuration类)。
以下示例显示了如何执行此作:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只能以这种方式配置重试功能的子集。
更高级的功能需要配置RetryTemplate作为 Spring bean 进行。
有关可用策略及其配置的完整信息,请参阅 Spring 重试 Javadoc。
使用 Batch 侦听器重试
建议不要使用批处理侦听器配置重试,除非该批处理是由创建者在单个记录中创建的。 请参阅Batched Messages 以了解有关使用者和创建者创建的批处理的信息。 对于使用者创建的批处理,框架不知道批处理中的哪条消息导致了失败,因此在重试用尽后无法恢复。 对于创建者创建的批处理,由于实际上只有一条消息失败,因此可以恢复整个消息。 应用程序可能希望通知自定义恢复程序在批处理中发生故障的位置,可能通过设置引发的异常的 index 属性。
批处理侦听器的重试 recoverer 必须实现MessageBatchRecoverer.
消息侦听器和异步情况
如果MessageListener由于业务异常而失败,则异常由 Message Listener 容器处理,然后该容器返回侦听另一条消息。
如果失败是由断开的连接引起的(不是业务异常),则必须取消并重新启动为侦听器收集消息的使用者。
这SimpleMessageListenerContainer无缝处理此问题,并留下一个日志来说明侦听器正在重新启动。
事实上,它无休止地循环,试图重新启动消费者。
只有当消费者确实行为非常糟糕时,它才会放弃。
一个副作用是,如果 broker 在容器启动时关闭,它会一直尝试,直到可以建立连接。
与协议错误和断开连接相反,业务异常处理可能需要更多的思考和一些自定义配置,尤其是在使用事务或容器 ack 时。
在 2.8.x 之前,RabbitMQ 没有死信行为的定义。
因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无限地重新传递。
要限制客户端的重新投放次数,一种选择是StatefulRetryOperationsInterceptor在侦听器的通知链中。
拦截器可以具有实现自定义死信作的恢复回调 — 任何适合您的特定环境的方法。
另一种选择是将容器的defaultRequeueRejectedproperty 设置为false.
这会导致所有失败的消息都被丢弃。
当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传送到死信交换。
或者,您可以抛出AmqpRejectAndDontRequeueException.
这样做可以防止消息重新排队,而不管defaultRequeueRejected财产。
从版本 2.1 开始,ImmediateRequeueAmqpException来执行完全相反的逻辑:消息将被重新排队,而不管defaultRequeueRejected财产。
通常,会结合使用这两种技术。
您可以使用StatefulRetryOperationsInterceptor在通知链中,使用MessageRecoverer这会抛出一个AmqpRejectAndDontRequeueException.
这MessageRecover在所有重试都已用尽时调用。
这RejectAndDontRequeueRecoverer确实如此。
默认的MessageRecoverer使用错误消息并发出WARN消息。
从版本 1.3 开始,新的RepublishMessageRecoverer,以允许在重试用尽后发布失败的消息。
当 recoverer 使用最终异常时,该消息将被确认,并且不会由 broker 发送到死信交换(如果已配置)。
什么时候RepublishMessageRecoverer在消费者端使用,收到的消息有deliveryMode在receivedDeliveryModemessage 属性。
在这种情况下,deliveryMode是null.
这意味着NON_PERSISTENTbroker 上的 delivery 模式。
从版本 2.0 开始,您可以配置RepublishMessageRecoverer对于deliveryMode设置为重新发布的消息(如果是null.
默认情况下,它使用MessageProperties默认值 -MessageDeliveryMode.PERSISTENT. |
以下示例演示如何设置RepublishMessageRecoverer作为 recoverer:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
这RepublishMessageRecoverer发布消息时,消息标头中包含其他信息,例如异常消息、堆栈跟踪、原始交换和路由密钥。
可以通过创建子类并覆盖来添加其他 HeadersadditionalHeaders().
这deliveryMode(或任何其他属性)也可以在additionalHeaders(),如下例所示:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从版本 2.0.5 开始,如果堆栈跟踪太大,则可能会截断堆栈跟踪;这是因为所有标头都必须适合单个帧。
默认情况下,如果堆栈跟踪导致可用于其他标头的字节少于 20,000 字节 ('headroom'),则它将被截断。
这可以通过设置 recoverer 的frameMaxHeadroomproperty,如果你需要更多或更少的空间来存储其他标头。
从版本 2.1.13、2.2.3 开始,异常消息包含在此计算中,并且将使用以下算法最大化堆栈跟踪量:
-
如果单独的堆栈跟踪会超过限制,则异常消息标头将被截断为 97 字节加
…并且堆栈跟踪也被截断。 -
如果堆栈跟踪很小,则消息将被截断(加上
…) 以适应可用字节(但堆栈跟踪本身中的消息被截断为 97 字节加…).
每当发生任何类型的截断时,都会记录原始异常以保留完整信息。 评估在增强标头后执行,以便可以在表达式中使用异常类型等信息。
从版本 2.4.8 开始,错误交换和路由密钥可以作为 SPEL 表达式提供,其中Message作为评估的根对象。
从版本 2.3.3 开始,一个新的子类RepublishMessageRecovererWithConfirms提供;这支持两种样式的发布者确认,并将在返回之前等待确认(如果未确认或返回消息,则引发异常)。
如果确认类型为CORRELATED中,子类还将检测是否返回消息并抛出AmqpMessageReturnedException;如果发布被否定确认,它将抛出一个AmqpNackReceivedException.
如果确认类型为SIMPLE,子类将调用waitForConfirmsOrDie方法。
请参阅 Publisher Confirms and Returns 以了解有关确认和返回的更多信息。
从版本 2.1 开始,ImmediateRequeueMessageRecoverer以抛出ImmediateRequeueAmqpException,它通知侦听器容器对当前失败的消息重新排队。
Spring 重试的异常分类
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。
默认配置将对所有异常重试。
鉴于用户异常被包装在ListenerExecutionFailedException,我们需要确保 classification 检查异常原因。
默认分类器仅查看顶级异常。
从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier具有一个名为traverseCauses(默认:false).
什么时候true,它会遍历异常原因,直到找到匹配项或没有原因。
要使用此分类器进行重试,您可以使用SimpleRetryPolicy使用采用最大尝试次数的构造函数创建,Map之Exception实例和布尔值 (traverseCauses) 并将此策略注入到RetryTemplate.
通过 Broker 重试
从 DLX 重新路由后,可以从队列中收到死信的消息重新发布回此队列。
此重试行为在代理端通过x-death页眉。
有关此方法的更多信息,请参阅 RabbitMQ 官方文档。
另一种方法是从应用程序手动将失败的消息重新发布回原始交换。
从 version 开始4.0,则 RabbitMQ 代理不会考虑x-death标头。
本质上,任何x-*客户端将忽略标头。
为了缓解 RabbitMQ 代理的这种新行为, Spring AMQP 引入了一个retry_count标头,从版本 3.2 开始。
当此标头不存在且服务器端 DLX 正在运行时,x-death.countproperty 映射到此标头。
当手动重新发布失败的消息进行重试时,retry_countheader 值必须手动递增。
看MessageProperties.incrementRetryCount()JavaDocs 了解更多信息。
以下示例总结了通过 broker 进行手动重试的算法:
@RabbitListener(queueNames = "some_queue")
public void rePublish(Message message) {
try {
// Process message
}
catch (Exception ex) {
Long retryCount = message.getMessageProperties().getRetryCount();
if (retryCount < 3) {
message.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send("", "some_queue", message);
}
else {
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
}
}