JDBC 支持

JDBC 支持

Spring Integration提供了通道适配器,用于使用数据库查询接收和发送消息。 通过这些适配器,Spring Integration 不仅支持普通 JDBC SQL 查询,还支持存储过程和存储函数调用。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

专家
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
    <version>6.1.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-jdbc:6.1.9"

默认情况下,以下 JDBC 组件可用:spring-doc.cadn.net.cn

Spring Integration JDBC 模块还提供了一个 JDBC 消息存储spring-doc.cadn.net.cn

入站通道适配器

入站通道适配器的主要功能是执行 SQLSELECT查询并将结果集转换为消息。 消息有效负载是整个结果集(表示为List),列表中项的类型取决于行映射策略。 默认策略是一个通用映射器,它返回一个Map查询结果中的每一行。 或者,您可以通过添加对RowMapper实例(有关行映射的更多详细信息,请参阅 Spring JDBC 文档)。spring-doc.cadn.net.cn

如果要转换SELECT查询结果到单个消息时,可以使用下游拆分器。

入站适配器还需要引用JdbcTemplate实例或DataSource.spring-doc.cadn.net.cn

以及SELECT语句来生成消息,则适配器还具有UPDATE将记录标记为已处理的语句,以便它们不会显示在下一次轮询中。 更新可以通过原始选择中的 ID 列表进行参数化。 默认情况下,这是通过命名约定(输入结果集中名为id在参数映射中转换为名为id). 以下示例定义了具有更新查询和DataSource参考。spring-doc.cadn.net.cn

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
更新查询中的参数用冒号 (:) 前缀添加到参数的名称(在前面的示例中,该参数是要应用于轮询结果集中的每一行的表达式)。 这是 Spring JDBC 中命名参数 JDBC 支持的标准功能,结合了 Spring Integration 中采用的约定(投影到轮询结果列表)。 底层的 Spring JDBC 功能限制了可用的表达式(例如,不允许使用句点以外的大多数特殊字符),但由于目标通常是可由 bean 路径寻址的对象列表(可能是一个对象的列表),因此这不会受到过度限制。

要更改参数生成策略,您可以注入SqlParameterSourceFactory添加到适配器中以覆盖默认行为(适配器具有sql-parameter-source-factory属性)。 Spring Integration 提供ExpressionEvaluatingSqlParameterSourceFactory,它创建一个基于 SpEL 的参数源,查询结果作为#root对象。 (如果update-per-row为 true,则根对象是行)。 如果同一参数名称在更新查询中多次出现,则仅计算一次,并缓存其结果。spring-doc.cadn.net.cn

还可以将参数源用于选择查询。 在这种情况下,由于没有要计算的“结果”对象,因此每次都使用单个参数源(而不是使用参数源工厂)。 从版本 4.0 开始,您可以使用 Spring 创建基于 SpEL 的参数源,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

value在每个参数表达式中,可以是任何有效的 SpEL 表达式。 这#root表达式求值的对象是parameterSource豆。 对于所有评估,它都是静态的(在前面的示例中,空的String).spring-doc.cadn.net.cn

从 5.0 版本开始,您可以提供ExpressionEvaluatingSqlParameterSourceFactorysqlParameterTypes以指定特定参数的目标 SQL 类型。spring-doc.cadn.net.cn

以下示例为查询中使用的参数提供了 SQL 类型:spring-doc.cadn.net.cn

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
使用createParameterSourceNoCache工厂方法。 否则,参数源将缓存评估结果。 另请注意,由于缓存已禁用,因此如果同一参数名称多次出现在选择查询中,则会针对每次匹配项重新计算该参数。

轮询和交易

入站适配器接受常规的 Spring Integration 轮询器作为子元素。 因此,可以控制轮询的频率(以及其他用途)。 用于 JDBC 的轮询器的一个重要功能是将轮询作包装在事务中的选项,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
如果未显式指定轮询器,则使用默认值。 与 Spring Integration 一样,它可以定义为顶级 bean)。

在前面的示例中,数据库每 1000 毫秒(或每秒轮询一次)一次,并且更新和选择查询都在同一事务中执行。 未显示事务管理器配置。 但是,只要它知道数据源,轮询就是事务性的。 一个常见的用例是下游通道是直接通道(默认值),以便端点在同一线程中调用,因此在同一事务中调用。 这样,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。spring-doc.cadn.net.cn

max-rowsmax-messages-per-poll

JDBC 入站通道适配器定义了一个名为max-rows. 指定适配器的轮询器时,还可以定义一个名为max-messages-per-poll. 虽然这两个属性看起来很相似,但它们的含义却大不相同。spring-doc.cadn.net.cn

max-messages-per-poll指定每个轮询间隔执行查询的次数,而max-rows指定每次执行返回的行数。spring-doc.cadn.net.cn

在正常情况下,您可能不想将轮询器的max-messages-per-poll属性。 其默认值为1,这意味着 JDBC 入站通道适配器的receive()方法对每个轮询间隔只执行一次。spring-doc.cadn.net.cn

设置max-messages-per-poll属性设置为更大的值意味着查询连续执行多次。 有关max-messages-per-poll属性,请参阅配置入站通道适配器spring-doc.cadn.net.cn

相比之下,max-rows属性,如果大于0,指定要从由receive()方法。 如果属性设置为0,则所有行都包含在生成的消息中。 该属性默认为0.spring-doc.cadn.net.cn

建议通过特定于提供商的查询选项(例如 MySQL)使用结果集限制LIMIT或 SQL ServerTOP或 Oracle 的ROWNUM. 有关详细信息,请参阅特定提供商文档。

出站通道适配器

出站通道适配器与入站通道适配器相反:它的作用是处理消息并使用它来执行 SQL 查询。 默认情况下,消息有效负载和标头可用作查询的输入参数,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:outbound-channel-adapter
    query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
    data-source="dataSource"
    channel="input"/>

在前面的示例中,到达标记为input具有键为something,因此运算符会从映射中取消引用该值。 标头也可以作为地图访问。[]spring-doc.cadn.net.cn

前面查询中的参数是传入消息上的 bean 属性表达式(不是 SpEL 表达式)。 此行为是SqlParameterSource,这是出站适配器创建的默认源。 您可以注入不同的SqlParameterSourceFactory以获得不同的行为。

出站适配器需要对DataSourceJdbcTemplate. 您还可以注入SqlParameterSourceFactory以控制每个传入消息与查询的绑定。spring-doc.cadn.net.cn

如果输入通道是直接通道,则出站适配器在与消息发送方相同的线程中运行其查询,因此,如果有,则运行相同的事务(如果有)。spring-doc.cadn.net.cn

使用 SpEL 表达式传递参数

大多数 JDBC 通道适配器的常见需求是将参数作为 SQL 查询或存储过程或函数的一部分传递。 如前所述,这些参数默认是 bean 属性表达式,而不是 SpEL 表达式。 但是,如果需要将 SpEL 表达式作为参数传递,则必须显式注入SqlParameterSourceFactory.spring-doc.cadn.net.cn

以下示例使用ExpressionEvaluatingSqlParameterSourceFactory要实现该要求:spring-doc.cadn.net.cn

<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
    query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)"
    sql-parameter-source-factory="spelSource"/>

<bean id="spelSource"
      class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id"          value="headers['id'].toString()"/>
            <entry key="createdDate" value="new java.util.Date()"/>
            <entry key="payload"     value="payload"/>
        </map>
    </property>
</bean>

有关详细信息,请参阅定义参数源spring-doc.cadn.net.cn

使用PreparedStatement回调

有时,灵活性和松散耦合SqlParameterSourceFactory没有做我们需要的目标PreparedStatement或者我们需要做一些低级的 JDBC 工作。 Spring JDBC 模块提供了用于配置执行环境的 API(例如ConnectionCallbackPreparedStatementCreator)并作参数值(例如SqlParameterSource). 它甚至可以访问用于低级作的 API,例如StatementCallback.spring-doc.cadn.net.cn

从 Spring Integration 4.2 开始,MessagePreparedStatementSetter允许在PreparedStatement手动,在requestMessage上下文。 该类的作用与PreparedStatementSetter在标准的 Spring JDBC API 中。 实际上,它是直接从内联调用的PreparedStatementSetterJdbcMessageHandler调用executeJdbcTemplate.spring-doc.cadn.net.cn

此功能界面选项与sqlParameterSourceFactory并可用作更强大的替代方案来填充PreparedStatementrequestMessage. 例如,当我们需要存储时,它很有用File数据到数据库BLOB列。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
    JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
            "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
    jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
        ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
        try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
            ps.setBlob(2, inputStream);
        }
        catch (Exception e) {
            throw new MessageHandlingException(m, e);
        }
        ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
    });
    return jdbcMessageHandler;
}

从 XML 配置的角度来看,prepared-statement-setter属性在<int-jdbc:outbound-channel-adapter>元件。 它允许您指定一个MessagePreparedStatementSetterbean 引用。spring-doc.cadn.net.cn

批量更新

从 5.1 版本开始,JdbcMessageHandler执行JdbcOperations.batchUpdate()如果请求消息的有效负载是Iterable实例。 的每个元素Iterable被包装成Message替换为请求消息中的标头,如果此类元素不是Message已经。 在常规的情况下SqlParameterSourceFactory基于配置的这些消息用于构建SqlParameterSource[]对于上述中使用的参数JdbcOperations.batchUpdate()功能。 当MessagePreparedStatementSetter配置,则BatchPreparedStatementSettervariant 用于迭代每个项目的这些消息,并提供的MessagePreparedStatementSetter被要求反对他们。 在以下情况下,不支持批量更新keysGenerated模式已选择。spring-doc.cadn.net.cn

出站网关

出站网关就像出站适配器和入站适配器的组合:它的作用是处理消息并使用它来执行 SQL 查询,然后通过将其发送到回复通道来响应结果。 默认情况下,消息有效负载和标头可用作查询的输入参数,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:outbound-gateway
    update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource" />

前面示例的结果是将一条记录插入到mythings表,并返回一条消息,指示受影响的行数(有效负载是映射:{UPDATED=1}) 到输出通道。spring-doc.cadn.net.cn

如果更新查询是具有自动生成键的插入,则可以通过添加keys-generated="true"到前面的示例(这不是默认值,因为某些数据库平台不支持它)。 以下示例显示了更改的配置:spring-doc.cadn.net.cn

<int-jdbc:outbound-gateway
    update="insert into mythings (status, name) values (0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource"
    keys-generated="true"/>

您还可以提供选择查询来执行并从结果(例如入站适配器)生成回复消息,而不是更新计数或生成的键,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    query="select * from foos where id=:headers[$id]"
    request-channel="input" reply-channel="output" data-source="dataSource"/>

从 Spring Integration 2.2 开始,更新 SQL 查询不再是强制性的。 现在,您只能提供选择查询,方法是使用query属性或query元素。 如果您需要使用通用网关或有效负载扩充器等方式主动检索数据,这将非常有用。 然后,根据结果生成回复消息(类似于入站适配器的工作方式)并传递给回复通道。 以下示例显示如何使用query属性:spring-doc.cadn.net.cn

<int-jdbc:outbound-gateway
    query="select * from foos where id=:headers[id]"
    request-channel="input"
    reply-channel="output"
    data-source="dataSource"/>

默认情况下,组件的SELECTquery 仅从游标返回一行(第一行)。 您可以使用max-rows选择。 如果需要从 SELECT 返回所有行,请考虑指定max-rows="0".spring-doc.cadn.net.cn

与通道适配器一样,您还可以提供SqlParameterSourceFactory请求和回复的实例。 默认值与出站适配器相同,因此请求消息可用作表达式的根。 如果keys-generated="true",表达式的根是生成的键(如果只有一个,则为映射,如果为多值,则为映射列表)。spring-doc.cadn.net.cn

出站网关需要引用DataSourceJdbcTemplate. 它还可以有一个SqlParameterSourceFactoryinject 以控制传入消息与查询的绑定。spring-doc.cadn.net.cn

从 4.2 版本开始,request-prepared-statement-setter属性在<int-jdbc:outbound-gateway>作为替代方案request-sql-parameter-source-factory. 它允许您指定一个MessagePreparedStatementSetterbean 引用,它实现了更复杂的PreparedStatement执行前的准备工作。spring-doc.cadn.net.cn

从 6.0 版开始,JdbcOutboundGateway按原样返回空列表结果,而不是将其转换为null和以前一样,意思是“没有回复”。 这导致在处理空列表是下游逻辑一部分的应用程序中进行额外的配置。 请参阅拆分器丢弃通道,了解可能的空列表处理选项。spring-doc.cadn.net.cn

有关以下详细信息,请参阅出站通道适配器MessagePreparedStatementSetter.spring-doc.cadn.net.cn

JDBC 消息存储

Spring Integration 提供了两个特定于 JDBC 的消息存储实现。 这JdbcMessageStore适用于聚合器和声明检查模式。 这JdbcChannelMessageStore实现专门为消息通道提供了更有针对性和可扩展的实现。spring-doc.cadn.net.cn

请注意,您可以使用JdbcMessageStore支持消息通道,JdbcChannelMessageStore为此目的进行了优化。spring-doc.cadn.net.cn

从版本 5.0.11、5.1.2 开始,索引的JdbcChannelMessageStore已优化。 如果此类存储中有大型消息组,则可能希望更改索引。 此外,索引PriorityChannel被注释掉,因为除非您使用 JDBC 支持的此类通道,否则不需要它。
使用OracleChannelMessageStoreQueryProvider则必须添加优先级通道索引,因为它包含在查询的提示中。

初始化数据库

在开始使用 JDBC 消息存储组件之前,您应该使用适当的对象来配置目标数据库。spring-doc.cadn.net.cn

Spring Integration附带了一些可用于初始化数据库的示例脚本。 在spring-integration-jdbcJAR 文件中,您可以在org.springframework.integration.jdbc包。 它为一系列常见数据库平台提供了示例创建和示例删除脚本。 使用这些脚本的常见方法是在 Spring JDBC 数据源初始值设定项中引用它们。 请注意,这些脚本作为示例和所需表和列名称的规范提供。 您可能会发现需要增强它们以供生产使用(例如,通过添加索引声明)。spring-doc.cadn.net.cn

通用 JDBC 消息存储

JDBC 模块提供了 Spring Integration 的实现MessageStore(在声明检查模式中很重要)和MessageGroupStore(在有状态模式(如聚合器)中很重要)由数据库支持。 这两个接口都是由JdbcMessageStore,并且支持在 XML 中配置存储实例,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定一个JdbcTemplate而不是DataSource.spring-doc.cadn.net.cn

以下示例显示了一些其他可选属性:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前面的示例中,我们指定了LobHandler用于将消息作为大对象处理(这对于 Oracle 来说通常是必需的)和存储生成的查询中表名的前缀。 表名前缀默认为INT_.spring-doc.cadn.net.cn

支持消息通道

如果您打算使用 JDBC 支持消息通道,我们建议使用JdbcChannelMessageStore实现。 它只能与消息通道结合使用。spring-doc.cadn.net.cn

支持的数据库

JdbcChannelMessageStore使用特定于数据库的 SQL 查询从数据库中检索消息。 因此,您必须将ChannelMessageStoreQueryProvider属性JdbcChannelMessageStore. 这channelMessageStoreQueryProvider提供您指定的特定数据库的 SQL 查询。 Spring Integration 为以下关系数据库提供支持:spring-doc.cadn.net.cn

如果您的数据库未列出,您可以扩展AbstractChannelMessageStoreQueryProvider类,并提供您自己的自定义查询。spring-doc.cadn.net.cn

4.0 版本添加了MESSAGE_SEQUENCE列,以确保先进先出 (FIFO) 队列,即使消息存储在同一毫秒内也是如此。spring-doc.cadn.net.cn

自定义消息插入

从 5.0 版本开始,通过重载ChannelMessageStorePreparedStatementSetter类,您可以在JdbcChannelMessageStore. 您可以使用它来设置不同的列或更改表结构或序列化策略。 例如,将默认序列化设置为byte[],您可以将其结构存储为 JSON 字符串。spring-doc.cadn.net.cn

以下示例使用默认实现setValues以存储公共列,并覆盖将消息有效负载存储为varchar:spring-doc.cadn.net.cn

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源:spring-doc.cadn.net.cn

如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中描述。spring-doc.cadn.net.cn

并发轮询

轮询消息通道时,可以选择配置关联的Poller使用TaskExecutor参考。spring-doc.cadn.net.cn

但是请记住,如果您使用 JDBC 支持的消息通道,并且计划轮询该通道,从而使用多个线程以事务方式轮询消息存储,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且使用多个线程时性能可能无法按预期实现。 例如,Apache Derby 在这方面就存在问题。spring-doc.cadn.net.cn

实现更好的 JDBC 队列吞吐量并避免不同线程可能轮询相同的问题Message从队列中,将usingIdCache属性JdbcChannelMessageStoretrue使用不支持 MVCC 的数据库时。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />
优先通道

从 4.0 版本开始,JdbcChannelMessageStore实现PriorityCapableChannelMessageStore并提供priorityEnabled选项,让它用作message-store参考priority-queue实例。 为此,该INT_CHANNEL_MESSAGEtable 有一个MESSAGE_PRIORITY列来存储PRIORITY消息标头。 此外,新的MESSAGE_SEQUENCE列让我们实现了强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同的优先级存储了多条消息。 从数据库中轮询(选择)消息,并使用order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE.spring-doc.cadn.net.cn

我们不建议使用相同的JdbcChannelMessageStorebean 用于优先级和非优先级队列通道,因为priorityEnabled选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。 然而,同样INT_CHANNEL_MESSAGE表(甚至region) 可以同时用于两者JdbcChannelMessageStore类型。 要配置该场景,您可以从另一个消息存储 Bean 扩展一个消息存储 Bean,如以下示例所示:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

对邮件存储进行分区

通常使用JdbcMessageStore作为同一应用程序中的一组应用程序或节点的全局存储。 为了提供一些针对名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。 一种方法是通过更改前缀(如前所述)来使用单独的表名。 另一种方法是指定一个region用于在单个表中对数据进行分区的名称。 第二种方法的一个重要用例是当MessageStore正在管理支持 Spring Integration Message Channel 的持久队列。 持久性通道的消息数据在存储中的通道名称上键入。 因此,如果通道名称不是全局唯一的,则通道可以拾取不适合它们的数据。 为避免这种危险,您可以使用消息存储region将具有相同逻辑名称的不同物理通道的数据分开。spring-doc.cadn.net.cn

PostgreSQL:接收推送通知

PostgreSQL 提供了一个监听和通知框架,用于在数据库表作时接收推送通知。 Spring Integration 利用此机制(从版本 6.0 开始)允许在将新消息添加到JdbcChannelMessageStore. 使用此功能时,必须定义数据库触发器,该触发器可以作为schema-postgresql.sql包含在 Spring Integration 的 JDBC 模块中的文件。spring-doc.cadn.net.cn

推送通知通过PostgresChannelMessageTableSubscriber类,允许其订阅者在到达任何给定消息的新消息时接收回调regiongroupId. 即使消息被附加到不同的 JVM 上,也会收到这些通知,但附加到同一个数据库。 这PostgresSubscribableChannel实现使用PostgresChannelMessageTableSubscriber.Subscription从商店中提取消息的合约,作为对上述通知的反应PostgresChannelMessageTableSubscriber通知。spring-doc.cadn.net.cn

例如,推送通知some group可按如下方式接收:spring-doc.cadn.net.cn

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

从版本 6.0.5 开始,指定PlatformTransactionManagerPostgresSubscribableChannel将在交易中通知订阅者。 订阅者中的异常将导致事务回滚,并将消息放回消息存储中。 默认情况下,事务性支持不会激活。spring-doc.cadn.net.cn

从版本 6.0.5 开始,可以通过提供RetryTemplatePostgresSubscribableChannel. 默认情况下,不执行重试。spring-doc.cadn.net.cn

任何活动PostgresChannelMessageTableSubscriber占用一个独占的JDBCConnection在其有效生命周期的持续时间内。 因此,此连接不源自池,这一点很重要DataSource. 此类连接池通常期望在预定义的超时窗口内关闭已发出的连接。spring-doc.cadn.net.cn

对于这种独占连接的需求,还建议 JVM 只运行一个PostgresChannelMessageTableSubscriber可用于注册任意数量的订阅。spring-doc.cadn.net.cn

存储过程

在某些情况下,普通的 JDBC 支持是不够的。 也许您处理遗留关系数据库架构,或者您有复杂的数据处理需求,但最终,您必须使用存储过程或存储函数。 从 Spring Integration 2.1 开始,我们提供了三个组件来执行存储过程或存储函数:spring-doc.cadn.net.cn

支持的数据库

为了启用对存储过程和存储函数的调用,存储过程组件使用org.springframework.jdbc.core.simple.SimpleJdbcCall类。 因此,完全支持以下数据库执行存储过程:spring-doc.cadn.net.cn

如果要执行存储函数,则完全支持以下数据库:spring-doc.cadn.net.cn

即使您的特定数据库可能没有得到完全支持,但只要您的 RDBMS 支持存储过程或存储函数,您也有可能非常成功地使用存储过程 Spring Integration 组件。spring-doc.cadn.net.cn

事实上,一些提供的集成测试使用 H2 数据库。 尽管如此,彻底测试这些使用场景非常重要。spring-doc.cadn.net.cn

配置

存储过程组件提供完整的 XML 命名空间支持,并且配置组件与前面讨论的通用 JDBC 组件类似。spring-doc.cadn.net.cn

通用配置属性

所有存储过程组件共享某些配置参数:spring-doc.cadn.net.cn

  • auto-startup:生命周期属性,指示是否应在应用程序上下文启动期间启动此组件。 它默认为true. 自选。spring-doc.cadn.net.cn

  • data-source:对javax.sql.DataSource,用于访问数据库。 必填。spring-doc.cadn.net.cn

  • id:标识底层 Spring bean 定义,它是EventDrivenConsumerPollingConsumer,具体取决于出站通道适配器的channel属性引用SubscribableChannelPollableChannel. 自选。spring-doc.cadn.net.cn

  • ignore-column-meta-data:对于完全受支持的数据库,基础SimpleJdbcCallclass 可以自动从 JDBC 元数据中检索存储过程或存储函数的参数信息。spring-doc.cadn.net.cn

    但是,如果数据库不支持元数据查找,或者需要提供自定义参数定义,则可以将此标志设置为true. 它默认为false. 自选。spring-doc.cadn.net.cn

  • is-function:如果true,则调用 SQL 函数。 在这种情况下,stored-procedure-namestored-procedure-name-expression属性定义被调用函数的名称。 它默认为false. 自选。spring-doc.cadn.net.cn

  • stored-procedure-name:此属性指定存储过程的名称。如果is-function属性设置为true,则此属性会改为指定函数名称。此属性或stored-procedure-name-expression必须指定。spring-doc.cadn.net.cn

  • stored-procedure-name-expression:此属性使用 SpEL 表达式指定存储过程的名称。通过使用 SpEL,您可以访问完整消息(如果可用),包括其标头和有效负载。您可以使用此属性在运行时调用不同的存储过程。例如,您可以提供要作为消息标头执行的存储过程名称。表达式必须解析为String.spring-doc.cadn.net.cn

    如果is-function属性设置为true,则此属性指定存储函数。此属性或stored-procedure-name必须指定。spring-doc.cadn.net.cn

  • jdbc-call-operations-cache-size:定义缓存的最大数量SimpleJdbcCallOperations实例。 基本上,对于每个存储过程名称,一个新的SimpleJdbcCallOperations创建实例,作为回报,该实例被缓存。spring-doc.cadn.net.cn

    Spring Integration 2.2 添加了stored-procedure-name-expression属性和jdbc-call-operations-cache-size属性。

    默认缓存大小为10. 值0禁用缓存。不允许使用负值。spring-doc.cadn.net.cn

    如果启用 JMX,则有关jdbc-call-operations-cache作为 MBean 公开。有关更多信息,请参阅 MBean 导出器spring-doc.cadn.net.cn

  • sql-parameter-source-factory:(不适用于存储过程入站通道适配器。对SqlParameterSourceFactory. 默认情况下,传入的 bean 属性Message有效负载用作存储过程输入参数的源,方法是使用BeanPropertySqlParameterSourceFactory.spring-doc.cadn.net.cn

    对于基本用例,这可能已经足够了。对于更复杂的选项,请考虑传入一个或多个选项ProcedureParameter值。 请参阅定义参数源。 自选。spring-doc.cadn.net.cn

  • use-payload-as-parameter-source:(不适用于存储过程入站通道适配器。如果设置为true,的有效负载Message用作提供参数的源。如果设置为false然而,整个Message可用作参数的源。spring-doc.cadn.net.cn

    如果未传入任何过程参数,则此属性默认为true. 这意味着,通过使用默认的BeanPropertySqlParameterSourceFactory,有效负载的 Bean 属性用作存储过程或存储函数的参数值的源。spring-doc.cadn.net.cn

    但是,如果传入过程参数,则此属性(默认情况下)的计算结果为false.ProcedureParameter提供 SpEL 表达式。 因此,能够访问整个是非常有益的Message. 该属性设置在基础StoredProcExecutor. 自选。spring-doc.cadn.net.cn

常见配置子元素

存储过程组件共享一组通用子元素,可用于定义参数并将其传递给存储过程或存储函数。 以下元素可用:spring-doc.cadn.net.cn

  • parameterspring-doc.cadn.net.cn

  • returning-resultsetspring-doc.cadn.net.cn

  • sql-parameter-definitionspring-doc.cadn.net.cn

  • pollerspring-doc.cadn.net.cn

  • parameter:提供提供存储过程参数的机制。 参数可以是静态的,也可以是使用 SpEL 表达式提供的。spring-doc.cadn.net.cn

    <int-jdbc:parameter name=""         (1)
                        type=""         (2)
                        value=""/>      (3)
    
    <int-jdbc:parameter name=""
                        expression=""/> (4)

    + <1> 要传递到存储过程或存储函数的参数的名称。 必填。 <2> 此属性指定值的类型。 如果未提供任何内容,则此属性默认为java.lang.String. 仅当value属性。 自选。 <3> 参数的值。 您必须提供此属性或expression属性。 自选。 <4>而不是value属性,您可以指定一个 SpEL 表达式来传递参数的值。 如果指定expressionvalue属性不允许。 自选。spring-doc.cadn.net.cn

  • returning-resultset:存储过程可能会返回多个结果集。 通过设置一个或多个returning-resultset元素,您可以指定RowMappers转换每个返回的ResultSet到有意义的对象。 自选。spring-doc.cadn.net.cn

    <int-jdbc:returning-resultset name="" row-mapper="" />
  • sql-parameter-definition:如果使用完全受支持的数据库,则通常不必指定存储过程参数定义。 相反,这些参数可以从 JDBC 元数据自动派生。 但是,如果您使用不完全支持的数据库,则必须使用sql-parameter-definition元素。spring-doc.cadn.net.cn

    您还可以选择使用ignore-column-meta-data属性。spring-doc.cadn.net.cn

    <int-jdbc:sql-parameter-definition
                                       name=""                           (1)
                                       direction="IN"                    (2)
                                       type="STRING"                     (3)
                                       scale="5"                         (4)
                                       type-name="FOO_STRUCT"            (5)
                                       return-type="fooSqlReturnType"/>  (6)
    1 指定 SQL 参数的名称。 必填。
    2 指定 SQL 参数定义的方向。 默认为IN. 有效值为:IN,OUTINOUT. 如果您的过程返回结果集,请使用returning-resultset元素。 自选。
    3 用于此 SQL 参数定义的 SQL 类型。 转换为整数值,定义如下java.sql.Types. 或者,您也可以提供整数值。 如果未显式设置此属性,则默认为“VARCHAR”。 自选。
    4 SQL 参数的规模。 仅用于数字和十进制参数。 自选。
    5 typeName对于用户命名的类型,例如:STRUCT,DISTINCT,JAVA_OBJECT和命名数组类型。 此属性与scale属性。 自选。
    6 对复杂类型的自定义值处理程序的引用。 实现SqlReturnType. 此属性与scale属性,仅适用于 OUT 和 INOUT 参数。 自选。
  • poller:如果此端点是PollingConsumer. 自选。spring-doc.cadn.net.cn

定义参数源

参数源控制检索 Spring Integration 消息属性并将其映射到相关存储过程输入参数的技术。spring-doc.cadn.net.cn

存储过程组件遵循某些规则。 默认情况下,的 bean 属性Message有效负载用作存储过程输入参数的源。 在这种情况下,一个BeanPropertySqlParameterSourceFactory被使用。 对于基本用例来说,这可能就足够了。 下一个示例演示了该默认行为。spring-doc.cadn.net.cn

对于使用BeanPropertySqlParameterSourceFactory要工作,您的 Bean 属性必须以小写定义。 这是因为在org.springframework.jdbc.core.metadata.CallMetaDataContext(Java 方法是matchInParameterValuesWithCallParameters()),检索到的存储过程参数声明将转换为小写。 因此,如果您具有驼峰命名法 bean 属性(例如lastName),查找失败。 在这种情况下,请提供显式ProcedureParameter.

假设我们有一个有效负载,它由一个具有以下三个属性的简单 bean 组成:id,namedescription. 此外,我们还有一个简单的存储过程,称为INSERT_COFFEE接受三个输入参数:id,namedescription. 我们还使用完全支持的数据库。 在这种情况下,存储过程出站适配器的以下配置就足够了:spring-doc.cadn.net.cn

<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
    channel="insertCoffeeProcedureRequestChannel"
    stored-procedure-name="INSERT_COFFEE"/>

对于更复杂的选项,请考虑传入一个或多个ProcedureParameter值。spring-doc.cadn.net.cn

如果您确实提供了ProcedureParameter默认情况下,显式的值ExpressionEvaluatingSqlParameterSourceFactory用于参数处理,以启用 SpEL 表达式的全部功能。spring-doc.cadn.net.cn

如果您需要对参数的检索方式进行更多控制,请考虑传入SqlParameterSourceFactory通过使用sql-parameter-source-factory属性。spring-doc.cadn.net.cn

存储过程入站通道适配器

以下列表调用了对存储过程入站通道适配器很重要的属性:spring-doc.cadn.net.cn

<int-jdbc:stored-proc-inbound-channel-adapter
                                   channel=""                                    (1)
                                   stored-procedure-name=""
                                   data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   skip-undeclared-results=""                    (2)
                                   return-value-required="false"                 (3)
    <int:poller/>
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                               type="STRING"
                                               scale=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 将轮询消息发送到的通道。 如果存储过程或函数未返回任何数据,则Message为 null。 必填。
2 如果此属性设置为true,则存储过程调用的所有结果都没有相应的SqlOutParameter声明被绕过。 例如,存储过程可以返回更新计数值,即使存储过程仅声明了单个结果参数也是如此。 确切的行为取决于数据库实现。 该值是在基础JdbcTemplate. 该值默认为true. 自选。
3 指示是否应包含此过程的返回值。 从 Spring Integration 3.0 开始。 自选。

存储过程出站通道适配器

以下列表调用了对存储过程出站通道适配器很重要的属性:spring-doc.cadn.net.cn

<int-jdbc:stored-proc-outbound-channel-adapter channel=""                        (1)
                                               stored-procedure-name=""
                                               data-source=""
                                               auto-startup="true"
                                               id=""
                                               ignore-column-meta-data="false"
                                               order=""                          (2)
                                               sql-parameter-source-factory=""
                                               use-payload-as-parameter-source="">
    <int:poller fixed-rate=""/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name=""/>

</int-jdbc:stored-proc-outbound-channel-adapter>
1 此端点的接收消息通道。 必填。
2 指定此端点作为通道的订阅者连接到时的调用顺序。当该通道使用failoverdispatching 策略。当此端点本身是具有队列的通道的轮询使用者时,它不起作用。 自选。

存储过程出站网关

以下列表调用了对存储过程出站通道适配器很重要的属性:spring-doc.cadn.net.cn

<int-jdbc:stored-proc-outbound-gateway request-channel=""                        (1)
                                       stored-procedure-name=""
                                       data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   order=""
                                   reply-channel=""                              (2)
                                   reply-timeout=""                              (3)
                                   return-value-required="false"                 (4)
                                   skip-undeclared-results=""                    (5)
                                   sql-parameter-source-factory=""
                                   use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
                                   type=""
                                   scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
1 此端点的接收消息通道。 必填。
2 收到数据库响应后应向其发送回复的消息通道。 自选。
3 用于指定此网关在引发异常之前等待回复消息成功发送的时间。 请记住,当发送到DirectChannel,则调用发生在发送方的线程中。 因此,发送作的失败可能是由下游的其他组件引起的。 该值以毫秒为单位指定。 自选。
4 指示是否应包含此过程的返回值。 自选。
5 如果skip-undeclared-results属性设置为true,则存储过程调用的所有结果都没有相应的SqlOutParameter声明被绕过。 例如,存储过程可能会返回更新计数值,即使存储过程仅声明了单个结果参数也是如此。 确切的行为取决于数据库。 该值是在基础JdbcTemplate. 该值默认为true. 自选。

例子

本节包含两个调用 Apache Derby 存储过程的示例。 第一个过程调用一个存储过程,该存储过程返回ResultSet. 通过使用RowMapper,数据被转换为域对象,然后成为 Spring Integration 消息有效负载。spring-doc.cadn.net.cn

在第二个示例中,我们调用一个存储过程,该存储过程使用输出参数来返回数据。spring-doc.cadn.net.cn

该项目包含此处引用的 Apache Derby 示例,以及如何运行它的说明。 Spring Integration Samples 项目还提供了使用 Oracle 存储过程的示例spring-doc.cadn.net.cn

在第一个示例中,我们调用一个名为FIND_ALL_COFFEE_BEVERAGES不定义任何输入参数,但返回ResultSet.spring-doc.cadn.net.cn

在 Apache Derby 中,存储过程是用 Java 实现的。 以下列表显示了方法签名:spring-doc.cadn.net.cn

public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
            throws SQLException {
    ...
}

以下列表显示了相应的 SQL:spring-doc.cadn.net.cn

CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';

在 Spring Integration 中,您现在可以通过使用例如,stored-proc-outbound-gateway,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
                                       data-source="dataSource"
                                       request-channel="findAllProcedureRequestChannel"
                                       expect-single-result="true"
                                       stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
    row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>

在第二个示例中,我们调用一个名为FIND_COFFEE有一个输入参数。 而不是返回ResultSet,它使用输出参数。 以下示例显示了方法签名:spring-doc.cadn.net.cn

public static void findCoffee(int coffeeId, String[] coffeeDescription)
            throws SQLException {
    ...
}

以下列表显示了相应的 SQL:spring-doc.cadn.net.cn

CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';

在 Spring Integration 中,您现在可以通过使用例如,stored-proc-outbound-gateway,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
                                       data-source="dataSource"
                                       request-channel="findCoffeeProcedureRequestChannel"
                                       skip-undeclared-results="true"
                                       stored-procedure-name="FIND_COFFEE"
                                       expect-single-result="true">
    <int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>

JDBC 锁注册表

4.3 版引入了JdbcLockRegistry. 某些组件(例如,聚合器和重排序器)使用从LockRegistry实例,以确保一次只有一个线程作一个组。 这DefaultLockRegistry在单个组件中执行此功能。 现在可以在这些组件上配置外部锁注册表。 与共享的MessageGroupStore,您可以使用JdbcLockRegistry跨多个应用程序实例提供此功能,以便一次只有一个实例可以作该组。spring-doc.cadn.net.cn

当本地线程释放锁时,另一个本地线程通常可以立即获取锁。 如果使用其他注册表实例的线程释放锁,则获取锁最多可能需要 100 毫秒。spring-doc.cadn.net.cn

JdbcLockRegistry基于LockRepository抽象,它有一个DefaultLockRepository实现。 数据库架构脚本位于org.springframework.integration.jdbc包,它为特定的 RDBMS 提供商划分。 例如,以下列表显示了锁表的 H2 DDL:spring-doc.cadn.net.cn

CREATE TABLE INT_LOCK  (
    LOCK_KEY CHAR(36),
    REGION VARCHAR(100),
    CLIENT_ID CHAR(36),
    CREATED_DATE TIMESTAMP NOT NULL,
    constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);

INT_可以根据目标数据库的设计要求进行更改。因此,您必须使用prefix属性DefaultLockRepositorybean 定义。spring-doc.cadn.net.cn

有时,一个应用程序已进入无法释放分布式锁并删除数据库中的特定记录的状态。为此,此类死锁可能会在下一次锁定调用时由另一个应用程序过期。 这timeToLive(TTL) 选项DefaultLockRepository为此目的而提供。您可能还想指定CLIENT_ID对于为给定存储的锁DefaultLockRepository实例。 如果是这样,您可以指定idDefaultLockRepository作为构造函数参数。spring-doc.cadn.net.cn

从 5.1.8 版开始,JdbcLockRegistry可以使用idleBetweenTries-一个Duration在锁定记录插入/更新执行之间休眠。默认情况下,它是100毫秒级,并且在某些环境中,非领导者过于频繁地污染与数据源的连接。spring-doc.cadn.net.cn

从 5.4 版本开始,RenewableLockRegistry接口已被引入并添加到JdbcLockRegistry. 这renewLock()方法必须在锁定进程期间调用,否则锁定进程将比锁的生存时间长。因此,生存时间可以大大缩短,并且部署可以快速重新获取丢失的锁。spring-doc.cadn.net.cn

仅当锁由当前线程持有时,才能完成锁更新。

String 版本为 5.5.6,则JdbcLockRegistry支持自动清理缓存 JdbcLock inJdbcLockRegistry.locks通过JdbcLockRegistry.setCacheCapacity(). 有关更多信息,请参阅其 JavaDocs。spring-doc.cadn.net.cn

String 与 6.0 版本相比,则DefaultLockRepository可以提供PlatformTransactionManager而不是依赖应用程序上下文中的主 Bean。spring-doc.cadn.net.cn

String 版本为 6.1,则DefaultLockRepository可配置为自定义insert,updaterenew查询。 为此,公开了各自的 setter 和 getter。例如,PostgreSQL 提示的插入查询可以这样配置:spring-doc.cadn.net.cn

lockRepository.setInsertQuery(lockRepository.getInsertQuery() + " ON CONFLICT DO NOTHING");

JDBC 元数据存储

5.0 版引入了 JDBCMetadataStore(请参阅元数据存储)实现。 您可以使用JdbcMetadataStore以在应用程序重启时维护元数据状态。 这MetadataStore实现可以与以下适配器一起使用:spring-doc.cadn.net.cn

要将这些适配器配置为使用JdbcMetadataStore,使用 bean 名称metadataStore. Feed 入站通道适配器和 Feed 入站通道适配器都会自动选取并使用声明的JdbcMetadataStore,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public MetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}

org.springframework.integration.jdbc包包含多个 RDMBS 提供商的数据库模式脚本。 例如,以下列表显示了元数据表的 H2 DDL:spring-doc.cadn.net.cn

CREATE TABLE INT_METADATA_STORE  (
	METADATA_KEY VARCHAR(255) NOT NULL,
	METADATA_VALUE VARCHAR(4000),
	REGION VARCHAR(100) NOT NULL,
	constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);

您可以更改INT_前缀以匹配目标数据库设计要求。 您还可以配置JdbcMetadataStore以使用自定义前缀。spring-doc.cadn.net.cn

JdbcMetadataStore实现ConcurrentMetadataStore,使其在多个应用程序实例之间可靠共享,其中只有一个实例可以存储或修改键的值。 所有这些作都是原子的,这要归功于事务保证。spring-doc.cadn.net.cn

事务管理必须使用JdbcMetadataStore. 入站通道适配器可以提供对TransactionManager在轮询器配置中。与非事务性不同MetadataStore实现,使用JdbcMetadataStore,则该条目仅在事务提交后才出现在目标表中。发生回滚时,不会将任何条目添加到INT_METADATA_STORE桌子。spring-doc.cadn.net.cn

从 5.0.7 版本开始,您可以配置JdbcMetadataStore使用 RDBMS 提供商特定的lockHint选项,用于对元数据存储条目进行基于锁的查询。默认情况下,它是FOR UPDATE如果目标数据库不支持行锁定功能,则可以使用空字符串进行配置。请咨询您的提供商,了解SELECT用于在更新前锁定行的表达式。spring-doc.cadn.net.cn