Skip to content

NET_README

SweerItTer edited this page Feb 21, 2026 · 1 revision

NET 模块

概述

NET 模块提供了基于 Linux Socket 的 TCP 服务器功能,支持命令注册、DMA-BUF 鄄拷贝传输、多客户端并发连接等特性。

设计哲学

NET 模块严格遵循 utilsCore 的设计哲学:

1. 数据流优先

  • 清晰的接收-解析-执行-发送流程
  • 每个客户端有独立的接收/发送线程
  • 线程池异步处理命令
  • 发送队列解耦生产者和消费者

2. 显式硬件暴露

  • Socket 文件描述符通过 FdWrapper 显式管理
  • 连接状态和错误信息对使用者可见
  • 支持 DMA-BUF 文件描述符传输(零拷贝)

3. RAII 原则

  • FdWrapper 自动管理 socket 生命周期
  • 移动语义,禁止拷贝
  • 异常安全析构

4. 工程可读性

  • 清晰的接口设计
  • 详细的日志输出
  • 明确的职责划分

5. 模块化架构

  • TcpServer: 专注连接管理和数据传输
  • CommandHandler: 专注命令注册和分发
  • SocketConnection: 专注单个客户端连接处理

6. 现代 C++ 最佳实践

  • 使用 std::weak_ptr 避免所有权问题
  • 使用 std::shared_ptr 管理共享资源
  • 使用移动语义优化性能
  • 使用 noexcept 标记不抛出异常的函数

核心类

TcpServer

TCP 服务器主类,负责服务器生命周期管理和全局调度。

职责:

  • 服务器启动/停止
  • 客户端连接管理
  • 数据发送(单播/广播)
  • 命令处理委托

使用示例:

#include "net/tcpServer.h"

int main() {
    TcpServerConfig config;
    config.port = 8080;
    config.maxClients = 64;

    // 创建命令处理器
    auto commandHandler = std::make_shared<CommandHandler>();
    commandHandler->registerCommand("ECHO", [](uint64_t clientId, auto, const std::string& params) {
        return params;
    });

    // 创建服务器并设置命令处理器(注意:必须使用 shared_ptr 管理)
    auto server = std::make_shared<TcpServer>(config, commandHandler);
    server->start();

    while (server->isRunning()) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    return 0;
}

详见:TcpServer 详细文档

SocketConnection

Socket 连接管理类,负责单个客户端连接的生命周期。

职责:

  • 接收客户端数据
  • 解析命令并委托给 TcpServer
  • 发送响应数据
  • 连接状态管理
  • 错误处理

详见:SocketConnection 详细文档

CommandHandler

命令处理器,管理命令注册表和分发逻辑。

职责:

  • 命令注册/注销
  • 命令执行
  • 命令元数据管理

详见:CommandHandler 详细文档

DataPacket

数据包类型,支持多种数据传输方式。

数据类型:

  • TEXT: 普通文本数据
  • DMABUF_FD: DMA-BUF 文件描述符(零拷贝)
  • RAW_FD: 裸文件描述符

使用示例:

// 发送文本
DataPacket pkt = DataPacket::createText("Hello World");
server.sendToClient(clientId, pkt);

// 发送 DMA-BUF(零拷贝)
DmaBufferPtr dmabuf = DmaBuffer::create(1920, 1080, DRM_FORMAT_ARGB8888);
DataPacket pkt = DataPacket::createDmaBuf(dmabuf);
server.sendToClient(clientId, pkt);

详见:DataPacket 详细文档

命令注册

NET 模块支持高度自定义的命令注册机制。

注册命令

auto commandHandler = std::make_shared<CommandHandler>();

commandHandler->registerCommand("GET", [](uint64_t clientId, const std::string& command, const std::string& params) {
    return "OK GET " + params;
});

注销命令

commandHandler->unregisterCommand("GET");

命令处理流程

Client → SocketConnection → TcpServer → CommandHandler → Callback
         (接收数据)        (连接管理)    (命令分发)    (业务逻辑)

常用命令示例

GETFILENAME

commandHandler->registerCommand("GETFILENAME",
    [](uint64_t clientId, const std::string& command, const std::string& params) {
        std::string filename = params.empty() ? "default.txt" : params;
        // 处理文件请求
        return "OK " + filename;
    },
    "Get file content by filename"
);

ECHO

commandHandler->registerCommand("ECHO", [](uint64_t clientId, const std::string& command, const std::string& params) {
    return params;
});

TIME

commandHandler->registerCommand("TIME", [](uint64_t clientId, const std::string& command, const std::string& params) {
    auto now = std::chrono::system_clock::now();
    auto time = std::chrono::system_clock::to_time_t(now);
    return std::string(ctime(&time));
});

线程模型

┌─────────────────────────────────────────────────────────┐
│                     TcpServer                            │
│  ┌─────────────────────────────────────────────────┐   │
│  │            Accept Thread                         │   │
│  │  - accept() 循环                                 │   │
│  │  - 为每个客户端创建 SocketConnection             │   │
│  └─────────────────────────────────────────────────┘   │
│                                                          │
│  ┌─────────────────────────────────────────────────┐   │
│  │         Client 1 SocketConnection                │   │
│  │  ┌─────────────┐  ┌─────────────┐              │   │
│  │  │ Recv Thread │  │ Send Thread │              │   │
│  │  │ - recv()    │  │ - send()    │              │   │
│  │  │ - 解析命令  │  │ - 队列处理  │              │   │
│  │  └─────────────┘  └─────────────┘              │   │
│  └─────────────────────────────────────────────────┘   │
│                                                          │
│  ┌─────────────────────────────────────────────────┐   │
│  │         Client 2 SocketConnection                │   │
│  │  ┌─────────────┐  ┌─────────────┐              │   │
│  │  │ Recv Thread │  │ Send Thread │              │   │
│  │  │ - recv()    │  │ - send()    │              │   │
│  │  │ - 解析命令  │  │ - 队列处理  │              │   │
│  │  └─────────────┘  └─────────────┘              │   │
│  └─────────────────────────────────────────────────┘   │
│                                                          │
│  ┌─────────────────────────────────────────────────┐   │
│  │            ThreadPool                           │   │
│  │  Worker 1  Worker 2  Worker 3  Worker 4        │   │
│  │  [命令执行]  [命令执行]  [命令执行]  [命令执行]  │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘

流程图

单次连接流程

sequenceDiagram
    participant Client as 客户端
    participant SC as SocketConnection
    participant TS as TcpServer
    participant CH as CommandHandler
    participant Callback as 命令回调

    Client->>SC: 发送命令: GETFILENAME file.txt
    SC->>SC: 接收数据
    SC->>SC: 解析命令和参数
    SC->>TS: executeCommand(clientId, "GETFILENAME", "file.txt")
    TS->>CH: executeCommand(clientId, "GETFILENAME", "file.txt")
    CH->>CH: 查找命令注册表
    CH->>Callback: 调用回调函数
    Callback-->>CH: 返回响应: "OK file.txt"
    CH-->>TS: 返回响应
    TS-->>SC: 返回响应
    SC->>SC: 发送响应: "OK file.txt"
    SC-->>Client: 发送响应
Loading

多客户并发流程

sequenceDiagram
    participant Client1 as 客户端1
    participant Client2 as 客户端2
    participant AcceptThread as Accept Thread
    participant TS as TcpServer
    participant SC1 as SocketConnection1
    participant SC2 as SocketConnection2
    participant ThreadPool as ThreadPool
    participant CH as CommandHandler

    AcceptThread->>TS: accept() 接受连接
    TS->>SC1: 创建 SocketConnection1
    TS->>SC2: 创建 SocketConnection2

    Note over Client1,Client2: 并发执行,互不干扰

    par Client1 发送命令
        Client1->>SC1: 发送命令: ECHO hello
        SC1->>SC1: 解析命令 (独立缓冲区)
        SC1->>ThreadPool: enqueue(命令处理)
    and Client2 发送命令
        Client2->>SC2: 发送命令: TIME
        SC2->>SC2: 解析命令 (独立缓冲区)
        SC2->>ThreadPool: enqueue(命令处理)
    end

    Note over ThreadPool: 线程池并发处理

    par 线程池处理 Client1 命令
        ThreadPool->>CH: 执行 ECHO 命令
        CH->>CH: 查找命令
        CH-->>ThreadPool: 返回响应: "hello"
    and 线程池处理 Client2 命令
        ThreadPool->>CH: 执行 TIME 命令
        CH->>CH: 查找命令
        CH-->>ThreadPool: 返回响应
    end

    Note over SC1,SC2: 独立发送队列

    par Client1 接收响应
        ThreadPool-->>SC1: 响应
        SC1->>SC1: 加入发送队列
        SC1->>Client1: 发送响应
    and Client2 接收响应
        ThreadPool-->>SC2: 响应
        SC2->>SC2: 加入发送队列
        SC2->>Client2: 发送响应
    end

    Note over Client1,Client2: 无串包,响应准确
Loading

并发特性说明

特性 说明
独立接收线程 每个客户端有独立的接收线程,各自维护独立的接收缓冲区
独立解析 命令解析在各自的连接对象中进行,互不干扰
线程池调度 命令处理由线程池调度,支持并发执行
独立发送队列 每个连接有独立的发送队列,避免发送冲突
无串包 每个连接的响应只发送给对应的客户端
及时性 客户端可以同时发送命令,互不影响

数据发送流程

sequenceDiagram
    participant App as 应用程序
    participant TS as TcpServer
    participant SC as SocketConnection
    participant SendThread as Send Thread
    participant Client as 客户端

    App->>TS: sendToClient(clientId, packet)
    TS->>SC: 查找 SocketConnection
    SC->>SC: 加入发送队列
    SC->>SendThread: notify_one()
    SendThread->>SC: 从队列取出数据
    SendThread->>Client: sendmsg() 发送数据
    SendThread->>SC: 更新发送统计
    SC-->>TS: 返回成功
    TS-->>App: 返回 true
Loading

DMA-BUF 零拷贝流程

sequenceDiagram
    participant App as 应用程序
    participant TS as TcpServer
    participant SC as SocketConnection
    participant SendThread as Send Thread
    participant Client as 客户端

    App->>TS: sendToClient(clientId, DataPacket::createDmaBuf(dmabuf))
    TS->>SC: 查找 SocketConnection
    SC->>SC: 加入发送队列
    SC->>SendThread: notify_one()
    SendThread->>SC: 从队列取出数据
    SendThread->>SC: 检查数据类型 = DMABUF_FD
    SendThread->>SendThread: 使用 SCM_RIGHTS 传递 fd
    SendThread->>Client: sendmsg() 发送 fd
    SendThread->>Client: sendmsg() 发送数据
    SC->>SC: 更新发送统计
    SC-->>TS: 返回成功
    TS-->>App: 返回 true
Loading

服务器生命周期流程

sequenceDiagram
    participant Main as main()
    participant TS as TcpServer
    participant CH as CommandHandler
    participant AcceptThread as Accept Thread
    participant SC as SocketConnection

    Main->>CH: 创建 CommandHandler
    Main->>CH: registerCommand()
    Main->>TS: TcpServer(config, &handler)
    Main->>TS: start()
    TS->>TS: 创建监听 socket
    TS->>TS: 设置非阻塞模式
    TS->>AcceptThread: 启动 accept 循环

    AcceptThread->>AcceptThread: accept() 等待连接
    AcceptThread->>TS: accept() 接受连接
    TS->>SC: 创建 SocketConnection
    TS->>SC: startReceiveLoop()
    SC->>SC: 启动接收线程
    SC->>SC: 启动发送线程

    Note over Main,SC: 服务器运行中...

    Main->>TS: stop()
    TS->>AcceptThread: 停止 accept 循环
    AcceptThread->>AcceptThread: join()
    TS->>SC: 断开所有连接
    SC->>SC: 停止接收/发送线程
    SC->>SC: join()
    SC-->>TS: 连接关闭
    TS-->>Main: 服务器停止
Loading

性能优化

  1. 非阻塞 I/O: Socket 设置为非阻塞模式
  2. 线程池: 复用线程,减少创建开销
  3. 零拷贝: 使用 SCM_RIGHTS 传递 DMA-BUF fd
  4. 队列缓冲: 发送队列解耦生产者和消费者

与其他模块集成

与 V4L2 集成

#include "v4l2/cameraController.h"
#include "net/tcpServer.h"

auto camera = std::make_shared<CameraController>(/* config */);
camera->start();

camera.setFrameCallback([&](FramePtr frame){
    DmaBufferPtr dmabuf = frame->sharedState(0)->dmabuf_ptr;
    server.broadcast(DataPacket::createDmaBuf(dmabuf));
});

与 RGA 集成

#include "rga/rgaProcessor.h"

auto& rga = RgaConverter::instance();
FramePtr processed = rga->dosomething();

server.sendToClient(clientId, DataPacket::createDmaBuf(processed));

配置选项

详见:TcpServerConfig

主要配置项:

  • port: 监听端口
  • maxClients: 最大客户端连接数
  • threadPoolMin/Max: 线程池大小
  • enableKeepAlive: TCP Keep-Alive 开关

错误处理

  • 所有异常通过 TcpException 抛出
  • Socket 错误自动记录到 lastError_
  • 连接状态通过 ConnectionState 暴露

注意事项

  1. 命令名称大小写敏感

    • "GET" 和 "get" 是不同的命令
    • 建议使用大写命名约定
  2. DMA-BUF 所有权转移

    • 使用 createDmaBuf() 后,DmaBufferPtr 的所有权共享(拷贝)给 DataPacket
    • 发送完成后,DmaBufferPtr 自动释放
  3. 线程安全

    • TcpServer 的方法是线程安全的
    • CommandHandler 的方法是线程安全的
    • CommandCallback 在 ThreadPool 中执行,需要线程安全
  4. 资源清理

    • 断开连接时,所有资源自动清理(RAII)
    • 不要在回调中持有 SocketConnection 的裸指针
  5. 所有权管理

    • TcpServer 必须使用 shared_ptr 管理,因为内部使用 shared_from_this() 获取实例
    • SocketConnection 使用 weak_ptr 引用 TcpServer,通过 lock() 安全访问
    • TcpServer 使用 weak_ptr 引用 CommandHandler,避免循环引用
    • CommandHandler 使用 shared_ptr 管理自身生命周期

相关模块

  • asyncThreadPool: 线程池
  • SafeQueue: 安全队列
  • DmaBuffer: DMA-BUF 管理
  • FdWrapper: FD 资源管理

示例代码

参考文档

主页

API 文档

DMA 模块

DRM 模块

NET 模块

V4L2 模块

V4L2Param 模块

RGA 模块

MPP 模块

Sys 模块

Mouse 模块

Utils 模块

Clone this wiki locally