diff --git a/asap-tools/execution-utilities/benchmark/README.md b/asap-tools/execution-utilities/benchmark/README.md new file mode 100644 index 0000000..9a0608c --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/README.md @@ -0,0 +1,344 @@ +# ASAP Generalized Benchmark Pipeline + +Measures ASAP query latency (KLL sketch) against ClickHouse baseline for +arbitrary datasets. Supports ClickBench and H2O groupby out of the box. + +## Architecture + +``` +data_file → prepare_data.py → arroyo_file.json + ↓ + export_to_arroyo.py (file source) + ↓ + sketch_topic (Kafka) + ↓ + QueryEngineRust :8088 + ↓ +data_file → export_to_database.py run_benchmark.py → results/ + ↓ + ClickHouse :8123 (baseline) +``` + +**Key difference from the old pipeline:** Arroyo reads directly from a local +file (`single_file_custom` connector) rather than from a Kafka input topic. +Kafka is still required for the **sketch output** topic (`sketch_topic`). + +--- + +## Prerequisites + +```bash +export INSTALL_DIR=/scratch/sketch_db_for_prometheus +pip3 install --user -r requirements.txt + +# Build binaries (one-time) +cd ~/ASAPQuery/asap-query-engine && cargo build --release +``` + +--- + +## ClickBench + ClickHouse End-to-End Example + +### Step 1 — Download dataset + +```bash +cd ~/ASAPQuery/asap-tools/execution-utilities/benchmark +python download_dataset.py --dataset clickbench --output-dir ./data +``` + +Optionally limit to 1M rows: + +```bash +cd ./data +mv hits.json.gz hits_full.json.gz +zcat hits_full.json.gz | head -n 1000000 | gzip > hits.json.gz +``` + +### Step 2 — Prepare data for Arroyo file source + +The Arroyo file source requires RFC3339 timestamps and string metadata columns. 
+This step converts the raw ClickBench JSON: + +```bash +python prepare_data.py \ + --dataset clickbench \ + --input ./data/hits.json.gz \ + --output ./data/hits_arroyo.json \ + --max-rows 1000000 +``` + +This produces `hits_arroyo.json` with: +- `EventTime` converted from `"2013-07-14 20:38:47"` → `"2013-07-14T20:38:47Z"` +- `RegionID`, `OS`, `UserAgent`, `TraficSourceID` as strings +- Records sorted by `EventTime` + +### Step 3 — Start infrastructure + +```bash +# Kafka +~/ASAPQuery/asap-tools/installation/kafka/run.sh $INSTALL_DIR/kafka + +# Create sketch output topic +KAFKA=$INSTALL_DIR/kafka/bin +$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --create \ + --topic sketch_topic --partitions 1 --replication-factor 1 \ + --config max.message.bytes=20971520 + +# ClickHouse +~/ASAPQuery/asap-tools/installation/clickhouse/run.sh $INSTALL_DIR +``` + +### Step 4 — Start Arroyo cluster + +```bash +~/ASAPQuery/asap-summary-ingest/target/release/arroyo \ + --config ~/ASAPQuery/asap-summary-ingest/config.yaml cluster \ + > /tmp/arroyo.log 2>&1 & +``` + +### Step 5 — Launch Arroyo sketch pipeline (file source) + +```bash +python export_to_arroyo.py \ + --streaming-config ./configs/clickbench_streaming.yaml \ + --source-type file \ + --input-file ./data/hits_arroyo.json \ + --file-format json \ + --ts-format rfc3339 \ + --pipeline-name clickbench_pipeline \ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest \ + --output-dir ./arroyo_outputs +``` + +### Step 6 — Start QueryEngineRust + +```bash +cd ~/ASAPQuery/asap-query-engine +nohup ./target/release/query_engine_rust \ + --kafka-topic sketch_topic --input-format json \ + --config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/configs/clickbench_inference.yaml \ + --streaming-config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/configs/clickbench_streaming.yaml \ + --http-port 8088 --delete-existing-db --log-level DEBUG \ + --output-dir ./output --streaming-engine arroyo \ + --query-language SQL 
--lock-strategy per-key \ + --prometheus-scrape-interval 1 > /tmp/query_engine.log 2>&1 & +``` + +### Step 7 — Load data into ClickHouse (baseline) + +```bash +cd ~/ASAPQuery/asap-tools/execution-utilities/benchmark +python export_to_database.py \ + --dataset clickbench \ + --file-path ./data/hits.json.gz \ + --clickhouse-url "http://localhost:8123/" \ + --init-sql-file ./configs/clickbench_hits_init.sql +``` + +Verify: `$INSTALL_DIR/clickhouse client --query "SELECT count(*) FROM hits"` + +### Step 8 — Generate SQL query files + +```bash +python generate_queries.py \ + --table-name hits \ + --ts-column EventTime \ + --value-column ResolutionWidth \ + --group-by-columns RegionID,OS,UserAgent,TraficSourceID \ + --window-size 10 \ + --num-queries 50 \ + --ts-format datetime \ + --window-form dateadd \ + --auto-detect-timestamps \ + --data-file ./data/hits_arroyo.json \ + --data-file-format json \ + --output-prefix ./queries/clickbench +``` + +This writes `queries/clickbench_asap.sql` and `queries/clickbench_clickhouse.sql`. + +### Step 9 — Run benchmark + +```bash +python run_benchmark.py \ + --mode both \ + --asap-sql-file ./queries/clickbench_asap.sql \ + --baseline-sql-file ./queries/clickbench_clickhouse.sql \ + --output-dir ./results \ + --output-prefix clickbench +``` + +Results: `results/clickbench_asap.csv`, `results/clickbench_baseline.csv`, +`results/clickbench_comparison.png`. 
+ +--- + +## H2O GroupBy End-to-End Example + +### Step 1 — Download dataset + +```bash +python download_dataset.py --dataset h2o --output-dir ./data +``` + +### Step 2 — Prepare data for Arroyo file source + +```bash +python prepare_data.py \ + --dataset h2o \ + --input ./data/G1_1e7_1e2_0_0.csv \ + --output ./data/h2o_arroyo.json \ + --max-rows 1000000 +``` + +### Steps 3–4 — Start infrastructure and Arroyo (same as ClickBench) + +### Step 5 — Launch Arroyo sketch pipeline + +```bash +python export_to_arroyo.py \ + --streaming-config ./configs/h2o_streaming.yaml \ + --source-type file \ + --input-file ./data/h2o_arroyo.json \ + --file-format json \ + --ts-format rfc3339 \ + --pipeline-name h2o_pipeline \ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest \ + --output-dir ./arroyo_outputs +``` + +### Step 6 — Start QueryEngineRust + +```bash +cd ~/ASAPQuery/asap-query-engine +nohup ./target/release/query_engine_rust \ + --kafka-topic sketch_topic --input-format json \ + --config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml \ + --streaming-config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml \ + --http-port 8088 --delete-existing-db --log-level DEBUG \ + --output-dir ./output --streaming-engine arroyo \ + --query-language SQL --lock-strategy per-key \ + --prometheus-scrape-interval 1 > /tmp/query_engine.log 2>&1 & +``` + +### Step 7 — Load data into ClickHouse (baseline) + +```bash +python export_to_database.py \ + --dataset h2o \ + --file-path ./data/G1_1e7_1e2_0_0.csv \ + --init-sql-file ./configs/h2o_init.sql \ + --max-rows 1000000 +``` + +### Step 8 — Generate SQL query files + +```bash +python generate_queries.py \ + --table-name h2o_groupby \ + --ts-column timestamp \ + --value-column v1 \ + --group-by-columns id1,id2 \ + --window-size 10 \ + --num-queries 50 \ + --ts-format iso \ + --auto-detect-timestamps \ + --data-file ./data/h2o_arroyo.json \ + --data-file-format json \ + 
--output-prefix ./queries/h2o +``` + +### Step 9 — Run benchmark + +```bash +python run_benchmark.py \ + --mode both \ + --asap-sql-file ./queries/h2o_asap.sql \ + --baseline-sql-file ./queries/h2o_clickhouse.sql \ + --output-dir ./results \ + --output-prefix h2o +``` + +--- + +## Custom Dataset + +```bash +# 1. Download (any HTTP URL) +python download_dataset.py --dataset custom \ + --custom-url https://example.com/mydata.json.gz \ + --output-dir ./data + +# 2. Prepare (edit prepare_data.py for your schema, or skip if already RFC3339) + +# 3. Export to Arroyo +python export_to_arroyo.py \ + --streaming-config ./configs/my_streaming.yaml \ + --source-type file \ + --input-file ./data/mydata.json \ + --file-format json \ + --ts-format rfc3339 \ + --pipeline-name my_pipeline \ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest + +# 4. Export to ClickHouse +python export_to_database.py \ + --dataset custom \ + --file-path ./data/mydata.json \ + --init-sql-file ./configs/my_init.sql \ + --table-name my_table + +# 5. Generate queries +python generate_queries.py \ + --table-name my_table \ + --ts-column event_time \ + --value-column metric_value \ + --group-by-columns region,host \ + --window-size 10 \ + --num-queries 50 \ + --auto-detect-timestamps \ + --data-file ./data/mydata.json \ + --output-prefix ./queries/my_dataset + +# 6. 
Run benchmark +python run_benchmark.py \ + --mode both \ + --asap-sql-file ./queries/my_dataset_asap.sql \ + --baseline-sql-file ./queries/my_dataset_clickhouse.sql \ + --output-dir ./results +``` + +--- + +## Reset + +```bash +pkill -f "arroyo"; pkill -f "query_engine_rust" +sleep 2 +pkill -f "kafka-server-start.sh"; pkill -f "clickhouse server" +sleep 2 +rm -rf /tmp/arroyo/ + +KAFKA=$INSTALL_DIR/kafka/bin +$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic sketch_topic + +cd ~/ASAPQuery/asap-summary-ingest +python3 delete_pipeline.py --all_pipelines + +$INSTALL_DIR/clickhouse client --query "TRUNCATE TABLE hits" +# or for H2O: $INSTALL_DIR/clickhouse client --query "TRUNCATE TABLE h2o_groupby" +``` + +--- + +## Files + +| File | Purpose | +|------|---------| +| `download_dataset.py` | Download ClickBench, H2O, or custom datasets | +| `prepare_data.py` | Convert raw data to Arroyo file source format (RFC3339, string columns) | +| `export_to_arroyo.py` | Launch Arroyo sketch pipeline (file or kafka source) | +| `export_to_database.py` | Load data into ClickHouse for baseline | +| `generate_queries.py` | Generate paired ASAP + ClickHouse SQL query files | +| `run_benchmark.py` | Run queries and produce CSV results + plots | +| `configs/` | Dataset-specific streaming/inference YAML and ClickHouse init SQL | diff --git a/asap-tools/execution-utilities/benchmark/configs/clickbench_hits_init.sql b/asap-tools/execution-utilities/benchmark/configs/clickbench_hits_init.sql new file mode 100644 index 0000000..b462fae --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/clickbench_hits_init.sql @@ -0,0 +1,115 @@ +-- ClickHouse init for ClickBench baseline (MergeTree only, no Kafka engine) +-- Use this with export_to_database.py --dataset clickbench --init-sql-file + +CREATE TABLE IF NOT EXISTS hits +( + WatchID Int64, + JavaEnable UInt8, + Title String, + GoodEvent Int16, + EventTime DateTime, + EventDate Date, + CounterID UInt32, + 
ClientIP Int32, + RegionID UInt32, + UserID Int64, + CounterClass Int8, + OS UInt8, + UserAgent UInt8, + URL String, + Referer String, + IsRefresh UInt8, + RefererCategoryID UInt16, + RefererRegionID UInt32, + URLCategoryID UInt16, + URLRegionID UInt32, + ResolutionWidth UInt16, + ResolutionHeight UInt16, + ResolutionDepth UInt8, + FlashMajor UInt8, + FlashMinor UInt8, + FlashMinor2 String, + NetMajor UInt8, + NetMinor UInt8, + UserAgentMajor UInt16, + UserAgentMinor String, + CookieEnable UInt8, + JavascriptEnable UInt8, + IsMobile UInt8, + MobilePhone UInt8, + MobilePhoneModel String, + Params String, + IPNetworkID UInt32, + TraficSourceID Int8, + SearchEngineID UInt16, + SearchPhrase String, + AdvEngineID UInt8, + IsArtifical UInt8, + WindowClientWidth UInt16, + WindowClientHeight UInt16, + ClientTimeZone Int16, + ClientEventTime DateTime, + SilverlightVersion1 UInt8, + SilverlightVersion2 UInt8, + SilverlightVersion3 UInt32, + SilverlightVersion4 UInt16, + PageCharset String, + CodeVersion UInt32, + IsLink UInt8, + IsDownload UInt8, + IsNotBounce UInt8, + FUniqID Int64, + OriginalURL String, + HID UInt32, + IsOldCounter UInt8, + IsEvent UInt8, + IsParameter UInt8, + DontCountHits UInt8, + WithHash UInt8, + HitColor String, + LocalEventTime DateTime, + Age UInt8, + Sex UInt8, + Income UInt8, + Interests UInt16, + Robotness UInt8, + RemoteIP Int32, + WindowName Int32, + OpenerName Int32, + HistoryLength Int16, + BrowserLanguage String, + BrowserCountry String, + SocialNetwork String, + SocialAction String, + HTTPError UInt16, + SendTiming UInt32, + DNSTiming UInt32, + ConnectTiming UInt32, + ResponseStartTiming UInt32, + ResponseEndTiming UInt32, + FetchTiming UInt32, + SocialSourceNetworkID UInt8, + SocialSourcePage String, + ParamPrice Int64, + ParamOrderID String, + ParamCurrency String, + ParamCurrencyID UInt16, + OpenstatServiceName String, + OpenstatCampaignID String, + OpenstatAdID String, + OpenstatSourceID String, + UTMSource String, + UTMMedium String, 
+ UTMCampaign String, + UTMContent String, + UTMTerm String, + FromTag String, + HasGCLID UInt8, + RefererHash Int64, + URLHash Int64, + CLID UInt32 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(EventDate) +ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime, WatchID) +SETTINGS index_granularity = 8192; diff --git a/asap-tools/execution-utilities/benchmark/configs/clickbench_inference.yaml b/asap-tools/execution-utilities/benchmark/configs/clickbench_inference.yaml new file mode 100644 index 0000000..7c4af09 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/clickbench_inference.yaml @@ -0,0 +1,21 @@ +# ASAP Inference Config for ClickBench Hits Dataset +# Source: asap_query_latency/inference_config.yaml + +tables: + - name: hits + time_column: EventTime + metadata_columns: [RegionID, OS, UserAgent, TraficSourceID] + value_columns: [ResolutionWidth] + +cleanup_policy: + name: read_based + +queries: + # Temporal queries (10s window, all labels) - QUANTILE + - aggregations: + - aggregation_id: 12 + read_count_threshold: 999999 + query: | + SELECT QUANTILE(0.95, ResolutionWidth) FROM hits + WHERE EventTime BETWEEN DATEADD(s, -10, NOW()) AND NOW() + GROUP BY RegionID, OS, UserAgent, TraficSourceID diff --git a/asap-tools/execution-utilities/benchmark/configs/clickbench_streaming.yaml b/asap-tools/execution-utilities/benchmark/configs/clickbench_streaming.yaml new file mode 100644 index 0000000..3d18e1e --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/clickbench_streaming.yaml @@ -0,0 +1,26 @@ +# ASAP Streaming Config for ClickBench Hits Dataset +# Defines sketch aggregations for Arroyo to compute +# Source: asap_query_latency/streaming_config.yaml + +tables: + - name: hits + time_column: EventTime + metadata_columns: [RegionID, OS, UserAgent, TraficSourceID] + value_columns: [ResolutionWidth] + +aggregations: + # Temporal queries (10s window, all labels) - QUANTILE (DatasketchesKLL) + - aggregationId: 12 + aggregationType: 
DatasketchesKLL + aggregationSubType: '' + labels: + grouping: [RegionID, OS, UserAgent, TraficSourceID] + rollup: [] + aggregated: [] + table_name: hits + value_column: ResolutionWidth + parameters: + K: 200 + windowSize: 10 + windowType: tumbling + spatialFilter: '' diff --git a/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml b/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml new file mode 100644 index 0000000..0d1e45b --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml @@ -0,0 +1,20 @@ +# ASAP Inference Config for H2O GroupBy Dataset +# Source: asap_benchmark_pipeline/inference_config.yaml + +tables: + - name: h2o_groupby + time_column: timestamp + metadata_columns: [id1, id2] + value_columns: [v1] + +cleanup_policy: + name: read_based + +queries: + - aggregations: + - aggregation_id: 12 + read_count_threshold: 999999 + query: |- + SELECT QUANTILE(0.95, v1) FROM h2o_groupby + WHERE timestamp BETWEEN DATEADD(s, -10, NOW()) AND NOW() + GROUP BY id1, id2; diff --git a/asap-tools/execution-utilities/benchmark/configs/h2o_init.sql b/asap-tools/execution-utilities/benchmark/configs/h2o_init.sql new file mode 100644 index 0000000..dbaf81c --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/h2o_init.sql @@ -0,0 +1,20 @@ +-- ClickHouse init for H2O GroupBy baseline (MergeTree, direct load) +-- Use this with export_to_database.py --dataset h2o --init-sql-file +-- Source: asap_benchmark_pipeline/h2o_init.sql + +DROP TABLE IF EXISTS h2o_groupby; + +CREATE TABLE IF NOT EXISTS h2o_groupby +( + timestamp DateTime, + id1 String, + id2 String, + id3 String, + id4 Int32, + id5 Int32, + id6 Int32, + v1 Int32, + v2 Int32, + v3 Float64 +) ENGINE = MergeTree() +ORDER BY (id1, id2); diff --git a/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml b/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml new file mode 100644 index 0000000..c500d69 --- /dev/null +++ 
b/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml @@ -0,0 +1,26 @@ +# ASAP Streaming Config for H2O GroupBy Dataset +# Source: asap_benchmark_pipeline/streaming_config.yaml + +tables: + - name: h2o_groupby + time_column: timestamp + metadata_columns: [id1, id2] + value_columns: [v1] + +aggregations: + # Temporal queries (10s window, all labels) - QUANTILE (DatasketchesKLL) + - aggregationId: 12 + aggregationType: DatasketchesKLL + aggregationSubType: '' + labels: + grouping: [id1, id2] + rollup: [] + aggregated: [] + table_name: h2o_groupby + value_column: v1 + parameters: + K: 200 + tumblingWindowSize: 10 + windowSize: 10 + windowType: tumbling + spatialFilter: '' diff --git a/asap-tools/execution-utilities/benchmark/download_dataset.py b/asap-tools/execution-utilities/benchmark/download_dataset.py new file mode 100644 index 0000000..5226ae5 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/download_dataset.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +""" +Unified dataset downloader for the ASAP benchmark pipeline. + +Supports ClickBench (hits.json.gz), H2O groupby (G1_1e7_1e2_0_0.csv), +or any custom HTTP URL. + +Usage: + python download_dataset.py --dataset clickbench --output-dir ./data + python download_dataset.py --dataset h2o --output-dir ./data + python download_dataset.py --dataset custom --custom-url https://... 
--output-dir ./data +""" + +import argparse +import os +import sys +import urllib.request + + +CLICKBENCH_URL = "https://datasets.clickhouse.com/hits_compatible/hits.json.gz" +CLICKBENCH_FILENAME = "hits.json.gz" + +H2O_FILE_ID = "15SVQjQ2QehzYDLoDonio4aP7xqdMiNyi" +H2O_FILENAME = "G1_1e7_1e2_0_0.csv" + + +def _http_download(url: str, output_path: str) -> str: + """Download a file via HTTP with progress reporting.""" + print(f"Downloading from {url}") + request = urllib.request.Request( + url, headers={"User-Agent": "Mozilla/5.0 (compatible; ASAP-Benchmark/1.0)"} + ) + try: + with urllib.request.urlopen(request) as response: + total_size = int(response.headers.get("Content-Length", 0)) + downloaded = 0 + last_percent = -1 + block_size = 8192 * 128 # ~1 MB blocks + + with open(output_path, "wb") as f: + while True: + block = response.read(block_size) + if not block: + break + f.write(block) + downloaded += len(block) + if total_size > 0: + percent = downloaded * 100 // total_size + if percent != last_percent: + last_percent = percent + mb = downloaded / (1024 * 1024) + total_mb = total_size / (1024 * 1024) + sys.stdout.write( + f"\rProgress: {percent}% ({mb:.1f}/{total_mb:.1f} MB)" + ) + sys.stdout.flush() + + print("\nDownload complete!") + return output_path + + except urllib.error.HTTPError as e: + print(f"\nDownload failed: HTTP {e.code} - {e.reason}") + raise + + +def download_clickbench(output_path: str, force: bool = False) -> str: + """Download hits.json.gz from ClickHouse datasets CDN.""" + if not force and os.path.exists(output_path): + print(f"Using existing file: {output_path}") + return output_path + print("Downloading ClickBench dataset (~14 GB compressed). 
Please wait...") + return _http_download(CLICKBENCH_URL, output_path) + + +def download_h2o(output_path: str, force: bool = False) -> str: + """Download H2O groupby CSV (~300 MB) from Google Drive via gdown.""" + if not force and os.path.exists(output_path) and os.path.getsize(output_path) > 100 * 1024 * 1024: + print(f"Using existing file: {output_path}") + return output_path + + try: + import gdown + except ImportError: + print("Installing gdown...") + import subprocess + subprocess.check_call([sys.executable, "-m", "pip", "install", "gdown"]) + import gdown + + print(f"Downloading H2O dataset via gdown (ID: {H2O_FILE_ID})...") + url = f"https://drive.google.com/uc?id={H2O_FILE_ID}" + gdown.download(url, output_path, quiet=False) + return output_path + + +def download_custom(url: str, output_path: str, force: bool = False) -> str: + """Download a dataset from an arbitrary HTTP URL.""" + if not force and os.path.exists(output_path): + print(f"Using existing file: {output_path}") + return output_path + return _http_download(url, output_path) + + +def main(): + parser = argparse.ArgumentParser( + description="Download benchmark datasets", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--dataset", + choices=["clickbench", "h2o", "custom"], + required=True, + help="Dataset to download", + ) + parser.add_argument( + "--output-dir", + required=True, + help="Directory to save the downloaded file", + ) + parser.add_argument( + "--output-file", + default=None, + help="Exact output file path (overrides --output-dir)", + ) + parser.add_argument( + "--custom-url", + default=None, + help="URL to download (required when --dataset custom)", + ) + parser.add_argument( + "--force-redownload", + action="store_true", + help="Re-download even if the file already exists", + ) + args = parser.parse_args() + + if args.dataset == "custom" and not args.custom_url: + parser.error("--custom-url is required when --dataset custom") + + 
os.makedirs(args.output_dir, exist_ok=True) + + if args.output_file: + output_path = args.output_file + os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) + elif args.dataset == "clickbench": + output_path = os.path.join(args.output_dir, CLICKBENCH_FILENAME) + elif args.dataset == "h2o": + output_path = os.path.join(args.output_dir, H2O_FILENAME) + else: + filename = args.custom_url.rstrip("/").split("/")[-1] or "data" + output_path = os.path.join(args.output_dir, filename) + + if args.dataset == "clickbench": + download_clickbench(output_path, force=args.force_redownload) + elif args.dataset == "h2o": + download_h2o(output_path, force=args.force_redownload) + else: + download_custom(args.custom_url, output_path, force=args.force_redownload) + + print(f"Dataset saved to: {output_path}") + + +if __name__ == "__main__": + main() diff --git a/asap-tools/execution-utilities/benchmark/export_to_arroyo.py b/asap-tools/execution-utilities/benchmark/export_to_arroyo.py new file mode 100644 index 0000000..6e72af7 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/export_to_arroyo.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +""" +Launch an Arroyo sketch pipeline against a dataset. + +Supports two source modes: + file (default): Arroyo reads directly from a local JSON/Parquet file. + No Kafka input topic is required. + kafka: Arroyo reads from a Kafka topic (legacy path). + +In both cases the sketch output is written to a Kafka topic (default: +sketch_topic) for consumption by QueryEngineRust. 
+ +Usage: + # File source (recommended) + python export_to_arroyo.py \\ + --streaming-config configs/clickbench_streaming.yaml \\ + --source-type file \\ + --input-file ./data/hits.json.gz \\ + --file-format json \\ + --ts-format rfc3339 \\ + --pipeline-name clickbench_pipeline \\ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest + + # Kafka source (legacy) + python export_to_arroyo.py \\ + --streaming-config configs/h2o_streaming.yaml \\ + --source-type kafka \\ + --input-kafka-topic h2o_groupby \\ + --pipeline-name h2o_pipeline \\ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest +""" + +import argparse +import os +import subprocess +import sys +import time + +import requests + +DEFAULT_ARROYO_URL = "http://localhost:5115/api/v1" +DEFAULT_OUTPUT_KAFKA_TOPIC = "sketch_topic" +DEFAULT_PARALLELISM = 1 +DEFAULT_WAIT_TIMEOUT = 300 + + +def wait_for_pipeline_running( + pipeline_name: str, + arroyo_url: str = DEFAULT_ARROYO_URL, + timeout: int = DEFAULT_WAIT_TIMEOUT, +) -> bool: + """Poll the Arroyo API until the named pipeline reaches RUNNING state. + + Translated from asap_benchmark_pipeline/run_pipeline.sh lines 107-141. + A pipeline is considered running when its 'state' field is None and + 'stop' is 'none' (Arroyo's representation of a healthy running pipeline). 
+ """ + print(f"Waiting for pipeline '{pipeline_name}' to reach RUNNING state...") + elapsed = 0 + while True: + state = "error" + try: + r = requests.get(f"{arroyo_url}/pipelines", timeout=5) + if r.ok: + data = r.json() + for p in data.get("data", []): + if p.get("name") == pipeline_name: + s = p.get("state") + stop = p.get("stop", "") + if s is None and stop == "none": + state = "running" + else: + state = str(s).lower() if s else "unknown" + break + else: + state = "not_found" + except Exception: + state = "error" + + if state == "running": + print(f"Pipeline '{pipeline_name}' is RUNNING") + return True + + print(f" Pipeline state: {state} (elapsed: {elapsed}s)") + time.sleep(5) + elapsed += 5 + if elapsed >= timeout: + print( + f"ERROR: Pipeline did not reach RUNNING state within {timeout}s" + ) + return False + + +def build_arroyosketch_cmd(args, arroyosketch_script: str) -> list: + """Build the run_arroyosketch.py command from our CLI arguments.""" + cmd = [ + sys.executable, + arroyosketch_script, + "--source_type", args.source_type, + "--output_format", "json", + "--pipeline_name", args.pipeline_name, + "--config_file_path", os.path.abspath(args.streaming_config), + "--output_kafka_topic", args.output_kafka_topic, + "--output_dir", os.path.abspath(args.output_dir), + "--parallelism", str(args.parallelism), + "--query_language", "sql", + ] + + if args.source_type == "file": + cmd += [ + "--input_file_path", os.path.abspath(args.input_file), + "--file_format", args.file_format, + "--ts_format", args.ts_format, + ] + elif args.source_type == "kafka": + cmd += [ + "--kafka_input_format", "json", + "--input_kafka_topic", args.input_kafka_topic, + ] + + return cmd + + +def main(): + parser = argparse.ArgumentParser( + description="Launch Arroyo sketch pipeline (file or kafka source)", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--streaming-config", + required=True, + help="Path to 
streaming_config.yaml", + ) + parser.add_argument( + "--source-type", + choices=["file", "kafka"], + default="file", + help="Data source type (default: file)", + ) + # File source args + parser.add_argument( + "--input-file", + default=None, + help="Path to input data file (required for --source-type file)", + ) + parser.add_argument( + "--file-format", + choices=["json", "parquet"], + default="json", + help="File format (default: json)", + ) + parser.add_argument( + "--ts-format", + choices=["unix_millis", "unix_seconds", "rfc3339"], + default="rfc3339", + help="Timestamp format in the data file (default: rfc3339)", + ) + # Kafka source args + parser.add_argument( + "--input-kafka-topic", + default=None, + help="Kafka topic to read from (required for --source-type kafka)", + ) + # Common args + parser.add_argument( + "--output-kafka-topic", + default=DEFAULT_OUTPUT_KAFKA_TOPIC, + help=f"Kafka topic for sketch output (default: {DEFAULT_OUTPUT_KAFKA_TOPIC})", + ) + parser.add_argument( + "--pipeline-name", + required=True, + help="Arroyo pipeline name", + ) + parser.add_argument( + "--parallelism", + type=int, + default=DEFAULT_PARALLELISM, + help=f"Arroyo pipeline parallelism (default: {DEFAULT_PARALLELISM})", + ) + parser.add_argument( + "--arroyosketch-dir", + required=True, + help="Path to asap-summary-ingest/ directory (contains run_arroyosketch.py)", + ) + parser.add_argument( + "--arroyo-url", + default=DEFAULT_ARROYO_URL, + help=f"Arroyo API base URL (default: {DEFAULT_ARROYO_URL})", + ) + parser.add_argument( + "--output-dir", + default="./arroyo_outputs", + help="Directory for Arroyo pipeline output artifacts (default: ./arroyo_outputs)", + ) + parser.add_argument( + "--wait-for-pipeline", + action="store_true", + default=True, + help="Poll until pipeline reaches RUNNING state (default: True)", + ) + parser.add_argument( + "--no-wait", + action="store_true", + help="Do not wait for pipeline to reach RUNNING state", + ) + parser.add_argument( + 
"--wait-timeout", + type=int, + default=DEFAULT_WAIT_TIMEOUT, + help=f"Seconds to wait for RUNNING state (default: {DEFAULT_WAIT_TIMEOUT})", + ) + + args = parser.parse_args() + + # Validate source-specific required args + if args.source_type == "file" and not args.input_file: + parser.error("--input-file is required when --source-type file") + if args.source_type == "kafka" and not args.input_kafka_topic: + parser.error("--input-kafka-topic is required when --source-type kafka") + + arroyosketch_script = os.path.join( + os.path.abspath(args.arroyosketch_dir), "run_arroyosketch.py" + ) + if not os.path.exists(arroyosketch_script): + print(f"ERROR: run_arroyosketch.py not found at {arroyosketch_script}") + sys.exit(1) + + os.makedirs(args.output_dir, exist_ok=True) + + cmd = build_arroyosketch_cmd(args, arroyosketch_script) + print(f"Launching Arroyo pipeline '{args.pipeline_name}' ({args.source_type} source)...") + print(f"Command: {' '.join(cmd)}") + + result = subprocess.run(cmd) + if result.returncode != 0: + print(f"ERROR: run_arroyosketch.py exited with code {result.returncode}") + sys.exit(result.returncode) + + if not args.no_wait: + success = wait_for_pipeline_running( + args.pipeline_name, + arroyo_url=args.arroyo_url, + timeout=args.wait_timeout, + ) + if not success: + sys.exit(1) + + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/asap-tools/execution-utilities/benchmark/export_to_database.py b/asap-tools/execution-utilities/benchmark/export_to_database.py new file mode 100644 index 0000000..d958364 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/export_to_database.py @@ -0,0 +1,353 @@ +#!/usr/bin/env python3 +""" +Load a dataset into ClickHouse for baseline comparison. + +Supports ClickBench (hits.json.gz), H2O groupby CSV, or a custom table. 
+ +Usage: + # ClickBench + python export_to_database.py \\ + --dataset clickbench \\ + --file-path ./data/hits.json.gz \\ + --init-sql-file ../clickhouse-benchmark-pipeline/clickhouse/clickbench_init.sql + + # H2O + python export_to_database.py \\ + --dataset h2o \\ + --file-path ./data/G1_1e7_1e2_0_0.csv \\ + --init-sql-file ../asap_benchmark_pipeline/h2o_init.sql + + # Custom JSON file + python export_to_database.py \\ + --dataset custom \\ + --file-path ./data/mydata.json \\ + --table-name mytable \\ + --ts-column event_time \\ + --ts-assignment passthrough +""" + +import argparse +import gzip +import os +import subprocess +import sys +from datetime import datetime, timezone +from pathlib import Path + +import requests + +DEFAULT_CLICKHOUSE_URL = "http://localhost:8123/" +H2O_BATCH_SIZE = 50_000 +H2O_ROWS_PER_SECOND = 1000 +H2O_BASE_EPOCH = 1704067200 # 2024-01-01T00:00:00Z + + +def _exec_clickhouse_sql(clickhouse_url: str, sql: str, label: str = ""): + """Execute a SQL statement via the ClickHouse HTTP API.""" + r = requests.post(clickhouse_url, data=sql.encode()) + if not r.ok: + print(f" WARN [{label}]: {r.text.strip()[:200]}") + else: + short = sql.strip()[:80].replace("\n", " ") + print(f" OK: {short}") + + +def run_init_sql(clickhouse_url: str, init_sql_file: str): + """Execute DDL statements from a SQL file.""" + print(f"Running init SQL from {init_sql_file}...") + with open(init_sql_file) as f: + content = f.read() + stmts = [s.strip() for s in content.split(";") if s.strip()] + for stmt in stmts: + _exec_clickhouse_sql(clickhouse_url, stmt, label=stmt[:40]) + + +def check_row_count(clickhouse_url: str, table_name: str) -> int: + r = requests.post(clickhouse_url, data=f"SELECT count(*) FROM {table_name}") + if r.ok: + return int(r.text.strip()) + return 0 + + +def load_clickbench( + clickhouse_url: str, + file_path: str, + init_sql_file: str = None, + skip_table_init: bool = False, + skip_if_loaded: bool = False, + max_rows: int = 0, +): + """Load 
hits.json.gz into ClickHouse. + + Uses `zcat | clickhouse-client INSERT` for gzip-compressed JSON. + Adapted from asap_query_latency/run_benchmark.py:load_clickbench_data(). + """ + if not skip_table_init and init_sql_file: + run_init_sql(clickhouse_url, init_sql_file) + + if skip_if_loaded: + count = check_row_count(clickhouse_url, "hits") + if count > 0: + print(f"Data already loaded ({count:,} rows). Skipping.") + return True + + if not os.path.exists(file_path): + print(f"ERROR: Data file not found: {file_path}") + return False + + print(f"Loading ClickBench data from {file_path}...") + if max_rows > 0: + # Pipe through head to limit rows + cmd = ( + f"zcat {file_path} | head -n {max_rows} | " + f"clickhouse-client --query='INSERT INTO hits FORMAT JSONEachRow'" + ) + else: + cmd = ( + f"zcat {file_path} | " + f"clickhouse-client --query='INSERT INTO hits FORMAT JSONEachRow'" + ) + + result = subprocess.run(cmd, shell=True) + if result.returncode != 0: + print("ERROR: ClickHouse insert failed") + return False + + count = check_row_count(clickhouse_url, "hits") + print(f"Loaded {count:,} rows into ClickHouse (hits)") + return True + + +def _flush_h2o_batch(clickhouse_url: str, rows: list): + """Flush a batch of H2O rows to ClickHouse via HTTP INSERT.""" + sql = "INSERT INTO h2o_groupby VALUES " + ",".join(rows) + r = requests.post(clickhouse_url, data=sql.encode()) + if not r.ok: + raise RuntimeError(f"ClickHouse insert failed: {r.text[:200]}") + + +def load_h2o( + clickhouse_url: str, + file_path: str, + init_sql_file: str = None, + skip_table_init: bool = False, + skip_if_loaded: bool = False, + max_rows: int = 0, +): + """Load H2O groupby CSV into ClickHouse with synthetic timestamps. + + Timestamps are assigned at H2O_ROWS_PER_SECOND rows/sec starting from + H2O_BASE_EPOCH (2024-01-01T00:00:00Z). + Adapted from asap_benchmark_pipeline/run_benchmark.py:load_h2o_data_clickhouse(). 
+ """ + if not skip_table_init and init_sql_file: + run_init_sql(clickhouse_url, init_sql_file) + + if skip_if_loaded: + count = check_row_count(clickhouse_url, "h2o_groupby") + if count > 0: + print(f"Data already loaded ({count:,} rows). Skipping.") + return True + + if not os.path.exists(file_path): + print(f"ERROR: Data file not found: {file_path}") + return False + + print(f"Inserting H2O data from {file_path} into ClickHouse...") + batch: list = [] + total = 0 + + with open(file_path, "r", encoding="utf-8") as f: + f.readline() # skip header + for i, line in enumerate(f): + if max_rows > 0 and i >= max_rows: + break + parts = line.rstrip("\n").split(",") + abs_sec = H2O_BASE_EPOCH + i // H2O_ROWS_PER_SECOND + ts = datetime.fromtimestamp(abs_sec, tz=timezone.utc) + ts_str = ts.strftime("%Y-%m-%d %H:%M:%S") + + batch.append( + f"('{ts_str}','{parts[0]}','{parts[1]}','{parts[2]}'," + f"{parts[3]},{parts[4]},{parts[5]}," + f"{parts[6]},{parts[7]},{parts[8]})" + ) + + if len(batch) >= H2O_BATCH_SIZE: + _flush_h2o_batch(clickhouse_url, batch) + total += len(batch) + batch = [] + if total % 500_000 == 0: + print(f" Inserted {total:,} rows...") + + if batch: + _flush_h2o_batch(clickhouse_url, batch) + total += len(batch) + + print(f"Loaded {total:,} rows into ClickHouse (h2o_groupby)") + return True + + +def load_custom( + clickhouse_url: str, + file_path: str, + table_name: str, + ts_column: str, + ts_assignment: str = "passthrough", + init_sql_file: str = None, + skip_table_init: bool = False, + skip_if_loaded: bool = False, + max_rows: int = 0, +): + """Load a custom JSON or CSV file into ClickHouse. + + For JSON files: uses INSERT FORMAT JSONEachRow via clickhouse-client. + ts_assignment='synthetic' is only supported for CSV (same logic as H2O). 
+ """ + if not skip_table_init and init_sql_file: + run_init_sql(clickhouse_url, init_sql_file) + + if skip_if_loaded: + count = check_row_count(clickhouse_url, table_name) + if count > 0: + print(f"Data already loaded ({count:,} rows). Skipping.") + return True + + if not os.path.exists(file_path): + print(f"ERROR: Data file not found: {file_path}") + return False + + path_lower = file_path.lower() + if path_lower.endswith(".json.gz") or path_lower.endswith(".jsonl.gz"): + head_cmd = f"| head -n {max_rows}" if max_rows > 0 else "" + cmd = ( + f"zcat {file_path} {head_cmd} | " + f"clickhouse-client --query='INSERT INTO {table_name} FORMAT JSONEachRow'" + ) + print(f"Loading {file_path} into ClickHouse ({table_name})...") + result = subprocess.run(cmd, shell=True) + if result.returncode != 0: + print("ERROR: ClickHouse insert failed") + return False + elif path_lower.endswith(".json") or path_lower.endswith(".jsonl"): + head_cmd = f"head -n {max_rows} {file_path} | " if max_rows > 0 else "" + cmd = ( + f"{head_cmd}clickhouse-client --query='INSERT INTO {table_name} FORMAT JSONEachRow' " + f"< {file_path}" + ) + print(f"Loading {file_path} into ClickHouse ({table_name})...") + result = subprocess.run(cmd, shell=True) + if result.returncode != 0: + print("ERROR: ClickHouse insert failed") + return False + else: + print(f"ERROR: Unsupported file format for {file_path}. 
Use --dataset h2o for CSV.") + return False + + count = check_row_count(clickhouse_url, table_name) + print(f"Loaded {count:,} rows into ClickHouse ({table_name})") + return True + + +def main(): + parser = argparse.ArgumentParser( + description="Load a dataset into ClickHouse for baseline comparison", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--dataset", + choices=["clickbench", "h2o", "custom"], + required=True, + help="Dataset type", + ) + parser.add_argument( + "--file-path", + required=True, + help="Path to the source data file", + ) + parser.add_argument( + "--clickhouse-url", + default=DEFAULT_CLICKHOUSE_URL, + help=f"ClickHouse HTTP URL (default: {DEFAULT_CLICKHOUSE_URL})", + ) + parser.add_argument( + "--init-sql-file", + default=None, + help="DDL SQL file to run before loading (CREATE TABLE ...)", + ) + parser.add_argument( + "--table-name", + default=None, + help="Target table name (required for --dataset custom)", + ) + parser.add_argument( + "--ts-column", + default=None, + help="Timestamp column name (for --dataset custom)", + ) + parser.add_argument( + "--ts-assignment", + choices=["synthetic", "passthrough"], + default="passthrough", + help="How to assign timestamps for custom CSV data (default: passthrough)", + ) + parser.add_argument( + "--skip-table-init", + action="store_true", + help="Skip CREATE TABLE (assume tables already exist)", + ) + parser.add_argument( + "--skip-if-loaded", + action="store_true", + help="Skip insert if the table already has rows", + ) + parser.add_argument( + "--max-rows", + type=int, + default=0, + help="Maximum rows to load (0 = all)", + ) + + args = parser.parse_args() + + if args.dataset == "custom" and not args.table_name: + parser.error("--table-name is required when --dataset custom") + + success = False + if args.dataset == "clickbench": + success = load_clickbench( + args.clickhouse_url, + args.file_path, + init_sql_file=args.init_sql_file, + 
skip_table_init=args.skip_table_init, + skip_if_loaded=args.skip_if_loaded, + max_rows=args.max_rows, + ) + elif args.dataset == "h2o": + success = load_h2o( + args.clickhouse_url, + args.file_path, + init_sql_file=args.init_sql_file, + skip_table_init=args.skip_table_init, + skip_if_loaded=args.skip_if_loaded, + max_rows=args.max_rows, + ) + else: + success = load_custom( + args.clickhouse_url, + args.file_path, + table_name=args.table_name, + ts_column=args.ts_column, + ts_assignment=args.ts_assignment, + init_sql_file=args.init_sql_file, + skip_table_init=args.skip_table_init, + skip_if_loaded=args.skip_if_loaded, + max_rows=args.max_rows, + ) + + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/asap-tools/execution-utilities/benchmark/generate_queries.py b/asap-tools/execution-utilities/benchmark/generate_queries.py new file mode 100644 index 0000000..1398910 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/generate_queries.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +""" +Generate paired ASAP and ClickHouse SQL query files for benchmarking. + +Each query targets a fixed time window (window-end timestamp) and matches the +annotation format `-- T{NNN}: description` expected by run_benchmark.py. 
+ +Output: + {prefix}_asap.sql QUANTILE(q, col) syntax for QueryEngineRust + {prefix}_clickhouse.sql quantile(q)(col) syntax for ClickHouse baseline + +Usage: + # Auto-detect timestamps from data file + python generate_queries.py \\ + --table-name hits \\ + --ts-column EventTime \\ + --value-column ResolutionWidth \\ + --group-by-columns RegionID,OS,UserAgent,TraficSourceID \\ + --window-size 10 \\ + --num-queries 50 \\ + --auto-detect-timestamps \\ + --data-file ./data/hits.json.gz \\ + --data-file-format json.gz \\ + --output-prefix ./queries/clickbench + + # Explicit timestamp file (one ISO timestamp per line) + python generate_queries.py \\ + --table-name h2o_groupby \\ + --ts-column timestamp \\ + --value-column v1 \\ + --group-by-columns id1,id2 \\ + --window-size 10 \\ + --num-queries 50 \\ + --timestamps-file ./my_timestamps.txt \\ + --output-prefix ./queries/h2o +""" + +import argparse +import gzip +import json +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import List, Optional + + +SAMPLE_SIZE = 10_000 # rows to read for timestamp auto-detection + + +def _parse_timestamp(value: str) -> Optional[datetime]: + """Try to parse a timestamp string in common formats.""" + value = str(value).strip() + for fmt in ( + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d", + ): + try: + return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc) + except ValueError: + pass + # Try unix seconds/millis (numeric string) + try: + v = float(value) + if v > 1e12: # millis + return datetime.fromtimestamp(v / 1000, tz=timezone.utc) + return datetime.fromtimestamp(v, tz=timezone.utc) + except ValueError: + pass + return None + + +def _read_timestamps_from_json( + file_path: str, ts_column: str, compressed: bool +) -> List[datetime]: + """Read up to SAMPLE_SIZE timestamps from a JSON-lines file.""" + timestamps = [] + opener = gzip.open if compressed else open 
+ mode = "rt" if compressed else "r" + with opener(file_path, mode) as f: + for i, line in enumerate(f): + if i >= SAMPLE_SIZE: + break + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + val = obj.get(ts_column) + if val is not None: + ts = _parse_timestamp(val) + if ts: + timestamps.append(ts) + except (json.JSONDecodeError, KeyError): + continue + return timestamps + + +def _read_timestamps_from_csv( + file_path: str, ts_column: str +) -> List[datetime]: + """Read up to SAMPLE_SIZE timestamps from a CSV file.""" + import csv + timestamps = [] + with open(file_path, "r", newline="") as f: + reader = csv.DictReader(f) + if ts_column not in (reader.fieldnames or []): + print( + f"WARNING: Column '{ts_column}' not found in CSV. " + f"Available: {reader.fieldnames}" + ) + return [] + for i, row in enumerate(reader): + if i >= SAMPLE_SIZE: + break + ts = _parse_timestamp(row[ts_column]) + if ts: + timestamps.append(ts) + return timestamps + + +def detect_timestamps( + data_file: str, data_file_format: str, ts_column: str +) -> tuple: + """Return (min_ts, max_ts) from a sample of the data file.""" + fmt = data_file_format.lower() + if fmt in ("json.gz", "jsonl.gz"): + timestamps = _read_timestamps_from_json(data_file, ts_column, compressed=True) + elif fmt in ("json", "jsonl"): + timestamps = _read_timestamps_from_json(data_file, ts_column, compressed=False) + elif fmt == "csv": + timestamps = _read_timestamps_from_csv(data_file, ts_column) + else: + print(f"ERROR: Unsupported data file format: {data_file_format}") + sys.exit(1) + + if not timestamps: + print( + f"ERROR: No '{ts_column}' timestamps found in the first {SAMPLE_SIZE} " + f"rows of {data_file}" + ) + sys.exit(1) + + return min(timestamps), max(timestamps) + + +def _snap_to_window_boundary(ts: datetime, window_size: int) -> datetime: + """Round a timestamp up to the next window boundary (epoch-aligned). + + Arroyo tumbling windows are aligned to epoch multiples of window_size. 
+ Querying at a non-boundary timestamp will miss the sketch. + """ + epoch_sec = int(ts.timestamp()) + remainder = epoch_sec % window_size + if remainder == 0: + return ts + snapped = epoch_sec + (window_size - remainder) + return datetime.fromtimestamp(snapped, tz=timezone.utc) + + +def generate_window_ends( + min_ts: datetime, + max_ts: datetime, + window_size: int, + stride: int, + num_queries: int, +) -> List[datetime]: + """Generate evenly-spaced window-end timestamps within [min_ts, max_ts]. + + Timestamps are snapped to epoch-aligned window boundaries so that + Arroyo's tumbling window sketches can be found by QueryEngineRust. + """ + # First valid window-end: snap to next boundary after min_ts + window_size + earliest = min_ts + timedelta(seconds=window_size) + start = _snap_to_window_boundary(earliest, window_size) + if start >= max_ts: + print( + f"WARNING: window_size ({window_size}s) exceeds the data time range " + f"({(max_ts - min_ts).total_seconds():.0f}s). Using max_ts as only endpoint." 
+ ) + return [max_ts] + + ends = [] + current = start + while current <= max_ts and len(ends) < num_queries: + ends.append(current) + current += timedelta(seconds=stride) + + return ends + + +def format_ts(ts: datetime, ts_format: str) -> str: + """Format a timestamp for SQL injection.""" + if ts_format == "iso": + return ts.strftime("%Y-%m-%dT%H:%M:%SZ") + else: # datetime + return ts.strftime("%Y-%m-%d %H:%M:%S") + + +def generate_sql_files( + table_name: str, + ts_column: str, + value_column: str, + group_by_columns: List[str], + quantile: float, + window_size: int, + window_ends: List[datetime], + ts_format: str, + window_form: str, + output_prefix: str, +): + """Write the paired ASAP and ClickHouse SQL files.""" + group_by_clause = ", ".join(group_by_columns) + asap_lines = [] + ch_lines = [] + + for i, end_ts in enumerate(window_ends): + end_str = format_ts(end_ts, ts_format) + start_ts = end_ts - timedelta(seconds=window_size) + start_str = format_ts(start_ts, ts_format) + label = f"T{i:03d}" + desc = f"quantile window ending at {end_str}" + + if window_form == "dateadd": + where_clause = ( + f"{ts_column} BETWEEN DATEADD(s, -{window_size}, '{end_str}') AND '{end_str}'" + ) + else: + where_clause = ( + f"{ts_column} BETWEEN '{start_str}' AND '{end_str}'" + ) + + asap_sql = ( + f"-- {label}: {desc}\n" + f"SELECT QUANTILE({quantile}, {value_column}) FROM {table_name} " + f"WHERE {where_clause} GROUP BY {group_by_clause};" + ) + ch_sql = ( + f"-- {label}: {desc}\n" + f"SELECT quantile({quantile})({value_column}) FROM {table_name} " + f"WHERE {where_clause} GROUP BY {group_by_clause};" + ) + + asap_lines.append(asap_sql) + ch_lines.append(ch_sql) + + asap_file = f"{output_prefix}_asap.sql" + ch_file = f"{output_prefix}_clickhouse.sql" + + Path(asap_file).parent.mkdir(parents=True, exist_ok=True) + + with open(asap_file, "w") as f: + f.write("\n".join(asap_lines) + "\n") + + with open(ch_file, "w") as f: + f.write("\n".join(ch_lines) + "\n") + + print(f"Generated 
{len(window_ends)} queries:") + print(f" ASAP: {asap_file}") + print(f" ClickHouse: {ch_file}") + + +def main(): + parser = argparse.ArgumentParser( + description="Generate paired ASAP + ClickHouse SQL query files", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + # Table/column config + parser.add_argument("--table-name", required=True) + parser.add_argument("--ts-column", required=True, help="Timestamp column name") + parser.add_argument("--value-column", required=True, help="Column to compute quantile on") + parser.add_argument( + "--group-by-columns", + required=True, + help="Comma-separated GROUP BY columns", + ) + # Query parameters + parser.add_argument("--quantile", type=float, default=0.95) + parser.add_argument("--window-size", type=int, default=10, help="Window size in seconds") + parser.add_argument("--num-queries", type=int, default=50) + parser.add_argument( + "--ts-format", + choices=["iso", "datetime"], + default="iso", + help="Timestamp format in SQL: iso='YYYY-MM-DDTHH:MM:SSZ', datetime='YYYY-MM-DD HH:MM:SS' (default: iso)", + ) + parser.add_argument( + "--window-form", + choices=["explicit", "dateadd"], + default="explicit", + help="SQL window form: explicit='BETWEEN start AND end', dateadd='BETWEEN DATEADD(s,-N,end) AND end' (default: explicit)", + ) + parser.add_argument( + "--output-prefix", + required=True, + help="Output file prefix (e.g. 
./queries/clickbench → clickbench_asap.sql + clickbench_clickhouse.sql)", + ) + # Timestamp sources (mutually exclusive) + ts_group = parser.add_mutually_exclusive_group(required=True) + ts_group.add_argument( + "--auto-detect-timestamps", + action="store_true", + help="Scan data file to determine time range", + ) + ts_group.add_argument( + "--timestamps-file", + default=None, + help="File with explicit window-end timestamps (one ISO timestamp per line)", + ) + # Auto-detect options + parser.add_argument( + "--data-file", + default=None, + help="Path to data file (required with --auto-detect-timestamps)", + ) + parser.add_argument( + "--data-file-format", + choices=["json", "jsonl", "json.gz", "jsonl.gz", "csv"], + default="json", + help="Data file format (default: json)", + ) + parser.add_argument( + "--stride-seconds", + type=int, + default=None, + help="Spacing between window-end timestamps (default: window-size * 3)", + ) + + args = parser.parse_args() + + if args.auto_detect_timestamps and not args.data_file: + parser.error("--data-file is required when --auto-detect-timestamps is set") + + group_by_columns = [c.strip() for c in args.group_by_columns.split(",")] + stride = args.stride_seconds if args.stride_seconds else args.window_size * 3 + + # Determine window-end timestamps + if args.timestamps_file: + window_ends = [] + with open(args.timestamps_file) as f: + for line in f: + line = line.strip() + if not line: + continue + ts = _parse_timestamp(line) + if ts: + window_ends.append(ts) + else: + print(f"WARNING: Could not parse timestamp: {line!r}") + if not window_ends: + print("ERROR: No valid timestamps found in --timestamps-file") + sys.exit(1) + window_ends = window_ends[: args.num_queries] + print( + f"Using {len(window_ends)} timestamps from {args.timestamps_file} " + f"({window_ends[0]} – {window_ends[-1]})" + ) + else: + print(f"Scanning {args.data_file} for timestamp range...") + min_ts, max_ts = detect_timestamps( + args.data_file, 
args.data_file_format, args.ts_column + ) + print(f" Detected range: {min_ts} – {max_ts}") + window_ends = generate_window_ends( + min_ts, max_ts, args.window_size, stride, args.num_queries + ) + print( + f" Generated {len(window_ends)} window endpoints " + f"(stride={stride}s, window={args.window_size}s)" + ) + + generate_sql_files( + table_name=args.table_name, + ts_column=args.ts_column, + value_column=args.value_column, + group_by_columns=group_by_columns, + quantile=args.quantile, + window_size=args.window_size, + window_ends=window_ends, + ts_format=args.ts_format, + window_form=args.window_form, + output_prefix=args.output_prefix, + ) + + +if __name__ == "__main__": + main() diff --git a/asap-tools/execution-utilities/benchmark/prepare_data.py b/asap-tools/execution-utilities/benchmark/prepare_data.py new file mode 100644 index 0000000..33bc207 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/prepare_data.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +""" +Prepare data files for use with the Arroyo file source. + +The Arroyo file source (single_file_custom connector) requires: + - JSON-lines format + - Timestamps in RFC3339 format (e.g. "2013-07-14T20:38:47Z") + - Metadata columns (GROUP BY columns) as strings + - Value columns as floats + +This script converts raw downloaded datasets into the right format. 
+ +Usage: + # ClickBench: convert hits.json.gz → hits_arroyo.json + python prepare_data.py --dataset clickbench \\ + --input ./data/hits.json.gz \\ + --output ./data/hits_arroyo.json \\ + [--max-rows 1000000] + + # H2O: convert G1_1e7_1e2_0_0.csv → h2o_arroyo.json (adds synthetic timestamps) + python prepare_data.py --dataset h2o \\ + --input ./data/G1_1e7_1e2_0_0.csv \\ + --output ./data/h2o_arroyo.json \\ + [--max-rows 1000000] +""" + +import argparse +import gzip +import json +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path + +# Synthetic timestamp base for H2O (2024-01-01T00:00:00Z) +H2O_BASE_EPOCH = 1704067200 +H2O_ROWS_PER_SECOND = 1000 + +# ClickBench columns needed by Arroyo (must match streaming_config.yaml) +CB_TIMESTAMP_FIELD = "EventTime" +CB_VALUE_FIELDS = ["ResolutionWidth"] +CB_METADATA_FIELDS = ["RegionID", "OS", "UserAgent", "TraficSourceID"] +CB_KEEP_FIELDS = [CB_TIMESTAMP_FIELD] + CB_VALUE_FIELDS + CB_METADATA_FIELDS + +# H2O columns +H2O_TIMESTAMP_FIELD = "timestamp" +H2O_METADATA_FIELDS = ["id1", "id2"] +H2O_VALUE_FIELDS = ["v1"] + + +def _parse_clickbench_ts(ts_str: str) -> str: + """Convert 'YYYY-MM-DD HH:MM:SS' → 'YYYY-MM-DDTHH:MM:SSZ' (RFC3339).""" + try: + dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return ts_str # already RFC3339 or unknown format + + +def prepare_clickbench(input_path: str, output_path: str, max_rows: int = 0): + """Convert hits.json.gz to Arroyo-compatible JSON. 
+ + - Converts EventTime to RFC3339 + - Stringifies integer metadata columns (RegionID, OS, UserAgent, TraficSourceID) + - Sorts by EventTime (required for Arroyo event-time watermarks) + - Writes only the fields needed by the streaming config + """ + print(f"Reading {input_path}...") + records = [] + + opener = gzip.open if input_path.endswith(".gz") else open + with opener(input_path, "rt") as f: + for i, line in enumerate(f): + if max_rows > 0 and i >= max_rows: + break + if i % 100_000 == 0 and i > 0: + print(f" Read {i:,} rows...", end="\r") + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + + ts = _parse_clickbench_ts(str(obj.get(CB_TIMESTAMP_FIELD, ""))) + record = {CB_TIMESTAMP_FIELD: ts} + for col in CB_VALUE_FIELDS: + record[col] = float(obj.get(col, 0)) + for col in CB_METADATA_FIELDS: + record[col] = str(obj.get(col, "")) + records.append(record) + + print(f"\nSorting {len(records):,} records by {CB_TIMESTAMP_FIELD}...") + records.sort(key=lambda r: r[CB_TIMESTAMP_FIELD]) + + print(f"Writing to {output_path}...") + with open(output_path, "w") as f: + for record in records: + f.write(json.dumps(record) + "\n") + + print(f"Done. {len(records):,} records written.") + if records: + print(f" Time range: {records[0][CB_TIMESTAMP_FIELD]} – {records[-1][CB_TIMESTAMP_FIELD]}") + + +def prepare_h2o(input_path: str, output_path: str, max_rows: int = 0): + """Convert H2O CSV to Arroyo-compatible JSON with synthetic timestamps. 
+ + - Adds synthetic RFC3339 timestamps at H2O_ROWS_PER_SECOND rows/sec + starting from 2024-01-01T00:00:00Z + - Converts id4, id5, id6 to strings (metadata columns are expected as strings) + """ + print(f"Reading {input_path}...") + count = 0 + + with open(input_path, "r", encoding="utf-8") as fin, \ + open(output_path, "w") as fout: + + header = fin.readline().strip() + cols = header.split(",") + id_idx = {c: i for i, c in enumerate(cols)} + + for i, line in enumerate(fin): + if max_rows > 0 and i >= max_rows: + break + if i % 100_000 == 0 and i > 0: + print(f" Written {i:,} rows...", end="\r") + + parts = line.rstrip("\n").split(",") + abs_sec = H2O_BASE_EPOCH + i // H2O_ROWS_PER_SECOND + ms = i % H2O_ROWS_PER_SECOND + ts = datetime.fromtimestamp(abs_sec, tz=timezone.utc) + ts_str = ts.strftime("%Y-%m-%dT%H:%M:%S") + f".{ms:03d}Z" + + record = { + H2O_TIMESTAMP_FIELD: ts_str, + "id1": parts[id_idx["id1"]], + "id2": parts[id_idx["id2"]], + "id3": parts[id_idx["id3"]], + "id4": int(parts[id_idx["id4"]]), + "id5": int(parts[id_idx["id5"]]), + "id6": int(parts[id_idx["id6"]]), + "v1": float(parts[id_idx["v1"]]), + "v2": float(parts[id_idx["v2"]]), + "v3": float(parts[id_idx["v3"]]), + } + fout.write(json.dumps(record) + "\n") + count += 1 + + print(f"\nDone. 
{count:,} records written to {output_path}.") + first_ts = datetime.fromtimestamp(H2O_BASE_EPOCH, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + last_ts = datetime.fromtimestamp(H2O_BASE_EPOCH + count // H2O_ROWS_PER_SECOND, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + print(f" Time range: {first_ts} – {last_ts}") + + +def main(): + parser = argparse.ArgumentParser( + description="Prepare dataset files for Arroyo file source", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--dataset", + choices=["clickbench", "h2o"], + required=True, + help="Dataset type to prepare", + ) + parser.add_argument("--input", required=True, help="Path to raw input file") + parser.add_argument("--output", required=True, help="Path to write prepared JSON file") + parser.add_argument( + "--max-rows", + type=int, + default=0, + help="Max rows to process (0 = all, default: 0)", + ) + args = parser.parse_args() + + Path(args.output).parent.mkdir(parents=True, exist_ok=True) + + if args.dataset == "clickbench": + prepare_clickbench(args.input, args.output, args.max_rows) + else: + prepare_h2o(args.input, args.output, args.max_rows) + + +if __name__ == "__main__": + main() diff --git a/asap-tools/execution-utilities/benchmark/requirements.txt b/asap-tools/execution-utilities/benchmark/requirements.txt new file mode 100644 index 0000000..8567631 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/requirements.txt @@ -0,0 +1,5 @@ +requests>=2.28 +gdown>=4.7 +pyyaml>=6.0 +matplotlib>=3.7 +numpy>=1.24 diff --git a/asap-tools/execution-utilities/benchmark/run_benchmark.py b/asap-tools/execution-utilities/benchmark/run_benchmark.py new file mode 100644 index 0000000..a196ace --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/run_benchmark.py @@ -0,0 +1,434 @@ +#!/usr/bin/env python3 +""" +Unified benchmark runner: ASAP (QueryEngineRust) vs ClickHouse baseline. 

Reads SQL files generated by generate_queries.py, sends each query to the
configured endpoint, and writes results to CSV. With --mode both, runs
baseline then ASAP and generates a latency comparison plot.

Usage:
    # Both modes with comparison plot
    python run_benchmark.py \\
        --mode both \\
        --asap-sql-file ./queries/clickbench_asap.sql \\
        --baseline-sql-file ./queries/clickbench_clickhouse.sql \\
        --output-dir ./results

    # ASAP only
    python run_benchmark.py \\
        --mode asap \\
        --asap-sql-file ./queries/h2o_asap.sql \\
        --output-dir ./results

    # Baseline only
    python run_benchmark.py \\
        --mode baseline \\
        --baseline-sql-file ./queries/h2o_clickhouse.sql \\
        --output-dir ./results
"""

import argparse
import csv
import re
import time
import urllib.parse
from pathlib import Path
from typing import List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import requests

# Default endpoints for the two systems under comparison.
DEFAULT_ASAP_URL = "http://localhost:8088/clickhouse/query"
DEFAULT_CLICKHOUSE_URL = "http://localhost:8123/?session_timezone=UTC"
DEFAULT_OUTPUT_DIR = "./results"
DEFAULT_OUTPUT_PREFIX = "benchmark"


# ---------------------------------------------------------------------------
# Query extraction
# Reused from asap_query_latency/run_benchmark.py:extract_queries_from_sql()
# ---------------------------------------------------------------------------


def extract_queries_from_sql(sql_file: Path) -> List[Tuple[str, str]]:
    """Extract (query_id, sql) pairs from an annotated SQL file.

    Expects lines of the form:
        -- T001: description
        SELECT ... ;

    Statements not preceded by a `-- ID:` annotation are ignored; the regex
    captures everything up to the first semicolon after each annotation.
    """
    with open(sql_file) as f:
        content = f.read()
    pattern = r"-- ([A-Za-z0-9_]+):[^\n]*\n(SELECT[^;]+;)"
    return [
        (qid, sql.strip())
        for qid, sql in re.findall(pattern, content, re.DOTALL | re.IGNORECASE)
    ]


# ---------------------------------------------------------------------------
# Query runner
# Adapted from asap_benchmark_pipeline/run_benchmark.py:run_query()
# Uses requests.Session for connection reuse across queries.
# ---------------------------------------------------------------------------


def run_query(
    query: str,
    endpoint_url: str,
    session: requests.Session,
    timeout: int = 30,
    debug: bool = False,
) -> Tuple[float, Optional[str], Optional[str]]:
    """Send a single SQL query and return (latency_ms, result_text, error).

    Exactly one of result_text / error is non-None. On timeout, latency is
    reported as the full timeout budget; on other transport errors it is 0.0.
    """
    # The query is URL-encoded and appended to whatever query string the
    # endpoint already carries (e.g. ?session_timezone=UTC for ClickHouse).
    encoded_query = urllib.parse.quote(query)
    separator = "&" if "?" in endpoint_url else "?"
    url = f"{endpoint_url}{separator}query={encoded_query}"

    try:
        start = time.time()
        response = session.get(url, timeout=timeout)
        latency_ms = (time.time() - start) * 1000

        if debug:
            source = "OK" if response.status_code == 200 else f"HTTP {response.status_code}"
            print(f"  [{source}] {latency_ms:.2f}ms")

        if response.status_code == 200:
            return latency_ms, response.text.strip(), None
        else:
            return latency_ms, None, f"HTTP {response.status_code}: {response.text[:200]}"
    except requests.Timeout:
        # Report the timeout budget as the observed latency.
        return timeout * 1000.0, None, "Timeout"
    except Exception as e:
        # Connection refused / DNS / etc. — surfaced to the caller as an error
        # string so one bad query does not abort the whole benchmark run.
        return 0.0, None, str(e)


# ---------------------------------------------------------------------------
# Benchmark runner
# Consolidated from both asap_query_latency/run_benchmark.py and
# asap_benchmark_pipeline/run_benchmark.py:run_benchmark().
# ---------------------------------------------------------------------------


def _infer_pattern(query_id: str) -> str:
    """Map a query-ID prefix to a coarse pattern label for the CSV output.

    "ST" is tested before "S"/"T" because startswith would otherwise
    mis-classify SpatioTemporal IDs.
    """
    if query_id.startswith("ST"):
        return "SpatioTemporal"
    if query_id.startswith("S"):
        return "Spatial"
    if query_id.startswith("T"):
        return "Temporal"
    if query_id.startswith("N"):
        return "Nested"
    if query_id.startswith("D"):
        return "Dated"
    if query_id.startswith("L"):
        return "LongRange"
    return "Unknown"


def _latency_summary(latencies: List[float], label: str):
    """Print min/avg/p50/p95/max over the successful-query latencies."""
    if not latencies:
        return
    s = sorted(latencies)
    n = len(s)
    print(f"\n{label} ({n} successful queries):")
    # Percentiles via simple rank indexing; int(n*q) < n for q < 1, so safe.
    print(
        f"  min={s[0]:.2f}ms avg={sum(s)/n:.2f}ms "
        f"p50={s[int(n*0.50)]:.2f}ms p95={s[int(n*0.95)]:.2f}ms max={s[-1]:.2f}ms"
    )


def run_benchmark(
    sql_file: Path,
    endpoint_url: str,
    output_csv: Path,
    mode: str,
    query_filter: Optional[List[str]] = None,
    timeout: int = 30,
    repeat: int = 1,
    debug: bool = False,
    no_plot: bool = False,
):
    """Run all queries from sql_file against endpoint_url and write a CSV.

    CSV columns: query_id, query_pattern, latency_ms, result_rows,
    result_full, error, mode

    With repeat > 1 each query is run several times and the (upper) median
    latency is recorded. Failed queries are written with result_rows=0 and a
    latency of 0 in the plot series.
    """
    print(f"\nRunning benchmark in {mode.upper()} mode...")
    print(f"Endpoint: {endpoint_url}")
    print(f"SQL file: {sql_file}")
    print(f"Output: {output_csv}")
    if debug:
        print("Debug: per-request HTTP status shown.")

    queries = extract_queries_from_sql(sql_file)
    if query_filter:
        queries = [(qid, sql) for qid, sql in queries if qid in query_filter]
    print(f"Found {len(queries)} queries (repeat={repeat})")

    output_csv.parent.mkdir(parents=True, exist_ok=True)
    # One session so HTTP keep-alive amortizes connection setup across queries.
    session = requests.Session()
    latencies_ok: List[float] = []
    plot_latencies: List[float] = []

    with open(output_csv, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(
            ["query_id", "query_pattern", "latency_ms", "result_rows", "result_full", "error", "mode"]
        )

        for query_id, sql in queries:
            pattern = _infer_pattern(query_id)
            print(f"Running {query_id}...", end=" " if not debug else "\n", flush=True)

            # Repeat and take median
            trial_latencies = []
            last_result, last_error = None, None
            for _ in range(repeat):
                lat, result, error = run_query(sql, endpoint_url, session, timeout, debug)
                trial_latencies.append(lat)
                last_result, last_error = result, error
                if error:
                    break  # don't retry on error

            # Upper median of the trials (index n//2 of the sorted list).
            latency_ms = sorted(trial_latencies)[len(trial_latencies) // 2]

            if last_error:
                print(f"ERROR {last_error}")
                writer.writerow([query_id, pattern, f"{latency_ms:.2f}", 0, "", last_error, mode])
                # Failed queries show as zero-height bars in the plot.
                plot_latencies.append(0.0)
            else:
                result_lines = last_result.strip().split("\n") if last_result else []
                num_rows = len(result_lines)
                # Flatten and truncate the result for the CSV preview column.
                preview = last_result.replace("\n", " | ")[:200] if last_result else ""
                latencies_ok.append(latency_ms)
                plot_latencies.append(latency_ms)
                print(f"{latency_ms:.2f}ms ({num_rows} rows)")
                writer.writerow(
                    [query_id, pattern, f"{latency_ms:.2f}", num_rows, preview, "", mode]
                )

            # Small pause between queries to avoid hammering the endpoint.
            time.sleep(0.1)
print(f"\nResults saved to {output_csv}") + _latency_summary(latencies_ok, f"Latency summary") + + if not no_plot and plot_latencies: + _plot_single(plot_latencies, mode, output_csv.with_suffix(".png")) + + +def _plot_single(latencies: List[float], mode: str, out_path: Path): + """Bar chart of per-query latency for a single mode.""" + color = "#4682b4" if mode == "asap" else "#f4a460" + x = list(range(1, len(latencies) + 1)) + plt.figure(figsize=(12, 5)) + plt.bar(x, latencies, color=color, edgecolor="black") + plt.xlabel("Query Execution Order") + plt.ylabel("Latency (ms)") + plt.title(f"Query Latency — {mode.upper()} mode") + plt.grid(axis="y", linestyle="--", alpha=0.7) + plt.tight_layout() + plt.savefig(out_path, dpi=150) + plt.close() + print(f"Plot saved to {out_path}") + + +def _plot_comparison(asap_csv: Path, baseline_csv: Path, out_path: Path): + """Two-panel comparison plot: per-query bars + speedup bars. + + Adapted from asap_query_latency/plot_latency.py. + """ + def _load(path): + rows = {} + with open(path) as f: + for row in csv.DictReader(f): + if not row["error"]: + rows[row["query_id"]] = float(row["latency_ms"]) + return rows + + asap = _load(asap_csv) + base = _load(baseline_csv) + qids = sorted(set(asap) & set(base)) + if not qids: + print("WARNING: No common query IDs for comparison plot.") + return + + x = np.arange(len(qids)) + a_vals = [asap[q] for q in qids] + b_vals = [base[q] for q in qids] + speedup = [b / a if a > 0 else 0 for a, b in zip(a_vals, b_vals)] + + fig, (ax1, ax2) = plt.subplots( + 2, 1, figsize=(14, 7), gridspec_kw={"height_ratios": [3, 1]} + ) + + w = 0.4 + ax1.bar(x - w / 2, b_vals, w, label="ClickHouse baseline", color="#f4a460") + ax1.bar(x + w / 2, a_vals, w, label="ASAP (KLL sketch)", color="#4682b4") + ax1.set_xticks(x) + ax1.set_xticklabels(qids, rotation=90, fontsize=7) + ax1.set_ylabel("Latency (ms)") + ax1.set_title( + f"Query latency: ASAP vs ClickHouse baseline " + f"(p50: {np.median(a_vals):.1f}ms vs 
{np.median(b_vals):.1f}ms)" + ) + ax1.legend() + ax1.set_xlim(-0.6, len(qids) - 0.4) + + ax2.bar(x, speedup, color="#2e8b57", width=0.7) + ax2.axhline( + np.mean(speedup), + color="red", + linewidth=1, + linestyle="--", + label=f"mean {np.mean(speedup):.1f}×", + ) + ax2.set_xticks(x) + ax2.set_xticklabels(qids, rotation=90, fontsize=7) + ax2.set_ylabel("Speedup (×)") + ax2.legend(fontsize=8) + ax2.set_xlim(-0.6, len(qids) - 0.4) + + plt.tight_layout() + plt.savefig(out_path, dpi=150) + plt.close() + print(f"Comparison plot saved to {out_path}") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +def main(): + parser = argparse.ArgumentParser( + description="Benchmark ASAP vs ClickHouse baseline", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--mode", + choices=["asap", "baseline", "both"], + default="both", + help="Which mode(s) to run (default: both)", + ) + parser.add_argument( + "--asap-sql-file", + default=None, + help="SQL file for ASAP mode (required if mode is asap or both)", + ) + parser.add_argument( + "--baseline-sql-file", + default=None, + help="SQL file for baseline mode (required if mode is baseline or both)", + ) + parser.add_argument( + "--asap-url", + default=DEFAULT_ASAP_URL, + help=f"QueryEngineRust endpoint (default: {DEFAULT_ASAP_URL})", + ) + parser.add_argument( + "--clickhouse-url", + default=DEFAULT_CLICKHOUSE_URL, + help=f"ClickHouse HTTP URL (default: {DEFAULT_CLICKHOUSE_URL})", + ) + parser.add_argument( + "--output-dir", + default=DEFAULT_OUTPUT_DIR, + help=f"Directory for results (default: {DEFAULT_OUTPUT_DIR})", + ) + parser.add_argument( + "--output-prefix", + default=DEFAULT_OUTPUT_PREFIX, + help=f"Prefix for output files (default: {DEFAULT_OUTPUT_PREFIX})", + ) + parser.add_argument( + "--query-filter", + default=None, + help="Comma-separated query IDs 
to run (e.g. T000,T001)", + ) + parser.add_argument( + "--repeat", + type=int, + default=1, + help="Run each query N times and report the median (default: 1)", + ) + parser.add_argument( + "--timeout", + type=int, + default=30, + help="Per-query timeout in seconds (default: 30)", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Show per-query HTTP status", + ) + parser.add_argument( + "--no-plot", + action="store_true", + help="Do not generate any plots", + ) + # Ignored flag for backward compatibility + parser.add_argument( + "--measure-pipeline-overhead", + action="store_true", + help="(No-op) Pipeline overhead measurement is not applicable with file source", + ) + + args = parser.parse_args() + + if args.measure_pipeline_overhead: + print( + "WARNING: --measure-pipeline-overhead is not applicable when using " + "file source (no Kafka ingest). Ignoring." + ) + + # Validate required SQL files + if args.mode in ("asap", "both") and not args.asap_sql_file: + parser.error("--asap-sql-file is required when --mode is asap or both") + if args.mode in ("baseline", "both") and not args.baseline_sql_file: + parser.error("--baseline-sql-file is required when --mode is baseline or both") + + output_dir = Path(args.output_dir) + prefix = args.output_prefix + query_filter = [q.strip() for q in args.query_filter.split(",")] if args.query_filter else None + + asap_csv = output_dir / f"{prefix}_asap.csv" + baseline_csv = output_dir / f"{prefix}_baseline.csv" + + if args.mode in ("baseline", "both"): + run_benchmark( + sql_file=Path(args.baseline_sql_file), + endpoint_url=args.clickhouse_url, + output_csv=baseline_csv, + mode="baseline", + query_filter=query_filter, + timeout=args.timeout, + repeat=args.repeat, + debug=args.debug, + no_plot=args.no_plot, + ) + + if args.mode in ("asap", "both"): + run_benchmark( + sql_file=Path(args.asap_sql_file), + endpoint_url=args.asap_url, + output_csv=asap_csv, + mode="asap", + query_filter=query_filter, + 
timeout=args.timeout, + repeat=args.repeat, + debug=args.debug, + no_plot=args.no_plot, + ) + + if args.mode == "both" and not args.no_plot: + comparison_png = output_dir / f"{prefix}_comparison.png" + _plot_comparison(asap_csv, baseline_csv, comparison_png) + + +if __name__ == "__main__": + main()