diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index a304408ab63c..686a9943c902 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -9,6 +9,8 @@ ### Bugs Fixed ### Other Changes +- Move sampler argument extraction logic from distro to respective samplers + ([#45849](https://github.com/Azure/azure-sdk-for-python/pull/45849)) ## 1.0.0b49 (2026-03-19) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py index 3061fcafaa03..a772a59cd1db 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_rate_limited_sampling.py @@ -2,9 +2,11 @@ # Licensed under the MIT License. import math +import os import threading import time from typing import Optional, Sequence +from logging import getLogger from opentelemetry.context import Context from opentelemetry.trace import Link, SpanKind, format_trace_id from opentelemetry.sdk.trace.sampling import ( @@ -16,6 +18,10 @@ from opentelemetry.trace.span import TraceState from opentelemetry.util.types import Attributes +from opentelemetry.sdk.environment_variables import ( + OTEL_TRACES_SAMPLER_ARG, +) + from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY from azure.monitor.opentelemetry.exporter.export.trace._utils import ( @@ -24,6 +30,13 @@ parent_context_sampling, ) +_INVALID_TRACES_PER_SECOND_MESSAGE = "Invalid value '%s' for traces per second. Expected a float. Defaulting to %s." +_INVALID_TRACES_PER_SECOND_MESSAGE_NEGATIVE_VALUE = ( + "Invalid value '%s' for traces per second. It should be a non-negative number. Defaulting to %s" +) + +_logger = getLogger(__name__) + class _State: def __init__(self, effective_window_count: float, effective_window_nanoseconds: float, last_nano_time: int): @@ -33,13 +46,13 @@ def __init__(self, effective_window_count: float, effective_window_nanoseconds: class RateLimitedSamplingPercentage: - def __init__(self, target_spans_per_second_limit: float, round_to_nearest: bool = True): - if target_spans_per_second_limit < 0.0: + def __init__(self, traces_per_second: float, round_to_nearest: bool = True): + if traces_per_second < 0.0: raise ValueError("Limit for sampled spans per second must be nonnegative!") # Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes adaptation_time_seconds = 0.1 self._inverse_adaptation_time_nanoseconds = 1e-9 / adaptation_time_seconds - self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit + self._target_spans_per_nanosecond_limit = 1e-9 * traces_per_second initial_nano_time = int(time.time_ns()) self._state = _State(0.0, 0.0, initial_nano_time) self._lock = threading.Lock() @@ -82,9 +95,42 @@ def get(self) -> float: class RateLimitedSampler(Sampler): - def __init__(self, target_spans_per_second_limit: float): - self._sampling_percentage_generator = RateLimitedSamplingPercentage(target_spans_per_second_limit) - self._description = f"RateLimitedSampler{{{target_spans_per_second_limit}}}" + def __init__(self, traces_per_second: Optional[float] = None): + default_traces_per_second = 5.0 + if traces_per_second is not None: + try: + traces_per_second = float(traces_per_second) + if not math.isfinite(traces_per_second) or traces_per_second < 0.0: + _logger.error( + _INVALID_TRACES_PER_SECOND_MESSAGE_NEGATIVE_VALUE, traces_per_second, default_traces_per_second + ) + traces_per_second = default_traces_per_second + else: + _logger.info("Using rate limited sampler: %s traces per second", traces_per_second) + except (ValueError, TypeError): + _logger.error(_INVALID_TRACES_PER_SECOND_MESSAGE, traces_per_second, default_traces_per_second) + traces_per_second = default_traces_per_second + else: + sampling_arg = os.environ.get(OTEL_TRACES_SAMPLER_ARG) + try: + sampler_value = float(sampling_arg) if sampling_arg is not None else default_traces_per_second + if not math.isfinite(sampler_value) or sampler_value < 0.0: + _logger.error( + _INVALID_TRACES_PER_SECOND_MESSAGE_NEGATIVE_VALUE, sampler_value, default_traces_per_second + ) + traces_per_second = default_traces_per_second + else: + _logger.info("Using rate limited sampler: %s traces per second", sampler_value) + traces_per_second = sampler_value + except ValueError: + _logger.error( # pylint: disable=C0301 + _INVALID_TRACES_PER_SECOND_MESSAGE, + sampling_arg, + default_traces_per_second, + ) + traces_per_second = default_traces_per_second + self._sampling_percentage_generator = RateLimitedSamplingPercentage(traces_per_second) + self._description = f"RateLimitedSampler{{{traces_per_second}}}" def should_sample( self, diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py index 9f676bf50fa9..b707082f91fb 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_sampling.py @@ -1,6 +1,9 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import math +import os from typing import Optional, Sequence +from logging import getLogger from opentelemetry.context import Context from opentelemetry.trace import Link, SpanKind, format_trace_id @@ -13,10 +16,18 @@ from opentelemetry.trace.span import TraceState from opentelemetry.util.types import Attributes +from opentelemetry.sdk.environment_variables import ( + OTEL_TRACES_SAMPLER_ARG, +) + from azure.monitor.opentelemetry.exporter.export.trace._utils import _get_DJB2_sample_score from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY +_INVALID_FLOAT_MESSAGE = "Value of %s must be a float. Defaulting to %s." + +_logger = getLogger(__name__) + # Sampler is responsible for the following: # Implements same trace id hashing algorithm so that traces are sampled the same across multiple nodes (via AI SDKS) @@ -27,9 +38,47 @@ class ApplicationInsightsSampler(Sampler): """Sampler that implements the same probability sampling algorithm as the ApplicationInsights SDKs.""" # sampling_ratio must take a value in the range [0,1] - def __init__(self, sampling_ratio: float = 1.0): - if not 0.0 <= sampling_ratio <= 1.0: - raise ValueError("sampling_ratio must be in the range [0,1]") + def __init__(self, sampling_ratio: Optional[float] = None): + default_sampling_ratio = 1.0 + if sampling_ratio is not None: + try: + sampling_ratio = float(sampling_ratio) + if not math.isfinite(sampling_ratio) or sampling_ratio < 0.0 or sampling_ratio > 1.0: + _logger.error( + "Invalid value '%s' for sampling ratio. " + "Sampling ratio must be in the range [0.0, 1.0]. " + "Defaulting to %s.", + sampling_ratio, + default_sampling_ratio, + ) + sampling_ratio = default_sampling_ratio + except (ValueError, TypeError): + _logger.error( + "Invalid value '%s' for sampling ratio. Defaulting to %s.", sampling_ratio, default_sampling_ratio + ) + sampling_ratio = default_sampling_ratio + else: + sampling_arg = os.environ.get(OTEL_TRACES_SAMPLER_ARG) + try: + sampler_value = float(sampling_arg) if sampling_arg is not None else default_sampling_ratio + if not math.isfinite(sampler_value) or sampler_value < 0.0 or sampler_value > 1.0: + _logger.error( + "Invalid value '%s' for OTEL_TRACES_SAMPLER_ARG. " + "It should be a value between 0 and 1. Defaulting to %s.", + sampling_arg, + default_sampling_ratio, + ) + sampling_ratio = default_sampling_ratio + else: + _logger.info("Using sampling ratio: %s", sampler_value) + sampling_ratio = sampler_value + except ValueError: + _logger.error( # pylint: disable=C0301 + _INVALID_FLOAT_MESSAGE, + OTEL_TRACES_SAMPLER_ARG, + default_sampling_ratio, + ) + sampling_ratio = default_sampling_ratio self._ratio = sampling_ratio self._sample_rate = sampling_ratio * 100 diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py index 03133d065aef..8253f95bf4e8 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_rate_limited_sampling.py @@ -173,10 +173,81 @@ def test_sampler_creation(self): self.assertIsInstance(sampler, RateLimitedSampler) self.assertEqual(sampler.get_description(), f"RateLimitedSampler{{{target_rate}}}") - # Test that negative target rates raise a ValueError - def test_negative_rate_raises_error(self): - with self.assertRaises(ValueError): - RateLimitedSampler(-1.0) + # Test that negative explicit traces per second logs error and defaults to 5.0 + def test_negative_rate_defaults_to_5(self): + sampler = RateLimitedSampler(-1.0) + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test that non-numeric explicit traces per second logs error and defaults to 5.0 + def test_invalid_type_rate_defaults(self): + sampler = RateLimitedSampler({}) # type: ignore + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test default traces per second when no argument and no env var set + def test_constructor_default_traces_per_second(self): + with patch.dict("os.environ", {}, clear=True): + sampler = RateLimitedSampler() + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test traces per second is read from OTEL_TRACES_SAMPLER_ARG when no explicit value passed + def test_constructor_traces_per_second_from_env_var(self): + with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "10.0"}): + sampler = RateLimitedSampler() + self.assertEqual(sampler.get_description(), "RateLimitedSampler{10.0}") + + # Test explicit traces per second takes precedence over OTEL_TRACES_SAMPLER_ARG + def test_constructor_explicit_value_passed_through_distro(self): + with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "10.0"}): + traces_per_second = 2.0 + sampler = RateLimitedSampler(traces_per_second) + self.assertEqual(sampler.get_description(), f"RateLimitedSampler{{{traces_per_second}}}") + + # Test env var with negative value logs error and defaults to 5.0 + def test_constructor_env_var_negative(self): + with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "-5.0"}): + sampler = RateLimitedSampler() + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test env var with invalid float value logs error and defaults to 5.0 + def test_constructor_env_var_invalid_float(self): + with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "not_a_number"}): + sampler = RateLimitedSampler() + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test that infinite explicit traces per second logs error and defaults to 5.0 + def test_infinite_traces_per_second_explicit(self): + with patch.dict("os.environ", {}, clear=True): + sampler = RateLimitedSampler(float("inf")) + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + sampler = RateLimitedSampler(float("-inf")) + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test infinite value from env var falls back to 5.0 + def test_infinite_traces_per_second_env_var(self): + with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "inf"}): + sampler = RateLimitedSampler() + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "-inf"}): + sampler = RateLimitedSampler() + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test that NaN explicit traces per second logs error and defaults to 5.0 + def test_nan_traces_per_second_explicit(self): + with patch.dict("os.environ", {}, clear=True): + sampler = RateLimitedSampler(float("nan")) + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test that non-numeric explicit traces per second logs error and defaults to 5.0 + def test_non_numeric_traces_per_second_explicit(self): + with patch.dict("os.environ", {}, clear=True): + sampler = RateLimitedSampler("def") # type: ignore + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") + + # Test NaN value from env var falls back to 5.0 + def test_nan_traces_per_second_env_var(self): + with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "nan"}): + sampler = RateLimitedSampler() + self.assertEqual(sampler.get_description(), "RateLimitedSampler{5.0}") # Test sampling behavior with zero target rate def test_zero_rate_sampling(self): diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_sampling.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_sampling.py index 5d9d8b580ee6..eecc815e3341 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_sampling.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/trace/test_sampling.py @@ -11,19 +11,109 @@ # pylint: disable=protected-access class TestApplicationInsightsSampler(unittest.TestCase): + @mock.patch.dict("os.environ", {}, clear=True) def test_constructor(self): sampler = ApplicationInsightsSampler() self.assertEqual(sampler._ratio, 1.0) self.assertEqual(sampler._sample_rate, 100) + @mock.patch.dict("os.environ", {}, clear=True) def test_constructor_ratio(self): sampler = ApplicationInsightsSampler(0.75) self.assertEqual(sampler._ratio, 0.75) self.assertEqual(sampler._sample_rate, 75) + @mock.patch.dict("os.environ", {}, clear=True) def test_invalid_ratio(self): - self.assertRaises(ValueError, lambda: ApplicationInsightsSampler(1.01)) - self.assertRaises(ValueError, lambda: ApplicationInsightsSampler(-0.01)) + # Invalid explicit ratio logs an error and defaults to 1.0 instead of raising + sampler = ApplicationInsightsSampler(1.01) + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + sampler = ApplicationInsightsSampler(-0.01) + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + @mock.patch.dict("os.environ", {}, clear=True) + def test_invalid_type_ratio_defaults(self): + # Non-numeric explicit ratio logs an error and defaults to 1.0 + sampler = ApplicationInsightsSampler({}) # type: ignore + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + @mock.patch.dict("os.environ", {}, clear=True) + def test_user_passed_value_through_distro(self): + sampler = ApplicationInsightsSampler(sampling_ratio=0.5) + self.assertEqual(sampler._ratio, 0.5) + self.assertEqual(sampler._sample_rate, 50.0) + + def test_constructor_sampler_arg(self): + with mock.patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "0.5"}): + sampler = ApplicationInsightsSampler() + self.assertEqual(sampler._ratio, 0.5) + self.assertEqual(sampler._sample_rate, 50.0) + + def test_constructor_explicit_ratio_ignores_sampler_arg(self): + # Explicit ratio passed from distro takes priority over env var + with mock.patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "0.3"}): + sampler = ApplicationInsightsSampler(0.75) + self.assertEqual(sampler._ratio, 0.75) + self.assertEqual(sampler._sample_rate, 75.0) + + def test_constructor_sampler_arg_invalid_range(self): + # Invalid env var with no explicit ratio falls back to 1.0 + with mock.patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "1.5"}): + sampler = ApplicationInsightsSampler() + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + def test_constructor_sampler_arg_invalid_float(self): + # Non-numeric env var with no explicit ratio falls back to 1.0 + with mock.patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "not_a_number"}): + sampler = ApplicationInsightsSampler() + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + @mock.patch.dict("os.environ", {}, clear=True) + def test_infinite_ratio_explicit(self): + # Infinite explicit ratio logs an error and defaults to 1.0 + sampler = ApplicationInsightsSampler(float("inf")) + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + sampler = ApplicationInsightsSampler(float("-inf")) + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + @mock.patch.dict("os.environ", {}, clear=True) + def test_invalid_type_ratio_explicit(self): + # Non-numeric explicit ratio logs an error and defaults to 1.0 + sampler = ApplicationInsightsSampler("abc") # type: ignore + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + def test_infinite_ratio_env_var(self): + # Infinite value from env var falls back to 1.0 + with mock.patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "inf"}): + sampler = ApplicationInsightsSampler() + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + with mock.patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "-inf"}): + sampler = ApplicationInsightsSampler() + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + @mock.patch.dict("os.environ", {}, clear=True) + def test_nan_ratio_explicit(self): + # NaN explicit ratio logs an error and defaults to 1.0 + sampler = ApplicationInsightsSampler(float("nan")) + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) + + def test_nan_ratio_env_var(self): + # NaN value from env var falls back to 1.0 + with mock.patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "nan"}): + sampler = ApplicationInsightsSampler() + self.assertEqual(sampler._ratio, 1.0) + self.assertEqual(sampler._sample_rate, 100.0) @mock.patch("azure.monitor.opentelemetry.exporter.export.trace._sampling._get_DJB2_sample_score") def test_should_sample(self, score_mock): @@ -41,14 +131,17 @@ def test_should_sample_not_sampled(self, score_mock): self.assertEqual(result.attributes["_MS.sampleRate"], 50) self.assertFalse(result.decision.is_sampled()) + @mock.patch.dict("os.environ", {}, clear=True) def test_sampler_factory(self): sampler = azure_monitor_opentelemetry_sampler_factory("1.0") self.assertEqual(sampler._ratio, 1.0) + @mock.patch.dict("os.environ", {}, clear=True) def test_sampler_factory_none(self): sampler = azure_monitor_opentelemetry_sampler_factory(None) self.assertEqual(sampler._ratio, 1.0) + @mock.patch.dict("os.environ", {}, clear=True) def test_sampler_factory_empty(self): sampler = azure_monitor_opentelemetry_sampler_factory("") self.assertEqual(sampler._ratio, 1.0)