From 02dd2e8e42d9f3e6e225e82409835a5230fcc374 Mon Sep 17 00:00:00 2001 From: iceice400 Date: Sun, 7 Jun 2026 21:52:59 -0400 Subject: [PATCH] feat(metrics): Prometheus-compatible /metrics endpoint --- docs/CONFIGURATION.md | 35 +++ src/api/routes/metrics_routes.py | 359 +++++++++++++++++++++++++++++ src/api/server.py | 13 ++ src/capture/capture_coordinator.py | 9 + src/capture/concentrator_source.py | 5 + src/config.py | 10 + tests/test_metrics_routes.py | 144 ++++++++++++ 7 files changed, 575 insertions(+) create mode 100644 src/api/routes/metrics_routes.py create mode 100644 tests/test_metrics_routes.py diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index f12b077..9e62d23 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -477,6 +477,37 @@ storage: Packets are stored in a local SQLite database. Old packets are pruned automatically based on `max_packets_retained`. +### Prometheus metrics (`/metrics`) + +Optional Prometheus text scrape endpoint for LAN monitoring. **Off by default** — enabling does not change packet capture, relay, or dashboard behaviour. + +```yaml +metrics: + enabled: false + require_auth: true # when false, /metrics is open on the LAN (use firewall rules) +``` + +When `metrics.enabled: true`, Prometheus (or any scraper) can poll: + +```text +http://<meshpoint-ip>:8080/metrics +``` + +Exposed series include packet counts, node totals, RSSI/SNR averages, noise floor, relay stats, per-channel duty estimates (ToA), SX1302 CRC counters, and process uptime. Labels use protocol/channel/reason only — never PSKs, tokens, or node secrets. + +**Example `prometheus.yml` scrape job (auth disabled):** + +```yaml +scrape_configs: + - job_name: meshpoint + scrape_interval: 30s + static_configs: + - targets: ["192.168.1.50:8080"] + metrics_path: /metrics +``` + +When `require_auth: true`, configure your scraper to send the dashboard session cookie or Bearer JWT (same as other protected API routes). 
+ --- ## Dashboard @@ -744,6 +775,10 @@ storage: # local SQLite packet store max_packets_retained: 100000 cleanup_interval_seconds: 3600 +metrics: # Prometheus /metrics scrape (off by default) + enabled: false + require_auth: true + dashboard: # local web UI host: "0.0.0.0" port: 8080 diff --git a/src/api/routes/metrics_routes.py b/src/api/routes/metrics_routes.py new file mode 100644 index 0000000..dd25562 --- /dev/null +++ b/src/api/routes/metrics_routes.py @@ -0,0 +1,359 @@ +"""Prometheus-compatible metrics scrape endpoint (PR 09). + +Zero-dependency text exposition format. Disabled by default via +``metrics.enabled`` in config. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from fastapi import APIRouter, Header, HTTPException, Request +from fastapi.responses import PlainTextResponse + +from src.config import MetricsConfig +from src.version import __version__ + +if TYPE_CHECKING: + from src.analytics.signal_analyzer import SignalAnalyzer + from src.analytics.stats_reporter import StatsReporter + from src.analytics.traffic_monitor import TrafficMonitor + from src.api.telemetry.noise_floor import NoiseFloorTracker + from src.capture.capture_coordinator import CaptureCoordinator + from src.relay.relay_manager import RelayManager + from src.storage.node_repository import NodeRepository + +router = APIRouter(tags=["metrics"]) + +_config: MetricsConfig | None = None +_start_time: datetime | None = None +_stats_reporter: StatsReporter | None = None +_signal_analyzer: SignalAnalyzer | None = None +_traffic_monitor: TrafficMonitor | None = None +_relay_manager: RelayManager | None = None +_node_repo: NodeRepository | None = None +_noise_floor_tracker: NoiseFloorTracker | None = None +_capture_coordinator: CaptureCoordinator | None = None +_region: str = "US" + + +def init_routes( + *, + metrics_config: MetricsConfig, + stats_reporter: StatsReporter, + signal_analyzer: SignalAnalyzer, + 
traffic_monitor: TrafficMonitor, + relay_manager: RelayManager, + node_repo: NodeRepository, + noise_floor_tracker: NoiseFloorTracker, + capture_coordinator: CaptureCoordinator, + region: str, +) -> None: + global _config, _start_time, _stats_reporter, _signal_analyzer + global _traffic_monitor, _relay_manager, _node_repo + global _noise_floor_tracker, _capture_coordinator, _region + _config = metrics_config + _start_time = datetime.now(timezone.utc) + _stats_reporter = stats_reporter + _signal_analyzer = signal_analyzer + _traffic_monitor = traffic_monitor + _relay_manager = relay_manager + _node_repo = node_repo + _noise_floor_tracker = noise_floor_tracker + _capture_coordinator = capture_coordinator + _region = region or "US" + + +@router.get("/metrics") +async def prometheus_metrics( + request: Request, + authorization: str | None = Header(default=None), +) -> PlainTextResponse: + if _config is None or not _config.enabled: + raise HTTPException(status_code=404, detail="metrics disabled") + + if _config.require_auth: + from src.api.auth.dependencies import require_auth + + await require_auth(request, authorization) + + body = await _render_metrics() + return PlainTextResponse( + body, + media_type="text/plain; version=0.0.4; charset=utf-8", + ) + + +async def _render_metrics() -> str: + writer = PrometheusWriter() + uptime = _uptime_seconds() + writer.gauge( + "meshpoint_uptime_seconds", + uptime, + help="Seconds since the metrics collector started", + ) + writer.gauge( + "meshpoint_info", + 1, + labels={"version": __version__, "region": _region}, + help="Meshpoint build info (always 1)", + ) + + if _stats_reporter is not None: + report = _stats_reporter.build_report() + writer.counter( + "meshpoint_packets_session_total", + report.get("total_packets", 0), + help="Decoded packets since last heartbeat reset", + ) + writer.gauge( + "meshpoint_packets_per_minute", + report.get("packets_per_minute", 0), + help="Session packet rate (packets per minute)", + ) + for 
protocol, count in (report.get("protocols") or {}).items(): + writer.counter( + "meshpoint_protocol_packets_session_total", + count, + labels={"protocol": _safe_label(protocol)}, + help="Session packets by protocol", + ) + rssi_count = report.get("rssi_count") or 0 + if rssi_count: + writer.gauge( + "meshpoint_rssi_average_dbm", + round(report["rssi_sum"] / rssi_count, 2), + help="Average RSSI over session samples", + ) + snr_count = report.get("snr_count") or 0 + if snr_count: + writer.gauge( + "meshpoint_snr_average_db", + round(report["snr_sum"] / snr_count, 2), + help="Average SNR over session samples", + ) + writer.counter( + "meshpoint_packets_direct_session_total", + report.get("direct_count", 0), + help="Direct (0-hop) packets in session", + ) + writer.counter( + "meshpoint_packets_relayed_session_total", + report.get("relayed_count", 0), + help="Relayed (1+ hop) packets in session", + ) + + if _traffic_monitor is not None: + traffic = await _traffic_monitor.get_traffic_summary() + writer.counter( + "meshpoint_packets_database_total", + traffic.get("total_packets", 0), + help="Total packets stored in SQLite", + ) + writer.gauge( + "meshpoint_packets_last_hour", + traffic.get("packets_last_hour", 0), + help="Packets received in the last hour", + ) + writer.gauge( + "meshpoint_packets_last_minute", + traffic.get("packets_last_minute", 0), + help="Packets received in the last minute", + ) + + if _signal_analyzer is not None: + signal = await _signal_analyzer.get_signal_summary() + if signal.get("avg_rssi") is not None: + writer.gauge( + "meshpoint_rssi_recent_average_dbm", + signal["avg_rssi"], + help="Average RSSI over the 200 most recent packets", + ) + if signal.get("avg_snr") is not None: + writer.gauge( + "meshpoint_snr_recent_average_db", + signal["avg_snr"], + help="Average SNR over the 200 most recent packets", + ) + + if _node_repo is not None: + writer.gauge( + "meshpoint_nodes_total", + await _node_repo.get_count(), + help="Known nodes in the local 
database", + ) + writer.gauge( + "meshpoint_nodes_active_24h", + await _node_repo.get_active_count(24), + help="Nodes heard in the last 24 hours", + ) + + if _noise_floor_tracker is not None: + floor = _noise_floor_tracker.snapshot() + value = floor.get("value_dbm") + if value is not None: + writer.gauge( + "meshpoint_noise_floor_dbm", + value, + labels={"source": _safe_label(floor.get("source") or "unknown")}, + help="Estimated noise floor (dBm)", + ) + writer.gauge( + "meshpoint_noise_floor_stale", + 1 if floor.get("stale") else 0, + help="1 when the noise-floor estimate is stale", + ) + + if _relay_manager is not None: + relay = _relay_manager.get_stats() + writer.gauge( + "meshpoint_relay_enabled", + 1 if relay.get("enabled") else 0, + help="1 when experimental relay is enabled", + ) + writer.counter( + "meshpoint_relay_relayed_total", + relay.get("relayed", 0), + help="Packets relayed since process start", + ) + writer.counter( + "meshpoint_relay_rejected_total", + relay.get("rejected", 0), + help="Packets rejected by relay filters", + ) + for reason, count in (relay.get("rejection_reasons") or {}).items(): + writer.counter( + "meshpoint_relay_rejected_total", + count, + labels={"reason": _safe_label(reason)}, + help="Relay rejections by reason", + ) + writer.gauge( + "meshpoint_relay_rate_per_minute", + relay.get("current_rate", 0), + help="Current relay rate (packets per minute)", + ) + writer.gauge( + "meshpoint_relay_rate_remaining", + relay.get("rate_remaining", 0), + help="Remaining relay capacity in the current window", + ) + budget = relay.get("channel_budget") or {} + writer.gauge( + "meshpoint_relay_duty_usage_percent", + budget.get("relay_total_usage_percent", 0), + help="Aggregate relay duty usage (ToA estimate)", + ) + for channel in budget.get("channels") or []: + ch = channel.get("channel") + if ch is None: + continue + writer.gauge( + "meshpoint_relay_duty_channel_usage_percent", + channel.get("usage_percent", 0), + labels={"channel": str(ch)}, 
+ help="Per-channel relay duty usage (ToA estimate)", + ) + + if _capture_coordinator is not None: + rx = _capture_coordinator.concentrator_rx_stats() + writer.counter( + "meshpoint_rx_crc_bad_total", + rx.get("crc_bad_total", 0), + help="SX1302 CRC_BAD frames since concentrator start", + ) + writer.counter( + "meshpoint_rx_no_crc_total", + rx.get("no_crc_total", 0), + help="SX1302 NO_CRC frames since concentrator start", + ) + + return writer.render() + + +def _uptime_seconds() -> int: + if _start_time is None: + return 0 + return int((datetime.now(timezone.utc) - _start_time).total_seconds()) + + +def _safe_label(value: str) -> str: + """Restrict labels to safe alphanumeric tokens (no secrets).""" + cleaned = "".join( + ch if ch.isalnum() or ch in "-_" else "_" + for ch in str(value).strip() + ) + return cleaned[:64] or "unknown" + + +class PrometheusWriter: + """Minimal Prometheus text 0.0.4 writer (no external deps).""" + + def __init__(self) -> None: + self._lines: list[str] = [] + self._declared: set[tuple[str, str]] = set() + + def gauge( + self, + name: str, + value: int | float, + *, + labels: dict[str, str] | None = None, + help: str = "", + ) -> None: + self._emit(name, "gauge", value, labels=labels, help=help) + + def counter( + self, + name: str, + value: int | float, + *, + labels: dict[str, str] | None = None, + help: str = "", + ) -> None: + self._emit(name, "counter", value, labels=labels, help=help) + + def _emit( + self, + name: str, + metric_type: str, + value: int | float, + *, + labels: dict[str, str] | None, + help: str, + ) -> None: + key = (name, metric_type) + if key not in self._declared: + if help: + self._lines.append(f"# HELP {name} {help}") + self._lines.append(f"# TYPE {name} {metric_type}") + self._declared.add(key) + + label_str = _format_labels(labels) + if isinstance(value, float): + rendered = f"{value:.6g}" + else: + rendered = str(int(value)) + self._lines.append(f"{name}{label_str} {rendered}") + + def render(self) -> str: + 
return "\n".join(self._lines) + "\n" + + +def _format_labels(labels: dict[str, str] | None) -> str: + if not labels: + return "" + parts = [ + f'{_escape_label(k)}="{_escape_label(v)}"' + for k, v in sorted(labels.items()) + ] + return "{" + ",".join(parts) + "}" + + +def _escape_label(value: str) -> str: + return ( + str(value) + .replace("\\", "\\\\") + .replace("\n", "\\n") + .replace('"', '\\"') + ) diff --git a/src/api/server.py b/src/api/server.py index 3bd4fb0..0bbbfab 100644 --- a/src/api/server.py +++ b/src/api/server.py @@ -50,6 +50,7 @@ nodes, packets, public_radar_routes, + metrics_routes, stats_routes, system_config_routes, system_metrics, @@ -255,6 +256,7 @@ async def lifespan(app: FastAPI): ) app.include_router(auth_routes.router) + app.include_router(metrics_routes.router) app.include_router(identity_routes.router) app.include_router(auth_config_routes.router) app.include_router(public_radar_routes.router) @@ -1236,6 +1238,17 @@ def _init_routes( node_repo=coord.node_repo, packet_repo=coord.packet_repo, ) + metrics_routes.init_routes( + metrics_config=config.metrics, + stats_reporter=coord.stats_reporter, + signal_analyzer=signal_analyzer, + traffic_monitor=traffic_monitor, + relay_manager=coord.relay_manager, + node_repo=coord.node_repo, + noise_floor_tracker=noise_floor_tracker, + capture_coordinator=coord.capture_coordinator, + region=config.radio.region or "US", + ) meshcore_tx = None if tx_service and hasattr(tx_service, '_meshcore_tx'): diff --git a/src/capture/capture_coordinator.py b/src/capture/capture_coordinator.py index 2be03e4..eab23ec 100644 --- a/src/capture/capture_coordinator.py +++ b/src/capture/capture_coordinator.py @@ -51,6 +51,15 @@ async def stop(self) -> None: self._tasks.clear() logger.info("CaptureCoordinator stopped") + def concentrator_rx_stats(self) -> dict[str, int]: + """SX1302 CRC counters when a concentrator source is registered.""" + for source in self._sources: + stats = getattr(source, "rx_crc_stats", None) + if 
stats is not None: + bad, no_crc = stats + return {"crc_bad_total": bad, "no_crc_total": no_crc} + return {"crc_bad_total": 0, "no_crc_total": 0} + async def packets(self) -> AsyncIterator[RawCapture]: """Yield packets from all sources via the shared queue.""" while self._running: diff --git a/src/capture/concentrator_source.py b/src/capture/concentrator_source.py index b3cd2fb..0ea87a2 100644 --- a/src/capture/concentrator_source.py +++ b/src/capture/concentrator_source.py @@ -74,6 +74,11 @@ def name(self) -> str: def is_running(self) -> bool: return self._running + @property + def rx_crc_stats(self) -> tuple[int, int]: + """(crc_bad_total, no_crc_total) since concentrator start.""" + return self._wrapper.crc_bad_count, self._wrapper.no_crc_count + async def start(self) -> None: self._wrapper.load() diff --git a/src/config.py b/src/config.py index 4622e2b..3d79e18 100644 --- a/src/config.py +++ b/src/config.py @@ -114,6 +114,14 @@ class StorageConfig: cleanup_interval_seconds: int = 3600 +@dataclass +class MetricsConfig: + """Prometheus-compatible /metrics scrape endpoint (PR 09).""" + + enabled: bool = False + require_auth: bool = True + + @dataclass class DashboardConfig: host: str = "0.0.0.0" # nosec B104 -- intentional for local device dashboard @@ -319,6 +327,7 @@ class AppConfig: transmit: TransmitConfig = field(default_factory=TransmitConfig) web_auth: WebAuthConfig = field(default_factory=WebAuthConfig) location: LocationConfig = field(default_factory=LocationConfig) + metrics: MetricsConfig = field(default_factory=MetricsConfig) def _resolve_radio_frequency(radio: "RadioConfig") -> None: @@ -401,6 +410,7 @@ def _apply_yaml(cfg: AppConfig, path: Path) -> None: "transmit": cfg.transmit, "web_auth": cfg.web_auth, "location": cfg.location, + "metrics": cfg.metrics, } unknown_keys: list[str] = [] diff --git a/tests/test_metrics_routes.py b/tests/test_metrics_routes.py new file mode 100644 index 0000000..c84b760 --- /dev/null +++ b/tests/test_metrics_routes.py 
@@ -0,0 +1,144 @@ +"""Tests for Prometheus /metrics exposition (PR 09).""" +from __future__ import annotations + +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from src.analytics.stats_reporter import StatsReporter +from src.api.routes import metrics_routes +from src.api.routes.metrics_routes import PrometheusWriter, _escape_label +from src.api.telemetry.noise_floor import NoiseFloorTracker +from src.config import MetricsConfig +from src.relay.relay_manager import RelayManager + + +def _run(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +class TestPrometheusWriter(unittest.TestCase): + def test_renders_counter_and_gauge(self) -> None: + writer = PrometheusWriter() + writer.counter( + "meshpoint_packets_total", + 42, + help="Packet count", + ) + writer.gauge( + "meshpoint_noise_floor_dbm", + -108.5, + labels={"source": "packets"}, + help="Noise floor", + ) + text = writer.render() + self.assertIn("# HELP meshpoint_packets_total Packet count", text) + self.assertIn("# TYPE meshpoint_packets_total counter", text) + self.assertIn("meshpoint_packets_total 42", text) + self.assertIn('meshpoint_noise_floor_dbm{source="packets"} -108.5', text) + + def test_type_declared_once_with_multiple_label_sets(self) -> None: + writer = PrometheusWriter() + writer.counter("meshpoint_relay_rejected_total", 3, help="Rejected") + writer.counter( + "meshpoint_relay_rejected_total", + 1, + labels={"reason": "rate_limit"}, + ) + text = writer.render() + self.assertEqual(text.count("# TYPE meshpoint_relay_rejected_total"), 1) + + def test_escape_label_quotes(self) -> None: + self.assertEqual(_escape_label('say "hi"'), 'say \\"hi\\"') + + +class TestMetricsRoute(unittest.TestCase): + def _client( + self, + *, + enabled: bool = True, + require_auth: bool = False, + ) -> TestClient: + stats = StatsReporter() + 
stats.record_packet( + protocol="meshtastic", + packet_type="text", + rssi=-85.0, + snr=8.0, + hop_start=3, + hop_limit=3, + ) + + traffic = MagicMock() + traffic.get_traffic_summary = AsyncMock( + return_value={ + "total_packets": 100, + "packets_last_hour": 12, + "packets_last_minute": 1, + } + ) + signal = MagicMock() + signal.get_signal_summary = AsyncMock( + return_value={"avg_rssi": -90.0, "avg_snr": 7.5, "sample_count": 5} + ) + node_repo = MagicMock() + node_repo.get_count = AsyncMock(return_value=8) + node_repo.get_active_count = AsyncMock(return_value=5) + + relay = RelayManager(enabled=False, max_relay_per_minute=20, burst_size=5) + capture = MagicMock() + capture.concentrator_rx_stats.return_value = { + "crc_bad_total": 2, + "no_crc_total": 1, + } + + metrics_routes.init_routes( + metrics_config=MetricsConfig( + enabled=enabled, + require_auth=require_auth, + ), + stats_reporter=stats, + signal_analyzer=signal, + traffic_monitor=traffic, + relay_manager=relay, + node_repo=node_repo, + noise_floor_tracker=NoiseFloorTracker(), + capture_coordinator=capture, + region="US", + ) + app = FastAPI() + app.include_router(metrics_routes.router) + return TestClient(app) + + def test_disabled_returns_404(self) -> None: + client = self._client(enabled=False) + res = client.get("/metrics") + self.assertEqual(res.status_code, 404) + + def test_enabled_returns_prometheus_text(self) -> None: + client = self._client(enabled=True, require_auth=False) + res = client.get("/metrics") + self.assertEqual(res.status_code, 200) + self.assertIn("text/plain", res.headers["content-type"]) + body = res.text + self.assertIn("meshpoint_packets_session_total", body) + self.assertIn('protocol="meshtastic"', body) + self.assertIn("meshpoint_nodes_total", body) + self.assertIn("meshpoint_rx_crc_bad_total", body) + self.assertNotIn("1PG7OiAp", body) + + def test_render_metrics_async(self) -> None: + client = self._client(enabled=True, require_auth=False) + body = 
_run(metrics_routes._render_metrics()) + self.assertIn("meshpoint_uptime_seconds", body) + + +if __name__ == "__main__": + unittest.main()