跳到主要内容

WebSockets 支持

QWen Plus 中英对照 WebSockets Support

从 4.1 版开始,Spring Integration 支持 WebSocket。它是基于 Spring Framework 的 web-socket 模块的架构、基础设施和 API。因此,Spring WebSocket 的许多组件(例如 SubProtocolHandlerWebSocketClient)和配置选项(例如 @EnableWebSocketMessageBroker)可以在 Spring Integration 中重用。有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket 支持 章节。

你需要将这个依赖添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>6.4.2</version>
</dependency>
xml

对于服务器端,必须显式包含 org.springframework:spring-webmvc 依赖。

Spring 框架的 WebSocket 基础设施基于 Spring 消息传递基础,并提供了一个基本的消息传递框架,该框架基于与 Spring Integration 使用的相同的 MessageChannel 实现和 MessageHandler 实现(以及一些 POJO 方法注解映射)。因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与到 WebSocket 流中。为此,您可以使用适当的注解配置一个 Spring Integration 的 @MessagingGateway,如下例所示:

@MessagingGateway
@Controller
public interface WebSocketGateway {

@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);

}
java

概述

由于 WebSocket 协议本质上是流式的,我们可以同时向 WebSocket 发送和接收消息,因此我们可以处理适当的 WebSocketSession,无论是在客户端还是服务器端。为了封装连接管理和 WebSocketSession 注册表,提供了带有 ClientWebSocketContainerServerWebSocketContainer 实现的 IntegrationWebSocketContainer。得益于 WebSocket API 及其在 Spring 框架中的实现(带有许多扩展),服务器端和客户端使用相同的类(当然,从 Java 的角度来看)。因此,大多数连接和 WebSocketSession 注册表选项在两端都是相同的。这使我们能够重用许多配置项和基础架构钩子,在服务器端和客户端构建 WebSocket 应用程序。以下示例显示了组件如何同时服务于这两个目的:

//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
java

IntegrationWebSocketContainer 是为了实现双向消息传递而设计的,可以在入站和出站通道适配器之间共享(见下文),但在使用单向(发送或接收)WebSocket 消息传递时,只能从其中一个引用。它可以在没有任何通道适配器的情况下使用,但在此情况下,IntegrationWebSocketContainer 仅充当 WebSocketSession 注册表的作用。

备注

ServerWebSocketContainer 实现了 WebSocketConfigurer,以将内部的 IntegrationWebSocketContainer.IntegrationWebSocketHandler 作为 Endpoint 注册。它在提供的 paths 和其他服务器 WebSocket 选项(如 HandshakeHandlerSockJS 回退)下,在目标供应商 WebSocket 容器的 ServletWebSocketHandlerRegistry 中进行注册。此注册是通过一个基础结构组件 WebSocketIntegrationConfigurationInitializer 来完成的,其功能与 @EnableWebSocket 注解相同。这意味着,通过使用 @EnableIntegration(或应用程序上下文中的任何 Spring Integration 命名空间),你可以省略 @EnableWebSocket 声明,因为 Spring Integration 基础设施会检测所有 WebSocket 端点。

从 6.1 版本开始,ClientWebSocketContainer 可以使用提供的 URI 进行配置,而不需要使用 uriTemplateuriVariables 组合。这在需要对 URI 的某些部分进行自定义编码时非常有用。可以参见 UriComponentsBuilder API 以获得便利。

WebSocket 入站通道适配器

WebSocketInboundChannelAdapter 实现了 WebSocketSession 交互的接收部分。你必须为其提供一个 IntegrationWebSocketContainer,并且适配器会将自己注册为 WebSocketListener 以处理传入的消息和 WebSocketSession 事件。

备注

IntegrationWebSocketContainer 中只能注册一个 WebSocketListener

对于 WebSocket 子协议,WebSocketInboundChannelAdapter 可以使用 SubProtocolHandlerRegistry 作为第二个构造函数参数进行配置。适配器委托给 SubProtocolHandlerRegistry 来确定接受的 WebSocketSession 的适当 SubProtocolHandler,并根据子协议实现将 WebSocketMessage 转换为 Message

备注

默认情况下,WebSocketInboundChannelAdapter 仅依赖于原始的 PassThruSubProtocolHandler 实现,该实现将 WebSocketMessage 转换为 Message

WebSocketInboundChannelAdapter 仅接受并向底层集成流发送具有 SimpMessageType.MESSAGE 或空 simpMessageType 标头的 Message 实例。所有其他类型的 Message 是通过从 SubProtocolHandler 实现(例如 StompSubProtocolHandler)发出的 ApplicationEvent 实例来处理的。

在服务器端,如果存在 @EnableWebSocketMessageBroker 配置,你可以使用 useBroker = true 选项配置 WebSocketInboundChannelAdapter。在这种情况下,所有 非 MESSAGE 类型的 Message 都会委托给提供的 AbstractBrokerMessageHandler。此外,如果代理中继配置了目标前缀,则匹配代理目的地的消息将路由到 AbstractBrokerMessageHandler,而不是路由到 WebSocketInboundChannelAdapteroutputChannel

如果 useBroker = false 且接收到的消息是 SimpMessageType.CONNECT 类型,WebSocketInboundChannelAdapter 会立即向 WebSocketSession 发送一个 SimpMessageType.CONNECT_ACK 消息,而不会将其发送到通道。

备注

Spring 的 WebSocket 支持只允许配置一个代理中继。因此,我们不需要 AbstractBrokerMessageHandler 引用。它在应用程序上下文中被检测到。

对于更多配置选项,请参阅 WebSockets 命名空间支持

WebSocket 外发通道适配器

WebSocketOutboundChannelAdapter

  1. 从其 MessageChannel 接受 Spring Integration 消息

  2. MessageHeaders 确定 WebSocketSessionid

  3. 从提供的 IntegrationWebSocketContainer 中检索 WebSocketSession

  4. WebSocketMessage 的转换和发送工作委托给来自提供的 SubProtocolHandlerRegistry 的适当 SubProtocolHandler

在客户端,WebSocketSessionid 消息头不是必需的,因为 ClientWebSocketContainer 仅处理单个连接及其相应的 WebSocketSession

要使用 STOMP 子协议,您应该使用 StompSubProtocolHandler 配置此适配器。然后,您可以发送任何 STOMP 消息类型到此适配器,使用 StompHeaderAccessor.create(StompCommand…​) 和一个 MessageBuilder,或者仅仅使用一个 HeaderEnricher(参见Header Enricher)。

本章的其余部分主要涵盖了额外的配置选项。

WebSockets 命名空间支持

Spring Integration WebSocket 命名空间包括几个组件,在本章的其余部分中进行了描述。要将其包含在配置中,请在应用程序上下文配置文件中使用以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
xml

<int-websocket:client-container> 属性

以下列表显示了 <int-websocket:client-container> 元素可用的属性:

<int-websocket:client-container
id="" // <1>
client="" // <2>
uri="" // <3>
uri-variables="" // <4>
origin="" // <5>
send-time-limit="" // <6>
send-buffer-size-limit="" // <7>
send-buffer-overflow-strategy="" // <8>
auto-startup="" // <9>
phase=""> // <10>
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> // <11>
</int-websocket:client-container>
xml
  • 组件的bean名称。

  • WebSocketClient bean 引用。

  • 目标 WebSocket 服务的 uriuriTemplate。如果你将其作为带有 URI 变量占位符的 uriTemplate 使用,则需要 uri-variables 属性。

  • uri 属性值内的 URI 变量占位符的逗号分隔值。根据它们在 uri 中的顺序,这些值会替换到占位符中。请参阅 UriComponents.expand(Object…​uriVariableValues)

  • Origin 握手 HTTP 头的值。

  • WebSocket 会话 'send' 超时限制。默认为 10000

  • WebSocket 会话 'send' 消息大小限制。默认为 524288

  • WebSocket 会话发送缓冲区溢出策略决定了当会话的传出消息缓冲区达到 send-buffer-size-limit 时的行为。有关可能的值和更多详细信息,请参阅 ConcurrentWebSocketSessionDecorator.OverflowStrategy

  • 布尔值,指示此端点是否应自动启动。默认为 false,假定此容器是从 WebSocket 入站适配器 启动的。

  • 此端点应在其中启动和停止的生命周期阶段。值越小,此端点启动得越早,停止得越晚。默认值为 Integer.MAX_VALUE。值可以是负数。请参阅 SmartLifeCycle

  • 一个包含要在握手请求中使用的 HttpHeadersMap

<int-websocket:server-container> 属性 {#int-websocket:server-container-attributes}

以下列表显示了 <int-websocket:server-container> 元素可用的属性:

<int-websocket:server-container
id="" // <1>
path="" // <2>
handshake-handler="" // <3>
handshake-interceptors="" // <4>
decorator-factories="" // <5>
send-time-limit="" // <6>
send-buffer-size-limit="" // <7>
send-buffer-overflow-strategy="" // <8>
allowed-origins=""> // <9>
<int-websocket:sockjs
client-library-url="" // <10>
stream-bytes-limit="" // <11>
session-cookie-needed="" // <12>
heartbeat-time="" // <13>
disconnect-delay="" // <14>
message-cache-size="" // <15>
websocket-enabled="" // <16>
scheduler="" // <17>
message-codec="" // <18>
transport-handlers="" // <19>
suppress-cors="true" /> // <20>
</int-websocket:server-container>
xml
  • 组件 bean 名称。

  • 一个路径(或逗号分隔的路径),用于将特定请求映射到 WebSocketHandler。支持精确路径映射 URI(如 /myPath)和 ant 风格的路径模式(如 /myPath/**)。

  • HandshakeHandler bean 引用。默认为 DefaultHandshakeHandler

  • HandshakeInterceptor bean 引用列表。

  • 一个或多个工厂 (WebSocketHandlerDecoratorFactory) 的列表,这些工厂用于装饰处理 WebSocket 消息的处理器。这在某些高级用例中可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。更多信息请参阅 Spring Session 项目

  • 请参阅 <int-websocket:client-container> 上的相同选项。

  • 请参阅 <int-websocket:client-container> 上的相同选项。

  • WebSocket 会话发送缓冲区溢出策略决定了当会话的传出消息缓冲区达到 send-buffer-size-limit 时的行为。有关可能的值和更多详细信息,请参阅 ConcurrentWebSocketSessionDecorator.OverflowStrategy

  • 允许的来源头值。您可以指定多个来源作为逗号分隔的列表。此检查主要针对浏览器客户端设计。其他类型的客户端可以修改来源头值,并无任何限制。当启用了 SockJS 并且限制了允许的来源时,不使用来源头进行跨域请求的传输类型(如 jsonp-pollingiframe-xhr-pollingiframe-eventsourceiframe-htmlfile)将被禁用。因此,IE6 和 IE7 不受支持,而 IE8 和 IE9 只有在没有 cookie 的情况下才受支持。默认情况下,所有来源都是允许的。

  • 没有原生跨域通信的传输(如 eventsourcehtmlfile)必须从“外部”域获取一个简单的页面,在不可见的 iframe 中运行代码,以使 iframe 中的代码可以从本地于 SockJS 服务器的域运行。由于 iframe 需要加载 SockJS JavaScript 客户端库,此属性允许您指定加载它的位置。默认情况下,它指向 [d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js](https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js)。但是,您也可以将其设置为应用程序提供的 URL。请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。例如,假设 SockJS 端点映射到 /sockjs,生成的 iframe URL 是 /sockjs/iframe.html,相对 URL 必须以 "../../" 开头,以遍历到 SockJS 映射上方的位置。对于基于前缀的 servlet 映射,您可能需要再进行一次遍历。

  • 在单个 HTTP 流式请求关闭之前可以发送的最小字节数。默认为 128K(即 128 * 1024 或 131072 字节)。

  • 来自 SockJs /info 端点响应中的 cookie_needed 值。此属性指示是否需要 JSESSIONID cookie 以确保应用程序正常工作(例如,用于负载均衡或在 Java Servlet 容器中使用 HTTP 会话)。

  • 服务器未发送任何消息的时间(以毫秒为单位),在此之后服务器应向客户端发送心跳帧以保持连接不断开。默认值为 25,000(25 秒)。

  • 客户端被认为断开连接之前的时间(以毫秒为单位),即在没有接收连接的情况下(即服务器无法通过活动连接向客户端发送数据)。默认值为 5000

  • 会话在等待下一个来自客户端的 HTTP 轮询请求时可以缓存的服务器到客户端消息的数量。默认大小为 100

  • 一些负载均衡器不支持 WebSockets。将此选项设置为 false 以禁用服务器端的 WebSocket 传输。默认值为 true

  • TaskScheduler bean 引用。如果没有提供值,则创建一个新的 ThreadPoolTaskScheduler 实例。此调度程序实例用于安排心跳消息。

  • 用于编码和解码 SockJS 消息的 SockJsMessageCodec bean 引用。默认使用 Jackson2SockJsMessageCodec,这要求类路径上存在 Jackson 库。

  • TransportHandler bean 引用列表。

  • 是否禁用自动添加 CORS 头以处理 SockJS 请求。默认值为 false

<int-websocket:outbound-channel-adapter> 属性

以下列表显示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:

<int-websocket:outbound-channel-adapter
id="" // <1>
channel="" // <2>
container="" // <3>
default-protocol-handler="" // <4>
protocol-handlers="" // <5>
message-converters="" // <6>
merge-with-default-converters="" // <7>
auto-startup="" // <8>
phase=""/> // <9>
xml
  • 组件的bean名称。如果你不提供 channel 属性,则会创建一个 DirectChannel 并在应用程序上下文中以这个 id 属性作为bean名称进行注册。在这种情况下,端点将以bean名称 id 加上 .adapter 进行注册。而 MessageHandler 将以bean别名 id 加上 .handler 进行注册。

  • 标识与此适配器关联的通道。

  • IntegrationWebSocketContainer bean 的引用,它封装了低级别的连接和 WebSocketSession 处理操作。这是必需的。

  • SubProtocolHandler 实例的可选引用。当客户端未请求子协议或它是单个协议处理程序时使用。如果未提供此引用或 protocol-handlers 列表,则默认使用 PassThruSubProtocolHandler

  • 为该通道适配器列出的 SubProtocolHandler bean 引用。如果你只提供了一个bean引用且没有提供 default-protocol-handler,那么这个单一的 SubProtocolHandler 将作为 default-protocol-handler 使用。如果你不设置此属性或 default-protocol-handler,则默认使用 PassThruSubProtocolHandler

  • 为该通道适配器列出的 MessageConverter bean 引用。

  • 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。仅当提供了 message-converters 时才使用此标志。否则,所有默认转换器都将被注册。默认值为 false。默认转换器(按顺序)是:StringMessageConverterByteArrayMessageConverterMappingJackson2MessageConverter(如果类路径上有 Jackson 库)。

  • 布尔值,指示此端点是否应自动启动。默认值为 true

  • 此端点应在其中启动和停止的生命周期阶段。值越小,此端点启动得越早,停止得越晚。默认值为 Integer.MIN_VALUE。值可以是负数。请参阅 SmartLifeCycle

<int-websocket:inbound-channel-adapter> 属性 {#int-websocket:inbound-channel-adapter-attributes}

以下列表显示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:

<int-websocket:inbound-channel-adapter
id="" // <1>
channel="" // <2>
error-channel="" // <3>
container="" // <4>
default-protocol-handler="" // <5>
protocol-handlers="" // <6>
message-converters="" // <7>
merge-with-default-converters="" // <8>
send-timeout="" // <9>
payload-type="" // <10>
use-broker="" // <11>
auto-startup="" // <12>
phase=""/> // <13>
xml
  • 组件的 bean 名称。如果你不设置 channel 属性,将创建一个 DirectChannel 并以该 id 属性作为 bean 名称注册到应用程序上下文中。在这种情况下,端点将以 bean 名称 id 加上 .adapter 注册。

  • 标识与此适配器关联的通道。

  • 应该发送 ErrorMessage 实例的 MessageChannel bean 引用。

  • 请参阅 <int-websocket:outbound-channel-adapter> 上相同的选项。

  • 请参阅 <int-websocket:outbound-channel-adapter> 上相同的选项。

  • 请参阅 <int-websocket:outbound-channel-adapter> 上相同的选项。

  • 请参阅 <int-websocket:outbound-channel-adapter> 上相同的选项。

  • 请参阅 <int-websocket:outbound-channel-adapter> 上相同的选项。

  • 在向通道发送消息时的最大等待时间(以毫秒为单位),如果通道可以阻塞的话。例如,当 QueueChannel 达到最大容量时,会一直阻塞直到有可用空间。

  • 目标 payload 的 Java 类型的完全限定名称,用于从传入的 WebSocketMessage 进行转换。默认值为 java.lang.String

  • 指示此适配器是否将 non-MESSAGE WebSocketMessage 实例和带有代理目标的消息发送到应用程序上下文中的 AbstractBrokerMessageHandler。当此属性为 true 时,需要 Broker Relay 配置。此属性仅在服务器端使用,在客户端被忽略。默认值为 false

  • 请参阅 <int-websocket:outbound-channel-adapter> 上相同的选项。

  • 请参阅 <int-websocket:outbound-channel-adapter> 上相同的选项。

使用 ClientStompEncoder

从 4.3.13 版本开始,Spring Integration 提供了 ClientStompEncoder(作为标准 StompEncoder 的扩展),用于 WebSocket 通道适配器的客户端。为了正确准备客户端消息,您必须将 ClientStompEncoder 的实例注入到 StompSubProtocolHandler 中。默认 StompSubProtocolHandler 的一个问题是它被设计用于服务器端,因此它会将 SEND stompCommand 头更新为 MESSAGE(这是 STOMP 协议对服务器端的要求)。如果客户端不以正确的 SEND WebSocket 帧发送消息,一些 STOMP 消息代理将不会接受它们。在这种情况下,ClientStompEncoder 的作用是覆盖 stompCommand 头,并在将消息编码为 byte[] 之前将其设置为 SEND 值。

动态 WebSocket 端点注册

从 5.5 版本开始,WebSocket 服务器端点(基于 ServerWebSocketContainer 的通道适配器)现在可以在运行时注册(和移除)——ServerWebSocketContainer 映射的 paths 通过 HandlerMapping 暴露到 DispatcherServlet 中,并对 WebSocket 客户端可访问。动态和运行时集成流支持以透明的方式注册这些端点:

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
java
备注

在动态流注册中调用 .addBean(serverWebSocketContainer) 来将 ServerWebSocketContainer 的实例添加到 ApplicationContext 中进行端点注册非常重要。当销毁动态流注册时,相关的 ServerWebSocketContainer 实例也会被销毁,以及相应的端点注册,包括 URL 路径映射。

important

动态 Websocket 端点只能通过 Spring Integration 机制注册:当使用常规的 Spring @EnableWebsocket 时,Spring Integration 配置会退让,并且不会注册动态端点的任何基础设施。