编程模型
功能目录与灵活的功能签名
Spring Cloud Function 的主要特性之一是能够适应并支持多种用户自定义函数的类型签名,同时提供一致的执行模型。这就是为什么所有用户自定义函数都会被 FunctionCatalog 转换为一种标准表示形式。
虽然用户通常不需要关心 FunctionCatalog,但了解在用户代码中支持哪些类型的函数是有用的。
同样重要的是要理解,Spring Cloud Function 为响应式 API 提供了第一类支持,该支持由 Project Reactor 提供。这使得诸如 Mono 和 Flux 这样的响应式原语可作为用户自定义函数的类型使用,从而在为函数实现选择编程模型时提供更大的灵活性。响应式编程模型还使功能支持成为可能,而这些功能若采用命令式编程风格则难以甚至无法实现。有关更多详情,请阅读关于 函数元数(Function Arity) 的章节。
Java 8 函数式支持
Spring Cloud Function 接受并建立在 Java 8 自定义的三个核心函数式接口之上。
-
提供商<O>
-
Function<I, O>
-
Consumer<I>
为了持续避免提及 Supplier、Function 和 Consumer,在本手册的其余部分,我们将根据需要将其称为“函数式 Bean”。
简而言之,您应用程序中任何属于 ApplicationContext 的函数式 Bean(Functional bean)都将以延迟方式注册到 FunctionCatalog。这意味着它可受益于本参考手册中所描述的所有附加功能。
在最简单的应用程序中,您只需在应用程序配置中声明一个类型为 @Bean、Function 或 Consumer 的 Supplier。然后,您可以使用 FunctionCatalog 根据其名称查找特定函数。
例如:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
// . . .
FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase = catalog.lookup(“uppercase”);
重要的是要理解,由于 uppercase 是一个 Bean,您当然可以直接从 ApplicationContext 中获取它,但您所得到的仅是您所声明的 Bean 本身,不包含 SCF 提供的任何额外功能。当您通过 FunctionCatalog 查找一个函数时,所获得的实例会被封装(增强)以添加额外的功能(例如类型转换、组合等),这些功能在本手册中有详细描述。
此外,重要的是要理解,典型的用户并不会直接使用 Spring Cloud Function。相反,典型的用户会实现一个 Java Function、Supplier 或 Consumer,其设计初衷是在不同执行环境中使用,而无需额外的工作。
例如,同一个 Java 函数可以表示为 REST 端点、流消息处理器 或 AWS Lambda,甚至更多,通过 Spring Cloud Function 提供的适配器以及其他以 Spring Cloud Function 为核心编程模型的框架(例如 Spring Cloud Stream)。
总之,Spring Cloud Function 为 Java 函数添加了额外的功能,以便在各种执行环境中使用。
函数定义
虽然前面的例子向您展示了如何在 FunctionCatalog 中以编程方式查找一个函数,但在典型的集成场景中(例如,当 Spring Cloud Function 作为另一个框架(如 Spring Cloud Stream)的编程模型使用时),您可以利用 spring.cloud.function.definition 属性来声明要使用的函数。了解并理解在 FunctionCatalog 中发现函数的默认行为非常重要。
例如,如果您在 ApplicationContext 中仅有一个函数式 Bean,则通常无需指定 spring.cloud.function.definition 属性,因为 FunctionCatalog 中的单个函数可通过空名称或任意名称进行查找。例如,假设 uppercase 是您目录中唯一的函数,则可将其查找为 catalog.lookup(null)、catalog.lookup(“”) 或 catalog.lookup(“foo”)。
话虽如此,对于那些使用 Spring Cloud Stream 等框架(该框架使用 spring.cloud.function.definition)的情况,建议始终使用 spring.cloud.function.definition 属性。
例如,
spring.cloud.function.definition=uppercase
筛选不合格的功能
典型的 ApplicationContext 可能包含一些有效的 Java 函数类型的 Bean,但这些函数并非旨在作为候选者注册到 FunctionCatalog 中。此类 Bean 可能来自其他项目的自动配置,或其他任何符合 Java 函数定义的 Bean。
该框架提供了对已知的不应作为注册候选对象的 Bean 的默认过滤功能。FunctionCatalog。您还可以通过使用 spring.cloud.function.ineligible-definitions 属性提供以逗号分隔的 Bean 定义名称列表,来向此列表中添加额外的 Bean。
例如,
spring.cloud.function.ineligible-definitions=foo,bar
提供商
提供商可以是 响应式 - Supplier<Flux<T>> 或 命令式 - Supplier<T>。
从调用角度出发,这对此类 Supplier 的实现者来说应无任何区别。
然而,当在框架(例如 Spring Cloud Stream)中使用时,提供商(尤其是响应式提供商)通常用于表示流的来源。因此,它们会被调用一次以获取该流(例如 Flux),消费者可以订阅该流。换句话说,此类提供商相当于一个 无限流。
尽管如此,相同的响应式提供商也可以表示一个有限的流(例如,通过轮询 JDBC 数据得到的结果集)。在这些情况下,此类响应式提供商必须连接到底层框架的某种轮询机制。
为了协助这一点,Spring Cloud Function 提供了一个标记注解 org.springframework.cloud.function.context.PollableBean,用于表明此类提供商生成的是有限流,并可能需要再次轮询。
然而,重要的是要理解,Spring Cloud Function 本身并未为此注解提供任何行为。
此外,PollableBean 注解公开了一个 splittable 属性,用于表明生成的流需要被拆分(参见 Splitter EIP)
这是一个示例:
@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
return () -> {
String v1 = String.valueOf(System.nanoTime());
String v2 = String.valueOf(System.nanoTime());
String v3 = String.valueOf(System.nanoTime());
return Flux.just(v1, v2, v3);
};
}
功能
函数也可以以命令式或响应式的方式编写。然而,与 Supplier 和 Consumer 不同,对于实现者而言,除了理解在框架(例如 Spring Cloud Stream)中使用时,响应式函数仅被调用一次,以传递对流的引用(即 Flux 或 Mono),而命令式函数则针对每个事件调用一次。
public Function<String, String> uppercase() {
. . . .
}
BiFunction
如果您需要随负载(payload)接收一些额外数据(元数据),您可以始终将函数签名声明为接收一个包含附加信息的头映射(map of headers)的 Message。
public Function<Message<String>, String> uppercase() {
. . . .
}
为了使您的函数签名更简洁、更像POJO(普通Java对象),还有另一种方法。您可以使用 BiFunction。
public BiFunction<String, Map, String> uppercase() {
. . . .
}
鉴于 Message 仅包含两个属性(负载和头信息),而 BiFunction 需要两个输入参数,框架将自动识别此签名,并从 Message 中提取负载作为第一个参数传入,同时将 Map(头信息)作为第二个参数传入。因此,您的函数不会与 Spring 的消息处理 API 耦合。请注意,BiFunction 要求严格的签名,其中第二个参数必须是 Map。该规则同样适用于 BiConsumer。
函数组合
函数组合(Function Composition)是一项功能,允许将多个函数组合成一个函数。核心支持基于 Java 8 中提供的函数组合功能,即 Function.andThen(..)。然而,Spring Cloud Function 在此基础上提供了若干额外功能。
声明式函数组合
此功能允许您使用 |(竖线)或 ,(逗号)分隔符,以声明式方式提供组合说明,当设置 spring.cloud.function.definition 属性时。
例如:
--spring.cloud.function.definition=uppercase|reverse
在这里,我们有效地提供了一个单一函数的定义,该函数本身是函数 uppercase 和函数 reverse 的组合。
事实上,这也是为什么属性名称为 definition 而非 name 的原因之一,因为一个函数的定义可以是多个已命名函数的组合。
如前所述,您可以使用 , 替代 |,例如 …definition=uppercase,reverse。
函数路由与过滤
自 Spring Cloud Function 2.2 版本起,该框架提供了路由功能,允许调用一个单一函数,该函数作为路由器将请求转发至实际的目标函数。</p><p>此功能在某些无服务器(FAAS)环境中非常有用,因为维护多个函数的配置可能十分繁琐,或者无法暴露多个函数。
在 FunctionCatalog 中,RoutingFunction 以名称 functionRouter 进行注册。为了简化和保持一致性,您也可以参考 RoutingFunction.FUNCTION_NAME 常量。
此函数具有以下签名:
public class RoutingFunction implements Function<Object, Object> {
// . . .
}
路由指令可以通过多种方式传达。我们支持通过消息头、系统属性以及可插拔策略来提供指令。让我们看一下其中的一些细节。
MessageRoutingCallback
代码 MessageRoutingCallback 是一种策略,用于辅助确定路由目标函数定义的名称。
public interface MessageRoutingCallback {
default String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
}
您所需要做的就是实现并注册一个 MessageRoutingCallback 作为 Bean,以便被 RoutingFunction 捕获。例如:
@Bean
public MessageRoutingCallback customRouter() {
return new MessageRoutingCallback() {
@Override
public String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
};
}
在前面的示例中,您可以看到 MessageRoutingCallback 的一种非常简单的实现方式,它通过传入的 Message 的 FunctionProperties.FUNCTION_DEFINITION Message 头部来确定函数定义,并返回一个 String 实例,该实例代表将要调用的函数定义。
消息头
如果输入参数的类型为 Message<?>,您可以通过设置 spring.cloud.function.definition 或 spring.cloud.function.routing-expression Message 头部来传递路由指令。正如该属性名称所暗示的那样,spring.cloud.function.routing-expression 依赖于 Spring 表达式语言(SpEL)。对于更静态的情况,您可以使用 spring.cloud.function.definition 头部,它允许您提供单个函数的名称(例如 …definition=foo)或组合指令(例如 …definition=foo|bar|baz)。对于更动态的情况,您可以使用 spring.cloud.function.routing-expression 头部,并提供一个 SpEL 表达式,该表达式应解析为一个函数定义(如上所述)。
SpEL 求值上下文的根对象是实际的输入参数,因此在 Message<?> 的情况下,您可以构建一个能够同时访问 payload 和 headers 的表达式(例如 spring.cloud.function.routing-expression=headers.function_name)。 |
SpEL 允许用户提供要执行的 Java 代码的字符串表示形式。鉴于 spring.cloud.function.routing-expression 可以通过消息头提供,这意味着设置此类表达式的功能可能向最终用户开放(即……e.使用 web 模块时的 HTTP 头部,这可能导致一些问题(例如。g.恶意代码)。为了实现这一点,所有通过消息头传入的表达式都只会针对 SimpleEvaluationContext 进行求值,而该对象功能有限,仅用于评估上下文对象(在我们的情况下即为 Message)。另一方面,所有通过属性或系统环境变量设置的表达式都会针对 StandardEvaluationContext 进行求值,从而充分利用 Java 语言的全部灵活性。虽然通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为它在正常情况下不会向最终用户暴露,但在某些情况下,最终用户确实可以通过 Spring Boot Actuator 端点(由其他 Spring 项目、第三方或最终用户自定义实现提供)访问到系统、应用程序和环境变量的可见性以及更新能力。此类端点必须使用行业标准的网络安全实践进行保护。Spring Cloud Function 不暴露任何此类端点。
|
在特定的执行环境/模型中,适配器负责将 spring.cloud.function.definition 和/或 spring.cloud.function.routing-expression 通过 Message 头部进行翻译和通信。例如,当使用 spring-cloud-function-web 时,您可以将 spring.cloud.function.definition 作为 HTTP 头部提供,框架会将其与其他 HTTP 头部一起作为消息头进行传播。
应用属性
路由指令也可以通过 spring.cloud.function.definition 或 spring.cloud.function.routing-expression 作为应用程序属性进行传递。上一节中描述的规则同样适用于此处。唯一的区别是,您将这些指令作为应用程序属性提供(例如:--spring.cloud.function.definition=foo)。
重要的是要理解,为消息头提供 spring.cloud.function.definition 或 spring.cloud.function.routing-expression 仅适用于命令式函数(例如 Function<Foo, Bar>)。也就是说,我们只能使用命令式函数对每条消息进行路由。 而在响应式函数中,我们无法对每条消息进行路由。 因此,您只能将路由指令作为应用属性提供。 这一切都与“工作单元”相关。 在命令式函数中,工作单元是消息,因此我们可以基于该工作单元进行路由。 而在响应式函数中,工作单元是整个数据流,因此我们仅依据通过应用属性提供的指令进行操作,并对整个数据流进行路由。 |
路由指令的优先级顺序
鉴于我们有多种提供路由指令的机制,重要的是要理解在同时使用多种机制时的冲突解决优先级。以下是其顺序:
-
MessageRoutingCallback(当函数为命令式时优先级最高,无论其他任何内容是否已定义) -
消息头(如果函数为强制性且未提供
MessageRoutingCallback) -
应用属性(任意功能)
无法路由的消息
如果目录中不存在路由函数,则会抛出异常,提示如下。
在某些情况下,这种行为并不希望出现,您可能需要一个“通配符”类型的函数来处理此类消息。
为实现此目的,框架提供了 org.springframework.cloud.function.context.DefaultMessageRoutingHandler 策略。
您只需将其注册为一个 Bean 即可。其默认实现仅会记录该消息无法路由的事实,但允许消息继续流转而不会抛出异常,从而有效地丢弃该不可路由的消息。
如果您需要更复杂的处理方式,只需提供该策略的自定义实现,并将其注册为一个 Bean 即可。
@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
return new DefaultMessageRoutingHandler() {
@Override
public void accept(Message<?> message) {
// do something really cool
}
};
}
函数过滤
过滤是一种路由类型,其中只有两条路径——‘通过’或‘丢弃’。从函数的角度来看,这意味着你仅在某些条件返回‘true’时才调用特定函数,否则则丢弃输入。
然而,当涉及到丢弃输入时,在您的应用程序上下文中,对“丢弃输入”可能含义的理解有很多。例如,您可能希望对其进行日志记录,或者希望维护一个被丢弃消息的计数器。您也可能完全不采取任何操作。
由于这些不同的路径,我们不提供一个通用的配置选项来处理被丢弃的消息。相反,我们简单地建议定义一个简单的 Consumer,它将表示‘丢弃’路径:
@Bean
public Consumer<?> devNull() {
// log, count, or whatever
}
现在,您可以拥有一个路由表达式,该表达式实际上只有两条路径,从而有效地充当过滤器。例如:
--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'
所有不符合发送至 'echo' 函数条件的消息都将被路由到 'devNull',您可以在其中简单地忽略它。签名 Consumer<?> 同样可确保不会尝试进行任何类型转换,从而几乎不产生执行开销。
在处理响应式输入(例如 Publisher)时,路由指令必须仅通过 Function 属性提供。这是因为响应式函数的特性是仅被调用一次以传递一个 Publisher,其余部分由反应器(reactor)处理,因此我们无法访问或依赖于通过单个值(例如 Message)所传达的路由指令。 |
多个路由器
默认情况下,该框架将始终配置一个路由函数,其配置方式如前面章节所述。然而,有时您可能需要不止一个路由函数。在这种情况下,您可以创建自己的 RoutingFunction Bean 实例,除了现有实例之外,只要为其指定一个不同于 functionRouter 的名称即可。
您可以将 spring.cloud.function.routing-expression 或 spring.cloud.function.definition 作为键/值对传递给 RoutingFunction(以 Map 的形式)。
这是一个简单的示例:
@Configuration
protected static class MultipleRouterConfiguration {
@Bean
RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
}
这是一个测试,用于演示其工作原理:
@Test
public void testMultipleRouters() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("HELLO");
function = functionCatalog.lookup("mySpecialRouter");
assertThat(function).isNotNull();
message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
输入/输出增强
有时您需要修改或精炼传入或传出的消息,以保持代码不被非功能性关注点所污染。您不希望在业务逻辑内部进行此类操作。
您始终可以通过 函数组合 实现这一点。这种方法提供了多种优势:
-
它允许您将这一非功能性关注点隔离到一个独立的函数中,您可以将其与业务函数组合为一个函数定义。
-
它为您提供完全的自由(以及风险),让您可以在传入消息到达实际业务函数之前对其进行任意修改。
@Bean
public Function<Message<?>, Message<?>> enrich() {
return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}
@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
// do whatever
}
然后,通过提供以下函数定义来组合您的函数:enrich|myBusinessFunction。
虽然所描述的方法最为灵活,但也是最复杂的。它要求您编写一些代码,然后将其作为 Bean 进行注册,或在使用前手动将其注册为一个函数,正如前面示例所示,您才能将其与业务函数进行组合。
但如果你试图进行的修改(增强)如同前面示例中那样微不足道,又该如何处理呢?是否存在一种更简单、更灵活且可配置的机制来实现相同的效果?
自版本 3.1.3 起,该框架允许您为输入函数和输出函数的单个消息头提供 SpEL 表达式以进行丰富增强。让我们以其中一个测试为例进行说明。
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("split");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("hello")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("path", "foo/bar/baz")
.build());
assertThat(result.getHeaders()).containsKey("keyOut1"));
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
assertThat(result.getHeaders()).containsKey("keyOut2"));
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
}
}
在这里,您可以看到名为 input-header-mapping-expression 和 output-header-mapping-expression 的属性,它们前面是函数名称(即 split),接着是您希望设置的消息头键名以及对应的值(SpEL 表达式)。
您还可以观察到输入头映射中的一些有趣特性,其中我们实际上正在拆分现有头 'path' 的值,并根据索引将拆分后的元素值分别设置为 key1 和 key2 的值。
| 如果由于任何原因表达式求值失败,函数的执行将照常进行,仿佛什么都没发生过。</p><p>不过,您会在日志中看到一条警告消息,通知您该情况。 |
o.s.c.f.context.catalog.InputEnricher : Failed while evaluating expression "hello1" on incoming message. . .
在处理具有多个输入参数的函数(下一节)时,您可以在 input-header-mapping-expression 之后立即使用索引:
--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'
函数参数个数
有时,需要对数据流进行分类和组织。例如,考虑一个经典的大型数据应用场景:处理包含‘订单’和‘发票’等未组织数据,您希望将它们分别存入不同的数据存储中。这就是函数元数(具有多个输入和输出的函数)支持发挥作用的地方。
让我们看一个此类函数的示例:MessageRoutingCallback。
| 完整的实现细节可在 此处 查看。 |
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用其 Tuple 库。Tuples 通过向我们传达 基数 和 类型 信息,赋予我们一项独特优势。在 SCSt 的上下文中,这两者都极为重要:基数可帮助我们确定需要为函数的对应输入和输出创建并绑定多少个输入和输出绑定;而对类型信息的了解则可确保进行正确的类型转换。
此外,这也是命名约定中‘index’部分发挥作用的地方,因为在该函数中,两个输出绑定名称分别为 organise-out-0 和 organise-out-1。
目前,函数元数(function arity)仅对响应式函数(Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>>)提供支持,这类函数以复杂事件处理为中心,通常需要在多个事件汇聚的背景下进行评估与计算,而非单个事件。 |
输入头传播
在典型场景中,输入消息头通常不会传播到输出端,这是合理的,因为函数的输出可能作为其他组件的输入,而后者需要自己的一组消息头。</p><p>然而,在某些情况下,这种传播可能是必要的,因此 Spring Cloud Function 提供了多种机制来实现此功能。
首先,您始终可以手动复制标题。例如,如果您有一个函数,其签名是接受 Message 并返回 Message(即 Function<Message, Message>),您可以简单地自行选择性地复制标题。请记住,如果您的函数返回的是 Message 类型,框架将不会对它执行任何操作,除了正确转换其有效载荷。然而,这种做法可能在某些情况下显得有些繁琐,特别是当您只是想复制所有标题时。为协助处理此类情况,我们提供了一个简单的属性,允许您在需要将输入标题传播的函数上设置一个布尔标志。该属性是 copy-input-headers。
例如,假设您具有以下配置:
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
}
正如您所知,您仍可以通过向其发送一条消息来调用此函数(框架将负责类型转换和负载提取)
只需将spring.cloud.function.configuration.uppercase.copy-input-headers设置为true,以下断言也将成立
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");
类型转换(内容协商)
内容类型协商是 Spring Cloud Function 的核心功能之一,它不仅允许将传入的数据转换为函数签名所声明的类型,还能在函数组合过程中执行相同的转换,从而使原本因类型不匹配而无法组合(un-composable)的函数变得可组合。
为了更好地理解内容类型协商背后的机制和必要性,我们以一个非常简单的用例为例,使用以下函数作为示例:
@Bean
public Function<Person, String> personFunction {..}
前面示例中所示的函数期望以 Person 对象作为参数,并输出 String 类型。如果该函数以 Person 类型调用,则一切正常。
Spring Cloud Function 依赖于 Spring 的两种原生机制来实现该功能。
-
MessageConverter - 用于将传入的 Message 数据转换为函数声明的类型。
-
ConversionService - 将传入的非 Message 数据转换为函数所声明的类型。
这意味着,根据原始数据的类型(Message 或非 Message),Spring Cloud Function 将应用其中一种或另一种机制。
在大多数情况下,当处理作为其他请求(例如 HTTP、消息传递等)一部分而调用的函数时,框架依赖于 MessageConverters,因为此类请求已转换为 Spring Message。换句话说,框架会定位并应用适当的 MessageConverter。为了实现这一点,框架需要用户提供的某些指令。其中一条指令已由函数本身的签名提供(Person 类型)。因此,理论上,这应该足够了(在某些情况下确实如此)。然而,对于大多数使用场景,为了选择适当的 MessageConverter,框架还需要额外的信息。缺失的这一部分是 contentType 头部。
此类标题通常作为消息的一部分出现,由创建该消息的相应适配器注入。例如,HTTP POST 请求会将其 content-type HTTP 头部复制到消息的 contentType 头部中。
当此类头信息不存在时,框架将依赖默认的内容类型,即 application/json。
内容类型与参数类型
如前所述,为了使框架选择适当的 MessageConverter,它需要参数类型以及可选的内容类型信息。选择适当 MessageConverter 的逻辑由参数解析器负责,这些解析器在调用用户定义函数之前触发(此时框架已确切知道实际参数类型)。如果参数类型与当前有效载荷的类型不匹配,框架会将处理权委托给预先配置好的 MessageConverters 解析器栈,以检查其中是否有任何一个能够将有效载荷转换为所需类型。
通过 contentType 和参数类型组合的方式,框架可确定是否能将消息转换为目标类型,即通过查找合适的 MessageConverter 来实现。如果未找到适当的 MessageConverter,则会抛出异常,您可以通过添加自定义的 MessageConverter 来处理该异常(详见 User-defined Message Converters)。
不要期望仅根据 Message 就将 contentType 转换为其他类型。请记住, contentType 是目标类型的补充。它是一种提示,MessageConverter 可能会考虑,也可能不会考虑。 |
消息转换器
MessageConverters 定义两个方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
理解这些方法的契约及其使用方式非常重要,尤其是在 Spring Cloud Stream 的上下文中。
方法 fromMessage 将传入的 Message 转换为参数类型。Message 的负载可以是任意类型,具体实现类 MessageConverter 负责支持多种类型。
提供的 MessageConverters
如前所述,该框架已提供了一套 MessageConverters,用于处理大多数常见用例。以下列表按优先级顺序描述了所提供的 MessageConverters(第一个适用的 MessageConverter 将被使用):
-
JsonMessageConverter: 支持将Message的负载在 POJO 与消息体之间进行转换,适用于contentType为application/json的情况(默认使用 Jackson 或 Gson 库)。该消息转换器还能够识别type参数(例如:application/json;type=foo.bar.Person)。这在某些情况下非常有用:当函数开发时类型尚不明确,因此函数签名可能看起来像Function<?, ?>、Function或Function<Object, Object>。换句话说,对于类型转换,我们通常依据函数签名来推导类型;而通过 MIME 类型参数,可以以更动态的方式传递类型信息。 -
ByteArrayMessageConverter: 支持将Message的负载从byte[]转换为byte[],适用于contentType为application/octet-stream的情况。它本质上是直接传递(pass through),主要目的是为了向后兼容。 -
StringMessageConverter: 支持将任何类型转换为String,当contentType为text/plain时。
当未找到合适的转换器时,框架会抛出异常。发生这种情况时,您应检查您的代码和配置,确保您没有遗漏任何内容(即,确保您通过绑定或头信息正确提供了 contentType)。
然而,更有可能的是,您遇到了一些不常见的场景(例如自定义的 contentType),而当前提供的 MessageConverters 转换器堆栈尚不知道如何进行转换。
如果是这种情况,您可以添加自定义的 MessageConverter。参见 用户自定义消息转换器。
用户自定义消息转换器
Spring Cloud Function 提供了一种机制,用于定义和注册额外的 MessageConverters。要使用它,请实现 org.springframework.messaging.converter.MessageConverter,并将其配置为 @Bean。随后,它将被添加到现有的 `MessageConverter` 栈中。
重要的是要理解,自定义 MessageConverter 实现会被添加到现有栈的顶部。因此,自定义 MessageConverter 实现会优先于现有的实现,这使您能够覆盖并扩展现有的转换器。 |
以下示例展示了如何创建一个消息转换器 bean,以支持一种名为 application/bar 的新内容类型:
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
JSON 选项说明
在 Spring Cloud Function 中,我们支持 Jackson 和 Gson 机制来处理 JSON。
为了您的便利,我们已将其抽象为 org.springframework.cloud.function.json.JsonMapper,该抽象本身了解两种机制,并将根据您选择的或遵循默认规则的方式使用其中一种。
默认规则如下:
-
无论类路径上存在哪个库,都将使用该库作为机制。因此,如果您将
com.fasterxml.jackson.*添加到类路径中,将使用 Jackson;如果您将com.google.code.gson添加到类路径中,则将使用 Gson。 -
如果您同时拥有两者,则 Gson 将作为默认选项,或者您可以设置
spring.cloud.function.preferred-json-mapper属性,并使用以下两个值中的任意一个:gson或jackson。
话说,类型转换通常对开发者是透明的。然而,由于 org.springframework.cloud.function.json.JsonMapper 也被注册为一个 Bean,因此如果需要,您很容易就能将其注入到您的代码中。
Kotlin Lambda 支持
我们还为 Kotlin lambda 表达式提供支持(自 v2.0 起)。<br/>考虑以下示例:
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
}
@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
}
上面展示了将 Kotlin lambda 表达式配置为 Spring Bean 的示例。每个表达式的签名都对应于一个 Java 等价形式的 Supplier、Function 和 Consumer,因此框架支持/识别的签名与此一致。虽然 Kotlin 到 Java 的映射机制超出了本文档的范围,但重要的是要理解:在“Java 8 函数支持”一节中所阐述的签名转换规则同样适用于此处。
要启用 Kotlin 支持,您只需将 Kotlin SDK 库添加到类路径中,这将触发相应的自动配置和相关支持类。
函数组件扫描
Spring Cloud Function 将扫描包名为 functions 中的 Function、Consumer 和 Supplier 的实现类(如果该包存在的话)。
利用此功能,您可以编写不依赖 Spring 框架的函数——甚至不需要 @Component 注解。
如果您希望使用其他包,可设置 spring.cloud.function.scan.packages。
您还可以使用 spring.cloud.function.scan.enabled=false 完全关闭扫描功能。
数据掩码
典型的应用程序包含多个级别的日志记录。某些云/服务器端平台可能在日志包中包含敏感数据,而这些日志包对所有人可见。虽然检查被记录数据的责任在于各个开发人员,但由于日志来自框架本身,从 4.1 版本开始,我们已引入 JsonMasker 来初步帮助屏蔽 AWS Lambda 请求负载中的敏感数据。然而,JsonMasker 是通用的,并适用于任何模块。目前它仅支持结构化数据(如 JSON)。您只需指定需要屏蔽的键名,其余工作将由系统自动完成。键名应列在文件 META-INF/mask.keys 中。该文件的格式非常简单,您可用逗号、换行符或两者结合来分隔多个键。
以下是此类文件内容的示例:
eventSourceARN
asdf1, SS
在这里,您可以看到三个键已定义。
一旦此类文件存在,JsonMasker 将使用它来屏蔽所指定键的值。
并且,以下是展示用法的示例代码:
private final static JsonMasker masker = JsonMasker.INSTANCE();
// . . .
logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));