跳到主要内容

WebSockets

ChatGPT-4o-mini 中英对照 WebSockets

本部分参考文档涵盖了对 reactive-stack WebSocket 消息传递的支持。

WebSocket 介绍

WebSocket 协议,RFC 6455,提供了一种标准化的方法,通过单个 TCP 连接在客户端和服务器之间建立全双工、双向通信通道。它是与 HTTP 不同的 TCP 协议,但设计上旨在通过 HTTP 工作,使用 80 和 443 端口,并允许重用现有的防火墙规则。

WebSocket 交互始于一个 HTTP 请求,该请求使用 HTTP Upgrade 头来升级或在这种情况下切换到 WebSocket 协议。以下示例展示了这样的交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket // <1>
Connection: Upgrade // <2>
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
yaml
  • Upgrade 头部。

  • 使用 Upgrade 连接。

与通常的 200 状态码不同,支持 WebSocket 的服务器返回的输出类似于以下内容:

HTTP/1.1 101 Switching Protocols // <1>
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
yaml
  • 协议切换

在成功握手后,HTTP 升级请求底层的 TCP 套接字保持打开状态,以便客户端和服务器继续发送和接收消息。

WebSockets 的完整工作原理介绍超出了本文档的范围。请参阅 RFC 6455、HTML5 的 WebSocket 章节或网络上的许多介绍和教程。

注意,如果 WebSocket 服务器运行在一个 web 服务器后面(例如,nginx),您可能需要配置它以将 WebSocket 升级请求传递给 WebSocket 服务器。同样,如果应用程序运行在云环境中,请检查云提供商与 WebSocket 支持相关的说明。

HTTP 与 WebSocket

尽管 WebSocket 被设计为与 HTTP 兼容并以 HTTP 请求开始,但重要的是要理解这两种协议导致非常不同的架构和应用程序编程模型。

在 HTTP 和 REST 中,一个应用程序被建模为多个 URL。为了与应用程序交互,客户端以请求-响应的方式访问这些 URL。服务器根据 HTTP URL、方法和头部将请求路由到适当的处理程序。

相比之下,在 WebSockets 中,通常只有一个用于初始连接的 URL。随后,所有应用程序消息都通过同一个 TCP 连接流动。这指向了一种完全不同的异步、事件驱动的消息架构。

WebSocket 也是一种低级传输协议,与 HTTP 不同,它不对消息内容规定任何语义。这意味着,除非客户端和服务器对消息语义达成一致,否则无法路由或处理消息。

WebSocket 客户端和服务器可以通过 HTTP 握手请求中的 Sec-WebSocket-Protocol 头部协商使用更高级别的消息传递协议(例如,STOMP)。如果没有这个,他们需要自己制定约定。

何时使用 WebSockets

WebSockets 可以使网页变得动态和互动。然而,在许多情况下,AJAX 和 HTTP 流或长轮询的组合可以提供一个简单有效的解决方案。

例如,新闻、邮件和社交动态需要动态更新,但每几分钟更新一次可能是完全可以接受的。另一方面,协作、游戏和金融应用则需要更接近实时。

延迟本身并不是决定性因素。如果消息的数量相对较少(例如,监控网络故障),HTTP 流式传输或轮询可以提供有效的解决方案。正是低延迟、高频率和高容量的结合,使得使用 WebSocket 成为最佳选择。

请记住,互联网上可能存在一些超出您控制范围的限制性代理,这可能会阻止 WebSocket 交互,原因可能是它们未配置为传递 Upgrade 头,或者因为它们关闭看起来处于空闲状态的长连接。这意味着在防火墙内的内部应用程序使用 WebSocket 是一个比公共应用程序更直接的决定。

WebSocket API

Spring 框架提供了一个 WebSocket API,您可以使用它来编写处理 WebSocket 消息的客户端和服务器端应用程序。

服务器

要创建一个 WebSocket 服务器,您可以首先创建一个 WebSocketHandler。以下示例展示了如何做到这一点:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
java

然后你可以将其映射到一个 URL:

@Configuration
class WebConfig {

@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers

return new SimpleUrlHandlerMapping(map, order);
}
}
java

如果使用 WebFlux 配置,则无需进一步操作;否则,如果不使用 WebFlux 配置,则需要声明一个 WebSocketHandlerAdapter,如下所示:

@Configuration
class WebConfig {

// ...

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
java

WebSocketHandler

WebSocketHandlerhandle 方法接受 WebSocketSession 并返回 Mono<Void>,以指示会话的应用程序处理何时完成。会话通过两个流进行处理,一个用于入站消息,一个用于出站消息。下表描述了处理这些流的两个方法:

WebSocketSession 方法描述
Flux<WebSocketMessage> receive()提供对传入消息流的访问,并在连接关闭时完成。
Mono<Void> send(Publisher<WebSocketMessage>)接受一个用于发送消息的源,写入消息,并返回一个 Mono<Void>,在源完成且写入完成时完成。

一个 WebSocketHandler 必须将入站和出站流组合成一个统一的流,并返回一个 Mono<Void>,以反映该流的完成。根据应用程序的要求,统一流在以下情况下完成:

  • 入站或出站消息流完成。

  • 入站流完成(即连接关闭),而出站流是无限的。

  • 在选定的时刻,通过 WebSocketSessionclose 方法。

当入站和出站消息流组合在一起时,不需要检查连接是否打开,因为反应式流信号结束活动。入站流接收完成或错误信号,而出站流接收取消信号。

处理程序最基本的实现是处理传入流的一个。以下示例展示了这样的实现:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() 1
.doOnNext(message -> {
// ... // <2>
})
.concatMap(message -> {
// ... // <3>
})
.then(); 4
}
}
java
  • 访问传入消息的流。

  • 对每条消息执行某些操作。

  • 执行使用消息内容的嵌套异步操作。

  • 返回一个 Mono<Void>,在接收完成时完成。

提示

对于嵌套的异步操作,您可能需要在使用池化数据缓冲区的底层服务器上调用 message.retain()(例如,Netty)。否则,数据缓冲区可能会在您有机会读取数据之前被释放。有关更多背景信息,请参见 数据缓冲区和编解码器

以下实现结合了入站和出站流:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {

Flux<WebSocketMessage> output = session.receive() 1
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); 2

return session.send(output); 3
}
}
java
  • 处理入站消息流。

  • 创建出站消息,生成一个组合流。

  • 返回一个 Mono<Void>,在我们继续接收时不会完成。

入站和出站流可以是独立的,并且仅在完成时连接,如以下示例所示:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {

Mono<Void> input = session.receive() 1
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();

Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); 2

return Mono.zip(input, output).then(); 3
}
}
java
  • 处理传入消息流。

  • 发送传出消息。

  • 合并流并返回一个 Mono<Void>,当任一流结束时完成。

DataBuffer

DataBuffer 是 WebFlux 中字节缓冲区的表示。参考文献的 Spring Core 部分在 数据缓冲区和编解码器 一节中有更多内容。关键点是要理解,在某些服务器上,如 Netty,字节缓冲区是池化和引用计数的,必须在使用后释放以避免内存泄漏。

在使用 Netty 运行时,应用程序必须使用 DataBufferUtils.retain(dataBuffer) 来保持输入数据缓冲区,以确保它们不会被释放,并在缓冲区被消费后使用 DataBufferUtils.release(dataBuffer)

握手

WebSocketHandlerAdapter 委托给 WebSocketService。默认情况下,这是 HandshakeWebSocketService 的一个实例,它对 WebSocket 请求执行基本检查,然后使用所使用的服务器的 RequestUpgradeStrategy。目前,内置支持 Reactor Netty、Tomcat、Jetty 和 Undertow。

HandshakeWebSocketService 暴露了一个 sessionAttributePredicate 属性,允许设置一个 Predicate<String> 来从 WebSession 中提取属性,并将其插入到 WebSocketSession 的属性中。

服务器配置

每个服务器的 RequestUpgradeStrategy 暴露了特定于底层 WebSocket 服务器引擎的配置。当使用 WebFlux Java 配置时,您可以自定义如下属性,如 WebFlux 配置 中的相应部分所示;如果不使用 WebFlux 配置,请使用以下内容:

@Configuration
class WebConfig {

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}

@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
java

检查您的服务器的升级策略,以查看有哪些可用选项。目前,仅 Tomcat 和 Jetty 提供此类选项。

CORS

配置 CORS 并限制对 WebSocket 端点的访问最简单的方法是让你的 WebSocketHandler 实现 CorsConfigurationSource 并返回一个包含允许的来源、头部和其他细节的 CorsConfiguration。如果你无法做到这一点,你也可以在 SimpleUrlHandler 上设置 corsConfigurations 属性,通过 URL 模式指定 CORS 设置。如果两者都被指定,它们将通过在 CorsConfiguration 上使用 combine 方法进行合并。

客户端

Spring WebFlux 提供了一个 WebSocketClient 抽象,具有 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)的实现。

备注

Tomcat 客户端实际上是标准 Java 客户端的扩展,具有一些额外的功能,在 WebSocketSession 处理上利用 Tomcat 特定的 API 来暂停接收消息以应对背压。

要开始一个 WebSocket 会话,您可以创建客户端的实例并使用其 execute 方法:

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
java

一些客户端,例如 Jetty,实现了 Lifecycle,在使用之前需要先停止和启动它们。所有客户端都有与底层 WebSocket 客户端配置相关的构造函数选项。