diff --git a/Cargo.toml b/Cargo.toml index 58c30b71..0c380cb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "ratelimit", "ringlog", "switchboard", + "waterfall", ] [profile.bench] diff --git a/awaken/Cargo.toml b/awaken/Cargo.toml index ee34b68f..4c1d079b 100644 --- a/awaken/Cargo.toml +++ b/awaken/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awaken" -version = "0.2.0" +version = "0.3.0" edition = "2021" license = "Apache-2.0" authors = ["Brian Martin "] @@ -9,7 +9,7 @@ homepage = "https://github.com/pelikan-io/rustcommon" repository = "https://github.com/pelikan-io/rustcommon" [dependencies] -mio = "0.8.11" +mio = "1.1" [target.'cfg(target_os = "linux")'.dependencies] -libc = "0.2.139" +libc = "0.2.183" diff --git a/clocksource/Cargo.toml b/clocksource/Cargo.toml index 8345f06b..a7834b22 100644 --- a/clocksource/Cargo.toml +++ b/clocksource/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "clocksource" -version = "0.8.2" +version = "0.8.3" authors = ["Brian Martin "] edition = "2021" description = "Library for times and durations with fixed-size representations" @@ -9,8 +9,8 @@ homepage = "https://github.com/pelikan-io/rustcommon" repository = "https://github.com/pelikan-io/rustcommon" [dependencies] -libc = "0.2.147" -time = { version = "0.3.36", features = ["formatting"] } +libc = "0.2.183" +time = { version = "0.3.47", features = ["formatting"] } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3.9", features = ["ntdef", "profileapi", "sysinfoapi"] } diff --git a/histogram/Cargo.toml b/histogram/Cargo.toml index 8b9fa032..139f15f0 100644 --- a/histogram/Cargo.toml +++ b/histogram/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "histogram" -version = "0.11.4" +version = "0.11.5" edition = "2021" authors = ["Brian Martin "] license = "MIT OR Apache-2.0" @@ -9,13 +9,13 @@ homepage = "https://github.com/pelikan-io/rustcommon" repository = "https://github.com/pelikan-io/rustcommon" [dependencies] -schemars = { version = "0.8", optional = true } -serde = { version = "1.0.144", features = ["derive"], optional = true } -thiserror = "1.0.47" +schemars = { version = "1.2", optional = true } +serde = { version = "1.0.228", features = ["derive"], optional = true } +thiserror = "2.0" [dev-dependencies] -criterion = "0.5.1" -rand = "0.8.5" +criterion = "0.8" +rand = "0.10" [features] schemars = ["dep:schemars", "serde"] diff --git a/histogram/src/sparse.rs b/histogram/src/sparse.rs index 9070acc0..2095e48e 100644 --- a/histogram/src/sparse.rs +++ b/histogram/src/sparse.rs @@ -331,7 +331,7 @@ impl From<&Histogram> for SparseHistogram { #[cfg(test)] mod tests { - use rand::Rng; + use rand::RngExt; use std::collections::HashMap; use super::*; @@ -506,11 +506,11 @@ mod tests { #[test] fn downsample() { let mut histogram = Histogram::new(8, 32).unwrap(); - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); // Generate 10,000 values to store in a sorted array and a histogram for _ in 0..10000 { - let v: u64 = rng.gen_range(1..2_u64.pow(histogram.config.max_value_power() as u32)); + let v: u64 = rng.random_range(1..2_u64.pow(histogram.config.max_value_power() as u32)); let _ = histogram.increment(v); } diff --git a/histogram/src/standard.rs b/histogram/src/standard.rs index 31bd51a1..e58c6d8d 100644 --- a/histogram/src/standard.rs +++ b/histogram/src/standard.rs @@ -250,7 +250,7 @@ impl Histogram { } /// Returns an interator across the histogram. - pub fn iter(&self) -> Iter { + pub fn iter(&self) -> Iter<'_> { Iter { index: 0, histogram: self, @@ -315,7 +315,7 @@ impl From<&SparseHistogram> for Histogram { #[cfg(test)] mod tests { use super::*; - use rand::Rng; + use rand::RngExt; #[cfg(target_pointer_width = "64")] #[test] @@ -408,11 +408,11 @@ mod tests { fn downsample() { let mut histogram = Histogram::new(8, 32).unwrap(); let mut vals: Vec = Vec::with_capacity(10000); - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); // Generate 10,000 values to store in a sorted array and a histogram for _ in 0..vals.capacity() { - let v: u64 = rng.gen_range(1..2_u64.pow(histogram.config.max_value_power() as u32)); + let v: u64 = rng.random_range(1..2_u64.pow(histogram.config.max_value_power() as u32)); vals.push(v); let _ = histogram.increment(v); } diff --git a/ratelimit/Cargo.toml b/ratelimit/Cargo.toml index 9744ca96..a64c31f4 100644 --- a/ratelimit/Cargo.toml +++ b/ratelimit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ratelimit" -version = "0.10.0" +version = "0.10.1" authors = ["Brian Martin "] edition = "2021" license = "MIT OR Apache-2.0" @@ -10,5 +10,5 @@ repository = "https://github.com/pelikan-io/rustcommon" [dependencies] clocksource = { version = "0.8.0", path = "../clocksource" } -parking_lot = "0.12.1" -thiserror = "1.0.40" +parking_lot = "0.12.5" +thiserror = "2.0" diff --git a/ringlog/Cargo.toml b/ringlog/Cargo.toml index ebc8076d..97e572e3 100644 --- a/ringlog/Cargo.toml +++ b/ringlog/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ringlog" -version = "0.8.0" +version = "0.8.1" edition = "2021" license = "Apache-2.0" authors = ["Brian Martin "] @@ -9,9 +9,9 @@ homepage = "https://github.com/pelikan-io/rustcommon" repository = "https://github.com/pelikan-io/rustcommon" [dependencies] -ahash = "0.8.0" +ahash = "0.8.12" clocksource = { version = "0.8.0", path = "../clocksource" } -log = { version = "0.4.17", features = ["std"] } +log = { version = "0.4.29", features = ["std"] } metriken = { version = "0.7.0", optional = true } mpmc = "0.1.6" diff --git a/switchboard/Cargo.toml b/switchboard/Cargo.toml index 7e4bb924..99bb18a8 100644 --- a/switchboard/Cargo.toml +++ b/switchboard/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "switchboard" -version = "0.3.0" +version = "0.4.0" edition = "2021" license = "Apache-2.0" authors = ["Brian Martin "] @@ -9,11 +9,11 @@ homepage = "https://github.com/pelikan-io/rustcommon" repository = "https://github.com/pelikan-io/rustcommon" [dependencies] -awaken = { version = "0.2.0", path = "../awaken" } -crossbeam-queue = "0.3.8" -rand = "0.8.5" -rand_chacha = "0.3.1" -thiserror = "1.0" +awaken = { version = "0.3.0", path = "../awaken" } +crossbeam-queue = "0.3.12" +rand = "0.10" +rand_chacha = "0.10" +thiserror = "2.0" [dev-dependencies] -mio = { version = "0.8.11", features = ["os-poll"]} +mio = { version = "1.1", features = ["os-poll"]} diff --git a/switchboard/src/lib.rs b/switchboard/src/lib.rs index d2f9bfad..2c8eaaa3 100644 --- a/switchboard/src/lib.rs +++ b/switchboard/src/lib.rs @@ -7,9 +7,8 @@ pub use awaken::Waker; use crossbeam_queue::*; -use rand::distributions::Uniform; -use rand::Rng as RandRng; -use rand::SeedableRng; +use rand::distr::Uniform; +use rand::{RngExt, SeedableRng}; use rand_chacha::ChaCha20Rng; use std::sync::Arc; use thiserror::Error; @@ -176,8 +175,8 @@ impl Queues { a.push(Queues { senders: a_tx.clone(), receiver, - rng: ChaCha20Rng::from_entropy(), - distr: Uniform::new(0, a_tx.len()), + rng: ChaCha20Rng::from_rng(&mut rand::rng()), + distr: Uniform::new(0, a_tx.len()).unwrap(), id, }) } @@ -186,8 +185,8 @@ impl Queues { b.push(Queues { senders: b_tx.clone(), receiver, - rng: ChaCha20Rng::from_entropy(), - distr: Uniform::new(0, b_tx.len()), + rng: ChaCha20Rng::from_rng(&mut rand::rng()), + distr: Uniform::new(0, b_tx.len()).unwrap(), id, }) } @@ -223,7 +222,7 @@ impl Queues { sender: self.id, inner: item, }) - .map_err(|e| e.into_inner()) + .map_err(|e: TrackedItem| e.into_inner()) } /// Try to send a single item to any receiver. Uses a uniform random @@ -240,7 +239,7 @@ impl Queues { sender: self.id, inner: item, }) - .map_err(|e| e.into_inner()) + .map_err(|e: TrackedItem| e.into_inner()) } /// Wake any remote receivers which have been sent items since the last time diff --git a/waterfall/Cargo.toml b/waterfall/Cargo.toml index 12dd6a8d..e227ef23 100644 --- a/waterfall/Cargo.toml +++ b/waterfall/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "waterfall" -version = "0.8.2" +version = "0.9.0" authors = ["Brian Martin "] edition = "2021" license = "Apache-2.0" @@ -9,14 +9,13 @@ homepage = "https://github.com/pelikan-io/rustcommon" repository = "https://github.com/pelikan-io/rustcommon" [dependencies] -clocksource = { version = "0.6.0" } +clocksource = { version = "0.8.0", path = "../clocksource" } dejavu = "2.37.0" -image = "0.24.3" -log = "0.4.17" -heatmap = { version = "0.7.1", path = "../heatmap" } -histogram = { version = "0.7.1" } -rusttype = "0.9.2" +histogram = { version = "0.11.5", path = "../histogram" } +image = "0.25" +log = "0.4.29" +rusttype = "0.9.3" [dev-dependencies] -rand = "0.8.5" -rand_distr = "0.4.3" +rand = "0.10" +rand_distr = "0.6" diff --git a/waterfall/examples/simulator.rs b/waterfall/examples/simulator.rs index 39bf48ce..a434c813 100644 --- a/waterfall/examples/simulator.rs +++ b/waterfall/examples/simulator.rs @@ -2,9 +2,10 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 -use heatmap::*; -use rand::thread_rng; +use clocksource::precise::Duration; +use rand::RngExt; use rand_distr::*; +use std::time::Instant; use waterfall::*; fn main() { @@ -31,46 +32,38 @@ pub enum Shape { } pub fn simulate(shape: Shape) { - let duration = Duration::from_secs(10); + let run_duration = std::time::Duration::from_secs(10); println!( "Simulating for {:?} distribution for {:?} seconds", shape, - duration.as_secs_f64() + run_duration.as_secs_f64() ); - let heatmap = Heatmap::new( - 0, - 10, - 30, - Duration::from_secs(10), - Duration::from_millis(250), - None, - None, - ) - .unwrap(); + let mut heatmap = + Heatmap::new(0, 30, Duration::from_secs(10), Duration::from_millis(250)).unwrap(); let cauchy = Cauchy::new(500_000.0, 2_000.00).unwrap(); let normal = Normal::new(200_000.0, 100_000.0).unwrap(); - let uniform = Uniform::new_inclusive(10_000.0, 200_000.0); + let uniform = rand_distr::Uniform::new(10_000.0, 200_000.0).unwrap(); let triangular = Triangular::new(1.0, 200_000.0, 50_000.0).unwrap(); let gamma = Gamma::new(2.0, 2.0).unwrap(); - let mut rng = thread_rng(); + let mut rng = rand::rng(); let start = Instant::now(); loop { - if start.elapsed() >= duration { + if start.elapsed() >= run_duration { break; } let value: f64 = match shape { - Shape::Cauchy => cauchy.sample(&mut rng), - Shape::Normal => normal.sample(&mut rng), - Shape::Uniform => uniform.sample(&mut rng), - Shape::Triangular => triangular.sample(&mut rng), - Shape::Gamma => gamma.sample(&mut rng) * 100_000.0, + Shape::Cauchy => rng.sample(cauchy), + Shape::Normal => rng.sample(normal), + Shape::Uniform => rng.sample(uniform), + Shape::Triangular => rng.sample(triangular), + Shape::Gamma => rng.sample(gamma) * 100_000.0, }; let value = value.floor() as u64; if value != 0 { - let _ = heatmap.increment(Instant::now(), value, 1); + let _ = heatmap.increment(clocksource::precise::Instant::now(), value, 1); } } diff --git a/waterfall/src/heatmap.rs b/waterfall/src/heatmap.rs new file mode 100644 index 00000000..b2d55573 --- /dev/null +++ b/waterfall/src/heatmap.rs @@ -0,0 +1,122 @@ +use clocksource::precise::{Duration, Instant, UnixInstant}; +use histogram::Histogram; + +/// A time-series collection of histograms, where each slice represents a fixed +/// time interval. +pub struct Heatmap { + slices: Vec, + config: histogram::Config, + resolution: Duration, + start: UnixInstant, + current_tick: usize, +} + +impl Heatmap { + /// Create a new `Heatmap`. + /// + /// * `grouping_power` - controls the bucket granularity (see [`histogram::Config`]) + /// * `max_value_power` - controls the max representable value + /// * `span` - total time window the heatmap covers + /// * `resolution` - duration of each time slice + pub fn new( + grouping_power: u8, + max_value_power: u8, + span: Duration, + resolution: Duration, + ) -> Result { + let config = histogram::Config::new(grouping_power, max_value_power)?; + let num_slices = (span.as_nanos() / resolution.as_nanos()) as usize; + let slices = (0..num_slices) + .map(|_| Histogram::with_config(&config)) + .collect(); + + Ok(Self { + slices, + config, + resolution, + start: UnixInstant::now(), + current_tick: 0, + }) + } + + /// Record a value at the given instant. + pub fn increment( + &mut self, + now: Instant, + value: u64, + count: u64, + ) -> Result<(), histogram::Error> { + let elapsed = now.elapsed(); + // This is a rough approach: use the monotonic clock offset to pick a slice + let tick = if self.resolution.as_nanos() > 0 { + (elapsed.as_nanos() / self.resolution.as_nanos()) as usize + } else { + 0 + }; + // Wrap around if we exceed the number of slices + let idx = tick % self.slices.len(); + + // If we've advanced past our previous tick, clear stale slices + if tick > self.current_tick { + let start_clear = (self.current_tick + 1) % self.slices.len(); + let end_clear = (tick + 1) % self.slices.len(); + if tick - self.current_tick >= self.slices.len() { + // Clear all slices + for s in &mut self.slices { + for v in s.as_mut_slice().iter_mut() { + *v = 0; + } + } + } else if end_clear > start_clear { + for i in start_clear..end_clear { + for v in self.slices[i].as_mut_slice().iter_mut() { + *v = 0; + } + } + } else { + for i in start_clear..self.slices.len() { + for v in self.slices[i].as_mut_slice().iter_mut() { + *v = 0; + } + } + for i in 0..end_clear { + for v in self.slices[i].as_mut_slice().iter_mut() { + *v = 0; + } + } + } + self.current_tick = tick; + } + + self.slices[idx].add(value, count) + } + + /// Returns the number of active (non-empty or allocated) slices. + pub fn active_slices(&self) -> usize { + self.slices.len() + } + + /// Returns the number of buckets per histogram slice. + pub fn buckets(&self) -> usize { + self.config.total_buckets() + } + + /// Returns the time resolution (duration of each slice). + pub fn resolution(&self) -> Duration { + self.resolution + } + + /// Returns the start time of the heatmap. + pub fn start_at(&self) -> UnixInstant { + self.start + } +} + +impl<'a> IntoIterator for &'a Heatmap { + type Item = &'a Histogram; + type IntoIter = std::slice::Iter<'a, Histogram>; + + fn into_iter(self) -> Self::IntoIter { + self.slices.iter() + } +} diff --git a/waterfall/src/lib.rs b/waterfall/src/lib.rs index 47a392b2..c61b685f 100644 --- a/waterfall/src/lib.rs +++ b/waterfall/src/lib.rs @@ -4,10 +4,12 @@ //! This crate is used to render a waterfall style plot of a heatmap +mod heatmap; mod palettes; -use clocksource::{DateTime, Nanoseconds, UnixInstant}; -use heatmap::*; +use clocksource::datetime::DateTime; +use clocksource::precise::{Duration, UnixInstant}; +pub use heatmap::Heatmap; pub use palettes::Palette; use image::*; @@ -80,11 +82,11 @@ impl WaterfallBuilder { } // find the bucket with the highest weight - fn max_weight(&self, heatmap: &heatmap::Heatmap) -> f64 { + fn max_weight(&self, heatmap: &Heatmap) -> f64 { let mut max_weight = 0.0; for slice in heatmap { for b in slice { - let weight = self.weight(b.count().into(), b.high() - b.low() + 1); + let weight = self.weight(b.count(), b.end() - b.start() + 1); if weight > max_weight { max_weight = weight; } @@ -94,7 +96,7 @@ impl WaterfallBuilder { } /// Generate the waterfall from the provided heatmap - pub fn build(self, heatmap: &heatmap::Heatmap) { + pub fn build(self, heatmap: &Heatmap) { let height = heatmap.active_slices(); let width = heatmap.buckets(); @@ -123,7 +125,7 @@ impl WaterfallBuilder { // build grayscale buffer for (y, slice) in heatmap.into_iter().enumerate() { for (x, b) in slice.into_iter().enumerate() { - let weight = self.weight(b.count().into(), b.high() - b.low() + 1); + let weight = self.weight(b.count(), b.end() - b.start() + 1); let scaled_weight = weight / max_weight; let index = (scaled_weight * (colors.len() - 1) as f64).round() as u8; buf.put_pixel( @@ -149,7 +151,7 @@ impl WaterfallBuilder { // set the pixels in the buffer for (y, slice) in heatmap.into_iter().enumerate() { for (x, b) in slice.into_iter().enumerate() { - let weight = self.weight(b.count().into(), b.high() - b.low() + 1); + let weight = self.weight(b.count(), b.end() - b.start() + 1); let scaled_weight = weight / max_weight; let index = (scaled_weight * (colors.len() - 1) as f64).round() as usize; let color = colors[index]; @@ -166,7 +168,7 @@ impl WaterfallBuilder { if !label_keys.is_empty() { let slice = heatmap.into_iter().next().unwrap(); for (x, bucket) in slice.into_iter().enumerate() { - let value = bucket.high(); + let value = bucket.end(); if value >= label_keys[l] { if let Some(label) = labels.get(&label_keys[l]) { render_text(label, 25.0, x, 0, &mut buf); @@ -187,7 +189,7 @@ impl WaterfallBuilder { } // add the timestamp labels along the left side - let now = UnixInstant::>::now(); + let now = UnixInstant::now(); let mut display_time = heatmap.start_at(); let ntick = (1 + now.duration_since(display_time).as_nanos() / heatmap.resolution().as_nanos()) as usize;