Skip to content

Utils_SafeQueue

SweerItTer edited this page Feb 21, 2026 · 4 revisions

SafeQueue API 文档

概述

SafeQueue 是 utilsCore 核心模块的线程安全队列工具类,提供多线程安全的队列操作,针对智能指针有特化版本。

职责

  • 提供线程安全的队列操作
  • 支持智能指针特化
  • 循环队列实现
  • 溢出策略支持

适用场景

  • 生产者-消费者模式
  • 多线程数据交换
  • 智能指针队列
  • 缓冲区管理

依赖关系

  • 依赖: C++ STL
  • 被依赖: CameraController, VisionPipeline 等多个模块

类分析

SafeQueue 类

职责与用途

SafeQueue 是线程安全队列的封装类,提供:

  • 针对智能指针的特化版本
  • 循环队列实现
  • 溢出策略支持
  • 阻塞和非阻塞模式

设计模式

  • 生产者-消费者模式: 队列作为缓冲区
  • RAII: 自动管理资源
  • 模板特化: 针对智能指针的特化版本

模板特化

主模板

template <typename T, typename Enable = void>
class SafeQueue;

智能指针特化版本

template <typename Ptr>
class SafeQueue<Ptr, typename std::enable_if<
    std::is_same<Ptr, std::shared_ptr<typename Ptr::element_type>>::value ||
    std::is_same<Ptr, std::unique_ptr<typename Ptr::element_type>>::value
>::type>;

说明:

  • 针对 std::shared_ptrstd::unique_ptr 特化
  • 使用 SFINAE 机制实现
  • typename Ptr::element_type 获取源指针类型

OverflowPolicy 枚举

溢出策略枚举,定义队列满时的行为。

enum class OverflowPolicy {
    DISCARD_OLDEST,  // 丢弃最旧的项目
    DISCARD_NEWEST,  // 丢弃最新的项目
    BLOCK,           // 阻塞直到有空间
    THROW_EXCEPTION  // 抛出异常
};

公共 API 方法

构造函数

explicit SafeQueue(size_t capacity, OverflowPolicy policy = OverflowPolicy::DISCARD_OLDEST);
~SafeQueue();

// 禁止拷贝
SafeQueue(const SafeQueue&) = delete;
SafeQueue& operator=(const SafeQueue&) = delete;

// 允许移动
SafeQueue(SafeQueue&& other) noexcept;
SafeQueue& operator=(SafeQueue&& other) noexcept;

参数说明:

  • capacity (输入): 队列容量
  • policy (输入): 溢出策略
  • other (输入): 要移动的源队列

返回值: 无

所有权归属:

  • SafeQueue 拥有队列元素的所有权
  • 移动操作会转移所有权

注意事项:

  1. capacity 必须大于 0
  2. 默认溢出策略为 DISCARD_OLDEST
  3. 析构时会清空队列
  4. 禁止拷贝构造和拷贝赋值
  5. 支持移动构造和移动赋值
  6. 移动操作是线程安全的(使用双锁避免死锁)
  7. 移动构造函数通过调用移动赋值运算符实现
  8. 移动赋值运算符使用双锁机制(std::lock)避免对称死锁
  9. 移动后源对象会被重置为空状态(head_=0, tail_=0, size_=0, capacity_=0, shutdown_=true)
  10. 移动操作会转移所有队列元素的所有权

使用例程:

// 创建队列,容量为 100,溢出时丢弃最旧的
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);

// 创建队列,容量为 100,溢出时阻塞
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::BLOCK);

// 移动构造
SafeQueue<FramePtr> queue2 = std::move(queue);

// 移动赋值
SafeQueue<FramePtr> queue3;
queue3 = std::move(queue2);

enqueue() - 入队

bool enqueue(Ptr&& item);

参数说明:

  • item (输入): 要入队的元素(右值引用)

返回值:

  • true: 入队成功
  • false: 入队失败(队列已关闭或溢出策略为 DISCARD_NEWEST)

所有权归属:

  • 元素的所有权转移给队列

注意事项:

  1. 使用移动语义
  2. 如果队列已满,根据溢出策略处理
  3. 如果队列已关闭,返回 false

使用例程:

SafeQueue<FramePtr> queue(100);

// 入队
FramePtr frame = std::make_shared<Frame>();
queue.enqueue(std::move(frame));

dequeue() - 阻塞式出队

Ptr dequeue();

参数说明: 无

返回值: 队列中的元素(智能指针)

所有权归属:

  • 元素的所有权转移给调用者

注意事项:

  1. 如果队列为空,会阻塞直到有元素
  2. 如果队列已关闭且为空,返回 nullptr
  3. 线程安全

使用例程:

SafeQueue<FramePtr> queue;

// 出队(阻塞)
FramePtr frame = queue.dequeue();
if (frame) {
    process_frame(frame);
}

try_dequeue() - 非阻塞出队

bool try_dequeue(Ptr& item);

参数说明:

  • item (输出): 出队的元素

返回值:

  • true: 出队成功
  • false: 队列为空

所有权归属:

  • 元素的所有权转移给调用者

注意事项:

  1. 如果队列为空,立即返回 false
  2. 线程安全
  3. 不会阻塞

使用例程:

SafeQueue<FramePtr> queue;

// 出队(非阻塞)
FramePtr frame;
if (queue.try_dequeue(frame)) {
    process_frame(frame);
} else {
    printf("Queue is empty\n");
}

front() - 获取队首元素指针

element* front() const;

参数说明: 无

返回值: 队首元素的原始指针(不转移所有权)

所有权归属:

  • 只读访问

注意事项:

  1. 如果队列为空,返回 nullptr
  2. 返回的是原始指针,不是智能指针
  3. 不要持有这个指针超过队列生命周期

使用例程:

SafeQueue<FramePtr> queue;

Frame* frame_ptr = queue.front();
if (frame_ptr) {
    printf("Front frame width: %d\n", frame_ptr->meta.w);
}

clear() - 清空队列

void clear();

参数说明: 无

返回值: 无

所有权归属:

  • 销毁所有元素

注意事项:

  1. 清空队列
  2. 重置 head_, tail_, size_
  3. 通知 not_full_cond_

size() - 获取队列大小

size_t size() const;

参数说明: 无

返回值: 队列大小

所有权归属:

  • 只读访问

empty() - 检查队列是否为空

bool empty() const;

参数说明: 无

返回值:

  • true: 队列为空
  • false: 队列不为空

所有权归属:

  • 只读访问

shutdown() - 关闭队列

void shutdown();

参数说明: 无

返回值: 无

所有权归属:

  • 无所有权转移

注意事项:

  1. 设置 shutdown_ 标志
  2. 唤醒所有等待的线程
  3. 关闭后不再允许入队

使用例程:

SafeQueue<FramePtr> queue;

// 关闭队列
queue.shutdown();

getBufferRealSize() - 获取缓冲区实际大小

size_t getBufferRealSize();

参数说明: 无

返回值: 缓冲区实际大小(vector 的 capacity)

所有权归属:

  • 只读访问

注意事项:

  1. 返回 vector 的 capacity
  2. 不是队列的实际元素数量
  3. 用于调试和性能分析

内部实现

数据成员

template <typename Ptr>
class SafeQueue<Ptr, ...> {
private:
    std::vector<Ptr> buffer_;               // 底层存储
    size_t head_ = 0;                       // 队列头部索引
    size_t tail_ = 0;                       // 队列尾部索引
    size_t size_ = 0;                       // 当前队列大小
    size_t capacity_ = 0;                   // 队列容量
    
    std::atomic<bool> shutdown_{false};     // 关闭标志
    mutable std::mutex mutex_;              // 互斥锁
    std::condition_variable not_empty_cond_; // 非空条件变量
    std::condition_variable not_full_cond_;  // 非满条件变量
    
    OverflowPolicy policy_;                 // 溢出策略
};

循环队列实现

buffer_: [0] [1] [2] [3] [4] [5] [6] [7] [9]
          ^                   ^
         tail_              head_

入队: buffer_[tail_] = item; tail_ = (tail_ + 1) % capacity_
出队: item = buffer_[head_]; head_ = (head_ + 1) % capacity_

线程安全说明

同步机制

  • 队列操作: 使用 std::mutex 保护队列
  • 条件变量: 使用 std::condition_variable 通知线程
  • 原子操作: std::atomic<bool> 保护 shutdown 标志

线程安全建议

  • 可以并发调用 enqueue()dequeue()
  • 可以并发调用 enqueue()enqueue()
  • 可以并发调用 dequeue()dequeue()
  • 所有操作都是线程安全的

典型使用场景

场景 1: 基本使用

SafeQueue<FramePtr> queue(100);

// 生产者
std::thread producer([&]() {
    for (int i = 0; i < 100; ++i) {
        FramePtr frame = std::make_shared<Frame>();
        queue.enqueue(std::move(frame));
    }
});

// 消费者
std::thread consumer([&]() {
    while (true) {
        FramePtr frame = queue.dequeue();
        if (frame) {
            process_frame(frame);
        } else {
            break;  // 队列已关闭
        }
    }
});

producer.join();
queue.shutdown();
consumer.join();

场景 2: 溢出策略

// 丢弃最旧的
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);

// 丢弃最新的
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::DISCARD_NEWEST);

// 阻塞
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::BLOCK);

// 抛出异常
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::THROW_EXCEPTION);

场景 3: 非阻塞出队

SafeQueue<FramePtr> queue(100);

// 非阻塞出队
FramePtr frame;
if (queue.try_dequeue(frame)) {
    process_frame(frame);
} else {
    printf("Queue is empty, skipping\n");
}

场景 4: 查看队首

SafeQueue<FramePtr> queue(100);

// 查看队首
Frame* frame_ptr = queue.front();
if (frame_ptr) {
    printf("Front frame width: %d\n", frame_ptr->meta.w);
}

场景 5: 关闭队列

SafeQueue<FramePtr> queue(100);

// 关闭队列
queue.shutdown();

// 关闭后无法入队
FramePtr frame = std::make_shared<Frame>();
bool success = queue.enqueue(std::move(frame));  // 返回 false

注意事项

  1. 智能指针特化: SafeQueue 只针对智能指针有特化版本
  2. 循环队列: 使用 vector 实现循环队列
  3. 溢出策略: 根据需求选择合适的溢出策略
  4. shutdown: 关闭队列后无法入队
  5. 线程安全: 所有操作都是线程安全的
  6. 移动语义: 优先使用移动语义减少拷贝
  7. front(): 返回原始指针,不要持有超过队列生命周期
  8. 析构: 析构时会自动调用 shutdown()

相关文档


参考资料

主页

API 文档

DMA 模块

DRM 模块

NET 模块

V4L2 模块

V4L2Param 模块

RGA 模块

MPP 模块

Sys 模块

Mouse 模块

Utils 模块

Clone this wiki locally