该版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6spring-doc.cadn.net.cn

弹性:从错误和代理故障中恢复

Spring AMQP 提供的一些关键(也是最流行的)高级功能与在发生协议错误或代理故障时的恢复和自动重新连接有关。 我们已经在本指南中查看了所有相关组件,但将它们全部汇集在一起并单独调用功能和恢复方案应该会有所帮助。spring-doc.cadn.net.cn

主要重新连接功能由CachingConnectionFactory本身。 使用RabbitAdmin自动声明功能。 此外,如果您关心保证交付,您可能还需要使用channelTransacted标记RabbitTemplateSimpleMessageListenerContainerAcknowledgeMode.AUTO(如果您自己做 ack,则手动)在SimpleMessageListenerContainer.spring-doc.cadn.net.cn

自动声明交换、队列和绑定

RabbitAdmin组件可以在启动时声明交换、队列和绑定。 它通过ConnectionListener. 因此,如果代理在启动时不存在,则无关紧要。 第一次Connection(例如, 通过发送消息)触发侦听器并应用管理功能。 在侦听器中执行自动声明的另一个好处是,如果连接因任何原因(例如, 代理死亡、网络故障等),当重新建立连接时,它们将再次应用。spring-doc.cadn.net.cn

以这种方式声明的队列必须具有固定名称——显式声明或由框架生成AnonymousQueue实例。 匿名队列是非持久的、独占的和自动删除的。
仅当CachingConnectionFactory缓存模式为CHANNEL(默认值)。 存在此限制,因为独占队列和自动删除队列绑定到连接。

从 2.2.2 版本开始,RabbitAdmin将检测类型DeclarableCustomizer并在实际处理声明之前应用该函数。 例如,这对于在框架内获得一流支持之前设置新参数(属性)很有用。spring-doc.cadn.net.cn

@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}

它在不提供对Declarablebean 定义。spring-doc.cadn.net.cn

同步作失败和重试选项

如果您在使用RabbitTemplate(例如),Spring AMQP 会抛出AmqpException(通常,但并非总是,AmqpIOException). 我们不会试图隐藏存在问题的事实,因此您必须能够捕获并响应异常。 如果您怀疑连接丢失(这不是您的错),最简单的方法是重试该作。 您可以手动执行此作,也可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。spring-doc.cadn.net.cn

Spring Retry 提供了几个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。 Spring AMQP 还提供了一些方便的工厂 bean,用于以方便的形式为 AMQP 用例创建 Spring Retry 拦截器,并具有可用于实现自定义恢复逻辑的强类型回调接口。 请参阅 Javadoc 和StatefulRetryOperationsInterceptorRetryOperationsInterceptor了解更多详情。 如果没有事务或事务在重试回调中启动,则无状态重试是合适的。 请注意,无状态重试比有状态重试更易于配置和分析,但如果存在必须回滚或肯定要回滚的正在进行的事务,则通常不合适。 在事务中间断开的连接应该与回滚具有相同的效果。 因此,对于事务在堆栈上启动的重新连接,有状态重试通常是最佳选择。 有状态重试需要一种机制来唯一标识消息。 最简单的方法是让发件人在MessageIdmessage 属性。 提供的消息转换器提供了一个选项来执行此作:您可以将createMessageIdstrue. 否则,您可以注入MessageKeyGenerator实现到拦截器中。 密钥生成器必须为每条消息返回一个唯一的密钥。 在 2.0 版之前的版本中,MissingMessageIdAdvice被提供。 它启用了没有messageId属性只重试一次(忽略重试设置)。 此建议不再提供,因为spring-retry版本 1.2,其功能内置于拦截器和消息侦听器容器中。spring-doc.cadn.net.cn

为了向后兼容,默认情况下(重试一次后),消息 ID 为 null 的消息对使用者(使用者已停止)被视为致命消息。 要复制MissingMessageIdAdvice,您可以将statefulRetryFatalWithNullMessageId属性设置为false在侦听器容器上。 使用此设置,使用者将继续运行,并且消息将被拒绝(重试一次后)。 它被丢弃或路由到死信队列(如果已配置)。

从 1.3 版开始,提供了一个构建器 API,以帮助使用 Java(在@Configuration类)。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}

只能以这种方式配置重试功能的子集。 更高级的功能需要配置RetryTemplate作为春豆。 有关可用策略及其配置的完整信息,请参阅 Spring Retry Javadocspring-doc.cadn.net.cn

使用批处理侦听器重试

不建议使用批处理侦听器配置重试,除非批处理是由生产者在单个记录中创建的。 有关使用者和生产者创建的批处理的信息,请参阅批处理消息。 对于使用者创建的批处理,框架不知道批处理中的哪条消息导致了失败,因此无法在重试用尽后进行恢复。 对于生产者创建的批处理,由于只有一条消息实际失败,因此可以恢复整个消息。 应用程序可能希望通过设置引发异常的索引属性来通知自定义恢复器在批处理中发生故障的位置。spring-doc.cadn.net.cn

批处理侦听器的重试恢复器必须实现MessageBatchRecoverer.spring-doc.cadn.net.cn

消息侦听器和异步情况

如果MessageListener由于业务异常而失败,则该异常由消息侦听器容器处理,然后返回侦听另一条消息。 如果故障是由断开连接(不是业务异常)引起的,则必须取消并为侦听器收集消息的使用者并重新启动。 这SimpleMessageListenerContainer无缝处理此问题,并留下一个日志,说明侦听器正在重新启动。 事实上,它无休止地循环,试图重启消费者。 只有当消费者确实表现得非常糟糕时,它才会放弃。 一个副作用是,如果代理在容器启动时关闭,它会继续尝试,直到可以建立连接。spring-doc.cadn.net.cn

与协议错误和断开连接相反,业务异常处理可能需要更多的思考和一些自定义配置,尤其是在使用事务或容器确认的情况下。在 2.8.x 之前,RabbitMQ 没有死信行为的定义。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无休止地重新传递。要对客户端的重新传递次数进行限制,一个选择是StatefulRetryOperationsInterceptor在侦听器的通知链中。拦截器可以有一个恢复回调,用于实现自定义死信作——任何适合您的特定环境的作。spring-doc.cadn.net.cn

另一种方法是将容器的defaultRequeueRejected属性设置为false. 这会导致所有失败的消息都被丢弃。当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信交换。spring-doc.cadn.net.cn

或者,您可以抛出一个AmqpRejectAndDontRequeueException. 这样做可以防止消息重新排队,无论defaultRequeueRejected财产。spring-doc.cadn.net.cn

从 2.1 版开始,ImmediateRequeueAmqpException引入来执行完全相反的逻辑:消息将被重新排队,而不管defaultRequeueRejected财产。spring-doc.cadn.net.cn

通常,使用这两种技术的组合。 您可以使用StatefulRetryOperationsInterceptor在通知链中,带有MessageRecoverer这会抛出一个AmqpRejectAndDontRequeueException. 这MessageRecover当所有重试都用完时调用。 这RejectAndDontRequeueRecoverer正是这样做的。 默认值MessageRecoverer消耗错误消息并发出WARN消息。spring-doc.cadn.net.cn

从 1.3 版开始,新的RepublishMessageRecoverer,以允许在重试用尽后发布失败的消息。spring-doc.cadn.net.cn

当恢复器使用最终异常时,消息将被确认,并且代理不会发送到死信交换(如果已配置)。spring-doc.cadn.net.cn

什么时候RepublishMessageRecoverer在消费者端使用,接收到的消息有deliveryModereceivedDeliveryModemessage 属性。 在本例中,deliveryModenull. 这意味着NON_PERSISTENT代理上的交付模式。 从 2.0 版开始,您可以配置RepublishMessageRecoverer对于deliveryMode设置为要重新发布的消息(如果是)null. 默认情况下,它使用MessageProperties默认值 -MessageDeliveryMode.PERSISTENT.

以下示例显示了如何设置RepublishMessageRecoverer作为恢复者:spring-doc.cadn.net.cn

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}

RepublishMessageRecoverer在邮件标头中发布包含其他信息的邮件,例如异常邮件、堆栈跟踪、原始交换和路由密钥。 可以通过创建子类并覆盖additionalHeaders(). 这deliveryMode(或任何其他属性)也可以在additionalHeaders(),如以下示例所示:spring-doc.cadn.net.cn

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
            .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;
    }

};

从 2.0.5 版本开始,如果堆栈跟踪太大,可能会被截断;这是因为所有标头都必须适合单个帧。 默认情况下,如果堆栈跟踪会导致其他标头可用的字节少于 20,000 字节(“余量”),则它将被截断。 这可以通过设置恢复器的frameMaxHeadroom属性,如果您需要更多或更少的空间来存储其他标头。 从 2.1.13、2.2.3 版本开始,异常消息将包含在此计算中,并且将使用以下算法最大化堆栈跟踪量:spring-doc.cadn.net.cn

  • 如果单独的堆栈跟踪将超过限制,则异常消息标头将被截断为 97 字节加…​并且堆栈跟踪也会被截断。spring-doc.cadn.net.cn

  • 如果堆栈跟踪很小,则消息将被截断(加上…​) 以适合可用字节(但堆栈跟踪本身中的消息被截断为 97 字节加…​).spring-doc.cadn.net.cn

每当发生任何类型的截断时,都会记录原始异常以保留完整信息。 在增强标头后执行评估,以便在表达式中使用异常类型等信息。spring-doc.cadn.net.cn

从版本 2.4.8 开始,错误交换和路由密钥可以作为 SpEL 表达式提供,使用Message作为评估的根对象。spring-doc.cadn.net.cn

从 2.3.3 版开始,一个新的子类RepublishMessageRecovererWithConfirms提供;这支持两种样式的发布者确认,并将等待确认后再返回(如果未确认或返回消息,则引发异常)。spring-doc.cadn.net.cn

如果确认类型为CORRELATED,子类还将检测是否返回消息并抛出AmqpMessageReturnedException;如果发布被否定确认,它将抛出一个AmqpNackReceivedException.spring-doc.cadn.net.cn

如果确认类型为SIMPLE,子类将调用waitForConfirmsOrDie方法。spring-doc.cadn.net.cn

有关确认和退货的更多信息,请参阅发布商确认和退货spring-doc.cadn.net.cn

从 2.1 版开始,ImmediateRequeueMessageRecoverer添加以抛出ImmediateRequeueAmqpException,它通知侦听器容器将当前失败的消息重新排队。spring-doc.cadn.net.cn

春季重试的异常分类

春季重试具有很大的灵活性,可以确定哪些异常可以调用重试。 默认配置对所有异常重试。 鉴于用户异常包装在ListenerExecutionFailedException,我们需要确保分类检查异常原因。 默认分类器仅查看顶级异常。spring-doc.cadn.net.cn

从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier有一个名为traverseCauses(默认值:false). 什么时候true,它遍历异常原因,直到找到匹配项或没有原因。spring-doc.cadn.net.cn

若要使用此分类器进行重试,可以使用SimpleRetryPolicy使用采用最大尝试次数的构造函数创建,则MapException实例,布尔值 (traverseCauses) 并将此策略注入到RetryTemplate.spring-doc.cadn.net.cn

通过代理重试

从 DLX 重新路由后,可以从队列中死信的消息重新发布回此队列。 这种重试行为在代理端通过x-death页眉。 有关此方法的更多信息,请参阅官方 RabbitMQ 文档spring-doc.cadn.net.cn

另一种方法是从应用程序手动将失败的邮件重新发布回原始交换。 从版本开始4.0,RabbitMQ 代理不考虑x-death从客户端发送的标头。 从本质上讲,任何x-*客户端将忽略标头。spring-doc.cadn.net.cn

为了缓解 RabbitMQ 代理的这种新行为,Spring AMQP 引入了retry_count标头,从 3.2 版开始。 当此标头不存在且服务器端 DLX 正在运行时,则x-death.count属性映射到此标头。 当手动重新发布失败的消息以进行重试时,retry_count标头值必须手动递增。 有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

以下示例总结了对代理进行手动重试的算法:spring-doc.cadn.net.cn

@RabbitListener(queues = "some_queue")
public void rePublish(Message message) {
    try {
    // Process message
    }
    catch (Exception ex) {
        Long retryCount = message.getMessageProperties().getRetryCount();
        if (retryCount < 3) {
            message.getMessageProperties().incrementRetryCount();
            this.rabbitTemplate.send("", "some_queue", message);
        }
        else {
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
    }
}