diff --git a/Cargo.lock b/Cargo.lock index f8960dec0..ad52adbe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,6 +396,25 @@ dependencies = [ "cc", ] +[[package]] +name = "coin_cbc" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d602045cd2e7ad02608a71492af94357f493a6f3c934ce854c03bf10fddc5780" +dependencies = [ + "coin_cbc_sys", + "lazy_static", +] + +[[package]] +name = "coin_cbc_sys" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "085619f8bdc38e24e25c6336ecc3f2e6c0543d67566dff6daef0e32f7ac20f76" +dependencies = [ + "pkg-config", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -1370,6 +1389,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "leb128fmt" version = "0.1.0" @@ -1710,6 +1735,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "platforms" version = "2.0.0" @@ -2355,6 +2386,7 @@ dependencies = [ "bitflags 2.11.0", "bstr", "bytes", + "coin_cbc", "criterion", "derive_builder", "derive_more 2.1.1", diff --git a/crates/hyperqueue/Cargo.toml b/crates/hyperqueue/Cargo.toml index 67758ba54..bffa0fb2c 100644 --- a/crates/hyperqueue/Cargo.toml +++ b/crates/hyperqueue/Cargo.toml @@ -77,6 +77,7 @@ dashboard = ["dep:ratatui", "dep:crossterm", "dep:unicode-width"] highs = ["tako/highs"] microlp = ["tako/microlp"] +coin_cbc = ["tako/coin_cbc"] [[bench]] name = "benchmark" diff --git a/crates/tako/Cargo.toml b/crates/tako/Cargo.toml index 2c1a69e77..46d6f5923 100644 --- a/crates/tako/Cargo.toml +++ b/crates/tako/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true default = ["highs"] highs = ["dep:highs"] microlp = ["dep:microlp"] +coin_cbc = ["dep:coin_cbc"] [dependencies] anyhow = { workspace = true } @@ -39,6 +40,7 @@ itertools = { workspace = true } highs = { version = "1.12", optional = true } microlp = { version = "0.2", optional = true } +coin_cbc = { version = "0.1", optional = true } hashbrown = { version = "0.16", features = ["serde", "inline-more"], default-features = false } priority-queue = "2" diff --git a/crates/tako/src/internal/scheduler/gap_cache.rs b/crates/tako/src/internal/scheduler/gap_cache.rs index 7340f8e5d..5b6042a7b 100644 --- a/crates/tako/src/internal/scheduler/gap_cache.rs +++ b/crates/tako/src/internal/scheduler/gap_cache.rs @@ -138,15 +138,14 @@ fn compute_gap( c.into_iter(), ); } - let Some(solution) = solver.solve() else { + let Some((solution, _)) = solver.solve() else { return 0; }; - let counts = solution.0.get_values(); let mut resources = resources.clone(); - for (idx, rq) in high_priority_rqv.requests().iter().enumerate() { + for (var, rq) in vars.iter().zip(high_priority_rqv.requests().iter()) { resources.remove_multiple_masked( rq, - counts[idx].round() as u32, + solution.get_value(*var).round() as u32, entry.resource_id, ); } diff --git a/crates/tako/src/internal/scheduler/solver.rs b/crates/tako/src/internal/scheduler/solver.rs index bdc965b63..c9a29deaf 100644 --- a/crates/tako/src/internal/scheduler/solver.rs +++ b/crates/tako/src/internal/scheduler/solver.rs @@ -65,7 +65,7 @@ pub(crate) fn run_scheduling_solver( let mut solver = LpSolver::new(false); - let mut placements: Map<(WorkerId, ResourceRqId, ResourceVariantId), (_, u32)> = Map::new(); + let mut placements: Map<(WorkerId, ResourceRqId, ResourceVariantId), Variable> = Map::new(); let mut tasks_count_vars: Map> = Map::new(); let mut worker_res_constraint = vec![Vec::new(); n_resources]; @@ -94,10 +94,7 @@ pub(crate) fn run_scheduling_solver( worker, &resource_sums, ); - placements.insert( - (worker.id, batch.resource_rq_id, v_idx), - (v, solver.last_var_idx()), - ); + placements.insert((worker.id, batch.resource_rq_id, v_idx), v); // Insert into worker resource constraints for (r, amount) in worker.resources.iter_pairs() { worker_res_constraint[r.as_usize()].push((v, amount.as_f64())); @@ -111,10 +108,7 @@ pub(crate) fn run_scheduling_solver( set_placement_name(&mut solver, worker.id, batch.resource_rq_id, v_idx); let v = create_sn_var(&mut solver, rq, n_workers, w_idx, worker, &resource_sums); - placements.insert( - (worker.id, batch.resource_rq_id, v_idx), - (v, solver.last_var_idx()), - ); + placements.insert((worker.id, batch.resource_rq_id, v_idx), v); tasks_count_vars .entry(batch.resource_rq_id) .or_default() @@ -188,7 +182,7 @@ pub(crate) fn run_scheduling_solver( for (group_name, group) in worker_groups.iter() { temp.clear(); for w_id in group.worker_ids() { - if let Some((v, _)) = placements.get(&(w_id, batch.resource_rq_id, rv_id)) { + if let Some(v) = placements.get(&(w_id, batch.resource_rq_id, rv_id)) { temp.push(*v) } } @@ -264,9 +258,7 @@ pub(crate) fn run_scheduling_solver( continue; } for v_id in batch_rqv.variant_ids() { - if let Some((var, _)) = - placements.get(&(w.id, batch.resource_rq_id, v_id)) - { + if let Some(var) = placements.get(&(w.id, batch.resource_rq_id, v_id)) { let gap = scheduler_cache.gap_cache.get_gap( *blocker_rq_id, batch.resource_rq_id, @@ -352,7 +344,6 @@ pub(crate) fn run_scheduling_solver( return result; }; - let values = solution.get_values(); for batch in task_batches { let resource_rq_id = batch.resource_rq_id; let rqv = request_map.get(resource_rq_id); @@ -361,8 +352,8 @@ pub(crate) fn run_scheduling_solver( let n_nodes = rqv.get(v_id).n_nodes() as usize; let mut ws: Vec> = Vec::new(); for worker in &workers { - if let Some((_, var_idx)) = placements.get(&(worker.id, resource_rq_id, v_id)) { - let count = values[*var_idx as usize].round() as u32; + if let Some(v) = placements.get(&(worker.id, resource_rq_id, v_id)) { + let count = solution.get_value(*v).round() as u32; if count > 0 { if let Some(last) = ws.last_mut() && last.len() < n_nodes @@ -384,16 +375,10 @@ pub(crate) fn run_scheduling_solver( let counts: Map<_, _> = workers .iter() .filter_map(|w| { - placements - .get(&(w.id, resource_rq_id, v_id)) - .and_then(|(_, var_idx)| { - let count = values[*var_idx as usize].round() as u32; - if count > 0 { - Some((w.id, values[*var_idx as usize].round() as u32)) - } else { - None - } - }) + placements.get(&(w.id, resource_rq_id, v_id)).and_then(|v| { + let count = solution.get_value(*v).round() as u32; + if count > 0 { Some((w.id, count)) } else { None } + }) }) .collect(); if !counts.is_empty() { diff --git a/crates/tako/src/internal/solver/coin_cbc.rs b/crates/tako/src/internal/solver/coin_cbc.rs new file mode 100644 index 000000000..fd2f1e76e --- /dev/null +++ b/crates/tako/src/internal/solver/coin_cbc.rs @@ -0,0 +1,80 @@ +use crate::internal::solver::{ConstraintType, LpInnerSolver, LpSolution}; +use coin_cbc::{Col, Model, Sense}; + +pub(crate) struct CoinCbcSolver { + model: Model, +} + +impl CoinCbcSolver { + pub fn new() -> Self { + let mut model = Model::default(); + model.set_parameter("log", "0"); + model.set_obj_sense(Sense::Maximize); + CoinCbcSolver { model } + } +} + +impl LpInnerSolver for CoinCbcSolver { + type Variable = Col; + type Solution = coin_cbc::Solution; + + #[inline] + fn add_variable(&mut self, weight: f64, min: f64, max: f64) -> Self::Variable { + let col = self.model.add_col(); + self.model.set_obj_coeff(col, weight); + self.model.set_col_lower(col, min); + self.model.set_col_upper(col, max); + col + } + + #[inline] + fn add_bool_variable(&mut self, weight: f64) -> Self::Variable { + let col = self.model.add_binary(); + self.model.set_obj_coeff(col, weight); + col + } + + #[inline] + fn add_nat_variable(&mut self, weight: f64) -> Self::Variable { + let col = self.model.add_integer(); + self.model.set_obj_coeff(col, weight); + self.model.set_col_lower(col, 0.0); + col + } + + #[inline] + fn add_constraint( + &mut self, + constraint_type: ConstraintType, + value: f64, + variables: impl Iterator, + ) { + let row = self.model.add_row(); + match constraint_type { + ConstraintType::Min => self.model.set_row_lower(row, value), + ConstraintType::Max => self.model.set_row_upper(row, value), + ConstraintType::Eq => self.model.set_row_equal(row, value), + } + for (col, coeff) in variables { + self.model.set_weight(row, col, coeff); + } + } + + fn solve(self) -> Option<(Self::Solution, f64)> { + let solution = self.model.solve(); + if !solution.raw().is_proven_optimal() { + return None; + } + let obj = solution.raw().obj_value(); + Some((solution, obj)) + } +} + +impl LpSolution for coin_cbc::Solution { + type Variable = Col; + + #[inline] + fn get_value(&self, v: Col) -> f64 { + self.col(v) + } +} diff --git a/crates/tako/src/internal/solver/highs.rs b/crates/tako/src/internal/solver/highs.rs index d9bedf4d4..64e9ad90f 100644 --- a/crates/tako/src/internal/solver/highs.rs +++ b/crates/tako/src/internal/solver/highs.rs @@ -54,8 +54,10 @@ impl LpInnerSolver for HighsSolver { } impl LpSolution for highs::Solution { + type Variable = highs::Col; + #[inline] - fn get_values(&self) -> &[f64] { - self.columns() + fn get_value(&self, v: highs::Col) -> f64 { + self[v] } } diff --git a/crates/tako/src/internal/solver/microlp.rs b/crates/tako/src/internal/solver/microlp.rs index abef8c4c5..788971157 100644 --- a/crates/tako/src/internal/solver/microlp.rs +++ b/crates/tako/src/internal/solver/microlp.rs @@ -1,5 +1,5 @@ use crate::internal::solver::{ConstraintType, LpInnerSolver, LpSolution}; -use microlp::ComparisonOp; +use microlp::{ComparisonOp, Solution}; pub(crate) struct MicrolpSolver(microlp::Problem); @@ -13,7 +13,7 @@ impl MicrolpSolver { impl LpInnerSolver for MicrolpSolver { type Variable = microlp::Variable; - type Solution = MicrolpSolution; + type Solution = Solution; #[inline] fn add_variable(&mut self, weight: f64, min: f64, max: f64) -> Self::Variable { @@ -58,17 +58,16 @@ impl LpInnerSolver for MicrolpSolver { let Ok(solution) = self.0.solve() else { return None; }; - Some(( - MicrolpSolution(solution.iter().map(|x| *x.1).collect()), - solution.objective(), - )) + let objective = solution.objective(); + Some((solution, objective)) } } -pub(crate) struct MicrolpSolution(Vec); +impl LpSolution for microlp::Solution { + type Variable = microlp::Variable; -impl LpSolution for MicrolpSolution { - fn get_values(&self) -> &[f64] { - self.0.as_slice() + #[inline] + fn get_value(&self, v: microlp::Variable) -> f64 { + *self.var_value(v) } } diff --git a/crates/tako/src/internal/solver/mod.rs b/crates/tako/src/internal/solver/mod.rs index 4497a7c65..261133200 100644 --- a/crates/tako/src/internal/solver/mod.rs +++ b/crates/tako/src/internal/solver/mod.rs @@ -1,3 +1,5 @@ +#[cfg(all(feature = "coin_cbc", not(feature = "microlp"), not(feature = "highs")))] +pub(crate) mod coin_cbc; #[cfg(feature = "highs")] pub(crate) mod highs; #[cfg(all(feature = "microlp", not(feature = "highs")))] @@ -9,6 +11,9 @@ pub(crate) type LpInnerSolverImpl = highs::HighsSolver; #[cfg(all(feature = "microlp", not(feature = "highs")))] pub(crate) type LpInnerSolverImpl = microlp::MicrolpSolver; +#[cfg(all(feature = "coin_cbc", not(feature = "microlp"), not(feature = "highs")))] +pub(crate) type LpInnerSolverImpl = coin_cbc::CoinCbcSolver; + pub(crate) type Variable = ::Variable; pub(crate) type Solution = ::Solution; @@ -21,7 +26,7 @@ pub(crate) enum ConstraintType { pub(crate) trait LpInnerSolver { type Variable: Copy; - type Solution: LpSolution; + type Solution: LpSolution; fn add_variable(&mut self, weight: f64, min: f64, max: f64) -> Self::Variable; fn add_bool_variable(&mut self, weight: f64) -> Self::Variable; @@ -36,12 +41,12 @@ pub(crate) trait LpInnerSolver { } pub(crate) trait LpSolution { - fn get_values(&self) -> &[f64]; + type Variable: Copy; + fn get_value(&self, v: Self::Variable) -> f64; } pub(crate) struct LpSolver { solver: LpInnerSolverImpl, - n_vars: u32, #[cfg(debug_assertions)] verbose: bool, @@ -50,7 +55,7 @@ pub(crate) struct LpSolver { #[cfg(debug_assertions)] name_config: Option, #[cfg(debug_assertions)] - variables: Vec<(String, f64)>, + variables: Vec<(String, f64, Variable)>, } #[cfg(debug_assertions)] @@ -68,21 +73,20 @@ impl LpSolver { let name = self.name_config.take(); if let Some(name) = name { self.var_name_map.insert(variable, self.variables.len()); - self.variables.push((name, weight)); + self.variables.push((name, weight, variable)); } variable } pub fn new(verbose: bool) -> Self { - #[cfg(not(any(feature = "highs", feature = "microlp")))] + #[cfg(not(any(feature = "highs", feature = "microlp", feature = "coin_cbc")))] { compile_error!( - "You have to enable either the `highs` or the `microlp` feature using `cargo build ... --features `" + "You have to enable either the `highs`, `microlp`, or `coin_cbc` feature using `cargo build ... --features `" ) } LpSolver { verbose, - n_vars: 0, solver: LpInnerSolverImpl::new(), var_name_map: Default::default(), variables: Default::default(), @@ -161,7 +165,7 @@ impl LpSolver { pub fn solve(self) -> Option<(Solution, f64)> { if self.verbose { println!("Weights:"); - for (name, weight) in self.variables.iter() { + for (name, weight, _var) in self.variables.iter() { if *weight != 0.0 { println!("{} -> {}", name, weight); } @@ -172,8 +176,8 @@ impl LpSolver { && self.verbose { println!("==== Solution: ===="); - for ((name, _weight), value) in self.variables.iter().zip(s.get_values()) { - println!("{} = {}", name, value); + for (name, _weight, var) in self.variables.iter() { + println!("{} = {}", name, s.get_value(*var)); } } s @@ -198,7 +202,6 @@ impl LpSolver { pub fn new(_verbose: bool) -> Self { LpSolver { solver: LpInnerSolverImpl::new(), - n_vars: 0, } } @@ -220,29 +223,21 @@ impl LpSolver { } impl LpSolver { - #[inline] - pub fn last_var_idx(&self) -> u32 { - self.n_vars - 1 - } - #[inline] pub fn add_variable(&mut self, weight: f64, min: f64, max: f64) -> Variable { let v = self.solver.add_variable(weight, min, max); - self.n_vars += 1; self.new_var(v, weight) } #[inline] pub fn add_bool_variable(&mut self, weight: f64) -> Variable { let v = self.solver.add_bool_variable(weight); - self.n_vars += 1; self.new_var(v, weight) } #[inline] pub fn add_nat_variable(&mut self, weight: f64) -> Variable { let v = self.solver.add_nat_variable(weight); - self.n_vars += 1; self.new_var(v, weight) } } diff --git a/crates/tako/src/internal/tests/test_scheduler_sn.rs b/crates/tako/src/internal/tests/test_scheduler_sn.rs index cce21fd79..11771f71e 100644 --- a/crates/tako/src/internal/tests/test_scheduler_sn.rs +++ b/crates/tako/src/internal/tests/test_scheduler_sn.rs @@ -497,17 +497,32 @@ fn test_schedule_gap_filling2() { fn test_schedule_gap_filling3() { let mut rt = TestEnv::new(); rt.new_named_resource("foo"); - rt.new_workers(2, &WorkerBuilder::new(34)); + let ws = rt.new_workers(2, &WorkerBuilder::new(34)); - let ta = TaskBuilder::new().cpus(9); - let tb = TaskBuilder::new().cpus(3); + let ta = TaskBuilder::new().cpus(3); + let tb = TaskBuilder::new().cpus(9); - rt.new_tasks(5, &tb.clone().user_priority(10)); - rt.new_tasks(6, &ta.clone().user_priority(10)); - rt.new_tasks(5, &tb.clone().user_priority(9)); + rt.new_tasks(5, &ta.clone().user_priority(10)); + let ts2 = rt.new_tasks(6, &tb.clone().user_priority(10)); + let ts3 = rt.new_tasks(5, &ta.clone().user_priority(9)); rt.schedule(); - let counts = assigned_counts(&mut rt); - assert_eq!(counts, [4, 6]); + + for w in ws { + let mut cpus = 0; + let mut t3count = 0; + for t in &rt.worker(w).sn_assignment().unwrap().assign_tasks { + if ts2.contains(t) { + cpus += 9; + } else { + cpus += 3; + if ts3.contains(t) { + t3count += 1; + } + } + } + assert_eq!(cpus, 33); + assert!(t3count <= 2); + } } #[test] diff --git a/crates/tako/src/internal/worker/resources/concise.rs b/crates/tako/src/internal/worker/resources/concise.rs index f5a377bf3..77482ce71 100644 --- a/crates/tako/src/internal/worker/resources/concise.rs +++ b/crates/tako/src/internal/worker/resources/concise.rs @@ -118,6 +118,7 @@ impl ConciseResourceState { } } + #[cfg(test)] pub fn n_groups(&self) -> usize { self.free.len() } diff --git a/crates/tako/src/internal/worker/resources/groups.rs b/crates/tako/src/internal/worker/resources/groups.rs index 5e3c7c171..3c95b73d7 100644 --- a/crates/tako/src/internal/worker/resources/groups.rs +++ b/crates/tako/src/internal/worker/resources/groups.rs @@ -17,7 +17,7 @@ type GroupIndices = SmallVec<[usize; 2]>; type SelectedGroups = SmallVec<[GroupIndices; FAST_MAX_COUPLED_RESOURCES]>; /* - This is the main solver for the NUMA aware scheduling. It find the optimal solution + This is the main solver for the NUMA aware scheduling. It finds the optimal solution wrt minimizing the total number of groups & respecting weights between groups. Note that the solver does not select specific indices nor the number of indices that would be taken @@ -140,21 +140,14 @@ pub fn group_solver( ); } let (solution, objective_value): (_, _) = solver.solve()?; - let values = solution.get_values(); - let mut index = 0; Some(( - entries - .iter() - .map(|entry| { - let r = free.get(entry.resource_id); - let n = r.n_groups(); - let g = values[index..index + n] + vars.iter() + .map(|var_group| { + var_group .iter() .enumerate() - .filter_map(|(i, v)| (*v > 0.5).then_some(i)) - .collect(); - index += n; - g + .filter_map(|(i, v)| (solution.get_value(*v) > 0.5).then_some(i)) + .collect() }) .collect(), objective_value,