STOMP客户端
Spring提供了基于WebSocket的STOMP客户端和基于TCP的STOMP客户端。
首先,你可以创建并配置WebSocketStompClient,如下例所示:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前面的例子中,你可以用 SockJsClient 替换 StandardWebSocketClient,因为 SockJsClient 也是 WebSocketClient 的一种实现。SockJsClient 可以在无法使用 WebSocket 时,回退到基于 HTTP 的传输方式。更多详情,请参见 SockJsClient。
接下来,你可以建立连接并为STOMP会话提供一个处理程序,如下例所示:
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
当会话可以使用时,会通知处理器,如下例所示:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦会话建立,就可以发送任何有效载荷(payload),并且该有效载荷会使用配置好的MessageConverter进行序列化处理,如下例所示:
session.send("/topic/something", "payload");
您也可以订阅目的地。subscribe 方法需要一个用于处理订阅消息的处理器,并返回一个 Subscription 接口,您可以使用该接口来取消订阅。对于每条接收到的消息,处理器可以指定负载应该被反序列化的目标 Object 类型,如下例所示:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
要启用STOMP心跳机制,你可以使用TaskScheduler来配置WebSocketStompClient,并可选地自定义心跳间隔(写入操作不活跃时为10秒,此时会发送一次心跳信号;读取操作不活跃时也为10秒,此时会关闭连接)。
WebSocketStompClient 仅在无活动(即没有发送其他消息)的情况下才发送心跳信号。当使用外部代理时,这可能会带来挑战,因为那些目标地址非代理的消息虽然表示有通信活动,但实际上并不会被转发给代理。在这种情况下,你可以在初始化 外部代理 时配置一个 TaskScheduler,以确保即使只发送了目标地址非代理的消息,也能将心跳信号转发给代理。
当您使用 WebSocketStompClient 进行性能测试,以模拟同一台机器上的数千个客户端时,可以考虑关闭心跳机制。因为每个连接都会自行调度心跳任务,而这种机制并不适合在同一台机器上运行大量客户端的情况。
STOMP协议还支持接收确认(receipts)功能,客户端必须在请求发送或订阅操作完成后添加一个receipt头部信息,服务器会在处理完成后通过RECEIPT帧对此作出响应。为了支持这一功能,StompSession提供了setAutoReceipt(boolean)方法,该方法会使得在后续的每次发送或订阅操作中都会自动添加receipt头部信息。此外,您也可以手动将receipt头部信息添加到StompHeaders中。无论是发送请求还是订阅操作,都会返回一个Receiptable实例,您可以使用该实例来注册接收成功或失败的回调函数。要使用此功能,您需要为客户端配置一个TaskScheduler,并指定接收确认的有效时间(默认为15秒)。
请注意,StompSessionHandler 本身是一个 StompFrameHandler,这使得它除了能够处理来自消息处理的异常的 handleException 回调外,还能够处理 ERROR 框架,以及包括 ConnectionLostException 在内的传输层错误的 handleTransportError 回调。
你可以使用 WebSocketStompClient 的 inboundMessageSizeLimit 和 outboundMessageSizeLimit 属性来限制 WebSocket 入站和出站消息的最大大小。当出站的 STOMP 消息超过限制时,它会被分割成多个部分(frame),接收方需要重新组装这些部分。默认情况下,出站消息没有大小限制。当入站的 STOMP 消息大小超过配置的限制时,会抛出一个 StompConversionException 异常。入站消息的默认大小限制是 64KB。
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB