diff --git a/.gitignore b/.gitignore index a3c0289..2831069 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ scripts/ # Internal planning docs (not for public repo) private/ +.coverage +htmlcov/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bd989a..45b5186 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,62 @@ ## Unreleased +## 0.5.0rc3 (2026-05-13) + +Cleanup pass before the morning-signal cutover. + +### Added + +- **`TelegramNotifier.send_raw(text, *, parse_mode=, disable_notification=)`** — + adjacent flow-doctor subsystems can now POST arbitrary text through + the same bot + chat + thread + Markdown routing the structured + `send()` path uses, without conforming to the `Report` shape. + Returns the standard non-secret `"telegram:<chat_id>[:<thread_id>]"` + target identifier (or `None` on failure — never raises). + ``parse_mode=None`` / ``disable_notification=False`` are honoured as + explicit overrides via a sentinel default; pass nothing to inherit + the instance defaults. +- **`RemediationConfig.telegram_bot_token` + `telegram_chat_id` + + `telegram_message_thread_id`** — first-class Telegram fields for the + remediation pipeline. `_init_remediation` builds a real + `TelegramNotifier` from these (with the `FLOW_DOCTOR_TELEGRAM_*` + env-var fallback chain) and hands it to `RemediationExecutor`. + +### Changed + +- **`RemediationExecutor`** now accepts a `telegram_notifier: + TelegramNotifier | None` kwarg in addition to the legacy + `telegram_webhook_url`. When both are supplied, the notifier wins. + Remediation pings going through it pick up Markdown rendering, + threading, target-id audit (`actions.target` row), and the same + `validate()` preflight as the rest of the notifier surface. +- `examples/smoke_test.py` rewritten to lead with + `FlowDoctor.builder()` + `TelegramNotifierConfig` (instead of the + now-`@deprecated` `flow_doctor.init()`). 
Adds smoke checks for + `flow_doctor.context()` propagation, `report_async()` from an + asyncio context, and `flow_doctor.otel.report_to_otel_span_event` + serialization. All offline (FLOW_DOCTOR_SKIP_PREFLIGHT=1 + fake + creds + sqlite at temp path). +- `[tool.coverage.run]` section added to `pyproject.toml`. Use the + canonical `python -m coverage run -m pytest && python -m coverage + report` instead of `pytest --cov=` — the latter misreports + module-level statement coverage under editable installs because + pytest-cov instruments after the import has already happened. + +### Deprecated + +- **`RemediationConfig.telegram_webhook_url`** is now soft-deprecated. + Kept for 0.4.x yaml back-compat through the 0.5.x series; consumers + should migrate to `telegram_bot_token` + `telegram_chat_id` (with + optional `telegram_message_thread_id`). Will be removed in 0.6.0. + +### Coverage + +Suite: 393/393 pass (376 prior + 17 new for remediation-Telegram +migration). Project-wide coverage 84% (canonical measurement; the +pytest-cov number that previously read 67% was a tool quirk, not a +real regression). + ## 0.5.0rc2 (2026-05-13) Adds Telegram as the **recommended default notifier** for new consumers. diff --git a/examples/smoke_test.py b/examples/smoke_test.py index c75b41c..9f62a7c 100644 --- a/examples/smoke_test.py +++ b/examples/smoke_test.py @@ -1,28 +1,79 @@ """ -Smoke test for Flow Doctor Phase 1. -Run from the flow-doctor repo root: +Smoke test for Flow Doctor (0.5.0rc+ surface). + +Exercises the recommended FlowDoctor.builder() entry point, the typed +TelegramNotifierConfig (without actually firing — no real bot token +required), the flow_doctor.context() contextvars layer, the +report_async() coroutine, and the historical report-handling features +(guard / monitor / dedup / capture_logs / secret scrubbing / +never-crash-the-caller). All sqlite, no network. 
+ +Run from the flow-doctor repo root:: + python examples/smoke_test.py """ -import flow_doctor + +from __future__ import annotations + +import asyncio import logging import os import tempfile +import flow_doctor +from flow_doctor import ( + EmailNotifierConfig, + FlowDoctor, + FlowDoctorProtocol, + TelegramNotifierConfig, +) + db_path = os.path.join(tempfile.gettempdir(), "fd_smoke_test.db") -# Clean up from prior runs if os.path.exists(db_path): os.remove(db_path) -fd = flow_doctor.init( - flow_name="test-flow", - repo="your-org/your-repo", - owner="@your-username", - store=f"sqlite:///{db_path}", +# Skip notifier preflight network calls — this smoke test runs offline +# and the fake credentials below would otherwise trip /getMe and the +# Gmail SMTP banner. Downstream consumers should NOT set this in prod. +os.environ["FLOW_DOCTOR_SKIP_PREFLIGHT"] = "1" + +# --------------------------------------------------------------------------- +# Build a FlowDoctor with the recommended (0.5.0rc+) builder API. +# Telegram is the recommended default; we wire fake creds since the +# smoke test stays offline. +# --------------------------------------------------------------------------- +fd: FlowDoctorProtocol = ( + FlowDoctor.builder("smoke-test") + .with_repo("your-org/your-repo", owner="@your-username") + .with_store(path=db_path) + .with_dedup(cooldown_minutes=60) + .add_notifier( + TelegramNotifierConfig( + bot_token="123:fake-smoke-token", + chat_id=-1001234567890, + # message_thread_id=42, # optional: forum-topic routing + ) + ) + # A second notifier to demonstrate the discriminated union — also + # fake creds since we stay offline. 
+ .add_notifier( + EmailNotifierConfig( + sender="alerts@example.com", + recipients=["oncall@example.com"], + smtp_password="fake-app-password", + ) + ) + .build() ) print("=" * 60) -print("FLOW DOCTOR PHASE 1 — SMOKE TEST") +print("FLOW DOCTOR — SMOKE TEST (0.5.0rc+ surface)") print("=" * 60) +print(f"Built via: FlowDoctor.builder() → satisfies FlowDoctorProtocol: " + f"{isinstance(fd, FlowDoctorProtocol)}") +print(f"Notifiers: {len(fd.config.notify)} " + f"({', '.join(n.type for n in fd.config.notify)})") + # --- Test 1: Exception report --- print("\n--- Test 1: Exception report ---") @@ -33,6 +84,7 @@ print(f" Report ID: {report_id}") assert report_id is not None, "Expected a report ID" + # --- Test 2: guard() re-raises --- print("\n--- Test 2: guard() context manager ---") try: @@ -43,6 +95,7 @@ else: raise AssertionError("guard() should have re-raised") + # --- Test 3: monitor() decorator --- print("\n--- Test 3: @monitor decorator ---") @@ -57,6 +110,7 @@ def failing_function(): else: raise AssertionError("monitor() should have re-raised") + # --- Test 4: Dedup suppression --- print("\n--- Test 4: Dedup (5 identical errors → 1 report) ---") dedup_results = [] @@ -73,13 +127,39 @@ def failing_function(): dedup_count = sum(1 for r in dedup_results if r is None) print(f" → {new_count} new, {dedup_count} deduped") -# --- Test 5: Non-exception warning --- -print("\n--- Test 5: Non-exception warning ---") -report_id = fd.report("Scanner returned 0 candidates", severity="warning") -print(f" Warning report ID: {report_id}") -# --- Test 6: capture_logs() --- -print("\n--- Test 6: Log capture ---") +# --- Test 5: flow_doctor.context() contextvars propagation --- +print("\n--- Test 5: flow_doctor.context() ambient propagation ---") +with flow_doctor.context(flow_name="smoke-test-am", stage="ingest", run_id="run-42"): + try: + raise RuntimeError("ingest stage failed") + except Exception as e: + report_id = fd.report(e) + +# Verify the context landed on the persisted 
report. +recent = fd.history(limit=1)[0] +print(f" Report flow_name: {recent.context.get('flow_name')}") +print(f" Report stage: {recent.context.get('stage')}") +print(f" Report run_id: {recent.context.get('run_id')}") +assert recent.context.get("stage") == "ingest" +assert recent.context.get("run_id") == "run-42" + + +# --- Test 6: report_async() --- +print("\n--- Test 6: report_async() from an asyncio context ---") +async def async_pipeline(): + try: + raise TimeoutError("async pipeline tripped a deadline") + except Exception as e: + return await fd.report_async(e) + +async_report_id = asyncio.run(async_pipeline()) +print(f" Async report ID: {async_report_id}") +assert async_report_id is not None + + +# --- Test 7: capture_logs() --- +print("\n--- Test 7: Log capture ---") logger = logging.getLogger("test.scanner") with fd.capture_logs(level=logging.INFO): logger.info("Starting scanner with 900 tickers") @@ -90,8 +170,9 @@ def failing_function(): report_id = fd.report(e) print(f" Report with logs: {report_id}") -# --- Test 7: Secret scrubbing --- -print("\n--- Test 7: Secret scrubbing ---") + +# --- Test 8: Secret scrubbing --- +print("\n--- Test 8: Secret scrubbing ---") try: api_key = "AKIAIOSFODNN7EXAMPLE" raise RuntimeError(f"S3 auth failed with key {api_key}") @@ -105,12 +186,13 @@ def failing_function(): ) print(f" Scrubbed report ID: {report_id}") -# --- Test 8: report() never crashes --- -print("\n--- Test 8: report() never crashes caller ---") -# Create an fd with a broken store path -broken_fd = flow_doctor.init( - flow_name="broken-flow", - store="sqlite:////nonexistent/impossible/path/db.sqlite", + +# --- Test 9: report() never crashes its caller --- +print("\n--- Test 9: report() never crashes caller ---") +broken_fd = ( + FlowDoctor.builder("broken-flow") + .with_store(path="/nonexistent/impossible/path/db.sqlite") + .build(strict=False) # degraded mode — store init will fail loudly ) try: raise RuntimeError("this should not crash") @@ -119,11 
+201,25 @@ def failing_function(): print(f" Broken store report result: {result} (None is OK)") print(" Caller survived — report() did not propagate") + +# --- Test 10: OTel serialization --- +print("\n--- Test 10: OTel SpanEvent serialization ---") +from flow_doctor.otel import report_to_otel_span_event + +span_event = report_to_otel_span_event(recent) # from test 5 +print(f" resource.service.name: {span_event['resource']['service.name']}") +print(f" event.name: {span_event['name']}") +print(f" severity_text: {span_event['severity_text']}") +print(f" attributes.context.run_id: " + f"{span_event['attributes'].get('context.run_id')}") + + # --- History --- print("\n--- Report History ---") for r in fd.history(limit=20): dedup_str = f" (dedup x{r.dedup_count})" if r.dedup_count > 1 else "" - print(f" [{r.severity:8s}] {r.error_type or 'msg'}: {r.error_message[:60]}{dedup_str}") + print(f" [{r.severity:8s}] {r.error_type or 'msg'}: " + f"{r.error_message[:60]}{dedup_str}") print("\n" + "=" * 60) print("ALL SMOKE TESTS PASSED") diff --git a/flow_doctor/__init__.py b/flow_doctor/__init__.py index d4e9c64..f54c746 100644 --- a/flow_doctor/__init__.py +++ b/flow_doctor/__init__.py @@ -34,4 +34,4 @@ "current_context", "init", ] -__version__ = "0.5.0rc2" +__version__ = "0.5.0rc3" diff --git a/flow_doctor/core/client.py b/flow_doctor/core/client.py index ac38ba7..80c9cee 100644 --- a/flow_doctor/core/client.py +++ b/flow_doctor/core/client.py @@ -387,9 +387,35 @@ def _init_remediation(self, config: FlowDoctorConfig) -> None: gate_config.market_close_hour = 0 self._decision_gate = DecisionGate(config=gate_config, store=self._store) + + # Preferred Telegram path (since 0.5.0rc3): build a real + # TelegramNotifier from the config and hand it to the + # executor. Falls back to the legacy webhook URL when only + # that's configured (for 0.4.x yaml back-compat). 
+ telegram_notifier = None + tg_token = ( + config.remediation.telegram_bot_token + or _env_fallback("telegram_bot_token") + ) + tg_chat = config.remediation.telegram_chat_id + if tg_chat is None: + env_chat = _env_fallback("telegram_chat_id") + if env_chat is not None and env_chat.lstrip("-").isdigit(): + tg_chat = int(env_chat) + elif env_chat is not None: + tg_chat = env_chat + if tg_token and tg_chat not in (None, ""): + from flow_doctor.notify.telegram import TelegramNotifier + telegram_notifier = TelegramNotifier( + bot_token=tg_token, + chat_id=tg_chat, + message_thread_id=config.remediation.telegram_message_thread_id, + ) + self._remediation_executor = RemediationExecutor( dry_run=config.remediation.dry_run, store=self._store, + telegram_notifier=telegram_notifier, telegram_webhook_url=config.remediation.telegram_webhook_url, ) except Exception as e: diff --git a/flow_doctor/core/config.py b/flow_doctor/core/config.py index d416d85..f24522c 100644 --- a/flow_doctor/core/config.py +++ b/flow_doctor/core/config.py @@ -142,6 +142,21 @@ class RemediationConfig(_ConfigModel): max_auto_remediations_per_day: int = 2 max_auto_remediations_per_failure: int = 2 market_hours_lockout: bool = True + # Telegram routing for remediation auto-action pings. Preferred + # path since 0.5.0rc3 — the executor will build a real + # ``TelegramNotifier`` from these fields and route every remediation + # action's success / failure through it, picking up bot-token / + # chat-id / threading / Markdown rendering / target-id audit for + # free. Leave unset to skip Telegram notification entirely. + telegram_bot_token: Optional[str] = None + telegram_chat_id: Optional[Union[int, str]] = None + telegram_message_thread_id: Optional[int] = None + # Legacy webhook URL — kept for back-compat with 0.4.x configs. 
+ # Was a misnomer (Telegram doesn't have user-installable webhooks + # the way Slack does), and the executor's bespoke POST format + # didn't compose with the rest of the notifier surface. New + # consumers should use ``telegram_bot_token`` + ``telegram_chat_id`` + # above; this field will be removed in 0.6.0. telegram_webhook_url: Optional[str] = None s3_audit_bucket: Optional[str] = None s3_audit_prefix: str = "flow-doctor/audit" @@ -456,6 +471,9 @@ def load_config( _defaults.max_auto_remediations_per_failure)), market_hours_lockout=rem_raw.get( "market_hours_lockout", _defaults.market_hours_lockout), + telegram_bot_token=rem_raw.get("telegram_bot_token"), + telegram_chat_id=rem_raw.get("telegram_chat_id"), + telegram_message_thread_id=rem_raw.get("telegram_message_thread_id"), telegram_webhook_url=rem_raw.get("telegram_webhook_url"), s3_audit_bucket=rem_raw.get("s3_audit_bucket"), s3_audit_prefix=rem_raw.get( diff --git a/flow_doctor/notify/telegram.py b/flow_doctor/notify/telegram.py index edf35bd..a92cff4 100644 --- a/flow_doctor/notify/telegram.py +++ b/flow_doctor/notify/telegram.py @@ -27,7 +27,7 @@ import json import logging import sys -from typing import Optional, Union +from typing import Any, Optional, Union from urllib.error import URLError from urllib.request import Request, urlopen @@ -42,6 +42,11 @@ _MAX_MESSAGE_LEN = 4096 _TRUNCATION_SUFFIX = "\n…[truncated]" +# Sentinel for ``send_raw`` overrides — lets us distinguish "caller +# didn't pass this kwarg, use instance default" from "caller explicitly +# passed None to override to plain text / push-with-sound". +_UNSET: Any = object() + class TelegramNotifier(Notifier): """Send alerts via the Telegram Bot API. @@ -146,6 +151,82 @@ def send( ) return None + def send_raw( + self, + text: str, + *, + parse_mode: Any = _UNSET, + disable_notification: Any = _UNSET, + ) -> Optional[str]: + """POST an arbitrary text message to the configured chat. 
+ + Distinct from :meth:`send`, which formats a structured Report. + ``send_raw`` is the convenience for adjacent flow-doctor + subsystems (remediation, custom success pings) that want to ride + the same bot + chat + thread routing without conforming to the + Report shape. Returns the standard non-secret target identifier + on success, or None on failure (errors are logged, never raised). + + ``parse_mode`` and ``disable_notification`` default to the + instance values supplied at construction time. Explicit + overrides — including ``parse_mode=None`` for plain-text + rendering when the body contains characters that Markdown + would otherwise mangle — are honoured. The sentinel lets us + distinguish "use instance default" from "explicit None". + """ + text = _truncate(text) + payload: dict = { + "chat_id": self.chat_id, + "text": text, + } + mode = self.parse_mode if parse_mode is _UNSET else parse_mode + if mode: + payload["parse_mode"] = mode + if self.message_thread_id is not None: + payload["message_thread_id"] = self.message_thread_id + quiet = ( + self.disable_notification + if disable_notification is _UNSET + else disable_notification + ) + if quiet: + payload["disable_notification"] = True + + try: + data = json.dumps(payload).encode("utf-8") + req = Request( + f"{self._API_BASE}/bot{self.bot_token}/sendMessage", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urlopen(req, timeout=10) as resp: + if resp.status == 200: + body = resp.read().decode("utf-8", errors="replace") + try: + parsed = json.loads(body) + except json.JSONDecodeError: + parsed = {} + if parsed.get("ok"): + target = f"telegram:{self.chat_id}" + if self.message_thread_id is not None: + target += f":{self.message_thread_id}" + return target + _logger.critical( + "flow-doctor Telegram send_raw ok=false: %s", + parsed.get("description", "unknown"), + ) + return None + _logger.critical( + "flow-doctor Telegram send_raw HTTP %s", resp.status, + ) + return 
None + except Exception as e: + _logger.warning( + "flow-doctor Telegram send_raw failed: %s", e, + ) + return None + def validate(self) -> None: """Preflight: confirm the bot token is valid via ``getMe``. diff --git a/flow_doctor/remediation/executor.py b/flow_doctor/remediation/executor.py index b4430f3..fe8b5da 100644 --- a/flow_doctor/remediation/executor.py +++ b/flow_doctor/remediation/executor.py @@ -11,11 +11,14 @@ import sys from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional from flow_doctor.remediation.decision_gate import Decision, DecisionType from flow_doctor.remediation.playbook import RemediationAction, RemediationType +if TYPE_CHECKING: + from flow_doctor.notify.telegram import TelegramNotifier + logger = logging.getLogger("flow_doctor.remediation") @@ -47,13 +50,26 @@ def __init__( sfn_client=None, ec2_client=None, store=None, + telegram_notifier: "Optional[TelegramNotifier]" = None, telegram_webhook_url: Optional[str] = None, ): + """ + Args: + telegram_notifier: First-class ``TelegramNotifier`` to route + remediation pings through (preferred since 0.5.0rc3). + When supplied, gets bot-token / chat-id / threading / + Markdown formatting / target-id auditing. + telegram_webhook_url: Legacy back-compat path — POSTs a + ``{"text": ...}`` body to an arbitrary URL via urllib. + Kept so 0.4.x ``flow-doctor.yaml`` configs keep working + without code changes; will be removed in 0.6.0. 
+ """ self.dry_run = dry_run self._ssm = ssm_client self._sfn = sfn_client self._ec2 = ec2_client self._store = store + self._telegram_notifier = telegram_notifier self._telegram_url = telegram_webhook_url def execute(self, decision: Decision) -> ExecutionResult: @@ -249,24 +265,32 @@ def _save_audit(self, decision: Decision, result: ExecutionResult) -> None: logger.error("Failed to save remediation audit: %s", e) def _notify_telegram(self, decision: Decision, result: ExecutionResult) -> None: - """Send Telegram notification for every remediation action.""" - if not self._telegram_url: + """Send Telegram notification for every remediation action. + + Preferred path (0.5.0rc3+): a first-class ``TelegramNotifier`` + passed via ``telegram_notifier=``. The pre-rc3 ``telegram_webhook_url`` + path is kept for back-compat with 0.4.x yaml configs and will be + removed in 0.6.0. + """ + if not self._telegram_notifier and not self._telegram_url: return - try: - emoji = "✅" if result.success else "❌" - mode = "[DRY RUN] " if result.dry_run else "" - pattern = decision.playbook_match.name if decision.playbook_match else "unknown" - msg = ( - f"{emoji} {mode}flow-doctor auto-remediation\n" - f"Pattern: {pattern}\n" - f"Action: {result.action_type}\n" - f"Flow: {decision.diagnosis.flow_name}\n" - f"Root cause: {decision.diagnosis.root_cause[:200]}\n" - ) - if result.error: - msg += f"Error: {result.error[:200]}\n" + msg = self._format_remediation_message(decision, result) + # Prefer the notifier. Plain text: "_" in names would break Markdown parsing. + if self._telegram_notifier is not None: + try: + self._telegram_notifier.send_raw(msg, parse_mode=None) + except Exception as e: + # send_raw() already logs + swallows; this except is the + # belt-and-suspenders barrier for anything that slips + # past, since the executor must never crash on + # notification failure. + logger.warning("Telegram notifier failed: %s", e) + return + + # Legacy webhook-URL path. 
+ try: import urllib.request data = json.dumps({"text": msg}).encode("utf-8") req = urllib.request.Request( @@ -277,3 +301,25 @@ def _notify_telegram(self, decision: Decision, result: ExecutionResult) -> None: urllib.request.urlopen(req, timeout=5) except Exception as e: logger.warning("Telegram notification failed: %s", e) + + @staticmethod + def _format_remediation_message( + decision: Decision, result: ExecutionResult + ) -> str: + emoji = "✅" if result.success else "❌" + mode = "[DRY RUN] " if result.dry_run else "" + pattern = ( + decision.playbook_match.name + if decision.playbook_match + else "unknown" + ) + msg = ( + f"{emoji} {mode}flow-doctor auto-remediation\n" + f"Pattern: {pattern}\n" + f"Action: {result.action_type}\n" + f"Flow: {decision.diagnosis.flow_name}\n" + f"Root cause: {decision.diagnosis.root_cause[:200]}\n" + ) + if result.error: + msg += f"Error: {result.error[:200]}\n" + return msg diff --git a/pyproject.toml b/pyproject.toml index 7eba07e..52100c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "flow-doctor" -version = "0.5.0rc2" +version = "0.5.0rc3" description = "Pipeline error handler: capture, deduplicate, diagnose, and auto-fix failures." readme = "README.md" requires-python = ">=3.9" @@ -63,6 +63,29 @@ include = ["flow_doctor*"] # authoritative when consumers depend on flow-doctor in --strict mode. flow_doctor = ["py.typed"] +[tool.coverage.run] +# Canonical coverage entry — invoke as +# python -m coverage run -m pytest && python -m coverage report +# pytest-cov's --cov= flag misreports module-level statement coverage +# under editable installs (`pip install -e .`) because it instruments +# AFTER the import has already happened; the direct ``coverage run`` +# path measures correctly. 
+source = ["flow_doctor"] +branch = true +omit = [ + "tests/*", + "*/__pycache__/*", +] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "raise NotImplementedError", + "if TYPE_CHECKING:", + "if __name__ == .__main__.:", +] +show_missing = true + [tool.pytest.ini_options] testpaths = ["tests"] filterwarnings = [ diff --git a/tests/test_remediation_telegram_notifier.py b/tests/test_remediation_telegram_notifier.py new file mode 100644 index 0000000..56b465a --- /dev/null +++ b/tests/test_remediation_telegram_notifier.py @@ -0,0 +1,348 @@ +"""Tests for the 0.5.0rc3 remediation → TelegramNotifier migration. + +Covers: +- TelegramNotifier.send_raw() — adjacent subsystems (remediation, + custom success pings) firing arbitrary text through the same bot + + chat + thread routing. +- RemediationExecutor consuming a TelegramNotifier instance (the new + preferred path) — verifies the executor calls send_raw() with the + remediation-formatted body. +- RemediationExecutor legacy telegram_webhook_url path — verifies the + bespoke urllib.urlopen POST still works for 0.4.x back-compat. +- _init_remediation in core/client.py building a TelegramNotifier from + RemediationConfig.telegram_bot_token + telegram_chat_id fields. 
+""" + +from __future__ import annotations + +import json +import tempfile +from unittest.mock import MagicMock, patch + +import pytest + +from flow_doctor.core.client import FlowDoctor +from flow_doctor.core.config import FlowDoctorConfig, RemediationConfig, StoreConfig +from flow_doctor.notify.telegram import TelegramNotifier +from flow_doctor.remediation.decision_gate import ( + Decision, + DecisionType, +) +from flow_doctor.remediation.executor import ExecutionResult, RemediationExecutor + + +def _fake_urlopen_response(body: dict, status: int = 200): + resp = MagicMock() + resp.status = status + resp.read.return_value = json.dumps(body).encode("utf-8") + resp.__enter__ = lambda self: self + resp.__exit__ = lambda self, *a: False + return resp + + +# --------------------------------------------------------------------------- +# TelegramNotifier.send_raw() +# --------------------------------------------------------------------------- + + +def test_send_raw_posts_text_to_chat_returns_target_id(): + notifier = TelegramNotifier(bot_token="t", chat_id=-100) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + target = notifier.send_raw("hello remediation") + assert target == "telegram:-100" + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert payload["text"] == "hello remediation" + assert payload["chat_id"] == -100 + assert payload["parse_mode"] == "Markdown" # default + + +def test_send_raw_includes_message_thread_id_when_set(): + notifier = TelegramNotifier(bot_token="t", chat_id=1, message_thread_id=99) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + target = notifier.send_raw("threaded") + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert payload["message_thread_id"] == 99 + assert target == "telegram:1:99" + + +def 
test_send_raw_override_parse_mode_per_call(): + """parse_mode arg overrides the instance default — useful for the + remediation pings where we may want plain text instead of Markdown.""" + notifier = TelegramNotifier(bot_token="t", chat_id=1, parse_mode="Markdown") + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + notifier.send_raw("plain", parse_mode=None) + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert "parse_mode" not in payload + + +def test_send_raw_returns_none_on_api_failure(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response( + {"ok": False, "description": "Forbidden"} + ) + target = notifier.send_raw("nope") + assert target is None + + +def test_send_raw_swallows_network_failures(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + with patch( + "flow_doctor.notify.telegram.urlopen", + side_effect=ConnectionError("offline"), + ): + # Must NOT raise — adjacent subsystems rely on this for + # never-crash-the-caller semantics. 
+ target = notifier.send_raw("never raises") + assert target is None + + +def test_send_raw_truncates_at_telegram_4096_limit(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + long = "x" * 5000 + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + notifier.send_raw(long) + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert len(payload["text"]) == 4096 + assert payload["text"].endswith("[truncated]") + + +def test_send_raw_http_non_200_returns_none(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response( + {"ok": True}, status=502 + ) + target = notifier.send_raw("x") + assert target is None + + +# --------------------------------------------------------------------------- +# RemediationExecutor — first-class TelegramNotifier path (rc3+) +# --------------------------------------------------------------------------- + + +def _make_decision(success: bool = True) -> tuple[Decision, ExecutionResult]: + """Build a minimal Decision + ExecutionResult fixture for the + remediation notification path. 
The remediation pipeline normally + populates these — here we hand-build them with just the fields the + notification formatter actually reads.""" + decision = MagicMock(spec=Decision) + decision.decision_type = DecisionType.AUTO_REMEDIATE + decision.playbook_match = MagicMock() + decision.playbook_match.name = "service_down" + decision.diagnosis = MagicMock() + decision.diagnosis.flow_name = "alpha-engine-predictor" + decision.diagnosis.root_cause = "Lambda init exceeded 10s on cold-start" + + result = ExecutionResult( + success=success, + action_type="restart_service", + dry_run=False, + error="" if success else "ssm send_command failed", + ) + return decision, result + + +def test_executor_with_telegram_notifier_invokes_send_raw(): + notifier = TelegramNotifier(bot_token="t", chat_id=-100) + executor = RemediationExecutor( + dry_run=False, + store=MagicMock(), + telegram_notifier=notifier, + ) + decision, result = _make_decision(success=True) + + with patch.object(notifier, "send_raw", wraps=notifier.send_raw) as spy: + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + executor._notify_telegram(decision, result) + + assert spy.call_count == 1 + sent_text = spy.call_args[0][0] + assert "service_down" in sent_text + assert "restart_service" in sent_text + assert "alpha-engine-predictor" in sent_text + assert "✅" in sent_text # success emoji + assert "DRY RUN" not in sent_text + + +def test_executor_failure_message_includes_error_and_red_emoji(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + executor = RemediationExecutor( + store=MagicMock(), telegram_notifier=notifier + ) + decision, result = _make_decision(success=False) + + with patch.object(notifier, "send_raw") as mock_send: + executor._notify_telegram(decision, result) + + sent_text = mock_send.call_args[0][0] + assert "❌" in sent_text + assert "ssm send_command failed" in sent_text + + +def 
test_executor_dry_run_message_includes_dry_run_tag(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + executor = RemediationExecutor( + store=MagicMock(), telegram_notifier=notifier + ) + decision, result = _make_decision() + result.dry_run = True + + with patch.object(notifier, "send_raw") as mock_send: + executor._notify_telegram(decision, result) + + sent_text = mock_send.call_args[0][0] + assert "[DRY RUN]" in sent_text + + +def test_executor_notifier_path_swallows_send_raw_exceptions(): + """send_raw already swallows + logs failures internally, but the + executor adds a belt-and-suspenders try/except around it so an + unexpected raise can't crash the remediation pipeline.""" + notifier = TelegramNotifier(bot_token="t", chat_id=1) + executor = RemediationExecutor( + store=MagicMock(), telegram_notifier=notifier + ) + decision, result = _make_decision() + + with patch.object( + notifier, "send_raw", side_effect=RuntimeError("synthetic") + ): + # Must NOT raise. + executor._notify_telegram(decision, result) + + +# --------------------------------------------------------------------------- +# RemediationExecutor — legacy webhook URL path +# --------------------------------------------------------------------------- + + +def test_executor_legacy_webhook_url_still_posts(): + """Back-compat: 0.4.x configs only had telegram_webhook_url. 
The + legacy code path must still POST the same body shape it did before.""" + executor = RemediationExecutor( + store=MagicMock(), + telegram_webhook_url="https://example.com/some-webhook", + ) + decision, result = _make_decision() + + with patch("urllib.request.urlopen") as mock_urlopen: + executor._notify_telegram(decision, result) + + assert mock_urlopen.call_count == 1 + req = mock_urlopen.call_args[0][0] + assert req.full_url == "https://example.com/some-webhook" + payload = json.loads(req.data.decode("utf-8")) + assert "text" in payload + assert "service_down" in payload["text"] + + +def test_executor_with_neither_telegram_path_is_noop(): + executor = RemediationExecutor(store=MagicMock()) + decision, result = _make_decision() + + with patch("urllib.request.urlopen") as mock_urlopen: + executor._notify_telegram(decision, result) + + assert mock_urlopen.call_count == 0 + + +def test_executor_prefers_notifier_over_legacy_url(): + """When both are configured (transitional config), the notifier + wins. 
The legacy URL is the fallback for installs that haven't + moved over yet.""" + notifier = TelegramNotifier(bot_token="t", chat_id=1) + executor = RemediationExecutor( + store=MagicMock(), + telegram_notifier=notifier, + telegram_webhook_url="https://legacy.example.com/hook", + ) + decision, result = _make_decision() + + with patch.object(notifier, "send_raw") as mock_send_raw: + with patch("urllib.request.urlopen") as mock_urlopen: + executor._notify_telegram(decision, result) + + assert mock_send_raw.call_count == 1 + assert mock_urlopen.call_count == 0 # legacy path NOT touched + + +# --------------------------------------------------------------------------- +# _init_remediation in core/client.py — builds TelegramNotifier from +# RemediationConfig.telegram_bot_token + telegram_chat_id +# --------------------------------------------------------------------------- + + +def _build_fd_with_remediation_telegram( + *, + bot_token=None, + chat_id=None, + thread_id=None, + webhook_url=None, + db_path: str = ":memory:", +) -> FlowDoctor: + config = FlowDoctorConfig( + flow_name="rem-test", + store=StoreConfig(type="sqlite", path=db_path), + remediation=RemediationConfig( + enabled=True, + dry_run=True, + telegram_bot_token=bot_token, + telegram_chat_id=chat_id, + telegram_message_thread_id=thread_id, + telegram_webhook_url=webhook_url, + ), + ) + return FlowDoctor(config) + + +def test_init_remediation_builds_telegram_notifier_from_config(monkeypatch): + monkeypatch.delenv("FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", raising=False) + monkeypatch.delenv("FLOW_DOCTOR_TELEGRAM_CHAT_ID", raising=False) + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _build_fd_with_remediation_telegram( + bot_token="123:abc", + chat_id=-100, + thread_id=7, + db_path=f.name, + ) + executor = fd._remediation_executor + assert executor is not None + assert executor._telegram_notifier is not None + assert executor._telegram_notifier.bot_token == "123:abc" + assert 
executor._telegram_notifier.chat_id == -100 + assert executor._telegram_notifier.message_thread_id == 7 + + +def test_init_remediation_pulls_telegram_creds_from_env(monkeypatch): + """Same env-var contract as the standalone TelegramNotifier wiring.""" + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", "env-token") + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_CHAT_ID", "-1001234") + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _build_fd_with_remediation_telegram(db_path=f.name) + notifier = fd._remediation_executor._telegram_notifier + assert notifier is not None + assert notifier.bot_token == "env-token" + assert notifier.chat_id == -1001234 # coerced from str + + +def test_init_remediation_no_telegram_when_only_legacy_url(monkeypatch): + """If only the legacy webhook URL is set (and no bot creds via + config or env), the new notifier is None and the executor falls + through to the legacy POST path.""" + monkeypatch.delenv("FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", raising=False) + monkeypatch.delenv("FLOW_DOCTOR_TELEGRAM_CHAT_ID", raising=False) + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _build_fd_with_remediation_telegram( + webhook_url="https://legacy.example/hook", + db_path=f.name, + ) + executor = fd._remediation_executor + assert executor._telegram_notifier is None + assert executor._telegram_url == "https://legacy.example/hook"