-
Notifications
You must be signed in to change notification settings - Fork 1
Utils_OrderedQueue
SweerItTer edited this page Feb 1, 2026
·
3 revisions
OrderedQueue 是 utilsCore Utils 模块的核心类,提供高性能无锁环形缓冲有序队列,支持多生产者并发入队和按 frame_id 顺序出队。
- 保持队列元素的 frame_id 顺序
- 无锁并发入队
- 环形缓冲 + CAS 操作
- 支持多种溢出策略
- 提供统计信息
- 乱序帧数据重组
- 需要保证帧序的流式处理
- 多生产者单消费者
- 网络乱序数据重组
- 依赖: 标准库
- 被依赖: VisionPipeline, RecordPipeline 等模块
OrderedQueue 是无锁环形缓冲有序队列的封装类,提供:
- 多生产者并发入队(lock-free CAS)
- 按 frame_id 顺序出队
- 环形缓冲避免 map 分配开销
- 可选丢弃策略
- 统计信息
- 无锁算法: 使用 CAS 操作
- 环形缓冲: 环形索引计算
- 生产者-消费者: 支持多生产者单消费者
容量超限或 slot 冲突处理策略。
enum class OverflowPolicy {
DISCARD_OLDEST, // 丢弃环形缓冲中旧的帧
DISCARD_NEWEST, // 丢弃当前入队帧
BLOCK, // 阻塞等待空槽
THROW_EXCEPTION // 抛异常
};explicit OrderedQueue(size_t capacity);参数说明:
-
capacity(输入): 环形缓冲大小(最好大于最大乱序跨度)
返回值: 无
所有权归属:
- OrderedQueue 拥有队列资源的所有权
注意事项:
- 容量会自动调整为 2 的幂
- 建议容量大于最大乱序跨度
使用例程:
// 创建容量为 100 的队列(实际会调整为 128)
OrderedQueue<FramePtr> queue(100);bool enqueue(uint64_t frame_id, T&& data, OverflowPolicy policy = OverflowPolicy::DISCARD_NEWEST);参数说明:
-
frame_id(输入): 当前帧 id -
data(输入): 帧数据 T(由外部管理) -
policy(输入): slot 冲突或容量超限时的处理策略
返回值:
-
true: 成功入队 -
false: 被丢弃
所有权归属:
- data 的所有权转移到队列
注意事项:
- 支持多生产者并发入队
- 使用 CAS 操作避免锁
- frame_id 小于 expected_id 时视为过期数据,返回 false
- slot 冲突时根据 policy 处理
使用例程:
OrderedQueue<FramePtr> queue(100);
// 入队
FramePtr frame = std::make_shared<Frame>();
bool success = queue.enqueue(frame->meta.frame_id, std::move(frame));
// 带溢出策略
bool success = queue.enqueue(frame_id, std::move(frame),
OrderedQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);bool try_dequeue(T& data_out, int64_t timeout_ms = 0);参数说明:
-
data_out(输出): 输出参数,成功时为有效 T -
timeout_ms(输入): 超时毫秒,0 表示非阻塞立即返回
返回值:
-
true: 成功出队 -
false: 超时或队列为空
所有权归属:
- data_out 的所有权转移给调用者
注意事项:
- 按 expected_id 顺序取出数据
- timeout_ms = 0 表示非阻塞立即返回
- 单消费者或多消费者顺序出队
使用例程:
OrderedQueue<FramePtr> queue(100);
// 非阻塞出队
FramePtr frame;
if (queue.try_dequeue(frame)) {
process_frame(frame);
} else {
printf("No frame available\n");
}
// 超时出队(1 秒)
FramePtr frame;
if (queue.try_dequeue(frame, 1000)) {
process_frame(frame);
} else {
printf("Timeout\n");
}size_t size() const;参数说明: 无
返回值: 近似队列大小
所有权归属:
- 只读访问
注意事项:
- 近似值,不是精确值
- 计算方式:total_enqueued - total_dequeued
bool empty() const;参数说明: 无
返回值:
-
true: 队列为空 -
false: 队列不为空
所有权归属:
- 只读访问
uint64_t get_expected_id() const;参数说明: 无
返回值: 消费者期望的下一个 frame_id
所有权归属:
- 只读访问
使用例程:
uint64_t expected = queue.get_expected_id();
printf("Expected frame_id: %lu\n", expected);struct Stats {
uint64_t total_enqueued; // 总入队数
uint64_t total_dequeued; // 总出队数
uint64_t timeout_skip; // 超时跳过数
uint64_t slot_conflict; // slot 冲突数
uint64_t pending; // pending 数量
double timeout_rate; // 超时率
double conflict_rate; // 冲突率
};Stats get_stats() const;参数说明: 无
返回值: 统计信息
所有权归属:
- 只读访问
使用例程:
auto stats = queue.get_stats();
printf("Enqueued: %lu, Dequeued: %lu, Pending: %lu\n",
stats.total_enqueued, stats.total_dequeued, stats.pending);void print_stats() const;参数说明: 无
返回值: 无
使用例程:
queue.print_stats();
// 输出:
// ===== OrderedQueue Statistics =====
// Enqueued: 1000
// Dequeued: 950
// Pending: 50
// Timeout skip: 10 (1.05%)
// Slot conflict: 20 (2.00%)
// ====================================void reset_stats();参数说明: 无
返回值: 无
使用例程:
queue.reset_stats();struct BufferSlot {
std::atomic<bool>* filled = nullptr; // true 表示 slot 已被占用
uint64_t frame_id{0}; // 当前帧 id
T data_; // 外部管理的帧数据
};size_t idx = frame_id & (capacity_ - 1); // 环形索引计算bool expected = false;
while (!ring_buffer_[idx].filled->compare_exchange_weak(
expected, true, std::memory_order_acq_rel))
{
// slot 已被占用,根据 policy 处理
}- 入队操作: 使用 CAS 操作(lock-free)
- 出队操作: 单消费者或多消费者顺序出队
- 统计信息: 使用原子操作
- 可以并发调用 enqueue()
- try_dequeue() 需要外部同步(如果多消费者)
- 所有统计操作都是线程安全的
OrderedQueue<FramePtr> queue(100);
// 生产者
for (uint64_t i = 0; i < 100; ++i) {
FramePtr frame = std::make_shared<Frame>();
frame->meta.frame_id = i;
queue.enqueue(i, std::move(frame));
}
// 消费者
FramePtr frame;
while (queue.try_dequeue(frame)) {
process_frame(frame);
}OrderedQueue<FramePtr> queue(100);
// 乱序入队
queue.enqueue(100, frame100);
queue.enqueue(98, frame98);
queue.enqueue(99, frame99);
// 按顺序出队(98, 99, 100)
FramePtr frame;
while (queue.try_dequeue(frame, 1000)) {
printf("Frame %lu\n", frame->meta.frame_id);
}OrderedQueue<FramePtr> queue(100);
// 丢弃最旧的
queue.enqueue(frame_id, std::move(frame),
OrderedQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);
// 丢弃最新的
queue.enqueue(frame_id, std::move(frame),
OrderedQueue<FramePtr>::OverflowPolicy::DISCARD_NEWEST);
// 阻塞等待
queue.enqueue(frame_id, std::move(frame),
OrderedQueue<FramePtr>::OverflowPolicy::BLOCK);
// 抛异常
try {
queue.enqueue(frame_id, std::move(frame),
OrderedQueue<FramePtr>::OverflowPolicy::THROW_EXCEPTION);
} catch (const std::runtime_error& e) {
printf("Error: %s\n", e.what());
}OrderedQueue<FramePtr> queue(100);
// 运行一段时间...
// 打印统计信息
queue.print_stats();
// 获取详细统计
auto stats = queue.get_stats();
printf("Conflict rate: %.2f%%\n", stats.conflict_rate * 100);
printf("Timeout rate: %.2f%%\n", stats.timeout_rate * 100);OrderedQueue<FramePtr> queue(100);
// 多个生产者
std::vector<std::thread> producers;
for (int i = 0; i < 4; ++i) {
producers.emplace_back([i, &queue]() {
for (uint64_t j = 0; j < 100; ++j) {
FramePtr frame = std::make_shared<Frame>();
frame->meta.frame_id = j;
queue.enqueue(j, std::move(frame));
}
});
}
// 消费者
FramePtr frame;
while (queue.try_dequeue(frame, 1000)) {
process_frame(frame);
}
for (auto& t : producers) t.join();- 容量建议: 容量建议为 2 的幂,方便快速索引计算
- 乱序度: 高乱序入队可能导致 slot 冲突
- 溢出策略: 根据场景选择合适的溢出策略
- frame_id: frame_id 必须单调递增
- 过期数据: frame_id 小于 expected_id 时视为过期数据
- 无锁入队: enqueue() 使用 CAS 操作,支持多生产者
- 顺序出队: try_dequeue() 按 expected_id 顺序出队
- 统计信息: 统计信息使用原子操作,线程安全
- SafeQueue - 安全队列
- ConcurrentQueue - 并发队列
- Utils 模块总览
主页
API 文档
DMA 模块
DRM 模块
- DRM 模块总览
- DeviceController - DRM 设备控制器
- DrmLayer - DRM 图层管理
- PlanesCompositor - DRM 平面合成器
- DrmBpp - DRM 格式定义
NET 模块
- NET 模块总览
- TcpServer - TCP 服务器
- SocketConnection - Socket 连接管理
- CommandHandler - 命令处理器
- DataPacket - 数据包
V4L2 模块
- V4L2 模块总览
- CameraController - V4L2 摄像头控制器
- Frame - V4L2 帧数据结构
- FormatTool - V4L2 格式工具
- Exception - V4L2 异常类
V4L2Param 模块
- V4L2Param 模块总览
- ParamControl - 参数控制
- ParamLogger - 参数日志
- ParamProcessor - 参数处理器
RGA 模块
- RGA 模块总览
- RgaConverter - RGA 转换器
- RgaProcessor - RGA 处理器
- FormatTool - RGA 格式工具
MPP 模块
- MPP 模块总览
- EncoderContext - 编码器上下文
- EncoderCore - 编码器核心
- JpegEncoder - JPEG 编码器
- StreamWriter - 流写入器
- MppResourceGuard - MPP 资源守护
- FileTools - 文件工具
- FormatTool - 格式工具
Sys 模块
- Sys 模块总览
- CpuMonitor - CPU 监控器
- MemoryMonitor - 内存监控器
- Base - 基础类
Mouse 模块
- Mouse 模块总览
- Watcher - 鼠标监视器
Utils 模块
- Utils 模块总览
- AsyncThreadPool - 异步线程池
- ConcurrentQueue - 并发队列
- FdWrapper - 文件描述符包装器
- FenceWatcher - 围栏监视器
- FixedSizePool - 固定大小对象池
- Logger - 日志记录器
- ObjectsPool - 对象池
- OrderedQueue - 有序队列
- ProgressBar - 进度条
- SafeQueue - 安全队列
- SharedBufferState - 共享缓冲区状态
- SimpleVariant - 简单变体类型
- ThreadPauser - 线程暂停器
- ThreadUtils - 线程工具
- Types - 类型定义
- UdevMonitor - Udev 监视器