STOMP 支持
Spring Integration 4.2 版引入了 STOMP(简单文本面向消息协议)客户端支持。它是基于 Spring Framework 的消息模块、stomp 包中的架构、基础设施和 API 构建的。Spring Integration 使用了许多 Spring STOMP 组件(如 StompSession
和 StompClientSupport
)。有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持 章节。
你需要将这个依赖项添加到你的项目中:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.4.2"
对于服务器端组件,你需要添加 org.springframework:spring-websocket
和/或 io.projectreactor.netty:reactor-netty
依赖。
概述
要配置 STOMP,你应该从 STOMP 客户端对象开始。Spring 框架提供了以下实现:
-
WebSocketStompClient
:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于基于 HTTP 的 WebSocket 模拟与 SockJS 客户端。 -
ReactorNettyTcpStompClient
:基于reactor-netty
项目中的ReactorNettyTcpClient
构建。
你可以提供任何其他 StompClientSupport
实现。参见这些类的 Javadoc 。
StompClientSupport
类被设计为一个 工厂,用于为提供的 StompSessionHandler
创建一个 StompSession
,所有剩余的工作都是通过该 StompSessionHandler
和 StompSession
抽象的 回调 来完成的。借助 Spring Integration 的 适配器 抽象,我们需要提供一些受管共享对象来表示我们的应用程序作为一个具有唯一会话的 STOMP 客户端。为此,Spring Integration 提供了 StompSessionManager
抽象,以在任何提供的 StompSessionHandler
之间管理 单个 StompSession
。这允许使用特定 STOMP Broker 的 入站 或 出站 通道适配器(或两者)。有关更多信息,请参阅 StompSessionManager
(及其实现)JavaDocs。
STOMP 入站通道适配器
StompInboundChannelAdapter
是一个一站式的 MessageProducer
组件,它将您的 Spring Integration 应用订阅到提供的 STOMP 目标,并从这些目标接收消息(通过连接的 StompSession
上提供的 MessageConverter
从 STOMP 帧转换而来)。您可以通过在 StompInboundChannelAdapter
上使用适当的 @ManagedOperation
注解在运行时更改目标(因此也更改 STOMP 订阅)。
对于更多的配置选项,请参见 STOMP 命名空间支持 和 StompInboundChannelAdapter
Javadoc。
STOMP 外发通道适配器
StompMessageHandler
是 <int-stomp:outbound-channel-adapter>
的 MessageHandler
,用于通过 StompSession
(由共享的 StompSessionManager
提供)将传出的 Message<?>
实例发送到 STOMP destination
(预配置或通过运行时的 SpEL 表达式确定)。
对于更多配置选项,请参见 STOMP 命名空间支持 和 StompMessageHandler
Javadoc。
STOMP 标头映射
STOMP 协议作为其帧的一部分提供了标题。STOMP 帧的整个结构具有以下格式:
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供了 StompHeaders
来表示这些头部信息。详情请参阅 Javadoc。STOMP 帧被转换为 Message<?>
实例,这些头部信息被映射到 MessageHeaders
实例。Spring Integration 为 STOMP 适配器提供了一个默认的 HeaderMapper
实现。该实现是 StompHeaderMapper
。它分别为入站和出站适配器提供了 fromHeaders()
和 toHeaders()
操作。
与许多其他 Spring Integration 模块一样,引入了 IntegrationStompHeaders
类以将标准 STOMP 标头映射到 MessageHeaders
,并使用 stomp_
作为标头名称前缀。此外,在发送到目标时,所有带有该前缀的 MessageHeaders
实例都会映射到 StompHeaders
。
对于更多详细信息,请参阅这些类的 Javadoc 和 mapped-headers
属性描述在 STOMP 命名空间支持 中。
STOMP 集成事件
许多 STOMP 操作是异步的,包括错误处理。例如,当客户端帧通过添加 RECEIPT
头来请求时,STOMP 会返回一个 RECEIPT
服务器帧。为了提供对这些异步事件的访问,Spring Integration 发出 StompIntegrationEvent
实例,您可以通过实现 ApplicationListener
或使用 <int-event:inbound-channel-adapter>
来获取(参见接收 Spring 应用程序事件)。
具体来说,当 stompSessionListenableFuture
因无法连接到 STOMP 代理而收到 onFailure()
时,会从 AbstractStompSessionManager
触发一个 StompExceptionEvent
。另一个例子是 StompMessageHandler
。它处理 ERROR
STOMP 帧,这是服务器对由这个 StompMessageHandler
发送的不正确(未被接受)消息的响应。
StompMessageHandler
在 StompSession.Receiptable
回调中异步响应发送到 StompSession
的消息时,会发出 StompReceiptEvent
。StompReceiptEvent
可以是正面的或负面的,这取决于是否在 receiptTimeLimit
期间从服务器收到了 RECEIPT
帧,你可以在 StompClientSupport
实例上配置该时间限制。默认值为 15 * 1000
(以毫秒为单位,即 15 秒)。
只有在要发送的消息的 RECEIPT
STOMP 标头不为 null
时,才会添加 StompSession.Receiptable
回调。您可以通过 StompSession
的 autoReceipt
选项和 StompSessionManager
分别启用自动 RECEIPT
标头生成。
有关如何配置 Spring Integration 以接受这些 ApplicationEvent
实例的更多信息,请参阅 STOMP 适配器 Java 配置。
STOMP 适配器 Java 配置
以下示例展示了 STOMP 适配器的综合 Java 配置:
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 命名空间支持
Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。要将其包含在配置中,在应用程序上下文配置文件中提供以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
了解 <int-stomp:outbound-channel-adapter>
元素
以下列表显示了 STOMP 外发通道适配器的可用属性:
<int-stomp:outbound-channel-adapter
id="" // <1>
channel="" // <2>
stomp-session-manager="" // <3>
header-mapper="" // <4>
mapped-headers="" // <5>
destination="" // <6>
destination-expression="" // <7>
auto-startup="" // <8>
phase=""/> // <9>
组件的 bean 名称。
MessageHandler
会以id
加上.handler
的别名注册为一个 bean。如果您不设置channel
属性,将创建并以这个id
属性值作为 bean 名称在应用程序上下文中注册一个DirectChannel
。在这种情况下,端点将以id
加上.adapter
的 bean 名称注册。如果存在
id
,标识与此适配器关联的通道。参见id
。可选。引用一个
StompSessionManager
bean,它封装了低级连接和StompSession
处理操作。必需。引用一个实现了
HeaderMapper<StompHeaders>
的 bean,它将 Spring IntegrationMessageHeaders
映射到 STOMP 帧头以及从 STOMP 帧头映射回来。它与mapped-headers
互斥。默认为StompHeaderMapper
。要映射到 STOMP 帧头的 STOMP 头名称的逗号分隔列表。只有在未设置
header-mapper
引用时才能提供此列表。此列表中的值也可以是用于匹配头名称的简单模式(例如myheader*
或*myheader
)。一个特殊标记 (STOMP_OUTBOUND_HEADERS
) 表示所有标准的 STOMP 头(如 content-length、receipt、heart-beat 等)。它们默认包含。如果您想添加自己的头并且希望标准头也被映射,则必须包含此标记或通过使用header-mapper
提供您自己的HeaderMapper
实现。发送 STOMP 消息的目标名称。它与
destination-expression
互斥。在运行时针对每个 Spring Integration
Message
作为根对象进行评估的 SpEL 表达式。它与destination
互斥。布尔值,指示此端点是否应自动启动。默认为
true
。此端点应在其中启动和停止的生命周期阶段。值越小,此端点启动得越早,停止得越晚。默认值为
Integer.MIN_VALUE
。值可以为负数。请参阅 SmartLifeCycle。
了解 <int-stomp:inbound-channel-adapter>
元素 {#understanding-the-int-stomp:inbound-channel-adapter-element}
以下列表显示了 STOMP 入站通道适配器的可用属性:
<int-stomp:inbound-channel-adapter
id="" // <1>
channel="" // <2>
error-channel="" // <3>
stomp-session-manager="" // <4>
header-mapper="" // <5>
mapped-headers="" // <6>
destinations="" // <7>
send-timeout="" // <8>
payload-type="" // <9>
auto-startup="" // <10>
phase=""/> // <11>
组件的 bean 名称。如果你不设置
channel
属性,会创建一个DirectChannel
并在应用程序上下文中以这个id
属性的值作为 bean 名称进行注册。在这种情况下,端点将以 bean 名称id
加上.adapter
进行注册。标识与此适配器关联的通道。
应该发送
ErrorMessage
实例的MessageChannel
bean 引用。请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。
用逗号分隔的 STOMP 标头名称列表,这些标头将从 STOMP 帧标头映射。只有当未设置
header-mapper
引用时才能提供此内容。此列表中的值也可以是与标头名称匹配的简单模式(例如,myheader*
或*myheader
)。一个特殊标记 (STOMP_INBOUND_HEADERS
) 表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。它们默认包含。如果你想添加自己的标头并希望标准标头也被映射,则必须包含此标记或使用header-mapper
提供自己的HeaderMapper
实现。请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。
用逗号分隔的要订阅的 STOMP 目标名称列表。目标列表(因此也是订阅)可以通过
addDestination()
和removeDestination()
的@ManagedOperation
注解在运行时修改。在向通道发送消息时,如果通道可以阻塞,则等待的最大时间(以毫秒为单位)。例如,当
QueueChannel
达到最大容量时,它可以一直阻塞直到有可用空间。从传入的 STOMP 帧转换的目标
payload
的 Java 类型的全限定名称。默认为String.class
。请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。
请参阅 <int-stomp:outbound-channel-adapter> 上相同的选项。