RSocket 支持

RSocket 支持

RSocket Spring 集成模块(Spring-积分-RSOCKET)允许执行RSocket应用协议spring-doc.cadn.net.cn

你需要把这种依赖性纳入你的项目中:spring-doc.cadn.net.cn

梅文
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>6.1.9</version>
</dependency>
格拉德勒
compile "org.springframework.integration:spring-integration-rsocket:6.1.9"

该模块从5.2版本开始可用,基于Spring Messaging基础及其RSocket组件实现,例如:RSocketRequester,RSocketMessageHandlerRocket策略. 有关RSocket协议、术语和组件的更多信息,请参见Spring Framework RSocket支持spring-doc.cadn.net.cn

在通过通道适配器开始集成流程处理之前,我们需要在服务器和客户端之间建立RSocket连接。 为此,Spring Integration RSocket 支持提供了ServerRSocketConnectorClientRSocketConnector该系统的实现摘要RSocket连接器.spring-doc.cadn.net.cn

ServerRSocketConnector根据提供的,在主机和端口上暴露监听器io.rsocket.transport.ServerTransport因为接受客户的联系。 一个内部RSocketServer实例可以通过以下方式进行自定义setServerConfigurer()以及其他可配置的选项,例如:Rocket策略模仿类型用于有效载荷数据和头部元数据。 当设置路由由客户端请求者提供(参见ClientRSocketConnector以下),连接的客户端存储为RSocketRequester在由 确定的密钥下clientRSocketKeyStrategy 双功能<Map<String,Object>,DataBuffer,Object>. 默认情况下,连接数据作为字符串的转换值,使用 UTF-8 字符集。 这样的RSocketRequester注册表可以在应用逻辑中确定与其交互的特定客户端连接,或向所有连接的客户端发布相同消息。 当客户端建立连接时,RSocketConnectedEventServerRSocketConnector. 这与@ConnectMappingSpring 消息模块中的注释。 映射模式意味着接受所有客户端路由。 这*RSocketConnectedEvent可用于区分不同路线,具体如下DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER页眉。spring-doc.cadn.net.cn

典型的服务器配置可能如下:spring-doc.cadn.net.cn

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项,包括Rocket策略豆和@EventListenerRSocketConnectedEvent,是可选的。 看ServerRSocketConnector更多信息请参见JavaDocs。spring-doc.cadn.net.cn

从5.2.1版本开始,ServerRSocketMessageHandler被提取为公开的顶层类,以便可能与现有的RSocket服务器连接。 当ServerRSocketConnector是通过 的外部实例 提供ServerRSocketMessageHandler它不会在内部创建RSocket服务器,而是将所有处理逻辑委托给所提供的实例。 此外,ServerRSocketMessageHandler可以配置为messageMapping兼容还要处理旗帜@MessageMapping对于RSocket控制器,完全取代了标准所提供的功能RSocketMessageHandler. 这在经典配置中非常有用@MessageMapping方法与RSocket通道适配器一起存在,应用程序中也有一个外部配置的RSocket服务器。spring-doc.cadn.net.cn

ClientRSocketConnector作为RSocketRequester基于RSocket通过提供的连接方式客户运输. 这RSocket连接器可以通过提供的RSocketConnectorConfigurer. 这设置路由(含可选模板变量)和setupData该组件还可以配置元数据。spring-doc.cadn.net.cn

典型的客户端配置可能如下:spring-doc.cadn.net.cn

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

大多数选项(包括Rocket策略豆子)是可选的。 注意我们如何通过任意端口连接到本地启动的RSocket服务器。 看ServerRSocketConnector.clientRSocketKeyStrategysetupData使用场景。 另见ClientRSocketConnector以及其摘要RSocket连接器关于更多信息,请参考超类JavaDocs。spring-doc.cadn.net.cn

ClientRSocketConnectorServerRSocketConnector负责将入站信道适配器映射到其路径用于路由接收RSocket请求的配置。 更多信息请见下一节。spring-doc.cadn.net.cn

RSocket 入站网关

RSocketInboundGateway负责接收RSocket请求并生成响应(如有)。 它需要一个数组路径映射可以是类似于 MVC 请求映射的模式,或者@MessageMapping语义学。 此外,(自版本5.2.2起)还有一组交互模型(参见RSocket交互模型)可以在RSocketInboundGateway通过特定帧类型限制RSocket请求至该端点。 默认情况下,所有互动模型都支持。 这样的豆子,根据它的说法IntegrationRSocketEndpoint实现(A的扩展)响应式消息处理器),被自动检测到,要么通过ServerRSocketConnectorClientRSocketConnector对于内部的路由逻辑集成RSocketMessageHandler来处理请求。 一摘要RSocket连接器可以提供给RSocketInboundGateway用于显式端点注册。 这样,自动检测选项就会被禁用摘要RSocket连接器. 这Rocket策略也可以注入RSocketInboundGateway或者它们从提供的摘要RSocket连接器覆盖任何显性注入。 解码器来自以下Rocket策略根据以下条件解码请求有效载荷requestElementType. 如果RSocketPayloadReturnValueHandler.RESPONSE_HEADER接收中未提供报头消息RSocketInboundGateway将请求视为火灾与遗忘RSocket交互模型。 在这种情况下,一个RSocketInboundGateway执行一个平行发送进入输出通道. 否则,单处理器取值RSocketPayloadReturnValueHandler.RESPONSE_HEADER头部用于向RSocket发送回复。 为此,一个RSocketInboundGateway执行发送与接收消息反应输出通道. 这有效载荷在下游发送的消息中总是通量根据消息传递RSocket逻辑。 当火灾与遗忘RSocket交互模型中,消息的转换是有效载荷. 回复有效载荷可以是一个普通物体,也可以是发行人-这RSocketInboundGateway根据编码器提供的编码器,将两者正确转换为 RSocket 响应Rocket策略.spring-doc.cadn.net.cn

从5.3版本开始,adecodeFluxAsUnit选项(默认)false)被添加到RSocketInboundGateway. 默认情况下,输入通量其变换方式是每个事件分别被解码。 这正是目前存在的行为@MessageMapping语义学。 恢复之前的行为或解码整个过程通量根据应用需求,作为单一单元,decodeFluxAsUnit必须设置为true. 然而,目标译码逻辑依赖于译码器被选中,例如 a字符串解码器流中默认需要新的行分隔符来表示字节缓冲区的末端。spring-doc.cadn.net.cn

关于如何配置RSocketInboundGateway端点并处理下游的有效载荷。spring-doc.cadn.net.cn

RSocket 出站网关

RSocketOutboundGateway摘要回复制作消息处理器对RSocket执行请求并根据RSocket回复(如有)生成回复。 低层级RSocket协议交互被委派到以下RSocketRequester从提供的ClientRSocketConnector或者来自RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER请求消息中的头部。 一个目标RSocketRequester在服务器端,可以通过RSocketConnectedEvent或者使用ServerRSocketConnector.getClientRSocketRequester()根据某个业务密钥选择的API,用于连接请求映射,通过以下方式ServerRSocketConnector.setClientRSocketKeyStrategy(). 看ServerRSocketConnector更多信息请参见JavaDocs。spring-doc.cadn.net.cn

路线发送请求必须显式配置(包括路径变量),或通过针对请求消息进行评估的SpEL表达式进行配置。spring-doc.cadn.net.cn

RSocket交互模型可以通过以下方式提供RSocket交互模型选项或相应的表达设置。 默认情况下,请求回应用于常见的网关用例。spring-doc.cadn.net.cn

当请求消息载荷为发行人一个publisherElementType可以提供选项,根据Rocket策略目标中供应RSocketRequester. 该选项的表达式可以计算为参数化类型引用. 参见RSocketRequester.RequestSpec.data()关于数据及其类型的更多信息,请使用 JavaDocs。spring-doc.cadn.net.cn

RSocket请求也可以通过以下方式进行增强元数据. 为此,一个元数据表达式针对请求消息可以配置在RSocketOutboundGateway. 这样的表达式必须应值为Map<Object, MimeType>.spring-doc.cadn.net.cn

什么时候交互模型火灾与遗忘expectedResponseType必须提供。 它是String.class默认。 该选项的表达式可以计算为参数化类型引用. 参见RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux()关于回复数据及其类型的更多信息,请使用 JavaDocs。spring-doc.cadn.net.cn

回复有效载荷来自RSocketOutboundGateway(即使是对火灾与遗忘那就是交互模型单<虚空>)总是将该分量设为异步. 这样的在生产前已订阅输出通道对于普通频道或由流信息频道. 一个通量请求流请求频道交互模型也被包裹在回复中. 它可以在下游通过流信息频道带有直通服务激活器:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或者明确订阅目标应用逻辑。spring-doc.cadn.net.cn

期望的响应类型也可以配置(或通过表达式评估)为无效将该网关视为出站通道适配器。 然而,输出通道仍然需要配置(即使只是一个零通道)以发起对退伍者协会的订阅.spring-doc.cadn.net.cn

关于如何配置RSocketOutboundGateway端点A,处理下游有效载荷。spring-doc.cadn.net.cn

RSocket 命名空间支持

Spring 集成提供了RSOCKET命名空间及其相应的模式定义。 要将其包含在配置中,请在应用上下文配置文件中添加以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

入境

要配置带有XML的Spring Integration RSocket入站通道适配器,你需要使用合适的入站网关来自国际-RSOCKETNamespace。 以下示例展示了如何配置:spring-doc.cadn.net.cn

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

一个ClientRSocketConnectorServerRSocketConnector应配置为通用<豆子>定义。spring-doc.cadn.net.cn

出境

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

spring-integration-rsocket.xsd用于描述所有这些XML属性。spring-doc.cadn.net.cn

用 Java 配置 RSocket 端点

以下示例展示了如何用 Java 配置 RSocket 入站端点:spring-doc.cadn.net.cn

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

一个ClientRSocketConnectorServerRSocketConnector在该配置中假设 ,其含义是自动检测“回声”路径上此类端点。 注意@Transformer签名,其对RSocket请求的完全响应式处理并生成响应式回复。spring-doc.cadn.net.cn

以下示例展示了如何用Java DSL配置RSocket入站网关:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

一个ClientRSocketConnectorServerRSocketConnector在此配置中假设,其意义为自动检测“/大写”路径上的此类端点,预期交互模型为“请求通道”。spring-doc.cadn.net.cn

以下示例展示了如何用 Java 配置 RSocket 出站网关:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector()仅对客户端要求。 在服务器端,RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER带有RSocketRequester请求消息中必须提供值。spring-doc.cadn.net.cn

以下示例展示了如何配置使用 Java DSL 的 RSocket 出站网关:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

集成流程作为门户欲了解更多信息,如何使用上述功能界面位于上述流程的起点。spring-doc.cadn.net.cn