此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

幂等接收方企业集成模式

从版本 4.1 开始,Spring Integration 提供了幂等接收器企业集成模式的实现。 它是一种函数模式,整个幂等逻辑应该在应用程序中实现。 但是,为了简化决策,该IdempotentReceiverInterceptor组件。 这是一个 AOPAdvice应用于MessageHandler.handleMessage()方法,可以filter请求消息或将其标记为duplicate,根据其配置。spring-doc.cadn.net.cn

以前,您可以使用自定义MessageSelector<filter/>(请参阅过滤器)。 但是,由于此模式实际上定义了终结点的行为,而不是终结点本身,因此幂等接收器实现不提供终结点组件。 相反,它应用于应用程序中声明的端点。spring-doc.cadn.net.cn

的逻辑IdempotentReceiverInterceptor基于提供的MessageSelector并且,如果该选择器未接受消息,则使用duplicateMessageheader 设置为true. 目标MessageHandler(或下游流)可以查阅此标头以实现正确的幂等逻辑。 如果IdempotentReceiverInterceptor配置了discardChannelthrowExceptionOnRejection = true,则不会将重复消息发送到目标MessageHandler.handleMessage(). 相反,它被丢弃了。 如果要丢弃(不执行任何作)重复的消息,则discardChannel应配置NullChannel,例如默认的nullChannel豆。spring-doc.cadn.net.cn

为了维护消息之间的状态并提供比较消息幂等性的能力,我们提供了MetadataStoreSelector. 它接受一个MessageProcessor实现(它基于Message) 和可选的ConcurrentMetadataStore (元数据存储)。 请参阅MetadataStoreSelectorJavadoc了解更多信息。 您还可以自定义valueConcurrentMetadataStore通过使用附加的MessageProcessor. 默认情况下,MetadataStoreSelector使用timestamp消息头。spring-doc.cadn.net.cn

通常,如果键没有现有值,则选择器会选择一条消息进行接受。 在某些情况下,比较键的当前值和新值以确定是否应接受消息非常有用。 从 5.3 版开始,compareValues属性,它引用了BiPredicate<String, String>;第一个参数是旧值;返回true接受消息并将旧值替换为MetadataStore. 这对于减少键数很有用;例如,在处理文件中的行时,可以将文件名存储在键中,将当前行号存储在值中。 然后,在重新启动后,您可以跳过已处理的行。 有关示例,请参阅幂等下游处理拆分文件spring-doc.cadn.net.cn

为方便起见,该MetadataStoreSelector选项可直接在<idempotent-receiver>元件。 以下列表显示了所有可能的属性:spring-doc.cadn.net.cn

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 的 IDIdempotentReceiverInterceptor豆。 自选。
2 应用此拦截器的使用者端点名称或模式。 用逗号 (,),例如endpoint="aaa, bbb*, ccc, *ddd, eee*fff". 然后,使用与这些模式匹配的端点 Bean 名称来检索目标端点的MessageHandlerbean(使用其.handler后缀),和IdempotentReceiverInterceptor应用于这些 bean。 必填。
3 一个MessageSelectorbean 引用。 相互排斥metadata-storekey-strategy (key-expression). 什么时候selector未提供,则其中之一key-strategykey-strategy-expression是必需的。
4 标识在IdempotentReceiverInterceptor不接受。 如果省略,则重复的消息将转发到处理程序,并带有duplicateMessage页眉。 自选。
5 一个ConcurrentMetadataStore参考。 由基础层使用MetadataStoreSelector. 相互排斥selector. 自选。 默认值MetadataStoreSelector使用内部SimpleMetadataStore不会在应用程序执行中保持状态。
6 一个MessageProcessor参考。 由基础层使用MetadataStoreSelector. 评估idempotentKey从请求消息中。 相互排斥selectorkey-expression. 当selector未提供,则其中之一key-strategykey-strategy-expression是必需的。
7 用于填充ExpressionEvaluatingMessageProcessor. 由基础层使用MetadataStoreSelector. 评估idempotentKey通过使用请求消息作为评估上下文根对象。 相互排斥selectorkey-strategy. 当selector未提供,则其中之一key-strategykey-strategy-expression是必需的。
8 一个MessageProcessor参考。 由基础层使用MetadataStoreSelector. 评估value对于idempotentKey从请求消息中。 相互排斥selectorvalue-expression. 默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。
9 用于填充ExpressionEvaluatingMessageProcessor. 由基础层使用MetadataStoreSelector. 评估value对于idempotentKey通过使用请求消息作为评估上下文根对象。 相互排斥selectorvalue-strategy. 默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。
10 BiPredicate<String, String>bean,它允许您通过比较键的新旧值来选择消息;null默认情况下。
11 如果IdempotentReceiverInterceptor拒绝邮件。 默认为false. 无论是否discard-channel被提供。

对于 Java 配置,Spring Integration 提供了方法级@IdempotentReceiver注解。 它用于标记method具有消息传递注释 (@ServiceActivator,@Router, and others) to specify which `IdempotentReceiverInterceptor对象将应用于此终结点。 以下示例演示如何使用@IdempotentReceiver注解:spring-doc.cadn.net.cn

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

使用 Java DSL 时,可以将拦截器添加到端点的通知链中,如以下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor专为MessageHandler.handleMessage(Message<?>)方法。 从 4.3.1 版开始,它实现了HandleMessageAdvice,替换为AbstractHandleMessageAdvice作为基类,以便更好地解离。 有关详细信息,请参阅处理消息通知