Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions asap-planner-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ clap.workspace = true
indexmap.workspace = true
chrono.workspace = true
promql-parser = "0.5.0"
reqwest = { version = "0.11", features = ["blocking", "json"] }

[dev-dependencies]
tempfile = "3.20"
Expand Down
3 changes: 2 additions & 1 deletion asap-planner-rs/docker-compose.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ services:
"--input_config", "/app/input/config.yaml",
"--output_dir", "/app/output",
"--prometheus_scrape_interval", "{{ prometheus_scrape_interval }}",
"--streaming_engine", "{{ streaming_engine }}"{% if punting %},
"--streaming_engine", "{{ streaming_engine }}",
"--prometheus-url", "{{ prometheus_url }}"{% if punting %},
"--enable-punting"{% endif %}
]
restart: no
2 changes: 1 addition & 1 deletion asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use serde::Deserialize;

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ControllerConfig {
pub query_groups: Vec<QueryGroup>,
pub metrics: Vec<MetricDefinition>,
pub sketch_parameters: Option<SketchParameterOverrides>,
pub aggregate_cleanup: Option<AggregateCleanupConfig>,
}
Expand Down
2 changes: 2 additions & 0 deletions asap-planner-rs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ pub enum ControllerError {
SqlParse(String),
#[error("Unknown table: {0}")]
UnknownTable(String),
#[error("Prometheus client error: {0}")]
PrometheusClient(String),
}
93 changes: 81 additions & 12 deletions asap-planner-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod config;
pub mod error;
pub mod output;
pub mod planner;
pub mod prometheus_client;
pub mod query_log;

use serde_yaml::Value as YamlValue;
Expand All @@ -12,6 +13,8 @@ pub use config::input::SQLControllerConfig;
pub use error::ControllerError;
pub use output::generator::{GeneratorOutput, PuntedQuery};
pub use output::sql_generator::SQLRuntimeOptions;
pub use prometheus_client::build_schema_from_prometheus;
pub use sketch_db_common::PromQLSchema;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingEngine {
Expand All @@ -30,6 +33,7 @@ pub struct RuntimeOptions {

/// Planner entry point: a parsed configuration together with the metric schema and
/// runtime options that are handed to `output::generator::generate_plan`.
pub struct Controller {
/// Controller configuration, deserialized from YAML or derived from a query log.
config: ControllerConfig,
/// PromQL schema, either built from Prometheus or supplied directly by the caller.
schema: PromQLSchema,
/// Runtime options passed through to plan generation.
options: RuntimeOptions,
}

Expand Down Expand Up @@ -279,45 +283,110 @@ impl SQLController {
}

impl Controller {
pub fn from_file(path: &Path, opts: RuntimeOptions) -> Result<Self, ControllerError> {
/// Build a `Controller` from a config file, fetching metric labels from Prometheus.
///
/// `prometheus_url` is queried via `GET /api/v1/series?match[]=<metric>` for each metric
/// name found in the config's PromQL queries.
pub fn from_file(
path: &Path,
opts: RuntimeOptions,
prometheus_url: &str,
) -> Result<Self, ControllerError> {
let yaml_str = std::fs::read_to_string(path)?;
Self::from_yaml(&yaml_str, opts)
let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?;
let all_queries: Vec<String> = config
.query_groups
.iter()
.flat_map(|qg| qg.queries.clone())
.collect();
let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?;
Ok(Self {
config,
schema,
options: opts,
})
}

/// Load a controller configuration from the YAML file at `path` and pair it with `schema`.
///
/// Nothing is fetched from Prometheus here: the caller-supplied `PromQLSchema` is stored
/// as given, which suits tests and callers that already hold a schema in-process.
///
/// # Errors
///
/// Propagates the failure if the file cannot be read or if its contents do not
/// deserialize into a `ControllerConfig`.
pub fn from_file_with_schema(
    path: &Path,
    schema: PromQLSchema,
    opts: RuntimeOptions,
) -> Result<Self, ControllerError> {
    let contents = std::fs::read_to_string(path)?;
    let parsed = serde_yaml::from_str::<ControllerConfig>(&contents)?;
    let controller = Self {
        config: parsed,
        schema,
        options: opts,
    };
    Ok(controller)
}

pub fn from_yaml(yaml: &str, opts: RuntimeOptions) -> Result<Self, ControllerError> {
/// Parse `yaml` into a `ControllerConfig` and bundle it with the given `schema` and `opts`.
///
/// # Errors
///
/// Propagates the deserialization failure when `yaml` is not a valid `ControllerConfig`.
pub fn from_yaml_with_schema(
    yaml: &str,
    schema: PromQLSchema,
    opts: RuntimeOptions,
) -> Result<Self, ControllerError> {
    Ok(Self {
        // The target type is inferred from the `config` field.
        config: serde_yaml::from_str(yaml)?,
        schema,
        options: opts,
    })
}

/// Build a `Controller` from a Prometheus query log file and a metrics config YAML.
/// Build a `Controller` from a Prometheus query log file, fetching metric labels from
/// Prometheus.
///
/// - `log_path`: newline-delimited JSON query log (Prometheus `--query.log-file` output)
/// - `metrics_path`: YAML file with a `metrics:` section listing metric names and labels
/// - `prometheus_url`: base URL queried for label discovery
pub fn from_query_log(
log_path: &Path,
metrics_path: &Path,
opts: RuntimeOptions,
prometheus_url: &str,
) -> Result<Self, ControllerError> {
let entries = query_log::parse_log_file(log_path)?;
let (instants, ranges) =
query_log::infer_queries(&entries, opts.prometheus_scrape_interval);
let config = query_log::to_controller_config(instants, ranges);
let all_queries: Vec<String> = config
.query_groups
.iter()
.flat_map(|qg| qg.queries.clone())
.collect();
let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?;
Ok(Self {
config,
schema,
options: opts,
})
}

let metrics_yaml = std::fs::read_to_string(metrics_path)?;
let metrics_config: query_log::MetricsConfig = serde_yaml::from_str(&metrics_yaml)?;

let config = query_log::to_controller_config(instants, ranges, metrics_config.metrics);

/// Derive a `Controller` from the query log at `log_path`, using a caller-supplied schema.
///
/// The log is parsed, instant and range queries are inferred from its entries using
/// `opts.prometheus_scrape_interval`, and the result is converted into a controller
/// configuration. The given `PromQLSchema` is stored as-is, so Prometheus is not queried
/// (handy in tests).
///
/// # Errors
///
/// Propagates the failure reported by `query_log::parse_log_file`.
pub fn from_query_log_with_schema(
    log_path: &Path,
    schema: PromQLSchema,
    opts: RuntimeOptions,
) -> Result<Self, ControllerError> {
    let log_entries = query_log::parse_log_file(log_path)?;
    let (instant_queries, range_queries) =
        query_log::infer_queries(&log_entries, opts.prometheus_scrape_interval);
    Ok(Self {
        config: query_log::to_controller_config(instant_queries, range_queries),
        schema,
        options: opts,
    })
}

pub fn generate(&self) -> Result<PlannerOutput, ControllerError> {
let output = output::generator::generate_plan(&self.config, &self.options)?;
let output = output::generator::generate_plan(&self.config, &self.schema, &self.options)?;
Ok(PlannerOutput {
punted_queries: output.punted_queries,
streaming_yaml: output.streaming_yaml,
Expand Down
21 changes: 12 additions & 9 deletions asap-planner-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ struct Args {
#[arg(long = "query-log", conflicts_with = "input_config")]
query_log: Option<PathBuf>,

/// Path to a metrics config YAML (required when using --query-log).
#[arg(long = "metrics-config", requires = "query_log")]
metrics_config: Option<PathBuf>,

#[arg(long = "output_dir")]
output_dir: PathBuf,

#[arg(long = "prometheus_scrape_interval", required = false)]
prometheus_scrape_interval: Option<u64>,

/// Base URL of the Prometheus instance used to auto-infer metric label sets.
/// Required for PromQL mode. Example: http://localhost:9090
#[arg(long = "prometheus-url", required = false)]
prometheus_url: Option<String>,

#[arg(long = "streaming_engine", value_enum)]
streaming_engine: EngineArg,

Expand Down Expand Up @@ -73,6 +74,9 @@ fn main() -> anyhow::Result<()> {
let scrape_interval = args.prometheus_scrape_interval.ok_or_else(|| {
anyhow::anyhow!("--prometheus_scrape_interval is required for PromQL mode")
})?;
let prometheus_url = args
.prometheus_url
.ok_or_else(|| anyhow::anyhow!("--prometheus-url is required for PromQL mode"))?;
let opts = RuntimeOptions {
prometheus_scrape_interval: scrape_interval,
streaming_engine: engine,
Expand All @@ -81,12 +85,11 @@ fn main() -> anyhow::Result<()> {
step: args.step,
};
let controller = match (args.input_config, args.query_log) {
(Some(config_path), None) => Controller::from_file(&config_path, opts)?,
(Some(config_path), None) => {
Controller::from_file(&config_path, opts, &prometheus_url)?
}
(None, Some(log_path)) => {
let metrics_path = args
.metrics_config
.expect("--metrics-config is required when using --query-log");
Controller::from_query_log(&log_path, &metrics_path, opts)?
Controller::from_query_log(&log_path, opts, &prometheus_url)?
}
_ => anyhow::bail!(
"exactly one of --input_config or --query-log must be provided for PromQL mode"
Expand Down
9 changes: 3 additions & 6 deletions asap-planner-rs/src/output/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashMap;

use promql_utilities::data_model::KeyByLabelNames;
use sketch_db_common::enums::CleanupPolicy;
use sketch_db_common::PromQLSchema;

use crate::config::input::ControllerConfig;
use crate::error::ControllerError;
Expand All @@ -14,14 +15,10 @@ use crate::RuntimeOptions;
/// Run the full planning pipeline and produce YAML outputs
pub fn generate_plan(
controller_config: &ControllerConfig,
schema: &PromQLSchema,
opts: &RuntimeOptions,
) -> Result<GeneratorOutput, ControllerError> {
// Build metric schema
let mut metric_schema = sketch_db_common::PromQLSchema::new();
for md in &controller_config.metrics {
metric_schema =
metric_schema.add_metric(md.metric.clone(), KeyByLabelNames::new(md.labels.clone()));
}
let metric_schema = schema.clone();

// Determine cleanup policy
let cleanup_policy_str = controller_config
Expand Down
Loading
Loading