消息路由

消息路由

本章介绍了使用 Spring Integration 路由消息的详细信息。spring-doc.cadn.net.cn

路由器

本节介绍路由器的工作原理。它包括以下主题:spring-doc.cadn.net.cn

概述

路由器是许多消息传递体系结构中的关键元素。它们使用来自消息通道的消息,并根据一组条件将每个使用的消息转发到一个或多个不同的消息通道。spring-doc.cadn.net.cn

Spring Integration 提供以下路由器:spring-doc.cadn.net.cn

路由器实现共享许多配置参数。但是,路由器之间存在某些差异。此外,配置参数的可用性取决于路由器是在链内部还是外部使用。为了提供快速概述,以下两个表中列出了所有可用属性。spring-doc.cadn.net.cn

下表显示了链外路由器可用的配置参数:spring-doc.cadn.net.cn

表 1.链外的路由器
属性 路由器 标头值路由器 XPath路由器 有效载荷类型路由器 收件人列表路由 异常类型路由器

应用序列spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

默认输出通道spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

需要分辨率spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

忽略发送失败spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

超时spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

idspring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

自动启动spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

输入通道spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

次序spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

方法spring-doc.cadn.net.cn

刻度线

裁判spring-doc.cadn.net.cn

刻度线

表达spring-doc.cadn.net.cn

刻度线

标头名称spring-doc.cadn.net.cn

刻度线

评估为字符串spring-doc.cadn.net.cn

刻度线

xpath-expression-refspring-doc.cadn.net.cn

刻度线

转炉spring-doc.cadn.net.cn

刻度线

下表显示了链内路由器可用的配置参数:spring-doc.cadn.net.cn

表 2.链内的路由器
属性 路由器 标头值路由器 XPath路由器 有效载荷类型路由器 收件人列表路由器 异常类型路由器

应用序列spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

默认输出通道spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

需要分辨率spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

忽略发送失败spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

超时spring-doc.cadn.net.cn

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

idspring-doc.cadn.net.cn

自动启动spring-doc.cadn.net.cn

输入通道spring-doc.cadn.net.cn

次序spring-doc.cadn.net.cn

方法spring-doc.cadn.net.cn

刻度线

裁判spring-doc.cadn.net.cn

刻度线

表达spring-doc.cadn.net.cn

刻度线

标头名称spring-doc.cadn.net.cn

刻度线

评估为字符串spring-doc.cadn.net.cn

刻度线

xpath-expression-refspring-doc.cadn.net.cn

刻度线

转炉spring-doc.cadn.net.cn

刻度线

从 Spring Integration 2.1 开始,路由器参数在所有路由器实现中都更加标准化。 因此,一些细微的更改可能会破坏基于 Spring Integration 的旧应用程序。spring-doc.cadn.net.cn

从 Spring Integration 2.1 开始,ignore-channel-name-resolution-failures属性被删除,以支持将其行为与resolution-required属性。 此外,resolution-required属性现在默认为true.spring-doc.cadn.net.cn

在这些更改之前,resolution-required属性默认为false,导致消息在未解析通道且default-output-channel已设置。 新行为至少需要一个已解析的通道,并且默认情况下,会抛出MessageDeliveryException如果未确定通道(或尝试发送不成功)。spring-doc.cadn.net.cn

如果您确实希望静默删除消息,您可以将default-output-channel="nullChannel".spring-doc.cadn.net.cn

常见路由器参数

本节介绍所有路由器参数的通用参数(在本章前面显示的两个表中勾选了所有框的参数)。spring-doc.cadn.net.cn

链条的内部和外部

以下参数对链内外的所有路由器都有效。spring-doc.cadn.net.cn

apply-sequence

此属性指定是否应将序列号和大小标头添加到每封邮件中。 此可选属性默认为false.spring-doc.cadn.net.cn

default-output-channel

如果设置,则此属性提供对通道的引用,如果通道解析未能返回任何通道,则应在其中发送消息。 如果未提供默认输出通道,路由器将抛出异常。 如果要以静默方式删除这些消息,请将默认输出通道属性值设置为nullChannel.spring-doc.cadn.net.cn

从 6.0 版开始,设置默认输出通道也会重置channelKeyFallback选项设置为false. 因此,不会尝试从其名称解析通道,而是回退到这个默认输出通道 - 类似于 Javaswitch陈述。 如果channelKeyFallback设置为true显式地,进一步的逻辑取决于resolutionRequired选项:从键到未解析通道的消息可以到达defaultOutputChannel仅当resolutionRequiredfalse. 因此,配置中defaultOutputChannel提供,并且channelKeyFallback & resolutionRequired设置为trueAbstractMappingMessageRouter初始化阶段。
resolution-required

此属性指定是否必须始终将通道名称成功解析为存在的通道实例。 如果设置为true一个MessagingException当通道无法解析时,会引发。 将此属性设置为false导致忽略任何不可解析的通道。 此可选属性默认为true.spring-doc.cadn.net.cn

消息仅发送到default-output-channel,如果指定,则resolution-requiredfalse并且通道未解析。
ignore-send-failures

如果设置为true,则忽略发送到消息通道的失败。 如果设置为false一个MessageDeliveryException,如果路由器解析多个通道,则任何后续通道都不会接收消息。spring-doc.cadn.net.cn

此属性的确切行为取决于Channel消息发送到的。 例如,当使用直接通道(单线程)时,发送失败可能是由下游组件抛出的异常引起的。 但是,当将消息发送到简单的队列通道(异步)时,抛出异常的可能性相当小。spring-doc.cadn.net.cn

虽然大多数路由器路由到单个通道,但它们可以返回多个通道名称。 这recipient-list-router例如,正是这样做的。 如果将此属性设置为true在仅路由到单个通道的路由器上,任何导致的异常都会被吞噬,这通常没有意义。 在这种情况下,最好在流入口点的错误流中捕获异常。 因此,将ignore-send-failures属性设置为true当路由器实现返回多个通道名称时,通常更有意义,因为失败通道后面的其他通道仍会收到消息。

此属性默认为false.spring-doc.cadn.net.cn

timeout

timeout属性指定将消息发送到目标消息通道时等待的最长时间(以毫秒为单位)。spring-doc.cadn.net.cn

顶层(链外)

以下参数仅在链外的所有顶级路由器上有效。spring-doc.cadn.net.cn

id

标识底层 Spring Bean 定义,对于路由器,它是EventDrivenConsumerPollingConsumer,具体取决于路由器的input-channel是一个SubscribableChannelPollableChannel分别。 这是一个可选属性。spring-doc.cadn.net.cn

auto-startup

此“生命周期”属性指示是否应在应用程序上下文启动期间启动此组件。 此可选属性默认为true.spring-doc.cadn.net.cn

input-channel

此端点的接收消息通道。spring-doc.cadn.net.cn

order

此属性定义此端点作为通道的订阅者连接到时的调用顺序。 当该通道使用故障转移调度策略时,这一点尤其重要。 当此端点本身是具有队列的通道的轮询使用者时,它不起作用。spring-doc.cadn.net.cn

路由器实施

由于基于内容的路由通常需要一些特定于域的逻辑,因此大多数用例都需要 Spring Integration 的选项,以便使用 XML 命名空间支持或注释委托给 POJO。 这两者都会在后面讨论。 但是,我们首先介绍几个满足常见需求的实现。spring-doc.cadn.net.cn

PayloadTypeRouter

一个PayloadTypeRouter将消息发送到有效负载类型映射定义的通道,如以下示例所示:spring-doc.cadn.net.cn

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

配置PayloadTypeRouterSpring Integration 提供的命名空间也支持 (请参阅Namespace Support),它本质上通过将<router/>配置及其相应的实现(通过使用<bean/>元素)转换为单个且更简洁的配置元素。 以下示例显示了PayloadTypeRouter与上述配置等效,但使用命名空间支持:spring-doc.cadn.net.cn

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

以下示例显示了在 Java 中配置的等效路由器:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

使用 Java DSL 时,有两个选项。spring-doc.cadn.net.cn

首先,您可以定义路由器对象,如上例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

请注意,路由器可以是,但不一定是@Bean. 如果它不是@Bean.spring-doc.cadn.net.cn

其次,可以在 DSL 流本身中定义路由函数,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}
HeaderValueRouter

一个HeaderValueRouter根据各个标头值映射将消息发送到通道。 当HeaderValueRouter创建时,它将使用要评估的标头的名称进行初始化。 标头的值可以是以下两种情况之一:spring-doc.cadn.net.cn

如果它是任意值,则需要将这些标头值与通道名称进行其他映射。 否则,不需要额外的配置。spring-doc.cadn.net.cn

Spring Integration 提供了一个简单的基于命名空间的 XML 配置来配置HeaderValueRouter. 以下示例演示了HeaderValueRouter当需要将标头值映射到通道时:spring-doc.cadn.net.cn

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

在解析过程中,上例中定义的路由器可能会遇到通道解析失败,从而导致异常。 如果要禁止此类异常并将未解析的消息发送到默认输出通道(用default-output-channel属性)设置resolution-requiredfalse.spring-doc.cadn.net.cn

通常,标头值未显式映射到通道的消息将发送到default-output-channel. 但是,当标头值映射到通道名称但无法解析通道时,将resolution-required属性设置为false导致此类消息路由到default-output-channel.spring-doc.cadn.net.cn

从 Spring Integration 2.1 开始,该属性已从ignore-channel-name-resolution-failuresresolution-required. 属性resolution-required默认为true.

以下示例显示了在 Java 中配置的等效路由器:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

使用 Java DSL 时,有两个选项。 首先,您可以定义路由器对象,如上例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

请注意,路由器可以是,但不一定是@Bean. 如果它不是@Bean.spring-doc.cadn.net.cn

其次,可以在 DSL 流本身中定义路由函数,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
                    m -> m
                        .channelMapping("someHeaderValue", "channelA")
                        .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

不需要将标头值映射到通道名称的配置,因为标头值本身表示通道名称。 以下示例显示了不需要将标头值映射到通道名称的路由器:spring-doc.cadn.net.cn

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

从 Spring Integration 2.1 开始,解析通道的行为更加明确。 例如,如果您省略default-output-channel属性,则路由器无法解析至少一个有效通道,并且通过设置resolution-requiredfalse,然后是MessageDeliveryException被抛出。spring-doc.cadn.net.cn

基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。 如果你真的想留言,你还必须有default-output-channel设置为nullChannel.spring-doc.cadn.net.cn

RecipientListRouter

一个RecipientListRouter将每个接收到的消息发送到静态定义的消息通道列表。 以下示例创建了一个RecipientListRouter:spring-doc.cadn.net.cn

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration 还为RecipientListRouter配置(请参阅命名空间支持),如以下示例所示:spring-doc.cadn.net.cn

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

以下示例显示了在 Java 中配置的等效路由器:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

以下示例显示了使用 Java DSL 配置的等效路由器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}
此处的“apply-sequence”标志与发布订阅通道的效果相同,并且与发布订阅通道一样,默认情况下在recipient-list-router. 看PublishSubscribeChannel配置了解更多信息。

配置RecipientListRouter是使用 Spring 表达式语言 (SpEL) 支持作为单个接收方通道的选择器。 这样做类似于在“链”的开头使用过滤器来充当“选择性消费者”。 但是,在这种情况下,它们都相当简洁地组合到路由器的配置中,如以下示例所示:spring-doc.cadn.net.cn

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

在前面的配置中,由selector-expression属性的计算,以确定是否应将此收件人包含在给定输入消息的收件人列表中。 表达式的评估结果必须是boolean. 如果未定义此属性,则通道始终位于收件人列表中。spring-doc.cadn.net.cn

RecipientListRouterManagement

从 4.1 版本开始,RecipientListRouter提供了多个作,用于在运行时动态作收件人。 这些管理作由RecipientListRouterManagement通过@ManagedResource注解。 它们可以通过使用控制总线和 JMX 来获得,如以下示例所示:spring-doc.cadn.net.cn

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");

从应用程序启动simpleRouter,只有一个channel1收件人。 但在addRecipient命令channel2收件人。 这是一个“注册对消息一部分的内容的兴趣”用例,当我们可能对某个时间段来自路由器的消息感兴趣时,因此我们订阅了recipient-list-router并在某个时候决定取消订阅。spring-doc.cadn.net.cn

由于运行时管理作<recipient-list-router>,它可以在没有任何<recipient>从一开始。 在这种情况下,的行为RecipientListRouter当邮件没有匹配的收件人时,也是如此。 如果defaultOutputChannel配置,则消息将发送到那里。 否则,MessageDeliveryException被抛出。spring-doc.cadn.net.cn

XPath 路由器

XPath 路由器是 XML 模块的一部分。 请参阅使用 XPath 路由 XML 消息spring-doc.cadn.net.cn

路由和错误处理

Spring Integration 还提供了一个特殊的基于类型的路由器,称为ErrorMessageExceptionTypeRouter用于路由错误消息(定义为其payload是一个Throwable实例)。ErrorMessageExceptionTypeRouter类似于PayloadTypeRouter. 事实上,它们几乎相同。 唯一的区别是,虽然PayloadTypeRouter导航有效负载实例的实例层次结构(例如,payload.getClass().getSuperclass()) 查找最具体的类型和通道映射,则ErrorMessageExceptionTypeRouter导航“异常原因”的层次结构(例如,payload.getCause()) 以找到最具体的Throwable类型或通道映射和用途mappingClass.isInstance(cause)以匹配cause到类或任何超类。spring-doc.cadn.net.cn

在这种情况下,通道映射顺序很重要。 因此,如果需要获取IllegalArgumentException,但不是RuntimeException,必须先在路由器上配置最后一个。
从 4.3 版本开始,ErrorMessageExceptionTypeRouter在初始化阶段加载所有映射类,以快速失败ClassNotFoundException.

以下示例显示了ErrorMessageExceptionTypeRouter:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow someFlow() {
    return f -> f
            .routeByException(r -> r
                 .channelMapping(IllegalArgumentException.class, "illegalChannel")
                 .channelMapping(NullPointerException.class, "npeChannel")
                 .defaultOutputChannel("defaultChannel"));
}
Kotlin DSL
@Bean
fun someFlow() =
    integrationFlow {
        routeByException {
                    channelMapping(IllegalArgumentException::class.java, "illegalChannel")
                    channelMapping(NullPointerException::class.java, "npeChannel")
                    defaultOutputChannel("defaultChannel")
                }
    }
时髦的 DSL
@Bean
someFlow() {
    integrationFlow {
        routeByException {
            channelMapping IllegalArgumentException, 'illegalChannel'
            channelMapping NullPointerException, 'npeChannel'
            defaultOutputChannel 'defaultChannel'
        }
    }
}
<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />

配置通用路由器

Spring Integration 提供了一个通用路由器。 您可以将其用于通用路由(与 Spring Integration 提供的其他路由器相反,每个路由器都有某种形式的专用化)。spring-doc.cadn.net.cn

使用 XML 配置基于内容的路由器

router元素提供了一种将路由器连接到输入通道的方法,并且还接受可选的default-output-channel属性。 这ref属性引用自定义路由器实现的 bean 名称(必须扩展AbstractMessageRouter). 以下示例显示了三个通用路由器:spring-doc.cadn.net.cn

<int:router ref="payloadTypeRouter" input-channel="input1"
            default-output-channel="defaultOutput1"/>

<int:router ref="recipientListRouter" input-channel="input2"
            default-output-channel="defaultOutput2"/>

<int:router ref="customRouter" input-channel="input3"
            default-output-channel="defaultOutput3"/>

<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>

或者ref可能指向包含@Router注释(稍后显示),或者您可以将ref使用显式方法名称。 指定方法应用@Router注释部分。 以下示例定义了一个路由器,该路由器指向其 POJOref属性:spring-doc.cadn.net.cn

<int:router input-channel="input" ref="somePojo" method="someMethod"/>

我们通常建议使用ref属性,如果自定义路由器实现在其他<router>定义。 但是,如果自定义路由器实现的范围应限定为<router>,您可以提供内部 Bean 定义,如以下示例所示:spring-doc.cadn.net.cn

<int:router method="someMethod" input-channel="input3"
            default-output-channel="defaultOutput3">
    <beans:bean class="org.foo.MyCustomRouter"/>
</int:router>
同时使用ref属性和内部处理程序定义<router>不允许配置。 这样做会创建一个不明确的条件并引发异常。
如果ref属性引用扩展的 beanAbstractMessageProducingHandler(例如框架本身提供的路由器),配置优化为直接引用路由器。 在这种情况下,每个ref属性必须引用单独的 bean 实例(或prototype-scoped bean)或使用<bean/>配置类型。 但是,仅当您未在路由器 XML 定义中提供任何特定于路由器的属性时,此优化才适用。 如果您无意中从多个 Bean 引用了相同的消息处理程序,则会收到配置异常。

以下示例显示了在 Java 中配置的等效路由器:spring-doc.cadn.net.cn

@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

以下示例显示了使用 Java DSL 配置的等效路由器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
            .route(myCustomRouter())
            .get();
}

public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

或者,可以路由来自消息有效负载的数据,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
            .route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
            .get();
}

路由器和 Spring 表达式语言 (SpEL)

有时,路由逻辑可能很简单,为它编写一个单独的类并将其配置为 bean 似乎有些矫枉过正。 从 Spring Integration 2.0 开始,我们提供了一种替代方案,允许您使用 SpEL 实现以前需要自定义 POJO 路由器的简单计算。spring-doc.cadn.net.cn

有关 Spring 表达式语言的更多信息,请参阅 Spring Framework 参考指南中的相关章节

通常,对 SpEL 表达式进行求值,并将其结果映射到通道,如以下示例所示:spring-doc.cadn.net.cn

<int:router input-channel="inChannel" expression="payload.paymentType">
    <int:mapping value="CASH" channel="cashPaymentChannel"/>
    <int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
    <int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>

以下示例显示了在 Java 中配置的等效路由器:spring-doc.cadn.net.cn

@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
    router.setChannelMapping("CASH", "cashPaymentChannel");
    router.setChannelMapping("CREDIT", "authorizePaymentChannel");
    router.setChannelMapping("DEBIT", "authorizePaymentChannel");
    return router;
}

以下示例显示了在 Java DSL 中配置的等效路由器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
        .route("payload.paymentType", r -> r
            .channelMapping("CASH", "cashPaymentChannel")
            .channelMapping("CREDIT", "authorizePaymentChannel")
            .channelMapping("DEBIT", "authorizePaymentChannel"))
        .get();
}

为了进一步简化事情,SpEL 表达式可以计算为通道名称,如以下表达式所示:spring-doc.cadn.net.cn

<int:router input-channel="inChannel" expression="payload + 'Channel'"/>

在前面的配置中,结果通道由 SpEL 表达式计算,该表达式将payload用文字String,“频道”。spring-doc.cadn.net.cn

SpEL 用于配置路由器的另一个优点是表达式可以返回Collection,有效地使每一个<router>收件人列表路由器。每当表达式返回多个通道值时,消息就会转发到每个通道。以下示例显示了这样的表达式:spring-doc.cadn.net.cn

<int:router input-channel="inChannel" expression="headers.channels"/>

在上述配置中,如果消息包含名称为“channels”的标头,并且该标头的值为List的通道名称,则消息将发送到列表中的每个通道。当需要选择多个通道时,您可能还会发现集合投影和集合选择表达式很有用。有关更多信息,请参阅:spring-doc.cadn.net.cn

使用注释配置路由器

使用时@Router要注释方法,该方法可以返回MessageChannelString类型。 在后一种情况下,终结点解析通道名称,就像解析默认输出通道一样。此外,该方法可以返回单个值或集合。如果返回集合,则回复消息将发送到多个通道。总而言之,以下方法签名都是有效的:spring-doc.cadn.net.cn

@Router
public MessageChannel route(Message message) {...}

@Router
public List<MessageChannel> route(Message message) {...}

@Router
public String route(Foo payload) {...}

@Router
public List<String> route(Foo payload) {...}

除了基于有效负载的路由之外,还可以根据消息标头中可用的元数据作为属性或属性来路由消息。 在这种情况下,一个用@Router可以包含一个标有@Header,该值映射到标头值,如以下示例所示,并在 Annotation Support 中记录:spring-doc.cadn.net.cn

@Router
public List<String> route(@Header("orderStatus") OrderStatus status)
有关基于 XML 的消息的路由,包括 XPath 支持,请参阅 XML 支持 - 处理 XML 有效负载

有关路由器配置的更多信息,另请参见 Java DSL 一章中的消息路由器spring-doc.cadn.net.cn

动态路由器

Spring Integration 为常见的基于内容的路由用例提供了许多不同的路由器配置,以及将自定义路由器实现为 POJO 的选项。 例如PayloadTypeRouter提供了一种简单的方法来配置路由器,该路由器根据传入消息的有效负载类型计算通道,而HeaderValueRouter在配置路由器时提供了相同的便利,该路由器通过评估特定消息标头的值来计算通道。 还有基于表达式 (SpEL) 的路由器,其中信道是根据计算表达式来确定的。 所有这些类型的路由器都表现出一些动态特征。spring-doc.cadn.net.cn

但是,这些路由器都需要静态配置。 即使在基于表达式的路由器的情况下,表达式本身也被定义为路由器配置的一部分,这意味着对相同值运行的相同表达式始终会导致相同通道的计算。 这在大多数情况下是可以接受的,因为此类路由是明确定义的,因此是可预测的。 但有时我们需要动态更改路由器配置,以便消息流可以路由到不同的通道。spring-doc.cadn.net.cn

例如,您可能希望关闭系统的某些部分以进行维护,并暂时将消息重新路由到不同的消息流。 再举一个例子,您可能希望通过添加另一个路由来处理更具体的java.lang.Number(在以下情况下PayloadTypeRouter).spring-doc.cadn.net.cn

不幸的是,使用静态路由器配置来实现这些目标中的任何一个,您必须关闭整个应用程序,更改路由器的配置(更改路由),然后恢复应用程序。 这显然不是任何人想要的解决方案。spring-doc.cadn.net.cn

动态路由器模式描述了在不关闭系统或单个路由器的情况下动态更改或配置路由器的机制。spring-doc.cadn.net.cn

在我们深入了解 Spring Integration 如何支持动态路由的细节之前,我们需要考虑路由器的典型流程:spring-doc.cadn.net.cn

  1. 计算通道标识符,这是路由器在收到消息后计算的值。通常,它是一个字符串或实际MessageChannel.spring-doc.cadn.net.cn

  2. 将通道标识符解析为通道名称。我们将在本节后面描述此过程的细节。spring-doc.cadn.net.cn

  3. 将通道名称解析为实际MessageChannelspring-doc.cadn.net.cn

如果步骤 1 导致MessageChannel,因为MessageChannel是任何路由器作业的最终产品。但是,如果第一步导致通道标识符不是MessageChannel,你有很多可能的方法来影响MessageChannel. 考虑以下有效负载类型路由器的示例:spring-doc.cadn.net.cn

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String"  channel="channel1" />
    <int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>

在有效负载类型路由器的上下文中,前面提到的三个步骤将实现如下:spring-doc.cadn.net.cn

  1. 计算作为有效负载类型的完全限定名称的通道标识符(例如java.lang.String).spring-doc.cadn.net.cn

  2. 将通道标识符解析为通道名称,其中上一步的结果用于从mapping元素。spring-doc.cadn.net.cn

  3. 将通道名称解析为MessageChannel作为对应用程序上下文中 bean 的引用(希望是MessageChannel)由上一步的结果标识。spring-doc.cadn.net.cn

换句话说,每个步骤都为下一步提供信息,直到该过程完成。spring-doc.cadn.net.cn

现在考虑一个标头值路由器的示例:spring-doc.cadn.net.cn

<int:header-value-router input-channel="inputChannel" header-name="testHeader">
    <int:mapping value="foo" channel="fooChannel" />
    <int:mapping value="bar" channel="barChannel" />
</int:header-value-router>

现在我们可以考虑这三个步骤如何适用于标头值路由器:spring-doc.cadn.net.cn

  1. 计算一个通道标识符,该标识符是由header-name属性。spring-doc.cadn.net.cn

  2. 将通道标识符解析为通道名称,其中上一步的结果用于从mapping元素。spring-doc.cadn.net.cn

  3. 将通道名称解析为MessageChannel作为对应用程序上下文中 bean 的引用(希望是MessageChannel)由上一步的结果标识。spring-doc.cadn.net.cn

两种不同路由器类型的上述两种配置看起来几乎相同。 但是,如果您查看HeaderValueRouter我们清楚地看到没有mappingsub 元素,如以下列表所示:spring-doc.cadn.net.cn

<int:header-value-router input-channel="inputChannel" header-name="testHeader">

但是,该配置仍然完全有效。 那么自然的问题是第二步的映射呢?spring-doc.cadn.net.cn

第二步现在是可选的。 如果mapping未定义,则在第一步中计算的通道标识符值将自动被视为channel name,现在解析为实际的MessageChannel,如第三步。 这也意味着第二步是向路由器提供动态特征的关键步骤之一,因为它引入了一个过程,允许您更改通道标识符解析为通道名称的方式,从而影响确定MessageChannel从初始通道标识符。spring-doc.cadn.net.cn

例如,在前面的配置中,假设testHeadervalue 是 'kermit',它现在是通道标识符(第一步)。 由于此路由器中没有映射,因此无法将此信道标识符解析为信道名称(第二步),并且此信道标识符现在被视为信道名称。 但是,如果存在映射但值不同怎么办? 最终结果仍然是相同的,因为如果无法通过将通道标识符解析为通道名称的过程确定新值,则通道标识符将成为通道名称。spring-doc.cadn.net.cn

剩下的就是第三步将通道名称(“kermit”)解析为MessageChannel由此名称标识。 这基本上涉及对所提供名称的 bean 查找。 现在,所有包含标头值对的邮件testHeader=kermit将被路由到MessageChannel其 bean 名称(其id) 是“kermit”。spring-doc.cadn.net.cn

但是,如果您想将这些消息路由到“辛普森”频道怎么办?显然,更改静态配置是有效的,但这样做也需要关闭您的系统。 但是,如果您有权访问通道标识符映射,则可以在标头值对现在所在的位置引入新的映射kermit=simpson,从而让第二步将“kermit”视为通道标识符,同时将其解析为“simpson”作为通道名称。spring-doc.cadn.net.cn

这显然也适用于PayloadTypeRouter,您现在可以在其中重新映射或删除特定的有效负载类型映射。事实上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们的计算值现在有机会通过第二步解析为实际的channel name.spring-doc.cadn.net.cn

任何属于AbstractMappingMessageRouter(包括大多数框架定义的路由器)是动态路由器,因为channelMappingAbstractMappingMessageRouter水平。 该映射的 setter 方法与 'setChannelMapping' 和 'removeChannelMapping' 方法一起作为公共方法公开。这些允许您在运行时更改、添加和删除路由器映射,只要您有对路由器本身的引用。这也意味着您可以通过 JMX(参见 JMX 支持)或 Spring Integration 控制总线(参见 Control Bus)功能公开这些相同的配置选项。spring-doc.cadn.net.cn

回退到通道键作为通道名称是灵活且方便的。但是,如果您不信任消息创建者,则恶意行为者(了解系统)可能会创建路由到意外通道的消息。例如,如果将密钥设置为路由器输入通道的通道名称,则此类消息将被路由回路由器,最终导致堆栈溢出错误。因此,您可能希望禁用此功能(将channelKeyFallback属性设置为false),并根据需要更改映射。
使用控制总线管理路由器映射

管理路由器映射的一种方法是通过控制总线模式,它公开了一个控制通道,您可以向该通道发送控制消息以管理和监视 Spring Integration 组件,包括路由器。spring-doc.cadn.net.cn

有关控制总线的更多信息,请参阅控制总线

通常,您将发送一条控制消息,要求在特定托管组件(如路由器)上调用特定作。以下托管作(方法)特定于更改路由器解析过程:spring-doc.cadn.net.cn

  • public void setChannelMapping(String key, String channelName):允许您在channel identifierchannel namespring-doc.cadn.net.cn

  • public void removeChannelMapping(String key):允许您删除特定的通道映射,从而断开channel identifierchannel namespring-doc.cadn.net.cn

请注意,这些方法可用于简单的更改(例如更新单个路由或添加或删除路由)。但是,如果要删除一条路由并添加另一条路由,则更新不是原子的。这意味着路由表在更新之间可能处于不确定状态。从 4.0 版开始,您现在可以使用控制总线以原子方式更新整个路由表。以下方法可以让您这样做:spring-doc.cadn.net.cn

  • public Map<String, String>getChannelMappings():返回当前映射。spring-doc.cadn.net.cn

  • public void replaceChannelMappings(Properties channelMappings):更新映射。请注意,channelMappings参数是一个Properties对象。 这种布置允许控制总线命令使用内置的StringToPropertiesConverter,如以下示例所示:spring-doc.cadn.net.cn

"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"

请注意,每个映射都由换行符(\n). 对于对地图进行编程更改,我们建议您使用setChannelMappings方法,出于类型安全考虑。replaceChannelMappings忽略不是的键或值String对象。spring-doc.cadn.net.cn

使用 JMX 管理路由器映射

您还可以使用 Spring 的 JMX 支持来公开路由器实例,然后使用您最喜欢的 JMX 客户端(例如 JConsole)来管理这些作(方法)以更改路由器的配置。spring-doc.cadn.net.cn

有关 Spring Integration 的 JMX 支持的更多信息,请参阅 JMX 支持
路由单

从版本 4.1 开始,Spring Integration 提供了路由单企业集成模式的实现。 它被实现为routingSlip消息头,用于确定AbstractMessageProducingHandler实例,当outputChannel未为终结点指定。 此模式在复杂的动态情况下非常有用,因为配置多个路由器来确定消息流可能会变得困难。 当消息到达没有output-channelroutingSlip以确定将消息发送到的下一个通道。 当路由滑移耗尽时,正常replyChannel处理简历。spring-doc.cadn.net.cn

路由单的配置显示为HeaderEnricheroption — 一个以分号分隔的路由单,其中包含path条目,如以下示例所示:spring-doc.cadn.net.cn

<util:properties id="properties">
    <beans:prop key="myRoutePath1">channel1</beans:prop>
    <beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>

<context:property-placeholder properties-ref="properties"/>

<header-enricher input-channel="input" output-channel="process">
    <routing-slip
        value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
               routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>

前面的示例具有:spring-doc.cadn.net.cn

  • 一个<context:property-placeholder>配置,以证明路由单中的条目path可以指定为可解析的键。spring-doc.cadn.net.cn

  • <header-enricher> <routing-slip>子元素用于填充RoutingSlipHeaderValueMessageProcessorHeaderEnricher处理器。spring-doc.cadn.net.cn

  • RoutingSlipHeaderValueMessageProcessor接受String已解析的路由滑移数组path条目和返回(从processMessage()) 一个singletonMap使用pathkey0作为初始routingSlipIndex.spring-doc.cadn.net.cn

路由单path条目可以包含MessageChannel豆名,RoutingSlipRouteStrategybean 名称和 Spring 表达式 (SpEL)。 这RoutingSlipHeaderValueMessageProcessor检查每个路由单path条目对BeanFactory在第一个processMessage调用。 它将条目(在应用程序上下文中不是 bean 名称)转换为ExpressionEvaluatingRoutingSlipRouteStrategy实例。RoutingSlipRouteStrategy条目被多次调用,直到它们返回 null 或空String.spring-doc.cadn.net.cn

由于路由单涉及getOutputChannelprocess,我们有一个请求-回复上下文。 这RoutingSlipRouteStrategy已引入以确定下一个outputChannel使用requestMessagereply对象。 此策略的实现应在应用程序上下文中注册为 bean,并在路由单中使用其 bean 名称path. 这ExpressionEvaluatingRoutingSlipRouteStrategy提供了实现。 它接受 SpEL 表达式和内部ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply对象用作评估上下文的根对象。 这是为了避免EvaluationContext为每个人创建ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()调用。 它是一个简单的 Java bean,具有两个属性:Message<?> requestObject reply. 通过这个表达式实现,我们可以指定路由单path条目(例如,@routingSlipRoutingPojo.get(request, reply)request.headers[myRoutingSlipChannel])并避免为RoutingSlipRouteStrategy.spring-doc.cadn.net.cn

requestMessage参数始终是一个Message<?>. 根据上下文,回复对象可能是Message<?>AbstractIntegrationMessageBuilder,或任意应用程序域对象(例如,当它由服务激活器调用的 POJO 方法返回时)。 在前两种情况下,通常的Message属性 (payloadheaders) 在使用 SpEL(或 Java 实现)时可用。 对于任意域对象,这些属性不可用。 因此,如果将结果用于确定下一条路径,则在将路由单与 POJO 方法结合使用时要小心。
如果分布式环境中涉及布线单,我们建议不要对布线单使用内联表达式path. 此建议适用于分布式环境,例如跨 JVM 应用程序,使用request-reply通过消息代理(例如AMQP 支持JMS 支持),或使用MessageStore (消息存储)在集成流程中。 该框架使用RoutingSlipHeaderValueMessageProcessor将它们转换为ExpressionEvaluatingRoutingSlipRouteStrategy对象,它们用于routingSlip消息头。 由于这个类不是Serializable(不可能,因为它取决于BeanFactory),整个Message变得不可序列化,并且在任何分布式作中,我们最终都会得到一个NotSerializableException. 要克服此限制,请注册一个ExpressionEvaluatingRoutingSlipRouteStrategybean 具有所需的 SpEL,并在路由单中使用其 bean 名称path配置。

对于 Java 配置,您可以添加一个RoutingSlipHeaderValueMessageProcessorinstance 添加到HeaderEnricherbean 定义,如以下示例所示:spring-doc.cadn.net.cn

@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
    return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
            new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
                                                       "@routingSlipRoutingPojo.get(request, reply)",
                                                       "routingSlipRoutingStrategy",
                                                       "request.headers[myRoutingSlipChannel]",
                                                       "finishChannel")));
}

当端点生成回复并且没有outputChannel已定义:spring-doc.cadn.net.cn

  • routingSlipIndex用于从路由单获取值path列表。spring-doc.cadn.net.cn

  • 如果routingSlipIndexString,用于从BeanFactory.spring-doc.cadn.net.cn

  • 如果返回的 bean 是MessageChannel,它用作下一个outputChannelroutingSlipIndex在回复邮件头中递增(路由单path条目保持不变)。spring-doc.cadn.net.cn

  • 如果返回的 bean 是RoutingSlipRouteStrategy及其getNextPath不返回空的String,则该结果将用作下一个 Bean 名称outputChannel. 这routingSlipIndex保持不变。spring-doc.cadn.net.cn

  • 如果RoutingSlipRouteStrategy.getNextPath返回一个空的StringnullroutingSlipIndex递增,并且getOutputChannelFromRoutingSlip为下一个布线单递归调用path项目。spring-doc.cadn.net.cn

  • 如果下一个路由单path条目不是String,它必须是RoutingSlipRouteStrategy.spring-doc.cadn.net.cn

  • routingSlipIndex超过布线单的大小pathlist,则算法将移至标准的默认行为replyChannel页眉。spring-doc.cadn.net.cn

Process Manager 企业集成模式

企业集成模式包括流程管理器模式。 现在,您可以使用封装在RoutingSlipRouteStrategy在工艺路线单内。 除了 bean 名称之外,还有RoutingSlipRouteStrategy可以返回任何MessageChannel对象,并且没有要求MessageChannel实例是应用程序上下文中的 bean。 这样,当无法预测应该使用哪个通道时,我们可以提供强大的动态路由逻辑。 一个MessageChannel可以在RoutingSlipRouteStrategy并返回。 一个FixedSubscriberChannel与关联的MessageHandler对于这种情况,实现是一个很好的组合。 例如,您可以路由到响应式流,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public PollableChannel resultsChannel() {
    return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
    return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
            ? new FixedSubscriberChannel(m ->
            Mono.just((String) m.getPayload())
                    .map(String::toUpperCase)
                    .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
            : new FixedSubscriberChannel(m ->
            Mono.just((Integer) m.getPayload())
                    .map(v -> v * 2)
                    .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}

Filter

消息过滤器用于确定Message应根据某些条件(例如邮件标头值或邮件内容本身)传递或删除。 因此,消息过滤器类似于路由器,不同之处在于,对于从过滤器的输入通道接收的每条消息,相同的消息可能会或可能不会发送到过滤器的输出通道。 与路由器不同,它不决定将消息发送到哪个消息通道,而只决定是否发送消息。spring-doc.cadn.net.cn

正如我们在本节后面所述,过滤器还支持丢弃通道。 在某些情况下,它可以根据布尔条件扮演非常简单的路由器(或“交换机”)的角色。

在 Spring Integration 中,您可以将消息过滤器配置为消息端点,该端点委托给MessageSelector接口。 该界面本身非常简单,如以下列表所示:spring-doc.cadn.net.cn

public interface MessageSelector {

    boolean accept(Message<?> message);

}

MessageFilter构造函数接受一个选择器实例,如以下示例所示:spring-doc.cadn.net.cn

MessageFilter filter = new MessageFilter(someSelector);

使用 Java、Groovy 和 Kotlin DSL 配置过滤器

IntegrationFlowBuilder由 Java DSL(也用作 Groovy 和 Kotlin DSL 的基础)提供了许多重载方法filter()算子。 这MessageSelector上面提到的抽象可以用作filter()定义:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow someFlow() {
    return f -> f
              .<String>filter((payload) -> !"junk".equals(payload));
}
Kotlin DSL
@Bean
fun someFlow() =
    integrationFlow {
        filter<String> { it != "junk" }
    }
时髦的 DSL
@Bean
someFlow() {
    integrationFlow {
        filter String, { it != 'junk' }
    }
}

在相应的章节中查看有关 DSL 的更多信息:spring-doc.cadn.net.cn

使用 XML 配置过滤器

结合命名空间和 SpEL,您可以使用很少的 Java 代码配置强大的过滤器。spring-doc.cadn.net.cn

您可以使用<filter>元素用于创建消息选择端点。 除了input-channeloutput-channel属性,它需要一个ref属性。 这ref可以指向MessageSelector实现,如以下示例所示:spring-doc.cadn.net.cn

<int:filter input-channel="input" ref="selector" output-channel="output"/>

<bean id="selector" class="example.MessageSelectorImpl"/>

或者,您可以添加method属性。 在这种情况下,ref属性可以引用任何对象。 引用的方法可能需要Message类型或入站消息的有效负载类型。 该方法必须返回布尔值。 如果该方法返回“true”,则消息将发送到输出通道。 以下示例演示如何配置使用method属性:spring-doc.cadn.net.cn

<int:filter input-channel="input" output-channel="output"
    ref="exampleObject" method="someBooleanReturningMethod"/>

<bean id="exampleObject" class="example.SomeObject"/>

如果选择器或适配的 POJO 方法返回false,一些设置控制被拒绝邮件的处理。 默认情况下,(如果如前面示例所示配置)将以静默方式删除被拒绝的邮件。 如果拒绝应该导致错误条件,请将throw-exception-on-rejection属性设置为true,如以下示例所示:spring-doc.cadn.net.cn

<int:filter input-channel="input" ref="selector"
    output-channel="output" throw-exception-on-rejection="true"/>

如果您希望将被拒绝的消息路由到特定通道,请将该引用作为discard-channel,如以下示例所示:spring-doc.cadn.net.cn

<int:filter input-channel="input" ref="selector"
    output-channel="output" discard-channel="rejectedMessages"/>

如果throwExceptionOnRejection == false和没有discardChannel,则消息将以静默方式删除,并且o.s.i.filter.MessageFilter实例仅发出有关此丢弃消息的警告日志消息(从版本 6.1 开始)。 要在日志中删除没有警告的消息,请NullChannel可以配置为discardChannel在过滤器上。 默认情况下,框架的目标是不要完全静默,如果这是所需的行为,则需要设置显式选项。spring-doc.cadn.net.cn

消息过滤器通常与发布-订阅通道结合使用。许多过滤器端点可能订阅到同一通道,它们决定是否将消息传递到下一个端点,该端点可以是任何受支持的类型(例如服务激活器)。这为使用具有单个点对点输入通道和多个输出通道的消息路由器的更主动的方法提供了一种响应式替代方法。

我们建议使用ref属性(如果自定义过滤器实现在其他<filter>定义。 但是,如果自定义筛选器实现的范围限定为单个<filter>元素,您应该提供内部 bean 定义,如以下示例所示:spring-doc.cadn.net.cn

<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
  <beans:bean class="org.foo.MyCustomFilter"/>
</filter>
同时使用ref属性和内部处理程序定义<filter>不允许配置,因为它会创建不明确的条件并引发异常。
如果ref属性引用扩展的 beanMessageFilter(例如框架本身提供的过滤器),通过将输出通道直接注入过滤器 bean 来优化配置。 在这种情况下,每个ref必须是单独的 bean 实例(或prototype-scoped bean)或使用<bean/>配置类型。 但是,仅当未在过滤器 XML 定义中提供任何特定于过滤器的属性时,此优化才适用。 如果您无意中从多个 Bean 引用了相同的消息处理程序,则会收到配置异常。

随着 SpEL 支持的引入,Spring Integration 添加了expression属性添加到过滤器元件。 它可以用来完全避免 Java 进行简单过滤器,如以下示例所示:spring-doc.cadn.net.cn

<int:filter input-channel="input" expression="payload.equals('nonsense')"/>

作为 expression 属性值传递的字符串被计算为 SpEL 表达式,其中包含在评估上下文中可用的消息。 如果必须将表达式的结果包含在应用程序上下文的作用域中,则可以使用 SpEL 参考文档中定义的表示法,如以下示例所示:#{}spring-doc.cadn.net.cn

<int:filter input-channel="input"
            expression="payload.matches(#{filterPatterns.nonsensePattern})"/>

如果表达式本身需要动态,您可以使用 'expression' 子元素。 这提供了一个间接级别,用于通过其键解析表达式,从ExpressionSource. 这是一个可以直接实现的策略接口,或者您可以依赖 Spring Integration 中可用的版本,该版本从“资源包”加载表达式,并可以在给定的秒数后检查修改。 所有这些都在以下配置示例中进行了演示,如果修改了基础文件,则可以在一分钟内重新加载表达式:spring-doc.cadn.net.cn

<int:filter input-channel="input" output-channel="output">
    <int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>

<beans:bean id="myExpressions"
    class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
    <beans:property name="basename" value="config/integration/expressions"/>
    <beans:property name="cacheSeconds" value="60"/>
</beans:bean>

如果ExpressionSourcebean 被命名为expressionSource,则无需在<expression>元素。 但是,在前面的示例中,为了完整起见,我们显示了它。spring-doc.cadn.net.cn

'config/integration/expressions.properties' 文件(或任何具有语言环境扩展名的更具体版本,以加载资源包的典型方式解析)可以包含键/值对,如以下示例所示:spring-doc.cadn.net.cn

filterPatterns.example=payload > 100
所有这些使用expression作为属性或子元素也可以应用于 Transformer、Router、Splitter、Service-Activator 和 Header-Enricher 元素中。 给定组件类型的语义和作用将影响评估结果的解释,就像解释方法调用的返回值一样。 例如,表达式可以返回将路由器组件视为消息通道名称的字符串。 但是,根据消息作为根对象评估表达式并在前缀为“@”时解析 bean 名称的底层功能在 Spring Integration 中的所有核心 EIP 组件中是一致的。

使用注释配置过滤器

以下示例演示如何使用注释配置过滤器:spring-doc.cadn.net.cn

public class PetFilter {
    ...
    @Filter  (1)
    public boolean dogsOnly(String input) {
        ...
    }
}
1 指示此方法将用作过滤器的注释。 如果要将此类用作过滤器,则必须指定它。

XML 元素提供的所有配置选项也可用于@Filter注解。spring-doc.cadn.net.cn

过滤器可以从 XML 显式引用,或者如果@MessageEndpoint注释在类上定义,通过类路径扫描自动检测。spring-doc.cadn.net.cn

分配器

拆分器是一个组件,其作用是将消息分成多个部分,并发送生成的消息以进行独立处理。 很多时候,他们是包括聚合商在内的管道中的上游生产商。spring-doc.cadn.net.cn

编程模型

用于执行拆分的 API 由一个基类AbstractMessageSplitter. 这是一个MessageHandler封装拆分器通用功能的实现,例如填写适当的消息头 (CORRELATION_ID,SEQUENCE_SIZESEQUENCE_NUMBER) 对生成的消息进行。 此填充允许跟踪消息及其处理结果(在典型情况下,这些标头将复制到各种转换终结点生成的消息)。 然后,这些值可以由组合消息处理器使用。spring-doc.cadn.net.cn

以下示例显示了AbstractMessageSplitter:spring-doc.cadn.net.cn

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

要在应用程序中实现特定的拆分器,您可以扩展AbstractMessageSplitter并实现splitMessage方法,其中包含用于拆分消息的逻辑。返回值可以是以下值之一:spring-doc.cadn.net.cn

  • 一个Collection或消息数组或Iterable(或Iterator)迭代消息。在这种情况下,消息将作为消息发送(在CORRELATION_ID,SEQUENCE_SIZESEQUENCE_NUMBER填充)。使用此方法可以为您提供更多控制权,例如,在拆分过程中填充自定义邮件标头。spring-doc.cadn.net.cn

  • 一个Collection或非消息对象数组或Iterable(或Iterator)循环访问非消息对象。它的工作原理与前面的情况类似,只是每个集合元素都用作消息有效负载。使用此方法可以让您专注于域对象,而无需考虑消息传递系统,并生成更易于测试的代码。spring-doc.cadn.net.cn

  • 一个Message或非消息对象(但不是集合或数组)。它的工作方式与前面的情况类似,只是发送了一条消息。spring-doc.cadn.net.cn

在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值将如前所述解释。输入参数可能是Message或一个简单的 POJO。在后一种情况下,拆分器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。spring-doc.cadn.net.cn

迭代器

从 4.1 版本开始,AbstractMessageSplitter支持Iteratortype 的value拆分。注意,如果是Iterator(或Iterable),我们无权访问基础项的数量,并且SEQUENCE_SIZEheader 设置为0. 这意味着默认的SequenceSizeReleaseStrategy<aggregator>将不起作用,并且CORRELATION_IDsplitter不会被释放;它将保持为incomplete. 在这种情况下,您应该使用适当的自定义ReleaseStrategy或依赖send-partial-result-on-expirygroup-timeoutMessageGroupStoreReaper.spring-doc.cadn.net.cn

从 5.0 版开始,AbstractMessageSplitter提供protected obtainSizeIfPossible()允许确定大小的方法IterableIterator对象,如果可能的话。 例如XPathMessageSplitter可以确定底层的大小NodeList对象。 从版本 5.0.9 开始,此方法还正确返回com.fasterxml.jackson.core.TreeNode.spring-doc.cadn.net.cn

Iteratorobject 对于避免在拆分之前在内存中构建整个集合很有用。 例如,当基础项是从某些外部系统(例如 DataBase 或 FTP)填充的时MGET)使用迭代或流。spring-doc.cadn.net.cn

流和通量

从 5.0 版开始,AbstractMessageSplitter支持 JavaStream和反应流Publisher类型value拆分。 在这种情况下,目标Iterator建立在它们的迭代功能之上。spring-doc.cadn.net.cn

此外,如果拆分器的输出通道是ReactiveStreamsSubscribableChannelAbstractMessageSplitter产生一个Fluxresult 而不是Iterator,输出通道订阅了这个Flux用于下游流量需求的基于背压的分流。spring-doc.cadn.net.cn

从 5.2 版开始,拆分器支持discardChannel选项,用于发送拆分函数返回空容器(集合、数组、流、Flux等)。在这种情况下,没有要迭代的项目发送到outputChannel. 这null分割结果保留为流结束指示器。spring-doc.cadn.net.cn

使用 Java、Groovy 和 Kotlin DSL 配置拆分器

基于Message及其具有 DSL 配置的可迭代有效负载:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
Kotlin DSL
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
时髦的 DSL
@Bean
someFlow() {
    integrationFlow {
        split Message<?>, { it.payload }
    }
}

有关 DSL 的更多信息,请参阅相应章节:spring-doc.cadn.net.cn

使用 XML 配置拆分器

可以通过 XML 配置拆分器,如下所示:spring-doc.cadn.net.cn

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 拆分器的 ID 是可选的。
2 对在应用程序上下文中定义的 bean 的引用。 bean 必须实现拆分逻辑,如前面的部分所述。 自选。 如果未提供对 bean 的引用,则假定到达input-channeljava.util.Collection默认的拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到output-channel.
3 实现拆分逻辑的方法(在 bean 上定义)。 自选。
4 分路器的输入通道。 必填。
5 拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自行指定回复通道)。
6 在拆分结果为空的情况下,请求消息发送到的通道。 可选(它们将停止,如null结果)。

我们建议使用ref属性,如果自定义拆分器实现可以在其他<splitter>定义。 但是,如果自定义拆分器处理程序实现的范围应限定为<splitter>,您可以配置内部 Bean 定义,如下示例所示:spring-doc.cadn.net.cn

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
同时使用ref属性和内部处理程序定义<int:splitter>不允许配置,因为它会创建不明确的条件并导致抛出异常。
如果ref属性引用扩展的 beanAbstractMessageProducingHandler(例如框架本身提供的拆分器),通过将输出通道直接注入处理程序来优化配置。 在这种情况下,每个ref必须是单独的 Bean 实例(或prototype-scoped bean)或使用<bean/>配置类型。 但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。 如果您无意中从多个 Bean 引用了相同的消息处理程序,则会收到配置异常。

配置带有注释的拆分器

@Splitter注释适用于期望Messagetype 或消息有效负载类型,并且该方法的返回值应为Collection任何类型的。 如果返回值不是实际值Message对象,每个项目都包装在Message作为Message. 每个结果Message被发送到终端的指定输出通道,其上@Splitter被定义。spring-doc.cadn.net.cn

以下示例演示如何使用@Splitter注解:spring-doc.cadn.net.cn

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}

聚合

聚合器基本上是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。spring-doc.cadn.net.cn

从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。 它必须保存要聚合的消息,并确定何时准备好聚合完整的消息组。 为此,它需要一个MessageStore.spring-doc.cadn.net.cn

功能性

聚合器通过关联和存储一组相关消息来组合它们,直到该组被视为完整。 此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。spring-doc.cadn.net.cn

实现聚合器需要提供执行聚合的逻辑(即,从多个消息中创建单个消息)。 两个相关的概念是关联和释放。spring-doc.cadn.net.cn

关联确定如何对消息进行分组以进行聚合。 在 Spring Integration 中,默认情况下,基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头。 具有相同IntegrationMessageHeaderAccessor.CORRELATION_ID被分组在一起。 但是,您可以自定义关联策略,以允许其他方式指定如何将消息分组在一起。 为此,您可以实现CorrelationStrategy(本章后面会介绍)。spring-doc.cadn.net.cn

要确定一组消息准备好处理的时间点,请ReleaseStrategy被咨询。 聚合器的默认发布策略在序列中包含的所有消息都存在时释放组,基于IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。 您可以通过提供对自定义ReleaseStrategy实现。spring-doc.cadn.net.cn

编程模型

聚合 API 由许多类组成:spring-doc.cadn.net.cn

  • 界面MessageGroupProcessor及其子类:MethodInvokingAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessorspring-doc.cadn.net.cn

  • ReleaseStrategy接口及其默认实现:SimpleSequenceSizeReleaseStrategyspring-doc.cadn.net.cn

  • CorrelationStrategy接口及其默认实现:HeaderAttributeCorrelationStrategyspring-doc.cadn.net.cn

AggregatingMessageHandler

AggregatingMessageHandlerAbstractCorrelatingMessageHandler) 是一个MessageHandler实现,封装聚合器的通用功能(以及其他相关用例),如下所示:spring-doc.cadn.net.cn

决定如何将消息分组在一起的责任委托给CorrelationStrategy实例。 决定是否可以释放消息组的责任委托给ReleaseStrategy实例。spring-doc.cadn.net.cn

以下列表显示了基地的简要亮点AbstractAggregatingMessageGroupProcessor(实施aggregatePayloads方法留给开发者):spring-doc.cadn.net.cn

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

DefaultAggregatingMessageGroupProcessor,ExpressionEvaluatingMessageGroupProcessorMethodInvokingMessageGroupProcessor作为AbstractAggregatingMessageGroupProcessor.spring-doc.cadn.net.cn

从 5.2 版开始,一个Function<MessageGroup, Map<String, Object>>策略可用于AbstractAggregatingMessageGroupProcessor合并和计算(聚合)输出消息的标头。 这DefaultAggregateHeadersFunction实现可用于返回组之间没有冲突的所有标头的逻辑;组中的一封或多封邮件上缺少标头不被视为冲突。 省略冲突的标头。 与新推出的DelegatingMessageGroupProcessor,则此函数用于任何任意(非AbstractAggregatingMessageGroupProcessor) MessageGroupProcessor实现。 本质上,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor实例并将所有其他实现包装到DelegatingMessageGroupProcessor. 逻辑上的差异AbstractAggregatingMessageGroupProcessorDelegatingMessageGroupProcessor后者在调用委托策略之前不会提前计算标头,并且如果委托返回MessageAbstractIntegrationMessageBuilder. 在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的正确标头。 这Function<MessageGroup, Map<String, Object>>策略可作为headers-functionreference 属性,作为AggregatorSpec.headersFunction()Java DSL 的选项和AggregatorFactoryBean.setHeadersFunction()用于纯 Java 配置。spring-doc.cadn.net.cn

CorrelationStrategyAbstractCorrelatingMessageHandler并具有基于IntegrationMessageHeaderAccessor.CORRELATION_IDmessage 标头,如以下示例所示:spring-doc.cadn.net.cn

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor. 它创建了一个Message其有效负载是List为给定组接收的有效负载。 这适用于具有拆分器、发布-订阅通道或上游收件人列表路由器的简单分散收集实现。spring-doc.cadn.net.cn

在此类场景中使用发布-订阅通道或收件人列表路由器时,请务必启用apply-sequence旗。 这样做会添加必要的标头:CORRELATION_ID,SEQUENCE_NUMBERSEQUENCE_SIZE. 默认情况下,该行为对 Spring Integration 中的拆分器启用,但未为发布-订阅通道或收件人列表路由器启用,因为这些组件可能用于不需要这些标头的各种上下文。

为应用程序实现特定聚合器策略时,可以扩展AbstractAggregatingMessageGroupProcessor并实现aggregatePayloads方法。 但是,有更好的解决方案,与 API 的耦合较少,用于实现聚合逻辑,可以通过 XML 或通过注释进行配置。spring-doc.cadn.net.cn

通常,任何 POJO 都可以实现聚合算法,前提是它提供了一个接受单个java.util.List作为参数(也支持参数化列表)。 调用此方法用于聚合消息,如下所示:spring-doc.cadn.net.cn

  • 如果参数是java.util.Collection<T>参数类型 T 可分配给Message,则为聚合而累积的整个消息列表将发送到聚合器。spring-doc.cadn.net.cn

  • 如果参数是非参数化的java.util.Collection或者参数类型不可分配给Message,该方法接收累积消息的有效负载。spring-doc.cadn.net.cn

  • 如果返回类型不可分配给Message,则将其视为Message由框架自动创建。spring-doc.cadn.net.cn

为了简化代码和促进最佳实践(例如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中配置它。

从 5.3 版本开始,在处理消息组后,一个AbstractCorrelatingMessageHandler执行MessageBuilder.popSequenceDetails()具有多个嵌套级别的正确拆分器聚合器方案的消息头修改。 仅当消息组释放结果不是消息集合时,才会执行此作。 在这种情况下,目标MessageGroupProcessor负责MessageBuilder.popSequenceDetails()调用,同时生成这些消息。spring-doc.cadn.net.cn

如果MessageGroupProcessor返回一个Message一个MessageBuilder.popSequenceDetails()仅当sequenceDetails与组中的第一条消息匹配。 (以前,仅当普通有效负载或AbstractIntegrationMessageBuilder已从MessageGroupProcessor.)spring-doc.cadn.net.cn

此功能可由新的popSequence boolean属性,因此MessageBuilder.popSequenceDetails()在某些情况下,当标准拆分器尚未填充关联详细信息时,可以禁用。 从本质上讲,该属性撤消了最近的上游所做的作applySequence = trueAbstractMessageSplitter. 有关更多信息,请参阅拆分器spring-doc.cadn.net.cn

SimpleMessageGroup.getMessages()方法返回一个unmodifiableCollection. 因此,如果聚合 POJO 方法具有Collection<Message>参数,传入的参数正是Collection实例,并且当您使用SimpleMessageStore对于聚合器,该原始Collection<Message>释放群组后被清除。 因此,Collection<Message>如果变量被传递出聚合器,则 POJO 中的变量也会被清除。 如果您希望简单地按原样发布该集合以进行进一步处理,则必须构建一个新的Collection(例如,new ArrayList<Message>(messages)). 从 4.3 版开始,框架不再将消息复制到新集合,以避免创建不需要的额外对象。

在 4.2 版本之前,无法提供MessageGroupProcessor通过使用 XML 配置。 只有 POJO 方法可用于聚合。 现在,如果框架检测到引用的(或内部)bean 实现MessageProcessor,它用作聚合器的输出处理器。spring-doc.cadn.net.cn

如果您希望从自定义MessageGroupProcessor作为消息的有效负载,您的类应该扩展AbstractAggregatingMessageGroupProcessor并实施aggregatePayloads().spring-doc.cadn.net.cn

此外,从 4.2 版本开始,一个SimpleMessageGroupProcessor被提供。 它返回来自组的消息集合,如前所述,这会导致单独发送已释放的消息。spring-doc.cadn.net.cn

这允许聚合器充当消息屏障,在其中,到达的消息将被保留,直到释放策略触发并且组作为单个消息序列被释放。spring-doc.cadn.net.cn

从 6.0 版开始,仅当组处理器是SimpleMessageGroupProcessor. 否则,与任何其他MessageGroupProcessor返回Collection<Message>,则仅发出一条回复消息,并将整个消息集合作为其有效负载。 这种逻辑是由聚合器的规范目的决定的——通过某个键收集请求消息并生成单个分组消息。spring-doc.cadn.net.cn

ReleaseStrategy

ReleaseStrategy接口定义如下:spring-doc.cadn.net.cn

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

通常,任何 POJO 都可以实现完成决策逻辑,如果它提供了一个接受单个java.util.List作为参数(也支持参数化列表)并返回布尔值。 在每条新消息到达后调用此方法,以确定组是否完整,如下所示:spring-doc.cadn.net.cn

  • 如果参数是java.util.List<T>和参数类型T可分配给Message,则将组中累积的整个消息列表发送到该方法。spring-doc.cadn.net.cn

  • 如果参数是非参数化的java.util.List或者参数类型不可分配给Message,该方法接收累积消息的有效负载。spring-doc.cadn.net.cn

  • 该方法必须返回true如果消息组已准备好进行聚合,否则为 false。spring-doc.cadn.net.cn

以下示例演示如何使用@ReleaseStrategy注释List类型Message:spring-doc.cadn.net.cn

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

以下示例演示如何使用@ReleaseStrategy注释List类型String:spring-doc.cadn.net.cn

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

基于前两个示例中的签名,基于 POJO 的发布策略会传递一个Collection尚未发布的消息(如果您需要访问整个Message) 或Collection有效负载对象(如果 type 参数是Message). 这满足了大多数用例。 但是,如果由于某种原因,您需要访问完整的MessageGroup,您应该提供ReleaseStrategy接口。spring-doc.cadn.net.cn

在处理潜在的大型组时,应了解如何调用这些方法,因为在释放组之前可能会多次调用发布策略。 最有效的是实现ReleaseStrategy,因为聚合器可以直接调用它。 第二高效的是 POJO 方法,其中Collection<Message<?>>参数类型。 效率最低的是具有Collection<Something>类型。 框架必须将有效负载从组中的消息复制到一个新的集合中(并可能尝试将有效负载转换为Something) 调用发布策略时。 用Collection<?>避免转换,但仍需要创建新的Collection.spring-doc.cadn.net.cn

出于这些原因,对于大型团体,我们建议您实施ReleaseStrategy.spring-doc.cadn.net.cn

当组被释放以进行聚合时,将处理其所有尚未释放的消息,并将其从组中除去。如果组也已完成(即,如果序列中的所有消息都已到达,或者未定义序列),则该组将标记为完成。此组的任何新消息都将发送到丢弃通道(如果已定义)。 设置expire-groups-upon-completiontrue(默认值为false) 删除整个组,并且任何新消息(与删除的组具有相同的相关 ID)形成一个新组。您可以使用MessageGroupStoreReapersend-partial-result-on-expiry设置为true.spring-doc.cadn.net.cn

为了便于丢弃延迟到达的消息,聚合器必须在释放组后维护有关该组的状态。这最终可能会导致内存不足的情况。为避免这种情况,您应该考虑配置MessageGroupStoreReaper以删除组元数据。到期参数应设置为在到达某个点后使组过期,在此之后预计不会到达延迟消息。有关配置收割器的信息,请参阅在聚合器中管理状态:MessageGroupStore.

Spring Integration 提供了一个实现ReleaseStrategy:SimpleSequenceSizeReleaseStrategy. 此实现会咨询SEQUENCE_NUMBERSEQUENCE_SIZE每个到达消息的标头,以确定消息组何时完成并准备好聚合。如前所述,它也是默认策略。spring-doc.cadn.net.cn

在 5.0 版本之前,默认发布策略为SequenceSizeReleaseStrategy,这在大型组中表现不佳。使用该策略,可以检测并拒绝重复的序列号。此作可能会很昂贵。

如果要聚合大型组,则不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用SimpleSequenceSizeReleaseStrategy相反 - 对于这些用例,它效率要高得多,并且是自 5.0 版以来未指定部分组发布的默认值。spring-doc.cadn.net.cn

聚合大型组

4.3 版本更改了默认值Collection对于SimpleMessageGroupHashSet(它以前是一个BlockingQueue). 当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。尽管哈希集的删除速度通常要快得多,但对于大型消息来说,它的成本可能会很高,因为必须在插入和删除时计算哈希。如果您的消息哈希成本很高,请考虑使用其他集合类型。如 中所述MessageGroupFactory一个SimpleMessageGroupFactory,以便您可以选择Collection最适合您的需求。 您还可以提供自己的工厂实现来创建其他一些Collection<Message<?>>.spring-doc.cadn.net.cn

以下示例演示如何使用上一个实现配置聚合器和SimpleSequenceSizeReleaseStrategy:spring-doc.cadn.net.cn

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点涉及聚合器的上游流,则序列大小发布策略(固定或基于sequenceSizeheader)不会达到其目的,因为过滤器可能会丢弃序列中的某些消息。 在这种情况下,建议选择另一个ReleaseStrategy,或使用从丢弃子流发送的补偿消息,该子流的内容中带有一些信息,以便在自定义完整组函数中跳过。 有关详细信息,请参阅过滤器
相关策略

CorrelationStrategy接口定义如下:spring-doc.cadn.net.cn

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

该方法返回一个Object表示用于将消息与消息组相关联的关联键。 密钥必须满足用于Map关于执行equals()hashCode().spring-doc.cadn.net.cn

一般来说,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与ServiceActivator(包括对@Header注释)。 该方法必须返回一个值,并且该值不得null.spring-doc.cadn.net.cn

Spring Integration 提供了一个实现CorrelationStrategy:HeaderAttributeCorrelationStrategy. 此实现返回其中一个消息头(其名称由构造函数参数指定)的值作为关联键。 默认情况下,关联策略是HeaderAttributeCorrelationStrategy返回CORRELATION_IDheader 属性。 如果要用于关联的自定义标头名称,可以在HeaderAttributeCorrelationStrategy并将其作为聚合器关联策略的参考。spring-doc.cadn.net.cn

锁定注册表

对组的更改是线程安全的。 因此,当您同时发送同一相关 ID 的消息时,聚合器中只会处理其中一个消息,从而有效地将其作为每个消息组的单线程。 一个LockRegistry用于获取已解析相关 ID 的锁。 一个DefaultLockRegistry默认使用(内存中)。 用于在共享的服务器之间同步更新MessageGroupStore,则必须配置共享锁注册表。spring-doc.cadn.net.cn

避免死锁

如上所述,当消息组发生变化(添加或释放消息)时,将保持锁定。spring-doc.cadn.net.cn

请考虑以程:spring-doc.cadn.net.cn

...->aggregator1-> ... ->aggregator2-> ...

如果存在多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。 这将导致挂起线程和jstack <pid>可能会显示以下结果:spring-doc.cadn.net.cn

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

有几种方法可以避免此问题:spring-doc.cadn.net.cn

  • 确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须每个都有一个不同的注册表)spring-doc.cadn.net.cn

  • 使用ExecutorChannelQueueChannel作为聚合器的输出通道,以便下游流在新线程上运行spring-doc.cadn.net.cn

  • 从 5.1.1 版本开始,将releaseLockBeforeSendaggregator 属性设置为truespring-doc.cadn.net.cn

如果由于某种原因,单个聚合器的输出最终被路由回同一聚合器,也可能导致此问题。 当然,上述第一个解决方案不适用于这种情况。

在 Java DSL 中配置聚合器

有关如何在 Java DSL 中配置聚合器,请参阅聚合器和重排序器spring-doc.cadn.net.cn

使用 XML 配置聚合器

Spring Integration 支持通过<aggregator/>元素。 以下示例显示了聚合器的示例:spring-doc.cadn.net.cn

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          (1)
        auto-startup="true"                                (2)
        input-channel="inputChannel"                       (3)
        output-channel="outputChannel"                     (4)
        discard-channel="throwAwayChannel"                 (5)
        message-store="persistentMessageStore"             (6)
        order="1"                                          (7)
        send-partial-result-on-expiry="false"              (8)
        send-timeout="1000"                                (9)

        correlation-strategy="correlationStrategyBean"     (10)
        correlation-strategy-method="correlate"            (11)
        correlation-strategy-expression="headers['foo']"   (12)

        ref="aggregatorBean"                               (13)
        method="aggregate"                                 (14)

        release-strategy="releaseStrategyBean"             (15)
        release-strategy-method="release"                  (16)
        release-strategy-expression="size() == 5"          (17)

        expire-groups-upon-completion="false"              (18)
        empty-group-min-timeout="60000"                    (19)

        lock-registry="lockRegistry"                       (20)

        group-timeout="60000"                              (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"                  (23)

        scheduler="taskScheduler" >                        (24)
            <expire-transactional/>                        (25)
            <expire-advice-chain/>                         (26)
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 聚合器的 id 是可选的。
2 生命周期属性,指示是否应在应用程序上下文启动期间启动聚合器。可选(默认值为“true”)。
3 聚合器从中接收消息的通道。 必填。
4 聚合器将聚合结果发送到的通道。可选(因为传入消息本身可以在“replyChannel”消息标头中指定回复通道)。
5 聚合器向其发送超时消息的通道(如果send-partial-result-on-expiryfalse). 自选。
6 MessageGroupStore用于将消息组存储在其相关键下,直到它们完成。 自选。 默认情况下,它是一个易失性的内存中存储。有关详细信息,请参阅消息存储
7 当多个句柄订阅同一句柄时,此聚合器的顺序DirectChannel(用于负载平衡目的)。 自选。
8 指示过期消息应聚合并发送到 'output-channel' 或 'replyChannel',一旦其中包含MessageGroup已过期(请参阅MessageGroupStore.expireMessageGroups(long)). 使MessageGroup是通过配置MessageGroupStoreReaper. 但是,您也可以过期MessageGroup通过调用MessageGroupStore.expireMessageGroups(timeout). 您可以通过控制总线作来实现此目的,或者,如果您有对MessageGroupStore实例,通过调用expireMessageGroups(timeout). 否则,此属性本身不执行任何作。它仅用作是丢弃或发送到输出或回复通道中仍位于MessageGroup即将过期。可选(默认值为false). 注意:此属性可能更合适地称为send-partial-result-on-timeout,因为如果出现以下情况,则组实际上可能不会过期expire-groups-upon-timeout设置为false.
9 发送回复时等待的超时间隔Messageoutput-channeldiscard-channel. 默认为30秒。 仅当输出通道具有一些“发送”限制时才应用它,例如QueueChannel具有固定的“容量”。在这种情况下,一个MessageDeliveryException被抛出。 为AbstractSubscribableChannel实现,则send-timeout被忽略。 为group-timeout(-expression)MessageDeliveryException从计划过期任务中,将导致重新计划此任务。 自选。
10 对实现消息关联(分组)算法的 Bean 的引用。Bean 可以是CorrelationStrategy接口或 POJO。在后一种情况下,correlation-strategy-method属性也必须定义。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头)。
11 在引用的 bean 上定义的方法correlation-strategy. 它实现了相关决策算法。 可选,有限制 (correlation-strategy必须存在)。
12 表示相关策略的 SpEL 表达式。 例:"headers['something']". 只有其中一个correlation-strategycorrelation-strategy-expression是允许的。
13 对在应用程序上下文中定义的 bean 的引用。 bean 必须实现聚合逻辑,如前所述。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。
14 在 bean 上定义的方法,由ref属性。 它实现了消息聚合算法。 可选(取决于ref属性)。
15 对实现发布策略的 Bean 的引用。 bean 可以是ReleaseStrategy接口或 POJO。在后一种情况下,release-strategy-method属性也必须定义。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZEheader 属性)。
16 在 bean 上定义的方法,由release-strategy属性。 它实现了完成决策算法。 可选,有限制 (release-strategy必须存在)。
17 表示发布策略的 SpEL 表达式。 表达式的根对象是MessageGroup. 例:"size() == 5". 只有其中一个release-strategyrelease-strategy-expression是允许的。
18 当设置为true(默认值为false),则已完成的组将从消息存储中删除,从而使具有相同关联的后续消息形成一个新组。 默认行为是将与已完成组具有相同关联的消息发送到discard-channel.
19 仅当MessageGroupStoreReaperMessageStore<aggregator>. 默认情况下,当MessageGroupStoreReaper配置为使部分组过期,空组也会被删除。 正常释放组后存在空组。 空组支持检测和丢弃迟到的邮件。 如果您希望空组过期的时间比过期的部分组更长的时间,请设置此属性。 然后,空组不会从MessageStore直到它们至少在这个毫秒数内没有被修改。 请注意,空组的实际过期时间也受到收割者的timeout属性,它可以与此值加上超时一样多。
20 org.springframework.integration.util.LockRegistry豆。 它曾经获得一个Lock基于groupId对于MessageGroup. 默认情况下,内部DefaultLockRegistry被使用。 使用LockRegistry,例如ZookeeperLockRegistry,确保聚合器只有一个实例可以同时对组进行作。 有关更多信息,请参阅 Redis 锁注册表Zookeeper 锁注册表
21 超时(以毫秒为单位)强制MessageGroupReleaseStrategy当前消息到达时不会释放组。 此属性为聚合器提供了内置的基于时间的发布策略,当需要发出部分结果(或丢弃组)时,如果新消息未到达MessageGroup在从最后一条消息到达开始计算的超时内。 要设置一个超时,该超时从时间开始计算MessageGroup创建了,请参阅group-timeout-expression信息。 当新消息到达聚合器时,任何现有的ScheduledFuture<?>对于它的MessageGroup被取消。 如果ReleaseStrategy返回false(意思是不要释放)和groupTimeout > 0,则计划新任务使组过期。 我们不建议将此属性设置为零(或负值)。 这样做会有效地禁用聚合器,因为每个消息组都会立即完成。 但是,您可以使用表达式有条件地将其设置为零(或负值)。 看group-timeout-expression以获取信息。 在完成期间采取的作取决于ReleaseStrategysend-partial-group-on-expiry属性。 有关详细信息,请参阅聚合器和组超时。 它与“group-timeout-expression”属性互斥。
22 计算结果为groupTimeout使用MessageGroup作为#root评估上下文对象。 用于调度MessageGroup强制完成。 如果表达式的计算结果为null,则未计划完成。 如果计算结果为零,则该组将立即在当前线程上完成。 实际上,这提供了一个动态的group-timeout财产。 例如,如果您希望强制完成MessageGroup自创建组以来经过 10 秒后,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis()哪里timestampMessageGroup.getTimestamp()作为MessageGroup这是#root评估上下文对象。 但请记住,组创建时间可能与第一个到达消息的时间不同,具体取决于其他组过期属性的配置。 看group-timeout了解更多信息。 与“group-timeout”属性互斥。
23 当组由于超时(或MessageGroupStoreReaper),默认情况下,该组已过期(完全删除)。 迟到的邮件会启动一个新组。 您可以将其设置为false以完成组,但保留其元数据,以便丢弃延迟到达的邮件。 空组可以稍后使用MessageGroupStoreReaperempty-group-min-timeout属性。 它默认为 'true'。
24 一个TaskSchedulerbean 引用来调度MessageGroup如果没有新消息到达,则强制完成MessageGroupgroupTimeout. 如果未提供,则默认调度程序 (taskScheduler) 在ApplicationContext (ThreadPoolTaskScheduler) 被使用。 如果出现以下情况,则此属性不适用group-timeoutgroup-timeout-expression未指定。
25 从 4.1 版本开始。 它允许为forceComplete操作。 它是从group-timeout(-expression)或通过MessageGroupStoreReaper并且不适用于法线add,releasediscard操作。 只有这个子元素或<expire-advice-chain/>是允许的。
26 4.1 版本开始。 它允许配置任何Advice对于forceComplete操作。 它是从group-timeout(-expression)或通过MessageGroupStoreReaper并且不适用于法线add,releasediscard操作。 只有这个子元素或<expire-transactional/>是允许的。 交易Advice也可以在此处使用 Spring 进行配置txNamespace。
即将过期的组

有两个属性与即将过期(完全删除)的组相关。 当一个组过期时,没有它的记录,如果新消息到达具有相同的关联性,那么将启动一个新组。 当组完成(没有过期)时,空组将保留,并丢弃迟到的消息。 稍后可以使用MessageGroupStoreReaper结合empty-group-min-timeout属性。spring-doc.cadn.net.cn

expire-groups-upon-completion与“正常”完成相关,当ReleaseStrategy释放组。 默认为false.spring-doc.cadn.net.cn

如果组未正常完成,但由于超时而被释放或丢弃,则该组通常已过期。 从 4.1 版开始,您可以使用以下命令来控制此行为expire-groups-upon-timeout. 它默认为true用于向后兼容性。spring-doc.cadn.net.cn

当组超时时,ReleaseStrategy再给一次释放小组的机会。 如果它这样做并且expire-groups-upon-timeout为false,则过期由expire-groups-upon-completion. 如果在超时期间释放策略未释放组,则过期时间由expire-groups-upon-timeout. 超时组要么被丢弃,要么发生部分释放(基于send-partial-result-on-expiry).

从 5.0 版开始,空组也计划在empty-group-min-timeout. 如果expireGroupsUponCompletion == falseminimumTimeoutForEmptyGroups > 0,则在正常或部分序列发布发生时安排删除组的任务。spring-doc.cadn.net.cn

从版本 5.4 开始,可以将聚合器(和重排序器)配置为使孤立组(持久性消息存储中可能不会释放的组)过期。 这expireTimeout(如果大于0) 表示应清除存储中早于此值的组。 这purgeOrphanedGroups()方法在启动时被调用,并且与提供的expireDuration,在计划任务中定期进行。 该方法也可以随时在外部调用。 过期逻辑完全委托给forceComplete(MessageGroup)根据上述提供的到期选项的功能。 当需要从那些旧组中清理消息存储时,这种定期清除功能非常有用,这些旧组将不再使用常规消息到达逻辑释放。 在大多数情况下,当使用持久性消息组存储时,这会在应用程序重新启动后发生。 该功能类似于MessageGroupStoreReaper使用计划任务,但当使用组超时而不是收割器时,提供了一种处理特定组件内旧组的便捷方法。 这MessageGroupStore必须专门为当前相关终结点提供。 否则,一个聚合器可能会从另一个聚合器中清除组。 使用聚合器,使用此技术过期的组将被丢弃或作为部分组释放,具体取决于expireGroupsUponCompletion财产。spring-doc.cadn.net.cn

我们通常建议使用ref属性,如果自定义聚合器处理程序实现可以在其他<aggregator>定义。 但是,如果自定义聚合器实现仅由<aggregator>,您可以使用内部 bean 定义(从 1.0.3 版开始)在<aggregator>元素,如以下示例所示:spring-doc.cadn.net.cn

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
同时使用ref属性和内部 Bean 定义<aggregator>不允许配置,因为它会产生不明确的条件。 在这种情况下,将引发异常。

以下示例显示了聚合器 bean 的实现:spring-doc.cadn.net.cn

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

前面示例的完成策略 bean 的实现可能如下所示:spring-doc.cadn.net.cn

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}
只要有意义,就可以将发布策略方法和聚合器方法组合成一个 bean。

上面示例的关联策略 bean 的实现可能如下所示:spring-doc.cadn.net.cn

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

前面示例中的聚合器将按某些条件(在本例中为除以 10 后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字之和超过某个值。spring-doc.cadn.net.cn

只要有意义,就可以将发布策略方法、关联策略方法和聚合器方法组合在单个 Bean 中。 (实际上,它们都可以组合起来,也可以组合其中任何两个。
聚合器和 Spring 表达式语言 (SpEL)

从 Spring Integration 2.0 开始,您可以使用 SpEL 处理各种策略(关联、发布和聚合),如果此类发布策略背后的逻辑相对简单,我们建议使用这种策略。 假设您有一个旧组件,该组件旨在接收对象数组。 我们知道,默认发布策略会在List. 现在我们有两个问题。 首先,我们需要从列表中提取单个消息。 其次,我们需要提取每条消息的有效负载并组装对象数组。 以下示例解决了这两个问题:spring-doc.cadn.net.cn

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

但是,使用 SpEL,实际上可以使用单行表达式相对容易地处理此类需求,从而避免您编写自定义类并将其配置为 bean。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新集合,然后将其转换为数组,从而获得与前面的 Java 代码相同的结果。spring-doc.cadn.net.cn

在处理自定义发布和关联策略时,可以应用相同的基于表达式的方法。spring-doc.cadn.net.cn

而不是为自定义定义 beanCorrelationStrategycorrelation-strategy属性,您可以将简单的关联逻辑实现为 SpEL 表达式,并在correlation-strategy-expression属性,如以下示例所示:spring-doc.cadn.net.cn

correlation-strategy-expression="payload.person.id"

在前面的示例中,我们假设有效负载具有person属性,并带有id,这将用于关联消息。spring-doc.cadn.net.cn

同样,对于ReleaseStrategy,您可以将发布逻辑实现为 SpEL 表达式,并在release-strategy-expression属性。 评估上下文的根对象是MessageGroup本身。 这List的消息可以通过使用message表达式中组的属性。spring-doc.cadn.net.cn

在 5.0 版之前的版本中,根对象是Message<?>,如前面的示例所示:
release-strategy-expression="!messages.?[payload==5].empty"

在前面的示例中,SpEL 评估上下文的根对象是MessageGroup本身,并且您表示,一旦出现有效负载为5在这个组中,该组应该被释放。spring-doc.cadn.net.cn

聚合器和组超时

从 4.0 版开始,引入了两个新的互斥属性:group-timeoutgroup-timeout-expression. 请参阅使用 XML 配置聚合器。 在某些情况下,如果ReleaseStrategy当前消息到达时不会释放。 为此,该groupTimeout选项允许安排MessageGroup强制完成,如以下示例所示:spring-doc.cadn.net.cn

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

在此示例中,如果聚合器按顺序接收最后一条消息,则正常发布是可能的,如release-strategy-expression. 如果该特定消息未到达,则groupTimeout强制组在十秒后完成,只要该组至少包含两个消息。spring-doc.cadn.net.cn

强制组完成的结果取决于ReleaseStrategysend-partial-result-on-expiry. 首先,再次咨询发布策略,看看是否要进行正常发布。 虽然组没有改变,但ReleaseStrategy这个时候可以决定放团。 如果发布策略仍未释放组,则该组已过期。 如果send-partial-result-on-expirytrue,(部分) 中的现有消息MessageGroup作为普通聚合器回复消息发布给output-channel. 否则,它将被丢弃。spring-doc.cadn.net.cn

之间有区别groupTimeout行为和MessageGroupStoreReaper(请参阅使用 XML 配置聚合器)。 收割者为所有MessageGroups 在MessageGroupStore周期性地。 这groupTimeout为每个人做MessageGroup如果在groupTimeout. 此外,收割器可用于删除空组(如果出现以下情况,则保留空组以丢弃延迟消息expire-groups-upon-completion是假的)。spring-doc.cadn.net.cn

从 5.5 版本开始,groupTimeoutExpression可以评估为java.util.Date实例。 这在根据组创建时间 (MessageGroup.getTimestamp()),而不是当前消息到达,因为它是在groupTimeoutExpression被评估为long:spring-doc.cadn.net.cn

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器

以下示例显示了配置了注释的聚合器:spring-doc.cadn.net.cn

public class Waiter {
  ...

  @Aggregator  (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}
1 指示此方法应用作聚合器的注释。 如果将此类用作聚合器,则必须指定它。
2 指示此方法用作聚合器的发布策略的注释。 如果任何方法上都不存在,聚合器将使用SimpleSequenceSizeReleaseStrategy.
3 指示该方法应用作聚合器的相关策略的注释。 如果未指示相关策略,聚合器将使用HeaderAttributeCorrelationStrategy基于CORRELATION_ID.

XML 元素提供的所有配置选项也可用于@Aggregator注解。spring-doc.cadn.net.cn

聚合器可以从 XML 显式引用,或者如果@MessageEndpoint在类上定义,通过类路径扫描自动检测。spring-doc.cadn.net.cn

注释配置 (@Aggregator等)仅涵盖简单的用例,其中大多数默认选项就足够了。 如果您在使用注释配置时需要对这些选项进行更多控制,请考虑使用@Bean定义AggregatingMessageHandler并标记其@Bean方法@ServiceActivator,如以下示例所示:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}
从 4.2 版开始,AggregatorFactoryBean可用于简化AggregatingMessageHandler.

在聚合器中管理状态:MessageGroupStore

聚合器(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有这些消息都具有相同的关联键。 有状态模式中接口的设计(例如ReleaseStrategy)的原则是组件(无论是由框架定义还是由用户定义)都应该能够保持无状态。 所有状态都由MessageGroup其管理权委托给MessageGroupStore. 这MessageGroupStore接口定义如下:spring-doc.cadn.net.cn

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

MessageGroupStoreMessageGroups在等待触发发布策略时,该事件可能永远不会发生。 因此,为了防止过时消息挥之不去,并让易失性存储在应用程序关闭时提供用于清理的钩子,MessageGroupStore允许您注册回调以应用于其MessageGroups当它们过期时。界面非常简单,如以下列表所示:spring-doc.cadn.net.cn

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。spring-doc.cadn.net.cn

MessageGroupStore维护这些回调的列表,根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(请参阅registerMessageGroupExpiryCallback(..)expireMessageGroups(..)方法,如前所述)。spring-doc.cadn.net.cn

重要的是不要使用相同的MessageGroupStore实例,当您打算依赖expireMessageGroups功能性。 每AbstractCorrelatingMessageHandler注册自己的MessageGroupCallback基于forceComplete()回调。 这样,每个过期的组可能会被错误的聚合器完成或丢弃。从 5.0.10 版开始,一个UniqueExpiryCallbackAbstractCorrelatingMessageHandler对于MessageGroupStore. 这MessageGroupStore,反过来,检查是否存在此类的实例,并记录错误并附上相应消息(如果回调集中已存在)。这样,框架就不允许使用MessageGroupStore实例,以避免上述过期的副作用,而不是由特定相关处理程序创建的组。

您可以调用expireMessageGroups方法,并带有超时值。任何早于当前时间减去此值的消息都已过期并应用了回调。因此,是存储的用户定义了消息组“过期”的含义。spring-doc.cadn.net.cn

为了方便用户,Spring Integration 以MessageGroupStoreReaper,如以下示例所示:spring-doc.cadn.net.cn

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

收割者是一个Runnable. 在前面的示例中,消息组存储的 expire 方法每 10 秒调用一次。超时本身为 30 秒。spring-doc.cadn.net.cn

重要的是要了解 的 'timeout' 属性MessageGroupStoreReaper是一个近似值,受任务计划程序速率的影响,因为此属性仅在下一次计划执行MessageGroupStoreReaper任务。 例如,如果超时设置为 10 分钟,但MessageGroupStoreReaper任务计划每小时运行一次,并且最后一次执行MessageGroupStoreReaper任务发生在超时前一分钟,则MessageGroup在接下来的 59 分钟内不会过期。 因此,我们建议将速率设置为至少等于超时值或更短。

除了收割器之外,当应用程序关闭时,也会通过AbstractCorrelatingMessageHandler.spring-doc.cadn.net.cn

AbstractCorrelatingMessageHandler注册自己的过期回调,这是带有布尔标志的链接send-partial-result-on-expiry在聚合器的 XML 配置中。如果标志设置为true,则在调用过期回调时,组中尚未释放的任何未标记消息都可以发送到输出通道。spring-doc.cadn.net.cn

由于MessageGroupStoreReaper从计划任务调用,并且可能会导致消息的生成(取决于sendPartialResultOnExpiryoption) 给下游集成流,建议提供自定义TaskScheduler使用MessagePublishingErrorHandler通过errorChannel,正如常规聚合器发布功能所期望的那样。相同的逻辑适用于组超时功能,该功能也依赖于TaskScheduler. 有关详细信息,请参阅错误处理

当共享的MessageStore用于不同的关联端点,您必须配置适当的CorrelationStrategy以确保组 ID 的唯一性。 否则,当一个关联终结点释放或使来自其他关联终结点的消息过期时,可能会发生意外行为。 具有相同关联键的消息存储在同一消息组中。spring-doc.cadn.net.cn

一些MessageStore实现允许通过对数据进行分区来使用相同的物理资源。 例如,JdbcMessageStore有一个region属性和MongoDbMessageStore有一个collectionName财产。spring-doc.cadn.net.cn

有关MessageStore接口及其实现,请参阅消息存储spring-doc.cadn.net.cn

通量聚合器

在 5.2 版本中,FluxAggregatorMessageHandler组件已被引入。 它基于反应堆项目Flux.groupBy()Flux.window()运营商。 传入消息将发送到FluxSinkFlux.create()在此组件的构造函数中。 如果outputChannel未提供或不是ReactiveStreamsSubscribableChannel,订阅主FluxLifecycle.start()实现。 否则,它将推迟到ReactiveStreamsSubscribableChannel实现。 消息按Flux.groupBy()使用CorrelationStrategy组键。 默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID查阅邮件的标头。spring-doc.cadn.net.cn

默认情况下,每个关闭的窗口都作为Flux在要生成的消息的有效负载中。 此消息包含窗口中第一条消息的所有标头。 这Flux在输出消息中,有效负载必须订阅并在下游处理。 这样的逻辑可以由setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)配置选项的FluxAggregatorMessageHandler. 例如,如果我们想要一个List的有效负载,我们可以配置一个Flux.collectList()像下面这样:spring-doc.cadn.net.cn

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

中有几个选项FluxAggregatorMessageHandler要选择适当的窗口策略,请执行以下作:spring-doc.cadn.net.cn

  • setBoundaryTrigger(Predicate<Message<?>>)- 传播到Flux.windowUntil()算子。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。spring-doc.cadn.net.cn

  • setWindowSize(int)setWindowSizeFunction(Function<Message<?>, Integer>)- 传播到Flux.window(int)windowTimeout(int, Duration). 默认情况下,窗口大小是根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE页眉。spring-doc.cadn.net.cn

  • setWindowTimespan(Duration)- 传播到Flux.window(Duration)windowTimeout(int, Duration)取决于窗口大小配置。spring-doc.cadn.net.cn

  • setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于将转换应用于公开选项未涵盖的任何自定义窗口作的分组通量。spring-doc.cadn.net.cn

由于此组件是MessageHandler实现它可以简单地用作@Bean定义与@ServiceActivator消息传递注释。 对于 Java DSL,它可以从.handle()EIP 方法。 下面的示例演示了我们如何注册IntegrationFlow在运行时,以及如何FluxAggregatorMessageHandler可以与上游的分路器相关联:spring-doc.cadn.net.cn

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

消息组上的条件

从 5.5 版本开始,AbstractCorrelatingMessageHandler(包括其 Java 和 XML DSL)公开了一个groupConditionSupplier选项的BiFunction<Message<?>, String, String>实现。 此功能用于添加到组的每条消息,并将结果条件句子存储到组中以供将来考虑。 这ReleaseStrategy可以查阅此条件,而不是迭代组中的所有消息。 看GroupConditionProviderJavaDocs 和 Message Group Condition 了解更多信息。spring-doc.cadn.net.cn

重序器

重排序器与聚合器相关,但用途不同。 当聚合器合并消息时,重排序器传递消息而不更改它们。spring-doc.cadn.net.cn

功能性

重排序器的工作方式与聚合器类似,因为它使用CORRELATION_ID以分组存储消息。 不同之处在于 Resequencer 不会以任何方式处理消息。 相反,它会按照其SEQUENCE_NUMBER标头值。spring-doc.cadn.net.cn

关于这一点,您可以选择一次释放所有消息(在整个序列之后,根据SEQUENCE_SIZE,以及其他可能性)或一旦有效序列可用。 (我们将在本章后面介绍“有效序列”的含义。spring-doc.cadn.net.cn

重排序器旨在对间隔较小的相对较短的消息序列进行重新排序。 如果有大量不相交的序列,并且存在许多间隙,则可能会遇到性能问题。

配置重排序器

请参阅聚合器和重排序器,了解在 Java DSL 中配置重排序器。spring-doc.cadn.net.cn

配置重排序器只需要在 XML 中包含适当的元素。spring-doc.cadn.net.cn

以下示例显示了重排序器配置:spring-doc.cadn.net.cn

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  (1)
  input-channel="inputChannel"  (2)
  output-channel="outputChannel"  (3)
  discard-channel="discardChannel"  (4)
  release-partial-sequences="true"  (5)
  message-store="messageStore"  (6)
  send-partial-result-on-expiry="true"  (7)
  send-timeout="86420000"  (8)
  correlation-strategy="correlationStrategyBean"  (9)
  correlation-strategy-method="correlate"  (10)
  correlation-strategy-expression="headers['something']"  (11)
  release-strategy="releaseStrategyBean"  (12)
  release-strategy-method="release"  (13)
  release-strategy-expression="size() == 10"  (14)
  empty-group-min-timeout="60000"  (15)

  lock-registry="lockRegistry"  (16)

  group-timeout="60000"  (17)
  group-timeout-expression="size() ge 2 ? 100 : -1"  (18)
  scheduler="taskScheduler" />  (19)
  expire-group-upon-timeout="false" />  (20)
1 重排序器的 id 是可选的。
2 重定序器的输入通道。 必填。
3 重排序器将重新排序的消息发送到的通道。 自选。
4 重排序器向其发送超时消息的通道(如果send-partial-result-on-timeout设置为false). 自选。
5 是发送有序序列在可用时立即发送,还是仅在整个消息组到达后发送。 自选。 (默认值为false.)
6 MessageGroupStore可用于将消息组存储在其相关键下,直到它们完成。 自选。 (默认值是易失性内存中存储。
7 在组到期时,是否应发送有序的组(即使缺少某些消息)。 自选。 (默认值为 false。 看在聚合器中管理状态:MessageGroupStore.
8 发送回复时等待的超时间隔Messageoutput-channeldiscard-channel. 仅当输出通道具有一些“发送”限制时才应用它,例如QueueChannel具有固定的“容量”。在这种情况下,一个MessageDeliveryException被抛出。 这send-timeout被忽略AbstractSubscribableChannel实现。 为group-timeout(-expression)MessageDeliveryException从计划过期任务中,将导致重新计划此任务。 自选。
9 对实现消息关联(分组)算法的 Bean 的引用。Bean 可以是CorrelationStrategy接口或 POJO。在后一种情况下,correlation-strategy-method属性也必须定义。 自选。 (默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头。
10 在引用的 bean 上定义的方法correlation-strategy实现相关决策算法。 可选,有限制(需要correlation-strategy在场)。
11 表示相关策略的 SpEL 表达式。 例:"headers['something']". 只有其中一个correlation-strategycorrelation-strategy-expression是允许的。
12 对实现发布策略的 Bean 的引用。 bean 可以是ReleaseStrategy接口或 POJO。在后一种情况下,release-strategy-method属性也必须定义。 可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZEheader 属性)。
13 在引用的 bean 上定义的方法release-strategy并实现完成决策算法。 可选,有限制(需要release-strategy在场)。
14 表示发布策略的 SpEL 表达式。 表达式的根对象是MessageGroup. 例:"size() == 5". 只有其中一个release-strategyrelease-strategy-expression是允许的。
15 仅当MessageGroupStoreReaper<resequencer> MessageStore. 默认情况下,当MessageGroupStoreReaper配置为使部分组过期,空组也会被删除。 组正常释放后存在空组。 这是为了启用延迟到达邮件的检测和丢弃。 如果您希望空组过期的时间比过期的部分组更长的时间,请设置此属性。 然后,空组不会从MessageStore直到它们至少在这个毫秒数内没有被修改。 请注意,空组过期的实际时间也受收割器的超时属性的影响,它可以与此值加上超时一样多。
16 请参阅使用 XML 配置聚合器
17 请参阅使用 XML 配置聚合器
18 请参阅使用 XML 配置聚合器
19 请参阅使用 XML 配置聚合器
20 默认情况下,当组由于超时(或MessageGroupStoreReaper),则将保留空组的元数据。 迟到的消息将立即丢弃。 将此设置为true以完全删除该组。 然后,迟到的邮件将启动一个新组,并且在组再次超时之前不会被丢弃。 由于序列范围内的“漏洞”导致超时,新组永远不会正常释放。 空组可以稍后使用MessageGroupStoreReaperempty-group-min-timeout属性。 从 5.0 版开始,空组也会计划在empty-group-min-timeout流逝。 默认值为“false”。
由于在重排序器的 Java 类中没有要实现的自定义行为,因此没有对它的注释支持。

消息处理程序链

MessageHandlerChainMessageHandler可以将其配置为单个消息端点,同时实际委托给其他处理程序链,例如过滤器、转换器、拆分器等。 当需要以固定的线性进展连接多个处理程序时,这可以导致更简单的配置。 例如,在其他组件之前提供转换器是相当常见的。 同样,当您在链中的其他组件之前提供过滤器时,您实际上创建了一个选择性消费者。 无论哪种情况,链条都只需要一个input-channel和单个output-channel,无需为每个单独的组件定义通道。spring-doc.cadn.net.cn

MessageHandlerChain主要为 XML 配置而设计。 对于 Java DSL,一个IntegrationFlow定义可以被视为一个链式组件,但它与下面本章中描述的概念和原则无关。 有关更多信息,请参阅 Java DSL
Spring Integration 的Filter提供布尔属性:throwExceptionOnRejection. 当您在同一点对点通道上提供具有不同验收条件的多个选择性使用者时,应将此值设置为“true”(默认值为false),以便调度程序知道消息已被拒绝,并因此尝试将消息传递给其他订阅者。 如果未引发异常,则调度程序将显示消息已成功传递,即使筛选器已删除消息以防止进一步处理。 如果您确实想“删除”消息,过滤器的“丢弃通道”可能很有用,因为它确实让您有机会对删除的消息执行一些作(例如将其发送到 JMS 队列或将其写入日志)。

处理程序链简化了配置,同时在内部保持组件之间相同程度的松散耦合,如果在某个时候需要非线性排列,则修改配置是微不足道的。spring-doc.cadn.net.cn

在内部,该链被扩展为列出的端点的线性设置,由匿名通道分隔。 链中不考虑回复通道标头。 只有在调用最后一个处理程序后,生成的消息才会转发到回复通道或链的输出通道。 由于此设置,除最后一个处理程序外,所有处理程序都必须实现MessageProducer接口(提供 'setOutputChannel()' 方法)。 如果outputChannelMessageHandlerChain设置时,最后一个处理程序只需要一个输出通道。spring-doc.cadn.net.cn

与其他端点一样,output-channel是可选的。 如果链的末尾有回复消息,则输出通道优先。 但是,如果它不可用,则链处理程序会检查入站消息上的回复通道标头作为回退。

在大多数情况下,您无需实现MessageHandler你自己。 下一节重点介绍对链元素的命名空间支持。 大多数 Spring Integration 端点(例如服务激活器和转换器)都适合在MessageHandlerChain.spring-doc.cadn.net.cn

配置链

<chain>元素提供了一个input-channel属性。 如果链中的最后一个元素能够生成回复消息(可选),它还支持output-channel属性。 然后,子元素是Filter、转换器、分路器和服务激活器。 最后一个元素也可以是路由器或出站通道适配器。 以下示例显示了链定义:spring-doc.cadn.net.cn

<int:chain input-channel="input" output-channel="output">
    <int:filter ref="someSelector" throw-exception-on-rejection="true"/>
    <int:header-enricher>
        <int:header name="thing1" value="thing2"/>
    </int:header-enricher>
    <int:service-activator ref="someService" method="someMethod"/>
</int:chain>

<header-enricher>前面示例中使用的元素设置名为thing1值为thing2在消息上。标头扩充器是Transformer仅涉及标头值。您可以通过实现MessageHandler这完成了标头修改并将其作为 bean 进行连接,但标头丰富器是一个更简单的选项。spring-doc.cadn.net.cn

<chain>可以配置为消息流的最后一个“封闭盒”使用者。对于此解决方案,您可以将其放在<链的末尾>一些<outbound-channel-adapter>,如以下示例所示:spring-doc.cadn.net.cn

<int:chain input-channel="input">
    <int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
    <int:service-activator ref="someService" method="someMethod"/>
    <int:header-enricher>
        <int:header name="thing1" value="thing2"/>
    </int:header-enricher>
    <int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>
不允许的属性和元素

某些属性,例如orderinput-channel不允许在链中使用的组件上指定。轮询器子元素也是如此。spring-doc.cadn.net.cn

对于 Spring Integration 核心组件,XML 模式本身强制执行其中一些约束。但是,对于非核心组件或您自己的自定义组件,这些约束由 XML 命名空间解析器强制执行,而不是由 XML 模式强制执行。spring-doc.cadn.net.cn

这些 XML 命名空间解析器约束是在 Spring Integration 2.2 中添加的。如果您尝试使用不允许的属性和元素,XML 命名空间解析器会抛出一个BeanDefinitionParsingException.spring-doc.cadn.net.cn

使用 'id' 属性

从 Spring Integration 3.0 开始,如果为链元素提供id属性,则元素的 bean 名称是链的idid元素本身。没有元素id属性未注册为 bean,但每个属性都被赋予一个componentName这包括链条id. 请考虑以下示例:spring-doc.cadn.net.cn

<int:chain id="somethingChain" input-channel="input">
    <int:service-activator id="somethingService" ref="someService" method="someMethod"/>
    <int:object-to-json-transformer/>
</int:chain>

在前面的示例中:spring-doc.cadn.net.cn

  • <chain>root 元素有一个id“某物链”。 因此,AbstractEndpoint实现 (PollingConsumerEventDrivenConsumer,具体取决于input-channeltype) bean 将此值作为其 bean 名称。spring-doc.cadn.net.cn

  • MessageHandlerChainbean 获取一个 bean 别名(“somethingChain.handler”),它允许从BeanFactory.spring-doc.cadn.net.cn

  • <service-activator>不是一个成熟的消息传递端点(它不是PollingConsumerEventDrivenConsumer). 这是一个MessageHandler<chain>. 在这种情况下,使用BeanFactory是 'somethingChain$child.somethingService.handler'。spring-doc.cadn.net.cn

  • componentNameServiceActivatingHandler采用相同的值,但没有 '.handler' 后缀。 它变为 'somethingChain$child.somethingService'。spring-doc.cadn.net.cn

  • 最后<chain>子组件,<object-to-json-transformer>,没有id属性。 其componentName基于它在<chain>. 在本例中,它是 'somethingChain$child#1'。 (名称的最后一个元素是链中的顺序,以“#0”开头)。 请注意,此转换器未在应用程序上下文中注册为 bean,因此它不会获得beanName. 然而,它的componentName具有可用于日志记录和其他目的的值。spring-doc.cadn.net.cn

id属性<chain>元素使它们有资格进行 JMX 导出,并且可以在消息历史记录中进行跟踪。 您可以从BeanFactory如前所述,使用适当的 bean 名称。spring-doc.cadn.net.cn

提供显式id属性<chain>元素来简化日志中子组件的识别,并提供从BeanFactory等。

从链中调用链

有时,您需要从链内对另一条链进行嵌套调用,然后返回并在原始链中继续执行。 为此,可以通过包含 <gateway> 元素来使用消息网关,如以下示例所示:spring-doc.cadn.net.cn

<int:chain id="main-chain" input-channel="in" output-channel="out">
    <int:header-enricher>
      <int:header name="name" value="Many" />
    </int:header-enricher>
    <int:service-activator>
      <bean class="org.foo.SampleService" />
    </int:service-activator>
    <int:gateway request-channel="inputA"/>
</int:chain>

<int:chain id="nested-chain-a" input-channel="inputA">
    <int:header-enricher>
        <int:header name="name" value="Moe" />
    </int:header-enricher>
    <int:gateway request-channel="inputB"/>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

<int:chain id="nested-chain-b" input-channel="inputB">
    <int:header-enricher>
        <int:header name="name" value="Jack" />
    </int:header-enricher>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

在前面的示例中,nested-chain-a在末尾调用main-chain由在那里配置的“网关”元素进行处理。 在nested-chain-a,调用nested-chain-b是在标头富集后制作的。 然后流返回以完成执行nested-chain-b. 最后,流返回到main-chain. 当<gateway>元素在链中定义,它不需要service-interface属性。 相反,它将消息置于当前状态,并将其放置在request-channel属性。 当该网关启动的下游流完成时,一个Message返回到网关并在当前链中继续其旅程。spring-doc.cadn.net.cn

分散-聚集

从版本 4.1 开始,Spring Integration 提供了分散-聚集企业集成模式的实现。 它是一个复合端点,其目标是向收件人发送消息并聚合结果。 正如企业集成模式中所述,它是诸如“最佳报价”等场景的组件,在这些场景中,我们需要向多个提供商请求信息,并决定哪一个提供商为我们提供所请求项目的最佳条款。spring-doc.cadn.net.cn

以前,可以使用离散组件来配置模式。 这一增强带来了更便捷的配置。spring-doc.cadn.net.cn

ScatterGatherHandler是一个请求-回复端点,它结合了PublishSubscribeChannel(或RecipientListRouter) 和AggregatingMessageHandler. 请求消息被发送到scatter通道,以及ScatterGatherHandler等待聚合器发送到outputChannel.spring-doc.cadn.net.cn

功能性

Scatter-Gather模式建议两种场景:“拍卖”和“分发”。 在这两种情况下,aggregation函数相同,并提供了所有可用于AggregatingMessageHandler. (实际上,ScatterGatherHandler只需要一个AggregatingMessageHandler作为构造函数参数。 有关详细信息,请参阅聚合器spring-doc.cadn.net.cn

拍卖

拍卖Scatter-Gathervariant 对请求消息使用“发布-订阅”逻辑,其中“分散”通道是PublishSubscribeChannelapply-sequence="true". 但是,此通道可以是任何MessageChannel实现(与request-channelContentEnricher— 参见内容丰富器)。 但是,在这种情况下,您应该创建自己的自定义correlationStrategy对于aggregation功能。spring-doc.cadn.net.cn

分配

分布Scatter-Gather变体基于RecipientListRouter(参见RecipientListRouter)以及RecipientListRouter. 这是第二个ScatterGatherHandlerconstructor 参数。 如果只想依赖默认值correlationStrategy对于recipient-list-routeraggregator,则应指定apply-sequence="true". 否则,您应该提供自定义correlationStrategy对于aggregator. 与PublishSubscribeChannelvariant(拍卖变体),具有recipient-list-router selector选项允许根据消息过滤目标提供商。 跟apply-sequence="true",默认值sequenceSize,并且aggregator可以正确释放组。 分发选项与拍卖选项是互斥的。spring-doc.cadn.net.cn

applySequence=true仅对于基于ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)构造函数配置,因为框架不能改变外部提供的组件。 为方便起见,XML 和 Java DSLScatter-GatherapplySequence从 6.0 版开始设置为 true。

对于拍卖和分发变体,请求(分散)消息都使用gatherResultChannel标头来等待来自aggregator.spring-doc.cadn.net.cn

默认情况下,所有提供商都应将其结果发送到replyChannel标头(通常通过省略output-channel从最终端点)。 但是,gatherChannel选项,允许提供商将其回复发送到该渠道以进行聚合。spring-doc.cadn.net.cn

配置分散收集端点

以下示例显示了 Bean 定义的 Java 配置Scatter-Gather:spring-doc.cadn.net.cn

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们将RecipientListRouter distributorbean 与applySequence="true"以及收件人渠道列表。 下一个 bean 是针对AggregatingMessageHandler. 最后,我们将这两个 bean 注入到ScatterGatherHandlerbean 定义,并将其标记为@ServiceActivator将分散-聚集组件连接到集成流中。spring-doc.cadn.net.cn

以下示例显示如何配置<scatter-gather>endpoint 使用 XML 命名空间:spring-doc.cadn.net.cn

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 终结点的 ID。 这ScatterGatherHandlerbean 的别名为id + '.handler'. 这RecipientListRouterbean 的别名为id + '.scatterer'. 这AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer'. 自选。 (这BeanFactory生成默认值id值。
2 生命周期属性指示是否应在应用程序上下文初始化期间启动终结点。 此外,ScatterGatherHandler还实现Lifecycle并启动和停止gatherEndpoint,如果gather-channel被提供。 自选。 (默认值为true.)
3 接收请求消息以在ScatterGatherHandler. 必填。
4 该通道的ScatterGatherHandler发送聚合结果。 自选。 (传入消息可以在replyChannelmessage header) 的 Message 标头)。
5 要向其发送拍卖方案的分散消息的通道。 自选。 与<scatterer>子元素。
6 用于接收每个提供商对聚合的回复的通道。它用作replyChannel标头。 自选。 默认情况下,FixedSubscriberChannel被创建。
7 当多个处理程序订阅相同的处理程序时,此组件的顺序DirectChannel(用于负载平衡目的)。 自选。
8 指定应启动和停止终结点的阶段。启动顺序从最低到最高,关机顺序从最高到最低。默认情况下,此值为Integer.MAX_VALUE,这意味着该容器尽可能晚地启动并尽快停止。 自选。
9 发送回复时等待的超时间隔Messageoutput-channel. 默认情况下,send()块一秒钟。 仅当输出通道有一些“发送”限制时,它才适用,例如,QueueChannel具有已满的固定“容量”。 在这种情况下,一个MessageDeliveryException被抛出。 这send-timeout被忽略AbstractSubscribableChannel实现。 为group-timeout(-expression)MessageDeliveryException从计划过期任务中,将导致重新计划此任务。 自选。
10 用于指定分散收集在返回之前等待回复消息的时间。 默认情况下,它等待30秒。 如果回复超时,则返回“null”。 自选。
11 指定分散收集是否必须返回非空值。 此值为true默认情况下。 因此,一个ReplyRequiredException当底层聚合器在gather-timeout. 请注意,如果null是一种可能性,则gather-timeout应指定以避免无限期等待。
12 <recipient-list-router>选项。 自选。 相互排斥scatter-channel属性。
13 <aggregator>选项。 必填。

错误处理

由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。 在某些情况下,如果ReleaseStrategy允许以少于请求的回复完成流程。 在其他情况下,当发生错误时,应考虑从子流返回“补偿消息”之类的内容。spring-doc.cadn.net.cn

每个异步子流都应配置errorChannel标头,用于从MessagePublishingErrorHandler. 否则,将错误发送到全局errorChannel使用常见的错误处理逻辑。 有关异步错误处理的更多信息,请参阅错误处理spring-doc.cadn.net.cn

同步流可以使用ExpressionEvaluatingRequestHandlerAdvice忽略异常或返回补偿消息。 当异常从其中一个子流抛出到ScatterGatherHandler,它只是被重新抛向上游。 这样,所有其他子流都将毫无用处,并且它们的回复将在ScatterGatherHandler. 这有时可能是预期行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。spring-doc.cadn.net.cn

从 5.1.3 版本开始,ScatterGatherHandlererrorChannelName选择。 它填充到errorChannel标头,用于发生异步错误时,或者可用于常规同步子流中直接发送错误消息。spring-doc.cadn.net.cn

下面的示例配置演示了通过返回补偿消息来处理异步错误:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了产生正确的回复,我们必须复制标题(包括replyChannelerrorChannel) 从failedMessageMessagingException已发送到scatterGatherErrorChannel通过MessagePublishingErrorHandler. 这样,目标异常将返回给ScatterGatherHandler用于回复消息组完成。这样的例外payload可以在MessageGroupProcessor收集器或以其他方式处理到下游,在分散收集端点之后。spring-doc.cadn.net.cn

在将散射结果发送给采集器之前,ScatterGatherHandler恢复请求消息标头,包括回复和错误通道(如果有)。这样,来自AggregatingMessageHandler将传播给调用方,即使在分散的收件人子流中应用了异步切换。为了成功作,一个gatherResultChannel,originalReplyChanneloriginalErrorChannel标头必须从分散的收件人子流传输回回复。在这种情况下,一个合理的、有限的gatherTimeout必须为ScatterGatherHandler. 否则,默认情况下,它将被阻止,等待收集者的回复。

线程屏障

有时,我们需要挂起消息流线程,直到发生其他异步事件。例如,考虑一个将消息发布到 RabbitMQ 的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出已收到消息的确认之前不回复用户。spring-doc.cadn.net.cn

在 4.2 版本中,Spring Integration 引入了<barrier/>组件。底层MessageHandlerBarrierMessageHandler. 此类还实现MessageTriggerAction,其中消息传递给trigger()方法在handleRequestMessage()方法(如果存在)。spring-doc.cadn.net.cn

挂起的线程和触发线程通过调用CorrelationStrategy在消息上。 当消息发送到input-channel,线程最多悬挂requestTimeout毫秒,等待相应的触发消息。 默认关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID页眉。 当触发器消息到达具有相同关联时,线程将被释放。 发送到output-channelAfter Release 是通过使用MessageGroupProcessor. 默认情况下,该消息是Collection<?>两个有效负载,并且标头是使用DefaultAggregatingMessageGroupProcessor.spring-doc.cadn.net.cn

如果trigger()方法首先被调用(或在主线程超时后),它最多会被挂起triggerTimeout等待挂起消息到达。 如果您不想挂起触发器线程,请考虑将触发器线程移交给TaskExecutor相反,使其线程被挂起。
在 5.4 之前,只有一个timeout选项,但在某些情况下,最好为这些作设置不同的超时。 因此requestTimeouttriggerTimeout已引入选项。

requires-reply属性确定如果挂起的线程在触发器消息到达之前超时时要执行的作。 默认情况下,它是false,这意味着端点返回null,流结束,线程返回给调用方。 什么时候true一个ReplyRequiredException被抛出。spring-doc.cadn.net.cn

您可以调用trigger()方法以编程方式(使用名称barrier.handler— 其中barrier是屏障端点的 bean 名称)。 或者,您可以配置<outbound-channel-adapter/>以触发释放。spring-doc.cadn.net.cn

只能挂起一个具有相同关联的线程。同一关联可以多次使用,但只能同时使用一次。如果第二个线程到达具有相同的关联,则会引发异常。

以下示例演示如何使用自定义标头进行关联:spring-doc.cadn.net.cn

Java
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
XML
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根据哪个消息先到达,将消息发送到in或将消息发送到release最多等待 10 秒钟,直到另一条消息到达。当消息被释放时,outchannel 会发送一条消息,该消息结合了调用自定义MessageGroupProcessorbean,命名为myOutputProcessor. 如果主线程超时并且触发器稍后到达,您可以配置将延迟触发器发送到的丢弃通道。spring-doc.cadn.net.cn

有关此组件的示例,请参阅屏障示例应用程序spring-doc.cadn.net.cn