From 30151a862db5eb71fdafe128812b214d05f22b4b Mon Sep 17 00:00:00 2001 From: zz_y Date: Sat, 21 Feb 2026 15:23:09 -0700 Subject: [PATCH 01/10] Add standalone precompute engine to replace Arroyo streaming pipeline Implements a single-node multi-threaded precompute engine as a new module and binary target within QueryEngineRust. The engine ingests Prometheus remote write samples, buffers them per-series with out-of-order handling, detects closed tumbling/sliding windows via event-time watermarks, feeds samples into accumulator wrappers for all existing sketch types, and emits PrecomputedOutput directly to the store. New modules: config, series_buffer, window_manager, accumulator_factory, series_router, worker, output_sink, and the PrecomputeEngine orchestrator. The binary supports embedded store + query HTTP server for single-process deployment. Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index a5fca27..4373bfe 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -67,6 +67,10 @@ path = "src/bin/precompute_engine.rs" name = "test_e2e_precompute" path = "src/bin/test_e2e_precompute.rs" +[[bin]] +name = "precompute_engine" +path = "src/bin/precompute_engine.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From 19c0f84933476e4949fec07592f38f0784b34e51 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 12:14:53 -0700 Subject: [PATCH 02/10] Wire up DatasketchesKLL in accumulator factory and add E2E test Handle top-level aggregation types (DatasketchesKLL, Sum, Min, Max, etc.) directly in the factory match, fixing the fallback to Sum that broke quantile queries. Also preserve the K parameter in KllAccumulatorUpdater::reset() instead of hardcoding 200. Add test_e2e_precompute binary that validates the full ingest -> precompute -> store -> query pipeline end-to-end. Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 4373bfe..2cb7ec3 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -71,6 +71,10 @@ path = "src/bin/test_e2e_precompute.rs" name = "precompute_engine" path = "src/bin/precompute_engine.rs" +[[bin]] +name = "test_e2e_precompute" +path = "src/bin/test_e2e_precompute.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From c822c8b04ec8d1533100e46f0969bd9af89ebe97 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 14:26:27 -0700 Subject: [PATCH 03/10] Add 1000-sample batch latency test to E2E precompute test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sends 10 series × 100 samples in a single HTTP request to the raw-mode engine and verifies all 1000 samples land in the store. Prints client RTT and per-series e2e_latency_us at debug level. Co-Authored-By: Claude Opus 4.6 --- .../src/bin/test_e2e_precompute.rs | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index a8235d0..9fa350c 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -334,6 +334,60 @@ async fn main() -> Result<(), Box> { } println!(" Raw mode test PASSED"); + // ----------------------------------------------------------------------- + // BATCH LATENCY TEST + // Send 1000 samples in a single HTTP request to measure realistic e2e + // latency. Uses the raw-mode engine to avoid window-close dependencies. + // ----------------------------------------------------------------------- + println!("\n=== Batch latency test: 1000 samples in one request ==="); + + // Build a single WriteRequest with 1000 TimeSeries entries spread across + // 10 series × 100 timestamps each, so routing fans out to workers. + let mut batch_timeseries = Vec::with_capacity(1000); + for series_idx in 0..10 { + let label_val = format!("batch_{series_idx}"); + for t in 0..100 { + let ts = 200_000 + (series_idx as i64) * 1000 + t; // unique ts per sample + let val = (series_idx * 100 + t) as f64; + batch_timeseries.push(make_sample("fake_metric", &label_val, ts, val)); + } + } + let batch_body = build_remote_write_body(batch_timeseries); + println!(" Payload size: {} bytes (snappy-compressed)", batch_body.len()); + + let t0 = std::time::Instant::now(); + let resp = client + .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(batch_body) + .send() + .await?; + let client_rtt = t0.elapsed(); + println!( + " HTTP response: {} in {:.3}ms", + resp.status().as_u16(), + client_rtt.as_secs_f64() * 1000.0, + ); + + // Wait for all workers to finish processing + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Verify samples landed in the store + let batch_results = store.query_precomputed_output( + "fake_metric", + raw_agg_id, + 200_000, + 210_000, + )?; + let batch_buckets: usize = batch_results.values().map(|v| v.len()).sum(); + println!(" Stored {batch_buckets} buckets from batch (expected 1000)"); + assert!( + batch_buckets >= 1000, + "Expected at least 1000 raw samples in store, got {batch_buckets}" + ); + println!(" Batch latency test PASSED"); + println!("\n=== E2E test complete ==="); Ok(()) From eedc6ce7d762a9b002182d03f7715d292336e6ef Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 14:35:18 -0700 Subject: [PATCH 04/10] Add 10M-sample throughput test to E2E precompute test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sends 1000 requests × 10000 samples (50 distinct series) to the raw-mode engine and polls until all samples are stored. Reports both send throughput and e2e throughput including drain time. Co-Authored-By: Claude Opus 4.6 --- .../src/bin/test_e2e_precompute.rs | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 9fa350c..be1e8bc 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -388,6 +388,89 @@ async fn main() -> Result<(), Box> { ); println!(" Batch latency test PASSED"); + // ----------------------------------------------------------------------- + // THROUGHPUT TEST + // Send many requests back-to-back and measure sustained throughput + // (samples/sec). Uses the raw-mode engine for a clean measurement. + // ----------------------------------------------------------------------- + println!("\n=== Throughput test: 1000 requests × 10000 samples ==="); + + let num_requests = 1000u64; + let samples_per_request = 10_000u64; + let total_samples = num_requests * samples_per_request; + + // Pre-build all request bodies so serialization doesn't count against throughput + let mut bodies = Vec::with_capacity(num_requests as usize); + for req_idx in 0..num_requests { + let mut timeseries = Vec::with_capacity(samples_per_request as usize); + for s in 0..samples_per_request { + let series_label = format!("tp_{}", s % 50); // 50 distinct series + let ts = 300_000 + req_idx as i64 * 10_000 + s as i64; + timeseries.push(make_sample("fake_metric", &series_label, ts, s as f64)); + } + bodies.push(build_remote_write_body(timeseries)); + } + + let throughput_start = std::time::Instant::now(); + + for (i, body) in bodies.into_iter().enumerate() { + let resp = client + .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + if resp.status() != reqwest::StatusCode::NO_CONTENT { + eprintln!(" Request {i} failed: {}", resp.status()); + } + } + + let send_elapsed = throughput_start.elapsed(); + println!( + " All {} requests sent in {:.1}ms", + num_requests, + send_elapsed.as_secs_f64() * 1000.0, + ); + + // Poll until workers drain or timeout after 60s + let max_ts = 300_000u64 + num_requests * 10_000 + samples_per_request; + let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60); + let mut tp_buckets: usize; + loop { + let tp_results = store.query_precomputed_output( + "fake_metric", + raw_agg_id, + 300_000, + max_ts, + )?; + tp_buckets = tp_results.values().map(|v| v.len()).sum(); + if tp_buckets as u64 >= total_samples || std::time::Instant::now() > drain_deadline { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + let total_elapsed = throughput_start.elapsed(); + + let send_throughput = total_samples as f64 / send_elapsed.as_secs_f64(); + let e2e_throughput = tp_buckets as f64 / total_elapsed.as_secs_f64(); + println!(" Stored {tp_buckets}/{total_samples} samples"); + println!( + " Send throughput: {:.0} samples/sec ({:.1}ms for {total_samples} samples)", + send_throughput, + send_elapsed.as_secs_f64() * 1000.0, + ); + println!( + " E2E throughput: {:.0} samples/sec ({:.1}ms until all stored)", + e2e_throughput, + total_elapsed.as_secs_f64() * 1000.0, + ); + assert!( + tp_buckets as u64 >= total_samples, + "Expected at least {total_samples} samples in store, got {tp_buckets}" + ); + println!(" Throughput test PASSED"); + println!("\n=== E2E test complete ==="); Ok(()) From b749ac3e44837945fc5b46919ac456f8d5618882 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 17:42:19 -0700 Subject: [PATCH 05/10] update --- .../src/bin/test_e2e_precompute.rs | 2 +- .../src/precompute_engine/mod.rs | 33 ++++++++------ .../src/precompute_engine/series_router.rs | 43 +++++++++++++++++++ 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index be1e8bc..38c2cd2 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -275,7 +275,7 @@ async fn main() -> Result<(), Box> { // Pick aggregation_id = 1 to match the existing streaming config. let raw_agg_id: u64 = 1; let raw_engine_config = PrecomputeEngineConfig { - num_workers: 1, + num_workers: 4, ingest_port: RAW_INGEST_PORT, allowed_lateness_ms: 5000, max_buffer_per_series: 10000, diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index c7fe7e9..5471618 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -15,9 +15,9 @@ use crate::precompute_engine::worker::Worker; use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router}; use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; use tokio::net::TcpListener; use tokio::sync::mpsc; +use std::time::Instant; use tracing::{debug_span, info, warn, Instrument}; /// Shared state for the ingest HTTP handler. @@ -67,8 +67,10 @@ impl PrecomputeEngine { let router = SeriesRouter::new(senders); // Build aggregation config map from streaming config - let agg_configs: HashMap = - self.streaming_config.get_all_aggregation_configs().clone(); + let agg_configs: HashMap = self + .streaming_config + .get_all_aggregation_configs() + .clone(); // Spawn workers let mut worker_handles = Vec::with_capacity(num_workers); @@ -136,7 +138,10 @@ impl PrecomputeEngine { } /// Axum handler for Prometheus remote write. -async fn handle_ingest(State(state): State>, body: Bytes) -> StatusCode { +async fn handle_ingest( + State(state): State>, + body: Bytes, +) -> StatusCode { let ingest_span = debug_span!("ingest", body_len = body.len()); let ingest_received_at = Instant::now(); @@ -167,16 +172,16 @@ async fn handle_ingest(State(state): State>, body: Bytes) -> St .push((s.timestamp_ms, s.value)); } - // Route each series batch to the correct worker - for (series_key, batch) in by_series { - if let Err(e) = state - .router - .route(series_key, batch, ingest_received_at) - .await - { - warn!("Routing error for {}: {}", series_key, e); - return StatusCode::INTERNAL_SERVER_ERROR; - } + // Convert to owned keys for batch routing + let by_series_owned: HashMap> = by_series + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(); + + // Route all series to workers concurrently + if let Err(e) = state.router.route_batch(by_series_owned, ingest_received_at).await { + warn!("Batch routing error: {}", e); + return StatusCode::INTERNAL_SERVER_ERROR; } StatusCode::NO_CONTENT diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index 1f8533c..dc6c642 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -1,4 +1,6 @@ +use std::collections::HashMap; use std::time::Instant; +use futures::future::try_join_all; use tokio::sync::mpsc; use xxhash_rust::xxh64::xxh64; @@ -52,6 +54,47 @@ impl SeriesRouter { Ok(()) } + /// Route a pre-grouped batch of series to workers concurrently. + /// + /// Groups messages by target worker, then sends to each worker in parallel + /// (messages within a single worker are sent sequentially to preserve ordering). + pub async fn route_batch( + &self, + by_series: HashMap>, + ingest_received_at: Instant, + ) -> Result<(), Box> { + // Group messages by target worker index + let mut per_worker: HashMap> = HashMap::new(); + for (series_key, samples) in by_series { + let worker_idx = self.worker_for(&series_key); + per_worker + .entry(worker_idx) + .or_default() + .push(WorkerMessage::Samples { + series_key, + samples, + ingest_received_at, + }); + } + + // Send to each worker concurrently + try_join_all(per_worker.into_iter().map(|(worker_idx, messages)| { + let sender = &self.senders[worker_idx]; + async move { + for msg in messages { + sender.send(msg).await.map_err(|e| { + format!("Failed to send to worker {}: {}", worker_idx, e) + })?; + } + Ok::<(), String>(()) + } + })) + .await + .map_err(|e| -> Box { Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) })?; + + Ok(()) + } + /// Broadcast a flush signal to all workers. pub async fn broadcast_flush(&self) -> Result<(), Box> { for (i, sender) in self.senders.iter().enumerate() { From 685655ebf28097e51f073aa444f7863a36d08883 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 24 Feb 2026 12:25:22 -0700 Subject: [PATCH 06/10] Fix sliding window aggregation: feed samples into all overlapping windows Previously each sample was assigned to only one window via window_start_for(), which is incorrect for sliding windows where window_size > slide_interval. Added window_starts_containing() that returns all window starts whose range covers the timestamp, and use it in the worker aggregation loop. Co-Authored-By: Claude Opus 4.6 --- .../src/precompute_engine/window_manager.rs | 44 +++++++++++++++++++ .../src/precompute_engine/worker.rs | 26 ++++++----- 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/asap-query-engine/src/precompute_engine/window_manager.rs b/asap-query-engine/src/precompute_engine/window_manager.rs index 4d329da..d8bf23b 100644 --- a/asap-query-engine/src/precompute_engine/window_manager.rs +++ b/asap-query-engine/src/precompute_engine/window_manager.rs @@ -72,6 +72,20 @@ impl WindowManager { closed } + /// Return all window starts whose window `[start, start + window_size_ms)` + /// contains the given timestamp. For tumbling windows this returns exactly + /// one start; for sliding windows it returns `ceil(window_size / slide)` + /// starts. + pub fn window_starts_containing(&self, timestamp_ms: i64) -> Vec { + let mut starts = Vec::new(); + let mut start = self.window_start_for(timestamp_ms); + while start + self.window_size_ms > timestamp_ms { + starts.push(start); + start -= self.slide_interval_ms; + } + starts + } + /// Return the window `[start, end)` boundaries for a given window start. pub fn window_bounds(&self, window_start: i64) -> (i64, i64) { (window_start, window_start + self.window_size_ms) @@ -150,4 +164,34 @@ mod tests { let closed = wm.closed_windows(15_000, 35_000); assert_eq!(closed, vec![0]); } + + #[test] + fn test_window_starts_containing_tumbling() { + // 60s tumbling windows — each sample belongs to exactly one window + let wm = WindowManager::new(60, 0); + let mut starts = wm.window_starts_containing(15_000); + starts.sort(); + assert_eq!(starts, vec![0]); + + let mut starts = wm.window_starts_containing(60_000); + starts.sort(); + assert_eq!(starts, vec![60_000]); + } + + #[test] + fn test_window_starts_containing_sliding() { + // 30s window, 10s slide — each sample belongs to 3 windows + let wm = WindowManager::new(30, 10); + + // t=15_000 belongs to [0, 30_000), [10_000, 40_000) + // and [-10_000, 20_000) which starts negative — still returned + let mut starts = wm.window_starts_containing(15_000); + starts.sort(); + assert_eq!(starts, vec![-10_000, 0, 10_000]); + + // t=30_000 belongs to [10_000, 40_000), [20_000, 50_000), [30_000, 60_000) + let mut starts = wm.window_starts_containing(30_000); + starts.sort(); + assert_eq!(starts, vec![10_000, 20_000, 30_000]); + } } diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index fac2d23..05c0cd2 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -221,18 +221,20 @@ impl Worker { continue; // already dropped } - let window_start = agg_state.window_manager.window_start_for(ts); - - let updater = agg_state - .active_windows - .entry(window_start) - .or_insert_with(|| create_accumulator_updater(&agg_state.config)); - - if updater.is_keyed() { - let key = extract_key_from_series(series_key, &agg_state.config); - updater.update_keyed(&key, val, ts); - } else { - updater.update_single(val, ts); + let window_starts = agg_state.window_manager.window_starts_containing(ts); + + for window_start in window_starts { + let updater = agg_state + .active_windows + .entry(window_start) + .or_insert_with(|| create_accumulator_updater(&agg_state.config)); + + if updater.is_keyed() { + let key = extract_key_from_series(series_key, &agg_state.config); + updater.update_keyed(&key, val, ts); + } else { + updater.update_single(val, ts); + } } } From 8df492799ec10943f817ceacf869c74cdeb0b5b1 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:11:27 -0500 Subject: [PATCH 07/10] fix: remove duplicate [[bin]] entries in Cargo.toml from rebase merge Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/Cargo.toml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 2cb7ec3..a5fca27 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -67,14 +67,6 @@ path = "src/bin/precompute_engine.rs" name = "test_e2e_precompute" path = "src/bin/test_e2e_precompute.rs" -[[bin]] -name = "precompute_engine" -path = "src/bin/precompute_engine.rs" - -[[bin]] -name = "test_e2e_precompute" -path = "src/bin/test_e2e_precompute.rs" - [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" From 3ea0d58d1491f22210c6e5f5ae3c671819ecea88 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:13:39 -0500 Subject: [PATCH 08/10] style: apply cargo fmt Co-Authored-By: Claude Sonnet 4.6 --- .../src/bin/test_e2e_precompute.rs | 21 +++++++------------ .../src/precompute_engine/mod.rs | 19 ++++++++--------- .../src/precompute_engine/series_router.rs | 13 +++++++----- 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 38c2cd2..27eb820 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -353,7 +353,10 @@ async fn main() -> Result<(), Box> { } } let batch_body = build_remote_write_body(batch_timeseries); - println!(" Payload size: {} bytes (snappy-compressed)", batch_body.len()); + println!( + " Payload size: {} bytes (snappy-compressed)", + batch_body.len() + ); let t0 = std::time::Instant::now(); let resp = client @@ -374,12 +377,8 @@ async fn main() -> Result<(), Box> { tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; // Verify samples landed in the store - let batch_results = store.query_precomputed_output( - "fake_metric", - raw_agg_id, - 200_000, - 210_000, - )?; + let batch_results = + store.query_precomputed_output("fake_metric", raw_agg_id, 200_000, 210_000)?; let batch_buckets: usize = batch_results.values().map(|v| v.len()).sum(); println!(" Stored {batch_buckets} buckets from batch (expected 1000)"); assert!( @@ -438,12 +437,8 @@ async fn main() -> Result<(), Box> { let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60); let mut tp_buckets: usize; loop { - let tp_results = store.query_precomputed_output( - "fake_metric", - raw_agg_id, - 300_000, - max_ts, - )?; + let tp_results = + store.query_precomputed_output("fake_metric", raw_agg_id, 300_000, max_ts)?; tp_buckets = tp_results.values().map(|v| v.len()).sum(); if tp_buckets as u64 >= total_samples || std::time::Instant::now() > drain_deadline { break; diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index 5471618..1f3f1df 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -15,9 +15,9 @@ use crate::precompute_engine::worker::Worker; use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use tokio::net::TcpListener; use tokio::sync::mpsc; -use std::time::Instant; use tracing::{debug_span, info, warn, Instrument}; /// Shared state for the ingest HTTP handler. @@ -67,10 +67,8 @@ impl PrecomputeEngine { let router = SeriesRouter::new(senders); // Build aggregation config map from streaming config - let agg_configs: HashMap = self - .streaming_config - .get_all_aggregation_configs() - .clone(); + let agg_configs: HashMap = + self.streaming_config.get_all_aggregation_configs().clone(); // Spawn workers let mut worker_handles = Vec::with_capacity(num_workers); @@ -138,10 +136,7 @@ impl PrecomputeEngine { } /// Axum handler for Prometheus remote write. -async fn handle_ingest( - State(state): State>, - body: Bytes, -) -> StatusCode { +async fn handle_ingest(State(state): State>, body: Bytes) -> StatusCode { let ingest_span = debug_span!("ingest", body_len = body.len()); let ingest_received_at = Instant::now(); @@ -179,7 +174,11 @@ async fn handle_ingest( .collect(); // Route all series to workers concurrently - if let Err(e) = state.router.route_batch(by_series_owned, ingest_received_at).await { + if let Err(e) = state + .router + .route_batch(by_series_owned, ingest_received_at) + .await + { warn!("Batch routing error: {}", e); return StatusCode::INTERNAL_SERVER_ERROR; } diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index dc6c642..1cb4fca 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -1,6 +1,6 @@ +use futures::future::try_join_all; use std::collections::HashMap; use std::time::Instant; -use futures::future::try_join_all; use tokio::sync::mpsc; use xxhash_rust::xxh64::xxh64; @@ -82,15 +82,18 @@ impl SeriesRouter { let sender = &self.senders[worker_idx]; async move { for msg in messages { - sender.send(msg).await.map_err(|e| { - format!("Failed to send to worker {}: {}", worker_idx, e) - })?; + sender + .send(msg) + .await + .map_err(|e| format!("Failed to send to worker {}: {}", worker_idx, e))?; } Ok::<(), String>(()) } })) .await - .map_err(|e| -> Box { Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) })?; + .map_err(|e| -> Box { + Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) + })?; Ok(()) } From bf4c213e31921d170c707cb0c3a11eb719d197a9 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:18:53 -0500 Subject: [PATCH 09/10] fix: use std::io::Error::other per clippy lint Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/src/precompute_engine/series_router.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index 1cb4fca..94d757a 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -92,7 +92,7 @@ impl SeriesRouter { })) .await .map_err(|e| -> Box { - Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) + Box::new(std::io::Error::other(e)) })?; Ok(()) From b7e1b02ed8981054a7f082ebe85915a37efd1744 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 31 Mar 2026 15:20:59 -0500 Subject: [PATCH 10/10] fix: remove unnecessary cast per clippy lint Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/src/bin/test_e2e_precompute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 27eb820..1cb3fe1 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -347,7 +347,7 @@ async fn main() -> Result<(), Box> { for series_idx in 0..10 { let label_val = format!("batch_{series_idx}"); for t in 0..100 { - let ts = 200_000 + (series_idx as i64) * 1000 + t; // unique ts per sample + let ts = 200_000 + series_idx * 1000 + t; // unique ts per sample let val = (series_idx * 100 + t) as f64; batch_timeseries.push(make_sample("fake_metric", &label_val, ts, val)); }