Skip to content

Kindin-X/CurvaCore

Repository files navigation

CurvaCore-GNN 项目架构指南

项目类型: 大规模图神经网络训练系统
核心技术: Python + C++ + io_uring
应用场景: 超大图(超出单机内存)的 GNN 训练(但由于设备限制,目前只用小规模图测试)


总体架构

CurvaCore-GNN 采用 三阶段离线-在线流水线 架构:

Phase 1: 图拓扑重构        Phase 2: 采样与打包          Phase 3: 异步I/O训练
┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│ stage1_rewiring │ ---> │ stage2_packing   │ ---> │ stage3_training │
│   (Python)      │      │   (Python+C++)   │      │  (C++/Python)   │
└─────────────────┘      └──────────────────┘      └─────────────────┘
   离线预处理           离线数据优化              在线训练执行
   
输入: 原始图拓扑      输入: Phase1输出       输入: Phase2输出
输出: 重构图结构      输出: 物理打包格式     输出: 训练

文件详细说明

根目录核心文件

1. stage1_rewiring.py - Phase 1:图拓扑重构

位置: ./stage1_rewiring.py
关键函数:

函数名 功能说明
prepare_ogb_data() 加载 Cora/OGB 数据集,导出初始二进制文件
load_graph() 从 CSR 二进制格式加载图结构入内存
apply_degree_cap() 对图的最大度数进行截断(稳定邻接表大小)
save_graph() 将处理后的图转换为 CSR 二进制导出

流程:

# 步骤 1: 加载数据 → 转 CSR → 导出二进制
prepare_ogb_data()  
# offset.bin, edges.bin, raw_features.bin

# 步骤 2: 读取原始图
edge_index, num_nodes = load_graph("offset.bin", "edges.bin")

# 步骤 3: 运行 SDRF 图重构算法 (来自 laser-release)
new_edge_index, _ = sdrf(data, loops=5, remove_edges=False, is_undirected=True)

# 步骤 4: 度数截断(为 RingSampler 稳定化)
capped_edge_index = apply_degree_cap(new_edge_index, num_nodes, max_degree=64)

# 步骤 5: 导出重构结果
save_graph(capped_edge_index, num_nodes, "rewired_indptr.bin", "rewired_indices.bin")

关键依赖:

  • laser-release/laser/rewiring/sdrf/sdrf.py - SDRF 算法实现
  • PyTorch Geometric - 图数据结构
  • OGB - 数据集下载

输入文件: 无(自动下载 Cora)
输出文件:

  • offset.bin - 图索引指针 (uint64)
  • edges.bin - 图边列表 (uint64)
  • raw_features.bin - 原始节点特征 (float32)
  • rewired_indptr.bin - 重构后索引指针
  • rewired_indices.bin - 重构后边列表

2. stage2_packing.py - Phase 2:采样与特征打包

位置: ./stage2_packing.py
关键函数:

函数名 功能说明
convert_phase1_to_offgs_dataset() 将二进制格式转换为 DiskGNN 标准格式
run_diskgnn_offline_sampling() 执行离线采样,统计节点访问热度
run_diskgnn_feat_packing() MinHash 特征聚类与物理重排
extract_to_ringsampler_format() 提取为 RingSampler 优化格式

流程:

# 步骤 1: 组织数据为 DiskGNN 标准格式
convert_phase1_to_offgs_dataset(
    "rewired_indptr.bin", "rewired_indices.bin", "raw_features.bin",
    "./temp_offgs_dataset"
)
# 产生: graph.pth (GraphBolt), features.bin, split_idx.pth, conf.json

# 步骤 2: 离线采样 (fanout=10,10,两层邻接)
run_diskgnn_offline_sampling(
    "./temp_offgs_dataset", "./temp_sampling_output", 
    batch_size=1024, fanout="10,10"
)
# 统计每个节点被采样的频率

# 步骤 3: 特征打包与重排 (基于 MinHash 降低 I/O 碎片)
run_diskgnn_feat_packing(
    "./temp_offgs_dataset", "./temp_sampling_output",
    fanout="10,10", cache_size=10*1024*1024
)
# 生成: meta_node_popularity.pt (热度排序)

# 步骤 4: 提取为 RingSampler 格式(4KB 对齐)
extract_to_ringsampler_format(
    "./temp_offgs_dataset", "./temp_sampling_output", "10,10",
    "offset.bin", "edges.bin", "features.bin"
)

关键依赖:

  • DiskGNN/python/offgs - OffgsDataset 数据加载器
  • DiskGNN/examples/sampling.py - DGL/GraphBolt 采样引擎
  • DiskGNN/examples/feat_packing.py - MinHash 打包与重排
  • DiskGNN/src/* - C++ 底层库

输入文件:

  • offset.bin, edges.bin, raw_features.bin

输出文件:

  • offset.bin - 重排后的 CSR 索引 (uint64)
  • edges.bin - 重排后的邻接表 (uint32)
  • features.bin - 按热度重排的特征(4KB 对齐)

3. stage3_training.py - Phase 3:异步 I/O 训练

位置: ./stage3_training.py
关键函数:

函数名 功能说明
io_producer() 后台 I/O 线程:采样 + 读特征
RingEngine.sample_batch() C++ 端异步拓扑采样
RingEngine.read_features() C++ 端零拷贝特征读取(O_DIRECT)

流程:

# 步骤 1: 初始化异步 I/O 引擎
engine = stage3_engine.RingEngine(
    "features.bin", "edges.bin", "offset.bin",
    num_features=128, q_depth=128  # io_uring 队列深度
)

# 步骤 2: 后台 I/O 线程生产者
def io_producer():
    for epoch in range(num_epochs):
        for batch_idx in range(num_batches_per_epoch):
            # 随机选择 target nodes
            target_nodes = np.random.randint(0, num_nodes, size=batch_size)
            
            # 异步拓扑采样(RingSampler 嵌入)
            unique_nodes = engine.sample_batch(target_nodes)  # C++
            
            # 零拷贝异步特征读取(O_DIRECT)
            feat_numpy, offsets = engine.read_features(unique_nodes)  # C++
            
            # 推入预取队列供 GPU 消费
            prefetch_queue.put((feat_numpy, offsets, unique_nodes))

# 步骤 3: 主线程 GPU 消费者(双缓冲流水线)
for epoch in range(num_epochs):
    for batch_idx in range(num_batches_per_epoch):
        # 等待 I/O 数据
        feat_numpy, offsets, unique_nodes = prefetch_queue.get()
        
        # 在 CPU 上快速裁剪掉 4KB 对齐填充
        clean_feat = feat_numpy[row_indices, col_indices]
        
        # 只传输紧凑特征给 GPU
        feat_tensor = torch.from_numpy(clean_feat).to(device)
        
        # 由于这部分不是项目重点,所以用极简模型做 forward/backward
        out = model(feat_tensor)
        loss = out.sum()
        loss.backward()
        optimizer.step()

关键依赖:

  • C++ 编译扩展(编译自 stage3_engine.cpp`)
  • RingSampler/src/*.c - io_uring 采样核心

输入文件:

  • offset.bin, edges.bin, features.bin
  • temp_offgs_dataset/conf.json

输出:

  • 训练日志(时间统计)

其他目录结构

DiskGNN/ - 存储与 I/O 优化框架

DiskGNN/
├── python/
│   ├── offgs/
│   │   ├── __init__.py
│   │   ├── dataset.py           # OffgsDataset 类:加载二进制组织的图数据
│   │   │   └── class OffgsDataset:
│   │   │       ├── __init__(path)      # 加载 conf.json, graph.pth
│   │   │       ├── get_data()          # 返回 (features, edge_index, labels)
│   │   │       └── __getitem__(idx)    # 按 mini-batch 获取数据
│   │   └── utils.py             # 辅助:数据加载工具函数
│   └── setup.py                 # offgs 模块的安装脚本
│
├── examples/
│   ├── sampling.py              # run(args, dataset) - DGL 离线采样
│   │   └── class Args:
│   │       ├── dataset          # 数据集名称
│   │       ├── fanout           # "10,10" 采样层数
│   │       ├── batchsize        # mini-batch 大小
│   │       ├── store_path       # 采样输出路径
│   │       └── num_workers      # 线程数量
│   │
│   ├── feat_packing.py          # run(dataset, args) - MinHash 打包与重排
│   │   └── 使用 DGL GraphBolt API
│   │       ├── 计算节点热度(采样频率)
│   │       ├── MinHash 聚类相似节点
│   │       ├── 产生新 ID 映射表
│   │       └── 重排特征文件(4KB 对齐)
│   │
│   └── utils.py                 # 数据加载、图格式转换等工具
│       ├── load_dataset()
│       ├── create_loader()
│       └── check_device()
│
└── src/                         # C++ 核心库

laser-release/ - 图拓扑优化算法

laser-release/
├── laser/
│   └── rewiring/
│       └── sdrf/                #SDRF (Steepest Descent Ricci Flow)
│           ├── sdrf.py          
│           │   ├── 输入: PyG Data 对象 (edge_index, x)
│           │   ├── 输入: loops=5 迭代次数
│           │   ├── 输入: remove_edges=False 是否删边
│           │   ├── 输出: (new_edge_index, edge_weights)
│           │   └── 原理: 基于 Ricci 曲率的图重构
│           │
│           └── __init__.py
|        └──transform.py
│
├── setup.py                    
└── 

RingSampler/src/ - 高性能拓扑采样核心

RingSampler/src/
├── core.h / core.c              # io_uring 初始化、队列管理
│   ├── struct io_uring ring     # io_uring 上下文
│   ├── io_uring_queue_init()    # 初始化队列
│   ├── io_uring_submit()        # 提交 I/O 请求
│   └── io_uring_wait_cqe()      # 等待完成事件
│
├── layer_sampling.c / .h        # K-hop 邻接表采样
│   ├── sample_one_layer()       # 从图中采样一跳邻接
│   └── fanout 参数化
│
├── random_selection.c / .h      # 随机选择(ReservoirSampling)
│   └── random_select_nodes()    # 无偏采样
│
├── remove_duplicate.c / .h      # 去重优化
│   └── deduplicate_set()        # HashSet 基去重
│
└── utils.c / .h                 # 工具函数
    ├── index_lookup()           # 数据结构查询
    └── memory_align()           # 4KB 对齐辅助

编译相关文件

编译命令:

python setup_stage3.py build_ext --inplace
# 产生: stage3_engine.cpython-310-x86_64-linux-gnu.so,可能有一些重复定义,用CFLAGS=‘-fcommon’即可

stage3_engine.cpp - Python/C++ 桥接与 io_uring 引擎

关键设计:

  • io_uring 双队列: 分别用于特征读(O_DIRECT)和拓扑采样
  • 零拷贝接口: 直接返回 NumPy 数组指针,避免数据复制
  • 4KB 对齐: 特征文件按 4KB 页大小对齐,提高 SSD I/O 效率

数据流图

Phase 1: 拓扑重构
┌──────────────────┐
│ Cora 数据集      │
└────────┬─────────┘
         │ (加载)
         ▼
┌──────────────────┐
│ edge_index       │
│ x (features)     │
└────────┬─────────┘
         │
         │ (SDRF 算法)
         ▼
┌──────────────────────┐
│ new_edge_index       │
│ (拓扑重构后)         │
└────────┬─────────────┘
         │ (度数截断)
         ▼
┌──────────────────────┐
│ rewired_*.bin        │
│ (二进制 CSR 格式)    │
└──────────────────────┘

Phase 2: 采样与打包
┌──────────────────────┐
│ rewired_*.bin        │ (Phase 1 输出)
│ raw_features.bin     │
└────────┬─────────────┘
         │
         │ (格式转换)
         ▼
┌──────────────────────┐
│ OffgsDataset         │
│ (GraphBolt 格式)     │
└────────┬─────────────┘
         │
         │ (DGL 离线采样)
         ▼
┌──────────────────────┐
│ 节点热度统计         │
│ meta_node_popularity │
└────────┬─────────────┘
         │
         │ (MinHash 打包)
         ▼
┌──────────────────────────┐
│ 重排特征文件             │
│ (4KB 对齐)               │
│ ID 映射表                │
└────────┬─────────────────┘
         │
         ▼
┌──────────────────────────┐
│ offset.bin (uint64)      │
│ edges.bin (uint32)       │
│ features.bin (4KB对齐)   │
└──────────────────────────┘

Phase 3: 异步 I/O 训练
┌──────────────────────────┐
│ offset.bin, edges.bin    │ (Phase 2 输出)
│ features.bin             │
└────────┬─────────────────┘
         │
         │ (RingEngine 初始化)
         ▼
┌──────────────────────────┐
│ io_uring 队列 (双队列)   │
│ - 特征读取队列 (O_DIRECT)│
│ - 拓扑采样队列           │
└────────┬─────────────────┘
         │
    ┌────┴───────────┐
    │                │
    ▼ (后台)         ▼ (主线程)
┌─────────────┐  ┌──────────────┐
│ I/O 线程    │  │ GPU 训练     │
│ - 采样      │  │ - Forward    │
│ - 读特征    │  │ - Backward   │
└──────┬──────┘  └──────────────┘
       │
    (预取队列)
       │
      ▼
   模型收敛

核心参数说明

参数名 阶段 含义 默认值 备注
loops Phase 1 SDRF 迭代次数 5 越多图优化越好,耗时越长
max_degree Phase 1 度数截断上限 64 稳定 RingSampler 内存占用
fanout Phase 2 采样配置 "layer1,layer2,..." "10,10" 2层邻接,每层采10个
batch_size Phase 2/3 mini-batch 大小 1024 越大内存压力越大
q_depth Phase 3 io_uring 队列深度 128 越大 I/O 吞吐越高
feat_cache_size Phase 2 特征缓存大小 10MB 用于 MinHash 聚类

完整执行流程示例

# 前置:编译 C++ 扩展
python setup_stage3.py build_ext --inplace

# Phase 1:图拓扑重构(~时间取决于图大小)
python stage1_rewiring.py
# 输入: 无(自动下载)
# 输出: rewired_indptr.bin, rewired_indices.bin, raw_features.bin

# Phase 2:采样与特征打包
python stage2_packing.py
# 输入: 上一步的三个文件
# 输出: offset.bin, edges.bin, features.bin (优化后)

# Phase 3:异步 I/O 训练(持续,3 epochs × 10 batches)
python stage3_training.py
# 输入: phase2 的三个文件 + conf.json
# 输出: 时间统计、训练日志

最后更新: 2026-04-11

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors