From d47d60fabc1b37784860decdd964e6c6045134e5 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Mon, 16 Mar 2026 16:54:36 -0400 Subject: [PATCH 01/14] Simple parsing logic for static set of Elastic DSL queries. --- Cargo.lock | 8 + Cargo.toml | 2 + .../rs/elastic_dsl_utilities/Cargo.toml | 8 + .../rs/elastic_dsl_utilities/src/lib.rs | 7 + .../rs/elastic_dsl_utilities/src/parsing.rs | 338 +++++++++++++++ .../rs/elastic_dsl_utilities/src/pattern.rs | 388 ++++++++++++++++++ .../rs/elastic_dsl_utilities/src/types.rs | 111 +++++ .../supported_es_queries.md | 168 ++++++++ 8 files changed, 1030 insertions(+) create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/pattern.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/supported_es_queries.md diff --git a/Cargo.lock b/Cargo.lock index f8793a9..5f3eee0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1461,6 +1461,14 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "elastic_dsl_utilities" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "encoding_rs" version = "0.8.35" diff --git a/Cargo.toml b/Cargo.toml index 5101fb6..b082be7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "asap-common/sketch-core", "asap-common/dependencies/rs/promql_utilities", "asap-common/dependencies/rs/sql_utilities", + "asap-common/dependencies/rs/elastic_dsl_utilities", "asap-common/dependencies/rs/sketch_db_common", "asap-common/dependencies/rs/datafusion_summary_library", "asap-query-engine", @@ -33,3 +34,4 @@ promql_utilities = { path = "asap-common/dependencies/rs/promql_utilities" } sql_utilities = { path = "asap-common/dependencies/rs/sql_utilities" } sketch_db_common = { path = "asap-common/dependencies/rs/sketch_db_common" } datafusion_summary_library = { path = "asap-common/dependencies/rs/datafusion_summary_library" } +elastic_dsl_utilities = { path = "asap-common/dependencies/rs/elastic_dsl_utilities" } diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml new file mode 100644 index 0000000..73b6cce --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "elastic_dsl_utilities" +edition.workspace = true +version.workspace = true + +[dependencies] +serde.workspace = true +serde_json.workspace = true diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs new file mode 100644 index 0000000..c03929f --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -0,0 +1,7 @@ +pub mod parsing; +pub mod pattern; +pub mod types; + +pub use parsing::*; +pub use pattern::{classify, parse_and_classify}; +pub use types::*; diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs new file mode 100644 index 0000000..de87035 --- /dev/null +++ 
b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs @@ -0,0 +1,338 @@ +use serde_json::Value; + +use crate::types::{BucketSpec, LabelFilter, MetricAggType, MetricAggregation, TimeRange}; + +// --------------------------------------------------------------------------- +// Metric aggregation helpers +// --------------------------------------------------------------------------- + +/// Try to extract a list of metric aggregations from the top-level `"aggs"` +/// object of a query. Returns `None` if *any* aggregation entry is not one of +/// the recognised metric types (avg / min / max / sum / percentiles). +pub fn extract_metric_aggs(aggs: &Value) -> Option<Vec<MetricAggregation>> { + let obj = aggs.as_object()?; + if obj.is_empty() { + return None; + } + + let mut result = Vec::with_capacity(obj.len()); + for (result_name, agg_body) in obj { + // Each aggregation body is an object that should contain exactly one + // recognised metric aggregation key. + let body_obj = agg_body.as_object()?; + let mut found = None; + for (key, inner) in body_obj { + if let Some(agg_type) = MetricAggType::from_str(key) { + let field = inner.get("field")?.as_str()?.to_owned(); + found = Some(MetricAggregation { + result_name: result_name.clone(), + agg_type, + field, + }); + break; + } + } + result.push(found?); + } + Some(result) +} + +// --------------------------------------------------------------------------- +// Time range helpers +// --------------------------------------------------------------------------- + +/// Try to extract a `TimeRange` from a bare `{"range": {"<field>": {...}}}` +/// query value. Accepts either string or numeric values for gte/lte. +pub fn extract_time_range(query: &Value) -> Option<TimeRange> { + let range_obj = query.get("range")?.as_object()?; + // There should be exactly one field entry in the range object. + if range_obj.len() != 1 { + return None; + } + let (field, bounds) = range_obj.iter().next()?; + let gte = bounds.get("gte").and_then(value_to_string); + let lte = bounds.get("lte").and_then(value_to_string); + Some(TimeRange { + field: field.clone(), + gte, + lte, + }) +} + +fn value_to_string(v: &Value) -> Option<String> { + match v { + Value::String(s) => Some(s.clone()), + Value::Number(n) => Some(n.to_string()), + _ => None, + } +} + +// --------------------------------------------------------------------------- +// Term / label-filter helpers +// --------------------------------------------------------------------------- + +/// Strip the `.keyword` suffix from a field name, if present. +fn strip_keyword_suffix(field: &str) -> &str { + field.strip_suffix(".keyword").unwrap_or(field) +} + +/// Try to extract a `LabelFilter` from a single `"term"` query object. +/// +/// Handles both the opensearch-dsl long form: +/// ```json +/// { "term": { "field": { "value": "val" } } } +/// ``` +/// and the ES shorthand: +/// ```json +/// { "term": { "field": "val" } } +/// ``` +pub fn extract_label_filter_from_term(term_query: &Value) -> Option<LabelFilter> { + let term_obj = term_query.get("term")?.as_object()?; + if term_obj.len() != 1 { + return None; + } + let (raw_field, field_value) = term_obj.iter().next()?; + let field = strip_keyword_suffix(raw_field).to_owned(); + let value = if let Some(s) = field_value.as_str() { + // Shorthand: "field": "value" + s.to_owned() + } else if let Some(inner) = field_value.as_object() { + // Long form: "field": { "value": "..."
} + inner.get("value")?.as_str()?.to_owned() + } else { + return None; + }; + Some(LabelFilter { field, value }) +} + +// --------------------------------------------------------------------------- +// Bool filter helpers +// --------------------------------------------------------------------------- + +/// Try to extract a list of label filters (and optionally a time range) from a +/// `{"bool": {"filter": [...]}}` query structure. +/// +/// The `filter` array must contain at least a term query, and may also contain +/// a range query. Additional (unrecognised) entries in the array cause this +/// function to return `None`. +pub fn extract_label_filters(query: &Value) -> Option<(Vec, Option)> { + let filter_clauses = query.get("bool")?.get("filter")?; + + // The filter value may be an array (multiple clauses) or a single object. + let clauses: Vec<&Value> = if let Some(arr) = filter_clauses.as_array() { + arr.iter().collect() + } else if filter_clauses.is_object() { + vec![filter_clauses] + } else { + return None; + }; + + let mut label_filters: Vec = Vec::new(); + let mut time_range: Option = None; + + for clause in clauses { + if clause.get("term").is_some() { + label_filters.push(extract_label_filter_from_term(clause)?); + } else if clause.get("range").is_some() { + if time_range.is_some() { + continue; // Multiple range clauses - ignore all but the first. + } + time_range = Some(extract_time_range(clause)?); + } else { + // Unknown clause type in the filter. + return None; + } + } + + Some((label_filters, time_range)) +} + +// --------------------------------------------------------------------------- +// Batched-filters helpers +// --------------------------------------------------------------------------- + +/// Try to extract a batched filters aggregation (essentially the groupby buckets) from the top-level `"aggs"` +/// object. +/// +/// Expected shape: +/// ```json +/// { +/// "aggs": { +/// "": { +/// "filters": { +/// "filters": { +/// "": { "term": { ... } }, +/// "": { "term": { ... } } +/// } +/// }, +/// "aggs": { ... metric aggs ... } +/// } +/// } +/// } +/// ``` +/// +/// Returns `(result_name, buckets, metric_aggregations)` on success. +pub fn extract_batched_filters( + aggs: &Value, +) -> Option<(String, Vec, Vec)> { + let obj = aggs.as_object()?; + // There must be exactly one top-level aggregation entry. + if obj.len() != 1 { + return None; + } + let (result_name, agg_body) = obj.iter().next()?; + + // The body must have a "filters" key (the bucket aggregation type). + let filters_agg = agg_body.get("filters")?; + let filters_map = filters_agg.get("filters")?.as_object()?; + + let mut buckets = Vec::with_capacity(filters_map.len()); + for (bucket_name, bucket_filter) in filters_map { + let label_filter = extract_label_filter_from_term(bucket_filter)?; + buckets.push(BucketSpec { + bucket_name: bucket_name.clone(), + filter: label_filter, + }); + } + + if buckets.is_empty() { + return None; + } + + // The nested "aggs" holds the metric sub-aggregations. 
+ let nested_aggs = agg_body.get("aggs").unwrap_or(&Value::Null); + let metric_aggs = extract_metric_aggs(nested_aggs)?; + + Some((result_name.clone(), buckets, metric_aggs)) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_extract_metric_aggs_basic() { + let aggs = json!({ + "avg_latency": { "avg": { "field": "latency_ms" } }, + "max_latency": { "max": { "field": "latency_ms" } }, + "p95_latency": { "percentiles": { "field": "latency_ms", "percents": [95] } } + }); + let result = extract_metric_aggs(&aggs).unwrap(); + assert_eq!(result.len(), 3); + let avg = result.iter().find(|a| a.result_name == "avg_latency").unwrap(); + assert_eq!(avg.agg_type, MetricAggType::Avg); + assert_eq!(avg.field, "latency_ms"); + let p95 = result.iter().find(|a| a.result_name == "p95_latency").unwrap(); + assert_eq!(p95.agg_type, MetricAggType::Percentiles); + assert_eq!(p95.field, "latency_ms"); + } + + #[test] + fn test_extract_metric_aggs_rejects_unknown_type() { + let aggs = json!({ + "by_service": { "terms": { "field": "service" } } + }); + assert!(extract_metric_aggs(&aggs).is_none()); + } + + #[test] + fn test_extract_time_range() { + let query = json!({ + "range": { + "@timestamp": { "gte": "now-30s", "lte": "now" } + } + }); + let tr = extract_time_range(&query).unwrap(); + assert_eq!(tr.field, "@timestamp"); + assert_eq!(tr.gte.as_deref(), Some("now-30s")); + assert_eq!(tr.lte.as_deref(), Some("now")); + } + + #[test] + fn test_extract_label_filter_long_form() { + let term = json!({ "term": { "service.keyword": { "value": "frontend" } } }); + let f = extract_label_filter_from_term(&term).unwrap(); + assert_eq!(f.field, "service"); + assert_eq!(f.value, "frontend"); + } + + #[test] + fn test_extract_label_filter_shorthand() { + let term = json!({ "term": { "env": "production" } }); + let f = extract_label_filter_from_term(&term).unwrap(); + assert_eq!(f.field, "env"); + assert_eq!(f.value, "production"); + } + + #[test] + fn test_extract_bool_filter_term_and_range() { + let query = json!({ + "bool": { + "filter": [ + { "term": { "service.keyword": { "value": "frontend" } } }, + { "term": { "env.keyword": { "value": "production" } } }, + { "range": { "@timestamp": { "gte": "now-30s", "lte": "now" } } } + ] + } + }); + let (lf, tr) = extract_label_filters(&query).unwrap(); + assert_eq!(lf[0].field, "service"); + assert_eq!(lf[0].value, "frontend"); + assert_eq!(lf[1].field, "env"); + assert_eq!(lf[1].value, "production"); + let tr = tr.unwrap(); + assert_eq!(tr.field, "@timestamp"); + } + + #[test] + fn test_extract_bool_filter_term_only() { + let query = json!({ + "bool": { + "filter": [ + { "term": { "env": "staging" } } + ] + } + }); + let (lf, tr) = extract_label_filters(&query).unwrap(); + assert_eq!(lf[0].field, "env"); + assert_eq!(lf[0].value, "staging"); + assert!(tr.is_none()); + } + + #[test] + fn test_extract_bool_filter_single_object() { + // filter as a plain object (not array) + let query = json!({ + "bool": { + "filter": { "term": { "region": "us-east-1" } } + } + }); + let (lf, tr) = extract_label_filters(&query).unwrap(); + assert_eq!(lf[0].field, "region"); + assert_eq!(lf[0].value, "us-east-1"); + assert!(tr.is_none()); + } + + #[test] + fn test_extract_batched_filters() { + let aggs = json!({ + "by_service": { + "filters": { + "filters": { + "frontend": { "term": { "service.keyword": { "value": "frontend" } } }, + "backend": { "term": { "service.keyword": { "value": "backend" } } } + } + }, + "aggs": { + "avg_latency": { "avg": { 
"field": "latency_ms" } } + } + } + }); + let (name, buckets, metric_aggs) = extract_batched_filters(&aggs).unwrap(); + assert_eq!(name, "by_service"); + assert_eq!(buckets.len(), 2); + assert_eq!(metric_aggs.len(), 1); + assert_eq!(metric_aggs[0].agg_type, MetricAggType::Avg); + } +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/pattern.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/pattern.rs new file mode 100644 index 0000000..0656840 --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/pattern.rs @@ -0,0 +1,388 @@ +use serde_json::Value; + +use crate::{ + parsing::{ + extract_batched_filters, extract_label_filters, extract_metric_aggs, + extract_time_range, + }, + types::EsDslQueryPattern, +}; + +/// Classify a parsed ES DSL query `Value` into one of the recognised +/// sketch-acceleratable patterns (or `Unknown` if it does not match any). +/// +/// The classification logic follows the three templates documented in +/// `supported_es_queries.md`: +/// +/// - **Template 1** (`SimpleAggregation`): `size=0`, top-level metric `aggs` +/// (avg/min/max/sum/percentiles), optional bare `range` query. +/// - **Template 2** (`FilteredAggregation`): `size=0`, top-level metric `aggs` +/// (avg/min/max/sum/percentiles), +/// `bool.filter` query combining a `term` label filter and an optional +/// `range` time filter. +/// - **Template 3** (`FilteredAggregationBatched`): `size=0`, single top-level +/// `filters` bucket aggregation with named buckets, nested metric sub-aggs, +/// optional bare `range` top-level query. +/// +/// TODO: More robust parsing logic and complex pattern support (e.g. generic pattern building, structured AST, etc). +pub fn classify(value: &Value) -> EsDslQueryPattern { + // Gate: size must be explicitly 0. + match value.get("size") { + Some(Value::Number(n)) => { + if n.as_u64() != Some(0) { + return EsDslQueryPattern::Unknown; + } + } + _ => return EsDslQueryPattern::Unknown, + } + + + let aggs = value.get("aggs").unwrap_or(&Value::Null); + let query = value.get("query"); + + // ------------------------------------------------------------------ + // Template 3: batched filters aggregation. + // ------------------------------------------------------------------ + if let Some((result_name, buckets, aggregations)) = extract_batched_filters(aggs) { + // Allow an optional top-level range query alongside the batched aggs. + let time_range = query.and_then(|q| extract_time_range(q)); + // If there *is* a query but it's not a range, reject the match. + if query.is_some() && time_range.is_none() && query != Some(&Value::Null) { + // Non-range query next to batched filters — not a supported pattern. + } else { + return EsDslQueryPattern::FilteredAggregationBatched { + result_name, + buckets, + time_range, + aggregations, + }; + } + } + + // ------------------------------------------------------------------ + // Templates 1 & 2 require metric-only top-level aggregations. + // ------------------------------------------------------------------ + let aggregations = match extract_metric_aggs(aggs) { + Some(a) => a, + None => return EsDslQueryPattern::Unknown, + }; + + match query { + // No query clause at all -> Template 1 without time range. + None => EsDslQueryPattern::SimpleAggregation { + time_range: None, + aggregations, + }, + + Some(q) => { + // Template 2: bool.filter with term (+ optional range). 
+ if let Some((label_filters, time_range)) = extract_label_filters(q) { + return EsDslQueryPattern::FilteredAggregation { + label_filters, + time_range, + aggregations, + }; + } + + // Template 1: bare range query. + if let Some(time_range) = extract_time_range(q) { + return EsDslQueryPattern::SimpleAggregation { + time_range: Some(time_range), + aggregations, + }; + } + + // Query is present but doesn't match any supported form. + EsDslQueryPattern::Unknown + } + } +} + +/// Parse a raw JSON string as an ES DSL query and classify it into a sketch-acceleratable pattern, returning the extracted structured components if successful. +/// +/// Returns a `serde_json::Error` if the input is not valid JSON. +pub fn parse_and_classify(json: &str) -> Result<EsDslQueryPattern, serde_json::Error> { + let value: Value = serde_json::from_str(json)?; + Ok(classify(&value)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::{LabelFilter, MetricAggType}; + use serde_json::json; + + // ----------------------------------------------------------------------- + // Template 1 — Simple Aggregation + // ----------------------------------------------------------------------- + + #[test] + fn test_t1_simple_agg_with_time_range() { + let query = json!({ + "size": 0, + "query": { + "range": { + "@timestamp": { "gte": "now-30s", "lte": "now" } + } + }, + "aggs": { + "avg_latency": { "avg": { "field": "latency_ms" } }, + "max_latency": { "max": { "field": "latency_ms" } } + } + }); + + let pattern = classify(&query); + match pattern { + EsDslQueryPattern::SimpleAggregation { time_range, aggregations } => { + let tr = time_range.unwrap(); + assert_eq!(tr.field, "@timestamp"); + assert_eq!(tr.gte.as_deref(), Some("now-30s")); + assert_eq!(tr.lte.as_deref(), Some("now")); + assert_eq!(aggregations.len(), 2); + } + other => panic!("Expected SimpleAggregation, got {:?}", other), + } + } + + #[test] + fn test_t1_simple_agg_no_query() { + let query = json!({ + "size": 0, + "aggs": { + "total_bytes": { "sum": { "field": "bytes" } } + } + }); + + let pattern = classify(&query); + match pattern { + EsDslQueryPattern::SimpleAggregation { time_range, aggregations } => { + assert!(time_range.is_none()); + assert_eq!(aggregations.len(), 1); + assert_eq!(aggregations[0].agg_type, MetricAggType::Sum); + assert_eq!(aggregations[0].field, "bytes"); + } + other => panic!("Expected SimpleAggregation, got {:?}", other), + } + } + + #[test] + fn test_t1_percentiles_aggregation() { + let query = json!({ + "size": 0, + "aggs": { + "p95_latency": { "percentiles": { "field": "latency_ms", "percents": [95] } } + } + }); + + let pattern = classify(&query); + match pattern { + EsDslQueryPattern::SimpleAggregation { time_range, aggregations } => { + assert!(time_range.is_none()); + assert_eq!(aggregations.len(), 1); + assert_eq!(aggregations[0].agg_type, MetricAggType::Percentiles); + assert_eq!(aggregations[0].field, "latency_ms"); + } + other => panic!("Expected SimpleAggregation, got {:?}", other), + } + } + + #[test] + fn test_neg_size_absent_is_unknown() { + let query = json!({ + "aggs": { + "min_val": { "min": { "field": "response_time" } } + } + }); + + assert_eq!(classify(&query), EsDslQueryPattern::Unknown); + } + + // ----------------------------------------------------------------------- + // Template 2 — Filtered Aggregation + // ----------------------------------------------------------------------- + + #[test] + fn test_t2_filtered_agg_term_and_range() { + let query = json!({ + "size": 0, + "query": { + "bool": { + "filter": [ + { "term": {
"service.keyword": { "value": "frontend" } } }, + { "term": { "env.keyword": { "value": "staging" } } }, + { "range": { "@timestamp": { "gte": "now-30s", "lte": "now" } } } + ] + } + }, + "aggs": { + "avg_latency": { "avg": { "field": "latency_ms" } } + } + }); + + let pattern = classify(&query); + match pattern { + EsDslQueryPattern::FilteredAggregation { label_filters, time_range, aggregations } => { + assert_eq!(label_filters[0].field, "service"); + assert_eq!(label_filters[0].value, "frontend"); + assert_eq!(label_filters[1].field, "env"); + assert_eq!(label_filters[1].value, "staging"); + let tr = time_range.unwrap(); + assert_eq!(tr.field, "@timestamp"); + assert_eq!(aggregations.len(), 1); + } + other => panic!("Expected FilteredAggregation, got {:?}", other), + } + } + + #[test] + fn test_t2_filtered_agg_term_only() { + let query = json!({ + "size": 0, + "query": { + "bool": { + "filter": [ + { "term": { "env": "staging" } } + ] + } + }, + "aggs": { + "p99_latency": { "max": { "field": "latency_ms" } } + } + }); + + let pattern = classify(&query); + match pattern { + EsDslQueryPattern::FilteredAggregation { label_filters, time_range, aggregations } => { + assert_eq!(label_filters[0], LabelFilter { field: "env".into(), value: "staging".into() }); + assert!(time_range.is_none()); + assert_eq!(aggregations.len(), 1); + } + other => panic!("Expected FilteredAggregation, got {:?}", other), + } + } + + // ----------------------------------------------------------------------- + // Template 3 — Filtered Aggregation Batched + // ----------------------------------------------------------------------- + + #[test] + fn test_t3_batched_filters() { + let query = json!({ + "size": 0, + "aggs": { + "by_service": { + "filters": { + "filters": { + "frontend": { "term": { "service.keyword": { "value": "frontend" } } }, + "backend": { "term": { "service.keyword": { "value": "backend" } } } + } + }, + "aggs": { + "avg_latency": { "avg": { "field": "latency_ms" } } + } + } + } + }); + + let pattern = classify(&query); + match pattern { + EsDslQueryPattern::FilteredAggregationBatched { result_name, buckets, time_range, aggregations } => { + assert_eq!(result_name, "by_service"); + assert_eq!(buckets.len(), 2); + assert!(time_range.is_none()); + assert_eq!(aggregations.len(), 1); + assert_eq!(aggregations[0].agg_type, MetricAggType::Avg); + } + other => panic!("Expected FilteredAggregationBatched, got {:?}", other), + } + } + + #[test] + fn test_t3_batched_filters_with_time_range() { + let query = json!({ + "size": 0, + "query": { + "range": { + "@timestamp": { "gte": "now-1m", "lte": "now" } + } + }, + "aggs": { + "by_region": { + "filters": { + "filters": { + "us-east": { "term": { "region": "us-east-1" } }, + "us-west": { "term": { "region": "us-west-2" } } + } + }, + "aggs": { + "total_requests": { "sum": { "field": "request_count" } } + } + } + } + }); + + let pattern = classify(&query); + match pattern { + EsDslQueryPattern::FilteredAggregationBatched { time_range, aggregations, .. 
} => { + let tr = time_range.unwrap(); + assert_eq!(tr.field, "@timestamp"); + assert_eq!(tr.gte.as_deref(), Some("now-1m")); + assert_eq!(aggregations[0].agg_type, MetricAggType::Sum); + } + other => panic!("Expected FilteredAggregationBatched, got {:?}", other), + } + } + + // ----------------------------------------------------------------------- + // Negative cases + // ----------------------------------------------------------------------- + + #[test] + fn test_neg_size_nonzero_is_unknown() { + let query = json!({ + "size": 10, + "aggs": { + "avg_val": { "avg": { "field": "cpu" } } + } + }); + assert_eq!(classify(&query), EsDslQueryPattern::Unknown); + } + + #[test] + fn test_neg_unknown_agg_type_is_unknown() { + let query = json!({ + "size": 0, + "aggs": { + "by_service": { "terms": { "field": "service" } } + } + }); + assert_eq!(classify(&query), EsDslQueryPattern::Unknown); + } + + #[test] + fn test_neg_unsupported_query_type_is_unknown() { + // A match query in the top-level query is not a supported pattern. + let query = json!({ + "size": 0, + "query": { + "match": { "message": "error" } + }, + "aggs": { + "count": { "sum": { "field": "bytes" } } + } + }); + assert_eq!(classify(&query), EsDslQueryPattern::Unknown); + } + + #[test] + fn test_parse_and_classify_roundtrip() { + let json = r#"{"size":0,"aggs":{"avg_cpu":{"avg":{"field":"cpu_usage"}}}}"#; + let result = parse_and_classify(json).unwrap(); + assert!(matches!(result, EsDslQueryPattern::SimpleAggregation { .. })); + } + + #[test] + fn test_parse_and_classify_invalid_json() { + assert!(parse_and_classify("{invalid}").is_err()); + } +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs new file mode 100644 index 0000000..17c41c7 --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs @@ -0,0 +1,111 @@ +use serde::{Deserialize, Serialize}; + +/// The metric aggregation function type. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum MetricAggType { + Avg, + Min, + Max, + Sum, + Percentiles, +} + +impl MetricAggType { + /// Returns the JSON key name for this aggregation type. + pub fn as_str(&self) -> &'static str { + match self { + MetricAggType::Avg => "avg", + MetricAggType::Min => "min", + MetricAggType::Max => "max", + MetricAggType::Sum => "sum", + MetricAggType::Percentiles => "percentiles", + } + } + + /// Try to parse from a string key. + pub fn from_str(s: &str) -> Option<MetricAggType> { + match s { + "avg" => Some(MetricAggType::Avg), + "min" => Some(MetricAggType::Min), + "max" => Some(MetricAggType::Max), + "sum" => Some(MetricAggType::Sum), + "percentiles" => Some(MetricAggType::Percentiles), + _ => None, + } + } +} + +/// A simple equality filter on a label (string-valued field). +/// The `.keyword` suffix is stripped from the field name. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LabelFilter { + pub field: String, + pub value: String, +} + +/// An optional time range applied to a timestamp field. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TimeRange { + pub field: String, + pub gte: Option<String>, + pub lte: Option<String>, +} + +/// A single metric aggregation extracted from an ES query. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct MetricAggregation { + /// The top-level aggregation result key (the name given by the user).
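+ /// e.g. "avg_latency" in `{"avg_latency": {"avg": {"field": "latency_ms"}}}`.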
+ pub result_name: String, + pub agg_type: MetricAggType, + /// The document field being aggregated over. + pub field: String, +} + +/// One bucket in a batched-filter (multi-bucket) aggregation. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct BucketSpec { + pub bucket_name: String, + pub filter: LabelFilter, +} + +/// The classified pattern of an ES DSL query, along with the extracted +/// structured components needed to route it to a sketch fast-path. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum EsDslQueryPattern { + /// Template 1: metric aggregations over all data, with an optional time + /// range filter. + /// + /// ES: `{ "size": 0, "query": { "range": {...} }, "aggs": { ... } }` + SimpleAggregation { + time_range: Option<TimeRange>, + aggregations: Vec<MetricAggregation>, + }, + + /// Template 2: metric aggregations with a label equality filter plus an + /// optional time range, expressed as a bool filter. + /// + /// ES: `{ "size": 0, "query": { "bool": { "filter": [...] } }, "aggs": { ... } }` + FilteredAggregation { + label_filters: Vec<LabelFilter>, + time_range: Option<TimeRange>, + aggregations: Vec<MetricAggregation>, + }, + + /// Template 3: a single top-level bucket aggregation that groups documents + /// into named buckets via per-bucket term filters, with nested metric + /// sub-aggregations. + /// + /// ES: `{ "size": 0, "aggs": { "<name>": { "filters": { "filters": {...} }, + /// "aggs": { ... } } } }` + FilteredAggregationBatched { + /// The name of the outer (bucket) aggregation. + result_name: String, + buckets: Vec<BucketSpec>, + time_range: Option<TimeRange>, + aggregations: Vec<MetricAggregation>, + }, + + /// The query did not match any recognised sketch-acceleratable pattern. + Unknown, +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/supported_es_queries.md b/asap-common/dependencies/rs/elastic_dsl_utilities/supported_es_queries.md new file mode 100644 index 0000000..1cfaa7e --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/supported_es_queries.md @@ -0,0 +1,168 @@ +# (ASAP) Potential Elasticsearch Queries to be Supported + +Info dump about what Elasticsearch queries could be supported by ASAP based on my understanding of the query engine. + +## Supportable Query Types + +Elastic DSL single level aggregation queries that explicitly don't return any records (`"size": 0`), with basic filtering by column label values, seem the most similar to the Prometheus style queries, so they could maybe be translated. + +Here is a list of ES aggregation functions that seem to map well to the current sketches available: + +- `"percentiles"` (quantiles) +- `"min"` +- `"max"` +- `"sum"` +- `"avg"` + +## Query Templates + +Templates outlining the basic structure for various kinds of Elasticsearch queries. For the following examples, we use `${name}` syntax to denote dynamic/user provided variables. In each query, the time range specifier would be optional. + +### 1\. Simple Aggregation + +Compute summary statistics on all data for one or more data columns (metrics). + +```json +{ + "size": 0, + "query": { + "range": { + "@timestamp": { + "gte": "now-30s", + "lte": "now" + } + } + }, + "aggs": { + "${result1}": { + "${agg_type1}": { + "field": "${metric_name1}" + } + }, + "${result2}": { + "${agg_type2}": { + "field": "${metric_name2}", + "${param1}": "${arg1}" + } + } + } +} +``` + +This is semantically equivalent to the following SQL. + +```sql +SELECT + AGG1(metric_name1) AS result1, + AGG2(metric_name2) AS result2 +FROM table_name +WHERE time_created >= NOW() - INTERVAL '30 seconds'; +``` + +### 2\.
Filtered Aggregation + +Compute summary statistics for metrics over a specific combination of label values. + +```json +{ + "size": 0, + "query": { + "bool": { + "filter": { + "term": { "${field1}.keyword": "${value1}" }, + "range": { + "@timestamp": { + "gte": "now-30s", + "lte": "now" + } + } + } + } + }, + "aggs": { + "${result1}": { + "${agg_type1}": { + "field": "${metric_name1}" + } + }, + "${result2}": { + "${agg_type2}": { + "field": "${metric_name2}", + "${param1}": "${arg1}" + } + } + } +} +``` + +The corresponding SQL is as follows. + +```sql +SELECT + AGG1(metric_name1) AS result1, + AGG2(metric_name2) AS result2 +FROM table_name +WHERE field1 = value1 AND time_created >= NOW() - INTERVAL '30 seconds'; +``` + +### 3\. Filtered Aggregation (Batched) + +Compute summary statistics for a metric, grouping by column labels. + +```json +{ + "size": 0, + "query": { + "range": { + "@timestamp": { + "gte": "now-30s", + "lte": "now" + } + } + }, + "aggs": { + "${result1}": { + "filters": { + "filters": { + "${bucket1}": { + "term": { "${field1}.keyword": "${value1}" } + }, + "${bucket2}": { + "term": { "${field1}.keyword": "${value2}" } + } + } + }, + "aggs": { + "${agg_name1}": { + "${agg_type1}": { + "field": "${metric_name1}" + } + } + } + } + } +} +``` + +Here is the corresponding SQL. + +```sql +SELECT + bucket, + AGG1(metric_name1) AS agg_name1 +FROM ( + SELECT + CASE + WHEN field1 = @value1 THEN 'bucket1' + WHEN field1 = @value2 THEN 'bucket2' + ELSE 'drop' + END AS bucket, + metric_name1 + FROM table_name + WHERE time_created >= NOW() - INTERVAL '30 seconds' +) +WHERE bucket != 'drop' +GROUP BY bucket; +``` + +Of course, if you are bucketing by every unique value, then the above statement reduces to a regular `GROUP BY`. From 7d66d8c1614100bc2ae91a1097d9dba5612f7451 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Mon, 16 Mar 2026 20:20:44 -0400 Subject: [PATCH 02/14] Extract aggregation func kwargs during ES DSL parsing.
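For example, given {"percentiles": {"field": "latency_ms", "percents": [95]}}, every key other than "field" (here "percents") is collected into an optional params object on MetricAggregation, so downstream code can read the percentile argument.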
--- .../rs/elastic_dsl_utilities/src/parsing.rs | 7 +++++++ .../rs/elastic_dsl_utilities/src/types.rs | 1 + 2 files changed, 8 insertions(+) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs index de87035..5f00c45 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs @@ -24,10 +24,16 @@ for (key, inner) in body_obj { if let Some(agg_type) = MetricAggType::from_str(key) { let field = inner.get("field")?.as_str()?.to_owned(); + let kwargs_map = inner.as_object()?.iter() + .filter(|(k, _)| *k != "field") + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let kwargs = serde_json::Value::Object(kwargs_map); found = Some(MetricAggregation { result_name: result_name.clone(), agg_type, field, + params: Some(kwargs), }); break; } @@ -226,6 +232,7 @@ mod tests { let p95 = result.iter().find(|a| a.result_name == "p95_latency").unwrap(); assert_eq!(p95.agg_type, MetricAggType::Percentiles); assert_eq!(p95.field, "latency_ms"); + assert_eq!(p95.params.as_ref().unwrap().get("percents").unwrap(), &json!([95])); } #[test] diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs index 17c41c7..9dd6091 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs @@ -60,6 +60,7 @@ pub struct MetricAggregation { pub agg_type: MetricAggType, /// The document field being aggregated over. pub field: String, + pub params: Option<serde_json::Value>, // Optional additional parameters (e.g. percentiles values) } /// One bucket in a batched-filter (multi-bucket) aggregation. From f2fb41eaccfd3c1a3cd2849e96f148a8a02aff29 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Mon, 16 Mar 2026 20:48:39 -0400 Subject: [PATCH 03/14] Initial code for manual QueryExecutionContext creation (ES DSL). Currently does not parse date ranges, and no integration with aggregation/inference config.
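Date math strings such as "gte": "now-30s" still need to be resolved to concrete epoch timestamps before the store query plan is built; until then the query time range is a hardcoded placeholder.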
--- Cargo.lock | 1 + asap-query-engine/Cargo.toml | 1 + .../src/engines/simple_engine.rs | 182 +++++++++++++++++- 3 files changed, 179 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f3eee0..58dd05f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3165,6 +3165,7 @@ dependencies = [ "datafusion", "datafusion_summary_library", "dsrs", + "elastic_dsl_utilities", "flate2", "form_urlencoded", "futures", diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 11484c6..ae09007 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -54,6 +54,7 @@ lazy_static = "1.4" zstd = "0.13" reqwest = { version = "0.11", features = ["json"] } tracing-appender = "0.2" +elastic_dsl_utilities.workspace = true [dev-dependencies] tempfile = "3.20.0" diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index 1fa7558..971ccb7 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -9,7 +9,7 @@ use crate::stores::{Store, TimestampedBucketsMap}; use core::panic; use promql_utilities::get_is_collapsable; use serde_json::Value; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; use tracing::{debug, warn}; @@ -31,6 +31,11 @@ use sqlparser::parser::Parser as parser; // SQL issue: refactor simpleengine to create matchresult similar to SQLquerydata +use elastic_dsl_utilities::pattern::parse_and_classify; +use elastic_dsl_utilities::types::{ + EsDslQueryPattern, LabelFilter, MetricAggType, MetricAggregation, +}; + // Type alias for merged outputs (single aggregate per key after merging) type MergedOutputsMap = HashMap, Box>; @@ -1445,13 +1450,180 @@ impl SimpleEngine { match self.query_language { QueryLanguage::promql => self.handle_query_promql(query, time), QueryLanguage::sql => self.handle_query_sql(query, time), - QueryLanguage::elastic_querydsl => self.handle_query_elastic(), - QueryLanguage::elastic_sql => self.handle_query_elastic(), + QueryLanguage::elastic_querydsl => self.handle_query_elastic(query, time), + QueryLanguage::elastic_sql => self.handle_query_elastic(query, time), } } - pub fn handle_query_elastic(&self) -> Option<(KeyByLabelNames, QueryResult)> { - None + pub fn handle_query_elastic( + &self, + query: String, + time: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + let context = self.build_query_execution_context_elastic(query, time)?; + // Execute complete query pipeline + let results = self + .execute_query_pipeline(&context, false) // SQL: topk disabled + .map_err(|e| { + warn!("Query execution failed: {}", e); + e + }) + .ok()?; + + Some(( + context.metadata.query_output_labels, + QueryResult::vector(results, context.query_time), + )) + } + + pub fn build_query_execution_context_elastic( + &self, + query: String, + time: f64, + ) -> Option<QueryExecutionContext> { + let query_time = Self::convert_query_time_to_data_time(time); + + // 1. Parse query DSL somehow. Elasticsearch DSL crate does not support deserializing, but maybe can use Opensearch instead? + // 2. Determine whether query is supported using some AST representation or hardcoded pattern matching.
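+ // parse_and_classify currently covers both steps via hardcoded pattern matching against the templates in supported_es_queries.md.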
+ let query_pattern: EsDslQueryPattern = + parse_and_classify(&query).unwrap_or_else(|_| EsDslQueryPattern::Unknown); + match query_pattern { + EsDslQueryPattern::Unknown => { + debug!("Could not parse query into known pattern"); + return None; + } + _ => { + debug!("Parsed query pattern: {:?}", query_pattern); + } + } + + // 3. Convert parsed query into execution context components (labels, statistic, kwargs, metadata, store query plan, etc.) + + // TODO: Figure out how to handle query configuration for ElasticSearch queries. + let query_config = self.find_query_config(&query)?; + + let do_merge = true; // No "instant" queries in ElasticSearch supported for now, so we always need to merge. + let agg_info = self.get_aggregation_id_info(query_config); + + let (metric, query_metadata) = self.build_query_metadata_elastic(&query_pattern)?; + + let spatial_filter = String::new(); // Placeholder - extract from query if applicable + + // TODO: Need way to parse ES DSL "date math". + let timestamps = QueryTimestamps { + start_timestamp: query_time - 60000, // Placeholder - determine based on query + end_timestamp: query_time, + }; + + let query_plan = self + .create_store_query_plan(&metric, &timestamps, &agg_info) + .map_err(|e| { + warn!("Failed to create store query plan: {}", e); + e + }) + .ok()?; + + let grouping_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_value) + .map(|config| config.grouping_labels.clone()) + .unwrap_or_else(|| query_metadata.query_output_labels.clone()); + + let aggregated_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_key) + .map(|config| config.aggregated_labels.clone()) + .unwrap_or_else(KeyByLabelNames::empty); + + Some(QueryExecutionContext { + metric, + metadata: query_metadata, + store_plan: query_plan.clone(), + agg_info: agg_info.clone(), + do_merge, + spatial_filter, + query_time, + grouping_labels, + aggregated_labels, + }) + } + + fn build_query_metadata_elastic( + &self, + query_pattern: &EsDslQueryPattern, + ) -> Option<(String, QueryMetadata)> { + // Constructs QueryMetadata based on the parsed ES DSL query pattern. This includes determining the + // metric to query, the statistic to compute, and any relevant query kwargs (e.g. quantile value for percentiles). + + // Figure out aggregation type and what labels are included in output. + // By default, we only include grouping labels in the output for ES DSL. + let aggregation: MetricAggregation; // Take first aggregation by default since current engine doesn't support multiple aggregations in a single query. + let mut query_output_labels = match query_pattern { + EsDslQueryPattern::SimpleAggregation { aggregations, .. } + | EsDslQueryPattern::FilteredAggregation { aggregations, .. } => { + aggregation = aggregations.first()?.clone(); + KeyByLabelNames::empty() + } + EsDslQueryPattern::FilteredAggregationBatched { + aggregations, + buckets, + .. + } => { + // The labels output = every unique column that we grouped by in the DSL query. + let mut labels: HashSet<String> = HashSet::new(); + for bucket in buckets { + labels.insert(bucket.filter.field.clone()); + } + aggregation = aggregations.first()?.clone(); + KeyByLabelNames::new(labels.into_iter().collect()) + } + _ => { + debug!("Query pattern does not match known aggregation types for label extraction"); + return None; + } + }; + + let metric = aggregation.field.clone(); + + // Map ElasticSearch aggregation types to our internal Statistic enum.
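+ // Percentiles map to Quantile; the percentile argument itself is forwarded through query_kwargs below.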
+ let statistic_to_compute = match aggregation.agg_type { + MetricAggType::Percentiles => Statistic::Quantile, + MetricAggType::Avg => Statistic::Rate, + MetricAggType::Sum => Statistic::Sum, + MetricAggType::Min => Statistic::Min, + MetricAggType::Max => Statistic::Max, + }; + // For topk queries, prepend "__name__" to query_output_labels + if statistic_to_compute == Statistic::Topk { + let mut new_labels = vec!["__name__".to_string()]; + new_labels.extend(query_output_labels.labels); + query_output_labels = KeyByLabelNames::new(new_labels); + } + + let mut query_kwargs = HashMap::new(); // Placeholder - build based on query and statistic + match aggregation.agg_type { + MetricAggType::Percentiles => { + // Extract quantile value from aggregation parameters and add to query_kwargs + if let Some(params) = &aggregation.params { + if let Some(percents) = params.get("percents") { + // Get first value from percents array since we only support one quantile argument for now. + let quantile = percents + .as_array() + .and_then(|arr| arr.first()) + .and_then(|v| v.as_f64()); + query_kwargs.insert("quantile".to_string(), quantile?.to_string()); + } + } + } + _ => {} + } + + let metadata = QueryMetadata { + query_output_labels: query_output_labels.clone(), + statistic_to_compute, + query_kwargs: query_kwargs.clone(), + }; + Some((metric, metadata)) } // /// Try to extract sketch query components from a PromQL query string. From c4cc7a9f58936b416a690d0c9a8a4437af0c7efa Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 19 Mar 2026 17:29:11 -0400 Subject: [PATCH 04/14] Preliminary test that query context can execute (basic quantile query). --- .../src/engines/simple_engine.rs | 7 +- .../src/tests/elastic_query_tests.rs | 173 ++++++++++++++++++ asap-query-engine/src/tests/mod.rs | 1 + 3 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 asap-query-engine/src/tests/elastic_query_tests.rs diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index 971ccb7..de68b78 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -1461,6 +1461,7 @@ impl SimpleEngine { time: f64, ) -> Option<(KeyByLabelNames, QueryResult)> { let context = self.build_query_execution_context_elastic(query, time)?; + println!("Built execution context for ElasticSearch query {:?}", context); // Execute complete query pipeline let results = self .execute_query_pipeline(&context, false) // SQL: topk disabled @@ -1501,9 +1502,9 @@ impl SimpleEngine { // TODO: Figure out how to handle query configuration for ElasticSearch queries. let query_config = self.find_query_config(&query)?; + let agg_info = self.get_aggregation_id_info(query_config); let do_merge = true; // No "instant" queries in ElasticSearch supported for now, so we always need to merge. - let agg_info = self.get_aggregation_id_info(query_config); let (metric, query_metadata) = self.build_query_metadata_elastic(&query_pattern)?; @@ -1511,8 +1512,8 @@ impl SimpleEngine { // TODO: Need way to parse ES DSL "date math". 
let timestamps = QueryTimestamps { - start_timestamp: query_time - 60000, // Placeholder - determine based on query - end_timestamp: query_time, + start_timestamp: 0, // Placeholder - determine based on query (e.g. 1 hour before query_time) + end_timestamp: query_time, }; let query_plan = self diff --git a/asap-query-engine/src/tests/elastic_query_tests.rs b/asap-query-engine/src/tests/elastic_query_tests.rs new file mode 100644 index 0000000..d4210b9 --- /dev/null +++ b/asap-query-engine/src/tests/elastic_query_tests.rs @@ -0,0 +1,173 @@ +#[cfg(test)] +use crate::data_model::{CleanupPolicy, InferenceConfig, QueryLanguage, StreamingConfig}; +use crate::drivers::query::adapters::AdapterConfig; +use crate::drivers::query::servers::http::{HttpServer, HttpServerConfig}; +use crate::engines::SimpleEngine; +use crate::stores::simple_map_store::SimpleMapStore; +use reqwest::Client; +use serde_json::{json, Value}; +use sketchlib_rust::elastic; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::time::{sleep, Duration}; + +use crate::data_model::{AggregateCore, KeyByLabelValues, PrecomputedOutput}; +use crate::precompute_operators::{ + DatasketchesKLLAccumulator, DeltaSetAggregatorAccumulator, CountMinSketchAccumulator, SumAccumulator, +}; + +use crate::tests::test_utilities::{self, create_engine_multi_timestamp, create_engine_single_pop}; + +#[test] +fn test_esdsl_groupby_aggregation_query_sum() { + + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .with_test_writer() // Routes output through the test runner's capture mechanism + .try_init(); + + // Elastic DSL query (batch filtered). + let elastic_query = json!({ + "size": 0, + "aggs": { + "out": { + "filters": { + "filters": { + "bucket1": { + "term": { "host.keyword": "host-a" }, + }, + "bucket2": { + "term": { "host.keyword": "host-b" }, + } + } + }, + "aggs": { + "out": { + "sum": { + "field": "http_requests", + } + } + } + } + } + }); + + let engine = create_engine_single_pop( + "http_requests", + "SumAccumulator", + vec!["host"], + vec![ + ( + Some(vec!["host-a".to_string()]), + Box::new(SumAccumulator::with_sum(100.0)), + ), + ( + Some(vec!["host-b".to_string()]), + Box::new(SumAccumulator::with_sum(200.0)), + ), + ], + &elastic_query.to_string(), + ); + + let time = 1_000.0; // Arbitrary timestamp for testing + let output = engine.handle_query_elastic(elastic_query.to_string(), time); + if let Some((_, result)) = output { + let result_json = serde_json::to_string(&result).unwrap(); + println!("Query Result: {result_json}"); + } else { + panic!("Expected query result, got None"); + } + +} + +#[test] +fn test_esdsl_groupby_aggregation_quantile() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .with_test_writer() // Routes output through the test runner's capture mechanism + .try_init(); + + let mut kll_a_1 = DatasketchesKLLAccumulator::new(200); + for v in 1..=100 { + kll_a_1._update(v as f64); + } + let mut kll_a_2 = DatasketchesKLLAccumulator::new(200); + for v in 101..=200 { + kll_a_2._update(v as f64); + } + let mut kll_b_1 = DatasketchesKLLAccumulator::new(200); + for v in 1..=200 { + kll_b_1._update(v as f64); + } + let mut kll_b_2 = DatasketchesKLLAccumulator::new(200); + for v in 201..=400 { + kll_b_2._update(v as f64); + } + + // Elastic DSL query (batch filtered).
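+ // After merging across both timestamps, host-a's sketch covers values 1..=200 and host-b's covers 1..=400.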
+ let elastic_query = json!({ + "size": 0, + "aggs": { + "out": { + "filters": { + "filters": { + "bucket1": { + "term": { "host.keyword": "host-a" }, + }, + "bucket2": { + "term": { "host.keyword": "host-b" }, + } + } + }, + "aggs": { + "out": { + "percentiles": { + "field": "http_requests", + "percents": [0.90] + } + } + } + } + } + }); + + let engine = create_engine_multi_timestamp( + "http_requests", + "DatasketchesKLLAccumulator", + vec!["host"], + vec![ + ( + 999_000, + Some(vec!["host-a".to_string()]), + Box::new(kll_a_1), + ), + ( + 999_000, + Some(vec!["host-b".to_string()]), + Box::new(kll_b_1), + ), + ( + 1_000_000, + Some(vec!["host-a".to_string()]), + Box::new(kll_a_2), + ), + ( + 1_000_000, + Some(vec!["host-b".to_string()]), + Box::new(kll_b_2), + ), + ], + &elastic_query.to_string(), + ); + + let time = 1_000.0; // Arbitrary timestamp for testing + let output = engine.handle_query_elastic(elastic_query.to_string(), time); + if let Some((_, result)) = output { + let result_json = serde_json::to_string(&result).unwrap(); + println!("Query Result: {result_json}"); + } else { + panic!("Expected query result, got None"); + } + +} + diff --git a/asap-query-engine/src/tests/mod.rs b/asap-query-engine/src/tests/mod.rs index 3457e5b..a0d8921 100644 --- a/asap-query-engine/src/tests/mod.rs +++ b/asap-query-engine/src/tests/mod.rs @@ -1,6 +1,7 @@ pub mod clickhouse_forwarding_tests; pub mod datafusion; pub mod elastic_forwarding_tests; +pub mod elastic_query_tests; pub mod prometheus_forwarding_tests; pub mod query_equivalence_tests; pub mod trait_design_tests; From 737c64cef954693f97d95a5bd896ddbb97b17cc0 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 19 Mar 2026 18:59:14 -0400 Subject: [PATCH 05/14] Simplify query patterns matched (support only simple aggregation and groupby) to map more easily to internal representation. --- .../rs/elastic_dsl_utilities/src/parsing.rs | 157 ++++++++--- .../rs/elastic_dsl_utilities/src/pattern.rs | 258 +++++++++--------- .../rs/elastic_dsl_utilities/src/types.rs | 37 +-- .../supported_es_queries.md | 105 +++---- .../src/engines/simple_engine.rs | 18 +- 5 files changed, 306 insertions(+), 269 deletions(-) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs index 5f00c45..ca9c4bc 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs @@ -1,6 +1,6 @@ use serde_json::Value; -use crate::types::{BucketSpec, LabelFilter, MetricAggType, MetricAggregation, TimeRange}; +use crate::types::{GroupBySpec, LabelFilter, MetricAggType, MetricAggregation, TimeRange}; // --------------------------------------------------------------------------- // Metric aggregation helpers @@ -33,7 +33,11 @@ pub fn extract_metric_aggs(aggs: &Value) -> Option<Vec<MetricAggregation>> { result_name: result_name.clone(), agg_type, field, - params: Some(kwargs), + params: if kwargs.as_object().is_some_and(|o| o.is_empty()) { + None + } else { + Some(kwargs) + }, }); break; } @@ -141,7 +145,7 @@ pub fn extract_label_filters(query: &Value) -> Option<(Vec<LabelFilter>, Option<TimeRange>)> { label_filters.push(extract_label_filter_from_term(clause)?); } else if clause.get("range").is_some() { if time_range.is_some() { - continue; // Multiple range clauses - ignore all but the first.
+ return None; } time_range = Some(extract_time_range(clause)?); } else { @@ -154,62 +158,94 @@ } // --------------------------------------------------------------------------- -// Batched-filters helpers +// Query predicate helpers // --------------------------------------------------------------------------- -/// Try to extract a batched filters aggregation (essentially the groupby buckets) from the top-level `"aggs"` -/// object. +/// Extract optional predicates from top-level query: +/// - `{"range": ...}` -> `(label_filters=[], time_range=Some(...))` +/// - `{"bool": {"filter": ...}}` -> label filters + optional time range +/// - `None`/`null` query is represented by caller as `(vec![], None)`. +pub fn extract_predicates_from_query(query: &Value) -> Option<(Vec<LabelFilter>, Option<TimeRange>)> { + if query.is_null() { + return Some((Vec::new(), None)); + } + + if let Some(time_range) = extract_time_range(query) { + return Some((Vec::new(), Some(time_range))); + } + + if query.get("bool").is_some() { + return extract_label_filters(query); + } + + None +} + +// --------------------------------------------------------------------------- +// Group-by helpers +// --------------------------------------------------------------------------- + +/// Try to extract a grouped aggregation from top-level `"aggs"` object. +/// +/// Expected shape: +/// ```json +/// { +/// "aggs": { +/// "<result_name>": { +/// "terms": { "field": "