-
Notifications
You must be signed in to change notification settings - Fork 1
NET_README
SweerItTer edited this page Feb 21, 2026
·
1 revision
NET 模块提供了基于 Linux Socket 的 TCP 服务器功能,支持命令注册、DMA-BUF 鄄拷贝传输、多客户端并发连接等特性。
NET 模块严格遵循 utilsCore 的设计哲学:
- 清晰的接收-解析-执行-发送流程
- 每个客户端有独立的接收/发送线程
- 线程池异步处理命令
- 发送队列解耦生产者和消费者
- Socket 文件描述符通过 FdWrapper 显式管理
- 连接状态和错误信息对使用者可见
- 支持 DMA-BUF 文件描述符传输(零拷贝)
- FdWrapper 自动管理 socket 生命周期
- 移动语义,禁止拷贝
- 异常安全析构
- 清晰的接口设计
- 详细的日志输出
- 明确的职责划分
- TcpServer: 专注连接管理和数据传输
- CommandHandler: 专注命令注册和分发
- SocketConnection: 专注单个客户端连接处理
- 使用
std::weak_ptr避免所有权问题 - 使用
std::shared_ptr管理共享资源 - 使用移动语义优化性能
- 使用
noexcept标记不抛出异常的函数
TCP 服务器主类,负责服务器生命周期管理和全局调度。
职责:
- 服务器启动/停止
- 客户端连接管理
- 数据发送(单播/广播)
- 命令处理委托
使用示例:
#include "net/tcpServer.h"
int main() {
TcpServerConfig config;
config.port = 8080;
config.maxClients = 64;
// 创建命令处理器
auto commandHandler = std::make_shared<CommandHandler>();
commandHandler->registerCommand("ECHO", [](uint64_t clientId, auto, const std::string& params) {
return params;
});
// 创建服务器并设置命令处理器(注意:必须使用 shared_ptr 管理)
auto server = std::make_shared<TcpServer>(config, commandHandler);
server->start();
while (server->isRunning()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
}Socket 连接管理类,负责单个客户端连接的生命周期。
职责:
- 接收客户端数据
- 解析命令并委托给 TcpServer
- 发送响应数据
- 连接状态管理
- 错误处理
命令处理器,管理命令注册表和分发逻辑。
职责:
- 命令注册/注销
- 命令执行
- 命令元数据管理
数据包类型,支持多种数据传输方式。
数据类型:
- TEXT: 普通文本数据
- DMABUF_FD: DMA-BUF 文件描述符(零拷贝)
- RAW_FD: 裸文件描述符
使用示例:
// 发送文本
DataPacket pkt = DataPacket::createText("Hello World");
server.sendToClient(clientId, pkt);
// 发送 DMA-BUF(零拷贝)
DmaBufferPtr dmabuf = DmaBuffer::create(1920, 1080, DRM_FORMAT_ARGB8888);
DataPacket pkt = DataPacket::createDmaBuf(dmabuf);
server.sendToClient(clientId, pkt);NET 模块支持高度自定义的命令注册机制。
auto commandHandler = std::make_shared<CommandHandler>();
commandHandler->registerCommand("GET", [](uint64_t clientId, const std::string& command, const std::string& params) {
return "OK GET " + params;
});commandHandler->unregisterCommand("GET");Client → SocketConnection → TcpServer → CommandHandler → Callback
(接收数据) (连接管理) (命令分发) (业务逻辑)
commandHandler->registerCommand("GETFILENAME",
[](uint64_t clientId, const std::string& command, const std::string& params) {
std::string filename = params.empty() ? "default.txt" : params;
// 处理文件请求
return "OK " + filename;
},
"Get file content by filename"
);commandHandler->registerCommand("ECHO", [](uint64_t clientId, const std::string& command, const std::string& params) {
return params;
});commandHandler->registerCommand("TIME", [](uint64_t clientId, const std::string& command, const std::string& params) {
auto now = std::chrono::system_clock::now();
auto time = std::chrono::system_clock::to_time_t(now);
return std::string(ctime(&time));
});┌─────────────────────────────────────────────────────────┐
│ TcpServer │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Accept Thread │ │
│ │ - accept() 循环 │ │
│ │ - 为每个客户端创建 SocketConnection │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Client 1 SocketConnection │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Recv Thread │ │ Send Thread │ │ │
│ │ │ - recv() │ │ - send() │ │ │
│ │ │ - 解析命令 │ │ - 队列处理 │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Client 2 SocketConnection │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Recv Thread │ │ Send Thread │ │ │
│ │ │ - recv() │ │ - send() │ │ │
│ │ │ - 解析命令 │ │ - 队列处理 │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ ThreadPool │ │
│ │ Worker 1 Worker 2 Worker 3 Worker 4 │ │
│ │ [命令执行] [命令执行] [命令执行] [命令执行] │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
sequenceDiagram
participant Client as 客户端
participant SC as SocketConnection
participant TS as TcpServer
participant CH as CommandHandler
participant Callback as 命令回调
Client->>SC: 发送命令: GETFILENAME file.txt
SC->>SC: 接收数据
SC->>SC: 解析命令和参数
SC->>TS: executeCommand(clientId, "GETFILENAME", "file.txt")
TS->>CH: executeCommand(clientId, "GETFILENAME", "file.txt")
CH->>CH: 查找命令注册表
CH->>Callback: 调用回调函数
Callback-->>CH: 返回响应: "OK file.txt"
CH-->>TS: 返回响应
TS-->>SC: 返回响应
SC->>SC: 发送响应: "OK file.txt"
SC-->>Client: 发送响应
sequenceDiagram
participant Client1 as 客户端1
participant Client2 as 客户端2
participant AcceptThread as Accept Thread
participant TS as TcpServer
participant SC1 as SocketConnection1
participant SC2 as SocketConnection2
participant ThreadPool as ThreadPool
participant CH as CommandHandler
AcceptThread->>TS: accept() 接受连接
TS->>SC1: 创建 SocketConnection1
TS->>SC2: 创建 SocketConnection2
Note over Client1,Client2: 并发执行,互不干扰
par Client1 发送命令
Client1->>SC1: 发送命令: ECHO hello
SC1->>SC1: 解析命令 (独立缓冲区)
SC1->>ThreadPool: enqueue(命令处理)
and Client2 发送命令
Client2->>SC2: 发送命令: TIME
SC2->>SC2: 解析命令 (独立缓冲区)
SC2->>ThreadPool: enqueue(命令处理)
end
Note over ThreadPool: 线程池并发处理
par 线程池处理 Client1 命令
ThreadPool->>CH: 执行 ECHO 命令
CH->>CH: 查找命令
CH-->>ThreadPool: 返回响应: "hello"
and 线程池处理 Client2 命令
ThreadPool->>CH: 执行 TIME 命令
CH->>CH: 查找命令
CH-->>ThreadPool: 返回响应
end
Note over SC1,SC2: 独立发送队列
par Client1 接收响应
ThreadPool-->>SC1: 响应
SC1->>SC1: 加入发送队列
SC1->>Client1: 发送响应
and Client2 接收响应
ThreadPool-->>SC2: 响应
SC2->>SC2: 加入发送队列
SC2->>Client2: 发送响应
end
Note over Client1,Client2: 无串包,响应准确
| 特性 | 说明 |
|---|---|
| 独立接收线程 | 每个客户端有独立的接收线程,各自维护独立的接收缓冲区 |
| 独立解析 | 命令解析在各自的连接对象中进行,互不干扰 |
| 线程池调度 | 命令处理由线程池调度,支持并发执行 |
| 独立发送队列 | 每个连接有独立的发送队列,避免发送冲突 |
| 无串包 | 每个连接的响应只发送给对应的客户端 |
| 及时性 | 客户端可以同时发送命令,互不影响 |
sequenceDiagram
participant App as 应用程序
participant TS as TcpServer
participant SC as SocketConnection
participant SendThread as Send Thread
participant Client as 客户端
App->>TS: sendToClient(clientId, packet)
TS->>SC: 查找 SocketConnection
SC->>SC: 加入发送队列
SC->>SendThread: notify_one()
SendThread->>SC: 从队列取出数据
SendThread->>Client: sendmsg() 发送数据
SendThread->>SC: 更新发送统计
SC-->>TS: 返回成功
TS-->>App: 返回 true
sequenceDiagram
participant App as 应用程序
participant TS as TcpServer
participant SC as SocketConnection
participant SendThread as Send Thread
participant Client as 客户端
App->>TS: sendToClient(clientId, DataPacket::createDmaBuf(dmabuf))
TS->>SC: 查找 SocketConnection
SC->>SC: 加入发送队列
SC->>SendThread: notify_one()
SendThread->>SC: 从队列取出数据
SendThread->>SC: 检查数据类型 = DMABUF_FD
SendThread->>SendThread: 使用 SCM_RIGHTS 传递 fd
SendThread->>Client: sendmsg() 发送 fd
SendThread->>Client: sendmsg() 发送数据
SC->>SC: 更新发送统计
SC-->>TS: 返回成功
TS-->>App: 返回 true
sequenceDiagram
participant Main as main()
participant TS as TcpServer
participant CH as CommandHandler
participant AcceptThread as Accept Thread
participant SC as SocketConnection
Main->>CH: 创建 CommandHandler
Main->>CH: registerCommand()
Main->>TS: TcpServer(config, &handler)
Main->>TS: start()
TS->>TS: 创建监听 socket
TS->>TS: 设置非阻塞模式
TS->>AcceptThread: 启动 accept 循环
AcceptThread->>AcceptThread: accept() 等待连接
AcceptThread->>TS: accept() 接受连接
TS->>SC: 创建 SocketConnection
TS->>SC: startReceiveLoop()
SC->>SC: 启动接收线程
SC->>SC: 启动发送线程
Note over Main,SC: 服务器运行中...
Main->>TS: stop()
TS->>AcceptThread: 停止 accept 循环
AcceptThread->>AcceptThread: join()
TS->>SC: 断开所有连接
SC->>SC: 停止接收/发送线程
SC->>SC: join()
SC-->>TS: 连接关闭
TS-->>Main: 服务器停止
- 非阻塞 I/O: Socket 设置为非阻塞模式
- 线程池: 复用线程,减少创建开销
- 零拷贝: 使用 SCM_RIGHTS 传递 DMA-BUF fd
- 队列缓冲: 发送队列解耦生产者和消费者
#include "v4l2/cameraController.h"
#include "net/tcpServer.h"
auto camera = std::make_shared<CameraController>(/* config */);
camera->start();
camera.setFrameCallback([&](FramePtr frame){
DmaBufferPtr dmabuf = frame->sharedState(0)->dmabuf_ptr;
server.broadcast(DataPacket::createDmaBuf(dmabuf));
});#include "rga/rgaProcessor.h"
auto& rga = RgaConverter::instance();
FramePtr processed = rga->dosomething();
server.sendToClient(clientId, DataPacket::createDmaBuf(processed));主要配置项:
- port: 监听端口
- maxClients: 最大客户端连接数
- threadPoolMin/Max: 线程池大小
- enableKeepAlive: TCP Keep-Alive 开关
- 所有异常通过 TcpException 抛出
- Socket 错误自动记录到 lastError_
- 连接状态通过 ConnectionState 暴露
-
命令名称大小写敏感
- "GET" 和 "get" 是不同的命令
- 建议使用大写命名约定
-
DMA-BUF 所有权转移
- 使用 createDmaBuf() 后,DmaBufferPtr 的所有权共享(拷贝)给 DataPacket
- 发送完成后,DmaBufferPtr 自动释放
-
线程安全
- TcpServer 的方法是线程安全的
- CommandHandler 的方法是线程安全的
- CommandCallback 在 ThreadPool 中执行,需要线程安全
-
资源清理
- 断开连接时,所有资源自动清理(RAII)
- 不要在回调中持有 SocketConnection 的裸指针
-
所有权管理
- TcpServer 必须使用
shared_ptr管理,因为内部使用shared_from_this()获取实例 - SocketConnection 使用
weak_ptr引用 TcpServer,通过lock()安全访问 - TcpServer 使用
weak_ptr引用 CommandHandler,避免循环引用 - CommandHandler 使用
shared_ptr管理自身生命周期
- TcpServer 必须使用
- asyncThreadPool: 线程池
- SafeQueue: 安全队列
- DmaBuffer: DMA-BUF 管理
- FdWrapper: FD 资源管理
主页
API 文档
DMA 模块
DRM 模块
- DRM 模块总览
- DeviceController - DRM 设备控制器
- DrmLayer - DRM 图层管理
- PlanesCompositor - DRM 平面合成器
- DrmBpp - DRM 格式定义
NET 模块
- NET 模块总览
- TcpServer - TCP 服务器
- SocketConnection - Socket 连接管理
- CommandHandler - 命令处理器
- DataPacket - 数据包
V4L2 模块
- V4L2 模块总览
- CameraController - V4L2 摄像头控制器
- Frame - V4L2 帧数据结构
- FormatTool - V4L2 格式工具
- Exception - V4L2 异常类
V4L2Param 模块
- V4L2Param 模块总览
- ParamControl - 参数控制
- ParamLogger - 参数日志
- ParamProcessor - 参数处理器
RGA 模块
- RGA 模块总览
- RgaConverter - RGA 转换器
- RgaProcessor - RGA 处理器
- FormatTool - RGA 格式工具
MPP 模块
- MPP 模块总览
- EncoderContext - 编码器上下文
- EncoderCore - 编码器核心
- JpegEncoder - JPEG 编码器
- StreamWriter - 流写入器
- MppResourceGuard - MPP 资源守护
- FileTools - 文件工具
- FormatTool - 格式工具
Sys 模块
- Sys 模块总览
- CpuMonitor - CPU 监控器
- MemoryMonitor - 内存监控器
- Base - 基础类
Mouse 模块
- Mouse 模块总览
- Watcher - 鼠标监视器
Utils 模块
- Utils 模块总览
- AsyncThreadPool - 异步线程池
- ConcurrentQueue - 并发队列
- FdWrapper - 文件描述符包装器
- FenceWatcher - 围栏监视器
- FixedSizePool - 固定大小对象池
- Logger - 日志记录器
- ObjectsPool - 对象池
- OrderedQueue - 有序队列
- ProgressBar - 进度条
- SafeQueue - 安全队列
- SharedBufferState - 共享缓冲区状态
- SimpleVariant - 简单变体类型
- ThreadPauser - 线程暂停器
- ThreadUtils - 线程工具
- Types - 类型定义
- UdevMonitor - Udev 监视器