此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Framework 6.2.10! |
RS袜子
本节介绍 Spring Framework 对 RSocket 协议的支持。
概述
RSocket 是一种应用程序协议,用于通过 TCP、WebSocket 和其他字节流传输进行多路复用双工通信,使用以下交互之一 模型:
-
Request-Response
— 发送一条消息并回复一条。 -
Request-Stream
— 发送一条消息并接收一连串消息。 -
Channel
— 双向发送消息流。 -
Fire-and-Forget
— 发送单向消息。
一旦建立初始连接,“客户端”与“服务器”的区别就会丢失,因为双方变得对称,双方都可以发起上述交互之一。这就是为什么在协议中将参与方称为“请求者”和“响应者”而上述交互则称为“请求流”或简称为“请求”。
以下是 RSocket 协议的主要功能和优点:
-
跨网络边界的响应式流语义 - 用于流式请求,例如
Request-Stream
和Channel
,背压信号在请求者和响应者之间传输,允许请求者减慢响应者的速度源,从而减少对网络层拥塞控制的依赖,以及用于网络级别或任何级别的缓冲。 -
请求限制 — 此功能在
LEASE
框架可以从每一端发送,以限制另一端允许的请求总数在给定的时间内。租约会定期续订。 -
会话恢复 — 这是为失去连接而设计的,需要一些状态进行维护。状态管理对应用程序是透明的,并且效果很好结合背压,可以在可能的情况下停止生产者并减少所需的状态量。
-
大消息的碎片化和重组。
-
Keepalive(心跳)。
RSocket 有多种语言的实现。Java 库基于 Project Reactor 构建,和用于传输的 Reactor Netty。这意味着来自应用程序中反应流发布者的信号透明地传播通过网络中的 RSocket。
协议
连接
最初,客户端通过一些低级流传输(例如如 TCP 或 WebSocket)连接到服务器,并发送一个SETUP
frame 设置 连接。
服务器可能会拒绝SETUP
帧,但一般在发送后(对于客户端)和接收(对于服务器),双方都可以开始发出请求,除非SETUP
表示使用租赁语义来限制请求数量,在这种情况下双方都必须等待LEASE
框架从另一端允许提出请求。
提出请求
一旦建立连接,双方都可以通过其中一个
框架REQUEST_RESPONSE
,REQUEST_STREAM
,REQUEST_CHANNEL
或REQUEST_FNF
.每个
这些帧将一条消息从请求者传送到响应者。
然后,响应者可能会返回PAYLOAD
带有响应消息的帧,并且在
之REQUEST_CHANNEL
请求者还可以发送PAYLOAD
要求较多的框架
消息。
当请求涉及消息流时,例如Request-Stream
和Channel
,
响应方必须尊重来自请求方的请求信号。需求表示为
消息数。初始需求在REQUEST_STREAM
和REQUEST_CHANNEL
框架。后续需求通过REQUEST_N
框架。
每一方还可以通过METADATA_PUSH
框架,不
与任何个人请求有关,而是与整个连接有关。
消息格式
RSocket 消息包含数据和元数据。元数据可用于发送路由,一个
安全Tokens等。数据和元数据的格式可以不同。每个 Mime 类型
在SETUP
框架并应用于给定连接上的所有请求。
虽然所有消息都可以有元数据,但通常元数据(如路由)是按请求的
因此仅包含在请求的第一条消息中,即其中一个帧REQUEST_RESPONSE
,REQUEST_STREAM
,REQUEST_CHANNEL
或REQUEST_FNF
.
协议扩展定义了用于应用程序的通用元数据格式:
Java 实现
RSocket 的 Java 实现基于 Project Reactor 构建。TCP 和 WebSocket 的传输是
基于 Reactor Netty 构建。作为响应式流
库,Reactor 简化了实现协议的工作。对于应用,它是
使用起来很自然Flux
和Mono
带有声明性运算符和透明背面
压力支持。
RSocket Java 中的 API 是故意最小和基本的。它侧重于协议 功能,并将应用程序编程模型(例如,RPC codegen 与其他)保留为 更高层次,独立关注。
主合约 io.rsocket.RSocket 对四种请求交互类型进行建模Mono
表示
单消息,Flux
消息流,以及io.rsocket.Payload
实际的
消息,可以作为字节缓冲区访问数据和元数据。这RSocket
使用合约
对称。对于请求,应用程序会被赋予一个RSocket
执行
请求与。为了响应,应用程序实现RSocket
来处理请求。
这并不是一个彻底的介绍。在大多数情况下,Spring 应用程序 将不必直接使用其 API。但是,查看或尝试可能很重要 RSocket 独立于 Spring。RSocket Java 存储库包含许多示例应用程序,这些应用程序 演示其 API 和协议功能。
弹簧支撑
这spring-messaging
模块包含以下内容:
-
RSocketRequester — 用于发出请求的流畅 API 通过
io.rsocket.RSocket
具有数据和元数据编码/解码功能。 -
带注释的响应者 —
@MessageMapping
和@RSocketExchange
用于响应的带注释的处理程序方法。 -
RSocket 接口 — RSocket 服务声明 作为 Java 接口,具有
@RSocketExchange
方法,用作请求者或响应者。
这spring-web
模块包含Encoder
和Decoder
Jackson 等实现
RSocket 应用程序可能需要的 CBOR/JSON 和 Protobuf。它还包含PathPatternParser
可以插入以实现高效的路由匹配。
Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括
在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有客户端
支持和自动配置RSocketRequester.Builder
和RSocketStrategies
.
有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分。
Spring Security 5.2 提供 RSocket 支持。
Spring Integration 5.2 提供入站和出站网关以与 RSocket 交互 客户端和服务器。有关更多详细信息,请参阅 Spring Integration 参考手册。
Spring Cloud Gateway 支持 RSocket 连接。
RSocket请求器
RSocketRequester
提供了一个流畅的 API 来执行 RSocket 请求,接受和
返回数据和元数据的对象,而不是低级数据缓冲区。它可以使用
对称地,从客户端发出请求,从服务器发出请求。
客户端请求者
要获取RSocketRequester
在客户端是连接到一个服务器,其中涉及
发送 RSocketSETUP
带有连接设置的框架。RSocketRequester
提供一个
构建器,有助于准备io.rsocket.core.RSocketConnector
包括连接
的设置SETUP
框架。
这是连接默认设置的最基本方法:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
以上不会立即连接。发出请求时,共享连接是 透明地建立和使用。
连接设置
RSocketRequester.Builder
提供以下内容来自定义初始SETUP
框架:
-
dataMimeType(MimeType)
- 设置连接上数据的 MIME 类型。 -
metadataMimeType(MimeType)
— 设置连接上元数据的 MIME 类型。 -
setupData(Object)
— 要包含在SETUP
. -
setupRoute(String, Object…)
— 路由以包含在元数据中的SETUP
. -
setupMetadata(Object, MimeType)
— 要包含在SETUP
.
对于数据,默认 MIME 类型派生自第一个配置的Decoder
. 为 metadata,默认 MIME 类型是复合元数据,它允许每个请求有多个metadata 值和 MIME 类型对。通常两者都不需要更改。
数据和元数据中的SETUP
frame 是可选的。在服务器端,可以使用@ConnectMapping方法来处理连接的开始和SETUP
框架。 元数据可能用于连接级别安全。
策略
RSocketRequester.Builder
接受RSocketStrategies
以配置请求者。您需要使用它来提供编码器和解码器,用于数据的 (反) 序列化和metadata 值。默认情况下,只有来自spring-core
为String
,byte[]
和ByteBuffer
已注册。 添加spring-web
提供对更多可以按如下方式注册的访问:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
专为重用而设计。在某些情况下,例如,客户端和服务器在同一个应用程序中,最好在 Spring 配置中声明它。
客户端响应者
RSocketRequester.Builder
可用于配置响应者,以响应来自 服务器。
您可以使用带注释的处理程序进行客户端响应,基于相同的基础设施,但在服务器上使用,但以编程方式注册,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
1 | 用PathPatternRouteMatcher 如果spring-web 存在,以实现高效的路由匹配。 |
2 | 从类创建响应者@MessageMapping 和/或@ConnectMapping 方法。 |
3 | 注册响应者。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
1 | 用PathPatternRouteMatcher 如果spring-web 存在,以实现高效的路由匹配。 |
2 | 从类创建响应者@MessageMapping 和/或@ConnectMapping 方法。 |
3 | 注册响应者。 |
注意,以上只是为客户端编程注册而设计的快捷方式 反应。 对于替代场景,客户端响应器处于 Spring 配置中,您仍然可以声明RSocketMessageHandler
作为 Spring bean,然后按如下方式应用:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述内容,您可能还需要使用setHandlerPredicate
在RSocketMessageHandler
自 切换到不同的策略来检测客户端响应者,例如,基于自定义注释,例如@RSocketClientResponder
与默认@Controller
. 这 在客户端和服务器的场景中,或多个客户端在同一个场景中是必需的 应用。
另请参阅带注释的响应器,了解有关编程模型的更多信息。
高深
RSocketRequesterBuilder
提供回调来公开底层io.rsocket.core.RSocketConnector
有关 keepalive 的更多配置选项间隔、会话恢复、拦截器等。您可以配置选项在该级别,如下所示:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
服务器请求者
要从服务器向连接的客户端发出请求,只需从服务器获取请求者。
在带注释的响应器中,@ConnectMapping
和@MessageMapping
方法支持RSocketRequester
论点。 使用它来访问连接的请求者。保持请记住@ConnectMapping
方法本质上是SETUP
框架,其中必须在请求开始之前处理。因此,一开始的请求必须与处理分离。 例如:
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1 | 异步启动请求,独立于处理。 |
2 | 执行处理和退货完成Mono<Void> . |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
1 | 异步启动请求,独立于处理。 |
2 | 在挂起功能中执行处理。 |
请求
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期响应。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期响应。 |
交互类型是根据输入的基数隐式确定的,并且
输出。上面的例子是一个Request-Stream
因为发送了一个值和一个流
的值被接收。在大多数情况下,只要
输入和输出的选择与 RSocket 交互类型以及输入和
响应者预期的输出。无效组合的唯一示例是多对一。
这data(Object)
方法也接受任何反应流Publisher
包括Flux
和Mono
,以及在ReactiveAdapterRegistry
.对于多值Publisher
如Flux
这会产生
相同类型的值,请考虑使用重载的data
避免有
类型检查和Encoder
查找每个元素:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
这data(Object)
step 是可选的。对于不发送数据的请求,请跳过它:
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用复合元数据(默认值),并且如果
值由注册的Encoder
.例如:
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
为Fire-and-Forget
使用send()
返回Mono<Void>
.请注意,Mono
仅指示消息已成功发送,而不是已处理。
为Metadata-Push
使用sendMetadata()
方法与Mono<Void>
返回值。
带注释的响应者
RSocket 响应器可以实现为@MessageMapping
和@ConnectMapping
方法。@MessageMapping
方法处理单个请求,而@ConnectMapping
方法 处理
连接级事件(设置和元数据推送)。支持带注释的响应者
对称地,用于从服务器端响应和从客户端响应。
服务器响应器
要在服务器端使用带注释的响应器,请将RSocketMessageHandler
到你的Spring
要检测的配置@Controller
豆子与@MessageMapping
和@ConnectMapping
方法:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过 Java RSocket API 启动一个 RSocket 服务器,并插入RSocketMessageHandler
对于响应者,如下所示:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
默认支持复合和路由元数据。如果您需要切换到不同的 MIME 类型或注册其他元数据 MIME 类型,您可以设置其 MetadataExtractor。
您需要将Encoder
和Decoder
元数据和数据所需的实例格式。您可能需要spring-web
用于编解码器实现的模块。
默认情况下SimpleRouteMatcher
用于匹配路由AntPathMatcher
. 我们建议插入PathPatternRouteMatcher
从spring-web
为 高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。默认情况下,两个路由匹配器都配置为使用“.”作为分隔符,并且没有 URL与 HTTP URL 一样解码。
RSocketMessageHandler
可以通过RSocketStrategies
如果出现以下情况,这可能很有用:您需要在同一进程中在客户端和服务器之间共享配置:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端响应者
客户端的带注释的响应器需要在RSocketRequester.Builder
. 有关详细信息,请参阅客户端响应程序。
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
以上@MessageMapping
方法响应具有
路由“locate.radars.within”。它支持灵活的方法签名,并可选择
使用以下方法参数:
方法参数 | 描述 |
---|---|
|
请求的有效负载。这可以是异步类型的具体值,例如 注意:注释的使用是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,则假定为预期有效负载。 |
|
向远程端发出请求的请求者。 |
|
根据映射模式中的变量从路由中提取的值,例如 |
|
按照 MetadataExtractor 中所述注册为提取的元数据值。 |
|
注册提取的所有元数据值,如 MetadataExtractor 中所述。 |
返回值应是一个或多个要序列化为响应的对象
负载。这可以是异步类型,例如Mono
或Flux
、具体值或
也void
或无值异步类型,例如Mono<Void>
.
RSocket 交互类型,即@MessageMapping
方法支座由
输入的基数(即@Payload
参数)和输出,其中
基数表示以下内容:
基数 | 描述 |
---|---|
1 |
显式值或单值异步类型,例如 |
多 |
多值异步类型,例如 |
0 |
对于输入,这意味着该方法没有 对于输出,这是 |
下表显示了所有输入和输出基数组合以及相应的 交互类型:
输入基数 | 输出基数 | 交互类型 |
---|---|---|
0, 1 |
0 |
即发即忘,请求-响应 |
0, 1 |
1 |
请求-响应 |
0, 1 |
多 |
请求流 |
多 |
0、1、多 |
请求通道 |
@RSocketExchange
作为替代方案@MessageMapping
,您还可以使用@RSocketExchange
方法。此类方法在 RSocket 接口上声明,可以通过RSocketServiceProxyFactory
或由响应者实施。
例如,要以响应程序的身份处理请求:
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
之间存在一些差异@RSocketExhange
和@MessageMapping
由于
前者需要保持适合请求者和响应者使用。例如,虽然@MessageMapping
可以声明为处理任意数量的路由,并且每个路由都可以
成为一种模式,@RSocketExchange
必须使用单个具体路由声明。有
与元数据相关的支持方法参数也存在细微差异,有关支持参数的列表,请参阅 @MessageMapping 和 RSocket 接口。
@RSocketExchange
可在类型级别用于为所有路由指定通用前缀
对于给定的 RSocket 服务接口。
@ConnectMapping
@ConnectMapping
处理SETUP
frame,以及 RSocket 连接开始时的帧,以及
任何后续元数据推送通知通过METADATA_PUSH
框架,即metadataPush(Payload)
在io.rsocket.RSocket
.
@ConnectMapping
方法支持与 @MessageMapping 相同的参数,但基于元数据和来自SETUP
和METADATA_PUSH
框架。@ConnectMapping
可以有一个模式来缩小处理范围
元数据中具有路由的特定连接,或者未声明任何模式
则所有连接都匹配。
@ConnectMapping
方法不能返回数据,必须使用void
或Mono<Void>
作为返回值。如果处理返回新
连接,则连接被拒绝。不得将处理搁置以使
请求RSocketRequester
用于连接。有关详细信息,请参阅服务器请求器。
元数据提取器
响应者必须解释元数据。复合元数据允许独立 格式化的元数据值(例如,用于路由、安全、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置要支持的元数据 MIME 类型,以及一种方法 以访问提取的值。
MetadataExtractor
是一个获取序列化元数据并返回解码的合约
name-value 对,然后可以像标头一样按名称访问,例如通过@Header
在带注释的处理程序方法中。
DefaultMetadataExtractor
可以给Decoder
实例来解码元数据。出
它内置了对“message/x.rsocket.routing.v0”的支持,它解码到String
并保存在“路由”键下。对于您需要提供的任何其他 MIME 类型
一个Decoder
并注册 MIME 类型,如下所示:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据可以很好地组合独立的元数据值。然而,
请求者可能不支持复合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor
可能需要自定义逻辑来将解码值映射到输出
地图。以下是将 JSON 用于元数据的示例:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
配置时MetadataExtractor
通过RSocketStrategies
,你可以让RSocketStrategies.Builder
使用配置的解码器创建提取器,以及
只需使用回调即可自定义注册,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 接口
Spring Framework 允许您将 RSocket 服务定义为 Java 接口,其中包含@RSocketExchange
方法。您可以将这样的接口传递给RSocketServiceProxyFactory
创建一个代理,通过 RSocketRequester 执行请求。您还可以实现
接口作为处理请求的响应程序。
首先使用@RSocketExchange
方法:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
现在,您可以创建一个代理,在调用方法时执行请求:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您还可以实现该接口以作为响应者处理请求。 请参阅带注释的响应者。
方法参数
带注释的 RSocket 交换方法支持灵活的方法签名,具有以下功能 方法参数:
方法参数 | 描述 |
---|---|
|
添加要传递的路由变量 |
|
设置请求的输入有效负载。这可以是具体值,也可以是任何生产者
可以适应响应式流的值 |
|
输入有效负载中元数据条目的值。这可以是任何 |
|
这 |