|
对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
这reactive()端点
从 5.5 版本开始,ConsumerEndpointSpec提供一个reactive()配置属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>.
此选项将目标端点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,该类型转换为Flux通过IntegrationReactiveUtils.messageChannelToFlux().
提供的函数从Flux.transform()运算符来自定义 (publishOn(),log(),doOnNext()等等)来自输入通道的响应式流源。
以下示例演示如何将发布线程从输入通道更改为独立于最终订阅者和生产者DirectChannel:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有关更多信息,请参阅响应式流支持。