Java DSL

Java DSL

Spring Integration Java 配置和 DSL 提供了一组方便的构建器和一个流畅的 API,允许您从 Spring 配置 Spring Integration 消息流@Configuration类。spring-doc.cadn.net.cn

用于 Spring Integration 的 Java DSL 本质上是 Spring Integration 的门面。 DSL 提供了一种简单的方法,可以使用 fluent 将 Spring Integration Message Flows 嵌入到您的应用程序中Builderpattern 以及来自 Spring Framework 和 Spring Integration 的现有 Java 配置。 我们还使用并支持 lambda(在 Java 8 中可用)来进一步简化 Java 配置。spring-doc.cadn.net.cn

咖啡馆提供了使用 DSL 的一个很好的例子。spring-doc.cadn.net.cn

DSL 由IntegrationFlow流畅的 API(请参阅IntegrationFlowBuilder). 这会产生IntegrationFlow组件,该组件应注册为 Spring bean(通过使用@Bean注释)。 构建器模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法的层次结构。spring-doc.cadn.net.cn

IntegrationFlowBuilder仅收集集成组件 (MessageChannel实例AbstractEndpoint实例,依此类推)在IntegrationFlowbean 用于通过IntegrationFlowBeanPostProcessor.spring-doc.cadn.net.cn

Java DSL 直接使用 Spring Integration 类,并绕过任何 XML 生成和解析。 然而,DSL 提供的不仅仅是 XML 之上的语法糖。 其最引人注目的功能之一是能够定义内联 lambda 来实现端点逻辑,无需外部类来实现自定义逻辑。 从某种意义上说,Spring Integration 对 Spring 表达式语言 (SpEL) 和内联脚本的支持解决了这个问题,但 lambda 更容易、更强大。spring-doc.cadn.net.cn

以下示例显示了如何使用 Java 配置进行 Spring 集成:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

前面配置示例的结果是,它创建了ApplicationContext启动、Spring Integration 端点和消息通道。 Java 配置可用于替换和增强 XML 配置。 无需替换所有现有的 XML 配置即可使用 Java 配置。spring-doc.cadn.net.cn

DSL 基础知识

org.springframework.integration.dsl包包含IntegrationFlowBuilder前面提到的 API 和一些IntegrationComponentSpec实现,它们也是构建器,并提供流畅的 API 来配置具体端点。 这IntegrationFlowBuilder基础设施为基于消息的应用程序(例如通道、端点、轮询器和通道拦截器)提供常见的企业集成模式 (EIP)。spring-doc.cadn.net.cn

端点在 DSL 中表示为动词,以提高可读性。 以下列表包括常见的 DSL 方法名称和关联的 EIP 端点:spring-doc.cadn.net.cn

从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构造的。 请注意,EIP 没有正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元是有用的。 DSL 提供了一个IntegrationFlow组件来定义通道和它们之间的端点的组合,但现在IntegrationFlow仅在应用程序上下文中填充实际 Bean 的配置角色,在运行时不使用。 但是,用于IntegrationFlow可以自动接线为Lifecycle控制start()stop()对于委托给与此关联的所有 Spring Integration 组件的整个流程IntegrationFlow. 以下示例使用IntegrationFlowfluent API 来定义IntegrationFlowbean 使用 EIP 方法IntegrationFlowBuilder:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform方法接受 lambda 作为端点参数来对消息有效负载进行作。 这个方法的真正参数是GenericTransformer<S, T>实例。 因此,任何提供的转换器 (ObjectToJsonTransformer,FileToStringTransformer,和其他)可以在此处使用。spring-doc.cadn.net.cn

在被窝里,IntegrationFlowBuilder识别MessageHandler和它的端点,使用MessageTransformingHandlerConsumerEndpointFactoryBean分别。 考虑另一个例子:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前面的示例组成了Filter → Transformer → Service Activator. 流程是“'单向'”。 也就是说,它不提供回复消息,而仅将有效负载打印到 STDOUT。 使用直接通道自动将终结点连接在一起。spring-doc.cadn.net.cn

lambda 和Message<?>参数

在 EIP 方法中使用 lambda 时,“input”参数通常是消息有效负载。如果您希望访问整个消息,请使用其中一种重载方法,该方法采用Class<?>作为第一个参数。例如,这是行不通的:spring-doc.cadn.net.cn

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

这将在运行时失败,并显示ClassCastException因为 lambda 不保留参数类型,并且框架将尝试将有效负载转换为Message<?>.spring-doc.cadn.net.cn

相反,请使用:spring-doc.cadn.net.cn

.(Message.class, m -> newFooFromMessage(m))
Bean 定义覆盖

Java DSL 可以为流定义中内联定义的对象注册 Bean,也可以重用现有的注入 Bean。如果为内联对象和现有 Bean 定义定义了相同的 Bean 名称,则BeanDefinitionOverrideException被抛出,表明这样的配置是错误的。但是,当你处理prototypebean 时,无法从集成流处理器中检测到现有的 bean 定义,因为每次我们调用prototypebean 来自BeanFactory我们得到一个新实例。这样,提供的实例就会在IntegrationFlow没有任何 bean 注册和任何可能的对现有prototypebean 定义。 然而BeanFactory.initializeBean()如果该对象具有显式id并且此名称的 bean 定义位于prototype范围。spring-doc.cadn.net.cn

消息通道

除了IntegrationFlowBuilder使用 EIP 方法时,Java DSL 提供了一个流畅的 API 来配置MessageChannel实例。 为此,该MessageChannelsbuilder factory 提供了。以下示例展示了如何使用它:spring-doc.cadn.net.cn

@Bean
public MessageChannel priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

一样MessageChannelsbuilder factory 可用于channel()EIP 方法IntegrationFlowBuilder连接端点,类似于连接input-channel/output-channel配对。默认情况下,端点使用DirectChannelbean 名称基于以下模式的实例:[IntegrationFlow.beanName].channel#[channelNameIndex]. 此规则也适用于内联生成的未命名通道MessageChannels构建器工厂使用。但是,所有MessageChannels方法有一个变体,该变体知道channelId您可以使用它来设置MessageChannel实例。 这MessageChannel参考资料和beanName可以用作 bean-method 调用。以下示例显示了使用channel()EIP方式:spring-doc.cadn.net.cn

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input")意思是“'查找并使用MessageChannel使用“输入”ID,或创建一个“”。spring-doc.cadn.net.cn

  • fixedSubscriberChannel()生成一个FixedSubscriberChannel并使用channelFlow.channel#0.spring-doc.cadn.net.cn

  • channel("queueChannel")工作方式相同,但使用现有的queueChannel豆。spring-doc.cadn.net.cn

  • channel(publishSubscribe())是 bean-method 引用。spring-doc.cadn.net.cn

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))IntegrationFlowBuilder这暴露了IntegrationComponentSpecExecutorChannel并将其注册为executorChannel.spring-doc.cadn.net.cn

  • channel("output")注册DirectChannelbean 与output作为其名称,只要没有具有此名称的 bean 已经存在。spring-doc.cadn.net.cn

注意:前面的IntegrationFlow定义有效,并且其所有通道都应用于具有BridgeHandler实例。spring-doc.cadn.net.cn

小心通过MessageChannels来自不同工厂IntegrationFlow实例。 即使 DSL 解析器将不存在的对象注册为 bean,它也无法确定相同的对象 (MessageChannel) 来自不同的IntegrationFlow器皿。 以下示例是错误的:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

这个坏例子的结果是以下异常:spring-doc.cadn.net.cn

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

要使其正常工作,您需要声明@Bean并为该通道使用其来自不同IntegrationFlow实例。spring-doc.cadn.net.cn

轮询器

Spring Integration 还提供了一个流畅的 API,可让您将PollerMetadataAbstractPollingEndpoint实现。 您可以使用Pollers构建器工厂,用于配置公共 Bean 定义或从IntegrationFlowBuilderEIP 方法,如下例所示:spring-doc.cadn.net.cn

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

PollersPollerSpec在 Javadoc 中获取更多信息。spring-doc.cadn.net.cn

如果使用 DSL 构造PollerSpec作为@Bean,请勿调用get()方法。 这PollerSpec是一个FactoryBean这会生成PollerMetadata对象,并初始化其所有属性。

reactive()端点

从 5.5 版本开始,ConsumerEndpointSpec提供一个reactive()配置属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>. 此选项将目标端点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,该类型转换为Flux通过IntegrationReactiveUtils.messageChannelToFlux(). 提供的函数从Flux.transform()运算符来自定义 (publishOn(),log(),doOnNext()等)来自输入通道的反应流源。spring-doc.cadn.net.cn

以下示例演示如何将发布线程从输入通道更改为独立于最终订阅者和生产者DirectChannel:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .<String, Integer>transform(Integer::parseInt,
                    e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
            .get();
}

有关更多信息,请参阅响应式流支持spring-doc.cadn.net.cn

DSL 和终结点配置

IntegrationFlowBuilderEIP 方法有一个变体,它应用 lambda 参数来提供选项AbstractEndpoint实例:SmartLifecycle,PollerMetadata,request-handler-advice-chain,等。 它们中的每一个都有通用参数,因此它允许您配置端点,甚至它的MessageHandler在上下文中,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlow.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

此外,EndpointSpec提供id()方法,让您使用给定的 Bean 名称而不是生成的 Bean 名称注册端点 Bean。spring-doc.cadn.net.cn

如果MessageHandler被引用为 bean,则任何现有的adviceChain如果.advice()方法存在于 DSL 定义中:spring-doc.cadn.net.cn

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

它们不会合并,只有testAdvice()在这种情况下使用 bean。spring-doc.cadn.net.cn

变形金刚

DSL API 提供了方便、流畅的Transformers工厂,用作.transform()EIP 方法。 以下示例演示如何使用它:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlow.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

它避免了使用 setter 进行不方便的编码,并使流定义更加简单。 请注意,您可以使用Transformers声明目标Transformer实例作为@Bean实例,并再次将它们从IntegrationFlow定义为 bean 方法。 尽管如此,DSL 解析器会处理内联对象的 bean 声明,如果它们尚未定义为 bean。spring-doc.cadn.net.cn

有关更多信息和支持的工厂方法,请参阅 Javadoc 中的 Transformersspring-doc.cadn.net.cn

入站通道适配器

通常,消息流从入站通道适配器(例如<int-jdbc:inbound-channel-adapter>). 适配器配置为<poller>,它会询问一个MessageSource<?>定期生成消息。 Java DSL 允许启动IntegrationFlowMessageSource<?>太。 为此,该IntegrationFlowFluent API 提供了一个重载的IntegrationFlow.from(MessageSource<?> messageSource)方法。 您可以配置MessageSource<?>作为 bean 并将其作为该方法的参数提供。 的第二个参数IntegrationFlow.from()是一个Consumer<SourcePollingChannelAdapterSpec>lambda 的 lambda 允许您提供选项(例如PollerMetadataSmartLifecycle) 的SourcePollingChannelAdapter. 以下示例演示如何使用 Fluent API 和 lambda 创建IntegrationFlow:spring-doc.cadn.net.cn

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlow.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

对于那些没有构建要求的情况Message对象,你可以使用IntegrationFlow.fromSupplier()基于java.util.function.Supplier. 结果Supplier.get()会自动包装在Message(如果它还不是Message).spring-doc.cadn.net.cn

消息路由器

Spring Integration 原生提供专门的路由器类型,包括:spring-doc.cadn.net.cn

与许多其他 DSL 一样IntegrationFlowBuilderEIP 方法,则route()方法可以应用任何AbstractMessageRouter实现,或者为方便起见,一个String作为 SpEL 表达式或ref-method双。 此外,您可以配置route()使用 lambda 并将 lambda 用于Consumer<RouterSpec<MethodInvokingRouter>>. Fluent API 还提供AbstractMappingMessageRouter选项,例如channelMapping(String key, String channelName)对,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlowByLambda() {
    return IntegrationFlow.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping(true, "even")
                            .channelMapping(false, "odd")
            )
            .get();
}

以下示例显示了一个简单的基于表达式的路由器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlowByExpression() {
    return IntegrationFlow.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

routeToRecipients()方法采用Consumer<RecipientListRouterSpec>,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlow.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                    .recipient("thing1-channel", "'thing1' == payload")
                    .recipientMessageSelector("thing2-channel", m ->
                            m.getHeaders().containsKey("recipient")
                                    && (boolean) m.getHeaders().get("recipient"))
                    .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                            f -> f.<String, String>transform(String::toUpperCase)
                                    .channel(c -> c.queue("recipientListSubFlow1Result")))
                    .recipientFlow((String p) -> p.startsWith("thing3"),
                            f -> f.transform("Hello "::concat)
                                    .channel(c -> c.queue("recipientListSubFlow2Result")))
                    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                    "thing3".equals(m.getPayload())),
                            f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                    .defaultOutputToParentFlow())
            .get();
}

.defaultOutputToParentFlow().routeToRecipients()definition 允许您设置路由器的defaultOutput作为网关,以继续处理主流中不匹配的消息。spring-doc.cadn.net.cn

分配器

要创建拆分器,请使用split()EIP 方法。 默认情况下,如果有效负载是IterableIteratorArray一个Stream,或响应式Publishersplit()方法将每个项目输出为单独的消息。 它接受 lambda、SpEL 表达式或任何AbstractMessageSplitter实现。 或者,您可以在不带参数的情况下使用它来提供DefaultMessageSplitter. 以下示例演示如何使用split()方法,提供 lambda:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlow.from("splitInput")
              .split(s -> s.applySequence(false).delimiters(","))
              .channel(MessageChannels.executor(taskExecutor()))
              .get();
}

前面的示例创建了一个拆分器,用于拆分包含逗号分隔的消息String.spring-doc.cadn.net.cn

聚合器和重排序器

Aggregator在概念上与Splitter. 它将一系列单个消息聚合到单个消息中,并且必然更加复杂。 默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。 相同的规则适用于Resequencer. 以下示例显示了拆分器聚合器模式的规范示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split()方法将列表拆分为单独的消息,并将它们发送到ExecutorChannel. 这resequence()方法按消息头中的序列详细信息对消息重新排序。 这aggregate()方法收集这些消息。spring-doc.cadn.net.cn

但是,您可以通过指定发布策略和关联策略等来更改默认行为。 请考虑以下示例:spring-doc.cadn.net.cn

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前面的示例关联了具有myCorrelationKey标头并在累积至少十个消息后释放消息。spring-doc.cadn.net.cn

resequence()EIP 方法。spring-doc.cadn.net.cn

服务激活器和.handle()方法

.handle()EIP 方法的目标是调用任何MessageHandler实现或某些 POJO 上的任何方法。 另一种选择是使用 lambda 表达式定义“活动”。 因此,我们引入了一个泛型GenericHandler<P>功能接口。 其handle方法需要两个参数:P payloadMessageHeaders headers(从 5.1 版开始)。 有了这个,我们可以定义一个流程,如下所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

前面的示例将它收到的任何整数加倍。spring-doc.cadn.net.cn

然而,Spring Integration 的一个主要目标是loose coupling,通过从消息有效负载到消息处理程序的目标参数的运行时类型转换。 由于 Java 不支持 lambda 类的泛型类型解析,因此我们引入了一种解决方法,其中包含一个额外的payloadType大多数 EIP 方法的参数和LambdaMessageProcessor. 这样做将艰苦的转换工作委托给 Spring 的ConversionService,它使用提供的type以及请求的消息到目标方法参数。 以下示例显示了生成的IntegrationFlow可能看起来像:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

我们也可以注册一些BytesToIntegerConverterConversionService以摆脱额外的.transform():spring-doc.cadn.net.cn

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
             .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

操作员网关 ()

gateway()运算符IntegrationFlowdefinition 是一个特殊的服务激活器实现,通过其输入通道调用其他端点或集成流并等待回复。 从技术上讲,它与嵌套的<gateway>组件中的组件<chain>定义(请参阅从链中调用链),并允许流更干净、更直接。 从逻辑上讲,从业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分发和重用功能(请参阅消息传递网关)。 此运算符针对不同的目标有多个重载:spring-doc.cadn.net.cn

  • gateway(String requestChannel)按名称向某个端点的输入通道发送消息;spring-doc.cadn.net.cn

  • gateway(MessageChannel requestChannel)通过直接注入将消息发送到某个端点的输入通道;spring-doc.cadn.net.cn

  • gateway(IntegrationFlow flow)向提供的输入通道发送消息IntegrationFlow.spring-doc.cadn.net.cn

所有这些都有第二个变体Consumer<GatewayEndpointSpec>参数来配置目标GatewayMessageHandler和各自的AbstractEndpoint. 此外,IntegrationFlow-based 方法允许调用现有的IntegrationFlowbean 或通过就地 lambda 将流声明为子流,以便IntegrationFlow函数式接口或将其提取到private方法 Cleaner 代码样式:spring-doc.cadn.net.cn

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回回复,则应将requestTimeout设置为 0 以防止无限期挂起调用线程。 在这种情况将在该点结束,并释放线程以进行进一步工作。

运算符 log()

为方便起见,要通过 Spring Integration 流程(<logging-channel-adapter>)、一个log()运算符。 在内部,它由WireTap ChannelInterceptor使用LoggingHandler作为其订阅者。 它负责将传入消息记录到下一个端点或当前通道中。 以下示例演示如何使用LoggingHandler:spring-doc.cadn.net.cn

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

在前面的示例中,id标头记录在ERROR水平到test.category仅适用于通过筛选器和路由之前的邮件。spring-doc.cadn.net.cn

从版本 6.0 开始,此运算符在流末尾的行为与其在中间的用法保持一致。 换句话说,即使log()运算符被删除。 因此,如果预计不会在流结束时生成回复,则nullChannel()建议在最后一个之后使用log().spring-doc.cadn.net.cn

运算符 intercept()

从 5.3 版开始,intercept()运算符允许注册一个或多个ChannelInterceptor实例在当前MessageChannel在流动中。 这是创建显式MessageChannel通过MessageChannels应用程序接口。 以下示例使用MessageSelectingInterceptor要拒绝某些邮件并出现异常:spring-doc.cadn.net.cn

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

MessageChannelSpec.wireTap()

Spring Integration 包括一个.wireTap()流畅的 APIMessageChannelSpec建设者。 以下示例演示如何使用wireTap记录输入的方法:spring-doc.cadn.net.cn

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

如果MessageChannelInterceptableChannellog(),wireTap()intercept()运算符应用于当前MessageChannel. 否则,中间DirectChannel注入到当前配置的端点的流中。 在以下示例中,WireTapInterceptor 被添加到myChannel直接,因为DirectChannel实现InterceptableChannel:spring-doc.cadn.net.cn

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

当当前MessageChannel不实现InterceptableChannel,隐式DirectChannelBridgeHandler被注入IntegrationFlowWireTap添加到这个新的DirectChannel. 以下示例没有任何通道声明:spring-doc.cadn.net.cn

.handle(...)
.log()
}

在前面的示例中(以及任何未声明通道时),隐式DirectChannel被注入到当前位置的IntegrationFlow并用作当前配置的ServiceActivatingHandler(从.handle()前面描述)。spring-doc.cadn.net.cn

使用消息流

IntegrationFlowBuilder提供了一个顶级 API 来生成连接到消息流的集成组件。 当您的集成可以通过单个流程完成时(通常情况下),这很方便。 交互IntegrationFlow实例可以通过以下方式加入MessageChannel实例。spring-doc.cadn.net.cn

默认情况下,MessageFlow在 Spring Integration 术语中表现为“链”。 也就是说,端点由DirectChannel实例。 消息流实际上并没有被构造为链,这提供了更大的灵活性。 例如,如果您知道流中的任何组件,则可以向流中的任何组件发送消息inputChannelname (也就是说,如果您显式定义了它)。 您还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等)而不是直接通道。 因此,DSL 不支持 Spring Integrationchain元素,因为它在这种情况下不会增加太多价值。spring-doc.cadn.net.cn

由于 Spring Integration Java DSL 生成与任何其他配置选项相同的 bean 定义模型,并且基于现有的 Spring Framework@Configuration基础设施,它可以与 XML 定义一起使用,并与 Spring Integration 消息传递注释配置连接。spring-doc.cadn.net.cn

您还可以定义直接IntegrationFlow实例。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

此定义的结果是使用隐式直接通道连接的同一组集成组件。 这里唯一的限制是此流是从命名的直接通道启动的 -lambdaFlow.input. 此外,Lambda 流不能从MessageSourceMessageProducer.spring-doc.cadn.net.cn

从 5.1 版本开始,这种IntegrationFlow被包装到代理中,以公开生命周期控制并提供对inputChannel的内部关联StandardIntegrationFlow.spring-doc.cadn.net.cn

从 V5.0.6 开始,为IntegrationFlow包括 flow bean 后跟一个点 (.) 作为前缀。 例如,ConsumerEndpointFactoryBean对于.transform("Hello "::concat)在前面的示例中,导致 bean 名称为lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0. (这o.s.i是从org.springframework.integration以适合页面。 这Transformer该端点的实现 Bean 的 Bean 名称为lambdaFlow.transformer#0(从 5.1 版开始),其中不是MethodInvokingTransformerclass,则使用其组件类型。 相同的模式应用于所有NamedComponents 当必须在流中生成 Bean 名称时。 这些生成的 bean 名称前面加上流 ID,用于解析日志或在某些分析工具中将组件分组在一起等目的,以及避免在运行时同时注册集成流时出现竞争条件。 有关详细信息,请参阅动态和运行时集成流spring-doc.cadn.net.cn

FunctionExpression

我们引入了FunctionExpression类(SpEL 的Expression接口),让我们使用 lambda 和generics. 这Function<T, R>选项,以及expression选项,当存在隐式StrategyCore Spring Integration 的变体。 以下示例演示如何使用函数表达式:spring-doc.cadn.net.cn

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

FunctionExpression还支持运行时类型转换,如SpelExpression.spring-doc.cadn.net.cn

子流支持

一些if…​elsepublish-subscribe组件提供了使用子流指定其逻辑或映射的能力。 最简单的示例是.publishSubscribeChannel(),如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

您可以使用单独的IntegrationFlow @Bean定义,但我们希望您发现逻辑组合的子流风格有用。 我们发现它会导致代码更短(因此更具可读性)。spring-doc.cadn.net.cn

从 5.3 版开始,一个BroadcastCapableChannel-基于publishSubscribeChannel()提供了在代理支持的消息通道上配置子流订阅者的实现。 例如,我们现在可以将多个订阅者配置为Jms.publishSubscribeChannel():spring-doc.cadn.net.cn

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub")
                .get();
}

@Bean
public IntegrationFlow pubSubFlow() {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
    return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
            .destination("pubsub")
            .get();
}

类似的publish-subscribe子流组合提供了.routeToRecipients()方法。spring-doc.cadn.net.cn

另一个例子是使用.discardFlow()而不是.discardChannel().filter()方法。spring-doc.cadn.net.cn

.route()值得特别关注。 请考虑以下示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping()继续像在常规中一样工作Router映射,但.subFlowMapping()将该子流与主流联系起来。 换句话说,任何路由器的子流都会在.route().spring-doc.cadn.net.cn

有时,您需要参考现有的IntegrationFlow @Bean.subFlowMapping(). 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当您需要从此类子流接收回复并继续主流时,此IntegrationFlowbean 引用(或其输入通道)必须用.gateway()如前面的示例所示。 这oddFlow()引用不会包装到.gateway(). 因此,我们不希望此路由分支回复。否则,您最终会遇到类似于以下内容的异常:spring-doc.cadn.net.cn

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。spring-doc.cadn.net.cn

子流可以嵌套到任何深度,但我们不建议这样做。事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面,并且人类很难解析。spring-doc.cadn.net.cn

在 DSL 支持子流配置的情况下,当正在配置的组件通常需要通道时,并且该子流以channel()元素,框架隐式放置一个bridge()在组件输出通道和流的输入通道之间。例如,在此filter定义:spring-doc.cadn.net.cn

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架在内部创建了一个DirectChannel用于注入MessageFilter.discardChannel. 然后它将子流包装成IntegrationFlow从订阅的这个隐式通道开始,并将一个bridgechannel()在流中指定。当现有的IntegrationFlowbean 用作子流引用(而不是内联子流,例如 lambda),不需要这样的桥接器,因为框架可以解析流 bean 中的第一个通道。对于内联子流,输入通道尚不可用。spring-doc.cadn.net.cn

使用协议适配器

到目前为止显示的所有示例都说明了 DSL 如何使用 Spring Integration 编程模型来支持消息传递架构。但是,我们还没有进行任何真正的集成。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源或访问本地文件系统。Spring Integration 支持所有这些以及更多。理想情况下,DSL 应该为所有这些提供一流的支持,但实现所有这些并跟上新适配器添加到 Spring Integration 是一项艰巨的任务。因此,期望 DSL 不断赶上 Spring Integration。spring-doc.cadn.net.cn

因此,我们提供了高级 API 来无缝定义特定于协议的消息传递。我们使用工厂和构建器模式以及 lambda 来做到这一点。您可以将工厂类视为“命名空间工厂”,因为它们与特定于具体协议的 Spring Integration 模块中的组件的 XML 命名空间扮演相同的角色。目前,Spring Integration Java DSL 支持Amqp,Feed,Jms,Files,(S)Ftp,Http,JPA,MongoDb,TCP/UDP,Mail,WebFluxScripts命名空间工厂。以下示例显示如何使用其中三个 (Amqp,JmsMail):spring-doc.cadn.net.cn

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlow.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlow.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

前面的示例演示了如何使用“命名空间工厂”作为内联适配器声明。 但是,您可以从@Bean定义,使IntegrationFlow方法链更具可读性。spring-doc.cadn.net.cn

在我们花精力在其他命名空间工厂之前,我们正在征求社区对这些命名空间工厂的反馈。 我们也感谢对我们接下来应该支持哪些适配器和网关的优先级提出任何意见。

您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。spring-doc.cadn.net.cn

所有其他协议通道适配器都可以配置为通用 Bean 并连接到IntegrationFlow,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlow.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

IntegrationFlowAdapter

IntegrationFlow接口可以直接实现并指定为扫描的组件,如下例所示:spring-doc.cadn.net.cn

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

它被IntegrationFlowBeanPostProcessor并在应用程序上下文中正确解析和注册。spring-doc.cadn.net.cn

为了方便并获得松散耦合架构的好处,我们提供了IntegrationFlowAdapter基类实现。 它需要一个buildFlow()方法实现以生成IntegrationFlowDefinition通过使用from()方法,如以下示例所示:spring-doc.cadn.net.cn

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
          return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this::messageSource,
                      e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                 .split(this)
                 .transform(this)
                 .aggregate(a -> a.processor(this, null), null)
                 .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                 .filter(this)
                 .handle(this)
                 .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
         return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
         return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
         return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
           return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
            return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
           return payload + ":" + thing1;
    }

}

动态和运行时集成流

IntegrationFlow并且它的所有依赖组件都可以在运行时注册。 在 5.0 版本之前,我们使用BeanFactory.registerSingleton()钩。 从 Spring 框架开始5.0,我们使用instanceSupplier用于程序化的钩子BeanDefinition注册。 以下示例显示了如何以编程方式注册 bean:spring-doc.cadn.net.cn

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

请注意,在前面的示例中,instanceSupplierhook 是genericBeanDefinition方法,在本例中由 lambda 提供。spring-doc.cadn.net.cn

所有必要的 Bean 初始化和生命周期都是自动完成的,就像标准上下文配置 Bean 定义一样。spring-doc.cadn.net.cn

为了简化开发体验,Spring Integration 引入了IntegrationFlowContext注册和管理IntegrationFlow实例,如以下示例所示:spring-doc.cadn.net.cn

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们有多个配置选项并且必须创建多个类似流的实例时,这很有用。 为此,我们可以迭代我们的选项并创建和注册IntegrationFlow循环中的实例。 另一种变体是当我们的数据源不是基于 Spring 时,因此我们必须动态创建它。 此类示例是响应式流事件源,如以下示例所示:spring-doc.cadn.net.cn

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder(由于IntegrationFlowContext.registration()) 可用于为IntegrationFlow注册,控制其autoStartup,并注册非 Spring Integration Bean。 通常,这些附加 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和解序列化器,或任何其他所需的支持组件。spring-doc.cadn.net.cn

您可以使用IntegrationFlowRegistration.destroy()回调以删除动态注册的IntegrationFlow以及它的所有依赖 bean(当您不再需要它们时)。 请参阅IntegrationFlowContextJavadoc了解更多信息。spring-doc.cadn.net.cn

从 5.0.6 版开始,所有生成的 bean 名称在IntegrationFlow定义的前缀是流 ID 作为前缀。 我们建议始终指定显式流 ID。 否则,同步屏障将在IntegrationFlowContext,以生成IntegrationFlow并注册其 bean。 我们在这两个作上同步,以避免当生成的相同 bean 名称可用于不同的 bean 名称时出现竞争条件IntegrationFlow实例。

此外,从版本 5.0.6 开始,注册生成器 API 有一个新方法:useFlowIdAsPrefix(). 如果您希望声明同一流的多个实例并避免在流中的组件具有相同 ID 时发生 Bean 名称冲突,这很有用,如以下示例所示:spring-doc.cadn.net.cn

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,可以使用 bean 名称tcp1.client.handler.spring-doc.cadn.net.cn

id属性是必需的。useFlowIdAsPrefix().

IntegrationFlow作为网关

IntegrationFlow可以从提供GatewayProxyFactoryBean组件,如以下示例所示:spring-doc.cadn.net.cn

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

接口方法的所有代理都随通道一起提供,用于将消息发送到下一个集成组件IntegrationFlow. 您可以使用@MessagingGateway注释,并使用@Gateway附注。 尽管如此,requestChannel被忽略并被该内部通道覆盖,用于IntegrationFlow. 否则,使用IntegrationFlow没有意义。spring-doc.cadn.net.cn

默认情况下,一个GatewayProxyFactoryBean获取常规 bean 名称,例如[FLOW_BEAN_NAME.gateway]. 您可以使用@MessagingGateway.name()属性或重载的IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)工厂方法。 此外,来自@MessagingGateway接口上的注释应用于目标GatewayProxyFactoryBean. 当注释配置不适用时,Consumer<GatewayProxySpec>variant 可用于为目标代理提供适当的选项。 此 DSL 方法从 5.2 版开始可用。spring-doc.cadn.net.cn

在 Java 8 中,您甚至可以使用java.util.function接口,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
            .<Object>handle((p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

errorRecovererFlow可以这样使用:spring-doc.cadn.net.cn

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

DSL 扩展

从 5.3 版开始,IntegrationFlowExtension已引入以允许使用自定义或组合的 EIP 运算符扩展现有的 Java DSL。所需要的只是该类的扩展,它提供了可用于IntegrationFlowbean 定义。扩展类也可用于自定义IntegrationComponentSpec配置; 例如,可以在现有的IntegrationComponentSpec外延。 下面的示例演示了复合自定义运算符和AggregatorSpec默认自定义的扩展outputProcessor:spring-doc.cadn.net.cn

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

对于方法链流,这些扩展中的新 DSL 运算符必须返回扩展类。这样,目标IntegrationFlowdefinition 将适用于新的和现有的 DSL 运算符:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}

集成流组合

使用MessageChannel抽象作为一等公民在 Spring Integration 中,集成流的组成始终被假设。流中任何端点的输入通道都可用于从任何其他端点发送消息,而不仅仅是从具有此通道作为输出的端点发送消息。此外,使用@MessagingGatewaycontract、Content Enricher 组件、复合端点(如<chain>,现在使用IntegrationFlowBeans(例如IntegrationFlowAdapter),在较短的、可重用的部分之间分配业务逻辑非常简单。最终组合所需要的只是有关MessageChannel发送到或接收。spring-doc.cadn.net.cn

从版本开始5.5.4,从中抽象出更多内容MessageChannel并对最终用户隐藏实现详细信息,IntegrationFlow引入了from(IntegrationFlow)工厂方法允许启动电流IntegrationFlow从现有流的输出中:spring-doc.cadn.net.cn

@Bean
IntegrationFlow templateSourceFlow() {
    return IntegrationFlow.fromSupplier(() -> "test data")
            .channel("sourceChannel")
            .get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
    return IntegrationFlow.from(templateSourceFlow)
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("compositionMainFlowResult"))
            .get();
}

另一方面,IntegrationFlowDefinition添加了一个to(IntegrationFlow)终端运算符在其他流的输入通道上继续电流:spring-doc.cadn.net.cn

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
    return f -> f
            .<String, String>transform(String::toUpperCase)
            .to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
    return f -> f
            .<String, String>transform(p -> p + " from other flow")
            .channel(c -> c.queue("otherFlowResultChannel"));
}

流中间的组成可以通过现有的gateway(IntegrationFlow)EIP 方法。这样,我们可以通过从更简单、可重用的逻辑块组合流来构建任何复杂的流。例如,您可以添加一个IntegrationFlowbean 作为依赖项,只需将其配置类导入到最终项目并为您的自动连接即可IntegrationFlow定义。spring-doc.cadn.net.cn