|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
事务支持
从版本 5.0 开始,新的TransactionHandleMessageAdvice的引入使整个下游流具有事务性,这要归功于HandleMessageAdvice实现。
当常规TransactionInterceptor用于<request-handler-advice-chain>元素(例如,通过配置<tx:advice>),则启动的事务仅适用于内部AbstractReplyProducingMessageHandler.handleRequestMessage(),并且不会传播到下游流。
为了简化 XML 配置,除了<request-handler-advice-chain>一个<transactional>元素已添加到所有<outbound-gateway>和<service-activator>和相关组件。
以下示例显示了<transactional>使用中:
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
Java 配置可以通过使用TransactionInterceptorBuilder,结果的 Bean 名称可以在消息传递注释中使用 adviceChain属性,如下例所示:
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
请注意true参数TransactionInterceptorBuilder构造 函数。
它会导致创建一个TransactionHandleMessageAdvice,不是常规的TransactionInterceptor.
Java DSL 支持Advice通过.transactional()选项,如下例所示:
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}