diff --git a/Cargo.lock b/Cargo.lock index 934545f..1ebbc7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1613,6 +1613,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "elastic_dsl_utilities" +version = "0.1.0" +dependencies = [ + "chrono", + "serde", + "serde_json", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -3551,6 +3560,7 @@ dependencies = [ "datafusion", "datafusion_summary_library", "dsrs", + "elastic_dsl_utilities", "flate2", "form_urlencoded", "futures", diff --git a/Cargo.toml b/Cargo.toml index f396d96..a283fea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "asap-common/sketch-core", "asap-common/dependencies/rs/promql_utilities", "asap-common/dependencies/rs/sql_utilities", + "asap-common/dependencies/rs/elastic_dsl_utilities", "asap-common/dependencies/rs/sketch_db_common", "asap-common/dependencies/rs/datafusion_summary_library", "asap-query-engine", @@ -34,5 +35,6 @@ promql_utilities = { path = "asap-common/dependencies/rs/promql_utilities" } sql_utilities = { path = "asap-common/dependencies/rs/sql_utilities" } sketch_db_common = { path = "asap-common/dependencies/rs/sketch_db_common" } datafusion_summary_library = { path = "asap-common/dependencies/rs/datafusion_summary_library" } +elastic_dsl_utilities = { path = "asap-common/dependencies/rs/elastic_dsl_utilities" } asap_planner = { path = "asap-planner-rs" } indexmap = { version = "2.0", features = ["serde"] } diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml new file mode 100644 index 0000000..3726cab --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "elastic_dsl_utilities" +edition.workspace = true +version.workspace = true + +[dependencies] +serde.workspace = true +serde_json.workspace = true +chrono.workspace = true diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs new file mode 100644 index 0000000..c03929f --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -0,0 +1,7 @@ +pub mod parsing; +pub mod pattern; +pub mod types; + +pub use parsing::*; +pub use pattern::{classify, parse_and_classify}; +pub use types::*; diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs new file mode 100644 index 0000000..3963c1f --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs @@ -0,0 +1,439 @@ +use serde_json::Value; + +use crate::types::{GroupBySpec, LabelFilter, MetricAggType, MetricAggregation, TimeRange}; + +// --------------------------------------------------------------------------- +// Metric aggregation helpers +// --------------------------------------------------------------------------- + +/// Try to extract a list of metric aggregations from the top-level `"aggs"` +/// object of a query. Returns `None` if *any* aggregation entry is not one of +/// the recognised metric types (avg / min / max / sum / percentiles). +pub fn extract_metric_aggs(aggs: &Value) -> Option> { + let obj = aggs.as_object()?; + if obj.is_empty() { + return None; + } + + let mut result = Vec::with_capacity(obj.len()); + for (result_name, agg_body) in obj { + // Each aggregation body is an object that should contain exactly one + // recognised metric aggregation key. + let body_obj = agg_body.as_object()?; + let mut found = None; + for (key, inner) in body_obj { + if let Some(agg_type) = MetricAggType::from_json_str(key) { + let field = inner.get("field")?.as_str()?.to_owned(); + let kwargs_map = inner + .as_object()? + .iter() + .filter(|(k, _)| *k != "field") + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let kwargs = serde_json::Value::Object(kwargs_map); + found = Some(MetricAggregation { + result_name: result_name.clone(), + agg_type, + field, + params: if kwargs.as_object().is_some_and(|o| o.is_empty()) { + None + } else { + Some(kwargs) + }, + }); + break; + } + } + result.push(found?); + } + Some(result) +} + +// --------------------------------------------------------------------------- +// Time range helpers +// --------------------------------------------------------------------------- + +/// Try to extract a `TimeRange` from a bare `{"range": {"": {...}}}` +/// query value. Accepts either string or numeric values for gte/lte. +pub fn extract_time_range(query: &Value) -> Option { + let range_obj = query.get("range")?.as_object()?; + // There should be exactly one field entry in the range object. + if range_obj.len() != 1 { + return None; + } + let (field, bounds) = range_obj.iter().next()?; + let gte = bounds.get("gte").and_then(value_to_string); + let lte = bounds.get("lte").and_then(value_to_string); + Some(TimeRange { + field: field.clone(), + gte, + lte, + }) +} + +fn value_to_string(v: &Value) -> Option { + match v { + Value::String(s) => Some(s.clone()), + Value::Number(n) => Some(n.to_string()), + _ => None, + } +} + +// --------------------------------------------------------------------------- +// Term / label-filter helpers +// --------------------------------------------------------------------------- + +/// Strip the `.keyword` suffix from a field name, if present. +fn strip_keyword_suffix(field: &str) -> &str { + field.strip_suffix(".keyword").unwrap_or(field) +} + +/// Try to extract a `LabelFilter` from a single `"term"` query object. +/// +/// Handles both the opensearch-dsl long form: +/// ```json +/// { "term": { "field": { "value": "val" } } } +/// ``` +/// and the ES shorthand: +/// ```json +/// { "term": { "field": "val" } } +/// ``` +pub fn extract_label_filter_from_term(term_query: &Value) -> Option { + let term_obj = term_query.get("term")?.as_object()?; + if term_obj.len() != 1 { + return None; + } + let (raw_field, field_value) = term_obj.iter().next()?; + let field = strip_keyword_suffix(raw_field).to_owned(); + let value = if let Some(s) = field_value.as_str() { + // Shorthand: "field": "value" + s.to_owned() + } else if let Some(inner) = field_value.as_object() { + // Long form: "field": { "value": "..." } + inner.get("value")?.as_str()?.to_owned() + } else { + return None; + }; + Some(LabelFilter { field, value }) +} + +// --------------------------------------------------------------------------- +// Bool filter helpers +// --------------------------------------------------------------------------- + +/// Try to extract a list of label filters (and optionally a time range) from a +/// `{"bool": {"filter": [...]}}` query structure. +/// +/// The `filter` array must contain at least a term query, and may also contain +/// a range query. Additional (unrecognised) entries in the array cause this +/// function to return `None`. +pub fn extract_label_filters(query: &Value) -> Option<(Vec, Option)> { + let filter_clauses = query.get("bool")?.get("filter")?; + + // The filter value may be an array (multiple clauses) or a single object. + let clauses: Vec<&Value> = if let Some(arr) = filter_clauses.as_array() { + arr.iter().collect() + } else if filter_clauses.is_object() { + vec![filter_clauses] + } else { + return None; + }; + + let mut label_filters: Vec = Vec::new(); + let mut time_range: Option = None; + + for clause in clauses { + if clause.get("term").is_some() { + label_filters.push(extract_label_filter_from_term(clause)?); + } else if clause.get("range").is_some() { + if time_range.is_some() { + return None; + } + time_range = Some(extract_time_range(clause)?); + } else { + // Unknown clause type in the filter. + return None; + } + } + + Some((label_filters, time_range)) +} + +// --------------------------------------------------------------------------- +// Query predicate helpers +// --------------------------------------------------------------------------- + +/// Extract optional predicates from top-level query: +/// - `{"range": ...}` -> `(label_filters=[], time_range=Some(...))` +/// - `{"bool": {"filter": ...}}` -> label filters + optional time range +/// - `None`/`null` query is represented by caller as `(vec![], None)`. +pub fn extract_predicates_from_query( + query: &Value, +) -> Option<(Vec, Option)> { + if query.is_null() { + return Some((Vec::new(), None)); + } + + if let Some(time_range) = extract_time_range(query) { + return Some((Vec::new(), Some(time_range))); + } + + if query.get("bool").is_some() { + return extract_label_filters(query); + } + + None +} + +// --------------------------------------------------------------------------- +// Group-by helpers +// --------------------------------------------------------------------------- + +/// Try to extract a grouped aggregation from top-level `"aggs"` object. +/// +/// Expected shape: +/// ```json +/// { +/// "aggs": { +/// "": { +/// "terms": { "field": "