|
请使用 Spring AMQP 4.0.2(最新稳定版本)! |
事务
Spring Rabbit 框架支持在同步和异步使用场景中自动管理事务,提供多种不同的语义选择,且可通过声明式方式配置,这与现有 Spring 事务用户所熟悉的用法一致。这使得许多(甚至大多数)常见的消息传递模式得以轻松实现。
有两种方式可以向框架表明所需的事务语义。在 RabbitTemplate 和 SimpleMessageListenerContainer 中,都存在一个标志 channelTransacted,如果该标志为 true,则会指示框架使用事务通道,并在所有操作(发送或接收)结束后通过提交或回滚(根据结果)完成事务,其中异常将触发回滚。另一种方式是提供一个外部事务,作为当前操作的上下文,使用 Spring 的 PlatformTransactionManager 实现之一。如果框架在发送或接收消息时已有事务正在进行,且 channelTransacted 标志为 true,则消息事务的提交或回滚将推迟到当前事务结束时才执行。若 channelTransacted 标志为 false,则对消息操作不应用任何事务语义(自动确认)。
标志 channelTransacted 是一个配置时设置。它在 AMQP 组件创建时(通常在应用启动时)被声明并处理一次。外部事务在原则上更具动态性,因为系统在运行时会根据当前线程状态作出响应。然而,在实际应用中,当事务以声明方式叠加到应用程序上时,它也常常是一个配置设置。
对于同步用例且使用 RabbitTemplate 的情况,外部事务由调用者提供,无论是通过声明式方式还是命令式方式(根据个人偏好),其方式与典型的 Spring 事务模型一致。以下示例展示了一种声明式方法(通常更受推荐,因其非侵入性),其中模板已配置为 channelTransacted=true:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在前面的示例中,收到一个 String 负载,经过转换后作为消息体发送到标记为 @Transactional 的方法内部。如果数据库处理因异常失败,则传入的消息会被返回至消息代理(broker),而传出的消息则不会被发送。此行为适用于链式调用中的任何事务性方法操作(除非例如直接操作 Channel 以提前提交事务)。
对于异步用例(使用 SimpleMessageListenerContainer 时),如果需要外部事务,则必须由容器在设置监听器时请求该事务。为了表明需要外部事务,用户需在配置容器时向其提供 PlatformTransactionManager 的实现类。以下示例展示了如何操作:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前面的示例中,事务管理器作为依赖注入从另一个 bean 定义(未显示)中引入,并且 channelTransacted 标志也设置为 true。
其效果是:如果监听器因异常而失败,则事务将回滚,同时消息也会被返还给消息代理(broker)。
值得注意的是,如果事务无法提交(例如,由于数据库约束错误或连接问题),AMQP 事务也会被回滚,消息同样会被返还给消息代理。
这有时被称为“最佳努力一阶段提交”(Best Efforts 1 Phase Commit),是一种用于可靠消息传递的非常强大的模式。
如果在前面的示例中,channelTransacted 标志被设置为 false(默认值),则外部事务仍会提供给监听器,但所有消息操作都将自动确认(auto-acked),因此即使业务操作回滚,消息操作也会被提交。
条件回滚
在 1.6.6 版本之前,当使用外部事务管理器(如 JDBC)时,向容器的 transactionAttribute 添加回滚规则没有任何效果。异常始终会回滚事务。
此外,当在容器的建议链中使用 事务建议 时,条件回滚作用不大,因为所有监听器异常都会被封装在一个 ListenerExecutionFailedException 中。
第一个问题已修复,现在规则已正确应用。此外,现已提供 ListenerFailedRuleBasedTransactionAttribute。它继承自 RuleBasedTransactionAttribute,唯一的区别在于它能感知 ListenerExecutionFailedException,并利用此类异常的原因来执行规则。此事务属性可直接在容器中使用,或通过事务通知(transaction advice)进行调用。
以下示例使用此规则:
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于已接收消息回滚的说明
AMQP 事务仅适用于发送到代理的消息和确认(acks)。因此,当 Spring 事务发生回滚且消息已接收时,Spring AMQP 不仅需要回滚事务,还必须手动拒绝该消息(类似于 NACK,但根据规范,这并非正式称为 NACK 的操作)。消息拒绝所采取的操作与事务无关,而是取决于 defaultRequeueRejected 属性(默认值: true)。有关如何拒绝失败消息的更多信息,请参见 消息监听器与异步场景。
有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ Broker Semantics。
| 在 RabbitMQ 2.7.0 之前,此类消息(以及任何在通道关闭或中止时未被确认的消息)会返回到 Rabbit 代理的队列末尾。</p><p>自 2.7.0 版本起,被拒绝的消息会进入队列头部,其处理方式类似于 JMS 中回滚的消息。 |
之前,事务回滚时消息重新入队的行为在本地事务和提供 TransactionManager 的情况下不一致。在前一种情况下,应用了正常的重新入队逻辑(AmqpRejectAndDontRequeueException 或 defaultRequeueRejected=false)(参见 消息监听器与异步情况)。在使用事务管理器时,无论何种情况,消息均会在回滚时无条件重新入队。从 2.0 版本开始,行为变得一致,两种情况下均应用正常的重新入队逻辑。若要恢复到旧版行为,可将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true。参见 消息监听器容器配置。 |
使用RabbitTransactionManager
《RabbitTransactionManager》是将Rabbit操作置于外部事务内并与其同步执行的一种替代方案。此事务管理器实现了PlatformTransactionManager接口,并应与单个Rabbit ConnectionFactory配合使用。
| 此策略无法提供 XA 事务——例如,为了在消息传递和数据库访问之间共享事务。 |
应用程序代码需要通过 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 获取事务性 Rabbit 资源,而不是通过标准的 Connection.createChannel() 调用并随后创建通道。当使用 Spring AMQP 的 RabbitTemplate 时,它会自动检测线程绑定的 Channel 并自动参与其事务。
使用 Java 配置,您可以通过以下 Bean 设置一个新的 RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您更倾向于使用 XML 配置,可以在您的 XML 应用上下文文件中声明以下 Bean:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步
将 RabbitMQ 事务与某些其他事务(例如数据库管理系统 DBMS)进行同步,可提供“最佳努力一阶段提交”语义。在事务同步的后完成阶段,RabbitMQ 事务可能无法成功提交。此情况由 spring-tx 基础设施记录为错误日志,但不会向调用代码抛出异常。从版本 2.3.10 开始,您可以在同一处理事务的线程上,在事务提交后调用 ConnectionUtils.checkAfterCompletion()。若未发生异常,则该方法直接返回;否则将抛出一个 AfterCompletionFailedException,其包含一个表示完成同步状态的属性。
通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 启用此功能;这是一个全局标志,适用于所有线程。