跳到主要内容

STOMP 支持

QWen Plus 中英对照 STOMP Support

Spring Integration 4.2 版引入了 STOMP(简单文本面向消息协议)客户端支持。它是基于 Spring Framework 的消息模块、stomp 包中的架构、基础设施和 API 构建的。Spring Integration 使用了许多 Spring STOMP 组件(如 StompSessionStompClientSupport)。有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持 章节。

你需要将这个依赖项添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.4.2</version>
</dependency>
xml

对于服务器端组件,你需要添加 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依赖。

概述

要配置 STOMP,你应该从 STOMP 客户端对象开始。Spring 框架提供了以下实现:

  • WebSocketStompClient:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于基于 HTTP 的 WebSocket 模拟与 SockJS 客户端。

  • ReactorNettyTcpStompClient:基于 reactor-netty 项目中的 ReactorNettyTcpClient 构建。

你可以提供任何其他 StompClientSupport 实现。参见这些类的 Javadoc

StompClientSupport 类被设计为一个 工厂,用于为提供的 StompSessionHandler 创建一个 StompSession,所有剩余的工作都是通过该 StompSessionHandlerStompSession 抽象的 回调 来完成的。借助 Spring Integration 的 适配器 抽象,我们需要提供一些受管共享对象来表示我们的应用程序作为一个具有唯一会话的 STOMP 客户端。为此,Spring Integration 提供了 StompSessionManager 抽象,以在任何提供的 StompSessionHandler 之间管理 单个 StompSession。这允许使用特定 STOMP Broker 的 入站出站 通道适配器(或两者)。有关更多信息,请参阅 StompSessionManager(及其实现)JavaDocs。

STOMP 入站通道适配器

StompInboundChannelAdapter 是一个一站式的 MessageProducer 组件,它将您的 Spring Integration 应用订阅到提供的 STOMP 目标,并从这些目标接收消息(通过连接的 StompSession 上提供的 MessageConverter 从 STOMP 帧转换而来)。您可以通过在 StompInboundChannelAdapter 上使用适当的 @ManagedOperation 注解在运行时更改目标(因此也更改 STOMP 订阅)。

对于更多的配置选项,请参见 STOMP 命名空间支持StompInboundChannelAdapter Javadoc

STOMP 外发通道适配器

StompMessageHandler<int-stomp:outbound-channel-adapter>MessageHandler,用于通过 StompSession(由共享的 StompSessionManager 提供)将传出的 Message<?> 实例发送到 STOMP destination(预配置或通过运行时的 SpEL 表达式确定)。

对于更多配置选项,请参见 STOMP 命名空间支持StompMessageHandler Javadoc

STOMP 标头映射

STOMP 协议作为其帧的一部分提供了标题。STOMP 帧的整个结构具有以下格式:

....
COMMAND
header1:value1
header2:value2

Body^@
....
none

Spring Framework 提供了 StompHeaders 来表示这些头部信息。详情请参阅 Javadoc。STOMP 帧被转换为 Message<?> 实例,这些头部信息被映射到 MessageHeaders 实例。Spring Integration 为 STOMP 适配器提供了一个默认的 HeaderMapper 实现。该实现是 StompHeaderMapper。它分别为入站和出站适配器提供了 fromHeaders()toHeaders() 操作。

与许多其他 Spring Integration 模块一样,引入了 IntegrationStompHeaders 类以将标准 STOMP 标头映射到 MessageHeaders,并使用 stomp_ 作为标头名称前缀。此外,在发送到目标时,所有带有该前缀的 MessageHeaders 实例都会映射到 StompHeaders

对于更多详细信息,请参阅这些类的 Javadocmapped-headers 属性描述在 STOMP 命名空间支持 中。

STOMP 集成事件

许多 STOMP 操作是异步的,包括错误处理。例如,当客户端帧通过添加 RECEIPT 头来请求时,STOMP 会返回一个 RECEIPT 服务器帧。为了提供对这些异步事件的访问,Spring Integration 发出 StompIntegrationEvent 实例,您可以通过实现 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 来获取(参见接收 Spring 应用程序事件)。

具体来说,当 stompSessionListenableFuture 因无法连接到 STOMP 代理而收到 onFailure() 时,会从 AbstractStompSessionManager 触发一个 StompExceptionEvent 。另一个例子是 StompMessageHandler 。它处理 ERROR STOMP 帧,这是服务器对由这个 StompMessageHandler 发送的不正确(未被接受)消息的响应。

StompMessageHandlerStompSession.Receiptable 回调中异步响应发送到 StompSession 的消息时,会发出 StompReceiptEventStompReceiptEvent 可以是正面的或负面的,这取决于是否在 receiptTimeLimit 期间从服务器收到了 RECEIPT 帧,你可以在 StompClientSupport 实例上配置该时间限制。默认值为 15 * 1000(以毫秒为单位,即 15 秒)。

备注

只有在要发送的消息的 RECEIPT STOMP 标头不为 null 时,才会添加 StompSession.Receiptable 回调。您可以通过 StompSessionautoReceipt 选项和 StompSessionManager 分别启用自动 RECEIPT 标头生成。

有关如何配置 Spring Integration 以接受这些 ApplicationEvent 实例的更多信息,请参阅 STOMP 适配器 Java 配置

STOMP 适配器 Java 配置

以下示例展示了 STOMP 适配器的综合 Java 配置:

@Configuration
@EnableIntegration
public class StompConfiguration {

@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}

@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}

@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}

@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}

@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}

@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}

}
java

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。要将其包含在配置中,在应用程序上下文配置文件中提供以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
xml

了解 <int-stomp:outbound-channel-adapter> 元素

以下列表显示了 STOMP 外发通道适配器的可用属性:

<int-stomp:outbound-channel-adapter
id="" // <1>
channel="" // <2>
stomp-session-manager="" // <3>
header-mapper="" // <4>
mapped-headers="" // <5>
destination="" // <6>
destination-expression="" // <7>
auto-startup="" // <8>
phase=""/> // <9>
xml
  • 组件的 bean 名称。MessageHandler 会以 id 加上 .handler 的别名注册为一个 bean。如果您不设置 channel 属性,将创建并以这个 id 属性值作为 bean 名称在应用程序上下文中注册一个 DirectChannel。在这种情况下,端点将以 id 加上 .adapter 的 bean 名称注册。

  • 如果存在 id,标识与此适配器关联的通道。参见 id。可选。

  • 引用一个 StompSessionManager bean,它封装了低级连接和 StompSession 处理操作。必需。

  • 引用一个实现了 HeaderMapper<StompHeaders> 的 bean,它将 Spring Integration MessageHeaders 映射到 STOMP 帧头以及从 STOMP 帧头映射回来。它与 mapped-headers 互斥。默认为 StompHeaderMapper

  • 要映射到 STOMP 帧头的 STOMP 头名称的逗号分隔列表。只有在未设置 header-mapper 引用时才能提供此列表。此列表中的值也可以是用于匹配头名称的简单模式(例如 myheader**myheader)。一个特殊标记 (STOMP_OUTBOUND_HEADERS) 表示所有标准的 STOMP 头(如 content-length、receipt、heart-beat 等)。它们默认包含。如果您想添加自己的头并且希望标准头也被映射,则必须包含此标记或通过使用 header-mapper 提供您自己的 HeaderMapper 实现。

  • 发送 STOMP 消息的目标名称。它与 destination-expression 互斥。

  • 在运行时针对每个 Spring Integration Message 作为根对象进行评估的 SpEL 表达式。它与 destination 互斥。

  • 布尔值,指示此端点是否应自动启动。默认为 true

  • 此端点应在其中启动和停止的生命周期阶段。值越小,此端点启动得越早,停止得越晚。默认值为 Integer.MIN_VALUE。值可以为负数。请参阅 SmartLifeCycle

了解 <int-stomp:inbound-channel-adapter> 元素 {#understanding-the-int-stomp:inbound-channel-adapter-element}

以下列表显示了 STOMP 入站通道适配器的可用属性:

<int-stomp:inbound-channel-adapter
id="" // <1>
channel="" // <2>
error-channel="" // <3>
stomp-session-manager="" // <4>
header-mapper="" // <5>
mapped-headers="" // <6>
destinations="" // <7>
send-timeout="" // <8>
payload-type="" // <9>
auto-startup="" // <10>
phase=""/> // <11>
xml
  • 组件的 bean 名称。如果你不设置 channel 属性,会创建一个 DirectChannel 并在应用程序上下文中以这个 id 属性的值作为 bean 名称进行注册。在这种情况下,端点将以 bean 名称 id 加上 .adapter 进行注册。

  • 标识与此适配器关联的通道。

  • 应该发送 ErrorMessage 实例的 MessageChannel bean 引用。

  • 请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。

  • 用逗号分隔的 STOMP 标头名称列表,这些标头将从 STOMP 帧标头映射。只有当未设置 header-mapper 引用时才能提供此内容。此列表中的值也可以是与标头名称匹配的简单模式(例如,myheader**myheader)。一个特殊标记 (STOMP_INBOUND_HEADERS) 表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。它们默认包含。如果你想添加自己的标头并希望标准标头也被映射,则必须包含此标记或使用 header-mapper 提供自己的 HeaderMapper 实现。

  • 请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。

  • 用逗号分隔的要订阅的 STOMP 目标名称列表。目标列表(因此也是订阅)可以通过 addDestination()removeDestination()@ManagedOperation 注解在运行时修改。

  • 在向通道发送消息时,如果通道可以阻塞,则等待的最大时间(以毫秒为单位)。例如,当 QueueChannel 达到最大容量时,它可以一直阻塞直到有可用空间。

  • 从传入的 STOMP 帧转换的目标 payload 的 Java 类型的全限定名称。默认为 String.class

  • 请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。

  • 请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。