diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs
index a8235d0..1cb3fe1 100644
--- a/asap-query-engine/src/bin/test_e2e_precompute.rs
+++ b/asap-query-engine/src/bin/test_e2e_precompute.rs
@@ -275,7 +275,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Pick aggregation_id = 1 to match the existing streaming config.
     let raw_agg_id: u64 = 1;
     let raw_engine_config = PrecomputeEngineConfig {
-        num_workers: 1,
+        num_workers: 4,
         ingest_port: RAW_INGEST_PORT,
         allowed_lateness_ms: 5000,
         max_buffer_per_series: 10000,
@@ -334,6 +334,138 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     }
     println!(" Raw mode test PASSED");
 
+    // -----------------------------------------------------------------------
+    // BATCH LATENCY TEST
+    // Send 1000 samples in a single HTTP request to measure realistic e2e
+    // latency. Uses the raw-mode engine to avoid window-close dependencies.
+    // -----------------------------------------------------------------------
+    println!("\n=== Batch latency test: 1000 samples in one request ===");
+
+    // Build a single WriteRequest with 1000 TimeSeries entries spread across
+    // 10 series × 100 timestamps each, so routing fans out to workers.
+    let mut batch_timeseries = Vec::with_capacity(1000);
+    for series_idx in 0..10 {
+        let label_val = format!("batch_{series_idx}");
+        for t in 0..100 {
+            let ts = 200_000 + series_idx * 1000 + t; // unique ts per sample
+            let val = (series_idx * 100 + t) as f64;
+            batch_timeseries.push(make_sample("fake_metric", &label_val, ts, val));
+        }
+    }
+    let batch_body = build_remote_write_body(batch_timeseries);
+    println!(
+        " Payload size: {} bytes (snappy-compressed)",
+        batch_body.len()
+    );
+
+    let t0 = std::time::Instant::now();
+    let resp = client
+        .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write"))
+        .header("Content-Type", "application/x-protobuf")
+        .header("Content-Encoding", "snappy")
+        .body(batch_body)
+        .send()
+        .await?;
+    let client_rtt = t0.elapsed();
+    println!(
+        " HTTP response: {} in {:.3}ms",
+        resp.status().as_u16(),
+        client_rtt.as_secs_f64() * 1000.0,
+    );
+
+    // Wait for all workers to finish processing
+    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
+
+    // Verify samples landed in the store
+    let batch_results =
+        store.query_precomputed_output("fake_metric", raw_agg_id, 200_000, 210_000)?;
+    let batch_buckets: usize = batch_results.values().map(|v| v.len()).sum();
+    println!(" Stored {batch_buckets} buckets from batch (expected 1000)");
+    assert!(
+        batch_buckets >= 1000,
+        "Expected at least 1000 raw samples in store, got {batch_buckets}"
+    );
+    println!(" Batch latency test PASSED");
+
+    // -----------------------------------------------------------------------
+    // THROUGHPUT TEST
+    // Send many requests back-to-back and measure sustained throughput
+    // (samples/sec). Uses the raw-mode engine for a clean measurement.
+    // -----------------------------------------------------------------------
+    println!("\n=== Throughput test: 1000 requests × 10000 samples ===");
+
+    let num_requests = 1000u64;
+    let samples_per_request = 10_000u64;
+    let total_samples = num_requests * samples_per_request;
+
+    // Pre-build all request bodies so serialization doesn't count against throughput
+    let mut bodies = Vec::with_capacity(num_requests as usize);
+    for req_idx in 0..num_requests {
+        let mut timeseries = Vec::with_capacity(samples_per_request as usize);
+        for s in 0..samples_per_request {
+            let series_label = format!("tp_{}", s % 50); // 50 distinct series
+            let ts = 300_000 + req_idx as i64 * 10_000 + s as i64;
+            timeseries.push(make_sample("fake_metric", &series_label, ts, s as f64));
+        }
+        bodies.push(build_remote_write_body(timeseries));
+    }
+
+    let throughput_start = std::time::Instant::now();
+
+    for (i, body) in bodies.into_iter().enumerate() {
+        let resp = client
+            .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write"))
+            .header("Content-Type", "application/x-protobuf")
+            .header("Content-Encoding", "snappy")
+            .body(body)
+            .send()
+            .await?;
+        if resp.status() != reqwest::StatusCode::NO_CONTENT {
+            eprintln!(" Request {i} failed: {}", resp.status());
+        }
+    }
+
+    let send_elapsed = throughput_start.elapsed();
+    println!(
+        " All {} requests sent in {:.1}ms",
+        num_requests,
+        send_elapsed.as_secs_f64() * 1000.0,
+    );
+
+    // Poll until workers drain or timeout after 60s
+    let max_ts = 300_000u64 + num_requests * 10_000 + samples_per_request;
+    let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60);
+    let mut tp_buckets: usize;
+    loop {
+        let tp_results =
+            store.query_precomputed_output("fake_metric", raw_agg_id, 300_000, max_ts)?;
+        tp_buckets = tp_results.values().map(|v| v.len()).sum();
+        if tp_buckets as u64 >= total_samples || std::time::Instant::now() > drain_deadline {
+            break;
+        }
+        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
+    }
+    let total_elapsed = throughput_start.elapsed();
+
+    let send_throughput = total_samples as f64 / send_elapsed.as_secs_f64();
+    let e2e_throughput = tp_buckets as f64 / total_elapsed.as_secs_f64();
+    println!(" Stored {tp_buckets}/{total_samples} samples");
+    println!(
+        " Send throughput: {:.0} samples/sec ({:.1}ms for {total_samples} samples)",
+        send_throughput,
+        send_elapsed.as_secs_f64() * 1000.0,
+    );
+    println!(
+        " E2E throughput: {:.0} samples/sec ({:.1}ms until all stored)",
+        e2e_throughput,
+        total_elapsed.as_secs_f64() * 1000.0,
+    );
+    assert!(
+        tp_buckets as u64 >= total_samples,
+        "Expected at least {total_samples} samples in store, got {tp_buckets}"
+    );
+    println!(" Throughput test PASSED");
+    println!("\n=== E2E test complete ===");
 
     Ok(())
diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs
index c7fe7e9..1f3f1df 100644
--- a/asap-query-engine/src/precompute_engine/mod.rs
+++ b/asap-query-engine/src/precompute_engine/mod.rs
@@ -167,16 +167,20 @@ async fn handle_ingest(State(state): State>, body: Bytes) -> StatusCode {
             .push((s.timestamp_ms, s.value));
     }
 
-    // Route each series batch to the correct worker
-    for (series_key, batch) in by_series {
-        if let Err(e) = state
-            .router
-            .route(series_key, batch, ingest_received_at)
-            .await
-        {
-            warn!("Routing error for {}: {}", series_key, e);
-            return StatusCode::INTERNAL_SERVER_ERROR;
-        }
+    // Convert to owned keys for batch routing
+    let by_series_owned: HashMap<String, Vec<(i64, f64)>> = by_series
+        .into_iter()
+        .map(|(k, v)| (k.to_string(), v))
+        .collect();
+
+    // Route all series to workers concurrently
+    if let Err(e) = state
+        .router
+        .route_batch(by_series_owned, ingest_received_at)
+        .await
+    {
+        warn!("Batch routing error: {}", e);
+        return StatusCode::INTERNAL_SERVER_ERROR;
     }
 
     StatusCode::NO_CONTENT
diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs
index 1f8533c..94d757a 100644
--- a/asap-query-engine/src/precompute_engine/series_router.rs
+++ b/asap-query-engine/src/precompute_engine/series_router.rs
@@ -1,3 +1,5 @@
+use futures::future::try_join_all;
+use std::collections::HashMap;
 use std::time::Instant;
 use tokio::sync::mpsc;
 use xxhash_rust::xxh64::xxh64;
@@ -52,6 +54,50 @@ impl SeriesRouter {
         Ok(())
     }
 
+    /// Route a pre-grouped batch of series to workers concurrently.
+    ///
+    /// Groups messages by target worker, then sends to each worker in parallel
+    /// (messages within a single worker are sent sequentially to preserve ordering).
+    pub async fn route_batch(
+        &self,
+        by_series: HashMap<String, Vec<(i64, f64)>>,
+        ingest_received_at: Instant,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        // Group messages by target worker index
+        let mut per_worker: HashMap<usize, Vec<WorkerMessage>> = HashMap::new();
+        for (series_key, samples) in by_series {
+            let worker_idx = self.worker_for(&series_key);
+            per_worker
+                .entry(worker_idx)
+                .or_default()
+                .push(WorkerMessage::Samples {
+                    series_key,
+                    samples,
+                    ingest_received_at,
+                });
+        }
+
+        // Send to each worker concurrently
+        try_join_all(per_worker.into_iter().map(|(worker_idx, messages)| {
+            let sender = &self.senders[worker_idx];
+            async move {
+                for msg in messages {
+                    sender
+                        .send(msg)
+                        .await
+                        .map_err(|e| format!("Failed to send to worker {}: {}", worker_idx, e))?;
+                }
+                Ok::<(), String>(())
+            }
+        }))
+        .await
+        .map_err(|e| -> Box<dyn std::error::Error> {
+            Box::new(std::io::Error::other(e))
+        })?;
+
+        Ok(())
+    }
+
     /// Broadcast a flush signal to all workers.
     pub async fn broadcast_flush(&self) -> Result<(), Box<dyn std::error::Error>> {
         for (i, sender) in self.senders.iter().enumerate() {
diff --git a/asap-query-engine/src/precompute_engine/window_manager.rs b/asap-query-engine/src/precompute_engine/window_manager.rs
index 4d329da..d8bf23b 100644
--- a/asap-query-engine/src/precompute_engine/window_manager.rs
+++ b/asap-query-engine/src/precompute_engine/window_manager.rs
@@ -72,6 +72,20 @@ impl WindowManager {
         closed
     }
 
+    /// Return all window starts whose window `[start, start + window_size_ms)`
+    /// contains the given timestamp. For tumbling windows this returns exactly
+    /// one start; for sliding windows it returns `ceil(window_size / slide)`
+    /// starts.
+    pub fn window_starts_containing(&self, timestamp_ms: i64) -> Vec<i64> {
+        let mut starts = Vec::new();
+        let mut start = self.window_start_for(timestamp_ms);
+        while start + self.window_size_ms > timestamp_ms {
+            starts.push(start);
+            start -= self.slide_interval_ms;
+        }
+        starts
+    }
+
     /// Return the window `[start, end)` boundaries for a given window start.
     pub fn window_bounds(&self, window_start: i64) -> (i64, i64) {
         (window_start, window_start + self.window_size_ms)
@@ -150,4 +164,34 @@ mod tests {
         let closed = wm.closed_windows(15_000, 35_000);
         assert_eq!(closed, vec![0]);
     }
+
+    #[test]
+    fn test_window_starts_containing_tumbling() {
+        // 60s tumbling windows — each sample belongs to exactly one window
+        let wm = WindowManager::new(60, 0);
+        let mut starts = wm.window_starts_containing(15_000);
+        starts.sort();
+        assert_eq!(starts, vec![0]);
+
+        let mut starts = wm.window_starts_containing(60_000);
+        starts.sort();
+        assert_eq!(starts, vec![60_000]);
+    }
+
+    #[test]
+    fn test_window_starts_containing_sliding() {
+        // 30s window, 10s slide — each sample belongs to 3 windows
+        let wm = WindowManager::new(30, 10);
+
+        // t=15_000 belongs to [0, 30_000), [10_000, 40_000)
+        // and [-10_000, 20_000) which starts negative — still returned
+        let mut starts = wm.window_starts_containing(15_000);
+        starts.sort();
+        assert_eq!(starts, vec![-10_000, 0, 10_000]);
+
+        // t=30_000 belongs to [10_000, 40_000), [20_000, 50_000), [30_000, 60_000)
+        let mut starts = wm.window_starts_containing(30_000);
+        starts.sort();
+        assert_eq!(starts, vec![10_000, 20_000, 30_000]);
+    }
 }
diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs
index fac2d23..05c0cd2 100644
--- a/asap-query-engine/src/precompute_engine/worker.rs
+++ b/asap-query-engine/src/precompute_engine/worker.rs
@@ -221,18 +221,20 @@ impl Worker {
                     continue; // already dropped
                 }
 
-                let window_start = agg_state.window_manager.window_start_for(ts);
-
-                let updater = agg_state
-                    .active_windows
-                    .entry(window_start)
-                    .or_insert_with(|| create_accumulator_updater(&agg_state.config));
-
-                if updater.is_keyed() {
-                    let key = extract_key_from_series(series_key, &agg_state.config);
-                    updater.update_keyed(&key, val, ts);
-                } else {
-                    updater.update_single(val, ts);
+                let window_starts =
+                    agg_state.window_manager.window_starts_containing(ts);
+
+                for window_start in window_starts {
+                    let updater = agg_state
+                        .active_windows
+                        .entry(window_start)
+                        .or_insert_with(|| create_accumulator_updater(&agg_state.config));
+
+                    if updater.is_keyed() {
+                        let key = extract_key_from_series(series_key, &agg_state.config);
+                        updater.update_keyed(&key, val, ts);
+                    } else {
+                        updater.update_single(val, ts);
+                    }
                 }