此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
RSocket 支持
RSocket Spring 集成模块 (spring-integration-rsocket
) 允许执行 RSocket 应用程序协议。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.3.12-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.12-SNAPSHOT"
该模块从版本 5.2 开始可用,并且基于 Spring Messaging 基础及其 RSocket 组件实现,例如RSocketRequester
,RSocketMessageHandler
和RSocketStrategies
.
有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。
在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。
为此,Spring Integration RSocket 支持提供了ServerRSocketConnector
和ClientRSocketConnector
的实现AbstractRSocketConnector
.
这ServerRSocketConnector
根据提供的io.rsocket.transport.ServerTransport
用于接受来自客户端的连接。
内部RSocketServer
实例可以使用setServerConfigurer()
,以及其他可以配置的选项,例如RSocketStrategies
和MimeType
用于有效负载数据和标头元数据。
当setupRoute
由客户端请求者提供(请参阅ClientRSocketConnector
),连接的客户端存储为RSocketRequester
在由clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
.
默认情况下,连接数据用于键,作为转换为具有 UTF-8 字符集的字符串的值。
这样的RSocketRequester
注册表可以在应用程序逻辑中用于确定与其交互的特定客户端连接,或将相同的消息发布到所有连接的客户端。
从客户端建立连接时,RSocketConnectedEvent
从ServerRSocketConnector
.
这类似于@ConnectMapping
注释。
映射模式意味着接受所有客户端路由。
这*
RSocketConnectedEvent
可用于区分不同的路由DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
页眉。
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括RSocketStrategies
bean 和@EventListener
为RSocketConnectedEvent
,是可选的。
看ServerRSocketConnector
JavaDocs 了解更多信息。
从 5.2.1 版本开始,ServerRSocketMessageHandler
被提取到公共顶级类中,以便可能与现有 RSocket 服务器连接。
当ServerRSocketConnector
与外部实例ServerRSocketMessageHandler
,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。
此外,ServerRSocketMessageHandler
可以配置为messageMappingCompatible
要处理的标志@MessageMapping
对于 RSocket 控制器,完全取代标准提供的功能RSocketMessageHandler
.
这在混合配置中非常有用,当经典配置@MessageMapping
方法与 RSocket 通道适配器一起存在于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器。
这ClientRSocketConnector
作为RSocketRequester
基于RSocket
通过提供的ClientTransport
. 这RSocketConnector
可以使用提供的RSocketConnectorConfigurer
. 这setupRoute
(使用可选模板变量)和setupData
也可以在此组件上配置元数据。
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括RSocketStrategies
bean)是可选的。
请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。
看ServerRSocketConnector.clientRSocketKeyStrategy
为setupData
用例。
另请参阅ClientRSocketConnector
及其AbstractRSocketConnector
超类 JavaDocs 了解更多信息。
双ClientRSocketConnector
和ServerRSocketConnector
负责将入站通道适配器映射到其path
用于路由传入 RSocket 请求的配置。
有关详细信息,请参阅下一节。
RSocket 入站网关
这RSocketInboundGateway
负责接收 RSocket 请求并生成响应(如果有)。
它需要一个数组path
映射,可以是类似于 MVC 请求映射或@MessageMapping
语义学。
此外,(从 5.2.2 版本开始),一组交互模型(参见RSocketInteractionModel
) 可以在RSocketInboundGateway
按特定帧类型限制对此终结点的 RSocket 请求。
默认情况下,支持所有交互模型。
这样的豆子,根据它的IntegrationRSocketEndpoint
实现(扩展ReactiveMessageHandler
),由ServerRSocketConnector
或ClientRSocketConnector
对于内部IntegrationRSocketMessageHandler
用于传入请求。
一AbstractRSocketConnector
可以提供给RSocketInboundGateway
用于显式终结点注册。
这样,自动检测选项就会被禁用AbstractRSocketConnector
. 这RSocketStrategies
也可以注射到RSocketInboundGateway
或者它们是从提供的AbstractRSocketConnector
覆盖任何显式注入。
解码器来自那些RSocketStrategies
根据提供的requestElementType
.
如果RSocketPayloadReturnValueHandler.RESPONSE_HEADER
header 在传入的Message
这RSocketInboundGateway
将请求视为fireAndForget
RSocket 交互模型。
在这种情况下,一个RSocketInboundGateway
执行一个普通的send
作到outputChannel
.
否则,一个MonoProcessor
值来自RSocketPayloadReturnValueHandler.RESPONSE_HEADER
标头用于向 RSocket 发送回复。
为此,一个RSocketInboundGateway
执行sendAndReceiveMessageReactive
对outputChannel
. 这payload
要向下游发送的消息始终是Flux
根据MessagingRSocket
逻辑。
当在fireAndForget
RSocket 交互模型,消息具有payload
.
答复payload
可以是一个普通对象或一个Publisher
-这RSocketInboundGateway
根据RSocketStrategies
.
从 5.3 版开始,一个decodeFluxAsUnit
选项(默认false
) 添加到RSocketInboundGateway
.
默认情况下,传入Flux
以单独解码每个事件的方式进行转换。
这是当前存在的确切行为@MessageMapping
语义学。
要恢复以前的行为或解码整个行为Flux
作为单个单元,根据应用要求,decodeFluxAsUnit
必须设置为true
.
但是,目标解码逻辑取决于Decoder
已选择,例如StringDecoder
要求流中存在换行分隔符(默认情况下)以指示字节缓冲区结束。
有关如何配置RSocketInboundGateway
端点并处理下游有效负载。
RSocket 出站网关
这RSocketOutboundGateway
是一个AbstractReplyProducingMessageHandler
向 RSocket 执行请求,并根据 RSocket 应答(如果有)生成应答。
低级 RSocket 协议交互被委托到RSocketRequester
从提供的ClientRSocketConnector
或从RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
header 在服务器端的请求消息中。
目标RSocketRequester
可以从RSocketConnectedEvent
或使用ServerRSocketConnector.getClientRSocketRequester()
API 根据为连接请求映射选择的某些业务密钥,通过ServerRSocketConnector.setClientRSocketKeyStrategy()
.
看ServerRSocketConnector
JavaDocs 了解更多信息。
这route
要发送请求,必须显式配置(与路径变量一起)或通过根据请求消息评估的 SpEL 表达式。
RSocket 交互模型可以通过以下方式提供RSocketInteractionModel
选项或相应的表达式设置。
默认情况下,一个requestResponse
用于常见的网关用例。
当请求消息有效负载是Publisher
一个publisherElementType
选项可以根据RSocketStrategies
在目标中提供RSocketRequester
.
此选项的表达式可以计算为ParameterizedTypeReference
.
请参阅RSocketRequester.RequestSpec.data()
JavaDocs 了解有关数据及其类型的更多信息。
RSocket 请求也可以通过metadata
.
为此,一个metadataExpression
Against Request 消息可以在RSocketOutboundGateway
.
这样的表达式必须计算为Map<Object, MimeType>
.
什么时候interactionModel
莫fireAndForget
一expectedResponseType
必须提供。
这是一个String.class
默认情况下。
此选项的表达式可以计算为ParameterizedTypeReference
.
请参阅RSocketRequester.RetrieveSpec.retrieveMono()
和RSocketRequester.RetrieveSpec.retrieveFlux()
JavaDocs 了解有关回复数据及其类型的更多信息。
回复payload
从RSocketOutboundGateway
是一个Mono
(即使对于fireAndForget
交互模型 它是Mono<Void>
) 始终将此组件设置为async
.
这样的Mono
在生成到outputChannel
对于常规频道或由FluxMessageChannel
.
一个Flux
响应requestStream
或requestChannel
交互模型也包装在回复中Mono
.
它可以通过以下方式压平下游FluxMessageChannel
使用直通服务激活器:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或者在目标应用程序逻辑中显式订阅。
预期的响应类型也可以配置(或通过表达式计算)为void
将此网关视为出站通道适配器。
但是,outputChannel
仍然必须配置(即使它只是一个NullChannel
) 启动对返回的Mono
.
有关如何配置RSocketOutboundGateway
端点与下游有效负载的交易。
RSocket 命名空间支持
Spring Integration 提供了一个rsocket
命名空间和相应的架构定义。
要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入境
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用适当的inbound-gateway
组件中的int-rsocket
Namespace。
以下示例显示了如何配置它:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
一个ClientRSocketConnector
和ServerRSocketConnector
应配置为通用<bean>
定义。
出境
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
看spring-integration-rsocket.xsd
以获取所有这些 XML 属性的描述。
使用 Java 配置 RSocket 端点
以下示例演示如何使用 Java 配置 RSocket 入站终结点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
一个ClientRSocketConnector
或ServerRSocketConnector
在此配置中假定,其含义用于自动检测“回显”路径上的此类端点。
注意@Transformer
签名,对 RSocket 请求进行完全响应式处理并生成响应式回复。
以下示例显示如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
一个ClientRSocketConnector
或ServerRSocketConnector
在此配置中假定,其含义是自动检测“/uppercase”路径上的此类端点,预期的交互模型为“请求通道”。
以下示例演示如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
这setClientRSocketConnector()
仅客户端需要。
在服务器端,RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
标头,并带有RSocketRequester
value 必须在请求消息中提供。
以下示例显示如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
看IntegrationFlow
作为网关有关如何使用提及的详细信息Function
界面。