
Add streaming precompute engine: sketch-based windowed aggregation over Prometheus/VictoriaMetrics remote write#162

Open
zzylol wants to merge 38 commits into main from
156-discussion-requirements-and-design-choices-for-a-sketch-based-streaming-pre-compute

Conversation

@zzylol
Contributor

@zzylol zzylol commented Mar 6, 2026

Summary

Design document: precompute_engine_design_doc.md

Introduces the Precompute Engine — a real-time streaming aggregation system that ingests raw time-series samples via Prometheus and VictoriaMetrics remote write, computes windowed sketch aggregations, and writes results directly to the ASAP store for fast query-time retrieval.

Architecture

Prometheus Remote Write       VictoriaMetrics Remote Write
 (Snappy + Protobuf)             (Zstd + Protobuf)
         |                              |
         v                              v
POST /api/v1/write           POST /api/v1/import
          \                           /
           Axum HTTP Server (:9090)
                    |
          route_decoded_samples()
           (group by series key)
                    |
             SeriesRouter (xxhash64)
           /        |        \
      Worker 0   Worker 1   Worker N-1
      (shard 0)  (shard 1)  (shard N-1)
                    |
             OutputSink.emit_batch()
                    |
             SimpleMapStore (PerKey)
                    |
            Query Engine (PromQL / SQL)

Single-machine, multi-threaded: all workers are Tokio async tasks in one process. Series are hash-partitioned across workers — no cross-shard coordination, no locks within a worker.

Key Design Choices

Pane-based incremental sliding windows
The timeline is divided into non-overlapping panes of size slide_interval. Each sliding window is composed of W = window_size / slide_interval consecutive panes. Consecutive windows share W-1 panes.

This avoids re-processing each sample W times (the naïve approach updates every one of the W overlapping windows a sample falls into) — each sample updates exactly 1 pane. Window close merges W panes via AggregateCore::merge_with(). Non-invertible sketches (KLL, CMS, HydraKLL, MinMax, Increase) work correctly because merge is universal; subtraction-based approaches would only work for Sum.

For tumbling windows (W=1), panes degenerate to 1 pane = 1 window — no extra merges.

Supported accumulator types

| Aggregation | Sub-type | Notes |
|---|---|---|
| SingleSubpopulation | Sum, Min/Max, Increase, KLL | per-metric, no grouping |
| MultipleSubpopulation | Sum, Min/Max, Increase, CMS, HydraKLL | grouped by label values |

Late data handling (LateDataPolicy)

  • Drop: samples behind the watermark by more than allowed_lateness_ms are discarded silently.
  • ForwardToStore: late samples are emitted as mini-accumulators alongside the original window; query-time merge (SummaryMergeMultipleExec) combines them automatically.

Zero-serialization write path
Closed window accumulators flow from worker to store as in-memory Box<dyn AggregateCore> trait objects via direct function calls — no serialization, no IPC, no network hops between aggregation and storage. Contrast with the Kafka/Arroyo path where precomputes arrive hex-encoded + gzip-compressed + MessagePack-serialized.

Raw passthrough mode
pass_raw_samples = true bypasses all aggregation. Each sample is emitted as a SumAccumulator::with_sum(value) point-window, enabling store-and-forward of raw data.

Cross-Series (GROUP BY) Aggregation

The AggregationConfig has three label dimensions that control the spatial aggregation shape:

| Field | Role |
|---|---|
| grouping_labels | Store GROUP BY key — labels preserved in PrecomputedOutput.key |
| aggregated_labels | Internal sketch sub-keys (MultipleSubpopulation only) |
| rollup_labels | Dropped entirely at ingest; not recoverable at query time |

A config with grouping_labels: [job] and rollup_labels: [instance] means: "sum across all instances, keep one output series per job value." Multiple input series (metric{job=j1,instance=h1}, metric{job=j1,instance=h2}, ...) all contribute to the same logical output key (job=j1).

How the engine handles this across workers:

Because routing is by full series key (xxhash64(series_key) % N), two series that share a grouping_labels key but differ in rolled-up labels land on different workers:

metric{job=j1, instance=h1} -> Worker 0 -> pane accumulator keyed (job=j1)
metric{job=j1, instance=h2} -> Worker 3 -> pane accumulator keyed (job=j1)

Each worker independently closes its window and emits a separate PrecomputedOutput with key (job=j1) for the same window [0, 60s). The store appends rather than overwrites on the same (aggregation_id, key, window) tuple:

store[(agg_id, key=(j1), [0,60s))] -> [acc_worker0, acc_worker3]

Query-time SummaryMergeMultipleExec merges all entries for the same key and window via AggregateCore::merge_with(), producing the correct combined result. No ingest-time cross-worker coordination is needed.

Eventual consistency property:

Workers have independent watermarks. For a standard Prometheus scrape (all instances delivered in one HTTP batch to route_batch()), all workers receive their samples in the same round-trip and close the window on the same flush cycle. The incompleteness window — the time between the first and last worker emitting for the same cross-series window — is typically in the millisecond range (bounded by worker task scheduling jitter on the shared Tokio thread pool).

For staggered multi-source producers (different series arriving from different remote-write clients at different times), the incompleteness window can be longer, bounded by the spread of producer arrival times. In both cases the result is eventually consistent: once all contributing workers have emitted, the store holds a complete set of accumulators and queries return the correct merged value.

This deferred-merge design is intentional: it avoids any cross-worker coordination or global watermark synchronization at ingest time, keeping the shared-nothing worker architecture intact. The store's append-multiple-per-window design and the query-time merge already handle the fan-in correctly for both the cross-series aggregation case and the ForwardToStore late-data case.

Performance (measured on this machine)

Hardware

  • CPU: 2x Intel Xeon E5-2630 v3 @ 2.40 GHz (8 cores / 16 threads per socket, 32 logical CPUs total, max turbo 3.2 GHz)
  • L3 cache: 20 MiB per socket (40 MiB total)
  • RAM: 125 GiB

All benchmarks run with cargo run --release. Engine and client run in the same process.

Batch latency — 1,000 samples in one HTTP request (raw mode, 4 workers)

| Payload (snappy) | HTTP round-trip |
|---|---|
| 12,253 bytes | 6.1 ms |

Raw-mode flush throughput — 1,000 requests × 10,000 samples (workers = senders, NoopOutputSink)

Measures time from first request sent until all samples are flushed (emitted) from the engine, isolating engine processing throughput from store I/O. Flush lag behind send completion was ≤ 12 ms at all worker counts — the engine flushes as fast as it receives.

| Workers | Send (samples/sec) | Flush E2E (samples/sec) | Speedup |
|---|---|---|---|
| 1 | 465,039 | 465,039 | 1.00x |
| 2 | 918,189 | 917,248 | 1.97x |
| 4 | 1,810,115 | 1,806,470 | 3.88x |
| 8 | 4,377,675 | 4,356,324 | 9.37x |
| 16 | 8,990,338 | 8,900,489 | 19.13x |

Scaling to 16 workers is apparently super-linear (see the note below for why). At 16 workers the engine sustains ~8.9M samples/sec flush throughput on this hardware.

Note on super-linear speedup at 8× and 16× workers: The benchmark uses workers = senders — sender concurrency scales with worker count. The 1-worker baseline is bottlenecked by a single sender: one CPU core for Snappy compression, one in-flight HTTP connection, one channel to fill. At 16 workers, 16 concurrent senders parallelize compression across 16 cores and pipeline 16 HTTP connections simultaneously, pushing aggregate send throughput well past what a single sender can achieve. The apparent 9.37×/19.13× speedup therefore reflects sender-side parallelism as much as engine-side scaling. A clean engine-scaling measurement would fix sender count and vary only worker count; the 1-worker baseline is an artificially low reference point because the sender saturates before the engine does.

For comparison, the same 4-worker raw-mode test with StoreOutputSink (writing 10M samples to SimpleMapStore) achieves 105,293 samples/sec E2E — ~17x lower, confirming the store's RwLock is the bottleneck in the stored path, not the engine itself.

Windowed aggregation throughput — 200 requests × 5,000 samples, 50 series, 4 workers, 4 concurrent senders (NoopOutputSink)

| Config | Send (samples/sec) | E2E (samples/sec) | Batch latency |
|---|---|---|---|
| Tumbling 10s Sum (W=1) | 987,143 | 660,814 | 24.7 ms |
| Sliding 30s/10s Sum (W=3) | 937,540 | 637,977 | 17.4 ms |
| Sliding 60s/10s Sum (W=6) | 1,005,785 | 668,677 | 22.1 ms |

E2E throughput is nearly identical across W=1 and W=6, confirming the pane-based optimization: each sample touches exactly 1 pane regardless of window overlap.

Worker scalability — Sliding 30s/10s Sum (W=3), 200 requests × 5,000 samples, 100 series (NoopOutputSink)

| Workers | Send (samples/sec) | E2E (samples/sec) | Speedup vs 1 worker |
|---|---|---|---|
| 1 | 247,805 | 220,451 | 1.00x |
| 2 | 476,275 | 384,591 | 1.74x |
| 4 | 902,488 | 621,468 | 2.82x |
| 8 | 1,873,707 | 966,405 | 4.38x |
| 16 | 4,160,976 | 1,347,133 | 6.11x |

Windowed aggregation scales less than raw mode because window close merges (W-1 per window) and pane BTreeMap operations add per-worker CPU work that the raw path skips. E2E gains also flatten at high worker counts due to Tokio task scheduling overhead on this 32-logical-CPU machine.

New Files

| File | Purpose |
|---|---|
| precompute_engine/mod.rs | Orchestrator, Axum HTTP server, flush timer |
| precompute_engine/config.rs | PrecomputeEngineConfig, LateDataPolicy |
| precompute_engine/worker.rs | Per-shard processing: watermark, pane buffering, window close, emit; unit tests for all correctness properties |
| precompute_engine/series_router.rs | xxhash64 routing: series key -> worker index |
| precompute_engine/series_buffer.rs | Per-series BTreeMap sample buffer with watermark |
| precompute_engine/window_manager.rs | Tumbling/sliding window alignment, closure detection, pane enumeration |
| precompute_engine/accumulator_factory.rs | AccumulatorUpdater trait + factory for all sketch types |
| precompute_engine/output_sink.rs | OutputSink trait + StoreOutputSink, NoopOutputSink, CapturingOutputSink (for testing) |
| precompute_engine/precompute_engine_design_doc.md | Full design document |
| bin/precompute_engine.rs | Standalone CLI binary |
| bin/test_e2e_precompute.rs | In-process E2E test: ingest -> aggregate -> PromQL query, batch latency and throughput benchmarks, worker scalability benchmark |
| drivers/ingest/prometheus_remote_write.rs | Prometheus remote write decoder (Snappy + protobuf) |
| drivers/ingest/victoriametrics_remote_write.rs | VictoriaMetrics remote write decoder (Zstd + protobuf) |

Configuration

# Standalone binary
cargo run --bin precompute_engine -- \
  --streaming-config streaming_config.yaml \
  --ingest-port 9090 \
  --num-workers 4 \
  --allowed-lateness-ms 5000 \
  --flush-interval-ms 1000 \
  --late-data-policy drop

# Embedded in main binary (shares store with Kafka consumer path)
query_engine_rust --enable-prometheus-remote-write ...

Validation

Unit tests — worker.rs (6 tests via CapturingOutputSink)

CapturingOutputSink stores all (PrecomputedOutput, Box<dyn AggregateCore>) emits in a Mutex<Vec<...>> and exposes drain() / len(). Tests call process_samples and flush_all directly on the Worker struct without spinning up any Tokio tasks or HTTP servers.

| Test | What it verifies |
|---|---|
| test_raw_mode_forwarding | 3 samples → 3 emits; each has start == end == ts and SumAccumulator.sum == value |
| test_tumbling_window_correctness | Samples at t=1s/5s/9s sum to 6; window [0,10s) closes on t=10s sample; start=0, end=10000, sum=6 |
| test_sliding_window_pane_sharing | Single sample at t=15s in 30s/10s window → 2 emits for [0,30s) and [10s,40s), both sum=42 via shared pane snapshot/take |
| test_groupby_separate_emits_per_series | Two series (host=A, host=B) on the same worker produce 2 independent MultipleSumAccumulator emits — no ingest-time cross-series merge |
| test_late_data_drop | Sample behind watermark - allowed_lateness_ms with Drop policy → 0 emits |
| test_late_data_forward_to_store | Late sample for an evicted pane with ForwardToStore policy → 1 emit as mini-accumulator with correct window bounds and sum |

E2E tests — test_e2e_precompute.rs

  • Engine starts in-process, sends Prometheus remote-write batches over HTTP, queries results via PromQL HTTP endpoint, validates aggregated values match expected.
  • Raw-mode verification: 3 samples sent, looked up directly in store.

Static checks

  • cargo check -p query_engine_rust passes
  • Clippy clean (cargo clippy -p query_engine_rust)

@zzylol zzylol changed the title from "Port asap-internal PR #405 into reorged ASAPQuery" to "Port asap-internal PR #405 of hand-crafted precompute engine into reorged ASAPQuery" on Mar 8, 2026
zzylol and others added 23 commits March 9, 2026 05:39
Implements a single-node multi-threaded precompute engine as a new module
and binary target within QueryEngineRust. The engine ingests Prometheus
remote write samples, buffers them per-series with out-of-order handling,
detects closed tumbling/sliding windows via event-time watermarks, feeds
samples into accumulator wrappers for all existing sketch types, and emits
PrecomputedOutput directly to the store.

New modules: config, series_buffer, window_manager, accumulator_factory,
series_router, worker, output_sink, and the PrecomputeEngine orchestrator.
The binary supports embedded store + query HTTP server for single-process
deployment.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace sketch_db_common::enums::QueryLanguage import with
query_engine_rust::data_model::QueryLanguage in the standalone binary,
and integrate PrecomputeEngine into main.rs as an alternative to the
removed PrometheusRemoteWriteServer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Handle top-level aggregation types (DatasketchesKLL, Sum, Min, Max,
etc.) directly in the factory match, fixing the fallback to Sum that
broke quantile queries. Also preserve the K parameter in
KllAccumulatorUpdater::reset() instead of hardcoding 200.

Add test_e2e_precompute binary that validates the full ingest ->
precompute -> store -> query pipeline end-to-end.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When pass_raw_samples is enabled, each incoming sample is emitted
directly to the store as a SumAccumulator without windowing or
watermark advancement. This supports use cases where raw passthrough
is needed instead of sketch-based aggregation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…te engine

Instrument the ingest→worker→store pipeline with debug_span! and an
Instant-based e2e latency log. All spans are at debug level so there is
zero overhead at the default info level. Enable FmtSpan::CLOSE in both
binaries to emit span durations when RUST_LOG=debug.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sends 10 series × 100 samples in a single HTTP request to the raw-mode
engine and verifies all 1000 samples land in the store. Prints client
RTT and per-series e2e_latency_us at debug level.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sends 1000 requests × 10000 samples (50 distinct series) to the
raw-mode engine and polls until all samples are stored. Reports both
send throughput and e2e throughput including drain time.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…dows

Previously each sample was assigned to only one window via
window_start_for(), which is incorrect for sliding windows where
window_size > slide_interval. Added window_starts_containing() that
returns all window starts whose range covers the timestamp, and use
it in the worker aggregation loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…n doc

Fix ghost accumulator bug where late samples passing the allowed_lateness
check could create orphaned accumulators for already-closed windows. Add
configurable LateDataPolicy (Drop/ForwardToStore) to control behavior.
Drop prevents ghost windows; ForwardToStore emits mini-accumulators for
query-time merge. Also add precompute engine design document.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract format-agnostic routing logic into route_decoded_samples() helper
and register a VictoriaMetrics remote write endpoint at /api/v1/import
alongside the existing Prometheus endpoint at /api/v1/write.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Route each sample to exactly 1 pane (sub-window of size slide_interval)
instead of W overlapping window accumulators. When a window closes, merge
its constituent panes via AggregateCore::merge_with(). This reduces
per-sample accumulator updates from W to 1 for sliding windows.

- Add snapshot_accumulator() to AccumulatorUpdater trait (9 implementations)
- Add pane_start_for(), panes_for_window(), slide_interval_ms() to WindowManager
- Replace active_windows HashMap with active_panes BTreeMap in worker
- Rewrite sample routing and window close logic with pane merging
- Extract PrecomputeEngine to engine.rs, ingest handlers to ingest_handler.rs
- Update design doc with pane-based architecture

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add tumbling and sliding window throughput benchmarks to E2E test
  (Tumbling 10s, Sliding 30s/10s W=3, Sliding 60s/10s W=6)
- Each benchmark uses NoopOutputSink to isolate worker throughput
- Fix cargo fmt formatting in window_manager.rs and worker.rs
- Pin sketchlib-rust to rev 663a1df for Chapter enum compatibility
- Fix CMS updater field access (self.acc.inner.{row_num,col_num,sketch})
- Fix SimpleEngine::new missing promsketch_store arg in E2E test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Parameterize run_single_bench with num_workers and add
run_scalability_benchmark() that measures throughput across
1, 2, 4, 8, 16 workers with sliding 30s/10s Sum (W=3).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use tokio::task::spawn_blocking to build request payloads in parallel
across 8 threads, and send HTTP requests concurrently via 8 tokio tasks
(matching the pattern used in run_single_bench).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The 10M-sample throughput test needs ~94s to fully drain through the
store, exceeding the previous 60s deadline.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@zzylol zzylol force-pushed the 156-discussion-requirements-and-design-choices-for-a-sketch-based-streaming-pre-compute branch from 30f2f29 to 369836c Compare March 9, 2026 10:40
@zzylol zzylol changed the title from "Port asap-internal PR #405 of hand-crafted precompute engine into reorged ASAPQuery" to "Add streaming precompute engine: sketch-based windowed aggregation over Prometheus/VictoriaMetrics remote write" on Mar 9, 2026
@zzylol zzylol self-assigned this Mar 9, 2026
zzylol and others added 2 commits March 9, 2026 09:36
Adds a CapturingOutputSink to output_sink.rs that stores all emitted
(PrecomputedOutput, Box<dyn AggregateCore>) pairs in a Mutex<Vec> for
inspection in tests, alongside drain() and len() helpers.

Adds 6 unit tests in worker.rs covering:
- Raw mode: each sample forwarded as SumAccumulator with sum == value
- Tumbling window: correct boundary and aggregated sum on window close
- Sliding window pane sharing: single sample emits in both overlapping
  windows with the correct sum via snapshot/take pane merge
- GROUP BY separate emits: two series on the same worker produce
  independent per-series accumulators (no ingest-time cross-series merge)
- Late data Drop: sample behind watermark - allowed_lateness not emitted
- Late data ForwardToStore: late sample for evicted pane emitted as
  mini-accumulator with correct window bounds and sum

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add section 4 on cross-series GROUP BY aggregation: label dimension
  roles (grouping/aggregated/rollup), cross-worker fan-in via store append,
  and eventual consistency property
- Add CapturingOutputSink to OutputSink implementations (section 3.8)
- Expand section 11 (Testing) with the 6 new worker.rs correctness tests
- Update section 9 (Performance) with measured throughput numbers and a
  note on the workers=senders benchmark caveat causing super-linear speedup
- Renumber sections 5-11 to accommodate the new GROUP BY section
- Update file map entry for output_sink.rs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
zzylol and others added 5 commits March 9, 2026 09:46
Adds a subsection under §4 (Cross-Series GROUP BY Aggregation) that
clarifies how the pane-sharing optimization interacts with cross-worker
fan-in. Key point: pane snapshot/take is an intra-worker detail; each
worker always emits one complete accumulator per closed window regardless
of W, so the store append + query-time merge path is identical for
tumbling and sliding windows.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds an 'Accumulator lifecycle and ownership' subsection to §3.4 (Worker)
explaining the three-level lazy initialisation: configs copied to all
workers at startup, AggregationState created on first sample per series,
AccumulatorUpdater created on first sample per pane. Includes the full
ownership hierarchy diagram and the deterministic-hash ownership guarantee.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…uter

Adds a subsection under §3.3 (SeriesRouter) covering the four failure
modes of hash-mod routing (hash skew, hot series, GROUP BY fan-in,
static assignment) and a concrete mitigation strategy for each:
virtual nodes, weight-aware placement, grouping-key routing /
two-phase aggregation, state migration at window boundaries. Includes
a practical recommendation to add MPSC channel backpressure metrics
as the first observability step before any structural change.

No implementation changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The original comment said "Manages tumbling window boundaries" which
implied it only handled tumbling windows. Updated to reflect that it
handles both tumbling and sliding windows, and clarified the
slide_interval_ms field comment to distinguish the two cases.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…er tests

Adds §12 (Known Data Loss Cases and Fault Tolerance TODOs) to the design
doc covering all six data loss scenarios with a summary table, and four
actionable TODOs:
- Warn on unmatched series (case 4)
- Opt-in force-flush of open panes on shutdown (case 5)
- Worker restart on panic (case 6)
- Write-ahead log / periodic pane snapshots for full crash recovery

Also applies rustfmt formatting fixes to worker.rs test module (import
ordering, line wrapping) to pass cargo fmt --check.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
zzylol and others added 2 commits March 9, 2026 16:28
…nfig YAML

Adds test_worker_from_streaming_config_yaml which parses an inline YAML
string through StreamingConfig::from_yaml_data (the same path the Python
controller output takes at engine startup) and verifies that a Worker
built from the resulting agg_configs correctly closes a tumbling window
and computes the expected sum.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
zzylol and others added 6 commits March 11, 2026 11:00
accumulator_factory.rs:
- Add impl_clone_accumulator_methods! macro to replace 16 identical
  take_accumulator/snapshot_accumulator bodies across 8 updater structs
- Add config_is_keyed() free fn for static keyed check without allocating
  an updater (replaces throwaway-updater pattern used 4+ times)
- Add kll_k_param(), cms_params(), hydra_kll_params() helpers to
  eliminate 4 copy-pasted parameter extraction blocks in the factory
- Fix latent bug: SingleSubpopulation/KLL arm now checks "K" (capital)
  before "k", consistent with all other arms
- Add tests: test_config_is_keyed, test_kll_k_param_capital_k

worker.rs:
- AggregationState.config: AggregationConfig -> Arc<AggregationConfig>
  eliminating N×M deep config copies across series×aggregations
- Worker.agg_configs and Worker::new param updated to Arc map;
  matching_agg_configs returns owned Arc clones (removes lifetime tie)
- Add apply_sample() free fn replacing 3 copies of is_keyed dispatch
- Add merge_panes_for_window() free fn replacing identical ~40-line
  pane-merge loop in both process_samples and flush_all
- Fix ForwardToStore path: is_keyed() and extract_key_from_series()
  each called once instead of twice

engine.rs:
- Wrap aggregation configs in Arc once at engine startup; per-worker
  agg_configs.clone() is now N pointer bumps, not N full deep copies

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Rewrites the "Optional second-tier merge workers" section to fix seven
correctness issues identified in review:

1. Deadlock (critical): workers with no matching series never emitted
   WindowCompletion. Fix: introduce per_agg_watermark per worker,
   advanced by both data arrival and the flush timer (wall-clock floor),
   so every worker emits WindowCompletion for every aggregation on every
   flush cycle regardless of whether it has matching series.

2. WindowCompletion routing (critical): routing partials by
   (agg_id, key, window) while routing key-less completions by
   (agg_id, window) required an O(M) broadcast of completions to all
   merge workers. Fix: route ALL messages — partials and completions —
   by hash(agg_id, window_start, window_end) only, co-locating all keys
   for a window on one merge worker and eliminating the broadcast.

3. Conflicting struct definitions (critical): two contradictory
   WindowCompletion structs were present (one with key, one without).
   Fix: adopt a single key-less definition; explain the rationale.

4. Merge-tier watermark undefined (significant): PendingWindowState had
   no eviction bound. Fix: define merge_watermark as the min of
   worker_watermark_ms values received via completions; add
   force-eviction after eviction_grace_period_ms for lagging workers.

5. Late data re-finalization (significant): "re-open the merged result"
   was unspecified after a window was already written to the store. Fix:
   late corrections remain store appends with query-time merge (same as
   non-merge-tier path); canonical finalized output is never mutated.

6. Message ordering (minor): partials and completions sent through
   separate channels had no ordering guarantee. Fix: both go through the
   same MPSC channel from a given first-tier worker; partials always
   enqueued before the completion for the same window.

7. State structure (minor): per-key completed_sources tracking was
   incompatible with key-less completions (required O(keys) scan).
   Fix: split state into window_completions (window-level) and
   pending_partials (key-level) maps; completion check is O(1),
   finalization scan is O(keys per window) done once at close.

Also updates the rollout strategy to note that per_agg_watermark and
flush-timer-driven WindowCompletion emission must be deployed in
first-tier workers before any merge worker is started.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…arking

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>


Development

Successfully merging this pull request may close these issues.

[Discussion] Requirements and design choices for a sketch-based streaming pre-compute