编程模型
函数目录和灵活的功能签名
Spring Cloud Function 的主要特性之一是为用户定义的函数适配和支持一系列类型签名,同时提供一致的执行模型。
这就是为什么所有用户定义的函数都被FunctionCatalog
.
虽然用户通常不必关心FunctionCatalog
根本上,了解用户代码中支持什么样的函数是有用的。
同样重要的是要了解 Spring Cloud Function 为响应式 API 提供了一流的支持,由 Project Reactor 提供。
这允许响应式原语,例如Mono
和Flux
用作用户定义函数中的类型,从而在为函数实现选择编程模型时提供更大的灵活性。
响应式编程模型还可以为使用命令式编程风格难以或不可能实现的功能提供功能支持。
有关这方面的更多信息,请阅读有关 Function Arity 的部分。
Java 8 函数支持
Spring Cloud Function 拥抱并构建在 Java 自 Java 8 以来定义的 3 个核心功能接口之上。
-
提供商<O>
-
功能<I,O>
-
消费者<我>
为了不断避免提及Supplier
,Function
和Consumer
,我们将在本手册的其余部分适当地将它们称为功能 bean。
简而言之,您的任何豆子ApplicationContext
也就是说,函数式 bean 将被延迟注册FunctionCatalog
.
这意味着它可以从本参考手册中描述的所有附加功能中受益。
在最简单的应用程序中,您需要做的就是声明一个@Bean
类型Supplier
,Function
或Consumer
在您的应用程序配置中。
然后,您可以使用FunctionCatalog
根据特定函数的名称查找特定函数。
例如:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
// . . .
FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase = catalog.lookup(“uppercase”);
重要的是要了解,鉴于uppercase
是 bean,你当然可以从ApplicationContext
直接,但你得到的只是你声明的 bean,没有 SCF 提供的任何额外功能。
当您通过FunctionCatalog
,您收到的实例将使用本手册中描述的附加功能(即类型转换、组合等)进行包装(检测)。
此外,重要的是要了解典型用户不会直接使用 Spring Cloud Function。
相反,典型的用户实现 JavaFunction
,Supplier
或Consumer
在不同的执行环境中使用它的想法,而无需额外的工作。
例如,通过 Spring Cloud Function 提供的适配器以及使用 Spring Cloud Function 作为核心编程模型的其他框架(例如 Spring Cloud Stream),可以将相同的 Java 函数表示为 REST 端点、流消息处理程序或 AWS Lambda,甚至更多。
总之,Spring Cloud Function 使用附加功能来检测 Java 函数,以便在各种执行上下文中使用。
功能定义
虽然前面的示例向您展示了如何在FunctionCatalog
在编程方式上,在另一个框架(例如 Spring Cloud Stream)将 Spring Cloud Function 用作编程模型的典型集成案例中,您可以通过spring.cloud.function.definition
财产。 在发现函数时,了解和理解默认行为非常重要FunctionCatalog
.
例如,如果ApplicationContext
这spring.cloud.function.definition
属性通常不需要,因为FunctionCatalog
可以通过空名称或任何名称查找。例如,假设uppercase
是目录中唯一的函数,可以将其查找为catalog.lookup(null)
,catalog.lookup(“”)
,catalog.lookup(“foo”)
.
也就是说,对于您使用框架(例如 Spring Cloud Stream)的情况,它使用spring.cloud.function.definition
,建议始终使用spring.cloud.function.definition
财产。
例如
spring.cloud.function.definition=uppercase
过滤不合格的函数
典型的ApplicationContext
可能包括作为有效Java函数的bean,但不打算作为要注册的候选FunctionCatalog
. 此类 bean 可以是来自其他项目的自动配置,也可以是符合 Java 函数条件的任何其他 bean。
该框架提供了对已知 bean 的默认过滤,这些 bean 不应成为注册的候选者FunctionCatalog
. 您还可以通过使用spring.cloud.function.ineligible-definitions
财产。
例如
spring.cloud.function.ineligible-definitions=foo,bar
提供商
提供商可以被动 - Supplier<Flux<T>>
或命令式 - Supplier<T>
.
从调用的角度来看,这对此类Supplier
.
但是,当在框架(例如 Spring Cloud Stream)中使用时,提供商(尤其是响应式提供商)通常用于表示流的源。
因此,调用它们一次以获取流(例如Flux
)消费者可以订阅。
换句话说,这样的提供商相当于无限流。
虽然,相同的响应式提供商也可以表示有限流(例如,轮询的 JDBC 数据上的结果集)。 在这些情况下,此类反应性提供商必须连接到底层框架的某种轮询机制。
为了帮助实现这一点,Spring Cloud 函数提供了一个标记注释org.springframework.cloud.function.context.PollableBean
以表示此类提供商产生有限流,可能需要再次轮询。
但是,重要的是要了解 Spring Cloud Function 本身没有为此 Comments 提供任何行为。
此外,PollableBean
注释公开一个可分割的属性,以表明生成的流需要拆分(参见 Splitter EIP)
这是一个例子:
@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
return () -> {
String v1 = String.valueOf(System.nanoTime());
String v2 = String.valueOf(System.nanoTime());
String v3 = String.valueOf(System.nanoTime());
return Flux.just(v1, v2, v3);
};
}
功能
函数也可以以命令式或响应式方式编写。
然而,与Supplier
和Consumer
,除了了解在框架(例如 Spring Cloud Stream)中使用时,响应式函数仅调用一次以传递对流的引用(即Flux
或Mono
),而命令式函数每个事件调用一次。
public Function<String, String> uppercase() {
. . . .
}
双函数
如果您需要通过有效负载接收一些额外的数据(元数据),您可以随时声明函数签名以接收Message
包含带有附加信息的标头映射。
public Function<Message<String>, String> uppercase() {
. . . .
}
为了使您的函数签名更轻巧,更像 POJO,还有另一种方法。您可以使用BiFunction
.
public BiFunction<String, Map, String> uppercase() {
. . . .
}
鉴于Message
仅包含两个属性(有效负载和标头),并且BiFunction
需要两个输入参数,框架将自动识别此签名并从Message
将其作为第一个参数传递,并将Map
的标题作为第二个。
因此,您的函数未与 Spring 的消息传递 API 耦合。
请记住BiFunction
需要严格的签名,其中第二个参数必须是Map
.
同样的规则也适用于BiConsumer
.
功能组成
函数组合是一项允许将多个功能组合为一个的功能。 核心支持基于 Function.andThen(..) 提供的函数组合功能,该功能从 Java 8 开始可用。 但是,Spring Cloud Function 在此基础上提供了一些附加功能。
函数路由和过滤
从版本 2.2 开始,Spring Cloud Function 提供了一个路由功能,允许您调用单个函数,该函数充当您希望调用的实际函数的路由器。 此功能在某些 FAAS 环境中非常有用,在这些环境中,维护多个功能的配置可能很麻烦或无法公开多个功能。
这RoutingFunction
在 FunctionCatalog 中以functionRouter
.
为了简单性和一致性,您还可以参考RoutingFunction.FUNCTION_NAME
不断。
此函数具有以下签名:
public class RoutingFunction implements Function<Object, Object> {
// . . .
}
路由指令可以通过多种方式进行传达。 我们支持通过消息标头、系统属性以及可插拔策略提供指令。 让我们看看一些细节。
MessageRouting回调
这MessageRoutingCallback
是一种策略,用于帮助确定路由到函数定义的名称。
public interface MessageRoutingCallback {
default String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
}
您需要做的就是实现并注册一个MessageRoutingCallback
作为一颗豆子,要被RoutingFunction
.
例如:
@Bean
public MessageRoutingCallback customRouter() {
return new MessageRoutingCallback() {
@Override
public String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
};
}
在前面的示例中,您可以看到一个非常简单的实现MessageRoutingCallback
,它从FunctionProperties.FUNCTION_DEFINITION
Message
传入的标头Message
,返回String
表示要调用的函数的定义。
邮件头
如果输入参数类型为Message<?>
,您可以通过设置spring.cloud.function.definition
或spring.cloud.function.routing-expression
Message
头。
正如该物业的名称所暗示的那样,spring.cloud.function.routing-expression
依赖于 Spring 表达式语言 (SpEL)。
对于更多静态情况,您可以使用spring.cloud.function.definition
header,它允许您提供单个函数的名称(例如…definition=foo
)或作文指令(例如…definition=foo|bar|baz
).
对于更动态的情况,您可以使用spring.cloud.function.routing-expression
标头并提供应解析为函数定义的 SpEL 表达式(如上所述)。
SpEL 求值上下文的根对象是实际的输入参数,因此在Message<?> 您可以构造一个可以访问两者的表达式payload 和headers (例如spring.cloud.function.routing-expression=headers.function_name ). |
SpEL 允许用户提供要执行的 Java 代码的字符串表示。
鉴于spring.cloud.function.routing-expression 可以通过消息标头提供意味着设置此类表达式的能力可能会暴露给最终用户(即使用 Web 模块时的 HTTP 标头),这可能会导致一些问题(例如恶意代码)。
为了管理这一点,将仅针对 Message 标头的所有表达式进行评估SimpleEvaluationContext ,其功能有限,旨在仅评估上下文对象(在我们的例子中为 Message)。
另一方面,通过属性或系统环境变量设置的所有表达式都是根据StandardEvaluationContext 允许 Java 语言的充分灵活性。
虽然通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为它在正常情况下不会向最终用户公开,但在某些情况下,更新系统、应用程序和环境变量的可见性以及更新系统、应用程序和环境变量的能力确实通过其他一些 Spring 项目提供的 Spring Boot Actuator 端点公开给最终用户, 第三方或最终用户创建的自定义实现。
必须使用行业标准的 Web 安全实践来保护此类端点。
Spring Cloud Function 不公开任何此类端点。 |
在特定的执行环境/模型中,适配器负责翻译和通信spring.cloud.function.definition
和/或spring.cloud.function.routing-expression
通过Message
页眉。
例如,在使用 spring-cloud-function-web 时,您可以提供spring.cloud.function.definition
作为 HTTP 标头,框架会将其与其他 HTTP 标头一起作为消息标头传播。
应用程序属性
路由指令也可以通过以下方式进行传达spring.cloud.function.definition
或spring.cloud.function.routing-expression
作为应用程序属性。
上一节中描述的规则也适用于此。唯一的区别是您将这些指令作为应用程序属性提供(例如--spring.cloud.function.definition=foo
).
重要的是要了解提供spring.cloud.function.definition 或spring.cloud.function.routing-expression 因为 Message 标头仅适用于命令式函数(例如Function<Foo, Bar> ).
也就是说,我们只能使用命令式函数路由每条消息。
使用响应式函数,我们无法按消息路由。
因此,您只能将路由指令作为应用程序属性提供。
这一切都与工作单元有关。
在命令式函数中,工作单元是 Message,因此我们可以基于这样的工作单元进行路由。
使用响应式函数时,工作单元是整个流,因此我们将仅对通过应用程序属性提供的指令进行作并路由整个流。 |
路由指令的优先级顺序
鉴于我们有多种提供路由指令的机制,因此了解同时使用多种机制时解决冲突的优先级非常重要。顺序如下:
-
MessageRoutingCallback
(当函数是命令式时,无论是否定义了任何其他内容,都优先) -
消息头(如果函数是命令式的,并且没有
MessageRoutingCallback
提供) -
应用程序属性(任何函数)
不可路由的消息
如果目录中没有路由到函数,您将收到一个异常,说明该异常。
在某些情况下,这种行为是不受欢迎的,你可能希望有一些能够处理此类消息的“包罗万象”类型函数。
为了实现这一点,该框架提供了org.springframework.cloud.function.context.DefaultMessageRoutingHandler
策略。
您需要做的就是将其注册为 bean。
它的默认实现将仅记录消息不可路由的事实,但将允许消息流继续进行,没有例外,从而有效地删除不可路由的消息。
如果您需要更复杂的东西,您需要做的就是提供您自己的此策略实现并将其注册为 bean。
@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
return new DefaultMessageRoutingHandler() {
@Override
public void accept(Message<?> message) {
// do something really cool
}
};
}
函数过滤
过滤是一种只有两条路径的路由类型 - “go”或“discard”。就函数而言,这意味着您只想在某个条件返回“true”时调用某个函数,否则您想丢弃输入。
但是,当涉及到丢弃输入时,对于它在应用程序上下文中的含义有很多解释。 例如,您可能想要记录它,或者您可能想要维护已丢弃消息的计数器。 您可能也想什么都不做。
由于这些不同的路径,我们没有提供如何处理丢弃消息的常规配置选项。
相反,我们只是建议定义一个简单的Consumer
这将表示“丢弃”路径:
@Bean
public Consumer<?> devNull() {
// log, count, or whatever
}
现在,您可以拥有一个实际上只有两条路径的路由表达式,实际上成为过滤器。 例如:
--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'
每条不符合转到“echo”函数的条件的消息都将转到“devNull”,您可以在其中对其执行任何作。
签名Consumer<?>
还将确保不会尝试类型转换,从而几乎没有执行开销。
在处理响应式输入(例如 Publisher)时,路由指令只能通过 Function 属性提供。
这是由于响应式函数的性质,这些函数仅调用一次以传递Publisher 其余的由反应器处理,因此我们无法访问和/或依赖通过单个值(例如消息)传达的路由指令。 |
多台路由器
默认情况下,框架将始终配置一个路由函数,如前面的部分所述。但是,有时您可能需要多个路由函数。在这种情况下,您可以创建自己的RoutingFunction
bean 除了现有的 bean 之外,只要你给它一个名字functionRouter
.
您可以通过spring.cloud.function.routing-expression
或spring.cloud.function.definition
自RoutingFunction
作为映射中的键/值对。
这是一个简单的例子:
@Configuration protected static class MultipleRouterConfiguration { @Bean RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { Map<String, String> propertiesMap = new HashMap<>(); propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'"); return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback); } @Bean public Function<String, String> reverse() { return v -> new StringBuilder(v).reverse().toString(); } @Bean public Function<String, String> uppercase() { return String::toUpperCase; } }
这是一个演示其工作原理的测试:
@Test
public void testMultipleRouters() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("HELLO");
function = functionCatalog.lookup("mySpecialRouter");
assertThat(function).isNotNull();
message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
输入/输出丰富
有时,您需要修改或优化传入或传出的消息,并使您的代码没有非功能问题。 您不想在业务逻辑中执行此作。
您始终可以通过函数组合来完成它。 这种方法有几个好处:
-
它允许您将此非功能性问题隔离到一个单独的函数中,您可以将其与业务函数一起组合为函数定义。
-
它为您提供了完全的自由(和危险),即在传入消息到达实际业务功能之前可以修改哪些内容。
@Bean
public Function<Message<?>, Message<?>> enrich() {
return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}
@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
// do whatever
}
然后,通过提供以下函数定义来组合函数:enrich|myBusinessFunction
.
虽然所描述的方法是最灵活的,但它也是最复杂的。它需要您编写一些代码,然后将其设置为 bean,或者手动将其注册为函数,然后才能使用业务函数编写它,如前面的示例所示。
但是,如果您尝试进行的修改(扩充)像前面的示例中一样微不足道怎么办?是否有更简单、更动态和可配置的机制来完成相同的作?
从 3.1.3 版本开始,该框架允许您提供 SpEL 表达式来丰富进入函数的输入和输出的单个消息头。让我们以其中一个测试为例。
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("split");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("hello")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("path", "foo/bar/baz")
.build());
assertThat(result.getHeaders()).containsKey("keyOut1"));
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
assertThat(result.getHeaders()).containsKey("keyOut2"));
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
}
}
在这里,你可以看到名为input-header-mapping-expression
和output-header-mapping-expression
前面加上函数名称(即split
)后跟要设置的消息头键的名称和作为 SpEL 表达式的值。第一个表达式(用于 'keyOut1')是用单引号括起来的文字 SpEL 表达式,有效地将 'keyOut1' 设置为值hello1
. 这keyOut2
设置为现有 'contentType' 标头的值。
您还可以在输入标头映射中观察到一些有趣的功能,我们实际上正在拆分现有标头“路径”的值,将 key1 和 key2 的各个值设置为基于索引的拆分元素的值。
如果由于某种原因提供的表达式计算失败,则函数的执行将像什么都没发生一样继续进行。但是,您将在日志中看到 WARN 消息,通知您相关情况。 |
o.s.c.f.context.catalog.InputEnricher : Failed while evaluating expression "hello1" on incoming message. . .
如果您正在处理具有多个输入的函数(下一节),则可以在之后立即使用索引input-header-mapping-expression
:
--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'
函数 Arity
有时需要对数据流进行分类和组织。 例如,考虑一个经典的大数据用例,用于处理包含“订单”和“发票”的无组织数据,并且您希望每个数据都进入单独的数据存储。 这就是函数 arity(具有多个输入和输出的函数)支持发挥作用的地方。
让我们看一个这样的函数的例子。MessageRouting回调
完整的实现详细信息可在此处获得。 |
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
鉴于 Project Reactor 是 SCF 的核心依赖项,我们使用其 Tuple 库。 元组通过与我们传达基数和类型信息,为我们提供了独特的优势。 在 SCSt 的上下文中,两者都极其重要。基数让我们知道需要创建多少个输入和输出绑定并绑定到函数的相应输入和输出。 对类型信息的了解可确保正确的类型转换。
此外,这是绑定名称命名约定的“索引”部分发挥作用的地方,因为在此函数中,两个输出绑定名称是organise-out-0
和organise-out-1
.
目前,函数 arity 仅支持响应式函数 (Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>> )以复杂事件处理为中心,其中对事件汇合的评估和计算通常需要查看事件流而不是单个事件。 |
输入标头传播
在典型情况下,输入消息标头不会传播到输出,这是理所当然的,因为函数的输出可能是对需要它自己的一组消息标头的其他东西的输入。 但是,有时可能需要这种传播,因此 Spring Cloud Function 提供了多种机制来实现此目的。
首先,您始终可以手动复制标题。
例如,如果你有一个函数,其签名将Message
并返回Message
(即Function<Message, Message>
),您可以简单且有选择地自己复制标题。
请记住,如果您的函数返回 Message,则框架除了正确转换其有效负载之外不会对其执行任何作。
但是,这种方法可能会被证明有点乏味,尤其是在您只想复制所有标头的情况下。
为了帮助处理此类情况,我们提供了一个简单的属性,允许您在要传播输入标头的函数上设置布尔标志。
该属性是copy-input-headers
.
例如,假设您有以下配置:
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
}
如您所知,您仍然可以通过向它发送消息来调用此函数(框架将负责类型转换和有效负载提取)
通过简单地将spring.cloud.function.configuration.uppercase.copy-input-headers
自true
,以下断言也将成立
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");
类型转换(内容-类型协商)
内容类型协商是 Spring Cloud Function 的核心功能之一,因为它不仅允许将传入数据转换为声明的类型通过函数签名,而且在函数组合过程中进行相同的转换,使原本不可组合的(按类型)函数可组合。
为了更好地理解内容类型协商背后的机制和必要性,我们看了一个非常简单的用例,即 以以下函数为例:
@Bean
public Function<Person, String> personFunction {..}
前面示例中显示的函数需要一个Person
object 作为参数,并生成 String 类型作为输出。如果使用类型Person
,则一切正常。
但是,通常函数充当传入数据的处理程序,这些数据通常以原始格式出现,例如byte[]
,JSON String
等。
为了使框架成功地将传入数据作为参数传递给此函数,它必须以某种方式将传入数据转换为Person
类型。
Spring Cloud Function 依赖于 Spring 的两种原生机制来实现这一点。
-
MessageConverter - 将传入的消息数据转换为函数声明的类型。
-
ConversionService - 从传入的非 Message 数据转换为函数声明的类型。
这意味着根据原始数据的类型(消息或非消息),Spring Cloud Function 将应用一种或另一种机制。
在大多数情况下,在处理作为其他请求的一部分调用的函数(例如,HTTP、消息传递等)时,框架所依赖的MessageConverters
,因为此类请求已经转换为 SpringMessage
.
换句话说,该框架定位并应用适当的MessageConverter
.
为了实现这一点,框架需要用户的一些指令。
其中一条指令已由函数本身的签名(人员类型)提供。
因此,从理论上讲,这应该(并且在某些情况下)足够了。
但是,对于大多数用例,为了选择适当的MessageConverter
,框架需要额外的信息。
缺失的那块是contentType
页眉。
此类标头通常作为 Message 的一部分出现,由最初创建此类 Message 的相应适配器注入。
例如,HTTP POST 请求将将其内容类型 HTTP 标头复制到contentType
Message 的标头。
对于此类标头不存在的情况,框架依赖于默认内容类型作为application/json
.
内容类型与参数类型
如前所述,对于框架选择合适的MessageConverter
,它需要参数类型,以及(可选)内容类型信息。
选择适当MessageConverter
与参数解析器一起驻留,这些解析器在调用用户定义函数之前触发(即框架已知实际参数类型时)。
如果参数类型与当前有效负载的类型不匹配,则框架将委托给预配置的堆栈MessageConverters
看看其中任何一个是否可以转换有效负载。
的组合contentType
参数类型是框架通过查找适当的MessageConverter
.
如果不合适MessageConverter
,则会抛出异常,您可以通过添加自定义MessageConverter
(参见User-defined Message Converters
).
不期望Message 仅基于contentType .
请记住,contentType 与目标类型互补。
这是一个提示,它MessageConverter 可能会考虑也可能不会考虑。 |
消息转换器
MessageConverters
定义两个方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
了解这些方法的契约及其用法非常重要,特别是在 Spring Cloud Stream 的上下文中。
这fromMessage
方法将传入的Message
设置为参数类型。
的有效负载Message
可以是任何类型,这取决于MessageConverter
以支持多种类型。
提供的 MessageConverters
如前所述,该框架已经提供了MessageConverters
以处理最常见的用例。
以下列表描述了提供的MessageConverters
,按优先级顺序(第一个MessageConverter
使用有效的方法):
-
JsonMessageConverter
:支持转换Message
to/from POJO 适用于以下情况contentType
是application/json
使用 Jackson (DEFAULT) 或 Gson 库。此消息转换器还知道type
参数(例如,application/json;type=foo.bar.Person)。这对于在开发函数时可能不知道类型的情况很有用,因此函数签名可能看起来像Function<?, ?>
或Function
或Function<Object, Object>
.换句话说,对于类型转换,我们通常从函数签名派生类型。拥有 mime-type 参数允许您以更动态的方式传达类型。 -
ByteArrayMessageConverter
:支持转换Message
从byte[]
自byte[]
适用于以下情况contentType
是application/octet-stream
.它本质上是一个直通,主要是为了向后兼容性而存在的。 -
StringMessageConverter
:支持将任何类型转换为String
什么时候contentType
是text/plain
.
如果找不到合适的转换器,框架会引发异常。发生这种情况时,您应该检查您的代码和配置并确保您没有遗漏任何内容(也就是说,确保您提供了contentType
通过使用绑定或标头)。
但是,您很可能发现了一些不常见的情况(例如自定义contentType
也许)和当前提供的堆栈MessageConverters
不知道如何转换。
如果是这种情况,您可以添加自定义MessageConverter
.请参阅用户定义的消息转换器。
用户定义的 MessageConverters
Spring Cloud Function 公开了一种机制来定义和注册额外的MessageConverters
.
要使用它,请实现org.springframework.messaging.converter.MessageConverter
,将其配置为@Bean
.
然后将其附加到现有的“MessageConverter”堆栈中。
重要的是要了解习惯MessageConverter 实现被添加到现有堆栈的头部。
因此,定制MessageConverter 实现优先于现有实现,这使您可以覆盖和添加到现有转换器中。 |
以下示例显示如何创建消息转换器 Bean 以支持名为application/bar
:
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
关于 JSON 选项的说明
在 Spring Cloud Function 中,我们支持 Jackson 和 Gson 机制来处理 JSON。
为了您的利益,已将其抽象在以下位置org.springframework.cloud.function.json.JsonMapper
它本身知道两种机制,并将使用您选择的机制或遵循默认规则的机制。
默认规则如下:
-
无论哪个库位于将要使用的机制上,都是要使用的机制。所以如果你有
com.fasterxml.jackson.*
到类路径,Jackson 将被使用,如果你有com.google.code.gson
,则将使用 Gson。 -
如果你两者都有,那么 Gson 将是默认的,或者你可以将
spring.cloud.function.preferred-json-mapper
属性,具有以下两个值之一:gson
或jackson
.
也就是说,类型转换通常对开发人员是透明的。但是,鉴于org.springframework.cloud.function.json.JsonMapper
也注册为 bean,如果需要,您可以轻松地将其注入到代码中。
Kotlin Lambda 支持
我们还为 Kotlin lambda(自 v2.0 起)提供支持。 考虑以下事项:
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
}
@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
}
以上表示配置为 Spring bean 的 Kotlin lambda。每个签名映射到 Java 等效的Supplier
,Function
和Consumer
,从而得到框架支持/认可的签名。
虽然 Kotlin 到 Java 映射的机制不在本文档的讨论范围内,但请务必了解,此处也应用了“Java 8 函数支持”部分中概述的签名转换规则。
要启用 Kotlin 支持,您只需在类路径上添加 Kotlin SDK 库,这将触发适当的自动配置和支持类。
函数组件扫描
Spring Cloud Function 将扫描Function
,Consumer
和Supplier
在名为functions
如果存在的话。
使用此功能,您可以编写不依赖于 Spring 的函数 - 甚至@Component
需要注释。
如果要使用不同的包,可以设置spring.cloud.function.scan.packages
.
您还可以使用spring.cloud.function.scan.enabled=false
以完全关闭扫描。
数据屏蔽
典型的应用程序带有多个级别的日志记录。
某些云/无服务器平台可能会在正在记录的数据包中包含敏感数据,供所有人查看。
虽然检查正在记录的数据是个人开发人员的责任,但由于日志记录来自框架本身,从版本 4.1 开始,我们引入了JsonMasker
最初帮助屏蔽 AWS Lambda 有效负载中的敏感数据。
但是,JsonMasker
是通用的,可用于任何模块。
目前,它仅适用于 JSON 等结构化数据。
您所需要做的就是指定要屏蔽的键,剩下的事情就会由它来处理。
应在文件中指定键META-INF/mask.keys
.
文件的格式非常简单,您可以用逗号、换行符或两者分隔多个键。
以下是此类文件内容的示例:
eventSourceARN
asdf1, SS
在这里,您会看到定义了三个键。
一旦存在这样的文件,就会JsonMasker
将使用它来屏蔽指定键的值。
下面是显示用法的示例代码:
private final static JsonMasker masker = JsonMasker.INSTANCE();
// . . .
logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));