Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub enum SchemaConfig {
PromQL(PromQLSchema),
SQL(SQLSchema),
ElasticQueryDSL,
ElasticSQL,
ElasticSQL(SQLSchema),
}

#[derive(Debug, Clone)]
Expand All @@ -33,7 +33,7 @@ impl InferenceConfig {
QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()),
QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())),
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL,
QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())),
};
Self {
schema,
Expand Down Expand Up @@ -61,7 +61,10 @@ impl InferenceConfig {
SchemaConfig::SQL(sql_schema)
}
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL,
QueryLanguage::elastic_sql => {
let sql_schema = Self::parse_sql_schema(data)?;
SchemaConfig::SQL(sql_schema)
}
};

let cleanup_policy = Self::parse_cleanup_policy(data)?;
Expand Down
2 changes: 1 addition & 1 deletion asap-query-engine/src/data_model/streaming_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl StreamingConfig {
SchemaConfig::PromQL(_) => QueryLanguage::promql,
SchemaConfig::SQL(_) => QueryLanguage::sql,
SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl,
SchemaConfig::ElasticSQL => QueryLanguage::elastic_sql,
SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql,
})
.unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config

Expand Down
90 changes: 52 additions & 38 deletions asap-query-engine/src/drivers/query/adapters/elastic_http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::config::AdapterConfig;
use super::traits::*;
use crate::data_model::QueryLanguage;
use crate::QueryResult;
use async_trait::async_trait;
use axum::{
body::Bytes,
Expand All @@ -11,7 +12,7 @@ use axum::{
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, error};
use tracing::{debug, error, info};

/// Elasticsearch HTTP protocol adapter
pub struct ElasticHttpAdapter {
Expand All @@ -26,34 +27,29 @@ impl ElasticHttpAdapter {

/// Parse Elasticsearch query from JSON body
fn parse_elasticsearch_query(&self, body: &Bytes) -> Result<ParsedQueryRequest, AdapterError> {
debug!(
"Elasticsearch adapter: parsing query for language {:?}",
self.config.language
);

// Parse the JSON body
let json_body: Value = serde_json::from_slice(body)
.map_err(|e| AdapterError::ParseError(format!("Invalid JSON: {}", e)))?;

// Store the entire query as a JSON string
let query = serde_json::to_string(&json_body)
.map_err(|e| AdapterError::ParseError(format!("Failed to serialize query: {}", e)))?;
// Extract the SQL string from the "query" field
let query = match &self.config.language {
QueryLanguage::elastic_sql => json_body
.get("query")
.and_then(|v| v.as_str())
.ok_or_else(|| AdapterError::MissingParameter("query".to_string()))?
.to_string(),
_ => {
// For QueryDSL, keep the full JSON as before
serde_json::to_string(&json_body).map_err(|e| {
AdapterError::ParseError(format!("Failed to serialize query: {}", e))
})?
}
};

let time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();

debug!(
"Elasticsearch adapter: parsed {} query with time={}",
if matches!(self.config.language, QueryLanguage::elastic_sql) {
"SQL"
} else {
"Query DSL"
},
time
);

Ok(ParsedQueryRequest { query, time })
}
}
Expand Down Expand Up @@ -126,34 +122,51 @@ impl QueryRequestAdapter for ElasticHttpAdapter {
impl QueryResponseAdapter for ElasticHttpAdapter {
async fn format_success_response(
&self,
_result: &QueryExecutionResult,
result: &QueryExecutionResult,
) -> Result<Response, StatusCode> {
debug!("Elasticsearch adapter: formatting success response");
info!(
"SKETCH HIT: serving {} rows from precomputed sketches",
match &result.query_result {
QueryResult::Vector(v) => v.values.len(),
QueryResult::Matrix(_) => 0,
}
);

// For now, since we're falling back for every query,
// the result from the fallback will be passed through
// In the future, this could transform local execution results
// to Elasticsearch format
let label_names = &result.query_output_labels.labels;

let hits: Vec<Value> = match &result.query_result {
QueryResult::Vector(instant_vector) => instant_vector
.values
.iter()
.map(|element| {
let mut source = serde_json::Map::new();
for (i, label_name) in label_names.iter().enumerate() {
let label_value = element.labels.get(i).map(|s| s.as_str()).unwrap_or("");
source.insert(label_name.clone(), json!(label_value));
}
source.insert("value".to_string(), json!(element.value));
json!({
"_source": source
})
})
.collect(),
QueryResult::Matrix(_) => {
return Err(StatusCode::NOT_IMPLEMENTED);
}
};

// Return a stub Elasticsearch-style response for now.
let response = json!({
"took": 0,
"timed_out": false,
"hits": {
"total": {
"value": 0,
"value": hits.len(),
"relation": "eq"
},
"hits": []
"hits": hits
}
});

debug!(
"Elasticsearch adapter: returning stub response: {}",
serde_json::to_string_pretty(&response)
.unwrap_or_else(|_| "Unable to format".to_string())
);

Ok(Json(response).into_response())
}

Expand Down Expand Up @@ -370,7 +383,6 @@ mod tests {

let sql_query = json!({
"query": "SELECT status, COUNT(*) FROM logs GROUP BY status",
"fetch_size": 100,
"time_zone": "UTC"
});

Expand All @@ -379,8 +391,10 @@ mod tests {

assert!(result.is_ok(), "SQL query with params should parse");
let parsed = result.unwrap();
assert!(parsed.query.contains("fetch_size"));
assert!(parsed.query.contains("time_zone"));
assert!(
!parsed.query.contains("time_zone"),
"SQL adapter should extract only the query string, not extra params"
);
}

/// Test: Invalid JSON should return error
Expand Down
24 changes: 19 additions & 5 deletions asap-query-engine/src/drivers/query/fallback/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use serde_json::Value;
use std::collections::HashMap;
use tracing::{debug, error};

const ELASTIC_FETCH_SIZE: u64 = 1000;

/// Fallback client for Elasticsearch HTTP API
pub struct ElasticHttpFallback {
client: Client,
Expand Down Expand Up @@ -80,11 +82,23 @@ impl FallbackClient for ElasticHttpFallback {

debug!("Full forwarding URL: {}", full_url);

let query_body: Value = match serde_json::from_str(&request.query) {
Ok(json) => json,
Err(e) => {
error!("Failed to parse query as JSON: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
let query_body: Value = match self.language {
QueryLanguage::elastic_sql => {
// query is a raw SQL string, need to wrap it for the ES SQL endpoint
serde_json::json!({
"query": request.query.trim().trim_end_matches(';'),
"fetch_size": ELASTIC_FETCH_SIZE,
})
}
_ => {
// query is already a JSON string (Query DSL)
match serde_json::from_str(&request.query) {
Ok(json) => json,
Err(e) => {
error!("Failed to parse query as JSON: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
}
};

Expand Down
6 changes: 3 additions & 3 deletions asap-query-engine/src/engines/simple_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ impl SimpleEngine {
return None;
}
&SchemaConfig::ElasticQueryDSL => todo!(),
&SchemaConfig::ElasticSQL => todo!(),
SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(),
};

let statements = parser::parse_sql(&GenericDialect {}, query.as_str()).unwrap();
Expand Down Expand Up @@ -1473,7 +1473,7 @@ impl SimpleEngine {
QueryLanguage::promql => self.handle_query_promql(query, time),
QueryLanguage::sql => self.handle_query_sql(query, time),
QueryLanguage::elastic_querydsl => self.handle_query_elastic(),
QueryLanguage::elastic_sql => self.handle_query_elastic(),
QueryLanguage::elastic_sql => self.handle_query_sql(query, time),
}
}

Expand Down Expand Up @@ -1919,7 +1919,7 @@ impl SimpleEngine {
warn!("PromQL query requested but config has ElasticQueryDSL schema");
return None;
}
&SchemaConfig::ElasticSQL => {
SchemaConfig::ElasticSQL(_) => {
warn!("PromQL query requested but config has ElasticSQL schema");
return None;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ mod tests {
}
SchemaConfig::SQL(_) => panic!("Expected PromQL schema"),
SchemaConfig::ElasticQueryDSL => panic!("Expected PromQL schema"),
SchemaConfig::ElasticSQL => panic!("Expected PromQL schema"),
SchemaConfig::ElasticSQL(_) => panic!("Expected PromQL schema"),
}

// Verify query configs (2 queries: PromQL + SQL)
Expand Down
Loading