MQTT 支持
Spring 集成提供入站和出站通道适配器以支持消息队列遥测传输 (MQTT) 协议。
您需要将此依赖项包含在您的项目中:
- 
Maven 
- 
Gradle 
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.4.0</version>
</dependency>compile "org.springframework.integration:spring-integration-mqtt:6.4.0"当前实现使用 Eclipse Paho MQTT 客户端库。
| XML 配置和本章的大部分内容都是关于 MQTT v3.1 协议支持和相应的 Paho 客户端的。 有关相应的协议支持,请参阅 MQTT v5 支持段落。 | 
两个适配器的配置都是使用DefaultMqttPahoClientFactory.
有关配置选项的更多信息,请参阅 Paho 文档。
| 我们建议配置 MqttConnectOptions对象并将其注入工厂,而不是在工厂本身上设置(已弃用的)选项。 | 
入站 (消息驱动) 通道适配器
入站通道适配器由MqttPahoMessageDrivenChannelAdapter.
为方便起见,您可以使用 namespace 对其进行配置。
最小配置可能如下所示:
<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>以下清单显示了可用的属性:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />| 1 | 客户端 ID。 | 
| 2 | 代理 URL。 | 
| 3 | 此适配器从中接收消息的主题的逗号分隔列表。 | 
| 4 | 以逗号分隔的 QoS 值列表。 它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。 | 
| 5 | 一 MqttMessageConverter(可选)。
默认情况下,默认的DefaultPahoMessageConverter生成一条带有Stringpayload 中,其中包含以下标头:
 | 
| 6 | 客户端工厂。 | 
| 7 | 这 send()超时。
仅当通道可能阻塞(例如有界的QueueChannel目前已满)。 | 
| 8 | 错误通道。
下游异常将发送到此通道(如果提供),位于 ErrorMessage.
有效负载是一个MessagingException,其中包含 Failed 消息和 Cause。 | 
| 9 | 恢复间隔。
它控制适配器在发生故障后尝试重新连接的时间间隔。
它默认为 10000ms(10 秒)。 | 
| 10 | 确认模式;设置为 true 以手动确认。 | 
| 从版本 4.1 开始,您可以省略 URL。
相反,您可以在 serverURIs属性的DefaultMqttPahoClientFactory.
例如,这样做可以连接到高可用性 (HA) 集群。 | 
从版本 4.2.2 开始,MqttSubscribedEvent在适配器成功订阅主题时发布。MqttConnectionFailedEvent当连接或订阅失败时,将发布事件。
这些事件可以由实现ApplicationListener.
此外,一个名为recoveryInterval控制适配器在发生故障后尝试重新连接的时间间隔。
它默认为10000ms(10 秒)。
| 在版本 4.2.3 之前,当适配器停止时,客户端始终取消订阅。
这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于活动状态,以便消息到达
当适配器停止时,将在下次启动时交付。
这还需要设置 从版本 4.2.3 开始,如果 可以通过将 要恢复到 4.2.3 之前的行为,请使用 | 
| 从版本 5.0 开始, | 
在运行时添加和删除主题
从版本 4.1 开始,您可以通过编程方式更改适配器订阅的主题。
Spring 集成提供了addTopic()和removeTopic()方法。
添加主题时,您可以选择指定QoS(默认值:1)。
您还可以通过向<control-bus/>具有适当的有效负载 — 例如:"myMqttAdapter.addTopic('foo', 1)".
停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。 这些更改不会在应用程序上下文的生命周期之外保留。 新的应用程序上下文将恢复为配置的设置。
在适配器停止(或与代理断开连接)时更改主题将在下次建立连接时生效。
手动 Acks
从版本 5.3 开始,您可以设置manualAcksproperty 设置为 true。
通常用于异步确认投放。
当设置为true、标题 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 添加到消息中,其值为SimpleAcknowledgment.
您必须调用acknowledge()方法完成投放。
请参阅 Javadocs 以获取IMqttClient setManualAcks()和messageArrivedComplete()了解更多信息。
为方便起见,提供了 header 访问器:
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();从 version 开始5.2.11,当消息转换器引发异常或返回null从MqttMessageconversion、MqttPahoMessageDrivenChannelAdapter发送一个ErrorMessage到errorChannel(如果提供)。
将此转换错误重新引发到 MQTT 客户端回调中。
使用 Java 配置进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }
        };
    }
}使用 Java DSL 进行配置
以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }
    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }
}出站通道适配器
出站通道适配器由MqttPahoMessageHandler,它包装在ConsumerEndpoint.
为方便起见,您可以使用 namespace 对其进行配置。
从版本 4.1 开始,适配器支持异步发送作,避免在确认投放之前阻塞。 如果需要,您可以发出应用程序事件以使应用程序能够确认交付。
下面的清单显示了可用于出站通道适配器的属性:
<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />| 1 | 客户端 ID。 | 
| 2 | 代理 URL。 | 
| 3 | 一 MqttMessageConverter(可选)。
默认的DefaultPahoMessageConverter识别以下标头:
 | 
| 4 | 客户端工厂。 | 
| 5 | 默认服务质量。
如果没有,则使用它 mqtt_qosheader 或qos-expression返回null.
如果您提供自定义converter. | 
| 6 | 用于计算以确定 qos 的表达式。
默认值为 headers[mqtt_qos]. | 
| 7 | 保留标志的默认值。
如果没有,则使用它 mqtt_retainedheader 的 Header。
如果自定义converter。 | 
| 8 | 用于计算以确定保留布尔值的表达式。
默认值为 headers[mqtt_retained]. | 
| 9 | 将消息发送到的默认主题(如果为 no 则使用 mqtt_topicheader 的 Header)。 | 
| 10 | 用于确定目标主题的 VALUE 表达式。
默认值为 headers['mqtt_topic']. | 
| 11 | 什么时候 true,则调用方不会阻止。
相反,它会在发送消息时等待送达确认。
默认值为false(Send 会阻止,直到确认送达为止)。 | 
| 12 | 什么时候 async和async-events都是true一MqttMessageSentEvent(请参阅 事件)。
它包含消息、主题、messageId生成的clientId和clientInstance(每次连接客户端时递增)。
当客户端库确认投放时,MqttMessageDeliveredEvent被触发。
它包含messageId这clientId和clientInstance,使投放与send().
任何ApplicationListener或者事件入站通道适配器可以接收这些事件。
请注意,对于MqttMessageDeliveredEvent在MqttMessageSentEvent.
默认值为false. | 
| 从版本 4.1 开始,可以省略 URL。
相反,可以在 serverURIs属性的DefaultMqttPahoClientFactory.
例如,这将启用与高可用性 (HA) 集群的连接。 | 
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
    }
}使用 Java DSL 进行配置
以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }
       @Bean
       public IntegrationFlow mqttOutboundFlow() {
           return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }
}事件
某些应用程序事件由适配器发布。
- 
MqttConnectionFailedEvent- 如果我们连接失败或随后连接丢失,则由两个适配器发布。 对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,cause的 Lost Connection 是null.
- 
MqttMessageSentEvent- 如果以异步模式运行,则在发送消息时由出站适配器发布。
- 
MqttMessageDeliveredEvent- 如果以异步模式运行,则当客户端指示已传送消息时,由出站适配器发布。
- 
MqttMessageNotDeliveredEvent- 如果在异步模式下运行,则当客户端指示消息尚未传送时,由出站适配器发布。
- 
MqttSubscribedEvent- 由入站适配器在订阅主题后发布。
这些事件可以由ApplicationListener<MqttIntegrationEvent>或使用@EventListener方法。
要确定事件的来源,请使用以下方法;您可以检查 bean 名称和/或 connect 选项(以访问服务器 URI 等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();MQTT v5 支持
从版本 5.5.5 开始,spring-integration-mqtt模块为 MQTT v5 协议提供通道适配器实现。
这org.eclipse.paho:org.eclipse.paho.mqttv5.client是一个optional依赖项,因此必须显式包含在目标项目中。
由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此MqttHeaderMapper引入了 Implement to Map to/from headers on publish 和 receive作。
默认情况下,(通过模式)它会映射所有接收到的*PUBLISH框架属性(包括用户属性)。
在出站端,它将这个标头子集映射到PUBLISH框架:contentType,mqtt_messageExpiryInterval,mqtt_responseTopic,mqtt_correlationData.
MQTT v5 协议的出站通道适配器以Mqttv5PahoMessageHandler.
它需要一个clientId和 MQTT 代理 URL 或MqttConnectionOptions参考。
它支持MqttClientPersistence选项,可以是async并且可以发出MqttIntegrationEvent对象(请参阅asyncEvents选项)。
如果请求消息负载是org.eclipse.paho.mqttv5.common.MqttMessage,它将通过内部的IMqttAsyncClient.
如果负载为byte[]它按原样用于目标MqttMessagepayload 进行发布。
如果有效负载是String它被转换为byte[]以发布。
其余用例委托给提供的MessageConverter这是一个IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverterbean 中。
注意:提供的HeaderMapper<MqttProperties>当请求的消息负载已经是MqttMessage.
以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());
    return f -> f.handle(messageHandler);
}| 这 org.springframework.integration.mqtt.support.MqttMessageConverter不能与Mqttv5PahoMessageHandler因为它的合约仅针对 MQTT v3 协议。 | 
如果连接在启动时或运行时失败,则Mqttv5PahoMessageHandler尝试在生成给此处理程序的下一条消息上重新连接。
如果此手动重新连接失败,则连接异常将抛回到调用方。
在这种情况下,应用标准的 Spring 集成错误处理过程,包括请求处理程序建议,例如重试或断路器。
有关更多信息,请参阅Mqttv5PahoMessageHandlerJavaDocs 及其超类。
MQTT v5 协议的入站通道适配器以Mqttv5PahoMessageDrivenChannelAdapter.
它需要一个clientId和 MQTT 代理 URL 或MqttConnectionOptionsreference 以及要订阅和使用的主题。
它支持MqttClientPersistence选项,默认情况下为 in-memory。
预期的payloadType (byte[])可以配置,并且它会传播到提供的SmartMessageConverter用于转换byte[]的接收MqttMessage.
如果manualAck选项,则IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKheader 添加到消息中,以生成SimpleAcknowledgment.
这HeaderMapper<MqttProperties>用于映射PUBLISHframe 属性(包括用户属性)添加到 Target 消息标头中。
标准MqttMessage属性,例如qos,id,dup,retained以及接收到的主题始终映射到 Headers。
看MqttHeaders了解更多信息。
从版本 6.3 开始,Mqttv5PahoMessageDrivenChannelAdapter提供基于MqttSubscription进行精细配置,而不是简单的主题名称。
提供这些订阅后,qos选项,因为这样的qosmode 是 的一部分MqttSubscription应用程序接口。
以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}| 这 org.springframework.integration.mqtt.support.MqttMessageConverter不能与Mqttv5PahoMessageDrivenChannelAdapter因为它的合约仅针对 MQTT v3 协议。 | 
有关更多信息,请参阅Mqttv5PahoMessageDrivenChannelAdapterJavaDocs 及其超类。
| 建议将 MqttConnectionOptions#setAutomaticReconnect(boolean)设置为 true 以允许内部IMqttAsyncClient实例来处理重新连接。
否则,只有手动重启Mqttv5PahoMessageDrivenChannelAdapter可以处理重新连接,例如 viaMqttConnectionFailedEvent断开连接时的处理。 | 
共享 MQTT 客户端支持
如果多个集成需要一个 MQTT ClientID,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数量有限制(通常允许单个连接)。
为了将单个客户端重新用于不同的通道适配器,一个org.springframework.integration.mqtt.core.ClientManager组件,并将其传递给任何需要的通道适配器。
它将管理 MQTT 连接生命周期,并在需要时自动重新连接。
此外,自定义连接选项和MqttClientPersistence可以提供给 Client Manager,就像当前对 Channel Adapter 组件所做的那样。
请注意,MQTT v5 和 v3 通道适配器均受支持。
以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器:
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}| 从版本 6.4 开始,多个 MqttPahoMessageDrivenChannelAdapter和Mqttv5PahoMessageDrivenChannelAdapter现在可以在运行时使用相应的ClientManager通过IntegrationFlowContext | 
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
                                  String topic, MessageChannel channel) {
    flowContext
        .registration(
            IntegrationFlow
                .from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
                .channel(channel)
                .get())
        .register();
}