请使用 Spring AMQP 4.0.2(最新稳定版本)!spring-doc.cadn.net.cn

事务

Spring Rabbit 框架支持在同步和异步使用场景中自动管理事务,提供多种不同的语义选择,且可通过声明式方式配置,这与现有 Spring 事务用户所熟悉的用法一致。这使得许多(甚至大多数)常见的消息传递模式得以轻松实现。spring-doc.cadn.net.cn

有两种方式可以向框架表明所需的事务语义。在 RabbitTemplateSimpleMessageListenerContainer 中,都存在一个标志 channelTransacted,如果该标志为 true,则会指示框架使用事务通道,并在所有操作(发送或接收)结束后通过提交或回滚(根据结果)完成事务,其中异常将触发回滚。另一种方式是提供一个外部事务,作为当前操作的上下文,使用 Spring 的 PlatformTransactionManager 实现之一。如果框架在发送或接收消息时已有事务正在进行,且 channelTransacted 标志为 true,则消息事务的提交或回滚将推迟到当前事务结束时才执行。若 channelTransacted 标志为 false,则对消息操作不应用任何事务语义(自动确认)。spring-doc.cadn.net.cn

标志 channelTransacted 是一个配置时设置。它在 AMQP 组件创建时(通常在应用启动时)被声明并处理一次。外部事务在原则上更具动态性,因为系统在运行时会根据当前线程状态作出响应。然而,在实际应用中,当事务以声明方式叠加到应用程序上时,它也常常是一个配置设置。spring-doc.cadn.net.cn

对于同步用例且使用 RabbitTemplate 的情况,外部事务由调用者提供,无论是通过声明式方式还是命令式方式(根据个人偏好),其方式与典型的 Spring 事务模型一致。以下示例展示了一种声明式方法(通常更受推荐,因其非侵入性),其中模板已配置为 channelTransacted=truespring-doc.cadn.net.cn

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,收到一个 String 负载,经过转换后作为消息体发送到标记为 @Transactional 的方法内部。如果数据库处理因异常失败,则传入的消息会被返回至消息代理(broker),而传出的消息则不会被发送。此行为适用于链式调用中的任何事务性方法操作(除非例如直接操作 Channel 以提前提交事务)。spring-doc.cadn.net.cn

对于异步用例(使用 SimpleMessageListenerContainer 时),如果需要外部事务,则必须由容器在设置监听器时请求该事务。为了表明需要外部事务,用户需在配置容器时向其提供 PlatformTransactionManager 的实现类。以下示例展示了如何操作:spring-doc.cadn.net.cn

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事务管理器作为依赖注入从另一个 bean 定义(未显示)中引入,并且 channelTransacted 标志也设置为 truespring-doc.cadn.net.cn

spring-doc.cadn.net.cn

其效果是:如果监听器因异常而失败,则事务将回滚,同时消息也会被返还给消息代理(broker)。spring-doc.cadn.net.cn

值得注意的是,如果事务无法提交(例如,由于数据库约束错误或连接问题),AMQP 事务也会被回滚,消息同样会被返还给消息代理。spring-doc.cadn.net.cn

这有时被称为“最佳努力一阶段提交”(Best Efforts 1 Phase Commit),是一种用于可靠消息传递的非常强大的模式。spring-doc.cadn.net.cn

如果在前面的示例中,channelTransacted 标志被设置为 false(默认值),则外部事务仍会提供给监听器,但所有消息操作都将自动确认(auto-acked),因此即使业务操作回滚,消息操作也会被提交。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

条件回滚

在 1.6.6 版本之前,当使用外部事务管理器(如 JDBC)时,向容器的 transactionAttribute 添加回滚规则没有任何效果。异常始终会回滚事务。spring-doc.cadn.net.cn

此外,当在容器的建议链中使用 事务建议 时,条件回滚作用不大,因为所有监听器异常都会被封装在一个 ListenerExecutionFailedException 中。spring-doc.cadn.net.cn

第一个问题已修复,现在规则已正确应用。此外,现已提供 ListenerFailedRuleBasedTransactionAttribute。它继承自 RuleBasedTransactionAttribute,唯一的区别在于它能感知 ListenerExecutionFailedException,并利用此类异常的原因来执行规则。此事务属性可直接在容器中使用,或通过事务通知(transaction advice)进行调用。spring-doc.cadn.net.cn

以下示例使用此规则:spring-doc.cadn.net.cn

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

关于已接收消息回滚的说明

AMQP 事务仅适用于发送到代理的消息和确认(acks)。因此,当 Spring 事务发生回滚且消息已接收时,Spring AMQP 不仅需要回滚事务,还必须手动拒绝该消息(类似于 NACK,但根据规范,这并非正式称为 NACK 的操作)。消息拒绝所采取的操作与事务无关,而是取决于 defaultRequeueRejected 属性(默认值: true)。有关如何拒绝失败消息的更多信息,请参见 消息监听器与异步场景spring-doc.cadn.net.cn

有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ Broker Semanticsspring-doc.cadn.net.cn

在 RabbitMQ 2.7.0 之前,此类消息(以及任何在通道关闭或中止时未被确认的消息)会返回到 Rabbit 代理的队列末尾。</p><p>自 2.7.0 版本起,被拒绝的消息会进入队列头部,其处理方式类似于 JMS 中回滚的消息。
之前,事务回滚时消息重新入队的行为在本地事务和提供 TransactionManager 的情况下不一致。在前一种情况下,应用了正常的重新入队逻辑(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(参见 消息监听器与异步情况)。在使用事务管理器时,无论何种情况,消息均会在回滚时无条件重新入队。从 2.0 版本开始,行为变得一致,两种情况下均应用正常的重新入队逻辑。若要恢复到旧版行为,可将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true。参见 消息监听器容器配置

使用RabbitTransactionManager

RabbitTransactionManager》是将Rabbit操作置于外部事务内并与其同步执行的一种替代方案。此事务管理器实现了PlatformTransactionManager接口,并应与单个Rabbit ConnectionFactory配合使用。spring-doc.cadn.net.cn

此策略无法提供 XA 事务——例如,为了在消息传递和数据库访问之间共享事务。

应用程序代码需要通过 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 获取事务性 Rabbit 资源,而不是通过标准的 Connection.createChannel() 调用并随后创建通道。当使用 Spring AMQP 的 RabbitTemplate 时,它会自动检测线程绑定的 Channel 并自动参与其事务。spring-doc.cadn.net.cn

使用 Java 配置,您可以通过以下 Bean 设置一个新的 RabbitTransactionManager:spring-doc.cadn.net.cn

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您更倾向于使用 XML 配置,可以在您的 XML 应用上下文文件中声明以下 Bean:spring-doc.cadn.net.cn

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

事务同步

将 RabbitMQ 事务与某些其他事务(例如数据库管理系统 DBMS)进行同步,可提供“最佳努力一阶段提交”语义。在事务同步的后完成阶段,RabbitMQ 事务可能无法成功提交。此情况由 spring-tx 基础设施记录为错误日志,但不会向调用代码抛出异常。从版本 2.3.10 开始,您可以在同一处理事务的线程上,在事务提交后调用 ConnectionUtils.checkAfterCompletion()。若未发生异常,则该方法直接返回;否则将抛出一个 AfterCompletionFailedException,其包含一个表示完成同步状态的属性。spring-doc.cadn.net.cn

通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 启用此功能;这是一个全局标志,适用于所有线程。spring-doc.cadn.net.cn