Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4749e59
feat(pd): support MLA cache layout
xiaguan May 28, 2026
1ae6142
Merge remote-tracking branch 'origin/master' into docs/pd-mla-design
xiaguan May 28, 2026
aa3acb8
chore(pd): add H20 Kimi launch script
xiaguan May 28, 2026
7a45fd7
fix: stabilize pd rdma producer release
xiaguan May 28, 2026
874e0c6
fix: improve pd rdma push throughput
xiaguan May 28, 2026
0851fd0
docs: record h20 pd mla benchmark results
xiaguan May 28, 2026
532d094
docs: add h20 kimi pd mla experiment note
xiaguan May 29, 2026
a46afc1
docs: define h20 kimi aligned ttft sweep
xiaguan May 29, 2026
9e196db
chore: align h20 kimi sweep tooling
xiaguan May 29, 2026
382a50d
fix: add pd rdma latency distribution logs
xiaguan May 29, 2026
3a526a3
chore: expand h20 kimi sweep summary
xiaguan May 29, 2026
7a84a0e
docs: record h20 kimi baseline sweep
xiaguan May 29, 2026
346d7e2
fix: avoid hanging h20 nic monitor
xiaguan May 29, 2026
d10e358
fix: reduce pd handshake overhead
xiaguan May 29, 2026
33186f5
test: strengthen pd rdma integration benchmark
xiaguan May 29, 2026
6530727
perf: reduce pd connector dispatch overhead
xiaguan May 29, 2026
4d75058
docs: record h20 tcp microbench
xiaguan May 29, 2026
33d943c
chore: trace pd scheduler ingress latency
xiaguan May 29, 2026
3368acb
perf: reduce pd decode wait registration overhead
xiaguan May 29, 2026
07e0db2
docs: record rejected early prefill experiment
xiaguan May 29, 2026
d83029e
docs: complete h20 waitmini ttft sweep
xiaguan May 29, 2026
780b85b
docs: record vllm recompute boundary
xiaguan May 29, 2026
8e03f69
docs: record active nic window analysis
xiaguan May 29, 2026
f4b8635
docs: explain layer cadence bandwidth ceiling
xiaguan May 29, 2026
2574016
perf: log pd ready-window nic utilization
xiaguan May 29, 2026
8a359f6
perf: log pd decode completion tail
xiaguan May 29, 2026
2f15691
perf: parallelize pd decode rdma waits
xiaguan May 29, 2026
c706900
chore: summarize pd connector latency logs
xiaguan May 29, 2026
407e879
perf: log pd event ready bandwidth
xiaguan May 29, 2026
74cf993
fix: refuse h20 kimi start on busy gpus
xiaguan May 29, 2026
8317c87
chore: add h20 idle gpu scanner
xiaguan May 29, 2026
bbde5f7
chore: include rdma nics in h20 idle scan
xiaguan May 29, 2026
75b3217
chore: support h20 pd probe readiness
xiaguan May 29, 2026
be4db0c
docs: record h20 event-ready probe
xiaguan May 29, 2026
1cb6495
chore: move h20 helpers out of repo
xiaguan May 29, 2026
c366b07
Merge remote-tracking branch 'origin/master' into docs/pd-mla-design
xiaguan May 29, 2026
d4e3161
chore: clean pd mla pr surface
xiaguan May 29, 2026
c7cb763
ci: run pre-commit cargo test with cuda-13 feature
xiaguan Jun 1, 2026
1abd685
docs: move pd mla dev logs to standalone docs repo
xiaguan Jun 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 0 additions & 45 deletions docs/pd-bench-results.md

This file was deleted.

70 changes: 38 additions & 32 deletions pegaflow-transfer/src/v2/verbs/verbs_domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct VerbsDomain {
recv_ops: VecDeque<RecvOpContext>,
send_ops: VecDeque<SendOpContext>,
write_ops: VecDeque<NonNull<WriteOpContext>>,
write_op_contexts: HashSet<NonNull<WriteOpContext>>,
completions: VecDeque<DomainCompletionEntry>,

ud_mempool: MemoryPool,
Expand Down Expand Up @@ -357,6 +358,7 @@ impl VerbsDomain {
recv_ops: VecDeque::with_capacity(MAX_OPS),
send_ops: VecDeque::with_capacity(MAX_OPS),
write_ops: VecDeque::with_capacity(MAX_OPS),
write_op_contexts: HashSet::with_capacity(MAX_OPS),
completions: VecDeque::with_capacity(MAX_OPS),

ud_mempool,
Expand Down Expand Up @@ -549,24 +551,19 @@ impl VerbsDomain {
debug!("handle_peer_handshake: domain={} info={info:?}", self.name);
let peer_addr = DomainAddress::from(&info.ud_addr);

let peer = if let Some(peer) = self.peers.get_mut(&peer_addr) {
let send_response = if let Some(peer) = self.peers.get(&peer_addr) {
if let PeerState::Established = peer.state {
return Ok(());
}
peer
false
} else {
// TODO: unify the code with do_submit
let peer = self.create_peer(&peer_addr, vec![], vec![])?;
self.peers.insert(peer_addr.clone(), peer);
let peer = unsafe { self.peers.get(&peer_addr).unwrap_unchecked() };
let buf = unsafe { self.ud_mempool.alloc() }.ok_or(FabricLibError::Custom(
"Failed to allocate UD message buffer",
))?;
self.connect_peer(peer, buf)?;
unsafe { self.peers.get_mut(&peer_addr).unwrap_unchecked() }
true
};

// Activate QP
let peer = unsafe { self.peers.get_mut(&peer_addr).unwrap_unchecked() };

let pkey_index = 0; // TODO: get pkey_index
peer.msg_rc.rc_reset_to_init(self.port_num, pkey_index)?;
Expand Down Expand Up @@ -619,6 +616,15 @@ impl VerbsDomain {
// Submit pending submits
let msg_qp = peer.msg_rc.qp;
let rma_qp = peer.rma_rc.qp;

if send_response {
let buf = unsafe { self.ud_mempool.alloc() }.ok_or(FabricLibError::Custom(
"Failed to allocate UD message buffer",
))?;
let peer = unsafe { self.peers.get(&peer_addr).unwrap_unchecked() };
self.connect_peer(peer, buf)?;
}

for (transfer_id, op) in pending_submits {
self.do_submit_outbound_op(transfer_id, op, msg_qp, rma_qp);
}
Expand Down Expand Up @@ -843,6 +849,7 @@ impl VerbsDomain {
})
};
let context_ptr = unsafe { NonNull::new_unchecked(context) };
self.write_op_contexts.insert(context_ptr);

// Try to eagerly post if currently there's no pending write ops.
Self::progress_rdma_write_op_context(context);
Expand Down Expand Up @@ -971,6 +978,7 @@ impl VerbsDomain {
if context.in_queue {
return;
}
self.write_op_contexts.remove(&ptr);
unsafe { self.objpool_wr.free_and_drop(context.wr_chain_buffer) };
unsafe { self.objpool_write_op.free_and_drop(ptr) };
}
Expand Down Expand Up @@ -1047,29 +1055,27 @@ impl VerbsDomain {

// Check if the completion is an error.
if wc.status != IBV_WC_SUCCESS {
let transfer_id: Option<TransferId> = match wc.opcode {
IBV_WC_RECV | IBV_WC_SEND => {
Some(unsafe { transmute::<u64, TransferId>(wc.wr_id) })
}
IBV_WC_RDMA_WRITE => {
if let Some(context) = unsafe { (wc.wr_id as *mut WriteOpContext).as_mut() } {
context.cnt_finished_ops += 1;
let ret = if context.bad {
None
} else {
// Return error to the caller only once.
context.bad = true;
Some(context.transfer_id)
};
self.maybe_drop_write_op_context(unsafe {
NonNull::new_unchecked(context)
});
ret
} else {
None
let write_context = NonNull::new(wc.wr_id as *mut WriteOpContext)
.filter(|ptr| self.write_op_contexts.contains(ptr));
let transfer_id: Option<TransferId> = if let Some(mut ptr) = write_context {
let context = unsafe { ptr.as_mut() };
context.cnt_finished_ops += 1;
let ret = if context.bad {
None
} else {
// Return error to the caller only once.
context.bad = true;
Some(context.transfer_id)
};
self.maybe_drop_write_op_context(ptr);
ret
} else {
match wc.opcode {
IBV_WC_RECV | IBV_WC_SEND => {
Some(unsafe { transmute::<u64, TransferId>(wc.wr_id) })
}
_ => None,
}
_ => None,
};
let errmsg = unsafe {
CStr::from_ptr(ibv_wc_status_str(wc.status))
Expand All @@ -1078,8 +1084,8 @@ impl VerbsDomain {
};
return if let Some(transfer_id) = transfer_id {
warn!(
"Encountered RDMA op error. Send DomainCompletionEntry::Error to the caller: domain={} wr_id={} status={} opcode={}",
self.name, wc.wr_id, wc.status, wc.opcode
"Encountered RDMA op error. Send DomainCompletionEntry::Error to the caller: domain={} wr_id={} status={} msg={} opcode={} vendor_err={} qp_num={}",
self.name, wc.wr_id, wc.status, errmsg, wc.opcode, wc.vendor_err, wc.qp_num
);
Some(DomainCompletionEntry::Error(
transfer_id,
Expand Down
2 changes: 1 addition & 1 deletion prek.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ repo = "local"
hooks = [
{ id = "cargo-fmt", name = "cargo fmt", language = "system", entry = "cargo fmt --all -- --check", files = { glob = ["Cargo.toml", "**/Cargo.toml", "**/*.rs"] }, pass_filenames = false, require_serial = true },
{ id = "cargo-clippy", name = "cargo clippy", language = "system", entry = "cargo clippy --workspace --all-targets -- -D warnings", files = { glob = ["Cargo.toml", "**/Cargo.toml", "**/*.rs"] }, pass_filenames = false, require_serial = true },
{ id = "cargo-test-release", name = "cargo test --release", language = "system", entry = "cargo test --release", pass_filenames = false, always_run = true, require_serial = true, stages = ["pre-commit"] },
{ id = "cargo-test-release", name = "cargo test --release", language = "system", entry = "cargo test --release --no-default-features --features cuda-13", pass_filenames = false, always_run = true, require_serial = true, stages = ["pre-commit"] },
]

[[repos]]
Expand Down
21 changes: 19 additions & 2 deletions python/pegaflow/pd_connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@

from pegaflow.pd_connector.metadata import PdConnectorMetadata
from pegaflow.pd_connector.scheduler import PdSchedulerConnector
from pegaflow.pd_connector.worker import PdWorkerConnector
from pegaflow.pd_connector.worker import PdWorkerConnector, model_uses_mla


class PdConnector(KVConnectorBase_V1, SupportsHMA):
"""Thin vLLM facade for the experimental P/D push connector."""

def __init__(self, vllm_config: Any, role: KVConnectorRole, kv_cache_config: Any = None):
super().__init__(vllm_config, role, kv_cache_config)
_assert_supported_config(vllm_config)
self._scheduler: PdSchedulerConnector | None = None
self._worker: PdWorkerConnector | None = None
if role == KVConnectorRole.SCHEDULER:
self._scheduler = PdSchedulerConnector(vllm_config)
elif role == KVConnectorRole.WORKER:
self._worker = PdWorkerConnector(vllm_config)
self._worker = PdWorkerConnector(vllm_config, kv_cache_config=kv_cache_config)
else:
raise ValueError(f"unsupported KV connector role: {role}")

@classmethod
def get_required_kvcache_layout(cls, vllm_config: Any) -> str | None:
if model_uses_mla(vllm_config):
return None
return "HND"

@classmethod
Expand Down Expand Up @@ -131,3 +134,17 @@ def request_finished_all_groups(


__all__ = ["PdConnector"]


def _assert_supported_config(vllm_config: Any) -> None:
if not model_uses_mla(vllm_config):
return
parallel_config = getattr(vllm_config, "parallel_config", None)
dcp_world_size = int(getattr(parallel_config, "decode_context_parallel_size", 1) or 1)
pcp_world_size = int(getattr(parallel_config, "prefill_context_parallel_size", 1) or 1)
assert dcp_world_size == 1, (
"PdConnector MLA first version requires decode_context_parallel_size == 1"
)
assert pcp_world_size == 1, (
"PdConnector MLA first version requires prefill_context_parallel_size == 1"
)
Loading