消息路由
消息路由
本章涵盖了使用Spring Integration路由消息的详细信息。
路由
此部分介绍了路由的工作原理。 它包括以下主题:
概述
路由器在许多消息架构中是关键元素。 它们从一个消息通道消费消息,并根据一组条件将每个消费的消息转发到一个或多个不同的消息通道。
Spring Integration 提供了以下路由器:
路由实现共享许多配置参数。 然而,不同路由器之间存在一些差异。 此外,配置参数的可用性取决于路由器是在链内还是链外使用。 为了提供一个快速概览,所有可用属性在下面两个表格中列出。
以下表格显示了链外路由器可用的配置参数:
| 属性 | 路由器 | Header Value Router | xpath 路由 | payload type router | 收件人列表路由 | 异常类型路由器 |
|---|---|---|---|---|---|---|
apply-sequence |
|
|
|
|
|
|
default-output-channel |
|
|
|
|
|
|
resolution-required |
|
|
|
|
|
|
ignore-send-failures |
|
|
|
|
|
|
timeout |
|
|
|
|
|
|
id |
|
|
|
|
|
|
auto-startup |
|
|
|
|
|
|
input-channel |
|
|
|
|
|
|
订单 |
|
|
|
|
|
|
方法 |
|
|||||
ref |
|
|||||
表达式 |
|
|||||
header-name |
|
|||||
evaluate-as-string |
|
|||||
xpath-expression-ref |
|
|||||
转换器 |
|
以下表格显示了链内部路由器可用的配置参数:
| 属性 | 路由器 | Header Value Router | xpath 路由 | payload type router | 受件人列表路由 | 异常类型路由器 |
|---|---|---|---|---|---|---|
apply-sequence |
|
|
|
|
|
|
default-output-channel |
|
|
|
|
|
|
resolution-required |
|
|
|
|
|
|
ignore-send-failures |
|
|
|
|
|
|
timeout |
|
|
|
|
|
|
id |
||||||
auto-startup |
||||||
input-channel |
||||||
订单 |
||||||
方法 |
|
|||||
ref |
|
|||||
表达式 |
|
|||||
header-name |
|
|||||
evaluate-as-string |
|
|||||
xpath-expression-ref |
|
|||||
转换器 |
|
|
自 Spring Integration 2.1 版本起,所有路由器实现的路由器参数已更加标准化。 因此,一些微小的更改可能会破坏基于旧版 Spring Integration 的应用程序。 自从 Spring Integration 2.1 版本起, 在这些更改之前, 如果确实希望静默丢弃消息,可以设置 |
常见路由参数
此部分描述了所有路由器参数共有的参数(在本章早期显示的两张表中所有方框都打勾的参数)。
链的内部与外部
以下参数适用于所有链内外的所有路由器。
apply-sequence-
此属性指定是否应在每个消息中添加序列号和大小标头。 此可选属性默认值为
false。 default-output-channel-
如果设置此属性,则该属性提供一个引用,指示如果通道解析无法返回任何通道,则应向哪个通道发送消息。 如果没有提供默认输出通道,则路由器将抛出异常。 如果您希望无声地丢弃这些消息,请将默认输出通道属性值设置为
nullChannel。从 6.0 版本开始,设置默认输出通道也会将 channelKeyFallback选项重置为false。 因此,不会再尝试根据名称解析通道,而是回退到此默认输出通道——类似于 Java 中的switch语句。 如果channelKeyFallback被显式设置为true,则后续逻辑取决于resolutionRequired选项:来自键的未解析通道的消息仅当resolutionRequired为false时才能到达defaultOutputChannel。 因此,在defaultOutputChannel已提供且channelKeyFallback和resolutionRequired均设置为true的配置下,AbstractMappingMessageRouter初始化阶段会拒绝该配置。 resolution-required-
此属性指定频道名称是否必须始终解析为存在的通道实例。 如果设置为
true,则在无法解析通道时会抛出MessagingException。 将该属性设置为false会使任何未解析的通道被忽略。 此可选属性默认值为true。仅当指定的 resolution-required为false且通道未解析时,消息才会发送给default-output-channel。 ignore-send-failures-
若设置为
true,则向消息通道发送失败将被忽略。 若设置为false,则会抛出一个MessageDeliveryException的异常,并且如果路由解析出多个通道,则后续的通道不会接收到该消息。此属性的行为取决于发送消息的目标
Channel的类型。 例如,在使用直接通道(单线程)的情况下,发送失败可能是由下游组件抛出的异常引起的。 然而,在向简单队列通道发送消息(异步情况下),抛出异常的可能性相对较小。大多数路由器会路由到单一通道,但它们可以返回多个通道名称。 例如
recipient-list-router就会这样做。如果你在一个只路由到单一通道的路由器上将此属性设置为
true,任何引发的异常都会被吞掉,这通常意义不大。
在这种情况下,在流入口点处捕获异常在错误流中会更有意义。因此,当路由器实现返回多个通道名称时,将
ignore-send-failures属性设置为true通常更合理,因为其他通道(失败的通道之后)仍会接收到消息。此属性默认为
false。 timeout-
The
timeout属性指定在向目标消息通道发送消息时等待的最大时间(以毫秒为单位)。
顶级(链外)
以下参数仅在所有顶级路由器(不在链中的)之间有效。
id-
标识底层的Spring bean定义,而在路由器的情况下,这可以是
EventDrivenConsumer或PollingConsumer,具体取决于路由器的input-channel是一个SubscribableChannel还是一个PollableChannel。 这是一个可选属性。 auto-startup-
此“生命周期”属性表示该组件是否应该在应用程序上下文启动时被启动。 此可选属性默认值为
true。 input-channel-
此端点的接收消息通道。
order-
此属性定义了当该端点作为频道的订阅者连接时的调用顺序。 这在该频道使用失败重试分发策略时尤为重要。 当该端点本身是一个轮询消费者且通道带有队列时,此属性无效。
路由实现
由于基于内容的路由通常需要一些特定于领域的逻辑,大多数用例都要求通过使用XML命名空间支持或注解将控制权委托给POJO。这两种方法将在后面讨论。 然而,在此之前,我们首先介绍几种满足常见需求的实现。
PayloadTypeRouter
一个PayloadTypeRouter将消息发送到由payload-type映射定义的通道,如下例所示:
<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
配置PayloadTypeRouter也通过Spring Integration提供的命名空间(参见Namespace Support)所支持,这实际上通过将<router/>配置与其相应的实现(使用<bean/>元素定义)合并为一个更简洁的配置元素来简化了配置。
以下示例展示了一个等效于上述配置但利用了命名空间支持的PayloadTypeRouter配置:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
以下示例显示了等效的路由器配置在Java中:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
当使用Java DSL时,有两个选项。
首先,您可以像前一个示例中所示的那样定义路由器对象:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
注意,路由器可以但不一定是一个@Bean。流会在它不是@Bean时进行注册。
第二步,您还可以在DSL流程本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
HeaderValueRouter
一个HeaderValueRouter根据个别头部值映射将消息发送到通道。
当创建一个HeaderValueRouter时,它会被初始化为评估的头部名称。
头部的值可能是两件事情之一:
-
任意值
-
一个通道名称
如果是一个任意值,则需要额外的映射将这些头值与频道名称关联。 否则,不需要额外配置。
Spring Integration 提供了一种基于命名空间的 XML 配置来配置一个 HeaderValueRouter。
以下示例展示了当需要映射头值到通道时,HeaderValueRouter 的配置方式:
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
在解析过程中,前面示例中定义的路由器可能会遇到通道解析失败的情况,导致异常。
如果您希望抑制此类异常并将未解析的消息发送到默认输出通道(通过default-output-channel属性标识),可以将resolution-required设置为false。
通常,当消息的头部值未显式映射到某个通道时,这些消息会被发送到 default-output-channel。
然而,如果头部值被映射到了一个通道名称但该通道无法解析,将 resolution-required 属性设置为 false 会导致这样的消息被路由到 default-output-channel。
自 Spring Integration 2.1 版本起,该属性已从 ignore-channel-name-resolution-failures 更改为 resolution-required。
属性 resolution-required 的默认值为 true。 |
以下示例显示了等效的路由器配置在Java中:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
当使用Java DSL时,有两种选项。 首先,您可以像前面的示例一样定义路由对象:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
注意,路由器可以但不一定是一个@Bean。流会在它不是@Bean时进行注册。
第二步,您还可以在DSL流程本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
配置中,不需要将header值映射到频道名称,因为header值本身即代表频道名称。 以下示例展示了一个无需将header值映射到频道名称的路由器:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
|
自 Spring Integration 2.1 版本起,解析通道的行为更加明确。
例如,如果您省略了 基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。
如果真的要丢弃消息,你也必须将 |
RecipientListRouter
一个 RecipientListRouter 将接收到的每个消息发送到静态定义的消息通道列表。
以下示例创建了一个 RecipientListRouter:
<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
Spring Integration 还为 RecipientListRouter 配置提供了命名空间支持(参见 命名空间支持),如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
以下示例显示了等效的路由器配置在Java中:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
以下示例展示了使用Java DSL配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
此处的 'apply-sequence' 标志与发布 - 订阅通道(publish-subscribe-channel)的效果相同,并且与发布 - 订阅通道一样,它在 recipient-list-router 上默认处于禁用状态。
有关更多信息,请参阅 PublishSubscribeChannel 配置。 |
在配置RecipientListRouter时,另一个方便的选择是使用Spring Expression Language (SpEL) 支持作为个别接收通道的筛选器。
这样做类似于在一个‘链’的开头使用一个过滤器来充当“选择性消费者”。
不过,在这种情况下,所有这些功能都被简洁地合并到路由器的配置中,如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
在之前的配置中,由selector-expression属性标识的SpEL表达式被评估以确定此接收者是否应包含在一个给定输入消息的目的地列表中。
该表达式的评估结果必须为boolean。
如果没有定义此属性,则通道始终在目的地列表中。
RecipientListRouterManagement
从版本 4.1 开始,RecipientListRouter 提供了若干操作,用于在运行时动态地操纵收件人。
这些管理操作通过 @ManagedResource 注解由 RecipientListRouterManagement 提供。
正如以下示例所示,它们可以通过使用 控制总线(Control Bus) 以及 JMX 来访问:
<control-bus input-channel="controlBus"/>
<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>
<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");
从应用程序启动开始,simpleRouter 只有一个 channel1 接收者。
但在执行 addRecipient 命令后,会添加一个 channel2 接收者。
这是一个“注册对消息部分内容感兴趣”的用例:我们可能在某个时间段内对来自路由器的消息感兴趣,因此订阅了 recipient-list-router,并在某个时刻决定取消订阅。
因为对<recipient-list-router>进行了运行时管理操作,所以可以从一开始就无需任何<recipient>进行配置。
在这种情况下,当没有任何匹配接收方的消息时,RecipientListRouter的行为与没有消息的情况相同。
如果配置了defaultOutputChannel,则将消息发送到那里。
否则,会抛出MessageDeliveryException。
xpath 路由
XPath 路由器是 XML 模块的一部分。 请参阅 使用 XPath 路由 XML 消息。
路由与错误处理
Spring Integration 还提供一种特殊的基于类型的路由器,称为 ErrorMessageExceptionTypeRouter,用于路由错误消息(定义为 payload 为 Throwable 实例的消息)。
ErrorMessageExceptionTypeRouter 类似于 PayloadTypeRouter。
事实上,它们几乎完全相同。
唯一的区别在于,虽然 PayloadTypeRouter 遍历有效负载实例的实例层次结构(例如 payload.getClass().getSuperclass())以查找最具体的类型和通道映射,但 ErrorMessageExceptionTypeRouter 遍历"异常原因"的层次结构(例如 payload.getCause())以查找最具体的 Throwable 类型或通道映射,并使用 mappingClass.isInstance(cause) 将 cause 匹配到类或其任何超类。
在这种情况下,通道映射顺序非常重要。
因此,如果有获取IllegalArgumentException的映射需求但没有RuntimeException的需求,那么必须首先在路由器中配置最后一个。 |
自从版本 4.3 起,ErrorMessageExceptionTypeRouter 在初始化阶段加载所有映射类以在遇到问题时快速失败为 ClassNotFoundException。 |
The following example shows a sample configuration for ErrorMessageExceptionTypeRouter:
@Bean
public IntegrationFlow someFlow() {
return f -> f
.routeByException(r -> r
.channelMapping(IllegalArgumentException.class, "illegalChannel")
.channelMapping(NullPointerException.class, "npeChannel")
.defaultOutputChannel("defaultChannel"));
}
@Bean
fun someFlow() =
integrationFlow {
routeByException {
channelMapping(IllegalArgumentException::class.java, "illegalChannel")
channelMapping(NullPointerException::class.java, "npeChannel")
defaultOutputChannel("defaultChannel")
}
}
@Bean
someFlow() {
integrationFlow {
routeByException {
channelMapping IllegalArgumentException, 'illegalChannel'
channelMapping NullPointerException, 'npeChannel'
defaultOutputChannel 'defaultChannel'
}
}
}
<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>
<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />
配置通用路由器
Spring Integration 提供了一个通用的路由器。 您可以使用它进行一般的路由(与 Spring Integration 提供的其他路由器不同,每个路由器都有某种形式的专业化)。
使用 XML 配置基于内容的路由器
router 元素提供了一种将路由器连接到输入通道的方式,并且可以接受可选的 default-output-channel 属性。
ref 属性引用了自定义路由器实现的 bean 名称(该实现必须扩展 AbstractMessageRouter)。
以下示例展示了三个通用路由器:
<int:router ref="payloadTypeRouter" input-channel="input1"
default-output-channel="defaultOutput1"/>
<int:router ref="recipientListRouter" input-channel="input2"
default-output-channel="defaultOutput2"/>
<int:router ref="customRouter" input-channel="input3"
default-output-channel="defaultOutput3"/>
<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>
Alternatively, ref 可能指向包含 @Router 注解的一个POJO(稍后会展示),或者您可以将 ref 结合一个显式的函数名。
指定一个方法会使行为与文档中稍后的 @Router 注解部分描述的行为相同。
以下示例定义了一个路由器,其 ref 属性指向一个POJO:
<int:router input-channel="input" ref="somePojo" method="someMethod"/>
我们通常建议如果自定义路由器实现被其他<router>定义引用,则使用ref属性。
然而,如果自定义路由器实现应仅限于<router>的一个定义中,您可以提供一个内部bean定义,如下例所示:
<int:router method="someMethod" input-channel="input3"
default-output-channel="defaultOutput3">
<beans:bean class="org.foo.MyCustomRouter"/>
</int:router>
在同一<router>配置中同时使用ref属性和内部处理器定义是不允许的。
这样做会导致条件模糊,并抛出异常。 |
如果ref属性引用了一个扩展了AbstractMessageProducingHandler的bean(例如框架本身提供的路由器),则配置优化为直接引用该路由器。
在这种情况下,每个ref属性必须分别指向一个bean实例(或一个prototype-作用域的bean),或者使用内部<bean/>配置类型。
然而,此优化仅适用于在路由器XML定义中未提供任何路由器特定属性的情况。
如果您不小心从多个beans引用了同一个消息处理器,则会得到一个配置异常。 |
以下示例显示了等效的路由器配置在Java中:
@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
以下示例展示了使用Java DSL配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(myCustomRouter())
.get();
}
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
可以按消息负载中的数据进行路由,如下例所示:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
.get();
}
Routers 和 Spring 表达语言(SpEL)
有时,路由逻辑可能非常简单,编写一个单独的类来实现它并配置为bean可能会显得过于复杂。 从Spring Integration 2.0开始,我们提供了一种替代方案,允许您使用SpEL来实现以前需要自定义POJO路由器的简单计算。
| 关于Spring表达式语言的更多信息,请参阅《Spring框架参考指南》的相关章节。 |
通常,一个SpEL表达式会被评估,并且其结果会映射到一个通道,如下例所示:
<int:router input-channel="inChannel" expression="payload.paymentType">
<int:mapping value="CASH" channel="cashPaymentChannel"/>
<int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
<int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>
以下示例显示了等效的路由器配置在Java中:
@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
router.setChannelMapping("CASH", "cashPaymentChannel");
router.setChannelMapping("CREDIT", "authorizePaymentChannel");
router.setChannelMapping("DEBIT", "authorizePaymentChannel");
return router;
}
以下示例展示了等效的路由器配置在Java DSL中:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route("payload.paymentType", r -> r
.channelMapping("CASH", "cashPaymentChannel")
.channelMapping("CREDIT", "authorizePaymentChannel")
.channelMapping("DEBIT", "authorizePaymentChannel"))
.get();
}
简化操作甚至可以更进一步,SpEL 表达式可以直接返回一个通道名称,如下表达式所示:
<int:router input-channel="inChannel" expression="payload + 'Channel'"/>
在 preceding 配置中,结果通道由 SpEL 表达式计算得出,该表达式将 payload 的值与字面量 String 和 'Channel' 连接起来。
配置路由器时,SpEL 的另一个优点是表达式可以返回 Collection,这使得每一个 <router> 都成为接收者列表路由器。
当表达式返回多个通道值时,消息会被转发到每个通道。以下示例展示了这样的表达式:
<int:router input-channel="inChannel" expression="headers.channels"/>
在上述配置中,如果消息包含一个名为'channels'的头部,并且该头部的值为0个频道名称,则消息将发送到列表中的每个频道。 您也可以在需要选择多个频道时发现集合投影和集合选择表达式很有用。 如需进一步信息,请参见:
配置路由器注解
当使用@Router注解一个方法时,该方法可以返回MessageChannel或String类型。
在后一种情况下,端点会像处理默认输出通道那样解析通道名称。
此外,该方法还可以返回单个值或者集合。如果返回的是集合,则回复消息会被发送到多个通道。
总之,以下方法签名都是有效的:
@Router
public MessageChannel route(Message message) {...}
@Router
public List<MessageChannel> route(Message message) {...}
@Router
public String route(Foo payload) {...}
@Router
public List<String> route(Foo payload) {...}
除了基于有效负载的路由外,消息还可以根据其消息头中可用的元数据(作为属性或特性)进行路由。
在这种情况下,一个使用 @Router 注解的方法可以包含一个使用 @Header 注解的参数,该参数映射到消息头的值,如下例所示,并在 注解支持 中进行了文档说明:
@Router
public List<String> route(@Header("orderStatus") OrderStatus status)
| 对于基于XML的消息的路由(包括XPath支持),请参阅XML支持 - 处理XML负载。 |
另请参阅 Java DSL 章节中的 消息路由器,以获取更多关于路由器配置的信息。
动态路由
Spring Integration 提供了多种不同的路由器配置,用于常见的基于内容的路由用例,并且还提供了作为POJO实现自定义路由器的选择。
例如,PayloadTypeRouter提供了一种简单的方式来配置一个路由器,该路由器根据传入消息的负载类型来计算通道;而HeaderValueRouter则提供了同样的便利性,用于配置一个路由器,该路由器通过评估特定的消息Header值来计算通道。
还有基于表达式的(SpEL)路由器,在这种路由器中,通道是通过对表达式进行评估来确定的。
所有这些类型的路由器都表现出一些动态特性。
然而,这些路由器都需要静态配置。 即使在基于表达式的路由器的情况下,该表达式本身也是作为路由器配置的一部分定义的,这意味着相同的表达式对相同值进行操作时总是会计算出同一个通道。 这在大多数情况下是可以接受的,因为这样的路由是明确定义的,因此是可预测的。 但有时我们需要动态更改路由器配置,以便消息流能够被路由到不同的通道。
例如,您可能希望暂时关闭系统的一部分进行维护,并将消息临时重定向到不同的消息流。
作为另一个示例,在处理特定类型的java.lang.Number(在PayloadTypeRouter的情况下),您可能希望通过添加另一条路由来引入更多细粒度的消息流。
不幸的是,要通过静态路由配置来实现这两个目标之一,您必须关闭整个应用程序,更改路由器的配置(修改路由),然后重新启动应用程序。 这显然是任何人都不想的解决方案。
动态路由模式描述了无需关闭系统或个别路由即可更改或配置路由的机制。
在我们具体探讨Spring Integration如何支持动态路由之前,我们需要考虑路由器的典型流程:
-
计算一个通道标识符,这是路由器在接收到消息后进行的计算得到的一个值。 通常,它是一个字符串或实际的
MessageChannel实例。 -
将渠道标识解析为渠道名称。 我们将在本节稍后详细描述这一过程。
-
将通道名称解析为实际的
MessageChannel
如果第一步的结果是MessageChannel的实际实例,则无法进行太多动态路由操作,因为MessageChannel是任何路由器工作的最终产物。
但是,如果第一步结果是一个不是MessageChannel的实例的通道标识符,那么你就有许多可能的方法来影响从MessageChannel推导过程。
考虑以下消息类型路由的示例:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="channel1" />
<int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>
在消息类型路由器的上下文中,前面提到的三个步骤将实现如下所示:
-
计算一个通道标识符,它是负载类型的标准名称(例如,
java.lang.String)。 -
将通道标识解析为通道名称,其中上一步的结果用于从
mapping元素中定义的payload类型映射中选择适当的值。 -
将通道名称解析为实际的
MessageChannel实例,作为应用程序上下文中某个bean的引用(希望该bean是MessageChannel),这个bean由上一步的结果标识。
换句话说,每个步骤都会为下一个步骤提供数据,直到过程完成。
现在考虑一个头部值路由器的例子:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
<int:mapping value="foo" channel="fooChannel" />
<int:mapping value="bar" channel="barChannel" />
</int:header-value-router>
现在我们可以考虑头值路由器工作的三个步骤:
-
计算一个通道标识符,它是通过
header-name属性标识的头部的值。 -
将渠道标识解析为渠道名称,其中上一步的结果用于从
mapping元素中定义的通用映射中选择适当的值。 -
将通道名称解析为实际的
MessageChannel实例,作为应用程序上下文中某个bean的引用(希望该bean是MessageChannel),这个bean由上一步的结果标识。
前面的两个不同路由器类型的配置看起来几乎完全相同。
然而,如果我们仔细查看 HeaderValueRouter 的替代配置,我们会清楚地看到没有 mapping 子元素,如下所示:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
然而,配置仍然完全有效。 因此,自然的问题是第二步中的映射怎么样?
第二步现在是可选的。
如果未定义mapping,那么在第一步中计算得到的通道标识符值将自动被视为 channel name,进而解析为实际的 MessageChannel,正如第三步所示。
这也意味着,第二步是提供路由器动态特性的一个关键步骤,因为它引入了一个过程,允许您更改通道标识符如何解析为通道名称的方式,从而影响最终实例MessageChannel确定的过程。
例如,在上述配置中,假设 testHeader 的值是 'kermit',这现在是一个通道标识符(第一步)。
由于在这个路由器中没有对应的映射,将这个通道标识符解析为通道名称(第二步)是不可能的,因此该通道标识符现在被视为通道名称。
然而,如果存在一个不同的映射会发生什么情况呢?
最终结果仍然相同,因为在将通道标识符解析为通道名称的过程中无法确定新值时,通道标识符就会被当作通道名称。
所有剩余的步骤就是进行第三步,将通道名称('kermit')解析为实际由该名称标识的MessageChannel实例。
这基本上涉及到通过提供的名称查找bean。
现在,所有包含头值对作为testHeader=kermit的消息都将被路由到一个bean名称为其id是'kermit'的MessageChannel。
但如果你希望将这些消息路由到'simpson'通道呢?显然,修改静态配置是可以的,但这也会要求你的系统停机。
然而,如果你能够访问通道标识符映射表,你可以在新的映射中引入一个新映射,使头部值对现在为kermit=simpson,从而让第二步将'kermit'视为通道标识符,并将其解析为'simpson'作为通道名称。
The same obviously applies for PayloadTypeRouter, where you can now remap or remove a particular payload type mapping.
In fact, it applies to every other router, including expression-based routers, since their computed values now have a chance to go through the second step to be resolved to the actual channel name.
任何作为 AbstractMappingMessageRouter(包括大多数框架定义的路由器)的子类的路由器都是动态路由器,因为 channelMapping 是在 AbstractMappingMessageRouter 级别定义的。
该映射的 setter 方法被公开为公共方法,与 'setChannelMapping' 和 'removeChannelMapping' 方法一起提供。
只要您拥有对路由器本身的引用,这些方法就允许您在运行时更改、添加和移除路由器映射。
这也意味着您可以通过 JMX(参见 JMX 支持)或 Spring Integration 控制总线(参见 控制总线)功能来暴露这些相同的配置选项。
使用通道键作为通道名称是灵活且方便的。然而,如果不信任消息创建者,恶意行为者(对系统有所了解)可以创建一条被路由到意外通道的消息。
例如,如果键设置为路由器输入通道的通道名称,这样一条消息最终会被路由回路由器,导致堆栈溢出错误。
因此,你可能希望禁用此功能(将channelKeyFallback属性设为false),并在需要时更改映射关系。 |
使用控制总线管理路由映射
一种管理路由器映射的方法是通过控制总线模式,该模式暴露了一个可以发送控制消息来管理和监控Spring Integration组件(包括路由器)的控制通道。
| 有关控制总线的更多信息,请参见 Control Bus。 |
通常,您会发送一个控制消息,要求在一个特定的管理组件(如路由器)上调用某个特定的操作。 以下管理操作(方法)是针对更改路由解析过程的具体操作:
-
public void setChannelMapping(String key, String channelName): 让您添加一个新映射或修改现有映射关系,将channel identifier与channel name关联起来 -
public void removeChannelMapping(String key): 允许你移除特定的通道映射,从而断开channel identifier和channel name之间的关系
请注意,这些方法可以用于简单的更改(例如更新一个路由或添加/删除一个路由)。</p> <p>但是,如果您想移除一个路由并添加另一个路由,则更新不是原子性的。这意味着在更新之间路由表可能处于不确定的状态。</p> <p>从4.0版本开始,您可以使用控制总线以原子方式更新整个路由表。以下方法可让您做到这一点:
-
public Map<String, String>getChannelMappings(): 返回当前的映射。 -
public void replaceChannelMappings(Properties channelMappings): 更新映射关系。 请注意,channelMappings参数是一个Properties对象。 这种安排使得控制总线命令可以使用内置的StringToPropertiesConverter,如下例所示:
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"
请注意,每个映射由换行符(\n)分隔。
对于程序的地图更改,由于类型安全方面的考虑,我们建议您使用 setChannelMappings 方法。
replaceChannelMappings 会忽略不是 String 对象的键或值。
使用 JMX 管理路由映射
您也可以使用 Spring 的 JMX 支持来暴露一个路由器实例,然后使用您喜欢的 JMX 客户端(例如 JConsole)来管理这些操作(方法),以更改路由器的配置。
| 有关 Spring Integration 的 JMX 支持的更多信息,请参见 JMX 支持。 |
路由滑槽
从Spring Integration 4.1版本开始,提供了routingSlipAbstractMessageProducingHandleroutputChanneloutput-channelroutingSlipreplyChannel
配置路由条的设置以HeaderEnricher选项呈现——这是一个分号分隔的路由条,包含path个条目,如下例所示:
<util:properties id="properties">
<beans:prop key="myRoutePath1">channel1</beans:prop>
<beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>
<context:property-placeholder properties-ref="properties"/>
<header-enricher input-channel="input" output-channel="process">
<routing-slip
value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>
该示例包含:
-
使用
<context:property-placeholder>配置来演示路由条目的键可以在path中指定为可解析的键。 -
<header-enricher><routing-slip>子元素用于填充RoutingSlipHeaderValueMessageProcessor到HeaderEnricher处理程序。 -
The
RoutingSlipHeaderValueMessageProcessor接受一个已解析的路由单path条目的String数组,并从processMessage()返回一个singletonMap,其中path作为key,0作为初始routingSlipIndex。
路由表 path 条目可以包含 MessageChannel 个 Bean 名称、RoutingSlipRouteStrategy 个 Bean 名称以及 Spring 表达式(SpEL)。
RoutingSlipHeaderValueMessageProcessor 会在首次 processMessage 调用时,将每个路由表 path 条目与 BeanFactory 进行校验。
它会将应用上下文中非 Bean 名称的条目转换为 ExpressionEvaluatingRoutingSlipRouteStrategy 实例。
RoutingSlipRouteStrategy 条目会被多次调用,直到它们返回 null 或空的 String。
由于路由单涉及getOutputChannel流程,我们拥有一个请求 - 回复上下文。
已引入RoutingSlipRouteStrategy用于确定下一个使用requestMessage和reply对象的outputChannel。
该策略的实现应作为 Bean 注册在应用上下文中,其 Bean 名称将在路由单的path中使用。
提供了ExpressionEvaluatingRoutingSlipRouteStrategy实现。
它接受一个 SpEL 表达式,并使用内部的ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply对象作为评估上下文的根对象。
这是为了避免为每次ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()调用创建EvaluationContext的开销。
它是一个简单的 Java Bean,包含两个属性:Message<?> request和Object reply。
通过此表达式实现,我们可以使用 SpEL 指定路由单path条目(例如@routingSlipRoutingPojo.get(request, reply)和request.headers[myRoutingSlipChannel]),从而避免为RoutingSlipRouteStrategy定义 Bean。
参数 requestMessage 始终是一个 Message<?>。 根据上下文,回复对象可能是一个 Message<?>、一个 AbstractIntegrationMessageBuilder,或者是一个任意的应用程序域对象(例如,当它是由服务激活器调用的 POJO 方法返回时)。 在前两种情况下,使用 SpEL(或 Java 实现)时,通常的 Message 属性(payload 和 headers)是可用的。 对于一个任意的域对象,这些属性不可用。 因此,在将路由单与 POJO 方法结合使用时要小心,如果结果用于确定下一个路径。 |
如果路由单(routing slip)涉及分布式环境,我们建议不要在路由单 path 中使用内联表达式。
此建议适用于各种分布式环境,例如跨 JVM 的应用程序、通过消息代理(如AMQP 支持或JMS 支持)进行通信,或在集成流程中使用持久化的 MessageStore(消息存储)。
框架使用 RoutingSlipHeaderValueMessageProcessor 将其转换为 ExpressionEvaluatingRoutingSlipRouteStrategy 对象,并在 routingSlip 消息头中使用它们。
由于该类不是 Serializable(它无法成为可序列化的,因为它依赖于 BeanFactory),整个 Message 变得不可序列化,并且在任何分布式操作中,最终都会导致 NotSerializableException。
为克服此限制,请注册一个带有所需 SpEL 的 ExpressionEvaluatingRoutingSlipRouteStrategy Bean,并在路由单 path 配置中使用其 Bean 名称。 |
对于Java配置,您可以向HeaderEnricher bean定义添加一个RoutingSlipHeaderValueMessageProcessor实例,如下例所示:
@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
"@routingSlipRoutingPojo.get(request, reply)",
"routingSlipRoutingStrategy",
"request.headers[myRoutingSlipChannel]",
"finishChannel")));
}
当端点生成回复且未定义outputChannel时,路由 Slip 算法的工作方式如下:
-
使用
routingSlipIndex可以从路由条目path列表中获取一个值。 -
如果来自
routingSlipIndex的值为String,则使用该值从BeanFactory获取一个bean。 -
如果返回的bean是一个
MessageChannel的实例,它将被用作下一个outputChannel,并且在回复消息头中递增routingSlipIndex(路由条path的条目保持不变)。 -
如果返回的bean是一个
RoutingSlipRouteStrategy实例,并且其getNextPath不返回空String,那么这个结果将被用作下一个outputChannel的bean名称。 而routingSlipIndex保持不变。 -
如果
RoutingSlipRouteStrategy.getNextPath返回空的String或null,则routingSlipIndex递增,并且对下一个路由条目path调用getOutputChannelFromRoutingSlip进行递归。 -
如果下一个路由条目
path不是String,那么它必须是RoutingSlipRouteStrategy的实例。 -
当数字
routingSlipIndex超过路由条目path列表的大小时,算法将切换到标准replyChannel头的默认行为。
过程管理企业集成模式
企业集成模式包括 流程管理器 模式。
现在,您可以通过在路由清单中封装在 RoutingSlipRouteStrategy 内的自定义流程管理器逻辑,轻松实现此模式。
除了 Bean 名称外,RoutingSlipRouteStrategy 还可以返回任何 MessageChannel 对象,并且不要求该 MessageChannel 实例必须是应用上下文中的 Bean。
通过这种方式,当无法预测应使用哪个通道时,我们可以提供强大的动态路由逻辑。
可以在 RoutingSlipRouteStrategy 内创建 MessageChannel 并返回它。
具有关联的 MessageHandler 实现的 FixedSubscriberChannel 是此类情况的理想组合。
例如,您可以将消息路由到 响应式流(Reactive Streams),如下示例所示:
@Bean
public PollableChannel resultsChannel() {
return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
? new FixedSubscriberChannel(m ->
Mono.just((String) m.getPayload())
.map(String::toUpperCase)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
: new FixedSubscriberChannel(m ->
Mono.just((Integer) m.getPayload())
.map(v -> v * 2)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}
过滤器
消息过滤器用于根据某些标准(如消息头值或消息本身)决定是否应该将一个Message传递下去或者丢弃。
因此,一个消息过滤器类似于路由器,但不同的是,对于过滤器输入通道接收到的每个消息,该消息可能会或不会被发送到过滤器的输出通道。
与路由器不同的是,它不涉及决定将消息发送到哪个消息通道的问题,只决定是否发送此消息。
| 在本节后面的部分中,我们将描述该过滤器还支持一个丢弃通道。 在某些情况下,它可以根据布尔条件扮演非常简单的路由器(或“交换机”)的角色。 |
在Spring Integration中,您可以将消息过滤器配置为委托给MessageSelector接口实现的消息端点。
该接口本身相当简单,如下所示:
public interface MessageSelector {
boolean accept(Message<?> message);
}
The MessageFilter 构造函数接受一个选择器实例,如下例所示:
MessageFilter filter = new MessageFilter(someSelector);
使用 Java、Groovy 和 Kotlin DSL 配置过滤器
由 Java DSL(也用作 Groovy 和 Kotlin DSL 的基础)提供的IntegrationFlowBuilder为filter()运算符提供了多个重载方法。
上述提到的MessageSelector抽象可以在filter()定义中作为 Lambda 使用:
@Bean
public IntegrationFlow someFlow() {
return f -> f
.<String>filter((payload) -> !"junk".equals(payload));
}
@Bean
fun someFlow() =
integrationFlow {
filter<String> { it != "junk" }
}
@Bean
someFlow() {
integrationFlow {
filter String, { it != 'junk' }
}
}
在相应的章节中查看关于DSLs的更多信息:
使用 XML 配置过滤器
结合命名空间和SpEL,您可以使用非常少的Java代码配置强大的过滤器。
您可以使用<filter>元素来创建一个消息选择端点。除了input-channel和output-channel属性外,它还需要一个ref属性。
ref可以指向一个MessageSelector实现,如下例所示:
<int:filter input-channel="input" ref="selector" output-channel="output"/>
<bean id="selector" class="example.MessageSelectorImpl"/>
Alternatively, you可以添加method属性。
在这种情况下,ref属性可以引用任何对象。引用的方法可能期望Message类型或入站消息的负载类型。方法必须返回一个布尔值。
如果该方法返回'true',则消息将发送到输出通道。
以下示例展示了如何配置使用method属性的过滤器:
<int:filter input-channel="input" output-channel="output"
ref="exampleObject" method="someBooleanReturningMethod"/>
<bean id="exampleObject" class="example.SomeObject"/>
如果选择器或适配的 POJO 方法返回 false,则有几个设置控制被拒绝消息的处理方式。
默认情况下(如果按前一个示例配置),被拒绝的消息会被静默丢弃。
如果希望拒绝导致错误条件,请将 throw-exception-on-rejection 属性设置为 true,如下例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" throw-exception-on-rejection="true"/>
如果要将拒绝的消息路由到特定的通道,请提供该引用作为discard-channel,如下例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" discard-channel="rejectedMessages"/>
如果未提供throwExceptionOnRejection == false和无discardChannel,消息将被静默丢弃,并在日志中仅发出警告信息(从版本6.1开始)。
要不带任何警告的日志信息就丢弃消息,可以在过滤器上配置一个NullChannel作为discardChannel。
框架的目标是默认情况下不要完全静默,如果这是所需行为,则需要显式设置选项。
另请参阅 顾问过滤器。
| 消息过滤器通常与发布/订阅频道一起使用。 许多过滤端点可以订阅同一个频道,并决定是否将消息传递给下一个端点,这个下一个端点可能是任何支持的类型(例如服务激活器)。 这提供了一种相对于使用单个点对点输入频道和多个输出频道的消息路由器而言更加响应式的替代方案。 |
我们推荐在自定义过滤器实现被其他<filter>定义引用时,使用ref属性。
然而,如果自定义过滤器实现仅限于单个<filter>元素,则您应该提供一个内部bean定义,如下例所示:
<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
<beans:bean class="org.foo.MyCustomFilter"/>
</filter>
在同一个<filter>配置中同时使用ref属性和内嵌处理器定义是不允许的,因为这会导致一种模棱两可的情况并抛出异常。 |
如果 ref 属性引用了一个扩展了 MessageFilter 的 bean(例如框架本身提供的过滤器),则配置将通过直接注入输出通道来优化到过滤器 bean。
在这种情况下,每个 ref 必须是单独的 bean 实例(或 prototype-作用域的 bean)或者使用内部的 <bean/> 配置类型。
然而,这种优化仅在您未在过滤器 XML 定义中提供任何特定于过滤器的属性时才适用。
如果您意外地从多个 bean 引用了同一个消息处理器,则会得到一个配置异常。 |
随着SpEL支持的引入,Spring Integration 添加了 expression 属性到 filter 元素。
它可以用于避免完全使用 Java 来实现简单的过滤器,如下例所示:
<int:filter input-channel="input" expression="payload.equals('nonsense')"/>
将作为表达式属性值传递的字符串作为 SpEL 表达式进行评估,其中的消息在评估上下文中可用。
如果必须将在应用程序上下文范围内包含表达式的结果,则可以使用 #{} 符号,如SpEL 参考文档中定义的那样,如下例所示:
<int:filter input-channel="input"
expression="payload.matches(#{filterPatterns.nonsensePattern})"/>
如果表达式本身需要是动态的,您可以使用'expression'子元素。
这提供了通过键从ExpressionSource解析表达式的间接层。
这是一个策略接口,您可以直接实现它,或者依赖Spring Integration中提供的版本,该版本可以从“资源包”加载表达式,并能在给定秒数后检查是否发生了修改。
以下配置示例演示了所有这些功能:如果底层文件被修改,表达式可以在一分钟内重新加载:
<int:filter input-channel="input" output-channel="output">
<int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>
<beans:bean id="myExpressions"
class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
<beans:property name="basename" value="config/integration/expressions"/>
<beans:property name="cacheSeconds" value="60"/>
</beans:bean>
如果 ExpressionSource bean 的名称为 expressionSource,则无需在 <expression> 元素上提供 `source` 属性。
不过,在前面的示例中,为了完整性我们仍展示了它。
'config/integration/expressions.properties' 文件(或任何带有区域设置扩展的更具体的版本,将以加载资源包的典型方式解析)可以包含键/值对,如下例所示:
filterPatterns.example=payload > 100
所有将 expression 作为属性或子元素使用的示例,同样可以应用于 transformer、router、splitter、service-activator 和 header-enricher 元素。
给定组件类型的语义和作用会影响对求值结果的解释,这与方法调用的返回值被解释的方式相同。
例如,表达式可以返回字符串,这些字符串将被 router 组件视为消息通道名称。
然而,在 Spring Integration 的所有核心 EIP 组件中,以消息为根对象评估表达式并在前缀为 '@' 时解析 Bean 名称的底层功能是一致的。 |
使用注解配置过滤器
以下示例展示了如何使用注解配置一个过滤器:
public class PetFilter {
...
@Filter (1)
public boolean dogsOnly(String input) {
...
}
}
| 1 | 这是一个指示此方法作为过滤器使用的注解。 如果要将此类用作过滤器,则必须指定此注解。 |
XML 元素提供的所有配置选项也同样适用于 @Configuration 注解。
该过滤器可以从XML中显式引用,或者如果类上定义了@MessageEndpoint注解,则可以通过类路径扫描自动检测。
另请参阅 使用注解配置端点。
拆分器
splitter 是一个组件,其作用是将消息分割成几个部分,并发送这些结果消息独立进行处理。 非常经常地,它们在包含聚合器的管道中作为上游生产者。
编程模型
performing splitting 的 API 包含一个基类,AbstractMessageSplitter。
它是一个 MessageHandler 实现,封装了分隔器共有的功能,例如在生成的消息中填充适当的消息头(CORRELATION_ID、SEQUENCE_SIZE 和 SEQUENCE_NUMBER)。
这些填充使得跟踪消息及其处理结果成为可能(通常情况下,这些头部信息会被复制到各种转换端点生成的消息中)。
然后可以使用这些值,例如由一个 组合消息处理器。
The following example shows an excerpt from AbstractMessageSplitter:
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在一个应用程序中实现特定的拆分器,您可以扩展AbstractMessageSplitter并实现splitMessage方法,该方法包含拆分消息的逻辑。
返回值可以是以下之一:
-
一个
Collection、一个消息数组或一个Iterable(或Iterator),该值会迭代消息。 在这种情况下,消息会在CORRELATION_ID、SEQUENCE_SIZE和SEQUENCE_NUMBER填充之后被发送作为消息。 采用这种方法可以让你获得更多的控制权——例如,在拆分过程中填充自定义消息头。 -
一个
Collection或非消息对象的数组,或者一个Iterable(或Iterator),该值会迭代非消息对象。 它的工作方式类似于前一种情况,除了每个集合元素都会作为消息负载使用。 通过这种方式,您可以在不考虑消息系统的情况下专注于领域对象,并生成更易于测试的代码。 -
a
Message或非消息对象(但不是集合或数组)。 它与之前的案例类似,只是发送一条消息。
在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是该类定义了一个接受单个参数并返回结果的方法。
在此情况下,方法的返回值按前文所述进行解释。
输入参数可以是 Message 或一个简单的 POJO。
在后一种情况下,拆分器将接收传入消息的负载(payload)。
我们推荐这种方案,因为它使代码与 Spring Integration API 解耦,并且通常更易于测试。
迭代器
从版本 4.1 开始,AbstractMessageSplitter 支持将 value 拆分为 Iterator 类型。
注意,在出现 Iterator(或 Iterable)的情况下,我们无法访问底层项目的数量,且 SEQUENCE_SIZE 头被设置为 0。
这意味着 <aggregator> 的默认 SequenceSizeReleaseStrategy 将无法工作,来自 splitter 的 CORRELATION_ID 的分组也不会被释放;它将保持为 incomplete。
在这种情况下,您应使用适当的自定义 ReleaseStrategy,或结合 send-partial-result-on-expiry 与 group-timeout 或一个 MessageGroupStoreReaper 来依赖处理。
从版本 5.0 开始,AbstractMessageSplitter 提供了 protected obtainSizeIfPossible() 方法来允许确定 Iterable 和 Iterator 对象的大小(如果可能的话)。
例如,XPathMessageSplitter 可以确定其底层 NodeList 对象的大小。
并且从版本 5.0.9 开始,这个方法也正确地返回了 com.fasterxml.jackson.core.TreeNode 的大小。
一个Iterator对象可以在不需要在内存中构建整个集合的情况下避免分片的需要。
例如,当基础项是从某些外部系统(例如,数据库或FTP MGET)通过迭代或流来填充时。
流与 Flux
从5.0版本开始,AbstractMessageSplitter支持Java Stream和Reactive Streams Publisher类型来划分value。
在这种情况下,目标Iterator是基于它们的迭代功能构建的。
此外,如果拆分器的输出通道是ReactiveStreamsSubscribableChannel的一个实例,则AbstractMessageSplitter会生成一个Flux结果而不是Iterator,并且输出通道将订阅这个Flux以基于下游流的需求进行回压拆分。
从版本 5.2 开始,拆分器支持使用 discardChannel 选项来发送那些拆分函数返回空容器(集合、数组、流、Flux 等)的请求消息。
在这种情况下,没有项目可供迭代以发送到 outputChannel。
null 的拆分结果将保留为流程结束指示符。
使用 Java、Groovy 和 Kotlin DSL 配置 Splitter
基于Message及其可迭代载荷的简单拆分器示例(使用DSL配置):
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
split Message<?>, { it.payload }
}
}
查看有关 DSL 的更多信息,请参见相应的章节:
使用 XML 配置拆分器
可以使用XML配置一个分隔符,如下所示:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
| 1 | 分隔符的ID是可选的。 |
| 2 | 对应用程序上下文中定义的 bean 的引用。
该 bean 必须实现拆分逻辑,如前文所述。
可选。
如果未提供对 bean 的引用,则假设到达 input-channel 的消息负载是 java.util.Collection 的实现,并将默认拆分逻辑应用于该集合,将每个单独元素封装为消息并发送至 output-channel。 |
| 3 | 实现拆分逻辑的方法(在bean上定义)。 可选的。 |
| 4 | 分流器的输入通道。 必需。 |
| 5 | 发送结果的通道(拆分器将结果发送到该通道)。 可选的(因为传入的消息本身可以指定回复通道)。 |
| 6 | 在拆分结果为空的情况下,请求消息被发送到的通道。
可选(如果结果为 null,它们将停止)。 |
我们推荐使用一个ref属性,如果自定义分割器实现可以在其他<splitter>定义中引用。
然而,如果自定义分割器处理器实现应该局限于单个<splitter>的定义中,你可以配置一个内部bean定义,如下例所示:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一<int:splitter> 配置中同时使用 ref 属性和内部处理器定义是不允许的,因为这会导致条件模糊,并引发异常。 |
如果 ref 属性引用了一个扩展了 AbstractMessageProducingHandler 的 bean(例如框架本身提供的 splitters),配置将会优化,直接注入输出通道到处理器中。
在这种情况下,每个 ref 必须是一个单独的 bean 实例(或者 prototype-作用域的 bean)或使用内部的 <bean/> 配置类型。
然而,这种优化仅适用于您在 splitters XML 定义中没有提供任何特定于 splitter 的属性的情况。
如果您不小心从多个 bean 引用了同一个消息处理器,则会收到配置异常。 |
使用注解配置拆分器
The @Splitter annotation is applicable to methods that expect either the Message type or the message payload type, and the return values of the method should be a Collection of any type.
If the returned values are not actual Message objects, each item is wrapped in a Message as the payload of the Message.
Each resulting Message is sent to the designated output channel for the endpoint on which the @Splitter is defined.
以下示例展示了如何使用@Splitter注解来配置一个splitter:
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
聚合器
聚合器本质上是拆分器的镜像,它是一种消息处理器,接收多条消息并将它们合并为一条消息。 实际上,聚合器通常是包含拆分器的管道中的下游消费者。
从技术角度看,聚合器比拆分器更复杂,因为它是有状态的。
它必须保存待聚合的消息,并确定何时完整的消息组已准备好进行聚合。
为此,它需要一个 MessageStore。
功能
聚合器通过关联和存储一组相关的消息,直到该组被视为完整,从而将这些消息合并。 此时,聚合器处理整个组并创建单条消息,然后将聚合后的消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即从多条消息创建单条消息)。 两个相关的概念是关联和释放。
相关性决定了消息如何被分组以进行聚合。
在 Spring Integration 中,默认基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头执行相关性处理。
具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID 的消息会被归为一组。
然而,您可以自定义相关性策略,以支持其他指定消息分组方式的方法。
为此,您可以实现一个 CorrelationStrategy(本章后续将对此进行介绍)。
要确定一组消息何时准备好进行处理,需查阅 ReleaseStrategy。
聚合器的默认释放策略是:当基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头信息的所有序列中包含的消息均已到达时,释放该组消息。
您可以通过提供自定义 ReleaseStrategy 实现的引用来覆盖此默认策略。
编程模型
聚合 API 由多个类组成:
-
接口
MessageGroupProcessor,及其子类:MethodInvokingAggregatingMessageGroupProcessor和ExpressionEvaluatingMessageGroupProcessor -
ReleaseStrategy接口及其默认实现:SimpleSequenceSizeReleaseStrategy -
CorrelationStrategy接口及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
The AggregatingMessageHandler(AbstractCorrelatingMessageHandler 的一个子类)是一个 MessageHandler 实现,封装了聚合器(以及其他相关用例)的通用功能,具体如下:
-
将消息关联到组中进行聚合
-
在组可以被释放之前,将这些消息保留在
MessageStore中 -
决定何时可以发布该组
-
将已发布的组聚合为单条消息
-
识别并响应已过期的组
决定消息应如何分组的职责被委托给 CorrelationStrategy 实例。
决定消息组是否可以释放的职责被委托给 ReleaseStrategy 实例。
以下列表展示了基础 AbstractAggregatingMessageGroupProcessor 的简要亮点(实现 aggregatePayloads 方法的职责留给开发者):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 DefaultAggregatingMessageGroupProcessor、ExpressionEvaluatingMessageGroupProcessor 和 MethodInvokingMessageGroupProcessor,它们是 AbstractAggregatingMessageGroupProcessor 的开箱即用实现。
从版本 5 开始。2, 一个 Function<MessageGroup, Map<String, Object>> 策略可用于 AbstractAggregatingMessageGroupProcessor,以合并和计算(聚合)输出消息的标头。DefaultAggregateHeadersFunction 的实现可用,其逻辑是返回组内无冲突的所有标头;组内一条或多条消息中缺失的标头不被视为冲突。冲突的标头已被省略。随着新引入的DelegatingMessageGroupProcessor,此函数用于任何任意(非AbstractAggregatingMessageGroupProcessor)的MessageGroupProcessor实现。本质上,该框架将提供的函数注入到 AbstractAggregatingMessageGroupProcessor 实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor 中。AbstractAggregatingMessageGroupProcessor 和 DelegatingMessageGroupProcessor 在逻辑上的区别在于:后者不会在调用委托策略之前预先计算请求头,并且如果委托返回 Message 或 AbstractIntegrationMessageBuilder,则不会执行该函数。在这种情况下,框架假设目标实现已负责生成一组正确的标头并填充到返回结果中。Function<MessageGroup, Map<String, Object>>策略可作为 XML 配置的headers-function参考属性、Java DSL 的AggregatorSpec.headersFunction()选项,以及纯 Java 配置的AggregatorFactoryBean.setHeadersFunction()使用。
CorrelationStrategy 由 AbstractCorrelatingMessageHandler 拥有,并且其默认值基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头,如下示例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
对于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor。
它会创建一个单一的 Message,其负载是为给定组接收的负载的 List。
这对于具有上游拆分器、发布 - 订阅通道或收件人列表路由器的简单分散 - 聚集实现效果良好。
在使用发布 - 订阅通道或收件人列表路由器处理此类场景时,请务必启用 apply-sequence 标志。
这样做会添加必要的标头:CORRELATION_ID、SEQUENCE_NUMBER 和 SEQUENCE_SIZE。
在 Spring Integration 中,该行为对拆分器(splitters)默认启用,但对发布 - 订阅通道或收件人列表路由器并未默认启用,因为这些组件可能用于各种上下文,而在这些上下文中并不需要这些标头。 |
在为应用程序实现特定的聚合器策略时,您可以扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads 方法。
然而,存在更优的解决方案,它们与 API 的耦合度更低,可用于实现聚合逻辑,这些逻辑可以通过 XML 或注解进行配置。
一般来说,任何 POJO 都可以实现聚合算法,只要它提供一个接受单个 java.util.List 作为参数的方法(也支持参数化列表)。
该方法按以下方式被调用以聚合消息:
-
如果参数为
java.util.Collection<T>,且参数类型 T 可分配给Message,则会将为聚合累积的整个消息列表发送给聚合器。 -
如果参数是非泛型
java.util.Collection或参数类型不可分配给Message,则该方法接收累积消息的负载。 -
如果返回类型无法分配给
Message,则它将被视为由框架自动创建的Message的有效载荷。
| 为了代码简洁并推广最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方式是通过 POJO,并使用 XML 或注解支持在应用程序中对其进行配置。 |
从 5.3 版本开始,在处理消息组后,AbstractCorrelatingMessageHandler会对 MessageBuilder.popSequenceDetails() 消息头进行修改,以适配具有多个嵌套层级的拆分器 - 聚合器场景。
此操作仅在消息组的释放结果不是消息集合时才会执行。
在这种情况下,目标 MessageGroupProcessor 负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()。
如果 MessageGroupProcessor 返回 Message,则仅当 sequenceDetails 与组中的第一条消息匹配时,才会对输出消息执行 MessageBuilder.popSequenceDetails()。
(此前,仅在从 MessageGroupProcessor 返回纯负载或 AbstractIntegrationMessageBuilder 时才执行此操作。)
此功能可通过一个新的 popSequence boolean 属性进行控制,因此当标准拆分器未填充关联详细信息时,在某些场景下可以禁用 MessageBuilder.popSequenceDetails()。
该属性本质上会撤销在 AbstractMessageSplitter 中由最近的上游 applySequence = true 所执行的操作。
有关更多信息,请参阅 Splitter。
The SimpleMessageGroup.getMessages() method returns an unmodifiableCollection.
Therefore, if an aggregating POJO method has a Collection<Message> parameter, the argument passed in is exactly that Collection instance and, when you use a SimpleMessageStore for the aggregator, that original Collection<Message> is cleared after releasing the group.
Consequently, the Collection<Message> variable in the POJO is cleared too, if it is passed out of the aggregator.
If you wish to simply release that collection as-is for further processing, you must build a new Collection (for example, new ArrayList<Message>(messages)).
Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation. |
在 4.2 版本之前,无法通过 XML 配置提供 MessageGroupProcessor。
仅可使用 POJO 方法进行聚合。
现在,如果框架检测到引用的(或内部)Bean 实现了 MessageProcessor,则将其用作聚合器的输出处理器。
如果您希望从自定义 MessageGroupProcessor 中释放对象集合作为消息的有效负载,您的类应扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads()。
此外,自 4.2 版本起,提供了 SimpleMessageGroupProcessor。
它返回消息组的消息集合,如前所述,这将导致已释放的消息单独发送。
这使得聚合器能够作为消息屏障工作,到达的消息将被保留,直到释放策略触发,并且该组将作为一系列单独的消息被释放。
从版本 6.0 开始,上述描述的拆分行为仅在分组处理器为 SimpleMessageGroupProcessor 时生效。
否则,对于任何其他返回 Collection<Message> 的 MessageGroupProcessor 实现,只会发送一条包含整个消息集合作为其负载的回复消息。
这种逻辑由聚合器的核心目的决定:按某个键收集请求消息并生成单个分组消息。
ReleaseStrategy
ReleaseStrategy 接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何 POJO 都可以实现完成决策逻辑,前提是该类提供一个接受单个 java.util.List(参数化列表也支持)作为参数的方法,并返回一个布尔值。
该方法在每条新消息到达后被调用,以判断组是否已完成,如下所示:
-
如果参数为
java.util.List<T>,且参数类型T可分配给Message,则将该组中累积的所有消息发送到该方法。 -
如果参数是非参数化的
java.util.List,或者参数类型无法分配给Message,则该方法将接收累积消息的负载。 -
如果消息组已准备好进行聚合,则该方法必须返回
true,否则返回 false。
以下示例展示了如何为类型为 Message 的 List 使用 @ReleaseStrategy 注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何为类型为 String 的 List 使用 @ReleaseStrategy 注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于 POJO 的发布策略会传递 Collection 个尚未发布的消息(如果您需要访问整个 Message),或者传递 Collection 个负载对象(如果类型参数不是 Message)。
这满足了大多数用例。
然而,如果出于某种原因您需要访问完整的 MessageGroup,则应提供 ReleaseStrategy 接口的实现。
|
在处理可能较大的组时,您应了解这些方法的调用方式,因为在组被释放之前,释放策略可能会被多次调用。
最高效的是实现 出于这些原因,对于大型团队,我们建议您实施 |
当组被释放以进行聚合时,该组中所有尚未释放的消息都会被处理并从组中移除。
如果该组也已完整(即,来自某个序列的所有消息都已到达,或者未定义序列),则该组被标记为已完成。
任何针对此组的新消息都将被发送到丢弃通道(如果已定义)。
将 expire-groups-upon-completion 设置为 true(默认值为 false)会移除整个组,而任何新消息(其关联 ID 与被移除组的关联 ID 相同)将形成一个新的组。
您可以通过使用 MessageGroupStoreReaper 并将 send-partial-result-on-expiry 设置为 true 来释放部分序列。
为了便于丢弃迟到的消息,聚合器在释放组后必须维护该组的状态。
这最终可能导致内存溢出(out-of-memory)情况。
为避免此类情况,您应该考虑将 MessageGroupStoreReaper 配置为移除组元数据。
过期参数应设置为:一旦达到某个时间点(此后预期不再有迟到消息到达),即让组过期。
有关配置回收器(reaper)的详细信息,请参见 管理聚合器中的状态:MessageGroupStore。 |
Spring Integration 为 ReleaseStrategy 提供了实现:SimpleSequenceSizeReleaseStrategy。
该实现会检查每条到达消息的 SEQUENCE_NUMBER 和 SEQUENCE_SIZE 标头,以决定消息组何时完成并准备进行聚合。
如前所述,它也是默认策略。
在 5.0 版本之前,默认的发布策略是 SequenceSizeReleaseStrategy,该策略在处理大型组时性能不佳。
使用该策略时,会检测并拒绝重复的序列号。
此操作可能代价较高。 |
如果您正在聚合大型组,则无需释放部分组,也无需检测/拒绝重复序列,请考虑使用 SimpleSequenceSizeReleaseStrategy 替代——对于这些用例,它的效率更高,并且自 5.0 版本 起,在未指定部分组释放的情况下默认为此选项。
聚合大型组
4.3 版本将消息在 SimpleMessageGroup 中的默认值从 Collection 更改为 HashSet(此前为 BlockingQueue)。
当从大型组中移除单个消息时,这一变更开销较大(需要进行 O(n) 线性扫描)。
虽然哈希集在移除操作上通常更快,但对于大消息而言可能开销较大,因为在插入和移除时都需要计算哈希值。
如果您的消息哈希计算代价较高,请考虑使用其他集合类型。
正如在 使用 MessageGroupFactory 中所讨论的,提供了 SimpleMessageGroupFactory,以便您可以选择最适合需求的 Collection。
您还可以提供自己的工厂实现来创建其他类型的 Collection<Message<?>>。
以下示例展示了如何配置聚合器,使用之前的实现和一个 SimpleSequenceSizeReleaseStrategy:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与了聚合器上游的流程,那么序列大小释放策略(固定或基于 sequenceSize 头)将无法达到其目的,因为序列中的一些消息可能会被过滤器丢弃。
在这种情况下,建议选择另一个 ReleaseStrategy,或者使用从丢弃子流程发送的补偿消息,这些消息的内容中包含一些信息,以便在自定义的完整组函数中被跳过。
有关更多信息,请参阅 过滤器。 |
关联策略
CorrelationStrategy 接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个 Object,表示用于将消息与消息组关联的相关键。
该键必须满足在实现 equals() 和 hashCode() 时,针对 Map 中键所使用的标准。
通常,任何 POJO 都可以实现关联逻辑,将消息映射到方法参数(或多个参数)的规则与 ServiceActivator 相同(包括对 @Header 注解的支持)。
该方法必须返回一个值,且该值不能为 null。
Spring Integration 为 CorrelationStrategy:提供了实现 HeaderAttributeCorrelationStrategy。
此实现将消息头之一的值(其名称由构造函数参数指定)作为关联键返回。
默认情况下,关联策略是一个 HeaderAttributeCorrelationStrategy,它返回 CORRELATION_ID 头属性的值。
如果您有希望用于关联的自定义头名称,可以在 HeaderAttributeCorrelationStrategy 的实例上进行配置,并将其作为聚合器关联策略的引用提供。
锁注册表
对组的更改是线程安全的。
因此,当您并发发送具有相同关联 ID 的消息时,聚合器中只会处理其中一条消息,使其实际上成为每个消息组单线程。
使用 LockRegistry 来获取已解析关联 ID 的锁。
默认使用 DefaultLockRegistry(内存中)。
对于在使用共享 MessageGroupStore 的情况下跨服务器同步更新,您必须配置共享锁注册表。
避免死锁
如上所述,当消息组发生突变(添加或释放消息)时,会持有锁。
考虑以下流程:
...->aggregator1-> ... ->aggregator2-> ...
如果存在多个线程,且聚合器共享一个公共锁注册表,则可能发生死锁。
这将导致线程挂起,并且 jstack <pid> 可能会呈现如下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题:
-
确保每个聚合器都有其自己的锁注册表(这可以是跨应用程序实例共享的注册表,但流程中的两个或更多聚合器必须各自拥有独立的注册表)
-
使用
ExecutorChannel或QueueChannel作为聚合器的输出通道,以便下游流程在新线程上运行 -
从版本 5.1.1 开始,将
releaseLockBeforeSend聚合器属性设置为true
| 如果出于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也可能导致此问题。 当然,上述第一种解决方案在这种情况下并不适用。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器,请参阅 聚合器和重排序器。
使用 XML 配置聚合器
Spring Integration 支持通过 <aggregator/> 元素使用 XML 配置聚合器。
以下示例展示了聚合器的用法:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
| 1 | 聚合器的 ID 是可选的。 |
| 2 | 生命周期属性,指示聚合器是否应在应用程序上下文启动期间启动。 可选(默认为'true')。 |
| 3 | 聚合器接收消息的通道。 必填。 |
| 4 | 聚合器发送聚合结果的通道。 可选(因为传入的消息本身可以在'replyChannel'消息头中指定回复通道)。 |
| 5 | 聚合器发送超时消息的目标通道(如果 send-partial-result-on-expiry 为 false)。
可选。 |
| 6 | 对 MessageGroupStore 的引用,用于在消息完成前按相关键存储消息组。
可选。
默认情况下,它是一个易失性内存存储。
有关更多信息,请参阅 消息存储。 |
| 7 | 当多个处理器订阅到同一个 DirectChannel 时,此聚合器的顺序(用于负载均衡目的)。
可选。 |
| 8 | 表示过期的消息应在其包含的MessageGroup过期后,被聚合并发送到“output-channel”或“replyChannel”(参见MessageGroupStore.expireMessageGroups(long))。
使MessageGroup过期的一种方法是配置MessageGroupStoreReaper。
然而,您也可以通过调用MessageGroupStore.expireMessageGroups(timeout)来使MessageGroup过期。
您可以通过控制总线操作实现这一点,或者如果您拥有MessageGroupStore实例的引用,可以通过调用expireMessageGroups(timeout)来实现。
否则,该属性本身不会执行任何操作。
它仅用作指示器,用于判断是否应丢弃或将仍位于即将过期的MessageGroup中的任何消息发送到输出通道或回复通道。
可选(默认为false)。
注意:此属性更恰当的命名应为send-partial-result-on-timeout,因为如果expire-groups-upon-timeout设置为false,则分组可能实际上不会过期。 |
| 9 | 发送回复时等待的超时间隔 Message 到 output-channel 或 discard-channel。
默认为 30 秒。
仅当输出通道存在某些“发送”限制(例如具有固定“容量”的 QueueChannel)时才适用此设置。
在这种情况下,将抛出 MessageDeliveryException。
对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。
对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 会导致该任务被重新调度。
可选。 |
| 10 | 对实现消息关联(分组)算法的 Bean 的引用。
该 Bean 可以是 CorrelationStrategy 接口的实现,也可以是一个 POJO。
在后一种情况下,还必须定义 correlation-strategy-method 属性。
可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头)。 |
| 11 | 由 correlation-strategy 引用的 Bean 上定义的方法。
它实现了关联决策算法。
可选,但有限制(必须存在 correlation-strategy)。 |
| 12 | 表示关联策略的 SpEL 表达式。
示例:"headers['something']"。
只允许使用 correlation-strategy 或 correlation-strategy-expression 中的一个。 |
| 13 | 对应用上下文中定义的 Bean 的引用。 该 Bean 必须实现前文所述的聚合逻辑。 可选(默认情况下,聚合消息列表将成为输出消息的有效载荷)。 |
| 14 | 由 ref 属性引用的 Bean 上定义的方法。
它实现了消息聚合算法。
可选(取决于是否定义了 ref 属性)。 |
| 15 | 对实现发布策略的 Bean 的引用。
该 Bean 可以是 ReleaseStrategy 接口的实现类,也可以是一个 POJO。
在后一种情况下,还必须定义 release-strategy-method 属性。
可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。 |
| 16 | 由 release-strategy 属性引用的 Bean 上定义的方法。
它实现了完成决策算法。
可选,但有限制(必须存在 release-strategy)。 |
| 17 | 表示发布策略的 SpEL 表达式。
表达式的根对象为 MessageGroup。
示例:"size() == 5"。
仅允许使用 release-strategy 或 release-strategy-expression 中的一个。 |
| 18 | 当设置为 true(默认为 false)时,已完成的组将从消息存储中移除,允许具有相同关联的后续消息形成新组。
默认行为是将与已完成组具有相同关联的消息发送到 discard-channel。 |
| 19 | 仅当为 <aggregator> 的 MessageStore 配置了 MessageGroupStoreReaper 时生效。
默认情况下,当配置 MessageGroupStoreReaper 以过期部分组时,空组也会被移除。
空组在组正常释放后存在。
空组可用于检测和丢弃迟到的消息。
如果您希望以比过期部分组更长的周期来过期空组,请设置此属性。
此时,空组将不会被从 MessageStore 中移除,直到它们至少未被修改过指定的毫秒数。
请注意,实际过期空组的时间也会受到收割器(reaper)的 timeout 属性的影响,最长可能为该值加上超时时间。 |
| 20 | 对 org.springframework.integration.util.LockRegistry bean 的引用。
它用于基于 groupId 获取一个 Lock,以便在 MessageGroup 上进行并发操作。
默认情况下,使用内部 DefaultLockRegistry。
使用分布式 LockRegistry(例如 ZookeeperLockRegistry)可确保聚合器只有一个实例能够同时处理一组操作。
有关更多信息,请参阅 Redis 锁注册表 或 Zookeeper 锁注册表。 |
| 21 | 一个超时时间(以毫秒为单位),用于在当前消息到达时,如果ReleaseStrategy未释放组,则强制MessageGroup完成。此属性为聚合器提供了一种基于时间的内置发布策略,当在从最后一条消息到达开始计算的超时时间内没有新消息到达时,如果需要发出部分结果(或丢弃该组)。要设置从 MessageGroup 创建时刻开始计时的超时,请参阅 group-timeout-expression 相关信息。当新消息到达聚合器时,其 MessageGroup 对应的任何现有 ScheduledFuture<?> 将被取消。如果 ReleaseStrategy 返回 false(表示不释放)和 groupTimeout > 0,则计划一个新任务来使该组过期。我们不建议将此属性设置为零(或负值)。这样做会有效地禁用聚合器,因为每个消息组都会立即完成。不过,您可以通过使用表达式有条件地将其设置为零(或负值)。请参阅 group-timeout-expression 获取相关信息。完成期间采取的操作取决于 ReleaseStrategy 和 send-partial-group-on-expiry 属性。参见 聚合器和组超时 以获取更多信息。它与 'group-timeout-expression' 属性互斥。 |
| 22 | 求值为 groupTimeout 的 SpEL 表达式,其中 MessageGroup 作为 #root 评估上下文对象。用于调度强制完成 MessageGroup。如果表达式计算结果为 null,则不会安排完成。如果评估结果为零,则该组将立即在当前线程上完成。实际上,这提供了一个动态的 group-timeout 属性。作为一个示例,如果您希望在组创建后经过 10 秒强制完成 MessageGroup,可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis(),其中 timestamp 由 MessageGroup.getTimestamp() 提供,而 MessageGroup 此处是 #root 求值上下文对象。请注意,组的创建时间可能因其他组过期属性的配置而与第一条消息到达的时间不同。参见 group-timeout 以获取更多信息。与 'group-timeout' 属性互斥。 |
| 23 | 当组因超时(或通过MessageGroupStoreReaper)完成时,默认情况下该组会过期(被完全移除)。
迟到的消息将启动一个新组。
您可以将其设置为false以完成该组,但保留其元数据,以便丢弃迟到的消息。
空组可以稍后使用MessageGroupStoreReaper与empty-group-min-timeout属性一起进行过期处理。
默认为'true'。 |
| 24 | 一个 TaskScheduler Bean 引用,用于在 groupTimeout 内没有新消息到达时,强制完成 MessageGroup。如果未提供,则使用在 ApplicationContext (ThreadPoolTaskScheduler) 中注册的默认调度器 (taskScheduler)。如果未指定 group-timeout 或 group-timeout-expression,则此属性不适用。 |
| 25 | 自 4.1 版本起。
它允许为 forceComplete 操作启动事务。
该事务由 group-timeout(-expression) 或 MessageGroupStoreReaper 发起,且不适用于正常的 add、release 和 discard 操作。
仅允许此子元素或 <expire-advice-chain/>。 |
| 26 | 自版本 4.1起。
它允许为forceComplete操作配置任意Advice值。
它由group-timeout(-expression)或MessageGroupStoreReaper触发,不适用于正常的add、release和discard操作。
仅允许此子元素或<expire-transactional/>。
也可以使用 Spring tx命名空间在此处配置事务Advice。 |
|
即将过期的组
有两个与过期(完全移除)组相关的属性。
当组过期时,不会保留其记录;如果收到具有相同关联的新消息,将启动一个新组。
当组完成(未过期)时,空组会保留,后续到达的消息将被丢弃。
可以稍后通过使用
如果一个组没有正常完成,而是因超时而被释放或丢弃,则该组通常会被标记为过期。
从 4.1 版本开始,您可以通过使用
自 5.0 版本起,空组也会在 从版本 5 开始。4. 聚合器(以及重排序器)可配置为过期孤儿组(即持久化消息存储中可能不会被释放的组)。 |
我们通常建议在自定义聚合器处理器实现可能在其他 <aggregator> 定义中被引用时,使用 ref 属性。
然而,如果自定义聚合器实现仅被单个 <aggregator> 定义使用,则可以使用内部 bean 定义(从版本 1.0.3 开始)在 <aggregator> 元素中配置聚合 POJO,如下示例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一个 <aggregator> 配置中同时使用 ref 属性和内部 Bean 定义是不允许的,因为这会产生歧义条件。
在这种情况下,将抛出异常。 |
以下示例展示了聚合器 bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前述示例的完成策略 Bean 的实现可能如下:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
| 在合理的情况下,发布策略方法和聚合器方法可以合并为单个 Bean。 |
上述示例的相关策略 Bean 的一种实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前述示例中的聚合器会根据某些条件(在本例中为除以十后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字总和超过某个值。
| 在合理的情况下,发布策略方法、关联策略方法和聚合器方法可以合并到同一个 Bean 中。 (实际上,它们全部或任意两个都可以合并。) |
聚合器与 Spring 表达式语言 (SpEL)
自 Spring Integration 2.0 起,您可以使用 SpEL 处理各种策略(关联、释放和聚合),如果此类释放策略背后的逻辑相对简单,我们推荐使用 SpEL。
假设您有一个遗留组件,其设计目的是接收对象数组。
我们知道,默认的释放策略会将所有聚合消息组装到 List 中。
现在我们要解决两个问题。
首先,我们需要从列表中提取单个消息。
其次,我们需要提取每条消息的有效载荷并组装成对象数组。
以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,借助 SpEL,此类需求实际上可以通过一行表达式相对轻松地处理,从而避免您编写自定义类并将其配置为 Bean。 以下示例展示了如何实现:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的负载组装一个新集合,然后将其转换为数组,从而实现与早期 Java 代码相同的结果。
在处理自定义发布和相关性策略时,您可以应用相同的基于表达式的做法。
无需在 correlation-strategy 属性中为自定义 CorrelationStrategy 定义 bean,您可以将简单的关联逻辑实现为 SpEL 表达式,并在 correlation-strategy-expression 属性中进行配置,如下示例所示:
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效载荷具有一个 person 属性,其值为 id,该值将用于关联消息。
同样,对于 ReleaseStrategy,您可以将发布逻辑实现为 SpEL 表达式,并将其配置在 release-strategy-expression 属性中。
评估上下文的根对象是 MessageGroup 本身。
消息的 List 可以通过表达式中使用组的 message 属性进行引用。
在 5.0 版本之前的发布版中,根对象是 Message<?> 的集合,如前面的示例所示: |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是 MessageGroup 本身,您声明的是,只要该组中存在有效载荷为 5 的消息,该组即应被释放。
聚合器与分组超时
从版本 4.0 开始,引入了两个新的互斥属性:group-timeout 和 group-timeout-expression。
请参阅 使用 XML 配置聚合器。
在某些情况下,如果当前消息到达时 ReleaseStrategy 未释放,您可能需要在超时后发出聚合结果(或丢弃该组)。
为此,groupTimeout 选项允许强制调度 MessageGroup 完成,如下示例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
通过此示例,如果聚合器按release-strategy-expression定义的顺序接收到最后一条消息,则正常发布是可行的。
如果该特定消息未到达,只要组中包含至少两条消息,groupTimeout将在十秒后强制完成该组。
强制组完成的结果取决于 ReleaseStrategy 和 send-partial-result-on-expiry。
首先,再次查阅发布策略,以确定是否进行正常发布。
由于组未发生变化,ReleaseStrategy 可以决定在此时释放该组。
如果发布策略仍未释放该组,则视为已过期。
如果 send-partial-result-on-expiry 为 true,则(部分)MessageGroup 中的现有消息将作为正常的聚合器回复消息释放给 output-channel。
否则,将被丢弃。
groupTimeout 的行为与 MessageGroupStoreReaper 存在差异(请参阅 使用 XML 配置聚合器)。
收割器会定期为 MessageGroupStore 中的所有 MessageGroup 强制完成操作。
如果在 groupTimeout 期间未收到新消息,groupTimeout 会对每个 MessageGroup 单独执行此操作。
此外,收割器还可用于移除空组(保留空组是为了在 expire-groups-upon-completion 为 false 时丢弃延迟消息)。
从版本 5.5 开始,groupTimeoutExpression 可以求值为 java.util.Date 实例。
这在某些情况下非常有用,例如根据组创建时间(MessageGroup.getTimestamp())而不是当前消息到达时间来确定计划任务的时刻,因为这是在将 groupTimeoutExpression 求值为 long 时计算的:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注解配置聚合器
以下示例展示了使用注解配置的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
| 1 | 一个注解,表示此方法应作为聚合器使用。 如果此类用作聚合器,则必须指定该注解。 |
| 2 | 一个注解,表示该方法被用作聚合器的释放策略。
如果未在任何方法上出现,则聚合器使用 SimpleSequenceSizeReleaseStrategy。 |
| 3 | 一个注解,指示该方法应作为聚合器的关联策略使用。
如果未指定关联策略,则聚合器将使用基于CORRELATION_ID的HeaderAttributeCorrelationStrategy。 |
XML 元素提供的所有配置选项也适用于 @Aggregator 注解。
聚合器可以从 XML 中显式引用,或者如果类上定义了 @MessageEndpoint,则可以通过类路径扫描自动检测。
聚合组件的注解配置(@Aggregator 及其他)仅适用于简单用例,其中大多数默认选项已足够。
如果您在使用注解配置时需要对这些选项进行更多控制,请考虑为 AggregatingMessageHandler 使用 @Bean 定义,并将其 @Bean 方法标记为 @ServiceActivator,如下示例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
有关更多信息,请参阅 编程模型 和 @Bean 方法上的注解。
从版本 4.2 开始,AggregatorFactoryBean 已可用,以简化 AggregatingMessageHandler 的 Java 配置。 |
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态的模式,它需要根据一段时间内到达的、具有相同相关键的一组消息做出决策。
有状态模式中接口的的设计(例如 ReleaseStrategy)遵循一个原则:组件(无论是框架定义的还是用户定义的)应当能够保持无状态。
所有状态都由 MessageGroup 承载,其管理被委托给 MessageGroupStore。MessageGroupStore 接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
欲了解更多信息,请参阅 Javadoc。
MessageGroupStore在等待释放策略被触发时,会在MessageGroups中累积状态信息,而该事件可能永远不会发生。
因此,为了防止陈旧消息滞留,并让易失性存储提供在应用程序关闭时进行清理的钩子,MessageGroupStore允许您注册回调,以便在其MessageGroups过期时应用这些回调。
该接口非常直接,如下列表所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调函数可以直接访问存储和消息组,以便管理持久状态(例如,通过完全从存储中移除该组)。
The MessageGroupStore 维护这些回调的列表,它根据需求将这些回调应用于时间戳早于作为参数提供的时间的所有消息(参见前面描述的 registerMessageGroupExpiryCallback(..) 和 expireMessageGroups(..) 方法)。
重要的是,当您希望依赖 expireMessageGroups 功能时,不要在多个聚合器组件中使用相同的 MessageGroupStore 实例。
每个 AbstractCorrelatingMessageHandler 都会基于 forceComplete() 回调注册其自己的 MessageGroupCallback。
这样,每个过期组可能会被错误的聚合器完成或丢弃。
从版本 5.0.10 开始,MessageGroupStore 中的注册回调会使用来自 AbstractCorrelatingMessageHandler 的 UniqueExpiryCallback。
反过来,MessageGroupStore 会检查是否存在该类的实例,如果回调集中已存在该实例,则记录带有相应消息的错误。
通过这种方式,框架禁止在不同的聚合器/重组器中使用 MessageGroupStore 实例,以避免因非特定关联处理器创建的组而过期的副作用。 |
您可以使用超时值调用 expireMessageGroups 方法。
任何早于当前时间减去该值的消息均已过期,并已应用回调。
因此,存储的使用者定义了消息组“过期”的含义。
为了方便用户使用,Spring Integration 提供了以 MessageGroupStoreReaper 形式表示的消息过期包装功能,如下示例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个 Runnable。
在前面的示例中,消息组存储的 expire 方法每十秒被调用一次。
超时时间本身为 30 秒。
重要的是要理解,MessageGroupStoreReaper的'timeout'属性是一个近似值,并且会受到任务调度器速率的影响,因为该属性仅在MessageGroupStoreReaper任务的下次计划执行时进行检查。
例如,如果超时设置为十分钟,但MessageGroupStoreReaper任务被安排每小时运行一次,且MessageGroupStoreReaper任务的最后一次执行发生在超时前一分钟,那么MessageGroup在接下来的59分钟内不会过期。
因此,我们建议将速率设置为至少等于超时的值或更短。 |
除了收割器之外,当应用程序通过AbstractCorrelatingMessageHandler中的生命周期回调关闭时,也会调用过期回调。
The AbstractCorrelatingMessageHandler 注册其自身的过期回调,这与 XML 配置中聚合器的布尔标志 send-partial-result-on-expiry 相关联。
如果该标志设置为 true,则在调用过期回调时,任何尚未释放的组中的未标记消息都可以被发送到输出通道。
由于MessageGroupStoreReaper是从定时任务中调用的,并且可能会根据sendPartialResultOnExpiry选项向下游集成流生成一条消息,因此建议提供一个自定义的TaskScheduler和MessagePublishingErrorHandler,通过errorChannel来处理异常,因为这与常规的聚合器释放功能预期一致。
相同的逻辑也适用于组超时功能,该功能同样依赖于TaskScheduler。
有关更多信息,请参阅错误处理。 |
|
当共享的 一些 有关 |
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler 组件。
它基于 Project Reactor 的 Flux.groupBy() 和 Flux.window() 操作符。
传入的消息由该组件构造函数中的 Flux.create() 启动的 FluxSink 发出。
如果未提供 outputChannel 或其不是 ReactiveStreamsSubscribableChannel 的实例,则对主 Flux 的订阅将由 Lifecycle.start() 实现执行。
否则,订阅将推迟到由 ReactiveStreamsSubscribableChannel 实现完成。
消息按 Flux.groupBy() 进行分组,使用 CorrelationStrategy 作为分组键。
默认情况下,将检查消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。
默认情况下,每个已关闭的窗口在要生成的消息载荷中都会作为 Flux 被释放。
此消息包含该窗口中第一条消息的所有标头。
输出消息载荷中的此 Flux 必须被订阅并在下游进行处理。
此类逻辑可以通过 FluxAggregatorMessageHandler 的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 配置选项进行自定义(或替换)。
例如,如果我们希望最终消息中包含多个 List 的载荷,可以按如下方式配置一个 Flux.collectList():
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler 中有多个选项可用于选择合适的时间窗口策略:
-
setBoundaryTrigger(Predicate<Message<?>>)- 被传播到Flux.windowUntil()运算符。 有关更多信息,请参阅其 Javadoc。 优先级高于所有其他窗口选项。 -
setWindowSize(int)和setWindowSizeFunction(Function<Message<?>, Integer>)- 会传播到Flux.window(int)或windowTimeout(int, Duration)。 默认情况下,窗口大小根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头计算得出。 -
setWindowTimespan(Duration)- 根据窗口大小配置传播到Flux.window(Duration)或windowTimeout(int, Duration)。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于将转换应用于分组后的 Flux,以支持任何未由公开选项涵盖的自定义窗口操作。
由于此组件是 MessageHandler 的实现,它可以简单地与 @ServiceActivator 消息注解一起用作 @Bean 定义。
使用 Java DSL,可以从 .handle() EIP 方法中使用它。
下面的示例演示了如何在运行时注册一个 IntegrationFlow,以及如何将 FluxAggregatorMessageHandler 与上游的拆分器进行关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
重新排序器
重组器与聚合器相关,但用途不同。 虽然聚合器会合并消息,但重组器只是传递消息而不对其进行更改。
功能
重组器的工作方式与聚合器类似,因为它使用 CORRELATION_ID 将消息分组存储。
区别在于,重组器不会对消息进行任何处理。
相反,它会按照其 SEQUENCE_NUMBER 头部值的顺序释放这些消息。
对此,您可以选择一次性释放所有消息(根据 SEQUENCE_SIZE 及其他选项,在整个序列完成后),或者在获得有效序列后立即释放。
(我们将在本章稍后解释“有效序列”的含义。)
| 重组器旨在对具有较小间隔的相对较短的消息序列进行重新排序。 如果您有大量存在许多间隔的不连续序列,可能会遇到性能问题。 |
配置重组器
有关在 Java DSL 中配置重新排序器的信息,请参见 聚合器和重新排序器。
配置重组器只需在 XML 中包含相应的元素。
以下示例展示了重新排序器的配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
| 1 | 重排序器的 ID 是可选的。 |
| 2 | 重组器的输入通道。 必需。 |
| 3 | 重排序器发送重新排序消息的通道。 可选。 |
| 4 | 重新排序器发送超时消息的通道(如果将send-partial-result-on-timeout设置为false)。
可选。 |
| 5 | 是立即发送已排序的序列,还是等到整个消息组到达后再发送。
可选。
(默认值为 false。) |
| 6 | 对 MessageGroupStore 的引用,可用于在消息组完成之前,根据其关联键存储这些消息组。
可选。
(默认值为易失性内存存储。) |
| 7 | 当组过期时,是否应发送有序组(即使某些消息缺失)。
可选。
(默认为 false。)
请参阅 在聚合器中管理状态:MessageGroupStore。 |
| 8 | 发送回复时等待的超时间隔,将 Message 发送到 output-channel 或 discard-channel。
仅当输出通道存在某些“发送”限制(例如具有固定“容量”的 QueueChannel)时才适用此设置。
在这种情况下,将抛出 MessageDeliveryException。
对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。
对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 会导致该任务被重新调度。
可选。 |
| 9 | 对实现消息关联(分组)算法的 bean 的引用。
该 bean 可以是 CorrelationStrategy 接口的实现,也可以是一个 POJO。
在后一种情况下,还必须定义 correlation-strategy-method 属性。
可选。
(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。) |
| 10 | 由 correlation-strategy 引用的 Bean 上定义的、实现关联决策算法的方法。
可选,但有约束条件(要求必须存在 correlation-strategy)。 |
| 11 | 表示关联策略的 SpEL 表达式。
示例:"headers['something']"。
只允许使用 correlation-strategy 或 correlation-strategy-expression 中的一个。 |
| 12 | 对实现发布策略的 bean 的引用。
该 bean 可以是 ReleaseStrategy 接口的实现,也可以是一个 POJO。
在后一种情况下,还必须定义 release-strategy-method 属性。
可选(默认情况下,聚合器将使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头部属性)。 |
| 13 | 由 release-strategy 引用的 bean 上定义并实现完成决策算法的方法。
可选,但有限制(要求必须存在 release-strategy)。 |
| 14 | 表示发布策略的 SpEL 表达式。
表达式的根对象为 MessageGroup。
示例:"size() == 5"。
仅允许使用 release-strategy 或 release-strategy-expression 中的一个。 |
| 15 | 仅当为 MessageGroupStoreReaper 配置了 <resequencer> MessageStore 时适用。
默认情况下,当配置 MessageGroupStoreReaper 以过期部分组时,空组也会被移除。
空组在组正常释放后存在。
这是为了能够检测并丢弃迟到的消息。
如果您希望以比过期部分组更长的时间间隔来过期空组,请设置此属性。
这样,空组将不会从 MessageStore 中移除,直到它们至少在此数值的毫秒内未被修改。
请注意,实际过期空组的时间也会受到收割者(reaper)超时属性的影响,可能长达该值加上超时时间。 |
| 16 | 参见 使用 XML 配置聚合器。 |
| 17 | 参见 使用 XML 配置聚合器。 |
| 18 | 参见 使用 XML 配置聚合器。 |
| 19 | 参见 使用 XML 配置聚合器。 |
| 20 | 默认情况下,当组因超时(或由 MessageGroupStoreReaper)完成时,空组的元数据将被保留。
迟到的消息会被立即丢弃。
将此值设置为 true 以完全移除该组。
随后,迟到的消息将启动一个新组,直到该组再次超时前不会被丢弃。
由于导致超时的序列范围中存在“空洞”,新组永远不会被正常释放。
之后可以使用 MessageGroupStoreReaper 配合 empty-group-min-timeout 属性来使空组过期(完全移除)。
从 5.0 版本开始,空组也会在 empty-group-min-timeout 过期后被安排移除。
默认值为 'false'。 |
另请参阅 聚合器过期组 以获取更多信息。
| 由于重排序器在 Java 类中无需实现任何自定义行为,因此不提供相应的注解支持。 |
消息处理器链
MessageHandlerChain 是 MessageHandler 的一个实现,它可以被配置为单个消息端点,同时实际委托给一系列其他处理器,例如过滤器、转换器、拆分器等。
当需要将多个处理器以固定的线性顺序连接时,这可以带来更简单的配置。
例如,在其他组件之前提供一个转换器是非常常见的做法。
同样地,当你在链中的某个其他组件之前提供一个过滤器时,你实际上创建了一个 选择性消费者。
在这两种情况下,该链只需要一个 input-channel 和一个 output-channel,从而消除了为每个单独组件定义通道的需要。
The MessageHandlerChain 主要用于 XML 配置。
对于 Java DSL,IntegrationFlow 定义可被视为链式组件的一部分,但它与本章以下描述的概念和原理无关。
有关更多信息,请参阅 Java DSL。 |
Spring Integration 的 Filter 提供了一个布尔属性:throwExceptionOnRejection。
当您在同一个点对点通道上提供多个具有不同接受条件的选择性消费者时,您应该将此值设置为 'true'(默认值为 false),以便调度器知道消息已被拒绝,从而尝试将消息传递给其他订阅者。
如果未抛出异常,对于调度器而言,消息似乎已成功传递,尽管过滤器实际上已丢弃该消息以阻止进一步处理。
如果您确实希望“丢弃”这些消息,过滤器的'discard-channel'可能很有用,因为它允许您对丢弃的消息执行某些操作(例如将其发送到 JMS 队列或写入日志)。 |
处理器链简化了配置,同时在内部保持了组件之间相同的低耦合程度,并且如果将来需要非线性排列,修改配置也轻而易举。
在内部,链被展开为所列端点的线性设置,由匿名通道分隔。
回复通道头在链中不予考虑。
只有在调用最后一个处理器后,生成的消息才会被转发到回复通道或链的输出通道。
由于这种设置,除最后一个之外的所有处理器必须实现MessageProducer接口(该接口提供'setOutputChannel()'方法)。
如果outputChannel上的MessageHandlerChain已设置,则最后一个处理器只需要一个输出通道。
与其他端点一样,output-channel 是可选的。
如果链的末尾存在回复消息,则 output-channel 将优先使用。
但是,如果不可用,链处理器将检查入站消息上的 reply channel 头部作为回退方案。 |
在大多数情况下,您无需自行实现 MessageHandler。
下一节将重点介绍链元素的命名空间支持。
大多数 Spring Integration 端点(例如服务激活器和转换器)都适合在 MessageHandlerChain 中使用。
配置链
<chain>元素提供了一个input-channel属性。
如果链中的最后一个元素能够生成回复消息(可选),它还支持一个output-channel属性。
子元素包括过滤器、转换器、拆分器和活动服务组件。
最后一个元素也可以是路由器或出站通道适配器。
以下示例展示了一个链的定义:
<int:chain input-channel="input" output-channel="output">
<int:filter ref="someSelector" throw-exception-on-rejection="true"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:service-activator ref="someService" method="someMethod"/>
</int:chain>
在前面的示例中使用的 <header-enricher> 元素会在消息上设置一个名为 thing1、值为 thing2 的消息头。
头部丰富器是 Transformer 的一种特化形式,仅处理头部值。
您也可以通过实现一个执行头部修改的 MessageHandler 并将其配置为 Bean 来获得相同的结果,但使用头部丰富器是一个更简单的选择。
The <chain> can be configured as the last “closed-box” consumer of the message flow.
For this solution, you can to put it at the end of the <chain> some <outbound-channel-adapter>, as the following example shows:
<int:chain input-channel="input">
<int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
<int:service-activator ref="someService" method="someMethod"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>
|
不允许使用的属性和元素
某些属性,例如 对于 Spring Integration 核心组件,XML 模式本身会强制执行其中一些约束。 然而,对于非核心组件或您自己的自定义组件,这些约束由 XML 命名空间解析器强制执行,而不是由 XML 模式强制执行。 这些 XML 命名空间解析器约束是在 Spring Integration 2.2 版本中新增的。
如果您尝试使用不允许的属性和元素,XML 命名空间解析器将抛出 |
使用 'id' 属性
从 Spring Integration 3.0 开始,如果链元素被赋予 id 属性,则该元素的 bean 名称是链的 id 与元素自身的 id 的组合。
没有 id 属性的元素不会被注册为 bean,但每个元素都会被分配一个包含链 id 的 componentName。
请考虑以下示例:
<int:chain id="somethingChain" input-channel="input">
<int:service-activator id="somethingService" ref="someService" method="someMethod"/>
<int:object-to-json-transformer/>
</int:chain>
在前面的示例中:
-
<chain>根元素具有一个值为'somethingChain'的id。 因此,AbstractEndpoint实现(根据input-channel类型,为PollingConsumer或EventDrivenConsumer)的 Bean 将此值作为其 Bean 名称。 -
MessageHandlerChainbean 获取了一个 bean 别名('somethingChain.handler'),这使得可以从BeanFactory直接访问该 bean。 -
<service-activator>不是一个功能完整的消息端点(它不是PollingConsumer或EventDrivenConsumer)。 它是<chain>中的一个MessageHandler。 在这种情况下,向BeanFactory注册的 Bean 名称为 'somethingChain$child.somethingService.handler'。 -
该
componentName的ServiceActivatingHandler采用相同的值,但不带 '.handler' 后缀。 它变为 'somethingChain$child.somethingService'。 -
最后一个
<chain>子组件<object-to-json-transformer>没有id属性。 它的componentName基于其在<chain>中的位置。 在此情况下,它是 'somethingChain$child#1'。 (名称的最终元素是链内的顺序,从 '#0' 开始)。 注意,此转换器未在应用程序上下文中注册为 Bean,因此它不会获得beanName。 然而,它的componentName具有对日志记录和其他用途有价值的值。
为简化日志中子组件的识别并允许从BeanFactory等位置访问它们,建议在<chain>元素上显式提供id属性。 |
在链中调用另一个链
有时,您需要从某个链内部嵌套调用另一个链,然后返回并继续在原始链中执行。 为此,您可以通过包含 <gateway> 元素来使用消息网关,如下例所示:
<int:chain id="main-chain" input-channel="in" output-channel="out">
<int:header-enricher>
<int:header name="name" value="Many" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
<int:gateway request-channel="inputA"/>
</int:chain>
<int:chain id="nested-chain-a" input-channel="inputA">
<int:header-enricher>
<int:header name="name" value="Moe" />
</int:header-enricher>
<int:gateway request-channel="inputB"/>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
<int:chain id="nested-chain-b" input-channel="inputB">
<int:header-enricher>
<int:header name="name" value="Jack" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
在上面的示例中,nested-chain-a 由那里配置的“网关”元素在 main-chain 处理结束时调用。
而在 nested-chain-a 中,在标头丰富之后会调用一个 nested-chain-b。
随后,流程返回到 nested-chain-b 以完成执行。
最后,流程返回到 main-chain。
当链中定义了 <gateway> 元素的嵌套版本时,它不需要 service-interface 属性。
相反,它会获取当前状态下的消息,并将其放置在 request-channel 属性中定义的通道上。
当该网关触发的下游流完成后,会将一个 Message 返回给网关,并继续在当前链中流转。
Scatter-Gather
自 Spring Integration 4.1 版本起,提供了 散列-聚合 企业集成模式的实现。 它是一个复合端点,目标是向接收者发送消息并聚集结果。 正如在 企业集成模式 中所指出的那样,在“最佳报价”场景中,我们需要从多个提供商那里获取信息,并决定哪个提供商为请求的商品提供了最优条件。
之前,模式可以通过独立的组件进行配置。 此增强带来了更加便捷的配置方式。
The ScatterGatherHandler 是一个请求-回复端点,结合了一个 PublishSubscribeChannel(或一个 RecipientListRouter) 和一个 AggregatingMessageHandler。
请求消息被发送到 scatter 通道,并且 ScatterGatherHandler 等待聚合器将回复发送到 outputChannel。
功能
Scatter-Gather模式暗示了两种场景:"拍卖"和"分发"。
在这两种情况下,aggregation函数是相同的,并为AggregatingMessageHandler提供所有可用选项。
(实际上,ScatterGatherHandler仅需一个AggregatingMessageHandler作为构造函数参数。)
有关更多信息,请参阅聚合器。
拍卖
拍卖 Scatter-Gather 变体使用“发布 - 订阅”逻辑来处理请求消息,其中“散射”通道是一个带有 PublishSubscribeChannel 和 apply-sequence="true" 的 MessageChannel。
然而,该通道可以是任何 MessageChannel 实现(就像 ContentEnricher 中的 request-channel 一样——参见 内容增强器)。
但是,在这种情况下,您应该为 aggregation 函数创建自己的自定义 correlationStrategy。
分发
该Scatter-Gather分发变体基于RecipientListRouter(参见RecipientListRouter),并包含RecipientListRouter的所有可用选项。
这是第二个ScatterGatherHandler构造函数参数。
如果您希望仅依赖recipient-list-router和aggregator的默认correlationStrategy,则应指定apply-sequence="true"。
否则,您应为aggregator提供一个自定义的correlationStrategy。
与PublishSubscribeChannel变体(拍卖变体)不同,拥有recipient-list-routerselector选项允许根据消息过滤目标提供商。
使用apply-sequence="true"时,将提供默认的sequenceSize,并且aggregator可以正确释放该组。
分发选项与拍卖选项互斥。
The applySequence=true 是必需的,仅适用于基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的纯 Java 配置,因为框架无法修改外部提供的组件。 为了方便起见,从版本 6.0 开始,Scatter-Gather 的 XML 和 Java DSL 将 applySequence 设置为 true。 |
对于拍卖和分配变体,请求(散列)消息会被丰富添加带有gatherResultChannel头信息以等待来自aggregator的回复消息。
默认情况下,所有提供商应该将他们的结果发送到replyChannel头(通常通过省略最终端点中的output-channel来实现)。
但是,还提供了gatherChannel选项,允许提供商将他们的回复发送到该通道以进行聚合。
配置散射 - 汇聚端点
以下示例展示了bean定义的Java配置代码段Scatter-Gather:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的例子中,我们配置了RecipientListRouterdistributor bean 为 applySequence="true" 并且设置了接收者通道的列表。
下一个 bean 是一个 AggregatingMessageHandler。最后,我们将这两个 bean 注入到 ScatterGatherHandler bean 定义中,并将其标记为 @ServiceActivator 以将散列收集组件连接到集成流中。
以下示例展示了如何通过使用XML命名空间来配置<scatter-gather>端点:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
| 1 | 端点的 ID。
ScatterGatherHandler Bean 已使用别名 id + '.handler' 注册。
RecipientListRouter Bean 已使用别名 id + '.scatterer' 注册。
AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer'。
可选。
(BeanFactory 会生成默认的 id 值。) |
| 2 | 生命周期属性,表示端点是否应在应用程序上下文初始化时启动。
此外,ScatterGatherHandler还实现了Lifecycle并启停gatherEndpoint,如果提供了gather-channel,则会在内部创建gatherEndpoint。
此属性可选。
(默认值为true。) |
| 3 | 在接收请求消息并处理它们的通道上进行接收。 必需。 |
| 4 | 发送聚合结果的通道。
可选。
(入站消息可以在replyChannel消息头中自己指定回复通道)。 |
| 5 | 用于向拍卖场景发送散射消息的频道。
可选。
与 <scatterer> 子元素互斥。 |
| 6 | 用于从每个提供商接收聚合回复的通道。
它用作散点消息中的 replyChannel 标头。
可选。
默认情况下,将创建 FixedSubscriberChannel。 |
| 7 | 当多个处理器订阅到同一个 DirectChannel 时,此组件的顺序(用于负载均衡目的)。
可选。 |
| 8 | 指定端点应启动和停止的阶段。
启动顺序从低到高,关闭顺序从高到低。
默认情况下,此值为 Integer.MAX_VALUE,表示该容器尽可能晚地启动并尽快停止。
可选。 |
| 9 | 发送回复时等待的超时间隔 Message 到 output-channel。
默认情况下,send() 会阻塞一秒。
它仅适用于输出通道存在某些“发送”限制的情况——例如,具有固定“容量”且已满的 QueueChannel。
在这种情况下,将抛出 MessageDeliveryException。
对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。
对于 group-timeout(-expression),计划过期任务中的 MessageDeliveryException 会导致该任务被重新调度。
可选。 |
| 10 | 允许您指定 scatter-gather 在返回前等待回复消息的时长。
默认情况下,它等待 30 秒。
如果回复超时,将返回 'null'。
可选。 |
| 11 | 指定散合模式是否必须返回非空值。
该值默认为 true。
因此,当底层聚合器在 gather-timeout 后返回空值时,将抛出 ReplyRequiredException。
注意,如果 null 是可能的情况,则应指定 gather-timeout 以避免无限等待。 |
| 12 | <recipient-list-router> 选项。
可选。
与 scatter-channel 属性互斥。 |
| 13 | <aggregator> 个选项。
必填。 |
错误处理
由于 Scatter-Gather 是一个多请求 - 响应组件,错误处理具有一定的额外复杂性。
在某些情况下,如果 ReleaseStrategy 允许流程以少于请求数的响应完成,则最好直接捕获并忽略下游异常。
在其他情况下,当发生错误时,应考虑使用类似“补偿消息”的机制从子流程返回。
每个异步子流程都应配置一个 errorChannel 标头,以便从 MessagePublishingErrorHandler 正确发送错误消息。
否则,错误将发送至全局 errorChannel,并应用通用的错误处理逻辑。
有关异步错误处理的更多信息,请参阅 错误处理。
同步流可以使用ExpressionEvaluatingRequestHandlerAdvice来忽略异常或返回补偿消息。
当一个异常从某个子流抛出到ScatterGatherHandler时,它会被重新抛出给上游。
这样,所有其他子流都将白费功夫,它们的回复将在ScatterGatherHandler中被忽略。
这在某些情况下可能是预期行为,但在大多数情况下,最好在特定的子流中处理错误,而不影响其他子流以及收集器的期望。
从版本 5.1.3 开始,ScatterGatherHandler 随 errorChannelName 选项一同提供。
它被填充到散点消息的 errorChannel 头中,并在发生异步错误时使用,或者可用于常规同步子流程以直接发送错误消息。
下面的示例配置展示了如何通过返回补偿消息来实现异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
要生成正确的回复,我们必须从已发送至 scatterGatherErrorChannel 的 MessagePublishingErrorHandler 的 MessagingException 的 failedMessage 中复制标头(包括 replyChannel 和 errorChannel)。
这样,目标异常将返回给用于完成回复消息组的 ScatterGatherHandler 的收集器。
此类异常 payload 可以在收集器的 MessageGroupProcessor 中被过滤掉,或者在分散 - 聚集端点之后以其他方式在下游处理。
在将散射结果发送给收集器之前,ScatterGatherHandler 会恢复请求消息头,包括回复通道和错误通道(如果存在)。
通过这种方式,即使散射接收子流程中应用了异步交接,来自 AggregatingMessageHandler 的错误也将被传播给调用者。
为了成功操作,必须将 gatherResultChannel、originalReplyChannel 和 originalErrorChannel 头信息从散射接收子流程的回复中传回。
在这种情况下,必须为 ScatterGatherHandler 配置一个合理的有限值 gatherTimeout。
否则,默认情况下它将一直阻塞并等待来自收集器的回复。 |
线程屏障
有时,我们需要暂停消息流线程,直到发生某些其他异步事件。 例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。 我们可能希望等待 RabbitMQ 代理发出消息已收到的确认后再回复用户。
在 4.2 版本中,Spring Integration 为此引入了 <barrier/> 组件。
底层的 MessageHandler 是 BarrierMessageHandler。
该类还实现了 MessageTriggerAction,其中传递给 trigger() 方法的消息会释放 handleRequestMessage() 方法(如果存在)中的相应线程。
暂停的线程与触发线程通过在对消息调用 CorrelationStrategy 时进行关联。
当消息发送到 input-channel 时,线程将暂停最多 requestTimeout 毫秒,等待相应的触发消息。
默认的相关策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。
当具有相同相关性的触发消息到达时,线程将被释放。
在释放后发送到 output-channel 的消息是使用 MessageGroupProcessor 构建的。
默认情况下,消息是两个有效负载的 Collection<?>,并且标头通过使用 DefaultAggregatingMessageGroupProcessor 进行合并。
如果首先(或在主线程超时后)调用了trigger()方法,它将暂停最多triggerTimeout以等待暂停消息到达。
如果您不希望挂起触发线程,可以考虑将其转交给TaskExecutor,以便其线程被挂起。 |
在 5.4 版本之前,请求消息和触发消息仅有一个 timeout 选项,但在某些使用场景中,为这些操作设置不同的超时时间更为合适。
因此,现已引入 requestTimeout 和 triggerTimeout 选项。 |
requires-reply 属性决定了如果在触发消息到达之前挂起的线程超时,应采取的操作。
默认情况下,其值为 false,这意味着端点返回 null,流程结束,线程返回给调用者。
当值为 true 时,将抛出 ReplyRequiredException。
您可以以编程方式调用 trigger() 方法(通过名称获取 Bean 引用,barrier.handler — 其中 barrier 是屏障端点的 Bean 名称)。
或者,您可以配置一个 <outbound-channel-adapter/> 来触发释放。
| 同一关联 ID 只能暂停一个线程。 同一关联 ID 可多次使用,但不可同时并发使用。 如果有第二个线程携带相同的关联 ID 到达,将抛出异常。 |
以下示例展示了如何使用自定义标头进行关联:
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
取决于哪条消息先到达,发送消息到in的线程或发送消息到release的线程最多等待十秒,直到另一条消息到达。
当消息被释放时,会向out通道发送一条消息,该消息结合了调用自定义的名为myOutputProcessor的MessageGroupProcessor Bean的结果。
如果主线程超时且触发器稍后到达,您可以配置一个丢弃通道,稍后的触发器将被发送到该通道。
有关此组件的示例,请参阅 屏障示例应用程序。