对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
响应式流支持
Spring Integration 在框架的某些地方和不同方面提供了对 Reactive Streams 交互的支持。我们将在必要时讨论其中的大部分内容,并提供指向目标章节的适当链接以获取详细信息。
前言
回顾一下,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。Spring Integration 在基于 Spring 的应用程序中实现轻量级消息传递,并支持通过声明式适配器与外部系统集成。Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点的分离,这对于生成可维护、可测试的代码至关重要。这个目标是在目标应用程序中使用一等公民实现的,例如message
,channel
和endpoint
,这允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中,供另一个端点使用。通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点保持不变。
另一方面,反应流是具有非阻塞背压的异步流处理的标准。反应流的主要目标是管理跨异步边界的流数据交换——例如将元素传递到另一个线程或线程池——同时确保接收端不会被迫缓冲任意数量的数据。换句话说,背压是该模型的一个组成部分,以允许在线程之间进行调解的队列被限制。反应流实现(例如 Project Reactor)的目的是在流应用程序的整个处理图中保留这些优势和特征。反应流库的最终目标是以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,就像可用的编程语言结构一样,但最终的解决方案并不像正常函数链调用那样必要。它分为几个阶段:定义和执行,这发生在订阅最终响应式发布者的一段时间后,对数据的需求从定义的底部推送到顶部,并根据需要施加背压 - 我们请求尽可能多的事件,我们目前可以处理。响应式应用程序看起来像一个"stream"
或者正如我们在 Spring Integration 术语中习惯的那样 -"flow"
. 事实上,自 Java 9 以来的响应式流 SPI 在java.util.concurrent.Flow
类。
从这里看,当我们在端点上应用一些响应式框架运算符时,Spring Integration 流可能看起来确实非常适合编写响应式流应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如JdbcMessageHandler
)可以在反应流中透明地处理。当然,Spring Integration中响应式流支持的主要目标是允许整个过程完全响应式,按需启动并准备背压。在通道适配器的目标协议和系统提供响应式流交互模型之前,这是不可能的。在下面的部分中,我们将描述Spring Integration中提供了哪些组件和方法,用于开发响应式应用程序保留集成流结构。
Spring Integration 中的所有响应式流交互都使用 Project Reactor 类型实现,例如Mono 和Flux . |
消息传递网关
与 Reactive Streams 交互的最简单点是@MessagingGateway
其中我们只是将 gateway 方法的返回类型作为Mono<?>
- 当订阅发生时,将执行网关方法调用背后的整个集成流Mono
实例。 看反应器Mono
了解更多信息。类似的Mono
-reply 方法在框架内部用于完全基于 Reactive Streams 兼容协议的入站网关(有关更多信息,请参阅下面的 Reactive Channel Adapters)。发送和接收作包装在Mono.deffer()
链接来自replyChannel
标头。这样,特定响应式协议(例如 Netty)的入站组件将作为在 Spring Integration 上执行的响应式流的订阅者和发起器。如果请求有效负载是响应式类型,则最好使用响应式流定义来处理它,将进程推迟到Starters订阅。为此,处理程序方法也必须返回响应式类型。有关更多信息,请参阅下一节。
响应式应答有效负载
当回复产生MessageHandler
返回回复消息的响应式类型有效负载,它以异步方式处理,并使用MessageChannel
为outputChannel
(这async
必须设置为true
),并在输出通道为ReactiveStreamsSubscribableChannel
实现,例如FluxMessageChannel
. 具有标准命令MessageChannel
用例,并且回复有效负载是否是多值发布者(请参阅ReactiveAdapter.isMultiValue()
更多信息),它被包装成一个Mono.just()
. 因此,Mono
必须在下游显式订阅或由FluxMessageChannel
下游。 使用ReactiveStreamsSubscribableChannel
对于outputChannel
,无需担心返回类型和订阅;一切都由框架内部顺利处理。
有关详细信息,请参阅异步服务激活器。
另请参阅 Kotlin 协程了解更多信息。
FluxMessageChannel
和ReactiveStreamsConsumer
这FluxMessageChannel
是MessageChannel
和Publisher<Message<?>>
. 一个Flux
,作为热源,在内部创建,用于接收来自send()
实现。 这Publisher.subscribe()
实现被委托给该内部Flux
. 此外,对于按需上游消费,FluxMessageChannel
为ReactiveStreamsSubscribableChannel
合同。 任何上游Publisher
(例如,请参阅下面的源轮询通道适配器和拆分器)当此通道的订阅准备好时,会自动订阅此通道。来自此委托发布者的事件将沉入内部Flux
上述。
消费者FluxMessageChannel
必须是org.reactivestreams.Subscriber
实例来兑现 Reactive Streams 合同。幸运的是,所有MessageHandler
Spring Integration 中的实现还实现了CoreSubscriber
来自 Reactor 项目。
多亏了ReactiveStreamsConsumer
在实现之间,整个集成流配置对目标开发人员保持透明。
在这种情况行为从命令式推送模型更改为响应式拉动模型。
一个ReactiveStreamsConsumer
也可用于转动任何MessageChannel
使用IntegrationReactiveUtils
,使集成流部分被动。
看FluxMessageChannel
了解更多信息。
从 5.5 版本开始,ConsumerEndpointSpec
引入了一个reactive()
选项,将流中的端点设为ReactiveStreamsConsumer
独立于输入通道。
可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
可以提供来自定义源Flux
从输入通道通过Flux.transform()
作,例如使用publishOn()
,doOnNext()
,retry()
等。
此功能表示为@Reactive
子注释 (@ServiceActivator
,@Splitter
等等)通过他们的reactive()
属性。
源轮询通道适配器
通常,SourcePollingChannelAdapter
依赖于由TaskScheduler
.
轮询触发器是根据提供的选项构建的,用于定期计划任务以轮询数据或事件的目标源。
当outputChannel
是一个ReactiveStreamsSubscribableChannel
一样Trigger
用于确定下次执行时间,但不是调度任务,SourcePollingChannelAdapter
创建一个Flux<Message<?>>
基于Flux.generate()
对于nextExecutionTime
values 和Mono.delay()
从上一步开始的持续时间。
一个Flux.flatMapMany()
然后用于轮询maxMessagesPerPoll
并将它们沉入输出中Flux
.
该发电机Flux
由提供的ReactiveStreamsSubscribableChannel
尊重下游的背压。
从 5.5 版本开始,当maxMessagesPerPoll == 0
,根本不调用源,并且flatMapMany()
通过Mono.empty()
结果,直到maxMessagesPerPoll
稍后更改为非零值,例如通过控制总线。
这样,任何MessageSource
实现可以变成反应热源。
有关详细信息,请参阅轮询消费者。
事件驱动通道适配器
MessageProducerSupport
是事件驱动通道适配器的基类,通常,其sendMessage(Message<?>)
在生产驱动程序 API 中用作监听器回调。
此回调也可以轻松插入doOnNext()
Reactor 运算符,当消息生产者实现构建Flux
消息而不是基于侦听器的功能。
事实上,这是在框架中完成的,当outputChannel
消息的 producer 不是ReactiveStreamsSubscribableChannel
.
但是,为了改善最终用户体验并允许更多背压就绪功能,请MessageProducerSupport
提供一个subscribeToPublisher(Publisher<? extends Message<?>>)
API 在目标实现中使用时Publisher<Message<?>>>
是目标系统的数据源。
通常,它从doStart()
为目标驱动程序 API 调用时实现Publisher
源数据。
建议结合反应MessageProducerSupport
使用FluxMessageChannel
作为outputChannel
用于下游的按需订阅和事件消费。
当订阅Publisher
被取消。
叫stop()
在这样的通道适配器上完成从源头生产Publisher
.
可以通过自动订阅新创建的源来重新启动通道适配器Publisher
.
响应式流的消息源
从 5.3 版开始,一个ReactiveMessageSourceProducer
被提供。
它是提供的MessageSource
和事件驱动生产到配置的outputChannel
.
在内部,它包装了一个MessageSource
进入反复重新订阅的Mono
生成一个Flux<Message<?>>
在subscribeToPublisher(Publisher<? extends Message<?>>)
上述。
此订阅Mono
使用Schedulers.boundedElastic()
以避免目标中可能出现阻塞MessageSource
.
当消息源返回时null
(没有数据可拉取),则Mono
变成了一个repeatWhenEmpty()
状态,并带有delay
对于基于IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
订阅者上下文中的条目。
默认情况下,它为 1 秒。
如果MessageSource
生成带有IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
信息,则在doOnSuccess()
原件的Mono
并在doOnError()
如果下游流抛出MessagingException
与要拒绝的失败消息。
这ReactiveMessageSourceProducer
可用于任何用例,当轮询通道适配器的功能应转换为任何现有的响应式按需解决方案时MessageSource<?>
实现。
分路器和聚合器
当AbstractMessageSplitter
得到一个Publisher
对于其逻辑,该过程自然会遍历Publisher
将它们映射到消息中以发送到outputChannel
.
如果此通道是ReactiveStreamsSubscribableChannel
这Flux
包装器Publisher
从该通道按需订阅,并且此拆分器行为看起来更像是flatMap
Reactor 运算符,当我们将传入事件映射到多值输出时Publisher
.
当整个集成流使用FluxMessageChannel
在拆分器之前和之后,使 Spring Integration 配置与 Reactive Streams 要求及其运算符保持一致,以进行事件处理。
对于常规通道,一个Publisher
转换为Iterable
用于标准迭代和生产拆分逻辑。
一个FluxAggregatorMessageHandler
是特定 Reactive Streams 逻辑实现的另一个示例,可以将其视为"reactive operator"
就 Project Reactor 而言。
它基于Flux.groupBy()
和Flux.window()
(或buffer()
) 运算符。
传入消息沉入Flux.create()
在FluxAggregatorMessageHandler
创建,使其成为热源。
这Flux
由ReactiveStreamsSubscribableChannel
按需,或直接在FluxAggregatorMessageHandler.start()
当outputChannel
不是反应性的。
这MessageHandler
当整个集成流使用FluxMessageChannel
在此组件之前和之后,使整个逻辑背压就绪。
Java DSL
一IntegrationFlow
在 Java 中,DSL 可以从任何Publisher
实例(参见IntegrationFlow.from(Publisher<Message<T>>)
).
此外,使用IntegrationFlowBuilder.toReactivePublisher()
运算符,则IntegrationFlow
可以变成反应性热源。
一个FluxMessageChannel
在这两种情况下都在内部使用;它可以订阅入站Publisher
根据其ReactiveStreamsSubscribableChannel
contract 并且它是一个Publisher<Message<?>>
为下游用户提供。
具有动态IntegrationFlow
注册,我们可以实现一个强大的逻辑,将 Reactive Streams 与此集成流桥接/从中桥接Publisher
.
从 5.5.6 版开始,一个toReactivePublisher(boolean autoStartOnSubscribe)
运算符变体的存在来控制整个生命周期IntegrationFlow
回归的背后Publisher<Message<?>>
.
通常,来自响应式发布者的订阅和使用发生在以后的运行时阶段,而不是在响应式流组合期间,甚至不是在响应式流组合期间ApplicationContext
启动。
为了避免用于生命周期管理的样板代码IntegrationFlow
在Publisher<Message<?>>
订阅点,为了获得更好的最终用户体验,这个新运营商具有autoStartOnSubscribe
标志已被引入。
它标记(如果true
) 的IntegrationFlow
及其组件用于autoStartup = false
,所以一个ApplicationContext
不会自动启动流中消息的生成和使用。
相反,该start()
对于IntegrationFlow
从内部启动Flux.doOnSubscribe()
.
独立于autoStartOnSubscribe
值,则流从Flux.doOnCancel()
和Flux.doOnTerminate()
- 如果没有任何东西可以消费消息,那么生成消息是没有意义的。
对于完全相反的用例,当IntegrationFlow
应该调用响应式流并在完成后继续,则fluxTransform()
运算符在IntegrationFlowDefinition
.
此时的流程变成了FluxMessageChannel
它被传播到提供的fluxFunction
,在Flux.transform()
算子。
函数的结果被包装成Mono<Message<?>>
用于平面映射到输出中Flux
由另一个人订阅FluxMessageChannel
用于下游流。
有关更多信息,请参阅 Java DSL 章节。
ReactiveMessageHandler
从 5.3 版开始,ReactiveMessageHandler
在框架中原生支持。
这种类型的消息处理程序专为响应式客户端设计,这些客户端返回响应式类型以进行按需订阅以执行低级作,并且不提供任何回复数据来继续响应式流组合。
当ReactiveMessageHandler
用于命令式集成流,handleMessage()
返回后立即订阅,只是因为这样的流中没有响应式流组合来遵守背压。
在这种情况下,框架将ReactiveMessageHandler
变成一个ReactiveMessageHandlerAdapter
- 简单的实现MessageHandler
.
但是,当ReactiveStreamsConsumer
参与流(例如,当要消费的通道是FluxMessageChannel
),这样的ReactiveMessageHandler
组成整个响应式流,并带有flatMap()
反应器操作员在消耗过程中遵守背压。
开箱即用的ReactiveMessageHandler
实现是一个ReactiveMongoDbStoringMessageHandler
用于出站通道适配器。
有关更多信息,请参阅 MongoDB 响应式通道适配器。
从 6.1 版开始,IntegrationFlowDefinition
暴露了一个方便的handleReactive(ReactiveMessageHandler)
Jetty运营商。
任何ReactiveMessageHandler
实现(即使只是使用Mono
API)可用于此运算符。
框架订阅返回的Mono<Void>
自然而然。
以下是此运算符可能配置的简单示例:
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
此运算符的重载版本接受Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>
围绕提供的ReactiveMessageHandler
.
此外,一个ReactiveMessageHandlerSpec
还提供了基于的变体。
在大多数情况下,它们用于特定于协议的通道适配器实现。
请参阅下一节,该部分指向具有相应反应通道适配器的目标技术的链接。
无功通道适配器
当集成的目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器变得很简单。
入站、事件驱动的通道适配器实现是将请求(如有必要)包装到延迟的Mono
或Flux
并且仅当协议组件启动订阅时才执行发送(并生成回复,如果有)到Mono
从 listener 方法返回。
这样,我们就有一个反应流解决方案完全封装在这个组件中。
当然,在输出通道上订阅的下游集成流应遵循 Reactive Streams 规范,并以按需、背压就绪的方式执行。
这并不总是根据性质(或当前实现)可用MessageHandler
集成流中使用的处理器。
可以使用线程池和队列或FluxMessageChannel
(见上文)在没有响应式实现时的集成端点之前和之后。
响应式事件驱动的入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法如下所示:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明性方式:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者即使没有通道适配器,我们也可以始终通过以下方式使用 Java DSL:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
响应式出站通道适配器实现是关于根据为目标协议提供的响应式 API 启动(或继续)响应式流以与外部系统进行交互。 入站有效负载本身可以是响应式类型,也可以作为整个集成流的事件,它是顶部响应式流的一部分。 如果我们处于单向、一劳永逸的场景中,则可以立即订阅返回的响应式类型,或者它可以向下游传播(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍在下游保留响应式流语义。
响应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够使用这两个通道适配器:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前,Spring Integration 为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra 提供了通道适配器(或网关)实现。
Redis 流通道适配器也是响应式的,并且使用ReactiveStreamOperations
来自 Spring Data。
更多响应式通道适配器即将推出,例如,Kafka 中的 Apache Kafka 基于ReactiveKafkaProducerTemplate
和ReactiveKafkaConsumerTemplate
来自 Apache Kafka 等的 Spring。
对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理期间阻塞。
响应命令式上下文传播
当上下文传播库位于类路径上时,Project Reactor 可以将ThreadLocal
值(例如千分尺观察或SecurityContextHolder
) 并将它们存储到Subscriber
上下文。
当我们需要填充日志记录 MDC 进行跟踪或让我们从反应流调用的服务从作用域恢复观察时,也可以进行相反的作。
请参阅 Project Reactor 文档中有关其用于上下文传播的特殊运算符的更多信息。
如果我们的整个解决方案是单个响应式流组合,则存储和恢复上下文可以顺利进行,因为Subscriber
上下文从下游到合成的开头都可见(Flux
或Mono
).
但是,如果应用程序在不同的Flux
实例或进入命令式处理并返回,然后上下文绑定到Subscriber
可能不可用。
对于这样的用例,Spring Integration 提供了一个额外的功能(从 version6.0.5
) 来存储反应堆ContextView
进入IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
从响应式流生成的消息头,例如当我们执行send()
操作。
然后,此标头用于FluxMessageChannel.subscribeTo()
以恢复Message
该通道将发出。
目前,此标头是从WebFluxInboundEndpoint
和RSocketInboundGateway
组件,但可用于执行命令式集成反应性的任何解决方案。
填充此标头的逻辑如下所示:
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用handle()
运算符,使 Reactor 恢复ThreadLocal
上下文中的值。
即使它作为标头发送,框架也无法假设它是否要恢复到ThreadLocal
下游的值。
要从Message
另一方面Flux
或Mono
composition,可以执行此逻辑:
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));