对于最新的稳定版本,请使用 Spring Framework 7.0.6!spring-doc.cadn.net.cn

RSocket

本节描述了Spring框架对RSocket协议的支持。spring-doc.cadn.net.cn

概述

RSocket 是一个应用程序协议,用于通过 TCP、WebSocket 和其他字节流传输进行多路复用和双向通信,使用以下交互模型之一:spring-doc.cadn.net.cn

一旦初始连接建立,客户端与服务器的区别就会消失,因为双方变得对称,每一方都可以发起上述交互之一。这就是为什么在协议调用中,参与的双方被称为“请求者”和“响应者”,而上述交互被称为“请求流”或简称为“请求”。spring-doc.cadn.net.cn

以下是RSocket协议的关键特性和优势:spring-doc.cadn.net.cn

  • 响应式流语义跨越网络边界——对于诸如Request-StreamChannel的流请求,背压信号在请求者和响应者之间传递,允许请求者在源头减慢响应者的速度,从而减少对网络层拥塞控制的依赖,以及在网络层或任何层级缓冲的需求。spring-doc.cadn.net.cn

  • 请求节流——此功能以“租赁”命名,源自LEASE帧,该帧可以从每一端发送以限制另一端在给定时间内允许的请求数量。租约会定期续订。spring-doc.cadn.net.cn

  • 会话恢复——这是为了解决连接中断而设计的,需要维护一些状态。状态管理对应用程序是透明的,并且与背压配合得很好,可以在可能的情况下停止生产者并减少所需的状态量。spring-doc.cadn.net.cn

  • 大消息的分段和重新组装。spring-doc.cadn.net.cn

  • 保活(心跳)。spring-doc.cadn.net.cn

RSocket 有多种语言的实现Java 库基于Project Reactor构建,并使用Reactor Netty进行传输。这意味着来自应用程序中的 Reactive Streams Publishers 的信号会透明地通过 RSocket 跨网络传播。spring-doc.cadn.net.cn

协议

RSocket 的一个优点是它在传输层有明确定义的行为,并且有一个易于阅读的规范以及一些协议扩展。因此,独立于语言实现和更高层次的框架API,阅读规范是一个好主意。本节提供了一个简洁的概述,以建立一些上下文。spring-doc.cadn.net.cn

最初,客户端通过某种低级别的流传输(如TCP或WebSocket)连接到服务器,并发送一个SETUP帧到服务器以设置连接参数。spring-doc.cadn.net.cn

服务器可能会拒绝SETUP帧,但通常在发送(对于客户端)和接收(对于服务器)之后,双方都可以开始发出请求,除非SETUP表示使用租赁语义来限制请求数量,在这种情况下,双方都必须等待来自另一端的LEASE帧才能允许发出请求。spring-doc.cadn.net.cn

一旦建立连接,双方都可以通过帧REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF之一发起请求。每个帧都携带一条从请求方到响应方的消息。spring-doc.cadn.net.cn

响应者可以返回 PAYLOAD 帧带有响应消息,并且在 REQUEST_CHANNEL 的情况下,请求者也可以发送 PAYLOAD 帧带有更多请求消息。spring-doc.cadn.net.cn

当一个请求涉及一系列消息(如Request-StreamChannel),响应者必须尊重请求者的需求数量信号。需求表示为消息的数量。初始需求在REQUEST_STREAMREQUEST_CHANNEL帧中指定。后续需求通过REQUEST_N帧发出。spring-doc.cadn.net.cn

每一方也可以通过 METADATA_PUSH 帧发送与任何单个请求无关的元数据通知,而是与整个连接相关。spring-doc.cadn.net.cn

RSocket 消息包含数据和元数据。元数据可以用于发送路由、安全Tokens等。数据和元数据可以以不同的格式进行格式化。每种的 MIME 类型在 SETUP 帧中声明,并适用于给定连接上的所有请求。spring-doc.cadn.net.cn

虽然所有消息都可以包含元数据,但通常像路由这样的元数据是按请求的,因此只包含在请求的第一个消息中,即带有帧之一REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF的消息。spring-doc.cadn.net.cn

协议扩展定义了应用程序中常用的元数据格式:spring-doc.cadn.net.cn

Java 实现

The Java实现 for RSocket is built on Project Reactor. The transports for TCP and WebSocket are built on Reactor Netty. As a Reactive Streams library, Reactor simplifies the job of implementing the protocol. For applications it is a natural fit to use Flux and Mono with declarative operators and transparent back pressure support。spring-doc.cadn.net.cn

RSocket Java 中的 API 是故意保持最小和基础的。它专注于协议功能,并将应用程序编程模型(例如 RPC 代码生成与其他方式)作为更高层次、独立的关注点。spring-doc.cadn.net.cn

核心契约 io.rsocket.RSocket 定义了四种请求交互类型,其中Mono表示单个消息的承诺,Flux表示消息流,io.rsocket.Payload表示实际的消息,并且可以访问数据和元数据作为字节缓冲区。RSocket契约对称使用。对于请求,应用程序被提供了一个RSocket来执行请求。对于响应,应用程序实现RSocket来处理请求。spring-doc.cadn.net.cn

这不是一个彻底的介绍。在大多数情况下,Spring应用程序将不需要直接使用其API。然而,独立于Spring查看或实验RSocket可能很重要。RSocket Java仓库包含了许多示例应用,演示了其API和协议特性。spring-doc.cadn.net.cn

Spring 支持

The spring-messaging 模块包含以下内容:spring-doc.cadn.net.cn

The spring-web 模块包含 EncoderDecoder 实现,如 Jackson CBOR/JSON 和 Protobuf,这些实现是 RSocket 应用程序可能需要的。它还包含可以用于高效路由匹配的 PathPatternParserspring-doc.cadn.net.cn

Spring Boot 2.2 支持通过 TCP 或 WebSocket 构建 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 暴露 RSocket 的选项。还有客户端支持和对 RSocketRequester.BuilderRSocketStrategies 的自动配置。 有关详细信息,请参阅 RSocket 部分 在 Spring Boot 参考文档中。spring-doc.cadn.net.cn

Spring Security 5.2 提供了 RSocket 支持。spring-doc.cadn.net.cn

Spring Integration 5.2 提供了与 RSocket 客户端和服务器交互的入站和出站网关。有关更多详细信息,请参阅 Spring Integration 参考手册。spring-doc.cadn.net.cn

Spring Cloud Gateway 支持 RSocket 连接。spring-doc.cadn.net.cn

RSocket请求器

RSocketRequester 提供了一个流畅的API来执行RSocket请求,接受和返回数据和元数据的对象而不是低级别的数据缓冲区。它可以对称使用,用于从客户端发出请求和从服务器发出请求。spring-doc.cadn.net.cn

客户端请求者

在客户端获取 RSocketRequester 是连接到服务器,这涉及发送一个带有连接设置的 RSocket SETUP 帧。RSocketRequester 提供了一个帮助准备 io.rsocket.core.RSocketConnector 的构建器,包括用于 SETUP 帧的连接设置。spring-doc.cadn.net.cn

这是使用默认设置连接的最基本方式:spring-doc.cadn.net.cn

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

上述不会立即连接。当发出请求时,会透明地建立并使用共享连接。spring-doc.cadn.net.cn

连接设置

RSocketRequester.Builder 提供以下选项来定制初始的 SETUP 窗口:spring-doc.cadn.net.cn

对于数据,默认的MIME类型是从第一个配置的Decoder派生的。对于元数据,默认的MIME类型是复合元数据,它允许每个请求中包含多个元数据值和MIME类型对。通常这两种都不需要更改。spring-doc.cadn.net.cn

数据和元数据在SETUP帧中是可选的。在服务器端,可以使用@ConnectMapping方法来处理连接的开始和SETUP帧的内容。元数据可用于连接级别的安全性。spring-doc.cadn.net.cn

策略

RSocketRequester.Builder 接受 RSocketStrategies 以配置请求者。 您将需要使用此功能来提供编码器和解码器,以便对数据和元数据值进行(反)序列化。 默认情况下,仅注册了来自 spring-core 的基本编解码器,用于 Stringbyte[]ByteBuffer。 添加 spring-web 可以提供更多可以按如下方式注册的编解码器:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies 是为重用而设计的。在某些场景中,例如客户端和服务器在同一应用程序中,可能更倾向于在 Spring 配置中声明它。spring-doc.cadn.net.cn

客户端响应器

RSocketRequester.Builder 可以用于配置响应服务器请求的响应器。spring-doc.cadn.net.cn

您可以使用注解处理器来基于与服务器端相同的基础设施进行客户端响应,但需要通过以下方式编程注册:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 使用 PathPatternRouteMatcher,如果 spring-web 存在,则用于高效的路由匹配。
2 从具有@MessageMapping和/或@ConnectMapping个方法的类创建响应器。
3 注册响应者。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 使用 PathPatternRouteMatcher,如果 spring-web 存在,则用于高效的路由匹配。
2 从具有@MessageMapping和/或@ConnectMapping个方法的类创建响应器。
3 注册响应者。

请注意,上述内容只是一个为客户端响应器进行程序化注册的快捷方式。对于其他场景,如果客户端响应器在Spring配置中,则仍然可以将RSocketMessageHandler声明为一个Spring bean,然后按照以下方式应用:spring-doc.cadn.net.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述情况,您可能还需要在 RSocketMessageHandler 中使用 setHandlerPredicate 来切换到不同的客户端响应者检测策略,例如基于自定义注解 @RSocketClientResponder 而不是默认的 @Controller。这在客户端和服务器或同一应用程序中的多个客户端场景中是必要的。spring-doc.cadn.net.cn

另见 注解响应器,了解更多编程模型。spring-doc.cadn.net.cn

高级

RSocketRequesterBuilder 提供了一个回调来暴露底层的 io.rsocket.core.RSocketConnector 以提供更多配置选项,例如保活间隔、会话恢复、拦截器等。您可以按以下方式在该级别配置选项:spring-doc.cadn.net.cn

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求器

要从服务器向已连接的客户端发出请求,需要从服务器中获取该已连接客户端的请求者。spring-doc.cadn.net.cn

注解响应器 中,@ConnectMapping@MessageMapping 方法支持一个 RSocketRequester 参数。使用它来访问连接的请求者。请记住,@ConnectMapping 方法本质上是 SETUP 帧的处理器,必须在请求开始之前处理。因此,最开始的请求必须与处理分离。例如:spring-doc.cadn.net.cn

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 异步启动请求,独立于处理。
2 执行处理并返回完成状态 Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 异步启动请求,独立于处理。
2 在挂起函数中执行处理。

请求

一旦你有了一个客户端服务器请求者,你可以如下发出请求:spring-doc.cadn.net.cn

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要在请求消息的元数据中包含的路由。
2 为请求消息提供数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要在请求消息的元数据中包含的路由。
2 为请求消息提供数据。
3 声明预期的响应。

交互类型是根据输入和输出的基数隐式确定的。上面的例子是一个Request-Stream,因为发送了一个值并接收了一个值流。在大多数情况下,只要输入和输出的选择与RSocket交互类型以及响应者期望的输入和输出类型匹配,你就不需要考虑这一点。唯一一个无效组合的例子是多对一。spring-doc.cadn.net.cn

The data(Object) 方法还接受任何 Reactive Streams Publisher,包括 FluxMono,以及在 ReactiveAdapterRegistry 中注册的任何其他值生成器。对于多值 Publisher(如 Flux),如果它生成相同类型的值,请考虑使用重载的 data 方法以避免对每个元素进行类型检查和 Encoder 查找:spring-doc.cadn.net.cn

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 步是可选的。对于不发送数据的请求,可以跳过此步骤:spring-doc.cadn.net.cn

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用复合元数据(默认情况下)并且这些值被注册的Encoder支持,可以添加额外的元数据值。例如:spring-doc.cadn.net.cn

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于 Fire-and-Forget 使用返回 Mono<Void>send() 方法。请注意,Mono 只表示消息已成功发送,并不表示它已被处理。spring-doc.cadn.net.cn

对于 Metadata-Push 请使用 sendMetadata() 方法并返回 Mono<Void>spring-doc.cadn.net.cn

带注解的响应器

RSocket响应器可以实现为@MessageMapping@ConnectMapping方法。 @MessageMapping方法处理单个请求,而@ConnectMapping方法处理连接级别的事件(设置和元数据推送)。注解的响应器对称地支持,用于从服务器端响应和从客户端响应。spring-doc.cadn.net.cn

服务器响应器

要在服务器端使用注解响应器,请在您的Spring配置中添加RSocketMessageHandler以检测带有@Controller@MessageMapping方法的@ConnectMapping bean:spring-doc.cadn.net.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过Java RSocket API启动一个RSocket服务器,并将RSocketMessageHandler作为响应器插入,如下所示:spring-doc.cadn.net.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler 支持 复合路由 元数据,默认情况下。你可以设置它的 MetadataExtractor 如果你需要切换到不同的 MIME 类型或注册额外的元数据 MIME 类型。spring-doc.cadn.net.cn

您需要设置支持元数据和数据格式所需的EncoderDecoder实例。您可能需要spring-web模块来实现编解码器。spring-doc.cadn.net.cn

默认情况下使用 SimpleRouteMatcher 进行路由匹配。我们建议插入来自 spring-webPathPatternRouteMatcher 以实现高效的路由匹配。RSocket 路由可以是层次结构的,但不是 URL 路径。两个路由匹配器都默认配置为使用 "." 作为分隔符,并且没有像 HTTP URL 那样的 URL 解码。spring-doc.cadn.net.cn

RSocketMessageHandler 可以通过 RSocketStrategies 进行配置,这在需要在同一进程中共享客户端和服务器配置时可能很有用:spring-doc.cadn.net.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应器

客户端的注解响应器需要在RSocketRequester.Builder中进行配置。有关详细信息,请参阅客户端响应器spring-doc.cadn.net.cn

@MessageMapping

一旦服务器客户端响应配置就绪, @MessageMapping方法可以如下使用:spring-doc.cadn.net.cn

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上述 @MessageMapping 方法响应具有路由 "locate.radars.within" 的 Request-Stream 交互。它支持灵活的方法签名,并且可以选择使用以下方法参数:spring-doc.cadn.net.cn

方法参数 描述

@Payloadspring-doc.cadn.net.cn

请求的有效载荷。这可以是异步类型的具体值,如 MonoFluxspring-doc.cadn.net.cn

注意: 使用该注解是可选的。如果方法参数不是简单类型 并且不是其他支持的参数之一,则假定为预期的有效载荷。spring-doc.cadn.net.cn

RSocketRequesterspring-doc.cadn.net.cn

请求方用于向远程端发起请求。spring-doc.cadn.net.cn

@DestinationVariablespring-doc.cadn.net.cn

从路由中根据映射模式中的变量提取的值,例如: @MessageMapping("find.radar.{id}")spring-doc.cadn.net.cn

@Headerspring-doc.cadn.net.cn

Metadata value registered for extraction as described in MetadataExtractor.spring-doc.cadn.net.cn

@Headers Map<String, Object>spring-doc.cadn.net.cn

所有已注册用于提取的元数据值,如MetadataExtractor中所述。spring-doc.cadn.net.cn

返回值预期是一个或多个将被序列化为响应负载的对象。这可以是异步类型,如MonoFlux,一个具体值,或者void或一个无值的异步类型,如Mono<Void>spring-doc.cadn.net.cn

一个@MessageMapping方法支持的RSocket交互类型由输入(即@Payload个参数)和输出的基数决定,其中基数的含义如下:spring-doc.cadn.net.cn

基数 描述

1spring-doc.cadn.net.cn

要么是显式的值,要么是单值异步类型,例如Mono<T>spring-doc.cadn.net.cn

许多spring-doc.cadn.net.cn

一个具有多个值的异步类型,例如Flux<T>spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

这对于输入意味着该方法没有@Payload参数。spring-doc.cadn.net.cn

对于输出,这是 void 或一个无值的异步类型,例如 Mono<Void>spring-doc.cadn.net.cn

下表列出了所有输入和输出基数组合及其对应的交互类型:spring-doc.cadn.net.cn

输入基数 输出基数 交互类型

0, 1spring-doc.cadn.net.cn

0spring-doc.cadn.net.cn

发而忘,请求-响应spring-doc.cadn.net.cn

0, 1spring-doc.cadn.net.cn

1spring-doc.cadn.net.cn

Request-Responsespring-doc.cadn.net.cn

0, 1spring-doc.cadn.net.cn

许多spring-doc.cadn.net.cn

Request-Streamspring-doc.cadn.net.cn

许多spring-doc.cadn.net.cn

0,1,许多spring-doc.cadn.net.cn

Request-Channelspring-doc.cadn.net.cn

@ConnectMapping

@ConnectMapping 处理 RSocket 连接开始时的 SETUP 帧,以及 任何后续通过 METADATA_PUSH 帧推送的元数据通知,即 metadataPush(Payload)io.rsocket.RSocket 中。spring-doc.cadn.net.cn

@ConnectMapping 方法支持与 @MessageMapping 相同的参数,但基于 SETUPMETADATA_PUSH 帧中的元数据和数据。@ConnectMapping 可以有一个模式来缩小处理范围到 具有元数据中路由的特定连接,或者如果没有声明任何模式 则所有连接都匹配。spring-doc.cadn.net.cn

@ConnectMapping 方法不能返回数据,并且必须声明为以 voidMono<Void> 作为返回值。如果处理新连接时出现错误,则会拒绝该连接。处理过程中不应因为向 RSocketRequester 发送请求而被延迟。详情请参见 服务器请求者spring-doc.cadn.net.cn

元数据提取器

Responders 必须解释元数据。 复合元数据 允许独立格式化的元数据值(例如路由、安全、追踪)每个都有自己的 mime 类型。应用程序需要一种方式来配置支持的元数据 mime 类型,并且需要一种方式来访问提取的值。spring-doc.cadn.net.cn

MetadataExtractor 是一个契约,用于接收序列化的元数据并返回解码的名称-值对,这些名称-值对可以通过名称像头信息一样访问,例如通过 @Header 在注解处理器方法中。spring-doc.cadn.net.cn

DefaultMetadataExtractor 可以被赋予 Decoder 个实例来解码元数据。开箱即用,它内置支持 "message/x.rsocket.routing.v0",将其解码为 String 并保存在 "route" 键下。对于任何其他 MIME 类型,您需要提供一个 Decoder 并按如下方式注册 MIME 类型:spring-doc.cadn.net.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

Composite metadata 适用于组合独立的元数据值。但是,请求者可能不支持复合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor 可能需要自定义逻辑将解码值映射到输出映射。以下是一个使用 JSON 作为元数据的示例:spring-doc.cadn.net.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

在配置MetadataExtractorRSocketStrategies时,你可以让RSocketStrategies.Builder使用已配置的解码器创建提取器,并简单地使用回调自定义注册如下:spring-doc.cadn.net.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring 框架允许你将 RSocket 服务定义为一个 Java 接口,该接口包含用于 RSocket 交互的注解方法。然后你可以生成实现此接口并执行这些交互的代理。这有助于通过封装底层 RSocketRequester 的使用来简化 RSocket 远程访问。spring-doc.cadn.net.cn

首先,声明一个包含 @RSocketExchange 个方法的接口:spring-doc.cadn.net.cn

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

二、创建一个执行已声明的 RSocket 交换的代理:spring-doc.cadn.net.cn

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

方法参数

使用注解的 RSocket 交换方法支持灵活的方法签名,包含以下方法参数:spring-doc.cadn.net.cn

方法参数 描述

@DestinationVariablespring-doc.cadn.net.cn

添加一个路由变量,与来自 RSocketRequester 注解的路由一同传递给 @RSocketExchange,以扩展路由中的模板占位符。 该变量可以是字符串或任意对象,随后将通过 toString() 进行格式化。spring-doc.cadn.net.cn

@Payloadspring-doc.cadn.net.cn

设置请求的输入负载。这可以是一个具体的值,或任何可通过ReactiveAdapterRegistry适配为响应式流Publisher的值生成器。spring-doc.cadn.net.cn

Object,后跟 MimeTypespring-doc.cadn.net.cn

输入负载中元数据项的值。只要下一个参数是元数据项 MimeType,该值可以是任何 Object。该值可以是一个具体的值,或者任何能够通过 ReactiveAdapterRegistry 适配为响应式流 Publisher 的单个值的生产者。spring-doc.cadn.net.cn

MimeTypespring-doc.cadn.net.cn

元数据项的 MimeType。前面的方法参数应为元数据值。spring-doc.cadn.net.cn

返回值

带有注解的、RSocket交换方法支持返回具体值或任何可适应于 Reactive Streams Publisher 的值生产者,通过 ReactiveAdapterRegistry 实现适配。spring-doc.cadn.net.cn