Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion asap-planner-rs/src/output/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use sketch_db_common::enums::CleanupPolicy;

use crate::config::input::ControllerConfig;
use crate::error::ControllerError;
use crate::planner::single_query::{IntermediateAggConfig, SingleQueryProcessor};
use crate::planner::single_query::{BinaryArm, IntermediateAggConfig, SingleQueryProcessor};
use crate::RuntimeOptions;

/// `(query_string, Vec<(identifying_key, cleanup_param)>)` pairs produced by binary leaf decomposition.
type LeafEntries = Vec<(String, Vec<(String, Option<u64>)>)>;

/// Run the full planning pipeline and produce YAML outputs
pub fn generate_plan(
controller_config: &ControllerConfig,
Expand Down Expand Up @@ -92,6 +95,14 @@ pub fn generate_plan(
}
Err(e) => return Err(e),
}
} else if let Some(arm_entries) =
collect_binary_leaf_entries(&processor, &mut dedup_map)?
{
// Binary arithmetic: register each leaf arm in dedup_map and query_keys_map
for (arm_query, keys_for_arm) in arm_entries {
// Use `entry` so a standalone query that duplicates an arm wins
query_keys_map.entry(arm_query).or_insert(keys_for_arm);
}
}
}
}
Expand Down Expand Up @@ -123,6 +134,62 @@ pub fn generate_plan(
})
}

/// Recursively collect (arm_query_string, Vec<(dedup_key, cleanup_param)>) pairs
/// from a binary arithmetic expression, registering new configs in `dedup_map`.
///
/// Returns `Some(Vec<...>)` when every leaf arm is acceleratable.
/// Returns `None` if any arm is unsupported (caller should skip the query).
/// Returns `Err` only on internal planner errors.
fn collect_binary_leaf_entries(
processor: &SingleQueryProcessor,
dedup_map: &mut IndexMap<String, IntermediateAggConfig>,
) -> Result<Option<LeafEntries>, ControllerError> {
let arms = match processor.get_binary_arm_queries() {
Some(arms) => arms,
None => return Ok(None), // not a binary expression
};

let mut all_entries: LeafEntries = Vec::new();

for arm in [arms.0, arms.1] {
match arm {
BinaryArm::Scalar(_) => {
// Scalar literals need no aggregation config — skip silently.
}
BinaryArm::Query(arm_query) => {
let arm_processor = processor.make_arm_processor(arm_query.clone());

if arm_processor.is_supported() {
// Leaf arm: gather its streaming aggregation configs.
let (configs, cleanup_param) =
arm_processor.get_streaming_aggregation_configs()?;
let mut keys_for_arm = Vec::new();
for config in configs {
let key = config.identifying_key();
keys_for_arm.push((key.clone(), cleanup_param));
dedup_map.entry(key).or_insert(config);
}
all_entries.push((arm_query, keys_for_arm));
} else {
// The arm might itself be a binary expression — recurse.
match collect_binary_leaf_entries(&arm_processor, dedup_map)? {
Some(sub_entries) => {
all_entries.extend(sub_entries);
}
None => {
// Arm is neither a supported leaf nor a binary expression.
// This entire query cannot be accelerated.
return Ok(None);
}
}
}
}
}
}

Ok(Some(all_entries))
}

pub fn parse_cleanup_policy(s: &str) -> Result<CleanupPolicy, ControllerError> {
match s {
"circular_buffer" => Ok(CleanupPolicy::CircularBuffer),
Expand Down
63 changes: 63 additions & 0 deletions asap-planner-rs/src/planner/single_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,37 @@ use crate::planner::logics::{
use crate::planner::patterns::build_patterns;
use crate::StreamingEngine;

/// Represents one arm of a binary arithmetic expression in the planner.
#[derive(Debug, Clone)]
pub enum BinaryArm {
/// A PromQL query expression that may be acceleratable.
Query(String),
/// A scalar literal (e.g. `100` in `rate(x[5m]) * 100`).
Scalar(f64),
}

/// Convert an AST expression to a `BinaryArm`. Scalar literals become
/// `BinaryArm::Scalar`; everything else is serialized to a query string.
/// Outer parentheses are stripped so nested binary arms can be re-parsed
/// as `Binary` expressions (not `Paren`).
fn expr_to_binary_arm(expr: &promql_parser::parser::Expr) -> BinaryArm {
let inner = strip_parens(expr);
if let promql_parser::parser::Expr::NumberLiteral(nl) = inner {
BinaryArm::Scalar(nl.val)
} else {
BinaryArm::Query(format!("{}", inner))
}
}

/// Recursively remove outer `Paren` wrappers from an expression.
fn strip_parens(expr: &promql_parser::parser::Expr) -> &promql_parser::parser::Expr {
if let promql_parser::parser::Expr::Paren(paren) = expr {
strip_parens(&paren.expr)
} else {
expr
}
}

/// Internal representation of an aggregation config before IDs are assigned
#[derive(Debug, Clone)]
pub struct IntermediateAggConfig {
Expand Down Expand Up @@ -161,6 +192,38 @@ impl SingleQueryProcessor {
}
}

/// Returns `Some((lhs, rhs))` if this query is a binary arithmetic expression.
/// Each arm is either a query string (`BinaryArm::Query`) or a scalar literal
/// (`BinaryArm::Scalar`). Returns `None` if the query is not a binary expression
/// or cannot be parsed.
pub fn get_binary_arm_queries(&self) -> Option<(BinaryArm, BinaryArm)> {
let ast = promql_parser::parser::parse(&self.query).ok()?;
if let promql_parser::parser::Expr::Binary(binary) = ast {
let lhs = expr_to_binary_arm(binary.lhs.as_ref());
let rhs = expr_to_binary_arm(binary.rhs.as_ref());
// Only handle arithmetic operators (not comparison or set operators)
if !binary.op.is_comparison_operator() && !binary.op.is_set_operator() {
return Some((lhs, rhs));
}
}
None
}

/// Create a new processor for an arm query, reusing all parameters from this processor.
pub fn make_arm_processor(&self, arm_query: String) -> Self {
SingleQueryProcessor::new(
arm_query,
self.t_repeat,
self.prometheus_scrape_interval,
self.metric_schema.clone(),
self.streaming_engine,
self.sketch_parameters.clone(),
self.range_duration,
self.step,
self.cleanup_policy,
)
}

/// Check if query should be processed (supported pattern)
pub fn is_supported(&self) -> bool {
if let Ok(ast) = promql_parser::parser::parse(&self.query) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
query_groups:
- id: 1
queries:
- "rate(errors_total[5m]) / rate(requests_total[5m])"
repetition_delay: 300
controller_options:
accuracy_sla: 0.99
latency_sla: 1.0
metrics:
- metric: "errors_total"
labels: ["instance", "job"]
- metric: "requests_total"
labels: ["instance", "job"]
aggregate_cleanup:
policy: "read_based"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
query_groups:
- id: 1
queries:
- "rate(errors_total[5m]) / rate(requests_total[5m])"
- "rate(errors_total[5m])"
repetition_delay: 300
controller_options:
accuracy_sla: 0.99
latency_sla: 1.0
metrics:
- metric: "errors_total"
labels: ["instance", "job"]
- metric: "requests_total"
labels: ["instance", "job"]
aggregate_cleanup:
policy: "read_based"
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
query_groups:
- id: 1
queries:
- "(rate(a_total[5m]) + rate(b_total[5m])) / rate(c_total[5m])"
repetition_delay: 300
controller_options:
accuracy_sla: 0.99
latency_sla: 1.0
metrics:
- metric: "a_total"
labels: ["instance"]
- metric: "b_total"
labels: ["instance"]
- metric: "c_total"
labels: ["instance"]
aggregate_cleanup:
policy: "read_based"
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
query_groups:
- id: 1
queries:
- "foo(errors_total[5m]) / rate(requests_total[5m])"
repetition_delay: 300
controller_options:
accuracy_sla: 0.99
latency_sla: 1.0
metrics:
- metric: "errors_total"
labels: ["instance"]
- metric: "requests_total"
labels: ["instance"]
aggregate_cleanup:
policy: "read_based"
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
query_groups:
- id: 1
queries:
- "rate(errors_total[5m]) * 100"
repetition_delay: 300
controller_options:
accuracy_sla: 0.99
latency_sla: 1.0
metrics:
- metric: "errors_total"
labels: ["instance", "job"]
aggregate_cleanup:
policy: "read_based"
67 changes: 67 additions & 0 deletions asap-planner-rs/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,73 @@ fn temporal_overlapping_cleanup_param_equals_range_over_repeat() {
);
}

// --- Binary arithmetic tests ---

#[test]
fn binary_arithmetic_produces_two_leaf_configs() {
let c = Controller::from_file(
Path::new("tests/comparison/test_data/configs/binary_arithmetic.yaml"),
arroyo_opts(),
)
.unwrap();
let out = c.generate().unwrap();
// Two arms → two streaming aggregation configs
assert_eq!(out.streaming_aggregation_count(), 2);
// Two separate query_config entries (one per arm)
assert_eq!(out.inference_query_count(), 2);
}

#[test]
fn binary_arithmetic_deduplicates_shared_arm() {
let c = Controller::from_file(
Path::new("tests/comparison/test_data/configs/binary_arithmetic_dedup.yaml"),
arroyo_opts(),
)
.unwrap();
let out = c.generate().unwrap();
// errors_total arm is shared — only 2 streaming configs total (not 3)
assert_eq!(out.streaming_aggregation_count(), 2);
// 2 query_config entries: rate(errors_total[5m]) and rate(requests_total[5m])
assert_eq!(out.inference_query_count(), 2);
}

#[test]
fn nested_binary_arithmetic_produces_three_leaf_configs() {
let c = Controller::from_file(
Path::new("tests/comparison/test_data/configs/binary_arithmetic_nested.yaml"),
arroyo_opts(),
)
.unwrap();
let out = c.generate().unwrap();
assert_eq!(out.streaming_aggregation_count(), 3);
assert_eq!(out.inference_query_count(), 3);
}

#[test]
fn binary_arithmetic_scalar_constant_produces_one_leaf_config() {
let c = Controller::from_file(
Path::new("tests/comparison/test_data/configs/binary_arithmetic_scalar.yaml"),
arroyo_opts(),
)
.unwrap();
let out = c.generate().unwrap();
// Only the vector arm needs a streaming config; 100 is a literal
assert_eq!(out.streaming_aggregation_count(), 1);
assert_eq!(out.inference_query_count(), 1);
}

#[test]
fn binary_arithmetic_with_non_acceleratable_arm_produces_no_configs() {
let c = Controller::from_file(
Path::new("tests/comparison/test_data/configs/binary_arithmetic_non_acceleratable.yaml"),
arroyo_opts(),
)
.unwrap();
let out = c.generate().unwrap();
assert_eq!(out.streaming_aggregation_count(), 0);
assert_eq!(out.inference_query_count(), 0);
}

#[test]
fn temporal_overlapping_rate_increase_deduped() {
// rate and increase produce identical MultipleIncrease configs → 1 streaming entry shared,
Expand Down
Loading
Loading