Conversation
Implements a single-node multi-threaded precompute engine as a new module and binary target within QueryEngineRust. The engine ingests Prometheus remote write samples, buffers them per-series with out-of-order handling, detects closed tumbling/sliding windows via event-time watermarks, feeds samples into accumulator wrappers for all existing sketch types, and emits PrecomputedOutput directly to the store. New modules: config, series_buffer, window_manager, accumulator_factory, series_router, worker, output_sink, and the PrecomputeEngine orchestrator. The binary supports embedded store + query HTTP server for single-process deployment. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace sketch_db_common::enums::QueryLanguage import with query_engine_rust::data_model::QueryLanguage in the standalone binary, and integrate PrecomputeEngine into main.rs as an alternative to the removed PrometheusRemoteWriteServer. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Handle top-level aggregation types (DatasketchesKLL, Sum, Min, Max, etc.) directly in the factory match, fixing the fallback to Sum that broke quantile queries. Also preserve the K parameter in KllAccumulatorUpdater::reset() instead of hardcoding 200. Add test_e2e_precompute binary that validates the full ingest -> precompute -> store -> query pipeline end-to-end. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When pass_raw_samples is enabled, each incoming sample is emitted directly to the store as a SumAccumulator without windowing or watermark advancement. This supports use cases where raw passthrough is needed instead of sketch-based aggregation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…te engine Instrument the ingest→worker→store pipeline with debug_span! and an Instant-based e2e latency log. All spans are at debug level so there is zero overhead at the default info level. Enable FmtSpan::CLOSE in both binaries to emit span durations when RUST_LOG=debug. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sends 10 series × 100 samples in a single HTTP request to the raw-mode engine and verifies all 1000 samples land in the store. Prints client RTT and per-series e2e_latency_us at debug level. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sends 1000 requests × 10000 samples (50 distinct series) to the raw-mode engine and polls until all samples are stored. Reports both send throughput and e2e throughput including drain time. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…dows Previously each sample was assigned to only one window via window_start_for(), which is incorrect for sliding windows where window_size > slide_interval. Added window_starts_containing() that returns all window starts whose range covers the timestamp, and use it in the worker aggregation loop. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
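The containing-windows computation described in this commit can be sketched as follows. This is an illustrative standalone version; the real `window_starts_containing()` in window_manager.rs may differ in signature and types.

```rust
/// All sliding-window start times whose range [start, start + window_size)
/// contains `ts`. For tumbling windows (window_size == slide) this returns
/// exactly one start, matching the old window_start_for() behaviour.
fn window_starts_containing(ts: i64, window_size_ms: i64, slide_ms: i64) -> Vec<i64> {
    let last = ts - ts.rem_euclid(slide_ms); // latest window start <= ts
    let first = last - (window_size_ms - slide_ms); // earliest start still covering ts
    (first..=last).step_by(slide_ms as usize).collect()
}

fn main() {
    // Sliding 30s window, 10s slide: t = 25s falls in windows starting at 0s, 10s, 20s.
    assert_eq!(
        window_starts_containing(25_000, 30_000, 10_000),
        vec![0, 10_000, 20_000]
    );
    // Tumbling 10s window: exactly one containing window.
    assert_eq!(window_starts_containing(25_000, 10_000, 10_000), vec![20_000]);
    println!("ok");
}
```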
…n doc Fix ghost accumulator bug where late samples passing the allowed_lateness check could create orphaned accumulators for already-closed windows. Add configurable LateDataPolicy (Drop/ForwardToStore) to control behavior. Drop prevents ghost windows; ForwardToStore emits mini-accumulators for query-time merge. Also add precompute engine design document. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract format-agnostic routing logic into route_decoded_samples() helper and register a VictoriaMetrics remote write endpoint at /api/v1/import alongside the existing Prometheus endpoint at /api/v1/write. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Route each sample to exactly 1 pane (sub-window of size slide_interval) instead of W overlapping window accumulators. When a window closes, merge its constituent panes via AggregateCore::merge_with(). This reduces per-sample accumulator updates from W to 1 for sliding windows.
- Add snapshot_accumulator() to AccumulatorUpdater trait (9 implementations)
- Add pane_start_for(), panes_for_window(), slide_interval_ms() to WindowManager
- Replace active_windows HashMap with active_panes BTreeMap in worker
- Rewrite sample routing and window close logic with pane merging
- Extract PrecomputeEngine to engine.rs, ingest handlers to ingest_handler.rs
- Update design doc with pane-based architecture
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
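The pane arithmetic behind this commit can be sketched like this. Function names follow the commit message, but signatures and types are assumed, not taken from the actual WindowManager.

```rust
/// Start of the pane (size = slide_interval) containing `ts_ms`.
fn pane_start_for(ts_ms: i64, slide_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(slide_ms)
}

/// The W pane starts whose contents are merged when the window
/// starting at `window_start` closes.
fn panes_for_window(window_start: i64, window_size_ms: i64, slide_ms: i64) -> Vec<i64> {
    let w = window_size_ms / slide_ms; // W = window_size / slide_interval
    (0..w).map(|i| window_start + i * slide_ms).collect()
}

fn main() {
    // Sliding 30s/10s (W = 3): a sample at t = 25s updates exactly one pane...
    assert_eq!(pane_start_for(25_000, 10_000), 20_000);
    // ...and window [0, 30s) is assembled by merging 3 panes at close time.
    assert_eq!(panes_for_window(0, 30_000, 10_000), vec![0, 10_000, 20_000]);
    // Consecutive windows [0,30s) and [10s,40s) share W-1 = 2 panes (10s and 20s).
    assert_eq!(panes_for_window(10_000, 30_000, 10_000), vec![10_000, 20_000, 30_000]);
    println!("ok");
}
```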
- Add tumbling and sliding window throughput benchmarks to E2E test
(Tumbling 10s, Sliding 30s/10s W=3, Sliding 60s/10s W=6)
- Each benchmark uses NoopOutputSink to isolate worker throughput
- Fix cargo fmt formatting in window_manager.rs and worker.rs
- Pin sketchlib-rust to rev 663a1df for Chapter enum compatibility
- Fix CMS updater field access (self.acc.inner.{row_num,col_num,sketch})
- Fix SimpleEngine::new missing promsketch_store arg in E2E test
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Parameterize run_single_bench with num_workers and add run_scalability_benchmark() that measures throughput across 1, 2, 4, 8, 16 workers with sliding 30s/10s Sum (W=3). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use tokio::task::spawn_blocking to build request payloads in parallel across 8 threads, and send HTTP requests concurrently via 8 tokio tasks (matching the pattern used in run_single_bench). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The 10M-sample throughput test needs ~94s to fully drain through the store, exceeding the previous 60s deadline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds a CapturingOutputSink to output_sink.rs that stores all emitted (PrecomputedOutput, Box<dyn AggregateCore>) pairs in a Mutex<Vec> for inspection in tests, alongside drain() and len() helpers. Adds 6 unit tests in worker.rs covering:
- Raw mode: each sample forwarded as SumAccumulator with sum == value
- Tumbling window: correct boundary and aggregated sum on window close
- Sliding window pane sharing: a single sample emits in both overlapping windows with the correct sum via snapshot/take pane merge
- GROUP BY separate emits: two series on the same worker produce independent per-series accumulators (no ingest-time cross-series merge)
- Late data Drop: a sample behind (watermark - allowed_lateness) is not emitted
- Late data ForwardToStore: a late sample for an evicted pane is emitted as a mini-accumulator with correct window bounds and sum
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
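A minimal sketch of the capturing-sink pattern this commit introduces. The real CapturingOutputSink captures `(PrecomputedOutput, Box<dyn AggregateCore>)` pairs; this standalone version captures plain key/value pairs so it stays self-contained.

```rust
use std::sync::Mutex;

/// Test sink that records every emit for later inspection.
struct CapturingSink {
    captured: Mutex<Vec<(String, f64)>>, // (series key, aggregated value)
}

impl CapturingSink {
    fn new() -> Self {
        Self { captured: Mutex::new(Vec::new()) }
    }
    fn emit(&self, key: &str, value: f64) {
        self.captured.lock().unwrap().push((key.to_string(), value));
    }
    fn len(&self) -> usize {
        self.captured.lock().unwrap().len()
    }
    /// Take all captured emits, leaving the sink empty.
    fn drain(&self) -> Vec<(String, f64)> {
        std::mem::take(&mut *self.captured.lock().unwrap())
    }
}

fn main() {
    let sink = CapturingSink::new();
    sink.emit("metric{host=A}", 6.0);
    sink.emit("metric{host=B}", 42.0);
    assert_eq!(sink.len(), 2);
    let emits = sink.drain();
    assert_eq!(emits[0].1, 6.0);
    assert_eq!(sink.len(), 0); // drained
    println!("ok");
}
```

Because the sink is driven by direct calls on the worker struct, tests can run `process_samples` and `flush_all` synchronously and assert on the captured emits, with no Tokio tasks or HTTP servers involved.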
- Add section 4 on cross-series GROUP BY aggregation: label dimension roles (grouping/aggregated/rollup), cross-worker fan-in via store append, and eventual consistency property
- Add CapturingOutputSink to OutputSink implementations (section 3.8)
- Expand section 11 (Testing) with the 6 new worker.rs correctness tests
- Update section 9 (Performance) with measured throughput numbers and a note on the workers=senders benchmark caveat causing super-linear speedup
- Renumber sections 5-11 to accommodate the new GROUP BY section
- Update file map entry for output_sink.rs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds a subsection under §4 (Cross-Series GROUP BY Aggregation) that clarifies how the pane-sharing optimization interacts with cross-worker fan-in. Key point: pane snapshot/take is an intra-worker detail; each worker always emits one complete accumulator per closed window regardless of W, so the store append + query-time merge path is identical for tumbling and sliding windows. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds an 'Accumulator lifecycle and ownership' subsection to §3.4 (Worker) explaining the three-level lazy initialisation: configs copied to all workers at startup, AggregationState created on first sample per series, AccumulatorUpdater created on first sample per pane. Includes the full ownership hierarchy diagram and the deterministic-hash ownership guarantee. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…uter Adds a subsection under §3.3 (SeriesRouter) covering the four failure modes of hash-mod routing (hash skew, hot series, GROUP BY fan-in, static assignment) and a concrete mitigation strategy for each: virtual nodes, weight-aware placement, grouping-key routing / two-phase aggregation, state migration at window boundaries. Includes a practical recommendation to add MPSC channel backpressure metrics as the first observability step before any structural change. No implementation changes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The original comment said "Manages tumbling window boundaries" which implied it only handled tumbling windows. Updated to reflect that it handles both tumbling and sliding windows, and clarified the slide_interval_ms field comment to distinguish the two cases. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…er tests Adds §12 (Known Data Loss Cases and Fault Tolerance TODOs) to the design doc covering all six data loss scenarios with a summary table, and four actionable TODOs:
- Warn on unmatched series (case 4)
- Opt-in force-flush of open panes on shutdown (case 5)
- Worker restart on panic (case 6)
- Write-ahead log / periodic pane snapshots for full crash recovery
Also applies rustfmt formatting fixes to the worker.rs test module (import ordering, line wrapping) to pass cargo fmt --check. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nfig YAML Adds test_worker_from_streaming_config_yaml which parses an inline YAML string through StreamingConfig::from_yaml_data (the same path the Python controller output takes at engine startup) and verifies that a Worker built from the resulting agg_configs correctly closes a tumbling window and computes the expected sum. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… results: sum, kll
accumulator_factory.rs:
- Add impl_clone_accumulator_methods! macro to replace 16 identical take_accumulator/snapshot_accumulator bodies across 8 updater structs
- Add config_is_keyed() free fn for a static keyed check without allocating an updater (replaces the throwaway-updater pattern used 4+ times)
- Add kll_k_param(), cms_params(), hydra_kll_params() helpers to eliminate 4 copy-pasted parameter extraction blocks in the factory
- Fix latent bug: the SingleSubpopulation/KLL arm now checks "K" (capital) before "k", consistent with all other arms
- Add tests: test_config_is_keyed, test_kll_k_param_capital_k

worker.rs:
- AggregationState.config: AggregationConfig -> Arc<AggregationConfig>, eliminating N×M deep config copies across series×aggregations
- Worker.agg_configs and the Worker::new param updated to an Arc map; matching_agg_configs returns owned Arc clones (removes the lifetime tie)
- Add apply_sample() free fn replacing 3 copies of the is_keyed dispatch
- Add merge_panes_for_window() free fn replacing an identical ~40-line pane-merge loop in both process_samples and flush_all
- Fix ForwardToStore path: is_keyed() and extract_key_from_series() each called once instead of twice

engine.rs:
- Wrap aggregation configs in Arc once at engine startup; per-worker agg_configs.clone() is now N pointer bumps, not N full deep copies

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Rewrites the "Optional second-tier merge workers" section to fix seven correctness issues identified in review:
1. Deadlock (critical): workers with no matching series never emitted WindowCompletion. Fix: introduce per_agg_watermark per worker, advanced by both data arrival and the flush timer (wall-clock floor), so every worker emits WindowCompletion for every aggregation on every flush cycle regardless of whether it has matching series.
2. WindowCompletion routing (critical): routing partials by (agg_id, key, window) while routing key-less completions by (agg_id, window) required an O(M) broadcast of completions to all merge workers. Fix: route ALL messages — partials and completions — by hash(agg_id, window_start, window_end) only, co-locating all keys for a window on one merge worker and eliminating the broadcast.
3. Conflicting struct definitions (critical): two contradictory WindowCompletion structs were present (one with key, one without). Fix: adopt a single key-less definition; explain the rationale.
4. Merge-tier watermark undefined (significant): PendingWindowState had no eviction bound. Fix: define merge_watermark as the min of worker_watermark_ms values received via completions; add force-eviction after eviction_grace_period_ms for lagging workers.
5. Late data re-finalization (significant): "re-open the merged result" was unspecified after a window was already written to the store. Fix: late corrections remain store appends with query-time merge (same as the non-merge-tier path); canonical finalized output is never mutated.
6. Message ordering (minor): partials and completions sent through separate channels had no ordering guarantee. Fix: both go through the same MPSC channel from a given first-tier worker; partials are always enqueued before the completion for the same window.
7. State structure (minor): per-key completed_sources tracking was incompatible with key-less completions (required an O(keys) scan). Fix: split state into window_completions (window-level) and pending_partials (key-level) maps; the completion check is O(1), and the finalization scan is O(keys per window), done once at close.
Also updates the rollout strategy to note that per_agg_watermark and flush-timer-driven WindowCompletion emission must be deployed in first-tier workers before any merge worker is started. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
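The routing rule from fix 2 can be sketched as follows. The function name is hypothetical and std's DefaultHasher stands in for whatever hash the design actually specifies; only the routing key matters here.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route every message, partial and completion alike, by (agg_id, window) only.
/// All keys of a given window therefore co-locate on one merge worker, so the
/// key-less WindowCompletion needs no broadcast.
fn merge_worker_for(agg_id: u64, window_start: i64, window_end: i64, num_merge_workers: u64) -> u64 {
    let mut h = DefaultHasher::new();
    (agg_id, window_start, window_end).hash(&mut h);
    h.finish() % num_merge_workers
}

fn main() {
    let w = merge_worker_for(7, 0, 60_000, 4);
    assert!(w < 4);
    // Deterministic: a keyed partial and the key-less completion for the
    // same (agg, window) always land on the same merge worker.
    assert_eq!(w, merge_worker_for(7, 0, 60_000, 4));
    println!("ok");
}
```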
…arking Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
Introduces the Precompute Engine — a real-time streaming aggregation system that ingests raw time-series samples via Prometheus and VictoriaMetrics remote write, computes windowed sketch aggregations, and writes results directly to the ASAP store for fast query-time retrieval.
Architecture
Single-machine, multi-threaded: all workers are Tokio async tasks in one process. Series are hash-partitioned across workers — no cross-shard coordination, no locks within a worker.
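The hash partitioning can be sketched as below, using std's DefaultHasher in place of the engine's xxhash64; the stand-in hash only changes which worker a series maps to, not the shared-nothing property.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick the owning worker for a series: hash(series_key) % num_workers.
/// Every sample for a series goes to the same worker, so no locks are
/// needed inside a worker and no coordination is needed between workers.
fn worker_for(series_key: &str, num_workers: u64) -> u64 {
    let mut h = DefaultHasher::new();
    series_key.hash(&mut h);
    h.finish() % num_workers
}

fn main() {
    let n = 4;
    let a = worker_for("http_requests{job=api,instance=h1}", n);
    let b = worker_for("http_requests{job=api,instance=h2}", n);
    assert!(a < n && b < n);
    // Stable assignment: the same series always routes to the same worker.
    assert_eq!(a, worker_for("http_requests{job=api,instance=h1}", n));
    println!("ok");
}
```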
Key Design Choices
Pane-based incremental sliding windows
The timeline is divided into non-overlapping panes of size `slide_interval`. Each sliding window is composed of `W = window_size / slide_interval` consecutive panes, and consecutive windows share `W-1` panes. This avoids re-processing samples N×W times (the naïve approach): each sample updates exactly 1 pane, and window close merges W panes via `AggregateCore::merge_with()`. Non-invertible sketches (KLL, CMS, HydraKLL, MinMax, Increase) work correctly because merge is universal; subtraction-based approaches would only work for Sum.

For tumbling windows (`W=1`), panes degenerate to 1 pane = 1 window — no extra merges.

Supported accumulator types
Late data handling (`LateDataPolicy`)

- `Drop`: samples behind the watermark by more than `allowed_lateness_ms` are discarded silently.
- `ForwardToStore`: late samples are emitted as mini-accumulators alongside the original window; query-time merge (`SummaryMergeMultipleExec`) combines them automatically.

Zero-serialization write path
Closed window accumulators flow from worker to store as in-memory `Box<dyn AggregateCore>` trait objects via direct function calls — no serialization, no IPC, no network hops between aggregation and storage. Contrast with the Kafka/Arroyo path, where precomputes arrive hex-encoded + gzip-compressed + MessagePack-serialized.

Raw passthrough mode
`pass_raw_samples = true` bypasses all aggregation. Each sample is emitted as a `SumAccumulator::with_sum(value)` point-window, enabling store-and-forward of raw data.

Cross-Series (GROUP BY) Aggregation
The `AggregationConfig` has three label dimensions that control the spatial aggregation shape: `grouping_labels` (carried into `PrecomputedOutput.key`), `aggregated_labels`, and `rollup_labels`. A config with `grouping_labels: [job]` and `rollup_labels: [instance]` means: "sum across all instances, keep one output series per job value." Multiple input series (`metric{job=j1,instance=h1}`, `metric{job=j1,instance=h2}`, ...) all contribute to the same logical output key `(job=j1)`.

How the engine handles this across workers:
Because routing is by full series key (`xxhash64(series_key) % N`), two series that share a `grouping_labels` key but differ in rolled-up labels can land on different workers. Each worker independently closes its window and emits a separate `PrecomputedOutput` with key `(job=j1)` for the same window `[0, 60s)`. The store appends rather than overwrites on the same `(aggregation_id, key, window)` tuple. Query-time `SummaryMergeMultipleExec` merges all entries for the same key and window via `AggregateCore::merge_with()`, producing the correct combined result. No ingest-time cross-worker coordination is needed.

Eventual consistency property:
Workers have independent watermarks. For a standard Prometheus scrape (all instances delivered in one HTTP batch to `route_batch()`), all workers receive their samples in the same round-trip and close the window on the same flush cycle. The incompleteness window — the time between the first and last worker emitting for the same cross-series window — is typically in the millisecond range (bounded by worker task scheduling jitter on the shared Tokio thread pool). For staggered multi-source producers (different series arriving from different remote-write clients at different times), the incompleteness window can be longer, bounded by the spread of producer arrival times. In both cases the result is eventually consistent: once all contributing workers have emitted, the store holds a complete set of accumulators and queries return the correct merged value.
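For Sum, the append-then-merge semantics reduce to simple addition; a minimal sketch under that assumption (the helper name is illustrative, not the engine's API):

```rust
/// Query-time merge of all accumulators appended under the same
/// (aggregation_id, key, window). For SumAccumulator, merge_with()
/// is addition, so partials from independent workers simply sum.
fn merge_sum_partials(partials: &[f64]) -> f64 {
    partials.iter().sum()
}

fn main() {
    // Two workers each emitted a partial for key (job=j1), window [0, 60s):
    // worker 0 saw instance=h1 (10.0), worker 1 saw instance=h2 (5.0).
    let store_entries = [10.0, 5.0];
    assert_eq!(merge_sum_partials(&store_entries), 15.0);
    println!("ok");
}
```

A query issued between the two emits sees only the partial result; once both entries are in the store, every subsequent query returns the merged 15.0.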
This deferred-merge design is intentional: it avoids any cross-worker coordination or global watermark synchronization at ingest time, keeping the shared-nothing worker architecture intact. The store's append-multiple-per-window design and the query-time merge already handle the fan-in correctly for both the cross-series aggregation case and the `ForwardToStore` late-data case.

Performance (measured on this machine)
Hardware
All benchmarks run with `cargo run --release`. Engine and client run in the same process.

Batch latency — 1,000 samples in one HTTP request (raw mode, 4 workers)
Raw-mode flush throughput — 1,000 requests × 10,000 samples (workers = senders, NoopOutputSink)
Measures time from first request sent until all samples are flushed (emitted) from the engine, isolating engine processing throughput from store I/O. Flush lag behind send completion was ≤ 12 ms at all worker counts — the engine flushes as fast as it receives.
Near-linear scaling to 16 workers. At 16 workers the engine sustains ~8.9M samples/sec flush throughput on this hardware.
For comparison, the same 4-worker raw-mode test with `StoreOutputSink` (writing 10M samples to `SimpleMapStore`) achieves 105,293 samples/sec E2E — ~17x lower, confirming that the store's RwLock, not the engine itself, is the bottleneck in the stored path.

Windowed aggregation throughput — 200 requests × 5,000 samples, 50 series, 4 workers, 4 concurrent senders (NoopOutputSink)
E2E throughput is nearly identical across W=1 and W=6, confirming the pane-based optimization: each sample touches exactly 1 pane regardless of window overlap.
Worker scalability — Sliding 30s/10s Sum (W=3), 200 requests × 5,000 samples, 100 series (NoopOutputSink)
Windowed aggregation scales less than raw mode because window close merges (W-1 per window) and pane BTreeMap operations add per-worker CPU work that the raw path skips. E2E gains also flatten at high worker counts due to Tokio task scheduling overhead on this 32-logical-CPU machine.
New Files
- `precompute_engine/mod.rs`
- `precompute_engine/config.rs`: `PrecomputeEngineConfig`, `LateDataPolicy`
- `precompute_engine/worker.rs`
- `precompute_engine/series_router.rs`
- `precompute_engine/series_buffer.rs`
- `precompute_engine/window_manager.rs`
- `precompute_engine/accumulator_factory.rs`: `AccumulatorUpdater` trait + factory for all sketch types
- `precompute_engine/output_sink.rs`: `OutputSink` trait + `StoreOutputSink`, `NoopOutputSink`, `CapturingOutputSink` (for testing)
- `precompute_engine/precompute_engine_design_doc.md`
- `bin/precompute_engine.rs`
- `bin/test_e2e_precompute.rs`
- `drivers/ingest/prometheus_remote_write.rs`
- `drivers/ingest/victoriametrics_remote_write.rs`

Configuration
Validation
Unit tests — `worker.rs` (6 tests via `CapturingOutputSink`)

`CapturingOutputSink` stores all `(PrecomputedOutput, Box<dyn AggregateCore>)` emits in a `Mutex<Vec<...>>` and exposes `drain()`/`len()`. Tests call `process_samples` and `flush_all` directly on the `Worker` struct without spinning up any Tokio tasks or HTTP servers.

- `test_raw_mode_forwarding`: `start == end == ts` and `SumAccumulator.sum == value`
- `test_tumbling_window_correctness`: `start=0, end=10000, sum=6`
- `test_sliding_window_pane_sharing`: `sum=42` via shared pane snapshot/take
- `test_groupby_separate_emits_per_series`: two series (`host=A`, `host=B`) on the same worker produce 2 independent `MultipleSumAccumulator` emits — no ingest-time cross-series merge
- `test_late_data_drop`: sample behind `watermark - allowed_lateness_ms` with `Drop` policy → 0 emits
- `test_late_data_forward_to_store`: `ForwardToStore` policy → 1 emit as mini-accumulator with correct window bounds and sum

E2E tests —
`test_e2e_precompute.rs`

Static checks
- `cargo check -p query_engine_rust` passes
- `cargo clippy -p query_engine_rust`