|
对于最新稳定版本,请使用 Spring Framework 7.0.6! |
RSocket
本节介绍 Spring Framework 对 RSocket 协议的支持。
概述
RSocket 是一种应用层协议,支持通过 TCP、WebSocket 和其他字节流传输方式进行多路复用的双向通信,并采用以下交互模型之一:
-
Request-Response—— 发送一条消息并接收一条返回。 -
Request-Stream— 发送一条消息并接收返回的消息流。 -
Channel— 在两个方向上发送消息流。 -
Fire-and-Forget—— 发送单向消息。
一旦初始连接建立,“客户端”与“服务器端”的区别便不复存在,因为双方变得对称,每一方都可以发起上述交互之一。这也正是在协议调用中,参与双方被称为“请求方”(requester)和“响应方”(responder),而上述交互则被称为“请求流”(request streams)或简称为“请求”(requests)的原因。
这些是 RSocket 协议的主要特性和优势:
-
Reactive Streams 语义跨越网络边界——对于
Request-Stream和Channel等流式请求,背压(back pressure)信号在请求方与响应方之间传递,使请求方能够从源头减缓响应方的速度,从而减少对网络层拥塞控制的依赖,以及对网络层或任何层级缓冲的需求。 -
请求限流——此功能名为“租约”(Leasing),名称源自
LEASE帧,该帧可由任一端发送,用于在给定时间内限制另一端允许的请求总数。租约会定期续期。 -
会话恢复(Session resumption)——这是为应对连接中断而设计的,需要维护一些状态。 状态管理对应用程序是透明的,并且能够很好地与背压(back pressure)机制结合使用: 当可能时,背压可以停止生产者,从而减少所需维护的状态量。
-
大型消息的分片与重组。
-
保持连接(心跳)。
RSocket 在多种语言中都有实现。其Java 库基于Project Reactor构建,并使用Reactor Netty作为传输层。这意味着应用程序中来自 Reactive Streams Publisher 的信号能够透明地通过 RSocket 在网络中传播。
协议
RSocket 的优势之一在于它在传输层具有明确定义的行为,并附带一份易于阅读的规范以及一些协议扩展。因此,无论使用何种语言的实现或高层框架 API,阅读该规范都是很有帮助的。本节将提供一个简明的概述,以建立必要的背景知识。
连接
客户端最初通过某种底层流式传输协议(例如 TCP 或 WebSocket)连接到服务器,并向服务器发送一个 SETUP 帧以设置连接参数。
服务器可能会拒绝 SETUP 帧,但通常在该帧被发送(对于客户端)并被接收(对于服务器)之后,双方即可开始发送请求,除非 SETUP 帧表明使用了租赁语义来限制请求数量;在这种情况下,双方都必须等待从对方接收到一个 LEASE 帧,才能获准发送请求。
发起请求
一旦连接建立,双方均可通过 REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL 或 REQUEST_FNF 其中一种帧发起请求。每种帧都会从请求方携带一条消息发送给响应方。
随后,响应方可以返回包含响应消息的 PAYLOAD 帧;在 REQUEST_CHANNEL 的情况下,请求方也可以发送包含更多请求消息的 PAYLOAD 帧。
当一个请求涉及消息流(例如 Request-Stream 和 Channel)时,
响应方必须遵守请求方发出的需求信号。需求以消息数量的形式表示。
初始需求在 REQUEST_STREAM 和 REQUEST_CHANNEL 帧中指定。
后续需求则通过 REQUEST_N 帧进行通知。
每一方还可以通过 METADATA_PUSH 帧发送元数据通知,这些通知不针对任何特定请求,而是与整个连接相关。
消息格式
RSocket 消息包含数据和元数据。元数据可用于发送路由、安全Tokens等信息。数据和元数据可以采用不同的格式。各自的 MIME 类型在 SETUP 帧中声明,并适用于给定连接上的所有请求。
虽然所有消息都可以包含元数据,但诸如路由(route)之类的元数据通常是按请求(per-request)的,
因此仅包含在请求的第一个消息中,即使用以下帧类型之一:
REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL 或 REQUEST_FNF。
协议扩展定义了应用程序中使用的通用元数据格式:
Java 实现
RSocket 的 Java 实现 基于
Project Reactor 构建。TCP 和 WebSocket 传输层则
基于 Reactor Netty 构建。作为 Reactive Streams
库,Reactor 简化了协议的实现工作。对于应用程序而言,使用 Flux 和 Mono 配合声明式操作符以及透明的背压支持是一种自然而然的选择。
RSocket Java 中的 API 故意设计得简洁而基础。它专注于协议特性,而将应用程序编程模型(例如 RPC 代码生成或其他方式)作为更高层次、独立的关注点。
主要契约
io.rsocket.RSocket
通过 Mono 表示单条消息的承诺,Flux 表示消息流,以及 io.rsocket.Payload 表示实际消息(可访问数据和元数据的字节缓冲区),对四种请求交互类型进行了建模。RSocket 契约以对称方式使用。在发起请求时,应用程序会获得一个 RSocket 实例用于执行请求;在响应请求时,应用程序则需实现 RSocket 接口以处理请求。
这并不是一个详尽的入门介绍。在大多数情况下,Spring 应用程序无需直接使用其 API。然而,脱离 Spring 环境单独查看或试验 RSocket 可能是很有必要的。RSocket Java 代码仓库包含多个示例应用,用于演示其 API 和协议特性。
Spring 支持
spring-messaging 模块包含以下内容:
-
RSocketRequester — 通过
io.rsocket.RSocket发起请求的流畅 API,支持数据和元数据的编码/解码。 -
带注解的响应器 — 用于响应的带有
@MessageMapping注解的处理方法。
spring-web 模块包含 Encoder 和 Decoder 的实现,例如 Jackson CBOR/JSON 和 Protobuf,这些通常是 RSocket 应用程序所需要的。该模块还包含 PathPatternParser,可用于高效地进行路由匹配。
Spring Boot 2.2 支持通过 TCP 或 WebSocket 启动一个 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 暴露 RSocket 的选项。此外,还提供了对 RSocketRequester.Builder 和 RSocketStrategies 的客户端支持和自动配置。
更多详细信息,请参阅 Spring Boot 参考文档中的
RSocket 章节。
Spring Security 5.2 提供了对 RSocket 的支持。
Spring Integration 5.2 提供了入站和出站网关,用于与 RSocket 客户端和服务器进行交互。更多详细信息,请参阅《Spring Integration 参考手册》。
Spring Cloud Gateway 支持 RSocket 连接。
RSocket 请求器
RSocketRequester 提供了一个流畅的 API 来执行 RSocket 请求,它接受和返回用于数据与元数据的对象,而不是底层的数据缓冲区。它可以对称地使用,既可用于客户端发起请求,也可用于服务器端发起请求。
客户端请求器
在客户端获取 RSocketRequester 需要连接到服务器,该过程涉及发送一个包含连接设置的 RSocket SETUP 帧。RSocketRequester 提供了一个构建器,用于准备 io.rsocket.core.RSocketConnector,其中包括 SETUP 帧所需的连接设置。
这是使用默认设置进行连接的最基本方式:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
上述操作不会立即建立连接。当发起请求时,会透明地建立并使用一个共享连接。
连接设置
RSocketRequester.Builder 提供了以下方法来自定义初始的 SETUP 帧:
-
dataMimeType(MimeType)— 设置连接上数据的 MIME 类型。 -
metadataMimeType(MimeType)— 设置连接上元数据的 MIME 类型。 -
setupData(Object)— 要包含在SETUP中的数据。 -
setupRoute(String, Object…)— 要在元数据中包含于SETUP中的路由。 -
setupMetadata(Object, MimeType)— 要在SETUP中包含的其他元数据。
对于数据,默认的 MIME 类型由第一个配置的 Decoder 决定。对于元数据,默认的 MIME 类型是
复合元数据(composite metadata),它允许每个请求包含多个元数据值和 MIME 类型对。通常情况下,这两者都不需要更改。
SETUP 帧中的数据和元数据是可选的。在服务器端,可以使用 @ConnectMapping 方法来处理连接的建立以及 SETUP 帧的内容。元数据可用于连接级别的安全控制。
策略
RSocketRequester.Builder 接收 RSocketStrategies 以配置请求器。
您需要使用它来提供用于数据和元数据值(反)序列化的编码器和解码器。
默认情况下,仅注册了来自 spring-core 的基本编解码器,用于处理 String、
byte[] 和 ByteBuffer。添加 spring-web 依赖后,即可使用更多编解码器,
可按如下方式注册:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies 被设计为可重用的。在某些场景下,例如客户端和服务器位于同一个应用程序中,将其声明在 Spring 配置中可能是更优的选择。
客户端响应器
RSocketRequester.Builder 可用于配置对来自服务器的请求的响应。
你可以使用带注解的处理器,基于服务器端所用的相同基础设施来实现客户端响应,但需以编程方式注册,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
| 1 | 如果存在 PathPatternRouteMatcher,请使用 spring-web 进行高效的路由匹配。 |
| 2 | 从一个包含 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。 |
| 3 | 注册响应器。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
| 1 | 如果存在 PathPatternRouteMatcher,请使用 spring-web 进行高效的路由匹配。 |
| 2 | 从一个包含 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。 |
| 3 | 注册响应器。 |
请注意,上述内容仅为用于以编程方式注册客户端响应器的快捷方式。在其他场景中,如果客户端响应器已在 Spring 配置中定义,您仍然可以将 RSocketMessageHandler 声明为一个 Spring Bean,然后按如下方式应用:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述情况,您可能还需要在 setHandlerPredicate 中使用 RSocketMessageHandler,
以切换到另一种用于检测客户端响应器(client responders)的策略,例如基于自定义注解
@RSocketClientResponder 而非默认的 @Controller。这在同一个应用程序中同时包含客户端与服务器,
或包含多个客户端的场景下是必要的。
另请参阅带注解的响应器,以了解更多关于该编程模型的信息。
高级
RSocketRequesterBuilder 提供了一个回调,用于暴露底层的 io.rsocket.core.RSocketConnector,以便进一步配置 keepalive 间隔、会话恢复、拦截器等选项。你可以在该层级按如下方式配置选项:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
服务器请求器
要从服务器向已连接的客户端发送请求,只需从服务器获取该已连接客户端的请求器即可。
在带注解的响应器中,@ConnectMapping 和 @MessageMapping 方法支持一个
RSocketRequester 参数。使用它可以访问该连接对应的请求方。请记住,@ConnectMapping 方法本质上是处理 SETUP 帧的处理器,
必须在开始处理请求之前完成处理。因此,在连接初始阶段发出的请求必须与处理逻辑解耦。例如:
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
| 1 | 启动请求的异步处理,与请求处理过程相互独立。 |
| 2 | 执行处理并返回完成的 Mono<Void>。 |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
| 1 | 启动请求的异步处理,与请求处理过程相互独立。 |
| 2 | 在挂起函数中执行处理。 |
请求
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
| 1 | 指定一个路由,以包含在请求消息的元数据中。 |
| 2 | 为请求消息提供数据。 |
| 3 | 声明预期的响应。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
| 1 | 指定一个路由,以包含在请求消息的元数据中。 |
| 2 | 为请求消息提供数据。 |
| 3 | 声明预期的响应。 |
交互类型由输入和输出的基数(cardinality)隐式确定。上述示例属于Request-Stream(Request-Stream),因为发送一个值并接收一个值的流。在大多数情况下,只要输入和输出的选择与 RSocket 交互类型以及响应方所期望的输入和输出类型相匹配,你无需过多考虑这一点。唯一无效的组合是“多对一”(many-to-one)。
data(Object) 方法还接受任何 Reactive Streams Publisher,包括
Flux 和 Mono,以及在 ReactiveAdapterRegistry 中注册的任何其他值生产者。对于多值 Publisher(例如 Flux),如果其产生相同类型的值,建议使用其中一个重载的 data 方法,以避免对每个元素进行类型检查和 Encoder 查找:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object) 步骤是可选的。对于不发送数据的请求,请跳过此步骤:
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用复合元数据(默认方式),并且所添加的值被已注册的Encoder所支持,则可以添加额外的元数据值。例如:
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于Fire-and-Forget场景,请使用返回send()的Mono<Void>方法。请注意,该Mono仅表示消息已成功发送,并不表示消息已被处理。
对于 Metadata-Push,请使用返回值为 sendMetadata() 的 Mono<Void> 方法。
带注解的响应器
RSocket 响应器可以实现为 @MessageMapping 和 @ConnectMapping 方法。
@MessageMapping 方法用于处理单个请求,而 @ConnectMapping 方法用于处理
连接级别的事件(建立连接和元数据推送)。带注解的响应器在服务端响应和客户端响应中均对称支持。
服务器响应器
要在服务器端使用带注解的响应器,请在您的 Spring 配置中添加 RSocketMessageHandler,以自动检测带有 @Controller 和 @MessageMapping 方法的 @ConnectMapping Bean:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过 Java RSocket API 启动一个 RSocket 服务器,并按如下方式为响应端接入 RSocketMessageHandler:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler 默认支持
复合(composite) 和
路由(routing) 元数据。如果需要切换到其他 MIME 类型或注册额外的元数据 MIME 类型,可以设置其
MetadataExtractor。
你需要设置所需的 Encoder 和 Decoder 实例,以支持相应的元数据和数据格式。
你很可能需要 spring-web 模块来获取编解码器的实现。
默认情况下,使用 SimpleRouteMatcher 通过 AntPathMatcher 进行路由匹配。
我们建议接入来自 PathPatternRouteMatcher 的 spring-web,
以实现高效的路由匹配。RSocket 路由可以是分层的,但并非 URL 路径。
这两种路由匹配器默认都配置为使用“.”作为分隔符,并且不像 HTTP URL 那样进行 URL 解码。
RSocketMessageHandler 可通过 RSocketStrategies 进行配置,如果您需要在同一进程中共享客户端和服务器之间的配置,这将非常有用:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端响应器
客户端的带注解的响应器需要在 RSocketRequester.Builder 中进行配置。详情请参见 客户端响应器。
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述 @MessageMapping 方法用于响应路由为 "locate.radars.within" 的请求-流(Request-Stream)交互。它支持灵活的方法签名,可选择使用以下方法参数:
| 方法参数 | <description> </description> |
|---|---|
|
请求的有效载荷。这可以是诸如 注意: 使用该注解是可选的。如果一个方法参数不是简单类型,也不属于其他受支持的参数类型,则默认将其视为预期的有效载荷(payload)。 |
|
用于向远程端发起请求的请求器。 |
|
根据映射模式中的变量从路由中提取的值,例如
|
|
如MetadataExtractor中所述,已注册用于提取的元数据值。 |
|
所有按照MetadataExtractor中所述注册用于提取的元数据值。 |
返回值应为一个或多个将被序列化为响应负载(payloads)的对象。这可以是异步类型,例如 Mono 或 Flux、一个具体值,或者为 void 或无值的异步类型,例如 Mono<Void>。
@MessageMapping 方法所支持的 RSocket 交互类型由输入(即 @Payload 参数)和输出的基数(cardinality)决定,其中基数的含义如下:
| 基数 | <description> </description> |
|---|---|
1 |
可以是一个显式值,也可以是单值异步类型,例如 |
许多 |
一种多值异步类型,例如 |
0 |
对于输入而言,这意味着该方法没有 对于输出,这是 |
下表展示了所有输入和输出基数的组合及其对应的交互类型:
| 输入基数 | 输出基数 | 交互类型 |
|---|---|---|
0, 1 |
0 |
即发即忘、请求-响应 |
0, 1 |
1 |
Request-Response |
0, 1 |
许多 |
Request-Stream |
许多 |
0、1、多个 |
Request-Channel |
@ConnectMapping
@ConnectMapping 用于处理 RSocket 连接开始时的 SETUP 帧,以及后续通过 METADATA_PUSH 帧发送的任何元数据推送通知,即 metadataPush(Payload) 中的 io.rsocket.RSocket。
@ConnectMapping 方法支持与 @MessageMapping 相同的参数,但这些参数基于 SETUP 和 METADATA_PUSH 帧中的元数据和数据。@ConnectMapping 可以包含一个模式(pattern),用于将处理限定到元数据中包含特定路由的连接;如果没有声明任何模式,则所有连接都会匹配。
@ConnectMapping 方法不能返回数据,其返回值必须声明为 void 或
Mono<Void>。如果在处理新连接时发生错误,则该连接将被拒绝。处理过程中不得阻塞以向该连接的 RSocketRequester 发起请求。详情请参见
服务器端 Requester。
元数据提取器
响应方必须解析元数据。 复合元数据(Composite metadata)允许使用各自独立格式的元数据值(例如用于路由、安全、追踪等),每种元数据都有自己的 MIME 类型。应用程序需要一种方式来配置所支持的元数据 MIME 类型,以及一种方式来访问已提取的值。
MetadataExtractor 是一个契约,用于接收序列化的元数据并返回解码后的名称-值对,这些名称-值对随后可通过名称像访问消息头一样进行访问,例如在带注解的处理方法中通过 @Header 进行访问。
DefaultMetadataExtractor 可以接收 Decoder 实例来解码元数据。开箱即用,它内置支持
"message/x.rsocket.routing.v0",
会将其解码为 String 并保存在 "route" 键下。对于其他任何 MIME 类型,您需要提供一个 Decoder,
并按如下方式注册该 MIME 类型:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据非常适合用于组合独立的元数据值。然而,请求方可能不支持复合元数据,或者可能选择不使用它。在这种情况下,DefaultMetadataExtractor 可能需要自定义逻辑,将解码后的值映射到输出的 Map 中。以下是一个使用 JSON 作为元数据的示例:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
通过 MetadataExtractor 配置 RSocketStrategies 时,可以让
RSocketStrategies.Builder 使用已配置的解码器创建提取器,并
只需使用回调来定制注册项,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 接口
Spring 框架允许你将 RSocket 服务定义为一个带有注解方法的 Java 接口,用于处理 RSocket 交互。然后,你可以生成一个实现该接口并执行这些交互的代理。这种方式通过封装底层 RSocketRequester 的使用,简化了 RSocket 的远程调用。
首先,声明一个包含 @RSocketExchange 方法的接口:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
第二,创建一个代理,用于执行所声明的 RSocket 交互:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
方法参数
带注解的 RSocket 交换方法支持灵活的方法签名,可使用以下方法参数:
| 方法参数 | <description> </description> |
|---|---|
|
添加一个路由变量,与 |
|
设置请求的输入负载(payload)。这可以是一个具体值,也可以是任何可通过 |
|
输入负载中元数据条目的值。该值可以是任意 |
|
元数据条目的 |