跳转至

8000-蓝牙升级教程文档

作者:孟伟 | 最后修改:2026-06-04

一、蓝牙 FOTA 概述

蓝牙 FOTA(Firmware Over The Air)是通过蓝牙低功耗(BLE)技术实现设备固件远程无线升级的功能。本教程基于 LuatOS 系统,演示了 Air8000 系列模块通过 BLE 进行固件升级的演示方案。

1.1 蓝牙 FOTA 有什么用?

蓝牙 FOTA 扩展功能为物联网设备提供无线固件升级解决方案,主要特点包括:

  • 无线升级:无需有线连接,通过 BLE 无线通信实现固件更新
  • 分段传输:支持大文件分片传输,适应 BLE 的数据包限制
  • 安全可靠:支持固件完整性验证,确保升级过程安全

二、演示功能概述

Air8000 系列模块通过 BLE(蓝牙低功耗)进行固件远程升级(FOTA)的完整实现方案,支持两种升级方式:

  1. 文件写入方式:将接收到的升级包数据先保存到本地临时文件,完成数据接收后再执行升级

  2. 优点:适合完整固件升级,逻辑清晰,易于调试

  3. 缺点:需要额外的文件系统分区存储空间保存完整固件
  4. 适用场景:存储空间充足的设备。
  5. 分段写入方式:直接将接收到的升级包数据通过 fota.run()函数处理,无需保存到本地文件

  6. 优点:节省存储空间,适合差分升级

  7. 缺点:对内存要求较高,需要足够的内存缓冲区实时处理数据
  8. 适用场景:存储空间有限的设备

本 demo 适用场景:

  • 智能硬件、可穿戴设备等蓝牙连接设备的固件升级
  • 需要自定义升级流程的应用场景
  • 无网络环境下的设备固件更新

三、准备硬件环境

  1. Air8000 核心板一块
  2. TYPE-C USB 数据线一根,接线方式为:

  3. Air8000 核心板通过 TYPE-C USB 口供电(核心板正面的 供电/充电 拨动开关 拨到供电一端;)

  4. TYPE-C USB 数据线直接插到核心板的 TYPE-C USB 座子,另一端连接电脑 USB 口

Air8000购买链接:Air8000_4G/WiFi/以太网-上海合宙LuatOS官方企业店-淘宝网

四、软件环境

在开始实践本示例之前,先筹备一下软件环境:

1.烧录工具: Luatools 工具

2.本demo开发测试时使用的固件为LuatOS-SoC_V2018_Air8000,本demo对固件版本没有什么特殊要求,所以你如果要测试本demo时,可以直接使用最新版本的内核固件;如果发现最新版本的内核固件测试有问题,可以使用我们开发本demo时使用的内核固件版本来对比测试;LuatOS 需要的脚本和资源文件

脚本和资源文件:https://gitee.com/openLuat/LuatOS/tree/master/module/Air8000/demo/fota/ble_fota

准备好软件环境之后,接下来查看如何烧录项目文件到 Air8000 核心板,将本篇文章中演示使用的项目文件烧录到 Air8000 核心板中。

  1. Python 环境:Python 3 环境(用于运行 ble_fota_tool.py 发送升级包)
  2. 需要安装 bleak 库,可通过以下命令安装:
pip install bleak

五、API 接口说明

详细 API 文档请参考:

ble:https://docs.openluat.com/osapi/core/ble/

fota:https://docs.openluat.com/osapi/core/fota/

六、蓝牙 GATT 服务设计

6.1 服务与特征值定义

6.1.1 命令报文格式

开始升级命令

  • 命令码:0x01
  • 格式:[命令码(1字节)] + [文件大小(4字节)]
  • 示例:01 75 15 00 00 表示开始升级,固件大小为 5493 字节
  • 说明:

  • 命令码固定为 0x01

  • 文件大小为 4 字节小端序
  • 设备收到此命令后,初始化 FOTA 子系统并准备接收数据

结束升级命令

  • 命令码:0x02
  • 格式:[命令码(1字节)]
  • 示例:02
  • 说明:

  • 命令码固定为 0x02

  • 设备收到此命令后,验证固件完整性并执行升级

6.1.2 数据报文格式

  • 格式:[固件数据(n字节)]
  • 默认长度:200 字节(可通过 ble_fota_tool.py 配置调整,最大 256)
  • 说明:

  • 固件数据为二进制格式

  • 每包数据大小不超过 BLE 数据包最大长度:Air8000 系列模组硬件支持最大 256 字节,考虑到稳定性建议保持默认的 200 字节
  • 设备收到数据后,根据所选的 FOTA 方式进行处理:
    • 文件方式:将数据写入临时文件
    • 分段方式:直接通过 fota.run()处理数据

6.2 升级流程说明

用法:

  1. 先把脚本和固件烧录到 Air8000 模块中,并确认设备正常启动
  2. 模块启动后会自动开启 BLE 广播,广播名称为"Air8000_FOTA"
  3. 在电脑端操作:运行 ble_fota_tool.py 脚本连接设备并发送升级固件

  4. 注意:确保升级文件名为正确格式,并且与 ble_fota_tool.py 在同一目录下

  5. 观察日志输出确认升级进度
  6. 模块接收并验证固件成功后,会自动重启并应用新固件

BLE 通讯过程说明:

蓝牙 FOTA 升级通过 BLE 特征值进行命令控制和数据传输:

  1. 上位机通过 BLE 扫描并连接名为"Air8000_FOTA"的设备
  2. 上位机向命令特征值(F001)发送开始升级命令(0x01)和固件大小
  3. 设备初始化 FOTA 功能并准备接收数据
  4. 上位机向数据特征值(F002)分包发送固件数据
  5. 设备接收并保存数据到临时文件
  6. 上位机发送结束升级命令(0x02)
  7. 设备验证固件完整性并执行 FOTA 升级流程
  8. 升级成功后设备自动重启

6.3、蓝牙 FOTA 升级工具使用说明

6.3.1 工具功能

  • 连接 Air8000 设备并发送升级包
  • 显示升级进度和状态信息
  • 支持自定义升级包文件名

6.3.2 使用方法

  1. 确保 Air8000 设备处于蓝牙广播状态,设备名称为"Air8000_FOTA"
  2. 在命令行中运行 ble_fota_tool.py 脚本,指定升级包文件名:
  3. bash
  4. python ble_fota_tool.py -f 升级包文件名

6.3.3 参数说明

  • -d--device:指定目标设备名称(默认值为"Air8000_FOTA")
  • -f--firmware:指定固件文件路径(必需参数)

6.3.4 工作流程

当运行脚本后,它会自动执行以下步骤:

  1. 加载指定的固件文件
  2. 扫描并发现蓝牙设备
  3. 连接到目标设备
  4. 发送开始升级命令
  5. 发送固件数据到设备
  6. 发送结束升级命令
  7. 等待设备处理升级
  8. 断开连接并完成升级过程

七、代码示例介绍

下面将演示蓝牙 FOTA 的两种典型场景:文件方式升级和分包方式升级,用户可根据实际需求选择配置方式。

7.1 主要代码分析

7.1.1 文件系统方式 FOTA 升级业务逻辑模块(ble_file_fota.lua)

FOTA 业务逻辑模块,负责升级流程控制、命令处理和文件操作;:

--[[
-- 蓝牙FOTA升级功能(文件写入方式)
-- 提供通过蓝牙低功耗(BLE)接收升级包数据进行固件升级的功能

本文件为FOTA业务逻辑处理模块,核心业务逻辑为:
1. 处理接收到的BLE写入请求数据
2. 实现FOTA升级流程的控制
3. 管理升级状态和文件操作

本文件的对外接口有1个:
1. ble_file_fota.proc(service_uuid, char_uuid, data): 处理接收到的BLE写入请求数据

依赖模块:
- ble_main: 用于提供BLE服务和事件处理
]]

local ble_file_fota = {}

-- 升级状态管理
local upgrade_state = {
    is_upgrading = false,          -- 是否正在升级
    total_size = 0,                -- 总文件大小(字节)
    received_size = 0,             -- 已接收大小(字节)
    upgrade_file = "/ble_fota.bin" -- 临时升级文件路径
}

-- 配置参数
local config = {
    service_uuid = "F000",   -- FOTA服务UUID(短格式)
    char_uuid_cmd = "F001",  -- 命令特征值UUID
    char_uuid_data = "F002", -- 数据特征值UUID
    max_packet_size = 200    -- BLE数据包最大长度(字节)
}
local function ble_reboot()
    -- 完成FOTA流程并重启
    fota.finish(true)
    log.info("FOTA_CMD", "正在重启设备...")
    rtos.reboot()
end
-- 处理FOTA命令
-- @param cmd_data 命令数据,格式:[命令码(1字节)] 或 [命令码(1字节) + 文件大小(4字节)]
local function handle_command(cmd_data)
    log.info("FOTA_CMD", "收到命令数据:", cmd_data:toHex(), "长度:", #cmd_data)

    -- 检查命令数据是否有效
    if #cmd_data < 1 then
        log.error("FOTA_CMD", "命令数据为空")
        return
    end

    -- 解析命令码(第一个字节)
    local cmd = cmd_data:byte(1)
    log.info("FOTA_CMD", "解析命令码:", cmd, string.format("(0x%02X)", cmd))

    -- 命令0x01:开始升级
    if cmd == 0x01 then
        log.info("FOTA_CMD", "处理开始升级命令")

        -- 检查命令格式:需要至少5字节(1字节命令码 + 4字节文件大小)
        if #cmd_data >= 5 then
            -- 解析文件大小(小端序,从第2字节开始)
            local total_size = string.unpack("<I4", cmd_data, 2)
            log.info("FOTA_CMD", "文件总大小:", total_size, "字节")

            -- 初始化FOTA子系统
            log.info("FOTA_CMD", "初始化FOTA子系统...")
            if fota.init() then
                log.info("FOTA_CMD", "FOTA初始化成功")

                -- 等待FOTA底层准备就绪
                log.info("FOTA_CMD", "等待FOTA底层准备...")
                -- 等待FOTA底层准备就绪,最多等待10秒
                local wait_count = 0
                local wait_ok = false
                while wait_count < 100 do -- 最多轮询100次,每次100ms,共10秒
                    if fota.wait() then
                        wait_ok = true
                        break
                    end
                    sys.wait(100)
                    wait_count = wait_count + 1
                end

                if wait_ok then
                    log.info("FOTA_CMD", "FOTA底层准备就绪")

                    -- 删除旧的临时文件(如果存在)
                    if os.remove(upgrade_state.upgrade_file) then
                        log.info("FOTA_CMD", "已清理旧临时文件")
                    end

                    -- 更新升级状态
                    upgrade_state.is_upgrading = true
                    upgrade_state.total_size = total_size
                    upgrade_state.received_size = 0

                    log.info("FOTA_CMD", "升级状态已设置",
                        "总大小:", upgrade_state.total_size,
                        "临时文件:", upgrade_state.upgrade_file)
                    log.info("FOTA_CMD", "准备接收固件数据...")
                else
                    log.error("FOTA_CMD", "FOTA底层准备超时")
                    fota.finish(false)
                    upgrade_state.is_upgrading = false
                end
            else
                log.error("FOTA_CMD", "FOTA初始化失败")
            end
        else
            log.error("FOTA_CMD", "开始命令格式错误,长度不足")
        end

        -- 命令0x02:结束升级
    elseif cmd == 0x02 then
        log.info("FOTA_CMD", "处理结束升级命令")

        -- 检查是否处于升级状态
        if not upgrade_state.is_upgrading then
            log.warn("FOTA_CMD", "未处于升级状态,忽略结束命令")
            return
        end

        -- 验证文件完整性
        log.info("FOTA_CMD", "验证文件完整性...")
        log.info("FOTA_CMD", "已接收:", upgrade_state.received_size, "字节")
        log.info("FOTA_CMD", "应接收:", upgrade_state.total_size, "字节")

        if upgrade_state.received_size == upgrade_state.total_size then
            log.info("FOTA_CMD", "文件完整性验证通过")

            -- 执行FOTA升级
            log.info("FOTA_CMD", "开始执行FOTA升级...")
            local result, isDone = fota.file(upgrade_state.upgrade_file)
            log.info("FOTA_CMD", "FOTA升级结果:", "result:", result, "isDone:", isDone)

            if result and isDone then
                log.info("FOTA_CMD", " FOTA升级成功!")

                -- 延迟重启,给用户一些反应时间
                log.info("FOTA_CMD", "2秒后设备将自动重启...,重启后通过日志判断最终是否升级成功")

                -- 延迟2秒后重启设备
                sys.timerStart(ble_reboot, 2000)
            else
                log.error("FOTA_CMD", "FOTA升级失败")
            end
        else
            log.error("FOTA_CMD", "文件不完整,升级失败")
        end

        -- 清理升级状态(无论成功还是失败)
        log.info("FOTA_CMD", "清理升级状态...")
        upgrade_state.is_upgrading = false

        -- 删除临时文件
        if upgrade_state.upgrade_file then
            if os.remove(upgrade_state.upgrade_file) then
                log.info("FOTA_CMD", "已删除临时文件")
            else
                log.warn("FOTA_CMD", "删除临时文件失败")
            end
        end

        -- 结束FOTA流程
        fota.finish(false)
        log.info("FOTA_CMD", "升级流程结束")
    else
        log.warn("FOTA_CMD", "未知命令码:", cmd, string.format("(0x%02X)", cmd))
    end
end

-- 处理FOTA数据
-- @param data 固件数据块
local function handle_data(data)
    log.info("FOTA_DATA", "收到数据包,长度:", #data, "字节")

    -- 检查是否处于升级状态
    if not upgrade_state.is_upgrading then
        log.warn("FOTA_DATA", "未处于升级状态,忽略数据")
        return
    end

    -- 保存数据到临时文件
    log.info("FOTA_DATA", "写入文件:", upgrade_state.upgrade_file)
    local file = io.open(upgrade_state.upgrade_file, "ab")
    if file then
        -- 写入数据
        file:write(data)
        file:close()

        -- 更新接收状态
        upgrade_state.received_size = upgrade_state.received_size + #data

        -- 计算并显示进度
        local progress = math.floor((upgrade_state.received_size / upgrade_state.total_size) * 100)

        -- 每50个数据包或完成时打印进度
        if upgrade_state.received_size % (config.max_packet_size * 50) == 0 or
            upgrade_state.received_size >= upgrade_state.total_size then
            log.info("FOTA_DATA", "升级进度:", progress, "%",
                "(", upgrade_state.received_size, "/", upgrade_state.total_size, ")")
        end

        log.info("FOTA_DATA", "数据写入成功,当前总计:", upgrade_state.received_size, "字节")
    else
        log.error("FOTA_DATA", "打开文件失败:", upgrade_state.upgrade_file)

        -- 文件操作失败,终止升级
        upgrade_state.is_upgrading = false
        fota.finish(false)
    end
end

-- 处理接收到的BLE写入请求数据
-- @param service_uuid 服务UUID
-- @param char_uuid 特征值UUID
-- @param data 写入的数据
function ble_file_fota.proc(service_uuid, char_uuid, data)
    log.info("ble_file_fota", "处理写入数据", service_uuid, char_uuid, data:toHex())

    -- 简化的UUID匹配逻辑:检查UUID是否包含我们的短UUID
    local is_service_match = string.find(service_uuid:lower(), config.service_uuid:lower())
    local is_cmd_match = string.find(char_uuid:lower(), config.char_uuid_cmd:lower())
    local is_data_match = string.find(char_uuid:lower(), config.char_uuid_data:lower())

    log.info("ble_file_fota", "UUID匹配结果:",
        "服务匹配:", is_service_match,
        "命令匹配:", is_cmd_match,
        "数据匹配:", is_data_match)

    if is_service_match then
        if is_cmd_match then
            -- 命令特征值:处理FOTA命令
            log.info("ble_file_fota", "命令特征值匹配,处理命令")
            handle_command(data)
        elseif is_data_match then
            -- 数据特征值:处理FOTA数据
            log.info("ble_file_fota", "数据特征值匹配,处理数据")
            handle_data(data)
        else
            log.warn("ble_file_fota", "未知的特征值UUID:", char_uuid)
        end
    else
        log.warn("ble_file_fota", "未知的服务UUID:", service_uuid)
    end
end

function ble_file_fota.proc_disconnect()
    log.info("ble_file_fota", "处理连接断开事件")

    -- 如果正在升级,连接断开则终止升级
    if upgrade_state.is_upgrading then
        log.error("ble_file_fota", "升级过程中连接断开,终止升级")
        upgrade_state.is_upgrading = false

        -- 删除临时文件
        if upgrade_state.upgrade_file then
            os.remove(upgrade_state.upgrade_file)
        end

        -- 结束FOTA流程
        fota.finish(false)
    end
end

return ble_file_fota

7.1.2 分包方式 FOTA 升级业务逻辑模块(ble_packet_fota.lua)

--[[
-- 蓝牙FOTA升级功能(分段写入方式)
-- 提供通过蓝牙低功耗(BLE)接收升级包数据进行固件升级的功能

本文件为FOTA业务逻辑处理模块,核心业务逻辑为:
1. 处理接收到的BLE写入请求数据
2. 实现FOTA升级流程的控制(分段写入方式)
3. 管理升级状态和分段数据操作

本文件的对外接口有1个:
1. ble_packet_fota.proc(service_uuid, char_uuid, data): 处理接收到的BLE写入请求数据

依赖模块:
- ble_main: 用于提供BLE服务和事件处理
]]

local ble_packet_fota = {}

-- 升级状态管理
local upgrade_state = {
    is_upgrading = false, -- 是否正在升级
    total_size = 0,       -- 总文件大小(字节)
    received_size = 0,    -- 已接收大小(字节)
    upgrade_packet = 0    -- 升级包计数器
}

-- 配置参数
local config = {
    service_uuid = "F000",   -- FOTA服务UUID(短格式)
    char_uuid_cmd = "F001",  -- 命令特征值UUID
    char_uuid_data = "F002", -- 数据特征值UUID
    max_packet_size = 200    -- BLE数据包最大长度(字节)
}
local function ble_reboot()
    -- 完成FOTA流程并重启
    fota.finish(true)
    log.info("FOTA_CMD", "正在重启设备...")
    rtos.reboot()
end
-- 处理FOTA命令
-- @param cmd_data 命令数据,格式:[命令码(1字节)] 或 [命令码(1字节) + 文件大小(4字节)]
local function handle_command(cmd_data)
    log.info("FOTA_CMD", "收到命令数据:", cmd_data:toHex(), "长度:", #cmd_data)

    -- 检查命令数据是否有效
    if #cmd_data < 1 then
        log.error("FOTA_CMD", "命令数据为空")
        return
    end

    -- 解析命令码(第一个字节)
    local cmd = cmd_data:byte(1)
    log.info("FOTA_CMD", "解析命令码:", cmd, string.format("(0x%02X)", cmd))

    -- 命令0x01:开始升级
    if cmd == 0x01 then
        log.info("FOTA_CMD", "处理开始升级命令")

        -- 检查命令格式:需要至少5字节(1字节命令码 + 4字节文件大小)
        if #cmd_data >= 5 then
            -- 解析文件大小(小端序,从第2字节开始)
            local total_size = string.unpack("<I4", cmd_data, 2)
            log.info("FOTA_CMD", "文件总大小:", total_size, "字节")

            -- 初始化FOTA子系统
            log.info("FOTA_CMD", "初始化FOTA子系统...")
            if fota.init() then
                log.info("FOTA_CMD", "FOTA初始化成功")

                -- 等待FOTA底层准备就绪
                log.info("FOTA_CMD", "等待FOTA底层准备...")
                -- 等待FOTA底层准备就绪,最多等待10秒
                local wait_count = 0
                local wait_ok = false
                while wait_count < 100 do -- 最多轮询100次,每次100ms,共10秒
                    if fota.wait() then
                        wait_ok = true
                        break
                    end
                    sys.wait(100)
                    wait_count = wait_count + 1
                end

                if wait_ok then
                    log.info("FOTA_CMD", "FOTA底层准备就绪")

                    -- 更新升级状态
                    upgrade_state.is_upgrading = true
                    upgrade_state.total_size = total_size
                    upgrade_state.received_size = 0
                    upgrade_state.upgrade_packet = 0

                    log.info("FOTA_CMD", "升级状态已设置",
                        "总大小:", upgrade_state.total_size)
                    log.info("FOTA_CMD", "准备接收固件数据...")
                else
                    log.error("FOTA_CMD", "FOTA底层准备超时")
                    fota.finish(false)
                    upgrade_state.is_upgrading = false
                end
            else
                log.error("FOTA_CMD", "FOTA初始化失败")
            end
        else
            log.error("FOTA_CMD", "开始命令格式错误,长度不足")
        end

        -- 命令0x02:结束升级(通知升级包发完)
    elseif cmd == 0x02 then
        log.info("FOTA_CMD", "处理结束升级命令")

        -- 检查是否处于升级状态
        if not upgrade_state.is_upgrading then
            log.warn("FOTA_CMD", "未处于升级状态,忽略结束命令")
            return
        end

        -- 验证文件完整性
        log.info("FOTA_CMD", "验证文件完整性...")
        log.info("FOTA_CMD", "已接收:", upgrade_state.received_size, "字节")
        log.info("FOTA_CMD", "应接收:", upgrade_state.total_size, "字节")

        if upgrade_state.received_size == upgrade_state.total_size then
            log.info("FOTA_CMD", "文件完整性验证通过")
            log.info("FOTA_CMD", "升级数据已全部接收,等待升级完成...")

            -- 等待底层校验结束
            local success = false
            for i = 1, 30 do -- 最多等待3秒
                sys.wait(100)
                local succ, fotaDone = fota.isDone()
                if not succ then
                    log.error("FOTA_CMD", "校验过程出错")
                    fota.finish(false)
                    upgrade_state.is_upgrading = false
                    break
                end
                if fotaDone then
                    log.info("FOTA_CMD", "FOTA升级成功!")

                    -- 延迟重启,给用户一些反应时间
                    log.info("FOTA_CMD", "2秒后设备将自动重启...,重启后通过日志判断最终是否升级成功")

                    -- 延迟2秒后重启设备
                    sys.timerStart(ble_reboot, 2000)
                    success = true
                    break
                end
            end

            if not success then
                log.error("FOTA_CMD", "校验超时")
                fota.finish(false)
                upgrade_state.is_upgrading = false
            end
        else
            log.error("FOTA_CMD", "文件不完整,升级失败")
            -- 清理升级状态
            upgrade_state.is_upgrading = false
            fota.finish(false)
        end

        log.info("FOTA_CMD", "结束升级命令处理完成")
    else
        log.warn("FOTA_CMD", "未知命令码:", cmd, string.format("(0x%02X)", cmd))
    end
end

-- 处理FOTA数据
-- @param data 固件数据块
local function handle_data(data)
    log.info("FOTA_DATA", "收到数据包,长度:", #data, "字节")

    -- 检查是否处于升级状态
    if not upgrade_state.is_upgrading then
        log.warn("FOTA_DATA", "未处于升级状态,忽略数据")
        return
    end

    -- 直接使用fota.run()处理分段数据,不写入文件
    log.info("FOTA_DATA", "处理分段数据,包序号:", upgrade_state.upgrade_packet)
    local result, isDone = fota.run(data)
    log.info("FOTA_DATA", "分段写入结果:", "result:", result, "isDone:", isDone)

    if result then
        -- 更新接收状态
        upgrade_state.received_size = upgrade_state.received_size + #data
        upgrade_state.upgrade_packet = upgrade_state.upgrade_packet + 1

        -- 计算并显示进度
        local progress = math.floor((upgrade_state.received_size / upgrade_state.total_size) * 100)

        -- 每50个数据包或完成时打印进度
        if upgrade_state.received_size % (config.max_packet_size * 50) == 0 or
            upgrade_state.received_size >= upgrade_state.total_size then
            log.info("FOTA_DATA", "升级进度:", progress, "%",
                "(", upgrade_state.received_size, "/", upgrade_state.total_size, ")")
        end

        log.info("FOTA_DATA", "数据写入成功,当前总计:", upgrade_state.received_size, "字节")

        -- 如果所有数据都已接收,检查升级是否完成
        if upgrade_state.received_size >= upgrade_state.total_size then
            log.info("FOTA_DATA", "所有数据已接收,等待升级完成...")
        end
    else
        log.error("FOTA_DATA", "分段写入失败")

        -- 分段写入失败,终止升级
        upgrade_state.is_upgrading = false
        fota.finish(false)
    end
end

-- 处理接收到的BLE写入请求数据
-- @param service_uuid 服务UUID
-- @param char_uuid 特征值UUID
-- @param data 写入的数据
function ble_packet_fota.proc(service_uuid, char_uuid, data)
    log.info("ble_packet_fota", "处理写入数据", service_uuid, char_uuid, data:toHex())

    -- 简化的UUID匹配逻辑:检查UUID是否包含我们的短UUID
    local is_service_match = string.find(service_uuid:lower(), config.service_uuid:lower())
    local is_cmd_match = string.find(char_uuid:lower(), config.char_uuid_cmd:lower())
    local is_data_match = string.find(char_uuid:lower(), config.char_uuid_data:lower())

    log.info("ble_packet_fota", "UUID匹配结果:",
        "服务匹配:", is_service_match,
        "命令匹配:", is_cmd_match,
        "数据匹配:", is_data_match)

    if is_service_match then
        if is_cmd_match then
            -- 命令特征值:处理FOTA命令
            log.info("ble_packet_fota", "命令特征值匹配,处理命令")
            handle_command(data)
        elseif is_data_match then
            -- 数据特征值:处理FOTA数据
            log.info("ble_packet_fota", "数据特征值匹配,处理数据")
            handle_data(data)
        else
            log.warn("ble_packet_fota", "未知的特征值UUID:", char_uuid)
        end
    else
        log.warn("ble_packet_fota", "未知的服务UUID:", service_uuid)
    end
end

-- 处理BLE连接断开事件
-- @return nil
function ble_packet_fota.proc_disconnect()
    log.info("ble_packet_fota", "处理连接断开事件")

    -- 如果正在升级,连接断开则终止升级
    if upgrade_state.is_upgrading then
        log.error("ble_packet_fota", "升级过程中连接断开,终止升级")
        upgrade_state.is_upgrading = false

        -- 结束FOTA流程
        fota.finish(false)
    end
end

return ble_packet_fota

7.1.2 BLE 服务主功能模块(ble_main.lua)

--[[
-- BLE服务主功能模块
-- 提供BLE服务的初始化、配置和事件处理
-- 不包含FOTA业务逻辑,仅处理BLE相关功能

依赖模块:
- ble_file_fota: 用于处理FOTA相关业务逻辑(文件写入方式)
- ble_packet_fota: 用于处理FOTA相关业务逻辑(分段写入方式)
]]

-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "packet" -- 默认使用file方式

-- 根据选择加载对应的FOTA模块
local ble_fota_main
if fota_mode == "file" then
    ble_fota_main = require "ble_file_fota"
else
    ble_fota_main = require "ble_packet_fota"
end

-- ble_main的任务名
local TASK_NAME = "BLE_MAIN"

-- 配置参数
config = {
    device_name = "Air8000_FOTA", -- 设备广播名称
    service_uuid = "F000",        -- FOTA服务UUID(短格式)
    char_uuid_cmd = "F001",       -- 命令特征值UUID
    char_uuid_data = "F002",      -- 数据特征值UUID
    max_packet_size = 20          -- BLE数据包最大长度(字节)
}

local bluetooth_device = nil
local ble_device = nil
local adv_create = nil
local gatt_create = nil

-- GATT服务数据库定义
-- 这里定义了BLE设备提供的服务和特征值
local att_db = {
    string.fromHex(config.service_uuid), -- Service UUID
    {
        string.fromHex(config.char_uuid_cmd),
        ble.WRITE | ble.WRITE_CMD
    },
    {
        string.fromHex(config.char_uuid_data),
        ble.WRITE | ble.WRITE_CMD
    }
}

-- BLE事件回调函数
local function ble_event_cb(ble_dev, event, param)
    log.info("BLE_EVENT", "收到BLE事件:", event)

    -- 根据LuatOS BLE事件枚举处理不同事件
    if event == ble.EVENT_CONN then
        -- 连接成功事件
        log.info("BLE_EVENT", "设备已连接", "地址:", param.addr and param.addr:toHex() or "未知")
        sys.sendMsg(TASK_NAME, "BLE_EVENT", "CONNECT", param)
    elseif event == ble.EVENT_DISCONN then
        -- 连接断开事件
        log.info("BLE_EVENT", "设备已断开连接", "原因:", param.reason or "未知")
        sys.sendMsg(TASK_NAME, "BLE_EVENT", "DISCONNECTED", param.reason)
    elseif event == ble.EVENT_WRITE then
        -- 写入事件 - 这是关键事件!
        log.info("BLE_EVENT", "处理写入事件")

        -- 检查参数是否完整
        if not param or not param.uuid_service or not param.uuid_characteristic or not param.data then
            log.error("BLE_EVENT", "写入事件参数不完整")
            return
        end

        -- 获取服务UUID和特征值UUID
        local service_uuid = param.uuid_service:toHex()
        local char_uuid = param.uuid_characteristic:toHex()
        local data = param.data

        log.info("BLE_EVENT", "服务UUID:", service_uuid)
        log.info("BLE_EVENT", "特征值UUID:", char_uuid)
        log.info("BLE_EVENT", "数据长度:", #data, "字节")
        sys.sendMsg(TASK_NAME, "BLE_EVENT", "WRITE_REQ", param)
    elseif event == ble.EVENT_READ then
        -- 读取事件 - 外围设备收到主设备读请求
        log.info("BLE_EVENT", "处理读取事件")
    elseif event == ble.EVENT_READ_VALUE then
        -- 读取操作完成事件 - 中心设备读取特征值完成
        log.info("BLE_EVENT", "读取操作完成", "数据:", param.data and param.data:toHex() or "无数据")
    elseif event == ble.EVENT_SCAN_REPORT then
        -- 扫描报告事件 - 中心设备扫描到其他BLE设备
        log.info("BLE_EVENT", "扫描报告", "RSSI:", param.rssi, "地址:", param.adv_addr and param.adv_addr:toHex() or "未知")
    elseif event == ble.EVENT_SCAN_STOP then
        -- 扫描停止事件
        log.info("BLE_EVENT", "扫描停止")
    else
        -- 其他事件
        log.info("BLE_EVENT", "其他事件类型:", event)
        if param then
            -- 尝试打印参数的基本信息,避免直接打印table导致错误
            if type(param) == "table" then
                log.info("BLE_EVENT", "事件参数为table,包含字段:", #param)
                for k, v in pairs(param) do
                    if type(v) == "string" then
                        log.info("BLE_EVENT", "参数字段:", k, "值:", v:toHex())
                    else
                        log.info("BLE_EVENT", "参数字段:", k, "类型:", type(v))
                    end
                end
            else
                log.info("BLE_EVENT", "事件参数类型:", type(param))
            end
        end
    end
end

-- 初始化BLE功能
local function ble_init()
    log.info("BLE_INIT", "开始初始化BLE...")

    -- 初始化蓝牙核心
    bluetooth_device = bluetooth.init()
    if not bluetooth_device then
        log.error("BLE_INIT", "蓝牙核心初始化失败")
        return false
    end
    log.info("BLE_INIT", "蓝牙核心初始化成功")

    -- 初始化BLE功能
    ble_device = bluetooth_device:ble(ble_event_cb)
    if not ble_device then
        log.error("BLE_INIT", "BLE功能初始化失败")
        return false
    end
    log.info("BLE_INIT", "BLE功能初始化成功")

    -- 创建GATT服务
    gatt_create = ble_device:gatt_create(att_db)
    if not gatt_create then
        log.error("BLE_INIT", "GATT服务创建失败")
        return false
    end
    log.info("BLE_INIT", "GATT服务创建成功")

    -- 配置广播数据
    log.info("BLE_INIT", "配置广播数据...")
    adv_create = ble_device:adv_create({
        addr_mode = ble.PUBLIC,
        channel_map = ble.CHNLS_ALL,
        intv_min = 120,
        intv_max = 120,
        adv_data = {
            { ble.FLAGS,               string.char(0x06) },  -- BLE标志
            { ble.COMPLETE_LOCAL_NAME, config.device_name }, -- 设备名称
        }
    })

    if not adv_create then
        log.error("BLE_INIT", "广播配置失败")
        return false
    end
    log.info("BLE_INIT", "广播配置成功")

    -- 开始广播
    ble_device:adv_start()
    log.info("BLE_INIT", "BLE广播已启动,设备名称:", config.device_name)

    return true
end

-- 主任务处理函数
local function ble_main_task_func()
    local result, msg

    while true do
        result = ble_init()
        if not result then
            log.error("ble_main_task_func", "ble_init error")
            goto EXCEPTION_PROC
        end

        while true do
            msg = sys.waitMsg(TASK_NAME, "BLE_EVENT")

            if not msg then
                log.error("ble_main_task_func", "waitMsg timeout")
                goto EXCEPTION_PROC
            end

            if msg[2] == "CONNECT" then
                local conn_param = msg[3]
                log.info("BLE", "设备连接成功: " .. conn_param.addr:toHex())
            elseif msg[2] == "DISCONNECTED" then
                log.info("BLE", "设备断开连接,原因: " .. msg[3])
                -- 通知FOTA模块连接断开
                ble_fota_main.proc_disconnect()
                break
            -- 收到中心设备的写请求,将写的数据发给ble_fota_main模块处理
            elseif msg[2] == "WRITE_REQ" then
                local ble_param = msg[3]
                local service_uuid = ble_param.uuid_service:toHex()
                local char_uuid = ble_param.uuid_characteristic:toHex()
                local data = ble_param.data

                log.info("BLE", "收到写请求: " .. service_uuid .. " " .. char_uuid .. " " .. data:toHex())
                ble_fota_main.proc(service_uuid, char_uuid, data)
            end
        end

        ::EXCEPTION_PROC::
        log.error("ble_main_task_func", "异常退出, 5秒后重新开启广播")

        -- 停止广播
        if ble_device then
            ble_device:adv_stop()
        end

        -- 清空此task绑定的消息队列中的未处理的消息
        sys.cleanMsg(TASK_NAME)

        -- 5秒后跳转到循环体开始位置,自动重试
        sys.wait(5000)
    end
end

-- 启动主任务
sys.taskInitEx(ble_main_task_func, TASK_NAME)

7.1.3 Python 升级工具(ble_fota_tool.py)

class SimpleFotaTool:
    async def run(self):
        # 1. 加载固件文件
        await self.load_firmware()

        # 2. 扫描目标设备
        device = await self.scan_device()

        # 3. 连接设备
        await self.connect_device(device)

        # 4. 发送开始命令
        await self.send_start_command()

        # 5. 发送固件数据
        await self.send_firmware_data()

        # 6. 结束升级
        await self.end_upgrade()

7.2 升级包制作

升级包制作是 FOTA 功能的关键步骤,确保按照升级包制作流程操作。

7.3 功能验证

7.3.1 文件系统方式

代码中修改 fota_mode 参数为"file"方式:

-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "file" -- 默认使用file方式

模组 luatools 日志如下:

日志解读:

  1. 设备启动,蓝牙服务初始化成功
  2. 收到开始升级命令,初始化 FOTA 分区和缓冲区
  3. 开始分段接收升级数据保存到文件中
  4. 所有数据接收完成,验证固件完整性
  5. 运行升级流程,升级完成,设备重启
  6. 重启后新版本正常运行,确认升级成功

上位机工具日志:

进入到 ble_fota 目录下运行 cmd 命令进入命令行程序,然后等待设备运行并启动蓝牙广播后运行下面命令:

python ble_fota_tool.py -f ble_fota.bin

7.3.2 使用分包方式升级:

代码中修改 fota_mode 参数为"packet"方式:

-- 选择FOTA升级方式:"file" 或 "packet"
-- 1. file方式:将升级包数据先写入本地文件,然后调用fota.file()进行升级
-- 2. packet方式:直接使用fota.packet()处理分段数据,不写入文件,适合差分升级
local fota_mode = "packet" -- 默认使用file方式

模组 luatools 日志如下:

日志解读:

  1. 设备启动,蓝牙服务初始化成功
  2. 收到开始升级命令,初始化 FOTA 分区和缓冲区
  3. 开始分段接收升级数据并通过 fota.run 接口写入 fota 分区
  4. 所有数据接收完成,验证固件完整性
  5. 运行升级流程,升级完成,设备重启
  6. 重启后新版本正常运行,确认升级成功

上位机工具日志:

进入到 ble_fota 目录下运行 cmd 命令进入命令行程序,然后等待设备运行并启动蓝牙广播后运行下面命令:

python ble_fota_tool.py -f ble_fota.bin

八、常见问题与解决方案

8.1 蓝牙连接失败

  • 问题:Python 脚本无法找到或连接设备
  • 解决方案:

  • 确认设备已上电并正常启动

  • 检查设备名称是否为"Air8000_FOTA"
  • 确认电脑蓝牙功能已开启

8.2 数据传输中断

  • 问题:升级过程中数据传输中断
  • 解决方案:

  • 确保设备与电脑之间没有障碍物

  • 避免其他蓝牙设备的干扰
  • 检查设备电量是否充足
  • 重新运行升级工具

8.3 升级后版本未更新

  • 问题:升级完成后版本号未变化
  • 解决方案:

  • 检查升级包制作是否正确

  • 确认 VERSION 变量已修改
  • 查看设备日志确认升级是否成功
  • 尝试完全断电后重新上电

8.4 Python 环境问题

  • 问题:Python 脚本无法运行
  • 解决方案:

  • 确认已安装 Python 3.x

  • 安装必要的库:pip install bleak
  • 检查脚本文件路径是否正确
  • 确保有足够的权限运行脚本

九、总结

至此,本教程详细介绍了如何使用蓝牙 FOTA 功能实现 Air8000 设备的无线固件升级。蓝牙 FOTA 提供了一种便捷、低功耗的固件更新方案,特别适合需要远程维护的物联网设备。