From 095b1aa3606709de0c016d2418078aaef75640de Mon Sep 17 00:00:00 2001 From: Manish Gupta <59428681+mguptahub@users.noreply.github.com> Date: Thu, 28 May 2026 18:34:27 +0530 Subject: [PATCH] [WEB-7447] feat: migrate CE telemetry from OTLP traces to OTLP metrics (#9156) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [WEB-7447] feat: migrate CE telemetry from OTLP traces to OTLP metrics Replace span-based tracing (tracer.py) with OTLP observable gauges, mirroring the approach already used in plane-ee. Key changes: - Add otlp_endpoints.py — shared gRPC/HTTP endpoint helpers - Add telemetry_metrics.py — push_instance_metrics task using MeterProvider + observable gauges (service name: plane-ce-api) - User count excludes bots (is_bot=False) - Page count excludes bot-owned private pages only - Domain derived from WEB_URL env var - Celery beat entry replaced with timedelta schedule + configurable METRICS_PUSH_INTERVAL_MINUTES (default 360 min) - Add explicit opentelemetry-exporter-otlp-proto-grpc dep - Delete tracer.py and telemetry.py (no longer needed) Co-authored-by: Plane AI * fix: address review comments on CE telemetry metrics - harden grpc_endpoint_from_url for scheme-less OTLP_ENDPOINT values (e.g. "telemetry.plane.so:4317") by prepending "//" before urlparse - fix WEB_URL domain extraction for scheme-less values with same approach - replace N+1 workspace count queries (6×N) with 6 batched annotate(Count) aggregation queries — reduces DB load significantly at WORKSPACE_METRICS_LIMIT - add deterministic ordering (order_by created_at) to workspace slice - harden METRICS_PUSH_INTERVAL_MINUTES env parsing with try/except guard and positive-value validation to avoid crash on malformed input Co-authored-by: Plane AI * fix: cap METRICS_PUSH_INTERVAL_MINUTES to prevent timedelta overflow Add upper-bound check (10_000_000 minutes) and catch OverflowError alongside ValueError so an arbitrarily large env value cannot crash worker startup via timedelta(minutes=...) OverflowError. Co-authored-by: Plane AI --------- Co-authored-by: Plane AI --- apps/api/plane/celery.py | 23 +- .../license/bgtasks/telemetry_metrics.py | 381 ++++++++++++++++++ apps/api/plane/license/bgtasks/tracer.py | 105 ----- .../management/commands/register_instance.py | 6 +- apps/api/plane/settings/common.py | 2 +- apps/api/plane/utils/otlp_endpoints.py | 59 +++ apps/api/plane/utils/telemetry.py | 62 --- apps/api/requirements/base.txt | 1 + 8 files changed, 464 insertions(+), 175 deletions(-) create mode 100644 apps/api/plane/license/bgtasks/telemetry_metrics.py delete mode 100644 apps/api/plane/license/bgtasks/tracer.py create mode 100644 apps/api/plane/utils/otlp_endpoints.py delete mode 100644 apps/api/plane/utils/telemetry.py diff --git a/apps/api/plane/celery.py b/apps/api/plane/celery.py index c2455a9a51c..8ae7c7b7051 100644 --- a/apps/api/plane/celery.py +++ b/apps/api/plane/celery.py @@ -5,12 +5,13 @@ # Python imports import os import logging +from datetime import timedelta # Third party imports from celery import Celery from pythonjsonlogger.json import JsonFormatter from celery.signals import after_setup_logger, after_setup_task_logger -from celery.schedules import crontab +from celery.schedules import crontab, schedule # Module imports from plane.settings.redis import redis_instance @@ -20,6 +21,20 @@ ri = redis_instance() +# Configurable metrics push interval (in minutes) +# Default: 360 (6 hours), set to 5 for development/testing +def _get_metrics_push_interval_minutes() -> int: + raw = os.environ.get("METRICS_PUSH_INTERVAL_MINUTES", "360") + try: + value = int(raw) + # Cap at 10,000,000 minutes to prevent timedelta(minutes=...) OverflowError + # on arbitrarily large inputs while still allowing multi-year intervals. + return value if 0 < value <= 10_000_000 else 360 + except (ValueError, OverflowError): + return 360 + +METRICS_PUSH_INTERVAL_MINUTES = _get_metrics_push_interval_minutes() + app = Celery("plane") # Using a string here means the worker will not have to @@ -32,9 +47,9 @@ "task": "plane.bgtasks.email_notification_task.stack_email_notification", "schedule": crontab(minute="*/5"), # Every 5 minutes }, - "run-every-6-hours-for-instance-trace": { - "task": "plane.license.bgtasks.tracer.instance_traces", - "schedule": crontab(hour="*/6", minute=0), # Every 6 hours + "push-instance-metrics": { + "task": "plane.license.bgtasks.telemetry_metrics.push_instance_metrics", + "schedule": schedule(run_every=timedelta(minutes=METRICS_PUSH_INTERVAL_MINUTES)), }, # Occurs once every day "check-every-day-to-delete-hard-delete": { diff --git a/apps/api/plane/license/bgtasks/telemetry_metrics.py b/apps/api/plane/license/bgtasks/telemetry_metrics.py new file mode 100644 index 00000000000..41dcb0a74fa --- /dev/null +++ b/apps/api/plane/license/bgtasks/telemetry_metrics.py @@ -0,0 +1,381 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +# Python imports +import os +import logging +from urllib.parse import urlparse + +# Third party imports +from celery import shared_task +from django.db.models import Count +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import Resource + +# Module imports +from plane.utils.otlp_endpoints import get_otlp_grpc_endpoint, get_otlp_http_metrics_url +from plane.license.models import Instance +from plane.db.models import ( + User, + Workspace, + Project, + Issue, + Module, + Cycle, + CycleIssue, + ModuleIssue, + Page, + WorkspaceMember, +) + +logger = logging.getLogger(__name__) + +WORKSPACE_METRICS_LIMIT = 1000 +FLUSH_TIMEOUT_MILLIS = 30000 +EXPORT_INTERVAL_MILLIS = 20000 + + +def _create_otlp_metric_exporter(): + """ + Create OTLP metric exporter based on OTLP_METRICS_PROTOCOL (http or grpc). + Uses shared endpoint helpers so metrics and traces target the same collector. + Default is grpc; override with OTLP_METRICS_PROTOCOL=http if needed. + """ + protocol = (os.environ.get("OTLP_METRICS_PROTOCOL") or "grpc").strip().lower() + + if protocol == "grpc": + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter as GrpcOTLPMetricExporter, + ) + + grpc_endpoint = get_otlp_grpc_endpoint() + insecure = os.environ.get("OTEL_EXPORTER_OTLP_METRICS_INSECURE", "").lower() == "true" + return GrpcOTLPMetricExporter(endpoint=grpc_endpoint, insecure=insecure) + + # HTTP fallback + from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter as HttpOTLPMetricExporter, + ) + + return HttpOTLPMetricExporter(endpoint=get_otlp_http_metrics_url()) + + +def _collect_and_push_metrics() -> None: + """ + Collect instance metrics and push them to OTEL collector. + + Uses OTEL metrics SDK to push gauge metrics directly to the collector, + replacing the previous span-based tracing approach. + """ + # Check if the instance is registered + instance = Instance.objects.first() + + if instance is None: + logger.debug("No instance registered, skipping metrics push") + return + + if not instance.is_telemetry_enabled: + logger.debug("Telemetry disabled, skipping metrics push") + return + + # Configure OTEL metrics (gRPC default, or HTTP if OTLP_METRICS_PROTOCOL=http) + protocol = (os.environ.get("OTLP_METRICS_PROTOCOL") or "grpc").strip().lower() + export_endpoint = get_otlp_grpc_endpoint() if protocol == "grpc" else get_otlp_http_metrics_url() + + service_name = os.environ.get("SERVICE_NAME", "plane-ce-api") + + # Create resource with instance identification for the collector + resource = Resource.create({ + "service.name": service_name, + "instance_id": str(instance.instance_id or ""), + "plane.instance.type": "self-hosted", + }) + + # Configure the OTLP metric exporter (HTTP or gRPC) + logger.info(f"Configuring OTLP exporter: protocol={protocol}, endpoint={export_endpoint}") + exporter = _create_otlp_metric_exporter() + reader = PeriodicExportingMetricReader( + exporter, + export_interval_millis=EXPORT_INTERVAL_MILLIS, + ) + + # Create a new MeterProvider per execution. Gauges use callbacks that capture + # current DB counts, so we need fresh meters each run. provider.shutdown() in + # finally ensures clean teardown. For a 6-hour periodic task, this overhead is acceptable. + provider = MeterProvider(resource=resource, metric_readers=[reader]) + + try: + # Get a meter + meter = provider.get_meter(__name__) + + # Collect instance-level counts + user_count = User.objects.filter(is_bot=False).count() + workspace_count = Workspace.objects.count() + project_count = Project.objects.count() + issue_count = Issue.objects.count() + module_count = Module.objects.count() + cycle_count = Cycle.objects.count() + cycle_issue_count = CycleIssue.objects.count() + module_issue_count = ModuleIssue.objects.count() + page_count = Page.objects.exclude(owned_by__is_bot=True, access=1).count() + + # Derive domain from WEB_URL env var (e.g. https://plane.acmecorp.com -> plane.acmecorp.com). + # Prepend "//" for scheme-less values (e.g. "plane.acmecorp.com") so urlparse + # populates netloc correctly instead of treating the host as a path component. + web_url = os.environ.get("WEB_URL", "") + if web_url and "://" not in web_url: + web_url = "//" + web_url + domain = urlparse(web_url).netloc if web_url else "" + + # Common attributes for all instance-level metrics + instance_attrs = { + "instance_id": str(instance.instance_id or ""), + "instance_name": str(instance.instance_name or ""), + "current_version": str(instance.current_version or ""), + "latest_version": str(instance.latest_version or ""), + "edition": str(instance.edition or ""), + "domain": domain, + "is_verified": str(instance.is_verified).lower(), + "is_setup_done": str(instance.is_setup_done).lower(), + } + + # Create gauge callbacks for instance-level metrics + def users_callback(_options): + yield metrics.Observation(user_count, instance_attrs) + + def workspaces_callback(_options): + yield metrics.Observation(workspace_count, instance_attrs) + + def projects_callback(_options): + yield metrics.Observation(project_count, instance_attrs) + + def issues_callback(_options): + yield metrics.Observation(issue_count, instance_attrs) + + def modules_callback(_options): + yield metrics.Observation(module_count, instance_attrs) + + def cycles_callback(_options): + yield metrics.Observation(cycle_count, instance_attrs) + + def cycle_issues_callback(_options): + yield metrics.Observation(cycle_issue_count, instance_attrs) + + def module_issues_callback(_options): + yield metrics.Observation(module_issue_count, instance_attrs) + + def pages_callback(_options): + yield metrics.Observation(page_count, instance_attrs) + + # Register observable gauges for instance metrics + meter.create_observable_gauge( + name="plane_instance_users_total", + description="Total number of users in the Plane instance", + callbacks=[users_callback], + ) + meter.create_observable_gauge( + name="plane_instance_workspaces_total", + description="Total number of workspaces", + callbacks=[workspaces_callback], + ) + meter.create_observable_gauge( + name="plane_instance_projects_total", + description="Total number of projects across all workspaces", + callbacks=[projects_callback], + ) + meter.create_observable_gauge( + name="plane_instance_issues_total", + description="Total number of issues across all projects", + callbacks=[issues_callback], + ) + meter.create_observable_gauge( + name="plane_instance_modules_total", + description="Total number of modules", + callbacks=[modules_callback], + ) + meter.create_observable_gauge( + name="plane_instance_cycles_total", + description="Total number of cycles", + callbacks=[cycles_callback], + ) + meter.create_observable_gauge( + name="plane_instance_cycle_issues_total", + description="Total number of issues in cycles", + callbacks=[cycle_issues_callback], + ) + meter.create_observable_gauge( + name="plane_instance_module_issues_total", + description="Total number of issues in modules", + callbacks=[module_issues_callback], + ) + meter.create_observable_gauge( + name="plane_instance_pages_total", + description="Total number of pages", + callbacks=[pages_callback], + ) + + # Collect workspace-level metrics (limited to WORKSPACE_METRICS_LIMIT). + # Fetch workspaces in a deterministic order so the slice is stable across runs. + # Counts are batched into 6 aggregation queries instead of 6×N per-workspace + # queries (avoids N+1 at scale when WORKSPACE_METRICS_LIMIT is large). + instance_id_str = str(instance.instance_id or "") + workspaces = list(Workspace.objects.order_by("created_at")[:WORKSPACE_METRICS_LIMIT]) + workspace_ids = [ws.id for ws in workspaces] + + project_counts = dict( + Project.objects.filter(workspace_id__in=workspace_ids) + .values("workspace_id") + .annotate(count=Count("id")) + .values_list("workspace_id", "count") + ) + issue_counts = dict( + Issue.objects.filter(workspace_id__in=workspace_ids) + .values("workspace_id") + .annotate(count=Count("id")) + .values_list("workspace_id", "count") + ) + module_counts = dict( + Module.objects.filter(workspace_id__in=workspace_ids) + .values("workspace_id") + .annotate(count=Count("id")) + .values_list("workspace_id", "count") + ) + cycle_counts = dict( + Cycle.objects.filter(workspace_id__in=workspace_ids) + .values("workspace_id") + .annotate(count=Count("id")) + .values_list("workspace_id", "count") + ) + member_counts = dict( + WorkspaceMember.objects.filter(workspace_id__in=workspace_ids) + .values("workspace_id") + .annotate(count=Count("id")) + .values_list("workspace_id", "count") + ) + page_counts = dict( + Page.objects.filter(workspace_id__in=workspace_ids) + .exclude(owned_by__is_bot=True, access=1) + .values("workspace_id") + .annotate(count=Count("id")) + .values_list("workspace_id", "count") + ) + + workspace_metrics = [] + for workspace in workspaces: + ws_id = workspace.id + workspace_metrics.append({ + "instance_id": instance_id_str, + "workspace_id": str(ws_id), + "workspace_slug": str(workspace.slug), + "project_count": project_counts.get(ws_id, 0), + "issue_count": issue_counts.get(ws_id, 0), + "module_count": module_counts.get(ws_id, 0), + "cycle_count": cycle_counts.get(ws_id, 0), + "member_count": member_counts.get(ws_id, 0), + "page_count": page_counts.get(ws_id, 0), + }) + + def _ws_attrs(ws: dict) -> dict: + return { + "workspace_id": ws["workspace_id"], + "workspace_slug": ws["workspace_slug"], + "instance_id": ws["instance_id"], + } + + # Create callbacks for workspace-level metrics + def ws_projects_callback(_options): + for ws in workspace_metrics: + yield metrics.Observation(ws["project_count"], _ws_attrs(ws)) + + def ws_issues_callback(_options): + for ws in workspace_metrics: + yield metrics.Observation(ws["issue_count"], _ws_attrs(ws)) + + def ws_modules_callback(_options): + for ws in workspace_metrics: + yield metrics.Observation(ws["module_count"], _ws_attrs(ws)) + + def ws_cycles_callback(_options): + for ws in workspace_metrics: + yield metrics.Observation(ws["cycle_count"], _ws_attrs(ws)) + + def ws_members_callback(_options): + for ws in workspace_metrics: + yield metrics.Observation(ws["member_count"], _ws_attrs(ws)) + + def ws_pages_callback(_options): + for ws in workspace_metrics: + yield metrics.Observation(ws["page_count"], _ws_attrs(ws)) + + # Register observable gauges for workspace metrics + meter.create_observable_gauge( + name="plane_workspace_projects_total", + description="Number of projects per workspace", + callbacks=[ws_projects_callback], + ) + meter.create_observable_gauge( + name="plane_workspace_issues_total", + description="Number of issues per workspace", + callbacks=[ws_issues_callback], + ) + meter.create_observable_gauge( + name="plane_workspace_modules_total", + description="Number of modules per workspace", + callbacks=[ws_modules_callback], + ) + meter.create_observable_gauge( + name="plane_workspace_cycles_total", + description="Number of cycles per workspace", + callbacks=[ws_cycles_callback], + ) + meter.create_observable_gauge( + name="plane_workspace_members_total", + description="Number of members per workspace", + callbacks=[ws_members_callback], + ) + meter.create_observable_gauge( + name="plane_workspace_pages_total", + description="Number of pages per workspace", + callbacks=[ws_pages_callback], + ) + + # Force a synchronous flush to ensure all metrics are exported + # force_flush() blocks until all metrics are exported or timeout is reached + flush_success = provider.force_flush(timeout_millis=FLUSH_TIMEOUT_MILLIS) + + if flush_success: + logger.info( + f"Successfully pushed metrics to OTEL collector at {export_endpoint} " + f"for instance {instance.instance_id}" + ) + else: + logger.warning( + f"Metrics flush timed out for instance {instance.instance_id}, " + f"some metrics may not have been exported" + ) + + except Exception as e: + logger.exception(f"Error pushing metrics to OTEL collector: {e}") + # Don't re-raise: allow task to complete gracefully so it retries on next scheduled run + finally: + # Shutdown the provider to clean up resources + provider.shutdown() + + +@shared_task +def push_instance_metrics(): + """ + Celery task to push instance metrics to OTEL collector. + + Replaces the previous span-based tracing approach with OTLP metrics gauges. + Scheduled to run every 6 hours via Celery beat. + """ + logger.debug("Starting push_instance_metrics task") + try: + _collect_and_push_metrics() + logger.debug("Completed push_instance_metrics task") + except Exception as e: + logger.exception(f"Failed to push instance metrics: {e}") diff --git a/apps/api/plane/license/bgtasks/tracer.py b/apps/api/plane/license/bgtasks/tracer.py deleted file mode 100644 index f7c04b2a4b2..00000000000 --- a/apps/api/plane/license/bgtasks/tracer.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (c) 2023-present Plane Software, Inc. and contributors -# SPDX-License-Identifier: AGPL-3.0-only -# See the LICENSE file for details. - -# Third party imports -from celery import shared_task -from opentelemetry import trace - -# Module imports -from plane.license.models import Instance -from plane.db.models import ( - User, - Workspace, - Project, - Issue, - Module, - Cycle, - CycleIssue, - ModuleIssue, - Page, - WorkspaceMember, -) -from plane.utils.telemetry import init_tracer, shutdown_tracer - - -@shared_task -def instance_traces(): - try: - init_tracer() - # Check if the instance is registered - instance = Instance.objects.first() - - # If instance is None then return - if instance is None: - return - - if instance.is_telemetry_enabled: - # Get the tracer - tracer = trace.get_tracer(__name__) - # Instance details - with tracer.start_as_current_span("instance_details") as span: - # Count of all models - workspace_count = Workspace.objects.count() - user_count = User.objects.count() - project_count = Project.objects.count() - issue_count = Issue.objects.count() - module_count = Module.objects.count() - cycle_count = Cycle.objects.count() - cycle_issue_count = CycleIssue.objects.count() - module_issue_count = ModuleIssue.objects.count() - page_count = Page.objects.count() - - # Set span attributes - span.set_attribute("instance_id", instance.instance_id) - span.set_attribute("instance_name", instance.instance_name) - span.set_attribute("current_version", instance.current_version) - span.set_attribute("latest_version", instance.latest_version) - span.set_attribute("is_telemetry_enabled", instance.is_telemetry_enabled) - span.set_attribute("is_support_required", instance.is_support_required) - span.set_attribute("is_setup_done", instance.is_setup_done) - span.set_attribute("is_signup_screen_visited", instance.is_signup_screen_visited) - span.set_attribute("is_verified", instance.is_verified) - span.set_attribute("edition", instance.edition) - span.set_attribute("domain", instance.domain) - span.set_attribute("is_test", instance.is_test) - span.set_attribute("user_count", user_count) - span.set_attribute("workspace_count", workspace_count) - span.set_attribute("project_count", project_count) - span.set_attribute("issue_count", issue_count) - span.set_attribute("module_count", module_count) - span.set_attribute("cycle_count", cycle_count) - span.set_attribute("cycle_issue_count", cycle_issue_count) - span.set_attribute("module_issue_count", module_issue_count) - span.set_attribute("page_count", page_count) - - # Workspace details - for workspace in Workspace.objects.all(): - # Count of all models - project_count = Project.objects.filter(workspace=workspace).count() - issue_count = Issue.objects.filter(workspace=workspace).count() - module_count = Module.objects.filter(workspace=workspace).count() - cycle_count = Cycle.objects.filter(workspace=workspace).count() - cycle_issue_count = CycleIssue.objects.filter(workspace=workspace).count() - module_issue_count = ModuleIssue.objects.filter(workspace=workspace).count() - page_count = Page.objects.filter(workspace=workspace).count() - member_count = WorkspaceMember.objects.filter(workspace=workspace).count() - - # Set span attributes - with tracer.start_as_current_span("workspace_details") as span: - span.set_attribute("instance_id", instance.instance_id) - span.set_attribute("workspace_id", str(workspace.id)) - span.set_attribute("workspace_slug", workspace.slug) - span.set_attribute("project_count", project_count) - span.set_attribute("issue_count", issue_count) - span.set_attribute("module_count", module_count) - span.set_attribute("cycle_count", cycle_count) - span.set_attribute("cycle_issue_count", cycle_issue_count) - span.set_attribute("module_issue_count", module_issue_count) - span.set_attribute("page_count", page_count) - span.set_attribute("member_count", member_count) - - return - finally: - # Shutdown the tracer - shutdown_tracer() diff --git a/apps/api/plane/license/management/commands/register_instance.py b/apps/api/plane/license/management/commands/register_instance.py index 5ad6f7d2017..863ff62281e 100644 --- a/apps/api/plane/license/management/commands/register_instance.py +++ b/apps/api/plane/license/management/commands/register_instance.py @@ -15,7 +15,7 @@ # Module imports from plane.license.models import Instance, InstanceEdition -from plane.license.bgtasks.tracer import instance_traces +from plane.license.bgtasks.telemetry_metrics import push_instance_metrics class Command(BaseCommand): @@ -86,7 +86,7 @@ def handle(self, *args, **options): instance.edition = InstanceEdition.PLANE_COMMUNITY.value instance.save() - # Call the instance traces task - instance_traces.delay() + # Push instance metrics on registration + push_instance_metrics.delay() return diff --git a/apps/api/plane/settings/common.py b/apps/api/plane/settings/common.py index 59140c070d2..8dba3066e70 100644 --- a/apps/api/plane/settings/common.py +++ b/apps/api/plane/settings/common.py @@ -321,7 +321,7 @@ "plane.bgtasks.file_asset_task", "plane.bgtasks.email_notification_task", "plane.bgtasks.cleanup_task", - "plane.license.bgtasks.tracer", + "plane.license.bgtasks.telemetry_metrics", # management tasks "plane.bgtasks.dummy_data_task", # issue version tasks diff --git a/apps/api/plane/utils/otlp_endpoints.py b/apps/api/plane/utils/otlp_endpoints.py new file mode 100644 index 00000000000..11fecc14a41 --- /dev/null +++ b/apps/api/plane/utils/otlp_endpoints.py @@ -0,0 +1,59 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +""" +Shared OTLP endpoint helpers so metrics and traces use the same collector +when both are enabled. One URL (OTLP_ENDPOINT) is enough: same as traces +(e.g. https://telemetry.plane.so or https://telemetry.plane.town behind +nginx ingress with gRPC backend). +""" + +import os +from urllib.parse import urlparse + +# When no port in URL: https -> 443 (ingress), http -> 4317 (OTLP gRPC default) +OTLP_GRPC_DEFAULT_PORT = "4317" +HTTPS_DEFAULT_PORT = "443" + +_DEFAULT_OTLP_ENDPOINT = "https://telemetry.plane.so" + + +def grpc_endpoint_from_url(url: str) -> str: + """ + Derive gRPC host:port from OTLP_ENDPOINT URL. + - https://telemetry.plane.so -> telemetry.plane.so:443 (nginx ingress) + - https://telemetry.plane.town -> telemetry.plane.town:443 (dev) + - telemetry.plane.so:4317 -> telemetry.plane.so:4317 (scheme-less with port) + - telemetry.plane.so -> telemetry.plane.so:4317 (scheme-less, default gRPC port) + - Explicit port in URL is always preserved. + """ + # urlparse needs a scheme to correctly populate hostname/netloc. + # Scheme-less values like "host:port" are misread as scheme="host", path="port". + if "://" not in url: + url = "//" + url + parsed = urlparse(url) + host = parsed.hostname or "telemetry.plane.so" + if parsed.port is not None: + port = str(parsed.port) + elif parsed.scheme == "https": + port = HTTPS_DEFAULT_PORT + else: + port = OTLP_GRPC_DEFAULT_PORT + return f"{host}:{port}" + + +def get_otlp_grpc_endpoint() -> str: + """ + Return the gRPC endpoint (host:port) for OTLP traces and metrics. + Derived from OTLP_ENDPOINT so the same URL works for both (e.g. collector + behind nginx ingress with gRPC backend on 443). + """ + base = os.environ.get("OTLP_ENDPOINT", _DEFAULT_OTLP_ENDPOINT) + return grpc_endpoint_from_url(base) + + +def get_otlp_http_metrics_url() -> str: + """Return the HTTP URL for OTLP metrics (OTLP_ENDPOINT + /v1/metrics).""" + base = os.environ.get("OTLP_ENDPOINT", _DEFAULT_OTLP_ENDPOINT) + return f"{base.rstrip('/')}/v1/metrics" diff --git a/apps/api/plane/utils/telemetry.py b/apps/api/plane/utils/telemetry.py deleted file mode 100644 index e3646eaba14..00000000000 --- a/apps/api/plane/utils/telemetry.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (c) 2023-present Plane Software, Inc. and contributors -# SPDX-License-Identifier: AGPL-3.0-only -# See the LICENSE file for details. - -# Python imports -import os -import atexit - -# Third party imports -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import Resource -from opentelemetry.instrumentation.django import DjangoInstrumentor - -# Global variable to track initialization -_TRACER_PROVIDER = None - - -def init_tracer(): - """Initialize OpenTelemetry with proper shutdown handling""" - global _TRACER_PROVIDER - - # If already initialized, return existing provider - if _TRACER_PROVIDER is not None: - return _TRACER_PROVIDER - - # Configure the tracer provider - service_name = os.environ.get("SERVICE_NAME", "plane-ce-api") - resource = Resource.create({"service.name": service_name}) - tracer_provider = TracerProvider(resource=resource) - - # Set as global tracer provider - trace.set_tracer_provider(tracer_provider) - - # Configure the OTLP exporter - otel_endpoint = os.environ.get("OTLP_ENDPOINT", "https://telemetry.plane.so") - otlp_exporter = OTLPSpanExporter(endpoint=otel_endpoint) - span_processor = BatchSpanProcessor(otlp_exporter) - tracer_provider.add_span_processor(span_processor) - - # Initialize Django instrumentation - DjangoInstrumentor().instrument() - - # Store provider globally - _TRACER_PROVIDER = tracer_provider - - # Register shutdown handler - atexit.register(shutdown_tracer) - - return tracer_provider - - -def shutdown_tracer(): - """Shutdown OpenTelemetry tracers and processors""" - global _TRACER_PROVIDER - - if _TRACER_PROVIDER is not None: - if hasattr(_TRACER_PROVIDER, "shutdown"): - _TRACER_PROVIDER.shutdown() - _TRACER_PROVIDER = None diff --git a/apps/api/requirements/base.txt b/apps/api/requirements/base.txt index 68c4a4a43cd..3aca207b684 100644 --- a/apps/api/requirements/base.txt +++ b/apps/api/requirements/base.txt @@ -67,6 +67,7 @@ opentelemetry-api==1.28.1 opentelemetry-sdk==1.28.1 opentelemetry-instrumentation-django==0.49b1 opentelemetry-exporter-otlp==1.28.1 +opentelemetry-exporter-otlp-proto-grpc==1.28.1 # OpenAPI Specification drf-spectacular==0.28.0 # html sanitizer