WebFlux 支持
WebFlux Spring 集成模块 (spring-integration-webflux) 允许以反应方式执行 HTTP 请求和处理入站 HTTP 请求。
您需要将此依赖项包含在您的项目中:
- 
Maven 
- 
Gradle 
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>6.4.0</version>
</dependency>compile "org.springframework.integration:spring-integration-webflux:6.4.0"这io.projectreactor.netty:reactor-netty如果服务器配置不是基于 Servlet 的,则必须包含 dependency 。
WebFlux 支持包括以下网关实现:WebFluxInboundEndpoint和WebFluxRequestExecutingMessageHandler.
该支持完全基于 Spring WebFlux 和 Project Reactor 基础。
有关更多信息,请参阅 HTTP 支持,因为许多选项在反应式和常规 HTTP 组件之间共享。
WebFlux 命名空间支持
Spring 集成提供了一个webfluxnamespace 和相应的 schema definition 创建。
要将其包含在您的配置中,请在您的应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>WebFlux 入站组件
从版本 5.0 开始,WebFluxInboundEndpoint实现WebHandler。
此组件类似于基于 MVC 的HttpRequestHandlingEndpointSupport,它与它共享一些常用选项,通过新提取的BaseHttpInboundEndpoint.
它用于 Spring WebFlux 反应式环境(而不是 MVC)。
以下示例显示了 WebFlux 端点的简单实现:
- 
Java DSL 
- 
Kotlin DSL 
- 
Java 
- 
XML 
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }
    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }
}<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>该配置类似于HttpRequestHandlingEndpointSupport(在本示例前面提到),除了我们使用@EnableWebFlux将 WebFlux 基础架构添加到我们的集成应用程序中。
此外,WebFluxInboundEndpoint执行sendAndReceive作到下游流。
| 回复部分也是非阻塞的,并且基于内部的 FutureReplyChannel,它被平面映射到回复Mono以获得按需分辨率。 | 
您可以配置WebFluxInboundEndpoint使用自定义ServerCodecConfigurer一个RequestedContentTypeResolver,甚至ReactiveAdapterRegistry.
后者提供了一种机制,你可以用它来将回复作为任何反应类型返回:ReactorFlux、RxJavaObservable,Flowable等。
这样,我们就可以使用 Spring 集成组件实现 Server Sent Events 场景,如下例所示:
- 
Java DSL 
- 
Kotlin DSL 
- 
Java 
- 
XML 
@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>有关更多可能的配置选项,请参阅请求映射支持和跨域资源共享 (CORS) 支持。
当请求正文为空或payloadExpression返回null、请求参数 (MultiValueMap<String, String>) 用于payload的目标消息。
有效载荷验证
从版本 5.2 开始,WebFluxInboundEndpoint可以配置Validator.
与 HTTP 支持中的 MVC 验证不同,它用于验证Publisher请求已由HttpMessageReader,在执行回退之前,使用 Swift 和payloadExpression功能。
框架无法假设Publisherobject 可以在构建最终有效负载之后。
如果需要限制确切最终有效负载(或其Publisher元素),验证应该向下游而不是 WebFlux 端点。
请参阅 Spring WebFlux 文档中的更多信息。
无效的有效负载被拒绝,并显示IntegrationWebExchangeBindException(一个WebExchangeBindException扩展),包含所有验证Errors.
在 Spring Framework 参考手册中有关验证的更多信息。
WebFlux 出站组件
这WebFluxRequestExecutingMessageHandler(从版本 5.0 开始)实现类似于HttpRequestExecutingMessageHandler.
它使用WebClient从 Spring Framework WebFlux 模块。
要配置它,请定义类似于以下内容的 Bean:
- 
Java DSL 
- 
Kotlin DSL 
- 
Java 
- 
XML 
@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>这WebClient exchange()作会返回一个Mono<ClientResponse>,该映射映射(通过使用多个Mono.map()steps) 复制到AbstractIntegrationMessageBuilder作为WebFluxRequestExecutingMessageHandler.
与ReactiveChannel作为outputChannel这Mono<ClientResponse>评估将推迟到进行下游订阅。
否则,它被视为async模式和Mono响应已适应SettableListenableFuture对于来自WebFluxRequestExecutingMessageHandler.
输出消息的目标负载取决于WebFluxRequestExecutingMessageHandler配置。
这setExpectedResponseType(Class<?>)或setExpectedResponseTypeExpression(Expression)标识 Target type 的 Response body 元素转换。
如果replyPayloadToFlux设置为true,响应正文将转换为Flux使用提供的expectedResponseType对于每个元素,并且 thisFlux作为负载向下游发送。
之后,你可以使用 splitter 来迭代 thisFlux以一种反应性的方式。
此外,一个BodyExtractor<?, ClientHttpResponse>可以注射到WebFluxRequestExecutingMessageHandler而不是expectedResponseType和replyPayloadToFlux性能。
它可用于对ClientHttpResponse以及对正文和 HTTP 标头转换的更多控制。
Spring 集成提供了ClientHttpResponseBodyExtractor作为恒等函数来生成(下游)整个ClientHttpResponse和任何其他可能的自定义逻辑。
从版本 5.2 开始,WebFluxRequestExecutingMessageHandler支持反应式Publisher,Resource和MultiValueMap类型作为请求消息负载。
一个BodyInserter在内部用于填充到WebClient.RequestBodySpec.
当有效负载是 reactive 时Publisher,则已配置publisherElementType或publisherElementTypeExpression可用于确定发布者的元素类型的类型。
表达式必须解析为Class<?>,String解析为目标Class<?>或ParameterizedTypeReference.
从版本 5.5 开始,WebFluxRequestExecutingMessageHandler暴露一个extractResponseBody标志(即true)以仅返回响应正文,或返回整个ResponseEntity作为回复消息有效负载,独立于提供的expectedResponseType或replyPayloadToFlux.
如果ResponseEntity,则忽略此标志,并且整个ResponseEntity返回。
有关更多可能的配置选项,请参阅 HTTP 出站组件。
WebFlux 标头映射
由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 标头映射没有区别。 请参阅 HTTP Header Mappings 了解用于映射 Headers 的更多可能选项和组件。
WebFlux 请求属性
从版本 6.0 开始,WebFluxRequestExecutingMessageHandler可以配置为通过以下方式评估请求属性setAttributeVariablesExpression().
此 SPEL 表达式必须在Map.
然后,这样的映射会传播到WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)HTTP 请求配置回调。
如果需要从Message请求和下游过滤器将访问这些属性以进行进一步处理。