对于最新稳定版本,请使用 Spring Framework 7.0.6spring-doc.cadn.net.cn

RSocket

本节介绍 Spring Framework 对 RSocket 协议的支持。spring-doc.cadn.net.cn

概述

RSocket 是一种应用层协议,支持通过 TCP、WebSocket 和其他字节流传输方式进行多路复用的双向通信,并采用以下交互模型之一:spring-doc.cadn.net.cn

一旦初始连接建立,“客户端”与“服务器端”的区别便不复存在,因为双方变得对称,每一方都可以发起上述交互之一。这也正是在协议调用中,参与双方被称为“请求方”(requester)和“响应方”(responder),而上述交互则被称为“请求流”(request streams)或简称为“请求”(requests)的原因。spring-doc.cadn.net.cn

这些是 RSocket 协议的主要特性和优势:spring-doc.cadn.net.cn

  • Reactive Streams 语义跨越网络边界——对于 Request-StreamChannel 等流式请求,背压(back pressure)信号在请求方与响应方之间传递,使请求方能够从源头减缓响应方的速度,从而减少对网络层拥塞控制的依赖,以及对网络层或任何层级缓冲的需求。spring-doc.cadn.net.cn

  • 请求限流——此功能名为“租约”(Leasing),名称源自LEASE帧,该帧可由任一端发送,用于在给定时间内限制另一端允许的请求总数。租约会定期续期。spring-doc.cadn.net.cn

  • 会话恢复(Session resumption)——这是为应对连接中断而设计的,需要维护一些状态。 状态管理对应用程序是透明的,并且能够很好地与背压(back pressure)机制结合使用: 当可能时,背压可以停止生产者,从而减少所需维护的状态量。spring-doc.cadn.net.cn

  • 大型消息的分片与重组。spring-doc.cadn.net.cn

  • 保持连接(心跳)。spring-doc.cadn.net.cn

RSocket 在多种语言中都有实现。其Java 库基于Project Reactor构建,并使用Reactor Netty作为传输层。这意味着应用程序中来自 Reactive Streams Publisher 的信号能够透明地通过 RSocket 在网络中传播。spring-doc.cadn.net.cn

协议

RSocket 的优势之一在于它在传输层具有明确定义的行为,并附带一份易于阅读的规范以及一些协议扩展。因此,无论使用何种语言的实现或高层框架 API,阅读该规范都是很有帮助的。本节将提供一个简明的概述,以建立必要的背景知识。spring-doc.cadn.net.cn

客户端最初通过某种底层流式传输协议(例如 TCP 或 WebSocket)连接到服务器,并向服务器发送一个 SETUP 帧以设置连接参数。spring-doc.cadn.net.cn

服务器可能会拒绝 SETUP 帧,但通常在该帧被发送(对于客户端)并被接收(对于服务器)之后,双方即可开始发送请求,除非 SETUP 帧表明使用了租赁语义来限制请求数量;在这种情况下,双方都必须等待从对方接收到一个 LEASE 帧,才能获准发送请求。spring-doc.cadn.net.cn

一旦连接建立,双方均可通过 REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF 其中一种帧发起请求。每种帧都会从请求方携带一条消息发送给响应方。spring-doc.cadn.net.cn

随后,响应方可以返回包含响应消息的 PAYLOAD 帧;在 REQUEST_CHANNEL 的情况下,请求方也可以发送包含更多请求消息的 PAYLOAD 帧。spring-doc.cadn.net.cn

当一个请求涉及消息流(例如 Request-StreamChannel)时, 响应方必须遵守请求方发出的需求信号。需求以消息数量的形式表示。 初始需求在 REQUEST_STREAMREQUEST_CHANNEL 帧中指定。 后续需求则通过 REQUEST_N 帧进行通知。spring-doc.cadn.net.cn

每一方还可以通过 METADATA_PUSH 帧发送元数据通知,这些通知不针对任何特定请求,而是与整个连接相关。spring-doc.cadn.net.cn

RSocket 消息包含数据和元数据。元数据可用于发送路由、安全Tokens等信息。数据和元数据可以采用不同的格式。各自的 MIME 类型在 SETUP 帧中声明,并适用于给定连接上的所有请求。spring-doc.cadn.net.cn

虽然所有消息都可以包含元数据,但诸如路由(route)之类的元数据通常是按请求(per-request)的, 因此仅包含在请求的第一个消息中,即使用以下帧类型之一: REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNFspring-doc.cadn.net.cn

协议扩展定义了应用程序中使用的通用元数据格式:spring-doc.cadn.net.cn

Java 实现

RSocket 的 Java 实现 基于 Project Reactor 构建。TCP 和 WebSocket 传输层则 基于 Reactor Netty 构建。作为 Reactive Streams 库,Reactor 简化了协议的实现工作。对于应用程序而言,使用 FluxMono 配合声明式操作符以及透明的背压支持是一种自然而然的选择。spring-doc.cadn.net.cn

RSocket Java 中的 API 故意设计得简洁而基础。它专注于协议特性,而将应用程序编程模型(例如 RPC 代码生成或其他方式)作为更高层次、独立的关注点。spring-doc.cadn.net.cn

主要契约 io.rsocket.RSocket 通过 Mono 表示单条消息的承诺,Flux 表示消息流,以及 io.rsocket.Payload 表示实际消息(可访问数据和元数据的字节缓冲区),对四种请求交互类型进行了建模。RSocket 契约以对称方式使用。在发起请求时,应用程序会获得一个 RSocket 实例用于执行请求;在响应请求时,应用程序则需实现 RSocket 接口以处理请求。spring-doc.cadn.net.cn

这并不是一个详尽的入门介绍。在大多数情况下,Spring 应用程序无需直接使用其 API。然而,脱离 Spring 环境单独查看或试验 RSocket 可能是很有必要的。RSocket Java 代码仓库包含多个示例应用,用于演示其 API 和协议特性。spring-doc.cadn.net.cn

Spring 支持

spring-messaging 模块包含以下内容:spring-doc.cadn.net.cn

spring-web 模块包含 EncoderDecoder 的实现,例如 Jackson CBOR/JSON 和 Protobuf,这些通常是 RSocket 应用程序所需要的。该模块还包含 PathPatternParser,可用于高效地进行路由匹配。spring-doc.cadn.net.cn

Spring Boot 2.2 支持通过 TCP 或 WebSocket 启动一个 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 暴露 RSocket 的选项。此外,还提供了对 RSocketRequester.BuilderRSocketStrategies 的客户端支持和自动配置。 更多详细信息,请参阅 Spring Boot 参考文档中的 RSocket 章节spring-doc.cadn.net.cn

Spring Security 5.2 提供了对 RSocket 的支持。spring-doc.cadn.net.cn

Spring Integration 5.2 提供了入站和出站网关,用于与 RSocket 客户端和服务器进行交互。更多详细信息,请参阅《Spring Integration 参考手册》。spring-doc.cadn.net.cn

Spring Cloud Gateway 支持 RSocket 连接。spring-doc.cadn.net.cn

RSocket 请求器

RSocketRequester 提供了一个流畅的 API 来执行 RSocket 请求,它接受和返回用于数据与元数据的对象,而不是底层的数据缓冲区。它可以对称地使用,既可用于客户端发起请求,也可用于服务器端发起请求。spring-doc.cadn.net.cn

客户端请求器

在客户端获取 RSocketRequester 需要连接到服务器,该过程涉及发送一个包含连接设置的 RSocket SETUP 帧。RSocketRequester 提供了一个构建器,用于准备 io.rsocket.core.RSocketConnector,其中包括 SETUP 帧所需的连接设置。spring-doc.cadn.net.cn

这是使用默认设置进行连接的最基本方式:spring-doc.cadn.net.cn

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

上述操作不会立即建立连接。当发起请求时,会透明地建立并使用一个共享连接。spring-doc.cadn.net.cn

连接设置

RSocketRequester.Builder 提供了以下方法来自定义初始的 SETUP 帧:spring-doc.cadn.net.cn

对于数据,默认的 MIME 类型由第一个配置的 Decoder 决定。对于元数据,默认的 MIME 类型是 复合元数据(composite metadata),它允许每个请求包含多个元数据值和 MIME 类型对。通常情况下,这两者都不需要更改。spring-doc.cadn.net.cn

SETUP 帧中的数据和元数据是可选的。在服务器端,可以使用 @ConnectMapping 方法来处理连接的建立以及 SETUP 帧的内容。元数据可用于连接级别的安全控制。spring-doc.cadn.net.cn

策略

RSocketRequester.Builder 接收 RSocketStrategies 以配置请求器。 您需要使用它来提供用于数据和元数据值(反)序列化的编码器和解码器。 默认情况下,仅注册了来自 spring-core 的基本编解码器,用于处理 Stringbyte[]ByteBuffer。添加 spring-web 依赖后,即可使用更多编解码器, 可按如下方式注册:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies 被设计为可重用的。在某些场景下,例如客户端和服务器位于同一个应用程序中,将其声明在 Spring 配置中可能是更优的选择。spring-doc.cadn.net.cn

客户端响应器

RSocketRequester.Builder 可用于配置对来自服务器的请求的响应。spring-doc.cadn.net.cn

你可以使用带注解的处理器,基于服务器端所用的相同基础设施来实现客户端响应,但需以编程方式注册,如下所示:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 如果存在 PathPatternRouteMatcher,请使用 spring-web 进行高效的路由匹配。
2 从一个包含 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。
3 注册响应器。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 如果存在 PathPatternRouteMatcher,请使用 spring-web 进行高效的路由匹配。
2 从一个包含 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。
3 注册响应器。

请注意,上述内容仅为用于以编程方式注册客户端响应器的快捷方式。在其他场景中,如果客户端响应器已在 Spring 配置中定义,您仍然可以将 RSocketMessageHandler 声明为一个 Spring Bean,然后按如下方式应用:spring-doc.cadn.net.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述情况,您可能还需要在 setHandlerPredicate 中使用 RSocketMessageHandler, 以切换到另一种用于检测客户端响应器(client responders)的策略,例如基于自定义注解 @RSocketClientResponder 而非默认的 @Controller。这在同一个应用程序中同时包含客户端与服务器, 或包含多个客户端的场景下是必要的。spring-doc.cadn.net.cn

另请参阅带注解的响应器,以了解更多关于该编程模型的信息。spring-doc.cadn.net.cn

高级

RSocketRequesterBuilder 提供了一个回调,用于暴露底层的 io.rsocket.core.RSocketConnector,以便进一步配置 keepalive 间隔、会话恢复、拦截器等选项。你可以在该层级按如下方式配置选项:spring-doc.cadn.net.cn

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求器

要从服务器向已连接的客户端发送请求,只需从服务器获取该已连接客户端的请求器即可。spring-doc.cadn.net.cn

带注解的响应器中,@ConnectMapping@MessageMapping 方法支持一个 RSocketRequester 参数。使用它可以访问该连接对应的请求方。请记住,@ConnectMapping 方法本质上是处理 SETUP 帧的处理器, 必须在开始处理请求之前完成处理。因此,在连接初始阶段发出的请求必须与处理逻辑解耦。例如:spring-doc.cadn.net.cn

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 启动请求的异步处理,与请求处理过程相互独立。
2 执行处理并返回完成的 Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 启动请求的异步处理,与请求处理过程相互独立。
2 在挂起函数中执行处理。

请求

一旦你有了一个客户端服务器端请求器,就可以按如下方式发起请求:spring-doc.cadn.net.cn

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定一个路由,以包含在请求消息的元数据中。
2 为请求消息提供数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定一个路由,以包含在请求消息的元数据中。
2 为请求消息提供数据。
3 声明预期的响应。

交互类型由输入和输出的基数(cardinality)隐式确定。上述示例属于Request-Stream(Request-Stream),因为发送一个值并接收一个值的流。在大多数情况下,只要输入和输出的选择与 RSocket 交互类型以及响应方所期望的输入和输出类型相匹配,你无需过多考虑这一点。唯一无效的组合是“多对一”(many-to-one)。spring-doc.cadn.net.cn

data(Object) 方法还接受任何 Reactive Streams Publisher,包括 FluxMono,以及在 ReactiveAdapterRegistry 中注册的任何其他值生产者。对于多值 Publisher(例如 Flux),如果其产生相同类型的值,建议使用其中一个重载的 data 方法,以避免对每个元素进行类型检查和 Encoder 查找:spring-doc.cadn.net.cn

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 步骤是可选的。对于不发送数据的请求,请跳过此步骤:spring-doc.cadn.net.cn

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用复合元数据(默认方式),并且所添加的值被已注册的Encoder所支持,则可以添加额外的元数据值。例如:spring-doc.cadn.net.cn

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于Fire-and-Forget场景,请使用返回send()Mono<Void>方法。请注意,该Mono仅表示消息已成功发送,并不表示消息已被处理。spring-doc.cadn.net.cn

对于 Metadata-Push,请使用返回值为 sendMetadata()Mono<Void> 方法。spring-doc.cadn.net.cn

带注解的响应器

RSocket 响应器可以实现为 @MessageMapping@ConnectMapping 方法。 @MessageMapping 方法用于处理单个请求,而 @ConnectMapping 方法用于处理 连接级别的事件(建立连接和元数据推送)。带注解的响应器在服务端响应和客户端响应中均对称支持。spring-doc.cadn.net.cn

服务器响应器

要在服务器端使用带注解的响应器,请在您的 Spring 配置中添加 RSocketMessageHandler,以自动检测带有 @Controller@MessageMapping 方法的 @ConnectMapping Bean:spring-doc.cadn.net.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过 Java RSocket API 启动一个 RSocket 服务器,并按如下方式为响应端接入 RSocketMessageHandlerspring-doc.cadn.net.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler 默认支持 复合(composite)路由(routing) 元数据。如果需要切换到其他 MIME 类型或注册额外的元数据 MIME 类型,可以设置其 MetadataExtractorspring-doc.cadn.net.cn

你需要设置所需的 EncoderDecoder 实例,以支持相应的元数据和数据格式。 你很可能需要 spring-web 模块来获取编解码器的实现。spring-doc.cadn.net.cn

默认情况下,使用 SimpleRouteMatcher 通过 AntPathMatcher 进行路由匹配。 我们建议接入来自 PathPatternRouteMatcherspring-web, 以实现高效的路由匹配。RSocket 路由可以是分层的,但并非 URL 路径。 这两种路由匹配器默认都配置为使用“.”作为分隔符,并且不像 HTTP URL 那样进行 URL 解码。spring-doc.cadn.net.cn

RSocketMessageHandler 可通过 RSocketStrategies 进行配置,如果您需要在同一进程中共享客户端和服务器之间的配置,这将非常有用:spring-doc.cadn.net.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应器

客户端的带注解的响应器需要在 RSocketRequester.Builder 中进行配置。详情请参见 客户端响应器spring-doc.cadn.net.cn

@MessageMapping

一旦配置好服务器客户端响应器, 就可以按如下方式使用@MessageMapping方法:spring-doc.cadn.net.cn

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上述 @MessageMapping 方法用于响应路由为 "locate.radars.within" 的请求-流(Request-Stream)交互。它支持灵活的方法签名,可选择使用以下方法参数:spring-doc.cadn.net.cn

方法参数 <description> </description>

@Payloadspring-doc.cadn.net.cn

请求的有效载荷。这可以是诸如 MonoFlux 等异步类型的具象值。spring-doc.cadn.net.cn

注意: 使用该注解是可选的。如果一个方法参数不是简单类型,也不属于其他受支持的参数类型,则默认将其视为预期的有效载荷(payload)。spring-doc.cadn.net.cn

RSocketRequesterspring-doc.cadn.net.cn

用于向远程端发起请求的请求器。spring-doc.cadn.net.cn

@DestinationVariablespring-doc.cadn.net.cn

根据映射模式中的变量从路由中提取的值,例如 @MessageMapping("find.radar.{id}")spring-doc.cadn.net.cn

@Headerspring-doc.cadn.net.cn

MetadataExtractor中所述,已注册用于提取的元数据值。spring-doc.cadn.net.cn

@Headers Map<String, Object>spring-doc.cadn.net.cn

所有按照MetadataExtractor中所述注册用于提取的元数据值。spring-doc.cadn.net.cn

返回值应为一个或多个将被序列化为响应负载(payloads)的对象。这可以是异步类型,例如 MonoFlux、一个具体值,或者为 void 或无值的异步类型,例如 Mono<Void>spring-doc.cadn.net.cn

@MessageMapping 方法所支持的 RSocket 交互类型由输入(即 @Payload 参数)和输出的基数(cardinality)决定,其中基数的含义如下:spring-doc.cadn.net.cn

基数 <description> </description>

1spring-doc.cadn.net.cn

可以是一个显式值,也可以是单值异步类型,例如 Mono<T>spring-doc.cadn.net.cn

许多spring-doc.cadn.net.cn

一种多值异步类型,例如 Flux<T>spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

对于输入而言,这意味着该方法没有 @Payload 参数。spring-doc.cadn.net.cn

对于输出,这是 void 或者无值的异步类型,例如 Mono<Void>spring-doc.cadn.net.cn

下表展示了所有输入和输出基数的组合及其对应的交互类型:spring-doc.cadn.net.cn

输入基数 输出基数 交互类型

0, 1spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

即发即忘、请求-响应spring-doc.cadn.net.cn

0, 1spring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

Request-Responsespring-doc.cadn.net.cn

0, 1spring-doc.cadn.net.cn

许多spring-doc.cadn.net.cn

Request-Streamspring-doc.cadn.net.cn

许多spring-doc.cadn.net.cn

0、1、多个spring-doc.cadn.net.cn

Request-Channelspring-doc.cadn.net.cn

@ConnectMapping

@ConnectMapping 用于处理 RSocket 连接开始时的 SETUP 帧,以及后续通过 METADATA_PUSH 帧发送的任何元数据推送通知,即 metadataPush(Payload) 中的 io.rsocket.RSocketspring-doc.cadn.net.cn

@ConnectMapping 方法支持与 @MessageMapping 相同的参数,但这些参数基于 SETUPMETADATA_PUSH 帧中的元数据和数据。@ConnectMapping 可以包含一个模式(pattern),用于将处理限定到元数据中包含特定路由的连接;如果没有声明任何模式,则所有连接都会匹配。spring-doc.cadn.net.cn

@ConnectMapping 方法不能返回数据,其返回值必须声明为 voidMono<Void>。如果在处理新连接时发生错误,则该连接将被拒绝。处理过程中不得阻塞以向该连接的 RSocketRequester 发起请求。详情请参见 服务器端 Requesterspring-doc.cadn.net.cn

元数据提取器

响应方必须解析元数据。 复合元数据(Composite metadata)允许使用各自独立格式的元数据值(例如用于路由、安全、追踪等),每种元数据都有自己的 MIME 类型。应用程序需要一种方式来配置所支持的元数据 MIME 类型,以及一种方式来访问已提取的值。spring-doc.cadn.net.cn

MetadataExtractor 是一个契约,用于接收序列化的元数据并返回解码后的名称-值对,这些名称-值对随后可通过名称像访问消息头一样进行访问,例如在带注解的处理方法中通过 @Header 进行访问。spring-doc.cadn.net.cn

DefaultMetadataExtractor 可以接收 Decoder 实例来解码元数据。开箱即用,它内置支持 "message/x.rsocket.routing.v0", 会将其解码为 String 并保存在 "route" 键下。对于其他任何 MIME 类型,您需要提供一个 Decoder, 并按如下方式注册该 MIME 类型:spring-doc.cadn.net.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

复合元数据非常适合用于组合独立的元数据值。然而,请求方可能不支持复合元数据,或者可能选择不使用它。在这种情况下,DefaultMetadataExtractor 可能需要自定义逻辑,将解码后的值映射到输出的 Map 中。以下是一个使用 JSON 作为元数据的示例:spring-doc.cadn.net.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

通过 MetadataExtractor 配置 RSocketStrategies 时,可以让 RSocketStrategies.Builder 使用已配置的解码器创建提取器,并 只需使用回调来定制注册项,如下所示:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring 框架允许你将 RSocket 服务定义为一个带有注解方法的 Java 接口,用于处理 RSocket 交互。然后,你可以生成一个实现该接口并执行这些交互的代理。这种方式通过封装底层 RSocketRequester 的使用,简化了 RSocket 的远程调用。spring-doc.cadn.net.cn

首先,声明一个包含 @RSocketExchange 方法的接口:spring-doc.cadn.net.cn

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

第二,创建一个代理,用于执行所声明的 RSocket 交互:spring-doc.cadn.net.cn

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

方法参数

带注解的 RSocket 交换方法支持灵活的方法签名,可使用以下方法参数:spring-doc.cadn.net.cn

方法参数 <description> </description>

@DestinationVariablespring-doc.cadn.net.cn

添加一个路由变量,与 RSocketRequester 注解中指定的路由一起传递给 @RSocketExchange,以展开路由中的模板占位符。 该变量可以是 String 类型或任意 Object 类型,后者将通过 toString() 方法进行格式化。spring-doc.cadn.net.cn

@Payloadspring-doc.cadn.net.cn

设置请求的输入负载(payload)。这可以是一个具体值,也可以是任何可通过 Publisher 适配为 Reactive Streams ReactiveAdapterRegistry 的值生成器。spring-doc.cadn.net.cn

Object,如果后面跟有 MimeTypespring-doc.cadn.net.cn

输入负载中元数据条目的值。该值可以是任意 Object,只要下一个参数是该元数据条目的 MimeType 即可。该值可以是一个具体值,也可以是任何能生成单个值的生产者,只要它能通过 Publisher 适配为响应式流(Reactive Streams)的 ReactiveAdapterRegistryspring-doc.cadn.net.cn

MimeTypespring-doc.cadn.net.cn

元数据条目的 MimeType。前一个方法参数应为元数据值。spring-doc.cadn.net.cn

返回值

带注解的 RSocket 交换方法支持返回具体值,或任何可通过 Publisher 适配为 Reactive Streams ReactiveAdapterRegistry 的值生产者。spring-doc.cadn.net.cn