diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3ea0852 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +target/ +.git/ diff --git a/.gitignore b/.gitignore index ffbe6e0..d09d69f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ target/ experiment_outputs/ +asap-quickstart/bin/ # Runtime and generated files metadata/ diff --git a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs index 37b6fa0..5a7b800 100644 --- a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use core::panic; use serde::{Deserialize, Serialize}; use serde_yaml::Value; use std::collections::HashMap; @@ -83,7 +82,12 @@ impl StreamingConfig { if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) { for aggregation_data in aggregations { if let Some(aggregation_id) = aggregation_data.get("aggregationId") { - let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap(); + let aggregation_id_u64 = aggregation_id.as_u64().ok_or_else(|| { + anyhow::anyhow!( + "aggregationId must be a valid u64, got: {:?}", + aggregation_id + ) + })?; let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64); let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64); let config = AggregationConfig::from_yaml_data( diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 05dddb2..b432627 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -72,6 +72,10 @@ path = "src/bin/test_e2e_precompute.rs" name = "bench_precompute_sketch" path = "src/bin/bench_precompute_sketch.rs" +[[bin]] +name = "e2e_quickstart_resource_test" +path = "src/bin/e2e_quickstart_resource_test.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" diff --git a/asap-query-engine/Dockerfile 
b/asap-query-engine/Dockerfile index c036e1a..f54cdce 100644 --- a/asap-query-engine/Dockerfile +++ b/asap-query-engine/Dockerfile @@ -31,6 +31,7 @@ RUN mkdir -p asap-query-engine/src/bin \ && echo "fn main() {}" > asap-query-engine/src/bin/precompute_engine.rs \ && echo "fn main() {}" > asap-query-engine/src/bin/test_e2e_precompute.rs \ && echo "fn main() {}" > asap-query-engine/src/bin/bench_precompute_sketch.rs \ + && echo "fn main() {}" > asap-query-engine/src/bin/e2e_quickstart_resource_test.rs \ && mkdir -p asap-query-engine/benches && echo "fn main() {}" > asap-query-engine/benches/simple_store_bench.rs \ && mkdir -p asap-planner-rs/src && echo "fn main() {}" > asap-planner-rs/src/main.rs \ && echo "pub fn placeholder() {}" >> asap-planner-rs/src/lib.rs diff --git a/asap-query-engine/src/bin/e2e_quickstart_resource_test.rs b/asap-query-engine/src/bin/e2e_quickstart_resource_test.rs new file mode 100644 index 0000000..fb1e790 --- /dev/null +++ b/asap-query-engine/src/bin/e2e_quickstart_resource_test.rs @@ -0,0 +1,420 @@ +//! E2E resource usage test for the precompute engine with quickstart-like data patterns. +//! +//! Simulates 7 fake exporters × 27,000 series each = 189,000 series of `sensor_reading`, +//! scraped at 1s intervals via Prometheus remote write, matching the quickstart setup. +//! After 10 seconds of ingestion, reports CPU and memory usage. +//! +//! Usage: +//! 
cargo run --release --bin e2e_quickstart_resource_test + +use asap_types::aggregation_config::AggregationConfig; +use asap_types::enums::WindowType; +use promql_utilities::query_logics::enums::AggregationType; +use prost::Message; +use query_engine_rust::data_model::{CleanupPolicy, LockStrategy, StreamingConfig}; +use query_engine_rust::drivers::ingest::prometheus_remote_write::{ + Label, Sample, TimeSeries, WriteRequest, +}; +use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; +use query_engine_rust::precompute_engine::output_sink::StoreOutputSink; +use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::stores::{SimpleMapStore, Store}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +const INGEST_PORT: u16 = 19400; +const NUM_WORKERS: usize = 4; +const DURATION_SECS: u64 = 10; + +// Quickstart pattern: 7 exporters × 30×30×30 = 189,000 series +const PATTERNS: &[&str] = &[ + "constant", + "linear-up", + "linear-down", + "sine", + "sine-noise", + "step", + "exp-up", +]; +const NUM_REGIONS: usize = 30; +const NUM_SERVICES: usize = 30; +const NUM_HOSTS: usize = 30; + +fn build_remote_write_body(timeseries: Vec<TimeSeries>) -> Vec<u8> { + let write_req = WriteRequest { timeseries }; + let proto_bytes = write_req.encode_to_vec(); + snap::raw::Encoder::new() + .compress_vec(&proto_bytes) + .expect("snappy compress failed") +} + +fn make_sensor_reading( + pattern: &str, + region: &str, + service: &str, + host: &str, + instance: &str, + timestamp_ms: i64, + value: f64, +) -> TimeSeries { + TimeSeries { + labels: vec![ + Label { + name: "__name__".into(), + value: "sensor_reading".into(), + }, + Label { + name: "host".into(), + value: host.into(), + }, + Label { + name: "instance".into(), + value: instance.into(), + }, + Label { + name: "job".into(), + value: "pattern-exporters".into(), + }, + Label { + name: "pattern".into(), + value: pattern.into(), + }, + Label { + name:
"region".into(), + value: region.into(), + }, + Label { + name: "service".into(), + value: service.into(), + }, + ], + samples: vec![Sample { + value, + timestamp: timestamp_ms, + }], + } +} + +/// Generate a value based on pattern type and timestamp +fn pattern_value(pattern: &str, t_secs: f64, base: f64) -> f64 { + match pattern { + "constant" => base * 1000.0, + "linear-up" => base * 1000.0 + t_secs * 10.0, + "linear-down" => base * 1000.0 - t_secs * 10.0, + "sine" => base * 1000.0 + 500.0 * (t_secs * std::f64::consts::PI / 30.0).sin(), + "sine-noise" => { + base * 1000.0 + + 500.0 * (t_secs * std::f64::consts::PI / 30.0).sin() + + 50.0 * ((t_secs * 7.3).sin()) + } + "step" => { + if (t_secs as i64 / 10) % 2 == 0 { + base * 1000.0 + } else { + base * 1000.0 + 500.0 + } + } + "exp-up" => base * 1000.0 * (1.0 + t_secs * 0.01).powf(2.0), + _ => base * 1000.0, + } +} + +fn make_kll_streaming_config() -> Arc<StreamingConfig> { + // Match quickstart: DatasketchesKLL, K=200, quantile by (pattern), window=10s tumbling + let mut params = HashMap::new(); + params.insert("K".to_string(), serde_json::Value::from(200u64)); + + // Grouping by pattern (spatial key), rolling up region/service/host/instance/job + let grouping = promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![ + "pattern".to_string(), + ]); + let rollup = promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![ + "instance".to_string(), + "job".to_string(), + "region".to_string(), + "service".to_string(), + "host".to_string(), + ]); + let aggregated = promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]); + + let agg_config = AggregationConfig::new( + 1, + AggregationType::DatasketchesKLL, + String::new(), + params, + grouping, + rollup, + aggregated, + String::new(), + 10, // window size = 10s (matching quickstart range-duration/step) + 10, // tumbling + WindowType::Tumbling, + "sensor_reading".to_string(), + "sensor_reading".to_string(), + None, + None, +
None, + None, + ); + + let mut agg_map = HashMap::new(); + agg_map.insert(1u64, agg_config); + Arc::new(StreamingConfig::new(agg_map)) +} + +fn read_proc_status() -> (u64, u64, u64) { + // Returns (VmRSS in KB, VmPeak in KB, VmSize in KB) + let status = std::fs::read_to_string("/proc/self/status").unwrap_or_default(); + let mut vm_rss = 0u64; + let mut vm_peak = 0u64; + let mut vm_size = 0u64; + for line in status.lines() { + if line.starts_with("VmRSS:") { + vm_rss = line + .split_whitespace() + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + } else if line.starts_with("VmPeak:") { + vm_peak = line + .split_whitespace() + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + } else if line.starts_with("VmSize:") { + vm_size = line + .split_whitespace() + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + } + } + (vm_rss, vm_peak, vm_size) +} + +fn read_proc_cpu_time() -> (f64, f64) { + // Returns (user_time_secs, system_time_secs) from /proc/self/stat + let stat = std::fs::read_to_string("/proc/self/stat").unwrap_or_default(); + let parts: Vec<&str> = stat.split_whitespace().collect(); + if parts.len() > 14 { + let ticks_per_sec = 100.0; // typical Linux CLK_TCK + let utime = parts[13].parse::<f64>().unwrap_or(0.0) / ticks_per_sec; + let stime = parts[14].parse::<f64>().unwrap_or(0.0) / ticks_per_sec; + (utime, stime) + } else { + (0.0, 0.0) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")), + ) + .init(); + + let streaming_config = make_kll_streaming_config(); + let store: Arc<dyn Store> = Arc::new(SimpleMapStore::new_with_strategy( + streaming_config.clone(), + CleanupPolicy::CircularBuffer, + LockStrategy::PerKey, + )); + + let engine_config = PrecomputeEngineConfig { + num_workers: NUM_WORKERS, + ingest_port: INGEST_PORT, + allowed_lateness_ms: 5_000, + max_buffer_per_series:
10_000, + flush_interval_ms: 1_000, + channel_buffer_size: 50_000, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }; + let output_sink = Arc::new(StoreOutputSink::new(store.clone())); + let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink); + tokio::spawn(async move { + if let Err(e) = engine.run().await { + eprintln!("Precompute engine error: {e}"); + } + }); + + // Wait for server to bind + tokio::time::sleep(Duration::from_secs(1)).await; + + let series_per_pattern = NUM_REGIONS * NUM_SERVICES * NUM_HOSTS; // 27,000 + let total_series = PATTERNS.len() * series_per_pattern; // 189,000 + + println!("=== Precompute Engine E2E Resource Test ==="); + println!(" Patterns: {} ({:?})", PATTERNS.len(), PATTERNS); + println!( + " Series per pattern: {} ({}×{}×{})", + series_per_pattern, NUM_REGIONS, NUM_SERVICES, NUM_HOSTS + ); + println!(" Total series: {}", total_series); + println!(" Workers: {}", NUM_WORKERS); + println!(" Duration: {}s", DURATION_SECS); + println!(" Aggregation: DatasketchesKLL K=200, tumbling 10s, group by pattern"); + println!(); + + let (rss_before, _, _) = read_proc_status(); + let (cpu_user_before, cpu_sys_before) = read_proc_cpu_time(); + println!( + "Before ingestion: VmRSS = {} KB ({:.1} MB)", + rss_before, + rss_before as f64 / 1024.0 + ); + + let client = reqwest::Client::builder() + .pool_max_idle_per_host(8) + .build()?; + + let start = Instant::now(); + let mut total_samples_sent = 0u64; + let mut tick = 0u64; + + println!("\n--- Sending data (simulating Prometheus scrape at 1s intervals) ---"); + + while start.elapsed() < Duration::from_secs(DURATION_SECS) { + let tick_start = Instant::now(); + let timestamp_ms = (tick * 1000 + 500) as i64; // mid-second + let t_secs = tick as f64; + + // Build all timeseries for this tick. + // In the quickstart, Prometheus batches all scraped series into remote write. 
+ // We send in chunks to avoid building a single massive request. + let chunk_size = 10_000; // series per HTTP request + let mut all_timeseries = Vec::with_capacity(total_series); + + for (p_idx, pattern) in PATTERNS.iter().enumerate() { + let instance = format!("fake-exporter-{}:5000{}", pattern, p_idx); + for r in 0..NUM_REGIONS { + let region = format!("region{}", r); + for s in 0..NUM_SERVICES { + let service = format!("svc{}", s); + for h in 0..NUM_HOSTS { + let host = format!("host{}", h); + let base = (r * NUM_SERVICES * NUM_HOSTS + s * NUM_HOSTS + h) as f64 + / (series_per_pattern as f64); + let value = pattern_value(pattern, t_secs, base); + all_timeseries.push(make_sensor_reading( + pattern, + &region, + &service, + &host, + &instance, + timestamp_ms, + value, + )); + } + } + } + } + + // Send in parallel chunks + let mut handles = Vec::new(); + for chunk in all_timeseries.chunks(chunk_size) { + let body = build_remote_write_body(chunk.to_vec()); + let client = client.clone(); + handles.push(tokio::spawn(async move { + let resp = client + .post(format!("http://localhost:{INGEST_PORT}/api/v1/write")) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "snappy") + .body(body) + .send() + .await; + matches!(resp, Ok(r) if r.status().is_success() || r.status() == reqwest::StatusCode::NO_CONTENT) + })); + } + + let mut all_ok = true; + for handle in handles { + if !handle.await.unwrap_or(false) { + all_ok = false; + } + } + + total_samples_sent += total_series as u64; + let send_time = tick_start.elapsed(); + + if tick.is_multiple_of(2) || !all_ok { + println!( + " tick={} t={}ms samples={} send_time={:.0}ms ok={}", + tick, + timestamp_ms, + total_series, + send_time.as_secs_f64() * 1000.0, + all_ok + ); + } + + tick += 1; + + // Sleep until next 1-second tick + let elapsed_in_tick = tick_start.elapsed(); + if elapsed_in_tick < Duration::from_secs(1) { + tokio::time::sleep(Duration::from_secs(1) - elapsed_in_tick).await; + } + }
+ let wall_time = start.elapsed(); + + // Wait a bit for processing to finish + tokio::time::sleep(Duration::from_secs(2)).await; + + let (rss_after, vm_peak, vm_size) = read_proc_status(); + let (cpu_user_after, cpu_sys_after) = read_proc_cpu_time(); + + let cpu_user = cpu_user_after - cpu_user_before; + let cpu_sys = cpu_sys_after - cpu_sys_before; + let cpu_total = cpu_user + cpu_sys; + + println!("\n=== Resource Usage Report (after {}s) ===", DURATION_SECS); + println!(" Wall time: {:.1}s", wall_time.as_secs_f64()); + println!(" Ticks completed: {}", tick); + println!(" Total samples sent: {}", total_samples_sent); + println!( + " Avg throughput: {:.0} samples/sec", + total_samples_sent as f64 / wall_time.as_secs_f64() + ); + println!(); + println!(" --- Memory ---"); + println!( + " VmRSS (current): {} KB ({:.1} MB)", + rss_after, + rss_after as f64 / 1024.0 + ); + println!( + " VmPeak: {} KB ({:.1} MB)", + vm_peak, + vm_peak as f64 / 1024.0 + ); + println!( + " VmSize: {} KB ({:.1} MB)", + vm_size, + vm_size as f64 / 1024.0 + ); + println!( + " RSS delta: {} KB ({:.1} MB)", + rss_after.saturating_sub(rss_before), + rss_after.saturating_sub(rss_before) as f64 / 1024.0 + ); + println!(); + println!(" --- CPU ---"); + println!(" User time: {:.2}s", cpu_user); + println!(" System time: {:.2}s", cpu_sys); + println!(" Total CPU time: {:.2}s", cpu_total); + println!( + " CPU utilization: {:.1}% (of {:.1}s wall time)", + cpu_total / wall_time.as_secs_f64() * 100.0, + wall_time.as_secs_f64() + ); + + println!("\n=== Test complete ==="); + + Ok(()) +} diff --git a/asap-query-engine/src/data_model/enums.rs b/asap-query-engine/src/data_model/enums.rs index 885fbee..d8c4459 100644 --- a/asap-query-engine/src/data_model/enums.rs +++ b/asap-query-engine/src/data_model/enums.rs @@ -4,9 +4,10 @@ pub enum InputFormat { Byte, } -#[derive(clap::ValueEnum, Clone, Debug)] +#[derive(clap::ValueEnum, Clone, Debug, PartialEq)] pub enum StreamingEngine { Arroyo, + Precompute, } pub 
use asap_types::enums::{CleanupPolicy, QueryLanguage, WindowType}; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 0be1d13..fa589aa 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -10,6 +10,7 @@ use sketch_core::config::{self, ImplMode}; use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingEngine}; use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::precompute_engine::config::LateDataPolicy; +use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, @@ -20,13 +21,13 @@ use query_engine_rust::{ #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { - /// Kafka topic to consume from + /// Kafka topic to consume from (required when streaming-engine=arroyo) #[arg(long)] - kafka_topic: String, + kafka_topic: Option<String>, - /// Input format for Kafka messages + /// Input format for Kafka messages (required when streaming-engine=arroyo) #[arg(long, value_enum)] - input_format: InputFormat, + input_format: Option<InputFormat>, /// Configuration file path #[arg(long)] @@ -37,7 +38,7 @@ struct Args { streaming_config: String, /// Streaming engine to use - #[arg(long, value_enum)] + #[arg(long, value_enum, default_value = "arroyo")] streaming_engine: StreamingEngine, /// Prometheus scrape interval in seconds @@ -241,41 +242,54 @@ async fn main() -> Result<()> { args.query_language, )); - // Setup Kafka consumer (equivalent to Python's kafka_thread) - let kafka_config = KafkaConsumerConfig { - broker: args.kafka_broker.clone(), - topic: args.kafka_topic.clone(), - group_id: "query-engine-rust".to_string(), - auto_offset_reset: "beginning".to_string(), - input_format: args.input_format, - decompress_json: args.decompress_json, - batch_size: 1000, -
poll_timeout_ms: 1000, - streaming_engine: args.streaming_engine.clone(), - dump_precomputes: args.dump_precomputes, - dump_output_dir: if args.dump_precomputes { - Some(args.output_dir.clone()) - } else { - None - }, - }; + // Setup Kafka consumer (only when not using precompute engine as the streaming backend) + let kafka_handle = if args.streaming_engine == StreamingEngine::Precompute { + info!("Using precompute engine as streaming backend — skipping Kafka consumer"); + None + } else { + let kafka_topic = args.kafka_topic.clone().unwrap_or_else(|| { + error!("--kafka-topic is required when --streaming-engine is not precompute"); + std::process::exit(1); + }); + let input_format = args.input_format.unwrap_or_else(|| { + error!("--input-format is required when --streaming-engine is not precompute"); + std::process::exit(1); + }); + let kafka_config = KafkaConsumerConfig { + broker: args.kafka_broker.clone(), + topic: kafka_topic.clone(), + group_id: "query-engine-rust".to_string(), + auto_offset_reset: "beginning".to_string(), + input_format, + decompress_json: args.decompress_json, + batch_size: 1000, + poll_timeout_ms: 1000, + streaming_engine: args.streaming_engine.clone(), + dump_precomputes: args.dump_precomputes, + dump_output_dir: if args.dump_precomputes { + Some(args.output_dir.clone()) + } else { + None + }, + }; - let store_for_kafka = store.clone(); - let kafka_consumer_result = - KafkaConsumer::new(kafka_config, store_for_kafka, streaming_config.clone()); - let kafka_handle = match kafka_consumer_result { - Ok(mut consumer) => { - info!("Starting Kafka consumer for topic: {}", args.kafka_topic); - Some(tokio::spawn(async move { - if let Err(e) = consumer.run().await { - error!("Kafka consumer error: {}", e); - } - })) - } - Err(e) => { - error!("Failed to create Kafka consumer: {}", e); - info!("Continuing without Kafka consumer"); - None + let store_for_kafka = store.clone(); + let kafka_consumer_result = + KafkaConsumer::new(kafka_config, 
store_for_kafka, streaming_config.clone()); + match kafka_consumer_result { + Ok(mut consumer) => { + info!("Starting Kafka consumer for topic: {}", kafka_topic); + Some(tokio::spawn(async move { + if let Err(e) = consumer.run().await { + error!("Kafka consumer error: {}", e); + } + })) + } + Err(e) => { + error!("Failed to create Kafka consumer: {}", e); + info!("Continuing without Kafka consumer"); + None + } } }; @@ -300,7 +314,10 @@ async fn main() -> Result<()> { }; // Setup precompute engine (replaces standalone Prometheus remote write server) - let precompute_handle = if args.enable_prometheus_remote_write { + // Automatically enable when using precompute streaming engine + let enable_precompute = + args.enable_prometheus_remote_write || args.streaming_engine == StreamingEngine::Precompute; + let precompute_handle = if enable_precompute { let precompute_config = PrecomputeEngineConfig { num_workers: args.precompute_num_workers, ingest_port: args.prometheus_remote_write_port, @@ -315,16 +332,29 @@ async fn main() -> Result<()> { let output_sink = Arc::new(StoreOutputSink::new(store.clone())); let engine = PrecomputeEngine::new(precompute_config, streaming_config.clone(), output_sink); + let worker_diagnostics = engine.diagnostics(); info!( "Starting precompute engine on port {}", args.prometheus_remote_write_port ); + + // Spawn periodic memory diagnostics logger + let diag_store = store.clone(); + tokio::spawn(async move { + spawn_memory_diagnostics(diag_store, Some(worker_diagnostics)).await; + }); + Some(tokio::spawn(async move { if let Err(e) = engine.run().await { error!("Precompute engine error: {}", e); } })) } else { + // Even without precompute, log store diagnostics + let diag_store = store.clone(); + tokio::spawn(async move { + spawn_memory_diagnostics(diag_store, None).await; + }); None }; @@ -419,6 +449,59 @@ async fn main() -> Result<()> { Ok(()) } +/// Periodic memory diagnostics logger — runs every 30 seconds. 
+async fn spawn_memory_diagnostics( + store: Arc<dyn Store>, + worker_diagnostics: Option<Arc<PrecomputeWorkerDiagnostics>>, +) { + use std::sync::atomic::Ordering; + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + loop { + interval.tick().await; + + // 1. Store diagnostics + let store_diag = store.diagnostic_info(); + info!( + "[MEMORY_DIAG] Store: {} aggregation(s), {} total time_map entries, {:.2} KB total sketch bytes", + store_diag.num_aggregations, + store_diag.total_time_map_entries, + store_diag.total_sketch_bytes as f64 / 1024.0, + ); + for agg in &store_diag.per_aggregation { + info!( + "[MEMORY_DIAG] agg_id={}: time_map_len={}, read_counts_len={}, aggregate_objects={}, sketch_bytes={:.2} KB", + agg.aggregation_id, + agg.time_map_len, + agg.read_counts_len, + agg.num_aggregate_objects, + agg.sketch_bytes as f64 / 1024.0, + ); + } + + // 2. Worker diagnostics (precompute engine only) + if let Some(ref diag) = worker_diagnostics { + let total_groups: usize = diag + .worker_group_counts + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .sum(); + info!( + "[MEMORY_DIAG] PrecomputeEngine: {} total groups across {} workers", + total_groups, + diag.worker_group_counts.len(), + ); + for (i, counter) in diag.worker_group_counts.iter().enumerate() { + info!( + "[MEMORY_DIAG] worker_{}: group_states_len={}", + i, + counter.load(Ordering::Relaxed), + ); + } + } + } +} + fn setup_logging( output_dir: &str, log_level: &str, diff --git a/asap-query-engine/src/precompute_engine/engine.rs b/asap-query-engine/src/precompute_engine/engine.rs index 4ae38ab..8fd45b8 100644 --- a/asap-query-engine/src/precompute_engine/engine.rs +++ b/asap-query-engine/src/precompute_engine/engine.rs @@ -9,11 +9,18 @@ use crate::precompute_engine::worker::{Worker, WorkerRuntimeConfig}; use asap_types::aggregation_config::AggregationConfig; use axum::{routing::post, Router}; use std::collections::HashMap; +use std::sync::atomic::{AtomicI64, AtomicUsize}; use std::sync::Arc; use tokio::net::TcpListener;
use tokio::sync::mpsc; use tracing::{info, warn}; +/// Shared diagnostic counters readable from outside the engine. +pub struct PrecomputeWorkerDiagnostics { + pub worker_group_counts: Vec<Arc<AtomicUsize>>, + pub worker_watermarks: Vec<Arc<AtomicI64>>, +} + /// The top-level precompute engine orchestrator. /// /// Creates worker threads, the series router, and the Axum ingest server. @@ -21,6 +28,7 @@ pub struct PrecomputeEngine { config: PrecomputeEngineConfig, streaming_config: Arc<StreamingConfig>, output_sink: Arc<dyn OutputSink>, + diagnostics: Arc<PrecomputeWorkerDiagnostics>, } impl PrecomputeEngine { @@ -29,13 +37,29 @@ impl PrecomputeEngine { streaming_config: Arc<StreamingConfig>, output_sink: Arc<dyn OutputSink>, ) -> Self { + let worker_group_counts = (0..config.num_workers) + .map(|_| Arc::new(AtomicUsize::new(0))) + .collect(); + let worker_watermarks = (0..config.num_workers) + .map(|_| Arc::new(AtomicI64::new(i64::MIN))) + .collect(); + let diagnostics = Arc::new(PrecomputeWorkerDiagnostics { + worker_group_counts, + worker_watermarks, + }); Self { config, streaming_config, output_sink, + diagnostics, } } + /// Get a handle to worker diagnostics, readable even after `run()` starts. + pub fn diagnostics(&self) -> Arc<PrecomputeWorkerDiagnostics> { + self.diagnostics.clone() + } + /// Start the precompute engine. This spawns worker tasks and the HTTP /// ingest server, then blocks until shutdown.
pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> { @@ -63,6 +87,9 @@ impl PrecomputeEngine { .map(|(&id, cfg)| (id, Arc::new(cfg.clone()))) .collect(); + // Build a Vec<Arc<AggregationConfig>> for the ingest handler + let agg_configs_vec: Vec<Arc<AggregationConfig>> = agg_configs.values().cloned().collect(); + // Spawn workers let mut worker_handles = Vec::with_capacity(num_workers); for (id, rx) in receivers.into_iter().enumerate() { @@ -78,6 +105,9 @@ raw_mode_aggregation_id: self.config.raw_mode_aggregation_id, late_data_policy: self.config.late_data_policy, }, + self.diagnostics.worker_group_counts[id].clone(), + self.diagnostics.worker_watermarks[id].clone(), + self.diagnostics.worker_watermarks.to_vec(), ); let handle = tokio::spawn(async move { worker.run().await; @@ -94,6 +124,8 @@ let ingest_state = Arc::new(IngestState { router, samples_ingested: std::sync::atomic::AtomicU64::new(0), + agg_configs: agg_configs_vec, + pass_raw_samples: self.config.pass_raw_samples, }); // Start flush timer diff --git a/asap-query-engine/src/precompute_engine/ingest_handler.rs b/asap-query-engine/src/precompute_engine/ingest_handler.rs index 57537dd..03b9ac5 100644 --- a/asap-query-engine/src/precompute_engine/ingest_handler.rs +++ b/asap-query-engine/src/precompute_engine/ingest_handler.rs @@ -1,6 +1,8 @@ use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write; use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write; -use crate::precompute_engine::series_router::SeriesRouter; +use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; +use crate::precompute_engine::worker::{extract_metric_name, parse_labels_from_series_key}; +use asap_types::aggregation_config::AggregationConfig; use axum::{body::Bytes, extract::State, http::StatusCode}; use std::collections::HashMap; use std::sync::Arc; @@ -11,9 +13,28 @@ use tracing::warn; pub(crate) struct IngestState { pub(crate) router: SeriesRouter,
pub(crate) samples_ingested: std::sync::atomic::AtomicU64, + /// Aggregation configs for group-key extraction. + pub(crate) agg_configs: Vec<Arc<AggregationConfig>>, + /// When true, skip group-key extraction and pass raw samples through. + pub(crate) pass_raw_samples: bool, } -/// Shared logic: group decoded samples by series key and route to workers. +/// Extract the group key (grouping label values joined by semicolons) +/// for a given series key and aggregation config. +fn extract_group_key(series_key: &str, config: &AggregationConfig) -> String { + let labels = parse_labels_from_series_key(series_key); + let mut values = Vec::new(); + for label_name in &config.grouping_labels.labels { + if let Some(val) = labels.get(label_name.as_str()) { + values.push(*val); + } else { + values.push(""); + } + } + values.join(";") +} + +/// Shared logic: group decoded samples by (agg_id, group_key) and route to workers. async fn route_decoded_samples( state: &IngestState, samples: Vec, @@ -28,25 +49,75 @@ .samples_ingested .fetch_add(count, std::sync::atomic::Ordering::Relaxed); - // Group samples by series key for batch routing - let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); + if state.pass_raw_samples { + // Raw mode: group by series key and send as RawSamples + let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); + for s in &samples { + by_series + .entry(&s.labels) + .or_default() + .push((s.timestamp_ms, s.value)); + } + let messages: Vec<WorkerMessage> = by_series + .into_iter() + .map(|(k, v)| WorkerMessage::RawSamples { + series_key: k.to_string(), + samples: v, + ingest_received_at, + }) + .collect(); + + if let Err(e) = state + .router + .route_group_batch(messages, ingest_received_at) + .await + { + warn!("Batch routing error: {}", e); + return StatusCode::INTERNAL_SERVER_ERROR; + } + return StatusCode::NO_CONTENT; + } + + // Group-by mode: for each sample, find matching agg configs and group by + // (agg_id, group_key).
This is the equivalent of Arroyo's GROUP BY. + // + // Key: (agg_id, group_key) → Vec<(series_key, timestamp_ms, value)> + type GroupKey = (u64, String); + type SampleTuple = (String, i64, f64); + let mut by_group: HashMap<GroupKey, Vec<SampleTuple>> = HashMap::new(); + for s in &samples { - by_series - .entry(&s.labels) - .or_default() - .push((s.timestamp_ms, s.value)); + let metric_name = extract_metric_name(&s.labels); + for config in &state.agg_configs { + if config.metric != metric_name + && config.spatial_filter_normalized != metric_name + && config.spatial_filter != metric_name + { + continue; + } + let group_key = extract_group_key(&s.labels, config); + by_group + .entry((config.aggregation_id, group_key)) + .or_default() + .push((s.labels.clone(), s.timestamp_ms, s.value)); + } } - // Convert to owned keys for batch routing - let by_series_owned: HashMap<String, Vec<(i64, f64)>> = by_series + let messages: Vec<WorkerMessage> = by_group .into_iter() - .map(|(k, v)| (k.to_string(), v)) + .map( + |((agg_id, group_key), samples)| WorkerMessage::GroupSamples { + agg_id, + group_key, + samples, + ingest_received_at, + }, + ) .collect(); - // Route all series to workers concurrently if let Err(e) = state .router - .route_batch(by_series_owned, ingest_received_at) + .route_group_batch(messages, ingest_received_at) .await { warn!("Batch routing error: {}", e); diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index 7b960ca..0edda2f 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -8,4 +8,4 @@ pub mod series_router; pub mod window_manager; pub mod worker; -pub use engine::PrecomputeEngine; +pub use engine::{PrecomputeEngine, PrecomputeWorkerDiagnostics}; diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index 5ab706f..55070a1 100644 ---
a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -51,6 +51,122 @@ A periodic **flush timer** broadcasts `Flush` messages to all workers so that windows that would otherwise remain open (no new samples arriving) are closed and emitted. +## 2.1 Watermark Propagation + +### How watermarks work + +A watermark is a monotonically increasing timestamp assertion: **"no more events +with timestamp <= W will arrive."** It tells the system when a time window can +be safely closed and its results emitted. + +``` +Time ──────────────────────────────────────────────────────────► + +Event Stream (arriving out of order): + t=3 t=1 t=5 t=2 t=7 t=4 t=9 t=6 t=11 t=8 t=13 + ● ● ● ● ● ● ● ● ● ● ● + +Watermark (max_ts - allowed_lateness, where lateness=2): + W=1 W=1 W=3 W=3 W=5 W=5 W=7 W=7 W=9 W=9 W=11 + ─────┘ ─────┘ ─────┘ ─────┘ ─────┘ │ + (no advance, (advances) │ + older event) │ + +Window Lifecycle (window_size=5, slide=5): + │ + ┌─────────────────────┐ │ + │ Window [0, 5) │ │ + │ collects: t=3,1,2,4│ │ + │ │── W=5 crosses end ──► EMIT │ + └─────────────────────┘ │ + │ + ┌─────────────────────┐ │ + │ Window [5, 10) │ │ + │ collects: t=5,7,9,6│ │ + │ │── W=11 crosses end │ + └─────────────────────┘ ──► EMIT │ + │ + ┌─────────────────────┐ │ + │ Window [10, 15) │ │ + │ collects: t=11,13 │ (still open, │ + │ ...waiting... │ W=11 < 15) │ + └─────────────────────┘ │ + + +Late data handling: + + Timeline: ... t=6 t=10 t=3 ... + ● ● ● + │ + Watermark W=8 ─┘ + t=3 < W - allowed_lateness(2) = 6? + 3 < 6 → YES, late → DROP (or ForwardToStore) +``` + +### Cross-group watermark propagation + +Without cross-group propagation, each group tracks its own watermark +independently. If a group stops receiving data, its watermark freezes and its +windows never close. 
Cross-group propagation solves this with two layers: + +**Layer 1 — Intra-worker (max):** Each worker computes its worker watermark as +`max(all group watermarks)`. This represents "time has progressed to at least +here on this worker." During each flush, idle groups are advanced to the worker +watermark. + +**Layer 2 — Cross-worker (min):** Each worker publishes its worker watermark to +a shared `Arc`. The global watermark is `min(all worker watermarks)`, +ignoring workers that have not yet started. This becomes the floor for all group +watermarks across all workers. + +``` + ┌──────────────────────────────────────────┐ + │ Shared Atomics │ + │ AtomicI64[0] AtomicI64[1] AtomicI64[2]│ + │ 100s 80s 90s │ + └──┬──────────────┬──────────────┬─────────┘ + │ store │ store │ store + │ (Release) │ (Release) │ (Release) + ┌────────┴───┐ ┌───────┴────┐ ┌─────┴──────┐ + │ Worker 0 │ │ Worker 1 │ │ Worker 2 │ + │ │ │ │ │ │ + │ Groups: │ │ Groups: │ │ Groups: │ + │ A: wm=100s│ │ C: wm=80s│ │ E: wm=90s│ + │ B: wm=50s │ │ D: wm=80s│ │ F: wm=30s│ + │ │ │ │ │ │ + │ worker_wm │ │ worker_wm │ │ worker_wm │ + │ = max(A,B) │ │ = max(C,D) │ │ = max(E,F) │ + │ = 100s │ │ = 80s │ │ = 90s │ + └────────────┘ └────────────┘ └────────────┘ + │ load all │ load all │ load all + │ (Acquire) │ (Acquire) │ (Acquire) + ▼ ▼ ▼ + global_wm = min(100s, 80s, 90s) = 80s + + On flush, each group's effective watermark becomes: + max(group_wm, global_wm) + 1ms + + Worker 0: Group B (50s) → advanced to 80s → closes [50s, 80s] windows + Worker 2: Group F (30s) → advanced to 80s → closes [30s, 80s] windows +``` + +**Why max within a worker?** We want to propagate forward progress from active +groups to idle groups on the same worker. + +**Why min across workers?** Conservative: only advance as far as ALL workers +agree time has progressed. If worker 1 is behind at 80s, we should not close +windows at 90s on worker 2 because worker 1 might still send data for those +windows. 
+ +**Staleness:** Because workers read each other's atomics during flush, the +global watermark may be up to one `flush_interval_ms` (default 1s) stale. +This is acceptable — it only means idle groups close windows one flush cycle +later than they theoretically could. + +**Unstarted workers:** Workers that have not yet received any data remain at +`i64::MIN` and are excluded from the global watermark min calculation. This +prevents a cold worker from blocking the entire system. + ## 3. Components ### 3.1 PrecomputeEngine (`mod.rs`) diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index 94d757a..45ca1b2 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -7,20 +7,34 @@ use xxhash_rust::xxh64::xxh64; /// A message sent from the router to a worker. #[derive(Debug)] pub enum WorkerMessage { - /// A batch of samples for the same series. - Samples { + /// A batch of samples for the same series, routed by series key. + /// Used in `pass_raw_samples` mode where no aggregation is needed. + RawSamples { series_key: String, samples: Vec<(i64, f64)>, // (timestamp_ms, value) ingest_received_at: Instant, }, + /// A batch of samples destined for a specific aggregation group. + /// All samples share the same (agg_id, group_key) and are fed into + /// a single shared accumulator (like Arroyo's GROUP BY). + GroupSamples { + agg_id: u64, + /// Grouping label values joined by semicolons (e.g. "constant"). + /// Empty string if the aggregation has no grouping labels. + group_key: String, + /// Each entry: (series_key, timestamp_ms, value). + /// series_key is needed for keyed (MultipleSubpopulation) accumulators + /// to extract the aggregated-label key. + samples: Vec<(String, i64, f64)>, + ingest_received_at: Instant, + }, /// Signal the worker to flush/check idle windows. Flush, /// Graceful shutdown. 
Shutdown, } -/// Routes incoming samples to one of N workers based on a consistent hash -/// of the series label string. +/// Routes incoming samples to one of N workers based on a consistent hash. pub struct SeriesRouter { senders: Vec>, num_workers: usize, @@ -35,46 +49,26 @@ impl SeriesRouter { } } - /// Route a batch of samples for one series to the appropriate worker. - pub async fn route( - &self, - series_key: &str, - samples: Vec<(i64, f64)>, - ingest_received_at: Instant, - ) -> Result<(), Box> { - let worker_idx = self.worker_for(series_key); - self.senders[worker_idx] - .send(WorkerMessage::Samples { - series_key: series_key.to_string(), - samples, - ingest_received_at, - }) - .await - .map_err(|e| format!("Failed to send to worker {}: {}", worker_idx, e))?; - Ok(()) - } - - /// Route a pre-grouped batch of series to workers concurrently. + /// Route a pre-grouped batch of group messages to workers concurrently. /// - /// Groups messages by target worker, then sends to each worker in parallel - /// (messages within a single worker are sent sequentially to preserve ordering). - pub async fn route_batch( + /// Each `GroupSamples` message is routed by `hash(agg_id, group_key)`. + /// Messages within a single worker are sent sequentially to preserve ordering. + pub async fn route_group_batch( &self, - by_series: HashMap>, - ingest_received_at: Instant, + messages: Vec, + _ingest_received_at: Instant, ) -> Result<(), Box> { // Group messages by target worker index let mut per_worker: HashMap> = HashMap::new(); - for (series_key, samples) in by_series { - let worker_idx = self.worker_for(&series_key); - per_worker - .entry(worker_idx) - .or_default() - .push(WorkerMessage::Samples { - series_key, - samples, - ingest_received_at, - }); + for msg in messages { + let worker_idx = match &msg { + WorkerMessage::GroupSamples { + agg_id, group_key, .. + } => self.worker_for_group(*agg_id, group_key), + WorkerMessage::RawSamples { series_key, .. 
} => self.worker_for(series_key), + _ => 0, + }; + per_worker.entry(worker_idx).or_default().push(msg); } // Send to each worker concurrently @@ -120,7 +114,16 @@ impl SeriesRouter { Ok(()) } - /// Determine which worker handles a given series key. + /// Determine which worker handles a given group key. + fn worker_for_group(&self, agg_id: u64, group_key: &str) -> usize { + // Hash both agg_id and group_key together for consistent routing + let mut hash_input = agg_id.to_le_bytes().to_vec(); + hash_input.extend_from_slice(group_key.as_bytes()); + let hash = xxh64(&hash_input, 0); + (hash as usize) % self.num_workers + } + + /// Determine which worker handles a given series key (for raw mode). fn worker_for(&self, series_key: &str) -> usize { let hash = xxh64(series_key.as_bytes(), 0); (hash as usize) % self.num_workers @@ -132,8 +135,28 @@ mod tests { use super::*; #[test] - fn test_consistent_routing() { - // Build a router with dummy senders (we only test the hash logic) + fn test_consistent_group_routing() { + let (senders, _receivers): (Vec<_>, Vec<_>) = + (0..4).map(|_| mpsc::channel::(10)).unzip(); + + let router = SeriesRouter::new(senders); + + // Same (agg_id, group_key) should always go to the same worker + let w1 = router.worker_for_group(1, "constant"); + let w2 = router.worker_for_group(1, "constant"); + assert_eq!(w1, w2); + + // Different group keys may go to different workers + let _ = router.worker_for_group(1, "sine"); + assert!(router.worker_for_group(1, "linear-up") < 4); + + // Different agg_ids with same group key may go to different workers + let _ = router.worker_for_group(2, "constant"); + assert!(router.worker_for_group(2, "constant") < 4); + } + + #[test] + fn test_raw_mode_routing() { let (senders, _receivers): (Vec<_>, Vec<_>) = (0..4).map(|_| mpsc::channel::(10)).unzip(); @@ -143,10 +166,6 @@ mod tests { let w1 = router.worker_for("cpu{host=\"a\"}"); let w2 = router.worker_for("cpu{host=\"a\"}"); assert_eq!(w1, w2); - - // Different 
keys may go to different workers (probabilistic, but verifiable) - let _ = router.worker_for("cpu{host=\"b\"}"); - // Just ensure no panic and result is in range assert!(router.worker_for("mem{host=\"a\"}") < 4); } } diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index b7f5624..8be32e4 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -1,40 +1,33 @@ use crate::data_model::{AggregateCore, KeyByLabelValues, PrecomputedOutput}; use crate::precompute_engine::accumulator_factory::{ - config_is_keyed, create_accumulator_updater, AccumulatorUpdater, + create_accumulator_updater, AccumulatorUpdater, }; use crate::precompute_engine::config::LateDataPolicy; use crate::precompute_engine::output_sink::OutputSink; -use crate::precompute_engine::series_buffer::SeriesBuffer; use crate::precompute_engine::series_router::WorkerMessage; use crate::precompute_engine::window_manager::WindowManager; use crate::precompute_operators::sum_accumulator::SumAccumulator; use asap_types::aggregation_config::AggregationConfig; use std::collections::{BTreeMap, HashMap}; +use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::mpsc; use tracing::{debug, debug_span, info, warn}; -/// Per-aggregation state within a series: the window manager and active -/// pane accumulators. +/// Per-group aggregation state: window manager + active pane accumulators. +/// This is the equivalent of one (agg_id, group_key) in Arroyo's GROUP BY. /// -/// Uses pane-based sliding window computation: each sample is routed to -/// exactly 1 pane (sub-window of size `slide_interval`). When a window -/// closes, its constituent panes are merged. This reduces per-sample -/// accumulator updates from W to 1 (where W = window_size / slide_interval). 
-struct AggregationState { +/// All raw series sharing the same grouping label values feed into the same +/// accumulator, producing one output per (group_key, window) — exactly like +/// Arroyo's `GROUP BY window, key`. +struct GroupState { config: Arc, window_manager: WindowManager, /// Active panes keyed by pane_start_ms. - /// BTreeMap for ordered iteration (needed for pane eviction). active_panes: BTreeMap>, -} - -/// Per-series state owned by the worker. -struct SeriesState { - buffer: SeriesBuffer, + /// Per-group watermark: tracks the maximum timestamp seen across all + /// series in this group on this worker. previous_watermark_ms: i64, - /// One AggregationState per matching aggregation config. - aggregations: Vec, } /// Runtime configuration for a Worker, grouping non-structural parameters. @@ -46,17 +39,20 @@ pub struct WorkerRuntimeConfig { pub late_data_policy: LateDataPolicy, } -/// Worker that processes samples for a shard of the series space. +/// Worker that processes samples for a shard of the group space. +/// +/// Unlike the old per-series design, this worker maintains accumulators +/// keyed by `(agg_id, group_key)`. Multiple raw series with the same +/// grouping label values share a single accumulator, producing one merged +/// output per window — matching Arroyo's `GROUP BY` semantics. pub struct Worker { id: usize, receiver: mpsc::Receiver, output_sink: Arc, - /// Map from series key to per-series state. - series_map: HashMap, + /// Map from (agg_id, group_key) to per-group state. + group_states: HashMap<(u64, String), GroupState>, /// Aggregation configs, keyed by aggregation_id. agg_configs: HashMap>, - /// Max buffer size per series. - max_buffer_per_series: usize, /// Allowed lateness in ms. allowed_lateness_ms: i64, /// When true, skip aggregation and pass raw samples through. @@ -65,18 +61,29 @@ pub struct Worker { raw_mode_aggregation_id: u64, /// Policy for handling late samples that arrive after their window has closed. 
late_data_policy: LateDataPolicy, + /// This worker's watermark atomic, shared with engine for cross-worker reads. + /// Updated during flush with max(all group watermarks). + worker_watermark: Arc, + /// All worker watermark atomics (including self), for computing global watermark. + all_worker_watermarks: Vec>, + /// Externally-readable group count for diagnostics. + group_count: Arc, } impl Worker { + #[allow(clippy::too_many_arguments)] pub fn new( id: usize, receiver: mpsc::Receiver, output_sink: Arc, agg_configs: HashMap>, runtime_config: WorkerRuntimeConfig, + group_count: Arc, + worker_watermark: Arc, + all_worker_watermarks: Vec>, ) -> Self { let WorkerRuntimeConfig { - max_buffer_per_series, + max_buffer_per_series: _, allowed_lateness_ms, pass_raw_samples, raw_mode_aggregation_id, @@ -86,13 +93,15 @@ impl Worker { id, receiver, output_sink, - series_map: HashMap::new(), + group_states: HashMap::new(), agg_configs, - max_buffer_per_series, allowed_lateness_ms, pass_raw_samples, raw_mode_aggregation_id, late_data_policy, + worker_watermark, + all_worker_watermarks, + group_count, } } @@ -102,27 +111,52 @@ impl Worker { while let Some(msg) = self.receiver.recv().await { match msg { - WorkerMessage::Samples { - series_key, + WorkerMessage::GroupSamples { + agg_id, + group_key, samples, ingest_received_at, } => { let sample_count = samples.len(); let _span = debug_span!( - "worker_process", + "worker_process_group", worker_id = self.id, - series = %series_key, + agg_id, + group = %group_key, sample_count, ) .entered(); - if let Err(e) = self.process_samples(&series_key, samples) { - warn!("Worker {} error processing {}: {}", self.id, series_key, e); + if let Err(e) = self.process_group_samples(agg_id, &group_key, samples) { + warn!( + "Worker {} error processing group ({}, {}): {}", + self.id, agg_id, group_key, e + ); } debug!( e2e_latency_us = ingest_received_at.elapsed().as_micros() as u64, "e2e: ingest->worker complete" ); } + WorkerMessage::RawSamples { 
+ series_key, + samples, + ingest_received_at, + } => { + let _span = debug_span!( + "worker_process_raw", + worker_id = self.id, + series = %series_key, + sample_count = samples.len(), + ) + .entered(); + if let Err(e) = self.process_samples_raw(&series_key, samples) { + warn!("Worker {} raw error for {}: {}", self.id, series_key, e); + } + debug!( + e2e_latency_us = ingest_received_at.elapsed().as_micros() as u64, + "e2e: ingest->worker complete (raw)" + ); + } WorkerMessage::Flush => { if let Err(e) = self.flush_all() { warn!("Worker {} flush error: {}", self.id, e); @@ -130,7 +164,6 @@ impl Worker { } WorkerMessage::Shutdown => { info!("Worker {} shutting down", self.id); - // Final flush before shutdown if let Err(e) = self.flush_all() { warn!("Worker {} final flush error: {}", self.id, e); } @@ -140,188 +173,150 @@ impl Worker { } info!( - "Worker {} stopped, {} active series", + "Worker {} stopped, {} active groups", self.id, - self.series_map.len() + self.group_states.len() ); } - /// Find all aggregation configs whose metric/spatial_filter matches this series. - /// Returns owned `Arc` clones so callers are not lifetime-bound to `&self`. - fn matching_agg_configs(&self, series_key: &str) -> Vec<(u64, Arc)> { - let metric_name = extract_metric_name(series_key); - - self.agg_configs - .iter() - .filter(|(_, config)| { - // Match on metric name - config.metric == metric_name - || config.spatial_filter_normalized == metric_name - || config.spatial_filter == metric_name - }) - .map(|(&id, config)| (id, Arc::clone(config))) - .collect() - } - - /// Get or create the SeriesState for a series key. 
- fn get_or_create_series_state(&mut self, series_key: &str) -> &mut SeriesState { - if !self.series_map.contains_key(series_key) { - let matching = self.matching_agg_configs(series_key); - let aggregations = matching - .into_iter() - .map(|(_, config)| AggregationState { - window_manager: WindowManager::new(config.window_size, config.slide_interval), - config, // Arc clone is cheap; no deep copy - active_panes: BTreeMap::new(), - }) - .collect(); - - self.series_map.insert( - series_key.to_string(), - SeriesState { - buffer: SeriesBuffer::new(self.max_buffer_per_series), - previous_watermark_ms: i64::MIN, - aggregations, - }, - ); + /// Get or create the GroupState for a (agg_id, group_key) pair. + /// Returns None if agg_id has no matching config. + fn get_or_create_group_state( + &mut self, + agg_id: u64, + group_key: &str, + ) -> Option<&mut GroupState> { + let key = (agg_id, group_key.to_string()); + if !self.group_states.contains_key(&key) { + let config = self.agg_configs.get(&agg_id)?; + let gs = GroupState { + window_manager: WindowManager::new(config.window_size, config.slide_interval), + config: Arc::clone(config), + active_panes: BTreeMap::new(), + previous_watermark_ms: i64::MIN, + }; + self.group_states.insert(key.clone(), gs); + self.group_count + .store(self.group_states.len(), Ordering::Relaxed); } - - self.series_map.get_mut(series_key).unwrap() + self.group_states.get_mut(&key) } - fn process_samples( + /// Process a batch of samples for a specific (agg_id, group_key). + /// All samples in the batch feed into the same shared accumulator. + /// + /// This is the core of the Arroyo-equivalent GROUP BY logic. 
+ pub fn process_group_samples( &mut self, - series_key: &str, - samples: Vec<(i64, f64)>, + agg_id: u64, + group_key: &str, + samples: Vec<(String, i64, f64)>, // (series_key, timestamp_ms, value) ) -> Result<(), Box> { - if self.pass_raw_samples { - return self.process_samples_raw(series_key, samples); - } - - // Copy scalars out of self before taking &mut self.series_map let worker_id = self.id; let allowed_lateness_ms = self.allowed_lateness_ms; let late_data_policy = self.late_data_policy; - // Ensure state exists - self.get_or_create_series_state(series_key); - - let state = self.series_map.get_mut(series_key).unwrap(); - - if state.aggregations.is_empty() { + if self.get_or_create_group_state(agg_id, group_key).is_none() { + warn!( + "Worker {} skipping samples for unknown agg_id={}, group_key={}", + self.id, agg_id, group_key + ); return Ok(()); } + let state = self + .group_states + .get_mut(&(agg_id, group_key.to_string())) + .unwrap(); - // Insert samples into buffer, dropping late arrivals - for &(ts, val) in &samples { - if state.buffer.watermark_ms() != i64::MIN - && ts < state.buffer.watermark_ms() - allowed_lateness_ms - { - debug!( - "Worker {} dropping late sample for {}: ts={} watermark={}", - worker_id, - series_key, - ts, - state.buffer.watermark_ms() - ); - continue; - } - state.buffer.insert(ts, val); - } - - let current_wm = state.buffer.watermark_ms(); + // Find the max timestamp in this batch to advance the watermark + let batch_max_ts = samples + .iter() + .map(|(_, ts, _)| *ts) + .max() + .unwrap_or(i64::MIN); let previous_wm = state.previous_watermark_ms; + let current_wm = if batch_max_ts > previous_wm { + batch_max_ts + } else { + previous_wm + }; let mut emit_batch: Vec<(PrecomputedOutput, Box)> = Vec::new(); - for agg_state in &mut state.aggregations { - let closed = agg_state - .window_manager - .closed_windows(previous_wm, current_wm); - - // Pane-based sample routing: each sample goes to exactly 1 pane - for &(ts, val) in 
&samples { - if current_wm != i64::MIN && ts < current_wm - allowed_lateness_ms { - continue; // already dropped - } + // Route each sample to its pane + for (series_key, ts, val) in &samples { + // Drop late samples + if previous_wm != i64::MIN && *ts < previous_wm - allowed_lateness_ms { + debug!( + "Worker {} dropping late sample for group ({}, {}): ts={} watermark={}", + worker_id, agg_id, group_key, ts, previous_wm + ); + continue; + } - let pane_start = agg_state.window_manager.pane_start_for(ts); - let pane_end = pane_start + agg_state.window_manager.slide_interval_ms(); + let pane_start = state.window_manager.pane_start_for(*ts); + let pane_end = pane_start + state.window_manager.slide_interval_ms(); - // Check if pane was already evicted (late data for a closed window). - // A pane is evicted when its oldest window closes, i.e. the window - // starting at pane_start. If that window is closed, the pane is gone. - if !agg_state.active_panes.contains_key(&pane_start) - && current_wm >= pane_start + agg_state.window_manager.window_size_ms() - { - // The window starting at this pane_start is already closed, - // so this pane was evicted — handle as late data. 
- let window_start = pane_start; - let window_end = pane_start + agg_state.window_manager.window_size_ms(); - match late_data_policy { - LateDataPolicy::Drop => { - debug!( - "Dropping late sample for evicted pane [{}, {})", - pane_start, pane_end - ); - continue; - } - LateDataPolicy::ForwardToStore => { - let mut updater = create_accumulator_updater(&agg_state.config); - apply_sample(&mut *updater, series_key, val, ts, &agg_state.config); - let key = if config_is_keyed(&agg_state.config) { - Some(extract_key_from_series(series_key, &agg_state.config)) - } else { - None - }; - let output = PrecomputedOutput::new( - window_start as u64, - window_end as u64, - key, - agg_state.config.aggregation_id, - ); - emit_batch.push((output, updater.take_accumulator())); - debug!( - "Forwarding late sample to store for evicted pane [{}, {})", - pane_start, pane_end - ); - continue; - } + // Check if pane was already evicted (late data for a closed window) + if !state.active_panes.contains_key(&pane_start) + && current_wm >= pane_start + state.window_manager.window_size_ms() + { + let window_start = pane_start; + let window_end = pane_start + state.window_manager.window_size_ms(); + match late_data_policy { + LateDataPolicy::Drop => { + debug!( + "Dropping late sample for evicted pane [{}, {})", + pane_start, pane_end + ); + continue; + } + LateDataPolicy::ForwardToStore => { + let mut updater = create_accumulator_updater(&state.config); + apply_sample(&mut *updater, series_key, *val, *ts, &state.config); + let key = build_group_key_label_values(group_key); + let output = PrecomputedOutput::new( + window_start as u64, + window_end as u64, + Some(key), + agg_id, + ); + emit_batch.push((output, updater.take_accumulator())); + debug!( + "Forwarding late sample to store for evicted pane [{}, {})", + pane_start, pane_end + ); + continue; } } - - // Normal path: route sample to its single pane - let updater = agg_state - .active_panes - .entry(pane_start) - .or_insert_with(|| 
create_accumulator_updater(&agg_state.config)); - - apply_sample(&mut **updater, series_key, val, ts, &agg_state.config); } - // Emit closed windows by merging their constituent panes - for window_start in &closed { - let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); - let pane_starts = agg_state.window_manager.panes_for_window(*window_start); + // Normal path: route sample to its single pane accumulator + let updater = state + .active_panes + .entry(pane_start) + .or_insert_with(|| create_accumulator_updater(&state.config)); - if let Some(accumulator) = - merge_panes_for_window(&mut agg_state.active_panes, &pane_starts) - { - let key = if config_is_keyed(&agg_state.config) { - Some(extract_key_from_series(series_key, &agg_state.config)) - } else { - None - }; + apply_sample(&mut **updater, series_key, *val, *ts, &state.config); + } - let output = PrecomputedOutput::new( - *window_start as u64, - window_end as u64, - key, - agg_state.config.aggregation_id, - ); + // Check for closed windows + let closed = state.window_manager.closed_windows(previous_wm, current_wm); - emit_batch.push((output, accumulator)); - } + for window_start in &closed { + let (_, window_end) = state.window_manager.window_bounds(*window_start); + let pane_starts = state.window_manager.panes_for_window(*window_start); + + if let Some(accumulator) = merge_panes_for_window(&mut state.active_panes, &pane_starts) + { + let key = build_group_key_label_values(group_key); + let output = PrecomputedOutput::new( + *window_start as u64, + window_end as u64, + Some(key), + agg_id, + ); + emit_batch.push((output, accumulator)); } } @@ -330,10 +325,11 @@ impl Worker { // Emit to output sink if !emit_batch.is_empty() { debug!( - "Worker {} emitting {} outputs for {}", + "Worker {} emitting {} outputs for group ({}, {})", worker_id, emit_batch.len(), - series_key + agg_id, + group_key ); self.output_sink.emit_batch(emit_batch)?; } @@ -342,7 +338,7 @@ impl Worker { } /// Raw 
fast-path: emit each sample as a standalone `SumAccumulator`. - fn process_samples_raw( + pub fn process_samples_raw( &self, series_key: &str, samples: Vec<(i64, f64)>, @@ -370,49 +366,74 @@ impl Worker { Ok(()) } - /// Flush all series — force-close windows that are past due. + /// Flush all groups with cross-group watermark propagation. + /// + /// 1. Compute worker watermark = max(all group watermarks) + /// 2. Publish it for cross-worker reads + /// 3. Compute global watermark = min(all worker watermarks) + /// 4. Advance idle groups to the global watermark, closing due windows fn flush_all(&mut self) -> Result<(), Box> { if self.pass_raw_samples { return Ok(()); } - let mut emit_batch: Vec<(PrecomputedOutput, Box)> = Vec::new(); + // Step 1: Compute worker watermark = max of all group watermarks. + let worker_wm = self + .group_states + .values() + .map(|s| s.previous_watermark_ms) + .filter(|&wm| wm != i64::MIN) + .max() + .unwrap_or(i64::MIN); - for (series_key, state) in &mut self.series_map { - let current_wm = state.buffer.watermark_ms(); - let previous_wm = state.previous_watermark_ms; + // Step 2: Publish our worker watermark for cross-worker reads. + self.worker_watermark.store(worker_wm, Ordering::Release); - for agg_state in &mut state.aggregations { - let closed = agg_state - .window_manager - .closed_windows(previous_wm, current_wm); + // Step 3: Compute global watermark = min(all worker watermarks). + let global_wm = self.compute_global_watermark(); - for window_start in &closed { - let (_, window_end) = agg_state.window_manager.window_bounds(*window_start); - let pane_starts = agg_state.window_manager.panes_for_window(*window_start); + // Step 4: For each group, advance watermark and close due windows. 
+ let mut emit_batch: Vec<(PrecomputedOutput, Box)> = Vec::new(); - if let Some(accumulator) = - merge_panes_for_window(&mut agg_state.active_panes, &pane_starts) - { - let key = if config_is_keyed(&agg_state.config) { - Some(extract_key_from_series(series_key, &agg_state.config)) - } else { - None - }; + for ((agg_id, group_key), state) in &mut self.group_states { + if state.previous_watermark_ms == i64::MIN { + continue; // No samples received yet — no panes to close. + } - let output = PrecomputedOutput::new( - *window_start as u64, - window_end as u64, - key, - agg_state.config.aggregation_id, - ); + // Effective watermark: max(group's own, global) + 1ms for boundary. + let propagated_wm = if global_wm != i64::MIN { + state.previous_watermark_ms.max(global_wm) + } else { + state.previous_watermark_ms + }; + let effective_wm = propagated_wm.saturating_add(1); - emit_batch.push((output, accumulator)); - } + let closed = state + .window_manager + .closed_windows(state.previous_watermark_ms, effective_wm); + + for window_start in &closed { + let (_, window_end) = state.window_manager.window_bounds(*window_start); + let pane_starts = state.window_manager.panes_for_window(*window_start); + + if let Some(accumulator) = + merge_panes_for_window(&mut state.active_panes, &pane_starts) + { + let key = build_group_key_label_values(group_key); + let output = PrecomputedOutput::new( + *window_start as u64, + window_end as u64, + Some(key), + *agg_id, + ); + emit_batch.push((output, accumulator)); } } - state.previous_watermark_ms = current_wm; + // Update group watermark to reflect the advancement. + if effective_wm > state.previous_watermark_ms { + state.previous_watermark_ms = effective_wm; + } } if !emit_batch.is_empty() { @@ -426,6 +447,34 @@ impl Worker { Ok(()) } + + /// Compute the global watermark as min(all worker watermarks), ignoring + /// workers that haven't started yet (still at i64::MIN). 
+ fn compute_global_watermark(&self) -> i64 { + let mut global_wm = i64::MAX; + let mut any_started = false; + for wm_atomic in &self.all_worker_watermarks { + let wm = wm_atomic.load(Ordering::Acquire); + if wm != i64::MIN { + global_wm = global_wm.min(wm); + any_started = true; + } + } + if any_started { + global_wm + } else { + i64::MIN + } + } +} + +/// Build a `KeyByLabelValues` from a semicolon-delimited group key string. +/// e.g. "constant" → KeyByLabelValues { labels: ["constant"] } +/// e.g. "us-east;svc-a" → KeyByLabelValues { labels: ["us-east", "svc-a"] } +/// e.g. "" → KeyByLabelValues { labels: [""] } +fn build_group_key_label_values(group_key: &str) -> KeyByLabelValues { + let labels: Vec = group_key.split(';').map(|s| s.to_string()).collect(); + KeyByLabelValues::new_with_labels(labels) } /// Extract the metric name from a series key like `"metric_name{key1=\"val1\"}"`. @@ -457,7 +506,7 @@ pub fn extract_key_from_series(series_key: &str, config: &AggregationConfig) -> /// Parse label key-value pairs from a series key string. 
/// `"metric{a=\"b\",c=\"d\"}"` → `{("a", "b"), ("c", "d")}` -fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { +pub fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { let mut labels = HashMap::new(); let start = match series_key.find('{') { @@ -476,23 +525,19 @@ fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { let label_str = &series_key[start..end]; // Parse comma-separated key="value" pairs - // Simple parser that handles the expected format let mut remaining = label_str; while !remaining.is_empty() { - // Find the '=' separator let eq_pos = match remaining.find('=') { Some(pos) => pos, None => break, }; let key = remaining[..eq_pos].trim(); - // Expect "value" after = let after_eq = &remaining[eq_pos + 1..]; if !after_eq.starts_with('"') { break; } - // Find closing quote let value_start = 1; // skip opening quote let value_end = match after_eq[value_start..].find('"') { Some(pos) => value_start + pos, @@ -502,8 +547,7 @@ fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { let value = &after_eq[value_start..value_end]; labels.insert(key, value); - // Move past the closing quote and optional comma - let consumed = value_end + 1; // past closing quote + let consumed = value_end + 1; remaining = &after_eq[consumed..]; if remaining.starts_with(',') { remaining = &remaining[1..]; @@ -514,7 +558,12 @@ fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { } /// Route a single sample to `updater`, dispatching keyed vs. non-keyed based on config. -/// Eliminates repeated `if updater.is_keyed()` blocks at call sites. +/// +/// For keyed accumulators (MultipleSum, CMS, HydraKLL), the key is extracted +/// from the series' **aggregated_labels** — these are the labels that become +/// the key dimension *inside* the sketch (e.g., which bucket in a CMS, which +/// entry in a MultipleSumAccumulator's HashMap). 
This matches the Arroyo SQL +/// pattern: `udf(concat_ws(';', aggregated_labels), value)`. fn apply_sample( updater: &mut dyn AccumulatorUpdater, series_key: &str, @@ -523,13 +572,34 @@ fn apply_sample( config: &AggregationConfig, ) { if updater.is_keyed() { - let key = extract_key_from_series(series_key, config); + let key = extract_aggregated_key_from_series(series_key, config); updater.update_keyed(&key, val, ts); } else { updater.update_single(val, ts); } } +/// Extract aggregated label values from a series key string. +/// These are the labels that form the key dimension *inside* keyed accumulators +/// (MultipleSum, CMS, HydraKLL), matching Arroyo's `agg_columns`. +fn extract_aggregated_key_from_series( + series_key: &str, + config: &AggregationConfig, +) -> KeyByLabelValues { + let labels = parse_labels_from_series_key(series_key); + let mut values = Vec::new(); + + for label_name in &config.aggregated_labels.labels { + if let Some(val) = labels.get(label_name.as_str()) { + values.push(val.to_string()); + } else { + values.push(String::new()); + } + } + + KeyByLabelValues::new_with_labels(values) +} + /// Merge the pane accumulators that constitute a closed window. 
/// /// The oldest pane (index 0) is taken destructively from `active_panes` @@ -628,6 +698,29 @@ mod tests { window_secs: u64, slide_secs: u64, grouping: Vec<&str>, + ) -> AggregationConfig { + make_agg_config_full( + id, + metric, + agg_type, + agg_sub_type, + window_secs, + slide_secs, + grouping, + vec![], + ) + } + + #[allow(clippy::too_many_arguments)] + fn make_agg_config_full( + id: u64, + metric: &str, + agg_type: AggregationType, + agg_sub_type: &str, + window_secs: u64, + slide_secs: u64, + grouping: Vec<&str>, + aggregated: Vec<&str>, ) -> AggregationConfig { let window_type = if slide_secs == 0 || slide_secs == window_secs { WindowType::Tumbling @@ -642,7 +735,9 @@ mod tests { promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new( grouping.iter().map(|s| s.to_string()).collect(), ), - promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new( + aggregated.iter().map(|s| s.to_string()).collect(), + ), promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), String::new(), window_secs, @@ -665,6 +760,7 @@ mod tests { late_policy: LateDataPolicy, ) -> Worker { let (_tx, rx) = tokio::sync::mpsc::channel(1); + let wm = Arc::new(AtomicI64::new(i64::MIN)); Worker::new( 0, rx, @@ -677,16 +773,26 @@ mod tests { raw_mode_aggregation_id: raw_agg_id, late_data_policy: late_policy, }, + Arc::new(AtomicUsize::new(0)), + wm.clone(), + vec![wm], ) } - /// Wrap a `HashMap` for use with `make_worker`. fn arc_configs( configs: HashMap, ) -> HashMap> { configs.into_iter().map(|(k, v)| (k, Arc::new(v))).collect() } + /// Helper to make GroupSamples from simple (ts, val) pairs for a single series. 
+ fn group_samples(series_key: &str, samples: Vec<(i64, f64)>) -> Vec<(String, i64, f64)> { + samples + .into_iter() + .map(|(ts, val)| (series_key.to_string(), ts, val)) + .collect() + } + // ----------------------------------------------------------------------- // Test: raw mode — each sample forwarded as SumAccumulator with sum==value // ----------------------------------------------------------------------- @@ -694,19 +800,19 @@ mod tests { #[test] fn test_raw_mode_forwarding() { let sink = Arc::new(CapturingOutputSink::new()); - let mut worker = make_worker(HashMap::new(), sink.clone(), true, 99, LateDataPolicy::Drop); + let worker = make_worker(HashMap::new(), sink.clone(), true, 99, LateDataPolicy::Drop); let samples = vec![(1000_i64, 1.5_f64), (2000, 2.5), (3000, 7.0)]; worker - .process_samples("cpu{host=\"a\"}", samples.clone()) + .process_samples_raw("cpu{host=\"a\"}", samples.clone()) .unwrap(); let captured = sink.drain(); assert_eq!(captured.len(), 3, "should emit one output per raw sample"); for ((ts, val), (output, acc)) in samples.iter().zip(captured.iter()) { - assert_eq!(output.start_timestamp as i64, *ts, "start should equal ts"); - assert_eq!(output.end_timestamp as i64, *ts, "end should equal ts"); + assert_eq!(output.start_timestamp as i64, *ts); + assert_eq!(output.end_timestamp as i64, *ts); assert_eq!(output.aggregation_id, 99); let sum_acc = acc .as_any() @@ -748,24 +854,21 @@ mod tests { ); // Samples in window [0, 10000ms): sum should be 1+2+3=6. - // Send one at a time so the watermark advances incrementally — - // a batch's max-ts becomes the new watermark, and with - // allowed_lateness_ms=0 any ts < watermark in the same call is dropped. 
+ // All go to the same group (agg_id=1, group_key="") worker - .process_samples("cpu", vec![(1000_i64, 1.0)]) + .process_group_samples(1, "", group_samples("cpu", vec![(1000, 1.0)])) .unwrap(); worker - .process_samples("cpu", vec![(5000_i64, 2.0)]) + .process_group_samples(1, "", group_samples("cpu", vec![(5000, 2.0)])) .unwrap(); worker - .process_samples("cpu", vec![(9000_i64, 3.0)]) + .process_group_samples(1, "", group_samples("cpu", vec![(9000, 3.0)])) .unwrap(); - // No windows closed yet (watermark still below 10000) assert_eq!(sink.len(), 0); - // Sample at t=10000ms advances watermark to 10000, closing [0, 10000) + // Sample at t=10000ms closes [0, 10000) worker - .process_samples("cpu", vec![(10000_i64, 100.0)]) + .process_group_samples(1, "", group_samples("cpu", vec![(10000, 100.0)])) .unwrap(); let captured = sink.drain(); @@ -775,10 +878,6 @@ mod tests { assert_eq!(output.aggregation_id, 1); assert_eq!(output.start_timestamp, 0); assert_eq!(output.end_timestamp, 10_000); - assert!( - output.key.is_none(), - "SingleSubpopulation should have no key" - ); let sum_acc = acc .as_any() @@ -792,23 +891,24 @@ mod tests { } // ----------------------------------------------------------------------- - // Test: sliding window pane sharing — one sample, two window emits, same sum + // Test: GROUP BY — multiple series merged into same group accumulator // ----------------------------------------------------------------------- #[test] - fn test_sliding_window_pane_sharing() { - // 30s window, 10s slide → W=3 panes per window + fn test_group_by_merges_series() { + // SingleSubpopulation Sum with no grouping labels + // Two different series in the same group → both feed same accumulator let config = make_agg_config( - 2, + 1, "cpu", AggregationType::SingleSubpopulation, "Sum", - 30, 10, + 0, vec![], ); let mut agg_configs = HashMap::new(); - agg_configs.insert(2, config); + agg_configs.insert(1, config); let sink = Arc::new(CapturingOutputSink::new()); let mut 
worker = make_worker( @@ -819,69 +919,61 @@ mod tests { LateDataPolicy::Drop, ); - // Sample at t=15000ms → goes to pane 10000ms - // previous_wm == i64::MIN → no windows close + // Two different series, same group (agg_id=1, group_key="") + // Both feed into the same accumulator worker - .process_samples("cpu", vec![(15_000_i64, 42.0)]) + .process_group_samples( + 1, + "", + vec![ + ("cpu{host=\"A\"}".to_string(), 1000, 10.0), + ("cpu{host=\"B\"}".to_string(), 2000, 20.0), + ], + ) .unwrap(); assert_eq!(sink.len(), 0); - // Sample at t=45000ms → advances watermark to 45000ms - // Closes windows [0, 30000) and [10000, 40000) + // Close the window worker - .process_samples("cpu", vec![(45_000_i64, 0.0)]) + .process_group_samples(1, "", group_samples("cpu{host=\"A\"}", vec![(10000, 0.0)])) .unwrap(); let captured = sink.drain(); - // Both windows should emit — one from pane merge snapshot, one from take - // Window [0, 30000): panes [0, 10000, 20000]; pane 10000 snapshot → sum=42 - // Window [10000, 40000): panes [10000, 20000, 30000]; pane 10000 take → sum=42 - assert_eq!( - captured.len(), - 2, - "two windows containing the pane should emit" - ); + assert_eq!(captured.len(), 1, "one output per group per window"); - let window_starts: Vec = captured.iter().map(|(o, _)| o.start_timestamp).collect(); - assert!(window_starts.contains(&0), "window [0, 30000) should emit"); + let (output, acc) = &captured[0]; + assert_eq!(output.aggregation_id, 1); + assert_eq!(output.start_timestamp, 0); + assert_eq!(output.end_timestamp, 10_000); + + let sum_acc = acc + .as_any() + .downcast_ref::() + .expect("should be SumAccumulator"); assert!( - window_starts.contains(&10_000), - "window [10000, 40000) should emit" + (sum_acc.sum - 30.0).abs() < 1e-10, + "sum should be 10+20=30, got {} (both series merged)", + sum_acc.sum ); - - for (output, acc) in &captured { - let sum_acc = acc - .as_any() - .downcast_ref::() - .expect("should be SumAccumulator"); - assert!( - (sum_acc.sum - 
42.0).abs() < 1e-10, - "window {:?} should have sum=42 via pane sharing, got {}", - output.start_timestamp, - sum_acc.sum - ); - } } // ----------------------------------------------------------------------- - // Test: GROUP BY — two series on same worker produce separate accumulators + // Test: GROUP BY with grouping labels — different groups produce separate outputs // ----------------------------------------------------------------------- #[test] - fn test_groupby_separate_emits_per_series() { - // MultipleSubpopulation Sum with grouping on "host" - // Two series on same worker → same window accumulator per-agg holds both keys + fn test_different_groups_separate_outputs() { let config = make_agg_config( - 3, + 1, "cpu", - AggregationType::MultipleSubpopulation, + AggregationType::SingleSubpopulation, "Sum", 10, 0, - vec!["host"], + vec!["pattern"], ); let mut agg_configs = HashMap::new(); - agg_configs.insert(3, config); + agg_configs.insert(1, config); let sink = Arc::new(CapturingOutputSink::new()); let mut worker = make_worker( @@ -892,113 +984,338 @@ mod tests { LateDataPolicy::Drop, ); - // Feed two series in the same window [0, 10000ms) + // Group "constant" gets samples worker - .process_samples("cpu{host=\"A\"}", vec![(1000_i64, 10.0)]) + .process_group_samples( + 1, + "constant", + group_samples("cpu{pattern=\"constant\"}", vec![(1000, 5.0)]), + ) .unwrap(); + // Group "sine" gets samples worker - .process_samples("cpu{host=\"B\"}", vec![(2000_i64, 20.0)]) + .process_group_samples( + 1, + "sine", + group_samples("cpu{pattern=\"sine\"}", vec![(2000, 7.0)]), + ) .unwrap(); - assert_eq!(sink.len(), 0, "no windows closed yet"); - // Advance watermark to close [0, 10000) for series "A" + // Close both groups' windows worker - .process_samples("cpu{host=\"A\"}", vec![(10_000_i64, 0.0)]) + .process_group_samples( + 1, + "constant", + group_samples("cpu{pattern=\"constant\"}", vec![(10000, 0.0)]), + ) .unwrap(); - // Also advance "B"'s watermark worker - 
.process_samples("cpu{host=\"B\"}", vec![(10_000_i64, 0.0)]) + .process_group_samples( + 1, + "sine", + group_samples("cpu{pattern=\"sine\"}", vec![(10000, 0.0)]), + ) .unwrap(); let captured = sink.drain(); - // Each series has its own SeriesState and independent pane accumulators. - // The MultipleSubpopulation accumulator for each series records its own key. - // So we get 2 emits (one per series), each a MultipleSumAccumulator with a single key. - assert_eq!( - captured.len(), - 2, - "each series emits independently — no ingest-time merge" - ); + assert_eq!(captured.len(), 2, "two groups → two outputs"); - // Verify the grouping keys are distinct - let mut found_a = false; - let mut found_b = false; + let mut sums_by_key: HashMap = HashMap::new(); for (output, acc) in &captured { - assert_eq!(output.start_timestamp, 0); - assert_eq!(output.end_timestamp, 10_000); - let ms_acc = acc - .as_any() - .downcast_ref::() - .expect("should be MultipleSumAccumulator"); - for (key, &sum) in &ms_acc.sums { - if key.labels == vec!["A".to_string()] { - assert!((sum - 10.0).abs() < 1e-10); - found_a = true; - } - if key.labels == vec!["B".to_string()] { - assert!((sum - 20.0).abs() < 1e-10); - found_b = true; - } - } + let sum_acc = acc.as_any().downcast_ref::().unwrap(); + let key = output.key.as_ref().unwrap().labels.join(";"); + sums_by_key.insert(key, sum_acc.sum); } - assert!(found_a, "expected emit for host=A"); - assert!(found_b, "expected emit for host=B"); + assert!((sums_by_key["constant"] - 5.0).abs() < 1e-10); + assert!((sums_by_key["sine"] - 7.0).abs() < 1e-10); } + // ----------------------------------------------------------------------- + // Test: KLL GROUP BY — multiple series merged into one KLL sketch per group + // ----------------------------------------------------------------------- + #[test] - fn test_arroyosketch_multiple_sum_matches_handcrafted_precompute_output() { - let config = make_agg_config( - 11, - "cpu", - AggregationType::MultipleSum, - 
"sum", + fn test_kll_group_by_merges_series() { + let mut config = make_agg_config( + 1, + "latency", + AggregationType::DatasketchesKLL, + "", 10, 0, - vec!["host"], + vec!["pattern"], ); + config + .parameters + .insert("K".to_string(), serde_json::Value::from(20_u64)); let mut agg_configs = HashMap::new(); - agg_configs.insert(11, config.clone()); + agg_configs.insert(1, config); let sink = Arc::new(CapturingOutputSink::new()); let mut worker = make_worker( - arc_configs(agg_configs.clone()), + arc_configs(agg_configs), sink.clone(), false, 0, LateDataPolicy::Drop, ); + // Three different series all in group "constant" — all feed one KLL worker - .process_samples("cpu{host=\"A\"}", vec![(1_000_i64, 1.0)]) - .unwrap(); - worker - .process_samples("cpu{host=\"A\"}", vec![(5_000_i64, 2.0)]) - .unwrap(); - worker - .process_samples("cpu{host=\"A\"}", vec![(9_000_i64, 3.0)]) + .process_group_samples( + 1, + "constant", + vec![ + ( + "latency{pattern=\"constant\",host=\"a\"}".to_string(), + 1000, + 10.0, + ), + ( + "latency{pattern=\"constant\",host=\"b\"}".to_string(), + 2000, + 20.0, + ), + ( + "latency{pattern=\"constant\",host=\"c\"}".to_string(), + 3000, + 30.0, + ), + ], + ) .unwrap(); + + // Close the window worker - .process_samples("cpu{host=\"A\"}", vec![(10_000_i64, 0.0)]) + .process_group_samples( + 1, + "constant", + group_samples( + "latency{pattern=\"constant\",host=\"a\"}", + vec![(10000, 0.0)], + ), + ) .unwrap(); let captured = sink.drain(); - assert_eq!(captured.len(), 1, "expected one closed window output"); + assert_eq!(captured.len(), 1, "one KLL output for the whole group"); - let (handcrafted_output, handcrafted_acc) = &captured[0]; - let handcrafted_acc = handcrafted_acc + let (output, acc) = &captured[0]; + assert_eq!(output.aggregation_id, 1); + let kll = acc .as_any() - .downcast_ref::() - .expect("hand-crafted engine should emit MultipleSumAccumulator"); + .downcast_ref::() + .expect("should be KLL"); + assert_eq!( + kll.inner.count(), + 
3, + "KLL should contain all 3 series' samples" + ); + } - let mut arroyo_sums = HashMap::new(); - arroyo_sums.insert("A".to_string(), 6.0); - let arroyo_precompute_bytes = - rmp_serde::to_vec(&arroyo_sums).expect("Arroyo MessagePack encoding should succeed"); + // ----------------------------------------------------------------------- + // Test: sliding window pane sharing + // ----------------------------------------------------------------------- - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); - encoder - .write_all(&arroyo_precompute_bytes) - .expect("gzip encoding should succeed"); + #[test] + fn test_sliding_window_pane_sharing() { + // 30s window, 10s slide → W=3 panes per window + let config = make_agg_config( + 2, + "cpu", + AggregationType::SingleSubpopulation, + "Sum", + 30, + 10, + vec![], + ); + let mut agg_configs = HashMap::new(); + agg_configs.insert(2, config); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker( + arc_configs(agg_configs), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); + + // Sample at t=15000ms → goes to pane 10000ms + worker + .process_group_samples(2, "", group_samples("cpu", vec![(15_000, 42.0)])) + .unwrap(); + assert_eq!(sink.len(), 0); + + // Sample at t=45000ms → advances watermark to 45000ms + // Closes windows [0, 30000) and [10000, 40000) + worker + .process_group_samples(2, "", group_samples("cpu", vec![(45_000, 0.0)])) + .unwrap(); + + let captured = sink.drain(); + assert_eq!( + captured.len(), + 2, + "two windows containing the pane should emit" + ); + + let window_starts: Vec = captured.iter().map(|(o, _)| o.start_timestamp).collect(); + assert!(window_starts.contains(&0)); + assert!(window_starts.contains(&10_000)); + + for (_output, acc) in &captured { + let sum_acc = acc + .as_any() + .downcast_ref::() + .expect("should be SumAccumulator"); + assert!( + (sum_acc.sum - 42.0).abs() < 1e-10, + "window should have sum=42 via pane sharing, got {}", + 
sum_acc.sum + ); + } + } + + // ----------------------------------------------------------------------- + // Test: MultipleSubpopulation — keyed accumulator with aggregated labels + // Matches planner output: grouping=[], aggregated=[host] + // All series go to one group, host is the key dimension INSIDE the sketch + // ----------------------------------------------------------------------- + + #[test] + fn test_keyed_accumulator_aggregated_labels() { + // Like planner output for `sum by (host) (cpu)`: + // grouping=[] (empty), aggregated=[host] (key inside MultipleSumAccumulator) + let config = make_agg_config_full( + 3, + "cpu", + AggregationType::MultipleSubpopulation, + "Sum", + 10, + 0, + vec![], // grouping: empty — one output group + vec!["host"], // aggregated: host is the key INSIDE the sketch + ); + let mut agg_configs = HashMap::new(); + agg_configs.insert(3, config); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker( + arc_configs(agg_configs), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); + + // Both series go to the SAME group (group_key="" since grouping is empty). + // The host label is extracted as the aggregated key inside the accumulator. 
+ worker + .process_group_samples( + 3, + "", + vec![ + ("cpu{host=\"A\"}".to_string(), 1000, 10.0), + ("cpu{host=\"B\"}".to_string(), 2000, 20.0), + ], + ) + .unwrap(); + + // Close the single group's window + worker + .process_group_samples(3, "", group_samples("cpu{host=\"A\"}", vec![(10000, 0.0)])) + .unwrap(); + + let captured = sink.drain(); + assert_eq!( + captured.len(), + 1, + "one group → one output (both hosts inside)" + ); + + let (_output, acc) = &captured[0]; + let ms_acc = acc + .as_any() + .downcast_ref::() + .expect("should be MultipleSumAccumulator"); + + // The MultipleSumAccumulator should have two internal keys: "A" and "B" + assert_eq!(ms_acc.sums.len(), 2, "two host keys inside one accumulator"); + + let mut found_a = false; + let mut found_b = false; + for (key, &sum) in &ms_acc.sums { + if key.labels == vec!["A".to_string()] { + assert!((sum - 10.0).abs() < 1e-10); + found_a = true; + } + if key.labels == vec!["B".to_string()] { + assert!((sum - 20.0).abs() < 1e-10); + found_b = true; + } + } + assert!(found_a, "expected key A inside accumulator"); + assert!(found_b, "expected key B inside accumulator"); + } + + // ----------------------------------------------------------------------- + // Test: Arroyo KLL equivalence — same output as Arroyo pipeline + // ----------------------------------------------------------------------- + #[test] + fn test_arroyosketch_multiple_sum_matches_handcrafted_precompute_output() { + let config = make_agg_config( + 11, + "cpu", + AggregationType::MultipleSum, + "sum", + 10, + 0, + vec!["host"], + ); + let mut agg_configs = HashMap::new(); + agg_configs.insert(11, config.clone()); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker( + arc_configs(agg_configs.clone()), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); + + worker + .process_samples("cpu{host=\"A\"}", vec![(1_000_i64, 1.0)]) + .unwrap(); + worker + .process_samples("cpu{host=\"A\"}", vec![(5_000_i64, 
2.0)]) + .unwrap(); + worker + .process_samples("cpu{host=\"A\"}", vec![(9_000_i64, 3.0)]) + .unwrap(); + worker + .process_samples("cpu{host=\"A\"}", vec![(10_000_i64, 0.0)]) + .unwrap(); + + let captured = sink.drain(); + assert_eq!(captured.len(), 1, "expected one closed window output"); + + let (handcrafted_output, handcrafted_acc) = &captured[0]; + let handcrafted_acc = handcrafted_acc + .as_any() + .downcast_ref::() + .expect("hand-crafted engine should emit MultipleSumAccumulator"); + + let mut arroyo_sums = HashMap::new(); + arroyo_sums.insert("A".to_string(), 6.0); + let arroyo_precompute_bytes = + rmp_serde::to_vec(&arroyo_sums).expect("Arroyo MessagePack encoding should succeed"); + + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder + .write_all(&arroyo_precompute_bytes) + .expect("gzip encoding should succeed"); let arroyo_json = json!({ "aggregation_id": 11, "window": { @@ -1064,11 +1381,11 @@ mod tests { let samples = vec![(1_000_i64, 10.0), (5_000_i64, 20.0), (9_000_i64, 30.0)]; for &(ts, value) in &samples { worker - .process_samples("latency", vec![(ts, value)]) + .process_group_samples(12, "", group_samples("latency", vec![(ts, value)])) .unwrap(); } worker - .process_samples("latency", vec![(10_000_i64, 0.0)]) + .process_group_samples(12, "", group_samples("latency", vec![(10_000, 0.0)])) .unwrap(); let captured = sink.drain(); @@ -1118,11 +1435,6 @@ mod tests { handcrafted_output.end_timestamp, arroyo_output.end_timestamp ); - assert_eq!(handcrafted_output.key, None); - assert_eq!( - arroyo_output.key, - Some(KeyByLabelValues::new_with_labels(vec![String::new()])) - ); assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k); assert_eq!(handcrafted_acc.inner.count(), arroyo_acc.inner.count()); @@ -1135,7 +1447,109 @@ mod tests { } // ----------------------------------------------------------------------- - // Test: late data drop — sample behind watermark - allowed_lateness not emitted + // Test: Arroyo MultipleSum 
equivalence + // ----------------------------------------------------------------------- + + #[test] + fn test_arroyosketch_multiple_sum_matches_handcrafted_precompute_output() { + // Like planner output: grouping=[], aggregated=[host] + let config = make_agg_config_full( + 11, + "cpu", + AggregationType::MultipleSum, + "sum", + 10, + 0, + vec![], + vec!["host"], + ); + let mut agg_configs = HashMap::new(); + agg_configs.insert(11, config.clone()); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker( + arc_configs(agg_configs.clone()), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); + + // All samples go to group "" (empty group key since grouping=[]). + // The host label is the aggregated key inside the accumulator. + worker + .process_group_samples(11, "", group_samples("cpu{host=\"A\"}", vec![(1_000, 1.0)])) + .unwrap(); + worker + .process_group_samples(11, "", group_samples("cpu{host=\"A\"}", vec![(5_000, 2.0)])) + .unwrap(); + worker + .process_group_samples(11, "", group_samples("cpu{host=\"A\"}", vec![(9_000, 3.0)])) + .unwrap(); + worker + .process_group_samples( + 11, + "", + group_samples("cpu{host=\"A\"}", vec![(10_000, 0.0)]), + ) + .unwrap(); + + let captured = sink.drain(); + assert_eq!(captured.len(), 1, "expected one closed window output"); + + let (handcrafted_output, handcrafted_acc) = &captured[0]; + let handcrafted_acc = handcrafted_acc + .as_any() + .downcast_ref::() + .expect("hand-crafted engine should emit MultipleSumAccumulator"); + + // Arroyo: GROUP BY '' (empty key), UDF gets host="A" as aggregated key + let mut arroyo_sums = HashMap::new(); + arroyo_sums.insert("A".to_string(), 6.0); + let arroyo_precompute_bytes = + rmp_serde::to_vec(&arroyo_sums).expect("Arroyo MessagePack encoding should succeed"); + + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder + .write_all(&arroyo_precompute_bytes) + .expect("gzip encoding should succeed"); + let arroyo_json = json!({ + 
"aggregation_id": 11, + "window": { + "start": "1970-01-01T00:00:00", + "end": "1970-01-01T00:00:10" + }, + "key": "", + "precompute": hex::encode(encoder.finish().expect("gzip finalize should succeed")) + }); + + let streaming_config = StreamingConfig::new(agg_configs); + let (arroyo_output, arroyo_acc) = + PrecomputedOutput::deserialize_from_json_arroyo(&arroyo_json, &streaming_config) + .expect("Arroyo precompute should deserialize"); + let arroyo_acc = arroyo_acc + .as_any() + .downcast_ref::() + .expect("Arroyo payload should deserialize to MultipleSumAccumulator"); + + assert_eq!( + handcrafted_output.aggregation_id, + arroyo_output.aggregation_id + ); + assert_eq!( + handcrafted_output.start_timestamp, + arroyo_output.start_timestamp + ); + assert_eq!( + handcrafted_output.end_timestamp, + arroyo_output.end_timestamp + ); + assert_eq!(handcrafted_output.key, arroyo_output.key); + assert_eq!(handcrafted_acc.sums, arroyo_acc.sums); + } + + // ----------------------------------------------------------------------- + // Test: late data drop // ----------------------------------------------------------------------- #[test] @@ -1153,8 +1567,8 @@ mod tests { agg_configs.insert(4, config); let sink = Arc::new(CapturingOutputSink::new()); - // allowed_lateness_ms = 0 let (_tx, rx) = tokio::sync::mpsc::channel(1); + let wm = Arc::new(AtomicI64::new(i64::MIN)); let mut worker = Worker::new( 0, rx, @@ -1167,25 +1581,27 @@ mod tests { raw_mode_aggregation_id: 0, late_data_policy: LateDataPolicy::Drop, }, + Arc::new(AtomicUsize::new(0)), + wm.clone(), + vec![wm], ); - // Establish watermark at t=20000ms (closes [0, 10000) and [10000, 20000)) + // Establish watermark at t=20000ms worker - .process_samples("cpu", vec![(20_000_i64, 1.0)]) + .process_group_samples(4, "", group_samples("cpu", vec![(20_000, 1.0)])) .unwrap(); - let _ = sink.drain(); // discard any earlier emissions + let _ = sink.drain(); - // Send a late sample (ts=5000 is behind watermark=20000 with 
lateness=0) + // Send a late sample worker - .process_samples("cpu", vec![(5_000_i64, 99.0)]) + .process_group_samples(4, "", group_samples("cpu", vec![(5_000, 99.0)])) .unwrap(); - // No new emission should occur (late sample is dropped) - assert_eq!(sink.len(), 0, "late sample should be dropped, not emitted"); + assert_eq!(sink.len(), 0, "late sample should be dropped"); } // ----------------------------------------------------------------------- - // Test: late data ForwardToStore — late sample emitted as mini-accumulator + // Test: late data ForwardToStore // ----------------------------------------------------------------------- #[test] @@ -1204,9 +1620,7 @@ mod tests { let sink = Arc::new(CapturingOutputSink::new()); let (_tx, rx) = tokio::sync::mpsc::channel(1); - // allowed_lateness_ms = 15000 — large enough that ts=8000 passes the - // lateness filter (8000 >= 20000 - 15000 = 5000) while pane 0 is already - // evicted (window [0,10000) closed when watermark reached 20000). + let wm = Arc::new(AtomicI64::new(i64::MIN)); let mut worker = Worker::new( 0, rx, @@ -1219,31 +1633,30 @@ mod tests { raw_mode_aggregation_id: 0, late_data_policy: LateDataPolicy::ForwardToStore, }, + Arc::new(AtomicUsize::new(0)), + wm.clone(), + vec![wm], ); - // Seed pane 0, then advance watermark to 20000 (evicts pane 0) - worker.process_samples("cpu", vec![(500_i64, 1.0)]).unwrap(); + // Seed then advance watermark to 20000 + worker + .process_group_samples(5, "", group_samples("cpu", vec![(500, 1.0)])) + .unwrap(); worker - .process_samples("cpu", vec![(20_000_i64, 0.0)]) + .process_group_samples(5, "", group_samples("cpu", vec![(20_000, 0.0)])) .unwrap(); - let _ = sink.drain(); // discard the [0,10000) window emit + let _ = sink.drain(); - // Send a late sample for the evicted pane 0 (ts=8000 passes the - // lateness filter but pane 0 is gone → ForwardToStore path) + // Send late sample for evicted pane worker - .process_samples("cpu", vec![(8_000_i64, 55.0)]) + 
.process_group_samples(5, "", group_samples("cpu", vec![(8_000, 55.0)])) .unwrap(); let captured = sink.drain(); - assert_eq!( - captured.len(), - 1, - "ForwardToStore policy should emit the late sample" - ); + assert_eq!(captured.len(), 1, "ForwardToStore should emit"); let (output, acc) = &captured[0]; assert_eq!(output.aggregation_id, 5); - // The late sample is emitted with the window it belongs to: pane_start=0, window=[0,10000) assert_eq!(output.start_timestamp, 0); assert_eq!(output.end_timestamp, 10_000); @@ -1259,13 +1672,11 @@ mod tests { } // ----------------------------------------------------------------------- - // Test: worker built from a parsed streaming_config YAML + // Test: worker from streaming_config YAML // ----------------------------------------------------------------------- #[test] fn test_worker_from_streaming_config_yaml() { - // A minimal streaming_config.yaml payload — the same format the Python - // controller writes to disk and the engine reads at startup. 
let yaml = r#" aggregations: - aggregationId: 10 @@ -1288,34 +1699,29 @@ aggregations: let streaming_config = StreamingConfig::from_yaml_data(&data, None).expect("valid streaming config"); - assert!( - streaming_config.contains(10), - "aggregation 10 should be present" - ); + assert!(streaming_config.contains(10)); let agg_configs = arc_configs(streaming_config.get_all_aggregation_configs().clone()); let sink = Arc::new(CapturingOutputSink::new()); let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); - // Three samples inside window [0, 10_000ms) worker - .process_samples("requests_total", vec![(1_000_i64, 3.0)]) + .process_group_samples(10, "", group_samples("requests_total", vec![(1_000, 3.0)])) .unwrap(); worker - .process_samples("requests_total", vec![(5_000_i64, 4.0)]) + .process_group_samples(10, "", group_samples("requests_total", vec![(5_000, 4.0)])) .unwrap(); worker - .process_samples("requests_total", vec![(9_000_i64, 5.0)]) + .process_group_samples(10, "", group_samples("requests_total", vec![(9_000, 5.0)])) .unwrap(); - assert_eq!(sink.len(), 0, "window not yet closed"); + assert_eq!(sink.len(), 0); - // Advance watermark past window boundary to close [0, 10_000ms) worker - .process_samples("requests_total", vec![(10_000_i64, 0.0)]) + .process_group_samples(10, "", group_samples("requests_total", vec![(10_000, 0.0)])) .unwrap(); let captured = sink.drain(); - assert_eq!(captured.len(), 1, "exactly one window should close"); + assert_eq!(captured.len(), 1); let (output, acc) = &captured[0]; assert_eq!(output.aggregation_id, 10); @@ -1364,4 +1770,214 @@ aggregations: ); assert_eq!(key.labels, vec!["GET".to_string(), "200".to_string()]); } + + #[test] + fn test_build_group_key_label_values() { + let key = build_group_key_label_values("constant"); + assert_eq!(key.labels, vec!["constant".to_string()]); + + let key = build_group_key_label_values("us-east;svc-a"); + assert_eq!(key.labels, vec!["us-east".to_string(), 
"svc-a".to_string()]); + + let key = build_group_key_label_values(""); + assert_eq!(key.labels, vec!["".to_string()]); + } + + // ----------------------------------------------------------------------- + // Tests: cross-group watermark propagation + // ----------------------------------------------------------------------- + + #[test] + fn test_intra_worker_watermark_propagation() { + // Two groups on the same worker. Group A advances to t=100s. + // Group B has data at t=10s and then goes idle. + // After flush, group B's idle windows should close via propagation. + let config = make_agg_config( + 1, + "cpu", + AggregationType::SingleSubpopulation, + "Sum", + 10, + 0, + vec![], + ); + let agg_configs = arc_configs(HashMap::from([(1, config)])); + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); + + // Group A: send sample at t=5s (within window [0, 10s)) + worker + .process_group_samples(1, "groupA", group_samples("cpu", vec![(5_000, 1.0)])) + .unwrap(); + // Group B: send sample at t=5s (within window [0, 10s)) + worker + .process_group_samples(1, "groupB", group_samples("cpu", vec![(5_000, 2.0)])) + .unwrap(); + let _ = sink.drain(); + + // Advance group A's watermark to t=100s (closes many windows). + worker + .process_group_samples(1, "groupA", group_samples("cpu", vec![(100_000, 3.0)])) + .unwrap(); + let _ = sink.drain(); + + // Group B has NOT received new data — its watermark is still at 5s. + // Flush should propagate group A's watermark to group B. + worker.flush_all().unwrap(); + let flushed = sink.drain(); + + // Group B's window [0, 10s) should now be closed via propagation. 
+ let group_b_outputs: Vec<_> = flushed + .iter() + .filter(|(out, _)| { + out.key + .as_ref() + .map(|k| k.labels == vec!["groupB".to_string()]) + .unwrap_or(false) + }) + .collect(); + assert!( + !group_b_outputs.is_empty(), + "idle group B should have windows closed via watermark propagation" + ); + } + + #[test] + fn test_compute_global_watermark_min_of_started() { + let wm0 = Arc::new(AtomicI64::new(100_000)); + let wm1 = Arc::new(AtomicI64::new(80_000)); + let wm2 = Arc::new(AtomicI64::new(90_000)); + let all = vec![wm0.clone(), wm1.clone(), wm2.clone()]; + + let (_tx, rx) = tokio::sync::mpsc::channel(1); + let worker = Worker::new( + 0, + rx, + Arc::new(CapturingOutputSink::new()), + HashMap::new(), + WorkerRuntimeConfig { + max_buffer_per_series: 10_000, + allowed_lateness_ms: 0, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }, + Arc::new(AtomicUsize::new(0)), + wm0, + all, + ); + + assert_eq!(worker.compute_global_watermark(), 80_000); + } + + #[test] + fn test_compute_global_watermark_ignores_unstarted() { + let wm0 = Arc::new(AtomicI64::new(100_000)); + let wm1 = Arc::new(AtomicI64::new(i64::MIN)); // not started + let all = vec![wm0.clone(), wm1.clone()]; + + let (_tx, rx) = tokio::sync::mpsc::channel(1); + let worker = Worker::new( + 0, + rx, + Arc::new(CapturingOutputSink::new()), + HashMap::new(), + WorkerRuntimeConfig { + max_buffer_per_series: 10_000, + allowed_lateness_ms: 0, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }, + Arc::new(AtomicUsize::new(0)), + wm0, + all, + ); + + assert_eq!( + worker.compute_global_watermark(), + 100_000, + "unstarted workers (i64::MIN) should be ignored" + ); + } + + #[test] + fn test_compute_global_watermark_all_unstarted() { + let wm0 = Arc::new(AtomicI64::new(i64::MIN)); + let wm1 = Arc::new(AtomicI64::new(i64::MIN)); + let all = vec![wm0.clone(), wm1.clone()]; + + let (_tx, rx) = 
tokio::sync::mpsc::channel(1); + let worker = Worker::new( + 0, + rx, + Arc::new(CapturingOutputSink::new()), + HashMap::new(), + WorkerRuntimeConfig { + max_buffer_per_series: 10_000, + allowed_lateness_ms: 0, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }, + Arc::new(AtomicUsize::new(0)), + wm0, + all, + ); + + assert_eq!( + worker.compute_global_watermark(), + i64::MIN, + "all unstarted should return i64::MIN" + ); + } + + #[test] + fn test_flush_publishes_worker_watermark() { + let config = make_agg_config( + 1, + "cpu", + AggregationType::SingleSubpopulation, + "Sum", + 10, + 0, + vec![], + ); + let agg_configs = arc_configs(HashMap::from([(1, config)])); + let sink = Arc::new(CapturingOutputSink::new()); + let wm = Arc::new(AtomicI64::new(i64::MIN)); + let all = vec![wm.clone()]; + let (_tx, rx) = tokio::sync::mpsc::channel(1); + let mut worker = Worker::new( + 0, + rx, + sink, + agg_configs, + WorkerRuntimeConfig { + max_buffer_per_series: 10_000, + allowed_lateness_ms: 0, + pass_raw_samples: false, + raw_mode_aggregation_id: 0, + late_data_policy: LateDataPolicy::Drop, + }, + Arc::new(AtomicUsize::new(0)), + wm.clone(), + all, + ); + + assert_eq!(wm.load(Ordering::Acquire), i64::MIN); + + // Send data at t=50s + worker + .process_group_samples(1, "", group_samples("cpu", vec![(50_000, 1.0)])) + .unwrap(); + + // Flush should publish worker watermark + worker.flush_all().unwrap(); + assert_eq!( + wm.load(Ordering::Acquire), + 50_000, + "worker watermark should be published after flush" + ); + } } diff --git a/asap-query-engine/src/stores/simple_map_store/global.rs b/asap-query-engine/src/stores/simple_map_store/global.rs index 66529db..e539ae8 100644 --- a/asap-query-engine/src/stores/simple_map_store/global.rs +++ b/asap-query-engine/src/stores/simple_map_store/global.rs @@ -156,6 +156,53 @@ impl SimpleMapStoreGlobal { cleanup_policy, } } + + /// Collect diagnostic info about store contents. 
+ pub fn diagnostic_info(&self) -> super::StoreDiagnostics { + use super::{AggregationDiagnostic, StoreDiagnostics}; + + let data = self.lock.lock().unwrap(); + let mut per_aggregation = Vec::new(); + let mut total_time_map_entries: usize = 0; + let total_sketch_bytes: usize = 0; + + for (&agg_id, per_key) in &data.stores { + let time_map_len = per_key.current_epoch.window_count() + + per_key + .sealed_epochs + .values() + .map(|e| e.distinct_window_count()) + .sum::(); + let read_counts_len = data + .read_counts + .get(&agg_id) + .map(|rc| rc.len()) + .unwrap_or(0); + total_time_map_entries += time_map_len; + + let num_aggregate_objects = per_key.current_epoch.len() + + per_key + .sealed_epochs + .values() + .map(|e| e.entries.len()) + .sum::(); + + per_aggregation.push(AggregationDiagnostic { + aggregation_id: agg_id, + time_map_len, + read_counts_len, + num_aggregate_objects, + sketch_bytes: 0, // skip serialization for diagnostics + }); + } + + StoreDiagnostics { + num_aggregations: data.stores.len(), + total_time_map_entries, + total_sketch_bytes, + per_aggregation, + } + } } /// Extracted config fields needed inside the locked batch loop. diff --git a/asap-query-engine/src/stores/simple_map_store/mod.rs b/asap-query-engine/src/stores/simple_map_store/mod.rs index 2600c28..29c78d6 100644 --- a/asap-query-engine/src/stores/simple_map_store/mod.rs +++ b/asap-query-engine/src/stores/simple_map_store/mod.rs @@ -7,16 +7,32 @@ use crate::data_model::{ AggregateCore, CleanupPolicy, LockStrategy, PrecomputedOutput, StreamingConfig, }; use crate::stores::{Store, StoreResult, TimestampedBucketsMap}; +use global::SimpleMapStoreGlobal; +use per_key::SimpleMapStorePerKey; use std::collections::HashMap; use std::sync::Arc; -pub use legacy::LegacySimpleMapStoreGlobal; -pub use legacy::LegacySimpleMapStorePerKey; +/// Diagnostic snapshot from a single aggregation ID in the store. 
+pub struct AggregationDiagnostic { + pub aggregation_id: u64, + pub time_map_len: usize, + pub read_counts_len: usize, + pub num_aggregate_objects: usize, + pub sketch_bytes: usize, +} + +/// Diagnostic snapshot of the entire store. +pub struct StoreDiagnostics { + pub num_aggregations: usize, + pub total_time_map_entries: usize, + pub total_sketch_bytes: usize, + pub per_aggregation: Vec, +} /// Enum wrapper that dispatches to either global or per-key lock implementation pub enum SimpleMapStore { - Global(LegacySimpleMapStoreGlobal), - PerKey(LegacySimpleMapStorePerKey), + Global(SimpleMapStoreGlobal), + PerKey(SimpleMapStorePerKey), } impl SimpleMapStore { @@ -25,6 +41,14 @@ impl SimpleMapStore { Self::new_with_strategy(streaming_config, cleanup_policy, LockStrategy::PerKey) } + /// Collect diagnostic info for memory investigation. + pub fn diagnostic_info(&self) -> StoreDiagnostics { + match self { + SimpleMapStore::Global(store) => store.diagnostic_info(), + SimpleMapStore::PerKey(store) => store.diagnostic_info(), + } + } + /// Constructor with explicit lock strategy (used by main.rs) pub fn new_with_strategy( streaming_config: Arc, @@ -32,14 +56,12 @@ impl SimpleMapStore { lock_strategy: LockStrategy, ) -> Self { match lock_strategy { - LockStrategy::Global => SimpleMapStore::Global(LegacySimpleMapStoreGlobal::new( - streaming_config, - cleanup_policy, - )), - LockStrategy::PerKey => SimpleMapStore::PerKey(LegacySimpleMapStorePerKey::new( - streaming_config, - cleanup_policy, - )), + LockStrategy::Global => { + SimpleMapStore::Global(SimpleMapStoreGlobal::new(streaming_config, cleanup_policy)) + } + LockStrategy::PerKey => { + SimpleMapStore::PerKey(SimpleMapStorePerKey::new(streaming_config, cleanup_policy)) + } } } } diff --git a/asap-query-engine/src/stores/simple_map_store/per_key.rs b/asap-query-engine/src/stores/simple_map_store/per_key.rs index a1cdfb8..728a5aa 100644 --- a/asap-query-engine/src/stores/simple_map_store/per_key.rs +++ 
b/asap-query-engine/src/stores/simple_map_store/per_key.rs @@ -185,6 +185,53 @@ impl SimpleMapStorePerKey { } } + /// Collect diagnostic info about store contents. + pub fn diagnostic_info(&self) -> super::StoreDiagnostics { + use super::{AggregationDiagnostic, StoreDiagnostics}; + + let mut per_aggregation = Vec::new(); + let mut total_time_map_entries: usize = 0; + let total_sketch_bytes: usize = 0; + + for entry in self.store.iter() { + let agg_id = *entry.key(); + let data = match entry.value().read() { + Ok(d) => d, + Err(_) => continue, + }; + let time_map_len = data.current_epoch.window_count() + + data + .sealed_epochs + .values() + .map(|e| e.distinct_window_count()) + .sum::(); + let read_counts_len = data.read_counts.lock().map(|rc| rc.len()).unwrap_or(0); + total_time_map_entries += time_map_len; + + let num_aggregate_objects = data.current_epoch.len() + + data + .sealed_epochs + .values() + .map(|e| e.entries.len()) + .sum::(); + + per_aggregation.push(AggregationDiagnostic { + aggregation_id: agg_id, + time_map_len, + read_counts_len, + num_aggregate_objects, + sketch_bytes: 0, // skip serialization for diagnostics + }); + } + + StoreDiagnostics { + num_aggregations: self.store.len(), + total_time_map_entries, + total_sketch_bytes, + per_aggregation, + } + } + fn cleanup_old_aggregates( &self, data: &mut StoreKeyData, diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index c80f5ec..a1ecee1 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -37,6 +37,29 @@ fn make_agg_config( window_secs: u64, slide_secs: u64, grouping: Vec<&str>, +) -> AggregationConfig { + make_agg_config_full( + id, + metric, + agg_type, + agg_sub_type, + window_secs, + slide_secs, + grouping, + vec![], + ) +} + +#[allow(clippy::too_many_arguments)] +fn make_agg_config_full( + id: u64, + metric: &str, + agg_type: AggregationType, + 
agg_sub_type: &str, + window_secs: u64, + slide_secs: u64, + grouping: Vec<&str>, + aggregated: Vec<&str>, ) -> AggregationConfig { let window_type = if slide_secs == 0 || slide_secs == window_secs { WindowType::Tumbling @@ -51,7 +74,9 @@ fn make_agg_config( promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new( grouping.iter().map(|s| s.to_string()).collect(), ), - promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new( + aggregated.iter().map(|s| s.to_string()).collect(), + ), promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), String::new(), window_secs, @@ -265,14 +290,15 @@ async fn e2e_multiple_sum_output_matches_arroyo() { let agg_id = 2u64; let window_secs = 10u64; - let config = make_agg_config( + let config = make_agg_config_full( agg_id, "cpu", AggregationType::MultipleSum, "sum", window_secs, 0, - vec!["host"], + vec![], // grouping: none + vec!["host"], // aggregated: host is the key INSIDE the sketch ); let mut agg_map = HashMap::new(); agg_map.insert(agg_id, config); diff --git a/asap-quickstart/Dockerfile.queryengine-local b/asap-quickstart/Dockerfile.queryengine-local new file mode 100644 index 0000000..1988f35 --- /dev/null +++ b/asap-quickstart/Dockerfile.queryengine-local @@ -0,0 +1,14 @@ +# Lightweight image that copies a pre-built query engine binary +FROM ubuntu:24.04 + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates libssl3 zlib1g \ + && rm -rf /var/lib/apt/lists/* + +COPY asap-quickstart/bin/query_engine_rust /usr/local/bin/query_engine_rust + +EXPOSE 8088 + +ENTRYPOINT ["query_engine_rust"] diff --git a/asap-quickstart/config/prometheus-precompute.yml b/asap-quickstart/config/prometheus-precompute.yml new file mode 100644 index 0000000..ee1979b --- /dev/null +++ b/asap-quickstart/config/prometheus-precompute.yml @@ -0,0 +1,36 @@ +# Prometheus 
configuration for pattern-based fake exporters demo +# Uses the hand-crafted precompute engine (instead of Arroyo) for sketch building + +global: + scrape_interval: 1s + evaluation_interval: 1s + +# Remote write configuration to send metrics to the precompute engine for sketch building +remote_write: + - url: http://queryengine:9091/api/v1/write + queue_config: + batch_send_deadline: 1s + write_relabel_configs: + - source_labels: [__name__] + regex: sensor_reading + action: keep + +scrape_configs: + # Scrape pattern-based fake exporters + # Each exporter generates one pattern type (constant, sine, linear, etc.) + # All metrics have a 'pattern' label indicating their pattern type + - job_name: 'pattern-exporters' + metric_relabel_configs: + - source_labels: [__name__] + regex: sensor_reading + action: keep + static_configs: + - targets: + - 'fake-exporter-constant:50000' + - 'fake-exporter-linear-up:50001' + - 'fake-exporter-linear-down:50002' + - 'fake-exporter-sine:50003' + - 'fake-exporter-sine-noise:50004' + - 'fake-exporter-step:50005' + - 'fake-exporter-spiky:50006' + - 'fake-exporter-exp-up:50007' diff --git a/asap-quickstart/docker-compose-precompute.dev.yml b/asap-quickstart/docker-compose-precompute.dev.yml new file mode 100644 index 0000000..9786ea9 --- /dev/null +++ b/asap-quickstart/docker-compose-precompute.dev.yml @@ -0,0 +1,12 @@ +name: asapquery-quickstart-precompute + +# Development override for precompute variant: builds query engine from local source. +# +# Usage: +# docker compose -f docker-compose-precompute.yml -f docker-compose-precompute.dev.yml up -d --build + +services: + queryengine: + build: + context: .. 
+ dockerfile: asap-query-engine/Dockerfile diff --git a/asap-quickstart/docker-compose-precompute.local.yml b/asap-quickstart/docker-compose-precompute.local.yml new file mode 100644 index 0000000..c2a2320 --- /dev/null +++ b/asap-quickstart/docker-compose-precompute.local.yml @@ -0,0 +1,12 @@ +name: asapquery-quickstart-precompute + +# Override: builds query engine from pre-built local binary (fast). +# Usage: +# cargo build --release -p query_engine_rust # build locally first +# sudo docker compose -f docker-compose-precompute.yml -f docker-compose-precompute.local.yml up -d --build + +services: + queryengine: + build: + context: .. + dockerfile: asap-quickstart/Dockerfile.queryengine-local diff --git a/asap-quickstart/docker-compose-precompute.yml b/asap-quickstart/docker-compose-precompute.yml new file mode 100644 index 0000000..e0353ad --- /dev/null +++ b/asap-quickstart/docker-compose-precompute.yml @@ -0,0 +1,322 @@ +name: asapquery-quickstart-precompute + +# Docker Compose file for pattern-based fake exporters demo +# Uses the hand-crafted precompute engine instead of Arroyo for streaming aggregation +# This eliminates the need for Kafka, Arroyo, and asap-summary-ingest + +networks: + asap-network: + driver: bridge + ipam: + driver: default + config: + - subnet: 172.25.0.0/16 + +volumes: + prometheus-data: + grafana-data: + asap-planner-output: + +services: + ############################################################################# + # INFRASTRUCTURE SERVICES + ############################################################################# + + prometheus: + image: prom/prometheus:v3.9.1 + container_name: asap-prometheus + hostname: prometheus + networks: + - asap-network + ports: + - "9090:9090" + volumes: + - ./config/prometheus-precompute.yml:/etc/prometheus/prometheus.yml:ro + - prometheus-data:/prometheus + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - 
"--web.console.libraries=/usr/share/prometheus/console_libraries" + - "--web.console.templates=/usr/share/prometheus/consoles" + - "--web.enable-lifecycle" + healthcheck: + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:9090/-/healthy || exit 1"] + interval: 10s + timeout: 5s + retries: 5 + depends_on: + queryengine: + condition: service_healthy + restart: no + + grafana: + image: grafana/grafana-enterprise:12.3.3 + container_name: asap-grafana + hostname: grafana + networks: + - asap-network + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_SECURITY_ADMIN_USER=admin + - GF_USERS_ALLOW_SIGN_UP=false + - GF_SERVER_ROOT_URL=http://localhost:3000 + - GF_SECURITY_ALLOW_EMBEDDING=true + volumes: + - grafana-data:/var/lib/grafana + - ./config/grafana/provisioning:/etc/grafana/provisioning:ro + healthcheck: + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:3000/api/health || exit 1"] + interval: 10s + timeout: 5s + retries: 5 + restart: no + + ############################################################################# + # INIT CONTAINERS + ############################################################################# + + asap-planner-rs: + image: ghcr.io/projectasap/asap-planner-rs:v0.2.0 + container_name: asap-planner-rs + hostname: asap-planner-rs + networks: + - asap-network + command: + - "--input_config=/config/controller-config.yaml" + - "--output_dir=/asap-planner-output" + - "--prometheus_scrape_interval=1" + - "--streaming_engine=arroyo" + - "--range-duration=300" + - "--step=10" + volumes: + - ./config/controller-config.yaml:/config/controller-config.yaml:ro + - asap-planner-output:/asap-planner-output + restart: "no" + + ############################################################################# + # CORE SERVICES + ############################################################################# + + queryengine: + image: 
ghcr.io/projectasap/asap-query-engine:v0.2.0-precompute + container_name: asap-queryengine + hostname: queryengine + networks: + - asap-network + ports: + - "8088:8088" + expose: + - "9091" + environment: + - RUST_LOG=INFO + - RUST_BACKTRACE=1 + volumes: + - asap-planner-output:/asap-planner-output:ro + - ./output/queryengine:/app/outputs + command: + - "--config=/asap-planner-output/inference_config.yaml" + - "--streaming-config=/asap-planner-output/streaming_config.yaml" + - "--prometheus-server=http://prometheus:9090" + - "--prometheus-scrape-interval=1" + - "--streaming-engine=precompute" + - "--prometheus-remote-write-port=9091" + - "--delete-existing-db" + - "--log-level=INFO" + - "--output-dir=/app/outputs" + - "--query-language=PROMQL" + - "--lock-strategy=per-key" + - "--forward-unsupported-queries" + healthcheck: + test: ["CMD-SHELL", "bash -c 'echo > /dev/tcp/localhost/8088' 2>/dev/null || exit 1"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 15s + depends_on: + asap-planner-rs: + condition: service_completed_successfully + restart: no + + ############################################################################# + # PATTERN-BASED FAKE EXPORTERS + # Each exporter generates one pattern type with the 'pattern' label + # All series within an exporter follow the same pattern shape with variation + ############################################################################# + + # Constant values - baseline for comparison + fake-exporter-constant: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-constant + hostname: fake-exporter-constant + networks: + - asap-network + expose: + - "50000" + command: + - "--port=50000" + - "--valuescale=1000" + - "--dataset=constant" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" + - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + 
restart: no + + # Linear increasing - tests trend preservation + fake-exporter-linear-up: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-linear-up + hostname: fake-exporter-linear-up + networks: + - asap-network + expose: + - "50001" + command: + - "--port=50001" + - "--valuescale=1000" + - "--dataset=linear-up" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" + - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + restart: no + + # Linear decreasing - tests trend preservation + fake-exporter-linear-down: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-linear-down + hostname: fake-exporter-linear-down + networks: + - asap-network + expose: + - "50002" + command: + - "--port=50002" + - "--valuescale=1000" + - "--dataset=linear-down" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" + - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + restart: no + + # Sine wave - tests periodicity preservation + fake-exporter-sine: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-sine + hostname: fake-exporter-sine + networks: + - asap-network + expose: + - "50003" + command: + - "--port=50003" + - "--valuescale=1000" + - "--dataset=sine" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" + - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + restart: no + + # Sine with noise - tests signal extraction / smoothing + fake-exporter-sine-noise: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-sine-noise + hostname: 
fake-exporter-sine-noise + networks: + - asap-network + expose: + - "50004" + command: + - "--port=50004" + - "--valuescale=1000" + - "--dataset=sine-noise" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" + - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + restart: no + + # Step function - tests edge preservation + fake-exporter-step: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-step + hostname: fake-exporter-step + networks: + - asap-network + expose: + - "50005" + command: + - "--port=50005" + - "--valuescale=1000" + - "--dataset=step" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" + - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + restart: no + + # Spiky pattern - tests sudden spikes/drops + fake-exporter-spiky: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-spiky + hostname: fake-exporter-spiky + networks: + - asap-network + expose: + - "50006" + command: + - "--port=50006" + - "--valuescale=1000" + - "--dataset=spiky" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" + - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + restart: no + + # Exponential growth - tests non-linear patterns + fake-exporter-exp-up: + image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0 + container_name: asap-fake-exporter-exp-up + hostname: fake-exporter-exp-up + networks: + - asap-network + expose: + - "50007" + command: + - "--port=50007" + - "--valuescale=1000" + - "--dataset=exp-up" + - "--num-labels=3" + - "--num-values-per-label=30,30,30" + - "--metric-type=gauge" + - "--metric-name=sensor_reading" 
+ - "--label-names=region,service,host" + - "--label-value-prefixes=region,svc,host" + - "--add-pattern-label" + restart: no