此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 spring-cloud-function 4.3.0! |
编程模型
函数目录和灵活的功能签名
Spring Cloud Function 的主要功能之一是为用户定义的函数适配和支持一系列类型签名,
同时提供一致的执行模型。
这就是为什么所有用户定义的函数都被转换为规范表示FunctionCatalog
.
虽然用户通常不必关心FunctionCatalog
根本,知道什么是有用的
用户代码中支持类型的函数。
了解 Spring Cloud Function 为响应式 API 提供了一流的支持也很重要
由 Project Reactor 提供,允许响应式原语,例如Mono
和Flux
用作用户定义函数中的类型,在选择编程模型时提供更大的灵活性
你的函数实现。
响应式编程模型还支持对原本难以实现甚至不可能实现的功能
使用命令式编程风格。有关这方面的更多信息,请阅读 函数 Arity 部分。
Java 8 函数支持
Spring Cloud Function 拥抱并构建在 Java 定义的 3 个核心功能接口之上 从 Java 8 开始可供我们使用。
-
提供商<O>
-
功能<I,O>
-
消费者<我>
避免不断提及Supplier
,Function
和Consumer
在适当的情况下,我们将在本手册的其余部分将其称为功能 Bean。
简而言之,应用程序上下文中任何功能 bean 都将延迟注册FunctionCatalog
.
这意味着它可以从本参考手册中描述的所有附加功能中受益。
在最简单的应用程序中,您需要做的就是声明@Bean
类型Supplier
,Function
或Consumer
在您的应用程序配置中。
然后你可以访问FunctionCatalog
并根据其名称查找特定函数。
例如:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
. . .
FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase = catalog.lookup(“uppercase”);
重要的是要了解这一点uppercase
是 bean,你当然可以从ApplicationContext
直接,但你得到的只是你声明的 bean,没有 SCF 提供的任何额外功能。当您通过FunctionCatalog
,您将收到的实例被包装(检测)使用本手册中描述的附加功能(即类型转换、组合等)。此外,重要的是要了解典型用户不会直接使用 Spring Cloud Function。相反,典型的用户实现 JavaFunction/Supplier/Consumer
在不同的执行环境中使用它的想法,而无需额外的工作。例如,通过提供的 Spring Cloud Function,可以将相同的 java 函数表示为 REST 端点或流消息处理程序或 AWS Lambda 等
适配器以及使用 Spring Cloud Function 作为核心编程模型的其他框架(例如 Spring Cloud Stream)。
因此,总而言之,Spring Cloud Function 使用附加功能来检测 java 函数,以便在各种执行上下文中使用。
功能定义
虽然前面的示例向您展示了如何以编程方式在 FunctionCatalog 中查找函数,但在另一个框架(例如 Spring Cloud Stream)将 Spring Cloud Function 用作编程模型的典型集成案例中,您可以通过spring.cloud.function.definition
财产。知道在发现函数时了解一些默认行为很重要FunctionCatalog
.例如,如果ApplicationContext
这spring.cloud.function.definition
属性通常不需要,因为FunctionCatalog
可以通过空名称或任何名称查找。例如,假设uppercase
是目录中唯一的函数,可以将其查找为catalog.lookup(null)
,catalog.lookup(“”)
,catalog.lookup(“foo”)
.
也就是说,对于您使用框架(例如 Spring Cloud Stream)的情况,它使用spring.cloud.function.definition
这是最佳实践,建议始终使用spring.cloud.function.definition
财产。
例如
spring.cloud.function.definition=uppercase
过滤不合格的函数
典型的应用程序上下文可能包括作为有效 java 函数的 bean,但不打算成为要注册的候选FunctionCatalog
.
此类 bean 可以是来自其他项目的自动配置,也可以是符合 Java 函数条件的任何其他 bean。
该框架提供了已知 bean 的默认过滤,这些 bean 不应成为向函数目录注册的候选对象。
您还可以通过提供 bean 定义名称的 coma 分隔列表来将其他 bean 添加到此列表中spring.cloud.function.ineligible-definitions
财产。
例如
spring.cloud.function.ineligible-definitions=foo,bar
提供商
提供商可以被动 - Supplier<Flux<T>>
或命令式 - Supplier<T>
.从调用的角度来看,这应该没有区别
给该提供商的实施者。但是,当在框架内使用时
(例如,Spring Cloud Stream)、提供商,尤其是被动的、
通常用于表示流的源,因此调用它们一次即可获取流(例如,Flux)
消费者可以订阅。换句话说,这样的提供商相当于无限流。
然而,相同的响应式提供商也可以表示有限流(例如,轮询的 JDBC 数据上的结果集)。
在这些情况下,此类反应性提供商必须连接到底层框架的某种轮询机制。
为了帮助实现这一点,Spring Cloud 函数提供了一个标记注释org.springframework.cloud.function.context.PollableBean
以表示该提供商生产
有限流,可能需要再次轮询。也就是说,重要的是要了解 Spring Cloud 函数本身
此注解没有提供任何行为。
另外PollableBean
注释将可拆分属性公开给产生流的信号
需要拆分(参见拆分器 EIP)
下面是示例:
@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
return () -> {
String v1 = String.valueOf(System.nanoTime());
String v2 = String.valueOf(System.nanoTime());
String v3 = String.valueOf(System.nanoTime());
return Flux.just(v1, v2, v3);
};
}
功能
函数也可以以命令式或响应式方式编写,但与提供商和消费者不同的是,有 对于实现者来说,除了理解在框架内使用时,没有特别的考虑因素 比如 Spring Cloud Stream 等,响应式函数是 仅调用一次以传递对流(Flux 或 Mono)的引用,并且每个事件调用一次命令式。
功能组成
函数组合是一项允许将多个功能组合为一个的功能。 核心支持基于自 Java 8 以来提供的 Function.andThen(..) 支持提供的函数组合功能。然而,除此之外,我们还提供了一些附加功能。
函数路由和过滤
从 2.2 版本开始,Spring Cloud Function 提供了路由功能,允许 调用单个函数,该函数充当您希望调用的实际函数的路由器。 此功能在某些维护配置的 FAAS 环境中非常有用 因为多个函数可能很麻烦,或者不可能公开多个函数。
这RoutingFunction
在 FunctionCatalog 中以functionRouter
.为简单起见
和一致性你也可以参考RoutingFunction.FUNCTION_NAME
不断。
此函数具有以下签名:
public class RoutingFunction implements Function<Object, Object> {
. . .
}
路由指令可以通过多种方式进行传达。我们支持通过消息头、系统提供指令 属性以及可插拔策略。那么让我们看看一些细节
MessageRouting回调
这MessageRoutingCallback
是一种策略,用于帮助确定路由到函数定义的名称。
public interface MessageRoutingCallback {
default String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
}
您需要做的就是实现并将其注册为 bean,以便由RoutingFunction
.
例如:
@Bean
public MessageRoutingCallback customRouter() {
return new MessageRoutingCallback() {
@Override
public String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
};
}
在前面的示例中,您可以看到一个非常简单的实现MessageRoutingCallback
它确定函数定义FunctionProperties.FUNCTION_DEFINITION
Message 标头,并返回String
表示要调用的函数的定义。
邮件头
如果输入参数类型为Message<?>
,您可以通过设置以下选项之一来传达路由指令spring.cloud.function.definition
或spring.cloud.function.routing-expression
邮件标头。
顾名思义spring.cloud.function.routing-expression
依赖于 Spring 表达式语言 (SpEL)。
对于更多静态情况,您可以使用spring.cloud.function.definition
标头,允许您提供
单个函数的名称(例如…definition=foo
)或作文指令(例如,…definition=foo|bar|baz
).
对于更动态的情况,您可以使用spring.cloud.function.routing-expression
标头,并提供应解析的 SpEL 表达式
转换为函数的定义(如上所述)。
SpEL 评估上下文的根对象是
实际输入参数,因此在Message<?> 可以构造具有访问权限的表达式
对两者payload 和headers (例如,spring.cloud.function.routing-expression=headers.function_name ). |
SpEL 允许用户提供要执行的 Java 代码的字符串表示。鉴于spring.cloud.function.routing-expression 可以通过消息标头提供意味着设置此类表达式的能力可能会暴露给最终用户(即,使用 Web 模块时的 HTTP 标头),这可能会导致一些问题(例如,恶意代码)。为了管理这一点,将仅针对 Message 标头的所有表达式进行评估SimpleEvaluationContext 它的功能有限,并且设计为仅评估上下文对象(在我们的例子中为 Message)。另一方面,通过属性或系统变量设置的所有表达式都将根据StandardEvaluationContext ,这允许 Java 语言的充分灵活性。
虽然通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为它在正常情况下不会向最终用户公开,但在某些情况下,可见性以及更新系统、应用程序和环境变量的能力确实通过某些 Spring 项目或第三方提供的 Spring Boot Actuator 端点或最终用户的自定义实现公开给最终用户。必须使用行业标准的 Web 安全实践来保护此类端点。
Spring Cloud Function 不会公开任何此类端点。 |
在特定的执行环境/模型中,适配器负责翻译和通信spring.cloud.function.definition
和/或spring.cloud.function.routing-expression
通过 Message 标头。
例如,在使用 spring-cloud-function-web 时,您可以提供spring.cloud.function.definition
作为 HTTP
标头,框架将把它和其他 HTTP 标头作为消息标头传播。
应用程序属性
路由指令也可以通过以下方式进行通信spring.cloud.function.definition
或spring.cloud.function.routing-expression
作为应用程序属性。中描述的规则
上一节也适用于此。唯一的区别是您将这些说明提供为
应用程序属性(例如--spring.cloud.function.definition=foo
).
重要的是要了解提供spring.cloud.function.definition 或spring.cloud.function.routing-expression 因为 Message 标头仅适用于命令式函数(例如Function<Foo, Bar> ).
也就是说,我们只能使用命令式函数路由每条消息。使用响应式函数,我们无法按消息路由。因此,您只能将路由说明作为应用程序属性提供。
这一切都与工作单元有关。在命令式函数中,工作单元是消息,因此我们可以根据这样的工作单元进行路由。
对于响应式函数,工作单元是整个流,因此我们只会根据通过应用程序提供的指令执行作
属性并路由整个流。 |
路由指令的优先级顺序
鉴于我们有几种提供路由指令的机制,了解优先级非常重要 同时使用多种机制时的冲突解决,因此顺序如下:
-
MessageRoutingCallback
(如果函数是命令式的,则无论是否定义了任何其他内容,它都会接管) -
消息头(如果函数是命令式的,并且没有
MessageRoutingCallback
提供) -
应用程序属性(任何函数)
不可路由的消息
如果目录中没有路由到函数,您将收到一个异常,说明这一点。
在某些情况下,这种行为是不受欢迎的,你可能希望有一些“包罗万象”的类型函数来处理此类消息。
为了实现这一目标,框架提供了org.springframework.cloud.function.context.DefaultMessageRoutingHandler
策略。您需要做的就是将其注册为 bean。
它的默认实现将仅记录消息不可路由的事实,但将允许消息流继续进行,没有例外,从而有效地删除不可路由的消息。
如果你想要更复杂的东西,你需要做的就是提供你自己的策略实现并将其注册为 bean。
@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
return new DefaultMessageRoutingHandler() {
@Override
public void accept(Message<?> message) {
// do something really cool
}
};
}
函数过滤
过滤是一种只有两条路径的路由类型 - “go”或“discard”。就功能而言,这意味着 只有当某个条件返回 'true' 时,您才想调用某个函数,否则您想丢弃输入。 但是,当涉及到丢弃输入时,对于它在应用程序上下文中的含义有很多解释。 例如,您可能想要记录它,或者您可能想要维护已丢弃邮件的计数器。您可能也想什么都不做。 由于这些不同的路径,我们没有提供如何处理丢弃消息的常规配置选项。 相反,我们只是建议定义一个简单的消费者,它表示“丢弃”路径:
@Bean
public Consumer<?> devNull() {
// log, count or whatever
}
现在,您可以拥有实际上只有两条路径的路由表达式,实际上成为过滤器。例如:
--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'
每条不符合进入“echo”函数的条件的消息都将进入“devNull”,在那里你什么也做不了。
签名Consumer<?>
还将确保不会尝试类型转换,从而几乎没有执行开销。
在处理响应式输入(例如,发布者)时,路由指令只能通过函数属性提供。这是 由于响应式函数的性质,这些函数仅调用一次以传递发布者及其余部分 由反应器处理,因此我们无法访问和/或依赖通过单个传达的路由指令 值(例如,消息)。 |
多台路由器
默认情况下,框架将始终具有配置的单个路由函数,如前面的部分所述。但是,有时您可能需要多个路由功能。
在这种情况下,您可以创建自己的RoutingFunction
bean 除了现有的 bean 之外,只要你给它一个名字functionRouter
.
您可以通过spring.cloud.function.routing-expression
或spring.cloud.function.definition
设置为 RoutinFunction 作为映射中的键/值对。
这是一个简单的例子
@Configuration protected static class MultipleRouterConfiguration { @Bean RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { Map<String, String> propertiesMap = new HashMap<>(); propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'"); return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback); } @Bean public Function<String, String> reverse() { return v -> new StringBuilder(v).reverse().toString(); } @Bean public Function<String, String> uppercase() { return String::toUpperCase; } }
以及演示其工作原理的测试
`
@Test public void testMultipleRouters() { System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'"); FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class); Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); assertThat(function).isNotNull(); Message<String> message = MessageBuilder.withPayload("hello").build(); assertThat(function.apply(message)).isEqualTo("HELLO"); function = functionCatalog.lookup("mySpecialRouter"); assertThat(function).isNotNull(); message = MessageBuilder.withPayload("hello").build(); assertThat(function.apply(message)).isEqualTo("olleh"); }
[[输入/输出丰富]] == 输入/输出丰富
有时,您需要修改或优化传入或传出的消息,并使您的代码没有非功能问题。您不想在业务逻辑中执行此作。
您始终可以通过函数组合来完成它。这种方法有几个好处:
-
它允许您将此非功能性问题隔离到一个单独的函数中,您可以将其与业务函数一起组合为函数定义。
-
它为您提供了完全的自由(和危险),即在传入消息到达实际业务功能之前可以修改哪些内容。
@Bean
public Function<Message<?>, Message<?>> enrich() {
return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}
@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
// do whatever
}
然后通过提供以下函数定义来编写函数enrich|myBusinessFunction
.
虽然所描述的方法是最灵活的,但它也是最复杂的,因为它需要你编写一些代码,使其成为一个 bean 或手动将其注册为函数,然后才能将其与业务函数一起组合,如前面的示例所示。
但是,如果您尝试进行的修改(扩充)像前面的示例中一样微不足道怎么办?是否有更简单、更动态和可配置的机制来完成相同的任务?
从 3.1.3 版本开始,该框架允许您提供 SpEL 表达式来丰富进入函数和 以及从中产生的输出。让我们以其中一个测试为例。
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("split");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("path", "foo/bar/baz").build());
assertThat(result.getHeaders().containsKey("keyOut1")).isTrue();
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
assertThat(result.getHeaders().containsKey("keyOut2")).isTrue();
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
}
}
在这里,你会看到一个名为input-header-mapping-expression
和output-header-mapping-expression
前面是函数的名称(即split
),然后是要设置的消息头键的名称和作为 SpEL 表达式的值。第一个表达式(用于 'keyOut1')是用单引号括起来的文字 SpEL 表达式,有效地将 'keyOut1' 设置为值hello1
.这keyOut2
设置为现有 'contentType' 标头的值。
您还可以在输入标头映射中观察到一些有趣的功能,其中我们实际上拆分了现有标头“路径”的值,将 key1 和 key2 的各个值设置为基于索引的拆分元素的值。
如果由于某种原因提供的表达式计算失败,则函数的执行将像什么都没有发生一样继续进行。 但是,您将在日志中看到 WARN 消息,通知您 |
o.s.c.f.context.catalog.InputEnricher : Failed while evaluating expression "hello1" on incoming message. . .
如果您正在处理具有多个输入的函数(下一节),则可以立即在之后使用 indexinput-header-mapping-expression
--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'
函数 Arity
有时需要对数据流进行分类和组织。例如 考虑一个经典的大数据用例,该用例处理包含以下内容的无组织数据: 'orders' 和 'invoices',并且您希望每个都进入单独的数据存储。 这就是函数 arity(具有多个输入和输出的函数)支持的地方 来玩。
让我们看一个这样的函数的例子(完整的实现细节在这里),
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用它的元组库。元组通过与我们传达基数和类型信息来为我们提供独特的优势。在 SCSt 的上下文中,两者都极其重要。基数让我们知道需要创建多少个输入和输出绑定并绑定到相应的函数的输入和输出。对类型信息的认识确保正确的类型 转换。
此外,这是绑定names 的命名约定的“index”部分发挥作用的地方,因为在此函数中,两个输出绑定names 是organise-out-0
和organise-out-1
.
重要提示:目前,函数 arity 仅支持响应式函数
(Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>> )以复杂事件处理为中心
其中对事件汇合的评估和计算通常需要查看
事件流,而不是单个事件。 |
输入标头传播
在典型情况下,输入消息标头不会传播到输出,这是理所当然的,因为函数的输出可能是对需要它自己的一组消息标头的其他东西的输入。 但是,有时可能需要这种传播,因此 Spring Cloud Function 提供了多种机制来实现此目的。
首先,您始终可以手动复制标题。例如,如果你有一个函数,其签名将Message
并返回Message
(即Function<Message, Message>
),您可以简单且有选择地自己复制标题。请记住,如果您的函数返回 Message,则框架除了正确转换其有效负载之外不会对其执行任何作。
但是,这种方法可能会被证明有点乏味,尤其是在您只想复制所有标头的情况下。
为了帮助处理此类情况,我们提供了一个简单的属性,允许您在要传播输入标头的函数上设置布尔标志。
该属性是copy-input-headers
.
例如,假设您有以下配置:
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
}
如您所知,您仍然可以通过向它发送消息来调用此函数(框架将负责类型转换和有效负载提取)
通过简单地将spring.cloud.function.configuration.uppercase.copy-input-headers
自true
,以下断言也将成立
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json"); Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build()); assertThat(result.getHeaders()).containsKey("foo");
类型转换(内容-类型协商)
内容类型协商是 Spring Cloud Function 的核心功能之一,因为它不仅允许将传入数据转换为声明的类型 通过函数签名,但在函数组合期间进行相同的转换,使原本不可组合的(按类型)函数可组合。
为了更好地理解内容类型协商背后的机制和必要性,我们看了一个非常简单的用例,即 以以下函数为例:
@Bean
public Function<Person, String> personFunction {..}
前面示例中显示的函数需要一个Person
object 作为参数,并生成 String 类型作为输出。如果这样的函数是
使用Person
,比一切都很好。但通常函数扮演传入数据的处理程序的角色,这些数据最常出现
以原始格式,例如byte[]
,JSON String
等。为了使框架成功地将传入数据作为参数传递给
这个函数,它必须以某种方式将传入的数据转换为Person
类型。
Spring Cloud Function 依赖于 Spring 的两种原生机制来实现这一点。
-
MessageConverter - 将传入的消息数据转换为函数声明的类型。
-
ConversionService - 从传入的非 Message 数据转换为函数声明的类型。
这意味着根据原始数据的类型(消息或非消息),Spring Cloud Function 将应用一种或另一种机制。
在大多数情况下,在处理作为其他请求的一部分调用的函数(例如,HTTP、消息传递等)时,框架所依赖的MessageConverters
,
由于此类请求已经转换为 SpringMessage
.换句话说,该框架定位并应用适当的MessageConverter
.
为了实现这一点,框架需要用户的一些指令。函数的签名已经提供了其中一条指令
本身(人员类型)。因此,从理论上讲,这应该(并且在某些情况下)足够了。但是,对于大多数用例,为了
选择适当的MessageConverter
,框架需要额外的信息。缺失的那块是contentType
页眉。
此类标头通常作为 Message 的一部分出现,由最初创建此类 Message 的相应适配器注入。
例如,HTTP POST 请求将将其内容类型 HTTP 标头复制到contentType
Message 的标头。
对于此类标头不存在的情况,框架依赖于默认内容类型作为application/json
.
内容类型与参数类型
如前所述,对于框架选择合适的MessageConverter
,它需要参数类型,以及(可选)内容类型信息。
选择适当MessageConverter
与参数解析器一起驻留,该解析器在调用用户定义的
函数(即框架已知实际参数类型时)。
如果参数类型与当前有效负载的类型不匹配,则框架会委托给
预配置MessageConverters
看看其中任何一个是否可以转换有效负载。
的组合contentType
参数类型是框架通过定位
适当的MessageConverter
.
如果不合适MessageConverter
,则会抛出异常,您可以通过添加自定义MessageConverter
(参见User-defined Message Converters
).
不期望Message 仅基于contentType .
请记住,contentType 与目标类型互补。
这是一个提示,它MessageConverter 可能会考虑也可能不会考虑。 |
消息转换器
MessageConverters
定义两个方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
了解这些方法的契约及其用法非常重要,特别是在 Spring Cloud Stream 的上下文中。
这fromMessage
方法将传入的Message
设置为参数类型。
的有效负载Message
可以是任何类型,它是
直到实际实施MessageConverter
以支持多种类型。
提供的 MessageConverters
如前所述,该框架已经提供了MessageConverters
以处理最常见的用例。
以下列表描述了提供的MessageConverters
,按优先级顺序(第一个MessageConverter
使用有效的方法):
-
JsonMessageConverter
:支持转换Message
to/from POJO 适用于以下情况contentType
是application/json
使用 Jackson (DEFAULT) 或 Gson 库。此消息转换器还意识到type
参数(例如,application/json;type=foo.bar.Person)。这对于在开发函数时可能不知道类型的情况很有用,因此函数签名可能看起来像Function<?, ?>
或Function
或Function<Object, Object>
.换句话说,对于类型转换,我们通常从函数签名派生类型。拥有 mime-type 参数允许您以更动态的方式传达类型。 -
ByteArrayMessageConverter
:支持转换Message
从byte[]
自byte[]
适用于以下情况contentType
是application/octet-stream
.它本质上是一个直通,主要是为了向后兼容性而存在的。 -
StringMessageConverter
:支持将任何类型转换为String
什么时候contentType
是text/plain
.
如果找不到合适的转换器,框架会引发异常。发生这种情况时,您应该检查您的代码和配置并确保您这样做了
不会遗漏任何内容(即,确保您提供了contentType
通过使用绑定或标头)。
但是,您很可能发现了一些不常见的情况(例如自定义contentType
也许)和当前提供的堆栈MessageConverters
不知道如何转换。如果是这种情况,您可以添加自定义MessageConverter
.请参阅用户定义的消息转换器。
用户定义的消息转换器
Spring Cloud Function 公开了一种机制来定义和注册额外的MessageConverters
.
要使用它,请实现org.springframework.messaging.converter.MessageConverter
,将其配置为@Bean
.
然后将其附加到现有的“MessageConverter”堆栈中。
重要的是要了解习惯MessageConverter 实现被添加到现有堆栈的头部。
因此,定制MessageConverter 实现优先于现有实现,这使您可以覆盖和添加到现有转换器中。 |
以下示例显示如何创建消息转换器 Bean 以支持名为application/bar
:
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
关于 JSON 选项的说明
在 Spring Cloud Function 中,我们支持 Jackson 和 Gson 机制来处理 JSON。
为了您的利益,已将其抽象在以下位置org.springframework.cloud.function.json.JsonMapper
它本身知道两种机制,并将使用所选的一种
由您或遵循默认规则。
默认规则如下:
-
无论哪个库位于将要使用的机制上,都是要使用的机制。所以如果你有
com.fasterxml.jackson.*
到类路径,Jackson 将被使用,如果你有com.google.code.gson
,则将使用 Gson。 -
如果你两者都有,那么 Gson 将是默认的,或者你可以将
spring.cloud.function.preferred-json-mapper
属性,具有以下两个值之一:gson
或jackson
.
也就是说,类型转换通常对开发人员是透明的,但是考虑到org.springframework.cloud.function.json.JsonMapper
也注册为 bean如果需要,您可以轻松地将其注入到代码中。
Kotlin Lambda 支持
我们还为 Kotlin lambda 提供支持(自 v2.0 起)。请考虑以下事项:
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
}
@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
}
上面表示配置为 Spring bean 的 Kotlin lambda。每个签名映射到 Java 等效的Supplier
,Function
和Consumer
,从而由框架支持/识别签名。虽然 Kotlin 到 Java 映射的机制不在本文档的讨论范围内,但请务必了解此处也应用了“Java 8 函数支持”部分中概述的签名转换规则。
要启用 Kotlin 支持,您只需在类路径上添加 Kotlin SDK 库,这将触发适当的autoconfiguration 和支持类。
函数组件扫描
Spring Cloud Function 将扫描Function
,Consumer
和Supplier
在名为functions
如果存在的话。使用此功能,您可以编写对 Spring 没有依赖关系的函数 - 甚至没有@Component
需要注释。如果要使用不同的包,可以设置spring.cloud.function.scan.packages
. 您还可以使用spring.cloud.function.scan.enabled=false
以完全关闭扫描。
数据屏蔽
典型的应用程序带有多个级别的日志记录。某些云/无服务器平台可能会在正在记录的数据包中包含敏感数据,供所有人查看。虽然检查正在记录的数据是个人开发人员的责任,但日志记录来自框架本身,因此从 4.1 版本开始,我们引入了JsonMasker
最初帮助屏蔽 AWS Lambda 有效负载中的敏感数据。但是,JsonMasker
是通用的,可用于任何模块。目前它仅适用于结构化数据,例如 JSON。您所需要做的就是指定要屏蔽的键,剩下的就由它来处理。应在文件中指定键META-INF/mask.keys
. 文件的格式非常简单,您可以用逗号或换行符或两者分隔多个键。
以下是此类文件内容的示例:
eventSourceARN asdf1, SS
在这里,您会看到定义了三个键一旦存在这样的文件,JsonMasker 将使用它来屏蔽指定键的值。
这是显示用法的示例代码
private final static JsonMasker masker = JsonMasker.INSTANCE(); . . . logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));