6201-蓝牙升级教程文档
作者:孟伟 | 最后修改:2026-02-27
一、蓝牙 FOTA 概述
蓝牙 FOTA(Firmware Over The Air)是通过蓝牙低功耗(BLE)技术实现设备固件远程无线升级的功能。本教程基于 LuatOS 系统,演示了 Air6201 系列模块通过 BLE 进行固件升级的演示方案。
1.1 蓝牙 FOTA 有什么用?
蓝牙 FOTA 扩展功能为物联网设备提供无线固件升级解决方案,主要特点包括:
- 无线升级:无需有线连接,通过 BLE 无线通信实现固件更新
- 分段传输:支持大文件分片传输,适应 BLE 的数据包限制
- 安全可靠:支持固件完整性验证,确保升级过程安全
二、演示功能概述
Air6201 系列模块通过 BLE(蓝牙低功耗)进行固件远程升级(FOTA)的完整实现方案,支持两种升级方式:
-
文件写入方式:将接收到的升级包数据先保存到本地临时文件,完成数据接收后再执行升级
-
优点:适合完整固件升级,逻辑清晰,易于调试
- 缺点:需要额外的文件系统分区存储空间保存完整固件
- 适用场景:存储空间充足的设备。
-
分段写入方式:直接将接收到的升级包数据通过 fota.run()函数处理,无需保存到本地文件
-
优点:节省存储空间,适合差分升级
- 缺点:对内存要求较高,需要足够的内存缓冲区实时处理数据
- 适用场景:存储空间有限的设备
本 demo 适用场景:
- 智能硬件、可穿戴设备等蓝牙连接设备的固件升级
- 需要自定义升级流程的应用场景
- 无网络环境下的设备固件更新
三、准备硬件环境

- Air6201 核心板一块
-
TYPE-C USB 数据线一根,接线方式为:
-
TYPE-C USB 数据线直接插到核心板的 TYPE-C USB 座子,另一端连接电脑 USB 口
四、软件环境
在开始实践本示例之前,先筹备一下软件环境:
- 烧录工具: Luatools 工具;需要注意的是 luatools 工具版本必须为 3.1.10 及以上版本,否则制作的升级包没办法升级。
- 内核固件:Air6201 V2001 版本固件以上(25/12/17 日后的固件)
- LuatOS 需要的脚本和资源文件
脚本和资源文件:https://gitee.com/openLuat/LuatOS/tree/master/module/Air6201/demo/fota/ble_fota
准备好软件环境之后,接下来查看如何烧录项目文件到 Air6201 核心板,将本篇文章中演示使用的项目文件烧录到 Air6201 核心板中。
- Python 环境:Python 3 环境(用于运行 ble_fota_tool.py 发送升级包)
- 需要安装 bleak 库,可通过以下命令安装:
pip install bleak
五、API 接口说明
详细 API 文档请参考:
ble:https://docs.openluat.com/osapi/core/ble/
fota:https://docs.openluat.com/osapi/core/fota/
六、蓝牙 GATT 服务设计
6.1 服务与特征值定义
6.1.1 命令报文格式
开始升级命令
- 命令码:0x01
- 格式:
[命令码(1字节)] + [文件大小(4字节)] - 示例:
01 75 15 00 00表示开始升级,固件大小为 5493 字节 -
说明:
-
命令码固定为 0x01
- 文件大小为 4 字节小端序
- 设备收到此命令后,初始化 FOTA 子系统并准备接收数据
结束升级命令
- 命令码:0x02
- 格式:
[命令码(1字节)] - 示例:
02 -
说明:
-
命令码固定为 0x02
- 设备收到此命令后,验证固件完整性并执行升级
6.1.2 数据报文格式
- 格式:
[固件数据(n字节)] - 默认长度:200 字节(可通过 ble_fota_tool.py 配置调整,最大 256)
-
说明:
-
固件数据为二进制格式
- 每包数据大小不超过 BLE 数据包最大长度:Air6201 系列模组硬件支持最大 256 字节,考虑到稳定性建议保持默认的 200 字节
- 设备收到数据后,根据所选的 FOTA 方式进行处理:
- 文件方式:将数据写入临时文件
- 分段方式:直接通过 fota.run()处理数据
6.2 升级流程说明
用法:
- 先把脚本和固件烧录到 Air6201 模块中,并确认设备正常启动
- 模块启动后会自动开启 BLE 广播,广播名称为"Air6201_FOTA"
-
在电脑端操作:运行 ble_fota_tool.py 脚本连接设备并发送升级固件
-
注意:确保升级文件名为正确格式,并且与 ble_fota_tool.py 在同一目录下
- 观察日志输出确认升级进度
- 模块接收并验证固件成功后,会自动重启并应用新固件
BLE 通讯过程说明:
蓝牙 FOTA 升级通过 BLE 特征值进行命令控制和数据传输:
- 上位机通过 BLE 扫描并连接名为"Air6201_FOTA"的设备
- 上位机向命令特征值(F001)发送开始升级命令(0x01)和固件大小
- 设备初始化 FOTA 功能并准备接收数据
- 上位机向数据特征值(F002)分包发送固件数据
- 设备接收并保存数据到临时文件
- 上位机发送结束升级命令(0x02)
- 设备验证固件完整性并执行 FOTA 升级流程
- 升级成功后设备自动重启
6.3、蓝牙 FOTA 升级工具使用说明
6.3.1 工具功能
- 连接 Air6201 设备并发送升级包
- 显示升级进度和状态信息
- 支持自定义升级包文件名
6.3.2 使用方法
- 确保 Air6201 设备处于蓝牙广播状态,设备名称为"Air6201_FOTA"
- 在命令行中运行 ble_fota_tool.py 脚本,指定升级包文件名:
- bash
- python ble_fota_tool.py -f 升级包文件名
6.3.3 参数说明
-d或--device:指定目标设备名称(默认值为"Air6201_FOTA")-f或--firmware:指定固件文件路径(必需参数)
6.3.4 工作流程
当运行脚本后,它会自动执行以下步骤:
- 加载指定的固件文件
- 扫描并发现蓝牙设备
- 连接到目标设备
- 发送开始升级命令
- 发送固件数据到设备
- 发送结束升级命令
- 等待设备处理升级
- 断开连接并完成升级过程
七、代码示例介绍
下面将演示蓝牙 FOTA 的两种典型场景:文件方式升级和分包方式升级,用户可根据实际需求选择配置方式。
7.1 主要代码分析
7.1.1 文件系统方式 FOTA 升级业务逻辑模块(ble_file_fota.lua)
FOTA 业务逻辑模块,负责升级流程控制、命令处理和文件操作;:
--[[
-- 蓝牙FOTA升级功能(文件写入方式)
-- 提供通过蓝牙低功耗(BLE)接收升级包数据进行固件升级的功能
本文件为FOTA业务逻辑处理模块,核心业务逻辑为:
1. 处理接收到的BLE写入请求数据
2. 实现FOTA升级流程的控制
3. 管理升级状态和文件操作
本文件的对外接口有1个:
1. ble_file_fota.proc(service_uuid, char_uuid, data): 处理接收到的BLE写入请求数据
依赖模块:
- ble_main: 用于提供BLE服务和事件处理
]]
local ble_file_fota = {}
-- 升级状态管理
local upgrade_state = {
is_upgrading = false, -- 是否正在升级
total_size = 0, -- 总文件大小(字节)
received_size = 0, -- 已接收大小(字节)
upgrade_file = "/ble_fota.bin" -- 临时升级文件路径
}
-- 配置参数
local config = {
service_uuid = "F000", -- FOTA服务UUID(短格式)
char_uuid_cmd = "F001", -- 命令特征值UUID
char_uuid_data = "F002", -- 数据特征值UUID
max_packet_size = 200 -- BLE数据包最大长度(字节)
}
local function ble_reboot()
-- 完成FOTA流程并重启
fota.finish(true)
log.info("FOTA_CMD", "正在重启设备...")
rtos.reboot()
end
-- 处理FOTA命令
-- @param cmd_data 命令数据,格式:[命令码(1字节)] 或 [命令码(1字节) + 文件大小(4字节)]
local function handle_command(cmd_data)
log.info("FOTA_CMD", "收到命令数据:", cmd_data:toHex(), "长度:", #cmd_data)
-- 检查命令数据是否有效
if #cmd_data < 1 then
log.error("FOTA_CMD", "命令数据为空")
return
end
-- 解析命令码(第一个字节)
local cmd = cmd_data:byte(1)
log.info("FOTA_CMD", "解析命令码:", cmd, string.format("(0x%02X)", cmd))
-- 命令0x01:开始升级
if cmd == 0x01 then
log.info("FOTA_CMD", "处理开始升级命令")
-- 检查命令格式:需要至少5字节(1字节命令码 + 4字节文件大小)
if #cmd_data >= 5 then
-- 解析文件大小(小端序,从第2字节开始)
local total_size = string.unpack("<I4", cmd_data, 2)
log.info("FOTA_CMD", "文件总大小:", total_size, "字节")
-- 初始化FOTA子系统
log.info("FOTA_CMD", "初始化FOTA子系统...")
if fota.init() then
log.info("FOTA_CMD", "FOTA初始化成功")
-- 等待FOTA底层准备就绪
log.info("FOTA_CMD", "等待FOTA底层准备...")
-- 等待FOTA底层准备就绪,最多等待10秒
local wait_count = 0
local wait_ok = false
while wait_count < 100 do -- 最多轮询100次,每次100ms,共10秒
if fota.wait() then
wait_ok = true
break
end
sys.wait(100)
wait_count = wait_count + 1
end
if wait_ok then
log.info("FOTA_CMD", "FOTA底层准备就绪")
-- 删除旧的临时文件(如果存在)
if os.remove(upgrade_state.upgrade_file) then
log.info("FOTA_CMD", "已清理旧临时文件")
end
-- 更新升级状态
upgrade_state.is_upgrading = true
upgrade_state.total_size = total_size
upgrade_state.received_size = 0
log.info("FOTA_CMD", "升级状态已设置",
"总大小:", upgrade_state.total_size,
"临时文件:", upgrade_state.upgrade_file)
log.info("FOTA_CMD", "准备接收固件数据...")
else
log.error("FOTA_CMD", "FOTA底层准备超时")
fota.finish(false)
upgrade_state.is_upgrading = false
end
else
log.error("FOTA_CMD", "FOTA初始化失败")
end
else
log.error("FOTA_CMD", "开始命令格式错误,长度不足")
end
-- 命令0x02:结束升级
elseif cmd == 0x02 then
log.info("FOTA_CMD", "处理结束升级命令")
-- 检查是否处于升级状态
if not upgrade_state.is_upgrading then
log.warn("FOTA_CMD", "未处于升级状态,忽略结束命令")
return
end
-- 验证文件完整性
log.info("FOTA_CMD", "验证文件完整性...")
log.info("FOTA_CMD", "已接收:", upgrade_state.received_size, "字节")
log.info("FOTA_CMD", "应接收:", upgrade_state.total_size, "字节")
if upgrade_state.received_size == upgrade_state.total_size then
log.info("FOTA_CMD", "文件完整性验证通过")
-- 执行FOTA升级
log.info("FOTA_CMD", "开始执行FOTA升级...")
local result, isDone = fota.file(upgrade_state.upgrade_file)
log.info("FOTA_CMD", "FOTA升级结果:", "result:", result, "isDone:", isDone)
if result and isDone then
log.info("FOTA_CMD", " FOTA升级成功!")
-- 延迟重启,给用户一些反应时间
log.info("FOTA_CMD", "2秒后设备将自动重启...,重启后通过日志判断最终是否升级成功")
-- 延迟2秒后重启设备
sys.timerStart(ble_reboot, 2000)
else
log.error("FOTA_CMD", "FOTA升级失败")
end
else
log.error("FOTA_CMD", "文件不完整,升级失败")
end
-- 清理升级状态(无论成功还是失败)
log.info("FOTA_CMD", "清理升级状态...")
upgrade_state.is_upgrading = false
-- 删除临时文件
if upgrade_state.upgrade_file then
if os.remove(upgrade_state.upgrade_file) then
log.info("FOTA_CMD", "已删除临时文件")
else
log.warn("FOTA_CMD", "删除临时文件失败")
end
end
-- 结束FOTA流程
fota.finish(false)
log.info("FOTA_CMD", "升级流程结束")
else
log.warn("FOTA_CMD", "未知命令码:", cmd, string.format("(0x%02X)", cmd))
end
end
-- 处理FOTA数据
-- @param data 固件数据块
local function handle_data(data)
log.info("FOTA_DATA", "收到数据包,长度:", #data, "字节")
-- 检查是否处于升级状态
if not upgrade_state.is_upgrading then
log.warn("FOTA_DATA", "未处于升级状态,忽略数据")
return
end
-- 保存数据到临时文件
log.info("FOTA_DATA", "写入文件:", upgrade_state.upgrade_file)
local file = io.open(upgrade_state.upgrade_file, "ab")
if file then
-- 写入数据
file:write(data)
file:close()
-- 更新接收状态
upgrade_state.received_size = upgrade_state.received_size + #data
-- 计算并显示进度
local progress = math.floor((upgrade_state.received_size / upgrade_state.total_size) * 100)
-- 每50个数据包或完成时打印进度
if upgrade_state.received_size % (config.max_packet_size * 50) == 0 or
upgrade_state.received_size >= upgrade_state.total_size then
log.info("FOTA_DATA", "升级进度:", progress, "%",
"(", upgrade_state.received_size, "/", upgrade_state.total_size, ")")
end
log.info("FOTA_DATA", "数据写入成功,当前总计:", upgrade_state.received_size, "字节")
else
log.error("FOTA_DATA", "打开文件失败:", upgrade_state.upgrade_file)
-- 文件操作失败,终止升级
upgrade_state.is_upgrading = false
fota.finish(false)
end
end
-- 处理接收到的BLE写入请求数据
-- @param service_uuid 服务UUID
-- @param char_uuid 特征值UUID
-- @param data 写入的数据
function ble_file_fota.proc(service_uuid, char_uuid, data)
log.info("ble_file_fota", "处理写入数据", service_uuid, char_uuid, data:toHex())
-- 简化的UUID匹配逻辑:检查UUID是否包含我们的短UUID
local is_service_match = string.find(service_uuid:lower(), config.service_uuid:lower())
local is_cmd_match = string.find(char_uuid:lower(), config.char_uuid_cmd:lower())
local is_data_match = string.find(char_uuid:lower(), config.char_uuid_data:lower())
log.info("ble_file_fota", "UUID匹配结果:",
"服务匹配:", is_service_match,
"命令匹配:", is_cmd_match,
"数据匹配:", is_data_match)
if is_service_match then
if is_cmd_match then
-- 命令特征值:处理FOTA命令
log.info("ble_file_fota", "命令特征值匹配,处理命令")
handle_command(data)
elseif is_data_match then
-- 数据特征值:处理FOTA数据
log.info("ble_file_fota", "数据特征值匹配,处理数据")
handle_data(data)
else
log.warn("ble_file_fota", "未知的特征值UUID:", char_uuid)
end
else
log.warn("ble_file_fota", "未知的服务UUID:", service_uuid)
end
end
function ble_file_fota.proc_disconnect()
log.info("ble_file_fota", "处理连接断开事件")
-- 如果正在升级,连接断开则终止升级
if upgrade_state.is_upgrading then
log.error("ble_file_fota", "升级过程中连接断开,终止升级")
upgrade_state.is_upgrading = false
-- 删除临时文件
if upgrade_state.upgrade_file then
os.remove(upgrade_state.upgrade_file)
end
-- 结束FOTA流程
fota.finish(false)
end
end
return ble_file_fota
7.1.2 分包方式 FOTA 升级业务逻辑模块(ble_packet_fota.lua)
--[[
-- 蓝牙FOTA升级功能(分段写入方式)
-- 提供通过蓝牙低功耗(BLE)接收升级包数据进行固件升级的功能
本文件为FOTA业务逻辑处理模块,核心业务逻辑为:
1. 处理接收到的BLE写入请求数据
2. 实现FOTA升级流程的控制(分段写入方式)
3. 管理升级状态和分段数据操作
本文件的对外接口有1个:
1. ble_packet_fota.proc(service_uuid, char_uuid, data): 处理接收到的BLE写入请求数据
依赖模块:
- ble_main: 用于提供BLE服务和事件处理
]]
local ble_packet_fota = {}
-- 升级状态管理
local upgrade_state = {
is_upgrading = false, -- 是否正在升级
total_size = 0, -- 总文件大小(字节)
received_size = 0, -- 已接收大小(字节)
upgrade_packet = 0 -- 升级包计数器
}
-- 配置参数
local config = {
service_uuid = "F000", -- FOTA服务UUID(短格式)
char_uuid_cmd = "F001", -- 命令特征值UUID
char_uuid_data = "F002", -- 数据特征值UUID
max_packet_size = 200 -- BLE数据包最大长度(字节)
}
local function ble_reboot()
-- 完成FOTA流程并重启
fota.finish(true)
log.info("FOTA_CMD", "正在重启设备...")
rtos.reboot()
end
-- 处理FOTA命令
-- @param cmd_data 命令数据,格式:[命令码(1字节)] 或 [命令码(1字节) + 文件大小(4字节)]
local function handle_command(cmd_data)
log.info("FOTA_CMD", "收到命令数据:", cmd_data:toHex(), "长度:", #cmd_data)
-- 检查命令数据是否有效
if #cmd_data < 1 then
log.error("FOTA_CMD", "命令数据为空")
return
end
-- 解析命令码(第一个字节)
local cmd = cmd_data:byte(1)
log.info("FOTA_CMD", "解析命令码:", cmd, string.format("(0x%02X)", cmd))
-- 命令0x01:开始升级
if cmd == 0x01 then
log.info("FOTA_CMD", "处理开始升级命令")
-- 检查命令格式:需要至少5字节(1字节命令码 + 4字节文件大小)
if #cmd_data >= 5 then
-- 解析文件大小(小端序,从第2字节开始)
local total_size = string.unpack("<I4", cmd_data, 2)
log.info("FOTA_CMD", "文件总大小:", total_size, "字节")
-- 初始化FOTA子系统
log.info("FOTA_CMD", "初始化FOTA子系统...")
if fota.init() then
log.info("FOTA_CMD", "FOTA初始化成功")
-- 等待FOTA底层准备就绪
log.info("FOTA_CMD", "等待FOTA底层准备...")
-- 等待FOTA底层准备就绪,最多等待10秒
local wait_count = 0
local wait_ok = false
while wait_count < 100 do -- 最多轮询100次,每次100ms,共10秒
if fota.wait() then
wait_ok = true
break
end
sys.wait(100)
wait_count = wait_count + 1
end
if wait_ok then
log.info("FOTA_CMD", "FOTA底层准备就绪")
-- 更新升级状态
upgrade_state.is_upgrading = true
upgrade_state.total_size = total_size
upgrade_state.received_size = 0
upgrade_state.upgrade_packet = 0
log.info("FOTA_CMD", "升级状态已设置",
"总大小:", upgrade_state.total_size)
log.info("FOTA_CMD", "准备接收固件数据...")
else
log.error("FOTA_CMD", "FOTA底层准备超时")
fota.finish(false)
upgrade_state.is_upgrading = false
end
else
log.error("FOTA_CMD", "FOTA初始化失败")
end
else
log.error("FOTA_CMD", "开始命令格式错误,长度不足")
end
-- 命令0x02:结束升级(通知升级包发完)
elseif cmd == 0x02 then
log.info("FOTA_CMD", "处理结束升级命令")
-- 检查是否处于升级状态
if not upgrade_state.is_upgrading then
log.warn("FOTA_CMD", "未处于升级状态,忽略结束命令")
return
end
-- 验证文件完整性
log.info("FOTA_CMD", "验证文件完整性...")
log.info("FOTA_CMD", "已接收:", upgrade_state.received_size, "字节")
log.info("FOTA_CMD", "应接收:", upgrade_state.total_size, "字节")
if upgrade_state.received_size == upgrade_state.total_size then
log.info("FOTA_CMD", "文件完整性验证通过")
log.info("FOTA_CMD", "升级数据已全部接收,等待升级完成...")
-- 等待底层校验结束
local success = false
for i = 1, 30 do -- 最多等待3秒
sys.wait(100)
local succ, fotaDone = fota.isDone()
if not succ then
log.error("FOTA_CMD", "校验过程出错")
fota.finish(false)
upgrade_state.is_upgrading = false
break
end
if fotaDone then
log.info("FOTA_CMD", "FOTA升级成功!")
-- 延迟重启,给用户一些反应时间
log.info("FOTA_CMD", "2秒后设备将自动重启...,重启后通过日志判断最终是否升级成功")
-- 延迟2秒后重启设备
sys.timerStart(ble_reboot, 2000)
success = true
break
end
end
if not success then
log.error("FOTA_CMD", "校验超时")
fota.finish(false)
upgrade_state.is_upgrading = false
end
else
log.error("FOTA_CMD", "文件不完整,升级失败")
-- 清理升级状态
upgrade_state.is_upgrading = false
fota.finish(false)
end
log.info("FOTA_CMD", "结束升级命令处理完成")
else
log.warn("FOTA_CMD", "未知命令码:", cmd, string.format("(0x%02X)", cmd))
end
end
-- 处理FOTA数据
-- @param data 固件数据块
local function handle_data(data)
log.info("FOTA_DATA", "收到数据包,长度:", #data, "字节")
-- 检查是否处于升级状态
if not upgrade_state.is_upgrading then
log.warn("FOTA_DATA", "未处于升级状态,忽略数据")
return
end
-- 直接使用fota.run()处理分段数据,不写入文件
log.info("FOTA_DATA", "处理分段数据,包序号:", upgrade_state.upgrade_packet)
local result, isDone = fota.run(data)
log.info("FOTA_DATA", "分段写入结果:", "result:", result, "isDone:", isDone)
if result then
-- 更新接收状态
upgrade_state.received_size = upgrade_state.received_size + #data
upgrade_state.upgrade_packet = upgrade_state.upgrade_packet + 1
-- 计算并显示进度
local progress = math.floor((upgrade_state.received_size / upgrade_state.total_size) * 100)
-- 每50个数据包或完成时打印进度
if upgrade_state.received_size % (config.max_packet_size * 50) == 0 or
upgrade_state.received_size >= upgrade_state.total_size then
log.info("FOTA_DATA", "升级进度:", progress, "%",
"(", upgrade_state.received_size, "/", upgrade_state.total_size, ")")
end
log.info("FOTA_DATA", "数据写入成功,当前总计:", upgrade_state.received_size, "字节")
-- 如果所有数据都已接收,检查升级是否完成
if upgrade_state.received_size >= upgrade_state.total_size then
log.info("FOTA_DATA", "所有数据已接收,等待升级完成...")
end
else
log.error("FOTA_DATA", "分段写入失败")
-- 分段写入失败,终止升级
upgrade_state.is_upgrading = false
fota.finish(false)
end
end
-- 处理接收到的BLE写入请求数据
-- @param service_uuid 服务UUID
-- @param char_uuid 特征值UUID
-- @param data 写入的数据
function ble_packet_fota.proc(service_uuid, char_uuid, data)
log.info("ble_packet_fota", "处理写入数据", service_uuid, char_uuid, data:toHex())
-- 简化的UUID匹配逻辑:检查UUID是否包含我们的短UUID
local is_service_match = string.find(service_uuid:lower(), config.service_uuid:lower())
local is_cmd_match = string.find(char_uuid:lower(), config.char_uuid_cmd:lower())
local is_data_match = string.find(char_uuid:lower(), config.char_uuid_data:lower())
log.info("ble_packet_fota", "UUID匹配结果:",
"服务匹配:", is_service_match,
"命令匹配:", is_cmd_match,
"数据匹配:", is_data_match)
if is_service_match then
if is_cmd_match then
-- 命令特征值:处理FOTA命令
log.info("ble_packet_fota", "命令特征值匹配,处理命令")
handle_command(data)
elseif is_data_match then
-- 数据特征值:处理FOTA数据
log.info("ble_packet_fota", "数据特征值匹配,处理数据")
handle_data(data)
else
log.warn("ble_packet_fota", "未知的特征值UUID:", char_uuid)
end
else
log.warn("ble_packet_fota", "未知的服务UUID:", service_uuid)
end
end
-- 处理BLE连接断开事件
-- @return nil
function ble_packet_fota.proc_disconnect()
log.info("ble_packet_fota", "处理连接断开事件")
-- 如果正在升级,连接断开则终止升级
if upgrade_state.is_upgrading then
log.error("ble_packet_fota", "升级过程中连接断开,终止升级")
upgrade_state.is_upgrading = false
-- 结束FOTA流程
fota.finish(false)
end
end
return ble_packet_fota
7.1.2 BLE 服务主功能模块(ble_main.lua)
--[[
-- BLE服务主功能模块
-- 提供BLE服务的初始化、配置和事件处理
-- 不包含FOTA业务逻辑,仅处理BLE相关功能
依赖模块:
- ble_file_fota: 用于处理FOTA相关业务逻辑(文件写入方式)
- ble_packet_fota: 用于处理FOTA相关业务逻辑(分段写入方式)
]]
-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "packet" -- 默认使用file方式
-- 根据选择加载对应的FOTA模块
local ble_fota_main
if fota_mode == "file" then
ble_fota_main = require "ble_file_fota"
else
ble_fota_main = require "ble_packet_fota"
end
-- ble_main的任务名
local TASK_NAME = "BLE_MAIN"
-- 配置参数
config = {
device_name = "Air6201_FOTA", -- 设备广播名称
service_uuid = "F000", -- FOTA服务UUID(短格式)
char_uuid_cmd = "F001", -- 命令特征值UUID
char_uuid_data = "F002", -- 数据特征值UUID
max_packet_size = 20 -- BLE数据包最大长度(字节)
}
local bluetooth_device = nil
local ble_device = nil
local adv_create = nil
local gatt_create = nil
-- GATT服务数据库定义
-- 这里定义了BLE设备提供的服务和特征值
local att_db = {
string.fromHex(config.service_uuid), -- Service UUID
{
string.fromHex(config.char_uuid_cmd),
ble.WRITE | ble.WRITE_CMD
},
{
string.fromHex(config.char_uuid_data),
ble.WRITE | ble.WRITE_CMD
}
}
-- BLE事件回调函数
local function ble_event_cb(ble_dev, event, param)
log.info("BLE_EVENT", "收到BLE事件:", event)
-- 根据LuatOS BLE事件枚举处理不同事件
if event == ble.EVENT_CONN then
-- 连接成功事件
log.info("BLE_EVENT", "设备已连接", "地址:", param.addr and param.addr:toHex() or "未知")
sys.sendMsg(TASK_NAME, "BLE_EVENT", "CONNECT", param)
elseif event == ble.EVENT_DISCONN then
-- 连接断开事件
log.info("BLE_EVENT", "设备已断开连接", "原因:", param.reason or "未知")
sys.sendMsg(TASK_NAME, "BLE_EVENT", "DISCONNECTED", param.reason)
elseif event == ble.EVENT_WRITE then
-- 写入事件 - 这是关键事件!
log.info("BLE_EVENT", "处理写入事件")
-- 检查参数是否完整
if not param or not param.uuid_service or not param.uuid_characteristic or not param.data then
log.error("BLE_EVENT", "写入事件参数不完整")
return
end
-- 获取服务UUID和特征值UUID
local service_uuid = param.uuid_service:toHex()
local char_uuid = param.uuid_characteristic:toHex()
local data = param.data
log.info("BLE_EVENT", "服务UUID:", service_uuid)
log.info("BLE_EVENT", "特征值UUID:", char_uuid)
log.info("BLE_EVENT", "数据长度:", #data, "字节")
sys.sendMsg(TASK_NAME, "BLE_EVENT", "WRITE_REQ", param)
elseif event == ble.EVENT_READ then
-- 读取事件 - 外围设备收到主设备读请求
log.info("BLE_EVENT", "处理读取事件")
elseif event == ble.EVENT_READ_VALUE then
-- 读取操作完成事件 - 中心设备读取特征值完成
log.info("BLE_EVENT", "读取操作完成", "数据:", param.data and param.data:toHex() or "无数据")
elseif event == ble.EVENT_SCAN_REPORT then
-- 扫描报告事件 - 中心设备扫描到其他BLE设备
log.info("BLE_EVENT", "扫描报告", "RSSI:", param.rssi, "地址:", param.adv_addr and param.adv_addr:toHex() or "未知")
elseif event == ble.EVENT_SCAN_STOP then
-- 扫描停止事件
log.info("BLE_EVENT", "扫描停止")
else
-- 其他事件
log.info("BLE_EVENT", "其他事件类型:", event)
if param then
-- 尝试打印参数的基本信息,避免直接打印table导致错误
if type(param) == "table" then
log.info("BLE_EVENT", "事件参数为table,包含字段:", #param)
for k, v in pairs(param) do
if type(v) == "string" then
log.info("BLE_EVENT", "参数字段:", k, "值:", v:toHex())
else
log.info("BLE_EVENT", "参数字段:", k, "类型:", type(v))
end
end
else
log.info("BLE_EVENT", "事件参数类型:", type(param))
end
end
end
end
-- 初始化BLE功能
local function ble_init()
log.info("BLE_INIT", "开始初始化BLE...")
-- 初始化蓝牙核心
bluetooth_device = bluetooth.init()
if not bluetooth_device then
log.error("BLE_INIT", "蓝牙核心初始化失败")
return false
end
log.info("BLE_INIT", "蓝牙核心初始化成功")
-- 初始化BLE功能
ble_device = bluetooth_device:ble(ble_event_cb)
if not ble_device then
log.error("BLE_INIT", "BLE功能初始化失败")
return false
end
log.info("BLE_INIT", "BLE功能初始化成功")
-- 创建GATT服务
gatt_create = ble_device:gatt_create(att_db)
if not gatt_create then
log.error("BLE_INIT", "GATT服务创建失败")
return false
end
log.info("BLE_INIT", "GATT服务创建成功")
-- 配置广播数据
log.info("BLE_INIT", "配置广播数据...")
adv_create = ble_device:adv_create({
addr_mode = ble.PUBLIC,
channel_map = ble.CHNLS_ALL,
intv_min = 120,
intv_max = 120,
adv_data = {
{ ble.FLAGS, string.char(0x06) }, -- BLE标志
{ ble.COMPLETE_LOCAL_NAME, config.device_name }, -- 设备名称
}
})
if not adv_create then
log.error("BLE_INIT", "广播配置失败")
return false
end
log.info("BLE_INIT", "广播配置成功")
-- 开始广播
ble_device:adv_start()
log.info("BLE_INIT", "BLE广播已启动,设备名称:", config.device_name)
return true
end
-- 主任务处理函数
local function ble_main_task_func()
local result, msg
while true do
result = ble_init()
if not result then
log.error("ble_main_task_func", "ble_init error")
goto EXCEPTION_PROC
end
while true do
msg = sys.waitMsg(TASK_NAME, "BLE_EVENT")
if not msg then
log.error("ble_main_task_func", "waitMsg timeout")
goto EXCEPTION_PROC
end
if msg[2] == "CONNECT" then
local conn_param = msg[3]
log.info("BLE", "设备连接成功: " .. conn_param.addr:toHex())
elseif msg[2] == "DISCONNECTED" then
log.info("BLE", "设备断开连接,原因: " .. msg[3])
-- 通知FOTA模块连接断开
ble_fota_main.proc_disconnect()
break
-- 收到中心设备的写请求,将写的数据发给ble_fota_main模块处理
elseif msg[2] == "WRITE_REQ" then
local ble_param = msg[3]
local service_uuid = ble_param.uuid_service:toHex()
local char_uuid = ble_param.uuid_characteristic:toHex()
local data = ble_param.data
log.info("BLE", "收到写请求: " .. service_uuid .. " " .. char_uuid .. " " .. data:toHex())
ble_fota_main.proc(service_uuid, char_uuid, data)
end
end
::EXCEPTION_PROC::
log.error("ble_main_task_func", "异常退出, 5秒后重新开启广播")
-- 停止广播
if ble_device then
ble_device:adv_stop()
end
-- 清空此task绑定的消息队列中的未处理的消息
sys.cleanMsg(TASK_NAME)
-- 5秒后跳转到循环体开始位置,自动重试
sys.wait(5000)
end
end
-- 启动主任务
sys.taskInitEx(ble_main_task_func, TASK_NAME)
7.1.3 Python 升级工具(ble_fota_tool.py)
#!/usr/bin/env python3
import asyncio
import struct
import time
from bleak import BleakScanner, BleakClient
# 完整UUID定义
FOTA_SERVICE_UUID = "0000f000-0000-1000-8000-00805f9b34fb" # 完整服务UUID
FOTA_CMD_CHAR_UUID = "0000f001-0000-1000-8000-00805f9b34fb" # 完整命令特征UUID
FOTA_DATA_CHAR_UUID = "0000f002-0000-1000-8000-00805f9b34fb" # 完整数据特征UUID
# Command definitions
CMD_START_UPGRADE = 0x01
CMD_END_UPGRADE = 0x02
# 每包数据大小
MAX_PACKET_SIZE = 200
class SimpleFotaTool:
def __init__(self, device_name, firmware_path):
self.device_name = device_name
self.firmware_path = firmware_path
self.client = None
self.firmware_data = None
self.total_size = 0
self.target_device = None
async def load_firmware(self):
"""Load firmware file into memory"""
try:
with open(self.firmware_path, 'rb') as f:
self.firmware_data = f.read()
self.total_size = len(self.firmware_data)
print(f" 固件加载完成,大小: {self.total_size} 字节")
return True
except Exception as e:
print(f" 加载固件失败: {e}")
return False
async def scan_device(self):
"""Scan for the target device"""
print("\n2. 扫描目标设备...")
print(" 正在扫描,请等待...")
try:
devices = await BleakScanner.discover(timeout=10.0)
found_devices = []
for device in devices:
if device.name and self.device_name in device.name:
found_devices.append(device)
print(f" 找到匹配设备: {device.name} (地址: {device.address})")
if not found_devices:
print(f" 未找到设备: {self.device_name}")
return None
# 选择第一个匹配的设备
self.target_device = found_devices[0]
print(f" 选择设备: {self.target_device.name} (地址: {self.target_device.address})")
return self.target_device
except Exception as e:
print(f" 扫描失败: {e}")
return None
async def connect_device(self, device):
"""Connect to the target device"""
print("\n3. 建立BLE连接...")
try:
self.client = BleakClient(device.address)
await self.client.connect(timeout=30.0)
print(f" 连接成功,状态: {self.client.is_connected}")
# 调试:打印所有服务和特征值
print("\n4. 发现服务和特征值...")
# 兼容不同版本的Bleak库
try:
# 新版本Bleak
services = self.client.services
except AttributeError:
# 旧版本Bleak
services = await self.client.get_services()
fota_service_found = False
for service in services:
if service.uuid.lower() == FOTA_SERVICE_UUID.lower():
fota_service_found = True
print(f" 找到FOTA服务: {service.uuid}")
for char in service.characteristics:
print(f" 特征值: {char.uuid} - 属性: {char.properties}")
if char.uuid.lower() == FOTA_CMD_CHAR_UUID.lower():
print(f" -> 命令特征值 (可写)")
elif char.uuid.lower() == FOTA_DATA_CHAR_UUID.lower():
print(f" -> 数据特征值 (可写)")
if not fota_service_found:
print(" 警告: 未找到FOTA服务,但继续尝试...")
return True
except Exception as e:
print(f" 连接失败: {e}")
return False
async def write_characteristic(self, uuid, data):
"""写入特征值"""
try:
await self.client.write_gatt_char(uuid, data, response=True)
# 正确提取短UUID(从完整UUID中提取f001/f002部分)
# 完整UUID格式: "0000f001-0000-1000-8000-00805f9b34fb"
# 我们想要提取 "f001" 部分
short_uuid = uuid.split('-')[0][-4:]
print(f" 写入特征值 {short_uuid},数据长度: {len(data)} 字节")
return True
except Exception as e:
print(f" 写入特征值失败: {e}")
return False
async def send_start_command(self):
"""发送开始升级命令"""
print("\n5. 发送开始升级命令...")
# 连接成功后短暂延时
print(" 连接成功,等待1秒...")
await asyncio.sleep(1)
# 发送开始升级命令
start_cmd = struct.pack("<BI", CMD_START_UPGRADE, self.total_size)
if not await self.write_characteristic(FOTA_CMD_CHAR_UUID, start_cmd):
return False
print(" 开始命令发送完成")
await asyncio.sleep(1) # 等待设备准备
return True
async def send_firmware_data(self):
"""Send firmware data in chunks with optimized delay"""
print("\n6. 分块传输固件数据...")
sent_bytes = 0
start_time = time.time()
packet_count = 0
# 优化延时:减少到100ms以提高速度
PACKET_DELAY = 0.1
while sent_bytes < self.total_size:
chunk_size = min(MAX_PACKET_SIZE, self.total_size - sent_bytes)
chunk = self.firmware_data[sent_bytes:sent_bytes + chunk_size]
if not await self.write_characteristic(FOTA_DATA_CHAR_UUID, chunk):
return False
sent_bytes += chunk_size
packet_count += 1
# 短暂延时,避免数据丢失
await asyncio.sleep(PACKET_DELAY)
# 每20个数据包显示一次进度
if packet_count % 20 == 0 or sent_bytes >= self.total_size:
progress = (sent_bytes / self.total_size) * 100
elapsed = time.time() - start_time
speed = sent_bytes / elapsed / 1024 if elapsed > 0 else 0
remaining_time = (self.total_size - sent_bytes) / (sent_bytes / elapsed) if sent_bytes > 0 else 0
print(f" 进度: {progress:.1f}% - {speed:.1f} KB/s - 已发送 {packet_count} 包 - 预计剩余: {remaining_time:.1f}s")
total_time = time.time() - start_time
avg_speed = self.total_size / total_time / 1024
print(f" 数据传输完成! 总时间: {total_time:.1f}s, 平均速度: {avg_speed:.1f} KB/s")
return True
async def end_upgrade(self):
"""Send end upgrade command"""
print("\n7. 发送结束升级命令...")
end_cmd = struct.pack("B", CMD_END_UPGRADE)
if not await self.write_characteristic(FOTA_CMD_CHAR_UUID, end_cmd):
return False
print(" 结束命令发送完成")
# 等待设备处理
print("\n8. 等待设备处理升级...")
await asyncio.sleep(5) # 给设备足够时间处理
return True
async def run(self):
"""Main execution flow"""
# 1. 加载固件文件
print("\n1. 加载固件文件...")
if not await self.load_firmware():
return False
# 2. 扫描目标设备
device = await self.scan_device()
if not device:
return False
# 3. 连接设备
if not await self.connect_device(device):
return False
try:
# 4. 发送开始命令
if not await self.send_start_command():
return False
# 5. 发送固件数据
if not await self.send_firmware_data():
return False
# 6. 结束升级
if not await self.end_upgrade():
return False
print("\n" + "="*50)
print("升级流程完成! 设备应该正在重启...")
print("="*50)
return True
except Exception as e:
print(f" 升级过程中出现错误: {e}")
return False
finally:
# 断开连接
if self.client and self.client.is_connected:
await self.client.disconnect()
print(" 已断开连接")
async def main():
import argparse
parser = argparse.ArgumentParser(description="蓝牙FOTA升级工具")
parser.add_argument("-f", "--firmware", required=True, help="固件文件路径")
parser.add_argument("-d", "--device", default="Air6201_FOTA", help="设备名称")
args = parser.parse_args()
tool = SimpleFotaTool(args.device, args.firmware)
success = await tool.run()
return success
if __name__ == "__main__":
success = asyncio.run(main())
exit(0 if success else 1)
7.2 升级包制作
升级包制作是 FOTA 功能的关键步骤,确保按照升级包制作流程操作。
7.3 功能验证
7.3.1 文件系统方式
代码中修改 fota_mode 参数为"file"方式:
-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "file" -- 默认使用file方式
模组 luatools 日志如下:
日志解读:
- 设备启动,蓝牙服务初始化成功
- 收到开始升级命令,初始化 FOTA 分区和缓冲区
- 开始分段接收升级数据保存到文件中
- 所有数据接收完成,验证固件完整性
- 运行升级流程,升级完成,设备重启
- 重启后新版本正常运行,确认升级成功





上位机工具日志:
进入到 ble_fota 目录下运行 cmd 命令进入命令行程序,然后等待设备运行并启动蓝牙广播后运行下面命令:
python ble_fota_tool.py -f ble_fota.bin


7.3.2 使用分包方式升级:
代码中修改 fota_mode 参数为"packet"方式:
-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "packet" -- 默认使用file方式
模组 luatools 日志如下:
日志解读:
- 设备启动,蓝牙服务初始化成功
- 收到开始升级命令,初始化 FOTA 分区和缓冲区
- 开始分段接收升级数据并通过 fota.run 接口写入 fota 分区
- 所有数据接收完成,验证固件完整性
- 运行升级流程,升级完成,设备重启
- 重启后新版本正常运行,确认升级成功





上位机工具日志:
进入到 ble_fota 目录下运行 cmd 命令进入命令行程序,然后等待设备运行并启动蓝牙广播后运行下面命令:
python ble_fota_tool.py -f ble_fota.bin


八、常见问题与解决方案
8.1 蓝牙连接失败
- 问题:Python 脚本无法找到或连接设备
-
解决方案:
-
确认设备已上电并正常启动
- 检查设备名称是否为"Air6201_FOTA"
- 确认电脑蓝牙功能已开启
8.2 数据传输中断
- 问题:升级过程中数据传输中断
-
解决方案:
-
确保设备与电脑之间没有障碍物
- 避免其他蓝牙设备的干扰
- 检查设备电量是否充足
- 重新运行升级工具
8.3 升级后版本未更新
- 问题:升级完成后版本号未变化
-
解决方案:
-
检查升级包制作是否正确
- 确认 VERSION 变量已修改
- 查看设备日志确认升级是否成功
- 尝试完全断电后重新上电
8.4 Python 环境问题
- 问题:Python 脚本无法运行
-
解决方案:
-
确认已安装 Python 3.x
- 安装必要的库:
pip install bleak - 检查脚本文件路径是否正确
- 确保有足够的权限运行脚本
九、总结
至此,本教程详细介绍了如何使用蓝牙 FOTA 功能实现 Air6201 设备的无线固件升级。蓝牙 FOTA 提供了一种便捷、低功耗的固件更新方案,特别适合需要远程维护的物联网设备。