From 480ae78c9e3c4e06039027d8adbb57fb59aef8ef Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 6 Apr 2026 20:45:59 -0500 Subject: [PATCH 1/3] refactor: eliminate code duplication + add bench_precompute_sketch binary - Refactor Worker to use WorkerRuntimeConfig struct and Arc - Add bench_precompute_sketch binary for precompute engine benchmarking - Add e2e_precompute_equivalence integration test - Expand design doc with second-tier merge worker design Co-Authored-By: Claude Opus 4.6 (1M context) --- asap-query-engine/Cargo.toml | 3 + .../src/bin/bench_precompute_sketch.rs | 496 ++++++++++++++++++ .../precompute_engine/accumulator_factory.rs | 329 +++++++----- .../src/precompute_engine/engine.rs | 12 +- .../precompute_engine_design_doc.md | 354 +++++++++++++ .../src/precompute_engine/worker.rs | 229 ++++---- .../tests/e2e_precompute_equivalence.rs | 317 +++++++++++ 7 files changed, 1487 insertions(+), 253 deletions(-) create mode 100644 asap-query-engine/src/bin/bench_precompute_sketch.rs create mode 100644 asap-query-engine/tests/e2e_precompute_equivalence.rs diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index df006de..e533add 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -67,6 +67,9 @@ path = "src/bin/precompute_engine.rs" name = "test_e2e_precompute" path = "src/bin/test_e2e_precompute.rs" +[[bin]] +name = "bench_precompute_sketch" +path = "src/bin/bench_precompute_sketch.rs" [dev-dependencies] ctor = "0.2" diff --git a/asap-query-engine/src/bin/bench_precompute_sketch.rs b/asap-query-engine/src/bin/bench_precompute_sketch.rs new file mode 100644 index 0000000..41a2322 --- /dev/null +++ b/asap-query-engine/src/bin/bench_precompute_sketch.rs @@ -0,0 +1,496 @@ +use clap::Parser; +use prost::Message; +use query_engine_rust::data_model::{AggregateCore, CleanupPolicy, LockStrategy, PrecomputedOutput, StreamingConfig}; +use query_engine_rust::drivers::ingest::prometheus_remote_write::{ + Label, Sample, 
TimeSeries, WriteRequest, +}; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; +use query_engine_rust::precompute_engine::output_sink::OutputSink; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::stores::{SimpleMapStore, Store}; +use sketch_db_common::aggregation_config::AggregationConfig; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +#[derive(Parser, Debug)] +#[command(name = "bench_precompute_sketch")] +#[command(about = "Benchmark the precompute engine with DatasketchesKLL accumulators")] +struct Args { + #[arg(long, default_value_t = 4)] + workers: usize, + + #[arg(long, default_value_t = 4)] + concurrent_senders: usize, + + #[arg(long, default_value_t = 50)] + num_series: usize, + + #[arg(long, default_value_t = 100)] + samples_per_series: usize, + + #[arg(long, default_value_t = 100)] + num_requests: usize, + + #[arg(long, default_value_t = 5)] + latency_repetitions: usize, + + #[arg(long, default_value_t = 10)] + window_size_secs: u64, + + #[arg(long, default_value_t = 200)] + k: u16, + + #[arg(long, default_value_t = 19300)] + latency_port: u16, + + #[arg(long, default_value_t = 19301)] + throughput_port: u16, +} + +struct TrackingStoreSink { + store: Arc, + emitted_outputs: AtomicU64, +} + +impl TrackingStoreSink { + fn new(store: Arc) -> Self { + Self { + store, + emitted_outputs: AtomicU64::new(0), + } + } + + fn emitted(&self) -> u64 { + self.emitted_outputs.load(Ordering::Relaxed) + } +} + +impl OutputSink for TrackingStoreSink { + fn emit_batch( + &self, + outputs: Vec<(PrecomputedOutput, Box)>, + ) -> Result<(), Box> { + if outputs.is_empty() { + return Ok(()); + } + let emitted = outputs.len() as u64; + self.store.insert_precomputed_output_batch(outputs)?; + self.emitted_outputs.fetch_add(emitted, Ordering::Relaxed); + Ok(()) + } +} + +fn build_remote_write_body(timeseries: Vec) 
-> Vec { + let write_req = WriteRequest { timeseries }; + let proto_bytes = write_req.encode_to_vec(); + snap::raw::Encoder::new() + .compress_vec(&proto_bytes) + .expect("snappy compression should succeed") +} + +fn make_timeseries(metric: &str, label_0: &str, samples: Vec) -> TimeSeries { + TimeSeries { + labels: vec![ + Label { + name: "__name__".into(), + value: metric.into(), + }, + Label { + name: "instance".into(), + value: "bench".into(), + }, + Label { + name: "job".into(), + value: "bench".into(), + }, + Label { + name: "label_0".into(), + value: label_0.into(), + }, + ], + samples, + } +} + +fn make_kll_streaming_config(aggregation_id: u64, window_size_secs: u64, k: u16) -> Arc { + let mut params = HashMap::new(); + params.insert("K".to_string(), serde_json::Value::from(k as u64)); + + let agg_config = AggregationConfig::new( + aggregation_id, + "DatasketchesKLL".to_string(), + String::new(), + params, + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + window_size_secs, + window_size_secs, + "tumbling".to_string(), + "bench_metric".to_string(), + "bench_metric".to_string(), + None, + None, + None, + ); + + let mut agg_map = HashMap::new(); + agg_map.insert(aggregation_id, agg_config); + Arc::new(StreamingConfig::new(agg_map)) +} + +fn make_store(streaming_config: Arc) -> Arc { + Arc::new(SimpleMapStore::new_with_strategy( + streaming_config, + CleanupPolicy::CircularBuffer, + LockStrategy::PerKey, + )) +} + +async fn start_engine( + port: u16, + workers: usize, + streaming_config: Arc, + sink: Arc, +) { + let config = PrecomputeEngineConfig { + num_workers: workers, + ingest_port: port, + allowed_lateness_ms: 5_000, + max_buffer_per_series: 100_000, + flush_interval_ms: 100, + channel_buffer_size: 50_000, + pass_raw_samples: false, + 
raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }; + let engine = PrecomputeEngine::new(config, streaming_config, sink); + tokio::spawn(async move { + if let Err(err) = engine.run().await { + eprintln!("precompute engine on port {port} failed: {err}"); + } + }); + tokio::time::sleep(Duration::from_millis(500)).await; +} + +fn build_window_body( + metric: &str, + num_series: usize, + samples_per_series: usize, + window_start_ms: i64, + window_size_ms: i64, +) -> Vec { + let mut timeseries = Vec::with_capacity(num_series); + for series_idx in 0..num_series { + let label = format!("series_{series_idx}"); + let mut samples = Vec::with_capacity(samples_per_series); + for sample_idx in 0..samples_per_series { + let offset = (sample_idx as i64) % window_size_ms.max(1); + samples.push(Sample { + value: (series_idx * samples_per_series + sample_idx) as f64, + timestamp: window_start_ms + offset, + }); + } + timeseries.push(make_timeseries(metric, &label, samples)); + } + build_remote_write_body(timeseries) +} + +fn build_watermark_body(metric: &str, num_series: usize, timestamp_ms: i64) -> Vec { + let mut timeseries = Vec::with_capacity(num_series); + for series_idx in 0..num_series { + let label = format!("series_{series_idx}"); + timeseries.push(make_timeseries( + metric, + &label, + vec![Sample { + value: 0.0, + timestamp: timestamp_ms, + }], + )); + } + build_remote_write_body(timeseries) +} + +async fn post_body( + client: &reqwest::Client, + port: u16, + body: Vec, +) -> Result<(), Box> { + let response = client + .post(format!("http://localhost:{port}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + if response.status() != reqwest::StatusCode::NO_CONTENT { + return Err(format!("unexpected HTTP status {}", response.status()).into()); + } + Ok(()) +} + +async fn wait_for_emitted_outputs( + sink: &TrackingStoreSink, + expected_outputs: u64, + timeout: 
Duration, +) -> Result> { + let start = Instant::now(); + let deadline = start + timeout; + loop { + if sink.emitted() >= expected_outputs { + return Ok(start.elapsed()); + } + if Instant::now() >= deadline { + return Err(format!( + "timed out waiting for outputs: expected {expected_outputs}, saw {}", + sink.emitted() + ) + .into()); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } +} + +async fn run_latency_benchmark( + client: &reqwest::Client, + args: &Args, +) -> Result<(), Box> { + let aggregation_id = 101; + let metric = "bench_metric"; + let window_size_ms = (args.window_size_secs * 1000) as i64; + let streaming_config = make_kll_streaming_config(aggregation_id, args.window_size_secs, args.k); + let store = make_store(streaming_config.clone()); + let sink = Arc::new(TrackingStoreSink::new(store.clone())); + + start_engine( + args.latency_port, + args.workers, + streaming_config, + sink.clone(), + ) + .await; + + let warmup_body = build_window_body(metric, 1, 1, 0, window_size_ms); + let warmup_watermark = build_watermark_body(metric, 1, window_size_ms); + post_body(client, args.latency_port, warmup_body).await?; + post_body(client, args.latency_port, warmup_watermark).await?; + wait_for_emitted_outputs(&sink, 1, Duration::from_secs(5)).await?; + + let mut latencies_ms = Vec::with_capacity(args.latency_repetitions); + let mut rtts_ms = Vec::with_capacity(args.latency_repetitions); + + for rep in 0..args.latency_repetitions { + let baseline = sink.emitted(); + let window_start_ms = ((rep as i64) + 2) * 2 * window_size_ms; + let batch_body = build_window_body( + metric, + args.num_series, + args.samples_per_series, + window_start_ms, + window_size_ms, + ); + let watermark_body = build_watermark_body( + metric, + args.num_series, + window_start_ms + window_size_ms, + ); + + let t0 = Instant::now(); + post_body(client, args.latency_port, batch_body).await?; + let batch_rtt = t0.elapsed(); + post_body(client, args.latency_port, 
watermark_body).await?; + let e2e = wait_for_emitted_outputs( + &sink, + baseline + args.num_series as u64, + Duration::from_secs(10), + ) + .await?; + + latencies_ms.push(e2e.as_secs_f64() * 1000.0); + rtts_ms.push(batch_rtt.as_secs_f64() * 1000.0); + } + + let latency_store_results = store.query_precomputed_output( + metric, + aggregation_id, + 0, + ((args.latency_repetitions as u64) + 10) * args.window_size_secs * 1000, + )?; + let stored_windows: usize = latency_store_results.values().map(|buckets| buckets.len()).sum(); + + println!("\n=== DatasketchesKLL latency benchmark ==="); + println!( + " Config: {} workers, {} series, {} samples/series, K={}, {} repetitions", + args.workers, args.num_series, args.samples_per_series, args.k, args.latency_repetitions + ); + println!( + " HTTP RTT ms: min {:.2}, mean {:.2}, max {:.2}", + min_ms(&rtts_ms), + mean_ms(&rtts_ms), + max_ms(&rtts_ms) + ); + println!( + " E2E latency ms: min {:.2}, mean {:.2}, max {:.2}", + min_ms(&latencies_ms), + mean_ms(&latencies_ms), + max_ms(&latencies_ms) + ); + println!( + " Stored windows: {} (expected at least {})", + stored_windows, + 1 + args.latency_repetitions * args.num_series + ); + + Ok(()) +} + +async fn run_throughput_benchmark( + client: &reqwest::Client, + args: &Args, +) -> Result<(), Box> { + let aggregation_id = 202; + let metric = "bench_metric"; + let window_size_ms = (args.window_size_secs * 1000) as i64; + let total_samples = (args.num_requests * args.num_series * args.samples_per_series) as u64; + let expected_outputs = (args.num_requests * args.num_series) as u64; + + let streaming_config = make_kll_streaming_config(aggregation_id, args.window_size_secs, args.k); + let store = make_store(streaming_config.clone()); + let sink = Arc::new(TrackingStoreSink::new(store.clone())); + + start_engine( + args.throughput_port, + args.workers, + streaming_config, + sink.clone(), + ) + .await; + + let mut bodies = Vec::with_capacity(args.num_requests); + for req_idx in 
0..args.num_requests { + let window_start_ms = (req_idx as i64) * window_size_ms; + bodies.push(build_window_body( + metric, + args.num_series, + args.samples_per_series, + window_start_ms, + window_size_ms, + )); + } + let final_watermark = build_watermark_body( + metric, + args.num_series, + (args.num_requests as i64) * window_size_ms, + ); + + let throughput_start = Instant::now(); + let mut chunks = vec![Vec::new(); args.concurrent_senders]; + for (idx, body) in bodies.into_iter().enumerate() { + chunks[idx % args.concurrent_senders].push(body); + } + + let mut handles = Vec::with_capacity(args.concurrent_senders); + for chunk in chunks { + let client = client.clone(); + let port = args.throughput_port; + handles.push(tokio::spawn(async move { + for body in chunk { + post_body(&client, port, body).await?; + } + Ok::<(), Box>(()) + })); + } + + for handle in handles { + handle.await??; + } + post_body(client, args.throughput_port, final_watermark).await?; + let send_elapsed = throughput_start.elapsed(); + + let wait_elapsed = wait_for_emitted_outputs( + &sink, + expected_outputs, + Duration::from_secs(60), + ) + .await?; + let total_elapsed = throughput_start.elapsed(); + + let store_results = store.query_precomputed_output( + metric, + aggregation_id, + 0, + ((args.num_requests as u64) + 2) * args.window_size_secs * 1000, + )?; + let stored_windows: usize = store_results.values().map(|buckets| buckets.len()).sum(); + + println!("\n=== DatasketchesKLL throughput benchmark ==="); + println!( + " Config: {} workers, {} senders, {} requests, {} series, {} samples/series, K={}", + args.workers, + args.concurrent_senders, + args.num_requests, + args.num_series, + args.samples_per_series, + args.k + ); + println!(" Total samples: {total_samples}"); + println!( + " Send throughput: {:.0} samples/sec ({:.1}ms)", + total_samples as f64 / send_elapsed.as_secs_f64(), + send_elapsed.as_secs_f64() * 1000.0 + ); + println!( + " E2E throughput: {:.0} samples/sec ({:.1}ms, 
drain wait {:.1}ms)", + total_samples as f64 / total_elapsed.as_secs_f64(), + total_elapsed.as_secs_f64() * 1000.0, + wait_elapsed.as_secs_f64() * 1000.0 + ); + println!( + " Stored windows: {} (expected {})", + stored_windows, expected_outputs + ); + + if stored_windows as u64 != expected_outputs { + return Err(format!( + "throughput benchmark stored {stored_windows} windows, expected {expected_outputs}" + ) + .into()); + } + + Ok(()) +} + +fn min_ms(values: &[f64]) -> f64 { + values.iter().copied().fold(f64::INFINITY, f64::min) +} + +fn mean_ms(values: &[f64]) -> f64 { + values.iter().sum::() / values.len() as f64 +} + +fn max_ms(values: &[f64]) -> f64 { + values.iter().copied().fold(f64::NEG_INFINITY, f64::max) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .init(); + + let client = reqwest::Client::new(); + + run_latency_benchmark(&client, &args).await?; + run_throughput_benchmark(&client, &args).await?; + + Ok(()) +} diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 028d47f..500b122 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -6,6 +6,24 @@ use crate::precompute_operators::{ }; use sketch_db_common::aggregation_config::AggregationConfig; +/// Generate the two boilerplate clone-based `AccumulatorUpdater` methods +/// for updaters whose inner `acc` field implements `Clone + AggregateCore`. +/// Not applicable to `IncreaseAccumulatorUpdater` (its `acc` is `Option<_>` +/// with non-trivial `None` handling). +macro_rules! 
impl_clone_accumulator_methods { + ($acc_field:ident) => { + fn take_accumulator(&mut self) -> Box { + let result = Box::new(self.$acc_field.clone()); + self.reset(); + result + } + + fn snapshot_accumulator(&self) -> Box { + Box::new(self.$acc_field.clone()) + } + }; +} + /// Trait for feeding samples into accumulators in the precompute engine. /// /// This provides a uniform interface over all accumulator types so that the @@ -65,15 +83,7 @@ impl AccumulatorUpdater for SumAccumulatorUpdater { self.update_single(value, timestamp_ms); } - fn take_accumulator(&mut self) -> Box { - let result = Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = SumAccumulator::new(); @@ -115,15 +125,7 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater { self.update_single(value, timestamp_ms); } - fn take_accumulator(&mut self) -> Box { - let result = Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = MinMaxAccumulator::new(self.sub_type.clone()); @@ -178,6 +180,7 @@ impl AccumulatorUpdater for IncreaseAccumulatorUpdater { self.update_single(value, timestamp_ms); } + // Hand-written: acc is Option<_> with non-trivial None handling. 
fn take_accumulator(&mut self) -> Box { let acc = self.acc.take().unwrap_or_else(|| { IncreaseAccumulator::new(Measurement::new(0.0), 0, Measurement::new(0.0), 0) @@ -239,15 +242,7 @@ impl AccumulatorUpdater for KllAccumulatorUpdater { self.update_single(value, timestamp_ms); } - fn take_accumulator(&mut self) -> Box { - let result = Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = DatasketchesKLLAccumulator::new(self.k); @@ -294,15 +289,7 @@ impl AccumulatorUpdater for MultipleSumUpdater { self.acc.update(key.clone(), value); } - fn take_accumulator(&mut self) -> Box { - let result = Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = MultipleSumAccumulator::new(); @@ -345,15 +332,7 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater { self.acc.update(key.clone(), value); } - fn take_accumulator(&mut self) -> Box { - let result = Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = MultipleMinMaxAccumulator::new(self.sub_type.clone()); @@ -414,15 +393,7 @@ impl AccumulatorUpdater for MultipleIncreaseUpdater { } } - fn take_accumulator(&mut self) -> Box { - let result = Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = MultipleIncreaseAccumulator::new(); @@ -469,15 +440,7 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater { self.acc.inner.update(&key.to_semicolon_str(), value); } - fn take_accumulator(&mut self) -> Box { - let result = 
Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = CountMinSketchAccumulator::new(self.row_num, self.col_num); @@ -524,15 +487,7 @@ impl AccumulatorUpdater for HydraKllAccumulatorUpdater { self.acc.update(key, value); } - fn take_accumulator(&mut self) -> Box { - let result = Box::new(self.acc.clone()); - self.reset(); - result - } - - fn snapshot_accumulator(&self) -> Box { - Box::new(self.acc.clone()) - } + impl_clone_accumulator_methods!(acc); fn reset(&mut self) { self.acc = HydraKllSketchAccumulator::new(self.row_num, self.col_num, self.k); @@ -548,6 +503,67 @@ impl AccumulatorUpdater for HydraKllAccumulatorUpdater { } } +// --------------------------------------------------------------------------- +// Config helpers +// --------------------------------------------------------------------------- + +/// Return `true` if `config` produces a keyed (MultipleSubpopulation) updater, +/// without allocating an updater object. +/// +/// **Contract:** this must agree with every concrete `AccumulatorUpdater::is_keyed()` +/// implementation. When a new accumulator type is added, update both here and +/// in the corresponding struct. +pub fn config_is_keyed(config: &AggregationConfig) -> bool { + matches!( + config.aggregation_type.as_str(), + "MultipleSubpopulation" + | "MultipleSum" + | "multiple_sum" + | "MultipleIncrease" + | "multiple_increase" + | "MultipleMinMax" + | "multiple_min_max" + | "CountMinSketch" + | "count_min_sketch" + | "CMS" + | "cms" + | "HydraKLL" + | "hydra_kll" + ) +} + +/// Extract the KLL `k` parameter. Capital `"K"` takes precedence over lowercase +/// `"k"` to match the convention used by the top-level aggregation type arms. 
+fn kll_k_param(config: &AggregationConfig) -> u16 { + config + .parameters + .get("K") + .or_else(|| config.parameters.get("k")) + .and_then(|v| v.as_u64()) + .unwrap_or(200) as u16 +} + +/// Extract `(row_num, col_num)` for CMS / HydraKLL configs. +fn cms_params(config: &AggregationConfig) -> (usize, usize) { + let row_num = config + .parameters + .get("row_num") + .and_then(|v| v.as_u64()) + .unwrap_or(4) as usize; + let col_num = config + .parameters + .get("col_num") + .and_then(|v| v.as_u64()) + .unwrap_or(1000) as usize; + (row_num, col_num) +} + +/// Extract `(row_num, col_num, k)` for HydraKLL configs. +fn hydra_kll_params(config: &AggregationConfig) -> (usize, usize, u16) { + let (row_num, col_num) = cms_params(config); + (row_num, col_num, kll_k_param(config)) +} + // --------------------------------------------------------------------------- // Factory function // --------------------------------------------------------------------------- @@ -564,12 +580,7 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box Box::new(MinMaxAccumulatorUpdater::new("max".to_string())), "Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()), "DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => { - let k = config - .parameters - .get("k") - .and_then(|v| v.as_u64()) - .unwrap_or(200) as u16; - Box::new(KllAccumulatorUpdater::new(k)) + Box::new(KllAccumulatorUpdater::new(kll_k_param(config))) } other => { tracing::warn!( @@ -585,34 +596,11 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box Box::new(MultipleMinMaxUpdater::new("max".to_string())), "Increase" | "increase" => Box::new(MultipleIncreaseUpdater::new()), "CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => { - let row_num = config - .parameters - .get("row_num") - .and_then(|v| v.as_u64()) - .unwrap_or(4) as usize; - let col_num = config - .parameters - .get("col_num") - .and_then(|v| v.as_u64()) - .unwrap_or(1000) as usize; + let (row_num, col_num) = 
cms_params(config); Box::new(CmsAccumulatorUpdater::new(row_num, col_num)) } "HydraKLL" | "hydra_kll" => { - let row_num = config - .parameters - .get("row_num") - .and_then(|v| v.as_u64()) - .unwrap_or(4) as usize; - let col_num = config - .parameters - .get("col_num") - .and_then(|v| v.as_u64()) - .unwrap_or(1000) as usize; - let k = config - .parameters - .get("k") - .and_then(|v| v.as_u64()) - .unwrap_or(200) as u16; + let (row_num, col_num, k) = hydra_kll_params(config); Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k)) } other => { @@ -623,15 +611,9 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box { - let k = config - .parameters - .get("K") - .or_else(|| config.parameters.get("k")) - .and_then(|v| v.as_u64()) - .unwrap_or(200) as u16; - Box::new(KllAccumulatorUpdater::new(k)) + Box::new(KllAccumulatorUpdater::new(kll_k_param(config))) } "MultipleSum" | "multiple_sum" => Box::new(MultipleSumUpdater::new()), "MultipleIncrease" | "multiple_increase" => Box::new(MultipleIncreaseUpdater::new()), @@ -647,35 +629,11 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box Box::new(MinMaxAccumulatorUpdater::new("max".to_string())), "Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()), "CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => { - let row_num = config - .parameters - .get("row_num") - .and_then(|v| v.as_u64()) - .unwrap_or(4) as usize; - let col_num = config - .parameters - .get("col_num") - .and_then(|v| v.as_u64()) - .unwrap_or(1000) as usize; + let (row_num, col_num) = cms_params(config); Box::new(CmsAccumulatorUpdater::new(row_num, col_num)) } "HydraKLL" | "hydra_kll" => { - let row_num = config - .parameters - .get("row_num") - .and_then(|v| v.as_u64()) - .unwrap_or(4) as usize; - let col_num = config - .parameters - .get("col_num") - .and_then(|v| v.as_u64()) - .unwrap_or(1000) as usize; - let k = config - .parameters - .get("K") - .or_else(|| config.parameters.get("k")) - 
.and_then(|v| v.as_u64()) - .unwrap_or(200) as u16; + let (row_num, col_num, k) = hydra_kll_params(config); Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k)) } other => { @@ -761,4 +719,101 @@ mod tests { let acc = updater.take_accumulator(); assert_eq!(acc.type_name(), "SumAccumulator"); } + + #[test] + fn test_config_is_keyed() { + use std::collections::HashMap; + + let make_config = |agg_type: &str, sub_type: &str| { + AggregationConfig::new( + 1, + agg_type.to_string(), + sub_type.to_string(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + 60, + "m".to_string(), + "m".to_string(), + None, + None, + Some(60), + Some(0), + None, + None, + None, + ) + }; + + // Non-keyed types + assert!(!config_is_keyed(&make_config("SingleSubpopulation", "Sum"))); + assert!(!config_is_keyed(&make_config("Sum", ""))); + assert!(!config_is_keyed(&make_config("DatasketchesKLL", ""))); + assert!(!config_is_keyed(&make_config("KLL", ""))); + assert!(!config_is_keyed(&make_config("Increase", ""))); + + // Keyed types + assert!(config_is_keyed(&make_config("MultipleSubpopulation", "Sum"))); + assert!(config_is_keyed(&make_config("MultipleSum", ""))); + assert!(config_is_keyed(&make_config("MultipleIncrease", ""))); + assert!(config_is_keyed(&make_config("MultipleMinMax", ""))); + assert!(config_is_keyed(&make_config("CountMinSketch", ""))); + assert!(config_is_keyed(&make_config("CMS", ""))); + assert!(config_is_keyed(&make_config("HydraKLL", ""))); + + // Verify agreement with updater.is_keyed() + for (agg_type, sub_type) in &[ + ("SingleSubpopulation", "Sum"), + ("MultipleSubpopulation", "Sum"), + ("MultipleSum", ""), + ("DatasketchesKLL", ""), + ("CountMinSketch", ""), + ] { + let config = make_config(agg_type, sub_type); + let updater = 
create_accumulator_updater(&config); + assert_eq!( + config_is_keyed(&config), + updater.is_keyed(), + "config_is_keyed disagrees with updater.is_keyed() for type={}", + agg_type + ); + } + } + + #[test] + fn test_kll_k_param_capital_k() { + // SingleSubpopulation/KLL with capital "K" param should use it (not default to 200) + use std::collections::HashMap; + let mut params = HashMap::new(); + params.insert("K".to_string(), serde_json::Value::from(50_u64)); + let config = AggregationConfig::new( + 1, + "SingleSubpopulation".to_string(), + "DatasketchesKLL".to_string(), + params, + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + 60, + "m".to_string(), + "m".to_string(), + None, + None, + Some(60), + Some(0), + None, + None, + None, + ); + let updater = create_accumulator_updater(&config); + let acc = updater.snapshot_accumulator(); + let kll = acc + .as_any() + .downcast_ref::() + .expect("should be KLL"); + assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param"); + } } diff --git a/asap-query-engine/src/precompute_engine/engine.rs b/asap-query-engine/src/precompute_engine/engine.rs index 5df148c..2765671 100644 --- a/asap-query-engine/src/precompute_engine/engine.rs +++ b/asap-query-engine/src/precompute_engine/engine.rs @@ -7,6 +7,7 @@ use crate::precompute_engine::output_sink::OutputSink; use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; use crate::precompute_engine::worker::Worker; use axum::{routing::post, Router}; +use sketch_db_common::aggregation_config::AggregationConfig; use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; @@ -53,9 +54,14 @@ impl PrecomputeEngine { // Build the router let router = SeriesRouter::new(senders); - // Build aggregation config map from streaming config - 
let agg_configs: HashMap = - self.streaming_config.get_all_aggregation_configs().clone(); + // Build aggregation config map from streaming config, wrapping each config + // in Arc so all workers share one copy per aggregation (no N×M deep clones). + let agg_configs: HashMap> = self + .streaming_config + .get_all_aggregation_configs() + .iter() + .map(|(&id, cfg)| (id, Arc::new(cfg.clone()))) + .collect(); // Spawn workers let mut worker_handles = Vec::with_capacity(num_workers); diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index 2244b45..5ab706f 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -505,6 +505,360 @@ Each entry is a complete accumulator from one worker for one window. Query-time The pane snapshot/take logic reduces memory and CPU inside each worker (avoiding re-accumulation of shared panes), but what exits the worker is always one standalone `Box` per window. Consecutive sliding windows `[0,30s)` and `[10s,40s)` share panes *inside* the worker but have independent store entries — their cross-worker merges at query time are completely unrelated. +### Optional second-tier merge workers for cross-worker accumulator reduction + +The current design relies on **query-time merge** for cross-worker fan-in. That +is sufficient for eventual consistency, but it leaves two important gaps: + +1. **Query latency / repeated work** — queries must repeatedly merge + per-worker fragments for the same `(aggregation_id, key, window)` tuple. +2. **No canonical reduced output** — the store holds worker-local fragments + rather than a single merged accumulator for each logical output window. 
+ +This affects **all mergeable accumulators**, not just sliding-window sketches: + +- tumbling-window group-by aggregates spread across workers +- sliding-window exact outputs +- keyed sketches such as CMS / HydraKLL +- late-data mini-accumulators emitted via `ForwardToStore` + +Sliding windows are the strongest motivation because exact reads benefit most +from canonical pre-merged output, but the same second-tier reduction design can +be used for any accumulator type that implements `AggregateCore::merge_with()`. + +To close those gaps without giving up shared-nothing ingest, add an optional +**second tier of merge workers** between the first-tier workers and the final +store. + +#### Goal + +Produce **one canonical accumulator per logical output window**: + +``` +(aggregation_id, grouping_key, window_start, window_end) -> merged accumulator +``` + +so that: + +- the store can hold one merged output per `(aggregation_id, key, window)` +- query-time merge is reduced or eliminated for merge-tier-enabled aggregations +- distributed output matches the semantics of a single logical aggregation over + the full input stream + +For sliding windows, this gives semantic equivalence to a single logical +`HOP(slide, size)` aggregation over the distributed input stream. For tumbling +windows and other mergeable accumulators, it gives the same result the query +engine would otherwise have to reconstruct on read. + +#### High-level architecture + +``` +Remote write + -> SeriesRouter + -> First-tier workers (per-series pane accumulation) + -> MergeRouter + -> Merge workers (per-window fan-in, all keys co-located) + -> FinalOutputSink + -> Store +``` + +The first tier is unchanged: it still owns all per-series pane state and emits +one standalone accumulator per closed window. The new part is that these emits +go to the `MergeRouter` instead of directly to the final store. 
+ +#### Routing key for the merge tier + +All messages — both `PartialWindowAggregate` and `WindowCompletion` — are +routed by the **window identity alone**, without the grouping key: + +``` +merge_key = hash(aggregation_id, window_start_ms, window_end_ms) +``` + +This ensures that: + +1. All partials for any key within the same window land on the same merge + worker, so the merge worker can finalize all keys for a window in one pass. +2. `WindowCompletion` messages (which carry no key) route to exactly the same + merge worker as the partials they complete, through the same MPSC channel. + This preserves FIFO ordering: all `PartialWindowAggregate` messages from a + first-tier worker for a given window are enqueued before its + `WindowCompletion` for that window. + +Routing by `(agg_id, key, window)` is explicitly avoided. It would put partials +for different keys on different merge workers, requiring `WindowCompletion` to be +broadcast to every merge worker — an O(M) fanout for M merge workers per +completion. Window-level routing eliminates that broadcast entirely. + +#### Messages emitted by first-tier workers + +Each first-tier worker emits two message types per closed window per +aggregation, always in this order within the same MPSC channel to the merge +worker: + +1. Zero or more `PartialWindowAggregate` (one per series that had data in this + window), followed immediately by: +2. Exactly one `WindowCompletion` + +```rust +/// Partial result from one first-tier worker for one closed window. +/// Zero or more of these arrive before the WindowCompletion for the same +/// (aggregation_id, window_start_ms, window_end_ms, source_worker_id). 
+struct PartialWindowAggregate {
+    aggregation_id: u64,
+    key: Option<KeyByLabelValues>, // grouping key extracted from the series
+    window_start_ms: u64,
+    window_end_ms: u64,
+    source_worker_id: usize,
+    accumulator: Box<dyn AggregateCore>,
+}
+
+/// Signals that source_worker_id will send no more on-time PartialWindowAggregates
+/// for (aggregation_id, window_start_ms, window_end_ms).
+/// One per (aggregation_id, window, source_worker_id) — no key field.
+struct WindowCompletion {
+    aggregation_id: u64,
+    window_start_ms: u64,
+    window_end_ms: u64,
+    source_worker_id: usize,
+    worker_watermark_ms: i64, // the worker's watermark at the time of completion
+}
+```
+
+`WindowCompletion` carries no `key` field. It is a window-level signal, not a
+key-level signal. The merge worker infers which keys exist from the partials it
+has received; it finalizes all observed keys once all sources have completed the
+window.
+
+Because both message types are enqueued into the **same MPSC channel** from a
+given first-tier worker to its target merge worker, the merge worker always
+observes partials before the completion for the same
+`(aggregation_id, window, source)` triple. No additional ordering guarantee is
+needed.
+
+#### First-tier worker watermark for WindowCompletion
+
+`WindowCompletion` must be emitted by **every** first-tier worker for every
+window of every aggregation it is configured for — including workers that
+received no data for that aggregation or window.
+
+A worker may have no matching series for an aggregation (its `series_map` has
+no entry for any series matching that config). In that case it will never
+receive samples for that aggregation and its per-series watermarks will never
+advance. Without an explicit mechanism, it would never emit `WindowCompletion`,
+causing the merge worker to deadlock waiting for N completions. 
+
+**Fix: per-worker global watermark, advanced by the flush timer.**
+
+Each first-tier worker maintains a `per_agg_watermark: HashMap<u64, i64>` —
+one entry per aggregation config it knows about, initialized to `i64::MIN`.
+
+The watermark for aggregation `agg_id` is updated by two sources:
+
+1. **Data arrival**: when any series matching `agg_id` closes a window, the
+   worker updates `per_agg_watermark[agg_id]` to the maximum watermark seen
+   across all its matching series.
+
+2. **Flush timer**: on each `WorkerMessage::Flush`, for every aggregation config
+   the worker is configured with, the worker advances `per_agg_watermark[agg_id]`
+   to at least `now_ms - allowed_lateness_ms` (wall-clock derived floor). This
+   ensures forward progress even when no data arrives for a given aggregation.
+
+After updating `per_agg_watermark[agg_id]`, the worker calls
+`window_manager.closed_windows(previous_agg_wm, new_agg_wm)` and emits a
+`WindowCompletion` for each newly closed window — even if no partials were
+emitted for that window from this worker.
+
+This guarantees that every merge worker eventually receives exactly N
+`WindowCompletion` messages per `(aggregation_id, window)` — one from each of
+the N first-tier workers — and can finalize without deadlock.
+
+#### Late data and `ForwardToStore`
+
+When `LateDataPolicy::ForwardToStore` is active and a late sample falls into an
+already-closed (and potentially already-merged) window, the first-tier worker
+emits a `PartialWindowAggregate` for that window as it does today.
+
+The merge tier treats these late partials as **store appends**, not as
+corrections to a finalized canonical entry. The canonical merged output written
+at finalization time remains unchanged. The store accumulates the late partial
+alongside it, and query-time `SummaryMergeMultipleExec` merges them on read.
+
+This is consistent with the existing `ForwardToStore` semantics and avoids the
+need to read-modify-write a finalized store entry. 
The benefit of the merge tier
+(one canonical output per window) applies only to on-time data; late corrections
+fall back to the same append + query-time-merge path as the non-merge-tier
+design.
+
+#### Merge-worker state
+
+Each merge worker maintains two separate tables, separating window-level
+completion tracking from key-level partial accumulation:
+
+```rust
+struct MergeWorkerState {
+    /// Per-window: which first-tier workers have sent WindowCompletion.
+    /// Key: (aggregation_id, window_start_ms, window_end_ms)
+    window_completions: HashMap<(u64, u64, u64), HashSet<usize>>,
+
+    /// Per (aggregation_id, key, window): partial accumulators received so far.
+    /// Key: (aggregation_id, key, window_start_ms, window_end_ms)
+    pending_partials: HashMap<(u64, Option<KeyByLabelValues>, u64, u64), Vec<Box<dyn AggregateCore>>>,
+}
+```
+
+Separating these two maps is necessary because:
+
+- `window_completions` is indexed by window only (no key) — matching
+  `WindowCompletion`'s key-less definition.
+- `pending_partials` is indexed by `(agg_id, key, window)` — matching the
+  per-key partials arriving from first-tier workers.
+
+Applying a single key-less `WindowCompletion` to the per-key `pending_partials`
+map requires only a lookup in `window_completions` followed by iteration over
+all `pending_partials` entries sharing the same `(agg_id, window)` — done once
+at finalization, not per-completion.
+
+#### Merge-tier watermark and pending state eviction
+
+The merge tier tracks a **merge watermark** per aggregation:
+
+```rust
+merge_watermark: HashMap<u64, i64> // aggregation_id -> min watermark across sources
+```
+
+Each `WindowCompletion` carries `worker_watermark_ms`. The merge worker updates:
+
+```rust
+merge_watermark[agg_id] = min over all sources of their latest watermark_ms
+```
+
+Any pending window with `window_end_ms <= merge_watermark[agg_id]` that has
+also received all N completions is eligible for finalization and eviction. 
Any
+pending window older than `merge_watermark[agg_id] - eviction_grace_period_ms`
+is force-finalized with whatever partials have arrived, even if not all N
+completions have been received (handles stuck or lagging first-tier workers).
+
+This bounds the size of `window_completions` and `pending_partials` in
+proportion to `eviction_grace_period_ms / slide_interval_ms`, regardless of
+workload.
+
+#### Completion protocol
+
+A merge worker finalizes all keys for a window when:
+
+```text
+window_completions[(agg_id, window_start, window_end)].len() == num_first_tier_workers
+```
+
+Steps:
+1. Check `window_completions` for the window just completed.
+2. If all N sources have completed, collect all `pending_partials` entries
+   matching `(agg_id, *, window_start, window_end)` — iterate over keys
+   observed for that window.
+3. For each observed key, merge its partial accumulators and write one
+   canonical `PrecomputedOutput` to the final store.
+4. Remove the window from both `window_completions` and all matching entries
+   in `pending_partials`.
+
+If a `WindowCompletion` arrives for a window that has no partials in
+`pending_partials` (i.e., no series on any first-tier worker matched this
+aggregation for this window), the merge worker simply records the completion.
+Once all N completions arrive, there is nothing to finalize and the window is
+evicted immediately.
+
+#### Finalization
+
+When a window is complete, the merge worker:
+
+1. Iterates all keys observed for `(agg_id, window)` in `pending_partials`.
+2. For each key, folds its `Vec<Box<dyn AggregateCore>>` with
+   `AggregateCore::merge_with()`.
+3. Writes one canonical `PrecomputedOutput` per key to the final store.
+4. Removes all entries for this window from both state maps. 
+ +This produces the canonical shape expected by merge-tier-enabled reads: + +``` +store[(agg_id, key=(j1), [0,30s))] -> [merged_acc] +store[(agg_id, key=(j1), [10s,40s))] -> [merged_acc] +``` + +and for tumbling windows: + +``` +store[(agg_id, key=(j1), [0,60s))] -> [merged_acc] +``` + +#### Query-path simplification + +With the merge tier enabled for an aggregation: + +- **Instant sliding queries** become exact reads with no cross-worker merge. +- **Range sliding queries** iterate exact sliding windows and read one merged + accumulator per key/window. +- **Tumbling queries** read already-merged buckets instead of merging + per-worker fragments on demand. +- **General aggregate queries** over Sum, Min/Max, KLL, CMS, HydraKLL, etc. can + all consume canonical outputs if their aggregation id is merge-tier-enabled. + +This removes the current ambiguity where the store can return multiple exact +matches for one logical output window and the query layer must decide whether +to merge or pick one. Late corrections (via `ForwardToStore`) continue to use +query-time merge for their incremental updates. 
+ +#### Why a separate merge worker is preferable to routing by grouping key + +Compared with "route all contributing series for a grouping key to the same +worker", the merge-tier approach: + +- preserves per-series ownership in the first tier +- avoids hot-spotting first-tier workers on popular grouping keys +- keeps pane state local to the ingest worker that already owns the series +- allows the merge tier to scale independently of ingest + +Compared with routing the merge tier by `(agg_id, key, window)`, window-level +routing: + +- avoids broadcasting `WindowCompletion` to every merge worker (O(M) fanout) +- keeps all keys for a window co-located, enabling one-pass finalization +- uses the same MPSC channel for partials and completions, giving free + ordering guarantees between them + +The cost is that all keys for a window go to the same merge worker, which +limits key-level parallelism within a window. In practice, the number of +distinct keys per window is bounded and this is not a bottleneck. + +#### Failure and persistence considerations + +This design introduces **merge-tier in-flight state** in addition to the +existing pane state in first-tier workers. To make the whole system robust, the +same durability story must cover both tiers: + +- first-tier WAL / pane snapshots protect open panes +- merge-tier WAL / partial-window snapshots protect unfinalized merge state + +Without persistence, a merge-worker crash loses pending cross-worker fan-in even +if the first-tier workers are healthy. The merge-tier watermark and +force-eviction after `eviction_grace_period_ms` bound the exposure window. + +#### Rollout strategy + +The cleanest migration path is: + +1. Keep the current store-append + query-time-merge model as the default. +2. Add `per_agg_watermark` tracking and flush-timer-driven `WindowCompletion` + emission to first-tier workers (required for correctness before any merge + worker is deployed). +3. 
Add an optional `MergeWorkerOutputSink` controlled by a per-aggregation flag. +4. Enable the merge tier first for **sliding-window aggregations** where the + benefit is largest. +5. Expand to tumbling windows and other high-fan-in accumulators once stable. + +Step 2 must precede step 3: first-tier workers must emit `WindowCompletion` for +all aggregations via the flush timer before any merge worker is deployed, +otherwise merge workers deadlock on their first window. + ## 5. Data Model ### PrecomputedOutput diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 09a9739..3d73ceb 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -1,6 +1,6 @@ use crate::data_model::{AggregateCore, KeyByLabelValues, PrecomputedOutput}; use crate::precompute_engine::accumulator_factory::{ - create_accumulator_updater, AccumulatorUpdater, + config_is_keyed, create_accumulator_updater, AccumulatorUpdater, }; use crate::precompute_engine::config::LateDataPolicy; use crate::precompute_engine::output_sink::OutputSink; @@ -22,7 +22,7 @@ use tracing::{debug, debug_span, info, warn}; /// closes, its constituent panes are merged. This reduces per-sample /// accumulator updates from W to 1 (where W = window_size / slide_interval). struct AggregationState { - config: AggregationConfig, + config: Arc, window_manager: WindowManager, /// Active panes keyed by pane_start_ms. /// BTreeMap for ordered iteration (needed for pane eviction). @@ -45,7 +45,7 @@ pub struct Worker { /// Map from series key to per-series state. series_map: HashMap, /// Aggregation configs, keyed by aggregation_id. - agg_configs: HashMap, + agg_configs: HashMap>, /// Max buffer size per series. max_buffer_per_series: usize, /// Allowed lateness in ms. 
@@ -64,12 +64,8 @@ impl Worker { id: usize, receiver: mpsc::Receiver, output_sink: Arc, - agg_configs: HashMap, - max_buffer_per_series: usize, - allowed_lateness_ms: i64, - pass_raw_samples: bool, - raw_mode_aggregation_id: u64, - late_data_policy: LateDataPolicy, + agg_configs: HashMap>, + runtime_config: WorkerRuntimeConfig, ) -> Self { Self { id, @@ -136,7 +132,8 @@ impl Worker { } /// Find all aggregation configs whose metric/spatial_filter matches this series. - fn matching_agg_configs(&self, series_key: &str) -> Vec<(u64, &AggregationConfig)> { + /// Returns owned `Arc` clones so callers are not lifetime-bound to `&self`. + fn matching_agg_configs(&self, series_key: &str) -> Vec<(u64, Arc)> { let metric_name = extract_metric_name(series_key); self.agg_configs @@ -147,7 +144,7 @@ impl Worker { || config.spatial_filter_normalized == metric_name || config.spatial_filter == metric_name }) - .map(|(&id, config)| (id, config)) + .map(|(&id, config)| (id, Arc::clone(config))) .collect() } @@ -159,7 +156,7 @@ impl Worker { .into_iter() .map(|(_, config)| AggregationState { window_manager: WindowManager::new(config.window_size, config.slide_interval), - config: config.clone(), + config, // Arc clone is cheap; no deep copy active_panes: BTreeMap::new(), }) .collect(); @@ -256,13 +253,8 @@ impl Worker { } LateDataPolicy::ForwardToStore => { let mut updater = create_accumulator_updater(&agg_state.config); - if updater.is_keyed() { - let key = extract_key_from_series(series_key, &agg_state.config); - updater.update_keyed(&key, val, ts); - } else { - updater.update_single(val, ts); - } - let key = if updater.is_keyed() { + apply_sample(&mut *updater, series_key, val, ts, &agg_state.config); + let key = if config_is_keyed(&agg_state.config) { Some(extract_key_from_series(series_key, &agg_state.config)) } else { None @@ -289,12 +281,7 @@ impl Worker { .entry(pane_start) .or_insert_with(|| create_accumulator_updater(&agg_state.config)); - if updater.is_keyed() { - let key = 
extract_key_from_series(series_key, &agg_state.config); - updater.update_keyed(&key, val, ts); - } else { - updater.update_single(val, ts); - } + apply_sample(&mut **updater, series_key, val, ts, &agg_state.config); } // Emit closed windows by merging their constituent panes @@ -302,43 +289,13 @@ impl Worker { let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); let pane_starts = agg_state.window_manager.panes_for_window(*window_start); - // Merge pane accumulators for this window. - // - Oldest pane (index 0): take_accumulator + remove (no future window needs it) - // - Remaining panes: snapshot_accumulator (shared with newer windows) - let mut merged: Option> = None; - - for (i, &ps) in pane_starts.iter().enumerate() { - let pane_acc = if i == 0 { - // Oldest pane: destructive take + evict from active_panes - agg_state - .active_panes - .remove(&ps) - .map(|mut updater| updater.take_accumulator()) + if let Some(accumulator) = + merge_panes_for_window(&mut agg_state.active_panes, &pane_starts) + { + let key = if config_is_keyed(&agg_state.config) { + Some(extract_key_from_series(series_key, &agg_state.config)) } else { - // Shared pane: non-destructive snapshot - agg_state - .active_panes - .get(&ps) - .map(|updater| updater.snapshot_accumulator()) - }; - - if let Some(acc) = pane_acc { - merged = Some(match merged { - None => acc, - Some(existing) => existing.merge_with(acc.as_ref()).unwrap_or(existing), - }); - } - } - - if let Some(accumulator) = merged { - let key = { - // Check keyed-ness from accumulator type name or config - let test_updater = create_accumulator_updater(&agg_state.config); - if test_updater.is_keyed() { - Some(extract_key_from_series(series_key, &agg_state.config)) - } else { - None - } + None }; let output = PrecomputedOutput::new( @@ -419,39 +376,13 @@ impl Worker { let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); let pane_starts = 
agg_state.window_manager.panes_for_window(*window_start); - let mut merged: Option> = None; - - for (i, &ps) in pane_starts.iter().enumerate() { - let pane_acc = if i == 0 { - agg_state - .active_panes - .remove(&ps) - .map(|mut updater| updater.take_accumulator()) + if let Some(accumulator) = + merge_panes_for_window(&mut agg_state.active_panes, &pane_starts) + { + let key = if config_is_keyed(&agg_state.config) { + Some(extract_key_from_series(series_key, &agg_state.config)) } else { - agg_state - .active_panes - .get(&ps) - .map(|updater| updater.snapshot_accumulator()) - }; - - if let Some(acc) = pane_acc { - merged = Some(match merged { - None => acc, - Some(existing) => { - existing.merge_with(acc.as_ref()).unwrap_or(existing) - } - }); - } - } - - if let Some(accumulator) = merged { - let key = { - let test_updater = create_accumulator_updater(&agg_state.config); - if test_updater.is_keyed() { - Some(extract_key_from_series(series_key, &agg_state.config)) - } else { - None - } + None }; let output = PrecomputedOutput::new( @@ -567,6 +498,60 @@ fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { labels } +/// Route a single sample to `updater`, dispatching keyed vs. non-keyed based on config. +/// Eliminates repeated `if updater.is_keyed()` blocks at call sites. +fn apply_sample( + updater: &mut dyn AccumulatorUpdater, + series_key: &str, + val: f64, + ts: i64, + config: &AggregationConfig, +) { + if updater.is_keyed() { + let key = extract_key_from_series(series_key, config); + updater.update_keyed(&key, val, ts); + } else { + updater.update_single(val, ts); + } +} + +/// Merge the pane accumulators that constitute a closed window. +/// +/// The oldest pane (index 0) is taken destructively from `active_panes` +/// (no future window needs it). All later panes are snapshot-read +/// (non-destructive; they are shared by newer overlapping windows). +/// +/// Returns `None` if all panes for the window are absent. 
+fn merge_panes_for_window(
+    active_panes: &mut BTreeMap<i64, Box<dyn AccumulatorUpdater>>,
+    pane_starts: &[i64],
+) -> Option<Box<dyn AggregateCore>> {
+    let mut merged: Option<Box<dyn AggregateCore>> = None;
+
+    for (i, &ps) in pane_starts.iter().enumerate() {
+        let pane_acc = if i == 0 {
+            // Oldest pane: destructive take + evict
+            active_panes
+                .remove(&ps)
+                .map(|mut updater| updater.take_accumulator())
+        } else {
+            // Shared pane: non-destructive snapshot
+            active_panes
+                .get(&ps)
+                .map(|updater| updater.snapshot_accumulator())
+        };
+
+        if let Some(acc) = pane_acc {
+            merged = Some(match merged {
+                None => acc,
+                Some(existing) => existing.merge_with(acc.as_ref()).unwrap_or(existing),
+            });
+        }
+    }
+
+    merged
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -657,7 +642,7 @@ mod tests {
     }
 
     fn make_worker(
-        agg_configs: HashMap<u64, AggregationConfig>,
+        agg_configs: HashMap<u64, Arc<AggregationConfig>>,
         sink: Arc,
         pass_raw: bool,
         raw_agg_id: u64,
@@ -677,6 +662,16 @@ mod tests {
         )
     }
 
+    /// Wrap a `HashMap<u64, AggregationConfig>` for use with `make_worker`.
+    fn arc_configs(
+        configs: HashMap<u64, AggregationConfig>,
+    ) -> HashMap<u64, Arc<AggregationConfig>> {
+        configs
+            .into_iter()
+            .map(|(k, v)| (k, Arc::new(v)))
+            .collect()
+    }
+
     // -----------------------------------------------------------------------
     // Test: raw mode — each sample forwarded as SumAccumulator with sum==value
     // -----------------------------------------------------------------------
@@ -684,7 +679,8 @@ mod tests {
     #[test]
     fn test_raw_mode_forwarding() {
         let sink = Arc::new(CapturingOutputSink::new());
-        let mut worker = make_worker(HashMap::new(), sink.clone(), true, 99, LateDataPolicy::Drop);
+        let mut worker =
+            make_worker(HashMap::new(), sink.clone(), true, 99, LateDataPolicy::Drop);
 
         let samples = vec![(1000_i64, 1.5_f64), (2000, 2.5), (3000, 7.0)];
         worker
@@ -721,7 +717,8 @@ mod tests {
         agg_configs.insert(1, config);
 
         let sink = Arc::new(CapturingOutputSink::new());
-        let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop);
+        let mut worker =
+            make_worker(arc_configs(agg_configs), sink.clone(), false, 0, LateDataPolicy::Drop);
 
         // Samples in 
window [0, 10000ms): sum should be 1+2+3=6. // Send one at a time so the watermark advances incrementally — @@ -779,7 +776,8 @@ mod tests { agg_configs.insert(2, config); let sink = Arc::new(CapturingOutputSink::new()); - let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); + let mut worker = + make_worker(arc_configs(agg_configs), sink.clone(), false, 0, LateDataPolicy::Drop); // Sample at t=15000ms → goes to pane 10000ms // previous_wm == i64::MIN → no windows close @@ -846,7 +844,8 @@ mod tests { agg_configs.insert(3, config); let sink = Arc::new(CapturingOutputSink::new()); - let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); + let mut worker = + make_worker(arc_configs(agg_configs), sink.clone(), false, 0, LateDataPolicy::Drop); // Feed two series in the same window [0, 10000ms) worker @@ -909,7 +908,7 @@ mod tests { let sink = Arc::new(CapturingOutputSink::new()); let mut worker = make_worker( - agg_configs.clone(), + arc_configs(agg_configs.clone()), sink.clone(), false, 0, @@ -994,7 +993,7 @@ mod tests { let sink = Arc::new(CapturingOutputSink::new()); let mut worker = make_worker( - agg_configs.clone(), + arc_configs(agg_configs.clone()), sink.clone(), false, 0, @@ -1091,12 +1090,14 @@ mod tests { 0, rx, sink.clone(), - agg_configs, - 10_000, // max_buffer_per_series - 0, // allowed_lateness_ms - false, // pass_raw_samples - 0, // raw_mode_aggregation_id - LateDataPolicy::Drop, + arc_configs(agg_configs), + WorkerRuntimeConfig { + max_buffer_per_series: 10_000, + allowed_lateness_ms: 0, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }, ); // Establish watermark at t=20000ms (closes [0, 10000) and [10000, 20000)) @@ -1133,12 +1134,14 @@ mod tests { 0, rx, sink.clone(), - agg_configs, - 10_000, // max_buffer_per_series - 15_000, // allowed_lateness_ms - false, // pass_raw_samples - 0, // raw_mode_aggregation_id - 
LateDataPolicy::ForwardToStore, + arc_configs(agg_configs), + WorkerRuntimeConfig { + max_buffer_per_series: 10_000, + allowed_lateness_ms: 15_000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::ForwardToStore, + }, ); // Seed pane 0, then advance watermark to 20000 (evicts pane 0) @@ -1213,7 +1216,7 @@ aggregations: "aggregation 10 should be present" ); - let agg_configs = streaming_config.get_all_aggregation_configs().clone(); + let agg_configs = arc_configs(streaming_config.get_all_aggregation_configs().clone()); let sink = Arc::new(CapturingOutputSink::new()); let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs new file mode 100644 index 0000000..f211afe --- /dev/null +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -0,0 +1,317 @@ +//! End-to-end integration tests: precompute engine output equivalence +//! with ArroYo sketch format. +//! +//! Each test: +//! 1. Starts a PrecomputeEngine backed by a CapturingOutputSink +//! 2. Sends Prometheus remote write samples via HTTP (Snappy-compressed protobuf) +//! 3. Advances the watermark past the window boundary to close it +//! 4. 
Drains captured outputs and verifies equivalence with ArroYo-format accumulators + +use flate2::{write::GzEncoder, Compression}; +use prost::Message; +use serde_json::json; +use sketch_db_common::aggregation_config::AggregationConfig; +use sketch_core::kll::KllSketch; +use std::collections::HashMap; +use std::io::Write; +use std::sync::Arc; + +use query_engine_rust::data_model::{PrecomputedOutput, StreamingConfig}; +use query_engine_rust::drivers::ingest::prometheus_remote_write::{ + Label, Sample, TimeSeries, WriteRequest, +}; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; +use query_engine_rust::precompute_engine::output_sink::CapturingOutputSink; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; +use query_engine_rust::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator; + +// ─── helpers ──────────────────────────────────────────────────────────────── + +fn make_agg_config( + id: u64, + metric: &str, + agg_type: &str, + agg_sub_type: &str, + window_secs: u64, + slide_secs: u64, + grouping: Vec<&str>, +) -> AggregationConfig { + let window_type = if slide_secs == 0 || slide_secs == window_secs { + "tumbling" + } else { + "sliding" + }; + AggregationConfig::new( + id, + agg_type.to_string(), + agg_sub_type.to_string(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new( + grouping.iter().map(|s| s.to_string()).collect(), + ), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + window_secs, + slide_secs, + window_type.to_string(), + metric.to_string(), + metric.to_string(), + None, + None, + None, + ) +} + +fn make_timeseries(metric: &str, extra_labels: Vec<(&str, &str)>, ts_ms: i64, value: f64) -> TimeSeries { + let 
mut labels = vec![Label { + name: "__name__".into(), + value: metric.into(), + }]; + for (k, v) in extra_labels { + labels.push(Label { + name: k.into(), + value: v.into(), + }); + } + TimeSeries { + labels, + samples: vec![Sample { + value, + timestamp: ts_ms, + }], + } +} + +fn build_remote_write_body(timeseries: Vec) -> Vec { + let write_req = WriteRequest { timeseries }; + let proto_bytes = write_req.encode_to_vec(); + snap::raw::Encoder::new() + .compress_vec(&proto_bytes) + .expect("snappy compress failed") +} + +async fn send_remote_write(client: &reqwest::Client, port: u16, timeseries: Vec) { + let body = build_remote_write_body(timeseries); + let resp = client + .post(format!("http://localhost:{port}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await + .expect("HTTP send failed"); + assert!( + resp.status().as_u16() == 204, + "ingest returned unexpected status {}", + resp.status() + ); +} + +fn engine_config(port: u16) -> PrecomputeEngineConfig { + PrecomputeEngineConfig { + num_workers: 2, + ingest_port: port, + allowed_lateness_ms: 0, + max_buffer_per_series: 10_000, + flush_interval_ms: 100, + channel_buffer_size: 10_000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + } +} + +fn gzip_hex(bytes: &[u8]) -> String { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(bytes).unwrap(); + hex::encode(encoder.finish().unwrap()) +} + +// ─── test 1: DatasketchesKLL output matches ArroYo KLL ────────────────────── + +/// Full e2e: send KLL samples through the HTTP ingest → PrecomputeEngine stack, +/// then verify the emitted DatasketchesKLLAccumulator matches what ArroYo's +/// KllSketch::aggregate_kll would produce for the same values. 
+#[tokio::test] +async fn e2e_kll_output_matches_arroyo() { + let port = 19400u16; + let agg_id = 1u64; + let window_secs = 10u64; + let k = 20u16; + + let mut kll_config = make_agg_config(agg_id, "latency", "DatasketchesKLL", "", window_secs, 0, vec![]); + kll_config + .parameters + .insert("K".to_string(), serde_json::Value::from(k as u64)); + + let mut agg_map = HashMap::new(); + agg_map.insert(agg_id, kll_config); + let streaming_config = Arc::new(StreamingConfig::new(agg_map.clone())); + + let sink = Arc::new(CapturingOutputSink::new()); + let engine = PrecomputeEngine::new(engine_config(port), streaming_config, sink.clone()); + tokio::spawn(async move { + let _ = engine.run().await; + }); + // Wait for the HTTP server to bind + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + + let client = reqwest::Client::new(); + let values = [10.0f64, 20.0, 30.0]; + + // Three samples inside window [0ms, 10_000ms) + for (i, &v) in values.iter().enumerate() { + let ts_ms = (i as i64 + 1) * 1_000; + send_remote_write(&client, port, vec![make_timeseries("latency", vec![], ts_ms, v)]).await; + } + + // Advance watermark past window end to trigger close + send_remote_write(&client, port, vec![make_timeseries("latency", vec![], 15_000, 0.0)]).await; + + // Wait for flush + tokio::time::sleep(tokio::time::Duration::from_millis(600)).await; + + let captured = sink.drain(); + assert_eq!( + captured.len(), + 1, + "expected exactly one closed window output; got {}", + captured.len() + ); + + let (handcrafted_output, handcrafted_acc_box) = &captured[0]; + let handcrafted_acc = handcrafted_acc_box + .as_any() + .downcast_ref::() + .expect("captured accumulator should be DatasketchesKLLAccumulator"); + + // Build the ArroYo-format equivalent and deserialize it + let arroyo_bytes = KllSketch::aggregate_kll(k, &values).expect("KllSketch::aggregate_kll failed"); + let arroyo_json = json!({ + "aggregation_id": agg_id, + "window": { "start": "1970-01-01T00:00:00", "end": 
"1970-01-01T00:00:10" }, + "key": "", + "precompute": gzip_hex(&arroyo_bytes), + }); + let streaming_config_for_deser = StreamingConfig::new(agg_map); + let (_arroyo_output, arroyo_acc_box) = + PrecomputedOutput::deserialize_from_json_arroyo(&arroyo_json, &streaming_config_for_deser) + .expect("ArroYo KLL deserialization failed"); + let arroyo_acc = arroyo_acc_box + .as_any() + .downcast_ref::() + .expect("ArroYo payload should deserialize to DatasketchesKLLAccumulator"); + + // Window metadata + assert_eq!(handcrafted_output.aggregation_id, agg_id); + assert_eq!(handcrafted_output.start_timestamp, 0); + assert_eq!(handcrafted_output.end_timestamp, window_secs * 1_000); + + // Sketch contents + assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k, "KLL k mismatch"); + assert_eq!( + handcrafted_acc.inner.sketch.get_n(), + arroyo_acc.inner.sketch.get_n(), + "KLL sample count mismatch" + ); + for q in [0.0f64, 0.25, 0.5, 0.75, 1.0] { + assert_eq!( + handcrafted_acc.get_quantile(q), + arroyo_acc.get_quantile(q), + "KLL quantile {q} mismatch" + ); + } +} + +// ─── test 2: MultipleSum output matches ArroYo MultipleSum ────────────────── + +/// Full e2e: send MultipleSum samples (grouped by "host") through the HTTP +/// ingest → PrecomputeEngine stack, then verify the emitted +/// MultipleSumAccumulator matches the ArroYo MessagePack-encoded sums map. 
+#[tokio::test] +async fn e2e_multiple_sum_output_matches_arroyo() { + let port = 19401u16; + let agg_id = 2u64; + let window_secs = 10u64; + + let config = make_agg_config(agg_id, "cpu", "MultipleSum", "sum", window_secs, 0, vec!["host"]); + let mut agg_map = HashMap::new(); + agg_map.insert(agg_id, config); + let streaming_config = Arc::new(StreamingConfig::new(agg_map.clone())); + + let sink = Arc::new(CapturingOutputSink::new()); + let engine = PrecomputeEngine::new(engine_config(port), streaming_config, sink.clone()); + tokio::spawn(async move { + let _ = engine.run().await; + }); + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + + let client = reqwest::Client::new(); + + // Three samples for host=A inside window [0ms, 10_000ms): sum = 1+2+3 = 6 + for (ts, v) in [(1_000i64, 1.0f64), (5_000, 2.0), (9_000, 3.0)] { + send_remote_write( + &client, + port, + vec![make_timeseries("cpu", vec![("host", "A")], ts, v)], + ) + .await; + } + + // Advance watermark to close the window + send_remote_write( + &client, + port, + vec![make_timeseries("cpu", vec![("host", "A")], 15_000, 0.0)], + ) + .await; + + tokio::time::sleep(tokio::time::Duration::from_millis(600)).await; + + let captured = sink.drain(); + assert_eq!( + captured.len(), + 1, + "expected one closed window output; got {}", + captured.len() + ); + + let (handcrafted_output, handcrafted_acc_box) = &captured[0]; + let handcrafted_acc = handcrafted_acc_box + .as_any() + .downcast_ref::() + .expect("captured accumulator should be MultipleSumAccumulator"); + + // Build the ArroYo-format equivalent and deserialize it + let mut expected_sums: HashMap = HashMap::new(); + expected_sums.insert("A".to_string(), 6.0); + let arroyo_bytes = rmp_serde::to_vec(&expected_sums).expect("msgpack encoding failed"); + let arroyo_json = json!({ + "aggregation_id": agg_id, + "window": { "start": "1970-01-01T00:00:00", "end": "1970-01-01T00:00:10" }, + "key": "A", + "precompute": gzip_hex(&arroyo_bytes), + }); + 
let streaming_config_for_deser = StreamingConfig::new(agg_map); + let (_arroyo_output, arroyo_acc_box) = + PrecomputedOutput::deserialize_from_json_arroyo(&arroyo_json, &streaming_config_for_deser) + .expect("ArroYo MultipleSum deserialization failed"); + let arroyo_acc = arroyo_acc_box + .as_any() + .downcast_ref::() + .expect("ArroYo payload should deserialize to MultipleSumAccumulator"); + + // Window metadata + assert_eq!(handcrafted_output.aggregation_id, agg_id); + assert_eq!(handcrafted_output.start_timestamp, 0); + assert_eq!(handcrafted_output.end_timestamp, window_secs * 1_000); + + // Accumulator contents + assert_eq!( + handcrafted_acc.sums, + arroyo_acc.sums, + "MultipleSum sums map mismatch" + ); +} From 5980bf8e5edd93f050cd08c05671c81ef54d6821 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 6 Apr 2026 20:55:22 -0500 Subject: [PATCH 2/3] style: fix rustfmt formatting across PR F files Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/bin/bench_precompute_sketch.rs | 30 ++++++----- .../precompute_engine/accumulator_factory.rs | 5 +- .../src/precompute_engine/worker.rs | 35 +++++++----- .../tests/e2e_precompute_equivalence.rs | 54 +++++++++++++++---- 4 files changed, 87 insertions(+), 37 deletions(-) diff --git a/asap-query-engine/src/bin/bench_precompute_sketch.rs b/asap-query-engine/src/bin/bench_precompute_sketch.rs index 41a2322..db9c6f7 100644 --- a/asap-query-engine/src/bin/bench_precompute_sketch.rs +++ b/asap-query-engine/src/bin/bench_precompute_sketch.rs @@ -1,6 +1,8 @@ use clap::Parser; use prost::Message; -use query_engine_rust::data_model::{AggregateCore, CleanupPolicy, LockStrategy, PrecomputedOutput, StreamingConfig}; +use query_engine_rust::data_model::{ + AggregateCore, CleanupPolicy, LockStrategy, PrecomputedOutput, StreamingConfig, +}; use query_engine_rust::drivers::ingest::prometheus_remote_write::{ Label, Sample, TimeSeries, WriteRequest, }; @@ -114,7 +116,11 @@ fn make_timeseries(metric: &str, label_0: &str, samples: Vec) -> 
TimeSer } } -fn make_kll_streaming_config(aggregation_id: u64, window_size_secs: u64, k: u16) -> Arc { +fn make_kll_streaming_config( + aggregation_id: u64, + window_size_secs: u64, + k: u16, +) -> Arc { let mut params = HashMap::new(); params.insert("K".to_string(), serde_json::Value::from(k as u64)); @@ -293,11 +299,8 @@ async fn run_latency_benchmark( window_start_ms, window_size_ms, ); - let watermark_body = build_watermark_body( - metric, - args.num_series, - window_start_ms + window_size_ms, - ); + let watermark_body = + build_watermark_body(metric, args.num_series, window_start_ms + window_size_ms); let t0 = Instant::now(); post_body(client, args.latency_port, batch_body).await?; @@ -320,7 +323,10 @@ async fn run_latency_benchmark( 0, ((args.latency_repetitions as u64) + 10) * args.window_size_secs * 1000, )?; - let stored_windows: usize = latency_store_results.values().map(|buckets| buckets.len()).sum(); + let stored_windows: usize = latency_store_results + .values() + .map(|buckets| buckets.len()) + .sum(); println!("\n=== DatasketchesKLL latency benchmark ==="); println!( @@ -411,12 +417,8 @@ async fn run_throughput_benchmark( post_body(client, args.throughput_port, final_watermark).await?; let send_elapsed = throughput_start.elapsed(); - let wait_elapsed = wait_for_emitted_outputs( - &sink, - expected_outputs, - Duration::from_secs(60), - ) - .await?; + let wait_elapsed = + wait_for_emitted_outputs(&sink, expected_outputs, Duration::from_secs(60)).await?; let total_elapsed = throughput_start.elapsed(); let store_results = store.query_precomputed_output( diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 500b122..78f5d46 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -755,7 +755,10 @@ mod tests { assert!(!config_is_keyed(&make_config("Increase", ""))); // Keyed 
types - assert!(config_is_keyed(&make_config("MultipleSubpopulation", "Sum"))); + assert!(config_is_keyed(&make_config( + "MultipleSubpopulation", + "Sum" + ))); assert!(config_is_keyed(&make_config("MultipleSum", ""))); assert!(config_is_keyed(&make_config("MultipleIncrease", ""))); assert!(config_is_keyed(&make_config("MultipleMinMax", ""))); diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 3d73ceb..c10c4d6 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -666,10 +666,7 @@ mod tests { fn arc_configs( configs: HashMap, ) -> HashMap> { - configs - .into_iter() - .map(|(k, v)| (k, Arc::new(v))) - .collect() + configs.into_iter().map(|(k, v)| (k, Arc::new(v))).collect() } // ----------------------------------------------------------------------- @@ -679,8 +676,7 @@ mod tests { #[test] fn test_raw_mode_forwarding() { let sink = Arc::new(CapturingOutputSink::new()); - let mut worker = - make_worker(HashMap::new(), sink.clone(), true, 99, LateDataPolicy::Drop); + let mut worker = make_worker(HashMap::new(), sink.clone(), true, 99, LateDataPolicy::Drop); let samples = vec![(1000_i64, 1.5_f64), (2000, 2.5), (3000, 7.0)]; worker @@ -717,8 +713,13 @@ mod tests { agg_configs.insert(1, config); let sink = Arc::new(CapturingOutputSink::new()); - let mut worker = - make_worker(arc_configs(agg_configs), sink.clone(), false, 0, LateDataPolicy::Drop); + let mut worker = make_worker( + arc_configs(agg_configs), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); // Samples in window [0, 10000ms): sum should be 1+2+3=6. 
// Send one at a time so the watermark advances incrementally — @@ -776,8 +777,13 @@ mod tests { agg_configs.insert(2, config); let sink = Arc::new(CapturingOutputSink::new()); - let mut worker = - make_worker(arc_configs(agg_configs), sink.clone(), false, 0, LateDataPolicy::Drop); + let mut worker = make_worker( + arc_configs(agg_configs), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); // Sample at t=15000ms → goes to pane 10000ms // previous_wm == i64::MIN → no windows close @@ -844,8 +850,13 @@ mod tests { agg_configs.insert(3, config); let sink = Arc::new(CapturingOutputSink::new()); - let mut worker = - make_worker(arc_configs(agg_configs), sink.clone(), false, 0, LateDataPolicy::Drop); + let mut worker = make_worker( + arc_configs(agg_configs), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); // Feed two series in the same window [0, 10000ms) worker diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index f211afe..d88ca8d 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -10,8 +10,8 @@ use flate2::{write::GzEncoder, Compression}; use prost::Message; use serde_json::json; -use sketch_db_common::aggregation_config::AggregationConfig; use sketch_core::kll::KllSketch; +use sketch_db_common::aggregation_config::AggregationConfig; use std::collections::HashMap; use std::io::Write; use std::sync::Arc; @@ -64,7 +64,12 @@ fn make_agg_config( ) } -fn make_timeseries(metric: &str, extra_labels: Vec<(&str, &str)>, ts_ms: i64, value: f64) -> TimeSeries { +fn make_timeseries( + metric: &str, + extra_labels: Vec<(&str, &str)>, + ts_ms: i64, + value: f64, +) -> TimeSeries { let mut labels = vec![Label { name: "__name__".into(), value: metric.into(), @@ -141,7 +146,15 @@ async fn e2e_kll_output_matches_arroyo() { let window_secs = 10u64; let k = 20u16; - let mut kll_config = make_agg_config(agg_id, 
"latency", "DatasketchesKLL", "", window_secs, 0, vec![]); + let mut kll_config = make_agg_config( + agg_id, + "latency", + "DatasketchesKLL", + "", + window_secs, + 0, + vec![], + ); kll_config .parameters .insert("K".to_string(), serde_json::Value::from(k as u64)); @@ -164,11 +177,21 @@ async fn e2e_kll_output_matches_arroyo() { // Three samples inside window [0ms, 10_000ms) for (i, &v) in values.iter().enumerate() { let ts_ms = (i as i64 + 1) * 1_000; - send_remote_write(&client, port, vec![make_timeseries("latency", vec![], ts_ms, v)]).await; + send_remote_write( + &client, + port, + vec![make_timeseries("latency", vec![], ts_ms, v)], + ) + .await; } // Advance watermark past window end to trigger close - send_remote_write(&client, port, vec![make_timeseries("latency", vec![], 15_000, 0.0)]).await; + send_remote_write( + &client, + port, + vec![make_timeseries("latency", vec![], 15_000, 0.0)], + ) + .await; // Wait for flush tokio::time::sleep(tokio::time::Duration::from_millis(600)).await; @@ -188,7 +211,8 @@ async fn e2e_kll_output_matches_arroyo() { .expect("captured accumulator should be DatasketchesKLLAccumulator"); // Build the ArroYo-format equivalent and deserialize it - let arroyo_bytes = KllSketch::aggregate_kll(k, &values).expect("KllSketch::aggregate_kll failed"); + let arroyo_bytes = + KllSketch::aggregate_kll(k, &values).expect("KllSketch::aggregate_kll failed"); let arroyo_json = json!({ "aggregation_id": agg_id, "window": { "start": "1970-01-01T00:00:00", "end": "1970-01-01T00:00:10" }, @@ -210,7 +234,10 @@ async fn e2e_kll_output_matches_arroyo() { assert_eq!(handcrafted_output.end_timestamp, window_secs * 1_000); // Sketch contents - assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k, "KLL k mismatch"); + assert_eq!( + handcrafted_acc.inner.k, arroyo_acc.inner.k, + "KLL k mismatch" + ); assert_eq!( handcrafted_acc.inner.sketch.get_n(), arroyo_acc.inner.sketch.get_n(), @@ -236,7 +263,15 @@ async fn e2e_multiple_sum_output_matches_arroyo() { 
let agg_id = 2u64; let window_secs = 10u64; - let config = make_agg_config(agg_id, "cpu", "MultipleSum", "sum", window_secs, 0, vec!["host"]); + let config = make_agg_config( + agg_id, + "cpu", + "MultipleSum", + "sum", + window_secs, + 0, + vec!["host"], + ); let mut agg_map = HashMap::new(); agg_map.insert(agg_id, config); let streaming_config = Arc::new(StreamingConfig::new(agg_map.clone())); @@ -310,8 +345,7 @@ async fn e2e_multiple_sum_output_matches_arroyo() { // Accumulator contents assert_eq!( - handcrafted_acc.sums, - arroyo_acc.sums, + handcrafted_acc.sums, arroyo_acc.sums, "MultipleSum sums map mismatch" ); } From 1c8c533279e27bc5fae9e084855b048b636ee9b7 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 6 Apr 2026 20:57:09 -0500 Subject: [PATCH 3/3] fix: add WorkerRuntimeConfig struct + fix compilation errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Define WorkerRuntimeConfig struct and destructure it in Worker::new - Update engine.rs to construct WorkerRuntimeConfig when creating workers - Fix make_worker test helper to use WorkerRuntimeConfig - Fix AggregationConfig::new calls in accumulator_factory tests (17-arg) - Fix .inner.sketch.get_n() → .inner.count() in e2e test Co-Authored-By: Claude Opus 4.6 (1M context) --- .../precompute_engine/accumulator_factory.rs | 10 +++---- .../src/precompute_engine/engine.rs | 14 +++++---- .../src/precompute_engine/worker.rs | 29 +++++++++++++++---- .../tests/e2e_precompute_equivalence.rs | 4 +-- 4 files changed, 37 insertions(+), 20 deletions(-) diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 78f5d46..b446b19 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -735,13 +735,12 @@ mod tests { promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), 
String::new(), 60, + 0, + "tumbling".to_string(), "m".to_string(), "m".to_string(), None, None, - Some(60), - Some(0), - None, None, None, ) @@ -801,13 +800,12 @@ mod tests { promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), String::new(), 60, + 0, + "tumbling".to_string(), "m".to_string(), "m".to_string(), None, None, - Some(60), - Some(0), - None, None, None, ); diff --git a/asap-query-engine/src/precompute_engine/engine.rs b/asap-query-engine/src/precompute_engine/engine.rs index 2765671..cd6b428 100644 --- a/asap-query-engine/src/precompute_engine/engine.rs +++ b/asap-query-engine/src/precompute_engine/engine.rs @@ -5,7 +5,7 @@ use crate::precompute_engine::ingest_handler::{ }; use crate::precompute_engine::output_sink::OutputSink; use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; -use crate::precompute_engine::worker::Worker; +use crate::precompute_engine::worker::{Worker, WorkerRuntimeConfig}; use axum::{routing::post, Router}; use sketch_db_common::aggregation_config::AggregationConfig; use std::collections::HashMap; @@ -71,11 +71,13 @@ impl PrecomputeEngine { rx, self.output_sink.clone(), agg_configs.clone(), - self.config.max_buffer_per_series, - self.config.allowed_lateness_ms, - self.config.pass_raw_samples, - self.config.raw_mode_aggregation_id, - self.config.late_data_policy, + WorkerRuntimeConfig { + max_buffer_per_series: self.config.max_buffer_per_series, + allowed_lateness_ms: self.config.allowed_lateness_ms, + pass_raw_samples: self.config.pass_raw_samples, + raw_mode_aggregation_id: self.config.raw_mode_aggregation_id, + late_data_policy: self.config.late_data_policy, + }, ); let handle = tokio::spawn(async move { worker.run().await; diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index c10c4d6..f8459d7 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ 
-37,6 +37,15 @@ struct SeriesState { aggregations: Vec, } +/// Runtime configuration for a Worker, grouping non-structural parameters. +pub struct WorkerRuntimeConfig { + pub max_buffer_per_series: usize, + pub allowed_lateness_ms: i64, + pub pass_raw_samples: bool, + pub raw_mode_aggregation_id: u64, + pub late_data_policy: LateDataPolicy, +} + /// Worker that processes samples for a shard of the series space. pub struct Worker { id: usize, @@ -59,7 +68,6 @@ pub struct Worker { } impl Worker { - #[allow(clippy::too_many_arguments)] pub fn new( id: usize, receiver: mpsc::Receiver, @@ -67,6 +75,13 @@ impl Worker { agg_configs: HashMap>, runtime_config: WorkerRuntimeConfig, ) -> Self { + let WorkerRuntimeConfig { + max_buffer_per_series, + allowed_lateness_ms, + pass_raw_samples, + raw_mode_aggregation_id, + late_data_policy, + } = runtime_config; Self { id, receiver, @@ -654,11 +669,13 @@ mod tests { rx, sink, agg_configs, - 10_000, // max_buffer_per_series - 0, // allowed_lateness_ms - pass_raw, - raw_agg_id, - late_policy, + WorkerRuntimeConfig { + max_buffer_per_series: 10_000, + allowed_lateness_ms: 0, + pass_raw_samples: pass_raw, + raw_mode_aggregation_id: raw_agg_id, + late_data_policy: late_policy, + }, ) } diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index d88ca8d..7629e26 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -239,8 +239,8 @@ async fn e2e_kll_output_matches_arroyo() { "KLL k mismatch" ); assert_eq!( - handcrafted_acc.inner.sketch.get_n(), - arroyo_acc.inner.sketch.get_n(), + handcrafted_acc.inner.count(), + arroyo_acc.inner.count(), "KLL sample count mismatch" ); for q in [0.0f64, 0.25, 0.5, 0.75, 1.0] {