MQTT
一、MQTT 介绍
MQTT 是一种低开销、低带宽占用的即时通讯协议,可以用极少的代码和带宽为远程设备提供实时可靠的消息服务。它适用于硬件性能低下的设备以及网络状况不佳的环境,因此在物联网(IoT)小型设备和移动应用等方面有广泛应用。
MQTT 采用发布/订阅通信模型,客户端可以发布消息到主题(Topic),也可以订阅主题来接收消息。这种模式解耦了消息的发送者和接收者。
MQTT 的消息传递质量分为三种: QoS 0:最多一次交付,即消息可能丢失; QoS 1:至少一次交付,即消息可能会重复; QoS 2:恰好一次交付,即消息不会丢失也不会重复。
二、演示功能概述
本 demo 通过使用 Air8101 开发板,带你快速体验通过 MQTT 协议进行数据接收与发送。
三、准备硬件环境
“古人云:‘工欲善其事,必先利其器。’在深入介绍本功能示例之前,我们首先需要确保以下硬件环境的准备工作已经完成。”
参考:硬件环境清单,准备以及组装好硬件环境。
四、软件环境
“凡事预则立,不预则废。”在详细阐述本功能示例之前,我们需先精心筹备好以下软件环境。
- Luatools 工具;
- 内核固件文件(底层 core 固件文件):LuatOS-SoC_V10001_Air8101.soc;参考项目使用的内核固件;
- luatos 需要的脚本和资源文件
脚本和资源文件:https://gitee.com/openLuat/LuatOS-Air8101/tree/master/demo/wlan/softAP
lib 脚本文件:使用 Luatools 烧录时,勾选 添加默认 lib 选项,使用默认 lib 脚本文件;
准备好软件环境之后,接下来查看如何烧录项目文件到 Air8101 开发板,将本篇文章中演示使用的项目文件烧录到 Air8101 开发板中。
五、MQTT API 介绍
5.1 创建一个 mqtt client 实例
mqtt.client(clientId, keepAlive, username, password, cleanSession, will, version)
参数
名称 | 传入值类型 | 释义 |
clientId | string | 确保设备唯一性 |
keepAlive | number | 可选参数,默认为300 心跳间隔(单位为秒),默认300秒 |
username | string | 可选参数,默认为"" 用户名,用户名为空配置为""或者nil |
password | string | 可选参数,默认为"" 密码,密码为空配置为""或者nil |
cleanSession | number | 可选参数,默认为1 1/0 |
will | table | 可选参数,默认为nil 遗嘱参数,格式为{qos=,retain=,topic=,payload=} |
version | string | 可选参数,默认为"3.1.1" MQTT版本号,仅支持"3.1"和"3.1.1" |
返回值
table mqttc client 实例
例子
mqttc = mqtt.client("clientid-123")
mqttc = mqtt.client("clientid-123",200)
mqttc = mqtt.client("clientid-123",nil,"user","password")
mqttc = mqtt.client("clientid-123",nil,"user","password",nil,{qos=0,retain=0,topic="willTopic",payload="willTopic"},"3.1")
5.2 连接 mqtt 服务器
mqttc:connect(host, port, transport, cert, timeout)
参数
名称 | 传入值类型 | 释义 |
host | string | 服务器地址 |
port | param | string或者number类型,服务器端口 |
transport | string | 可选参数,默认为"tcp" “tcp"或者"tcp_ssl” |
cert | table | 可选参数,默认为nil table或者nil类型,ssl证书,当transport为"tcp_ssl"时,此参数才有意义。cert格式如下: { caCert = “ca.crt”, --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验 clientCert = “client.crt”, --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数 clientKey = “client.key”, --客户端私钥文件(Base64编码 X.509格式) clientPassword = “123456”, --客户端证书文件密码[可选] } |
timeout | number | 可选参数,默认为120 可选参数,socket连接超时时间,单位秒 |
返回值
result true 表示成功,false 或者 nil 表示失败
例子
mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp", 5)
5.3 订阅主题
mqttc:subscribe(topic, qos)
参数
名称 | 传入值类型 | 释义 |
topic | param | string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码 |
qos | param | 可选参数,默认为0 number或者nil,topic为一个主题时,qos为number类型(0/1/2,默认0);topic为多个主题时,qos为nil |
返回值
bool true 表示成功,false 或者 nil 表示失败
例子
mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
5.4 取消订阅主题
mqttc:unsubscribe(topic)
参数
名称 | 传入值类型 | 释义 |
topic | param | string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码 |
返回值
bool true 表示成功,false 或者 nil 表示失败
例子
mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
5.5 发布一条消息
mqttc:publish(topic, payload, qos, retain)
参数
名称 | 传入值类型 | 释义 |
topic payload | string string | UTF8编码的字符串 用户自己控制payload的编码,mqtt.lua不会对payload做任何编码转换 |
qos 0/1/2, default | number | 可选参数,默认为0 |
retain | number | 可选参数,默认为0 0或者1 |
返回值
bool 发布成功返回 true,失败返回 false
例子
mqttc = mqtt.client("clientid-123", nil, nil, false)
mqttc:connect("mqttserver.com", 1883, "tcp")
mqttc:publish("/topic", "publish from luat mqtt client", 0)
5.6 接收消息
mqttc:receive(timeout, msg)
参数
名称 | 传入值类型 | 释义 |
timeout msg | number string | 接收超时时间,单位毫秒 可选参数,默认为nil 可选参数,控制socket所在的线程退出recv阻塞状态 |
返回值
result 数据接收结果,true 表示成功,false 表示失败
data
如果 result 为 true,表示服务器发过来的 mqtt 包
如果 result 为 false,超时失败,data 为"timeout"
如果 result 为 false,msg 控制退出,data 为 msg 的字符串
如果 result 为 false,socket 连接被动断开控制退出,data 为"CLOSED"
如果 result 为 false,PDP 断开连接控制退出,data 为"IP_ERROR_IND"
如果 result 为 false,mqtt 不处于连接状态,data 为 nil
如果 result 为 false,收到了 PUBLISH 报文,发送 PUBACK 或者 PUBREC 报文失败,data 为 nil
如果 result 为 false,收到了 PUBREC 报文,发送 PUBREL 报文失败,data 为 nil
如果 result 为 false,收到了 PUBREL 报文,发送 PUBCOMP 报文失败,data 为 nil
如果 result 为 false,发送 PINGREQ 报文失败,data 为 nil
param 如果是 msg 控制退出,param 的值是 msg 的参数;其余情况无意义,为 nil
例子
true, packet = mqttc:receive(2000)
false, error_message = mqttc:receive(2000)
false, msg, para = mqttc:receive(2000,"APP_SEND_DATA")
5.7 断开与服务器的连接
mqttc:disconnect()
参数
无
返回值
nil
例子
mqttc = mqtt.client("clientid-123", nil, nil, false)
mqttc:connect("mqttserver.com", 1883, "tcp")
process data
mqttc:disconnect()
六、代码示例介绍
6.1 DEMO 软件流程图
6.2 配置
6.2.1 MQTT 的 4 个重要配置
- local mqtt_host = "lbsmqtt.airm2m.com"(MQTT 服务器地址,这里使用合宙提供公用测试服务器)
- local mqtt_port = 1884(MQTT 端口)
- local mqtt_client_id = "abc" (暂时只定义,但暂未使用)
- local user_name = "user"(MQTT 服务器登录用户名)
- local password = "password"(MQTT 服务器登录密码)
6.2.2 本 DEMO 通信使用的 2 个重要主题
- local pub_topic ="1001001/up"(设备发布主题,可自行修改)
- local sub_topic = "1001001/down"(设备订阅主题,可自行修改)
6.3 完整程序清单
注:完整复制后保存为 main.lua,可直接使用
-- LuaTools需要PROJECT和VERSION这两个信息
PROJECT = "mqttdemo"
VERSION = "1.0.0"
--[[
本demo需要mqtt库,
mqtt也是内置库, 无需require
]]
-- sys库是标配
_G.sys = require("sys")
--[[特别注意, 使用mqtt库需要下列语句]]
_G.sysplus = require("sysplus")
--根据自己的服务器修改以下参数
local mqtt_host = "lbsmqtt.airm2m.com"
local mqtt_port = 1884
local mqtt_isssl = false
local ca_file = false
local client_id = "mqttx_b55c41b7"
local user_name = "user"
local password = "password"
local pub_topic = ""-- .. (mcu.unique_id():toHex())
local sub_topic = ""-- .. (mcu.unique_id():toHex())
local mqttc = nil
sys.taskInit(function()
-----------------------------
-- 统一联网函数, 可自行删减
----------------------------
if wlan and wlan.connect then
-- wifi 联网, Air8101系列均支持
local ssid = "Xiaomi_110"
local password = "1234567890"
log.info("wifi", ssid, password)
wlan.init()
wlan.setMode(wlan.STATION)
wlan.connect(ssid, password, 1)
--等待WIFI联网结果,WIFI联网成功后,内核固件会产生一个"IP_READY"消息
local result, data = sys.waitUntil("IP_READY")
log.info("wlan", "IP_READY", result, data)
device_id = wlan.getMac()
else
-- 其他不认识的bsp, 循环提示一下吧
while 1 do
sys.wait(1000)
log.info("bsp", "本bsp可能未适配网络层, 请查证")
end
end
log.info("已联网")
sys.publish("net_ready")
end)
sys.taskInit(function()
-- 等待联网
sys.waitUntil("net_ready")
local device_id = "1001001"
client_id = device_id
pub_topic = device_id .. "/up" -- 设备发布的主题,开发者可自行修改
sub_topic = device_id .. "/down" -- 设备订阅的主题,开发者可自行修改
-- 打印一下上报(pub)和下发(sub)的topic名称
-- 上报: 设备 ---> 服务器
-- 下发: 设备 <--- 服务器
-- 可使用mqtt.x等客户端进行调试
log.info("mqtt", "pub", pub_topic)
log.info("mqtt", "sub", sub_topic)
-- 打印一下支持的加密套件, 通常来说, 固件已包含常见的99%的加密套件
-- if crypto.cipher_suites then
-- log.info("cipher", "suites", json.encode(crypto.cipher_suites()))
-- end
if mqtt == nil then
while 1 do
sys.wait(1000)
log.info("bsp", "本bsp未适配mqtt库, 请查证")
end
end
-------------------------------------
-------- MQTT 演示代码 --------------
-------------------------------------
mqttc = mqtt.create(nil, mqtt_host, mqtt_port, mqtt_isssl, ca_file)
mqttc:auth(client_id,user_name,password) -- client_id必填,其余选填
-- mqttc:keepalive(240) -- 默认值240s
mqttc:autoreconn(true, 3000) -- 自动重连机制
mqttc:on(function(mqtt_client, event, data, payload)
-- 用户自定义代码
log.info("mqtt", "event", event, mqtt_client, data, payload)
if event == "conack" then
-- 联上了
sys.publish("mqtt_conack")
mqtt_client:subscribe(sub_topic)--单主题订阅
-- mqtt_client:subscribe({[topic1]=1,[topic2]=1,[topic3]=1})--多主题订阅
elseif event == "recv" then
log.info("mqtt", "downlink", "topic", data, "payload:", payload)
log.info("mqtt", "uplink", "topic", pub_topic, "payload:", payload)
sys.publish("mqtt_pub", pub_topic, payload) --将收到的数据,通过发布主题目,进行发送
elseif event == "sent" then
log.info("mqtt", "sent", "pkgid", data)
elseif event == "disconnect" then
-- 非自动重连时,按需重启mqttc
-- mqtt_client:connect()
end
end)
-- mqttc自动处理重连, 除非自行关闭
mqttc:connect()
sys.waitUntil("mqtt_conack")
while true do
-- 演示等待其他task发送过来的上报信息
local ret, topic, data, qos = sys.waitUntil("mqtt_pub", 300000)
if ret then
-- 提供关闭本while循环的途径, 不需要可以注释掉
if topic == "close" then break end
mqttc:publish(topic, data, qos)
end
-- 如果没有其他task上报, 可以写个空等待
--sys.wait(6000)
end
mqttc:close()
mqttc = nil
end)
-- 用户代码已结束---------------------------------------------
-- 结尾总是这一句
sys.run()
-- sys.run()之后后面不要加任何语句!!!!!
七、功能验证
7.1 查看调试日志,获取 WIFI 信息和设备发布与订阅主题
7.2 打开 MQTT 客户端 MQTT.fx 应用程序并配置
7.2.1 MQTT 客户端基本配置
ProfileName:合宙(可修改为你想要的名称)
BrokerAddress:http://lbsmqtt.airm2m.com (合宙提供的免费测试服务器,也可修改为自己的服务器)
BrokerPort:1883 (端口号)
UserName:user
Password:password
7.2.2 两个重要主题
设备发布主题:1001001/up (设备向服务器发送数据使用)
设备订阅主题:1001001/down (接收服务器数据主题)
7.3 MQTT 客户端订阅设备主题
7.4 MQTT 客户端给开发板发送数据
7.5 MQTT 客户端订阅的设备数据
7.6 连接 MQTT 服务器成功日志
7.7 接收和发送数据日志
7.8 热点关闭时候日志
7.9 热点关闭一段时间,然后再打开热点,wifi 可以自动重新连接热点自动连接获取 ip
7.10 热点关闭一段时间,然后再打开热点,MQTT 自动重新连接
首先开启自动重连机制 mqttc:autoreconn(true, 3000) ,
然后热点关闭一段时间,然后再打开热点,wifi 可以自动重新连接,并且能够接收到之前订阅的主题消息。
总结
至此,我们已使用 Air8101 开发板完成了 MQTT 通信的基本功能。