JDBC 支持
JDBC 支持
Spring Integration 提供了通道适配器,用于通过数据库查询接收和发送消息。 通过这些适配器,Spring Integration 不仅支持普通的 JDBC SQL 查询,还支持存储过程和存储函数的调用。
您需要将以下依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-jdbc:6.1.9"
默认情况下,以下 JDBC 组件可用:
Spring Integration JDBC 模块还提供了 JDBC 消息存储。
入站通道适配器
入站通道适配器的主要功能是执行 SQL SELECT 查询,并将结果集转换为消息。
消息负载是整个结果集(表示为 List),列表中项的类型取决于行映射策略。
默认策略是一个通用映射器,它会为查询结果中的每一行返回一个 Map。
可选地,您可以通过添加对 RowMapper 实例的引用来更改此行为(有关行映射的更详细信息,请参阅 Spring JDBC 文档)。
如果您希望将 SELECT 查询结果中的行转换为单独的消息,可以使用下游拆分器。 |
入站适配器还需要对 JdbcTemplate 实例或 DataSource 的引用。
除了用于生成消息的SELECT语句外,适配器还有一个UPDATE语句,用于将记录标记为已处理,使其不会在下一次轮询中显示。
该更新可以通过原始 select 查询返回的 ID 列表进行参数化。
默认情况下,这是通过命名约定完成的(输入结果集中名为id的列被转换为更新参数映射中的名为id的列表)。
以下示例定义了一个具有更新查询和DataSource引用的入站通道适配器。
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
更新查询中的参数通过冒号(:)前缀指定在参数名称之前(在前面的示例中,这是一个应用于轮询结果集中每一行的表达式)。
这是 Spring JDBC 中命名参数 JDBC 支持的标准化功能,并结合了 Spring Integration 中采用的约定(对轮询结果列表进行投影)。
底层 Spring JDBC 功能限制了可用表达式的范围(例如,除点号外的绝大多数特殊字符均不被允许),但由于目标通常是可通过 Bean 路径寻址的对象列表(可能是一个包含单个对象的列表),因此这种限制并不过分严格。 |
要更改参数生成策略,您可以向适配器注入SqlParameterSourceFactory以覆盖默认行为(该适配器具有sql-parameter-source-factory属性)。
Spring Integration 提供了ExpressionEvaluatingSqlParameterSourceFactory,它创建一个基于 SpEL 的参数源,并将查询结果作为#root对象返回。
(如果update-per-row为 true,则根对象是行)。
如果更新查询中多次出现相同的参数名称,则仅计算一次,并缓存其结果。
您也可以为 select 查询使用参数源。 在这种情况下,由于没有“result”对象可供评估,因此每次使用单个参数源(而不是使用参数源工厂)。 从版本 4.0 开始,您可以使用 Spring 创建基于 SpEL 的参数源,如下例所示:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
每个参数表达式中的value可以是任何有效的 SpEL 表达式。
表达式求值的#root对象是定义在parameterSource Bean 上的构造函数参数。
它对于所有求值都是静态的(在前面的示例中,是一个空的String)。
从 5.0 版本开始,您可以使用 ExpressionEvaluatingSqlParameterSourceFactory 和 sqlParameterTypes 来指定特定参数的目标 SQL 类型。
以下示例提供了查询中使用的参数的 SQL 类型:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
使用 createParameterSourceNoCache 工厂方法。
否则,参数源将缓存求值结果。
另外请注意,由于缓存已禁用,如果相同的参数名在 select 查询中出现多次,则每次出现时都会重新求值。 |
轮询与事务
入站适配器接受一个常规的 Spring Integration 轮询器作为子元素。 因此,轮询的频率可以被控制(以及其他用途)。 对于 JDBC 使用,轮询器的一个重要特性是能够将轮询操作包装在事务中,如下例所示:
<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
| 如果您未显式指定轮询器,则将使用默认值。 与 Spring Integration 的常规做法一致,它可以定义为一个顶层 Bean)。 |
在前面的示例中,数据库每 1000 毫秒(即每秒)轮询一次,且更新和查询操作均在同一个事务中执行。 事务管理器的配置未显示。 但只要其感知到数据源,轮询操作就是事务性的。 一种常见的使用场景是下游通道为直接通道(默认),以便端点在同一线程中被调用,从而处于同一事务中。 这样,如果其中任何一个失败,事务将回滚,输入数据也会恢复到原始状态。
max-rows对比max-messages-per-poll
JDBC 入站通道适配器定义了一个名为 max-rows 的属性。
当您指定适配器的轮询器时,还可以定义一个名为 max-messages-per-poll 的属性。
虽然这两个属性看起来相似,但它们的含义却大不相同。
max-messages-per-poll 指定每个轮询间隔内查询执行的次数,而 max-rows 指定每次执行返回的行数。
在正常情况下,当您使用 JDBC 入站通道适配器时,通常不希望将轮询器的 max-messages-per-poll 属性设置为该值。
其默认值为 1,这意味着对于每个轮询间隔,JDBC 入站通道适配器的 receive() 方法将恰好执行一次。
将 max-messages-per-poll 属性设置为更大的值意味着查询将连续执行该次数。
有关 max-messages-per-poll 属性的更多信息,请参阅 配置入站通道适配器。
相比之下,max-rows属性如果大于0,则指定从由receive()方法创建的查询结果集中使用的最大行数。
如果该属性设置为0,则所有行都包含在结果消息中。
该属性的默认值为0。
建议通过特定于数据库提供商的查询选项来限制结果集,例如 MySQL LIMIT、SQL Server TOP 或 Oracle 的 ROWNUM。
有关更多信息,请参阅相关提供商的文档。 |
出站通道适配器
出站通道适配器是入站适配器的逆操作:其作用是处理消息并利用该消息执行 SQL 查询。 默认情况下,消息负载和请求头可作为输入参数提供给查询,如下例所示:
<int-jdbc:outbound-channel-adapter
query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
data-source="dataSource"
channel="input"/>
在前面的示例中,到达标记为input的通道的消息具有一个以something为键的映射作为负载,因此[]操作符从该映射中解引用该值。
头信息也作为映射进行访问。
前置查询中的参数是传入消息上的 Bean 属性表达式(而非 SpEL 表达式)。
此行为属于 SqlParameterSource,它是由出站适配器创建的默认源。
您可以注入不同的 SqlParameterSourceFactory 以获得不同的行为。 |
出站适配器需要引用 DataSource 或 JdbcTemplate。
您也可以注入 SqlParameterSourceFactory 来控制每个传入消息到查询的绑定。
如果输入通道是直连通道,则出站适配器将在与消息发送者相同的线程中运行其查询,因此也在相同的事务中(如果存在事务)。
使用 SpEL 表达式传递参数
大多数 JDBC 通道适配器的一个常见需求是将参数作为 SQL 查询、存储过程或函数的一部分传递。
如前所述,这些参数默认是 Bean 属性表达式,而不是 SpEL 表达式。
但是,如果您需要传递 SpEL 表达式作为参数,则必须显式注入 SqlParameterSourceFactory。
以下示例使用 ExpressionEvaluatingSqlParameterSourceFactory 来实现该需求:
<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)"
sql-parameter-source-factory="spelSource"/>
<bean id="spelSource"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="headers['id'].toString()"/>
<entry key="createdDate" value="new java.util.Date()"/>
<entry key="payload" value="payload"/>
</map>
</property>
</bean>
有关更多信息,请参阅定义参数源。
使用PreparedStatement回调
有时,SqlParameterSourceFactory的灵活性和松耦合特性无法满足目标PreparedStatement的需求,或者我们需要执行一些底层的JDBC操作。
Spring JDBC模块提供了用于配置执行环境(例如ConnectionCallback或PreparedStatementCreator)以及操纵参数值(例如SqlParameterSource)的API。
它甚至能够访问用于底层操作的API,例如StatementCallback。
从 Spring Integration 4.2 开始,MessagePreparedStatementSetter允许在PreparedStatement上手动指定参数,位于requestMessage上下文中。
该类在标准 Spring JDBC API 中扮演的角色与PreparedStatementSetter完全相同。
实际上,当JdbcMessageHandler在JdbcTemplate上调用execute时,它会直接从内联的PreparedStatementSetter实现中被直接调用。
此函数式接口选项与 sqlParameterSourceFactory 互斥,可用于作为更强大的替代方案,从 requestMessage 中填充 PreparedStatement 的参数。
例如,当我们需要将 File 数据以流式方式存储到数据库的 BLOB 列时,它非常有用。
以下示例展示了如何实现:
@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
"INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
ps.setBlob(2, inputStream);
}
catch (Exception e) {
throw new MessageHandlingException(m, e);
}
ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
});
return jdbcMessageHandler;
}
从 XML 配置的角度来看,prepared-statement-setter属性在<int-jdbc:outbound-channel-adapter>组件上是可用的。
它允许您指定一个MessagePreparedStatementSetterbean 引用。
批量更新
从 5.1 版本开始,如果请求消息的负载是 Iterable 类型的实例,则 JdbcMessageHandler 将执行 JdbcOperations.batchUpdate()。
Iterable 中的每个元素都会被包装为 Message,并附带来自请求消息头的信息,前提是该元素尚未是 Message 类型。
在基于常规 SqlParameterSourceFactory 的配置情况下,这些消息用于为所提及的 JdbcOperations.batchUpdate() 函数中的参数构建一个 SqlParameterSource[]。
当应用 MessagePreparedStatementSetter 配置时,会使用 BatchPreparedStatementSetter 变体来遍历这些消息以处理每个项目,并对它们调用提供的 MessagePreparedStatementSetter。
当选择 keysGenerated 模式时,不支持批量更新。
出站网关
出站网关类似于出站和入站适配器的组合:它的角色是处理消息,并使用该消息执行 SQL 查询,然后通过向回复通道发送结果来响应。 默认情况下,消息负载和头部可作为查询的输入参数,如下例所示:
<int-jdbc:outbound-gateway
update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource" />
上述示例的结果是将一条记录插入到 mythings 表中,并向输出通道返回一条指示受影响的行数的消息(负载是一个映射:{UPDATED=1})。
如果更新查询是一个带有自动生成键的插入操作,您可以通过在 preceding example 中添加 keys-generated="true" 来将生成的键填充到回复消息中(这不是默认行为,因为某些数据库平台不支持此功能)。
以下示例展示了更改后的配置:
<int-jdbc:outbound-gateway
update="insert into mythings (status, name) values (0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource"
keys-generated="true"/>
除了更新计数或生成的键之外,您还可以提供一个选择查询来执行,并根据结果生成回复消息(例如入站适配器),如下例所示:
<int-jdbc:outbound-gateway
update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
query="select * from foos where id=:headers[$id]"
request-channel="input" reply-channel="output" data-source="dataSource"/>
自 Spring Integration 2.2 版本起,更新 SQL 查询不再强制要求。
现在,您可以通过使用 query 属性或 query 元素仅提供一个选择(查询)语句。
如果您需要主动检索数据(例如通过通用网关或负载增强器),这将非常有用。
回复消息随后将根据结果生成(类似于入站适配器的工作原理),并传递到回复通道。
以下示例展示了如何使用 query 属性:
<int-jdbc:outbound-gateway
query="select * from foos where id=:headers[id]"
request-channel="input"
reply-channel="output"
data-source="dataSource"/>
|
默认情况下, |
与通道适配器一样,您也可以为请求和回复提供 SqlParameterSourceFactory 个实例。
默认值与出站适配器相同,因此请求消息可作为表达式的根对象可用。
如果设置为 keys-generated="true",则表达式的根对象是生成的键(如果只有一个则为映射,如果是多值则为映射列表)。
出站网关需要引用一个 DataSource 或 JdbcTemplate。
它还可以注入一个 SqlParameterSourceFactory 来控制将传入消息绑定到查询的过程。
从 4.2 版本开始,request-prepared-statement-setter 属性可作为 request-sql-parameter-source-factory 的替代选项,在 <int-jdbc:outbound-gateway> 上使用。
它允许您指定一个 MessagePreparedStatementSetter bean 引用,该引用会在执行前进行更复杂的 PreparedStatement 准备。
从版本 6.0 开始,JdbcOutboundGateway 将直接返回空列表结果,而不再像以前那样将其转换为 null(其含义为“无回复”)。
这导致在那些将空列表处理作为下游逻辑一部分的应用程序中需要额外的配置。
有关可能的空列表处理选项,请参阅 拆分器丢弃通道。
有关 MessagePreparedStatementSetter 的更多信息,请参阅 出站通道适配器。
JDBC 消息存储
Spring Integration 提供了两种特定的 JDBC 消息存储实现。
JdbcMessageStore 适用于与聚合器和检查点模式一起使用。
JdbcChannelMessageStore 实现提供了一种更具针对性且可扩展的实现,专门用于消息通道。
请注意,您可以使用 JdbcMessageStore 来备份消息通道,JdbcChannelMessageStore 是为此目的而优化的。
从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore 的索引已得到优化。
如果您在这样的存储中有大量消息组,可能需要调整这些索引。
此外,PriorityChannel 的索引已被注释掉,因为除非您使用的是由 JDBC 支持的此类通道,否则不需要它。 |
当使用 OracleChannelMessageStoreQueryProvider 时,必须添加优先级通道索引 必须,因为它包含在查询的提示中。 |
初始化数据库
在使用 JDBC 消息存储组件之前,您应该先配置目标数据库并创建相应的对象。
Spring Integration 附带了一些示例脚本,可用于初始化数据库。
在 spring-integration-jdbc JAR 文件中,您可以在 org.springframework.integration.jdbc 包中找到这些脚本。
它为多种常见的数据库平台提供了示例创建脚本和示例删除脚本。
使用这些脚本的一种常见方式是在 Spring JDBC 数据源初始化器 中引用它们。
请注意,这些脚本仅提供为示例,并作为所需表名和列名的规范。
您可能会发现需要对其进行增强以用于生产环境(例如,通过添加索引声明)。
通用 JDBC 消息存储
JDBC 模块提供了基于数据库的 Spring Integration MessageStore(在检查点模式中很重要)和 MessageGroupStore(在有状态模式如聚合器中很重要)的实现。
这两个接口均由 JdbcMessageStore 实现,并支持在 XML 中配置存储实例,如下例所示:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定一个 JdbcTemplate 而不是一个 DataSource。
以下示例展示了一些其他可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在上面的示例中,我们指定了 LobHandler 来处理作为大对象的消息(这对 Oracle 通常是必要的),并为存储生成的查询中的表名指定了前缀。
表名前缀默认为 INT_。
支持消息通道
如果您打算使用 JDBC 来后端消息通道,我们推荐使用 JdbcChannelMessageStore 实现。
它仅与消息通道配合使用。
支持的数据库
The JdbcChannelMessageStore 使用特定于数据库的 SQL 查询从数据库中检索消息。
因此,您必须在 JdbcChannelMessageStore 上设置 ChannelMessageStoreQueryProvider 属性。
此 channelMessageStoreQueryProvider 为您提供指定的特定数据库的 SQL 查询。
Spring Integration 支持以下关系型数据库:
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
德比
-
H2
-
SQL Server
-
赛伯斯
-
DB2
如果您的数据库未列出,您可以扩展 AbstractChannelMessageStoreQueryProvider 类并提供您自己的自定义查询。
版本 4.0 在表格中添加了 MESSAGE_SEQUENCE 列,以确保即使消息存储在同一毫秒内也能实现先进先出(FIFO)队列。
自定义消息插入
自 5.0 版本起,通过重载 ChannelMessageStorePreparedStatementSetter 类,您可以为 JdbcChannelMessageStore 中的消息插入提供自定义实现。
您可以使用它来设置不同的列、更改表结构或序列化策略。
例如,除了默认的 byte[] 序列化外,您还可以将其结构存储为 JSON 字符串。
以下示例使用 setValues 的默认实现来存储公共列,并覆盖其行为以将消息负载存储为 varchar:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
|
通常,我们不建议使用关系型数据库进行消息队列。相反,如果可能,请考虑使用基于 JMS 或 AMQP 的通道。有关更多参考,请参阅以下资源: 如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,该机制将在 后续章节 中介绍。 |
并发轮询
轮询消息通道时,您可以选择将关联的 Poller 配置为具有 TaskExecutor 引用。
|
不过请记住,如果您使用基于 JDBC 的消息通道,并计划以多个线程事务性地轮询该通道及相应的消息存储,则应确保使用支持多版本并发控制 (MVCC) 的关系型数据库。 否则,可能会出现锁问题,并且在使用多个线程时,性能可能无法达到预期效果。 例如,Apache Derby 在这方面就存在问题。 为了获得更好的 JDBC 队列吞吐量,并避免在不同线程可能从队列中轮询相同的
|
优先通道
从版本 4.0 开始,JdbcChannelMessageStore 实现了 PriorityCapableChannelMessageStore 并提供了 priorityEnabled 选项,使其可用作 message-store 引用以指向 priority-queue 实例。
为此,INT_CHANNEL_MESSAGE 表包含一个 MESSAGE_PRIORITY 列,用于存储 PRIORITY 消息头的值。
此外,新增的 MESSAGE_SEQUENCE 列使我们能够实现健壮的先进先出(FIFO)轮询机制,即使在同一毫秒内将具有相同优先级的多条消息进行存储。
消息通过 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 从数据库中被轮询(选取)出来。
我们不推荐使用同一个 JdbcChannelMessageStore bean 同时用于优先级队列通道和非优先级队列通道,因为 priorityEnabled 选项适用于整个存储,且无法为队列通道保留正确的 FIFO 队列语义。
然而,同一个 INT_CHANNEL_MESSAGE 表(甚至 region)可以用于这两种 JdbcChannelMessageStore 类型。
要配置该场景,您可以让一个消息存储 bean 继承自另一个,如下例所示: |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
分区消息存储
在同一个应用或同一应用的多个节点中,通常使用JdbcMessageStore作为全局存储来存放一组应用程序或节点的数据。
为了在一定程度上防止名称冲突并控制数据库元数据配置,消息存储支持以两种方式进行表分区。
一种方式是通过更改前缀(如前面所述)来使用独立的表名。
另一种方式是为单个表内的数据分区指定一个region名称。
第二种方法的一个重要用例是当MessageStore管理为Spring Integration消息通道提供支持的持久化队列时。
持久化通道的消息数据以通道名称为键存储在存储中。
因此,如果通道名称不是全局唯一的,这些通道可能会获取到本不属于它们的数据。
为了避免这种风险,您可以使用消息存储的region功能,将具有相同逻辑名称但不同物理通道的数据进行隔离。
PostgreSQL: 接收推送通知
PostgreSQL 提供了一个监听和通知框架,用于在数据库表发生操作时接收推送通知。
Spring Integration(从 6.0 版本开始)利用此机制,允许在将新消息添加到 JdbcChannelMessageStore 时接收推送通知。
使用此功能时,必须定义一个数据库触发器,该触发器可包含在 Spring Integration JDBC 模块中包含的 schema-postgresql.sql 文件的注释中。
推送通知通过 PostgresChannelMessageTableSubscriber 类接收,该类允许其订阅者在针对任意给定的 region 和 groupId 有新消息到达时收到回调。
即使消息是在不同的 JVM 中追加到同一个数据库的,也会接收到这些通知。
PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 契约,作为对上述 PostgresChannelMessageTableSubscriber 通知的反应,从存储中拉取消息。
例如,可以按以下方式接收some group的推送通知:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从版本 6.0.5 开始,在 PlatformTransactionManager 上指定 PostgresSubscribableChannel 将在事务中通知订阅者。
订阅者中的异常将导致事务回滚,并将消息放回消息存储区。
默认情况下不会激活事务支持。
重试
从版本 6.0.5 开始,可以通过向 PostgresSubscribableChannel 提供 RetryTemplate 来指定重试策略。
默认情况下,不会执行任何重试操作。
|
任何活动的 为了满足独占连接的需求,还建议一个 JVM 只运行单个 |
存储过程
在某些情况下,原生的 JDBC 支持可能不足以满足需求。 您可能需要处理遗留的关系型数据库架构,或者具有复杂的数据处理需求,但最终必须使用 存储过程 或存储函数。 自 Spring Integration 2.1 版本起,我们提供了三个组件来执行存储过程或存储函数:
-
存储过程入站通道适配器
-
存储过程出站通道适配器
-
存储过程出站网关
支持的数据库
为了启用对存储过程和存储函数的调用,存储过程组件使用 org.springframework.jdbc.core.simple.SimpleJdbcCall 类。
因此,以下数据库完全支持执行存储过程:
-
Apache Derby
-
DB2
-
MySQL
-
Microsoft SQL Server
-
Oracle
-
PostgreSQL
-
赛伯斯
如果您想执行存储函数,以下数据库完全支持:
-
MySQL
-
Microsoft SQL Server
-
Oracle
-
PostgreSQL
|
即使您特定的数据库可能未得到完全支持,只要您的 RDBMS 支持存储过程或存储函数,您仍然很有可能成功使用 Spring Integration 的存储过程组件。 事实上,一些提供的集成测试使用了 H2 数据库。 然而,彻底测试这些使用场景非常重要。 |
通用配置属性
所有存储过程组件共享某些配置参数:
-
auto-startup: 生命周期属性,用于指示该组件是否应在应用程序上下文启动期间启动。 其默认值为true。 可选。 -
data-source: 对javax.sql.DataSource的引用,用于访问数据库。 必需项。 -
id: 标识底层的 Spring bean 定义,它是EventDrivenConsumer或PollingConsumer的实例,具体取决于出站通道适配器的channel属性引用的是SubscribableChannel还是PollableChannel。 可选。 -
ignore-column-meta-data: 对于完全支持的数据库,底层的SimpleJdbcCall类可以自动从JDBC元数据中检索存储过程或存储函数的参数信息。然而,如果数据库不支持元数据查找,或者您需要提供自定义的参数定义,则可以将此标志设置为
true。 它默认为false。 可选。 -
is-function: 如果为true,则调用 SQL 函数。 此时,stored-procedure-name或stored-procedure-name-expression属性定义所调用函数的名称。 默认值为false。 可选。 -
stored-procedure-name: 此属性指定存储过程的名称。 如果is-function属性设置为true,则此属性指定函数名称。 必须指定此属性或stored-procedure-name-expression。 -
stored-procedure-name-expression: 此属性使用 SpEL 表达式指定存储过程的名称。 通过使用 SpEL,您可以访问完整消息(如果可用),包括其标头和负载。 您可以使用此属性在运行时调用不同的存储过程。 例如,您可以将想要执行的存储过程名称作为消息头提供。 该表达式必须解析为String。如果
is-function属性设置为true,则此属性指定一个存储函数。 必须指定此属性或stored-procedure-name。 -
jdbc-call-operations-cache-size: 定义缓存的SimpleJdbcCallOperations实例的最大数量。 基本上,对于每个存储过程名称,都会创建一个新的SimpleJdbcCallOperations实例,该实例随后会被缓存。Spring Integration 2.2 添加了 stored-procedure-name-expression属性和jdbc-call-operations-cache-size属性。默认缓存大小为
10。0的值将禁用缓存。 不允许使用负值。如果您启用了 JMX,关于
jdbc-call-operations-cache的统计信息将作为 MBean 暴露。 有关更多信息,请参阅 MBean 导出器。 -
sql-parameter-source-factory: (存储过程入站适配器不可用。) 对SqlParameterSourceFactory的引用。 默认情况下,传入的Message负载的 Bean 属性被用作存储过程输入参数的来源,方法是使用BeanPropertySqlParameterSourceFactory。这足以满足基本用例。 对于更复杂的选项,请考虑传入一个或多个
ProcedureParameter值。 请参阅 定义参数源。 可选。 -
use-payload-as-parameter-source:(存储过程入站通道适配器不可用。) 如果设置为true,则使用Message的负载作为提供参数的来源。 但如果设置为false,则整个Message都可用作参数的来源。如果没有传入过程参数,此属性默认为
true。 这意味着,通过使用默认值BeanPropertySqlParameterSourceFactory,负载的 bean 属性将用作存储过程或存储函数的参数值的来源。然而,如果传递了过程参数,此属性(默认情况下)评估为
false。ProcedureParameter允许提供 SpEL 表达式。 因此,能够访问整个Message是非常有益的。 该属性设置在底层的StoredProcExecutor上。 可选。
通用配置子元素
存储过程组件共享一组共同的子元素,您可以使用这些元素来定义和传递参数给存储过程或存储函数。 以下元素可用:
-
parameter -
returning-resultset -
sql-parameter-definition -
poller -
parameter: 提供了一种机制来提供存储过程参数。 参数可以是静态的,也可以使用 SpEL 表达式提供。<int-jdbc:parameter name="" (1) type="" (2) value=""/> (3) <int-jdbc:parameter name="" expression=""/> (4)+<1> 要传递给存储过程或存储函数的参数名称。 必需。 <2> 此属性指定值的类型。 如果未提供,此属性默认为
java.lang.String。 仅在使用了value属性时才会使用此属性。 可选。 <3> 参数的值。 必须提供此属性或expression属性之一。 可选。 <4> 除了value属性外,您还可以指定一个 SpEL 表达式来传递参数的值。 如果您指定了expression,则不允许使用value属性。 可选。Optional.
-
returning-resultset: 存储过程可能返回多个结果集。 通过设置一个或多个returning-resultset元素,您可以指定RowMappers将每个返回的ResultSet转换为有意义的对象。 可选。<int-jdbc:returning-resultset name="" row-mapper="" /> -
sql-parameter-definition: 如果您使用的是完全支持的数据库,通常无需指定存储过程参数定义。 相反,这些参数可以从 JDBC 元数据中自动推导出来。 然而,如果您使用的是不完全支持的数据库,则必须使用sql-parameter-definition元素显式设置这些参数。您还可以通过使用
ignore-column-meta-data属性来关闭通过 JDBC 获取的参数元数据信息的任何处理。<int-jdbc:sql-parameter-definition name="" (1) direction="IN" (2) type="STRING" (3) scale="5" (4) type-name="FOO_STRUCT" (5) return-type="fooSqlReturnType"/> (6)1 指定 SQL 参数的名称。 必填项。 2 指定 SQL 参数定义的方向。 默认为 IN。 有效值为:IN、OUT和INOUT。 如果您的存储过程返回结果集,请使用returning-resultset元素。 可选。3 此 SQL 参数定义所使用的 SQL 类型。 转换为整数类型,如 java.sql.Types所定义。 您也可以选择提供整数值。 如果未显式设置此属性,则默认为 'VARCHAR'。 可选。4 SQL 参数的规模。 仅用于数值和十进制参数。 可选。 5 对于用户命名的类型(例如: STRUCT,DISTINCT,JAVA_OBJECT)以及命名数组类型,typeName表示该情况。 此属性与scale属性互斥。 可选。6 对复杂类型的自定义值处理器的引用。 SqlReturnType的实现。 此属性与scale属性互斥,仅适用于 OUT 和 INOUT 参数。 可选。 -
poller: 允许您配置消息轮询器,如果此端点是一个PollingConsumer。 可选。
定义参数源
参数源控制检索和映射 Spring Integration 消息属性到相关存储过程输入参数的技术。
存储过程组件遵循特定规则。
默认情况下,Message负载的bean属性被用作存储过程输入参数的来源。
在这种情况下,将使用BeanPropertySqlParameterSourceFactory。
这可能足以满足基本用例。
下一个示例说明了这种默认行为。
为了使使用 BeanPropertySqlParameterSourceFactory 进行 bean 属性的“自动”查找正常工作,您的 bean 属性必须以小写字母定义。
这是因为在 org.springframework.jdbc.core.metadata.CallMetaDataContext(Java 方法为 matchInParameterValuesWithCallParameters())中,检索到的存储过程参数声明会被转换为小写。
因此,如果您使用了驼峰命名法的 bean 属性(例如 lastName),查找将会失败。
在这种情况下,请提供一个显式的 ProcedureParameter。 |
假设我们有一个负载,它由一个简单的 Bean 组成,该 Bean 具有以下三个属性:id、name 和 description。
此外,我们还有一个简化的存储过程 INSERT_COFFEE,它接受三个输入参数:id、name 和 description。
我们还使用了一个完全支持的数据库。
在这种情况下,以下用于存储过程出站适配器的配置就足够了:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
channel="insertCoffeeProcedureRequestChannel"
stored-procedure-name="INSERT_COFFEE"/>
对于更复杂的选项,请考虑传入一个或多个 ProcedureParameter 值。
如果您显式提供了 ProcedureParameter 值,默认情况下将使用 ExpressionEvaluatingSqlParameterSourceFactory 进行参数处理,以启用 SpEL 表达式的全部功能。
如果您需要更精确地控制参数的检索方式,可以考虑使用 sql-parameter-source-factory 属性传入一个自定义的 SqlParameterSourceFactory 实现。
存储过程入站通道适配器
以下代码列出了存储过程入站通道适配器所相关的属性:
<int-jdbc:stored-proc-inbound-channel-adapter
channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
skip-undeclared-results="" (2)
return-value-required="false" (3)
<int:poller/>
<int-jdbc:sql-parameter-definition name="" direction="IN"
type="STRING"
scale=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
| 1 | 接收轮询消息的通道。
如果存储过程或函数不返回任何数据,则 Message 的有效负载为 null。
必填项。 |
| 2 | 如果此属性设置为 true,则存储过程调用中所有没有对应 SqlOutParameter 声明的结果都将被跳过。
例如,即使您的存储过程仅声明了一个结果参数,存储过程仍可能返回更新计数值。
具体行为取决于数据库实现。
该值设置在底层 JdbcTemplate 上。
该值的默认值为 true。
可选。 |
| 3 | 指示是否应包含此过程的返回值。 自 Spring Integration 3.0 起。 可选。 |
存储过程出站通道适配器
以下代码列出了存储过程出站通道适配器所需的关键属性:
<int-jdbc:stored-proc-outbound-channel-adapter channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
order="" (2)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int:poller fixed-rate=""/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name=""/>
</int-jdbc:stored-proc-outbound-channel-adapter>
| 1 | 此端点的接收消息通道。 必需。 |
| 2 | 指定当此端点作为订阅者连接到通道时的调用顺序。
这在通道使用 failover 分发策略时尤为重要。
当此端点本身是带有队列的通道的轮询消费者时,此设置无效。
可选。 |
存储过程出站网关
以下代码列出了存储过程出站通道适配器所需的关键属性:
<int-jdbc:stored-proc-outbound-gateway request-channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
order=""
reply-channel="" (2)
reply-timeout="" (3)
return-value-required="false" (4)
skip-undeclared-results="" (5)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
type=""
scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
| 1 | 此端点的接收消息通道。 必需。 |
| 2 | 接收数据库响应后应发送回复的消息通道。 可选。 |
| 3 | 允许您指定此网关在抛出异常之前等待回复消息成功发送的时长。
请注意,当向 DirectChannel 发送时,调用将在发送者的线程中发生。
因此,发送操作的失败可能是由下游的其他组件引起的。
该值以毫秒为单位指定。
可选。 |
| 4 | 指示是否应包含此过程的返回值。 可选。 |
| 5 | 如果 skip-undeclared-results 属性设置为 true,则存储过程调用中所有没有对应 SqlOutParameter 声明的结果都将被跳过。
例如,存储过程可能会返回更新计数值,即使您的存储过程仅声明了一个结果参数。
确切的行为取决于数据库。
该值在底层 JdbcTemplate 上设置。
该值的默认值为 true。
可选。 |
示例
本部分包含两个调用 Apache Derby 存储过程的示例。
第一个过程调用一个返回 ResultSet 的存储过程。
通过使用 RowMapper,数据被转换为领域对象,随后该对象成为 Spring Integration 消息的有效负载。
在第二个示例中,我们调用一个存储过程,该过程使用输出参数来返回数据。
|
该项目包含此处引用的 Apache Derby 示例,以及运行它的说明。 Spring Integration Samples 项目还提供了使用 Oracle 存储过程的示例。 |
在第一个示例中,我们调用一个名为 FIND_ALL_COFFEE_BEVERAGES 的存储过程,它不定义任何输入参数,但返回一个 ResultSet。
在 Apache Derby 中,存储过程是用 Java 实现的。 以下代码片段展示了该方法的签名:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
throws SQLException {
...
}
以下列表显示了相应的 SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
在 Spring Integration 中,您现在可以使用例如 stored-proc-outbound-gateway 来调用此存储过程,如下例所示:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
data-source="dataSource"
request-channel="findAllProcedureRequestChannel"
expect-single-result="true"
stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>
在第二个示例中,我们调用了一个名为 FIND_COFFEE 的存储过程,该过程带有一个输入参数。
它不使用返回值 ResultSet,而是使用一个输出参数。
以下示例展示了方法签名:
public static void findCoffee(int coffeeId, String[] coffeeDescription)
throws SQLException {
...
}
以下列表显示了相应的 SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
在 Spring Integration 中,您现在可以使用例如 stored-proc-outbound-gateway 来调用此存储过程,如下例所示:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
data-source="dataSource"
request-channel="findCoffeeProcedureRequestChannel"
skip-undeclared-results="true"
stored-procedure-name="FIND_COFFEE"
expect-single-result="true">
<int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>
JDBC 锁注册表
版本 4.3 引入了 JdbcLockRegistry。
某些组件(例如聚合器和重排序器)使用从 LockRegistry 实例获取的锁,以确保同一时间只有一个线程操作一个组。
DefaultLockRegistry 在单个组件内执行此功能。
现在您可以为这些组件配置外部锁注册表。
当与共享的 MessageGroupStore 一起使用时,您可以使用 JdbcLockRegistry 在多个应用程序实例之间提供此功能,从而确保同一时间只有一个实例能够操作该组。
当锁由本地线程释放时,另一个本地线程通常可以立即获取该锁。 如果锁是由使用不同注册表实例的线程释放的,则获取该锁可能需要长达 100ms。
JdbcLockRegistry 基于 LockRepository 抽象,该抽象具有 DefaultLockRepository 实现。
数据库架构脚本位于 org.springframework.integration.jdbc 包中,该包针对不同关系型数据库管理系统(RDBMS)厂商进行了划分。
例如,以下代码清单展示了用于锁表的 H2 DDL:
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
INT_ 可根据目标数据库设计需求进行修改。
因此,您必须在 DefaultLockRepository bean 定义上使用 prefix 属性。
有时,一个应用程序会进入无法释放分布式锁并从数据库中移除特定记录的状态。
为此,其他应用程序可以在下一次加锁调用时使此类死锁过期。
timeToLive(TTL)选项在DefaultLockRepository上提供,正是用于此目的。
对于存储在给定DefaultLockRepository实例中的锁,您可能还希望指定CLIENT_ID。
如果是这样,您可以将id作为构造函数参数与DefaultLockRepository关联起来。
从版本 5.1.8 开始,JdbcLockRegistry 可以使用 idleBetweenTries 进行配置——这是一个在锁定记录插入/更新执行之间睡眠的 Duration。
默认情况下,它是 100 毫秒,并且在某些环境中,非领导者会过于频繁地通过数据源污染连接。
从版本 5.4 开始,已引入 RenewableLockRegistry 接口并将其添加到 JdbcLockRegistry 中。
在锁定过程可能超过锁的存活时间的情况下,必须在锁定过程中调用 renewLock() 方法。
因此,可以显著缩短锁的存活时间,部署操作也能更快地重新获取丢失的锁。
| 锁的续期操作仅当锁由当前线程持有时方可执行。 |
String with version 5.5.6, the JdbcLockRegistry is support automatically clean up cache for JdbcLock in JdbcLockRegistry.locks via JdbcLockRegistry.setCacheCapacity().
See its JavaDocs for more information.
带有版本 6.0 的字符串,DefaultLockRepository 可以用 PlatformTransactionManager 提供,而无需依赖应用程序上下文中的主 Bean。
String with version 6.1, the DefaultLockRepository can be configured for custom insert, update and renew queries.
For this purpose the respective setters and getters are exposed.
For example, an insert query for PostgreSQL hint can be configured like this:
lockRepository.setInsertQuery(lockRepository.getInsertQuery() + " ON CONFLICT DO NOTHING");
JDBC 元数据存储
版本 5.0 引入了 JDBC MetadataStore(参见 元数据存储)实现。
您可以使用 JdbcMetadataStore 在应用程序重启之间维护元数据状态。
此 MetadataStore 实现可与以下适配器配合使用:
要配置这些适配器以使用 JdbcMetadataStore,请使用 bean 名称 metadataStore 声明一个 Spring bean。
Feed 入站通道适配器和 feed 入站通道适配器都会自动拾取并使用声明的 JdbcMetadataStore,如下例所示:
@Bean
public MetadataStore metadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}
The org.springframework.integration.jdbc 包包含多个关系型数据库管理系统(RDBMS)厂商的数据库架构脚本。
例如,以下列表显示了元数据表的 H2 DDL:
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
您可以将 INT_ 前缀更改为符合目标数据库设计要求的值。
您也可以配置 JdbcMetadataStore 以使用自定义前缀。
JdbcMetadataStore 实现了 ConcurrentMetadataStore,使其能够在多个应用程序实例之间可靠地共享,其中只有一个实例可以存储或修改键的值。
所有这些操作都是原子的,得益于事务保证。
事务管理必须使用 JdbcMetadataStore。
入站通道适配器可以在轮询配置中提供对 TransactionManager 的引用。
与非事务性的 MetadataStore 实现不同,使用 JdbcMetadataStore 时,条目仅在事务提交后才出现在目标表中。
当发生回滚时,不会向 INT_METADATA_STORE 表添加任何条目。
自 5.0.7 版本起,您可以使用 RDBMS 提供商特定的 lockHint 选项来配置元数据存储项的基于锁的查询中的 JdbcMetadataStore。
默认情况下,其值为 FOR UPDATE;如果目标数据库不支持行锁定功能,则可通过空字符串进行配置。
关于在更新前锁定行的具体提示,请参阅提供商提供的 SELECT 表达式文档。