Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@
- Removed `SimpleConcurrentLogProcessor` and the `experimental_logs_concurrent_log_processor`
feature flag. The use cases it was designed for (ETW/user_events exporters) are
better served by modeling those exporters as processors directly.
- **Added** `Counter::bind()` and `Histogram::bind()` SDK implementations that
return pre-bound measurement handles (`BoundCounter<T>`, `BoundHistogram<T>`).
Bound instruments resolve the attribute-to-aggregator mapping once at bind time
and cache the result, eliminating per-call HashMap lookups. View attribute
filtering is applied at bind time so the hot path stays free of per-call
attribute processing. Bound and unbound recordings with the same (post-view)
attribute set always aggregate into the same data point, including the empty
attribute set. Bound entries are never evicted during delta collection while
a handle exists — idle cycles produce no export but the tracker persists. If
`bind()` is called at the cardinality limit, the handle binds directly to
the overflow tracker — its writes stay on the same direct (no-lookup) hot
path and consistently land in the `otel.metric.overflow=true` bucket for
the lifetime of the handle. To recover a bound handle after delta collection
frees space, drop the existing handle and call `bind()` again. Gated behind
the `experimental_metrics_bound_instruments` feature flag. Benchmarks show
~28x speedup for counter operations and ~9x for histograms.
- Delta metrics collection now uses in-place eviction instead of draining the
HashMap on every collect cycle. Stale attribute sets that received no measurements
since the last collection are evicted. Note: recovery from cardinality overflow
Expand Down
6 changes: 6 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ experimental_metrics_custom_reader = ["metrics"]
experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"]
experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"]
experimental_metrics_disable_name_validation = ["metrics"]
experimental_metrics_bound_instruments = ["metrics", "opentelemetry/experimental_metrics_bound_instruments"]
bench_profiling = []

[[bench]]
Expand Down Expand Up @@ -123,6 +124,11 @@ name = "log"
harness = false
required-features = ["logs"]

[[bench]]
name = "bound_instruments"
harness = false
required-features = ["metrics", "experimental_metrics_custom_reader", "experimental_metrics_bound_instruments", "spec_unstable_metrics_views"]

[lib]
bench = false

Expand Down
207 changes: 207 additions & 0 deletions opentelemetry-sdk/benches/bound_instruments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{metrics::MeterProvider as _, Key, KeyValue};
use opentelemetry_sdk::metrics::{Instrument, ManualReader, SdkMeterProvider, Stream, Temporality};

// Run this benchmark with:
// cargo bench --bench bound_instruments --features metrics,experimental_metrics_custom_reader,experimental_metrics_bound_instruments,spec_unstable_metrics_views
//
// Apple M4 Max, 16 cores (12 performance + 4 efficiency), macOS 15.4
//
// Results (3 attributes: method, status, path):
// Counter_Unbound_Delta time: [50.20 ns]
// Counter_Bound_Delta time: [ 1.80 ns] ~28x faster
// Counter_Bound_With_View_Delta time: [ 1.82 ns] view filter applied at bind, not on hot path
// Counter_Bound_AtOverflow_Delta time: [ 1.82 ns] bind() at cardinality limit binds directly to the overflow
// tracker — perf parity with a normal bind, no per-call resolution
// Histogram_Unbound_Delta time: [58.64 ns]
// Histogram_Bound_Delta time: [ 6.50 ns] ~9.0x faster
// Histogram_Bound_AtOverflow_Delta time: [ 6.58 ns] perf parity with a normal bind
// Counter_Bound_Multithread/2 time: [21.59 µs] (100 adds/thread)
// Counter_Bound_Multithread/4 time: [37.21 µs] (100 adds/thread)
// Counter_Bound_Multithread/8 time: [71.70 µs] (100 adds/thread)
//
// Note: criterion does not fail CI on regression by itself. These numbers are
// reference values for human review; use `cargo criterion --baseline` locally
// if you need automated comparison against a saved baseline.

fn create_provider(temporality: Temporality) -> SdkMeterProvider {
let reader = ManualReader::builder()
.with_temporality(temporality)
.build();
SdkMeterProvider::builder().with_reader(reader).build()
}

fn bench_bound_instruments(c: &mut Criterion) {
let mut group = c.benchmark_group("BoundInstruments");
group.sample_size(100);

let attrs = [
KeyValue::new("method", "GET"),
KeyValue::new("status", "200"),
KeyValue::new("path", "/api/v1/users"),
];

// Counter: Unbound vs Bound (Delta)
{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let counter = meter.u64_counter("unbound").build();
group.bench_function("Counter_Unbound_Delta", |b| {
b.iter(|| counter.add(1, &attrs));
});
}

{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let counter = meter.u64_counter("bound").build();
let bound = counter.bind(&attrs);
group.bench_function("Counter_Bound_Delta", |b| {
b.iter(|| bound.add(1));
});
}

// Counter: Bound with a View filter — confirms the filter is applied at
// bind() time and the hot path stays free of attribute processing.
{
let view = |i: &opentelemetry_sdk::metrics::Instrument| {
if i.name() == "bound_with_view" {
Stream::builder()
.with_allowed_attribute_keys(vec![
Key::new("method"),
Key::new("status"),
Key::new("path"),
])
.build()
.ok()
} else {
None
}
};
let reader = ManualReader::builder()
.with_temporality(Temporality::Delta)
.build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_view(view)
.build();
let meter = provider.meter("bench");
let counter = meter.u64_counter("bound_with_view").build();
let bound = counter.bind(&attrs);
group.bench_function("Counter_Bound_With_View_Delta", |b| {
b.iter(|| bound.add(1));
});
}

// Counter: Bound at overflow — confirms that binding when the cardinality
// limit is exhausted yields the same hot-path performance as a normal bind
// (writes go directly to the overflow tracker, no per-call resolution).
{
let cardinality_limit = 4;
let view = move |i: &Instrument| {
if i.name() == "bound_at_overflow" {
Stream::builder()
.with_cardinality_limit(cardinality_limit)
.build()
.ok()
} else {
None
}
};
let reader = ManualReader::builder()
.with_temporality(Temporality::Delta)
.build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_view(view)
.build();
let meter = provider.meter("bench");
let counter = meter.u64_counter("bound_at_overflow").build();
// Saturate cardinality with unbound calls so bind() lands in overflow.
for i in 0..cardinality_limit {
counter.add(1, &[KeyValue::new("filler", i as i64)]);
}
let bound = counter.bind(&attrs);
group.bench_function("Counter_Bound_AtOverflow_Delta", |b| {
b.iter(|| bound.add(1));
});
}

// Histogram: Unbound vs Bound (Delta)
{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let histogram = meter.f64_histogram("unbound_hist").build();
group.bench_function("Histogram_Unbound_Delta", |b| {
b.iter(|| histogram.record(1.5, &attrs));
});
}

{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let histogram = meter.f64_histogram("bound_hist").build();
let bound = histogram.bind(&attrs);
group.bench_function("Histogram_Bound_Delta", |b| {
b.iter(|| bound.record(1.5));
});
}

// Histogram: Bound at overflow — same property as the counter version.
{
let cardinality_limit = 4;
let view = move |i: &Instrument| {
if i.name() == "bound_hist_at_overflow" {
Stream::builder()
.with_cardinality_limit(cardinality_limit)
.build()
.ok()
} else {
None
}
};
let reader = ManualReader::builder()
.with_temporality(Temporality::Delta)
.build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_view(view)
.build();
let meter = provider.meter("bench");
let histogram = meter.f64_histogram("bound_hist_at_overflow").build();
for i in 0..cardinality_limit {
histogram.record(1.5, &[KeyValue::new("filler", i as i64)]);
}
let bound = histogram.bind(&attrs);
group.bench_function("Histogram_Bound_AtOverflow_Delta", |b| {
b.iter(|| bound.record(1.5));
});
}

// Multi-threaded bound counter
for num_threads in [2, 4, 8] {
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let counter = meter.u64_counter("mt_bound").build();
let bound = counter.bind(&attrs);

group.bench_function(format!("Counter_Bound_Multithread/{num_threads}"), |b| {
b.iter(|| {
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for _ in 0..100 {
bound.add(1);
}
});
}
});
});
});
}

group.finish();
}

criterion_group!(benches, bench_bound_instruments);
criterion_main!(benches);
27 changes: 27 additions & 0 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};

#[cfg(feature = "experimental_metrics_bound_instruments")]
use opentelemetry::metrics::BoundSyncInstrument;
use opentelemetry::{
metrics::{AsyncInstrument, SyncInstrument},
InstrumentationScope, Key, KeyValue,
};

#[cfg(feature = "experimental_metrics_bound_instruments")]
use crate::metrics::internal::BoundMeasure;
use crate::metrics::{aggregation::Aggregation, internal::Measure};

use super::meter::{
Expand Down Expand Up @@ -388,6 +392,29 @@ impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
measure.call(val, attrs)
}
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundSyncInstrument<T> + Send + Sync> {
let bound_measures: Vec<Box<dyn BoundMeasure<T>>> =
self.measures.iter().map(|m| m.bind(attrs)).collect();
Box::new(ResolvedBoundMeasures {
measures: bound_measures,
})
}
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
pub(crate) struct ResolvedBoundMeasures<T> {
measures: Vec<Box<dyn BoundMeasure<T>>>,
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T: Copy + 'static> BoundSyncInstrument<T> for ResolvedBoundMeasures<T> {
fn measure(&self, val: T) {
for measure in &self.measures {
measure.call(val);
}
}
}

#[derive(Clone)]
Expand Down
33 changes: 33 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,39 @@ use super::{
/// Receives measurements to be aggregated.
pub(crate) trait Measure<T>: Send + Sync + 'static {
fn call(&self, measurement: T, attrs: &[KeyValue]);

#[cfg(feature = "experimental_metrics_bound_instruments")]
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundMeasure<T>>;
}

/// A pre-bound measurement handle that bypasses attribute lookup.
#[cfg(feature = "experimental_metrics_bound_instruments")]
pub(crate) trait BoundMeasure<T>: Send + Sync + 'static {
fn call(&self, measurement: T);
}

/// A bound handle that drops every measurement silently. Used when
/// `ValueMap::bind` returns `None` because the trackers `RwLock` is poisoned —
/// an extremely rare degenerate state in which the SDK can no longer aggregate
/// reliably. Returning a noop here mirrors `measure()`'s own poison handling
/// (silent drop) rather than panicking on the user's hot path.
#[cfg(feature = "experimental_metrics_bound_instruments")]
pub(crate) struct NoopBoundMeasure<T> {
_marker: marker::PhantomData<T>,
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T> NoopBoundMeasure<T> {
pub(crate) fn new() -> Self {
Self {
_marker: marker::PhantomData,
}
}
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T: Send + Sync + 'static> BoundMeasure<T> for NoopBoundMeasure<T> {
fn call(&self, _measurement: T) {}
}

/// Stores the aggregate of measurements into the aggregation and returns the number
Expand Down
Loading
Loading