MQTT 支持
Spring Integration 提供了入站和出站通道适配器,以支持消息队列遥测传输(MQTT)协议。
此依赖项为项目所需:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>7.0.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:7.0.2"
当前实现使用了 Eclipse Paho MQTT Client 库。从 6.5 版本开始,org.eclipse.paho:org.eclipse.paho.client.mqttv3 依赖项被标记为 optional 依赖,因此若需支持 MQTT v3 协议,必须在目标项目中显式引入该依赖。
XML 配置及本章大部分内容均针对 MQTT v3.1 协议支持及其对应的 Paho 客户端。相关协议支持请参阅 MQTT v5 支持 段落。
两个适配器的配置均通过 DefaultMqttPahoClientFactory 实现。有关配置选项的更多信息,请参阅 Paho 文档。
我们建议配置一个 MqttConnectOptions 对象并将其注入到工厂中,而不是在工厂本身上设置(已弃用的)选项。
入站(消息驱动)通道适配器
入站通道适配器由 MqttPahoMessageDrivenChannelAdapter 实现。为方便起见,您可以使用命名空间进行配置。一个最小配置示例如下:
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
以下列表展示了可用的属性:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
topics="bar,baz" // <3>
qos="1,2" // <4>
converter="myConverter" // <5>
client-factory="clientFactory" // <6>
send-timeout="123" // <7>
error-channel="errors" // <8>
recovery-interval="10000" // <9>
manual-acks="false" // <10>
channel="out" />
客户端 ID。
代理 URL。
一个以逗号分隔的主题列表,此适配器将从这些主题接收消息。
一个以逗号分隔的 QoS 值列表。它可以是一个应用于所有主题的单一值,也可以是每个主题对应一个值,在后一种情况下,两个列表的长度必须相同。
一个
MqttMessageConverter(可选)。默认情况下,默认的DefaultPahoMessageConverter会生成一个负载为String的消息,并包含以下头部信息:mqtt_topic:消息来源的主题mqtt_duplicate:如果消息是重复的,则为truemqtt_qos:服务质量
您可以通过将DefaultPahoMessageConverter声明为<bean/>并将其payloadAsBytes属性设置为true来配置它以返回原始的byte[]作为负载。客户端工厂。
send()超时时间。仅当通道可能阻塞(例如当前已满的有界QueueChannel)时才适用。错误通道。如果提供了此通道,下游异常会以
ErrorMessage的形式发送到此通道。其负载是一个包含失败消息和原因的MessagingException。恢复间隔。它控制适配器在失败后尝试重新连接的间隔时间。默认值为
10000ms(十秒)。确认模式;设置为 true 表示手动确认。
从版本 4.1 开始,你可以省略 URL。相反,你可以在 DefaultMqttPahoClientFactory 的 serverURIs 属性中提供服务器 URI。这样做可以实现,例如,连接到高可用性(HA)集群。
从版本 4.2.2 开始,当适配器成功订阅主题时,会发布一个 MqttSubscribedEvent。当连接或订阅失败时,会发布 MqttConnectionFailedEvent 事件。这些事件可以通过实现 ApplicationListener 的 bean 来接收。
此外,新增的 recoveryInterval 属性用于控制适配器在连接失败后尝试重新连接的间隔时间,默认值为 10000ms(十秒)。
在 4.2.3 版本之前,当适配器停止时,客户端总是会取消订阅。这是不正确的,因为如果客户端的 QOS 大于 0,我们需要保持订阅处于活动状态,以便在适配器停止期间到达的消息能在下次启动时被投递。这还需要将客户端工厂的 cleanSession 属性设置为 false。该属性默认值为 true。
从 4.2.3 版本开始,如果 cleanSession 属性为 false,适配器(默认情况下)不会取消订阅。
此行为可以通过设置工厂上的 consumerCloseAction 属性来覆盖。该属性可以有以下值:UNSUBSCRIBE_ALWAYS、UNSUBSCRIBE_NEVER 和 UNSUBSCRIBE_CLEAN。最后一个值(默认值)仅在 cleanSession 属性为 true 时取消订阅。
要恢复到 4.2.3 版本之前的行为,请使用 UNSUBSCRIBE_ALWAYS。
从 5.0 版本开始,topic、qos 和 retained 属性被映射到 .RECEIVED_… 头部(MqttHeaders.RECEIVED_TOPIC、MqttHeaders.RECEIVED_QOS 和 MqttHeaders.RECEIVED_RETAINED),以避免无意中传播到出站消息(默认情况下)使用的 MqttHeaders.TOPIC、MqttHeaders.QOS 和 MqttHeaders.RETAINED 头部。
运行时添加和移除主题
从版本4.1开始,你可以通过编程方式更改适配器订阅的主题。Spring Integration 提供了 addTopic() 和 removeTopic() 方法。在添加主题时,你可以选择性地指定 QoS(默认值:1)。你也可以通过向 <control-bus/> 发送带有适当负载的消息来修改主题,例如:"myMqttAdapter.addTopic('foo', 1)"。
停止和启动适配器对主题列表没有影响(不会恢复到配置中的原始设置)。这些更改不会在应用程序上下文生命周期之外保留。新的应用程序上下文会恢复到配置的设置。
当适配器停止(或与代理断开连接)时更改主题,将在下次建立连接时生效。
手动确认
从 5.3 版本开始,你可以将 manualAcks 属性设置为 true。这通常用于异步确认消息传递。当设置为 true 时,消息头(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)会被添加到消息中,其值为一个 SimpleAcknowledgment。你必须调用 acknowledge() 方法来完成消息传递。更多信息请参阅 IMqttClient 的 setManualAcks() 和 messageArrivedComplete() 的 Javadocs。为了方便使用,提供了一个消息头访问器:
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
从版本 5.2.11 开始,当消息转换器在 MqttMessage 转换过程中抛出异常或返回 null 时,MqttPahoMessageDrivenChannelAdapter 会向 errorChannel(如果已配置)发送一个 ErrorMessage。否则,将此转换错误重新抛出到 MQTT 客户端回调中。
使用 Java 配置进行配置
以下Spring Boot应用程序展示了如何使用Java配置来配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下Spring Boot应用程序展示了如何使用Java DSL配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
出站通道适配器
出站通道适配器由 MqttPahoMessageHandler 实现,该处理器被包装在 ConsumerEndpoint 中。为方便起见,您可以使用命名空间来配置它。
从 4.1 版本开始,适配器支持异步发送操作,避免在确认送达前阻塞。你可以触发应用事件,以便应用在需要时确认送达。
以下列表展示了出站通道适配器可用的属性:
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" // <1>
url="tcp://localhost:1883" // <2>
converter="myConverter" // <3>
client-factory="clientFactory" // <4>
default-qos="1" // <5>
qos-expression="" // <6>
default-retained="true" // <7>
retained-expression="" // <8>
default-topic="bar" // <9>
topic-expression="" // <10>
async="false" // <11>
async-events="false" // <12>
channel="target" />
客户端 ID。
代理 URL。
一个
MqttMessageConverter(可选)。默认的DefaultPahoMessageConverter识别以下头部信息:mqtt_topic:消息将被发送到的主题mqtt_retained:如果消息需要被保留,则为truemqtt_qos:服务质量客户端工厂。
默认的服务质量。如果未找到
mqtt_qos头部信息或qos-expression返回null,则使用此值。如果提供了自定义的converter,则不使用此值。用于确定 qos 的表达式。默认为
headers[mqtt_qos]。保留标志的默认值。如果未找到
mqtt_retained头部信息,则使用此值。如果提供了自定义的converter,则不使用此值。用于确定保留布尔值的表达式。默认为
headers[mqtt_retained]。发送消息的默认主题(在未找到
mqtt_topic头部信息时使用)。用于确定目标主题的表达式。默认为
headers['mqtt_topic']。当为
true时,调用者不会阻塞。相反,它在消息发送时等待投递确认。默认为false(发送会阻塞,直到投递被确认)。当
async和async-events都为true时,会发出一个MqttMessageSentEvent(参见事件)。它包含消息、主题、由客户端库生成的messageId、clientId和clientInstance(每次客户端连接时递增)。当投递被客户端库确认时,会发出一个MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使得投递可以与send()关联起来。任何ApplicationListener或事件入站通道适配器都可以接收这些事件。请注意,有可能在MqttMessageSentEvent之前接收到MqttMessageDeliveredEvent。默认为false。
从版本 4.1 开始,可以省略 URL。相反,可以在 DefaultMqttPahoClientFactory 的 serverURIs 属性中提供服务器 URI。例如,这支持连接到高可用性(HA)集群。
使用 Java 配置进行配置
以下Spring Boot应用程序展示了如何使用Java配置来配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
使用 Java DSL 进行配置
以下Spring Boot应用程序提供了一个使用Java DSL配置出站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
事件
某些应用程序事件由适配器发布。
-
MqttConnectionFailedEvent- 当连接失败或后续连接丢失时,两个适配器都会发布此事件。对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时也会触发此事件,此时连接丢失的cause为null。 -
MqttMessageSentEvent- 在异步模式下运行时,当消息已发送时,出站适配器会发布此事件。 -
MqttMessageDeliveredEvent- 在异步模式下运行时,当客户端指示消息已送达时,出站适配器会发布此事件。 -
MqttMessageNotDeliveredEvent- 在异步模式下运行时,当客户端指示消息未送达时,出站适配器会发布此事件。 -
MqttSubscribedEvent- 在订阅主题后,入站适配器会发布此事件。
这些事件可以通过 ApplicationListener<MqttIntegrationEvent> 或带有 @EventListener 注解的方法来接收。
要确定事件的来源,请使用以下方法;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTT v5 支持
从 5.5.5 版本开始,spring-integration-mqtt 模块提供了针对 MQTT v5 协议的通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一个 optional 依赖项,因此必须在目标项目中显式包含。
由于 MQTT v5 协议支持在 MQTT 消息中包含额外的任意属性,因此引入了 MqttHeaderMapper 实现,用于在发布和接收操作中与消息头进行映射。默认情况下(通过 * 模式),它会映射所有接收到的 PUBLISH 帧属性(包括用户属性)。在出站侧,它会为 PUBLISH 帧映射以下消息头子集:contentType、mqtt_messageExpiryInterval、mqtt_responseTopic、mqtt_correlationData。
MQTT v5 协议出站通道适配器以 Mqttv5PahoMessageHandler 的形式存在。它需要一个 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用。它支持 MqttClientPersistence 选项,可以是 async 的,并且在这种情况下可以发出 MqttIntegrationEvent 对象(参见 asyncEvents 选项)。如果请求消息负载是 org.eclipse.paho.mqttv5.common.MqttMessage,则通过内部的 IMqttAsyncClient 按原样发布。如果负载是 byte[],则直接用作目标 MqttMessage 的负载进行发布。如果负载是 String,则将其转换为 byte[] 进行发布。其余用例委托给提供的 MessageConverter,该转换器是来自应用程序上下文的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean。注意:当请求消息负载已经是 MqttMessage 时,提供的 HeaderMapper<MqttProperties> 不会被使用。以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageHandler 一起使用,因为其契约仅针对 MQTT v3 协议。
如果在启动时或运行时连接失败,Mqttv5PahoMessageHandler 会在下一次向此处理器发送消息时尝试重新连接。如果这种手动重连失败,连接异常将被抛回给调用者。在这种情况下,将应用标准的 Spring Integration 错误处理流程,包括请求处理器通知(例如重试或断路器)。
更多信息请参阅 Mqttv5PahoMessageHandler 的 javadocs 及其超类文档。
MQTT v5 协议的入站通道适配器以 Mqttv5PahoMessageDrivenChannelAdapter 的形式存在。它需要一个 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用,以及要订阅和消费的主题。它支持 MqttClientPersistence 选项,默认为内存存储。可以配置预期的 payloadType(默认为 byte[]),并将其传播到提供的 SmartMessageConverter,以便从接收到的 MqttMessage 的 byte[] 进行转换。如果设置了 manualAck 选项,则会将一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头添加到生成的消息中,作为 SimpleAcknowledgment 的实例。HeaderMapper<MqttProperties> 用于将 PUBLISH 帧属性(包括用户属性)映射到目标消息头中。标准的 MqttMessage 属性,如 qos、id、dup、retained 以及接收到的主题,始终会映射到消息头中。更多信息请参阅 MqttHeaders。
从版本 6.3 开始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基于 MqttSubscription 的构造函数,用于细粒度配置,而不是简单的主题名称。当提供了这些订阅时,通道适配器的 qos 选项将无法使用,因为这种 qos 模式是 MqttSubscription API 的一部分。
以下 Java DSL 配置示例展示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因为它的契约仅针对 MQTT v3 协议。
更多信息请参阅 Mqttv5PahoMessageDrivenChannelAdapter 的 Java 文档及其超类。
建议将 MqttConnectionOptions#setAutomaticReconnect(boolean) 设置为 true,以便让内部的 IMqttAsyncClient 实例处理重新连接。否则,只有通过手动重启 Mqttv5PahoMessageDrivenChannelAdapter 才能处理重新连接,例如在断开连接时通过处理 MqttConnectionFailedEvent 事件来实现。
共享 MQTT 客户端支持
如果多个集成需要使用同一个 MQTT ClientID,则无法使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数量有限制(通常只允许单个连接)。若要让单个客户端被不同通道适配器复用,可以使用 org.springframework.integration.mqtt.core.ClientManager 组件,并将其传递给所需的任何通道适配器。该组件将管理 MQTT 连接的生命周期,并在需要时执行自动重连。此外,与当前通道适配器组件的配置方式相同,也可以向客户端管理器提供自定义连接选项和 MqttClientPersistence。
请注意,MQTT v5 和 v3 通道适配器均受支持。
以下 Java DSL 配置示例展示了如何在集成流中使用此客户端管理器:
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
从版本 6.4 开始,现在可以通过 IntegrationFlowContext 使用相应的 ClientManager 在运行时添加多个 MqttPahoMessageDrivenChannelAdapter 和 Mqttv5PahoMessageDrivenChannelAdapter 实例。
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
String topic, MessageChannel channel) {
flowContext
.registration(
IntegrationFlow
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
.channel(channel)
.get())
.register();
}