此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Framework 6.2.10! |
Web套接字
参考文档的这一部分涵盖了对响应式堆栈 WebSocket 的支持 消息。
WebSocket 简介
WebSocket 协议 RFC 6455 提供了一个标准化的 在客户端和服务器之间建立全双工双向通信通道的方法 通过单个 TCP 连接。它是与 HTTP 不同的 TCP 协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重复使用现有防火墙规则。
WebSocket 交互从使用 HTTP 的 HTTP 请求开始Upgrade
页眉
升级,或者在这种情况下,切换到 WebSocket 协议。以下示例
显示了这样的交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 | 这Upgrade 页眉。 |
2 | 使用Upgrade 连接。 |
支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 | 协议交换机 |
成功握手后,HTTP 升级请求的基础 TCP 套接字将保留 打开,供客户端和服务器继续发送和接收消息。
WebSocket 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455、HTML5 的 WebSocket 章节或许多介绍和 Web 上的教程。
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查 与 WebSocket 支持相关的云提供商的说明。
HTTP 与 WebSocket
尽管 WebSocket 被设计为 HTTP 兼容并以 HTTP 请求开头, 重要的是要了解这两种协议导致非常不同的 架构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端访问这些 URL,请求-响应风格。服务器将请求路由到 基于 HTTP URL、方法和标头的适当处理程序。
相比之下,在 WebSockets 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一个完全不同的异步、事件驱动的消息传递体系结构。
WebSocket 也是一种低级传输协议,与 HTTP 不同,它没有规定 消息内容的任何语义。这意味着无法路由或处理 除非客户端和服务器就消息语义达成一致。
WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议
(例如,STOMP),通过Sec-WebSocket-Protocol
HTTP 握手请求上的标头。
如果没有这一点,他们需要制定自己的公约。
何时使用 WebSockets
WebSockets 可以使网页具有动态性和交互性。然而,在许多情况下, AJAX 和 HTTP 流的组合或长轮询可以提供一个简单的 有效的解决方案。
例如,新闻、邮件和社交源需要动态更新,但可能是 每隔几分钟就这样做一次完全没问题。协作、游戏和金融应用程序,在 另一方面,需要更接近实时。
延迟本身并不是决定因素。如果消息量相对较低(例如 监控网络故障)HTTP 流或轮询可以提供有效的解决方案。 正是低延迟、高频和高音量的结合造就了最好的 使用 WebSocket 的案例。
还要记住,在互联网上,您无法控制的限制性代理
可能会阻止 WebSocket 交互,因为它们未配置为传递Upgrade
标头,或者因为它们关闭了显示为空闲的长期连接。这
意味着将 WebSocket 用于防火墙内的内部应用程序是
与面向公众的应用程序相比,直接做出决定。
WebSocket API
Spring Framework 提供了一个 WebSocket API,您可以使用它来编写客户端和 处理 WebSocket 消息的服务器端应用程序。
服务器
要创建 WebSocket 服务器,您可以先创建一个WebSocketHandler
.
以下示例显示了如何执行此作:
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后,您可以将其映射到 URL:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux Config,则没有任何内容
进一步执行,否则,如果不使用 WebFlux 配置,则需要声明一个WebSocketHandlerAdapter
如下图所示:
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
这handle
方法WebSocketHandler
需要WebSocketSession
并返回Mono<Void>
以指示会话的应用程序处理何时完成。会话已处理
通过两个流,一个用于入站消息,一个用于出站消息。下表
描述了处理流的两种方法:
WebSocketSession 方法 |
描述 |
---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
获取传出消息的源,写入消息,并返回 |
一个WebSocketHandler
必须将入站和出站流组合成一个统一的流,并且
返回一个Mono<Void>
这反映了该流程的完成。取决于应用
要求时,统一流将在以下情况下完成:
-
入站或出站消息流完成。
-
入站流完成(即连接关闭),而出站流是无限的。
-
在选定的点上,通过
close
方法WebSocketSession
.
当入站和出站消息流组合在一起时,无需 检查连接是否打开,因为反应流表示活动结束。 入站流接收完成或错误信号,出站流 接收取消信号。
处理程序最基本的实现是处理入站流的实现。这 以下示例显示了这样的实现:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息做一些事情。 |
3 | 执行使用消息内容的嵌套异步作。 |
4 | 返回一个Mono<Void> 当接收完成时完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息做一些事情。 |
3 | 执行使用消息内容的嵌套异步作。 |
4 | 返回一个Mono<Void> 当接收完成时完成。 |
对于嵌套的异步作,您可能需要调用message.retain() 在底层使用池数据缓冲区的服务器(例如 Netty)。否则,数据缓冲区可能会在您有机会读取数据之前被释放。有关更多背景信息,请参阅数据缓冲区和编解码器。 |
以下实现结合了入站和出站流:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回一个Mono<Void> 当我们继续接收时,这还没有完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回一个Mono<Void> 当我们继续接收时,这还没有完成。 |
入站和出站流可以是独立的,并且只能在完成时加入, 如以下示例所示:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return Mono.zip(input, output).then(); (3)
}
}
1 | 处理入站消息流。 |
2 | 发送外发消息。 |
3 | 加入流并返回Mono<Void> 当任一流结束时完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) (2)
return Mono.zip(input, output).then() (3)
}
}
1 | 处理入站消息流。 |
2 | 发送外发消息。 |
3 | 加入流并返回Mono<Void> 当任一流结束时完成。 |
DataBuffer
DataBuffer
是 WebFlux 中字节缓冲区的表示形式。的 Spring Core 部分
该参考资料在有关数据缓冲区和编解码器的部分中有更多相关内容。要理解的关键点是,在某些
像 Netty 这样的服务器,字节缓冲区被池化并对引用进行计数,并且必须释放
使用时以避免内存泄漏。
在 Netty 上运行时,应用程序必须使用DataBufferUtils.retain(dataBuffer)
如果他们
希望保留输入数据缓冲区以确保它们不会被释放,并且
随后使用DataBufferUtils.release(dataBuffer)
当缓冲区被消耗时。
握手
WebSocketHandlerAdapter
委托给WebSocketService
. 默认情况下,这是一个实例 之HandshakeWebSocketService
,它对 WebSocket 请求执行基本检查,然后使用然后使用RequestUpgradeStrategy
对于正在使用的服务器。目前,内置了支持 Reactor Netty、Tomcat、Jetty 和 Undertow。
HandshakeWebSocketService
公开一个sessionAttributePredicate
允许设置一个Predicate<String>
从WebSession
并将它们插入到WebSocketSession
.
服务器配置
这RequestUpgradeStrategy
对于每个服务器,会公开特定于
底层 WebSocket 服务器引擎。使用 WebFlux Java 配置时,您可以自定义
WebFlux 配置的相应部分中显示的此类属性,或者如果
不使用 WebFlux 配置,请使用以下内容:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
检查服务器的升级策略,查看可用的选项。现在 只有 Tomcat 和 Jetty 公开了这样的选项。
CORS
配置 CORS 并限制对 WebSocket 端点的访问的最简单方法是
拥有您的WebSocketHandler
实现CorsConfigurationSource
并返回一个CorsConfiguration
包含允许的来源、标头和其他详细信息。如果你不能
这样,您还可以将corsConfigurations
属性SimpleUrlHandler
自
通过 URL 模式指定 CORS 设置。如果同时指定了两者,则使用combine
方法CorsConfiguration
.
客户端
Spring WebFlux 提供了一个WebSocketClient
抽象与实现
Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。
Tomcat 客户端实际上是标准 Java 客户端的扩展,但有一些额外的功能
功能WebSocketSession 处理以利用特定于 Tomcat 的API 来暂停接收消息以进行背压。 |
要启动 WebSocket 会话,您可以创建客户端的实例并使用其execute
方法:
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
一些客户端,例如 Jetty,实现Lifecycle
并且需要停止和启动然后才能使用它们。所有客户端都有与配置相关的构造函数选项底层 WebSocket 客户端。