此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

响应式流支持

Spring Integration 在框架的某些地方和不同方面为响应式流交互提供支持。 我们将在这里讨论其中的大部分,并在必要时提供指向目标章节的适当链接以获取详细信息。spring-doc.cadn.net.cn

前言

回顾一下,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。 Spring Integration 在基于 Spring 的应用程序中实现轻量级消息传递,并支持通过声明性适配器与外部系统集成。 Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点的分离,这对于生成可维护、可测试的代码至关重要。 这个目标是在目标应用程序中使用一等公民(如message,channelendpoint,这允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中,供另一个端点使用。 通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。 这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点保持不变。spring-doc.cadn.net.cn

另一方面,反应流是具有非阻塞背压的异步流处理的标准。 响应式流的主要目标是管理跨异步边界的流数据交换——例如将元素传递到另一个线程或线程池——同时确保接收端不会被迫缓冲任意数量的数据。 换句话说,背压是该模型的一个组成部分,以便允许在线程之间进行调解的队列受到限制。 响应式流实现(例如 Project Reactor)的目的是在流应用程序的整个处理图中保留这些优点和特征。 Reactive Streams 库的最终目标是以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,就像可用的编程语言结构一样,但最终的解决方案并不像正常函数链调用那样势在必行。 它分为几个阶段:定义和执行,这发生在订阅最终响应式发布者一段时间后,对数据的需求从定义的底部推送到顶部,并根据需要施加背压 - 我们请求尽可能多的事件我们目前可以处理。 响应式应用程序看起来像一个"stream"或者正如我们在 Spring Integration 术语中习惯的那样 -"flow". 事实上,自 Java 9 以来的响应式流 SPI 在java.util.concurrent.Flow类。spring-doc.cadn.net.cn

从这里看,当我们在端点上应用一些响应式框架运算符时,Spring Integration 流可能看起来确实非常适合编写响应式流应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如JdbcMessageHandler)可以在反应流中透明地处理。 当然,Spring Integration 中响应式流支持的主要目标是允许整个过程完全响应式、按需启动和背压准备就绪。 在通道适配器的目标协议和系统提供响应式流交互模型之前,这是不可能的。 在下面的部分中,我们将描述 Spring Integration 中提供了哪些组件和方法,用于开发响应式应用程序保留集成流结构。spring-doc.cadn.net.cn

Spring Integration 中的所有响应式流交互都使用 Project Reactor 类型实现,例如MonoFlux.

消息传递网关

与 Reactive Streams 交互的最简单点是@MessagingGateway其中我们只是将 gateway 方法的返回类型作为Mono<?>- 当订阅发生时,将执行网关方法调用背后的整个集成流Mono实例。 看反应器Mono了解更多信息。 类似的Mono-reply 方法在框架内部用于完全基于 Reactive Streams 兼容协议的入站网关(有关更多信息,请参阅下面的 Reactive Channel Adapters)。 发送和接收作包装在Mono.defer()链接来自replyChannel标头。 这样,特定响应式协议(例如 Netty)的入站组件将作为在 Spring Integration 上执行的响应式流的订阅者和发起者。 如果请求有效负载是响应式类型,则最好在响应式流定义中处理它,将进程延迟到Starters订阅。 为此,处理程序方法也必须返回响应式类型。 有关详细信息,请参阅下一节。spring-doc.cadn.net.cn

响应式应答有效负载

当回复产生MessageHandler返回回复消息的响应式类型有效负载,它以异步方式处理,并使用MessageChanneloutputChannel(这async必须设置为true),并在输出通道为ReactiveStreamsSubscribableChannel实现,例如FluxMessageChannel. 具有标准命令MessageChannel用例,并且回复有效负载是否是多值发布者(请参阅ReactiveAdapter.isMultiValue()更多信息),它被包装成一个Mono.just(). 因此,Mono必须在下游显式订阅或由FluxMessageChannel下游。 使用ReactiveStreamsSubscribableChannel对于outputChannel,无需担心返回类型和订阅;一切都由框架内部顺利处理。spring-doc.cadn.net.cn

有关详细信息,请参阅异步服务激活器spring-doc.cadn.net.cn

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>>. 一个Flux,作为热源,在内部创建,用于接收来自send()实现。 这Publisher.subscribe()实现被委托给该内部Flux. 此外,对于按需上游消费,FluxMessageChannelReactiveStreamsSubscribableChannel合同。 任何上游Publisher(例如,请参阅下面的源轮询通道适配器和拆分器)当订阅准备好时,将自动订阅此通道。 来自此委托发布者的事件被沉入内部Flux上述。spring-doc.cadn.net.cn

消费者FluxMessageChannel必须是org.reactivestreams.Subscriber实例,以兑现响应式流合同。 幸运的是,所有MessageHandlerSpring Integration 中的实现还实现了CoreSubscriber来自 Reactor 项目。 多亏了ReactiveStreamsConsumer在实现之间,整个集成流配置对目标开发人员保持透明。 在这种情况行为从命令式推送模型更改为响应式拉动模型。 一个ReactiveStreamsConsumer也可用于转动任何MessageChannel使用IntegrationReactiveUtils,使集成流部分被动。spring-doc.cadn.net.cn

从 5.5 版本开始,ConsumerEndpointSpec引入了一个reactive()选项,将流中的端点设为ReactiveStreamsConsumer独立于输入通道。 可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>可以提供来自定义源Flux从输入通道通过Flux.transform()作,例如使用publishOn(),doOnNext(),retry()等。 此功能表示为@Reactive子注释 (@ServiceActivator,@Splitter等等)通过他们的reactive()属性。spring-doc.cadn.net.cn

源轮询通道适配器

通常,SourcePollingChannelAdapter依赖于由TaskScheduler. 轮询触发器是根据提供的选项构建的,用于定期计划任务以轮询数据或事件的目标源。 当outputChannel是一个ReactiveStreamsSubscribableChannel一样Trigger用于确定下次执行时间,但不是调度任务,SourcePollingChannelAdapter创建一个Flux<Message<?>>基于Flux.generate()对于nextExecutionTimevalues 和Mono.delay()从上一步开始的持续时间。 一个Flux.flatMapMany()然后用于轮询maxMessagesPerPoll并将它们沉入输出中Flux. 该发电机Flux由提供的ReactiveStreamsSubscribableChannel尊重下游的背压。 从 5.5 版本开始,当maxMessagesPerPoll == 0,根本不调用源,并且flatMapMany()通过Mono.empty()结果,直到maxMessagesPerPoll稍后更改为非零值,例如通过控制总线。 这样,任何MessageSource实现可以变成反应热源。spring-doc.cadn.net.cn

有关详细信息,请参阅轮询消费者。spring-doc.cadn.net.cn

事件驱动通道适配器

MessageProducerSupport是事件驱动通道适配器的基类,通常,其sendMessage(Message<?>)在生产驱动程序 API 中用作监听器回调。 此回调也可以轻松插入doOnNext()Reactor 运算符,当消息生产者实现构建Flux消息而不是基于侦听器的功能。 事实上,这是在框架中完成的,当outputChannel消息的 producer 不是ReactiveStreamsSubscribableChannel. 但是,为了改善最终用户体验并允许更多背压就绪功能,请MessageProducerSupport提供一个subscribeToPublisher(Publisher<? extends Message<?>>)API 在目标实现中使用时Publisher<Message<?>>>是目标系统的数据源。 通常,它从doStart()为目标驱动程序 API 调用时实现Publisher源数据。 建议结合反应MessageProducerSupport使用FluxMessageChannel作为outputChannel用于下游的按需订阅和事件消费。 当订阅Publisher被取消。 叫stop()在这样的通道适配器上完成从源头生产Publisher. 可以通过自动订阅新创建的源来重新启动通道适配器Publisher.spring-doc.cadn.net.cn

响应式流的消息源

从 5.3 版开始,一个ReactiveMessageSourceProducer被提供。 它是提供的MessageSource和事件驱动生产到配置的outputChannel. 在内部,它包装了一个MessageSource进入反复重新订阅的Mono生成一个Flux<Message<?>>subscribeToPublisher(Publisher<? extends Message<?>>)上述。 此订阅Mono使用Schedulers.boundedElastic()以避免目标中可能出现阻塞MessageSource. 当消息源返回时null(没有数据可拉取),则Mono变成了一个repeatWhenEmpty()状态,并带有delay对于基于IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration订阅者上下文中的条目。 默认情况下,它为 1 秒。 如果MessageSource生成带有IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK信息,则在doOnSuccess()原件的Mono并在doOnError()如果下游流抛出MessagingException与要拒绝的失败消息。 这ReactiveMessageSourceProducer可用于任何用例,当轮询通道适配器的功能应转换为任何现有的响应式按需解决方案时MessageSource<?>实现。spring-doc.cadn.net.cn

分路器和聚合器

AbstractMessageSplitter得到一个Publisher对于其逻辑,该过程自然会遍历Publisher将它们映射到消息中以发送到outputChannel. 如果此通道是ReactiveStreamsSubscribableChannelFlux包装器Publisher从该通道按需订阅,并且此拆分器行为看起来更像是flatMapReactor 运算符,当我们将传入事件映射到多值输出时Publisher. 当整个集成流使用FluxMessageChannel在拆分器之前和之后,使 Spring Integration 配置与 Reactive Streams 要求及其运算符保持一致,以进行事件处理。 对于常规通道,一个Publisher转换为Iterable用于标准迭代和生产拆分逻辑。spring-doc.cadn.net.cn

一个FluxAggregatorMessageHandler是特定 Reactive Streams 逻辑实现的另一个示例,可以将其视为"reactive operator"就 Project Reactor 而言。 它基于Flux.groupBy()Flux.window()(或buffer()) 运算符。 传入消息沉入Flux.create()FluxAggregatorMessageHandler创建,使其成为热源。 这FluxReactiveStreamsSubscribableChannel按需,或直接在FluxAggregatorMessageHandler.start()outputChannel不是反应性的。 这MessageHandler当整个集成流使用FluxMessageChannel在此组件之前和之后,使整个逻辑背压就绪。spring-doc.cadn.net.cn

有关更多信息,请参阅流和通量拆分通量聚合器spring-doc.cadn.net.cn

Java DSL

IntegrationFlow在 Java 中,DSL 可以从任何Publisher实例(参见IntegrationFlow.from(Publisher<Message<T>>)). 此外,使用IntegrationFlowBuilder.toReactivePublisher()运算符,则IntegrationFlow可以变成反应性热源。 一个FluxMessageChannel在这两种情况下都在内部使用;它可以订阅入站Publisher根据其ReactiveStreamsSubscribableChannelcontract 并且它是一个Publisher<Message<?>>为下游用户提供。 具有动态IntegrationFlow注册,我们可以实现一个强大的逻辑,将 Reactive Streams 与此集成流桥接/从中桥接Publisher.spring-doc.cadn.net.cn

从 5.5.6 版开始,一个toReactivePublisher(boolean autoStartOnSubscribe)运算符变体的存在来控制整个生命周期IntegrationFlow回归的背后Publisher<Message<?>>. 通常,来自响应式发布者的订阅和使用发生在以后的运行时阶段,而不是在响应式流组合期间,甚至不是在响应式流组合期间ApplicationContext启动。 为了避免用于生命周期管理的样板代码IntegrationFlowPublisher<Message<?>>订阅点,为了获得更好的最终用户体验,这个新运营商具有autoStartOnSubscribe标志已被引入。 它标记(如果true) 的IntegrationFlow及其组件用于autoStartup = false,所以一个ApplicationContext不会自动启动流中消息的生成和使用。 相反,该start()对于IntegrationFlow从内部启动Flux.doOnSubscribe(). 独立于autoStartOnSubscribe值,则流从Flux.doOnCancel()Flux.doOnTerminate()- 如果没有任何东西可以消费消息,那么生成消息是没有意义的。spring-doc.cadn.net.cn

对于完全相反的用例,当IntegrationFlow应该调用响应式流并在完成后继续,则fluxTransform()运算符在IntegrationFlowDefinition. 此时的流程变成了FluxMessageChannel它被传播到提供的fluxFunction,在Flux.transform()算子。 函数的结果被包装成Mono<Message<?>>用于平面映射到输出中Flux由另一个人订阅FluxMessageChannel用于下游流。spring-doc.cadn.net.cn

有关更多信息,请参阅 Java DSL 章节。spring-doc.cadn.net.cn

ReactiveMessageHandler

从 5.3 版开始,ReactiveMessageHandler在框架中原生支持。 这种类型的消息处理程序专为响应式客户端设计,这些客户端返回响应式类型以进行按需订阅以执行低级作,并且不提供任何回复数据来继续响应式流组合。 当ReactiveMessageHandler用于命令式集成流,handleMessage()返回后立即订阅,只是因为这样的流中没有响应式流组合来遵守背压。 在这种情况下,框架将ReactiveMessageHandler变成一个ReactiveMessageHandlerAdapter- 简单的实现MessageHandler. 但是,当ReactiveStreamsConsumer参与流(例如,当要消费的通道是FluxMessageChannel),这样的ReactiveMessageHandler组成整个响应式流,并带有flatMap()反应器操作员在消耗过程中遵守背压。spring-doc.cadn.net.cn

开箱即用的ReactiveMessageHandler实现是一个ReactiveMongoDbStoringMessageHandler用于出站通道适配器。 有关更多信息,请参阅 MongoDB 响应式通道适配器。spring-doc.cadn.net.cn

从 6.1 版开始,IntegrationFlowDefinition暴露了一个方便的handleReactive(ReactiveMessageHandler)Jetty运营商。 任何ReactiveMessageHandler实现(即使只是使用MonoAPI)可用于此运算符。 框架订阅返回的Mono<Void>自然而然。 以下是此运算符可能配置的简单示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

此运算符的重载版本接受Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>围绕提供的ReactiveMessageHandler.spring-doc.cadn.net.cn

此外,一个ReactiveMessageHandlerSpec还提供了基于的变体。 在大多数情况下,它们用于特定于协议的通道适配器实现。 请参阅下一节,该部分指向具有相应反应通道适配器的目标技术的链接。spring-doc.cadn.net.cn

无功通道适配器

当集成的目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器变得很简单。spring-doc.cadn.net.cn

入站、事件驱动的通道适配器实现是将请求(如有必要)包装到延迟的MonoFlux并且仅当协议组件启动订阅时才执行发送(并生成回复,如果有)到Mono从 listener 方法返回。 这样,我们就有一个反应流解决方案完全封装在这个组件中。 当然,在输出通道上订阅的下游集成流应遵循 Reactive Streams 规范,并以按需、背压就绪的方式执行。spring-doc.cadn.net.cn

这并不总是根据性质(或当前实现)可用MessageHandler集成流中使用的处理器。 可以使用线程池和队列或FluxMessageChannel(见上文)在没有响应式实现时的集成端点之前和之后。spring-doc.cadn.net.cn

响应式事件驱动的入站通道适配器的示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法如下所示:spring-doc.cadn.net.cn

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者以声明性方式:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者即使没有通道适配器,我们也可以始终通过以下方式使用 Java DSL:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

响应式出站通道适配器实现是关于根据为目标协议提供的响应式 API 启动(或继续)响应式流以与外部系统进行交互。 入站有效负载本身可以是响应式类型,也可以作为整个集成流的事件,它是顶部响应式流的一部分。 如果我们处于单向、一劳永逸的场景中,则可以立即订阅返回的响应式类型,或者它可以向下游传播(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍在下游保留响应式流语义。spring-doc.cadn.net.cn

响应式出站通道适配器的示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我们将能够使用这两个通道适配器:spring-doc.cadn.net.cn

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 为 WebFluxRSocketMongoDbR2DBCZeroMQ、GraphQLApache Cassandra 提供了通道适配器(或网关)实现。 Redis 流通道适配器也是响应式的,并且使用ReactiveStreamOperations来自 Spring Data。 更多响应式通道适配器即将推出,例如,Kafka 中的 Apache Kafka 基于ReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplate来自 Apache Kafka 等的 Spring。 对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理期间阻塞。spring-doc.cadn.net.cn

响应命令式上下文传播

上下文传播库位于类路径上时,Project Reactor 可以将ThreadLocal值(例如千分尺观察SecurityContextHolder) 并将它们存储到Subscriber上下文。 当我们需要填充日志记录 MDC 进行跟踪或让我们从反应流调用的服务从作用域恢复观察时,也可以进行相反的作。 请参阅 Project Reactor 文档中有关其用于上下文传播的特殊运算符的更多信息。 如果我们的整个解决方案是单个响应式流组合,则存储和恢复上下文可以顺利进行,因为Subscriber上下文从下游到合成的开头都可见(FluxMono). 但是,如果应用程序在不同的Flux实例或进入命令式处理并返回,然后上下文绑定到Subscriber可能不可用。 对于这样的用例,Spring Integration 提供了一个额外的功能(从 version6.0.5) 来存储反应堆ContextView进入IntegrationMessageHeaderAccessor.REACTOR_CONTEXT从响应式流生成的消息头,例如当我们执行send()操作。 然后,此标头用于FluxMessageChannel.subscribeTo()以恢复Message该通道将发出。 目前,此标头是从WebFluxInboundEndpointRSocketInboundGateway组件,但可用于执行命令式集成反应性的任何解决方案。 填充此标头的逻辑如下所示:spring-doc.cadn.net.cn

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

请注意,我们仍然需要使用handle()运算符,使 Reactor 恢复ThreadLocal上下文中的值。 即使它作为标头发送,框架也无法假设它是否要恢复到ThreadLocal下游的值。spring-doc.cadn.net.cn

要从Message另一方面FluxMonocomposition,可以执行此逻辑:spring-doc.cadn.net.cn

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));