此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

线程屏障

有时,我们需要挂起消息流线程,直到发生其他异步事件。 例如,考虑将消息发布到 RabbitMQ 的 HTTP 请求。 我们可能希望在 RabbitMQ 代理发出收到消息的确认之前不回复用户。spring-doc.cadn.net.cn

在 4.2 版本中,Spring Integration 引入了<barrier/>组件。 基础MessageHandlerBarrierMessageHandler. 此类还实现MessageTriggerAction,其中消息传递给trigger()方法在handleRequestMessage()方法(如果存在)。spring-doc.cadn.net.cn

挂起的线程和触发线程通过调用CorrelationStrategy在消息上。 当消息发送到input-channel,线程最多悬挂requestTimeout毫秒,等待相应的触发消息。 默认关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID页眉。 当触发器消息到达具有相同关联时,线程将被释放。 发送到output-channelAfter Release 是通过使用MessageGroupProcessor. 默认情况下,该消息是Collection<?>两个有效负载,并且标头是使用DefaultAggregatingMessageGroupProcessor.spring-doc.cadn.net.cn

如果trigger()方法首先被调用(或在主线程超时后),它最多会被挂起triggerTimeout等待挂起消息到达。 如果您不想挂起触发器线程,请考虑将触发器线程移交给TaskExecutor相反,使其线程被挂起。
在 5.4 之前,只有一个timeout选项,但在某些情况下,最好为这些作设置不同的超时。 因此requestTimeouttriggerTimeout已引入选项。

requires-reply属性确定如果挂起的线程在触发器消息到达之前超时时要执行的作。 默认情况下,它是false,这意味着端点返回null,流结束,线程返回给调用方。 什么时候true一个ReplyRequiredException被抛出。spring-doc.cadn.net.cn

您可以调用trigger()方法以编程方式(使用名称barrier.handler— 其中barrier是屏障端点的 bean 名称)。 或者,您可以配置<outbound-channel-adapter/>以触发释放。spring-doc.cadn.net.cn

只能挂起一个具有相同关联的线程。 同一相关性可以多次使用,但只能同时使用一次。 如果第二个线程到达具有相同的相关性,则会引发异常。

以下示例演示如何使用自定义标头进行关联:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根据哪个消息先到达,将消息发送到in或将消息发送到release最多等待 10 秒钟,直到另一条消息到达。 发布消息时,outchannel 会发送一条消息,该消息结合了调用自定义MessageGroupProcessorbean,命名为myOutputProcessor. 如果主线程超时并且触发器稍后到达,您可以配置将延迟触发器发送到的丢弃通道。 如果请求消息未及时到达,触发器消息也会被丢弃。spring-doc.cadn.net.cn

有关此组件的示例,请参阅屏障示例应用程序spring-doc.cadn.net.cn