Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
### Bugs Fixed

### Other Changes
- Move sampler argument extraction logic from distro to respective samplers
([#45849](https://github.com/Azure/azure-sdk-for-python/pull/45849))

## 1.0.0b49 (2026-03-19)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# Licensed under the MIT License.

import math
import os
import threading
import time
from typing import Optional, Sequence
from logging import getLogger
from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, format_trace_id
from opentelemetry.sdk.trace.sampling import (
Expand All @@ -16,6 +18,10 @@
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes

from opentelemetry.sdk.environment_variables import (
OTEL_TRACES_SAMPLER_ARG,
)

from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY

from azure.monitor.opentelemetry.exporter.export.trace._utils import (
Expand All @@ -24,6 +30,13 @@
parent_context_sampling,
)

# Log template used when the configured traces-per-second value cannot be converted to a float.
# Placeholders: the offending value, then the default that is used instead.
_INVALID_TRACES_PER_SECOND_MESSAGE = "Invalid value '%s' for traces per second. Expected a float. Defaulting to %s."
# Log template used when the value converts to a float but is negative or not finite (NaN/inf).
# Placeholders: the offending value, then the default that is used instead.
_INVALID_TRACES_PER_SECOND_MESSAGE_NEGATIVE_VALUE = (
"Invalid value '%s' for traces per second. It should be a non-negative number. Defaulting to %s"
)

# Module-level logger used to report sampler configuration problems and the chosen rate.
_logger = getLogger(__name__)


class _State:
def __init__(self, effective_window_count: float, effective_window_nanoseconds: float, last_nano_time: int):
Expand All @@ -33,13 +46,13 @@ def __init__(self, effective_window_count: float, effective_window_nanoseconds:


class RateLimitedSamplingPercentage:
def __init__(self, target_spans_per_second_limit: float, round_to_nearest: bool = True):
if target_spans_per_second_limit < 0.0:
def __init__(self, traces_per_second: float, round_to_nearest: bool = True):
if traces_per_second < 0.0:
raise ValueError("Limit for sampled spans per second must be nonnegative!")
# Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes
adaptation_time_seconds = 0.1
self._inverse_adaptation_time_nanoseconds = 1e-9 / adaptation_time_seconds
self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit
self._target_spans_per_nanosecond_limit = 1e-9 * traces_per_second
initial_nano_time = int(time.time_ns())
self._state = _State(0.0, 0.0, initial_nano_time)
self._lock = threading.Lock()
Expand Down Expand Up @@ -82,9 +95,42 @@ def get(self) -> float:


class RateLimitedSampler(Sampler):
def __init__(self, target_spans_per_second_limit: float):
self._sampling_percentage_generator = RateLimitedSamplingPercentage(target_spans_per_second_limit)
self._description = f"RateLimitedSampler{{{target_spans_per_second_limit}}}"
def __init__(self, traces_per_second: Optional[float] = None):
    """Create a sampler that targets a fixed number of sampled traces per second.

    The rate is resolved in this order: the explicit ``traces_per_second`` argument,
    then the ``OTEL_TRACES_SAMPLER_ARG`` environment variable, then a default of 5.0.
    An invalid value never raises; it is logged and replaced by the default.

    :param traces_per_second: Target number of traces per second. Anything accepted by
        ``float()`` is allowed. ``None`` means "read the environment variable instead".
    """
    default_traces_per_second = 5.0

    configured_value = traces_per_second
    if configured_value is None:
        configured_value = os.environ.get(OTEL_TRACES_SAMPLER_ARG)
        # OpenTelemetry treats an empty environment variable the same as an unset one,
        # so fall back to the default quietly instead of reporting a parse error.
        if configured_value is not None and not configured_value.strip():
            configured_value = None

    resolved_rate = default_traces_per_second
    if configured_value is None:
        _logger.info("Using rate limited sampler: %s traces per second", resolved_rate)
    else:
        # Keep the try body down to the one call that can raise.
        try:
            parsed_rate = float(configured_value)
        except (ValueError, TypeError):
            _logger.error(_INVALID_TRACES_PER_SECOND_MESSAGE, configured_value, default_traces_per_second)
        else:
            # isfinite() rejects NaN and +/-inf before the sign check runs.
            if math.isfinite(parsed_rate) and parsed_rate >= 0.0:
                _logger.info("Using rate limited sampler: %s traces per second", parsed_rate)
                resolved_rate = parsed_rate
            else:
                _logger.error(
                    _INVALID_TRACES_PER_SECOND_MESSAGE_NEGATIVE_VALUE, parsed_rate, default_traces_per_second
                )

    self._sampling_percentage_generator = RateLimitedSamplingPercentage(resolved_rate)
    self._description = f"RateLimitedSampler{{{resolved_rate}}}"

def should_sample(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import math
import os
from typing import Optional, Sequence
from logging import getLogger

from opentelemetry.context import Context
from opentelemetry.trace import Link, SpanKind, format_trace_id
Expand All @@ -13,10 +16,18 @@
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes

from opentelemetry.sdk.environment_variables import (
OTEL_TRACES_SAMPLER_ARG,
)

from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_DJB2_sample_score

from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY

# Log template used when a setting cannot be parsed as a float.
# Placeholders: the name of the setting, then the default that is used instead.
_INVALID_FLOAT_MESSAGE = "Value of %s must be a float. Defaulting to %s."

# Module-level logger used to report sampler configuration problems and the chosen ratio.
_logger = getLogger(__name__)


# Sampler is responsible for the following:
# Implements same trace id hashing algorithm so that traces are sampled the same across multiple nodes (via AI SDKS)
Expand All @@ -27,9 +38,47 @@ class ApplicationInsightsSampler(Sampler):
"""Sampler that implements the same probability sampling algorithm as the ApplicationInsights SDKs."""

# sampling_ratio must take a value in the range [0,1]
def __init__(self, sampling_ratio: float = 1.0):
if not 0.0 <= sampling_ratio <= 1.0:
raise ValueError("sampling_ratio must be in the range [0,1]")
def __init__(self, sampling_ratio: Optional[float] = None):
    """Create a fixed-percentage sampler.

    The ratio is resolved in this order: the explicit ``sampling_ratio`` argument,
    then the ``OTEL_TRACES_SAMPLER_ARG`` environment variable, then a default of 1.0.
    An invalid value never raises; it is logged and replaced by the default.

    :param sampling_ratio: Sampling ratio in the range [0.0, 1.0]. Anything accepted by
        ``float()`` is allowed. ``None`` means "read the environment variable instead".
    """
    default_sampling_ratio = 1.0
    resolved_ratio = default_sampling_ratio

    if sampling_ratio is not None:
        # Keep the try body down to the one call that can raise.
        try:
            explicit_ratio = float(sampling_ratio)
        except (ValueError, TypeError):
            _logger.error(
                "Invalid value '%s' for sampling ratio. Defaulting to %s.", sampling_ratio, default_sampling_ratio
            )
        else:
            # isfinite() rejects NaN and +/-inf before the range check runs.
            if math.isfinite(explicit_ratio) and 0.0 <= explicit_ratio <= 1.0:
                resolved_ratio = explicit_ratio
            else:
                _logger.error(
                    "Invalid value '%s' for sampling ratio. "
                    "Sampling ratio must be in the range [0.0, 1.0]. "
                    "Defaulting to %s.",
                    explicit_ratio,
                    default_sampling_ratio,
                )
    else:
        sampling_arg = os.environ.get(OTEL_TRACES_SAMPLER_ARG)
        # OpenTelemetry treats an empty environment variable the same as an unset one,
        # so it selects the default without being reported as an error.
        if sampling_arg is None or not sampling_arg.strip():
            _logger.info("Using sampling ratio: %s", default_sampling_ratio)
        else:
            try:
                env_ratio = float(sampling_arg)
            except ValueError:
                _logger.error(  # pylint: disable=C0301
                    _INVALID_FLOAT_MESSAGE,
                    OTEL_TRACES_SAMPLER_ARG,
                    default_sampling_ratio,
                )
            else:
                if math.isfinite(env_ratio) and 0.0 <= env_ratio <= 1.0:
                    _logger.info("Using sampling ratio: %s", env_ratio)
                    resolved_ratio = env_ratio
                else:
                    _logger.error(
                        "Invalid value '%s' for OTEL_TRACES_SAMPLER_ARG. "
                        "It should be a value between 0 and 1. Defaulting to %s.",
                        sampling_arg,
                        default_sampling_ratio,
                    )

    self._ratio = resolved_ratio
    # Same ratio expressed as a percentage.
    self._sample_rate = resolved_ratio * 100

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,81 @@ def test_sampler_creation(self):
self.assertIsInstance(sampler, RateLimitedSampler)
self.assertEqual(sampler.get_description(), f"RateLimitedSampler{{{target_rate}}}")

# Test that negative target rates raise a ValueError
def test_negative_rate_raises_error(self):
with self.assertRaises(ValueError):
RateLimitedSampler(-1.0)
# Test that negative explicit traces per second logs error and defaults to 5.0
def test_negative_rate_defaults_to_5(self):
    """A negative explicit rate is replaced by the 5.0 default."""
    description = RateLimitedSampler(-1.0).get_description()
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test that non-numeric explicit traces per second logs error and defaults to 5.0
def test_invalid_type_rate_defaults(self):
    """An explicit value that cannot be converted to float is replaced by the 5.0 default."""
    not_a_number = {}
    description = RateLimitedSampler(not_a_number).get_description()  # type: ignore
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test default traces per second when no argument and no env var set
def test_constructor_default_traces_per_second(self):
    """With no argument and an empty environment the sampler uses 5.0."""
    with patch.dict("os.environ", {}, clear=True):
        description = RateLimitedSampler().get_description()
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test traces per second is read from OTEL_TRACES_SAMPLER_ARG when no explicit value passed
def test_constructor_traces_per_second_from_env_var(self):
    """With no argument the rate is taken from OTEL_TRACES_SAMPLER_ARG."""
    env_overrides = {"OTEL_TRACES_SAMPLER_ARG": "10.0"}
    with patch.dict("os.environ", env_overrides):
        description = RateLimitedSampler().get_description()
    self.assertEqual(description, "RateLimitedSampler{10.0}")

# Test explicit traces per second takes precedence over OTEL_TRACES_SAMPLER_ARG
def test_constructor_explicit_value_passed_through_distro(self):
    """An explicit rate takes precedence over OTEL_TRACES_SAMPLER_ARG."""
    traces_per_second = 2.0
    expected_description = f"RateLimitedSampler{{{traces_per_second}}}"
    with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "10.0"}):
        description = RateLimitedSampler(traces_per_second).get_description()
    self.assertEqual(description, expected_description)

# Test env var with negative value logs error and defaults to 5.0
def test_constructor_env_var_negative(self):
    """A negative OTEL_TRACES_SAMPLER_ARG is replaced by the 5.0 default."""
    env_overrides = {"OTEL_TRACES_SAMPLER_ARG": "-5.0"}
    with patch.dict("os.environ", env_overrides):
        description = RateLimitedSampler().get_description()
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test env var with invalid float value logs error and defaults to 5.0
def test_constructor_env_var_invalid_float(self):
    """A non-numeric OTEL_TRACES_SAMPLER_ARG is replaced by the 5.0 default."""
    env_overrides = {"OTEL_TRACES_SAMPLER_ARG": "not_a_number"}
    with patch.dict("os.environ", env_overrides):
        description = RateLimitedSampler().get_description()
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test that infinite explicit traces per second logs error and defaults to 5.0
def test_infinite_traces_per_second_explicit(self):
    """Positive and negative infinity passed explicitly both fall back to 5.0."""
    with patch.dict("os.environ", {}, clear=True):
        # Same order as before: +inf first, then -inf.
        for infinite_rate in (float("inf"), float("-inf")):
            sampler = RateLimitedSampler(infinite_rate)
            self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}")

# Test infinite value from env var falls back to 5.0
def test_infinite_traces_per_second_env_var(self):
    """Positive and negative infinity in OTEL_TRACES_SAMPLER_ARG both fall back to 5.0."""
    # Each value gets its own patched environment, in the original order.
    for env_value in ("inf", "-inf"):
        with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": env_value}):
            sampler = RateLimitedSampler()
            self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}")

# Test that NaN explicit traces per second logs error and defaults to 5.0
def test_nan_traces_per_second_explicit(self):
    """NaN passed explicitly falls back to 5.0."""
    with patch.dict("os.environ", {}, clear=True):
        description = RateLimitedSampler(float("nan")).get_description()
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test that non-numeric explicit traces per second logs error and defaults to 5.0
def test_non_numeric_traces_per_second_explicit(self):
    """A non-numeric string passed explicitly falls back to 5.0."""
    with patch.dict("os.environ", {}, clear=True):
        description = RateLimitedSampler("def").get_description()  # type: ignore
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test NaN value from env var falls back to 5.0
def test_nan_traces_per_second_env_var(self):
    """NaN in OTEL_TRACES_SAMPLER_ARG falls back to 5.0."""
    env_overrides = {"OTEL_TRACES_SAMPLER_ARG": "nan"}
    with patch.dict("os.environ", env_overrides):
        description = RateLimitedSampler().get_description()
    self.assertEqual(description, "RateLimitedSampler{5.0}")

# Test sampling behavior with zero target rate
def test_zero_rate_sampling(self):
Expand Down
Loading
Loading