From 08528ad8a243e8c7bf3b96ea34c472d67c5c5bc8 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 1 Apr 2026 16:15:02 -0400 Subject: [PATCH 1/6] Added Claude implementation plan and a doc with the initial prompt and subsequent discussion on design choices --- .docs/CAPABILITY_MATCHING_DESIGN.md | 180 ++++++++++++ .docs/claude_plan | 411 ++++++++++++++++++++++++++++ 2 files changed, 591 insertions(+) create mode 100644 .docs/CAPABILITY_MATCHING_DESIGN.md create mode 100644 .docs/claude_plan diff --git a/.docs/CAPABILITY_MATCHING_DESIGN.md b/.docs/CAPABILITY_MATCHING_DESIGN.md new file mode 100644 index 0000000..16dad3c --- /dev/null +++ b/.docs/CAPABILITY_MATCHING_DESIGN.md @@ -0,0 +1,180 @@ +# Capability-Based Aggregation Matching — Design Decision Record + +## Problem Statement + +The query engine previously routed every incoming query to a sketch aggregation by matching the +query string against a pre-configured `query_configs` table in `InferenceConfig`. This meant: + +- Every distinct query string needed its own config entry, even when the same sketch could answer + multiple queries (e.g. `quantile(0.5, metric[5m])` and `quantile(0.9, metric[5m])` both need a + KLL sketch, but each required a separate config row). +- The system could not answer any query it had not been explicitly pre-configured for. + +The goal: let the engine understand what a query *needs* and find an existing aggregation that can +*provide* it, without requiring a one-to-one mapping in config. + +--- + +## Architecture Investigation + +Before designing anything, the existing query routing path was traced: + +1. An incoming query (PromQL or SQL) is parsed and reduced to a `QueryExecutionContext`. +2. Inside that process, `find_query_config` (exact string match) or `find_query_config_sql` + (structural AST match) look up a `QueryConfig` in `InferenceConfig.query_configs`. +3. `QueryConfig` is nothing more than a join record: query string → list of `aggregation_id`s. +4. 
`get_aggregation_id_info` then looks up those IDs in `StreamingConfig` to get the actual + `AggregationConfig` (sketch type, window, labels, etc.). + +The key insight: **all capability information lives in `AggregationConfig` inside `StreamingConfig`**. +The `QueryConfig` table is just indirection that requires manual pre-population. The fix is to +skip it and match against `AggregationConfig` directly when no pre-configured entry exists. + +--- + +## Design Questions and Answers + +The following questions were worked through before writing a single line of implementation. + +### Q1: When multiple aggregations are compatible, which one wins? +**Decision**: Prefer the largest `window_size`. Encapsulated in a separate `aggregation_priority` +comparator function so this policy is swappable later without touching the matching logic. + +### Q2: Label compatibility — how strict? +**Decision**: Strict exact match for now. A sketch grouped by `{job, instance}` does **not** serve +a query that groups by `{job}` only, even though collapsing labels is mathematically valid for +simple accumulators (Sum, Min, Max). The reason: for sketch types (KLL, CountMin), label collapsing +is not well-defined. Adding a TODO to relax this to "superset ok" for simple accumulators in a +future iteration. + +### Q3: Spatial filter compatibility? +**Decision**: If the stored aggregation has a non-empty `spatial_filter` and the query's normalized +filter differs (or is absent), reject. Never silently serve data filtered to `{env="prod"}` to a +query that expects unfiltered data. + +### Q4: Multi-population sketches (SetAggregator / DeltaSetAggregator)? +These require two aggregation IDs: one "value" aggregation and one "key" aggregation. The existing +`get_aggregation_id_info` already distinguishes them by type: `SetAggregator` and +`DeltaSetAggregator` are key aggregations; everything else is a value aggregation. 
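The key/value classification described above reduces to a check on the aggregation type string. A minimal self-contained sketch of that distinction (illustrative only — the real logic lives in `get_aggregation_id_info`):

```rust
/// Key aggregations track which population members (label-value combinations)
/// exist; every other aggregation type carries the actual values.
/// Illustrative sketch — not the engine's actual implementation.
fn is_key_aggregation(aggregation_type: &str) -> bool {
    matches!(aggregation_type, "SetAggregator" | "DeltaSetAggregator")
}

fn main() {
    assert!(is_key_aggregation("SetAggregator"));
    assert!(is_key_aggregation("DeltaSetAggregator"));
    // Sketch/value types fall through to the value side.
    assert!(!is_key_aggregation("DatasketchesKLL"));
    assert!(!is_key_aggregation("MultipleSumAccumulator"));
}
```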
+
+**Decision**: Capability matching finds the value aggregation first (based on the statistic). If
+the matched value type is a "multi-population" type (`MultipleSumAccumulator`,
+`MultipleMinMaxAccumulator`, `MultipleIncreaseAccumulator`, `CountMinSketchWithHeap`), the matcher
+then separately searches for a key aggregation (`SetAggregator` or `DeltaSetAggregator`) on the
+same metric. Both IDs are required; if either is missing, the match fails.
+
+### Q5: Backward compatibility — keep old `query_configs` path?
+**Decision**: Yes, as the primary route. The `query_configs` lookup runs first; capability matching
+fires only when no pre-configured entry is found. This means existing deployments change behavior
+only for queries that had no config entry. A `warn!` log is emitted whenever capability matching
+is used, so operators can detect fallback usage.
+
+### Q6: Rich error messages on no-match?
+**Decision**: Deferred. Collecting per-candidate rejection reasons adds significant complexity.
+The matcher returns `None` on failure for now.
+
+### Q7: How to model `avg` (needs both Sum and Count)?
+**Decision**: `QueryRequirements` holds `Vec<Statistic>`. For `avg`, this is `[Sum, Count]`.
+All statistics in the vec must be satisfied by aggregations that share the **same** `window_size`
+and `grouping_labels`. This ensures temporal consistency.
+
+### Q8: How is window type (sliding vs tumbling) expressed in requirements?
+Framing the requirement as a specific `window_type` was considered but rejected. Instead,
+`QueryRequirements` stores only `data_range_ms: Option<u64>` — the span of historical data the
+query reads. Both tumbling and sliding aggregations can satisfy this, subject to different
+compatibility rules:
+
+- **Tumbling**: `data_range_ms` must be a positive integer multiple of `window_size_ms` (so
+  multiple buckets can be merged to cover the range).
- **Sliding**: `data_range_ms` must equal `window_size_ms` exactly (a sliding window precomputes
+  exactly one range per timestamp; you can't merge overlapping windows).
+- **Spatial-only** (`data_range_ms = None`): any window is compatible.
+
+### Q9: Where does the capability matching logic live?
+**Decision**: `sketch_db_common` (the shared crate). Rationale: this logic is pure — it takes a
+map of `AggregationConfig` values and a `QueryRequirements` and produces an `AggregationIdInfo`.
+It has no dependency on query engine internals. Putting it in common means the planner and other
+components can eventually reuse it.
+
+`AggregationIdInfo` (previously defined in `simple_engine.rs`) was moved to `sketch_db_common` as
+a prerequisite, since the common function needs to return it.
+
+`StreamingConfig` (in `asap-query-engine`) gets a thin wrapper method that delegates to the
+common function, so call sites inside the engine don't need to reach into common directly.
+
+### Q10: `aggregation_sub_type` — does it matter for matching?
+**Decision**: Yes. `Min` requires `aggregation_sub_type == "min"`, `Max` requires `"max"`. The
+`required_sub_type(statistic)` helper encodes this. Other statistics have no sub-type constraint.
+
+### Q11: For `Vec<Statistic>` — must all statistics agree on window and labels?
+**Decision**: Yes. For `avg = [Sum, Count]`, the matched Sum aggregation and the matched Count
+aggregation must have the same `window_size` and `grouping_labels`. This is the simpler, safer
+choice — mixing aggregations with different windows or label granularities would produce
+semantically incorrect results.
+
+---
+
+## What Was Rejected
+
+### "Translate PromQL to SQL and execute via DataFusion SQL engine"
+Considered as a broader architectural direction. Rejected for this feature because:
+- Data is stored as binary sketches (KLL, CountMin, etc.), not raw values. SQL aggregation
+  functions cannot merge sketches natively.
- Every sketch operation would need a DataFusion UDF, recreating the existing operator logic
+  with more indirection.
+- The existing `execute_plan()` path already uses DataFusion as an execution *framework* with
+  custom physical operators — that is the right abstraction boundary, not SQL strings.
+
+### Merging PromQL and SQL `build_query_execution_context` paths
+The two build paths (PromQL and SQL) were kept separate. They parse different syntaxes into
+different AST types. Merging them would require a common intermediate representation before the
+current `QueryExecutionContext` and would not reduce complexity. The shared logic is the capability
+matching layer, not the parsing layer.
+
+### Rich rejection errors
+Collecting per-candidate rejection reasons (e.g. "found KLL for metric X but window 15 m doesn't
+match 5 m query") was considered. Deferred: the matching logic touches every candidate and
+collecting structured reasons multiplies the implementation surface significantly. Simple `None`
+return with `debug!` logging is sufficient for now.
+
+---
+
+## Final Architecture
+
+```
+Incoming query (PromQL or SQL)
+        │
+        ▼
+   Parse query AST
+        │
+        ▼
+Try find_query_config / find_query_config_sql   ← existing path (unchanged)
+        │
+        ├── found ──► get_aggregation_id_info(config) ──► AggregationIdInfo
+        │
+        └── not found ──► warn!("falling back to capability matching")
+                  │
+                  ▼
+        build_query_requirements_{promql|sql}
+            → QueryRequirements {
+                  metric, statistics: Vec<Statistic>,
+                  data_range_ms, grouping_labels,
+                  spatial_filter_normalized
+              }
+                  │
+                  ▼
+        StreamingConfig::find_compatible_aggregation(&requirements)
+            → sketch_db_common::find_compatible_aggregation(
+                  &self.aggregation_configs, requirements
+              )
+            → Option<AggregationIdInfo>
+```
+
+The `find_compatible_aggregation` function in `sketch_db_common`:
+1. For each statistic, collects candidates from `StreamingConfig` passing all filters
+   (metric, type, sub-type, window, labels, spatial filter).
+2.
Sorts candidates by `aggregation_priority` (largest window first).
+3. For `Vec<Statistic>`, ensures all statistics are satisfied by configs agreeing on
+   window and labels.
+4. If the value aggregation type is multi-population, also finds the paired key aggregation.
+5. Returns `AggregationIdInfo` or `None`.
diff --git a/.docs/claude_plan b/.docs/claude_plan
new file mode 100644
index 0000000..539f5cf
--- /dev/null
+++ b/.docs/claude_plan
@@ -0,0 +1,411 @@
+# Capability-Based Aggregation Matching
+
+## Context
+
+Queries currently route to an aggregation ID by exact/structural string match against a pre-configured
+`query_configs` table in `InferenceConfig`. This means every distinct query string (including
+`quantile(0.5, ...)` vs `quantile(0.9, ...)`) requires its own config entry, even when the same
+sketch can serve both. The goal is to add a capability-matching fallback: extract what a query
+*needs* (metric, statistics, data range, labels, spatial filter) and find an `AggregationConfig`
+in `StreamingConfig` that can satisfy it. The old `query_configs` path remains as the primary
+route; capability matching fires only when no pre-configured entry is found.
+
+---
+
+## Key Design Decisions (already settled)
+
+- **Fallback order**: `query_configs` lookup first; capability matching only when no entry found
+- **Multiple statistics** (e.g.
`avg` = `[Sum, Count]`): `Vec<Statistic>`; all must be satisfied by
+  configs with identical `window_size`, `grouping_labels`, and `spatial_filter`
+- **Window**: store `data_range_ms: Option<u64>` (the range the query needs); both tumbling and
+  sliding configs can satisfy it — see compatibility rules below
+- **Priority**: largest `window_size` wins among compatible candidates (encapsulated in its own
+  function so it is swappable)
+- **Label compatibility**: strict exact match for now (add TODO for superset support)
+- **Spatial filter**: if a config has a non-empty filter and the query's normalized filter differs,
+  reject
+- **Sub-type**: must match (e.g. `Min` needs `aggregation_sub_type == "min"`)
+- **Multi-population**: after finding the value aggregation, if its type is multi-population, also
+  find a key aggregation (`SetAggregator` or `DeltaSetAggregator`) on the same metric
+- **`QueryRequirements` + capability logic**: lives in `sketch_db_common`
+- **Fallback warning**: log `warn!` when capability matching is used (query not in `query_configs`)
+
+---
+
+## Codebase Facts
+
+- `Statistic` enum: `promql_utilities::query_logics::enums` — already imported by `simple_engine.rs`
+  as `use promql_utilities::query_logics::enums::{QueryPatternType, Statistic};`
+- `AggregationIdInfo`: defined in `asap-query-engine/src/engines/simple_engine.rs:43` — needs to
+  move to `sketch_db_common`
+- `sketch_db_common` already depends on `promql_utilities` (uses `KeyByLabelNames`)
+- `StreamingConfig`: in `asap-query-engine/src/data_model/streaming_config.rs` — stays there, gets
+  a new `find_compatible_aggregation` wrapper method
+- `normalize_spatial_filter`: already in `sketch_db_common::utils`
+- `AggregationConfig.aggregation_type` string values: `"Sum"`, `"MultipleSumAccumulator"`,
+  `"MinMax"`, `"MultipleMinMaxAccumulator"`, `"Increase"`, `"MultipleIncreaseAccumulator"`,
+  `"CountMinSketch"`, `"CountMinSketchWithHeap"` / `"CountMinSketchWithHeapAccumulator"`,
`"DatasketchesKLL"`, `"HydraKLL"`, `"SetAggregator"`, `"DeltaSetAggregator"` +- Multi-population value types (require a key aggregation): + `MultipleSumAccumulator`, `MultipleMinMaxAccumulator`, `MultipleIncreaseAccumulator`, + `CountMinSketchWithHeap`, `CountMinSketchWithHeapAccumulator` +- Key aggregation types: `SetAggregator`, `DeltaSetAggregator` +- Test utilities: `asap-query-engine/src/tests/test_utilities/config_builders.rs` has + `TestConfigBuilder` — reuse for integration tests + +--- + +## Implementation Steps (TDD order) + +### Step 1 — Move `AggregationIdInfo` to `sketch_db_common` + +**File**: `asap-common/dependencies/rs/sketch_db_common/src/aggregation_config.rs` + +Move the struct definition here (it has no engine-specific deps): + +```rust +#[derive(Debug, Clone)] +pub struct AggregationIdInfo { + pub aggregation_id_for_key: u64, + pub aggregation_id_for_value: u64, + pub aggregation_type_for_key: String, + pub aggregation_type_for_value: String, +} +``` + +In `sketch_db_common/src/lib.rs`, ensure it is re-exported via `pub use aggregation_config::*`. + +In `asap-query-engine/src/engines/simple_engine.rs`, remove the local definition and add: +```rust +use sketch_db_common::AggregationIdInfo; +``` +Verify nothing else breaks (`cargo check`). + +--- + +### Step 2 — Add `QueryRequirements` to `sketch_db_common` + +**New file**: `asap-common/dependencies/rs/sketch_db_common/src/query_requirements.rs` + +```rust +use promql_utilities::query_logics::enums::Statistic; +use promql_utilities::data_model::KeyByLabelNames; + +/// What a query needs in order to be answered by a stored aggregation. +#[derive(Debug, Clone)] +pub struct QueryRequirements { + /// Metric name (PromQL) or "table_name.value_column" (SQL) + pub metric: String, + /// One or more statistics needed. For avg this is [Sum, Count]. + /// All must be satisfied by aggregations sharing the same window / labels. 
+    pub statistics: Vec<Statistic>,
+    /// The span of historical data the query reads, in milliseconds.
+    /// None for spatial-only queries (no time range).
+    pub data_range_ms: Option<u64>,
+    /// GROUP BY labels expected in the result.
+    pub grouping_labels: KeyByLabelNames,
+    /// Normalized label filter string (use normalize_spatial_filter).
+    pub spatial_filter_normalized: String,
+}
+```
+
+Register in `lib.rs`:
+```rust
+pub mod query_requirements;
+pub use query_requirements::*;
+```
+
+---
+
+### Step 3 — Write tests for capability matching (TDD — write before implementation)
+
+**New file**: `asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs`
+
+Put the `#[cfg(test)]` block at the bottom of this file **before** writing any implementation.
+Each test constructs a minimal `HashMap<u64, AggregationConfig>` and calls
+`find_compatible_aggregation`.
+
+Helper for tests — build a minimal `AggregationConfig`:
+```rust
+fn make_config(
+    id: u64, metric: &str, agg_type: &str, sub_type: &str,
+    window_size_s: u64, window_type: &str,
+    grouping: &[&str], spatial_filter: &str,
+) -> AggregationConfig { ...
} +``` + +Tests to write (all should compile and **fail** before implementation): + +| Test name | What it asserts | +|-----------|----------------| +| `basic_sum_match` | Sum query finds Sum config on correct metric | +| `quantile_any_value_finds_kll` | `Statistic::Quantile` matches `DatasketchesKLL` regardless of quantile value | +| `quantile_matches_hydrarkll` | `Statistic::Quantile` also matches `HydraKLL` | +| `no_match_wrong_metric` | Returns None when metric differs | +| `no_match_wrong_type` | Sum query does not match `DatasketchesKLL` | +| `window_tumbling_exact` | 5 min query (300_000 ms) matches 300 s tumbling config | +| `window_tumbling_divisible` | 900_000 ms query matches 300 s tumbling config (3 buckets) | +| `window_tumbling_not_divisible` | 600_000 ms query does NOT match 900 s tumbling config | +| `window_sliding_exact` | 5 min query matches 300 s sliding config | +| `window_sliding_too_large` | Query range > sliding window_size → no match | +| `window_priority_largest_wins` | Two tumbling configs (300 s and 900 s) for same metric/stat — 900 s chosen for 900_000 ms query | +| `spatial_only_no_range` | `data_range_ms = None` matches any window_size | +| `label_strict_exact` | Exact label set matches | +| `label_strict_superset_rejected` | Config with `{job, instance}` does NOT match query requiring only `{job}` (strict for now) | +| `label_mismatch_rejected` | Completely different labels → None | +| `spatial_filter_empty_both` | Both empty → match | +| `spatial_filter_query_empty_config_has_filter` | Config has filter, query has none → no match | +| `spatial_filter_same` | Same normalized filter → match | +| `spatial_filter_different` | Different filters → no match | +| `sub_type_min_matches_min` | `Statistic::Min` finds `MinMax` config with `sub_type = "min"` | +| `sub_type_max_rejects_min` | `Statistic::Max` does NOT find `MinMax` config with `sub_type = "min"` | +| `multi_pop_finds_key_agg` | `Statistic::Topk` finds `CountMinSketchWithHeap` 
(value) + `DeltaSetAggregator` (key) on same metric; result has distinct key/value ids |
+| `avg_finds_sum_and_count` | `statistics = [Sum, Count]`, two configs present → both found, same window/labels |
+| `avg_different_windows_rejected` | Sum config has 300 s window, Count config has 900 s window → None (must agree) |
+
+---
+
+### Step 4 — Implement capability matching in `sketch_db_common`
+
+**File**: `asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs`
+
+#### 4a. Pure compatibility helpers
+
+```rust
+/// Returns the aggregation_type strings that can serve this statistic.
+pub fn compatible_agg_types(stat: Statistic) -> &'static [&'static str]
+
+/// Returns the required aggregation_sub_type for this statistic, if any.
+/// e.g. Min → Some("min"), Max → Some("max"), Quantile → None
+pub fn required_sub_type(stat: Statistic) -> Option<&'static str>
+
+/// Whether this value aggregation type requires a paired key aggregation
+/// (SetAggregator / DeltaSetAggregator).
+pub fn is_multi_population_value_type(agg_type: &str) -> bool
+
+/// Window compatibility: can `config` serve a query needing `data_range_ms`?
+/// - None (spatial-only): always true
+/// - Tumbling: data_range_ms must be a positive multiple of window_size_ms
+/// - Sliding: data_range_ms must equal window_size_ms exactly
+pub fn window_compatible(config: &AggregationConfig, data_range_ms: Option<u64>) -> bool
+
+/// Label compatibility: strict exact match.
+/// TODO: relax to superset (config.grouping_labels ⊇ req.grouping_labels)
+pub fn labels_compatible(config_labels: &KeyByLabelNames, req_labels: &KeyByLabelNames) -> bool
+
+/// Spatial filter compatibility: both empty → ok; config non-empty and differs → reject.
+pub fn spatial_filter_compatible(config_filter: &str, req_filter: &str) -> bool
+
+/// Aggregation priority comparator: prefer larger window_size (descending).
+/// Separate function so callers can swap it out.
+pub fn aggregation_priority(a: &AggregationConfig, b: &AggregationConfig) -> std::cmp::Ordering
+```
+
+#### 4b. Core matching function
+
+```rust
+/// Find a compatible aggregation (or set of aggregations for avg / multi-pop queries)
+/// given a map of all available aggregations and a set of query requirements.
+///
+/// Returns None if no fully compatible match exists.
+/// Logs a debug message for each candidate that was considered but rejected.
+pub fn find_compatible_aggregation(
+    configs: &HashMap<u64, AggregationConfig>,
+    requirements: &QueryRequirements,
+) -> Option<AggregationIdInfo>
+```
+
+**Algorithm** (single-statistic path first, then generalize):
+
+1. For each statistic `s` in `requirements.statistics`:
+   - Collect all configs where:
+     - `config.metric == requirements.metric`
+     - `compatible_agg_types(s)` contains `config.aggregation_type`
+     - `required_sub_type(s)` matches `config.aggregation_sub_type` (if Some)
+     - `window_compatible(config, requirements.data_range_ms)`
+     - `labels_compatible(&config.grouping_labels, &requirements.grouping_labels)`
+     - `spatial_filter_compatible(&config.spatial_filter_normalized, &requirements.spatial_filter_normalized)`
+   - Sort by `aggregation_priority` (largest window first)
+   - Keep sorted list as `candidates[s]`
+
+2. If any statistic has zero candidates → return None
+
+3. For `Vec<Statistic>` with multiple entries (e.g. avg = [Sum, Count]):
+   - Take the best candidate for `statistics[0]` (first after sort)
+   - For each remaining statistic, find the best candidate that also shares the same
+     `window_size` and `grouping_labels` as the first
+   - If all are found → proceed; else return None
+
+4. The value aggregation is the candidate for `statistics[0]`.
+
+5. If `is_multi_population_value_type(value_agg.aggregation_type)`:
+   - Search configs for one where `aggregation_type` is `"SetAggregator"` or
+     `"DeltaSetAggregator"` AND `config.metric == requirements.metric`
+   - If not found → return None
+   - This becomes `aggregation_id_for_key`
+
+6.
Otherwise `aggregation_id_for_key = aggregation_id_for_value`.
+
+7. Return:
+   ```rust
+   Some(AggregationIdInfo {
+       aggregation_id_for_value: value_agg.aggregation_id,
+       aggregation_type_for_value: value_agg.aggregation_type.clone(),
+       aggregation_id_for_key: key_agg.aggregation_id,
+       aggregation_type_for_key: key_agg.aggregation_type.clone(),
+   })
+   ```
+
+Register in `lib.rs`:
+```rust
+pub mod capability_matching;
+pub use capability_matching::find_compatible_aggregation;
+```
+
+All tests from Step 3 should now pass.
+
+---
+
+### Step 5 — Add wrapper method on `StreamingConfig`
+
+**File**: `asap-query-engine/src/data_model/streaming_config.rs`
+
+```rust
+use sketch_db_common::{find_compatible_aggregation, QueryRequirements, AggregationIdInfo};
+
+impl StreamingConfig {
+    pub fn find_compatible_aggregation(
+        &self,
+        requirements: &QueryRequirements,
+    ) -> Option<AggregationIdInfo> {
+        find_compatible_aggregation(&self.aggregation_configs, requirements)
+    }
+}
+```
+
+---
+
+### Step 6 — Write tests for `QueryRequirements` extraction (TDD — write before implementation)
+
+**File**: add a new test module in `asap-query-engine/src/tests/` (e.g.
`capability_matching_tests.rs`) + +Tests for `build_query_requirements_promql`: + +| Test | Input PromQL | Expected `QueryRequirements` field(s) | +|------|-------------|--------------------------------------| +| `promql_temporal_sum` | `sum_over_time(cpu[5m])` | `statistics=[Sum]`, `data_range_ms=Some(300_000)` | +| `promql_spatial_sum` | `sum(cpu)` | `statistics=[Sum]`, `data_range_ms=None` | +| `promql_temporal_quantile` | `quantile_over_time(0.9, latency[5m])` | `statistics=[Quantile]`, `data_range_ms=Some(300_000)` | +| `promql_temporal_rate` | `rate(requests[1m])` | `statistics=[Rate]`, `data_range_ms=Some(60_000)` | +| `promql_avg_expands` | `avg_over_time(cpu[5m])` | `statistics=[Sum, Count]` | +| `promql_label_extraction` | `sum(cpu{job="foo"})` | `grouping_labels` empty, `spatial_filter_normalized` encodes `job=foo` | + +Tests for `build_query_requirements_sql` (same spirit, different syntax). + +Integration tests for fallback wiring: + +| Test | Setup | Expected behaviour | +|------|-------|--------------------| +| `capability_fallback_fires_when_no_config` | No `query_configs` entry; streaming_config has compatible KLL | Returns valid result | +| `config_path_takes_priority` | Both config entry and compatible KLL exist | Uses config entry (no capability matching) | +| `capability_fallback_warns` | No config entry; capability match found | `warn!` is emitted (check tracing subscriber) | +| `no_match_returns_none` | No config entry; no compatible aggregation | Returns None | +| `quantile_different_values_same_agg` | query_configs empty; KLL agg for `latency`; two queries `q(0.5)` and `q(0.9)` | Both resolve to same `aggregation_id` | + +--- + +### Step 7 — Implement `build_query_requirements_promql` and `build_query_requirements_sql` + +**File**: `asap-query-engine/src/engines/simple_engine.rs` + +Add two private methods. 
Both reuse data already extracted during parsing (do not re-parse): + +```rust +fn build_query_requirements_promql( + &self, + match_result: &PromQLMatchResult, + query_pattern_type: QueryPatternType, +) -> QueryRequirements + +fn build_query_requirements_sql( + &self, + match_result: &SQLQuery, + query_pattern_type: QueryPatternType, +) -> QueryRequirements +``` + +**Extraction rules** (both methods): + +| `QueryRequirements` field | Source | +|--------------------------|--------| +| `metric` | Already extracted from the parsed AST in the build-context methods | +| `statistics` | Map from `Statistic` (already computed); `Avg` → `[Sum, Count]`, else `[stat]` | +| `data_range_ms` | `OnlySpatial` → `None`; temporal → `range_seconds * 1000` (from match_result range) | +| `grouping_labels` | GROUP BY / `by(...)` labels from parsed AST | +| `spatial_filter_normalized` | Call `normalize_spatial_filter(&spatial_filter)` (already in `sketch_db_common::utils`) | + +--- + +### Step 8 — Wire fallback into build context methods + +**File**: `asap-query-engine/src/engines/simple_engine.rs` + +In `build_query_execution_context_promql`, `build_query_execution_context_sql`, +`build_spatiotemporal_context`, and the Elastic DSL context builder — all four call +`find_query_config` / `find_query_config_sql` and then `get_aggregation_id_info`. Replace that +pair with: + +```rust +let agg_info: AggregationIdInfo = if let Some(config) = self.find_query_config(&query) { + self.get_aggregation_id_info(config) +} else { + warn!( + "No query_config entry for query '{}'. Attempting capability-based matching.", + query + ); + let requirements = self.build_query_requirements_promql(&match_result, query_pattern_type); + self.streaming_config.find_compatible_aggregation(&requirements)? +}; +``` + +For SQL, swap `find_query_config_sql` and `build_query_requirements_sql` accordingly. + +No other logic in the build-context methods changes. 
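Putting the Step 7 extraction rules together, here is a minimal self-contained sketch of the statistic expansion and data-range computation. The local `Statistic` and `QueryPatternType` enums and the function shape are illustrative stand-ins for the real types in `promql_utilities`, not the engine's actual API:

```rust
// Illustrative sketch of the Step 7 extraction rules. Local enums stand in
// for the real `Statistic` / `QueryPatternType`; field names are assumptions.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Statistic { Sum, Count, Avg }

#[derive(Debug, Clone, Copy, PartialEq)]
enum QueryPatternType { OnlySpatial, SpatioTemporal }

#[derive(Debug)]
struct QueryRequirements {
    metric: String,
    statistics: Vec<Statistic>,
    data_range_ms: Option<u64>,
}

fn build_query_requirements(
    metric: &str,
    statistic: Statistic,
    pattern: QueryPatternType,
    range_seconds: u64,
) -> QueryRequirements {
    // Avg expands into the two statistics that are actually stored.
    let statistics = match statistic {
        Statistic::Avg => vec![Statistic::Sum, Statistic::Count],
        s => vec![s],
    };
    // Spatial-only queries read no historical range.
    let data_range_ms = match pattern {
        QueryPatternType::OnlySpatial => None,
        QueryPatternType::SpatioTemporal => Some(range_seconds * 1000),
    };
    QueryRequirements { metric: metric.to_string(), statistics, data_range_ms }
}

fn main() {
    let avg = build_query_requirements(
        "cpu", Statistic::Avg, QueryPatternType::SpatioTemporal, 300,
    );
    assert_eq!(avg.metric, "cpu");
    assert_eq!(avg.statistics, vec![Statistic::Sum, Statistic::Count]);
    assert_eq!(avg.data_range_ms, Some(300_000));

    let spatial = build_query_requirements(
        "cpu", Statistic::Sum, QueryPatternType::OnlySpatial, 0,
    );
    assert_eq!(spatial.data_range_ms, None);
}
```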
+ +--- + +## Files Changed + +| File | Change | +|------|--------| +| `asap-common/dependencies/rs/sketch_db_common/src/aggregation_config.rs` | Add `AggregationIdInfo` struct here (move from engine) | +| `asap-common/dependencies/rs/sketch_db_common/src/query_requirements.rs` | **New** — `QueryRequirements` struct | +| `asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs` | **New** — all compatibility helpers + `find_compatible_aggregation` | +| `asap-common/dependencies/rs/sketch_db_common/src/lib.rs` | Register two new modules, re-export | +| `asap-query-engine/src/data_model/streaming_config.rs` | Add `find_compatible_aggregation` wrapper | +| `asap-query-engine/src/engines/simple_engine.rs` | Remove local `AggregationIdInfo`, add build-requirements helpers, wire fallback in 4 build-context methods | +| `asap-query-engine/src/tests/capability_matching_tests.rs` | **New** — integration tests for fallback path | + +--- + +## Verification + +```bash +# 1. All existing tests still pass (no regressions) +cargo test -p asap-query-engine + +# 2. New unit tests in sketch_db_common pass +cargo test -p sketch_db_common + +# 3. Manually verify quantile routing: configure a streaming_config with one +# DatasketchesKLL aggregation for a metric, no query_configs entry. +# Send quantile(0.5) and quantile(0.9) range queries — both should succeed +# and return results from the same aggregation_id (visible via debug logging). + +# 4. Verify fallback warning: set RUST_LOG=warn and send an unconfigured query — +# the warn! line should appear in stderr. + +# 5. Verify priority: configure two tumbling Sum aggregations for the same metric +# (window_size 300 s and 900 s). Send a sum_over_time(metric[900s]) query. +# Confirm the 900 s aggregation is selected (check via debug log or test assertion). 
+``` From a394cfad248e3fa88f65bdd4cffbb8d2e5dd6973 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 1 Apr 2026 16:28:27 -0400 Subject: [PATCH 2/6] Renamed plan file --- .docs/{claude_plan => claude_plan.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .docs/{claude_plan => claude_plan.md} (100%) diff --git a/.docs/claude_plan b/.docs/claude_plan.md similarity index 100% rename from .docs/claude_plan rename to .docs/claude_plan.md From a9ba7b6b598bed3799afc7c0e7a1566dc1aafa0b Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Thu, 2 Apr 2026 09:21:30 -0400 Subject: [PATCH 3/6] Added implementation --- .../src/aggregation_config.rs | 11 + .../src/capability_matching.rs | 733 ++++++++++++++++++ .../rs/sketch_db_common/src/lib.rs | 4 + .../src/query_requirements.rs | 21 + .../src/bin/show_logical_plans.rs | 3 +- .../src/data_model/streaming_config.rs | 15 +- .../src/engines/logical/plan_builder.rs | 5 +- .../src/engines/simple_engine.rs | 200 +++-- .../src/tests/capability_matching_tests.rs | 255 ++++++ .../plan_builder_regression_tests.rs | 3 +- asap-query-engine/src/tests/mod.rs | 1 + .../src/tests/test_utilities/comparison.rs | 3 +- 12 files changed, 1194 insertions(+), 60 deletions(-) create mode 100644 asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs create mode 100644 asap-common/dependencies/rs/sketch_db_common/src/query_requirements.rs create mode 100644 asap-query-engine/src/tests/capability_matching_tests.rs diff --git a/asap-common/dependencies/rs/sketch_db_common/src/aggregation_config.rs b/asap-common/dependencies/rs/sketch_db_common/src/aggregation_config.rs index 4746acc..b709eaa 100644 --- a/asap-common/dependencies/rs/sketch_db_common/src/aggregation_config.rs +++ b/asap-common/dependencies/rs/sketch_db_common/src/aggregation_config.rs @@ -34,6 +34,17 @@ pub struct AggregationConfig { pub value_column: Option, // SQL mode: which value column to aggregate } +/// Aggregation IDs and types for both the 
key and value dimensions of a query. +/// For single-population queries, key and value share the same ID and type. +/// For multi-population queries (e.g. Topk), they differ. +#[derive(Debug, Clone)] +pub struct AggregationIdInfo { + pub aggregation_id_for_key: u64, + pub aggregation_id_for_value: u64, + pub aggregation_type_for_key: String, + pub aggregation_type_for_value: String, +} + // TODO: need to implement deserialization methods impl AggregationConfig { diff --git a/asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs b/asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs new file mode 100644 index 0000000..775ee29 --- /dev/null +++ b/asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs @@ -0,0 +1,733 @@ +use std::cmp::Ordering; +use std::collections::HashMap; + +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::Statistic; + +use crate::aggregation_config::{AggregationConfig, AggregationIdInfo}; +use crate::query_requirements::QueryRequirements; +use crate::utils::normalize_spatial_filter; + +// --------------------------------------------------------------------------- +// Pure compatibility helpers +// --------------------------------------------------------------------------- + +/// Returns the aggregation_type strings that can serve this statistic. 
+pub fn compatible_agg_types(stat: Statistic) -> &'static [&'static str] { + match stat { + Statistic::Sum => &["Sum", "MultipleSumAccumulator"], + Statistic::Count => &[ + "CountMinSketch", + "CountMinSketchWithHeap", + "CountMinSketchWithHeapAccumulator", + ], + Statistic::Min => &["MinMax", "MultipleMinMaxAccumulator"], + Statistic::Max => &["MinMax", "MultipleMinMaxAccumulator"], + Statistic::Quantile => &["DatasketchesKLL", "HydraKLL"], + Statistic::Rate | Statistic::Increase => &["Increase", "MultipleIncreaseAccumulator"], + Statistic::Cardinality => &["SetAggregator", "DeltaSetAggregator"], + Statistic::Topk => &[ + "CountMinSketchWithHeap", + "CountMinSketchWithHeapAccumulator", + ], + } +} + +/// Returns the required aggregation_sub_type for this statistic, if any. +/// `Min` requires `"min"`, `Max` requires `"max"`. All others are unconstrained. +pub fn required_sub_type(stat: Statistic) -> Option<&'static str> { + match stat { + Statistic::Min => Some("min"), + Statistic::Max => Some("max"), + _ => None, + } +} + +/// Whether this value aggregation type requires a paired key aggregation +/// (`SetAggregator` or `DeltaSetAggregator`). +pub fn is_multi_population_value_type(agg_type: &str) -> bool { + matches!( + agg_type, + "MultipleSumAccumulator" + | "MultipleMinMaxAccumulator" + | "MultipleIncreaseAccumulator" + | "CountMinSketchWithHeap" + | "CountMinSketchWithHeapAccumulator" + ) +} + +/// Whether this type is a key aggregation (tracks which label-value combinations exist). +fn is_key_agg_type(agg_type: &str) -> bool { + matches!(agg_type, "SetAggregator" | "DeltaSetAggregator") +} + +/// Window compatibility: can `config` serve a query needing `data_range_ms`? +/// +/// - `None` (spatial-only): always compatible. +/// - Tumbling: `data_range_ms` must be a positive integer multiple of `window_size_ms`. 
+/// - Sliding: `data_range_ms` must equal `window_size_ms` exactly (a sliding window
+///   precomputes one fixed range per timestamp; overlapping windows cannot be merged).
+pub fn window_compatible(config: &AggregationConfig, data_range_ms: Option<u64>) -> bool {
+    let Some(range) = data_range_ms else {
+        return true;
+    };
+    let window_ms = config.window_size * 1000;
+    if window_ms == 0 || range == 0 {
+        return false;
+    }
+    match config.window_type.as_str() {
+        "sliding" => range == window_ms,
+        _ => range % window_ms == 0, // tumbling (or unknown — treat as tumbling)
+    }
+}
+
+/// Label compatibility: strict exact match.
+/// TODO: relax to superset (config.grouping_labels ⊇ req.grouping_labels) for
+/// simple accumulators (Sum, MinMax, Increase).
+pub fn labels_compatible(config_labels: &KeyByLabelNames, req_labels: &KeyByLabelNames) -> bool {
+    config_labels == req_labels
+}
+
+/// Spatial filter compatibility.
+/// - Both empty → compatible.
+/// - Config non-empty and matches query → compatible.
+/// - Config non-empty and query differs (or is empty) → incompatible.
+pub fn spatial_filter_compatible(config_filter: &str, req_filter: &str) -> bool {
+    let config_norm = normalize_spatial_filter(config_filter);
+    let req_norm = normalize_spatial_filter(req_filter);
+    if config_norm.is_empty() {
+        // Config has no filter — compatible with any query filter.
+        return true;
+    }
+    config_norm == req_norm
+}
+
+/// Aggregation priority comparator: prefer larger `window_size` (descending).
+/// This is a separate function so callers can swap the policy without touching matching logic.
+pub fn aggregation_priority(a: &AggregationConfig, b: &AggregationConfig) -> Ordering {
+    b.window_size.cmp(&a.window_size)
+}
+
+// ---------------------------------------------------------------------------
+// Core matching function
+// ---------------------------------------------------------------------------
+
+/// Find a compatible aggregation (or pair of aggregations for multi-population queries)
+/// given all available aggregation configs and a set of query requirements.
+///
+/// Returns `None` if no fully compatible match exists.
+///
+/// Algorithm:
+/// 1. For each statistic, collect and sort compatible candidates.
+/// 2. For multi-statistic requirements (e.g. avg = [Sum, Count]), all must be
+///    served by configs sharing the same `window_size` and `grouping_labels`.
+/// 3. If the selected value aggregation type is multi-population, also find a
+///    paired key aggregation (`SetAggregator` / `DeltaSetAggregator`) on the same metric.
+pub fn find_compatible_aggregation(
+    configs: &HashMap<u64, AggregationConfig>,
+    requirements: &QueryRequirements,
+) -> Option<AggregationIdInfo> {
+    if requirements.statistics.is_empty() {
+        return None;
+    }
+
+    // For each statistic, collect configs that pass all filters, sorted by priority.
+    let mut per_stat_candidates: Vec<Vec<&AggregationConfig>> = Vec::new();
+
+    for &stat in &requirements.statistics {
+        let types = compatible_agg_types(stat);
+        let sub_type = required_sub_type(stat);
+
+        let mut candidates: Vec<&AggregationConfig> = configs
+            .values()
+            .filter(|c| {
+                c.metric == requirements.metric
+                    && types.contains(&c.aggregation_type.as_str())
+                    && sub_type.is_none_or(|st| c.aggregation_sub_type == st)
+                    && window_compatible(c, requirements.data_range_ms)
+                    && labels_compatible(&c.grouping_labels, &requirements.grouping_labels)
+                    && spatial_filter_compatible(
+                        &c.spatial_filter_normalized,
+                        &requirements.spatial_filter_normalized,
+                    )
+            })
+            .collect();
+
+        candidates.sort_by(|a, b| aggregation_priority(a, b));
+
+        if candidates.is_empty() {
+            return None;
+        }
+        per_stat_candidates.push(candidates);
+    }
+
+    // Pick the best candidate for the first statistic.
+    let value_agg = per_stat_candidates[0][0];
+
+    // For multi-statistic requirements, the remaining statistics must be served by a
+    // config that agrees on window_size and grouping_labels with the chosen value agg.
+    for candidates in per_stat_candidates.iter().skip(1) {
+        let found = candidates.iter().any(|c| {
+            c.window_size == value_agg.window_size && c.grouping_labels == value_agg.grouping_labels
+        });
+        if !found {
+            return None;
+        }
+    }
+
+    // If value type is multi-population, find the paired key aggregation.
+    let key_agg: &AggregationConfig = if is_multi_population_value_type(&value_agg.aggregation_type)
+    {
+        configs
+            .values()
+            .find(|c| c.metric == requirements.metric && is_key_agg_type(&c.aggregation_type))?
+ } else { + value_agg + }; + + Some(AggregationIdInfo { + aggregation_id_for_value: value_agg.aggregation_id, + aggregation_type_for_value: value_agg.aggregation_type.clone(), + aggregation_id_for_key: key_agg.aggregation_id, + aggregation_type_for_key: key_agg.aggregation_type.clone(), + }) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::normalize_spatial_filter; + use promql_utilities::data_model::KeyByLabelNames; + use std::collections::HashMap; + + #[allow(clippy::too_many_arguments)] + fn make_config( + id: u64, + metric: &str, + agg_type: &str, + sub_type: &str, + window_size_s: u64, + window_type: &str, + grouping: &[&str], + spatial_filter: &str, + ) -> AggregationConfig { + let grouping_labels = + KeyByLabelNames::new(grouping.iter().map(|s| s.to_string()).collect()); + let spatial_filter_normalized = normalize_spatial_filter(spatial_filter); + AggregationConfig { + aggregation_id: id, + aggregation_type: agg_type.to_string(), + aggregation_sub_type: sub_type.to_string(), + parameters: HashMap::new(), + grouping_labels, + aggregated_labels: KeyByLabelNames::new(vec![]), + rollup_labels: KeyByLabelNames::new(vec![]), + original_yaml: String::new(), + window_size: window_size_s, + slide_interval: window_size_s, + window_type: window_type.to_string(), + spatial_filter: spatial_filter.to_string(), + spatial_filter_normalized, + metric: metric.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + } + } + + fn req( + metric: &str, + stats: &[Statistic], + data_range_ms: Option, + grouping: &[&str], + spatial_filter: &str, + ) -> QueryRequirements { + QueryRequirements { + metric: metric.to_string(), + statistics: stats.to_vec(), + data_range_ms, + grouping_labels: 
KeyByLabelNames::new(grouping.iter().map(|s| s.to_string()).collect()),
+            spatial_filter_normalized: normalize_spatial_filter(spatial_filter),
+        }
+    }
+
+    fn single_config(config: AggregationConfig) -> HashMap<u64, AggregationConfig> {
+        let mut m = HashMap::new();
+        m.insert(config.aggregation_id, config);
+        m
+    }
+
+    // --- basic type matching ---
+
+    #[test]
+    fn basic_sum_match() {
+        let configs = single_config(make_config(1, "cpu", "Sum", "", 300, "tumbling", &[], ""));
+        let result = find_compatible_aggregation(
+            &configs,
+            &req("cpu", &[Statistic::Sum], Some(300_000), &[], ""),
+        );
+        assert!(result.is_some());
+        assert_eq!(result.unwrap().aggregation_id_for_value, 1);
+    }
+
+    #[test]
+    fn quantile_any_value_finds_kll() {
+        let configs = single_config(make_config(
+            2,
+            "lat",
+            "DatasketchesKLL",
+            "",
+            300,
+            "tumbling",
+            &[],
+            "",
+        ));
+        // quantile value (0.5 or 0.9) is NOT part of QueryRequirements — both should find the same config
+        let r1 = find_compatible_aggregation(
+            &configs,
+            &req("lat", &[Statistic::Quantile], Some(300_000), &[], ""),
+        );
+        let r2 = find_compatible_aggregation(
+            &configs,
+            &req("lat", &[Statistic::Quantile], Some(300_000), &[], ""),
+        );
+        assert_eq!(r1.unwrap().aggregation_id_for_value, 2);
+        assert_eq!(r2.unwrap().aggregation_id_for_value, 2);
+    }
+
+    #[test]
+    fn quantile_matches_hydrakll() {
+        let configs = single_config(make_config(
+            3,
+            "lat",
+            "HydraKLL",
+            "",
+            300,
+            "tumbling",
+            &[],
+            "",
+        ));
+        let result = find_compatible_aggregation(
+            &configs,
+            &req("lat", &[Statistic::Quantile], Some(300_000), &[], ""),
+        );
+        assert_eq!(result.unwrap().aggregation_id_for_value, 3);
+    }
+
+    #[test]
+    fn no_match_wrong_metric() {
+        let configs = single_config(make_config(1, "cpu", "Sum", "", 300, "tumbling", &[], ""));
+        let result = find_compatible_aggregation(
+            &configs,
+            &req("mem", &[Statistic::Sum], Some(300_000), &[], ""),
+        );
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn no_match_wrong_type() {
+        let configs =
single_config(make_config( + 1, + "cpu", + "DatasketchesKLL", + "", + 300, + "tumbling", + &[], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &[], ""), + ); + assert!(result.is_none()); + } + + // --- window compatibility --- + + #[test] + fn window_tumbling_exact() { + let configs = single_config(make_config(1, "cpu", "Sum", "", 300, "tumbling", &[], "")); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &[], ""), + ); + assert!(result.is_some()); + } + + #[test] + fn window_tumbling_divisible() { + // 900_000 ms / 300 s = 3 buckets — valid merge + let configs = single_config(make_config(1, "cpu", "Sum", "", 300, "tumbling", &[], "")); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(900_000), &[], ""), + ); + assert!(result.is_some()); + } + + #[test] + fn window_tumbling_not_divisible() { + // 600_000 ms / 900 s is not a whole number + let configs = single_config(make_config(1, "cpu", "Sum", "", 900, "tumbling", &[], "")); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(600_000), &[], ""), + ); + assert!(result.is_none()); + } + + #[test] + fn window_sliding_exact() { + let configs = single_config(make_config(1, "cpu", "Sum", "", 300, "sliding", &[], "")); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &[], ""), + ); + assert!(result.is_some()); + } + + #[test] + fn window_sliding_too_large() { + // Query range 600 s but sliding window only covers 300 s + let configs = single_config(make_config(1, "cpu", "Sum", "", 300, "sliding", &[], "")); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(600_000), &[], ""), + ); + assert!(result.is_none()); + } + + #[test] + fn window_priority_largest_wins() { + let mut configs = HashMap::new(); + configs.insert( + 
1, + make_config(1, "cpu", "Sum", "", 300, "tumbling", &[], ""), + ); + configs.insert( + 2, + make_config(2, "cpu", "Sum", "", 900, "tumbling", &[], ""), + ); + // 900_000 ms is divisible by both 300 s and 900 s — prefer 900 s + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(900_000), &[], ""), + ); + assert_eq!(result.unwrap().aggregation_id_for_value, 2); + } + + #[test] + fn spatial_only_no_range() { + // data_range_ms = None → any window size is compatible + let configs = single_config(make_config(1, "cpu", "Sum", "", 900, "tumbling", &[], "")); + let result = + find_compatible_aggregation(&configs, &req("cpu", &[Statistic::Sum], None, &[], "")); + assert!(result.is_some()); + } + + // --- label compatibility --- + + #[test] + fn label_strict_exact() { + let configs = single_config(make_config( + 1, + "cpu", + "Sum", + "", + 300, + "tumbling", + &["job"], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &["job"], ""), + ); + assert!(result.is_some()); + } + + #[test] + fn label_strict_superset_rejected() { + // Config has {job, instance}, query wants only {job} — strict mode rejects + let configs = single_config(make_config( + 1, + "cpu", + "Sum", + "", + 300, + "tumbling", + &["job", "instance"], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &["job"], ""), + ); + assert!(result.is_none()); + } + + #[test] + fn label_mismatch_rejected() { + let configs = single_config(make_config( + 1, + "cpu", + "Sum", + "", + 300, + "tumbling", + &["region"], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &["job"], ""), + ); + assert!(result.is_none()); + } + + // --- spatial filter compatibility --- + + #[test] + fn spatial_filter_empty_both() { + let configs = single_config(make_config(1, "cpu", "Sum", "", 300, "tumbling", 
&[], "")); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &[], ""), + ); + assert!(result.is_some()); + } + + #[test] + fn spatial_filter_query_empty_config_has_filter() { + // Config scoped to env=prod, query has no filter → reject + let configs = single_config(make_config( + 1, + "cpu", + "Sum", + "", + 300, + "tumbling", + &[], + "env=prod", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &[], ""), + ); + assert!(result.is_none()); + } + + #[test] + fn spatial_filter_same() { + let configs = single_config(make_config( + 1, + "cpu", + "Sum", + "", + 300, + "tumbling", + &[], + "env=prod", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &[], "env=prod"), + ); + assert!(result.is_some()); + } + + #[test] + fn spatial_filter_different() { + let configs = single_config(make_config( + 1, + "cpu", + "Sum", + "", + 300, + "tumbling", + &[], + "env=prod", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Sum], Some(300_000), &[], "env=staging"), + ); + assert!(result.is_none()); + } + + // --- sub-type --- + + #[test] + fn sub_type_min_matches_min() { + let configs = single_config(make_config( + 1, + "cpu", + "MinMax", + "min", + 300, + "tumbling", + &[], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Min], Some(300_000), &[], ""), + ); + assert!(result.is_some()); + } + + #[test] + fn sub_type_max_rejects_min() { + // Max statistic requires sub_type == "max", but config has "min" + let configs = single_config(make_config( + 1, + "cpu", + "MinMax", + "min", + 300, + "tumbling", + &[], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req("cpu", &[Statistic::Max], Some(300_000), &[], ""), + ); + assert!(result.is_none()); + } + + // --- multi-population --- + + #[test] + fn 
multi_pop_finds_key_agg() { + let mut configs = HashMap::new(); + configs.insert( + 10, + make_config( + 10, + "req", + "CountMinSketchWithHeap", + "", + 300, + "tumbling", + &[], + "", + ), + ); + configs.insert( + 11, + make_config( + 11, + "req", + "DeltaSetAggregator", + "", + 300, + "tumbling", + &[], + "", + ), + ); + let result = find_compatible_aggregation( + &configs, + &req("req", &[Statistic::Topk], Some(300_000), &[], ""), + ); + let info = result.unwrap(); + assert_eq!(info.aggregation_id_for_value, 10); + assert_eq!(info.aggregation_id_for_key, 11); + } + + #[test] + fn multi_pop_no_key_agg_returns_none() { + // CountMinSketchWithHeap present but no SetAggregator/DeltaSetAggregator + let configs = single_config(make_config( + 10, + "req", + "CountMinSketchWithHeap", + "", + 300, + "tumbling", + &[], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req("req", &[Statistic::Topk], Some(300_000), &[], ""), + ); + assert!(result.is_none()); + } + + // --- avg (Vec) --- + + #[test] + fn avg_finds_sum_and_count() { + let mut configs = HashMap::new(); + configs.insert( + 1, + make_config(1, "cpu", "Sum", "", 300, "tumbling", &["job"], ""), + ); + configs.insert( + 2, + make_config( + 2, + "cpu", + "CountMinSketch", + "", + 300, + "tumbling", + &["job"], + "", + ), + ); + let result = find_compatible_aggregation( + &configs, + &req( + "cpu", + &[Statistic::Sum, Statistic::Count], + Some(300_000), + &["job"], + "", + ), + ); + assert!(result.is_some()); + } + + #[test] + fn avg_different_windows_rejected() { + let mut configs = HashMap::new(); + configs.insert( + 1, + make_config(1, "cpu", "Sum", "", 300, "tumbling", &["job"], ""), + ); + // Count config has different window_size — must be rejected + configs.insert( + 2, + make_config( + 2, + "cpu", + "CountMinSketch", + "", + 900, + "tumbling", + &["job"], + "", + ), + ); + let result = find_compatible_aggregation( + &configs, + &req( + "cpu", + &[Statistic::Sum, Statistic::Count], + 
Some(300_000),
+                &["job"],
+                "",
+            ),
+        );
+        assert!(result.is_none());
+    }
+}
diff --git a/asap-common/dependencies/rs/sketch_db_common/src/lib.rs b/asap-common/dependencies/rs/sketch_db_common/src/lib.rs
index b77c30a..ee7cdc0 100644
--- a/asap-common/dependencies/rs/sketch_db_common/src/lib.rs
+++ b/asap-common/dependencies/rs/sketch_db_common/src/lib.rs
@@ -1,15 +1,19 @@
 pub mod aggregation_config;
 pub mod aggregation_reference;
+pub mod capability_matching;
 pub mod enums;
 pub mod inference_config;
 pub mod promql_schema;
 pub mod query_config;
+pub mod query_requirements;
 pub mod traits;
 pub mod utils;
 
 pub use aggregation_config::*;
 pub use aggregation_reference::*;
+pub use capability_matching::find_compatible_aggregation;
 pub use enums::*;
 pub use inference_config::*;
 pub use promql_schema::*;
 pub use query_config::*;
+pub use query_requirements::*;
diff --git a/asap-common/dependencies/rs/sketch_db_common/src/query_requirements.rs b/asap-common/dependencies/rs/sketch_db_common/src/query_requirements.rs
new file mode 100644
index 0000000..218d94d
--- /dev/null
+++ b/asap-common/dependencies/rs/sketch_db_common/src/query_requirements.rs
@@ -0,0 +1,21 @@
+use promql_utilities::data_model::KeyByLabelNames;
+use promql_utilities::query_logics::enums::Statistic;
+
+/// What a query needs in order to be answered by a stored aggregation.
+#[derive(Debug, Clone)]
+pub struct QueryRequirements {
+    /// Metric name (PromQL) or "table_name.value_column" (SQL).
+    pub metric: String,
+    /// One or more statistics needed.
+    /// For avg this is [Sum, Count]; for everything else it is a single element.
+    /// All statistics must be satisfied by aggregations sharing the same
+    /// window_size and grouping_labels.
+    pub statistics: Vec<Statistic>,
+    /// The span of historical data the query reads, in milliseconds.
+    /// None for spatial-only queries (no time range).
+    pub data_range_ms: Option<u64>,
+    /// GROUP BY labels expected in the query result.
+ pub grouping_labels: KeyByLabelNames, + /// Normalized label filter (produced by normalize_spatial_filter). + pub spatial_filter_normalized: String, +} diff --git a/asap-query-engine/src/bin/show_logical_plans.rs b/asap-query-engine/src/bin/show_logical_plans.rs index f5210cc..24b31ad 100644 --- a/asap-query-engine/src/bin/show_logical_plans.rs +++ b/asap-query-engine/src/bin/show_logical_plans.rs @@ -15,8 +15,9 @@ use datafusion::logical_expr::LogicalPlan; use datafusion_summary_library::{PrecomputedSummaryRead, SummaryInfer, SummaryMergeMultiple}; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::Statistic; +use query_engine_rust::data_model::AggregationIdInfo; use query_engine_rust::engines::simple_engine::{ - AggregationIdInfo, QueryExecutionContext, QueryMetadata, StoreQueryParams, StoreQueryPlan, + QueryExecutionContext, QueryMetadata, StoreQueryParams, StoreQueryPlan, }; use std::collections::HashMap; diff --git a/asap-query-engine/src/data_model/streaming_config.rs b/asap-query-engine/src/data_model/streaming_config.rs index 127a8c2..e3c4450 100644 --- a/asap-query-engine/src/data_model/streaming_config.rs +++ b/asap-query-engine/src/data_model/streaming_config.rs @@ -7,9 +7,11 @@ use std::fs::File; use std::io::BufReader; use std::ops::Index; -use crate::data_model::aggregation_config::AggregationConfig; +use crate::data_model::aggregation_config::{AggregationConfig, AggregationIdInfo}; use crate::data_model::enums::QueryLanguage; use crate::data_model::inference_config::{InferenceConfig, SchemaConfig}; +use sketch_db_common::capability_matching::find_compatible_aggregation as common_find_compatible; +use sketch_db_common::query_requirements::QueryRequirements; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamingConfig { @@ -99,6 +101,17 @@ impl StreamingConfig { } } +impl StreamingConfig { + /// Find a compatible aggregation for the given requirements using capability-based matching. 
+    /// Delegates to `sketch_db_common::find_compatible_aggregation`.
+    pub fn find_compatible_aggregation(
+        &self,
+        requirements: &QueryRequirements,
+    ) -> Option<AggregationIdInfo> {
+        common_find_compatible(&self.aggregation_configs, requirements)
+    }
+}
+
 impl Index<u64> for StreamingConfig {
     type Output = AggregationConfig;
 
diff --git a/asap-query-engine/src/engines/logical/plan_builder.rs b/asap-query-engine/src/engines/logical/plan_builder.rs
index 400c374..45f5894 100644
--- a/asap-query-engine/src/engines/logical/plan_builder.rs
+++ b/asap-query-engine/src/engines/logical/plan_builder.rs
@@ -245,9 +245,8 @@ impl QueryExecutionContext {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::engines::simple_engine::{
-        AggregationIdInfo, QueryMetadata, StoreQueryParams, StoreQueryPlan,
-    };
+    use crate::data_model::AggregationIdInfo;
+    use crate::engines::simple_engine::{QueryMetadata, StoreQueryParams, StoreQueryPlan};
     use promql_utilities::data_model::KeyByLabelNames;
     use std::collections::HashMap;
 
diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs
index 5f431d5..cd5229d 100644
--- a/asap-query-engine/src/engines/simple_engine.rs
+++ b/asap-query-engine/src/engines/simple_engine.rs
@@ -1,5 +1,6 @@
 use crate::data_model::{
-    InferenceConfig, KeyByLabelValues, QueryConfig, QueryLanguage, SchemaConfig, StreamingConfig,
+    AggregationIdInfo, InferenceConfig, KeyByLabelValues, QueryConfig, QueryLanguage, SchemaConfig,
+    StreamingConfig,
 };
 use crate::engines::query_result::{InstantVectorElement, QueryResult, RangeVectorElement};
 // use crate::stores::promsketch_store::{
@@ -22,6 +23,8 @@
 use promql_utilities::query_logics::enums::{QueryPatternType, Statistic};
 use promql_utilities::query_logics::parsing::{
     get_metric_and_spatial_filter, get_spatial_aggregation_output_labels, get_statistics_to_compute,
 };
+use sketch_db_common::query_requirements::QueryRequirements;
+use sketch_db_common::utils::normalize_spatial_filter;
 use
sql_utilities::ast_matching::QueryType; use sql_utilities::ast_matching::{SQLPatternMatcher, SQLPatternParser, SQLQuery}; @@ -37,16 +40,6 @@ use elastic_dsl_utilities::types::{EsDslQueryPattern, GroupBySpec, MetricAggType // Type alias for merged outputs (single aggregate per key after merging) type MergedOutputsMap = HashMap, Box>; -/// Information about bucket timeline for a single key (used for gap detection) -/// Aggregation IDs and types for key and value -#[derive(Debug, Clone)] -pub struct AggregationIdInfo { - pub aggregation_id_for_key: u64, - pub aggregation_id_for_value: u64, - pub aggregation_type_for_key: String, - pub aggregation_type_for_value: String, -} - /// Metadata extracted from a query, independent of query language #[derive(Debug, Clone)] pub struct QueryMetadata { @@ -1037,6 +1030,98 @@ impl SimpleEngine { } } + /// Extract QueryRequirements from a parsed PromQL match result. + /// Used as the fallback path when no query_configs entry is found. + fn build_query_requirements_promql( + &self, + match_result: &PromQLMatchResult, + query_pattern_type: QueryPatternType, + ) -> QueryRequirements { + let (metric, spatial_filter) = get_metric_and_spatial_filter(match_result); + + let statistics = get_statistics_to_compute(query_pattern_type, match_result); + + let data_range_ms = match query_pattern_type { + QueryPatternType::OnlySpatial => None, + _ => match_result + .get_range_duration() + .map(|d| d.num_seconds() as u64 * 1000), + }; + + let all_labels = match &self.inference_config.schema { + SchemaConfig::PromQL(schema) => schema + .get_labels(&metric) + .cloned() + .unwrap_or_else(KeyByLabelNames::empty), + _ => KeyByLabelNames::empty(), + }; + + let grouping_labels = match query_pattern_type { + QueryPatternType::OnlyTemporal => all_labels, + QueryPatternType::OnlySpatial | QueryPatternType::OneTemporalOneSpatial => { + get_spatial_aggregation_output_labels(match_result, &all_labels) + } + }; + + QueryRequirements { + metric, + statistics, + 
data_range_ms,
+            grouping_labels,
+            spatial_filter_normalized: normalize_spatial_filter(&spatial_filter),
+        }
+    }
+
+    /// Extract QueryRequirements from a parsed SQL match result.
+    /// Used as the fallback path when no query_configs entry is found.
+    fn build_query_requirements_sql(
+        &self,
+        match_result: &SQLQuery,
+        query_pattern_type: QueryPatternType,
+    ) -> QueryRequirements {
+        let query_data = &match_result.query_data[0];
+        let metric = query_data.metric.clone();
+
+        let statistic_name = match query_pattern_type {
+            QueryPatternType::OneTemporalOneSpatial => match_result.query_data[1]
+                .aggregation_info
+                .get_name()
+                .to_lowercase(),
+            _ => query_data.aggregation_info.get_name().to_lowercase(),
+        };
+
+        let statistics: Vec<Statistic> = if statistic_name == "avg" {
+            vec![Statistic::Sum, Statistic::Count]
+        } else if let Ok(stat) = statistic_name.parse::<Statistic>() {
+            vec![stat]
+        } else {
+            vec![]
+        };
+
+        let data_range_ms = match query_pattern_type {
+            QueryPatternType::OnlySpatial => None,
+            QueryPatternType::OnlyTemporal => {
+                let scrape_intervals = query_data.time_info.clone().get_duration() as u64;
+                Some(scrape_intervals * self.prometheus_scrape_interval * 1000)
+            }
+            QueryPatternType::OneTemporalOneSpatial => {
+                let scrape_intervals =
+                    match_result.query_data[1].time_info.clone().get_duration() as u64;
+                Some(scrape_intervals * self.prometheus_scrape_interval * 1000)
+            }
+        };
+
+        let grouping_labels = KeyByLabelNames::new(query_data.labels.clone().into_iter().collect());
+
+        QueryRequirements {
+            metric,
+            statistics,
+            data_range_ms,
+            grouping_labels,
+            spatial_filter_normalized: normalize_spatial_filter(""),
+        }
+    }
+
     fn get_aggregation_id_info(&self, query_config: &QueryConfig) -> AggregationIdInfo {
         let query_config_aggregations = &query_config.aggregations;
         let mut aggregation_id_for_key: Option<u64> = None;
@@ -1186,8 +1271,6 @@
         _ => panic!("Unsupported query type found"),
         };
 
-        let query_config = self.find_query_config_sql(&query_data)?;
- // For nested queries (spatial of temporal), the outer query has no time clause, // so we need to use the inner (temporal) query's time_info to compute query_time let query_time = match query_pattern_type { @@ -1281,10 +1364,11 @@ impl SimpleEngine { }; if statistics_to_compute.len() != 1 { - panic!( + warn!( "Expected exactly one statistic to compute, found {}", statistics_to_compute.len() ); + return None; } let statistic_to_compute = statistics_to_compute.first().unwrap(); @@ -1307,9 +1391,17 @@ impl SimpleEngine { let timestamps = self.calculate_query_timestamps_sql(query_time, query_pattern_type, &match_result); - // Precomputed output - - let agg_info = self.get_aggregation_id_info(query_config); + // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. + let agg_info: AggregationIdInfo = if let Some(config) = + self.find_query_config_sql(&query_data) + { + self.get_aggregation_id_info(config) + } else { + warn!("No query_config entry for SQL query. Attempting capability-based matching."); + let requirements = self.build_query_requirements_sql(&match_result, query_pattern_type); + self.streaming_config + .find_compatible_aggregation(&requirements)? 
+ }; let metric = &match_result.query_data[0].metric; @@ -1373,8 +1465,6 @@ impl SimpleEngine { query_time: u64, query_data: &SQLQueryData, ) -> Option { - let query_config = self.find_query_config_sql(query_data)?; - // Output labels are the GROUP BY columns (subset of all labels) let query_output_labels = KeyByLabelNames::new( match_result.query_data[0] @@ -1399,10 +1489,11 @@ impl SimpleEngine { }; if statistics_to_compute.len() != 1 { - panic!( + warn!( "Expected exactly one statistic to compute, found {}", statistics_to_compute.len() ); + return None; } let statistic_to_compute = statistics_to_compute.first().unwrap(); @@ -1432,7 +1523,20 @@ impl SimpleEngine { end_timestamp, }; - let agg_info = self.get_aggregation_id_info(query_config); + // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. + let agg_info: AggregationIdInfo = if let Some(config) = + self.find_query_config_sql(query_data) + { + self.get_aggregation_id_info(config) + } else { + warn!( + "No query_config entry for SQL spatio-temporal query. Attempting capability-based matching." + ); + let requirements = + self.build_query_requirements_sql(match_result, QueryPatternType::OnlyTemporal); + self.streaming_config + .find_compatible_aggregation(&requirements)? 
+ }; let metric = &match_result.query_data[0].metric; let query_plan = self @@ -2014,17 +2118,6 @@ impl SimpleEngine { query: String, time: f64, ) -> Option { - // Track query configuration processing latency - let config_start_time = Instant::now(); - - let query_config = self.find_query_config(&query)?; - - let config_duration = config_start_time.elapsed(); - debug!( - "[LATENCY] Query configuration processing: {:.2}ms", - config_duration.as_secs_f64() * 1000.0 - ); - let query_time = Self::convert_query_time_to_data_time(time); // Parse PromQL AST using promql-parser crate @@ -2105,16 +2198,13 @@ impl SimpleEngine { return None; } }; - let all_labels = promql_schema - .get_labels(&metric) - .cloned() - .unwrap_or_else(|| { - warn!( - "No metric configuration found for '{}', using empty labels", - metric - ); - panic!("No metric configuration found"); - }); + let all_labels = match promql_schema.get_labels(&metric).cloned() { + Some(labels) => labels, + None => { + warn!("No metric configuration found for '{}'", metric); + return None; + } + }; // Determine query output labels based on pattern type // TODO: should we be returning this and using it to convert to final HTTP response? 
@@ -2146,10 +2236,11 @@ impl SimpleEngine { // Extract statistics to compute using AST-based approach let statistics_to_compute = get_statistics_to_compute(query_pattern_type, &match_result); if statistics_to_compute.len() != 1 { - panic!( + warn!( "Expected exactly one statistic to compute, found {}", statistics_to_compute.len() ); + return None; } let statistic_to_compute = statistics_to_compute.first().unwrap(); @@ -2181,16 +2272,19 @@ impl SimpleEngine { query_kwargs: query_kwargs.clone(), }; - // Track aggregation configuration processing latency - let agg_config_start_time = Instant::now(); - - let agg_info = self.get_aggregation_id_info(query_config); - - let agg_config_duration = agg_config_start_time.elapsed(); - debug!( - "[LATENCY] Aggregation configuration processing: {:.2}ms", - agg_config_duration.as_secs_f64() * 1000.0 - ); + // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. + let agg_info: AggregationIdInfo = if let Some(config) = self.find_query_config(&query) { + self.get_aggregation_id_info(config) + } else { + warn!( + "No query_config entry for PromQL query '{}'. Attempting capability-based matching.", + query + ); + let requirements = + self.build_query_requirements_promql(&match_result, query_pattern_type); + self.streaming_config + .find_compatible_aggregation(&requirements)? + }; // Create query plan (determines window type and calculates timestamps) let query_plan = self diff --git a/asap-query-engine/src/tests/capability_matching_tests.rs b/asap-query-engine/src/tests/capability_matching_tests.rs new file mode 100644 index 0000000..7dbc75c --- /dev/null +++ b/asap-query-engine/src/tests/capability_matching_tests.rs @@ -0,0 +1,255 @@ +//! Integration tests for capability-based aggregation matching. +//! +//! These tests verify that when no pre-configured query_config entry exists, +//! the engine falls back to searching StreamingConfig by capability, and that +//! 
the existing query_config path still takes priority when an entry is present. + +use crate::data_model::{ + AggregationConfig, AggregationReference, CleanupPolicy, InferenceConfig, PrecomputedOutput, + PromQLSchema, QueryConfig, QueryLanguage, SchemaConfig, StreamingConfig, +}; +use crate::engines::simple_engine::SimpleEngine; +use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; +use crate::precompute_operators::sum_accumulator::SumAccumulator; +use crate::stores::simple_map_store::SimpleMapStore; +use crate::stores::traits::Store; +use promql_utilities::data_model::KeyByLabelNames; +use std::collections::HashMap; +use std::sync::Arc; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Build a minimal `AggregationConfig`. +fn make_agg_config( + id: u64, + metric: &str, + agg_type: &str, + window_size_s: u64, + window_type: &str, + grouping: &[&str], +) -> AggregationConfig { + AggregationConfig { + aggregation_id: id, + aggregation_type: agg_type.to_string(), + aggregation_sub_type: String::new(), + parameters: HashMap::new(), + grouping_labels: KeyByLabelNames::new(grouping.iter().map(|s| s.to_string()).collect()), + aggregated_labels: KeyByLabelNames::empty(), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: window_size_s, + slide_interval: window_size_s, + window_type: window_type.to_string(), + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: metric.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + } +} + +/// Build a `SimpleEngine` with an explicit list of `AggregationConfig`s and no query_configs. +/// Data is inserted at timestamp 1_000_000 with a window covering [1_000_000 - window_ms, 1_000_000]. 
+fn engine_no_query_configs( + metric: &str, + schema_labels: &[&str], + agg_configs: Vec, +) -> SimpleEngine { + let mut agg_map = HashMap::new(); + for c in &agg_configs { + agg_map.insert(c.aggregation_id, c.clone()); + } + let streaming_config = Arc::new(StreamingConfig { + aggregation_configs: agg_map, + }); + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + + // Insert a data point for each aggregation so queries can actually execute. + let ts = 1_000_000_u64; + for c in &agg_configs { + let window_ms = c.window_size * 1000; + let output = PrecomputedOutput::new(ts - window_ms, ts, None, c.aggregation_id); + let acc: Box = match c.aggregation_type.as_str() { + "DatasketchesKLL" => { + let mut kll = DatasketchesKLLAccumulator::new(200); + kll._update(1.0); + Box::new(kll) + } + _ => Box::new(SumAccumulator::with_sum(42.0)), + }; + store.insert_precomputed_output(output, acc).unwrap(); + } + + let schema_label_names = + KeyByLabelNames::new(schema_labels.iter().map(|s| s.to_string()).collect()); + let promql_schema = PromQLSchema::new().add_metric(metric.to_string(), schema_label_names); + + let inference_config = InferenceConfig { + schema: SchemaConfig::PromQL(promql_schema), + query_configs: vec![], // intentionally empty — forces capability matching + cleanup_policy: CleanupPolicy::NoCleanup, + }; + + SimpleEngine::new( + store, + inference_config, + streaming_config, + 1, + QueryLanguage::promql, + ) +} + +/// Build a `SimpleEngine` with both a query_config entry AND a streaming aggregation. 
+fn engine_with_query_config( + metric: &str, + schema_labels: &[&str], + agg_config: AggregationConfig, + promql_query: &str, +) -> SimpleEngine { + let agg_id = agg_config.aggregation_id; + let mut agg_map = HashMap::new(); + agg_map.insert(agg_id, agg_config.clone()); + let streaming_config = Arc::new(StreamingConfig { + aggregation_configs: agg_map, + }); + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + + let ts = 1_000_000_u64; + let window_ms = agg_config.window_size * 1000; + let output = PrecomputedOutput::new(ts - window_ms, ts, None, agg_id); + store + .insert_precomputed_output(output, Box::new(SumAccumulator::with_sum(99.0))) + .unwrap(); + + let schema_label_names = + KeyByLabelNames::new(schema_labels.iter().map(|s| s.to_string()).collect()); + let promql_schema = PromQLSchema::new().add_metric(metric.to_string(), schema_label_names); + + let query_config = QueryConfig::new(promql_query.to_string()) + .add_aggregation(AggregationReference::new(agg_id, None)); + + let inference_config = InferenceConfig { + schema: SchemaConfig::PromQL(promql_schema), + query_configs: vec![query_config], + cleanup_policy: CleanupPolicy::NoCleanup, + }; + + SimpleEngine::new( + store, + inference_config, + streaming_config, + 1, + QueryLanguage::promql, + ) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +/// When no query_config entry exists but a compatible Sum aggregation does, +/// capability matching should route to it and return a valid context. 
+#[test]
+fn capability_fallback_fires_when_no_config() {
+    let agg = make_agg_config(1, "cpu", "Sum", 300, "tumbling", &[]);
+    let engine = engine_no_query_configs("cpu", &[], vec![agg]);
+
+    // sum_over_time(cpu[5m]) — 5 min = 300 s matches the 300 s tumbling config
+    let ctx =
+        engine.build_query_execution_context_promql("sum_over_time(cpu[5m])".to_string(), 1000.0);
+    assert!(
+        ctx.is_some(),
+        "Expected capability matching to find a compatible aggregation"
+    );
+    assert_eq!(ctx.unwrap().agg_info.aggregation_id_for_value, 1);
+}
+
+/// When a query_config entry exists, the engine must use it (not capability matching).
+/// We verify this by giving the config a different agg_id than any compatible-by-type config.
+#[test]
+fn config_path_takes_priority_over_capability_matching() {
+    let agg = make_agg_config(42, "cpu", "Sum", 300, "tumbling", &[]);
+    let engine = engine_with_query_config("cpu", &[], agg, "sum_over_time(cpu[5m])");
+
+    let ctx = engine
+        .build_query_execution_context_promql("sum_over_time(cpu[5m])".to_string(), 1000.0)
+        .expect("should succeed via config path");
+
+    // The config path routes to agg_id=42
+    assert_eq!(ctx.agg_info.aggregation_id_for_value, 42);
+}
+
+/// Queries for quantile(0.5) and quantile(0.9) should both resolve to the same
+/// KLL aggregation when no query_configs are present.
+#[test] +fn quantile_different_values_resolve_to_same_aggregation() { + let kll = make_agg_config(7, "latency", "DatasketchesKLL", 300, "tumbling", &[]); + let engine = engine_no_query_configs("latency", &[], vec![kll]); + + let q50 = engine.build_query_execution_context_promql( + "quantile_over_time(0.5, latency[5m])".to_string(), + 1000.0, + ); + let q90 = engine.build_query_execution_context_promql( + "quantile_over_time(0.9, latency[5m])".to_string(), + 1000.0, + ); + + assert!( + q50.is_some(), + "quantile(0.5) should resolve via capability matching" + ); + assert!( + q90.is_some(), + "quantile(0.9) should resolve via capability matching" + ); + assert_eq!( + q50.unwrap().agg_info.aggregation_id_for_value, + q90.unwrap().agg_info.aggregation_id_for_value, + "Both quantile queries should route to the same KLL aggregation" + ); +} + +/// When no config entry exists and no compatible aggregation exists, return None. +#[test] +fn no_match_returns_none() { + // KLL config present, but query asks for Sum — incompatible + let kll = make_agg_config(1, "cpu", "DatasketchesKLL", 300, "tumbling", &[]); + let engine = engine_no_query_configs("cpu", &[], vec![kll]); + + let ctx = + engine.build_query_execution_context_promql("sum_over_time(cpu[5m])".to_string(), 1000.0); + assert!( + ctx.is_none(), + "Should return None when no compatible aggregation exists" + ); +} + +/// When multiple compatible aggregations exist, the largest window should be preferred. +#[test] +fn priority_largest_window_wins() { + let small = make_agg_config(1, "cpu", "Sum", 300, "tumbling", &[]); + let large = make_agg_config(2, "cpu", "Sum", 900, "tumbling", &[]); + let engine = engine_no_query_configs("cpu", &[], vec![small, large]); + + // sum_over_time(cpu[15m]) = 900 s — both 300 s and 900 s configs match (900 = 3×300), + // but the largest window (900 s, id=2) should be preferred. 
+ let ctx = engine + .build_query_execution_context_promql("sum_over_time(cpu[15m])".to_string(), 1000.0) + .expect("should find a compatible aggregation"); + + assert_eq!( + ctx.agg_info.aggregation_id_for_value, 2, + "The 900 s (id=2) aggregation should be preferred over the 300 s (id=1)" + ); +} diff --git a/asap-query-engine/src/tests/datafusion/plan_builder_regression_tests.rs b/asap-query-engine/src/tests/datafusion/plan_builder_regression_tests.rs index 6f3e4cf..f16835b 100644 --- a/asap-query-engine/src/tests/datafusion/plan_builder_regression_tests.rs +++ b/asap-query-engine/src/tests/datafusion/plan_builder_regression_tests.rs @@ -5,8 +5,9 @@ #[cfg(test)] mod tests { + use crate::data_model::AggregationIdInfo; use crate::engines::simple_engine::{ - AggregationIdInfo, QueryExecutionContext, QueryMetadata, StoreQueryParams, StoreQueryPlan, + QueryExecutionContext, QueryMetadata, StoreQueryParams, StoreQueryPlan, }; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::Statistic; diff --git a/asap-query-engine/src/tests/mod.rs b/asap-query-engine/src/tests/mod.rs index 9988d5f..9fbe6fc 100644 --- a/asap-query-engine/src/tests/mod.rs +++ b/asap-query-engine/src/tests/mod.rs @@ -1,3 +1,4 @@ +pub mod capability_matching_tests; pub mod clickhouse_forwarding_tests; pub mod datafusion; pub mod elastic_dsl_query_tests; diff --git a/asap-query-engine/src/tests/test_utilities/comparison.rs b/asap-query-engine/src/tests/test_utilities/comparison.rs index ed3cd3e..c5c5bdb 100644 --- a/asap-query-engine/src/tests/test_utilities/comparison.rs +++ b/asap-query-engine/src/tests/test_utilities/comparison.rs @@ -2,8 +2,9 @@ //! //! Provides assertion helpers for deep equality checking of query execution contexts. 
+use crate::data_model::AggregationIdInfo; use crate::engines::simple_engine::{ - AggregationIdInfo, QueryExecutionContext, QueryMetadata, StoreQueryParams, StoreQueryPlan, + QueryExecutionContext, QueryMetadata, StoreQueryParams, StoreQueryPlan, }; use promql_utilities::data_model::KeyByLabelNames; From 0e5655c9b285dea2a575c7f8d22a5fdfb6c620bb Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Thu, 2 Apr 2026 10:16:14 -0400 Subject: [PATCH 4/6] Added some debug statements --- Cargo.lock | 1 + .../rs/sketch_db_common/Cargo.toml | 1 + .../src/capability_matching.rs | 69 +++++++++++++++++-- 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc56da4..dc07a08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4156,6 +4156,7 @@ dependencies = [ "serde_json", "serde_yaml", "sql_utilities", + "tracing", ] [[package]] diff --git a/asap-common/dependencies/rs/sketch_db_common/Cargo.toml b/asap-common/dependencies/rs/sketch_db_common/Cargo.toml index fda61fd..c9c0ba6 100644 --- a/asap-common/dependencies/rs/sketch_db_common/Cargo.toml +++ b/asap-common/dependencies/rs/sketch_db_common/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] promql_utilities.workspace = true +tracing.workspace = true sql_utilities.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs b/asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs index 775ee29..1dce691 100644 --- a/asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs +++ b/asap-common/dependencies/rs/sketch_db_common/src/capability_matching.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::Statistic; +use tracing::{debug, warn}; use crate::aggregation_config::{AggregationConfig, AggregationIdInfo}; use crate::query_requirements::QueryRequirements; @@ -131,6 
+132,15 @@ pub fn find_compatible_aggregation( return None; } + debug!( + metric = %requirements.metric, + statistics = ?requirements.statistics, + data_range_ms = ?requirements.data_range_ms, + grouping_labels = ?requirements.grouping_labels.labels, + "capability matching: searching {} aggregation config(s)", + configs.len(), + ); + // For each statistic, collect configs that pass all filters, sorted by priority. let mut per_stat_candidates: Vec> = Vec::new(); @@ -141,7 +151,7 @@ pub fn find_compatible_aggregation( let mut candidates: Vec<&AggregationConfig> = configs .values() .filter(|c| { - c.metric == requirements.metric + let ok = c.metric == requirements.metric && types.contains(&c.aggregation_type.as_str()) && sub_type.is_none_or(|st| c.aggregation_sub_type == st) && window_compatible(c, requirements.data_range_ms) @@ -149,15 +159,41 @@ pub fn find_compatible_aggregation( && spatial_filter_compatible( &c.spatial_filter_normalized, &requirements.spatial_filter_normalized, - ) + ); + if !ok { + debug!( + agg_id = c.aggregation_id, + agg_type = %c.aggregation_type, + metric = %c.metric, + window_size_s = c.window_size, + "capability matching: rejected config for {:?}", + stat, + ); + } + ok }) .collect(); candidates.sort_by(|a, b| aggregation_priority(a, b)); if candidates.is_empty() { + warn!( + metric = %requirements.metric, + statistic = ?stat, + "capability matching: no compatible aggregation found for statistic", + ); return None; } + + debug!( + statistic = ?stat, + num_candidates = candidates.len(), + chosen_agg_id = candidates[0].aggregation_id, + chosen_agg_type = %candidates[0].aggregation_type, + chosen_window_size_s = candidates[0].window_size, + "capability matching: found candidates, chose best", + ); + per_stat_candidates.push(candidates); } @@ -166,11 +202,17 @@ pub fn find_compatible_aggregation( // For multi-statistic requirements, the remaining statistics must be served by a // config that agrees on window_size and grouping_labels with the 
chosen value agg. - for candidates in per_stat_candidates.iter().skip(1) { + for (i, candidates) in per_stat_candidates.iter().enumerate().skip(1) { let found = candidates.iter().any(|c| { c.window_size == value_agg.window_size && c.grouping_labels == value_agg.grouping_labels }); if !found { + warn!( + metric = %requirements.metric, + statistic = ?requirements.statistics[i], + required_window_size_s = value_agg.window_size, + "capability matching: no matching window/labels for multi-statistic requirement", + ); return None; } } @@ -178,13 +220,30 @@ pub fn find_compatible_aggregation( // If value type is multi-population, find the paired key aggregation. let key_agg: &AggregationConfig = if is_multi_population_value_type(&value_agg.aggregation_type) { - configs + let ka = configs .values() - .find(|c| c.metric == requirements.metric && is_key_agg_type(&c.aggregation_type))? + .find(|c| c.metric == requirements.metric && is_key_agg_type(&c.aggregation_type)); + if ka.is_none() { + warn!( + metric = %requirements.metric, + value_agg_type = %value_agg.aggregation_type, + "capability matching: multi-population value agg requires a key agg (SetAggregator/DeltaSetAggregator) but none found", + ); + } + ka? 
     } else {
         value_agg
     };
 
+    debug!(
+        metric = %requirements.metric,
+        value_agg_id = value_agg.aggregation_id,
+        value_agg_type = %value_agg.aggregation_type,
+        key_agg_id = key_agg.aggregation_id,
+        key_agg_type = %key_agg.aggregation_type,
+        "capability matching: resolved",
+    );
+
     Some(AggregationIdInfo {
         aggregation_id_for_value: value_agg.aggregation_id,
         aggregation_type_for_value: value_agg.aggregation_type.clone(),

From 0244096da452da328e18f5a8306ef0715e445c40 Mon Sep 17 00:00:00 2001
From: Milind Srivastava
Date: Thu, 2 Apr 2026 15:39:54 -0400
Subject: [PATCH 5/6] Updated docs

---
 .docs/CAPABILITY_MATCHING_DESIGN.md | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/.docs/CAPABILITY_MATCHING_DESIGN.md b/.docs/CAPABILITY_MATCHING_DESIGN.md
index 16dad3c..b870b09 100644
--- a/.docs/CAPABILITY_MATCHING_DESIGN.md
+++ b/.docs/CAPABILITY_MATCHING_DESIGN.md
@@ -2,13 +2,13 @@
 
 ## Problem Statement
 
-The query engine previously routed every incoming query to a sketch aggregation by matching the
-query string against a pre-configured `query_configs` table in `InferenceConfig`. This meant:
+Currently, when the query engine gets a query, it does an exact string match of that query against the inference config to find which aggregation ID to use. This causes two problems:
+1. It requires the inference config to exactly specify each and every query that can be supported.
+2. It does NOT handle ad-hoc queries for which we may still have a sketch that is computed.
-- Every distinct query string needed its own config entry, even when the same sketch could answer
-  multiple queries (e.g. `quantile(0.5, metric[5m])` and `quantile(0.9, metric[5m])` both need a
-  KLL sketch, but each required a separate config row).
-- The system could not answer any query it had not been explicitly pre-configured for.
+For instance, even when the same sketch can answer both `quantile(0.5, metric[5m])` and `quantile(0.9, metric[5m])`, we need two entries in the inference config.
+
+Moreover, if we now get `quantile(0.6, metric[5m])` or `quantile(0.5, metric[10m])`, those queries get punted even though we could support them.
 
 The goal: let the engine understand what a query *needs* and find an existing aggregation that can
 *provide* it, without requiring a one-to-one mapping in config.
@@ -30,6 +30,8 @@ The key insight: **all capability information lives in `AggregationConfig` insid
 The `QueryConfig` table is just indirection that requires manual pre-population. The fix is to
 skip it and match against `AggregationConfig` directly when no pre-configured entry exists.
 
+So the logic now is: (a) define what the query needs (`QueryRequirements`), then (b) match those `QueryRequirements` against all the available aggregation IDs to see if there is a match. There is no explicit `CapabilityProvidedBySketch` data structure.
+
 ---
 
 ## Design Questions and Answers

From 442b3a0becfe23d6af1b2190b49a5dc037f6470a Mon Sep 17 00:00:00 2001
From: Milind Srivastava
Date: Mon, 6 Apr 2026 10:20:57 -0400
Subject: [PATCH 6/6] Added asap-quickstart docker compose for devs, that
 builds ASAPQuery from scratch

---
 asap-quickstart/Makefile               | 12 ++++++
 asap-quickstart/docker-compose.dev.yml | 52 ++++++++++++++++++++++++++
 2 files changed, 64 insertions(+)
 create mode 100644 asap-quickstart/Makefile
 create mode 100644 asap-quickstart/docker-compose.dev.yml

diff --git a/asap-quickstart/Makefile b/asap-quickstart/Makefile
new file mode 100644
index 0000000..4059744
--- /dev/null
+++ b/asap-quickstart/Makefile
@@ -0,0 +1,12 @@
+up:
+	docker compose up -d
+
+up-dev:
+	docker build -t sketchdb-base:latest -f ../asap-common/installation/Dockerfile ../asap-common
+	docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d --build
+
+down:
+	docker compose down
+
+down-dev:
+	docker compose -f docker-compose.yml -f docker-compose.dev.yml down
diff --git a/asap-quickstart/docker-compose.dev.yml b/asap-quickstart/docker-compose.dev.yml
new file mode 100644
index 0000000..96b5fc0
--- /dev/null
+++ b/asap-quickstart/docker-compose.dev.yml
@@ -0,0 +1,52 @@
+name: asapquery-quickstart
+
+# Development override: builds all ASAP services from local source instead of
+# pulling pre-built images from ghcr.
+#
+# Usage:
+#   docker compose -f docker-compose.yml -f docker-compose.dev.yml up
+#
+# External users who just want the quickstart should use docker-compose.yml directly.
+
+services:
+  asap-planner-rs:
+    build:
+      context: ..
+      dockerfile: asap-planner-rs/Dockerfile
+
+  asap-summary-ingest:
+    build:
+      context: ../asap-summary-ingest
+
+  queryengine:
+    build:
+      context: ..
+      dockerfile: asap-query-engine/Dockerfile
+
+  fake-exporter-constant:
+    build:
+      context: ../asap-tools/data-sources/prometheus-exporters/fake_exporter/fake_exporter_rust/fake_exporter
+
+  fake-exporter-linear-up:
+    build:
+      context: ../asap-tools/data-sources/prometheus-exporters/fake_exporter/fake_exporter_rust/fake_exporter
+
+  fake-exporter-linear-down:
+    build:
+      context: ../asap-tools/data-sources/prometheus-exporters/fake_exporter/fake_exporter_rust/fake_exporter
+
+  fake-exporter-sine:
+    build:
+      context: ../asap-tools/data-sources/prometheus-exporters/fake_exporter/fake_exporter_rust/fake_exporter
+
+  fake-exporter-sine-noise:
+    build:
+      context: ../asap-tools/data-sources/prometheus-exporters/fake_exporter/fake_exporter_rust/fake_exporter
+
+  fake-exporter-step:
+    build:
+      context: ../asap-tools/data-sources/prometheus-exporters/fake_exporter/fake_exporter_rust/fake_exporter
+
+  fake-exporter-exp-up:
+    build:
+      context: ../asap-tools/data-sources/prometheus-exporters/fake_exporter/fake_exporter_rust/fake_exporter
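
The selection policy this patch series implements — filter `AggregationConfig`s by metric, aggregation type, and window compatibility, then apply the Q1 priority rule (largest `window_size` wins) — can be sketched in isolation as follows. This is a simplified stand-in, not the real `capability_matching.rs` code: the struct fields, `window_compatible` rule, and function names are illustrative, and the real matcher also checks grouping labels, spatial filters, and aggregation sub-types.

```rust
/// Simplified stand-in for the real AggregationConfig in sketch_db_common.
struct AggregationConfig {
    aggregation_id: u64,
    aggregation_type: String,
    metric: String,
    window_size: u64, // seconds
}

/// Assumed compatibility rule: a stored window serves the queried range when it
/// evenly tiles it (e.g. a 300 s tumbling window can serve a 900 s range).
fn window_compatible(cfg: &AggregationConfig, data_range_s: u64) -> bool {
    cfg.window_size > 0 && cfg.window_size <= data_range_s && data_range_s % cfg.window_size == 0
}

/// Filter by metric/type/window, then prefer the largest window (Q1 policy).
fn find_compatible<'a>(
    configs: &'a [AggregationConfig],
    metric: &str,
    agg_type: &str,
    data_range_s: u64,
) -> Option<&'a AggregationConfig> {
    configs
        .iter()
        .filter(|c| c.metric == metric && c.aggregation_type == agg_type)
        .filter(|c| window_compatible(c, data_range_s))
        .max_by_key(|c| c.window_size)
}

fn main() {
    let configs = vec![
        AggregationConfig { aggregation_id: 1, aggregation_type: "Sum".into(), metric: "cpu".into(), window_size: 300 },
        AggregationConfig { aggregation_id: 2, aggregation_type: "Sum".into(), metric: "cpu".into(), window_size: 900 },
    ];
    // sum_over_time(cpu[15m]) → 900 s range: both configs tile it, so the
    // largest window (900 s, id=2) is chosen — mirroring priority_largest_window_wins.
    let chosen = find_compatible(&configs, "cpu", "Sum", 900).unwrap();
    assert_eq!(chosen.aggregation_id, 2);
    println!("chose aggregation {}", chosen.aggregation_id);
}
```

The same comparator shape is what the design doc's Q1 answer isolates into a swappable `aggregation_priority` function, so the "largest window wins" policy can be replaced without touching the filtering.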