RSocket 支持
RSocket 支持
RSocket Spring 集成模块(Spring-积分-RSOCKET)允许执行RSocket应用协议。
你需要把这种依赖性纳入你的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.1.9"
该模块从5.2版本开始可用,基于Spring Messaging基础及其RSocket组件实现,例如:RSocketRequester,RSocketMessageHandler和Rocket策略.
有关RSocket协议、术语和组件的更多信息,请参见Spring Framework RSocket支持。
在通过通道适配器开始集成流程处理之前,我们需要在服务器和客户端之间建立RSocket连接。
为此,Spring Integration RSocket 支持提供了ServerRSocketConnector和ClientRSocketConnector该系统的实现摘要RSocket连接器.
这ServerRSocketConnector根据提供的,在主机和端口上暴露监听器io.rsocket.transport.ServerTransport因为接受客户的联系。
一个内部RSocketServer实例可以通过以下方式进行自定义setServerConfigurer()以及其他可配置的选项,例如:Rocket策略和模仿类型用于有效载荷数据和头部元数据。
当设置路由由客户端请求者提供(参见ClientRSocketConnector以下),连接的客户端存储为RSocketRequester在由 确定的密钥下clientRSocketKeyStrategy 双功能<Map<String,Object>,DataBuffer,Object>.
默认情况下,连接数据作为字符串的转换值,使用 UTF-8 字符集。
这样的RSocketRequester注册表可以在应用逻辑中确定与其交互的特定客户端连接,或向所有连接的客户端发布相同消息。
当客户端建立连接时,RSocketConnectedEvent由ServerRSocketConnector.
这与@ConnectMappingSpring 消息模块中的注释。
映射模式意味着接受所有客户端路由。
这*RSocketConnectedEvent可用于区分不同路线,具体如下DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER页眉。
典型的服务器配置可能如下:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括Rocket策略豆和@EventListener为RSocketConnectedEvent,是可选的。
看ServerRSocketConnector更多信息请参见JavaDocs。
从5.2.1版本开始,ServerRSocketMessageHandler被提取为公开的顶层类,以便可能与现有的RSocket服务器连接。
当ServerRSocketConnector是通过 的外部实例 提供ServerRSocketMessageHandler它不会在内部创建RSocket服务器,而是将所有处理逻辑委托给所提供的实例。
此外,ServerRSocketMessageHandler可以配置为messageMapping兼容还要处理旗帜@MessageMapping对于RSocket控制器,完全取代了标准所提供的功能RSocketMessageHandler.
这在经典配置中非常有用@MessageMapping方法与RSocket通道适配器一起存在,应用程序中也有一个外部配置的RSocket服务器。
这ClientRSocketConnector作为RSocketRequester基于RSocket通过提供的连接方式客户运输.
这RSocket连接器可以通过提供的RSocketConnectorConfigurer.
这设置路由(含可选模板变量)和setupData该组件还可以配置元数据。
典型的客户端配置可能如下:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
大多数选项(包括Rocket策略豆子)是可选的。
注意我们如何通过任意端口连接到本地启动的RSocket服务器。
看ServerRSocketConnector.clientRSocketKeyStrategy为setupData使用场景。
另见ClientRSocketConnector以及其摘要RSocket连接器关于更多信息,请参考超类JavaDocs。
双ClientRSocketConnector和ServerRSocketConnector负责将入站信道适配器映射到其路径用于路由接收RSocket请求的配置。
更多信息请见下一节。
RSocket 入站网关
这RSocketInboundGateway负责接收RSocket请求并生成响应(如有)。
它需要一个数组路径映射可以是类似于 MVC 请求映射的模式,或者@MessageMapping语义学。
此外,(自版本5.2.2起)还有一组交互模型(参见RSocket交互模型)可以在RSocketInboundGateway通过特定帧类型限制RSocket请求至该端点。
默认情况下,所有互动模型都支持。
这样的豆子,根据它的说法IntegrationRSocketEndpoint实现(A的扩展)响应式消息处理器),被自动检测到,要么通过ServerRSocketConnector或ClientRSocketConnector对于内部的路由逻辑集成RSocketMessageHandler来处理请求。
一摘要RSocket连接器可以提供给RSocketInboundGateway用于显式端点注册。
这样,自动检测选项就会被禁用摘要RSocket连接器.
这Rocket策略也可以注入RSocketInboundGateway或者它们从提供的摘要RSocket连接器覆盖任何显性注入。
解码器来自以下Rocket策略根据以下条件解码请求有效载荷requestElementType.
如果RSocketPayloadReturnValueHandler.RESPONSE_HEADER接收中未提供报头消息这RSocketInboundGateway将请求视为火灾与遗忘RSocket交互模型。
在这种情况下,一个RSocketInboundGateway执行一个平行发送进入输出通道.
否则,单处理器取值RSocketPayloadReturnValueHandler.RESPONSE_HEADER头部用于向RSocket发送回复。
为此,一个RSocketInboundGateway执行发送与接收消息反应对输出通道.
这有效载荷在下游发送的消息中总是通量根据消息传递RSocket逻辑。
当火灾与遗忘RSocket交互模型中,消息的转换是有效载荷.
回复有效载荷可以是一个普通物体,也可以是发行人-这RSocketInboundGateway根据编码器提供的编码器,将两者正确转换为 RSocket 响应Rocket策略.
从5.3版本开始,adecodeFluxAsUnit选项(默认)false)被添加到RSocketInboundGateway.
默认情况下,输入通量其变换方式是每个事件分别被解码。
这正是目前存在的行为@MessageMapping语义学。
恢复之前的行为或解码整个过程通量根据应用需求,作为单一单元,decodeFluxAsUnit必须设置为true.
然而,目标译码逻辑依赖于译码器被选中,例如 a字符串解码器流中默认需要新的行分隔符来表示字节缓冲区的末端。
关于如何配置RSocketInboundGateway端点并处理下游的有效载荷。
RSocket 出站网关
这RSocketOutboundGateway是摘要回复制作消息处理器对RSocket执行请求并根据RSocket回复(如有)生成回复。
低层级RSocket协议交互被委派到以下RSocketRequester从提供的ClientRSocketConnector或者来自RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER请求消息中的头部。
一个目标RSocketRequester在服务器端,可以通过RSocketConnectedEvent或者使用ServerRSocketConnector.getClientRSocketRequester()根据某个业务密钥选择的API,用于连接请求映射,通过以下方式ServerRSocketConnector.setClientRSocketKeyStrategy().
看ServerRSocketConnector更多信息请参见JavaDocs。
这路线发送请求必须显式配置(包括路径变量),或通过针对请求消息进行评估的SpEL表达式进行配置。
RSocket交互模型可以通过以下方式提供RSocket交互模型选项或相应的表达设置。
默认情况下,请求回应用于常见的网关用例。
当请求消息载荷为发行人一个publisherElementType可以提供选项,根据Rocket策略目标中供应RSocketRequester.
该选项的表达式可以计算为参数化类型引用.
参见RSocketRequester.RequestSpec.data()关于数据及其类型的更多信息,请使用 JavaDocs。
RSocket请求也可以通过以下方式进行增强元数据.
为此,一个元数据表达式针对请求消息可以配置在RSocketOutboundGateway.
这样的表达式必须应值为Map<Object, MimeType>.
什么时候交互模型莫火灾与遗忘一expectedResponseType必须提供。
它是String.class默认。
该选项的表达式可以计算为参数化类型引用.
参见RSocketRequester.RetrieveSpec.retrieveMono()和RSocketRequester.RetrieveSpec.retrieveFlux()关于回复数据及其类型的更多信息,请使用 JavaDocs。
回复有效载荷来自RSocketOutboundGateway是单(即使是对火灾与遗忘那就是交互模型单<虚空>)总是将该分量设为异步.
这样的单在生产前已订阅输出通道对于普通频道或由流信息频道.
一个通量对请求流或请求频道交互模型也被包裹在回复中单.
它可以在下游通过流信息频道带有直通服务激活器:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或者明确订阅目标应用逻辑。
期望的响应类型也可以配置(或通过表达式评估)为无效将该网关视为出站通道适配器。
然而,输出通道仍然需要配置(即使只是一个零通道)以发起对退伍者协会的订阅单.
关于如何配置RSocketOutboundGateway端点A,处理下游有效载荷。
RSocket 命名空间支持
Spring 集成提供了RSOCKET命名空间及其相应的模式定义。
要将其包含在配置中,请在应用上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入境
要配置带有XML的Spring Integration RSocket入站通道适配器,你需要使用合适的入站网关来自国际-RSOCKETNamespace。
以下示例展示了如何配置:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
一个ClientRSocketConnector和ServerRSocketConnector应配置为通用<豆子>定义。
出境
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
看spring-integration-rsocket.xsd用于描述所有这些XML属性。
用 Java 配置 RSocket 端点
以下示例展示了如何用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
一个ClientRSocketConnector或ServerRSocketConnector在该配置中假设 ,其含义是自动检测“回声”路径上此类端点。
注意@Transformer签名,其对RSocket请求的完全响应式处理并生成响应式回复。
以下示例展示了如何用Java DSL配置RSocket入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
一个ClientRSocketConnector或ServerRSocketConnector在此配置中假设,其意义为自动检测“/大写”路径上的此类端点,预期交互模型为“请求通道”。
以下示例展示了如何用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
这setClientRSocketConnector()仅对客户端要求。
在服务器端,RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER带有RSocketRequester请求消息中必须提供值。
以下示例展示了如何配置使用 Java DSL 的 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
看集成流程作为门户欲了解更多信息,如何使用上述功能界面位于上述流程的起点。