此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

消息通道实现

Spring Integration 提供了不同的消息通道实现。 以下部分简要介绍了每一个。spring-doc.cadn.net.cn

PublishSubscribeChannel

PublishSubscribeChannel实现广播任何Message发送给它的所有订阅处理程序。 这最常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常旨在由单个处理程序处理)。 请注意,PublishSubscribeChannel仅用于发送。 由于它在send(Message)方法,则使用者无法轮询消息(它不会实现PollableChannel因此没有receive()方法)。 相反,任何订阅者本身都必须是MessageHandler,订阅者的handleMessage(Message)方法依次调用。spring-doc.cadn.net.cn

在 3.0 版之前,调用send方法PublishSubscribeChannel没有返回订阅者false. 当与MessagingTemplate一个MessageDeliveryException被扔了。 从 3.0 版开始,行为已更改,使得send如果至少存在最少的订阅者(并成功处理消息),则始终被视为成功。 可以通过设置minSubscribers属性,默认为0.spring-doc.cadn.net.cn

如果您使用TaskExecutor,则仅使用正确数量的订阅者来进行此确定,因为消息的实际处理是异步执行的。

QueueChannel

QueueChannel实现包装队列。 与PublishSubscribeChannelQueueChannel具有点对点语义。 换句话说,即使通道有多个消费者,也只有一个消费者应该收到任何Message发送到该频道。 它提供了一个默认的无参数构造函数(提供本质上无限的容量Integer.MAX_VALUE)以及接受队列容量的构造函数,如以下列表所示:spring-doc.cadn.net.cn

public QueueChannel(int capacity)

未达到其容量限制的通道将消息存储在其内部队列中,并且send(Message<?>)方法立即返回,即使没有接收方准备好处理消息。 如果队列已达到容量,则发送方将阻止,直到队列中有可用空间。 或者,如果使用具有其他超时参数的 send 方法,则队列将阻塞,直到房间可用或超时期限过去,以先发生者为准。 同样,一个receive()如果队列上有消息可用,则调用会立即返回,但是,如果队列为空,则接收调用可能会阻塞,直到消息可用或超时(如果提供)过去。 无论哪种情况,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。 但是请注意,调用send()receive()没有timeoutparameter 块无限期。spring-doc.cadn.net.cn

PriorityChannel

QueueChannel强制实施先进先出 (FIFO) 排序,则PriorityChannel是一种替代实现,允许根据优先级在通道内对消息进行排序。 默认情况下,优先级由priority标头。 但是,对于自定义优先级确定逻辑,类型为Comparator<Message<?>>可以提供给PriorityChannel构造 函数。spring-doc.cadn.net.cn

RendezvousChannel

RendezvousChannel启用“直接切换”方案,其中发送方阻止,直到另一方调用通道的receive()方法。 另一方阻止,直到发件人发送消息。 在内部,此实现与QueueChannel,除了它使用SynchronousQueue(零容量实施BlockingQueue). 这在发送方和接收方在不同线程中运行的情况下效果很好,但异步删除队列中的消息是不合适的。 换句话说,使用RendezvousChannel,发送方知道某个接收方已经接受了该消息,而使用QueueChannel,则消息将存储到内部队列中,并且可能永远不会收到。spring-doc.cadn.net.cn

请记住,默认情况下,所有这些基于队列的通道仅将消息存储在内存中。 当需要持久性时,您可以在“queue”元素中提供“message-store”属性来引用持久性MessageStore实现,或者您可以将本地通道替换为由持久代理支持的通道,例如 JMS 支持的通道或通道适配器。 后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如 JMS Support 中所述。 但是,当不需要在队列中缓冲时,最简单的方法是依赖DirectChannel,下一节将讨论。

RendezvousChannel对于实现请求-回复作也很有用。 发送方可以创建RendezvousChannel,然后在构建Message. 发送后Message,发送者可以立即调用receive(可选地提供超时值)以便在等待回复时阻止Message. 这与许多 Spring Integration 的请求-回复组件在内部使用的实现非常相似。spring-doc.cadn.net.cn

DirectChannel

DirectChannel具有点对点语义,但其他方面更类似于PublishSubscribeChannel比前面描述的任何基于队列的通道实现。 它实现了SubscribableChannel接口而不是PollableChannel接口,因此它将消息直接分派给订阅者。 然而,作为点对点通道,它与PublishSubscribeChannel因为它发送了每个Message到单个订阅的MessageHandler.spring-doc.cadn.net.cn

除了是最简单的点对点通道选项之外,它最重要的功能之一是它使单个线程能够在通道的“两端”执行作。 例如,如果处理程序订阅了DirectChannel,然后发送Message触发调用该处理程序的handleMessage(Message)方法直接在发送者的线程中,在send()方法调用可以返回。spring-doc.cadn.net.cn

提供具有此行为的通道实现的主要动机是支持必须跨通道的事务,同时仍受益于通道提供的抽象和松散耦合。 如果send()调用时,处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)方面发挥作用。spring-doc.cadn.net.cn

由于DirectChannel是最简单的选项,不会增加调度和管理轮询器线程所需的任何额外开销,它是 Spring Integration 中的默认通道类型。 一般思想是为应用程序定义通道,考虑其中哪些通道需要提供缓冲或限制输入,并将它们修改为基于队列PollableChannels. 同样,如果通道需要广播消息,则不应是DirectChannel而是PublishSubscribeChannel. 稍后,我们将展示如何配置这些通道中的每一个。

DirectChannel在内部委托给消息调度器以调用其订阅的消息处理程序,并且该调度器可以具有由load-balancerload-balancer-ref属性(互斥)。 消息分派程序使用负载平衡策略来帮助确定当多个消息处理程序预订同一通道时,消息如何在消息处理程序之间分配。 为方便起见,该load-balancer属性公开指向预先存在的实现的值枚举LoadBalancingStrategy. 一个round-robin(轮换处理程序之间的负载平衡)和none(对于想要显式禁用负载平衡的情况)是唯一可用的值。 其他策略实现可能会在未来的版本中添加。 但是,从 3.0 版开始,您可以提供自己的LoadBalancingStrategy并使用load-balancer-ref属性,该属性应指向实现LoadBalancingStrategy,如以下示例所示:spring-doc.cadn.net.cn

一个FixedSubscriberChannel是一个SubscribableChannel仅支持单个MessageHandler订阅者。 这对于不涉及其他订阅者且不需要信道拦截器的高吞吐量性能用例非常有用。spring-doc.cadn.net.cn

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,load-balancerload-balancer-ref属性是互斥的。spring-doc.cadn.net.cn

负载平衡也与布尔值结合使用failover财产。 如果failovervalue 为 true (默认值),当前面的处理程序抛出异常时,调度程序会回退到任何后续处理程序(如有必要)。 顺序由在处理程序本身上定义的可选顺序值决定,或者,如果不存在此类值,则由处理程序订阅的顺序决定。spring-doc.cadn.net.cn

如果某种情况要求调度程序始终尝试调用第一个处理程序,然后在每次发生错误时都以相同的固定顺序顺序回退,则不应提供负载平衡策略。 换句话说,调度器仍然支持failoverboolean 属性,即使未启用负载平衡。 但是,如果没有负载平衡,则处理程序的调用始终根据处理程序的顺序从第一个处理程序开始。 例如,当对初级、次级、第三级等有明确的定义时,这种方法效果很好。 使用命名空间支持时,order属性确定顺序。spring-doc.cadn.net.cn

请记住,负载平衡和failover仅当通道具有多个订阅消息处理程序时才适用。 使用命名空间支持时,这意味着多个端点共享input-channel属性。

从 5.2 版本开始,当failover为 true,则当前处理程序的失败以及失败的消息将记录在debuginfo如果分别配置。spring-doc.cadn.net.cn

ExecutorChannel

ExecutorChannel是一个点对点通道,支持与DirectChannel(负载平衡策略和failoverboolean 属性)。 这两种调度通道类型之间的主要区别在于ExecutorChannel委托给TaskExecutor执行调度。 这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。 因此,它不支持跨越发送方和接收处理程序的事务。spring-doc.cadn.net.cn

发件人有时可以阻止。 例如,当使用TaskExecutor使用限制客户端的拒绝策略(例如ThreadPoolExecutor.CallerRunsPolicy),发送方的线程可以在线程池达到最大容量且执行器的工作队列已满时执行该方法。 由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。

PartitionedChannel

从 6.1 版开始,PartitionedChannel提供了实现。 这是AbstractExecutorChannel表示点对点调度逻辑,其中实际消耗在特定线程上处理,由从发送到此通道的消息评估的分区键确定。 此通道类似于ExecutorChannel上面提到过,但不同之处在于具有相同分区键的消息始终在同一线程中处理,保留顺序。 它不需要外部TaskExecutor,但可以使用自定义ThreadFactory(例如Thread.ofVirtual().name("partition-", 0).factory()). 该工厂用于将单线程执行器填充到MessageDispatcher委托,每个分区。 默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID消息头用作分区键。 此通道可以配置为简单的 bean:spring-doc.cadn.net.cn

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

该频道将具有3分区 - 专用线程;将使用partitionKey标头来确定消息将在哪个分区中处理。 看PartitionedChannelclass Javadocs 了解更多信息。spring-doc.cadn.net.cn

FluxMessageChannel

FluxMessageChannel是一个org.reactivestreams.Publisher实现"sinking"将消息发送到内部reactor.core.publisher.Flux用于下游响应式用户的按需消费。 此通道实现既不是SubscribableChannel,也不是PollableChannel,所以只有org.reactivestreams.Subscriber实例可用于从此通道使用,尊重响应式流的背压性质。 另一方面,FluxMessageChannel实现一个ReactiveStreamsSubscribableChannel与其subscribeTo(Publisher<Message<?>>)Contract 允许从响应式源发布者接收事件,将响应式流桥接到集成流中。 要实现整个集成流的完全反应性行为,必须在流中的所有端点之间放置这样的通道。spring-doc.cadn.net.cn

有关与响应式流交互的更多信息,请参阅响应式流支持spring-doc.cadn.net.cn

作用域通道

Spring Integration 1.0 提供了一个ThreadLocalChannel实现,但从 2.0 开始已被删除。 现在,处理相同需求的更通用方法是添加一个scope属性添加到频道。 属性的值可以是上下文中可用的作用域的名称。 例如,在 Web 环境中,某些作用域可用,并且任何自定义作用域实现都可以向上下文注册。 以下示例显示了应用于通道的线程本地作用域,包括作用域本身的注册:spring-doc.cadn.net.cn

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

上一个示例中定义的通道也在内部委托给队列,但通道绑定到当前线程,因此队列的内容也类似地绑定。 这样,发送到通道的线程稍后可以接收这些相同的消息,但其他线程无法访问它们。 虽然很少需要线程范围的通道,但它们在以下情况下可能很有用DirectChannel实例用于强制执行单个线程的作,但任何回复消息都应发送到“终端”通道。 如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。spring-doc.cadn.net.cn

现在,由于任何通道都可以限定范围,因此除了 thread-Local 之外,您还可以定义自己的范围。spring-doc.cadn.net.cn