|
对于最新稳定版本,请使用 Spring Framework 7.0.6! |
WebSockets
本参考文档的这一部分涵盖了对响应式栈(reactive-stack)WebSocket 消息传递的支持。
WebSocket 介绍
WebSocket 协议(RFC 6455)提供了一种标准化的方式,可在客户端与服务器之间通过单一 TCP 连接建立全双工、双向通信通道。它是一种不同于 HTTP 的 TCP 协议,但被设计为可在 HTTP 上运行,使用 80 和 443 端口,并允许复用现有的防火墙规则。
WebSocket 交互始于一个使用 HTTP Upgrade 头的 HTTP 请求,
以升级(或在此情况下切换)到 WebSocket 协议。以下示例
展示了此类交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
| 1 | Upgrade 头部。 |
| 2 | 使用 Upgrade 连接。 |
与通常的 200 状态码不同,支持 WebSocket 的服务器会返回类似如下的输出:
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
| 1 | 协议切换 |
握手成功后,HTTP 升级请求底层的 TCP 套接字将保持打开状态,供客户端和服务器继续发送和接收消息。
本文档的范围不包括对 WebSocket 工作原理的完整介绍。 有关详细信息,请参阅 RFC 6455、HTML5 中的 WebSocket 章节,或网上众多的入门指南和教程。
请注意,如果 WebSocket 服务器运行在 Web 服务器(例如 nginx)之后,您很可能需要配置该 Web 服务器,使其将 WebSocket 升级请求转发给 WebSocket 服务器。同样,如果应用程序运行在云环境中,请查阅云服务提供商关于 WebSocket 支持的相关说明。
HTTP 与 WebSocket
尽管 WebSocket 被设计为与 HTTP 兼容,并以 HTTP 请求开始, 但理解这两种协议会导致截然不同的架构和应用程序编程模型非常重要。
在 HTTP 和 REST 中,应用程序被建模为多个 URL。客户端通过访问这些 URL,以请求-响应的方式与应用程序进行交互。服务器根据 HTTP URL、方法和请求头,将请求路由到相应的处理器。
相比之下,在 WebSocket 中,通常只有一个用于初始连接的 URL。 随后,所有应用消息都通过该相同的 TCP 连接进行传输。这指向了一种完全不同的异步、事件驱动的消息架构。
WebSocket 也是一种低层传输协议,与 HTTP 不同,它不对消息内容规定任何语义。这意味着除非客户端和服务器就消息的语义达成一致,否则无法对消息进行路由或处理。
WebSocket 客户端和服务器可以通过 HTTP 握手请求中的 Sec-WebSocket-Protocol 头协商使用更高层的消息协议(例如 STOMP)。如果没有该头信息,它们就需要自行约定通信规范。
何时使用 WebSocket
WebSocket 可以使网页变得动态且具有交互性。然而,在许多情况下,结合使用 Ajax 与 HTTP 流式传输或长轮询即可提供一种简单而有效的解决方案。
例如,新闻、邮件和社交动态需要动态更新,但每隔几分钟更新一次可能就完全足够了。而协作应用、游戏和金融类应用则需要更接近实时的更新。
仅延迟本身并不是一个决定性因素。如果消息量相对较低(例如,监控网络故障),HTTP 流式传输或轮询可以提供一种有效的解决方案。 只有在低延迟、高频率和高吞吐量三者结合的情况下,才最能体现使用 WebSocket 的优势。
还需注意的是,在互联网上,您无法控制的限制性代理可能会阻止 WebSocket 通信,原因可能是它们未配置为传递 Upgrade 头部,或者因为它们会关闭看似空闲的长连接。这意味着,对于防火墙内部的应用程序而言,使用 WebSocket 是一个更为直接明确的选择,而对于面向公众的应用程序则不然。
WebSocket API
Spring Framework 提供了一个 WebSocket API,可用于编写处理 WebSocket 消息的客户端和服务器端应用程序。
服务器
要创建一个 WebSocket 服务器,您可以首先创建一个 WebSocketHandler。
以下示例展示了如何实现:
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后你可以将其映射到一个 URL:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用了WebFlux 配置,则无需进行其他操作;否则,如果不使用 WebFlux 配置,则需要声明一个 WebSocketHandlerAdapter,如下所示:
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
handle 的 WebSocketHandler 方法接收一个 WebSocketSession 参数,并返回 Mono<Void>,
以指示应用程序对该会话的处理何时完成。该会话通过两个流进行处理,一个用于入站消息,另一个用于出站消息。下表
描述了用于处理这两个流的两个方法:
WebSocketSession 方法 |
<description> </description> |
|---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
接收一个用于传出消息的源,写入这些消息,并返回一个 |
WebSocketHandler 必须将入站流和出站流组合成一个统一的数据流,并返回一个 Mono<Void>,以反映该数据流的完成状态。根据应用程序的需求,当满足以下条件时,该统一数据流即告完成:
-
入站或出站消息流完成。
-
入站流已完成(即连接已关闭),而出站流是无限的。
-
在选定的某个时间点,通过
close的WebSocketSession方法。
当入站和出站消息流组合在一起时,无需检查连接是否处于打开状态,因为响应式流(Reactive Streams)的信号会终止活动。 入站流会收到完成或错误信号,而出站流会收到取消信号。
处理器最基本的实现是处理入站流的实现。以下示例展示了此类实现:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
| 1 | 访问入站消息流。 |
| 2 | 对每条消息执行某些操作。 |
| 3 | 执行使用消息内容的嵌套异步操作。 |
| 4 | 返回一个 Mono<Void>,当接收完成时,该 Mono 将完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
| 1 | 访问入站消息流。 |
| 2 | 对每条消息执行某些操作。 |
| 3 | 执行使用消息内容的嵌套异步操作。 |
| 4 | 返回一个 Mono<Void>,当接收完成时,该 Mono 将完成。 |
对于嵌套的异步操作,你可能需要在使用池化数据缓冲区的底层服务器(例如 Netty)上调用 message.retain()。否则,数据缓冲区可能会在你有机会读取数据之前就被释放。更多背景信息,请参阅数据缓冲区与编解码器。 |
以下实现结合了入站和出站流:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
| 1 | 处理入站消息流。 |
| 2 | 创建出站消息,生成一个组合流。 |
| 3 | 返回一个 Mono<Void>,只要我们持续接收数据,它就不会完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
| 1 | 处理入站消息流。 |
| 2 | 创建出站消息,生成一个组合流。 |
| 3 | 返回一个 Mono<Void>,只要我们持续接收数据,它就不会完成。 |
入站流和出站流可以相互独立,仅在完成时进行合并,如下例所示:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return Mono.zip(input, output).then(); (3)
}
}
| 1 | 处理入站消息流。 |
| 2 | 发送传出消息。 |
| 3 | 合并两个流,并返回一个 Mono<Void>,当任一流结束时,该 Mono 即完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) (2)
return Mono.zip(input, output).then() (3)
}
}
| 1 | 处理入站消息流。 |
| 2 | 发送传出消息。 |
| 3 | 合并两个流,并返回一个 Mono<Void>,当任一流结束时,该 Mono 即完成。 |
DataBuffer
DataBuffer 是 WebFlux 中字节缓冲区的表示形式。参考文档的 Spring Core 部分在数据缓冲区和编解码器一节中有更多相关内容。需要理解的关键点是,在某些服务器(如 Netty)上,字节缓冲区是池化的并采用引用计数机制,必须在使用完毕后释放,以避免内存泄漏。
当在 Netty 上运行时,如果应用程序希望保留输入的数据缓冲区以确保它们不会被释放,则必须使用 DataBufferUtils.retain(dataBuffer),并在缓冲区被消费后随后调用 DataBufferUtils.release(dataBuffer)。
握手
WebSocketHandlerAdapter 委托给一个 WebSocketService。默认情况下,该服务是 HandshakeWebSocketService 的实例,它会对 WebSocket 请求执行基本检查,然后使用适用于当前服务器的 RequestUpgradeStrategy。目前,内置支持 Reactor Netty、Tomcat、Jetty 和 Undertow。
HandshakeWebSocketService 提供了一个 sessionAttributePredicate 属性,允许设置一个 Predicate<String>,用于从 WebSession 中提取属性并将其插入到 WebSocketSession 的属性中。
服务器配置
每个服务器的 RequestUpgradeStrategy 会暴露底层 WebSocket 服务器引擎特有的配置。当使用 WebFlux Java 配置时,您可以按照WebFlux 配置相应章节中所示的方式自定义这些属性;如果不使用 WebFlux 配置,则可使用以下方法:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
请检查您服务器的升级策略,以查看有哪些可用选项。目前,只有 Tomcat 和 Jetty 提供此类选项。
跨域资源共享(CORS)
配置 CORS 并限制对 WebSocket 端点访问的最简单方法是让您的 WebSocketHandler 实现 CorsConfigurationSource 接口,并返回一个包含允许的源(origins)、请求头(headers)及其他详细信息的 CorsConfiguration 对象。如果您无法这样做,也可以在 corsConfigurations 上设置 SimpleUrlHandler 属性,通过 URL 模式来指定 CORS 设置。如果同时指定了这两种方式,系统将通过 combine 的 CorsConfiguration 方法将它们合并。
客户端
Spring WebFlux 提供了 WebSocketClient 抽象,其实现包括 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。
Tomcat 客户端实际上是标准 Java 客户端的一个扩展,在 WebSocketSession 处理中增加了一些额外功能,以利用 Tomcat 特有的 API 来暂停接收消息,从而实现背压(back pressure)控制。 |
要启动一个 WebSocket 会话,你可以创建客户端的实例并使用其 execute 方法:
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
某些客户端(例如 Jetty)实现了 Lifecycle 接口,在使用之前需要先停止再启动。所有客户端都提供了与底层 WebSocket 客户端配置相关的构造函数选项。