feat: Adding hand-crafted precompute engine option in e2e quickstart #270
…e2e quickstart

The e2e quickstart previously required Kafka, Arroyo, and asap-summary-ingest to run the streaming aggregation pipeline. This replaces that entire stack with the hand-crafted precompute engine running inside the query engine process, significantly simplifying the deployment.

Changes:
- Add `Precompute` variant to `StreamingEngine` enum
- Make `--kafka-topic` and `--input-format` optional CLI args (only required when streaming-engine=arroyo)
- Skip the Kafka consumer when streaming-engine=precompute
- Auto-enable the precompute engine when streaming-engine=precompute
- Remove Kafka, Arroyo, and asap-summary-ingest services from docker-compose
- Point Prometheus remote_write to the query engine's precompute ingest endpoint
- Add a healthcheck to the query engine service for proper startup ordering

Data flow before: Prometheus → Arroyo (via Kafka) → Kafka → Query Engine
Data flow after: Prometheus → Query Engine (precompute engine, direct to store)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
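The engine selection and the now-optional Kafka args could be sketched roughly as below. Only `StreamingEngine::Precompute` is named in the commit; `CliArgs`, `validate`, and the error message are illustrative assumptions, not the actual code (which presumably uses a CLI parser such as clap).

```rust
// Sketch of the new engine-selection logic. Names other than
// `StreamingEngine::Precompute` are assumptions about the actual code.
#[derive(Clone, Copy, Debug, PartialEq)]
enum StreamingEngine {
    Arroyo,
    Precompute, // new variant: hand-crafted engine inside the query engine process
}

struct CliArgs {
    streaming_engine: StreamingEngine,
    kafka_topic: Option<String>,  // now optional
    input_format: Option<String>, // now optional
}

// Kafka-related args are only required on the Arroyo path.
fn validate(args: &CliArgs) -> Result<(), String> {
    match args.streaming_engine {
        StreamingEngine::Arroyo => {
            if args.kafka_topic.is_none() || args.input_format.is_none() {
                return Err(
                    "--kafka-topic and --input-format are required with --streaming-engine=arroyo"
                        .into(),
                );
            }
            Ok(())
        }
        // Precompute path: skip the Kafka consumer entirely.
        StreamingEngine::Precompute => Ok(()),
    }
}
```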
Instead of modifying the existing docker-compose.yml and prometheus.yml, keep them unchanged (Arroyo-based) and add new files for the precompute engine variant:

- docker-compose-precompute.yml: e2e stack using the hand-crafted precompute engine (no Kafka/Arroyo/asap-summary-ingest needed)
- prometheus-precompute.yml: remote_write pointing to queryengine:9091

Usage: docker compose -f docker-compose-precompute.yml up

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The query engine Docker image (Ubuntu 24.04 minimal) has neither curl nor wget, so the healthcheck uses a bash /dev/tcp TCP port check. Also changed the check from /api/v1/query?query=up to a plain TCP probe so it doesn't fail while Prometheus is still starting (forward-unsupported-queries would try to reach Prometheus).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
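The resulting compose fragment might look like this. The service name and port follow the `queryengine:9091` remote_write target mentioned above; the interval/timeout/retries values are assumptions.

```yaml
services:
  queryengine:
    healthcheck:
      # /dev/tcp is a bash pseudo-device: the redirect exits non-zero
      # unless something is listening on the port, so no curl/wget needed.
      test: ["CMD", "bash", "-c", "exec 3<>/dev/tcp/127.0.0.1/9091"]
      interval: 5s
      timeout: 3s
      retries: 12
```

Because the probe only checks that the TCP port accepts connections, it stays green even when dependencies such as Prometheus are not up yet.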
The published v0.2.0 image doesn't have --streaming-engine=precompute. Use v0.2.0-precompute tag for the locally built image with the new streaming engine variant. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…accumulators

Match Arroyo's GROUP BY semantics: the ingest handler now extracts grouping-label values from each series and groups samples by (agg_id, group_key) before routing to workers. The router hashes by group key, so all series sharing the same grouping labels land on the same worker and feed a single shared accumulator.

For the quickstart (189K series, 7 pattern groups), this reduces the accumulator count from 189K to 7, store writes/sec from 189K to 7, and eliminates the query-time fan-in merge.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
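The routing rule described above can be sketched as follows; the function and label names are illustrative, not the actual `series_router` code:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash (agg_id, group_key), not the full series key, so every series
// sharing the same grouping-label values reaches the same worker and
// feeds one shared accumulator.
fn route_to_worker(agg_id: u64, group_key: &str, num_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    (agg_id, group_key).hash(&mut h);
    (h.finish() as usize) % num_workers
}

// Build the group key from the grouping-label values of a series.
fn group_key(grouping_labels: &[&str], series_labels: &[(&str, &str)]) -> String {
    grouping_labels
        .iter()
        .map(|name| {
            series_labels
                .iter()
                .find(|(n, _)| n == name)
                .map(|(_, v)| *v)
                .unwrap_or("") // missing label -> empty value
        })
        .collect::<Vec<_>>()
        .join(";")
}
```

Under this scheme, 189K series with 7 distinct `pattern` values produce only 7 group keys, hence 7 accumulators.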
…r keys
For keyed accumulators (MultipleSum, CMS, HydraKLL), the key passed to
update_keyed must come from aggregated_labels — these are the labels
that form the key dimension inside the sketch (e.g., which entry in a
MultipleSumAccumulator's HashMap, which bucket in a CMS grid).
Previously, extract_key_from_series used grouping_labels, which the
planner sets to empty for keyed operators. This caused all samples to
hash to the same internal bucket, collapsing distinct keys.
Now matches the Arroyo SQL pattern:
udf(concat_ws(';', aggregated_labels), value) -- key inside sketch
GROUP BY concat_ws(';', grouping_labels) -- output group key
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
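The two key dimensions can be sketched as follows. `MultipleSum` here is a toy stand-in for `MultipleSumAccumulator`, and `sketch_key` is an illustrative name for the corrected key extraction, not the actual code:

```rust
use std::collections::HashMap;

// For keyed accumulators, the sketch-internal key comes from
// aggregated_labels (concat_ws(';', ...) in the Arroyo SQL);
// grouping_labels only selects the output group.
fn sketch_key(aggregated_labels: &[&str], series: &[(&str, &str)]) -> String {
    aggregated_labels
        .iter()
        .map(|name| {
            series
                .iter()
                .find(|(n, _)| n == name)
                .map(|(_, v)| *v)
                .unwrap_or("")
        })
        .collect::<Vec<_>>()
        .join(";")
}

/// Toy stand-in for a MultipleSumAccumulator: one running sum per key.
#[derive(Default)]
struct MultipleSum {
    sums: HashMap<String, f64>,
}

impl MultipleSum {
    fn update_keyed(&mut self, key: String, value: f64) {
        *self.sums.entry(key).or_insert(0.0) += value;
    }
}
```

With the old bug (keying on the empty grouping_labels), every series would have produced the key "" and collapsed into one bucket.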
- Add .dockerignore to exclude target/ and .git/ from the docker context (reduces context from ~1.9 GB to ~34 MB)
- Add Dockerfile.queryengine-local for fast local builds that copy a pre-built binary instead of compiling inside docker
- Add docker-compose-precompute.local.yml override for local dev
- Fix Dockerfile stub for the e2e_quickstart_resource_test binary

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
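Per the commit, the .dockerignore only needs two entries to get the context from ~1.9 GB down to ~34 MB:

```
# .dockerignore — keep build artifacts and repo history out of the context
target/
.git/
```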
…vice

- Replace panic on invalid aggregationId with a proper anyhow error
- Return Option from get_or_create_group_state to handle unknown agg_id
- Fix flush_all no-op by advancing the watermark by 1ms to close pending windows
- Add missing fake-exporter-spiky service in docker-compose-precompute.yml

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
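The first two hardening changes might look roughly like this. The real code returns an anyhow error; this sketch uses a plain `Result<_, String>`, and all type and field names are assumptions:

```rust
use std::collections::HashMap;

struct GroupState {} // stand-in for per-group window/accumulator state

struct Worker {
    groups: HashMap<(u64, String), GroupState>,
    known_aggs: Vec<u64>, // agg_ids with a registered aggregation config
}

impl Worker {
    // Was: panic on an unparseable aggregationId. Now: propagate an error.
    fn parse_aggregation_id(raw: &str) -> Result<u64, String> {
        raw.parse::<u64>()
            .map_err(|e| format!("invalid aggregationId {raw:?}: {e}"))
    }

    // Was: implicit create for any agg_id. Now: None for unknown agg_ids,
    // so the caller can drop the samples instead of crashing.
    fn get_or_create_group_state(&mut self, agg_id: u64, key: &str) -> Option<&mut GroupState> {
        if !self.known_aggs.contains(&agg_id) {
            return None;
        }
        Some(
            self.groups
                .entry((agg_id, key.to_string()))
                .or_insert_with(|| GroupState {}),
        )
    }
}
```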
Two-layer watermark propagation that closes windows for idle groups:

- Intra-worker: worker_wm = max(all group watermarks), propagated to idle groups on flush
- Cross-worker: global_wm = min(all worker watermarks) via a shared Arc<AtomicI64>
- Each group's effective watermark on flush = max(group_wm, global_wm) + 1ms

Adds a watermark figure and design section to precompute_engine_design_doc.md.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
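The effective-watermark rule above can be reduced to a few lines; `WatermarkWorker` and its fields are illustrative names, not the actual worker type:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

struct WatermarkWorker {
    group_wms: Vec<i64>,       // per-group event-time watermarks (ms)
    global_wm: Arc<AtomicI64>, // min over all workers, shared cross-worker
}

impl WatermarkWorker {
    // Intra-worker watermark: the most advanced group drags idle groups along.
    fn worker_wm(&self) -> i64 {
        self.group_wms.iter().copied().max().unwrap_or(i64::MIN)
    }

    // On flush, a group closes windows up to max(group_wm, global_wm) + 1ms,
    // so even idle groups make progress once any worker has advanced.
    fn effective_wm(&self, group: usize) -> i64 {
        self.group_wms[group].max(self.global_wm.load(Ordering::Relaxed)) + 1
    }
}
```

Taking min across workers for the global watermark keeps the bound conservative: no worker can close a window that another worker might still receive late samples for.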
…tations

SimpleMapStore now wraps SimpleMapStoreGlobal/SimpleMapStorePerKey (the epoch-based implementations) instead of the legacy stores. Adds diagnostic_info() to both current stores. Legacy stores remain available for benchmark comparisons only.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…stores

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove unused import `config_is_keyed`
- Remove unnecessary `mut` on worker variable
- Use `.to_vec()` instead of `.iter().cloned().collect()`
- Add type aliases to avoid the clippy type_complexity warning
- Allow too_many_arguments on Worker::new and test helpers
- Fix e2e MultipleSum test: host belongs in aggregated_labels (inner sketch key), not grouping_labels

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Adds a `Precompute` variant to the `StreamingEngine` enum and makes the Kafka CLI args optional.

Data flow change

Before (Arroyo):

Fake Exporters → Prometheus → Arroyo (via remote_write) → Kafka → asap-summary-ingest → Query Engine Store

After (Precompute):

Fake Exporters → Prometheus → Query Engine precompute ingest (via remote_write) → Store (direct, zero-copy)

Code changes

Precompute engine

- `precompute_engine/worker.rs`: global watermark via `Arc<AtomicI64>`; fix panic on unknown agg_id (return `Option`)
- `precompute_engine/engine.rs`: `Worker::new`; add `worker_watermarks` to `PrecomputeWorkerDiagnostics`
- `precompute_engine/series_router.rs`: hash `(agg_id, group_key)` instead of the series key; add `GroupSamples` message variant
- `precompute_engine/ingest_handler.rs`: group samples by `(agg_id, group_key)` before routing; match series to aggregation configs
- `precompute_engine/precompute_engine_design_doc.md`

Query engine integration

- `data_model/enums.rs`: add `Precompute` to the `StreamingEngine` enum
- `main.rs`: make `--kafka-topic`/`--input-format` optional; skip the Kafka consumer when `--streaming-engine=precompute`; auto-enable the precompute engine; add memory diagnostics logging
- `bin/e2e_quickstart_resource_test.rs`

Store migration

- `stores/simple_map_store/mod.rs`: `SimpleMapStore` enum now wraps `SimpleMapStoreGlobal`/`SimpleMapStorePerKey` (current) instead of legacy; move `StoreDiagnostics`/`AggregationDiagnostics` structs here
- `stores/simple_map_store/global.rs`: add `diagnostic_info()` method
- `stores/simple_map_store/per_key.rs`: add `diagnostic_info()` method

Bug fixes

- `asap_types/streaming_config.rs`: panic on invalid `aggregationId` -> proper `anyhow` error
- `precompute_engine/worker.rs`: fix `flush_all` no-op (was calling `closed_windows(wm, wm)`, which always returns empty)

Docker / quickstart

- `docker-compose-precompute.yml` (incl. `fake-exporter-spiky`)
- `docker-compose-precompute.dev.yml`
- `docker-compose-precompute.local.yml`
- `Dockerfile.queryengine-local`
- `config/prometheus-precompute.yml`
- `.dockerignore`: exclude `target/` and `.git/`
- `.gitignore`: `asap-quickstart/bin/`

Quickstart benchmark results
Resource usage: Query engine 39% CPU, 30 MiB memory (vs Prometheus 37% CPU, 654 MiB)
Quantile query latency (ASAP vs Prometheus, 189K series):
Accuracy: <1% error at p95/p99, <7% at p50 (expected for KLL K=200)
Test plan
- `cargo check` passes
- `cargo test`: all 434 tests pass (23 worker tests, including 5 new watermark tests)
- `cargo fmt --check` passes
- `--streaming-engine=arroyo` mode unaffected (backward compatible)