跳转至

6201-蓝牙升级教程文档

作者:孟伟 | 最后修改:2026-02-27

一、蓝牙 FOTA 概述

蓝牙 FOTA(Firmware Over The Air)是通过蓝牙低功耗(BLE)技术实现设备固件远程无线升级的功能。本教程基于 LuatOS 系统,演示了 Air6201 系列模块通过 BLE 进行固件升级的演示方案。

1.1 蓝牙 FOTA 有什么用?

蓝牙 FOTA 扩展功能为物联网设备提供无线固件升级解决方案,主要特点包括:

  • 无线升级:无需有线连接,通过 BLE 无线通信实现固件更新
  • 分段传输:支持大文件分片传输,适应 BLE 的数据包限制
  • 安全可靠:支持固件完整性验证,确保升级过程安全

二、演示功能概述

Air6201 系列模块通过 BLE(蓝牙低功耗)进行固件远程升级(FOTA)的完整实现方案,支持两种升级方式:

  1. 文件写入方式:将接收到的升级包数据先保存到本地临时文件,完成数据接收后再执行升级

  2. 优点:适合完整固件升级,逻辑清晰,易于调试

  3. 缺点:需要额外的文件系统分区存储空间保存完整固件
  4. 适用场景:存储空间充足的设备。
  5. 分段写入方式:直接将接收到的升级包数据通过 fota.run()函数处理,无需保存到本地文件

  6. 优点:节省存储空间,适合差分升级

  7. 缺点:对内存要求较高,需要足够的内存缓冲区实时处理数据
  8. 适用场景:存储空间有限的设备

本 demo 适用场景:

  • 智能硬件、可穿戴设备等蓝牙连接设备的固件升级
  • 需要自定义升级流程的应用场景
  • 无网络环境下的设备固件更新

三、准备硬件环境

  1. Air6201 核心板一块
  2. TYPE-C USB 数据线一根,接线方式为:

  3. TYPE-C USB 数据线直接插到核心板的 TYPE-C USB 座子,另一端连接电脑 USB 口

四、软件环境

在开始实践本示例之前,先筹备一下软件环境:

  1. 烧录工具: Luatools 工具;需要注意的是 luatools 工具版本必须为 3.1.10 及以上版本,否则制作的升级包没办法升级。
  2. 内核固件:Air6201 V2001 版本固件以上(25/12/17 日后的固件)
  3. LuatOS 需要的脚本和资源文件

脚本和资源文件:https://gitee.com/openLuat/LuatOS/tree/master/module/Air6201/demo/fota/ble_fota

准备好软件环境之后,接下来查看如何烧录项目文件到 Air6201 核心板,将本篇文章中演示使用的项目文件烧录到 Air6201 核心板中。

  1. Python 环境:Python 3 环境(用于运行 ble_fota_tool.py 发送升级包)
  2. 需要安装 bleak 库,可通过以下命令安装:
pip install bleak

五、API 接口说明

详细 API 文档请参考:

ble:https://docs.openluat.com/osapi/core/ble/

fota:https://docs.openluat.com/osapi/core/fota/

六、蓝牙 GATT 服务设计

6.1 服务与特征值定义

6.1.1 命令报文格式

开始升级命令

  • 命令码:0x01
  • 格式:[命令码(1字节)] + [文件大小(4字节)]
  • 示例:01 75 15 00 00 表示开始升级,固件大小为 5493 字节
  • 说明:

  • 命令码固定为 0x01

  • 文件大小为 4 字节小端序
  • 设备收到此命令后,初始化 FOTA 子系统并准备接收数据

结束升级命令

  • 命令码:0x02
  • 格式:[命令码(1字节)]
  • 示例:02
  • 说明:

  • 命令码固定为 0x02

  • 设备收到此命令后,验证固件完整性并执行升级

6.1.2 数据报文格式

  • 格式:[固件数据(n字节)]
  • 默认长度:200 字节(可通过 ble_fota_tool.py 配置调整,最大 256)
  • 说明:

  • 固件数据为二进制格式

  • 每包数据大小不超过 BLE 数据包最大长度:Air6201 系列模组硬件支持最大 256 字节,考虑到稳定性建议保持默认的 200 字节
  • 设备收到数据后,根据所选的 FOTA 方式进行处理:
    • 文件方式:将数据写入临时文件
    • 分段方式:直接通过 fota.run()处理数据

6.2 升级流程说明

用法:

  1. 先把脚本和固件烧录到 Air6201 模块中,并确认设备正常启动
  2. 模块启动后会自动开启 BLE 广播,广播名称为"Air6201_FOTA"
  3. 在电脑端操作:运行 ble_fota_tool.py 脚本连接设备并发送升级固件

  4. 注意:确保升级文件名为正确格式,并且与 ble_fota_tool.py 在同一目录下

  5. 观察日志输出确认升级进度
  6. 模块接收并验证固件成功后,会自动重启并应用新固件

BLE 通讯过程说明:

蓝牙 FOTA 升级通过 BLE 特征值进行命令控制和数据传输:

  1. 上位机通过 BLE 扫描并连接名为"Air6201_FOTA"的设备
  2. 上位机向命令特征值(F001)发送开始升级命令(0x01)和固件大小
  3. 设备初始化 FOTA 功能并准备接收数据
  4. 上位机向数据特征值(F002)分包发送固件数据
  5. 设备接收并保存数据到临时文件
  6. 上位机发送结束升级命令(0x02)
  7. 设备验证固件完整性并执行 FOTA 升级流程
  8. 升级成功后设备自动重启

6.3、蓝牙 FOTA 升级工具使用说明

6.3.1 工具功能

  • 连接 Air6201 设备并发送升级包
  • 显示升级进度和状态信息
  • 支持自定义升级包文件名

6.3.2 使用方法

  1. 确保 Air6201 设备处于蓝牙广播状态,设备名称为"Air6201_FOTA"
  2. 在命令行中运行 ble_fota_tool.py 脚本,指定升级包文件名:
  3. bash
  4. python ble_fota_tool.py -f 升级包文件名

6.3.3 参数说明

  • -d--device:指定目标设备名称(默认值为"Air6201_FOTA")
  • -f--firmware:指定固件文件路径(必需参数)

6.3.4 工作流程

当运行脚本后,它会自动执行以下步骤:

  1. 加载指定的固件文件
  2. 扫描并发现蓝牙设备
  3. 连接到目标设备
  4. 发送开始升级命令
  5. 发送固件数据到设备
  6. 发送结束升级命令
  7. 等待设备处理升级
  8. 断开连接并完成升级过程

七、代码示例介绍

下面将演示蓝牙 FOTA 的两种典型场景:文件方式升级和分包方式升级,用户可根据实际需求选择配置方式。

7.1 主要代码分析

7.1.1 文件系统方式 FOTA 升级业务逻辑模块(ble_file_fota.lua)

FOTA 业务逻辑模块,负责升级流程控制、命令处理和文件操作;:

--[[
-- 蓝牙FOTA升级功能(文件写入方式)
-- 提供通过蓝牙低功耗(BLE)接收升级包数据进行固件升级的功能

本文件为FOTA业务逻辑处理模块,核心业务逻辑为:
1. 处理接收到的BLE写入请求数据
2. 实现FOTA升级流程的控制
3. 管理升级状态和文件操作

本文件的对外接口有1个:
1. ble_file_fota.proc(service_uuid, char_uuid, data): 处理接收到的BLE写入请求数据

依赖模块:
- ble_main: 用于提供BLE服务和事件处理
]]

local ble_file_fota = {}

-- 升级状态管理
local upgrade_state = {
    is_upgrading = false,          -- 是否正在升级
    total_size = 0,                -- 总文件大小(字节)
    received_size = 0,             -- 已接收大小(字节)
    upgrade_file = "/ble_fota.bin" -- 临时升级文件路径
}

-- 配置参数
local config = {
    service_uuid = "F000",   -- FOTA服务UUID(短格式)
    char_uuid_cmd = "F001",  -- 命令特征值UUID
    char_uuid_data = "F002", -- 数据特征值UUID
    max_packet_size = 200    -- BLE数据包最大长度(字节)
}
local function ble_reboot()
    -- 完成FOTA流程并重启
    fota.finish(true)
    log.info("FOTA_CMD", "正在重启设备...")
    rtos.reboot()
end
-- 处理FOTA命令
-- @param cmd_data 命令数据,格式:[命令码(1字节)] 或 [命令码(1字节) + 文件大小(4字节)]
local function handle_command(cmd_data)
    log.info("FOTA_CMD", "收到命令数据:", cmd_data:toHex(), "长度:", #cmd_data)

    -- 检查命令数据是否有效
    if #cmd_data < 1 then
        log.error("FOTA_CMD", "命令数据为空")
        return
    end

    -- 解析命令码(第一个字节)
    local cmd = cmd_data:byte(1)
    log.info("FOTA_CMD", "解析命令码:", cmd, string.format("(0x%02X)", cmd))

    -- 命令0x01:开始升级
    if cmd == 0x01 then
        log.info("FOTA_CMD", "处理开始升级命令")

        -- 检查命令格式:需要至少5字节(1字节命令码 + 4字节文件大小)
        if #cmd_data >= 5 then
            -- 解析文件大小(小端序,从第2字节开始)
            local total_size = string.unpack("<I4", cmd_data, 2)
            log.info("FOTA_CMD", "文件总大小:", total_size, "字节")

            -- 初始化FOTA子系统
            log.info("FOTA_CMD", "初始化FOTA子系统...")
            if fota.init() then
                log.info("FOTA_CMD", "FOTA初始化成功")

                -- 等待FOTA底层准备就绪
                log.info("FOTA_CMD", "等待FOTA底层准备...")
                -- 等待FOTA底层准备就绪,最多等待10秒
                local wait_count = 0
                local wait_ok = false
                while wait_count < 100 do -- 最多轮询100次,每次100ms,共10秒
                    if fota.wait() then
                        wait_ok = true
                        break
                    end
                    sys.wait(100)
                    wait_count = wait_count + 1
                end

                if wait_ok then
                    log.info("FOTA_CMD", "FOTA底层准备就绪")

                    -- 删除旧的临时文件(如果存在)
                    if os.remove(upgrade_state.upgrade_file) then
                        log.info("FOTA_CMD", "已清理旧临时文件")
                    end

                    -- 更新升级状态
                    upgrade_state.is_upgrading = true
                    upgrade_state.total_size = total_size
                    upgrade_state.received_size = 0

                    log.info("FOTA_CMD", "升级状态已设置",
                        "总大小:", upgrade_state.total_size,
                        "临时文件:", upgrade_state.upgrade_file)
                    log.info("FOTA_CMD", "准备接收固件数据...")
                else
                    log.error("FOTA_CMD", "FOTA底层准备超时")
                    fota.finish(false)
                    upgrade_state.is_upgrading = false
                end
            else
                log.error("FOTA_CMD", "FOTA初始化失败")
            end
        else
            log.error("FOTA_CMD", "开始命令格式错误,长度不足")
        end

        -- 命令0x02:结束升级
    elseif cmd == 0x02 then
        log.info("FOTA_CMD", "处理结束升级命令")

        -- 检查是否处于升级状态
        if not upgrade_state.is_upgrading then
            log.warn("FOTA_CMD", "未处于升级状态,忽略结束命令")
            return
        end

        -- 验证文件完整性
        log.info("FOTA_CMD", "验证文件完整性...")
        log.info("FOTA_CMD", "已接收:", upgrade_state.received_size, "字节")
        log.info("FOTA_CMD", "应接收:", upgrade_state.total_size, "字节")

        if upgrade_state.received_size == upgrade_state.total_size then
            log.info("FOTA_CMD", "文件完整性验证通过")

            -- 执行FOTA升级
            log.info("FOTA_CMD", "开始执行FOTA升级...")
            local result, isDone = fota.file(upgrade_state.upgrade_file)
            log.info("FOTA_CMD", "FOTA升级结果:", "result:", result, "isDone:", isDone)

            if result and isDone then
                log.info("FOTA_CMD", " FOTA升级成功!")

                -- 延迟重启,给用户一些反应时间
                log.info("FOTA_CMD", "2秒后设备将自动重启...,重启后通过日志判断最终是否升级成功")

                -- 延迟2秒后重启设备
                sys.timerStart(ble_reboot, 2000)
            else
                log.error("FOTA_CMD", "FOTA升级失败")
            end
        else
            log.error("FOTA_CMD", "文件不完整,升级失败")
        end

        -- 清理升级状态(无论成功还是失败)
        log.info("FOTA_CMD", "清理升级状态...")
        upgrade_state.is_upgrading = false

        -- 删除临时文件
        if upgrade_state.upgrade_file then
            if os.remove(upgrade_state.upgrade_file) then
                log.info("FOTA_CMD", "已删除临时文件")
            else
                log.warn("FOTA_CMD", "删除临时文件失败")
            end
        end

        -- 结束FOTA流程
        fota.finish(false)
        log.info("FOTA_CMD", "升级流程结束")
    else
        log.warn("FOTA_CMD", "未知命令码:", cmd, string.format("(0x%02X)", cmd))
    end
end

-- 处理FOTA数据
-- @param data 固件数据块
local function handle_data(data)
    log.info("FOTA_DATA", "收到数据包,长度:", #data, "字节")

    -- 检查是否处于升级状态
    if not upgrade_state.is_upgrading then
        log.warn("FOTA_DATA", "未处于升级状态,忽略数据")
        return
    end

    -- 保存数据到临时文件
    log.info("FOTA_DATA", "写入文件:", upgrade_state.upgrade_file)
    local file = io.open(upgrade_state.upgrade_file, "ab")
    if file then
        -- 写入数据
        file:write(data)
        file:close()

        -- 更新接收状态
        upgrade_state.received_size = upgrade_state.received_size + #data

        -- 计算并显示进度
        local progress = math.floor((upgrade_state.received_size / upgrade_state.total_size) * 100)

        -- 每50个数据包或完成时打印进度
        if upgrade_state.received_size % (config.max_packet_size * 50) == 0 or
            upgrade_state.received_size >= upgrade_state.total_size then
            log.info("FOTA_DATA", "升级进度:", progress, "%",
                "(", upgrade_state.received_size, "/", upgrade_state.total_size, ")")
        end

        log.info("FOTA_DATA", "数据写入成功,当前总计:", upgrade_state.received_size, "字节")
    else
        log.error("FOTA_DATA", "打开文件失败:", upgrade_state.upgrade_file)

        -- 文件操作失败,终止升级
        upgrade_state.is_upgrading = false
        fota.finish(false)
    end
end

-- 处理接收到的BLE写入请求数据
-- @param service_uuid 服务UUID
-- @param char_uuid 特征值UUID
-- @param data 写入的数据
function ble_file_fota.proc(service_uuid, char_uuid, data)
    log.info("ble_file_fota", "处理写入数据", service_uuid, char_uuid, data:toHex())

    -- 简化的UUID匹配逻辑:检查UUID是否包含我们的短UUID
    local is_service_match = string.find(service_uuid:lower(), config.service_uuid:lower())
    local is_cmd_match = string.find(char_uuid:lower(), config.char_uuid_cmd:lower())
    local is_data_match = string.find(char_uuid:lower(), config.char_uuid_data:lower())

    log.info("ble_file_fota", "UUID匹配结果:",
        "服务匹配:", is_service_match,
        "命令匹配:", is_cmd_match,
        "数据匹配:", is_data_match)

    if is_service_match then
        if is_cmd_match then
            -- 命令特征值:处理FOTA命令
            log.info("ble_file_fota", "命令特征值匹配,处理命令")
            handle_command(data)
        elseif is_data_match then
            -- 数据特征值:处理FOTA数据
            log.info("ble_file_fota", "数据特征值匹配,处理数据")
            handle_data(data)
        else
            log.warn("ble_file_fota", "未知的特征值UUID:", char_uuid)
        end
    else
        log.warn("ble_file_fota", "未知的服务UUID:", service_uuid)
    end
end

function ble_file_fota.proc_disconnect()
    log.info("ble_file_fota", "处理连接断开事件")

    -- 如果正在升级,连接断开则终止升级
    if upgrade_state.is_upgrading then
        log.error("ble_file_fota", "升级过程中连接断开,终止升级")
        upgrade_state.is_upgrading = false

        -- 删除临时文件
        if upgrade_state.upgrade_file then
            os.remove(upgrade_state.upgrade_file)
        end

        -- 结束FOTA流程
        fota.finish(false)
    end
end

return ble_file_fota

7.1.2 分包方式 FOTA 升级业务逻辑模块(ble_packet_fota.lua)

--[[
-- 蓝牙FOTA升级功能(分段写入方式)
-- 提供通过蓝牙低功耗(BLE)接收升级包数据进行固件升级的功能

本文件为FOTA业务逻辑处理模块,核心业务逻辑为:
1. 处理接收到的BLE写入请求数据
2. 实现FOTA升级流程的控制(分段写入方式)
3. 管理升级状态和分段数据操作

本文件的对外接口有1个:
1. ble_packet_fota.proc(service_uuid, char_uuid, data): 处理接收到的BLE写入请求数据

依赖模块:
- ble_main: 用于提供BLE服务和事件处理
]]

local ble_packet_fota = {}

-- 升级状态管理
local upgrade_state = {
    is_upgrading = false, -- 是否正在升级
    total_size = 0,       -- 总文件大小(字节)
    received_size = 0,    -- 已接收大小(字节)
    upgrade_packet = 0    -- 升级包计数器
}

-- 配置参数
local config = {
    service_uuid = "F000",   -- FOTA服务UUID(短格式)
    char_uuid_cmd = "F001",  -- 命令特征值UUID
    char_uuid_data = "F002", -- 数据特征值UUID
    max_packet_size = 200    -- BLE数据包最大长度(字节)
}
local function ble_reboot()
    -- 完成FOTA流程并重启
    fota.finish(true)
    log.info("FOTA_CMD", "正在重启设备...")
    rtos.reboot()
end
-- 处理FOTA命令
-- @param cmd_data 命令数据,格式:[命令码(1字节)] 或 [命令码(1字节) + 文件大小(4字节)]
local function handle_command(cmd_data)
    log.info("FOTA_CMD", "收到命令数据:", cmd_data:toHex(), "长度:", #cmd_data)

    -- 检查命令数据是否有效
    if #cmd_data < 1 then
        log.error("FOTA_CMD", "命令数据为空")
        return
    end

    -- 解析命令码(第一个字节)
    local cmd = cmd_data:byte(1)
    log.info("FOTA_CMD", "解析命令码:", cmd, string.format("(0x%02X)", cmd))

    -- 命令0x01:开始升级
    if cmd == 0x01 then
        log.info("FOTA_CMD", "处理开始升级命令")

        -- 检查命令格式:需要至少5字节(1字节命令码 + 4字节文件大小)
        if #cmd_data >= 5 then
            -- 解析文件大小(小端序,从第2字节开始)
            local total_size = string.unpack("<I4", cmd_data, 2)
            log.info("FOTA_CMD", "文件总大小:", total_size, "字节")

            -- 初始化FOTA子系统
            log.info("FOTA_CMD", "初始化FOTA子系统...")
            if fota.init() then
                log.info("FOTA_CMD", "FOTA初始化成功")

                -- 等待FOTA底层准备就绪
                log.info("FOTA_CMD", "等待FOTA底层准备...")
                -- 等待FOTA底层准备就绪,最多等待10秒
                local wait_count = 0
                local wait_ok = false
                while wait_count < 100 do -- 最多轮询100次,每次100ms,共10秒
                    if fota.wait() then
                        wait_ok = true
                        break
                    end
                    sys.wait(100)
                    wait_count = wait_count + 1
                end

                if wait_ok then
                    log.info("FOTA_CMD", "FOTA底层准备就绪")

                    -- 更新升级状态
                    upgrade_state.is_upgrading = true
                    upgrade_state.total_size = total_size
                    upgrade_state.received_size = 0
                    upgrade_state.upgrade_packet = 0

                    log.info("FOTA_CMD", "升级状态已设置",
                        "总大小:", upgrade_state.total_size)
                    log.info("FOTA_CMD", "准备接收固件数据...")
                else
                    log.error("FOTA_CMD", "FOTA底层准备超时")
                    fota.finish(false)
                    upgrade_state.is_upgrading = false
                end
            else
                log.error("FOTA_CMD", "FOTA初始化失败")
            end
        else
            log.error("FOTA_CMD", "开始命令格式错误,长度不足")
        end

        -- 命令0x02:结束升级(通知升级包发完)
    elseif cmd == 0x02 then
        log.info("FOTA_CMD", "处理结束升级命令")

        -- 检查是否处于升级状态
        if not upgrade_state.is_upgrading then
            log.warn("FOTA_CMD", "未处于升级状态,忽略结束命令")
            return
        end

        -- 验证文件完整性
        log.info("FOTA_CMD", "验证文件完整性...")
        log.info("FOTA_CMD", "已接收:", upgrade_state.received_size, "字节")
        log.info("FOTA_CMD", "应接收:", upgrade_state.total_size, "字节")

        if upgrade_state.received_size == upgrade_state.total_size then
            log.info("FOTA_CMD", "文件完整性验证通过")
            log.info("FOTA_CMD", "升级数据已全部接收,等待升级完成...")

            -- 等待底层校验结束
            local success = false
            for i = 1, 30 do -- 最多等待3秒
                sys.wait(100)
                local succ, fotaDone = fota.isDone()
                if not succ then
                    log.error("FOTA_CMD", "校验过程出错")
                    fota.finish(false)
                    upgrade_state.is_upgrading = false
                    break
                end
                if fotaDone then
                    log.info("FOTA_CMD", "FOTA升级成功!")

                    -- 延迟重启,给用户一些反应时间
                    log.info("FOTA_CMD", "2秒后设备将自动重启...,重启后通过日志判断最终是否升级成功")

                    -- 延迟2秒后重启设备
                    sys.timerStart(ble_reboot, 2000)
                    success = true
                    break
                end
            end

            if not success then
                log.error("FOTA_CMD", "校验超时")
                fota.finish(false)
                upgrade_state.is_upgrading = false
            end
        else
            log.error("FOTA_CMD", "文件不完整,升级失败")
            -- 清理升级状态
            upgrade_state.is_upgrading = false
            fota.finish(false)
        end

        log.info("FOTA_CMD", "结束升级命令处理完成")
    else
        log.warn("FOTA_CMD", "未知命令码:", cmd, string.format("(0x%02X)", cmd))
    end
end

-- 处理FOTA数据
-- @param data 固件数据块
local function handle_data(data)
    log.info("FOTA_DATA", "收到数据包,长度:", #data, "字节")

    -- 检查是否处于升级状态
    if not upgrade_state.is_upgrading then
        log.warn("FOTA_DATA", "未处于升级状态,忽略数据")
        return
    end

    -- 直接使用fota.run()处理分段数据,不写入文件
    log.info("FOTA_DATA", "处理分段数据,包序号:", upgrade_state.upgrade_packet)
    local result, isDone = fota.run(data)
    log.info("FOTA_DATA", "分段写入结果:", "result:", result, "isDone:", isDone)

    if result then
        -- 更新接收状态
        upgrade_state.received_size = upgrade_state.received_size + #data
        upgrade_state.upgrade_packet = upgrade_state.upgrade_packet + 1

        -- 计算并显示进度
        local progress = math.floor((upgrade_state.received_size / upgrade_state.total_size) * 100)

        -- 每50个数据包或完成时打印进度
        if upgrade_state.received_size % (config.max_packet_size * 50) == 0 or
            upgrade_state.received_size >= upgrade_state.total_size then
            log.info("FOTA_DATA", "升级进度:", progress, "%",
                "(", upgrade_state.received_size, "/", upgrade_state.total_size, ")")
        end

        log.info("FOTA_DATA", "数据写入成功,当前总计:", upgrade_state.received_size, "字节")

        -- 如果所有数据都已接收,检查升级是否完成
        if upgrade_state.received_size >= upgrade_state.total_size then
            log.info("FOTA_DATA", "所有数据已接收,等待升级完成...")
        end
    else
        log.error("FOTA_DATA", "分段写入失败")

        -- 分段写入失败,终止升级
        upgrade_state.is_upgrading = false
        fota.finish(false)
    end
end

-- 处理接收到的BLE写入请求数据
-- @param service_uuid 服务UUID
-- @param char_uuid 特征值UUID
-- @param data 写入的数据
function ble_packet_fota.proc(service_uuid, char_uuid, data)
    log.info("ble_packet_fota", "处理写入数据", service_uuid, char_uuid, data:toHex())

    -- 简化的UUID匹配逻辑:检查UUID是否包含我们的短UUID
    local is_service_match = string.find(service_uuid:lower(), config.service_uuid:lower())
    local is_cmd_match = string.find(char_uuid:lower(), config.char_uuid_cmd:lower())
    local is_data_match = string.find(char_uuid:lower(), config.char_uuid_data:lower())

    log.info("ble_packet_fota", "UUID匹配结果:",
        "服务匹配:", is_service_match,
        "命令匹配:", is_cmd_match,
        "数据匹配:", is_data_match)

    if is_service_match then
        if is_cmd_match then
            -- 命令特征值:处理FOTA命令
            log.info("ble_packet_fota", "命令特征值匹配,处理命令")
            handle_command(data)
        elseif is_data_match then
            -- 数据特征值:处理FOTA数据
            log.info("ble_packet_fota", "数据特征值匹配,处理数据")
            handle_data(data)
        else
            log.warn("ble_packet_fota", "未知的特征值UUID:", char_uuid)
        end
    else
        log.warn("ble_packet_fota", "未知的服务UUID:", service_uuid)
    end
end

-- 处理BLE连接断开事件
-- @return nil
function ble_packet_fota.proc_disconnect()
    log.info("ble_packet_fota", "处理连接断开事件")

    -- 如果正在升级,连接断开则终止升级
    if upgrade_state.is_upgrading then
        log.error("ble_packet_fota", "升级过程中连接断开,终止升级")
        upgrade_state.is_upgrading = false

        -- 结束FOTA流程
        fota.finish(false)
    end
end

return ble_packet_fota

7.1.2 BLE 服务主功能模块(ble_main.lua)

--[[
-- BLE服务主功能模块
-- 提供BLE服务的初始化、配置和事件处理
-- 不包含FOTA业务逻辑,仅处理BLE相关功能

依赖模块:
- ble_file_fota: 用于处理FOTA相关业务逻辑(文件写入方式)
- ble_packet_fota: 用于处理FOTA相关业务逻辑(分段写入方式)
]]

-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "packet" -- 默认使用file方式

-- 根据选择加载对应的FOTA模块
local ble_fota_main
if fota_mode == "file" then
    ble_fota_main = require "ble_file_fota"
else
    ble_fota_main = require "ble_packet_fota"
end

-- ble_main的任务名
local TASK_NAME = "BLE_MAIN"

-- 配置参数
config = {
    device_name = "Air6201_FOTA", -- 设备广播名称
    service_uuid = "F000",        -- FOTA服务UUID(短格式)
    char_uuid_cmd = "F001",       -- 命令特征值UUID
    char_uuid_data = "F002",      -- 数据特征值UUID
    max_packet_size = 20          -- BLE数据包最大长度(字节)
}

local bluetooth_device = nil
local ble_device = nil
local adv_create = nil
local gatt_create = nil

-- GATT服务数据库定义
-- 这里定义了BLE设备提供的服务和特征值
local att_db = {
    string.fromHex(config.service_uuid), -- Service UUID
    {
        string.fromHex(config.char_uuid_cmd),
        ble.WRITE | ble.WRITE_CMD
    },
    {
        string.fromHex(config.char_uuid_data),
        ble.WRITE | ble.WRITE_CMD
    }
}

-- BLE事件回调函数
local function ble_event_cb(ble_dev, event, param)
    log.info("BLE_EVENT", "收到BLE事件:", event)

    -- 根据LuatOS BLE事件枚举处理不同事件
    if event == ble.EVENT_CONN then
        -- 连接成功事件
        log.info("BLE_EVENT", "设备已连接", "地址:", param.addr and param.addr:toHex() or "未知")
        sys.sendMsg(TASK_NAME, "BLE_EVENT", "CONNECT", param)
    elseif event == ble.EVENT_DISCONN then
        -- 连接断开事件
        log.info("BLE_EVENT", "设备已断开连接", "原因:", param.reason or "未知")
        sys.sendMsg(TASK_NAME, "BLE_EVENT", "DISCONNECTED", param.reason)
    elseif event == ble.EVENT_WRITE then
        -- 写入事件 - 这是关键事件!
        log.info("BLE_EVENT", "处理写入事件")

        -- 检查参数是否完整
        if not param or not param.uuid_service or not param.uuid_characteristic or not param.data then
            log.error("BLE_EVENT", "写入事件参数不完整")
            return
        end

        -- 获取服务UUID和特征值UUID
        local service_uuid = param.uuid_service:toHex()
        local char_uuid = param.uuid_characteristic:toHex()
        local data = param.data

        log.info("BLE_EVENT", "服务UUID:", service_uuid)
        log.info("BLE_EVENT", "特征值UUID:", char_uuid)
        log.info("BLE_EVENT", "数据长度:", #data, "字节")
        sys.sendMsg(TASK_NAME, "BLE_EVENT", "WRITE_REQ", param)
    elseif event == ble.EVENT_READ then
        -- 读取事件 - 外围设备收到主设备读请求
        log.info("BLE_EVENT", "处理读取事件")
    elseif event == ble.EVENT_READ_VALUE then
        -- 读取操作完成事件 - 中心设备读取特征值完成
        log.info("BLE_EVENT", "读取操作完成", "数据:", param.data and param.data:toHex() or "无数据")
    elseif event == ble.EVENT_SCAN_REPORT then
        -- 扫描报告事件 - 中心设备扫描到其他BLE设备
        log.info("BLE_EVENT", "扫描报告", "RSSI:", param.rssi, "地址:", param.adv_addr and param.adv_addr:toHex() or "未知")
    elseif event == ble.EVENT_SCAN_STOP then
        -- 扫描停止事件
        log.info("BLE_EVENT", "扫描停止")
    else
        -- 其他事件
        log.info("BLE_EVENT", "其他事件类型:", event)
        if param then
            -- 尝试打印参数的基本信息,避免直接打印table导致错误
            if type(param) == "table" then
                log.info("BLE_EVENT", "事件参数为table,包含字段:", #param)
                for k, v in pairs(param) do
                    if type(v) == "string" then
                        log.info("BLE_EVENT", "参数字段:", k, "值:", v:toHex())
                    else
                        log.info("BLE_EVENT", "参数字段:", k, "类型:", type(v))
                    end
                end
            else
                log.info("BLE_EVENT", "事件参数类型:", type(param))
            end
        end
    end
end

-- 初始化BLE功能
local function ble_init()
    log.info("BLE_INIT", "开始初始化BLE...")

    -- 初始化蓝牙核心
    bluetooth_device = bluetooth.init()
    if not bluetooth_device then
        log.error("BLE_INIT", "蓝牙核心初始化失败")
        return false
    end
    log.info("BLE_INIT", "蓝牙核心初始化成功")

    -- 初始化BLE功能
    ble_device = bluetooth_device:ble(ble_event_cb)
    if not ble_device then
        log.error("BLE_INIT", "BLE功能初始化失败")
        return false
    end
    log.info("BLE_INIT", "BLE功能初始化成功")

    -- 创建GATT服务
    gatt_create = ble_device:gatt_create(att_db)
    if not gatt_create then
        log.error("BLE_INIT", "GATT服务创建失败")
        return false
    end
    log.info("BLE_INIT", "GATT服务创建成功")

    -- 配置广播数据
    log.info("BLE_INIT", "配置广播数据...")
    adv_create = ble_device:adv_create({
        addr_mode = ble.PUBLIC,
        channel_map = ble.CHNLS_ALL,
        intv_min = 120,
        intv_max = 120,
        adv_data = {
            { ble.FLAGS,               string.char(0x06) },  -- BLE标志
            { ble.COMPLETE_LOCAL_NAME, config.device_name }, -- 设备名称
        }
    })

    if not adv_create then
        log.error("BLE_INIT", "广播配置失败")
        return false
    end
    log.info("BLE_INIT", "广播配置成功")

    -- 开始广播
    ble_device:adv_start()
    log.info("BLE_INIT", "BLE广播已启动,设备名称:", config.device_name)

    return true
end

-- 主任务处理函数
local function ble_main_task_func()
    local result, msg

    while true do
        result = ble_init()
        if not result then
            log.error("ble_main_task_func", "ble_init error")
            goto EXCEPTION_PROC
        end

        while true do
            msg = sys.waitMsg(TASK_NAME, "BLE_EVENT")

            if not msg then
                log.error("ble_main_task_func", "waitMsg timeout")
                goto EXCEPTION_PROC
            end

            if msg[2] == "CONNECT" then
                local conn_param = msg[3]
                log.info("BLE", "设备连接成功: " .. conn_param.addr:toHex())
            elseif msg[2] == "DISCONNECTED" then
                log.info("BLE", "设备断开连接,原因: " .. msg[3])
                -- 通知FOTA模块连接断开
                ble_fota_main.proc_disconnect()
                break
            -- 收到中心设备的写请求,将写的数据发给ble_fota_main模块处理
            elseif msg[2] == "WRITE_REQ" then
                local ble_param = msg[3]
                local service_uuid = ble_param.uuid_service:toHex()
                local char_uuid = ble_param.uuid_characteristic:toHex()
                local data = ble_param.data

                log.info("BLE", "收到写请求: " .. service_uuid .. " " .. char_uuid .. " " .. data:toHex())
                ble_fota_main.proc(service_uuid, char_uuid, data)
            end
        end

        ::EXCEPTION_PROC::
        log.error("ble_main_task_func", "异常退出, 5秒后重新开启广播")

        -- 停止广播
        if ble_device then
            ble_device:adv_stop()
        end

        -- 清空此task绑定的消息队列中的未处理的消息
        sys.cleanMsg(TASK_NAME)

        -- 5秒后跳转到循环体开始位置,自动重试
        sys.wait(5000)
    end
end

-- 启动主任务
sys.taskInitEx(ble_main_task_func, TASK_NAME)

7.1.3 Python 升级工具(ble_fota_tool.py)

#!/usr/bin/env python3
import asyncio
import struct
import time
from bleak import BleakScanner, BleakClient

# 完整UUID定义
FOTA_SERVICE_UUID = "0000f000-0000-1000-8000-00805f9b34fb"  # 完整服务UUID
FOTA_CMD_CHAR_UUID = "0000f001-0000-1000-8000-00805f9b34fb"  # 完整命令特征UUID
FOTA_DATA_CHAR_UUID = "0000f002-0000-1000-8000-00805f9b34fb"  # 完整数据特征UUID

# Command definitions
CMD_START_UPGRADE = 0x01
CMD_END_UPGRADE = 0x02

# 每包数据大小
MAX_PACKET_SIZE = 200

class SimpleFotaTool:
    def __init__(self, device_name, firmware_path):
        self.device_name = device_name
        self.firmware_path = firmware_path
        self.client = None
        self.firmware_data = None
        self.total_size = 0
        self.target_device = None

    async def load_firmware(self):
        """Load firmware file into memory"""
        try:
            with open(self.firmware_path, 'rb') as f:
                self.firmware_data = f.read()
            self.total_size = len(self.firmware_data)
            print(f"   固件加载完成,大小: {self.total_size} 字节")
            return True
        except Exception as e:
            print(f"   加载固件失败: {e}")
            return False

    async def scan_device(self):
        """Scan for the target device"""
        print("\n2. 扫描目标设备...")
        print("   正在扫描,请等待...")

        try:
            devices = await BleakScanner.discover(timeout=10.0)

            found_devices = []
            for device in devices:
                if device.name and self.device_name in device.name:
                    found_devices.append(device)
                    print(f"   找到匹配设备: {device.name} (地址: {device.address})")

            if not found_devices:
                print(f"   未找到设备: {self.device_name}")
                return None

            # 选择第一个匹配的设备
            self.target_device = found_devices[0]
            print(f"   选择设备: {self.target_device.name} (地址: {self.target_device.address})")
            return self.target_device

        except Exception as e:
            print(f"   扫描失败: {e}")
            return None

    async def connect_device(self, device):
        """Connect to the target device"""
        print("\n3. 建立BLE连接...")
        try:
            self.client = BleakClient(device.address)
            await self.client.connect(timeout=30.0)
            print(f"   连接成功,状态: {self.client.is_connected}")

            # 调试:打印所有服务和特征值
            print("\n4. 发现服务和特征值...")

            # 兼容不同版本的Bleak库
            try:
                # 新版本Bleak
                services = self.client.services
            except AttributeError:
                # 旧版本Bleak
                services = await self.client.get_services()

            fota_service_found = False
            for service in services:
                if service.uuid.lower() == FOTA_SERVICE_UUID.lower():
                    fota_service_found = True
                    print(f"   找到FOTA服务: {service.uuid}")
                    for char in service.characteristics:
                        print(f"     特征值: {char.uuid} - 属性: {char.properties}")
                        if char.uuid.lower() == FOTA_CMD_CHAR_UUID.lower():
                            print(f"       -> 命令特征值 (可写)")
                        elif char.uuid.lower() == FOTA_DATA_CHAR_UUID.lower():
                            print(f"       -> 数据特征值 (可写)")

            if not fota_service_found:
                print("   警告: 未找到FOTA服务,但继续尝试...")

            return True
        except Exception as e:
            print(f"   连接失败: {e}")
            return False

    async def write_characteristic(self, uuid, data):
        """写入特征值"""
        try:
            await self.client.write_gatt_char(uuid, data, response=True)

            # 正确提取短UUID(从完整UUID中提取f001/f002部分)
            # 完整UUID格式: "0000f001-0000-1000-8000-00805f9b34fb"
            # 我们想要提取 "f001" 部分
            short_uuid = uuid.split('-')[0][-4:]
            print(f"   写入特征值 {short_uuid},数据长度: {len(data)} 字节")
            return True
        except Exception as e:
            print(f"   写入特征值失败: {e}")
            return False

    async def send_start_command(self):
        """发送开始升级命令"""
        print("\n5. 发送开始升级命令...")

        # 连接成功后短暂延时
        print("   连接成功,等待1秒...")
        await asyncio.sleep(1)

        # 发送开始升级命令
        start_cmd = struct.pack("<BI", CMD_START_UPGRADE, self.total_size)
        if not await self.write_characteristic(FOTA_CMD_CHAR_UUID, start_cmd):
            return False

        print("   开始命令发送完成")
        await asyncio.sleep(1)  # 等待设备准备
        return True

    async def send_firmware_data(self):
        """Send firmware data in chunks with optimized delay"""
        print("\n6. 分块传输固件数据...")
        sent_bytes = 0
        start_time = time.time()
        packet_count = 0

        # 优化延时:减少到100ms以提高速度
        PACKET_DELAY = 0.1

        while sent_bytes < self.total_size:
            chunk_size = min(MAX_PACKET_SIZE, self.total_size - sent_bytes)
            chunk = self.firmware_data[sent_bytes:sent_bytes + chunk_size]

            if not await self.write_characteristic(FOTA_DATA_CHAR_UUID, chunk):
                return False

            sent_bytes += chunk_size
            packet_count += 1

            # 短暂延时,避免数据丢失
            await asyncio.sleep(PACKET_DELAY)

            # 每20个数据包显示一次进度
            if packet_count % 20 == 0 or sent_bytes >= self.total_size:
                progress = (sent_bytes / self.total_size) * 100
                elapsed = time.time() - start_time
                speed = sent_bytes / elapsed / 1024 if elapsed > 0 else 0
                remaining_time = (self.total_size - sent_bytes) / (sent_bytes / elapsed) if sent_bytes > 0 else 0
                print(f"   进度: {progress:.1f}% - {speed:.1f} KB/s - 已发送 {packet_count} 包 - 预计剩余: {remaining_time:.1f}s")

        total_time = time.time() - start_time
        avg_speed = self.total_size / total_time / 1024
        print(f"   数据传输完成! 总时间: {total_time:.1f}s, 平均速度: {avg_speed:.1f} KB/s")
        return True

    async def end_upgrade(self):
        """Send end upgrade command"""
        print("\n7. 发送结束升级命令...")
        end_cmd = struct.pack("B", CMD_END_UPGRADE)
        if not await self.write_characteristic(FOTA_CMD_CHAR_UUID, end_cmd):
            return False

        print("   结束命令发送完成")

        # 等待设备处理
        print("\n8. 等待设备处理升级...")
        await asyncio.sleep(5)  # 给设备足够时间处理
        return True

    async def run(self):
        """Main execution flow"""

        # 1. 加载固件文件
        print("\n1. 加载固件文件...")
        if not await self.load_firmware():
            return False

        # 2. 扫描目标设备
        device = await self.scan_device()
        if not device:
            return False

        # 3. 连接设备
        if not await self.connect_device(device):
            return False

        try:
            # 4. 发送开始命令
            if not await self.send_start_command():
                return False

            # 5. 发送固件数据
            if not await self.send_firmware_data():
                return False

            # 6. 结束升级
            if not await self.end_upgrade():
                return False

            print("\n" + "="*50)
            print("升级流程完成! 设备应该正在重启...")
            print("="*50)
            return True

        except Exception as e:
            print(f"   升级过程中出现错误: {e}")
            return False
        finally:
            # 断开连接
            if self.client and self.client.is_connected:
                await self.client.disconnect()
                print("   已断开连接")

async def main():
    import argparse
    parser = argparse.ArgumentParser(description="蓝牙FOTA升级工具")
    parser.add_argument("-f", "--firmware", required=True, help="固件文件路径")
    parser.add_argument("-d", "--device", default="Air6201_FOTA", help="设备名称")

    args = parser.parse_args()

    tool = SimpleFotaTool(args.device, args.firmware)
    success = await tool.run()
    return success

if __name__ == "__main__":
    success = asyncio.run(main())
    exit(0 if success else 1)

7.2 升级包制作

升级包制作是 FOTA 功能的关键步骤,确保按照升级包制作流程操作。

7.3 功能验证

7.3.1 文件系统方式

代码中修改 fota_mode 参数为"file"方式:

-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "file" -- 默认使用file方式

模组 luatools 日志如下:

日志解读:

  1. 设备启动,蓝牙服务初始化成功
  2. 收到开始升级命令,初始化 FOTA 分区和缓冲区
  3. 开始分段接收升级数据保存到文件中
  4. 所有数据接收完成,验证固件完整性
  5. 运行升级流程,升级完成,设备重启
  6. 重启后新版本正常运行,确认升级成功

上位机工具日志:

进入到 ble_fota 目录下运行 cmd 命令进入命令行程序,然后等待设备运行并启动蓝牙广播后运行下面命令:

python ble_fota_tool.py -f ble_fota.bin

7.3.2 使用分包方式升级:

代码中修改 fota_mode 参数为"packet"方式:

-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "packet" -- 默认使用file方式

模组 luatools 日志如下:

日志解读:

  1. 设备启动,蓝牙服务初始化成功
  2. 收到开始升级命令,初始化 FOTA 分区和缓冲区
  3. 开始分段接收升级数据并通过 fota.run 接口写入 fota 分区
  4. 所有数据接收完成,验证固件完整性
  5. 运行升级流程,升级完成,设备重启
  6. 重启后新版本正常运行,确认升级成功

上位机工具日志:

进入到 ble_fota 目录下运行 cmd 命令进入命令行程序,然后等待设备运行并启动蓝牙广播后运行下面命令:

python ble_fota_tool.py -f ble_fota.bin

八、常见问题与解决方案

8.1 蓝牙连接失败

  • 问题:Python 脚本无法找到或连接设备
  • 解决方案:

  • 确认设备已上电并正常启动

  • 检查设备名称是否为"Air6201_FOTA"
  • 确认电脑蓝牙功能已开启

8.2 数据传输中断

  • 问题:升级过程中数据传输中断
  • 解决方案:

  • 确保设备与电脑之间没有障碍物

  • 避免其他蓝牙设备的干扰
  • 检查设备电量是否充足
  • 重新运行升级工具

8.3 升级后版本未更新

  • 问题:升级完成后版本号未变化
  • 解决方案:

  • 检查升级包制作是否正确

  • 确认 VERSION 变量已修改
  • 查看设备日志确认升级是否成功
  • 尝试完全断电后重新上电

8.4 Python 环境问题

  • 问题:Python 脚本无法运行
  • 解决方案:

  • 确认已安装 Python 3.x

  • 安装必要的库:pip install bleak
  • 检查脚本文件路径是否正确
  • 确保有足够的权限运行脚本

九、总结

至此,本教程详细介绍了如何使用蓝牙 FOTA 功能实现 Air6201 设备的无线固件升级。蓝牙 FOTA 提供了一种便捷、低功耗的固件更新方案,特别适合需要远程维护的物联网设备。