此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

RSocket 支持

RSocket Spring 集成模块 (spring-integration-rsocket) 允许执行 RSocket 应用程序协议spring-doc.cadn.net.cn

项目需要此依赖项:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>7.0.0-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:7.0.0-SNAPSHOT"

该模块从版本 5.2 开始可用,并且基于 Spring Messaging 基础及其 RSocket 组件实现,例如RSocketRequester,RSocketMessageHandlerRSocketStrategies. 有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持spring-doc.cadn.net.cn

在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring Integration RSocket 支持提供了ServerRSocketConnectorClientRSocketConnector的实现AbstractRSocketConnector.spring-doc.cadn.net.cn

ServerRSocketConnector根据提供的io.rsocket.transport.ServerTransport用于接受来自客户端的连接。内部RSocketServer实例可以使用setServerConfigurer(),以及其他可以配置的选项,例如RSocketStrategiesMimeType用于有效负载数据和标头元数据。当setupRoute由客户端请求者提供(请参阅ClientRSocketConnector),连接的客户端存储为RSocketRequester在由clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object>. 默认情况下,连接数据用作键的转换值,转换为具有 UTF-8 字符集的字符串。这样的RSocketRequester注册表可以在应用程序逻辑中用于确定特定的客户端连接以与其交互,或将相同的消息发布到所有连接的客户端。从客户端建立连接时,RSocketConnectedEventServerRSocketConnector. 这类似于@ConnectMappingSpring Messaging 模块中的注释。映射模式意味着接受所有客户端路由。 这*RSocketConnectedEvent可用于区分不同的路由DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER页眉。spring-doc.cadn.net.cn

典型的服务器配置可能如下所示:spring-doc.cadn.net.cn

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项,包括RSocketStrategiesbean 和@EventListenerRSocketConnectedEvent,是可选的。 看ServerRSocketConnectorJavaDocs 了解更多信息。spring-doc.cadn.net.cn

从 5.2.1 版本开始,ServerRSocketMessageHandler被提取到公共顶级类中,以便可能与现有 RSocket 服务器连接。当ServerRSocketConnector与外部实例ServerRSocketMessageHandler,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。此外,ServerRSocketMessageHandler可以配置为messageMappingCompatible要处理的标志@MessageMapping对于 RSocket 控制器,完全取代标准提供的功能RSocketMessageHandler. 这在混合配置中非常有用,当经典配置@MessageMapping方法与 RSocket 通道适配器一起存在于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器。spring-doc.cadn.net.cn

ClientRSocketConnector作为RSocketRequester基于RSocket通过提供的ClientTransport. 这RSocketConnector可以使用提供的RSocketConnectorConfigurer. 这setupRoute(使用可选模板变量)和setupData也可以在此组件上配置元数据。spring-doc.cadn.net.cn

典型的客户端配置可能如下所示:spring-doc.cadn.net.cn

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

这些选项中的大多数(包括RSocketStrategiesbean)是可选的。请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。 看ServerRSocketConnector.clientRSocketKeyStrategysetupData用例。另请参阅ClientRSocketConnector及其AbstractRSocketConnector超类 Javadocs 以获取更多信息。spring-doc.cadn.net.cn

ClientRSocketConnectorServerRSocketConnector负责将入站通道适配器映射到其path用于路由传入 RSocket 请求的配置。有关详细信息,请参阅下一节。spring-doc.cadn.net.cn

RSocket 入站网关

RSocketInboundGateway负责接收 RSocket 请求并生成响应(如果有)。它需要一个数组path映射,可以是类似于 MVC 请求映射或@MessageMapping语义学。 此外,(从 5.2.2 版本开始),一组交互模型(参见RSocketInteractionModel) 可以在RSocketInboundGateway通过特定的帧类型限制对此端点的 RSocket 请求。默认情况下,支持所有交互模型。这样的 bean,根据其IntegrationRSocketEndpoint实现(扩展ReactiveMessageHandler),由ServerRSocketConnectorClientRSocketConnector对于内部IntegrationRSocketMessageHandler用于传入请求。 一AbstractRSocketConnector可以提供给RSocketInboundGateway用于显式终结点注册。 这样,自动检测选项就会被禁用AbstractRSocketConnector. 这RSocketStrategies也可以注射到RSocketInboundGateway或者它们是从提供的AbstractRSocketConnector覆盖任何显式注入。 解码器来自那些RSocketStrategies根据提供的requestElementType. 如果RSocketPayloadReturnValueHandler.RESPONSE_HEADERheader 在传入的MessageRSocketInboundGateway将请求视为fireAndForgetRSocket 交互模型。 在这种情况下,一个RSocketInboundGateway执行一个普通的send作到outputChannel. 否则,一个MonoProcessor值来自RSocketPayloadReturnValueHandler.RESPONSE_HEADER标头用于向 RSocket 发送回复。 为此,一个RSocketInboundGateway执行sendAndReceiveMessageReactiveoutputChannel. 这payload要向下游发送的消息始终是Flux根据MessagingRSocket逻辑。 当在fireAndForgetRSocket 交互模型,消息具有payload. 答复payload可以是一个普通对象或一个Publisher-这RSocketInboundGateway根据RSocketStrategies.spring-doc.cadn.net.cn

从 5.3 版开始,一个decodeFluxAsUnit选项(默认false) 添加到RSocketInboundGateway. 默认情况下,传入Flux以单独解码每个事件的方式进行转换。 这是当前存在的确切行为@MessageMapping语义学。 要恢复以前的行为或解码整个行为Flux作为单个单元,根据应用要求,decodeFluxAsUnit必须设置为true. 但是,目标解码逻辑取决于Decoder已选择,例如StringDecoder要求流中存在换行分隔符(默认情况下)以指示字节缓冲区结束。spring-doc.cadn.net.cn

有关如何配置RSocketInboundGateway端点并处理下游有效负载。spring-doc.cadn.net.cn

RSocket 出站网关

RSocketOutboundGateway是一个AbstractReplyProducingMessageHandler向 RSocket 执行请求,并根据 RSocket 应答(如果有)生成应答。 低级 RSocket 协议交互被委托到RSocketRequester从提供的ClientRSocketConnector或从RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADERheader 在服务器端的请求消息中。 目标RSocketRequester可以从RSocketConnectedEvent或使用ServerRSocketConnector.getClientRSocketRequester()API 根据为连接请求映射选择的某些业务密钥,通过ServerRSocketConnector.setClientRSocketKeyStrategy(). 看ServerRSocketConnectorJavaDocs 了解更多信息。spring-doc.cadn.net.cn

route要发送请求,必须显式配置(与路径变量一起)或通过根据请求消息评估的 SpEL 表达式。spring-doc.cadn.net.cn

RSocket 交互模型可以通过以下方式提供RSocketInteractionModel选项或相应的表达式设置。 默认情况下,一个requestResponse用于常见的网关用例。spring-doc.cadn.net.cn

当请求消息有效负载是Publisher一个publisherElementType选项可以根据RSocketStrategies在目标中提供RSocketRequester. 此选项的表达式可以计算为ParameterizedTypeReference. 请参阅RSocketRequester.RequestSpec.data()JavaDocs 了解有关数据及其类型的更多信息。spring-doc.cadn.net.cn

RSocket 请求也可以通过metadata. 为此,一个metadataExpressionAgainst Request 消息可以在RSocketOutboundGateway. 这样的表达式必须计算为Map<Object, MimeType>.spring-doc.cadn.net.cn

什么时候interactionModelfireAndForgetexpectedResponseType必须提供。这是一个String.class默认情况下。此选项的表达式可以计算为ParameterizedTypeReference. 请参阅RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux()JavaDocs 了解有关回复数据及其类型的更多信息。spring-doc.cadn.net.cn

回复payloadRSocketOutboundGateway是一个Mono(即使对于fireAndForget交互模型 它是Mono<Void>) 始终将此组件设置为async. 这样的Mono在生成到outputChannel对于常规频道或由FluxMessageChannel. 一个Flux响应requestStreamrequestChannel交互模型也包装在回复中Mono. 它可以通过以下方式压平下游FluxMessageChannel使用直通服务激活器:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或者在目标应用程序逻辑中显式订阅。spring-doc.cadn.net.cn

预期的响应类型也可以配置(或通过表达式计算)为void将此网关视为出站通道适配器。但是,outputChannel仍然必须配置(即使它只是一个NullChannel) 启动对返回的Mono.spring-doc.cadn.net.cn

有关如何配置RSocketOutboundGateway端点与下游有效负载的交易。spring-doc.cadn.net.cn

RSocket 命名空间支持

Spring Integration 提供了一个rsocket命名空间和相应的架构定义。 要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

入境

要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用适当的inbound-gateway组件中的int-rsocketNamespace。 以下示例显示了如何配置它:spring-doc.cadn.net.cn

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

一个ClientRSocketConnectorServerRSocketConnector应配置为通用<bean>定义。spring-doc.cadn.net.cn

出境

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

spring-integration-rsocket.xsd以获取所有这些 XML 属性的描述。spring-doc.cadn.net.cn

使用 Java 配置 RSocket 端点

以下示例演示如何使用 Java 配置 RSocket 入站终结点:spring-doc.cadn.net.cn

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

一个ClientRSocketConnectorServerRSocketConnector在此配置中假定,其含义用于自动检测“回显”路径上的此类端点。 注意@Transformer签名,对 RSocket 请求进行完全响应式处理并生成响应式回复。spring-doc.cadn.net.cn

以下示例显示如何使用 Java DSL 配置 RSocket 入站网关:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

一个ClientRSocketConnectorServerRSocketConnector在此配置中假定,其含义是自动检测“/uppercase”路径上的此类端点,并预期的交互模型为“请求通道”。spring-doc.cadn.net.cn

以下示例演示如何使用 Java 配置 RSocket 出站网关:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector()仅客户端需要。 在服务器端,RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER标头,并带有RSocketRequestervalue 必须在请求消息中提供。spring-doc.cadn.net.cn

以下示例显示如何使用 Java DSL 配置 RSocket 出站网关:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

IntegrationFlow作为网关有关如何使用提及的详细信息Function界面。spring-doc.cadn.net.cn