Skip to content
134 changes: 133 additions & 1 deletion asap-query-engine/src/bin/test_e2e_precompute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Pick aggregation_id = 1 to match the existing streaming config.
let raw_agg_id: u64 = 1;
let raw_engine_config = PrecomputeEngineConfig {
num_workers: 1,
num_workers: 4,
ingest_port: RAW_INGEST_PORT,
allowed_lateness_ms: 5000,
max_buffer_per_series: 10000,
Expand Down Expand Up @@ -334,6 +334,138 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}
println!(" Raw mode test PASSED");

// -----------------------------------------------------------------------
// BATCH LATENCY TEST
// Send 1000 samples in a single HTTP request to measure realistic e2e
// latency. Uses the raw-mode engine to avoid window-close dependencies.
// -----------------------------------------------------------------------
println!("\n=== Batch latency test: 1000 samples in one request ===");

// Build a single WriteRequest with 1000 TimeSeries entries spread across
// 10 series × 100 timestamps each, so routing fans out to workers.
let mut batch_timeseries = Vec::with_capacity(1000);
for series_idx in 0..10 {
let label_val = format!("batch_{series_idx}");
for t in 0..100 {
let ts = 200_000 + series_idx * 1000 + t; // unique ts per sample
let val = (series_idx * 100 + t) as f64;
batch_timeseries.push(make_sample("fake_metric", &label_val, ts, val));
}
}
let batch_body = build_remote_write_body(batch_timeseries);
println!(
" Payload size: {} bytes (snappy-compressed)",
batch_body.len()
);

let t0 = std::time::Instant::now();
let resp = client
.post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write"))
.header("Content-Type", "application/x-protobuf")
.header("Content-Encoding", "snappy")
.body(batch_body)
.send()
.await?;
let client_rtt = t0.elapsed();
println!(
" HTTP response: {} in {:.3}ms",
resp.status().as_u16(),
client_rtt.as_secs_f64() * 1000.0,
);

// Wait for all workers to finish processing
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

// Verify samples landed in the store
let batch_results =
store.query_precomputed_output("fake_metric", raw_agg_id, 200_000, 210_000)?;
let batch_buckets: usize = batch_results.values().map(|v| v.len()).sum();
println!(" Stored {batch_buckets} buckets from batch (expected 1000)");
assert!(
batch_buckets >= 1000,
"Expected at least 1000 raw samples in store, got {batch_buckets}"
);
println!(" Batch latency test PASSED");

// -----------------------------------------------------------------------
// THROUGHPUT TEST
// Send many requests back-to-back and measure sustained throughput
// (samples/sec). Uses the raw-mode engine for a clean measurement.
// -----------------------------------------------------------------------
println!("\n=== Throughput test: 1000 requests × 10000 samples ===");

let num_requests = 1000u64;
let samples_per_request = 10_000u64;
let total_samples = num_requests * samples_per_request;

// Pre-build all request bodies so serialization doesn't count against throughput
let mut bodies = Vec::with_capacity(num_requests as usize);
for req_idx in 0..num_requests {
let mut timeseries = Vec::with_capacity(samples_per_request as usize);
for s in 0..samples_per_request {
let series_label = format!("tp_{}", s % 50); // 50 distinct series
let ts = 300_000 + req_idx as i64 * 10_000 + s as i64;
timeseries.push(make_sample("fake_metric", &series_label, ts, s as f64));
}
bodies.push(build_remote_write_body(timeseries));
}

let throughput_start = std::time::Instant::now();

for (i, body) in bodies.into_iter().enumerate() {
let resp = client
.post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write"))
.header("Content-Type", "application/x-protobuf")
.header("Content-Encoding", "snappy")
.body(body)
.send()
.await?;
if resp.status() != reqwest::StatusCode::NO_CONTENT {
eprintln!(" Request {i} failed: {}", resp.status());
}
}

let send_elapsed = throughput_start.elapsed();
println!(
" All {} requests sent in {:.1}ms",
num_requests,
send_elapsed.as_secs_f64() * 1000.0,
);

// Poll until workers drain or timeout after 60s
let max_ts = 300_000u64 + num_requests * 10_000 + samples_per_request;
let drain_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60);
let mut tp_buckets: usize;
loop {
let tp_results =
store.query_precomputed_output("fake_metric", raw_agg_id, 300_000, max_ts)?;
tp_buckets = tp_results.values().map(|v| v.len()).sum();
if tp_buckets as u64 >= total_samples || std::time::Instant::now() > drain_deadline {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
let total_elapsed = throughput_start.elapsed();

let send_throughput = total_samples as f64 / send_elapsed.as_secs_f64();
let e2e_throughput = tp_buckets as f64 / total_elapsed.as_secs_f64();
println!(" Stored {tp_buckets}/{total_samples} samples");
println!(
" Send throughput: {:.0} samples/sec ({:.1}ms for {total_samples} samples)",
send_throughput,
send_elapsed.as_secs_f64() * 1000.0,
);
println!(
" E2E throughput: {:.0} samples/sec ({:.1}ms until all stored)",
e2e_throughput,
total_elapsed.as_secs_f64() * 1000.0,
);
assert!(
tp_buckets as u64 >= total_samples,
"Expected at least {total_samples} samples in store, got {tp_buckets}"
);
println!(" Throughput test PASSED");

println!("\n=== E2E test complete ===");

Ok(())
Expand Down
24 changes: 14 additions & 10 deletions asap-query-engine/src/precompute_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,20 @@ async fn handle_ingest(State(state): State<Arc<IngestState>>, body: Bytes) -> St
.push((s.timestamp_ms, s.value));
}

// Route each series batch to the correct worker
for (series_key, batch) in by_series {
if let Err(e) = state
.router
.route(series_key, batch, ingest_received_at)
.await
{
warn!("Routing error for {}: {}", series_key, e);
return StatusCode::INTERNAL_SERVER_ERROR;
}
// Convert to owned keys for batch routing
let by_series_owned: HashMap<String, Vec<(i64, f64)>> = by_series
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect();

// Route all series to workers concurrently
if let Err(e) = state
.router
.route_batch(by_series_owned, ingest_received_at)
.await
{
warn!("Batch routing error: {}", e);
return StatusCode::INTERNAL_SERVER_ERROR;
}

StatusCode::NO_CONTENT
Expand Down
46 changes: 46 additions & 0 deletions asap-query-engine/src/precompute_engine/series_router.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use futures::future::try_join_all;
use std::collections::HashMap;
use std::time::Instant;
use tokio::sync::mpsc;
use xxhash_rust::xxh64::xxh64;
Expand Down Expand Up @@ -52,6 +54,50 @@ impl SeriesRouter {
Ok(())
}

/// Route a pre-grouped batch of series to workers concurrently.
///
/// Groups messages by target worker, then sends to each worker in parallel
/// (messages within a single worker are sent sequentially to preserve ordering).
pub async fn route_batch(
    &self,
    by_series: HashMap<String, Vec<(i64, f64)>>,
    ingest_received_at: Instant,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Bucket every series' samples under the worker that owns its key.
    let mut grouped: HashMap<usize, Vec<WorkerMessage>> = HashMap::new();
    for (series_key, samples) in by_series {
        let idx = self.worker_for(&series_key);
        let msg = WorkerMessage::Samples {
            series_key,
            samples,
            ingest_received_at,
        };
        grouped.entry(idx).or_default().push(msg);
    }

    // One future per worker: workers proceed in parallel, but each worker's
    // own messages are awaited one-by-one so per-series ordering is kept.
    let sends = grouped.into_iter().map(|(worker_idx, messages)| {
        let sender = &self.senders[worker_idx];
        async move {
            for msg in messages {
                sender
                    .send(msg)
                    .await
                    .map_err(|e| format!("Failed to send to worker {}: {}", worker_idx, e))?;
            }
            Ok::<(), String>(())
        }
    });

    try_join_all(sends)
        .await
        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
            Box::new(std::io::Error::other(e))
        })?;

    Ok(())
}

/// Broadcast a flush signal to all workers.
pub async fn broadcast_flush(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
for (i, sender) in self.senders.iter().enumerate() {
Expand Down
44 changes: 44 additions & 0 deletions asap-query-engine/src/precompute_engine/window_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ impl WindowManager {
closed
}

/// Return all window starts whose window `[start, start + window_size_ms)`
/// contains the given timestamp. For tumbling windows this returns exactly
/// one start; for sliding windows it returns `ceil(window_size / slide)`
/// starts (the earliest start may be negative).
pub fn window_starts_containing(&self, timestamp_ms: i64) -> Vec<i64> {
    // Latest window that can contain the timestamp.
    let newest = self.window_start_for(timestamp_ms);
    // Defensive guard: a non-positive slide would make the walk below never
    // terminate (`start` would never decrease past the containment test).
    // NOTE(review): assumes the constructor normalizes slide == 0 (tumbling)
    // to slide == window size — confirm; this keeps us safe either way.
    if self.slide_interval_ms <= 0 {
        return vec![newest];
    }
    // Walk backwards one slide at a time while the window still covers the
    // timestamp, collecting each containing window's start.
    let mut starts = Vec::new();
    let mut start = newest;
    while start + self.window_size_ms > timestamp_ms {
        starts.push(start);
        start -= self.slide_interval_ms;
    }
    starts
}

/// Return the window `[start, end)` boundaries for a given window start.
pub fn window_bounds(&self, window_start: i64) -> (i64, i64) {
(window_start, window_start + self.window_size_ms)
Expand Down Expand Up @@ -150,4 +164,34 @@ mod tests {
let closed = wm.closed_windows(15_000, 35_000);
assert_eq!(closed, vec![0]);
}

#[test]
fn test_window_starts_containing_tumbling() {
    // 60s tumbling windows — each sample belongs to exactly one window
    let wm = WindowManager::new(60, 0);

    // A timestamp in the middle of the first window maps only to start 0.
    let mut mid_window = wm.window_starts_containing(15_000);
    mid_window.sort();
    assert_eq!(mid_window, vec![0]);

    // A timestamp exactly on a boundary belongs to the window it opens.
    let mut on_boundary = wm.window_starts_containing(60_000);
    on_boundary.sort();
    assert_eq!(on_boundary, vec![60_000]);
}

#[test]
fn test_window_starts_containing_sliding() {
    // 30s window, 10s slide — each sample belongs to 3 windows
    let wm = WindowManager::new(30, 10);

    // Small helper: query and sort so assertions are order-independent.
    let sorted_starts = |ts: i64| {
        let mut s = wm.window_starts_containing(ts);
        s.sort();
        s
    };

    // t=15_000 belongs to [0, 30_000), [10_000, 40_000)
    // and [-10_000, 20_000) which starts negative — still returned
    assert_eq!(sorted_starts(15_000), vec![-10_000, 0, 10_000]);

    // t=30_000 belongs to [10_000, 40_000), [20_000, 50_000), [30_000, 60_000)
    assert_eq!(sorted_starts(30_000), vec![10_000, 20_000, 30_000]);
}
}
26 changes: 14 additions & 12 deletions asap-query-engine/src/precompute_engine/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,20 @@ impl Worker {
continue; // already dropped
}

let window_start = agg_state.window_manager.window_start_for(ts);

let updater = agg_state
.active_windows
.entry(window_start)
.or_insert_with(|| create_accumulator_updater(&agg_state.config));

if updater.is_keyed() {
let key = extract_key_from_series(series_key, &agg_state.config);
updater.update_keyed(&key, val, ts);
} else {
updater.update_single(val, ts);
let window_starts = agg_state.window_manager.window_starts_containing(ts);

for window_start in window_starts {
let updater = agg_state
.active_windows
.entry(window_start)
.or_insert_with(|| create_accumulator_updater(&agg_state.config));

if updater.is_keyed() {
let key = extract_key_from_series(series_key, &agg_state.config);
updater.update_keyed(&key, val, ts);
} else {
updater.update_single(val, ts);
}
}
}

Expand Down
Loading