JPA 支持
JPA 支持
Spring Integration 的 JPA(Java Persistence API)模块提供了用于使用 JPA 执行各种数据库操作的组件。
您需要将以下依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jpa</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-jpa:6.0.9"
JPA API 必须通过某些特定提供商的实现来包含,例如 Hibernate ORM 框架。
以下组件已提供:
这些组件可用于通过向目标数据库发送和接收消息,对它们执行select、create、update和delete操作。
JPA 入站通道适配器允许您使用 JPA 从数据库轮询并检索 (select) 数据,而 JPA 出站通道适配器则允许您创建、更新和删除实体。
您可以使用 JPA 的出站网关将实体持久化到数据库,从而让您继续流程并执行下游的更多组件。 同样,您也可以使用出站网关从数据库中检索实体。
例如,您可以使用出站网关,该网关在其请求通道上接收带有 Message 和 userId 作为有效负载的消息,以查询数据库、检索用户实体,并将其传递给下游进行进一步处理。
认识到这些语义差异,Spring Integration 提供了两个独立的 JPA 出站网关:
-
检索出站网关
-
更新出站网关
功能
所有 JPA 组件均通过以下任一方式执行其相应的 JPA 操作:
-
实体类
-
用于更新、选择和删除的 Java 持久化查询语言 (JPQL)(JPQL 不支持插入)
-
原生查询
-
命名查询
以下各节将更详细地描述这些组件。
Java 实现
提供的每个组件都使用了 o.s.i.jpa.core.JpaExecutor 类,该类反过来又实现了 o.s.i.jpa.core.JpaOperations 接口。
JpaOperations 像一个典型的数据访问对象(DAO),提供了 find、persist、executeUpdate 等方法。
对于大多数用例,默认实现(o.s.i.jpa.core.DefaultJpaOperations)应该已经足够。
然而,如果您需要自定义行为,可以指定您自己的实现。
要初始化 JpaExecutor,您必须使用接受以下之一的构造函数:
-
EntityManagerFactory
-
实体管理器
-
JpaOperations
下面的示例展示了如何使用 entityManagerFactory 初始化 JpaExecutor 并在出站网关中使用它:
@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
executor.setJpaParameters(Collections.singletonList(new JpaParameter("firstName", null, "#this")));
executor.setUsePayloadAsParameterSource(true);
executor.setExpectSingleResult(true);
return executor;
}
@ServiceActivator(inputChannel = "getEntityChannel")
@Bean
public MessageHandler retrievingJpaGateway() {
JpaOutboundGateway gateway = new JpaOutboundGateway(jpaExecutor());
gateway.setGatewayType(OutboundGatewayType.RETRIEVING);
gateway.setOutputChannelName("resultsChannel");
return gateway;
}
命名空间支持
当使用 XML 命名空间支持时,底层的解析器类会为您实例化相关的 Java 类。 因此,您通常无需处理 JPA 适配器的内部工作原理。 本节记录了 Spring Integration 提供的 XML 命名空间支持,并展示了如何使用该命名空间支持来配置 JPA 组件。
通用 XML 命名空间配置属性
某些配置参数被所有 JPA 组件共享:
auto-startup-
指示该组件是否在应用上下文启动期间启动的生命周期属性。 默认为
true。 可选。 id-
标识底层的 Spring bean 定义,它是
EventDrivenConsumer或PollingConsumer的实例。 可选。 entity-manager-factory-
适配器用于创建
EntityManager的JPA实体管理器工厂的引用。 您必须提供此属性、entity-manager属性或jpa-operations属性。 entity-manager-
组件所使用的 JPA 实体管理器的引用。 您必须提供此属性、
entity-manager-factory属性或jpa-operations属性。通常,您的 Spring 应用上下文仅定义一个 JPA 实体管理器工厂,并且 EntityManager是通过使用@PersistenceContext注解注入的。 这种方法不适用于 Spring Integration 的 JPA 组件。 通常,注入 JPA 实体管理器工厂是最佳选择,但是,当您想要显式注入一个EntityManager时,您必须定义一个SharedEntityManagerBean。 更多信息,请参阅相关的 Javadoc。下面的示例展示了如何显式地包含一个实体管理器工厂:
<bean id="entityManager" class="org.springframework.orm.jpa.support.SharedEntityManagerBean"> <property name="entityManagerFactory" ref="entityManagerFactoryBean" /> </bean> jpa-operations-
对实现
JpaOperations接口的 bean 的引用。 在极少数情况下,可能建议您提供您自己的JpaOperations接口实现,而不是依赖默认实现(org.springframework.integration.jpa.core.DefaultJpaOperations)。 如果您使用jpa-operations属性,则不能提供 JPA 实体管理器或 JPA 实体管理器工厂,因为JpaOperations会包装必要的数据源。 entity-class-
实体类的完全限定名。 此属性的确切语义取决于我们是在执行
persist还是update操作,或者是否是从数据库中检索对象。检索数据时,您可以指定
entity-class属性,表示希望从数据库中检索该类型的对象。 在这种情况下,您不得定义任何查询属性(jpa-query、native-query或named-query)。当持久化数据时,
entity-class属性指示要持久化的对象类型。 如果未指定(针对持久化操作),则实体类会自动从消息的负载中检索。 jpa-query-
定义要使用的 JPA 查询(Java 持久化查询语言)。
native-query-
定义要使用的原生 SQL 查询。
named-query-
引用命名查询。 命名查询可以使用原生 SQL 或 JPAQL 定义,但底层的 JPA 持久化提供者会在内部处理这种区别。
提供 JPA 查询参数
要提供参数,您可以使用 parameter XML 元素。
它具有一种机制,允许您为基于 Java Persistence Query Language (JPQL) 或原生 SQL 查询的查询提供参数。
您还可以为命名查询提供参数。
- 基于表达式的参数
-
以下示例展示了如何设置基于表达式的参数:
<int-jpa:parameter expression="payload.name" name="firstName"/> - 基于值的参数
-
以下示例展示了如何设置一个基于值的参数:
<int-jpa:parameter name="name" type="java.lang.String" value="myName"/> - 位置参数
-
以下示例展示了如何设置基于表达式的参数:
<int-jpa:parameter expression="payload.name"/> <int-jpa:parameter type="java.lang.Integer" value="21"/>
事务处理
所有 JPA 操作(例如 INSERT、UPDATE 和 DELETE)在执行时都需要一个活动的事务。
对于入站通道适配器,您无需进行任何特殊配置。
其工作方式与我们为配合其他入站通道适配器使用的轮询器配置事务管理器的方式类似。
以下 XML 示例配置了一个使用轮询器和入站通道适配器的事务管理器:
<int-jpa:inbound-channel-adapter
channel="inboundChannelAdapterOne"
entity-manager="em"
auto-startup="true"
jpa-query="select s from Student s"
expect-single-result="true"
delete-after-poll="true">
<int:poller fixed-rate="2000" >
<int:transactional propagation="REQUIRED"
transaction-manager="transactionManager"/>
</int:poller>
</int-jpa:inbound-channel-adapter>
然而,在使用出站通道适配器或网关时,您可能需要显式地启动一个事务。
如果 DirectChannel 是出站适配器或网关的输入通道,并且当前执行线程中存在活动事务,则 JPA 操作将在相同的事务上下文中执行。
您也可以配置该 JPA 操作作为一个新事务运行,如下例所示:
<int-jpa:outbound-gateway
request-channel="namedQueryRequestChannel"
reply-channel="namedQueryResponseChannel"
named-query="updateStudentByRollNumber"
entity-manager="em"
gateway-type="UPDATING">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
<int-jpa:transactional propagation="REQUIRES_NEW"
transaction-manager="transactionManager"/>
</int-jpa:outbound-gateway>
在上面的示例中,出站网关或适配器的事务元素指定了事务属性。
如果您将 DirectChannel 作为适配器的输入通道,并希望适配器在与调用者相同的事务上下文中执行操作,则定义此子元素是可选的。
然而,如果您使用 ExecutorChannel,则必须拥有 transactional 元素,因为调用方的事务上下文不会被传播。
与在 Spring Integration 命名空间中定义的轮询器的 transactional 元素不同,出站网关或适配器的 transactional 元素是在 JPA 命名空间中定义的。 |
入站通道适配器
入站通道适配器用于使用 JPA QL 执行数据库查询并返回结果。
消息负载要么是单个实体,要么是一组 List 个实体。
以下 XML 配置了一个 inbound-channel-adapter:
<int-jpa:inbound-channel-adapter channel="inboundChannelAdapterOne" (1)
entity-manager="em" (2)
auto-startup="true" (3)
query="select s from Student s" (4)
expect-single-result="true" (5)
max-results="" (6)
max-results-expression="" (7)
delete-after-poll="true" (8)
flush-after-delete="true"> (9)
<int:poller fixed-rate="2000" >
<int:transactional propagation="REQUIRED" transaction-manager="transactionManager"/>
</int:poller>
</int-jpa:inbound-channel-adapter>
| 1 | 在 JPA QL 执行后,inbound-channel-adapter 将消息(包含负载)放入的通道,该通道由 query 属性指定。 |
| 2 | The EntityManager 实例用于执行所需的 JPA 操作。 |
| 3 | 属性,用于指示组件是否应在应用程序上下文启动时自动启动。
该值的默认值为 true。 |
| 4 | 发送为消息负载的 JPA QL |
| 5 | 此属性指示 JPQL 查询是否在结果中返回单个实体或 List 个实体。
如果值设置为 true,则单个实体将作为消息的有效负载发送。
然而,如果将此设置为 true 后返回多个结果,则会抛出 MessagingException。
该值的默认值为 false。 |
| 6 | 此非零、非负整数值指示适配器在执行选择操作时不要选择超过给定数量的行。
默认情况下,如果未设置此属性,查询将选择所有可能的记录。
此属性与 max-results-expression 互斥。
可选。 |
| 7 | 用于在结果集中查找最大结果数量的表达式。
与 max-results 互斥。
可选。 |
| 8 | 如果您希望在查询执行后删除接收到的行,请将此值设置为true。
您必须确保该组件作为事务的一部分运行。
否则,您可能会遇到类似以下的异常:java.lang.IllegalArgumentException: Removing a detached instance … |
| 9 | 如果您希望在删除接收到的实体后立即刷新持久化上下文,并且不希望依赖flushMode的EntityManager,请将此值设置为true。
该值的默认值为false。 |
配置参数参考
以下列表显示了可以为 inbound-channel-adapter 设置的所有值:
<int-jpa:inbound-channel-adapter
auto-startup="true" (1)
channel="" (2)
delete-after-poll="false" (3)
delete-per-row="false" (4)
entity-class="" (5)
entity-manager="" (6)
entity-manager-factory="" (7)
expect-single-result="false" (8)
id=""
jpa-operations="" (9)
jpa-query="" (10)
named-query="" (11)
native-query="" (12)
parameter-source="" (13)
send-timeout=""> (14)
<int:poller ref="myPoller"/>
</int-jpa:inbound-channel-adapter>
| 1 | 此生命周期属性指示该组件是否应在应用程序上下文启动时自动启动。
此属性的默认值为true。
可选。 |
| 2 | 适配器发送带有负载的消息以执行所需 JPA 操作的目标通道。 |
| 3 | 一个布尔标志,用于指示适配器轮询后是否删除选定的记录。
默认值为false(即不删除记录)。
必须确保该组件在事务中运行。
否则,可能会遇到异常,例如:java.lang.IllegalArgumentException: Removing a detached instance …。
可选。 |
| 4 | 一个布尔标志,用于指示记录是否可以批量删除,还是必须逐条删除。
默认情况下,该值为 false(即允许批量删除记录)。
可选参数。 |
| 5 | 要查询的实体类的完全限定名。 适配器将根据实体类名称自动构建 JPA 查询。 可选。 |
| 6 | 一个用于执行 JPA 操作的 jakarta.persistence.EntityManager 实例。
可选。 |
| 7 | 一个用于获取执行 JPA 操作的 jakarta.persistence.EntityManager 实例的 jakarta.persistence.EntityManagerFactory 实例。
可选。 |
| 8 | 一个布尔标志,指示该选择操作是否预期返回单个结果或List个结果。
如果此标志设置为true,则所选的单个实体将作为消息的有效载荷发送。
如果返回多个实体,则将抛出异常。
如果为false,则List个实体将作为消息的有效载荷发送。
该值默认为false。
可选。 |
| 9 | 对 org.springframework.integration.jpa.core.JpaOperations 的实现,用于执行 JPA 操作。
我们建议不要提供您自己的实现,而是使用默认的 org.springframework.integration.jpa.core.DefaultJpaOperations 实现。
您可以使用 entity-manager、entity-manager-factory 或 jpa-operations 中的任意一个属性。
可选的。 |
| 10 | 此适配器要执行的 JPA QL。 可选。 |
| 11 | 此适配器需要执行的命名查询。 可选。 |
| 12 | 此适配器执行的本地查询。
您可以使用任意 jpa-query、named-query、entity-class 或 native-query 属性。
可选。 |
| 13 | 一个用于解析查询中参数值的o.s.i.jpa.support.parametersource.ParameterSource实现。
如果entity-class属性具有值,则会被忽略。
可选。 |
| 14 | 发送消息到通道时等待的最大时间(毫秒)。 可选。 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
jpaExecutor.setJpaQuery("from Student");
return executor;
}
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel",
poller = @Poller(fixedDelay = "${poller.interval}"))
public MessageSource<?> jpaInbound() {
return new JpaPollingChannelAdapter(jpaExecutor());
}
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序示例展示了如何使用 Java DSL 配置入站适配器:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow pollingAdapterFlow() {
return IntegrationFlow
.from(Jpa.inboundAdapter(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
.channel(c -> c.queue("pollingResults"))
.get();
}
}
出站通道适配器
JPA 出站通道适配器允许您通过请求通道接受消息。 负载(payload)可用作要持久化的实体,也可在参数表达式中与标头一起用于 JPQL 查询。 以下章节将介绍执行这些操作的各种可能方式。
使用实体类
以下 XML 配置了出站通道适配器,用于将实体持久化到数据库:
<int-jpa:outbound-channel-adapter channel="entityTypeChannel" (1)
entity-class="org.springframework.integration.jpa.test.entity.Student" (2)
persist-mode="PERSIST" (3)
entity-manager="em"/ > (4)
| 1 | 将有效的 JPA 实体发送到 JPA 出站通道适配器的通道。 |
| 2 | 适配器用于持久化到数据库所接受的实体类的完全限定名。 在大多数情况下,您可以省略此属性,因为适配器可以从 Spring Integration 消息负载中自动确定实体类。 |
| 3 | 适配器要执行的操作。
有效值为 PERSIST、MERGE 和 DELETE。
默认值为 MERGE。 |
| 4 | 要使用的 JPA 实体管理器。 |
这四个属性配置了outbound-channel-adapter以从输入通道接受实体,并将它们处理为来自底层数据源的PERSIST、MERGE或DELETE实体。
自 Spring Integration 3.0 起,类型为 PERSIST 或 MERGE 的有效负载也可以是类型 java.lang.Iterable。
在这种情况下,Iterable 返回的每个对象都被视为实体,并使用底层 EntityManager 进行持久化或合并。
迭代器返回的空值将被忽略。 |
从版本 5.5.4 开始,配置了 PersistMode.DELETE 的 JpaExecutor 可以接受 Iterable 负载,以对提供的实体执行批量持久化删除操作。 |
使用 JPA 查询语言 (JPA QL)
The previous section showed how to perform a PERSIST action by using an entity.
This section shows how to use an outbound channel adapter with JPA QL.
以下 XML 配置了出站通道适配器,用于将实体持久化到数据库:
<int-jpa:outbound-channel-adapter channel="jpaQlChannel" (1)
jpa-query="update Student s set s.firstName = :firstName where s.rollNumber = :rollNumber" (2)
entity-manager="em"> (3)
<int-jpa:parameter name="firstName" expression="payload['firstName']"/> (4)
<int-jpa:parameter name="rollNumber" expression="payload['rollNumber']"/>
</int-jpa:outbound-channel-adapter>
| 1 | 消息发送到的输出通道适配器的输入通道。 |
| 2 | 要执行的 JPA QL。
此查询可能包含参数,这些参数通过使用 parameter 元素进行求值。 |
| 3 | 适配器用于执行 JPA 操作所使用的实体管理器。 |
| 4 | 用于定义 JPA QL(在 query 属性中指定)参数名称的参数值元素(每个参数一个)。 |
The parameter元素接受一个属性,其name对应于提供的JPA QL中指定的命名参数(前一个示例中的第2点)。
该参数的值可以是静态的,也可以通过使用表达式推导得出。
用于推导值的静态值和表达式分别通过value和expression属性指定。
这些属性是互斥的。
如果指定了 value 属性,您可以提供一个可选的 type 属性。
该属性的值是 value 属性所表示值的类的完全限定名。
默认情况下,类型假定为 java.lang.String。
以下示例展示了如何定义 JPA 参数:
<int-jpa:outbound-channel-adapter ...
>
<int-jpa:parameter name="level" value="2" type="java.lang.Integer"/>
<int-jpa:parameter name="name" expression="payload['name']"/>
</int-jpa:outbound-channel-adapter>
正如前面的示例所示,您可以在出站通道适配器元素中使用多个 parameter 元素,并通过表达式定义部分参数,同时使用静态值定义其他参数。
然而,请注意不要多次指定相同的参数名称。
您应为 JPA 查询中指定的每个命名参数提供一个 parameter 元素。
例如,我们指定了两个参数:level 和 name。
level 属性是类型为 java.lang.Integer 的静态值,而 name 属性则源自消息的有效负载(payload)。
虽然指定 select 对于 JPA QL 是有效的,但这样做没有意义。
出站通道适配器不会返回任何结果。
如果您想要选择某些值,请考虑使用出站网关代替。 |
使用原生查询
本节介绍如何使用原生查询配合 JPA 出站通道适配器执行操作。 使用原生查询与使用 JPA QL 类似,不同之处在于查询是原生的数据库查询。 通过使用原生查询,我们将失去使用 JPA QL 时获得的数据库厂商独立性。
通过使用原生查询,我们可以实现数据库插入操作,而这是 JPA QL 无法做到的。 (要执行插入操作,我们将 JPA 实体发送到通道适配器,如前文所述)。 下面是一个小的 XML 片段,演示了如何使用原生查询向表中插入值。
| 命名参数可能不被您的 JPA 提供程序在与原生 SQL 查询结合时支持。 虽然它们在 Hibernate 中工作正常,但 OpenJPA 和 EclipseLink 不支持它们。 请参阅 https://issues.apache.org/jira/browse/OPENJPA-111。 JPA 2.0 规范的 3.8.12 节指出:“只有位置参数绑定和对结果项的位置访问才能在原生查询中可移植地使用。” |
以下示例配置了一个使用原生查询的 outbound-channel-adapter:
<int-jpa:outbound-channel-adapter channel="nativeQlChannel"
native-query="insert into STUDENT_TABLE(FIRST_NAME,LAST_UPDATED) values (:lastName,:lastUpdated)" (1)
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload['updatedLastName']"/>
<int-jpa:parameter name="lastUpdated" expression="new java.util.Date()"/>
</int-jpa:outbound-channel-adapter>
| 1 | 此出站通道适配器执行的本地查询。 |
请注意,其他属性(如 channel 和 entity-manager)以及 parameter 元素具有与 JPA QL 相同的语义。
使用命名查询
使用命名查询类似于使用 JPA QL 或 原生查询,区别在于我们指定的是命名查询而非具体查询。
首先,我们将介绍如何定义 JPA 命名查询。
接着,我们将介绍如何声明一个出站通道适配器以配合命名查询使用。
如果我们有一个名为 Student 的实体,可以在 Student 类上使用注解来定义两个命名查询:selectStudent 和 updateStudent。
以下示例展示了如何实现:
@Entity
@Table(name="Student")
@NamedQueries({
@NamedQuery(name="selectStudent",
query="select s from Student s where s.lastName = 'Last One'"),
@NamedQuery(name="updateStudent",
query="update Student s set s.lastName = :lastName,
lastUpdated = :lastUpdated where s.id in (select max(a.id) from Student a)")
})
public class Student {
...
}
或者,您可以使用 orm.xml 来定义命名查询,如下例所示:
<entity-mappings ...>
...
<named-query name="selectStudent">
<query>select s from Student s where s.lastName = 'Last One'</query>
</named-query>
</entity-mappings>
现在我们已经展示了如何使用注解或使用orm.xml来定义命名查询,接下来我们将展示一个小的 XML 片段,该片段通过命名查询定义了一个outbound-channel-adapter,如下例所示:
<int-jpa:outbound-channel-adapter channel="namedQueryChannel"
named-query="updateStudent" (1)
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload['updatedLastName']"/>
<int-jpa:parameter name="lastUpdated" expression="new java.util.Date()"/>
</int-jpa:outbound-channel-adapter>
| 1 | 当适配器通过通道收到消息时,我们希望它执行的命名查询。 |
配置参数参考
以下代码列出了您可以在出站通道适配器上设置的所有属性:
<int-jpa:outbound-channel-adapter
auto-startup="true" (1)
channel="" (2)
entity-class="" (3)
entity-manager="" (4)
entity-manager-factory="" (5)
id=""
jpa-operations="" (6)
jpa-query="" (7)
named-query="" (8)
native-query="" (9)
order="" (10)
parameter-source-factory="" (11)
persist-mode="MERGE" (12)
flush="true" (13)
flush-size="10" (14)
clear-on-flush="true" (15)
use-payload-as-parameter-source="true" (16)
<int:poller/>
<int-jpa:transactional/> (17)
<int-jpa:parameter/> (18)
</int-jpa:outbound-channel-adapter>
| 1 | 生命周期属性,用于指示该组件是否应在应用上下文启动时启动。
默认值为 true。
可选。 |
| 2 | 发送适配器接收用于执行所需操作的消息的通道。 |
| 3 | JPA 操作的实体类的完全限定名。
entity-class、query和named-query属性是互斥的。
可选。 |
| 4 | 一个用于执行 JPA 操作的 jakarta.persistence.EntityManager 实例。
可选。 |
| 5 | 一个 jakarta.persistence.EntityManagerFactory 的实例用于获取 jakarta.persistence.EntityManager 的实例,后者执行 JPA 操作。
可选。 |
| 6 | 用于执行 JPA 操作的 org.springframework.integration.jpa.core.JpaOperations 实现。
我们建议您不要提供自己的实现,而是使用默认的 org.springframework.integration.jpa.core.DefaultJpaOperations 实现。
您可以使用 entity-manager、entity-manager-factory 或 jpa-operations 属性中的任意一个。
可选。 |
| 7 | 此适配器要执行的 JPA QL。 可选。 |
| 8 | 此适配器需要执行的命名查询。 可选。 |
| 9 | 此适配器执行的本地查询。
您可以使用 jpa-query、named-query 或 native-query 属性中的任意一个。
可选。 |
| 10 | 当注册了多个消费者时,此消费者的顺序用于管理负载均衡和故障转移。
默认为 Ordered.LOWEST_PRECEDENCE。
可选。 |
| 11 | 一个 o.s.i.jpa.support.parametersource.ParameterSourceFactory 的实例用于获取 o.s.i.jpa.support.parametersource.ParameterSource 的实例,该实例用于解析查询中的参数值。
如果使用 JPA 实体执行操作,则忽略此配置。
parameter 子元素与 parameter-source-factory 属性互斥,必须在提供的 ParameterSourceFactory 上进行配置。
可选。 |
| 12 | 接受以下之一:PERSIST、MERGE或DELETE。
表示适配器需要执行的操作。
仅在使用实体进行 JPA 操作时相关。
如果提供了 JPA QL、命名查询或原生查询,则此选项将被忽略。
默认值为MERGE。
可选参数。
自 Spring Integration 3.0 起,用于持久化或合并的负载也可以是java.lang.Iterable类型。
在这种情况下,Iterable返回的每个对象都被视为实体,并使用底层的EntityManager进行持久化或合并。
迭代器返回的 null 值将被忽略。 |
| 13 | 如果您希望在持久化、合并或删除操作后立即刷新持久化上下文,并且不希望依赖flushMode的EntityManager,请将此值设置为true。
它默认为false。
仅当您未指定flush-size属性时适用。
如果将此属性设置为true,且未配置其他值,则flush-size将隐式设置为1。 |
| 14 | 如果您希望在持久化、合并或删除操作后立即刷新持久化上下文,并且不希望依赖flushMode的EntityManager,请将此属性设置为大于'0'的值。
默认值设置为0,这意味着"不刷新"。
此属性针对具有Iterable负载的消息。
例如,如果将flush-size设置为3,那么每处理第三个实体后就会调用entityManager.flush()。
此外,在循环结束后还会再次调用entityManager.flush()。
如果已指定'flush-size'属性且其值大于'0',则无需配置flush属性。 |
| 15 | 如果您希望在每次刷新操作后立即清除持久化上下文,请将此值设置为'true'。
该属性的值仅在flush属性设置为true或flush-size属性设置为大于0的值时才会生效。 |
| 16 | 如果设置为true,则消息的有效负载用作参数的来源。
然而,如果设置为false,整个Message将作为参数的来源可用。
可选。 |
| 17 | 定义事务管理属性以及JPA适配器要使用的事务管理器引用。 可选。 |
| 18 | 一个或多个 parameter 属性——每个在查询中使用的参数对应一个。
该值或表达式被求值以计算参数的值。
可选。 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置出站适配器:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
@IntegrationComponentScan
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@MessagingGateway
interface JpaGateway {
@Gateway(requestChannel = "jpaPersistChannel")
@Transactional
void persistStudent(StudentDomain payload);
}
@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
jpaExecutor.setEntityClass(StudentDomain.class);
jpaExecutor.setPersistMode(PersistMode.PERSIST);
return executor;
}
@Bean
@ServiceActivator(channel = "jpaPersistChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter = new JpaOutboundGateway(jpaExecutor());
adapter.setProducesReply(false);
return adapter;
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow outboundAdapterFlow() {
return f -> f
.handle(Jpa.outboundAdapter(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.persistMode(PersistMode.PERSIST),
e -> e.transactional());
}
}
出站网关
JPA 入站通道适配器允许您轮询数据库以检索一个或多个 JPA 实体。 检索到的数据随后用于启动 Spring Integration 流程,该流程将检索到的数据作为消息负载使用。
此外,您可以在流程末尾使用 JPA 出站通道适配器来持久化数据,本质上是在持久化操作结束时停止流程。
然而,您如何在流程中间执行 JPA 持久化操作呢?例如,您可能正在 Spring Integration 消息流中处理业务数据并希望将其持久化,但同时仍需要在下游使用其他组件。 或者,您不需要使用轮询器轮询数据库,而是需要执行 JPQL 查询并主动检索数据,然后在流程中的后续组件中处理这些数据。
这正是 JPA 出站网关发挥作用的地方。 它们使您能够持久化数据以及检索数据。 为了促进这些用途,Spring Integration 提供了两种类型的 JPA 出站网关:
-
更新出站网关
-
检索出站网关
每当使用出站网关执行保存、更新或删除数据库中某些记录的操作时,您需要使用一个更新型出站网关。
例如,如果您使用 entity 来持久化它,则返回一个合并并持久化的实体作为结果。
在其他情况下,则返回受影响的记录数(已更新或删除的记录数)。
当从数据库检索(选择)数据时,我们使用检索出站网关。 通过检索出站网关,我们可以使用 JPQL、命名查询(原生或基于 JPQL)或原生查询(SQL)来选择数据并检索结果。
更新型出站网关在功能上类似于出站通道适配器,不同之处在于,更新型出站网关在执行 JPA 操作后会将结果发送到网关的回复通道。
检索出站网关类似于入站通道适配器。
这种相似性是主要因素,促使我们使用中央 JpaExecutor 类来尽可能统一通用功能。
所有 JPA 出站网关的通用内容,类似于 outbound-channel-adapter,我们可以使用它来执行各种 JPA 操作:
-
实体类
-
JPA 查询语言 (JPQL)
-
原生查询
-
命名查询
有关配置示例,请参阅 JPA 出站网关示例。
通用配置参数
JPA 出站网关始终可以访问 Spring Integration Message 作为输入。
因此,以下参数可用:
parameter-source-factory-
一个
o.s.i.jpa.support.parametersource.ParameterSourceFactory的实例用于获取o.s.i.jpa.support.parametersource.ParameterSource的实例。ParameterSource用于解析查询中提供的参数的值。 如果您使用 JPA 实体执行操作,则忽略parameter-source-factory属性。parameter子元素与parameter-source-factory互斥,并且必须在提供的ParameterSourceFactory上进行配置。 可选。 use-payload-as-parameter-source-
如果设置为
true,则使用Message的有效负载作为参数的来源。 如果设置为false,则整个Message都可用作为参数的来源。 如果没有传入 JPA 参数,此属性默认为true。 这意味着,如果您使用了默认的BeanPropertyParameterSourceFactory,则有效负载的 Bean 属性将用作 JPA 查询的参数值来源。 然而,如果传入了 JPA 参数,此属性默认评估为false。 原因是 JPA 参数允许您提供 SpEL 表达式。 因此,能够访问整个Message(包括请求头)非常有益。 可选。
更新出站网关
以下清单显示了您可以在 updating-outbound-gateway 上设置的所有属性,并描述了关键属性:
<int-jpa:updating-outbound-gateway request-channel="" (1)
auto-startup="true"
entity-class=""
entity-manager=""
entity-manager-factory=""
id=""
jpa-operations=""
jpa-query=""
named-query=""
native-query=""
order=""
parameter-source-factory=""
persist-mode="MERGE"
reply-channel="" (2)
reply-timeout="" (3)
use-payload-as-parameter-source="true">
<int:poller/>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:updating-outbound-gateway>
| 1 | 接收出站网关消息以执行所需操作的通道。
此属性类似于 channel 的 outbound-channel-adapter 属性。
可选。 |
| 2 | 执行所需的 JPA 操作后,网关发送响应到的通道。
如果未定义此属性,则请求消息必须包含 replyChannel 头信息。
可选。 |
| 3 | 指定网关等待将结果发送到回复通道的时间。
仅当回复通道本身可能阻塞发送操作时适用(例如,当前已满的有界QueueChannel)。
默认情况下,网关会无限期等待。
该值以毫秒为单位指定。
可选。 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置出站适配器:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
@IntegrationComponentScan
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@MessagingGateway
interface JpaGateway {
@Gateway(requestChannel = "jpaUpdateChannel")
@Transactional
void updateStudent(StudentDomain payload);
}
@Bean
@ServiceActivator(channel = "jpaUpdateChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter =
new JpaOutboundGateway(new JpaExecutor(this.entityManagerFactory));
adapter.setOutputChannelName("updateResults");
return adapter;
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("updateResults"));
}
}
检索出站网关
以下示例演示如何配置检索出站网关:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow retrievingGatewayFlow() {
return f -> f
.handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
.channel(c -> c.queue("retrieveResults"));
}
}
@Bean
fun retrievingGatewayFlow() =
integrationFlow {
handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
channel { queue("retrieveResults") }
}
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
jpaExecutor.setJpaQuery("from Student s where s.id = :id");
executor.setJpaParameters(Collections.singletonList(new JpaParameter("id", null, "payload")));
jpaExecutor.setExpectSingleResult(true);
return executor;
}
@Bean
@ServiceActivator(channel = "jpaRetrievingChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter = new JpaOutboundGateway(jpaExecutor());
adapter.setOutputChannelName("retrieveResults");
adapter.setGatewayType(OutboundGatewayType.RETRIEVING);
return adapter;
}
}
<int-jpa:retrieving-outbound-gateway request-channel=""
auto-startup="true"
delete-after-poll="false"
delete-in-batch="false"
entity-class=""
id-expression="" (1)
entity-manager=""
entity-manager-factory=""
expect-single-result="false" (2)
id=""
jpa-operations=""
jpa-query=""
max-results="" (3)
max-results-expression="" (4)
first-result="" (5)
first-result-expression="" (6)
named-query=""
native-query=""
order=""
parameter-source-factory=""
reply-channel=""
reply-timeout=""
use-payload-as-parameter-source="true">
<int:poller></int:poller>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:retrieving-outbound-gateway>
| 1 | (自 Spring Integration 4.0 起) 用于确定 primaryKey 方法相对于作为评估上下文根对象的 requestMessage 的 EntityManager.find(Class entityClass, Object primaryKey) 值的 SpEL 表达式。
如果存在,entityClass 参数从 entity-class 属性中确定。
否则,它从 payload 类中确定。
如果您使用 id-expression,则不允许使用其他所有属性。
可选。 |
| 2 | 一个布尔标志,指示选择操作是否预期返回单个结果或 List 个结果。
如果将此标志设置为 true,则单个实体将作为消息的有效载荷发送。
如果返回多个实体,则会抛出异常。
如果为 false,则实体的 List 将作为消息的有效载荷发送。
默认值为 false。
可选。 |
| 3 | 此非零且非负的整数值告知适配器在执行选择操作时,不要选择超过指定数量的行。
默认情况下,如果未设置此属性,则给定查询将选择所有可能的记录。
此属性与 max-results-expression 互斥。
可选。 |
| 4 | 一个可用于查找结果集中最大结果数的表达式。
它与 max-results 互斥。
可选。 |
| 5 | 此非零且非负的整数值告知适配器从哪一条记录开始检索结果。
此属性与first-result-expression互斥。
版本3.0引入了此属性。
可选。 |
| 6 | 该表达式针对消息进行求值,以在结果集中找到第一条记录的位置。
此属性与 first-result 互斥。
版本 3.0 引入了此属性。
可选。 |
|
当您选择检索时删除实体,并且已检索到实体集合时,默认情况下,实体将逐个进行删除。 这可能导致性能问题。 或者,您可以将属性 JSR 317: Java™ Persistence 2.0 在第 4.10 章“批量更新和删除操作”中指出: “删除操作仅适用于指定类及其子类的实体。 它不会级联到相关实体。” 有关更多信息,请参见 JSR 317: Java™ Persistence 2.0 |
从版本 6.0 开始,当查询未返回任何实体时,Jpa.retrievingGateway() 将返回空列表结果。
此前,会根据 requiresReply 的配置,返回 null 以终止流程,或抛出异常。
或者,若要恢复之前的行为,可在网关后添加一个 filter 以过滤掉空列表。
在空列表处理属于下游逻辑一部分的应用程序中,这需要额外的配置。
有关可能的空列表处理选项,请参阅 Splitter Discard Channel。 |
JPA 出站网关示例
本节包含使用更新出站网关和检索出站网关的各种示例:
使用实体类进行更新
在以下示例中,更新出站网关使用 org.springframework.integration.jpa.test.entity.Student 实体类作为 JPA 定义参数进行持久化:
<int-jpa:updating-outbound-gateway request-channel="entityRequestChannel" (1)
reply-channel="entityResponseChannel" (2)
entity-class="org.springframework.integration.jpa.test.entity.Student"
entity-manager="em"/>
| 1 | 这是出站网关的请求通道。
它类似于 channel 的 outbound-channel-adapter 属性。 |
| 2 | 这是网关与出站适配器不同的地方。
这是接收 JPA 操作回复的通道。
然而,如果您不关心收到的回复,而只想执行该操作,使用 JPA outbound-channel-adapter 是合适的选择。
在本示例中,我们使用了实体类,回复即为作为 JPA 操作结果创建或合并的实体对象。 |
使用 JPQL 进行更新
以下示例使用 Java Persistence Query Language (JPQL) 更新实体, 这需要配置一个用于更新的出站网关:
<int-jpa:updating-outbound-gateway request-channel="jpaqlRequestChannel"
reply-channel="jpaqlResponseChannel"
jpa-query="update Student s set s.lastName = :lastName where s.rollNumber = :rollNumber" (1)
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:updating-outbound-gateway>
| 1 | 网关执行的 JPQL 查询。
由于我们使用了更新出站网关,只有 update 和 delete 的 JPQL 查询才是合理的选择。 |
当您发送一个包含 String 负载的消息,且该消息还包含一个名为 rollNumber 的头部(其值为 long)时,指定学号的学生姓氏将被更新为消息负载中的值。
在使用更新网关时,返回值始终为一个整数,表示执行 JPA QL 所影响的记录数。
使用 JPQL 检索实体
以下示例使用检索出站网关和 JPQL 从数据库中检索(选择)一个或多个实体:
<int-jpa:retrieving-outbound-gateway request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
jpa-query="select s from Student s where s.firstName = :firstName and s.lastName = :lastName"
entity-manager="em">
<int-jpa:parameter name="firstName" expression="payload"/>
<int-jpa:parameter name="lastName" expression="headers['lastName']"/>
</int-jpa:outbound-gateway>
使用实体检索id-expression
以下示例使用带有 id-expression 的检索出站网关从数据库中检索(查找)且仅检索一个实体:
primaryKey 是 id-expression 求值的结果。
entityClass 是消息类 payload。
<int-jpa:retrieving-outbound-gateway
request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
id-expression="payload.id"
entity-manager="em"/>
使用命名查询进行更新
使用命名查询基本上与直接使用 JPQL 查询相同。
区别在于使用了 named-query 属性,如下例所示:
<int-jpa:updating-outbound-gateway request-channel="namedQueryRequestChannel"
reply-channel="namedQueryResponseChannel"
named-query="updateStudentByRollNumber"
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:outbound-gateway>
| 您可以在此处找到使用 Spring Integration JPA 适配器的完整示例应用程序 here。 |