Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hyperqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ dashboard = ["dep:ratatui", "dep:crossterm", "dep:unicode-width"]

highs = ["tako/highs"]
microlp = ["tako/microlp"]
coin_cbc = ["tako/coin_cbc"]

[[bench]]
name = "benchmark"
Expand Down
2 changes: 2 additions & 0 deletions crates/tako/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rust-version.workspace = true
default = ["highs"]
highs = ["dep:highs"]
microlp = ["dep:microlp"]
coin_cbc = ["dep:coin_cbc"]

[dependencies]
anyhow = { workspace = true }
Expand Down Expand Up @@ -39,6 +40,7 @@ itertools = { workspace = true }

highs = { version = "1.12", optional = true }
microlp = { version = "0.2", optional = true }
coin_cbc = { version = "0.1", optional = true }

hashbrown = { version = "0.16", features = ["serde", "inline-more"], default-features = false }
priority-queue = "2"
Expand Down
7 changes: 3 additions & 4 deletions crates/tako/src/internal/scheduler/gap_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,14 @@ fn compute_gap(
c.into_iter(),
);
}
let Some(solution) = solver.solve() else {
let Some((solution, _)) = solver.solve() else {
return 0;
};
let counts = solution.0.get_values();
let mut resources = resources.clone();
for (idx, rq) in high_priority_rqv.requests().iter().enumerate() {
for (var, rq) in vars.iter().zip(high_priority_rqv.requests().iter()) {
resources.remove_multiple_masked(
rq,
counts[idx].round() as u32,
solution.get_value(*var).round() as u32,
entry.resource_id,
);
}
Expand Down
37 changes: 11 additions & 26 deletions crates/tako/src/internal/scheduler/solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub(crate) fn run_scheduling_solver(

let mut solver = LpSolver::new(false);

let mut placements: Map<(WorkerId, ResourceRqId, ResourceVariantId), (_, u32)> = Map::new();
let mut placements: Map<(WorkerId, ResourceRqId, ResourceVariantId), Variable> = Map::new();
let mut tasks_count_vars: Map<ResourceRqId, Vec<_>> = Map::new();

let mut worker_res_constraint = vec![Vec::new(); n_resources];
Expand Down Expand Up @@ -94,10 +94,7 @@ pub(crate) fn run_scheduling_solver(
worker,
&resource_sums,
);
placements.insert(
(worker.id, batch.resource_rq_id, v_idx),
(v, solver.last_var_idx()),
);
placements.insert((worker.id, batch.resource_rq_id, v_idx), v);
// Insert into worker resource constraints
for (r, amount) in worker.resources.iter_pairs() {
worker_res_constraint[r.as_usize()].push((v, amount.as_f64()));
Expand All @@ -111,10 +108,7 @@ pub(crate) fn run_scheduling_solver(
set_placement_name(&mut solver, worker.id, batch.resource_rq_id, v_idx);
let v =
create_sn_var(&mut solver, rq, n_workers, w_idx, worker, &resource_sums);
placements.insert(
(worker.id, batch.resource_rq_id, v_idx),
(v, solver.last_var_idx()),
);
placements.insert((worker.id, batch.resource_rq_id, v_idx), v);
tasks_count_vars
.entry(batch.resource_rq_id)
.or_default()
Expand Down Expand Up @@ -188,7 +182,7 @@ pub(crate) fn run_scheduling_solver(
for (group_name, group) in worker_groups.iter() {
temp.clear();
for w_id in group.worker_ids() {
if let Some((v, _)) = placements.get(&(w_id, batch.resource_rq_id, rv_id)) {
if let Some(v) = placements.get(&(w_id, batch.resource_rq_id, rv_id)) {
temp.push(*v)
}
}
Expand Down Expand Up @@ -264,9 +258,7 @@ pub(crate) fn run_scheduling_solver(
continue;
}
for v_id in batch_rqv.variant_ids() {
if let Some((var, _)) =
placements.get(&(w.id, batch.resource_rq_id, v_id))
{
if let Some(var) = placements.get(&(w.id, batch.resource_rq_id, v_id)) {
let gap = scheduler_cache.gap_cache.get_gap(
*blocker_rq_id,
batch.resource_rq_id,
Expand Down Expand Up @@ -352,7 +344,6 @@ pub(crate) fn run_scheduling_solver(
return result;
};

let values = solution.get_values();
for batch in task_batches {
let resource_rq_id = batch.resource_rq_id;
let rqv = request_map.get(resource_rq_id);
Expand All @@ -361,8 +352,8 @@ pub(crate) fn run_scheduling_solver(
let n_nodes = rqv.get(v_id).n_nodes() as usize;
let mut ws: Vec<ThinVec<WorkerId>> = Vec::new();
for worker in &workers {
if let Some((_, var_idx)) = placements.get(&(worker.id, resource_rq_id, v_id)) {
let count = values[*var_idx as usize].round() as u32;
if let Some(v) = placements.get(&(worker.id, resource_rq_id, v_id)) {
let count = solution.get_value(*v).round() as u32;
if count > 0 {
if let Some(last) = ws.last_mut()
&& last.len() < n_nodes
Expand All @@ -384,16 +375,10 @@ pub(crate) fn run_scheduling_solver(
let counts: Map<_, _> = workers
.iter()
.filter_map(|w| {
placements
.get(&(w.id, resource_rq_id, v_id))
.and_then(|(_, var_idx)| {
let count = values[*var_idx as usize].round() as u32;
if count > 0 {
Some((w.id, values[*var_idx as usize].round() as u32))
} else {
None
}
})
placements.get(&(w.id, resource_rq_id, v_id)).and_then(|v| {
let count = solution.get_value(*v).round() as u32;
if count > 0 { Some((w.id, count)) } else { None }
})
})
.collect();
if !counts.is_empty() {
Expand Down
80 changes: 80 additions & 0 deletions crates/tako/src/internal/solver/coin_cbc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::internal::solver::{ConstraintType, LpInnerSolver, LpSolution};
use coin_cbc::{Col, Model, Sense};

pub(crate) struct CoinCbcSolver {
model: Model,
}

impl CoinCbcSolver {
pub fn new() -> Self {
let mut model = Model::default();
model.set_parameter("log", "0");
model.set_obj_sense(Sense::Maximize);
CoinCbcSolver { model }
}
}

impl LpInnerSolver for CoinCbcSolver {
type Variable = Col;
type Solution = coin_cbc::Solution;

#[inline]
fn add_variable(&mut self, weight: f64, min: f64, max: f64) -> Self::Variable {
let col = self.model.add_col();
self.model.set_obj_coeff(col, weight);
self.model.set_col_lower(col, min);
self.model.set_col_upper(col, max);
col
}

#[inline]
fn add_bool_variable(&mut self, weight: f64) -> Self::Variable {
let col = self.model.add_binary();
self.model.set_obj_coeff(col, weight);
col
}

#[inline]
fn add_nat_variable(&mut self, weight: f64) -> Self::Variable {
let col = self.model.add_integer();
self.model.set_obj_coeff(col, weight);
self.model.set_col_lower(col, 0.0);
col
}

#[inline]
fn add_constraint(
&mut self,
constraint_type: ConstraintType,
value: f64,
variables: impl Iterator<Item = (Self::Variable, f64)>,
) {
let row = self.model.add_row();
match constraint_type {
ConstraintType::Min => self.model.set_row_lower(row, value),
ConstraintType::Max => self.model.set_row_upper(row, value),
ConstraintType::Eq => self.model.set_row_equal(row, value),
}
for (col, coeff) in variables {
self.model.set_weight(row, col, coeff);
}
}

fn solve(self) -> Option<(Self::Solution, f64)> {
let solution = self.model.solve();
if !solution.raw().is_proven_optimal() {
return None;
}
let obj = solution.raw().obj_value();
Some((solution, obj))
}
}

impl LpSolution for coin_cbc::Solution {
type Variable = Col;

#[inline]
fn get_value(&self, v: Col) -> f64 {
self.col(v)
}
}
6 changes: 4 additions & 2 deletions crates/tako/src/internal/solver/highs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ impl LpInnerSolver for HighsSolver {
}

impl LpSolution for highs::Solution {
type Variable = highs::Col;

#[inline]
fn get_values(&self) -> &[f64] {
self.columns()
fn get_value(&self, v: highs::Col) -> f64 {
self[v]
}
}
19 changes: 9 additions & 10 deletions crates/tako/src/internal/solver/microlp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::internal::solver::{ConstraintType, LpInnerSolver, LpSolution};
use microlp::ComparisonOp;
use microlp::{ComparisonOp, Solution};

pub(crate) struct MicrolpSolver(microlp::Problem);

Expand All @@ -13,7 +13,7 @@ impl MicrolpSolver {

impl LpInnerSolver for MicrolpSolver {
type Variable = microlp::Variable;
type Solution = MicrolpSolution;
type Solution = Solution;

#[inline]
fn add_variable(&mut self, weight: f64, min: f64, max: f64) -> Self::Variable {
Expand Down Expand Up @@ -58,17 +58,16 @@ impl LpInnerSolver for MicrolpSolver {
let Ok(solution) = self.0.solve() else {
return None;
};
Some((
MicrolpSolution(solution.iter().map(|x| *x.1).collect()),
solution.objective(),
))
let objective = solution.objective();
Some((solution, objective))
}
}

pub(crate) struct MicrolpSolution(Vec<f64>);
impl LpSolution for microlp::Solution {
type Variable = microlp::Variable;

impl LpSolution for MicrolpSolution {
fn get_values(&self) -> &[f64] {
self.0.as_slice()
#[inline]
fn get_value(&self, v: microlp::Variable) -> f64 {
*self.var_value(v)
}
}
Loading
Loading