Skip to content

MPP_StreamWriter

SweerItTer edited this page Feb 1, 2026 · 3 revisions

StreamWriter API 文档

概述

StreamWriter 是 utilsCore MPP 模块的核心类,提供双线程分段写入功能,支持将编码器生成的 EncodedMeta 推入队列并自动分段写入文件。

职责

  • 分段写入编码数据
  • 双线程写入(writerA / writerB)
  • 自动文件分段
  • 调度线程分发

适用场景

  • 视频录制
  • 流媒体输出
  • 大文件分段保存

依赖关系

  • 依赖: MppEncoderCore
  • 被依赖: RecordPipeline 等模块

类分析

StreamWriter 类

职责与用途

StreamWriter 是流写入器的封装类,提供:

  • 双线程分段写入
  • 自动文件命名(如 output_001.h264)
  • 调度线程负责切片决策与分发
  • 写线程负责写入和释放 slot

设计模式

  • 生产者-消费者模式: 编码器生产 meta,StreamWriter 消费
  • RAII: FILEGuard 自动管理 FILE* 生命周期

常量定义

STREAMWRITER_DEFAULT_I_PACKETS_PER_SEGMENT

static constexpr size_t STREAMWRITER_DEFAULT_I_PACKETS_PER_SEGMENT = 60;

说明: 默认每片 I 帧包数量


STREAMWRITER_WRITER_COUNT

static constexpr size_t STREAMWRITER_WRITER_COUNT = 2;

说明: 写线程数量(固定为 2)


私有类定义

FILEGuard - 文件句柄保护类

class FILEGuard {
public:
    explicit FILEGuard(FILE *f);
    ~FILEGuard();
    FILE *get() const { return fp_; }
    void reset(FILE *f);
private:
    FILE *fp_ = nullptr;
    int fd = 0;
};

职责: RAII 管理文件句柄

方法:

  • get(): 获取文件指针
  • reset(): 重置文件指针

WriterCtx - 写线程上下文

struct WriterCtx {
    std::queue<MppEncoderCore::EncodedMeta> queue; ///< 待写 meta 队列
    std::mutex mtx;                                ///< 队列保护锁
    std::condition_variable cv;                    ///< 唤醒条件
    std::unique_ptr<FILEGuard> fp;                 ///< 当前写入段的文件指针
    int fd = -1;                                   ///< 当前流对应的文件描述符
};

字段说明:

  • queue: 待写 meta 队列
  • mtx: 队列保护锁
  • cv: 唤醒条件
  • fp: 当前写入段的文件指针
  • fd: 当前流对应的文件描述符

公共 API 方法

构造函数

explicit StreamWriter(const std::string &path);

参数说明:

  • path (输入): 输出基础路径,例如 "output.h264"

返回值: 无

所有权归属:

  • StreamWriter 拥有内部资源的所有权

注意事项:

  1. 构造后会启动调度线程与写线程
  2. 文件会自动分段命名(如 output_001.h264)
  3. 禁止复制

使用例程:

StreamWriter writer("output.h264");
// 会生成 output_001.h264, output_002.h264, ...

析构函数

~StreamWriter();

参数说明: 无

返回值: 无

所有权归属:

  • 资源自动释放

注意事项:

  1. 会安全停止所有线程
  2. 关闭所有文件

pushMeta() - 推入编码结果 meta

bool pushMeta(const MppEncoderCore::EncodedMeta &meta);

参数说明:

  • meta (输入): 来自 MppEncoderCore 的 EncodedMeta

返回值:

  • true: 推入成功
  • false: meta 无效或写器已停止

所有权归属:

  • StreamWriter 持有 meta 的所有权

注意事项:

  1. 将 meta 推入到调度队列
  2. 调度线程按顺序分发 meta 给当前写线程
  3. 每达到固定 packet 数(默认 60)切换到另一个写线程

使用例程:

StreamWriter writer("output.h264");

// 从编码器获取 meta
MppEncoderCore::EncodedMeta meta;
// ... 编码 ...

// 推入写入器
if (writer.pushMeta(meta)) {
    printf("Push meta success\n");
} else {
    printf("Push meta failed\n");
}

stop() - 停止写线程与调度线程

void stop();

参数说明: 无

返回值: 无

所有权归属:

  • 无所有权转移

注意事项:

  1. 停止调度线程
  2. 停止两个写线程
  3. 关闭所有文件

使用例程:

StreamWriter writer("output.h264");

// ... 写入 ...

writer.stop();

私有 API 方法

initThreads() - 初始化线程

bool initThreads();

返回值:

  • true: 成功
  • false: 失败

说明: 初始化调度线程和写线程


dispatchLoop() - 调度线程主循环

void dispatchLoop();

说明: 调度线程负责切片决策与分发


writerLoop() - 写线程主循环

void writerLoop(WriterCtx *ctx);

参数说明:

  • ctx (输入): 写线程上下文

说明: 写线程负责获取 packet,写入并释放 slot


openNewSegmentFor() - 打开下一个分段文件

bool openNewSegmentFor(WriterCtx *ctx);

参数说明:

  • ctx (输入): 写线程上下文

返回值:

  • true: 成功
  • false: 失败

说明: 为指定 writer 打开下一个分段文件


obtainPacketForMeta() - 等待并获取 packet

bool obtainPacketForMeta(MppEncoderCore::EncodedPacketPtr &packet,
                         MppEncoderCore::EncodedMeta &meta);

参数说明:

  • packet (输出): 编码包指针
  • meta (输入): 编码元数据

返回值:

  • true: 成功
  • false: 失败

说明: 等待并获取 packet


编码流程

写入流程图

1. 编码器生成 EncodedMeta
   ↓
2. pushMeta() 推入调度队列
   ↓
3. 调度线程分发 meta 给当前写线程
   ↓
4. 写线程获取 packet
   ↓
5. 写入文件
   ↓
6. 释放 slot
   ↓
7. 检查是否达到分段数量
   ↓
8. 切换到另一个写线程,创建新文件段

数据流

MppEncoderCore → EncodedMeta → StreamWriter → 调度线程 → 写线程A/B → 文件

文件命名规则

命名规则

输入 "output.h264" → 产生 "output_001.h264", "output_002.h264", ...

示例

StreamWriter writer("output.h264");
// 生成: output_001.h264, output_002.h264, output_003.h264, ...

线程安全说明

同步机制

  • 调度队列: 使用 std::mutexstd::condition_variable
  • 写线程队列: 每个 writer 有独立的锁和条件变量
  • 原子变量: 使用 std::atomic 保护计数器

线程安全建议

  • pushMeta() 是线程安全的
  • stop() 应该在所有 pushMeta() 完成后调用

典型使用场景

场景 1: 基本使用

StreamWriter writer("output.h264");

// 编码并写入
while (running) {
    auto meta = encoder.encode(frame);
    if (meta) {
        writer.pushMeta(*meta);
    }
}

writer.stop();

场景 2: 视频录制

StreamWriter writer("video.h264");

camera.setFrameCallback([&](FramePtr frame) {
    auto meta = encoder.encode(frame);
    if (meta) {
        writer.pushMeta(*meta);
    }
});

// 按键停止录制
button.onPress([&]() {
    writer.stop();
});

场景 3: 定时录制

StreamWriter writer("video.h264");

// 录制 10 秒
std::thread([&]() {
    auto start = std::chrono::steady_clock::now();
    while (std::chrono::steady_clock::now() - start < std::chrono::seconds(10)) {
        auto meta = encoder.encode(get_frame());
        if (meta) {
            writer.pushMeta(*meta);
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(33));
    }
    writer.stop();
}).detach();

注意事项

  1. 文件路径: 确保有写入权限
  2. 磁盘空间: 确保有足够磁盘空间
  3. 分段数量: 默认每 60 个 I 帧包切一次
  4. 线程安全: pushMeta() 是线程安全的
  5. 停止写入: 程序退出前调用 stop()
  6. 文件命名: 文件名自动生成,格式为 basename_XXX.suffix
  7. 双线程: 使用双线程写入,提高性能
  8. 禁止复制: 禁止复制和赋值

相关文档


参考资料

主页

API 文档

DMA 模块

DRM 模块

NET 模块

V4L2 模块

V4L2Param 模块

RGA 模块

MPP 模块

Sys 模块

Mouse 模块

Utils 模块

Clone this wiki locally