-
Notifications
You must be signed in to change notification settings - Fork 1
Utils_ConcurrentQueue
SweerItTer edited this page Feb 21, 2026
·
4 revisions
ConcurrentQueue 是 utilsCore Utils 模块提供的 moodycamel::ConcurrentQueue 无锁多生产者多消费者队列的封装,提供高性能并发数据交换功能。
- 无锁并发队列
- 高性能多生产者多消费者
- 支持显式生产者/消费者 Token
- 支持批量操作
- 高并发数据交换
- 任务队列
- 消息传递
- 生产者消费者模式
- 依赖: moodycamel::ConcurrentQueue 库
- 被依赖: VisionPipeline, RecordPipeline 等模块
ConcurrentQueue 是高性能无锁并发队列的封装类,提供:
- 多生产者多消费者并发访问
- 显式生产者/消费者 Token
- 批量操作
- 内存优化
- 生产者-消费者模式: 多生产者多消费者
- 无锁算法: 使用 CAS 操作
struct moodycamel::ProducerToken;职责: 显式生产者标识,用于优化生产者性能
使用例程:
moodycamel::ProducerToken token(queue);
queue.enqueue(token, item);struct moodycamel::ConsumerToken;职责: 显式消费者标识,用于优化消费者性能
使用例程:
moodycamel::ConsumerToken token(queue);
queue.try_dequeue(token, item);ConcurrentQueue();
explicit ConcurrentQueue(size_t initialCapacity);
~ConcurrentQueue();参数说明:
-
initialCapacity(输入): 初始容量(可选)
返回值: 无
所有权归属:
- ConcurrentQueue 拥有队列资源的所有权
注意事项:
- 默认构造不预分配空间
- 可以指定初始容量以优化性能
使用例程:
// 默认构造
ConcurrentQueue<int> queue;
// 指定初始容量
ConcurrentQueue<int> queue(1000);bool enqueue(const T& item);
bool enqueue(T&& item);参数说明:
-
item(输入): 要入队的元素
返回值:
- true: 入队成功
所有权归属:
- 元素所有权转移给队列
注意事项:
- 无锁操作
- 高性能
- 适用于偶尔生产场景
使用例程:
queue.enqueue(42);
queue.enqueue(std::string("hello"));bool enqueue(moodycamel::ProducerToken& token, const T& item);
bool enqueue(moodycamel::ProducerToken& token, T&& item);参数说明:
-
token(输入): 生产者 Token -
item(输入): 要入队的元素
返回值:
- true: 入队成功
所有权归属:
- 元素所有权转移给队列
注意事项:
- 性能优于隐式生产者
- 适用于频繁生产场景
- 每个线程应使用自己的 Token
使用例程:
moodycamel::ProducerToken token(queue);
queue.enqueue(token, 42);bool try_dequeue(T& item);参数说明:
-
item(输出): 出队的元素
返回值:
- true: 出队成功
- false: 队列空
所有权归属:
- 元素所有权转移给调用者
注意事项:
- 无锁操作
- 非阻塞
- 立即返回
使用例程:
T item;
if (queue.try_dequeue(item)) {
printf("Got item\n");
}bool try_dequeue(moodycamel::ConsumerToken& token, T& item);参数说明:
-
token(输入): 消费者 Token -
item(输出): 出队的元素
返回值:
- true: 出队成功
- false: 队列空
所有权归属:
- 元素所有权转移给调用者
注意事项:
- 性能优于隐式消费者
- 适用于频繁消费场景
- 每个线程应使用自己的 Token
使用例程:
moodycamel::ConsumerToken token(queue);
T item;
if (queue.try_dequeue(token, item)) {
printf("Got item\n");
}size_t size_approx() const;参数说明: 无
返回值: 近似队列大小
所有权归属:
- 无所有权转移
注意事项:
- 近似值,不是精确值
- 只读操作
- 线程安全
使用例程:
printf("Approximate size: %zu\n", queue.size_approx());bool empty() const;参数说明: 无
返回值:
- true: 队列为空
- false: 队列不为空
所有权归属:
- 无所有权转移
注意事项:
- 近似值,不是精确值
- 只读操作
- 线程安全
使用例程:
if (queue.empty()) {
printf("Queue is empty\n");
}- 无锁算法: 使用 CAS 操作
- 内存屏障: 确保内存可见性
- 可以并发调用 enqueue()
- 可以并发调用 try_dequeue()
- 所有操作都是线程安全的
- 使用 Token 可以提高性能
ConcurrentQueue<int> queue;
// 生产者
std::thread producer([&]() {
for (int i = 0; i < 100; ++i) {
queue.enqueue(i);
}
});
// 消费者
std::thread consumer([&]() {
int item;
while (queue.try_dequeue(item)) {
printf("Got: %d\n", item);
}
});
producer.join();
consumer.join();ConcurrentQueue<int> queue;
// 生产者
std::thread producer([&]() {
moodycamel::ProducerToken token(queue);
for (int i = 0; i < 100; ++i) {
queue.enqueue(token, i);
}
});
// 消费者
std::thread consumer([&]() {
moodycamel::ConsumerToken token(queue);
int item;
while (queue.try_dequeue(token, item)) {
printf("Got: %d\n", item);
}
});
producer.join();
consumer.join();ConcurrentQueue<Task> queue;
// 多个生产者
std::vector<std::thread> producers;
for (int i = 0; i < 4; ++i) {
producers.emplace_back([&, i]() {
moodycamel::ProducerToken token(queue);
for (int j = 0; j < 100; ++j) {
queue.enqueue(token, Task{i, j});
}
});
}
// 多个消费者
std::vector<std::thread> consumers;
for (int i = 0; i < 4; ++i) {
consumers.emplace_back([&]() {
moodycamel::ConsumerToken token(queue);
Task item;
while (queue.try_dequeue(token, item)) {
process(item);
}
});
}
for (auto& t : producers) t.join();
for (auto& t : consumers) t.join();- 无锁算法: 使用 CAS 操作,高性能
- 近似值: size_approx() 返回近似值
- Token 优化: 频繁生产/消费使用 Token 提高性能
- 线程安全: 所有操作都是线程安全的
- 性能: 比 std::queue + mutex 更快
- Token 作用域: Token 生命周期应小于队列
- SafeQueue - 安全队列
- OrderedQueue - 有序队列
- Utils 模块总览
主页
API 文档
DMA 模块
DRM 模块
- DRM 模块总览
- DeviceController - DRM 设备控制器
- DrmLayer - DRM 图层管理
- PlanesCompositor - DRM 平面合成器
- DrmBpp - DRM 格式定义
NET 模块
- NET 模块总览
- TcpServer - TCP 服务器
- SocketConnection - Socket 连接管理
- CommandHandler - 命令处理器
- DataPacket - 数据包
V4L2 模块
- V4L2 模块总览
- CameraController - V4L2 摄像头控制器
- Frame - V4L2 帧数据结构
- FormatTool - V4L2 格式工具
- Exception - V4L2 异常类
V4L2Param 模块
- V4L2Param 模块总览
- ParamControl - 参数控制
- ParamLogger - 参数日志
- ParamProcessor - 参数处理器
RGA 模块
- RGA 模块总览
- RgaConverter - RGA 转换器
- RgaProcessor - RGA 处理器
- FormatTool - RGA 格式工具
MPP 模块
- MPP 模块总览
- EncoderContext - 编码器上下文
- EncoderCore - 编码器核心
- JpegEncoder - JPEG 编码器
- StreamWriter - 流写入器
- MppResourceGuard - MPP 资源守护
- FileTools - 文件工具
- FormatTool - 格式工具
Sys 模块
- Sys 模块总览
- CpuMonitor - CPU 监控器
- MemoryMonitor - 内存监控器
- Base - 基础类
Mouse 模块
- Mouse 模块总览
- Watcher - 鼠标监视器
Utils 模块
- Utils 模块总览
- AsyncThreadPool - 异步线程池
- ConcurrentQueue - 并发队列
- FdWrapper - 文件描述符包装器
- FenceWatcher - 围栏监视器
- FixedSizePool - 固定大小对象池
- Logger - 日志记录器
- ObjectsPool - 对象池
- OrderedQueue - 有序队列
- ProgressBar - 进度条
- SafeQueue - 安全队列
- SharedBufferState - 共享缓冲区状态
- SimpleVariant - 简单变体类型
- ThreadPauser - 线程暂停器
- ThreadUtils - 线程工具
- Types - 类型定义
- UdevMonitor - Udev 监视器