Skip to content

Utils_AsyncThreadPool

SweerItTer edited this page Feb 1, 2026 · 3 revisions

asyncThreadPool API 文档

概述

asyncThreadPool 是 utilsCore Utils 模块的异步线程池类,提供动态伸缩的线程池管理和异步任务执行功能。

职责

  • 管理工作线程的创建和销毁
  • 提供异步任务执行接口
  • 支持任务队列管理
  • 动态调整线程数量
  • 支持阻塞和非阻塞入队

适用场景

  • 异步任务执行
  • 多线程任务处理
  • 并行计算
  • 后台任务处理
  • YOLO 模型推理

性能特性

  • 动态伸缩: 根据负载自动调整线程数量
  • 线程复用: 避免频繁创建/销毁线程
  • 任务队列: 使用无锁队列(ConcurrentQueue)
  • 阻塞等待: 支持任务队列满时阻塞

依赖关系

  • 依赖: C++ STL, moodycamel::ConcurrentQueue
  • 被依赖: VisionPipeline, rknnPool 等模块

类分析

asyncThreadPool 类

职责与用途

asyncThreadPool 是异步线程池的封装类,提供:

  • 动态线程池管理(最小/最大线程数)
  • 异步任务执行(enqueue)
  • 非阻塞任务入队(try_enqueue)
  • 自动线程调度
  • 管理线程监控

设计模式

  • 线程池模式: 复用线程,减少创建开销
  • 生产者-消费者模式: 任务队列管理
  • 单例管理: 内部管理线程管理器

公共 API 方法

构造函数

asyncThreadPool(std::size_t minThreads=-1, std::size_t maxThreads=-1, std::size_t maxQueueSize = 64)

参数说明:

  • minThreads (输入): 最小线程数(-1 表示不限制,默认 1)
  • maxThreads (输入): 最大线程数(-1 表示不限制,默认 CPU 核心数)
  • maxQueueSize (输入): 队列最大长度(默认 64)

返回值: 无

所有权归属:

  • asyncThreadPool 拥有线程的所有权

注意事项:

  1. 构造时创建最小数量的线程
  2. 线程数会在 minThreads 和 maxThreads 之间动态调整
  3. 管理线程每 5 秒检查一次线程负载

使用例程:

// 使用默认参数(1-CPU核心数线程,队列长度 64)
asyncThreadPool pool;

// 指定线程数范围(2-4 线程,队列长度 128)
asyncThreadPool pool(2, 4, 128);

// 只指定最大线程数(默认最小 1)
asyncThreadPool pool(-1, 8);

enqueue() - 阻塞入队任务

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>

参数说明:

  • f (输入): 要执行的函数(可调用对象)
  • args (输入): 函数参数

返回值:

  • 成功: 返回 std::future<R> 对象,用于获取任务执行结果
  • 失败: 返回无效的 future(线程池已停止)

所有权归属:

  • 任务参数的所有权通过 std::forward 转移
  • future 对象由调用者持有

注意事项:

  1. 如果队列满,阻塞等待直到有空位
  2. 线程池停止时返回无效 future
  3. 使用 std::packaged_task 包装任务
  4. 返回值类型为 std::result_of<F(Args...)>::type

使用例程:

asyncThreadPool pool(2, 4, 64);

// 提交无返回值任务
auto future = pool.enqueue([]() {
    printf("Task executed\n");
});

// 提交有返回值任务
auto future = pool.enqueue([](int x, int y) {
    return x + y;
}, 10, 20);

// 等待结果
int result = future.get();
printf("Result: %d\n", result);

try_enqueue() - 非阻塞入队任务

template<class F, class... Args>
auto try_enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>

参数说明:

  • f (输入): 要执行的函数(可调用对象)
  • args (输入): 函数参数

返回值:

  • 成功: 返回 std::future<R> 对象
  • 失败(队列满或线程池停止): 返回无效的 future

所有权归属:

  • 任务参数的所有权通过 std::forward 转移
  • future 对象由调用者持有

注意事项:

  1. 如果队列满,直接返回无效 future,不阻塞
  2. 线程池停止时返回无效 future
  3. 可以通过 future.valid() 检查是否入队成功
  4. 适用于不能阻塞的场景

使用例程:

asyncThreadPool pool(2, 4, 64);

// 非阻塞提交任务
auto future = pool.try_enqueue([]() {
    printf("Task executed\n");
});

if (future.valid()) {
    // 任务已入队
    future.wait();
} else {
    // 队列满,任务未入队
    printf("Queue full, task rejected\n");
}

stop() - 停止线程池

void stop()

参数说明: 无

返回值: 无

所有权归属:

  • 停止后线程池对象不再可用

注意事项:

  1. 停止管理线程
  2. 停止所有工作线程
  3. 队列中的任务不会被执行
  4. 已提交的任务会执行完成

使用例程:

asyncThreadPool pool(2, 4, 64);

// 提交任务
pool.enqueue([]() { printf("Task 1\n"); });

// 停止线程池
pool.stop();

内部实现

数据结构

class asyncThreadPool {
private:
    std::atomic<bool> running_;                              // 运行标志
    std::atomic<size_t> activeTasks_;                       // 活跃任务数

    std::mutex queueMtx_;                                   // 队列互斥锁
    std::condition_variable condition_;                     // 队列非空条件
    std::condition_variable queueNotFullCv_;                 // 队列未满条件
    std::condition_variable managerCv_;                      // 管理线程条件

    moodycamel::ConcurrentQueue<std::function<void()>> tasks_; // 任务队列
    std::size_t maxQueueSize_;                              // 最大队列长度

    std::thread managerThread_;                             // 管理线程
    std::vector<std::shared_ptr<WorkerWrapper>> workers_;   // 工作线程列表

    size_t minThreads_;                                     // 最小线程数
    size_t maxThreads_;                                     // 最大线程数
};

工作流程

提交任务流程

1. 调用 enqueue()
   ↓
2. 创建 packaged_task 和 future
   ↓
3. 获取互斥锁
   ↓
4. 等待队列非满(阻塞)
   ↓
5. 任务入队
   ↓
6. 通知一个工作线程
   ↓
7. 通知管理线程
   ↓
8. 返回 future

工作线程流程

1. 工作线程启动
   ↓
2. 等待队列非空
   ↓
3. 取出任务
   ↓
4. 执行任务
   ↓
5. 继续等待

线程管理

动态伸缩

线程池会根据负载自动调整线程数量:

  • 扩容: 当活跃任务数 >= 线程数 且 线程数 < maxThreads 时
  • 缩容: 当活跃任务数 < 线程数/2 且 线程数 > minThreads 时

管理线程

  • 每 5 秒检查一次线程负载
  • 动态创建/销毁工作线程
  • 记录每个线程的活跃时间

典型使用场景

场景 1: 基本使用

asyncThreadPool pool(2, 4, 64);

// 提交多个任务
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; ++i) {
    futures.push_back(pool.enqueue([i]() {
        return i * i;
    }));
}

// 等待所有结果
for (auto& f : futures) {
    printf("Result: %d\n", f.get());
}

场景 2: 非阻塞提交

asyncThreadPool pool(2, 4, 64);

for (int i = 0; i < 100; ++i) {
    auto future = pool.try_enqueue([i]() {
        process(i);
    });
    
    if (!future.valid()) {
        printf("Queue full at task %d\n", i);
        // 可以丢弃任务或使用其他策略
    }
}

场景 3: YOLO 模型推理(VisionPipeline)

class VisionPipeline::Impl {
    asyncThreadPool pool;  // 线程池
    
    void mainLoop() {
        while (running.load()) {
            // ...
            pool.enqueue([this]{
                // 计算 FPS
                perf.endFrame();
                
                // 显示回调
                if (showCb) showCb(safetyGetCurrentFrame());
                
                // RGA 图像传递
                if (cb && ModelStatus::Start == modelStatus) {
                    FramePtr frame{nullptr};
                    if (getCurrentRGAFrame(frame) && frame) {
                        cb(frame->sharedState(0)->dmabuf_ptr, frame);
                    }
                }
            });
        }
    }
};

场景 4: 等待超时

asyncThreadPool pool(2, 4, 64);

auto future = pool.enqueue([]() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42;
});

// 等待任务完成(带超时)
if (future.wait_for(std::chrono::milliseconds(500)) == std::future_status::ready) {
    int result = future.get();
    printf("Result: %d\n", result);
} else {
    printf("Timeout\n");
}

性能特性

线程复用

  • 避免频繁创建/销毁线程
  • 线程数量在 minThreads 和 maxThreads 之间动态调整
  • 管理线程定期检查负载

无锁队列

  • 使用 moodycamel::ConcurrentQueue
  • 高并发性能优化
  • 减少锁竞争

阻塞/非阻塞

  • enqueue(): 阻塞等待队列有空位
  • try_enqueue(): 非阻塞,队列满时直接返回

注意事项

  1. 线程数范围: minThreads 应 <= maxThreads
  2. 队列长度: maxQueueSize 需要根据任务量合理设置
  3. 任务异常: 任务抛出的异常会在 future.get() 时重新抛出
  4. 线程亲和性: 工作线程没有绑定 CPU 核心
  5. 停止时机: stop() 后不能再提交任务
  6. 资源清理: 析构时会自动调用 stop()
  7. 返回值: 使用 future.get() 获取返回值
  8. 内存占用: 线程数和队列长度都会影响内存占用

相关文档


参考资料

主页

API 文档

DMA 模块

DRM 模块

NET 模块

V4L2 模块

V4L2Param 模块

RGA 模块

MPP 模块

Sys 模块

Mouse 模块

Utils 模块

Clone this wiki locally