|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
Reactive Streams 支持
Spring 集成在框架的某些地方和不同方面提供了对 Reactive Streams 交互的支持。 我们将在这里讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。
前言
概括地说,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。
Spring 集成在基于 Spring 的应用程序内支持轻量级消息传递,并支持通过声明式适配器与外部系统集成。
Spring 集成的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
此目标是在目标应用程序中使用一等公民实现的,例如message,channel和endpoint,这允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中,供另一个端点使用。
通过这种方式,我们可以将集成交互模型与目标业务逻辑区分开来。
这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点保持不变。
另一方面,Reactive Streams 是具有非阻塞背压的异步流处理标准。
Reactive Streams 的主要目标是管理跨异步边界的流数据交换——就像将元素传递给另一个线程或线程池一样——同时确保接收方不会被迫缓冲任意数量的数据。
换句话说,背压是此模型不可或缺的一部分,以便允许在线程之间调解的队列有界。
Reactive Streams 实现(例如 Project Reactor)的目的是在 Stream 应用程序的整个处理图中保留这些优势和特征。
Reactive Streams 库的最终目标是使用可用的编程语言结构,以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,但最终解决方案并不像普通函数链调用那样势在必行。
它分为几个阶段:定义和执行,这发生在订阅最终反应式发布者的一段时间后,对数据的需求从定义的底部推送到顶部,根据需要施加背压 - 我们请求当前可以处理的尽可能多的事件。
响应式应用程序看起来像一个"stream"或者正如我们在 Spring Integration 术语中习惯的那样——"flow".
事实上,自 Java 9 以来的 Reactive Streams SPI 在java.util.concurrent.Flow类。
从这里来看,当我们在端点上应用一些反应式框架运算符时,Spring Integration 流似乎真的非常适合编写 Reactive Streams 应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如JdbcMessageHandler) 可以在反应式流中透明地处理。
当然,Spring Integration 中 Reactive Streams 支持的主要目标是允许整个过程完全反应式、按需启动和背压就绪。
在通道适配器的目标协议和系统提供 Reactive Streams 交互模型之前,这是不可能的。
在下面的部分中,我们将描述 Spring Integration 中提供了哪些组件和方法,用于开发保留集成流结构的响应式应用程序。
Spring 集成中的所有 Reactive Streams 交互都是用 Project Reactor 类型实现的,比如Mono和Flux. |
消息网关
与 Reactive Streams 交互的最简单点是@MessagingGateway其中,我们只将 gateway 方法的 return 类型设为Mono<?>- 当订阅发生在返回的Mono实例。
看反应器Mono了解更多信息。
类似的Mono-reply 方法在框架内部用于完全基于 Reactive Streams 兼容协议的入站网关(有关更多信息,请参阅下面的 Reactive Channel Adapters)。
send-and-receive作被包装到Mono.defer()链接来自replyChannel标头。
这样,特定反应式协议(例如 Netty)的入站组件将作为在 Spring 集成上执行的反应式流的订阅者和发起者。
如果请求有效负载是反应类型,则最好在反应流定义中处理它,将进程推迟到发起方订阅。
为此,处理程序方法也必须返回响应式类型。
有关更多信息,请参阅下一节。
反应式回复有效负载
当回复产生MessageHandler返回回复消息的反应式类型有效负载,它会以异步方式处理,并使用常规的MessageChannelimplementation providedoutputChannel(async必须设置为true),并在输出通道为ReactiveStreamsSubscribableChannelimplementation 的 intent 实例,例如FluxMessageChannel.
使用标准祈使语气MessageChanneluse-case,如果回复有效负载是多值发布者(请参阅ReactiveAdapter.isMultiValue()有关更多信息),它被包装到Mono.just().
因此,Mono必须在下游显式订阅,或者由FluxMessageChannel下游。
使用ReactiveStreamsSubscribableChannel对于outputChannel,无需担心返回类型和订阅;框架内部一切都顺利处理。
有关更多信息,请参阅 Asynchronous Service Activator 。
如需了解详情,另请参阅 Kotlin 协程。
FluxMessageChannel和ReactiveStreamsConsumer
这FluxMessageChannel是MessageChannel和Publisher<Message<?>>.
一个Flux作为热源,是在内部创建的,用于从send()实现。
这Publisher.subscribe()implementation 被委托给该内部Flux.
此外,对于按需上游消费,FluxMessageChannel提供了ReactiveStreamsSubscribableChannel合同。
任何上游Publisher(例如,请参阅下面的 Source Polling Channel Adapter 和 splitter)当此通道的订阅准备就绪时,将自动订阅此通道。
来自此委派发布者的事件将沉入内部Flux上述。
一个FluxMessageChannel必须是org.reactivestreams.Subscriber实例来遵守 Reactive Streams 合约。
幸运的是,所有的MessageHandlerSpring Integration 中的实现还实现了一个CoreSubscriber来自 Reactor 项目。
多亏了ReactiveStreamsConsumerimplementation 中,整个集成流配置对目标开发人员是透明的。
在这种情况行为从命令式推送模型更改为反应式拉取模型。
一个ReactiveStreamsConsumer也可用于转动任何MessageChannel发送到响应式源中IntegrationReactiveUtils,使集成流部分反应。
看FluxMessageChannel了解更多信息。
从版本 5.5 开始,ConsumerEndpointSpec引入了reactive()选项,将流中的终端节点设为ReactiveStreamsConsumer独立于 input 通道。
可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>可以提供以自定义源Flux从 Input Channel 通过Flux.transform()作,例如使用publishOn(),doOnNext(),retry()等。
此功能表示为@Reactivesub-annotation 的所有消息注释 (@ServiceActivator,@Splitter等)通过他们的reactive()属性。
源轮询通道适配器
通常,SourcePollingChannelAdapter依赖于由TaskScheduler.
轮询触发器是根据提供的选项构建的,用于定期计划任务以轮询目标数据或事件源。
当outputChannel是一个ReactiveStreamsSubscribableChannel一样Trigger用于确定下次执行的时间,但不是调度任务,而是SourcePollingChannelAdapter创建一个Flux<Message<?>>基于Flux.generate()对于nextExecutionTimevalues 和Mono.delay()从上一步开始的持续时间。
一个Flux.flatMapMany()用于轮询maxMessagesPerPoll并将它们沉入 outputFlux.
此生成器Flux由提供的ReactiveStreamsSubscribableChannel尊重下游的背压。
从版本 5.5 开始,当maxMessagesPerPoll == 0,则根本不会调用源,并且flatMapMany()立即通过Mono.empty()result 直到maxMessagesPerPoll稍后更改为非零值,例如通过 Control Bus。
这样,任何MessageSourceimplementation 可以变成一个反应式热源。
有关更多信息,请参阅轮询使用者。
事件驱动的通道适配器
MessageProducerSupport是事件驱动型通道适配器的基类,通常,其sendMessage(Message<?>)在生产驱动程序 API 中用作侦听器回调。
这个回调也可以很容易地插入到doOnNext()Reactor 运算符,当消息生成者实现构建Flux的消息,而不是基于侦听器的功能。
事实上,这是在框架中完成的,当outputChannel的消息生成者不是ReactiveStreamsSubscribableChannel.
但是,为了改进最终用户体验,并允许更多的背压就绪功能,MessageProducerSupport提供subscribeToPublisher(Publisher<? extends Message<?>>)在 Target 实施中使用的 APIPublisher<Message<?>>>是来自目标系统的数据源。
通常,它是从doStart()在为目标驱动程序 API 调用Publisher的源数据。
建议将反应性MessageProducerSupportimplementation 替换为FluxMessageChannel作为outputChannel用于按需订阅和下游事件使用。
当订阅Publisher已取消。
叫stop()在这样的通道上,适配器完成从源Publisher.
通道适配器可以通过自动订阅新创建的源来重新启动Publisher.
Message Source 到 Reactive Streams
从版本 5.3 开始,ReactiveMessageSourceProducer。
它是提供的MessageSource和事件驱动型生产集成到配置的outputChannel.
在内部,它包装了一个MessageSource到重复重新订阅的Mono生成一个Flux<Message<?>>以订阅subscribeToPublisher(Publisher<? extends Message<?>>)上述。
此订阅Mono使用Schedulers.boundedElastic()避免在目标中可能阻塞MessageSource.
当消息源返回null(没有数据可提取),则Mono转换为repeatWhenEmpty()state 替换为delay对于基于IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration条目。
默认情况下,它是 1 秒。
如果MessageSource生成带有IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK信息,则会在 Headers 的doOnSuccess()原件Mono和 rejected 在doOnError()如果下游流抛出MessagingException替换为 Reject 的失败消息。
这ReactiveMessageSourceProducer可用于任何用例,当轮询通道适配器的功能应该转换为任何现有MessageSource<?>实现。
拆分器和聚合器
当AbstractMessageSplitter获取Publisher对于其逻辑,该过程自然会越过Publisher要将它们映射到消息中,以便发送到outputChannel.
如果此通道是ReactiveStreamsSubscribableChannel这Fluxwrapper 的Publisher从该通道按需订阅,并且此拆分器行为看起来更像是flatMapReactor 运算符,当我们将传入事件映射到多值输出时Publisher.
当整个集成流程是使用FluxMessageChannel在拆分器之前和之后,将 Spring 集成配置与 Reactive Streams 需求及其事件处理运算符保持一致。
对于常规通道,一个Publisher转换为Iterable用于标准的 Iterate-and-produce 拆分逻辑。
一个FluxAggregatorMessageHandler是另一个特定的 Reactive Streams 逻辑实现示例,可以将其视为"reactive operator"就 Project Reactor 而言。
它基于Flux.groupBy()和Flux.window()(或buffer()) 运算符。
传入消息将沉入Flux.create()在FluxAggregatorMessageHandler,使其成为热源。
这Flux由ReactiveStreamsSubscribableChannel按需,或直接在FluxAggregatorMessageHandler.start()当outputChannel不是反应性的。
这MessageHandler具有它的力量,当整个集成流程是使用FluxMessageChannel在这个 component 之前和之后,使整个 logic back-pressure 准备好。
有关更多信息,请参阅 Stream and Flux Splitting 和 Flux Aggregator 。
Java DSL
一IntegrationFlow在 Java 中,DSL 可以从任何Publisher实例(请参阅IntegrationFlow.from(Publisher<Message<T>>)).
此外,使用IntegrationFlowBuilder.toReactivePublisher()运算符、IntegrationFlow可以变成反应性热源。
一个FluxMessageChannel在这两种情况下都在内部使用;它可以订阅入站Publisher根据其ReactiveStreamsSubscribableChannel合同,它是一个Publisher<Message<?>>对于下游订阅者。
具有动态IntegrationFlow注册后,我们可以实现一个强大的逻辑,将 Reactive Streams 与这个集成流相结合,桥接到 / 从Publisher.
从版本 5.5.6 开始,toReactivePublisher(boolean autoStartOnSubscribe)operator 变体用于控制整个IntegrationFlow在返回的Publisher<Message<?>>.
通常,来自反应式发布者的订阅和使用发生在后面的运行时阶段,而不是在反应式流组合期间,甚至ApplicationContext启动。
为了避免使用IntegrationFlow在Publisher<Message<?>>订阅点,为了获得更好的最终用户体验,这个新运算符具有autoStartOnSubscribe标志。
它标记(如果true) 的IntegrationFlow及其组件autoStartup = false,因此ApplicationContext不会自动启动流中消息的生成和使用。
相反,start()对于IntegrationFlow从内部Flux.doOnSubscribe().
独立于autoStartOnSubscribe值,则流将从Flux.doOnCancel()和Flux.doOnTerminate()- 如果没有东西可以消费,那么生成消息就没有意义。
对于完全相反的用例,当IntegrationFlow应该调用一个反应式流并在完成后继续,一个fluxTransform()运算符在IntegrationFlowDefinition.
此时的流将转换为FluxMessageChannel它被传播到提供的fluxFunction,在Flux.transform()算子。
该函数的结果被包装到Mono<Message<?>>用于平面映射到输出Flux被另一个人订阅FluxMessageChannel用于下游流。
有关更多信息,请参见 Java DSL 章节。
ReactiveMessageHandler
从版本 5.3 开始,ReactiveMessageHandler在框架中原生支持。
这种类型的消息处理程序专为反应式客户端而设计,这些客户端返回反应式类型以进行按需订阅以执行低级作,并且不提供任何回复数据来继续反应式流组合。
当ReactiveMessageHandler在命令式集成流程中使用,handleMessage()result in subscribed 在返回后立即返回,只是因为在这样的 flow 中没有响应式流组合来遵守背压。
在这种情况下,框架将ReactiveMessageHandler转换为ReactiveMessageHandlerAdapter- 一个MessageHandler.
但是,当ReactiveStreamsConsumer参与流中(例如,当 channel to consume 是一个FluxMessageChannel),这样一个ReactiveMessageHandler由一个flatMap()反应器作员在消耗过程中遵守背压。
开箱即用的ReactiveMessageHandlerimplementation 是一个ReactiveMongoDbStoringMessageHandler用于出站通道适配器。
有关更多信息,请参见MongoDB Reactive Channel Adapters。
从版本 6.1 开始,IntegrationFlowDefinition暴露了一个方便的handleReactive(ReactiveMessageHandler)终端作员。
任何ReactiveMessageHandler实现(即使只是使用MonoAPI) 可用于此运算符。
框架订阅返回的Mono<Void>自然而然。
以下是此运算符的可能配置的简单示例:
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
此运算符的重载版本接受Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>要围绕提供的ReactiveMessageHandler.
此外,一个ReactiveMessageHandlerSpec-基于的变体。
在大多数情况下,它们用于特定于协议的 channel adapter 实现。
请参阅下一节,从链接到具有相应 reactive channel adapters的目标技术。
Reactive 通道适配器
当集成的目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器就变得简单了。
入站、事件驱动的通道适配器实现是将请求(如有必要)包装到延迟的Mono或Flux并仅在协议组件启动订阅到Mono从 listener 方法返回。
这样我们就有了一个完全封装在这个组件中的反应式流解决方案。
当然,在输出通道上订阅的下游集成流应该遵循 Reactive Streams 规范,并以按需、背压就绪的方式执行。
这并不总是由 的性质 (或当前实现) 提供的MessageHandler处理器。
可以使用线程池和队列或FluxMessageChannel(见上文)在没有反应式实现时,在 integration endpoints 之前和之后。
反应式事件驱动的入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法将如下所示:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明的方式:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者即使没有通道适配器,我们也可以始终按以下方式使用 Java DSL:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
反应式出站通道适配器实现是关于根据为目标协议提供的反应式 API 启动(或延续)反应流以与外部系统交互。 入站有效负载本身可以是反应式类型,也可以是整个集成流的事件,它是顶部的反应式流的一部分。 如果我们处于单向、即发即弃的场景中,则可以立即订阅返回的反应式类型,或者它被传播到下游(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍在下游保留反应式流语义。
反应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够使用这两个通道适配器:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前, Spring 集成为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra 提供通道适配器(或网关)实现。
Redis Stream Channel Adapter 也是反应式的,并使用ReactiveStreamOperations来自 Spring Data。
更多的反应式通道适配器即将到来,例如基于ReactiveKafkaProducerTemplate和ReactiveKafkaConsumerTemplate来自 Spring for Apache Kafka 等。
对于许多其他非反应式通道适配器,建议使用线程池以避免在反应式流处理期间阻塞。
对命令式上下文传播的反应
当上下文传播库位于 Classpath 上时,Project Reactor 可以采用ThreadLocal值(例如 Micrometer Observation 或SecurityContextHolder) 并将它们存储到Subscriber上下文。
当我们需要填充日志记录 MDC 进行跟踪或让我们从反应流中调用的服务来从作用域中恢复观察时,也可以进行相反的作。
请参阅 Project Reactor 文档中有关其用于上下文传播的特殊运算符的更多信息。
如果我们的整个解决方案是单个响应式流组合,那么存储和恢复上下文可以顺利地进行,因为Subscribercontext 从下游一直到合成的开头都可见 (Flux或Mono).
但是,如果应用程序在不同的Flux实例或命令式处理并返回,则上下文绑定到Subscriber可能不可用。
对于这样的用例, Spring 集成提供了一个额外的功能(从版本6.0.5) 来存储 ReactorContextView到IntegrationMessageHeaderAccessor.REACTOR_CONTEXT从反应式流生成的 message 头,例如,当我们执行 directsend()操作。
然后,此标头在FluxMessageChannel.subscribeTo()恢复 Reactor 上下文Message这个频道将要发出。
目前,此标头是从WebFluxInboundEndpoint和RSocketInboundGateway组件,但可用于执行 Reactive to Imperative 集成的任何解决方案。
填充此标头的逻辑如下所示:
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用handle()运算符进行 Reactor 还原ThreadLocal值。
即使它是作为标头发送的,框架也无法假设它是否要恢复到ThreadLocal值。
要从Message另一方面Flux或Monocomposition 中,可以执行此逻辑:
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));