小伙伴们,你们好呀,我是老寇,跟我一起学习对接MQTT
安装EMQX
采用docker-compose一键式启动!!!
还没有安装docker朋友,参考文章下面两篇文章
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
services: emqx: image: emqx/emqx:5.4.1 container_name: emqx # 保持容器在没有守护程序的情况下运行 tty: true restart: always privileged: true ports: - "1883:1883" - "8083:8083" - "8883:8883" - "18083:18083" environment: - TZ=Asia/Shanghai volumes: # 挂载数据存储 - ./emqx/data:/opt/emqx/data # 挂载日志文件 - ./emqx/log:/opt/emqx/log networks: - laokou_network networks: laokou_network: driver: bridge
访问 http://127.0.0.1:18083 设置密码

EMQX MQTT【摘抄自官方文档】
MQTT 是物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极轻量的发布/订阅消息传输协议,非常适合以较小的代码占用空间和极低的网络带宽连接远程设备。MQTT 目前广泛应用于汽车、制造、电信、石油和天然气等众多行业。
EMQX 完全兼容 MQTT 5.0 和 3.x,本节将介绍 MQTT 相关功能的基本配置项,包括基本 MQTT 设置、订阅设置、会话设置、强制关闭设置和强制垃圾回收设置等
客户端对接
本文章采用三种客户端对接
| 维度 | Paho | Hivemq-MQTT-Client | Vert.x MQTT Client |
|---|---|---|---|
| 协议支持 | MQTT 3.1.1(5.0 实验性) | MQTT 5.0 完整支持 | MQTT 5.0(较新版本) |
| 性能 | 中(同步模式) | 高(异步非阻塞) | 极高(响应式架构) |
| 依赖复杂度 | 低 | 中(仅 Netty) | 高(需 Vert.x 生态) |
| 社区资源 | 丰富 | 较少 | 中等 |
| 适用场景 | 传统 IoT、跨语言项目 | 企业级 MQTT 5.0、高吞吐 | 响应式系统、高并发微服务 |
Paho【不推荐,连接不稳定】
引入依赖
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.mqttv5.client</artifactId> <version>1.2.5</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> </dependencies>
项目集成
PahoProperties
/** * @author laokou */ @Data public class PahoProperties { private boolean auth = true; private String username = "emqx"; private String password = "laokou123"; private String host = "127.0.0.1"; private int port = 1883; private String clientId; private int subscribeQos = 1; private int publishQos = 0; private int willQos = 1; private int connectionTimeout = 60; private boolean manualAcks = false; // @formatter:off /** * 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息. * clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息. * <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a> */ // @formatter:on private boolean clearStart = false; private int receiveMaximum = 10000; private int maximumPacketSize = 10000; // @formatter:off /** * 默认会话保留一天. * 最大值,4294967295L,会话过期时间【永不过期,单位秒】. * 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效). */ private long sessionExpiryInterval = 86400L; // @formatter:on /** * 心跳包每隔60秒发一次. */ private int keepAliveInterval = 60; private boolean automaticReconnect = true; private Set<String> topics = new HashSet<>(0); }
PahoMqttClientMessageCallbackV5
/** * @author laokou */ @Slf4j @RequiredArgsConstructor public class PahoMqttClientMessageCallbackV5 implements MqttCallback { private final List<MessageHandler> messageHandlers; @Override public void disconnected(MqttDisconnectResponse disconnectResponse) { log.error("【Paho-V5】 => MQTT关闭连接"); } @Override public void mqttErrorOccurred(MqttException ex) { log.error("【Paho-V5】 => MQTT报错,错误信息:{}", ex.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) { for (MessageHandler messageHandler : messageHandlers) { if (messageHandler.isSubscribe(topic)) { log.info("【Paho-V5】 => MQTT接收到消息,Topic:{}", topic); messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic)); } } } @Override public void deliveryComplete(IMqttToken token) { log.info("【Paho-V5】 => MQTT消息发送成功,消息ID:{}", token.getMessageId()); } @Override public void connectComplete(boolean reconnect, String uri) { if (reconnect) { log.info("【Paho-V5】 => MQTT重连成功,URI:{}", uri); } else { log.info("【Paho-V5】 => MQTT建立连接,URI:{}", uri); } } @Override public void authPacketArrived(int reasonCode, MqttProperties properties) { log.info("【Paho-V5】 => 接收到身份验证数据包:{}", reasonCode); } }
PahoV5MqttClientTest
/** * @author laokou */ @SpringBootTest @RequiredArgsConstructor @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) class PahoV5MqttClientTest { private final List<MessageHandler> messageHandlers; @Test void testMqttClient() throws InterruptedException { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16); PahoProperties pahoProperties = new PahoProperties(); pahoProperties.setClientId("test-client-3"); pahoProperties.setTopics(Set.of("/test-topic-3/#")); PahoMqttClientV5 pahoMqttClientV5 = new PahoMqttClientV5(pahoProperties, messageHandlers, scheduledExecutorService); pahoMqttClientV5.open(); Thread.sleep(1000); pahoMqttClientV5.publish("/test-topic-3/789", "Hello World789".getBytes()); } }
PahoMqttClientMessageCallbackV3
/** * @author laokou */ @Slf4j @RequiredArgsConstructor public class PahoMqttClientMessageCallbackV3 implements MqttCallback { private final List<MessageHandler> messageHandlers; @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("【Paho-V3】 => MQTT消息发送成功,消息ID:{}", iMqttDeliveryToken.getMessageId()); } @Override public void connectionLost(Throwable throwable) { log.error("【Paho-V3】 => MQTT关闭连接"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { for (MessageHandler messageHandler : messageHandlers) { if (messageHandler.isSubscribe(topic)) { log.info("【Paho-V3】 => MQTT接收到消息,Topic:{}", topic); messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic)); } } } }
PahoV3MqttClientTest
/** * @author laokou */ @SpringBootTest @RequiredArgsConstructor @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) class PahoV3MqttClientTest { private final List<MessageHandler> messageHandlers; @Test void testMqttClient() throws InterruptedException { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16); PahoProperties pahoProperties2 = new PahoProperties(); pahoProperties2.setClientId("test-client-4"); pahoProperties2.setTopics(Set.of("/test-topic-4/#")); PahoMqttClientV3 pahoMqttClientV3 = new PahoMqttClientV3(pahoProperties2, messageHandlers, scheduledExecutorService); pahoMqttClientV3.open(); Thread.sleep(1000); pahoMqttClientV3.publish("/test-topic-4/000", "Hello World000".getBytes()); } }
Hivemq-MQTT-Client【不推荐】
注意:订阅一段时间收不到数据,标准mqtt5.0协议,不兼容emqx broker mqtt5.0
引入依赖
<dependencies> <dependency> <groupId>com.hivemq</groupId> <artifactId>hivemq-mqtt-client-reactor</artifactId> <version>1.3.5</version> </dependency> <dependency> <groupId>com.hivemq</groupId> <artifactId>hivemq-mqtt-client-epoll</artifactId> <version>1.3.5</version> <type>pom</type> </dependency> <dependencies>
项目集成
HivemqProperties
/** * @author laokou */ @Data public class HivemqProperties { private boolean auth = true; private String username = "emqx"; private String password = "laokou123"; private String host = "127.0.0.1"; private int port = 1883; private String clientId; private int subscribeQos = 1; private int publishQos = 0; private int willQos = 1; // @formatter:off /** * 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息. * clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息. * <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a> */ // @formatter:on private boolean clearStart = false; private int receiveMaximum = 10000; private int sendMaximum = 10000; private int maximumPacketSize = 10000; private int sendMaximumPacketSize = 10000; private int topicAliasMaximum = 1024; private int sendTopicAliasMaximum = 2048; private long messageExpiryInterval = 86400L; private boolean requestProblemInformation = true; private boolean requestResponseInformation = true; // @formatter:off /** * 默认会话保留一天. * 最大值,4294967295L,会话过期时间【永不过期,单位秒】. * 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效). */ private long sessionExpiryInterval = 86400L; // @formatter:on /** * 心跳包每隔60秒发一次. */ private int keepAliveInterval = 60; private boolean automaticReconnect = true; private long automaticReconnectMaxDelay = 5; private long automaticReconnectInitialDelay = 1; private Set<String> topics = new HashSet<>(0); private int nettyThreads = 32; private boolean retain = false; private boolean noLocal = false; }
HivemqClientV5
/** * @author laokou */ @Slf4j public class HivemqClientV5 { /** * 响应主题. */ private final String RESPONSE_TOPIC = "response/topic"; /** * 服务下线数据. */ private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8); /** * 相关数据. */ private final byte[] CORRELATION_DATA = "correlationData".getBytes(UTF_8); private final HivemqProperties hivemqProperties; private final List<MessageHandler> messageHandlers; private volatile Mqtt5RxClient client; private final Object lock = new Object(); private volatile Disposable connectDisposable; private volatile Disposable subscribeDisposable; private volatile Disposable unSubscribeDisposable; private volatile Disposable publishDisposable; private volatile Disposable disconnectDisposable; private volatile Disposable consumeDisposable; public HivemqClientV5(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) { this.hivemqProperties = hivemqProperties; this.messageHandlers = messageHandlers; } public void open() { if (Objects.isNull(client)) { synchronized (lock) { if (Objects.isNull(client)) { client = getMqtt5ClientBuilder().buildRx(); } } } connect(); consume(); } public void close() { if (!Objects.isNull(client)) { disconnectDisposable = client.disconnectWith() .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()) .applyDisconnect() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(() -> log.info("【Hivemq-V5】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()), e -> log.error("【Hivemq-V5】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e)); } } public void subscribe() { String[] topics = getTopics(); subscribe(topics, getQosArray(topics)); } public String[] getTopics() { return hivemqProperties.getTopics().toArray(String[]::new); } public int[] getQosArray(String[] topics) { return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray(); } public void subscribe(String[] topics, int[] qosArray) { checkTopicAndQos(topics, qosArray); if (!Objects.isNull(client)) { List<Mqtt5Subscription> subscriptions = new ArrayList<>(topics.length); for (int i = 0; i < topics.length; i++) { subscriptions.add(Mqtt5Subscription.builder() .topicFilter(topics[i]) .qos(getMqttQos(qosArray[i])) .retainAsPublished(hivemqProperties.isRetain()) .noLocal(hivemqProperties.isNoLocal()) .build()); } subscribeDisposable = client.subscribeWith() .addSubscriptions(subscriptions) .applySubscribe() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log .error("【Hivemq-V5】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e)); } } public void unSubscribe() { String[] topics = hivemqProperties.getTopics().toArray(String[]::new); unSubscribe(topics); } public void unSubscribe(String[] topics) { checkTopic(topics); if (!Objects.isNull(client)) { List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length); for (String topic : topics) { matchedTopics.add(MqttTopicFilter.of(topic)); } unSubscribeDisposable = client.unsubscribeWith() .addTopicFilters(matchedTopics) .applyUnsubscribe() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log .error("【Hivemq-V5】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e)); } } public void publish(String topic, byte[] payload, int qos) { if (!Objects.isNull(client)) { publishDisposable = client .publish(Flowable.just(Mqtt5Publish.builder() .topic(topic) .qos(getMqttQos(qos)) .payload(payload) .noMessageExpiry() .retain(hivemqProperties.isRetain()) .messageExpiryInterval(hivemqProperties.getMessageExpiryInterval()) .correlationData(CORRELATION_DATA) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .contentType("text/plain") .responseTopic(RESPONSE_TOPIC) .build())) .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT消息发布成功,topic:{}", topic), e -> log.error("【Hivemq-V5】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e)); } } public void publish(String topic, byte[] payload) { publish(topic, payload, hivemqProperties.getPublishQos()); } public void dispose(Disposable disposable) { if (!Objects.isNull(disposable) && !disposable.isDisposed()) { // 显式取消订阅 disposable.dispose(); } } public void dispose() { dispose(connectDisposable); dispose(subscribeDisposable); dispose(unSubscribeDisposable); dispose(publishDisposable); dispose(consumeDisposable); dispose(disconnectDisposable); } public void reSubscribe() { log.info("【Hivemq-V5】 => MQTT重新订阅开始"); dispose(subscribeDisposable); subscribe(); log.info("【Hivemq-V5】 => MQTT重新订阅结束"); } private MqttQos getMqttQos(int qos) { return MqttQos.fromCode(qos); } private void connect() { connectDisposable = client.connectWith() .keepAlive(hivemqProperties.getKeepAliveInterval()) .cleanStart(hivemqProperties.isClearStart()) .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()) .willPublish() .topic("will/topic") .payload(WILL_PAYLOAD) .qos(getMqttQos(hivemqProperties.getWillQos())) .retain(true) .messageExpiryInterval(100) .delayInterval(10) .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) .contentType("text/plain") .responseTopic(RESPONSE_TOPIC) .correlationData(CORRELATION_DATA) .applyWillPublish() .restrictions() .receiveMaximum(hivemqProperties.getReceiveMaximum()) .sendMaximum(hivemqProperties.getSendMaximum()) .maximumPacketSize(hivemqProperties.getMaximumPacketSize()) .sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize()) .topicAliasMaximum(hivemqProperties.getTopicAliasMaximum()) .sendTopicAliasMaximum(hivemqProperties.getSendTopicAliasMaximum()) .requestProblemInformation(hivemqProperties.isRequestProblemInformation()) .requestResponseInformation(hivemqProperties.isRequestResponseInformation()) .applyRestrictions() .applyConnect() .toFlowable() .firstElement() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe( ack -> log.info("【Hivemq-V5】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(), hivemqProperties.getPort(), hivemqProperties.getClientId()), e -> log.error("【Hivemq-V5】 => MQTT连接失败,错误信息:{}", e.getMessage(), e)); } private void consume() { if (!Objects.isNull(client)) { consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL) .onBackpressureBuffer(8192) .observeOn(Schedulers.computation(), false, 8192) .doOnSubscribe(subscribe -> { log.info("【Hivemq-V5】 => MQTT开始订阅消息,请稍候。。。。。。"); reSubscribe(); }) .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(publish -> { for (MessageHandler messageHandler : messageHandlers) { if (messageHandler.isSubscribe(publish.getTopic().toString())) { log.info("【Hivemq-V5】 => MQTT接收到消息,Topic:{}", publish.getTopic()); messageHandler .handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString())); } } }, e -> log.error("【Hivemq-V5】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e), () -> log.info("【Hivemq-V5】 => MQTT订阅消息结束,请稍候。。。。。。")); } } private Mqtt5ClientBuilder getMqtt5ClientBuilder() { Mqtt5ClientBuilder builder = Mqtt5Client.builder().addConnectedListener(listener -> { Optional<? extends MqttClientConnectionConfig> config = Optional .of(listener.getClientConfig().getConnectionConfig()) .get(); config.ifPresent(mqttClientConnectionConfig -> log.info("【Hivemq-V5】 => MQTT连接保持时间:{}ms", mqttClientConnectionConfig.getKeepAlive())); log.info("【Hivemq-V5】 => MQTT已连接,客户端ID:{}", hivemqProperties.getClientId()); }) .addDisconnectedListener( listener -> log.error("【Hivemq-V5】 => MQTT已断开连接,客户端ID:{}", hivemqProperties.getClientId())) .identifier(hivemqProperties.getClientId()) .serverHost(hivemqProperties.getHost()) .serverPort(hivemqProperties.getPort()) .executorConfig(MqttClientExecutorConfig.builder() .nettyExecutor(ThreadUtils.newVirtualTaskExecutor()) .nettyThreads(hivemqProperties.getNettyThreads()) .applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor())) .build()); // 开启重连 if (hivemqProperties.isAutomaticReconnect()) { builder.automaticReconnect() .initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS) .maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS) .applyAutomaticReconnect(); } if (hivemqProperties.isAuth()) { builder.simpleAuth() .username(hivemqProperties.getUsername()) .password(hivemqProperties.getPassword().getBytes()) .applySimpleAuth(); } return builder; } private void checkTopicAndQos(String[] topics, int[] qosArray) { if (topics == null || qosArray == null) { throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays cannot be null"); } if (topics.length != qosArray.length) { throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays must have the same length"); } if (topics.length == 0) { throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be empty"); } } private void checkTopic(String[] topics) { if (topics.length == 0) { throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be empty"); } } }
HivemqV5MqttClientTest
/** * @author laokou */ @SpringBootTest @RequiredArgsConstructor @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) class HivemqV5MqttClientTest { private final List<MessageHandler> messageHandlers; @Test void testMqttClient() throws InterruptedException { HivemqProperties hivemqProperties = new HivemqProperties(); hivemqProperties.setClientId("test-client-1"); hivemqProperties.setTopics(Set.of("/test-topic-1/#")); HivemqClientV5 hivemqClientV5 = new HivemqClientV5(hivemqProperties, messageHandlers); hivemqClientV5.open(); hivemqClientV5.publish("/test-topic-1/123", "Hello World123".getBytes()); } }
HivemqClientV3
/** * @author laokou */ @Slf4j public class HivemqClientV3 { /** * 服务下线数据. */ private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8); private final HivemqProperties hivemqProperties; private final List<MessageHandler> messageHandlers; private volatile Mqtt3RxClient client; private final Object lock = new Object(); private volatile Disposable connectDisposable; private volatile Disposable subscribeDisposable; private volatile Disposable unSubscribeDisposable; private volatile Disposable publishDisposable; private volatile Disposable disconnectDisposable; private volatile Disposable consumeDisposable; public HivemqClientV3(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) { this.hivemqProperties = hivemqProperties; this.messageHandlers = messageHandlers; } public void open() { if (Objects.isNull(client)) { synchronized (lock) { if (Objects.isNull(client)) { client = getMqtt3ClientBuilder().buildRx(); } } } connect(); consume(); } public void close() { if (!Objects.isNull(client)) { disconnectDisposable = client.disconnect() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(() -> log.info("【Hivemq-V3】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()), e -> log.error("【Hivemq-V3】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e)); } } public void subscribe() { String[] topics = getTopics(); subscribe(topics, getQosArray(topics)); } public String[] getTopics() { return hivemqProperties.getTopics().toArray(String[]::new); } public int[] getQosArray(String[] topics) { return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray(); } public void subscribe(String[] topics, int[] qosArray) { checkTopicAndQos(topics, qosArray); if (!Objects.isNull(client)) { List<Mqtt3Subscription> subscriptions = new ArrayList<>(topics.length); for (int i = 0; i < topics.length; i++) { subscriptions.add(Mqtt3Subscription.builder() .topicFilter(topics[i]) .qos(getMqttQos(qosArray[i])) .build()); } subscribeDisposable = client.subscribeWith() .addSubscriptions(subscriptions) .applySubscribe() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V3】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log .error("【Hivemq-V3】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e)); } } public void unSubscribe() { String[] topics = hivemqProperties.getTopics().toArray(String[]::new); unSubscribe(topics); } public void unSubscribe(String[] topics) { checkTopic(topics); if (!Objects.isNull(client)) { List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length); for (String topic : topics) { matchedTopics.add(MqttTopicFilter.of(topic)); } unSubscribeDisposable = client.unsubscribeWith() .addTopicFilters(matchedTopics) .applyUnsubscribe() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(() -> log.info("【Hivemq-V3】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log .error("【Hivemq-V3】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e)); } } public void publish(String topic, byte[] payload, int qos) { if (!Objects.isNull(client)) { publishDisposable = client .publish(Flowable.just(Mqtt3Publish.builder() .topic(topic) .qos(getMqttQos(qos)) .payload(payload) .retain(hivemqProperties.isRetain()) .build())) .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(ack -> log.info("【Hivemq-V3】 => MQTT消息发布成功,topic:{}", topic), e -> log.error("【Hivemq-V3】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e)); } } public void publish(String topic, byte[] payload) { publish(topic, payload, hivemqProperties.getPublishQos()); } public void dispose(Disposable disposable) { if (!Objects.isNull(disposable) && !disposable.isDisposed()) { // 显式取消订阅 disposable.dispose(); } } public void dispose() { dispose(connectDisposable); dispose(subscribeDisposable); dispose(unSubscribeDisposable); dispose(publishDisposable); dispose(consumeDisposable); dispose(disconnectDisposable); } public void reSubscribe() { log.info("【Hivemq-V3】 => MQTT重新订阅开始"); dispose(subscribeDisposable); subscribe(); log.info("【Hivemq-V3】 => MQTT重新订阅结束"); } private MqttQos getMqttQos(int qos) { return MqttQos.fromCode(qos); } private void connect() { connectDisposable = client.connectWith() .keepAlive(hivemqProperties.getKeepAliveInterval()) .willPublish() .topic("will/topic") .payload(WILL_PAYLOAD) .qos(getMqttQos(hivemqProperties.getWillQos())) .retain(true) .applyWillPublish() .restrictions() .sendMaximum(hivemqProperties.getSendMaximum()) .sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize()) .applyRestrictions() .applyConnect() .toFlowable() .firstElement() .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe( ack -> log.info("【Hivemq-V3】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(), hivemqProperties.getPort(), hivemqProperties.getClientId()), e -> log.error("【Hivemq-V3】 => MQTT连接失败,错误信息:{}", e.getMessage(), e)); } private void consume() { if (!Objects.isNull(client)) { consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL) .onBackpressureBuffer(8192) .observeOn(Schedulers.computation(), false, 8192) .doOnSubscribe(subscribe -> { log.info("【Hivemq-V3】 => MQTT开始订阅消息,请稍候。。。。。。"); reSubscribe(); }) .subscribeOn(Schedulers.io()) .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1) .takeWhile(retryCount -> retryCount != -1) .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))) .subscribe(publish -> { for (MessageHandler messageHandler : messageHandlers) { if (messageHandler.isSubscribe(publish.getTopic().toString())) { log.info("【Hivemq-V3】 => MQTT接收到消息,Topic:{}", publish.getTopic()); messageHandler .handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString())); } } }, e -> log.error("【Hivemq-V3】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e), () -> log.info("【Hivemq-V3】 => MQTT订阅消息结束,请稍候。。。。。。")); } } private Mqtt3ClientBuilder getMqtt3ClientBuilder() { Mqtt3ClientBuilder builder = Mqtt3Client.builder().addConnectedListener(listener -> { Optional<? extends MqttClientConnectionConfig> config = Optional .of(listener.getClientConfig().getConnectionConfig()) .get(); config.ifPresent(mqttClientConnectionConfig -> log.info("【Hivemq-V5】 => MQTT连接保持时间:{}ms", mqttClientConnectionConfig.getKeepAlive())); log.info("【Hivemq-V3】 => MQTT已连接,客户端ID:{}", hivemqProperties.getClientId()); }) .addDisconnectedListener( listener -> log.error("【Hivemq-V3】 => MQTT已断开连接,客户端ID:{}", hivemqProperties.getClientId())) .identifier(hivemqProperties.getClientId()) .serverHost(hivemqProperties.getHost()) .serverPort(hivemqProperties.getPort()) .executorConfig(MqttClientExecutorConfig.builder() .nettyExecutor(ThreadUtils.newVirtualTaskExecutor()) .nettyThreads(hivemqProperties.getNettyThreads()) .applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor())) .build()); // 开启重连 if (hivemqProperties.isAutomaticReconnect()) { builder.automaticReconnect() .initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS) .maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS) .applyAutomaticReconnect(); } if (hivemqProperties.isAuth()) { builder.simpleAuth() .username(hivemqProperties.getUsername()) .password(hivemqProperties.getPassword().getBytes()) .applySimpleAuth(); } return builder; } private void checkTopicAndQos(String[] topics, int[] qosArray) { if (topics == null || qosArray == null) { throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays cannot be null"); } if (topics.length != qosArray.length) { throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays must have the same length"); } if (topics.length == 0) { throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be empty"); } } private void checkTopic(String[] topics) { if (topics.length == 0) { throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be empty"); } } }
HivemqV3MqttClientTest
/** * @author laokou */ @SpringBootTest @RequiredArgsConstructor @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) class HivemqV3MqttClientTest { private final List<MessageHandler> messageHandlers; @Test void testMqttClient() throws InterruptedException { HivemqProperties hivemqProperties2 = new HivemqProperties(); hivemqProperties2.setClientId("test-client-2"); hivemqProperties2.setTopics(Set.of("/test-topic-2/#")); HivemqClientV3 hivemqClientV3 = new HivemqClientV3(hivemqProperties2, messageHandlers); hivemqClientV3.open(); hivemqClientV3.publish("/test-topic-2/456", "Hello World456".getBytes()); } }
Vert.x MQTT Client【推荐,只兼容mqtt3.1.1】
引入依赖
<dependencies> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-mqtt</artifactId> <version>4.5.14</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.7.5</version> </dependency> </dependencies>
项目集成
MqttClientProperties
/** * @author laokou */ @Data public class MqttClientProperties { private boolean auth = true; private String username = "emqx"; private String password = "laokou123"; private String host = "127.0.0.1"; private int port = 1883; private String clientId = UUIDGenerator.generateUUID(); // @formatter:off /** * 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息. * clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息. */ // @formatter:on private boolean clearSession = false; private int receiveBufferSize = Integer.MAX_VALUE; private int maxMessageSize = -1; /** * 心跳包每隔60秒发一次. */ private int keepAliveInterval = 60; private boolean autoKeepAlive = true; private long reconnectInterval = 1000; private int reconnectAttempts = Integer.MAX_VALUE; private Map<String, Integer> topics = new HashMap<>(0); private int willQos = 1; private boolean willRetain = false; private int ackTimeout = -1; private boolean autoAck = true; /** * 服务下线主题. */ private String willTopic = "/will"; /** * 服务下线数据. */ private String willPayload = "offline"; }
VertxConfig
/** * @author laokou */ @Configuration public class VertxConfig { @Bean public Vertx vertx() { VertxOptions vertxOptions = new VertxOptions(); vertxOptions.setMaxEventLoopExecuteTime(60); vertxOptions.setMaxWorkerExecuteTime(60); vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS); vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS); vertxOptions.setPreferNativeTransport(true); return Vertx.vertx(vertxOptions); } }
VertxMqttClient
注意:vertx-mqtt不支持客户端自动断线重连,网络不通畅或连接关闭,需要自己手动调用连接!!!实现这个重连的功能
/** * @author laokou */ @Slf4j public class VertxMqttClient { private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many() .multicast() .onBackpressureBuffer(Integer.MAX_VALUE, false); private final MqttClient mqttClient; private final Vertx vertx; private final MqttClientProperties mqttClientProperties; private final List<MessageHandler> messageHandlers; private final List<Disposable> disposables; private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isLoaded = new AtomicBoolean(false); private final AtomicBoolean isReconnected = new AtomicBoolean(true); public VertxMqttClient(final Vertx vertx, final MqttClientProperties mqttClientProperties, final List<MessageHandler> messageHandlers) { this.vertx = vertx; this.mqttClientProperties = mqttClientProperties; this.mqttClient = MqttClient.create(vertx, getOptions()); this.messageHandlers = messageHandlers; this.disposables = Collections.synchronizedList(new ArrayList<>()); } public void open() { mqttClient.closeHandler(v -> { isConnected.set(false); log.error("【Vertx-MQTT】 => MQTT连接断开,客户端ID:{}", mqttClientProperties.getClientId()); reconnect(); }) .publishHandler(messageSink::tryEmitNext) // 仅接收QoS1和QoS2的消息 .publishCompletionHandler(id -> { // log.info("【Vertx-MQTT】 => 接收MQTT的PUBACK或PUBCOMP数据包,数据包ID:{}", id); }) .subscribeCompletionHandler(ack -> { // log.info("【Vertx-MQTT】 => 接收MQTT的SUBACK数据包,数据包ID:{}", ack.messageId()); }) .unsubscribeCompletionHandler(id -> { // log.info("【Vertx-MQTT】 => 接收MQTT的UNSUBACK数据包,数据包ID:{}", id); }) .pingResponseHandler(s -> { // log.info("【Vertx-MQTT】 => 接收MQTT的PINGRESP数据包"); }) .connect(mqttClientProperties.getPort(), mqttClientProperties.getHost(), connectResult -> { if (connectResult.succeeded()) { isConnected.set(true); log.info("【Vertx-MQTT】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", mqttClientProperties.getHost(), mqttClientProperties.getPort(), mqttClientProperties.getClientId()); resubscribe(); } else { isConnected.set(false); Throwable ex = connectResult.cause(); log.error("【Vertx-MQTT】 => MQTT连接失败,原因:{},客户端ID:{}", ex.getMessage(), mqttClientProperties.getClientId(), ex); reconnect(); } }); } public void close() { disconnect(); } /** * Sends the PUBLISH message to the remote MQTT server. * @param topic topic on which the message is published * @param payload message payload * @param qos QoS level * @param isDup if the message is a duplicate * @param isRetain if the message needs to be retained */ public void publish(String topic, int qos, String payload, boolean isDup, boolean isRetain) { mqttClient.publish(topic, Buffer.buffer(payload), convertQos(qos), isDup, isRetain); } private void reconnect() { if (isReconnected.get()) { log.info("【Vertx-MQTT】 => MQTT尝试重连"); vertx.setTimer(mqttClientProperties.getReconnectInterval(), handler -> ThreadUtils.newVirtualTaskExecutor().execute(this::open)); } } private void subscribe() { Map<String, Integer> topics = mqttClientProperties.getTopics(); checkTopicAndQos(topics); mqttClient.subscribe(topics, subscribeResult -> { if (subscribeResult.succeeded()) { log.info("【Vertx-MQTT】 => MQTT订阅成功,主题: {}", String.join("、", topics.keySet())); } else { Throwable ex = subscribeResult.cause(); log.error("【Vertx-MQTT】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics.keySet()), ex.getMessage(), ex); } }); } private void resubscribe() { if (isConnected.get() || mqttClient.isConnected()) { ThreadUtils.newVirtualTaskExecutor().execute(this::subscribe); } if (isLoaded.compareAndSet(false, true)) { ThreadUtils.newVirtualTaskExecutor().execute(this::consume); } } private void consume() { Disposable disposable = messageSink.asFlux().doOnNext(mqttPublishMessage -> { String topic = mqttPublishMessage.topicName(); log.info("【Vertx-MQTT】 => MQTT接收到消息,Topic:{}", topic); for (MessageHandler messageHandler : messageHandlers) { if (messageHandler.isSubscribe(topic)) { messageHandler.handle(new MqttMessage(mqttPublishMessage.payload(), topic)); } } }).subscribeOn(Schedulers.boundedElastic()).subscribe(); disposables.add(disposable); } private void disposable() { for (Disposable disposable : disposables) { if (ObjectUtils.isNotNull(disposable) && !disposable.isDisposed()) { disposable.dispose(); } } } private void disconnect() { isReconnected.set(false); mqttClient.disconnect(disconnectResult -> { if (disconnectResult.succeeded()) { disposable(); log.info("【Vertx-MQTT】 => MQTT断开连接成功"); disposables.clear(); } else { Throwable ex = disconnectResult.cause(); log.error("【Vertx-MQTT】 => MQTT断开连接失败,错误信息:{}", ex.getMessage(), ex); } }); } private void unsubscribe(List<String> topics) { checkTopic(topics); mqttClient.unsubscribe(topics, unsubscribeResult -> { if (unsubscribeResult.succeeded()) { log.info("【Vertx-MQTT】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)); } else { Throwable ex = unsubscribeResult.cause(); log.error("【Vertx-MQTT】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), ex.getMessage(), ex); } }); } private MqttClientOptions getOptions() { MqttClientOptions options = new MqttClientOptions(); options.setClientId(mqttClientProperties.getClientId()); options.setCleanSession(mqttClientProperties.isClearSession()); options.setAutoKeepAlive(mqttClientProperties.isAutoKeepAlive()); options.setKeepAliveInterval(mqttClientProperties.getKeepAliveInterval()); options.setReconnectAttempts(mqttClientProperties.getReconnectAttempts()); options.setReconnectInterval(mqttClientProperties.getReconnectInterval()); options.setWillQoS(mqttClientProperties.getWillQos()); options.setWillTopic(mqttClientProperties.getWillTopic()); options.setAutoAck(mqttClientProperties.isAutoAck()); options.setAckTimeout(mqttClientProperties.getAckTimeout()); options.setWillRetain(mqttClientProperties.isWillRetain()); options.setWillMessageBytes(Buffer.buffer(mqttClientProperties.getWillPayload())); options.setReceiveBufferSize(mqttClientProperties.getReceiveBufferSize()); options.setMaxMessageSize(mqttClientProperties.getMaxMessageSize()); if (mqttClientProperties.isAuth()) { options.setPassword(mqttClientProperties.getPassword()); options.setUsername(mqttClientProperties.getUsername()); } return options; } private void checkTopicAndQos(Map<String, Integer> topics) { topics.forEach((topic, qos) -> { if (StringUtils.isEmpty(topic) || ObjectUtils.isNull(qos)) { throw new IllegalArgumentException("【Vertx-MQTT】 => Topic and QoS cannot be null"); } }); } private void checkTopic(List<String> topics) { if (CollectionUtils.isEmpty(topics)) { throw new IllegalArgumentException("【Vertx-MQTT】 => Topics list cannot be empty"); } } private MqttQoS convertQos(int qos) { return MqttQoS.valueOf(qos); } }
VertxMqttClientTest
/** * @author laokou */ @SpringBootTest @RequiredArgsConstructor @ContextConfiguration(classes = { DefaultMessageHandler.class, VertxConfig.class }) @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) class VertxMqttClientTest { private final List<MessageHandler> messageHandlers; private final Vertx vertx; @Test void testMqttClient() throws InterruptedException { MqttClientProperties properties = new MqttClientProperties(); properties.setHost("127.0.0.1"); properties.setPort(1883); properties.setUsername("emqx"); properties.setPassword("laokou123"); properties.setClientId("test-client-1"); properties.setTopics(Map.of("/test-topic-1/#", 1)); VertxMqttClient vertxMqttClient = new VertxMqttClient(vertx, properties, messageHandlers); Assertions.assertDoesNotThrow(vertxMqttClient::open); Thread.sleep(500); Assertions.assertDoesNotThrow(() -> vertxMqttClient.publish("/test-topic-1/test", 1, "test", false, false)); Thread.sleep(500); Assertions.assertDoesNotThrow(vertxMqttClient::close); Thread.sleep(500); } }
非常推荐使用vertx-mqtt,项目平稳运行好用!!!
但是,需要时注意的是,项目部署到Linux系统,需要最少分配 -Xmx2100m -Xms2100m 内存,不然连接会关闭!
我是老寇,我们下次再见啦~