LuatOS 的消息机制详解
作者:秦鹏
LuatOS 的消息机制是其多任务协作和事件驱动编程的核心部分,主要通过 sys
核心库实现。
消息机制包括消息的发送、接收、订阅,以及系统消息的定义和使用,下面分别详细描述其原理和使用方法。
一、LuatOS 消息机制的原理
1.1 消息机制的基本原理描述
LuatOS 基于消息队列实现任务间通信,消息队列遵循先进先出(FIFO)原则。
消息机制配合 Lua 的协程(coroutine),可以非常丝滑的实现协作式多任务。
每一个协程的运行,是相互逻辑独立的;
在协程之间收发消息,可以实现不同协程间的通信。
消息发布者调用 LuatOS 的 API 发送全局消息或者定向消息。
订阅者通过回调函数或协程等待消息并处理,实现异步事件驱动。
消息调度流程大致如下:
(1)发布消息时,将消息及参数插入消息队列。
(2)消息分发函数从队列取出消息,根据消息 ID 查找订阅者列表。
(3)调用订阅者的回调函数或恢复协程,传递消息参数。
(4)订阅者处理消息,实现任务间通信和事件响应。
这种设计使得任务间解耦,消息驱动程序结构清晰,适合物联网设备的异步事件处理需求。
1.2 LuatOS 的消息队列
LuatOS 的消息队列有两个: 用户消息队列和系统消息队列。
1.2.1 用户消息队列:
用户消息队列,存储在Lua层,由Lua脚本内部使用。
用户消息队列的发送接口有两个: sys.publish()和 sys.sendMsg()。
用户消息队列的接收接口有三个: sys.waitUntil()
,sys.waitMsg()
,sys.subscribe()
。
1.2.2 系统消息队列:
系统消息队列,存储在RTOS层,是RTOS操作系统的基本单元。
系统消息只能是使用 RTOS 的接口发送该消息,并且该发送消息接口是不开放给用户的。
系统消息队列的消息, 由 sys 核心库收取,并传送给对应的协程或者回调函数进行处理。
系统消息队列用于硬件事件,网络事件和定时器事件的传递。
比如说,当 Lua 脚本的某个协程运行了这行代码:
sys.wait(1000)
sys 核心库就会调用 rots 的timer 接口, 注册一个定时器消息。
当定时器超时后,rots 会发送给sys库一个定时器超时消息,sys库就能够通知对应的协程,可以恢复运行了。
1.3 topic 和消息的差异
1.3.1 topic 和消息的差异
LuatOS 的消息,分为两类: topic 和消息。
topic 跟事件的概念是类似的,在 LuatOS 语境里面,topic 往往就是指事件。
topic 是全局消息,是广播消息。
而消息跟 topic 的不同,消息是有特定接收方的,而 topic 没有特定接收方。
LuatOS 的 sys.publish 接口是发布一个全局消息,也就是发布了一个 topic,也是发布了一个事件。
sys.subscribe 接口订阅了一个 topic,也就是订阅了一个事件。
sys.sendMsg, 是发送的消息, sys.waitMsg, 是等待和处理一个消息,是有特定的协程名字作为接收方的。
为了简化描述,在下文的其余部分,如果没有特别的说明,我们不再区分 topic 和消息的差异,统一把 topic 和消息称为消息。
1.3.2 mqtt 和 LuatOS 的 topic 的差异
mqtt 的 topic 与 sys 的 topic 有如下差异:
mqtt 的 topic 的订阅关系是服务器侧(borker)维护的,支持严格匹配和通配符匹配;
sys 的 topic 的订阅关系是 LuatOS 嵌入式系统内部维护的,只支持严格匹配。
1.4 LuatOS 消息的使用
尽管有用户消息队列和系统消息队列两种差异,但是在使用消息的时候,可以不关注这种差异。
用户只需要调用 sys.publish()接口和 sys.sendMsg()接口,按照自己的需要发送消息。
在接收和处理消息的时候,用 sys.subscribe()接口,指定某个消息的回调函数,这时候的消息,也不需要区分用户队列的消息还是系统消息,只要知道消息的名字,都可以指定消息的回调处理函数。
使用 sys.waitUtil()接口,sys.waitMsg 接口的时候,也不需要关心是用户队列的消息还是系统队列的消息,只要知道消息名字,都可以处理。
所有的发送消息,和接收处理消息的接口的使用,接下来都会做消息的介绍。
二、消息的发送
2.1 全局消息(sys 库)
全局消息,也可以理解为广播消息,所有的协程都是可以监听和处理的。
使用`sys.publish(topic, arg1, arg2, ...)`向所有订阅该消息的广播发送该消息。
由于这个接口的发送是没有目标的标识,所以任何的协程都可以订阅处理这个消息。
例如:
sys.publish("NET_READY", true)
sys.publish("SENSOR_DATA", 25.5, "℃")
该消息发布后,会放入用户的消息队列,等待被订阅者处理。
2.2 定向消息(sys 库)
使用 sys.sendMsg(taskName, target, arg2, arg3, arg4)
向指定任务发送消息,可以指定接收消息的协程的名字,也可以同时给出消息携带的参数。
这种消息很适用于协程间的点对点通信。 例如:
-- 向名字为“NET_TASK” 的协程发送 “HTTP_RESP”消息,携带的参数有两个,分别给出来。
sys.sendMsg("NET_TASK", "HTTP_RESP", 200, "{data:123}")
- 该消息直接发送给指定任务,很适合请求-响应模型。
三、消息的接收处理
LuatOS 消息的接收处理有三种方式: 订阅消息,取消订阅息,等待消息。
3.1 订阅消息
通过 sys.subscribe(id, callback)
订阅指定消息的 ID,注册回调函数,当消息到达时调用回调函数进行处理。
例如:
local function TempFunc(t)
if t > 30 then sys.publish("FAN_ON") end
end
sys.subscribe("TEMP_UPDATE", TempFunc)
3.2 取消订阅
使用sys.unsubscribe(id, callback)`取消订阅,解除消息与回调的绑定。
3.3 等待消息
LuatOS 有两个等待消息的接口: waitUntil 和 waitMsg。
3.3.1 sys.waitUntil 接口
使用`sys.waitUntil(id, timeout)`阻塞当前协程,直到收到指定消息或超时,返回消息参数。
waitUntil 是等待特定全局消息的触发,可以被用于协程内监听系统级消息(如网络状态、硬件事件),也可以用于监听sys.publish发布的全局消息。
接口原型为:
local ok, data = sys.waitUntil(topic, timeout)
-- topic:消息字符串,事件标识(如 "NET_READY")。
-- timeout:整数,超时时间(毫秒),若为 nil 则无限等待。
-- 返回值:
-- ok:布尔值,true 表示收到消息,false 表示超时。
-- data:消息携带的数据。
使用示例:
local function SMSfunc()
result, data = sys.waitUntil("SIM_IND", 120000)
if result then
-- 处理消息
end
end
local function NetFunc()
-- 等待网络就绪消息,超时30秒
local ok, ip = sys.waitUntil("NET_READY", 30000)
if ok then
log.info("网络已连接,IP:", ip)
else
log.warn("网络连接超时")
end
end
sys.taskInit(SMSFunc)
sys.taskInit(NetFunc)
- 该机制基于协程挂起和恢复实现,方便同步等待异步事件。
3.3.2 sys.waitMsg 接口
waitMsg 接口用于定向接收协程间的消息,用于协程间的通信,比较适合请求-响应的多协程合作的工作模式。
接口原型:
local msg = sys.waitMsg(taskName, target, timeout)
-- taskName:字符串,接收消息的协程的名称(需唯一)。注意是等待消息的协程名称,也就是自己的协程名称,不是发送消息的协程的名称
-- target:字符串,接收的消息标识(若为 nil 则接收任意消息)。
-- timeout:整数,超时时间(毫秒),若为 nil 则无限等待。
--返回值:
-- msg:消息表(含 target、arg2、arg3 等字段),超时返回 false
使用示例:
-- 协程A:发送配置请求给到名为 "CONFIG_TASK" 的协程
sys.sendMsg("CONFIG_TASK", "GET_CONFIG", "param1")
-- 协程 "CONFIG_TASK":接收配置
local msg = sys.waitMsg("CONFIG_TASK", "GET_CONFIG", 5000)
if msg then
log.info("收到配置参数:", msg.arg2) -- 输出 "param1"
end
waitMsg 有如下几个特点:
(1)精准定向:可以通过 taskName
和 target
指定发送方与消息名称;
(2)参数传递:sendMsg 可以支持最多 3 个参数(通过 arg2
~arg4
字段);
(3)适用协程之间的通信(如 HTTP 请求响应、任务间数据同步)。
四、sys 系统消息
4.1 系统消息是什么
LuatOS 框架预定义了一些系统消息,开发者可以直接订阅这些消息实现对硬件和系统事件的响应。
系统消息是由 LuatOS 内核或底层驱动自动发布的全局事件,面向所有订阅者广播,适用于硬件状态、网络事件等。
LuatOS 的系统消息,跟普通的消息并没有区别,但是由于是 LuatOS 底层发布的消息,并没有指定明确的接收协程名称,所以系统消息只能是当做全局的广播消息来处理。
用户可以使用 sys.WaitUntil 和 sys.subscibe 接口来处理系统消息,不能用 susplus.waitMsg 接口处理系统消息。
以下按功能模块分类详细说明所有系统消息及其触发条件和参数:
4.2 系统消息详解
详细的系统消息的定义和解读,参见如下链接:
4.3 补充说明
1, 这些系统消息均为系统自动发布,不能由用户主动发布。
2, 可结合 LuatOS 库的 API(如 mobile.getCellInfo()
、mobile.scell()
)获取详细数据。
五、LuatOS 的定时器机制
LuatOS 定时器机制基于消息驱动,常用的软件定时器接口集中在 sys
库中。
定时器到时后,会由 sys 库接收到定时器超时消息,并触发注册的回调函数或唤醒挂起的任务。
下面详细介绍其用法和原理。
5.1 LuatOS 定时器的基本原理
定时器到时后,会产生一条定时器消息(如 `rtos.MSG_TIMER`),并携带定时器 ID 作为参数。
系统主循环sys.run() 会检测到该消息,并根据注册信息分发给对应的回调函数或唤醒等待的任务。
5.2 常用定时器 API 及用法
5.2.1 单次定时器
1,创建方式:
sys.timerStart(function_name, timeout_ms, ...)
-- function_name:到时后执行的函数
-- timeout_ms:延时毫秒数
-- ...:可选参数,传递给回调函数
2,代码示例:
local function tFunc(arg)
log.info("TIMER", "单次定时器触发", arg)
end
sys.timerStart(tFunc,200,"一次性参数")
5.2.2 循环定时器
1, 创建方式:
local timer_id = sys.timerLoopStart(function_name, interval_ms, ...)
-- 每隔 interval_ms 毫秒重复执行回调函数
-- 返回值 timer_id 可用于后续关闭定时器
2, 代码示例
-- 说明:每 5 秒触发一次
local function TFunc()
log.info("TIMER", "循环定时器触发")
end
local tid = sys.timerLoopStart(TFunc, 5000)
5.2.3 停止定时器
1, 创建方式:
(1)停止指定定时器:
sys.timerStop(timer_id)
(2)停止所有绑定到某个回调的定时器:
sys.timerStopAll(function_name)
2,代码示例
-- 5 秒后关闭循环定时器
local tid = sys.timerLoopStart(my_func, 1000)
sys.wait(5000)
sys.timerStop(tid)
5.2.4 在任务中延时/等待消息
1, 函数说明
(1)sys.wait(ms):在 task 协程中挂起指定毫秒数,底层用定时器实现
(2)sys.waitUntil("MSG_ID", timeout_ms):等待某个消息或超时
代码示例
local function waitFunc()
sys.wait(4000)
log.info("TIMER", "4秒后继续")
local result = sys.waitUntil("MSG_ID", 5000)
if result then
log.info("waitUntil", "收到MSG_ID消息")
else
log.info("waitUntil", "5秒超时,没有收到MSG_ID消息")
end
end
sys.taskInit(waitFunc)
local TASK_NAME = "tcp_client_main"
-- tcp client socket的任务处理函数
local function tcp_client_main_task_func()
while true do
result = sys.waitMsg(TASK_NAME, "TCP_CLIENT_MAIN_EVENT", 15000)
if result then
log.info("waitMsg", "收到TCP_CLIENT_MAIN_EVENT消息")
else
log.info("waitMsg", "15秒超时,没有收到TCP_CLIENT_MAIN_EVENT消息")
end
-- 5秒后跳转到循环体开始位置,继续执行
sys.wait(5000)
end
end
--创建并且启动一个task
--运行这个task的主函数tcp_client_main_task_func
sys.taskInitEx(tcp_client_main_task_func, TASK_NAME)
5.2.5 判断定时器状态
1,使用方式
sys.timerIsActive(timer_id):判断定时器是否激活
2, 代码示例
-- sys.timerLoopStart 创建了一个2秒循环一次的定时器,返回定时器ID。
-- 在一个任务中,等待5秒后,使用 sys.timerIsActive(timer_id) 检查定时器是否还在运行。
-- 如果定时器激活,则打印提示并停止定时器;否则说明定时器已经被停止或超时。
local function tFun()
log.info("TIMER", "循环定时器触发")
end
local timer_id = sys.timerLoopStart(tFun,2000)
-- 创建一个任务,5秒后检查定时器状态
local function chFunc()
sys.wait(5000) -- 等待5秒
if sys.timerIsActive(timer_id) then
log.info("TIMER", "定时器仍然激活,准备停止")
sys.timerStop(timer_id)
else
log.info("TIMER", "定时器已经不激活")
end
end
sys.taskInit(chFunc)
5.3 总结
1, sys.wait
sys.waitUntil
sys.waitMsg
只能在 task 协程中使用。
2, 定时器到时后,底层会向消息队列推送 rtos.MSG_TIMER
消息,并附带定时器 ID。
3, 系统主循环 sys.run() 检测到该消息后,会查找定时器池,将参数传递给注册的回调或唤醒挂起的协程
4, 定时器总结表
序号 | 功能 | API 示例 | 说明 |
1 | 单次定时器 | sys.timerStart(func, 2000, arg) | 2 秒后执行一次 |
2 | 循环定时器 | sys.timerLoopStart(func, 5000, arg) | 每 5 秒执行一次 |
3 | 停止定时器 | sys.timerStop(timer_id) | 停止指定定时器 |
4 | 停止所有定时器 | sys.timerStopAll(func) | 停止所有同回调定时器 |
5 | 延时 | sys.wait(1000) | 协程中延时 1 秒 |
6 | 等待消息 | sys.waitUntil("MSG_ID", 5000) | 等待消息或超时 |
7 | 等待消息 | sys.waitMsg("TASK_NAME", "MSG_ID", 5000) | 等待发送给"TASK_NAME"的消息或超时 |
8 | 判断状态 | sys.timerIsActive(timer_id) | 查询定时器是否激活 |
六、消息机制的使用流程示例
如下的示例,展示了如何订阅消息、发布消息和在任务中等待消息,帮助理解 LuatOS 消息机制的核心用法。
该示例完整展示了 LuatOS 消息机制的核心用法,建议在真实设备运行时结合串口日志观察执行流程。
实际项目中可根据需要调整消息类型和定时器间隔。
6.1 代码示例
PROJECT = "STRUCTURED_MSG_DEMO"
VERSION = "1.0.0"
-- 全局变量定义
local mainTaskTimerId -- 主任务循环定时器ID
local subTaskHandle -- 子任务句柄
-- 模块级函数声明
local function globalEventHandler(data)
log.info("全局处理器", "收到全局事件:", data)
end
local function loopTimerCallback()
log.info("定时器", "循环定时器触发")
end
local function stopLoopTimer(tid)
log.info("定时器", "停止循环定时器")
sys.timerStop(tid)
end
local function subTaskProcessor()
log.info("子任务", "启动")
while true do
local msg = sys.waitMsg("SUB_TASK", "DATA_CMD", -1)
if msg then
log.info("子任务", "收到数据", msg.arg2, msg.arg3)
sys.publish("GLOBAL_EVENT", "处理完成:"..msg.arg2)
end
end
end
local function mainTask()
-- 初始化循环定时器
mainTaskTimerId = sys.timerLoopStart(loopTimerCallback, 1000)
-- 设置5秒后停止定时器
sys.timerStart(stopLoopTimer, 5000, mainTaskTimerId)
-- 发送定向消息
for i = 1, 3 do
sys.sendMsg("SUB_TASK", "DATA_CMD", "数据包"..i, os.time())
sys.wait(1500)
end
-- 处理全局事件
while true do
local ok, data = sys.waitUntil("GLOBAL_EVENT", 3000)
if ok then
log.info("主任务", "收到全局回调:", data)
else
log.warn("主任务", "等待全局事件超时")
break
end
end
end
-- 初始化函数
local function init()
-- 注册全局事件监听
sys.subscribe("GLOBAL_EVENT", globalEventHandler)
-- 启动子任务
subTaskHandle = sys.taskInitEx(subTaskProcessor, "SUB_TASK")
-- 启动主任务
sys.taskInit(mainTask)
end
-- 系统启动
init()
sys.run()
6.2 代码的执行流程说明
1, 初始化阶段:
订阅消息"GLOBAL_EVENT"的处理函数globalEventHandler
启动一个名为"SUB_TASK"的任务,任务主函数为subTaskProcessor
启动一个任务,任务主函数为mainTask
2, 消息传递流程:
sequenceDiagram
mainTask->>subTaskHandle: sendMsg(DATA_CMD)
subTaskHandle->>GLOBAL_EVENT: publish(处理完成)
GLOBAL_EVENT->>mainTask: waitUntil接收
3, 定时器生命周期:
循环定时器mainTaskTimerId,每隔1秒触发一次,触发5次后,被另外一个5秒超时的单次定时器给关掉
另外一个5秒超时的单次定时器,把循环定时器mainTaskTimerId关掉后,自己也会关掉
6.3 执行日志
I/子任务 启动
I/定时器 循环定时器触发
I/主任务 收到全局回调: 处理完成:数据包1
I/定时器 循环定时器触发
I/子任务 收到数据 数据包2 1700000000
I/主任务 收到全局回调: 处理完成:数据包2
I/定时器 循环定时器触发
I/子任务 收到数据 数据包3 1700000001
I/主任务 收到全局回调: 处理完成:数据包3
I/定时器 停止循环定时器
I/主任务 等待全局事件超时
6.4 关键 API 对照表
功能 | API |
全局消息订阅 | sys.subscribe |
定时器管理 | sys.timerStart/Stop |
定向消息发送 | sys.sendMsg |
定向消息接收 | sys.waitMsg |
任务间延时 | sys.wait |
具名任务创建 | sys.taskInitEx |
七、总结
- 消息机制原理:基于消息队列和协程,发布者将消息放入队列,订阅者通过回调或协程等待并处理,支持异步事件驱动。
- 发送消息:
sys.publish()
广播消息,sys.sendMsg()
定向发送消息。 - 接收消息:
sys.subscribe()
订阅消息,sys.waitUntil()
和sys.waitMsg
()阻塞等待消息,sys.unsubscribe()
取消订阅。 - 系统消息:框架预定义多种系统事件消息,直接订阅即可响应硬件和系统事件。
- 应用场景:网络模块通信、传感器数据广播、定时器事件处理等。
通过合理使用 LuatOS 的消息机制,可以实现高效、解耦的物联网应用架构,支持复杂的事件驱动和多任务协作