跳转至

Air8201的MQTT功能

本章开始学习使用Air8201模块进行MQTT链接操作,所需要用到的脚本存于LuatOS-Air8201\demo\mqtt文件夹中,若没有找到该脚本,可能代码并非最新,请根据前面教学重新拉取。

本章教程是通过使用LuatOS-Air8201\demo\mqtt下的脚本代码对Air8201模块进行MQTT链接操作。操作例程包括有MQTT单链接、MQTT多链接、MQTT SSL不带证书链接、MQTT SSL带证书链接,大家可根据自身需求选择对应的例程学习。

固件地址:https://gitee.com/openLuat/LuatOS-Air8201/tree/master/core

脚本地址:https://gitee.com/openLuat/LuatOS-Air8201/tree/master/demo/mqtt

1,搭建环境

1.1 Luatools工具

在Luatools工具项目管理中新建项目,重新选择底层固件和脚本文件。

也可以在原有项目下通过删除旧脚本、添加新脚本的方式进而实现不同功能。

image

1.2 MQTTX工具

本章教程以MQTTX工具为例进行学习,下载地址为 https://mqttx.app/ ,大家也可以使用其他MQTT工具。现在大家先照着我的操作,我们先把MQTTX工具配置一下。

下载好软件后,根据下方图中操作指示填写信息。

image

填写好信息,点击连接。下一步开始添加订阅主题和发布消息主题,看下图,注意主题格式

订阅主题格式要求默认为 /luatos/pub/ 加模块的IMEI号,例如 /luatos/pub/864536071785271

发布主题格式要求默认为 /luatos/pub/ 加模块的IMEI号,例如 /luatos/sub/864536071785271

image

MQTTX配置已经完成,现在开始正式学习,学成之后便可通过MQTT进行自由通信了,实际效果如下图所示:

image

2,MQTT单链接示例

2.1 main.lua说明

在main.lua中我们需要调用single_mqtt,代码参考如下:

-- Luatools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"

--[[
本demo需要mqtt库, 大部分能联网的设备都具有这个库
mqtt也是内置库, 无需require
]]

-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")

require "single_mqtt"       -- MQTT单链接

-- require "multilink_mqtt"    -- MQTT多链接

-- require "ssl_mqtt"             -- MQTTS SSL链接

-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!

2.2 single_mqtt.lua说明

下面将对single_mqtt.lua中的代码进行简单说明,并指导大家修改指定参数,以便顺利进行MQTT单链接操作。

  • 在代码开头部分,根据自己的服务器修改指定的参数。
  • 需要注意的是user_name和password在有些服务器上是可以不传入的,或者是对传入的值没有要求限制。

  • 要根据实际服务器要求来填写。

--根据自己的服务器修改以下参数
local mqtt_host = "lbsmqtt.airm2m.com"
local mqtt_port = 1884
local mqtt_isssl = false
local client_id = "abc"
local user_name = "user"
local password = "password"

local pub_topic = "/luatos/pub/" .. (mcu.unique_id():toHex())
local sub_topic = "/luatos/sub/" .. (mcu.unique_id():toHex())
  • 此task实现的是mqtt的连接、订阅消息、发布消息的流程。
  • 要先等待网络就绪之后才可进行mqtt后续操作
  • 待网络就绪之后,根据代码编写情况此时client_id、pub_topic和sub_topic会发生变化,会覆盖掉代码开头部分时的配置,这点需要注意。device_id为模块的IMEI号
sys.taskInit(function()
    -- 等待联网
    local ret, device_id = sys.waitUntil("net_ready")
    -- 下面的是mqtt的参数均可自行修改
    client_id = device_id
    pub_topic = "/luatos/pub/" .. device_id
    sub_topic = "/luatos/sub/" .. device_id

    -- 打印一下上报(pub)和下发(sub)的topic名称
    -- 上报: 设备 ---> 服务器
    -- 下发: 设备 <--- 服务器
    -- 可使用mqtt.x等客户端进行调试
    log.info("mqtt", "pub", pub_topic)
    log.info("mqtt", "sub", sub_topic)

    if mqtt == nil then
        while 1 do
            sys.wait(1000)
            log.info("bsp", "本bsp未适配mqtt库, 请查证")
        end
    end

    -------------------------------------
    -------- MQTT 演示代码 --------------
    -------------------------------------

    mqttc = mqtt.create(nil, mqtt_host, mqtt_port, mqtt_isssl)

    mqttc:auth(client_id,user_name,password) -- client_id必填,其余选填
    -- mqttc:keepalive(240) -- 默认值240s
    mqttc:autoreconn(true, 3000) -- 自动重连机制

    mqttc:on(function(mqtt_client, event, data, payload)
        -- 用户自定义代码
        log.info("mqtt", "event", event, mqtt_client, data, payload)
        if event == "conack" then
            -- 联上了
            sys.publish("mqtt_conack")
            mqtt_client:subscribe(sub_topic)--单主题订阅
            -- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅
        elseif event == "recv" then
            log.info("mqtt", "downlink", "topic", data, "payload", payload)
            sys.publish("mqtt_payload", data, payload)
        elseif event == "sent" then
            -- log.info("mqtt", "sent", "pkgid", data)
        -- elseif event == "disconnect" then
            -- 非自动重连时,按需重启mqttc
            -- mqtt_client:connect()
        end
    end)

    -- mqttc自动处理重连, 除非自行关闭
    mqttc:connect()
    sys.waitUntil("mqtt_conack")
    while true do
        -- 演示等待其他task发送过来的上报信息
        local ret, topic, data, qos = sys.waitUntil("mqtt_pub", 300000)
        if ret then
            -- 提供关闭本while循环的途径, 不需要可以注释掉
            if topic == "close" then break end
            mqttc:publish(topic, data, qos)
        end
        -- 如果没有其他task上报, 可以写个空等待
        --sys.wait(60000000)
    end
    mqttc:close()
    mqttc = nil
end)
  • 此task的功能为模块每3秒向服务器发送一次数据
-- 这里演示在另一个task里上报数据, 会定时上报数据,不需要就注释掉
sys.taskInit(function()
    sys.wait(3000)
    local data = "123,"
    local qos = 1 -- QOS0不带puback, QOS1是带puback的
    while true do
        sys.wait(3000)
        if mqttc and mqttc:ready() then
            local pkgid = mqttc:publish(pub_topic, data .. os.date(), qos)
        end
    end
end)
  • 此代码可实现mqtt-uart透传,利用串口工具给服务器发消息或者接收来自服务器的消息
  • 注意要使用串口1,且波特率为9600
-- 以下是演示与uart结合, 简单的mqtt-uart透传实现,不需要就注释掉
local uart_id = 1
uart.setup(uart_id, 9600)
uart.on(uart_id, "receive", function(id, len)
    local data = ""
    while 1 do
        local tmp = uart.read(uart_id)
        if not tmp or #tmp == 0 then
            break
        end
        data = data .. tmp
    end
    log.info("uart", "uart收到数据长度", #data)
    sys.publish("mqtt_pub", pub_topic, data)
end)
sys.subscribe("mqtt_payload", function(topic, payload)
    log.info("uart", "uart发送数据长度", #payload)
    uart.write(1, payload)
end)
  • 此task是通过使用rtos.meminfo()查询内存信息,并进行打印。rtos库详细信息请参考 RTOS底层操作库
sys.taskInit(function ()
    while true do
        sys.wait(3000)
        log.info("lua", rtos.meminfo())
        log.info("sys", rtos.meminfo("sys"))
    end
end)

2.3 示例效果

MQTT单链接示例如下图所示,实现效果为模块每3秒向服务器发送一次数据

image

前面代码中所提到的mqtt-uart透传实现效果图如下所示:

image

3,MQTT多链接示例

3.1 main.lua说明

在main.lua中我们需要调用multilink_mqtt,代码参考如下:

-- Luatools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"

--[[
本demo需要mqtt库, 大部分能联网的设备都具有这个库
mqtt也是内置库, 无需require
]]

-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")

-- require "single_mqtt"       -- MQTT单链接

require "multilink_mqtt"    -- MQTT多链接

-- require "ssl_mqtt"             -- MQTTS SSL链接

-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!
  • 在代码开头部分,大家根据自己的服务器修改指定的参数
  • 特别说明:client1_pub_topicclient1_sub_topicclient2_pub_topicclient2_sub_topic 在后面函数中会再次赋参数,因此会覆盖掉这里的参数,所以大家可以选择不填。
    --根据自己的服务器修改以下参数
    local mqtt1_param = {
        mqttc = nil,
        host = "lbsmqtt.airm2m.com",
        port = 1884,
        is_ssl = false,
        clientId = "client1",
        username = "user",
        password = "password"
    }

    local mqtt2_param = {
        mqttc = nil,
        host = "lbsmqtt.airm2m.com",
        port = 1884,
        is_ssl = false,
        clientId = "client2",
        username = "user",
        password = "password"
    }

    local client1_pub_topic = ""
    local client1_sub_topic = ""

    local client2_pub_topic = ""
    local client2_sub_topic = ""
  • create_mqtt函数主要功能是创建并配置MQTT客户端对象,具体步骤包括:
  • 使用mqtt.create创建一个MQTT客户端对象,并将其存储在mqtt_param表的mqttc字段中。
  • 使用log.info打印MQTT客户端的配置信息。
  • 使用mqttc:auth进行MQTT三元组配置。
  • 使用mqttc:autoreconn配置自动重连机制,true表示启动自动重连机制,3000为自动重连周期,单位为ms。
    -- 创建mqtt客户端对象
    function create_mqtt(mqtt_param)
        mqtt_param["mqttc"] = mqtt.create(nil, mqtt_param["host"], mqtt_param["port"], mqtt_param["is_ssl"])

        log.info("mqtt", "mqttc", mqtt_param["host"], mqtt_param["port"], mqtt_param["is_ssl"], mqtt_param["clientId"], mqtt_param["username"], mqtt_param["password"])

        mqtt_param["mqttc"]:auth(mqtt_param["clientId"], mqtt_param["username"], mqtt_param["password"]) -- client_id必填,其余选填
        -- mqttc:keepalive(240) -- 默认值240s
        mqtt_param["mqttc"]:autoreconn(true, 3000) -- 自动重连机制
    end
  • mqtt_client1函数主要功能是创建并配置一个MQTT客户端1(client 1),并链接到指定的MQTT服务器,具体步骤包括:
  • client1_pub_topicclient1_sub_topic分别定义了客户端1的上报主题和订阅主题,device_id为设备的IMEI号。
  • 使用log.info函数打印客户端1的上报和下发主题。
  • 使用create_mqtt函数创建MQTT客户端1,并传入mqtt1_param表中参数。
  • 设置MQTT客户端1的事件回调函数,event为事件类型标识,可能出现的值有"conack"(连接确认)、"recv"(接收消息)、"sent"(发送完成)、"disconnect"(服务器断开连接)等,再根据不同事件类型执行不同的功能。
  • 调用connect方法连接到MQTT服务器。
    function mqtt_client1()
        client1_pub_topic = "/luatos/pub/client1/" .. device_id
        client1_sub_topic = "/luatos/sub/client1/" .. device_id

        -- 打印一下上报(pub)和下发(sub)的topic名称
        -- 上报: 设备 ---> 服务器
        -- 下发: 设备 <--- 服务器
        -- 可使用mqtt.x等客户端进行调试
        log.info("mqtt1", "pub", client1_pub_topic)
        log.info("mqtt1", "sub", client1_sub_topic)

        -- 创建第一个mqtt客户端
        create_mqtt(mqtt1_param)
        mqtt1_param["mqttc"]:on(function(mqtt_client, event, data, payload)
            -- 用户自定义代码
            log.info("mqtt", "event", event, mqtt_client, data, payload)
            if event == "conack" then
                -- 联上了
                sys.publish("mqtt1_conack")
                mqtt_client:subscribe(client1_sub_topic)--单主题订阅
                -- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅
            elseif event == "recv" then
                log.info("mqtt", "downlink", "topic", data, "payload", payload)
                sys.publish("mqtt_payload", data, payload)
            elseif event == "sent" then
                -- log.info("mqtt", "sent", "pkgid", data)
            -- elseif event == "disconnect" then
                -- 非自动重连时,按需重启mqttc
                -- mqtt_client:connect()
            end
        end)
        mqtt1_param["mqttc"]:connect()
    end
  • mqtt_client2函数主要功能是创建并配置一个MQTT客户端2(client 2),并链接到指定的MQTT服务器,代码内容与mqtt_client1类似,便不再做介绍了。
    function mqtt_client2()
        client2_pub_topic = "/luatos/pub/client2/" .. device_id
        client2_sub_topic = "/luatos/sub/client2/" .. device_id

        log.info("mqtt2", "pub", client2_pub_topic)
        log.info("mqtt2", "sub", client2_sub_topic)

        -- 创建第二个mqtt客户端2
        create_mqtt(mqtt2_param)
        mqtt2_param["mqttc"]:on(function(mqtt_client, event, data, payload)
            -- 用户自定义代码
            log.info("mqtt", "event", event, mqtt_client, data, payload)
            if event == "conack" then
                -- 联上了
                sys.publish("mqtt2_conack")
                mqtt_client:subscribe(client2_sub_topic)--单主题订阅
                -- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅
            elseif event == "recv" then
                log.info("mqtt", "downlink", "topic", data, "payload", payload)
                sys.publish("mqtt_payload", data, payload)
            elseif event == "sent" then
                -- log.info("mqtt", "sent", "pkgid", data)
            -- elseif event == "disconnect" then
                -- 非自动重连时,按需重启mqttc
                -- mqtt_client:connect()
            end
        end)
        mqtt2_param["mqttc"]:connect()
    end
  • sys.taskInit为主task函数,函数主要功能是初始化刚才那两个MQTT客户端,确保它们能够成功连接到服务器,并进行周期性的发布消息以实现与服务器的通信。代码中还进行了设备联网检查及库的兼容性验证,确保在合适环境下运行。具体步骤包括:
  • 使用sys.waitUntil让系统等待网络连接就绪。
  • 使用mobile.imei()获取模块IMEI号后赋值给device_id作为设备ID。
  • 代码检查是否存在有可用的MQTT库。若不存在,进入一个无限循环,每秒打印一个日志信息,告知用户未找到 MQTT 库。
  • 分别启动两个MQTT客户端,并等待与服务器成功连接的确认。
  • 设定要发布的数据及qos(服务质量)等级,qos为1表示消息至少会被传递一次。
  • 使用一个无限循环,每隔3秒检查MQTT客户端是否准备好,并发送带有时间戳的数据到指定的主题。
    -- 主task
    sys.taskInit(function()
        -- 等待联网
        sys.waitUntil("IP_READY")

        device_id = mobile.imei()   -- 获取模块的IMEI

        -- 检测当前底层core是否有包含mqtt库
        if mqtt == nil then
            while 1 do
                sys.wait(1000)
                log.info("bsp", "本bsp未适配mqtt库, 请查证")
            end
        end

        sys.taskInit(mqtt_client1)      -- 创建并连接第一个mqtt客户端
        sys.waitUntil("mqtt1_conack")

        sys.taskInit(mqtt_client2)      -- 创建并连接第二个mqtt客户端
        sys.waitUntil("mqtt2_conack")

        sys.wait(3000)
        local data = "123,"
        local qos = 1 -- QOS0不带puback, QOS1是带puback的
        while true do
            sys.wait(3000)
            if mqtt1_param["mqttc"] and mqtt1_param["mqttc"]:ready() then
                local pkgid = mqtt1_param["mqttc"]:publish(client1_pub_topic, data .. os.date(), qos)
            end

            if mqtt2_param["mqttc"] and mqtt2_param["mqttc"]:ready() then
                local pkgid = mqtt2_param["mqttc"]:publish(client2_pub_topic, data .. os.date(), qos)
            end
        end

        -- 主动断开mqtt连接
        -- mqtt1_param["mqttc"]:close()
        -- mqtt1_param["mqttc"] = nil

        -- mqtt2_param["mqttc"]:close()
        -- mqtt2_param["mqttc"] = nil
    end)
  • sys.taskInit的主要功能是每隔3秒打印一次Lua程序和操作系统的内存使用情况。
    sys.taskInit(function ()
        -- 循环打印内存占用情况
        while true do
            sys.wait(3000)
            log.info("lua", rtos.meminfo())
            log.info("sys", rtos.meminfo("sys"))
        end
    end)

3.3 示例效果

Client 1:

image

Client 2:

image

4,MQTT SSL不带证书链接示例

4.1 main.lua说明

在main.lua中我们需要调用ssl_mqtt,代码参考如下:

    -- Luatools需要PROJECT和VERSION这两个信息
    PROJECT = "mqttdemo"
    VERSION = "1.0.0"

    --[[
    本demo需要mqtt库, 大部分能联网的设备都具有这个库
    mqtt也是内置库, 无需require
    ]]

    -- sys库是标配
    _G.sys = require("sys")
    --[[特别注意, 使用mqtt库需要下列语句]]
    _G.sysplus = require("sysplus")

    -- require "single_mqtt"       -- MQTT单链接

    -- require "multilink_mqtt"    -- MQTT多链接

    require "ssl_mqtt"             -- MQTTS SSL链接

    -- 用户代码已结束---------------------------------------------
    -- 结尾总是这一句
    sys.run()
    -- sys.run()之后后面不要加任何语句!!!!!

4.2 ssl_mqtt.lua说明(不带证书链接)

  • 在代码开头部分,根据自己的服务器修改对应参数。
  • 特别注意,MQTT SSL不带证书链接与带证书链接为同一个文件,我们本节教程是MQTT SSL不带证书链接,因此要将 mqtt_isssl 的值改为 true ,大家可自行参考下方代码进行修改。
    --根据自己的服务器修改以下参数
    local mqtt_host = "broker.emqx.io"
    local mqtt_port = 8883
    local mqtt_isssl = true -- 是否使用ssl> false 不加密 | true 无证书加密 | table 有证书加密
    -- 带证书的ssl连接,把证书文件作为脚本文件一起烧录到模块内,就可以用/luadb/路径直接读取
        mqtt_isssl = {
            server_cert = io.readFile("/luadb/broker.emqx.io-ca.crt"),
            client_cert=nil,
            client_key=nil,
            client_password=nil,
            verify=1;
        }
    local client_id = "abc"
    local user_name = "user"
    local password = "password"

    local pub_topic = "/luatos/pub/" .. (mcu.unique_id():toHex())
    local sub_topic = "/luatos/sub/" .. (mcu.unique_id():toHex())

    local mqttc = nil
  • 其余代码部分就与MQTT单链接示例中的 single_mqtt.lua 代码相同,同样为避免重复信息过多,影响阅读感受,大家可转到 2.2 single_mqtt.lua说明 进行了解。

4.3 示例效果

image

5,MQTT SSL带证书链接示例

5.1 main.lua说明

在main.lua中我们依旧是需要调用ssl_mqtt,代码参考如下:

    -- Luatools需要PROJECT和VERSION这两个信息
    PROJECT = "mqttdemo"
    VERSION = "1.0.0"

    --[[
    本demo需要mqtt库, 大部分能联网的设备都具有这个库
    mqtt也是内置库, 无需require
    ]]

    -- sys库是标配
    _G.sys = require("sys")
    --[[特别注意, 使用mqtt库需要下列语句]]
    _G.sysplus = require("sysplus")

    -- require "single_mqtt"       -- MQTT单链接

    -- require "multilink_mqtt"    -- MQTT多链接

    require "ssl_mqtt"             -- MQTTS SSL链接

    -- 用户代码已结束---------------------------------------------
    -- 结尾总是这一句
    sys.run()
    -- sys.run()之后后面不要加任何语句!!!!!

5.2 ssl_mqtt.lua说明(带证书链接)

  • 在代码开头部分,依旧需要大家根据自己的服务器进行修改对应参数。
  • 不过需要注意的是,本次是使用MQTT SSL带证书链接,所以需要将 mqtt_isssl 的值改为 table 。
  • 另外需要注意的是,既然是带证书链接,那么肯定是需要准备好证书文件了,大家在使用自己的服务器时,一定要准备好对应的证书文件才行,证书文件建议直接放在 LuatOS-Air8201\demo\mqtt 文件夹下,证书文件路径根据代码中示例自行修改。
  • 在烧录时,要将证书文件作为脚本文件一同烧录到模块中去,第二章包含有烧录教程,大家可自行参考。
    --根据自己的服务器修改以下参数
    local mqtt_host = "broker.emqx.io"
    local mqtt_port = 8883
    -- 无证书加密 和 下面带证书的ssl方式 根据实际需求配置(二选一)
    -- local mqtt_isssl = true    -- 设置为true,为无证书加密
    local mqtt_isssl = {          -- 设置为table类型,带证书的ssl
        -- 把证书文件作为脚本文件一起烧录到模块内,就可以用/luadb/路径直接读取
        server_cert = io.readFile("/luadb/broker.emqx.io-ca.crt"),
        client_cert=nil,
        client_key=nil,
        client_password=nil,
        verify=1;
    }
    local client_id = "abc"
    local user_name = "user"
    local password = "password"

    local pub_topic = "/luatos/pub/" .. (mcu.unique_id():toHex())
    local sub_topic = "/luatos/sub/" .. (mcu.unique_id():toHex())

    local mqttc = nil
  • 其余代码部分就与MQTT单链接示例中的 single_mqtt.lua 代码相同,为避免重复信息过多,影响阅读感受,大家可转到 2.2 single_mqtt.lua说明 进行了解。

5.3 示例效果

image