diff --git a/Cargo.lock b/Cargo.lock
index cc56da4..74f8fcf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -387,6 +387,7 @@ dependencies = [
  "pretty_assertions",
  "promql-parser",
  "promql_utilities",
+ "reqwest",
  "serde",
  "serde_json",
  "serde_yaml",
diff --git a/asap-planner-rs/Cargo.toml b/asap-planner-rs/Cargo.toml
index 8e7288b..bb7499b 100644
--- a/asap-planner-rs/Cargo.toml
+++ b/asap-planner-rs/Cargo.toml
@@ -27,6 +27,7 @@ clap.workspace = true
 indexmap.workspace = true
 chrono.workspace = true
 promql-parser = "0.5.0"
+reqwest = { version = "0.11", features = ["blocking", "json"] }
 
 [dev-dependencies]
 tempfile = "3.20"
diff --git a/asap-planner-rs/docker-compose.yml.j2 b/asap-planner-rs/docker-compose.yml.j2
index d36368f..06d42d2 100644
--- a/asap-planner-rs/docker-compose.yml.j2
+++ b/asap-planner-rs/docker-compose.yml.j2
@@ -9,7 +9,8 @@ services:
       "--input_config", "/app/input/config.yaml",
       "--output_dir", "/app/output",
       "--prometheus_scrape_interval", "{{ prometheus_scrape_interval }}",
-      "--streaming_engine", "{{ streaming_engine }}"{% if punting %},
+      "--streaming_engine", "{{ streaming_engine }}",
+      "--prometheus-url", "{{ prometheus_url }}"{% if punting %},
       "--enable-punting"{% endif %}
     ]
     restart: no
diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs
index 431bc77..090e20d 100644
--- a/asap-planner-rs/src/config/input.rs
+++ b/asap-planner-rs/src/config/input.rs
@@ -1,9 +1,9 @@
 use serde::Deserialize;
 
 #[derive(Debug, Clone, Deserialize)]
+#[serde(deny_unknown_fields)]
 pub struct ControllerConfig {
     pub query_groups: Vec<QueryGroup>,
-    pub metrics: Vec<MetricDefinition>,
     pub sketch_parameters: Option,
     pub aggregate_cleanup: Option<AggregateCleanupConfig>,
 }
diff --git a/asap-planner-rs/src/error.rs b/asap-planner-rs/src/error.rs
index 6806a5e..0274880 100644
--- a/asap-planner-rs/src/error.rs
+++ b/asap-planner-rs/src/error.rs
@@ -18,4 +18,6 @@ pub enum ControllerError {
     SqlParse(String),
     #[error("Unknown table: {0}")]
     UnknownTable(String),
+    #[error("Prometheus client error: {0}")]
+    PrometheusClient(String),
 }
diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs
index 744e754..77adcbb 100644
--- a/asap-planner-rs/src/lib.rs
+++ b/asap-planner-rs/src/lib.rs
@@ -2,6 +2,7 @@ pub mod config;
 pub mod error;
 pub mod output;
 pub mod planner;
+pub mod prometheus_client;
 pub mod query_log;
 
 use serde_yaml::Value as YamlValue;
@@ -12,6 +13,8 @@ pub use config::input::SQLControllerConfig;
 pub use error::ControllerError;
 pub use output::generator::{GeneratorOutput, PuntedQuery};
 pub use output::sql_generator::SQLRuntimeOptions;
+pub use prometheus_client::build_schema_from_prometheus;
+pub use sketch_db_common::PromQLSchema;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum StreamingEngine {
@@ -30,6 +33,7 @@ pub struct RuntimeOptions {
 
 pub struct Controller {
     config: ControllerConfig,
+    schema: PromQLSchema,
     options: RuntimeOptions,
 }
 
@@ -279,45 +283,110 @@ impl SQLController {
 }
 
 impl Controller {
-    pub fn from_file(path: &Path, opts: RuntimeOptions) -> Result<Self, ControllerError> {
+    /// Build a `Controller` from a config file, fetching metric labels from Prometheus.
+    ///
+    /// `prometheus_url` is queried via `GET /api/v1/series?match[]=<metric_name>` for each metric
+    /// name found in the config's PromQL queries.
+    pub fn from_file(
+        path: &Path,
+        opts: RuntimeOptions,
+        prometheus_url: &str,
+    ) -> Result<Self, ControllerError> {
         let yaml_str = std::fs::read_to_string(path)?;
-        Self::from_yaml(&yaml_str, opts)
+        let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?;
+        let all_queries: Vec<String> = config
+            .query_groups
+            .iter()
+            .flat_map(|qg| qg.queries.clone())
+            .collect();
+        let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?;
+        Ok(Self {
+            config,
+            schema,
+            options: opts,
+        })
+    }
+
+    /// Build a `Controller` from a config file with a caller-supplied `PromQLSchema`.
+    ///
+    /// Use this when the schema is available without querying Prometheus (e.g. in tests
+    /// or when the schema is constructed in-process by the caller).
+    pub fn from_file_with_schema(
+        path: &Path,
+        schema: PromQLSchema,
+        opts: RuntimeOptions,
+    ) -> Result<Self, ControllerError> {
+        let yaml_str = std::fs::read_to_string(path)?;
+        let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?;
+        Ok(Self {
+            config,
+            schema,
+            options: opts,
+        })
     }
 
-    pub fn from_yaml(yaml: &str, opts: RuntimeOptions) -> Result<Self, ControllerError> {
+    /// Build a `Controller` from a YAML string with a caller-supplied `PromQLSchema`.
+    pub fn from_yaml_with_schema(
+        yaml: &str,
+        schema: PromQLSchema,
+        opts: RuntimeOptions,
+    ) -> Result<Self, ControllerError> {
         let config: ControllerConfig = serde_yaml::from_str(yaml)?;
         Ok(Self {
             config,
+            schema,
             options: opts,
         })
     }
 
-    /// Build a `Controller` from a Prometheus query log file and a metrics config YAML.
+    /// Build a `Controller` from a Prometheus query log file, fetching metric labels from
+    /// Prometheus.
     ///
     /// - `log_path`: newline-delimited JSON query log (Prometheus `--query.log-file` output)
-    /// - `metrics_path`: YAML file with a `metrics:` section listing metric names and labels
+    /// - `prometheus_url`: base URL queried for label discovery
     pub fn from_query_log(
         log_path: &Path,
-        metrics_path: &Path,
         opts: RuntimeOptions,
+        prometheus_url: &str,
    ) -> Result<Self, ControllerError> {
         let entries = query_log::parse_log_file(log_path)?;
         let (instants, ranges) =
             query_log::infer_queries(&entries, opts.prometheus_scrape_interval);
+        let config = query_log::to_controller_config(instants, ranges);
+        let all_queries: Vec<String> = config
+            .query_groups
+            .iter()
+            .flat_map(|qg| qg.queries.clone())
+            .collect();
+        let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?;
+        Ok(Self {
+            config,
+            schema,
+            options: opts,
+        })
+    }
-        let metrics_yaml = std::fs::read_to_string(metrics_path)?;
-        let metrics_config: query_log::MetricsConfig = serde_yaml::from_str(&metrics_yaml)?;
-
-        let config = query_log::to_controller_config(instants, ranges, metrics_config.metrics);
-
+    /// Build a `Controller` from a Prometheus query log file with a caller-supplied `PromQLSchema`.
+    ///
+    /// Use this when the schema is available without querying Prometheus (e.g. in tests).
+    pub fn from_query_log_with_schema(
+        log_path: &Path,
+        schema: PromQLSchema,
+        opts: RuntimeOptions,
+    ) -> Result<Self, ControllerError> {
+        let entries = query_log::parse_log_file(log_path)?;
+        let (instants, ranges) =
+            query_log::infer_queries(&entries, opts.prometheus_scrape_interval);
+        let config = query_log::to_controller_config(instants, ranges);
         Ok(Self {
             config,
+            schema,
             options: opts,
         })
     }
 
     pub fn generate(&self) -> Result<PlannerOutput, ControllerError> {
-        let output = output::generator::generate_plan(&self.config, &self.options)?;
+        let output = output::generator::generate_plan(&self.config, &self.schema, &self.options)?;
         Ok(PlannerOutput {
             punted_queries: output.punted_queries,
             streaming_yaml: output.streaming_yaml,
diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs
index 79a54c0..c274a26 100644
--- a/asap-planner-rs/src/main.rs
+++ b/asap-planner-rs/src/main.rs
@@ -14,16 +14,17 @@ struct Args {
     #[arg(long = "query-log", conflicts_with = "input_config")]
     query_log: Option<PathBuf>,
 
-    /// Path to a metrics config YAML (required when using --query-log).
-    #[arg(long = "metrics-config", requires = "query_log")]
-    metrics_config: Option<PathBuf>,
-
     #[arg(long = "output_dir")]
     output_dir: PathBuf,
 
     #[arg(long = "prometheus_scrape_interval", required = false)]
     prometheus_scrape_interval: Option,
 
+    /// Base URL of the Prometheus instance used to auto-infer metric label sets.
+    /// Required for PromQL mode. Example: http://localhost:9090
+    #[arg(long = "prometheus-url", required = false)]
+    prometheus_url: Option<String>,
+
     #[arg(long = "streaming_engine", value_enum)]
     streaming_engine: EngineArg,
 
@@ -73,6 +74,9 @@ fn main() -> anyhow::Result<()> {
     let scrape_interval = args.prometheus_scrape_interval.ok_or_else(|| {
         anyhow::anyhow!("--prometheus_scrape_interval is required for PromQL mode")
     })?;
+    let prometheus_url = args
+        .prometheus_url
+        .ok_or_else(|| anyhow::anyhow!("--prometheus-url is required for PromQL mode"))?;
     let opts = RuntimeOptions {
         prometheus_scrape_interval: scrape_interval,
         streaming_engine: engine,
@@ -81,12 +85,11 @@
         step: args.step,
     };
     let controller = match (args.input_config, args.query_log) {
-        (Some(config_path), None) => Controller::from_file(&config_path, opts)?,
+        (Some(config_path), None) => {
+            Controller::from_file(&config_path, opts, &prometheus_url)?
+        }
         (None, Some(log_path)) => {
-            let metrics_path = args
-                .metrics_config
-                .expect("--metrics-config is required when using --query-log");
-            Controller::from_query_log(&log_path, &metrics_path, opts)?
+            Controller::from_query_log(&log_path, opts, &prometheus_url)?
         }
         _ => anyhow::bail!(
             "exactly one of --input_config or --query-log must be provided for PromQL mode"
diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/output/generator.rs
index bf82157..aaa05a9 100644
--- a/asap-planner-rs/src/output/generator.rs
+++ b/asap-planner-rs/src/output/generator.rs
@@ -5,6 +5,7 @@ use std::collections::HashMap;
 use promql_utilities::data_model::KeyByLabelNames;
 use sketch_db_common::enums::CleanupPolicy;
+use sketch_db_common::PromQLSchema;
 
 use crate::config::input::ControllerConfig;
 use crate::error::ControllerError;
 
@@ -17,14 +18,10 @@ type LeafEntries = Vec<(String, Vec<(String, Option)>)>;
 /// Run the full planning pipeline and produce YAML outputs
 pub fn generate_plan(
     controller_config: &ControllerConfig,
+    schema: &PromQLSchema,
     opts: &RuntimeOptions,
 ) -> Result<GeneratorOutput, ControllerError> {
-    // Build metric schema
-    let mut metric_schema = sketch_db_common::PromQLSchema::new();
-    for md in &controller_config.metrics {
-        metric_schema =
-            metric_schema.add_metric(md.metric.clone(), KeyByLabelNames::new(md.labels.clone()));
-    }
+    let metric_schema = schema.clone();
 
     // Determine cleanup policy
     let cleanup_policy_str = controller_config
diff --git a/asap-planner-rs/src/prometheus_client.rs b/asap-planner-rs/src/prometheus_client.rs
new file mode 100644
index 0000000..d063452
--- /dev/null
+++ b/asap-planner-rs/src/prometheus_client.rs
@@ -0,0 +1,154 @@
+use std::collections::HashSet;
+
+use promql_parser::parser::Expr;
+use promql_utilities::data_model::KeyByLabelNames;
+use sketch_db_common::PromQLSchema;
+use tracing::warn;
+
+use crate::error::ControllerError;
+
+/// Walk a PromQL AST and collect all metric names referenced by VectorSelectors.
+fn collect_metric_names(expr: &Expr, names: &mut HashSet<String>) {
+    match expr {
+        Expr::VectorSelector(vs) => {
+            if let Some(name) = &vs.name {
+                names.insert(name.clone());
+            }
+        }
+        Expr::MatrixSelector(ms) => {
+            if let Some(name) = &ms.vs.name {
+                names.insert(name.clone());
+            }
+        }
+        Expr::Call(call) => {
+            for arg in &call.args.args {
+                collect_metric_names(arg, names);
+            }
+        }
+        Expr::Aggregate(agg) => {
+            collect_metric_names(&agg.expr, names);
+        }
+        Expr::Binary(bin) => {
+            collect_metric_names(&bin.lhs, names);
+            collect_metric_names(&bin.rhs, names);
+        }
+        Expr::Subquery(sq) => {
+            collect_metric_names(&sq.expr, names);
+        }
+        _ => {}
+    }
+}
+
+/// Extract all unique metric names referenced in a slice of PromQL query strings.
+/// Queries that fail to parse are skipped with a warning.
+pub fn extract_metric_names(queries: &[String]) -> HashSet<String> {
+    let mut names = HashSet::new();
+    for query in queries {
+        match promql_parser::parser::parse(query) {
+            Ok(expr) => collect_metric_names(&expr, &mut names),
+            Err(e) => warn!(
+                "Could not parse query {:?} for metric name extraction: {}",
+                query, e
+            ),
+        }
+    }
+    names
+}
+
+/// Query Prometheus `GET /api/v1/series?match[]=<metric_name>` and return the set of label key
+/// names for that metric, or `None` if no series were found.
+///
+/// Internal `__*__` labels (e.g. `__name__`) are excluded from the result.
+///
+/// TODO: This queries only the last 5 minutes of series data (Prometheus default when no
+/// `start`/`end` parameters are provided). Expand to a configurable lookback window to capture
+/// metrics that have not been seen recently.
+fn fetch_labels_for_metric(
+    prometheus_url: &str,
+    metric_name: &str,
+) -> Result<Option<Vec<String>>, ControllerError> {
+    let url = format!("{}/api/v1/series", prometheus_url.trim_end_matches('/'));
+    let client = reqwest::blocking::Client::new();
+    let response = client
+        .get(&url)
+        .query(&[("match[]", metric_name)])
+        .send()
+        .map_err(|e| {
+            ControllerError::PrometheusClient(format!(
+                "HTTP request failed for metric '{}': {}",
+                metric_name, e
+            ))
+        })?;
+
+    if !response.status().is_success() {
+        return Err(ControllerError::PrometheusClient(format!(
+            "Prometheus returned HTTP {} for metric '{}'",
+            response.status(),
+            metric_name
+        )));
+    }
+
+    let body: serde_json::Value = response.json().map_err(|e| {
+        ControllerError::PrometheusClient(format!(
+            "Failed to parse Prometheus response for metric '{}': {}",
+            metric_name, e
+        ))
+    })?;
+
+    let data = match body.get("data").and_then(|d| d.as_array()) {
+        Some(arr) => arr,
+        None => {
+            warn!(
+                "Prometheus returned no 'data' array for metric '{}'; skipping",
+                metric_name
+            );
+            return Ok(None);
+        }
+    };
+
+    if data.is_empty() {
+        warn!(
+            "Prometheus returned no series for metric '{}' in the last 5 minutes; skipping",
+            metric_name
+        );
+        return Ok(None);
+    }
+
+    // Collect all unique label key names across all returned series,
+    // filtering out internal __*__ labels.
+    let mut label_keys: HashSet<String> = HashSet::new();
+    for series in data {
+        if let Some(labels) = series.as_object() {
+            for key in labels.keys() {
+                if !key.starts_with("__") {
+                    label_keys.insert(key.clone());
+                }
+            }
+        }
+    }
+
+    Ok(Some(label_keys.into_iter().collect()))
+}
+
+/// Build a `PromQLSchema` by querying Prometheus for each metric name found in the given
+/// PromQL queries. Metrics with no series in Prometheus are skipped with a warning.
+pub fn build_schema_from_prometheus(
+    prometheus_url: &str,
+    queries: &[String],
+) -> Result<PromQLSchema, ControllerError> {
+    let metric_names = extract_metric_names(queries);
+    let mut schema = PromQLSchema::new();
+
+    for metric_name in metric_names {
+        match fetch_labels_for_metric(prometheus_url, &metric_name)? {
+            Some(labels) => {
+                schema = schema.add_metric(metric_name, KeyByLabelNames::new(labels));
+            }
+            None => {
+                // Warning already emitted inside fetch_labels_for_metric.
+            }
+        }
+    }
+
+    Ok(schema)
+}
diff --git a/asap-planner-rs/src/query_log/converter.rs b/asap-planner-rs/src/query_log/converter.rs
index 5703d04..86e68c1 100644
--- a/asap-planner-rs/src/query_log/converter.rs
+++ b/asap-planner-rs/src/query_log/converter.rs
@@ -1,24 +1,13 @@
-use serde::Deserialize;
-
-use crate::config::input::{
-    AggregateCleanupConfig, ControllerConfig, MetricDefinition, QueryGroup,
-};
+use crate::config::input::{AggregateCleanupConfig, ControllerConfig, QueryGroup};
 
 use super::frequency::{InstantQueryInfo, RangeQueryInfo};
 
-/// Subset of ControllerConfig used when loading from a metrics-only YAML file.
-#[derive(Deserialize)]
-pub struct MetricsConfig {
-    pub metrics: Vec<MetricDefinition>,
-}
-
-/// Build a `ControllerConfig` from extracted instant and range queries plus a metrics definition.
+/// Build a `ControllerConfig` from extracted instant and range queries.
 ///
 /// Each query becomes its own `QueryGroup` (one query per group, no SLA fields needed).
pub fn to_controller_config( instants: Vec, ranges: Vec, - metrics: Vec, ) -> ControllerConfig { let mut query_groups: Vec = Vec::new(); @@ -46,7 +35,6 @@ pub fn to_controller_config( ControllerConfig { query_groups, - metrics, sketch_parameters: None, aggregate_cleanup: Some(AggregateCleanupConfig { policy: Some("read_based".to_string()), diff --git a/asap-planner-rs/src/query_log/mod.rs b/asap-planner-rs/src/query_log/mod.rs index d198741..135e3fe 100644 --- a/asap-planner-rs/src/query_log/mod.rs +++ b/asap-planner-rs/src/query_log/mod.rs @@ -2,6 +2,6 @@ pub mod converter; pub mod frequency; pub mod parser; -pub use converter::{to_controller_config, MetricsConfig}; +pub use converter::to_controller_config; pub use frequency::{infer_queries, InstantQueryInfo, RangeQueryInfo}; pub use parser::{parse_log_file, LogEntry}; diff --git a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml index 54f1ea4..40d6c1f 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "circular_buffer" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml index b3b332e..2fcd799 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml b/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml index 1af476f..4fe8897 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml @@ -15,8 +15,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml b/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml index 2a018e9..d723557 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml b/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml index 105be71..c4cac06 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml @@ -7,8 +7,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - 
labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml b/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml index eb4e8bc..0d37afb 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml b/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml index b3b332e..2fcd799 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml b/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml index b3b332e..2fcd799 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml b/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml index fc1e1eb..91b9ca9 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml index b1c8ddd..7c68d86 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml index 6e777e6..1324c78 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml 
b/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml index ca7969c..c560bdd 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml b/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml index 0049e63..4e74cb8 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml @@ -9,8 +9,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml b/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml index 4aa2b6e..3d996d8 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/integration.rs b/asap-planner-rs/tests/integration.rs index 3fecdbc..337a57e 100644 --- a/asap-planner-rs/tests/integration.rs +++ b/asap-planner-rs/tests/integration.rs @@ -1,13 +1,39 @@ -use asap_planner::{Controller, ControllerError, RuntimeOptions, StreamingEngine}; +use asap_planner::{Controller, ControllerError, PromQLSchema, RuntimeOptions, StreamingEngine}; +use promql_utilities::data_model::KeyByLabelNames; use std::path::Path; +// ─── helpers ───────────────────────────────────────────────────────────────── + +fn arroyo_opts() -> RuntimeOptions { + RuntimeOptions { + prometheus_scrape_interval: 15, + streaming_engine: StreamingEngine::Arroyo, + enable_punting: false, + range_duration: 0, + step: 0, + } +} + +/// Standard test schema: http_requests_total with [instance, job, method, status]. 
+fn http_requests_schema() -> PromQLSchema { + PromQLSchema::new().add_metric( + "http_requests_total".to_string(), + KeyByLabelNames::new(vec![ + "instance".to_string(), + "job".to_string(), + "method".to_string(), + "status".to_string(), + ]), + ) +} + // ─── query_log integration tests ───────────────────────────────────────────── #[test] fn query_log_instant_produces_valid_configs() { - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/instant_only.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -18,9 +44,9 @@ fn query_log_instant_produces_valid_configs() { #[test] fn query_log_range_produces_valid_configs() { - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/range_only.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -31,9 +57,9 @@ fn query_log_range_produces_valid_configs() { #[test] fn query_log_single_occurrence_excluded() { - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/single_occurrence.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -44,9 +70,9 @@ fn query_log_single_occurrence_excluded() { #[test] fn query_log_malformed_lines_skipped() { // with_malformed.log has 5 valid entries for rate() interspersed with bad lines - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/with_malformed.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -57,9 +83,9 @@ fn query_log_malformed_lines_skipped() { #[test] fn query_log_output_files_written() { let dir = tempfile::tempdir().unwrap(); - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/instant_only.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -68,22 +94,13 @@ fn query_log_output_files_written() { assert!(dir.path().join("inference_config.yaml").exists()); } -fn arroyo_opts() -> RuntimeOptions { - RuntimeOptions { - prometheus_scrape_interval: 15, - streaming_engine: StreamingEngine::Arroyo, - enable_punting: false, - range_duration: 0, - step: 0, - } -} - #[test] fn quantile_over_time_produces_kll() { // quantile_over_time groups by all labels → 1 DatasketchesKLL config // Arroyo/Flink maintains one sketch per unique label-value combination at runtime - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/quantile_over_time.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -95,8 +112,9 @@ fn quantile_over_time_produces_kll() { #[test] fn rate_produces_multiple_increase_only() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/rate_increase.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -107,8 +125,9 @@ fn rate_produces_multiple_increase_only() { #[test] fn only_spatial_window_equals_scrape_interval() { - let c = 
Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/spatial_quantile.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -118,8 +137,9 @@ fn only_spatial_window_equals_scrape_interval() { #[test] fn duplicate_aggregation_configs_are_deduped() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/deduplicated.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -130,8 +150,9 @@ fn duplicate_aggregation_configs_are_deduped() { #[test] fn topk_produces_count_min_sketch_with_heap() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/topk.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -146,8 +167,9 @@ fn range_query_uses_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/range_query.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -158,8 +180,9 @@ fn range_query_uses_effective_repeat() { #[test] fn output_files_written_to_dir() { let dir = tempfile::tempdir().unwrap(); - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/mixed_workload.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -177,8 +200,9 @@ fn rate_tumbling_window_size_equals_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/rate_increase.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -194,8 +218,9 @@ fn increase_tumbling_window_size_equals_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/increase.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -212,8 +237,9 @@ fn quantile_over_time_tumbling_window_size_equals_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/quantile_over_time.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -225,8 +251,9 @@ fn quantile_over_time_tumbling_window_size_equals_effective_repeat() { #[test] fn sum_over_time_produces_count_min_sketch_with_delta_set() { // sum_over_time is Approximate → CountMinSketch + DeltaSetAggregator pairing - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/sum_over_time.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -238,8 +265,9 @@ fn sum_over_time_produces_count_min_sketch_with_delta_set() { #[test] fn sum_by_produces_count_min_sketch_with_delta_set() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/sum_by.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -252,8 +280,9 @@ fn sum_by_produces_count_min_sketch_with_delta_set() { #[test] fn sum_by_rollup_excludes_groupby_labels() { // sum by (job, method) → rollup gets labels NOT in by-clause - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/sum_by.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -277,13 +306,10 @@ 
query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] aggregate_cleanup: policy: "not_a_real_policy" "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); assert!(matches!( c.generate(), Err(ControllerError::PlannerError(_)) @@ -302,11 +328,8 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); assert!(matches!( c.generate(), Err(ControllerError::DuplicateQuery(_)) @@ -331,11 +354,8 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); assert!(matches!( c.generate(), Err(ControllerError::DuplicateQuery(_)) @@ -354,11 +374,9 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + // Schema only knows about http_requests_total, not unknown_metric. + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); let out = c.generate().unwrap(); assert_eq!(out.inference_query_count(), 0); assert_eq!(out.streaming_aggregation_count(), 0); @@ -366,7 +384,26 @@ metrics: #[test] fn malformed_yaml_returns_parse_error() { - let result = Controller::from_yaml("{ invalid yaml :", arroyo_opts()); + let result = + Controller::from_yaml_with_schema("{ invalid yaml :", PromQLSchema::new(), arroyo_opts()); + assert!(matches!(result, Err(ControllerError::YamlParse(_)))); +} + +#[test] +fn stale_metrics_field_in_yaml_returns_parse_error() { + // Configs that still contain a top-level `metrics:` key must fail loudly + // (deny_unknown_fields is set on ControllerConfig). 
+ let yaml = r#" +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 +metrics: + - metric: "http_requests_total" + labels: ["instance"] +"#; + let result = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()); assert!(matches!(result, Err(ControllerError::YamlParse(_)))); } @@ -378,8 +415,9 @@ fn malformed_yaml_returns_parse_error() { #[test] fn temporal_overlapping_window_size_equals_t_repeat() { // [5m] range repeated every 60s → windowSize = 60, not 300 - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -391,8 +429,9 @@ fn temporal_overlapping_window_size_equals_t_repeat() { fn temporal_overlapping_all_function_types_present() { // rate+increase → MultipleIncrease (deduped to 1), sum_over_time → CountMinSketch+DeltaSet, // quantile_over_time → DatasketchesKLL; 4 unique streaming aggregation configs total - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -406,8 +445,9 @@ fn temporal_overlapping_all_function_types_present() { #[test] fn temporal_overlapping_cleanup_param_equals_range_over_repeat() { // t_lookback = 5m = 300s, effective_repeat = 60s → ceil(300/60) = 5 - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -501,8 +541,9 @@ fn binary_arithmetic_with_non_acceleratable_arm_produces_no_configs() { fn temporal_overlapping_rate_increase_deduped() { // rate and increase produce identical MultipleIncrease configs → 1 streaming entry shared, // but inference config still tracks 4 queries separately - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); diff --git a/asap-quickstart/docker-compose.yml b/asap-quickstart/docker-compose.yml index 8c94352..299b5e8 100644 --- a/asap-quickstart/docker-compose.yml +++ b/asap-quickstart/docker-compose.yml @@ -136,9 +136,6 @@ services: interval: 10s timeout: 5s retries: 5 - depends_on: - asap-summary-ingest: - condition: service_completed_successfully restart: no grafana: @@ -179,12 +176,16 @@ services: - "--input_config=/config/controller-config.yaml" - "--output_dir=/asap-planner-output" - "--prometheus_scrape_interval=1" + - "--prometheus-url=http://prometheus:9090" - "--streaming_engine=arroyo" - "--range-duration=300" - "--step=10" volumes: - ./config/controller-config.yaml:/config/controller-config.yaml:ro - asap-planner-output:/asap-planner-output + depends_on: + prometheus: + condition: service_healthy restart: "no" asap-summary-ingest: @@ -348,7 +349,6 @@ services: - "--dataset=sine" - "--num-labels=3" - "--num-values-per-label=30,30,30" - - "--num-values-per-label=30,30,30" - "--metric-type=gauge" - "--metric-name=sensor_reading" - "--label-names=region,service,host" diff --git a/asap-tools/experiments/experiment_run_e2e.py b/asap-tools/experiments/experiment_run_e2e.py index 446014a..0b2e66a 100644 --- a/asap-tools/experiments/experiment_run_e2e.py +++ b/asap-tools/experiments/experiment_run_e2e.py @@ -318,12 +318,40 @@ def main(cfg: DictConfig): # 
copy_controller_client_config(args.controller_client_config, local_experiment_dir) if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: + # The controller queries Prometheus to auto-infer metric labels, so + # exporters and Prometheus must be up and have at least one scrape + # worth of data before the controller runs. + if config.check_exporter_and_queries_exist( + "fake_exporter", cfg.experiment_params + ): + exporter_service.start( + config=exporter_config["exporter_list"]["fake_exporter"], + experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, + ) + prometheus_service.start( + experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, + experiment_mode=experiment_mode, + ) + # Wait for two scrape intervals so Prometheus has series to return. + label_discovery_wait = prometheus_scrape_interval * 2 + print( + f"Waiting {label_discovery_wait}s for Prometheus to scrape initial data " + f"before running controller label inference..." + ) + time.sleep(label_discovery_wait) + + prometheus_url = ( + f"http://localhost:{prometheus_service.get_query_endpoint_port()}" + ) controller_service.start( controller_input_file=controller_client_config, prometheus_scrape_interval=prometheus_scrape_interval, streaming_engine=args.streaming_engine, controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, punting=args.controller_punting, + prometheus_url=prometheus_url, ) sync.rsync_controller_config_remote_to_local( provider, diff --git a/asap-tools/experiments/experiment_utils/services/misc.py b/asap-tools/experiments/experiment_utils/services/misc.py index 798e63d..5afaf67 100644 --- a/asap-tools/experiments/experiment_utils/services/misc.py +++ b/asap-tools/experiments/experiment_utils/services/misc.py @@ -198,6 +198,7 @@ def start( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + prometheus_url: str, **kwargs, ) -> None: """ @@ -209,6 +210,7 @@ def start( streaming_engine: Type of streaming engine controller_remote_output_dir: Controller output directory punting: Enable query punting based on performance heuristics + prometheus_url: Base URL of the Prometheus instance for metric label inference **kwargs: Additional configuration """ if self.use_container: @@ -218,6 +220,7 @@ def start( streaming_engine, controller_remote_output_dir, punting, + prometheus_url, ) else: return self._start_bare_metal( @@ -226,6 +229,7 @@ def start( streaming_engine, controller_remote_output_dir, punting, + prometheus_url, ) def _start_bare_metal( @@ -235,12 +239,14 @@ def _start_bare_metal( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + prometheus_url: str, ) -> None: - cmd = "./target/release/asap-planner --input_config {} --prometheus_scrape_interval {} --output_dir {} --streaming_engine {}".format( + cmd = "./target/release/asap-planner --input_config {} --prometheus_scrape_interval {} --output_dir {} --streaming_engine {} --prometheus-url {}".format( controller_input_file, prometheus_scrape_interval, controller_remote_output_dir, streaming_engine, + prometheus_url, ) if punting: cmd += " --enable-punting" @@ -261,6 +267,7 @@ def _start_containerized( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + prometheus_url: str, ): controller_dir = os.path.join( self.provider.get_home_dir(), "code", "asap-planner-rs" @@ -288,6 +295,7 @@ def _start_containerized( generate_cmd += f" --controller-output-dir {controller_remote_output_dir}" generate_cmd += f" 
--prometheus-scrape-interval {prometheus_scrape_interval}" generate_cmd += f" --streaming-engine {streaming_engine}" + generate_cmd += f" --prometheus-url {prometheus_url}" if punting: generate_cmd += " --punting" diff --git a/asap-tools/experiments/generate_controller_compose.py b/asap-tools/experiments/generate_controller_compose.py index 439185d..63ed78e 100644 --- a/asap-tools/experiments/generate_controller_compose.py +++ b/asap-tools/experiments/generate_controller_compose.py @@ -18,6 +18,7 @@ def generate_compose_file( prometheus_scrape_interval: int, streaming_engine: str, punting: bool, + prometheus_url: str, ): """Generate docker-compose.yml from template with provided variables.""" @@ -41,6 +42,7 @@ def generate_compose_file( "prometheus_scrape_interval": prometheus_scrape_interval, "streaming_engine": streaming_engine, "punting": punting, + "prometheus_url": prometheus_url, } # Render the template @@ -117,6 +119,11 @@ def main(): action="store_true", help="Enable query punting based on performance heuristics", ) + parser.add_argument( + "--prometheus-url", + required=True, + help="Base URL of the Prometheus instance for metric label inference (e.g. http://localhost:9090)", + ) args = parser.parse_args() @@ -130,6 +137,7 @@ def main(): prometheus_scrape_interval=args.prometheus_scrape_interval, streaming_engine=args.streaming_engine, punting=args.punting, + prometheus_url=args.prometheus_url, )
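
Reviewer note (not part of the patch): the Rust sketch below shows how the two construction paths introduced here are intended to compose, namely the CLI path where the Controller queries Prometheus for label keys itself, and the test path where a pre-built PromQLSchema is injected so no Prometheus round-trip happens inside the constructor. It assumes the public API exactly as exported in this diff (Controller::from_file, Controller::from_file_with_schema, build_schema_from_prometheus, RuntimeOptions, StreamingEngine); the config path and Prometheus URL are placeholders, and the network-backed calls need a reachable Prometheus instance with recently scraped series.

use std::path::Path;

use asap_planner::{
    build_schema_from_prometheus, Controller, ControllerError, RuntimeOptions, StreamingEngine,
};

fn opts() -> RuntimeOptions {
    // Same option values the integration tests use for the Arroyo engine.
    RuntimeOptions {
        prometheus_scrape_interval: 15,
        streaming_engine: StreamingEngine::Arroyo,
        enable_punting: false,
        range_duration: 300,
        step: 10,
    }
}

fn main() -> Result<(), ControllerError> {
    // Path 1 (CLI): the controller parses the config, extracts metric names from the
    // PromQL queries, and asks Prometheus for each metric's label keys.
    let from_prom = Controller::from_file(
        Path::new("config/controller-config.yaml"), // placeholder path
        opts(),
        "http://localhost:9090", // placeholder Prometheus URL
    )?;

    // Path 2 (tests / in-process callers): build the schema up front and inject it.
    let queries = vec!["rate(http_requests_total[5m])".to_string()];
    let schema = build_schema_from_prometheus("http://localhost:9090", &queries)?;
    let injected = Controller::from_file_with_schema(
        Path::new("config/controller-config.yaml"),
        schema,
        opts(),
    )?;

    // Both controllers plan identically once constructed.
    let _ = from_prom.generate()?;
    let _ = injected.generate()?;
    Ok(())
}

The *_with_schema constructors are what let the integration tests in this diff run without a live Prometheus.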