From 3be0d512ee72a37a8348828b982c68c07e719bb4 Mon Sep 17 00:00:00 2001 From: Kavya Bhat Date: Mon, 23 Mar 2026 12:06:13 -0600 Subject: [PATCH 1/3] Update ElasticSQL handlers --- Cargo.lock | 11 +-- .../sketch_db_common/src/inference_config.rs | 9 +- .../src/data_model/streaming_config.rs | 2 +- .../drivers/query/adapters/elastic_http.rs | 83 +++++++++++-------- .../src/drivers/query/fallback/elastic.rs | 22 +++-- .../src/engines/simple_engine.rs | 6 +- .../tests/test_utilities/config_builders.rs | 2 +- 7 files changed, 77 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4de8ab3..b0bbcd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,8 +3446,8 @@ dependencies = [ "bincode", "chrono", "clap 4.5.60", - "ctor", "criterion", + "ctor", "dashmap 5.5.3", "datafusion", "datafusion_summary_library", @@ -3541,15 +3541,6 @@ dependencies = [ "getrandom 0.2.17", ] -[[package]] -name = "rand_core" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" -dependencies = [ - "getrandom 0.3.4", -] - [[package]] name = "rayon" version = "1.11.0" diff --git a/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs b/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs index 48bd2c4..5778a6c 100644 --- a/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs +++ b/asap-common/dependencies/rs/sketch_db_common/src/inference_config.rs @@ -17,7 +17,7 @@ pub enum SchemaConfig { PromQL(PromQLSchema), SQL(SQLSchema), ElasticQueryDSL, - ElasticSQL, + ElasticSQL(SQLSchema), } #[derive(Debug, Clone)] @@ -33,7 +33,7 @@ impl InferenceConfig { QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()), QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())), QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, - QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL, + QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())), }; Self { schema, @@ -61,7 +61,10 @@ impl InferenceConfig { SchemaConfig::SQL(sql_schema) } QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, - QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL, + QueryLanguage::elastic_sql => { + let sql_schema = Self::parse_sql_schema(data)?; + SchemaConfig::SQL(sql_schema) + } }; let cleanup_policy = Self::parse_cleanup_policy(data)?; diff --git a/asap-query-engine/src/data_model/streaming_config.rs b/asap-query-engine/src/data_model/streaming_config.rs index 746a632..127a8c2 100644 --- a/asap-query-engine/src/data_model/streaming_config.rs +++ b/asap-query-engine/src/data_model/streaming_config.rs @@ -72,7 +72,7 @@ impl StreamingConfig { SchemaConfig::PromQL(_) => QueryLanguage::promql, SchemaConfig::SQL(_) => QueryLanguage::sql, SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl, - SchemaConfig::ElasticSQL => QueryLanguage::elastic_sql, + SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql, }) .unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config diff --git a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs index fce8a9f..ca4062c 100644 --- a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs +++ b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs @@ -1,6 +1,7 @@ use super::config::AdapterConfig; use super::traits::*; use crate::data_model::QueryLanguage; +use crate::QueryResult; use async_trait::async_trait; use axum::{ body::Bytes, @@ -11,7 +12,7 @@ use axum::{ use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::Arc; -use tracing::{debug, error}; +use tracing::{debug, error, info}; /// Elasticsearch HTTP protocol adapter pub struct ElasticHttpAdapter { @@ -26,34 +27,29 @@ impl ElasticHttpAdapter { /// Parse Elasticsearch query from JSON body fn parse_elasticsearch_query(&self, body: &Bytes) -> Result { - debug!( - "Elasticsearch adapter: parsing query for language {:?}", - self.config.language - ); - - // Parse the JSON body let json_body: Value = serde_json::from_slice(body) .map_err(|e| AdapterError::ParseError(format!("Invalid JSON: {}", e)))?; - // Store the entire query as a JSON string - let query = serde_json::to_string(&json_body) - .map_err(|e| AdapterError::ParseError(format!("Failed to serialize query: {}", e)))?; + // Extract the SQL string from the "query" field + let query = match &self.config.language { + QueryLanguage::elastic_sql => json_body + .get("query") + .and_then(|v| v.as_str()) + .ok_or_else(|| AdapterError::MissingParameter("query".to_string()))? + .to_string(), + _ => { + // For QueryDSL, keep the full JSON as before + serde_json::to_string(&json_body).map_err(|e| { + AdapterError::ParseError(format!("Failed to serialize query: {}", e)) + })? + } + }; let time = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64(); - debug!( - "Elasticsearch adapter: parsed {} query with time={}", - if matches!(self.config.language, QueryLanguage::elastic_sql) { - "SQL" - } else { - "Query DSL" - }, - time - ); - Ok(ParsedQueryRequest { query, time }) } } @@ -126,34 +122,51 @@ impl QueryRequestAdapter for ElasticHttpAdapter { impl QueryResponseAdapter for ElasticHttpAdapter { async fn format_success_response( &self, - _result: &QueryExecutionResult, + result: &QueryExecutionResult, ) -> Result { - debug!("Elasticsearch adapter: formatting success response"); + info!( + "SKETCH HIT: serving {} rows from precomputed sketches", + match &result.query_result { + QueryResult::Vector(v) => v.values.len(), + QueryResult::Matrix(_) => 0, + } + ); - // For now, since we're falling back for every query, - // the result from the fallback will be passed through - // In the future, this could transform local execution results - // to Elasticsearch format + let label_names = &result.query_output_labels.labels; + + let hits: Vec = match &result.query_result { + QueryResult::Vector(instant_vector) => instant_vector + .values + .iter() + .map(|element| { + let mut source = serde_json::Map::new(); + for (i, label_name) in label_names.iter().enumerate() { + let label_value = element.labels.get(i).map(|s| s.as_str()).unwrap_or(""); + source.insert(label_name.clone(), json!(label_value)); + } + source.insert("value".to_string(), json!(element.value)); + json!({ + "_source": source + }) + }) + .collect(), + QueryResult::Matrix(_) => { + return Err(StatusCode::NOT_IMPLEMENTED); + } + }; - // Return a stub Elasticsearch-style response for now. let response = json!({ "took": 0, "timed_out": false, "hits": { "total": { - "value": 0, + "value": hits.len(), "relation": "eq" }, - "hits": [] + "hits": hits } }); - debug!( - "Elasticsearch adapter: returning stub response: {}", - serde_json::to_string_pretty(&response) - .unwrap_or_else(|_| "Unable to format".to_string()) - ); - Ok(Json(response).into_response()) } diff --git a/asap-query-engine/src/drivers/query/fallback/elastic.rs b/asap-query-engine/src/drivers/query/fallback/elastic.rs index 5617faa..e91cc29 100644 --- a/asap-query-engine/src/drivers/query/fallback/elastic.rs +++ b/asap-query-engine/src/drivers/query/fallback/elastic.rs @@ -80,11 +80,23 @@ impl FallbackClient for ElasticHttpFallback { debug!("Full forwarding URL: {}", full_url); - let query_body: Value = match serde_json::from_str(&request.query) { - Ok(json) => json, - Err(e) => { - error!("Failed to parse query as JSON: {}", e); - return Err(StatusCode::INTERNAL_SERVER_ERROR); + let query_body: Value = match self.language { + QueryLanguage::elastic_sql => { + // query is a raw SQL string, need to wrap it for the ES SQL endpoint + serde_json::json!({ + "query": request.query.trim().trim_end_matches(';'), + "fetch_size": 1000, + }) + } + _ => { + // query is already a JSON string (Query DSL) + match serde_json::from_str(&request.query) { + Ok(json) => json, + Err(e) => { + error!("Failed to parse query as JSON: {}", e); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } } }; diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index e60fa58..2753d66 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -1132,7 +1132,7 @@ impl SimpleEngine { return None; } &SchemaConfig::ElasticQueryDSL => todo!(), - &SchemaConfig::ElasticSQL => todo!(), + SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(), }; let statements = parser::parse_sql(&GenericDialect {}, query.as_str()).unwrap(); @@ -1473,7 +1473,7 @@ impl SimpleEngine { QueryLanguage::promql => self.handle_query_promql(query, time), QueryLanguage::sql => self.handle_query_sql(query, time), QueryLanguage::elastic_querydsl => self.handle_query_elastic(), - QueryLanguage::elastic_sql => self.handle_query_elastic(), + QueryLanguage::elastic_sql => self.handle_query_sql(query, time), } } @@ -1919,7 +1919,7 @@ impl SimpleEngine { warn!("PromQL query requested but config has ElasticQueryDSL schema"); return None; } - &SchemaConfig::ElasticSQL => { + SchemaConfig::ElasticSQL(_) => { warn!("PromQL query requested but config has ElasticSQL schema"); return None; } diff --git a/asap-query-engine/src/tests/test_utilities/config_builders.rs b/asap-query-engine/src/tests/test_utilities/config_builders.rs index 5af70ce..c88dad8 100644 --- a/asap-query-engine/src/tests/test_utilities/config_builders.rs +++ b/asap-query-engine/src/tests/test_utilities/config_builders.rs @@ -310,7 +310,7 @@ mod tests { } SchemaConfig::SQL(_) => panic!("Expected PromQL schema"), SchemaConfig::ElasticQueryDSL => panic!("Expected PromQL schema"), - SchemaConfig::ElasticSQL => panic!("Expected PromQL schema"), + SchemaConfig::ElasticSQL(_) => panic!("Expected PromQL schema"), } // Verify query configs (2 queries: PromQL + SQL) From 163bd9d57e071e663c48feb05b31590a3ef397e0 Mon Sep 17 00:00:00 2001 From: Kavya Bhat Date: Mon, 23 Mar 2026 12:22:25 -0600 Subject: [PATCH 2/3] Fix failing test --- .../src/drivers/query/adapters/elastic_http.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs index ca4062c..1182447 100644 --- a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs +++ b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs @@ -383,7 +383,6 @@ mod tests { let sql_query = json!({ "query": "SELECT status, COUNT(*) FROM logs GROUP BY status", - "fetch_size": 100, "time_zone": "UTC" }); @@ -392,8 +391,10 @@ mod tests { assert!(result.is_ok(), "SQL query with params should parse"); let parsed = result.unwrap(); - assert!(parsed.query.contains("fetch_size")); - assert!(parsed.query.contains("time_zone")); + assert!( + !parsed.query.contains("time_zone"), + "SQL adapter should extract only the query string, not extra params" + ); } /// Test: Invalid JSON should return error From 687f94789045f4c64b235fffafc57619fb8d0b41 Mon Sep 17 00:00:00 2001 From: Kavya Bhat Date: Mon, 23 Mar 2026 20:32:43 -0600 Subject: [PATCH 3/3] ASAP Elastic benchmarking pipeline --- .../src/ast_matching/sqlpattern_parser.rs | 32 +- .../clickhouse-benchmark-pipeline/config.env | 1 + .../data_exporter/src/main.rs | 87 ++--- .../csv_to_parquet.py | 43 +++ .../elastic_quantile_queries.sql | 102 ++++++ .../inference_config.yaml | 20 ++ .../elastic-asap-benchmarking/plot_latency.py | 63 ++++ .../run_elastic_benchmark.py | 312 ++++++++++++++++++ .../streaming_config.yaml | 24 ++ 9 files changed, 644 insertions(+), 40 deletions(-) create mode 100644 asap-tools/execution-utilities/elastic-asap-benchmarking/csv_to_parquet.py create mode 100644 asap-tools/execution-utilities/elastic-asap-benchmarking/elastic_quantile_queries.sql create mode 100644 asap-tools/execution-utilities/elastic-asap-benchmarking/inference_config.yaml create mode 100644 asap-tools/execution-utilities/elastic-asap-benchmarking/plot_latency.py create mode 100755 asap-tools/execution-utilities/elastic-asap-benchmarking/run_elastic_benchmark.py create mode 100644 asap-tools/execution-utilities/elastic-asap-benchmarking/streaming_config.yaml diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index 549543d..3c833a0 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -336,6 +336,16 @@ impl SQLPatternParser { Expr::Function(func) if func.name.to_string().to_uppercase() == "DATEADD" => { self.parse_dateadd(func) } + + // Elastic semantics requires CAST() for datetime strings. + Expr::Cast { expr, .. } => match expr.as_ref() { + Expr::Value(ValueWithSpan { + value: SingleQuotedString(datetime_str), + .. + }) => Self::get_timestamp_from_datetime_str(datetime_str), + _ => None, + }, + _ => { panic!("invalid time syntax {:?}", highlow); } @@ -362,8 +372,8 @@ impl SQLPatternParser { _ => return None, }; - let start = self.get_timestamp_from_between_highlow(low.as_ref())?; - let end = self.get_timestamp_from_between_highlow(high.as_ref())?; + let start = self.get_timestamp_from_between_highlow(low)?; + let end = self.get_timestamp_from_between_highlow(high)?; let duration = end - start; @@ -388,6 +398,10 @@ impl SQLPatternParser { FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(ident))) => { ident.value.to_lowercase() } + FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(ValueWithSpan { + value: SingleQuotedString(s), + .. + }))) => s.to_lowercase(), _ => return None, }; @@ -429,6 +443,20 @@ impl SQLPatternParser { value: SingleQuotedString(datetime_str), span: _, }))) => parse_datetime(datetime_str).ok()?.timestamp().as_second() as f64, + + FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Cast { expr, .. })) => { + match expr.as_ref() { + Expr::Value(ValueWithSpan { + value: SingleQuotedString(datetime_str), + .. + }) => parse_datetime(datetime_str).ok()?.timestamp().as_second() as f64, + _ => { + println!("Unsupported CAST expression in DATEADD"); + return None; + } + } + } + _ => { println!("time upper bound not calculating from present"); return None; diff --git a/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/config.env b/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/config.env index 174e901..8eb58fb 100644 --- a/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/config.env +++ b/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/config.env @@ -38,3 +38,4 @@ ES_PORT=9200 ES_INDEX_NAME=h2o_benchmark ES_BULK_SIZE=10000 ES_API_KEY=your-api-key +TOTAL_RECORDS=5000000 diff --git a/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/data_exporter/src/main.rs b/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/data_exporter/src/main.rs index 19635d8..3eb9f5a 100644 --- a/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/data_exporter/src/main.rs +++ b/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/data_exporter/src/main.rs @@ -888,49 +888,60 @@ async fn run_h2o_elasticsearch_mode(args: &Args) -> Result<(), Box Result<(), Box List[Tuple[str, str]]: + """Extract query ID and SQL from a .sql file. + Matches comments like: -- T001: description + followed by a SELECT statement ending in ; + """ + queries = [] + with open(sql_file, "r") as f: + content = f.read() + + pattern = r"-- ([A-Za-z0-9_]+):[^\n]*\n(SELECT[^;]+;)" + matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE) + + for query_id, sql in matches: + queries.append((query_id, sql.strip())) + + return queries + + +def run_query( + query: str, + elastic_host: str, + elastic_port: int, + api_key: Optional[str], + timeout: int = 30, + fetch_size: int = 1000, +) -> Tuple[float, Optional[list], Optional[str]]: + """ + Run a query against Elasticsearch SQL API. + Returns (latency_ms, rows, error). + + Uses POST /_sql?format=json which is the standard ES SQL endpoint. + """ + url = f"http://{elastic_host}:{elastic_port}/_sql?format=json" + + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"ApiKey {api_key}" + + body = { + "query": query.strip().rstrip(";"), + "fetch_size": fetch_size, + } + + try: + start_time = time.time() + response = requests.post( + url, + headers=headers, + json=body, + timeout=timeout, + ) + latency_ms = (time.time() - start_time) * 1000 + + # if response.status_code == 200: + # data = response.json() + # print(f"DEBUG: {json.dumps(data)[:500]}") + + if response.status_code == 200: + data = response.json() + # Elasticsearch SQL format + if "rows" in data: + rows = data["rows"] + # Elasticsearch hits format (ASAP sketch hit) + elif "hits" in data and "hits" in data.get("hits", {}): + rows = data["hits"]["hits"] + # Prometheus/ASAP format + elif "data" in data and "result" in data.get("data", {}): + rows = data["data"]["result"] + else: + rows = [] + return latency_ms, rows, None + else: + error = f"HTTP {response.status_code}: {response.text[:200]}" + return latency_ms, None, error + + except requests.Timeout: + return timeout * 1000, None, "Timeout" + except Exception as e: + return 0, None, str(e) + + +def get_query_pattern(query_id: str) -> str: + """Categorize query by ID prefix, mirroring the ClickHouse benchmark script.""" + if query_id.startswith("ST"): + return "SpatioTemporal" + elif query_id.startswith("S"): + return "Spatial" + elif query_id.startswith("T"): + return "Temporal" + elif query_id.startswith("N"): + return "Nested" + elif query_id.startswith("D"): + return "Dated" + elif query_id.startswith("L"): + return "LongRange" + elif query_id.startswith("Q"): + return "Aggregate" + else: + return "Unknown" + + +def run_benchmark( + sql_file: Path, + elastic_host: str, + elastic_port: int, + api_key: Optional[str], + output_csv: Path, + mode: str = "baseline", + query_filter: Optional[List[str]] = None, + timeout: int = 30, +): + """Run all queries and save results to CSV.""" + # Override port for asap mode + if mode == "asap": + elastic_port = ASAP_PORT + + print(f"\nRunning Elasticsearch benchmark in {mode} mode...") + print(f"Endpoint: http://{elastic_host}:{elastic_port}/_sql") + print(f"Output: {output_csv}") + + queries = extract_queries_from_sql(sql_file) + + if query_filter: + queries = [(qid, sql) for qid, sql in queries if qid in query_filter] + + print(f"Found {len(queries)} queries\n") + + with open(output_csv, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerow( + [ + "query_id", + "query_pattern", + "latency_ms", + "result_rows", + "result_preview", + "error", + "mode", + ] + ) + + for query_id, sql in queries: + print(f"Running {query_id}...", end=" ", flush=True) + + pattern = get_query_pattern(query_id) + + latency_ms, rows, error = run_query( + sql, + elastic_host, + elastic_port, + api_key, + timeout=timeout, + ) + + if error: + print(f"✗ {error}") + writer.writerow( + [query_id, pattern, f"{latency_ms:.2f}", 0, "", error, mode] + ) + else: + num_rows = len(rows) if rows else 0 + # Preview: first row as a short string + preview = str(rows[0])[:100] if rows else "" + print(f"✓ {latency_ms:.2f}ms ({num_rows} rows)") + writer.writerow( + [ + query_id, + pattern, + f"{latency_ms:.2f}", + num_rows, + preview, + "", + mode, + ] + ) + + # Small delay between queries to avoid hammering the cluster + time.sleep(0.1) + + print(f"\n✓ Results saved to {output_csv}") + + +def check_connection( + elastic_host: str, elastic_port: int, api_key: Optional[str], mode: str = "baseline" +) -> bool: + """Verify Elasticsearch is reachable and the SQL plugin is available.""" + if mode == "asap": + elastic_port = ASAP_PORT + + url = f"http://{elastic_host}:{elastic_port}/_sql?format=json" + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"ApiKey {api_key}" + + try: + response = requests.post( + url, + headers=headers, + json={"query": "SELECT 1"}, + timeout=5, + ) + if response.status_code == 200: + print(f"✓ Connected to Elasticsearch at {elastic_host}:{elastic_port}") + return True + else: + print( + f"✗ Elasticsearch SQL returned HTTP {response.status_code}: {response.text[:200]}" + ) + return False + except Exception as e: + print(f"✗ Could not connect to Elasticsearch: {e}") + return False + + +def main(): + parser = argparse.ArgumentParser( + description="Benchmark queries against Elasticsearch SQL API" + ) + parser.add_argument( + "--elastic-host", + default="localhost", + help="Elasticsearch host (default: localhost)", + ) + parser.add_argument( + "--elastic-port", + type=int, + default=9200, + help="Elasticsearch port (default: 9200, overridden to 8088 when --mode asap)", + ) + parser.add_argument( + "--api-key", + default=None, + help="Elasticsearch API key (optional)", + ) + parser.add_argument( + "--sql-file", + required=True, + help="Path to .sql file containing queries", + ) + parser.add_argument( + "--output", + default="elastic_results.csv", + help="Output CSV file (default: elastic_results.csv)", + ) + parser.add_argument( + "--mode", + default="baseline", + help="Label for the 'mode' column in output CSV (default: baseline). Use 'asap' to target port 8088.", + ) + parser.add_argument( + "--filter", + default=None, + help="Comma-separated query IDs to run (e.g. T000,T001,Q1)", + ) + parser.add_argument( + "--timeout", + type=int, + default=30, + help="Per-query timeout in seconds (default: 30)", + ) + + args = parser.parse_args() + + # Verify connection first + # if not check_connection(args.elastic_host, args.elastic_port, args.api_key, args.mode): + # return 1 + + sql_file = Path(args.sql_file) + if not sql_file.exists(): + print(f"✗ SQL file not found: {sql_file}") + return 1 + + query_filter = [q.strip() for q in args.filter.split(",")] if args.filter else None + + run_benchmark( + sql_file=sql_file, + elastic_host=args.elastic_host, + elastic_port=args.elastic_port, + api_key=args.api_key, + output_csv=Path(args.output), + mode=args.mode, + query_filter=query_filter, + timeout=args.timeout, + ) + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/asap-tools/execution-utilities/elastic-asap-benchmarking/streaming_config.yaml b/asap-tools/execution-utilities/elastic-asap-benchmarking/streaming_config.yaml new file mode 100644 index 0000000..3e2a59a --- /dev/null +++ b/asap-tools/execution-utilities/elastic-asap-benchmarking/streaming_config.yaml @@ -0,0 +1,24 @@ +# ASAP Streaming Config for H2O Benchmark Dataset (Elasticsearch) + +tables: + - name: h2o_benchmark + time_column: timestamp + metadata_columns: [id1, id2, id3, id4, id5, id6] + value_columns: [v1, v2, v3] + +aggregations: + - aggregationId: 12 + aggregationType: DatasketchesKLL + aggregationSubType: '' + labels: + grouping: [id1, id2] + rollup: [id3, id4, id5, id6] + aggregated: [] + table_name: h2o_benchmark + value_column: v3 + parameters: + K: 200 + tumblingWindowSize: 10 + windowSize: 10 + windowType: tumbling + spatialFilter: ''