diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index f6751cb..a0ebab5 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -9,18 +9,22 @@ //! cargo run --bin test_e2e_precompute use prost::Message; -use query_engine_rust::data_model::{LockStrategy, QueryLanguage}; +use query_engine_rust::data_model::{LockStrategy, QueryLanguage, StreamingConfig}; use query_engine_rust::drivers::ingest::prometheus_remote_write::{ Label, Sample, TimeSeries, WriteRequest, }; use query_engine_rust::drivers::query::adapters::AdapterConfig; use query_engine_rust::engines::SimpleEngine; use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; -use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink}; +use query_engine_rust::precompute_engine::output_sink::{ + NoopOutputSink, RawPassthroughSink, StoreOutputSink, +}; use query_engine_rust::precompute_engine::PrecomputeEngine; use query_engine_rust::stores::SimpleMapStore; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{HttpServer, HttpServerConfig}; +use sketch_db_common::aggregation_config::AggregationConfig; +use std::collections::HashMap; use std::sync::Arc; const INGEST_PORT: u16 = 19090; @@ -394,37 +398,70 @@ async fn main() -> Result<(), Box> { // Send many requests back-to-back and measure sustained throughput // (samples/sec). Uses the raw-mode engine for a clean measurement. 
// ----------------------------------------------------------------------- - println!("\n=== Throughput test: 1000 requests × 10000 samples ==="); + let num_concurrent_senders = 8usize; + println!("\n=== Throughput test: 1000 requests × 10000 samples ({num_concurrent_senders} concurrent senders) ==="); let num_requests = 1000u64; let samples_per_request = 10_000u64; let total_samples = num_requests * samples_per_request; - // Pre-build all request bodies so serialization doesn't count against throughput + // Pre-build all request bodies in parallel using rayon-style chunking via tokio tasks. + // Each task builds its share of requests, then we flatten the results. + let num_build_tasks = num_concurrent_senders; + let requests_per_task = (num_requests as usize).div_ceil(num_build_tasks); + let mut build_handles = Vec::with_capacity(num_build_tasks); + for task_idx in 0..num_build_tasks { + let start = task_idx * requests_per_task; + let end = ((task_idx + 1) * requests_per_task).min(num_requests as usize); + build_handles.push(tokio::task::spawn_blocking(move || { + let mut chunk = Vec::with_capacity(end - start); + for req_idx in start..end { + let mut timeseries = Vec::with_capacity(samples_per_request as usize); + for s in 0..samples_per_request { + let series_label = format!("tp_{}", s % 50); // 50 distinct series + let ts = 300_000 + req_idx as i64 * 10_000 + s as i64; + timeseries.push(make_sample("fake_metric", &series_label, ts, s as f64)); + } + chunk.push(build_remote_write_body(timeseries)); + } + chunk + })); + } let mut bodies = Vec::with_capacity(num_requests as usize); - for req_idx in 0..num_requests { - let mut timeseries = Vec::with_capacity(samples_per_request as usize); - for s in 0..samples_per_request { - let series_label = format!("tp_{}", s % 50); // 50 distinct series - let ts = 300_000 + req_idx as i64 * 10_000 + s as i64; - timeseries.push(make_sample("fake_metric", &series_label, ts, s as f64)); - } - 
bodies.push(build_remote_write_body(timeseries)); + for handle in build_handles { + bodies.extend(handle.await?); } let throughput_start = std::time::Instant::now(); + // Send requests using multiple concurrent sender tasks + let mut body_chunks: Vec<Vec<Vec<u8>>> = vec![Vec::new(); num_concurrent_senders]; for (i, body) in bodies.into_iter().enumerate() { - let resp = client - .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) - .header("Content-Type", "application/x-protobuf") - .header("Content-Encoding", "snappy") - .body(body) - .send() - .await?; - if resp.status() != reqwest::StatusCode::NO_CONTENT { - eprintln!(" Request {i} failed: {}", resp.status()); - } + body_chunks[i % num_concurrent_senders].push(body); + } + let mut send_handles = Vec::new(); + for chunk in body_chunks { + let client = client.clone(); + let url = format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write"); + send_handles.push(tokio::spawn(async move { + for body in chunk { + let resp = client + .post(&url) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await; + if let Ok(r) = resp { + if r.status() != reqwest::StatusCode::NO_CONTENT { + eprintln!(" request failed: {}", r.status()); + } + } + } + })); + } + for handle in send_handles { + handle.await?; } let send_elapsed = throughput_start.elapsed(); @@ -436,7 +473,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // Poll until workers drain or timeout after 60s let max_ts = 300_000u64 + num_requests * 10_000 + samples_per_request; - let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60); + let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(120); let mut tp_buckets: usize; loop { let tp_results = @@ -468,11 +505,319 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ); println!(" Throughput test PASSED"); + // ----------------------------------------------------------------------- + // WINDOWED AGGREGATION THROUGHPUT 
BENCHMARKS + // Compare tumbling vs sliding window performance with the pane-based + // engine. Each benchmark spins up its own PrecomputeEngine with a + // NoopOutputSink (to isolate worker throughput from store I/O). + // ----------------------------------------------------------------------- + let bench_results = run_windowed_benchmarks(&client).await?; + println!("\n=== Windowed aggregation benchmark summary ==="); + println!( + " {:<30} {:>12} {:>12} {:>14}", + "Config", "Send (s/s)", "E2E (s/s)", "Latency (ms)" + ); + for r in &bench_results { + println!( + " {:<30} {:>12.0} {:>12.0} {:>14.1}", + r.label, r.send_throughput, r.e2e_throughput, r.batch_latency_ms + ); + } + + // ----------------------------------------------------------------------- + // SCALABILITY TEST + // Measure throughput as a function of worker count (1, 2, 4, 8, 16) + // to verify linear scaling with cores. Uses sliding 30s/10s Sum (W=3) + // with NoopOutputSink. + // ----------------------------------------------------------------------- + let scale_results = run_scalability_benchmark(&client).await?; + println!("\n=== Scalability benchmark summary (Sliding 30s/10s Sum, W=3) ==="); + println!( + " {:<10} {:>12} {:>12} {:>10}", + "Workers", "Send (s/s)", "E2E (s/s)", "Speedup" + ); + let baseline_e2e = scale_results + .first() + .map(|r| r.e2e_throughput) + .unwrap_or(1.0); + for r in &scale_results { + println!( + " {:<10} {:>12.0} {:>12.0} {:>10.2}x", + r.label, + r.send_throughput, + r.e2e_throughput, + r.e2e_throughput / baseline_e2e + ); + } + println!("\n=== E2E test complete ==="); Ok(()) } +// --------------------------------------------------------------------------- +// Windowed aggregation benchmarks +// --------------------------------------------------------------------------- + +struct BenchResult { + label: String, + send_throughput: f64, + e2e_throughput: f64, + batch_latency_ms: f64, +} + +/// Build an AggregationConfig for Sum with specified window parameters. 
+fn make_sum_agg_config( + agg_id: u64, + window_size_secs: u64, + slide_interval_secs: u64, +) -> AggregationConfig { + let window_type = if slide_interval_secs == 0 || slide_interval_secs == window_size_secs { + "tumbling" + } else { + "sliding" + }; + AggregationConfig::new( + agg_id, + "SingleSubpopulation".to_string(), + "Sum".to_string(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + window_size_secs, + slide_interval_secs, + window_type.to_string(), + "bench_metric".to_string(), + "bench_metric".to_string(), + None, + None, + None, + None, + ) +} + +/// Run a single windowed benchmark and return the results. +#[allow(clippy::too_many_arguments)] +async fn run_single_bench( + client: &reqwest::Client, + label: &str, + port: u16, + streaming_config: Arc<StreamingConfig>, + num_workers: usize, + num_concurrent_senders: usize, + num_requests: u64, + samples_per_request: u64, + num_series: u64, +) -> Result<BenchResult, Box<dyn std::error::Error>> { + let total_samples = num_requests * samples_per_request; + + let noop_sink = Arc::new(NoopOutputSink::new()); + let engine_config = PrecomputeEngineConfig { + num_workers, + ingest_port: port, + allowed_lateness_ms: 5000, + max_buffer_per_series: 100_000, + flush_interval_ms: 100, + channel_buffer_size: 50_000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }; + let engine = PrecomputeEngine::new(engine_config, streaming_config, noop_sink.clone()); + tokio::spawn(async move { + if let Err(e) = engine.run().await { + eprintln!("Bench engine error: {e}"); + } + }); + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Pre-build request bodies. Timestamps are monotonically increasing + // across requests so windows close naturally as the watermark advances. 
+ let mut bodies = Vec::with_capacity(num_requests as usize); + for req_idx in 0..num_requests { + let mut timeseries = Vec::with_capacity(samples_per_request as usize); + for s in 0..samples_per_request { + let series_label = format!("s_{}", s % num_series); + // Each request advances time by 1000ms (1 second) + let ts = (req_idx as i64) * 1000 + (s as i64 % 1000); + timeseries.push(make_sample("bench_metric", &series_label, ts, s as f64)); + } + bodies.push(build_remote_write_body(timeseries)); + } + + // --- Batch latency: single request --- + let latency_body = bodies[0].clone(); + let t0 = std::time::Instant::now(); + client + .post(format!("http://localhost:{port}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(latency_body) + .send() + .await?; + let batch_latency_ms = t0.elapsed().as_secs_f64() * 1000.0; + + // --- Throughput: all requests with concurrent senders --- + let throughput_start = std::time::Instant::now(); + + // Round-robin distribute request bodies across concurrent sender tasks + let mut body_chunks: Vec<Vec<Vec<u8>>> = vec![Vec::new(); num_concurrent_senders]; + for (i, body) in bodies.into_iter().enumerate() { + body_chunks[i % num_concurrent_senders].push(body); + } + let mut send_handles = Vec::new(); + for chunk in body_chunks { + let client = client.clone(); + let url = format!("http://localhost:{port}/api/v1/write"); + send_handles.push(tokio::spawn(async move { + for body in chunk { + let resp = client + .post(&url) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await; + if let Ok(r) = resp { + if r.status() != reqwest::StatusCode::NO_CONTENT { + eprintln!(" request failed: {}", r.status()); + } + } + } + })); + } + for handle in send_handles { + handle.await?; + } + let send_elapsed = throughput_start.elapsed(); + + // Wait for workers to drain (poll emit_count on noop sink) + let drain_deadline = 
std::time::Instant::now() + std::time::Duration::from_secs(30); + loop { + let emitted = noop_sink + .emit_count + .load(std::sync::atomic::Ordering::Relaxed); + if emitted > 0 || std::time::Instant::now() > drain_deadline { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + // Give workers a bit more time to finish in-flight work + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let total_elapsed = throughput_start.elapsed(); + + let emitted = noop_sink + .emit_count + .load(std::sync::atomic::Ordering::Relaxed); + let send_throughput = total_samples as f64 / send_elapsed.as_secs_f64(); + let e2e_throughput = total_samples as f64 / total_elapsed.as_secs_f64(); + + println!(" {label}:"); + println!( + " Sent {total_samples} samples in {:.1}ms ({:.0} samples/sec)", + send_elapsed.as_secs_f64() * 1000.0, + send_throughput + ); + println!( + " E2E: {:.1}ms ({:.0} samples/sec), emitted {emitted} windows", + total_elapsed.as_secs_f64() * 1000.0, + e2e_throughput + ); + println!(" Batch latency: {batch_latency_ms:.1}ms"); + + Ok(BenchResult { + label: label.to_string(), + send_throughput, + e2e_throughput, + batch_latency_ms, + }) +} + +async fn run_windowed_benchmarks( + client: &reqwest::Client, +) -> Result<Vec<BenchResult>, Box<dyn std::error::Error>> { + let num_requests = 200u64; + let samples_per_request = 5_000u64; + let num_series = 50u64; + + let configs: Vec<(&str, u16, u64, u64)> = vec![ + // (label, port, window_size_secs, slide_interval_secs) + ("Tumbling 10s Sum", 19100, 10, 0), + ("Sliding 30s/10s Sum", 19101, 30, 10), + ("Sliding 60s/10s Sum (W=6)", 19102, 60, 10), + ]; + + println!("\n=== Windowed aggregation benchmarks ({num_requests} req × {samples_per_request} samples, {num_series} series) ==="); + + let mut results = Vec::new(); + for (label, port, window_size, slide_interval) in configs { + let agg_config = make_sum_agg_config(100, window_size, slide_interval); + let mut agg_map = HashMap::new(); + agg_map.insert(100u64, 
agg_config); + let sc = Arc::new(StreamingConfig::new(agg_map)); + + let r = run_single_bench( + client, + label, + port, + sc, + 4, + 4, // concurrent senders to saturate workers + num_requests, + samples_per_request, + num_series, + ) + .await?; + results.push(r); + } + + Ok(results) +} + +async fn run_scalability_benchmark( + client: &reqwest::Client, +) -> Result<Vec<BenchResult>, Box<dyn std::error::Error>> { + let num_requests = 200u64; + let samples_per_request = 5_000u64; + let num_series = 100u64; // more series to give workers enough parallel work + let worker_counts: Vec<usize> = vec![1, 2, 4, 8, 16]; + let base_port: u16 = 19200; + + println!( + "\n=== Scalability benchmark ({num_requests} req × {samples_per_request} samples, \ + {num_series} series, Sliding 30s/10s Sum) ===" + ); + + let mut results = Vec::new(); + for (i, &num_workers) in worker_counts.iter().enumerate() { + let port = base_port + i as u16; + let label = format!("{num_workers}"); + + let agg_config = make_sum_agg_config(200 + i as u64, 30, 10); + let mut agg_map = HashMap::new(); + agg_map.insert(200 + i as u64, agg_config); + let sc = Arc::new(StreamingConfig::new(agg_map)); + + let r = run_single_bench( + client, + &label, + port, + sc, + num_workers, + num_workers, // concurrent senders match worker count + num_requests, + samples_per_request, + num_series, + ) + .await?; + results.push(r); + } + + Ok(results) +} + fn print_json(s: &str) { match serde_json::from_str::<serde_json::Value>(s) { Ok(v) => println!("{}", serde_json::to_string_pretty(&v).unwrap()), diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index fd181f6..25bd7a4 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -222,7 +222,6 @@ from `active_panes`. 
Remaining panes are read non-destructively via When `pass_raw_samples = true`, the entire aggregation pipeline is bypassed. Each sample is emitted as a `SumAccumulator::with_sum(value)` with point-window bounds `[ts, ts]` and the configured `raw_mode_aggregation_id`. - ### 3.5 SeriesBuffer (`series_buffer.rs`) Per-series in-memory buffer backed by `BTreeMap`. @@ -305,7 +304,6 @@ The worker calls `window_starts_containing(ts)` for each incoming sample and fee the value into the accumulator for every matching window. When `closed_windows()` fires, each closed window's accumulator is extracted and emitted independently. - ### 3.7 AccumulatorUpdater (`accumulator_factory.rs`) Trait-based interface for feeding samples into sketch accumulators: