WebFlux 支持

WebFlux 支持

WebFlux Spring 集成模块(Spring-积分-Webflux)允许以响应式方式执行HTTP请求并处理入站HTTP请求。spring-doc.cadn.net.cn

你需要把这种依赖性纳入你的项目中:spring-doc.cadn.net.cn

梅文
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>6.1.9</version>
</dependency>
格拉德勒
compile "org.springframework.integration:spring-integration-webflux:6.1.9"

io.projectreactor.netty:reactor-netty非基于Servlet的服务器配置必须包含依赖。spring-doc.cadn.net.cn

WebFlux 支持包括以下网关实现:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler. 支持完全基于 Spring WebFluxProject Reactor 基础。 更多信息请参见 HTTP 支持,因为许多选项在响应式和常规 HTTP 组件之间是共享的。spring-doc.cadn.net.cn

WebFlux命名空间支持

Spring 集成提供了网络流命名空间及其相应的模式定义。 要将其包含在配置中,请在应用上下文配置文件中添加以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux 入站组件

从5.0版本开始,WebFluxInboundEndpoint实现WebHandler提供。 该组件类似于基于MVC的HttpRequestHandlingEndpointSupport,它们通过新提取的共享一些共同选项BaseHttpInboundEndpoint. 它被用于 Spring WebFlux 响应式环境(取代 MVC)。 以下示例展示了WebFlux端点的一个简单实现:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
Kotlin DSL
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
Java
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
XML
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

其构型类似于HttpRequestHandlingEndpointSupport(之前提到过),但我们使用@EnableWebFlux将WebFlux基础设施添加到我们的集成应用中。 另外,还有WebFluxInboundEndpoint执行发送与接收通过响应式 HTTP 服务器实现提供的按需反压功能,实现下游流程的作。spring-doc.cadn.net.cn

回复部分也是非阻塞性的,基于内部未来回复频道,且被平映射为一个回复用于按需分辨率。

你可以配置WebFluxInboundEndpoint有个习俗ServerCodecConfigurer一个RequestedContentTypeResolver,甚至ReactiveAdapter注册表. 后者提供了一种机制,你可以用任何反应类型返回回复:反应堆通量, RxJava观察,流动式,以及其他。 这样,我们可以用 Spring 集成组件实现服务器发送事件场景,如下示例所示:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
Kotlin DSL
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
Java
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
XML
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

当请求主体为空时,或者payloadExpression返回, 请求参数(多值地图<字符串,字符串>)用于有效载荷目标消息的处理。spring-doc.cadn.net.cn

有效载荷验证

从5.2版本开始,WebFluxInboundEndpoint可以配置为验证器. 与 HTTP 支持中的 MVC 验证不同,它用于验证发行人请求已被HttpMessageReader,然后执行一个回退,且payloadExpression功能。 框架无法假设发行人目标可以在构建最终有效载荷后。 如果有限制验证可见性的要求,则对正好最终有效载荷(或其发行人元素),验证应进行到下游,而不是WebFlux端点。 更多信息请参见Spring WebFlux文档。 无效有效载荷被拒绝时会有IntegrationWebExchangeBindException(aWebExchangeBindException扩展),包含所有验证错误. 更多内容请参见Spring Framework参考手册,了解验证。spring-doc.cadn.net.cn

WebFlux 出站组件

WebFluxRequestExecutingMessageHandler(从5.0版本开始)实现类似于HttpRequestExecutingMessageHandler. 它使用了一个Web客户端来自 Spring Framework WebFlux 模块。 要配置它,请定义类似以下内容的豆子:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
Kotlin DSL
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
Java
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
XML
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

Web客户端 exchange()作返回A单一客户<响应>,映射为(通过使用几个单一地图(Mono.map)steps)到一个摘要集成信息构建器作为WebFluxRequestExecutingMessageHandler. 与反应频道作为输出通道单一客户<响应>评估会被推迟,直到下游订阅完成。 否则,它被视为异步模式,以及响应被调整为可设定可听未来对于来自WebFluxRequestExecutingMessageHandler. 输出消息的目标有效载荷取决于WebFluxRequestExecutingMessageHandler配置。 这setExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(表达式)识别响应体元素转换的目标类型。 如果回复PayloadToFlux设置为true,响应体被转换为通量提供的expectedResponseType对每个元素,且通量作为有效载荷发送到下游。 之后,你可以用分流器来迭代这个过程通量以一种被动的方式。spring-doc.cadn.net.cn

此外,还有一个BodyExtractor<?, ClientHttpResponse>可以注入WebFluxRequestExecutingMessageHandler而不是expectedResponseType回复PayloadToFlux性能。 它可以用于对客户端HttpResponse(客户端HttpResponse)以及对正文和HTTP头转换的更多控制。 Spring Integration 提供ClientHttpResponseBodyExtractor作为恒等函数产生(下游)整体客户端HttpResponse(客户端HttpResponse)以及任何其他可能的自定义逻辑。spring-doc.cadn.net.cn

从5.2版本开始,WebFluxRequestExecutingMessageHandler支持响应式发行人,资源多价值地图类型为请求消息有效载荷。 一个身体插入器内部使用以填充到WebClient.RequestBodySpec. 当有效载荷是响应式时发行人, 配置过publisherElementTypepublisherElementTypeExpression可用于确定发布者元素类型的类型。 该表达式必须解析为班级是<?>,字符串该 被解析为目标班级是<?>参数化类型引用.spring-doc.cadn.net.cn

从5.5版本开始,WebFluxRequestExecutingMessageHandler暴露了extractResponseBody(提取响应体)旗帜(即true默认情况下,返回仅响应正文,或返回整个响应实体作为回复消息的有效载荷,独立于所提供的expectedResponseType回复PayloadToFlux. 如果一个物体不存在于响应实体这面旗帜被忽视了,整个响应实体被归还。spring-doc.cadn.net.cn

更多可能的配置选项请参见 HTTP 出站组件spring-doc.cadn.net.cn

WebFlux 头部映射

由于 WebFlux 组件完全基于 HTTP 协议,HTTP 头部映射没有区别。 参见HTTP头部映射,了解更多可用于映射头部的选项和组件。spring-doc.cadn.net.cn

WebFlux 请求属性

从6.0版本开始,WebFluxRequestExecutingMessageHandler可以配置为通过以下方式来评估请求属性setAttributeVariablesExpression(). 该SpEL表达式必须在地图. 这样的映射随后传播到WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)HTTP 请求配置回调。 如果需要传递一个以键值对象形式出现的信息,这会很有帮助消息请求和下游过滤将获得这些属性的访问权限以进行进一步处理。spring-doc.cadn.net.cn