跳到主要内容
版本:7.0.2

STOMP 支持

DeepSeek V3 中英对照 STOMP Support

Spring Integration 版本 4.2 引入了 STOMP(简单文本定向消息协议)客户端支持。它基于 Spring Framework 消息模块中 stomp 包的架构、基础设施和 API。Spring Integration 使用了 Spring STOMP 的许多组件(例如 StompSessionStompClientSupport)。更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持章节。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>7.0.2</version>
</dependency>

对于服务器端组件,你需要添加 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依赖项。

概述

要配置 STOMP,您应该从 STOMP 客户端对象开始。Spring Framework 提供了以下实现:

  • WebSocketStompClient: 基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 以及用于基于 HTTP 的 WebSocket 模拟的 SockJS(配合 SockJS 客户端使用)。

  • ReactorNettyTcpStompClient: 基于 reactor-netty 项目中的 ReactorNettyTcpClient 构建。

你可以提供任何其他的 StompClientSupport 实现。请参阅这些类的 Javadoc

StompClientSupport 类被设计为一个工厂,用于为提供的 StompSessionHandler 生成 StompSession,所有剩余的工作都通过该 StompSessionHandlerStompSession 抽象层的回调来完成。借助 Spring Integration 的适配器抽象,我们需要提供一个受管理的共享对象,以代表我们的应用程序作为一个具有其唯一会话的 STOMP 客户端。为此,Spring Integration 提供了 StompSessionManager 抽象来管理任何提供的 StompSessionHandler 之间的单一 StompSession。这允许为特定的 STOMP 代理使用入站出站通道适配器(或两者兼用)。更多信息请参阅 StompSessionManager(及其实现)的 Javadocs。

STOMP 入站通道适配器

StompInboundChannelAdapter 是一个一站式的 MessageProducer 组件,它让你的 Spring Integration 应用程序订阅到提供的 STOMP 目的地,并从这些目的地接收消息(这些消息通过已连接的 StompSession 上提供的 MessageConverter 从 STOMP 帧转换而来)。你可以通过在 StompInboundChannelAdapter 上使用适当的 @ManagedOperation 注解,在运行时更改目的地(以及相应的 STOMP 订阅)。

有关更多配置选项,请参阅 STOMP 命名空间支持StompInboundChannelAdapterJavadoc

STOMP 出站通道适配器

StompMessageHandler<int-stomp:outbound-channel-adapter>MessageHandler,用于通过 StompSession(由共享的 StompSessionManager 提供)将传出的 Message<?> 实例发送到 STOMP destination(预先配置或在运行时通过 SpEL 表达式确定)。

更多配置选项请参阅 STOMP 命名空间支持StompMessageHandlerJavadoc

STOMP 头部映射

STOMP 协议在其帧结构中提供了头部信息。完整的 STOMP 帧格式如下:

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供了 StompHeaders 来表示这些头部信息。更多详情请参阅 Javadoc。STOMP 帧会与 Message<?> 实例相互转换,而这些头部信息也会与 MessageHeaders 实例相互映射。Spring Integration 为 STOMP 适配器提供了一个默认的 HeaderMapper 实现,即 StompHeaderMapper。该实现分别为入站和出站适配器提供了 fromHeaders()toHeaders() 操作。

与其他许多Spring Integration模块一样,IntegrationStompHeaders类已被引入,用于将标准STOMP头部映射到MessageHeaders,并以stomp_作为头部名称前缀。此外,所有带有该前缀的MessageHeaders实例在发送到目标时都会被映射到StompHeaders

更多信息,请参阅这些类的 Javadoc 以及 STOMP 命名空间支持 中关于 mapped-headers 属性的描述。

STOMP 集成事件

许多STOMP操作是异步的,包括错误处理。例如,STOMP有一个 RECEIPT 服务器帧,当客户端帧通过添加 RECEIPT 头请求时,服务器会返回该帧。为了提供对这些异步事件的访问,Spring Integration 会发出 StompIntegrationEvent 实例,您可以通过实现 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 来获取这些实例(请参阅接收Spring应用事件)。

具体来说,当 stompSessionListenableFuture 因无法连接到 STOMP 代理而收到 onFailure() 时,AbstractStompSessionManager 会发出 StompExceptionEvent。另一个例子是 StompMessageHandler。它处理 ERROR STOMP 帧,这些帧是服务器对此 StompMessageHandler 发送的不正确(未被接受)消息的响应。

StompMessageHandler 会作为 StompSession.Receiptable 回调的一部分,在发送给 StompSession 的消息的异步响应中发出 StompReceiptEventStompReceiptEvent 可以是正面的也可以是负面的,这取决于是否在 receiptTimeLimit 期限内从服务器接收到 RECEIPT 帧,你可以在 StompClientSupport 实例上配置此期限。默认值为 15 * 1000(以毫秒为单位,即 15 秒)。

备注

仅当待发送消息的 RECEIPT STOMP 标头不为 null 时,才会添加 StompSession.Receiptable 回调。你可以通过 StompSessionautoReceipt 选项以及 StompSessionManager 分别启用自动 RECEIPT 标头生成。

有关如何配置 Spring Integration 以接收这些 ApplicationEvent 实例的更多信息,请参阅 STOMP 适配器 Java 配置

STOMP 适配器 Java 配置

以下示例展示了STOMP适配器的完整Java配置:

@Configuration
@EnableIntegration
public class StompConfiguration {

@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}

@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}

@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}

@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}

@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}

@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}

}

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。要在配置中包含它,请在应用程序上下文配置文件中提供以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>

理解 <int-stomp:outbound-channel-adapter> 元素

以下列表展示了 STOMP 出站通道适配器的可用属性:

<int-stomp:outbound-channel-adapter
id="" // <1>
channel="" // <2>
stomp-session-manager="" // <3>
header-mapper="" // <4>
mapped-headers="" // <5>
destination="" // <6>
destination-expression="" // <7>
auto-startup="" // <8>
phase=""/> // <9>
  • 组件 Bean 名称。MessageHandler 会以 id 加上 .handler 的 Bean 别名注册。如果未设置 channel 属性,则会创建一个 DirectChannel 并在应用上下文中注册,其 Bean 名称为此 id 属性的值。在这种情况下,端点会以 id 加上 .adapter 的 Bean 名称注册。

  • 如果存在 id,则标识附加到此适配器的通道。参见 id。可选。

  • StompSessionManager Bean 的引用,该 Bean 封装了底层连接和 StompSession 处理操作。必需。

  • 对实现 HeaderMapper<StompHeaders> 的 Bean 的引用,该 Bean 将 Spring Integration 的 MessageHeaders 与 STOMP 帧头相互映射。它与 mapped-headers 互斥。默认为 StompHeaderMapper

  • 要映射到 STOMP 帧头的 STOMP 头名称的逗号分隔列表。仅当未设置 header-mapper 引用时才能提供。此列表中的值也可以是用于匹配头名称的简单模式(例如 myheader**myheader)。一个特殊标记(STOMP_OUTBOUND_HEADERS)代表所有标准 STOMP 头(content-length、receipt、heart-beat 等)。默认情况下它们会被包含。如果你想添加自己的头并且也希望标准头被映射,则必须包含此标记,或者通过 header-mapper 提供你自己的 HeaderMapper 实现。

  • STOMP 消息要发送到的目标名称。它与 destination-expression 互斥。

  • 一个 SpEL 表达式,在运行时针对每个作为根对象的 Spring Integration Message 进行评估。它与 destination 互斥。

  • 布尔值,指示此端点是否应自动启动。默认为 true

  • 此端点应启动和停止的生命周期阶段。值越小,此端点启动越早,停止越晚。默认为 Integer.MIN_VALUE。值可以为负数。参见 SmartLifeCycle

理解 <int-stomp:inbound-channel-adapter> 元素 {#understanding-the-int-stomp:inbound-channel-adapter-element}

以下列表展示了 STOMP 入站通道适配器的可用属性:

<int-stomp:inbound-channel-adapter
id="" // <1>
channel="" // <2>
error-channel="" // <3>
stomp-session-manager="" // <4>
header-mapper="" // <5>
mapped-headers="" // <6>
destinations="" // <7>
send-timeout="" // <8>
payload-type="" // <9>
auto-startup="" // <10>
phase=""/> // <11>
  • 组件 Bean 名称。如果未设置 channel 属性,将创建一个 DirectChannel 并在应用程序上下文中注册,其 Bean 名称为此 id 属性的值。此时,端点将以 id 加上 .adapter 作为 Bean 名称进行注册。

  • 标识与此适配器关联的通道。

  • 应发送 ErrorMessage 实例的 MessageChannel Bean 引用。

  • 请参阅 <int-stomp:outbound-channel-adapter> 中的相同选项。

  • 要从 STOMP 帧头映射的 STOMP 头名称的逗号分隔列表。仅当未设置 header-mapper 引用时才能提供此列表。此列表中的值也可以是用于匹配头名称的简单模式(例如 myheader**myheader)。特殊标记 (STOMP_INBOUND_HEADERS) 代表所有标准 STOMP 头(content-length、receipt、heart-beat 等)。默认情况下它们会被包含。如果您想添加自己的头并且也希望映射标准头,则必须包含此标记,或使用 header-mapper 提供您自己的 HeaderMapper 实现。

  • 请参阅 <int-stomp:outbound-channel-adapter> 中的相同选项。

  • 要订阅的 STOMP 目标名称的逗号分隔列表。目标列表(以及相应的订阅)可以在运行时通过 addDestination()removeDestination() @ManagedOperation 注解进行修改。

  • 如果通道可能阻塞,则在向通道发送消息时等待的最长时间(以毫秒为单位)。例如,如果 QueueChannel 已达到其最大容量,它可能会阻塞直到有可用空间。

  • 要从传入的 STOMP 帧转换的目标 payload 的 Java 类型的完全限定名。默认为 String.class

  • 请参阅 <int-stomp:outbound-channel-adapter> 中的相同选项。

  • 请参阅 <int-stomp:outbound-channel-adapter> 中的相同选项。