diff --git a/Cargo.lock b/Cargo.lock index f8793a9..4ef853a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,6 +370,26 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "asap_planner" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap 4.5.60", + "indexmap", + "pretty_assertions", + "promql-parser", + "promql_utilities", + "serde", + "serde_json", + "serde_yaml", + "sketch_db_common", + "tempfile", + "thiserror 1.0.69", + "tracing", + "tracing-subscriber", +] + [[package]] name = "async-compression" version = "0.4.19" @@ -1417,6 +1437,12 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.10.7" @@ -3008,6 +3034,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "pretty_assertions" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "prettyplease" version = "0.2.37" @@ -5070,6 +5106,12 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 5101fb6..f396d96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "asap-common/dependencies/rs/sketch_db_common", "asap-common/dependencies/rs/datafusion_summary_library", "asap-query-engine", + "asap-planner-rs", ] [workspace.package] @@ -33,3 +34,5 @@ promql_utilities = { path = "asap-common/dependencies/rs/promql_utilities" } sql_utilities = { path = "asap-common/dependencies/rs/sql_utilities" } sketch_db_common = { path = "asap-common/dependencies/rs/sketch_db_common" } datafusion_summary_library = { path = "asap-common/dependencies/rs/datafusion_summary_library" } +asap_planner = { path = "asap-planner-rs" } +indexmap = { version = "2.0", features = ["serde"] } diff --git a/asap-planner-rs/Cargo.toml b/asap-planner-rs/Cargo.toml new file mode 100644 index 0000000..0c28bc4 --- /dev/null +++ b/asap-planner-rs/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "asap_planner" +version.workspace = true +edition.workspace = true + +[lib] +name = "asap_planner" +path = "src/lib.rs" + +[[bin]] +name = "asap-planner" +path = "src/main.rs" + +[dependencies] +sketch_db_common.workspace = true +promql_utilities.workspace = true +serde.workspace = true +serde_json.workspace = true +serde_yaml.workspace = true +thiserror.workspace = true +anyhow.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +clap.workspace = true +indexmap.workspace = true +promql-parser = "0.5.0" + +[dev-dependencies] +tempfile = "3.20" +pretty_assertions = "1.4" diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs new file mode 100644 index 0000000..b50d0f3 --- /dev/null +++ b/asap-planner-rs/src/config/input.rs @@ -0,0 +1,72 @@ +use serde::Deserialize; + +#[derive(Debug, Clone, Deserialize)] +pub struct ControllerConfig { + pub query_groups: Vec, + pub metrics: Vec, + pub sketch_parameters: Option, + pub aggregate_cleanup: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct QueryGroup { + pub id: Option, + pub queries: Vec, + pub repetition_delay: u64, + pub controller_options: ControllerOptions, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ControllerOptions { + pub accuracy_sla: f64, + pub latency_sla: f64, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct MetricDefinition { + pub metric: String, + pub labels: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct AggregateCleanupConfig { + pub policy: Option, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct SketchParameterOverrides { + #[serde(rename = "CountMinSketch")] + pub count_min_sketch: Option, + #[serde(rename = "CountMinSketchWithHeap")] + pub count_min_sketch_with_heap: Option, + #[serde(rename = "DatasketchesKLL")] + pub datasketches_kll: Option, + #[serde(rename = "HydraKLL")] + pub hydra_kll: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct CmsParams { + pub depth: u64, + pub width: u64, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct CmsHeapParams { + pub depth: u64, + pub width: u64, + pub heap_multiplier: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct KllParams { + #[serde(rename = "K")] + pub k: u64, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct HydraParams { + pub row_num: u64, + pub col_num: u64, + pub k: u64, +} diff --git a/asap-planner-rs/src/config/mod.rs b/asap-planner-rs/src/config/mod.rs new file mode 100644 index 0000000..53b0e32 --- /dev/null +++ b/asap-planner-rs/src/config/mod.rs @@ -0,0 +1,2 @@ +pub mod input; +pub use input::*; diff --git a/asap-planner-rs/src/error.rs b/asap-planner-rs/src/error.rs new file mode 100644 index 0000000..cee5e5c --- /dev/null +++ b/asap-planner-rs/src/error.rs @@ -0,0 +1,17 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ControllerError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("YAML parse error: {0}")] + YamlParse(#[from] serde_yaml::Error), + #[error("PromQL parse error: {0}")] + PromQLParse(String), + #[error("Duplicate query: {0}")] + DuplicateQuery(String), + #[error("Planner error: {0}")] + PlannerError(String), + #[error("Unknown metric: {0}")] + UnknownMetric(String), +} diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs new file mode 100644 index 0000000..74992c0 --- /dev/null +++ b/asap-planner-rs/src/lib.rs @@ -0,0 +1,201 @@ +pub mod config; +pub mod error; +pub mod output; +pub mod planner; + +use serde_yaml::Value as YamlValue; +use std::path::Path; + +pub use config::input::ControllerConfig; +pub use error::ControllerError; +pub use output::generator::{GeneratorOutput, PuntedQuery}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StreamingEngine { + Arroyo, + Flink, +} + +#[derive(Debug, Clone)] +pub struct RuntimeOptions { + pub prometheus_scrape_interval: u64, + pub streaming_engine: StreamingEngine, + pub enable_punting: bool, + pub range_duration: u64, + pub step: u64, +} + +pub struct Controller { + config: ControllerConfig, + options: RuntimeOptions, +} + +/// Output of the planning process — contains the two YAML configs +pub struct PlannerOutput { + pub punted_queries: Vec, + streaming_yaml: YamlValue, + inference_yaml: YamlValue, + aggregation_count: usize, + query_count: usize, +} + +impl PlannerOutput { + pub fn streaming_aggregation_count(&self) -> usize { + self.aggregation_count + } + + pub fn inference_query_count(&self) -> usize { + self.query_count + } + + pub fn has_aggregation_type(&self, t: &str) -> bool { + if let YamlValue::Mapping(root) = &self.streaming_yaml { + if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { + return aggs.iter().any(|agg| { + if let YamlValue::Mapping(m) = agg { + if let Some(YamlValue::String(agg_type)) = m.get("aggregationType") { + return agg_type == t; + } + } + false + }); + } + } + false + } + + pub fn all_tumbling_window_sizes_eq(&self, s: u64) -> bool { + self.check_tumbling_window_sizes(|size| size == s) + } + + pub fn all_tumbling_window_sizes_leq(&self, s: u64) -> bool { + self.check_tumbling_window_sizes(|size| size <= s) + } + + fn check_tumbling_window_sizes(&self, predicate: impl Fn(u64) -> bool) -> bool { + if let YamlValue::Mapping(root) = &self.streaming_yaml { + if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { + return aggs.iter().all(|agg| { + if let YamlValue::Mapping(m) = agg { + if let Some(val) = m.get("tumblingWindowSize") { + let size = match val { + YamlValue::Number(n) => n.as_u64().unwrap_or(0), + _ => 0, + }; + return predicate(size); + } + } + false + }); + } + } + false + } + + /// Returns the sorted labels for the first aggregation matching `agg_type`, + /// for the given `label_kind` ("rollup", "grouping", or "aggregated"). + pub fn aggregation_labels(&self, agg_type: &str, label_kind: &str) -> Vec { + if let YamlValue::Mapping(root) = &self.streaming_yaml { + if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { + for agg in aggs { + if let YamlValue::Mapping(m) = agg { + if let Some(YamlValue::String(t)) = m.get("aggregationType") { + if t == agg_type { + if let Some(YamlValue::Mapping(labels)) = m.get("labels") { + if let Some(YamlValue::Sequence(seq)) = labels.get(label_kind) { + let mut result: Vec = seq + .iter() + .filter_map(|v| { + if let YamlValue::String(s) = v { + Some(s.clone()) + } else { + None + } + }) + .collect(); + result.sort(); + return result; + } + } + } + } + } + } + } + } + vec![] + } + + /// Returns the cleanup param (read_count_threshold or num_aggregates_to_retain) + /// for the first aggregation entry of the given query string. + pub fn inference_cleanup_param(&self, query: &str) -> Option { + if let YamlValue::Mapping(root) = &self.inference_yaml { + if let Some(YamlValue::Sequence(queries)) = root.get("queries") { + for q in queries { + if let YamlValue::Mapping(qm) = q { + if let Some(YamlValue::String(qs)) = qm.get("query") { + if qs == query { + if let Some(YamlValue::Sequence(aggs)) = qm.get("aggregations") { + if let Some(YamlValue::Mapping(agg)) = aggs.first() { + for key in + ["read_count_threshold", "num_aggregates_to_retain"] + { + if let Some(YamlValue::Number(n)) = agg.get(key) { + return n.as_u64(); + } + } + } + } + } + } + } + } + } + } + None + } + + pub fn to_streaming_yaml_string(&self) -> Result { + Ok(serde_yaml::to_string(&self.streaming_yaml)?) + } + + pub fn to_inference_yaml_string(&self) -> Result { + Ok(serde_yaml::to_string(&self.inference_yaml)?) + } +} + +impl Controller { + pub fn from_file(path: &Path, opts: RuntimeOptions) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + Self::from_yaml(&yaml_str, opts) + } + + pub fn from_yaml(yaml: &str, opts: RuntimeOptions) -> Result { + let config: ControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = output::generator::generate_plan(&self.config, &self.options)?; + Ok(PlannerOutput { + punted_queries: output.punted_queries, + streaming_yaml: output.streaming_yaml, + inference_yaml: output.inference_yaml, + aggregation_count: output.aggregation_count, + query_count: output.query_count, + }) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(&output.streaming_yaml)?; + let inference_str = serde_yaml::to_string(&output.inference_yaml)?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs new file mode 100644 index 0000000..c1fa0b9 --- /dev/null +++ b/asap-planner-rs/src/main.rs @@ -0,0 +1,68 @@ +use asap_planner::{Controller, RuntimeOptions, StreamingEngine}; +use clap::Parser; +use std::path::PathBuf; + +#[derive(Parser, Debug)] +#[command(name = "asap-planner", about = "ASAP Query Planner")] +struct Args { + #[arg(long = "input_config")] + input_config: PathBuf, + + #[arg(long = "output_dir")] + output_dir: PathBuf, + + #[arg(long = "prometheus_scrape_interval")] + prometheus_scrape_interval: u64, + + #[arg(long = "streaming_engine", value_enum)] + streaming_engine: EngineArg, + + #[arg(long = "enable-punting", default_value = "false")] + enable_punting: bool, + + #[arg(long = "range-duration", default_value = "0")] + range_duration: u64, + + #[arg(long = "step", default_value = "0")] + step: u64, + + #[arg(short, long, action = clap::ArgAction::Count)] + verbose: u8, +} + +#[derive(clap::ValueEnum, Debug, Clone, Copy)] +enum EngineArg { + Arroyo, + Flink, +} + +fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + tracing_subscriber::fmt() + .with_max_level(if args.verbose > 0 { + tracing::Level::DEBUG + } else { + tracing::Level::WARN + }) + .init(); + + let engine = match args.streaming_engine { + EngineArg::Arroyo => StreamingEngine::Arroyo, + EngineArg::Flink => StreamingEngine::Flink, + }; + + let opts = RuntimeOptions { + prometheus_scrape_interval: args.prometheus_scrape_interval, + streaming_engine: engine, + enable_punting: args.enable_punting, + range_duration: args.range_duration, + step: args.step, + }; + + let controller = Controller::from_file(&args.input_config, opts)?; + controller.generate_to_dir(&args.output_dir)?; + + println!("Generated configs in {}", args.output_dir.display()); + Ok(()) +} diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/output/generator.rs new file mode 100644 index 0000000..31aaabf --- /dev/null +++ b/asap-planner-rs/src/output/generator.rs @@ -0,0 +1,372 @@ +use indexmap::IndexMap; +use serde_json::Value as JsonValue; +use serde_yaml::Value as YamlValue; +use std::collections::HashMap; + +use promql_utilities::data_model::KeyByLabelNames; +use sketch_db_common::enums::CleanupPolicy; + +use crate::config::input::ControllerConfig; +use crate::error::ControllerError; +use crate::planner::single_query::{IntermediateAggConfig, SingleQueryProcessor}; +use crate::RuntimeOptions; + +/// Run the full planning pipeline and produce YAML outputs +pub fn generate_plan( + controller_config: &ControllerConfig, + opts: &RuntimeOptions, +) -> Result { + // Build metric schema + let mut metric_schema = sketch_db_common::PromQLSchema::new(); + for md in &controller_config.metrics { + metric_schema = + metric_schema.add_metric(md.metric.clone(), KeyByLabelNames::new(md.labels.clone())); + } + + // Determine cleanup policy + let cleanup_policy_str = controller_config + .aggregate_cleanup + .as_ref() + .and_then(|c| c.policy.as_deref()) + .unwrap_or("read_based"); + let cleanup_policy = parse_cleanup_policy(cleanup_policy_str)?; + + // Validate no duplicate queries + let mut seen_queries = std::collections::HashSet::new(); + for qg in &controller_config.query_groups { + for q in &qg.queries { + if !seen_queries.insert(q.clone()) { + return Err(ControllerError::DuplicateQuery(q.clone())); + } + } + } + + // Deduplication map: identifying_key -> (agg_config, assigned_id_placeholder) + let mut dedup_map: IndexMap = IndexMap::new(); + // query_string -> Vec<(key, cleanup_param)> + let mut query_keys_map: IndexMap)>> = IndexMap::new(); + + let mut punted_queries: Vec = Vec::new(); + + for qg in &controller_config.query_groups { + for query_string in &qg.queries { + let processor = SingleQueryProcessor::new( + query_string.clone(), + qg.repetition_delay, + opts.prometheus_scrape_interval, + metric_schema.clone(), + opts.streaming_engine, + controller_config.sketch_parameters.clone(), + opts.range_duration, + opts.step, + cleanup_policy, + ); + + let mut should_process = processor.is_supported(); + if opts.enable_punting && should_process { + should_process = should_process && processor.should_be_performant(); + if !should_process { + punted_queries.push(PuntedQuery { + query: query_string.clone(), + }); + } + } + + if should_process { + let (configs, cleanup_param) = processor.get_streaming_aggregation_configs()?; + let mut keys_for_query = Vec::new(); + for config in configs { + let key = config.identifying_key(); + keys_for_query.push((key.clone(), cleanup_param)); + dedup_map.entry(key).or_insert(config); + } + query_keys_map.insert(query_string.clone(), keys_for_query); + } + } + } + + // Assign sequential IDs (1-indexed, insertion order) + let mut id_map: HashMap = HashMap::new(); + for (idx, key) in dedup_map.keys().enumerate() { + id_map.insert(key.clone(), idx as u32 + 1); + } + + // Build streaming_config YAML + let streaming_yaml = build_streaming_yaml(&dedup_map, &id_map, &metric_schema)?; + + // Build inference_config YAML + let inference_yaml = build_inference_yaml( + cleanup_policy, + cleanup_policy_str, + &query_keys_map, + &id_map, + &metric_schema, + )?; + + Ok(GeneratorOutput { + punted_queries, + streaming_yaml, + inference_yaml, + aggregation_count: dedup_map.len(), + query_count: query_keys_map.len(), + }) +} + +fn parse_cleanup_policy(s: &str) -> Result { + match s { + "circular_buffer" => Ok(CleanupPolicy::CircularBuffer), + "read_based" => Ok(CleanupPolicy::ReadBased), + "no_cleanup" => Ok(CleanupPolicy::NoCleanup), + other => Err(ControllerError::PlannerError(format!( + "Unknown cleanup policy: {}", + other + ))), + } +} + +fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue { + YamlValue::Sequence( + labels + .labels + .iter() + .map(|l| YamlValue::String(l.clone())) + .collect(), + ) +} + +fn params_to_yaml(params: &HashMap) -> YamlValue { + if params.is_empty() { + return YamlValue::Mapping(serde_yaml::Mapping::new()); + } + let mut map = serde_yaml::Mapping::new(); + // Sort for determinism + let mut sorted: Vec<_> = params.iter().collect(); + sorted.sort_by_key(|(k, _)| k.as_str()); + for (k, v) in sorted { + let yaml_val = match v { + JsonValue::Number(n) => { + if let Some(i) = n.as_u64() { + YamlValue::Number(serde_yaml::Number::from(i)) + } else if let Some(f) = n.as_f64() { + YamlValue::Number(serde_yaml::Number::from(f)) + } else { + YamlValue::String(n.to_string()) + } + } + JsonValue::String(s) => YamlValue::String(s.clone()), + JsonValue::Bool(b) => YamlValue::Bool(*b), + other => YamlValue::String(other.to_string()), + }; + map.insert(YamlValue::String(k.clone()), yaml_val); + } + YamlValue::Mapping(map) +} + +fn build_streaming_yaml( + dedup_map: &IndexMap, + id_map: &HashMap, + metric_schema: &sketch_db_common::PromQLSchema, +) -> Result { + let aggregations: Vec = dedup_map + .iter() + .map(|(key, cfg)| { + let id = id_map[key]; + let mut map = serde_yaml::Mapping::new(); + map.insert( + YamlValue::String("aggregationId".to_string()), + YamlValue::Number(id.into()), + ); + map.insert( + YamlValue::String("aggregationSubType".to_string()), + YamlValue::String(cfg.aggregation_sub_type.clone()), + ); + map.insert( + YamlValue::String("aggregationType".to_string()), + YamlValue::String(cfg.aggregation_type.clone()), + ); + + // labels + let mut labels_map = serde_yaml::Mapping::new(); + labels_map.insert( + YamlValue::String("aggregated".to_string()), + key_by_labels_to_yaml(&cfg.aggregated_labels), + ); + labels_map.insert( + YamlValue::String("grouping".to_string()), + key_by_labels_to_yaml(&cfg.grouping_labels), + ); + labels_map.insert( + YamlValue::String("rollup".to_string()), + key_by_labels_to_yaml(&cfg.rollup_labels), + ); + map.insert( + YamlValue::String("labels".to_string()), + YamlValue::Mapping(labels_map), + ); + + map.insert( + YamlValue::String("metric".to_string()), + YamlValue::String(cfg.metric.clone()), + ); + map.insert( + YamlValue::String("parameters".to_string()), + params_to_yaml(&cfg.parameters), + ); + map.insert( + YamlValue::String("slideInterval".to_string()), + YamlValue::Number(cfg.slide_interval.into()), + ); + map.insert( + YamlValue::String("spatialFilter".to_string()), + YamlValue::String(cfg.spatial_filter.clone()), + ); + map.insert( + YamlValue::String("table_name".to_string()), + match &cfg.table_name { + Some(t) => YamlValue::String(t.clone()), + None => YamlValue::Null, + }, + ); + map.insert( + YamlValue::String("tumblingWindowSize".to_string()), + YamlValue::Number(cfg.tumbling_window_size.into()), + ); + map.insert( + YamlValue::String("value_column".to_string()), + match &cfg.value_column { + Some(v) => YamlValue::String(v.clone()), + None => YamlValue::Null, + }, + ); + map.insert( + YamlValue::String("windowSize".to_string()), + YamlValue::Number(cfg.window_size.into()), + ); + map.insert( + YamlValue::String("windowType".to_string()), + YamlValue::String(cfg.window_type.clone()), + ); + + YamlValue::Mapping(map) + }) + .collect(); + + // Build metrics section + let mut metrics_map = serde_yaml::Mapping::new(); + for (metric_name, labels) in &metric_schema.config { + metrics_map.insert( + YamlValue::String(metric_name.clone()), + key_by_labels_to_yaml(labels), + ); + } + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String("aggregations".to_string()), + YamlValue::Sequence(aggregations), + ); + root.insert( + YamlValue::String("metrics".to_string()), + YamlValue::Mapping(metrics_map), + ); + + Ok(YamlValue::Mapping(root)) +} + +fn build_inference_yaml( + cleanup_policy: CleanupPolicy, + cleanup_policy_str: &str, + query_keys_map: &IndexMap)>>, + id_map: &HashMap, + metric_schema: &sketch_db_common::PromQLSchema, +) -> Result { + let mut cleanup_map = serde_yaml::Mapping::new(); + cleanup_map.insert( + YamlValue::String("name".to_string()), + YamlValue::String(cleanup_policy_str.to_string()), + ); + + let queries: Vec = query_keys_map + .iter() + .map(|(query_str, keys)| { + let aggregations: Vec = keys + .iter() + .map(|(key, cleanup_param)| { + let agg_id = id_map[key]; + let mut agg_map = serde_yaml::Mapping::new(); + agg_map.insert( + YamlValue::String("aggregation_id".to_string()), + YamlValue::Number(agg_id.into()), + ); + if let Some(param) = cleanup_param { + match cleanup_policy { + CleanupPolicy::CircularBuffer => { + agg_map.insert( + YamlValue::String("num_aggregates_to_retain".to_string()), + YamlValue::Number((*param).into()), + ); + } + CleanupPolicy::ReadBased => { + agg_map.insert( + YamlValue::String("read_count_threshold".to_string()), + YamlValue::Number((*param).into()), + ); + } + CleanupPolicy::NoCleanup => {} + } + } + YamlValue::Mapping(agg_map) + }) + .collect(); + + let mut q_map = serde_yaml::Mapping::new(); + q_map.insert( + YamlValue::String("aggregations".to_string()), + YamlValue::Sequence(aggregations), + ); + q_map.insert( + YamlValue::String("query".to_string()), + YamlValue::String(query_str.clone()), + ); + YamlValue::Mapping(q_map) + }) + .collect(); + + // Build metrics section + let mut metrics_map = serde_yaml::Mapping::new(); + for (metric_name, labels) in &metric_schema.config { + metrics_map.insert( + YamlValue::String(metric_name.clone()), + key_by_labels_to_yaml(labels), + ); + } + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String("cleanup_policy".to_string()), + YamlValue::Mapping(cleanup_map), + ); + root.insert( + YamlValue::String("metrics".to_string()), + YamlValue::Mapping(metrics_map), + ); + root.insert( + YamlValue::String("queries".to_string()), + YamlValue::Sequence(queries), + ); + + Ok(YamlValue::Mapping(root)) +} + +#[derive(Debug, Clone)] +pub struct PuntedQuery { + pub query: String, +} + +pub struct GeneratorOutput { + pub punted_queries: Vec, + pub streaming_yaml: YamlValue, + pub inference_yaml: YamlValue, + pub aggregation_count: usize, + pub query_count: usize, +} diff --git a/asap-planner-rs/src/output/mod.rs b/asap-planner-rs/src/output/mod.rs new file mode 100644 index 0000000..225c968 --- /dev/null +++ b/asap-planner-rs/src/output/mod.rs @@ -0,0 +1,2 @@ +pub mod generator; +pub use generator::*; diff --git a/asap-planner-rs/src/planner/logics.rs b/asap-planner-rs/src/planner/logics.rs new file mode 100644 index 0000000..c18803c --- /dev/null +++ b/asap-planner-rs/src/planner/logics.rs @@ -0,0 +1,403 @@ +use crate::config::input::SketchParameterOverrides; +use promql_utilities::ast_matching::PromQLMatchResult; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{QueryPatternType, Statistic}; +use promql_utilities::query_logics::logics::does_precompute_operator_support_subpopulations; +use sketch_db_common::enums::CleanupPolicy; +use std::collections::HashMap; + +// Default sketch parameters +const DEFAULT_CMS_DEPTH: u64 = 3; +const DEFAULT_CMS_WIDTH: u64 = 1024; +const DEFAULT_CMS_HEAP_MULT: u64 = 4; +const DEFAULT_KLL_K: u64 = 20; +const DEFAULT_HYDRA_ROW: u64 = 3; +const DEFAULT_HYDRA_COL: u64 = 1024; +const DEFAULT_HYDRA_K: u64 = 20; + +pub fn get_effective_repeat(t_repeat: u64, step: u64) -> u64 { + if step > 0 { + t_repeat.min(step) + } else { + t_repeat + } +} + +pub fn should_use_sliding_window( + _query_pattern_type: QueryPatternType, + _aggregation_type: &str, +) -> bool { + // HARDCODED: sliding windows crash Arroyo + false +} + +pub fn set_window_parameters( + query_pattern_type: QueryPatternType, + t_repeat: u64, + prometheus_scrape_interval: u64, + aggregation_type: &str, + step: u64, + config: &mut IntermediateWindowConfig, +) { + let effective_repeat = get_effective_repeat(t_repeat, step); + let _use_sliding = should_use_sliding_window(query_pattern_type, aggregation_type); + // use_sliding is always false, so always tumbling + set_tumbling_window_parameters( + query_pattern_type, + effective_repeat, + prometheus_scrape_interval, + config, + ); +} + +fn set_tumbling_window_parameters( + query_pattern_type: QueryPatternType, + effective_repeat: u64, + prometheus_scrape_interval: u64, + config: &mut IntermediateWindowConfig, +) { + match query_pattern_type { + QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { + config.window_size = effective_repeat; + config.slide_interval = effective_repeat; + config.window_type = "tumbling".to_string(); + config.tumbling_window_size = effective_repeat; + } + QueryPatternType::OnlySpatial => { + config.window_size = prometheus_scrape_interval; + config.slide_interval = prometheus_scrape_interval; + config.window_type = "tumbling".to_string(); + config.tumbling_window_size = prometheus_scrape_interval; + } + } +} + +/// A mutable window config holder used during planning +#[derive(Debug, Clone, Default)] +pub struct IntermediateWindowConfig { + pub window_size: u64, + pub slide_interval: u64, + pub window_type: String, + pub tumbling_window_size: u64, +} + +pub fn get_precompute_operator_parameters( + aggregation_type: &str, + aggregation_sub_type: &str, + match_result: &PromQLMatchResult, + sketch_params: Option<&SketchParameterOverrides>, +) -> Result, String> { + match aggregation_type { + "Increase" | "MinMax" | "Sum" | "MultipleIncrease" | "MultipleMinMax" | "MultipleSum" + | "DeltaSetAggregator" | "SetAggregator" => Ok(HashMap::new()), + + "CountMinSketch" => { + let depth = sketch_params + .and_then(|p| p.count_min_sketch.as_ref()) + .map(|p| p.depth) + .unwrap_or(DEFAULT_CMS_DEPTH); + let width = sketch_params + .and_then(|p| p.count_min_sketch.as_ref()) + .map(|p| p.width) + .unwrap_or(DEFAULT_CMS_WIDTH); + let mut m = HashMap::new(); + m.insert("depth".to_string(), serde_json::Value::Number(depth.into())); + m.insert("width".to_string(), serde_json::Value::Number(width.into())); + Ok(m) + } + + "CountMinSketchWithHeap" => { + if aggregation_sub_type != "topk" { + return Err(format!( + "Aggregation sub-type {} for CountMinSketchWithHeap not supported", + aggregation_sub_type + )); + } + // Get k from aggregation param + let k: u64 = match_result + .tokens + .get("aggregation") + .and_then(|t| t.aggregation.as_ref()) + .and_then(|a| a.param.as_ref()) + .and_then(|p| p.parse::().ok()) + .map(|f| f as u64) + .ok_or_else(|| "topk query missing required 'k' parameter".to_string())?; + + let depth = sketch_params + .and_then(|p| p.count_min_sketch_with_heap.as_ref()) + .map(|p| p.depth) + .unwrap_or(DEFAULT_CMS_DEPTH); + let width = sketch_params + .and_then(|p| p.count_min_sketch_with_heap.as_ref()) + .map(|p| p.width) + .unwrap_or(DEFAULT_CMS_WIDTH); + let heap_mult = sketch_params + .and_then(|p| p.count_min_sketch_with_heap.as_ref()) + .and_then(|p| p.heap_multiplier) + .unwrap_or(DEFAULT_CMS_HEAP_MULT); + + let mut m = HashMap::new(); + m.insert("depth".to_string(), serde_json::Value::Number(depth.into())); + m.insert("width".to_string(), serde_json::Value::Number(width.into())); + m.insert( + "heapsize".to_string(), + serde_json::Value::Number((k * heap_mult).into()), + ); + Ok(m) + } + + "DatasketchesKLL" => { + let k = sketch_params + .and_then(|p| p.datasketches_kll.as_ref()) + .map(|p| p.k) + .unwrap_or(DEFAULT_KLL_K); + let mut m = HashMap::new(); + m.insert("K".to_string(), serde_json::Value::Number(k.into())); + Ok(m) + } + + "HydraKLL" => { + let row_num = sketch_params + .and_then(|p| p.hydra_kll.as_ref()) + .map(|p| p.row_num) + .unwrap_or(DEFAULT_HYDRA_ROW); + let col_num = sketch_params + .and_then(|p| p.hydra_kll.as_ref()) + .map(|p| p.col_num) + .unwrap_or(DEFAULT_HYDRA_COL); + let k = sketch_params + .and_then(|p| p.hydra_kll.as_ref()) + .map(|p| p.k) + .unwrap_or(DEFAULT_HYDRA_K); + let mut m = HashMap::new(); + m.insert( + "row_num".to_string(), + serde_json::Value::Number(row_num.into()), + ); + m.insert( + "col_num".to_string(), + serde_json::Value::Number(col_num.into()), + ); + m.insert("k".to_string(), serde_json::Value::Number(k.into())); + Ok(m) + } + + other => Err(format!("Aggregation type {} not supported", other)), + } +} + +pub fn get_cleanup_param( + cleanup_policy: CleanupPolicy, + query_pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + t_repeat: u64, + window_type: &str, + range_duration: u64, + step: u64, +) -> Result { + // Validation + if (range_duration == 0) != (step == 0) { + return Err(format!( + "range_duration and step must both be 0 or both > 0. Got range_duration={}, step={}", + range_duration, step + )); + } + + let is_range_query = step > 0; + + let t_lookback: u64 = if query_pattern_type == QueryPatternType::OnlySpatial { + t_repeat + } else { + match_result + .get_range_duration() + .map(|d| d.num_seconds() as u64) + .ok_or_else(|| "No range_vector token found".to_string())? + }; + + if window_type == "sliding" { + let result = if is_range_query { + range_duration / step + 1 + } else { + 1 + }; + return Ok(result); + } + + // Tumbling + let effective_repeat = get_effective_repeat(t_repeat, step); + + let result = match cleanup_policy { + CleanupPolicy::CircularBuffer => { + // ceil((t_lookback + range_duration) / effective_repeat) + let numerator = t_lookback + range_duration; + numerator.div_ceil(effective_repeat) + } + CleanupPolicy::ReadBased => { + // ceil(t_lookback / effective_repeat) * (range_duration / step + 1) + let lookback_buckets = t_lookback.div_ceil(effective_repeat); + let num_steps = if is_range_query { + range_duration / step + 1 + } else { + 1 + }; + lookback_buckets * num_steps + } + CleanupPolicy::NoCleanup => { + return Err("NoCleanup policy should not call get_cleanup_param".to_string()); + } + }; + + Ok(result) +} + +pub fn set_subpopulation_labels( + statistic: Statistic, + aggregation_type: &str, + subpopulation_labels: &KeyByLabelNames, + rollup_labels: &mut KeyByLabelNames, + grouping_labels: &mut KeyByLabelNames, + aggregated_labels: &mut KeyByLabelNames, +) { + // rollup is set by caller before calling this function + let _ = rollup_labels; // not modified here + if does_precompute_operator_support_subpopulations(statistic, aggregation_type) { + *grouping_labels = KeyByLabelNames::empty(); + *aggregated_labels = subpopulation_labels.clone(); + } else { + *grouping_labels = subpopulation_labels.clone(); + *aggregated_labels = KeyByLabelNames::empty(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::planner::patterns::build_patterns; + + use promql_utilities::ast_matching::PromQLMatchResult; + use promql_utilities::query_logics::enums::QueryPatternType; + + fn match_query(query: &str) -> (QueryPatternType, PromQLMatchResult) { + let ast = promql_parser::parser::parse(query).unwrap(); + let patterns = build_patterns(); + for (pt, pattern) in &patterns { + let result = pattern.matches(&ast); + if result.matches { + return (*pt, result); + } + } + panic!("no pattern matched query: {}", query); + } + + // --- get_effective_repeat --- + + #[test] + fn effective_repeat_no_step() { + assert_eq!(get_effective_repeat(300, 0), 300); + } + + #[test] + fn effective_repeat_step_smaller_than_t_repeat() { + assert_eq!(get_effective_repeat(300, 30), 30); + } + + #[test] + fn effective_repeat_step_larger_than_t_repeat() { + assert_eq!(get_effective_repeat(30, 300), 30); + } + + // --- get_cleanup_param --- + + #[test] + fn cleanup_param_circular_buffer_spatial_instant_query() { + let (pt, mr) = match_query("sum(some_metric)"); + assert_eq!(pt, QueryPatternType::OnlySpatial); + // t_lookback = t_repeat = 300 (OnlySpatial path) + // effective_repeat = 300 (step=0) + // ceil((300 + 0) / 300) = 1 + let result = get_cleanup_param( + CleanupPolicy::CircularBuffer, + pt, + &mr, + 300, + "tumbling", + 0, + 0, + ) + .unwrap(); + assert_eq!(result, 1); + } + + #[test] + fn cleanup_param_circular_buffer_spatial_range_query() { + let (pt, mr) = match_query("sum(some_metric)"); + // t_lookback = t_repeat = 300, effective_repeat = min(300, 30) = 30 + // ceil((300 + 3600) / 30) = ceil(130) = 130 + let result = get_cleanup_param( + CleanupPolicy::CircularBuffer, + pt, + &mr, + 300, + "tumbling", + 3600, + 30, + ) + .unwrap(); + assert_eq!(result, 130); + } + + #[test] + fn cleanup_param_read_based_spatial_instant_query() { + let (pt, mr) = match_query("sum(some_metric)"); + // lookback_buckets = ceil(300/300) = 1, num_steps = 1 → result = 1 + let result = + get_cleanup_param(CleanupPolicy::ReadBased, pt, &mr, 300, "tumbling", 0, 0).unwrap(); + assert_eq!(result, 1); + } + + #[test] + fn cleanup_param_read_based_spatial_range_query() { + let (pt, mr) = match_query("sum(some_metric)"); + // lookback_buckets = ceil(300/30) = 10, num_steps = 3600/30 + 1 = 121 + // result = 10 * 121 = 1210 + let result = + get_cleanup_param(CleanupPolicy::ReadBased, pt, &mr, 300, "tumbling", 3600, 30) + .unwrap(); + assert_eq!(result, 1210); + } + + #[test] + fn cleanup_param_circular_buffer_temporal_instant_query() { + let (pt, mr) = match_query("rate(some_metric[5m])"); + assert_eq!(pt, QueryPatternType::OnlyTemporal); + // t_lookback = 5m = 300s (from [5m] range vector), range_duration=0, step=0 + // effective_repeat = 60, ceil((300 + 0) / 60) = 5 + let result = + get_cleanup_param(CleanupPolicy::CircularBuffer, pt, &mr, 60, "tumbling", 0, 0) + .unwrap(); + assert_eq!(result, 5); + } + + #[test] + fn cleanup_param_no_cleanup_returns_error() { + let (pt, mr) = match_query("sum(some_metric)"); + let result = get_cleanup_param(CleanupPolicy::NoCleanup, pt, &mr, 300, "tumbling", 0, 0); + assert!(result.is_err()); + } + + #[test] + fn cleanup_param_mismatched_range_and_step_returns_error() { + let (pt, mr) = match_query("sum(some_metric)"); + // range_duration > 0 but step == 0 is invalid + let result = get_cleanup_param( + CleanupPolicy::CircularBuffer, + pt, + &mr, + 300, + "tumbling", + 3600, + 0, + ); + assert!(result.is_err()); + } +} diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs new file mode 100644 index 0000000..427dbdd --- /dev/null +++ b/asap-planner-rs/src/planner/mod.rs @@ -0,0 +1,4 @@ +pub mod logics; +pub mod patterns; +pub mod single_query; +pub use single_query::*; diff --git a/asap-planner-rs/src/planner/patterns.rs b/asap-planner-rs/src/planner/patterns.rs new file mode 100644 index 0000000..31b8554 --- /dev/null +++ b/asap-planner-rs/src/planner/patterns.rs @@ -0,0 +1,95 @@ +use promql_utilities::ast_matching::{PromQLPattern, PromQLPatternBuilder}; +use promql_utilities::query_logics::enums::QueryPatternType; + +/// Build all 5 patterns in priority order: ONLY_TEMPORAL (2), ONLY_SPATIAL (1), ONE_TEMPORAL_ONE_SPATIAL (2) +pub fn build_patterns() -> Vec<(QueryPatternType, PromQLPattern)> { + let metric_pattern = || PromQLPatternBuilder::metric(None, None, None, Some("metric")); + let range_vector_pattern = + || PromQLPatternBuilder::matrix_selector(metric_pattern(), None, Some("range_vector")); + + // ONLY_TEMPORAL pattern 1: quantile_over_time(phi, metric[range]) + let ot_quantile = PromQLPattern::new(PromQLPatternBuilder::function( + vec!["quantile_over_time"], + vec![ + PromQLPatternBuilder::number(None, None), + range_vector_pattern(), + ], + Some("function"), + Some("function_args"), + )); + + // ONLY_TEMPORAL pattern 2: sum_over_time/count_over_time/... (metric[range]) + let ot_temporal_funcs = PromQLPattern::new(PromQLPatternBuilder::function( + vec![ + "sum_over_time", + "count_over_time", + "avg_over_time", + "min_over_time", + "max_over_time", + "increase", + "rate", + ], + vec![range_vector_pattern()], + Some("function"), + Some("function_args"), + )); + + // ONLY_SPATIAL pattern: agg_op(metric) + let os_spatial = PromQLPattern::new(PromQLPatternBuilder::aggregation( + vec!["sum", "count", "avg", "quantile", "min", "max", "topk"], + metric_pattern(), + None, + None, + None, + Some("aggregation"), + )); + + // ONE_TEMPORAL_ONE_SPATIAL pattern 1: agg_op(quantile_over_time(phi, metric[range])) + let ottos_quantile = PromQLPattern::new(PromQLPatternBuilder::aggregation( + vec!["sum", "count", "avg", "quantile", "min", "max"], + PromQLPatternBuilder::function( + vec!["quantile_over_time"], + vec![ + PromQLPatternBuilder::number(None, None), + range_vector_pattern(), + ], + Some("function"), + Some("function_args"), + ), + None, + None, + None, + Some("aggregation"), + )); + + // ONE_TEMPORAL_ONE_SPATIAL pattern 2: agg_op(temporal_func(metric[range])) + let ottos_temporal = PromQLPattern::new(PromQLPatternBuilder::aggregation( + vec!["sum", "count", "avg", "quantile", "min", "max"], + PromQLPatternBuilder::function( + vec![ + "sum_over_time", + "count_over_time", + "avg_over_time", + "min_over_time", + "max_over_time", + "increase", + "rate", + ], + vec![range_vector_pattern()], + Some("function"), + Some("function_args"), + ), + None, + None, + None, + Some("aggregation"), + )); + + vec![ + (QueryPatternType::OnlyTemporal, ot_quantile), + (QueryPatternType::OnlyTemporal, ot_temporal_funcs), + (QueryPatternType::OnlySpatial, os_spatial), + (QueryPatternType::OneTemporalOneSpatial, ottos_quantile), + (QueryPatternType::OneTemporalOneSpatial, ottos_temporal), + ] +} diff --git a/asap-planner-rs/src/planner/single_query.rs b/asap-planner-rs/src/planner/single_query.rs new file mode 100644 index 0000000..9546992 --- /dev/null +++ b/asap-planner-rs/src/planner/single_query.rs @@ -0,0 +1,494 @@ +use promql_utilities::ast_matching::PromQLMatchResult; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{QueryPatternType, QueryTreatmentType, Statistic}; +use promql_utilities::query_logics::logics::{ + get_is_collapsable, map_statistic_to_precompute_operator, +}; +use promql_utilities::query_logics::parsing::{ + get_metric_and_spatial_filter, get_spatial_aggregation_output_labels, get_statistics_to_compute, +}; +use serde_json::Value; +use sketch_db_common::enums::CleanupPolicy; +use sketch_db_common::PromQLSchema; +use std::collections::HashMap; + +use crate::config::input::SketchParameterOverrides; +use crate::error::ControllerError; +use crate::planner::logics::{ + get_cleanup_param, get_precompute_operator_parameters, set_subpopulation_labels, + set_window_parameters, IntermediateWindowConfig, +}; +use crate::planner::patterns::build_patterns; +use crate::StreamingEngine; + +/// Internal representation of an aggregation config before IDs are assigned +#[derive(Debug, Clone)] +pub struct IntermediateAggConfig { + pub aggregation_type: String, + pub aggregation_sub_type: String, + pub window_type: String, + pub window_size: u64, + pub slide_interval: u64, + pub tumbling_window_size: u64, + pub spatial_filter: String, + pub metric: String, + pub table_name: Option, + pub value_column: Option, + pub parameters: HashMap, + pub rollup_labels: KeyByLabelNames, + pub grouping_labels: KeyByLabelNames, + pub aggregated_labels: KeyByLabelNames, +} + +impl IntermediateAggConfig { + /// Canonical deduplication key matching Python's get_identifying_key() + pub fn identifying_key(&self) -> String { + // Build a canonical string representation matching Python's tuple + let mut params_vec: Vec<(String, String)> = self + .parameters + .iter() + .map(|(k, v)| (k.clone(), v.to_string())) + .collect(); + params_vec.sort_by_key(|(k, _)| k.clone()); + + let mut label_parts = String::new(); + // sorted label keys: aggregated, grouping, rollup + let mut label_keys = vec!["aggregated", "grouping", "rollup"]; + label_keys.sort(); + for k in label_keys { + let labels = match k { + "aggregated" => &self.aggregated_labels, + "grouping" => &self.grouping_labels, + "rollup" => &self.rollup_labels, + _ => unreachable!(), + }; + label_parts.push_str(&format!("{}:{:?};", k, labels.labels)); + } + + format!( + "{}|{}|{}|{}|{}|{}|{}|{}|{:?}|{:?}|{:?}|{}", + self.aggregation_type, + self.aggregation_sub_type, + self.window_type, + self.window_size, + self.slide_interval, + self.tumbling_window_size, + self.spatial_filter, + self.metric, + self.table_name, + self.value_column, + params_vec, + label_parts, + ) + } +} + +pub struct SingleQueryProcessor { + query: String, + t_repeat: u64, + prometheus_scrape_interval: u64, + metric_schema: PromQLSchema, + #[allow(dead_code)] + streaming_engine: StreamingEngine, + sketch_parameters: Option, + range_duration: u64, + step: u64, + cleanup_policy: CleanupPolicy, +} + +impl SingleQueryProcessor { + #[allow(clippy::too_many_arguments)] + pub fn new( + query: String, + t_repeat: u64, + prometheus_scrape_interval: u64, + metric_schema: PromQLSchema, + streaming_engine: StreamingEngine, + sketch_parameters: Option, + range_duration: u64, + step: u64, + cleanup_policy: CleanupPolicy, + ) -> Self { + Self { + query, + t_repeat, + prometheus_scrape_interval, + metric_schema, + streaming_engine, + sketch_parameters, + range_duration, + step, + cleanup_policy, + } + } + + /// Try to match query and return (pattern_type, match_result) or None + fn match_pattern( + &self, + ast: &promql_parser::parser::Expr, + ) -> Option<(QueryPatternType, PromQLMatchResult)> { + let patterns = build_patterns(); + for (pattern_type, pattern) in &patterns { + let result = pattern.matches(ast); + if result.matches { + return Some((*pattern_type, result)); + } + } + None + } + + /// Get treatment type (Exact vs Approximate) from pattern match + fn get_treatment_type( + pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + ) -> QueryTreatmentType { + match pattern_type { + QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { + let fn_name = match_result.get_function_name().unwrap_or_default(); + match fn_name.as_str() { + "quantile_over_time" | "sum_over_time" | "count_over_time" + | "avg_over_time" => QueryTreatmentType::Approximate, + _ => QueryTreatmentType::Exact, + } + } + QueryPatternType::OnlySpatial => { + let op = match_result.get_aggregation_op().unwrap_or_default(); + match op.as_str() { + "quantile" | "sum" | "count" | "avg" | "topk" => { + QueryTreatmentType::Approximate + } + _ => QueryTreatmentType::Exact, + } + } + } + } + + /// Check if query should be processed (supported pattern) + pub fn is_supported(&self) -> bool { + if let Ok(ast) = promql_parser::parser::parse(&self.query) { + self.match_pattern(&ast).is_some() + } else { + false + } + } + + /// Check if query should be performant (enable_punting check) + pub fn should_be_performant(&self) -> bool { + let ast = match promql_parser::parser::parse(&self.query) { + Ok(a) => a, + Err(_) => return false, + }; + let (pattern_type, match_result) = match self.match_pattern(&ast) { + Some(x) => x, + None => return true, + }; + + if pattern_type == QueryPatternType::OnlyTemporal { + let fn_name = match_result.get_function_name().unwrap_or_default(); + if matches!(fn_name.as_str(), "rate" | "increase" | "quantile_over_time") { + let num_data_points = self.t_repeat as f64 / self.prometheus_scrape_interval as f64; + if num_data_points < 60.0 { + return false; + } + if fn_name == "quantile_over_time" { + if let Some(range_dur) = match_result.get_range_duration() { + let range_secs = range_dur.num_seconds() as f64; + if range_secs / self.t_repeat as f64 > 15.0 { + return false; + } + } + } + } + } + true + } + + /// Generate streaming aggregation configs for this query + pub fn get_streaming_aggregation_configs( + &self, + ) -> Result<(Vec, Option), ControllerError> { + let ast = promql_parser::parser::parse(&self.query) + .map_err(|e| ControllerError::PromQLParse(e.to_string()))?; + + let (pattern_type, match_result) = self.match_pattern(&ast).ok_or_else(|| { + ControllerError::PlannerError(format!("Unsupported query: {}", self.query)) + })?; + + let treatment_type = Self::get_treatment_type(pattern_type, &match_result); + + let (metric, spatial_filter) = get_metric_and_spatial_filter(&match_result); + + let all_labels = self + .metric_schema + .get_labels(&metric) + .ok_or_else(|| ControllerError::UnknownMetric(metric.clone()))? + .clone(); + + let statistics = get_statistics_to_compute(pattern_type, &match_result); + + let mut configs: Vec = Vec::new(); + + // Shared window config (same for all statistics in this query) + let mut window_cfg = IntermediateWindowConfig::default(); + + // We use the first aggregation_type to set window parameters + // (window params don't depend on aggregation_type since sliding is disabled) + set_window_parameters( + pattern_type, + self.t_repeat, + self.prometheus_scrape_interval, + "any", // aggregation_type doesn't matter (sliding always false) + self.step, + &mut window_cfg, + ); + + for statistic in statistics { + let (aggregation_type, aggregation_sub_type) = + map_statistic_to_precompute_operator(statistic, treatment_type) + .map_err(ControllerError::PlannerError)?; + + // Compute labels + let (rollup_labels, grouping_labels, aggregated_labels) = compute_labels( + pattern_type, + statistic, + &aggregation_type, + &match_result, + &all_labels, + ); + + // Main config + let parameters = get_precompute_operator_parameters( + &aggregation_type, + &aggregation_sub_type, + &match_result, + self.sketch_parameters.as_ref(), + ) + .map_err(ControllerError::PlannerError)?; + + // DeltaSetAggregator pairing (hardcoded TODO) + if matches!(aggregation_type.as_str(), "CountMinSketch" | "HydraKLL") { + let delta_params = get_precompute_operator_parameters( + "DeltaSetAggregator", + "", + &match_result, + self.sketch_parameters.as_ref(), + ) + .map_err(ControllerError::PlannerError)?; + + configs.push(IntermediateAggConfig { + aggregation_type: "DeltaSetAggregator".to_string(), + aggregation_sub_type: String::new(), + window_type: window_cfg.window_type.clone(), + window_size: window_cfg.window_size, + slide_interval: window_cfg.slide_interval, + tumbling_window_size: window_cfg.tumbling_window_size, + spatial_filter: spatial_filter.clone(), + metric: metric.clone(), + table_name: None, + value_column: None, + parameters: delta_params, + rollup_labels: rollup_labels.clone(), + grouping_labels: grouping_labels.clone(), + aggregated_labels: aggregated_labels.clone(), + }); + } + + configs.push(IntermediateAggConfig { + aggregation_type, + aggregation_sub_type, + window_type: window_cfg.window_type.clone(), + window_size: window_cfg.window_size, + slide_interval: window_cfg.slide_interval, + tumbling_window_size: window_cfg.tumbling_window_size, + spatial_filter: spatial_filter.clone(), + metric: metric.clone(), + table_name: None, + value_column: None, + parameters, + rollup_labels, + grouping_labels, + aggregated_labels, + }); + } + + // Calculate cleanup param + let cleanup_param = if self.cleanup_policy == CleanupPolicy::NoCleanup { + None + } else { + Some( + get_cleanup_param( + self.cleanup_policy, + pattern_type, + &match_result, + self.t_repeat, + &window_cfg.window_type, + self.range_duration, + self.step, + ) + .map_err(ControllerError::PlannerError)?, + ) + }; + + Ok((configs, cleanup_param)) + } +} + +fn compute_labels( + pattern_type: QueryPatternType, + statistic: Statistic, + aggregation_type: &str, + match_result: &PromQLMatchResult, + all_labels: &KeyByLabelNames, +) -> (KeyByLabelNames, KeyByLabelNames, KeyByLabelNames) { + let mut rollup; + let mut grouping = KeyByLabelNames::empty(); + let mut aggregated = KeyByLabelNames::empty(); + + match pattern_type { + QueryPatternType::OnlyTemporal => { + rollup = KeyByLabelNames::empty(); + set_subpopulation_labels( + statistic, + aggregation_type, + all_labels, + &mut rollup, + &mut grouping, + &mut aggregated, + ); + } + QueryPatternType::OnlySpatial => { + // Match Python: if no by/without modifier, spatial_output = [] (rollup gets all labels). + // promql_utilities::get_spatial_aggregation_output_labels has a topk patch that returns + // all_labels when there is no modifier, but the Python planner returns [] in that case. + let has_modifier = match_result + .tokens + .get("aggregation") + .and_then(|t| t.aggregation.as_ref()) + .and_then(|a| a.modifier.as_ref()) + .is_some(); + let spatial_output = if has_modifier { + get_spatial_aggregation_output_labels(match_result, all_labels) + } else { + KeyByLabelNames::empty() + }; + rollup = all_labels.difference(&spatial_output); + set_subpopulation_labels( + statistic, + aggregation_type, + &spatial_output, + &mut rollup, + &mut grouping, + &mut aggregated, + ); + } + QueryPatternType::OneTemporalOneSpatial => { + let fn_name = match_result.get_function_name().unwrap_or_default(); + let agg_op = match_result.get_aggregation_op().unwrap_or_default(); + let collapsable = get_is_collapsable(&fn_name, &agg_op); + if !collapsable { + rollup = KeyByLabelNames::empty(); + set_subpopulation_labels( + statistic, + aggregation_type, + all_labels, + &mut rollup, + &mut grouping, + &mut aggregated, + ); + } else { + let spatial_output = + get_spatial_aggregation_output_labels(match_result, all_labels); + rollup = all_labels.difference(&spatial_output); + set_subpopulation_labels( + statistic, + aggregation_type, + &spatial_output, + &mut rollup, + &mut grouping, + &mut aggregated, + ); + } + } + } + + (rollup, grouping, aggregated) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::Value; + use std::collections::HashMap; + + fn base_config() -> IntermediateAggConfig { + IntermediateAggConfig { + aggregation_type: "MultipleIncrease".to_string(), + aggregation_sub_type: "rate".to_string(), + window_type: "tumbling".to_string(), + window_size: 300, + slide_interval: 300, + tumbling_window_size: 300, + spatial_filter: String::new(), + metric: "http_requests_total".to_string(), + table_name: None, + value_column: None, + parameters: HashMap::new(), + rollup_labels: KeyByLabelNames::new(vec!["instance".to_string()]), + grouping_labels: KeyByLabelNames::empty(), + aggregated_labels: KeyByLabelNames::empty(), + } + } + + #[test] + fn identifying_key_is_stable() { + let cfg = base_config(); + assert_eq!(cfg.identifying_key(), cfg.identifying_key()); + } + + #[test] + fn identical_configs_have_same_key() { + assert_eq!( + base_config().identifying_key(), + base_config().identifying_key() + ); + } + + #[test] + fn different_aggregation_type_produces_different_key() { + let cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg2.aggregation_type = "DatasketchesKLL".to_string(); + assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); + } + + #[test] + fn different_window_size_produces_different_key() { + let cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg2.window_size = 60; + assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); + } + + #[test] + fn different_rollup_labels_produce_different_key() { + let cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg2.rollup_labels = KeyByLabelNames::new(vec!["job".to_string()]); + assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); + } + + #[test] + fn parameter_insertion_order_does_not_affect_key() { + let mut cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg1.parameters + .insert("depth".to_string(), Value::Number(3.into())); + cfg1.parameters + .insert("width".to_string(), Value::Number(1024.into())); + cfg2.parameters + .insert("width".to_string(), Value::Number(1024.into())); + cfg2.parameters + .insert("depth".to_string(), Value::Number(3.into())); + assert_eq!(cfg1.identifying_key(), cfg2.identifying_key()); + } +} diff --git a/asap-planner-rs/tests/comparison/comparator.py b/asap-planner-rs/tests/comparison/comparator.py new file mode 100644 index 0000000..3bef1e8 --- /dev/null +++ b/asap-planner-rs/tests/comparison/comparator.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +"""Semantic comparison of Python and Rust planner outputs.""" +import yaml +import sys +from typing import List, Optional + + +def normalize_labels(labels) -> frozenset: + if labels is None: + return frozenset() + if isinstance(labels, list): + return frozenset(labels) + return frozenset() + + +def agg_signature(agg: dict) -> tuple: + """Canonical signature for matching aggregations across Python/Rust outputs.""" + return ( + agg.get("aggregationType", ""), + agg.get("aggregationSubType", ""), + agg.get("metric", "") or agg.get("table_name", ""), + agg.get("spatialFilter", ""), + normalize_labels(agg.get("labels", {}).get("rollup")), + normalize_labels(agg.get("labels", {}).get("grouping")), + normalize_labels(agg.get("labels", {}).get("aggregated")), + ) + + +def compare_streaming_configs(py_path: str, rs_path: str) -> List[str]: + """Compare two streaming_config.yaml files. Returns list of error messages.""" + errors = [] + with open(py_path) as f: + py = yaml.safe_load(f) + with open(rs_path) as f: + rs = yaml.safe_load(f) + + py_aggs = py.get("aggregations", []) + rs_aggs = rs.get("aggregations", []) + + if len(py_aggs) != len(rs_aggs): + errors.append(f"Aggregation count mismatch: Python={len(py_aggs)}, Rust={len(rs_aggs)}") + return errors + + py_by_sig = {agg_signature(a): a for a in py_aggs} + rs_by_sig = {agg_signature(a): a for a in rs_aggs} + + for sig, py_agg in py_by_sig.items(): + if sig not in rs_by_sig: + errors.append(f"Aggregation missing in Rust: {sig}") + continue + rs_agg = rs_by_sig[sig] + + for field in ["windowType", "windowSize", "slideInterval", "tumblingWindowSize"]: + if py_agg.get(field) != rs_agg.get(field): + errors.append( + f"Field '{field}' mismatch for {sig}: Python={py_agg.get(field)}, Rust={rs_agg.get(field)}" + ) + + if py_agg.get("parameters") != rs_agg.get("parameters"): + errors.append( + f"Parameters mismatch for {sig}: Python={py_agg.get('parameters')}, Rust={rs_agg.get('parameters')}" + ) + + return errors + + +def compare_inference_configs(py_path: str, rs_path: str) -> List[str]: + """Compare two inference_config.yaml files. Returns list of error messages.""" + errors = [] + with open(py_path) as f: + py = yaml.safe_load(f) + with open(rs_path) as f: + rs = yaml.safe_load(f) + + py_policy = py.get("cleanup_policy", {}).get("name", "") + rs_policy = rs.get("cleanup_policy", {}).get("name", "") + if py_policy != rs_policy: + errors.append(f"Cleanup policy mismatch: Python={py_policy}, Rust={rs_policy}") + + py_queries = py.get("queries", []) + rs_queries = rs.get("queries", []) + + if len(py_queries) != len(rs_queries): + errors.append(f"Query count mismatch: Python={len(py_queries)}, Rust={len(rs_queries)}") + return errors + + py_by_q = {q["query"]: q for q in py_queries} + rs_by_q = {q["query"]: q for q in rs_queries} + + for query, py_q in py_by_q.items(): + if query not in rs_by_q: + errors.append(f"Query missing in Rust output: {query}") + continue + rs_q = rs_by_q[query] + + py_agg_count = len(py_q.get("aggregations", [])) + rs_agg_count = len(rs_q.get("aggregations", [])) + if py_agg_count != rs_agg_count: + errors.append( + f"Aggregation count mismatch for query '{query}': Python={py_agg_count}, Rust={rs_agg_count}" + ) + + return errors + + +if __name__ == "__main__": + if len(sys.argv) != 5: + print("Usage: comparator.py ") + sys.exit(1) + + errors = compare_streaming_configs(sys.argv[1], sys.argv[3]) + errors += compare_inference_configs(sys.argv[2], sys.argv[4]) + + if errors: + for e in errors: + print(f" DIFF: {e}") + sys.exit(1) + else: + print(" MATCH") diff --git a/asap-planner-rs/tests/comparison/master_test_runner.py b/asap-planner-rs/tests/comparison/master_test_runner.py new file mode 100644 index 0000000..c288a62 --- /dev/null +++ b/asap-planner-rs/tests/comparison/master_test_runner.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +"""Run all comparison tests between Python and Rust planner.""" +import os +import sys +import json +import shutil +import subprocess + +REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) +RUST_BINARY = os.path.join(REPO_ROOT, "target", "release", "asap-planner") +COMPARATOR = os.path.join(os.path.dirname(__file__), "comparator.py") +TEST_CASES_PATH = os.path.join(os.path.dirname(__file__), "test_cases.json") +COMPARISON_DIR = os.path.dirname(__file__) +TEST_OUTPUTS_DIR = os.path.join(os.path.dirname(__file__), "test_outputs") + + +def run_python_planner(tc: dict, output_dir: str) -> bool: + planner_script = os.path.join(REPO_ROOT, "asap-planner", "main_controller.py") + args = [ + sys.executable, + planner_script, + "--input_config", + os.path.join(COMPARISON_DIR, tc["input_config"]), + "--output_dir", + output_dir, + "--prometheus_scrape_interval", + str(tc["prometheus_scrape_interval"]), + "--streaming_engine", + tc["streaming_engine"], + ] + if tc.get("enable_punting"): + args.append("--enable-punting") + if tc.get("range_duration", 0) > 0: + args += ["--range-duration", str(tc["range_duration"])] + if tc.get("step", 0) > 0: + args += ["--step", str(tc["step"])] + + env = os.environ.copy() + py_utils = os.path.join( + REPO_ROOT, "asap-common", "dependencies", "py", "promql_utilities" + ) + planner_path = os.path.join(REPO_ROOT, "asap-planner") + env["PYTHONPATH"] = f"{py_utils}:{planner_path}:{env.get('PYTHONPATH', '')}" + + result = subprocess.run(args, env=env, capture_output=True, text=True) + if result.returncode != 0: + print(f" Python planner failed:\n{result.stderr}") + return False + return True + + +def run_rust_planner(tc: dict, output_dir: str) -> bool: + args = [ + RUST_BINARY, + "--input_config", + os.path.join(COMPARISON_DIR, tc["input_config"]), + "--output_dir", + output_dir, + "--prometheus_scrape_interval", + str(tc["prometheus_scrape_interval"]), + "--streaming_engine", + tc["streaming_engine"], + ] + if tc.get("enable_punting"): + args.append("--enable-punting") + if tc.get("range_duration", 0) > 0: + args += ["--range-duration", str(tc["range_duration"])] + if tc.get("step", 0) > 0: + args += ["--step", str(tc["step"])] + + result = subprocess.run(args, capture_output=True, text=True) + if result.returncode != 0: + print(f" Rust planner failed:\n{result.stderr}") + return False + return True + + +def main(): + # Build Rust binary + print("Building Rust planner...") + build = subprocess.run( + ["cargo", "build", "-p", "asap_planner", "--release"], + cwd=REPO_ROOT, + capture_output=True, + text=True, + ) + if build.returncode != 0: + print(f"Build failed:\n{build.stderr}") + sys.exit(1) + print("Build OK\n") + + with open(TEST_CASES_PATH) as f: + test_cases = json.load(f)["test_cases"] + + passed = 0 + failed = 0 + + # Clear and recreate test_outputs dir + if os.path.exists(TEST_OUTPUTS_DIR): + shutil.rmtree(TEST_OUTPUTS_DIR) + os.makedirs(TEST_OUTPUTS_DIR) + + for tc in test_cases: + print(f"Test: {tc['id']}") + tc_out = os.path.join(TEST_OUTPUTS_DIR, tc["id"]) + py_dir = os.path.join(tc_out, "python") + rs_dir = os.path.join(tc_out, "rust") + os.makedirs(py_dir) + os.makedirs(rs_dir) + + if not run_python_planner(tc, py_dir): + print(" [FAIL] Python planner error") + failed += 1 + continue + if not run_rust_planner(tc, rs_dir): + print(" [FAIL] Rust planner error") + failed += 1 + continue + + result = subprocess.run( + [ + sys.executable, + COMPARATOR, + os.path.join(py_dir, "streaming_config.yaml"), + os.path.join(py_dir, "inference_config.yaml"), + os.path.join(rs_dir, "streaming_config.yaml"), + os.path.join(rs_dir, "inference_config.yaml"), + ], + capture_output=True, + text=True, + ) + if result.returncode == 0: + print(" [PASS]") + passed += 1 + else: + print(" [FAIL]") + print(result.stdout) + failed += 1 + + print(f"\nResults: {passed}/{passed + failed} passed") + if failed > 0: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/asap-planner-rs/tests/comparison/test_cases.json b/asap-planner-rs/tests/comparison/test_cases.json new file mode 100644 index 0000000..c9727e4 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_cases.json @@ -0,0 +1,643 @@ +{ + "test_cases": [ + { + "id": "mixed_workload", + "input_config": "test_data/configs/mixed_workload.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "range_query", + "input_config": "test_data/configs/range_query.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 3600, + "step": 60 + }, + { + "id": "cleanup_circular", + "input_config": "test_data/configs/cleanup_circular.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_over_time", + "input_config": "test_data/configs/quantile_over_time.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "spatial_quantile", + "input_config": "test_data/configs/spatial_quantile.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "rate_increase", + "input_config": "test_data/configs/rate_increase.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "topk", + "input_config": "test_data/configs/topk.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "cleanup_read_based", + "input_config": "test_data/configs/cleanup_read_based.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "deduplicated", + "input_config": "test_data/configs/deduplicated.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "increase", + "input_config": "test_data/configs/increase.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "sum_over_time", + "input_config": "test_data/configs/sum_over_time.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "sum_by", + "input_config": "test_data/configs/sum_by.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "temporal_overlapping", + "input_config": "test_data/configs/temporal_overlapping.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "sum_by_overlapping", + "input_config": "test_data/configs/sum_by_overlapping.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "arroyo_optimized_test_generated_1_run1", + "input_config": "test_data/configs/experiments/arroyo_optimized_test_generated_1_run1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "arroyo_optimized_test_generated_1_run1_only_temporal", + "input_config": "test_data/configs/experiments/arroyo_optimized_test_generated_1_run1_only_temporal.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "arroyo_optimized_test_generated_1_run2_only_temporal", + "input_config": "test_data/configs/experiments/arroyo_optimized_test_generated_1_run2_only_temporal.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "bad_workload_1_test_1", + "input_config": "test_data/configs/experiments/bad_workload_1_test_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "debugging_qot_1", + "input_config": "test_data/configs/experiments/debugging_qot_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "debugging_rate_1", + "input_config": "test_data/configs/experiments/debugging_rate_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_10", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_10.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_2", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_2.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_3", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_3.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_4", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_4.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_5", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_5.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_6", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_6.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_7", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_7.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_8", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_8.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "final_generated_workload_prometheus_20251204_231532_9", + "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_9.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_prometheus_20251204_231532_1_no_parallel", + "input_config": "test_data/configs/experiments/generated_workload_prometheus_20251204_231532_1_no_parallel.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_10", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_10.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_2", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_2.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_3", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_3.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_4", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_4.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_5", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_5.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_6", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_6.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_7", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_7.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_8", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_8.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "generated_workload_victoriametrics_20251204_231532_9", + "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_9.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "non_quantile_1_test", + "input_config": "test_data/configs/experiments/non_quantile_1_test.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "non_quantile_1s_4queries_100valuesperlabel_2labels", + "input_config": "test_data/configs/experiments/non_quantile_1s_4queries_100valuesperlabel_2labels.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "non_quantile_30m_10_card_1000_1", + "input_config": "test_data/configs/experiments/non_quantile_30m_10_card_1000_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "non_quantile_30m_30_card_1000_1", + "input_config": "test_data/configs/experiments/non_quantile_30m_30_card_1000_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "non_quantile_30m_60_card_1000_1", + "input_config": "test_data/configs/experiments/non_quantile_30m_60_card_1000_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "post_optimization_generated_workload_prometheus_20251204_231532_4_nopunt_debug4", + "input_config": "test_data/configs/experiments/post_optimization_generated_workload_prometheus_20251204_231532_4_nopunt_debug4.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": true, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_120m_1_card_2_0", + "input_config": "test_data/configs/experiments/qot_120m_1_card_2_0.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_120m_1_card_2_7", + "input_config": "test_data/configs/experiments/qot_120m_1_card_2_7.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_15m_1_card_2_0", + "input_config": "test_data/configs/experiments/qot_15m_1_card_2_0.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_15m_1_card_2_7_rerun", + "input_config": "test_data/configs/experiments/qot_15m_1_card_2_7_rerun.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_15m_1s_card_1000", + "input_config": "test_data/configs/experiments/qot_15m_1s_card_1000.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_15m_1s_optimizedmerge_1", + "input_config": "test_data/configs/experiments/qot_15m_1s_optimizedmerge_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_15m_300s_card_1000_2", + "input_config": "test_data/configs/experiments/qot_15m_300s_card_1000_2.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_15m_60s_card_1000_2", + "input_config": "test_data/configs/experiments/qot_15m_60s_card_1000_2.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_15m_900s_card_1000", + "input_config": "test_data/configs/experiments/qot_15m_900s_card_1000.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_1m_1_card_2_0", + "input_config": "test_data/configs/experiments/qot_1m_1_card_2_0.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_30m_15m", + "input_config": "test_data/configs/experiments/qot_30m_15m.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_30m_1_card_2_0", + "input_config": "test_data/configs/experiments/qot_30m_1_card_2_0.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_30m_1m", + "input_config": "test_data/configs/experiments/qot_30m_1m.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_30m_30m", + "input_config": "test_data/configs/experiments/qot_30m_30m.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_30m_5m", + "input_config": "test_data/configs/experiments/qot_30m_5m.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "qot_60m_1_card_2_0", + "input_config": "test_data/configs/experiments/qot_60m_1_card_2_0.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_1s_10queries_100valuesperlabel_2labels", + "input_config": "test_data/configs/experiments/quantile_1s_10queries_100valuesperlabel_2labels.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_1s_10queries_10valuesperlabel", + "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_1s_10queries_10valuesperlabel_3_K_10", + "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_10.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_1s_10queries_10valuesperlabel_3_K_100", + "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_100.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_1s_10queries_10valuesperlabel_3_K_20", + "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_20.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_1s_10queries_10valuesperlabel_3_K_50", + "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_50.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "quantile_1s_10queries_10valuesperlabel_3_K_8", + "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_8.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "result_1_collapsable_1", + "input_config": "test_data/configs/experiments/result_1_collapsable_1.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "result_1_collapsable_3", + "input_config": "test_data/configs/experiments/result_1_collapsable_3.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + }, + { + "id": "result_1_non_quantile_1s_1_vm", + "input_config": "test_data/configs/experiments/result_1_non_quantile_1s_1_vm.yaml", + "prometheus_scrape_interval": 15, + "streaming_engine": "arroyo", + "enable_punting": false, + "range_duration": 0, + "step": 0 + } + ] +} diff --git a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml new file mode 100644 index 0000000..54f1ea4 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "circular_buffer" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml new file mode 100644 index 0000000..b3b332e --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml b/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml new file mode 100644 index 0000000..1af476f --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml @@ -0,0 +1,22 @@ +# Two different query strings that produce the same aggregation config +# rate and increase both map to MultipleIncrease with same window params +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 + - id: 2 + queries: + - "increase(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml b/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml new file mode 100644 index 0000000..2a018e9 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "increase(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml b/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml new file mode 100644 index 0000000..105be71 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml @@ -0,0 +1,14 @@ +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + - "sum by (job) (rate(http_requests_total[5m]))" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml b/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml new file mode 100644 index 0000000..eb4e8bc --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "quantile_over_time(0.99, http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml b/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml new file mode 100644 index 0000000..b3b332e --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml b/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml new file mode 100644 index 0000000..b3b332e --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml b/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml new file mode 100644 index 0000000..fc1e1eb --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "quantile by (job) (0.99, http_requests_total)" + repetition_delay: 60 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml new file mode 100644 index 0000000..b1c8ddd --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "sum by (job, method) (http_requests_total)" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml new file mode 100644 index 0000000..6e777e6 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "sum by (job, method) (http_requests_total)" + repetition_delay: 60 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml new file mode 100644 index 0000000..ca7969c --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "sum_over_time(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml b/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml new file mode 100644 index 0000000..0049e63 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml @@ -0,0 +1,16 @@ +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + - "increase(http_requests_total[5m])" + - "sum_over_time(http_requests_total[5m])" + - "quantile_over_time(0.99, http_requests_total[5m])" + repetition_delay: 60 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml b/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml new file mode 100644 index 0000000..4aa2b6e --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "topk(5, http_requests_total)" + repetition_delay: 60 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance", "job", "method", "status"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/integration.rs b/asap-planner-rs/tests/integration.rs new file mode 100644 index 0000000..417faca --- /dev/null +++ b/asap-planner-rs/tests/integration.rs @@ -0,0 +1,378 @@ +use asap_planner::{Controller, ControllerError, RuntimeOptions, StreamingEngine}; +use std::path::Path; + +fn arroyo_opts() -> RuntimeOptions { + RuntimeOptions { + prometheus_scrape_interval: 15, + streaming_engine: StreamingEngine::Arroyo, + enable_punting: false, + range_duration: 0, + step: 0, + } +} + +#[test] +fn quantile_over_time_produces_kll() { + // quantile_over_time groups by all labels → 1 DatasketchesKLL config + // Arroyo/Flink maintains one sketch per unique label-value combination at runtime + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/quantile_over_time.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 1); + assert!(out.has_aggregation_type("DatasketchesKLL")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); +} + +#[test] +fn rate_produces_multiple_increase_only() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/rate_increase.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 1); + assert!(out.has_aggregation_type("MultipleIncrease")); +} + +#[test] +fn only_spatial_window_equals_scrape_interval() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/spatial_quantile.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.all_tumbling_window_sizes_eq(15)); +} + +#[test] +fn duplicate_aggregation_configs_are_deduped() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/deduplicated.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 2); +} + +#[test] +fn topk_produces_count_min_sketch_with_heap() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/topk.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.has_aggregation_type("CountMinSketchWithHeap")); +} + +#[test] +fn range_query_uses_effective_repeat() { + let opts = RuntimeOptions { + range_duration: 3600, + step: 30, + ..arroyo_opts() + }; + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/range_query.yaml"), + opts, + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.all_tumbling_window_sizes_leq(30)); +} + +#[test] +fn output_files_written_to_dir() { + let dir = tempfile::tempdir().unwrap(); + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/mixed_workload.yaml"), + arroyo_opts(), + ) + .unwrap(); + c.generate_to_dir(dir.path()).unwrap(); + assert!(dir.path().join("streaming_config.yaml").exists()); + assert!(dir.path().join("inference_config.yaml").exists()); +} + +#[test] +fn rate_tumbling_window_size_equals_effective_repeat() { + // For range queries, effective_repeat = min(t_repeat=300, step=30) = 30 + // Tumbling window size must equal effective_repeat (sliding is always disabled) + let opts = RuntimeOptions { + range_duration: 3600, + step: 30, + ..arroyo_opts() + }; + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/rate_increase.yaml"), + opts, + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.all_tumbling_window_sizes_eq(30)); +} + +#[test] +fn increase_tumbling_window_size_equals_effective_repeat() { + // effective_repeat = min(t_repeat=300, step=30) = 30 + let opts = RuntimeOptions { + range_duration: 3600, + step: 30, + ..arroyo_opts() + }; + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/increase.yaml"), + opts, + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.has_aggregation_type("MultipleIncrease")); + assert!(out.all_tumbling_window_sizes_eq(30)); +} + +#[test] +fn quantile_over_time_tumbling_window_size_equals_effective_repeat() { + // effective_repeat = min(t_repeat=300, step=30) = 30 + let opts = RuntimeOptions { + range_duration: 3600, + step: 30, + ..arroyo_opts() + }; + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/quantile_over_time.yaml"), + opts, + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.has_aggregation_type("DatasketchesKLL")); + assert!(out.all_tumbling_window_sizes_eq(30)); +} + +#[test] +fn sum_over_time_produces_count_min_sketch_with_delta_set() { + // sum_over_time is Approximate → CountMinSketch + DeltaSetAggregator pairing + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/sum_over_time.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 2); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); +} + +#[test] +fn sum_by_produces_count_min_sketch_with_delta_set() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/sum_by.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 2); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); +} + +#[test] +fn sum_by_rollup_excludes_groupby_labels() { + // sum by (job, method) → rollup gets labels NOT in by-clause + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/sum_by.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!( + out.aggregation_labels("CountMinSketch", "rollup"), + vec!["instance", "status"] + ); +} + +// --- Error-path tests --- + +#[test] +fn unknown_cleanup_policy_returns_planner_error() { + let yaml = r#" +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance"] +aggregate_cleanup: + policy: "not_a_real_policy" +"#; + let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + assert!(matches!( + c.generate(), + Err(ControllerError::PlannerError(_)) + )); +} + +#[test] +fn duplicate_query_in_same_group_returns_error() { + let yaml = r#" +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance"] +"#; + let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + assert!(matches!( + c.generate(), + Err(ControllerError::DuplicateQuery(_)) + )); +} + +#[test] +fn duplicate_query_across_groups_returns_error() { + let yaml = r#" +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 + - id: 2 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 60 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance"] +"#; + let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + assert!(matches!( + c.generate(), + Err(ControllerError::DuplicateQuery(_)) + )); +} + +#[test] +fn query_referencing_unknown_metric_returns_error() { + let yaml = r#" +query_groups: + - id: 1 + queries: + - "rate(unknown_metric[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "http_requests_total" + labels: ["instance"] +"#; + let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + assert!(matches!( + c.generate(), + Err(ControllerError::UnknownMetric(_)) + )); +} + +#[test] +fn malformed_yaml_returns_parse_error() { + let result = Controller::from_yaml("{ invalid yaml :", arroyo_opts()); + assert!(matches!(result, Err(ControllerError::YamlParse(_)))); +} + +// --- Overlapping window tests --- +// Queries where range vector > t_repeat: e.g. [5m] repeated every 60s. +// Windows are always tumbling (sliding disabled); the planner emits tumblingWindowSize=t_repeat +// and the cleanup param tells the query engine how many windows to retain to cover the range. + +#[test] +fn temporal_overlapping_window_size_equals_t_repeat() { + // [5m] range repeated every 60s → tumblingWindowSize = 60, not 300 + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.all_tumbling_window_sizes_eq(60)); +} + +#[test] +fn temporal_overlapping_all_function_types_present() { + // rate+increase → MultipleIncrease (deduped to 1), sum_over_time → CountMinSketch+DeltaSet, + // quantile_over_time → DatasketchesKLL; 4 unique streaming aggregation configs total + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 4); + assert!(out.has_aggregation_type("MultipleIncrease")); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DatasketchesKLL")); +} + +#[test] +fn temporal_overlapping_cleanup_param_equals_range_over_repeat() { + // t_lookback = 5m = 300s, effective_repeat = 60s → ceil(300/60) = 5 + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!( + out.inference_cleanup_param("rate(http_requests_total[5m])"), + Some(5) + ); + assert_eq!( + out.inference_cleanup_param("increase(http_requests_total[5m])"), + Some(5) + ); + assert_eq!( + out.inference_cleanup_param("sum_over_time(http_requests_total[5m])"), + Some(5) + ); + assert_eq!( + out.inference_cleanup_param("quantile_over_time(0.99, http_requests_total[5m])"), + Some(5) + ); +} + +#[test] +fn temporal_overlapping_rate_increase_deduped() { + // rate and increase produce identical MultipleIncrease configs → 1 streaming entry shared, + // but inference config still tracks 4 queries separately + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.inference_query_count(), 4); + assert_eq!(out.streaming_aggregation_count(), 4); // not 5 +} diff --git a/asap-query-engine/Dockerfile b/asap-query-engine/Dockerfile index 190d9c0..6b6e00f 100644 --- a/asap-query-engine/Dockerfile +++ b/asap-query-engine/Dockerfile @@ -17,9 +17,12 @@ COPY asap-common/sketch-core ./asap-common/sketch-core COPY Cargo.toml ./ COPY Cargo.lock ./ COPY asap-query-engine/Cargo.toml ./asap-query-engine/ +COPY asap-planner-rs/Cargo.toml ./asap-planner-rs/ -# Create a dummy main.rs to build dependencies -RUN mkdir -p asap-query-engine/src && echo "fn main() {}" > asap-query-engine/src/main.rs +# Create dummy source files so Cargo can resolve all workspace members +RUN mkdir -p asap-query-engine/src && echo "fn main() {}" > asap-query-engine/src/main.rs && \ + mkdir -p asap-planner-rs/src && echo "fn main() {}" > asap-planner-rs/src/main.rs && \ + echo "pub fn placeholder() {}" >> asap-planner-rs/src/lib.rs # Build dependencies (this layer will be cached) WORKDIR /code/asap-query-engine