Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ yuanrong = [
"openyuanrong-datasystem"
]
mooncake = [
"mooncake-transfer-engine==0.3.10.post2"
"mooncake-transfer-engine==0.3.10.post2",
"cuda-python",
]

# If you need to mimic `package_dir={'': '.'}`:
Expand Down
42 changes: 35 additions & 7 deletions scripts/performance_test/perftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,17 @@ class TQClientActor:
def __init__(self, config: dict[str, Any], use_complex_case: bool = False):
self.config = config
self.use_complex_case = use_complex_case
mooncake_cfg = config.get("backend", {}).get("MooncakeStore", {})
self.use_gdr = bool(mooncake_cfg.get("use_gdr", False))
self.gdr_device = "cuda:0"
self.test_data = None
self.total_data_size_gb = 0.0
self.test_keys = None

def initialize(self) -> None:
"""Initialize transfer_queue with the config."""
if self.use_gdr:
torch.cuda.set_device(self.gdr_device)
tq.init(OmegaConf.create(self.config))

def create_test_case(
Expand Down Expand Up @@ -249,11 +254,16 @@ def list_keys(self, partition_id: str) -> list[str]:
return list(partition_info[partition_id].keys())
return []

def get_data(self, partition_id: str, keys: list[str] | None = None) -> None:
def get_data(self, partition_id: str, keys: list[str] | None = None, move_to_gpu: bool = False) -> None:
"""Get data from storage using kv_batch_get."""
if keys is None:
keys = self.test_keys
tq.kv_batch_get(keys=keys, partition_id=partition_id)
result = tq.kv_batch_get(keys=keys, partition_id=partition_id)
if move_to_gpu:
cpu_tensors = [v for v in result.values() if torch.is_tensor(v) and not v.is_cuda]
torch.cuda.synchronize()
_ = [t.to(self.gdr_device) for t in cpu_tensors]
torch.cuda.synchronize()

def delete(self, partition_id: str, keys: list[str] | None = None) -> None:
"""Delete data from storage using kv_clear."""
Expand Down Expand Up @@ -316,6 +326,9 @@ def __init__(
# Get backend from config
self.backend = self.full_config["backend"]["storage_backend"]

# GDR is configured via backend.MooncakeStore.use_gdr (no separate CLI flag).
self.use_gdr = bool(self.full_config["backend"].get("MooncakeStore", {}).get("use_gdr", False))

# For Yuanrong, always use inter_node
self.use_inter_node = self.backend == "Yuanrong"

Expand All @@ -331,6 +344,18 @@ def _validate_args(self) -> None:
if self.use_inter_node and self.worker_node_ip is None:
raise ValueError("worker_node_ip is required for Yuanrong backend")

# GDR only applies to MooncakeStore on GPU; reject other combos up front.
if self.use_gdr:
if self.backend != "MooncakeStore":
raise ValueError(
f"backend.MooncakeStore.use_gdr=true requires the MooncakeStore backend, got '{self.backend}'."
)
if self.device != "gpu":
raise ValueError(
f"backend.MooncakeStore.use_gdr=true requires --device gpu "
f"(CUDA tensors are needed for the GDR path), got '{self.device}'."
)

def _prepare_config(self) -> dict[str, Any]:
"""Prepare the config by directly reading the backend_config file.

Expand Down Expand Up @@ -393,9 +418,9 @@ def _initialize_clients(self) -> None:
# Initialize transfer_queue
logger.info(f"Using {self.backend} as storage backend.")

w = self.writer.initialize.remote()
r = self.reader.initialize.remote()
ray.get([w, r])
# Writer first: ensures storage bootstrap binds to the head address before reader attaches.
ray.get(self.writer.initialize.remote())
ray.get(self.reader.initialize.remote())

def run_throughput_test(self, skip_dataset_create=False) -> dict[str, Any]:
"""Run the throughput test and print results.
Expand Down Expand Up @@ -438,10 +463,11 @@ def run_throughput_test(self, skip_dataset_create=False) -> dict[str, Any]:

time.sleep(2)

# GET_DATA operation using kv_batch_get
# GET_DATA operation using kv_batch_get; move_to_gpu adds H2D into get_time
move_to_gpu = self.device == "gpu" and not self.use_gdr
logger.info("Starting GET_DATA operation (kv_batch_get)...")
start_get_data = time.perf_counter()
ray.get(self.reader.get_data.remote(partition_id=partition_id, keys=keys))
ray.get(self.reader.get_data.remote(partition_id=partition_id, keys=keys, move_to_gpu=move_to_gpu))
end_get_data = time.perf_counter()
get_time = end_get_data - start_get_data
get_gbit_per_sec = (self.total_data_size_gb * 8) / get_time
Expand All @@ -462,6 +488,7 @@ def run_throughput_test(self, skip_dataset_create=False) -> dict[str, Any]:
logger.info("=" * 60)
logger.info(f"Backend: {self.backend}")
logger.info(f"Device: {self.device}")
logger.info(f"GDR: {self.use_gdr}")
logger.info(f"Total Data Size: {self.total_data_size_gb:.6f} GB")
logger.info(f"PUT Time: {put_time:.8f}s")
logger.info(f"GET Time: {get_time:.8f}s")
Expand All @@ -474,6 +501,7 @@ def run_throughput_test(self, skip_dataset_create=False) -> dict[str, Any]:
return {
"backend": self.backend,
"device": self.device,
"use_gdr": self.use_gdr,
"total_data_size_gb": self.total_data_size_gb,
"put_time": put_time,
"get_time": get_time,
Expand Down
2 changes: 2 additions & 0 deletions scripts/performance_test/perftest_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ backend:
# Network device name.
# Set to "" to let Mooncake auto-select available devices.
device_name: ""
# GPU Direct RDMA. When true, CUDA tensors are transferred directly from GPU memory
use_gdr: false

# For Yuanrong:
Yuanrong:
Expand Down
Loading
Loading