-
Notifications
You must be signed in to change notification settings - Fork 1
MPP_StreamWriter
SweerItTer edited this page Feb 1, 2026
·
3 revisions
StreamWriter 是 utilsCore MPP 模块的核心类,提供双线程分段写入功能,支持将编码器生成的 EncodedMeta 推入队列并自动分段写入文件。
- 分段写入编码数据
- 双线程写入(writerA / writerB)
- 自动文件分段
- 调度线程分发
- 视频录制
- 流媒体输出
- 大文件分段保存
- 依赖: MppEncoderCore
- 被依赖: RecordPipeline 等模块
StreamWriter 是流写入器的封装类,提供:
- 双线程分段写入
- 自动文件命名(如 output_001.h264)
- 调度线程负责切片决策与分发
- 写线程负责写入和释放 slot
- 生产者-消费者模式: 编码器生产 meta,StreamWriter 消费
- RAII: FILEGuard 自动管理 FILE* 生命周期
static constexpr size_t STREAMWRITER_DEFAULT_I_PACKETS_PER_SEGMENT = 60;说明: 默认每片 I 帧包数量
static constexpr size_t STREAMWRITER_WRITER_COUNT = 2;说明: 写线程数量(固定为 2)
class FILEGuard {
public:
explicit FILEGuard(FILE *f);
~FILEGuard();
FILE *get() const { return fp_; }
void reset(FILE *f);
private:
FILE *fp_ = nullptr;
int fd = 0;
};职责: RAII 管理文件句柄
方法:
-
get(): 获取文件指针 -
reset(): 重置文件指针
struct WriterCtx {
std::queue<MppEncoderCore::EncodedMeta> queue; ///< 待写 meta 队列
std::mutex mtx; ///< 队列保护锁
std::condition_variable cv; ///< 唤醒条件
std::unique_ptr<FILEGuard> fp; ///< 当前写入段的文件指针
int fd = -1; ///< 当前流对应的文件描述符
};字段说明:
-
queue: 待写 meta 队列 -
mtx: 队列保护锁 -
cv: 唤醒条件 -
fp: 当前写入段的文件指针 -
fd: 当前流对应的文件描述符
explicit StreamWriter(const std::string &path);参数说明:
-
path(输入): 输出基础路径,例如 "output.h264"
返回值: 无
所有权归属:
- StreamWriter 拥有内部资源的所有权
注意事项:
- 构造后会启动调度线程与写线程
- 文件会自动分段命名(如 output_001.h264)
- 禁止复制
使用例程:
StreamWriter writer("output.h264");
// 会生成 output_001.h264, output_002.h264, ...~StreamWriter();参数说明: 无
返回值: 无
所有权归属:
- 资源自动释放
注意事项:
- 会安全停止所有线程
- 关闭所有文件
bool pushMeta(const MppEncoderCore::EncodedMeta &meta);参数说明:
-
meta(输入): 来自 MppEncoderCore 的 EncodedMeta
返回值:
-
true: 推入成功 -
false: meta 无效或写器已停止
所有权归属:
- StreamWriter 持有 meta 的所有权
注意事项:
- 将 meta 推入到调度队列
- 调度线程按顺序分发 meta 给当前写线程
- 每达到固定 packet 数(默认 60)切换到另一个写线程
使用例程:
StreamWriter writer("output.h264");
// 从编码器获取 meta
MppEncoderCore::EncodedMeta meta;
// ... 编码 ...
// 推入写入器
if (writer.pushMeta(meta)) {
printf("Push meta success\n");
} else {
printf("Push meta failed\n");
}void stop();参数说明: 无
返回值: 无
所有权归属:
- 无所有权转移
注意事项:
- 停止调度线程
- 停止两个写线程
- 关闭所有文件
使用例程:
StreamWriter writer("output.h264");
// ... 写入 ...
writer.stop();bool initThreads();返回值:
-
true: 成功 -
false: 失败
说明: 初始化调度线程和写线程
void dispatchLoop();说明: 调度线程负责切片决策与分发
void writerLoop(WriterCtx *ctx);参数说明:
-
ctx(输入): 写线程上下文
说明: 写线程负责获取 packet,写入并释放 slot
bool openNewSegmentFor(WriterCtx *ctx);参数说明:
-
ctx(输入): 写线程上下文
返回值:
-
true: 成功 -
false: 失败
说明: 为指定 writer 打开下一个分段文件
bool obtainPacketForMeta(MppEncoderCore::EncodedPacketPtr &packet,
MppEncoderCore::EncodedMeta &meta);参数说明:
-
packet(输出): 编码包指针 -
meta(输入): 编码元数据
返回值:
-
true: 成功 -
false: 失败
说明: 等待并获取 packet
1. 编码器生成 EncodedMeta
↓
2. pushMeta() 推入调度队列
↓
3. 调度线程分发 meta 给当前写线程
↓
4. 写线程获取 packet
↓
5. 写入文件
↓
6. 释放 slot
↓
7. 检查是否达到分段数量
↓
8. 切换到另一个写线程,创建新文件段
MppEncoderCore → EncodedMeta → StreamWriter → 调度线程 → 写线程A/B → 文件
输入 "output.h264" → 产生 "output_001.h264", "output_002.h264", ...
StreamWriter writer("output.h264");
// 生成: output_001.h264, output_002.h264, output_003.h264, ...-
调度队列: 使用
std::mutex和std::condition_variable - 写线程队列: 每个 writer 有独立的锁和条件变量
-
原子变量: 使用
std::atomic保护计数器
- pushMeta() 是线程安全的
- stop() 应该在所有 pushMeta() 完成后调用
StreamWriter writer("output.h264");
// 编码并写入
while (running) {
auto meta = encoder.encode(frame);
if (meta) {
writer.pushMeta(*meta);
}
}
writer.stop();StreamWriter writer("video.h264");
camera.setFrameCallback([&](FramePtr frame) {
auto meta = encoder.encode(frame);
if (meta) {
writer.pushMeta(*meta);
}
});
// 按键停止录制
button.onPress([&]() {
writer.stop();
});StreamWriter writer("video.h264");
// 录制 10 秒
std::thread([&]() {
auto start = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() - start < std::chrono::seconds(10)) {
auto meta = encoder.encode(get_frame());
if (meta) {
writer.pushMeta(*meta);
}
std::this_thread::sleep_for(std::chrono::milliseconds(33));
}
writer.stop();
}).detach();- 文件路径: 确保有写入权限
- 磁盘空间: 确保有足够磁盘空间
- 分段数量: 默认每 60 个 I 帧包切一次
- 线程安全: pushMeta() 是线程安全的
- 停止写入: 程序退出前调用 stop()
-
文件命名: 文件名自动生成,格式为
basename_XXX.suffix - 双线程: 使用双线程写入,提高性能
- 禁止复制: 禁止复制和赋值
- EncoderContext - 编码器上下文
- EncoderCore - 编码器核心
- MPP 模块总览
主页
API 文档
DMA 模块
DRM 模块
- DRM 模块总览
- DeviceController - DRM 设备控制器
- DrmLayer - DRM 图层管理
- PlanesCompositor - DRM 平面合成器
- DrmBpp - DRM 格式定义
NET 模块
- NET 模块总览
- TcpServer - TCP 服务器
- SocketConnection - Socket 连接管理
- CommandHandler - 命令处理器
- DataPacket - 数据包
V4L2 模块
- V4L2 模块总览
- CameraController - V4L2 摄像头控制器
- Frame - V4L2 帧数据结构
- FormatTool - V4L2 格式工具
- Exception - V4L2 异常类
V4L2Param 模块
- V4L2Param 模块总览
- ParamControl - 参数控制
- ParamLogger - 参数日志
- ParamProcessor - 参数处理器
RGA 模块
- RGA 模块总览
- RgaConverter - RGA 转换器
- RgaProcessor - RGA 处理器
- FormatTool - RGA 格式工具
MPP 模块
- MPP 模块总览
- EncoderContext - 编码器上下文
- EncoderCore - 编码器核心
- JpegEncoder - JPEG 编码器
- StreamWriter - 流写入器
- MppResourceGuard - MPP 资源守护
- FileTools - 文件工具
- FormatTool - 格式工具
Sys 模块
- Sys 模块总览
- CpuMonitor - CPU 监控器
- MemoryMonitor - 内存监控器
- Base - 基础类
Mouse 模块
- Mouse 模块总览
- Watcher - 鼠标监视器
Utils 模块
- Utils 模块总览
- AsyncThreadPool - 异步线程池
- ConcurrentQueue - 并发队列
- FdWrapper - 文件描述符包装器
- FenceWatcher - 围栏监视器
- FixedSizePool - 固定大小对象池
- Logger - 日志记录器
- ObjectsPool - 对象池
- OrderedQueue - 有序队列
- ProgressBar - 进度条
- SafeQueue - 安全队列
- SharedBufferState - 共享缓冲区状态
- SimpleVariant - 简单变体类型
- ThreadPauser - 线程暂停器
- ThreadUtils - 线程工具
- Types - 类型定义
- UdevMonitor - Udev 监视器