跳转至

MQTT

一、MQTT 介绍

MQTT 是一种低开销、低带宽占用的即时通讯协议,可以用极少的代码和带宽为远程设备提供实时可靠的消息服务。它适用于硬件性能低下的设备以及网络状况不佳的环境,因此在物联网(IoT)小型设备和移动应用等方面有广泛应用。

MQTT 采用发布/订阅通信模型,客户端可以发布消息到主题(Topic),也可以订阅主题来接收消息。这种模式解耦了消息的发送者和接收者。

MQTT 的消息传递质量分为三种: QoS 0:最多一次交付,即消息可能丢失; QoS 1:至少一次交付,即消息可能会重复; QoS 2:恰好一次交付,即消息不会丢失也不会重复。

二、演示功能概述

本 demo 通过使用 Air8101 开发板,带你快速体验通过 MQTT 协议进行数据接收与发送。

三、准备硬件环境

“古人云:‘工欲善其事,必先利其器。’在深入介绍本功能示例之前,我们首先需要确保以下硬件环境的准备工作已经完成。”

参考:硬件环境清单,准备以及组装好硬件环境。

四、软件环境

“凡事预则立,不预则废。”在详细阐述本功能示例之前,我们需先精心筹备好以下软件环境。

  1. Luatools 工具
  2. 内核固件文件(底层 core 固件文件):LuatOS-SoC_V10001_Air8101.soc;参考项目使用的内核固件
  3. luatos 需要的脚本和资源文件

脚本和资源文件:https://gitee.com/openLuat/LuatOS-Air8101/tree/master/demo/wlan/softAP

lib 脚本文件:使用 Luatools 烧录时,勾选 添加默认 lib 选项,使用默认 lib 脚本文件;

准备好软件环境之后,接下来查看如何烧录项目文件到 Air8101 开发板,将本篇文章中演示使用的项目文件烧录到 Air8101 开发板中。

五、MQTT API 介绍

5.1 创建一个 mqtt client 实例

mqtt.client(clientId, keepAlive, username, password, cleanSession, will, version)

参数

名称
传入值类型
释义
clientId
string
确保设备唯一性
keepAlive
number
可选参数,默认为300 心跳间隔(单位为秒),默认300秒
username
string
可选参数,默认为"" 用户名,用户名为空配置为""或者nil
password
string
可选参数,默认为"" 密码,密码为空配置为""或者nil
cleanSession
number
可选参数,默认为1 1/0
will
table
可选参数,默认为nil 遗嘱参数,格式为{qos=,retain=,topic=,payload=}
version
string
可选参数,默认为"3.1.1" MQTT版本号,仅支持"3.1"和"3.1.1"

返回值

table mqttc client 实例

例子

mqttc = mqtt.client("clientid-123")
mqttc = mqtt.client("clientid-123",200)
mqttc = mqtt.client("clientid-123",nil,"user","password")
mqttc = mqtt.client("clientid-123",nil,"user","password",nil,{qos=0,retain=0,topic="willTopic",payload="willTopic"},"3.1")

5.2 连接 mqtt 服务器

mqttc:connect(host, port, transport, cert, timeout)

参数

名称
传入值类型
释义
host
string
服务器地址
port
param
string或者number类型,服务器端口
transport
string
可选参数,默认为"tcp" “tcp"或者"tcp_ssl”
cert
table
可选参数,默认为nil table或者nil类型,ssl证书,当transport为"tcp_ssl"时,此参数才有意义。cert格式如下:
{
caCert = “ca.crt”, --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
clientCert = “client.crt”, --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
clientKey = “client.key”, --客户端私钥文件(Base64编码 X.509格式)
clientPassword = “123456”, --客户端证书文件密码[可选]
}
timeout
number
可选参数,默认为120 可选参数,socket连接超时时间,单位秒

返回值

result true 表示成功,false 或者 nil 表示失败

例子

mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp", 5)

5.3 订阅主题

mqttc:subscribe(topic, qos)

参数

名称
传入值类型
释义
topic

param

string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码
qos
param
可选参数,默认为0 number或者nil,topic为一个主题时,qos为number类型(0/1/2,默认0);topic为多个主题时,qos为nil

返回值

bool true 表示成功,false 或者 nil 表示失败

例子

mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic

5.4 取消订阅主题

mqttc:unsubscribe(topic)

参数

名称
传入值类型
释义
topic
param
string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码

返回值

bool true 表示成功,false 或者 nil 表示失败

例子

mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic

5.5 发布一条消息

mqttc:publish(topic, payload, qos, retain)

参数

名称
传入值类型
释义
topic
payload
string
string
UTF8编码的字符串
用户自己控制payload的编码,mqtt.lua不会对payload做任何编码转换
qos 0/1/2, default
number
可选参数,默认为0
retain
number
可选参数,默认为0 0或者1

返回值

bool 发布成功返回 true,失败返回 false

例子

mqttc = mqtt.client("clientid-123", nil, nil, false)
mqttc:connect("mqttserver.com", 1883, "tcp")
mqttc:publish("/topic", "publish from luat mqtt client", 0)

5.6 接收消息

mqttc:receive(timeout, msg)

参数

名称
传入值类型
释义
timeout
msg
number
string
接收超时时间,单位毫秒
可选参数,默认为nil 可选参数,控制socket所在的线程退出recv阻塞状态

返回值

result 数据接收结果,true 表示成功,false 表示失败

data

如果 result 为 true,表示服务器发过来的 mqtt 包

如果 result 为 false,超时失败,data 为"timeout"

如果 result 为 false,msg 控制退出,data 为 msg 的字符串

如果 result 为 false,socket 连接被动断开控制退出,data 为"CLOSED"

如果 result 为 false,PDP 断开连接控制退出,data 为"IP_ERROR_IND"

如果 result 为 false,mqtt 不处于连接状态,data 为 nil

如果 result 为 false,收到了 PUBLISH 报文,发送 PUBACK 或者 PUBREC 报文失败,data 为 nil

如果 result 为 false,收到了 PUBREC 报文,发送 PUBREL 报文失败,data 为 nil

如果 result 为 false,收到了 PUBREL 报文,发送 PUBCOMP 报文失败,data 为 nil

如果 result 为 false,发送 PINGREQ 报文失败,data 为 nil

param 如果是 msg 控制退出,param 的值是 msg 的参数;其余情况无意义,为 nil

例子

true, packet = mqttc:receive(2000)
false, error_message = mqttc:receive(2000)
false, msg, para = mqttc:receive(2000,"APP_SEND_DATA")

5.7 断开与服务器的连接

mqttc:disconnect()

参数

返回值

nil

例子

mqttc = mqtt.client("clientid-123", nil, nil, false)
mqttc:connect("mqttserver.com", 1883, "tcp")
process data
mqttc:disconnect()

六、代码示例介绍

6.1 DEMO 软件流程图

6.2 配置

6.2.1 MQTT 的 4 个重要配置

  • local mqtt_host = "lbsmqtt.airm2m.com"(MQTT 服务器地址,这里使用合宙提供公用测试服务器)
  • local mqtt_port = 1884(MQTT 端口)
  • local mqtt_client_id = "abc" (暂时只定义,但暂未使用)
  • local user_name = "user"(MQTT 服务器登录用户名)
  • local password = "password"(MQTT 服务器登录密码)

6.2.2 本 DEMO 通信使用的 2 个重要主题

  • local pub_topic ="1001001/up"(设备发布主题,可自行修改)
  • local sub_topic = "1001001/down"(设备订阅主题,可自行修改)

6.3 完整程序清单

注:完整复制后保存为 main.lua,可直接使用

-- LuaTools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"

--[[
本demo需要mqtt库,
mqtt也是内置库, 无需require
]]

-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")

--根据自己的服务器修改以下参数
local mqtt_host = "lbsmqtt.airm2m.com"
local mqtt_port = 1884
local mqtt_isssl = false
local ca_file = false

local client_id = "mqttx_b55c41b7"
local user_name = "user"
local password = "password"

local pub_topic = ""-- .. (mcu.unique_id():toHex())
local sub_topic = ""-- .. (mcu.unique_id():toHex())

local mqttc = nil



sys.taskInit(function()
    -----------------------------
    -- 统一联网函数, 可自行删减
    ----------------------------
    if wlan and wlan.connect then
        -- wifi 联网, Air8101系列均支持
        local ssid = "Xiaomi_110"
        local password = "1234567890"
        log.info("wifi", ssid, password)

        wlan.init()
        wlan.setMode(wlan.STATION)
        wlan.connect(ssid, password, 1)
        --等待WIFI联网结果,WIFI联网成功后,内核固件会产生一个"IP_READY"消息
        local result, data = sys.waitUntil("IP_READY")
        log.info("wlan", "IP_READY", result, data)
        device_id = wlan.getMac()

    else
         -- 其他不认识的bsp, 循环提示一下吧
         while 1 do
            sys.wait(1000)
            log.info("bsp", "本bsp可能未适配网络层, 请查证")
        end
    end
    log.info("已联网")


    sys.publish("net_ready")
end)

sys.taskInit(function()
    -- 等待联网
    sys.waitUntil("net_ready")
    local device_id = "1001001"

    client_id = device_id
    pub_topic = device_id .. "/up"  -- 设备发布的主题,开发者可自行修改
    sub_topic = device_id .. "/down" -- 设备订阅的主题,开发者可自行修改

    -- 打印一下上报(pub)和下发(sub)的topic名称
    -- 上报: 设备 ---> 服务器
    -- 下发: 设备 <--- 服务器
    -- 可使用mqtt.x等客户端进行调试
    log.info("mqtt", "pub", pub_topic)
    log.info("mqtt", "sub", sub_topic)

    -- 打印一下支持的加密套件, 通常来说, 固件已包含常见的99%的加密套件
    -- if crypto.cipher_suites then
    --     log.info("cipher", "suites", json.encode(crypto.cipher_suites()))
    -- end
    if mqtt == nil then
        while 1 do
            sys.wait(1000)
            log.info("bsp", "本bsp未适配mqtt库, 请查证")
        end
    end

    -------------------------------------
    -------- MQTT 演示代码 --------------
    -------------------------------------

    mqttc = mqtt.create(nil, mqtt_host, mqtt_port, mqtt_isssl, ca_file)

    mqttc:auth(client_id,user_name,password) -- client_id必填,其余选填
    -- mqttc:keepalive(240) -- 默认值240s
    mqttc:autoreconn(true, 3000) -- 自动重连机制

    mqttc:on(function(mqtt_client, event, data, payload)
        -- 用户自定义代码
        log.info("mqtt", "event", event, mqtt_client, data, payload)
        if event == "conack" then
            -- 联上了

            sys.publish("mqtt_conack")
            mqtt_client:subscribe(sub_topic)--单主题订阅
            -- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅
        elseif event == "recv" then
            log.info("mqtt", "downlink", "topic", data, "payload:", payload)
            log.info("mqtt", "uplink", "topic", pub_topic, "payload:", payload)
            sys.publish("mqtt_pub", pub_topic, payload)  --将收到的数据,通过发布主题目,进行发送
        elseif event == "sent" then
            log.info("mqtt", "sent", "pkgid", data)
        elseif event == "disconnect" then

            -- 非自动重连时,按需重启mqttc
            -- mqtt_client:connect()

        end
    end)

    -- mqttc自动处理重连, 除非自行关闭
    mqttc:connect()
    sys.waitUntil("mqtt_conack")
    while true do
        -- 演示等待其他task发送过来的上报信息
        local ret, topic, data, qos = sys.waitUntil("mqtt_pub", 300000)
        if ret then
            -- 提供关闭本while循环的途径, 不需要可以注释掉
            if topic == "close" then break end
            mqttc:publish(topic, data, qos)
        end

        -- 如果没有其他task上报, 可以写个空等待
        --sys.wait(6000)
    end
    mqttc:close()
    mqttc = nil
end)

-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!

七、功能验证

7.1 查看调试日志,获取 WIFI 信息和设备发布与订阅主题

7.2 打开 MQTT 客户端 MQTT.fx 应用程序并配置

7.2.1 MQTT 客户端基本配置

ProfileName:合宙(可修改为你想要的名称)

BrokerAddress:http://lbsmqtt.airm2m.com (合宙提供的免费测试服务器,也可修改为自己的服务器)

BrokerPort:1883 (端口号)

UserName:user

Password:password

7.2.2 两个重要主题

设备发布主题:1001001/up (设备向服务器发送数据使用)

设备订阅主题:1001001/down (接收服务器数据主题)

7.3 MQTT 客户端订阅设备主题

7.4 MQTT 客户端给开发板发送数据

7.5 MQTT 客户端订阅的设备数据

7.6 连接 MQTT 服务器成功日志

7.7 接收和发送数据日志

7.8 热点关闭时候日志

7.9 热点关闭一段时间,然后再打开热点,wifi 可以自动重新连接热点自动连接获取 ip

7.10 热点关闭一段时间,然后再打开热点,MQTT 自动重新连接

首先开启自动重连机制 mqttc:autoreconn(true, 3000) ,

然后热点关闭一段时间,然后再打开热点,wifi 可以自动重新连接,并且能够接收到之前订阅的主题消息。

总结

至此,我们已使用 Air8101 开发板完成了 MQTT 通信的基本功能。