diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/output/generator.rs index efc5122..bf82157 100644 --- a/asap-planner-rs/src/output/generator.rs +++ b/asap-planner-rs/src/output/generator.rs @@ -8,9 +8,12 @@ use sketch_db_common::enums::CleanupPolicy; use crate::config::input::ControllerConfig; use crate::error::ControllerError; -use crate::planner::single_query::{IntermediateAggConfig, SingleQueryProcessor}; +use crate::planner::single_query::{BinaryArm, IntermediateAggConfig, SingleQueryProcessor}; use crate::RuntimeOptions; +/// `(query_string, Vec<(identifying_key, cleanup_param)>)` pairs produced by binary leaf decomposition. +type LeafEntries = Vec<(String, Vec<(String, Option)>)>; + /// Run the full planning pipeline and produce YAML outputs pub fn generate_plan( controller_config: &ControllerConfig, @@ -92,6 +95,14 @@ pub fn generate_plan( } Err(e) => return Err(e), } + } else if let Some(arm_entries) = + collect_binary_leaf_entries(&processor, &mut dedup_map)? + { + // Binary arithmetic: register each leaf arm in dedup_map and query_keys_map + for (arm_query, keys_for_arm) in arm_entries { + // Use `entry` so a standalone query that duplicates an arm wins + query_keys_map.entry(arm_query).or_insert(keys_for_arm); + } } } } @@ -123,6 +134,62 @@ pub fn generate_plan( }) } +/// Recursively collect (arm_query_string, Vec<(dedup_key, cleanup_param)>) pairs +/// from a binary arithmetic expression, registering new configs in `dedup_map`. +/// +/// Returns `Some(Vec<...>)` when every leaf arm is acceleratable. +/// Returns `None` if any arm is unsupported (caller should skip the query). +/// Returns `Err` only on internal planner errors. 
+fn collect_binary_leaf_entries( + processor: &SingleQueryProcessor, + dedup_map: &mut IndexMap<String, IntermediateAggConfig>, +) -> Result<Option<LeafEntries>, ControllerError> { + let arms = match processor.get_binary_arm_queries() { + Some(arms) => arms, + None => return Ok(None), // not a binary expression + }; + + let mut all_entries: LeafEntries = Vec::new(); + + for arm in [arms.0, arms.1] { + match arm { + BinaryArm::Scalar(_) => { + // Scalar literals need no aggregation config — skip silently. + } + BinaryArm::Query(arm_query) => { + let arm_processor = processor.make_arm_processor(arm_query.clone()); + + if arm_processor.is_supported() { + // Leaf arm: gather its streaming aggregation configs. + let (configs, cleanup_param) = + arm_processor.get_streaming_aggregation_configs()?; + let mut keys_for_arm = Vec::new(); + for config in configs { + let key = config.identifying_key(); + keys_for_arm.push((key.clone(), cleanup_param)); + dedup_map.entry(key).or_insert(config); + } + all_entries.push((arm_query, keys_for_arm)); + } else { + // The arm might itself be a binary expression — recurse. + match collect_binary_leaf_entries(&arm_processor, dedup_map)? { + Some(sub_entries) => { + all_entries.extend(sub_entries); + } + None => { + // Arm is neither a supported leaf nor a binary expression. + // This entire query cannot be accelerated. + return Ok(None); + } + } + } + } + } + } + + Ok(Some(all_entries)) +} + pub fn parse_cleanup_policy(s: &str) -> Result<CleanupPolicy, ControllerError> { match s { "circular_buffer" => Ok(CleanupPolicy::CircularBuffer), diff --git a/asap-planner-rs/src/planner/single_query.rs b/asap-planner-rs/src/planner/single_query.rs index 00e1d99..12c01c4 100644 --- a/asap-planner-rs/src/planner/single_query.rs +++ b/asap-planner-rs/src/planner/single_query.rs @@ -21,6 +21,37 @@ use crate::planner::logics::{ use crate::planner::patterns::build_patterns; use crate::StreamingEngine; +/// Represents one arm of a binary arithmetic expression in the planner. 
+#[derive(Debug, Clone)] +pub enum BinaryArm { + /// A PromQL query expression that may be acceleratable. + Query(String), + /// A scalar literal (e.g. `100` in `rate(x[5m]) * 100`). + Scalar(f64), +} + +/// Convert an AST expression to a `BinaryArm`. Scalar literals become +/// `BinaryArm::Scalar`; everything else is serialized to a query string. +/// Outer parentheses are stripped so nested binary arms can be re-parsed +/// as `Binary` expressions (not `Paren`). +fn expr_to_binary_arm(expr: &promql_parser::parser::Expr) -> BinaryArm { + let inner = strip_parens(expr); + if let promql_parser::parser::Expr::NumberLiteral(nl) = inner { + BinaryArm::Scalar(nl.val) + } else { + BinaryArm::Query(format!("{}", inner)) + } +} + +/// Recursively remove outer `Paren` wrappers from an expression. +fn strip_parens(expr: &promql_parser::parser::Expr) -> &promql_parser::parser::Expr { + if let promql_parser::parser::Expr::Paren(paren) = expr { + strip_parens(&paren.expr) + } else { + expr + } +} + /// Internal representation of an aggregation config before IDs are assigned #[derive(Debug, Clone)] pub struct IntermediateAggConfig { @@ -161,6 +192,38 @@ impl SingleQueryProcessor { } } + /// Returns `Some((lhs, rhs))` if this query is a binary arithmetic expression. + /// Each arm is either a query string (`BinaryArm::Query`) or a scalar literal + /// (`BinaryArm::Scalar`). Returns `None` if the query is not a binary expression + /// or cannot be parsed. 
+ pub fn get_binary_arm_queries(&self) -> Option<(BinaryArm, BinaryArm)> { + let ast = promql_parser::parser::parse(&self.query).ok()?; + if let promql_parser::parser::Expr::Binary(binary) = ast { + let lhs = expr_to_binary_arm(binary.lhs.as_ref()); + let rhs = expr_to_binary_arm(binary.rhs.as_ref()); + // Only handle arithmetic operators (not comparison or set operators) + if !binary.op.is_comparison_operator() && !binary.op.is_set_operator() { + return Some((lhs, rhs)); + } + } + None + } + + /// Create a new processor for an arm query, reusing all parameters from this processor. + pub fn make_arm_processor(&self, arm_query: String) -> Self { + SingleQueryProcessor::new( + arm_query, + self.t_repeat, + self.prometheus_scrape_interval, + self.metric_schema.clone(), + self.streaming_engine, + self.sketch_parameters.clone(), + self.range_duration, + self.step, + self.cleanup_policy, + ) + } + /// Check if query should be processed (supported pattern) pub fn is_supported(&self) -> bool { if let Ok(ast) = promql_parser::parser::parse(&self.query) { diff --git a/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic.yaml b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic.yaml new file mode 100644 index 0000000..c6b8101 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic.yaml @@ -0,0 +1,15 @@ +query_groups: + - id: 1 + queries: + - "rate(errors_total[5m]) / rate(requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "errors_total" + labels: ["instance", "job"] + - metric: "requests_total" + labels: ["instance", "job"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_dedup.yaml b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_dedup.yaml new file mode 100644 index 0000000..4dc2eab --- /dev/null +++ 
b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_dedup.yaml @@ -0,0 +1,16 @@ +query_groups: + - id: 1 + queries: + - "rate(errors_total[5m]) / rate(requests_total[5m])" + - "rate(errors_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "errors_total" + labels: ["instance", "job"] + - metric: "requests_total" + labels: ["instance", "job"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_nested.yaml b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_nested.yaml new file mode 100644 index 0000000..f5abf8a --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_nested.yaml @@ -0,0 +1,17 @@ +query_groups: + - id: 1 + queries: + - "(rate(a_total[5m]) + rate(b_total[5m])) / rate(c_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "a_total" + labels: ["instance"] + - metric: "b_total" + labels: ["instance"] + - metric: "c_total" + labels: ["instance"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_non_acceleratable.yaml b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_non_acceleratable.yaml new file mode 100644 index 0000000..6f53e4c --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_non_acceleratable.yaml @@ -0,0 +1,15 @@ +query_groups: + - id: 1 + queries: + - "foo(errors_total[5m]) / rate(requests_total[5m])" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "errors_total" + labels: ["instance"] + - metric: "requests_total" + labels: ["instance"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_scalar.yaml 
b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_scalar.yaml new file mode 100644 index 0000000..35e7dd3 --- /dev/null +++ b/asap-planner-rs/tests/comparison/test_data/configs/binary_arithmetic_scalar.yaml @@ -0,0 +1,13 @@ +query_groups: + - id: 1 + queries: + - "rate(errors_total[5m]) * 100" + repetition_delay: 300 + controller_options: + accuracy_sla: 0.99 + latency_sla: 1.0 +metrics: + - metric: "errors_total" + labels: ["instance", "job"] +aggregate_cleanup: + policy: "read_based" diff --git a/asap-planner-rs/tests/integration.rs b/asap-planner-rs/tests/integration.rs index a7f39ac..3fecdbc 100644 --- a/asap-planner-rs/tests/integration.rs +++ b/asap-planner-rs/tests/integration.rs @@ -430,6 +430,73 @@ fn temporal_overlapping_cleanup_param_equals_range_over_repeat() { ); } +// --- Binary arithmetic tests --- + +#[test] +fn binary_arithmetic_produces_two_leaf_configs() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/binary_arithmetic.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + // Two arms → two streaming aggregation configs + assert_eq!(out.streaming_aggregation_count(), 2); + // Two separate query_config entries (one per arm) + assert_eq!(out.inference_query_count(), 2); +} + +#[test] +fn binary_arithmetic_deduplicates_shared_arm() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/binary_arithmetic_dedup.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + // errors_total arm is shared — only 2 streaming configs total (not 3) + assert_eq!(out.streaming_aggregation_count(), 2); + // 2 query_config entries: rate(errors_total[5m]) and rate(requests_total[5m]) + assert_eq!(out.inference_query_count(), 2); +} + +#[test] +fn nested_binary_arithmetic_produces_three_leaf_configs() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/binary_arithmetic_nested.yaml"), + arroyo_opts(), + ) + 
.unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 3); + assert_eq!(out.inference_query_count(), 3); +} + +#[test] +fn binary_arithmetic_scalar_constant_produces_one_leaf_config() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/binary_arithmetic_scalar.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + // Only the vector arm needs a streaming config; 100 is a literal + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); +} + +#[test] +fn binary_arithmetic_with_non_acceleratable_arm_produces_no_configs() { + let c = Controller::from_file( + Path::new("tests/comparison/test_data/configs/binary_arithmetic_non_acceleratable.yaml"), + arroyo_opts(), + ) + .unwrap(); + let out = c.generate().unwrap(); + assert_eq!(out.streaming_aggregation_count(), 0); + assert_eq!(out.inference_query_count(), 0); +} + #[test] fn temporal_overlapping_rate_increase_deduped() { // rate and increase produce identical MultipleIncrease configs → 1 streaming entry shared, diff --git a/asap-query-engine/src/engines/logical/plan_builder.rs b/asap-query-engine/src/engines/logical/plan_builder.rs index 400c374..93c646b 100644 --- a/asap-query-engine/src/engines/logical/plan_builder.rs +++ b/asap-query-engine/src/engines/logical/plan_builder.rs @@ -6,10 +6,15 @@ use arrow::datatypes::{DataType, Field}; use datafusion::common::{DFSchema, DFSchemaRef}; use datafusion::error::DataFusionError; -use datafusion::logical_expr::{Extension, LogicalPlan}; +use datafusion::logical_expr::{ + binary_expr, Expr as DFExpr, Extension, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, + SubqueryAlias, +}; +use datafusion::prelude::{col, lit}; use datafusion_summary_library::{ InferOperation, PrecomputedSummaryRead, SketchType, SummaryInfer, SummaryMergeMultiple, }; +use promql_parser::parser::token::{self, T_ADD, T_DIV, T_MOD, T_MUL, T_POW, T_SUB}; use 
promql_utilities::query_logics::enums::Statistic; use std::sync::Arc; @@ -242,6 +247,109 @@ impl QueryExecutionContext { } } +// ============================================================================ +// Binary arithmetic plan builders (standalone functions, not on impl block) +// ============================================================================ + +/// Map a PromQL `TokenType` to the corresponding DataFusion `Operator`. +/// Returns an error for non-arithmetic operators. +pub fn token_type_to_df_operator(op: &token::TokenType) -> Result<Operator, DataFusionError> { + match op.id() { + id if id == T_ADD => Ok(Operator::Plus), + id if id == T_SUB => Ok(Operator::Minus), + id if id == T_MUL => Ok(Operator::Multiply), + id if id == T_DIV => Ok(Operator::Divide), + id if id == T_MOD => Ok(Operator::Modulo), + id if id == T_POW => Ok(Operator::BitwiseXor), // Note: bitwise XOR used as proxy for ^ + _ => Err(DataFusionError::Plan(format!( + "Unsupported binary operator for arithmetic plan: {}", + op + ))), + } +} + +/// Builds a DataFusion logical plan for a vector op vector binary expression: +/// +/// ```text +/// Projection(lhs.label1 AS label1, ..., lhs.value OP rhs.value AS value) +/// └── Join(inner, on = label_columns) +/// ├── SubqueryAlias("lhs") └── lhs_plan +/// └── SubqueryAlias("rhs") └── rhs_plan +/// ``` +/// +/// The `label_columns` are the label names shared by both sides; the join is +/// an inner join on those columns. +pub fn build_binary_vector_plan( + lhs_plan: LogicalPlan, + rhs_plan: LogicalPlan, + op: &token::TokenType, + label_columns: Vec<String>, +) -> Result<LogicalPlan, DataFusionError> { + let df_op = token_type_to_df_operator(op)?; + + // Wrap each side in a SubqueryAlias so columns are qualified (lhs.x, rhs.x) + let lhs_aliased = SubqueryAlias::try_new(Arc::new(lhs_plan), "lhs")?; + let rhs_aliased = SubqueryAlias::try_new(Arc::new(rhs_plan), "rhs")?; + + // Build the join keys: qualified column names for each label column. + // The `join` function expects Vec<impl Into<Column>> (i.e. 
qualified col names). + let join_keys_left: Vec<String> = label_columns.iter().map(|c| format!("lhs.{}", c)).collect(); + let join_keys_right: Vec<String> = label_columns.iter().map(|c| format!("rhs.{}", c)).collect(); + + let joined_plan = LogicalPlanBuilder::from(LogicalPlan::SubqueryAlias(lhs_aliased)) + .join( + LogicalPlan::SubqueryAlias(rhs_aliased), + JoinType::Inner, + (join_keys_left, join_keys_right), + None, + )? + .build()?; + + // Projection: pass through label columns from lhs, compute value = lhs.value OP rhs.value + let mut proj_exprs: Vec<DFExpr> = label_columns + .iter() + .map(|c| col(format!("lhs.{}", c)).alias(c.as_str())) + .collect(); + let value_expr = binary_expr(col("lhs.value"), df_op, col("rhs.value")).alias("value"); + proj_exprs.push(value_expr); + + LogicalPlanBuilder::from(joined_plan) + .project(proj_exprs)? + .build() +} + +/// Builds a DataFusion logical plan for a scalar op vector (or vector op scalar) expression: +/// +/// ```text +/// Projection(label1, ..., scalar OP value AS value) +/// └── vector_plan +/// ``` +/// +/// If `scalar_on_left` is true the expression is `scalar OP value`; +/// otherwise it is `value OP scalar`. +pub fn build_scalar_plan( + vector_plan: LogicalPlan, + scalar: f64, + op: &token::TokenType, + scalar_on_left: bool, + label_columns: Vec<String>, +) -> Result<LogicalPlan, DataFusionError> { + let df_op = token_type_to_df_operator(op)?; + + let value_expr = if scalar_on_left { + binary_expr(lit(scalar), df_op, col("value")).alias("value") + } else { + binary_expr(col("value"), df_op, lit(scalar)).alias("value") + }; + + let mut proj_exprs: Vec<DFExpr> = label_columns.iter().map(|c| col(c.as_str())).collect(); + proj_exprs.push(value_expr); + + LogicalPlanBuilder::from(vector_plan) + .project(proj_exprs)? 
+ .build() +} + #[cfg(test)] mod tests { use super::*; diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index 5f431d5..92e0e7c 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -983,6 +983,500 @@ impl SimpleEngine { Ok(results) } + /// Executes a pre-built DataFusion logical plan and returns results. + /// + /// This is the shared execution kernel used by both `execute_plan` (for single-metric + /// queries) and the binary arithmetic dispatch path. + pub async fn execute_logical_plan( + &self, + logical_plan: datafusion::logical_expr::LogicalPlan, + label_names: Vec<String>, + metric: &str, + statistic: &Statistic, + ) -> Result<Vec<InstantVectorElement>, String> { + use datafusion::execution::context::SessionContext; + use datafusion::physical_plan::collect; + + use super::physical::conversion::record_batch_to_result_map; + + // Create session context with our custom extension planner + let session_ctx = SessionContext::new(); + #[allow(deprecated)] + let state = session_ctx.state().with_query_planner(std::sync::Arc::new( + super::physical::CustomQueryPlanner::new(self.store.clone()), + )); + + let physical_plan = state + .create_physical_plan(&logical_plan) + .await + .map_err(|e| format!("Failed to create physical plan: {}", e))?; + + let task_ctx = session_ctx.task_ctx(); + let batches = collect(physical_plan, task_ctx) + .await + .map_err(|e| format!("Failed to execute plan: {}", e))?; + + let label_name_strs: Vec<&str> = label_names.iter().map(String::as_str).collect(); + let mut all_results: HashMap<Vec<String>, f64> = HashMap::new(); + for batch in &batches { + let batch_results = record_batch_to_result_map(batch, &label_name_strs, "value") + .map_err(|e| format!("Failed to convert results: {}", e))?; + all_results.extend(batch_results); + } + + Ok(self.format_final_results(all_results, statistic, metric, false)) + } + + /// Finds a query config by structurally comparing 
`arm_ast` against each + config's parsed query. + /// + /// Both the arm AST and each config's query string are first normalized to + /// the canonical `Display` form produced by `promql_parser`. This ensures + /// that user-written variants like `"sum(x) by (lbl)"` and the parser's + /// canonical `"sum by (lbl) (x)"` compare equal. + pub fn find_query_config_promql_structural( + &self, + arm_ast: &promql_parser::parser::Expr, + ) -> Option<&QueryConfig> { + let arm_canonical = format!("{}", arm_ast); + self.inference_config.query_configs.iter().find(|config| { + let config_canonical = promql_parser::parser::parse(&config.query) + .map(|ast| format!("{}", ast)) + .unwrap_or_default(); + config_canonical == arm_canonical + }) + } + + /// Variant of `build_query_execution_context_promql` that accepts a pre-parsed + /// AST node and a pre-found `QueryConfig`, avoiding redundant parsing and lookup. + pub fn build_query_execution_context_from_ast( + &self, + arm_ast: &promql_parser::parser::Expr, + query_config: &QueryConfig, + time: f64, + ) -> Option<QueryExecutionContext> { + let query_time = Self::convert_query_time_to_data_time(time); + + let mut found_match = None; + for (pattern_type, patterns) in &self.controller_patterns { + for pattern in patterns { + let match_result = pattern.matches(arm_ast); + if match_result.matches { + found_match = Some((*pattern_type, match_result)); + break; + } + } + if found_match.is_some() { + break; + } + } + + let (query_pattern_type, match_result) = found_match?; + + let (metric, spatial_filter) = get_metric_and_spatial_filter(&match_result); + + let promql_schema = match &self.inference_config.schema { + SchemaConfig::PromQL(schema) => schema, + _ => return None, + }; + let all_labels = promql_schema + .get_labels(&metric) + .cloned() + .unwrap_or_else(|| { + warn!("No metric configuration found for '{}'", metric); + panic!("No metric configuration found"); + }); + + let mut query_output_labels = match query_pattern_type { + 
QueryPatternType::OnlyTemporal => all_labels.clone(), + QueryPatternType::OnlySpatial => { + get_spatial_aggregation_output_labels(&match_result, &all_labels) + } + QueryPatternType::OneTemporalOneSpatial => { + let temporal_aggregation = match_result.get_function_name().unwrap(); + let spatial_aggregation = match_result.get_aggregation_op().unwrap(); + match get_is_collapsable(&temporal_aggregation, &spatial_aggregation) { + false => all_labels.clone(), + true => get_spatial_aggregation_output_labels(&match_result, &all_labels), + } + } + }; + + let timestamps = + self.calculate_query_timestamps_promql(query_time, query_pattern_type, &match_result); + + let statistics_to_compute = get_statistics_to_compute(query_pattern_type, &match_result); + if statistics_to_compute.len() != 1 { + panic!( + "Expected exactly one statistic, found {}", + statistics_to_compute.len() + ); + } + let statistic_to_compute = statistics_to_compute.first().unwrap(); + + if *statistic_to_compute == Statistic::Topk { + let mut new_labels = vec!["__name__".to_string()]; + new_labels.extend(query_output_labels.labels); + query_output_labels = KeyByLabelNames::new(new_labels); + } + + let query_kwargs = self + .build_query_kwargs_promql(statistic_to_compute, query_pattern_type, &match_result) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; + + let metadata = QueryMetadata { + query_output_labels: query_output_labels.clone(), + statistic_to_compute: *statistic_to_compute, + query_kwargs, + }; + + let agg_info = self.get_aggregation_id_info(query_config); + + let query_plan = self + .create_store_query_plan(&metric, ×tamps, &agg_info) + .map_err(|e| { + warn!("Failed to create store query plan: {}", e); + e + }) + .ok()?; + + let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal + || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; + + let grouping_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_value) + .map(|config| 
config.grouping_labels.clone()) + .unwrap_or_else(|| query_output_labels.clone()); + + let aggregated_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_key) + .map(|config| config.aggregated_labels.clone()) + .unwrap_or_else(KeyByLabelNames::empty); + + Some(QueryExecutionContext { + metric: metric.clone(), + metadata, + store_plan: query_plan, + agg_info, + do_merge, + spatial_filter, + query_time, + grouping_labels, + aggregated_labels, + }) + } + + /// Recursively builds a DataFusion logical plan for one arm of a binary + /// arithmetic expression. + /// + /// - Leaf arm (supported PromQL pattern): look up config structurally, build + /// context, return its `to_logical_plan()` together with the output label names. + /// - Binary arm: recursively build both sub-arms and combine with + /// `build_binary_vector_plan`. + /// - Scalar literal: returns `None` (handled by the caller separately). + fn build_arm_logical_plan( + &self, + arm_ast: &promql_parser::parser::Expr, + time: f64, + ) -> Option<(datafusion::logical_expr::LogicalPlan, Vec<String>)> { + use crate::engines::logical::plan_builder::build_binary_vector_plan; + use promql_parser::parser::Expr; + + match arm_ast { + Expr::NumberLiteral(_) => None, // caller handles scalars + Expr::Paren(paren) => self.build_arm_logical_plan(&paren.expr, time), + Expr::Binary(binary) => { + // Nested binary expression — recurse on both sides + let (lhs_plan, lhs_labels) = self.build_arm_logical_plan(&binary.lhs, time)?; + let (rhs_plan, _) = self.build_arm_logical_plan(&binary.rhs, time)?; + let combined = + build_binary_vector_plan(lhs_plan, rhs_plan, &binary.op, lhs_labels.clone()) + .ok()?; + Some((combined, lhs_labels)) + } + other => { + // Leaf pattern: structural config lookup + context + plan + let config = self.find_query_config_promql_structural(other)?; + let ctx = self.build_query_execution_context_from_ast(other, config, time)?; + let label_names = 
ctx.metadata.query_output_labels.labels.clone(); + let plan = ctx.to_logical_plan().ok()?; + Some((plan, label_names)) + } + } + } + + /// Handles a binary arithmetic PromQL expression by building a combined + /// DataFusion plan (vector–vector join or scalar projection) and executing it. + /// + /// Returns `None` if any arm is not acceleratable (caller falls back to Prometheus). + fn handle_binary_expr_promql( + &self, + ast: &promql_parser::parser::Expr, + time: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + use crate::engines::logical::plan_builder::{build_binary_vector_plan, build_scalar_plan}; + use promql_parser::parser::Expr; + + let query_time = Self::convert_query_time_to_data_time(time); + + let binary = match ast { + Expr::Binary(b) => b, + _ => return None, + }; + + let lhs = binary.lhs.as_ref(); + let rhs = binary.rhs.as_ref(); + let op = &binary.op; + + // Check for scalar on right + if let Expr::NumberLiteral(nl) = rhs { + let (vector_plan, label_names) = self.build_arm_logical_plan(lhs, time)?; + let combined = + build_scalar_plan(vector_plan, nl.val, op, false, label_names.clone()).ok()?; + let results = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(self.execute_logical_plan( + combined, + label_names.clone(), + "", + &Statistic::Sum, + )) + }) + .ok()?; + let output_labels = KeyByLabelNames::new(label_names); + return Some((output_labels, QueryResult::vector(results, query_time))); + } + + // Check for scalar on left + if let Expr::NumberLiteral(nl) = lhs { + let (vector_plan, label_names) = self.build_arm_logical_plan(rhs, time)?; + let combined = + build_scalar_plan(vector_plan, nl.val, op, true, label_names.clone()).ok()?; + let results = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(self.execute_logical_plan( + combined, + label_names.clone(), + "", + &Statistic::Sum, + )) + }) + .ok()?; + let output_labels = KeyByLabelNames::new(label_names); + return 
Some((output_labels, QueryResult::vector(results, query_time))); + } + + // Vector–vector + let (lhs_plan, lhs_labels) = self.build_arm_logical_plan(lhs, time)?; + let (rhs_plan, _) = self.build_arm_logical_plan(rhs, time)?; + let combined = build_binary_vector_plan(lhs_plan, rhs_plan, op, lhs_labels.clone()).ok()?; + let results = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(self.execute_logical_plan( + combined, + lhs_labels.clone(), + "", + &Statistic::Sum, + )) + }) + .ok()?; + let output_labels = KeyByLabelNames::new(lhs_labels); + Some((output_labels, QueryResult::vector(results, query_time))) + } + + /// Applies a PromQL binary arithmetic operator to two f64 values. + fn apply_range_binary_op( + op: &promql_parser::parser::token::TokenType, + lhs: f64, + rhs: f64, + ) -> f64 { + use promql_parser::parser::token::{T_ADD, T_DIV, T_MOD, T_MUL, T_POW, T_SUB}; + let id = op.id(); + if id == T_ADD { + lhs + rhs + } else if id == T_SUB { + lhs - rhs + } else if id == T_MUL { + lhs * rhs + } else if id == T_DIV { + lhs / rhs + } else if id == T_MOD { + lhs % rhs + } else if id == T_POW { + lhs.powf(rhs) + } else { + f64::NAN + } + } + + /// Recursively builds a range execution context for one arm of a binary arithmetic expression. 
+ fn build_arm_range_context( + &self, + arm_ast: &promql_parser::parser::Expr, + start: f64, + end: f64, + step: f64, + ) -> Option<(RangeQueryExecutionContext, Vec<String>)> { + use promql_parser::parser::Expr; + + match arm_ast { + Expr::NumberLiteral(_) => None, // caller handles scalars + Expr::Paren(paren) => self.build_arm_range_context(&paren.expr, start, end, step), + other => { + let config = self.find_query_config_promql_structural(other)?; + let base_context = + self.build_query_execution_context_from_ast(other, config, end)?; + let label_names = base_context.metadata.query_output_labels.labels.clone(); + + let start_ms = Self::convert_query_time_to_data_time(start); + let end_ms = Self::convert_query_time_to_data_time(end); + let step_ms = (step * 1000.0) as u64; + + let tumbling_window_ms = self + .streaming_config + .get_aggregation_config(base_context.agg_info.aggregation_id_for_value) + .map(|c| c.window_size * 1000)?; + + self.validate_range_query_params(start_ms, end_ms, step_ms, tumbling_window_ms) + .map_err(|e| { + warn!("Range arm query validation failed: {}", e); + e + }) + .ok()?; + + let lookback_ms = base_context.store_plan.values_query.end_timestamp + - base_context.store_plan.values_query.start_timestamp; + + let buckets_per_step = (step_ms / tumbling_window_ms) as usize; + let lookback_bucket_count = (lookback_ms / tumbling_window_ms) as usize; + + let mut extended_store_plan = base_context.store_plan.clone(); + extended_store_plan.values_query.start_timestamp = + start_ms.saturating_sub(lookback_ms); + extended_store_plan.values_query.end_timestamp = end_ms; + extended_store_plan.values_query.is_exact_query = false; + + let range_context = RangeQueryExecutionContext { + base: QueryExecutionContext { + store_plan: extended_store_plan, + ..base_context + }, + range_params: RangeQueryParams { + start: start_ms, + end: end_ms, + step: step_ms, + }, + buckets_per_step, + lookback_bucket_count, + tumbling_window_ms, + }; + + Some((range_context, 
label_names)) + } + } + } + + /// Handles a binary arithmetic PromQL expression for range queries. + /// + /// Evaluates each arm independently over the full range, then joins the + /// resulting series by label key and applies the arithmetic operator + /// sample-by-sample at matching timestamps. + fn handle_binary_expr_range_promql( + &self, + ast: &promql_parser::parser::Expr, + start: f64, + end: f64, + step: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + use promql_parser::parser::Expr; + + let binary = match ast { + Expr::Binary(b) => b, + _ => return None, + }; + + let lhs = binary.lhs.as_ref(); + let rhs = binary.rhs.as_ref(); + let op = &binary.op; + + // Scalar on right: evaluate vector arm, apply scalar per sample + if let Expr::NumberLiteral(nl) = rhs { + let (lhs_ctx, lhs_labels) = self.build_arm_range_context(lhs, start, end, step)?; + let lhs_results = self.execute_range_query_pipeline(&lhs_ctx).ok()?; + let scalar = nl.val; + let combined: Vec<RangeVectorElement> = lhs_results + .into_iter() + .map(|mut elem| { + for s in &mut elem.samples { + s.value = Self::apply_range_binary_op(op, s.value, scalar); + } + elem + }) + .collect(); + let output_labels = KeyByLabelNames::new(lhs_labels); + return Some((output_labels, QueryResult::matrix(combined))); + } + + // Scalar on left: evaluate vector arm, apply scalar per sample + if let Expr::NumberLiteral(nl) = lhs { + let (rhs_ctx, rhs_labels) = self.build_arm_range_context(rhs, start, end, step)?; + let rhs_results = self.execute_range_query_pipeline(&rhs_ctx).ok()?; + let scalar = nl.val; + let combined: Vec<RangeVectorElement> = rhs_results + .into_iter() + .map(|mut elem| { + for s in &mut elem.samples { + s.value = Self::apply_range_binary_op(op, scalar, s.value); + } + elem + }) + .collect(); + let output_labels = KeyByLabelNames::new(rhs_labels); + return Some((output_labels, QueryResult::matrix(combined))); + } + + // Vector-vector: evaluate both arms, join by label key, apply op per matching timestamp + let (lhs_ctx, lhs_labels) 
= self.build_arm_range_context(lhs, start, end, step)?; + let (rhs_ctx, _) = self.build_arm_range_context(rhs, start, end, step)?; + let lhs_results = self.execute_range_query_pipeline(&lhs_ctx).ok()?; + let rhs_results = self.execute_range_query_pipeline(&rhs_ctx).ok()?; + + // Build lookup: label_key -> {timestamp -> value} for rhs + let mut rhs_map: HashMap<Vec<String>, HashMap<u64, f64>> = HashMap::new(); + for elem in rhs_results { + let ts_map: HashMap<u64, f64> = elem + .samples + .iter() + .map(|s| (s.timestamp, s.value)) + .collect(); + rhs_map.insert(elem.labels, ts_map); + } + + let mut combined: Vec<RangeVectorElement> = Vec::new(); + for lhs_elem in lhs_results { + if let Some(rhs_ts_map) = rhs_map.get(&lhs_elem.labels) { + let mut new_elem = RangeVectorElement::new(lhs_elem.labels.clone()); + for s in &lhs_elem.samples { + if let Some(&rhs_val) = rhs_ts_map.get(&s.timestamp) { + new_elem.add_sample( + s.timestamp, + Self::apply_range_binary_op(op, s.value, rhs_val), + ); + } + } + if !new_elem.samples.is_empty() { + combined.push(new_elem); + } + } + } + + let output_labels = KeyByLabelNames::new(lhs_labels); + Some((output_labels, QueryResult::matrix(combined))) + } + /// Formats unformatted results into final InstantVectorElement format /// For topk queries (when enabled), sorts by value and prepends metric name to keys fn format_final_results( @@ -1906,6 +2400,20 @@ impl SimpleEngine { let query_start_time = Instant::now(); debug!("Handling query: {} at time {}", query, time); + // Check for binary arithmetic before attempting single-query dispatch. + // Binary expressions won't have a matching query_config, so we handle them here. 
+ if let Ok(ast) = promql_parser::parser::parse(&query) { + if matches!(&ast, promql_parser::parser::Expr::Binary(_)) { + let result = self.handle_binary_expr_promql(&ast, time); + let total_query_duration = query_start_time.elapsed(); + debug!( + "Binary arithmetic query handling took: {:.2}ms", + total_query_duration.as_secs_f64() * 1000.0 + ); + return result; + } + } + let context = self.build_query_execution_context_promql(query, time)?; debug!( @@ -2861,6 +3369,19 @@ impl SimpleEngine { query, start, end, step ); + // Check for binary arithmetic before attempting single-query dispatch. + if let Ok(ast) = promql_parser::parser::parse(&query) { + if matches!(&ast, promql_parser::parser::Expr::Binary(_)) { + let result = self.handle_binary_expr_range_promql(&ast, start, end, step); + let total_duration = query_start_time.elapsed(); + debug!( + "Binary arithmetic range query handling took: {:.2}ms", + total_duration.as_secs_f64() * 1000.0 + ); + return result; + } + } + let context = self.build_range_query_execution_context_promql(query, start, end, step)?; // Execute range query pipeline diff --git a/asap-query-engine/src/tests/datafusion/dispatch_arithmetic_tests.rs b/asap-query-engine/src/tests/datafusion/dispatch_arithmetic_tests.rs new file mode 100644 index 0000000..c020c7d --- /dev/null +++ b/asap-query-engine/src/tests/datafusion/dispatch_arithmetic_tests.rs @@ -0,0 +1,139 @@ +//! Dispatch-level tests for binary arithmetic PromQL handling. +//! +//! Tests that `handle_query_promql` routes binary expressions correctly: +//! returning `Some` for acceleratable queries and `None` for non-acceleratable +//! ones (graceful fallback to Prometheus). 
+ +#[cfg(test)] +mod tests { + use crate::precompute_operators::sum_accumulator::SumAccumulator; + use crate::tests::test_utilities::engine_factories::{ + create_engine_single_pop, create_engine_two_metrics, + }; + + const QUERY_TIME: f64 = 1000.0; + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_query_promql_binary_returns_result() { + let engine = create_engine_two_metrics( + "errors_total", + "SumAccumulator", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)) as Box, + )], + "sum(errors_total) by (host)", + "requests_total", + "SumAccumulator", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(200.0)) as Box, + )], + "sum(requests_total) by (host)", + ); + + let result = engine.handle_query_promql( + "sum(errors_total) by (host) / sum(requests_total) by (host)".to_string(), + QUERY_TIME, + ); + assert!(result.is_some(), "Binary query should return Some"); + let (labels, qr) = result.unwrap(); + assert!(!labels.labels.is_empty(), "Should have output label names"); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 1); + assert!((elements[0].value - 0.5).abs() < 1e-10); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_query_promql_non_acceleratable_arm_returns_none() { + // Only requests_total is configured; foo() is not a known function. 
+ let engine = create_engine_single_pop( + "requests_total", + "SumAccumulator", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(200.0)), + )], + "sum(requests_total) by (host)", + ); + + // foo() is not a supported PromQL function → arm lookup fails → returns None + let result = engine.handle_query_promql( + "foo(errors_total[5m]) / sum(requests_total) by (host)".to_string(), + QUERY_TIME, + ); + assert!( + result.is_none(), + "Should return None for non-acceleratable arm (graceful fallback)" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_query_promql_scalar_binary_returns_result() { + let engine = create_engine_single_pop( + "errors_total", + "SumAccumulator", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(7.0)), + )], + "sum(errors_total) by (host)", + ); + + let result = + engine.handle_query_promql("sum(errors_total) by (host) * 100".to_string(), QUERY_TIME); + assert!(result.is_some(), "Scalar binary should return Some"); + let (_, qr) = result.unwrap(); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 1); + assert!((elements[0].value - 700.0).abs() < 1e-10, "7 * 100 = 700"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_query_promql_single_metric_still_works() { + // Regression: single-metric queries continue to work after binary dispatch is wired in. 
+ let engine = create_engine_single_pop( + "http_requests", + "SumAccumulator", + vec!["host"], + vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)), + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(200.0)), + ), + ], + "sum(http_requests) by (host)", + ); + + let result = + engine.handle_query_promql("sum(http_requests) by (host)".to_string(), QUERY_TIME); + assert!( + result.is_some(), + "Single-metric query should still work after binary dispatch" + ); + let (_, qr) = result.unwrap(); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 2); + let mut values: Vec = elements.iter().map(|e| e.value).collect(); + values.sort_by(|a, b| a.partial_cmp(b).unwrap()); + assert!((values[0] - 100.0).abs() < 1e-10); + assert!((values[1] - 200.0).abs() < 1e-10); + } +} diff --git a/asap-query-engine/src/tests/datafusion/mod.rs b/asap-query-engine/src/tests/datafusion/mod.rs index 563954c..0d1df2f 100644 --- a/asap-query-engine/src/tests/datafusion/mod.rs +++ b/asap-query-engine/src/tests/datafusion/mod.rs @@ -4,7 +4,11 @@ //! and accumulator serialization that back the DataFusion-based query path. pub mod accumulator_serde_tests; +pub mod dispatch_arithmetic_tests; +pub mod plan_builder_binary_tests; pub mod plan_builder_regression_tests; +pub mod plan_execution_arithmetic_tests; pub mod plan_execution_dual_input_tests; pub mod plan_execution_temporal_tests; pub mod plan_execution_tests; +pub mod structural_matching_tests; diff --git a/asap-query-engine/src/tests/datafusion/plan_builder_binary_tests.rs b/asap-query-engine/src/tests/datafusion/plan_builder_binary_tests.rs new file mode 100644 index 0000000..0ecdbcf --- /dev/null +++ b/asap-query-engine/src/tests/datafusion/plan_builder_binary_tests.rs @@ -0,0 +1,189 @@ +//! Binary plan builder tests. +//! +//! 
Tests that `build_binary_vector_plan` and `build_scalar_plan` produce correct +//! DataFusion logical plan structures. + +#[cfg(test)] +mod tests { + use crate::engines::logical::plan_builder::{build_binary_vector_plan, build_scalar_plan}; + use crate::engines::simple_engine::{ + AggregationIdInfo, QueryExecutionContext, QueryMetadata, StoreQueryParams, StoreQueryPlan, + }; + use datafusion::logical_expr::LogicalPlan; + use promql_parser::parser::token::{TokenType, T_ADD, T_DIV, T_MOD, T_MUL, T_POW, T_SUB}; + use promql_utilities::data_model::KeyByLabelNames; + use promql_utilities::query_logics::enums::Statistic; + use std::collections::HashMap; + + fn make_context( + metric: &str, + statistic: Statistic, + labels: Vec<&str>, + ) -> QueryExecutionContext { + let label_strings: Vec = labels.into_iter().map(String::from).collect(); + QueryExecutionContext { + metric: metric.to_string(), + metadata: QueryMetadata { + query_output_labels: KeyByLabelNames::new(label_strings.clone()), + statistic_to_compute: statistic, + query_kwargs: HashMap::new(), + }, + store_plan: StoreQueryPlan { + values_query: StoreQueryParams { + metric: metric.to_string(), + aggregation_id: 1, + start_timestamp: 1000, + end_timestamp: 2000, + is_exact_query: true, + }, + keys_query: None, + }, + agg_info: AggregationIdInfo { + aggregation_id_for_key: 1, + aggregation_id_for_value: 1, + aggregation_type_for_key: "SumAggregator".to_string(), + aggregation_type_for_value: "SumAggregator".to_string(), + }, + do_merge: false, + spatial_filter: String::new(), + query_time: 2000, + grouping_labels: KeyByLabelNames::new(label_strings.clone()), + aggregated_labels: KeyByLabelNames::empty(), + } + } + + fn collect_node_names(plan: &LogicalPlan) -> Vec { + let mut names = Vec::new(); + collect_recursive(plan, &mut names); + names + } + + fn collect_recursive(plan: &LogicalPlan, names: &mut Vec) { + match plan { + LogicalPlan::Extension(ext) => { + names.push(ext.node.name().to_string()); + for input in 
ext.node.inputs() { + collect_recursive(input, names); + } + } + LogicalPlan::Projection(p) => { + names.push("Projection".to_string()); + collect_recursive(&p.input, names); + } + LogicalPlan::Join(j) => { + names.push("Join".to_string()); + collect_recursive(&j.left, names); + collect_recursive(&j.right, names); + } + LogicalPlan::SubqueryAlias(a) => { + names.push("SubqueryAlias".to_string()); + collect_recursive(&a.input, names); + } + other => { + names.push( + format!("{:?}", other) + .split('(') + .next() + .unwrap_or("Unknown") + .to_string(), + ); + } + } + } + + fn contains_node(plan: &LogicalPlan, name: &str) -> bool { + collect_node_names(plan).iter().any(|n| n == name) + } + + #[test] + fn test_binary_vector_plan_structure_divide() { + let lhs_ctx = make_context("errors", Statistic::Sum, vec!["host"]); + let rhs_ctx = make_context("requests", Statistic::Sum, vec!["host"]); + let lhs_plan = lhs_ctx.to_logical_plan().unwrap(); + let rhs_plan = rhs_ctx.to_logical_plan().unwrap(); + + let op = TokenType::new(T_DIV); + let plan = + build_binary_vector_plan(lhs_plan, rhs_plan, &op, vec!["host".to_string()]).unwrap(); + + let names = collect_node_names(&plan); + assert_eq!(names[0], "Projection", "Root should be Projection"); + assert!(contains_node(&plan, "Join"), "Plan should contain a Join"); + let alias_count = names.iter().filter(|n| *n == "SubqueryAlias").count(); + assert_eq!( + alias_count, 2, + "Plan should contain two SubqueryAlias nodes" + ); + } + + #[test] + fn test_binary_vector_plan_all_operators() { + let ops = [T_ADD, T_SUB, T_MUL, T_DIV, T_POW, T_MOD]; + for op_id in ops { + let lhs_ctx = make_context("metric_a", Statistic::Sum, vec!["host"]); + let rhs_ctx = make_context("metric_b", Statistic::Sum, vec!["host"]); + let lhs_plan = lhs_ctx.to_logical_plan().unwrap(); + let rhs_plan = rhs_ctx.to_logical_plan().unwrap(); + + let op = TokenType::new(op_id); + let result = + build_binary_vector_plan(lhs_plan, rhs_plan, &op, 
vec!["host".to_string()]); + assert!( + result.is_ok(), + "Operator {:?} should produce a valid plan", + op + ); + let names = collect_node_names(&result.unwrap()); + assert_eq!(names[0], "Projection"); + } + } + + #[test] + fn test_scalar_right_plan_structure() { + let ctx = make_context("errors", Statistic::Sum, vec!["host"]); + let vector_plan = ctx.to_logical_plan().unwrap(); + + let op = TokenType::new(T_MUL); + let plan = + build_scalar_plan(vector_plan, 100.0, &op, false, vec!["host".to_string()]).unwrap(); + + let names = collect_node_names(&plan); + assert_eq!(names[0], "Projection", "Root should be Projection"); + assert!( + !contains_node(&plan, "Join"), + "Scalar plan should not have a Join" + ); + assert!( + !contains_node(&plan, "SubqueryAlias"), + "Scalar plan should not have SubqueryAlias" + ); + } + + #[test] + fn test_scalar_left_plan_structure() { + let ctx = make_context("success", Statistic::Sum, vec!["host"]); + let vector_plan = ctx.to_logical_plan().unwrap(); + + let op = TokenType::new(T_SUB); + let plan = + build_scalar_plan(vector_plan, 1.0, &op, true, vec!["host".to_string()]).unwrap(); + + let names = collect_node_names(&plan); + assert_eq!(names[0], "Projection"); + assert!(!contains_node(&plan, "Join")); + } + + #[test] + fn test_scalar_left_division_plan_structure() { + // 1.0 / rate(metric[5m]) — scalar on left with Div + let ctx = make_context("metric", Statistic::Sum, vec!["host"]); + let vector_plan = ctx.to_logical_plan().unwrap(); + + let op = TokenType::new(T_DIV); + let result = build_scalar_plan(vector_plan, 1.0, &op, true, vec!["host".to_string()]); + assert!( + result.is_ok(), + "scalar-left division plan should build without error" + ); + } +} diff --git a/asap-query-engine/src/tests/datafusion/plan_execution_arithmetic_tests.rs b/asap-query-engine/src/tests/datafusion/plan_execution_arithmetic_tests.rs new file mode 100644 index 0000000..b196841 --- /dev/null +++ 
b/asap-query-engine/src/tests/datafusion/plan_execution_arithmetic_tests.rs @@ -0,0 +1,368 @@ +//! Binary arithmetic plan execution integration tests. +//! +//! Verify that binary arithmetic queries (vector/vector and scalar/vector) +//! produce numerically correct results when executed end-to-end through +//! `handle_binary_expr_promql` via DataFusion. + +#[cfg(test)] +mod tests { + use crate::precompute_operators::sum_accumulator::SumAccumulator; + use crate::tests::test_utilities::engine_factories::{ + create_engine_three_metrics, create_engine_two_metrics, + }; + + const QUERY_TIME: f64 = 1000.0; + + fn host_a_b_data( + val_a: f64, + val_b: f64, + ) -> ( + crate::tests::test_utilities::engine_factories::AccumulatorData, + crate::tests::test_utilities::engine_factories::AccumulatorData, + ) { + let data_a = vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(val_a)) as Box, + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(val_a / 2.0)), + ), + ]; + let data_b = vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(val_b)) as Box, + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(val_b / 2.0)), + ), + ]; + (data_a, data_b) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_vector_vector_divide_produces_ratio() { + // errors/host-a = 100, requests/host-a = 200 → ratio 0.5 + let (data_errors, data_requests) = host_a_b_data(100.0, 200.0); + let engine = create_engine_two_metrics( + "errors_total", + "SumAccumulator", + vec!["host"], + data_errors, + "sum(errors_total) by (host)", + "requests_total", + "SumAccumulator", + vec!["host"], + data_requests, + "sum(requests_total) by (host)", + ); + + let query = "sum(errors_total) by (host) / sum(requests_total) by (host)"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + assert!(result.is_some(), "Expected Some result for binary query"); + let (_, qr) = 
result.unwrap(); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 2, "Expected 2 result rows"); + for elem in &elements { + let approx = (elem.value - 0.5).abs(); + assert!( + approx < 1e-10, + "Expected ratio 0.5, got {} for labels {:?}", + elem.value, + elem.labels + ); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_vector_vector_multiply() { + let (data_a, data_b) = host_a_b_data(3.0, 4.0); + let engine = create_engine_two_metrics( + "metric_a", + "SumAccumulator", + vec!["host"], + data_a, + "sum(metric_a) by (host)", + "metric_b", + "SumAccumulator", + vec!["host"], + data_b, + "sum(metric_b) by (host)", + ); + + let query = "sum(metric_a) by (host) * sum(metric_b) by (host)"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + let (_, qr) = result.expect("Expected result"); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 2); + // host-a: 3 * 4 = 12, host-b: 1.5 * 2.0 = 3.0 + let mut values: Vec = elements.iter().map(|e| e.value).collect(); + values.sort_by(|a, b| a.partial_cmp(b).unwrap()); + assert!((values[0] - 3.0).abs() < 1e-10); + assert!((values[1] - 12.0).abs() < 1e-10); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_vector_vector_add() { + let (data_a, data_b) = host_a_b_data(10.0, 20.0); + let engine = create_engine_two_metrics( + "metric_a", + "SumAccumulator", + vec!["host"], + data_a, + "sum(metric_a) by (host)", + "metric_b", + "SumAccumulator", + vec!["host"], + data_b, + "sum(metric_b) by (host)", + ); + + let query = "sum(metric_a) by (host) + sum(metric_b) by (host)"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + let (_, qr) = result.expect("Expected result"); + let elements = match qr { + 
crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 2); + // host-a: 10+20=30, host-b: 5+10=15 + let mut values: Vec = elements.iter().map(|e| e.value).collect(); + values.sort_by(|a, b| a.partial_cmp(b).unwrap()); + assert!((values[0] - 15.0).abs() < 1e-10); + assert!((values[1] - 30.0).abs() < 1e-10); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_vector_vector_subtract() { + let (data_a, data_b) = host_a_b_data(50.0, 30.0); + let engine = create_engine_two_metrics( + "metric_a", + "SumAccumulator", + vec!["host"], + data_a, + "sum(metric_a) by (host)", + "metric_b", + "SumAccumulator", + vec!["host"], + data_b, + "sum(metric_b) by (host)", + ); + + let query = "sum(metric_a) by (host) - sum(metric_b) by (host)"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + let (_, qr) = result.expect("Expected result"); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 2); + // host-a: 50-30=20, host-b: 25-15=10 + let mut values: Vec = elements.iter().map(|e| e.value).collect(); + values.sort_by(|a, b| a.partial_cmp(b).unwrap()); + assert!((values[0] - 10.0).abs() < 1e-10); + assert!((values[1] - 20.0).abs() < 1e-10); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_vector_vector_inner_join_drops_unmatched() { + // errors has host-a and host-b; requests only has host-a + let data_errors = vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)) as Box, + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(50.0)), + ), + ]; + let data_requests = vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(200.0)) as Box, + )]; + + let engine = create_engine_two_metrics( + "errors_total", + "SumAccumulator", + vec!["host"], + 
data_errors, + "sum(errors_total) by (host)", + "requests_total", + "SumAccumulator", + vec!["host"], + data_requests, + "sum(requests_total) by (host)", + ); + + let query = "sum(errors_total) by (host) / sum(requests_total) by (host)"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + let (_, qr) = result.expect("Expected result"); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + // Inner join: only host-a is present in both → 1 result + assert_eq!( + elements.len(), + 1, + "Inner join should drop unmatched label set (host-b)" + ); + assert!((elements[0].value - 0.5).abs() < 1e-10); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_scalar_right_multiply() { + // sum(errors_total) by (host) * 100 + let data = vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(5.0)) as Box, + )]; + let engine = create_engine_two_metrics( + "errors_total", + "SumAccumulator", + vec!["host"], + data, + "sum(errors_total) by (host)", + // second metric not used but factory requires it; use empty data + "dummy", + "SumAccumulator", + vec!["host"], + vec![], + "sum(dummy) by (host)", + ); + + let query = "sum(errors_total) by (host) * 100"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + let (_, qr) = result.expect("Expected result for scalar-right multiply"); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 1); + assert!( + (elements[0].value - 500.0).abs() < 1e-10, + "5.0 * 100 = 500.0" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_scalar_left_subtract() { + // 1 - sum(success_total) by (host) + let data = vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(0.9)) as Box, + )]; + let engine = create_engine_two_metrics( + 
"success_total", + "SumAccumulator", + vec!["host"], + data, + "sum(success_total) by (host)", + "dummy", + "SumAccumulator", + vec!["host"], + vec![], + "sum(dummy) by (host)", + ); + + let query = "1 - sum(success_total) by (host)"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + let (_, qr) = result.expect("Expected result for scalar-left subtract"); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!(elements.len(), 1); + assert!( + (elements[0].value - 0.1).abs() < 1e-10, + "1 - 0.9 = 0.1, got {}", + elements[0].value + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_nested_binary_expression() { + // (sum(metric_a) by (host) + sum(metric_b) by (host)) / sum(metric_c) by (host) + // host-a: a=100, b=200, c=300 → (100+200)/300 = 1.0 + // host-b: a=50, b=100, c=150 → (50+100)/150 = 1.0 + let data_a = vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)) as Box, + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(50.0)) as Box, + ), + ]; + let data_b = vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(200.0)) as Box, + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)) as Box, + ), + ]; + let data_c = vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(300.0)) as Box, + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(150.0)) as Box, + ), + ]; + + let engine = create_engine_three_metrics( + "metric_a", + "SumAccumulator", + vec!["host"], + data_a, + "sum(metric_a) by (host)", + "metric_b", + "SumAccumulator", + vec!["host"], + data_b, + "sum(metric_b) by (host)", + "metric_c", + "SumAccumulator", + vec!["host"], + data_c, + "sum(metric_c) by (host)", + ); + + let query = "(sum(metric_a) by (host) + sum(metric_b) by (host)) / 
sum(metric_c) by (host)"; + let result = engine.handle_query_promql(query.to_string(), QUERY_TIME); + let (_, qr) = result.expect("Expected result for nested binary expression"); + let elements = match qr { + crate::engines::query_result::QueryResult::Vector(iv) => iv.values, + _ => panic!("Expected vector result"), + }; + assert_eq!( + elements.len(), + 2, + "Expected 2 result rows (host-a and host-b)" + ); + for elem in &elements { + assert!( + (elem.value - 1.0).abs() < 1e-10, + "Expected (a+b)/c = 1.0, got {} for labels {:?}", + elem.value, + elem.labels + ); + } + } +} diff --git a/asap-query-engine/src/tests/datafusion/structural_matching_tests.rs b/asap-query-engine/src/tests/datafusion/structural_matching_tests.rs new file mode 100644 index 0000000..37fecb8 --- /dev/null +++ b/asap-query-engine/src/tests/datafusion/structural_matching_tests.rs @@ -0,0 +1,120 @@ +//! Structural PromQL matching tests. +//! +//! Verifies that `find_query_config_promql_structural` can look up query configs +//! by AST-serialised arm strings, which is the mechanism used during binary +//! arithmetic dispatch. 
+ +#[cfg(test)] +mod tests { + use crate::precompute_operators::sum_accumulator::SumAccumulator; + use crate::tests::test_utilities::engine_factories::create_engine_single_pop; + + #[test] + fn test_structural_match_rate_query_finds_config() { + let engine = create_engine_single_pop( + "http_requests_total", + "MultipleIncrease", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)), + )], + "rate(http_requests_total[5m])", + ); + + let ast = + promql_parser::parser::parse("rate(http_requests_total[5m])").expect("parse failed"); + let result = engine.find_query_config_promql_structural(&ast); + assert!( + result.is_some(), + "Expected to find config for rate query, got None" + ); + } + + #[test] + fn test_structural_match_wrong_metric_returns_none() { + let engine = create_engine_single_pop( + "http_requests_total", + "MultipleIncrease", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)), + )], + "rate(http_requests_total[5m])", + ); + + let ast = promql_parser::parser::parse("rate(other_metric[5m])").expect("parse failed"); + let result = engine.find_query_config_promql_structural(&ast); + assert!( + result.is_none(), + "Should not find config for different metric" + ); + } + + #[test] + fn test_structural_match_wrong_range_returns_none() { + let engine = create_engine_single_pop( + "http_requests_total", + "MultipleIncrease", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)), + )], + "rate(http_requests_total[5m])", + ); + + let ast = + promql_parser::parser::parse("rate(http_requests_total[1m])").expect("parse failed"); + let result = engine.find_query_config_promql_structural(&ast); + assert!( + result.is_none(), + "Should not find config for different range" + ); + } + + #[test] + fn test_structural_match_wrong_function_returns_none() { + let engine = create_engine_single_pop( + "http_requests_total", + 
"MultipleIncrease", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)), + )], + "rate(http_requests_total[5m])", + ); + + let ast = promql_parser::parser::parse("increase(http_requests_total[5m])") + .expect("parse failed"); + let result = engine.find_query_config_promql_structural(&ast); + assert!( + result.is_none(), + "Should not match a different function name" + ); + } + + #[test] + fn test_structural_match_spatial_query() { + let engine = create_engine_single_pop( + "http_requests_total", + "SumAccumulator", + vec!["host"], + vec![( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)), + )], + "sum(http_requests_total) by (host)", + ); + + let ast = promql_parser::parser::parse("sum(http_requests_total) by (host)") + .expect("parse failed"); + let result = engine.find_query_config_promql_structural(&ast); + assert!( + result.is_some(), + "Expected to find config for sum by (host) query" + ); + } +} diff --git a/asap-query-engine/src/tests/test_utilities/engine_factories.rs b/asap-query-engine/src/tests/test_utilities/engine_factories.rs index 63b769e..b1a27e1 100644 --- a/asap-query-engine/src/tests/test_utilities/engine_factories.rs +++ b/asap-query-engine/src/tests/test_utilities/engine_factories.rs @@ -260,6 +260,222 @@ pub fn create_engine_dual_input( ) } +/// Creates a SimpleEngine with two independent metrics, each with their own +/// aggregation config and query_config. +/// +/// agg_id=1 → metric_a, agg_id=2 → metric_b. +/// Both are registered as separate query_configs in the inference config. 
+#[allow(clippy::too_many_arguments)] +pub fn create_engine_two_metrics( + metric_a: &str, + aggregation_type_a: &str, + grouping_labels_a: Vec<&str>, + data_a: AccumulatorData, + query_a: &str, + metric_b: &str, + aggregation_type_b: &str, + grouping_labels_b: Vec<&str>, + data_b: AccumulatorData, + query_b: &str, +) -> SimpleEngine { + let labels_a: Vec = grouping_labels_a.iter().map(|s| s.to_string()).collect(); + let labels_b: Vec = grouping_labels_b.iter().map(|s| s.to_string()).collect(); + + let mut aggregation_configs = HashMap::new(); + + let agg_config_a = AggregationConfig { + aggregation_id: 1, + aggregation_type: aggregation_type_a.to_string(), + aggregation_sub_type: String::new(), + parameters: HashMap::new(), + grouping_labels: KeyByLabelNames::new(labels_a.clone()), + aggregated_labels: KeyByLabelNames::empty(), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: 1, + slide_interval: 1, + window_type: "tumbling".to_string(), + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: metric_a.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + }; + aggregation_configs.insert(1u64, agg_config_a); + + let agg_config_b = AggregationConfig { + aggregation_id: 2, + aggregation_type: aggregation_type_b.to_string(), + aggregation_sub_type: String::new(), + parameters: HashMap::new(), + grouping_labels: KeyByLabelNames::new(labels_b.clone()), + aggregated_labels: KeyByLabelNames::empty(), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: 1, + slide_interval: 1, + window_type: "tumbling".to_string(), + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: metric_b.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + }; + aggregation_configs.insert(2u64, agg_config_b); + + let 
streaming_config = Arc::new(StreamingConfig { + aggregation_configs, + }); + + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + + let timestamp = 1_000_000_u64; + for (label_values_opt, acc) in data_a { + let key = label_values_opt.map(|labels| KeyByLabelValues { labels }); + let output = PrecomputedOutput::new(timestamp, timestamp, key, 1); + store.insert_precomputed_output(output, acc).unwrap(); + } + for (label_values_opt, acc) in data_b { + let key = label_values_opt.map(|labels| KeyByLabelValues { labels }); + let output = PrecomputedOutput::new(timestamp, timestamp, key, 2); + store.insert_precomputed_output(output, acc).unwrap(); + } + + // Schema includes both metrics + let promql_schema = PromQLSchema::new() + .add_metric(metric_a.to_string(), KeyByLabelNames::new(labels_a)) + .add_metric(metric_b.to_string(), KeyByLabelNames::new(labels_b)); + + let query_config_a = + QueryConfig::new(query_a.to_string()).add_aggregation(AggregationReference::new(1, None)); + let query_config_b = + QueryConfig::new(query_b.to_string()).add_aggregation(AggregationReference::new(2, None)); + + let inference_config = InferenceConfig { + schema: SchemaConfig::PromQL(promql_schema), + query_configs: vec![query_config_a, query_config_b], + cleanup_policy: CleanupPolicy::NoCleanup, + }; + + SimpleEngine::new( + store, + inference_config, + streaming_config, + 1, + QueryLanguage::promql, + ) +} + +/// Creates a SimpleEngine with three independent metrics, each with their own +/// aggregation config and query_config. +/// +/// agg_id=1 → metric_a, agg_id=2 → metric_b, agg_id=3 → metric_c. 
+#[allow(clippy::too_many_arguments)] +pub fn create_engine_three_metrics( + metric_a: &str, + aggregation_type_a: &str, + grouping_labels_a: Vec<&str>, + data_a: AccumulatorData, + query_a: &str, + metric_b: &str, + aggregation_type_b: &str, + grouping_labels_b: Vec<&str>, + data_b: AccumulatorData, + query_b: &str, + metric_c: &str, + aggregation_type_c: &str, + grouping_labels_c: Vec<&str>, + data_c: AccumulatorData, + query_c: &str, +) -> SimpleEngine { + let labels_a: Vec = grouping_labels_a.iter().map(|s| s.to_string()).collect(); + let labels_b: Vec = grouping_labels_b.iter().map(|s| s.to_string()).collect(); + let labels_c: Vec = grouping_labels_c.iter().map(|s| s.to_string()).collect(); + + let mut aggregation_configs = HashMap::new(); + + for (id, agg_type, labels, metric) in [ + (1u64, aggregation_type_a, &labels_a, metric_a), + (2u64, aggregation_type_b, &labels_b, metric_b), + (3u64, aggregation_type_c, &labels_c, metric_c), + ] { + aggregation_configs.insert( + id, + AggregationConfig { + aggregation_id: id, + aggregation_type: agg_type.to_string(), + aggregation_sub_type: String::new(), + parameters: HashMap::new(), + grouping_labels: KeyByLabelNames::new(labels.clone()), + aggregated_labels: KeyByLabelNames::empty(), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: 1, + slide_interval: 1, + window_type: "tumbling".to_string(), + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: metric.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + }, + ); + } + + let streaming_config = Arc::new(StreamingConfig { + aggregation_configs, + }); + + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + + let timestamp = 1_000_000_u64; + for (agg_id, data) in [(1u64, data_a), (2u64, data_b), (3u64, data_c)] { + for (label_values_opt, acc) in data { + let key = 
label_values_opt.map(|labels| KeyByLabelValues { labels }); + let output = PrecomputedOutput::new(timestamp, timestamp, key, agg_id); + store.insert_precomputed_output(output, acc).unwrap(); + } + } + + let promql_schema = PromQLSchema::new() + .add_metric(metric_a.to_string(), KeyByLabelNames::new(labels_a)) + .add_metric(metric_b.to_string(), KeyByLabelNames::new(labels_b)) + .add_metric(metric_c.to_string(), KeyByLabelNames::new(labels_c)); + + let inference_config = InferenceConfig { + schema: SchemaConfig::PromQL(promql_schema), + query_configs: vec![ + QueryConfig::new(query_a.to_string()) + .add_aggregation(AggregationReference::new(1, None)), + QueryConfig::new(query_b.to_string()) + .add_aggregation(AggregationReference::new(2, None)), + QueryConfig::new(query_c.to_string()) + .add_aggregation(AggregationReference::new(3, None)), + ], + cleanup_policy: CleanupPolicy::NoCleanup, + }; + + SimpleEngine::new( + store, + inference_config, + streaming_config, + 1, + QueryLanguage::promql, + ) +} + /// Creates a single-pop engine with data at multiple timestamps for testing merge. #[allow(clippy::type_complexity)] pub fn create_engine_multi_timestamp( diff --git a/docs/design-252-arithmetic-operators.md b/docs/design-252-arithmetic-operators.md new file mode 100644 index 0000000..541f2fc --- /dev/null +++ b/docs/design-252-arithmetic-operators.md @@ -0,0 +1,112 @@ +# Design: PromQL Arithmetic Operator Acceleration (Issue #252) + +## Problem + +ASAPQuery accelerates PromQL queries by pre-computing sketches over streaming data and serving answers from those sketches at query time, bypassing the underlying TSDB for supported query patterns. + +The supported patterns today are all single-expression forms: +- `rate(metric[range])`, `increase(metric[range])`, etc. (OnlyTemporal) +- `sum(metric) by (label)`, `quantile(...)`, etc. 
(OnlySpatial)
- Combinations like `sum by (host) (rate(metric[range]))` (OneTemporalOneSpatial)

Binary arithmetic expressions like `rate(errors[5m]) / rate(requests[5m])` fall through entirely — `handle_query_promql` finds no matching pattern and returns `None`, causing a full fallback to Prometheus. This is a significant coverage gap: error rate, saturation, and ratio queries are extremely common in practice.

---

## Approaches Considered

### Option A: Extend the PromQL-specific execution path

Detect binary arithmetic at the top of `handle_query_promql`, execute each arm through the existing `execute_query_pipeline`, collect two `HashMap` result sets (keyed by label values), and combine them with a Rust-level label-matching join.

**Pros:** Self-contained, surgical change. No DataFusion involvement in the combination step.

**Cons:** The combination logic (label-matching join + f64 arithmetic) would be written twice — once here, and again later when the PromQL execution is migrated to DataFusion. `execute_query_pipeline` is already on a path to being replaced.

### Option B: DataFusion JOIN + Projection

Use the existing DataFusion execution path (`execute_plan`, already tested but not wired in for PromQL) to build a plan that looks like:

```
Projection (value = lhs.value OP rhs.value, label columns...)
  └── Join (inner, on = label columns)
      ├── SubqueryAlias("lhs") → SummaryInfer → SummaryMergeMultiple → PrecomputedSummaryRead
      └── SubqueryAlias("rhs") → SummaryInfer → SummaryMergeMultiple → PrecomputedSummaryRead
```

For scalar-vector arithmetic (`rate(errors[5m]) * 100`), the plan is simpler — just a `Projection` on top of the single arm's plan, no join needed.

**Pros:** `execute_plan` is tested and ready to wire in. DataFusion's `Join + Projection` replaces hand-written label-matching join logic. Wiring this in for binary arithmetic also migrates all PromQL execution to `execute_plan`, eliminating `execute_query_pipeline` as an active path.

**Cons:** Slightly more complex plan construction (SubqueryAlias for column disambiguation). The migration of all PromQL to `execute_plan` is broader in scope, but it's the right time to do it.

---

## Decision: Option B (DataFusion)

The decisive factor: `execute_plan` is tested and just not wired in. Building the arithmetic combination in Rust (Option A) would be work done twice — the exact same join logic would need to be re-implemented in DataFusion when the migration happens. Option B does it once, correctly.

The additional complexity of the DataFusion `Join + Projection` plan is manageable and follows patterns already established in the codebase (`SubqueryAlias`, `LogicalPlanBuilder`).

---

## Design

### Planner (`asap-planner-rs`)

For a query like `rate(errors[5m]) / rate(requests[5m])`, the planner:

1. Detects the top-level `BinaryExpr` in the PromQL AST
2. Checks whether each arm is individually acceleratable (matches an existing pattern)
3. If both arms are acceleratable: emits a separate `QueryConfig` entry for each arm, as if they were independent queries
4. If either arm is not acceleratable: skips both — the engine will fall back to Prometheus
5. Deduplicates: if an arm's config already exists (e.g., `rate(errors[5m])` was also configured as a standalone query), it reuses the existing entry

For scalar-vector arithmetic (`rate(errors[5m]) * 100`): the scalar literal is not a metric expression and needs no aggregation config. The planner emits a config only for the vector arm.

**The planner does not emit a combined config for the binary expression itself.** The engine detects the arithmetic operator at query time from the PromQL AST and handles the combination.

### Engine (`asap-query-engine`)

#### Config lookup: structural PromQL matching

To find a config for each arm at query time, the engine uses structural AST comparison (`find_query_config_promql_structural`) rather than exact string matching. This mirrors the existing `find_query_config_sql` pattern and is robust to formatting differences.

Structural equality for PromQL compares: function name, metric name, label selectors, range duration. Evaluation timestamps are ignored.

#### Plan construction

For **vector op vector**:
1. Find config for each arm via structural matching
2. Build a `QueryExecutionContext` for each arm (reusing existing logic)
3. Call `to_logical_plan()` on each context to get the sketch sub-plans
4. Wrap each sub-plan in a `SubqueryAlias` (`"lhs"` / `"rhs"`) to disambiguate the `value` column name
5. Build a DataFusion inner `Join` on the shared label columns
6. Add a `Projection` computing `lhs.value OP rhs.value AS value` and projecting label columns through

For **scalar op vector** (e.g., `rate(errors[5m]) * 100`):
1. Build a `QueryExecutionContext` for the vector arm only
2. Call `to_logical_plan()` on the context
3. Add a `Projection` computing `value OP lit(scalar) AS value` (or `lit(scalar) OP value` if scalar is on the left)

#### Execution

Both cases execute through `execute_logical_plan` — the refactored inner method extracted from `execute_plan`. This method takes a pre-built `LogicalPlan` and runs it through the DataFusion session.

#### Dispatch

In `handle_query_promql`, after parsing the PromQL AST, the engine checks whether the top node is a `BinaryExpr`. If yes, it routes to `handle_binary_expr_promql`. If no, it follows the existing single-expression path — but now calling `execute_plan` instead of `execute_query_pipeline`.

Recursion is natural: `(A + B) / C` is a `BinaryExpr` at the top level. The LHS is itself a `BinaryExpr`, which `handle_binary_expr_promql` handles recursively when building the LHS sub-plan.

If either arm cannot be accelerated (no matching config, or unsupported pattern), `handle_binary_expr_promql` returns `None` and the caller falls back to Prometheus for the whole query.

### Operators supported

All six PromQL binary arithmetic operators: `+`, `-`, `*`, `/`, `^`, `%`.

### Out of scope

- PromQL vector matching modifiers (`on()`, `ignoring()`, `group_left()`, `group_right()`): the join is always on all label columns. Adding vector matching is a follow-on.
- Range queries (only instant queries addressed here).
- Comparison operators (`==`, `!=`, `>`, etc.) and set operators (`and`, `or`, `unless`).