Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 98 additions & 119 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions asap-planner-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ path = "src/main.rs"
[dependencies]
sketch_db_common.workspace = true
promql_utilities.workspace = true
sql_utilities.workspace = true
sqlparser = "0.59.0"
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
Expand Down
24 changes: 24 additions & 0 deletions asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,27 @@ pub struct HydraParams {
pub col_num: u64,
pub k: u64,
}

#[derive(Debug, Clone, Deserialize)]
pub struct SQLControllerConfig {
pub query_groups: Vec<SQLQueryGroup>,
pub tables: Vec<TableDefinition>,
pub sketch_parameters: Option<SketchParameterOverrides>,
pub aggregate_cleanup: Option<AggregateCleanupConfig>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct SQLQueryGroup {
pub id: Option<u32>,
pub queries: Vec<String>,
pub repetition_delay: u64,
pub controller_options: ControllerOptions,
}

#[derive(Debug, Clone, Deserialize)]
pub struct TableDefinition {
pub name: String,
pub time_column: String,
pub value_columns: Vec<String>,
pub metadata_columns: Vec<String>,
}
4 changes: 4 additions & 0 deletions asap-planner-rs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ pub enum ControllerError {
PlannerError(String),
#[error("Unknown metric: {0}")]
UnknownMetric(String),
#[error("SQL parse error: {0}")]
SqlParse(String),
#[error("Unknown table: {0}")]
UnknownTable(String),
}
113 changes: 113 additions & 0 deletions asap-planner-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use serde_yaml::Value as YamlValue;
use std::path::Path;

pub use config::input::ControllerConfig;
pub use config::input::SQLControllerConfig;
pub use error::ControllerError;
pub use output::generator::{GeneratorOutput, PuntedQuery};
pub use output::sql_generator::SQLRuntimeOptions;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingEngine {
Expand Down Expand Up @@ -162,6 +164,117 @@ impl PlannerOutput {
pub fn to_inference_yaml_string(&self) -> Result<String, anyhow::Error> {
Ok(serde_yaml::to_string(&self.inference_yaml)?)
}

/// Returns the table_name field of the first aggregation matching agg_type.
pub fn aggregation_table_name(&self, agg_type: &str) -> Option<String> {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
for agg in aggs {
if let YamlValue::Mapping(m) = agg {
if let Some(YamlValue::String(t)) = m.get("aggregationType") {
if t == agg_type {
if let Some(YamlValue::String(name)) = m.get("table_name") {
return Some(name.clone());
}
}
}
}
}
}
}
None
}

/// Returns the value_column field of the first aggregation matching agg_type.
pub fn aggregation_value_column(&self, agg_type: &str) -> Option<String> {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
for agg in aggs {
if let YamlValue::Mapping(m) = agg {
if let Some(YamlValue::String(t)) = m.get("aggregationType") {
if t == agg_type {
if let Some(YamlValue::String(col)) = m.get("value_column") {
return Some(col.clone());
}
}
}
}
}
}
}
None
}

/// Returns true if any aggregation has the matching type AND sub_type.
pub fn has_aggregation_type_and_sub_type(&self, agg_type: &str, sub_type: &str) -> bool {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
return aggs.iter().any(|agg| {
if let YamlValue::Mapping(m) = agg {
let type_matches = m.get("aggregationType").and_then(|v| {
if let YamlValue::String(s) = v {
Some(s.as_str())
} else {
None
}
}) == Some(agg_type);
let sub_matches = m.get("aggregationSubType").and_then(|v| {
if let YamlValue::String(s) = v {
Some(s.as_str())
} else {
None
}
}) == Some(sub_type);
type_matches && sub_matches
} else {
false
}
});
}
}
false
}
}

pub struct SQLController {
config: SQLControllerConfig,
options: SQLRuntimeOptions,
}

impl SQLController {
pub fn from_file(path: &Path, opts: SQLRuntimeOptions) -> Result<Self, ControllerError> {
let yaml_str = std::fs::read_to_string(path)?;
Self::from_yaml(&yaml_str, opts)
}

pub fn from_yaml(yaml: &str, opts: SQLRuntimeOptions) -> Result<Self, ControllerError> {
let config: SQLControllerConfig = serde_yaml::from_str(yaml)?;
Ok(Self {
config,
options: opts,
})
}

pub fn generate(&self) -> Result<PlannerOutput, ControllerError> {
let output = output::sql_generator::generate_sql_plan(&self.config, &self.options)?;
Ok(PlannerOutput {
punted_queries: output.punted_queries,
streaming_yaml: output.streaming_yaml,
inference_yaml: output.inference_yaml,
aggregation_count: output.aggregation_count,
query_count: output.query_count,
})
}

pub fn generate_to_dir(&self, dir: &Path) -> Result<PlannerOutput, ControllerError> {
let output = self.generate()?;
std::fs::create_dir_all(dir)?;
let streaming_str = serde_yaml::to_string(&output.streaming_yaml)?;
let inference_str = serde_yaml::to_string(&output.inference_yaml)?;
std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?;
std::fs::write(dir.join("inference_config.yaml"), inference_str)?;
Ok(output)
}
}

impl Controller {
Expand Down
54 changes: 41 additions & 13 deletions asap-planner-rs/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use asap_planner::{Controller, RuntimeOptions, StreamingEngine};
use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, StreamingEngine};
use clap::Parser;
use sketch_db_common::enums::QueryLanguage;
use std::path::PathBuf;

#[derive(Parser, Debug)]
Expand All @@ -11,8 +12,8 @@ struct Args {
#[arg(long = "output_dir")]
output_dir: PathBuf,

#[arg(long = "prometheus_scrape_interval")]
prometheus_scrape_interval: u64,
#[arg(long = "prometheus_scrape_interval", required = false)]
prometheus_scrape_interval: Option<u64>,

#[arg(long = "streaming_engine", value_enum)]
streaming_engine: EngineArg,
Expand All @@ -26,6 +27,12 @@ struct Args {
#[arg(long = "step", default_value = "0")]
step: u64,

#[arg(long = "query-language", value_enum, default_value = "promql")]
query_language: QueryLanguage,

#[arg(long = "data-ingestion-interval", required = false)]
data_ingestion_interval: Option<u64>,

#[arg(short, long, action = clap::ArgAction::Count)]
verbose: u8,
}
Expand All @@ -52,16 +59,37 @@ fn main() -> anyhow::Result<()> {
EngineArg::Flink => StreamingEngine::Flink,
};

let opts = RuntimeOptions {
prometheus_scrape_interval: args.prometheus_scrape_interval,
streaming_engine: engine,
enable_punting: args.enable_punting,
range_duration: args.range_duration,
step: args.step,
};

let controller = Controller::from_file(&args.input_config, opts)?;
controller.generate_to_dir(&args.output_dir)?;
match args.query_language {
QueryLanguage::promql => {
let scrape_interval = args.prometheus_scrape_interval.ok_or_else(|| {
anyhow::anyhow!("--prometheus_scrape_interval is required for PromQL mode")
})?;
let opts = RuntimeOptions {
prometheus_scrape_interval: scrape_interval,
streaming_engine: engine,
enable_punting: args.enable_punting,
range_duration: args.range_duration,
step: args.step,
};
let controller = Controller::from_file(&args.input_config, opts)?;
controller.generate_to_dir(&args.output_dir)?;
}
QueryLanguage::sql | QueryLanguage::elastic_sql => {
let interval = args.data_ingestion_interval.ok_or_else(|| {
anyhow::anyhow!("--data-ingestion-interval is required for SQL mode")
})?;
let opts = SQLRuntimeOptions {
streaming_engine: engine,
query_evaluation_time: None,
data_ingestion_interval: interval,
};
SQLController::from_file(&args.input_config, opts)?
.generate_to_dir(&args.output_dir)?;
}
QueryLanguage::elastic_querydsl => {
anyhow::bail!("ElasticQueryDSL is not yet supported");
}
}

println!("Generated configs in {}", args.output_dir.display());
Ok(())
Expand Down
1 change: 1 addition & 0 deletions asap-planner-rs/src/output/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod generator;
pub mod sql_generator;
pub use generator::*;
Loading
Loading