From 9ae210fae308fd296a26461007f4959eff4f746a Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Wed, 25 Feb 2026 03:12:34 -0800 Subject: [PATCH] chore!: Refactor OTel exporters to be signal-specific This allows users to set different exporters for traces vs metrics Made-with: Cursor --- .../opentelemetry/_internal/configuration.py | 67 +++++++++------ .../opentelemetry/providers.py | 17 ++-- .../_internal/test_configuration.py | 86 ++++++++++++------- .../opentelemetry/test_providers.py | 20 +++-- 4 files changed, 120 insertions(+), 70 deletions(-) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/configuration.py b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/configuration.py index d361c13..f02edd0 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/configuration.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/_internal/configuration.py @@ -15,30 +15,61 @@ class ExporterProtocol(str, enum.Enum): @dataclasses.dataclass(frozen=True, kw_only=True) -class OtelConfig: +class ExporterConfig: endpoint: str protocol: str + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class OtelConfig: service_name: str service_version: str + trace_exporter: ExporterConfig | None = None + + +def _resolve_exporter( + endpoint_var: str, + protocol_var: str, +) -> ExporterConfig | None: + endpoint = os.environ.get(endpoint_var) + if not endpoint: + return None + + protocol = os.environ.get(protocol_var, ExporterProtocol.GRPC) + + if not endpoint.startswith(("http://", "https://")): + raise ValueError( + f"Invalid OTel endpoint format: {endpoint}. " + f"Expected format: http://: or https://:" + ) + try: + ExporterProtocol(protocol) + except ValueError: + raise ValueError( + f"Invalid OTel protocol: {protocol}. " + f"Expected values: {', '.join(e.value for e in ExporterProtocol)}" + ) + + return ExporterConfig(endpoint=endpoint, protocol=protocol) def resolve( service_name: str | None = None, service_version: str | None = None, ) -> OtelConfig | None: - """Read and validate shared OTel configuration from environment variables. + """Read and validate OTel configuration from environment variables. - Returns None if OTel is not configured (no exporter endpoint set). - Raises ValueError if the configuration is invalid. + Returns None if no signals are configured (no exporter endpoints set). + Raises ValueError if any configured exporter has invalid settings. """ - otel_endpoint = os.environ.get("TANGLE_OTEL_EXPORTER_ENDPOINT") - if not otel_endpoint: - return None - - otel_protocol = os.environ.get( - "TANGLE_OTEL_EXPORTER_PROTOCOL", ExporterProtocol.GRPC + trace_exporter = _resolve_exporter( + endpoint_var="TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", + protocol_var="TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", ) + if trace_exporter is None: + return None + if service_name is None: app_env = os.environ.get("TANGLE_ENV", "unknown") service_name = f"tangle-{app_env}" @@ -46,22 +77,8 @@ def resolve( if service_version is None: service_version = os.environ.get("TANGLE_SERVICE_VERSION", "unknown") - if not otel_endpoint.startswith(("http://", "https://")): - raise ValueError( - f"Invalid OTel endpoint format: {otel_endpoint}. " - f"Expected format: http://: or https://:" - ) - try: - ExporterProtocol(otel_protocol) - except ValueError: - raise ValueError( - f"Invalid OTel protocol: {otel_protocol}. " - f"Expected values: {', '.join(e.value for e in ExporterProtocol)}" - ) - return OtelConfig( - endpoint=otel_endpoint, - protocol=otel_protocol, service_name=service_name, service_version=service_version, + trace_exporter=trace_exporter, ) diff --git a/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py b/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py index cd062ef..9769504 100644 --- a/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py +++ b/cloud_pipelines_backend/instrumentation/opentelemetry/providers.py @@ -21,7 +21,7 @@ def setup( """ Configure global OpenTelemetry providers (traces, metrics). - No-op if TANGLE_OTEL_EXPORTER_ENDPOINT is not set. + No-op if no signal-specific exporter endpoints are set. Use this for non-FastAPI entrypoints (e.g. orchestrators, workers) that need telemetry but have no ASGI app to auto-instrument. @@ -42,11 +42,10 @@ def setup( if otel_config is None: return - tracing.setup( - endpoint=otel_config.endpoint, - protocol=otel_config.protocol, - service_name=otel_config.service_name, - service_version=otel_config.service_version, - ) - - # TODO: Setup metrics provider once it's available + if otel_config.trace_exporter: + tracing.setup( + endpoint=otel_config.trace_exporter.endpoint, + protocol=otel_config.trace_exporter.protocol, + service_name=otel_config.service_name, + service_version=otel_config.service_version, + ) diff --git a/tests/instrumentation/opentelemetry/_internal/test_configuration.py b/tests/instrumentation/opentelemetry/_internal/test_configuration.py index ed366eb..1417ae1 100644 --- a/tests/instrumentation/opentelemetry/_internal/test_configuration.py +++ b/tests/instrumentation/opentelemetry/_internal/test_configuration.py @@ -24,36 +24,42 @@ def test_invalid_value_raises(self): class TestResolve: """Tests for configuration.resolve().""" - def test_returns_none_when_endpoint_not_set(self, monkeypatch): - monkeypatch.delenv("TANGLE_OTEL_EXPORTER_ENDPOINT", raising=False) + def test_returns_none_when_no_exporters_configured(self, monkeypatch): + monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", raising=False) result = configuration.resolve() assert result is None def test_returns_config_with_defaults(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") - monkeypatch.delenv("TANGLE_OTEL_EXPORTER_PROTOCOL", raising=False) + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) + monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", raising=False) monkeypatch.delenv("TANGLE_ENV", raising=False) monkeypatch.delenv("TANGLE_SERVICE_VERSION", raising=False) result = configuration.resolve() assert result is not None - assert result.endpoint == "http://localhost:4317" - assert result.protocol == configuration.ExporterProtocol.GRPC + assert result.trace_exporter.endpoint == "http://localhost:4317" + assert result.trace_exporter.protocol == configuration.ExporterProtocol.GRPC assert result.service_name == "tangle-unknown" assert result.service_version == "unknown" def test_uses_custom_service_name(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) result = configuration.resolve(service_name="my-api") assert result.service_name == "my-api" def test_service_name_includes_tangle_env(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) monkeypatch.setenv("TANGLE_ENV", "production") result = configuration.resolve() @@ -61,52 +67,62 @@ def test_service_name_includes_tangle_env(self, monkeypatch): assert result.service_name == "tangle-production" def test_respects_http_protocol(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4318") - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_PROTOCOL", "http") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4318" + ) + monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", "http") result = configuration.resolve() - assert result.protocol == configuration.ExporterProtocol.HTTP + assert result.trace_exporter.protocol == configuration.ExporterProtocol.HTTP def test_respects_grpc_protocol(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_PROTOCOL", "grpc") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) + monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", "grpc") result = configuration.resolve() - assert result.protocol == configuration.ExporterProtocol.GRPC + assert result.trace_exporter.protocol == configuration.ExporterProtocol.GRPC def test_raises_on_invalid_endpoint_format(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "localhost:4317") + monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "localhost:4317") with pytest.raises(ValueError, match="Invalid OTel endpoint format"): configuration.resolve() def test_raises_on_invalid_protocol(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_PROTOCOL", "websocket") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) + monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", "websocket") with pytest.raises(ValueError, match="Invalid OTel protocol"): configuration.resolve() def test_accepts_https_endpoint(self, monkeypatch): monkeypatch.setenv( - "TANGLE_OTEL_EXPORTER_ENDPOINT", "https://collector.example.com:4317" + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "https://collector.example.com:4317" ) result = configuration.resolve() - assert result.endpoint == "https://collector.example.com:4317" + assert result.trace_exporter.endpoint == "https://collector.example.com:4317" def test_uses_custom_service_version(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) result = configuration.resolve(service_version="abc123") assert result.service_version == "abc123" def test_service_version_from_env(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) monkeypatch.setenv("TANGLE_SERVICE_VERSION", "def456") result = configuration.resolve() @@ -114,7 +130,9 @@ def test_service_version_from_env(self, monkeypatch): assert result.service_version == "def456" def test_service_version_defaults_to_unknown(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) monkeypatch.delenv("TANGLE_SERVICE_VERSION", raising=False) result = configuration.resolve() @@ -122,19 +140,29 @@ def test_service_version_defaults_to_unknown(self, monkeypatch): assert result.service_version == "unknown" def test_config_is_frozen(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) result = configuration.resolve() with pytest.raises(AttributeError): - result.endpoint = "http://other:4317" + result.service_name = "other" - def test_config_requires_keyword_arguments(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + def test_exporter_config_is_frozen(self, monkeypatch): + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) result = configuration.resolve() + with pytest.raises(AttributeError): + result.trace_exporter.endpoint = "http://other:4317" + + def test_config_requires_keyword_arguments(self): + with pytest.raises(TypeError): + configuration.OtelConfig("my-service", "unknown") + + def test_exporter_config_requires_keyword_arguments(self): with pytest.raises(TypeError): - configuration.OtelConfig( - result.endpoint, result.protocol, result.service_name - ) + configuration.ExporterConfig("http://localhost:4317", "grpc") diff --git a/tests/instrumentation/opentelemetry/test_providers.py b/tests/instrumentation/opentelemetry/test_providers.py index 9ccacf7..b8a4207 100644 --- a/tests/instrumentation/opentelemetry/test_providers.py +++ b/tests/instrumentation/opentelemetry/test_providers.py @@ -11,8 +11,8 @@ class TestProvidersSetup: """Tests for providers.setup().""" - def test_noop_when_endpoint_not_set(self, monkeypatch): - monkeypatch.delenv("TANGLE_OTEL_EXPORTER_ENDPOINT", raising=False) + def test_noop_when_no_exporters_configured(self, monkeypatch): + monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", raising=False) providers.setup() @@ -21,15 +21,19 @@ def test_noop_when_endpoint_not_set(self, monkeypatch): ) def test_configures_tracer_provider(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") - monkeypatch.delenv("TANGLE_OTEL_EXPORTER_PROTOCOL", raising=False) + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) + monkeypatch.delenv("TANGLE_OTEL_TRACE_EXPORTER_PROTOCOL", raising=False) providers.setup() assert isinstance(trace.get_tracer_provider(), otel_sdk_trace.TracerProvider) def test_passes_custom_service_name(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) providers.setup(service_name="my-orchestrator") @@ -37,7 +41,9 @@ def test_passes_custom_service_name(self, monkeypatch): assert provider.resource.attributes["service.name"] == "my-orchestrator" def test_passes_custom_service_version(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv( + "TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "http://localhost:4317" + ) providers.setup(service_name="test-service", service_version="abc123") @@ -45,7 +51,7 @@ def test_passes_custom_service_version(self, monkeypatch): assert provider.resource.attributes["service.version"] == "abc123" def test_catches_validation_errors(self, monkeypatch): - monkeypatch.setenv("TANGLE_OTEL_EXPORTER_ENDPOINT", "bad-endpoint") + monkeypatch.setenv("TANGLE_OTEL_TRACE_EXPORTER_ENDPOINT", "bad-endpoint") providers.setup()