此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

消息存储

企业集成模式 (EIP) 一书确定了几种能够缓冲消息的模式。 例如,聚合器会缓冲消息,直到可以释放它们,并且QueueChannel缓冲消息,直到使用者从该通道显式接收这些消息。 由于在消息流中的任何时间点都可能发生故障,因此缓冲消息的 EIP 组件还会引入消息可能丢失的点。spring-doc.cadn.net.cn

为了降低丢失消息的风险,EIP 定义了消息存储模式,它允许 EIP 组件存储消息,通常存储在某种类型的持久存储(例如 RDBMS)中。spring-doc.cadn.net.cn

Spring Integration 通过以下方式为消息存储模式提供支持:spring-doc.cadn.net.cn

有关如何配置特定消息存储实现以及如何注入MessageStore整个手册都描述了特定缓冲组件的实现(请参阅特定组件,例如 QueueChannelAggregatorDelayer 等)。 以下一对示例演示如何添加对消息存储的引用QueueChannel对于聚合器:spring-doc.cadn.net.cn

队列通道
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
聚合
<int:aggregator message-store="refToMessageStore"/>

默认情况下,消息通过使用o.s.i.store.SimpleMessageStore,实现MessageStore. 这对于开发或简单的低容量环境来说可能没问题,因为这些环境不会担心非持久性消息的潜在丢失。 但是,典型的生产应用程序需要一个更可靠的选项,不仅可以降低消息丢失的风险,还可以避免潜在的内存不足错误。 因此,我们还提供MessageStore各种数据存储的实现。 以下是受支持实现的完整列表:spring-doc.cadn.net.cn

但是,在使用MessageStore.spring-doc.cadn.net.cn

消息数据(有效负载和标头)通过使用不同的序列化策略进行序列化和反序列化,具体取决于MessageStore. 例如,当使用JdbcMessageStoreSerializable默认情况下,数据是持久化的。 在这种情况下,在序列化发生之前,将删除不可序列化标头。 此外,请注意传输适配器(例如 FTP、HTTP、JMS 等)注入的特定于协议的标头。 例如<http:inbound-channel-adapter/>将 HTTP 标头映射到消息标头中,其中之一是ArrayList的不可序列化org.springframework.http.MediaType实例。 但是,您可以注入自己的SerializerDeserializer策略接口到一些MessageStore实现(例如JdbcMessageStore) 来更改序列化和反序列化的行为。spring-doc.cadn.net.cn

特别注意表示某些类型数据的标头。 例如,如果其中一个标头包含某个 Spring bean 的实例,则在反序列化时,您最终可能会得到该 bean 的不同实例,这直接影响框架创建的一些隐式标头(例如REPLY_CHANNELERROR_CHANNEL). 目前,它们不可序列化,但即使可序列化,反序列化通道也不会表示预期的实例。spring-doc.cadn.net.cn

从 Spring Integration 版本 3.0 开始,您可以使用配置为在向HeaderChannelRegistry.spring-doc.cadn.net.cn

此外,请考虑按如下方式配置消息流时会发生什么:网关→队列通道(由持久性消息存储支持)→服务激活器。 该网关创建一个临时应答通道,当服务激活器的轮询器从队列中读取时,该通道将丢失。 同样,您可以使用标头扩充器将标头替换为String表示法。spring-doc.cadn.net.cn

有关详细信息,请参阅标头扩充器spring-doc.cadn.net.cn

Spring Integration 4.0 引入了两个新接口:spring-doc.cadn.net.cn

  • ChannelMessageStore:实现特定于以下内容的作QueueChannel实例spring-doc.cadn.net.cn

  • PriorityCapableChannelMessageStore:标记MessageStore用于的实现PriorityChannel实例,并为持久化消息提供优先级顺序。spring-doc.cadn.net.cn

实际行为取决于实现。 该框架提供了以下实现,可以作为持久化MessageStoreQueueChannelPriorityChannel:spring-doc.cadn.net.cn

注意事项SimpleMessageStore

从 4.1 版本开始,SimpleMessageStore调用时不再复制消息组getMessageGroup(). 对于大型消息组,这是一个重大的性能问题。 4.0.1 引入了一个布尔值copyOnGet属性,用于控制此行为。 当聚合器在内部使用时,此属性设置为false以提高性能。 现在false默认情况下。spring-doc.cadn.net.cn

在组件(如聚合器)之外访问组存储的用户现在可以直接引用聚合器正在使用的组,而不是副本。 在聚合器外部作组可能会导致不可预知的结果。spring-doc.cadn.net.cn

因此,您不应该执行此类作或将copyOnGet属性设置为true.spring-doc.cadn.net.cn

MessageGroupFactory

从 4.3 版本开始,一些MessageGroupStore实现可以注入自定义MessageGroupFactory策略来创建和自定义MessageGroup实例MessageGroupStore. 这默认为SimpleMessageGroupFactory,产生SimpleMessageGroup实例基于GroupType.HASH_SET (LinkedHashSet) 内部集合。 其他可能的选项是SYNCHRONISED_SETBLOCKING_QUEUE,其中最后一个可用于恢复前一个SimpleMessageGroup行为。 此外,PERSISTENT选项可用。 有关详细信息,请参阅下一节。 从版本 5.0.1 开始,LIST当组中消息的顺序和唯一性无关紧要时,选项也可用。spring-doc.cadn.net.cn

持续MessageGroupStore和延迟加载

从 4.3 版本开始,所有持久性MessageGroupStore实例检索MessageGroup实例及其messages以延迟加载的方式从商店。 在大多数情况下,它对相关性很有用MessageHandler实例(参见 AggregatorResequencer),这会增加加载整个MessageGroup从存储中进行每个关联作。spring-doc.cadn.net.cn

您可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)选项以从配置中关闭延迟加载行为。spring-doc.cadn.net.cn

我们对 MongoDB 上的延迟加载进行性能测试MessageStore (MongoDB 消息存储)和<aggregator> (聚合)使用自定义release-strategy类似于以下内容:spring-doc.cadn.net.cn

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

对于 1000 条简单消息,它会产生类似于以下结果的结果:spring-doc.cadn.net.cn

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

但是,从 5.5 版本开始,所有持久的MessageGroupStore实现提供了一个streamMessagesForGroup(Object groupId)基于目标数据库流 API 的合约。 当存储中的组非常大时,这提高了资源利用率。 在框架内部,当 Delayer 在启动时重新安排持久化消息时,将使用此新 API(例如)。 返回Stream<Message<?>>必须在处理结束时关闭,例如通过try-with-resources. 每当PersistentMessageGroup,则其streamMessages()委托到MessageGroupStore.streamMessagesForGroup().spring-doc.cadn.net.cn

消息组条件

从 5.5 版本开始,MessageGroup抽象提供了一个conditionstring 选项。 此选项的值可以是稍后出于任何原因可以解析以为组做出决策的任何值。 例如,一个ReleaseStrategy相关消息处理程序可以从组中查询此属性,而不是循环访问组中的所有消息。 这MessageGroupStore公开一个setGroupCondition(Object groupId, String condition)应用程序接口。 为此,一个setGroupConditionSupplier(BiFunction<Message<?>, String, String>)选项已添加到AbstractCorrelatingMessageHandler. 在将每条消息添加到组后,将根据每条消息以及组的现有条件评估此函数。 实现可以决定返回一个新值、现有值,或将目标条件重置为null. 一个condition可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。 例如,FileMarkerReleaseStrategy文件聚合器组件中,将条件填充到组中FileHeaders.LINE_COUNTFileSplitter.FileMarker.Mark.END消息并从其canRelease()将组大小与此条件下的值进行比较。 这样它就不会迭代组中的所有消息来查找FileSplitter.FileMarker.Mark.END消息与FileHeaders.LINE_COUNT页眉。 它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。spring-doc.cadn.net.cn

此外,为了配置方便,一个GroupConditionProvider合同已经引入。 这AbstractCorrelatingMessageHandler检查是否提供了ReleaseStrategy实现此接口并提取conditionSupplier用于组条件评估逻辑。spring-doc.cadn.net.cn