|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
RSocket Spring 集成模块 () 允许执行 RSocket 应用程序协议。spring-integration-rsocket
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.3.5-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.5-SNAPSHOT"
该模块从版本 5.2 开始提供,并且基于 Spring Messaging 基础及其 RSocket 组件实现,例如 , 和 。
有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。RSocketRequesterRSocketMessageHandlerRSocketStrategies
在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。
为此,Spring 集成 RSocket 支持提供了 .ServerRSocketConnectorClientRSocketConnectorAbstractRSocketConnector
根据提供的用于接受来自客户端的连接,在主机和端口上公开侦听器。
可以使用 自定义内部实例,也可以配置其他选项,例如 以及有效负载数据和标头元数据。
当客户端请求者提供 a 时(见下文),连接的客户端将作为 a 存储在由 确定的键下。
默认情况下,连接数据用于键,作为转换为具有 UTF-8 字符集的字符串的值。
这样的注册表可以在应用程序逻辑中使用,以确定特定的 Client 端连接,以便与它进行交互,或将相同的消息发布到所有连接的 Client 端。
从客户端建立连接时,将从 .
这类似于 Spring Messaging 模块中的 Comments 提供的内容。
映射模式表示接受所有客户端路由。
这可用于通过 header 区分不同的路由。ServerRSocketConnectorio.rsocket.transport.ServerTransportRSocketServersetServerConfigurer()RSocketStrategiesMimeTypesetupRouteClientRSocketConnectorRSocketRequesterclientRSocketKeyStrategyBiFunction<Map<String, Object>, DataBuffer, Object>RSocketRequesterRSocketConnectedEventServerRSocketConnector@ConnectMapping*RSocketConnectedEventDestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项(包括 bean 和 for )都是可选的。
有关更多信息,请参阅 JavaDocs。RSocketStrategies@EventListenerRSocketConnectedEventServerRSocketConnector
从版本 5.2.1 开始,它被提取到一个公共的顶级类中,以便与现有 RSocket 服务器建立可能的连接。
当 a 提供 的外部实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。
此外,还可以配置一个标志来处理 RSocket 控制器,完全替换标准 .
这在混合配置中非常有用,当经典方法与 RSocket 通道适配器一起存在于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器时。ServerRSocketMessageHandlerServerRSocketConnectorServerRSocketMessageHandlerServerRSocketMessageHandlermessageMappingCompatible@MessageMappingRSocketMessageHandler@MessageMapping
该用作基于连接的支架。
可以使用提供的 进行自定义。
的 (带有可选的 templates 变量) 和 with metadata 也可以在此组件上配置。ClientRSocketConnectorRSocketRequesterRSocketClientTransportRSocketConnectorRSocketConnectorConfigurersetupRoutesetupData
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括 bean)都是可选的。
请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。
有关使用案例,请参阅。
有关更多信息,另请参阅及其超类 JavaDocs。RSocketStrategiesServerRSocketConnector.clientRSocketKeyStrategysetupDataClientRSocketConnectorAbstractRSocketConnector
两者都负责将入站通道适配器映射到其配置,以路由传入的 RSocket 请求。
有关更多信息,请参阅下一节。ClientRSocketConnectorServerRSocketConnectorpath
RSocket 入站网关
负责接收 RSocket 请求并生成响应(如果有)。
它需要一个映射数组,该数组可以是类似于 MVC 请求映射或语义的模式。
此外,(从版本 5.2.2 开始),可以在 上配置一组交互模型(请参阅 ),以通过特定帧类型限制 RSocket 请求到此端点。
默认情况下,支持所有交互模型。
这样的 bean,根据其 implementation (extension of a ),由内部的 或 内部的路由逻辑自动检测,用于传入请求。
可以向 提供 for explicit endpoint registration.
这样,自动检测选项就会在该 上被禁用。
也可以注入到 中,或者它们是从提供的覆盖任何显式注入中获得的。
解码器用于根据提供的 .
如果在传入 中未提供标头,则会将请求视为 RSocket 交互模型。
在这种情况下,an 将对 执行普通操作。
否则,来自 Headers 的值用于向 RSocket 发送回复。
为此,an 对 执行操作。
要向下游发送的消息始终是 根据 logic 的。
在 RSocket 交互模型中,消息具有 plain converted .
回复可以是普通对象,也可以是 - 根据 中提供的编码器,将它们都正确地转换为 RSocket 响应。RSocketInboundGatewaypath@MessageMappingRSocketInteractionModelRSocketInboundGatewayIntegrationRSocketEndpointReactiveMessageHandlerServerRSocketConnectorClientRSocketConnectorIntegrationRSocketMessageHandlerAbstractRSocketConnectorRSocketInboundGatewayAbstractRSocketConnectorRSocketStrategiesRSocketInboundGatewayAbstractRSocketConnectorRSocketStrategiesrequestElementTypeRSocketPayloadReturnValueHandler.RESPONSE_HEADERMessageRSocketInboundGatewayfireAndForgetRSocketInboundGatewaysendoutputChannelMonoProcessorRSocketPayloadReturnValueHandler.RESPONSE_HEADERRSocketInboundGatewaysendAndReceiveMessageReactiveoutputChannelpayloadFluxMessagingRSocketfireAndForgetpayloadpayloadPublisherRSocketInboundGatewayRSocketStrategies
从版本 5.3 开始,将向 .
默认情况下,incoming 的转换方式与其每个事件单独解码的方式相同。
这是当前语义中存在的确切行为。
要根据应用程序要求恢复以前的行为或将整个解码为单个单元,必须将 设置为 。
但是,目标解码逻辑取决于所选逻辑,例如 a 需要在流中存在新的行分隔符(默认情况下)以指示字节缓冲区结束。decodeFluxAsUnitfalseRSocketInboundGatewayFlux@MessageMappingFluxdecodeFluxAsUnittrueDecoderStringDecoder
有关如何配置终端节点和处理下游负载的示例,请参阅使用 Java 配置 RSocket 终端节点。RSocketInboundGateway
RSocket 出站网关
用于在 RSocket 中执行请求,并根据 RSocket 回复(如果有)生成回复。
低级 RSocket 协议交互被委托给服务器端请求消息中提供的 Resolved 或 Headers。
服务器端的目标可以根据为 connect 请求映射选择的某些业务键,从 或使用 API 进行解析。
有关更多信息,请参阅 JavaDocs。RSocketOutboundGatewayAbstractReplyProducingMessageHandlerRSocketRequesterClientRSocketConnectorRSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADERRSocketRequesterRSocketConnectedEventServerRSocketConnector.getClientRSocketRequester()ServerRSocketConnector.setClientRSocketKeyStrategy()ServerRSocketConnector
要发送请求必须显式配置(与路径变量一起)或通过根据请求消息进行评估的 SPEL 表达式进行配置。route
RSocket 交互模型可以通过选项或相应的表达式设置来提供。
默认情况下,a 用于常见的网关使用案例。RSocketInteractionModelrequestResponse
当请求消息有效负荷为 a 时,可以提供一个选项,以根据 target 中提供的 an 对其元素进行编码。
此选项的表达式的计算结果可以为 .
有关数据及其类型的更多信息,请参阅 JavaDocs。PublisherpublisherElementTypeRSocketStrategiesRSocketRequesterParameterizedTypeReferenceRSocketRequester.RequestSpec.data()
RSocket 请求也可以使用 .
为此,可以在 .
这样的表达式的计算结果必须为 .metadatametadataExpressionRSocketOutboundGatewayMap<Object, MimeType>
When is not ,必须提供 an。
默认情况下,它是一个。
此选项的表达式的计算结果可以为 .
有关回复数据及其类型的更多信息,请参阅 和 JavaDocs。interactionModelfireAndForgetexpectedResponseTypeString.classParameterizedTypeReferenceRSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux()
来自 的回复 是一个 (即使对于交互模型也是如此) 始终使这个组件为 。
此类 a 在生成到常规频道之前订阅,或由 按需处理。
或 交互模型的响应也被包装到 reply 中。
它可以通过 直通服务激活器在下游展平:payloadRSocketOutboundGatewayMonofireAndForgetMono<Void>asyncMonooutputChannelFluxMessageChannelFluxrequestStreamrequestChannelMonoFluxMessageChannel
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
还可以配置预期的响应类型(或通过表达式评估)以将此网关视为出站通道适配器。
但是,仍然必须配置 (即使它只是一个 ) 以启动对返回的 .voidoutputChannelNullChannelMono
有关如何配置终端节点以处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 终端节点。RSocketOutboundGateway
RSocket Namespace 支持
Spring 集成提供了一个名称空间和相应的模式定义。
要将其包含在您的配置中,请在您的应用程序上下文配置文件中添加以下命名空间声明:rsocket
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入境
要使用 XML 配置 Spring 集成 RSocket 入站通道适配器,您需要使用名称空间中的适当组件。
以下示例显示如何配置它:inbound-gatewayint-rsocket
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
A 和 应配置为通用定义。ClientRSocketConnectorServerRSocketConnector<bean>
出境
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关所有这些 XML 属性的说明,请参见 。spring-integration-rsocket.xsd
使用 Java 配置 RSocket 端点
以下示例显示了如何使用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中假定 A 或,其含义是在 “echo” 路径上自动检测此类端点。
请注意签名,因为它对 RSocket 请求的完全反应式处理并生成反应式回复。ClientRSocketConnectorServerRSocketConnector@Transformer
以下示例显示了如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中假定 A 或,其含义是在 “/uppercase” 路径上自动检测此类端点,并将预期的交互模型视为 “request channel”。ClientRSocketConnectorServerRSocketConnector
以下示例显示如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
只有客户端才需要。
在服务器端,必须在请求消息中提供带有值的标头。setClientRSocketConnector()RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADERRSocketRequester
以下示例显示了如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何使用上述流程开头提到的接口的更多信息,请参阅 IntegrationFlow as a Gateway。Function