-
Notifications
You must be signed in to change notification settings - Fork 1
Utils_AsyncThreadPool
SweerItTer edited this page Feb 1, 2026
·
3 revisions
asyncThreadPool 是 utilsCore Utils 模块的异步线程池类,提供动态伸缩的线程池管理和异步任务执行功能。
- 管理工作线程的创建和销毁
- 提供异步任务执行接口
- 支持任务队列管理
- 动态调整线程数量
- 支持阻塞和非阻塞入队
- 异步任务执行
- 多线程任务处理
- 并行计算
- 后台任务处理
- YOLO 模型推理
- 动态伸缩: 根据负载自动调整线程数量
- 线程复用: 避免频繁创建/销毁线程
- 任务队列: 使用无锁队列(ConcurrentQueue)
- 阻塞等待: 支持任务队列满时阻塞
- 依赖: C++ STL, moodycamel::ConcurrentQueue
- 被依赖: VisionPipeline, rknnPool 等模块
asyncThreadPool 是异步线程池的封装类,提供:
- 动态线程池管理(最小/最大线程数)
- 异步任务执行(enqueue)
- 非阻塞任务入队(try_enqueue)
- 自动线程调度
- 管理线程监控
- 线程池模式: 复用线程,减少创建开销
- 生产者-消费者模式: 任务队列管理
- 单例管理: 内部管理线程管理器
asyncThreadPool(std::size_t minThreads=-1, std::size_t maxThreads=-1, std::size_t maxQueueSize = 64)参数说明:
-
minThreads(输入): 最小线程数(-1 表示不限制,默认 1) -
maxThreads(输入): 最大线程数(-1 表示不限制,默认 CPU 核心数) -
maxQueueSize(输入): 队列最大长度(默认 64)
返回值: 无
所有权归属:
- asyncThreadPool 拥有线程的所有权
注意事项:
- 构造时创建最小数量的线程
- 线程数会在 minThreads 和 maxThreads 之间动态调整
- 管理线程每 5 秒检查一次线程负载
使用例程:
// 使用默认参数(1-CPU核心数线程,队列长度 64)
asyncThreadPool pool;
// 指定线程数范围(2-4 线程,队列长度 128)
asyncThreadPool pool(2, 4, 128);
// 只指定最大线程数(默认最小 1)
asyncThreadPool pool(-1, 8);template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>参数说明:
-
f(输入): 要执行的函数(可调用对象) -
args(输入): 函数参数
返回值:
- 成功: 返回
std::future<R>对象,用于获取任务执行结果 - 失败: 返回无效的 future(线程池已停止)
所有权归属:
- 任务参数的所有权通过 std::forward 转移
- future 对象由调用者持有
注意事项:
- 如果队列满,阻塞等待直到有空位
- 线程池停止时返回无效 future
- 使用 std::packaged_task 包装任务
- 返回值类型为
std::result_of<F(Args...)>::type
使用例程:
asyncThreadPool pool(2, 4, 64);
// 提交无返回值任务
auto future = pool.enqueue([]() {
printf("Task executed\n");
});
// 提交有返回值任务
auto future = pool.enqueue([](int x, int y) {
return x + y;
}, 10, 20);
// 等待结果
int result = future.get();
printf("Result: %d\n", result);template<class F, class... Args>
auto try_enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>参数说明:
-
f(输入): 要执行的函数(可调用对象) -
args(输入): 函数参数
返回值:
- 成功: 返回
std::future<R>对象 - 失败(队列满或线程池停止): 返回无效的 future
所有权归属:
- 任务参数的所有权通过 std::forward 转移
- future 对象由调用者持有
注意事项:
- 如果队列满,直接返回无效 future,不阻塞
- 线程池停止时返回无效 future
- 可以通过 future.valid() 检查是否入队成功
- 适用于不能阻塞的场景
使用例程:
asyncThreadPool pool(2, 4, 64);
// 非阻塞提交任务
auto future = pool.try_enqueue([]() {
printf("Task executed\n");
});
if (future.valid()) {
// 任务已入队
future.wait();
} else {
// 队列满,任务未入队
printf("Queue full, task rejected\n");
}void stop()参数说明: 无
返回值: 无
所有权归属:
- 停止后线程池对象不再可用
注意事项:
- 停止管理线程
- 停止所有工作线程
- 队列中的任务不会被执行
- 已提交的任务会执行完成
使用例程:
asyncThreadPool pool(2, 4, 64);
// 提交任务
pool.enqueue([]() { printf("Task 1\n"); });
// 停止线程池
pool.stop();class asyncThreadPool {
private:
std::atomic<bool> running_; // 运行标志
std::atomic<size_t> activeTasks_; // 活跃任务数
std::mutex queueMtx_; // 队列互斥锁
std::condition_variable condition_; // 队列非空条件
std::condition_variable queueNotFullCv_; // 队列未满条件
std::condition_variable managerCv_; // 管理线程条件
moodycamel::ConcurrentQueue<std::function<void()>> tasks_; // 任务队列
std::size_t maxQueueSize_; // 最大队列长度
std::thread managerThread_; // 管理线程
std::vector<std::shared_ptr<WorkerWrapper>> workers_; // 工作线程列表
size_t minThreads_; // 最小线程数
size_t maxThreads_; // 最大线程数
};提交任务流程:
1. 调用 enqueue()
↓
2. 创建 packaged_task 和 future
↓
3. 获取互斥锁
↓
4. 等待队列非满(阻塞)
↓
5. 任务入队
↓
6. 通知一个工作线程
↓
7. 通知管理线程
↓
8. 返回 future
工作线程流程:
1. 工作线程启动
↓
2. 等待队列非空
↓
3. 取出任务
↓
4. 执行任务
↓
5. 继续等待
线程池会根据负载自动调整线程数量:
- 扩容: 当活跃任务数 >= 线程数 且 线程数 < maxThreads 时
- 缩容: 当活跃任务数 < 线程数/2 且 线程数 > minThreads 时
- 每 5 秒检查一次线程负载
- 动态创建/销毁工作线程
- 记录每个线程的活跃时间
asyncThreadPool pool(2, 4, 64);
// 提交多个任务
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; ++i) {
futures.push_back(pool.enqueue([i]() {
return i * i;
}));
}
// 等待所有结果
for (auto& f : futures) {
printf("Result: %d\n", f.get());
}asyncThreadPool pool(2, 4, 64);
for (int i = 0; i < 100; ++i) {
auto future = pool.try_enqueue([i]() {
process(i);
});
if (!future.valid()) {
printf("Queue full at task %d\n", i);
// 可以丢弃任务或使用其他策略
}
}class VisionPipeline::Impl {
asyncThreadPool pool; // 线程池
void mainLoop() {
while (running.load()) {
// ...
pool.enqueue([this]{
// 计算 FPS
perf.endFrame();
// 显示回调
if (showCb) showCb(safetyGetCurrentFrame());
// RGA 图像传递
if (cb && ModelStatus::Start == modelStatus) {
FramePtr frame{nullptr};
if (getCurrentRGAFrame(frame) && frame) {
cb(frame->sharedState(0)->dmabuf_ptr, frame);
}
}
});
}
}
};asyncThreadPool pool(2, 4, 64);
auto future = pool.enqueue([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
return 42;
});
// 等待任务完成(带超时)
if (future.wait_for(std::chrono::milliseconds(500)) == std::future_status::ready) {
int result = future.get();
printf("Result: %d\n", result);
} else {
printf("Timeout\n");
}- 避免频繁创建/销毁线程
- 线程数量在 minThreads 和 maxThreads 之间动态调整
- 管理线程定期检查负载
- 使用 moodycamel::ConcurrentQueue
- 高并发性能优化
- 减少锁竞争
-
enqueue(): 阻塞等待队列有空位 -
try_enqueue(): 非阻塞,队列满时直接返回
- 线程数范围: minThreads 应 <= maxThreads
- 队列长度: maxQueueSize 需要根据任务量合理设置
- 任务异常: 任务抛出的异常会在 future.get() 时重新抛出
- 线程亲和性: 工作线程没有绑定 CPU 核心
- 停止时机: stop() 后不能再提交任务
- 资源清理: 析构时会自动调用 stop()
- 返回值: 使用 future.get() 获取返回值
- 内存占用: 线程数和队列长度都会影响内存占用
- ConcurrentQueue - 无锁队列
- ThreadUtils - 线程工具
- Utils 模块总览
主页
API 文档
DMA 模块
DRM 模块
- DRM 模块总览
- DeviceController - DRM 设备控制器
- DrmLayer - DRM 图层管理
- PlanesCompositor - DRM 平面合成器
- DrmBpp - DRM 格式定义
NET 模块
- NET 模块总览
- TcpServer - TCP 服务器
- SocketConnection - Socket 连接管理
- CommandHandler - 命令处理器
- DataPacket - 数据包
V4L2 模块
- V4L2 模块总览
- CameraController - V4L2 摄像头控制器
- Frame - V4L2 帧数据结构
- FormatTool - V4L2 格式工具
- Exception - V4L2 异常类
V4L2Param 模块
- V4L2Param 模块总览
- ParamControl - 参数控制
- ParamLogger - 参数日志
- ParamProcessor - 参数处理器
RGA 模块
- RGA 模块总览
- RgaConverter - RGA 转换器
- RgaProcessor - RGA 处理器
- FormatTool - RGA 格式工具
MPP 模块
- MPP 模块总览
- EncoderContext - 编码器上下文
- EncoderCore - 编码器核心
- JpegEncoder - JPEG 编码器
- StreamWriter - 流写入器
- MppResourceGuard - MPP 资源守护
- FileTools - 文件工具
- FormatTool - 格式工具
Sys 模块
- Sys 模块总览
- CpuMonitor - CPU 监控器
- MemoryMonitor - 内存监控器
- Base - 基础类
Mouse 模块
- Mouse 模块总览
- Watcher - 鼠标监视器
Utils 模块
- Utils 模块总览
- AsyncThreadPool - 异步线程池
- ConcurrentQueue - 并发队列
- FdWrapper - 文件描述符包装器
- FenceWatcher - 围栏监视器
- FixedSizePool - 固定大小对象池
- Logger - 日志记录器
- ObjectsPool - 对象池
- OrderedQueue - 有序队列
- ProgressBar - 进度条
- SafeQueue - 安全队列
- SharedBufferState - 共享缓冲区状态
- SimpleVariant - 简单变体类型
- ThreadPauser - 线程暂停器
- ThreadUtils - 线程工具
- Types - 类型定义
- UdevMonitor - Udev 监视器