Merged
3 changes: 3 additions & 0 deletions asap-tools/execution-utilities/.gitignore
@@ -7,3 +7,6 @@

clickhouse-benchmark-pipeline/benchmark_results/
**/data/

**/*.csv
**/*.png
205 changes: 205 additions & 0 deletions asap-tools/execution-utilities/asap_query_latency/README.md
@@ -0,0 +1,205 @@
# ASAP Benchmark: ClickBench `hits` Dataset

Measures ASAP query latency (KLL sketch) on 50 fixed-timestamp `QUANTILE(0.95)` queries over the first 1M rows of the ClickBench `hits` dataset.

## Data Flow

```
hits.json.gz
  └─> data_exporter (sort by EventTime → Kafka hits topic)
                          |
         ┌────────────────┴────────────────┐
         ↓                                 ↓
Arroyo (KLL sketch, 1-sec windows)   ClickHouse (Kafka engine)
         |
   sketch_topic (Kafka)
         |
   QueryEngineRust :8088
         |
   run_benchmark.py → asap_results.csv
```

---

## Prerequisites

```bash
export INSTALL_DIR=/scratch/sketch_db_for_prometheus
```

### Build binaries (one-time)

```bash
# QueryEngineRust
cd ~/ASAPQuery/asap-query-engine && cargo build --release

# data_exporter
cd ~/ASAPQuery/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/data_exporter
cargo build --release
```

### Download and chunk ClickBench data (one-time)

```bash
cd ~/ASAPQuery/asap-tools/execution-utilities/clickhouse-benchmark-pipeline
python3 benchmark_importer/download_data.py

DATA_DIR=~/ASAPQuery/asap-tools/execution-utilities/clickhouse-benchmark-pipeline/benchmark_importer/data
cd "$DATA_DIR"
mv hits.json.gz hits_full.json.gz
zcat hits_full.json.gz | head -n 1000000 | gzip > hits.json.gz
```
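For environments without `zcat`, the same truncation can be sketched in Python. This is only an illustration of the `zcat | head | gzip` step above; the function name `take_first_lines` is hypothetical and not part of the repository.

```python
import gzip
from itertools import islice
from pathlib import Path


def take_first_lines(src: Path, dst: Path, limit: int = 1_000_000) -> int:
    """Copy the first `limit` lines of a gzip file into a new gzip file.

    Returns the number of lines actually written (fewer if src is shorter).
    """
    written = 0
    with gzip.open(src, "rb") as fin, gzip.open(dst, "wb") as fout:
        # Stream line by line so the full dataset is never held in memory.
        for line in islice(fin, limit):
            fout.write(line)
            written += 1
    return written
```

Usage would mirror the shell commands: `take_first_lines(Path("hits_full.json.gz"), Path("hits.json.gz"))`.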

### Python dependencies (one-time)

```bash
pip3 install --user requests matplotlib numpy
pip3 install --user ~/ASAPQuery/asap-common/dependencies/py/promql_utilities/
```

---

## Run the Pipeline

**Order matters**: create the Arroyo pipeline (Step 6) before producing data (Step 8). The connection table uses `offset: latest`, so records produced before the pipeline exists are never consumed.

### Step 1 — Start Kafka

```bash
~/ASAPQuery/asap-tools/installation/kafka/run.sh $INSTALL_DIR/kafka
```

### Step 2 — Create Kafka topics

```bash
KAFKA=$INSTALL_DIR/kafka/bin
$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --create \
--topic hits --partitions 1 --replication-factor 1
$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --create \
--topic sketch_topic --partitions 1 --replication-factor 1 \
--config max.message.bytes=20971520
```
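The `max.message.bytes=20971520` setting on `sketch_topic` is 20 MiB, well above Kafka's default limit of about 1 MB. The arithmetic is shown below; that the larger limit exists to fit large serialized sketch messages is an assumption, and `fits_in_topic` is a hypothetical helper for illustration only.

```python
MIB = 1024 * 1024
MAX_MESSAGE_BYTES = 20_971_520  # value passed to --config max.message.bytes in Step 2


def fits_in_topic(payload_size_bytes: int, limit: int = MAX_MESSAGE_BYTES) -> bool:
    """Rough check of a payload size against the topic limit (ignores batch overhead)."""
    return payload_size_bytes <= limit


print(MAX_MESSAGE_BYTES // MIB)  # → 20
```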

### Step 3 — Start ClickHouse

```bash
~/ASAPQuery/asap-tools/installation/clickhouse/run.sh $INSTALL_DIR
```

### Step 4 — Init ClickHouse tables

```bash
cd ~/ASAPQuery/asap-tools/execution-utilities/clickhouse-benchmark-pipeline
CLICKHOUSE_BIN=$INSTALL_DIR/clickhouse bash scripts/init_clickhouse.sh
```

### Step 5 — Start Arroyo cluster

```bash
~/ASAPQuery/asap-sketch-ingest/target/release/arroyo \
--config ~/ASAPQuery/asap-sketch-ingest/config.yaml cluster \
> /tmp/arroyo.log 2>&1 &
```

### Step 6 — Create ArroyoSketch pipeline

```bash
cd ~/ASAPQuery/asap-sketch-ingest
python3 run_arroyosketch.py \
--source_type kafka \
--kafka_input_format json \
--input_kafka_topic hits \
--output_format json \
--pipeline_name asap_hits_pipeline \
--config_file_path ~/ASAPQuery/asap-tools/execution-utilities/asap_query_latency/streaming_config.yaml \
--output_kafka_topic sketch_topic \
--output_dir ./outputs \
--parallelism 1 \
--query_language sql
```

### Step 7 — Start QueryEngineRust

```bash
cd ~/ASAPQuery/asap-query-engine
nohup ./target/release/query_engine_rust \
--kafka-topic sketch_topic --input-format json \
--config ~/ASAPQuery/asap-tools/execution-utilities/asap_query_latency/inference_config.yaml \
--streaming-config ~/ASAPQuery/asap-tools/execution-utilities/asap_query_latency/streaming_config.yaml \
--http-port 8088 --delete-existing-db --log-level DEBUG \
--output-dir ./output --streaming-engine arroyo \
--query-language SQL --lock-strategy per-key \
--prometheus-scrape-interval 1 > /tmp/query_engine.log 2>&1 &
```

### Step 8 — Produce ClickBench data

`data_exporter` reads all 1M records, transforms them (RFC3339 EventTime, integer fields converted to strings), sorts them by EventTime, then sends them to Kafka with each message's Kafka timestamp set to the record's event time. This lets Arroyo's watermark advance correctly for event-time windowing.

```bash
cd ~/ASAPQuery/asap-tools/execution-utilities/clickhouse-benchmark-pipeline
TOTAL_RECORDS=1000000 DATA_MODE=clickbench bash scripts/generate_data.sh
```

Wait for completion before running the benchmark.
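The transformation that `data_exporter` applies in Step 8 can be sketched in Python. This is a simplified illustration, not the Rust implementation: the input timestamp format, the UTC assumption, and the `INT_FIELDS` list are all assumptions made for the example.

```python
import json
from datetime import datetime, timezone

# Hypothetical subset of integer columns to stringify; the real list lives in data_exporter.
INT_FIELDS = ("RegionID", "OS", "UserAgent", "TraficSourceID")


def transform(record: dict) -> dict:
    """Rewrite EventTime as RFC3339 and convert selected integer fields to strings."""
    out = dict(record)
    # Assumes the input looks like "2013-07-14 20:00:00" and is in UTC.
    ts = datetime.strptime(record["EventTime"], "%Y-%m-%d %H:%M:%S")
    out["EventTime"] = ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    for field in INT_FIELDS:
        if field in out:
            out[field] = str(out[field])
    return out


def prepare(records):
    """Return (kafka_timestamp_ms, json_payload) pairs in event-time order."""
    # Fixed-width RFC3339 strings sort lexicographically in chronological order.
    ordered = sorted((transform(r) for r in records), key=lambda r: r["EventTime"])
    result = []
    for r in ordered:
        ts = datetime.strptime(r["EventTime"], "%Y-%m-%dT%H:%M:%SZ")
        millis = int(ts.replace(tzinfo=timezone.utc).timestamp() * 1000)
        result.append((millis, json.dumps(r)))
    return result
```

Each pair would then be sent to the `hits` topic with `kafka_timestamp_ms` as the message timestamp, so the stream arrives in non-decreasing event-time order.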

### Step 9 — Run ASAP benchmark

```bash
cd ~/ASAPQuery/asap-tools/execution-utilities/asap_query_latency
./run_benchmark.py \
--mode asap \
--sql-file asap_quantile_queries.sql \
--output asap_results.csv
```
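As a rough illustration of what a latency driver does with the query file, the sketch below pairs each `-- Txxx:` comment with the statement that follows it and times an arbitrary callable per query. It is not the actual `run_benchmark.py`; `parse_queries`, `time_queries`, and the `run_query` callable are hypothetical names.

```python
import re
import time

LABEL = re.compile(r"^--\s*(T\d+):")


def parse_queries(sql_text: str):
    """Pair each `-- Txxx:` comment with the SQL statement that follows it."""
    queries, label, parts = [], None, []
    for line in sql_text.splitlines():
        match = LABEL.match(line)
        if match:
            label, parts = match.group(1), []
        elif line.strip() and label is not None:
            parts.append(line.strip())
            if line.rstrip().endswith(";"):
                queries.append((label, " ".join(parts)))
                label, parts = None, []
    return queries


def time_queries(queries, run_query):
    """Return (label, latency_ms) pairs; run_query is any callable that takes SQL text."""
    results = []
    for label, sql in queries:
        start = time.perf_counter()
        run_query(sql)
        results.append((label, (time.perf_counter() - start) * 1000.0))
    return results
```

In practice `run_query` would issue the statement against the query engine on port 8088 or against ClickHouse; the request format for either is left out here because it is not specified in this README.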

### Step 10 — Run baseline benchmark (ClickHouse)

Wait for Step 8 data ingestion to complete, then verify ClickHouse has all rows:

```bash
$INSTALL_DIR/clickhouse client --query "SELECT count(*) FROM hits"
# Should return 1000000
```
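Instead of checking the count by hand, ingestion can be polled until all rows have arrived. The sketch below uses ClickHouse's HTTP interface on the same `http://localhost:8123` URL passed to the baseline run; it assumes the default user with no password, and the helper names are hypothetical.

```python
import time
import urllib.parse
import urllib.request


def count_url(base_url: str, table: str = "hits") -> str:
    """Build a GET URL for the ClickHouse HTTP interface that counts rows in `table`."""
    query = f"SELECT count(*) FROM {table}"
    params = urllib.parse.urlencode({"query": query}, quote_via=urllib.parse.quote)
    return f"{base_url}/?{params}"


def http_get(url: str) -> str:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read().decode()


def wait_for_rows(expected, base_url="http://localhost:8123",
                  fetch=http_get, interval_s=5.0, max_polls=120):
    """Poll the row count until it reaches `expected`; raise TimeoutError otherwise."""
    for _ in range(max_polls):
        count = int(fetch(count_url(base_url)).strip())
        if count >= expected:
            return count
        time.sleep(interval_s)
    raise TimeoutError(f"fewer than {expected} rows after {max_polls} polls")
```

Calling `wait_for_rows(1_000_000)` before the baseline run blocks until the table is fully loaded. The `fetch` parameter exists so the polling logic can be exercised without a running server.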

Run the benchmark:

```bash
cd ~/ASAPQuery/asap-tools/execution-utilities/asap_query_latency
./run_benchmark.py \
--mode baseline \
--sql-file clickhouse_quantile_queries.sql \
--output baseline_results.csv \
--clickhouse-url http://localhost:8123
```

---

## Reset (re-run from scratch)

```bash
# Kill workers
pkill -f "query_engine_rust"; pkill -f "data_exporter"
sleep 2

# Delete old Arroyo pipeline (run while the Arroyo cluster is still up), then kill the cluster
cd ~/ASAPQuery/asap-sketch-ingest
python3 delete_pipeline.py --all_pipelines
pkill -f "arroyo"
sleep 2

# Delete Kafka topics (requires a running broker)
KAFKA=$INSTALL_DIR/kafka/bin
$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic hits
$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic sketch_topic

# Clear ClickHouse table (requires a running server)
$INSTALL_DIR/clickhouse client --query "TRUNCATE TABLE hits"

# Stop Kafka and ClickHouse
pkill -f "kafka-server-start.sh"; pkill -f "clickhouse server"
sleep 2

# Clear Arroyo checkpoint state (REQUIRED: an old watermark causes all windows to be skipped)
rm -rf /tmp/arroyo/
```
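Why a stale checkpoint makes every window disappear can be seen in a simplified watermark model. The sketch below is not Arroyo's implementation: it assumes zero allowed lateness and treats the watermark as the maximum event time seen so far, with timestamps as plain integers.

```python
def accepted_events(event_times, initial_watermark=None):
    """Return the events a simplified event-time operator would keep.

    An event is dropped as late when its timestamp is below the current watermark.
    """
    watermark = initial_watermark
    kept = []
    for t in event_times:
        if watermark is not None and t < watermark:
            continue  # late event: its window is considered already closed
        kept.append(t)
        watermark = t if watermark is None else max(watermark, t)
    return kept


replay = [100, 101, 102]
print(accepted_events(replay))                         # → [100, 101, 102]
print(accepted_events(replay, initial_watermark=103))  # → []
print(accepted_events([102, 100, 101]))                # → [102]
```

The second call models a restored checkpoint whose watermark already sits past the end of the dataset, so every replayed record is late. The third shows why the producer sorts by EventTime before sending: out-of-order input is dropped under the same rule.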

Then repeat Steps 1–9 (and Step 10 if you also need the ClickHouse baseline).
@@ -0,0 +1,100 @@
-- T000: quantile window ending at 2013-07-14 20:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 20:00:00') AND '2013-07-14 20:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T001: quantile window ending at 2013-07-14 20:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 20:30:00') AND '2013-07-14 20:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T002: quantile window ending at 2013-07-14 21:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 21:00:00') AND '2013-07-14 21:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T003: quantile window ending at 2013-07-14 21:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 21:30:00') AND '2013-07-14 21:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T004: quantile window ending at 2013-07-14 22:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 22:00:00') AND '2013-07-14 22:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T005: quantile window ending at 2013-07-14 22:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 22:30:00') AND '2013-07-14 22:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T006: quantile window ending at 2013-07-14 23:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 23:00:00') AND '2013-07-14 23:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T007: quantile window ending at 2013-07-14 23:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-14 23:30:00') AND '2013-07-14 23:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T008: quantile window ending at 2013-07-15 00:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 00:00:00') AND '2013-07-15 00:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T009: quantile window ending at 2013-07-15 00:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 00:30:00') AND '2013-07-15 00:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T010: quantile window ending at 2013-07-15 01:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 01:00:00') AND '2013-07-15 01:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T011: quantile window ending at 2013-07-15 01:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 01:30:00') AND '2013-07-15 01:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T012: quantile window ending at 2013-07-15 02:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 02:00:00') AND '2013-07-15 02:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T013: quantile window ending at 2013-07-15 02:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 02:30:00') AND '2013-07-15 02:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T014: quantile window ending at 2013-07-15 03:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 03:00:00') AND '2013-07-15 03:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T015: quantile window ending at 2013-07-15 03:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 03:30:00') AND '2013-07-15 03:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T016: quantile window ending at 2013-07-15 04:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 04:00:00') AND '2013-07-15 04:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T017: quantile window ending at 2013-07-15 04:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 04:30:00') AND '2013-07-15 04:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T018: quantile window ending at 2013-07-15 05:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 05:00:00') AND '2013-07-15 05:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T019: quantile window ending at 2013-07-15 05:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 05:30:00') AND '2013-07-15 05:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T020: quantile window ending at 2013-07-15 06:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 06:00:00') AND '2013-07-15 06:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T021: quantile window ending at 2013-07-15 06:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 06:30:00') AND '2013-07-15 06:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T022: quantile window ending at 2013-07-15 07:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 07:00:00') AND '2013-07-15 07:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T023: quantile window ending at 2013-07-15 07:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 07:30:00') AND '2013-07-15 07:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T024: quantile window ending at 2013-07-15 08:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 08:00:00') AND '2013-07-15 08:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T025: quantile window ending at 2013-07-15 08:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 08:30:00') AND '2013-07-15 08:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T026: quantile window ending at 2013-07-15 09:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 09:00:00') AND '2013-07-15 09:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T027: quantile window ending at 2013-07-15 09:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 09:30:00') AND '2013-07-15 09:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T028: quantile window ending at 2013-07-15 10:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 10:00:00') AND '2013-07-15 10:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T029: quantile window ending at 2013-07-15 10:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 10:30:00') AND '2013-07-15 10:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T030: quantile window ending at 2013-07-15 11:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 11:00:00') AND '2013-07-15 11:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T031: quantile window ending at 2013-07-15 11:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 11:30:00') AND '2013-07-15 11:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T032: quantile window ending at 2013-07-15 12:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 12:00:00') AND '2013-07-15 12:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T033: quantile window ending at 2013-07-15 12:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 12:30:00') AND '2013-07-15 12:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T034: quantile window ending at 2013-07-15 13:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 13:00:00') AND '2013-07-15 13:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T035: quantile window ending at 2013-07-15 13:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 13:30:00') AND '2013-07-15 13:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T036: quantile window ending at 2013-07-15 14:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 14:00:00') AND '2013-07-15 14:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T037: quantile window ending at 2013-07-15 14:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 14:30:00') AND '2013-07-15 14:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T038: quantile window ending at 2013-07-15 15:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 15:00:00') AND '2013-07-15 15:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T039: quantile window ending at 2013-07-15 15:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 15:30:00') AND '2013-07-15 15:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T040: quantile window ending at 2013-07-15 16:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 16:00:00') AND '2013-07-15 16:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T041: quantile window ending at 2013-07-15 16:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 16:30:00') AND '2013-07-15 16:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T042: quantile window ending at 2013-07-15 17:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 17:00:00') AND '2013-07-15 17:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T043: quantile window ending at 2013-07-15 17:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 17:30:00') AND '2013-07-15 17:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T044: quantile window ending at 2013-07-15 18:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 18:00:00') AND '2013-07-15 18:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T045: quantile window ending at 2013-07-15 18:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 18:30:00') AND '2013-07-15 18:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T046: quantile window ending at 2013-07-15 19:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 19:00:00') AND '2013-07-15 19:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T047: quantile window ending at 2013-07-15 19:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 19:30:00') AND '2013-07-15 19:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T048: quantile window ending at 2013-07-15 20:00:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 20:00:00') AND '2013-07-15 20:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
-- T049: quantile window ending at 2013-07-15 20:30:00
SELECT QUANTILE(0.95, ResolutionWidth) FROM hits WHERE EventTime BETWEEN DATEADD(s, -10, '2013-07-15 20:30:00') AND '2013-07-15 20:30:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID;
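The 100 lines above follow a fixed pattern: 50 window end times starting at 2013-07-14 20:00:00 and advancing in 30-minute steps, each querying the preceding 10 seconds. A generator along these lines would reproduce the file; the function name and defaults are illustrative, and how the file was actually produced is not stated here.

```python
from datetime import datetime, timedelta

TEMPLATE = (
    "SELECT QUANTILE(0.95, ResolutionWidth) FROM hits "
    "WHERE EventTime BETWEEN DATEADD(s, -10, '{end}') AND '{end}' "
    "GROUP BY RegionID, OS, UserAgent, TraficSourceID;"
)


def generate_queries(start=datetime(2013, 7, 14, 20, 0, 0),
                     count=50, step=timedelta(minutes=30)):
    """Return alternating comment and query lines, labelled T000 to T{count-1}."""
    lines = []
    for i in range(count):
        end = (start + i * step).strftime("%Y-%m-%d %H:%M:%S")
        lines.append(f"-- T{i:03d}: quantile window ending at {end}")
        lines.append(TEMPLATE.format(end=end))
    return lines
```

Writing `"\n".join(generate_queries())` to a file yields the same 100 lines, which makes it straightforward to change the window length or spacing consistently across all queries.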