此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
聚合
聚合器基本上是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。
从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。
它必须保存要聚合的消息,并确定何时准备好聚合完整的消息组。
为此,它需要一个MessageStore
.
功能性
聚合器通过关联和存储一组相关消息来组合它们,直到该组被视为完整。 此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即,从多个消息中创建单个消息)。 两个相关的概念是关联和释放。
关联确定如何对消息进行分组以进行聚合。
在 Spring Integration 中,默认情况下,基于IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头。
具有相同IntegrationMessageHeaderAccessor.CORRELATION_ID
被分组在一起。
但是,您可以自定义关联策略,以允许其他方式指定如何将消息分组在一起。
为此,您可以实现CorrelationStrategy
(本章后面会介绍)。
要确定一组消息准备好处理的时间点,请ReleaseStrategy
被咨询。
聚合器的默认发布策略在序列中包含的所有消息都存在时释放组,基于IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
页眉。
您可以通过提供对自定义ReleaseStrategy
实现。
编程模型
聚合 API 由许多类组成:
-
界面
MessageGroupProcessor
及其子类:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
这
ReleaseStrategy
接口及其默认实现:SimpleSequenceSizeReleaseStrategy
-
这
CorrelationStrategy
接口及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
这AggregatingMessageHandler
(AbstractCorrelatingMessageHandler
) 是一个MessageHandler
实现,封装聚合器的通用功能(以及其他相关用例),如下所示:
-
将消息关联到要聚合的组中
-
在
MessageStore
直到该组可以发布 -
决定何时可以释放群组
-
将已发布的组聚合到单个消息中
-
识别和响应过期的组
决定如何将消息分组在一起的责任委托给CorrelationStrategy
实例。
决定是否可以释放消息组的责任委托给ReleaseStrategy
实例。
以下列表显示了基地的简要亮点AbstractAggregatingMessageGroupProcessor
(实施aggregatePayloads
方法留给开发者):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
看DefaultAggregatingMessageGroupProcessor
,ExpressionEvaluatingMessageGroupProcessor
和MethodInvokingMessageGroupProcessor
作为AbstractAggregatingMessageGroupProcessor
.
从 5.2 版开始,一个Function<MessageGroup, Map<String, Object>>
策略可用于AbstractAggregatingMessageGroupProcessor
合并和计算(聚合)输出消息的标头。
这DefaultAggregateHeadersFunction
实现可用于返回组之间没有冲突的所有标头的逻辑;组中的一封或多封邮件上缺少标头不被视为冲突。
省略冲突的标头。
与新推出的DelegatingMessageGroupProcessor
,则此函数用于任何任意(非AbstractAggregatingMessageGroupProcessor
) MessageGroupProcessor
实现。 本质上,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor
实例并将所有其他实现包装到DelegatingMessageGroupProcessor
. 逻辑上的差异AbstractAggregatingMessageGroupProcessor
和DelegatingMessageGroupProcessor
后者在调用委托策略之前不会提前计算标头,并且如果委托返回Message
或AbstractIntegrationMessageBuilder
. 在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的正确标头。 这Function<MessageGroup, Map<String, Object>>
策略可作为headers-function
reference 属性,作为AggregatorSpec.headersFunction()
Java DSL 的选项和AggregatorFactoryBean.setHeadersFunction()
用于纯 Java 配置。
这CorrelationStrategy
归AbstractCorrelatingMessageHandler
并具有基于IntegrationMessageHeaderAccessor.CORRELATION_ID
message 标头,如以下示例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor
.
它创建了一个Message
其有效负载是List
为给定组接收的有效负载。
这适用于具有拆分器、发布-订阅通道或上游收件人列表路由器的简单分散收集实现。
在此类场景中使用发布-订阅通道或收件人列表路由器时,请务必启用apply-sequence 旗。
这样做会添加必要的标头:CORRELATION_ID ,SEQUENCE_NUMBER 和SEQUENCE_SIZE .
默认情况下,该行为对 Spring Integration 中的拆分器启用,但未为发布-订阅通道或收件人列表路由器启用,因为这些组件可能用于不需要这些标头的各种上下文。 |
为应用程序实现特定聚合器策略时,可以扩展AbstractAggregatingMessageGroupProcessor
并实现aggregatePayloads
方法。
但是,有更好的解决方案,与 API 的耦合较少,用于实现聚合逻辑,可以通过 XML 或通过注释进行配置。
通常,任何 POJO 都可以实现聚合算法,前提是它提供了一个接受单个java.util.List
作为参数(也支持参数化列表)。
调用此方法用于聚合消息,如下所示:
-
如果参数是
java.util.Collection<T>
参数类型 T 可分配给Message
,则为聚合而累积的整个消息列表将发送到聚合器。 -
如果参数是非参数化的
java.util.Collection
或者参数类型不可分配给Message
,该方法接收累积消息的有效负载。 -
如果返回类型不可分配给
Message
,则将其视为Message
由框架自动创建。
为了简化代码和促进最佳实践(例如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中配置它。 |
从 5.3 版本开始,在处理消息组后,一个AbstractCorrelatingMessageHandler
执行MessageBuilder.popSequenceDetails()
具有多个嵌套级别的正确拆分器聚合器方案的消息头修改。
仅当消息组释放结果不是消息集合时,才会执行此作。
在这种情况下,目标MessageGroupProcessor
负责MessageBuilder.popSequenceDetails()
调用,同时生成这些消息。
如果MessageGroupProcessor
返回一个Message
一个MessageBuilder.popSequenceDetails()
仅当sequenceDetails
与组中的第一条消息匹配。
(以前,仅当普通有效负载或AbstractIntegrationMessageBuilder
已从MessageGroupProcessor
.)
此功能可由新的popSequence
boolean
属性,因此MessageBuilder.popSequenceDetails()
在某些情况下,当标准拆分器尚未填充关联详细信息时,可以禁用。
从本质上讲,该属性撤消了最近的上游所做的作applySequence = true
在AbstractMessageSplitter
.
有关更多信息,请参阅拆分器。
这SimpleMessageGroup.getMessages() 方法返回一个unmodifiableCollection .
因此,如果聚合 POJO 方法具有Collection<Message> 参数,传入的参数正是Collection 实例,并且当您使用SimpleMessageStore 对于聚合器,该原始Collection<Message> 释放群组后被清除。
因此,Collection<Message> 如果变量被传递出聚合器,则 POJO 中的变量也会被清除。
如果您希望简单地按原样发布该集合以进行进一步处理,则必须构建一个新的Collection (例如,new ArrayList<Message>(messages) ).
从 4.3 版开始,框架不再将消息复制到新集合,以避免创建不需要的额外对象。 |
在 4.2 版本之前,无法提供MessageGroupProcessor
通过使用 XML 配置。
只有 POJO 方法可用于聚合。
现在,如果框架检测到引用的(或内部)bean 实现MessageProcessor
,它用作聚合器的输出处理器。
如果您希望从自定义MessageGroupProcessor
作为消息的有效负载,您的类应该扩展AbstractAggregatingMessageGroupProcessor
并实施aggregatePayloads()
.
此外,从 4.2 版本开始,一个SimpleMessageGroupProcessor
被提供。
它返回来自组的消息集合,如前所述,这会导致单独发送已释放的消息。
这允许聚合器充当消息屏障,在其中,到达的消息将被保留,直到释放策略触发并且组作为单个消息序列被释放。
从 6.0 版开始,仅当组处理器是SimpleMessageGroupProcessor
.
否则,与任何其他MessageGroupProcessor
返回Collection<Message>
,则仅发出一条回复消息,并将整个消息集合作为其有效负载。
这种逻辑是由聚合器的规范目的决定的——通过某个键收集请求消息并生成单个分组消息。
在 6.5 版之前,如果MessageGroupProcessor
(通常是来自 DSL 的 lambda)返回有效负载的集合,即AbstractCorrelatingMessageHandler
失败了IllegalArgumentException
声明只能收集消息。
从现在开始,这种限制将被消除,并且返回的有效负载集合将作为来自聚合器的单个回复消息发出,仅包含来自最后一个请求消息的标头。
如果标头聚合需要与有效负载集合一起,则AbstractAggregatingMessageGroupProcessor
建议使用实现而不是普通MessageGroupProcessor
功能接口。
ReleaseStrategy
这ReleaseStrategy
接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何 POJO 都可以实现完成决策逻辑,如果它提供了一个接受单个java.util.List
作为参数(也支持参数化列表)并返回布尔值。
在每条新消息到达后调用此方法,以确定组是否完整,如下所示:
-
如果参数是
java.util.List<T>
和参数类型T
可分配给Message
,则将组中累积的整个消息列表发送到该方法。 -
如果参数是非参数化的
java.util.List
或者参数类型不可分配给Message
,该方法接收累积消息的有效负载。 -
该方法必须返回
true
如果消息组已准备好进行聚合,否则为 false。
以下示例演示如何使用@ReleaseStrategy
注释List
类型Message
:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例演示如何使用@ReleaseStrategy
注释List
类型String
:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于 POJO 的发布策略会传递一个Collection
尚未发布的消息(如果您需要访问整个Message
) 或Collection
有效负载对象(如果 type 参数是Message
).
这满足了大多数用例。
但是,如果由于某种原因,您需要访问完整的MessageGroup
,您应该提供ReleaseStrategy
接口。
在处理潜在的大型组时,应了解如何调用这些方法,因为在释放组之前可能会多次调用发布策略。
最有效的是实现 出于这些原因,对于大型团体,我们建议您实施 |
当组被释放以进行聚合时,将处理其所有尚未释放的消息,并将其从组中删除。
如果该组也已完成(即,如果序列中的所有消息都已到达,或者没有定义序列),则该组将标记为完成。
此组的任何新消息都将发送到丢弃通道(如果已定义)。
设置expire-groups-upon-completion
自true
(默认值为false
) 将删除整个组,并且任何新消息(与删除的组具有相同的相关 ID)将形成一个新组。
您可以使用MessageGroupStoreReaper
䋰send-partial-result-on-expiry
设置为true
.
从 6.5 版开始,还可以将关联处理程序配置为discardIndividuallyOnExpiry
将整个组作为单个消息丢弃的选项。
本质上,此消息的有效负载是来自过期组的消息列表。
仅在以下情况下有效sendPartialResultOnExpiry
设置为false
(默认)和dicardChannel
被提供。
为了便于丢弃延迟到达的消息,聚合器必须在组释放后维护有关该组的状态。
这最终会导致内存不足的情况。
为避免此类情况,您应该考虑配置MessageGroupStoreReaper 以删除组元数据。
到期参数应设置为在到达某个点后使组过期,在此之后预计不会到达延迟消息。
有关配置收割器的信息,请参阅在聚合器中管理状态:MessageGroupStore . |
Spring Integration 提供了一个实现ReleaseStrategy
:SimpleSequenceSizeReleaseStrategy
.
此实现会咨询SEQUENCE_NUMBER
和SEQUENCE_SIZE
每个到达消息的标头,以确定消息组何时完成并准备好聚合。
如前所述,它也是默认策略。
在 5.0 版本之前,默认发布策略为SequenceSizeReleaseStrategy ,这在大型团体中表现不佳。
使用该策略,可以检测并拒绝重复的序列号。
此作可能很昂贵。 |
如果要聚合大型组,则不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用SimpleSequenceSizeReleaseStrategy
相反 - 对于这些用例,它效率要高得多,并且是自 5.0 版以来未指定部分组发布的默认值。
聚合大型组
4.3 版本更改了默认值Collection
对于SimpleMessageGroup
自HashSet
(它以前是一个BlockingQueue
).
当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。
尽管哈希集的删除速度通常要快得多,但对于大型消息来说,它的成本可能很高,因为必须在插入和删除时计算哈希。
如果消息的哈希成本很高,请考虑使用其他集合类型。
如用MessageGroupFactory
一个SimpleMessageGroupFactory
,以便您可以选择Collection
最适合您的需求。
您还可以提供自己的工厂实现来创建其他一些Collection<Message<?>>
.
以下示例演示如何使用上一个实现配置聚合器和SimpleSequenceSizeReleaseStrategy
:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点涉及聚合器的上游流,则序列大小发布策略(固定或基于sequenceSize header)不会达到其目的,因为过滤器可能会丢弃序列中的某些消息。
在这种情况下,建议选择另一个ReleaseStrategy ,或使用从丢弃子流发送的补偿消息,该子流的内容中带有一些信息,以便在自定义完整组函数中跳过。
有关详细信息,请参阅过滤器。 |
相关策略
这CorrelationStrategy
接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个Object
表示用于将消息与消息组相关联的关联键。
密钥必须满足用于Map
关于执行equals()
和hashCode()
.
一般来说,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与ServiceActivator
(包括对@Header
注释)。
该方法必须返回一个值,并且该值不得null
.
Spring Integration 提供了一个实现CorrelationStrategy
:HeaderAttributeCorrelationStrategy
.
此实现返回其中一个消息头(其名称由构造函数参数指定)的值作为关联键。
默认情况下,关联策略是HeaderAttributeCorrelationStrategy
返回CORRELATION_ID
header 属性。
如果要用于关联的自定义标头名称,可以在HeaderAttributeCorrelationStrategy
并将其作为聚合器关联策略的参考。
锁定注册表
对组的更改是线程安全的。
因此,当您同时发送同一相关 ID 的消息时,聚合器中只会处理其中一个消息,从而有效地将其作为每个消息组的单线程。
一个LockRegistry
用于获取已解析相关 ID 的锁。
一个DefaultLockRegistry
默认使用(内存中)。
用于在共享的服务器之间同步更新MessageGroupStore
,则必须配置共享锁注册表。
避免死锁
如上所述,当消息组发生变化(添加或释放消息)时,将保持锁定。
请考虑以程:
...->aggregator1-> ... ->aggregator2-> ...
如果存在多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。
这将导致挂起线程和jstack <pid>
可能会显示以下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题:
-
确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须每个都有一个不同的注册表)
-
使用
ExecutorChannel
或QueueChannel
作为聚合器的输出通道,以便下游流在新线程上运行 -
从 5.1.1 版本开始,将
releaseLockBeforeSend
aggregator 属性设置为true
如果由于某种原因,单个聚合器的输出最终被路由回同一聚合器,也可能导致此问题。 当然,上述第一个解决方案不适用于这种情况。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器,请参阅聚合器和重排序器。
使用 XML 配置聚合器
Spring Integration 支持通过<aggregator/>
元素。
以下示例显示了聚合器的示例:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 id 是可选的。 |
2 | 生命周期属性,指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为 'true')。 |
3 | 聚合器从中接收消息的通道。 必填。 |
4 | 聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在 'replyChannel' 消息标头中指定回复通道)。 |
5 | 聚合器向其发送超时消息的通道(如果send-partial-result-on-expiry 是false ).
自选。 |
6 | 对MessageGroupStore 用于将消息组存储在其相关键下,直到它们完成。
自选。
默认情况下,它是一个易失性的内存中存储。
有关详细信息,请参阅消息存储。 |
7 | 当多个句柄订阅同一句柄时,此聚合器的顺序DirectChannel (用于负载平衡目的)。
自选。 |
8 | 指示过期消息应聚合并发送到 'output-channel' 或 'replyChannel',一旦其中包含MessageGroup 已过期(请参阅MessageGroupStore.expireMessageGroups(long) ).
使MessageGroup 是通过配置MessageGroupStoreReaper .
但是,您也可以过期MessageGroup 通过调用MessageGroupStore.expireMessageGroups(timeout) .
您可以通过控制总线作来实现此目的,或者,如果您有对MessageGroupStore 实例,通过调用expireMessageGroups(timeout) .
否则,此属性本身不执行任何作。
它仅用作指示器,指示是否丢弃或将任何仍在MessageGroup 即将到期。
可选(默认值为false ).
注意:此属性可能更合适地称为send-partial-result-on-timeout ,因为如果出现以下情况,则组实际上可能不会过期expire-groups-upon-timeout 设置为false . |
9 | 发送回复时等待的超时间隔Message 到output-channel 或discard-channel .
默认为30 秒。
仅当输出通道具有一些“发送”限制时才应用它,例如QueueChannel 具有固定的“容量”。
在这种情况下,一个MessageDeliveryException 被抛出。
为AbstractSubscribableChannel 实现,则send-timeout 被忽略。
为group-timeout(-expression) 这MessageDeliveryException 从计划过期任务中,将导致重新计划此任务。
自选。 |
10 | 对实现消息关联(分组)算法的 Bean 的引用。
bean 可以是CorrelationStrategy 接口或 POJO。
在后一种情况下,correlation-strategy-method 属性也必须定义。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID 标头)。 |
11 | 在引用的 bean 上定义的方法correlation-strategy .
它实现了相关决策算法。
可选,有限制 (correlation-strategy 必须存在)。 |
12 | 表示相关策略的 SpEL 表达式。
例:"headers['something']" .
只有其中一个correlation-strategy 或correlation-strategy-expression 是允许的。 |
13 | 对在应用程序上下文中定义的 bean 的引用。 bean 必须实现聚合逻辑,如前所述。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。 |
14 | 在 bean 上定义的方法,由ref 属性。
它实现了消息聚合算法。
可选(取决于ref 属性)。 |
15 | 对实现发布策略的 Bean 的引用。
bean 可以是ReleaseStrategy 接口或 POJO。
在后一种情况下,release-strategy-method 属性也必须定义。
可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header 属性)。 |
16 | 在 bean 上定义的方法,由release-strategy 属性。
它实现了完成决策算法。
可选,有限制 (release-strategy 必须存在)。 |
17 | 表示发布策略的 SpEL 表达式。
表达式的根对象是MessageGroup .
例:"size() == 5" .
只有其中一个release-strategy 或release-strategy-expression 是允许的。 |
18 | 当设置为true (默认值为false ),则已完成的组将从消息存储中删除,从而使具有相同关联的后续消息形成一个新组。
默认行为是将与已完成组具有相同关联的消息发送到discard-channel . |
19 | 仅当MessageGroupStoreReaper 为MessageStore 的<aggregator> .
默认情况下,当MessageGroupStoreReaper 配置为使部分组过期,空组也会被删除。
正常释放组后存在空组。
空组支持检测和丢弃迟到的邮件。
如果您希望空组过期的时间比过期的部分组更长的时间,请设置此属性。
然后,空组不会从MessageStore 直到它们至少在这个毫秒数内没有被修改。
请注意,空组的实际过期时间也受到收割者的timeout 属性,它可以与此值加上超时一样多。 |
20 | 对org.springframework.integration.util.LockRegistry 豆。
它曾经获得一个Lock 基于groupId 对于MessageGroup .
默认情况下,内部DefaultLockRegistry 被使用。
使用LockRegistry ,例如ZookeeperLockRegistry ,确保聚合器只有一个实例可以同时对组进行作。
有关更多信息,请参阅 Redis 锁注册表或 Zookeeper 锁注册表。 |
21 | 超时(以毫秒为单位)强制MessageGroup 当ReleaseStrategy 当前消息到达时不会释放组。
此属性为聚合器提供了内置的基于时间的发布策略,当需要发出部分结果(或丢弃组)时,如果新消息未到达MessageGroup 在从最后一条消息到达开始计算的超时内。
要设置一个超时,该超时从时间开始计算MessageGroup 创建了,请参阅group-timeout-expression 信息。
当新消息到达聚合器时,任何现有的ScheduledFuture<?> 对于它的MessageGroup 被取消。
如果ReleaseStrategy 返回false (意思是不要释放)和groupTimeout > 0 ,则计划新任务使组过期。
我们不建议将此属性设置为零(或负值)。
这样做会有效地禁用聚合器,因为每个消息组都会立即完成。
但是,您可以使用表达式有条件地将其设置为零(或负值)。
看group-timeout-expression 以获取信息。
在完成期间采取的作取决于ReleaseStrategy 和send-partial-group-on-expiry 属性。
有关详细信息,请参阅聚合器和组超时。
它与“group-timeout-expression”属性互斥。 |
22 | 计算结果为groupTimeout 使用MessageGroup 作为#root 评估上下文对象。
用于调度MessageGroup 强制完成。
如果表达式的计算结果为null ,则未计划完成。
如果计算结果为零,则该组将立即在当前线程上完成。
实际上,这提供了一个动态的group-timeout 财产。
例如,如果您希望强制完成MessageGroup 自创建组以来经过 10 秒后,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis() 哪里timestamp 由MessageGroup.getTimestamp() 作为MessageGroup 这是#root 评估上下文对象。
但请记住,组创建时间可能与第一个到达消息的时间不同,具体取决于其他组过期属性的配置。
看group-timeout 了解更多信息。
与“group-timeout”属性互斥。 |
23 | 当组由于超时(或MessageGroupStoreReaper ),默认情况下,该组已过期(完全删除)。
迟到的邮件会启动一个新组。
您可以将其设置为false 以完成组,但保留其元数据,以便丢弃延迟到达的邮件。
空组可以稍后使用MessageGroupStoreReaper 与empty-group-min-timeout 属性。
它默认为 'true'。 |
24 | 一个TaskScheduler bean 引用来调度MessageGroup 如果没有新消息到达,则强制完成MessageGroup 在groupTimeout .
如果未提供,则默认调度程序 (taskScheduler ) 在ApplicationContext (ThreadPoolTaskScheduler ) 被使用。
如果出现以下情况,则此属性不适用group-timeout 或group-timeout-expression 未指定。 |
25 | 从 4.1 版本开始。
它允许为forceComplete 操作。
它是从group-timeout(-expression) 或通过MessageGroupStoreReaper 并且不适用于法线add ,release 和discard 操作。
只有这个子元素或<expire-advice-chain/> 是允许的。 |
26 | 从 4.1 版本开始。
它允许配置任何Advice 对于forceComplete 操作。
它是从group-timeout(-expression) 或通过MessageGroupStoreReaper 并且不适用于法线add ,release 和discard 操作。
只有这个子元素或<expire-transactional/> 是允许的。
交易Advice 也可以在此处使用 Spring 进行配置tx Namespace。 |
即将过期的组
有两个属性与即将过期(完全删除)的组相关。
当一个组过期时,没有它的记录,如果新消息到达具有相同的关联性,那么将启动一个新组。
当组完成(没有过期)时,空组将保留,并丢弃迟到的消息。
稍后可以使用
如果组未正常完成,但由于超时而被释放或丢弃,则该组通常已过期。
从 4.1 版开始,您可以使用以下命令来控制此行为
从 5.0 版开始,空组也计划在 从版本 5.4 开始,可以将聚合器(和重排序器)配置为使孤立组过期(持久消息存储中可能不会释放的组)。
这 |
我们通常建议使用ref
属性,如果自定义聚合器处理程序实现可以在其他<aggregator>
定义。
但是,如果自定义聚合器实现仅由<aggregator>
,您可以使用内部 bean 定义(从 1.0.3 版开始)在<aggregator>
元素,如以下示例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
同时使用ref 属性和内部 Bean 定义<aggregator> 不允许配置,因为它会产生不明确的条件。
在这种情况下,将引发异常。 |
以下示例显示了聚合器 bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 bean 的实现可能如下所示:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
只要有意义,就可以将发布策略方法和聚合器方法组合成一个 bean。 |
上面示例的关联策略 bean 的实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将按某些条件(在本例中为除以 10 后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字之和超过某个值。
只要这样做有意义,就可以将发布策略方法、关联策略方法和聚合器方法组合在单个 bean 中。(实际上,它们全部或任意两个都可以组合在一起。 |
聚合器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,您可以使用 SpEL 处理各种策略(关联、发布和聚合),如果此类发布策略背后的逻辑相对简单,我们建议使用该策略。假设您有一个设计用于接收对象数组的遗留组件。我们知道默认的发布策略将所有聚合消息组装在List
. 现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每条消息的有效负载并组装对象数组。以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SpEL,实际上可以使用单行表达式相对容易地处理此类需求,从而避免您编写自定义类并将其配置为 bean。以下示例显示了如何执行此作:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新集合,然后将其转换为数组,从而获得与前面的 Java 代码相同的结果。
在处理自定义发布和关联策略时,可以应用相同的基于表达式的方法。
而不是为自定义定义 beanCorrelationStrategy
在correlation-strategy
属性,您可以将简单的关联逻辑实现为 SpEL 表达式,并在correlation-strategy-expression
属性,如以下示例所示:
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效负载具有person
属性,并带有id
,这将用于关联消息。
同样,对于ReleaseStrategy
,您可以将发布逻辑实现为 SpEL 表达式,并在release-strategy-expression
属性。
评估上下文的根对象是MessageGroup
本身。
这List
的消息可以通过使用message
表达式中组的属性。
在 5.0 版之前的版本中,根对象是Message<?> ,如前面的示例所示: |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是MessageGroup
本身,并且您表示,一旦出现有效负载为5
在这个组中,该组应该被释放。
聚合器和组超时
从 4.0 版开始,引入了两个新的互斥属性:group-timeout
和group-timeout-expression
.
请参阅使用 XML 配置聚合器。
在某些情况下,如果ReleaseStrategy
当前消息到达时不会释放。
为此,该groupTimeout
选项允许安排MessageGroup
强制完成,如以下示例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器按顺序接收最后一条消息,则正常发布是可能的,如release-strategy-expression
. 如果该特定消息未到达,则groupTimeout
强制组在十秒后完成,只要该组至少包含两个消息。
强制组完成的结果取决于ReleaseStrategy
和send-partial-result-on-expiry
. 首先,再次咨询发布策略,看看是否要进行正常发布。虽然组没有改变,但ReleaseStrategy
此时可以决定释放该组。如果发布策略仍然没有释放该组,则该组已过期。 如果send-partial-result-on-expiry
是true
,(部分) 中的现有消息MessageGroup
作为普通聚合器回复消息发布给output-channel
. 否则,它将被丢弃。
之间有区别groupTimeout
行为和MessageGroupStoreReaper
(请参阅使用 XML 配置聚合器)。收割器启动所有MessageGroup
s 在MessageGroupStore
周期性地。 这groupTimeout
为每个人做MessageGroup
如果在groupTimeout
. 此外,收割器可用于删除空组(如果expire-groups-upon-completion
是假的)。
从 5.5 版本开始,groupTimeoutExpression
可以评估为java.util.Date
实例。 这在根据组创建时间 (MessageGroup.getTimestamp()
),而不是当前消息到达,因为它是在groupTimeoutExpression
被评估为long
:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器
以下示例显示了配置了注释的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 指示此方法应用作聚合器的注释。如果将此类用作聚合器,则必须指定它。 |
2 | 指示此方法用作聚合器的发布策略的注释。如果任何方法上都不存在,则聚合器使用SimpleSequenceSizeReleaseStrategy . |
3 | 指示此方法应用作聚合器的关联策略的注释。如果未指示关联策略,则聚合器使用HeaderAttributeCorrelationStrategy 基于CORRELATION_ID . |
XML 元素提供的所有配置选项也可用于@Aggregator
注解。
聚合器可以从 XML 显式引用,或者如果@MessageEndpoint
在类上定义,通过类路径扫描自动检测。
注释配置 (@Aggregator
等)仅涵盖简单的用例,其中大多数默认选项就足够了。
如果您在使用注释配置时需要对这些选项进行更多控制,请考虑使用@Bean
定义AggregatingMessageHandler
并标记其@Bean
方法@ServiceActivator
,如以下示例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
从 4.2 版开始,AggregatorFactoryBean 可用于简化AggregatingMessageHandler . |
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有这些消息都具有相同的关联键。
有状态模式中接口的设计(例如ReleaseStrategy
)的原则是组件(无论是由框架定义还是由用户定义)都应该能够保持无状态。
所有状态都由MessageGroup
其管理权委托给MessageGroupStore
.
这MessageGroupStore
接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
这MessageGroupStore
在MessageGroups
在等待触发发布策略时,该事件可能永远不会发生。
因此,为了防止过时消息挥之不去,并让易失性存储在应用程序关闭时提供用于清理的钩子,MessageGroupStore
允许您注册回调以应用于其MessageGroups
当它们到期时。
界面非常简单,如以下列表所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。
这MessageGroupStore
维护这些回调的列表,根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(请参阅registerMessageGroupExpiryCallback(..)
和expireMessageGroups(..)
方法,如前所述)。
重要的是不要使用相同的MessageGroupStore 实例,当您打算依赖expireMessageGroups 功能性。
每AbstractCorrelatingMessageHandler 注册自己的MessageGroupCallback 基于forceComplete() 回调。
这样,每个过期的组可能会被错误的聚合器完成或丢弃。
从 5.0.10 版本开始,一个UniqueExpiryCallback 从AbstractCorrelatingMessageHandler 对于MessageGroupStore .
这MessageGroupStore 反过来,检查是否存在此类的实例,并记录错误并显示相应消息(如果回调集中已存在)。
这样,框架就不允许使用MessageGroupStore 实例,以避免上述过期的副作用,而不是由特定相关处理程序创建的组。 |
您可以调用expireMessageGroups
方法。
任何早于当前时间减去此值的消息都已过期并应用回调。
因此,是商店的用户定义了消息组“到期”的含义。
为了方便用户,Spring Integration 以MessageGroupStoreReaper
,如以下示例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个Runnable
.
在前面的示例中,消息组存储的 expire 方法每十秒调用一次。
超时本身为 30 秒。
重要的是要了解 的 'timeout' 属性MessageGroupStoreReaper 是一个近似值,受任务计划程序速率的影响,因为此属性仅在下一次计划执行MessageGroupStoreReaper 任务。
例如,如果超时设置为 10 分钟,但MessageGroupStoreReaper 任务计划每小时运行一次,并且最后一次执行MessageGroupStoreReaper 任务发生在超时前一分钟,则MessageGroup 在接下来的 59 分钟内不会过期。
因此,我们建议将速率设置为至少等于超时值或更短。 |
除了收割器之外,当应用程序关闭时,也会通过AbstractCorrelatingMessageHandler
.
这AbstractCorrelatingMessageHandler
注册自己的过期回调,这是带有布尔标志的链接send-partial-result-on-expiry
在聚合器的 XML 配置中。
如果标志设置为true
,则在调用过期回调时,组中尚未释放的任何未标记消息都可以发送到输出通道。
由于MessageGroupStoreReaper 从计划任务调用,并且可能会导致消息的生成(取决于sendPartialResultOnExpiry option) 给下游集成流,建议提供自定义TaskScheduler 使用MessagePublishingErrorHandler 通过errorChannel ,正如常规聚合器发布功能所期望的那样。
同样的逻辑也适用于组超时功能,该功能也依赖于TaskScheduler .
有关详细信息,请参阅错误处理。 |
当共享的 一些 有关 |
通量聚合器
在 5.2 版本中,FluxAggregatorMessageHandler
组件已被引入。
它基于反应堆项目Flux.groupBy()
和Flux.window()
运营商。
传入消息将发送到FluxSink
由Flux.create()
在此组件的构造函数中。
如果outputChannel
未提供或不是ReactiveStreamsSubscribableChannel
,订阅主Flux
从Lifecycle.start()
实现。
否则,它将推迟到ReactiveStreamsSubscribableChannel
实现。
消息按Flux.groupBy()
使用CorrelationStrategy
组键。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID
查阅邮件的标头。
默认情况下,每个关闭的窗口都作为Flux
在要生成的消息的有效负载中。
此消息包含窗口中第一条消息的所有标头。
这Flux
在输出消息中,有效负载必须订阅并在下游处理。
这样的逻辑可以由setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
配置选项的FluxAggregatorMessageHandler
.
例如,如果我们想要一个List
的有效负载,我们可以配置一个Flux.collectList()
像下面这样:
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
中有几个选项FluxAggregatorMessageHandler
要选择适当的窗口策略,请执行以下作:
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到Flux.windowUntil()
算子。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 传播到Flux.window(int)
或windowTimeout(int, Duration)
. 默认情况下,窗口大小是根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
页眉。 -
setWindowTimespan(Duration)
- 传播到Flux.window(Duration)
或windowTimeout(int, Duration)
取决于窗口大小配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于将转换应用于公开选项未涵盖的任何自定义窗口作的分组通量。
由于此组件是MessageHandler
实现它可以简单地用作@Bean
定义与@ServiceActivator
消息传递注释。
对于 Java DSL,它可以从.handle()
EIP 方法。
下面的示例演示了我们如何注册IntegrationFlow
在运行时,以及如何FluxAggregatorMessageHandler
可以与上游的分路器相关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
消息组上的条件
从 5.5 版本开始,AbstractCorrelatingMessageHandler
(包括其 Java 和 XML DSL)公开了一个groupConditionSupplier
选项的BiFunction<Message<?>, String, String>
实现。
此功能用于添加到组的每条消息,并将结果条件句子存储到组中以供将来考虑。
这ReleaseStrategy
可以查阅此条件,而不是迭代组中的所有消息。
看GroupConditionProvider
JavaDocs 和 Message Group Condition 了解更多信息。
另请参阅文件聚合器。