Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api_server_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation import api_tracing
from cloud_pipelines_backend.instrumentation import contextual_logging
from cloud_pipelines_backend.instrumentation import otel_tracing
from cloud_pipelines_backend.instrumentation import opentelemetry as otel

app = fastapi.FastAPI(
title="Cloud Pipelines API",
version="0.0.1",
separate_input_output_schemas=False,
)

# Configure OpenTelemetry tracing
otel_tracing.setup_api_tracing(app)
otel.setup_providers()
otel.instrument_fastapi(app)

# Add request context middleware for automatic request_id generation
app.add_middleware(api_tracing.RequestContextMiddleware)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
OpenTelemetry instrumentation public API.

Usage::

from cloud_pipelines_backend.instrumentation import opentelemetry as otel

otel.setup_providers()
otel.instrument_fastapi(app)
"""

from cloud_pipelines_backend.instrumentation.opentelemetry import auto_instrumentation
from cloud_pipelines_backend.instrumentation.opentelemetry import providers
from cloud_pipelines_backend.instrumentation.opentelemetry import tracing

instrument_fastapi = auto_instrumentation.instrument_fastapi
setup_providers = providers.setup
setup_tracing = tracing.setup

__all__ = [
"instrument_fastapi",
"setup_providers",
"setup_tracing",
]
Copy link
Contributor

@Ark-kun Ark-kun Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming module with a singular noun often conflicts with local variables. Better use plural or more uncountable. (Maybe, configuration is OK despite technically being singular noun...)

Kubernetes has

from kubernetes import config
from kubernetes import client

and it's very inconvenient, forcing me to rename those imports every time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair! Changed.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Shared OpenTelemetry configuration resolution.

Reads and validates OTel settings from environment variables.
"""

import dataclasses
import enum
import os


class ExporterProtocol(str, enum.Enum):
GRPC = "grpc"
HTTP = "http"


@dataclasses.dataclass(frozen=True, kw_only=True)
class OtelConfig:
endpoint: str
protocol: str
service_name: str


def resolve(service_name: str | None = None) -> OtelConfig | None:
"""Read and validate shared OTel configuration from environment variables.

Returns None if OTel is not configured (no exporter endpoint set).
Raises ValueError if the configuration is invalid.
"""
otel_endpoint = os.environ.get("TANGLE_OTEL_EXPORTER_ENDPOINT")
if not otel_endpoint:
return None

otel_protocol = os.environ.get(
"TANGLE_OTEL_EXPORTER_PROTOCOL", ExporterProtocol.GRPC
)

if service_name is None:
app_env = os.environ.get("TANGLE_ENV", "unknown")
service_name = f"tangle-{app_env}"

if not otel_endpoint.startswith(("http://", "https://")):
raise ValueError(
f"Invalid OTel endpoint format: {otel_endpoint}. "
f"Expected format: http://<host>:<port> or https://<host>:<port>"
)
try:
ExporterProtocol(otel_protocol)
except ValueError:
raise ValueError(
f"Invalid OTel protocol: {otel_protocol}. "
f"Expected values: {', '.join(e.value for e in ExporterProtocol)}"
)

return OtelConfig(
endpoint=otel_endpoint,
protocol=otel_protocol,
service_name=service_name,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
OpenTelemetry provider state checks.

Queries the global OpenTelemetry SDK state to determine which
providers have been configured.
"""

from opentelemetry import metrics as otel_metrics
from opentelemetry import trace
from opentelemetry.sdk import metrics as otel_sdk_metrics
from opentelemetry.sdk import trace as otel_sdk_trace


def has_configured_providers() -> bool:
"""Check whether any OpenTelemetry SDK providers have been configured globally.

Logs provider is omitted while the OpenTelemetry Logs API remains experimental.
"""
return isinstance(
trace.get_tracer_provider(), otel_sdk_trace.TracerProvider
) or isinstance(otel_metrics.get_meter_provider(), otel_sdk_metrics.MeterProvider)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
OpenTelemetry auto-instrumentation for FastAPI applications.

Instrumentation is only activated when at least one OpenTelemetry SDK
provider (traces, metrics, or logs) has been configured globally.
"""

import fastapi
import logging

from opentelemetry.instrumentation import fastapi as otel_fastapi

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import providers

_logger = logging.getLogger(__name__)


def instrument_fastapi(app: fastapi.FastAPI) -> None:
"""
Apply OpenTelemetry auto-instrumentation to a FastAPI application.

No-op if no OpenTelemetry SDK providers have been configured globally,
since there would be no backend to receive the telemetry data.

See: https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html

Args:
app: The FastAPI application instance to instrument.
"""
if not providers.has_configured_providers():
_logger.debug(
"Skipping FastAPI auto-instrumentation: no OpenTelemetry providers configured"
)
return

try:
otel_fastapi.FastAPIInstrumentor.instrument_app(app)
_logger.info("FastAPI auto-instrumentation enabled")
except Exception as e:
_logger.exception("Failed to apply FastAPI auto-instrumentation")
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
OpenTelemetry provider setup.

Provides entry points to configure OpenTelemetry providers.
"""

import logging

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import configuration
from cloud_pipelines_backend.instrumentation.opentelemetry import tracing

_logger = logging.getLogger(__name__)


def setup(service_name: str | None = None) -> None:
"""
Configure global OpenTelemetry providers (traces, metrics).

No-op if TANGLE_OTEL_EXPORTER_ENDPOINT is not set.

Use this for non-FastAPI entrypoints (e.g. orchestrators, workers) that
need telemetry but have no ASGI app to auto-instrument.

Args:
service_name: Override the default service name reported to the collector.
"""
try:
otel_config = configuration.resolve(service_name=service_name)
except Exception as e:
_logger.exception("Failed to resolve OpenTelemetry configuration")
return

if otel_config is None:
return

tracing.setup(
endpoint=otel_config.endpoint,
protocol=otel_config.protocol,
service_name=otel_config.service_name,
)

# TODO: Setup metrics provider once it's available
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
OpenTelemetry tracing configuration.

This module sets up the global tracer provider with an OTLP exporter.
"""

import logging

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc import (
trace_exporter as otel_grpc_trace_exporter,
)
from opentelemetry.exporter.otlp.proto.http import (
trace_exporter as otel_http_trace_exporter,
)
from opentelemetry.sdk import resources as otel_resources
from opentelemetry.sdk import trace as otel_trace
from opentelemetry.sdk.trace import export as otel_trace_export

from cloud_pipelines_backend.instrumentation.opentelemetry._internal import configuration

_logger = logging.getLogger(__name__)


def setup(endpoint: str, protocol: str, service_name: str) -> None:
"""
Configure the global OpenTelemetry tracer provider.

Args:
endpoint: The OTLP collector endpoint URL.
protocol: The exporter protocol ("grpc" or "http").
service_name: The service name reported to the collector.
"""
try:
_logger.info(
f"Configuring OpenTelemetry tracing, endpoint={endpoint}, "
f"protocol={protocol}, service_name={service_name}"
)

if protocol == configuration.ExporterProtocol.GRPC:
otel_exporter = otel_grpc_trace_exporter.OTLPSpanExporter(endpoint=endpoint)
else:
otel_exporter = otel_http_trace_exporter.OTLPSpanExporter(endpoint=endpoint)

resource = otel_resources.Resource(
attributes={otel_resources.SERVICE_NAME: service_name}
)
tracer_provider = otel_trace.TracerProvider(resource=resource)
span_processor = otel_trace_export.BatchSpanProcessor(otel_exporter)
tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(tracer_provider)

_logger.info("OpenTelemetry tracing configured successfully.")
except Exception as e:
_logger.exception("Failed to configure OpenTelemetry tracing")
97 changes: 0 additions & 97 deletions cloud_pipelines_backend/instrumentation/otel_tracing.py

This file was deleted.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ huggingface = [
"huggingface-hub[oauth]>=0.35.3",
]

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "--import-mode=importlib"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which errors required making this change?
Is this still needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will run the tests again tomorrow and confirm. And paste the error if it is still present.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The answer is yes, it is still needed.

Context:
--import-mode=importlib is required because the test layout has two files with the same name: test_providers.py exists in both tests/instrumentation/opentelemetry/ and tests/instrumentation/opentelemetry/_internal/. Pytest's default prepend import mode can't distinguish between identically-named test modules in sibling directories — it errors with "which is not the same as the test file we want to collect". importlib mode gives each file a unique identity based on its full path rather than its module name, which resolves the collision.

[tool.setuptools.packages.find]
include = ["cloud_pipelines_backend*"]
namespaces = true
Loading