From 30151a862db5eb71fdafe128812b214d05f22b4b Mon Sep 17 00:00:00 2001 From: zz_y Date: Sat, 21 Feb 2026 15:23:09 -0700 Subject: [PATCH 01/19] Add standalone precompute engine to replace Arroyo streaming pipeline Implements a single-node multi-threaded precompute engine as a new module and binary target within QueryEngineRust. The engine ingests Prometheus remote write samples, buffers them per-series with out-of-order handling, detects closed tumbling/sliding windows via event-time watermarks, feeds samples into accumulator wrappers for all existing sketch types, and emits PrecomputedOutput directly to the store. New modules: config, series_buffer, window_manager, accumulator_factory, series_router, worker, output_sink, and the PrecomputeEngine orchestrator. The binary supports embedded store + query HTTP server for single-process deployment. Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index a5fca27..4373bfe 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -67,6 +67,10 @@ path = "src/bin/precompute_engine.rs" name = "test_e2e_precompute" path = "src/bin/test_e2e_precompute.rs" +[[bin]] +name = "precompute_engine" +path = "src/bin/precompute_engine.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From 19c0f84933476e4949fec07592f38f0784b34e51 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 12:14:53 -0700 Subject: [PATCH 02/19] Wire up DatasketchesKLL in accumulator factory and add E2E test Handle top-level aggregation types (DatasketchesKLL, Sum, Min, Max, etc.) directly in the factory match, fixing the fallback to Sum that broke quantile queries. Also preserve the K parameter in KllAccumulatorUpdater::reset() instead of hardcoding 200. Add test_e2e_precompute binary that validates the full ingest -> precompute -> store -> query pipeline end-to-end. 
Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 4373bfe..2cb7ec3 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -71,6 +71,10 @@ path = "src/bin/test_e2e_precompute.rs" name = "precompute_engine" path = "src/bin/precompute_engine.rs" +[[bin]] +name = "test_e2e_precompute" +path = "src/bin/test_e2e_precompute.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From c822c8b04ec8d1533100e46f0969bd9af89ebe97 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 14:26:27 -0700 Subject: [PATCH 03/19] Add 1000-sample batch latency test to E2E precompute test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sends 10 series × 100 samples in a single HTTP request to the raw-mode engine and verifies all 1000 samples land in the store. Prints client RTT and per-series e2e_latency_us at debug level. Co-Authored-By: Claude Opus 4.6 --- .../src/bin/test_e2e_precompute.rs | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index a8235d0..9fa350c 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -334,6 +334,60 @@ async fn main() -> Result<(), Box> { } println!(" Raw mode test PASSED"); + // ----------------------------------------------------------------------- + // BATCH LATENCY TEST + // Send 1000 samples in a single HTTP request to measure realistic e2e + // latency. Uses the raw-mode engine to avoid window-close dependencies. 
+ // ----------------------------------------------------------------------- + println!("\n=== Batch latency test: 1000 samples in one request ==="); + + // Build a single WriteRequest with 1000 TimeSeries entries spread across + // 10 series × 100 timestamps each, so routing fans out to workers. + let mut batch_timeseries = Vec::with_capacity(1000); + for series_idx in 0..10 { + let label_val = format!("batch_{series_idx}"); + for t in 0..100 { + let ts = 200_000 + (series_idx as i64) * 1000 + t; // unique ts per sample + let val = (series_idx * 100 + t) as f64; + batch_timeseries.push(make_sample("fake_metric", &label_val, ts, val)); + } + } + let batch_body = build_remote_write_body(batch_timeseries); + println!(" Payload size: {} bytes (snappy-compressed)", batch_body.len()); + + let t0 = std::time::Instant::now(); + let resp = client + .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(batch_body) + .send() + .await?; + let client_rtt = t0.elapsed(); + println!( + " HTTP response: {} in {:.3}ms", + resp.status().as_u16(), + client_rtt.as_secs_f64() * 1000.0, + ); + + // Wait for all workers to finish processing + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Verify samples landed in the store + let batch_results = store.query_precomputed_output( + "fake_metric", + raw_agg_id, + 200_000, + 210_000, + )?; + let batch_buckets: usize = batch_results.values().map(|v| v.len()).sum(); + println!(" Stored {batch_buckets} buckets from batch (expected 1000)"); + assert!( + batch_buckets >= 1000, + "Expected at least 1000 raw samples in store, got {batch_buckets}" + ); + println!(" Batch latency test PASSED"); + println!("\n=== E2E test complete ==="); Ok(()) From eedc6ce7d762a9b002182d03f7715d292336e6ef Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 14:35:18 -0700 Subject: [PATCH 04/19] Add 10M-sample throughput test 
to E2E precompute test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sends 1000 requests × 10000 samples (50 distinct series) to the raw-mode engine and polls until all samples are stored. Reports both send throughput and e2e throughput including drain time. Co-Authored-By: Claude Opus 4.6 --- .../src/bin/test_e2e_precompute.rs | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 9fa350c..be1e8bc 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -388,6 +388,89 @@ async fn main() -> Result<(), Box> { ); println!(" Batch latency test PASSED"); + // ----------------------------------------------------------------------- + // THROUGHPUT TEST + // Send many requests back-to-back and measure sustained throughput + // (samples/sec). Uses the raw-mode engine for a clean measurement. 
+ // ----------------------------------------------------------------------- + println!("\n=== Throughput test: 1000 requests × 10000 samples ==="); + + let num_requests = 1000u64; + let samples_per_request = 10_000u64; + let total_samples = num_requests * samples_per_request; + + // Pre-build all request bodies so serialization doesn't count against throughput + let mut bodies = Vec::with_capacity(num_requests as usize); + for req_idx in 0..num_requests { + let mut timeseries = Vec::with_capacity(samples_per_request as usize); + for s in 0..samples_per_request { + let series_label = format!("tp_{}", s % 50); // 50 distinct series + let ts = 300_000 + req_idx as i64 * 10_000 + s as i64; + timeseries.push(make_sample("fake_metric", &series_label, ts, s as f64)); + } + bodies.push(build_remote_write_body(timeseries)); + } + + let throughput_start = std::time::Instant::now(); + + for (i, body) in bodies.into_iter().enumerate() { + let resp = client + .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + if resp.status() != reqwest::StatusCode::NO_CONTENT { + eprintln!(" Request {i} failed: {}", resp.status()); + } + } + + let send_elapsed = throughput_start.elapsed(); + println!( + " All {} requests sent in {:.1}ms", + num_requests, + send_elapsed.as_secs_f64() * 1000.0, + ); + + // Poll until workers drain or timeout after 60s + let max_ts = 300_000u64 + num_requests * 10_000 + samples_per_request; + let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60); + let mut tp_buckets: usize; + loop { + let tp_results = store.query_precomputed_output( + "fake_metric", + raw_agg_id, + 300_000, + max_ts, + )?; + tp_buckets = tp_results.values().map(|v| v.len()).sum(); + if tp_buckets as u64 >= total_samples || std::time::Instant::now() > drain_deadline { + break; + } + 
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + let total_elapsed = throughput_start.elapsed(); + + let send_throughput = total_samples as f64 / send_elapsed.as_secs_f64(); + let e2e_throughput = tp_buckets as f64 / total_elapsed.as_secs_f64(); + println!(" Stored {tp_buckets}/{total_samples} samples"); + println!( + " Send throughput: {:.0} samples/sec ({:.1}ms for {total_samples} samples)", + send_throughput, + send_elapsed.as_secs_f64() * 1000.0, + ); + println!( + " E2E throughput: {:.0} samples/sec ({:.1}ms until all stored)", + e2e_throughput, + total_elapsed.as_secs_f64() * 1000.0, + ); + assert!( + tp_buckets as u64 >= total_samples, + "Expected at least {total_samples} samples in store, got {tp_buckets}" + ); + println!(" Throughput test PASSED"); + println!("\n=== E2E test complete ==="); Ok(()) From b749ac3e44837945fc5b46919ac456f8d5618882 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 17:42:19 -0700 Subject: [PATCH 05/19] update --- .../src/bin/test_e2e_precompute.rs | 2 +- .../src/precompute_engine/mod.rs | 33 ++++++++------ .../src/precompute_engine/series_router.rs | 43 +++++++++++++++++++ 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index be1e8bc..38c2cd2 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -275,7 +275,7 @@ async fn main() -> Result<(), Box> { // Pick aggregation_id = 1 to match the existing streaming config. 
let raw_agg_id: u64 = 1; let raw_engine_config = PrecomputeEngineConfig { - num_workers: 1, + num_workers: 4, ingest_port: RAW_INGEST_PORT, allowed_lateness_ms: 5000, max_buffer_per_series: 10000, diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index c7fe7e9..5471618 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -15,9 +15,9 @@ use crate::precompute_engine::worker::Worker; use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router}; use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; use tokio::net::TcpListener; use tokio::sync::mpsc; +use std::time::Instant; use tracing::{debug_span, info, warn, Instrument}; /// Shared state for the ingest HTTP handler. @@ -67,8 +67,10 @@ impl PrecomputeEngine { let router = SeriesRouter::new(senders); // Build aggregation config map from streaming config - let agg_configs: HashMap = - self.streaming_config.get_all_aggregation_configs().clone(); + let agg_configs: HashMap = self + .streaming_config + .get_all_aggregation_configs() + .clone(); // Spawn workers let mut worker_handles = Vec::with_capacity(num_workers); @@ -136,7 +138,10 @@ impl PrecomputeEngine { } /// Axum handler for Prometheus remote write. 
-async fn handle_ingest(State(state): State>, body: Bytes) -> StatusCode { +async fn handle_ingest( + State(state): State>, + body: Bytes, +) -> StatusCode { let ingest_span = debug_span!("ingest", body_len = body.len()); let ingest_received_at = Instant::now(); @@ -167,16 +172,16 @@ async fn handle_ingest(State(state): State>, body: Bytes) -> St .push((s.timestamp_ms, s.value)); } - // Route each series batch to the correct worker - for (series_key, batch) in by_series { - if let Err(e) = state - .router - .route(series_key, batch, ingest_received_at) - .await - { - warn!("Routing error for {}: {}", series_key, e); - return StatusCode::INTERNAL_SERVER_ERROR; - } + // Convert to owned keys for batch routing + let by_series_owned: HashMap> = by_series + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(); + + // Route all series to workers concurrently + if let Err(e) = state.router.route_batch(by_series_owned, ingest_received_at).await { + warn!("Batch routing error: {}", e); + return StatusCode::INTERNAL_SERVER_ERROR; } StatusCode::NO_CONTENT diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index 1f8533c..dc6c642 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -1,4 +1,6 @@ +use std::collections::HashMap; use std::time::Instant; +use futures::future::try_join_all; use tokio::sync::mpsc; use xxhash_rust::xxh64::xxh64; @@ -52,6 +54,47 @@ impl SeriesRouter { Ok(()) } + /// Route a pre-grouped batch of series to workers concurrently. + /// + /// Groups messages by target worker, then sends to each worker in parallel + /// (messages within a single worker are sent sequentially to preserve ordering). 
+ pub async fn route_batch( + &self, + by_series: HashMap>, + ingest_received_at: Instant, + ) -> Result<(), Box> { + // Group messages by target worker index + let mut per_worker: HashMap> = HashMap::new(); + for (series_key, samples) in by_series { + let worker_idx = self.worker_for(&series_key); + per_worker + .entry(worker_idx) + .or_default() + .push(WorkerMessage::Samples { + series_key, + samples, + ingest_received_at, + }); + } + + // Send to each worker concurrently + try_join_all(per_worker.into_iter().map(|(worker_idx, messages)| { + let sender = &self.senders[worker_idx]; + async move { + for msg in messages { + sender.send(msg).await.map_err(|e| { + format!("Failed to send to worker {}: {}", worker_idx, e) + })?; + } + Ok::<(), String>(()) + } + })) + .await + .map_err(|e| -> Box { Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) })?; + + Ok(()) + } + /// Broadcast a flush signal to all workers. pub async fn broadcast_flush(&self) -> Result<(), Box> { for (i, sender) in self.senders.iter().enumerate() { From 685655ebf28097e51f073aa444f7863a36d08883 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 24 Feb 2026 12:25:22 -0700 Subject: [PATCH 06/19] Fix sliding window aggregation: feed samples into all overlapping windows Previously each sample was assigned to only one window via window_start_for(), which is incorrect for sliding windows where window_size > slide_interval. Added window_starts_containing() that returns all window starts whose range covers the timestamp, and use it in the worker aggregation loop. 
Co-Authored-By: Claude Opus 4.6 --- .../src/precompute_engine/window_manager.rs | 44 +++++++++++++++++++ .../src/precompute_engine/worker.rs | 26 ++++++----- 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/asap-query-engine/src/precompute_engine/window_manager.rs b/asap-query-engine/src/precompute_engine/window_manager.rs index 4d329da..d8bf23b 100644 --- a/asap-query-engine/src/precompute_engine/window_manager.rs +++ b/asap-query-engine/src/precompute_engine/window_manager.rs @@ -72,6 +72,20 @@ impl WindowManager { closed } + /// Return all window starts whose window `[start, start + window_size_ms)` + /// contains the given timestamp. For tumbling windows this returns exactly + /// one start; for sliding windows it returns `ceil(window_size / slide)` + /// starts. + pub fn window_starts_containing(&self, timestamp_ms: i64) -> Vec { + let mut starts = Vec::new(); + let mut start = self.window_start_for(timestamp_ms); + while start + self.window_size_ms > timestamp_ms { + starts.push(start); + start -= self.slide_interval_ms; + } + starts + } + /// Return the window `[start, end)` boundaries for a given window start. 
pub fn window_bounds(&self, window_start: i64) -> (i64, i64) { (window_start, window_start + self.window_size_ms) @@ -150,4 +164,34 @@ mod tests { let closed = wm.closed_windows(15_000, 35_000); assert_eq!(closed, vec![0]); } + + #[test] + fn test_window_starts_containing_tumbling() { + // 60s tumbling windows — each sample belongs to exactly one window + let wm = WindowManager::new(60, 0); + let mut starts = wm.window_starts_containing(15_000); + starts.sort(); + assert_eq!(starts, vec![0]); + + let mut starts = wm.window_starts_containing(60_000); + starts.sort(); + assert_eq!(starts, vec![60_000]); + } + + #[test] + fn test_window_starts_containing_sliding() { + // 30s window, 10s slide — each sample belongs to 3 windows + let wm = WindowManager::new(30, 10); + + // t=15_000 belongs to [0, 30_000), [10_000, 40_000) + // and [-10_000, 20_000) which starts negative — still returned + let mut starts = wm.window_starts_containing(15_000); + starts.sort(); + assert_eq!(starts, vec![-10_000, 0, 10_000]); + + // t=30_000 belongs to [10_000, 40_000), [20_000, 50_000), [30_000, 60_000) + let mut starts = wm.window_starts_containing(30_000); + starts.sort(); + assert_eq!(starts, vec![10_000, 20_000, 30_000]); + } } diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index fac2d23..05c0cd2 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -221,18 +221,20 @@ impl Worker { continue; // already dropped } - let window_start = agg_state.window_manager.window_start_for(ts); - - let updater = agg_state - .active_windows - .entry(window_start) - .or_insert_with(|| create_accumulator_updater(&agg_state.config)); - - if updater.is_keyed() { - let key = extract_key_from_series(series_key, &agg_state.config); - updater.update_keyed(&key, val, ts); - } else { - updater.update_single(val, ts); + let window_starts = 
agg_state.window_manager.window_starts_containing(ts); + + for window_start in window_starts { + let updater = agg_state + .active_windows + .entry(window_start) + .or_insert_with(|| create_accumulator_updater(&agg_state.config)); + + if updater.is_keyed() { + let key = extract_key_from_series(series_key, &agg_state.config); + updater.update_keyed(&key, val, ts); + } else { + updater.update_single(val, ts); + } } } From 8df492799ec10943f817ceacf869c74cdeb0b5b1 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:11:27 -0500 Subject: [PATCH 07/19] fix: remove duplicate [[bin]] entries in Cargo.toml from rebase merge Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/Cargo.toml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 2cb7ec3..a5fca27 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -67,14 +67,6 @@ path = "src/bin/precompute_engine.rs" name = "test_e2e_precompute" path = "src/bin/test_e2e_precompute.rs" -[[bin]] -name = "precompute_engine" -path = "src/bin/precompute_engine.rs" - -[[bin]] -name = "test_e2e_precompute" -path = "src/bin/test_e2e_precompute.rs" - [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From 3ea0d58d1491f22210c6e5f5ae3c671819ecea88 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:13:39 -0500 Subject: [PATCH 08/19] style: apply cargo fmt Co-Authored-By: Claude Sonnet 4.6 --- .../src/bin/test_e2e_precompute.rs | 21 +++++++------------ .../src/precompute_engine/mod.rs | 19 ++++++++--------- .../src/precompute_engine/series_router.rs | 13 +++++++----- 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 38c2cd2..27eb820 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -353,7 +353,10 @@ async fn main() -> Result<(), Box> { } 
} let batch_body = build_remote_write_body(batch_timeseries); - println!(" Payload size: {} bytes (snappy-compressed)", batch_body.len()); + println!( + " Payload size: {} bytes (snappy-compressed)", + batch_body.len() + ); let t0 = std::time::Instant::now(); let resp = client @@ -374,12 +377,8 @@ async fn main() -> Result<(), Box> { tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; // Verify samples landed in the store - let batch_results = store.query_precomputed_output( - "fake_metric", - raw_agg_id, - 200_000, - 210_000, - )?; + let batch_results = + store.query_precomputed_output("fake_metric", raw_agg_id, 200_000, 210_000)?; let batch_buckets: usize = batch_results.values().map(|v| v.len()).sum(); println!(" Stored {batch_buckets} buckets from batch (expected 1000)"); assert!( @@ -438,12 +437,8 @@ async fn main() -> Result<(), Box> { let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60); let mut tp_buckets: usize; loop { - let tp_results = store.query_precomputed_output( - "fake_metric", - raw_agg_id, - 300_000, - max_ts, - )?; + let tp_results = + store.query_precomputed_output("fake_metric", raw_agg_id, 300_000, max_ts)?; tp_buckets = tp_results.values().map(|v| v.len()).sum(); if tp_buckets as u64 >= total_samples || std::time::Instant::now() > drain_deadline { break; diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index 5471618..1f3f1df 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -15,9 +15,9 @@ use crate::precompute_engine::worker::Worker; use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use tokio::net::TcpListener; use tokio::sync::mpsc; -use std::time::Instant; use tracing::{debug_span, info, warn, Instrument}; /// Shared state for the ingest HTTP handler. 
@@ -67,10 +67,8 @@ impl PrecomputeEngine { let router = SeriesRouter::new(senders); // Build aggregation config map from streaming config - let agg_configs: HashMap = self - .streaming_config - .get_all_aggregation_configs() - .clone(); + let agg_configs: HashMap = + self.streaming_config.get_all_aggregation_configs().clone(); // Spawn workers let mut worker_handles = Vec::with_capacity(num_workers); @@ -138,10 +136,7 @@ impl PrecomputeEngine { } /// Axum handler for Prometheus remote write. -async fn handle_ingest( - State(state): State>, - body: Bytes, -) -> StatusCode { +async fn handle_ingest(State(state): State>, body: Bytes) -> StatusCode { let ingest_span = debug_span!("ingest", body_len = body.len()); let ingest_received_at = Instant::now(); @@ -179,7 +174,11 @@ async fn handle_ingest( .collect(); // Route all series to workers concurrently - if let Err(e) = state.router.route_batch(by_series_owned, ingest_received_at).await { + if let Err(e) = state + .router + .route_batch(by_series_owned, ingest_received_at) + .await + { warn!("Batch routing error: {}", e); return StatusCode::INTERNAL_SERVER_ERROR; } diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index dc6c642..1cb4fca 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -1,6 +1,6 @@ +use futures::future::try_join_all; use std::collections::HashMap; use std::time::Instant; -use futures::future::try_join_all; use tokio::sync::mpsc; use xxhash_rust::xxh64::xxh64; @@ -82,15 +82,18 @@ impl SeriesRouter { let sender = &self.senders[worker_idx]; async move { for msg in messages { - sender.send(msg).await.map_err(|e| { - format!("Failed to send to worker {}: {}", worker_idx, e) - })?; + sender + .send(msg) + .await + .map_err(|e| format!("Failed to send to worker {}: {}", worker_idx, e))?; } Ok::<(), String>(()) } })) .await - .map_err(|e| -> Box { 
Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) })?; + .map_err(|e| -> Box { + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) + })?; Ok(()) } From bf4c213e31921d170c707cb0c3a11eb719d197a9 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:18:53 -0500 Subject: [PATCH 09/19] fix: use std::io::Error::other per clippy lint Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/src/precompute_engine/series_router.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index 1cb4fca..94d757a 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -92,7 +92,7 @@ impl SeriesRouter { })) .await .map_err(|e| -> Box { - Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) + Box::new(std::io::Error::other(e)) })?; Ok(()) From b7e1b02ed8981054a7f082ebe85915a37efd1744 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:20:59 -0500 Subject: [PATCH 10/19] fix: remove unnecessary cast per clippy lint Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/src/bin/test_e2e_precompute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 27eb820..1cb3fe1 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -347,7 +347,7 @@ async fn main() -> Result<(), Box> { for series_idx in 0..10 { let label_val = format!("batch_{series_idx}"); for t in 0..100 { - let ts = 200_000 + (series_idx as i64) * 1000 + t; // unique ts per sample + let ts = 200_000 + series_idx * 1000 + t; // unique ts per sample let val = (series_idx * 100 + t) as f64; batch_timeseries.push(make_sample("fake_metric", &label_val, ts, val)); } From 
7c76d04c147615e0c637d1d86ed61a42a58047b6 Mon Sep 17 00:00:00 2001 From: zz_y Date: Sat, 21 Feb 2026 15:23:09 -0700 Subject: [PATCH 11/19] Add standalone precompute engine to replace Arroyo streaming pipeline Implements a single-node multi-threaded precompute engine as a new module and binary target within QueryEngineRust. The engine ingests Prometheus remote write samples, buffers them per-series with out-of-order handling, detects closed tumbling/sliding windows via event-time watermarks, feeds samples into accumulator wrappers for all existing sketch types, and emits PrecomputedOutput directly to the store. New modules: config, series_buffer, window_manager, accumulator_factory, series_router, worker, output_sink, and the PrecomputeEngine orchestrator. The binary supports embedded store + query HTTP server for single-process deployment. Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index a5fca27..4373bfe 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -67,6 +67,10 @@ path = "src/bin/precompute_engine.rs" name = "test_e2e_precompute" path = "src/bin/test_e2e_precompute.rs" +[[bin]] +name = "precompute_engine" +path = "src/bin/precompute_engine.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From e66e6e90327e460f2ded4ea1950605ab9ea84a99 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 12:14:53 -0700 Subject: [PATCH 12/19] Wire up DatasketchesKLL in accumulator factory and add E2E test Handle top-level aggregation types (DatasketchesKLL, Sum, Min, Max, etc.) directly in the factory match, fixing the fallback to Sum that broke quantile queries. Also preserve the K parameter in KllAccumulatorUpdater::reset() instead of hardcoding 200. Add test_e2e_precompute binary that validates the full ingest -> precompute -> store -> query pipeline end-to-end. 
Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 4373bfe..2cb7ec3 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -71,6 +71,10 @@ path = "src/bin/test_e2e_precompute.rs" name = "precompute_engine" path = "src/bin/precompute_engine.rs" +[[bin]] +name = "test_e2e_precompute" +path = "src/bin/test_e2e_precompute.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From ba792003e2fe9f7b9c710b681c5d50f69f925aad Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 26 Feb 2026 15:20:40 -0700 Subject: [PATCH 13/19] Add late data handling for closed windows and precompute engine design doc Fix ghost accumulator bug where late samples passing the allowed_lateness check could create orphaned accumulators for already-closed windows. Add configurable LateDataPolicy (Drop/ForwardToStore) to control behavior. Drop prevents ghost windows; ForwardToStore emits mini-accumulators for query-time merge. Also add precompute engine design document. 
Co-Authored-By: Claude Opus 4.6 --- .../src/bin/precompute_engine.rs | 7 +- .../src/bin/test_e2e_precompute.rs | 4 +- asap-query-engine/src/lib.rs | 2 +- .../src/precompute_engine/config.rs | 13 + .../src/precompute_engine/mod.rs | 1 + .../precompute_engine_design_doc.md | 704 +++++++++--------- .../src/precompute_engine/worker.rs | 51 ++ 7 files changed, 431 insertions(+), 351 deletions(-) diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs index e0ec1fe..d5944a0 100644 --- a/asap-query-engine/src/bin/precompute_engine.rs +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -5,7 +5,7 @@ use query_engine_rust::data_model::{ }; use query_engine_rust::drivers::query::adapters::AdapterConfig; use query_engine_rust::engines::SimpleEngine; -use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink}; use query_engine_rust::precompute_engine::PrecomputeEngine; use query_engine_rust::stores::SimpleMapStore; @@ -61,6 +61,10 @@ struct Args { /// Aggregation ID to stamp on each raw-mode output #[arg(long, default_value_t = 0)] raw_mode_aggregation_id: u64, + + /// Policy for handling late samples that arrive after their window has closed + #[arg(long, value_enum, default_value_t = LateDataPolicy::Drop)] + late_data_policy: LateDataPolicy, } #[tokio::main] @@ -131,6 +135,7 @@ async fn main() -> Result<(), Box> { channel_buffer_size: args.channel_buffer_size, pass_raw_samples: args.pass_raw_samples, raw_mode_aggregation_id: args.raw_mode_aggregation_id, + late_data_policy: args.late_data_policy, }; // Create the output sink (writes directly to the store) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 1cb3fe1..f6751cb 100644 --- 
a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -15,7 +15,7 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{ }; use query_engine_rust::drivers::query::adapters::AdapterConfig; use query_engine_rust::engines::SimpleEngine; -use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink}; use query_engine_rust::precompute_engine::PrecomputeEngine; use query_engine_rust::stores::SimpleMapStore; @@ -144,6 +144,7 @@ async fn main() -> Result<(), Box> { channel_buffer_size: 10000, pass_raw_samples: false, raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, }; let output_sink = Arc::new(StoreOutputSink::new(store.clone())); let engine = PrecomputeEngine::new(engine_config, streaming_config.clone(), output_sink); @@ -283,6 +284,7 @@ async fn main() -> Result<(), Box> { channel_buffer_size: 10000, pass_raw_samples: true, raw_mode_aggregation_id: raw_agg_id, + late_data_policy: LateDataPolicy::Drop, }; let raw_sink = Arc::new(RawPassthroughSink::new(store.clone())); let raw_engine = PrecomputeEngine::new(raw_engine_config, streaming_config.clone(), raw_sink); diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index a76efeb..5532db8 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -43,7 +43,7 @@ pub use drivers::{ OtlpReceiverConfig, }; -pub use precompute_engine::config::PrecomputeEngineConfig; +pub use precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; pub use precompute_engine::output_sink::StoreOutputSink; pub use precompute_engine::PrecomputeEngine; diff --git a/asap-query-engine/src/precompute_engine/config.rs b/asap-query-engine/src/precompute_engine/config.rs index 7bc8df9..c509ae5 100644 --- 
a/asap-query-engine/src/precompute_engine/config.rs +++ b/asap-query-engine/src/precompute_engine/config.rs @@ -1,5 +1,14 @@ use serde::{Deserialize, Serialize}; +/// Policy for handling late samples that arrive after their window has closed. +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, clap::ValueEnum)] +pub enum LateDataPolicy { + /// Drop late samples that arrive after their window has closed. + Drop, + /// Forward late samples to the store to be merged with existing window data. + ForwardToStore, +} + /// Configuration for the precompute engine. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PrecomputeEngineConfig { @@ -21,6 +30,8 @@ pub struct PrecomputeEngineConfig { pub pass_raw_samples: bool, /// Aggregation ID to stamp on each raw-mode output. pub raw_mode_aggregation_id: u64, + /// Policy for handling late samples that arrive after their window has closed. + pub late_data_policy: LateDataPolicy, } impl Default for PrecomputeEngineConfig { @@ -34,6 +45,7 @@ impl Default for PrecomputeEngineConfig { channel_buffer_size: 10_000, pass_raw_samples: false, raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, } } } @@ -53,5 +65,6 @@ mod tests { assert_eq!(config.channel_buffer_size, 10_000); assert!(!config.pass_raw_samples); assert_eq!(config.raw_mode_aggregation_id, 0); + assert_eq!(config.late_data_policy, LateDataPolicy::Drop); } } diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index 1f3f1df..7324f26 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -82,6 +82,7 @@ impl PrecomputeEngine { self.config.allowed_lateness_ms, self.config.pass_raw_samples, self.config.raw_mode_aggregation_id, + self.config.late_data_policy, ); let handle = tokio::spawn(async move { worker.run().await; diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md 
b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index 471bb34..f59a42f 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -2,422 +2,430 @@ ## 1. Overview -### Why this PR is needed - -ASAPQuery already has a query path over precomputed summaries, but before this PR -there was no standalone runtime inside `asap-query-engine` that could continuously: - -- accept raw metric samples, -- turn them into windowed precomputed outputs, and -- write those outputs into the same store that the query engine reads. - -PR #228 fills that gap by introducing a first working version of a **precompute -engine**. The engine runs as a separate binary, accepts Prometheus remote write -traffic, partitions incoming series across workers, computes windowed -accumulators, and stores the results for later query-time retrieval. - -### Why not ArroyoSketch? - -The existing precompute path — **ArroyoSketch** (`asap-summary-ingest/run_arroyosketch.py`) -— already performs windowed sketch aggregation, but it does so through an -entirely separate operational stack: - -| Dimension | ArroyoSketch | Precompute Engine (this PR) | -|---|---|---| -| **Runtime** | External Arroyo cluster (separate process, separate binary) | In-process Rust binary alongside the query engine | -| **Orchestration language** | Python (Jinja2 SQL templates deployed via REST API to Arroyo) | Native Rust, driven directly by `StreamingConfig` | -| **Ingest transport** | Kafka topic or Prometheus remote write → Arroyo pipeline | Prometheus remote write directly to the engine | -| **Output transport** | Kafka topic → consumed by a separate pipeline stage | Direct write to the store already read by the query engine | -| **Operational dependencies** | Arroyo cluster + Kafka brokers must be running and healthy | None beyond the query engine process itself | -| **Configuration coupling** | 
Arroyo pipeline SQL is rendered from `streaming_config.yaml` by a Python script; any config change requires re-deploying pipelines via the Arroyo REST API | Engine reads `StreamingConfig` directly at startup; same structs used throughout `asap-query-engine` | -| **Failure boundary** | Arroyo crash or Kafka lag is invisible to the query engine until queries begin returning stale results | Precompute workers and query engine share the same process and store; failures surface immediately | - -In short, ArroyoSketch trades simplicity for power: it is a general-purpose -streaming SQL engine that can express complex multi-stage pipelines, but it -requires standing up and operating Arroyo and Kafka as separate infrastructure. -That operational overhead is the main barrier to running the precompute path in -development, in CI, or in environments where Kafka is not already present. - -This PR replaces the ingest-and-aggregate role of ArroyoSketch with a -self-contained Rust implementation that has no external service dependencies, -shares the same store and configuration types as the rest of `asap-query-engine`, -and can be validated end to end in a single process. ArroyoSketch remains useful -as a production deployment option when Arroyo and Kafka are already available, -but the precompute engine is the path forward for native integration within the -Rust codebase. - -This PR is primarily about establishing the end-to-end execution path and the -core abstractions: - -- ingest endpoint, -- worker sharding model, -- window management, -- accumulator construction and update, -- output sink abstraction, and -- integration with the existing store and query engine. - -### Requirements - -The implementation in this PR is driven by the following requirements: - -1. ASAPQuery needs a native precompute path inside the Rust query engine codebase. -2. The system must ingest a high volume of time-series samples without forcing - cross-worker coordination on every sample. -3. 
Samples for the same series must be processed consistently by the same worker - so per-series state can stay local. -4. The engine must support windowed precomputation for the aggregation - configurations already defined in `StreamingConfig`. -5. The output must be written in the same `PrecomputedOutput` form already - consumed by the store and query engine. -6. The design must stay simple enough to validate correctness end to end before - adding more advanced features such as richer late-data policies or multi-stage - aggregation. - -### Scope of this PR - -This PR delivers a pragmatic v1: - -- single-process, multi-worker execution, -- Prometheus remote write ingest, -- store-backed output, -- watermark-based window closing, -- bounded per-series buffering, -- best-effort handling of out-of-order data via a lateness threshold, -- optional raw passthrough mode. - -It does **not** try to solve every future concern yet. In particular, it does -not add multi-stage aggregation, explicit late-data re-emission policies, -cross-worker merge coordination, or pane-based sliding-window optimization. +The Precompute Engine is a real-time streaming aggregation system that sits between +Prometheus-compatible metric producers and the ASAP store and query engine. It accepts raw +time-series samples via the Prometheus remote-write protocol, buffers them, computes +windowed aggregations (sketches), and writes the results to a store for fast +query-time retrieval. + +**Key properties:** +- Watermark-based windowed aggregation (tumbling and sliding windows) +- Multi-threaded, shared-nothing worker architecture +- Pluggable accumulator types (Sum, Min/Max, Increase, KLL, CMS, HydraKLL) +- Configurable late-data handling (Drop or ForwardToStore) +- Optional raw passthrough mode for bypassing aggregation ## 2.
Architecture -### High-level data flow - -```text -Prometheus Remote Write - | - v -POST /api/v1/write (Axum) - | - v -decode_prometheus_remote_write() - | - v -group samples by series key - | - v -SeriesRouter (xxhash(series_key) % num_workers) - | - +-------------------+-------------------+-------------------+ - | | | | - v v v v - Worker 0 Worker 1 Worker 2 Worker N-1 - | | | | - | per-series buffer + per-aggregation active windows | - +-------------------+-------------------+-------------------+ - | - v - OutputSink::emit_batch() - | - v - Store - | - v - Query Engine ``` - -### Main components - -#### `PrecomputeEngine` (`mod.rs`) - -`PrecomputeEngine` is the top-level orchestrator. It: - -- loads aggregation configs from `StreamingConfig`, -- creates one bounded MPSC channel per worker, -- builds a `SeriesRouter`, -- spawns worker tasks, -- starts the ingest HTTP server, and -- starts a periodic flush loop. - -The engine keeps the worker model intentionally simple: workers are symmetric, -and routing is deterministic. - -#### `SeriesRouter` (`series_router.rs`) - -The router computes: - -```text -worker_idx = xxhash64(series_key) % num_workers + Prometheus Remote Write + | + Axum HTTP Server (:9090) + POST /api/v1/write + | + decode + group by series + | + SeriesRouter (hash) + / | \ + Worker 0 Worker 1 Worker 2 ... Worker N-1 + (shard 0) (shard 1) (shard 2) (shard N-1) + | | | | + +---------- + --------- + ----------- + + | + OutputSink.emit_batch() + | + Store + (SimpleMapStore / PerKey) + | + Query Engine + (PromQL / SQL / etc.) ``` -This guarantees that all samples for one exact series key go to the same worker. -That is the main design decision that keeps worker-local state lock-free. - -#### `Worker` (`worker.rs`) - -Each worker owns a shard of the series space. For each series it stores: - -- a `SeriesBuffer`, -- the previous watermark seen for that series, -- one `AggregationState` per matching aggregation config. 
- -Each `AggregationState` contains: - -- the copied `AggregationConfig`, -- a `WindowManager`, -- a map of active window accumulators. - -Workers receive `Samples`, `Flush`, and `Shutdown` messages. On samples, the -worker inserts data into the series buffer, applies lateness filtering, updates -active window accumulators, detects newly closed windows, and emits completed -accumulators to the sink. - -#### `SeriesBuffer` (`series_buffer.rs`) - -The buffer stores timestamped samples per series in timestamp order and tracks a -monotonic watermark. It is bounded by `max_buffer_per_series`, which prevents a -single hot or stalled series from growing unbounded in memory. - -#### `WindowManager` (`window_manager.rs`) - -`WindowManager` encapsulates window boundary logic: - -- map a timestamp to an aligned window start, -- decide which windows became closed after watermark advancement, -- return `[window_start, window_end)` bounds. - -The current implementation supports both tumbling and slide-aligned window -closure logic. Window close is driven by event-time watermark progression, not -wall-clock time. - -#### `AccumulatorUpdater` factory (`accumulator_factory.rs`) - -Workers do not hardcode sketch logic. Instead, they construct accumulator -updaters from the aggregation config. This keeps the precompute engine generic -across supported aggregation types and lets it emit the same accumulator objects -already used elsewhere in ASAPQuery. - -#### `OutputSink` (`output_sink.rs`) - -`OutputSink` separates computation from persistence. This PR ships three useful -implementations: - -- `StoreOutputSink` for normal precompute writes, -- `RawPassthroughSink` for writing raw samples as `SumAccumulator`s, -- `NoopOutputSink` for tests. - -### Execution model - -The execution model is: +A periodic **flush timer** broadcasts `Flush` messages to all workers so that +windows that would otherwise remain open (no new samples arriving) are closed +and emitted. -1. 
Decode one remote-write request. -2. Group samples by exact series key. -3. Route each grouped batch to one worker. -4. Process series state only on that worker. -5. Emit completed windows in batches to the sink. +## 3. Components -This design avoids per-sample cross-worker synchronization and keeps the first -version operationally understandable. +### 3.1 PrecomputeEngine (`mod.rs`) -## 3. Key Features Derived From the Requirements +Top-level orchestrator. On `run()`: -### Deterministic per-series routing +1. Creates one `mpsc::channel` per worker. +2. Constructs a `SeriesRouter` with the sender halves. +3. Spawns `Worker` tasks, each owning its receiver. +4. Spawns a flush timer that calls `router.broadcast_flush()` every + `flush_interval_ms`. +5. Starts the Axum HTTP server and blocks until shutdown. -Requirement: samples for one series must share local state. - -Derived feature: the hash-based router always sends the same series key to the -same worker. This means: - -- no shared mutable state across workers for a given series, -- no locking around per-series accumulators, -- predictable ownership of series-local watermarks and buffers. 
+```rust +pub struct PrecomputeEngine { + config: PrecomputeEngineConfig, + streaming_config: Arc, + output_sink: Arc, +} +``` -### Config-driven aggregation matching +### 3.2 Configuration (`config.rs`) + +```rust +pub struct PrecomputeEngineConfig { + pub num_workers: usize, // default: 4 + pub ingest_port: u16, // default: 9090 + pub allowed_lateness_ms: i64, // default: 5,000 + pub max_buffer_per_series: usize, // default: 10,000 + pub flush_interval_ms: u64, // default: 1,000 + pub channel_buffer_size: usize, // default: 10,000 + pub pass_raw_samples: bool, // default: false + pub raw_mode_aggregation_id: u64, // default: 0 + pub late_data_policy: LateDataPolicy, // default: Drop +} + +pub enum LateDataPolicy { + Drop, // Silently discard late samples for closed windows + ForwardToStore, // Emit a mini-accumulator for query-time merge +} +``` -Requirement: reuse aggregation definitions already present in the system. +### 3.3 SeriesRouter (`series_router.rs`) -Derived feature: each worker matches a series against the loaded -`AggregationConfig`s and creates aggregation state only for the configs relevant -to that series. The engine therefore stays driven by `StreamingConfig` instead -of inventing a separate configuration model. +Deterministic hash-based routing using XXHash64: -### Windowed precomputation with watermark closure +``` +worker_idx = xxhash64(series_key) % num_workers +``` -Requirement: emit queryable precomputed windows rather than raw streams only. +All samples for a given series always land on the same worker, so per-series +state (buffer, watermark, active windows) needs no synchronization. 
-Derived feature: each aggregation uses a `WindowManager` to: +**Message types:** +```rust +enum WorkerMessage { + Samples { series_key: String, samples: Vec<(i64, f64)>, ingest_received_at: Instant }, + Flush, + Shutdown, +} +``` -- align samples to windows, -- detect when watermark movement closes a window, -- emit `PrecomputedOutput` records with exact window bounds. +`route_batch()` groups messages by target worker and sends them in parallel for +throughput while preserving per-worker ordering. + +### 3.4 Worker (`worker.rs`) + +Each worker owns an isolated shard of the series space. + +```rust +struct Worker { + id: usize, + receiver: mpsc::Receiver, + output_sink: Arc, + series_map: HashMap, + agg_configs: HashMap, + max_buffer_per_series: usize, + allowed_lateness_ms: i64, + pass_raw_samples: bool, + raw_mode_aggregation_id: u64, + late_data_policy: LateDataPolicy, +} +``` -This gives the query engine stable window ranges to read later. +**Per-series state:** +```rust +struct SeriesState { + buffer: SeriesBuffer, // sorted sample buffer + previous_watermark_ms: i64, // last-seen watermark + aggregations: Vec, // one per matching config +} + +struct AggregationState { + config: AggregationConfig, + window_manager: WindowManager, + active_windows: HashMap>, +} +``` -### Bounded memory for series-local state +#### Processing pipeline (`process_samples`) -Requirement: the engine must remain safe under continuous ingestion. +``` +1. Match series to AggregationConfigs (by metric name / spatial_filter) +2. Insert samples into SeriesBuffer, update watermark +3. Drop samples beyond allowed_lateness_ms behind watermark +4. For each sample × each aggregation: + a. Compute window_starts_containing(ts) + b. 
For each window_start: + - If window not in active_windows AND already closed (watermark >= window_end): + → late_data_policy == Drop: skip + → late_data_policy == ForwardToStore: create mini-accumulator, emit + - Else: insert into active_windows, feed value to AccumulatorUpdater +5. Detect newly closed windows via closed_windows(prev_wm, current_wm) +6. Extract closed window accumulators → PrecomputedOutput + AggregateCore +7. Emit batch to OutputSink +8. Update previous_watermark_ms +``` -Derived feature: each series uses a bounded `SeriesBuffer`, and each worker uses -bounded channels from the router. This does not solve every overload scenario, -but it prevents the obvious unbounded growth cases in the v1 design. +#### Raw mode -### Optional raw passthrough mode +When `pass_raw_samples = true`, the entire aggregation pipeline is bypassed. +Each sample is emitted as a `SumAccumulator::with_sum(value)` with point-window +bounds `[ts, ts]` and the configured `raw_mode_aggregation_id`. -Requirement: support bring-up, debugging, and staged rollout. +### 3.5 SeriesBuffer (`series_buffer.rs`) -Derived feature: when `pass_raw_samples=true`, the worker bypasses windowed -aggregation and emits one `SumAccumulator` per sample. This is useful for -testing the ingest-to-store plumbing independently from sketch behavior. +Per-series in-memory buffer backed by `BTreeMap`. -### Direct integration with the existing store and query engine +```rust +struct SeriesBuffer { + samples: BTreeMap, // timestamp_ms → value + watermark_ms: i64, // max timestamp ever seen (monotonic) + max_buffer_size: usize, +} +``` -Requirement: the precompute path must fit ASAPQuery's existing runtime. +- Samples are automatically sorted by timestamp. +- Watermark only advances forward (monotonic). +- When the buffer exceeds `max_buffer_size`, the oldest samples are evicted. +- Supports range reads (`read_range`) and destructive drains (`drain_up_to`). 
-Derived feature: the standalone `precompute_engine` binary can be launched with: +### 3.6 WindowManager (`window_manager.rs`) -- a `StreamingConfig`, -- a store implementation, -- an optional query HTTP server in the same process. +Handles both tumbling and sliding window semantics. -That makes the PR immediately testable end to end. +```rust +struct WindowManager { + window_size_ms: i64, // e.g. 60_000 + slide_interval_ms: i64, // == window_size for tumbling; < window_size for sliding +} +``` -## 4. System Implementation Corner Cases +**Key methods:** -### Late and out-of-order samples +| Method | Description | +|--------|-------------| +| `window_start_for(ts)` | Align timestamp down to nearest slide boundary | +| `window_starts_containing(ts)` | All windows whose `[start, start+size)` includes `ts`. Tumbling → 1 window; sliding → `ceil(size/slide)` windows | +| `closed_windows(prev_wm, curr_wm)` | Windows that transitioned open→closed as the watermark advanced | +| `window_bounds(start)` | Returns `(start, start + window_size_ms)` | -The current policy is intentionally simple: +**Window closure rule:** a window `[S, S + size)` closes when `watermark >= S + size`. +Once closed, a window never reopens. -- if `timestamp < watermark - allowed_lateness_ms`, the sample is dropped; -- otherwise it is accepted. +### 3.7 AccumulatorUpdater (`accumulator_factory.rs`) -This means the PR chooses predictability over replay complexity. There is no -secondary path yet for re-opening or patching already emitted windows. 
+Trait-based interface for feeding samples into sketch accumulators: -### Idle series and flush behavior +```rust +trait AccumulatorUpdater: Send { + fn update_single(&mut self, value: f64, timestamp_ms: i64); + fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, timestamp_ms: i64); + fn take_accumulator(&mut self) -> Box; + fn reset(&mut self); + fn is_keyed(&self) -> bool; + fn memory_usage_bytes(&self) -> usize; +} +``` -The engine has a periodic flush loop, but the current implementation does **not** -advance watermarks on its own. As a result, a flush only emits windows that have -become closable due to prior event-time progress. If a series stops receiving -samples before a later sample advances the watermark, the worker does not invent -time progress just because wall-clock time passed. +The factory function `create_accumulator_updater(config)` dispatches on +`(aggregation_type, aggregation_sub_type)`: + +| Type | Sub-type | Updater | +|------|----------|---------| +| SingleSubpopulation | Sum | SumAccumulatorUpdater | +| SingleSubpopulation | Min/Max | MinMaxAccumulatorUpdater | +| SingleSubpopulation | Increase | IncreaseAccumulatorUpdater | +| SingleSubpopulation | KLL | KllAccumulatorUpdater | +| MultipleSubpopulation | Sum | MultipleSumUpdater | +| MultipleSubpopulation | Min/Max | MultipleMinMaxUpdater | +| MultipleSubpopulation | Increase | MultipleIncreaseUpdater | +| MultipleSubpopulation | CMS | CmsAccumulatorUpdater | +| MultipleSubpopulation | HydraKLL | HydraKllAccumulatorUpdater | + +### 3.8 OutputSink (`output_sink.rs`) + +```rust +trait OutputSink: Send + Sync { + fn emit_batch( + &self, + outputs: Vec<(PrecomputedOutput, Box)>, + ) -> Result<(), Box>; +} +``` -This is an important behavior boundary for this PR. 
+**Implementations:** +- `StoreOutputSink` — calls `store.insert_precomputed_output_batch()` +- `RawPassthroughSink` — same interface, used for raw mode +- `NoopOutputSink` — testing helper that counts emitted items -### Sliding-window semantics in v1 +## 4. Data Model -`WindowManager` understands slide intervals, and tests cover slide-aligned -window closing. However, this PR keeps the worker update path simple: samples -are placed into the accumulator keyed by `window_start_for(ts)`, and the design -does not yet implement the more advanced pane-sharing or multi-window fan-out -approach described in earlier discussion branches. +### PrecomputedOutput -So the current PR establishes the reusable windowing abstraction first, while -leaving richer sliding-window execution strategies for follow-up work. +```rust +pub struct PrecomputedOutput { + pub start_timestamp: u64, // window start (ms) + pub end_timestamp: u64, // window end (ms) + pub key: Option, // grouping key (e.g. method="GET") + pub aggregation_id: u64, +} +``` -### Cross-series aggregation across workers +### KeyByLabelValues + +Ordered vector of label values matching the `grouping_labels` in the aggregation +config. Serialized as semicolon-delimited strings for hashing/storage. + +### AggregationConfig + +Loaded from `streaming_config.yaml`: + +```rust +pub struct AggregationConfig { + pub aggregation_id: u64, + pub aggregation_type: String, // "SingleSubpopulation" | "MultipleSubpopulation" + pub aggregation_sub_type: String, // "Sum" | "Min" | "Max" | "Increase" | "KLL" | ... + pub parameters: HashMap, + pub grouping_labels: KeyByLabelNames, + pub window_size: u64, // seconds + pub slide_interval: u64, // seconds (0 = tumbling) + pub metric: String, + pub spatial_filter: String, + pub num_aggregates_to_retain: Option, + // ... +} +``` -Routing is based on the full series key, not on the final grouping key. 
That -keeps ingestion simple, but it also means different source series that -contribute to the same logical grouped result may be processed on different -workers. This PR does not introduce a second-tier reduce stage; it relies on the -existing downstream model of storing precomputed outputs and reading them later. +## 5. Store Integration -### Series-key parsing assumptions +### Write path -Grouping-label extraction currently parses series keys in the expected Prometheus -text form: +`OutputSink.emit_batch()` → `Store.insert_precomputed_output_batch()` -```text -metric_name{label1="value1",label2="value2"} +The `SimpleMapStore` (PerKey variant) uses: +``` +DashMap<u64, Arc<RwLock<StoreKeyData>>> // keyed by aggregation_id +``` +where: +```rust +struct StoreKeyData { + time_map: HashMap<(u64, u64), Vec<(Option<KeyByLabelValues>, Box<dyn AggregateCore>)>>, + read_counts: HashMap<(u64, u64), u64>, +} ``` -Missing grouping labels are converted to empty strings. This keeps the worker -path robust, but it is worth documenting because output keys depend on this -parsing behavior. - -### Raw mode loses label-group semantics - -In raw passthrough mode, the engine emits one point output per sample with -`key=None`. That is acceptable for the intended debugging and plumbing use case, -but it is deliberately not equivalent to fully configured grouped aggregation. +Multiple entries per `(start_ts, end_ts)` are allowed — they are appended, not +overwritten. This is what makes the `ForwardToStore` late-data policy work: the late +mini-accumulator is stored alongside the original window accumulator. -## 5. Examples +### Read path / query-time merge -### Example 1: basic tumbling-window flow +At query time, `PrecomputedSummaryReadExec` reads sparse buckets from the store. +`SummaryMergeMultipleExec` groups by label key and merges via +`accumulator.merge_with()`. -Assume: +The `NaiveMerger` re-merges all accumulators in the window on each slide.
+The store's existing multi-entry-per-window design means late data is +automatically combined with original window data at query time. -- metric: `fake_metric` -- window size: 60 seconds -- slide interval: 0 (tumbling) -- one sample arrives at `t=12_000 ms` +### Cleanup policies -The worker computes: +| Policy | Behavior | +|--------|----------| +| CircularBuffer | Keep N most recent windows (4x `num_aggregates_to_retain`) | +| ReadBased | Remove after `read_count >= threshold` | +| NoCleanup | Retain forever | -- `window_start = 0` -- `window_end = 60_000` +## 6. Late Data Handling -The sample updates the active accumulator for window `[0, 60_000)`. Once the -watermark later reaches at least `60_000`, the worker emits: +Two checks determine whether a sample is "late": -- `PrecomputedOutput(start=0, end=60_000, aggregation_id=...)` -- the finished accumulator for that window +1. **Watermark check** (sample-level): `ts < watermark - allowed_lateness_ms` → + sample is dropped entirely before reaching any aggregation logic. -### Example 2: out-of-order sample handling +2. **Window closure check** (window-level): the sample passes the watermark check + but targets a window that is already closed + (`window not in active_windows && watermark >= window_end`). -Assume: +For case 2, the `LateDataPolicy` controls behavior: -- current series watermark is `100_000 ms` -- `allowed_lateness_ms = 5_000` +- **Drop**: log at debug level and skip. No ghost accumulator is created + (fixing the original bug where `or_insert_with` would create orphaned entries). -Then: +- **ForwardToStore**: create a fresh `AccumulatorUpdater`, feed the single + late sample, wrap as `PrecomputedOutput`, and push into the same `emit_batch` + as normal closed-window outputs. The store appends it alongside the original + window data, and query-time merge combines them. -- sample at `97_000 ms` is accepted, -- sample at `94_999 ms` is dropped. +## 7. 
Concurrency Model -This keeps the lateness rule easy to reason about. +- **Ingest HTTP handler**: Axum async handler, stateless decoding. +- **SeriesRouter**: Lock-free hash routing. No shared mutable state. +- **Workers**: Each worker is single-threaded (owns its `series_map`). + No locks needed within a worker. +- **OutputSink / Store**: Thread-safe (`Arc`, DashMap-backed store). + Workers emit concurrently; the PerKey store uses per-aggregation_id RwLocks + to minimize contention. +- **Flush timer**: Separate tokio task, communicates via the same MPSC channels. -### Example 3: deterministic sharding +## 8. Performance Characteristics -Assume two incoming series: +**Ingest path (per batch):** +- Sample insert: O(log B) per sample (BTreeMap, B = buffer size) +- Window assignment: O(A × W) per sample (A = matching aggregations, + W = windows per sample — 1 for tumbling, ~3 for typical sliding) +- Accumulator update: O(1) for Sum/MinMax, O(log k) for KLL -- `cpu_usage{host="a",job="node"}` -- `cpu_usage{host="b",job="node"}` +**Memory:** +- O(S × N) buffered samples (S = max per series, N = active series) +- O(A × W_open) active window accumulators -The router hashes each full series key independently. Each series is assigned to -one worker, and every later batch for that same series goes back to that same -worker. The benefit is that each worker can maintain series-local state without -coordination. +**Throughput:** +- Workers process in parallel with no cross-shard coordination. +- E2E test demonstrates ~10M samples/sec sustained throughput with raw mode. -### Example 4: raw passthrough mode +## 9. 
CLI Usage -If `pass_raw_samples=true` and a sample arrives: +### Standalone binary -```text -series_key = fake_metric{instance="i1"} -timestamp = 25_000 -value = 42.0 +```bash +cargo run --bin precompute_engine -- \ + --streaming-config streaming_config.yaml \ + --ingest-port 9090 \ + --num-workers 4 \ + --allowed-lateness-ms 5000 \ + --max-buffer-per-series 10000 \ + --flush-interval-ms 1000 \ + --channel-buffer-size 10000 \ + --query-port 8080 \ + --lock-strategy per-key \ + --late-data-policy drop ``` -The worker emits one point output immediately: - -- `PrecomputedOutput(start=25_000, end=25_000, key=None, aggregation_id=raw_mode_aggregation_id)` -- `SumAccumulator::with_sum(42.0)` - -This mode is useful when validating the ingest path independently from -windowed aggregation correctness. - -## 6. Summary - -PR #228 introduces the first integrated precompute engine inside -`asap-query-engine`. The design deliberately favors a clear and testable v1: - -- one process, -- deterministic worker sharding, -- config-driven accumulator creation, -- watermark-based window emission, -- direct store integration. - -That foundation is the reason this PR is needed. It creates the runtime path -that later PRs can extend with more sophisticated window execution, richer late -data handling, and more advanced cross-worker aggregation strategies. +### Embedded in main binary + +The precompute engine is also embedded in the main `query_engine_rust` binary, +enabled via `--enable-prometheus-remote-write`. In this mode it shares the +store with the Kafka consumer path. + +## 10. Testing + +- **Unit tests**: `worker.rs` (metric/label extraction), `window_manager.rs` + (tumbling/sliding arithmetic), `series_buffer.rs` (ordering, watermark), + `accumulator_factory.rs` (updater creation), `series_router.rs` (hashing), + `config.rs` (defaults). 
+- **E2E test** (`bin/test_e2e_precompute.rs`): Starts engine + store + query + server in-process, sends remote-write samples, queries via PromQL HTTP, + validates results. Includes batch latency and throughput benchmarks. + +## 11. File Map + +| File | Purpose | +|------|---------| +| `precompute_engine/mod.rs` | Orchestrator, HTTP ingest handler | +| `precompute_engine/config.rs` | `PrecomputeEngineConfig`, `LateDataPolicy` | +| `precompute_engine/worker.rs` | Per-shard processing, aggregation, window management | +| `precompute_engine/series_router.rs` | Hash-based series → worker routing | +| `precompute_engine/series_buffer.rs` | Per-series BTreeMap sample buffer | +| `precompute_engine/window_manager.rs` | Tumbling/sliding window logic | +| `precompute_engine/accumulator_factory.rs` | `AccumulatorUpdater` trait + factory | +| `precompute_engine/output_sink.rs` | `OutputSink` trait + Store/Noop impls | +| `bin/precompute_engine.rs` | Standalone CLI binary | +| `bin/test_e2e_precompute.rs` | End-to-end integration test | diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 05c0cd2..446c255 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -2,6 +2,7 @@ use crate::data_model::{AggregateCore, KeyByLabelValues, PrecomputedOutput}; use crate::precompute_engine::accumulator_factory::{ create_accumulator_updater, AccumulatorUpdater, }; +use crate::precompute_engine::config::LateDataPolicy; use crate::precompute_engine::output_sink::OutputSink; use crate::precompute_engine::series_buffer::SeriesBuffer; use crate::precompute_engine::series_router::WorkerMessage; @@ -47,6 +48,8 @@ pub struct Worker { pass_raw_samples: bool, /// Aggregation ID stamped on each raw-mode output. raw_mode_aggregation_id: u64, + /// Policy for handling late samples that arrive after their window has closed. 
+ late_data_policy: LateDataPolicy, } impl Worker { @@ -60,6 +63,7 @@ impl Worker { allowed_lateness_ms: i64, pass_raw_samples: bool, raw_mode_aggregation_id: u64, + late_data_policy: LateDataPolicy, ) -> Self { Self { id, @@ -71,6 +75,7 @@ impl Worker { allowed_lateness_ms, pass_raw_samples, raw_mode_aggregation_id, + late_data_policy, } } @@ -178,6 +183,7 @@ impl Worker { // Copy scalars out of self before taking &mut self.series_map let worker_id = self.id; let allowed_lateness_ms = self.allowed_lateness_ms; + let late_data_policy = self.late_data_policy; // Ensure state exists self.get_or_create_series_state(series_key); @@ -224,6 +230,51 @@ impl Worker { let window_starts = agg_state.window_manager.window_starts_containing(ts); for window_start in window_starts { + let window_end = window_start + agg_state.window_manager.window_size_ms(); + + // Check if this window was already closed in a previous batch + if !agg_state.active_windows.contains_key(&window_start) + && current_wm >= window_end + { + // Window already closed — handle according to policy + match late_data_policy { + LateDataPolicy::Drop => { + debug!( + "Dropping late sample for closed window [{}, {})", + window_start, window_end + ); + continue; + } + LateDataPolicy::ForwardToStore => { + let mut updater = create_accumulator_updater(&agg_state.config); + if updater.is_keyed() { + let key = extract_key_from_series(series_key, &agg_state.config); + updater.update_keyed(&key, val, ts); + } else { + updater.update_single(val, ts); + } + let key = if updater.is_keyed() { + Some(extract_key_from_series(series_key, &agg_state.config)) + } else { + None + }; + let output = PrecomputedOutput::new( + window_start as u64, + window_end as u64, + key, + agg_state.config.aggregation_id, + ); + emit_batch.push((output, updater.take_accumulator())); + debug!( + "Forwarding late sample to store for closed window [{}, {})", + window_start, window_end + ); + continue; + } + } + } + + // Normal path: window is still 
open (or newly opened) let updater = agg_state .active_windows .entry(window_start) From fc34f14c732475acb6ec432e518286efebd9810f Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 26 Feb 2026 15:23:55 -0700 Subject: [PATCH 14/19] Minor wording fix in precompute engine design doc Co-Authored-By: Claude Opus 4.6 --- .../src/precompute_engine/precompute_engine_design_doc.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index f59a42f..4cfe0ab 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -4,7 +4,7 @@ The Precompute Engine is a real-time streaming aggregation system that sits between Prometheus-compatible metric producers and ASAP storage and query engine. It accepts raw -time-series samples via the Prometheus remote-write protocol, buffers them, computes +time-series samples via the Prometheus remote-write protocol ingestion connector, buffers them, computes windowed aggregations (sketches), and writes the results to a store for fast query-time retrieval. 
From 10e4161a87e8f62a3dd138955b3f554d74e913a8 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 26 Feb 2026 15:27:05 -0700 Subject: [PATCH 15/19] Clarify single-machine multi-threaded architecture in design doc Co-Authored-By: Claude Opus 4.6 --- .../precompute_engine_design_doc.md | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index 4cfe0ab..64d6955 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -9,8 +9,9 @@ windowed aggregations (sketches), and writes the results to a store for fast query-time retrieval. **Key properties:** +- Single-machine, multi-threaded architecture (all workers run as async tasks within one process) - Watermark-based windowed aggregation (tumbling and sliding windows) -- Multi-threaded, shard-nothing worker architecture +- Shared-nothing worker design: series are hash-partitioned across threads with no cross-worker coordination - Pluggable accumulator types (Sum, Min/Max, Increase, KLL, CMS, HydraKLL) - Configurable late-data handling (Drop or ForwardToStore) - Optional raw passthrough mode for bypassing aggregation @@ -356,14 +357,26 @@ For case 2, the `LateDataPolicy` controls behavior: ## 7. Concurrency Model +The current implementation is **single-machine, multi-threaded**. All components +(HTTP server, workers, store) run within a single OS process as Tokio async +tasks on a shared thread pool. There is no distributed coordination, no +cross-machine communication, and no external dependency beyond the store. + - **Ingest HTTP handler**: Axum async handler, stateless decoding. - **SeriesRouter**: Lock-free hash routing. No shared mutable state. -- **Workers**: Each worker is single-threaded (owns its `series_map`). 
- No locks needed within a worker. +- **Workers**: Each worker is a single Tokio task that owns its `series_map` + exclusively. No locks needed within a worker — thread safety comes from + the hash-partitioning guarantee that each series is assigned to exactly one + worker. - **OutputSink / Store**: Thread-safe (`Arc`, DashMap-backed store). Workers emit concurrently; the PerKey store uses per-aggregation_id RwLocks to minimize contention. -- **Flush timer**: Separate tokio task, communicates via the same MPSC channels. +- **Flush timer**: Separate Tokio task, communicates via the same MPSC channels. + +Scaling beyond a single machine would require partitioning the series space +across multiple engine instances (e.g. via consistent hashing at the load +balancer level), each running this same single-process architecture +independently. ## 8. Performance Characteristics From 24ec6cefa4e8c0cac5f442c8eb8735959cd93b1a Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 26 Feb 2026 15:38:24 -0700 Subject: [PATCH 16/19] Add multi-connector ingest support (Prometheus + VictoriaMetrics) Extract format-agnostic routing logic into route_decoded_samples() helper and register a VictoriaMetrics remote write endpoint at /api/v1/import alongside the existing Prometheus endpoint at /api/v1/write. 
Co-Authored-By: Claude Opus 4.6 --- .../src/precompute_engine/mod.rs | 121 +++++++++++------- .../precompute_engine_design_doc.md | 25 ++-- 2 files changed, 87 insertions(+), 59 deletions(-) diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index 7324f26..5801724 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -8,6 +8,7 @@ pub mod worker; use crate::data_model::StreamingConfig; use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write; +use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write; use crate::precompute_engine::config::PrecomputeEngineConfig; use crate::precompute_engine::output_sink::OutputSink; use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; @@ -18,7 +19,7 @@ use std::sync::Arc; use std::time::Instant; use tokio::net::TcpListener; use tokio::sync::mpsc; -use tracing::{debug_span, info, warn, Instrument}; +use tracing::{info, warn}; /// Shared state for the ingest HTTP handler. struct IngestState { @@ -116,9 +117,10 @@ impl PrecomputeEngine { } }); - // Start the Axum HTTP server for Prometheus remote write ingest + // Start the Axum HTTP server for ingest (Prometheus + VictoriaMetrics) let app = Router::new() - .route("/api/v1/write", post(handle_ingest)) + .route("/api/v1/write", post(handle_prometheus_ingest)) + .route("/api/v1/import", post(handle_victoriametrics_ingest)) .with_state(ingest_state); let addr = format!("0.0.0.0:{}", self.config.ingest_port); @@ -136,56 +138,77 @@ impl PrecomputeEngine { } } -/// Axum handler for Prometheus remote write. -async fn handle_ingest(State(state): State>, body: Bytes) -> StatusCode { - let ingest_span = debug_span!("ingest", body_len = body.len()); - let ingest_received_at = Instant::now(); +/// Shared logic: group decoded samples by series key and route to workers. 
+async fn route_decoded_samples( + state: &IngestState, + samples: Vec, + ingest_received_at: Instant, +) -> StatusCode { + if samples.is_empty() { + return StatusCode::NO_CONTENT; + } - async { - let samples = match decode_prometheus_remote_write(&body) { - Ok(s) => s, - Err(e) => { - warn!("Failed to decode remote write: {}", e); - return StatusCode::BAD_REQUEST; - } - }; + let count = samples.len() as u64; + state + .samples_ingested + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + + // Group samples by series key for batch routing + let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); + for s in &samples { + by_series + .entry(&s.labels) + .or_default() + .push((s.timestamp_ms, s.value)); + } - if samples.is_empty() { - return StatusCode::NO_CONTENT; - } + // Convert to owned keys for batch routing + let by_series_owned: HashMap> = by_series + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(); + + // Route all series to workers concurrently + if let Err(e) = state + .router + .route_batch(by_series_owned, ingest_received_at) + .await + { + warn!("Batch routing error: {}", e); + return StatusCode::INTERNAL_SERVER_ERROR; + } - let count = samples.len() as u64; - state - .samples_ingested - .fetch_add(count, std::sync::atomic::Ordering::Relaxed); - - // Group samples by series key for batch routing - let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); - for s in &samples { - by_series - .entry(&s.labels) - .or_default() - .push((s.timestamp_ms, s.value)); - } + StatusCode::NO_CONTENT +} - // Convert to owned keys for batch routing - let by_series_owned: HashMap> = by_series - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(); - - // Route all series to workers concurrently - if let Err(e) = state - .router - .route_batch(by_series_owned, ingest_received_at) - .await - { - warn!("Batch routing error: {}", e); - return StatusCode::INTERNAL_SERVER_ERROR; +/// Axum handler for Prometheus remote write 
(Snappy + Protobuf). +async fn handle_prometheus_ingest( + State(state): State>, + body: Bytes, +) -> StatusCode { + let ingest_received_at = Instant::now(); + let samples = match decode_prometheus_remote_write(&body) { + Ok(s) => s, + Err(e) => { + warn!("Failed to decode Prometheus remote write: {}", e); + return StatusCode::BAD_REQUEST; } + }; + route_decoded_samples(&state, samples, ingest_received_at).await +} - StatusCode::NO_CONTENT - } - .instrument(ingest_span) - .await +/// Axum handler for VictoriaMetrics remote write (Zstd + Protobuf). +async fn handle_victoriametrics_ingest( + State(state): State>, + body: Bytes, +) -> StatusCode { + let ingest_received_at = Instant::now(); + let samples = match decode_victoriametrics_remote_write(&body) { + Ok(s) => s, + Err(e) => { + warn!("Failed to decode VictoriaMetrics remote write: {}", e); + return StatusCode::BAD_REQUEST; + } + }; + route_decoded_samples(&state, samples, ingest_received_at).await } diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index 64d6955..d487767 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -3,10 +3,10 @@ ## 1. Overview The Precompute Engine is a real-time streaming aggregation system that sits between -Prometheus-compatible metric producers and ASAP storage and query engine. It accepts raw -time-series samples via the Prometheus remote-write protocol ingestion connector, buffers them, computes -windowed aggregations (sketches), and writes the results to a store for fast -query-time retrieval. +metric producers and ASAP storage and query engine. 
It accepts raw +time-series samples via multiple ingestion connectors (Prometheus remote write +and VictoriaMetrics remote write), buffers them, computes windowed aggregations +(sketches), and writes the results to a store for fast query-time retrieval. **Key properties:** - Single-machine, multi-threaded architecture (all workers run as async tasks within one process) @@ -19,12 +19,17 @@ query-time retrieval. ## 2. Architecture ``` - Prometheus Remote Write - | + Prometheus Remote Write VictoriaMetrics Remote Write + (Snappy + Protobuf) (Zstd + Protobuf) + | | + v v + POST /api/v1/write POST /api/v1/import + \ / + \ / Axum HTTP Server (:9090) - POST /api/v1/write | - decode + group by series + route_decoded_samples() + (group by series key) | SeriesRouter (hash) / | \ @@ -57,7 +62,7 @@ Top-level orchestrator. On `run()`: 3. Spawns `Worker` tasks, each owning its receiver. 4. Spawns a flush timer that calls `router.broadcast_flush()` every `flush_interval_ms`. -5. Starts the Axum HTTP server and blocks until shutdown. +5. Starts the Axum HTTP server with routes for each ingest connector and blocks until shutdown. ```rust pub struct PrecomputeEngine { @@ -362,7 +367,7 @@ The current implementation is **single-machine, multi-threaded**. All components tasks on a shared thread pool. There is no distributed coordination, no cross-machine communication, and no external dependency beyond the store. -- **Ingest HTTP handler**: Axum async handler, stateless decoding. +- **Ingest HTTP handlers**: Per-connector Axum async handlers (Prometheus, VictoriaMetrics) with shared format-agnostic routing logic. - **SeriesRouter**: Lock-free hash routing. No shared mutable state. - **Workers**: Each worker is a single Tokio task that owns its `series_map` exclusively. 
No locks needed within a worker — thread safety comes from From e9037835c89c058512802d42fa9806d08c0481c8 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 26 Feb 2026 15:43:30 -0700 Subject: [PATCH 17/19] Run cargo fmt Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/src/precompute_engine/worker.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 446c255..cb58fe0 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -248,7 +248,8 @@ impl Worker { LateDataPolicy::ForwardToStore => { let mut updater = create_accumulator_updater(&agg_state.config); if updater.is_keyed() { - let key = extract_key_from_series(series_key, &agg_state.config); + let key = + extract_key_from_series(series_key, &agg_state.config); updater.update_keyed(&key, val, ts); } else { updater.update_single(val, ts); From e5b6abc34f528dafc42fe8a788662d03ca34005f Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 26 Feb 2026 16:16:27 -0700 Subject: [PATCH 18/19] Add pane-based incremental sliding window computation Route each sample to exactly 1 pane (sub-window of size slide_interval) instead of W overlapping window accumulators. When a window closes, merge its constituent panes via AggregateCore::merge_with(). This reduces per-sample accumulator updates from W to 1 for sliding windows. 
- Add snapshot_accumulator() to AccumulatorUpdater trait (9 implementations) - Add pane_start_for(), panes_for_window(), slide_interval_ms() to WindowManager - Replace active_windows HashMap with active_panes BTreeMap in worker - Rewrite sample routing and window close logic with pane merging - Extract PrecomputeEngine to engine.rs, ingest handlers to ingest_handler.rs - Update design doc with pane-based architecture Co-Authored-By: Claude Opus 4.6 --- .../precompute_engine/accumulator_factory.rs | 48 ++++ .../src/precompute_engine/engine.rs | 125 ++++++++++ .../src/precompute_engine/ingest_handler.rs | 89 +++++++ .../src/precompute_engine/mod.rs | 209 +---------------- .../precompute_engine_design_doc.md | 142 ++++++++++- .../src/precompute_engine/window_manager.rs | 96 ++++++++ .../src/precompute_engine/worker.rs | 220 ++++++++++++------ 7 files changed, 636 insertions(+), 293 deletions(-) create mode 100644 asap-query-engine/src/precompute_engine/engine.rs create mode 100644 asap-query-engine/src/precompute_engine/ingest_handler.rs diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 7c181e6..52c376c 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -20,6 +20,10 @@ pub trait AccumulatorUpdater: Send { /// Extract the final accumulator as a boxed `AggregateCore`. fn take_accumulator(&mut self) -> Box; + /// Non-destructive read of the current accumulator state (clone without reset). + /// Used by pane-based sliding windows to read shared panes. + fn snapshot_accumulator(&self) -> Box; + /// Reset internal state for reuse (avoids re-allocation). 
fn reset(&mut self); @@ -67,6 +71,10 @@ impl AccumulatorUpdater for SumAccumulatorUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = SumAccumulator::new(); } @@ -113,6 +121,10 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = MinMaxAccumulator::new(self.sub_type.clone()); } @@ -175,6 +187,18 @@ impl AccumulatorUpdater for IncreaseAccumulatorUpdater { result } + fn snapshot_accumulator(&self) -> Box { + match &self.acc { + Some(acc) => Box::new(acc.clone()), + None => Box::new(IncreaseAccumulator::new( + Measurement::new(0.0), + 0, + Measurement::new(0.0), + 0, + )), + } + } + fn reset(&mut self) { self.acc = None; } @@ -221,6 +245,10 @@ impl AccumulatorUpdater for KllAccumulatorUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = DatasketchesKLLAccumulator::new(self.k); } @@ -272,6 +300,10 @@ impl AccumulatorUpdater for MultipleSumUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = MultipleSumAccumulator::new(); } @@ -319,6 +351,10 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = MultipleMinMaxAccumulator::new(self.sub_type.clone()); } @@ -384,6 +420,10 @@ impl AccumulatorUpdater for MultipleIncreaseUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = MultipleIncreaseAccumulator::new(); } @@ -435,6 +475,10 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = 
CountMinSketchAccumulator::new(self.row_num, self.col_num); } @@ -486,6 +530,10 @@ impl AccumulatorUpdater for HydraKllAccumulatorUpdater { result } + fn snapshot_accumulator(&self) -> Box { + Box::new(self.acc.clone()) + } + fn reset(&mut self) { self.acc = HydraKllSketchAccumulator::new(self.row_num, self.col_num, self.k); } diff --git a/asap-query-engine/src/precompute_engine/engine.rs b/asap-query-engine/src/precompute_engine/engine.rs new file mode 100644 index 0000000..5df148c --- /dev/null +++ b/asap-query-engine/src/precompute_engine/engine.rs @@ -0,0 +1,125 @@ +use crate::data_model::StreamingConfig; +use crate::precompute_engine::config::PrecomputeEngineConfig; +use crate::precompute_engine::ingest_handler::{ + handle_prometheus_ingest, handle_victoriametrics_ingest, IngestState, +}; +use crate::precompute_engine::output_sink::OutputSink; +use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; +use crate::precompute_engine::worker::Worker; +use axum::{routing::post, Router}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::sync::mpsc; +use tracing::{info, warn}; + +/// The top-level precompute engine orchestrator. +/// +/// Creates worker threads, the series router, and the Axum ingest server. +pub struct PrecomputeEngine { + config: PrecomputeEngineConfig, + streaming_config: Arc, + output_sink: Arc, +} + +impl PrecomputeEngine { + pub fn new( + config: PrecomputeEngineConfig, + streaming_config: Arc, + output_sink: Arc, + ) -> Self { + Self { + config, + streaming_config, + output_sink, + } + } + + /// Start the precompute engine. This spawns worker tasks and the HTTP + /// ingest server, then blocks until shutdown. 
+ pub async fn run(self) -> Result<(), Box> { + let num_workers = self.config.num_workers; + let channel_size = self.config.channel_buffer_size; + + // Build MPSC channels for each worker + let mut senders = Vec::with_capacity(num_workers); + let mut receivers = Vec::with_capacity(num_workers); + for _ in 0..num_workers { + let (tx, rx) = mpsc::channel::(channel_size); + senders.push(tx); + receivers.push(rx); + } + + // Build the router + let router = SeriesRouter::new(senders); + + // Build aggregation config map from streaming config + let agg_configs: HashMap = + self.streaming_config.get_all_aggregation_configs().clone(); + + // Spawn workers + let mut worker_handles = Vec::with_capacity(num_workers); + for (id, rx) in receivers.into_iter().enumerate() { + let worker = Worker::new( + id, + rx, + self.output_sink.clone(), + agg_configs.clone(), + self.config.max_buffer_per_series, + self.config.allowed_lateness_ms, + self.config.pass_raw_samples, + self.config.raw_mode_aggregation_id, + self.config.late_data_policy, + ); + let handle = tokio::spawn(async move { + worker.run().await; + }); + worker_handles.push(handle); + } + + info!( + "PrecomputeEngine started with {} workers on port {}", + num_workers, self.config.ingest_port + ); + + // Build the ingest state + let ingest_state = Arc::new(IngestState { + router, + samples_ingested: std::sync::atomic::AtomicU64::new(0), + }); + + // Start flush timer + let flush_state = ingest_state.clone(); + let flush_interval_ms = self.config.flush_interval_ms; + tokio::spawn(async move { + let mut interval = + tokio::time::interval(tokio::time::Duration::from_millis(flush_interval_ms)); + loop { + interval.tick().await; + if let Err(e) = flush_state.router.broadcast_flush().await { + warn!("Flush broadcast error: {}", e); + break; + } + } + }); + + // Start the Axum HTTP server for ingest (Prometheus + VictoriaMetrics) + let app = Router::new() + .route("/api/v1/write", post(handle_prometheus_ingest)) + 
.route("/api/v1/import", post(handle_victoriametrics_ingest)) + .with_state(ingest_state); + + let addr = format!("0.0.0.0:{}", self.config.ingest_port); + info!("Ingest server listening on {}", addr); + + let listener = TcpListener::bind(&addr).await?; + axum::serve(listener, app).await?; + + // Wait for workers to finish (this only happens on shutdown) + for handle in worker_handles { + let _ = handle.await; + } + + Ok(()) + } +} diff --git a/asap-query-engine/src/precompute_engine/ingest_handler.rs b/asap-query-engine/src/precompute_engine/ingest_handler.rs new file mode 100644 index 0000000..57537dd --- /dev/null +++ b/asap-query-engine/src/precompute_engine/ingest_handler.rs @@ -0,0 +1,89 @@ +use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write; +use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write; +use crate::precompute_engine::series_router::SeriesRouter; +use axum::{body::Bytes, extract::State, http::StatusCode}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; +use tracing::warn; + +/// Shared state for the ingest HTTP handler. +pub(crate) struct IngestState { + pub(crate) router: SeriesRouter, + pub(crate) samples_ingested: std::sync::atomic::AtomicU64, +} + +/// Shared logic: group decoded samples by series key and route to workers. 
+async fn route_decoded_samples( + state: &IngestState, + samples: Vec, + ingest_received_at: Instant, +) -> StatusCode { + if samples.is_empty() { + return StatusCode::NO_CONTENT; + } + + let count = samples.len() as u64; + state + .samples_ingested + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + + // Group samples by series key for batch routing + let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); + for s in &samples { + by_series + .entry(&s.labels) + .or_default() + .push((s.timestamp_ms, s.value)); + } + + // Convert to owned keys for batch routing + let by_series_owned: HashMap> = by_series + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(); + + // Route all series to workers concurrently + if let Err(e) = state + .router + .route_batch(by_series_owned, ingest_received_at) + .await + { + warn!("Batch routing error: {}", e); + return StatusCode::INTERNAL_SERVER_ERROR; + } + + StatusCode::NO_CONTENT +} + +/// Axum handler for Prometheus remote write (Snappy + Protobuf). +pub(crate) async fn handle_prometheus_ingest( + State(state): State>, + body: Bytes, +) -> StatusCode { + let ingest_received_at = Instant::now(); + let samples = match decode_prometheus_remote_write(&body) { + Ok(s) => s, + Err(e) => { + warn!("Failed to decode Prometheus remote write: {}", e); + return StatusCode::BAD_REQUEST; + } + }; + route_decoded_samples(&state, samples, ingest_received_at).await +} + +/// Axum handler for VictoriaMetrics remote write (Zstd + Protobuf). 
+pub(crate) async fn handle_victoriametrics_ingest( + State(state): State>, + body: Bytes, +) -> StatusCode { + let ingest_received_at = Instant::now(); + let samples = match decode_victoriametrics_remote_write(&body) { + Ok(s) => s, + Err(e) => { + warn!("Failed to decode VictoriaMetrics remote write: {}", e); + return StatusCode::BAD_REQUEST; + } + }; + route_decoded_samples(&state, samples, ingest_received_at).await +} diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index 5801724..7b960ca 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -1,214 +1,11 @@ pub mod accumulator_factory; pub mod config; +mod engine; +mod ingest_handler; pub mod output_sink; pub mod series_buffer; pub mod series_router; pub mod window_manager; pub mod worker; -use crate::data_model::StreamingConfig; -use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write; -use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write; -use crate::precompute_engine::config::PrecomputeEngineConfig; -use crate::precompute_engine::output_sink::OutputSink; -use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; -use crate::precompute_engine::worker::Worker; -use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router}; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Instant; -use tokio::net::TcpListener; -use tokio::sync::mpsc; -use tracing::{info, warn}; - -/// Shared state for the ingest HTTP handler. -struct IngestState { - router: SeriesRouter, - samples_ingested: std::sync::atomic::AtomicU64, -} - -/// The top-level precompute engine orchestrator. -/// -/// Creates worker threads, the series router, and the Axum ingest server. 
-pub struct PrecomputeEngine { - config: PrecomputeEngineConfig, - streaming_config: Arc, - output_sink: Arc, -} - -impl PrecomputeEngine { - pub fn new( - config: PrecomputeEngineConfig, - streaming_config: Arc, - output_sink: Arc, - ) -> Self { - Self { - config, - streaming_config, - output_sink, - } - } - - /// Start the precompute engine. This spawns worker tasks and the HTTP - /// ingest server, then blocks until shutdown. - pub async fn run(self) -> Result<(), Box> { - let num_workers = self.config.num_workers; - let channel_size = self.config.channel_buffer_size; - - // Build MPSC channels for each worker - let mut senders = Vec::with_capacity(num_workers); - let mut receivers = Vec::with_capacity(num_workers); - for _ in 0..num_workers { - let (tx, rx) = mpsc::channel::(channel_size); - senders.push(tx); - receivers.push(rx); - } - - // Build the router - let router = SeriesRouter::new(senders); - - // Build aggregation config map from streaming config - let agg_configs: HashMap = - self.streaming_config.get_all_aggregation_configs().clone(); - - // Spawn workers - let mut worker_handles = Vec::with_capacity(num_workers); - for (id, rx) in receivers.into_iter().enumerate() { - let worker = Worker::new( - id, - rx, - self.output_sink.clone(), - agg_configs.clone(), - self.config.max_buffer_per_series, - self.config.allowed_lateness_ms, - self.config.pass_raw_samples, - self.config.raw_mode_aggregation_id, - self.config.late_data_policy, - ); - let handle = tokio::spawn(async move { - worker.run().await; - }); - worker_handles.push(handle); - } - - info!( - "PrecomputeEngine started with {} workers on port {}", - num_workers, self.config.ingest_port - ); - - // Build the ingest state - let ingest_state = Arc::new(IngestState { - router, - samples_ingested: std::sync::atomic::AtomicU64::new(0), - }); - - // Start flush timer - let flush_state = ingest_state.clone(); - let flush_interval_ms = self.config.flush_interval_ms; - tokio::spawn(async move { - let mut 
interval = - tokio::time::interval(tokio::time::Duration::from_millis(flush_interval_ms)); - loop { - interval.tick().await; - if let Err(e) = flush_state.router.broadcast_flush().await { - warn!("Flush broadcast error: {}", e); - break; - } - } - }); - - // Start the Axum HTTP server for ingest (Prometheus + VictoriaMetrics) - let app = Router::new() - .route("/api/v1/write", post(handle_prometheus_ingest)) - .route("/api/v1/import", post(handle_victoriametrics_ingest)) - .with_state(ingest_state); - - let addr = format!("0.0.0.0:{}", self.config.ingest_port); - info!("Ingest server listening on {}", addr); - - let listener = TcpListener::bind(&addr).await?; - axum::serve(listener, app).await?; - - // Wait for workers to finish (this only happens on shutdown) - for handle in worker_handles { - let _ = handle.await; - } - - Ok(()) - } -} - -/// Shared logic: group decoded samples by series key and route to workers. -async fn route_decoded_samples( - state: &IngestState, - samples: Vec, - ingest_received_at: Instant, -) -> StatusCode { - if samples.is_empty() { - return StatusCode::NO_CONTENT; - } - - let count = samples.len() as u64; - state - .samples_ingested - .fetch_add(count, std::sync::atomic::Ordering::Relaxed); - - // Group samples by series key for batch routing - let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); - for s in &samples { - by_series - .entry(&s.labels) - .or_default() - .push((s.timestamp_ms, s.value)); - } - - // Convert to owned keys for batch routing - let by_series_owned: HashMap> = by_series - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(); - - // Route all series to workers concurrently - if let Err(e) = state - .router - .route_batch(by_series_owned, ingest_received_at) - .await - { - warn!("Batch routing error: {}", e); - return StatusCode::INTERNAL_SERVER_ERROR; - } - - StatusCode::NO_CONTENT -} - -/// Axum handler for Prometheus remote write (Snappy + Protobuf). 
-async fn handle_prometheus_ingest( - State(state): State>, - body: Bytes, -) -> StatusCode { - let ingest_received_at = Instant::now(); - let samples = match decode_prometheus_remote_write(&body) { - Ok(s) => s, - Err(e) => { - warn!("Failed to decode Prometheus remote write: {}", e); - return StatusCode::BAD_REQUEST; - } - }; - route_decoded_samples(&state, samples, ingest_received_at).await -} - -/// Axum handler for VictoriaMetrics remote write (Zstd + Protobuf). -async fn handle_victoriametrics_ingest( - State(state): State>, - body: Bytes, -) -> StatusCode { - let ingest_received_at = Instant::now(); - let samples = match decode_victoriametrics_remote_write(&body) { - Ok(s) => s, - Err(e) => { - warn!("Failed to decode VictoriaMetrics remote write: {}", e); - return StatusCode::BAD_REQUEST; - } - }; - route_decoded_samples(&state, samples, ingest_received_at).await -} +pub use engine::PrecomputeEngine; diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index d487767..fd181f6 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -146,10 +146,54 @@ struct SeriesState { struct AggregationState { config: AggregationConfig, window_manager: WindowManager, - active_windows: HashMap>, + active_panes: BTreeMap>, } ``` +#### Pane-Based Sliding Window Optimization + +The worker uses **pane-based incremental computation** to reduce per-sample +work for sliding windows. The timeline is divided into non-overlapping **panes** +of size `slide_interval`. Each window is composed of `W = window_size / +slide_interval` consecutive panes. Consecutive windows share W-1 panes. 
+ +``` +Panes: [0,10) [10,20) [20,30) [30,40) [40,50) +Window A: [───────── 0,30 ─────────) +Window B: [───────── 10,40 ─────────) +Window C: [───────── 20,50 ─────────) +``` + +**Why panes instead of subtraction?** Only Sum is invertible. MinMax, Increase, +KLL, CMS, HydraKLL are all non-invertible. Pane+merge works universally because +all accumulator types implement `AggregateCore::merge_with()`. + +**Performance comparison** (N samples per window, W = window_size / slide_interval): + +| | Per-window approach | Pane-based | +|--|---------|------------| +| Per-sample accumulator updates | N × W | N × 1 | +| Per-window-close merges | 0 | W - 1 | +| Per-window-close clones | 0 | W - 2 (shared panes) | + +Net win when N >> W (typical: thousands of samples per window, W = 3-5). +For tumbling windows (W=1), panes degenerate to 1 pane = 1 window with +zero merges — identical behavior to the non-pane approach. + +**Key methods:** + +| Method | Description | +|--------|-------------| +| `pane_start_for(ts)` | Align timestamp to slide grid (same as `window_start_for`) | +| `panes_for_window(ws)` | All pane starts composing window `[ws, ws+size)` | +| `snapshot_accumulator()` | Non-destructive read of a pane's accumulator (for shared panes) | + +**Pane eviction:** When window `[S, S+W)` closes, pane `[S, S+slide)` is the +oldest pane and is not needed by any later window (next window starts at +`S+slide`). It is destructively consumed via `take_accumulator()` and removed +from `active_panes`. Remaining panes are read non-destructively via +`snapshot_accumulator()`. + #### Processing pipeline (`process_samples`) ``` @@ -157,14 +201,18 @@ struct AggregationState { 2. Insert samples into SeriesBuffer, update watermark 3. Drop samples beyond allowed_lateness_ms behind watermark 4. For each sample × each aggregation: - a. Compute window_starts_containing(ts) - b. 
For each window_start: - - If window not in active_windows AND already closed (watermark >= window_end): - → late_data_policy == Drop: skip - → late_data_policy == ForwardToStore: create mini-accumulator, emit - - Else: insert into active_windows, feed value to AccumulatorUpdater + a. Compute pane_start = pane_start_for(ts) + b. If pane was evicted (late data for closed window): + → late_data_policy == Drop: skip + → late_data_policy == ForwardToStore: create mini-accumulator, emit + c. Else: get-or-create pane in active_panes, feed value (1 update per sample) 5. Detect newly closed windows via closed_windows(prev_wm, current_wm) -6. Extract closed window accumulators → PrecomputedOutput + AggregateCore +6. For each closed window: + a. Get pane starts via panes_for_window(window_start) + b. Oldest pane: take_accumulator() + remove from active_panes (destructive) + c. Remaining panes: snapshot_accumulator() (non-destructive, shared) + d. Merge all pane accumulators via AggregateCore::merge_with() + e. Emit merged result as PrecomputedOutput + AggregateCore 7. Emit batch to OutputSink 8. Update previous_watermark_ms ``` @@ -211,10 +259,53 @@ struct WindowManager { | `window_starts_containing(ts)` | All windows whose `[start, start+size)` includes `ts`. Tumbling → 1 window; sliding → `ceil(size/slide)` windows | | `closed_windows(prev_wm, curr_wm)` | Windows that transitioned open→closed as the watermark advanced | | `window_bounds(start)` | Returns `(start, start + window_size_ms)` | +| `pane_start_for(ts)` | Pane start for a timestamp (same slide-aligned grid as `window_start_for`) | +| `panes_for_window(ws)` | All pane starts composing window `[ws, ws+size)`, in ascending order | +| `slide_interval_ms()` | Slide interval accessor | **Window closure rule:** a window `[S, S + size)` closes when `watermark >= S + size`. Once closed, a window never reopens. 
+#### Sliding window mechanics + +Tumbling windows are a special case of sliding windows where +`slide_interval == window_size`. The same code handles both — no separate paths. + +**`window_start_for(ts)`** aligns a timestamp to the slide grid: +```rust +let n = timestamp_ms.div_euclid(slide_interval_ms); +n * slide_interval_ms +``` + +**`window_starts_containing(ts)`** returns all windows whose `[start, start+size)` +contains the timestamp, by walking backwards from the aligned start: +```rust +let mut start = window_start_for(timestamp_ms); +while start + window_size_ms > timestamp_ms { + starts.push(start); + start -= slide_interval_ms; +} +``` + +For tumbling windows this always yields exactly 1 result. For sliding windows, +each sample belongs to `ceil(window_size / slide_interval)` overlapping windows. + +**Example** (30s window, 10s slide): +``` +t=15s → belongs to windows [0, 30s), [10s, 40s), [-10s, 20s) (3 windows) +t=35s → belongs to windows [30s, 60s), [20s, 50s), [10s, 40s) (3 windows) +``` + +**`closed_windows(prev_wm, curr_wm)`** finds windows that transitioned open→closed +as the watermark advanced. It scans forward from the earliest possibly-open window +start, collecting those where `start + size <= curr_wm` (now closed) AND +`start + size > prev_wm` (was still open before). + +The worker calls `window_starts_containing(ts)` for each incoming sample and feeds +the value into the accumulator for every matching window. When +`closed_windows()` fires, each closed window's accumulator is extracted and +emitted independently. 
+ ### 3.7 AccumulatorUpdater (`accumulator_factory.rs`) Trait-based interface for feeding samples into sketch accumulators: @@ -224,12 +315,17 @@ trait AccumulatorUpdater: Send { fn update_single(&mut self, value: f64, timestamp_ms: i64); fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, timestamp_ms: i64); fn take_accumulator(&mut self) -> Box; + fn snapshot_accumulator(&self) -> Box; // non-destructive clone fn reset(&mut self); fn is_keyed(&self) -> bool; fn memory_usage_bytes(&self) -> usize; } ``` +`snapshot_accumulator()` returns a clone of the current state without resetting. +Used by pane-based sliding windows to read shared panes that are still needed +by future windows. + The factory function `create_accumulator_updater(config)` dispatches on `(aggregation_type, aggregation_sub_type)`: @@ -305,6 +401,28 @@ pub struct AggregationConfig { `OutputSink.emit_batch()` → `Store.insert_precomputed_output_batch()` +Because the precompute engine runs in the same process as the store, the write +path involves **zero serialization and zero network hops**. Closed window +accumulators flow from worker to store entirely as in-memory trait objects: + +``` +Worker: updater.take_accumulator() → Box (in-memory) + ↓ (direct function call, no IPC) +OutputSink: store.insert_precomputed_output_batch(outputs) (pass-through) + ↓ (direct function call, same process) +SimpleMapStore: HashMap entry insert → Box (stored as-is) +``` + +No serialization, deserialization, compression, or network transfer occurs +between the worker extracting an accumulator and the store persisting it. +The only network hops in the system are at the edges: HTTP ingest (in) and +HTTP query (out). Serialization of accumulators only happens on the read path +when query results are returned to clients. 
+ +This is in contrast to the external Kafka ingest path, where precomputes from +Arroyo/Flink arrive hex-encoded + gzip-compressed + MessagePack-serialized and +require multiple deserialization steps. + The `SimpleMapStore` (PerKey variant) uses: ``` DashMap>> @@ -387,13 +505,15 @@ independently. **Ingest path (per batch):** - Sample insert: O(log B) per sample (BTreeMap, B = buffer size) -- Window assignment: O(A × W) per sample (A = matching aggregations, - W = windows per sample — 1 for tumbling, ~3 for typical sliding) +- Pane routing: O(A) per sample (A = matching aggregations; each sample + touches exactly 1 pane per aggregation, regardless of window overlap) - Accumulator update: O(1) for Sum/MinMax, O(log k) for KLL +- Window close: O(W-1) merges per closed window (W = window_size / slide_interval) **Memory:** - O(S × N) buffered samples (S = max per series, N = active series) -- O(A × W_open) active window accumulators +- O(A × W_open) active pane accumulators (fewer than window accumulators + since panes are shared across overlapping windows) **Throughput:** - Workers process in parallel with no cross-shard coordination. diff --git a/asap-query-engine/src/precompute_engine/window_manager.rs b/asap-query-engine/src/precompute_engine/window_manager.rs index d8bf23b..14b4d1e 100644 --- a/asap-query-engine/src/precompute_engine/window_manager.rs +++ b/asap-query-engine/src/precompute_engine/window_manager.rs @@ -90,6 +90,27 @@ impl WindowManager { pub fn window_bounds(&self, window_start: i64) -> (i64, i64) { (window_start, window_start + self.window_size_ms) } + + /// Slide interval accessor. + pub fn slide_interval_ms(&self) -> i64 { + self.slide_interval_ms + } + + /// Pane start for a timestamp. Panes are aligned to the slide_interval grid, + /// which is the same grid as `window_start_for`. + pub fn pane_start_for(&self, timestamp_ms: i64) -> i64 { + self.window_start_for(timestamp_ms) + } + + /// All pane starts composing a window, in ascending order. 
+ /// A window `[ws, ws + window_size)` is composed of + /// `window_size / slide_interval` consecutive panes. + pub fn panes_for_window(&self, window_start: i64) -> Vec { + let num_panes = self.window_size_ms / self.slide_interval_ms; + (0..num_panes) + .map(|i| window_start + i * self.slide_interval_ms) + .collect() + } } #[cfg(test)] @@ -194,4 +215,79 @@ mod tests { starts.sort(); assert_eq!(starts, vec![10_000, 20_000, 30_000]); } + + // --- Pane method tests --- + + #[test] + fn test_pane_start_for_equals_window_start_for() { + // Pane start and window start use the same slide-aligned grid + let wm = WindowManager::new(30, 10); + for ts in [0, 5_000, 9_999, 10_000, 15_000, 25_000, 30_000] { + assert_eq!(wm.pane_start_for(ts), wm.window_start_for(ts)); + } + } + + #[test] + fn test_panes_for_window_sliding() { + // 30s window, 10s slide → 3 panes per window + let wm = WindowManager::new(30, 10); + + assert_eq!(wm.panes_for_window(0), vec![0, 10_000, 20_000]); + assert_eq!( + wm.panes_for_window(10_000), + vec![10_000, 20_000, 30_000] + ); + assert_eq!( + wm.panes_for_window(20_000), + vec![20_000, 30_000, 40_000] + ); + } + + #[test] + fn test_panes_for_window_tumbling_degeneration() { + // 60s tumbling window → 1 pane per window (no merges needed) + let wm = WindowManager::new(60, 0); + + assert_eq!(wm.panes_for_window(0), vec![0]); + assert_eq!(wm.panes_for_window(60_000), vec![60_000]); + } + + #[test] + fn test_slide_interval_ms_accessor() { + let wm_tumbling = WindowManager::new(60, 0); + assert_eq!(wm_tumbling.slide_interval_ms(), 60_000); + + let wm_sliding = WindowManager::new(30, 10); + assert_eq!(wm_sliding.slide_interval_ms(), 10_000); + } + + #[test] + fn test_panes_for_window_count() { + // W = window_size / slide_interval + let wm = WindowManager::new(30, 10); + assert_eq!(wm.panes_for_window(0).len(), 3); // 30/10 = 3 + + let wm2 = WindowManager::new(50, 10); + assert_eq!(wm2.panes_for_window(0).len(), 5); // 50/10 = 5 + + let wm3 = 
WindowManager::new(60, 0); + assert_eq!(wm3.panes_for_window(0).len(), 1); // tumbling = 1 + } + + #[test] + fn test_consecutive_windows_share_panes() { + // 30s window, 10s slide — consecutive windows share W-1 = 2 panes + let wm = WindowManager::new(30, 10); + + let panes_a = wm.panes_for_window(0); // [0, 10_000, 20_000] + let panes_b = wm.panes_for_window(10_000); // [10_000, 20_000, 30_000] + + // Shared panes: 10_000 and 20_000 + let shared: Vec = panes_a + .iter() + .filter(|p| panes_b.contains(p)) + .copied() + .collect(); + assert_eq!(shared, vec![10_000, 20_000]); + } } diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index cb58fe0..8e8c15b 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -9,18 +9,24 @@ use crate::precompute_engine::series_router::WorkerMessage; use crate::precompute_engine::window_manager::WindowManager; use crate::precompute_operators::sum_accumulator::SumAccumulator; use sketch_db_common::aggregation_config::AggregationConfig; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use tokio::sync::mpsc; use tracing::{debug, debug_span, info, warn}; /// Per-aggregation state within a series: the window manager and active -/// window accumulators. +/// pane accumulators. +/// +/// Uses pane-based sliding window computation: each sample is routed to +/// exactly 1 pane (sub-window of size `slide_interval`). When a window +/// closes, its constituent panes are merged. This reduces per-sample +/// accumulator updates from W to 1 (where W = window_size / slide_interval). struct AggregationState { config: AggregationConfig, window_manager: WindowManager, - /// Active windows keyed by window_start_ms. - active_windows: HashMap>, + /// Active panes keyed by pane_start_ms. + /// BTreeMap for ordered iteration (needed for pane eviction). 
+ active_panes: BTreeMap>, } /// Per-series state owned by the worker. @@ -154,7 +160,7 @@ impl Worker { .map(|(_, config)| AggregationState { window_manager: WindowManager::new(config.window_size, config.slide_interval), config: config.clone(), - active_windows: HashMap::new(), + active_panes: BTreeMap::new(), }) .collect(); @@ -221,84 +227,119 @@ impl Worker { .window_manager .closed_windows(previous_wm, current_wm); - // Feed each incoming sample to the correct active window accumulator + // Pane-based sample routing: each sample goes to exactly 1 pane for &(ts, val) in &samples { if current_wm != i64::MIN && ts < current_wm - allowed_lateness_ms { continue; // already dropped } - let window_starts = agg_state.window_manager.window_starts_containing(ts); - - for window_start in window_starts { - let window_end = window_start + agg_state.window_manager.window_size_ms(); - - // Check if this window was already closed in a previous batch - if !agg_state.active_windows.contains_key(&window_start) - && current_wm >= window_end - { - // Window already closed — handle according to policy - match late_data_policy { - LateDataPolicy::Drop => { - debug!( - "Dropping late sample for closed window [{}, {})", - window_start, window_end - ); - continue; - } - LateDataPolicy::ForwardToStore => { - let mut updater = create_accumulator_updater(&agg_state.config); - if updater.is_keyed() { - let key = - extract_key_from_series(series_key, &agg_state.config); - updater.update_keyed(&key, val, ts); - } else { - updater.update_single(val, ts); - } - let key = if updater.is_keyed() { - Some(extract_key_from_series(series_key, &agg_state.config)) - } else { - None - }; - let output = PrecomputedOutput::new( - window_start as u64, - window_end as u64, - key, - agg_state.config.aggregation_id, - ); - emit_batch.push((output, updater.take_accumulator())); - debug!( - "Forwarding late sample to store for closed window [{}, {})", - window_start, window_end - ); - continue; + let pane_start 
= agg_state.window_manager.pane_start_for(ts); + let pane_end = pane_start + agg_state.window_manager.slide_interval_ms(); + + // Check if pane was already evicted (late data for a closed window). + // A pane is evicted when its oldest window closes, i.e. the window + // starting at pane_start. If that window is closed, the pane is gone. + if !agg_state.active_panes.contains_key(&pane_start) + && current_wm >= pane_start + agg_state.window_manager.window_size_ms() + { + // The window starting at this pane_start is already closed, + // so this pane was evicted — handle as late data. + let window_start = pane_start; + let window_end = pane_start + agg_state.window_manager.window_size_ms(); + match late_data_policy { + LateDataPolicy::Drop => { + debug!( + "Dropping late sample for evicted pane [{}, {})", + pane_start, pane_end + ); + continue; + } + LateDataPolicy::ForwardToStore => { + let mut updater = create_accumulator_updater(&agg_state.config); + if updater.is_keyed() { + let key = + extract_key_from_series(series_key, &agg_state.config); + updater.update_keyed(&key, val, ts); + } else { + updater.update_single(val, ts); } + let key = if updater.is_keyed() { + Some(extract_key_from_series(series_key, &agg_state.config)) + } else { + None + }; + let output = PrecomputedOutput::new( + window_start as u64, + window_end as u64, + key, + agg_state.config.aggregation_id, + ); + emit_batch.push((output, updater.take_accumulator())); + debug!( + "Forwarding late sample to store for evicted pane [{}, {})", + pane_start, pane_end + ); + continue; } } + } - // Normal path: window is still open (or newly opened) - let updater = agg_state - .active_windows - .entry(window_start) - .or_insert_with(|| create_accumulator_updater(&agg_state.config)); - - if updater.is_keyed() { - let key = extract_key_from_series(series_key, &agg_state.config); - updater.update_keyed(&key, val, ts); - } else { - updater.update_single(val, ts); - } + // Normal path: route sample to its single 
pane + let updater = agg_state + .active_panes + .entry(pane_start) + .or_insert_with(|| create_accumulator_updater(&agg_state.config)); + + if updater.is_keyed() { + let key = extract_key_from_series(series_key, &agg_state.config); + updater.update_keyed(&key, val, ts); + } else { + updater.update_single(val, ts); } } - // Emit closed windows + // Emit closed windows by merging their constituent panes for window_start in &closed { - if let Some(mut updater) = agg_state.active_windows.remove(window_start) { - let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); - - let key = if updater.is_keyed() { - Some(extract_key_from_series(series_key, &agg_state.config)) + let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); + let pane_starts = agg_state.window_manager.panes_for_window(*window_start); + + // Merge pane accumulators for this window. + // - Oldest pane (index 0): take_accumulator + remove (no future window needs it) + // - Remaining panes: snapshot_accumulator (shared with newer windows) + let mut merged: Option> = None; + + for (i, &ps) in pane_starts.iter().enumerate() { + let pane_acc = if i == 0 { + // Oldest pane: destructive take + evict from active_panes + agg_state + .active_panes + .remove(&ps) + .map(|mut updater| updater.take_accumulator()) } else { - None + // Shared pane: non-destructive snapshot + agg_state + .active_panes + .get(&ps) + .map(|updater| updater.snapshot_accumulator()) + }; + + if let Some(acc) = pane_acc { + merged = Some(match merged { + None => acc, + Some(existing) => existing.merge_with(acc.as_ref()).unwrap_or(existing), + }); + } + } + + if let Some(accumulator) = merged { + let key = { + // Check keyed-ness from accumulator type name or config + let test_updater = create_accumulator_updater(&agg_state.config); + if test_updater.is_keyed() { + Some(extract_key_from_series(series_key, &agg_state.config)) + } else { + None + } }; let output = PrecomputedOutput::new( @@ -308,7 +349,6 
@@ impl Worker { agg_state.config.aggregation_id, ); - let accumulator = updater.take_accumulator(); emit_batch.push((output, accumulator)); } } @@ -377,13 +417,42 @@ impl Worker { .closed_windows(previous_wm, current_wm); for window_start in &closed { - if let Some(mut updater) = agg_state.active_windows.remove(window_start) { - let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); + let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); + let pane_starts = agg_state.window_manager.panes_for_window(*window_start); - let key = if updater.is_keyed() { - Some(extract_key_from_series(series_key, &agg_state.config)) + let mut merged: Option> = None; + + for (i, &ps) in pane_starts.iter().enumerate() { + let pane_acc = if i == 0 { + agg_state + .active_panes + .remove(&ps) + .map(|mut updater| updater.take_accumulator()) } else { - None + agg_state + .active_panes + .get(&ps) + .map(|updater| updater.snapshot_accumulator()) + }; + + if let Some(acc) = pane_acc { + merged = Some(match merged { + None => acc, + Some(existing) => { + existing.merge_with(acc.as_ref()).unwrap_or(existing) + } + }); + } + } + + if let Some(accumulator) = merged { + let key = { + let test_updater = create_accumulator_updater(&agg_state.config); + if test_updater.is_keyed() { + Some(extract_key_from_series(series_key, &agg_state.config)) + } else { + None + } }; let output = PrecomputedOutput::new( @@ -393,7 +462,6 @@ impl Worker { agg_state.config.aggregation_id, ); - let accumulator = updater.take_accumulator(); emit_batch.push((output, accumulator)); } } From e83a8d8c496bf5086a32f11069fe7232cc90fb0f Mon Sep 17 00:00:00 2001 From: zz_y Date: Wed, 1 Apr 2026 16:41:49 -0500 Subject: [PATCH 19/19] style: apply cargo fmt Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/precompute_engine/window_manager.rs | 12 +++--------- asap-query-engine/src/precompute_engine/worker.rs | 3 +-- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git 
a/asap-query-engine/src/precompute_engine/window_manager.rs b/asap-query-engine/src/precompute_engine/window_manager.rs index 14b4d1e..841c182 100644 --- a/asap-query-engine/src/precompute_engine/window_manager.rs +++ b/asap-query-engine/src/precompute_engine/window_manager.rs @@ -233,14 +233,8 @@ mod tests { let wm = WindowManager::new(30, 10); assert_eq!(wm.panes_for_window(0), vec![0, 10_000, 20_000]); - assert_eq!( - wm.panes_for_window(10_000), - vec![10_000, 20_000, 30_000] - ); - assert_eq!( - wm.panes_for_window(20_000), - vec![20_000, 30_000, 40_000] - ); + assert_eq!(wm.panes_for_window(10_000), vec![10_000, 20_000, 30_000]); + assert_eq!(wm.panes_for_window(20_000), vec![20_000, 30_000, 40_000]); } #[test] @@ -279,7 +273,7 @@ mod tests { // 30s window, 10s slide — consecutive windows share W-1 = 2 panes let wm = WindowManager::new(30, 10); - let panes_a = wm.panes_for_window(0); // [0, 10_000, 20_000] + let panes_a = wm.panes_for_window(0); // [0, 10_000, 20_000] let panes_b = wm.panes_for_window(10_000); // [10_000, 20_000, 30_000] // Shared panes: 10_000 and 20_000 diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 8e8c15b..93ac194 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -257,8 +257,7 @@ impl Worker { LateDataPolicy::ForwardToStore => { let mut updater = create_accumulator_updater(&agg_state.config); if updater.is_keyed() { - let key = - extract_key_from_series(series_key, &agg_state.config); + let key = extract_key_from_series(series_key, &agg_state.config); updater.update_keyed(&key, val, ts); } else { updater.update_single(val, ts);