Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0edaa07
Rename SimpleMapStore impls to Legacy* and move to legacy/ submodule …
zzylol Mar 23, 2026
2cab5bd
Fix cargo fmt formatting in simple_map_store/mod.rs
Mar 24, 2026
4934916
Add Store correctness contract test suite
zzylol Mar 20, 2026
09a2ebf
Extend store contract tests: all accumulator types, keyed entries, De…
zzylol Mar 20, 2026
37b14bf
Fix cargo fmt violations in store_correctness_tests
zzylol Mar 20, 2026
2be7398
Remove unused assert_clone_fidelity function (clippy dead_code)
zzylol Mar 20, 2026
2fa8e23
Fix extra blank line (cargo fmt)
zzylol Mar 20, 2026
93d06df
Replace SimpleStore with inverted index (label -> BTreeMap<Time>)
zzylol Mar 9, 2026
4c2675e
Expand INDEX_DESIGN.md with full theoretical complexity analysis
zzylol Mar 9, 2026
ac83c2f
Apply three VictoriaMetrics-inspired optimizations to SimpleMapStore
zzylol Mar 9, 2026
4f5cd59
Fix Dockerfile bench stub and apply cargo fmt
zzylol Mar 10, 2026
a75ef74
Add legacy store for benchmark comparison
zzylol Mar 10, 2026
c7fc505
Fix clippy errors: type_complexity, unused_mut, collapsible_if, depre…
zzylol Mar 11, 2026
e926ed0
Three further index optimizations: time-primary scan, flat sealed epo…
zzylol Mar 11, 2026
dc46cf8
Make MutableEpoch insert O(1) amortized: append-only raw buffer
zzylol Mar 11, 2026
626c35f
Add window_to_ids exact-lookup index to MutableEpoch: O(M) → O(m)
zzylol Mar 11, 2026
24bb6f8
update
zzylol Mar 11, 2026
1966ce3
refactor merge simple store benchmarks
zzylol Mar 20, 2026
aa98701
refactor move legacy simple stores into module
zzylol Mar 20, 2026
564748c
refactor: optimize MutableEpoch insert path (columnar storage, lazy i…
zzylol Mar 20, 2026
24d4e0f
docs: update INDEX_DESIGN to reflect columnar MutableEpoch/SealedEpoc…
zzylol Mar 20, 2026
11fb532
fix: match legacy per-window CircularBuffer eviction semantics
zzylol Mar 20, 2026
761cc47
style: apply cargo fmt
zzylol Mar 20, 2026
c237588
fix: extract GroupedBatch type alias to satisfy clippy::type_complexity
zzylol Mar 20, 2026
8e0b4e3
fix: restore legacy store files to simple_store_opt baseline
Mar 24, 2026
fd93329
fix: update legacy stores to return Arc<dyn AggregateCore> and fix mo…
Mar 24, 2026
f278e01
fix: declare global and per_key as modules so common.rs items are rea…
Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions asap-query-engine/src/engines/physical/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ mod tests {
use crate::precompute_operators::SumAccumulator;
use crate::stores::traits::TimestampedBucket;

fn make_bucket(acc: Box<dyn crate::AggregateCore>) -> TimestampedBucket {
fn make_bucket(acc: Arc<dyn crate::AggregateCore>) -> TimestampedBucket {
((0, 0), acc)
}

Expand All @@ -184,13 +184,13 @@ mod tests {
let key1 = KeyByLabelValues {
labels: vec!["host-a".to_string()],
};
let acc1 = Box::new(SumAccumulator::with_sum(100.0));
let acc1 = Arc::new(SumAccumulator::with_sum(100.0)) as Arc<dyn crate::AggregateCore>;
store_result.insert(Some(key1), vec![make_bucket(acc1)]);

let key2 = KeyByLabelValues {
labels: vec!["host-b".to_string()],
};
let acc2 = Box::new(SumAccumulator::with_sum(200.0));
let acc2 = Arc::new(SumAccumulator::with_sum(200.0)) as Arc<dyn crate::AggregateCore>;
store_result.insert(Some(key2), vec![make_bucket(acc2)]);

let label_names = vec!["host".to_string()];
Expand All @@ -207,7 +207,7 @@ mod tests {
let key1 = KeyByLabelValues {
labels: vec!["host-a".to_string(), "region-1".to_string()],
};
let acc1 = Box::new(SumAccumulator::with_sum(100.0));
let acc1 = Arc::new(SumAccumulator::with_sum(100.0)) as Arc<dyn crate::AggregateCore>;
store_result.insert(Some(key1), vec![make_bucket(acc1)]);

let label_names = vec!["host".to_string(), "region".to_string()];
Expand All @@ -221,7 +221,7 @@ mod tests {
fn test_store_result_to_record_batch_no_key() {
let mut store_result: TimestampedBucketsMap = HashMap::new();

let acc = Box::new(SumAccumulator::with_sum(500.0));
let acc = Arc::new(SumAccumulator::with_sum(500.0)) as Arc<dyn crate::AggregateCore>;
store_result.insert(None, vec![make_bucket(acc)]);

let label_names: Vec<String> = vec![];
Expand Down Expand Up @@ -293,7 +293,9 @@ mod tests {
};
store_result.insert(
Some(key),
vec![make_bucket(Box::new(SumAccumulator::with_sum(1.0)))],
vec![make_bucket(
Arc::new(SumAccumulator::with_sum(1.0)) as Arc<dyn crate::AggregateCore>
)],
);
let label_names: Vec<String> = vec!["l1", "l2", "l3", "l4", "l5"]
.into_iter()
Expand All @@ -312,7 +314,9 @@ mod tests {
};
store_result.insert(
Some(key),
vec![make_bucket(Box::new(SumAccumulator::with_sum(42.0)))],
vec![make_bucket(
Arc::new(SumAccumulator::with_sum(42.0)) as Arc<dyn crate::AggregateCore>
)],
);
let label_names = vec!["host".to_string(), "region".to_string()];
let batch = store_result_to_record_batch(&store_result, &label_names).unwrap();
Expand Down Expand Up @@ -345,15 +349,15 @@ mod tests {
vec![
(
(100, 200),
Box::new(SumAccumulator::with_sum(10.0)) as Box<dyn crate::AggregateCore>,
Arc::new(SumAccumulator::with_sum(10.0)) as Arc<dyn crate::AggregateCore>,
),
(
(200, 300),
Box::new(SumAccumulator::with_sum(20.0)) as Box<dyn crate::AggregateCore>,
Arc::new(SumAccumulator::with_sum(20.0)) as Arc<dyn crate::AggregateCore>,
),
(
(300, 400),
Box::new(SumAccumulator::with_sum(30.0)) as Box<dyn crate::AggregateCore>,
Arc::new(SumAccumulator::with_sum(30.0)) as Arc<dyn crate::AggregateCore>,
),
],
);
Expand Down Expand Up @@ -426,13 +430,19 @@ mod tests {
store_result.insert(
Some(key1),
vec![
make_bucket(Box::new(SumAccumulator::with_sum(1.0))),
make_bucket(Box::new(SumAccumulator::with_sum(2.0))),
make_bucket(
Arc::new(SumAccumulator::with_sum(1.0)) as Arc<dyn crate::AggregateCore>
),
make_bucket(
Arc::new(SumAccumulator::with_sum(2.0)) as Arc<dyn crate::AggregateCore>
),
],
);
store_result.insert(
Some(key2),
vec![make_bucket(Box::new(SumAccumulator::with_sum(3.0)))],
vec![make_bucket(
Arc::new(SumAccumulator::with_sum(3.0)) as Arc<dyn crate::AggregateCore>
)],
);
assert_eq!(count_store_result_rows(&store_result), 3);

Expand Down
6 changes: 3 additions & 3 deletions asap-query-engine/src/engines/simple_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ impl SimpleEngine {
}
// Extract bucket from timestamped tuple
let (_, bucket) = timestamped_buckets.into_iter().next().unwrap();
(key, bucket)
(key, bucket.as_ref().clone_boxed_core())
})
.collect()
} else {
Expand Down Expand Up @@ -2811,9 +2811,9 @@ impl SimpleEngine {
};

// Build lookup: bucket_start_timestamp -> bucket for O(1) access
let bucket_map: HashMap<u64, &Box<dyn AggregateCore>> = timestamped_buckets
let bucket_map: HashMap<u64, &dyn AggregateCore> = timestamped_buckets
.iter()
.map(|((start, _), bucket)| (*start, bucket))
.map(|((start, _), bucket)| (*start, bucket.as_ref()))
.collect();

debug!(
Expand Down
245 changes: 245 additions & 0 deletions asap-query-engine/src/stores/simple_map_store/INDEX_DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
# SimpleMapStore Index Design

## Overview

`SimpleMapStore` uses an **epoch-partitioned columnar store** with label interning. The design applies six optimizations targeting the two most expensive paths: ingestion and range scan.

| Opt | What | Where |
|-----|------|-------|
| 1 | Lazy `window_to_ids` index — built on first exact query, invalidated cheaply on insert | `MutableEpoch` |
| 2 | Offset-based index — stores `u32` column offsets, not `Arc` clones | `MutableEpoch::exact_query` |
| 3 | Monotonic ingest fast path — skip `HashSet` probe for consecutive same-window inserts | `MutableEpoch::insert` |
| 4 | Batch metadata hoisting — config lookup, label interning, timestamp update moved out of per-item loop | `global.rs`, `per_key.rs` |
| 5 | Columnar storage — three parallel arrays; range scan hot loop touches only `windows_col` | `MutableEpoch` |
| 6 | Pre-allocated epoch buffers — `with_capacity(prev_epoch.len())` on rotation | `maybe_rotate_epoch` |

---

## Data Structures

### Types (`common.rs`)

```rust
pub type MetricID = u32; // compact interned label ID
pub type EpochID = u64; // monotonically increasing epoch counter
pub type TimestampRange = (u64, u64); // (start_timestamp, end_timestamp)
pub type MetricBucketMap = HashMap<MetricID, Vec<(TimestampRange, Arc<dyn AggregateCore>)>>;
```

### `InternTable` (`common.rs`)

```
InternTable {
label_to_id: HashMap<Option<KeyByLabelValues>, MetricID>
id_to_label: Vec<Option<KeyByLabelValues>>
}
```

- `intern(label)` — O(1) amortized via `HashMap::entry`; no double-hashing
- `resolve(id)` — O(1) indexed Vec lookup
- All internal maps key on `MetricID` (u32), never on full label strings

### `MutableEpoch` (`common.rs`)

Active epoch: append-only insert, O(1) amortized.

```
MutableEpoch {
// Columnar storage (Opt 5): three parallel arrays
windows_col: Vec<TimestampRange>
metric_ids_col: Vec<MetricID>
aggregates_col: Vec<Arc<dyn AggregateCore>>

// Distinct-window count for epoch rotation threshold
windows_set: HashSet<TimestampRange>

// Monotonic ingest fast path (Opt 3)
last_window: Option<TimestampRange>

// Lazy offset index (Opt 1 + 2): built on first exact_query, None after any insert
window_to_ids: Option<HashMap<TimestampRange, Vec<u32>>>

// Epoch bounds for O(1) skip check (updated incrementally on insert)
min_start: Option<u64>
max_end: Option<u64>
}
```

**Insert** (`O(1)` amortized):
- Opt 3: if incoming window == `last_window`, skip `windows_set.insert` entirely
- Three `Vec::push` calls — no secondary index maintenance
- `window_to_ids = None` — cheap invalidation of the lazy index (note: if an index was previously built, assigning `None` also drops that `HashMap`, so the first insert after an exact query additionally pays its deallocation)

**`seal()` → `SealedEpoch`** (`O(M log M)`, paid once at rotation):
- Zips the three columns into tuples, sorts by `(TimestampRange, MetricID)`, moves `Arc`s without cloning

**`exact_query(&mut self)`** (`O(M)` first call after a write, `O(m)` cached):
- Opt 1 + 2: if `window_to_ids` is `None`, build it from `windows_col` in one pass storing `u32` offsets
- Cache is valid until the next `insert`

**`range_query_into`** (`O(M)` mutable epoch):
- Opt 5: hot loop iterates only `windows_col`; aggregate pointer only chased on match

### `SealedEpoch` (`common.rs`)

Immutable epoch: flat sorted `Vec` for cache-friendly binary-search scans.

```
SealedEpoch {
entries: Vec<(TimestampRange, MetricID, Arc<dyn AggregateCore>)> // sorted by (TR, MetricID)
min_start: Option<u64>
max_end: Option<u64>
}
```

**`range_query_into`** (`O(log N + k)`): `partition_point` to find start, linear scan until `tr.0 > end`

**`exact_query`** (`O(log N + m)`): `partition_point` to find the window, linear scan while `tr == range`

### Per-Key Store (`per_key.rs`)

Each `aggregation_id` gets its own `StoreKeyData` behind a per-key `RwLock`:

```
DashMap<aggregation_id, Arc<RwLock<StoreKeyData>>>

StoreKeyData {
intern: InternTable
current_epoch: MutableEpoch // always present, accepts inserts
sealed_epochs: BTreeMap<EpochID, SealedEpoch>
current_epoch_id: EpochID
epoch_capacity: Option<usize> // None = unlimited
max_epochs: usize // default 4
read_counts: Mutex<HashMap<TimestampRange, u64>>
}
```

`read_counts` is behind an inner `Mutex` so queries can hold the outer `RwLock::read` and still update counts.

### Global Store (`global.rs`)

Same per-key epoch structure, but all aggregation_ids share a single `Mutex<StoreData>`:

```
Mutex<StoreData>

StoreData {
stores: HashMap<aggregation_id, PerKeyState>
read_counts: HashMap<aggregation_id, HashMap<TimestampRange, u64>>
metrics: HashSet<String>
}

PerKeyState {
intern: InternTable
current_epoch: MutableEpoch
sealed_epochs: BTreeMap<EpochID, SealedEpoch>
current_epoch_id: EpochID
epoch_capacity: Option<usize>
max_epochs: usize
}
```

No inner `Mutex` for `read_counts` — the outer `Mutex` already serializes all access.

---

## Complexity

### Variables

| Symbol | Meaning |
|--------|---------|
| A | Distinct aggregation IDs |
| L | Distinct label combinations |
| N | Distinct time windows per epoch |
| E | Epochs retained (≤ `max_epochs`, default 4) |
| M | Total entries in an epoch (`windows_col.len()`) |
| k | Matched results or entries removed |
| m | Labels present in a specific time window |

### Time Complexity

| Operation | Time | Notes |
|-----------|------|-------|
| **Insert** | O(1) amortized | Three `Vec::push` + conditional `HashSet::insert` (skipped by Opt 3 on ordered ingest) |
| **Seal** | O(M log M) | Paid once at rotation; not on insert hot path |
| **Epoch rotation** | O(M log M) | Sealing the current epoch dominates; dropping the oldest sealed epoch is O(1) |
| **Range query** (mutable epoch) | O(M) | Linear scan of `windows_col` only |
| **Range query** (sealed epoch) | O(log N + k) | Binary search + linear scan |
| **Range query** (full store) | O(M + E · (log N + k)) | One mutable scan + binary-search per sealed epoch |
| **Exact query** (first after write) | O(M) | Build `window_to_ids` from `windows_col` |
| **Exact query** (cached) | O(m) | HashMap lookup + `Arc::clone` per offset |
| **Exact query** (sealed epoch) | O(log N + m) | Binary search to window + linear scan |
| **ReadBased cleanup** | O(N + k · m) | Scan `read_counts` + targeted removal via `remove_windows` |
| **get_earliest_timestamp** | O(A) | DashMap iteration with AtomicU64 loads |

### Space

| Structure | Space |
|-----------|-------|
| `InternTable` | O(L) per agg_id |
| `MutableEpoch` columns | O(M) |
| `SealedEpoch` entries | O(M) per sealed epoch |
| `window_to_ids` (when built) | O(M) |
| `read_counts` | O(N) total |
| **Total** | **O(A · E · M)** where E ≤ `max_epochs` |

---

## Query Mechanics

### Range Query `[start, end]`

1. Acquire **read lock** on `StoreKeyData`
2. Scan `current_epoch.range_query_into(start, end)` — O(M), touches only `windows_col` in hot loop
3. For each sealed epoch (newest first):
- Skip if `min_start > end || max_end < start` — O(1) bounds check
- `sealed_epoch.range_query_into(start, end)` — O(log N + k) binary search + scan
4. Resolve MetricIDs → labels via `InternTable` in one pass
5. Briefly acquire inner `Mutex` to update `read_counts`

### Exact Query `(exact_start, exact_end)`

1. Acquire **write lock** (needed to potentially build the lazy `window_to_ids` index)
2. Try `current_epoch.exact_query(range)` — builds/uses cached `window_to_ids`
3. If not found, iterate `sealed_epochs.values().rev()` calling `SealedEpoch::exact_query`
4. Return owned `Vec<(MetricID, Arc<dyn AggregateCore>)>`, drop write lock
5. Re-acquire read lock to resolve MetricIDs → labels

---

## Cleanup Policies

### CircularBuffer

Epoch-based eviction — O(1) amortized per insert:

1. On first insert: set `epoch_capacity` from `num_aggregates_to_retain`
2. After each insert: call `maybe_rotate_epoch()`
- If `current_epoch.window_count() >= epoch_capacity`: seal current epoch, open new one with `with_capacity(hint)` (Opt 6)
- If `1 + sealed_epochs.len() > max_epochs`: pop oldest sealed epoch in O(1), purge its windows from `read_counts`

### ReadBased

Read-count triggered eviction:

1. Scan `read_counts` for windows with `count >= threshold`
2. For each such window, call `MutableEpoch::remove_windows` or `SealedEpoch::remove_windows`
3. Drop any epochs that become empty

### NoCleanup

No eviction — data accumulates indefinitely.

---

## Concurrency (Per-Key Store)

| Operation | Lock |
|-----------|------|
| **Insert** | `RwLock::write` for the batch duration |
| **Range query** | `RwLock::read` → brief `Mutex::lock` on `read_counts` |
| **Exact query** | `RwLock::write` (lazy index build) → drop → `RwLock::read` for label resolution |
| **Cleanup** | Under existing write lock; `Mutex::get_mut()` bypasses inner lock |

Multiple readers per `aggregation_id` run concurrently. Writers only block readers of the same `aggregation_id`.
Loading
Loading