该版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6spring-doc.cadn.net.cn

交易

Spring Rabbit 框架支持同步和异步用例中的自动事务管理,具有许多不同的语义,可以通过声明方式选择,这是 Spring 事务的现有用户所熟悉的。 这使得许多(如果不是最常见的)消息传递模式易于实现。spring-doc.cadn.net.cn

有两种方法可以向框架发出所需的事务语义信号。 在这两个RabbitTemplateSimpleMessageListenerContainer,有一个标志channelTransacted如果true,告诉框架使用事务通道并通过提交或回滚(取决于结果)结束所有作(发送或接收),但异常表示回滚。 另一个信号是提供一个外部事务,其中一个 Spring 的PlatformTransactionManager实现作为正在进行的作的上下文。 如果框架在发送或接收消息时已经有正在进行的事务,并且channelTransactedflag 是true,则消息传递事务的提交或回滚将延迟到当前事务结束。 如果channelTransactedflag 是false,则没有事务语义适用于消息传递作(自动确认)。spring-doc.cadn.net.cn

channelTransacted标志是配置时间设置。 在创建 AMQP 组件时,通常在应用程序启动时,它会声明和处理一次。 原则上,外部事务更加动态,因为系统在运行时响应当前线程状态。 然而,在实践中,当事务以声明方式分层到应用程序时,它通常也是一种配置设置。spring-doc.cadn.net.cn

对于RabbitTemplate,外部事务由调用者根据喜好声明式或命令式提供(通常的 Spring 事务模型)。 以下示例显示了一种声明性方法(通常是首选,因为它是非侵入性的),其中模板配置了channelTransacted=true:spring-doc.cadn.net.cn

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,String有效负载在标记为@Transactional. 如果数据库处理失败并出现异常,那么传入消息将返回到代理,并且不会发送传出消息。 这适用于使用RabbitTemplate在事务方法链中(例如,除非Channel直接纵以提前提交事务)。spring-doc.cadn.net.cn

对于具有SimpleMessageListenerContainer,如果需要外部事务,则容器在设置侦听器时必须请求它。 为了发出需要外部事务的信号,用户提供了PlatformTransactionManager配置时到容器。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事务管理器被添加为从另一个 Bean 定义注入的依赖项(未显示),并且channelTransactedflag 也设置为true. 其效果是,如果侦听器因异常而失败,则事务将回滚,并且消息也会返回给代理。 值得注意的是,如果事务未能提交(例如,由于 数据库约束错误或连接问题),AMQP 事务也会回滚,并将消息返回给代理。 这有时被称为“尽力而为的 1 阶段提交”,是可靠消息传递的非常强大的模式。 如果channelTransactedflag 设置为false(默认值)在前面的示例中,仍将为侦听器提供外部事务,但所有消息传递作都将自动确认,因此即使在业务作回滚时,效果也是提交消息传递作。spring-doc.cadn.net.cn

条件回滚

在 1.6.6 版之前,将回滚规则添加到容器的transactionAttribute使用外部事务管理器(如 JDBC)时没有效果。 异常总是回滚事务。spring-doc.cadn.net.cn

此外,当在容器的通知链中使用事务通知时,条件回滚不是很有用,因为所有侦听器异常都包装在ListenerExecutionFailedException.spring-doc.cadn.net.cn

第一个问题已得到纠正,规则现已正确应用。 此外,ListenerFailedRuleBasedTransactionAttribute现在提供了。 它是RuleBasedTransactionAttribute,唯一的区别是它知道ListenerExecutionFailedException并将此类例外的原因用于规则。 此事务属性可以直接在容器中使用,也可以通过事务通知使用。spring-doc.cadn.net.cn

以下示例使用此规则:spring-doc.cadn.net.cn

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

关于回滚已接收消息的说明

AMQP 事务仅适用于发送到代理的消息和 ack。 因此,当 Spring 事务回滚并收到消息时,Spring AMQP 不仅必须回滚事务,还必须手动拒绝消息(有点像 nack,但这不是规范所说的)。 对消息拒绝采取的作与事务无关,并且取决于defaultRequeueRejected属性(默认:true). 有关拒绝失败消息的详细信息,请参阅消息侦听器和异步情况spring-doc.cadn.net.cn

有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义spring-doc.cadn.net.cn

在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时未确认的任何消息)会转到 Rabbit 代理的队列后面。 从 2.7.0 开始,被拒绝的消息将转到队列的前面,其方式与 JMS 回滚消息类似。
以前,事务回滚时的消息重新排队在本地事务之间不一致,并且当TransactionManager被提供。 在前一种情况下,正常的重新排队逻辑 (AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false) 应用(请参阅消息侦听器和异步情况)。 使用事务管理器时,消息在回滚时无条件地重新排队。 从 V2.0 开始,行为是一致的,并且在这两种情况下都应用了正常的重新排队逻辑。 要恢复到以前的行为,您可以将容器的alwaysRequeueWithTxManagerRollback属性设置为true. 请参阅消息侦听器容器配置

RabbitTransactionManager

RabbitTransactionManager是在外部事务中执行 Rabbit作并与外部事务同步的替代方法。 此事务管理器是PlatformTransactionManager接口,应该与单个 Rabbit 一起使用ConnectionFactory.spring-doc.cadn.net.cn

此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。

需要应用程序代码才能通过以下方式检索事务性 Rabbit 资源ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)而不是标准Connection.createChannel()调用,并创建后续通道。 使用 Spring AMQP 的RabbitTemplate,它将自动检测线程绑定的 Channel 并自动参与其事务。spring-doc.cadn.net.cn

使用 Java 配置,您可以使用以下 bean 设置新的 RabbitTransactionManager:spring-doc.cadn.net.cn

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您更喜欢 XML 配置,则可以在 XML Application Context 文件中声明以下 bean:spring-doc.cadn.net.cn

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

事务同步

将 RabbitMQ 事务与其他一些(例如 DBMS)事务同步可提供“尽力而为的单阶段提交”语义。 RabbitMQ 事务在事务同步的 after completion 阶段可能会提交失败。 这由spring-txinfrastructure 作为错误,但不会向调用代码引发异常。 从 2.3.10 版本开始,您可以调用ConnectionUtils.checkAfterCompletion()在事务在处理事务的同一线程上提交之后。 如果没有发生异常,它将简单地返回;否则它会抛出一个AfterCompletionFailedException它将具有表示完成的同步状态的属性。spring-doc.cadn.net.cn

通过调用ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true);这是一个全局标志,适用于所有线程。spring-doc.cadn.net.cn