diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index a5fca27..df006de 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -67,6 +67,7 @@ path = "src/bin/precompute_engine.rs" name = "test_e2e_precompute" path = "src/bin/test_e2e_precompute.rs" + [dev-dependencies] ctor = "0.2" tempfile = "3.20.0" diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index a0ebab5..faf1fe1 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -566,6 +566,17 @@ struct BenchResult { batch_latency_ms: f64, } +struct BenchRunConfig { + label: String, + port: u16, + streaming_config: Arc, + num_workers: usize, + num_concurrent_senders: usize, + num_requests: u64, + samples_per_request: u64, + num_series: u64, +} + /// Build an AggregationConfig for Sum with specified window parameters. fn make_sum_agg_config( agg_id: u64, @@ -599,18 +610,20 @@ fn make_sum_agg_config( } /// Run a single windowed benchmark and return the results. -#[allow(clippy::too_many_arguments)] async fn run_single_bench( client: &reqwest::Client, - label: &str, - port: u16, - streaming_config: Arc, - num_workers: usize, - num_concurrent_senders: usize, - num_requests: u64, - samples_per_request: u64, - num_series: u64, + config: BenchRunConfig, ) -> Result> { + let BenchRunConfig { + label, + port, + streaming_config, + num_workers, + num_concurrent_senders, + num_requests, + samples_per_request, + num_series, + } = config; let total_samples = num_requests * samples_per_request; let noop_sink = Arc::new(NoopOutputSink::new()); @@ -728,7 +741,7 @@ async fn run_single_bench( println!(" Batch latency: {batch_latency_ms:.1}ms"); Ok(BenchResult { - label: label.to_string(), + label, send_throughput, e2e_throughput, batch_latency_ms, @@ -760,14 +773,16 @@ async fn run_windowed_benchmarks( let r = run_single_bench( client, - label, - port, - sc, - 4, - 4, // concurrent senders to saturate workers - num_requests, - samples_per_request, - num_series, + BenchRunConfig { + label: label.to_string(), + port, + streaming_config: sc, + num_workers: 4, + num_concurrent_senders: 4, // concurrent senders to saturate workers + num_requests, + samples_per_request, + num_series, + }, ) .await?; results.push(r); @@ -802,14 +817,16 @@ async fn run_scalability_benchmark( let r = run_single_bench( client, - &label, - port, - sc, - num_workers, - num_workers, // concurrent senders match worker count - num_requests, - samples_per_request, - num_series, + BenchRunConfig { + label, + port, + streaming_config: sc, + num_workers, + num_concurrent_senders: num_workers, // concurrent senders match worker count + num_requests, + samples_per_request, + num_series, + }, ) .await?; results.push(r); diff --git a/asap-query-engine/src/data_model/precomputed_output.rs b/asap-query-engine/src/data_model/precomputed_output.rs index ac1c8f5..d57c67f 100644 --- a/asap-query-engine/src/data_model/precomputed_output.rs +++ b/asap-query-engine/src/data_model/precomputed_output.rs @@ -385,7 +385,7 @@ impl PrecomputedOutput { Ok(Box::new(accumulator)) } "MultipleSum" => { - let accumulator = MultipleSumAccumulator::deserialize_from_bytes(buffer) + let accumulator = MultipleSumAccumulator::deserialize_from_bytes_arroyo(buffer) .map_err(|e| format!("Failed to deserialize MultipleSumAccumulator: {e}"))?; Ok(Box::new(accumulator)) } diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 70149c4..0e5cb8f 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -9,6 +9,7 @@ use sketch_core::config::{self, ImplMode}; use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingEngine}; use query_engine_rust::drivers::AdapterConfig; +use query_engine_rust::precompute_engine::config::LateDataPolicy; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{ HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, @@ -301,7 +302,7 @@ async fn main() -> Result<()> { channel_buffer_size: args.precompute_channel_buffer_size, pass_raw_samples: false, raw_mode_aggregation_id: 0, - late_data_policy: query_engine_rust::precompute_engine::config::LateDataPolicy::Drop, + late_data_policy: LateDataPolicy::Drop, }; let output_sink = Arc::new(StoreOutputSink::new(store.clone())); let engine = diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 52c376c..028d47f 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -633,6 +633,15 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box Box::new(MultipleSumUpdater::new()), + "MultipleIncrease" | "multiple_increase" => Box::new(MultipleIncreaseUpdater::new()), + "MultipleMinMax" | "multiple_min_max" => Box::new(MultipleMinMaxUpdater::new( + if sub_type.eq_ignore_ascii_case("max") { + "max".to_string() + } else { + "min".to_string() + }, + )), "Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()), "Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new("min".to_string())), "Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new("max".to_string())), diff --git a/asap-query-engine/src/precompute_engine/output_sink.rs b/asap-query-engine/src/precompute_engine/output_sink.rs index 69bd3a1..d1bc2bc 100644 --- a/asap-query-engine/src/precompute_engine/output_sink.rs +++ b/asap-query-engine/src/precompute_engine/output_sink.rs @@ -1,6 +1,6 @@ use crate::data_model::{AggregateCore, PrecomputedOutput}; use crate::stores::Store; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tracing::debug_span; /// Trait for emitting completed window outputs. @@ -61,6 +61,47 @@ impl OutputSink for RawPassthroughSink { } } +/// A capturing sink for testing that stores all emitted outputs. +pub struct CapturingOutputSink { + pub captured: Mutex)>>, +} + +impl CapturingOutputSink { + pub fn new() -> Self { + Self { + captured: Mutex::new(Vec::new()), + } + } + + pub fn drain(&self) -> Vec<(PrecomputedOutput, Box)> { + self.captured.lock().unwrap().drain(..).collect() + } + + pub fn len(&self) -> usize { + self.captured.lock().unwrap().len() + } + + pub fn is_empty(&self) -> bool { + self.captured.lock().unwrap().is_empty() + } +} + +impl Default for CapturingOutputSink { + fn default() -> Self { + Self::new() + } +} + +impl OutputSink for CapturingOutputSink { + fn emit_batch( + &self, + outputs: Vec<(PrecomputedOutput, Box)>, + ) -> Result<(), Box> { + self.captured.lock().unwrap().extend(outputs); + Ok(()) + } +} + /// A no-op sink for testing that just counts emitted batches. pub struct NoopOutputSink { pub emit_count: std::sync::atomic::AtomicU64, diff --git a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md index 25bd7a4..2244b45 100644 --- a/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md +++ b/asap-query-engine/src/precompute_engine/precompute_engine_design_doc.md @@ -116,6 +116,39 @@ enum WorkerMessage { `route_batch()` groups messages by target worker and sends them in parallel for throughput while preserving per-worker ordering. +#### Load balancing trade-offs and alternatives + +The current hash-mod scheme is correct and low-overhead but has four distinct failure modes. Each has a corresponding mitigation strategy. + +**Problem 1: hash skew — uneven series count per worker** + +With a good hash and N workers the variance in series count is O(√(S/N)), which is negligible at large S. Virtual nodes (each physical worker owns K hash ring slots) reduce variance further at zero runtime cost but are rarely necessary with xxhash64 in practice. + +**Problem 2: hot series — a few series dominate sample volume** + +The hash does not know about per-series sample rates. If one metric is scraped at 1 s while others are at 60 s, the owning worker handles 60× more samples. + +*Mitigation — weight-aware initial placement:* on first sight of a series, assign it to the least-loaded worker (by current sample rate) and record the assignment in a small routing table that replaces the hash lookup. The assignment remains stable (one series = one worker always), so no cross-worker state is needed. The routing table fits in memory for millions of series. Works well when series rates are observable at assignment time (e.g. from Prometheus service discovery). + +**Problem 3: GROUP BY fan-in — cross-worker store entries require query-time merge** + +Because routing is by full series key, two series that share a `grouping_labels` value but differ in rolled-up labels land on different workers and emit independent accumulators for the same `(agg_id, key, window)` tuple (see §4). The store must append multiple entries and the query engine merges them. + +*Mitigation A — route by grouping key:* use `xxhash64(grouping_key)` instead of the full series key. All series rolling up into the same GROUP BY bucket land on one worker, which merges them before emitting. The store gets exactly one entry per `(agg_id, key, window)` and no query-time fan-in is needed. Trade-offs: routing requires knowing the config's `grouping_labels` at ingest time; creates a new hot-key risk when one grouping value covers far more series than others; a series matched by multiple configs with different grouping keys would need to be sent to multiple workers. + +*Mitigation B — two-phase aggregation:* keep routing by series key (local aggregation as now) but emit partial accumulators to a second tier of reduce-workers routed by grouping key. Reduce-workers merge partials and write a single entry to the store. Eliminates query-time fan-in without the hot-key risk. Adds one extra hop of latency and requires coordinating two flush cycles. + +**Problem 4: static assignment — series stuck on overloaded workers** + +Hash-based assignment is fixed for the lifetime of the process. A series that begins emitting at 100× its original rate stays on the same worker forever. + +*Mitigation — state migration at window boundaries:* when a series has no open panes (i.e. `active_panes` is empty after a window close), its state can be serialized, sent to a new worker, and the routing table updated atomically. The empty-panes condition occurs naturally at every tumbling window boundary, or periodically for sliding windows after all panes are evicted. Operationally complex but sound — no split-window state is possible if migration is gated on the empty-panes condition. + +**Practical signal: channel backpressure** + +Before investing in any of the above, add observability to the bounded MPSC channels — if a worker's channel is frequently near capacity, that is the primary signal that routing is imbalanced. Exposing `channel.capacity()` (remaining slots) per worker as a metric is cheap and pinpoints which worker is the bottleneck, providing the data needed to choose between the mitigations above. + + ### 3.4 Worker (`worker.rs`) Each worker owns an isolated shard of the series space. @@ -150,6 +183,49 @@ struct AggregationState { } ``` +#### Accumulator lifecycle and ownership + +Accumulators are not pre-assigned — they are created **lazily** at three nested levels: + +**1. At engine startup** (`engine.rs`): every worker receives a full copy of all `AggregationConfig`s. All workers are symmetric; none is pre-assigned to any series or config. + +```rust +let agg_configs = streaming_config.get_all_aggregation_configs().clone(); +for (id, rx) in receivers { + Worker::new(id, rx, sink.clone(), agg_configs.clone(), ...) +} +``` + +**2. On first sample for a series** (`get_or_create_series_state`): the worker calls `matching_agg_configs(series_key)` to filter the config map by metric name, then creates one `AggregationState` per match (a `WindowManager` + empty pane map). No accumulators exist yet. + +```rust +let aggregations = matching_agg_configs(series_key).map(|(_, config)| AggregationState { + window_manager: WindowManager::new(config.window_size, config.slide_interval), + config: config.clone(), + active_panes: BTreeMap::new(), // ← empty; no memory allocated for sketches yet +}).collect(); +``` + +**3. On first sample in a pane** (`process_samples`): the accumulator is created the moment a sample falls into a pane that does not yet exist in `active_panes`. + +```rust +let updater = agg_state.active_panes + .entry(pane_start) + .or_insert_with(|| create_accumulator_updater(&agg_state.config)); +``` + +**Ownership hierarchy:** + +``` +Worker +└── series_map[series_key] one entry per series this worker owns + └── aggregations[i] one AggregationState per matching config + └── active_panes[pane_start] one AccumulatorUpdater per open pane + └── Box the actual sketch / sum / minmax / etc. +``` + +Because `xxhash64(series_key) % N` is deterministic, a series always lands on the same worker. Its accumulators live in exactly one worker with no sharing and no locking. Workers that never receive a series never allocate any state for it. + #### Pane-Based Sliding Window Optimization The worker uses **pane-based incremental computation** to reduce per-sample @@ -353,9 +429,83 @@ trait OutputSink: Send + Sync { **Implementations:** - `StoreOutputSink` — calls `store.insert_precomputed_output_batch()` - `RawPassthroughSink` — same interface, used for raw mode -- `NoopOutputSink` — testing helper that counts emitted items +- `NoopOutputSink` — testing helper that counts emitted items via `AtomicU64` +- `CapturingOutputSink` — testing helper that stores all emitted `(PrecomputedOutput, Box)` pairs in a `Mutex>`, with `drain()` and `len()` for assertions + +## 4. Cross-Series (GROUP BY) Aggregation + +### Label dimension roles -## 4. Data Model +`AggregationConfig` has three label dimension fields that control the spatial aggregation shape: + +| Field | Role | +|---|---| +| `grouping_labels` | Labels preserved in `PrecomputedOutput.key`; form the GROUP BY key visible at query time | +| `aggregated_labels` | Internal sub-keys for MultipleSubpopulation sketches (e.g. CMS, HydraKLL) | +| `rollup_labels` | Dropped entirely at ingest; not recoverable at query time | + +A config with `grouping_labels: [job]` and `rollup_labels: [instance]` means: "aggregate across all instances, keep one output series per job value." Multiple input series (`metric{job=j1,instance=h1}`, `metric{job=j1,instance=h2}`, ...) all contribute to the same logical output key `(job=j1)`. + +### Cross-worker fan-in + +Because routing is by full series key (`xxhash64(series_key) % N`), two series that share a `grouping_labels` value but differ in rolled-up labels typically land on different workers: + +``` +metric{job=j1, instance=h1} → Worker 0 → pane accumulator with key (job=j1) +metric{job=j1, instance=h2} → Worker 3 → pane accumulator with key (job=j1) +``` + +Each worker independently closes its window and emits a separate `PrecomputedOutput` with key `(job=j1)` for the same window `[0, 60s)`. The store **appends** rather than overwrites on the same `(aggregation_id, key, window)` tuple: + +``` +store[(agg_id, key=(j1), [0,60s))] → [acc_worker0, acc_worker3] +``` + +Query-time `SummaryMergeMultipleExec` merges all entries for the same key and window via `AggregateCore::merge_with()`. No ingest-time cross-worker coordination is needed. + +### Eventual consistency + +Workers have independent watermarks. For a standard Prometheus scrape (all instances delivered in one HTTP batch via `route_batch()`), all workers receive their samples in the same round-trip and close the window on the same flush cycle. The incompleteness window — time between the first and last worker emitting for the same cross-series window — is typically milliseconds (bounded by Tokio task scheduling jitter). + +For staggered multi-source producers arriving at different times, the incompleteness window is bounded by the spread of producer arrival times. In both cases the result is **eventually consistent**: once all contributing workers have emitted, the store holds a complete set of accumulators and queries return the correct merged value. + +This deferred-merge design is intentional — it preserves the shared-nothing worker architecture with zero ingest-time cross-worker coordination. The store's append-multiple-per-window design and the query-time merge handle the fan-in correctly for both cross-series aggregation and `ForwardToStore` late data. + +### Sliding windows with cross-worker GROUP BY + +The pane-sharing optimization is an **intra-worker** implementation detail. From the store and query engine's perspective, each worker always emits a complete, self-consistent accumulator for each closed window — tumbling or sliding makes no difference to the cross-worker fan-in. + +**Within a single worker** (e.g. Worker 0, series `{job=j1, instance=h1}`, 30s/10s sliding): + +``` +Window [0, 30s) — panes [0, 10s, 20s] + pane 0: take (evict — no future window needs it) + pane 10s: snapshot (shared with [10s, 40s)) + pane 20s: snapshot (shared with [10s, 40s) and [20s, 50s)) + → emit: acc_w0, key=(j1), window=[0,30s), sum = v_0 + v_10 + v_20 + +Window [10s, 40s) — panes [10s, 20s, 30s] + pane 10s: take (evict — snapshot for [0,30s) already completed) + pane 20s: snapshot + pane 30s: snapshot (or take, depending on future windows) + → emit: acc_w0, key=(j1), window=[10s,40s), sum = v_10 + v_20 + v_30 +``` + +Worker 3 (series `{job=j1, instance=h2}`) performs the same steps independently — its own pane `BTreeMap`, its own snapshots, its own emits. + +**What the store sees:** + +``` +store[(agg_id, key=(j1), [0, 30s))] → [acc_w0, acc_w3] +store[(agg_id, key=(j1), [10s,40s))] → [acc_w0, acc_w3] +store[(agg_id, key=(j1), [20s,50s))] → [acc_w0, acc_w3] +``` + +Each entry is a complete accumulator from one worker for one window. Query-time merge combines them identically to the tumbling case. + +The pane snapshot/take logic reduces memory and CPU inside each worker (avoiding re-accumulation of shared panes), but what exits the worker is always one standalone `Box` per window. Consecutive sliding windows `[0,30s)` and `[10s,40s)` share panes *inside* the worker but have independent store entries — their cross-worker merges at query time are completely unrelated. + +## 5. Data Model ### PrecomputedOutput @@ -393,7 +543,7 @@ pub struct AggregationConfig { } ``` -## 5. Store Integration +## 6. Store Integration ### Write path @@ -455,7 +605,7 @@ automatically combined with original window data at query time. | ReadBased | Remove after `read_count >= threshold` | | NoCleanup | Retain forever | -## 6. Late Data Handling +## 7. Late Data Handling Two checks determine whether a sample is "late": @@ -476,7 +626,7 @@ For case 2, the `LateDataPolicy` controls behavior: as normal closed-window outputs. The store appends it alongside the original window data, and query-time merge combines them. -## 7. Concurrency Model +## 8. Concurrency Model The current implementation is **single-machine, multi-threaded**. All components (HTTP server, workers, store) run within a single OS process as Tokio async @@ -499,7 +649,7 @@ across multiple engine instances (e.g. via consistent hashing at the load balancer level), each running this same single-process architecture independently. -## 8. Performance Characteristics +## 9. Performance Characteristics **Ingest path (per batch):** - Sample insert: O(log B) per sample (BTreeMap, B = buffer size) @@ -513,11 +663,14 @@ independently. - O(A × W_open) active pane accumulators (fewer than window accumulators since panes are shared across overlapping windows) -**Throughput:** +**Throughput (measured, 2x Xeon E5-2630 v3, 32 logical CPUs, 125 GiB RAM):** +- Raw mode, `NoopOutputSink`, 16 workers: **~8.9M samples/sec** flush throughput; near-linear scaling (19x at 16 workers vs 16x ideal). +- Windowed aggregation (Sum, W=1-6), 4 workers: **~660K samples/sec** E2E; throughput is nearly identical across W=1 and W=6, confirming the pane-based optimization. - Workers process in parallel with no cross-shard coordination. -- E2E test demonstrates ~10M samples/sec sustained throughput with raw mode. -## 9. CLI Usage +**Benchmark caveat -- `workers = senders` coupling:** The raw-mode scalability benchmark uses one concurrent HTTP sender per worker. The 1-worker baseline is bottlenecked by a single sender (one CPU for Snappy compression, one in-flight HTTP connection); at 16 workers, 16 senders parallelize compression across cores and pipeline connections. The apparent super-linear speedup (9.37x at 8 workers, 19.13x at 16 workers) reflects sender-side parallelism as much as engine-side scaling. A clean engine-scaling measurement would fix sender count and vary only worker count. + +## 10. CLI Usage ### Standalone binary @@ -541,17 +694,66 @@ The precompute engine is also embedded in the main `query_engine_rust` binary, enabled via `--enable-prometheus-remote-write`. In this mode it shares the store with the Kafka consumer path. -## 10. Testing +## 11. Testing + +- **Unit tests -- `worker.rs` (correctness, via `CapturingOutputSink`):** + + | Test | What it verifies | + |---|---| + | `test_raw_mode_forwarding` | 3 samples -> 3 emits; `start == end == ts`, `SumAccumulator.sum == value` | + | `test_tumbling_window_correctness` | Samples at t=1s/5s/9s; window [0,10s) closes on t=10s; `sum=6` | + | `test_sliding_window_pane_sharing` | Sample at t=15s in 30s/10s window -> 2 emits for [0,30s) and [10s,40s), both `sum=42` via shared pane snapshot/take | + | `test_groupby_separate_emits_per_series` | Two series (`host=A`, `host=B`) on same worker -> 2 independent `MultipleSumAccumulator` emits (no ingest-time cross-series merge) | + | `test_late_data_drop` | Sample behind `watermark - allowed_lateness_ms` with `Drop` policy -> 0 emits | + | `test_late_data_forward_to_store` | Late sample for evicted pane with `ForwardToStore` -> 1 emit as mini-accumulator with correct window bounds and sum | + +- **Unit tests -- other modules**: `window_manager.rs` (tumbling/sliding arithmetic, pane enumeration, closure detection), `series_buffer.rs` (ordering, watermark), `accumulator_factory.rs` (updater creation and reset), `series_router.rs` (consistent hash routing), `config.rs` (defaults). -- **Unit tests**: `worker.rs` (metric/label extraction), `window_manager.rs` - (tumbling/sliding arithmetic), `series_buffer.rs` (ordering, watermark), - `accumulator_factory.rs` (updater creation), `series_router.rs` (hashing), - `config.rs` (defaults). - **E2E test** (`bin/test_e2e_precompute.rs`): Starts engine + store + query - server in-process, sends remote-write samples, queries via PromQL HTTP, - validates results. Includes batch latency and throughput benchmarks. + server in-process, sends remote-write samples over HTTP, queries via PromQL HTTP, + validates aggregated results. Includes batch latency benchmark, windowed throughput + benchmark (W=1/3/6), and worker scalability benchmark (1-16 workers). + +## 12. Known Data Loss Cases and Fault Tolerance TODOs + +The engine is currently in-memory and single-process with no persistence of in-flight window state. The following cases result in data loss: + +| # | Case | When it occurs | Mitigation status | +|---|---|---|---| +| 1 | **Explicit late drop** | `LateDataPolicy::Drop` + `ts < watermark - allowed_lateness_ms` | Intended; use `ForwardToStore` to avoid | +| 2 | **Intra-batch lateness** | Within a single `process_samples` call, `current_wm` is set to the batch's max timestamp before pane routing; with `allowed_lateness_ms=0` every sample below the batch max is dropped | Set `allowed_lateness_ms` >= max timestamp spread within a producer batch | +| 3 | **Evicted pane + Drop** | Sample passes watermark check but its pane was already evicted (window closed); `Drop` policy discards it | Use `ForwardToStore` | +| 4 | **No matching config** | `matching_agg_configs` returns empty -- metric name in the series key does not match any config's `metric` or `spatial_filter`; worker silently returns `Ok(())` | No warning is logged. TODO: emit a metric or log at warn level for unmatched series | +| 5 | **Open panes on shutdown** | `flush_all` only emits windows already closed by the watermark; panes that are still open at shutdown are discarded | TODO (see below) | +| 6 | **Worker panic** | Tokio task dies; all series owned by that worker lose their pane state; subsequent sends log a warning and drop | TODO (see below) | + +### TODO: open-pane flush on shutdown + +`flush_all` currently only closes windows whose `end <= watermark`. On graceful shutdown it should optionally force-close all open panes by advancing each series watermark to `i64::MAX` (or to `current_wm + window_size_ms`) before the final flush. This would emit partial windows with whatever samples have accumulated, allowing downstream consumers to decide whether to use them. + +This behaviour should be opt-in (a `force_flush_on_shutdown: bool` config flag) because partial windows can be misleading for consumers that expect complete windows. + +### TODO: warn on unmatched series + +Case 4 is silent and hard to diagnose. The fix is a single warn-level log (rate-limited per series key) in `get_or_create_series_state` when `aggregations.is_empty()`, plus a Prometheus counter `precompute_unmatched_series_total`. + +### TODO: worker restart on panic + +Currently a panicked worker is never restarted. The engine should catch task failures (via `JoinHandle`) in the main `run()` loop and respawn the worker with a fresh receiver, re-routing future series to surviving workers (or to the replacement) in the meantime. In-flight pane state for the crashed worker is still lost -- full recovery would require the WAL approach below. + +### TODO: write-ahead log (WAL) for pane state + +Cases 5 and 6 both stem from the same root cause: pane accumulators exist only in memory. A WAL would persist each pane update (series key, aggregation id, pane start, serialized accumulator delta) to disk or an external log (e.g. Kafka) before acknowledging the ingest HTTP request. On restart, the worker replays the WAL to reconstruct open panes before resuming normal processing. + +Trade-offs: +- Serialization cost: accumulators must be serializable (all current types are, via `rmp-serde`) +- WAL volume: one entry per sample per matching aggregation config -- potentially high; batching per pane per flush interval reduces this significantly +- Recovery time: proportional to WAL size since last checkpoint +- Complexity: requires a checkpoint mechanism to bound recovery time and WAL size + +A lighter alternative: **periodic pane snapshots** written to disk at each flush interval. On restart, replay only samples received since the last snapshot. This bounds recovery time to `flush_interval_ms` worth of samples at the cost of snapshot I/O every flush cycle. -## 11. File Map +## 13. File Map | File | Purpose | |------|---------| @@ -562,6 +764,6 @@ store with the Kafka consumer path. | `precompute_engine/series_buffer.rs` | Per-series BTreeMap sample buffer | | `precompute_engine/window_manager.rs` | Tumbling/sliding window logic | | `precompute_engine/accumulator_factory.rs` | `AccumulatorUpdater` trait + factory | -| `precompute_engine/output_sink.rs` | `OutputSink` trait + Store/Noop impls | +| `precompute_engine/output_sink.rs` | `OutputSink` trait + `StoreOutputSink`, `NoopOutputSink`, `CapturingOutputSink` (testing) | | `bin/precompute_engine.rs` | Standalone CLI binary | | `bin/test_e2e_precompute.rs` | End-to-end integration test | diff --git a/asap-query-engine/src/precompute_engine/window_manager.rs b/asap-query-engine/src/precompute_engine/window_manager.rs index 841c182..b6d513b 100644 --- a/asap-query-engine/src/precompute_engine/window_manager.rs +++ b/asap-query-engine/src/precompute_engine/window_manager.rs @@ -1,5 +1,8 @@ -/// Manages tumbling window boundaries and detects which windows have closed -/// based on watermark advancement. +/// Manages tumbling and sliding window boundaries and detects which windows +/// have closed based on watermark advancement. +/// +/// Tumbling windows are a special case where `slide_interval == window_size`. +/// The same logic handles both — no separate code paths. pub struct WindowManager { /// Window size in milliseconds. window_size_ms: i64, diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 93ac194..09a9739 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -571,6 +571,10 @@ fn parse_labels_from_series_key(series_key: &str) -> HashMap<&str, &str> { mod tests { use super::*; + use flate2::{write::GzEncoder, Compression}; + use serde_json::json; + use std::io::Write; + #[test] fn test_extract_metric_name() { assert_eq!( @@ -603,6 +607,652 @@ mod tests { assert!(labels.is_empty()); } + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + use crate::data_model::StreamingConfig; + use crate::precompute_engine::config::LateDataPolicy; + use crate::precompute_engine::output_sink::CapturingOutputSink; + use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; + use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator; + use crate::precompute_operators::sum_accumulator::SumAccumulator; + use sketch_core::kll::KllSketch; + + fn make_agg_config( + id: u64, + metric: &str, + agg_type: &str, + agg_sub_type: &str, + window_secs: u64, + slide_secs: u64, + grouping: Vec<&str>, + ) -> AggregationConfig { + let window_type = if slide_secs == 0 || slide_secs == window_secs { + "tumbling" + } else { + "sliding" + }; + AggregationConfig::new( + id, + agg_type.to_string(), + agg_sub_type.to_string(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new( + grouping.iter().map(|s| s.to_string()).collect(), + ), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + window_secs, + slide_secs, + window_type.to_string(), + metric.to_string(), + metric.to_string(), + None, + None, + None, + None, + ) + } + + fn make_worker( + agg_configs: HashMap, + sink: Arc, + pass_raw: bool, + raw_agg_id: u64, + late_policy: LateDataPolicy, + ) -> Worker { + let (_tx, rx) = tokio::sync::mpsc::channel(1); + Worker::new( + 0, + rx, + sink, + agg_configs, + 10_000, // max_buffer_per_series + 0, // allowed_lateness_ms + pass_raw, + raw_agg_id, + late_policy, + ) + } + + // ----------------------------------------------------------------------- + // Test: raw mode — each sample forwarded as SumAccumulator with sum==value + // ----------------------------------------------------------------------- + + #[test] + fn test_raw_mode_forwarding() { + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker(HashMap::new(), sink.clone(), true, 99, LateDataPolicy::Drop); + + let samples = vec![(1000_i64, 1.5_f64), (2000, 2.5), (3000, 7.0)]; + worker + .process_samples("cpu{host=\"a\"}", samples.clone()) + .unwrap(); + + let captured = sink.drain(); + assert_eq!(captured.len(), 3, "should emit one output per raw sample"); + + for ((ts, val), (output, acc)) in samples.iter().zip(captured.iter()) { + assert_eq!(output.start_timestamp as i64, *ts, "start should equal ts"); + assert_eq!(output.end_timestamp as i64, *ts, "end should equal ts"); + assert_eq!(output.aggregation_id, 99); + let sum_acc = acc + .as_any() + .downcast_ref::() + .expect("should be SumAccumulator"); + assert!( + (sum_acc.sum - val).abs() < 1e-10, + "sum should equal sample value" + ); + } + } + + // ----------------------------------------------------------------------- + // Test: tumbling window — correct window boundaries and sum + // ----------------------------------------------------------------------- + + #[test] + fn test_tumbling_window_correctness() { + // 10s tumbling window + let config = make_agg_config(1, "cpu", "SingleSubpopulation", "Sum", 10, 0, vec![]); + let mut agg_configs = HashMap::new(); + agg_configs.insert(1, config); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); + + // Samples in window [0, 10000ms): sum should be 1+2+3=6. + // Send one at a time so the watermark advances incrementally — + // a batch's max-ts becomes the new watermark, and with + // allowed_lateness_ms=0 any ts < watermark in the same call is dropped. + worker + .process_samples("cpu", vec![(1000_i64, 1.0)]) + .unwrap(); + worker + .process_samples("cpu", vec![(5000_i64, 2.0)]) + .unwrap(); + worker + .process_samples("cpu", vec![(9000_i64, 3.0)]) + .unwrap(); + // No windows closed yet (watermark still below 10000) + assert_eq!(sink.len(), 0); + + // Sample at t=10000ms advances watermark to 10000, closing [0, 10000) + worker + .process_samples("cpu", vec![(10000_i64, 100.0)]) + .unwrap(); + + let captured = sink.drain(); + assert_eq!(captured.len(), 1, "exactly one window should close"); + + let (output, acc) = &captured[0]; + assert_eq!(output.aggregation_id, 1); + assert_eq!(output.start_timestamp, 0); + assert_eq!(output.end_timestamp, 10_000); + assert!( + output.key.is_none(), + "SingleSubpopulation should have no key" + ); + + let sum_acc = acc + .as_any() + .downcast_ref::() + .expect("should be SumAccumulator"); + assert!( + (sum_acc.sum - 6.0).abs() < 1e-10, + "sum should be 1+2+3=6, got {}", + sum_acc.sum + ); + } + + // ----------------------------------------------------------------------- + // Test: sliding window pane sharing — one sample, two window emits, same sum + // ----------------------------------------------------------------------- + + #[test] + fn test_sliding_window_pane_sharing() { + // 30s window, 10s slide → W=3 panes per window + let config = make_agg_config(2, "cpu", "SingleSubpopulation", "Sum", 30, 10, vec![]); + let mut agg_configs = HashMap::new(); + agg_configs.insert(2, config); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); + + // Sample at t=15000ms → goes to pane 10000ms + // previous_wm == i64::MIN → no windows close + worker + .process_samples("cpu", vec![(15_000_i64, 42.0)]) + .unwrap(); + assert_eq!(sink.len(), 0); + + // Sample at t=45000ms → advances watermark to 45000ms + // Closes windows [0, 30000) and [10000, 40000) + worker + .process_samples("cpu", vec![(45_000_i64, 0.0)]) + .unwrap(); + + let captured = sink.drain(); + // Both windows should emit — one from pane merge snapshot, one from take + // Window [0, 30000): panes [0, 10000, 20000]; pane 10000 snapshot → sum=42 + // Window [10000, 40000): panes [10000, 20000, 30000]; pane 10000 take → sum=42 + assert_eq!( + captured.len(), + 2, + "two windows containing the pane should emit" + ); + + let window_starts: Vec = captured.iter().map(|(o, _)| o.start_timestamp).collect(); + assert!(window_starts.contains(&0), "window [0, 30000) should emit"); + assert!( + window_starts.contains(&10_000), + "window [10000, 40000) should emit" + ); + + for (output, acc) in &captured { + let sum_acc = acc + .as_any() + .downcast_ref::() + .expect("should be SumAccumulator"); + assert!( + (sum_acc.sum - 42.0).abs() < 1e-10, + "window {:?} should have sum=42 via pane sharing, got {}", + output.start_timestamp, + sum_acc.sum + ); + } + } + + // ----------------------------------------------------------------------- + // Test: GROUP BY — two series on same worker produce separate accumulators + // ----------------------------------------------------------------------- + + #[test] + fn test_groupby_separate_emits_per_series() { + // MultipleSubpopulation Sum with grouping on "host" + // Two series on same worker → same window accumulator per-agg holds both keys + let config = make_agg_config( + 3, + "cpu", + "MultipleSubpopulation", + "Sum", + 10, + 0, + vec!["host"], + ); + let mut agg_configs = HashMap::new(); + agg_configs.insert(3, config); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); + + // Feed two series in the same window [0, 10000ms) + worker + .process_samples("cpu{host=\"A\"}", vec![(1000_i64, 10.0)]) + .unwrap(); + worker + .process_samples("cpu{host=\"B\"}", vec![(2000_i64, 20.0)]) + .unwrap(); + assert_eq!(sink.len(), 0, "no windows closed yet"); + + // Advance watermark to close [0, 10000) for series "A" + worker + .process_samples("cpu{host=\"A\"}", vec![(10_000_i64, 0.0)]) + .unwrap(); + // Also advance "B"'s watermark + worker + .process_samples("cpu{host=\"B\"}", vec![(10_000_i64, 0.0)]) + .unwrap(); + + let captured = sink.drain(); + // Each series has its own SeriesState and independent pane accumulators. + // The MultipleSubpopulation accumulator for each series records its own key. + // So we get 2 emits (one per series), each a MultipleSumAccumulator with a single key. + assert_eq!( + captured.len(), + 2, + "each series emits independently — no ingest-time merge" + ); + + // Verify the grouping keys are distinct + let mut found_a = false; + let mut found_b = false; + for (output, acc) in &captured { + assert_eq!(output.start_timestamp, 0); + assert_eq!(output.end_timestamp, 10_000); + let ms_acc = acc + .as_any() + .downcast_ref::() + .expect("should be MultipleSumAccumulator"); + for (key, &sum) in &ms_acc.sums { + if key.labels == vec!["A".to_string()] { + assert!((sum - 10.0).abs() < 1e-10); + found_a = true; + } + if key.labels == vec!["B".to_string()] { + assert!((sum - 20.0).abs() < 1e-10); + found_b = true; + } + } + } + assert!(found_a, "expected emit for host=A"); + assert!(found_b, "expected emit for host=B"); + } + + #[test] + fn test_arroyosketch_multiple_sum_matches_handcrafted_precompute_output() { + let config = make_agg_config(11, "cpu", "MultipleSum", "sum", 10, 0, vec!["host"]); + let mut agg_configs = HashMap::new(); + agg_configs.insert(11, config.clone()); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker( + agg_configs.clone(), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); + + worker + .process_samples("cpu{host=\"A\"}", vec![(1_000_i64, 1.0)]) + .unwrap(); + worker + .process_samples("cpu{host=\"A\"}", vec![(5_000_i64, 2.0)]) + .unwrap(); + worker + .process_samples("cpu{host=\"A\"}", vec![(9_000_i64, 3.0)]) + .unwrap(); + worker + .process_samples("cpu{host=\"A\"}", vec![(10_000_i64, 0.0)]) + .unwrap(); + + let captured = sink.drain(); + assert_eq!(captured.len(), 1, "expected one closed window output"); + + let (handcrafted_output, handcrafted_acc) = &captured[0]; + let handcrafted_acc = handcrafted_acc + .as_any() + .downcast_ref::() + .expect("hand-crafted engine should emit MultipleSumAccumulator"); + + let mut arroyo_sums = HashMap::new(); + arroyo_sums.insert("A".to_string(), 6.0); + let arroyo_precompute_bytes = + rmp_serde::to_vec(&arroyo_sums).expect("Arroyo MessagePack encoding should succeed"); + + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder + .write_all(&arroyo_precompute_bytes) + .expect("gzip encoding should succeed"); + let arroyo_json = json!({ + "aggregation_id": 11, + "window": { + "start": "1970-01-01T00:00:00", + "end": "1970-01-01T00:00:10" + }, + "key": "A", + "precompute": hex::encode(encoder.finish().expect("gzip finalize should succeed")) + }); + + let streaming_config = StreamingConfig::new(agg_configs); + let (arroyo_output, arroyo_acc) = + PrecomputedOutput::deserialize_from_json_arroyo(&arroyo_json, &streaming_config) + .expect("Arroyo precompute should deserialize"); + let arroyo_acc = arroyo_acc + .as_any() + .downcast_ref::() + .expect("Arroyo payload should deserialize to MultipleSumAccumulator"); + + assert_eq!( + handcrafted_output.aggregation_id, + arroyo_output.aggregation_id + ); + assert_eq!( + handcrafted_output.start_timestamp, + arroyo_output.start_timestamp + ); + assert_eq!( + handcrafted_output.end_timestamp, + arroyo_output.end_timestamp + ); + assert_eq!(handcrafted_output.key, arroyo_output.key); + assert_eq!(handcrafted_acc.sums, arroyo_acc.sums); + } + + #[test] + fn test_arroyosketch_kll_matches_handcrafted_precompute_output() { + let mut config = make_agg_config(12, "latency", "DatasketchesKLL", "", 10, 0, vec![]); + config + .parameters + .insert("K".to_string(), serde_json::Value::from(20_u64)); + + let mut agg_configs = HashMap::new(); + agg_configs.insert(12, config); + + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker( + agg_configs.clone(), + sink.clone(), + false, + 0, + LateDataPolicy::Drop, + ); + + let samples = vec![(1_000_i64, 10.0), (5_000_i64, 20.0), (9_000_i64, 30.0)]; + for &(ts, value) in &samples { + worker + .process_samples("latency", vec![(ts, value)]) + .unwrap(); + } + worker + .process_samples("latency", vec![(10_000_i64, 0.0)]) + .unwrap(); + + let captured = sink.drain(); + assert_eq!(captured.len(), 1, "expected one closed window output"); + + let (handcrafted_output, handcrafted_acc) = &captured[0]; + let handcrafted_acc = handcrafted_acc + .as_any() + .downcast_ref::() + .expect("hand-crafted engine should emit DatasketchesKLLAccumulator"); + + let arroyo_precompute_bytes = KllSketch::aggregate_kll(20, &[10.0, 20.0, 30.0]) + .expect("Arroyo KLL aggregation should produce bytes"); + + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder + .write_all(&arroyo_precompute_bytes) + .expect("gzip encoding should succeed"); + let arroyo_json = json!({ + "aggregation_id": 12, + "window": { + "start": "1970-01-01T00:00:00", + "end": "1970-01-01T00:00:10" + }, + "key": "", + "precompute": hex::encode(encoder.finish().expect("gzip finalize should succeed")) + }); + + let streaming_config = StreamingConfig::new(agg_configs); + let (arroyo_output, arroyo_acc) = + PrecomputedOutput::deserialize_from_json_arroyo(&arroyo_json, &streaming_config) + .expect("Arroyo KLL precompute should deserialize"); + let arroyo_acc = arroyo_acc + .as_any() + .downcast_ref::() + .expect("Arroyo payload should deserialize to DatasketchesKLLAccumulator"); + + assert_eq!( + handcrafted_output.aggregation_id, + arroyo_output.aggregation_id + ); + assert_eq!( + handcrafted_output.start_timestamp, + arroyo_output.start_timestamp + ); + assert_eq!( + handcrafted_output.end_timestamp, + arroyo_output.end_timestamp + ); + assert_eq!(handcrafted_output.key, None); + assert_eq!( + arroyo_output.key, + Some(KeyByLabelValues::new_with_labels(vec![String::new()])) + ); + assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k); + assert_eq!(handcrafted_acc.inner.count(), arroyo_acc.inner.count()); + + for quantile in [0.0, 0.5, 1.0] { + assert_eq!( + handcrafted_acc.get_quantile(quantile), + arroyo_acc.get_quantile(quantile) + ); + } + } + + // ----------------------------------------------------------------------- + // Test: late data drop — sample behind watermark - allowed_lateness not emitted + // ----------------------------------------------------------------------- + + #[test] + fn test_late_data_drop() { + let config = make_agg_config(4, "cpu", "SingleSubpopulation", "Sum", 10, 0, vec![]); + let mut agg_configs = HashMap::new(); + agg_configs.insert(4, config); + + let sink = Arc::new(CapturingOutputSink::new()); + // allowed_lateness_ms = 0 + let (_tx, rx) = tokio::sync::mpsc::channel(1); + let mut worker = Worker::new( + 0, + rx, + sink.clone(), + agg_configs, + 10_000, // max_buffer_per_series + 0, // allowed_lateness_ms + false, // pass_raw_samples + 0, // raw_mode_aggregation_id + LateDataPolicy::Drop, + ); + + // Establish watermark at t=20000ms (closes [0, 10000) and [10000, 20000)) + worker + .process_samples("cpu", vec![(20_000_i64, 1.0)]) + .unwrap(); + let _ = sink.drain(); // discard any earlier emissions + + // Send a late sample (ts=5000 is behind watermark=20000 with lateness=0) + worker + .process_samples("cpu", vec![(5_000_i64, 99.0)]) + .unwrap(); + + // No new emission should occur (late sample is dropped) + assert_eq!(sink.len(), 0, "late sample should be dropped, not emitted"); + } + + // ----------------------------------------------------------------------- + // Test: late data ForwardToStore — late sample emitted as mini-accumulator + // ----------------------------------------------------------------------- + + #[test] + fn test_late_data_forward_to_store() { + let config = make_agg_config(5, "cpu", "SingleSubpopulation", "Sum", 10, 0, vec![]); + let mut agg_configs = HashMap::new(); + agg_configs.insert(5, config); + + let sink = Arc::new(CapturingOutputSink::new()); + let (_tx, rx) = tokio::sync::mpsc::channel(1); + // allowed_lateness_ms = 15000 — large enough that ts=8000 passes the + // lateness filter (8000 >= 20000 - 15000 = 5000) while pane 0 is already + // evicted (window [0,10000) closed when watermark reached 20000). + let mut worker = Worker::new( + 0, + rx, + sink.clone(), + agg_configs, + 10_000, // max_buffer_per_series + 15_000, // allowed_lateness_ms + false, // pass_raw_samples + 0, // raw_mode_aggregation_id + LateDataPolicy::ForwardToStore, + ); + + // Seed pane 0, then advance watermark to 20000 (evicts pane 0) + worker.process_samples("cpu", vec![(500_i64, 1.0)]).unwrap(); + worker + .process_samples("cpu", vec![(20_000_i64, 0.0)]) + .unwrap(); + let _ = sink.drain(); // discard the [0,10000) window emit + + // Send a late sample for the evicted pane 0 (ts=8000 passes the + // lateness filter but pane 0 is gone → ForwardToStore path) + worker + .process_samples("cpu", vec![(8_000_i64, 55.0)]) + .unwrap(); + + let captured = sink.drain(); + assert_eq!( + captured.len(), + 1, + "ForwardToStore policy should emit the late sample" + ); + + let (output, acc) = &captured[0]; + assert_eq!(output.aggregation_id, 5); + // The late sample is emitted with the window it belongs to: pane_start=0, window=[0,10000) + assert_eq!(output.start_timestamp, 0); + assert_eq!(output.end_timestamp, 10_000); + + let sum_acc = acc + .as_any() + .downcast_ref::() + .expect("should be SumAccumulator"); + assert!( + (sum_acc.sum - 55.0).abs() < 1e-10, + "late sample sum should be 55.0, got {}", + sum_acc.sum + ); + } + + // ----------------------------------------------------------------------- + // Test: worker built from a parsed streaming_config YAML + // ----------------------------------------------------------------------- + + #[test] + fn test_worker_from_streaming_config_yaml() { + // A minimal streaming_config.yaml payload — the same format the Python + // controller writes to disk and the engine reads at startup. + let yaml = r#" +aggregations: +- aggregationId: 10 + aggregationType: SingleSubpopulation + aggregationSubType: Sum + labels: + grouping: [] + rollup: [] + aggregated: [] + metric: requests_total + parameters: {} + tumblingWindowSize: 10 + windowSize: 10 + windowType: tumbling + slideInterval: 0 + spatialFilter: '' +"#; + + let data: serde_yaml::Value = serde_yaml::from_str(yaml).expect("valid YAML"); + let streaming_config = + StreamingConfig::from_yaml_data(&data, None).expect("valid streaming config"); + + assert!( + streaming_config.contains(10), + "aggregation 10 should be present" + ); + + let agg_configs = streaming_config.get_all_aggregation_configs().clone(); + let sink = Arc::new(CapturingOutputSink::new()); + let mut worker = make_worker(agg_configs, sink.clone(), false, 0, LateDataPolicy::Drop); + + // Three samples inside window [0, 10_000ms) + worker + .process_samples("requests_total", vec![(1_000_i64, 3.0)]) + .unwrap(); + worker + .process_samples("requests_total", vec![(5_000_i64, 4.0)]) + .unwrap(); + worker + .process_samples("requests_total", vec![(9_000_i64, 5.0)]) + .unwrap(); + assert_eq!(sink.len(), 0, "window not yet closed"); + + // Advance watermark past window boundary to close [0, 10_000ms) + worker + .process_samples("requests_total", vec![(10_000_i64, 0.0)]) + .unwrap(); + + let captured = sink.drain(); + assert_eq!(captured.len(), 1, "exactly one window should close"); + + let (output, acc) = &captured[0]; + assert_eq!(output.aggregation_id, 10); + assert_eq!(output.start_timestamp, 0); + assert_eq!(output.end_timestamp, 10_000); + + let sum_acc = acc + .as_any() + .downcast_ref::() + .expect("should be SumAccumulator"); + assert!( + (sum_acc.sum - 12.0).abs() < 1e-10, + "sum should be 3+4+5=12, got {}", + sum_acc.sum + ); + } + #[test] fn test_extract_key_from_series() { let config = AggregationConfig::new( @@ -618,7 +1268,7 @@ mod tests { promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), String::new(), 60, - 30, + 0, "tumbling".to_string(), "http_requests_total".to_string(), "http_requests_total".to_string(), diff --git a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 index 6da33fb..59d563b 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 @@ -3,7 +3,7 @@ rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } twox-hash = "2.1.0" -sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = "440427438fdaf3ac2298b53ee148f9e12a64ffcc" } */ use arroyo_udf_plugin::udf; diff --git a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 index 0c83694..a0d1c87 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 @@ -3,7 +3,7 @@ rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } twox-hash = "2.1.0" -sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = "440427438fdaf3ac2298b53ee148f9e12a64ffcc" } */ use arroyo_udf_plugin::udf; diff --git a/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 index e789c02..73d814c 100644 --- a/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 @@ -3,7 +3,7 @@ rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } twox-hash = "2.1.0" -sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = "440427438fdaf3ac2298b53ee148f9e12a64ffcc" } */ use std::cmp::Ordering; diff --git a/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 b/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 index 5898c92..1b74982 100644 --- a/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 +++ b/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 @@ -1,6 +1,6 @@ /* [dependencies] -sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = "440427438fdaf3ac2298b53ee148f9e12a64ffcc" } arroyo-udf-plugin = "0.1" rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } diff --git a/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 b/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 index 67c558d..10c51c6 100644 --- a/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 +++ b/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 @@ -1,6 +1,6 @@ /* [dependencies] -sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust", rev = "440427438fdaf3ac2298b53ee148f9e12a64ffcc" } arroyo-udf-plugin = "0.1" rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] }