STOMP 客户端
Spring 提供了一个基于 WebSocket 的 STOMP 客户端和一个基于 TCP 的 STOMP 客户端。
首先,您可以创建并配置 WebSocketStompClient
,如下例所示:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前面的示例中,您可以将 StandardWebSocketClient
替换为 SockJsClient
,因为它也是 WebSocketClient
的一种实现。SockJsClient
可以使用 WebSocket 或基于 HTTP 的传输作为后备。有关更多详细信息,请参见 SockJsClient。
接下来,您可以建立连接并为 STOMP 会话提供处理程序,如下例所示:
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
当会话准备好使用时,处理程序会收到通知,如下例所示:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦会话建立,任何有效负载都可以发送,并使用配置的 MessageConverter
进行序列化,如下例所示:
session.send("/topic/something", "payload");
您还可以订阅目标。subscribe
方法需要一个处理程序来处理订阅上的消息,并返回一个 Subscription
句柄,您可以使用它来取消订阅。对于每个接收到的消息,处理程序可以指定目标 Object
类型,以便将有效负载反序列化,如以下示例所示:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
要启用 STOMP 心跳,您可以使用 TaskScheduler
配置 WebSocketStompClient
,并可选地自定义心跳间隔(写入不活动 10 秒,导致发送心跳,读取不活动 10 秒,关闭连接)。
WebSocketStompClient
仅在不活动时发送心跳,即当没有其他消息被发送时。这在使用外部代理时可能会带来挑战,因为目标不是代理的消息代表了活动,但实际上并没有被转发到代理。在这种情况下,您可以在初始化 External Broker 时配置一个 TaskScheduler
,以确保即使仅发送目标不是代理的消息时,心跳也会被转发到代理。
当您使用 WebSocketStompClient
进行性能测试以模拟来自同一台机器的数千个客户端时,请考虑关闭心跳,因为每个连接都会调度其自己的心跳任务,这对于在同一台机器上运行大量客户端并不优化。
STOMP 协议还支持收据,客户端必须添加一个 receipt
头,服务器在处理完发送或订阅后会以 RECEIPT 帧进行响应。为了支持这一点,StompSession
提供了 setAutoReceipt(boolean)
方法,该方法会在每个后续的发送或订阅事件中自动添加 receipt
头。或者,您也可以手动将收据头添加到 StompHeaders
中。发送和订阅都会返回一个 Receiptable
实例,您可以使用它来注册收据成功和失败的回调。对于此功能,您必须使用 TaskScheduler
配置客户端,并设置收据过期前的时间(默认 15 秒)。
注意,StompSessionHandler
本身是一个 StompFrameHandler
,这使得它除了可以处理消息处理中的异常的 handleException
回调外,还可以处理 ERROR 帧,以及处理传输级错误的 handleTransportError
,包括 ConnectionLostException
。
您可以使用 WebSocketStompClient
的 inboundMessageSizeLimit
和 outboundMessageSizeLimit
属性来限制入站和出站 WebSocket 消息的最大大小。当出站 STOMP 消息超过限制时,它会被拆分为多个部分帧,接收方需要重新组装。默认情况下,出站消息没有大小限制。当入站 STOMP 消息大小超过配置的限制时,将抛出 StompConversionException
。入站消息的默认大小限制为 64KB
。
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB