该版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
交易
Spring Rabbit 框架支持同步和异步用例中的自动事务管理,具有许多不同的语义,可以通过声明方式选择,这是 Spring 事务的现有用户所熟悉的。 这使得许多(如果不是最常见的)消息传递模式易于实现。
有两种方法可以向框架发出所需的事务语义信号。
在这两个RabbitTemplate
和SimpleMessageListenerContainer
,有一个标志channelTransacted
如果true
,告诉框架使用事务通道并通过提交或回滚(取决于结果)结束所有作(发送或接收),但异常表示回滚。
另一个信号是提供一个外部事务,其中一个 Spring 的PlatformTransactionManager
实现作为正在进行的作的上下文。
如果框架在发送或接收消息时已经有正在进行的事务,并且channelTransacted
flag 是true
,则消息传递事务的提交或回滚将延迟到当前事务结束。
如果channelTransacted
flag 是false
,则没有事务语义适用于消息传递作(自动确认)。
这channelTransacted
标志是配置时间设置。
在创建 AMQP 组件时,通常在应用程序启动时,它会声明和处理一次。
原则上,外部事务更加动态,因为系统在运行时响应当前线程状态。
然而,在实践中,当事务以声明方式分层到应用程序时,它通常也是一种配置设置。
对于RabbitTemplate
,外部事务由调用者根据喜好声明式或命令式提供(通常的 Spring 事务模型)。
以下示例显示了一种声明性方法(通常是首选,因为它是非侵入性的),其中模板配置了channelTransacted=true
:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在前面的示例中,String
有效负载在标记为@Transactional
.
如果数据库处理失败并出现异常,那么传入消息将返回到代理,并且不会发送传出消息。
这适用于使用RabbitTemplate
在事务方法链中(例如,除非Channel
直接纵以提前提交事务)。
对于具有SimpleMessageListenerContainer
,如果需要外部事务,则容器在设置侦听器时必须请求它。
为了发出需要外部事务的信号,用户提供了PlatformTransactionManager
配置时到容器。
以下示例显示了如何执行此作:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前面的示例中,事务管理器被添加为从另一个 Bean 定义注入的依赖项(未显示),并且channelTransacted
flag 也设置为true
.
其效果是,如果侦听器因异常而失败,则事务将回滚,并且消息也会返回给代理。
值得注意的是,如果事务未能提交(例如,由于
数据库约束错误或连接问题),AMQP 事务也会回滚,并将消息返回给代理。
这有时被称为“尽力而为的 1 阶段提交”,是可靠消息传递的非常强大的模式。
如果channelTransacted
flag 设置为false
(默认值)在前面的示例中,仍将为侦听器提供外部事务,但所有消息传递作都将自动确认,因此即使在业务作回滚时,效果也是提交消息传递作。
条件回滚
在 1.6.6 版之前,将回滚规则添加到容器的transactionAttribute
使用外部事务管理器(如 JDBC)时没有效果。
异常总是回滚事务。
此外,当在容器的通知链中使用事务通知时,条件回滚不是很有用,因为所有侦听器异常都包装在ListenerExecutionFailedException
.
第一个问题已得到纠正,规则现已正确应用。
此外,ListenerFailedRuleBasedTransactionAttribute
现在提供了。
它是RuleBasedTransactionAttribute
,唯一的区别是它知道ListenerExecutionFailedException
并将此类例外的原因用于规则。
此事务属性可以直接在容器中使用,也可以通过事务通知使用。
以下示例使用此规则:
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于回滚已接收消息的说明
AMQP 事务仅适用于发送到代理的消息和 ack。
因此,当 Spring 事务回滚并收到消息时,Spring AMQP 不仅必须回滚事务,还必须手动拒绝消息(有点像 nack,但这不是规范所说的)。
对消息拒绝采取的作与事务无关,并且取决于defaultRequeueRejected
属性(默认:true
).
有关拒绝失败消息的详细信息,请参阅消息侦听器和异步情况。
有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义。
在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时未确认的任何消息)会转到 Rabbit 代理的队列后面。 从 2.7.0 开始,被拒绝的消息将转到队列的前面,其方式与 JMS 回滚消息类似。 |
以前,事务回滚时的消息重新排队在本地事务之间不一致,并且当TransactionManager 被提供。
在前一种情况下,正常的重新排队逻辑 (AmqpRejectAndDontRequeueException 或defaultRequeueRejected=false ) 应用(请参阅消息侦听器和异步情况)。
使用事务管理器时,消息在回滚时无条件地重新排队。
从 V2.0 开始,行为是一致的,并且在这两种情况下都应用了正常的重新排队逻辑。
要恢复到以前的行为,您可以将容器的alwaysRequeueWithTxManagerRollback 属性设置为true .
请参阅消息侦听器容器配置。 |
用RabbitTransactionManager
这RabbitTransactionManager
是在外部事务中执行 Rabbit作并与外部事务同步的替代方法。
此事务管理器是PlatformTransactionManager
接口,应该与单个 Rabbit 一起使用ConnectionFactory
.
此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。 |
需要应用程序代码才能通过以下方式检索事务性 Rabbit 资源ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
而不是标准Connection.createChannel()
调用,并创建后续通道。
使用 Spring AMQP 的RabbitTemplate
,它将自动检测线程绑定的 Channel 并自动参与其事务。
使用 Java 配置,您可以使用以下 bean 设置新的 RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您更喜欢 XML 配置,则可以在 XML Application Context 文件中声明以下 bean:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步
将 RabbitMQ 事务与其他一些(例如 DBMS)事务同步可提供“尽力而为的单阶段提交”语义。
RabbitMQ 事务在事务同步的 after completion 阶段可能会提交失败。
这由spring-tx
infrastructure 作为错误,但不会向调用代码引发异常。
从 2.3.10 版本开始,您可以调用ConnectionUtils.checkAfterCompletion()
在事务在处理事务的同一线程上提交之后。
如果没有发生异常,它将简单地返回;否则它会抛出一个AfterCompletionFailedException
它将具有表示完成的同步状态的属性。
通过调用ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
;这是一个全局标志,适用于所有线程。