diff --git a/asap-query-engine/src/engines/physical/conversion.rs b/asap-query-engine/src/engines/physical/conversion.rs index 9a4b3c2..26cde34 100644 --- a/asap-query-engine/src/engines/physical/conversion.rs +++ b/asap-query-engine/src/engines/physical/conversion.rs @@ -173,7 +173,7 @@ mod tests { use crate::precompute_operators::SumAccumulator; use crate::stores::traits::TimestampedBucket; - fn make_bucket(acc: Box) -> TimestampedBucket { + fn make_bucket(acc: Arc) -> TimestampedBucket { ((0, 0), acc) } @@ -184,13 +184,13 @@ mod tests { let key1 = KeyByLabelValues { labels: vec!["host-a".to_string()], }; - let acc1 = Box::new(SumAccumulator::with_sum(100.0)); + let acc1 = Arc::new(SumAccumulator::with_sum(100.0)) as Arc; store_result.insert(Some(key1), vec![make_bucket(acc1)]); let key2 = KeyByLabelValues { labels: vec!["host-b".to_string()], }; - let acc2 = Box::new(SumAccumulator::with_sum(200.0)); + let acc2 = Arc::new(SumAccumulator::with_sum(200.0)) as Arc; store_result.insert(Some(key2), vec![make_bucket(acc2)]); let label_names = vec!["host".to_string()]; @@ -207,7 +207,7 @@ mod tests { let key1 = KeyByLabelValues { labels: vec!["host-a".to_string(), "region-1".to_string()], }; - let acc1 = Box::new(SumAccumulator::with_sum(100.0)); + let acc1 = Arc::new(SumAccumulator::with_sum(100.0)) as Arc; store_result.insert(Some(key1), vec![make_bucket(acc1)]); let label_names = vec!["host".to_string(), "region".to_string()]; @@ -221,7 +221,7 @@ mod tests { fn test_store_result_to_record_batch_no_key() { let mut store_result: TimestampedBucketsMap = HashMap::new(); - let acc = Box::new(SumAccumulator::with_sum(500.0)); + let acc = Arc::new(SumAccumulator::with_sum(500.0)) as Arc; store_result.insert(None, vec![make_bucket(acc)]); let label_names: Vec = vec![]; @@ -293,7 +293,9 @@ mod tests { }; store_result.insert( Some(key), - vec![make_bucket(Box::new(SumAccumulator::with_sum(1.0)))], + vec![make_bucket( + Arc::new(SumAccumulator::with_sum(1.0)) as Arc + )], ); let label_names: Vec = vec!["l1", "l2", "l3", "l4", "l5"] .into_iter() @@ -312,7 +314,9 @@ mod tests { }; store_result.insert( Some(key), - vec![make_bucket(Box::new(SumAccumulator::with_sum(42.0)))], + vec![make_bucket( + Arc::new(SumAccumulator::with_sum(42.0)) as Arc + )], ); let label_names = vec!["host".to_string(), "region".to_string()]; let batch = store_result_to_record_batch(&store_result, &label_names).unwrap(); @@ -345,15 +349,15 @@ mod tests { vec![ ( (100, 200), - Box::new(SumAccumulator::with_sum(10.0)) as Box, + Arc::new(SumAccumulator::with_sum(10.0)) as Arc, ), ( (200, 300), - Box::new(SumAccumulator::with_sum(20.0)) as Box, + Arc::new(SumAccumulator::with_sum(20.0)) as Arc, ), ( (300, 400), - Box::new(SumAccumulator::with_sum(30.0)) as Box, + Arc::new(SumAccumulator::with_sum(30.0)) as Arc, ), ], ); @@ -426,13 +430,19 @@ mod tests { store_result.insert( Some(key1), vec![ - make_bucket(Box::new(SumAccumulator::with_sum(1.0))), - make_bucket(Box::new(SumAccumulator::with_sum(2.0))), + make_bucket( + Arc::new(SumAccumulator::with_sum(1.0)) as Arc + ), + make_bucket( + Arc::new(SumAccumulator::with_sum(2.0)) as Arc + ), ], ); store_result.insert( Some(key2), - vec![make_bucket(Box::new(SumAccumulator::with_sum(3.0)))], + vec![make_bucket( + Arc::new(SumAccumulator::with_sum(3.0)) as Arc + )], ); assert_eq!(count_store_result_rows(&store_result), 3); diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index 3339b47..be8ceba 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -743,7 +743,7 @@ impl SimpleEngine { } // Extract bucket from timestamped tuple let (_, bucket) = timestamped_buckets.into_iter().next().unwrap(); - (key, bucket) + (key, bucket.as_ref().clone_boxed_core()) }) .collect() } else { @@ -2811,9 +2811,9 @@ impl SimpleEngine { }; // Build lookup: bucket_start_timestamp -> bucket for O(1) access - let bucket_map: HashMap> = timestamped_buckets + let bucket_map: HashMap = timestamped_buckets .iter() - .map(|((start, _), bucket)| (*start, bucket)) + .map(|((start, _), bucket)| (*start, bucket.as_ref())) .collect(); debug!( diff --git a/asap-query-engine/src/stores/simple_map_store/INDEX_DESIGN.md b/asap-query-engine/src/stores/simple_map_store/INDEX_DESIGN.md new file mode 100644 index 0000000..594e6a9 --- /dev/null +++ b/asap-query-engine/src/stores/simple_map_store/INDEX_DESIGN.md @@ -0,0 +1,245 @@ +# SimpleStore Index Design + +## Overview + +`SimpleMapStore` uses an **epoch-partitioned columnar store** with label interning. The design applies six optimizations targeting the two most expensive paths: ingestion and range scan. + +| Opt | What | Where | +|-----|------|-------| +| 1 | Lazy `window_to_ids` index — built on first exact query, invalidated cheaply on insert | `MutableEpoch` | +| 2 | Offset-based index — stores `u32` column offsets, not `Arc` clones | `MutableEpoch::exact_query` | +| 3 | Monotonic ingest fast path — skip `HashSet` probe for consecutive same-window inserts | `MutableEpoch::insert` | +| 4 | Batch metadata hoisting — config lookup, label interning, timestamp update moved out of per-item loop | `global.rs`, `per_key.rs` | +| 5 | Columnar storage — three parallel arrays; range scan hot loop touches only `windows_col` | `MutableEpoch` | +| 6 | Pre-allocated epoch buffers — `with_capacity(prev_epoch.len())` on rotation | `maybe_rotate_epoch` | + +--- + +## Data Structures + +### Types (`common.rs`) + +```rust +pub type MetricID = u32; // compact interned label ID +pub type EpochID = u64; // monotonically increasing epoch counter +pub type TimestampRange = (u64, u64); // (start_timestamp, end_timestamp) +pub type MetricBucketMap = HashMap)>>; +``` + +### `InternTable` (`common.rs`) + +``` +InternTable { + label_to_id: HashMap, MetricID> + id_to_label: Vec> +} +``` + +- `intern(label)` — O(1) amortized via `HashMap::entry`; no double-hashing +- `resolve(id)` — O(1) indexed Vec lookup +- All internal maps key on `MetricID` (u32), never on full label strings + +### `MutableEpoch` (`common.rs`) + +Active epoch: append-only insert, O(1) amortized. + +``` +MutableEpoch { + // Columnar storage (Opt 5): three parallel arrays + windows_col: Vec + metric_ids_col: Vec + aggregates_col: Vec> + + // Distinct-window count for epoch rotation threshold + windows_set: HashSet + + // Monotonic ingest fast path (Opt 3) + last_window: Option + + // Lazy offset index (Opt 1 + 2): built on first exact_query, None after any insert + window_to_ids: Option>> + + // Epoch bounds for O(1) skip check (updated incrementally on insert) + min_start: Option + max_end: Option +} +``` + +**Insert** (`O(1)` amortized): +- Opt 3: if incoming window == `last_window`, skip `windows_set.insert` entirely +- Three `Vec::push` calls — no secondary index maintenance +- `window_to_ids = None` — single pointer-width write to invalidate the index + +**`seal()` → `SealedEpoch`** (`O(M log M)`, paid once at rotation): +- Zips the three columns into tuples, sorts by `(TimestampRange, MetricID)`, moves `Arc`s without cloning + +**`exact_query(&mut self)`** (`O(M)` first call after a write, `O(m)` cached): +- Opt 1 + 2: if `window_to_ids` is `None`, build it from `windows_col` in one pass storing `u32` offsets +- Cache is valid until the next `insert` + +**`range_query_into`** (`O(M)` mutable epoch): +- Opt 5: hot loop iterates only `windows_col`; aggregate pointer only chased on match + +### `SealedEpoch` (`common.rs`) + +Immutable epoch: flat sorted `Vec` for cache-friendly binary-search scans. + +``` +SealedEpoch { + entries: Vec<(TimestampRange, MetricID, Arc)> // sorted by (TR, MetricID) + min_start: Option + max_end: Option +} +``` + +**`range_query_into`** (`O(log N + k)`): `partition_point` to find start, linear scan until `tr.0 > end` + +**`exact_query`** (`O(log N + m)`): `partition_point` to find the window, linear scan while `tr == range` + +### Per-Key Store (`per_key.rs`) + +Each `aggregation_id` gets its own `StoreKeyData` behind a per-key `RwLock`: + +``` +DashMap>> + +StoreKeyData { + intern: InternTable + current_epoch: MutableEpoch // always present, accepts inserts + sealed_epochs: BTreeMap + current_epoch_id: EpochID + epoch_capacity: Option // None = unlimited + max_epochs: usize // default 4 + read_counts: Mutex> +} +``` + +`read_counts` is behind an inner `Mutex` so queries can hold the outer `RwLock::read` and still update counts. + +### Global Store (`global.rs`) + +Same per-key epoch structure, but all aggregation_ids share a single `Mutex`: + +``` +Mutex + +StoreData { + stores: HashMap + read_counts: HashMap> + metrics: HashSet +} + +PerKeyState { + intern: InternTable + current_epoch: MutableEpoch + sealed_epochs: BTreeMap + current_epoch_id: EpochID + epoch_capacity: Option + max_epochs: usize +} +``` + +No inner `Mutex` for `read_counts` — the outer `Mutex` already serializes all access. + +--- + +## Complexity + +### Variables + +| Symbol | Meaning | +|--------|---------| +| A | Distinct aggregation IDs | +| L | Distinct label combinations | +| N | Distinct time windows per epoch | +| E | Epochs retained (≤ `max_epochs`, default 4) | +| M | Total entries in an epoch (`windows_col.len()`) | +| k | Matched results or entries removed | +| m | Labels present in a specific time window | + +### Time Complexity + +| Operation | Time | Notes | +|-----------|------|-------| +| **Insert** | O(1) amortized | Three `Vec::push` + conditional `HashSet::insert` (skipped by Opt 3 on ordered ingest) | +| **Seal** | O(M log M) | Paid once at rotation; not on insert hot path | +| **Epoch rotation** | O(M log M + 1) | Seal current + drop oldest in O(1) | +| **Range query** (mutable epoch) | O(M) | Linear scan of `windows_col` only | +| **Range query** (sealed epoch) | O(log N + k) | Binary search + linear scan | +| **Range query** (full store) | O(M + E · (log N + k)) | One mutable scan + binary-search per sealed epoch | +| **Exact query** (first after write) | O(M) | Build `window_to_ids` from `windows_col` | +| **Exact query** (cached) | O(m) | HashMap lookup + `Arc::clone` per offset | +| **Exact query** (sealed epoch) | O(log N + m) | Binary search to window + linear scan | +| **ReadBased cleanup** | O(N + k · m) | Scan `read_counts` + targeted removal via `remove_windows` | +| **get_earliest_timestamp** | O(A) | DashMap iteration with AtomicU64 loads | + +### Space + +| Structure | Space | +|-----------|-------| +| `InternTable` | O(L) per agg_id | +| `MutableEpoch` columns | O(M) | +| `SealedEpoch` entries | O(M) per sealed epoch | +| `window_to_ids` (when built) | O(M) | +| `read_counts` | O(N) total | +| **Total** | **O(A · E · M)** where E ≤ `max_epochs` | + +--- + +## Query Mechanics + +### Range Query `[start, end]` + +1. Acquire **read lock** on `StoreKeyData` +2. Scan `current_epoch.range_query_into(start, end)` — O(M), touches only `windows_col` in hot loop +3. For each sealed epoch (newest first): + - Skip if `min_start > end || max_end < start` — O(1) bounds check + - `sealed_epoch.range_query_into(start, end)` — O(log N + k) binary search + scan +4. Resolve MetricIDs → labels via `InternTable` in one pass +5. Briefly acquire inner `Mutex` to update `read_counts` + +### Exact Query `(exact_start, exact_end)` + +1. Acquire **write lock** (needed to potentially build the lazy `window_to_ids` index) +2. Try `current_epoch.exact_query(range)` — builds/uses cached `window_to_ids` +3. If not found, iterate `sealed_epochs.values().rev()` calling `SealedEpoch::exact_query` +4. Return owned `Vec<(MetricID, Arc)>`, drop write lock +5. Re-acquire read lock to resolve MetricIDs → labels + +--- + +## Cleanup Policies + +### CircularBuffer + +Epoch-based eviction — O(1) amortized per insert: + +1. On first insert: set `epoch_capacity` from `num_aggregates_to_retain` +2. After each insert: call `maybe_rotate_epoch()` + - If `current_epoch.window_count() >= epoch_capacity`: seal current epoch, open new one with `with_capacity(hint)` (Opt 6) + - If `1 + sealed_epochs.len() > max_epochs`: pop oldest sealed epoch in O(1), purge its windows from `read_counts` + +### ReadBased + +Read-count triggered eviction: + +1. Scan `read_counts` for windows with `count >= threshold` +2. For each such window, call `MutableEpoch::remove_windows` or `SealedEpoch::remove_windows` +3. Drop any epochs that become empty + +### NoCleanup + +No eviction — data accumulates indefinitely. + +--- + +## Concurrency (Per-Key Store) + +| Operation | Lock | +|-----------|------| +| **Insert** | `RwLock::write` for the batch duration | +| **Range query** | `RwLock::read` → brief `Mutex::lock` on `read_counts` | +| **Exact query** | `RwLock::write` (lazy index build) → drop → `RwLock::read` for label resolution | +| **Cleanup** | Under existing write lock; `Mutex::get_mut()` bypasses inner lock | + +Multiple readers per `aggregation_id` run concurrently. Writers only block readers of the same `aggregation_id`. diff --git a/asap-query-engine/src/stores/simple_map_store/common.rs b/asap-query-engine/src/stores/simple_map_store/common.rs new file mode 100644 index 0000000..2ac3ff1 --- /dev/null +++ b/asap-query-engine/src/stores/simple_map_store/common.rs @@ -0,0 +1,375 @@ +use crate::data_model::{AggregateCore, KeyByLabelValues}; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +pub type MetricID = u32; +pub type EpochID = u64; +pub type TimestampRange = (u64, u64); +pub type MetricBucketMap = HashMap)>>; + +/// Assigns a compact MetricID (u32) to each unique label combination. +/// Label strings stored once; all internal maps use MetricID (O(1) key ops). +pub struct InternTable { + label_to_id: HashMap, MetricID>, + id_to_label: Vec>, +} + +impl InternTable { + pub fn new() -> Self { + Self { + label_to_id: HashMap::new(), + id_to_label: Vec::new(), + } + } + + /// Intern a label, assigning a new MetricID if first seen. + /// Uses HashMap::entry to avoid double-hashing. + pub fn intern(&mut self, label: Option) -> MetricID { + let next_id = self.id_to_label.len() as MetricID; + match self.label_to_id.entry(label) { + std::collections::hash_map::Entry::Occupied(e) => *e.get(), + std::collections::hash_map::Entry::Vacant(e) => { + self.id_to_label.push(e.key().clone()); + *e.insert(next_id) + } + } + } + + /// O(1) resolution by MetricID. + pub fn resolve(&self, id: MetricID) -> &Option { + &self.id_to_label[id as usize] + } + + /// Number of interned labels. + pub fn len(&self) -> usize { + self.id_to_label.len() + } +} + +/// Mutable (active) epoch: pure append-only insert, O(1) amortized. +/// +/// # Optimizations applied +/// +/// **Opt 5 — Columnar storage**: timestamps, MetricIDs, and aggregates are kept in three +/// separate parallel arrays instead of one array of tuples. The range-query hot loop only +/// scans `windows_col` (contiguous u64 pairs) and does not touch aggregate pointers unless a +/// window actually matches, cutting cache pressure significantly for sparse range queries. +/// +/// **Opt 1 + 2 — Lazy offset index**: `window_to_ids` is built on the *first* `exact_query` +/// after any write batch and stores u32 column offsets rather than Arc clones. Any `insert` +/// simply sets the field to `None` (one pointer-width write); there are no HashMap lookups, +/// no `HashSet::insert` calls for the index, and no atomic refcount bumps on the hot insert +/// path. The index is rebuilt in O(M) on demand from `windows_col` alone. +/// +/// **Opt 3 — Monotonic ingest fast path**: `last_window` tracks the most recently inserted +/// window. Consecutive inserts to the same window (multiple label combinations for one time +/// bucket — the common case in ordered TSDB ingestion) skip the `windows_set` HashSet probe +/// entirely. +/// +/// **Opt 6 — Pre-allocated column buffers**: `with_capacity(n)` reserves space upfront using +/// the previous epoch's entry count, avoiding Vec reallocation during the next epoch fill. +pub struct MutableEpoch { + // Columnar storage: three parallel arrays (Opt 5) + windows_col: Vec, + metric_ids_col: Vec, + aggregates_col: Vec>, + + // Distinct-window count for epoch rotation threshold + windows_set: HashSet, + + // Monotonic ingest fast path: skip HashSet probe for consecutive same-window inserts (Opt 3) + last_window: Option, + + // Lazy offset index: built on first exact_query, invalidated on any insert (Opt 1 + 2). + // Stores column indices (u32) instead of Arc clones — zero atomic ops during insert. + window_to_ids: Option>>, + + /// Epoch time bounds for O(1) skip check, updated incrementally on insert. + min_start: Option, + max_end: Option, +} + +impl MutableEpoch { + pub fn new() -> Self { + Self::with_capacity(0) + } + + /// Pre-allocate column buffers with a capacity hint (Opt 6). + /// Pass the previous epoch's `len()` to avoid reallocation during the next epoch fill. + pub fn with_capacity(cap: usize) -> Self { + Self { + windows_col: Vec::with_capacity(cap), + metric_ids_col: Vec::with_capacity(cap), + aggregates_col: Vec::with_capacity(cap), + windows_set: HashSet::new(), + last_window: None, + window_to_ids: None, + min_start: None, + max_end: None, + } + } + + pub fn window_count(&self) -> usize { + self.windows_set.len() + } + + /// Total raw entries across all windows and labels. + pub fn len(&self) -> usize { + self.windows_col.len() + } + + /// Returns `(min_start, max_end)` across all windows, or `None` if empty. + /// Used for the epoch-skip check: `min_start > end || max_end < start`. + pub fn time_bounds(&self) -> Option<(u64, u64)> { + match (self.min_start, self.max_end) { + (Some(s), Some(e)) => Some((s, e)), + _ => None, + } + } + + /// O(1) amortized insert: three column pushes + conditional HashSet insert + bounds update. + /// + /// No secondary-index maintenance and no Arc clone for any index. The lazy `window_to_ids` + /// is invalidated by setting it to `None` — a single pointer-width write with no HashMap or + /// HashSet work. + pub fn insert( + &mut self, + metric_id: MetricID, + range: TimestampRange, + agg: Arc, + ) { + // Opt 3: skip HashSet probe when the incoming window equals the last inserted window. + // Multiple label combinations arriving for the same time bucket (the common ordered- + // ingest pattern) cost zero HashSet operations after the first. + if self.last_window != Some(range) { + self.windows_set.insert(range); + self.last_window = Some(range); + } + + // Opt 5: columnar append — no secondary index, no Arc clone + self.windows_col.push(range); + self.metric_ids_col.push(metric_id); + self.aggregates_col.push(agg); + + // Opt 1: invalidate lazy index at zero cost + self.window_to_ids = None; + + self.min_start = Some(self.min_start.map_or(range.0, |m| m.min(range.0))); + self.max_end = Some(self.max_end.map_or(range.1, |m| m.max(range.1))); + } + + /// Consume this epoch and produce an immutable SealedEpoch by sorting in-place. + /// Zips the three columns into tuples and sorts — moves Arcs without cloning. + /// O(M log M) paid once at rotation time, not at query time. + pub fn seal(self) -> SealedEpoch { + let min_start = self.min_start; + let max_end = self.max_end; + let mut entries: Vec<(TimestampRange, MetricID, Arc)> = self + .windows_col + .into_iter() + .zip(self.metric_ids_col) + .zip(self.aggregates_col) + .map(|((tr, mid), agg)| (tr, mid, agg)) + .collect(); + entries.sort_unstable_by_key(|(tr, metric_id, _)| (*tr, *metric_id)); + // Count distinct windows in the sorted entries (consecutive dupes are adjacent). + let distinct_window_count = entries.windows(2).filter(|w| w[0].0 != w[1].0).count() + + if entries.is_empty() { 0 } else { 1 }; + SealedEpoch { + entries, + min_start, + max_end, + distinct_window_count, + } + } + + /// Opt 5: scans only `windows_col` for time-range filtering — cache-friendly because + /// only contiguous TimestampRange values are touched in the hot loop. Aggregate pointers + /// are chased only for entries that actually match the range. + /// O(M) where M ≤ epoch_capacity × labels_per_window. + pub fn range_query_into( + &self, + start: u64, + end: u64, + out: &mut MetricBucketMap, + matched_windows: &mut Vec, + ) { + for (i, &tr) in self.windows_col.iter().enumerate() { + if tr.0 < start || tr.0 > end || tr.1 > end { + continue; + } + let metric_id = self.metric_ids_col[i]; + out.entry(metric_id) + .or_default() + .push((tr, Arc::clone(&self.aggregates_col[i]))); + matched_windows.push(tr); + } + } + + /// Opt 1 + 2: lazy exact match — O(m) after the index is built, O(M) to build once. + /// + /// The offset index (`HashMap>`) is constructed from `windows_col` + /// on the first call after any write batch, then cached. Building it scans `windows_col` + /// once with no Arc clones (only integer offsets are stored). The index remains valid + /// until the next `insert`, which sets `window_to_ids = None`. + /// + /// Takes `&mut self` because building the index mutates `window_to_ids`. + /// Callers must hold exclusive (write) access to the containing epoch. + pub fn exact_query( + &mut self, + range: TimestampRange, + ) -> Option)>> { + if self.window_to_ids.is_none() { + let mut idx: HashMap> = + HashMap::with_capacity(self.windows_set.len()); + for (i, &tr) in self.windows_col.iter().enumerate() { + idx.entry(tr).or_default().push(i as u32); + } + self.window_to_ids = Some(idx); + } + let offsets = self.window_to_ids.as_ref().unwrap().get(&range)?; + Some( + offsets + .iter() + .map(|&i| { + let i = i as usize; + (self.metric_ids_col[i], Arc::clone(&self.aggregates_col[i])) + }) + .collect(), + ) + } + + /// Remove specific windows (ReadBased cleanup). + /// Drains all three columns in lockstep — moves Arcs without cloning. + pub fn remove_windows(&mut self, windows: &[TimestampRange]) { + let window_set: HashSet = windows.iter().copied().collect(); + + let old_windows = std::mem::take(&mut self.windows_col); + let old_metrics = std::mem::take(&mut self.metric_ids_col); + let old_aggs = std::mem::take(&mut self.aggregates_col); + + for ((tr, mid), agg) in old_windows.into_iter().zip(old_metrics).zip(old_aggs) { + if !window_set.contains(&tr) { + self.windows_col.push(tr); + self.metric_ids_col.push(mid); + self.aggregates_col.push(agg); + } + } + + for window in windows { + self.windows_set.remove(window); + } + + // Invalidate lazy index and monotonic fast-path hint. + self.window_to_ids = None; + self.last_window = None; + + // Recompute bounds (cleanup is rare; linear scan is fine). + self.min_start = self.windows_col.iter().map(|tr| tr.0).min(); + self.max_end = self.windows_col.iter().map(|tr| tr.1).max(); + } +} + +/// Sealed (immutable) epoch: flat sorted `Vec` for cache-friendly range scans. +/// +/// Produced by `MutableEpoch::seal()`. Entries are sorted by `(TimestampRange, MetricID)`: +/// all entries for the same window are contiguous, which is cache-friendly for both +/// range queries (binary-search start + linear scan) and exact queries. +pub struct SealedEpoch { + /// Sorted by (TimestampRange, MetricID). + pub entries: Vec<(TimestampRange, MetricID, Arc)>, + /// Precomputed for O(1) epoch-skip check. + pub min_start: Option, + pub max_end: Option, + /// Number of distinct time windows in this epoch — O(1) read. + distinct_window_count: usize, +} + +impl SealedEpoch { + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// O(1) count of distinct time windows in this epoch. + pub fn distinct_window_count(&self) -> usize { + self.distinct_window_count + } + + /// Returns `(min_start, max_end)`, or `None` if empty. + pub fn time_bounds(&self) -> Option<(u64, u64)> { + match (self.min_start, self.max_end) { + (Some(s), Some(e)) => Some((s, e)), + _ => None, + } + } + + /// Binary-search start + linear scan — O(log N + actual_matches), cache-friendly. + pub fn range_query_into( + &self, + start: u64, + end: u64, + out: &mut MetricBucketMap, + matched_windows: &mut Vec, + ) { + let start_pos = self.entries.partition_point(|(tr, _, _)| tr.0 < start); + for (tr, metric_id, agg) in &self.entries[start_pos..] { + if tr.0 > end { + break; + } + if tr.1 > end { + continue; + } + out.entry(*metric_id) + .or_default() + .push((*tr, Arc::clone(agg))); + matched_windows.push(*tr); + } + } + + /// Binary-search exact window match — O(log N + m) where m = labels in that window. + pub fn exact_query( + &self, + range: TimestampRange, + ) -> Option)>> { + let start_pos = self.entries.partition_point(|(tr, _, _)| *tr < range); + let mut out = Vec::new(); + for (tr, metric_id, agg) in &self.entries[start_pos..] { + if *tr != range { + break; + } + out.push((*metric_id, Arc::clone(agg))); + } + if out.is_empty() { + None + } else { + Some(out) + } + } + + /// Remove specific windows (ReadBased / CircularBuffer cleanup). Rebuilds Vec in one pass. + /// Also updates `distinct_window_count`. + pub fn remove_windows(&mut self, windows: &[TimestampRange]) { + let window_set: HashSet = windows.iter().copied().collect(); + self.entries.retain(|(tr, _, _)| !window_set.contains(tr)); + self.min_start = self.entries.iter().map(|(tr, _, _)| tr.0).min(); + self.max_end = self.entries.iter().map(|(tr, _, _)| tr.1).max(); + // Recount distinct windows (entries remain sorted; dedup in one pass). + let mut count = 0usize; + let mut last: Option = None; + for (tr, _, _) in &self.entries { + if last != Some(*tr) { + count += 1; + last = Some(*tr); + } + } + self.distinct_window_count = count; + } + + /// Deduplicated windows (entries sorted, so consecutive dupes are adjacent). + /// Used to purge `read_counts` when this epoch is dropped. + pub fn unique_windows(&self) -> Vec { + let mut windows: Vec = self.entries.iter().map(|(tr, _, _)| *tr).collect(); + windows.dedup(); + windows + } +} diff --git a/asap-query-engine/src/stores/simple_map_store/global.rs b/asap-query-engine/src/stores/simple_map_store/global.rs new file mode 100644 index 0000000..37ca9a6 --- /dev/null +++ b/asap-query-engine/src/stores/simple_map_store/global.rs @@ -0,0 +1,648 @@ +use crate::data_model::{AggregateCore, CleanupPolicy, PrecomputedOutput, StreamingConfig}; +use crate::stores::simple_map_store::common::{ + EpochID, InternTable, MetricBucketMap, MutableEpoch, SealedEpoch, TimestampRange, +}; +use crate::stores::{Store, StoreResult, TimestampedBucketsMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Instant; +use tracing::{debug, error, info}; + +type StoreKey = u64; // aggregation_id + +/// Per-aggregation_id state within the global store +struct PerKeyState { + /// Label interning table (Optimization 1) + intern: InternTable, + + /// Active epoch — always present, accepts inserts. + current_epoch: MutableEpoch, + + /// Sealed (immutable) epochs stored as flat sorted Vecs (Optimization 2). + sealed_epochs: BTreeMap, + + /// Monotonically increasing ID of the current epoch. + current_epoch_id: EpochID, + + /// Max distinct time-windows per epoch before sealing. + /// None = unlimited (set on first insert from num_aggregates_to_retain). + epoch_capacity: Option, + + /// Max total epochs (1 current + sealed) to retain. + max_epochs: usize, +} + +impl PerKeyState { + fn new() -> Self { + Self { + intern: InternTable::new(), + current_epoch: MutableEpoch::new(), + sealed_epochs: BTreeMap::new(), + current_epoch_id: 0, + epoch_capacity: None, + max_epochs: 4, + } + } + + /// Set epoch_capacity on first insert (no-op after first call). + fn configure_epochs(&mut self, num_aggregates_to_retain: Option) { + if self.epoch_capacity.is_none() { + if let Some(cap) = num_aggregates_to_retain { + self.epoch_capacity = Some(cap as usize); + } + } + } + + /// Seal the current epoch when full, then evict the minimum number of oldest windows + /// to keep total distinct windows ≤ `epoch_capacity * max_epochs`. + /// Returns the evicted windows so the caller can clean up `read_counts`. + fn maybe_rotate_epoch(&mut self) -> Vec { + let capacity = match self.epoch_capacity { + Some(c) if c > 0 => c, + _ => return Vec::new(), // unlimited + }; + let retention_limit = capacity * self.max_epochs; + + // Step 1: seal current epoch if it has hit the window capacity threshold. + if self.current_epoch.window_count() >= capacity { + let hint = self.current_epoch.len(); + let old = std::mem::replace(&mut self.current_epoch, MutableEpoch::with_capacity(hint)); + self.sealed_epochs.insert(self.current_epoch_id, old.seal()); + self.current_epoch_id += 1; + } + + // Step 2: evict oldest windows until total distinct windows ≤ retention_limit. + let total: usize = self.current_epoch.window_count() + + self + .sealed_epochs + .values() + .map(|e| e.distinct_window_count()) + .sum::(); + + if total <= retention_limit { + return Vec::new(); + } + let mut over = total - retention_limit; + let mut evicted = Vec::new(); + + while over > 0 { + let oldest_id = match self.sealed_epochs.keys().next().copied() { + Some(id) => id, + None => break, + }; + let oldest_windows = self.sealed_epochs[&oldest_id].unique_windows(); + let n_evict = over.min(oldest_windows.len()); + let to_remove = oldest_windows[..n_evict].to_vec(); + over -= n_evict; + evicted.extend_from_slice(&to_remove); + + if n_evict == oldest_windows.len() { + self.sealed_epochs.remove(&oldest_id); + } else { + self.sealed_epochs + .get_mut(&oldest_id) + .unwrap() + .remove_windows(&to_remove); + } + } + evicted + } +} + +struct StoreData { + /// Per-aggregation_id state (replaces old nested HashMap) + stores: HashMap, + + /// Track metrics that have been created + metrics: HashSet, + + /// Count items inserted per metric for logging + items_inserted: HashMap, + + /// Track earliest timestamp per aggregation ID + earliest_timestamp_per_aggregation_id: HashMap, + + /// Track how many times each aggregate window has been read (per store key) + /// No inner Mutex needed — outer Mutex serializes everything. + read_counts: HashMap>, +} + +/// In-memory storage implementation using single mutex (like Python version) +pub struct SimpleMapStoreGlobal { + // Single global mutex protecting all data structures + lock: Mutex, + + // Store the streaming configuration + streaming_config: Arc, + + // Policy for cleaning up old aggregates + cleanup_policy: CleanupPolicy, +} + +impl SimpleMapStoreGlobal { + pub fn new(streaming_config: Arc, cleanup_policy: CleanupPolicy) -> Self { + Self { + lock: Mutex::new(StoreData { + stores: HashMap::new(), + metrics: HashSet::new(), + items_inserted: HashMap::new(), + earliest_timestamp_per_aggregation_id: HashMap::new(), + read_counts: HashMap::new(), + }), + streaming_config, + cleanup_policy, + } + } +} + +/// Extracted config fields needed inside the locked batch loop. +type GroupedBatch = HashMap< + StoreKey, + ( + BatchConfig, + u64, + Vec<(PrecomputedOutput, Box)>, + ), +>; + +/// Pre-computed outside the lock to avoid per-item config lookups (Opt 4). +struct BatchConfig { + metric: String, + is_delta: bool, + num_aggregates_to_retain: Option, + read_count_threshold: Option, +} + +#[async_trait::async_trait] +impl Store for SimpleMapStoreGlobal { + fn insert_precomputed_output( + &self, + output: PrecomputedOutput, + precompute: Box, + ) -> StoreResult<()> { + self.insert_precomputed_output_batch(vec![(output, precompute)]) + } + + fn insert_precomputed_output_batch( + &self, + outputs: Vec<(PrecomputedOutput, Box)>, + ) -> StoreResult<()> { + let batch_insert_start_time = Instant::now(); + let batch_size = outputs.len(); + + // Opt 4: Pre-group by aggregation_id and resolve config BEFORE acquiring the lock. + // Config lookups (streaming_config HashMap access) are moved out of the hot locked + // loop: each unique aggregation_id pays one lookup regardless of batch size. + // Also pre-compute batch_min_ts per group to collapse N earliest-ts updates into 1. + let mut grouped: GroupedBatch = HashMap::new(); + + for (output, precompute) in outputs { + let aggregation_config = self + .streaming_config + .get_aggregation_config(output.aggregation_id); + + if aggregation_config.is_none() { + error!( + "Aggregation config not found for aggregation_id {}. Skipping insert.", + output.aggregation_id + ); + continue; + } + let aggregation_config = aggregation_config.unwrap(); + + let store_key = output.aggregation_id; + let ts = output.start_timestamp; + + let entry = grouped.entry(store_key).or_insert_with(|| { + ( + BatchConfig { + metric: aggregation_config.metric.clone(), + is_delta: aggregation_config.aggregation_type == "DeltaSetAggregator", + num_aggregates_to_retain: aggregation_config.num_aggregates_to_retain, + read_count_threshold: aggregation_config.read_count_threshold, + }, + u64::MAX, + Vec::new(), + ) + }); + // Track batch minimum timestamp for earliest-ts update (Opt 4) + entry.1 = entry.1.min(ts); + entry.2.push((output, precompute)); + } + + // Measure lock acquisition time + #[cfg(feature = "lock_profiling")] + let lock_wait_start = Instant::now(); + + let mut data = self.lock.lock().unwrap(); + + #[cfg(feature = "lock_profiling")] + { + let lock_wait_duration = lock_wait_start.elapsed(); + info!( + "🔒 Insert lock wait time: {:.2}ms (batch_size: {})", + lock_wait_duration.as_secs_f64() * 1000.0, + batch_size + ); + } + + #[cfg(feature = "lock_profiling")] + let lock_hold_start = Instant::now(); + + for (store_key, (cfg, batch_min_ts, items)) in grouped { + // Opt 4: one metrics insert per group (was one per item) + data.metrics.insert(cfg.metric.clone()); + + // Opt 4: one earliest-ts update per group using the pre-computed batch minimum + let entry = data + .earliest_timestamp_per_aggregation_id + .entry(store_key) + .or_insert(batch_min_ts); + *entry = (*entry).min(batch_min_ts); + + let batch_len = items.len() as u64; + + // Ensure PerKeyState exists and configure epoch capacity once per group (Opt 4). + // configure_epochs is a no-op after the first call, so calling it once here + // avoids the is_none() check on every inner iteration. + { + let per_key = data + .stores + .entry(store_key) + .or_insert_with(PerKeyState::new); + if !cfg.is_delta { + per_key.configure_epochs(cfg.num_aggregates_to_retain); + } + } // per_key borrow ends here + + for (output, precompute) in items { + // Get per_key fresh each iteration so the borrow of data.stores ends before + // the cleanup branches borrow data.read_counts (different field — NLL splits + // them, but only if the per_key borrow scope is confined to each iteration). + let per_key = data.stores.get_mut(&store_key).unwrap(); + + // Intern the label key (Optimization 1) + let timestamp_range = (output.start_timestamp, output.end_timestamp); + let metric_id = per_key.intern.intern(output.key); + + // Insert into current (mutable) epoch. + per_key + .current_epoch + .insert(metric_id, timestamp_range, Arc::from(precompute)); + + // Apply retention policy if configured (but exclude DeltaSetAggregator). + // per_key is last used above; NLL ends its borrow so data.read_counts can + // be accessed in the cleanup branches below. + if !cfg.is_delta { + match self.cleanup_policy { + CleanupPolicy::CircularBuffer => { + let dropped_windows = data + .stores + .get_mut(&store_key) + .unwrap() + .maybe_rotate_epoch(); + if !dropped_windows.is_empty() { + if let Some(rc_map) = data.read_counts.get_mut(&store_key) { + for window in &dropped_windows { + rc_map.remove(window); + } + } + for window in &dropped_windows { + debug!( + "Removed old aggregate for {} aggregation_id {} window {}-{} (epoch rotation)", + cfg.metric, store_key, window.0, window.1 + ); + } + } + } + CleanupPolicy::ReadBased => { + if let Some(threshold) = cfg.read_count_threshold { + let rc_map = data.read_counts.entry(store_key).or_default(); + let windows_to_remove: Vec = rc_map + .iter() + .filter(|(_, &count)| count >= threshold) + .map(|(range, _)| *range) + .collect(); + + if !windows_to_remove.is_empty() { + for window in &windows_to_remove { + debug!( + "Removed aggregate for {} aggregation_id {} window {}-{} (read_count >= threshold: {})", + cfg.metric, store_key, window.0, window.1, threshold + ); + rc_map.remove(window); + } + + let per_key = data.stores.get_mut(&store_key).unwrap(); + per_key.current_epoch.remove_windows(&windows_to_remove); + per_key.sealed_epochs.retain(|_, epoch| { + epoch.remove_windows(&windows_to_remove); + !epoch.is_empty() + }); + } + } + } + CleanupPolicy::NoCleanup => {} + } + } + } + + // Opt 4: one count update per group (was one per item) + let current_count = data.items_inserted.entry(cfg.metric.clone()).or_insert(0); + let old_count = *current_count; + *current_count += batch_len; + if *current_count / 1000 > old_count / 1000 { + debug!("Inserted {} items into {}", current_count, cfg.metric); + } + } + + #[cfg(feature = "lock_profiling")] + { + let lock_hold_duration = lock_hold_start.elapsed(); + info!( + "🔓 Insert lock hold time: {:.2}ms (batch_size: {})", + lock_hold_duration.as_secs_f64() * 1000.0, + batch_size + ); + } + + let batch_insert_duration = batch_insert_start_time.elapsed(); + debug!( + "Batch insert of {} items took: {:.2}ms", + batch_size, + batch_insert_duration.as_secs_f64() * 1000.0 + ); + Ok(()) + } + + fn query_precomputed_output( + &self, + metric: &str, + aggregation_id: u64, + start: u64, + end: u64, + ) -> Result> { + if start > end { + debug!( + "Invalid query range for metric {} agg_id {}: start {} > end {}", + metric, aggregation_id, start, end + ); + return Ok(HashMap::new()); + } + + let query_start_time = Instant::now(); + let store_key = aggregation_id; + + // Measure lock acquisition time + #[cfg(feature = "lock_profiling")] + let lock_wait_start = Instant::now(); + + // Single lock for entire query + let mut data = self.lock.lock().unwrap(); + + #[cfg(feature = "lock_profiling")] + { + let lock_wait_duration = lock_wait_start.elapsed(); + info!( + "🔒 Query lock wait time: {:.2}ms (metric: {}, agg_id: {})", + lock_wait_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id + ); + } + + #[cfg(feature = "lock_profiling")] + let lock_hold_start = Instant::now(); + + let mut total_entries = 0; + let mut matched_windows: Vec = Vec::new(); + + let range_scan_start_time = Instant::now(); + + let mut mid: MetricBucketMap = { + let per_key = match data.stores.get(&store_key) { + Some(pk) => pk, + None => { + info!("Metric {} not found in store", metric); + return Ok(HashMap::new()); + } + }; + + let mut mid: MetricBucketMap = HashMap::with_capacity(per_key.intern.len()); + + // Query current (mutable) epoch. + if let Some((min_start, max_end)) = per_key.current_epoch.time_bounds() { + if !(min_start > end || max_end < start) { + per_key.current_epoch.range_query_into( + start, + end, + &mut mid, + &mut matched_windows, + ); + } + } + + // Query sealed epochs; skip those with no overlap. + for epoch in per_key.sealed_epochs.values() { + let Some((min_start, max_end)) = epoch.time_bounds() else { + continue; + }; + if min_start > end || max_end < start { + continue; + } + epoch.range_query_into(start, end, &mut mid, &mut matched_windows); + } + + mid + }; + + // Resolve MetricIDs → labels in a single pass (scope ends before read_counts borrow) + let results: TimestampedBucketsMap = { + let per_key = data.stores.get(&store_key).unwrap(); + let mut r = HashMap::with_capacity(mid.len()); + for (metric_id, buckets) in mid.drain() { + total_entries += buckets.len(); + let label = per_key.intern.resolve(metric_id).clone(); + r.insert(label, buckets); + } + r + }; + + // Update read counts (outer Mutex already held — no inner Mutex needed) + let rc_map = data.read_counts.entry(store_key).or_default(); + for window in &matched_windows { + *rc_map.entry(*window).or_insert(0) += 1; + } + + let range_scan_duration = range_scan_start_time.elapsed(); + debug!( + "Range scanning took: {:.2}ms", + range_scan_duration.as_secs_f64() * 1000.0 + ); + + let query_duration = query_start_time.elapsed(); + debug!( + "Total query took: {:.2}ms", + query_duration.as_secs_f64() * 1000.0 + ); + + debug!( + "Found {} entries for query on {} (aggregation_id: {}, start: {}, end: {})", + total_entries, metric, aggregation_id, start, end + ); + debug!("Found {} unique keys", results.len()); + + #[cfg(feature = "lock_profiling")] + { + let lock_hold_duration = lock_hold_start.elapsed(); + info!( + "🔓 Query lock hold time: {:.2}ms (metric: {}, agg_id: {}, entries: {})", + lock_hold_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id, + total_entries + ); + } + + Ok(results) + } + + fn query_precomputed_output_exact( + &self, + metric: &str, + aggregation_id: u64, + exact_start: u64, + exact_end: u64, + ) -> Result> { + if exact_start > exact_end { + debug!( + "Invalid exact query range for metric {} agg_id {}: start {} > end {}", + metric, aggregation_id, exact_start, exact_end + ); + return Ok(HashMap::new()); + } + + let query_start_time = Instant::now(); + let store_key = aggregation_id; + + // Measure lock acquisition time + #[cfg(feature = "lock_profiling")] + let lock_wait_start = Instant::now(); + + let mut data = self.lock.lock().unwrap(); + + #[cfg(feature = "lock_profiling")] + { + let lock_wait_duration = lock_wait_start.elapsed(); + info!( + "🔒 Exact query lock wait time: {:.2}ms (metric: {}, agg_id: {})", + lock_wait_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id + ); + } + + #[cfg(feature = "lock_profiling")] + let lock_hold_start = Instant::now(); + + let timestamp_range = (exact_start, exact_end); + + // Opt 1: exact_query now takes &mut self (lazy index build). + // Call it inside a scoped block so the &mut borrow on data.stores ends before we + // re-borrow data.stores immutably to resolve MetricIDs → labels. + let entries_opt: Option> = { + let per_key = match data.stores.get_mut(&store_key) { + Some(pk) => pk, + None => { + debug!("Metric {} not found in store for exact query", metric); + return Ok(HashMap::new()); + } + }; + // Check current epoch first (newest). exact_query returns an owned Vec so the + // &mut borrow of per_key ends immediately — no lifetime overlap with the + // sealed_epochs scan below. + per_key + .current_epoch + .exact_query(timestamp_range) + .or_else(|| { + per_key + .sealed_epochs + .values() + .rev() + .find_map(|epoch| epoch.exact_query(timestamp_range)) + }) + }; // &mut borrow of data.stores ends here + + let mut results: TimestampedBucketsMap = HashMap::new(); + let mut total_entries = 0; + let found_match = entries_opt.is_some(); + + if let Some(entries) = entries_opt { + let per_key = data.stores.get(&store_key).unwrap(); + for (metric_id, agg) in entries { + let label = per_key.intern.resolve(metric_id).clone(); + results + .entry(label) + .or_default() + .push((timestamp_range, agg)); + total_entries += 1; + } + } + + if found_match { + debug!( + "Exact match FOUND for [{}, {}]: {} entries across {} keys", + exact_start, + exact_end, + total_entries, + results.len() + ); + } else { + debug!( + "Exact match NOT FOUND for metric: {}, agg_id: {}, range: [{}, {}]", + metric, aggregation_id, exact_start, exact_end + ); + } + + // Update read count (outer Mutex held — no inner Mutex needed) + if found_match { + let rc_map = data.read_counts.entry(store_key).or_default(); + *rc_map.entry(timestamp_range).or_insert(0) += 1; + } + + #[cfg(feature = "lock_profiling")] + { + let lock_hold_duration = lock_hold_start.elapsed(); + info!( + "🔓 Exact query lock hold time: {:.2}ms (metric: {}, agg_id: {}, found: {})", + lock_hold_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id, + !results.is_empty() + ); + } + + let query_duration = query_start_time.elapsed(); + debug!( + "Exact timestamp query took: {:.2}ms (found: {})", + query_duration.as_secs_f64() * 1000.0, + !results.is_empty() + ); + + Ok(results) + } + + fn get_earliest_timestamp_per_aggregation_id( + &self, + ) -> Result, Box> { + let data = self.lock.lock().unwrap(); + Ok(data.earliest_timestamp_per_aggregation_id.clone()) + } + + fn close(&self) -> StoreResult<()> { + // For in-memory store, no cleanup needed + info!("SimpleMapStoreGlobal closed"); + Ok(()) + } +} diff --git a/asap-query-engine/src/stores/simple_map_store/legacy/global.rs b/asap-query-engine/src/stores/simple_map_store/legacy/global.rs index d0bdc41..5d842e0 100644 --- a/asap-query-engine/src/stores/simple_map_store/legacy/global.rs +++ b/asap-query-engine/src/stores/simple_map_store/legacy/global.rs @@ -383,7 +383,7 @@ impl Store for LegacySimpleMapStoreGlobal { results .entry(key_opt.clone()) .or_default() - .push((*timestamp_range, precompute.clone_boxed_core())); + .push((*timestamp_range, precompute.clone_boxed_core().into())); total_entries += 1; } @@ -485,7 +485,7 @@ impl Store for LegacySimpleMapStoreGlobal { results .entry(key_opt.clone()) .or_default() - .push((timestamp_range, precompute.clone_boxed_core())); + .push((timestamp_range, precompute.clone_boxed_core().into())); total_entries += 1; } diff --git a/asap-query-engine/src/stores/simple_map_store/legacy/per_key.rs b/asap-query-engine/src/stores/simple_map_store/legacy/per_key.rs index 7075543..8f6745c 100644 --- a/asap-query-engine/src/stores/simple_map_store/legacy/per_key.rs +++ b/asap-query-engine/src/stores/simple_map_store/legacy/per_key.rs @@ -452,7 +452,7 @@ impl Store for LegacySimpleMapStorePerKey { results .entry(key_opt.clone()) .or_default() - .push((*timestamp_range, precompute.clone_boxed_core())); + .push((*timestamp_range, precompute.clone_boxed_core().into())); total_entries += 1; } @@ -572,7 +572,7 @@ impl Store for LegacySimpleMapStorePerKey { results .entry(key_opt.clone()) .or_default() - .push((timestamp_range, precompute.clone_boxed_core())); + .push((timestamp_range, precompute.clone_boxed_core().into())); total_entries += 1; } diff --git a/asap-query-engine/src/stores/simple_map_store/mod.rs b/asap-query-engine/src/stores/simple_map_store/mod.rs index ad93dbd..2600c28 100644 --- a/asap-query-engine/src/stores/simple_map_store/mod.rs +++ b/asap-query-engine/src/stores/simple_map_store/mod.rs @@ -1,4 +1,7 @@ +mod common; +pub mod global; pub mod legacy; +pub mod per_key; use crate::data_model::{ AggregateCore, CleanupPolicy, LockStrategy, PrecomputedOutput, StreamingConfig, diff --git a/asap-query-engine/src/stores/simple_map_store/per_key.rs b/asap-query-engine/src/stores/simple_map_store/per_key.rs new file mode 100644 index 0000000..5a6cbd3 --- /dev/null +++ b/asap-query-engine/src/stores/simple_map_store/per_key.rs @@ -0,0 +1,715 @@ +use crate::data_model::{AggregateCore, CleanupPolicy, PrecomputedOutput, StreamingConfig}; +use crate::stores::simple_map_store::common::{ + EpochID, InternTable, MetricBucketMap, MetricID, MutableEpoch, SealedEpoch, TimestampRange, +}; +use crate::stores::{Store, StoreResult, TimestampedBucketsMap}; +use dashmap::DashMap; +use std::collections::{BTreeMap, HashMap}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Instant; +use tracing::{debug, error, info}; + +type StoreKey = u64; // aggregation_id + +/// Per-aggregation_id data protected by RwLock +struct StoreKeyData { + /// Label interning table (Optimization 1) + intern: InternTable, + + /// Active epoch — always present, accepts inserts. + current_epoch: MutableEpoch, + + /// Sealed (immutable) epochs stored as flat sorted Vecs (Optimization 2). + sealed_epochs: BTreeMap, + + /// Monotonically increasing ID of the current epoch. + current_epoch_id: EpochID, + + /// Max distinct time-windows per epoch before sealing. + /// None = unlimited (set on first insert from num_aggregates_to_retain). + epoch_capacity: Option, + + /// Max total epochs (1 current + sealed) to retain before dropping the oldest. + max_epochs: usize, + + /// Track how many times each timestamp range has been read. + /// Behind Mutex so range queries can use a read lock on the outer RwLock. + read_counts: Mutex>, +} + +impl StoreKeyData { + fn new() -> Self { + Self { + intern: InternTable::new(), + current_epoch: MutableEpoch::new(), + sealed_epochs: BTreeMap::new(), + current_epoch_id: 0, + epoch_capacity: None, + max_epochs: 4, + read_counts: Mutex::new(HashMap::new()), + } + } + + /// Set epoch_capacity on first insert (no-op after first call). + fn configure_epochs(&mut self, num_aggregates_to_retain: Option) { + if self.epoch_capacity.is_none() { + if let Some(cap) = num_aggregates_to_retain { + self.epoch_capacity = Some(cap as usize); + } + } + } + + /// Seal the current epoch when full, then evict the minimum number of oldest windows + /// to keep total distinct windows ≤ `epoch_capacity * max_epochs`. + /// + /// Matches legacy per-window eviction semantics: only the exact number of windows + /// needed to reach the retention limit are removed, which may be fewer than a full epoch. + fn maybe_rotate_epoch(&mut self) { + let capacity = match self.epoch_capacity { + Some(c) if c > 0 => c, + _ => return, // unlimited + }; + let retention_limit = capacity * self.max_epochs; + + // Step 1: seal current epoch if it has hit the window capacity threshold. + if self.current_epoch.window_count() >= capacity { + let hint = self.current_epoch.len(); + let old = std::mem::replace(&mut self.current_epoch, MutableEpoch::with_capacity(hint)); + self.sealed_epochs.insert(self.current_epoch_id, old.seal()); + self.current_epoch_id += 1; + } + + // Step 2: evict oldest windows until total distinct windows ≤ retention_limit. + // Uses O(E) distinct_window_count() calls (E ≤ max_epochs, a small constant). + let total: usize = self.current_epoch.window_count() + + self + .sealed_epochs + .values() + .map(|e| e.distinct_window_count()) + .sum::(); + + if total <= retention_limit { + return; + } + let mut over = total - retention_limit; + + while over > 0 { + let oldest_id = match self.sealed_epochs.keys().next().copied() { + Some(id) => id, + None => break, + }; + let oldest_windows = self.sealed_epochs[&oldest_id].unique_windows(); + let n_evict = over.min(oldest_windows.len()); + let to_remove = oldest_windows[..n_evict].to_vec(); + over -= n_evict; + + { + let read_counts = self.read_counts.get_mut().unwrap(); + for w in &to_remove { + read_counts.remove(w); + } + } + if n_evict == oldest_windows.len() { + self.sealed_epochs.remove(&oldest_id); + } else { + self.sealed_epochs + .get_mut(&oldest_id) + .unwrap() + .remove_windows(&to_remove); + } + } + } + + /// Apply ReadBased cleanup across current and sealed epochs. + fn cleanup_read_based(&mut self, metric: &str, aggregation_id: u64, threshold: u64) { + let read_counts = self.read_counts.get_mut().unwrap(); + + let windows_to_remove: Vec = read_counts + .iter() + .filter(|(_, &count)| count >= threshold) + .map(|(range, _)| *range) + .collect(); + + if windows_to_remove.is_empty() { + return; + } + + for window in &windows_to_remove { + debug!( + "Removed aggregate for {} aggregation_id {} window {}-{} (read_count >= threshold: {})", + metric, aggregation_id, window.0, window.1, threshold + ); + read_counts.remove(window); + } + + // Remove from current epoch. + self.current_epoch.remove_windows(&windows_to_remove); + + // Remove from sealed epochs; drop any that become empty. + self.sealed_epochs.retain(|_, epoch| { + epoch.remove_windows(&windows_to_remove); + !epoch.is_empty() + }); + } +} + +/// In-memory storage implementation using per-key locks for concurrency +pub struct SimpleMapStorePerKey { + // Lock-free concurrent outer map - per aggregation_id + store: DashMap>>, + + // Separate concurrent maps for global state + earliest_timestamps: DashMap, + metrics: DashMap, // HashSet equivalent + items_inserted: DashMap, + + // Store the streaming configuration + streaming_config: Arc, + + // Policy for cleaning up old aggregates + cleanup_policy: CleanupPolicy, +} + +impl SimpleMapStorePerKey { + pub fn new(streaming_config: Arc, cleanup_policy: CleanupPolicy) -> Self { + Self { + store: DashMap::new(), + earliest_timestamps: DashMap::new(), + metrics: DashMap::new(), + items_inserted: DashMap::new(), + streaming_config, + cleanup_policy, + } + } + + fn cleanup_old_aggregates( + &self, + data: &mut StoreKeyData, + metric: &str, + aggregation_id: u64, + num_aggregates_to_retain: Option, + read_count_threshold: Option, + ) { + match self.cleanup_policy { + CleanupPolicy::CircularBuffer => { + // configure_epochs was already called before insert; + // rotation is handled by maybe_rotate_epoch after each insert batch. + // Nothing additional needed here. + let _ = (num_aggregates_to_retain, metric, aggregation_id); + } + CleanupPolicy::ReadBased => { + if let Some(threshold) = read_count_threshold { + data.cleanup_read_based(metric, aggregation_id, threshold); + } + } + CleanupPolicy::NoCleanup => { + // Do nothing - no cleanup + } + } + } + + fn insert_for_store_key( + &self, + store_key: &StoreKey, + metric: &str, + items: Vec<(PrecomputedOutput, Box)>, + ) -> StoreResult<()> { + let aggregation_id = *store_key; + let metric_key = metric.to_string(); + let inserted_delta = items.len() as u64; + + // Opt 4: compute batch minimum timestamp before acquiring any lock. + // Collapses N per-item atomic fetch_min calls into one (Opt 4). + let batch_min_ts = items + .iter() + .map(|(o, _)| o.start_timestamp) + .min() + .unwrap_or(u64::MAX); + + // Measure lock acquisition time + #[cfg(feature = "lock_profiling")] + let lock_wait_start = Instant::now(); + + // Get or create the store data for this key + let store_data_lock = self + .store + .entry(*store_key) + .or_insert_with(|| Arc::new(RwLock::new(StoreKeyData::new()))); + + #[cfg(feature = "lock_profiling")] + { + let lock_wait_duration = lock_wait_start.elapsed(); + info!( + "🔒 Insert DashMap get time: {:.2}ms (metric: {}, agg_id: {}, items: {})", + lock_wait_duration.as_secs_f64() * 1000.0, + metric, + *store_key, + items.len() + ); + } + + #[cfg(feature = "lock_profiling")] + let rwlock_wait_start = Instant::now(); + + // Acquire write lock for this aggregation_id only + let mut data = store_data_lock.write().map_err(|e| { + format!( + "Failed to acquire write lock for aggregation_id {}: {}", + store_key, e + ) + })?; + + #[cfg(feature = "lock_profiling")] + { + let rwlock_wait_duration = rwlock_wait_start.elapsed(); + info!( + "🔒 Insert RwLock wait time: {:.2}ms (metric: {}, agg_id: {}, items: {})", + rwlock_wait_duration.as_secs_f64() * 1000.0, + metric, + *store_key, + items.len() + ); + } + + #[cfg(feature = "lock_profiling")] + let lock_hold_start = Instant::now(); + + // Create metric if needed (lock-free DashMap insert) + self.metrics.entry(metric_key.clone()).or_insert(()); + + // Opt 4: one atomic earliest-ts update per batch using the pre-computed minimum. + // Replaces N per-item fetch_min calls with a single one. + self.earliest_timestamps + .entry(aggregation_id) + .and_modify(|earliest| { + earliest.fetch_min(batch_min_ts, Ordering::Relaxed); + }) + .or_insert_with(|| AtomicU64::new(batch_min_ts)); + + // Update insertion counter once per grouped batch (instead of once per item). + let items_inserted_counter = self + .items_inserted + .entry(metric_key) + .or_insert_with(|| AtomicU64::new(0)); + let previous_total = items_inserted_counter.fetch_add(inserted_delta, Ordering::Relaxed); + let new_total = previous_total + inserted_delta; + if new_total / 1000 > previous_total / 1000 { + debug!("Inserted {} items into {}", new_total, metric); + } + + // Get aggregation config once for cleanup settings + let aggregation_config = self + .streaming_config + .get_aggregation_config(aggregation_id) + .ok_or_else(|| format!("Aggregation config not found for {}", aggregation_id))?; + + // Configure epoch capacity on first insert (Optimization 2) + if aggregation_config.aggregation_type != "DeltaSetAggregator" { + data.configure_epochs(aggregation_config.num_aggregates_to_retain); + } + + for (output, precompute) in items { + // Intern the label key (Optimization 1) + let timestamp_range = (output.start_timestamp, output.end_timestamp); + let metric_id: MetricID = data.intern.intern(output.key); + + // Insert into current (mutable) epoch. + data.current_epoch + .insert(metric_id, timestamp_range, Arc::from(precompute)); + + // After each item, check if we should rotate (CircularBuffer, Optimization 2) + if aggregation_config.aggregation_type != "DeltaSetAggregator" + && matches!(self.cleanup_policy, CleanupPolicy::CircularBuffer) + { + data.maybe_rotate_epoch(); + } + } + + // Apply retention policy if configured (but exclude DeltaSetAggregator) + if aggregation_config.aggregation_type != "DeltaSetAggregator" { + self.cleanup_old_aggregates( + &mut data, + metric, + aggregation_id, + aggregation_config.num_aggregates_to_retain, + aggregation_config.read_count_threshold, + ); + } + + #[cfg(feature = "lock_profiling")] + { + let lock_hold_duration = lock_hold_start.elapsed(); + info!( + "🔓 Insert lock hold time: {:.2}ms (metric: {}, agg_id: {})", + lock_hold_duration.as_secs_f64() * 1000.0, + metric, + *store_key + ); + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Store for SimpleMapStorePerKey { + fn insert_precomputed_output( + &self, + output: PrecomputedOutput, + precompute: Box, + ) -> StoreResult<()> { + self.insert_precomputed_output_batch(vec![(output, precompute)]) + } + + fn insert_precomputed_output_batch( + &self, + outputs: Vec<(PrecomputedOutput, Box)>, + ) -> StoreResult<()> { + let batch_insert_start_time = Instant::now(); + let batch_size = outputs.len(); + + // Group by aggregation_id + #[allow(clippy::type_complexity)] + let mut grouped: HashMap< + StoreKey, + (String, Vec<(PrecomputedOutput, Box)>), + > = HashMap::new(); + + for (output, precompute) in outputs { + let aggregation_config = self + .streaming_config + .get_aggregation_config(output.aggregation_id); + + if aggregation_config.is_none() { + error!( + "Aggregation config not found for aggregation_id {}. Skipping insert.", + output.aggregation_id + ); + continue; + } + let aggregation_config = aggregation_config.unwrap(); + + let metric = aggregation_config.metric.clone(); + let store_key = output.aggregation_id; + + grouped + .entry(store_key) + .or_insert_with(|| (metric.clone(), Vec::new())) + .1 + .push((output, precompute)); + } + + // Process each aggregation_id group; each iteration locks at most one key. + for (store_key, (metric, items)) in grouped { + self.insert_for_store_key(&store_key, &metric, items)?; + } + + let batch_insert_duration = batch_insert_start_time.elapsed(); + debug!( + "Batch insert of {} items took: {:.2}ms", + batch_size, + batch_insert_duration.as_secs_f64() * 1000.0 + ); + Ok(()) + } + + fn query_precomputed_output( + &self, + metric: &str, + aggregation_id: u64, + start: u64, + end: u64, + ) -> Result> { + if start > end { + debug!( + "Invalid query range for metric {} agg_id {}: start {} > end {}", + metric, aggregation_id, start, end + ); + return Ok(HashMap::new()); + } + + let query_start_time = Instant::now(); + let store_key = aggregation_id; + + // Measure lock acquisition time + #[cfg(feature = "lock_profiling")] + let lock_wait_start = Instant::now(); + + // Get the store data for this aggregation_id + let store_data_lock = match self.store.get(&store_key) { + Some(lock) => lock, + None => { + info!("Metric {} not found in store", metric); + return Ok(HashMap::new()); + } + }; + + #[cfg(feature = "lock_profiling")] + { + let lock_wait_duration = lock_wait_start.elapsed(); + info!( + "🔒 Query DashMap get time: {:.2}ms (metric: {}, agg_id: {})", + lock_wait_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id + ); + } + + #[cfg(feature = "lock_profiling")] + let rwlock_wait_start = Instant::now(); + + // Range queries use a read lock — no mutation of epoch data needed. + let data = store_data_lock.read().map_err(|e| { + format!( + "Failed to acquire read lock for query aggregation_id {}: {}", + store_key, e + ) + })?; + + #[cfg(feature = "lock_profiling")] + { + let rwlock_wait_duration = rwlock_wait_start.elapsed(); + info!( + "🔒 Query RwLock wait time: {:.2}ms (metric: {}, agg_id: {})", + rwlock_wait_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id + ); + } + + #[cfg(feature = "lock_profiling")] + let lock_hold_start = Instant::now(); + + let mut total_entries = 0; + let mut matched_windows: Vec = Vec::new(); + + let range_scan_start_time = Instant::now(); + + let mut mid: MetricBucketMap = HashMap::with_capacity(data.intern.len()); + + // Query current (mutable) epoch. + if let Some((min_start, max_end)) = data.current_epoch.time_bounds() { + if !(min_start > end || max_end < start) { + data.current_epoch + .range_query_into(start, end, &mut mid, &mut matched_windows); + } + } + + // Query sealed epochs; skip those with no overlap. + for epoch in data.sealed_epochs.values() { + let Some((min_start, max_end)) = epoch.time_bounds() else { + continue; + }; + if min_start > end || max_end < start { + continue; + } + epoch.range_query_into(start, end, &mut mid, &mut matched_windows); + } + + // Resolve MetricIDs → labels in a single pass + let mut results: TimestampedBucketsMap = HashMap::with_capacity(mid.len()); + for (metric_id, buckets) in mid { + total_entries += buckets.len(); + let label = data.intern.resolve(metric_id).clone(); + results.insert(label, buckets); + } + + // Update read counts via inner Mutex + { + let mut read_counts = data.read_counts.lock().unwrap(); + for window in &matched_windows { + *read_counts.entry(*window).or_insert(0) += 1; + } + } + + let range_scan_duration = range_scan_start_time.elapsed(); + debug!( + "Range scanning took: {:.2}ms", + range_scan_duration.as_secs_f64() * 1000.0 + ); + + let query_duration = query_start_time.elapsed(); + debug!( + "Total query took: {:.2}ms", + query_duration.as_secs_f64() * 1000.0 + ); + + debug!( + "Found {} entries for query on {} (aggregation_id: {}, start: {}, end: {})", + total_entries, metric, aggregation_id, start, end + ); + debug!("Found {} unique keys", results.len()); + + #[cfg(feature = "lock_profiling")] + { + let lock_hold_duration = lock_hold_start.elapsed(); + info!( + "🔓 Query lock hold time: {:.2}ms (metric: {}, agg_id: {}, entries: {})", + lock_hold_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id, + total_entries + ); + } + + Ok(results) + } + + fn query_precomputed_output_exact( + &self, + metric: &str, + aggregation_id: u64, + exact_start: u64, + exact_end: u64, + ) -> Result> { + if exact_start > exact_end { + debug!( + "Invalid exact query range for metric {} agg_id {}: start {} > end {}", + metric, aggregation_id, exact_start, exact_end + ); + return Ok(HashMap::new()); + } + + let query_start_time = Instant::now(); + let store_key = aggregation_id; + + // Measure lock acquisition time + #[cfg(feature = "lock_profiling")] + let lock_wait_start = Instant::now(); + + // Get the store data for this aggregation_id + let store_data_lock = match self.store.get(&store_key) { + Some(lock) => lock, + None => { + debug!("Metric {} not found in store for exact query", metric); + return Ok(HashMap::new()); + } + }; + + #[cfg(feature = "lock_profiling")] + { + let lock_wait_duration = lock_wait_start.elapsed(); + info!( + "🔒 Exact query DashMap get time: {:.2}ms (metric: {}, agg_id: {})", + lock_wait_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id + ); + } + + #[cfg(feature = "lock_profiling")] + let rwlock_wait_start = Instant::now(); + + // Opt 1: exact_query takes &mut self (lazy index build), so we need a write lock. + // Range queries still use a read lock — only exact queries pay the write-lock cost. + let mut data = store_data_lock.write().map_err(|e| { + format!( + "Failed to acquire write lock for exact query aggregation_id {}: {}", + store_key, e + ) + })?; + + #[cfg(feature = "lock_profiling")] + { + let rwlock_wait_duration = rwlock_wait_start.elapsed(); + info!( + "🔒 Exact query RwLock wait time: {:.2}ms (metric: {}, agg_id: {})", + rwlock_wait_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id + ); + } + + #[cfg(feature = "lock_profiling")] + let lock_hold_start = Instant::now(); + + let timestamp_range = (exact_start, exact_end); + + // Opt 1: exact_query on the mutable epoch builds the lazy offset index if absent, + // then looks up the window in O(m). Returns an owned Vec — the &mut borrow ends here. + let entries_opt: Option)>> = + data.current_epoch.exact_query(timestamp_range).or_else(|| { + data.sealed_epochs + .values() + .rev() + .find_map(|epoch| epoch.exact_query(timestamp_range)) + }); + + let mut results: TimestampedBucketsMap = HashMap::new(); + let mut total_entries = 0; + let found_match = entries_opt.is_some(); + + if let Some(entries) = entries_opt { + for (metric_id, agg) in entries { + let label = data.intern.resolve(metric_id).clone(); + results + .entry(label) + .or_default() + .push((timestamp_range, agg)); + total_entries += 1; + } + } + + if found_match { + debug!( + "Exact match FOUND for [{}, {}]: {} entries across {} keys", + exact_start, + exact_end, + total_entries, + results.len() + ); + } else { + debug!( + "Exact match NOT FOUND for metric: {}, agg_id: {}, range: [{}, {}]", + metric, aggregation_id, exact_start, exact_end + ); + } + + // Update read count — write lock already held, no inner Mutex needed + if found_match { + let mut read_counts = data.read_counts.lock().unwrap(); + *read_counts.entry(timestamp_range).or_insert(0) += 1; + } + + #[cfg(feature = "lock_profiling")] + { + let lock_hold_duration = lock_hold_start.elapsed(); + info!( + "🔓 Exact query lock hold time: {:.2}ms (metric: {}, agg_id: {}, found: {})", + lock_hold_duration.as_secs_f64() * 1000.0, + metric, + aggregation_id, + !results.is_empty() + ); + } + + let query_duration = query_start_time.elapsed(); + debug!( + "Exact timestamp query took: {:.2}ms (found: {})", + query_duration.as_secs_f64() * 1000.0, + !results.is_empty() + ); + + Ok(results) + } + + fn get_earliest_timestamp_per_aggregation_id( + &self, + ) -> Result, Box> { + // No lock needed - DashMap with AtomicU64 + let result = self + .earliest_timestamps + .iter() + .map(|entry| (*entry.key(), entry.value().load(Ordering::Relaxed))) + .collect(); + + Ok(result) + } + + fn close(&self) -> StoreResult<()> { + // For in-memory store, no cleanup needed + info!("SimpleMapStorePerKey closed"); + Ok(()) + } +} diff --git a/asap-query-engine/src/stores/traits.rs b/asap-query-engine/src/stores/traits.rs index a851071..679568c 100644 --- a/asap-query-engine/src/stores/traits.rs +++ b/asap-query-engine/src/stores/traits.rs @@ -1,8 +1,9 @@ use crate::data_model::{AggregateCore, KeyByLabelValues, PrecomputedOutput}; use std::collections::HashMap; +use std::sync::Arc; /// A bucket with its timestamp range: ((start_timestamp, end_timestamp), aggregate) -pub type TimestampedBucket = ((u64, u64), Box); +pub type TimestampedBucket = ((u64, u64), Arc); /// Map from key to timestamped buckets (sparse - only contains buckets that exist) pub type TimestampedBucketsMap = HashMap, Vec>;