WebSockets 支持
从版本4.1开始,Spring Integration 提供了 WebSocket 支持。它基于 Spring Framework 的 web-socket 模块的架构、基础设施和 API。因此,许多 Spring WebSocket 的组件(例如 SubProtocolHandler 或 WebSocketClient)和配置选项(例如 @EnableWebSocketMessageBroker)可以在 Spring Integration 中复用。更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket 支持章节。
此依赖项为项目所需:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>7.0.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:7.0.2"
对于服务器端,必须显式包含 org.springframework:spring-webmvc 依赖项。
Spring Framework 的 WebSocket 基础设施基于 Spring 消息传递基础构建,并提供了一个基于相同 MessageChannel 实现和 MessageHandler 实现的基本消息传递框架(这些实现与 Spring Integration 使用的相同,还包括一些 POJO 方法注解映射)。因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与到 WebSocket 流程中。为此,你可以使用适当的注解配置一个 Spring Integration @MessagingGateway,如下例所示:
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概述
由于 WebSocket 协议本质上是流式的,并且我们可以在同一时间向 WebSocket 发送和接收消息,因此无论处于客户端还是服务器端,我们都可以处理相应的 WebSocketSession。为了封装连接管理和 WebSocketSession 注册,系统提供了 IntegrationWebSocketContainer,并包含 ClientWebSocketContainer 和 ServerWebSocketContainer 实现。得益于 WebSocket API 及其在 Spring Framework 中的实现(包含许多扩展功能),服务器端和客户端(当然是从 Java 角度)都使用相同的类。因此,大多数连接和 WebSocketSession 注册选项在两端都是相同的。这使得我们能够复用许多配置项和基础设施钩子,在服务器端和客户端构建 WebSocket 应用程序。以下示例展示了组件如何同时服务于这两个目的:
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
IntegrationWebSocketContainer 旨在实现双向消息传递,可在入站和出站通道适配器之间共享(见下文)。当使用单向(发送或接收)WebSocket消息传递时,可仅从其中一个适配器引用它。它也可以在没有通道适配器的情况下使用,但在此情况下,IntegrationWebSocketContainer 仅充当 WebSocketSession 注册表的角色。
ServerWebSocketContainer 实现了 WebSocketConfigurer 接口,用于将一个内部的 IntegrationWebSocketContainer.IntegrationWebSocketHandler 注册为一个 Endpoint。该注册在 ServletWebSocketHandlerRegistry 中,针对目标供应商的 WebSocket 容器,在提供的 paths 以及其他服务器 WebSocket 选项(例如 HandshakeHandler 或 SockJS fallback)下进行。此注册是通过一个基础设施组件 WebSocketIntegrationConfigurationInitializer 实现的,其作用与 @EnableWebSocket 注解相同。这意味着,通过使用 @EnableIntegration(或应用上下文中的任何 Spring Integration 命名空间),你可以省略 @EnableWebSocket 声明,因为 Spring Integration 基础设施会自动检测所有 WebSocket 端点。
从 6.1 版本开始,ClientWebSocketContainer 可以通过提供的 URI 进行配置,而不再需要 uriTemplate 和 uriVariables 的组合。这在需要对 URI 的某些部分进行自定义编码时非常有用。为方便起见,请参阅 UriComponentsBuilder API。
WebSocket 入站通道适配器
WebSocketInboundChannelAdapter 实现了 WebSocketSession 交互的接收部分。您必须为其提供一个 IntegrationWebSocketContainer,该适配器会将自身注册为 WebSocketListener,以处理传入的消息和 WebSocketSession 事件。
在 IntegrationWebSocketContainer 中只能注册一个 WebSocketListener。
对于WebSocket子协议,WebSocketInboundChannelAdapter 可以通过将 SubProtocolHandlerRegistry 作为第二个构造函数参数进行配置。该适配器会委托 SubProtocolHandlerRegistry 来确定已接受的 WebSocketSession 的适当 SubProtocolHandler,并根据子协议实现将 WebSocketMessage 转换为 Message。
默认情况下,WebSocketInboundChannelAdapter 仅依赖于原始的 PassThruSubProtocolHandler 实现,该实现将 WebSocketMessage 转换为 Message。
WebSocketInboundChannelAdapter 仅接受 SimpMessageType.MESSAGE 类型或 simpMessageType 标头为空的 Message 实例,并将其发送到底层集成流。所有其他 Message 类型均通过 SubProtocolHandler 实现(例如 StompSubProtocolHandler)发出的 ApplicationEvent 实例进行处理。
在服务器端,如果存在 @EnableWebSocketMessageBroker 配置,您可以使用 useBroker = true 选项来配置 WebSocketInboundChannelAdapter。在这种情况下,所有 非 MESSAGE 类型的 Message 都会被委托给提供的 AbstractBrokerMessageHandler 处理。此外,如果代理中继配置了目标前缀,那么匹配代理目标的消息将被路由到 AbstractBrokerMessageHandler,而不是 WebSocketInboundChannelAdapter 的 outputChannel。
如果 useBroker = false 且接收到的消息类型为 SimpMessageType.CONNECT,WebSocketInboundChannelAdapter 会立即向 WebSocketSession 发送一条 SimpMessageType.CONNECT_ACK 消息,而不会将其发送到通道。
Spring 的 WebSocket 支持仅允许配置一个代理中继。因此,我们不需要 AbstractBrokerMessageHandler 引用。它会在应用上下文中被自动检测到。
更多配置选项,请参阅 WebSockets 命名空间支持。
WebSocket 出站通道适配器
WebSocketOutboundChannelAdapter:
-
从
MessageChannel接收 Spring Integration 消息 -
从
MessageHeaders中确定WebSocketSession的id -
从提供的
IntegrationWebSocketContainer中检索WebSocketSession -
将
WebSocketMessage的转换和发送工作委托给提供的SubProtocolHandlerRegistry中合适的SubProtocolHandler。
在客户端,WebSocketSession 的 id 消息头不是必需的,因为 ClientWebSocketContainer 仅处理单个连接及其对应的 WebSocketSession。
要使用STOMP子协议,您需要使用StompSubProtocolHandler来配置此适配器。然后,您可以使用StompHeaderAccessor.create(StompCommand…)和MessageBuilder,或者直接使用HeaderEnricher(参见Header Enricher)向此适配器发送任何STOMP消息类型。
本章的其余部分主要涵盖额外的配置选项。
WebSockets 命名空间支持
Spring Integration WebSocket 命名空间包含本章后续部分描述的多个组件。若要在配置中使用该命名空间,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container> 属性
以下列表展示了 <int-websocket:client-container> 元素可用的属性:
<int-websocket:client-container
id="" // <1>
client="" // <2>
uri="" // <3>
uri-variables="" // <4>
origin="" // <5>
send-time-limit="" // <6>
send-buffer-size-limit="" // <7>
send-buffer-overflow-strategy="" // <8>
auto-startup="" // <9>
phase=""> // <10>
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> // <11>
</int-websocket:client-container>
组件 Bean 名称。
WebSocketClientBean 引用。目标 WebSocket 服务的
uri或uriTemplate。如果将其用作带有 URI 变量占位符的uriTemplate,则uri-variables属性是必需的。uri属性值中 URI 变量占位符的逗号分隔值。这些值根据它们在uri中的顺序替换到占位符中。请参阅 UriComponents.expand(Object…uriVariableValues)。Origin握手 HTTP 标头值。WebSocket 会话
send超时限制。默认为10000。WebSocket 会话
send消息大小限制。默认为524288。WebSocket 会话发送缓冲区溢出策略,用于确定当会话的出站消息缓冲区达到
send-buffer-size-limit时的行为。有关可能的值和更多详细信息,请参阅ConcurrentWebSocketSessionDecorator.OverflowStrategy。布尔值,指示此端点是否应自动启动。默认为
false,假设此容器是从 WebSocket 入站适配器 启动的。此端点应启动和停止的生命周期阶段。值越低,此端点启动越早,停止越晚。默认值为
Integer.MAX_VALUE。值可以为负数。请参阅 SmartLifeCycle。用于握手请求的
HttpHeaders的Map。
<int-websocket:server-container> 属性 {#int-websocket:server-container-attributes}
以下列表展示了 <int-websocket:server-container> 元素可用的属性:
<int-websocket:server-container
id="" // <1>
path="" // <2>
handshake-handler="" // <3>
handshake-interceptors="" // <4>
decorator-factories="" // <5>
send-time-limit="" // <6>
send-buffer-size-limit="" // <7>
send-buffer-overflow-strategy="" // <8>
allowed-origins=""> // <9>
<int-websocket:sockjs
client-library-url="" // <10>
stream-bytes-limit="" // <11>
session-cookie-needed="" // <12>
heartbeat-time="" // <13>
disconnect-delay="" // <14>
message-cache-size="" // <15>
websocket-enabled="" // <16>
scheduler="" // <17>
message-codec="" // <18>
transport-handlers="" // <19>
suppress-cors="true" /> // <20>
</int-websocket:server-container>
组件 Bean 名称。
将特定请求映射到
WebSocketHandler的路径(或逗号分隔的路径列表)。支持精确路径映射 URI(例如/myPath)和 Ant 风格路径模式(例如/myPath/**)。HandshakeHandlerBean 引用。默认为DefaultHandshakeHandler。HandshakeInterceptorBean 引用列表。用于装饰处理 WebSocket 消息的处理程序的一个或多个工厂(
WebSocketHandlerDecoratorFactory)列表。这对于某些高级用例可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。更多信息请参阅 Spring Session 项目。请参阅 <int-websocket:client-container> 上的相同选项。
请参阅 <int-websocket:client-container> 上的相同选项。
WebSocket 会话发送缓冲区溢出策略决定了当会话的出站消息缓冲区达到
send-buffer-size-limit时的行为。可能的取值和更多细节请参阅ConcurrentWebSocketSessionDecorator.OverflowStrategy。允许的 Origin 头部值。您可以指定多个来源,以逗号分隔的列表形式。此检查主要针对浏览器客户端设计。无法阻止其他类型的客户端修改 Origin 头部值。当启用 SockJS 并限制允许的来源时,不使用 Origin 头部进行跨域请求的传输类型(
jsonp-polling、iframe-xhr-polling、iframe-eventsource和iframe-htmlfile)将被禁用。因此,不支持 IE6 和 IE7,IE8 和 IE9 仅在无 Cookie 模式下支持。默认情况下,允许所有来源。没有原生跨域通信的传输方式(例如
eventsource和htmlfile)必须从一个“外部”域在一个不可见的 iframe 中获取一个简单页面,以便 iframe 中的代码可以在 SockJS 服务器的本地域中运行。由于 iframe 需要加载 SockJS JavaScript 客户端库,此属性允许您指定加载它的位置。默认情况下,它指向[d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js](https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js)。但是,您也可以将其设置为指向应用程序提供的 URL。请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。例如,假设一个映射到/sockjs的 SockJS 端点,生成的 iframe URL 是/sockjs/iframe.html,相对 URL 必须以../../开头,以向上遍历到 SockJS 映射之上的位置。对于基于前缀的 Servlet 映射,可能需要再多一次遍历。单个 HTTP 流式请求在关闭前可以发送的最小字节数。默认为
128K(即 128*1024 或 131072 字节)。SockJs
/info端点响应中的cookie_needed值。此属性指示应用程序是否需要JSESSIONIDCookie 才能正常运行(例如,用于负载均衡或在 Java Servlet 容器中使用 HTTP 会话)。服务器未发送任何消息的时间量(以毫秒为单位),超过此时间后,服务器应向客户端发送一个心跳帧以防止连接中断。默认值为
25,000(25 秒)。客户端在未建立接收连接(即服务器可以向客户端发送数据的活动连接)后被视为断开连接之前的时间量(以毫秒为单位)。默认值为
5000。会话在等待客户端的下一个 HTTP 轮询请求时可以缓存的服务器到客户端消息的数量。默认大小为
100。某些负载均衡器不支持 WebSocket。将此选项设置为
false以在服务器端禁用 WebSocket 传输。默认值为true。TaskSchedulerBean 引用。如果未提供值,则会创建一个新的ThreadPoolTaskScheduler实例。此调度程序实例用于调度心跳消息。用于编码和解码 SockJS 消息的
SockJsMessageCodecBean 引用。默认使用Jackson2SockJsMessageCodec,这要求类路径上存在 Jackson 库。TransportHandlerBean 引用列表。是否禁用为 SockJS 请求自动添加 CORS 头部。默认值为
false。
<int-websocket:outbound-channel-adapter> 属性
以下列表展示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:
<int-websocket:outbound-channel-adapter
id="" // <1>
channel="" // <2>
container="" // <3>
default-protocol-handler="" // <4>
protocol-handlers="" // <5>
message-converters="" // <6>
merge-with-default-converters="" // <7>
auto-startup="" // <8>
phase=""/> // <9>
组件 Bean 名称。若未提供
channel属性,将创建DirectChannel并在应用上下文中以该id属性作为 Bean 名称进行注册。此时,端点将以id加上.adapter作为 Bean 名称注册,而MessageHandler将以id加上.handler作为 Bean 别名注册。标识与此适配器关联的通道。
对
IntegrationWebSocketContainerBean 的引用,该 Bean 封装了底层连接和WebSocketSession处理操作。必需。对
SubProtocolHandler实例的可选引用。当客户端未请求子协议或为单一协议处理器时使用。若未提供此引用或protocol-handlers列表,默认使用PassThruSubProtocolHandler。此通道适配器的
SubProtocolHandlerBean 引用列表。若仅提供单个 Bean 引用且未提供default-protocol-handler,则该单个SubProtocolHandler将用作default-protocol-handler。若未设置此属性或default-protocol-handler,默认使用PassThruSubProtocolHandler。此通道适配器的
MessageConverterBean 引用列表。布尔值,指示是否应在任何自定义转换器之后注册默认转换器。仅当提供
message-converters时使用此标志。否则将注册所有默认转换器。默认为false。默认转换器按顺序为:StringMessageConverter、ByteArrayMessageConverter以及MappingJackson2MessageConverter(若类路径中存在 Jackson 库)。布尔值,指示此端点是否应自动启动。默认为
true。此端点应启动和停止的生命周期阶段。数值越低,端点启动越早、停止越晚。默认为
Integer.MIN_VALUE。允许负值。参见 SmartLifeCycle。
<int-websocket:inbound-channel-adapter> 属性 {#int-websocket:inbound-channel-adapter-attributes}
以下列表展示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:
<int-websocket:inbound-channel-adapter
id="" // <1>
channel="" // <2>
error-channel="" // <3>
container="" // <4>
default-protocol-handler="" // <5>
protocol-handlers="" // <6>
message-converters="" // <7>
merge-with-default-converters="" // <8>
send-timeout="" // <9>
payload-type="" // <10>
use-broker="" // <11>
auto-startup="" // <12>
phase=""/> // <13>
组件 Bean 名称。如果未设置
channel属性,将创建一个DirectChannel并在应用程序上下文中注册,其 Bean 名称为此id属性值。此时,端点将以id属性值加上.adapter作为 Bean 名称进行注册。标识与此适配器关联的通道。
应向其发送
ErrorMessage实例的MessageChannelBean 引用。请参阅 <int-websocket:outbound-channel-adapter> 中的相同选项。
请参阅 <int-websocket:outbound-channel-adapter> 中的相同选项。
请参阅 <int-websocket:outbound-channel-adapter> 中的相同选项。
请参阅 <int-websocket:outbound-channel-adapter> 中的相同选项。
请参阅 <int-websocket:outbound-channel-adapter> 中的相同选项。
向通道发送消息时,如果通道可能阻塞,则等待的最长时间(以毫秒为单位)。例如,如果
QueueChannel已达到其最大容量,则可能会阻塞直到有可用空间。要从传入的
WebSocketMessage转换的目标payload的 Java 类型的完全限定名。默认为java.lang.String。指示此适配器是否将
non-MESSAGE类型的WebSocketMessage实例以及带有代理目的地的消息发送到应用程序上下文中的AbstractBrokerMessageHandler。当此属性为true时,需要配置Broker Relay。此属性仅在服务器端使用,在客户端会被忽略。默认为false。请参阅 <int-websocket:outbound-channel-adapter> 中的相同选项。
请参阅 <int-websocket:outbound-channel-adapter> 中的相同选项。
使用 ClientStompEncoder
自 4.3.13 版本起,Spring Integration 提供了 ClientStompEncoder(作为标准 StompEncoder 的扩展),用于 WebSocket 通道适配器的客户端。为了正确准备客户端消息,您必须将 ClientStompEncoder 的实例注入到 StompSubProtocolHandler 中。默认 StompSubProtocolHandler 的一个问题是它专为服务器端设计,因此会将 SEND stompCommand 标头更新为 MESSAGE(这是 STOMP 协议对服务器端的要求)。如果客户端未在正确的 SEND WebSocket 帧中发送其消息,某些 STOMP 代理将不会接受这些消息。在这种情况下,ClientStompEncoder 的作用是在将消息编码为 byte[] 之前,覆盖 stompCommand 标头并将其设置为 SEND 值。
动态 WebSocket 端点注册
从 5.5 版本开始,WebSocket 服务器端点(基于 ServerWebSocketContainer 的通道适配器)现在可以在运行时注册(和移除)——ServerWebSocketContainer 映射的 paths 通过 HandlerMapping 暴露给 DispatcherServlet,并可供 WebSocket 客户端访问。动态和运行时集成流 支持有助于以透明的方式注册这些端点:
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
重要的是,在动态流注册中调用 .addBean(serverWebSocketContainer),将 ServerWebSocketContainer 的实例添加到 ApplicationContext 中,以便进行端点注册。当动态流注册被销毁时,关联的 ServerWebSocketContainer 实例也会被销毁,同时相应的端点注册(包括 URL 路径映射)也会被移除。
动态 Websocket 端点只能通过 Spring Integration 机制注册:当使用常规的 Spring @EnableWebsocket 时,Spring Integration 配置会退出,并且不会注册用于动态端点的基础设施。