此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

剥离

延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。 当消息延迟时,原始发件人不会阻止。 相反,延迟消息使用org.springframework.scheduling.TaskScheduler延迟过后发送到输出通道。 这种方法即使在相当长的延迟下也是可扩展的,因为它不会导致大量阻塞的发送方线程。 相反,在典型情况下,线程池用于实际执行释放消息。 本节包含配置中间器的几个示例。spring-doc.cadn.net.cn

配置 Delayer

<delayer>元素用于延迟两个消息通道之间的消息流。 与其他端点一样,您可以提供“input-channel”和“output-channel”属性,但中间器还具有“default-delay”和“expression”属性(以及“expression”元素),用于确定每条消息应延迟的毫秒数。 以下示例将所有消息延迟三秒:spring-doc.cadn.net.cn

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果需要确定每条消息的延迟,还可以使用 'expression' 属性提供 SpEL 表达式,如以下表达式所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,仅当给定入站消息的表达式计算结果为 null 时,三秒延迟才适用。 如果只想对具有表达式评估有效结果的消息应用延迟,则可以使用0(默认值)。 对于延迟为0(或更少),消息会立即在调用线程上发送。spring-doc.cadn.net.cn

XML 解析器使用消息组 ID<beanName>.messageGroupId.
延迟处理程序支持表达式评估结果,这些结果表示以毫秒为单位的间隔(任何Object谁的toString()方法生成一个可以解析为一个Long)以及java.util.Date表示绝对时间的实例。 在第一种情况下,毫秒从当前时间开始计数(例如,值5000将从中间人收到消息之日起至少延迟五秒钟)。 使用Date实例,则消息直到该Date对象。 等同于非正延迟或过去日期的值会导致没有延迟。 相反,它直接发送到原始发送方线程上的输出通道。 如果表达式求值结果不是Date并且不能解析为Long,默认延迟(如果有 — 默认值为0) 被应用。
表达式计算可能会因各种原因(包括无效表达式或其他条件)引发评估异常。 默认情况下,此类异常将被忽略(尽管在 DEBUG 级别记录),并且 delayer 回退到默认延迟(如果有)。 您可以通过将ignore-expression-failures属性。 默认情况下,此属性设置为true并且 delayer 行为如前所述。 但是,如果您不希望忽略表达式求值异常并将它们抛给 delayer的调用者,请将ignore-expression-failures属性设置为false.

在前面的示例中,延迟表达式指定为headers['delay']. 这是 SpELIndexer访问Map元素 (MessageHeaders实现Map). 它调用:headers.get("delay"). 对于简单的映射元素名称(不包含 '.'),您还可以使用 SpEL“点访问器”语法,其中前面显示的标头表达式可以指定为headers.delay. 但是,如果缺少标头,则会获得不同的结果。 在第一种情况下,表达式的计算结果为null. 第二个结果类似于以下内容:spring-doc.cadn.net.cn

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果有可能省略标头,并且想要回退到默认延迟,则通常使用索引器语法而不是点属性访问器语法更有效(并建议使用),因为检测 null 比捕获异常更快。spring-doc.cadn.net.cn

delayer 委托给 Spring 的TaskScheduler抽象化。 delayer 使用的默认调度程序是ThreadPoolTaskScheduler启动时由 Spring Integration 提供的实例。 请参阅配置任务计划程序。 如果要委托给不同的调度器,可以通过 delayer 元素的 'scheduler' 属性提供引用,如以下示例所示:spring-doc.cadn.net.cn

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果将外部ThreadPoolTaskScheduler,您可以设置waitForTasksToCompleteOnShutdown = true在这个财产上。 它允许在应用程序关闭时成功完成已经处于执行状态(释放消息)的“延迟”任务。 在 Spring Integration 2.2 之前,此属性在<delayer>元素,因为DelayHandler可以在后台创建自己的调度程序。 从 2.2 开始,delayer 需要一个外部调度器实例,并且waitForTasksToCompleteOnShutdown被删除了。 您应该使用调度程序自己的配置。
ThreadPoolTaskScheduler有一个属性errorHandler,可以注入一些org.springframework.util.ErrorHandler. 此处理程序允许处理Exception从发送延迟消息的计划任务的线程。 默认情况下,它使用org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,您可以在日志中看到堆栈跟踪。 您可能需要考虑使用org.springframework.integration.channel.MessagePublishingErrorHandler,它发送一个ErrorMessage变成一个error-channel,从失败邮件的标头或默认的error-channel. 此错误处理在事务回滚(如果存在)后执行。 请参阅发布失败

Delayer 和消息存储

DelayHandler将延迟消息保存到提供的消息组中MessageStore. (“groupId”基于<delayer>元素。 也可以看看DelayHandler.setMessageGroupId(String).) 延迟消息将从MessageStoreDelayHandler将消息发送到output-channel. 如果提供的MessageStore是持久的(例如JdbcMessageStore),它提供了在应用程序关闭时不会丢失消息的能力。 应用程序启动后,DelayHandler从其消息组中读取消息MessageStore并根据消息的原始到达时间(如果延迟为数字)以延迟重新安排它们。 对于延迟标头为DateDate在重新安排时使用。 如果延迟消息保留在MessageStore除了它的“延迟”之外,它还在启动后立即发送。 这messageGroupId是必需的,并且不能依赖DelayHandler可以生成的 bean 名称。 这样,在应用程序重新启动后,一个DelayHandler可能会获得一个新生成的 bean 名称。 因此,延迟消息可能会因重新计划而丢失,因为它们的组不再由应用程序管理。spring-doc.cadn.net.cn

<delayer>可以使用两个互斥元素之一进行丰富:<transactional><advice-chain>. 这List这些 AOP 建议应用于代理的内部DelayHandler.ReleaseMessageHandler,它负责在延迟后在Thread计划任务的。 例如,当下游消息流抛出异常并且ReleaseMessageHandler回滚。 在这种情况下,延迟的消息将保留在持久性MessageStore. 您可以使用任何自定义org.aopalliance.aop.Advice<advice-chain>. 这<transactional>元素定义了一个简单的通知链,该链只有事务性通知。 以下示例显示了advice-chain<delayer>:spring-doc.cadn.net.cn

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

DelayHandler可以导出为 JMXMBean使用托管作 (getDelayedMessageCountreschedulePersistedMessages),它允许在运行时重新安排延迟持久化消息——例如,如果TaskScheduler之前已被停止。 这些作可以通过Control Bus命令,如以下示例所示:spring-doc.cadn.net.cn

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理

从版本 5.3.7 开始,如果当消息存储到MessageStore,则发布任务计划在TransactionSynchronization.afterCommit()回调。 这是防止竞争条件所必需的,在竞争条件中,计划的发布可能会在事务提交之前运行,并且找不到消息。 在这种情况下,消息将在延迟后或事务提交后释放,以较晚者为准。spring-doc.cadn.net.cn

发布失败

从 5.0.8 版本开始,delayer 上有两个新属性:spring-doc.cadn.net.cn

发布消息时,如果下游流失败,则将在retryDelay. 如果maxAttempts到达时,消息将被丢弃(除非发布是事务性的,在这种情况下,消息将保留在存储中,但将不再计划发布,直到应用程序重新启动,或者reschedulePersistedMessages()方法被调用,如上所述)。spring-doc.cadn.net.cn

此外,您可以配置delayedMessageErrorChannel;当发布失败时,一个ErrorMessage被发送到该通道,例外作为有效负载,并且具有originalMessage财产。 这ErrorMessage包含标头IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT包含当前计数。spring-doc.cadn.net.cn

如果错误流消耗错误消息并正常退出,则不会采取进一步的作;如果发布是事务性的,则事务将提交,消息将从存储中删除。 如果错误流引发异常,则将重试发布,直到maxAttempts如上所述。spring-doc.cadn.net.cn