跳到主要内容
版本:7.0.2

MQTT 支持

DeepSeek V3 中英对照 MQTT Support

Spring Integration 提供了入站和出站通道适配器,以支持消息队列遥测传输(MQTT)协议。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>7.0.2</version>
</dependency>

当前实现使用了 Eclipse Paho MQTT Client 库。从 6.5 版本开始,org.eclipse.paho:org.eclipse.paho.client.mqttv3 依赖项被标记为 optional 依赖,因此若需支持 MQTT v3 协议,必须在目标项目中显式引入该依赖。

important

XML 配置及本章大部分内容均针对 MQTT v3.1 协议支持及其对应的 Paho 客户端。相关协议支持请参阅 MQTT v5 支持 段落。

两个适配器的配置均通过 DefaultMqttPahoClientFactory 实现。有关配置选项的更多信息,请参阅 Paho 文档。

备注

我们建议配置一个 MqttConnectOptions 对象并将其注入到工厂中,而不是在工厂本身上设置(已弃用的)选项。

入站(消息驱动)通道适配器

入站通道适配器由 MqttPahoMessageDrivenChannelAdapter 实现。为方便起见,您可以使用命名空间进行配置。一个最小配置示例如下:

<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>

以下列表展示了可用的属性:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
topics="bar,baz" // <3>
qos="1,2" // <4>
converter="myConverter" // <5>
client-factory="clientFactory" // <6>
send-timeout="123" // <7>
error-channel="errors" // <8>
recovery-interval="10000" // <9>
manual-acks="false" // <10>
channel="out" />
  • 客户端 ID。

  • 代理 URL。

  • 一个以逗号分隔的主题列表,此适配器将从这些主题接收消息。

  • 一个以逗号分隔的 QoS 值列表。它可以是一个应用于所有主题的单一值,也可以是每个主题对应一个值,在后一种情况下,两个列表的长度必须相同。

  • 一个 MqttMessageConverter(可选)。默认情况下,默认的 DefaultPahoMessageConverter 会生成一个负载为 String 的消息,并包含以下头部信息:

  • mqtt_topic:消息来源的主题
  • mqtt_duplicate:如果消息是重复的,则为 true
  • mqtt_qos:服务质量
    您可以通过将 DefaultPahoMessageConverter 声明为 <bean/> 并将其 payloadAsBytes 属性设置为 true 来配置它以返回原始的 byte[] 作为负载。
  • 客户端工厂。

  • send() 超时时间。仅当通道可能阻塞(例如当前已满的有界 QueueChannel)时才适用。

  • 错误通道。如果提供了此通道,下游异常会以 ErrorMessage 的形式发送到此通道。其负载是一个包含失败消息和原因的 MessagingException

  • 恢复间隔。它控制适配器在失败后尝试重新连接的间隔时间。默认值为 10000ms(十秒)。

  • 确认模式;设置为 true 表示手动确认。

备注

从版本 4.1 开始,你可以省略 URL。相反,你可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。这样做可以实现,例如,连接到高可用性(HA)集群。

从版本 4.2.2 开始,当适配器成功订阅主题时,会发布一个 MqttSubscribedEvent。当连接或订阅失败时,会发布 MqttConnectionFailedEvent 事件。这些事件可以通过实现 ApplicationListener 的 bean 来接收。

此外,新增的 recoveryInterval 属性用于控制适配器在连接失败后尝试重新连接的间隔时间,默认值为 10000ms(十秒)。

备注

在 4.2.3 版本之前,当适配器停止时,客户端总是会取消订阅。这是不正确的,因为如果客户端的 QOS 大于 0,我们需要保持订阅处于活动状态,以便在适配器停止期间到达的消息能在下次启动时被投递。这还需要将客户端工厂的 cleanSession 属性设置为 false。该属性默认值为 true

从 4.2.3 版本开始,如果 cleanSession 属性为 false,适配器(默认情况下)不会取消订阅。

此行为可以通过设置工厂上的 consumerCloseAction 属性来覆盖。该属性可以有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。最后一个值(默认值)仅在 cleanSession 属性为 true 时取消订阅。

要恢复到 4.2.3 版本之前的行为,请使用 UNSUBSCRIBE_ALWAYS

important

从 5.0 版本开始,topicqosretained 属性被映射到 .RECEIVED_…​ 头部(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免无意中传播到出站消息(默认情况下)使用的 MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED 头部。

运行时添加和移除主题

从版本4.1开始,你可以通过编程方式更改适配器订阅的主题。Spring Integration 提供了 addTopic()removeTopic() 方法。在添加主题时,你可以选择性地指定 QoS(默认值:1)。你也可以通过向 <control-bus/> 发送带有适当负载的消息来修改主题,例如:"myMqttAdapter.addTopic('foo', 1)"

停止和启动适配器对主题列表没有影响(不会恢复到配置中的原始设置)。这些更改不会在应用程序上下文生命周期之外保留。新的应用程序上下文会恢复到配置的设置。

当适配器停止(或与代理断开连接)时更改主题,将在下次建立连接时生效。

手动确认

从 5.3 版本开始,你可以将 manualAcks 属性设置为 true。这通常用于异步确认消息传递。当设置为 true 时,消息头(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)会被添加到消息中,其值为一个 SimpleAcknowledgment。你必须调用 acknowledge() 方法来完成消息传递。更多信息请参阅 IMqttClientsetManualAcks()messageArrivedComplete() 的 Javadocs。为了方便使用,提供了一个消息头访问器:

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从版本 5.2.11 开始,当消息转换器在 MqttMessage 转换过程中抛出异常或返回 null 时,MqttPahoMessageDrivenChannelAdapter 会向 errorChannel(如果已配置)发送一个 ErrorMessage。否则,将此转换错误重新抛出到 MQTT 客户端回调中。

使用 Java 配置进行配置

以下Spring Boot应用程序展示了如何使用Java配置来配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}

@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {

@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}

};
}

}

使用 Java DSL 进行配置

以下Spring Boot应用程序展示了如何使用Java DSL配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}

}

出站通道适配器

出站通道适配器由 MqttPahoMessageHandler 实现,该处理器被包装在 ConsumerEndpoint 中。为方便起见,您可以使用命名空间来配置它。

从 4.1 版本开始,适配器支持异步发送操作,避免在确认送达前阻塞。你可以触发应用事件,以便应用在需要时确认送达。

以下列表展示了出站通道适配器可用的属性:

<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
converter="myConverter" // <3>
client-factory="clientFactory" // <4>
default-qos="1" // <5>
qos-expression="" // <6>
default-retained="true" // <7>
retained-expression="" // <8>
default-topic="bar" // <9>
topic-expression="" // <10>
async="false" // <11>
async-events="false" // <12>
channel="target" />
  • 客户端 ID。

  • 代理 URL。

  • 一个 MqttMessageConverter(可选)。默认的 DefaultPahoMessageConverter 识别以下头部信息:

  • mqtt_topic:消息将被发送到的主题
  • mqtt_retained:如果消息需要被保留,则为 true
  • mqtt_qos:服务质量
  • 客户端工厂。

  • 默认的服务质量。如果未找到 mqtt_qos 头部信息或 qos-expression 返回 null,则使用此值。如果提供了自定义的 converter,则不使用此值。

  • 用于确定 qos 的表达式。默认为 headers[mqtt_qos]

  • 保留标志的默认值。如果未找到 mqtt_retained 头部信息,则使用此值。如果提供了自定义的 converter,则不使用此值。

  • 用于确定保留布尔值的表达式。默认为 headers[mqtt_retained]

  • 发送消息的默认主题(在未找到 mqtt_topic 头部信息时使用)。

  • 用于确定目标主题的表达式。默认为 headers['mqtt_topic']

  • 当为 true 时,调用者不会阻塞。相反,它在消息发送时等待投递确认。默认为 false(发送会阻塞,直到投递被确认)。

  • asyncasync-events 都为 true 时,会发出一个 MqttMessageSentEvent(参见事件)。它包含消息、主题、由客户端库生成的 messageIdclientIdclientInstance(每次客户端连接时递增)。当投递被客户端库确认时,会发出一个 MqttMessageDeliveredEvent。它包含 messageIdclientIdclientInstance,使得投递可以与 send() 关联起来。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认为 false

备注

从版本 4.1 开始,可以省略 URL。相反,可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。例如,这支持连接到高可用性(HA)集群。

使用 Java 配置进行配置

以下Spring Boot应用程序展示了如何使用Java配置来配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}

@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {

void sendToMqtt(String data);

}

}

使用 Java DSL 进行配置

以下Spring Boot应用程序提供了一个使用Java DSL配置出站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}

}

事件

某些应用程序事件由适配器发布。

  • MqttConnectionFailedEvent - 当连接失败或后续连接丢失时,两个适配器都会发布此事件。对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时也会触发此事件,此时连接丢失的 causenull

  • MqttMessageSentEvent - 在异步模式下运行时,当消息已发送时,出站适配器会发布此事件。

  • MqttMessageDeliveredEvent - 在异步模式下运行时,当客户端指示消息已送达时,出站适配器会发布此事件。

  • MqttMessageNotDeliveredEvent - 在异步模式下运行时,当客户端指示消息未送达时,出站适配器会发布此事件。

  • MqttSubscribedEvent - 在订阅主题后,入站适配器会发布此事件。

这些事件可以通过 ApplicationListener<MqttIntegrationEvent> 或带有 @EventListener 注解的方法来接收。

要确定事件的来源,请使用以下方法;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从 5.5.5 版本开始,spring-integration-mqtt 模块提供了针对 MQTT v5 协议的通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一个 optional 依赖项,因此必须在目标项目中显式包含。

由于 MQTT v5 协议支持在 MQTT 消息中包含额外的任意属性,因此引入了 MqttHeaderMapper 实现,用于在发布和接收操作中与消息头进行映射。默认情况下(通过 * 模式),它会映射所有接收到的 PUBLISH 帧属性(包括用户属性)。在出站侧,它会为 PUBLISH 帧映射以下消息头子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT v5 协议出站通道适配器以 Mqttv5PahoMessageHandler 的形式存在。它需要一个 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用。它支持 MqttClientPersistence 选项,可以是 async 的,并且在这种情况下可以发出 MqttIntegrationEvent 对象(参见 asyncEvents 选项)。如果请求消息负载是 org.eclipse.paho.mqttv5.common.MqttMessage,则通过内部的 IMqttAsyncClient 按原样发布。如果负载是 byte[],则直接用作目标 MqttMessage 的负载进行发布。如果负载是 String,则将其转换为 byte[] 进行发布。其余用例委托给提供的 MessageConverter,该转换器是来自应用程序上下文的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean。注意:当请求消息负载已经是 MqttMessage 时,提供的 HeaderMapper<MqttProperties> 不会被使用。以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());

return f -> f.handle(messageHandler);
}
important

org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageHandler 一起使用,因为其契约仅针对 MQTT v3 协议。

如果在启动时或运行时连接失败,Mqttv5PahoMessageHandler 会在下一次向此处理器发送消息时尝试重新连接。如果这种手动重连失败,连接异常将被抛回给调用者。在这种情况下,将应用标准的 Spring Integration 错误处理流程,包括请求处理器通知(例如重试或断路器)。

更多信息请参阅 Mqttv5PahoMessageHandler 的 javadocs 及其超类文档。

MQTT v5 协议的入站通道适配器以 Mqttv5PahoMessageDrivenChannelAdapter 的形式存在。它需要一个 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用,以及要订阅和消费的主题。它支持 MqttClientPersistence 选项,默认为内存存储。可以配置预期的 payloadType(默认为 byte[]),并将其传播到提供的 SmartMessageConverter,以便从接收到的 MqttMessagebyte[] 进行转换。如果设置了 manualAck 选项,则会将一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头添加到生成的消息中,作为 SimpleAcknowledgment 的实例。HeaderMapper<MqttProperties> 用于将 PUBLISH 帧属性(包括用户属性)映射到目标消息头中。标准的 MqttMessage 属性,如 qosiddupretained 以及接收到的主题,始终会映射到消息头中。更多信息请参阅 MqttHeaders

从版本 6.3 开始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基于 MqttSubscription 的构造函数,用于细粒度配置,而不是简单的主题名称。当提供了这些订阅时,通道适配器的 qos 选项将无法使用,因为这种 qos 模式是 MqttSubscription API 的一部分。

以下 Java DSL 配置示例展示了如何在集成流中使用此通道适配器:

@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);

return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
important

org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因为它的契约仅针对 MQTT v3 协议。

更多信息请参阅 Mqttv5PahoMessageDrivenChannelAdapter 的 Java 文档及其超类。

important

建议将 MqttConnectionOptions#setAutomaticReconnect(boolean) 设置为 true,以便让内部的 IMqttAsyncClient 实例处理重新连接。否则,只有通过手动重启 Mqttv5PahoMessageDrivenChannelAdapter 才能处理重新连接,例如在断开连接时通过处理 MqttConnectionFailedEvent 事件来实现。

共享 MQTT 客户端支持

如果多个集成需要使用同一个 MQTT ClientID,则无法使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数量有限制(通常只允许单个连接)。若要让单个客户端被不同通道适配器复用,可以使用 org.springframework.integration.mqtt.core.ClientManager 组件,并将其传递给所需的任何通道适配器。该组件将管理 MQTT 连接的生命周期,并在需要时执行自动重连。此外,与当前通道适配器组件的配置方式相同,也可以向客户端管理器提供自定义连接选项和 MqttClientPersistence

请注意,MQTT v5 和 v3 通道适配器均受支持。

以下 Java DSL 配置示例展示了如何在集成流中使用此客户端管理器:

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}

@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
备注

从版本 6.4 开始,现在可以通过 IntegrationFlowContext 使用相应的 ClientManager 在运行时添加多个 MqttPahoMessageDrivenChannelAdapterMqttv5PahoMessageDrivenChannelAdapter 实例。

private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
String topic, MessageChannel channel) {
flowContext
.registration(
IntegrationFlow
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
.channel(channel)
.get())
.register();
}