diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 9b52c11..c1376aa 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -8,7 +8,6 @@ on: branches: [main] paths: - 'asap-common/installation/**' - - 'asap-planner/**' - 'asap-planner-rs/**' - 'asap-summary-ingest/**' - 'asap-query-engine/**' @@ -70,22 +69,6 @@ jobs: docker push ghcr.io/projectasap/asap-base:${{ steps.tag.outputs.value }} docker push ghcr.io/projectasap/asap-base:latest - # --- Planner (Python, depends on base) --- - - name: Build planner image - run: | - docker build \ - -t asap-planner:local \ - -f asap-planner/Dockerfile \ - asap-planner - - - name: Push planner image - if: startsWith(github.ref, 'refs/tags/') || github.event_name == 'workflow_dispatch' - run: | - docker tag asap-planner:local ghcr.io/projectasap/asap-planner:${{ steps.tag.outputs.value }} - docker tag asap-planner:local ghcr.io/projectasap/asap-planner:latest - docker push ghcr.io/projectasap/asap-planner:${{ steps.tag.outputs.value }} - docker push ghcr.io/projectasap/asap-planner:latest - # --- Planner RS (Rust) --- - name: Build and push planner-rs uses: docker/build-push-action@v6 diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 858230c..d0990ea 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -6,7 +6,6 @@ on: paths: - 'asap-summary-ingest/**' - 'asap-tools/queriers/prometheus-client/**' - - 'asap-planner/**' - 'asap-tools/**' - 'asap-tools/data-sources/prometheus-exporters/**' - 'asap-tools/execution-utilities/**' @@ -17,7 +16,6 @@ on: paths: - 'asap-summary-ingest/**' - 'asap-tools/queriers/prometheus-client/**' - - 'asap-planner/**' - 'asap-tools/**' - 'asap-tools/data-sources/prometheus-exporters/**' - 'asap-tools/execution-utilities/**' @@ -35,7 +33,6 @@ jobs: outputs: summary_ingest: ${{ steps.filter.outputs.summary_ingest }} prometheus_client: ${{ steps.filter.outputs.prometheus_client }} - controller: ${{ steps.filter.outputs.controller }} utilities: ${{ steps.filter.outputs.utilities }} prometheus_exporters: ${{ steps.filter.outputs.prometheus_exporters }} execution_utilities: ${{ steps.filter.outputs.execution_utilities }} @@ -51,9 +48,6 @@ jobs: prometheus_client: - 'asap-tools/queriers/prometheus-client/**' - 'asap-common/dependencies/py/**' - controller: - - 'asap-planner/**' - - 'asap-common/dependencies/py/**' utilities: - 'asap-tools/**' - 'asap-common/dependencies/py/**' @@ -125,35 +119,6 @@ jobs: working-directory: asap-tools/queriers/prometheus-client run: mypy . - test-controller: - needs: detect-changes - if: needs.detect-changes.outputs.controller == 'true' - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.8", "3.9", "3.10", "3.11"] - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install black==24.8.0 flake8==6.1.0 - if [ -f asap-planner/requirements.txt ]; then pip install -r asap-planner/requirements.txt; fi - - name: Check formatting with Black - working-directory: asap-planner - run: black --check --diff . - - name: Lint with flake8 - working-directory: asap-planner - run: | - # Stop the build if there are Python syntax errors or undefined names - flake8 . --config=../.flake8 --count --select=E9,F63,F7,F82 --show-source --statistics - # Exit-zero treats all errors as warnings - flake8 . --config=../.flake8 --count --exit-zero --max-complexity=10 --statistics - test-utilities: needs: detect-changes if: needs.detect-changes.outputs.utilities == 'true' diff --git a/README.md b/README.md index 9c3edca..ec3aa34 100644 --- a/README.md +++ b/README.md @@ -44,11 +44,11 @@ ASAPQuery uses **streaming sketches** to: ## Architecture -ASAPQuery has four main components: the **asap-planner** generates sketch configurations from your query workload, **asap-sketch-ingest** deploys streaming pipelines in **Arroyo** that continuously build sketches from live Prometheus metrics, and **asap-query-engine** intercepts PromQL queries and serves them from those pre-computed sketches. +ASAPQuery has four main components: the **asap-planner-rs** generates sketch configurations from your query workload, **asap-sketch-ingest** deploys streaming pipelines in **Arroyo** that continuously build sketches from live Prometheus metrics, and **asap-query-engine** intercepts PromQL queries and serves them from those pre-computed sketches. ### Components -- **[asap-planner](asap-planner/)** - Analyzes a PromQL query workload and auto-generates sketch configurations for asap-sketch-ingest and asap-query-engine +- **[asap-planner-rs](asap-planner-rs/)** - Analyzes a PromQL query workload and auto-generates sketch configurations for asap-sketch-ingest and asap-query-engine - **[asap-sketch-ingest](asap-sketch-ingest/)** - Deploys Arroyo streaming pipelines that continuously compute and publish sketches from live metrics - **[arroyo](https://github.com/ProjectASAP/arroyo)** - Fork of the [Arroyo](https://github.com/ArroyoSystems/arroyo) stream processing engine that runs the sketch-building SQL pipelines - **[asap-query-engine](asap-query-engine/)** - Intercepts incoming PromQL queries and serves them from pre-computed sketches, falling back to Prometheus for unsupported queries @@ -57,7 +57,7 @@ ASAPQuery has four main components: the **asap-planner** generates sketch config ``` ├── asap-quickstart/ # Self-contained demo (start here!) -├── asap-planner/ # Auto-configuration service +├── asap-planner-rs/ # Auto-configuration service ├── asap-sketch-ingest/ # Arroyo pipeline deployer └── asap-query-engine/ # Query serving engine # Note: Arroyo fork lives at https://github.com/ProjectASAP/arroyo diff --git a/asap-planner/controller-cli-compose.yml.j2 b/asap-planner-rs/controller-cli-compose.yml.j2 similarity index 100% rename from asap-planner/controller-cli-compose.yml.j2 rename to asap-planner-rs/controller-cli-compose.yml.j2 diff --git a/asap-planner-rs/tests/comparison/comparator.py b/asap-planner-rs/tests/comparison/comparator.py deleted file mode 100644 index 3bef1e8..0000000 --- a/asap-planner-rs/tests/comparison/comparator.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python3 -"""Semantic comparison of Python and Rust planner outputs.""" -import yaml -import sys -from typing import List, Optional - - -def normalize_labels(labels) -> frozenset: - if labels is None: - return frozenset() - if isinstance(labels, list): - return frozenset(labels) - return frozenset() - - -def agg_signature(agg: dict) -> tuple: - """Canonical signature for matching aggregations across Python/Rust outputs.""" - return ( - agg.get("aggregationType", ""), - agg.get("aggregationSubType", ""), - agg.get("metric", "") or agg.get("table_name", ""), - agg.get("spatialFilter", ""), - normalize_labels(agg.get("labels", {}).get("rollup")), - normalize_labels(agg.get("labels", {}).get("grouping")), - normalize_labels(agg.get("labels", {}).get("aggregated")), - ) - - -def compare_streaming_configs(py_path: str, rs_path: str) -> List[str]: - """Compare two streaming_config.yaml files. Returns list of error messages.""" - errors = [] - with open(py_path) as f: - py = yaml.safe_load(f) - with open(rs_path) as f: - rs = yaml.safe_load(f) - - py_aggs = py.get("aggregations", []) - rs_aggs = rs.get("aggregations", []) - - if len(py_aggs) != len(rs_aggs): - errors.append(f"Aggregation count mismatch: Python={len(py_aggs)}, Rust={len(rs_aggs)}") - return errors - - py_by_sig = {agg_signature(a): a for a in py_aggs} - rs_by_sig = {agg_signature(a): a for a in rs_aggs} - - for sig, py_agg in py_by_sig.items(): - if sig not in rs_by_sig: - errors.append(f"Aggregation missing in Rust: {sig}") - continue - rs_agg = rs_by_sig[sig] - - for field in ["windowType", "windowSize", "slideInterval", "tumblingWindowSize"]: - if py_agg.get(field) != rs_agg.get(field): - errors.append( - f"Field '{field}' mismatch for {sig}: Python={py_agg.get(field)}, Rust={rs_agg.get(field)}" - ) - - if py_agg.get("parameters") != rs_agg.get("parameters"): - errors.append( - f"Parameters mismatch for {sig}: Python={py_agg.get('parameters')}, Rust={rs_agg.get('parameters')}" - ) - - return errors - - -def compare_inference_configs(py_path: str, rs_path: str) -> List[str]: - """Compare two inference_config.yaml files. Returns list of error messages.""" - errors = [] - with open(py_path) as f: - py = yaml.safe_load(f) - with open(rs_path) as f: - rs = yaml.safe_load(f) - - py_policy = py.get("cleanup_policy", {}).get("name", "") - rs_policy = rs.get("cleanup_policy", {}).get("name", "") - if py_policy != rs_policy: - errors.append(f"Cleanup policy mismatch: Python={py_policy}, Rust={rs_policy}") - - py_queries = py.get("queries", []) - rs_queries = rs.get("queries", []) - - if len(py_queries) != len(rs_queries): - errors.append(f"Query count mismatch: Python={len(py_queries)}, Rust={len(rs_queries)}") - return errors - - py_by_q = {q["query"]: q for q in py_queries} - rs_by_q = {q["query"]: q for q in rs_queries} - - for query, py_q in py_by_q.items(): - if query not in rs_by_q: - errors.append(f"Query missing in Rust output: {query}") - continue - rs_q = rs_by_q[query] - - py_agg_count = len(py_q.get("aggregations", [])) - rs_agg_count = len(rs_q.get("aggregations", [])) - if py_agg_count != rs_agg_count: - errors.append( - f"Aggregation count mismatch for query '{query}': Python={py_agg_count}, Rust={rs_agg_count}" - ) - - return errors - - -if __name__ == "__main__": - if len(sys.argv) != 5: - print("Usage: comparator.py ") - sys.exit(1) - - errors = compare_streaming_configs(sys.argv[1], sys.argv[3]) - errors += compare_inference_configs(sys.argv[2], sys.argv[4]) - - if errors: - for e in errors: - print(f" DIFF: {e}") - sys.exit(1) - else: - print(" MATCH") diff --git a/asap-planner-rs/tests/comparison/master_test_runner.py b/asap-planner-rs/tests/comparison/master_test_runner.py deleted file mode 100644 index c288a62..0000000 --- a/asap-planner-rs/tests/comparison/master_test_runner.py +++ /dev/null @@ -1,146 +0,0 @@ -#!/usr/bin/env python3 -"""Run all comparison tests between Python and Rust planner.""" -import os -import sys -import json -import shutil -import subprocess - -REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) -RUST_BINARY = os.path.join(REPO_ROOT, "target", "release", "asap-planner") -COMPARATOR = os.path.join(os.path.dirname(__file__), "comparator.py") -TEST_CASES_PATH = os.path.join(os.path.dirname(__file__), "test_cases.json") -COMPARISON_DIR = os.path.dirname(__file__) -TEST_OUTPUTS_DIR = os.path.join(os.path.dirname(__file__), "test_outputs") - - -def run_python_planner(tc: dict, output_dir: str) -> bool: - planner_script = os.path.join(REPO_ROOT, "asap-planner", "main_controller.py") - args = [ - sys.executable, - planner_script, - "--input_config", - os.path.join(COMPARISON_DIR, tc["input_config"]), - "--output_dir", - output_dir, - "--prometheus_scrape_interval", - str(tc["prometheus_scrape_interval"]), - "--streaming_engine", - tc["streaming_engine"], - ] - if tc.get("enable_punting"): - args.append("--enable-punting") - if tc.get("range_duration", 0) > 0: - args += ["--range-duration", str(tc["range_duration"])] - if tc.get("step", 0) > 0: - args += ["--step", str(tc["step"])] - - env = os.environ.copy() - py_utils = os.path.join( - REPO_ROOT, "asap-common", "dependencies", "py", "promql_utilities" - ) - planner_path = os.path.join(REPO_ROOT, "asap-planner") - env["PYTHONPATH"] = f"{py_utils}:{planner_path}:{env.get('PYTHONPATH', '')}" - - result = subprocess.run(args, env=env, capture_output=True, text=True) - if result.returncode != 0: - print(f" Python planner failed:\n{result.stderr}") - return False - return True - - -def run_rust_planner(tc: dict, output_dir: str) -> bool: - args = [ - RUST_BINARY, - "--input_config", - os.path.join(COMPARISON_DIR, tc["input_config"]), - "--output_dir", - output_dir, - "--prometheus_scrape_interval", - str(tc["prometheus_scrape_interval"]), - "--streaming_engine", - tc["streaming_engine"], - ] - if tc.get("enable_punting"): - args.append("--enable-punting") - if tc.get("range_duration", 0) > 0: - args += ["--range-duration", str(tc["range_duration"])] - if tc.get("step", 0) > 0: - args += ["--step", str(tc["step"])] - - result = subprocess.run(args, capture_output=True, text=True) - if result.returncode != 0: - print(f" Rust planner failed:\n{result.stderr}") - return False - return True - - -def main(): - # Build Rust binary - print("Building Rust planner...") - build = subprocess.run( - ["cargo", "build", "-p", "asap_planner", "--release"], - cwd=REPO_ROOT, - capture_output=True, - text=True, - ) - if build.returncode != 0: - print(f"Build failed:\n{build.stderr}") - sys.exit(1) - print("Build OK\n") - - with open(TEST_CASES_PATH) as f: - test_cases = json.load(f)["test_cases"] - - passed = 0 - failed = 0 - - # Clear and recreate test_outputs dir - if os.path.exists(TEST_OUTPUTS_DIR): - shutil.rmtree(TEST_OUTPUTS_DIR) - os.makedirs(TEST_OUTPUTS_DIR) - - for tc in test_cases: - print(f"Test: {tc['id']}") - tc_out = os.path.join(TEST_OUTPUTS_DIR, tc["id"]) - py_dir = os.path.join(tc_out, "python") - rs_dir = os.path.join(tc_out, "rust") - os.makedirs(py_dir) - os.makedirs(rs_dir) - - if not run_python_planner(tc, py_dir): - print(" [FAIL] Python planner error") - failed += 1 - continue - if not run_rust_planner(tc, rs_dir): - print(" [FAIL] Rust planner error") - failed += 1 - continue - - result = subprocess.run( - [ - sys.executable, - COMPARATOR, - os.path.join(py_dir, "streaming_config.yaml"), - os.path.join(py_dir, "inference_config.yaml"), - os.path.join(rs_dir, "streaming_config.yaml"), - os.path.join(rs_dir, "inference_config.yaml"), - ], - capture_output=True, - text=True, - ) - if result.returncode == 0: - print(" [PASS]") - passed += 1 - else: - print(" [FAIL]") - print(result.stdout) - failed += 1 - - print(f"\nResults: {passed}/{passed + failed} passed") - if failed > 0: - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/asap-planner-rs/tests/comparison/test_cases.json b/asap-planner-rs/tests/comparison/test_cases.json deleted file mode 100644 index c9727e4..0000000 --- a/asap-planner-rs/tests/comparison/test_cases.json +++ /dev/null @@ -1,643 +0,0 @@ -{ - "test_cases": [ - { - "id": "mixed_workload", - "input_config": "test_data/configs/mixed_workload.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "range_query", - "input_config": "test_data/configs/range_query.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 3600, - "step": 60 - }, - { - "id": "cleanup_circular", - "input_config": "test_data/configs/cleanup_circular.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_over_time", - "input_config": "test_data/configs/quantile_over_time.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "spatial_quantile", - "input_config": "test_data/configs/spatial_quantile.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "rate_increase", - "input_config": "test_data/configs/rate_increase.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "topk", - "input_config": "test_data/configs/topk.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "cleanup_read_based", - "input_config": "test_data/configs/cleanup_read_based.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "deduplicated", - "input_config": "test_data/configs/deduplicated.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "increase", - "input_config": "test_data/configs/increase.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "sum_over_time", - "input_config": "test_data/configs/sum_over_time.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "sum_by", - "input_config": "test_data/configs/sum_by.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "temporal_overlapping", - "input_config": "test_data/configs/temporal_overlapping.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "sum_by_overlapping", - "input_config": "test_data/configs/sum_by_overlapping.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "arroyo_optimized_test_generated_1_run1", - "input_config": "test_data/configs/experiments/arroyo_optimized_test_generated_1_run1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "arroyo_optimized_test_generated_1_run1_only_temporal", - "input_config": "test_data/configs/experiments/arroyo_optimized_test_generated_1_run1_only_temporal.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "arroyo_optimized_test_generated_1_run2_only_temporal", - "input_config": "test_data/configs/experiments/arroyo_optimized_test_generated_1_run2_only_temporal.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "bad_workload_1_test_1", - "input_config": "test_data/configs/experiments/bad_workload_1_test_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "debugging_qot_1", - "input_config": "test_data/configs/experiments/debugging_qot_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "debugging_rate_1", - "input_config": "test_data/configs/experiments/debugging_rate_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_10", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_10.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_2", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_2.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_3", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_3.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_4", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_4.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_5", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_5.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_6", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_6.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_7", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_7.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_8", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_8.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "final_generated_workload_prometheus_20251204_231532_9", - "input_config": "test_data/configs/experiments/final_generated_workload_prometheus_20251204_231532_9.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_prometheus_20251204_231532_1_no_parallel", - "input_config": "test_data/configs/experiments/generated_workload_prometheus_20251204_231532_1_no_parallel.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_10", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_10.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_2", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_2.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_3", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_3.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_4", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_4.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_5", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_5.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_6", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_6.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_7", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_7.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_8", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_8.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "generated_workload_victoriametrics_20251204_231532_9", - "input_config": "test_data/configs/experiments/generated_workload_victoriametrics_20251204_231532_9.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "non_quantile_1_test", - "input_config": "test_data/configs/experiments/non_quantile_1_test.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "non_quantile_1s_4queries_100valuesperlabel_2labels", - "input_config": "test_data/configs/experiments/non_quantile_1s_4queries_100valuesperlabel_2labels.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "non_quantile_30m_10_card_1000_1", - "input_config": "test_data/configs/experiments/non_quantile_30m_10_card_1000_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "non_quantile_30m_30_card_1000_1", - "input_config": "test_data/configs/experiments/non_quantile_30m_30_card_1000_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "non_quantile_30m_60_card_1000_1", - "input_config": "test_data/configs/experiments/non_quantile_30m_60_card_1000_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "post_optimization_generated_workload_prometheus_20251204_231532_4_nopunt_debug4", - "input_config": "test_data/configs/experiments/post_optimization_generated_workload_prometheus_20251204_231532_4_nopunt_debug4.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": true, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_120m_1_card_2_0", - "input_config": "test_data/configs/experiments/qot_120m_1_card_2_0.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_120m_1_card_2_7", - "input_config": "test_data/configs/experiments/qot_120m_1_card_2_7.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_15m_1_card_2_0", - "input_config": "test_data/configs/experiments/qot_15m_1_card_2_0.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_15m_1_card_2_7_rerun", - "input_config": "test_data/configs/experiments/qot_15m_1_card_2_7_rerun.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_15m_1s_card_1000", - "input_config": "test_data/configs/experiments/qot_15m_1s_card_1000.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_15m_1s_optimizedmerge_1", - "input_config": "test_data/configs/experiments/qot_15m_1s_optimizedmerge_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_15m_300s_card_1000_2", - "input_config": "test_data/configs/experiments/qot_15m_300s_card_1000_2.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_15m_60s_card_1000_2", - "input_config": "test_data/configs/experiments/qot_15m_60s_card_1000_2.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_15m_900s_card_1000", - "input_config": "test_data/configs/experiments/qot_15m_900s_card_1000.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_1m_1_card_2_0", - "input_config": "test_data/configs/experiments/qot_1m_1_card_2_0.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_30m_15m", - "input_config": "test_data/configs/experiments/qot_30m_15m.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_30m_1_card_2_0", - "input_config": "test_data/configs/experiments/qot_30m_1_card_2_0.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_30m_1m", - "input_config": "test_data/configs/experiments/qot_30m_1m.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_30m_30m", - "input_config": "test_data/configs/experiments/qot_30m_30m.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_30m_5m", - "input_config": "test_data/configs/experiments/qot_30m_5m.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "qot_60m_1_card_2_0", - "input_config": "test_data/configs/experiments/qot_60m_1_card_2_0.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_1s_10queries_100valuesperlabel_2labels", - "input_config": "test_data/configs/experiments/quantile_1s_10queries_100valuesperlabel_2labels.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_1s_10queries_10valuesperlabel", - "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_1s_10queries_10valuesperlabel_3_K_10", - "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_10.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_1s_10queries_10valuesperlabel_3_K_100", - "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_100.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_1s_10queries_10valuesperlabel_3_K_20", - "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_20.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_1s_10queries_10valuesperlabel_3_K_50", - "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_50.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "quantile_1s_10queries_10valuesperlabel_3_K_8", - "input_config": "test_data/configs/experiments/quantile_1s_10queries_10valuesperlabel_3_K_8.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "result_1_collapsable_1", - "input_config": "test_data/configs/experiments/result_1_collapsable_1.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "result_1_collapsable_3", - "input_config": "test_data/configs/experiments/result_1_collapsable_3.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - }, - { - "id": "result_1_non_quantile_1s_1_vm", - "input_config": "test_data/configs/experiments/result_1_non_quantile_1s_1_vm.yaml", - "prometheus_scrape_interval": 15, - "streaming_engine": "arroyo", - "enable_punting": false, - "range_duration": 0, - "step": 0 - } - ] -} diff --git a/asap-planner/.gitignore b/asap-planner/.gitignore deleted file mode 100644 index 35758b0..0000000 --- a/asap-planner/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -**/*.pyc -**/__pycache__ -.DS_store diff --git a/asap-planner/Dockerfile b/asap-planner/Dockerfile deleted file mode 100644 index 7b66ff0..0000000 --- a/asap-planner/Dockerfile +++ /dev/null @@ -1,24 +0,0 @@ -FROM sketchdb-base:latest - -LABEL maintainer="SketchDB Team" -LABEL description="Main Controller for SketchDB" - -# Set working directory -WORKDIR /app - -# Copy requirements first for better layer caching -COPY requirements.txt . - -# Install Python dependencies -RUN pip install --no-cache-dir -r requirements.txt - -# Copy application code -COPY classes/ ./classes/ -COPY utils/ ./utils/ -COPY main_controller.py . - -# Create directories for input/output -RUN mkdir -p /app/input /app/output - -# Set the entry point -ENTRYPOINT ["python", "main_controller.py"] \ No newline at end of file diff --git a/asap-planner/LICENSE b/asap-planner/LICENSE deleted file mode 100644 index 404d657..0000000 --- a/asap-planner/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2025 SketchDB - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/asap-planner/README.md b/asap-planner/README.md deleted file mode 100644 index e3fc3d0..0000000 --- a/asap-planner/README.md +++ /dev/null @@ -1,124 +0,0 @@ -# Controller - -The Controller is ASAP's auto-configuration service that determines optimal sketch parameters based on query workload and SLAs. - -## Purpose - -Given a workload of PromQL queries, the Controller: -1. Analyzes each query to determine which sketch algorithm to use -2. Computes sketch parameters (size, accuracy) based on SLAs -3. Generates `streaming_config.yaml` for asap-sketch-ingest -4. Generates `inference_config.yaml` for QueryEngine - -This automation eliminates manual configuration and ensures sketches meet performance targets. - -## How It Works - -### Input: controller-config.yaml - -The user provides a configuration file describing: -- **Queries** to accelerate -- **Metrics** metadata (labels, cardinality estimates) -- **SLAs** (accuracy, latency targets) (**CURRENTLY IGNORED**) - -**Example:** -```yaml -query_groups: - - id: 1 - queries: - - "quantile by (job) (0.99, http_request_duration_seconds)" - - "sum by (job) (rate(http_requests_total[5m]))" - client_options: - repetitions: 10 - starting_delay: 60 - controller_options: - accuracy_sla: 0.99 # 99% accuracy - latency_sla: 1.0 # 1 second max latency - -metrics: - - metric: "http_request_duration_seconds" - labels: ["job", "instance", "method", "status"] - cardinality: - job: 10 - instance: 100 - method: 5 - status: 4 - - metric: "http_requests_total" - labels: ["job", "instance", "method", "status"] -``` - -### Process: Analyze and Configure - -1. **Parse queries** (`utils/parse_query.py`) - - Extract query type (quantile, sum, avg, etc.) - - Identify aggregation labels - - Determine time range - -2. **Select sketch algorithm** (`utils/logics.py::decide_sketch_type()`) - - Quantile queries → DDSketch or KLL - - Sum/count queries → Simple aggregation - - Consider query patterns and SLAs - -3. **Compute sketch parameters** (`utils/logics.py`) - - Calculate sketch size based on accuracy SLA - - Determine merge strategy for aggregations - - Set up windowing parameters - -4. **Generate configs** - - `streaming_config.yaml` → Describes which sketches to build - - `inference_config.yaml` → Describes how to query sketches - -### Output Files - -**streaming_config.yaml** (for asap-sketch-ingest): -```yaml -sketches: - - metric: "http_request_duration_seconds" - sketch_type: "ddsketch" - parameters: - alpha: 0.01 # 1% relative error - max_num_bins: 2048 - aggregation: - - "job" - window: "1h" -``` - -**inference_config.yaml** (for QueryEngine): -```yaml -sketches: - - metric: "http_request_duration_seconds" - sketch_type: "ddsketch" - labels: ["job"] - kafka_topic: "sketches" -``` - -## Key Files - -**TODO** - -## Configuration Schema - -### controller-config.yaml - -```yaml -query_groups: - - id: # Unique group ID - queries: # List of PromQL queries - - "" - client_options: # Query execution options - repetitions: # How many times to run - starting_delay: # Delay before first run (seconds) - repetition_delay: # Delay between runs (seconds) - query_time_offset: # Time offset for queries (seconds) - controller_options: - accuracy_sla: # 0.0-1.0 (default: 0.99) - latency_sla: # Seconds (default: 1.0) - sketch_type: # Optional: force specific sketch - custom_sketch_params: # Optional: override params - -metrics: - - metric: "" # Prometheus metric name - labels: [] # List of label names - cardinality: # Optional: estimated cardinalities - : -``` diff --git a/asap-planner/classes/SingleQueryConfig.py b/asap-planner/classes/SingleQueryConfig.py deleted file mode 100644 index 5638f10..0000000 --- a/asap-planner/classes/SingleQueryConfig.py +++ /dev/null @@ -1,544 +0,0 @@ -import copy -from loguru import logger - -import promql_parser -from typing import Optional, Tuple, List - -from promql_utilities.ast_matching.PromQLPattern import PromQLPattern, MatchResult -from promql_utilities.ast_matching.PromQLPatternBuilder import PromQLPatternBuilder -from promql_utilities.query_logics.enums import ( - QueryPatternType, - QueryTreatmentType, - CleanupPolicy, -) -from promql_utilities.query_logics.logics import ( - get_is_collapsable, - map_statistic_to_precompute_operator, -) -from promql_utilities.query_logics.parsing import ( - get_metric_and_spatial_filter, - get_statistics_to_compute, -) -from promql_utilities.query_logics.parsing import get_spatial_aggregation_output_labels -from promql_utilities.data_model.KeyByLabelNames import KeyByLabelNames - -from promql_utilities.streaming_config.StreamingAggregationConfig import ( - StreamingAggregationConfig, -) -from utils import logics - -# import utils.promql - -from promql_utilities.streaming_config.MetricConfig import MetricConfig - - -class SingleQueryConfig: - def __init__( - self, - config: dict, - metric_config: MetricConfig, - prometheus_scrape_interval: int, - streaming_engine: str, - sketch_parameters: dict, - ): - self.config = config - self.query = config["query"] - self.query_ast = promql_parser.parse(self.query) - self.t_repeat = int(config["t_repeat"]) - self.prometheus_scrape_interval = prometheus_scrape_interval - self.__dict__.update(config["options"]) - # self.accuracy_sla = float(config["accuracy_sla"]) - # self.latency_sla = float(config["latency_sla"]) - self.metric_config = metric_config - self.streaming_engine = streaming_engine - self.sketch_parameters = sketch_parameters - self.range_duration = config["range_duration"] - self.step = config["step"] - - self.patterns = { - QueryPatternType.ONLY_TEMPORAL: [ - PromQLPattern( - PromQLPatternBuilder.function( - "quantile_over_time", - PromQLPatternBuilder.number(), - PromQLPatternBuilder.matrix_selector( - PromQLPatternBuilder.metric(collect_as="metric"), - collect_as="range_vector", - ), - collect_as="function", - collect_args_as="function_args", - ) - ), - PromQLPattern( - PromQLPatternBuilder.function( - [ - "sum_over_time", - "count_over_time", - "avg_over_time", - "min_over_time", - "max_over_time", - # "stddev_over_time", - # "stdvar_over_time", - "increase", - "rate", - ], - PromQLPatternBuilder.matrix_selector( - PromQLPatternBuilder.metric(collect_as="metric"), - collect_as="range_vector", - ), - collect_as="function", - collect_args_as="function_args", - ) - ), - ], - # TODO: add topk/bottomk - QueryPatternType.ONLY_SPATIAL: [ - PromQLPattern( - PromQLPatternBuilder.aggregation( - [ - "sum", - "count", - "avg", - "quantile", - "min", - "max", - "topk", - # "stddev", - # "stdvar", - ], - PromQLPatternBuilder.metric(collect_as="metric"), - collect_as="aggregation", - ) - ) - ], - # TODO: need some way of specifying pattern using an existing pattern - QueryPatternType.ONE_TEMPORAL_ONE_SPATIAL: [ - PromQLPattern( - PromQLPatternBuilder.aggregation( - [ - "sum", - "count", - "avg", - "quantile", - "min", - "max", - # "stddev", - # "stdvar", - ], - PromQLPatternBuilder.function( - "quantile_over_time", - PromQLPatternBuilder.number(), - PromQLPatternBuilder.matrix_selector( - PromQLPatternBuilder.metric(collect_as="metric"), - collect_as="range_vector", - ), - collect_as="function", - collect_args_as="function_args", - ), - collect_as="aggregation", - ) - ), - PromQLPattern( - PromQLPatternBuilder.aggregation( - [ - "sum", - "count", - "avg", - "quantile", - "min", - "max", - # "stddev", - # "stdvar", - ], - PromQLPatternBuilder.function( - [ - "sum_over_time", - "count_over_time", - "avg_over_time", - "min_over_time", - "max_over_time", - # "stddev_over_time", - # "stdvar_over_time", - "increase", - "rate", - ], - PromQLPatternBuilder.matrix_selector( - PromQLPatternBuilder.metric(collect_as="metric"), - collect_as="range_vector", - ), - collect_as="function", - collect_args_as="function_args", - ), - collect_as="aggregation", - ) - ), - ], - } - - self.query_pattern_type = None - self.query_pattern_match = None - self.query_treatment_type = None - - self.process_query() - - def process_query(self): - query_pattern_type, match = self.match_query_pattern() - - if query_pattern_type and match: - self.query_pattern_type = query_pattern_type - self.query_pattern_match = match - self.query_treatment_type = self.get_query_treatment_type() - logger.debug("Query treatment type: {}", self.query_treatment_type) - else: - # self.logger.warning("Query pattern not supported: %s", self.query) - logger.warning("Query pattern not supported: {}", self.query) - - def should_be_performant(self) -> bool: - if self.query_pattern_type == QueryPatternType.ONLY_TEMPORAL: - # Check quantile_over_time, rate, increase - # Calculate number of data points per key - function_name = self.query_pattern_match.tokens["function"]["name"] - if function_name in ["rate", "increase", "quantile_over_time"]: - num_data_points_per_tumbling_window = ( - self.t_repeat / self.prometheus_scrape_interval - ) - range_duration = int( - self.query_pattern_match.tokens["range_vector"][ - "range" - ].total_seconds() - ) - if num_data_points_per_tumbling_window < 60: - logger.info( - "[Performance Check Failed] num_data_points_per_tumbling_window {} < 60", - num_data_points_per_tumbling_window, - ) - return False - # bound time for merging for quantile_over_time - if function_name == "quantile_over_time": - if range_duration / self.t_repeat > 15: - logger.info( - "[Performance Check Failed] range_duration / t_repeat {} > 15", - range_duration / self.t_repeat, - ) - return False - return True - elif self.query_pattern_type == QueryPatternType.ONLY_SPATIAL: - return True - elif self.query_pattern_type == QueryPatternType.ONE_TEMPORAL_ONE_SPATIAL: - # TODO: might need to add checks here - return True - else: - return True - - def is_supported(self) -> bool: - return ( - self.query_pattern_type is not None and self.query_pattern_match is not None - ) - - def match_query_pattern( - self, - ) -> Tuple[Optional[QueryPatternType], Optional[MatchResult]]: - for pattern_type, patterns in self.patterns.items(): - for pattern in patterns: - match = pattern.matches(self.query_ast, debug=False) - if match: - logger.debug("Matched pattern: {}", pattern_type) - return pattern_type, match - return None, None - - def get_query_treatment_type(self): - assert self.query_pattern_type and self.query_pattern_match - - if ( - self.query_pattern_type == QueryPatternType.ONLY_TEMPORAL - or self.query_pattern_type == QueryPatternType.ONE_TEMPORAL_ONE_SPATIAL - ): - if self.query_pattern_match.tokens["function"]["name"] in [ - "quantile_over_time", - "sum_over_time", - "count_over_time", - "avg_over_time", - ]: - return QueryTreatmentType.APPROXIMATE - else: - return QueryTreatmentType.EXACT - elif self.query_pattern_type == QueryPatternType.ONLY_SPATIAL: - if self.query_pattern_match.tokens["aggregation"]["op"] in [ - "quantile", - "sum", - "count", - "avg", - "topk", - ]: - return QueryTreatmentType.APPROXIMATE - else: - return QueryTreatmentType.EXACT - else: - raise ValueError("Invalid query pattern type") - - def get_streaming_aggregation_configs( - self, - ) -> Tuple[List[StreamingAggregationConfig], int]: - assert ( - self.query_pattern_type - and self.query_pattern_match - and self.query_treatment_type - ) - - template_config = StreamingAggregationConfig() - template_config.aggregationId = -1 - # template_config.metric = self.query_pattern_match.tokens["metric"]["name"] - - # setting spatial filter - # if self.query_pattern_match.tokens["metric"]["labels"].matchers: - # template_config.spatialFilter = ( - # self.query_pattern_match.tokens["metric"]["ast"] - # .prettify() - # .split("{")[1] - # .split("}")[0] - # ) - # template_config.metric = template_config.metric.split("{")[0] - # else: - # template_config.spatialFilter = "" - - template_config.metric, template_config.spatialFilter = ( - get_metric_and_spatial_filter(self.query_pattern_match) - ) - - statistics_to_compute = get_statistics_to_compute( - self.query_pattern_type, self.query_pattern_match - ) - - # if ( - # self.query_pattern_type == QueryPatternType.ONLY_TEMPORAL - # or self.query_pattern_type == QueryPatternType.ONE_TEMPORAL_ONE_SPATIAL - # ): - # statistic_to_compute = self.query_pattern_match.tokens["function"][ - # "name" - # ].split("_")[0] - # template_config.tumblingWindowSize = self.t_repeat - # elif self.query_pattern_type == QueryPatternType.ONLY_SPATIAL: - # statistic_to_compute = self.query_pattern_match.tokens["aggregation"]["op"] - # template_config.tumblingWindowSize = self.prometheus_scrape_interval - # else: - # raise ValueError("Invalid query pattern type") - - configs = [] - - for statistic_to_compute in statistics_to_compute: - - aggregation_type, aggregation_sub_type = ( - map_statistic_to_precompute_operator( - statistic_to_compute, self.query_treatment_type - ) - ) - - # NEW: Set window parameters (auto-decides sliding vs tumbling based on query type) - # Issue #236: Sliding windows for ONLY_TEMPORAL queries (except DeltaSetAggregator) - # Issue #329: For range queries, use min(t_repeat, step) as effective repeat interval - logics.set_window_parameters( - self.query_pattern_type, - self.query_pattern_match, - self.t_repeat, - self.prometheus_scrape_interval, - aggregation_type, - template_config, - self.step, - ) - - # for aggregation_type, aggregation_sub_type in list_of_precompute_operators: - - all_labels = self.metric_config.config[template_config.metric] - - if self.query_pattern_type == QueryPatternType.ONLY_TEMPORAL: - template_config.labels["rollup"] = KeyByLabelNames([]) - - logics.set_subpopulation_labels( - statistic_to_compute, aggregation_type, all_labels, template_config - ) - - # if logics.does_precompute_operator_support_subpopulations( - # statistic_to_compute, aggregation_type - # ): - # template_config.labels["grouping"] = KeyByLabelNames([]) - # template_config.labels["aggregated"] = copy.deepcopy( - # self.metric_config.config[template_config.metric] - # ) - # else: - # template_config.labels["grouping"] = copy.deepcopy( - # self.metric_config.config[template_config.metric] - # ) - # template_config.labels["aggregated"] = KeyByLabelNames([]) - - elif self.query_pattern_type == QueryPatternType.ONLY_SPATIAL: - # aggregation_modifier = self.query_pattern_match.tokens["aggregation"][ - # "modifier" - # ] - # aggregation_modifier_labels = None - # if aggregation_modifier.type == aggregation_modifier.type.By: - # aggregation_modifier_labels = KeyByLabelNames( - # aggregation_modifier.labels - # ) - # elif aggregation_modifier.type == aggregation_modifier.type.Without: - # aggregation_modifier_labels = self.metric_config.config[ - # template_config.metric - # ] - KeyByLabelNames(aggregation_modifier.labels) - # else: - # raise ValueError("Invalid aggregation modifier") - - spatial_aggregation_output_labels = ( - get_spatial_aggregation_output_labels( - self.query_pattern_match, all_labels - ) - ) - - template_config.labels["rollup"] = ( - all_labels - spatial_aggregation_output_labels - ) - - logics.set_subpopulation_labels( - statistic_to_compute, - aggregation_type, - spatial_aggregation_output_labels, - template_config, - ) - - # if logics.does_precompute_operator_support_subpopulations( - # statistic_to_compute, aggregation_type - # ): - # template_config.labels["aggregated"] = copy.deepcopy( - # aggregation_modifier_labels - # ) - # template_config.labels["grouping"] = KeyByLabelNames([]) - # else: - # template_config.labels["aggregated"] = KeyByLabelNames([]) - # template_config.labels["grouping"] = copy.deepcopy( - # aggregation_modifier_labels - # ) - - elif self.query_pattern_type == QueryPatternType.ONE_TEMPORAL_ONE_SPATIAL: - collapsable = get_is_collapsable( - self.query_pattern_match.tokens["function"]["name"], - self.query_pattern_match.tokens["aggregation"]["op"], - ) - - if not collapsable: - template_config.labels["rollup"] = KeyByLabelNames([]) - - logics.set_subpopulation_labels( - statistic_to_compute, - aggregation_type, - all_labels, - template_config, - ) - - # if logics.does_precompute_operator_support_subpopulations( - # statistic_to_compute, aggregation_type - # ): - # template_config.labels["grouping"] = KeyByLabelNames([]) - # template_config.labels["aggregated"] = copy.deepcopy( - # self.metric_config.config[template_config.metric] - # ) - # else: - # template_config.labels["grouping"] = copy.deepcopy( - # self.metric_config.config[template_config.metric] - # ) - # template_config.labels["aggregated"] = KeyByLabelNames([]) - else: - # aggregation_modifier = self.query_pattern_match.tokens[ - # "aggregation" - # ]["modifier"] - # aggregation_modifier_labels = None - # if aggregation_modifier.type == aggregation_modifier.type.By: - # aggregation_modifier_labels = KeyByLabelNames( - # aggregation_modifier.labels - # ) - # elif aggregation_modifier.type == aggregation_modifier.type.Without: - # aggregation_modifier_labels = self.metric_config.config[ - # template_config.metric - # ] - KeyByLabelNames(aggregation_modifier.labels) - # else: - # raise ValueError("Invalid aggregation modifier") - - spatial_aggregation_output_labels = ( - get_spatial_aggregation_output_labels( - self.query_pattern_match, all_labels - ) - ) - - template_config.labels["rollup"] = ( - all_labels - spatial_aggregation_output_labels - ) - - logics.set_subpopulation_labels( - statistic_to_compute, - aggregation_type, - spatial_aggregation_output_labels, - template_config, - ) - - # if logics.does_precompute_operator_support_subpopulations( - # statistic_to_compute, aggregation_type - # ): - # template_config.labels["aggregated"] = copy.deepcopy( - # aggregation_modifier_labels - # ) - # template_config.labels["grouping"] = KeyByLabelNames([]) - # else: - # template_config.labels["aggregated"] = KeyByLabelNames([]) - # template_config.labels["grouping"] = copy.deepcopy( - # aggregation_modifier_labels - # ) - - config = copy.deepcopy(template_config) - config.aggregationType = aggregation_type - config.aggregationSubType = aggregation_sub_type - config.parameters = logics.get_precompute_operator_parameters( - aggregation_type, - aggregation_sub_type, - self.query_pattern_match, - self.sketch_parameters, - ) - - # TODO: remove this hardcoding once promql_utilities.query_logics has updated logic - # https://github.com/SketchDB/Utilities/issues/44 - if aggregation_type in ["CountMinSketch", "HydraKLL"]: - # add another precompute operator for DeltaSetAggregator - delta_set_config = copy.deepcopy(template_config) - if ( - self.streaming_engine == "flink" - or self.streaming_engine == "arroyo" - ): - delta_set_config.aggregationType = "DeltaSetAggregator" - else: - raise ValueError( - f"Unsupported streaming engine: {self.streaming_engine}" - ) - delta_set_config.aggregationSubType = "" - delta_set_config.parameters = logics.get_precompute_operator_parameters( - delta_set_config.aggregationType, - delta_set_config.aggregationSubType, - self.query_pattern_match, - self.sketch_parameters, - ) - configs.append(delta_set_config) - configs.append(config) - - # Calculate cleanup parameter based on cleanup policy and window type - # This must be done AFTER set_window_parameters() has been called - cleanup_policy = self.config["cleanup_policy"] - if cleanup_policy == CleanupPolicy.NO_CLEANUP: - logger.info("Cleanup policy is NO_CLEANUP - cleanup_param will be None") - cleanup_param = None - else: - cleanup_param = logics.get_cleanup_param( - cleanup_policy=cleanup_policy, - query_pattern_type=self.query_pattern_type, - query_pattern_match=self.query_pattern_match, - t_repeat=self.t_repeat, - window_type=template_config.windowType, - range_duration=self.range_duration, - step=self.step, - ) - - return configs, cleanup_param diff --git a/asap-planner/classes/WorkloadConfig.py b/asap-planner/classes/WorkloadConfig.py deleted file mode 100644 index c77b3fe..0000000 --- a/asap-planner/classes/WorkloadConfig.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import List - -from classes.SingleQueryConfig import SingleQueryConfig - - -class WorkloadConfig: - def __init__(self, singe_query_configs: List[SingleQueryConfig]): - pass - - def remove_common_subexpressions(self): - pass - - def get_streaming_config(self): - pass - - def get_estimation_config(self): - pass diff --git a/asap-planner/docker-compose.yml.j2 b/asap-planner/docker-compose.yml.j2 deleted file mode 100644 index d36368f..0000000 --- a/asap-planner/docker-compose.yml.j2 +++ /dev/null @@ -1,15 +0,0 @@ -services: - controller: - image: sketchdb-controller:latest - container_name: {{ container_name }} - volumes: - - {{ input_config_path }}:/app/input/config.yaml:ro - - {{ output_dir }}:/app/output - command: [ - "--input_config", "/app/input/config.yaml", - "--output_dir", "/app/output", - "--prometheus_scrape_interval", "{{ prometheus_scrape_interval }}", - "--streaming_engine", "{{ streaming_engine }}"{% if punting %}, - "--enable-punting"{% endif %} - ] - restart: no diff --git a/asap-planner/installation/install.sh b/asap-planner/installation/install.sh deleted file mode 100755 index 7fe93cd..0000000 --- a/asap-planner/installation/install.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash - -set -e - -THIS_DIR=$(dirname "$(readlink -f "${BASH_SOURCE[0]}")") -PARENT_DIR=$(dirname "$THIS_DIR") - -echo "Building Controller Docker image..." -cd "$PARENT_DIR" -docker build . -f Dockerfile -t sketchdb-controller:latest - -echo "Controller Docker image built successfully: sketchdb-controller:latest" diff --git a/asap-planner/main_controller.py b/asap-planner/main_controller.py deleted file mode 100644 index 471697b..0000000 --- a/asap-planner/main_controller.py +++ /dev/null @@ -1,173 +0,0 @@ -import os -import yaml -import argparse -from loguru import logger - -from classes.SingleQueryConfig import SingleQueryConfig -from promql_utilities.streaming_config.MetricConfig import MetricConfig -from promql_utilities.query_logics.enums import CleanupPolicy - - -def read_config(config_path) -> dict: - config_yaml = None - with open(config_path, "r") as f: - config_yaml = yaml.safe_load(f) - return config_yaml - - -def validate_config(config_yaml): - # NOTE: only allow unique query strings for now - query_strings = set() - for query_group_yaml in config_yaml["query_groups"]: - for query_string in query_group_yaml["queries"]: - if query_string in query_strings: - raise ValueError(f"Duplicate query string: {query_string}") - query_strings.add(query_string) - - -def main(args): - input_config_yaml = read_config(args.input_config) - - validate_config(input_config_yaml) - - metric_config = MetricConfig.from_list(input_config_yaml["metrics"]) - - # Read cleanup policy configuration (default to READ_BASED if not specified) - cleanup_policy_str = input_config_yaml.get("aggregate_cleanup", {}).get( - "policy", "read_based" - ) - try: - cleanup_policy = CleanupPolicy(cleanup_policy_str) - except ValueError: - valid_policies = [p.value for p in CleanupPolicy] - raise ValueError( - f"Invalid cleanup policy: '{cleanup_policy_str}'. " - f"Valid options: {valid_policies}" - ) - logger.info("Cleanup policy: {}", cleanup_policy.value) - - # Read sketch parameters configuration (use None to apply defaults in logics.py) - sketch_parameters = input_config_yaml.get("sketch_parameters", None) - if sketch_parameters: - logger.info("Using custom sketch parameters: {}", sketch_parameters) - else: - logger.info("Using default sketch parameters") - - streaming_aggregation_configs_map = {} - query_aggregation_config_keys_map = {} - - for query_group_yaml in input_config_yaml["query_groups"]: - for query_string in query_group_yaml["queries"]: - single_query_config_yaml = { - "query": query_string, - "t_repeat": query_group_yaml["repetition_delay"], - "options": query_group_yaml["controller_options"], - "cleanup_policy": cleanup_policy, - "range_duration": args.range_duration, - "step": args.step, - } - - logger.debug("Processing query {}", query_string) - - single_query_config = SingleQueryConfig( - single_query_config_yaml, - metric_config, - args.prometheus_scrape_interval, - args.streaming_engine, - sketch_parameters, - ) - - should_process_query = single_query_config.is_supported() - if args.enable_punting: - should_process_query = ( - should_process_query and single_query_config.should_be_performant() - ) - - if should_process_query: - query_aggregation_config_keys_map[single_query_config.query] = [] - current_configs, num_aggregates_to_retain = ( - single_query_config.get_streaming_aggregation_configs() - ) - - for current_config in current_configs: - key = current_config.get_identifying_key() - query_aggregation_config_keys_map[single_query_config.query].append( - (key, num_aggregates_to_retain) - ) - if key not in streaming_aggregation_configs_map: - streaming_aggregation_configs_map[key] = current_config - else: - logger.warning("Unsupported query") - - for idx, k in enumerate(streaming_aggregation_configs_map.keys()): - streaming_aggregation_configs_map[k].aggregationId = idx + 1 - - streaming_config = { - "aggregations": [ - config.to_dict(metric_config, "promql") - for config in streaming_aggregation_configs_map.values() - ], - "metrics": metric_config.config, - } - inference_config = { - "cleanup_policy": {"name": cleanup_policy.value}, - "queries": [], - "metrics": metric_config.config, - } - for query, streaming_config_keys in query_aggregation_config_keys_map.items(): - inference_config["queries"].append({"query": query, "aggregations": []}) - for streaming_config_key in streaming_config_keys: - aggregation_entry = { - "aggregation_id": streaming_aggregation_configs_map[ - streaming_config_key[0] - ].aggregationId, - } - # Add the appropriate parameter based on cleanup policy - cleanup_value = streaming_config_key[1] - if ( - cleanup_policy == CleanupPolicy.CIRCULAR_BUFFER - and cleanup_value is not None - ): - aggregation_entry["num_aggregates_to_retain"] = cleanup_value - elif ( - cleanup_policy == CleanupPolicy.READ_BASED and cleanup_value is not None - ): - aggregation_entry["read_count_threshold"] = cleanup_value - # For NO_CLEANUP, we don't add any parameter - inference_config["queries"][-1]["aggregations"].append(aggregation_entry) - - os.makedirs(args.output_dir, exist_ok=True) - with open(f"{args.output_dir}/streaming_config.yaml", "w") as f: - f.write(yaml.dump(streaming_config)) - - with open(f"{args.output_dir}/inference_config.yaml", "w") as f: - f.write(yaml.dump(inference_config)) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--input_config", type=str, required=True) - parser.add_argument("--output_dir", type=str, required=True) - parser.add_argument("--prometheus_scrape_interval", type=int, required=True) - parser.add_argument( - "--streaming_engine", type=str, choices=["flink", "arroyo"], required=True - ) - parser.add_argument( - "--enable-punting", - action="store_true", - help="Enable query punting based on performance heuristics", - ) - parser.add_argument( - "--range-duration", - type=int, - default=0, - help="Range query duration (end - start) in seconds. 0 for instant queries.", - ) - parser.add_argument( - "--step", - type=int, - default=0, - help="Range query step in seconds. Required if range-duration > 0.", - ) - args = parser.parse_args() - main(args) diff --git a/asap-planner/requirements.txt b/asap-planner/requirements.txt deleted file mode 100644 index ee973aa..0000000 --- a/asap-planner/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -loguru==0.7.3 -promql_parser==0.5.0 -PyYAML==6.0.2 diff --git a/asap-planner/utils/logics.py b/asap-planner/utils/logics.py deleted file mode 100644 index 31bb761..0000000 --- a/asap-planner/utils/logics.py +++ /dev/null @@ -1,396 +0,0 @@ -import copy -import math -from loguru import logger - -from promql_utilities.data_model.KeyByLabelNames import KeyByLabelNames -from promql_utilities.query_logics.enums import QueryPatternType, CleanupPolicy -from promql_utilities.ast_matching.PromQLPattern import MatchResult -from promql_utilities.query_logics.logics import ( - does_precompute_operator_support_subpopulations, -) - -CMS_WITH_HEAP_MULT = 4 - -# Default sketch parameters for backward compatibility -DEFAULT_SKETCH_PARAMETERS = { - "CountMinSketch": {"depth": 3, "width": 1024}, - "CountMinSketchWithHeap": {"depth": 3, "width": 1024, "heap_multiplier": 4}, - "DatasketchesKLL": {"K": 20}, - "HydraKLL": {"row_num": 3, "col_num": 1024, "k": 20}, -} - - -def get_effective_repeat(t_repeat: int, step: int) -> int: - """ - Calculate effective repeat interval for range queries. - - For range queries (step > 0), use the smaller of t_repeat and step to ensure - we produce aggregates frequently enough to support the query step size. - For instant queries (step = 0), use t_repeat. - """ - return min(t_repeat, step) if step > 0 else t_repeat - - -# TODO: -# We only show the logic of `get_precompute_operator_parameters` here. -# Semantics for topk query will be added in later PRs. -def get_precompute_operator_parameters( - aggregation_type: str, - aggregation_sub_type: str, - query_pattern_match: MatchResult, - sketch_parameters: dict, -) -> dict: - # Allow partial overrides: use provided parameters, fall back to defaults per sketch type - if sketch_parameters is None: - sketch_parameters = {} - - if aggregation_type in [ - "Increase", - "MinMax", - "Sum", - "MultipleIncrease", - "MultipleMinMax", - "MultipleSum", - "DeltaSetAggregator", - "SetAggregator", - ]: - return {} - elif aggregation_type == "CountMinSketch": - params = sketch_parameters.get( - "CountMinSketch", DEFAULT_SKETCH_PARAMETERS["CountMinSketch"] - ) - return {"depth": params["depth"], "width": params["width"]} - elif aggregation_type == "CountMinSketchWithHeap": - if aggregation_sub_type == "topk": - if "aggregation" not in query_pattern_match.tokens: - raise ValueError( - f"{aggregation_sub_type} query missing aggregator in the match tokens" - ) - if "param" not in query_pattern_match.tokens["aggregation"]: - raise ValueError( - f"{aggregation_sub_type} query missing required 'k' parameter" - ) - k = int(query_pattern_match.tokens["aggregation"]["param"].val) - params = sketch_parameters.get( - "CountMinSketchWithHeap", - DEFAULT_SKETCH_PARAMETERS["CountMinSketchWithHeap"], - ) - heap_mult = params.get("heap_multiplier", CMS_WITH_HEAP_MULT) - return { - "depth": params["depth"], - "width": params["width"], - "heapsize": k * heap_mult, - } - else: - raise ValueError( - f"Aggregation sub-type {aggregation_sub_type} for CountMinSketchWithHeap not supported" - ) - elif aggregation_type == "DatasketchesKLL": - params = sketch_parameters.get( - "DatasketchesKLL", DEFAULT_SKETCH_PARAMETERS["DatasketchesKLL"] - ) - return {"K": params["K"]} - elif aggregation_type == "HydraKLL": - params = sketch_parameters.get( - "HydraKLL", DEFAULT_SKETCH_PARAMETERS["HydraKLL"] - ) - return { - "row_num": params["row_num"], - "col_num": params["col_num"], - "k": params["k"], - } - # elif aggregation_type == "UnivMon": - # return {"depth": 3, "width": 2048, "levels": 16} - else: - raise NotImplementedError(f"Aggregation type {aggregation_type} not supported") - - -def get_cleanup_param( - cleanup_policy: CleanupPolicy, - query_pattern_type, - query_pattern_match, - t_repeat: int, - window_type: str, - range_duration: int, - step: int, -) -> int: - """ - Calculate cleanup parameter based on cleanup policy and range query params. - - Sliding windows (both policies): range_duration / step + 1 - Tumbling circular_buffer: (T_lookback + range_duration) / min(T_repeat, step) - Tumbling read_based: (T_lookback / min(T_repeat, step)) * (range_duration / step + 1) - - For ONLY_SPATIAL queries, T_lookback = T_repeat. - For instant queries, range_duration = 0 and effective_repeat = T_repeat. - - Args: - cleanup_policy: CleanupPolicy.CIRCULAR_BUFFER or CleanupPolicy.READ_BASED - query_pattern_type: QueryPatternType enum - query_pattern_match: MatchResult with query tokens - t_repeat: Query repeat interval in seconds - window_type: "sliding" or "tumbling" - range_duration: end - start in seconds (0 for instant queries) - step: Range query step in seconds (required if range_duration > 0) - - Raises: - ValueError: If exactly one of range_duration or step is zero - """ - # Validation: range_duration and step must both be zero (instant) or both non-zero (range) - if (range_duration == 0) != (step == 0): - raise ValueError( - f"range_duration and step must both be 0 (instant query) or both > 0 (range query). " - f"Got range_duration={range_duration}, step={step}" - ) - - is_range_query = step > 0 - - # For ONLY_SPATIAL, T_lookback = T_repeat - if query_pattern_type == QueryPatternType.ONLY_SPATIAL: - t_lookback = t_repeat - else: - t_lookback = int( - query_pattern_match.tokens["range_vector"]["range"].total_seconds() - ) - - # For sliding windows: range_duration / step + 1 (same for both policies) - if window_type == "sliding": - if is_range_query: - result = range_duration // step + 1 - else: - result = 1 # instant query - logger.debug( - f"Sliding window mode: cleanup_param = {result} " - f"(range_duration={range_duration}s, step={step}s)" - ) - return result - - # Tumbling window calculations - effective_repeat = get_effective_repeat(t_repeat, step) - - # We use ceiling division because even if the time span doesn't fully fill - # a bucket, we still need that bucket to cover the partial data. - # E.g., if T_lookback=10s and effective_repeat=100s, we still need 1 bucket. - if cleanup_policy == CleanupPolicy.CIRCULAR_BUFFER: - # ceil((T_lookback + range_duration) / effective_repeat) - result = math.ceil((t_lookback + range_duration) / effective_repeat) - elif cleanup_policy == CleanupPolicy.READ_BASED: - # ceil(T_lookback / effective_repeat) * (range_duration / step + 1) - lookback_buckets = math.ceil(t_lookback / effective_repeat) - if is_range_query: - num_steps = range_duration // step + 1 - else: - num_steps = 1 # instant query - result = lookback_buckets * num_steps - else: - raise ValueError(f"Invalid cleanup policy: {cleanup_policy}") - - logger.debug( - f"Tumbling window mode ({cleanup_policy.value}): cleanup_param = {result} " - f"(t_lookback={t_lookback}s, t_repeat={t_repeat}s, " - f"range_duration={range_duration}s, step={step}s)" - ) - return result - - -def should_use_sliding_window(query_pattern_type, aggregation_type): - """ - Decide if sliding windows should be used based on query type and aggregation type. - - For Issue #236: Use sliding windows for ALL ONLY_TEMPORAL queries except DeltaSetAggregator. - This eliminates merging overhead in QueryEngine at the cost of more computation in Arroyo. - - Args: - query_pattern_type: ONLY_TEMPORAL, ONLY_SPATIAL, or ONE_TEMPORAL_ONE_SPATIAL - aggregation_type: Type of aggregation (e.g., 'DatasketchesKLL', 'Sum', etc.) - - Returns: - bool: True if sliding windows should be used - """ - # NOTE: returning False since sliding window pipelines are causing arroyo to crash - return False - # Only use sliding for ONLY_TEMPORAL queries (not ONE_TEMPORAL_ONE_SPATIAL or ONLY_SPATIAL) - if query_pattern_type != QueryPatternType.ONLY_TEMPORAL: - logger.debug( - f"Query pattern {query_pattern_type} not eligible for sliding windows " - f"(only ONLY_TEMPORAL supported)" - ) - return False - - # Explicitly exclude DeltaSetAggregator (paired with CMS but needs tumbling) - if aggregation_type == "DeltaSetAggregator": - logger.debug("DeltaSetAggregator excluded from sliding windows") - return False - - # All other ONLY_TEMPORAL aggregations use sliding windows - logger.info( - f"Aggregation type '{aggregation_type}' with {query_pattern_type} -> SLIDING windows" - ) - return True - - -def set_window_parameters( - query_pattern_type, - query_pattern_match, - t_repeat, - prometheus_scrape_interval, - aggregation_type, - template_config, - step: int, -): - """ - Set window parameters for streaming aggregation config. - Auto-decides between sliding and tumbling windows based on query type and aggregation cost. - - For ONLY_TEMPORAL queries with expensive aggregations (KLL, CMS): - - Uses SLIDING windows: windowSize = range duration, slideInterval = effective_repeat - - This reduces QueryEngine latency by avoiding merges (Arroyo does more work upfront) - - For other queries: - - Uses TUMBLING windows: windowSize = slideInterval = effective_repeat - - This is the original behavior - - For range queries (step > 0), effective_repeat = min(t_repeat, step). - For instant queries (step = 0), effective_repeat = t_repeat. - - Args: - query_pattern_type: Pattern type (ONLY_TEMPORAL, ONLY_SPATIAL, ONE_TEMPORAL_ONE_SPATIAL) - query_pattern_match: Matched PromQL pattern containing query metadata - t_repeat: Query repeat interval in seconds - prometheus_scrape_interval: Scrape interval in seconds - aggregation_type: Type of aggregation operator - template_config: StreamingAggregationConfig to update - step: Range query step in seconds (0 for instant queries) - """ - # For range queries, use min(t_repeat, step) as the effective repeat interval - effective_repeat = get_effective_repeat(t_repeat, step) - - # Decide if we should use sliding windows - use_sliding_window = should_use_sliding_window(query_pattern_type, aggregation_type) - - if use_sliding_window: - # SLIDING WINDOW for ONLY_TEMPORAL queries with expensive aggregations - logger.info( - f"Configuring SLIDING WINDOW for {query_pattern_type} " - f"with {aggregation_type}" - ) - - if query_pattern_type == QueryPatternType.ONLY_TEMPORAL: - # Window size = range duration (e.g., 15m = 900s) - range_seconds = int( - query_pattern_match.tokens["range_vector"]["range"].total_seconds() - ) - - # Check if this is actually a tumbling window (windowSize == slideInterval) - if range_seconds == effective_repeat: - logger.info( - f"Detected windowSize == slideInterval ({range_seconds}s). " - f"Using tumbling window instead of sliding for efficiency." - ) - template_config.windowSize = effective_repeat - template_config.slideInterval = effective_repeat - template_config.windowType = "tumbling" - template_config.tumblingWindowSize = effective_repeat - else: - # True sliding window - template_config.windowSize = range_seconds - template_config.slideInterval = effective_repeat - template_config.windowType = "sliding" - - logger.info( - f"Sliding window params: windowSize={range_seconds}s, " - f"slideInterval={effective_repeat}s " - f"(each window has {range_seconds} seconds of data, slides every {effective_repeat}s)" - ) - - # Set deprecated field for backward compatibility - template_config.tumblingWindowSize = effective_repeat - else: - # This should never be reached due to should_use_sliding_window() check - assert False, ( - f"should_use_sliding_window returned True for {query_pattern_type}, " - f"but sliding windows only supported for ONLY_TEMPORAL" - ) - else: - # TUMBLING WINDOW (existing logic) - logger.info( - f"Configuring TUMBLING WINDOW for {query_pattern_type} " - f"with {aggregation_type}" - ) - _set_tumbling_window_parameters( - query_pattern_type, - effective_repeat, - prometheus_scrape_interval, - template_config, - ) - - -def _set_tumbling_window_parameters( - query_pattern_type, effective_repeat, prometheus_scrape_interval, template_config -): - """ - Set tumbling window parameters. - - Args: - query_pattern_type: Pattern type (ONLY_TEMPORAL, ONLY_SPATIAL, ONE_TEMPORAL_ONE_SPATIAL) - effective_repeat: Effective repeat interval (min(t_repeat, step) for range queries) - prometheus_scrape_interval: Scrape interval in seconds - template_config: StreamingAggregationConfig to update - """ - if ( - query_pattern_type == QueryPatternType.ONLY_TEMPORAL - or query_pattern_type == QueryPatternType.ONE_TEMPORAL_ONE_SPATIAL - ): - template_config.windowSize = effective_repeat - template_config.slideInterval = effective_repeat - template_config.windowType = "tumbling" - template_config.tumblingWindowSize = effective_repeat - - logger.debug( - f"Tumbling window params: windowSize={effective_repeat}s, slideInterval={effective_repeat}s" - ) - elif query_pattern_type == QueryPatternType.ONLY_SPATIAL: - template_config.windowSize = prometheus_scrape_interval - template_config.slideInterval = prometheus_scrape_interval - template_config.windowType = "tumbling" - template_config.tumblingWindowSize = prometheus_scrape_interval - - logger.debug( - f"Tumbling window params: windowSize={prometheus_scrape_interval}s, " - f"slideInterval={prometheus_scrape_interval}s" - ) - else: - raise ValueError("Invalid query pattern type") - - -# COMMENTED OUT - Original function kept for rollback -# Issue #236: Replaced with set_window_parameters() to support sliding windows -# -# def set_tumbling_window_size( -# query_pattern_type, t_repeat, prometheus_scrape_interval, template_config -# ): -# if ( -# query_pattern_type == QueryPatternType.ONLY_TEMPORAL -# or query_pattern_type == QueryPatternType.ONE_TEMPORAL_ONE_SPATIAL -# ): -# template_config.tumblingWindowSize = t_repeat -# elif query_pattern_type == QueryPatternType.ONLY_SPATIAL: -# template_config.tumblingWindowSize = prometheus_scrape_interval -# else: -# raise ValueError("Invalid query pattern type") - - -def set_subpopulation_labels( - statistic_to_compute, - aggregation_type, - subpopulation_labels: KeyByLabelNames, - template_config, -): - if does_precompute_operator_support_subpopulations( - statistic_to_compute, aggregation_type - ): - template_config.labels["grouping"] = KeyByLabelNames([]) - template_config.labels["aggregated"] = copy.deepcopy(subpopulation_labels) - else: - template_config.labels["grouping"] = copy.deepcopy(subpopulation_labels) - template_config.labels["aggregated"] = KeyByLabelNames([]) diff --git a/asap-planner/utils/test_logics.py b/asap-planner/utils/test_logics.py deleted file mode 100644 index b44c889..0000000 --- a/asap-planner/utils/test_logics.py +++ /dev/null @@ -1,268 +0,0 @@ -"""Unit tests for logics.py cleanup parameter calculations.""" - -import pytest -from datetime import timedelta -from unittest.mock import MagicMock - -from promql_utilities.query_logics.enums import QueryPatternType, CleanupPolicy -from logics import get_cleanup_param - - -def create_mock_match(range_seconds: int) -> MagicMock: - """Create a mock match result with the given range duration.""" - mock = MagicMock() - mock.tokens = {"range_vector": {"range": timedelta(seconds=range_seconds)}} - return mock - - -class TestGetCleanupParamValidation: - """Tests for validation logic in get_cleanup_param.""" - - def test_range_duration_without_step_raises_error(self): - """range_duration > 0 with step = 0 is invalid.""" - mock_match = create_mock_match(900) - with pytest.raises(ValueError, match="must both be 0.*or both > 0"): - get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=3600, - step=0, - ) - - def test_step_without_range_duration_raises_error(self): - """step > 0 with range_duration = 0 is invalid.""" - mock_match = create_mock_match(900) - with pytest.raises(ValueError, match="must both be 0.*or both > 0"): - get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=0, - step=60, - ) - - def test_instant_query_both_zero_is_valid(self): - """Instant queries: both range_duration=0 and step=0 is valid.""" - mock_match = create_mock_match(900) - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=0, - step=0, - ) - assert result == 30 # ceil(900 / 30) = 30 - - -class TestSlidingWindowCleanupParam: - """Tests for sliding window cleanup parameter calculations.""" - - def test_sliding_instant_query(self): - """Sliding window instant query returns 1.""" - mock_match = create_mock_match(900) - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="sliding", - range_duration=0, - step=0, - ) - assert result == 1 - - def test_sliding_range_query(self): - """Sliding window: range_duration / step + 1.""" - mock_match = create_mock_match(900) - # range_duration=3600, step=60 -> 3600/60 + 1 = 61 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="sliding", - range_duration=3600, - step=60, - ) - assert result == 61 - - def test_sliding_same_for_both_policies(self): - """Sliding windows use same formula for both policies.""" - mock_match = create_mock_match(900) - result_cb = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="sliding", - range_duration=3600, - step=60, - ) - result_rb = get_cleanup_param( - cleanup_policy=CleanupPolicy.READ_BASED, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="sliding", - range_duration=3600, - step=60, - ) - assert result_cb == result_rb == 61 - - -class TestTumblingCircularBufferCleanupParam: - """Tests for tumbling window + circular_buffer cleanup parameter.""" - - def test_instant_query(self): - """Instant query: T_lookback / T_repeat.""" - mock_match = create_mock_match(900) # 15 minutes - # T_lookback=900, T_repeat=30 -> 900/30 = 30 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=0, - step=0, - ) - assert result == 30 - - def test_range_query(self): - """Range query: (T_lookback + range_duration) / min(T_repeat, step).""" - mock_match = create_mock_match(900) # 15 minutes - # T_lookback=900, range_duration=3600, T_repeat=30, step=60 - # effective_repeat = min(30, 60) = 30 - # (900 + 3600) / 30 = 150 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=3600, - step=60, - ) - assert result == 150 - - def test_step_smaller_than_t_repeat(self): - """When step < T_repeat, use step as effective_repeat.""" - mock_match = create_mock_match(900) - # T_lookback=900, range_duration=3600, T_repeat=60, step=30 - # effective_repeat = min(60, 30) = 30 - # (900 + 3600) / 30 = 150 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=60, - window_type="tumbling", - range_duration=3600, - step=30, - ) - assert result == 150 - - -class TestTumblingReadBasedCleanupParam: - """Tests for tumbling window + read_based cleanup parameter.""" - - def test_instant_query(self): - """Instant query: (T_lookback / T_repeat) * 1.""" - mock_match = create_mock_match(900) # 15 minutes - # T_lookback=900, T_repeat=30 -> (900/30) * 1 = 30 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.READ_BASED, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=0, - step=0, - ) - assert result == 30 - - def test_range_query(self): - """Range query: (T_lookback / min(T_repeat, step)) * (range_duration / step + 1).""" - mock_match = create_mock_match(900) # 15 minutes - # T_lookback=900, range_duration=3600, T_repeat=30, step=60 - # effective_repeat = min(30, 60) = 30 - # lookback_buckets = 900 / 30 = 30 - # num_steps = 3600 / 60 + 1 = 61 - # result = 30 * 61 = 1830 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.READ_BASED, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=3600, - step=60, - ) - assert result == 1830 - - -class TestOnlySpatialQueries: - """Tests for ONLY_SPATIAL queries (T_lookback = T_repeat).""" - - def test_only_spatial_instant_query(self): - """ONLY_SPATIAL uses T_lookback = T_repeat.""" - mock_match = MagicMock() # No range_vector token needed - # T_lookback = T_repeat = 30 - # circular_buffer instant: T_lookback / T_repeat = 30/30 = 1 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_SPATIAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=0, - step=0, - ) - assert result == 1 - - def test_only_spatial_range_query(self): - """ONLY_SPATIAL range query uses T_lookback = T_repeat.""" - mock_match = MagicMock() - # T_lookback = T_repeat = 30, range_duration=3600, step=60 - # effective_repeat = min(30, 60) = 30 - # circular_buffer: (30 + 3600) / 30 = 121 - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_SPATIAL, - query_pattern_match=mock_match, - t_repeat=30, - window_type="tumbling", - range_duration=3600, - step=60, - ) - assert result == 121 - - -class TestMinimumResult: - """Tests that result is always at least 1.""" - - def test_minimum_result_is_one(self): - """Result should never be less than 1.""" - mock_match = create_mock_match(10) # Very small lookback - result = get_cleanup_param( - cleanup_policy=CleanupPolicy.CIRCULAR_BUFFER, - query_pattern_type=QueryPatternType.ONLY_TEMPORAL, - query_pattern_match=mock_match, - t_repeat=100, # Larger than lookback - window_type="tumbling", - range_duration=0, - step=0, - ) - # 10 / 100 = 0, but should be at least 1 - assert result == 1 - - -if __name__ == "__main__": - pytest.main([__file__, "-v"]) diff --git a/asap-quickstart/README.md b/asap-quickstart/README.md index 1419da9..e014241 100644 --- a/asap-quickstart/README.md +++ b/asap-quickstart/README.md @@ -16,7 +16,7 @@ Then it adds ASAPQuery's components on top: - **Query Engine** - Prometheus-compatible API with sketch-based acceleration - **[Arroyo](https://github.com/ProjectASAP/arroyo) + asap-summary-ingest** - Streaming engine with pipelines configured for building sketches - **Kafka** - Message broker for streaming data from Arroyo to the Query Engine -- **asap-planner** - Automatically configures sketches from PromQL queries +- **asap-planner-rs** - Automatically configures sketches from PromQL queries Once you run the quickstart, you will see a pre-configured Grafana dashboard that compares Prometheues and ASAPQuery side-by-side. You will see **visually indistinguishable** results from Prometheus and ASAPQuery, with ASAPQuery being 100x faster @@ -88,7 +88,7 @@ Run `python3 set_data_cardinality.py ` where `M` is the number of labels To modify the queries in the Grafana dashboard and run ASAPQuery against those: -#### 1. Edit the asap-planner Config +#### 1. Edit the asap-planner-rs Config Edit `config/controller-config.yaml`: diff --git a/asap-quickstart/docker-compose.yml b/asap-quickstart/docker-compose.yml index d5b12b1..8c94352 100644 --- a/asap-quickstart/docker-compose.yml +++ b/asap-quickstart/docker-compose.yml @@ -169,10 +169,10 @@ services: # INIT CONTAINERS ############################################################################# - asap-planner: + asap-planner-rs: image: ghcr.io/projectasap/asap-planner-rs:v0.2.0 - container_name: asap-planner - hostname: asap-planner + container_name: asap-planner-rs + hostname: asap-planner-rs networks: - asap-network command: @@ -210,7 +210,7 @@ services: - asap-planner-output:/asap-planner-output:ro - ./output/asap-summary-ingest:/asap-summary-ingest-output depends_on: - asap-planner: + asap-planner-rs: condition: service_completed_successfully arroyo: condition: service_healthy diff --git a/asap-summary-ingest/README.md b/asap-summary-ingest/README.md index 25a985f..c7f2c93 100644 --- a/asap-summary-ingest/README.md +++ b/asap-summary-ingest/README.md @@ -4,19 +4,19 @@ asap-summary-ingest is the pipeline configurator that creates Arroyo streaming p ## Purpose -Given `streaming_config.yaml` (generated by asap-planner), asap-summary-ingest: +Given `streaming_config.yaml` (generated by asap-planner-rs), asap-summary-ingest: 1. Renders SQL query templates using Jinja2 2. Creates Arroyo pipelines via REST API 3. Configures sketch-building UDFs with parameters 4. Sets up connections to Kafka for sketch output -This automation eliminates manual pipeline creation and ensures consistency with asap-planner decisions. +This automation eliminates manual pipeline creation and ensures consistency with asap-planner-rs decisions. ## How It Works ### Input: streaming_config.yaml -The asap-planner generates this file describing which sketches to build: +The asap-planner-rs generates this file describing which sketches to build: **TODO** diff --git a/asap-tools/asap-cli/src/docker_util.rs b/asap-tools/asap-cli/src/docker_util.rs index 89f1c9b..d071efd 100644 --- a/asap-tools/asap-cli/src/docker_util.rs +++ b/asap-tools/asap-cli/src/docker_util.rs @@ -227,7 +227,7 @@ pub async fn generate_controller_compose( let code_dir = project_root.to_string_lossy(); // Paths for template and script - let controller_dir = format!("{}/asap-planner", code_dir); + let controller_dir = format!("{}/asap-planner-rs", code_dir); let template_path = format!("{}/controller-cli-compose.yml.j2", controller_dir); let helper_script = format!("{}/asap-tools/experiments/generate_controller_compose.py", code_dir); diff --git a/asap-tools/components.conf b/asap-tools/components.conf index 26d58ac..080389e 100644 --- a/asap-tools/components.conf +++ b/asap-tools/components.conf @@ -7,7 +7,6 @@ asap-common #FlinkSketch #QueryEngine asap-query-engine -asap-planner asap-planner-rs #prometheus-kafka-adapter asap-summary-ingest diff --git a/asap-tools/docs/deployment.md b/asap-tools/docs/deployment.md index 79e7cc2..ab67cbf 100644 --- a/asap-tools/docs/deployment.md +++ b/asap-tools/docs/deployment.md @@ -64,7 +64,7 @@ The script executes these phases in order: - Builds and installs project-specific code: - asap-common (base Docker image) - asap-query-engine (Rust binary + Docker image) - - asap-planner (Docker image) + - asap-planner-rs (Rust binary + Docker image) - Arroyo (Node.js frontend + Rust binary + Docker image) - asap-summary-ingest (Python scripts) - asap-tools/queriers/prometheus-client, asap-tools/data-sources/prometheus-exporters, asap-tools/execution-utilities, asap-tools/prometheus-benchmark @@ -76,7 +76,7 @@ The script executes these phases in order: ├── code/ # All component source code │ ├── asap-tools/ │ ├── asap-query-engine/ -│ ├── asap-planner/ +│ ├── asap-planner-rs/ │ ├── asap-summary-ingest/ │ ├── arroyo/ │ ├── asap-common/ @@ -147,7 +147,7 @@ asap-tools asap-common sketchlib-rust asap-query-engine -asap-planner +asap-planner-rs asap-summary-ingest asap-quickstart asap-tools/data-sources/prometheus-exporters @@ -261,12 +261,13 @@ docker build -t sketchdb-queryengine-rust:latest . # Build Docker image ``` **Deployment:** Can run as Docker container or bare-metal binary -#### 3. asap-planner -**What:** Service that generates sketch configurations based on query patterns +#### 3. asap-planner-rs +**What:** Rust-based service that generates sketch configurations based on query patterns **Build Process:** ```bash -cd asap-planner -docker build -t sketchdb-controller:latest . +cd asap-planner-rs +cargo build --release -p asap_planner +docker build . -f Dockerfile -t sketchdb-controller:latest ``` **Deployment:** Docker container only diff --git a/docs/01-getting-started/architecture.md b/docs/01-getting-started/architecture.md index 5910f41..5afa5e0 100644 --- a/docs/01-getting-started/architecture.md +++ b/docs/01-getting-started/architecture.md @@ -186,7 +186,7 @@ graph LR | **asap-query-engine** | Answers PromQL queries using sketches | Rust | `asap-query-engine/` | | **Arroyo** | Stream processing for building sketches | Rust (forked) | [github.com/ProjectASAP/arroyo](https://github.com/ProjectASAP/arroyo) | | **asap-sketch-ingest** | Configures Arroyo pipelines from config | Python | `asap-sketch-ingest/` | -| **asap-planner** | Auto-determines sketch parameters | Python | `asap-planner/` | +| **asap-planner-rs** | Auto-determines sketch parameters | Rust | `asap-planner-rs/` | | **Kafka** | Message broker for sketch distribution | Apache Kafka | (external) | | **Prometheus** | Time-series database (existing) | Go | (external) | | **Exporters** | Generate synthetic metrics for testing | Rust/Python | `asap-tools/data-sources/prometheus-exporters/` | @@ -228,7 +228,7 @@ graph LR - Serde for serialization - DataSketches (dsrs) for sketch algorithms -- **Python** - asap-planner, asap-sketch-ingest, experiment framework +- **Python** - asap-sketch-ingest, experiment framework - PyYAML for config parsing - Jinja2 for SQL templates - Requests for HTTP clients @@ -264,7 +264,7 @@ ASAPQuery/ │ ├── templates/ # Jinja2 SQL templates │ └── utils/ # Arroyo API client │ -├── asap-planner/ # Auto-configuration service +├── asap-planner-rs/ # Auto-configuration service │ ├── main_controller.py # Entry point │ ├── classes/ # Config data structures │ └── utils/ # Decision logic diff --git a/docs/02-components/README.md b/docs/02-components/README.md index 9c8929b..a4762e4 100644 --- a/docs/02-components/README.md +++ b/docs/02-components/README.md @@ -9,7 +9,7 @@ This document provides an overview of all ASAP components and links to detailed | **asap-query-engine** | Answers PromQL queries using sketches | Rust | [Details](query-engine.md) · [Code](../../asap-query-engine/) · [Dev Docs](../../asap-query-engine/docs/README.md) | | **Arroyo** | Stream processing for building sketches | Rust (forked) | [Details](arroyo.md) · [Code](https://github.com/ProjectASAP/arroyo) | | **asap-sketch-ingest** | Configures Arroyo pipelines from config | Python | [Details](arroyosketch.md) · [Code](../../asap-sketch-ingest/) · [README](../../asap-sketch-ingest/README.md) | -| **asap-planner** | Auto-determines sketch parameters | Python | [Details](controller.md) · [Code](../../asap-planner/) · [README](../../asap-planner/README.md) | +| **asap-planner-rs-rs** | Auto-determines sketch parameters | Rust | [Details](controller.md) · [Code](../../asap-planner-rs-rs/) | | **Exporters** | Generate synthetic metrics for testing | Rust/Python | [Details](exporters.md) · [Code](../../asap-tools/data-sources/prometheus-exporters/) · [README](../../asap-tools/data-sources/prometheus-exporters/README.md) | | **asap-tools** | Experiment framework for CloudLab | Python | [Details](utilities.md) · [Code](../../asap-tools/) · [Docs](../../asap-tools/docs/architecture.md) | @@ -19,7 +19,7 @@ This document provides an overview of all ASAP components and links to detailed graph TB subgraph "Configuration (Offline)" U[User] -->|edits| CC[controller-config.yaml] - CC --> C[asap-planner] + CC --> C[asap-planner-rs] C -->|streaming_config.yaml| AS[asap-sketch-ingest] C -->|inference_config.yaml| Q AS -->|create pipelines| A @@ -73,7 +73,7 @@ These run continuously to serve queries: These run once to set up the system: -- **[asap-planner](controller.md)** - Determines optimal sketch parameters +- **[asap-planner-rs](controller.md)** - Determines optimal sketch parameters - Analyzes query workload - Selects sketch algorithms - Generates configs for Arroyo and QueryEngine @@ -111,7 +111,7 @@ Performance-critical components written in Rust: Configuration and orchestration in Python: -- **asap-planner** - Query analysis and config generation +- **asap-planner-rs** - Query analysis and config generation - **asap-sketch-ingest** - Pipeline configuration - **asap-tools** - Experiment framework - **Python Exporters** - Simpler metric generators @@ -122,7 +122,7 @@ Configuration and orchestration in Python: asap-query-engine ├── Kafka (runtime) - Consumes sketches ├── Prometheus (runtime, optional) - Fallback queries -└── inference_config.yaml (config) - From asap-planner +└── inference_config.yaml (config) - From asap-planner-rs Arroyo ├── Prometheus (runtime) - Remote write source @@ -131,9 +131,9 @@ Arroyo asap-sketch-ingest ├── Arroyo (runtime) - Creates pipelines via API -└── streaming_config.yaml (config) - From asap-planner +└── streaming_config.yaml (config) - From asap-planner-rs -asap-planner +asap-planner-rs ├── controller-config.yaml (input) - User-provided ├── streaming_config.yaml (output) - For asap-sketch-ingest └── inference_config.yaml (output) - For asap-query-engine @@ -153,7 +153,7 @@ asap-tools - [asap-query-engine](query-engine.md) - Query processor deep dive - [Arroyo](arroyo.md) - Streaming engine + ASAP customizations - [asap-sketch-ingest](arroyosketch.md) - Pipeline configurator -- [asap-planner](controller.md) - Auto-configuration service +- [asap-planner-rs](controller.md) - Auto-configuration service - [Exporters](exporters.md) - Metric generators - [asap-tools](utilities.md) - Experiment framework @@ -162,7 +162,7 @@ asap-tools For implementation details, see READMEs co-located with code: - [asap-query-engine/docs/](../../asap-query-engine/docs/README.md) - Extensibility guides -- [asap-planner/README.md](../../asap-planner/README.md) - asap-planner internals +- [asap-planner-rs/README.md](../../asap-planner-rs/README.md) - asap-planner-rs internals - [asap-sketch-ingest/README.md](../../asap-sketch-ingest/README.md) - Pipeline config internals - [asap-tools/data-sources/prometheus-exporters/README.md](../../asap-tools/data-sources/prometheus-exporters/README.md) - Exporter implementations - [asap-tools/docs/](../../asap-tools/docs/architecture.md) - Experiment framework architecture diff --git a/docs/03-how-to-guides/development/adapt-pr-after-reorg.md b/docs/03-how-to-guides/development/adapt-pr-after-reorg.md index e25abf7..1d07009 100644 --- a/docs/03-how-to-guides/development/adapt-pr-after-reorg.md +++ b/docs/03-how-to-guides/development/adapt-pr-after-reorg.md @@ -122,8 +122,8 @@ git push --force-with-lease origin your-branch-name Key CI jobs to watch: - **Rust** — Cargo build and tests for `asap-query-engine` and `asap-common` -- **Python** — Linting/type-checking for `asap-tools/queriers/prometheus-client` and `asap-planner` -- **Docker** — Image builds for `asap-query-engine`, `asap-planner`, `asap-sketch-ingest` +- **Python** — Linting/type-checking for `asap-tools/queriers/prometheus-client` +- **Docker** — Image builds for `asap-query-engine`, `asap-planner-rs`, `asap-sketch-ingest` --- diff --git a/docs/03-how-to-guides/development/add-new-sketch.md b/docs/03-how-to-guides/development/add-new-sketch.md index b88ec20..13b1652 100644 --- a/docs/03-how-to-guides/development/add-new-sketch.md +++ b/docs/03-how-to-guides/development/add-new-sketch.md @@ -75,15 +75,9 @@ pub use your_sketch_accumulator::*; --- -## Step 4: asap-planner - Sketch Parameters (Optional) +## Step 4: asap-planner-rs - Sketch Parameters (Optional) -**File to modify**: `asap-planner/classes/StreamingAggregationConfig.py` or `asap-planner/utils/logics.py` - -**What to add**: -- Custom sketch parameters (size, epsilon, etc.) in `get_sketch_parameters()` or similar -- SLA-based parameter computation in `compute_sketch_parameters()` if needed - -**Usually**: asap-planner picks up sketch automatically from asap-common mapping. +**Usually**: asap-planner-rs picks up sketch automatically from asap-common mapping. Custom sketch parameters (size, epsilon, etc.) can be added in the Rust source under `asap-planner-rs/src/`. --- @@ -92,7 +86,7 @@ pub use your_sketch_accumulator::*; - [ ] `validate_udfs.py` passes (ArroyoSketch) - [ ] `cargo build --release` succeeds (asap-query-engine) - [ ] `cargo test` passes (asap-query-engine) -- [ ] End-to-end: asap-planner → asap-sketch-ingest → Arroyo → Kafka → QueryEngine → Query result +- [ ] End-to-end: asap-planner-rs → asap-sketch-ingest → Arroyo → Kafka → QueryEngine → Query result --- diff --git a/docs/03-how-to-guides/operations/publishing-docker-images.md b/docs/03-how-to-guides/operations/publishing-docker-images.md index 230cc43..800255a 100644 --- a/docs/03-how-to-guides/operations/publishing-docker-images.md +++ b/docs/03-how-to-guides/operations/publishing-docker-images.md @@ -16,7 +16,7 @@ That's it. GitHub Actions will build and push all images tagged as both `v0.2.0` | Image | Source | |---|---| | `ghcr.io/projectasap/asap-base` | `asap-common/installation/` | -| `ghcr.io/projectasap/asap-planner` | `asap-planner/` | +| `ghcr.io/projectasap/asap-planner-rs` | `asap-planner-rs/` | | `ghcr.io/projectasap/asap-sketch-ingest` | `asap-sketch-ingest/` | | `ghcr.io/projectasap/asap-query-engine` | `asap-query-engine/` | | `ghcr.io/projectasap/asap-prometheus-client` | `asap-tools/queriers/prometheus-client/` | diff --git a/docs/README.md b/docs/README.md index bdc6389..fb380b2 100644 --- a/docs/README.md +++ b/docs/README.md @@ -14,7 +14,7 @@ Deep dives into each component: - [Query Engine](02-components/query-engine.md) - Rust query processor - [Arroyo](02-components/arroyo.md) - Streaming engine (fork + customizations) - [asap-sketch-ingest](02-components/arroyosketch.md) - Pipeline configurator -- [asap-planner](02-components/controller.md) - Auto-configuration service +- [asap-planner-rs](02-components/controller.md) - Auto-configuration service - [Exporters](02-components/exporters.md) - Metric generators - [asap-tools](02-components/utilities.md) - Experiment framework @@ -54,6 +54,6 @@ Developer practices and infrastructure: Technical details co-located with code: - [asap-query-engine](../asap-query-engine/docs/README.md) - Extensibility guides - [asap-tools/Experiments](../asap-tools/docs/architecture.md) - Experiment framework architecture -- [asap-planner](../asap-planner/README.md) - Controller internals +- [asap-planner-rs](../asap-planner-rs/README.md) - Controller internals - [asap-sketch-ingest](../asap-sketch-ingest/README.md) - Pipeline configuration - [asap-tools/data-sources/prometheus-exporters](../asap-tools/data-sources/prometheus-exporters/README.md) - Exporter implementations