feat: Adding hand-crafted precompute engine option in e2e quickstart #270

Open
zzylol wants to merge 17 commits into main from feat/e2e-precompute-engine


@zzylol zzylol commented Apr 8, 2026

Summary

  • Replaces the Arroyo streaming pipeline (Kafka + Arroyo + asap-summary-ingest) with the hand-crafted precompute engine running inside the query engine process
  • Simplifies the e2e quickstart from ~12 services down to ~9, removing Kafka, Arroyo, kafka-init, and asap-summary-ingest
  • Adds Precompute variant to StreamingEngine enum and makes Kafka CLI args optional
  • Cross-group watermark propagation: two-layer watermark design (intra-worker max, cross-worker min via shared atomics) so idle groups get their windows closed
  • Migrates SimpleMapStore from legacy to current (epoch-based) store implementations
  • E2E resource test: standalone binary that simulates 189K series and reports CPU/memory

Data flow change

Before (Arroyo): Fake Exporters → Prometheus → Arroyo (via remote_write) → Kafka → asap-summary-ingest → Query Engine Store

After (Precompute): Fake Exporters → Prometheus → Query Engine precompute ingest (via remote_write) → Store (direct, zero-copy)

Code changes

Precompute engine

| File | Change |
| --- | --- |
| precompute_engine/worker.rs | Refactor from per-series to per-group accumulators (GROUP BY semantics); add cross-group watermark propagation via shared `Arc<AtomicI64>`; fix panic on unknown agg_id (return Option) |
| precompute_engine/engine.rs | Create shared watermark atomics per worker; pass to Worker::new; add worker_watermarks to PrecomputeWorkerDiagnostics |
| precompute_engine/series_router.rs | Route by hash(agg_id, group_key) instead of series key; add GroupSamples message variant |
| precompute_engine/ingest_handler.rs | Group decoded samples by (agg_id, group_key) before routing; match series to aggregation configs |
| precompute_engine/precompute_engine_design_doc.md | Add watermark figure (event time, window lifecycle, late data) and cross-group propagation design section |
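
For reviewers, a minimal sketch of what routing by hash(agg_id, group_key) could look like. The function name, the `u64` type for `agg_id`, and the use of `DefaultHasher` are assumptions for illustration; the actual code in series_router.rs may differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a worker index for one group.
/// `route_to_worker` and its parameters are hypothetical names, not the PR's API.
fn route_to_worker(agg_id: u64, group_key: &str, num_workers: usize) -> usize {
    assert!(num_workers > 0, "at least one worker is required");
    let mut hasher = DefaultHasher::new();
    // Hash the (agg_id, group_key) pair so every series that shares the same
    // grouping labels for a given aggregation maps to the same worker.
    (agg_id, group_key).hash(&mut hasher);
    (hasher.finish() % num_workers as u64) as usize
}
```

Because the series key is no longer part of the hash input, two series with different instance labels but the same group key always reach the same worker and can feed one accumulator.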

Query engine integration

| File | Change |
| --- | --- |
| data_model/enums.rs | Add Precompute to StreamingEngine enum |
| main.rs | Make --kafka-topic/--input-format optional; skip Kafka consumer when --streaming-engine=precompute; auto-enable precompute engine; add memory diagnostics logging |
| bin/e2e_quickstart_resource_test.rs | New: standalone E2E test simulating 7 exporters x 27K series for resource usage reporting |
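
A rough sketch of how the optional Kafka arguments could be validated against the selected engine. The `Args` struct, the `kafka_settings` helper, and the error strings are hypothetical; only the `StreamingEngine` enum, its `Precompute` variant, and the flag names come from this PR. The real main.rs presumably uses its own CLI parser rather than this hand-rolled check.

```rust
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamingEngine {
    Arroyo,
    Precompute,
}

impl FromStr for StreamingEngine {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "arroyo" => Ok(Self::Arroyo),
            "precompute" => Ok(Self::Precompute),
            other => Err(format!("unknown streaming engine: {}", other)),
        }
    }
}

/// Hypothetical subset of the CLI arguments relevant to this change.
struct Args {
    streaming_engine: StreamingEngine,
    kafka_topic: Option<String>,
    input_format: Option<String>,
}

/// Returns `Ok(None)` when the Kafka consumer should be skipped, or the
/// (topic, format) pair when the Arroyo path needs it.
fn kafka_settings(args: &Args) -> Result<Option<(&str, &str)>, String> {
    match args.streaming_engine {
        // Precompute ingests via remote_write, so no Kafka consumer is started.
        StreamingEngine::Precompute => Ok(None),
        StreamingEngine::Arroyo => {
            let topic = args
                .kafka_topic
                .as_deref()
                .ok_or("--kafka-topic is required when --streaming-engine=arroyo")?;
            let format = args
                .input_format
                .as_deref()
                .ok_or("--input-format is required when --streaming-engine=arroyo")?;
            Ok(Some((topic, format)))
        }
    }
}
```

Keeping the requirement check in one place means the Arroyo mode still fails fast on missing flags, which is what backward compatibility needs here.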

Store migration

| File | Change |
| --- | --- |
| stores/simple_map_store/mod.rs | Switch SimpleMapStore enum to wrap SimpleMapStoreGlobal/SimpleMapStorePerKey (current) instead of legacy; move StoreDiagnostics/AggregationDiagnostic structs here |
| stores/simple_map_store/global.rs | Add diagnostic_info() method |
| stores/simple_map_store/per_key.rs | Add diagnostic_info() method |

Bug fixes

| File | Change |
| --- | --- |
| asap_types/streaming_config.rs | Fix panic on invalid aggregationId -> proper anyhow error |
| precompute_engine/worker.rs | Fix flush_all no-op (was calling closed_windows(wm, wm) which always returns empty) |
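
To illustrate the first fix, here is a minimal sketch of returning an error instead of panicking on a bad aggregationId. The config shape (a string map), the `u64` id type, and the function name are assumptions; the PR itself returns an anyhow error, while this sketch uses a plain `String` error to stay dependency-free.

```rust
use std::collections::HashMap;

/// Parses the `aggregationId` field of one config entry.
/// The map-based entry and this function name are hypothetical.
fn parse_aggregation_id(entry: &HashMap<String, String>) -> Result<u64, String> {
    let raw = entry
        .get("aggregationId")
        .ok_or_else(|| "missing aggregationId".to_string())?;
    // Surface the parse failure to the caller instead of unwrapping.
    raw.parse::<u64>()
        .map_err(|e| format!("invalid aggregationId {:?}: {}", raw, e))
}
```

The caller can then decide whether to reject the whole config or skip the entry, rather than taking down the process.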

Docker / quickstart

| File | Change |
| --- | --- |
| docker-compose-precompute.yml | New: base compose with Prometheus, query engine, 8 fake exporters (adds missing fake-exporter-spiky) |
| docker-compose-precompute.dev.yml | New: override to build query engine from source |
| docker-compose-precompute.local.yml | New: override to use pre-built local binary |
| Dockerfile.queryengine-local | New: lightweight image copying pre-built binary |
| config/prometheus-precompute.yml | New: Prometheus config pointing remote_write to query engine |
| .dockerignore | Add target/ and .git/ |
| .gitignore | Add asap-quickstart/bin/ |

Quickstart benchmark results

Resource usage: Query engine 39% CPU, 30 MiB memory (vs Prometheus 37% CPU, 654 MiB)

Quantile query latency (ASAP vs Prometheus, 189K series):

| Query | ASAP | Prometheus | Speedup |
| --- | --- | --- | --- |
| p99 | 11ms | 622ms | 57x |
| p95 | 12ms | 693ms | 58x |
| p90 | 12ms | 583ms | 49x |
| p50 | 16ms | 635ms | 40x |

Accuracy: <1% error at p95/p99, <7% at p50 (expected for KLL K=200)

Test plan

  • cargo check passes
  • cargo test: all 434 tests pass (23 worker tests, including 5 new watermark tests)
  • cargo fmt --check passes
  • Docker quickstart runs end-to-end with correct quantile results
  • Existing --streaming-engine=arroyo mode unaffected (backward compatible)

@zzylol zzylol changed the title from "feat: replace Arroyo pipeline with hand-crafted precompute engine in e2e quickstart" to "feat: Adding hand-crafted precompute engine option in e2e quickstart" Apr 8, 2026
@zzylol zzylol force-pushed the feat/e2e-precompute-engine branch 2 times, most recently from b02c161 to aec8a18 April 9, 2026 17:19
zz_y and others added 14 commits April 9, 2026 15:23
…e2e quickstart

The e2e quickstart previously required Kafka, Arroyo, and asap-summary-ingest
to run the streaming aggregation pipeline. This replaces that entire stack with
the hand-crafted precompute engine running inside the query engine process,
significantly simplifying the deployment.

Changes:
- Add `Precompute` variant to `StreamingEngine` enum
- Make `--kafka-topic` and `--input-format` optional CLI args (only required
  when streaming-engine=arroyo)
- Skip Kafka consumer when streaming-engine=precompute
- Auto-enable precompute engine when streaming-engine=precompute
- Remove Kafka, Arroyo, and asap-summary-ingest services from docker-compose
- Point Prometheus remote_write to query engine's precompute ingest endpoint
- Add healthcheck to query engine service for proper startup ordering

Data flow before: Prometheus → Arroyo (via remote_write) → Kafka → asap-summary-ingest → Query Engine
Data flow after:  Prometheus → Query Engine (precompute engine, direct to store)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Instead of modifying the existing docker-compose.yml and prometheus.yml,
keep them unchanged (Arroyo-based) and add new files for the precompute
engine variant:

- docker-compose-precompute.yml: e2e stack using the hand-crafted
  precompute engine (no Kafka/Arroyo/asap-summary-ingest needed)
- prometheus-precompute.yml: remote_write pointing to queryengine:9091

Usage: docker compose -f docker-compose-precompute.yml up

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The query engine Docker image (Ubuntu 24.04 minimal) doesn't have
curl or wget. Use bash /dev/tcp for TCP port check healthcheck.

Also changed from /api/v1/query?query=up to a TCP check to avoid
the healthcheck failing when Prometheus isn't up yet (forward-
unsupported-queries would try to reach Prometheus).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The published v0.2.0 image doesn't have --streaming-engine=precompute.
Use v0.2.0-precompute tag for the locally built image with the new
streaming engine variant.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…accumulators

Match Arroyo's GROUP BY semantics: the ingest handler now extracts
grouping label values from each series and groups samples by
(agg_id, group_key) before routing to workers. The router hashes
by group key so all series sharing the same grouping labels land on
the same worker and feed a single shared accumulator.

For the quickstart (189K series, 7 pattern groups), this reduces
accumulator count from 189K to 7, store writes/sec from 189K to 7,
and eliminates query-time fan-in merge.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
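
A minimal sketch of the grouping step described in this commit, assuming a simple label map per sample. The `Sample` struct, the ';' separator for the group key, and the treatment of missing labels as empty strings are assumptions, not the ingest handler's actual types.

```rust
use std::collections::{BTreeMap, HashMap};

/// One decoded sample with its series labels (hypothetical shape).
struct Sample {
    labels: BTreeMap<String, String>,
    value: f64,
}

/// Builds the group key from the values of the grouping labels, joined by ';'.
/// A missing label contributes an empty string (an assumption).
fn group_key(labels: &BTreeMap<String, String>, grouping_labels: &[&str]) -> String {
    grouping_labels
        .iter()
        .map(|name| labels.get(*name).map(String::as_str).unwrap_or(""))
        .collect::<Vec<_>>()
        .join(";")
}

/// Buckets sample values by (agg_id, group_key) before they are routed.
fn group_samples(
    agg_id: u64,
    grouping_labels: &[&str],
    samples: &[Sample],
) -> HashMap<(u64, String), Vec<f64>> {
    let mut groups: HashMap<(u64, String), Vec<f64>> = HashMap::new();
    for s in samples {
        let key = (agg_id, group_key(&s.labels, grouping_labels));
        groups.entry(key).or_default().push(s.value);
    }
    groups
}
```

With grouping on a single label such as a pattern name, the number of buckets equals the number of distinct label values rather than the number of series, which is the effect behind the 189K to 7 reduction quoted above.
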
…r keys

For keyed accumulators (MultipleSum, CMS, HydraKLL), the key passed to
update_keyed must come from aggregated_labels — these are the labels
that form the key dimension inside the sketch (e.g., which entry in a
MultipleSumAccumulator's HashMap, which bucket in a CMS grid).

Previously, extract_key_from_series used grouping_labels, which the
planner sets to empty for keyed operators. This caused all samples to
hash to the same internal bucket, collapsing distinct keys.

Now matches the Arroyo SQL pattern:
  udf(concat_ws(';', aggregated_labels), value)  -- key inside sketch
  GROUP BY concat_ws(';', grouping_labels)        -- output group key

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
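
The distinction between the two label sets can be sketched as follows. `MultipleSum` here is a toy stand-in (one running sum per inner key), and `join_labels` and `ingest` are hypothetical helpers; only the names `update_keyed`, `aggregated_labels`, and `grouping_labels` come from the commit message.

```rust
use std::collections::{BTreeMap, HashMap};

/// Joins the values of `names` with ';', mirroring concat_ws(';', ...).
fn join_labels(series: &BTreeMap<String, String>, names: &[&str]) -> String {
    names
        .iter()
        .map(|n| series.get(*n).map(String::as_str).unwrap_or(""))
        .collect::<Vec<_>>()
        .join(";")
}

/// Toy stand-in for a keyed accumulator: one running sum per inner key.
#[derive(Default)]
struct MultipleSum {
    sums: HashMap<String, f64>,
}

impl MultipleSum {
    fn update_keyed(&mut self, key: &str, value: f64) {
        *self.sums.entry(key.to_string()).or_insert(0.0) += value;
    }
}

/// Feeds one sample into the accumulator of its output group.
fn ingest(
    groups: &mut HashMap<String, MultipleSum>,
    series: &BTreeMap<String, String>,
    grouping_labels: &[&str],
    aggregated_labels: &[&str],
    value: f64,
) {
    // Output group key: which accumulator receives the sample (GROUP BY).
    let group = join_labels(series, grouping_labels);
    // Inner key: which entry inside that accumulator is updated.
    let inner = join_labels(series, aggregated_labels);
    groups.entry(group).or_default().update_keyed(&inner, value);
}
```

If the inner key were derived from an empty `grouping_labels` list instead, every sample would get the key "" and the per-host sums would collapse into one entry, which matches the bug described above.
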
- Add .dockerignore to exclude target/ and .git/ from docker context
  (reduces context from ~1.9GB to ~34MB)
- Add Dockerfile.queryengine-local for fast local builds that copy
  pre-built binary instead of compiling inside docker
- Add docker-compose-precompute.local.yml override for local dev
- Fix Dockerfile stub for e2e_quickstart_resource_test binary

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…vice

- Replace panic on invalid aggregationId with proper anyhow error
- Return Option from get_or_create_group_state to handle unknown agg_id
- Fix flush_all no-op by advancing watermark by 1ms to close pending windows
- Add missing fake-exporter-spiky service in docker-compose-precompute.yml

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
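
A small sketch of why the old flush_all was a no-op. The real signature and range semantics of closed_windows are not shown in this PR; the half-open range [prev_wm, new_wm) over window end times, the `GroupState` struct, and both flush functions below are assumptions chosen only to show that an empty range can never yield a window.

```rust
/// Hypothetical per-group state: current watermark and pending window ends (ms).
struct GroupState {
    watermark_ms: i64,
    pending_window_ends: Vec<i64>,
}

/// Assumed semantics: returns the pending window ends that fall in the
/// half-open range [prev_wm, new_wm).
fn closed_windows(pending: &[i64], prev_wm: i64, new_wm: i64) -> Vec<i64> {
    pending
        .iter()
        .copied()
        .filter(|end| prev_wm <= *end && *end < new_wm)
        .collect()
}

/// Old behavior: prev and new watermark are equal, so the range is empty.
fn flush_all_buggy(state: &GroupState) -> Vec<i64> {
    closed_windows(&state.pending_window_ends, state.watermark_ms, state.watermark_ms)
}

/// Fixed behavior: advance the watermark by 1ms so the range is non-empty.
fn flush_all_fixed(state: &GroupState) -> Vec<i64> {
    closed_windows(
        &state.pending_window_ends,
        state.watermark_ms,
        state.watermark_ms + 1,
    )
}
```

Under these assumptions, a group at watermark 60000 with a window ending at 60000 gets nothing from the buggy variant and that one window from the fixed variant.
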
Two-layer watermark propagation that closes windows for idle groups:
- Intra-worker: worker_wm = max(all group watermarks), propagated to idle groups on flush
- Cross-worker: global_wm = min(all worker watermarks) via shared Arc<AtomicI64>
- Each group's effective watermark on flush = max(group_wm, global_wm) + 1ms

Adds watermark figure and design section to precompute_engine_design_doc.md.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
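
The two layers can be sketched with plain functions over shared atomics. All function names are hypothetical, the use of `fetch_max` to keep each slot monotonic is an assumption, and the memory orderings are a conservative guess; only the max/min structure, the `Arc<AtomicI64>` slots, and the `+ 1ms` formula come from the commit message.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Intra-worker layer: the worker watermark is the max over its groups.
fn worker_watermark(group_wms: &[i64]) -> Option<i64> {
    group_wms.iter().copied().max()
}

/// Publishes a worker watermark into that worker's shared slot.
/// `fetch_max` keeps the slot from moving backwards (an assumption).
fn publish(slot: &AtomicI64, wm: i64) {
    slot.fetch_max(wm, Ordering::Release);
}

/// Cross-worker layer: the global watermark is the min over all worker slots.
fn global_watermark(slots: &[Arc<AtomicI64>]) -> Option<i64> {
    slots.iter().map(|s| s.load(Ordering::Acquire)).min()
}

/// Effective watermark for a group on flush: max(group_wm, global_wm) + 1ms.
fn effective_flush_watermark(group_wm: i64, global_wm: i64) -> i64 {
    group_wm.max(global_wm) + 1
}
```

Taking the min across workers keeps the global value conservative, while the max with the group's own watermark means a group that is ahead of the global value is never pulled backwards. An idle group with a stale watermark is lifted to the global value, so its windows can close.
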
…tations

SimpleMapStore now wraps SimpleMapStoreGlobal/SimpleMapStorePerKey (the
epoch-based implementations) instead of the legacy stores. Adds
diagnostic_info() to both current stores. Legacy stores remain available
for benchmark comparisons only.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…stores

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@zzylol zzylol force-pushed the feat/e2e-precompute-engine branch from 33d541d to 56fea63 April 9, 2026 19:25
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@zzylol zzylol force-pushed the feat/e2e-precompute-engine branch from 56fea63 to 932a8a0 April 9, 2026 19:30
zzylol and others added 2 commits April 9, 2026 15:47
- Remove unused import `config_is_keyed`
- Remove unnecessary `mut` on worker variable
- Use `.to_vec()` instead of `.iter().cloned().collect()`
- Add type aliases to avoid clippy type_complexity warning
- Allow too_many_arguments on Worker::new and test helpers
- Fix e2e MultipleSum test: host belongs in aggregated_labels (inner
  sketch key), not grouping_labels

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>