diff --git a/.github/workflows/accuracy_performance.yml b/.github/workflows/accuracy_performance.yml new file mode 100644 index 0000000..b27089d --- /dev/null +++ b/.github/workflows/accuracy_performance.yml @@ -0,0 +1,163 @@ +name: PR Evaluation + +# NOTE: GitHub-hosted runners are noisy. Latency numbers are indicative only. +# For precise benchmarks, register a self-hosted runner once asap-tools infra +# is decoupled from Cloudlab. See PDF eval guide Phase 3. + +on: + pull_request: + branches: + - main + paths: + - 'asap-query-engine/**' + - 'asap-planner-rs/**' + - 'asap-summary-ingest/**' + - 'asap-quickstart/**' + - '.github/workflows/accuracy_performance.yml' + - 'benchmarks/**' + workflow_dispatch: + +permissions: + contents: read + packages: write + pull-requests: write + +jobs: + # --------------------------------------------------------------------------- + # Job 1: build images once from branch code and push with a SHA-based tag. + # All downstream jobs pull these images instead of rebuilding. + # --------------------------------------------------------------------------- + build: + name: Build CI images + runs-on: ubuntu-latest + outputs: + image-tag: ${{ steps.tag.outputs.value }} + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GHCR + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Compute image tag + id: tag + run: echo "value=sha-$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT + + - name: Build and push asap-planner-rs + uses: docker/build-push-action@v6 + with: + context: . 
+ file: asap-planner-rs/Dockerfile + push: true + tags: ghcr.io/projectasap/asap-planner-rs:${{ steps.tag.outputs.value }} + cache-from: type=registry,ref=ghcr.io/projectasap/asap-planner-rs:buildcache + cache-to: type=registry,ref=ghcr.io/projectasap/asap-planner-rs:buildcache,mode=max + + - name: Build and push asap-summary-ingest + uses: docker/build-push-action@v6 + with: + context: asap-summary-ingest + file: asap-summary-ingest/Dockerfile + push: true + tags: ghcr.io/projectasap/asap-summary-ingest:${{ steps.tag.outputs.value }} + build-args: BASE_IMAGE=ghcr.io/projectasap/asap-base:latest + + - name: Build and push asap-query-engine + uses: docker/build-push-action@v6 + with: + context: . + file: asap-query-engine/Dockerfile + push: true + tags: ghcr.io/projectasap/asap-query-engine:${{ steps.tag.outputs.value }} + cache-from: type=registry,ref=ghcr.io/projectasap/asap-query-engine:buildcache + cache-to: type=registry,ref=ghcr.io/projectasap/asap-query-engine:buildcache,mode=max + + # --------------------------------------------------------------------------- + # Job 2: pull the images built above, deploy the full stack, and evaluate. 
+ # --------------------------------------------------------------------------- + eval: + name: Full-stack PR evaluation + needs: build + runs-on: ubuntu-latest + timeout-minutes: 60 + env: + ASAP_IMAGE_TAG: ${{ needs.build.outputs.image-tag }} + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Log in to GHCR + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Pull and start full stack + run: | + docker compose \ + -f asap-quickstart/docker-compose.yml \ + -f benchmarks/docker-compose.yml \ + up -d + + - name: Show running containers + run: | + docker compose \ + -f asap-quickstart/docker-compose.yml \ + -f benchmarks/docker-compose.yml \ + ps + + - name: Wait for all services to be healthy + run: bash benchmarks/scripts/wait_for_stack.sh + + - name: Wait for pipeline and data ingestion + run: bash benchmarks/scripts/ingest_wait.sh + + - name: Set up Python 3.11 + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install Python dependencies + run: pip install requests + + - name: Run baseline queries (Prometheus) + run: python benchmarks/scripts/run_baseline.py + + - name: Run ASAP queries (query engine) + run: python benchmarks/scripts/run_asap.py + + - name: Compare results and evaluate + run: python benchmarks/scripts/compare.py + + - name: Upload evaluation reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: eval-reports-${{ github.run_id }} + path: benchmarks/reports/ + + - name: Print docker logs on failure + if: failure() + run: | + docker compose \ + -f asap-quickstart/docker-compose.yml \ + -f benchmarks/docker-compose.yml \ + logs --no-color + + - name: Teardown stack + if: always() + run: | + docker compose \ + -f asap-quickstart/docker-compose.yml \ + -f benchmarks/docker-compose.yml \ + down -v diff --git a/asap-summary-ingest/Dockerfile b/asap-summary-ingest/Dockerfile index 
8432840..f8a193a 100644 --- a/asap-summary-ingest/Dockerfile +++ b/asap-summary-ingest/Dockerfile @@ -1,4 +1,5 @@ -FROM sketchdb-base:latest +ARG BASE_IMAGE=sketchdb-base:latest +FROM ${BASE_IMAGE} LABEL maintainer="SketchDB Team" LABEL description="ArroyoSketch pipeline configuration service" diff --git a/benchmarks/docker-compose.yml b/benchmarks/docker-compose.yml new file mode 100644 index 0000000..fea9921 --- /dev/null +++ b/benchmarks/docker-compose.yml @@ -0,0 +1,20 @@ +# CI image override: replaces quickstart's pinned release images with images +# built from the current branch. Intended for use as a Compose override: +# +# ASAP_IMAGE_TAG=sha- docker compose \ +# --project-directory . \ +# -f asap-quickstart/docker-compose.yml \ +# -f benchmarks/docker-compose.yml \ +# up -d +# +# ASAP_IMAGE_TAG is set automatically by the 'build' job in accuracy_performance.yml. + +services: + asap-planner-rs: + image: ghcr.io/projectasap/asap-planner-rs:${ASAP_IMAGE_TAG} + + asap-summary-ingest: + image: ghcr.io/projectasap/asap-summary-ingest:${ASAP_IMAGE_TAG} + + queryengine: + image: ghcr.io/projectasap/asap-query-engine:${ASAP_IMAGE_TAG} diff --git a/benchmarks/golden/.gitkeep b/benchmarks/golden/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/queries/promql_suite.json b/benchmarks/queries/promql_suite.json new file mode 100644 index 0000000..53c16bd --- /dev/null +++ b/benchmarks/queries/promql_suite.json @@ -0,0 +1,18 @@ +{ + "queries": [ + {"id": "avg_all", "expr": "avg(sensor_reading)", "approximate": false}, + {"id": "sum_all", "expr": "sum(sensor_reading)", "approximate": false}, + {"id": "max_all", "expr": "max(sensor_reading)", "approximate": false}, + {"id": "min_all", "expr": "min(sensor_reading)", "approximate": false}, + {"id": "q50_all", "expr": "quantile(0.50, sensor_reading)", "approximate": true}, + {"id": "q90_all", "expr": "quantile(0.90, sensor_reading)", "approximate": true}, + {"id": "q95_all", "expr": "quantile(0.95, 
sensor_reading)", "approximate": true}, + {"id": "q99_all", "expr": "quantile(0.99, sensor_reading)", "approximate": true}, + {"id": "q95_by_pattern", "expr": "quantile by (pattern) (0.95, sensor_reading)", "approximate": true}, + {"id": "q99_by_pattern", "expr": "quantile by (pattern) (0.99, sensor_reading)", "approximate": true}, + {"id": "q50_by_pattern", "expr": "quantile by (pattern) (0.50, sensor_reading)", "approximate": true}, + {"id": "avg_by_pattern", "expr": "avg by (pattern) (sensor_reading)", "approximate": false}, + {"id": "sum_by_region", "expr": "sum by (region) (sensor_reading)", "approximate": false}, + {"id": "max_by_service", "expr": "max by (service) (sensor_reading)", "approximate": false} + ] +} diff --git a/benchmarks/reports/.gitkeep b/benchmarks/reports/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/scripts/compare.py b/benchmarks/scripts/compare.py new file mode 100644 index 0000000..2452c11 --- /dev/null +++ b/benchmarks/scripts/compare.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +"""compare.py — compares baseline (Prometheus) vs ASAP query engine results. + +Applies pass/fail policy: + - FAIL if any query returned an error in ASAP (query failure) + - WARN if any ASAP-native query has relative error > max_error (default 5%) + - WARN (not FAIL) on >10% p95 latency regression (GH runner noise) + +Generates benchmarks/reports/eval_report.md and prints it to stdout. +Exits 1 if any PASS/FAIL check failed. 
+ +Usage: + python benchmarks/scripts/compare.py \ + [--baseline FILE] \ + [--asap FILE] \ + [--output FILE] \ + [--max-error FLOAT] +""" + +import argparse +import json +import os +import sys +from datetime import datetime, timezone + +DEFAULT_BASELINE = os.path.join( + os.path.dirname(__file__), "..", "reports", "baseline_results.json" +) +DEFAULT_ASAP = os.path.join( + os.path.dirname(__file__), "..", "reports", "asap_results.json" +) +DEFAULT_OUTPUT = os.path.join( + os.path.dirname(__file__), "..", "reports", "eval_report.md" +) +DEFAULT_MAX_ERROR = 0.05 # 5 % +LATENCY_REGRESSION_THRESHOLD = 0.10 # 10 % — warn only + + +# ── helpers ────────────────────────────────────────────────────────────────── + +def percentile(values: list[float], pct: float) -> float: + """Compute percentile (0-100) from a list of floats using linear interpolation.""" + if not values: + return float("nan") + sorted_vals = sorted(values) + n = len(sorted_vals) + if n == 1: + return sorted_vals[0] + idx = (pct / 100.0) * (n - 1) + lo = int(idx) + hi = lo + 1 + if hi >= n: + return sorted_vals[-1] + frac = idx - lo + return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac + + +def valid_latencies(latencies: list) -> list[float]: + """Strip None entries (failed HTTP requests) from a latency list.""" + return [x for x in latencies if x is not None] + + +def label_key(metric: dict) -> str: + """Canonical sort key for a Prometheus metric label set.""" + return json.dumps(metric, sort_keys=True) + + +def extract_scalar_value(result: list) -> float | None: + """Return a single float from a scalar/single-entry vector result.""" + if not result: + return None + entry = result[0] + try: + return float(entry["value"][1]) + except (KeyError, IndexError, ValueError, TypeError): + return None + + +def compute_relative_error(baseline_val: float, asap_val: float) -> float: + """Relative error: |asap - baseline| / max(|baseline|, 1e-9).""" + denom = max(abs(baseline_val), 1e-9) + return abs(asap_val 
- baseline_val) / denom + + +def compare_results( + baseline_result: list, asap_result: list +) -> tuple[float | None, str]: + """ + Compare two Prometheus vector result arrays. + + Returns (max_relative_error, note). + For grouped results, matches entries by label set. + """ + if not baseline_result and not asap_result: + return 0.0, "both empty" + if not baseline_result: + return None, "baseline empty" + if not asap_result: + return None, "asap empty" + + # Build label-key → float maps + baseline_map: dict[str, float] = {} + for entry in baseline_result: + key = label_key(entry.get("metric", {})) + try: + baseline_map[key] = float(entry["value"][1]) + except (KeyError, IndexError, ValueError, TypeError): + pass + + asap_map: dict[str, float] = {} + for entry in asap_result: + key = label_key(entry.get("metric", {})) + try: + asap_map[key] = float(entry["value"][1]) + except (KeyError, IndexError, ValueError, TypeError): + pass + + # Single-value result (no labels / collapsed) + if len(baseline_map) == 1 and len(asap_map) == 1: + bv = next(iter(baseline_map.values())) + av = next(iter(asap_map.values())) + return compute_relative_error(bv, av), "" + + # Multi-value: match by label set + max_err = 0.0 + missing = [] + for key, bv in baseline_map.items(): + if key not in asap_map: + missing.append(key) + continue + err = compute_relative_error(bv, asap_map[key]) + max_err = max(max_err, err) + + note = f"{len(missing)} label set(s) missing from ASAP" if missing else "" + return max_err, note + + +# ── main ───────────────────────────────────────────────────────────────────── + +def main() -> None: + parser = argparse.ArgumentParser(description="Compare baseline vs ASAP results") + parser.add_argument("--baseline", default=DEFAULT_BASELINE, help="Baseline JSON file") + parser.add_argument("--asap", default=DEFAULT_ASAP, help="ASAP results JSON file") + parser.add_argument("--output", default=DEFAULT_OUTPUT, help="Output Markdown file") + parser.add_argument( + 
"--max-error", + type=float, + default=DEFAULT_MAX_ERROR, + help="Max relative error for ASAP-native queries (default: 0.05 = 5%%)", + ) + args = parser.parse_args() + + with open(args.baseline) as f: + baseline_data = json.load(f) + with open(args.asap) as f: + asap_data = json.load(f) + + baseline_results = baseline_data["results"] + asap_results = asap_data["results"] + + # Collect all query IDs (union) + all_ids = sorted(set(list(baseline_results.keys()) + list(asap_results.keys()))) + + failures: list[str] = [] + warnings: list[str] = [] + rows: list[dict] = [] + + for qid in all_ids: + b = baseline_results.get(qid, {}) + a = asap_results.get(qid, {}) + + approximate = a.get("approximate", False) + asap_status = a.get("status", "missing") + asap_error = a.get("error") + + # Latency p95 + b_lats = valid_latencies(b.get("latencies_ms", [])) + a_lats = valid_latencies(a.get("latencies_ms", [])) + b_p95 = percentile(b_lats, 95) if b_lats else float("nan") + a_p95 = percentile(a_lats, 95) if a_lats else float("nan") + + # Relative error + rel_error: float | None = None + note = "" + if asap_status == "success" and b.get("status") == "success": + rel_error, note = compare_results( + b.get("data", []), a.get("data", []) + ) + + # Pass/fail determination + row_status = "PASS" + + if asap_status == "error" or asap_status == "missing": + row_status = "FAIL" + reason = asap_error or "query not present in ASAP results" + failures.append(f"{qid}: ASAP query failed — {reason}") + + elif approximate and rel_error is not None and rel_error > args.max_error: + row_status = "WARN" + warnings.append( + f"{qid}: relative error {rel_error:.4f} > threshold {args.max_error:.4f} — informational only" + ) + + # Latency regression — warn only + if ( + not (b_p95 != b_p95) # not NaN + and not (a_p95 != a_p95) + and b_p95 > 0 + ): + latency_regression = (a_p95 - b_p95) / b_p95 + if latency_regression > LATENCY_REGRESSION_THRESHOLD: + warnings.append( + f"{qid}: p95 latency regression 
{latency_regression:.1%} " + f"(baseline={b_p95:.1f}ms, asap={a_p95:.1f}ms) — " + "GH runner noise expected; informational only" + ) + + rows.append( + { + "id": qid, + "approximate": approximate, + "b_p95": b_p95, + "a_p95": a_p95, + "rel_error": rel_error, + "status": row_status, + "note": note, + } + ) + + overall = "PASS" if not failures else "FAIL" + + # ── Build Markdown report ──────────────────────────────────────────────── + now = datetime.now(timezone.utc).isoformat() + lines: list[str] = [] + lines.append("# ASAP PR Evaluation Report") + lines.append("") + lines.append(f"**Generated:** {now}") + lines.append( + f"**Overall verdict:** {'✅ PASS' if overall == 'PASS' else '❌ FAIL'}" + ) + lines.append(f"**Max error threshold:** {args.max_error:.2%}") + lines.append("") + + # Summary table + lines.append( + "| Query | ASAP Native | Baseline p95 (ms) | ASAP p95 (ms) | Rel Error | Status |" + ) + lines.append("|-------|:-----------:|:-----------------:|:-------------:|:---------:|:------:|") + for row in rows: + native_mark = "yes" if row["approximate"] else "no" + b_p95_s = f"{row['b_p95']:.1f}" if row["b_p95"] == row["b_p95"] else "n/a" + a_p95_s = f"{row['a_p95']:.1f}" if row["a_p95"] == row["a_p95"] else "n/a" + if row["rel_error"] is None: + rel_err_s = "n/a" + else: + rel_err_s = f"{row['rel_error']:.4f}" + # WARN rows must not render as FAIL — they do not fail the run + status_s = {"PASS": "✅ PASS", "WARN": "⚠️ WARN"}.get(row["status"], "❌ FAIL") + note_s = f" ({row['note']})" if row["note"] else "" + lines.append( + f"| {row['id']}{note_s} | {native_mark} | {b_p95_s} | {a_p95_s} " + f"| {rel_err_s} | {status_s} |" + ) + + lines.append("") + + if failures: + lines.append("## Failures") + lines.append("") + for f_msg in failures: + lines.append(f"- {f_msg}") + lines.append("") + + if warnings: + lines.append("## Warnings (informational)") + lines.append("") + lines.append( + "> **Note:** GitHub-hosted runners exhibit significant timing noise. " + "Latency numbers are indicative only. 
For precise benchmarks, register " + "a self-hosted runner once asap-tools infra is decoupled from Cloudlab " + "(see PDF eval guide Phase 3)." + ) + lines.append("") + for w in warnings: + lines.append(f"- {w}") + lines.append("") + + lines.append("---") + lines.append( + "_This report was generated automatically by `benchmarks/scripts/compare.py`._" + ) + + report = "\n".join(lines) + print(report) + + os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True) + with open(args.output, "w") as f: + f.write(report + "\n") + print(f"\n[compare] Report saved to {args.output}", file=sys.stderr) + + if overall == "FAIL": + print( + f"\n[compare] Evaluation FAILED. {len(failures)} check(s) failed:", + file=sys.stderr, + ) + for msg in failures: + print(f" - {msg}", file=sys.stderr) + sys.exit(1) + else: + print("\n[compare] Evaluation PASSED.", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/benchmarks/scripts/ingest_wait.sh b/benchmarks/scripts/ingest_wait.sh new file mode 100644 index 0000000..8b12d3a --- /dev/null +++ b/benchmarks/scripts/ingest_wait.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash +set -euo pipefail + +# ingest_wait.sh — waits for the asap-demo Arroyo pipeline to reach RUNNING +# state, then sleeps to allow sketches to accumulate before verifying that the +# query engine has ingested data. + +ARROYO_URL="http://localhost:5115/api/v1/pipelines" +QE_URL="http://localhost:8088/api/v1/query" +PIPELINE_NAME="asap-demo" +MAX_PIPELINE_WAIT=600 # seconds — Arroyo must compile Rust UDFs; allow extra time +ACCUMULATE_SLEEP=90 # seconds after pipeline is running +SLEEP=5 + +# ── 1. Wait for asap-demo pipeline to reach RUNNING ───────────────────────── +echo "[ingest_wait] Waiting for Arroyo pipeline '${PIPELINE_NAME}' to reach RUNNING state ..." 
+elapsed=0 +while true; do + state=$(curl -sf --max-time 10 "${ARROYO_URL}" 2>/dev/null \ + | python3 -c " +import sys, json +try: + data = json.load(sys.stdin) + # 'data' key may be null when no pipelines exist; use 'or []' to handle that + pipelines = data if isinstance(data, list) else (data.get('data') or []) + for p in pipelines: + name = str(p.get('name') or p.get('id') or '') + # Normalise hyphens/underscores so 'asap-demo' matches 'asap_demo' + if '${PIPELINE_NAME}'.replace('-', '_') in name.replace('-', '_'): + state = p.get('state') + stop = p.get('stop', '') + # Arroyo signals a running pipeline via state=null and stop='none' + if state is None and stop == 'none': + print('Running') + elif state is not None: + print(str(state)) + else: + print('stopped') + break + else: + # No matching pipeline found yet — print nothing so caller retries + pass +except Exception: + pass +" 2>/dev/null || true) + + if [ "${state}" = "Running" ] || [ "${state}" = "RUNNING" ] || [ "${state}" = "running" ]; then + echo "[ingest_wait] Pipeline '${PIPELINE_NAME}' is RUNNING (${elapsed}s elapsed)" + break + fi + + if [ "${elapsed}" -ge "${MAX_PIPELINE_WAIT}" ]; then + echo "[ingest_wait] ERROR: Pipeline '${PIPELINE_NAME}' did not reach RUNNING within ${MAX_PIPELINE_WAIT}s (last state: '${state:-unknown}')" >&2 + # Dump pipeline list for diagnosis + echo "[ingest_wait] Current Arroyo pipeline list:" >&2 + curl -sf --max-time 10 "${ARROYO_URL}" 2>/dev/null | python3 -m json.tool 2>/dev/null >&2 || true + exit 1 + fi + + echo "[ingest_wait] Pipeline state: '${state:-not found}' — retrying in ${SLEEP}s (${elapsed}s elapsed) ..." + sleep "${SLEEP}" + elapsed=$(( elapsed + SLEEP )) +done + +# ── 2. Allow sketches to accumulate ───────────────────────────────────────── +echo "[ingest_wait] Pipeline running. Sleeping ${ACCUMULATE_SLEEP}s for sketches to accumulate ..." +sleep "${ACCUMULATE_SLEEP}" + +# ── 3. 
Verify query engine has data ───────────────────────────────────────── +echo "[ingest_wait] Verifying query engine has data ..." +response=$(curl -sf --max-time 10 \ + "${QE_URL}?query=avg%28sensor_reading%29" 2>/dev/null || true) + +if [ -z "${response}" ]; then + echo "[ingest_wait] ERROR: Query engine returned empty response." >&2 + exit 1 +fi + +result_count=$(echo "${response}" | python3 -c " +import sys, json +data = json.load(sys.stdin) +result = data.get('data', {}).get('result', []) +print(len(result)) +" 2>/dev/null || echo "0") + +if [ "${result_count}" -eq 0 ]; then + echo "[ingest_wait] ERROR: Query engine has no data yet (result array is empty)." >&2 + exit 1 +fi + +echo "[ingest_wait] Query engine has data (${result_count} result entries). Ready for benchmarking." diff --git a/benchmarks/scripts/run_asap.py b/benchmarks/scripts/run_asap.py new file mode 100644 index 0000000..1c6d145 --- /dev/null +++ b/benchmarks/scripts/run_asap.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +"""run_asap.py — queries ASAPQuery engine for each entry in promql_suite.json. + +Runs each query 3 times to gather latency samples, then writes results to +benchmarks/reports/asap_results.json. 
+ +Usage: + python benchmarks/scripts/run_asap.py \ + [--asap-url URL] \ + [--output FILE] +""" + +import argparse +import json +import os +import time +import urllib.parse +from datetime import datetime, timezone + +import requests + +SUITE_PATH = os.path.join( + os.path.dirname(__file__), "..", "queries", "promql_suite.json" +) +DEFAULT_OUTPUT = os.path.join( + os.path.dirname(__file__), "..", "reports", "asap_results.json" +) +RUNS_PER_QUERY = 3 + + +def query_asap(base_url: str, expr: str, ts: float) -> tuple[dict, float]: + """Issue a single instant query to the ASAP query engine, return (parsed_json, latency_ms).""" + encoded = urllib.parse.quote(expr, safe="") + url = f"{base_url}/api/v1/query?query={encoded}&time={ts}" + t0 = time.monotonic() + resp = requests.get(url, timeout=30) + latency_ms = (time.monotonic() - t0) * 1000.0 + resp.raise_for_status() + return resp.json(), latency_ms + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run ASAP query engine benchmark queries") + parser.add_argument( + "--asap-url", + default="http://localhost:8088", + help="ASAP query engine base URL (default: http://localhost:8088)", + ) + parser.add_argument( + "--output", + default=DEFAULT_OUTPUT, + help="Output JSON file path", + ) + args = parser.parse_args() + + with open(SUITE_PATH) as f: + suite = json.load(f) + + results: dict[str, dict] = {} + now = time.time() + + for q in suite["queries"]: + qid = q["id"] + expr = q["expr"] + approximate = q.get("approximate", False) + latencies = [] + last_data = [] + last_error = None + last_status = "success" + + print(f"[asap] Running query '{qid}' (approximate={approximate}): {expr}") + for run in range(1, RUNS_PER_QUERY + 1): + try: + payload, lat = query_asap(args.asap_url, expr, now) + latencies.append(lat) + if payload.get("status") == "success": + last_data = payload.get("data", {}).get("result", []) + last_status = "success" + last_error = None + else: + last_status = "error" + last_error = 
payload.get("error", "unknown error") + last_data = [] + print(f" run {run}/{RUNS_PER_QUERY}: {lat:.1f} ms status={last_status}") + except Exception as exc: # noqa: BLE001 + last_status = "error" + last_error = str(exc) + last_data = [] + latencies.append(None) + print(f" run {run}/{RUNS_PER_QUERY}: ERROR — {exc}") + + results[qid] = { + "status": last_status, + "approximate": approximate, + "latencies_ms": latencies, + "data": last_data, + "error": last_error, + } + + output = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "asap_url": args.asap_url, + "results": results, + } + + os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True) + with open(args.output, "w") as f: + json.dump(output, f, indent=2) + print(f"\n[asap] Results saved to {args.output}") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/scripts/run_baseline.py b/benchmarks/scripts/run_baseline.py new file mode 100644 index 0000000..6e09e41 --- /dev/null +++ b/benchmarks/scripts/run_baseline.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +"""run_baseline.py — queries Prometheus for each entry in promql_suite.json. + +Runs each query 3 times to gather latency samples, then writes results to +benchmarks/reports/baseline_results.json. 
+ +Usage: + python benchmarks/scripts/run_baseline.py \ + [--prometheus-url URL] \ + [--output FILE] +""" + +import argparse +import json +import os +import time +import urllib.parse +from datetime import datetime, timezone + +import requests + +SUITE_PATH = os.path.join( + os.path.dirname(__file__), "..", "queries", "promql_suite.json" +) +DEFAULT_OUTPUT = os.path.join( + os.path.dirname(__file__), "..", "reports", "baseline_results.json" +) +RUNS_PER_QUERY = 3 + + +def query_prometheus(base_url: str, expr: str, ts: float) -> tuple[dict, float]: + """Issue a single instant query, return (parsed_json, latency_ms).""" + encoded = urllib.parse.quote(expr, safe="") + url = f"{base_url}/api/v1/query?query={encoded}&time={ts}" + t0 = time.monotonic() + resp = requests.get(url, timeout=30) + latency_ms = (time.monotonic() - t0) * 1000.0 + resp.raise_for_status() + return resp.json(), latency_ms + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run baseline Prometheus queries") + parser.add_argument( + "--prometheus-url", + default="http://localhost:9090", + help="Prometheus base URL (default: http://localhost:9090)", + ) + parser.add_argument( + "--output", + default=DEFAULT_OUTPUT, + help="Output JSON file path", + ) + args = parser.parse_args() + + with open(SUITE_PATH) as f: + suite = json.load(f) + + results: dict[str, dict] = {} + now = time.time() + + for q in suite["queries"]: + qid = q["id"] + expr = q["expr"] + latencies = [] + last_data = [] + last_error = None + last_status = "success" + + print(f"[baseline] Running query '{qid}': {expr}") + for run in range(1, RUNS_PER_QUERY + 1): + try: + payload, lat = query_prometheus(args.prometheus_url, expr, now) + latencies.append(lat) + if payload.get("status") == "success": + last_data = payload.get("data", {}).get("result", []) + last_status = "success" + last_error = None + else: + last_status = "error" + last_error = payload.get("error", "unknown error") + last_data = [] + print(f" run 
{run}/{RUNS_PER_QUERY}: {lat:.1f} ms status={last_status}") + except Exception as exc: # noqa: BLE001 + last_status = "error" + last_error = str(exc) + last_data = [] + latencies.append(None) + print(f" run {run}/{RUNS_PER_QUERY}: ERROR — {exc}") + + results[qid] = { + "status": last_status, + "latencies_ms": latencies, + "data": last_data, + "error": last_error, + } + + output = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "prometheus_url": args.prometheus_url, + "results": results, + } + + os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True) + with open(args.output, "w") as f: + json.dump(output, f, indent=2) + print(f"\n[baseline] Results saved to {args.output}") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/scripts/wait_for_stack.sh b/benchmarks/scripts/wait_for_stack.sh new file mode 100644 index 0000000..672aa6d --- /dev/null +++ b/benchmarks/scripts/wait_for_stack.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +set -euo pipefail + +# wait_for_stack.sh — polls each service until healthy or times out. +# Used in CI after `docker compose up -d --build`. + +MAX_WAIT=180 # seconds per service +SLEEP=5 + +wait_for_url() { + local name="$1" + local url="$2" + local elapsed=0 + + echo "[wait_for_stack] Waiting for ${name} at ${url} ..." + while true; do + if curl -sf --max-time 5 "${url}" > /dev/null 2>&1; then + echo "[wait_for_stack] ${name} is healthy (${elapsed}s elapsed)" + return 0 + fi + if [ "${elapsed}" -ge "${MAX_WAIT}" ]; then + echo "[wait_for_stack] ERROR: ${name} did not become healthy within ${MAX_WAIT}s" >&2 + return 1 + fi + sleep "${SLEEP}" + elapsed=$(( elapsed + SLEEP )) + done +} + +wait_for_url "Prometheus" "http://localhost:9090/-/healthy" +wait_for_url "Arroyo API" "http://localhost:5115/api/v1/pipelines" +wait_for_url "QueryEngine" "http://localhost:8088/api/v1/query?query=vector(1)" + +echo "[wait_for_stack] All services are healthy."