消息流
一旦STOMP端点被公开(即可以被客户端访问),Spring应用程序就成为了连接客户端的STOMP代理。本节将描述服务器端消息的传输流程。
spring-messaging 模块为消息应用程序提供了基础支持,该模块最初来源于 Spring Integration,后来被分离出来并整合到 Spring Framework 中,以便在更多的 Spring 项目 和应用场景中得到更广泛的应用。以下列表简要描述了一些可用的消息抽象:
-
Message:消息的简单表示形式,包括头部(headers)和负载(payload)。
-
MessageHandler:用于处理消息的接口规范。
-
MessageChannel:用于发送消息的接口规范,可实现生产者和消费者之间的松耦合。
-
SubscribableChannel:带有
MessageHandler订阅者的MessageChannel。 -
ExecutorSubscribableChannel:使用
Executor来传递消息的SubscribableChannel。
Java配置(即@EnableWebSocketMessageBroker)和XML命名空间配置(即<websocket:message-broker>)都使用上述组件来组装消息工作流。下图展示了启用简单内置消息代理时所使用的组件:

前面的图表展示了三个消息通道:
-
clientInboundChannel:用于传递从WebSocket客户端接收到的消息。 -
clientOutboundChannel:用于向WebSocket客户端发送服务器消息。 -
brokerChannel:用于从服务器端应用程序代码向消息代理发送消息。
下图展示了在配置外部代理(如RabbitMQ)用于管理订阅和广播消息时所使用的组件:

前两个图表之间的主要区别在于“broker relay”的使用:它通过TCP将消息传递给外部STOMP代理,同时也将消息从代理传递给已订阅的客户端。
当从WebSocket连接接收到消息时,这些消息会被解码为STOMP帧,然后转换为Spring的Message表示形式,并发送到clientInboundChannel以进行进一步处理。例如,目标头部以/app开头的STOMP消息可能会被路由到带有注解的控制器中的@MessageMapping方法,而以/topic或/queue开头的消息则可能会被直接路由到消息代理服务器。
一个带有注解的@Controller可以处理来自客户端的STOMP消息,并通过brokerChannel将消息发送到消息代理(message broker),然后代理会通过clientOutboundChannel将消息广播给订阅该消息的客户端。同一个控制器也可以对HTTP请求执行相同的操作,因此客户端可以发起一个HTTP POST请求,接着一个@PostMapping方法就可以将消息发送到消息代理,由代理再广播给已订阅的客户端。
我们可以通过一个简单的例子来追踪这个流程。考虑以下示例,它设置了一个服务器:
- Java
- Kotlin
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/portfolio")
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.setApplicationDestinationPrefixes("/app")
registry.enableSimpleBroker("/topic")
}
}
- Java
- Kotlin
@Controller
public class GreetingController {
@MessageMapping("/greeting")
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
private String getTimestamp() {
return new SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(new Date());
}
}
@Controller
class GreetingController {
@MessageMapping("/greeting")
fun handle(greeting: String): String {
return "[${getTimestamp()}: $greeting"
}
private fun getTimestamp(): String {
return SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(Date())
}
}
前面的示例支持以下流程:
-
客户端连接到
[localhost:8080/portfolio](http://localhost:8080/portfolio),一旦WebSocket连接建立,STOMP帧就开始在其上传输。 -
客户端发送一个带有目标头
/topic/greeting的 SUBSCRIBE 帧。接收到并解码后,该消息会被发送到clientInboundChannel,然后路由到消息代理,消息代理会存储客户端的订阅信息。 -
客户端向
/app/greeting发送一个 SEND 帧。/app前缀有助于将其路由到相应的控制器。去除/app前缀后,剩余的/greeting部分会被映射到GreetingController中的@MessageMapping方法。 -
GreetingController返回的值会被转换成 Spring 的Message对象,其有效载荷基于返回值,同时设置默认的目标头为/topic/greeting(该目标头是从输入目标/app替换为/topic而得)。生成的消息会被发送到brokerChannel,由消息代理进行处理。 -
消息代理会找到所有匹配的订阅者,并通过
clientOutboundChannel向每个订阅者发送一个 MESSAGE 帧,这些消息随后会被编码成 STOMP 帧并通过 WebSocket 连接进行传输。
下一节将提供关于带注释方法的更多细节,包括所支持的参数类型和返回值类型。