|
请使用 spring-cloud-function 5.0.1 获取最新稳定版本! |
编程模型
功能目录与灵活的功能签名
Spring Cloud Function 的主要特性之一是能够适应并支持多种用户自定义函数的类型签名,同时提供一致的执行模型。这就是为什么所有用户自定义函数都会被 FunctionCatalog 转换为一种标准表示形式。
虽然用户通常不需要关心 FunctionCatalog,但了解在用户代码中支持哪些类型的函数是有用的。
同样重要的是要理解,Spring Cloud Function 为 Project Reactor 提供的响应式 API 提供了第一类支持,允许将响应式原语(如 Mono 和 Flux)用作用户自定义函数中的类型,从而在选择函数实现的编程模型时提供更大的灵活性。响应式编程模型还使得能够以功能性方式支持那些使用命令式编程风格难以甚至无法实现的功能。有关更多内容,请阅读 函数元数 部分。
Java 8 函数式支持
Spring Cloud Function 接受并建立在 Java 8 起就已提供的三个核心函数式接口之上。
-
提供商<O>
-
Function<I, O>
-
Consumer<I>
为避免不断提及 Supplier、Function 和 Consumer,在本手册其余部分中,我们将适当使用“函数式 Bean”来指代它们。
在一句话中,您的应用程序上下文中的任何函数式 bean 都将延迟注册为0 。这意味着它可以受益于本参考手册中描述的所有其他功能。
在最简单的应用程序中,您需要在应用程序配置中声明@Bean类型的Supplier、Function或Consumer。然后您可以访问FunctionCatalog并根据其名称查找特定函数。
例如:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
. . .
FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase = catalog.lookup(“uppercase”);
重要的是要理解,考虑到uppercase是一个bean,你可以直接从ApplicationContext获取它,但你所得到的只是你声明的bean本身,没有scf提供的任何额外功能。当你通过FunctionCatalog查找函数时,你将获得一个被包装(增强)的实例,该实例具有额外的功能(例如。e., 类型转换,组合等。)本手册中所述。此外,重要的是要了解典型的用户不会直接使用 Spring Cloud Function。相反,典型用户使用 Java Function/Supplier/Consumer 的想法是在不同的执行上下文中使用它而无需额外的工作。例如,相同的 Java 函数可以通过 Spring Cloud Function 提供的适配器表示为 REST 端点、流式消息处理程序 或 AWS Lambda 等更多形式,也可以通过使用 Spring Cloud Function 作为核心编程模型的其他框架实现。g., Spring Cloud Stream).总之,Spring Cloud Function通过附加功能对Java函数进行增强,以便在各种执行环境中使用。
函数定义
虽然上一个示例向您展示了如何通过编程方式在FunctionCatalog中查找函数,但在典型的集成情况下,当Spring Cloud Function作为另一个框架(例如Spring Cloud Stream)的编程模型使用时,您可以通过spring.cloud.function.definition属性声明要使用的函数。了解在FunctionCatalog中发现函数时的一些默认行为非常重要。例如,如果您只有一个Functional bean在您的ApplicationContext中,则通常不需要spring.cloud.function.definition属性,因为单个函数可以在FunctionCatalog中通过空名称或任何名称进行查找。例如,假设uppercase是您的目录中的唯一函数,它可以被查找为catalog.lookup(null)、catalog.lookup(“”)、catalog.lookup(“foo”)。不过,在使用如Spring Cloud Stream这样的框架的情况下,最好且建议始终使用spring.cloud.function.definition属性。
例如,
spring.cloud.function.definition=uppercase
筛选不合格的功能
典型的 Application Context 可能包括有效 java 函数的 bean,但这些函数并非旨在注册到 FunctionCatalog。 这些 bean 可以是来自其他项目的自动配置或任何符合 Java 函数资格的其他 bean。
框架提供了默认过滤器,用于排除已知不应注册到功能目录的 bean。
您还可以通过使用spring.cloud.function.ineligible-definitions属性提供逗号分隔的 bean 定义名称列表来添加额外的 bean。
例如,
spring.cloud.function.ineligible-definitions=foo,bar
提供商
提供商可以是 响应式 - Supplier<Flux<T>> 或 命令式 - Supplier<T>。从调用的角度来看,这应该对这种提供商的实现者没有影响。然而,在框架(例如 Spring Cloud Stream)中使用时,特别是响应式的提供商通常用于表示流的源,因此它们被调用一次以获取流(如Flux),消费者可以订阅该流。换句话说,这些提供商代表了相当于一个 无限流 的东西。但是,相同的响应式提供商也可以表示 有限流(例如轮询JDBC数据的结果集)。在这些情况下,这样的响应式提供商必须连接到底层框架的一些轮询机制。
为了实现这一点,Spring Cloud Function 提供了一个标记注解org.springframework.cloud.function.context.PollableBean来表明该提供商生成的是有限流,并且可能需要再次轮询。需要注意的是,Spring Cloud Function 本身并不为此注解提供任何行为。
除了PollableBean注解外,还公开了一个
这是一个例子:
@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
return () -> {
String v1 = String.valueOf(System.nanoTime());
String v2 = String.valueOf(System.nanoTime());
String v3 = String.valueOf(System.nanoTime());
return Flux.just(v1, v2, v3);
};
}
函数组合
函数组合是允许将多个函数组合成一个的功能。 核心支持基于 Java 8 提供的 Function.andThen(..) 函数组合功能。然而,在此基础上,我们还提供了一些额外的功能。
声明式函数组合
此功能允许您使用|(管道)或,(逗号)分隔符以声明式方式提供组成指令,当提供spring.cloud.function.definition属性时。
例如
--spring.cloud.function.definition=uppercase|reverse
在这里,我们实际上提供了一个单一函数的定义,该函数本身是函数uppercase和函数reverse的组合。事实上,这就是属性名称为definition而不是name的原因之一,因为一个函数的定义可以是几个命名函数的组合。正如前面提到的,你可以使用,代替管道(如…definition=uppercase,reverse)。
函数路由与过滤
从版本 2.2 开始,Spring Cloud Function 提供了路由功能,允许您调用一个作为路由器的单一函数来实际调用您希望调用的目标函数。此功能在某些 FAAS 环境中非常有用,在这些环境中维护多个函数的配置可能很繁琐或无法暴露超过一个函数。
在RoutingFunction注册到FunctionCatalog下,名称为functionRouter。为了简单起见和一致性,您也可以引用RoutingFunction.FUNCTION_NAME常量。
此函数具有以下签名:
public class RoutingFunction implements Function<Object, Object> {
. . .
}
路由指令可以通过多种方式传达。我们支持通过消息头、系统属性以及可插拔策略提供指令。因此,让我们来看一些细节。
MessageRoutingCallback
代码 MessageRoutingCallback 是一种策略,用于辅助确定路由目标函数定义的名称。
public interface MessageRoutingCallback {
default String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
}
您所需要做的就是实现并将其注册为一个bean,以便被RoutingFunction拾取。例如:
@Bean
public MessageRoutingCallback customRouter() {
return new MessageRoutingCallback() {
@Override
public String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
};
}
在前面的例子中,您可以看到MessageRoutingCallback的一个非常简单的实现,它从传入消息的FunctionProperties.FUNCTION_DEFINITION消息头确定函数定义,并返回表示要调用的函数定义的String实例。
消息头
如果输入参数是Message<?>类型,您可以通过设置spring.cloud.function.definition或spring.cloud.function.routing-expression消息头之一来传递路由指令。正如属性名称所暗示的,spring.cloud.function.routing-expression依赖于Spring表达式语言(SpEL)。对于更静态的情况,您可以使用spring.cloud.function.definition头部,允许您提供单个函数(例如…definition=foo)或组合指令(例如…definition=foo|bar|baz)。对于更具动态性的场景,可以使用spring.cloud.function.routing-expression标头并提供一个SpEL表达式,该表达式应解析为上述定义的函数。
SpEL 求值上下文的根对象是实际输入参数,因此在 Message<?> 的情况下,您可以构造一个表达式,该表达式可以访问 payload 和 headers(例如 spring.cloud.function.routing-expression=headers.function_name)。 |
SpEL 允许用户提供要执行的 Java 代码的字符串表示形式。考虑到spring.cloud.function.routing-expression可以通过消息头提供,这意味着可以向最终用户暴露设置此类表达式的功能(即。e., 使用 web 模块时的 HTTP 标头)可能导致一些问题(例如。g.,恶意代码)。为了管理这一点,所有通过消息头传入的表达式只会针对SimpleEvaluationContext进行评估,其功能有限,并且仅设计用于评估上下文对象(在我们的情况下是Message)。另一方面,通过属性或系统变量设置的所有表达式都针对StandardEvaluationContext进行评估,这允许Java语言的完全灵活性。通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为在正常情况下这些信息不会暴露给最终用户。然而,在某些情况下,通过Spring Boot Actuator端点(由一些Spring项目、第三方提供或用户自定义实现)确实会向最终用户提供查看以及更新系统、应用程序和环境变量的能力。此类端点必须使用行业标准的网络安全实践进行保护。Spring Cloud Function 不暴露任何此类端点。
|
在特定的执行环境/模型中,适配器负责通过消息头来转换和通信
spring.cloud.function.definition 和 / 或 spring.cloud.function.routing-expression。
例如,在使用 spring-cloud-function-web 时,您可以提供 spring.cloud.function.definition 作为 HTTP
头部,框架也会将其传播以及其他 HTTP 头部作为消息头部。
应用属性
路由指令也可以通过spring.cloud.function.definition或spring.cloud.function.routing-expression作为应用程序属性来传达。上一节中描述的规则也适用于此处。唯一的区别是,您提供这些指示作为应用程序属性(例如--spring.cloud.function.definition=foo)。
重要的是要明白,仅当使用命令式函数(例如Function<Foo, Bar>)时,提供spring.cloud.function.definition或spring.cloud.function.routing-expression作为消息头才会起作用。也就是说我们只能基于每条消息对命令式函数进行路由。对于响应式函数来说,我们不能按每条消息进行路由。因此你只能通过应用程序属性来提供你的路由指令。这全部与工作单元有关。在命令式函数中,工作单元是消息,所以我们能够根据这样的工作单元来进行路由。而在响应式函数中,工作单元是整个流,因此我们将只根据应用程序提供的指令操作,并对整个流进行路由。 |
路由指令的优先级顺序
鉴于我们有多种提供路由指令的机制,了解在同时使用多种机制时冲突解决的优先级非常重要,因此顺序如下:
-
MessageRoutingCallback(如果函数是命令式的,将接管无论其他定义什么) -
消息头(如果函数为强制性且未提供
MessageRoutingCallback) -
应用属性(任意功能)
无法路由的消息
如果目录中没有可用的事件路由函数,您将收到一个异常信息。
在某些情况下,这种行为是不希望出现的,您可能需要有一个“兜底”类型的函数来处理这些消息。
为了实现这一点,框架提供了org.springframework.cloud.function.context.DefaultMessageRoutingHandler策略。您所需要做的就是将其注册为一个bean。
它的默认实现将简单地记录下该消息无法路由的事实,但允许消息流继续进行而不抛出异常,从而有效地丢弃无法路由的消息。
如果您想要更复杂的操作,只需要提供自己的此策略的实现并将其注册为一个bean即可。
@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
return new DefaultMessageRoutingHandler() {
@Override
public void accept(Message<?> message) {
// do something really cool
}
};
}
函数过滤
过滤是一种路由类型,它只有两种路径:'继续'或'丢弃'。从函数的角度来说,这意味着你只希望在某些条件返回'true'时调用特定的函数,否则要丢弃输入。<br/>然而,在涉及到丢弃输入时,根据你的应用程序的上下文,可能有多种解释。<br/>例如,你可能想要记录日志,或者维护一个被丢弃消息的计数器。当然,你也可能什么都不想做。<br/>由于这些不同的处理方式,我们没有提供一般性的配置选项来处理被丢弃的消息。<br/>相反,我们建议定义一个简单的Consumer来表示'丢弃'路径:
@Bean
public Consumer<?> devNull() {
// log, count or whatever
}
现在,您可以拥有路由表达式,该表达式实际上只有两个路径,从而成为过滤器。例如:
--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'
每个不符合发送到 'echo' 函数条件的消息都会被发送到 'devNull',你可以在那里什么都不用做。
签名 Consumer<?> 还能确保不会尝试任何类型转换,从而几乎没有任何执行开销。
| 在处理响应式输入(例如 Publisher)时,路由指令必须仅通过 Function 属性提供。这是因为响应式函数的性质决定了它们只会被调用一次来传递一个 Publisher,并且其余操作由 reactor 处理,因此我们无法访问和/或依赖于通过单个值(例如 Message)传达的路由指令。 |
多个路由器
默认情况下,框架将始终配置一个单一的路由函数,如前面部分所述。但是,在某些时候,您可能需要多个路由函数。
在这种情况下,您可以创建自己RoutingFunctionbean的实例,除了现有的实例之外,只要给它取其他名字而不是functionRouter即可。
您可以将 spring.cloud.function.routing-expression 或 spring.cloud.function.definition 作为键/值对传递给 RoutinFunction 的映射中。
这是一个简单的例子
@Configuration
protected static class MultipleRouterConfiguration {
@Bean
RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
}
并提供一个测试,演示如何使用它
`
@Test
public void testMultipleRouters() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("HELLO");
function = functionCatalog.lookup("mySpecialRouter");
assertThat(function).isNotNull();
message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
[[input/output-enrichment]]<br/>== 输入/输出增强<br/>
在许多情况下,您需要修改或优化传入或传出的消息,并保持代码中没有非功能性的问题。您不希望在里面执行业务逻辑。
您可以通过函数组合来实现这一点。这种方法提供了几个好处:
-
它允许你将这种非功能性需求隔离到一个单独的功能中,你可以将其与业务功能组合为函数定义。
-
它为您提供完全的自由(以及危险),以便您可以在传入消息到达实际业务功能之前对其进行修改。
@Bean
public Function<Message<?>, Message<?>> enrich() {
return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}
@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
// do whatever
}
然后通过提供以下函数定义来组合你的函数enrich|myBusinessFunction。
虽然所述方法是最灵活的,但也是最复杂的,因为它要求您编写一些代码,将其作为bean或在可以将它与业务函数组合之前手动注册为一个函数,如前面的示例所示。
但如果要进行的修改(增强)像前面的例子那样是微不足道的呢?有没有更简单、更动态和可配置的方法来完成同样的事情?
从版本3.1.3开始,该框架允许您提供SpEL表达式来丰富进入函数的输入和输出单个消息头。让我们看看其中一个测试作为例子。
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("split");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("path", "foo/bar/baz").build());
assertThat(result.getHeaders().containsKey("keyOut1")).isTrue();
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
assertThat(result.getHeaders().containsKey("keyOut2")).isTrue();
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
}
}
此处您会看到一个属性,名为input-header-mapping-expression和output-header-mapping-expression,前面是函数的名称(即split),后面是想要设置的消息头键名以及值作为SpEL表达式。第一个表达式(用于‘keyOut1’)是用单引号括起来的文字SpEL表达式,有效地将'keyOut1' 设置为值hello1。keyOut2被设置为现有'contentType'头部的值。
您还可以观察到输入标头映射中的一些有趣功能,我们实际上正在拆分现有标头 'path' 的值,并根据索引将 key1 和 key2 的各个值设置为拆分元素的值。
| 如果由于任何原因提供的表达式求值失败,函数的执行将像什么都没发生过一样继续。</p><p>但是您会在日志中看到一条警告消息,通知您发生了这种情况。 |
o.s.c.f.context.catalog.InputEnricher : Failed while evaluating expression "hello1" on incoming message. . .
如果您正在处理具有多个输入的函数(下一节),可以在input-header-mapping-expression之后立即使用索引
--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'
函数参数个数
有时需要对数据流进行分类和整理。例如,考虑一个经典的大型数据用例,即处理包含“订单”和“发票”的无序数据,并希望将它们分别存储到不同的数据存储中。这时就需要使用具有多个输入和输出的功能参数(函数)支持。
让我们来看一个这样的函数的例子(完整的实现细节可以在这里找到),
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
由于Project Reactor是SCF的核心依赖项,因此我们使用其Tuple库。元组通过向我们提供基数和类型信息而赋予了我们独特的优势。这两者在SCSt的上下文中都非常重要。基数使我们知道需要创建和绑定多少个输入和输出绑定到函数的相应输入和输出上。了解类型信息可以确保正确地进行类型转换。
此外,这也是绑定名称命名约定中的“index”部分发挥作用的地方,因为在该函数中,两个输出绑定名称为organise-out-0和organise-out-1。
重要提示:目前,函数数量仅对响应式函数(Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>>)支持,这些函数围绕复杂事件处理中心展开。在事件汇聚时进行评估和计算通常需要查看一系列事件,而不仅仅是单个事件。 |
输入头传播
在典型场景中,输入消息头通常不会传播到输出端,这是合理的,因为函数的输出可能作为其他组件的输入,而后者需要自己的一组消息头。</p><p>然而,在某些情况下,这种传播可能是必要的,因此 Spring Cloud Function 提供了多种机制来实现此功能。
首先,您可以手动复制标头。例如,如果您有一个函数签名需要 Message 并返回 Message(即 Function<Message, Message>),则可以自行选择性地复制标头。请记住,如果您的函数返回的是Message,则框架除了正确转换其有效负载外不会对其做任何操作。
然而,在某些情况下您只想复制所有标头时,这种做法可能会显得有点繁琐。
为了帮助处理这种情况,我们提供了一个简单的属性,该属性允许在您希望输入标头被传播的函数上设置一个布尔标志。
该属性是 copy-input-headers。
例如,假设您具有以下配置:
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
}
正如您所知,您仍可以通过向其发送一条消息来调用此函数(框架将负责类型转换和负载提取)
只需将spring.cloud.function.configuration.uppercase.copy-input-headers设置为true,以下断言也将成立
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");
类型转换(内容协商)
内容类型协商是 Spring Cloud Function 的核心功能之一,它不仅允许将传入的数据转换为函数签名所声明的类型,还能在函数组合过程中执行相同的转换,从而使原本因类型不匹配而无法组合(un-composable)的函数变得可组合。
为了更好地理解内容类型协商背后的机制和必要性,我们以一个非常简单的用例为例,使用以下函数作为示例:
@Bean
public Function<Person, String> personFunction {..}
前面示例中显示的函数期望以Person对象作为参数,并生成String类型作为输出。如果使用类型Person调用此函数,则一切正常。但通常情况下,该函数充当处理传入数据的处理器,这些数据大多以原始格式(如byte[]、JSON String等)出现。为了使框架能够成功地将传入的数据作为参数传递给此函数,它必须设法将传入的数据转换为Person类型。
Spring Cloud Function 依赖于 Spring 的两种原生机制来实现该功能。
-
MessageConverter - 用于将传入的 Message 数据转换为函数声明的类型。
-
ConversionService - 将传入的非 Message 数据转换为函数所声明的类型。
这意味着,根据原始数据的类型(Message 或非 Message),Spring Cloud Function 将应用其中一种或另一种机制。
在处理作为其他请求(例如HTTP、Messaging等)一部分调用的函数时,框架依赖于MessageConverters。因为此类请求已经转换为Spring Message。
换句话说,框架定位并应用适当的MessageConverter。为了实现这一点,框架需要用户的一些指令。其中一条指令已由该函数本身的签名(Person类型)提供。因此,在理论上,这应该是(并且在某些情况下是)足够的。
然而,在大多数使用场景中,为了选择合适的MessageConverter,框架需要额外的信息。缺失的部分是contentType标头。
此类标题通常作为消息的一部分出现,由创建该消息的相应适配器注入。例如,HTTP POST 请求会将其 content-type HTTP 头部复制到消息的 contentType 头部中。
当此类头信息不存在时,框架将依赖默认的内容类型,即 application/json。
内容类型与参数类型
如前所述,为了使框架选择适当的MessageConverter,它需要参数类型和可选的内容类型信息。选择适当MessageConverter的逻辑由参数解析器负责,在调用用户定义函数之前触发(此时框架知道实际参数类型)。如果参数类型与当前有效载荷的类型不匹配,则框架会委托给预配置的MessageConverters堆栈,查看其中任何一个是否可以转换有效载荷。
组合contentType和参数类型是框架确定是否可以通过定位适当的MessageConverter将消息转换为目标类型的机制。
如果没有找到合适的MessageConverter,则会抛出异常,并且您可以通过添加自定义的MessageConverter来处理此异常(参见User-defined Message Converters)。
不要期望仅根据 Message 就将 contentType 转换为其他类型。请记住, contentType 是目标类型的补充。它是一种提示,MessageConverter 可能会考虑,也可能不会考虑。 |
消息转换器
MessageConverters 定义两个方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
理解这些方法的契约及其使用方式非常重要,尤其是在 Spring Cloud Stream 的上下文中。
该 fromMessage 方法将传入的 Message 转换为参数类型。
该 Message 的有效载荷可以是任何类型,而实际实现中的 MessageConverter 支持多种类型的职责在于其自身。
提供的 MessageConverters
如前所述,该框架已提供了一套 MessageConverters,用于处理大多数常见用例。以下列表按优先级顺序描述了所提供的 MessageConverters(第一个适用的 MessageConverter 将被使用):
-
JsonMessageConverter: 支持将Message的负载在 POJO 与消息体之间进行转换,适用于contentType为application/json的情况(默认使用 Jackson 或 Gson 库)。该消息转换器还能够识别type参数(例如:application/json;type=foo.bar.Person)。这在某些情况下非常有用:当函数开发时类型尚不明确,因此函数签名可能看起来像Function<?, ?>、Function或Function<Object, Object>。换句话说,对于类型转换,我们通常依据函数签名来推导类型;而通过 MIME 类型参数,可以以更动态的方式传递类型信息。 -
ByteArrayMessageConverter: 支持将Message的负载从byte[]转换为byte[],适用于contentType为application/octet-stream的情况。它本质上是直接传递(pass through),主要目的是为了向后兼容。 -
StringMessageConverter: 支持将任何类型转换为String,当contentType为text/plain时。
当找不到合适的转换器时,框架会抛出异常。发生这种情况时,您应该检查代码和配置,并确保没有遗漏任何内容(即通过绑定或标头提供contentType)。但是,很可能您遇到了一些不常见的案例(例如自定义的contentType),当前提供的MessageConverters堆栈不知道如何进行转换。如果是这种情况,您可以添加自定义的MessageConverter。参见用户定义的消息转换器。
用户自定义消息转换器
Spring Cloud Function 提供了一种机制,用于定义和注册额外的 MessageConverters。要使用它,请实现 org.springframework.messaging.converter.MessageConverter,并将其配置为 @Bean。随后,它将被添加到现有的 `MessageConverter` 栈中。
重要的是要理解,自定义 MessageConverter 实现会被添加到现有栈的顶部。因此,自定义 MessageConverter 实现会优先于现有的实现,这使您能够覆盖并扩展现有的转换器。 |
以下示例展示了如何创建一个消息转换器 bean,以支持一种名为 application/bar 的新内容类型:
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
JSON 选项说明
在 Spring Cloud Function 中,我们支持使用 Jackson 和 Gson 机制来处理 JSON。
为了方便您使用,我们将这些机制抽象到了 org.springframework.cloud.function.json.JsonMapper,它本身知道两种机制,并将根据您的选择或默认规则使用其中一种。
默认规则如下:
-
无论类路径上存在哪个库,都将使用该库作为机制。因此,如果您将
com.fasterxml.jackson.*添加到类路径中,将使用 Jackson;如果您将com.google.code.gson添加到类路径中,则将使用 Gson。 -
如果您同时拥有两者,则 Gson 将作为默认选项,或者您可以设置
spring.cloud.function.preferred-json-mapper属性,并使用以下两个值中的任意一个:gson或jackson。
尽管如此,类型转换通常对开发者是透明的,但是由于org.springframework.cloud.function.json.JsonMapper也被注册为一个bean,因此如果需要的话可以很容易地将其注入到您的代码中。
Kotlin Lambda 支持
我们还为 Kotlin lambda 表达式提供支持(自 v2.0 起)。<br/>考虑以下示例:
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
}
@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
}
上述内容表示作为Spring Bean配置的Kotlin lambda。每个函数签名都映射到等效的Java Supplier、Function 和 Consumer,因此框架支持/识别这些签名。
虽然Kotlin到Java的映射机制超出了本文档的范围,但重要的是要了解此处应用了“Java 8功能支持”部分中概述的相同签名转换规则。
要启用 Kotlin 支持,您所需要做的就是将 Kotlin SDK 库添加到类路径上,这将会触发适当的自动配置和支持类。
函数组件扫描
如果存在包 functions,Spring Cloud Function 将扫描其中的 Function、Consumer 和 Supplier 实现。使用此功能,您可以编写不依赖 Spring 的函数——甚至不需要 @Component 注解。如果您想使用不同的包,请设置 spring.cloud.function.scan.packages。您也可以使用 spring.cloud.function.scan.enabled=false 完全关闭扫描。
数据掩码
一个典型的应用程序会包含多个级别的日志。某些云/无服务器平台可能会在被记录的数据包中包含敏感数据,任何人都可以查看。
虽然检查正在记录的数据是单个开发人员的责任,但日志来自框架本身,因此从版本4.1开始我们引入了JsonMasker来帮助屏蔽AWS Lambda负载中的敏感数据。但是JsonMasker是通用的,并且可用于任何模块。目前它只会对如JSON这样的结构化数据起作用。您只需要指定要屏蔽的键,其余工作将由其自动处理。
应在文件META-INF/mask.keys中指定这些密钥。该文件格式非常简单,您可以使用逗号或换行符(或者两者)分隔多个键。
以下是此类文件内容的示例:
eventSourceARN asdf1, SS
在这里,您会看到定义了三个键。一旦这样的文件存在,JsonMasker 将使用它来屏蔽指定键的值。
这里是一个展示用法的示例代码
private final static JsonMasker masker = JsonMasker.INSTANCE();
. . .
logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));