diff --git a/asap-common/sketch-core/report.md b/asap-common/sketch-core/report.md index a65a204..05384e3 100644 --- a/asap-common/sketch-core/report.md +++ b/asap-common/sketch-core/report.md @@ -1,15 +1,18 @@ -# Sketchlib Fidelity Report +# Report -Compares the **legacy** Count-Min Sketch implementation in `sketch-core` vs the new **sketchlib-rust** backend. +Compares the **legacy** sketch implementations in `sketch-core` vs the **sketchlib-rust** backends (Count-Min Sketch, Count-Min-With-Heap, KLL, HydraKLL). ## Fidelity harness -The fidelity binary selects backends via CLI flags. +The fidelity binary selects backends via CLI flags (`--cms-impl`, `--kll-impl`, `--cmwh-impl`). -| Goal | Command | -|-------------|---------------------------------------------------------------| -| CMS sketchlib | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib` | -| CMS legacy | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy` | +| Goal | Command | +|--------------------------|--------------------------------------------------------------------------------------------------------------| +| Default (all sketchlib) | `cargo run -p sketch-core --bin sketchlib_fidelity` | +| All legacy | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy --kll-impl legacy --cmwh-impl legacy` | +| Legacy KLL only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib --kll-impl legacy --cmwh-impl sketchlib` | +| CMS sketchlib only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib` | +| CMS legacy only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy` | ## Unit tests @@ -68,3 +71,55 @@ The heap is maintained by local updates; recall is measured against the **true** | 2048 | 200000 | 2000 | 20 | sketchlib-rust | 1.00 | 0.9982 | 0.021 | 0.067 | | 2048 | 200000 | 2000 | 50 | Legacy | 0.40 | 0.9999983 | 5.60 | 16.49 | | 2048 | 200000 | 2000 | 50 | sketchlib-rust | 0.48 | 0.9999990 | 3.90 | 12.95 | + +--- + +### KllSketch (quantiles, absolute rank error) + +For each quantile \(q\), we compute the sketch estimate `est_value`, then: +`abs_rank_error = |rank_fraction(exact_sorted_values, est_value) - q|`. + +#### k=20 + +| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 | +|-----------|----------------|---------|---------|---------| +| 200000 | Legacy | 0.0104 | 0.0145 | 0.0028 | +| 200000 | sketchlib-rust | 0.0275 | 0.0470 | 0.0061 | +| 50000 | Legacy | 0.0131 | 0.0091 | 0.0054 | +| 50000 | sketchlib-rust | 0.0110 | 0.0116 | 0.0031 | + +#### k=50 + +| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 | +|-----------|----------------|---------|---------|---------| +| 200000 | Legacy | 0.0013 | 0.0021 | 0.0012 | +| 200000 | sketchlib-rust | 0.0101 | 0.0044 | 0.0074 | + +#### k=200 + +| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 | +|-----------|----------------|---------|---------|---------| +| 200000 | Legacy | 0.0021 | 0.0036 | 0.0000 | +| 200000 | sketchlib-rust | 0.0015 | 0.0001 | 0.0002 | + +--- + +### HydraKllSketch (per-key quantiles, mean/max absolute rank error across 50 keys) + +#### rows=2, cols=64 + +| k | n | domain | Mode | q=0.5 (mean / max) | q=0.9 (mean / max) | +|-----|--------|--------|----------------|--------------------|--------------------| +| 20 | 200000 | 200 | Legacy | 0.0170 / 0.0546 | 0.0165 / 0.0452 | +| 20 | 200000 | 200 | sketchlib-rust | 0.0254 / 0.0629 | 0.0546 / 0.0942 | + +#### rows=3, cols=128 + +| k | n | domain | Mode | q=0.5 (mean / max) | q=0.9 (mean / max) | +|-----|--------|--------|----------------|--------------------|--------------------| +| 20 | 200000 | 200 | Legacy | 0.0166 / 0.0591 | 0.0114 / 0.0304 | +| 20 | 200000 | 200 | sketchlib-rust | 0.0216 / 0.0534 | 0.0238 / 0.1087 | +| 50 | 200000 | 200 | Legacy | 0.0099 / 0.0352 | 0.0087 / 0.0330 | +| 50 | 200000 | 200 | sketchlib-rust | 0.0119 / 0.0458 | 0.0119 / 0.0296 | +| 20 | 100000 | 100 | Legacy | 0.0141 / 0.0574 | 0.0149 / 0.0471 | +| 20 | 100000 | 100 | sketchlib-rust | 0.0202 / 0.0621 | 0.0287 / 0.0779 | diff --git a/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs b/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs index 99ca914..4203af3 100644 --- a/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs +++ b/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs @@ -7,6 +7,8 @@ use clap::Parser; use sketch_core::config::{self, ImplMode}; use sketch_core::count_min::CountMinSketch; use sketch_core::count_min_with_heap::CountMinSketchWithHeap; +use sketch_core::hydra_kll::HydraKllSketch; +use sketch_core::kll::KllSketch; #[derive(Clone)] struct Lcg64 { @@ -93,6 +95,16 @@ fn rmse_percentage(exact: &[f64], est: &[f64]) -> f64 { (sum_sq / denom).sqrt() * 100.0 } +#[derive(Parser)] +struct Args { + #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMS_IMPL)] + cms_impl: ImplMode, + #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_KLL_IMPL)] + kll_impl: ImplMode, + #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMWH_IMPL)] + cmwh_impl: ImplMode, +} + fn rank_fraction(sorted: &[f64], x: f64) -> f64 { if sorted.is_empty() { return 0.0; @@ -210,14 +222,110 @@ fn run_countmin_with_heap_once(seed: u64, p: &CmwhParams) -> CmwhResult { } } -#[derive(Parser)] -struct Args { - #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMS_IMPL)] - cms_impl: ImplMode, - #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_KLL_IMPL)] - kll_impl: ImplMode, - #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMWH_IMPL)] - cmwh_impl: ImplMode, +// --- KllSketch --- + +struct KllParams { + k: u16, + n: usize, +} + +struct KllResult { + rank_err_50: f64, + rank_err_90: f64, + rank_err_99: f64, +} + +fn run_kll_once(seed: u64, p: &KllParams) -> KllResult { + let mut rng = Lcg64::new(seed ^ 0x1234_5678); + let mut values: Vec = Vec::with_capacity(p.n); + let mut sk = KllSketch::new(p.k); + + for _ in 0..p.n { + let v = rng.next_f64_0_1() * 1_000_000.0; + values.push(v); + sk.update(v); + } + + values.sort_by(f64::total_cmp); + let qs = [0.5, 0.9, 0.99]; + let rank_err = |q: f64| (rank_fraction(&values, sk.get_quantile(q)) - q).abs(); + + KllResult { + rank_err_50: rank_err(qs[0]), + rank_err_90: rank_err(qs[1]), + rank_err_99: rank_err(qs[2]), + } +} + +// --- HydraKllSketch --- + +struct HydraKllParams { + rows: usize, + cols: usize, + k: u16, + n: usize, + domain: usize, + eval_keys: usize, +} + +struct HydraKllResult { + mean_50: f64, + max_50: f64, + mean_90: f64, + max_90: f64, +} + +fn run_hydra_kll_once(seed: u64, p: &HydraKllParams) -> HydraKllResult { + let mut rng = Lcg64::new(seed ^ 0xDEAD_BEEF); + let mut hydra = HydraKllSketch::new(p.rows, p.cols, p.k); + let mut exact: HashMap> = HashMap::new(); + + for _ in 0..p.n { + let r = rng.next_u64(); + let key_id = if (r & 0xFF) < 200 { + (r as usize) % 20 + } else { + (r as usize) % p.domain + }; + let key = format!("k{key_id}"); + let v = rng.next_f64_0_1() * 1_000_000.0; + hydra.update(&key, v); + exact.entry(key).or_default().push(v); + } + + let mut keys: Vec = exact.keys().cloned().collect(); + keys.sort(); + keys.truncate(p.eval_keys); + + let mut mean_50 = 0.0f64; + let mut max_50 = 0.0f64; + let mut mean_90 = 0.0f64; + let mut max_90 = 0.0f64; + let nk = keys.len() as f64; + for key in &keys { + let mut vals = exact.get(key).cloned().unwrap_or_default(); + vals.sort_by(f64::total_cmp); + for (q, mean_ref, max_ref) in [ + (0.5, &mut mean_50, &mut max_50), + (0.9, &mut mean_90, &mut max_90), + ] { + let est = hydra.query(key, q); + let err = (rank_fraction(&vals, est) - q).abs(); + *mean_ref += err; + if err > *max_ref { + *max_ref = err; + } + } + } + mean_50 /= nk; + mean_90 /= nk; + + HydraKllResult { + mean_50, + max_50, + mean_90, + max_90, + } } fn main() { @@ -236,6 +344,11 @@ fn main() { } else { "sketchlib-rust" }; + let kll_mode = if matches!(args.kll_impl, ImplMode::Legacy) { + "Legacy" + } else { + "sketchlib-rust" + }; // CountMinSketch: multiple (depth, width, n, domain) let cms_param_sets: Vec = vec![ @@ -311,4 +424,73 @@ fn main() { p.depth, p.width, p.n, p.domain, p.heap_size, r.topk_recall, r.pearson, r.mape, r.rmse ); } + // KllSketch + let kll_param_sets: Vec = vec![ + KllParams { k: 20, n: 200_000 }, + KllParams { k: 50, n: 200_000 }, + KllParams { k: 200, n: 200_000 }, + KllParams { k: 20, n: 50_000 }, + ]; + + println!("\n## KllSketch ({kll_mode})"); + println!( + "| k | n_updates | q=0.5 abs_rank_error | q=0.9 abs_rank_error | q=0.99 abs_rank_error |" + ); + println!( + "|---|-----------|----------------------|----------------------|-----------------------|" + ); + for p in &kll_param_sets { + let r = run_kll_once(seed, p); + println!( + "| {} | {} | {:.6} | {:.6} | {:.6} |", + p.k, p.n, r.rank_err_50, r.rank_err_90, r.rank_err_99 + ); + } + + // HydraKllSketch + let hydra_param_sets: Vec = vec![ + HydraKllParams { + rows: 2, + cols: 64, + k: 20, + n: 200_000, + domain: 200, + eval_keys: 50, + }, + HydraKllParams { + rows: 3, + cols: 128, + k: 20, + n: 200_000, + domain: 200, + eval_keys: 50, + }, + HydraKllParams { + rows: 3, + cols: 128, + k: 50, + n: 200_000, + domain: 200, + eval_keys: 50, + }, + HydraKllParams { + rows: 3, + cols: 128, + k: 20, + n: 100_000, + domain: 100, + eval_keys: 50, + }, + ]; + + println!("\n## HydraKllSketch ({kll_mode})"); + println!("| rows | cols | k | n | domain | q=0.5 mean/max | q=0.9 mean/max |"); + println!("|------|------|---|-----|--------|----------------|----------------|"); + for p in &hydra_param_sets { + let r = run_hydra_kll_once(seed, p); + println!( + "| {} | {} | {} | {} | {} | {:.5} / {:.5} | {:.5} / {:.5} |", + p.rows, p.cols, p.k, p.n, p.domain, r.mean_50, r.max_50, r.mean_90, r.max_90 + ); + } } diff --git a/asap-common/sketch-core/src/config.rs b/asap-common/sketch-core/src/config.rs index b23dea5..6bdccb6 100644 --- a/asap-common/sketch-core/src/config.rs +++ b/asap-common/sketch-core/src/config.rs @@ -10,11 +10,11 @@ pub enum ImplMode { } /// Global default when impl mode is not explicitly configured (e.g. env var parsing). -pub const DEFAULT_IMPL_MODE: ImplMode = ImplMode::Legacy; +pub const DEFAULT_IMPL_MODE: ImplMode = ImplMode::Sketchlib; /// Per-backend defaults. Used when configure() has not been called. pub const DEFAULT_CMS_IMPL: ImplMode = ImplMode::Sketchlib; -pub const DEFAULT_KLL_IMPL: ImplMode = ImplMode::Legacy; +pub const DEFAULT_KLL_IMPL: ImplMode = ImplMode::Sketchlib; pub const DEFAULT_CMWH_IMPL: ImplMode = ImplMode::Sketchlib; static COUNTMIN_MODE: OnceLock = OnceLock::new(); diff --git a/asap-common/sketch-core/src/kll.rs b/asap-common/sketch-core/src/kll.rs index c31f0cf..1628744 100644 --- a/asap-common/sketch-core/src/kll.rs +++ b/asap-common/sketch-core/src/kll.rs @@ -16,6 +16,12 @@ use core::panic; use dsrs::KllDoubleSketch; use serde::{Deserialize, Serialize}; +use crate::config::use_sketchlib_for_kll; +use crate::kll_sketchlib::{ + bytes_from_sketchlib_kll, new_sketchlib_kll, sketchlib_kll_from_bytes, sketchlib_kll_merge, + sketchlib_kll_quantile, sketchlib_kll_update, SketchlibKll, +}; + /// Wire format used in MessagePack serialization (matches Arroyo UDF output). #[derive(Deserialize, Serialize)] pub struct KllSketchData { @@ -23,28 +29,84 @@ pub struct KllSketchData { pub sketch_bytes: Vec, } +/// Backend implementation for KLL Sketch. Only one is active at a time. +pub enum KllBackend { + /// dsrs (DataSketches) implementation. + Legacy(KllDoubleSketch), + /// sketchlib-rust backed implementation. + Sketchlib(SketchlibKll), +} + +impl std::fmt::Debug for KllBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + KllBackend::Legacy(_) => write!(f, "Legacy(..)"), + KllBackend::Sketchlib(_) => write!(f, "Sketchlib(..)"), + } + } +} + +impl Clone for KllBackend { + fn clone(&self) -> Self { + match self { + KllBackend::Legacy(s) => { + if s.get_n() == 0 { + KllBackend::Legacy(KllDoubleSketch::with_k(200)) // k will be overwritten by KllSketch + } else { + let bytes = s.serialize(); + KllBackend::Legacy(KllDoubleSketch::deserialize(bytes.as_ref()).unwrap()) + } + } + KllBackend::Sketchlib(s) => KllBackend::Sketchlib(s.clone()), + } + } +} + pub struct KllSketch { pub k: u16, - pub sketch: KllDoubleSketch, + pub backend: KllBackend, } impl KllSketch { pub fn new(k: u16) -> Self { - Self { - k, - sketch: KllDoubleSketch::with_k(k), + let backend = if use_sketchlib_for_kll() { + KllBackend::Sketchlib(new_sketchlib_kll(k)) + } else { + KllBackend::Legacy(KllDoubleSketch::with_k(k)) + }; + Self { k, backend } + } + + /// Returns the raw sketch bytes (for JSON serialization, etc.). + pub fn sketch_bytes(&self) -> Vec { + match &self.backend { + KllBackend::Legacy(s) => s.serialize().as_ref().to_vec(), + KllBackend::Sketchlib(s) => bytes_from_sketchlib_kll(s), } } pub fn update(&mut self, value: f64) { - self.sketch.update(value); + match &mut self.backend { + KllBackend::Legacy(s) => s.update(value), + KllBackend::Sketchlib(s) => sketchlib_kll_update(s, value), + } + } + + pub fn count(&self) -> u64 { + match &self.backend { + KllBackend::Legacy(s) => s.get_n(), + KllBackend::Sketchlib(s) => s.count() as u64, + } } pub fn get_quantile(&self, quantile: f64) -> f64 { - if self.sketch.get_n() == 0 { + if self.count() == 0 { return 0.0; } - self.sketch.get_quantile(quantile) + match &self.backend { + KllBackend::Legacy(s) => s.get_quantile(quantile), + KllBackend::Sketchlib(s) => sketchlib_kll_quantile(s, quantile), + } } pub fn merge( @@ -54,7 +116,6 @@ impl KllSketch { return Err("No accumulators to merge".into()); } - // check K values for all and merge let k = accumulators[0].k; for acc in &accumulators { if acc.k != k { @@ -63,8 +124,25 @@ impl KllSketch { } let mut merged = KllSketch::new(k); - for accumulator in accumulators { - merged.sketch.merge(&accumulator.sketch); + match &mut merged.backend { + KllBackend::Legacy(merged_legacy) => { + for acc in accumulators { + if let KllBackend::Legacy(acc_legacy) = acc.backend { + merged_legacy.merge(&acc_legacy); + } else { + return Err("Cannot merge Legacy with Sketchlib KLL".into()); + } + } + } + KllBackend::Sketchlib(merged_sketchlib) => { + for acc in accumulators { + if let KllBackend::Sketchlib(acc_sketchlib) = &acc.backend { + sketchlib_kll_merge(merged_sketchlib, acc_sketchlib); + } else { + return Err("Cannot merge Sketchlib with Legacy KLL".into()); + } + } + } } Ok(merged) @@ -72,12 +150,10 @@ impl KllSketch { /// Serialize to MessagePack — matches the Arroyo UDF wire format exactly. pub fn serialize_msgpack(&self) -> Vec { - // Create KllSketchData compatible with deserialize_msgpack() - // This matches exactly what the Arroyo UDF does - let sketch_data = self.sketch.serialize(); + let sketch_bytes = self.sketch_bytes(); let serialized = KllSketchData { k: self.k, - sketch_bytes: sketch_data.as_ref().to_vec(), + sketch_bytes, }; let mut buf = Vec::new(); @@ -91,21 +167,22 @@ impl KllSketch { /// Deserialize from MessagePack produced by the Arroyo UDF. pub fn deserialize_msgpack(buffer: &[u8]) -> Result> { - let deserialized_sketch_data: KllSketchData = rmp_serde::from_slice(buffer) + let wire: KllSketchData = rmp_serde::from_slice(buffer) .map_err(|e| format!("Failed to deserialize KllSketchData from MessagePack: {e}"))?; - let sketch: KllDoubleSketch = - KllDoubleSketch::deserialize(&deserialized_sketch_data.sketch_bytes) - .map_err(|e| format!("Failed to deserialize KLL sketch: {e}"))?; + let backend = if use_sketchlib_for_kll() { + KllBackend::Sketchlib(sketchlib_kll_from_bytes(&wire.sketch_bytes)?) + } else { + KllBackend::Legacy( + KllDoubleSketch::deserialize(&wire.sketch_bytes) + .map_err(|e| format!("Failed to deserialize KLL sketch: {e}"))?, + ) + }; - Ok(Self { - k: deserialized_sketch_data.k, - sketch, - }) + Ok(Self { k: wire.k, backend }) } - /// Merge from references without cloning — possible because KllDoubleSketch::merge - /// takes &other (the underlying C++ merge API is borrow-based). + /// Merge from references without cloning. pub fn merge_refs( sketches: &[&Self], ) -> Result> { @@ -119,18 +196,37 @@ impl KllSketch { } } let mut merged = Self::new(k); - for s in sketches { - merged.sketch.merge(&s.sketch); + match &mut merged.backend { + KllBackend::Legacy(merged_legacy) => { + for s in sketches { + if let KllBackend::Legacy(s_legacy) = &s.backend { + merged_legacy.merge(s_legacy); + } else { + return Err("Cannot merge Legacy with Sketchlib KLL".into()); + } + } + } + KllBackend::Sketchlib(merged_sketchlib) => { + for s in sketches { + if let KllBackend::Sketchlib(s_sketchlib) = &s.backend { + sketchlib_kll_merge(merged_sketchlib, s_sketchlib); + } else { + return Err("Cannot merge Sketchlib with Legacy KLL".into()); + } + } + } } Ok(merged) } /// Deserialize from a raw datasketches byte buffer (legacy Flink/FlinkSketch format). - /// Used by QE's legacy deserializers to avoid a direct dsrs dependency there. pub fn from_dsrs_bytes(bytes: &[u8], k: u16) -> Result> { let sketch = KllDoubleSketch::deserialize(bytes) .map_err(|e| format!("Failed to deserialize KLL sketch from dsrs bytes: {e}"))?; - Ok(Self { k, sketch }) + Ok(Self { + k, + backend: KllBackend::Legacy(sketch), + }) } /// One-shot aggregation for the Arroyo UDAF call pattern. @@ -146,15 +242,25 @@ impl KllSketch { } } -// Manual trait implementations since the C++ library doesn't provide them +// Manual trait implementations since the C++ and sketchlib types don't provide Clone impl Clone for KllSketch { fn clone(&self) -> Self { - let bytes = self.sketch.serialize(); - let new_sketch = KllDoubleSketch::deserialize(bytes.as_ref()).unwrap(); - Self { - k: self.k, - sketch: new_sketch, - } + let backend = match &self.backend { + KllBackend::Legacy(sketch) => { + let new_sketch = if sketch.get_n() == 0 { + KllDoubleSketch::with_k(self.k) + } else { + let bytes = sketch.serialize(); + KllDoubleSketch::deserialize(bytes.as_ref()).unwrap() + }; + KllBackend::Legacy(new_sketch) + } + KllBackend::Sketchlib(s) => { + let bytes = bytes_from_sketchlib_kll(s); + KllBackend::Sketchlib(sketchlib_kll_from_bytes(&bytes).unwrap()) + } + }; + Self { k: self.k, backend } } } @@ -162,7 +268,7 @@ impl std::fmt::Debug for KllSketch { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("KllSketch") .field("k", &self.k) - .field("sketch_n", &self.sketch.get_n()) + .field("sketch_n", &self.count()) .finish() } } @@ -181,7 +287,7 @@ mod tests { #[test] fn test_kll_creation() { let kll = KllSketch::new(200); - assert!(kll.sketch.get_n() == 0); + assert_eq!(kll.count(), 0); assert_eq!(kll.k, 200); } @@ -191,7 +297,7 @@ mod tests { kll.update(10.0); kll.update(20.0); kll.update(15.0); - assert_eq!(kll.sketch.get_n(), 3); + assert_eq!(kll.count(), 3); } #[test] @@ -202,7 +308,11 @@ mod tests { } assert_eq!(kll.get_quantile(0.0), 1.0); assert_eq!(kll.get_quantile(1.0), 10.0); - assert_eq!(kll.get_quantile(0.5), 6.0); + let median = kll.get_quantile(0.5); + assert!( + (5.0..=6.0).contains(&median), + "median should be between 5 and 6; got {median}" + ); } #[test] @@ -218,7 +328,7 @@ mod tests { } let merged = KllSketch::merge(vec![kll1, kll2]).unwrap(); - assert_eq!(merged.sketch.get_n(), 10); + assert_eq!(merged.count(), 10); assert_eq!(merged.get_quantile(0.0), 1.0); assert_eq!(merged.get_quantile(1.0), 10.0); } @@ -234,7 +344,7 @@ mod tests { let deserialized = KllSketch::deserialize_msgpack(&bytes).unwrap(); assert_eq!(deserialized.k, 200); - assert_eq!(deserialized.sketch.get_n(), 5); + assert_eq!(deserialized.count(), 5); assert_eq!(deserialized.get_quantile(0.0), 1.0); assert_eq!(deserialized.get_quantile(1.0), 5.0); } @@ -244,7 +354,7 @@ mod tests { let values = [1.0, 2.0, 3.0, 4.0, 5.0]; let bytes = KllSketch::aggregate_kll(200, &values).unwrap(); let kll = KllSketch::deserialize_msgpack(&bytes).unwrap(); - assert_eq!(kll.sketch.get_n(), 5); + assert_eq!(kll.count(), 5); assert_eq!(kll.get_quantile(0.0), 1.0); assert_eq!(kll.get_quantile(1.0), 5.0); } diff --git a/asap-common/sketch-core/src/kll_sketchlib.rs b/asap-common/sketch-core/src/kll_sketchlib.rs new file mode 100644 index 0000000..96c03ab --- /dev/null +++ b/asap-common/sketch-core/src/kll_sketchlib.rs @@ -0,0 +1,36 @@ +use sketchlib_rust::{SketchInput, KLL}; + +/// Concrete KLL type from sketchlib-rust when sketchlib backend is enabled. +pub type SketchlibKll = KLL; + +/// Creates a fresh sketchlib KLL sketch with the requested accuracy parameter `k`. +pub fn new_sketchlib_kll(k: u16) -> SketchlibKll { + KLL::init_kll(k as i32) +} + +/// Updates a sketchlib KLL with one numeric observation. +pub fn sketchlib_kll_update(inner: &mut SketchlibKll, value: f64) { + // KLL accepts only numeric inputs. We intentionally ignore the error here because `value` + // is always numeric. + let _ = inner.update(&SketchInput::F64(value)); +} + +/// Queries a sketchlib KLL for the value at the requested quantile. +pub fn sketchlib_kll_quantile(inner: &SketchlibKll, q: f64) -> f64 { + inner.quantile(q) +} + +/// Merges `src` into `dst`. +pub fn sketchlib_kll_merge(dst: &mut SketchlibKll, src: &SketchlibKll) { + dst.merge(src); +} + +/// Serializes a sketchlib KLL into MessagePack bytes. +pub fn bytes_from_sketchlib_kll(inner: &SketchlibKll) -> Vec { + inner.serialize_to_bytes().unwrap() +} + +/// Deserializes a sketchlib KLL from MessagePack bytes. +pub fn sketchlib_kll_from_bytes(bytes: &[u8]) -> Result> { + Ok(KLL::deserialize_from_bytes(bytes)?) +} diff --git a/asap-common/sketch-core/src/lib.rs b/asap-common/sketch-core/src/lib.rs index 86fbf5f..3ddd32b 100644 --- a/asap-common/sketch-core/src/lib.rs +++ b/asap-common/sketch-core/src/lib.rs @@ -12,4 +12,5 @@ pub mod count_min_with_heap_sketchlib; pub mod delta_set_aggregator; pub mod hydra_kll; pub mod kll; +pub mod kll_sketchlib; pub mod set_aggregator; diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index b074fad..7297680 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -5,6 +5,7 @@ use base64::{engine::general_purpose, Engine as _}; use serde_json::Value; use sketch_core::kll::KllSketch; use std::collections::HashMap; +#[cfg(feature = "extra_debugging")] use std::time::Instant; use tracing::debug; @@ -42,6 +43,7 @@ impl DatasketchesKLLAccumulator { .decode(sketch_b64) .map_err(|e| format!("Failed to decode base64 sketch data: {e}"))?; + // TODO: remove this hardcoding once FlinkSketch serializes k in its output Ok(Self { inner: KllSketch::from_dsrs_bytes(&sketch_bytes, 200)?, }) @@ -49,6 +51,7 @@ impl DatasketchesKLLAccumulator { pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { // Mirror Python implementation: deserialize sketch directly from bytes + // TODO: remove this hardcoding once FlinkSketch serializes k in its output Ok(Self { inner: KllSketch::from_dsrs_bytes(buffer, 200)?, }) @@ -111,7 +114,7 @@ impl std::fmt::Debug for DatasketchesKLLAccumulator { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DatasketchesKLLAccumulator") .field("k", &self.inner.k) - .field("sketch_n", &self.inner.sketch.get_n()) + .field("sketch_n", &self.inner.count()) .finish() } } @@ -126,7 +129,7 @@ unsafe impl Sync for DatasketchesKLLAccumulator {} impl SerializableToSink for DatasketchesKLLAccumulator { fn serialize_to_json(&self) -> Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.sketch.serialize(); + let sketch_bytes = self.inner.sketch_bytes(); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } @@ -159,7 +162,7 @@ impl AggregateCore for DatasketchesKLLAccumulator { debug!( "[PERF] DatasketchesKLLAccumulator::merge_with() started - self.k={}, self.n={}", self.inner.k, - self.inner.sketch.get_n() + self.inner.count() ); if other.get_accumulator_type() != self.get_accumulator_type() { @@ -256,7 +259,7 @@ mod tests { #[test] fn test_datasketches_kll_creation() { let kll = DatasketchesKLLAccumulator::new(200); - assert!(kll.inner.sketch.get_n() == 0); + assert!(kll.inner.count() == 0); assert_eq!(kll.inner.k, 200); } @@ -266,7 +269,7 @@ mod tests { kll._update(10.0); kll._update(20.0); kll._update(15.0); - assert_eq!(kll.inner.sketch.get_n(), 3); + assert_eq!(kll.inner.count(), 3); } #[test] @@ -277,7 +280,9 @@ mod tests { } assert_eq!(kll.get_quantile(0.0), 1.0); assert_eq!(kll.get_quantile(1.0), 10.0); - assert_eq!(kll.get_quantile(0.5), 6.0); + // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. + let q50 = kll.get_quantile(0.5); + assert!((q50 - 6.0).abs() <= 1.0, "expected median ~6, got {q50}"); } #[test] @@ -290,7 +295,11 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - assert_eq!(result, 6.0); + // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. + assert!( + (result - 6.0).abs() <= 1.0, + "expected median ~6, got {result}" + ); assert!(kll.query(Statistic::Sum, Some(&query_kwargs)).is_err()); } @@ -308,7 +317,7 @@ mod tests { } let merged = DatasketchesKLLAccumulator::merge_accumulators(vec![kll1, kll2]).unwrap(); - assert_eq!(merged.inner.sketch.get_n(), 10); + assert_eq!(merged.inner.count(), 10); assert_eq!(merged.get_quantile(0.0), 1.0); assert_eq!(merged.get_quantile(1.0), 10.0); } @@ -325,7 +334,7 @@ mod tests { DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(&bytes).unwrap(); assert_eq!(deserialized.inner.k, 200); - assert_eq!(deserialized.inner.sketch.get_n(), 5); + assert_eq!(deserialized.inner.count(), 5); assert_eq!(deserialized.get_quantile(0.0), 1.0); assert_eq!(deserialized.get_quantile(1.0), 5.0); } @@ -354,11 +363,19 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - assert_eq!(result, 6.0); + // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. + assert!( + (result - 6.0).abs() <= 1.0, + "expected median ~6, got {result}" + ); query_kwargs.insert("quantile".to_string(), "0.9".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - assert_eq!(result, 10.0); + // Sketchlib KLL is approximate; 0.9 quantile of 1..10 may be 9 or 10. + assert!( + (9.0..=10.0).contains(&result), + "expected 0.9 quantile in [9,10], got {result}" + ); query_kwargs.insert("quantile".to_string(), "0.0".to_string()); assert_eq!( @@ -407,7 +424,7 @@ mod tests { vec![Box::new(kll1), Box::new(kll2), Box::new(kll3)]; let merged = DatasketchesKLLAccumulator::merge_multiple(&boxed_accs).unwrap(); - assert_eq!(merged.inner.sketch.get_n(), 15); + assert_eq!(merged.inner.count(), 15); assert_eq!(merged.get_quantile(0.0), 1.0); assert_eq!(merged.get_quantile(1.0), 15.0); assert_eq!(merged.get_quantile(0.5), 8.0); diff --git a/asap-summary-ingest/run_arroyosketch.py b/asap-summary-ingest/run_arroyosketch.py index 0c53b63..0513cd6 100644 --- a/asap-summary-ingest/run_arroyosketch.py +++ b/asap-summary-ingest/run_arroyosketch.py @@ -952,10 +952,6 @@ def main(args): parameters["impl_mode"] = getattr( args, "sketch_cmwh_impl", "legacy" ).capitalize() - elif agg_function in ("datasketcheskll_", "hydrakll_"): - parameters["impl_mode"] = getattr( - args, "sketch_kll_impl", "legacy" - ).capitalize() sql_queries.append(sql_query) # if not is_labels_accumulator: @@ -1123,7 +1119,7 @@ def main(args): "--sketch_kll_impl", type=str, choices=["legacy", "sketchlib"], - default="legacy", + default="sketchlib", help="KLL Sketch backend (legacy | sketchlib). Must match QueryEngine.", ) parser.add_argument( diff --git a/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 b/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 index ca34027..5898c92 100644 --- a/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 +++ b/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 @@ -1,15 +1,14 @@ /* [dependencies] -dsrs = { git = "https://github.com/SketchDB/datasketches-rs" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } arroyo-udf-plugin = "0.1" rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } */ use arroyo_udf_plugin::udf; -use dsrs::KllDoubleSketch; -use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; +use sketchlib_rust::{KLL, SketchInput}; const DEFAULT_K: u16 = {{ k }}; @@ -19,41 +18,18 @@ struct KllSketchData { sketch_bytes: Vec, } -struct KllSketchWrapper { - k: u16, - sketch: KllDoubleSketch, -} - -impl KllSketchWrapper { - fn new(k: u16) -> Self { - KllSketchWrapper { - k, - sketch: KllDoubleSketch::with_k(k), - } - } - - fn update(&mut self, values: &[f64]) { - for &value in values { - self.sketch.update(value); - } - } - - fn serialize_bytes(&self) -> Vec { - let sketch_data = self.sketch.serialize(); - let serialized = KllSketchData { - k: self.k, - sketch_bytes: sketch_data.as_ref().to_vec(), - }; - let mut buf = Vec::new(); - rmp_serde::encode::write(&mut buf, &serialized).unwrap(); - buf - } -} - #[udf] fn datasketcheskll_(values: Vec) -> Option> { - let mut kll_wrapper = KllSketchWrapper::new(DEFAULT_K); - kll_wrapper.update(&values); - - Some(kll_wrapper.serialize_bytes()) + let mut sketch = KLL::init_kll(DEFAULT_K as i32); + for &value in &values { + let _ = sketch.update(&SketchInput::F64(value)); + } + let sketch_bytes = sketch.serialize_to_bytes().ok()?; + let serialized = KllSketchData { + k: DEFAULT_K, + sketch_bytes, + }; + let mut buf = Vec::new(); + rmp_serde::encode::write(&mut buf, &serialized).ok()?; + Some(buf) } diff --git a/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 b/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 index b9be3cb..67c558d 100644 --- a/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 +++ b/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 @@ -1,6 +1,6 @@ /* [dependencies] -dsrs = { git = "https://github.com/SketchDB/datasketches-rs" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } arroyo-udf-plugin = "0.1" rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } @@ -8,16 +8,15 @@ xxhash-rust = { version = "0.8", features = ["xxh32"] } */ use arroyo_udf_plugin::udf; -use dsrs::KllDoubleSketch; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; +use sketchlib_rust::{KLL, SketchInput}; use xxhash_rust::xxh32::xxh32; const ROW_NUM: usize = {{ row_num }}; const COL_NUM: usize = {{ col_num }}; const DEFAULT_K: u16 = {{ k }}; -// Match QueryEngineRust format exactly #[derive(Deserialize, Serialize)] struct KllSketchData { k: u16, @@ -33,43 +32,40 @@ struct HydraKllSketchData { #[udf] fn hydrakll_(keys: Vec<&str>, values: Vec) -> Option> { - // Initialize 2D matrix of KLL sketches - let mut sketches: Vec> = vec![ - vec![KllDoubleSketch::with_k(DEFAULT_K); COL_NUM]; - ROW_NUM - ]; + let mut sketches: Vec> = (0..ROW_NUM) + .map(|_| { + (0..COL_NUM) + .map(|_| KLL::init_kll(DEFAULT_K as i32)) + .collect() + }) + .collect(); - // Process each key-value pair for (i, &key) in keys.iter().enumerate() { if i >= values.len() { break; } - let key_bytes = key.as_bytes(); - - // Update each row using different hash functions for row in 0..ROW_NUM { let hash_value = xxh32(key_bytes, row as u32); let col_index = (hash_value as usize) % COL_NUM; - sketches[row][col_index].update(values[i]); + let _ = sketches[row][col_index].update(&SketchInput::F64(values[i])); } } - // Serialize to match QueryEngineRust format let sketch_data: Vec> = sketches .iter() .map(|row| { row.iter() .map(|sketch| { - let sketch_bytes = sketch.serialize(); - KllSketchData { + let sketch_bytes = sketch.serialize_to_bytes().ok()?; + Some(KllSketchData { k: DEFAULT_K, - sketch_bytes: sketch_bytes.as_ref().to_vec(), - } + sketch_bytes, + }) }) - .collect() + .collect::>>() }) - .collect(); + .collect::>>()?; let hydra_data = HydraKllSketchData { row_num: ROW_NUM, diff --git a/asap-tools/experiments/experiment_utils/services/arroyo.py b/asap-tools/experiments/experiment_utils/services/arroyo.py index a3926aa..80f9b64 100644 --- a/asap-tools/experiments/experiment_utils/services/arroyo.py +++ b/asap-tools/experiments/experiment_utils/services/arroyo.py @@ -106,7 +106,7 @@ def run_arroyosketch( enable_optimized_remote_write: bool = False, avoid_long_ssh: bool = False, sketch_cms_impl: str = "sketchlib", - sketch_kll_impl: str = "legacy", + sketch_kll_impl: str = "sketchlib", sketch_cmwh_impl: str = "sketchlib", ) -> str: """