MQTT 支持
Spring Integration 提供了入站和出站通道适配器来支持消息队列遥测传输 (MQTT) 协议。
你需要将这个依赖项添加到你的项目中:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.4.2"
当前的实现使用了 Eclipse Paho MQTT Client 库。从 6.5 版本开始,org.eclipse.paho:org.eclipse.paho.client.mqttv3
依赖项是一个 optional
依赖项,因此必须在目标项目中显式包含以支持 MQTT v3。
XML 配置和本章的大部分内容是关于 MQTT v3.1 协议支持及相应的 Paho 客户端。请参阅 MQTT v5 支持段落以了解相应协议支持。
两个适配器的配置是使用 DefaultMqttPahoClientFactory
实现的。有关配置选项的更多信息,请参阅 Paho 文档。
我们建议配置一个 MqttConnectOptions
对象并将其注入工厂,而不是在工厂本身上设置(已弃用)的选项。
入站 (消息驱动) 通道适配器
入站通道适配器由 MqttPahoMessageDrivenChannelAdapter
实现。为了方便,你可以使用命名空间来配置它。最小配置可能如下:
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
以下列表显示了可用的属性:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
topics="bar,baz" // <3>
qos="1,2" // <4>
converter="myConverter" // <5>
client-factory="clientFactory" // <6>
send-timeout="123" // <7>
error-channel="errors" // <8>
recovery-interval="10000" // <9>
manual-acks="false" // <10>
channel="out" />
客户端 ID。
经纪人 URL。
以逗号分隔的主题列表,此适配器从这些主题接收消息。
以逗号分隔的 QoS 值列表。它可以是一个应用于所有主题的单一值,或者是每个主题的一个值(在这种情况下,列表长度必须相同)。
一个
MqttMessageConverter
(可选)。默认情况下,默认的DefaultPahoMessageConverter
生成的消息包含一个String
类型的有效负载,并带有以下标头:mqtt_topic
: 消息接收到的主题mqtt_duplicate
: 如果消息是重复的则为true
mqtt_qos
: 服务质量。你可以通过将其声明为<bean/>
并将payloadAsBytes
属性设置为true
来配置DefaultPahoMessageConverter
返回原始的byte[]
作为有效负载。客户端工厂。
send()
超时时间。仅在通道可能会阻塞的情况下应用(例如当前已满的有界QueueChannel
)。错误通道。下游异常会在此通道中以
ErrorMessage
的形式发送,如果提供了该通道的话。有效负载是一个包含失败消息和原因的MessagingException
。恢复间隔。它控制适配器在故障后尝试重新连接的时间间隔。默认为
10000 ms
(十秒)。确认模式;设置为
true
以启用手动确认。
从 4.1 版开始,您可以省略 URL。相反,您可以在 DefaultMqttPahoClientFactory
的 serverURIs
属性中提供服务器 URI。这样做可以实现例如连接到高可用性 (HA) 集群。
从 4.2.2 版本开始,当适配器成功订阅主题时,会发布 MqttSubscribedEvent
。当连接或订阅失败时,会发布 MqttConnectionFailedEvent
事件。这些事件可以被实现了 ApplicationListener
的 bean 接收。
此外,一个叫做 recoveryInterval
的新属性控制适配器在失败后尝试重新连接的时间间隔。它默认为 10000ms
(十秒)。
在 4.2.3 版本之前,当适配器停止时,客户端总是取消订阅。这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于激活状态,以便在适配器停止期间到达的消息可以在下次启动时被传递。这也要求将客户端工厂的 cleanSession
属性设置为 false
。它的默认值是 true
。
从 4.2.3 版本开始,默认情况下,如果 cleanSession
属性为 false
,适配器不会取消订阅。
可以通过在工厂中设置 consumerCloseAction
属性来覆盖此行为。它可以有以下值:UNSUBSCRIBE_ALWAYS
、UNSUBSCRIBE_NEVER
和 UNSUBSCRIBE_CLEAN
。后者(默认值)仅在 cleanSession
属性为 true
时取消订阅。
要恢复到 4.2.3 之前的版本的行为,请使用 UNSUBSCRIBE_ALWAYS
。
从 5.0 版开始,topic
、qos
和 retained
属性被映射到 .RECEIVED_…
标头 (MqttHeaders.RECEIVED_TOPIC
,MqttHeaders.RECEIVED_QOS
和 MqttHeaders.RECEIVED_RETAINED
),以避免意外传播到使用 MqttHeaders.TOPIC
、MqttHeaders.QOS
和 MqttHeaders.RETAINED
标头的 outbound 消息(默认情况下)。
在运行时添加和移除主题
从 4.1 版本开始,您可以编程更改适配器订阅的主题。Spring Integration 提供了 addTopic()
和 removeTopic()
方法。在添加主题时,您可以选择指定 QoS
(默认值:1)。您还可以通过向 <control-bus/>
发送适当的消息并带有适当的负载来修改主题——例如:"myMqttAdapter.addTopic('foo', 1)"
。
停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。这些更改不会在应用程序上下文的生命周期之外保留。一个新的应用程序上下文会恢复到配置的设置。
在适配器停止(或与代理断开连接)时更改主题,将在下次建立连接时生效。
手动确认
从 5.3 版本开始,您可以将 manualAcks
属性设置为 true。通常用于异步确认交付。当设置为 true
时,会向消息中添加一个头 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
),其值为一个 SimpleAcknowledgment
。您必须调用 acknowledge()
方法来完成交付。有关更多信息,请参阅 IMqttClient
setManualAcks()
和 messageArrivedComplete()
的 Javadoc。为了方便起见,提供了一个头访问器:
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
从版本 5.2.11
开始,当消息转换器抛出异常或从 MqttMessage
转换返回 null
时,MqttPahoMessageDrivenChannelAdapter
会将一个 ErrorMessage
发送到提供的 errorChannel
。否则,会将此转换错误重新抛入 MQTT 客户端回调中。
使用 Java 配置
以下的 Spring Boot 应用展示了如何使用 Java 配置来配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 配置
以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
输出通道适配器
outbound 通道适配器由 MqttPahoMessageHandler
实现,它被包装在一个 ConsumerEndpoint
中。为了方便起见,你可以使用命名空间来配置它。
从 4.1 版开始,适配器支持异步发送操作,避免阻塞直到交付确认。你可以发出应用程序事件,以使应用程序能够在需要时确认交付。
以下列表显示了 outbound 通道适配器可用的属性:
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
converter="myConverter" // <3>
client-factory="clientFactory" // <4>
default-qos="1" // <5>
qos-expression="" // <6>
default-retained="true" // <7>
retained-expression="" // <8>
default-topic="bar" // <9>
topic-expression="" // <10>
async="false" // <11>
async-events="false" // <12>
channel="target" />
客户端 ID。
经纪人 URL。
一个
MqttMessageConverter
(可选)。默认的DefaultPahoMessageConverter
识别以下头信息:mqtt_topic
: 消息将发送到的主题mqtt_retained
: 如果消息需要保留,则为true
mqtt_qos
: 服务质量客户端工厂。
默认的服务质量。如果未找到
mqtt_qos
头或qos-expression
返回null
,则使用它。如果你提供了一个自定义的converter
,则不使用它。用于确定 qos 的表达式。默认是
headers[mqtt_qos]
。保留标志的默认值。如果未找到
mqtt_retained
头,则使用它。如果提供了自定义的converter
,则不使用它。用于确定保留布尔值的表达式。默认是
headers[mqtt_retained]
。消息默认发送到的主题(如果未找到
mqtt_topic
头,则使用)。用于确定目标主题的表达式。默认是
headers['mqtt_topic']
。当为
true
时,调用方不会阻塞。而是,在消息发送时等待交付确认。默认是false
(发送会阻塞直到交付被确认)。当
async
和async-events
都为true
时,会发出一个MqttMessageSentEvent
(参见 事件)。它包含消息、主题、客户端库生成的messageId
、clientId
和clientInstance
(每次客户端连接时递增)。当客户端库确认交付时,会发出一个MqttMessageDeliveredEvent
。它包含messageId
、clientId
和clientInstance
,使交付可以与send()
相关联。任何ApplicationListener
或事件入站通道适配器都可以接收这些事件。请注意,MqttMessageDeliveredEvent
可能在MqttMessageSentEvent
之前收到。默认是false
。
从 4.1 版开始,可以省略 URL。相反,可以在 DefaultMqttPahoClientFactory
的 serverURIs
属性中提供服务器 URI。这使得例如连接到高可用性 (HA) 集群成为可能。
使用 Java 配置
下面的 Spring Boot 应用程序展示了如何使用 Java 配置来配置 outbound 适配器:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
事件
某些应用程序事件由适配器发布。
-
MqttConnectionFailedEvent
- 如果连接失败或连接随后丢失,两个适配器都会发布此事件。对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,断开连接的cause
为null
。 -
MqttMessageSentEvent
- 在异步模式下运行时,出站适配器在消息已发送后会发布此事件。 -
MqttMessageDeliveredEvent
- 在异步模式下运行时,出站适配器在客户端指示消息已送达后会发布此事件。 -
MqttMessageNotDeliveredEvent
- 在异步模式下运行时,出站适配器在客户端指示消息未送达后会发布此事件。 -
MqttSubscribedEvent
- 入站适配器在订阅主题后会发布此事件。
这些事件可以通过 ApplicationListener<MqttIntegrationEvent>
或带有 @EventListener
方法接收。
要确定事件的来源,使用以下方法;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTT v5 支持
从 5.5.5 版本开始,spring-integration-mqtt
模块提供了对 MQTT v5 协议的通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client
是一个 optional
依赖项,因此必须在目标项目中显式包含。
由于 MQTT v5 协议支持在 MQTT 消息中添加额外的任意属性,因此引入了 MqttHeaderMapper
实现以映射发布和接收操作中的头部信息。默认情况下(通过 *
模式),它会映射所有接收到的 PUBLISH
帧属性(包括用户属性)。在 outbound 方面,它为 PUBLISH
帧映射以下头部子集:contentType
、mqtt_messageExpiryInterval
、mqtt_responseTopic
、mqtt_correlationData
。
MQTT v5 协议的 outbound 通道适配器以 Mqttv5PahoMessageHandler
的形式存在。它需要一个 clientId
和 MQTT 经纪人 URL 或 MqttConnectionOptions
引用。它支持 MqttClientPersistence
选项,可以是 async
并且在这种情况下可以发出 MqttIntegrationEvent
对象(参见 asyncEvents
选项)。如果请求消息的有效负载是 org.eclipse.paho.mqttv5.common.MqttMessage
,它将通过内部的 IMqttAsyncClient
按原样发布。如果有效负载是 byte[]
,它将按原样用作目标 MqttMessage
有效负载进行发布。如果有效负载是一个 String
,它将被转换为 byte[]
进行发布。其余用例则委托给提供的 MessageConverter
,它是应用程序上下文中的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
ConfigurableCompositeMessageConverter
bean。注意:当请求的消息有效负载已经是 MqttMessage
时,提供的 HeaderMapper<MqttProperties>
不会被使用。以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
// The following Java DSL configuration sample demonstrates how to use this channel adapter in the integration flow:
请注意,上述代码注释部分已在文本中给出,实际的 Java DSL 示例代码应根据具体的集成需求编写。
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter
不能与 Mqttv5PahoMessageHandler
一起使用,因为它的协议仅针对 MQTT v3 协议。
如果在启动时或运行时连接失败,Mqttv5PahoMessageHandler
会在下一个消息产生到此处理程序时尝试重新连接。如果这种手动重新连接失败,连接异常会抛回给调用者。在这种情况下,将应用标准的 Spring Integration 错误处理程序流程,包括请求处理程序建议,例如重试或断路器。
请参阅 Mqttv5PahoMessageHandler
的 javadocs 及其超类以获取更多信息。
MQTT v5 协议的入站通道适配器以 Mqttv5PahoMessageDrivenChannelAdapter
的形式存在。它需要一个 clientId
和 MQTT 经纪人 URL 或 MqttConnectionOptions
引用,以及要订阅和消费的主题。它支持一个 MqttClientPersistence
选项,默认情况下是内存中的。可以配置预期的 payloadType
(默认为 byte[]
),并且它会被传播到提供的 SmartMessageConverter
以将收到的 MqttMessage
的 byte[]
进行转换。如果设置了 manualAck
选项,则会向消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标头,以生成一个 SimpleAcknowledgment
实例。HeaderMapper<MqttProperties>
用于将 PUBLISH
帧属性(包括用户属性)映射到目标消息标头。标准的 MqttMessage
属性,如 qos
、id
、dup
、retained
以及接收到的主题总是被映射到标头。更多信息请参见 MqttHeaders
。
从 6.3 版本开始,Mqttv5PahoMessageDrivenChannelAdapter
提供了基于 MqttSubscription
的构造函数,以实现细粒度配置,而不是使用简单的主题名称。当提供这些订阅时,通道适配器的 qos
选项无法使用,因为这样的 qos
模式是 MqttSubscription
API 的一部分。
以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter
不能与 Mqttv5PahoMessageDrivenChannelAdapter
一起使用,因为它的协议仅针对 MQTT v3 协议。
请参阅 Mqttv5PahoMessageDrivenChannelAdapter
javadocs 及其超类以获取更多信息。
建议将 MqttConnectionOptions#setAutomaticReconnect(boolean)
设置为 true,以让内部的 IMqttAsyncClient
实例处理重新连接。否则,只有手动重启 Mqttv5PahoMessageDrivenChannelAdapter
才能处理重新连接,例如通过在断开连接时处理 MqttConnectionFailedEvent
。
共享 MQTT 客户端支持
如果一个 MQTT ClientID 需要用于多个集成,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数有限制(通常只允许一个连接)。为了在一个客户端中重用不同的通道适配器,可以使用 org.springframework.integration.mqtt.core.ClientManager
组件,并将其传递给任何需要的通道适配器。它将管理 MQTT 连接生命周期,并在需要时进行自动重新连接。此外,还可以向客户端管理器提供自定义连接选项和 MqttClientPersistence
,就像目前可以为通道适配器组件所做的那样。
请注意,同时支持 MQTT v5 和 v3 通道适配器。
以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器:
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
从 6.4 版本开始,可以通过 IntegrationFlowContext
使用相应的 ClientManager
在运行时添加多个 MqttPahoMessageDrivenChannelAdapter
和 Mqttv5PahoMessageDrivenChannelAdapter
实例
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
String topic, MessageChannel channel) {
flowContext
.registration(
IntegrationFlow
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
.channel(channel)
.get())
.register();
}