ESP32 MQTT对接巴法云平台

ESP32 MQTT对接巴法云平台

MQTT(Message Queuing Telemetry Transport)是一种轻量级的 发布/订阅(Publish/Subscribe) 消息传输协议,专为 低带宽、高延迟、不稳定网络环境 设计,是物联网(IoT)领域的核心通信协议之一。

MQTT 核心特性

特性 说明
轻量级 协议头最小仅 2字节,适合嵌入式设备
发布/订阅模型 设备不直接通信,通过 Broker(代理服务器) 中转,解耦性强
低功耗 适合电池供电设备,支持 心跳包(Keep Alive) 维持长连接
QoS 质量等级 支持 0/1/2 三级消息可靠性保证
主题(Topic) 分层式消息路由(如 home/living_room/temperature),支持通配符 +#

MQTT 与 HTTP 对比

对比维度 MQTT HTTP
协议开销 极低(适合高频小数据) 高(Header 冗余)
通信模式 双向实时推送 单向请求-响应
连接开销 长连接(减少握手延迟) 短连接(每次请求需重建连接)
适用场景 物联网、移动推送、实时监控 Web 服务、API 接口

MQTT 核心概念

  1. Broker(代理服务器)
    • 核心枢纽,负责消息路由(如 Mosquitto、EMQX、HiveMQ)
    • 实现消息存储转发、客户端管理、安全认证
  2. Topic(主题)
    • 消息分类的层级路径(例如:factory/machine1/status
    • 通配符:
      • +:单级匹配(home/+/temperature 匹配 home/living_room/temperature
      • #:多级匹配(home/# 匹配 home/living_room/light
  3. QoS(服务质量)
    • QoS 0:最多一次(尽力交付,可能丢失)
    • QoS 1:至少一次(确保送达,可能重复)
    • QoS 2:恰好一次(严格保证,无重复)

micro python可行性先行验证

# 导入必要的库 from umqtt.simple import MQTTClient  # MQTT协议客户端库 import time                          # 时间相关函数 from machine import Timer            # 硬件定时器控制  #################### 用户可修改配置区域 #################### wifiName = "jianzhiji02"             # WiFi名称(仅支持2.4G网络) wifiPassword = "8765432111"          # WiFi密码 clientID = ""  # 设备密钥,从巴法云控制台获取 subTopic = 'OTALX'                   # 订阅的主题(接收指令) pubTopic = 'bafa'               # 发布的主题(发送数据)  #################### 固定配置区域 ######################## serverIP = "bemfa.com"               # MQTT服务器地址 port = 9501                          # MQTT端口号 ping_interval = 300                  # 心跳包间隔(秒)  ################# WiFi连接函数 ########################## def do_connect():     """连接WiFi网络"""     import network     sta_if = network.WLAN(network.STA_IF)     if not sta_if.isconnected():         print('正在连接网络...')         sta_if.active(True)         sta_if.connect(wifiName, wifiPassword)         while not sta_if.isconnected():             pass     print('WiFi连接成功')     print('网络配置:', sta_if.ifconfig())  ################# MQTT消息回调函数 ####################### def msg_callback(topic, msg):     """处理接收到的MQTT消息"""     global client  # 声明全局客户端对象          print("[消息到达] 主题:", topic.decode(), "| 载荷:", msg.decode())          # 判断是否是订阅的主题     if topic.decode() == subTopic:         # 根据指令执行操作         if msg == b"on":             print("执行开机操作")             # 示例:控制GPIO输出高电平             # pin.value(1)                          # 发送状态确认(推送消息)             client.publish(pubTopic, "设备已开启")                      elif msg == b"off":             print("执行关机操作")             # 示例:控制GPIO输出低电平             # pin.value(0)                          # 发送状态确认(推送消息)             client.publish(pubTopic, "设备已关闭")  ################ MQTT连接与订阅函数 ##################### def connect_mqtt():     """建立MQTT连接并订阅主题"""     global client  # 声明全局客户端对象          try:         # 创建客户端实例         client = MQTTClient(clientID, serverIP, port)         client.set_callback(msg_callback)  # 设置回调函数                  # 建立连接         client.connect()         print(f"成功连接到MQTT服务器 {serverIP}:{port}")                  # 订阅主题         client.subscribe(subTopic)         print(f"已订阅主题: {subTopic}")                  # 设置心跳间隔(可选)         client.keepalive = ping_interval                  return client          except Exception as e:         print("MQTT连接失败:", e)         restart_and_reconnect()  ################ 异常处理函数 ########################## def restart_and_reconnect():     """重启设备并重连"""     print("10秒后重启设备...")     time.sleep(10)     machine.reset()  ################ 定时推送函数 ########################## def timed_publish(timer):     """定时发布设备状态(示例)"""     global client     try:         # 示例数据(可替换为传感器数据)         status = "11"          client.publish(pubTopic, f" {status}")         print(f"定时推送: {status}")     except:         print("定时推送失败")  ################# 主程序流程 ########################### if __name__ == "__main__":     # 连接WiFi     do_connect()          # 初始化MQTT连接     connect_mqtt()          # 设置定时器(每30秒发送心跳)     timer = Timer(-1)  # 创建虚拟定时器     timer.init(         period=3000,    # 间隔3秒(3000毫秒)         mode=Timer.PERIODIC,         callback=timed_publish     )     print("定时推送已启用")          # 主循环     try:         while True:             try:                 client.check_msg()  # 检查新消息                 time.sleep(1)       # 降低CPU占用                              except Exception as e:                 print("运行错误:", e)                 restart_and_reconnect()                      except KeyboardInterrupt:         print("程序终止")         client.disconnect()         timer.deinit() 

终端打印

ESP32 MQTT对接巴法云平台

成功连接云平台,成功推送,订阅主题。可行性验证成功。

云平台

巴法云物联网平台_MQTT设备云

推送内容控制led

ESP32 MQTT对接巴法云平台

接受ESP32推送内容

ESP32 MQTT对接巴法云平台

Esp-idf+C语言实现

重要代码段分析

MQTT推送

//----------------MQTT推送-----------------------------------------------------------------// //推送字符         const char *status = "putting";         //mqtt句柄,推送目标主题,推送内容,长度,服务质量,发布消息的标志位(通常设置为0)         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC, status, strlen(status), 0, 0);   //推送数字         num+=3;//推送运行时间         snprintf(num_str, sizeof(num_str), "%d", num); // 将整数转为字符串         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC2, num_str, strlen(num_str), 0, 0); 
MQTT推送函数

esp_mqtt_client_publish:这是ESP-IDF提供的函数,用于通过MQTT客户端发布消息。

  • mqtt_client:这是MQTT客户端的句柄,通常是在初始化MQTT客户端时创建的。

  • PUB_TOPIC:这是您要发布消息的主题名称,通常是一个字符串。

  • status:这是您要发布的消息内容,通常是一个字符串。

  • strlen(status):这是消息内容的长度,strlen函数用于计算字符串的长度。

    :这是QoS(Quality of Service)等级,表示服务质量。MQTT协议定义了三种QoS等级:

    • 0:最多一次("fire-and-forget")
    • 1:至少一次
    • 2:只有一次
  • 0:这是发布消息的标志位,通常设置为0。

MQTT订阅(MQTT成功连接)

// MQTT事件处理 static void mqtt_event_handler(void *handler_args, esp_event_base_t base,                                int32_t event_id, void *event_data) {     esp_mqtt_event_handle_t event = event_data;      switch (event->event_id) {         case MQTT_EVENT_CONNECTED://<MQTT连接事件>---------->MQTT成功连接则订阅主题             ESP_LOGI(TAG, "MQTT Connected");             mqtt_connected = true;             //----------------MQTT订阅(MQTT成功后)-----------------------------------------             //mqtt句柄,订阅的主题,服务等级(0)             esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0);//订阅主题SUB_TOPIC(ledctrl)                                   case MQTT_EVENT_DATA: {//<MQTT接收数据事件>             // 处理接收数据             //----------------判断接收主题-----------------------------------------------------------------//             char topic[event->topic_len + 1];             memcpy(topic, event->topic, event->topic_len);             topic[event->topic_len] = '';             //----------------判断接收数据-----------------------------------------------------------------//             char data[event->data_len + 1];             memcpy(data, event->data, event->data_len);             data[event->data_len] = '';             //----------------打印接收主题+数据-----------------------------------------------------------------//             ESP_LOGI(TAG, "Received: Topic=%s, Data=%s", topic, data);              if (strcmp(topic, SUB_TOPIC) == 0) {//判断接收主题(ledctrl)数据——>控制led灯->推送led状态信息                 if (strcmp(data, "on") == 0) {                     gpio_set_level(OUTPUT_PIN, 0);                     //推送led状态信息到主题ledstate                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已开启", 0, 0, 0);//同时推送状态到主题ledstate                 } else if (strcmp(data, "off") == 0) {                     gpio_set_level(OUTPUT_PIN, 1);                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已关闭", 0, 0, 0);                 }             }             break;         }         default:             break;     } } 

运行逻辑

MQTT成功连接(MQTT事件)->订阅主题

MQTT接收数据事件->判断接收内容

MQTT订阅函数

esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0); // 订阅主题SUB_TOPIC(ledctrl)

  • esp_mqtt_client_subscribe:这是ESP-IDF提供的函数,用于订阅MQTT主题。
  • mqtt_client:这是MQTT客户端的句柄,通常是在初始化MQTT客户端时创建的。
  • SUB_TOPIC:这是您要订阅的主题名称,通常是一个字符串。在这个例子中,主题名称是ledctrl,表示用于控制LED灯的主题。
  • 0:这是QoS(Quality of Service)等级,表示服务质量。MQTT协议定义了三种QoS等级:
    • 0:最多一次("fire-and-forget")
    • 1:至少一次
    • 2:只有一次

终端现象分析

成功连接WIFI

ESP32 MQTT对接巴法云平台

成功连接MQTT服务器,并推送主题+内容

ESP32 MQTT对接巴法云平台

接受到MQTT服务器订阅的主题内容

ESP32 MQTT对接巴法云平台

完整代码:

#include <string.h> #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "esp_system.h" #include "esp_log.h" #include "esp_event.h" #include "esp_netif.h" #include "nvs_flash.h" #include "protocol_examples_common.h" #include "mqtt_client.h" #include "driver/gpio.h" #include "esp_timer.h" #include "esp_wifi.h" // 用户配置参数 #define WIFI_SSID      "jianzhiji02"//wifi名称 #define WIFI_PASS      "8765432111"//wifi密码 #define MQTT_CLIENT_ID ""//巴法云平台秘钥 #define SUB_TOPIC      "ledctrl"//订阅主题(控制核心板led) #define PUB_TOPIC      "bafa"//推送主题1(测试推送字符) #define PUB_TOPIC2     "Time"//推送主题2(测试推送数字) #define PUB_TOPIC3     "ledstate"//推送主题2(推送led状态)   // #define BROKER_URI     "mqtt://bemfa.com:9501"//服务器URL(需DNS解析) #define BROKER_URI     "mqtt://119.91.109.180:9501"//服务器ip地址+端口 static const char *TAG = "MQTT_DEMO"; static esp_mqtt_client_handle_t mqtt_client = NULL;//mqtt句柄 static esp_timer_handle_t status_timer;//状态定时器句柄 static bool mqtt_connected = false;//mqtt连接状态  #define OUTPUT_PIN     GPIO_NUM_48//led 引脚(低电平点亮) // GPIO初始化 static void init_gpio(void) {     gpio_config_t io_conf = {         .pin_bit_mask = (1ULL << OUTPUT_PIN),         .mode = GPIO_MODE_OUTPUT,         .pull_up_en = GPIO_PULLUP_DISABLE,         .pull_down_en = GPIO_PULLDOWN_DISABLE,         .intr_type = GPIO_INTR_DISABLE     };     gpio_config(&io_conf);     gpio_set_level(OUTPUT_PIN, 1); }   // 定义静态变量 // 缓冲区用于存储转换后的字符串 static int num = 0;static char num_str[16]; // 定时器回调函数(1s周期) //----------------MQTT推送-----------------------------------------------------------------// static void publish_status(void *arg) {     if (mqtt_connected) {         const char *status = "putting";//推送主题         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC, status, strlen(status), 0, 0);         ESP_LOGI(TAG, "Published status: %s", status);             num+=3;//推送运行时间         snprintf(num_str, sizeof(num_str), "%d", num); // 将整数转为字符串         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC2, num_str, strlen(num_str), 0, 0);         ESP_LOGI(TAG, "num: %d", num);  // 正确打印整型     } }  // MQTT事件处理 static void mqtt_event_handler(void *handler_args, esp_event_base_t base,                                int32_t event_id, void *event_data) {     esp_mqtt_event_handle_t event = event_data;      switch (event->event_id) {         case MQTT_EVENT_CONNECTED://<MQTT连接事件>             ESP_LOGI(TAG, "MQTT Connected");             mqtt_connected = true;             //----------------MQTT订阅-----------------------------------------------------------------//             esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0);//订阅主题ledctrl             // 创建状态定时器(3秒周期)             esp_timer_create_args_t timer_cfg = {                 .callback = &publish_status,                 .name = "status_timer"             };             esp_timer_create(&timer_cfg, &status_timer);             esp_timer_start_periodic(status_timer, 3000000);             break;          case MQTT_EVENT_DISCONNECTED:             ESP_LOGI(TAG, "MQTT Disconnected");             mqtt_connected = false;             esp_timer_stop(status_timer);             break;          case MQTT_EVENT_DATA: {//<MQTT接收数据事件>             // 处理接收数据             //----------------判断接收主题-----------------------------------------------------------------//             char topic[event->topic_len + 1];             memcpy(topic, event->topic, event->topic_len);             topic[event->topic_len] = '';             //----------------判断接收数据-----------------------------------------------------------------//             char data[event->data_len + 1];             memcpy(data, event->data, event->data_len);             data[event->data_len] = '';             //----------------打印接收主题+数据-----------------------------------------------------------------//             ESP_LOGI(TAG, "Received: Topic=%s, Data=%s", topic, data);              if (strcmp(topic, SUB_TOPIC) == 0) {//判断接收主题(ledctrl)数据——>控制led灯->推送led状态信息                 if (strcmp(data, "on") == 0) {                     gpio_set_level(OUTPUT_PIN, 0);                     //推送led状态信息到主题ledstate                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已开启", 0, 0, 0);//同时推送状态到主题ledstate                 } else if (strcmp(data, "off") == 0) {                     gpio_set_level(OUTPUT_PIN, 1);                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已关闭", 0, 0, 0);                 }             }             break;         }          case MQTT_EVENT_ERROR:             ESP_LOGE(TAG, "MQTT Error");             break;          default:             break;     } }  // WiFi事件处理 static void wifi_event_handler(void *arg, esp_event_base_t event_base,                                int32_t event_id, void *event_data) {     if (event_id == IP_EVENT_STA_GOT_IP) {         // WiFi连接成功后启动MQTT         esp_mqtt_client_config_t mqtt_cfg = {             .broker.address.uri = BROKER_URI,             .credentials.client_id = MQTT_CLIENT_ID         };         mqtt_client = esp_mqtt_client_init(&mqtt_cfg);         esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);         esp_mqtt_client_start(mqtt_client);     } }  void app_main(void) {     // 初始化NVS(堆栈存储wifi连接数据等。。。)     esp_err_t ret = nvs_flash_init();     if (ret == ESP_ERR_NVS_NO_FREE_PAGES || ret == ESP_ERR_NVS_NEW_VERSION_FOUND) {         ESP_ERROR_CHECK(nvs_flash_erase());         ret = nvs_flash_init();     }     ESP_ERROR_CHECK(ret);      // 初始化网络接口     ESP_ERROR_CHECK(esp_netif_init());     ESP_ERROR_CHECK(esp_event_loop_create_default());          // 配置WiFi     esp_netif_create_default_wifi_sta();     wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();     ESP_ERROR_CHECK(esp_wifi_init(&cfg));          // 注册WiFi事件     esp_event_handler_instance_t instance_any_id;     ESP_ERROR_CHECK(esp_event_handler_instance_register(IP_EVENT,                                                         IP_EVENT_STA_GOT_IP,                                                         &wifi_event_handler,                                                         NULL,                                                         &instance_any_id));     // 设置WiFi参数     wifi_config_t wifi_config = {         .sta = {             .ssid = WIFI_SSID,             .password = WIFI_PASS,         },     };     ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));//设置为STA模式     ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config));//设置WiFi参数     ESP_ERROR_CHECK(esp_wifi_start());//启动WiFi     ESP_ERROR_CHECK(esp_wifi_connect());//连接WiFi      // 初始化GPIO(led)     init_gpio();      // 保持主任务运行     while (1) {         vTaskDelay(pdMS_TO_TICKS(1000));     } } 

发表评论

评论已关闭。

相关文章