Air201的MQTT功能
本章开始学习使用Air201模块进行MQTT链接操作,所需要用到的脚本存于LuatOS-Air201\demo\mqtt
文件夹中,若没有找到该脚本,可能代码并非最新,请根据前面教学重新拉取。
本章教程是通过使用LuatOS-Air201\demo\mqtt
下的脚本代码对Air201模块进行MQTT链接操作。操作例程包括有MQTT单链接、MQTT多链接、MQTT SSL不带证书链接、MQTT SSL带证书链接,大家可根据自身需求选择对应的例程学习。
固件地址:https://gitee.com/openLuat/LuatOS-Air201/tree/master/core
脚本地址:https://gitee.com/openLuat/LuatOS-Air201/tree/master/demo/mqtt
1,搭建环境
1.1 Luatools工具
在Luatools工具项目管理中新建项目,重新选择底层固件和脚本文件。
也可以在原有项目下通过删除旧脚本、添加新脚本的方式进而实现不同功能。
1.2 MQTTX工具
本章教程以MQTTX工具为例进行学习,下载地址为 https://mqttx.app/ ,大家也可以使用其他MQTT工具。现在大家先照着我的操作,我们先把MQTTX工具配置一下。
下载好软件后,根据下方图中操作指示填写信息。
填写好信息,点击连接。下一步开始添加订阅主题和发布消息主题,看下图,注意主题格式
订阅主题格式要求默认为 /luatos/pub/ 加模块的IMEI号,例如 /luatos/pub/864536071785271
发布主题格式要求默认为 /luatos/pub/ 加模块的IMEI号,例如 /luatos/sub/864536071785271
MQTTX配置已经完成,现在开始正式学习,学成之后便可通过MQTT进行自由通信了,实际效果如下图所示:
2,MQTT单链接示例
2.1 main.lua说明
在main.lua中我们需要调用single_mqtt,代码参考如下:
-- Luatools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"
--[[
本demo需要mqtt库, 大部分能联网的设备都具有这个库
mqtt也是内置库, 无需require
]]
-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")
require "single_mqtt" -- MQTT单链接
-- require "multilink_mqtt" -- MQTT多链接
-- require "ssl_mqtt" -- MQTTS SSL链接
-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!
2.2 single_mqtt.lua说明
下面将对single_mqtt.lua中的代码进行简单说明,并指导大家修改指定参数,以便顺利进行MQTT单链接操作。
- 在代码开头部分,根据自己的服务器修改指定的参数。
-
需要注意的是user_name和password在有些服务器上是可以不传入的,或者是对传入的值没有要求限制。
-
要根据实际服务器要求来填写。
--根据自己的服务器修改以下参数
local mqtt_host = "lbsmqtt.airm2m.com"
local mqtt_port = 1884
local mqtt_isssl = false
local client_id = "abc"
local user_name = "user"
local password = "password"
local pub_topic = "/luatos/pub/" .. (mcu.unique_id():toHex())
local sub_topic = "/luatos/sub/" .. (mcu.unique_id():toHex())
- 此task实现的是mqtt的连接、订阅消息、发布消息的流程。
- 要先等待网络就绪之后才可进行mqtt后续操作
- 待网络就绪之后,根据代码编写情况此时client_id、pub_topic和sub_topic会发生变化,会覆盖掉代码开头部分时的配置,这点需要注意。device_id为模块的IMEI号
sys.taskInit(function()
-- 等待联网
local ret, device_id = sys.waitUntil("net_ready")
-- 下面的是mqtt的参数均可自行修改
client_id = device_id
pub_topic = "/luatos/pub/" .. device_id
sub_topic = "/luatos/sub/" .. device_id
-- 打印一下上报(pub)和下发(sub)的topic名称
-- 上报: 设备 ---> 服务器
-- 下发: 设备 <--- 服务器
-- 可使用mqtt.x等客户端进行调试
log.info("mqtt", "pub", pub_topic)
log.info("mqtt", "sub", sub_topic)
if mqtt == nil then
while 1 do
sys.wait(1000)
log.info("bsp", "本bsp未适配mqtt库, 请查证")
end
end
-------------------------------------
-------- MQTT 演示代码 --------------
-------------------------------------
mqttc = mqtt.create(nil, mqtt_host, mqtt_port, mqtt_isssl)
mqttc:auth(client_id,user_name,password) -- client_id必填,其余选填
-- mqttc:keepalive(240) -- 默认值240s
mqttc:autoreconn(true, 3000) -- 自动重连机制
mqttc:on(function(mqtt_client, event, data, payload)
-- 用户自定义代码
log.info("mqtt", "event", event, mqtt_client, data, payload)
if event == "conack" then
-- 联上了
sys.publish("mqtt_conack")
mqtt_client:subscribe(sub_topic)--单主题订阅
-- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅
elseif event == "recv" then
log.info("mqtt", "downlink", "topic", data, "payload", payload)
sys.publish("mqtt_payload", data, payload)
elseif event == "sent" then
-- log.info("mqtt", "sent", "pkgid", data)
-- elseif event == "disconnect" then
-- 非自动重连时,按需重启mqttc
-- mqtt_client:connect()
end
end)
-- mqttc自动处理重连, 除非自行关闭
mqttc:connect()
sys.waitUntil("mqtt_conack")
while true do
-- 演示等待其他task发送过来的上报信息
local ret, topic, data, qos = sys.waitUntil("mqtt_pub", 300000)
if ret then
-- 提供关闭本while循环的途径, 不需要可以注释掉
if topic == "close" then break end
mqttc:publish(topic, data, qos)
end
-- 如果没有其他task上报, 可以写个空等待
--sys.wait(60000000)
end
mqttc:close()
mqttc = nil
end)
- 此task的功能为模块每3秒向服务器发送一次数据
-- 这里演示在另一个task里上报数据, 会定时上报数据,不需要就注释掉
sys.taskInit(function()
sys.wait(3000)
local data = "123,"
local qos = 1 -- QOS0不带puback, QOS1是带puback的
while true do
sys.wait(3000)
if mqttc and mqttc:ready() then
local pkgid = mqttc:publish(pub_topic, data .. os.date(), qos)
end
end
end)
- 此代码可实现mqtt-uart透传,利用串口工具给服务器发消息或者接收来自服务器的消息
- 注意要使用串口1,且波特率为9600
-- 以下是演示与uart结合, 简单的mqtt-uart透传实现,不需要就注释掉
local uart_id = 1
uart.setup(uart_id, 9600)
uart.on(uart_id, "receive", function(id, len)
local data = ""
while 1 do
local tmp = uart.read(uart_id)
if not tmp or #tmp == 0 then
break
end
data = data .. tmp
end
log.info("uart", "uart收到数据长度", #data)
sys.publish("mqtt_pub", pub_topic, data)
end)
sys.subscribe("mqtt_payload", function(topic, payload)
log.info("uart", "uart发送数据长度", #payload)
uart.write(1, payload)
end)
- 此task是通过使用rtos.meminfo()查询内存信息,并进行打印。rtos库详细信息请参考 RTOS底层操作库 。
sys.taskInit(function ()
while true do
sys.wait(3000)
log.info("lua", rtos.meminfo())
log.info("sys", rtos.meminfo("sys"))
end
end)
2.3 示例效果
MQTT单链接示例如下图所示,实现效果为模块每3秒向服务器发送一次数据
前面代码中所提到的mqtt-uart透传实现效果图如下所示:
3,MQTT多链接示例
3.1 main.lua说明
在main.lua中我们需要调用multilink_mqtt,代码参考如下:
-- Luatools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"
--[[
本demo需要mqtt库, 大部分能联网的设备都具有这个库
mqtt也是内置库, 无需require
]]
-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")
-- require "single_mqtt" -- MQTT单链接
require "multilink_mqtt" -- MQTT多链接
-- require "ssl_mqtt" -- MQTTS SSL链接
-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!
3.2 multilink_mqtt.lua说明
- 在代码开头部分,大家根据自己的服务器修改指定的参数
- 特别说明:
client1_pub_topic
、client1_sub_topic
、client2_pub_topic
和client2_sub_topic
在后面函数中会再次赋参数,因此会覆盖掉这里的参数,所以大家可以选择不填。 -
```Lua --根据自己的服务器修改以下参数 local mqtt1_param = { mqttc = nil, host = "lbsmqtt.airm2m.com", port = 1884, is_ssl = false, clientId = "client1", username = "user", password = "password" }
local mqtt2_param = { mqttc = nil, host = "lbsmqtt.airm2m.com", port = 1884, is_ssl = false, clientId = "client2", username = "user", password = "password" }
local client1_pub_topic = "" local client1_sub_topic = ""
local client2_pub_topic = "" local client2_sub_topic = ""
- 此`create_mqtt`函数主要功能是创建并配置MQTT客户端对象,具体步骤包括: - 使用`mqtt.create`创建一个MQTT客户端对象,并将其存储在`mqtt_param`表的`mqttc`字段中。 - 使用`log.info`打印MQTT客户端的配置信息。 - 使用`mqttc:auth`进行MQTT三元组配置。 - 使用`mqttc:autoreconn`配置自动重连机制,`true`表示启动自动重连机制,`3000`为自动重连周期,单位为ms。 -
Lua -- 创建mqtt客户端对象 function create_mqtt(mqtt_param) mqtt_param["mqttc"] = mqtt.create(nil, mqtt_param["host"], mqtt_param["port"], mqtt_param["is_ssl"])log.info("mqtt", "mqttc", mqtt_param["host"], mqtt_param["port"], mqtt_param["is_ssl"], mqtt_param["clientId"], mqtt_param["username"], mqtt_param["password"]) mqtt_param["mqttc"]:auth(mqtt_param["clientId"], mqtt_param["username"], mqtt_param["password"]) -- client_id必填,其余选填 -- mqttc:keepalive(240) -- 默认值240s mqtt_param["mqttc"]:autoreconn(true, 3000) -- 自动重连机制
end
- 此`mqtt_client1`函数主要功能是创建并配置一个MQTT客户端1(client 1),并链接到指定的MQTT服务器,具体步骤包括: - `client1_pub_topic`和`client1_sub_topic`分别定义了客户端1的上报主题和订阅主题,`device_id`为设备的IMEI号。 - 使用`log.info`函数打印客户端1的上报和下发主题。 - 使用`create_mqtt`函数创建MQTT客户端1,并传入`mqtt1_param`表中参数。 - 设置MQTT客户端1的事件回调函数,`event`为事件类型标识,可能出现的值有`"conack"`(连接确认)、`"recv"`(接收消息)、`"sent"`(发送完成)、`"disconnect"`(服务器断开连接)等,再根据不同事件类型执行不同的功能。 - 调用`connect`方法连接到MQTT服务器。 -
Lua function mqtt_client1() client1_pub_topic = "/luatos/pub/client1/" .. device_id client1_sub_topic = "/luatos/sub/client1/" .. device_id-- 打印一下上报(pub)和下发(sub)的topic名称 -- 上报: 设备 ---> 服务器 -- 下发: 设备 <--- 服务器 -- 可使用mqtt.x等客户端进行调试 log.info("mqtt1", "pub", client1_pub_topic) log.info("mqtt1", "sub", client1_sub_topic) -- 创建第一个mqtt客户端 create_mqtt(mqtt1_param) mqtt1_param["mqttc"]:on(function(mqtt_client, event, data, payload) -- 用户自定义代码 log.info("mqtt", "event", event, mqtt_client, data, payload) if event == "conack" then -- 联上了 sys.publish("mqtt1_conack") mqtt_client:subscribe(client1_sub_topic)--单主题订阅 -- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅 elseif event == "recv" then log.info("mqtt", "downlink", "topic", data, "payload", payload) sys.publish("mqtt_payload", data, payload) elseif event == "sent" then -- log.info("mqtt", "sent", "pkgid", data) -- elseif event == "disconnect" then -- 非自动重连时,按需重启mqttc -- mqtt_client:connect() end end) mqtt1_param["mqttc"]:connect()
end
- 此`mqtt_client2`函数主要功能是创建并配置一个MQTT客户端2(client 2),并链接到指定的MQTT服务器,代码内容与`mqtt_client1`类似,便不再做介绍了。 -
Lua function mqtt_client2() client2_pub_topic = "/luatos/pub/client2/" .. device_id client2_sub_topic = "/luatos/sub/client2/" .. device_idlog.info("mqtt2", "pub", client2_pub_topic) log.info("mqtt2", "sub", client2_sub_topic) -- 创建第二个mqtt客户端2 create_mqtt(mqtt2_param) mqtt2_param["mqttc"]:on(function(mqtt_client, event, data, payload) -- 用户自定义代码 log.info("mqtt", "event", event, mqtt_client, data, payload) if event == "conack" then -- 联上了 sys.publish("mqtt2_conack") mqtt_client:subscribe(client2_sub_topic)--单主题订阅 -- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅 elseif event == "recv" then log.info("mqtt", "downlink", "topic", data, "payload", payload) sys.publish("mqtt_payload", data, payload) elseif event == "sent" then -- log.info("mqtt", "sent", "pkgid", data) -- elseif event == "disconnect" then -- 非自动重连时,按需重启mqttc -- mqtt_client:connect() end end) mqtt2_param["mqttc"]:connect()
end
- 此`sys.taskInit`为主task函数,函数主要功能是初始化刚才那两个MQTT客户端,确保它们能够成功连接到服务器,并进行周期性的发布消息以实现与服务器的通信。代码中还进行了设备联网检查及库的兼容性验证,确保在合适环境下运行。具体步骤包括: - 使用`sys.waitUntil`让系统等待网络连接就绪。 - 使用`mobile.imei()`获取模块IMEI号后赋值给`device_id`作为设备ID。 - 代码检查是否存在有可用的MQTT库。若不存在,进入一个无限循环,每秒打印一个日志信息,告知用户未找到 MQTT 库。 - 分别启动两个MQTT客户端,并等待与服务器成功连接的确认。 - 设定要发布的数据及qos(服务质量)等级,qos为1表示消息至少会被传递一次。 - 使用一个无限循环,每隔3秒检查MQTT客户端是否准备好,并发送带有时间戳的数据到指定的主题。 -
Lua -- 主task sys.taskInit(function() -- 等待联网 sys.waitUntil("IP_READY")device_id = mobile.imei() -- 获取模块的IMEI -- 检测当前底层core是否有包含mqtt库 if mqtt == nil then while 1 do sys.wait(1000) log.info("bsp", "本bsp未适配mqtt库, 请查证") end end sys.taskInit(mqtt_client1) -- 创建并连接第一个mqtt客户端 sys.waitUntil("mqtt1_conack") sys.taskInit(mqtt_client2) -- 创建并连接第二个mqtt客户端 sys.waitUntil("mqtt2_conack") sys.wait(3000) local data = "123," local qos = 1 -- QOS0不带puback, QOS1是带puback的 while true do sys.wait(3000) if mqtt1_param["mqttc"] and mqtt1_param["mqttc"]:ready() then local pkgid = mqtt1_param["mqttc"]:publish(client1_pub_topic, data .. os.date(), qos) end if mqtt2_param["mqttc"] and mqtt2_param["mqttc"]:ready() then local pkgid = mqtt2_param["mqttc"]:publish(client2_pub_topic, data .. os.date(), qos) end end -- 主动断开mqtt连接 -- mqtt1_param["mqttc"]:close() -- mqtt1_param["mqttc"] = nil -- mqtt2_param["mqttc"]:close() -- mqtt2_param["mqttc"] = nil
end)
- 此`sys.taskInit`的主要功能是每隔3秒打印一次Lua程序和操作系统的内存使用情况。 -
Lua sys.taskInit(function () -- 循环打印内存占用情况 while true do sys.wait(3000) log.info("lua", rtos.meminfo()) log.info("sys", rtos.meminfo("sys")) end end) ```
3.3 示例效果
Client 1:
Client 2:
4,MQTT SSL不带证书链接示例
4.1 main.lua说明
在main.lua中我们需要调用ssl_mqtt,代码参考如下:
-- Luatools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"
--[[
本demo需要mqtt库, 大部分能联网的设备都具有这个库
mqtt也是内置库, 无需require
]]
-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")
-- require "single_mqtt" -- MQTT单链接
-- require "multilink_mqtt" -- MQTT多链接
require "ssl_mqtt" -- MQTTS SSL链接
-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!
4.2 ssl_mqtt.lua说明
- 在代码开头部分,根据自己的服务器修改对应参数。
- 特别注意,MQTT SSL不带证书链接与带证书链接为同一个文件,我们本节教程是MQTT SSL不带证书链接,因此要将 mqtt_isssl 的值改为 true ,大家可自行参考下方代码进行修改。
-
```Lua --根据自己的服务器修改以下参数 local mqtt_host = "broker.emqx.io" local mqtt_port = 8883 local mqtt_isssl = true -- 是否使用ssl> false 不加密 | true 无证书加密 | table 有证书加密 -- 带证书的ssl连接,把证书文件作为脚本文件一起烧录到模块内,就可以用/luadb/路径直接读取 mqtt_isssl = { server_cert = io.readFile("/luadb/broker.emqx.io-ca.crt"), client_cert=nil, client_key=nil, client_password=nil, verify=1; } local client_id = "abc" local user_name = "user" local password = "password"
local pub_topic = "/luatos/pub/" .. (mcu.unique_id():toHex()) local sub_topic = "/luatos/sub/" .. (mcu.unique_id():toHex())
local mqttc = nil ``` - 其余代码部分就与MQTT单链接示例中的 single_mqtt.lua 代码相同,同样为避免重复信息过多,影响阅读感受,大家可转到 2.2 single_mqtt.lua说明 进行了解。
4.3 示例效果
5,MQTT SSL带证书链接示例
5.1 main.lua说明
在main.lua中我们依旧是需要调用ssl_mqtt,代码参考如下:
-- Luatools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"
--[[
本demo需要mqtt库, 大部分能联网的设备都具有这个库
mqtt也是内置库, 无需require
]]
-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")
-- require "single_mqtt" -- MQTT单链接
-- require "multilink_mqtt" -- MQTT多链接
require "ssl_mqtt" -- MQTTS SSL链接
-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!
5.2 ssl_mqtt.lua说明
- 在代码开头部分,依旧需要大家根据自己的服务器进行修改对应参数。
- 不过需要注意的是,本次是使用MQTT SSL带证书链接,所以需要将 mqtt_isssl 的值改为 table 。
- 另外需要注意的是,既然是带证书链接,那么肯定是需要准备好证书文件了,大家在使用自己的服务器时,一定要准备好对应的证书文件才行,证书文件建议直接放在
LuatOS-Air201\demo\mqtt
文件夹下,证书文件路径根据代码中示例自行修改。 - 在烧录时,要将证书文件作为脚本文件一同烧录到模块中去,第二章包含有烧录教程,大家可自行参考。
-
```Lua --根据自己的服务器修改以下参数 local mqtt_host = "broker.emqx.io" local mqtt_port = 8883 -- 无证书加密 和 下面带证书的ssl方式 根据实际需求配置(二选一) -- local mqtt_isssl = true -- 设置为true,为无证书加密 local mqtt_isssl = { -- 设置为table类型,带证书的ssl -- 把证书文件作为脚本文件一起烧录到模块内,就可以用/luadb/路径直接读取 server_cert = io.readFile("/luadb/broker.emqx.io-ca.crt"), client_cert=nil, client_key=nil, client_password=nil, verify=1; } local client_id = "abc" local user_name = "user" local password = "password"
local pub_topic = "/luatos/pub/" .. (mcu.unique_id():toHex()) local sub_topic = "/luatos/sub/" .. (mcu.unique_id():toHex())
local mqttc = nil ``` - 其余代码部分就与MQTT单链接示例中的 single_mqtt.lua 代码相同,为避免重复信息过多,影响阅读感受,大家可转到 2.2 single_mqtt.lua说明 进行了解。
5.3 示例效果