From 49a02560f522342142d0ec82b6fb14112595ca67 Mon Sep 17 00:00:00 2001 From: zz_y Date: Sat, 21 Feb 2026 15:23:09 -0700 Subject: [PATCH 01/12] Add standalone precompute engine to replace Arroyo streaming pipeline Implements a single-node multi-threaded precompute engine as a new module and binary target within QueryEngineRust. The engine ingests Prometheus remote write samples, buffers them per-series with out-of-order handling, detects closed tumbling/sliding windows via event-time watermarks, feeds samples into accumulator wrappers for all existing sketch types, and emits PrecomputedOutput directly to the store. New modules: config, series_buffer, window_manager, accumulator_factory, series_router, worker, output_sink, and the PrecomputeEngine orchestrator. The binary supports embedded store + query HTTP server for single-process deployment. Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 + .../src/bin/precompute_engine.rs | 138 ++++ asap-query-engine/src/lib.rs | 1 + .../precompute_engine/accumulator_factory.rs | 661 ++++++++++++++++++ .../src/precompute_engine/config.rs | 48 ++ .../src/precompute_engine/mod.rs | 177 +++++ .../src/precompute_engine/output_sink.rs | 83 +++ .../src/precompute_engine/series_buffer.rs | 161 +++++ .../src/precompute_engine/series_router.rs | 103 +++ .../src/precompute_engine/window_manager.rs | 154 ++++ .../src/precompute_engine/worker.rs | 472 +++++++++++++ 11 files changed, 2002 insertions(+) create mode 100644 asap-query-engine/src/bin/precompute_engine.rs create mode 100644 asap-query-engine/src/precompute_engine/accumulator_factory.rs create mode 100644 asap-query-engine/src/precompute_engine/config.rs create mode 100644 asap-query-engine/src/precompute_engine/mod.rs create mode 100644 asap-query-engine/src/precompute_engine/output_sink.rs create mode 100644 asap-query-engine/src/precompute_engine/series_buffer.rs create mode 100644 asap-query-engine/src/precompute_engine/series_router.rs create mode 100644 asap-query-engine/src/precompute_engine/window_manager.rs create mode 100644 asap-query-engine/src/precompute_engine/worker.rs diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 6c031ae..ab5c437 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -59,6 +59,10 @@ tracing-appender = "0.2" elastic_dsl_utilities.workspace = true sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = "440427438fdaf3ac2298b53ee148f9e12a64ffcc" } +[[bin]] +name = "precompute_engine" +path = "src/bin/precompute_engine.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs new file mode 100644 index 0000000..70b90f3 --- /dev/null +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -0,0 +1,138 @@ +use clap::Parser; +use query_engine_rust::data_model::{ + CleanupPolicy, InferenceConfig, LockStrategy, StreamingConfig, +}; +use query_engine_rust::drivers::query::adapters::AdapterConfig; +use query_engine_rust::engines::SimpleEngine; +use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig; +use query_engine_rust::precompute_engine::output_sink::StoreOutputSink; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::stores::SimpleMapStore; +use query_engine_rust::{HttpServer, HttpServerConfig}; +use sketch_db_common::enums::QueryLanguage; +use std::sync::Arc; +use tracing::info; + +#[derive(Parser, Debug)] 
+#[command(name = "precompute_engine")]
+#[command(about = "Standalone precompute engine for SketchDB")]
+struct Args {
+    /// Path to streaming config YAML file
+    #[arg(long)]
+    streaming_config: String,
+
+    /// Port for Prometheus remote write ingest
+    #[arg(long, default_value_t = 9090)]
+    ingest_port: u16,
+
+    /// Number of worker threads
+    #[arg(long, default_value_t = 4)]
+    num_workers: usize,
+
+    /// Maximum allowed lateness for out-of-order samples (ms)
+    #[arg(long, default_value_t = 5000)]
+    allowed_lateness_ms: i64,
+
+    /// Maximum buffered samples per series
+    #[arg(long, default_value_t = 10000)]
+    max_buffer_per_series: usize,
+
+    /// Flush interval for idle window detection (ms)
+    #[arg(long, default_value_t = 1000)]
+    flush_interval_ms: u64,
+
+    /// MPSC channel buffer size per worker
+    #[arg(long, default_value_t = 10000)]
+    channel_buffer_size: usize,
+
+    /// Port for the query HTTP server (0 to disable)
+    #[arg(long, default_value_t = 8080)]
+    query_port: u16,
+
+    /// Lock strategy for the store
+    #[arg(long, value_enum, default_value_t = LockStrategy::PerKey)]
+    lock_strategy: LockStrategy,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Initialize tracing
+    tracing_subscriber::fmt()
+        .with_env_filter(
+            tracing_subscriber::EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
+        )
+        .init();
+
+    let args = Args::parse();
+
+    info!("Loading streaming config from: {}", args.streaming_config);
+    let streaming_config = Arc::new(StreamingConfig::from_yaml_file(&args.streaming_config)?);
+
+    info!(
+        "Loaded {} aggregation configs",
+        streaming_config.get_all_aggregation_configs().len()
+    );
+
+    // Create the store
+    let store: Arc<SimpleMapStore> = Arc::new(
+        SimpleMapStore::new_with_strategy(
+            streaming_config.clone(),
+            CleanupPolicy::CircularBuffer,
+            args.lock_strategy,
+        ),
+    );
+
+    // Optionally start the query HTTP server
+    if args.query_port > 0 {
+        let inference_config = InferenceConfig::new(
+            QueryLanguage::promql,
+            CleanupPolicy::CircularBuffer,
+        );
+        let query_engine = Arc::new(SimpleEngine::new(
+            store.clone(),
+            inference_config,
+            streaming_config.clone(),
+            15, // default prometheus scrape interval
+            QueryLanguage::promql,
+        ));
+        let http_config = HttpServerConfig {
+            port: args.query_port,
+            handle_http_requests: true,
+            adapter_config: AdapterConfig {
+                protocol: query_engine_rust::data_model::QueryProtocol::PrometheusHttp,
+                language: QueryLanguage::promql,
+                fallback: None,
+            },
+        };
+        let http_server = HttpServer::new(http_config, query_engine, store.clone());
+        tokio::spawn(async move {
+            if let Err(e) = http_server.run().await {
+                tracing::error!("Query server error: {}", e);
+            }
+        });
+        info!("Query server started on port {}", args.query_port);
+    }
+
+    // Build the precompute engine config
+    let engine_config = PrecomputeEngineConfig {
+        num_workers: args.num_workers,
+        ingest_port: args.ingest_port,
+        allowed_lateness_ms: args.allowed_lateness_ms,
+        max_buffer_per_series: args.max_buffer_per_series,
+        flush_interval_ms: args.flush_interval_ms,
+        channel_buffer_size: args.channel_buffer_size,
+    };
+
+    // Create the output sink (writes directly to the store)
+    let output_sink: Arc<StoreOutputSink> =
+        Arc::new(StoreOutputSink::new(store));
+
+    // Build and run the engine
+    let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink);
+
+    info!("Starting precompute engine...");
+    engine.run().await?;
+
+    Ok(())
+}
diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs
index e70ad09..f5363f6 100644
--- a/asap-query-engine/src/lib.rs
+++ b/asap-query-engine/src/lib.rs
@@ -14,6 +14,7 @@ fn init_sketch_backend_for_tests() {
 pub mod data_model;
 pub mod drivers;
 pub mod engines;
+pub mod precompute_engine;
 pub mod precompute_operators;
 pub mod stores;
diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs
new file mode 100644
index 0000000..cd44e37
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs
@@ -0,0 +1,661 @@
+use crate::data_model::{AggregateCore, KeyByLabelValues, Measurement};
+use crate::precompute_operators::{
+    CountMinSketchAccumulator, DatasketchesKLLAccumulator, HydraKllSketchAccumulator,
+    IncreaseAccumulator, MinMaxAccumulator, MultipleIncreaseAccumulator,
+    MultipleMinMaxAccumulator, MultipleSumAccumulator, SumAccumulator,
+};
+use sketch_db_common::aggregation_config::AggregationConfig;
+
+/// Trait for feeding samples into accumulators in the precompute engine.
+///
+/// This provides a uniform interface over all accumulator types so that the
+/// worker loop doesn't need to know which concrete type it's dealing with.
+pub trait AccumulatorUpdater: Send {
+    /// Feed a single (value, timestamp_ms) pair — for SingleSubpopulation types.
+    fn update_single(&mut self, value: f64, timestamp_ms: i64);
+
+    /// Feed a keyed (key, value, timestamp_ms) triple — for MultipleSubpopulation types.
+    fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, timestamp_ms: i64);
+
+    /// Extract the final accumulator as a boxed `AggregateCore`.
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore>;
+
+    /// Reset internal state for reuse (avoids re-allocation).
+    fn reset(&mut self);
+
+    /// Whether this updater is keyed (MultipleSubpopulation).
+    fn is_keyed(&self) -> bool;
+
+    /// Estimated memory usage in bytes.
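+    /// Note: this is a rough, best-effort figure intended for observability;
+    /// the sketch-backed implementations below approximate their heap usage
+    /// with fixed constants rather than measuring the sketch exactly.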
+    fn memory_usage_bytes(&self) -> usize;
+}
+
+// ---------------------------------------------------------------------------
+// SumAccumulatorUpdater
+// ---------------------------------------------------------------------------
+
+pub struct SumAccumulatorUpdater {
+    acc: SumAccumulator,
+}
+
+impl SumAccumulatorUpdater {
+    pub fn new() -> Self {
+        Self {
+            acc: SumAccumulator::new(),
+        }
+    }
+}
+
+impl AccumulatorUpdater for SumAccumulatorUpdater {
+    fn update_single(&mut self, value: f64, _timestamp_ms: i64) {
+        self.acc.update(value);
+    }
+
+    fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
+        self.update_single(value, timestamp_ms);
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = SumAccumulator::new();
+    }
+
+    fn is_keyed(&self) -> bool {
+        false
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        std::mem::size_of::<Self>()
+    }
+}
+
+// ---------------------------------------------------------------------------
+// MinMaxAccumulatorUpdater
+// ---------------------------------------------------------------------------
+
+pub struct MinMaxAccumulatorUpdater {
+    acc: MinMaxAccumulator,
+    sub_type: String,
+}
+
+impl MinMaxAccumulatorUpdater {
+    pub fn new(sub_type: String) -> Self {
+        Self {
+            acc: MinMaxAccumulator::new(sub_type.clone()),
+            sub_type,
+        }
+    }
+}
+
+impl AccumulatorUpdater for MinMaxAccumulatorUpdater {
+    fn update_single(&mut self, value: f64, _timestamp_ms: i64) {
+        self.acc.update(value);
+    }
+
+    fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
+        self.update_single(value, timestamp_ms);
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = MinMaxAccumulator::new(self.sub_type.clone());
+    }
+
+    fn is_keyed(&self) -> bool {
+        false
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        std::mem::size_of::<Self>()
+    }
+}
+
+// ---------------------------------------------------------------------------
+// IncreaseAccumulatorUpdater
+// ---------------------------------------------------------------------------
+
+pub struct IncreaseAccumulatorUpdater {
+    acc: Option<IncreaseAccumulator>,
+}
+
+impl IncreaseAccumulatorUpdater {
+    pub fn new() -> Self {
+        Self { acc: None }
+    }
+}
+
+impl AccumulatorUpdater for IncreaseAccumulatorUpdater {
+    fn update_single(&mut self, value: f64, timestamp_ms: i64) {
+        let measurement = Measurement::new(value);
+        match &mut self.acc {
+            Some(acc) => acc.update(measurement, timestamp_ms),
+            None => {
+                self.acc = Some(IncreaseAccumulator::new(
+                    measurement.clone(),
+                    timestamp_ms,
+                    measurement,
+                    timestamp_ms,
+                ));
+            }
+        }
+    }
+
+    fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
+        self.update_single(value, timestamp_ms);
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let acc = self.acc.take().unwrap_or_else(|| {
+            IncreaseAccumulator::new(
+                Measurement::new(0.0),
+                0,
+                Measurement::new(0.0),
+                0,
+            )
+        });
+        let result = Box::new(acc);
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = None;
+    }
+
+    fn is_keyed(&self) -> bool {
+        false
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        std::mem::size_of::<Option<IncreaseAccumulator>>()
+    }
+}
+
+// ---------------------------------------------------------------------------
+// KllAccumulatorUpdater
+// ---------------------------------------------------------------------------
+
+pub struct KllAccumulatorUpdater {
+    acc: DatasketchesKLLAccumulator,
+}
+
+impl KllAccumulatorUpdater {
+    pub fn new(k: u16) -> Self {
+        Self {
+            acc: DatasketchesKLLAccumulator::new(k),
+        }
+    }
+}
+
+impl AccumulatorUpdater for KllAccumulatorUpdater {
+    fn update_single(&mut self, value: f64, _timestamp_ms: i64) {
+        self.acc._update(value);
+    }
+
+    fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
+        self.update_single(value, timestamp_ms);
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        // Re-create with same k
+        self.acc = DatasketchesKLLAccumulator::new(200); // TODO: preserve k from config
+    }
+
+    fn is_keyed(&self) -> bool {
+        false
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        // KLL sketch size is hard to estimate precisely; use a rough estimate
+        std::mem::size_of::<Self>() + 4096
+    }
+}
+
+// ---------------------------------------------------------------------------
+// MultipleSumUpdater
+// ---------------------------------------------------------------------------
+
+pub struct MultipleSumUpdater {
+    acc: MultipleSumAccumulator,
+}
+
+impl MultipleSumUpdater {
+    pub fn new() -> Self {
+        Self {
+            acc: MultipleSumAccumulator::new(),
+        }
+    }
+}
+
+impl AccumulatorUpdater for MultipleSumUpdater {
+    fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
+        // Multiple-subpopulation — use update_keyed instead
+    }
+
+    fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
+        self.acc.update(key.clone(), value);
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = MultipleSumAccumulator::new();
+    }
+
+    fn is_keyed(&self) -> bool {
+        true
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.acc.sums.len() * (std::mem::size_of::<KeyByLabelValues>() + 8)
+    }
+}
+
+// ---------------------------------------------------------------------------
+// MultipleMinMaxUpdater
+// ---------------------------------------------------------------------------
+
+pub struct MultipleMinMaxUpdater {
+    acc: MultipleMinMaxAccumulator,
+    sub_type: String,
+}
+
+impl MultipleMinMaxUpdater {
+    pub fn new(sub_type: String) -> Self {
+        Self {
+            acc: MultipleMinMaxAccumulator::new(sub_type.clone()),
+            sub_type,
+        }
+    }
+}
+
+impl AccumulatorUpdater for MultipleMinMaxUpdater {
+    fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
+        // Multiple-subpopulation — use update_keyed instead
+    }
+
+    fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
+        self.acc.update(key.clone(), value);
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = MultipleMinMaxAccumulator::new(self.sub_type.clone());
+    }
+
+    fn is_keyed(&self) -> bool {
+        true
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.acc.values.len() * (std::mem::size_of::<KeyByLabelValues>() + 8)
+    }
+}
+
+// ---------------------------------------------------------------------------
+// MultipleIncreaseUpdater
+// ---------------------------------------------------------------------------
+
+pub struct MultipleIncreaseUpdater {
+    acc: MultipleIncreaseAccumulator,
+}
+
+impl MultipleIncreaseUpdater {
+    pub fn new() -> Self {
+        Self {
+            acc: MultipleIncreaseAccumulator::new(),
+        }
+    }
+}
+
+impl AccumulatorUpdater for MultipleIncreaseUpdater {
+    fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
+        // Multiple-subpopulation — use update_keyed instead
+    }
+
+    fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
+        let measurement = Measurement::new(value);
+        // If key already exists, update it; otherwise create new
+        if self.acc.increases.contains_key(key) {
+            if let Some(existing) = self.acc.increases.get_mut(key) {
+                existing.update(measurement, timestamp_ms);
+            }
+        } else {
+            let new_acc = IncreaseAccumulator::new(
+                measurement.clone(),
+                timestamp_ms,
+                measurement,
+                timestamp_ms,
+            );
+            self.acc.update(key.clone(), new_acc);
+        }
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = MultipleIncreaseAccumulator::new();
+    }
+
+    fn is_keyed(&self) -> bool {
+        true
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.acc.increases.len()
+                * (std::mem::size_of::<KeyByLabelValues>()
+                    + std::mem::size_of::<IncreaseAccumulator>())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// CmsAccumulatorUpdater (CountMinSketch)
+// ---------------------------------------------------------------------------
+
+pub struct CmsAccumulatorUpdater {
+    acc: CountMinSketchAccumulator,
+    row_num: usize,
+    col_num: usize,
+}
+
+impl CmsAccumulatorUpdater {
+    pub fn new(row_num: usize, col_num: usize) -> Self {
+        Self {
+            acc: CountMinSketchAccumulator::new(row_num, col_num),
+            row_num,
+            col_num,
+        }
+    }
+}
+
+impl AccumulatorUpdater for CmsAccumulatorUpdater {
+    fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
+        // CMS is keyed — use update_keyed
+    }
+
+    fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
+        // CMS._update is private, so we access the sketch directly
+        // We replicate the hash logic from CountMinSketchAccumulator._update
+        let key_json = key.serialize_to_json();
+        let key_values: Vec<String> = if let Some(obj) = key_json.as_object() {
+            obj.values()
+                .map(|v| v.as_str().unwrap_or("").to_string())
+                .collect()
+        } else {
+            vec!["".to_string()]
+        };
+        let key_str = key_values.join(";");
+        let key_bytes = key_str.as_bytes();
+
+        for i in 0..self.acc.row_num {
+            let hash_value = xxhash_rust::xxh32::xxh32(key_bytes, i as u32);
+            let col_index = (hash_value as usize) % self.acc.col_num;
+            self.acc.sketch[i][col_index] += value;
+        }
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = CountMinSketchAccumulator::new(self.row_num, self.col_num);
+    }
+
+    fn is_keyed(&self) -> bool {
+        true
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_num * self.col_num * std::mem::size_of::<f64>()
+    }
+}
+
+// ---------------------------------------------------------------------------
+// HydraKllAccumulatorUpdater
+// ---------------------------------------------------------------------------
+
+pub struct HydraKllAccumulatorUpdater {
+    acc: HydraKllSketchAccumulator,
+    row_num: usize,
+    col_num: usize,
+    k: u16,
+}
+
+impl HydraKllAccumulatorUpdater {
+    pub fn new(row_num: usize, col_num: usize, k: u16) -> Self {
+        Self {
+            acc: HydraKllSketchAccumulator::new(row_num, col_num, k),
+            row_num,
+            col_num,
+            k,
+        }
+    }
+}
+
+impl AccumulatorUpdater for HydraKllAccumulatorUpdater {
+    fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
+        // HydraKLL is keyed — use update_keyed
+    }
+
+    fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
+        self.acc.update(key, value);
+    }
+
+    fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
+        let result = Box::new(self.acc.clone());
+        self.reset();
+        result
+    }
+
+    fn reset(&mut self) {
+        self.acc = HydraKllSketchAccumulator::new(self.row_num, self.col_num, self.k);
+    }
+
+    fn is_keyed(&self) -> bool {
+        true
+    }
+
+    fn memory_usage_bytes(&self) -> usize {
+        // Rough estimate: each cell is a KLL sketch
+        std::mem::size_of::<Self>()
+            + self.row_num * self.col_num * 4096
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Factory function
+// ---------------------------------------------------------------------------
+
+/// Create an appropriate `AccumulatorUpdater` from an `AggregationConfig`.
+pub fn create_accumulator_updater(
+    config: &AggregationConfig,
+) -> Box<dyn AccumulatorUpdater> {
+    let agg_type = config.aggregation_type.as_str();
+    let sub_type = config.aggregation_sub_type.as_str();
+
+    match agg_type {
+        "SingleSubpopulation" => match sub_type {
+            "Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()),
+            "Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new("min".to_string())),
+            "Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new("max".to_string())),
+            "Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()),
+            "DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => {
+                let k = config
+                    .parameters
+                    .get("k")
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(200) as u16;
+                Box::new(KllAccumulatorUpdater::new(k))
+            }
+            other => {
+                tracing::warn!(
+                    "Unknown SingleSubpopulation sub_type '{}', defaulting to Sum",
+                    other
+                );
+                Box::new(SumAccumulatorUpdater::new())
+            }
+        },
+        "MultipleSubpopulation" => match sub_type {
+            "Sum" | "sum" => Box::new(MultipleSumUpdater::new()),
+            "Min" | "min" => Box::new(MultipleMinMaxUpdater::new("min".to_string())),
+            "Max" | "max" => Box::new(MultipleMinMaxUpdater::new("max".to_string())),
+            "Increase" | "increase" => Box::new(MultipleIncreaseUpdater::new()),
+            "CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => {
+                let row_num = config
+                    .parameters
+                    .get("row_num")
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(4) as usize;
+                let col_num = config
+                    .parameters
+                    .get("col_num")
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(1000) as usize;
+                Box::new(CmsAccumulatorUpdater::new(row_num, col_num))
+            }
+            "HydraKLL" | "hydra_kll" => {
+                let row_num = config
+                    .parameters
+                    .get("row_num")
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(4) as usize;
+                let col_num = config
+                    .parameters
+                    .get("col_num")
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(1000) as usize;
+                let k = config
+                    .parameters
+                    .get("k")
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(200) as u16;
+                Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k))
+            }
+            other => {
+                tracing::warn!(
+                    "Unknown MultipleSubpopulation sub_type '{}', defaulting to Sum",
+                    other
+                );
+                Box::new(MultipleSumUpdater::new())
+            }
+        },
+        other => {
+            tracing::warn!(
+                "Unknown aggregation_type '{}', defaulting to SingleSubpopulation Sum",
+                other
+            );
+            Box::new(SumAccumulatorUpdater::new())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_sum_updater() {
+        let mut updater = SumAccumulatorUpdater::new();
+        assert!(!updater.is_keyed());
+
+        updater.update_single(1.0, 1000);
+        updater.update_single(2.0, 2000);
+        updater.update_single(3.0, 3000);
+
+        let acc = updater.take_accumulator();
+        assert_eq!(acc.type_name(), "SumAccumulator");
+    }
+
+    #[test]
+    fn test_minmax_updater() {
+        let mut updater = MinMaxAccumulatorUpdater::new("max".to_string());
+        updater.update_single(5.0, 1000);
+        updater.update_single(3.0, 2000);
+        updater.update_single(7.0, 3000);
+
+        let acc = updater.take_accumulator();
+        assert_eq!(acc.type_name(), "MinMaxAccumulator");
+    }
+
+    #[test]
+    fn test_increase_updater() {
+        let mut updater = IncreaseAccumulatorUpdater::new();
+        updater.update_single(10.0, 1000);
+        updater.update_single(15.0, 2000);
+
+        let acc = updater.take_accumulator();
+        assert_eq!(acc.type_name(), "IncreaseAccumulator");
+    }
+
+    #[test]
+    fn test_kll_updater() {
+        let mut updater = KllAccumulatorUpdater::new(200);
+        for i in 1..=10 {
+            updater.update_single(i as f64, i * 1000);
+        }
+
+        let acc = updater.take_accumulator();
+        assert_eq!(acc.type_name(), "DatasketchesKLLAccumulator");
+    }
+
+    #[test]
+    fn test_multiple_sum_updater() {
+        let mut updater = MultipleSumUpdater::new();
+        assert!(updater.is_keyed());
+
+        let key_a = KeyByLabelValues::new_with_labels(vec!["a".to_string()]);
+        let key_b = KeyByLabelValues::new_with_labels(vec!["b".to_string()]);
+
+        updater.update_keyed(&key_a, 1.0, 1000);
+        updater.update_keyed(&key_b, 2.0, 2000);
+
+        let acc = updater.take_accumulator();
+        assert_eq!(acc.type_name(), "MultipleSumAccumulator");
+    }
+
+    #[test]
+    fn test_reset_clears_state() {
+        let mut updater = SumAccumulatorUpdater::new();
+        updater.update_single(100.0, 1000);
+        updater.reset();
+        // After reset, should produce a fresh accumulator
+        let acc = updater.take_accumulator();
+        assert_eq!(acc.type_name(), "SumAccumulator");
+    }
+}
diff --git a/asap-query-engine/src/precompute_engine/config.rs b/asap-query-engine/src/precompute_engine/config.rs
new file mode 100644
index 0000000..ba02a71
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/config.rs
@@ -0,0 +1,48 @@
+use serde::{Deserialize, Serialize};
+
+/// Configuration for the precompute engine.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PrecomputeEngineConfig {
+    /// Number of worker threads for parallel processing.
+    pub num_workers: usize,
+    /// Port for the Prometheus remote write ingest endpoint.
+    pub ingest_port: u16,
+    /// Maximum allowed lateness for out-of-order samples (milliseconds).
+    /// Samples arriving later than this behind the watermark are dropped.
+    pub allowed_lateness_ms: i64,
+    /// Maximum number of buffered samples per series before oldest are evicted.
+    pub max_buffer_per_series: usize,
+    /// Interval at which the flush timer fires to close idle windows (milliseconds).
+    pub flush_interval_ms: u64,
+    /// Capacity of the MPSC channel between router and each worker.
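+    /// Sends into a full channel await, so a slow worker exerts back-pressure
+    /// on the ingest handler instead of letting memory grow unbounded.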
+    pub channel_buffer_size: usize,
+}
+
+impl Default for PrecomputeEngineConfig {
+    fn default() -> Self {
+        Self {
+            num_workers: 4,
+            ingest_port: 9090,
+            allowed_lateness_ms: 5_000,
+            max_buffer_per_series: 10_000,
+            flush_interval_ms: 1_000,
+            channel_buffer_size: 10_000,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_default_config() {
+        let config = PrecomputeEngineConfig::default();
+        assert_eq!(config.num_workers, 4);
+        assert_eq!(config.ingest_port, 9090);
+        assert_eq!(config.allowed_lateness_ms, 5_000);
+        assert_eq!(config.max_buffer_per_series, 10_000);
+        assert_eq!(config.flush_interval_ms, 1_000);
+        assert_eq!(config.channel_buffer_size, 10_000);
+    }
+}
diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs
new file mode 100644
index 0000000..71536e2
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/mod.rs
@@ -0,0 +1,177 @@
+pub mod accumulator_factory;
+pub mod config;
+pub mod output_sink;
+pub mod series_buffer;
+pub mod series_router;
+pub mod window_manager;
+pub mod worker;
+
+use crate::data_model::StreamingConfig;
+use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write;
+use crate::precompute_engine::config::PrecomputeEngineConfig;
+use crate::precompute_engine::output_sink::OutputSink;
+use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage};
+use crate::precompute_engine::worker::Worker;
+use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router};
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::net::TcpListener;
+use tokio::sync::mpsc;
+use tracing::{info, warn};
+
+/// Shared state for the ingest HTTP handler.
+struct IngestState {
+    router: SeriesRouter,
+    samples_ingested: std::sync::atomic::AtomicU64,
+}
+
+/// The top-level precompute engine orchestrator.
+///
+/// Creates worker threads, the series router, and the Axum ingest server.
+pub struct PrecomputeEngine {
+    config: PrecomputeEngineConfig,
+    streaming_config: Arc<StreamingConfig>,
+    output_sink: Arc<dyn OutputSink>,
+}
+
+impl PrecomputeEngine {
+    pub fn new(
+        config: PrecomputeEngineConfig,
+        streaming_config: Arc<StreamingConfig>,
+        output_sink: Arc<dyn OutputSink>,
+    ) -> Self {
+        Self {
+            config,
+            streaming_config,
+            output_sink,
+        }
+    }
+
+    /// Start the precompute engine. This spawns worker tasks and the HTTP
+    /// ingest server, then blocks until shutdown.
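+    ///
+    /// Startup order: per-worker channels, then the worker tasks, then the
+    /// periodic flush timer, and finally the Axum ingest server, whose
+    /// `serve` future keeps this method running.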
+    pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
+        let num_workers = self.config.num_workers;
+        let channel_size = self.config.channel_buffer_size;
+
+        // Build MPSC channels for each worker
+        let mut senders = Vec::with_capacity(num_workers);
+        let mut receivers = Vec::with_capacity(num_workers);
+        for _ in 0..num_workers {
+            let (tx, rx) = mpsc::channel::<WorkerMessage>(channel_size);
+            senders.push(tx);
+            receivers.push(rx);
+        }
+
+        // Build the router
+        let router = SeriesRouter::new(senders);
+
+        // Build aggregation config map from streaming config
+        let agg_configs: HashMap<u64, AggregationConfig> = self
+            .streaming_config
+            .get_all_aggregation_configs()
+            .clone();
+
+        // Spawn workers
+        let mut worker_handles = Vec::with_capacity(num_workers);
+        for (id, rx) in receivers.into_iter().enumerate() {
+            let worker = Worker::new(
+                id,
+                rx,
+                self.output_sink.clone(),
+                agg_configs.clone(),
+                self.config.max_buffer_per_series,
+                self.config.allowed_lateness_ms,
+            );
+            let handle = tokio::spawn(async move {
+                worker.run().await;
+            });
+            worker_handles.push(handle);
+        }
+
+        info!(
+            "PrecomputeEngine started with {} workers on port {}",
+            num_workers, self.config.ingest_port
+        );
+
+        // Build the ingest state
+        let ingest_state = Arc::new(IngestState {
+            router,
+            samples_ingested: std::sync::atomic::AtomicU64::new(0),
+        });
+
+        // Start flush timer
+        let flush_state = ingest_state.clone();
+        let flush_interval_ms = self.config.flush_interval_ms;
+        tokio::spawn(async move {
+            let mut interval =
+                tokio::time::interval(tokio::time::Duration::from_millis(flush_interval_ms));
+            loop {
+                interval.tick().await;
+                if let Err(e) = flush_state.router.broadcast_flush().await {
+                    warn!("Flush broadcast error: {}", e);
+                    break;
+                }
+            }
+        });
+
+        // Start the Axum HTTP server for Prometheus remote write ingest
+        let app = Router::new()
+            .route("/api/v1/write", post(handle_ingest))
+            .with_state(ingest_state);
+
+        let addr = format!("0.0.0.0:{}", self.config.ingest_port);
+        info!("Ingest server listening on {}", addr);
+
+        let listener = TcpListener::bind(&addr).await?;
+        axum::serve(listener, app).await?;
+
+        // Wait for workers to finish (this only happens on shutdown)
+        for handle in worker_handles {
+            let _ = handle.await;
+        }
+
+        Ok(())
+    }
+}
+
+/// Axum handler for Prometheus remote write.
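+///
+/// Decodes the protobuf payload, groups samples by series key so that each
+/// series is routed exactly once per request, and forwards the batches to the
+/// hash-based series router.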
+async fn handle_ingest(
+    State(state): State<Arc<IngestState>>,
+    body: Bytes,
+) -> StatusCode {
+    let samples = match decode_prometheus_remote_write(&body) {
+        Ok(s) => s,
+        Err(e) => {
+            warn!("Failed to decode remote write: {}", e);
+            return StatusCode::BAD_REQUEST;
+        }
+    };
+
+    if samples.is_empty() {
+        return StatusCode::NO_CONTENT;
+    }
+
+    let count = samples.len() as u64;
+    state
+        .samples_ingested
+        .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
+
+    // Group samples by series key for batch routing
+    let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new();
+    for s in &samples {
+        by_series
+            .entry(&s.labels)
+            .or_default()
+            .push((s.timestamp_ms, s.value));
+    }
+
+    // Route each series batch to the correct worker
+    for (series_key, batch) in by_series {
+        if let Err(e) = state.router.route(series_key, batch).await {
+            warn!("Routing error for {}: {}", series_key, e);
+            return StatusCode::INTERNAL_SERVER_ERROR;
+        }
+    }
+
+    StatusCode::NO_CONTENT
+}
diff --git a/asap-query-engine/src/precompute_engine/output_sink.rs b/asap-query-engine/src/precompute_engine/output_sink.rs
new file mode 100644
index 0000000..445cb49
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/output_sink.rs
@@ -0,0 +1,83 @@
+use crate::data_model::{AggregateCore, PrecomputedOutput};
+use crate::stores::Store;
+use std::sync::Arc;
+
+/// Trait for emitting completed window outputs.
+pub trait OutputSink: Send + Sync {
+    fn emit_batch(
+        &self,
+        outputs: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)>,
+    ) -> Result<(), Box<dyn std::error::Error>>;
+}
+
+/// Output sink that writes directly to a `Store`.
+pub struct StoreOutputSink {
+    store: Arc<dyn Store>,
+}
+
+impl StoreOutputSink {
+    pub fn new(store: Arc<dyn Store>) -> Self {
+        Self { store }
+    }
+}
+
+impl OutputSink for StoreOutputSink {
+    fn emit_batch(
+        &self,
+        outputs: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)>,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        if outputs.is_empty() {
+            return Ok(());
+        }
+        self.store.insert_precomputed_output_batch(outputs)
+    }
+}
+
+/// Output sink for raw passthrough mode — forwards raw samples to the store
+/// without sketch computation. In this mode the samples are stored as
+/// SumAccumulators (one per sample).
+pub struct RawPassthroughSink {
+    store: Arc<dyn Store>,
+}
+
+impl RawPassthroughSink {
+    pub fn new(store: Arc<dyn Store>) -> Self {
+        Self { store }
+    }
+}
+
+impl OutputSink for RawPassthroughSink {
+    fn emit_batch(
+        &self,
+        outputs: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)>,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        if outputs.is_empty() {
+            return Ok(());
+        }
+        self.store.insert_precomputed_output_batch(outputs)
+    }
+}
+
+/// A no-op sink for testing that just counts emitted batches.
+pub struct NoopOutputSink {
+    pub emit_count: std::sync::atomic::AtomicU64,
+}
+
+impl NoopOutputSink {
+    pub fn new() -> Self {
+        Self {
+            emit_count: std::sync::atomic::AtomicU64::new(0),
+        }
+    }
+}
+
+impl OutputSink for NoopOutputSink {
+    fn emit_batch(
+        &self,
+        outputs: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)>,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        self.emit_count
+            .fetch_add(outputs.len() as u64, std::sync::atomic::Ordering::Relaxed);
+        Ok(())
+    }
+}
diff --git a/asap-query-engine/src/precompute_engine/series_buffer.rs b/asap-query-engine/src/precompute_engine/series_buffer.rs
new file mode 100644
index 0000000..a663142
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/series_buffer.rs
@@ -0,0 +1,161 @@
+use std::collections::BTreeMap;
+
+/// Per-series sample buffer backed by a `BTreeMap` for automatic
+/// ordering by timestamp. Tracks a per-series watermark.
+pub struct SeriesBuffer {
+    /// Samples keyed by timestamp_ms. BTreeMap keeps them sorted.
+    samples: BTreeMap<i64, f64>,
+    /// High-watermark: the maximum timestamp seen so far for this series.
+    watermark_ms: i64,
+    /// Maximum number of samples to retain. When exceeded, oldest are evicted.
+    max_buffer_size: usize,
+}
+
+impl SeriesBuffer {
+    pub fn new(max_buffer_size: usize) -> Self {
+        Self {
+            samples: BTreeMap::new(),
+            watermark_ms: i64::MIN,
+            max_buffer_size,
+        }
+    }
+
+    /// Insert a sample. Updates the watermark if `timestamp_ms` is the new max.
+    /// A duplicate timestamp overwrites the previously stored value; the method
+    /// currently always returns `true`.
+    pub fn insert(&mut self, timestamp_ms: i64, value: f64) -> bool {
+        if timestamp_ms > self.watermark_ms {
+            self.watermark_ms = timestamp_ms;
+        }
+        self.samples.insert(timestamp_ms, value);
+
+        // Enforce max buffer size by evicting oldest entries
+        while self.samples.len() > self.max_buffer_size {
+            self.samples.pop_first();
+        }
+
+        true
+    }
+
+    /// Return current watermark.
+    pub fn watermark_ms(&self) -> i64 {
+        self.watermark_ms
+    }
+
+    /// Read all samples in `[start_ms, end_ms)` — inclusive start, exclusive end.
+    /// Returns them in timestamp order.
+    pub fn read_range(&self, start_ms: i64, end_ms: i64) -> Vec<(i64, f64)> {
+        self.samples
+            .range(start_ms..end_ms)
+            .map(|(&ts, &val)| (ts, val))
+            .collect()
+    }
+
+    /// Drain (remove and return) all samples with `timestamp_ms < up_to_ms`.
+    pub fn drain_up_to(&mut self, up_to_ms: i64) -> Vec<(i64, f64)> {
+        let mut drained = Vec::new();
+        // split_off returns everything >= up_to_ms; we keep that part
+        let remaining = self.samples.split_off(&up_to_ms);
+        // self.samples now contains everything < up_to_ms
+        drained.extend(self.samples.iter().map(|(&ts, &val)| (ts, val)));
+        self.samples = remaining;
+        drained
+    }
+
+    /// Number of buffered samples.
+    pub fn len(&self) -> usize {
+        self.samples.len()
+    }
+
+    /// Whether the buffer is empty.
+    pub fn is_empty(&self) -> bool {
+        self.samples.is_empty()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_insert_and_watermark() {
+        let mut buf = SeriesBuffer::new(100);
+        assert_eq!(buf.watermark_ms(), i64::MIN);
+
+        buf.insert(1000, 1.0);
+        assert_eq!(buf.watermark_ms(), 1000);
+
+        buf.insert(500, 0.5); // out-of-order
+        assert_eq!(buf.watermark_ms(), 1000); // watermark should not go back
+
+        buf.insert(2000, 2.0);
+        assert_eq!(buf.watermark_ms(), 2000);
+    }
+
+    #[test]
+    fn test_sorted_order() {
+        let mut buf = SeriesBuffer::new(100);
+        buf.insert(3000, 3.0);
+        buf.insert(1000, 1.0);
+        buf.insert(2000, 2.0);
+
+        let all = buf.read_range(0, 4000);
+        assert_eq!(all, vec![(1000, 1.0), (2000, 2.0), (3000, 3.0)]);
+    }
+
+    #[test]
+    fn test_read_range() {
+        let mut buf = SeriesBuffer::new(100);
+        for t in [1000, 2000, 3000, 4000, 5000] {
+            buf.insert(t, t as f64);
+        }
+
+        // [2000, 4000) should return 2000, 3000
+        let range = buf.read_range(2000, 4000);
+        assert_eq!(range, vec![(2000, 2000.0), (3000, 3000.0)]);
+    }
+
+    #[test]
+    fn test_drain_up_to() {
+        let mut buf = SeriesBuffer::new(100);
+        for t in [1000, 2000, 3000, 4000, 5000] {
+            buf.insert(t, t as f64);
+        }
+
+        let drained = buf.drain_up_to(3000);
+        assert_eq!(drained, vec![(1000, 1000.0), (2000, 2000.0)]);
+        assert_eq!(buf.len(), 3); // 3000, 4000, 5000 remain
+    }
+
+    #[test]
+    fn test_max_buffer_enforcement() {
+        let mut buf = SeriesBuffer::new(3);
+        buf.insert(1000, 1.0);
+        buf.insert(2000, 2.0);
+        buf.insert(3000, 3.0);
+        buf.insert(4000, 4.0); // should evict 1000
+        assert_eq!(buf.len(), 3);
+
+        let all = buf.read_range(0, 5000);
+        assert_eq!(all, vec![(2000, 2.0), (3000, 3.0), (4000, 4.0)]);
+    }
+
+    #[test]
+    fn test_dedup_by_timestamp() {
+        let mut buf = SeriesBuffer::new(100);
+        buf.insert(1000, 1.0);
+        buf.insert(1000, 2.0); // same timestamp, overwrites
+        assert_eq!(buf.len(), 1);
+
+        let all = buf.read_range(0, 2000);
+        assert_eq!(all, vec![(1000, 2.0)]);
+    }
+
+    #[test]
+    fn test_empty_operations() {
+        let buf = SeriesBuffer::new(100);
+        assert!(buf.is_empty());
+        assert_eq!(buf.len(), 0);
+        assert_eq!(buf.read_range(0, 1000), vec![]);
+    }
+}
diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs
new file mode 100644
index 0000000..fd05817
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/series_router.rs
@@ -0,0 +1,103 @@
+use tokio::sync::mpsc;
+use xxhash_rust::xxh64::xxh64;
+
+/// A message sent from the router to a worker.
+#[derive(Debug)]
+pub enum WorkerMessage {
+    /// A batch of samples for the same series.
+    Samples {
+        series_key: String,
+        samples: Vec<(i64, f64)>, // (timestamp_ms, value)
+    },
+    /// Signal the worker to flush/check idle windows.
+    Flush,
+    /// Graceful shutdown.
+    Shutdown,
+}
+
+/// Routes incoming samples to one of N workers based on a consistent hash
+/// of the series label string.
+pub struct SeriesRouter {
+    senders: Vec<mpsc::Sender<WorkerMessage>>,
+    num_workers: usize,
+}
+
+impl SeriesRouter {
+    pub fn new(senders: Vec<mpsc::Sender<WorkerMessage>>) -> Self {
+        let num_workers = senders.len();
+        Self {
+            senders,
+            num_workers,
+        }
+    }
+
+    /// Route a batch of samples for one series to the appropriate worker.
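+    ///
+    /// Routing is sticky: a given series key always hashes (xxh64, seed 0) to
+    /// the same worker, so per-series state is single-owner and needs no locking.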
+    pub async fn route(
+        &self,
+        series_key: &str,
+        samples: Vec<(i64, f64)>,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        let worker_idx = self.worker_for(series_key);
+        self.senders[worker_idx]
+            .send(WorkerMessage::Samples {
+                series_key: series_key.to_string(),
+                samples,
+            })
+            .await
+            .map_err(|e| format!("Failed to send to worker {}: {}", worker_idx, e))?;
+        Ok(())
+    }
+
+    /// Broadcast a flush signal to all workers.
+    pub async fn broadcast_flush(&self) -> Result<(), Box<dyn std::error::Error>> {
+        for (i, sender) in self.senders.iter().enumerate() {
+            sender
+                .send(WorkerMessage::Flush)
+                .await
+                .map_err(|e| format!("Failed to send flush to worker {}: {}", i, e))?;
+        }
+        Ok(())
+    }
+
+    /// Broadcast shutdown to all workers.
+    pub async fn broadcast_shutdown(&self) -> Result<(), Box<dyn std::error::Error>> {
+        for (i, sender) in self.senders.iter().enumerate() {
+            sender
+                .send(WorkerMessage::Shutdown)
+                .await
+                .map_err(|e| format!("Failed to send shutdown to worker {}: {}", i, e))?;
+        }
+        Ok(())
+    }
+
+    /// Determine which worker handles a given series key.
+    fn worker_for(&self, series_key: &str) -> usize {
+        let hash = xxh64(series_key.as_bytes(), 0);
+        (hash as usize) % self.num_workers
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_consistent_routing() {
+        // Build a router with dummy senders (we only test the hash logic)
+        let (senders, _receivers): (Vec<_>, Vec<_>) = (0..4)
+            .map(|_| mpsc::channel::<WorkerMessage>(10))
+            .unzip();
+
+        let router = SeriesRouter::new(senders);
+
+        // Same key should always go to the same worker
+        let w1 = router.worker_for("cpu{host=\"a\"}");
+        let w2 = router.worker_for("cpu{host=\"a\"}");
+        assert_eq!(w1, w2);
+
+        // Different keys may go to different workers (probabilistic, but verifiable)
+        let _ = router.worker_for("cpu{host=\"b\"}");
+        // Just ensure no panic and result is in range
+        assert!(router.worker_for("mem{host=\"a\"}") < 4);
+    }
+}
diff --git a/asap-query-engine/src/precompute_engine/window_manager.rs b/asap-query-engine/src/precompute_engine/window_manager.rs
new file mode 100644
index 0000000..929f928
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/window_manager.rs
@@ -0,0 +1,154 @@
+/// Manages tumbling window boundaries and detects which windows have closed
+/// based on watermark advancement.
+pub struct WindowManager {
+    /// Window size in milliseconds.
+    window_size_ms: i64,
+    /// Slide interval in milliseconds (== window_size_ms for tumbling windows).
+    slide_interval_ms: i64,
+}
+
+impl WindowManager {
+    /// Create a new WindowManager.
+    ///
+    /// `window_size_secs` and `slide_interval_secs` come from `AggregationConfig`
+    /// (which stores them in seconds). They are converted to milliseconds internally.
+    pub fn new(window_size_secs: u64, slide_interval_secs: u64) -> Self {
+        let window_size_ms = (window_size_secs * 1000) as i64;
+        let slide_interval_ms = if slide_interval_secs == 0 {
+            window_size_ms // tumbling window
+        } else {
+            (slide_interval_secs * 1000) as i64
+        };
+        Self {
+            window_size_ms,
+            slide_interval_ms,
+        }
+    }
+
+    pub fn window_size_ms(&self) -> i64 {
+        self.window_size_ms
+    }
+
+    /// Compute the window start for a given timestamp.
+    /// Windows are aligned to epoch (multiples of slide_interval_ms).
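+    /// For example, with 60 s tumbling windows (slide == size), 119_999 ms
+    /// maps to window start 60_000 ms; `div_euclid` keeps the flooring correct
+    /// for negative timestamps as well.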
+    pub fn window_start_for(&self, timestamp_ms: i64) -> i64 {
+        // Floor-divide to the nearest slide interval boundary
+        let n = timestamp_ms.div_euclid(self.slide_interval_ms);
+        n * self.slide_interval_ms
+    }
+
+    /// Return window starts whose windows are now closed, given that the
+    /// watermark advanced from `previous_wm` to `current_wm`.
+    ///
+    /// A window `[start, start + window_size_ms)` is closed when
+    /// `current_wm >= start + window_size_ms`.
+    ///
+    /// Returns window starts in ascending order.
+    pub fn closed_windows(&self, previous_wm: i64, current_wm: i64) -> Vec<i64> {
+        if current_wm <= previous_wm || previous_wm == i64::MIN {
+            // No watermark advancement, or first sample ever (nothing to close yet
+            // — the window that contains the first sample is still open).
+            return Vec::new();
+        }
+
+        let mut closed = Vec::new();
+
+        // The earliest window start that *could* have been open at previous_wm.
+        // A window is open if its end (start + window_size_ms) > previous_wm.
+        // So the oldest open window start was: previous_wm - window_size_ms + 1,
+        // aligned down to slide_interval.
+        let earliest_open_start = self.window_start_for(
+            (previous_wm - self.window_size_ms + 1).max(0),
+        );
+
+        let mut start = earliest_open_start;
+        while start + self.window_size_ms <= current_wm {
+            // This window was NOT closed at previous_wm but IS closed at current_wm
+            if start + self.window_size_ms > previous_wm {
+                closed.push(start);
+            }
+            start += self.slide_interval_ms;
+        }
+
+        closed
+    }
+
+    /// Return the window `[start, end)` boundaries for a given window start.
+    pub fn window_bounds(&self, window_start: i64) -> (i64, i64) {
+        (window_start, window_start + self.window_size_ms)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_tumbling_window_start() {
+        // 60-second (60000ms) tumbling windows
+        let wm = WindowManager::new(60, 0);
+
+        assert_eq!(wm.window_start_for(0), 0);
+        assert_eq!(wm.window_start_for(59_999), 0);
+        assert_eq!(wm.window_start_for(60_000), 60_000);
+        assert_eq!(wm.window_start_for(119_999), 60_000);
+        assert_eq!(wm.window_start_for(120_000), 120_000);
+    }
+
+    #[test]
+    fn test_no_closed_windows_on_first_sample() {
+        let wm = WindowManager::new(60, 0);
+        let closed = wm.closed_windows(i64::MIN, 30_000);
+        assert!(closed.is_empty());
+    }
+
+    #[test]
+    fn test_tumbling_window_close() {
+        // 60s tumbling windows
+        let wm = WindowManager::new(60, 0);
+
+        // Watermark advances from 30_000 to 70_000
+        // Window [0, 60_000) closes when wm >= 60_000
+        let closed = wm.closed_windows(30_000, 70_000);
+        assert_eq!(closed, vec![0]);
+    }
+
+    #[test]
+    fn test_multiple_window_closes() {
+        // 10s (10000ms) tumbling windows
+        let wm = WindowManager::new(10, 0);
+
+        // Watermark jumps from 5_000 to 35_000 — closes windows 0, 10_000, 20_000
+        let closed = wm.closed_windows(5_000, 35_000);
+        assert_eq!(closed, vec![0, 10_000, 20_000]);
+    }
+
+    #[test]
+    fn test_no_close_when_watermark_stagnant() {
+        let wm = WindowManager::new(60, 0);
+        let closed = wm.closed_windows(30_000, 30_000);
+        assert!(closed.is_empty());
+    }
+
+    #[test]
+    fn test_window_bounds() {
+        let wm = WindowManager::new(60, 0);
+        assert_eq!(wm.window_bounds(0), (0, 60_000));
+        assert_eq!(wm.window_bounds(60_000), (60_000, 120_000));
+    }
+
+    #[test]
+    fn test_sliding_window() {
+        // 30s window, 10s slide
+        let wm = WindowManager::new(30, 10);
+
+        assert_eq!(wm.window_start_for(0), 0);
+        assert_eq!(wm.window_start_for(9_999), 0);
+        assert_eq!(wm.window_start_for(10_000), 10_000);
+
+        // Watermark advances from 15_000 to 35_000
+        // Window [0, 30_000) closes at wm=30_000 (was open at 15_000)
+        let closed = wm.closed_windows(15_000, 35_000);
+        assert_eq!(closed, vec![0]);
+    }
+}
diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs
new file mode 100644
index 0000000..08951f7
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/worker.rs
@@ -0,0 +1,472 @@
+use crate::data_model::{AggregateCore, KeyByLabelValues, PrecomputedOutput};
+use crate::precompute_engine::accumulator_factory::{create_accumulator_updater, AccumulatorUpdater};
+use crate::precompute_engine::output_sink::OutputSink;
+use crate::precompute_engine::series_buffer::SeriesBuffer;
+use crate::precompute_engine::series_router::WorkerMessage;
+use crate::precompute_engine::window_manager::WindowManager;
+use sketch_db_common::aggregation_config::AggregationConfig;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tracing::{debug, info, warn};
+
+/// Per-aggregation state within a series: the window manager and active
+/// window accumulators.
+struct AggregationState {
+    config: AggregationConfig,
+    window_manager: WindowManager,
+    /// Active windows keyed by window_start_ms.
+    active_windows: HashMap<i64, Box<dyn AccumulatorUpdater>>,
+}
+
+/// Per-series state owned by the worker.
+struct SeriesState {
+    buffer: SeriesBuffer,
+    previous_watermark_ms: i64,
+    /// One AggregationState per matching aggregation config.
+    aggregations: Vec<AggregationState>,
+}
+
+/// Worker that processes samples for a shard of the series space.
+pub struct Worker {
+    id: usize,
+    receiver: mpsc::Receiver<WorkerMessage>,
+    output_sink: Arc<dyn OutputSink>,
+    /// Map from series key to per-series state.
+    series_map: HashMap<String, SeriesState>,
+    /// Aggregation configs, keyed by aggregation_id.
+    agg_configs: HashMap<u64, AggregationConfig>,
+    /// Max buffer size per series.
+    max_buffer_per_series: usize,
+    /// Allowed lateness in ms.
+    allowed_lateness_ms: i64,
+}
+
+impl Worker {
+    pub fn new(
+        id: usize,
+        receiver: mpsc::Receiver<WorkerMessage>,
+        output_sink: Arc<dyn OutputSink>,
+        agg_configs: HashMap<u64, AggregationConfig>,
+        max_buffer_per_series: usize,
+        allowed_lateness_ms: i64,
+    ) -> Self {
+        Self {
+            id,
+            receiver,
+            output_sink,
+            series_map: HashMap::new(),
+            agg_configs,
+            max_buffer_per_series,
+            allowed_lateness_ms,
+        }
+    }
+
+    /// Run the worker loop. Blocks until shutdown.
+    pub async fn run(mut self) {
+        info!("Worker {} started", self.id);
+
+        while let Some(msg) = self.receiver.recv().await {
+            match msg {
+                WorkerMessage::Samples {
+                    series_key,
+                    samples,
+                } => {
+                    if let Err(e) = self.process_samples(&series_key, samples) {
+                        warn!("Worker {} error processing {}: {}", self.id, series_key, e);
+                    }
+                }
+                WorkerMessage::Flush => {
+                    if let Err(e) = self.flush_all() {
+                        warn!("Worker {} flush error: {}", self.id, e);
+                    }
+                }
+                WorkerMessage::Shutdown => {
+                    info!("Worker {} shutting down", self.id);
+                    // Final flush before shutdown
+                    if let Err(e) = self.flush_all() {
+                        warn!("Worker {} final flush error: {}", self.id, e);
+                    }
+                    break;
+                }
+            }
+        }
+
+        info!(
+            "Worker {} stopped, {} active series",
+            self.id,
+            self.series_map.len()
+        );
+    }
+
+    /// Find all aggregation configs whose metric/spatial_filter matches this series.
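+    ///
+    /// Matching here is by exact metric name only; label-level filtering is
+    /// not applied at this stage.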
+    fn matching_agg_configs(&self, series_key: &str) -> Vec<(u64, &AggregationConfig)> {
+        let metric_name = extract_metric_name(series_key);
+
+        self.agg_configs
+            .iter()
+            .filter(|(_, config)| {
+                // Match on metric name
+                config.metric == metric_name
+                    || config.spatial_filter_normalized == metric_name
+                    || config.spatial_filter == metric_name
+            })
+            .map(|(&id, config)| (id, config))
+            .collect()
+    }
+
+    /// Get or create the SeriesState for a series key.
+    fn get_or_create_series_state(&mut self, series_key: &str) -> &mut SeriesState {
+        if !self.series_map.contains_key(series_key) {
+            let matching = self.matching_agg_configs(series_key);
+            let aggregations = matching
+                .into_iter()
+                .map(|(_, config)| AggregationState {
+                    window_manager: WindowManager::new(
+                        config.window_size,
+                        config.slide_interval,
+                    ),
+                    config: config.clone(),
+                    active_windows: HashMap::new(),
+                })
+                .collect();
+
+            self.series_map.insert(
+                series_key.to_string(),
+                SeriesState {
+                    buffer: SeriesBuffer::new(self.max_buffer_per_series),
+                    previous_watermark_ms: i64::MIN,
+                    aggregations,
+                },
+            );
+        }
+
+        self.series_map.get_mut(series_key).unwrap()
+    }
+
+    fn process_samples(
+        &mut self,
+        series_key: &str,
+        samples: Vec<(i64, f64)>,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        // Copy scalars out of self before taking &mut self.series_map
+        let worker_id = self.id;
+        let allowed_lateness_ms = self.allowed_lateness_ms;
+
+        // Ensure state exists
+        self.get_or_create_series_state(series_key);
+
+        let state = self.series_map.get_mut(series_key).unwrap();
+
+        if state.aggregations.is_empty() {
+            return Ok(());
+        }
+
+        // Insert samples into buffer, dropping late arrivals
+        for &(ts, val) in &samples {
+            if state.buffer.watermark_ms() != i64::MIN
+                && ts < state.buffer.watermark_ms() - allowed_lateness_ms
+            {
+                debug!(
+                    "Worker {} dropping late sample for {}: ts={} watermark={}",
+                    worker_id,
+                    series_key,
+                    ts,
+                    state.buffer.watermark_ms()
+                );
+                continue;
+            }
+            state.buffer.insert(ts, val);
+        }
+
+        let current_wm = state.buffer.watermark_ms();
+        let previous_wm = state.previous_watermark_ms;
+
+        let mut emit_batch: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)> = Vec::new();
+
+        for agg_state in &mut state.aggregations {
+            let closed = agg_state
+                .window_manager
+                .closed_windows(previous_wm, current_wm);
+
+            // Feed each incoming sample to the correct active window accumulator
+            for &(ts, val) in &samples {
+                if current_wm != i64::MIN && ts < current_wm - allowed_lateness_ms {
+                    continue; // already dropped
+                }
+
+                let window_start = agg_state.window_manager.window_start_for(ts);
+
+                let updater = agg_state
+                    .active_windows
+                    .entry(window_start)
+                    .or_insert_with(|| create_accumulator_updater(&agg_state.config));
+
+                if updater.is_keyed() {
+                    let key = extract_key_from_series(series_key, &agg_state.config);
+                    updater.update_keyed(&key, val, ts);
+                } else {
+                    updater.update_single(val, ts);
+                }
+            }
+
+            // Emit closed windows
+            for window_start in &closed {
+                if let Some(mut updater) = agg_state.active_windows.remove(window_start) {
+                    let (_, window_end) =
+                        agg_state.window_manager.window_bounds(*window_start);
+
+                    let key = if updater.is_keyed() {
+                        Some(extract_key_from_series(series_key, &agg_state.config))
+                    } else {
+                        None
+                    };
+
+                    let output = PrecomputedOutput::new(
+                        *window_start as u64,
+                        window_end as u64,
+                        key,
+                        agg_state.config.aggregation_id,
+                    );
+
+                    let accumulator = updater.take_accumulator();
+                    emit_batch.push((output, accumulator));
+                }
+            }
+        }
+
+        state.previous_watermark_ms = current_wm;
+
+        // Emit to output sink
+        if !emit_batch.is_empty() {
+            debug!(
+                "Worker {} emitting {} outputs for {}",
+                worker_id,
+                emit_batch.len(),
+                series_key
+            );
+            self.output_sink.emit_batch(emit_batch)?;
+        }
+
+        Ok(())
+    }
+
+    /// Flush all series — force-close windows that are past due.
+    fn flush_all(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        let mut emit_batch: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)> = Vec::new();
+
+        for (series_key, state) in &mut self.series_map {
+            let current_wm = state.buffer.watermark_ms();
+            let previous_wm = state.previous_watermark_ms;
+
+            for agg_state in &mut state.aggregations {
+                let closed = agg_state
+                    .window_manager
+                    .closed_windows(previous_wm, current_wm);
+
+                for window_start in &closed {
+                    if let Some(mut updater) = agg_state.active_windows.remove(window_start) {
+                        let (_, window_end) =
+                            agg_state.window_manager.window_bounds(*window_start);
+
+                        let key = if updater.is_keyed() {
+                            Some(extract_key_from_series(
+                                series_key,
+                                &agg_state.config,
+                            ))
+                        } else {
+                            None
+                        };
+
+                        let output = PrecomputedOutput::new(
+                            *window_start as u64,
+                            window_end as u64,
+                            key,
+                            agg_state.config.aggregation_id,
+                        );
+
+                        let accumulator = updater.take_accumulator();
+                        emit_batch.push((output, accumulator));
+                    }
+                }
+            }
+
+            state.previous_watermark_ms = current_wm;
+        }
+
+        if !emit_batch.is_empty() {
+            debug!(
+                "Worker {} flush emitting {} outputs",
+                self.id,
+                emit_batch.len()
+            );
+            self.output_sink.emit_batch(emit_batch)?;
+        }
+
+        Ok(())
+    }
+}
+
+/// Extract the metric name from a series key like `"metric_name{key1=\"val1\"}"`.
+pub fn extract_metric_name(series_key: &str) -> &str {
+    match series_key.find('{') {
+        Some(pos) => &series_key[..pos],
+        None => series_key,
+    }
+}
+
+/// Extract grouping label values from a series key string based on the
+/// aggregation config's `grouping_labels`.
+///
+/// The series key format is: `metric_name{label1="val1",label2="val2",...}`
+pub fn extract_key_from_series(
+    series_key: &str,
+    config: &AggregationConfig,
+) -> KeyByLabelValues {
+    let labels = parse_labels_from_series_key(series_key);
+    let mut values = Vec::new();
+
+    for label_name in &config.grouping_labels.labels {
+        if let Some(val) = labels.get(label_name.as_str()) {
+            values.push(val.to_string());
+        } else {
+            values.push(String::new());
+        }
+    }
+
+    KeyByLabelValues::new_with_labels(values)
+}
+
+/// Parse label key-value pairs from a series key string.
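+/// Escaped quotes inside label values are not handled: a value ends at the
+/// first `"` the parser encounters.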
+/// `"metric{a=\"b\",c=\"d\"}"` → `{("a", "b"), ("c", "d")}` +fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { + let mut labels = HashMap::new(); + + let start = match series_key.find('{') { + Some(pos) => pos + 1, + None => return labels, + }; + let end = match series_key.rfind('}') { + Some(pos) => pos, + None => return labels, + }; + + if start >= end { + return labels; + } + + let label_str = &series_key[start..end]; + + // Parse comma-separated key="value" pairs + // Simple parser that handles the expected format + let mut remaining = label_str; + while !remaining.is_empty() { + // Find the '=' separator + let eq_pos = match remaining.find('=') { + Some(pos) => pos, + None => break, + }; + let key = remaining[..eq_pos].trim(); + + // Expect "value" after = + let after_eq = &remaining[eq_pos + 1..]; + if !after_eq.starts_with('"') { + break; + } + + // Find closing quote + let value_start = 1; // skip opening quote + let value_end = match after_eq[value_start..].find('"') { + Some(pos) => value_start + pos, + None => break, + }; + + let value = &after_eq[value_start..value_end]; + labels.insert(key, value); + + // Move past the closing quote and optional comma + let consumed = value_end + 1; // past closing quote + remaining = &after_eq[consumed..]; + if remaining.starts_with(',') { + remaining = &remaining[1..]; + } + } + + labels +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_metric_name() { + assert_eq!( + extract_metric_name("http_requests_total{method=\"GET\"}"), + "http_requests_total" + ); + assert_eq!(extract_metric_name("up"), "up"); + assert_eq!( + extract_metric_name("cpu_usage{host=\"a\",zone=\"us\"}"), + "cpu_usage" + ); + } + + #[test] + fn test_parse_labels() { + let labels = + parse_labels_from_series_key("metric{method=\"GET\",status=\"200\"}"); + assert_eq!(labels.get("method"), Some(&"GET")); + assert_eq!(labels.get("status"), Some(&"200")); + } + + #[test] + fn test_parse_labels_no_labels() { + let labels = parse_labels_from_series_key("metric"); + assert!(labels.is_empty()); + } + + #[test] + fn test_parse_labels_empty_braces() { + let labels = parse_labels_from_series_key("metric{}"); + assert!(labels.is_empty()); + } + + #[test] + fn test_extract_key_from_series() { + use serde_json::json; + + let config = AggregationConfig::new( + 1, + "SingleSubpopulation".to_string(), + "Sum".to_string(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![ + "method".to_string(), + "status".to_string(), + ]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + 60, + "http_requests_total".to_string(), + "http_requests_total".to_string(), + None, + None, + Some(60), + Some(0), + None, + None, + None, + ); + + let key = extract_key_from_series( + "http_requests_total{method=\"GET\",status=\"200\"}", + &config, + ); + assert_eq!( + key.labels, + vec!["GET".to_string(), "200".to_string()] + ); + } +} From ba9ce1682de59eaa959978dd405eeb03c9ba1954 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 11:23:02 -0700 Subject: [PATCH 02/12] Restore standalone precompute engine binary with fixed imports Replace sketch_db_common::enums::QueryLanguage import with query_engine_rust::data_model::QueryLanguage in the standalone binary, and integrate PrecomputeEngine into main.rs as an alternative to the removed PrometheusRemoteWriteServer. 
Co-Authored-By: Claude Opus 4.6 --- .../src/bin/precompute_engine.rs | 2 +- asap-query-engine/src/lib.rs | 4 + asap-query-engine/src/main.rs | 84 ++++++++++++------- 3 files changed, 60 insertions(+), 30 deletions(-) diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs index 70b90f3..c8e8fd8 100644 --- a/asap-query-engine/src/bin/precompute_engine.rs +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -9,7 +9,7 @@ use query_engine_rust::precompute_engine::output_sink::StoreOutputSink; use query_engine_rust::precompute_engine::PrecomputeEngine; use query_engine_rust::stores::SimpleMapStore; use query_engine_rust::{HttpServer, HttpServerConfig}; -use sketch_db_common::enums::QueryLanguage; +use query_engine_rust::data_model::QueryLanguage; use std::sync::Arc; use tracing::info; diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index f5363f6..a76efeb 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -43,6 +43,10 @@ pub use drivers::{ OtlpReceiverConfig, }; +pub use precompute_engine::config::PrecomputeEngineConfig; +pub use precompute_engine::output_sink::StoreOutputSink; +pub use precompute_engine::PrecomputeEngine; + pub use utils::{normalize_spatial_filter, read_inference_config, read_streaming_config}; pub type Result = std::result::Result>; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 00be2fe..a2f9160 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -12,7 +12,8 @@ use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, - OtlpReceiverConfig, Result, SimpleEngine, SimpleMapStore, + OtlpReceiverConfig, PrecomputeEngine, PrecomputeEngineConfig, Result, SimpleEngine, + SimpleMapStore, StoreOutputSink, }; #[derive(Parser, Debug)] @@ -102,10 +103,6 @@ struct Args { #[arg(long, default_value = "9090")] prometheus_remote_write_port: u16, - /// Automatically initialize all sketch types for newly seen series during ingestion - #[arg(long, default_value = "true")] - auto_init_sketches: bool, - /// Path to promsketch configuration YAML file (optional; uses defaults if omitted) #[arg(long)] promsketch_config: Option, @@ -133,6 +130,26 @@ struct Args { /// OTLP HTTP listen port #[arg(long, default_value = "4318")] otel_http_port: u16, + + /// Number of precompute engine worker threads + #[arg(long, default_value = "4")] + precompute_num_workers: usize, + + /// Maximum allowed lateness for out-of-order samples (milliseconds) + #[arg(long, default_value = "5000")] + precompute_allowed_lateness_ms: i64, + + /// Maximum buffered samples per series before eviction + #[arg(long, default_value = "10000")] + precompute_max_buffer_per_series: usize, + + /// Interval at which the flush timer fires (milliseconds) + #[arg(long, default_value = "1000")] + precompute_flush_interval_ms: u64, + + /// Capacity of the channel between router and each worker + #[arg(long, default_value = "10000")] + precompute_channel_buffer_size: usize, } #[tokio::main] @@ -273,25 +290,34 @@ async fn main() -> Result<()> { None }; - // Setup Prometheus remote write server - // let prometheus_remote_write_handle = if args.enable_prometheus_remote_write { - // let prw_config = PrometheusRemoteWriteConfig { - // port: args.prometheus_remote_write_port, - // auto_init_sketches: 
args.auto_init_sketches, - // }; - // let server = PrometheusRemoteWriteServer::new(prw_config, promsketch_store.clone()); - // info!( - // "Starting Prometheus remote write server on port {}", - // args.prometheus_remote_write_port - // ); - // Some(tokio::spawn(async move { - // if let Err(e) = server.run().await { - // error!("Prometheus remote write server error: {}", e); - // } - // })) - // } else { - // None - // }; + // Setup precompute engine (replaces standalone Prometheus remote write server) + let precompute_handle = if args.enable_prometheus_remote_write { + let precompute_config = PrecomputeEngineConfig { + num_workers: args.precompute_num_workers, + ingest_port: args.prometheus_remote_write_port, + allowed_lateness_ms: args.precompute_allowed_lateness_ms, + max_buffer_per_series: args.precompute_max_buffer_per_series, + flush_interval_ms: args.precompute_flush_interval_ms, + channel_buffer_size: args.precompute_channel_buffer_size, + }; + let output_sink = Arc::new(StoreOutputSink::new(store.clone())); + let engine = PrecomputeEngine::new( + precompute_config, + streaming_config.clone(), + output_sink, + ); + info!( + "Starting precompute engine on port {}", + args.prometheus_remote_write_port + ); + Some(tokio::spawn(async move { + if let Err(e) = engine.run().await { + error!("Precompute engine error: {}", e); + } + })) + } else { + None + }; //info!("=== TEMPORARY: Using ClickHouse HTTP adapter ==="); //info!("ClickHouse endpoint will be available at: /clickhouse/query"); @@ -343,11 +369,11 @@ async fn main() -> Result<()> { let _ = handle.await; } - // if let Some(handle) = prometheus_remote_write_handle { - // info!("Shutting down Prometheus remote write server..."); - // handle.abort(); - // let _ = handle.await; - // } + if let Some(handle) = precompute_handle { + info!("Shutting down precompute engine..."); + handle.abort(); + let _ = handle.await; + } info!("Shutdown complete"); Ok(()) From 488f80fec1bc313bba0b075cf5ffd05637d8baf8 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 12:14:53 -0700 Subject: [PATCH 03/12] Wire up DatasketchesKLL in accumulator factory and add E2E test Handle top-level aggregation types (DatasketchesKLL, Sum, Min, Max, etc.) directly in the factory match, fixing the fallback to Sum that broke quantile queries. Also preserve the K parameter in KllAccumulatorUpdater::reset() instead of hardcoding 200. Add test_e2e_precompute binary that validates the full ingest -> precompute -> store -> query pipeline end-to-end. Co-Authored-By: Claude Opus 4.6 --- asap-query-engine/Cargo.toml | 4 + .../src/bin/test_e2e_precompute.rs | 262 ++++++++++++++++++ .../precompute_engine/accumulator_factory.rs | 51 +++- 3 files changed, 315 insertions(+), 2 deletions(-) create mode 100644 asap-query-engine/src/bin/test_e2e_precompute.rs diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index ab5c437..a5fca27 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -63,6 +63,10 @@ sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = name = "precompute_engine" path = "src/bin/precompute_engine.rs" +[[bin]] +name = "test_e2e_precompute" +path = "src/bin/test_e2e_precompute.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs new file mode 100644 index 0000000..c4370cb --- /dev/null +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -0,0 +1,262 @@ +//! 
End-to-end test for the standalone precompute_engine binary. +//! +//! This binary: +//! 1. Starts a PrecomputeEngine in-process (same as the precompute_engine binary) +//! 2. Sends Prometheus remote write samples via HTTP +//! 3. Queries the PromQL endpoint and prints results +//! +//! Usage: +//! cargo run --bin test_e2e_precompute + +use prost::Message; +use query_engine_rust::data_model::{LockStrategy, QueryLanguage}; +use query_engine_rust::drivers::ingest::prometheus_remote_write::{ + Label, Sample, TimeSeries, WriteRequest, +}; +use query_engine_rust::drivers::query::adapters::AdapterConfig; +use query_engine_rust::engines::SimpleEngine; +use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig; +use query_engine_rust::precompute_engine::output_sink::StoreOutputSink; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::stores::SimpleMapStore; +use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; +use query_engine_rust::{HttpServer, HttpServerConfig}; +use std::sync::Arc; + +const INGEST_PORT: u16 = 19090; +const QUERY_PORT: u16 = 18080; +const SCRAPE_INTERVAL: u64 = 1; // 1 second to match tumblingWindowSize + +fn build_remote_write_body(timeseries: Vec) -> Vec { + let write_req = WriteRequest { timeseries }; + let proto_bytes = write_req.encode_to_vec(); + snap::raw::Encoder::new() + .compress_vec(&proto_bytes) + .expect("snappy compress failed") +} + +fn make_sample(metric: &str, label_0: &str, timestamp_ms: i64, value: f64) -> TimeSeries { + TimeSeries { + labels: vec![ + Label { + name: "__name__".into(), + value: metric.into(), + }, + Label { + name: "instance".into(), + value: "i1".into(), + }, + Label { + name: "job".into(), + value: "test".into(), + }, + Label { + name: "label_0".into(), + value: label_0.into(), + }, + Label { + name: "label_1".into(), + value: "v1".into(), + }, + ], + samples: vec![Sample { + value, + timestamp: timestamp_ms, + }], + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + // Load configs the same way main.rs does + let inference_config = + read_inference_config("examples/promql/inference_config.yaml", QueryLanguage::promql)?; + println!( + "Loaded inference config with {} query configs", + inference_config.query_configs.len() + ); + for qc in &inference_config.query_configs { + println!(" Query: '{}' -> {:?}", qc.query, qc.aggregations); + } + + let cleanup_policy = inference_config.cleanup_policy; + let streaming_config = Arc::new(read_streaming_config( + "examples/promql/streaming_config.yaml", + &inference_config, + )?); + println!( + "Loaded streaming config with {} aggregation configs", + streaming_config.get_all_aggregation_configs().len() + ); + + println!("\n=== Starting precompute engine (ingest={INGEST_PORT}, query={QUERY_PORT}) ==="); + + // Create store + let store: Arc = Arc::new( + SimpleMapStore::new_with_strategy( + streaming_config.clone(), + cleanup_policy, + LockStrategy::PerKey, + ), + ); + + // Start query server + let query_engine = Arc::new(SimpleEngine::new( + store.clone(), + inference_config, + streaming_config.clone(), + SCRAPE_INTERVAL, + QueryLanguage::promql, + )); + let http_config = HttpServerConfig { + port: QUERY_PORT, + handle_http_requests: true, + adapter_config: AdapterConfig { + protocol: 
query_engine_rust::data_model::QueryProtocol::PrometheusHttp, + language: QueryLanguage::promql, + fallback: None, + }, + }; + let http_server = HttpServer::new(http_config, query_engine, store.clone()); + tokio::spawn(async move { + if let Err(e) = http_server.run().await { + eprintln!("Query server error: {e}"); + } + }); + + // Start precompute engine + let engine_config = PrecomputeEngineConfig { + num_workers: 2, + ingest_port: INGEST_PORT, + allowed_lateness_ms: 5000, + max_buffer_per_series: 10000, + flush_interval_ms: 200, + channel_buffer_size: 10000, + }; + let output_sink = Arc::new(StoreOutputSink::new(store.clone())); + let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink); + tokio::spawn(async move { + if let Err(e) = engine.run().await { + eprintln!("Precompute engine error: {e}"); + } + }); + + // Wait for servers to bind + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let client = reqwest::Client::new(); + + // ----------------------------------------------------------------------- + // Send samples across multiple 1-second tumbling windows. + // tumblingWindowSize=1 means windows are [0,1000), [1000,2000), etc. + // We need enough windows of data so the query engine can find results. + // ----------------------------------------------------------------------- + println!("\n=== Sending remote write samples ==="); + + // Send 20 windows worth of data (timestamps 0ms..20000ms = 0s..20s) + // Each window gets one sample. + for window in 0..20 { + let ts = window * 1000 + 500; // mid-window + let val = 10.0 + window as f64; + let body = build_remote_write_body(vec![make_sample("fake_metric", "groupA", ts, val)]); + + let resp = client + .post(format!("http://localhost:{INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + + println!(" Sent t={ts}ms v={val} -> HTTP {}", resp.status().as_u16()); + } + + // Advance watermark well past to close all windows + println!("\n=== Advancing watermark to close all windows ==="); + let body = build_remote_write_body(vec![make_sample("fake_metric", "groupA", 25000, 0.0)]); + let resp = client + .post(format!("http://localhost:{INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + println!(" Sent t=25000ms v=0 -> HTTP {}", resp.status().as_u16()); + + // Wait for flush + processing + println!("\n Waiting for flush..."); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // ----------------------------------------------------------------------- + // Query the PromQL endpoint + // The inference_config has: "quantile by (label_0) (0.99, fake_metric)" + // which maps to aggregation_id 1. 
+ // ----------------------------------------------------------------------- + println!("\n=== Querying PromQL endpoint ==="); + + // Use the exact query pattern from inference_config + let queries_instant = vec![ + ("quantile by (label_0) (0.99, fake_metric)", "10", "Configured query at t=10"), + ("quantile by (label_0) (0.99, fake_metric)", "15", "Configured query at t=15"), + ("sum_over_time(fake_metric[1s])", "10", "Temporal: sum_over_time at t=10"), + ("sum(fake_metric)", "10", "Spatial: sum at t=10"), + ]; + + for (query, time, label) in &queries_instant { + println!("\n--- Instant query: {label} ---"); + let resp = client + .get(format!("http://localhost:{QUERY_PORT}/api/v1/query")) + .query(&[("query", *query), ("time", *time)]) + .send() + .await? + .text() + .await?; + print_json(&resp); + } + + // Range query + println!("\n--- Range query: quantile by (label_0) (0.99, fake_metric) t=5..20 step=1 ---"); + let resp = client + .get(format!("http://localhost:{QUERY_PORT}/api/v1/query_range")) + .query(&[ + ("query", "quantile by (label_0) (0.99, fake_metric)"), + ("start", "5"), + ("end", "20"), + ("step", "1"), + ]) + .send() + .await? + .text() + .await?; + print_json(&resp); + + // Runtime info + println!("\n--- Runtime info ---"); + let resp = client + .get(format!( + "http://localhost:{QUERY_PORT}/api/v1/status/runtimeinfo" + )) + .send() + .await? + .text() + .await?; + print_json(&resp); + + println!("\n=== E2E test complete ==="); + + Ok(()) +} + +fn print_json(s: &str) { + match serde_json::from_str::(s) { + Ok(v) => println!("{}", serde_json::to_string_pretty(&v).unwrap()), + Err(_) => println!("{s}"), + } +} diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index cd44e37..47e66a4 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -187,12 +187,14 @@ impl AccumulatorUpdater for IncreaseAccumulatorUpdater { pub struct KllAccumulatorUpdater { acc: DatasketchesKLLAccumulator, + k: u16, } impl KllAccumulatorUpdater { pub fn new(k: u16) -> Self { Self { acc: DatasketchesKLLAccumulator::new(k), + k, } } } @@ -213,8 +215,7 @@ impl AccumulatorUpdater for KllAccumulatorUpdater { } fn reset(&mut self) { - // Re-create with same k - self.acc = DatasketchesKLLAccumulator::new(200); // TODO: preserve k from config + self.acc = DatasketchesKLLAccumulator::new(self.k); } fn is_keyed(&self) -> bool { @@ -575,6 +576,52 @@ pub fn create_accumulator_updater( Box::new(MultipleSumUpdater::new()) } }, + // Top-level aggregation types (e.g. 
"DatasketchesKLL" directly in aggregationType) + "DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => { + let k = config + .parameters + .get("K") + .or_else(|| config.parameters.get("k")) + .and_then(|v| v.as_u64()) + .unwrap_or(200) as u16; + Box::new(KllAccumulatorUpdater::new(k)) + } + "Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()), + "Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new("min".to_string())), + "Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new("max".to_string())), + "Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()), + "CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => { + let row_num = config + .parameters + .get("row_num") + .and_then(|v| v.as_u64()) + .unwrap_or(4) as usize; + let col_num = config + .parameters + .get("col_num") + .and_then(|v| v.as_u64()) + .unwrap_or(1000) as usize; + Box::new(CmsAccumulatorUpdater::new(row_num, col_num)) + } + "HydraKLL" | "hydra_kll" => { + let row_num = config + .parameters + .get("row_num") + .and_then(|v| v.as_u64()) + .unwrap_or(4) as usize; + let col_num = config + .parameters + .get("col_num") + .and_then(|v| v.as_u64()) + .unwrap_or(1000) as usize; + let k = config + .parameters + .get("K") + .or_else(|| config.parameters.get("k")) + .and_then(|v| v.as_u64()) + .unwrap_or(200) as u16; + Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k)) + } other => { tracing::warn!( "Unknown aggregation_type '{}', defaulting to SingleSubpopulation Sum", From 907c4b6ef7d7129178207605981bbb6af24ffb05 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 13:59:41 -0700 Subject: [PATCH 04/12] Add raw-only mode to PrecomputeEngine that skips aggregation When pass_raw_samples is enabled, each incoming sample is emitted directly to the store as a SumAccumulator without windowing or watermark advancement. This supports use cases where raw passthrough is needed instead of sketch-based aggregation. 
Co-Authored-By: Claude Opus 4.6 --- .../src/bin/precompute_engine.rs | 18 ++++- .../src/bin/test_e2e_precompute.rs | 81 ++++++++++++++++++- asap-query-engine/src/main.rs | 2 + .../src/precompute_engine/config.rs | 9 +++ .../src/precompute_engine/mod.rs | 2 + .../src/precompute_engine/worker.rs | 50 ++++++++++++ 6 files changed, 158 insertions(+), 4 deletions(-) diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs index c8e8fd8..ad91c3f 100644 --- a/asap-query-engine/src/bin/precompute_engine.rs +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -5,7 +5,7 @@ use query_engine_rust::data_model::{ use query_engine_rust::drivers::query::adapters::AdapterConfig; use query_engine_rust::engines::SimpleEngine; use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig; -use query_engine_rust::precompute_engine::output_sink::StoreOutputSink; +use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink}; use query_engine_rust::precompute_engine::PrecomputeEngine; use query_engine_rust::stores::SimpleMapStore; use query_engine_rust::{HttpServer, HttpServerConfig}; @@ -52,6 +52,14 @@ struct Args { /// Lock strategy for the store #[arg(long, value_enum, default_value_t = LockStrategy::PerKey)] lock_strategy: LockStrategy, + + /// Skip aggregation and pass each raw sample directly to the store + #[arg(long, default_value_t = false)] + pass_raw_samples: bool, + + /// Aggregation ID to stamp on each raw-mode output + #[arg(long, default_value_t = 0)] + raw_mode_aggregation_id: u64, } #[tokio::main] @@ -122,11 +130,17 @@ async fn main() -> Result<(), Box> { max_buffer_per_series: args.max_buffer_per_series, flush_interval_ms: args.flush_interval_ms, channel_buffer_size: args.channel_buffer_size, + pass_raw_samples: args.pass_raw_samples, + raw_mode_aggregation_id: args.raw_mode_aggregation_id, }; // Create the output sink (writes directly to the store) let output_sink: Arc = - Arc::new(StoreOutputSink::new(store)); + if args.pass_raw_samples { + Arc::new(RawPassthroughSink::new(store)) + } else { + Arc::new(StoreOutputSink::new(store)) + }; // Build and run the engine let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink); diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index c4370cb..dadabe4 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -16,7 +16,7 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{ use query_engine_rust::drivers::query::adapters::AdapterConfig; use query_engine_rust::engines::SimpleEngine; use query_engine_rust::precompute_engine::config::PrecomputeEngineConfig; -use query_engine_rust::precompute_engine::output_sink::StoreOutputSink; +use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink}; use query_engine_rust::precompute_engine::PrecomputeEngine; use query_engine_rust::stores::SimpleMapStore; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; @@ -25,6 +25,7 @@ use std::sync::Arc; const INGEST_PORT: u16 = 19090; const QUERY_PORT: u16 = 18080; +const RAW_INGEST_PORT: u16 = 19091; const SCRAPE_INTERVAL: u64 = 1; // 1 second to match tumblingWindowSize fn build_remote_write_body(timeseries: Vec) -> Vec { @@ -139,9 +140,11 @@ async fn main() -> Result<(), Box> { max_buffer_per_series: 10000, flush_interval_ms: 200, 
channel_buffer_size: 10000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, }; let output_sink = Arc::new(StoreOutputSink::new(store.clone())); - let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink); + let engine = PrecomputeEngine::new(engine_config, streaming_config.clone(), output_sink); tokio::spawn(async move { if let Err(e) = engine.run().await { eprintln!("Precompute engine error: {e}"); @@ -249,6 +252,80 @@ async fn main() -> Result<(), Box> { .await?; print_json(&resp); + // ----------------------------------------------------------------------- + // RAW MODE TEST + // ----------------------------------------------------------------------- + println!("\n=== Starting raw-mode precompute engine (ingest={RAW_INGEST_PORT}) ==="); + + // The raw engine reuses the same store so we can query results directly. + // Pick aggregation_id = 1 to match the existing streaming config. + let raw_agg_id: u64 = 1; + let raw_engine_config = PrecomputeEngineConfig { + num_workers: 1, + ingest_port: RAW_INGEST_PORT, + allowed_lateness_ms: 5000, + max_buffer_per_series: 10000, + flush_interval_ms: 200, + channel_buffer_size: 10000, + pass_raw_samples: true, + raw_mode_aggregation_id: raw_agg_id, + }; + let raw_sink = Arc::new(RawPassthroughSink::new(store.clone())); + let raw_engine = PrecomputeEngine::new( + raw_engine_config, + streaming_config.clone(), + raw_sink, + ); + tokio::spawn(async move { + if let Err(e) = raw_engine.run().await { + eprintln!("Raw precompute engine error: {e}"); + } + }); + + // Wait for server to bind + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // Send a few raw samples — no need to advance watermark. + println!("\n=== Sending raw-mode samples ==="); + let raw_timestamps = vec![100_000i64, 101_000, 102_000]; + let raw_values = vec![42.0f64, 43.0, 44.0]; + for (&ts, &val) in raw_timestamps.iter().zip(raw_values.iter()) { + let body = build_remote_write_body(vec![make_sample("fake_metric", "groupA", ts, val)]); + let resp = client + .post(format!("http://localhost:{RAW_INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await?; + println!(" Sent raw t={ts}ms v={val} -> HTTP {}", resp.status().as_u16()); + } + + // Short wait for processing (no watermark advancement needed) + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Verify raw samples appeared in the store + println!("\n=== Verifying raw samples in store ==="); + let results = store.query_precomputed_output( + "fake_metric", + raw_agg_id, + 100_000, + 103_000, + )?; + let total_buckets: usize = results.values().map(|v| v.len()).sum(); + println!(" Found {total_buckets} buckets for aggregation_id={raw_agg_id} in [100000, 103000)"); + assert!( + total_buckets >= 3, + "Expected at least 3 raw samples in store, got {total_buckets}" + ); + + for (key, buckets) in &results { + for ((start, end), _acc) in buckets { + println!(" key={key:?} start={start} end={end}"); + } + } + println!(" Raw mode test PASSED"); + println!("\n=== E2E test complete ==="); Ok(()) diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index a2f9160..4c10339 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -299,6 +299,8 @@ async fn main() -> Result<()> { max_buffer_per_series: args.precompute_max_buffer_per_series, flush_interval_ms: args.precompute_flush_interval_ms, channel_buffer_size: 
args.precompute_channel_buffer_size, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, }; let output_sink = Arc::new(StoreOutputSink::new(store.clone())); let engine = PrecomputeEngine::new( diff --git a/asap-query-engine/src/precompute_engine/config.rs b/asap-query-engine/src/precompute_engine/config.rs index ba02a71..7bc8df9 100644 --- a/asap-query-engine/src/precompute_engine/config.rs +++ b/asap-query-engine/src/precompute_engine/config.rs @@ -16,6 +16,11 @@ pub struct PrecomputeEngineConfig { pub flush_interval_ms: u64, /// Capacity of the MPSC channel between router and each worker. pub channel_buffer_size: usize, + /// When true, skip all aggregation and pass each raw sample directly to the + /// output sink as a `SumAccumulator::with_sum(value)`. + pub pass_raw_samples: bool, + /// Aggregation ID to stamp on each raw-mode output. + pub raw_mode_aggregation_id: u64, } impl Default for PrecomputeEngineConfig { @@ -27,6 +32,8 @@ impl Default for PrecomputeEngineConfig { max_buffer_per_series: 10_000, flush_interval_ms: 1_000, channel_buffer_size: 10_000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, } } } @@ -44,5 +51,7 @@ mod tests { assert_eq!(config.max_buffer_per_series, 10_000); assert_eq!(config.flush_interval_ms, 1_000); assert_eq!(config.channel_buffer_size, 10_000); + assert!(!config.pass_raw_samples); + assert_eq!(config.raw_mode_aggregation_id, 0); } } diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index 71536e2..ad1a2a6 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -81,6 +81,8 @@ impl PrecomputeEngine { agg_configs.clone(), self.config.max_buffer_per_series, self.config.allowed_lateness_ms, + self.config.pass_raw_samples, + self.config.raw_mode_aggregation_id, ); let handle = tokio::spawn(async move { worker.run().await; diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 08951f7..347a679 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -4,6 +4,7 @@ use crate::precompute_engine::output_sink::OutputSink; use crate::precompute_engine::series_buffer::SeriesBuffer; use crate::precompute_engine::series_router::WorkerMessage; use crate::precompute_engine::window_manager::WindowManager; +use crate::precompute_operators::sum_accumulator::SumAccumulator; use sketch_db_common::aggregation_config::AggregationConfig; use std::collections::HashMap; use std::sync::Arc; @@ -40,6 +41,10 @@ pub struct Worker { max_buffer_per_series: usize, /// Allowed lateness in ms. allowed_lateness_ms: i64, + /// When true, skip aggregation and pass raw samples through. + pass_raw_samples: bool, + /// Aggregation ID stamped on each raw-mode output. 
+ raw_mode_aggregation_id: u64, } impl Worker { @@ -50,6 +55,8 @@ impl Worker { agg_configs: HashMap, max_buffer_per_series: usize, allowed_lateness_ms: i64, + pass_raw_samples: bool, + raw_mode_aggregation_id: u64, ) -> Self { Self { id, @@ -59,6 +66,8 @@ impl Worker { agg_configs, max_buffer_per_series, allowed_lateness_ms, + pass_raw_samples, + raw_mode_aggregation_id, } } @@ -149,6 +158,10 @@ impl Worker { series_key: &str, samples: Vec<(i64, f64)>, ) -> Result<(), Box> { + if self.pass_raw_samples { + return self.process_samples_raw(series_key, samples); + } + // Copy scalars out of self before taking &mut self.series_map let worker_id = self.id; let allowed_lateness_ms = self.allowed_lateness_ms; @@ -251,8 +264,45 @@ impl Worker { Ok(()) } + /// Raw fast-path: emit each sample as a standalone `SumAccumulator`. + fn process_samples_raw( + &self, + series_key: &str, + samples: Vec<(i64, f64)>, + ) -> Result<(), Box> { + let mut emit_batch: Vec<(PrecomputedOutput, Box)> = + Vec::with_capacity(samples.len()); + + for (ts, val) in samples { + let output = PrecomputedOutput::new( + ts as u64, + ts as u64, + None, + self.raw_mode_aggregation_id, + ); + let accumulator = SumAccumulator::with_sum(val); + emit_batch.push((output, Box::new(accumulator))); + } + + if !emit_batch.is_empty() { + debug!( + "Worker {} raw-emitting {} samples for {}", + self.id, + emit_batch.len(), + series_key + ); + self.output_sink.emit_batch(emit_batch)?; + } + + Ok(()) + } + /// Flush all series — force-close windows that are past due. fn flush_all(&mut self) -> Result<(), Box> { + if self.pass_raw_samples { + return Ok(()); + } + let mut emit_batch: Vec<(PrecomputedOutput, Box)> = Vec::new(); for (series_key, state) in &mut self.series_map { From 5558db2c2ffad36d0f0836b7d5c2bc3dfd086dda Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Feb 2026 14:16:58 -0700 Subject: [PATCH 05/12] Add debug-level tracing spans for e2e latency measurement in precompute engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instrument the ingest→worker→store pipeline with debug_span! and an Instant-based e2e latency log. All spans are at debug level so there is zero overhead at the default info level. Enable FmtSpan::CLOSE in both binaries to emit span durations when RUST_LOG=debug. 
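For reference, a sketch of the subscriber setup both binaries now use
(identical to the diff below, shown here with comments on what
FmtSpan::CLOSE actually emits):

    use tracing_subscriber::fmt::format::FmtSpan;

    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        // FmtSpan::CLOSE logs one event per span exit carrying the
        // time.busy / time.idle fields, so the ingest, worker_process,
        // and store_insert span durations appear under RUST_LOG=debug.
        .with_span_events(FmtSpan::CLOSE)
        .init();

To surface the measurements, run e.g.
RUST_LOG=debug cargo run --bin precompute_engine -- --streaming-config examples/promql/streaming_config.yaml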
Co-Authored-By: Claude Opus 4.6 --- .../src/bin/precompute_engine.rs | 2 + .../src/bin/test_e2e_precompute.rs | 1 + .../src/precompute_engine/mod.rs | 70 +++++++++++-------- .../src/precompute_engine/output_sink.rs | 3 + .../src/precompute_engine/series_router.rs | 4 ++ .../src/precompute_engine/worker.rs | 15 +++- 6 files changed, 63 insertions(+), 32 deletions(-) diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs index ad91c3f..bb71efd 100644 --- a/asap-query-engine/src/bin/precompute_engine.rs +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -12,6 +12,7 @@ use query_engine_rust::{HttpServer, HttpServerConfig}; use query_engine_rust::data_model::QueryLanguage; use std::sync::Arc; use tracing::info; +use tracing_subscriber::fmt::format::FmtSpan; #[derive(Parser, Debug)] #[command(name = "precompute_engine")] @@ -70,6 +71,7 @@ async fn main() -> Result<(), Box> { tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) + .with_span_events(FmtSpan::CLOSE) .init(); let args = Args::parse(); diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index dadabe4..22d4be8 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -74,6 +74,7 @@ async fn main() -> Result<(), Box> { tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) .init(); // Load configs the same way main.rs does diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index ad1a2a6..1fe7110 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -17,7 +17,8 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::mpsc; -use tracing::{info, warn}; +use std::time::Instant; +use tracing::{debug_span, info, warn, Instrument}; /// Shared state for the ingest HTTP handler. 
struct IngestState { @@ -141,39 +142,46 @@ async fn handle_ingest( State(state): State>, body: Bytes, ) -> StatusCode { - let samples = match decode_prometheus_remote_write(&body) { - Ok(s) => s, - Err(e) => { - warn!("Failed to decode remote write: {}", e); - return StatusCode::BAD_REQUEST; - } - }; + let ingest_span = debug_span!("ingest", body_len = body.len()); + let ingest_received_at = Instant::now(); + + async { + let samples = match decode_prometheus_remote_write(&body) { + Ok(s) => s, + Err(e) => { + warn!("Failed to decode remote write: {}", e); + return StatusCode::BAD_REQUEST; + } + }; - if samples.is_empty() { - return StatusCode::NO_CONTENT; - } + if samples.is_empty() { + return StatusCode::NO_CONTENT; + } - let count = samples.len() as u64; - state - .samples_ingested - .fetch_add(count, std::sync::atomic::Ordering::Relaxed); - - // Group samples by series key for batch routing - let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); - for s in &samples { - by_series - .entry(&s.labels) - .or_default() - .push((s.timestamp_ms, s.value)); - } + let count = samples.len() as u64; + state + .samples_ingested + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + + // Group samples by series key for batch routing + let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); + for s in &samples { + by_series + .entry(&s.labels) + .or_default() + .push((s.timestamp_ms, s.value)); + } - // Route each series batch to the correct worker - for (series_key, batch) in by_series { - if let Err(e) = state.router.route(series_key, batch).await { - warn!("Routing error for {}: {}", series_key, e); - return StatusCode::INTERNAL_SERVER_ERROR; + // Route each series batch to the correct worker + for (series_key, batch) in by_series { + if let Err(e) = state.router.route(series_key, batch, ingest_received_at).await { + warn!("Routing error for {}: {}", series_key, e); + return StatusCode::INTERNAL_SERVER_ERROR; + } } - } - StatusCode::NO_CONTENT + StatusCode::NO_CONTENT + } + .instrument(ingest_span) + .await } diff --git a/asap-query-engine/src/precompute_engine/output_sink.rs b/asap-query-engine/src/precompute_engine/output_sink.rs index 445cb49..6654902 100644 --- a/asap-query-engine/src/precompute_engine/output_sink.rs +++ b/asap-query-engine/src/precompute_engine/output_sink.rs @@ -1,6 +1,7 @@ use crate::data_model::{AggregateCore, PrecomputedOutput}; use crate::stores::Store; use std::sync::Arc; +use tracing::debug_span; /// Trait for emitting completed window outputs. 
pub trait OutputSink: Send + Sync { @@ -29,6 +30,7 @@ impl OutputSink for StoreOutputSink { if outputs.is_empty() { return Ok(()); } + let _span = debug_span!("store_insert", batch_size = outputs.len()).entered(); self.store.insert_precomputed_output_batch(outputs) } } @@ -54,6 +56,7 @@ impl OutputSink for RawPassthroughSink { if outputs.is_empty() { return Ok(()); } + let _span = debug_span!("store_insert_raw", batch_size = outputs.len()).entered(); self.store.insert_precomputed_output_batch(outputs) } } diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index fd05817..4642f7f 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -1,3 +1,4 @@ +use std::time::Instant; use tokio::sync::mpsc; use xxhash_rust::xxh64::xxh64; @@ -8,6 +9,7 @@ pub enum WorkerMessage { Samples { series_key: String, samples: Vec<(i64, f64)>, // (timestamp_ms, value) + ingest_received_at: Instant, }, /// Signal the worker to flush/check idle windows. Flush, @@ -36,12 +38,14 @@ impl SeriesRouter { &self, series_key: &str, samples: Vec<(i64, f64)>, + ingest_received_at: Instant, ) -> Result<(), Box> { let worker_idx = self.worker_for(series_key); self.senders[worker_idx] .send(WorkerMessage::Samples { series_key: series_key.to_string(), samples, + ingest_received_at, }) .await .map_err(|e| format!("Failed to send to worker {}: {}", worker_idx, e))?; diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 347a679..4719f1c 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -9,7 +9,7 @@ use sketch_db_common::aggregation_config::AggregationConfig; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc; -use tracing::{debug, info, warn}; +use tracing::{debug, debug_span, info, warn}; /// Per-aggregation state within a series: the window manager and active /// window accumulators. 
@@ -80,10 +80,23 @@ impl Worker { WorkerMessage::Samples { series_key, samples, + ingest_received_at, } => { + let sample_count = samples.len(); + let _span = debug_span!( + "worker_process", + worker_id = self.id, + series = %series_key, + sample_count, + ) + .entered(); if let Err(e) = self.process_samples(&series_key, samples) { warn!("Worker {} error processing {}: {}", self.id, series_key, e); } + debug!( + e2e_latency_us = ingest_received_at.elapsed().as_micros() as u64, + "e2e: ingest->worker complete" + ); } WorkerMessage::Flush => { if let Err(e) = self.flush_all() { From 3239648909db37c7700d6585b6328d9e200d1f4b Mon Sep 17 00:00:00 2001 From: zz_y Date: Wed, 25 Mar 2026 06:32:04 -0500 Subject: [PATCH 06/12] fix: add precompute_engine and test_e2e_precompute bin stubs to Dockerfile dependency cache layer Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/Dockerfile | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/asap-query-engine/Dockerfile b/asap-query-engine/Dockerfile index d4c2f5a..890a065 100644 --- a/asap-query-engine/Dockerfile +++ b/asap-query-engine/Dockerfile @@ -25,10 +25,14 @@ COPY asap-query-engine/Cargo.toml ./asap-query-engine/ COPY asap-planner-rs/Cargo.toml ./asap-planner-rs/ # Create dummy source files so Cargo can resolve all workspace members -RUN mkdir -p asap-query-engine/src && echo "fn main() {}" > asap-query-engine/src/main.rs && \ - mkdir -p asap-query-engine/benches && echo "fn main() {}" > asap-query-engine/benches/simple_store_bench.rs && \ - mkdir -p asap-planner-rs/src && echo "fn main() {}" > asap-planner-rs/src/main.rs && \ - echo "pub fn placeholder() {}" >> asap-planner-rs/src/lib.rs +# All explicit [[bin]] targets in Cargo.toml must have stubs here for the dependency cache layer +RUN mkdir -p asap-query-engine/src/bin \ + && echo "fn main() {}" > asap-query-engine/src/main.rs \ + && echo "fn main() {}" > asap-query-engine/src/bin/precompute_engine.rs \ + && echo "fn main() {}" > asap-query-engine/src/bin/test_e2e_precompute.rs \ + && mkdir -p asap-query-engine/benches && echo "fn main() {}" > asap-query-engine/benches/simple_store_bench.rs \ + && mkdir -p asap-planner-rs/src && echo "fn main() {}" > asap-planner-rs/src/main.rs \ + && echo "pub fn placeholder() {}" >> asap-planner-rs/src/lib.rs # Build dependencies (this layer will be cached) WORKDIR /code/asap-query-engine From 8af309b0f13e92e97bb2fc3bb6ddb31e41499c93 Mon Sep 17 00:00:00 2001 From: zz_y Date: Sat, 28 Mar 2026 14:12:13 -0500 Subject: [PATCH 07/12] fix: resolve build errors in precompute engine (E0432, E0609, E0061) and apply fmt - Restore decode_prometheus_remote_write and related types that were fully commented out in prometheus_remote_write.rs - Fix CountMinSketchAccumulator field access via .inner. 
in accumulator_factory - Fix AggregationConfig::new call in worker test (missing slide_interval arg, wrong types for spatial_filter/metric, extra trailing None) - Apply cargo fmt across precompute engine files Co-Authored-By: Claude Sonnet 4.6 --- .../src/bin/precompute_engine.rs | 15 +- .../src/bin/test_e2e_precompute.rs | 49 ++-- .../drivers/ingest/prometheus_remote_write.rs | 265 +++++++++--------- asap-query-engine/src/main.rs | 7 +- .../precompute_engine/accumulator_factory.rs | 24 +- .../src/precompute_engine/mod.rs | 19 +- .../src/precompute_engine/series_router.rs | 5 +- .../src/precompute_engine/window_manager.rs | 5 +- .../src/precompute_engine/worker.rs | 46 +-- 9 files changed, 204 insertions(+), 231 deletions(-) diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs index bb71efd..e0ec1fe 100644 --- a/asap-query-engine/src/bin/precompute_engine.rs +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -1,4 +1,5 @@ use clap::Parser; +use query_engine_rust::data_model::QueryLanguage; use query_engine_rust::data_model::{ CleanupPolicy, InferenceConfig, LockStrategy, StreamingConfig, }; @@ -9,7 +10,6 @@ use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, Stor use query_engine_rust::precompute_engine::PrecomputeEngine; use query_engine_rust::stores::SimpleMapStore; use query_engine_rust::{HttpServer, HttpServerConfig}; -use query_engine_rust::data_model::QueryLanguage; use std::sync::Arc; use tracing::info; use tracing_subscriber::fmt::format::FmtSpan; @@ -85,20 +85,17 @@ async fn main() -> Result<(), Box> { ); // Create the store - let store: Arc = Arc::new( - SimpleMapStore::new_with_strategy( + let store: Arc = + Arc::new(SimpleMapStore::new_with_strategy( streaming_config.clone(), CleanupPolicy::CircularBuffer, args.lock_strategy, - ), - ); + )); // Optionally start the query HTTP server if args.query_port > 0 { - let inference_config = InferenceConfig::new( - QueryLanguage::promql, - CleanupPolicy::CircularBuffer, - ); + let inference_config = + InferenceConfig::new(QueryLanguage::promql, CleanupPolicy::CircularBuffer); let query_engine = Arc::new(SimpleEngine::new( store.clone(), inference_config, diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index 22d4be8..779124f 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -78,8 +78,10 @@ async fn main() -> Result<(), Box> { .init(); // Load configs the same way main.rs does - let inference_config = - read_inference_config("examples/promql/inference_config.yaml", QueryLanguage::promql)?; + let inference_config = read_inference_config( + "examples/promql/inference_config.yaml", + QueryLanguage::promql, + )?; println!( "Loaded inference config with {} query configs", inference_config.query_configs.len() @@ -101,13 +103,12 @@ async fn main() -> Result<(), Box> { println!("\n=== Starting precompute engine (ingest={INGEST_PORT}, query={QUERY_PORT}) ==="); // Create store - let store: Arc = Arc::new( - SimpleMapStore::new_with_strategy( + let store: Arc = + Arc::new(SimpleMapStore::new_with_strategy( streaming_config.clone(), cleanup_policy, LockStrategy::PerKey, - ), - ); + )); // Start query server let query_engine = Arc::new(SimpleEngine::new( @@ -207,9 +208,21 @@ async fn main() -> Result<(), Box> { // Use the exact query pattern from inference_config let queries_instant = vec![ - ("quantile by (label_0) (0.99, 
fake_metric)", "10", "Configured query at t=10"), - ("quantile by (label_0) (0.99, fake_metric)", "15", "Configured query at t=15"), - ("sum_over_time(fake_metric[1s])", "10", "Temporal: sum_over_time at t=10"), + ( + "quantile by (label_0) (0.99, fake_metric)", + "10", + "Configured query at t=10", + ), + ( + "quantile by (label_0) (0.99, fake_metric)", + "15", + "Configured query at t=15", + ), + ( + "sum_over_time(fake_metric[1s])", + "10", + "Temporal: sum_over_time at t=10", + ), ("sum(fake_metric)", "10", "Spatial: sum at t=10"), ]; @@ -272,11 +285,7 @@ async fn main() -> Result<(), Box> { raw_mode_aggregation_id: raw_agg_id, }; let raw_sink = Arc::new(RawPassthroughSink::new(store.clone())); - let raw_engine = PrecomputeEngine::new( - raw_engine_config, - streaming_config.clone(), - raw_sink, - ); + let raw_engine = PrecomputeEngine::new(raw_engine_config, streaming_config.clone(), raw_sink); tokio::spawn(async move { if let Err(e) = raw_engine.run().await { eprintln!("Raw precompute engine error: {e}"); @@ -299,7 +308,10 @@ async fn main() -> Result<(), Box> { .body(body) .send() .await?; - println!(" Sent raw t={ts}ms v={val} -> HTTP {}", resp.status().as_u16()); + println!( + " Sent raw t={ts}ms v={val} -> HTTP {}", + resp.status().as_u16() + ); } // Short wait for processing (no watermark advancement needed) @@ -307,12 +319,7 @@ async fn main() -> Result<(), Box> { // Verify raw samples appeared in the store println!("\n=== Verifying raw samples in store ==="); - let results = store.query_precomputed_output( - "fake_metric", - raw_agg_id, - 100_000, - 103_000, - )?; + let results = store.query_precomputed_output("fake_metric", raw_agg_id, 100_000, 103_000)?; let total_buckets: usize = results.values().map(|v| v.len()).sum(); println!(" Found {total_buckets} buckets for aggregation_id={raw_agg_id} in [100000, 103000)"); assert!( diff --git a/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs b/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs index 428c9de..dd45029 100644 --- a/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs +++ b/asap-query-engine/src/drivers/ingest/prometheus_remote_write.rs @@ -1,5 +1,6 @@ +use prost::Message; + // use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router}; -// use prost::Message; // use std::sync::Arc; // use tokio::net::TcpListener; // use tracing::{debug, error, info, warn}; @@ -7,137 +8,137 @@ // // use crate::stores::promsketch_store::metrics as ps_metrics; // // use crate::stores::promsketch_store::PromSketchStore; -// // --------------------------------------------------------------------------- -// // Protobuf message types (Prometheus remote write wire format) -// // --------------------------------------------------------------------------- -// // These mirror the upstream proto definitions in prometheus/prompb but are -// // defined inline via prost derive macros so we don't need a .proto file or -// // build script. - -// #[derive(Clone, PartialEq, Message)] -// pub struct WriteRequest { -// #[prost(message, repeated, tag = "1")] -// pub timeseries: Vec, -// } - -// #[derive(Clone, PartialEq, Message)] -// pub struct TimeSeries { -// #[prost(message, repeated, tag = "1")] -// pub labels: Vec