Spring Integration Interaction

Spring Integration Framework 扩展了 Spring 编程模型,以支持广为人知的企业集成模式。它可在基于 Spring 的应用程序中实现轻量级消息传递,并通过声明式适配器支持与外部系统的集成。此外,它还提供了一种高级领域特定语言(DSL),用于将各种操作(端点)组合成逻辑集成流程。借助该 DSL 的 Lambda 风格配置,Spring Integration 已具备良好的 java.util.function 接口采用程度。其中,@MessagingGateway 代理接口还可作为 FunctionConsumer 使用,根据 Spring Cloud Function 环境,可将其注册到函数目录中。有关其对函数的支持的更多信息,请参阅 Spring Integration 参考手册spring-doc.cadn.net.cn

另一方面,从版本 4.0.3 开始,Spring Cloud Function 引入了一个 spring-cloud-function-integration 模块,该模块从 Spring Integration DSL 视角出发,为与 FunctionCatalog 的交互提供更深入、更具云特性和基于自动配置的 API。该 FunctionFlowBuilder 是自动配置并由 FunctionCatalog 自动装配的,它作为面向目标 IntegrationFlow 实例的功能特定 DSL 的入口点。除了标准的 IntegrationFlow.from() 工厂(为方便起见),FunctionFlowBuilder 还公开了一个 fromSupplier(String supplierDefinition) 工厂,用于在提供的 FunctionCatalog 中查找目标 Supplier。随后,该 FunctionFlowBuilder 将导向 FunctionFlowDefinition。该 FunctionFlowDefinitionIntegrationFlowExtension 的实现,并提供了 apply(String functionDefinition)accept(String consumerDefinition) 操作符,分别用于从 FunctionCatalog 中查找 FunctionConsumer。有关更多信息,请参阅其 Javadoc。spring-doc.cadn.net.cn

以下示例展示了 FunctionFlowBuilder 的实际应用,以及 IntegrationFlow API 其余部分的强大功能:spring-doc.cadn.net.cn

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由于 FunctionCatalog.lookup() 功能不仅限于简单的函数名称,还可以在上述 apply()accept() 运算符中使用函数组合功能:spring-doc.cadn.net.cn

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

当我们在 Spring Cloud 应用程序中添加预定义功能的自动配置依赖项时,此 API 的相关性会更高。例如,流应用程序(Stream Applications) 项目不仅提供应用程序镜像,还提供适用于各种集成用例的功能构件,如 debezium-supplierelasticsearch-consumeraggregator-function 等。spring-doc.cadn.net.cn

以下配置分别基于 http-supplierspel-functionfile-consumerspring-doc.cadn.net.cn

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

我们还需要做的就是将它们的配置添加到 application.properties 中(如有必要):spring-doc.cadn.net.cn

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt