跳到主要内容

MQTT 支持

QWen Plus 中英对照 MQTT Support

Spring Integration 提供了入站和出站通道适配器来支持消息队列遥测传输 (MQTT) 协议。

你需要将这个依赖项添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.4.2</version>
</dependency>
xml

当前的实现使用了 Eclipse Paho MQTT Client 库。从 6.5 版本开始,org.eclipse.paho:org.eclipse.paho.client.mqttv3 依赖项是一个 optional 依赖项,因此必须在目标项目中显式包含以支持 MQTT v3。

important

XML 配置和本章的大部分内容是关于 MQTT v3.1 协议支持及相应的 Paho 客户端。请参阅 MQTT v5 支持段落以了解相应协议支持。

两个适配器的配置是使用 DefaultMqttPahoClientFactory 实现的。有关配置选项的更多信息,请参阅 Paho 文档。

备注

我们建议配置一个 MqttConnectOptions 对象并将其注入工厂,而不是在工厂本身上设置(已弃用)的选项。

入站 (消息驱动) 通道适配器

入站通道适配器由 MqttPahoMessageDrivenChannelAdapter 实现。为了方便,你可以使用命名空间来配置它。最小配置可能如下:

<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
xml

以下列表显示了可用的属性:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
topics="bar,baz" // <3>
qos="1,2" // <4>
converter="myConverter" // <5>
client-factory="clientFactory" // <6>
send-timeout="123" // <7>
error-channel="errors" // <8>
recovery-interval="10000" // <9>
manual-acks="false" // <10>
channel="out" />
xml
  • 客户端 ID。

  • 经纪人 URL。

  • 以逗号分隔的主题列表,此适配器从这些主题接收消息。

  • 以逗号分隔的 QoS 值列表。它可以是一个应用于所有主题的单一值,或者是每个主题的一个值(在这种情况下,列表长度必须相同)。

  • 一个 MqttMessageConverter (可选)。默认情况下,默认的 DefaultPahoMessageConverter 生成的消息包含一个 String 类型的有效负载,并带有以下标头:

  • mqtt_topic: 消息接收到的主题
  • mqtt_duplicate: 如果消息是重复的则为 true
  • mqtt_qos: 服务质量。你可以通过将其声明为 <bean/> 并将 payloadAsBytes 属性设置为 true 来配置 DefaultPahoMessageConverter 返回原始的 byte[] 作为有效负载。
  • 客户端工厂。

  • send() 超时时间。仅在通道可能会阻塞的情况下应用(例如当前已满的有界 QueueChannel)。

  • 错误通道。下游异常会在此通道中以 ErrorMessage 的形式发送,如果提供了该通道的话。有效负载是一个包含失败消息和原因的 MessagingException

  • 恢复间隔。它控制适配器在故障后尝试重新连接的时间间隔。默认为 10000 ms (十秒)。

  • 确认模式;设置为 true 以启用手动确认。

备注

从 4.1 版开始,您可以省略 URL。相反,您可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。这样做可以实现例如连接到高可用性 (HA) 集群。

从 4.2.2 版本开始,当适配器成功订阅主题时,会发布 MqttSubscribedEvent。当连接或订阅失败时,会发布 MqttConnectionFailedEvent 事件。这些事件可以被实现了 ApplicationListener 的 bean 接收。

此外,一个叫做 recoveryInterval 的新属性控制适配器在失败后尝试重新连接的时间间隔。它默认为 10000ms (十秒)。

备注

在 4.2.3 版本之前,当适配器停止时,客户端总是取消订阅。这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于激活状态,以便在适配器停止期间到达的消息可以在下次启动时被传递。这也要求将客户端工厂的 cleanSession 属性设置为 false。它的默认值是 true

从 4.2.3 版本开始,默认情况下,如果 cleanSession 属性为 false,适配器不会取消订阅。

可以通过在工厂中设置 consumerCloseAction 属性来覆盖此行为。它可以有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。后者(默认值)仅在 cleanSession 属性为 true 时取消订阅。

要恢复到 4.2.3 之前的版本的行为,请使用 UNSUBSCRIBE_ALWAYS

important

从 5.0 版开始,topicqosretained 属性被映射到 .RECEIVED_…​ 标头 (MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免意外传播到使用 MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED 标头的 outbound 消息(默认情况下)。

在运行时添加和移除主题

从 4.1 版本开始,您可以编程更改适配器订阅的主题。Spring Integration 提供了 addTopic()removeTopic() 方法。在添加主题时,您可以选择指定 QoS(默认值:1)。您还可以通过向 <control-bus/> 发送适当的消息并带有适当的负载来修改主题——例如:"myMqttAdapter.addTopic('foo', 1)"

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。这些更改不会在应用程序上下文的生命周期之外保留。一个新的应用程序上下文会恢复到配置的设置。

在适配器停止(或与代理断开连接)时更改主题,将在下次建立连接时生效。

手动确认

从 5.3 版本开始,您可以将 manualAcks 属性设置为 true。通常用于异步确认交付。当设置为 true 时,会向消息中添加一个头 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK),其值为一个 SimpleAcknowledgment。您必须调用 acknowledge() 方法来完成交付。有关更多信息,请参阅 IMqttClient setManualAcks()messageArrivedComplete() 的 Javadoc。为了方便起见,提供了一个头访问器:

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
java

从版本 5.2.11 开始,当消息转换器抛出异常或从 MqttMessage 转换返回 null 时,MqttPahoMessageDrivenChannelAdapter 会将一个 ErrorMessage 发送到提供的 errorChannel。否则,会将此转换错误重新抛入 MQTT 客户端回调中。

使用 Java 配置

以下的 Spring Boot 应用展示了如何使用 Java 配置来配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}

@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {

@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}

};
}

}
java

使用 Java DSL 配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}

}
java

输出通道适配器

outbound 通道适配器由 MqttPahoMessageHandler 实现,它被包装在一个 ConsumerEndpoint 中。为了方便起见,你可以使用命名空间来配置它。

从 4.1 版开始,适配器支持异步发送操作,避免阻塞直到交付确认。你可以发出应用程序事件,以使应用程序能够在需要时确认交付。

以下列表显示了 outbound 通道适配器可用的属性:

<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
converter="myConverter" // <3>
client-factory="clientFactory" // <4>
default-qos="1" // <5>
qos-expression="" // <6>
default-retained="true" // <7>
retained-expression="" // <8>
default-topic="bar" // <9>
topic-expression="" // <10>
async="false" // <11>
async-events="false" // <12>
channel="target" />
xml
  • 客户端 ID。

  • 经纪人 URL。

  • 一个 MqttMessageConverter(可选)。默认的 DefaultPahoMessageConverter 识别以下头信息:

  • mqtt_topic: 消息将发送到的主题
  • mqtt_retained: 如果消息需要保留,则为 true
  • mqtt_qos: 服务质量
  • 客户端工厂。

  • 默认的服务质量。如果未找到 mqtt_qos 头或 qos-expression 返回 null,则使用它。如果你提供了一个自定义的 converter,则不使用它。

  • 用于确定 qos 的表达式。默认是 headers[mqtt_qos]

  • 保留标志的默认值。如果未找到 mqtt_retained 头,则使用它。如果提供了自定义的 converter,则不使用它。

  • 用于确定保留布尔值的表达式。默认是 headers[mqtt_retained]

  • 消息默认发送到的主题(如果未找到 mqtt_topic 头,则使用)。

  • 用于确定目标主题的表达式。默认是 headers['mqtt_topic']

  • 当为 true 时,调用方不会阻塞。而是,在消息发送时等待交付确认。默认是 false(发送会阻塞直到交付被确认)。

  • asyncasync-events 都为 true 时,会发出一个 MqttMessageSentEvent(参见 事件)。它包含消息、主题、客户端库生成的 messageIdclientIdclientInstance(每次客户端连接时递增)。当客户端库确认交付时,会发出一个 MqttMessageDeliveredEvent。它包含 messageIdclientIdclientInstance,使交付可以与 send() 相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,MqttMessageDeliveredEvent 可能在 MqttMessageSentEvent 之前收到。默认是 false

备注

从 4.1 版开始,可以省略 URL。相反,可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。这使得例如连接到高可用性 (HA) 集群成为可能。

使用 Java 配置

下面的 Spring Boot 应用程序展示了如何使用 Java 配置来配置 outbound 适配器:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}

@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {

void sendToMqtt(String data);

}

}
java

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}

}
java

事件

某些应用程序事件由适配器发布。

  • MqttConnectionFailedEvent - 如果连接失败或连接随后丢失,两个适配器都会发布此事件。对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,断开连接的 causenull

  • MqttMessageSentEvent - 在异步模式下运行时,出站适配器在消息已发送后会发布此事件。

  • MqttMessageDeliveredEvent - 在异步模式下运行时,出站适配器在客户端指示消息已送达后会发布此事件。

  • MqttMessageNotDeliveredEvent - 在异步模式下运行时,出站适配器在客户端指示消息未送达后会发布此事件。

  • MqttSubscribedEvent - 入站适配器在订阅主题后会发布此事件。

这些事件可以通过 ApplicationListener<MqttIntegrationEvent> 或带有 @EventListener 方法接收。

要确定事件的来源,使用以下方法;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
java

MQTT v5 支持

从 5.5.5 版本开始,spring-integration-mqtt 模块提供了对 MQTT v5 协议的通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一个 optional 依赖项,因此必须在目标项目中显式包含。

由于 MQTT v5 协议支持在 MQTT 消息中添加额外的任意属性,因此引入了 MqttHeaderMapper 实现以映射发布和接收操作中的头部信息。默认情况下(通过 * 模式),它会映射所有接收到的 PUBLISH 帧属性(包括用户属性)。在 outbound 方面,它为 PUBLISH 帧映射以下头部子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT v5 协议的 outbound 通道适配器以 Mqttv5PahoMessageHandler 的形式存在。它需要一个 clientId 和 MQTT 经纪人 URL 或 MqttConnectionOptions 引用。它支持 MqttClientPersistence 选项,可以是 async 并且在这种情况下可以发出 MqttIntegrationEvent 对象(参见 asyncEvents 选项)。如果请求消息的有效负载是 org.eclipse.paho.mqttv5.common.MqttMessage,它将通过内部的 IMqttAsyncClient 按原样发布。如果有效负载是 byte[],它将按原样用作目标 MqttMessage 有效负载进行发布。如果有效负载是一个 String,它将被转换为 byte[] 进行发布。其余用例则委托给提供的 MessageConverter,它是应用程序上下文中的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean。注意:当请求的消息有效负载已经是 MqttMessage 时,提供的 HeaderMapper<MqttProperties> 不会被使用。以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

// The following Java DSL configuration sample demonstrates how to use this channel adapter in the integration flow:
java

请注意,上述代码注释部分已在文本中给出,实际的 Java DSL 示例代码应根据具体的集成需求编写。

@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());

return f -> f.handle(messageHandler);
}
java
important

org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageHandler 一起使用,因为它的协议仅针对 MQTT v3 协议。

如果在启动时或运行时连接失败,Mqttv5PahoMessageHandler 会在下一个消息产生到此处理程序时尝试重新连接。如果这种手动重新连接失败,连接异常会抛回给调用者。在这种情况下,将应用标准的 Spring Integration 错误处理程序流程,包括请求处理程序建议,例如重试或断路器。

请参阅 Mqttv5PahoMessageHandler 的 javadocs 及其超类以获取更多信息。

MQTT v5 协议的入站通道适配器以 Mqttv5PahoMessageDrivenChannelAdapter 的形式存在。它需要一个 clientId 和 MQTT 经纪人 URL 或 MqttConnectionOptions 引用,以及要订阅和消费的主题。它支持一个 MqttClientPersistence 选项,默认情况下是内存中的。可以配置预期的 payloadType(默认为 byte[]),并且它会被传播到提供的 SmartMessageConverter 以将收到的 MqttMessagebyte[] 进行转换。如果设置了 manualAck 选项,则会向消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头,以生成一个 SimpleAcknowledgment 实例。HeaderMapper<MqttProperties> 用于将 PUBLISH 帧属性(包括用户属性)映射到目标消息标头。标准的 MqttMessage 属性,如 qosiddupretained 以及接收到的主题总是被映射到标头。更多信息请参见 MqttHeaders

从 6.3 版本开始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基于 MqttSubscription 的构造函数,以实现细粒度配置,而不是使用简单的主题名称。当提供这些订阅时,通道适配器的 qos 选项无法使用,因为这样的 qos 模式是 MqttSubscription API 的一部分。

以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);

return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
java
important

org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因为它的协议仅针对 MQTT v3 协议。

请参阅 Mqttv5PahoMessageDrivenChannelAdapter javadocs 及其超类以获取更多信息。

important

建议将 MqttConnectionOptions#setAutomaticReconnect(boolean) 设置为 true,以让内部的 IMqttAsyncClient 实例处理重新连接。否则,只有手动重启 Mqttv5PahoMessageDrivenChannelAdapter 才能处理重新连接,例如通过在断开连接时处理 MqttConnectionFailedEvent

共享 MQTT 客户端支持

如果一个 MQTT ClientID 需要用于多个集成,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数有限制(通常只允许一个连接)。为了在一个客户端中重用不同的通道适配器,可以使用 org.springframework.integration.mqtt.core.ClientManager 组件,并将其传递给任何需要的通道适配器。它将管理 MQTT 连接生命周期,并在需要时进行自动重新连接。此外,还可以向客户端管理器提供自定义连接选项和 MqttClientPersistence,就像目前可以为通道适配器组件所做的那样。

请注意,同时支持 MQTT v5 和 v3 通道适配器。

以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器:

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}

@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
java
备注

从 6.4 版本开始,可以通过 IntegrationFlowContext 使用相应的 ClientManager 在运行时添加多个 MqttPahoMessageDrivenChannelAdapterMqttv5PahoMessageDrivenChannelAdapter 实例

private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
String topic, MessageChannel channel) {
flowContext
.registration(
IntegrationFlow
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
.channel(channel)
.get())
.register();
}
java