|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
生成与消费信息
你可以通过编写函数并公开为@Beans.
你也可以使用基于Spring Integration的注释配置,或者
基于 Spring Cloud Stream 的注释配置,尽管始于 spring-cloud-stream 3.x
我们建议使用函数式实现。
Spring Cloud Function 支持
概述
自 Spring Cloud Stream v2.1 起,定义流处理程序和源的另一种选择是使用内置
支持Spring Cloud Function,其中它们可以表示为
类型java.util.function。[提供商/职能/消费者].
为了指定要绑定哪个功能豆到由绑定暴露的外部目的地,
你必须提供spring.cloud.function.definition财产。
如果你只有一颗类型的豆子java.util.function。[提供商/职能/消费者]您可以
跳过spring.cloud.function.definition属性,因为这种函数豆将被自动发现。然而
使用此类财产以避免混淆被认为是最佳实践。
有时这种自动发现会成为障碍,因为单个类型的java.util.function。[提供商/职能/消费者]可能存在于处理消息以外的目的,但由于单一,它会自动被发现并自动绑定。
对于这些罕见情况,你可以通过以下方式关闭自动发现spring.cloud.stream.function.autodetect属性,取值为false. |
以下是应用程序暴露消息处理程序的示例java.util.function.函数通过作为数据的消费者和生产者,有效支持传递语义。
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的例子中,我们定义一个类型的豆子java.util.function.函数调用 toUpperCase,作为消息处理程序
其“输入”和“输出”必须绑定到由所提供的目的绑定器暴露的外部目的地。
默认情况下,“输入”和“输出”绑定名为toUpperCase-in-0和toUpperCase-out-0.
请参阅功能性绑定名称部分,了解用于确定绑定名称的命名规范。
以下是支持其他语义的简单函数式应用示例:
以下是一个源语义的示例,揭示为java.util.function.Supplier
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
这里有一个流语义的示例,揭示为java.util.function.Consumer
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
提供商(来源)
功能和消费者在召唤触发方式上相当直接。它们是基于触发的
对发送到目的地的数据(事件)进行判断。换句话说,它们是经典的事件驱动组件。
然而提供商在触发方面属于另一个类别。由于它本质上是数据的来源(起点),因此它并不
订阅任何入站目的地,因此必须通过其他机制触发。
还有一个问题提供商实施,可以是命令式的,也可以是被动的,并且直接与触发这些提供商相关。
请考虑以下示例:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
前述提供商豆子产生字符串,只要它get()方法被调用。然而,谁会使用这种方法,频率如何?
该框架提供了一个默认的轮询机制(回答“谁?”的问题),该机制会触发提供商的调用,默认情况下它会执行
每一秒(回答“多久一次?”这个问题)。
换句话说,上述配置每秒产生一条消息,每条消息发送给输出目的地被活页夹暴露。
要了解如何自定义轮询机制,请参见轮询配置属性部分。
举一个不同的例子:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
前述提供商Bean采用了响应式编程风格。通常,且与强制性提供商不同,
鉴于调用其get()方法产生(提供)连续的消息流,而非
个人留言。
该框架识别编程风格的差异,并保证此类服务只被触发一次。
然而,想象一下你想轮询某个数据源并返回代表结果集的有限数据流。 响应式编程风格是此类提供商的完美机制。然而,鉴于产生的水流是有限的, 该提供商仍需定期调用。
请考虑以下示例,它通过生成有限数据流模拟了此类用例:
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
豆子本身被注释为可投票豆注释(子集@Bean),从而向框架发出信号,尽管实现
如果这样的提供商是被动的,仍然需要进行调查。
有一个可拆分定义在可投票豆该标注向后处理者发出信号
注释分量产生的结果必须拆分,并设置为true默认。意思是
该系统会将回复者分成单独的消息发送。如果不是这样
他想要的行为,你可以设置为false届时该提供商将直接返回
产生了不分裂的通量。 |
提供商与穿线
正如你现在已经学到的,不像功能和消费者,这些事件由事件触发(它们有输入数据),提供商不具备
任何输入,因此由不同的机制——轮询器触发,而轮询器可能具有不可预测的线程机制。而这些细节
线程机制大多数情况下与函数的下游执行无关,但在某些情况下可能会带来问题
尤其是那些可能对Thread Affinity有特定期望的集成框架。比如春云侦探,它依赖于
在追踪存储在线程本地的数据上。
对于这些情况,我们有另一种机制流桥,用户对线程机制有更多控制权。你可以获得更多细节
在“向输出发送任意数据(例如外部事件驱动源)”部分。 |
消费者(响应式)
反应性的消费者有点特别,因为它有空回归类型,导致框架没有可订阅的引用。
你很可能不需要写作Consumer<Flux<?>>,并将其写作Function<Flux<?>,单一<虚无>>调用然后操作员是你直播中的最后一个操作员。
例如:
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
但如果你确实需要写一个明确的Consumer<Flux<?>>记得订阅即将到来的Flux。
另外,请记住,当反应函数和命令函数混合时,同样的规则也适用于函数组合。
Spring Cloud Function确实支持写有命令式的反应函数,但你必须注意某些限制。
例如,假设你有带有命令式消费者的反应函数。
这种组合的结果是反应性消费者.然而,无法订阅本节前述的此类消费者,
因此,这一限制只能通过让消费者被动并手动订阅(如前所述),或者将功能改为必然来解决。
轮询配置属性
以下属性由春云流揭示,并以Spring.integration.poller。:
- fixedDelay
-
默认轮询器的固定延迟(毫秒)。
默认值:1000升。
- maxMessagesPerPoll
-
默认轮询器的每个轮询事件的最大消息。
默认:1年级。
- 克朗
-
Cron 触发器的 Cron 表达式值。
默认:无。
- 初始延迟
-
周期性触发的初始延迟。
默认值:0。
- 时间单元
-
时间单元用于延迟值。
默认值:毫秒级。
例如--spring.integration.poller.fixed-delay=2000将轮询器间隔设置为每两秒轮询一次。
每个绑定轮询配置
上一节展示了如何配置一个将应用于所有绑定的单一默认轮询器。虽然它与微服务的 spring-cloud-stream 模型很契合,设计时每个微服务代表单个组件(例如提供商),因此默认轮询器配置就足够了,但也存在一些边缘情况 你可能有多个组件需要不同的轮询配置
对于这种情况,请使用按绑定方式配置轮询器。例如,假设你有一个输出绑定供给输出零.在这种情况下,你可以配置轮询器来满足
绑定spring.cloud.stream.bindings.supply-out-0.producer.poller..前缀(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000).
向输出发送任意数据(例如外部事件驱动源)
有些情况下,实际数据来源可能来自外部(外部)系统,而非活页夹。例如, 数据源可能是经典的 REST 端点。我们如何将这种源与Spring-云流所用的功能机制连接起来?
春云流提供了两种机制,让我们详细来看
这里,对于两个样本,我们将使用一种标准的MVC端点方法,称为delegateToSupplier绑定到根网上下文,
通过StreamBridge机制委派来的请求进行流式传输。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
这里我们自动接线流桥BEAN 能够有效地将数据发送到输出绑定
将非流应用与 Spring-Cloud-Stream 连接起来。注意,前述例子中没有
定义了源函数(例如,Supplier Bean),使框架没有触发提前创建源绑定的触发条件,这在包含函数豆的配置情况下很常见。这没问题,因为流桥将启动输出绑定的创建(以及
如果需要,目的自动配置)在首次调用时对不存在的绑定进行发送(..)缓存作
后续重用(详情请参见 StreamBridge 和动态目的地)。
不过,如果你想在初始化(启动)时预先创建输出绑定,可以从spring.cloud.stream.output-bindings你可以声明你来源名称的财产。
提供的名称将作为创建源码绑定的触发条件。
你可以用它来表示多个源(多个输出绑定)
(例如,;--spring.cloud.stream.output-bindings=foo;酒吧)
另外,请注意streamBridge.send(..)方法对象为了数据。这意味着你可以发送POJO或消息对它和它
发送输出时会像发送任何函数或提供商提供相同级别的输出一样,经历相同的例程
与函数的一致性。这意味着输出类型的转换、分区等作会被尊重,就像它们来自函数产生的输出一样。
| 与《显式绑定创建》中解释的不同,StreamBridge 的设计既注重性能,也能够随时创建尽可能多的绑定。为此,StreamBridge 创建的实际绑定不会缓存在应用上下文中,因此无法如绑定可视化与控制中所述管理。 不过,如果你仍然想用 StreamBridge 动态创建绑定并管理绑定,请在使用 StreamBridge 之前使用以下机制明确创建绑定 - ref:spring-cloud-stream/binding_visualization_control.adocl#_define_new_and_manage_existing_bindings[编程定义新绑定] |
StreamBridge 带异步发送
流桥使用Spring Integration框架提供的发送机制,该框架是Spring Cloud Stream的核心。默认情况下,该机制使用发送方线程。换句话说,发送就是阻挡。虽然这在很多情况下是可以的,但有些情况下你希望发送是异步的。要实现这种用途setAsync(true)方法流桥然后调用其中一种发送方法。
可观察性上下文传播与异步发送
当使用框架提供的可观察性支持以及支持 Spring 框架时,打破线程边界会影响可观测性上下文的一致性,从而影响你的跟踪历史。要避免这种情况,你只需要添加上下文传播依赖形式 Micrometer(见下文)
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
<version>1.1.0</version>
</dependency>
StreamBridge 与动态目的地
流桥也可用于输出目的地事先未知的情况,类似于这些用例
详见“来自消费者的路由”部分。
让我们看看这个例子
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, args);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("myDestination", body);
}
}
如你所见,前面的例子与前一个非常相似,唯一的区别是通过以下方式提供了显式的绑定指令spring.cloud.stream.output-bindings财产(未提供)。
这里我们正在发送数据到我的目的地这个名字并不存在作为约束。因此,该名称将被视为动态目的地
如“从消费者路由”部分所述。
在前面的例子中,我们使用应用运行者作为外来水源来供养溪流。
一个更实际的例子,外部源是 REST 端点。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
streamBridge.send("myBinding", body);
}
}
正如你在里面看到的delegateToSupplier我们用StreamBridge来发送数据的方法我的绑定捆绑。在这里,你也从中受益
动态特征流桥其中如果我的绑定不存在,它会自动创建并缓存,否则将使用已有的绑定。
缓存动态目的地(绑定)可能导致内存泄漏,如果存在多个动态目的地。想要有一定的控制权
我们为输出绑定提供了自驱逐缓存机制,默认缓存大小为10。这意味着如果你的动态目标大小超过这个数值,可能会有现有绑定被逐出,因此需要重新创建,这可能会导致性能轻微下降。你可以通过以下方式增加缓存大小spring.cloud.stream.dynamic-destination-cache-size属性设置为期望值。 |
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/
通过展示两个例子,我们想强调这种方法适用于任何类型的外国来源。
如果你正在使用Solace PubSub+绑定器,Spring Cloud Stream已经预留了scst_targetDestination头部(可通过BinderHeaders.TARGET_DESTINATION检索),允许消息从绑定配置的目的地重定向到该首部指定的目标目的地。这使得绑定器能够管理发布到动态目的地所需的资源,减轻了框架对此的依赖,并避免了前述说明中提到的缓存问题。更多信息请见此处。 |
StreamBridge 的输出内容类型
如有需要,您也可以通过以下方法签名提供特定内容类型public boolean send(String bindingName, Object data, MimeType outputContentType).
或者如果你以消息,其内容类型将被尊重。
使用特定类型的活页夹配合StreamBridge
Spring Cloud Stream 支持多种活页夹场景。例如,你可能从Kafka接收数据并发送到RabbitMQ。
有关多文件夹场景的更多信息,请参见文件夹部分,特别是“分类路径上的多个文件夹”
如果你计划使用 StreamBridge,并且在申请中配置了多个活关夹,也必须告知 StreamBridge
用哪款活页夹。为此,还有两种发送方法:
public boolean send(String bindingName, @Nullable String binderType, Object data)
public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)
正如你所见,还有一个额外的论点可以提出——binder类型,告诉BindingService创建动态绑定时使用哪个绑定器。
对于spring.cloud.stream.output-bindings如果使用了属性,或者该装订已经在不同的装订器下创建,binder类型争论不会有任何效果。 |
使用StreamBridge的通道拦截器
因为流桥使用消息频道为了建立输出绑定,你可以在发送数据时激活通道拦截器流桥.
申请者可自行决定选择哪些信道拦截器流桥.
春云流不会注入所有检测到的通道拦截器流桥除非它们被标记为@GlobalChannelInterceptor(patterns = “*”).
假设你有以下两种不同的情况流桥应用中的绑定。
streamBridge.send(“foo-out-0”,消息);
和
streamBridge.send(“bar-out-0”,消息);
现在,如果你想在两者上都应用通道拦截器,流桥绑定,然后你可以声明以下内容全球频道拦截者豆。
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
不过,如果你不喜欢上面的全局方法,想为每个绑定都配备专用拦截器,可以这样做。
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
和
@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
您可以灵活地将图案设置得更严格或定制,以满足您的业务需求。
通过这种方法,应用程序能够决定注入哪些拦截器流桥而不是使用所有可用的拦截弹。
流桥通过StreamOperations包含所有发送方法流桥.因此,应用可以选择自动接线StreamOperations.这在涉及单元测试代码时非常有用,这些代码使用流桥通过提供模拟或类似的机制来实现StreamOperations接口。 |
反应功能支持
由于 Spring Cloud Function 是建立在 Project Reactor 之上,所以你不需要做太多
在实现响应式编程模型时,以受益于提供商,功能或消费者.
例如:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
|
选择响应式或命令式编程模型时,必须了解一些重要事项。 完全被动式还是仅仅是API? 使用响应式API并不一定意味着你可以享受到该API的所有响应式功能。换句话说,像背压和其他高级功能只有在兼容系统时才会有效——比如响应式卡夫卡活页夹。如果你使用的是普通的Kafka、Rabbit或其他非响应式绑定器,你只能享受响应式API本身的便利,而非其高级功能,因为流的实际来源或目标不是响应式的。 错误处理与重试 在本手册中,您将看到关于基于框架的错误处理、重试及其他功能及其相关配置属性的多项参考文献。重要的是要明白,它们只影响命令式功能,你不应该对响应式功能抱有同样的期待。原因如下......
响应式和命令式功能之间存在根本区别。
命令函数是一个消息处理程序,框架在收到的每条消息时调用它。因此,对于N条消息,该函数将调用N次,因此我们可以对该函数进行包装,并添加诸如错误处理、重试等额外功能。
反应函数是初始化函数。只需调用一次,即可将用户提供的 Flux/Mono 引用与框架提供的 Flux/Mono 连接。之后,我们(框架)对流完全没有可见性和控制权。
因此,对于响应式函数,在错误处理和重试方面,你必须依赖响应式API的丰富性(即, |
功能组成
使用函数式编程模型,你还可以受益于函数组合,可以从一组简单函数动态组合复杂处理程序。 举个例子,我们将以下函数豆添加到上述定义的应用中
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
并修改spring.cloud.function.definition属性以反映你打算从“toUpperCase”和“wrapInQuotes”中组合一个新函数。
为此,Spring Cloud Function 依赖于|(烟斗)符号。所以,为了结束我们的例子,我们的房产现在看起来如下:
--spring.cloud.function.definition=toUpperCase|wrapInQuotes
| Spring Cloud Function 提供的函数式组合支持的一个重要优势是 你能组合反应式和命令式函数。 |
合成的结果是一个函数,正如你可能猜到的,这个函数可能有一个非常长且相当隐晦的名称(例如,福尔!巴尔|巴兹|XYZ......)
这在涉及其他配置属性时带来了极大的不便。这时,功能性绑定名称部分描述的描述性绑定名称功能可以发挥作用。
例如,如果我们想给出toUpperCase|wrapInQuotes我们可以用更具体的名称来描述
具有以下性质spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput允许
其他配置属性用于指代该绑定名称(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination).
功能组成与跨领域关注点
函数组合实际上可以通过分解复杂性来应对复杂性 变成一组简单且可单独管理/测试的组件,这些组件仍然可以 运行时表示为一个。但这并不是唯一的好处。
你也可以用合成来解决某些跨领域且非功能性的问题, 比如内容丰富。例如,假设你有一个来信,可能 可能缺少某些页眉,或者有些页眉不完全符合你的业务状态 功能是期望的。你现在可以实现一个单独的函数来处理这些问题 然后将其与主要业务内容结合起来。
让我们看看这个例子
@SpringBootApplication
public class DemoStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemoStreamApplication.class,
"--spring.cloud.function.definition=enrich|echo",
"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
"--spring.cloud.stream.bindings.input.destination=myDestination",
"--spring.cloud.stream.bindings.input.group=myGroup");
}
@Bean
public Function<Message<String>, Message<String>> enrich() {
return message -> {
Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
};
}
@Bean
public Function<Message<String>, Message<String>> echo() {
return message -> {
Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
System.out.println("Incoming message " + message);
return message;
};
}
}
虽然很简单,但这个例子展示了一个函数如何通过额外的头部(非函数性关注点)丰富收到的消息,
所以另一个函数——回波- 能从中获益。这回波功能保持简洁,专注于业务逻辑。
你还可以看到spring.cloud.stream.function.bindings属性以简化组合绑定名称。
具有多个输入和输出参数的函数
从3.0版本开始,spring-cloud-stream支持以下函数 拥有多个输入和/或多个输出(返回值)。这到底意味着什么? 它针对的是什么类型的用例?
-
大数据:想象你处理的数据源高度杂乱无章,包含各种类型的数据元素 (例如订单、交易等),你实际上需要把它们理顺。
-
数据聚合:另一个用例可能需要合并来自2+输入_streams的数据元素。
以上描述的只是一些需要用单一函数接受和/或生成的场景 多条数据流。这正是我们这里针对的用例。
另外,这里对“流”概念的强调略有不同。假设这些函数只有有价值
如果他们被允许访问实际的数据流(而非单个元素)。因此我们依赖于
由Project Reactor提供的抽象(即,通量和单)该系统已在
作为 Spring-Cloud-functions 引入的依赖的一部分。
另一个重要方面是多输入和多输出的表示。而 Java 则提供
各种不同的抽象来表示某物的倍数,这些抽象
a)无界,b)缺乏元性,c)缺乏类型信息,这些在该语境中都很重要。
举个例子,我们来看收集或者是一个只允许我们
描述单一类型的多元,或者将所有内容上抛为对象影响了 的透明类型转换特性
春季-云流-溪流等等。
因此,为了满足所有这些需求,初始支持依赖于使用另一种抽象的签名 由反应堆项目 - 元组提供。不过,我们正在努力允许更灵活的签名方式。
| 请参阅“绑定和绑定名称”部分,以了解用于确定该应用所用绑定名称的命名规范。 |
让我们来看几个示例:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
上述示例展示了一个函数,该函数需要两个输入(第一个类型为字符串以及第二个型式整数)
并产生单一类型的输出字符串.
所以,在上述例子中,两个输入绑定将是零号集合和聚会一为保持一致性,则
输出绑定也遵循相同的惯例,称为集合-0.
知道这一点后,你就能设置绑定的具体属性。
例如,以下将覆盖 的内容类型零号集合捆绑:
--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
上述例子与前述示例有点相反,展示了函数
接受单一类型的输入整数并产生两个输出(均为类型字符串).
所以,对于上述例子,输入绑定是零中散射以及
输出绑定为散射-0和散射-1.
然后你用以下代码来测试:
@Test
public void testSingleInputMultiOutput() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleApplication.class))
.run("--spring.cloud.function.definition=scatter")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
for (int i = 0; i < 10; i++) {
inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
}
int counter = 0;
for (int i = 0; i < 5; i++) {
Message<byte[]> even = outputDestination.receive(0, 0);
assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
Message<byte[]> odd = outputDestination.receive(0, 1);
assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
}
}
}
单一应用中的多重功能
有时还需要将多个消息处理程序分组到单一应用程序中。你会这样做的 定义了几个功能。
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
在上述例子中,我们有定义两个函数的配置大写和反向.
首先,如前所述,我们需要注意存在冲突(多个函数),因此
我们需要通过提供来解决这个问题spring.cloud.function.definition指向实际函数的性质
我们想要绑定。但这里我们用分隔符指向两个函数(见下面的测试用例)。;
| 与具有多输入/输出的函数一样,请参见[绑定和绑定名称]部分以了解命名 用于确定该应用所使用的具有约束力名称的惯例。 |
然后你用以下代码来测试:
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
ReactiveFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
批量消费者
当使用消息通道绑定器支持批量监听器,并且该功能已启用用于消费者绑定,你可以设置spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode自true以实现
整批消息要传递给函数列表.
@Bean
public Function<List<Person>, Person> findFirstPerson() {
return persons -> persons.get(0);
}
批量类型转换
类似于单一消息消费者的类型转换,批处理要求批处理中的每条消息都转换为请求的类型。例如,在前一个例子中,这个类型是人.
还需理解,批次中每个消息的头部分别在消息头代表整个批次的消息。这些消息及其对应的批处理头由各自的绑定器创建,其结构可能不同。因此,你应该参考活页夹文档来理解批处理头部的结构。关于《卡夫卡》和《兔子》,你可以搜索amqp_batchedHeaders和kafka_batchConvertedHeaders分别。
简而言之,如果你有一个表示5个有效载荷的批次消息,同一消息会包含一组头部,每个头对应一个索引相同的有效载荷。
但是,如果某个有效载荷未能转换,会发生什么?在单条消息场景中,我们只需返回 null 并调用未转换消息的方法,这要么生成异常,要么允许你处理原始消息,具体取决于函数的签名。
批处理的情况则复杂一些。对于未转换的有效载荷返回空值实际上减少了批次大小。例如,如果原始批次包含5条消息,其中2条未能转换,那么转换后的批次中只包含3条消息。这或许可以接受,但对应的批处理头部怎么办?仍然会有5个头部,因为它们是在装订机形成初始批次时创建的。这种差异使得将头与其对应的有效载荷进行关联变得困难。
为解决这个问题,我们提供了MessageConverterHelper接口。
public interface MessageConverterHelper {
/**
* This method will be called by the framework in cases when a message failed to convert.
* It allows you to signal to the framework if such failure should be considered fatal or not.
*
* @param message failed message
* @return true if conversion failure must be considered fatal.
*/
default boolean shouldFailIfCantConvert(Message<?> message) {
return false;
}
/**
* This method will be called by the framework in cases when a single message within batch of messages failed to convert.
* It provides a place for providing post-processing logic before message converter returns.
*
* @param message failed message.
* @param index index of failed message within the batch
*/
default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
}
}
如果实现了,框架的消息转换逻辑会调用该接口,在无法转换特定有效载荷时对批处理消息进行后处理。
Kafka 和 Rabbit 的默认实现会自动移除对应的批处理头,以保持批处理有效载荷与其头部之间的关联。不过,如果你需要为这种情况添加自定义行为,也可以提供自己的实现并注册为豆子。
此外,该接口还提供了一种方法,允许对转换失败进行更确定性地处理。默认情况下,该方法返回false但如果你愿意在转换错误发生时失败整个过程,也可以自定义实现。
批量生产者
你也可以在生产者端使用批处理的概念,通过返回一组消息,实际上提供了 反过来,合集中的每条消息将由装订员单独发送。
考虑以下函数:
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
返回列表中的每条消息将单独发送,结果是发送到输出目的地的四条消息。
春季积分流程作为函数
当你实现一个函数时,可能会有符合该类别的复杂需求 企业集成模式(EIP)的成员。这些问题最好通过使用 如Spring Integration(SI)这样的框架,它是EIP的一个参考实现。
幸运的是,SI 已经支持通过集成流作为网关将集成流暴露为函数。请考虑以下示例:
@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlow.from(MessageFunction.class, spec -> spec.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.log(LoggingHandler.Level.WARN)
.bridge()
.get();
}
public interface MessageFunction extends Function<Message<String>, Message<String>> {
}
}
对于熟悉SI的人来说,你可以看到我们定义了一种类型的豆子集成流程其中我们
声明一个我们想以 Al S函数<字符串,字符串>(使用国际单位制DSL)呼叫大写.
这消息功能接口允许我们明确声明输入和输出的类型以实现正确的类型转换。
有关类型转换的更多信息,请参见内容类型协商部分。
要接收原始输入,你可以使用from(Function.class, ...).
所得函数被绑定到目标绑定器暴露的输入和输出目的地。
| 请参阅[绑定和绑定名称]部分以了解命名方式 用于确定该应用所使用的具有约束力名称的惯例。 |
关于Spring Integration和Spring Cloud Stream在函数式编程模型上的互作性,更多细节 你可能会觉得这篇文章非常有趣,因为它深入探讨了更多内容 通过合并Spring Integration和Spring Cloud Stream/Functions的优点,可以应用到各种模式中。
利用民调消费者
概述
使用民调消费者时,你会对可投票消息源按需播放。
要为被调查的消费者定义绑定,你需要提供spring.cloud.stream.pollable-source财产。
请考虑以下轮询消费者装订的示例:
--spring.cloud.stream.pollable-source=myDestination
可轮询源名称我的目的地在上述例子中,将得到我的目的地-0保留的约束力名称
与函数式编程模型一致。
给定前面例子中被调查的消费者,你可以这样用:
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
一个更不手动、更像Spring的替代方案是配置一个计划任务豆。例如
@Scheduled(fixedDelay = 5_000)
public void poll() {
System.out.println("Polling...");
this.source.poll(m -> {
System.out.println(m.getPayload());
}, new ParameterizedTypeReference<Foo>() { });
}
这PollableMessageSource.poll()方法取消息处理器参数(通常是λ表达式,如图所示)。
它回归了true如果消息被接收并成功处理。
与消息驱动的消费者类似,如果消息处理器抛出异常,消息被发布到错误通道,
如同相关内容讨论错误处理.
通常,poll()方法在消息处理器出口。
如果方法异常退出,消息会被拒绝(不重新排队),但请参见处理错误。
你可以通过承担确认的责任来覆盖该行为,如下示例所示:
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)
})) {
Thread.sleep(1000);
}
}
};
}
你必须啊(或纳克)在某个时刻发送消息,以避免资源泄漏。 |
一些消息系统(如 Apache Kafka)在日志中保持一个简单的偏移量。如果投递失败并重新排队,则以StaticMessageHeaderAccessor.getAcknowledgegmentCallback(m).acknowledge(Status.REQUEUE);,任何后来成功确认的消息都会被重新投送。 |
还有一个超载民意调查该方法的定义如下:
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
这类型是一个转换提示,允许对接收消息有效载荷进行转换,如下示例所示:
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});