|
此版本仍在开发中,目前尚不被视为稳定版本。如需最新稳定版本,请使用 spring-cloud-function 5.0.1! |
Spring Integration Interaction
Spring Integration Framework 扩展了 Spring 编程模型,以支持广为人知的企业集成模式。它可在基于 Spring 的应用程序中实现轻量级消息传递,并通过声明式适配器支持与外部系统的集成。此外,它还提供了一种高级领域特定语言(DSL),用于将各种操作(端点)组合成逻辑集成流程。借助该 DSL 的 Lambda 风格配置,Spring Integration 已具备良好的 java.util.function 接口采用程度。其中,@MessagingGateway 代理接口还可作为 Function 或 Consumer 使用,根据 Spring Cloud Function 环境,可将其注册到函数目录中。有关其对函数的支持的更多信息,请参阅 Spring Integration 参考手册。
另一方面,从版本 4.0.3 开始,Spring Cloud Function 引入了一个 spring-cloud-function-integration 模块,该模块从 Spring Integration DSL 视角出发,为与 FunctionCatalog 的交互提供更深入、更具云特性和基于自动配置的 API。该 FunctionFlowBuilder 是自动配置并由 FunctionCatalog 自动装配的,它作为面向目标 IntegrationFlow 实例的功能特定 DSL 的入口点。除了标准的 IntegrationFlow.from() 工厂(为方便起见),FunctionFlowBuilder 还公开了一个 fromSupplier(String supplierDefinition) 工厂,用于在提供的 FunctionCatalog 中查找目标 Supplier。随后,该 FunctionFlowBuilder 将导向 FunctionFlowDefinition。该 FunctionFlowDefinition 是 IntegrationFlowExtension 的实现,并提供了 apply(String functionDefinition) 和 accept(String consumerDefinition) 操作符,分别用于从 FunctionCatalog 中查找 Function 或 Consumer。有关更多信息,请参阅其 Javadoc。
以下示例展示了 FunctionFlowBuilder 的实际应用,以及 IntegrationFlow API 其余部分的强大功能:
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由于 FunctionCatalog.lookup() 功能不仅限于简单的函数名称,还可以在上述 apply() 和 accept() 运算符中使用函数组合功能:
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
当我们在 Spring Cloud 应用程序中添加预定义功能的自动配置依赖项时,此 API 的相关性会更高。例如,流应用程序(Stream Applications) 项目不仅提供应用程序镜像,还提供适用于各种集成用例的功能构件,如 debezium-supplier、elasticsearch-consumer、aggregator-function 等。
以下配置分别基于 http-supplier、spel-function 和 file-consumer:
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
我们还需要做的就是将它们的配置添加到 application.properties 中(如有必要):
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt