|
对于最新的稳定版本,请使用 Spring Framework 7.0.6! |
WebSockets
这部分参考文档涵盖了对响应式堆栈WebSocket消息传递的支持。
WebSocket简介
WebSocket协议,RFC 6455,提供了一种标准化的方式,用于在客户端和服务器之间通过单个TCP连接建立全双工、双向通信通道。它与HTTP是不同的TCP协议,但设计为通过HTTP工作,使用端口80和443,并允许重用现有的防火墙规则。
WebSocket 交互从使用 HTTP Upgrade 头的 HTTP 请求开始,以升级或在这种情况下切换到 WebSocket 协议。以下示例显示了这样的交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
| 1 | 核心 Upgrade 标头。 |
| 2 | 使用 Upgrade 连接。 |
与通常的200状态码不同,支持WebSocket的服务器返回的输出类似于以下内容:
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
| 1 | 协议切换 |
成功握手后,HTTP升级请求底层的TCP套接字保持打开状态,客户端和服务器可以继续发送和接收消息。
关于WebSockets的工作原理的完整介绍超出了本文档的范围。请参阅RFC 6455、HTML5中的WebSocket章节,或网络上的众多介绍和教程。
请注意,如果WebSocket服务器运行在Web服务器(例如nginx)后面,您可能需要配置它以将WebSocket升级请求传递给WebSocket服务器。同样,如果应用程序在云环境中运行,请查看云提供商关于WebSocket支持的相关说明。
HTTP与WebSocket对比
尽管WebSocket被设计为与HTTP兼容并以HTTP请求开始,但重要的是要理解这两种协议会导致非常不同的架构和应用程序编程模型。
在HTTP和REST中,应用程序被建模为许多URL。要与应用程序交互,客户端访问这些URL,采用请求-响应的方式。服务器根据HTTP URL、方法和标头将请求路由到适当的处理器。
相比之下,在WebSockets中,通常只有一个URL用于初始连接。 随后,所有应用程序消息都在同一个TCP连接上流动。这指向了一种完全不同的异步、事件驱动的消息架构。
WebSocket 也是一种低级别的传输协议,与 HTTP 不同,它不对消息内容规定任何语义。这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。
WebSocket 客户端和服务器可以通过 HTTP 握手请求中的 Sec-WebSocket-Protocol 头部协商使用更高层级的消息协议(例如,STOMP)。如果没有这个头部,它们需要自己制定约定。
何时使用WebSockets
WebSockets 可以使网页变得动态和互动。然而,在许多情况下,Ajax 和 HTTP 流或长轮询的组合可以提供一个简单而有效的解决方案。
例如,新闻、邮件和社交动态需要动态更新,但每隔几分钟更新一次可能是完全可以接受的。另一方面,协作、游戏和金融应用程序需要更接近实时。
延迟本身并不是决定性因素。如果消息量相对较低(例如,监控网络故障),HTTP流或轮询可以提供有效的解决方案。正是低延迟、高频率和高容量的结合使得使用WebSocket成为最佳选择。
请记住,通过互联网,您无法控制的限制性代理可能会阻止WebSocket交互,要么是因为它们没有配置为传递Upgrade标头,要么是因为它们关闭了看起来空闲的长连接。这意味着在防火墙内的内部应用程序中使用WebSocket是一个更直接的决定,而不是在面向公众的应用程序中。
WebSocket API
Spring框架提供了一个WebSocket API,你可以使用它来编写处理WebSocket消息的客户端和服务器端应用程序。
服务器
要创建WebSocket服务器,您可以首先创建一个WebSocketHandler。
以下示例展示了如何操作:
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后你可以将其映射到一个URL:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux 配置,则无需进一步操作,否则如果不使用 WebFlux 配置,则需要如下面所示声明一个
WebSocketHandlerAdapter:
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
The handle 方法的 WebSocketHandler 接受 WebSocketSession 并返回 Mono<Void>
以指示应用程序处理会话完成的时间。会话通过两个流进行处理,一个用于传入消息,另一个用于传出消息。下表描述了处理这两个流的两种方法:
WebSocketSession 方法 |
描述 |
|---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
接受传出消息的来源,写入消息,并在源完成且写入完成后返回一个 |
一个 WebSocketHandler 必须将入站和出站流组合成一个统一的流,并返回一个 Mono<Void>,该值反映了该流的完成。根据应用程序需求,统一流在以下情况下完成:
-
要么是入站消息流完成,要么是出站消息流完成。
-
入站流完成(即连接关闭),而出站流是无限的。
-
在某个选定的点,通过
close方法的WebSocketSession。
当入站和出站消息流组合在一起时,无需检查连接是否打开,因为响应式流会发出结束活动的信号。入站流会收到完成或错误信号,而出站流会收到取消信号。
最基础的处理器实现是处理入站流的。以下示例展示了这种实现:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
| 1 | 访问传入消息流。 |
| 2 | 对每条消息进行处理。 |
| 3 | 执行使用消息内容的嵌套异步操作。 |
| 4 | 返回一个 Mono<Void>,在接收到完成时完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
| 1 | 访问传入消息流。 |
| 2 | 对每条消息进行处理。 |
| 3 | 执行使用消息内容的嵌套异步操作。 |
| 4 | 返回一个 Mono<Void>,在接收到完成时完成。 |
对于嵌套的异步操作,您可能需要在使用池化数据缓冲区(例如,Netty)的底层服务器上调用message.retain()。否则,在您有机会读取数据之前,数据缓冲区可能会被释放。有关更多背景信息,请参见数据缓冲区和编解码器。 |
以下实现结合了入站和出站流:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
| 1 | 处理传入的消息流。 |
| 2 | 创建出站消息,生成一个组合流。 |
| 3 | 返回一个 Mono<Void>,只要我们继续接收,则不会完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
| 1 | 处理传入的消息流。 |
| 2 | 创建出站消息,生成一个组合流。 |
| 3 | 返回一个 Mono<Void>,只要我们继续接收,则不会完成。 |
入站和出站流可以是独立的,并且仅在完成时才进行合并,如下例所示:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return Mono.zip(input, output).then(); (3)
}
}
| 1 | 处理传入的消息流。 |
| 2 | 发送外出消息。 |
| 3 | 加入流并返回一个在任一流结束时完成的Mono<Void>。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) (2)
return Mono.zip(input, output).then() (3)
}
}
| 1 | 处理传入的消息流。 |
| 2 | 发送外出消息。 |
| 3 | 加入流并返回一个在任一流结束时完成的Mono<Void>。 |
DataBuffer
DataBuffer 是 WebFlux 中字节缓冲区的表示。Spring Core 部分的参考文档在
数据缓冲区和编解码器一节中有更多相关内容。需要理解的关键点是,在某些服务器(如 Netty)上,字节缓冲区是被池化和引用计数的,当它们被消耗后必须释放以避免内存泄漏。
在使用Netty运行时,应用程序必须使用 DataBufferUtils.retain(dataBuffer) 如果它们希望保留输入数据缓冲区以确保它们不会被释放,并在缓冲区被消耗后使用 DataBufferUtils.release(dataBuffer)。
握手
WebSocketHandlerAdapter 委托给 WebSocketService。默认情况下,这是一个 HandshakeWebSocketService 的实例,它对 WebSocket 请求执行基本检查,然后使用 RequestUpgradeStrategy 为所使用的服务器。目前,内置支持 Reactor Netty、Tomcat、Jetty 和 Undertow。
HandshakeWebSocketService 暴露了一个 sessionAttributePredicate 属性,该属性允许
设置一个 Predicate<String> 以从 WebSession 中提取属性并将它们
插入到 WebSocketSession 的属性中。
服务器配置
RequestUpgradeStrategy 用于每个服务器,以暴露与底层 WebSocket 服务器引擎相关的配置。在使用 WebFlux Java 配置时,可以按照相应部分的
WebFlux 配置 自定义这些属性,或者如果不使用 WebFlux 配置,请使用以下内容:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
检查你的服务器升级策略,看看有哪些可用的选项。目前, 只有Tomcat和Jetty提供了这样的选项。
CORS
配置CORS并限制对WebSocket端点的访问最简单的方法是让你的WebSocketHandler实现CorsConfigurationSource并返回一个CorsConfiguration,其中包含允许的源、标头和其他详细信息。如果你不能这样做,你也可以设置SimpleUrlHandler上的corsConfigurations属性,通过URL模式指定CORS设置。如果两者都指定了,它们将通过在CorsConfiguration上调用combine方法进行组合。
客户端
Spring WebFlux 提供了一个 WebSocketClient 抽象,具有 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)的实现。
Tomcat客户端实际上是标准Java客户端的扩展,其中在WebSocketSession处理中添加了一些额外的功能,以利用Tomcat特有的API来暂停接收消息以实现背压。 |
要启动WebSocket会话,您可以创建客户端的实例并使用其execute
方法:
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
某些客户端(如Jetty)实现了Lifecycle,需要在使用前停止和启动。所有客户端都有与底层WebSocket客户端配置相关的构造函数选项。