Java DSL

Java DSL

Spring Integration Java 配置和 DSL 提供了一组便捷的构建器和流畅的 API,使您能够从 Spring @Configuration 类配置 Spring Integration 消息流。spring-doc.cadn.net.cn

Spring Integration 的 Java DSL 本质上是 Spring Integration 的门面。 该 DSL 提供了一种简单的方法,通过使用流式 Builder 模式以及现有的 Spring Framework 和 Spring Integration Java 配置,将 Spring Integration 消息流嵌入到您的应用程序中。 我们还使用并支持 Lambda(需 Java 8 及以上版本),以进一步简化 Java 配置。spring-doc.cadn.net.cn

The cafe 提供了使用 DSL 的一个良好示例。spring-doc.cadn.net.cn

该 DSL 由 IntegrationFlow 流畅 API 提供(参见 IntegrationFlowBuilder)。 这会生成 IntegrationFlow 组件,应将其注册为 Spring Bean(通过使用 @Bean 注解)。 构建器模式用于将任意复杂的结构表达为可接受 Lambda 表达式作为参数的方法层级。spring-doc.cadn.net.cn

The IntegrationFlowBuilder 仅收集集成组件(MessageChannel 实例、AbstractEndpoint 实例等)到 IntegrationFlow Bean 中,以便由 IntegrationFlowBeanPostProcessor 进一步解析并在应用上下文中注册具体的 Bean。spring-doc.cadn.net.cn

Java DSL 直接使用 Spring Integration 类,绕过了任何 XML 的生成和解析。 然而,DSL 提供的不仅仅是基于 XML 的语法糖。 其最引人注目的功能之一是能够定义内联 Lambda 表达式来实现端点逻辑,从而消除了为自定义逻辑创建外部类的需求。 从某种意义上说,Spring Integration 对 Spring 表达式语言 (SpEL) 和内联脚本的支持可以解决这一问题,但 Lambda 表达式更简单且功能强大得多。spring-doc.cadn.net.cn

以下示例展示了如何在 Spring Integration 中使用 Java 配置:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow(AtomicInteger integerSource) {
        return IntegrationFlow.fromSupplier(integerSource::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

前述配置示例的结果是,在ApplicationContext次启动后,创建 Spring Integration 端点和消息通道。 Java 配置既可用于替换 XML 配置,也可用于增强 XML 配置。 您无需替换所有现有的 XML 配置即可使用 Java 配置。spring-doc.cadn.net.cn

DSL 基础

org.springframework.integration.dsl 包包含前面提到的 IntegrationFlowBuilder API 以及多个 IntegrationComponentSpec 实现,这些实现同时也是构建器,并提供用于配置具体端点的流畅 API。 IntegrationFlowBuilder 基础设施为基于消息的应用程序提供通用的 企业集成模式(EIP),例如通道、端点、轮询器和通道拦截器。spring-doc.cadn.net.cn

重要

IntegrationComponentSpec 是一个 FactoryBean 实现,因此其 getObject() 方法不得从 Bean 定义中调用。 IntegrationComponentSpec 实现必须保持原样以用于 Bean 定义,框架将管理其生命周期。 对于目标 IntegrationComponentSpec 类型(即 FactoryBean 值),Bean 定义应使用 Bean 方法参数注入,而不是 Bean 方法引用。spring-doc.cadn.net.cn

在 DSL 中,端点使用动词表示以提高可读性。 以下列表包含常见的 DSL 方法名称及其关联的 EIP 端点:spring-doc.cadn.net.cn

从概念上讲,集成流程是通过将这些端点组合成一个或多个消息流来构建的。 请注意,EIP 并未正式定义“消息流”这一术语,但将其视为使用已知消息模式的独立工作单元是有益的。 该 DSL 提供了一个 IntegrationFlow 组件用于定义通道及其之间端点的组合,但现在 IntegrationFlow 仅扮演配置角色,用于在应用上下文中填充实际的 Bean,而在运行时并不使用。 然而,IntegrationFlow 的 Bean 可以自动装配为 Lifecycle,以控制整个流程中的 start()stop(),该流程将委托给与此 IntegrationFlow 关联的所有 Spring Integration 组件。 以下示例使用 IntegrationFlow 流畅 API,通过来自 IntegrationFlowBuilder 的 EIP 方法定义一个 IntegrationFlow Bean:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform 方法接受一个 lambda 表达式作为端点参数,用于操作消息负载。 该方法的实际参数是一个 GenericTransformer<S, T> 实例。 因此,此处可以使用任何提供的转换器(ObjectToJsonTransformerFileToStringTransformer 等)。spring-doc.cadn.net.cn

在底层,IntegrationFlowBuilder 识别 MessageHandler 及其对应的端点,分别通过 MessageTransformingHandlerConsumerEndpointFactoryBean。 考虑另一个示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

上述示例组合了一个 Filter → Transformer → Service Activator 序列。 该流程是“单向”的。 也就是说,它不提供回复消息,仅将有效载荷打印到 STDOUT。 端点通过使用直接通道自动连接在一起。spring-doc.cadn.net.cn

Lambda 表达式和 Message<?> 个参数

在使用 EIP 方法中的 lambda 表达式时,"input" 参数通常指消息负载。 如果您希望访问整个消息,请使用接受 Class<?> 作为第一个参数的重载方法之一。 例如,以下代码将无法正常工作:spring-doc.cadn.net.cn

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

由于 lambda 表达式未保留参数类型,且框架将尝试将负载强制转换为 Message<?>,这将在运行时以 ClassCastException 失败。spring-doc.cadn.net.cn

相反,请使用:spring-doc.cadn.net.cn

.(Message.class, m -> newFooFromMessage(m))
Bean 定义覆盖

Java DSL 可以为流定义中内联定义的对象注册 Bean,也可以重用现有的注入的 Bean。 如果为内联对象和现有 Bean 定义定义了相同的 Bean 名称,将抛出 BeanDefinitionOverrideException,表明此类配置是错误的。 然而,当您处理 prototype Bean 时,集成流处理器无法检测到现有的 Bean 定义,因为每次从 BeanFactory 调用 prototype Bean 时,我们都会获得一个新实例。 因此,提供的实例会直接在 IntegrationFlow 中使用,而无需进行任何 Bean 注册,也不会对现有的 prototype Bean 定义进行任何可能的检查。 然而,如果该对象具有显式的 id,且该名称的 Bean 定义位于 prototype 作用域中,则会为此对象调用 BeanFactory.initializeBean()spring-doc.cadn.net.cn

消息通道

除了使用 EIP 方法的 IntegrationFlowBuilder 之外,Java DSL 还提供了流畅的 API 来配置 MessageChannel 实例。 为此,提供了 MessageChannels 构建器工厂。 以下示例展示了如何使用它:spring-doc.cadn.net.cn

@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap());
}

相同的 MessageChannels 构建器工厂可用于来自 IntegrationFlowBuilderchannel() EIP 方法以连接端点,类似于在 XML 配置中连接 input-channel/output-channel 对。 默认情况下,端点使用 DirectChannel 实例进行连接,其中 Bean 名称基于以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]。 此规则也适用于由内联 MessageChannels 构建器工厂用法生成的未命名通道。 然而,所有 MessageChannels 方法都有一个变体,能够感知 channelId,您可以使用它来为 MessageChannel 实例设置 Bean 名称。 MessageChannel 引用和 beanName 可用作 Bean 方法调用。 以下示例展示了使用 channel() EIP 方法的几种可能方式:spring-doc.cadn.net.cn

@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") 表示“查找并使用 id 为 MessageChannel 的‘输入’元素,或者创建一个”。spring-doc.cadn.net.cn

  • fixedSubscriberChannel() 会生成一个 FixedSubscriberChannel 的实例,并以名称 channelFlow.channel#0 将其注册。spring-doc.cadn.net.cn

  • channel("queueChannel") 工作方式相同,但使用现有的 queueChannel Bean。spring-doc.cadn.net.cn

  • channel(publishSubscribe()) 是 bean 方法的引用。spring-doc.cadn.net.cn

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))IntegrationFlowBuilder,它将 IntegrationComponentSpec 暴露给 ExecutorChannel 并将其注册为 executorChannelspring-doc.cadn.net.cn

  • channel("output")DirectChannel Bean 注册为名称 output,前提是尚不存在具有该名称的 Bean。spring-doc.cadn.net.cn

注意:前述 IntegrationFlow 定义是有效的,其所有通道均应用于具有 BridgeHandler 个实例的端点。spring-doc.cadn.net.cn

请小心,在使用来自不同 IntegrationFlow 实例的 MessageChannels 工厂时,应确保使用相同的内联通道定义。 即使 DSL 解析器将不存在的对象注册为 Bean,它也无法从不同的 IntegrationFlow 容器中识别出同一个对象(MessageChannel)。 以下示例是错误的:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

该错误示例的结果是以下异常:spring-doc.cadn.net.cn

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

要使其生效,您需要为该通道声明 @Bean,并在不同的 IntegrationFlow 实例中使用其 Bean 方法。spring-doc.cadn.net.cn

轮询器

Spring Integration 还提供了一种流畅的 API,允许您为 PollerMetadata 实现配置 AbstractPollingEndpoint。 您可以使用 Pollers 构建器工厂来配置通用的 bean 定义或从 IntegrationFlowBuilder EIP 方法创建的 bean 定义,如下示例所示:spring-doc.cadn.net.cn

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

请参阅 Javadoc 中的 PollersPollerSpec 以获取更多信息。spring-doc.cadn.net.cn

如果您使用 DSL 构建一个 PollerSpec 作为 @Bean,请不要在 bean 定义中调用 getObject() 方法。 PollerSpec 是一个 FactoryBean,它根据规范生成 PollerMetadata 对象并初始化其所有属性。

reactive()端点

从版本 5.5 开始,ConsumerEndpointSpec 提供了一个带有可选自定义器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>reactive() 配置属性。 此选项将目标端点配置为 ReactiveStreamsConsumer 实例,独立于输入通道类型,该类型通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。 提供的函数用于从 Flux.transform() 操作符中自定义来自输入通道的响应式流源(例如 publishOn()log()doOnNext() 等)。spring-doc.cadn.net.cn

以下示例演示了如何独立于最终订阅者和生产者,将输入通道的发布线程更改为 DirectChannelspring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .<String, Integer>transform(Integer::parseInt,
                    e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
            .get();
}

有关更多信息,请参阅 响应式流支持spring-doc.cadn.net.cn

DSL 和端点配置

所有 IntegrationFlowBuilder EIP 方法都有一个变体,可将 lambda 参数应用于为 AbstractEndpoint 实例提供选项:SmartLifecyclePollerMetadatarequest-handler-advice-chain 等。 它们都具有泛型参数,因此您可以在上下文中配置端点甚至其 MessageHandler,如下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlow.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

此外,EndpointSpec 提供了一个 id() 方法,允许您使用指定的 Bean 名称注册端点 Bean,而不是使用生成的名称。spring-doc.cadn.net.cn

如果 MessageHandler 被引用为 Bean,那么如果 DSL 定义中存在 .advice() 方法,则任何现有的 adviceChain 配置都将被覆盖:spring-doc.cadn.net.cn

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

它们未被合并,在此情况下仅使用 testAdvice() Bean。spring-doc.cadn.net.cn

转换器

DSL API 提供了一个便捷的、流畅的Transformers工厂,可用作.transform()EIP方法内的内联目标对象定义。 以下示例展示了如何使用它:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlow.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

它避免了使用 setter 方法进行不方便的编码,并使流程定义更加直接。 请注意,您可以使用 Transformers 将目标 Transformer 实例声明为 @Bean 实例,并再次从 IntegrationFlow 定义中作为 bean 方法使用它们。 不过,DSL 解析器会处理内联对象的 bean 声明,如果它们尚未定义为 beans。spring-doc.cadn.net.cn

请参阅 Javadoc 中的Transformer以获取更多信息和受支持的工厂方法。spring-doc.cadn.net.cn

入站通道适配器

通常,消息流从入站通道适配器(如 <int-jdbc:inbound-channel-adapter>)开始。 该适配器配置了 <poller>,并请求 MessageSource<?> 定期生成消息。 Java DSL 也允许从 MessageSource<?> 启动 IntegrationFlow。 为此,IntegrationFlow 流式 API 提供了重载的 IntegrationFlow.from(MessageSource<?> messageSource) 方法。 您可以将 MessageSource<?> 配置为 Bean,并将其作为该方法的参数提供。 IntegrationFlow.from() 的第二个参数是一个 Consumer<SourcePollingChannelAdapterSpec> Lambda 表达式,允许您为 SourcePollingChannelAdapter 提供选项(例如 PollerMetadataSmartLifecycle)。 以下示例展示了如何使用流式 API 和 Lambda 表达式创建 IntegrationFlowspring-doc.cadn.net.cn

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlow.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

对于那些没有直接构建 Message 对象需求的场景,您可以使用基于 java.util.function.SupplierIntegrationFlow.fromSupplier() 变体。 Supplier.get() 的结果会自动包装在 Message 中(如果它本身还不是 Message)。spring-doc.cadn.net.cn

消息路由器

Spring Integration 原生提供专门的路由器类型,包括:spring-doc.cadn.net.cn

与许多其他 DSL IntegrationFlowBuilder EIP 方法一样,route() 方法可以应用任何 AbstractMessageRouter 实现,或者为了方便起见,使用作为 SpEL 表达式的 Stringref-method 对。 此外,您可以使用 lambda 表达式配置 route(),并为 Consumer<RouterSpec<MethodInvokingRouter>> 使用 lambda 表达式。 流畅 API 还提供 AbstractMappingMessageRouter 选项,例如 channelMapping(String key, String channelName) 对,如下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlowByLambda() {
    return IntegrationFlow.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping(true, "even")
                            .channelMapping(false, "odd")
            )
            .get();
}

以下示例展示了一个简单的基于表达式的路由器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlowByExpression() {
    return IntegrationFlow.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

routeToRecipients() 方法接受一个 Consumer<RecipientListRouterSpec>,如下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlow.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                    .recipient("thing1-channel", "'thing1' == payload")
                    .recipientMessageSelector("thing2-channel", m ->
                            m.getHeaders().containsKey("recipient")
                                    && (boolean) m.getHeaders().get("recipient"))
                    .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                            f -> f.<String, String>transform(String::toUpperCase)
                                    .channel(c -> c.queue("recipientListSubFlow1Result")))
                    .recipientFlow((String p) -> p.startsWith("thing3"),
                            f -> f.transform("Hello "::concat)
                                    .channel(c -> c.queue("recipientListSubFlow2Result")))
                    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                    "thing3".equals(m.getPayload())),
                            f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                    .defaultOutputToParentFlow())
            .get();
}

.routeToRecipients()定义中的.defaultOutputToParentFlow()允许您将路由器的defaultOutput设置为网关,以继续处理主流程中未匹配的消息。spring-doc.cadn.net.cn

分割器

要创建拆分器,请使用 split() EIP 方法。 默认情况下,如果负载是 IterableIteratorArrayStream 或响应式 Publishersplit() 方法会将每个项目作为单独的消息输出。 它接受一个 lambda 表达式、一个 SpEL 表达式或任何 AbstractMessageSplitter 实现。 或者,您也可以不带参数使用它来提供 DefaultMessageSplitter。 以下示例展示了如何通过提供 lambda 表达式来使用 split() 方法:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlow.from("splitInput")
              .split(s -> s.applySequence(false).delimiters(","))
              .channel(MessageChannels.executor(taskExecutor()))
              .get();
}

上述示例创建了一个拆分器,用于拆分包含逗号分隔的 String 的消息。spring-doc.cadn.net.cn

聚合器和重组器

一个 Aggregator 在概念上是 Splitter 的相反操作。 它将一系列单独的消息聚合为单条消息,因此必然更加复杂。 默认情况下,聚合器返回的消息包含来自传入消息的负载集合。 同样的规则也适用于 Resequencer。 以下示例展示了拆分 - 聚合模式的经典案例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split()方法将列表拆分为单独的消息,并将它们发送到ExecutorChannelresequence()方法根据消息头中找到的序列详细信息对消息进行重新排序。 aggregate()方法收集这些消息。spring-doc.cadn.net.cn

然而,您可以通过指定发布策略和相关策略等来更改默认行为。 考虑以下示例:spring-doc.cadn.net.cn

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前述示例将具有 myCorrelationKey 个标头的消息进行关联,并在至少累积十条消息后释放这些消息。spring-doc.cadn.net.cn

resequence() EIP 方法提供了类似的 Lambda 配置。spring-doc.cadn.net.cn

服务激活器和.handle()方法

The .handle() EIP 方法的目标是调用任何 MessageHandler 实现或某个 POJO 上的任何方法。 另一种选择是使用 lambda 表达式定义“活动”。 因此,我们引入了一个通用的 GenericHandler<P> 函数式接口。 其 handle 方法需要两个参数:P payloadMessageHeaders headers(从 5.1 版本开始)。 有了这个,我们可以按如下方式定义流程:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

上述示例会将接收到的任何整数翻倍。spring-doc.cadn.net.cn

然而,Spring Integration 的一个主要目标是loose coupling,即通过运行时类型转换将消息负载转换为消息处理器的目标参数。 由于 Java 不支持 lambda 类的泛型类型解析,我们为大多数 EIP 方法和LambdaMessageProcessor引入了一个变通方案,即添加一个额外的payloadType参数。 这样做可以将繁重的转换工作委托给 Spring 的ConversionService,它利用提供的type和请求的消息来匹配目标方法的参数。 以下示例展示了生成的IntegrationFlow可能的外观:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

我们还可以在 ConversionService 中注册一些 BytesToIntegerConverter,以消除额外的 .transform()spring-doc.cadn.net.cn

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
             .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

操作符 gateway()

IntegrationFlow定义中,gateway()操作符是一种特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流并等待回复。 从技术上讲,它在<chain>定义中扮演着与嵌套<gateway>组件相同的角色(参见在链内部调用链),并且使流程更加清晰和直接。 从逻辑和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见消息网关)。 该操作符具有多个重载版本以适用于不同的目标:spring-doc.cadn.net.cn

  • gateway(String requestChannel) 通过名称向某个端点的输入通道发送消息;spring-doc.cadn.net.cn

  • gateway(MessageChannel requestChannel) 通过直接注入向某个端点的输入通道发送消息;spring-doc.cadn.net.cn

  • gateway(IntegrationFlow flow) 用于向提供的 IntegrationFlow 的输入通道发送消息。spring-doc.cadn.net.cn

所有这些都提供了一个变体,其中第二个 Consumer<GatewayEndpointSpec> 参数用于配置目标 GatewayMessageHandler 以及相应的 AbstractEndpoint。 此外,基于 IntegrationFlow 的方法允许调用现有的 IntegrationFlow Bean,或通过内联 Lambda 将流程声明为子流程(针对 IntegrationFlow 函数式接口),或者将其提取到 private 方法中以获得更简洁的代码风格:spring-doc.cadn.net.cn

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流程并非总是返回回复,您应将 requestTimeout 设置为 0,以防止调用线程无限期挂起。 在这种情况下,流程将在此处结束,并释放该线程以进行后续工作。

操作符 log()

为了方便记录消息在 Spring Integration 流程(<logging-channel-adapter>)中的流转,系统提供了一个 log() 算子。 在内部,它由一个以 LoggingHandler 作为订阅者的 WireTap ChannelInterceptor 表示。 其职责是将传入的消息记录到下一个端点或当前通道中。 以下示例展示了如何使用 LoggingHandlerspring-doc.cadn.net.cn

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

在前面的示例中,仅针对通过过滤器并在路由之前的消息,id 级别的标头会被记录到 ERRORtest.category 上。spring-doc.cadn.net.cn

从版本 6.0 开始,该操作符在流程末尾的行为已与其在中间的使用方式保持一致。 换句话说,即使移除了 log() 操作符,流程的行为也保持不变。 因此,如果预期在流程末尾不产生回复,建议在最后一个 log() 之后使用 nullChannel()spring-doc.cadn.net.cn

操作符 intercept()

从版本 5.3 开始,intercept() 运算符允许在当前 MessageChannel 中注册一个或多个 ChannelInterceptor 实例。这是通过 MessageChannels API 显式创建 MessageChannel 的替代方案。 以下示例使用 MessageSelectingInterceptor 以异常方式拒绝某些消息:spring-doc.cadn.net.cn

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

MessageChannelSpec.wireTap()

Spring Integration 包含一个 .wireTap() 流式 API MessageChannelSpec 构建器。 以下示例展示了如何使用 wireTap 方法来记录输入:spring-doc.cadn.net.cn

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

如果 MessageChannelInterceptableChannel 的实例,则会将 log()wireTap()intercept() 运算符应用于当前的 MessageChannel。 否则,将为当前配置的端点向流程中注入一个中间 DirectChannel。 在以下示例中,WireTap 拦截器被直接添加到 myChannel,因为 DirectChannel 实现了 InterceptableChannelspring-doc.cadn.net.cn

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

当当前的 MessageChannel 未实现 InterceptableChannel 时,一个隐式的 DirectChannelBridgeHandler 会被注入到 IntegrationFlow 中,并且 WireTap 会被添加到这个新的 DirectChannel 中。 以下示例没有任何通道声明:spring-doc.cadn.net.cn

.handle(...)
.log()
}

在上面的示例中(以及任何未声明通道的情况下),会在当前配置的 ServiceActivatingHandler 的当前位置隐式注入 DirectChannel,并将其用作输出通道(来自 .handle()前面已描述)。spring-doc.cadn.net.cn

使用消息流

IntegrationFlowBuilder 提供了一个顶级 API,用于生成集成组件并将其连接到消息流。 当您的集成可以通过单个流完成时(这通常是情况),这将非常方便。 或者,IntegrationFlow 实例可以通过 MessageChannel 实例进行连接。spring-doc.cadn.net.cn

默认情况下,MessageFlow 在 Spring Integration 术语中表现为“链”。 也就是说,端点由 DirectChannel 实例自动且隐式地连接。 消息流实际上并非构建为链,这提供了更大的灵活性。 例如,如果您知道某个组件的 inputChannel 名称(即您显式定义了它),您可以向该流中的任何组件发送消息。 您还可以在流中引用外部定义的通道,以使用通道适配器(启用远程传输协议、文件 I/O 等),而不是直接使用通道。 因此,DSL 不支持 Spring Integration 的 chain 元素,因为在这种情况下它并未增加太多价值。spring-doc.cadn.net.cn

由于 Spring Integration Java DSL 产生的 Bean 定义模型与其他任何配置选项相同,并且基于现有的 Spring Framework @Configuration 基础设施,因此它可以与 XML 定义一起使用,并与 Spring Integration 消息注解配置进行连接。spring-doc.cadn.net.cn

您也可以使用 lambda 定义直接的 IntegrationFlow 实例。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

此定义的结果是相同的集成组件集,这些组件通过隐式直接通道进行连接。 唯一的限制是,此流程以一个命名的直接通道启动 - lambdaFlow.input。 此外,Lambda 流程不能从 MessageSourceMessageProducer 开始。spring-doc.cadn.net.cn

从版本 5.1 开始,此类 IntegrationFlow 被包装到代理中,以暴露生命周期控制并提供对内部关联的 StandardIntegrationFlowinputChannel 的访问。spring-doc.cadn.net.cn

从版本 5 开始。0.6,IntegrationFlow 中组件生成的 Bean 名称包括流程 Bean,后跟一个点(.)作为前缀。例如,前文示例中 .transform("Hello "::concat)ConsumerEndpointFactoryBean 会导致生成名为 lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0 的 Bean。(o.s.iorg.springframework.integration 的简化形式,以便适应页面显示。该端点的 Transformer 实现 Bean 的 Bean 名称为 lambdaFlow.transformer#0(从版本 5 开始)。1),其中不使用MethodInvokingTransformer类的完全限定名称,而是使用其组件类型。当在流程中生成 bean 名称时,所有 NamedComponent 都应用相同的模式。这些生成的 Bean 名称以前缀形式添加流程 ID,目的是用于解析日志或在某些分析工具中将组件分组,同时避免在运行时并发注册集成流程时出现竞态条件。有关更多信息,请参阅 动态和运行时集成流spring-doc.cadn.net.cn

FunctionExpression

我们引入了 FunctionExpression 类(SpEL 的 Expression 接口的一个实现),以支持使用 lambda 表达式和 generics。 为 DSL 组件提供了 Function<T, R> 选项,并附带一个 expression 选项,当存在来自核心 Spring Integration 的隐式 Strategy 变体时。 以下示例展示了如何使用函数表达式:spring-doc.cadn.net.cn

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

FunctionExpression 还支持运行时类型转换,这与 SpelExpression 中所做的一样。spring-doc.cadn.net.cn

子流程支持

一些 if…​elsepublish-subscribe 组件提供了使用子流程来指定其逻辑或映射的功能。 最简单的示例是 .publishSubscribeChannel(),如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

您可以使用单独的 IntegrationFlow @Bean 定义来实现相同的结果,但我们希望您会发现子流程式的逻辑组合方式非常有用。 我们发现这能生成更短(因此更易读)的代码。spring-doc.cadn.net.cn

从版本 5.3 开始,提供了一个基于 BroadcastCapableChannelpublishSubscribeChannel() 实现,用于在由代理支持的消息通道上配置子流订阅者。 例如,我们现在可以在 Jms.publishSubscribeChannel() 上配置多个订阅者作为子流:spring-doc.cadn.net.cn

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

类似的 publish-subscribe 子流程组合提供了 .routeToRecipients() 方法。spring-doc.cadn.net.cn

另一个例子是在 .filter() 方法中使用 .discardFlow() 而不是 .discardChannel()spring-doc.cadn.net.cn

.route() 值得特别关注。 考虑以下示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping() 继续像常规 Router 映射中那样工作,但 .subFlowMapping() 将该子流程与主流程绑定。 换句话说,任何路由器的子流程在 .route() 之后都会返回到主流程。spring-doc.cadn.net.cn

有时,您需要从 .subFlowMapping() 引用现有的 IntegrationFlow @Bean。 以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当您需要从这样的子流程接收回复并继续主流程时,此IntegrationFlowbean 引用(或其输入通道)必须用.gateway()如前面的示例所示。 TheoddFlow()前述示例中的引用未被包装到.gateway(). 因此,我们预期不会从此路由分支收到回复。 否则,您最终会遇到类似于以下内容的异常:spring-doc.cadn.net.cn

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

当您将子流程配置为 lambda 时,框架将处理与子流程的请求 - 回复交互,因此不需要网关。spring-doc.cadn.net.cn

子流程可以嵌套到任意深度,但我们不建议这样做。 实际上,即使在路由器场景中,在流程中添加复杂的子流程也会迅速变得像一盘意大利面,难以被人解析。spring-doc.cadn.net.cn

在 DSL 支持子流配置的情况下,当为正在配置的组件通常需要通道,且该子流以 channel() 元素开头时,框架会在组件输出通道与流的输入通道之间隐式放置一个 bridge()。 例如,在此 filter 定义中:spring-doc.cadn.net.cn

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架内部会创建一个 DirectChannel Bean,用于注入到 MessageFilter.discardChannel 中。 随后,它将子流程封装为一个 IntegrationFlow,以该隐式通道作为订阅的起点,并在流程中指定的 channel() 之前放置一个 bridge。 当使用现有的 IntegrationFlow Bean 作为子流程引用(而非内联子流程,例如 lambda 表达式)时,不需要此类桥接,因为框架可以从流程 Bean 中解析出第一个通道。 对于内联子流程,输入通道尚未可用。spring-doc.cadn.net.cn

使用协议适配器

到目前为止,所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息架构。 然而,我们尚未进行任何真正的集成。 实现这一点需要能够通过网络协议(如 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等)访问远程资源,或访问本地文件系统。 Spring Integration 支持所有这些功能以及更多功能。 理想情况下,DSL 应为所有这些功能提供一等公民级别的支持,但实现所有这些功能并随着新适配器添加到 Spring Integration 中而保持同步是一项艰巨的任务。 因此,预期是 DSL 将不断追赶 Spring Integration 的更新。spring-doc.cadn.net.cn

因此,我们提供高级 API 以无缝地定义特定于协议的 Messaging。 我们通过工厂模式和构建者模式以及 Lambda 表达式来实现这一点。 您可以将工厂类视为“命名空间工厂”,因为它们所扮演的角色与来自具体协议特定 Spring Integration 模块的组件所使用的 XML 命名空间相同。 目前,Spring Integration Java DSL 支持 AmqpFeedJmsFiles(S)FtpHttpJPAMongoDbTCP/UDPMailWebFluxScripts 命名空间工厂。 以下示例展示了如何使用其中三个(AmqpJmsMail):spring-doc.cadn.net.cn

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlow.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlow.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

上述示例展示了如何将“命名空间工厂”用作内联适配器声明。 然而,您可以从 @Bean 定义中使用它们,以使 IntegrationFlow 方法链更具可读性。spring-doc.cadn.net.cn

我们正在征求社区对这些命名空间工厂的反馈,以便在投入其他工作之前收集意见。 我们也非常欢迎任何关于接下来应优先支持哪些适配器和网关的建议。

您可以在本参考手册中各协议特定章节找到更多 Java DSL 示例。spring-doc.cadn.net.cn

所有其他协议通道适配器都可以配置为通用 Bean,并像以下示例所示连接到 IntegrationFlowspring-doc.cadn.net.cn

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlow.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

IntegrationFlowAdapter

IntegrationFlow接口可以直接实现,并指定为扫描的组件,如下示例所示:spring-doc.cadn.net.cn

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

它被 IntegrationFlowBeanPostProcessor 捕获,并正确解析和注册到应用程序上下文中。spring-doc.cadn.net.cn

为了便于使用并获得松散耦合架构的好处,我们提供了 IntegrationFlowAdapter 基类实现。 它需要实现 buildFlow() 方法,以通过 from() 方法之一生成 IntegrationFlowDefinition,如下示例所示:spring-doc.cadn.net.cn

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
          return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this::messageSource,
                      e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                 .split(this)
                 .transform(this)
                 .aggregate(a -> a.processor(this, null), null)
                 .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                 .filter(this)
                 .handle(this)
                 .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
         return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
         return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
         return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
           return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
            return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
           return payload + ":" + thing1;
    }

}

动态和运行时集成流程

IntegrationFlow 及其所有依赖组件都可以在运行时注册。 在 5.0 版本之前,我们使用 BeanFactory.registerSingleton() 钩子。 从 Spring Framework 5.0 开始,我们使用 instanceSupplier 钩子进行编程式 BeanDefinition 注册。 以下示例展示了如何编程式注册一个 Bean:spring-doc.cadn.net.cn

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

注意,在上述示例中,instanceSupplier钩子是genericBeanDefinition方法的最后一个参数,本例中由 lambda 提供。spring-doc.cadn.net.cn

所有必要的 Bean 初始化和生命周期管理均会自动完成,这与标准上下文配置 Bean 定义的行为一致。spring-doc.cadn.net.cn

为了简化开发体验,Spring Integration 引入了 IntegrationFlowContext 来在运行时注册和管理 IntegrationFlow 实例,如下示例所示:spring-doc.cadn.net.cn

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们拥有多个配置选项且需要创建多个相似流程的实例时,这将非常有用。 为此,我们可以遍历我们的选项,并在循环中创建并注册 IntegrationFlow 个实例。 另一种变体是当我们的数据源不是基于 Spring 的,因此我们必须动态地创建它。 如下示例所示,响应式流(Reactive Streams)事件源就是这样一个样本:spring-doc.cadn.net.cn

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder(作为IntegrationFlowContext.registration()的结果)可用于为要注册的IntegrationFlow指定 Bean 名称,控制其autoStartup,并注册非 Spring Integration 的 Bean。 通常,这些额外的 Bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化和反序列化器,或任何其他所需的支持组件。spring-doc.cadn.net.cn

当您不再需要时,可以使用 IntegrationFlowRegistration.destroy() 回调来移除动态注册的 IntegrationFlow 及其所有依赖的 Bean。 有关更多信息,请参阅 IntegrationFlowContext Javadocspring-doc.cadn.net.cn

从版本 5.0.6 开始,IntegrationFlow 定义中生成的所有 bean 名称都会以前缀形式添加流程 ID。 我们建议始终显式指定流程 ID。 否则,将在 IntegrationFlowContext 中启动同步屏障,以生成 IntegrationFlow 的 bean 名称并注册其 bean。 我们对这两个操作进行同步,以避免当相同的生成 bean 名称可能用于不同的 IntegrationFlow 实例时出现竞态条件。

此外,从 5.0.6 版本开始,注册构建器 API 新增了一个方法:useFlowIdAsPrefix()。 如果您希望声明同一流程的多个实例,并避免流程中具有相同 ID 的组件发生 Bean 名称冲突,此方法非常有用,如下示例所示:spring-doc.cadn.net.cn

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,第一个流程的消息处理器可以通过 bean 名称 tcp1.client.handler 引用。spring-doc.cadn.net.cn

在使用 useFlowIdAsPrefix() 时,需要指定 id 属性。

IntegrationFlow作为网关

IntegrationFlow 可以从提供 GatewayProxyFactoryBean 组件的服务接口开始,如下示例所示:spring-doc.cadn.net.cn

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

所有用于接口方法的代理都会附带一个通道,用于向 IntegrationFlow 中的下一个集成组件发送消息。 您可以使用 @MessagingGateway 注解标记服务接口,并使用 @Gateway 注解标记方法。 然而,requestChannel 将被忽略,并在 IntegrationFlow 中由内部通道(用于下一个组件)所覆盖。 否则,使用 IntegrationFlow 创建此类配置是没有意义的。spring-doc.cadn.net.cn

默认情况下,GatewayProxyFactoryBean 会获得一个常规的 Bean 名称,例如 [FLOW_BEAN_NAME.gateway]。 您可以通过使用 @MessagingGateway.name() 属性或重载的 IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer) 工厂方法来更改该 ID。 此外,接口上 @MessagingGateway 注解的所有属性都会应用到目标 GatewayProxyFactoryBean。 当注解配置不适用时,可以使用 Consumer<GatewayProxySpec> 变体为目标代理提供合适的选项。 此 DSL 方法从版本 5.2 开始可用。spring-doc.cadn.net.cn

使用 Java 8,您甚至可以创建集成网关,无需实现 java.util.function 接口,如下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
            .<Object>handle((p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

errorRecovererFlow 可按如下方式使用:spring-doc.cadn.net.cn

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

领域特定语言扩展

从版本 5.3 开始,引入了一个 IntegrationFlowExtension,用于通过自定义或组合的 EIP 操作符扩展现有的 Java DSL。 只需扩展此类,提供可在 IntegrationFlow Bean 定义中使用的相应方法即可。 该扩展类也可用于自定义 IntegrationComponentSpec 配置;例如,可以在现有的 IntegrationComponentSpec 扩展中实现缺失或默认选项。 下面的示例演示了复合自定义操作符的使用,以及针对默认自定义 outputProcessorAggregatorSpec 扩展用法:spring-doc.cadn.net.cn

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

对于方法链流程,这些扩展中的新 DSL 运算符必须返回扩展类。 这样,目标 IntegrationFlow 定义将适用于新的和现有的 DSL 运算符:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}

集成流组合

随着 MessageChannel 抽象在 Spring Integration 中成为一等公民,集成流的组合始终被假定是可行的。 流中任何端点的输入通道都可以用于从任何其他端点发送消息,而不仅限于以该通道为输出的端点。 此外,借助 @MessagingGateway 契约、内容增强器组件、复合端点(如 <chain>),以及现在的 IntegrationFlow Bean(例如 IntegrationFlowAdapter),将业务逻辑分布在更短、可重用的部分变得非常简单。 最终组合所需的一切,就是了解要发送或接收的 MessageChannelspring-doc.cadn.net.cn

从版本 5.5.4 开始,为了更抽象地处理 MessageChannel 并向最终用户隐藏实现细节,IntegrationFlow 引入了 from(IntegrationFlow) 工厂方法,允许从现有流的输出启动当前的 IntegrationFlowspring-doc.cadn.net.cn

@Bean
IntegrationFlow templateSourceFlow() {
    return IntegrationFlow.fromSupplier(() -> "test data")
            .channel("sourceChannel")
            .get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
    return IntegrationFlow.from(templateSourceFlow)
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("compositionMainFlowResult"))
            .get();
}

另一方面,IntegrationFlowDefinition 添加了一个 to(IntegrationFlow) 终端操作符,以便在某个其他流的输入通道处继续当前流:spring-doc.cadn.net.cn

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
    return f -> f
            .<String, String>transform(String::toUpperCase)
            .to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
    return f -> f
            .<String, String>transform(p -> p + " from other flow")
            .channel(c -> c.queue("otherFlowResultChannel"));
}

流程中间的组合完全可以通过现有的 gateway(IntegrationFlow) EIP 方法实现。 这样,我们就可以通过组合更简单、可复用的逻辑块来构建任意复杂度的流程。 例如,您可以将包含 IntegrationFlow 个 Bean 的库作为依赖项添加,只需将其配置类导入到最终项目中并自动注入到您的 IntegrationFlow 定义中即可。spring-doc.cadn.net.cn