Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 132 additions & 124 deletions asap-planner-rs/src/output/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub fn generate_plan(
})
}

fn parse_cleanup_policy(s: &str) -> Result<CleanupPolicy, ControllerError> {
pub fn parse_cleanup_policy(s: &str) -> Result<CleanupPolicy, ControllerError> {
match s {
"circular_buffer" => Ok(CleanupPolicy::CircularBuffer),
"read_based" => Ok(CleanupPolicy::ReadBased),
Expand All @@ -124,7 +124,7 @@ fn parse_cleanup_policy(s: &str) -> Result<CleanupPolicy, ControllerError> {
}
}

fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue {
pub fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue {
YamlValue::Sequence(
labels
.labels
Expand All @@ -134,7 +134,134 @@ fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue {
)
}

fn params_to_yaml(params: &HashMap<String, JsonValue>) -> YamlValue {
pub fn build_aggregation_entry(id: u32, cfg: &IntermediateAggConfig) -> YamlValue {
let mut map = serde_yaml::Mapping::new();
map.insert(
YamlValue::String("aggregationId".to_string()),
YamlValue::Number(id.into()),
);
map.insert(
YamlValue::String("aggregationSubType".to_string()),
YamlValue::String(cfg.aggregation_sub_type.clone()),
);
map.insert(
YamlValue::String("aggregationType".to_string()),
YamlValue::String(cfg.aggregation_type.clone()),
);

let mut labels_map = serde_yaml::Mapping::new();
labels_map.insert(
YamlValue::String("aggregated".to_string()),
key_by_labels_to_yaml(&cfg.aggregated_labels),
);
labels_map.insert(
YamlValue::String("grouping".to_string()),
key_by_labels_to_yaml(&cfg.grouping_labels),
);
labels_map.insert(
YamlValue::String("rollup".to_string()),
key_by_labels_to_yaml(&cfg.rollup_labels),
);
map.insert(
YamlValue::String("labels".to_string()),
YamlValue::Mapping(labels_map),
);

map.insert(
YamlValue::String("metric".to_string()),
YamlValue::String(cfg.metric.clone()),
);
map.insert(
YamlValue::String("parameters".to_string()),
params_to_yaml(&cfg.parameters),
);
map.insert(
YamlValue::String("slideInterval".to_string()),
YamlValue::Number(cfg.slide_interval.into()),
);
map.insert(
YamlValue::String("spatialFilter".to_string()),
YamlValue::String(cfg.spatial_filter.clone()),
);
map.insert(
YamlValue::String("table_name".to_string()),
match &cfg.table_name {
Some(t) => YamlValue::String(t.clone()),
None => YamlValue::Null,
},
);
map.insert(
YamlValue::String("value_column".to_string()),
match &cfg.value_column {
Some(v) => YamlValue::String(v.clone()),
None => YamlValue::Null,
},
);
map.insert(
YamlValue::String("windowSize".to_string()),
YamlValue::Number(cfg.window_size.into()),
);
map.insert(
YamlValue::String("windowType".to_string()),
YamlValue::String(cfg.window_type.clone()),
);

YamlValue::Mapping(map)
}

pub fn build_queries_yaml(
cleanup_policy: CleanupPolicy,
query_keys_map: &IndexMap<String, Vec<(String, Option<u64>)>>,
id_map: &HashMap<String, u32>,
) -> Vec<YamlValue> {
query_keys_map
.iter()
.map(|(query_str, keys)| {
let aggregations: Vec<YamlValue> = keys
.iter()
.map(|(key, cleanup_param)| {
let agg_id = id_map[key];
let mut agg_map = serde_yaml::Mapping::new();
agg_map.insert(
YamlValue::String("aggregation_id".to_string()),
YamlValue::Number(agg_id.into()),
);
if let Some(param) = cleanup_param {
match cleanup_policy {
CleanupPolicy::CircularBuffer => {
agg_map.insert(
YamlValue::String("num_aggregates_to_retain".to_string()),
YamlValue::Number((*param).into()),
);
}
CleanupPolicy::ReadBased => {
agg_map.insert(
YamlValue::String("read_count_threshold".to_string()),
YamlValue::Number((*param).into()),
);
}
CleanupPolicy::NoCleanup => {}
}
}
YamlValue::Mapping(agg_map)
})
.collect();

let mut q_map = serde_yaml::Mapping::new();
q_map.insert(
YamlValue::String("aggregations".to_string()),
YamlValue::Sequence(aggregations),
);
q_map.insert(
YamlValue::String("query".to_string()),
YamlValue::String(query_str.clone()),
);
YamlValue::Mapping(q_map)
})
.collect()
}

pub fn params_to_yaml(params: &HashMap<String, JsonValue>) -> YamlValue {
if params.is_empty() {
return YamlValue::Mapping(serde_yaml::Mapping::new());
}
Expand Down Expand Up @@ -169,82 +296,7 @@ fn build_streaming_yaml(
) -> Result<YamlValue, ControllerError> {
let aggregations: Vec<YamlValue> = dedup_map
.iter()
.map(|(key, cfg)| {
let id = id_map[key];
let mut map = serde_yaml::Mapping::new();
map.insert(
YamlValue::String("aggregationId".to_string()),
YamlValue::Number(id.into()),
);
map.insert(
YamlValue::String("aggregationSubType".to_string()),
YamlValue::String(cfg.aggregation_sub_type.clone()),
);
map.insert(
YamlValue::String("aggregationType".to_string()),
YamlValue::String(cfg.aggregation_type.clone()),
);

// labels
let mut labels_map = serde_yaml::Mapping::new();
labels_map.insert(
YamlValue::String("aggregated".to_string()),
key_by_labels_to_yaml(&cfg.aggregated_labels),
);
labels_map.insert(
YamlValue::String("grouping".to_string()),
key_by_labels_to_yaml(&cfg.grouping_labels),
);
labels_map.insert(
YamlValue::String("rollup".to_string()),
key_by_labels_to_yaml(&cfg.rollup_labels),
);
map.insert(
YamlValue::String("labels".to_string()),
YamlValue::Mapping(labels_map),
);

map.insert(
YamlValue::String("metric".to_string()),
YamlValue::String(cfg.metric.clone()),
);
map.insert(
YamlValue::String("parameters".to_string()),
params_to_yaml(&cfg.parameters),
);
map.insert(
YamlValue::String("slideInterval".to_string()),
YamlValue::Number(cfg.slide_interval.into()),
);
map.insert(
YamlValue::String("spatialFilter".to_string()),
YamlValue::String(cfg.spatial_filter.clone()),
);
map.insert(
YamlValue::String("table_name".to_string()),
match &cfg.table_name {
Some(t) => YamlValue::String(t.clone()),
None => YamlValue::Null,
},
);
map.insert(
YamlValue::String("value_column".to_string()),
match &cfg.value_column {
Some(v) => YamlValue::String(v.clone()),
None => YamlValue::Null,
},
);
map.insert(
YamlValue::String("windowSize".to_string()),
YamlValue::Number(cfg.window_size.into()),
);
map.insert(
YamlValue::String("windowType".to_string()),
YamlValue::String(cfg.window_type.clone()),
);

YamlValue::Mapping(map)
})
.map(|(key, cfg)| build_aggregation_entry(id_map[key], cfg))
.collect();

// Build metrics section
Expand Down Expand Up @@ -282,51 +334,7 @@ fn build_inference_yaml(
YamlValue::String(cleanup_policy_str.to_string()),
);

let queries: Vec<YamlValue> = query_keys_map
.iter()
.map(|(query_str, keys)| {
let aggregations: Vec<YamlValue> = keys
.iter()
.map(|(key, cleanup_param)| {
let agg_id = id_map[key];
let mut agg_map = serde_yaml::Mapping::new();
agg_map.insert(
YamlValue::String("aggregation_id".to_string()),
YamlValue::Number(agg_id.into()),
);
if let Some(param) = cleanup_param {
match cleanup_policy {
CleanupPolicy::CircularBuffer => {
agg_map.insert(
YamlValue::String("num_aggregates_to_retain".to_string()),
YamlValue::Number((*param).into()),
);
}
CleanupPolicy::ReadBased => {
agg_map.insert(
YamlValue::String("read_count_threshold".to_string()),
YamlValue::Number((*param).into()),
);
}
CleanupPolicy::NoCleanup => {}
}
}
YamlValue::Mapping(agg_map)
})
.collect();

let mut q_map = serde_yaml::Mapping::new();
q_map.insert(
YamlValue::String("aggregations".to_string()),
YamlValue::Sequence(aggregations),
);
q_map.insert(
YamlValue::String("query".to_string()),
YamlValue::String(query_str.clone()),
);
YamlValue::Mapping(q_map)
})
.collect();
let queries = build_queries_yaml(cleanup_policy, query_keys_map, id_map);

// Build metrics section
let mut metrics_map = serde_yaml::Mapping::new();
Expand Down
67 changes: 54 additions & 13 deletions asap-planner-rs/src/planner/logics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,15 @@ pub struct IntermediateWindowConfig {
pub window_type: String,
}

pub fn get_precompute_operator_parameters(
/// Shared sketch parameter builder used by both PromQL and SQL paths.
///
/// `topk_k` is only required for `CountMinSketchWithHeap`: PromQL supplies it
/// from the `topk(k, …)` query argument; SQL passes `None` (SQL never produces
/// this operator today, so the `None` branch is unreachable in practice).
pub fn build_sketch_parameters(
aggregation_type: &str,
aggregation_sub_type: &str,
match_result: &PromQLMatchResult,
topk_k: Option<u64>,
sketch_params: Option<&SketchParameterOverrides>,
) -> Result<HashMap<String, serde_json::Value>, String> {
match aggregation_type {
Expand Down Expand Up @@ -110,16 +115,8 @@ pub fn get_precompute_operator_parameters(
aggregation_sub_type
));
}
// Get k from aggregation param
let k: u64 = match_result
.tokens
.get("aggregation")
.and_then(|t| t.aggregation.as_ref())
.and_then(|a| a.param.as_ref())
.and_then(|p| p.parse::<f64>().ok())
.map(|f| f as u64)
.ok_or_else(|| "topk query missing required 'k' parameter".to_string())?;

let k = topk_k
.ok_or_else(|| "CountMinSketchWithHeap requires a topk k value".to_string())?;
let depth = sketch_params
.and_then(|p| p.count_min_sketch_with_heap.as_ref())
.map(|p| p.depth)
Expand All @@ -132,7 +129,6 @@ pub fn get_precompute_operator_parameters(
.and_then(|p| p.count_min_sketch_with_heap.as_ref())
.and_then(|p| p.heap_multiplier)
.unwrap_or(DEFAULT_CMS_HEAP_MULT);

let mut m = HashMap::new();
m.insert("depth".to_string(), serde_json::Value::Number(depth.into()));
m.insert("width".to_string(), serde_json::Value::Number(width.into()));
Expand Down Expand Up @@ -183,6 +179,35 @@ pub fn get_precompute_operator_parameters(
}
}

/// PromQL wrapper: extracts the topk `k` from the match result when needed,
/// then delegates to `build_sketch_parameters`.
pub fn build_sketch_parameters_from_promql(
aggregation_type: &str,
aggregation_sub_type: &str,
match_result: &PromQLMatchResult,
sketch_params: Option<&SketchParameterOverrides>,
) -> Result<HashMap<String, serde_json::Value>, String> {
let topk_k = if aggregation_type == "CountMinSketchWithHeap" {
let k: u64 = match_result
.tokens
.get("aggregation")
.and_then(|t| t.aggregation.as_ref())
.and_then(|a| a.param.as_ref())
.and_then(|p| p.parse::<f64>().ok())
.map(|f| f as u64)
.ok_or_else(|| "topk query missing required 'k' parameter".to_string())?;
Some(k)
} else {
None
};
build_sketch_parameters(
aggregation_type,
aggregation_sub_type,
topk_k,
sketch_params,
)
}

pub fn get_cleanup_param(
cleanup_policy: CleanupPolicy,
query_pattern_type: QueryPatternType,
Expand Down Expand Up @@ -266,6 +291,22 @@ pub fn set_subpopulation_labels(
}
}

/// SQL cleanup param — SQL queries are always instant (no range_duration/step).
pub fn get_sql_cleanup_param(
cleanup_policy: CleanupPolicy,
t_lookback: u64,
t_repeat: u64,
) -> Result<u64, String> {
match cleanup_policy {
CleanupPolicy::CircularBuffer | CleanupPolicy::ReadBased => {
Ok(t_lookback.div_ceil(t_repeat))
}
CleanupPolicy::NoCleanup => {
Err("NoCleanup policy should not call get_sql_cleanup_param".to_string())
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading