Skip to content

Utils_ConcurrentQueue

SweerItTer edited this page Feb 21, 2026 · 4 revisions

ConcurrentQueue API 文档

概述

ConcurrentQueue 是 utilsCore Utils 模块提供的 moodycamel::ConcurrentQueue 无锁多生产者多消费者队列的封装,提供高性能并发数据交换功能。

职责

  • 无锁并发队列
  • 高性能多生产者多消费者
  • 支持显式生产者/消费者 Token
  • 支持批量操作

适用场景

  • 高并发数据交换
  • 任务队列
  • 消息传递
  • 生产者消费者模式

依赖关系

  • 依赖: moodycamel::ConcurrentQueue 库
  • 被依赖: VisionPipeline, RecordPipeline 等模块

类分析

ConcurrentQueue 类

职责与用途

ConcurrentQueue 是高性能无锁并发队列的封装类,提供:

  • 多生产者多消费者并发访问
  • 显式生产者/消费者 Token
  • 批量操作
  • 内存优化

设计模式

  • 生产者-消费者模式: 多生产者多消费者
  • 无锁算法: 使用 CAS 操作

Token 类型

ProducerToken - 生产者 Token

struct moodycamel::ProducerToken;

职责: 显式生产者标识,用于优化生产者性能

使用例程:

moodycamel::ProducerToken token(queue);
queue.enqueue(token, item);

ConsumerToken - 消费者 Token

struct moodycamel::ConsumerToken;

职责: 显式消费者标识,用于优化消费者性能

使用例程:

moodycamel::ConsumerToken token(queue);
queue.try_dequeue(token, item);

公共 API 方法

构造函数

ConcurrentQueue();
explicit ConcurrentQueue(size_t initialCapacity);
~ConcurrentQueue();

参数说明:

  • initialCapacity (输入): 初始容量(可选)

返回值: 无

所有权归属:

  • ConcurrentQueue 拥有队列资源的所有权

注意事项:

  1. 默认构造不预分配空间
  2. 可以指定初始容量以优化性能

使用例程:

// 默认构造
ConcurrentQueue<int> queue;

// 指定初始容量
ConcurrentQueue<int> queue(1000);

enqueue() - 入队(隐式生产者)

bool enqueue(const T& item);
bool enqueue(T&& item);

参数说明:

  • item (输入): 要入队的元素

返回值:

  • true: 入队成功

所有权归属:

  • 元素所有权转移给队列

注意事项:

  1. 无锁操作
  2. 高性能
  3. 适用于偶尔生产场景

使用例程:

queue.enqueue(42);
queue.enqueue(std::string("hello"));

enqueue() - 入队(显式生产者)

bool enqueue(moodycamel::ProducerToken& token, const T& item);
bool enqueue(moodycamel::ProducerToken& token, T&& item);

参数说明:

  • token (输入): 生产者 Token
  • item (输入): 要入队的元素

返回值:

  • true: 入队成功

所有权归属:

  • 元素所有权转移给队列

注意事项:

  1. 性能优于隐式生产者
  2. 适用于频繁生产场景
  3. 每个线程应使用自己的 Token

使用例程:

moodycamel::ProducerToken token(queue);
queue.enqueue(token, 42);

try_dequeue() - 尝试出队(隐式消费者)

bool try_dequeue(T& item);

参数说明:

  • item (输出): 出队的元素

返回值:

  • true: 出队成功
  • false: 队列空

所有权归属:

  • 元素所有权转移给调用者

注意事项:

  1. 无锁操作
  2. 非阻塞
  3. 立即返回

使用例程:

T item;
if (queue.try_dequeue(item)) {
    printf("Got item\n");
}

try_dequeue() - 尝试出队(显式消费者)

bool try_dequeue(moodycamel::ConsumerToken& token, T& item);

参数说明:

  • token (输入): 消费者 Token
  • item (输出): 出队的元素

返回值:

  • true: 出队成功
  • false: 队列空

所有权归属:

  • 元素所有权转移给调用者

注意事项:

  1. 性能优于隐式消费者
  2. 适用于频繁消费场景
  3. 每个线程应使用自己的 Token

使用例程:

moodycamel::ConsumerToken token(queue);
T item;
if (queue.try_dequeue(token, item)) {
    printf("Got item\n");
}

size_approx() - 获取近似大小

size_t size_approx() const;

参数说明: 无

返回值: 近似队列大小

所有权归属:

  • 无所有权转移

注意事项:

  1. 近似值,不是精确值
  2. 只读操作
  3. 线程安全

使用例程:

printf("Approximate size: %zu\n", queue.size_approx());

empty() - 检查是否为空

bool empty() const;

参数说明: 无

返回值:

  • true: 队列为空
  • false: 队列不为空

所有权归属:

  • 无所有权转移

注意事项:

  1. 近似值,不是精确值
  2. 只读操作
  3. 线程安全

使用例程:

if (queue.empty()) {
    printf("Queue is empty\n");
}

线程安全说明

同步机制

  • 无锁算法: 使用 CAS 操作
  • 内存屏障: 确保内存可见性

线程安全建议

  • 可以并发调用 enqueue()
  • 可以并发调用 try_dequeue()
  • 所有操作都是线程安全的
  • 使用 Token 可以提高性能

典型使用场景

场景 1: 基本使用(隐式)

ConcurrentQueue<int> queue;

// 生产者
std::thread producer([&]() {
    for (int i = 0; i < 100; ++i) {
        queue.enqueue(i);
    }
});

// 消费者
std::thread consumer([&]() {
    int item;
    while (queue.try_dequeue(item)) {
        printf("Got: %d\n", item);
    }
});

producer.join();
consumer.join();

场景 2: 使用 Token(显式)

ConcurrentQueue<int> queue;

// 生产者
std::thread producer([&]() {
    moodycamel::ProducerToken token(queue);
    for (int i = 0; i < 100; ++i) {
        queue.enqueue(token, i);
    }
});

// 消费者
std::thread consumer([&]() {
    moodycamel::ConsumerToken token(queue);
    int item;
    while (queue.try_dequeue(token, item)) {
        printf("Got: %d\n", item);
    }
});

producer.join();
consumer.join();

场景 3: 多生产者多消费者

ConcurrentQueue<Task> queue;

// 多个生产者
std::vector<std::thread> producers;
for (int i = 0; i < 4; ++i) {
    producers.emplace_back([&, i]() {
        moodycamel::ProducerToken token(queue);
        for (int j = 0; j < 100; ++j) {
            queue.enqueue(token, Task{i, j});
        }
    });
}

// 多个消费者
std::vector<std::thread> consumers;
for (int i = 0; i < 4; ++i) {
    consumers.emplace_back([&]() {
        moodycamel::ConsumerToken token(queue);
        Task item;
        while (queue.try_dequeue(token, item)) {
            process(item);
        }
    });
}

for (auto& t : producers) t.join();
for (auto& t : consumers) t.join();

注意事项

  1. 无锁算法: 使用 CAS 操作,高性能
  2. 近似值: size_approx() 返回近似值
  3. Token 优化: 频繁生产/消费使用 Token 提高性能
  4. 线程安全: 所有操作都是线程安全的
  5. 性能: 比 std::queue + mutex 更快
  6. Token 作用域: Token 生命周期应小于队列

相关文档


参考资料

主页

API 文档

DMA 模块

DRM 模块

NET 模块

V4L2 模块

V4L2Param 模块

RGA 模块

MPP 模块

Sys 模块

Mouse 模块

Utils 模块

Clone this wiki locally