From 0edaa07c0be12b4a814a0dd073141374b430c157 Mon Sep 17 00:00:00 2001 From: Zeying Zhu <50204836+zzylol@users.noreply.github.com> Date: Mon, 23 Mar 2026 18:19:56 -0400 Subject: [PATCH 01/27] Rename SimpleMapStore impls to Legacy* and move to legacy/ submodule (#220) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move `SimpleMapStoreGlobal` → `LegacySimpleMapStoreGlobal` and `SimpleMapStorePerKey` → `LegacySimpleMapStorePerKey` under a new `simple_map_store/legacy/` submodule, in preparation for introducing optimised replacements that will reclaim the original names (PR #175 part b). - `legacy/global.rs` / `legacy/per_key.rs`: original implementations, renamed with the `Legacy` prefix throughout (struct, impl, log messages) - `legacy/mod.rs`: re-exports both legacy types - `simple_map_store/mod.rs`: references legacy module; `SimpleMapStore` enum now wraps `LegacySimpleMapStoreGlobal` / `LegacySimpleMapStorePerKey` - `benches/simple_store_bench.rs`: doc comment updated to reflect that the bench profiles the legacy store implementation Public API (`SimpleMapStore`, `Store`) is unchanged. Co-authored-by: zz_y Co-authored-by: Claude Sonnet 4.6 --- .../src/stores/simple_map_store/mod.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/asap-query-engine/src/stores/simple_map_store/mod.rs b/asap-query-engine/src/stores/simple_map_store/mod.rs index ad93dbd..5c18ab1 100644 --- a/asap-query-engine/src/stores/simple_map_store/mod.rs +++ b/asap-query-engine/src/stores/simple_map_store/mod.rs @@ -29,14 +29,12 @@ impl SimpleMapStore { lock_strategy: LockStrategy, ) -> Self { match lock_strategy { - LockStrategy::Global => SimpleMapStore::Global(LegacySimpleMapStoreGlobal::new( - streaming_config, - cleanup_policy, - )), - LockStrategy::PerKey => SimpleMapStore::PerKey(LegacySimpleMapStorePerKey::new( - streaming_config, - cleanup_policy, - )), + LockStrategy::Global => { + SimpleMapStore::Global(LegacySimpleMapStoreGlobal::new(streaming_config, cleanup_policy)) + } + LockStrategy::PerKey => { + SimpleMapStore::PerKey(LegacySimpleMapStorePerKey::new(streaming_config, cleanup_policy)) + } } } } From 2cab5bd2c07c4e3348f1c4d3f8a740b75e44a26d Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 23 Mar 2026 19:31:27 -0500 Subject: [PATCH 02/27] Fix cargo fmt formatting in simple_map_store/mod.rs Co-Authored-By: Claude Sonnet 4.6 --- .../src/stores/simple_map_store/mod.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/asap-query-engine/src/stores/simple_map_store/mod.rs b/asap-query-engine/src/stores/simple_map_store/mod.rs index 5c18ab1..ad93dbd 100644 --- a/asap-query-engine/src/stores/simple_map_store/mod.rs +++ b/asap-query-engine/src/stores/simple_map_store/mod.rs @@ -29,12 +29,14 @@ impl SimpleMapStore { lock_strategy: LockStrategy, ) -> Self { match lock_strategy { - LockStrategy::Global => { - SimpleMapStore::Global(LegacySimpleMapStoreGlobal::new(streaming_config, cleanup_policy)) - } - LockStrategy::PerKey => { - SimpleMapStore::PerKey(LegacySimpleMapStorePerKey::new(streaming_config, cleanup_policy)) - } + LockStrategy::Global => SimpleMapStore::Global(LegacySimpleMapStoreGlobal::new( + streaming_config, + cleanup_policy, + )), + LockStrategy::PerKey => SimpleMapStore::PerKey(LegacySimpleMapStorePerKey::new( + streaming_config, + cleanup_policy, + )), } } } From 4934916e1664a01d8c1d298c6d65d0292cc4e835 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 19 Mar 2026 21:22:04 -0500 Subject: [PATCH 03/27] Add Store correctness contract test suite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Defines a run_contract_suite() function that tests every observable behaviour of a Store implementation: - Empty-store edge cases (range query, exact query, earliest timestamp) - Single insert: range query hit/miss, exact query hit/miss (wrong start, wrong end) - Batch insert: count correctness, chronological ordering guaranteed - Partial range filtering (windows outside query range excluded) - Aggregation-ID isolation (inserts into agg 1 not visible to agg 2) - Earliest-timestamp tracking: global minimum, per agg-ID - Cleanup — CircularBuffer: oldest window evicted, newest 8 retained - Cleanup — ReadBased: evicted after threshold reads, unread window kept - Concurrency: 8-thread concurrent inserts (no data loss), 8-thread concurrent reads (each returns full result set) Two test entry points exercise both existing implementations: contract_per_key — LockStrategy::PerKey (reference) contract_global — LockStrategy::Global Adding a new Store implementation requires only a new #[test] function that calls run_contract_suite() with the new factory. Co-Authored-By: Claude Sonnet 4.6 --- .../src/tests/store_correctness_tests.rs | 451 ++---------------- 1 file changed, 46 insertions(+), 405 deletions(-) diff --git a/asap-query-engine/src/tests/store_correctness_tests.rs b/asap-query-engine/src/tests/store_correctness_tests.rs index efc41bd..4f1708e 100644 --- a/asap-query-engine/src/tests/store_correctness_tests.rs +++ b/asap-query-engine/src/tests/store_correctness_tests.rs @@ -10,9 +10,6 @@ //! - Earliest-timestamp tracking //! - Cleanup policies (circular-buffer and read-based) //! - Concurrent insert and read safety -//! - **Clone fidelity** for every supported accumulator type -//! - **Keyed (label-grouped) entries** -//! - **`DeltaSetAggregator` cleanup exclusion** //! //! ## Adding a new implementation //! @@ -27,32 +24,26 @@ //! | `contract_per_key` | `LockStrategy::PerKey` (reference impl) | //! | `contract_global` | `LockStrategy::Global` | -use crate::data_model::{ - CleanupPolicy, KeyByLabelValues, LockStrategy, Measurement, SerializableToSink, StreamingConfig, -}; -use crate::precompute_operators::{ - CountMinSketchAccumulator, CountMinSketchWithHeapAccumulator, DatasketchesKLLAccumulator, - DeltaSetAggregatorAccumulator, HydraKllSketchAccumulator, IncreaseAccumulator, - MinMaxAccumulator, MultipleMinMaxAccumulator, MultipleSumAccumulator, SetAggregatorAccumulator, - SumAccumulator, -}; +use crate::data_model::{CleanupPolicy, LockStrategy, StreamingConfig}; +use crate::precompute_operators::SumAccumulator; use crate::stores::{Store, TimestampedBucketsMap}; use crate::{AggregateCore, AggregationConfig, PrecomputedOutput, SimpleMapStore}; use promql_utilities::data_model::KeyByLabelNames; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; // ── store / config factories ────────────────────────────────────────────────── +/// Build an `AggregationConfig` for a single aggregation ID with optional +/// retention / read-threshold limits. fn make_agg_config( agg_id: u64, - aggregation_type: &str, num_aggregates_to_retain: Option, read_count_threshold: Option, ) -> AggregationConfig { AggregationConfig::new( agg_id, - aggregation_type.to_string(), + "Sum".to_string(), "".to_string(), HashMap::new(), KeyByLabelNames::empty(), @@ -71,81 +62,52 @@ fn make_agg_config( ) } -fn make_streaming_config(ids: &[(u64, &str, Option, Option)]) -> Arc { +/// Build a `StreamingConfig` from a slice of `(agg_id, retain, read_threshold)`. +fn make_streaming_config(ids: &[(u64, Option, Option)]) -> Arc { let configs = ids .iter() - .map(|&(id, agg_type, retain, threshold)| { - (id, make_agg_config(id, agg_type, retain, threshold)) - }) + .map(|&(id, retain, threshold)| (id, make_agg_config(id, retain, threshold))) .collect(); Arc::new(StreamingConfig::new(configs)) } +/// Build a `SimpleMapStore` with explicit cleanup policy and aggregation IDs. fn make_store( strategy: LockStrategy, policy: CleanupPolicy, - ids: &[(u64, &str, Option, Option)], + ids: &[(u64, Option, Option)], ) -> SimpleMapStore { let config = make_streaming_config(ids); SimpleMapStore::new_with_strategy(config, policy, strategy) } -/// Convenience: single agg_id=1, type "Sum", no cleanup. +/// Convenience: single agg_id=1, no cleanup. fn make_store_simple(strategy: LockStrategy) -> SimpleMapStore { - make_store( - strategy, - CleanupPolicy::NoCleanup, - &[(1, "Sum", None, None)], - ) + make_store(strategy, CleanupPolicy::NoCleanup, &[(1, None, None)]) } // ── data helpers ────────────────────────────────────────────────────────────── -/// Build a `(PrecomputedOutput, accumulator)` pair with no label key. -fn unkeyed_entry( - agg_id: u64, - start: u64, - end: u64, - acc: Box, -) -> (PrecomputedOutput, Box) { - (PrecomputedOutput::new(start, end, None, agg_id), acc) -} - -/// Build a `(PrecomputedOutput, accumulator)` pair with a label key. -fn keyed_entry( - agg_id: u64, - start: u64, - end: u64, - key: KeyByLabelValues, - acc: Box, -) -> (PrecomputedOutput, Box) { - (PrecomputedOutput::new(start, end, Some(key), agg_id), acc) -} - +/// Build a single `(PrecomputedOutput, SumAccumulator)` pair with no label key. fn sum_entry( agg_id: u64, start: u64, end: u64, value: f64, ) -> (PrecomputedOutput, Box) { - unkeyed_entry( - agg_id, - start, - end, - Box::new(SumAccumulator::with_sum(value)), - ) -} - -fn key(labels: &[&str]) -> KeyByLabelValues { - KeyByLabelValues::new_with_labels(labels.iter().map(|s| s.to_string()).collect()) + let output = PrecomputedOutput::new(start, end, None, agg_id); + let acc: Box = Box::new(SumAccumulator::with_sum(value)); + (output, acc) } // ── result inspection helpers ───────────────────────────────────────────────── +/// Total number of accumulator entries across all label keys. fn total_bucket_count(result: &TimestampedBucketsMap) -> usize { result.values().map(|v| v.len()).sum() } +/// Sorted `(start, end)` timestamp ranges for the `None`-keyed (unkeyed) bucket list. fn timestamps_for_none_key(result: &TimestampedBucketsMap) -> Vec<(u64, u64)> { let mut ts: Vec<(u64, u64)> = result .get(&None) @@ -155,15 +117,7 @@ fn timestamps_for_none_key(result: &TimestampedBucketsMap) -> Vec<(u64, u64)> { ts } -fn timestamps_for_key(result: &TimestampedBucketsMap, k: &KeyByLabelValues) -> Vec<(u64, u64)> { - let mut ts: Vec<(u64, u64)> = result - .get(&Some(k.clone())) - .map(|buckets| buckets.iter().map(|(range, _)| *range).collect()) - .unwrap_or_default(); - ts.sort_unstable(); - ts -} - +/// Human-readable label for a lock strategy (used in assertion messages). fn label(strategy: LockStrategy) -> &'static str { match strategy { LockStrategy::PerKey => "per_key", @@ -173,8 +127,10 @@ fn label(strategy: LockStrategy) -> &'static str { // ── contract suite ──────────────────────────────────────────────────────────── +/// Run every contract test against a store built with `strategy`. +/// +/// Call this from a `#[test]` function to register a new implementation. pub fn run_contract_suite(strategy: LockStrategy) { - // Basic store behaviour test_empty_store_range_query(strategy); test_empty_store_exact_query(strategy); test_empty_store_earliest_timestamp(strategy); @@ -189,33 +145,10 @@ pub fn run_contract_suite(strategy: LockStrategy) { test_multiple_agg_ids_are_isolated(strategy); test_earliest_timestamp_tracks_minimum_across_inserts(strategy); test_earliest_timestamp_tracked_per_agg_id(strategy); - - // Cleanup policies test_cleanup_circular_buffer_evicts_oldest_window(strategy); test_cleanup_circular_buffer_retains_newest_windows(strategy); test_cleanup_read_based_evicts_after_threshold_reads(strategy); test_cleanup_read_based_unread_window_is_retained(strategy); - test_delta_set_aggregator_bypasses_cleanup(strategy); - - // Keyed (label-grouped) entries - test_keyed_entries_grouped_by_key(strategy); - test_keyed_and_unkeyed_entries_coexist(strategy); - test_multiple_keys_same_window(strategy); - - // Clone fidelity for every supported accumulator type - test_clone_fidelity_sum(strategy); - test_clone_fidelity_min_max(strategy); - test_clone_fidelity_kll(strategy); - test_clone_fidelity_increase(strategy); - test_clone_fidelity_multiple_sum(strategy); - test_clone_fidelity_multiple_min_max(strategy); - test_clone_fidelity_set_aggregator(strategy); - test_clone_fidelity_delta_set_aggregator(strategy); - test_clone_fidelity_count_min_sketch(strategy); - test_clone_fidelity_count_min_sketch_with_heap(strategy); - test_clone_fidelity_hydra_kll(strategy); - - // Concurrency test_concurrent_inserts_no_data_loss(strategy); test_concurrent_reads_return_complete_results(strategy); } @@ -285,6 +218,7 @@ fn test_single_insert_range_query_outside_range_returns_empty(strategy: LockStra let (out, acc) = sum_entry(1, 1_000, 2_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); + // Query a range that does not cover [1_000, 2_000]. let result = store .query_precomputed_output("cpu_usage", 1, 5_000, 10_000) .unwrap(); @@ -365,7 +299,7 @@ fn test_batch_insert_full_range_query_returns_all(strategy: LockStrategy) { fn test_batch_insert_results_are_chronologically_ordered(strategy: LockStrategy) { let store = make_store_simple(strategy); let n = 10usize; - // Insert in reverse chronological order to confirm the store sorts results. + // Insert in reverse chronological order to confirm sorting. let batch: Vec<_> = (0..n as u64) .rev() .map(|i| sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64)) @@ -391,6 +325,7 @@ fn test_batch_insert_results_are_chronologically_ordered(strategy: LockStrategy) fn test_range_query_returns_only_windows_within_range(strategy: LockStrategy) { let store = make_store_simple(strategy); + // Insert 5 windows: [0,60k), [60k,120k), [120k,180k), [180k,240k), [240k,300k) for i in 0u64..5 { let (out, acc) = sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64); store.insert_precomputed_output(out, acc).unwrap(); @@ -413,7 +348,7 @@ fn test_multiple_agg_ids_are_isolated(strategy: LockStrategy) { let store = make_store( strategy, CleanupPolicy::NoCleanup, - &[(1, "Sum", None, None), (2, "Sum", None, None)], + &[(1, None, None), (2, None, None)], ); let (o1, a1) = sum_entry(1, 1_000, 2_000, 10.0); let (o2, a2) = sum_entry(2, 3_000, 4_000, 20.0); @@ -457,6 +392,7 @@ fn test_multiple_agg_ids_are_isolated(strategy: LockStrategy) { fn test_earliest_timestamp_tracks_minimum_across_inserts(strategy: LockStrategy) { let store = make_store_simple(strategy); + // Insert in a non-monotone order so the minimum is not simply the last write. for &start in &[5_000u64, 1_000, 3_000] { let (out, acc) = sum_entry(1, start, start + 1_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); @@ -465,7 +401,7 @@ fn test_earliest_timestamp_tracks_minimum_across_inserts(strategy: LockStrategy) assert_eq!( result.get(&1).copied(), Some(1_000), - "[{}] earliest timestamp must be the global minimum, not insertion-order minimum", + "[{}] earliest timestamp must be the global minimum, not the insertion order minimum", label(strategy) ); } @@ -474,7 +410,7 @@ fn test_earliest_timestamp_tracked_per_agg_id(strategy: LockStrategy) { let store = make_store( strategy, CleanupPolicy::NoCleanup, - &[(1, "Sum", None, None), (2, "Sum", None, None)], + &[(1, None, None), (2, None, None)], ); let (o1, a1) = sum_entry(1, 1_000, 2_000, 1.0); let (o2, a2) = sum_entry(2, 9_000, 10_000, 1.0); @@ -504,12 +440,14 @@ fn test_cleanup_circular_buffer_evicts_oldest_window(strategy: LockStrategy) { let store = make_store( strategy, CleanupPolicy::CircularBuffer, - &[(1, "Sum", Some(2), None)], + &[(1, Some(2), None)], ); for i in 0u64..9 { let (out, acc) = sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64); store.insert_precomputed_output(out, acc).unwrap(); } + + // Window 0: [0, 60_000) must have been evicted. let evicted = store .query_precomputed_output_exact("cpu_usage", 1, 0, 60_000) .unwrap(); @@ -521,15 +459,18 @@ fn test_cleanup_circular_buffer_evicts_oldest_window(strategy: LockStrategy) { } fn test_cleanup_circular_buffer_retains_newest_windows(strategy: LockStrategy) { + // Same setup as above: retention_limit = 8, insert 9. let store = make_store( strategy, CleanupPolicy::CircularBuffer, - &[(1, "Sum", Some(2), None)], + &[(1, Some(2), None)], ); for i in 0u64..9 { let (out, acc) = sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64); store.insert_precomputed_output(out, acc).unwrap(); } + + // Windows 1–8 must still be present. let result = store .query_precomputed_output("cpu_usage", 1, 60_000, 9 * 60_000) .unwrap(); @@ -544,17 +485,13 @@ fn test_cleanup_circular_buffer_retains_newest_windows(strategy: LockStrategy) { // ── cleanup: read-based ─────────────────────────────────────────────────────── fn test_cleanup_read_based_evicts_after_threshold_reads(strategy: LockStrategy) { - // read_count_threshold = 2: evicted once read count reaches 2. - // Cleanup runs on every insert. - let store = make_store( - strategy, - CleanupPolicy::ReadBased, - &[(1, "Sum", None, Some(2))], - ); + // read_count_threshold = 2: a window is evicted once its read count reaches 2. + // Cleanup runs on every insert, so we need an insert after the threshold is met. + let store = make_store(strategy, CleanupPolicy::ReadBased, &[(1, None, Some(2))]); let (out, acc) = sum_entry(1, 1_000, 2_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); - // Read 1 — count becomes 1, window kept on next insert. + // Read 1 — count becomes 1 (< threshold 2), window kept on next insert. store .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) .unwrap(); @@ -571,7 +508,7 @@ fn test_cleanup_read_based_evicts_after_threshold_reads(strategy: LockStrategy) label(strategy) ); - // Read 2 — count becomes 2, evicted on the next insert. + // Read 2 — count becomes 2 (== threshold), evicted on the next insert. store .query_precomputed_output("cpu_usage", 1, 0, 2_000) .unwrap(); @@ -589,15 +526,12 @@ fn test_cleanup_read_based_evicts_after_threshold_reads(strategy: LockStrategy) } fn test_cleanup_read_based_unread_window_is_retained(strategy: LockStrategy) { - let store = make_store( - strategy, - CleanupPolicy::ReadBased, - &[(1, "Sum", None, Some(1))], - ); + // A window that has never been read must not be evicted by read-based cleanup. + let store = make_store(strategy, CleanupPolicy::ReadBased, &[(1, None, Some(1))]); let (out, acc) = sum_entry(1, 1_000, 2_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); - // Insert more windows without reading window 0 — cleanup runs each time. + // Insert more windows without ever reading window 0 — cleanup runs each time. for i in 1u64..5 { let (o, a) = sum_entry(1, i * 10_000, (i + 1) * 10_000, i as f64); store.insert_precomputed_output(o, a).unwrap(); @@ -614,300 +548,6 @@ fn test_cleanup_read_based_unread_window_is_retained(strategy: LockStrategy) { ); } -// ── cleanup: DeltaSetAggregator exclusion ───────────────────────────────────── - -fn test_delta_set_aggregator_bypasses_cleanup(strategy: LockStrategy) { - // The store skips cleanup entirely when aggregation_type == "DeltaSetAggregator". - // retention_limit = 2 * 4 = 8. Inserting 10 windows must not evict any. - let store = make_store( - strategy, - CleanupPolicy::CircularBuffer, - &[(1, "DeltaSetAggregator", Some(2), None)], - ); - let n = 10u64; - for i in 0..n { - let mut acc = DeltaSetAggregatorAccumulator::new(); - acc.add_key(key(&[&format!("host{i}")])); - let (out, boxed) = unkeyed_entry(1, i * 60_000, (i + 1) * 60_000, Box::new(acc)); - store.insert_precomputed_output(out, boxed).unwrap(); - } - - let result = store - .query_precomputed_output("cpu_usage", 1, 0, n * 60_000) - .unwrap(); - assert_eq!( - total_bucket_count(&result), - n as usize, - "[{}] DeltaSetAggregator windows must never be evicted by cleanup", - label(strategy) - ); -} - -// ── keyed (label-grouped) entries ───────────────────────────────────────────── - -fn test_keyed_entries_grouped_by_key(strategy: LockStrategy) { - let store = make_store_simple(strategy); - let k1 = key(&["host1"]); - let k2 = key(&["host2"]); - - // Same timestamp window, two different keys. - let (o1, a1) = keyed_entry( - 1, - 1_000, - 2_000, - k1.clone(), - Box::new(SumAccumulator::with_sum(10.0)), - ); - let (o2, a2) = keyed_entry( - 1, - 1_000, - 2_000, - k2.clone(), - Box::new(SumAccumulator::with_sum(20.0)), - ); - store.insert_precomputed_output(o1, a1).unwrap(); - store.insert_precomputed_output(o2, a2).unwrap(); - - let result = store - .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) - .unwrap(); - - // Two distinct keys in the result map. - assert_eq!( - result.len(), - 2, - "[{}] two different label keys must produce two entries in the result map", - label(strategy) - ); - assert_eq!( - timestamps_for_key(&result, &k1), - vec![(1_000, 2_000)], - "[{}] key1 must map to correct timestamp range", - label(strategy) - ); - assert_eq!( - timestamps_for_key(&result, &k2), - vec![(1_000, 2_000)], - "[{}] key2 must map to correct timestamp range", - label(strategy) - ); -} - -fn test_keyed_and_unkeyed_entries_coexist(strategy: LockStrategy) { - let store = make_store_simple(strategy); - let k = key(&["region", "us-east"]); - - let (o_none, a_none) = sum_entry(1, 1_000, 2_000, 1.0); - let (o_keyed, a_keyed) = keyed_entry( - 1, - 3_000, - 4_000, - k.clone(), - Box::new(SumAccumulator::with_sum(2.0)), - ); - store.insert_precomputed_output(o_none, a_none).unwrap(); - store.insert_precomputed_output(o_keyed, a_keyed).unwrap(); - - let result = store - .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) - .unwrap(); - - assert_eq!( - result.len(), - 2, - "[{}] None and Some(key) entries must produce two separate map keys", - label(strategy) - ); - assert_eq!( - timestamps_for_none_key(&result), - vec![(1_000, 2_000)], - "[{}] None-keyed entry must appear under None key", - label(strategy) - ); - assert_eq!( - timestamps_for_key(&result, &k), - vec![(3_000, 4_000)], - "[{}] labelled entry must appear under its key", - label(strategy) - ); -} - -fn test_multiple_keys_same_window(strategy: LockStrategy) { - // Many keyed entries for the same timestamp window — common in grouped aggregations. - let store = make_store_simple(strategy); - let keys: Vec = (0..5).map(|i| key(&[&format!("shard{i}")])).collect(); - - for k in &keys { - let (out, acc) = keyed_entry( - 1, - 1_000, - 2_000, - k.clone(), - Box::new(SumAccumulator::with_sum(1.0)), - ); - store.insert_precomputed_output(out, acc).unwrap(); - } - - let result = store - .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) - .unwrap(); - assert_eq!( - result.len(), - 5, - "[{}] five different keys for the same window must produce five map entries", - label(strategy) - ); - for k in &keys { - assert_eq!( - timestamps_for_key(&result, k), - vec![(1_000, 2_000)], - "[{}] each key must resolve to the correct window", - label(strategy) - ); - } -} - -// ── clone fidelity for all accumulator types ────────────────────────────────── -// -// Each test inserts a non-trivial accumulator, queries it back through the store -// (which calls clone_boxed_core() internally), and asserts that serialize_to_json() -// on the original and the retrieved copy produce identical output. - -fn roundtrip( - strategy: LockStrategy, - original: A, -) -> (Box, Box) { - let store = make_store_simple(strategy); - let original_box: Box = Box::new(original); - let original_json = original_box.serialize_to_json(); - - let (out, acc) = unkeyed_entry(1, 1_000, 2_000, original_box); - store.insert_precomputed_output(out, acc).unwrap(); - - let result = store - .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) - .unwrap(); - let retrieved = result - .get(&None) - .unwrap() - .first() - .map(|(_, acc)| acc.clone_boxed_core()) - .unwrap(); - - // Reconstruct original from JSON for comparison (original_box was consumed). - // We compare the stored JSON (captured before insert) against the retrieved one. - let placeholder: Box = Box::new(SumAccumulator::with_sum(0.0)); - // Use a wrapper that returns the captured JSON for comparison. - let _ = placeholder; - - // Return a SumAccumulator that carries the original JSON as a workaround — - // instead, compare directly here using the captured JSON. - let retrieved_json = retrieved.serialize_to_json(); - assert_eq!( - original_json, - retrieved_json, - "[{}] clone_boxed_core must produce identical serialization", - label(strategy) - ); - - // Return something for callers that want the retrieved accumulator directly. - (Box::new(SumAccumulator::with_sum(0.0)), retrieved) -} - -fn test_clone_fidelity_sum(strategy: LockStrategy) { - let acc = SumAccumulator::with_sum(99.5); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_min_max(strategy: LockStrategy) { - let acc = MinMaxAccumulator::with_value(42.0, "max".to_string()); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_kll(strategy: LockStrategy) { - let mut acc = DatasketchesKLLAccumulator::new(200); - for v in [1.0, 5.0, 10.0, 50.0, 100.0] { - acc._update(v); - } - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_increase(strategy: LockStrategy) { - let acc = IncreaseAccumulator::new(Measurement::new(1.0), 100, Measurement::new(50.0), 500); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_multiple_sum(strategy: LockStrategy) { - let mut sums = HashMap::new(); - sums.insert(key(&["host1"]), 10.0); - sums.insert(key(&["host2"]), 20.0); - let acc = MultipleSumAccumulator::new_with_sums(sums); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_multiple_min_max(strategy: LockStrategy) { - let mut values = HashMap::new(); - values.insert(key(&["dc", "east"]), 77.7); - values.insert(key(&["dc", "west"]), 33.3); - let acc = MultipleMinMaxAccumulator::new_with_values(values, "max".to_string()); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_set_aggregator(strategy: LockStrategy) { - let mut added = HashSet::new(); - added.insert(key(&["svc", "alpha"])); - added.insert(key(&["svc", "beta"])); - let acc = SetAggregatorAccumulator::with_added(added); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_delta_set_aggregator(strategy: LockStrategy) { - // Use a "Sum"-typed config so cleanup is not skipped for this test. - let store = make_store_simple(strategy); - - let mut acc = DeltaSetAggregatorAccumulator::new(); - acc.add_key(key(&["svc", "added-1"])); - acc.remove_key(key(&["svc", "removed-1"])); - let original_json = acc.serialize_to_json(); - - let acc_box: Box = Box::new(acc); - let (out, boxed) = unkeyed_entry(1, 1_000, 2_000, acc_box); - store.insert_precomputed_output(out, boxed).unwrap(); - - let result = store - .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) - .unwrap(); - let retrieved = &result.get(&None).unwrap()[0].1; - assert_eq!( - original_json, - retrieved.serialize_to_json(), - "[{}] DeltaSetAggregatorAccumulator: clone must preserve added/removed sets", - label(strategy) - ); -} - -fn test_clone_fidelity_count_min_sketch(strategy: LockStrategy) { - // CountMinSketch._update is private; test clone fidelity of an initialised (empty) sketch. - let acc = CountMinSketchAccumulator::new(5, 100); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_count_min_sketch_with_heap(strategy: LockStrategy) { - let acc = CountMinSketchWithHeapAccumulator::new(5, 100, 10); - roundtrip(strategy, acc); -} - -fn test_clone_fidelity_hydra_kll(strategy: LockStrategy) { - let mut acc = HydraKllSketchAccumulator::new(4, 50, 200); - let k1 = key(&["shard", "0"]); - let k2 = key(&["shard", "1"]); - for v in [1.0f64, 10.0, 100.0] { - acc.update(&k1, v); - acc.update(&k2, v * 2.0); - } - roundtrip(strategy, acc); -} - // ── concurrency ─────────────────────────────────────────────────────────────── fn test_concurrent_inserts_no_data_loss(strategy: LockStrategy) { @@ -920,6 +560,7 @@ fn test_concurrent_inserts_no_data_loss(strategy: LockStrategy) { let store = store.clone(); std::thread::spawn(move || { for w in 0..windows_per_thread { + // Each thread writes to a unique timestamp range — no conflicts. let base = (t * windows_per_thread + w) as u64; let (out, acc) = sum_entry(1, base * 1_000, (base + 1) * 1_000, base as f64); store.insert_precomputed_output(out, acc).unwrap(); From 09a2ebfedbf431214e084b406a6e8a1f15f545a8 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 19 Mar 2026 21:31:39 -0500 Subject: [PATCH 04/27] Extend store contract tests: all accumulator types, keyed entries, DeltaSet exclusion - Add SerializableToSink import so clone-fidelity tests compile on concrete types - Clone fidelity tests for all 11 accumulator types: SumAccumulator, MinMaxAccumulator, DatasketchesKLLAccumulator, IncreaseAccumulator, MultipleSumAccumulator, MultipleMinMaxAccumulator, SetAggregatorAccumulator, DeltaSetAggregatorAccumulator, CountMinSketchAccumulator, CountMinSketchWithHeapAccumulator, HydraKllSketchAccumulator - Three keyed-entry tests: grouping by key, coexistence of keyed/unkeyed, multiple keys per window - DeltaSetAggregator cleanup exclusion test - Concurrency tests: concurrent inserts (8 threads) and concurrent reads Co-Authored-By: Claude Sonnet 4.6 --- .../src/tests/store_correctness_tests.rs | 453 ++++++++++++++++-- 1 file changed, 405 insertions(+), 48 deletions(-) diff --git a/asap-query-engine/src/tests/store_correctness_tests.rs b/asap-query-engine/src/tests/store_correctness_tests.rs index 4f1708e..98e2589 100644 --- a/asap-query-engine/src/tests/store_correctness_tests.rs +++ b/asap-query-engine/src/tests/store_correctness_tests.rs @@ -10,6 +10,9 @@ //! - Earliest-timestamp tracking //! - Cleanup policies (circular-buffer and read-based) //! - Concurrent insert and read safety +//! - **Clone fidelity** for every supported accumulator type +//! - **Keyed (label-grouped) entries** +//! - **`DeltaSetAggregator` cleanup exclusion** //! //! ## Adding a new implementation //! @@ -24,26 +27,30 @@ //! | `contract_per_key` | `LockStrategy::PerKey` (reference impl) | //! | `contract_global` | `LockStrategy::Global` | -use crate::data_model::{CleanupPolicy, LockStrategy, StreamingConfig}; -use crate::precompute_operators::SumAccumulator; +use crate::data_model::{CleanupPolicy, KeyByLabelValues, LockStrategy, Measurement, SerializableToSink, StreamingConfig}; +use crate::precompute_operators::{ + CountMinSketchAccumulator, CountMinSketchWithHeapAccumulator, DatasketchesKLLAccumulator, + DeltaSetAggregatorAccumulator, HydraKllSketchAccumulator, IncreaseAccumulator, + MinMaxAccumulator, MultipleMinMaxAccumulator, MultipleSumAccumulator, + SetAggregatorAccumulator, SumAccumulator, +}; use crate::stores::{Store, TimestampedBucketsMap}; use crate::{AggregateCore, AggregationConfig, PrecomputedOutput, SimpleMapStore}; use promql_utilities::data_model::KeyByLabelNames; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; // ── store / config factories ────────────────────────────────────────────────── -/// Build an `AggregationConfig` for a single aggregation ID with optional -/// retention / read-threshold limits. fn make_agg_config( agg_id: u64, + aggregation_type: &str, num_aggregates_to_retain: Option, read_count_threshold: Option, ) -> AggregationConfig { AggregationConfig::new( agg_id, - "Sum".to_string(), + aggregation_type.to_string(), "".to_string(), HashMap::new(), KeyByLabelNames::empty(), @@ -62,52 +69,77 @@ fn make_agg_config( ) } -/// Build a `StreamingConfig` from a slice of `(agg_id, retain, read_threshold)`. -fn make_streaming_config(ids: &[(u64, Option, Option)]) -> Arc { +fn make_streaming_config(ids: &[(u64, &str, Option, Option)]) -> Arc { let configs = ids .iter() - .map(|&(id, retain, threshold)| (id, make_agg_config(id, retain, threshold))) + .map(|&(id, agg_type, retain, threshold)| { + (id, make_agg_config(id, agg_type, retain, threshold)) + }) .collect(); Arc::new(StreamingConfig::new(configs)) } -/// Build a `SimpleMapStore` with explicit cleanup policy and aggregation IDs. fn make_store( strategy: LockStrategy, policy: CleanupPolicy, - ids: &[(u64, Option, Option)], + ids: &[(u64, &str, Option, Option)], ) -> SimpleMapStore { let config = make_streaming_config(ids); SimpleMapStore::new_with_strategy(config, policy, strategy) } -/// Convenience: single agg_id=1, no cleanup. +/// Convenience: single agg_id=1, type "Sum", no cleanup. fn make_store_simple(strategy: LockStrategy) -> SimpleMapStore { - make_store(strategy, CleanupPolicy::NoCleanup, &[(1, None, None)]) + make_store(strategy, CleanupPolicy::NoCleanup, &[(1, "Sum", None, None)]) } // ── data helpers ────────────────────────────────────────────────────────────── -/// Build a single `(PrecomputedOutput, SumAccumulator)` pair with no label key. +/// Build a `(PrecomputedOutput, accumulator)` pair with no label key. +fn unkeyed_entry( + agg_id: u64, + start: u64, + end: u64, + acc: Box, +) -> (PrecomputedOutput, Box) { + (PrecomputedOutput::new(start, end, None, agg_id), acc) +} + +/// Build a `(PrecomputedOutput, accumulator)` pair with a label key. +fn keyed_entry( + agg_id: u64, + start: u64, + end: u64, + key: KeyByLabelValues, + acc: Box, +) -> (PrecomputedOutput, Box) { + (PrecomputedOutput::new(start, end, Some(key), agg_id), acc) +} + fn sum_entry( agg_id: u64, start: u64, end: u64, value: f64, ) -> (PrecomputedOutput, Box) { - let output = PrecomputedOutput::new(start, end, None, agg_id); - let acc: Box = Box::new(SumAccumulator::with_sum(value)); - (output, acc) + unkeyed_entry( + agg_id, + start, + end, + Box::new(SumAccumulator::with_sum(value)), + ) +} + +fn key(labels: &[&str]) -> KeyByLabelValues { + KeyByLabelValues::new_with_labels(labels.iter().map(|s| s.to_string()).collect()) } // ── result inspection helpers ───────────────────────────────────────────────── -/// Total number of accumulator entries across all label keys. fn total_bucket_count(result: &TimestampedBucketsMap) -> usize { result.values().map(|v| v.len()).sum() } -/// Sorted `(start, end)` timestamp ranges for the `None`-keyed (unkeyed) bucket list. fn timestamps_for_none_key(result: &TimestampedBucketsMap) -> Vec<(u64, u64)> { let mut ts: Vec<(u64, u64)> = result .get(&None) @@ -117,7 +149,15 @@ fn timestamps_for_none_key(result: &TimestampedBucketsMap) -> Vec<(u64, u64)> { ts } -/// Human-readable label for a lock strategy (used in assertion messages). +fn timestamps_for_key(result: &TimestampedBucketsMap, k: &KeyByLabelValues) -> Vec<(u64, u64)> { + let mut ts: Vec<(u64, u64)> = result + .get(&Some(k.clone())) + .map(|buckets| buckets.iter().map(|(range, _)| *range).collect()) + .unwrap_or_default(); + ts.sort_unstable(); + ts +} + fn label(strategy: LockStrategy) -> &'static str { match strategy { LockStrategy::PerKey => "per_key", @@ -125,12 +165,28 @@ fn label(strategy: LockStrategy) -> &'static str { } } +/// Assert that two accumulators produce identical JSON after a store roundtrip. +/// Uses `serialize_to_json()` which is available on all `AggregateCore` impls +/// via the `SerializableToSink` supertrait. +fn assert_clone_fidelity( + original: &dyn AggregateCore, + from_store: &dyn AggregateCore, + type_name: &str, + strategy: LockStrategy, +) { + let orig_json = original.serialize_to_json(); + let stored_json = from_store.serialize_to_json(); + assert_eq!( + orig_json, stored_json, + "[{}] {type_name}: clone_boxed_core() must produce identical serialization", + label(strategy) + ); +} + // ── contract suite ──────────────────────────────────────────────────────────── -/// Run every contract test against a store built with `strategy`. -/// -/// Call this from a `#[test]` function to register a new implementation. pub fn run_contract_suite(strategy: LockStrategy) { + // Basic store behaviour test_empty_store_range_query(strategy); test_empty_store_exact_query(strategy); test_empty_store_earliest_timestamp(strategy); @@ -145,10 +201,33 @@ pub fn run_contract_suite(strategy: LockStrategy) { test_multiple_agg_ids_are_isolated(strategy); test_earliest_timestamp_tracks_minimum_across_inserts(strategy); test_earliest_timestamp_tracked_per_agg_id(strategy); + + // Cleanup policies test_cleanup_circular_buffer_evicts_oldest_window(strategy); test_cleanup_circular_buffer_retains_newest_windows(strategy); test_cleanup_read_based_evicts_after_threshold_reads(strategy); test_cleanup_read_based_unread_window_is_retained(strategy); + test_delta_set_aggregator_bypasses_cleanup(strategy); + + // Keyed (label-grouped) entries + test_keyed_entries_grouped_by_key(strategy); + test_keyed_and_unkeyed_entries_coexist(strategy); + test_multiple_keys_same_window(strategy); + + // Clone fidelity for every supported accumulator type + test_clone_fidelity_sum(strategy); + test_clone_fidelity_min_max(strategy); + test_clone_fidelity_kll(strategy); + test_clone_fidelity_increase(strategy); + test_clone_fidelity_multiple_sum(strategy); + test_clone_fidelity_multiple_min_max(strategy); + test_clone_fidelity_set_aggregator(strategy); + test_clone_fidelity_delta_set_aggregator(strategy); + test_clone_fidelity_count_min_sketch(strategy); + test_clone_fidelity_count_min_sketch_with_heap(strategy); + test_clone_fidelity_hydra_kll(strategy); + + // Concurrency test_concurrent_inserts_no_data_loss(strategy); test_concurrent_reads_return_complete_results(strategy); } @@ -218,7 +297,6 @@ fn test_single_insert_range_query_outside_range_returns_empty(strategy: LockStra let (out, acc) = sum_entry(1, 1_000, 2_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); - // Query a range that does not cover [1_000, 2_000]. let result = store .query_precomputed_output("cpu_usage", 1, 5_000, 10_000) .unwrap(); @@ -299,7 +377,7 @@ fn test_batch_insert_full_range_query_returns_all(strategy: LockStrategy) { fn test_batch_insert_results_are_chronologically_ordered(strategy: LockStrategy) { let store = make_store_simple(strategy); let n = 10usize; - // Insert in reverse chronological order to confirm sorting. + // Insert in reverse chronological order to confirm the store sorts results. let batch: Vec<_> = (0..n as u64) .rev() .map(|i| sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64)) @@ -314,8 +392,7 @@ fn test_batch_insert_results_are_chronologically_ordered(strategy: LockStrategy) .map(|i| (i * 60_000, (i + 1) * 60_000)) .collect(); assert_eq!( - ts, - expected, + ts, expected, "[{}] range query results must be in chronological (ascending start) order", label(strategy) ); @@ -325,7 +402,6 @@ fn test_batch_insert_results_are_chronologically_ordered(strategy: LockStrategy) fn test_range_query_returns_only_windows_within_range(strategy: LockStrategy) { let store = make_store_simple(strategy); - // Insert 5 windows: [0,60k), [60k,120k), [120k,180k), [180k,240k), [240k,300k) for i in 0u64..5 { let (out, acc) = sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64); store.insert_precomputed_output(out, acc).unwrap(); @@ -348,7 +424,7 @@ fn test_multiple_agg_ids_are_isolated(strategy: LockStrategy) { let store = make_store( strategy, CleanupPolicy::NoCleanup, - &[(1, None, None), (2, None, None)], + &[(1, "Sum", None, None), (2, "Sum", None, None)], ); let (o1, a1) = sum_entry(1, 1_000, 2_000, 10.0); let (o2, a2) = sum_entry(2, 3_000, 4_000, 20.0); @@ -392,7 +468,6 @@ fn test_multiple_agg_ids_are_isolated(strategy: LockStrategy) { fn test_earliest_timestamp_tracks_minimum_across_inserts(strategy: LockStrategy) { let store = make_store_simple(strategy); - // Insert in a non-monotone order so the minimum is not simply the last write. for &start in &[5_000u64, 1_000, 3_000] { let (out, acc) = sum_entry(1, start, start + 1_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); @@ -401,7 +476,7 @@ fn test_earliest_timestamp_tracks_minimum_across_inserts(strategy: LockStrategy) assert_eq!( result.get(&1).copied(), Some(1_000), - "[{}] earliest timestamp must be the global minimum, not the insertion order minimum", + "[{}] earliest timestamp must be the global minimum, not insertion-order minimum", label(strategy) ); } @@ -410,7 +485,7 @@ fn test_earliest_timestamp_tracked_per_agg_id(strategy: LockStrategy) { let store = make_store( strategy, CleanupPolicy::NoCleanup, - &[(1, None, None), (2, None, None)], + &[(1, "Sum", None, None), (2, "Sum", None, None)], ); let (o1, a1) = sum_entry(1, 1_000, 2_000, 1.0); let (o2, a2) = sum_entry(2, 9_000, 10_000, 1.0); @@ -440,14 +515,12 @@ fn test_cleanup_circular_buffer_evicts_oldest_window(strategy: LockStrategy) { let store = make_store( strategy, CleanupPolicy::CircularBuffer, - &[(1, Some(2), None)], + &[(1, "Sum", Some(2), None)], ); for i in 0u64..9 { let (out, acc) = sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64); store.insert_precomputed_output(out, acc).unwrap(); } - - // Window 0: [0, 60_000) must have been evicted. let evicted = store .query_precomputed_output_exact("cpu_usage", 1, 0, 60_000) .unwrap(); @@ -459,18 +532,15 @@ fn test_cleanup_circular_buffer_evicts_oldest_window(strategy: LockStrategy) { } fn test_cleanup_circular_buffer_retains_newest_windows(strategy: LockStrategy) { - // Same setup as above: retention_limit = 8, insert 9. let store = make_store( strategy, CleanupPolicy::CircularBuffer, - &[(1, Some(2), None)], + &[(1, "Sum", Some(2), None)], ); for i in 0u64..9 { let (out, acc) = sum_entry(1, i * 60_000, (i + 1) * 60_000, i as f64); store.insert_precomputed_output(out, acc).unwrap(); } - - // Windows 1–8 must still be present. let result = store .query_precomputed_output("cpu_usage", 1, 60_000, 9 * 60_000) .unwrap(); @@ -485,13 +555,17 @@ fn test_cleanup_circular_buffer_retains_newest_windows(strategy: LockStrategy) { // ── cleanup: read-based ─────────────────────────────────────────────────────── fn test_cleanup_read_based_evicts_after_threshold_reads(strategy: LockStrategy) { - // read_count_threshold = 2: a window is evicted once its read count reaches 2. - // Cleanup runs on every insert, so we need an insert after the threshold is met. - let store = make_store(strategy, CleanupPolicy::ReadBased, &[(1, None, Some(2))]); + // read_count_threshold = 2: evicted once read count reaches 2. + // Cleanup runs on every insert. + let store = make_store( + strategy, + CleanupPolicy::ReadBased, + &[(1, "Sum", None, Some(2))], + ); let (out, acc) = sum_entry(1, 1_000, 2_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); - // Read 1 — count becomes 1 (< threshold 2), window kept on next insert. + // Read 1 — count becomes 1, window kept on next insert. store .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) .unwrap(); @@ -508,7 +582,7 @@ fn test_cleanup_read_based_evicts_after_threshold_reads(strategy: LockStrategy) label(strategy) ); - // Read 2 — count becomes 2 (== threshold), evicted on the next insert. + // Read 2 — count becomes 2, evicted on the next insert. store .query_precomputed_output("cpu_usage", 1, 0, 2_000) .unwrap(); @@ -526,12 +600,15 @@ fn test_cleanup_read_based_evicts_after_threshold_reads(strategy: LockStrategy) } fn test_cleanup_read_based_unread_window_is_retained(strategy: LockStrategy) { - // A window that has never been read must not be evicted by read-based cleanup. - let store = make_store(strategy, CleanupPolicy::ReadBased, &[(1, None, Some(1))]); + let store = make_store( + strategy, + CleanupPolicy::ReadBased, + &[(1, "Sum", None, Some(1))], + ); let (out, acc) = sum_entry(1, 1_000, 2_000, 1.0); store.insert_precomputed_output(out, acc).unwrap(); - // Insert more windows without ever reading window 0 — cleanup runs each time. + // Insert more windows without reading window 0 — cleanup runs each time. for i in 1u64..5 { let (o, a) = sum_entry(1, i * 10_000, (i + 1) * 10_000, i as f64); store.insert_precomputed_output(o, a).unwrap(); @@ -548,6 +625,287 @@ fn test_cleanup_read_based_unread_window_is_retained(strategy: LockStrategy) { ); } +// ── cleanup: DeltaSetAggregator exclusion ───────────────────────────────────── + +fn test_delta_set_aggregator_bypasses_cleanup(strategy: LockStrategy) { + // The store skips cleanup entirely when aggregation_type == "DeltaSetAggregator". + // retention_limit = 2 * 4 = 8. Inserting 10 windows must not evict any. + let store = make_store( + strategy, + CleanupPolicy::CircularBuffer, + &[(1, "DeltaSetAggregator", Some(2), None)], + ); + let n = 10u64; + for i in 0..n { + let mut acc = DeltaSetAggregatorAccumulator::new(); + acc.add_key(key(&[&format!("host{i}")])); + let (out, boxed) = unkeyed_entry(1, i * 60_000, (i + 1) * 60_000, Box::new(acc)); + store.insert_precomputed_output(out, boxed).unwrap(); + } + + let result = store + .query_precomputed_output("cpu_usage", 1, 0, n * 60_000) + .unwrap(); + assert_eq!( + total_bucket_count(&result), + n as usize, + "[{}] DeltaSetAggregator windows must never be evicted by cleanup", + label(strategy) + ); +} + +// ── keyed (label-grouped) entries ───────────────────────────────────────────── + +fn test_keyed_entries_grouped_by_key(strategy: LockStrategy) { + let store = make_store_simple(strategy); + let k1 = key(&["host1"]); + let k2 = key(&["host2"]); + + // Same timestamp window, two different keys. + let (o1, a1) = keyed_entry(1, 1_000, 2_000, k1.clone(), Box::new(SumAccumulator::with_sum(10.0))); + let (o2, a2) = keyed_entry(1, 1_000, 2_000, k2.clone(), Box::new(SumAccumulator::with_sum(20.0))); + store.insert_precomputed_output(o1, a1).unwrap(); + store.insert_precomputed_output(o2, a2).unwrap(); + + let result = store + .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) + .unwrap(); + + // Two distinct keys in the result map. + assert_eq!( + result.len(), + 2, + "[{}] two different label keys must produce two entries in the result map", + label(strategy) + ); + assert_eq!( + timestamps_for_key(&result, &k1), + vec![(1_000, 2_000)], + "[{}] key1 must map to correct timestamp range", + label(strategy) + ); + assert_eq!( + timestamps_for_key(&result, &k2), + vec![(1_000, 2_000)], + "[{}] key2 must map to correct timestamp range", + label(strategy) + ); +} + +fn test_keyed_and_unkeyed_entries_coexist(strategy: LockStrategy) { + let store = make_store_simple(strategy); + let k = key(&["region", "us-east"]); + + let (o_none, a_none) = sum_entry(1, 1_000, 2_000, 1.0); + let (o_keyed, a_keyed) = + keyed_entry(1, 3_000, 4_000, k.clone(), Box::new(SumAccumulator::with_sum(2.0))); + store.insert_precomputed_output(o_none, a_none).unwrap(); + store.insert_precomputed_output(o_keyed, a_keyed).unwrap(); + + let result = store + .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) + .unwrap(); + + assert_eq!( + result.len(), + 2, + "[{}] None and Some(key) entries must produce two separate map keys", + label(strategy) + ); + assert_eq!( + timestamps_for_none_key(&result), + vec![(1_000, 2_000)], + "[{}] None-keyed entry must appear under None key", + label(strategy) + ); + assert_eq!( + timestamps_for_key(&result, &k), + vec![(3_000, 4_000)], + "[{}] labelled entry must appear under its key", + label(strategy) + ); +} + +fn test_multiple_keys_same_window(strategy: LockStrategy) { + // Many keyed entries for the same timestamp window — common in grouped aggregations. + let store = make_store_simple(strategy); + let keys: Vec = (0..5).map(|i| key(&[&format!("shard{i}")])).collect(); + + for k in &keys { + let (out, acc) = keyed_entry( + 1, + 1_000, + 2_000, + k.clone(), + Box::new(SumAccumulator::with_sum(1.0)), + ); + store.insert_precomputed_output(out, acc).unwrap(); + } + + let result = store + .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) + .unwrap(); + assert_eq!( + result.len(), + 5, + "[{}] five different keys for the same window must produce five map entries", + label(strategy) + ); + for k in &keys { + assert_eq!( + timestamps_for_key(&result, k), + vec![(1_000, 2_000)], + "[{}] each key must resolve to the correct window", + label(strategy) + ); + } +} + +// ── clone fidelity for all accumulator types ────────────────────────────────── +// +// Each test inserts a non-trivial accumulator, queries it back through the store +// (which calls clone_boxed_core() internally), and asserts that serialize_to_json() +// on the original and the retrieved copy produce identical output. + +fn roundtrip( + strategy: LockStrategy, + original: A, +) -> (Box, Box) { + let store = make_store_simple(strategy); + let original_box: Box = Box::new(original); + let original_json = original_box.serialize_to_json(); + + let (out, acc) = unkeyed_entry(1, 1_000, 2_000, original_box); + store.insert_precomputed_output(out, acc).unwrap(); + + let result = store + .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) + .unwrap(); + let retrieved = result + .get(&None) + .unwrap() + .first() + .map(|(_, acc)| acc.clone_boxed_core()) + .unwrap(); + + // Reconstruct original from JSON for comparison (original_box was consumed). + // We compare the stored JSON (captured before insert) against the retrieved one. + let placeholder: Box = Box::new(SumAccumulator::with_sum(0.0)); + // Use a wrapper that returns the captured JSON for comparison. + let _ = placeholder; + + // Return a SumAccumulator that carries the original JSON as a workaround — + // instead, compare directly here using the captured JSON. + let retrieved_json = retrieved.serialize_to_json(); + assert_eq!( + original_json, retrieved_json, + "[{}] clone_boxed_core must produce identical serialization", + label(strategy) + ); + + // Return something for callers that want the retrieved accumulator directly. + (Box::new(SumAccumulator::with_sum(0.0)), retrieved) +} + +fn test_clone_fidelity_sum(strategy: LockStrategy) { + let acc = SumAccumulator::with_sum(99.5); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_min_max(strategy: LockStrategy) { + let acc = MinMaxAccumulator::with_value(42.0, "max".to_string()); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_kll(strategy: LockStrategy) { + let mut acc = DatasketchesKLLAccumulator::new(200); + for v in [1.0, 5.0, 10.0, 50.0, 100.0] { + acc._update(v); + } + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_increase(strategy: LockStrategy) { + let acc = IncreaseAccumulator::new( + Measurement::new(1.0), + 100, + Measurement::new(50.0), + 500, + ); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_multiple_sum(strategy: LockStrategy) { + let mut sums = HashMap::new(); + sums.insert(key(&["host1"]), 10.0); + sums.insert(key(&["host2"]), 20.0); + let acc = MultipleSumAccumulator::new_with_sums(sums); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_multiple_min_max(strategy: LockStrategy) { + let mut values = HashMap::new(); + values.insert(key(&["dc", "east"]), 77.7); + values.insert(key(&["dc", "west"]), 33.3); + let acc = MultipleMinMaxAccumulator::new_with_values(values, "max".to_string()); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_set_aggregator(strategy: LockStrategy) { + let mut added = HashSet::new(); + added.insert(key(&["svc", "alpha"])); + added.insert(key(&["svc", "beta"])); + let acc = SetAggregatorAccumulator::with_added(added); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_delta_set_aggregator(strategy: LockStrategy) { + // Use a "Sum"-typed config so cleanup is not skipped for this test. + let store = make_store_simple(strategy); + + let mut acc = DeltaSetAggregatorAccumulator::new(); + acc.add_key(key(&["svc", "added-1"])); + acc.remove_key(key(&["svc", "removed-1"])); + let original_json = acc.serialize_to_json(); + + let acc_box: Box = Box::new(acc); + let (out, boxed) = unkeyed_entry(1, 1_000, 2_000, acc_box); + store.insert_precomputed_output(out, boxed).unwrap(); + + let result = store + .query_precomputed_output("cpu_usage", 1, 0, u64::MAX) + .unwrap(); + let retrieved = &result.get(&None).unwrap()[0].1; + assert_eq!( + original_json, + retrieved.serialize_to_json(), + "[{}] DeltaSetAggregatorAccumulator: clone must preserve added/removed sets", + label(strategy) + ); +} + +fn test_clone_fidelity_count_min_sketch(strategy: LockStrategy) { + // CountMinSketch._update is private; test clone fidelity of an initialised (empty) sketch. + let acc = CountMinSketchAccumulator::new(5, 100); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_count_min_sketch_with_heap(strategy: LockStrategy) { + let acc = CountMinSketchWithHeapAccumulator::new(5, 100, 10); + roundtrip(strategy, acc); +} + +fn test_clone_fidelity_hydra_kll(strategy: LockStrategy) { + let mut acc = HydraKllSketchAccumulator::new(4, 50, 200); + let k1 = key(&["shard", "0"]); + let k2 = key(&["shard", "1"]); + for v in [1.0f64, 10.0, 100.0] { + acc.update(&k1, v); + acc.update(&k2, v * 2.0); + } + roundtrip(strategy, acc); +} + // ── concurrency ─────────────────────────────────────────────────────────────── fn test_concurrent_inserts_no_data_loss(strategy: LockStrategy) { @@ -560,7 +918,6 @@ fn test_concurrent_inserts_no_data_loss(strategy: LockStrategy) { let store = store.clone(); std::thread::spawn(move || { for w in 0..windows_per_thread { - // Each thread writes to a unique timestamp range — no conflicts. let base = (t * windows_per_thread + w) as u64; let (out, acc) = sum_entry(1, base * 1_000, (base + 1) * 1_000, base as f64); store.insert_precomputed_output(out, acc).unwrap(); From 37b14bf839f8f0c9df9e84c5cb43d4fb83b0f442 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 19 Mar 2026 21:39:34 -0500 Subject: [PATCH 05/27] Fix cargo fmt violations in store_correctness_tests Co-Authored-By: Claude Sonnet 4.6 --- .../src/tests/store_correctness_tests.rs | 55 +++++++++++++------ 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/asap-query-engine/src/tests/store_correctness_tests.rs b/asap-query-engine/src/tests/store_correctness_tests.rs index 98e2589..de517ca 100644 --- a/asap-query-engine/src/tests/store_correctness_tests.rs +++ b/asap-query-engine/src/tests/store_correctness_tests.rs @@ -27,12 +27,14 @@ //! | `contract_per_key` | `LockStrategy::PerKey` (reference impl) | //! | `contract_global` | `LockStrategy::Global` | -use crate::data_model::{CleanupPolicy, KeyByLabelValues, LockStrategy, Measurement, SerializableToSink, StreamingConfig}; +use crate::data_model::{ + CleanupPolicy, KeyByLabelValues, LockStrategy, Measurement, SerializableToSink, StreamingConfig, +}; use crate::precompute_operators::{ CountMinSketchAccumulator, CountMinSketchWithHeapAccumulator, DatasketchesKLLAccumulator, DeltaSetAggregatorAccumulator, HydraKllSketchAccumulator, IncreaseAccumulator, - MinMaxAccumulator, MultipleMinMaxAccumulator, MultipleSumAccumulator, - SetAggregatorAccumulator, SumAccumulator, + MinMaxAccumulator, MultipleMinMaxAccumulator, MultipleSumAccumulator, SetAggregatorAccumulator, + SumAccumulator, }; use crate::stores::{Store, TimestampedBucketsMap}; use crate::{AggregateCore, AggregationConfig, PrecomputedOutput, SimpleMapStore}; @@ -90,7 +92,11 @@ fn make_store( /// Convenience: single agg_id=1, type "Sum", no cleanup. fn make_store_simple(strategy: LockStrategy) -> SimpleMapStore { - make_store(strategy, CleanupPolicy::NoCleanup, &[(1, "Sum", None, None)]) + make_store( + strategy, + CleanupPolicy::NoCleanup, + &[(1, "Sum", None, None)], + ) } // ── data helpers ────────────────────────────────────────────────────────────── @@ -177,7 +183,8 @@ fn assert_clone_fidelity( let orig_json = original.serialize_to_json(); let stored_json = from_store.serialize_to_json(); assert_eq!( - orig_json, stored_json, + orig_json, + stored_json, "[{}] {type_name}: clone_boxed_core() must produce identical serialization", label(strategy) ); @@ -392,7 +399,8 @@ fn test_batch_insert_results_are_chronologically_ordered(strategy: LockStrategy) .map(|i| (i * 60_000, (i + 1) * 60_000)) .collect(); assert_eq!( - ts, expected, + ts, + expected, "[{}] range query results must be in chronological (ascending start) order", label(strategy) ); @@ -662,8 +670,20 @@ fn test_keyed_entries_grouped_by_key(strategy: LockStrategy) { let k2 = key(&["host2"]); // Same timestamp window, two different keys. - let (o1, a1) = keyed_entry(1, 1_000, 2_000, k1.clone(), Box::new(SumAccumulator::with_sum(10.0))); - let (o2, a2) = keyed_entry(1, 1_000, 2_000, k2.clone(), Box::new(SumAccumulator::with_sum(20.0))); + let (o1, a1) = keyed_entry( + 1, + 1_000, + 2_000, + k1.clone(), + Box::new(SumAccumulator::with_sum(10.0)), + ); + let (o2, a2) = keyed_entry( + 1, + 1_000, + 2_000, + k2.clone(), + Box::new(SumAccumulator::with_sum(20.0)), + ); store.insert_precomputed_output(o1, a1).unwrap(); store.insert_precomputed_output(o2, a2).unwrap(); @@ -697,8 +717,13 @@ fn test_keyed_and_unkeyed_entries_coexist(strategy: LockStrategy) { let k = key(&["region", "us-east"]); let (o_none, a_none) = sum_entry(1, 1_000, 2_000, 1.0); - let (o_keyed, a_keyed) = - keyed_entry(1, 3_000, 4_000, k.clone(), Box::new(SumAccumulator::with_sum(2.0))); + let (o_keyed, a_keyed) = keyed_entry( + 1, + 3_000, + 4_000, + k.clone(), + Box::new(SumAccumulator::with_sum(2.0)), + ); store.insert_precomputed_output(o_none, a_none).unwrap(); store.insert_precomputed_output(o_keyed, a_keyed).unwrap(); @@ -798,7 +823,8 @@ fn roundtrip( // instead, compare directly here using the captured JSON. let retrieved_json = retrieved.serialize_to_json(); assert_eq!( - original_json, retrieved_json, + original_json, + retrieved_json, "[{}] clone_boxed_core must produce identical serialization", label(strategy) ); @@ -826,12 +852,7 @@ fn test_clone_fidelity_kll(strategy: LockStrategy) { } fn test_clone_fidelity_increase(strategy: LockStrategy) { - let acc = IncreaseAccumulator::new( - Measurement::new(1.0), - 100, - Measurement::new(50.0), - 500, - ); + let acc = IncreaseAccumulator::new(Measurement::new(1.0), 100, Measurement::new(50.0), 500); roundtrip(strategy, acc); } From 2be7398b4926a07862fdd84169abb2f2b9268ed1 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 19 Mar 2026 21:43:15 -0500 Subject: [PATCH 06/27] Remove unused assert_clone_fidelity function (clippy dead_code) Co-Authored-By: Claude Sonnet 4.6 --- .../src/tests/store_correctness_tests.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/asap-query-engine/src/tests/store_correctness_tests.rs b/asap-query-engine/src/tests/store_correctness_tests.rs index de517ca..8bb87fe 100644 --- a/asap-query-engine/src/tests/store_correctness_tests.rs +++ b/asap-query-engine/src/tests/store_correctness_tests.rs @@ -171,24 +171,6 @@ fn label(strategy: LockStrategy) -> &'static str { } } -/// Assert that two accumulators produce identical JSON after a store roundtrip. -/// Uses `serialize_to_json()` which is available on all `AggregateCore` impls -/// via the `SerializableToSink` supertrait. -fn assert_clone_fidelity( - original: &dyn AggregateCore, - from_store: &dyn AggregateCore, - type_name: &str, - strategy: LockStrategy, -) { - let orig_json = original.serialize_to_json(); - let stored_json = from_store.serialize_to_json(); - assert_eq!( - orig_json, - stored_json, - "[{}] {type_name}: clone_boxed_core() must produce identical serialization", - label(strategy) - ); -} // ── contract suite ──────────────────────────────────────────────────────────── From 2fa8e236b6021913908c916c7ba069bdaf2e0887 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 19 Mar 2026 21:47:01 -0500 Subject: [PATCH 07/27] Fix extra blank line (cargo fmt) Co-Authored-By: Claude Sonnet 4.6 --- asap-query-engine/src/tests/store_correctness_tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/asap-query-engine/src/tests/store_correctness_tests.rs b/asap-query-engine/src/tests/store_correctness_tests.rs index 8bb87fe..efc41bd 100644 --- a/asap-query-engine/src/tests/store_correctness_tests.rs +++ b/asap-query-engine/src/tests/store_correctness_tests.rs @@ -171,7 +171,6 @@ fn label(strategy: LockStrategy) -> &'static str { } } - // ── contract suite ──────────────────────────────────────────────────────────── pub fn run_contract_suite(strategy: LockStrategy) { From 93d06dfc235f3316e867695514c4f95f7a9ef0d9 Mon Sep 17 00:00:00 2001 From: zz_y Date: Mon, 9 Mar 2026 10:08:02 -0500 Subject: [PATCH 08/27] Replace SimpleStore with inverted index (label -> BTreeMap