项目类型: 大规模图神经网络训练系统
核心技术: Python + C++ + io_uring
应用场景: 超大图(超出单机内存)的 GNN 训练(但由于设备限制,目前只用小规模图测试)
CurvaCore-GNN 采用 三阶段离线-在线流水线 架构:
Phase 1: 图拓扑重构 Phase 2: 采样与打包 Phase 3: 异步I/O训练
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ stage1_rewiring │ ---> │ stage2_packing │ ---> │ stage3_training │
│ (Python) │ │ (Python+C++) │ │ (C++/Python) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
离线预处理 离线数据优化 在线训练执行
输入: 原始图拓扑 输入: Phase1输出 输入: Phase2输出
输出: 重构图结构 输出: 物理打包格式 输出: 训练
位置: ./stage1_rewiring.py
关键函数:
| 函数名 | 功能说明 |
|---|---|
prepare_ogb_data() |
加载 Cora/OGB 数据集,导出初始二进制文件 |
load_graph() |
从 CSR 二进制格式加载图结构入内存 |
apply_degree_cap() |
对图的最大度数进行截断(稳定邻接表大小) |
save_graph() |
将处理后的图转换为 CSR 二进制导出 |
流程:
# 步骤 1: 加载数据 → 转 CSR → 导出二进制
prepare_ogb_data()
# offset.bin, edges.bin, raw_features.bin
# 步骤 2: 读取原始图
edge_index, num_nodes = load_graph("offset.bin", "edges.bin")
# 步骤 3: 运行 SDRF 图重构算法 (来自 laser-release)
new_edge_index, _ = sdrf(data, loops=5, remove_edges=False, is_undirected=True)
# 步骤 4: 度数截断(为 RingSampler 稳定化)
capped_edge_index = apply_degree_cap(new_edge_index, num_nodes, max_degree=64)
# 步骤 5: 导出重构结果
save_graph(capped_edge_index, num_nodes, "rewired_indptr.bin", "rewired_indices.bin")关键依赖:
laser-release/laser/rewiring/sdrf/sdrf.py- SDRF 算法实现- PyTorch Geometric - 图数据结构
- OGB - 数据集下载
输入文件: 无(自动下载 Cora)
输出文件:
offset.bin- 图索引指针 (uint64)edges.bin- 图边列表 (uint64)raw_features.bin- 原始节点特征 (float32)rewired_indptr.bin- 重构后索引指针rewired_indices.bin- 重构后边列表
位置: ./stage2_packing.py
关键函数:
| 函数名 | 功能说明 |
|---|---|
convert_phase1_to_offgs_dataset() |
将二进制格式转换为 DiskGNN 标准格式 |
run_diskgnn_offline_sampling() |
执行离线采样,统计节点访问热度 |
run_diskgnn_feat_packing() |
MinHash 特征聚类与物理重排 |
extract_to_ringsampler_format() |
提取为 RingSampler 优化格式 |
流程:
# 步骤 1: 组织数据为 DiskGNN 标准格式
convert_phase1_to_offgs_dataset(
"rewired_indptr.bin", "rewired_indices.bin", "raw_features.bin",
"./temp_offgs_dataset"
)
# 产生: graph.pth (GraphBolt), features.bin, split_idx.pth, conf.json
# 步骤 2: 离线采样 (fanout=10,10,两层邻接)
run_diskgnn_offline_sampling(
"./temp_offgs_dataset", "./temp_sampling_output",
batch_size=1024, fanout="10,10"
)
# 统计每个节点被采样的频率
# 步骤 3: 特征打包与重排 (基于 MinHash 降低 I/O 碎片)
run_diskgnn_feat_packing(
"./temp_offgs_dataset", "./temp_sampling_output",
fanout="10,10", cache_size=10*1024*1024
)
# 生成: meta_node_popularity.pt (热度排序)
# 步骤 4: 提取为 RingSampler 格式(4KB 对齐)
extract_to_ringsampler_format(
"./temp_offgs_dataset", "./temp_sampling_output", "10,10",
"offset.bin", "edges.bin", "features.bin"
)关键依赖:
DiskGNN/python/offgs- OffgsDataset 数据加载器DiskGNN/examples/sampling.py- DGL/GraphBolt 采样引擎DiskGNN/examples/feat_packing.py- MinHash 打包与重排DiskGNN/src/*- C++ 底层库
输入文件:
offset.bin,edges.bin,raw_features.bin
输出文件:
offset.bin- 重排后的 CSR 索引 (uint64)edges.bin- 重排后的邻接表 (uint32)features.bin- 按热度重排的特征(4KB 对齐)
位置: ./stage3_training.py
关键函数:
| 函数名 | 功能说明 |
|---|---|
io_producer() |
后台 I/O 线程:采样 + 读特征 |
RingEngine.sample_batch() |
C++ 端异步拓扑采样 |
RingEngine.read_features() |
C++ 端零拷贝特征读取(O_DIRECT) |
流程:
# 步骤 1: 初始化异步 I/O 引擎
engine = stage3_engine.RingEngine(
"features.bin", "edges.bin", "offset.bin",
num_features=128, q_depth=128 # io_uring 队列深度
)
# 步骤 2: 后台 I/O 线程生产者
def io_producer():
for epoch in range(num_epochs):
for batch_idx in range(num_batches_per_epoch):
# 随机选择 target nodes
target_nodes = np.random.randint(0, num_nodes, size=batch_size)
# 异步拓扑采样(RingSampler 嵌入)
unique_nodes = engine.sample_batch(target_nodes) # C++
# 零拷贝异步特征读取(O_DIRECT)
feat_numpy, offsets = engine.read_features(unique_nodes) # C++
# 推入预取队列供 GPU 消费
prefetch_queue.put((feat_numpy, offsets, unique_nodes))
# 步骤 3: 主线程 GPU 消费者(双缓冲流水线)
for epoch in range(num_epochs):
for batch_idx in range(num_batches_per_epoch):
# 等待 I/O 数据
feat_numpy, offsets, unique_nodes = prefetch_queue.get()
# 在 CPU 上快速裁剪掉 4KB 对齐填充
clean_feat = feat_numpy[row_indices, col_indices]
# 只传输紧凑特征给 GPU
feat_tensor = torch.from_numpy(clean_feat).to(device)
# 由于这部分不是项目重点,所以用极简模型做 forward/backward
out = model(feat_tensor)
loss = out.sum()
loss.backward()
optimizer.step()关键依赖:
C++ 编译扩展(编译自stage3_engine.cpp`)RingSampler/src/*.c- io_uring 采样核心
输入文件:
offset.bin,edges.bin,features.bintemp_offgs_dataset/conf.json
输出:
- 训练日志(时间统计)
DiskGNN/
├── python/
│ ├── offgs/
│ │ ├── __init__.py
│ │ ├── dataset.py # OffgsDataset 类:加载二进制组织的图数据
│ │ │ └── class OffgsDataset:
│ │ │ ├── __init__(path) # 加载 conf.json, graph.pth
│ │ │ ├── get_data() # 返回 (features, edge_index, labels)
│ │ │ └── __getitem__(idx) # 按 mini-batch 获取数据
│ │ └── utils.py # 辅助:数据加载工具函数
│ └── setup.py # offgs 模块的安装脚本
│
├── examples/
│ ├── sampling.py # run(args, dataset) - DGL 离线采样
│ │ └── class Args:
│ │ ├── dataset # 数据集名称
│ │ ├── fanout # "10,10" 采样层数
│ │ ├── batchsize # mini-batch 大小
│ │ ├── store_path # 采样输出路径
│ │ └── num_workers # 线程数量
│ │
│ ├── feat_packing.py # run(dataset, args) - MinHash 打包与重排
│ │ └── 使用 DGL GraphBolt API
│ │ ├── 计算节点热度(采样频率)
│ │ ├── MinHash 聚类相似节点
│ │ ├── 产生新 ID 映射表
│ │ └── 重排特征文件(4KB 对齐)
│ │
│ └── utils.py # 数据加载、图格式转换等工具
│ ├── load_dataset()
│ ├── create_loader()
│ └── check_device()
│
└── src/ # C++ 核心库
laser-release/
├── laser/
│ └── rewiring/
│ └── sdrf/ #SDRF (Steepest Descent Ricci Flow)
│ ├── sdrf.py
│ │ ├── 输入: PyG Data 对象 (edge_index, x)
│ │ ├── 输入: loops=5 迭代次数
│ │ ├── 输入: remove_edges=False 是否删边
│ │ ├── 输出: (new_edge_index, edge_weights)
│ │ └── 原理: 基于 Ricci 曲率的图重构
│ │
│ └── __init__.py
| └──transform.py
│
├── setup.py
└──
RingSampler/src/
├── core.h / core.c # io_uring 初始化、队列管理
│ ├── struct io_uring ring # io_uring 上下文
│ ├── io_uring_queue_init() # 初始化队列
│ ├── io_uring_submit() # 提交 I/O 请求
│ └── io_uring_wait_cqe() # 等待完成事件
│
├── layer_sampling.c / .h # K-hop 邻接表采样
│ ├── sample_one_layer() # 从图中采样一跳邻接
│ └── fanout 参数化
│
├── random_selection.c / .h # 随机选择(ReservoirSampling)
│ └── random_select_nodes() # 无偏采样
│
├── remove_duplicate.c / .h # 去重优化
│ └── deduplicate_set() # HashSet 基去重
│
└── utils.c / .h # 工具函数
├── index_lookup() # 数据结构查询
└── memory_align() # 4KB 对齐辅助
编译命令:
python setup_stage3.py build_ext --inplace
# 产生: stage3_engine.cpython-310-x86_64-linux-gnu.so,可能有一些重复定义,用CFLAGS=‘-fcommon’即可关键设计:
- io_uring 双队列: 分别用于特征读(O_DIRECT)和拓扑采样
- 零拷贝接口: 直接返回 NumPy 数组指针,避免数据复制
- 4KB 对齐: 特征文件按 4KB 页大小对齐,提高 SSD I/O 效率
Phase 1: 拓扑重构
┌──────────────────┐
│ Cora 数据集 │
└────────┬─────────┘
│ (加载)
▼
┌──────────────────┐
│ edge_index │
│ x (features) │
└────────┬─────────┘
│
│ (SDRF 算法)
▼
┌──────────────────────┐
│ new_edge_index │
│ (拓扑重构后) │
└────────┬─────────────┘
│ (度数截断)
▼
┌──────────────────────┐
│ rewired_*.bin │
│ (二进制 CSR 格式) │
└──────────────────────┘
Phase 2: 采样与打包
┌──────────────────────┐
│ rewired_*.bin │ (Phase 1 输出)
│ raw_features.bin │
└────────┬─────────────┘
│
│ (格式转换)
▼
┌──────────────────────┐
│ OffgsDataset │
│ (GraphBolt 格式) │
└────────┬─────────────┘
│
│ (DGL 离线采样)
▼
┌──────────────────────┐
│ 节点热度统计 │
│ meta_node_popularity │
└────────┬─────────────┘
│
│ (MinHash 打包)
▼
┌──────────────────────────┐
│ 重排特征文件 │
│ (4KB 对齐) │
│ ID 映射表 │
└────────┬─────────────────┘
│
▼
┌──────────────────────────┐
│ offset.bin (uint64) │
│ edges.bin (uint32) │
│ features.bin (4KB对齐) │
└──────────────────────────┘
Phase 3: 异步 I/O 训练
┌──────────────────────────┐
│ offset.bin, edges.bin │ (Phase 2 输出)
│ features.bin │
└────────┬─────────────────┘
│
│ (RingEngine 初始化)
▼
┌──────────────────────────┐
│ io_uring 队列 (双队列) │
│ - 特征读取队列 (O_DIRECT)│
│ - 拓扑采样队列 │
└────────┬─────────────────┘
│
┌────┴───────────┐
│ │
▼ (后台) ▼ (主线程)
┌─────────────┐ ┌──────────────┐
│ I/O 线程 │ │ GPU 训练 │
│ - 采样 │ │ - Forward │
│ - 读特征 │ │ - Backward │
└──────┬──────┘ └──────────────┘
│
(预取队列)
│
▼
模型收敛
| 参数名 | 阶段 | 含义 | 默认值 | 备注 |
|---|---|---|---|---|
loops |
Phase 1 | SDRF 迭代次数 | 5 | 越多图优化越好,耗时越长 |
max_degree |
Phase 1 | 度数截断上限 | 64 | 稳定 RingSampler 内存占用 |
fanout |
Phase 2 | 采样配置 "layer1,layer2,..." | "10,10" | 2层邻接,每层采10个 |
batch_size |
Phase 2/3 | mini-batch 大小 | 1024 | 越大内存压力越大 |
q_depth |
Phase 3 | io_uring 队列深度 | 128 | 越大 I/O 吞吐越高 |
feat_cache_size |
Phase 2 | 特征缓存大小 | 10MB | 用于 MinHash 聚类 |
# 前置:编译 C++ 扩展
python setup_stage3.py build_ext --inplace
# Phase 1:图拓扑重构(~时间取决于图大小)
python stage1_rewiring.py
# 输入: 无(自动下载)
# 输出: rewired_indptr.bin, rewired_indices.bin, raw_features.bin
# Phase 2:采样与特征打包
python stage2_packing.py
# 输入: 上一步的三个文件
# 输出: offset.bin, edges.bin, features.bin (优化后)
# Phase 3:异步 I/O 训练(持续,3 epochs × 10 batches)
python stage3_training.py
# 输入: phase2 的三个文件 + conf.json
# 输出: 时间统计、训练日志最后更新: 2026-04-11