|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
这reactive()端点
从版本 5.5 开始,ConsumerEndpointSpec提供reactive()configuration 属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>.
此选项将目标终端节点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,该类型转换为Flux通过IntegrationReactiveUtils.messageChannelToFlux().
提供的函数从Flux.transform()运算符进行自定义 (publishOn(),log(),doOnNext()等)来自 input 通道的反应式流源。
下面的示例演示了如何将发布线程从独立于最终订阅者和生产者的 input 通道更改为该DirectChannel:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有关更多信息,请参阅 Reactive Streams Support 。