Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 78 additions & 7 deletions src/policies/power_of_two.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@ impl PowerOfTwoPolicy {
}

fn get_worker_load(&self, worker: &dyn Worker) -> isize {
// First check cached loads (from external monitoring)
let local = worker.load() as isize;

// Combine cached load (external monitoring, e.g. vLLM /load endpoint)
// with local counter (in-flight requests tracked by router).
// This avoids thundering herd when cached loads go stale between
// 5-second polling intervals.
if let Ok(loads) = self.cached_loads.read() {
if let Some(&load) = loads.get(worker.url()) {
return load;
if let Some(&cached) = loads.get(worker.url()) {
return cached + local;
Comment thread
jiahaoliang marked this conversation as resolved.
}
}

// Fall back to local load counter
worker.load() as isize
local
}
}

Expand Down Expand Up @@ -184,8 +188,75 @@ mod tests {
}
}

// Worker2 should be selected significantly more often
assert!(w2_selected > 35); // Should win most of the time
// With only 2 workers, power-of-two always picks the same pair,
// so the less-loaded one wins every time
assert_eq!(w2_selected, 50);
}

/// Cached load and the router-local counter must be added together.
/// With identical cached loads on both workers, the local counter is the only
/// difference, so it decides which worker is picked. This is what prevents a
/// thundering herd while cached loads are stale between polling intervals.
#[test]
fn test_power_of_two_combines_cached_and_local_load() {
    let w1_url = "http://w1:8000";
    let w2_url = "http://w2:8000";

    let policy = PowerOfTwoPolicy::new();
    let busy = BasicWorker::new(w1_url.to_string(), WorkerType::Regular);
    let idle = BasicWorker::new(w2_url.to_string(), WorkerType::Regular);

    // Both workers report the same cached load.
    let mut cached = HashMap::new();
    for url in [w1_url, w2_url] {
        cached.insert(url.to_string(), 50);
    }
    policy.update_loads(&cached);

    // Only the first worker accumulates local in-flight requests;
    // the second worker's local counter stays at 0.
    for _ in 0..5 {
        busy.increment_load();
    }

    // first worker:  cached(50) + local(5) = 55
    // second worker: cached(50) + local(0) = 50
    let workers: Vec<Arc<dyn Worker>> = vec![Arc::new(busy), Arc::new(idle)];

    // Count how many of the 50 selections land on index 1 (the second worker).
    let w2_selected = (0..50)
        .filter(|_| policy.select_worker(&workers, None) == Some(1))
        .count();

    // With only 2 workers the same pair is always compared, so the
    // less-loaded worker must win every single time.
    assert_eq!(w2_selected, 50);
}

/// When no cached loads have been published, selection must rely on the
/// local counter alone.
#[test]
fn test_power_of_two_without_cached_loads_uses_local() {
    let policy = PowerOfTwoPolicy::new();
    let busy = BasicWorker::new("http://w1:8000".to_string(), WorkerType::Regular);
    let idle = BasicWorker::new("http://w2:8000".to_string(), WorkerType::Regular);

    // No call to update_loads: only the local counters differ.
    for _ in 0..10 {
        busy.increment_load();
    }

    let workers: Vec<Arc<dyn Worker>> = vec![Arc::new(busy), Arc::new(idle)];

    // Count how many of the 50 selections land on index 1 (the second worker).
    let w2_selected = (0..50)
        .filter(|_| policy.select_worker(&workers, None) == Some(1))
        .count();

    // With only 2 workers, the less-loaded one is always selected.
    assert_eq!(w2_selected, 50);
}

#[test]
Expand Down