diff --git a/Cargo.lock b/Cargo.lock index 4de8ab3..b0bbcd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,8 +3446,8 @@ dependencies = [ "bincode", "chrono", "clap 4.5.60", - "ctor", "criterion", + "ctor", "dashmap 5.5.3", "datafusion", "datafusion_summary_library", @@ -3541,15 +3541,6 @@ dependencies = [ "getrandom 0.2.17", ] -[[package]] -name = "rand_core" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" -dependencies = [ - "getrandom 0.3.4", -] - [[package]] name = "rayon" version = "1.11.0" diff --git a/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs b/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs index 48bd2c4..5778a6c 100644 --- a/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs +++ b/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs @@ -17,7 +17,7 @@ pub enum SchemaConfig { PromQL(PromQLSchema), SQL(SQLSchema), ElasticQueryDSL, - ElasticSQL, + ElasticSQL(SQLSchema), } #[derive(Debug, Clone)] @@ -33,7 +33,7 @@ impl InferenceConfig { QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()), QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())), QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, - QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL, + QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())), }; Self { schema, @@ -61,7 +61,10 @@ impl InferenceConfig { SchemaConfig::SQL(sql_schema) } QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, - QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL, + QueryLanguage::elastic_sql => { + let sql_schema = Self::parse_sql_schema(data)?; + SchemaConfig::ElasticSQL(sql_schema) + } }; let cleanup_policy = Self::parse_cleanup_policy(data)?; diff --git a/asap-query-engine/src/data_model/streaming_config.rs 
b/asap-query-engine/src/data_model/streaming_config.rs index 746a632..127a8c2 100644 --- a/asap-query-engine/src/data_model/streaming_config.rs +++ b/asap-query-engine/src/data_model/streaming_config.rs @@ -72,7 +72,7 @@ impl StreamingConfig { SchemaConfig::PromQL(_) => QueryLanguage::promql, SchemaConfig::SQL(_) => QueryLanguage::sql, SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl, - SchemaConfig::ElasticSQL => QueryLanguage::elastic_sql, + SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql, }) .unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config diff --git a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs index fce8a9f..1182447 100644 --- a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs +++ b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs @@ -1,6 +1,7 @@ use super::config::AdapterConfig; use super::traits::*; use crate::data_model::QueryLanguage; +use crate::QueryResult; use async_trait::async_trait; use axum::{ body::Bytes, @@ -11,7 +12,7 @@ use axum::{ use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::Arc; -use tracing::{debug, error}; +use tracing::{debug, error, info}; /// Elasticsearch HTTP protocol adapter pub struct ElasticHttpAdapter { @@ -26,34 +27,29 @@ impl ElasticHttpAdapter { /// Parse Elasticsearch query from JSON body fn parse_elasticsearch_query(&self, body: &Bytes) -> Result { - debug!( - "Elasticsearch adapter: parsing query for language {:?}", - self.config.language - ); - - // Parse the JSON body let json_body: Value = serde_json::from_slice(body) .map_err(|e| AdapterError::ParseError(format!("Invalid JSON: {}", e)))?; - // Store the entire query as a JSON string - let query = serde_json::to_string(&json_body) - .map_err(|e| AdapterError::ParseError(format!("Failed to serialize query: {}", e)))?; + // Extract the SQL string from the "query" field + let query = 
match &self.config.language { + QueryLanguage::elastic_sql => json_body + .get("query") + .and_then(|v| v.as_str()) + .ok_or_else(|| AdapterError::MissingParameter("query".to_string()))? + .to_string(), + _ => { + // For QueryDSL, keep the full JSON as before + serde_json::to_string(&json_body).map_err(|e| { + AdapterError::ParseError(format!("Failed to serialize query: {}", e)) + })? + } + }; let time = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64(); - debug!( - "Elasticsearch adapter: parsed {} query with time={}", - if matches!(self.config.language, QueryLanguage::elastic_sql) { - "SQL" - } else { - "Query DSL" - }, - time - ); - Ok(ParsedQueryRequest { query, time }) } } @@ -126,34 +122,51 @@ impl QueryRequestAdapter for ElasticHttpAdapter { impl QueryResponseAdapter for ElasticHttpAdapter { async fn format_success_response( &self, - _result: &QueryExecutionResult, + result: &QueryExecutionResult, ) -> Result { - debug!("Elasticsearch adapter: formatting success response"); + info!( + "SKETCH HIT: serving {} rows from precomputed sketches", + match &result.query_result { + QueryResult::Vector(v) => v.values.len(), + QueryResult::Matrix(_) => 0, + } + ); - // For now, since we're falling back for every query, - // the result from the fallback will be passed through - // In the future, this could transform local execution results - // to Elasticsearch format + let label_names = &result.query_output_labels.labels; + + let hits: Vec = match &result.query_result { + QueryResult::Vector(instant_vector) => instant_vector + .values + .iter() + .map(|element| { + let mut source = serde_json::Map::new(); + for (i, label_name) in label_names.iter().enumerate() { + let label_value = element.labels.get(i).map(|s| s.as_str()).unwrap_or(""); + source.insert(label_name.clone(), json!(label_value)); + } + source.insert("value".to_string(), json!(element.value)); + json!({ + "_source": source + }) + }) + .collect(), + 
QueryResult::Matrix(_) => { + return Err(StatusCode::NOT_IMPLEMENTED); + } + }; - // Return a stub Elasticsearch-style response for now. let response = json!({ "took": 0, "timed_out": false, "hits": { "total": { - "value": 0, + "value": hits.len(), "relation": "eq" }, - "hits": [] + "hits": hits } }); - debug!( - "Elasticsearch adapter: returning stub response: {}", - serde_json::to_string_pretty(&response) - .unwrap_or_else(|_| "Unable to format".to_string()) - ); - Ok(Json(response).into_response()) } @@ -370,7 +383,6 @@ mod tests { let sql_query = json!({ "query": "SELECT status, COUNT(*) FROM logs GROUP BY status", - "fetch_size": 100, "time_zone": "UTC" }); @@ -379,8 +391,10 @@ mod tests { assert!(result.is_ok(), "SQL query with params should parse"); let parsed = result.unwrap(); - assert!(parsed.query.contains("fetch_size")); - assert!(parsed.query.contains("time_zone")); + assert!( + !parsed.query.contains("time_zone"), + "SQL adapter should extract only the query string, not extra params" + ); } /// Test: Invalid JSON should return error diff --git a/asap-query-engine/src/drivers/query/fallback/elastic.rs b/asap-query-engine/src/drivers/query/fallback/elastic.rs index 5617faa..b365047 100644 --- a/asap-query-engine/src/drivers/query/fallback/elastic.rs +++ b/asap-query-engine/src/drivers/query/fallback/elastic.rs @@ -8,6 +8,8 @@ use serde_json::Value; use std::collections::HashMap; use tracing::{debug, error}; +const ELASTIC_FETCH_SIZE: u64 = 1000; + /// Fallback client for Elasticsearch HTTP API pub struct ElasticHttpFallback { client: Client, @@ -80,11 +82,23 @@ impl FallbackClient for ElasticHttpFallback { debug!("Full forwarding URL: {}", full_url); - let query_body: Value = match serde_json::from_str(&request.query) { - Ok(json) => json, - Err(e) => { - error!("Failed to parse query as JSON: {}", e); - return Err(StatusCode::INTERNAL_SERVER_ERROR); + let query_body: Value = match self.language { + QueryLanguage::elastic_sql => { + // query is a raw 
SQL string, need to wrap it for the ES SQL endpoint + serde_json::json!({ + "query": request.query.trim().trim_end_matches(';'), + "fetch_size": ELASTIC_FETCH_SIZE, + }) + } + _ => { + // query is already a JSON string (Query DSL) + match serde_json::from_str(&request.query) { + Ok(json) => json, + Err(e) => { + error!("Failed to parse query as JSON: {}", e); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } } }; diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index e60fa58..2753d66 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -1132,7 +1132,7 @@ impl SimpleEngine { return None; } &SchemaConfig::ElasticQueryDSL => todo!(), - &SchemaConfig::ElasticSQL => todo!(), + SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(), }; let statements = parser::parse_sql(&GenericDialect {}, query.as_str()).unwrap(); @@ -1473,7 +1473,7 @@ impl SimpleEngine { QueryLanguage::promql => self.handle_query_promql(query, time), QueryLanguage::sql => self.handle_query_sql(query, time), QueryLanguage::elastic_querydsl => self.handle_query_elastic(), - QueryLanguage::elastic_sql => self.handle_query_elastic(), + QueryLanguage::elastic_sql => self.handle_query_sql(query, time), } } @@ -1919,7 +1919,7 @@ impl SimpleEngine { warn!("PromQL query requested but config has ElasticQueryDSL schema"); return None; } - &SchemaConfig::ElasticSQL => { + SchemaConfig::ElasticSQL(_) => { warn!("PromQL query requested but config has ElasticSQL schema"); return None; } diff --git a/asap-query-engine/src/tests/test_utilities/config_builders.rs b/asap-query-engine/src/tests/test_utilities/config_builders.rs index 5af70ce..c88dad8 100644 --- a/asap-query-engine/src/tests/test_utilities/config_builders.rs +++ b/asap-query-engine/src/tests/test_utilities/config_builders.rs @@ -310,7 +310,7 @@ mod tests { } SchemaConfig::SQL(_) => panic!("Expected PromQL schema"), 
SchemaConfig::ElasticQueryDSL => panic!("Expected PromQL schema"), - SchemaConfig::ElasticSQL => panic!("Expected PromQL schema"), + SchemaConfig::ElasticSQL(_) => panic!("Expected PromQL schema"), } // Verify query configs (2 queries: PromQL + SQL)