此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
交易支持
了解消息流中的事务
Spring Integration 公开了多个钩子来满足消息流的事务需求。 为了更好地理解这些钩子以及如何从中受益,我们必须首先重新审视可用于启动消息流的六种机制,并了解如何在每种机制中满足这些流的事务需求。
以下六种机制启动消息流(本手册中提供了每种机制的详细信息):
-
网关代理:基本消息传递网关。
-
消息渠道:直接互动
MessageChannel
方法(例如,channel.send(message)
). -
消息发布者:作为 Spring Bean 上方法调用的副产品启动消息流的方式。
-
入站通道适配器和网关:基于将第三方系统与 Spring Integration 消息传递系统连接而发起消息流的方式(例如
[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel
). -
调度程序:根据预配置的调度程序分发的调度事件启动消息流的方法。
-
轮询器:与调度器类似,这是根据预配置轮询器分发的调度或基于间隔的事件启动消息流的方法。
我们可以将这六种机制分为两大类:
-
由用户进程启动的消息流:此类别中的示例场景是调用网关方法或显式发送
Message
设置为MessageChannel
. 换句话说,这些消息流依赖于要启动的第三方进程(例如您编写的某些代码)。 -
由守护进程启动的消息流:此类别中的示例场景包括轮询程序轮询消息队列以使用轮询消息启动新消息流,或者调度程序通过创建新消息并在预定义的时间启动消息流来调度进程。
显然是网关代理,MessageChannel.send(…)
和MessagePublisher
都属于第一类,入站适配器和网关、调度程序和轮询器属于第二类。
那么,如何满足每个类别中各种场景中的事务需求,Spring Integration 是否需要为特定场景的事务提供明确的内容? 或者你可以使用 Spring 的事务支持吗?
Spring 本身为事务管理提供了一流的支持。 因此,我们在这里的目标不是提供新的东西,而是使用 Spring 从其现有的事务支持中受益。 换句话说,作为一个框架,我们必须向 Spring 的事务管理功能公开钩子。 但是,由于 Spring Integration 配置是基于 Spring 配置的,因此我们不必总是公开这些钩子,因为 Spring 已经公开了它们。 毕竟,每个 Spring Integration 组件都是一个 Spring Bean。
考虑到这个目标,我们可以再次考虑两种场景:由用户进程启动的消息流和由守护进程启动的消息流。
由用户进程启动并在 Spring 应用程序上下文中配置的消息流受此类进程的通常事务配置的约束。
因此,Spring Integration不需要显式配置它们来支持事务。
交易可以而且应该通过 Spring 的标准事务支持来启动。
Spring Integration 消息流自然会遵循组件的事务语义,因为它本身是由 Spring 配置的。
例如,网关或服务激活器方法可以使用@Transactional
或TransactionInterceptor
可以在 XML 配置中定义,并使用指向应该是事务性的特定方法的切入点表达式。
最重要的是,在这些方案中,您可以完全控制事务配置和边界。
但是,当涉及到由守护进程启动的消息流时,情况会有所不同。 尽管由开发人员配置,但这些流并不直接涉及人工或其他要启动的过程。 这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置启动。 例如,我们可以让调度程序在每周五晚上启动消息流。 我们还可以配置一个触发器,每秒启动一次消息流,依此类推。 因此,我们需要一种方法让这些基于触发器的进程知道我们的意图,即使生成的消息流成为事务性的,以便在启动新消息流时可以创建事务上下文。 换句话说,我们需要公开一些事务配置,但只能委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。
轮询器事务支持
Spring Integration 为轮询器提供事务支持。
轮询器是一种特殊类型的组件,因为在轮询器任务中,我们可以调用receive()
针对本身是事务性的资源,因此包括receive()
调用事务的边界,以便在任务失败时回滚事务。
如果我们要为通道添加相同的支持,则添加的事务将影响所有从send()
叫。
这为事务划分提供了相当广泛的范围,没有任何强有力的理由,尤其是当 Spring 已经提供了多种方法来满足下游任何组件的事务需求时。
但是,receive()
方法包含在事务边界中是轮询器的“强有力理由”。
每当您配置轮询器时,您都可以使用transactional
child 元素及其属性,如以下示例所示:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
前面的配置看起来类似于本机 Spring 事务配置。
您仍然必须提供对事务管理器的引用,并指定事务属性或依赖默认值(例如,如果未指定“transaction-manager”属性,则默认为名为“transactionManager”的 bean)。
在内部,该进程被包装在 Spring 的原生事务中,其中TransactionInterceptor
负责处理事务。
有关如何配置事务管理器,事务管理器类型(如JTA,Datasource等)以及与事务配置相关的其他详细信息的更多信息,请参阅Spring Framework参考指南。
使用上述配置,此轮询器启动的所有消息流都是事务性的。 有关轮询程序事务配置的更多信息和详细信息,请参阅轮询和事务。
除了事务之外,在运行轮询器时,您可能还需要解决更多跨领域问题。
为了帮助解决这个问题,poller 元素接受<advice-chain>
child 元素,它允许您定义要应用于轮询器的通知实例的自定义链。
(有关更多详细信息,请参阅可轮询消息源。
在 Spring Integration 2.0 中,Poller 经历了重构工作,现在使用代理机制来解决事务问题以及其他跨领域问题。
从这项工作中演变的重大变化之一是,我们使<transactional>
和<advice-chain>
元素是互斥的。
这背后的基本原理是,如果您需要多个建议,其中一个是交易建议,您可以将其包含在<advice-chain>
与以前一样方便,但控制力更强,因为您现在可以选择按所需顺序放置建议。
以下示例显示了如何执行此作:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
前面的示例显示了基于 XML 的基本 Spring Transaction 通知 (txAdvice
) 并将其包含在<advice-chain>
由轮询器定义。
如果您只需要解决轮询器的事务问题,您仍然可以使用<transactional>
元素作为方便。
交易边界
另一个重要因素是消息流中事务的边界。 启动事务时,事务上下文绑定到当前线程。 因此,无论消息流中有多少个端点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。 一旦你通过引入可轮询通道或执行器通道来破坏它,或者在某些服务中手动启动一个新线程,事务边界也会被打破。 从本质上讲,事务将在那里结束,如果线程之间发生了成功的切换,则流将被视为成功,并且即使流将继续并且仍可能导致下游某处出现异常,也会发送 COMMIT 信号。 如果这样的流是同步的,那么该异常可能会被抛回消息流的发起者,而消息流的发起者也是事务上下文的发起者,并且该事务将导致 ROLLBACK。 中间立场是在线程边界被破坏的任何点使用事务通道。 例如,您可以使用委托给事务性 MessageStore 策略的队列支持的通道,也可以使用 JMS 支持的通道。
事务同步
在某些环境中,它有助于将作与包含整个流的事务同步。
例如,考虑一个<file:inbound-channel-adapter/>
在执行多个数据库更新的流开始时。
如果事务提交,我们可能希望将文件移动到success
目录,而我们可能希望将其移动到failure
如果事务回滚,则目录。
Spring Integration 2.2 引入了将这些作与事务同步的功能。
此外,您可以配置PseudoTransactionManager
如果您没有“真实”事务,但仍想在成功或失败时执行不同的作。
有关详细信息,请参阅伪事务。
以下列表显示了此功能的关键策略接口:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
工厂负责创建一个TransactionSynchronization
对象。
您可以实现自己的框架或使用框架提供的框架:DefaultTransactionSynchronizationFactory
.
此实现返回一个TransactionSynchronization
委托给默认实现的TransactionSynchronizationProcessor
:ExpressionEvaluatingTransactionSynchronizationProcessor
.
该处理器支持三种 SpEL 表达式:beforeCommitExpression
,afterCommitExpression
和afterRollbackExpression
.
这些行动对于熟悉交易的人来说应该是不言自明的。
在每种情况下,#root
变量是原始的Message
.
在某些情况下,其他 SpEL 变量可用,具体取决于MessageSource
由轮询者进行轮询。
例如,MongoDbMessageSource
提供#mongoTemplate
变量,它引用消息源的MongoTemplate
.
同样,RedisStoreMessageSource
提供#store
变量,它引用了RedisStore
由民意调查创建。
要为特定轮询器启用该功能,您可以提供对TransactionSynchronizationFactory
在轮询器的<transactional/>
元素使用synchronization-factory
属性。
从 5.0 版开始,Spring Integration 提供了PassThroughTransactionSynchronizationFactory
,默认情况下,当没有TransactionSynchronizationFactory
已配置,但类型为TransactionInterceptor
存在于建议链中。
使用任何开箱即用的TransactionSynchronizationFactory
实施时,轮询端点会将轮询的消息绑定到当前事务上下文,并将其作为failedMessage
在MessagingException
如果在事务通知之后抛出异常。
使用未实现的自定义事务通知时TransactionInterceptor
,您可以显式配置PassThroughTransactionSynchronizationFactory
以实现此行为。
无论哪种情况,都将MessagingException
成为ErrorMessage
发送到errorChannel
,原因是通知抛出的原始异常。
以前,ErrorMessage
的有效负载是建议引发的原始异常,并且没有提供对failedMessage
信息,因此很难确定事务提交问题的原因。
为了简化这些组件的配置,Spring Integration为默认工厂提供了命名空间支持。 以下示例演示如何使用命名空间配置文件入站通道适配器:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 评估的结果作为有效负载发送到committedChannel
或rolledBackChannel
(在本例中,这将是Boolean.TRUE
或Boolean.FALSE
— 的结果java.io.File.renameTo()
方法调用)。
如果您希望发送整个有效负载以进行进一步的 Spring Integration 处理,请使用 'payload' 表达式。
重要的是要了解这会将作与事务同步。 它不会使本质上不是事务性的资源实际上是事务性的。 相反,事务(无论是 JDBC 还是其他)在轮询之前启动,并在流完成时提交或回滚,然后是同步作。 如果您提供自定义 |
除了after-commit
和after-rollback
表达 式before-commit
也受支持。
在这种情况下,如果评估(或下游处理)抛出异常,则事务将回滚而不是提交。
伪事务
在阅读了 Transaction Synchronization 部分之后,您可能认为在流完成时执行这些“成功”或“失败”作会很有用,即使轮询器下游没有“真正的”事务资源(例如 JDBC)。 例如,考虑“<file:inbound-channel-adapter/>”后跟“<ftp:outbout-channel-adapter/>”。 这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。
为了提供此功能,框架提供了一个PseudoTransactionManager
,即使不涉及实际事务资源,也可以启用上述配置。
如果流程正常完成,则beforeCommit
和afterCommit
同步被调用。
失败时,afterRollback
同步被调用。
因为它不是真正的事务,所以不会发生实际的提交或回滚。
伪事务是用于启用同步功能的工具。
要使用PseudoTransactionManager
,您可以将其定义为 <bean/>,就像配置真正的事务管理器一样。
以下示例显示了如何执行此作:
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
响应式事务
从 5.3 版开始,一个ReactiveTransactionManager
也可以与TransactionInterceptor
返回响应式类型的端点的建议。
这包括MessageSource
和ReactiveMessageHandler
实现(例如ReactiveMongoDbMessageSource
),它会生成一条带有Flux
或Mono
有效载荷。
所有其他回复生成消息处理程序的实现都可以依赖于ReactiveTransactionManager
当他们的回复有效负载也是某种反应型时。