此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

轮询器

本节介绍轮询在 Spring Integration 中的工作原理。spring-doc.cadn.net.cn

轮询消费者

当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:spring-doc.cadn.net.cn

实际实现取决于这些端点连接到的通道类型。 连接到实现org.springframework.messaging.SubscribableChannel接口生成一个EventDrivenConsumer. 另一方面,连接到实现org.springframework.messaging.PollableChannel接口(例如QueueChannel) 会生成PollingConsumer.spring-doc.cadn.net.cn

轮询消费者让 Spring Integration 组件主动轮询 Message,而不是以事件驱动的方式处理消息。spring-doc.cadn.net.cn

它们代表了许多消息传递方案中的关键跨领域问题。 在 Spring Integration 中,轮询消费者基于同名模式,这在 Gregor Hohpe 和 Bobby Woolf 合著的《企业集成模式》一书中进行了描述。 您可以在该书的网站上找到该图案的描述。spring-doc.cadn.net.cn

有关轮询使用者配置的更多信息,请参阅消息端点spring-doc.cadn.net.cn

可轮询消息源

Spring Integration 提供了轮询消费者模式的第二种变体。 使用入站通道适配器时,这些适配器通常由SourcePollingChannelAdapter. 例如,从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置了轮询器以定期检索消息。 因此,当组件配置了轮询器时,生成的实例属于以下类型之一:spring-doc.cadn.net.cn

这意味着轮询器用于入站和出站消息传递方案。 以下是使用轮询器的一些用例:spring-doc.cadn.net.cn

AOP 建议类可以应用于轮询器,在advice-chain,例如启动交易的交易通知。 从 4.1 版开始,一个PollSkipAdvice被提供。 轮询器使用触发器来确定下一次轮询的时间。 这PollSkipAdvice可用于抑制(跳过)轮询,可能是因为存在某些下游条件会阻止消息被处理。 要使用此建议,您必须为其提供PollSkipStrategy. 从 4.2.5 版开始,SimplePollSkipStrategy被提供。 要使用它,您可以将实例作为 bean 添加到应用程序上下文中,将其注入到PollSkipAdvice,并将其添加到轮询者的建议链中。 要跳过轮询,请调用skipPolls(). 要恢复轮询,请调用reset(). 4.2 版在这方面增加了更多灵活性。 请参阅条件轮询器

本章仅简要概述轮询使用者,以及它们如何适应消息通道(请参阅消息通道)和通道适配器(请参阅通道适配器)的概念。 有关一般消息传递端点和轮询使用者的详细信息,请参阅消息端点spring-doc.cadn.net.cn

延迟确认可轮询消息源

从 5.0.1 版本开始,某些模块提供MessageSource支持延迟确认直到下游流完成(或将消息移交给另一个线程)的实现。 这目前仅限于AmqpMessageSourceKafkaMessageSource.spring-doc.cadn.net.cn

使用这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK标题(请参阅MessageHeaderAccessor应用程序接口) 将添加到消息中。 当与可轮询消息源一起使用时,标头的值是AcknowledgmentCallback,如以下示例所示:spring-doc.cadn.net.cn

@FunctionalInterface
public interface AcknowledgmentCallback extends SimpleAcknowledgment {

    void acknowledge(Status status);

    @Override
    default void acknowledge() {
        acknowledge(Status.ACCEPT);
    }

    default boolean isAcknowledged() {
        return false;
    }


    default void noAutoAck() {
        throw new UnsupportedOperationException("You cannot disable auto acknowledgment with this implementation");
    }

    default boolean isAutoAck() {
        return true;
    }

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如,一个KafkaMessageSource) 支持REJECT地位。 它的处理方式与ACCEPT.spring-doc.cadn.net.cn

应用程序可以随时确认消息,如以下示例所示:spring-doc.cadn.net.cn

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果MessageSource连接到一个SourcePollingChannelAdapter,当轮询器线程在下游流完成后返回到适配器时,适配器会检查确认是否已得到确认,如果没有,则将其状态设置为ACCEPT它(或REJECT如果流抛出异常)。 状态值在AcknowledgmentCallback.Status列举.spring-doc.cadn.net.cn

Spring Integration 提供MessageSourcePollingTemplate执行MessageSource. 这也负责设置ACCEPTREJECTAcknowledgmentCallbackMessageHandler回调返回(或抛出异常)。 以下示例演示如何使用MessageSourcePollingTemplate:spring-doc.cadn.net.cn

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在这两种情况下(SourcePollingChannelAdapterMessageSourcePollingTemplate),您可以通过调用noAutoAck()在回调上。 如果您将消息移交给另一个线程并希望稍后确认,则可以这样做。 并非所有实现都支持此功能,例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行。spring-doc.cadn.net.cn

消息源的条件轮询器

本节介绍如何使用条件轮询器。spring-doc.cadn.net.cn

背景

Advice对象,在advice-chain在轮询器上,建议整个轮询任务(消息检索和处理)。 这些“围绕建议”方法无法访问民意调查的任何上下文——只能访问民意调查本身。 如前所述,这对于使任务事务化或由于某些外部条件而跳过轮询等需求来说是可以的。 如果我们希望根据receive民意调查的一部分,或者我们是否想根据情况调整民意调查器?对于这些实例,Spring Integration 提供了“智能”轮询。spring-doc.cadn.net.cn

“智能”轮询

5.3 版引入了ReceiveMessageAdvice接口。 任何Advice对象中的advice-chain实现此接口的receive()操作-MessageSource.receive()PollableChannel.receive(timeout). 因此,它们只能应用于SourcePollingChannelAdapterPollingConsumer. 此类类实现以下方法:spring-doc.cadn.net.cn

  • beforeReceive(Object source)此方法在Object.receive()方法。 它允许您检查和重新配置源。 返回false取消此轮询(类似于PollSkipAdvice前面提到过)。spring-doc.cadn.net.cn

  • Message<?> afterReceive(Message<?> result, Object source)此方法在receive()方法。 同样,您可以重新配置源或采取任何作(可能取决于结果,这可能是null如果没有源创建的消息)。 您甚至可以返回其他消息spring-doc.cadn.net.cn

线程安全

如果Advice更改源,则不应使用TaskExecutor. 如果Advice突变源,此类突变不是线程安全的,可能会导致意外结果,尤其是对于高频轮询器。 如果需要并发处理轮询结果,请考虑使用下游ExecutorChannel而不是向轮询器添加执行器。spring-doc.cadn.net.cn

建议链订购

您应该了解在初始化期间如何处理通知链。Advice未实现的对象ReceiveMessageAdvice应用于整个轮询过程,并且首先调用,按顺序,然后再调用任何ReceiveMessageAdvice. 然后ReceiveMessageAdvice围绕源按顺序调用对象receive()方法。 例如,如果您有Advice对象a, b, c, d哪里bdReceiveMessageAdvice,对象按以下顺序应用:a, c, b, d. 此外,如果源已经是ProxyReceiveMessageAdvice在任何现有的Advice对象。 如果您想更改订单,您必须自己连接代理。spring-doc.cadn.net.cn

SimpleActiveIdleReceiveMessageAdvice

此建议是ReceiveMessageAdvice. 当与DynamicPeriodicTrigger,它会根据上一次轮询是否导致消息来调整轮询频率。 轮询器还必须引用相同的DynamicPeriodicTrigger.spring-doc.cadn.net.cn

重要提示:异步切换
SimpleActiveIdleReceiveMessageAdvice根据receive()结果。 仅当在轮询器线程上调用通知时,这才有效。 如果轮询器有一个task-executor. 要在希望在轮询结果后使用异步作时使用此建议,请稍后执行异步切换,也许使用ExecutorChannel.

CompoundTriggerAdvice

此建议允许根据轮询是否返回消息来选择两个触发器之一。 考虑一个轮询器,它使用CronTrigger.CronTrigger实例是不可变的,因此一旦构造就无法更改。 考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,并在检索消息时恢复为 cron 表达式。spring-doc.cadn.net.cn

建议(和轮询器)使用CompoundTrigger为此目的。 触发器的primarytrigger 可以是CronTrigger. 当通知检测到未收到任何消息时,它会将辅助触发器添加到CompoundTrigger. 当CompoundTrigger实例的nextExecutionTime方法时,它委托给辅助触发器(如果存在)。 否则,它将委托给主触发器。spring-doc.cadn.net.cn

轮询器还必须引用相同的CompoundTrigger.spring-doc.cadn.net.cn

以下示例显示了每小时 cron 表达式的配置,回退到每分钟一次:spring-doc.cadn.net.cn

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要提示:异步切换
CompoundTriggerAdvice根据receive()结果。 仅当在轮询器线程上调用通知时,这才有效。 如果轮询器有一个task-executor. 要在希望在轮询结果后使用异步作时使用此建议,请稍后执行异步切换,也许使用ExecutorChannel.

仅 MessageSource 通知

某些建议可能仅适用于MessageSource.receive()而且它们对PollableChannel. 为此,一个MessageSourceMutator接口(ReceiveMessageAdvice) 仍然存在。 有关详细信息,请参阅入站通道适配器:轮询多个服务器和目录spring-doc.cadn.net.cn