Skip to content

Utils_OrderedQueue

SweerItTer edited this page Feb 1, 2026 · 3 revisions

OrderedQueue API 文档

概述

OrderedQueue 是 utilsCore Utils 模块的核心类,提供高性能无锁环形缓冲有序队列,支持多生产者并发入队和按 frame_id 顺序出队。

职责

  • 保持队列元素的 frame_id 顺序
  • 无锁并发入队
  • 环形缓冲 + CAS 操作
  • 支持多种溢出策略
  • 提供统计信息

适用场景

  • 乱序帧数据重组
  • 需要保证帧序的流式处理
  • 多生产者单消费者
  • 网络乱序数据重组

依赖关系

  • 依赖: 标准库
  • 被依赖: VisionPipeline, RecordPipeline 等模块

类分析

OrderedQueue 类

职责与用途

OrderedQueue 是无锁环形缓冲有序队列的封装类,提供:

  • 多生产者并发入队(lock-free CAS)
  • 按 frame_id 顺序出队
  • 环形缓冲避免 map 分配开销
  • 可选丢弃策略
  • 统计信息

设计模式

  • 无锁算法: 使用 CAS 操作
  • 环形缓冲: 环形索引计算
  • 生产者-消费者: 支持多生产者单消费者

OverflowPolicy 枚举

容量超限或 slot 冲突处理策略。

enum class OverflowPolicy {
    DISCARD_OLDEST,  // 丢弃环形缓冲中旧的帧
    DISCARD_NEWEST,  // 丢弃当前入队帧
    BLOCK,           // 阻塞等待空槽
    THROW_EXCEPTION  // 抛异常
};

公共 API 方法

构造函数

explicit OrderedQueue(size_t capacity);

参数说明:

  • capacity (输入): 环形缓冲大小(最好大于最大乱序跨度)

返回值: 无

所有权归属:

  • OrderedQueue 拥有队列资源的所有权

注意事项:

  1. 容量会自动调整为 2 的幂
  2. 建议容量大于最大乱序跨度

使用例程:

// 创建容量为 100 的队列(实际会调整为 128)
OrderedQueue<FramePtr> queue(100);

enqueue() - 入队操作

bool enqueue(uint64_t frame_id, T&& data, OverflowPolicy policy = OverflowPolicy::DISCARD_NEWEST);

参数说明:

  • frame_id (输入): 当前帧 id
  • data (输入): 帧数据 T(由外部管理)
  • policy (输入): slot 冲突或容量超限时的处理策略

返回值:

  • true: 成功入队
  • false: 被丢弃

所有权归属:

  • data 的所有权转移到队列

注意事项:

  1. 支持多生产者并发入队
  2. 使用 CAS 操作避免锁
  3. frame_id 小于 expected_id 时视为过期数据,返回 false
  4. slot 冲突时根据 policy 处理

使用例程:

OrderedQueue<FramePtr> queue(100);

// 入队
FramePtr frame = std::make_shared<Frame>();
bool success = queue.enqueue(frame->meta.frame_id, std::move(frame));

// 带溢出策略
bool success = queue.enqueue(frame_id, std::move(frame), 
    OrderedQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);

try_dequeue() - 按顺序出队

bool try_dequeue(T& data_out, int64_t timeout_ms = 0);

参数说明:

  • data_out (输出): 输出参数,成功时为有效 T
  • timeout_ms (输入): 超时毫秒,0 表示非阻塞立即返回

返回值:

  • true: 成功出队
  • false: 超时或队列为空

所有权归属:

  • data_out 的所有权转移给调用者

注意事项:

  1. 按 expected_id 顺序取出数据
  2. timeout_ms = 0 表示非阻塞立即返回
  3. 单消费者或多消费者顺序出队

使用例程:

OrderedQueue<FramePtr> queue(100);

// 非阻塞出队
FramePtr frame;
if (queue.try_dequeue(frame)) {
    process_frame(frame);
} else {
    printf("No frame available\n");
}

// 超时出队(1 秒)
FramePtr frame;
if (queue.try_dequeue(frame, 1000)) {
    process_frame(frame);
} else {
    printf("Timeout\n");
}

size() - 近似队列大小

size_t size() const;

参数说明: 无

返回值: 近似队列大小

所有权归属:

  • 只读访问

注意事项:

  1. 近似值,不是精确值
  2. 计算方式:total_enqueued - total_dequeued

empty() - 检查队列是否为空

bool empty() const;

参数说明: 无

返回值:

  • true: 队列为空
  • false: 队列不为空

所有权归属:

  • 只读访问

get_expected_id() - 获取期望的 frame_id

uint64_t get_expected_id() const;

参数说明: 无

返回值: 消费者期望的下一个 frame_id

所有权归属:

  • 只读访问

使用例程:

uint64_t expected = queue.get_expected_id();
printf("Expected frame_id: %lu\n", expected);

统计接口

Stats 结构体

struct Stats {
    uint64_t total_enqueued;   // 总入队数
    uint64_t total_dequeued;   // 总出队数
    uint64_t timeout_skip;     // 超时跳过数
    uint64_t slot_conflict;    // slot 冲突数
    uint64_t pending;          // pending 数量
    double timeout_rate;       // 超时率
    double conflict_rate;      // 冲突率
};

get_stats() - 获取统计信息

Stats get_stats() const;

参数说明: 无

返回值: 统计信息

所有权归属:

  • 只读访问

使用例程:

auto stats = queue.get_stats();
printf("Enqueued: %lu, Dequeued: %lu, Pending: %lu\n", 
       stats.total_enqueued, stats.total_dequeued, stats.pending);

print_stats() - 打印统计信息

void print_stats() const;

参数说明: 无

返回值: 无

使用例程:

queue.print_stats();
// 输出:
// ===== OrderedQueue Statistics =====
// Enqueued:       1000
// Dequeued:       950
// Pending:        50
// Timeout skip:   10 (1.05%)
// Slot conflict:  20 (2.00%)
// ====================================

reset_stats() - 重置统计信息

void reset_stats();

参数说明: 无

返回值: 无

使用例程:

queue.reset_stats();

内部实现

环形缓冲槽

struct BufferSlot {
    std::atomic<bool>* filled = nullptr;  // true 表示 slot 已被占用
    uint64_t frame_id{0};                 // 当前帧 id
    T data_;                              // 外部管理的帧数据
};

环形索引计算

size_t idx = frame_id & (capacity_ - 1);  // 环形索引计算

CAS 操作

bool expected = false;
while (!ring_buffer_[idx].filled->compare_exchange_weak(
    expected, true, std::memory_order_acq_rel))
{
    // slot 已被占用,根据 policy 处理
}

线程安全说明

同步机制

  • 入队操作: 使用 CAS 操作(lock-free)
  • 出队操作: 单消费者或多消费者顺序出队
  • 统计信息: 使用原子操作

线程安全建议

  • 可以并发调用 enqueue()
  • try_dequeue() 需要外部同步(如果多消费者)
  • 所有统计操作都是线程安全的

典型使用场景

场景 1: 基本使用

OrderedQueue<FramePtr> queue(100);

// 生产者
for (uint64_t i = 0; i < 100; ++i) {
    FramePtr frame = std::make_shared<Frame>();
    frame->meta.frame_id = i;
    queue.enqueue(i, std::move(frame));
}

// 消费者
FramePtr frame;
while (queue.try_dequeue(frame)) {
    process_frame(frame);
}

场景 2: 乱序重组

OrderedQueue<FramePtr> queue(100);

// 乱序入队
queue.enqueue(100, frame100);
queue.enqueue(98, frame98);
queue.enqueue(99, frame99);

// 按顺序出队(98, 99, 100)
FramePtr frame;
while (queue.try_dequeue(frame, 1000)) {
    printf("Frame %lu\n", frame->meta.frame_id);
}

场景 3: 溢出策略

OrderedQueue<FramePtr> queue(100);

// 丢弃最旧的
queue.enqueue(frame_id, std::move(frame), 
    OrderedQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);

// 丢弃最新的
queue.enqueue(frame_id, std::move(frame), 
    OrderedQueue<FramePtr>::OverflowPolicy::DISCARD_NEWEST);

// 阻塞等待
queue.enqueue(frame_id, std::move(frame), 
    OrderedQueue<FramePtr>::OverflowPolicy::BLOCK);

// 抛异常
try {
    queue.enqueue(frame_id, std::move(frame), 
        OrderedQueue<FramePtr>::OverflowPolicy::THROW_EXCEPTION);
} catch (const std::runtime_error& e) {
    printf("Error: %s\n", e.what());
}

场景 4: 统计监控

OrderedQueue<FramePtr> queue(100);

// 运行一段时间...

// 打印统计信息
queue.print_stats();

// 获取详细统计
auto stats = queue.get_stats();
printf("Conflict rate: %.2f%%\n", stats.conflict_rate * 100);
printf("Timeout rate: %.2f%%\n", stats.timeout_rate * 100);

场景 5: 多生产者

OrderedQueue<FramePtr> queue(100);

// 多个生产者
std::vector<std::thread> producers;
for (int i = 0; i < 4; ++i) {
    producers.emplace_back([i, &queue]() {
        for (uint64_t j = 0; j < 100; ++j) {
            FramePtr frame = std::make_shared<Frame>();
            frame->meta.frame_id = j;
            queue.enqueue(j, std::move(frame));
        }
    });
}

// 消费者
FramePtr frame;
while (queue.try_dequeue(frame, 1000)) {
    process_frame(frame);
}

for (auto& t : producers) t.join();

注意事项

  1. 容量建议: 容量建议为 2 的幂,方便快速索引计算
  2. 乱序度: 高乱序入队可能导致 slot 冲突
  3. 溢出策略: 根据场景选择合适的溢出策略
  4. frame_id: frame_id 必须单调递增
  5. 过期数据: frame_id 小于 expected_id 时视为过期数据
  6. 无锁入队: enqueue() 使用 CAS 操作,支持多生产者
  7. 顺序出队: try_dequeue() 按 expected_id 顺序出队
  8. 统计信息: 统计信息使用原子操作,线程安全

相关文档


参考资料

主页

API 文档

DMA 模块

DRM 模块

NET 模块

V4L2 模块

V4L2Param 模块

RGA 模块

MPP 模块

Sys 模块

Mouse 模块

Utils 模块

Clone this wiki locally