此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

消息传递网关

网关隐藏了 Spring Integration 提供的消息传递 API。 它允许应用程序的业务逻辑不知道 Spring Integration API。 通过使用通用网关,您的代码仅与一个简单的接口进行交互。spring-doc.cadn.net.cn

输入GatewayProxyFactoryBean

如前所述,如果不依赖 Spring Integration API(包括网关类),那就太好了。 因此,Spring Integration 提供了GatewayProxyFactoryBean,它为任何接口生成代理,并在内部调用如下所示的网关方法。 通过使用依赖注入,您可以将接口公开给您的业务方法。spring-doc.cadn.net.cn

以下示例显示了一个可用于与 Spring Integration 交互的接口:spring-doc.cadn.net.cn

public interface Cafe {

    void placeOrder(Order order);

}

网关 XML 命名空间支持

还提供命名空间支持。 它允许您将接口配置为服务,如以下示例所示:spring-doc.cadn.net.cn

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

定义此配置后,cafeService现在可以注入到其他 bean,并且调用该代理实例上的方法的代码Cafe接口不知道 Spring Integration API。 请参阅“示例”附录,了解使用gateway元素(在 Cafe 演示中)。spring-doc.cadn.net.cn

上述配置中的默认值将应用于网关接口上的所有方法。 如果未指定应答超时,则调用线程将等待应答 30 秒。 请参阅未到达响应时的网关行为spring-doc.cadn.net.cn

可以覆盖单个方法的默认值。 请参阅使用注释和 XML 的网关配置spring-doc.cadn.net.cn

设置默认回复通道

通常,您不需要指定default-reply-channel,因为网关会自动创建一个临时的匿名应答通道,并在其中侦听应答。 但是,在某些情况下,可能会提示您定义default-reply-channel(或reply-channel与适配器网关(例如 HTTP、JMS 等)一起使用)。spring-doc.cadn.net.cn

对于一些背景信息,我们简要讨论网关的一些内部工作原理。 网关创建临时点对点应答通道。 它是匿名的,并以名称replyChannel. 当提供显式default-reply-channel (reply-channel使用远程适配器网关),您可以指向发布-订阅通道,该通道之所以如此命名,是因为您可以向其添加多个订阅者。 在内部,Spring Integration 在临时replyChannel和显式定义的default-reply-channel.spring-doc.cadn.net.cn

假设您希望您的回复不仅发送到网关,还发送到其他一些消费者。 在这种情况下,您需要两件事:spring-doc.cadn.net.cn

网关使用的默认策略不能满足这些需求,因为添加到标头的应答通道是匿名的和点对点的。 这意味着没有其他订阅者可以获取它的句柄,即使可以,通道也具有点对点行为,因此只有一个订阅者可以获取消息。 通过定义default-reply-channel您可以指向您选择的频道。 在这种情况下,这是一个publish-subscribe-channel. 网关从它创建一个桥接器,连接到存储在标头中的临时匿名应答通道。spring-doc.cadn.net.cn

您可能还希望显式提供一个应答通道,以便通过拦截器(例如,窃听)进行监视或审计。 要配置通道拦截器,您需要一个命名通道。spring-doc.cadn.net.cn

从 5.4 版开始,当网关方法返回类型为void,则框架填充replyChannelheader 作为nullChannelbean 引用,如果未显式提供此类标头。 这允许丢弃来自下游流的任何可能的回复,从而满足单向网关合同。

使用注释和 XML 的网关配置

请考虑以下示例,该示例扩展了前面的Cafe接口示例,通过添加@Gateway注解:spring-doc.cadn.net.cn

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header批注允许您添加被解释为消息标头的值,如以下示例所示:spring-doc.cadn.net.cn

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果您更喜欢使用 XML 方法来配置网关方法,则可以将method元素添加到网关配置中,如以下示例所示:spring-doc.cadn.net.cn

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

您还可以使用 XML 为每个方法调用提供单独的标头。 如果要设置的标头本质上是静态的,并且您不想使用@Header附注。 例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单引号或所有报价)影响贷款报价的聚合方式。 通过评估调用的网关方法来确定请求的类型,尽管可能,但会违反关注点分离范式(该方法是 Java 工件)。 但是,在消息传递架构中,在消息头中表达您的意图(元信息)是很自然的。 以下示例演示如何为两种方法中的每一种添加不同的邮件头:spring-doc.cadn.net.cn

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在前面的示例中,根据网关的方法,为“RESPONSE_TYPE”标头设置了不同的值。spring-doc.cadn.net.cn

例如,如果指定requestChannel<int:method/>以及在@Gateway注释,则注释值优先。
如果在 XML 中指定了无参数网关,并且接口方法同时具有@Payload@Gateway注释(带有payloadExpressionpayload-expression<int:method/>元素)、@Payload值被忽略。

表达式和“全局”标头

<header/>元素支持expression作为替代方案value. 计算 SpEL 表达式以确定标头的值。 从 5.2 版开始,#root对象是MethodArgsHoldergetMethod()getArgs()访问。 例如,如果您希望在简单方法名称上进行路由,则可以添加具有以下表达式的标头:method.name.spring-doc.cadn.net.cn

java.reflect.Method不可序列化。 表达式为method如果稍后序列化邮件,则会丢失。 因此,您可能希望使用method.namemethod.toString()在这些情况下。 这toString()方法提供了一个String方法的表示形式,包括参数和返回类型。

从 3.0 版本开始,<default-header/>可以定义元素以将标头添加到网关生成的所有消息中,而不管调用的方法如何。 为方法定义的特定标头优先于默认标头。 此处为方法定义的特定标头会覆盖任何@Header服务接口中的注释。 但是,默认标头不会覆盖任何@Header服务接口中的注释。spring-doc.cadn.net.cn

网关现在还支持default-payload-expression,适用于所有方法(除非被覆盖)。spring-doc.cadn.net.cn

将方法参数映射到消息

使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和标头)。 如果未使用显式配置,则使用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪个参数应映射到标头。 请考虑以下示例:spring-doc.cadn.net.cn

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是Map),第二个参数的内容成为标头。spring-doc.cadn.net.cn

在第二种情况下(或第一种情况下,当参数的参数thing1是一个Map),框架无法确定哪个参数应该是有效负载。 因此,映射失败。 这通常可以使用payload-expression一个@Payload注释,或@Headers注解。spring-doc.cadn.net.cn

或者(每当约定失效时),您可以承担将方法调用映射到消息的全部责任。 为此,请实现MethodArgsMessageMapper并将其提供给<gateway/>通过使用mapper属性。 映射器映射一个MethodArgsHolder,这是一个简单的类,它包装了java.reflect.Method实例和Object[]包含参数。 提供自定义映射器时,default-payload-expression属性和<default-header/>网关上不允许元素。 同样,payload-expression属性和<header/>元素不允许在任何<method/>元素。spring-doc.cadn.net.cn

映射方法参数

以下示例显示了如何将方法参数映射到消息,并显示了一些无效配置的示例:spring-doc.cadn.net.cn

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 请注意,在本例中,SpEL 变量#this,引用参数 — 在本例中,的值s.

XML 等效项看起来有点不同,因为没有#thismethod 参数的上下文。 但是,表达式可以通过使用args属性的MethodArgsHolderroot 对象(有关更多信息,请参阅表达式和“全局”标头),如以下示例所示:spring-doc.cadn.net.cn

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway注解

从 4.0 版开始,网关服务接口可以标记@MessagingGateway注释,而不是要求定义<gateway />xml 元素进行配置。 以下一对示例比较了配置同一网关的两种方法:spring-doc.cadn.net.cn

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注释时,它会创建proxy实施及其消息传递基础设施。 要执行此扫描并注册BeanDefinition在应用程序上下文中,将@IntegrationComponentScan注释到@Configuration类。 标准@ComponentScan基础设施不处理接口。 因此,我们引入了@IntegrationComponentScan逻辑来查找@MessagingGateway接口和寄存器上的注释GatewayProxyFactoryBean实例。 另请参阅注释支持

@MessagingGateway注释,您可以使用@Profile注释以避免创建 bean(如果此类配置文件未处于活动状态)。spring-doc.cadn.net.cn

从 6.0 版开始,带有@MessagingGateway也可以用@Primary注释,因为它可以在任何 Spring 中使用@Component定义。spring-doc.cadn.net.cn

从 6.0 版本开始,@MessagingGateway接口可以在标准 Spring 中使用@Import配置。 这可以用作@IntegrationComponentScan或手动AnnotationGatewayProxyFactoryBeanbean 定义。spring-doc.cadn.net.cn

@MessagingGateway@MessageEndpoint自版本6.0name()属性本质上是别名为@Compnent.value(). 这样,网关代理的 bean 名称生成策略将与扫描和导入组件的标准 Spring 注释配置重新对齐。 默认值AnnotationBeanNameGenerator可以通过AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR或作为@IntegrationComponentScan.nameGenerator()属性。spring-doc.cadn.net.cn

如果您没有 XML 配置,则@EnableIntegration至少一个@Configuration类。 看配置和@EnableIntegration了解更多信息。

调用无参数方法

在没有任何参数的网关接口上调用方法时,默认行为是接收MessagePollableChannel.spring-doc.cadn.net.cn

但是,有时您可能希望触发无参数方法,以便可以与不需要用户提供参数的下游其他组件进行交互,例如触发无参数 SQL 调用或存储过程。spring-doc.cadn.net.cn

若要实现发送和接收语义,必须提供有效负载。 要生成有效负载,不需要接口上的方法参数。 您可以使用@Payload注释或payload-expression属性method元素。 以下列表包括有效负载可能是什么的几个示例:spring-doc.cadn.net.cn

以下示例演示如何使用@Payload注解:spring-doc.cadn.net.cn

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

您还可以使用@Gateway注解。spring-doc.cadn.net.cn

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果两个注释都存在(并且payloadExpression提供),@Gateway获胜。

如果方法没有参数和返回值,但包含有效负载表达式,则将其视为仅发送作。spring-doc.cadn.net.cn

调用default方法

网关代理的接口可能具有default方法,从 5.3 版本开始,框架注入了DefaultMethodInvokingMethodInterceptor转换为用于调用的代理default使用java.lang.invoke.MethodHandle方法而不是代理。 来自 JDK 的接口,例如java.util.function.Function,仍然可以用于网关代理,但它们的default由于内部 Java 安全原因,无法调用MethodHandles.Lookup针对 JDK 类的实例化。 这些方法也可以使用显式@Gateway注释,或proxyDefaultMethods@MessagingGateway注释或<gateway>XML 组件。spring-doc.cadn.net.cn

错误处理

网关调用可能会导致错误。 默认情况下,在网关的方法调用时,下游发生的任何错误都会“按原样”重新抛出。 例如,考虑以下简单流:spring-doc.cadn.net.cn

gateway -> service-activator

如果服务激活器调用的服务抛出MyException(例如),框架将其包装在MessagingException并将传递给服务激活器的消息附加到failedMessage财产。 因此,框架执行的任何日志记录都具有完整的故障上下文。 默认情况下,当网关捕获异常时,MyException被解包并抛给调用方。 您可以配置throws子句,以匹配原因链中的特定异常类型。 例如,如果你想捕捉一个整体MessagingException有了下游错误原因的所有消息传递信息,您应该有一个类似于以下内容的网关方法:spring-doc.cadn.net.cn

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由于我们鼓励 POJO 编程,您可能不想将调用者暴露给消息传递基础设施。spring-doc.cadn.net.cn

如果您的网关方法没有throws子句中,网关遍历原因树,寻找RuntimeException这不是MessagingException. 如果未找到任何内容,则框架会抛出MessagingException. 如果MyException在前面的讨论中,有一个原因SomeOtherException和你的方法throws SomeOtherException,网关进一步解包并将其抛给调用方。spring-doc.cadn.net.cn

当网关声明时没有service-interface,一个内部框架接口RequestReplyExchanger被使用。spring-doc.cadn.net.cn

请考虑以下示例:spring-doc.cadn.net.cn

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在 5.0 版本之前,这个exchange方法没有throws子句,结果,异常被解包。 如果使用此接口并想要恢复以前的展开行为,请使用自定义service-interface而不是访问causeMessagingException你自己。spring-doc.cadn.net.cn

但是,您可能希望记录错误而不是传播错误,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某些“错误消息”协定的消息)。 为此,网关通过包含对error-channel属性。 在以下示例中,“transformer”创建回复MessageException:spring-doc.cadn.net.cn

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。 这将成为发送回调用方的有效负载。 如有必要,您可以在这样的“错误流”中做更多更复杂的事情。 它可能涉及路由器(包括 Spring Integration 的ErrorMessageExceptionTypeRouter)、过滤器等。 然而,大多数时候,一个简单的“转换器”就足够了。spring-doc.cadn.net.cn

或者,您可能只想记录异常(或异步将其发送到某个地方)。 如果提供单向流,则不会将任何内容发送回调用方。 如果要完全禁止异常,可以提供对全局nullChannel(本质上是一个/dev/null方法)。 最后,如上所述,如果没有error-channel定义,则异常将照常传播。spring-doc.cadn.net.cn

当您使用@MessagingGateway注释(参见 @MessagingGateway' 注释), you can use an `errorChannel属性。spring-doc.cadn.net.cn

从版本 5.0 开始,当您使用带有void返回类型(单向流),则error-channel参考(如果提供)填充在标准中errorChannel每个已发送消息的标头。 此功能允许基于标准的下游异步流ExecutorChannel配置(或QueueChannel),以覆盖默认全局errorChannel异常发送行为。 以前,您必须手动指定errorChannel标头与@GatewayHeader注释或<header>元素。 这error-channel属性被忽略void具有异步流的方法。 相反,错误消息被发送到默认的errorChannel.spring-doc.cadn.net.cn

通过简单的 POJI 网关公开消息传递系统会带来好处,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事项。 我们希望我们的 Java 方法能够尽快返回,并且在调用者等待它返回时不会无限期地挂起(无论是 void、返回值还是抛出的异常)。 当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。 这意味着,由网关发起的消息可能会被过滤器丢弃,并且永远无法到达负责生成回复的组件。 某些服务激活器方法可能会导致异常,从而不提供任何回复(因为我们不生成空消息)。 换句话说,多种情况可能会导致回复消息永远不会到来。 这在消息传递系统中是非常自然的。 但是,请考虑对网关方法的影响。 网关的方法输入参数被合并到消息中并发送到下游。 回复消息将转换为网关方法的返回值。 因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。 否则,如果出现以下情况,网关方法可能永远不会返回并无限期挂起reply-timeout设置为负值。 处理这种情况的一种方法是使用异步网关(本节后面将介绍)。 另一种处理方法是依赖默认值reply-timeout作为30秒。 这样,网关挂起的时间不会超过reply-timeout如果超时确实过去,则返回“null”。 最后,您可能需要考虑在服务激活器上设置下游标志,例如“requires-reply”或在过滤器上设置“throw-exceptions-on-rejection”。 本章的最后一节将更详细地讨论这些选项。
如果下游流返回ErrorMessagepayload(一个Throwable)被视为常规下游错误。 如果有error-channel已配置,则将其发送到错误流。 否则,有效负载将抛出给网关的调用方。 同样,如果错误流在error-channel返回一个ErrorMessage,则其有效负载将抛出给调用方。 这同样适用于任何带有Throwable有效载荷。 这在异步情况下非常有用,当您需要传播Exception直接给来电者。 为此,您可以返回Exception(作为reply来自某些服务)或扔掉它。 通常,即使使用异步流,框架也会负责将下游流抛出的异常传播回网关。 TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。 它通过使用aggregatorgroup-timeout(请参阅聚合器和组超时)和MessagingTimeoutException回复丢弃流。

网关超时

网关有两个超时属性:requestTimeoutreplyTimeout. 仅当通道可以阻塞(例如,有界QueueChannel那是满的)。 这replyTimeoutvalue 是网关等待回复或返回的时间null. 它默认为无大。spring-doc.cadn.net.cn

超时可以设置为网关上所有方法的默认值(defaultRequestTimeoutdefaultReplyTimeout) 或在MessagingGateway接口注释。单个方法可以覆盖这些默认值(在<method/>child 元素)或@Gateway注解。spring-doc.cadn.net.cn

从 5.0 版开始,超时可以定义为表达式,如以下示例所示:spring-doc.cadn.net.cn

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文具有BeanResolver(使用@someBean引用其他 bean),以及argsarray 属性,来自#root对象可用。有关此根对象的更多信息,请参阅表达式和“全局”标头。使用 XML 配置时,超时属性可以是长值或 SpEL 表达式,如以下示例所示:spring-doc.cadn.net.cn

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

异步网关

作为一种模式,消息传递网关提供了一种隐藏特定于消息传递的代码的好方法,同时仍然公开消息传递系统的全部功能。 如前所述GatewayProxyFactoryBean提供了一种通过服务接口公开代理的便捷方法,使您可以基于 POJO 访问消息传递系统(基于您自己的域中的对象、基元/字符串或其他对象)。 但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。 由于消息传递系统天生是异步的,您可能并不总是能够保证“对于每个请求,总会有一个回复”的合同。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复到达需要多长时间时,它提供了一种启动流的便捷方法。spring-doc.cadn.net.cn

为了处理这些类型的场景,Spring Integration 使用java.util.concurrent.Future实例以支持异步网关。spring-doc.cadn.net.cn

从 XML 配置中,没有任何更改,并且您仍然以与定义常规网关相同的方式定义异步网关,如以下示例所示:spring-doc.cadn.net.cn

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

但是,网关接口(服务接口)略有不同,如下所示:spring-doc.cadn.net.cn

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如前面的示例所示,网关方法的返回类型是Future. 什么时候GatewayProxyFactoryBean看到网关方法的返回类型是Future,它会立即使用AsyncTaskExecutor. 这就是差异的程度。 对此类方法的调用始终立即返回Future实例。 然后,您可以与Future按照自己的节奏获取结果、取消等。 此外,与任何其他用途一样Future实例, 调用get()可能会显示超时、执行异常等。 以下示例显示如何使用Future从异步网关返回:spring-doc.cadn.net.cn

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有关更详细的示例,请参阅 Spring Integration 示例中的 async-gateway 示例。spring-doc.cadn.net.cn

AsyncTaskExecutor

默认情况下,GatewayProxyFactoryBean使用org.springframework.core.task.SimpleAsyncTaskExecutor提交内部时AsyncInvocationTask返回类型为Future. 但是,async-executor属性中的<gateway/>元素的配置允许您提供对任何实现的引用java.util.concurrent.Executor在 Spring 应用程序上下文中可用。spring-doc.cadn.net.cn

(默认)SimpleAsyncTaskExecutor支持两者FutureCompletableFuture返回类型。 看CompletableFuture. 即使有一个默认的执行器,提供一个外部执行器通常也很有用,以便您可以在日志中识别其线程(使用 XML 时,线程名称基于执行器的 bean 名称),如以下示例所示:spring-doc.cadn.net.cn

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果您想退回其他Future实现时,你可以提供自定义执行器或完全禁用执行器并返回Future在下游流的回复消息有效负载中。要禁用执行程序,请将其设置为nullGatewayProxyFactoryBean(通过使用setAsyncTaskExecutor(null)). 使用 XML 配置网关时,请使用async-executor="". 使用@MessagingGateway注释,请使用类似于以下内容的代码:spring-doc.cadn.net.cn

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果返回类型是特定的具体Future实现或配置的执行程序不支持的其他一些子接口,则流在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。

CompletableFuture

从 4.2 版开始,网关方法现在可以返回CompletableFuture<?>. 返回此类型时有两种作模式:spring-doc.cadn.net.cn

  • 当提供异步执行器并且返回类型正好是CompletableFuture(不是子类),框架在执行器上运行任务并立即返回CompletableFuture给来电者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)用来创造未来。spring-doc.cadn.net.cn

  • 当异步执行器显式设置为null返回类型为CompletableFuture或者返回类型是CompletableFuture,则在调用方的线程上调用流。 在此方案中,下游流应返回CompletableFuture适当类型的。spring-doc.cadn.net.cn

org.springframework.util.concurrent.ListenableFuture从 Spring Framework 开始已被弃用6.0. 建议现在迁移到CompletableFuture它提供了类似的处理功能。

使用场景

在以下方案中,调用方线程立即返回CompletableFuture<Invoice>,当下游流回复网关时完成(使用Invoice对象)。spring-doc.cadn.net.cn

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下方案中,调用方线程返回CompletableFuture<Invoice>当下游流将其作为对网关的回复的有效负载提供时。 当发票准备就绪时,其他一些流程必须在未来完成。spring-doc.cadn.net.cn

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下方案中,调用方线程返回CompletableFuture<Invoice>当下游流将其作为对网关的回复的有效负载提供时。 当发票准备就绪时,其他一些流程必须在未来完成。 如果DEBUG启用日志记录时,将发出日志条目,指示异步执行器不能用于此方案。spring-doc.cadn.net.cn

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture实例可用于对回复执行其他作,如以下示例所示:spring-doc.cadn.net.cn

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

反应器Mono

从 5.0 版开始,GatewayProxyFactoryBean允许将 Project Reactor 与网关接口方法一起使用,使用Mono<T>返回类型。 内部AsyncInvocationTask被包裹在一个Mono.fromCallable().spring-doc.cadn.net.cn

一个Mono可用于稍后检索结果(类似于Future<?>),或者您可以通过调用Consumer当结果返回到网关时。spring-doc.cadn.net.cn

Mono不会立即被框架刷新。 因此,在网关方法返回之前,不会启动基础消息流(因为它使用Future<?> Executor任务)。 当Mono订阅。或者,Mono(作为“可组合项”)可能是 Reactor 流的一部分,当subscribe()关系到整个Flux. 以下示例显示如何使用 Project Reactor 创建网关:
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

其中这样的网关可以在某些服务中使用,该服务处理Flux数据数量:spring-doc.cadn.net.cn

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

另一个使用 Project Reactor 的示例是一个简单的回调场景,如以下示例所示:spring-doc.cadn.net.cn

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程继续,并handleInvoice()在流完成时调用。spring-doc.cadn.net.cn

返回异步类型的下游流

AsyncTaskExecutor部分,如果您希望某些下游组件返回带有异步有效负载 (Future,Mono等),您必须将异步执行器显式设置为null(或使用 XML 配置时)。 然后在调用方线程上调用流,稍后可以检索结果。""spring-doc.cadn.net.cn

异步void返回类型

消息传递网关方法可以这样声明:spring-doc.cadn.net.cn

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但是下游异常不会传播回调用方。 为了确保下游流调用和异常传播到调用方的异步行为,从 6.0 版开始,框架提供了对Future<Void>Mono<Void>返回类型。 该用例类似于前面描述的普通void返回类型,但不同之处在于流执行异步发生并返回Future(或Mono) 以null或根据send作结果。spring-doc.cadn.net.cn

如果Future<Void>是精确的下游流回复,然后asyncExecutor网关的选项必须设置为 null (AnnotationConstants.NULL对于一个@MessagingGateway配置)和send部分在生产者线程上执行。 回复取决于下游流配置。 这样,目标应用程序就可以生成Future<Void>正确回复。 这Monouse-case 已经超出了框架线程控制,因此将asyncExecutor设置为 null 没有意义。 那里Mono<Void>由于请求-回复网关作必须配置为Mono<?>gateway 方法的返回类型。

未到达响应时的网关行为

前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。 但是,通常期望始终返回(即使有 Exception)的典型方法调用可能并不总是一对一地映射到消息交换(例如,回复消息可能不会到达 - 相当于方法未返回)。spring-doc.cadn.net.cn

本部分的其余部分介绍各种方案以及如何使网关的行为更可预测。 可以配置某些属性以使同步网关行为更可预测,但其中一些属性可能并不总是按预期工作。 其中之一是reply-timeout(在方法级别或default-reply-timeout在网关级别)。 我们检查reply-timeout属性,以查看它如何影响同步网关在各种场景中的行为。 我们研究了单线程方案(下游的所有组件都通过直接通道连接)和多线程方案(例如,在下游的某个地方,您可能有一个打破单线程边界的可轮询或执行器通道)。spring-doc.cadn.net.cn

下游长时间运行的进程

同步网关,单线程

如果下游组件仍在运行(可能是因为无限循环或服务速度慢),则将reply-timeout没有效果,并且网关方法调用在下游服务退出(通过返回或抛出异常)之前不会返回。spring-doc.cadn.net.cn

同步网关,多线程

如果下游组件仍在多线程消息流中运行(可能是因为无限循环或服务速度慢),则将reply-timeout通过允许网关方法调用在达到超时后返回来产生影响,因为GatewayProxyFactoryBean轮询,等待消息,直到超时到期。 但是,如果在生成实际回复之前已达到超时,则可能会导致网关方法返回“null”。 您应该了解,回复消息(如果生成)是在网关方法调用可能返回后发送到回复通道的,因此您必须意识到这一点,并在设计流时牢记这一点。spring-doc.cadn.net.cn

另请参阅errorOnTimeout属性来抛出一个MessageTimeoutException而不是返回null,当发生超时时。spring-doc.cadn.net.cn

下游组件返回“null”

同步网关 — 单线程

如果下游组件返回“null”,并且reply-timeout已配置为负值,则网关方法调用无限期挂起,除非requires-reply属性已在下游组件(例如,服务激活器)上设置,该组件可能会返回“null”。 在这种情况下,将抛出异常并将其传播到网关。spring-doc.cadn.net.cn

同步网关 — 多线程

行为与前一种情况相同。spring-doc.cadn.net.cn

下游组件返回签名为“void”,而网关方法签名为非 void

同步网关 — 单线程

如果下游组件返回“void”,并且reply-timeout已配置为负值,则网关方法调用无限期挂起。spring-doc.cadn.net.cn

同步网关 — 多线程

行为与前一种情况相同。spring-doc.cadn.net.cn

下游组件导致运行时异常

同步网关 — 单线程

如果下游组件抛出运行时异常,则该异常将通过错误消息传播回网关并重新抛出。spring-doc.cadn.net.cn

同步网关 — 多线程

行为与前一种情况相同。spring-doc.cadn.net.cn

您应该了解,默认情况下,reply-timeout是无限的。 因此,如果将reply-timeout设置为负值,则网关方法调用可能会无限期挂起。 因此,为了确保分析流,并且如果存在发生其中一种情况的可能性很小,您应该将reply-timeout属性设置为“'safe'”值。 是的30秒。 更好的是,您可以将requires-reply属性设置为“true”,以确保及时响应,这是在下游组件内部返回 null 时立即抛出异常产生的。 但是,您还应该意识到,在某些情况下(请参阅第一个场景),其中reply-timeout无济于事。 这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要。 如前所述,后一种情况是定义返回Future实例。 然后,可以保证您收到该返回值,并且您可以更精细地控制调用结果。 此外,在处理路由器时,您应该记住将resolution-required属性设置为“true”会导致路由器在无法解析特定通道时引发异常。 同样,在处理过滤器时,您可以将throw-exception-on-rejection属性。 在这两种情况下,生成的流的行为就像它包含一个具有“requires-reply”属性的服务激活器。 换句话说,它有助于确保网关方法调用的及时响应。
您应该了解,计时器在线程返回网关时启动,即当流完成或消息传递给另一个线程时。 此时,调用线程开始等待回复。 如果流完全同步,则应答立即可用。 对于异步流,线程会等待到这个时间。

从 6.2 版开始,errorOnTimeout内部的属性MethodInvocationGateway扩展MessagingGatewaySupport暴露在@MessagingGatewayGatewayEndpointSpec. 此选项的含义与终端摘要一章末尾中解释的任何入站网关的含义完全相同。 换句话说,将此选项设置为true,将导致MessageTimeoutException从发送和接收网关作引发,而不是返回null当接收超时用尽时。spring-doc.cadn.net.cn

IntegrationFlow作为 Gateway在 Java DSL 章节中,了解通过IntegrationFlow.spring-doc.cadn.net.cn