跳到主要内容
版本:7.0.3

WebSockets

Hunyuan 7b 中英对照 WebSockets

本参考文档的这一部分涵盖了对reactive-stack WebSocket消息传递的支持。

WebSocket简介

WebSocket协议(RFC 6455)提供了一种标准化的方式,通过单一的TCP连接在客户端和服务器之间建立全双工、双向的通信通道。它是一种与HTTP不同的TCP协议,但设计为可以在HTTP之上运行,使用80和443端口,并允许重用现有的防火墙规则。

WebSocket交互始于一个HTTP请求,该请求使用HTTP的Upgrade头部来升级连接,或者在这种情况下,切换到WebSocket协议。以下示例展示了这样的交互过程:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket // <1>
Connection: Upgrade // <2>
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
  • Upgrade标题。

  • 使用Upgrade连接。

与通常的200状态码不同,支持WebSocket的服务器会返回类似以下的输出:

HTTP/1.1 101 Switching Protocols // <1>
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
  • 协议交换机

在成功完成握手后,支持HTTP升级请求的TCP套接字对客户端和服务器来说仍然保持开放状态,以便双方可以继续发送和接收消息。

关于WebSockets的工作原理的完整介绍超出了本文档的范围。请参阅RFC 6455、HTML5中的WebSocket章节,或者网络上众多的相关介绍和教程。

请注意,如果WebSocket服务器运行在Web服务器(例如nginx)之后,您可能需要配置它将WebSocket升级请求转发给实际的WebSocket服务器。同样地,如果应用程序在云环境中运行,请查阅云提供商关于WebSocket支持的说明。

HTTP 与 WebSocket

尽管WebSocket被设计为与HTTP兼容,并且以HTTP请求开始,但重要的是要理解,这两种协议导致了非常不同的架构和应用程序编程模型。

在HTTP和REST中,一个应用被建模为多个URL。为了与应用程序交互,客户端通过请求-响应的方式访问这些URL。服务器根据HTTP URL、方法和头部信息将请求路由到相应的处理程序。

相比之下,在WebSocket中,初次连接时通常只有一个URL。之后,所有应用程序的消息都通过同一个TCP连接进行传输。这指向了一种完全不同的异步、事件驱动的消息架构。

WebSocket也是一种低级别的传输协议,与HTTP不同,它不对消息的内容规定任何语义。这意味着,除非客户端和服务器就消息的语义达成一致,否则就无法对消息进行路由或处理。

WebSocket客户端和服务器可以通过HTTP握手请求中的Sec-WebSocket-Protocol头部来协商使用更高级别的消息协议(例如STOMP)。如果没有这个头部,它们就需要自己制定协议约定。

何时使用WebSocket

WebSocket可以使网页变得动态和交互式。然而,在许多情况下,结合使用AJAX和HTTP流或长轮询可以提供一种简单而有效的解决方案。

例如,新闻、邮件和社交动态需要动态更新,但每隔几分钟更新一次可能完全没问题。另一方面,协作工具、游戏和金融应用程序则需要更接近实时的更新速度。

仅延迟本身并不是决定性因素。如果消息量相对较少(例如,用于监控网络故障),HTTP流式传输或轮询可以提供有效的解决方案。只有低延迟、高频率和高消息量的结合,才能使WebSocket的使用达到最佳效果。

还要记住,在互联网上,一些不受你控制的限制性代理可能会阻止WebSocket交互,要么是因为这些代理没有配置为传递Upgrade头部信息,要么是因为它们会关闭那些看似空闲的长期连接的。这意味着,对于防火墙内的内部应用程序来说,使用WebSocket是一个更为直接的决定,而对于面向公众的应用程序而言则不然。

WebSocket API

Spring框架提供了一个WebSocket API,你可以使用它来编写处理WebSocket消息的客户端和服务器端应用程序。

服务器

要创建一个WebSocket服务器,你可以首先创建一个WebSocketHandler。以下示例展示了如何这样做:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}

然后你可以将其映射到一个URL:

@Configuration
class WebConfig {

@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers

return new SimpleUrlHandlerMapping(map, order);
}
}

如果使用WebFlux Config,则无需再做其他操作;否则,如果不使用WebFlux配置,就需要声明一个WebSocketHandlerAdapter,如下所示:

@Configuration
class WebConfig {

// ...

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}

WebSocketHandler

WebSocketHandlerhandle方法接收WebSocketSession作为参数,并返回MonoVoid>来表示应用程序对会话的处理已完成。会话的处理通过两个流来完成,一个用于传入(inbound)消息,另一个用于传出(outbound)消息。下表描述了处理这些流的两种方法:

WebSocketSession 方法描述
Flux<WebSocketMessage> receive()提供对传入消息流的访问,当连接关闭时该方法完成执行。
MonoVoid> send(Publisher<WebSocketMessage>)接受传出消息的源,写入这些消息,并返回一个 MonoVOID>,当消息源完成发送且写入操作结束后,该 MonoVoid> 也会完成执行。

WebSocketHandler必须将传入(inbound)和传出(outbound)的流组合成一个统一的流,并返回一个Mono Void>,该Mono Void>表示该流的完成。根据应用程序的需求,统一的流在以下情况下完成:

  • 入站消息流或出站消息流中的任意一个必须完成。
  • 入站消息流完成(即连接关闭),而出站消息流则是持续不断的。
  • 可以通过调用 WebSocketSessionclose 方法,在某个特定时刻终止出站消息流的传输。

当入站和出站消息流结合在一起时,就没有必要检查连接是否开放,因为Reactive Streams会发出结束活动的信号。入站流会接收到一个完成或错误信号,而出站流则会接收到一个取消信号。

处理程序(handler)最基本的实现形式是那种能够处理传入数据流(inbound stream)的实现。以下示例展示了这样的实现:

class ExampleHandler implements WebSocketHandler {

@Override
public MonoVoid> handle(WebSocketSession session) {
return session.receive() 1
.doOnNext(message -> {
// ... // <2>
})
.concatMap(message -> {
// ... // <3>
})
.then(); 4
}
}
  • \ [#1] 访问传入消息的流。
  • 对每条消息执行某些操作。

  • \ [#3] 执行使用消息内容的嵌套异步操作。
  • \ [#4] 返回一个 MonoVoid,在接收完成后该 Mono Void 也会完成。
提示

对于嵌套的、异步的操作,你可能需要在使用池化数据缓冲区的底层服务器(例如 Netty)上调用 message.retain()。否则,在你有机会读取数据之前,数据缓冲区可能会被释放。有关更多背景信息,请参阅 数据缓冲区和编码器

以下实现结合了入站流和出站流:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono VOID> handle(WebSocketSession session) {

Flux<WebSocketMessage> output = session.receive(); 1
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); 2

return session.send(output); 3
}
}
  • 处理传入的消息流。

  • 创建传出消息,生成一个组合流。

  • \ [#3] 只要我们继续接收消息,就返回一个不会完成的 MonoVoid>

入站流和出站流可以是独立的,只有在完成时才会被合并,如下例所示:

class ExampleHandler implements WebSocketHandler {

@Override
public Mono VOID> handle(WebSocketSession session) {

MonoVoid> input = session.receive() 1
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();

Flux<String> source = ... ;
MonoVoid> output = session.send(source.map(session::textMessage)); 2

return input.and(output); 3
}
}
  • 处理传入的消息流。

  • 发送传出消息。

  • 将两个流合并后返回一个 MonoVoid>,任一流结束时该 MonoVoid 也会完成。

DataBuffer

DataBuffer 是 WebFlux 中字节缓冲区的表示形式。关于这一点的更多信息,可以在 Spring Core 的参考文档中 “数据缓冲区与编码器”(Data Buffers and Codecs)部分找到。需要理解的关键点是,在像 Netty 这样的某些服务器上,字节缓冲区是经过池化管理的,并且采用了引用计数机制;因此,在使用完毕后必须释放这些缓冲区,以避免内存泄漏。

在Netty上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不会被释放,就必须使用DataBufferUtils.retain(dataBuffer);而当这些缓冲区被消耗完毕后,则需要使用DataBufferUtils.release(dataBuffer)

握手协议

WebSocketHandlerAdapter 会委托给一个 WebSocketService。默认情况下,这个 WebSocketServiceHandshakeWebSocketService 的实例,它会对 WebSocket 请求进行基本的检查,然后使用当前服务器所支持的 RequestUpgradeStrategy。目前,内置了对 Reactor Netty、Tomcat 和 Jetty 的支持。

HandshakeWebSocketService 提供了一个 sessionAttributePredicate 属性,该属性允许设置一个 Predicate<String> 来从 WebSession 中提取属性,并将这些属性插入到 WebSocketSession 的属性中。

服务器配置

每个服务器的RequestUpgradeStrategy都会暴露特定于底层WebSocket服务器引擎的配置。当使用WebFlux Java配置时,可以按照WebFlux Config中相应部分的说明来自定义这些属性;如果不使用WebFlux配置,则可以使用以下方式:

@Configuration
class WebConfig {

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}

@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}

检查您的服务器的升级策略,以了解有哪些可用的选项。目前,只有Tomcat和Jetty提供了这样的选项。

CORS

配置CORS并限制对WebSocket端点的访问的最简单方法是让你的WebSocketHandler实现CorsConfigurationSource,并返回一个包含允许的源、头部信息及其他详细设置的CorsConfiguration对象。如果无法这样做,你也可以在SimpleUrlHandler上设置corsConfigurations属性,通过URL模式来指定CORS设置。如果两者都指定了,那么这些设置会通过使用CorsConfiguration上的combine方法进行合并。

客户端

Spring WebFlux 提供了一个 WebSocketClient 抽象层,其实现涵盖了 Reactor Netty、Tomcat、Jetty 以及标准 Java(即 JSR-356)环境。

备注

Tomcat客户端实际上是标准Java客户端的扩展,在WebSocketSession处理方面增加了一些额外的功能,以便利用Tomcat特定的API来暂停接收消息,从而实现背压控制。

要启动一个WebSocket会话,你可以创建一个客户端实例并使用其execute方法:

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());

一些客户端,例如Jetty,实现了Lifecycle(生命周期)机制,在使用它们之前需要先停止再启动。所有客户端都提供与底层WebSocket客户端配置相关的构造函数选项。