对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

JDBC 消息存储

Spring Integration 提供了两个特定于 JDBC 的消息存储实现。 这JdbcMessageStore适用于聚合器和声明检查模式。 这JdbcChannelMessageStore实现专门为消息通道提供了更有针对性和可扩展的实现。spring-doc.cadn.net.cn

请注意,您可以使用JdbcMessageStore支持消息通道,JdbcChannelMessageStore为此目的进行了优化。spring-doc.cadn.net.cn

从版本 5.0.11、5.1.2 开始,索引的JdbcChannelMessageStore已优化。 如果此类存储中有大型消息组,则可能希望更改索引。 此外,索引PriorityChannel被注释掉,因为除非您使用 JDBC 支持的此类通道,否则不需要它。
使用OracleChannelMessageStoreQueryProvider则必须添加优先级通道索引,因为它包含在查询的提示中。

初始化数据库

在开始使用 JDBC 消息存储组件之前,您应该使用适当的对象来配置目标数据库。spring-doc.cadn.net.cn

Spring Integration附带了一些可用于初始化数据库的示例脚本。 在spring-integration-jdbcJAR 文件中,您可以在org.springframework.integration.jdbc包。 它为一系列常见数据库平台提供了示例创建和示例删除脚本。 使用这些脚本的常见方法是在 Spring JDBC 数据源初始值设定项中引用它们。 请注意,这些脚本作为示例和所需表和列名称的规范提供。 您可能会发现需要增强它们以供生产使用(例如,通过添加索引声明)。spring-doc.cadn.net.cn

从 6.2 版开始,JdbcMessageStore,JdbcChannelMessageStore,JdbcMetadataStoreDefaultLockRepository实现SmartLifecycle并在各自的表上执行“SELECT COUNT”查询,在start()方法来确保目标数据库中存在所需的表(根据提供的前缀)。 如果所需的表不存在,则应用程序上下文无法启动。 可以通过以下方式禁用检查setCheckDatabaseOnStart(false).spring-doc.cadn.net.cn

通用 JDBC 消息存储

JDBC 模块提供了 Spring Integration 的实现MessageStore(在声明检查模式中很重要)和MessageGroupStore(在有状态模式(如聚合器)中很重要)由数据库支持。 这两个接口都是由JdbcMessageStore,并且支持在 XML 中配置存储实例,如以下示例所示:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定一个JdbcTemplate而不是DataSource.spring-doc.cadn.net.cn

以下示例显示了一些其他可选属性:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前面的示例中,我们指定了LobHandler用于将消息作为大对象处理(这对于 Oracle 来说通常是必需的)和存储生成的查询中表名的前缀。 表名前缀默认为INT_.spring-doc.cadn.net.cn

支持消息通道

如果您打算使用 JDBC 支持消息通道,我们建议使用JdbcChannelMessageStore实现。 它只能与消息通道结合使用。spring-doc.cadn.net.cn

支持的数据库

JdbcChannelMessageStore使用特定于数据库的 SQL 查询从数据库中检索消息。 因此,您必须将ChannelMessageStoreQueryProvider属性JdbcChannelMessageStore. 这channelMessageStoreQueryProvider提供您指定的特定数据库的 SQL 查询。 Spring Integration 为以下关系数据库提供支持:spring-doc.cadn.net.cn

如果您的数据库未列出,您可以实现ChannelMessageStoreQueryProvider接口并提供您自己的自定义查询。spring-doc.cadn.net.cn

4.0 版本添加了MESSAGE_SEQUENCE列,以确保先进先出 (FIFO) 队列,即使消息存储在同一毫秒内也是如此。spring-doc.cadn.net.cn

从 6.2 版本开始,ChannelMessageStoreQueryProvider公开一个isSingleStatementForPoll标志,其中PostgresChannelMessageStoreQueryProvider返回true并且它对民意调查的查询现在基于单个DELETE…​RETURNING陈述。 这JdbcChannelMessageStore咨询isSingleStatementForPoll选项并跳过单独的DELETE如果仅支持单个轮询语句,则语句。spring-doc.cadn.net.cn

自定义消息插入

从 5.0 版本开始,通过重载ChannelMessageStorePreparedStatementSetter类,您可以在JdbcChannelMessageStore. 您可以使用它来设置不同的列或更改表结构或序列化策略。 例如,将默认序列化设置为byte[],您可以将其结构存储为 JSON 字符串。spring-doc.cadn.net.cn

以下示例使用默认实现setValues以存储公共列,并覆盖将消息有效负载存储为varchar:spring-doc.cadn.net.cn

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源:spring-doc.cadn.net.cn

如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中描述。spring-doc.cadn.net.cn

并发轮询

轮询消息通道时,可以选择配置关联的Poller使用TaskExecutor参考。spring-doc.cadn.net.cn

但是请记住,如果您使用 JDBC 支持的消息通道,并且您计划轮询该通道,从而轮询具有多个线程的消息存储事务,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且使用多个线程时性能可能无法按预期实现。 例如,Apache Derby 在这方面就存在问题。spring-doc.cadn.net.cn

实现更好的 JDBC 队列吞吐量并避免不同线程可能轮询相同的问题Message从队列中,将usingIdCache属性JdbcChannelMessageStoretrue使用不支持 MVCC 的数据库时。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

优先通道

从 4.0 版本开始,JdbcChannelMessageStore实现PriorityCapableChannelMessageStore并提供priorityEnabled选项,让它用作message-store参考priority-queue实例。 为此,该INT_CHANNEL_MESSAGEtable 有一个MESSAGE_PRIORITY列来存储PRIORITY消息标头。 此外,新的MESSAGE_SEQUENCE列让我们实现了强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同的优先级存储了多条消息。 从数据库中轮询(选择)消息,并使用order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE.spring-doc.cadn.net.cn

我们不建议使用相同的JdbcChannelMessageStorebean 用于优先级和非优先级队列通道,因为priorityEnabled选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。 然而,同样INT_CHANNEL_MESSAGE表(甚至region) 可以同时用于两者JdbcChannelMessageStore类型。 要配置该场景,您可以从另一个消息存储 Bean 扩展一个消息存储 Bean,如以下示例所示:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

对邮件存储进行分区

通常使用JdbcMessageStore作为同一应用程序中的一组应用程序或节点的全局存储。 为了提供一些针对名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。 一种方法是通过更改前缀(如前所述)来使用单独的表名。 另一种方法是指定一个region用于在单个表中对数据进行分区的名称。 第二种方法的一个重要用例是当MessageStore正在管理支持 Spring Integration Message Channel 的持久队列。 持久性通道的消息数据在存储中的通道名称上键入。 因此,如果通道名称不是全局唯一的,则通道可以拾取不适合它们的数据。 为避免这种危险,您可以使用消息存储region将具有相同逻辑名称的不同物理通道的数据分开。spring-doc.cadn.net.cn

PostgreSQL:接收推送通知

PostgreSQL 提供了一个监听和通知框架,用于在数据库表作时接收推送通知。 Spring Integration 利用此机制(从版本 6.0 开始)允许在将新消息添加到JdbcChannelMessageStore. 使用此功能时,必须定义数据库触发器,该触发器可以作为schema-postgresql.sql包含在 Spring Integration 的 JDBC 模块中的文件。spring-doc.cadn.net.cn

推送通知通过PostgresChannelMessageTableSubscriber类,允许其订阅者在到达任何给定消息的新消息时接收回调regiongroupId. 即使消息被附加到不同的 JVM 上,也会收到这些通知,但附加到同一个数据库。 这PostgresSubscribableChannel实现使用PostgresChannelMessageTableSubscriber.Subscription从商店中提取消息的合约,作为对上述通知的反应PostgresChannelMessageTableSubscriber通知。spring-doc.cadn.net.cn

例如,推送通知some group可按如下方式接收:spring-doc.cadn.net.cn

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

从版本 6.0.5 开始,指定PlatformTransactionManagerPostgresSubscribableChannel将在交易中通知订阅者。 订阅者中的异常将导致事务回滚,并将消息放回消息存储中。 默认情况下,事务性支持不会激活。spring-doc.cadn.net.cn

从版本 6.0.5 开始,可以通过提供RetryTemplatePostgresSubscribableChannel. 默认情况下,不执行重试。spring-doc.cadn.net.cn

任何活动PostgresChannelMessageTableSubscriber占用一个独占的JDBCConnection在其有效生命周期的持续时间内。 因此,此连接不源自池,这一点很重要DataSource. 此类连接池通常期望在预定义的超时窗口内关闭已发出的连接。spring-doc.cadn.net.cn

对于这种独占连接的需求,还建议 JVM 只运行一个PostgresChannelMessageTableSubscriber可用于注册任意数量的订阅。spring-doc.cadn.net.cn