From c66f0243d8bdd73eab27a8cfd56298e05ad5f664 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Mon, 30 Mar 2026 11:43:07 -0400 Subject: [PATCH] Added support for asap-planner-rs to infer query repetitions and config from Prometheus query log --- Cargo.lock | 1 + asap-planner-rs/Cargo.toml | 1 + asap-planner-rs/src/config/input.rs | 9 +- asap-planner-rs/src/lib.rs | 25 ++ asap-planner-rs/src/main.rs | 26 +- asap-planner-rs/src/output/generator.rs | 29 +- asap-planner-rs/src/query_log/converter.rs | 55 ++++ asap-planner-rs/src/query_log/frequency.rs | 277 ++++++++++++++++++ asap-planner-rs/src/query_log/mod.rs | 7 + asap-planner-rs/src/query_log/parser.rs | 114 +++++++ .../test_data/metrics/http_requests.yaml | 3 + .../test_data/query_logs/instant_only.log | 10 + .../test_data/query_logs/range_only.log | 5 + .../query_logs/single_occurrence.log | 2 + .../test_data/query_logs/with_malformed.log | 7 + asap-planner-rs/tests/integration.rs | 77 ++++- .../bootstrap-config-from-query-log.md | 49 ++++ docs/README.md | 1 + 18 files changed, 679 insertions(+), 19 deletions(-) create mode 100644 asap-planner-rs/src/query_log/converter.rs create mode 100644 asap-planner-rs/src/query_log/frequency.rs create mode 100644 asap-planner-rs/src/query_log/mod.rs create mode 100644 asap-planner-rs/src/query_log/parser.rs create mode 100644 asap-planner-rs/tests/comparison/test_data/metrics/http_requests.yaml create mode 100644 asap-planner-rs/tests/comparison/test_data/query_logs/instant_only.log create mode 100644 asap-planner-rs/tests/comparison/test_data/query_logs/range_only.log create mode 100644 asap-planner-rs/tests/comparison/test_data/query_logs/single_occurrence.log create mode 100644 asap-planner-rs/tests/comparison/test_data/query_logs/with_malformed.log create mode 100644 docs/03-how-to-guides/operations/bootstrap-config-from-query-log.md diff --git a/Cargo.lock b/Cargo.lock index 1ebbc7b..8c639a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -381,6 +381,7 @@ name = "asap_planner" version = "0.1.0" dependencies = [ "anyhow", + "chrono", "clap 4.6.0", "indexmap 2.13.0", "pretty_assertions", diff --git a/asap-planner-rs/Cargo.toml b/asap-planner-rs/Cargo.toml index 0c28bc4..6b4d87a 100644 --- a/asap-planner-rs/Cargo.toml +++ b/asap-planner-rs/Cargo.toml @@ -23,6 +23,7 @@ tracing.workspace = true tracing-subscriber.workspace = true clap.workspace = true indexmap.workspace = true +chrono.workspace = true promql-parser = "0.5.0" [dev-dependencies] diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index b50d0f3..8993db1 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -13,10 +13,17 @@ pub struct QueryGroup { pub id: Option, pub queries: Vec, pub repetition_delay: u64, + #[serde(default)] pub controller_options: ControllerOptions, + /// Per-group step override (seconds). Falls back to `RuntimeOptions::step` when None. + #[serde(default)] + pub step: Option, + /// Per-group range_duration override (seconds). Falls back to `RuntimeOptions::range_duration` when None. + #[serde(default)] + pub range_duration: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Default)] pub struct ControllerOptions { pub accuracy_sla: f64, pub latency_sla: f64, diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 4f3c346..90ec069 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod error; pub mod output; pub mod planner; +pub mod query_log; use serde_yaml::Value as YamlValue; use std::path::Path; @@ -178,6 +179,30 @@ impl Controller { }) } + /// Build a `Controller` from a Prometheus query log file and a metrics config YAML. + /// + /// - `log_path`: newline-delimited JSON query log (Prometheus `--query.log-file` output) + /// - `metrics_path`: YAML file with a `metrics:` section listing metric names and labels + pub fn from_query_log( + log_path: &Path, + metrics_path: &Path, + opts: RuntimeOptions, + ) -> Result { + let entries = query_log::parse_log_file(log_path)?; + let (instants, ranges) = + query_log::infer_queries(&entries, opts.prometheus_scrape_interval); + + let metrics_yaml = std::fs::read_to_string(metrics_path)?; + let metrics_config: query_log::MetricsConfig = serde_yaml::from_str(&metrics_yaml)?; + + let config = query_log::to_controller_config(instants, ranges, metrics_config.metrics); + + Ok(Self { + config, + options: opts, + }) + } + pub fn generate(&self) -> Result { let output = output::generator::generate_plan(&self.config, &self.options)?; Ok(PlannerOutput { diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index c1fa0b9..393a7ec 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -5,8 +5,17 @@ use std::path::PathBuf; #[derive(Parser, Debug)] #[command(name = "asap-planner", about = "ASAP Query Planner")] struct Args { - #[arg(long = "input_config")] - input_config: PathBuf, + /// Path to a hand-authored YAML workload config. Mutually exclusive with --query-log. + #[arg(long = "input_config", conflicts_with = "query_log")] + input_config: Option, + + /// Path to a Prometheus query log file (newline-delimited JSON). Mutually exclusive with --input_config. + #[arg(long = "query-log", conflicts_with = "input_config")] + query_log: Option, + + /// Path to a metrics config YAML (required when using --query-log). + #[arg(long = "metrics-config", requires = "query_log")] + metrics_config: Option, #[arg(long = "output_dir")] output_dir: PathBuf, @@ -60,9 +69,18 @@ fn main() -> anyhow::Result<()> { step: args.step, }; - let controller = Controller::from_file(&args.input_config, opts)?; - controller.generate_to_dir(&args.output_dir)?; + let controller = match (args.input_config, args.query_log) { + (Some(config_path), None) => Controller::from_file(&config_path, opts)?, + (None, Some(log_path)) => { + let metrics_path = args + .metrics_config + .expect("--metrics-config is required when using --query-log"); + Controller::from_query_log(&log_path, &metrics_path, opts)? + } + _ => anyhow::bail!("exactly one of --input_config or --query-log must be provided"), + }; + controller.generate_to_dir(&args.output_dir)?; println!("Generated configs in {}", args.output_dir.display()); Ok(()) } diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/output/generator.rs index 75f51fd..efc5122 100644 --- a/asap-planner-rs/src/output/generator.rs +++ b/asap-planner-rs/src/output/generator.rs @@ -57,8 +57,8 @@ pub fn generate_plan( metric_schema.clone(), opts.streaming_engine, controller_config.sketch_parameters.clone(), - opts.range_duration, - opts.step, + qg.range_duration.unwrap_or(opts.range_duration), + qg.step.unwrap_or(opts.step), cleanup_policy, ); @@ -73,14 +73,25 @@ pub fn generate_plan( } if should_process { - let (configs, cleanup_param) = processor.get_streaming_aggregation_configs()?; - let mut keys_for_query = Vec::new(); - for config in configs { - let key = config.identifying_key(); - keys_for_query.push((key.clone(), cleanup_param)); - dedup_map.entry(key).or_insert(config); + match processor.get_streaming_aggregation_configs() { + Ok((configs, cleanup_param)) => { + let mut keys_for_query = Vec::new(); + for config in configs { + let key = config.identifying_key(); + keys_for_query.push((key.clone(), cleanup_param)); + dedup_map.entry(key).or_insert(config); + } + query_keys_map.insert(query_string.clone(), keys_for_query); + } + Err(ControllerError::UnknownMetric(ref metric)) => { + tracing::warn!( + query = %query_string, + metric = %metric, + "skipping query referencing unknown metric" + ); + } + Err(e) => return Err(e), } - query_keys_map.insert(query_string.clone(), keys_for_query); } } } diff --git a/asap-planner-rs/src/query_log/converter.rs b/asap-planner-rs/src/query_log/converter.rs new file mode 100644 index 0000000..5703d04 --- /dev/null +++ b/asap-planner-rs/src/query_log/converter.rs @@ -0,0 +1,55 @@ +use serde::Deserialize; + +use crate::config::input::{ + AggregateCleanupConfig, ControllerConfig, MetricDefinition, QueryGroup, +}; + +use super::frequency::{InstantQueryInfo, RangeQueryInfo}; + +/// Subset of ControllerConfig used when loading from a metrics-only YAML file. +#[derive(Deserialize)] +pub struct MetricsConfig { + pub metrics: Vec, +} + +/// Build a `ControllerConfig` from extracted instant and range queries plus a metrics definition. +/// +/// Each query becomes its own `QueryGroup` (one query per group, no SLA fields needed). +pub fn to_controller_config( + instants: Vec, + ranges: Vec, + metrics: Vec, +) -> ControllerConfig { + let mut query_groups: Vec = Vec::new(); + + for info in instants { + query_groups.push(QueryGroup { + id: None, + queries: vec![info.query], + repetition_delay: info.repetition_delay, + controller_options: Default::default(), + step: None, + range_duration: None, + }); + } + + for info in ranges { + query_groups.push(QueryGroup { + id: None, + queries: vec![info.query], + repetition_delay: info.repetition_delay, + controller_options: Default::default(), + step: Some(info.step), + range_duration: Some(info.range_duration), + }); + } + + ControllerConfig { + query_groups, + metrics, + sketch_parameters: None, + aggregate_cleanup: Some(AggregateCleanupConfig { + policy: Some("read_based".to_string()), + }), + } +} diff --git a/asap-planner-rs/src/query_log/frequency.rs b/asap-planner-rs/src/query_log/frequency.rs new file mode 100644 index 0000000..90877a8 --- /dev/null +++ b/asap-planner-rs/src/query_log/frequency.rs @@ -0,0 +1,277 @@ +use chrono::{DateTime, Utc}; +use std::collections::HashMap; + +use super::parser::LogEntry; + +/// A single (query_string, step) variant paired with all its log entries. +type QueryVariant<'a> = ((String, u64), Vec<&'a LogEntry>); + +#[derive(Debug, Clone, PartialEq)] +pub struct InstantQueryInfo { + pub query: String, + /// Median inter-arrival time rounded to nearest scrape interval (seconds). + pub repetition_delay: u64, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct RangeQueryInfo { + pub query: String, + /// Median inter-arrival time rounded to nearest scrape interval (seconds). + pub repetition_delay: u64, + /// Step from params.step (seconds). + pub step: u64, + /// Median of (end − start) across occurrences (seconds). + pub range_duration: u64, +} + +/// Infer query repetition delays from a slice of parsed log entries. +/// +/// Returns `(instant_queries, range_queries)`. +/// +/// Rules: +/// - Entries with step == 0 are instant; others are range. +/// - Group by (query_string, step). +/// - If the same query string appears under multiple step values, keep the +/// variant with the most occurrences and warn about discarded variants. +/// - Groups with only 1 occurrence are skipped with a warning. +/// - `repetition_delay` = median inter-arrival time, rounded to nearest +/// `scrape_interval` second. +/// - If `|raw_median − rounded| / scrape_interval ≥ 0.1` a warning is emitted. +pub fn infer_queries( + entries: &[LogEntry], + scrape_interval: u64, +) -> (Vec, Vec) { + // Group by (query_string, step) + let mut groups: HashMap<(String, u64), Vec<&LogEntry>> = HashMap::new(); + for entry in entries { + groups + .entry((entry.query.clone(), entry.step)) + .or_default() + .push(entry); + } + + // Per query string, if it appears under multiple steps, keep the most frequent. + let mut by_query: HashMap> = HashMap::new(); + for ((query, step), entries) in groups { + by_query + .entry(query.clone()) + .or_default() + .push(((query, step), entries)); + } + + let mut instant_results: Vec = Vec::new(); + let mut range_results: Vec = Vec::new(); + + for (_query_str, mut variants) in by_query { + // Sort descending by count so [0] is the most frequent variant. + variants.sort_by(|a, b| b.1.len().cmp(&a.1.len())); + + if variants.len() > 1 { + tracing::warn!( + query = %variants[0].0.0, + kept_step = variants[0].0.1, + kept_count = variants[0].1.len(), + "query appears with multiple step values; keeping most-frequent variant" + ); + } + + let ((query, step), variant_entries) = variants.remove(0); + + if variant_entries.len() < 2 { + tracing::warn!(%query, "query appears only once in log; skipping"); + continue; + } + + let repetition_delay = infer_repetition_delay(&variant_entries, scrape_interval, &query); + + if step == 0 { + instant_results.push(InstantQueryInfo { + query, + repetition_delay, + }); + } else { + let range_duration = median_range_duration(&variant_entries); + range_results.push(RangeQueryInfo { + query, + repetition_delay, + step, + range_duration, + }); + } + } + + (instant_results, range_results) +} + +/// Compute median inter-arrival time from timestamps and round to nearest scrape interval. +fn infer_repetition_delay(entries: &[&LogEntry], scrape_interval: u64, query: &str) -> u64 { + let mut timestamps: Vec> = entries.iter().map(|e| e.ts).collect(); + timestamps.sort(); + + let deltas: Vec = timestamps + .windows(2) + .map(|w| (w[1] - w[0]).num_seconds() as f64) + .collect(); + + let raw_median = median_f64(&deltas); + let rounded = round_to_nearest(raw_median, scrape_interval); + + let misalignment = (raw_median - rounded as f64).abs() / scrape_interval as f64; + if misalignment >= 0.1 { + tracing::warn!( + %query, + raw_median_secs = raw_median, + rounded_secs = rounded, + misalignment_pct = misalignment * 100.0, + "inferred repetition_delay is poorly aligned with scrape_interval; result may be inaccurate" + ); + } + + rounded +} + +/// Median of (end − start) durations in seconds. +fn median_range_duration(entries: &[&LogEntry]) -> u64 { + let mut durations: Vec = entries + .iter() + .map(|e| (e.end - e.start).num_seconds() as f64) + .collect(); + durations.sort_by(|a, b| a.partial_cmp(b).unwrap()); + median_f64(&durations) as u64 +} + +/// Median of a sorted-or-unsorted slice of f64 values. +fn median_f64(values: &[f64]) -> f64 { + assert!(!values.is_empty()); + let mut sorted = values.to_vec(); + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let mid = sorted.len() / 2; + if sorted.len().is_multiple_of(2) { + (sorted[mid - 1] + sorted[mid]) / 2.0 + } else { + sorted[mid] + } +} + +/// Round `value` to the nearest multiple of `interval`, minimum 1 interval. +fn round_to_nearest(value: f64, interval: u64) -> u64 { + let interval_f = interval as f64; + let rounded = (value / interval_f).round() as u64; + rounded.max(1) * interval +} + +// ── unit tests ──────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + fn make_instant_entries( + base_ts: DateTime, + interval_secs: i64, + count: u32, + ) -> Vec { + (0..count) + .map(|i| LogEntry { + query: "rate(http_requests_total[5m])".to_string(), + start: base_ts + chrono::Duration::seconds(interval_secs * i as i64), + end: base_ts + chrono::Duration::seconds(interval_secs * i as i64), + step: 0, + ts: base_ts + chrono::Duration::seconds(interval_secs * i as i64), + }) + .collect() + } + + fn make_range_entries( + base_ts: DateTime, + interval_secs: i64, + range_secs: i64, + step: u64, + count: u32, + ) -> Vec { + (0..count) + .map(|i| { + let start = base_ts + chrono::Duration::seconds(interval_secs * i as i64); + LogEntry { + query: "rate(http_requests_total[5m])".to_string(), + start, + end: start + chrono::Duration::seconds(range_secs), + step, + ts: base_ts + chrono::Duration::seconds(interval_secs * i as i64), + } + }) + .collect() + } + + fn base_ts() -> DateTime { + Utc.with_ymd_and_hms(2025, 12, 2, 18, 0, 0).unwrap() + } + + #[test] + fn single_occurrence_skipped() { + let entries = make_instant_entries(base_ts(), 60, 1); + let (instants, ranges) = infer_queries(&entries, 15); + assert!(instants.is_empty()); + assert!(ranges.is_empty()); + } + + #[test] + fn median_inter_arrival_odd_count() { + // 5 entries at exactly 60s apart → 4 deltas all 60s → median=60 → rounded=60 + let entries = make_instant_entries(base_ts(), 60, 5); + let (instants, _) = infer_queries(&entries, 15); + assert_eq!(instants.len(), 1); + assert_eq!(instants[0].repetition_delay, 60); + } + + #[test] + fn median_inter_arrival_even_count() { + // 4 entries at 60s apart → 3 deltas all 60s → median=60 → rounded=60 + let entries = make_instant_entries(base_ts(), 60, 4); + let (instants, _) = infer_queries(&entries, 15); + assert_eq!(instants.len(), 1); + assert_eq!(instants[0].repetition_delay, 60); + } + + #[test] + fn round_down_to_nearest_scrape() { + // raw=16s, scrape=15 → nearest=15 (|16-15|/15=6.7% < 10%, no warn) + assert_eq!(round_to_nearest(16.0, 15), 15); + } + + #[test] + fn round_up_to_nearest_scrape() { + // raw=23s, scrape=15 → nearest=30 (23 is closer to 30 than 15) + assert_eq!(round_to_nearest(23.0, 15), 30); + } + + #[test] + fn misaligned_still_returns_result() { + // raw=22s, scrape=15 → rounds to 15, but |22-15|/15=46.7% ≥ 10% → warn emitted + // We only verify the value is returned (warning is logged, not returned) + assert_eq!(round_to_nearest(22.0, 15), 15); + } + + #[test] + fn range_duration_from_start_end() { + let entries = make_range_entries(base_ts(), 60, 3600, 30, 5); + let (_, ranges) = infer_queries(&entries, 15); + assert_eq!(ranges.len(), 1); + assert_eq!(ranges[0].range_duration, 3600); + assert_eq!(ranges[0].step, 30); + } + + #[test] + fn same_query_multiple_steps_keeps_most_frequent() { + let mut entries = make_instant_entries(base_ts(), 60, 3); // step=0, 3 occurrences + let range_entries = make_range_entries(base_ts(), 60, 3600, 30, 2); // step=30, 2 occurrences + entries.extend(range_entries); + + let (instants, ranges) = infer_queries(&entries, 15); + // step=0 variant has more occurrences → kept as instant + assert_eq!(instants.len(), 1); + // step=30 variant discarded + assert!(ranges.is_empty()); + } +} diff --git a/asap-planner-rs/src/query_log/mod.rs b/asap-planner-rs/src/query_log/mod.rs new file mode 100644 index 0000000..d198741 --- /dev/null +++ b/asap-planner-rs/src/query_log/mod.rs @@ -0,0 +1,7 @@ +pub mod converter; +pub mod frequency; +pub mod parser; + +pub use converter::{to_controller_config, MetricsConfig}; +pub use frequency::{infer_queries, InstantQueryInfo, RangeQueryInfo}; +pub use parser::{parse_log_file, LogEntry}; diff --git a/asap-planner-rs/src/query_log/parser.rs b/asap-planner-rs/src/query_log/parser.rs new file mode 100644 index 0000000..7a14a05 --- /dev/null +++ b/asap-planner-rs/src/query_log/parser.rs @@ -0,0 +1,114 @@ +use chrono::{DateTime, Utc}; +use serde::Deserialize; + +#[derive(Debug, Clone, PartialEq)] +pub struct LogEntry { + pub query: String, + pub start: DateTime, + pub end: DateTime, + /// Step in seconds; 0 means instant query. + pub step: u64, + pub ts: DateTime, +} + +// ── raw serde shapes ────────────────────────────────────────────────────────── + +#[derive(Deserialize)] +struct RawEntry { + params: RawParams, + ts: DateTime, +} + +#[derive(Deserialize)] +struct RawParams { + query: String, + start: DateTime, + end: DateTime, + step: u64, +} + +// ── public API ──────────────────────────────────────────────────────────────── + +/// Parse a Prometheus query log file (newline-delimited JSON). +/// Malformed or incomplete lines are skipped with a warning; they never cause a panic. +pub fn parse_log_file(path: &std::path::Path) -> Result, std::io::Error> { + let contents = std::fs::read_to_string(path)?; + Ok(parse_log_str(&contents)) +} + +/// Parse a Prometheus query log from an in-memory string. +pub fn parse_log_str(input: &str) -> Vec { + input + .lines() + .enumerate() + .filter_map(|(line_no, line)| { + let line = line.trim(); + if line.is_empty() { + return None; + } + match serde_json::from_str::(line) { + Ok(raw) => Some(LogEntry { + query: raw.params.query, + start: raw.params.start, + end: raw.params.end, + step: raw.params.step, + ts: raw.ts, + }), + Err(e) => { + tracing::warn!(line = line_no + 1, error = %e, "skipping malformed query log line"); + None + } + } + }) + .collect() +} + +// ── unit tests ──────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + const INSTANT_LINE: &str = r#"{"params":{"end":"2025-12-02T18:00:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:00:00.000Z","step":0},"ts":"2025-12-02T18:00:00.001Z"}"#; + const RANGE_LINE: &str = r#"{"params":{"end":"2025-12-02T19:00:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:00:00.000Z","step":30},"ts":"2025-12-02T18:00:00.001Z"}"#; + + #[test] + fn parse_valid_instant_entry() { + let entries = parse_log_str(INSTANT_LINE); + assert_eq!(entries.len(), 1); + let e = &entries[0]; + assert_eq!(e.query, "rate(http_requests_total[5m])"); + assert_eq!(e.step, 0); + assert_eq!(e.start, e.end); + } + + #[test] + fn parse_valid_range_entry() { + let entries = parse_log_str(RANGE_LINE); + assert_eq!(entries.len(), 1); + let e = &entries[0]; + assert_eq!(e.step, 30); + assert_ne!(e.start, e.end); + let duration = (e.end - e.start).num_seconds(); + assert_eq!(duration, 3600); + } + + #[test] + fn malformed_json_returns_empty() { + let entries = parse_log_str("not valid json at all"); + assert!(entries.is_empty()); + } + + #[test] + fn missing_params_field_skipped() { + let entries = parse_log_str(r#"{"ts":"2025-12-02T18:00:00.000Z"}"#); + assert!(entries.is_empty()); + } + + #[test] + fn mixed_lines_skips_bad() { + let input = format!("{}\nnot json\n{}", INSTANT_LINE, INSTANT_LINE); + let entries = parse_log_str(&input); + assert_eq!(entries.len(), 2); + } +} diff --git a/asap-planner-rs/tests/comparison/test_data/metrics/http_requests.yaml b/asap-planner-rs/tests/comparison/test_data/metrics/http_requests.yaml new file mode 100644 index 0000000..dc7ec91 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/metrics/http_requests.yaml @@ -0,0 +1,3 @@ +metrics: + - metric: http_requests_total + labels: [instance, job, method, status] diff --git a/asap-planner-rs/tests/comparison/test_data/query_logs/instant_only.log b/asap-planner-rs/tests/comparison/test_data/query_logs/instant_only.log new file mode 100644 index 0000000..c94846d --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/query_logs/instant_only.log @@ -0,0 +1,10 @@ +{"params":{"end":"2025-12-02T18:00:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:00:00.000Z","step":0},"ts":"2025-12-02T18:00:00.001Z"} +{"params":{"end":"2025-12-02T18:01:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:01:00.000Z","step":0},"ts":"2025-12-02T18:01:00.001Z"} +{"params":{"end":"2025-12-02T18:02:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:02:00.000Z","step":0},"ts":"2025-12-02T18:02:00.001Z"} +{"params":{"end":"2025-12-02T18:03:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:03:00.000Z","step":0},"ts":"2025-12-02T18:03:00.001Z"} +{"params":{"end":"2025-12-02T18:04:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:04:00.000Z","step":0},"ts":"2025-12-02T18:04:00.001Z"} +{"params":{"end":"2025-12-02T18:00:00.000Z","query":"sum(http_requests_total)","start":"2025-12-02T18:00:00.000Z","step":0},"ts":"2025-12-02T18:00:00.002Z"} +{"params":{"end":"2025-12-02T18:01:00.000Z","query":"sum(http_requests_total)","start":"2025-12-02T18:01:00.000Z","step":0},"ts":"2025-12-02T18:01:00.002Z"} +{"params":{"end":"2025-12-02T18:02:00.000Z","query":"sum(http_requests_total)","start":"2025-12-02T18:02:00.000Z","step":0},"ts":"2025-12-02T18:02:00.002Z"} +{"params":{"end":"2025-12-02T18:03:00.000Z","query":"sum(http_requests_total)","start":"2025-12-02T18:03:00.000Z","step":0},"ts":"2025-12-02T18:03:00.002Z"} +{"params":{"end":"2025-12-02T18:04:00.000Z","query":"sum(http_requests_total)","start":"2025-12-02T18:04:00.000Z","step":0},"ts":"2025-12-02T18:04:00.002Z"} diff --git a/asap-planner-rs/tests/comparison/test_data/query_logs/range_only.log b/asap-planner-rs/tests/comparison/test_data/query_logs/range_only.log new file mode 100644 index 0000000..bb1107b --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/query_logs/range_only.log @@ -0,0 +1,5 @@ +{"params":{"end":"2025-12-02T19:00:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:00:00.000Z","step":30},"ts":"2025-12-02T18:00:00.001Z"} +{"params":{"end":"2025-12-02T19:01:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:01:00.000Z","step":30},"ts":"2025-12-02T18:01:00.001Z"} +{"params":{"end":"2025-12-02T19:02:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:02:00.000Z","step":30},"ts":"2025-12-02T18:02:00.001Z"} +{"params":{"end":"2025-12-02T19:03:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:03:00.000Z","step":30},"ts":"2025-12-02T18:03:00.001Z"} +{"params":{"end":"2025-12-02T19:04:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:04:00.000Z","step":30},"ts":"2025-12-02T18:04:00.001Z"} diff --git a/asap-planner-rs/tests/comparison/test_data/query_logs/single_occurrence.log b/asap-planner-rs/tests/comparison/test_data/query_logs/single_occurrence.log new file mode 100644 index 0000000..5857715 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/query_logs/single_occurrence.log @@ -0,0 +1,2 @@ +{"params":{"end":"2025-12-02T18:00:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:00:00.000Z","step":0},"ts":"2025-12-02T18:00:00.001Z"} +{"params":{"end":"2025-12-02T18:00:00.000Z","query":"sum(http_requests_total)","start":"2025-12-02T18:00:00.000Z","step":0},"ts":"2025-12-02T18:00:00.002Z"} diff --git a/asap-planner-rs/tests/comparison/test_data/query_logs/with_malformed.log b/asap-planner-rs/tests/comparison/test_data/query_logs/with_malformed.log new file mode 100644 index 0000000..1c5c2cf --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/query_logs/with_malformed.log @@ -0,0 +1,7 @@ +{"params":{"end":"2025-12-02T18:00:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:00:00.000Z","step":0},"ts":"2025-12-02T18:00:00.001Z"} +not valid json at all +{"params":{"end":"2025-12-02T18:01:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:01:00.000Z","step":0},"ts":"2025-12-02T18:01:00.001Z"} +{"missing_params": true, "ts": "2025-12-02T18:01:30.000Z"} +{"params":{"end":"2025-12-02T18:02:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:02:00.000Z","step":0},"ts":"2025-12-02T18:02:00.001Z"} +{"params":{"end":"2025-12-02T18:03:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:03:00.000Z","step":0},"ts":"2025-12-02T18:03:00.001Z"} +{"params":{"end":"2025-12-02T18:04:00.000Z","query":"rate(http_requests_total[5m])","start":"2025-12-02T18:04:00.000Z","step":0},"ts":"2025-12-02T18:04:00.001Z"} diff --git a/asap-planner-rs/tests/integration.rs b/asap-planner-rs/tests/integration.rs index d51db0d..a7f39ac 100644 --- a/asap-planner-rs/tests/integration.rs +++ b/asap-planner-rs/tests/integration.rs @@ -1,6 +1,73 @@ use asap_planner::{Controller, ControllerError, RuntimeOptions, StreamingEngine}; use std::path::Path; +// ─── query_log integration tests ───────────────────────────────────────────── + +#[test] +fn query_log_instant_produces_valid_configs() { + let c = Controller::from_query_log( + Path::new("tests/comparison/test_data/query_logs/instant_only.log"), + Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.streaming_aggregation_count() > 0); + assert!(out.inference_query_count() > 0); +} + +#[test] +fn query_log_range_produces_valid_configs() { + let c = Controller::from_query_log( + Path::new("tests/comparison/test_data/query_logs/range_only.log"), + Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + // range_only.log has step=30, so effective window size must be ≤ 30 + assert!(out.all_tumbling_window_sizes_leq(30)); +} + +#[test] +fn query_log_single_occurrence_excluded() { + let c = Controller::from_query_log( + Path::new("tests/comparison/test_data/query_logs/single_occurrence.log"), + Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.inference_query_count(), 0); +} + +#[test] +fn query_log_malformed_lines_skipped() { + // with_malformed.log has 5 valid entries for rate() interspersed with bad lines + let c = Controller::from_query_log( + Path::new("tests/comparison/test_data/query_logs/with_malformed.log"), + Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert!(out.inference_query_count() > 0); +} + +#[test] +fn query_log_output_files_written() { + let dir = tempfile::tempdir().unwrap(); + let c = Controller::from_query_log( + Path::new("tests/comparison/test_data/query_logs/instant_only.log"), + Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + arroyo_opts(), + ) + .unwrap(); + c.generate_to_dir(dir.path()).unwrap(); + assert!(dir.path().join("streaming_config.yaml").exists()); + assert!(dir.path().join("inference_config.yaml").exists()); +} + fn arroyo_opts() -> RuntimeOptions { RuntimeOptions { prometheus_scrape_interval: 15, @@ -276,7 +343,8 @@ metrics: } #[test] -fn query_referencing_unknown_metric_returns_error() { +fn query_referencing_unknown_metric_is_skipped_with_warning() { + // Unknown metric no longer aborts the run; the query is silently skipped. let yaml = r#" query_groups: - id: 1 @@ -291,10 +359,9 @@ metrics: labels: ["instance"] "#; let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); - assert!(matches!( - c.generate(), - Err(ControllerError::UnknownMetric(_)) - )); + let out = c.generate().unwrap(); + assert_eq!(out.inference_query_count(), 0); + assert_eq!(out.streaming_aggregation_count(), 0); } #[test] diff --git a/docs/03-how-to-guides/operations/bootstrap-config-from-query-log.md b/docs/03-how-to-guides/operations/bootstrap-config-from-query-log.md new file mode 100644 index 0000000..0bc3dfb --- /dev/null +++ b/docs/03-how-to-guides/operations/bootstrap-config-from-query-log.md @@ -0,0 +1,49 @@ +# Bootstrap Config from Prometheus Query Log + +Generate sketch configs from real query traffic instead of hand-authoring a workload YAML. + +## Steps + +### 1. Enable the Prometheus query log + +Add to your Prometheus startup flags: +``` +--query.log-file=/var/log/prometheus/query.log +``` + +Let it run for a representative period (hours to days). Each line is a JSON entry: +```json +{"params":{"query":"rate(http_requests_total[5m])","start":"2025-12-02T18:00:00Z","end":"2025-12-02T18:00:00Z","step":0},"ts":"2025-12-02T18:00:00.001Z"} +``` + +### 2. Create a metrics config + +List the metrics you want ASAP to sketch: +```yaml +# metrics.yaml +metrics: + - metric: http_requests_total + labels: [instance, job, method, status] + - metric: node_cpu_seconds_total + labels: [instance, mode] +``` + +### 3. Run the planner + +```bash +asap-planner \ + --query-log /var/log/prometheus/query.log \ + --metrics-config metrics.yaml \ + --output_dir ./configs \ + --prometheus_scrape_interval 15 \ + --streaming_engine arroyo +``` + +This writes `streaming_config.yaml` and `inference_config.yaml` to `./configs/`. + +## Notes + +- Queries appearing only once are skipped (need frequency to infer repeat interval) +- Queries referencing metrics not in `metrics.yaml` are skipped with a warning +- Unsupported PromQL patterns (e.g. `absent`, complex multi-level aggregations) are skipped +- Repeat interval is inferred from median inter-arrival time, rounded to the nearest scrape interval diff --git a/docs/README.md b/docs/README.md index 6cb5a7b..4e424d5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -37,6 +37,7 @@ Task-oriented guides for common operations: ### Operations Tasks - [Manual Stack Run for Prometheus](03-how-to-guides/operations/manual-stack-run-prometheus.md) - Run ASAP components manually to accelerate Prometheus +- [Bootstrap Config from Query Log](03-how-to-guides/operations/bootstrap-config-from-query-log.md) - Auto-generate sketch configs from Prometheus query traffic - [Manual Stack Run for Clickhouse](03-how-to-guides/operations/manual-stack-run-clickhouse.md) - Run ASAP components manually to accelerate Clickhouse - [Deploy to CloudLab](03-how-to-guides/operations/deploy-cloudlab.md) - Deployment guide - [Troubleshooting](03-how-to-guides/operations/troubleshooting.md) - Common issues & solutions