此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
轮询器
本节介绍轮询在 Spring Integration 中的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:
实际实现取决于这些端点连接到的通道类型。连接到实现org.springframework.messaging.SubscribableChannel
接口生成一个EventDrivenConsumer
. 另一方面,连接到实现org.springframework.messaging.PollableChannel
接口(例如QueueChannel
) 会生成PollingConsumer
.
轮询消费者让 Spring Integration 组件主动轮询 Message,而不是以事件驱动的方式处理消息。
它们代表了许多消息传递场景中的关键跨领域问题。在 Spring Integration 中,轮询使用者基于具有相同名称的模式,这在 Gregor Hohpe 和 Bobby Woolf 所著的 Enterprise Integration Patterns 一书中进行了描述。您可以在本书的网站上找到该模式的描述。
有关轮询使用者配置的更多信息,请参阅消息端点。
可轮询消息源
Spring Integration提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常由SourcePollingChannelAdapter
. 例如,从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置了轮询器,以定期检索消息。因此,当组件配置了轮询器时,生成的实例属于以下类型之一:
这意味着轮询器用于入站和出站消息传递场景。以下是使用轮询器的一些用例:
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如在 Java 类上重复执行方法)
AOP 建议类可以应用于轮询器,在advice-chain ,例如启动事务的事务通知。从 4.1 版本开始,一个PollSkipAdvice 提供。轮询器使用触发器来确定下一次轮询的时间。 这PollSkipAdvice 可用于抑制(跳过)轮询,可能是因为存在一些下游条件会阻止消息被处理。要使用此建议,您必须为其提供PollSkipStrategy . 从 4.2.5 版开始,SimplePollSkipStrategy 提供了。要使用它,您可以将实例作为 bean 添加到应用程序上下文中,将其注入到PollSkipAdvice ,并将其添加到轮询器的建议链中。要跳过轮询,请调用skipPolls() . 要恢复轮询,请调用reset() . 4.2 版在这方面增加了更多灵活性。请参阅条件轮询器。 |
延迟确认可轮询消息源
从 5.0.1 版本开始,某些模块提供MessageSource
支持延迟确认直到下游流完成(或将消息移交给另一个线程)的实现。
这目前仅限于AmqpMessageSource
和KafkaMessageSource
.
使用这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标题(请参阅MessageHeaderAccessor
应用程序接口) 将添加到消息中。
当与可轮询消息源一起使用时,标头的值是AcknowledgmentCallback
,如以下示例所示:
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,一个KafkaMessageSource
) 支持REJECT
地位。
它的处理方式与ACCEPT
.
应用程序可以随时确认消息,如以下示例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果MessageSource
连接到一个SourcePollingChannelAdapter
,当轮询器线程在下游流完成后返回到适配器时,适配器会检查确认是否已得到确认,如果没有,则将其状态设置为ACCEPT
它(或REJECT
如果流抛出异常)。
状态值在AcknowledgmentCallback.Status
列举.
Spring Integration 提供MessageSourcePollingTemplate
执行MessageSource
.
这也负责设置ACCEPT
或REJECT
在AcknowledgmentCallback
当MessageHandler
回调返回(或抛出异常)。
以下示例演示如何使用MessageSourcePollingTemplate
:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下(SourcePollingChannelAdapter
和MessageSourcePollingTemplate
),您可以通过调用noAutoAck()
在回调上。
如果您将消息移交给另一个线程并希望稍后确认,则可以这样做。
并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
Advice
对象,在advice-chain
在轮询器上,建议整个轮询任务(消息检索和处理)。
这些“围绕建议”方法无法访问民意调查的任何上下文——只能访问民意调查本身。
如前所述,这对于使任务事务化或由于某些外部条件而跳过轮询等需求来说是可以的。
如果我们希望根据receive
民意调查的一部分,或者我们是否想根据情况调整民意调查器?对于这些实例,Spring Integration 提供了“智能”轮询。
“智能”轮询
5.3 版引入了ReceiveMessageAdvice
接口。
任何Advice
对象中的advice-chain
实现此接口的receive()
操作-MessageSource.receive()
和PollableChannel.receive(timeout)
.
因此,它们只能应用于SourcePollingChannelAdapter
或PollingConsumer
.
此类类实现以下方法:
-
beforeReceive(Object source)
此方法在Object.receive()
方法。 它允许您检查和重新配置源。 返回false
取消此轮询(类似于PollSkipAdvice
前面提到过)。 -
Message<?> afterReceive(Message<?> result, Object source)
此方法在receive()
方法。 同样,您可以重新配置源或采取任何作(可能取决于结果,这可能是null
如果没有源创建的消息)。 您甚至可以返回其他消息
线程安全
如果 |
建议链订购
您应该了解在初始化期间如何处理通知链。 |
SimpleActiveIdleReceiveMessageAdvice
此建议是ReceiveMessageAdvice
.
当与DynamicPeriodicTrigger
,它会根据上一次轮询是否导致消息来调整轮询频率。
轮询器还必须引用相同的DynamicPeriodicTrigger
.
重要提示:异步切换
SimpleActiveIdleReceiveMessageAdvice 根据receive() 结果。
仅当在轮询器线程上调用通知时,这才有效。
如果轮询器有一个task-executor .
要在希望在轮询结果后使用异步作时使用此建议,请稍后执行异步切换,也许使用ExecutorChannel . |
CompoundTriggerAdvice
此建议允许根据轮询是否返回消息来选择两个触发器之一。
考虑一个轮询器,它使用CronTrigger
.CronTrigger
实例是不可变的,因此一旦构造就无法更改。
考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,并在检索消息时恢复为 cron 表达式。
建议(和轮询器)使用CompoundTrigger
为此目的。
触发器的primary
trigger 可以是CronTrigger
.
当通知检测到未收到任何消息时,它会将辅助触发器添加到CompoundTrigger
.
当CompoundTrigger
实例的nextExecutionTime
方法时,它委托给辅助触发器(如果存在)。
否则,它将委托给主触发器。
轮询器还必须引用相同的CompoundTrigger
.
以下示例显示了每小时 cron 表达式的配置,回退到每分钟一次:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要提示:异步切换
CompoundTriggerAdvice 根据receive() 结果。
仅当在轮询器线程上调用通知时,这才有效。
如果轮询器有一个task-executor .
要在希望在轮询结果后使用异步作时使用此建议,请稍后执行异步切换,也许使用ExecutorChannel . |
仅 MessageSource 通知
某些建议可能仅适用于MessageSource.receive()
而且它们对PollableChannel
.
为此,一个MessageSourceMutator
接口(ReceiveMessageAdvice
) 仍然存在。
有关详细信息,请参阅入站通道适配器:轮询多个服务器和目录。