From c430e326b9d0977538a4f4ff9059fd2e6e16d993 Mon Sep 17 00:00:00 2001 From: Florian Engelhardt Date: Tue, 10 Mar 2026 11:18:57 +0100 Subject: [PATCH 1/2] Add benchmark for timestamped `add_sample` --- libdd-profiling/benches/add_samples.rs | 37 +++++++++++++++++++++----- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/libdd-profiling/benches/add_samples.rs b/libdd-profiling/benches/add_samples.rs index fc27d886b3..4edcd94dd1 100644 --- a/libdd-profiling/benches/add_samples.rs +++ b/libdd-profiling/benches/add_samples.rs @@ -106,12 +106,20 @@ pub fn bench_add_sample_vs_add2(c: &mut Criterion) { let functions = dict.functions(); let thread_id = get_current_thread_id(); let thread_id_key: StringId2 = strings.try_insert("thread id").unwrap().into(); - let labels_api = vec![api::Label { - key: "thread id", - str: "", - num: thread_id, - num_unit: "", - }]; + let labels_api = vec![ + api::Label { + key: "thread id", + str: "", + num: thread_id, + num_unit: "", + }, + api::Label { + key: "thread name", + str: "this thread", + num: 0, + num_unit: "", + }, + ]; let frames2 = frames.map(|f| { let set_id = functions @@ -128,6 +136,23 @@ pub fn bench_add_sample_vs_add2(c: &mut Criterion) { }); let dict = profiling::profiles::collections::Arc::try_new(dict).unwrap(); + c.bench_function("profile_add_sample_timestamped_x1000", |b| { + b.iter(|| { + let mut profile = profiling::internal::Profile::try_new(&sample_types, None).unwrap(); + let (locations, values) = make_stack_api(frames.as_slice()); + for i in 0..1000 { + let sample = api::Sample { + locations: locations.clone(), + values: &values, + labels: labels_api.clone(), + }; + let ts = std::num::NonZeroI64::new(i + 1); + black_box(profile.try_add_sample(sample, ts)).unwrap(); + } + black_box(profile.only_for_testing_num_aggregated_samples()) + }) + }); + c.bench_function("profile_add_sample_frames_x1000", |b| { b.iter(|| { let mut profile = profiling::internal::Profile::try_new(&sample_types, None).unwrap(); From 96fdc3bf6ae7ede2fee42a7317d9c33e3ab05941 Mon Sep 17 00:00:00 2001 From: Florian Engelhardt Date: Tue, 10 Mar 2026 11:03:14 +0100 Subject: [PATCH 2/2] add `BufWriter` to limit calls to `ZSTD_compressStream` --- .../observation/timestamped_observations.rs | 18 +++++++++++------- libdd-profiling/src/profiles/compressor.rs | 10 ++++++++++ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/libdd-profiling/src/internal/observation/timestamped_observations.rs b/libdd-profiling/src/internal/observation/timestamped_observations.rs index b21219ff9f..69a0282442 100644 --- a/libdd-profiling/src/internal/observation/timestamped_observations.rs +++ b/libdd-profiling/src/internal/observation/timestamped_observations.rs @@ -12,12 +12,12 @@ use crate::collections::identifiable::Id; use crate::internal::Timestamp; use crate::profiles::{DefaultObservationCodec as DefaultCodec, ObservationCodec}; use byteorder::{NativeEndian, ReadBytesExt}; -use std::io::{self, Write}; +use std::io::{self, BufWriter, Write}; pub type TimestampedObservations = TimestampedObservationsImpl; pub struct TimestampedObservationsImpl { - compressed_timestamped_data: C::Encoder, + compressed_timestamped_data: BufWriter, sample_types_len: usize, } @@ -40,10 +40,10 @@ impl TimestampedObservationsImpl { pub fn try_new(sample_types_len: usize) -> io::Result { Ok(Self { - compressed_timestamped_data: C::new_encoder( - Self::DEFAULT_BUFFER_SIZE, - Self::MAX_CAPACITY, - )?, + compressed_timestamped_data: BufWriter::with_capacity( + C::recommended_input_buf_size(), + C::new_encoder(Self::DEFAULT_BUFFER_SIZE, Self::MAX_CAPACITY)?, + ), sample_types_len, }) } @@ -74,8 +74,12 @@ impl TimestampedObservationsImpl { } pub fn try_into_iter(self) -> io::Result> { + let encoder = self + .compressed_timestamped_data + .into_inner() + .map_err(|e| e.into_error())?; Ok(TimestampedObservationsIterImpl { - decoder: C::encoder_into_decoder(self.compressed_timestamped_data)?, + decoder: C::encoder_into_decoder(encoder)?, sample_types_len: self.sample_types_len, }) } diff --git a/libdd-profiling/src/profiles/compressor.rs b/libdd-profiling/src/profiles/compressor.rs index 6de02b8abd..83dc326d48 100644 --- a/libdd-profiling/src/profiles/compressor.rs +++ b/libdd-profiling/src/profiles/compressor.rs @@ -145,6 +145,12 @@ pub trait ObservationCodec { fn new_encoder(size_hint: usize, max_capacity: usize) -> io::Result; fn encoder_into_decoder(encoder: Self::Encoder) -> io::Result; + + /// Returns the recommended input buffer size for the encoder. + /// Used to size the `BufWriter` that wraps the encoder. + fn recommended_input_buf_size() -> usize { + 0 + } } #[allow(unused)] @@ -181,6 +187,10 @@ impl ObservationCodec for ZstdObservationCodec { Err((_enc, error)) => Err(error), } } + + fn recommended_input_buf_size() -> usize { + zstd::Encoder::::recommended_input_size() + } } #[cfg(not(miri))]