-
Notifications
You must be signed in to change notification settings - Fork 1
Utils_SafeQueue
SweerItTer edited this page Feb 21, 2026
·
4 revisions
SafeQueue 是 utilsCore 核心模块的线程安全队列工具类,提供多线程安全的队列操作,针对智能指针有特化版本。
- 提供线程安全的队列操作
- 支持智能指针特化
- 循环队列实现
- 溢出策略支持
- 生产者-消费者模式
- 多线程数据交换
- 智能指针队列
- 缓冲区管理
- 依赖: C++ STL
- 被依赖: CameraController, VisionPipeline 等多个模块
SafeQueue 是线程安全队列的封装类,提供:
- 针对智能指针的特化版本
- 循环队列实现
- 溢出策略支持
- 阻塞和非阻塞模式
- 生产者-消费者模式: 队列作为缓冲区
- RAII: 自动管理资源
- 模板特化: 针对智能指针的特化版本
template <typename T, typename Enable = void>
class SafeQueue;template <typename Ptr>
class SafeQueue<Ptr, typename std::enable_if<
std::is_same<Ptr, std::shared_ptr<typename Ptr::element_type>>::value ||
std::is_same<Ptr, std::unique_ptr<typename Ptr::element_type>>::value
>::type>;说明:
- 针对
std::shared_ptr和std::unique_ptr特化 - 使用 SFINAE 机制实现
-
typename Ptr::element_type获取源指针类型
溢出策略枚举,定义队列满时的行为。
enum class OverflowPolicy {
DISCARD_OLDEST, // 丢弃最旧的项目
DISCARD_NEWEST, // 丢弃最新的项目
BLOCK, // 阻塞直到有空间
THROW_EXCEPTION // 抛出异常
};explicit SafeQueue(size_t capacity, OverflowPolicy policy = OverflowPolicy::DISCARD_OLDEST);
~SafeQueue();
// 禁止拷贝
SafeQueue(const SafeQueue&) = delete;
SafeQueue& operator=(const SafeQueue&) = delete;
// 允许移动
SafeQueue(SafeQueue&& other) noexcept;
SafeQueue& operator=(SafeQueue&& other) noexcept;参数说明:
-
capacity(输入): 队列容量 -
policy(输入): 溢出策略 -
other(输入): 要移动的源队列
返回值: 无
所有权归属:
- SafeQueue 拥有队列元素的所有权
- 移动操作会转移所有权
注意事项:
- capacity 必须大于 0
- 默认溢出策略为 DISCARD_OLDEST
- 析构时会清空队列
- 禁止拷贝构造和拷贝赋值
- 支持移动构造和移动赋值
- 移动操作是线程安全的(使用双锁避免死锁)
- 移动构造函数通过调用移动赋值运算符实现
- 移动赋值运算符使用双锁机制(std::lock)避免对称死锁
- 移动后源对象会被重置为空状态(head_=0, tail_=0, size_=0, capacity_=0, shutdown_=true)
- 移动操作会转移所有队列元素的所有权
使用例程:
// 创建队列,容量为 100,溢出时丢弃最旧的
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);
// 创建队列,容量为 100,溢出时阻塞
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::BLOCK);
// 移动构造
SafeQueue<FramePtr> queue2 = std::move(queue);
// 移动赋值
SafeQueue<FramePtr> queue3;
queue3 = std::move(queue2);bool enqueue(Ptr&& item);参数说明:
-
item(输入): 要入队的元素(右值引用)
返回值:
-
true: 入队成功 -
false: 入队失败(队列已关闭或溢出策略为 DISCARD_NEWEST)
所有权归属:
- 元素的所有权转移给队列
注意事项:
- 使用移动语义
- 如果队列已满,根据溢出策略处理
- 如果队列已关闭,返回 false
使用例程:
SafeQueue<FramePtr> queue(100);
// 入队
FramePtr frame = std::make_shared<Frame>();
queue.enqueue(std::move(frame));Ptr dequeue();参数说明: 无
返回值: 队列中的元素(智能指针)
所有权归属:
- 元素的所有权转移给调用者
注意事项:
- 如果队列为空,会阻塞直到有元素
- 如果队列已关闭且为空,返回 nullptr
- 线程安全
使用例程:
SafeQueue<FramePtr> queue;
// 出队(阻塞)
FramePtr frame = queue.dequeue();
if (frame) {
process_frame(frame);
}bool try_dequeue(Ptr& item);参数说明:
-
item(输出): 出队的元素
返回值:
-
true: 出队成功 -
false: 队列为空
所有权归属:
- 元素的所有权转移给调用者
注意事项:
- 如果队列为空,立即返回 false
- 线程安全
- 不会阻塞
使用例程:
SafeQueue<FramePtr> queue;
// 出队(非阻塞)
FramePtr frame;
if (queue.try_dequeue(frame)) {
process_frame(frame);
} else {
printf("Queue is empty\n");
}element* front() const;参数说明: 无
返回值: 队首元素的原始指针(不转移所有权)
所有权归属:
- 只读访问
注意事项:
- 如果队列为空,返回 nullptr
- 返回的是原始指针,不是智能指针
- 不要持有这个指针超过队列生命周期
使用例程:
SafeQueue<FramePtr> queue;
Frame* frame_ptr = queue.front();
if (frame_ptr) {
printf("Front frame width: %d\n", frame_ptr->meta.w);
}void clear();参数说明: 无
返回值: 无
所有权归属:
- 销毁所有元素
注意事项:
- 清空队列
- 重置 head_, tail_, size_
- 通知 not_full_cond_
size_t size() const;参数说明: 无
返回值: 队列大小
所有权归属:
- 只读访问
bool empty() const;参数说明: 无
返回值:
-
true: 队列为空 -
false: 队列不为空
所有权归属:
- 只读访问
void shutdown();参数说明: 无
返回值: 无
所有权归属:
- 无所有权转移
注意事项:
- 设置 shutdown_ 标志
- 唤醒所有等待的线程
- 关闭后不再允许入队
使用例程:
SafeQueue<FramePtr> queue;
// 关闭队列
queue.shutdown();size_t getBufferRealSize();参数说明: 无
返回值: 缓冲区实际大小(vector 的 capacity)
所有权归属:
- 只读访问
注意事项:
- 返回 vector 的 capacity
- 不是队列的实际元素数量
- 用于调试和性能分析
template <typename Ptr>
class SafeQueue<Ptr, ...> {
private:
std::vector<Ptr> buffer_; // 底层存储
size_t head_ = 0; // 队列头部索引
size_t tail_ = 0; // 队列尾部索引
size_t size_ = 0; // 当前队列大小
size_t capacity_ = 0; // 队列容量
std::atomic<bool> shutdown_{false}; // 关闭标志
mutable std::mutex mutex_; // 互斥锁
std::condition_variable not_empty_cond_; // 非空条件变量
std::condition_variable not_full_cond_; // 非满条件变量
OverflowPolicy policy_; // 溢出策略
};buffer_: [0] [1] [2] [3] [4] [5] [6] [7] [9]
^ ^
tail_ head_
入队: buffer_[tail_] = item; tail_ = (tail_ + 1) % capacity_
出队: item = buffer_[head_]; head_ = (head_ + 1) % capacity_
-
队列操作: 使用
std::mutex保护队列 -
条件变量: 使用
std::condition_variable通知线程 -
原子操作:
std::atomic<bool>保护 shutdown 标志
- 可以并发调用
enqueue()和dequeue() - 可以并发调用
enqueue()和enqueue() - 可以并发调用
dequeue()和dequeue() - 所有操作都是线程安全的
SafeQueue<FramePtr> queue(100);
// 生产者
std::thread producer([&]() {
for (int i = 0; i < 100; ++i) {
FramePtr frame = std::make_shared<Frame>();
queue.enqueue(std::move(frame));
}
});
// 消费者
std::thread consumer([&]() {
while (true) {
FramePtr frame = queue.dequeue();
if (frame) {
process_frame(frame);
} else {
break; // 队列已关闭
}
}
});
producer.join();
queue.shutdown();
consumer.join();// 丢弃最旧的
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::DISCARD_OLDEST);
// 丢弃最新的
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::DISCARD_NEWEST);
// 阻塞
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::BLOCK);
// 抛出异常
SafeQueue<FramePtr> queue(100, SafeQueue<FramePtr>::OverflowPolicy::THROW_EXCEPTION);SafeQueue<FramePtr> queue(100);
// 非阻塞出队
FramePtr frame;
if (queue.try_dequeue(frame)) {
process_frame(frame);
} else {
printf("Queue is empty, skipping\n");
}SafeQueue<FramePtr> queue(100);
// 查看队首
Frame* frame_ptr = queue.front();
if (frame_ptr) {
printf("Front frame width: %d\n", frame_ptr->meta.w);
}SafeQueue<FramePtr> queue(100);
// 关闭队列
queue.shutdown();
// 关闭后无法入队
FramePtr frame = std::make_shared<Frame>();
bool success = queue.enqueue(std::move(frame)); // 返回 false- 智能指针特化: SafeQueue 只针对智能指针有特化版本
- 循环队列: 使用 vector 实现循环队列
- 溢出策略: 根据需求选择合适的溢出策略
- shutdown: 关闭队列后无法入队
- 线程安全: 所有操作都是线程安全的
- 移动语义: 优先使用移动语义减少拷贝
- front(): 返回原始指针,不要持有超过队列生命周期
- 析构: 析构时会自动调用 shutdown()
- ConcurrentQueue - 并发队列
- OrderedQueue - 有序队列
- Utils 模块总览
主页
API 文档
DMA 模块
DRM 模块
- DRM 模块总览
- DeviceController - DRM 设备控制器
- DrmLayer - DRM 图层管理
- PlanesCompositor - DRM 平面合成器
- DrmBpp - DRM 格式定义
NET 模块
- NET 模块总览
- TcpServer - TCP 服务器
- SocketConnection - Socket 连接管理
- CommandHandler - 命令处理器
- DataPacket - 数据包
V4L2 模块
- V4L2 模块总览
- CameraController - V4L2 摄像头控制器
- Frame - V4L2 帧数据结构
- FormatTool - V4L2 格式工具
- Exception - V4L2 异常类
V4L2Param 模块
- V4L2Param 模块总览
- ParamControl - 参数控制
- ParamLogger - 参数日志
- ParamProcessor - 参数处理器
RGA 模块
- RGA 模块总览
- RgaConverter - RGA 转换器
- RgaProcessor - RGA 处理器
- FormatTool - RGA 格式工具
MPP 模块
- MPP 模块总览
- EncoderContext - 编码器上下文
- EncoderCore - 编码器核心
- JpegEncoder - JPEG 编码器
- StreamWriter - 流写入器
- MppResourceGuard - MPP 资源守护
- FileTools - 文件工具
- FormatTool - 格式工具
Sys 模块
- Sys 模块总览
- CpuMonitor - CPU 监控器
- MemoryMonitor - 内存监控器
- Base - 基础类
Mouse 模块
- Mouse 模块总览
- Watcher - 鼠标监视器
Utils 模块
- Utils 模块总览
- AsyncThreadPool - 异步线程池
- ConcurrentQueue - 并发队列
- FdWrapper - 文件描述符包装器
- FenceWatcher - 围栏监视器
- FixedSizePool - 固定大小对象池
- Logger - 日志记录器
- ObjectsPool - 对象池
- OrderedQueue - 有序队列
- ProgressBar - 进度条
- SafeQueue - 安全队列
- SharedBufferState - 共享缓冲区状态
- SimpleVariant - 简单变体类型
- ThreadPauser - 线程暂停器
- ThreadUtils - 线程工具
- Types - 类型定义
- UdevMonitor - Udev 监视器