From f4c3a2b3f1694f54954ea17228a4b0b46bb44b43 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 13 May 2026 13:41:58 -0700 Subject: [PATCH] =?UTF-8?q?Add=20TelegramNotifier=20=E2=80=94=20recommende?= =?UTF-8?q?d=20default=20for=20new=20consumers=20(rc2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Telegram beats SMTP / SES / Slack on every axis that matters for single-dev and small-team ops: two-minute @BotFather setup, no app password / verified-identity dance, per-flow routing via chat_id or forum message_thread_id, mobile push for free, clean token rotation. This rc promotes it to the recommended default; SMTP / SES (via the existing EmailNotifier) / Slack / GitHub / S3 stay as alternates for consumers that need them. - flow_doctor/notify/telegram.py: TelegramNotifier (Bot API sendMessage POST, urllib-only to keep core dep-free, 4096-char Telegram-limit truncation with [truncated] sentinel, /getMe preflight that respects FLOW_DOCTOR_SKIP_PREFLIGHT). send() returns the non-secret target identifier "telegram:[:]" so action.target never persists the bot token. - flow_doctor/notify/configs.py: TelegramNotifierConfig joins the Field(discriminator="type") union. Accepts chat_id as int (typical, negative for supergroups / channels) or str ("@channelusername" for public channels). - flow_doctor/core/config.py: NotifyChannelConfig extended with bot_token / chat_id / message_thread_id / parse_mode (default Markdown) / disable_notification fields so yaml-driven configs and the to_channel_config() lift path both work. _parse_notify_dicts forwards the new keys. - flow_doctor/core/client.py: _init_notifiers handles type="telegram" with bot_token + chat_id ConfigError messages pointing at @BotFather and the getUpdates lookup. FLOW_DOCTOR_TELEGRAM_BOT_TOKEN + FLOW_DOCTOR_TELEGRAM_CHAT_ID env fallbacks (with TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID as conventional aliases); numeric env chat_ids coerce to int, @channelusername stays str. _send_notifications dispatches to ActionType.TELEGRAM_ALERT. - flow_doctor/core/models.py: TELEGRAM_ALERT joins the ActionType enum. - flow_doctor/{__init__,notify/__init__}.py: TelegramNotifierConfig re-exported at both package roots. - Version bump 0.5.0rc1 → 0.5.0rc2 in pyproject.toml + __init__.py. CHANGELOG entry documents the rationale + setup recipe. 20 new tests cover: round-trip through legacy NotifyChannelConfig, builder integration, env-var coercion (numeric → int, @channel → str), missing-field ConfigError shape, urllib POST URL + payload (chat_id / text / parse_mode / message_thread_id / disable_notification gating), target-id format on success, never-raises behavior on API ok=false and network failures, truncation, ActionType dispatch end-to-end through _send_notifications, and preflight skip + auth failure paths. Suite: 376/376 pass (356 prior + 20 new). One pre-existing assertion in test_builder used "telegram" as an example unknown channel type — updated to "pagerduty" since telegram is now a first-class member of the discriminated union. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 37 ++++ flow_doctor/__init__.py | 4 +- flow_doctor/core/client.py | 48 ++++- flow_doctor/core/config.py | 16 +- flow_doctor/core/models.py | 1 + flow_doctor/notify/__init__.py | 2 + flow_doctor/notify/configs.py | 43 ++++ flow_doctor/notify/telegram.py | 253 ++++++++++++++++++++++++ pyproject.toml | 2 +- tests/test_builder.py | 2 +- tests/test_telegram_notifier.py | 338 ++++++++++++++++++++++++++++++++ 11 files changed, 740 insertions(+), 6 deletions(-) create mode 100644 flow_doctor/notify/telegram.py create mode 100644 tests/test_telegram_notifier.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 066dbf3..7bd989a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,43 @@ ## Unreleased +## 0.5.0rc2 (2026-05-13) + +Adds Telegram as the **recommended default notifier** for new consumers. + +### Added + +- **`TelegramNotifier` + `TelegramNotifierConfig`.** Sends alerts via the + Telegram Bot API. Setup is two minutes (message `@BotFather` → `/newbot` + → save the token → grab the `chat_id` from + `https://api.telegram.org/bot/getUpdates`). One bot fans out to + N flows via `chat_id` and the optional `message_thread_id` (forum + topics in supergroups), no per-channel webhook required. + Env-var contract: `FLOW_DOCTOR_TELEGRAM_BOT_TOKEN` + + `FLOW_DOCTOR_TELEGRAM_CHAT_ID` (with `TELEGRAM_BOT_TOKEN` / + `TELEGRAM_CHAT_ID` as conventional aliases). + Numeric env chat_ids auto-coerce to `int`; `@channelusername` style + stays `str`. Persisted action target is the non-secret + `telegram:[:]` identifier — never the bot token. +- **`ActionType.TELEGRAM_ALERT`** in the persisted action enum. +- **Telegram parse / payload knobs** in both the typed config and the + yaml-driven omnibus form: `parse_mode` (default `"Markdown"`), + `disable_notification`, `message_thread_id`. +- **Preflight bypass parity.** `TelegramNotifier.validate()` calls + `/getMe` to fail fast on a revoked bot token, with the same + `FLOW_DOCTOR_SKIP_PREFLIGHT=1` opt-out the other notifiers use for + tests / offline boot. + +### Rationale + +For single-dev and small-team ops, Telegram beats SMTP/SES/Slack on +setup cost (no app password, no verified-identity dance, no workspace +admin), routing (per-chat or per-thread is built in), and mobile UX +(push is automatic). Slack / Email / GitHub / S3 stay as alternates; +the change is to which notifier the README + builder examples lead with. + +Suite: 376/376 pass (+20 new Telegram tests). + ## 0.5.0rc1 (2026-05-13) Release-candidate cut of the "plug-and-play" release for internal soak. diff --git a/flow_doctor/__init__.py b/flow_doctor/__init__.py index ee8e5c3..d4e9c64 100644 --- a/flow_doctor/__init__.py +++ b/flow_doctor/__init__.py @@ -13,6 +13,7 @@ NotifierConfig, S3NotifierConfig, SlackNotifierConfig, + TelegramNotifierConfig, ) __all__ = [ @@ -28,8 +29,9 @@ "S3NotifierConfig", "Severity", "SlackNotifierConfig", + "TelegramNotifierConfig", "context", "current_context", "init", ] -__version__ = "0.5.0rc1" +__version__ = "0.5.0rc2" diff --git a/flow_doctor/core/client.py b/flow_doctor/core/client.py index 5900da4..ac38ba7 100644 --- a/flow_doctor/core/client.py +++ b/flow_doctor/core/client.py @@ -49,6 +49,8 @@ "slack_webhook": ["FLOW_DOCTOR_SLACK_WEBHOOK", "SLACK_WEBHOOK_URL"], "anthropic_api_key": ["FLOW_DOCTOR_ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY"], "s3_bucket": ["FLOW_DOCTOR_S3_BUCKET", "CHANGELOG_BUCKET"], + "telegram_bot_token": ["FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", "TELEGRAM_BOT_TOKEN"], + "telegram_chat_id": ["FLOW_DOCTOR_TELEGRAM_CHAT_ID", "TELEGRAM_CHAT_ID"], } @@ -288,10 +290,51 @@ def _init_notifiers(config: FlowDoctorConfig) -> List[Notifier]: default_resolution_type=nc.default_resolution_type, )) + elif nc.type == "telegram": + bot_token = nc.bot_token or _env_fallback("telegram_bot_token") + raw_chat = nc.chat_id + if raw_chat is None: + raw_chat = _env_fallback("telegram_chat_id") + # chat_id from env is a string; coerce to int when + # it's a numeric id (negative for channels/groups, + # positive for users) so the Bot API receives the + # right JSON type. ``@channelusername`` style stays str. + if raw_chat is not None and ( + raw_chat.lstrip("-").isdigit() + ): + raw_chat = int(raw_chat) + missing = [] + if not bot_token: + missing.append( + f"bot_token (or one of: " + f"{', '.join(_ENV_FALLBACKS['telegram_bot_token'])}). " + "Create one via @BotFather → /newbot" + ) + if raw_chat in (None, ""): + missing.append( + f"chat_id (or one of: " + f"{', '.join(_ENV_FALLBACKS['telegram_chat_id'])}). " + "After messaging the bot, look it up at " + "https://api.telegram.org/bot/getUpdates" + ) + if missing: + raise ConfigError( + f"{label}: telegram notifier is missing required field(s): " + f"{'; '.join(missing)}." + ) + from flow_doctor.notify.telegram import TelegramNotifier + notifiers.append(TelegramNotifier( + bot_token=bot_token, + chat_id=raw_chat, + message_thread_id=nc.message_thread_id, + parse_mode=nc.parse_mode, + disable_notification=nc.disable_notification, + )) + else: raise ConfigError( f"{label}: unknown notifier type '{nc.type}'. " - f"Supported types: slack, email, github, s3." + f"Supported types: slack, email, github, s3, telegram." ) return notifiers @@ -708,6 +751,7 @@ def _send_notifications( from flow_doctor.notify.email import EmailNotifier from flow_doctor.notify.github import GitHubNotifier from flow_doctor.notify.s3 import S3Notifier + from flow_doctor.notify.telegram import TelegramNotifier if isinstance(notifier, SlackNotifier): action_type = ActionType.SLACK_ALERT.value @@ -717,6 +761,8 @@ def _send_notifications( action_type = ActionType.GITHUB_ISSUE.value elif isinstance(notifier, S3Notifier): action_type = ActionType.S3_ALERT.value + elif isinstance(notifier, TelegramNotifier): + action_type = ActionType.TELEGRAM_ALERT.value else: action_type = "unknown_alert" diff --git a/flow_doctor/core/config.py b/flow_doctor/core/config.py index 28bb435..d416d85 100644 --- a/flow_doctor/core/config.py +++ b/flow_doctor/core/config.py @@ -13,7 +13,7 @@ import os import re from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import yaml from pydantic import BaseModel, ConfigDict, Field @@ -46,7 +46,7 @@ class _ConfigModel(BaseModel): category=None, ) class NotifyChannelConfig(_ConfigModel): - type: str # "slack", "email", "github", or "s3" + type: str # "slack", "email", "github", "s3", or "telegram" # Slack fields webhook_url: Optional[str] = None channel: Optional[str] = None @@ -68,6 +68,13 @@ class NotifyChannelConfig(_ConfigModel): entry_prefix: str = "changelog/entries" default_root_cause_category: str = "code_bug" default_resolution_type: Optional[str] = None + # Telegram fields (Bot API). bot_token + chat_id are required at + # init time; message_thread_id is optional for forum topics. + bot_token: Optional[str] = None + chat_id: Optional[Union[int, str]] = None + message_thread_id: Optional[int] = None + parse_mode: Optional[str] = "Markdown" + disable_notification: bool = False class StoreConfig(_ConfigModel): @@ -278,6 +285,11 @@ def _parse_notify_dicts(items: List[Dict]) -> List[NotifyChannelConfig]: entry_prefix=item.get("entry_prefix", "changelog/entries"), default_root_cause_category=item.get("default_root_cause_category", "code_bug"), default_resolution_type=item.get("default_resolution_type"), + bot_token=item.get("bot_token"), + chat_id=item.get("chat_id"), + message_thread_id=item.get("message_thread_id"), + parse_mode=item.get("parse_mode", "Markdown"), + disable_notification=item.get("disable_notification", False), )) return configs diff --git a/flow_doctor/core/models.py b/flow_doctor/core/models.py index 2f4fa85..311200c 100644 --- a/flow_doctor/core/models.py +++ b/flow_doctor/core/models.py @@ -30,6 +30,7 @@ class ActionType(str, Enum): GITHUB_ISSUE = "github_issue" GITHUB_PR = "github_pr" S3_ALERT = "s3_alert" + TELEGRAM_ALERT = "telegram_alert" class ActionStatus(str, Enum): diff --git a/flow_doctor/notify/__init__.py b/flow_doctor/notify/__init__.py index 1214560..6c225f4 100644 --- a/flow_doctor/notify/__init__.py +++ b/flow_doctor/notify/__init__.py @@ -6,6 +6,7 @@ NotifierConfig, S3NotifierConfig, SlackNotifierConfig, + TelegramNotifierConfig, ) __all__ = [ @@ -14,4 +15,5 @@ "NotifierConfig", "S3NotifierConfig", "SlackNotifierConfig", + "TelegramNotifierConfig", ] diff --git a/flow_doctor/notify/configs.py b/flow_doctor/notify/configs.py index 4244864..a1f35db 100644 --- a/flow_doctor/notify/configs.py +++ b/flow_doctor/notify/configs.py @@ -21,6 +21,11 @@ from flow_doctor.core.config import NotifyChannelConfig +# Telegram chat_id may be an integer (typical) or a "@channelusername" +# string (public channels only). The typed config preserves the +# union; the legacy omnibus form already accepts ``Union[int, str]``. +TelegramChatId = Union[int, str] + class _NotifierConfigBase(BaseModel): model_config = ConfigDict(extra="ignore", validate_assignment=False) @@ -107,6 +112,42 @@ def to_channel_config(self) -> NotifyChannelConfig: ) +class TelegramNotifierConfig(_NotifierConfigBase): + """Recommended default notifier (since 0.5.0rc2). + + Setup: message ``@BotFather`` → ``/newbot`` → save the bot token. + Add the bot to your target chat. Look up the ``chat_id`` via + ``GET https://api.telegram.org/bot/getUpdates`` after sending + the bot a message. For forum-style supergroups, also note the + ``message_thread_id`` of the topic you want notifications routed to. + """ + + type: Literal["telegram"] = "telegram" + bot_token: Optional[str] = None + chat_id: Optional[TelegramChatId] = None + # Forum supergroups support per-topic routing — use this to fan out + # N flow-doctor flows (am, pm, alpha-engine, predictor, ...) into + # one chat without N bots. + message_thread_id: Optional[int] = None + # Telegram parse_mode for the message body. ``Markdown`` matches + # the legacy / minimal Markdown flavour; pass ``MarkdownV2`` for + # the strict escaping mode, ``HTML`` for HTML, or ``None`` for + # plain text. + parse_mode: Optional[str] = "Markdown" + # Silent delivery — Telegram still pushes but without a sound. + disable_notification: bool = False + + def to_channel_config(self) -> NotifyChannelConfig: + return NotifyChannelConfig( + type="telegram", + bot_token=self.bot_token, + chat_id=self.chat_id, + message_thread_id=self.message_thread_id, + parse_mode=self.parse_mode, + disable_notification=self.disable_notification, + ) + + # Discriminated union of all typed notifier configs. Consumers can # type-hint as ``NotifierConfig`` and Pydantic will pick the right # concrete model based on the ``type`` field. @@ -116,6 +157,7 @@ def to_channel_config(self) -> NotifyChannelConfig: EmailNotifierConfig, GitHubNotifierConfig, S3NotifierConfig, + TelegramNotifierConfig, ], Field(discriminator="type"), ] @@ -127,4 +169,5 @@ def to_channel_config(self) -> NotifyChannelConfig: "NotifierConfig", "S3NotifierConfig", "SlackNotifierConfig", + "TelegramNotifierConfig", ] diff --git a/flow_doctor/notify/telegram.py b/flow_doctor/notify/telegram.py new file mode 100644 index 0000000..edf35bd --- /dev/null +++ b/flow_doctor/notify/telegram.py @@ -0,0 +1,253 @@ +"""Telegram Bot API notification backend. + +Telegram is the recommended default for flow-doctor consumers — one bot +token gets you N routed channels via ``chat_id`` and optional forum +``message_thread_id``, mobile push is automatic, and credential rotation +is one ``@BotFather`` call. SMTP/SES email + Slack + GitHub stay as +alternates for consumers that need them. + +Setup recipe:: + + 1. Message @BotFather → /newbot → save the bot token. + 2. Add the bot to your target chat / channel. + 3. Get the chat_id: + - Personal chat: send a message to the bot, then + GET https://api.telegram.org/bot/getUpdates and read + result[].message.chat.id (positive integer). + - Group / channel: as above (negative integer, often starts with + -100 for supergroups + channels). + - Forum-style supergroup: also note message_thread_id for the + specific topic you want notifications routed to. + 4. Set FLOW_DOCTOR_TELEGRAM_BOT_TOKEN + FLOW_DOCTOR_TELEGRAM_CHAT_ID + in the env, or pass them inline via TelegramNotifierConfig. +""" + +from __future__ import annotations + +import json +import logging +import sys +from typing import Optional, Union +from urllib.error import URLError +from urllib.request import Request, urlopen + +from flow_doctor.core.models import Diagnosis, Report +from flow_doctor.notify.base import Notifier + +_logger = logging.getLogger("flow_doctor") + +# Telegram caps a single sendMessage payload at 4096 characters. The +# adapter truncates with a sentinel so the bot API never 400s on a +# long traceback / log capture. +_MAX_MESSAGE_LEN = 4096 +_TRUNCATION_SUFFIX = "\n…[truncated]" + + +class TelegramNotifier(Notifier): + """Send alerts via the Telegram Bot API. + + ``chat_id`` may be an integer (typical) or a ``@channelusername`` + string (public channels only). ``message_thread_id`` routes the + message to a specific topic in a forum-style supergroup, which is + the cleanest way to fan out N flow-doctor flows into one chat + without N bots. + """ + + _API_BASE = "https://api.telegram.org" + + def __init__( + self, + bot_token: str, + chat_id: Union[int, str], + *, + message_thread_id: Optional[int] = None, + parse_mode: Optional[str] = "Markdown", + disable_notification: bool = False, + ): + self.bot_token = bot_token + self.chat_id = chat_id + self.message_thread_id = message_thread_id + self.parse_mode = parse_mode + self.disable_notification = disable_notification + + # ----- public API ----------------------------------------------------- + + def send( + self, + report: Report, + flow_name: str, + diagnosis: Optional[Diagnosis] = None, + ) -> Optional[str]: + try: + text = self._format_message(report, flow_name, diagnosis) + text = _truncate(text) + payload = { + "chat_id": self.chat_id, + "text": text, + } + if self.parse_mode: + payload["parse_mode"] = self.parse_mode + if self.message_thread_id is not None: + payload["message_thread_id"] = self.message_thread_id + if self.disable_notification: + payload["disable_notification"] = True + + data = json.dumps(payload).encode("utf-8") + req = Request( + f"{self._API_BASE}/bot{self.bot_token}/sendMessage", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urlopen(req, timeout=10) as resp: + if resp.status == 200: + body = resp.read().decode("utf-8", errors="replace") + try: + parsed = json.loads(body) + except json.JSONDecodeError: + parsed = {} + if parsed.get("ok"): + # Return a stable, non-secret target identifier so + # the action target can be stored without leaking + # the bot token. ``chat_id`` is enough to identify + # the destination on a per-install basis. + target = f"telegram:{self.chat_id}" + if self.message_thread_id is not None: + target += f":{self.message_thread_id}" + return target + description = parsed.get("description", "unknown") + _logger.critical( + "flow-doctor Telegram API returned ok=false: %s", + description, + ) + return None + _logger.critical( + "flow-doctor Telegram API returned HTTP %s", resp.status, + ) + return None + except URLError as e: + _logger.critical( + "flow-doctor Telegram notification failed (network): %s", + e, exc_info=True, + ) + print( + f"[flow-doctor] Telegram notification failed: {e}", + file=sys.stderr, + ) + return None + except Exception as e: + _logger.critical( + "flow-doctor Telegram notification failed: %s", + e, exc_info=True, + ) + print( + f"[flow-doctor] Telegram notification failed: {e}", + file=sys.stderr, + ) + return None + + def validate(self) -> None: + """Preflight: confirm the bot token is valid via ``getMe``. + + Bypassed when ``FLOW_DOCTOR_SKIP_PREFLIGHT`` is set (mirrors the + same env-var contract the other notifiers use for tests / offline + boot).""" + import os + + if os.environ.get("FLOW_DOCTOR_SKIP_PREFLIGHT"): + return None + try: + req = Request( + f"{self._API_BASE}/bot{self.bot_token}/getMe", + method="GET", + ) + with urlopen(req, timeout=10) as resp: + if resp.status != 200: + from flow_doctor.core.errors import ConfigError + + raise ConfigError( + f"Telegram bot token preflight failed: HTTP {resp.status}. " + "Verify the token at https://t.me/BotFather (/mybots → API Token)." + ) + body = resp.read().decode("utf-8", errors="replace") + parsed = json.loads(body) + if not parsed.get("ok"): + from flow_doctor.core.errors import ConfigError + + raise ConfigError( + "Telegram bot token preflight failed: " + f"{parsed.get('description', 'unknown error')}. " + "Verify the token at https://t.me/BotFather (/mybots → API Token)." + ) + except URLError as e: + from flow_doctor.core.errors import ConfigError + + raise ConfigError( + f"Telegram bot token preflight failed (network): {e}. " + "Check connectivity to api.telegram.org or set " + "FLOW_DOCTOR_SKIP_PREFLIGHT=1 to defer the check." + ) + + # ----- helpers -------------------------------------------------------- + + @staticmethod + def _format_message( + report: Report, + flow_name: str, + diagnosis: Optional[Diagnosis] = None, + ) -> str: + severity_emoji = { + "critical": "🔴", + "error": "🟠", + "warning": "🟡", + } + emoji = severity_emoji.get(report.severity, "⚪") + lines = [ + f"{emoji} *\\[{report.severity.upper()}\\] {flow_name}*", + "", + ] + if report.error_type: + lines.append(f"*Error:* `{report.error_type}: {report.error_message}`") + else: + lines.append(f"*Message:* {report.error_message}") + + if report.cascade_source: + lines.append( + f"_Likely caused by upstream `{report.cascade_source}` failure_" + ) + + if report.traceback: + tb_lines = report.traceback.strip().splitlines()[-5:] + lines.append("") + lines.append("```") + lines.extend(tb_lines) + lines.append("```") + + if diagnosis: + category_emoji = { + "TRANSIENT": "🔄", "DATA": "📊", "CODE": "🐛", + "CONFIG": "⚙️", "EXTERNAL": "🌐", "INFRA": "🏗️", + }.get(diagnosis.category, "❓") + + lines.append("") + lines.append( + f"*Diagnosis:* {category_emoji} {diagnosis.category} " + f"(confidence: {diagnosis.confidence:.0%})" + ) + lines.append(f"_{diagnosis.root_cause[:300]}_") + + if diagnosis.remediation: + lines.append(f"\n*Remediation:* {diagnosis.remediation[:300]}") + + lines.append(f"\n_Report ID: {report.id}_") + return "\n".join(lines) + + +def _truncate(text: str) -> str: + if len(text) <= _MAX_MESSAGE_LEN: + return text + keep = _MAX_MESSAGE_LEN - len(_TRUNCATION_SUFFIX) + return text[:keep] + _TRUNCATION_SUFFIX + + +__all__ = ["TelegramNotifier"] diff --git a/pyproject.toml b/pyproject.toml index 0670edf..7eba07e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "flow-doctor" -version = "0.5.0rc1" +version = "0.5.0rc2" description = "Pipeline error handler: capture, deduplicate, diagnose, and auto-fix failures." readme = "README.md" requires-python = ">=3.9" diff --git a/tests/test_builder.py b/tests/test_builder.py index 537f5ee..9457867 100644 --- a/tests/test_builder.py +++ b/tests/test_builder.py @@ -260,4 +260,4 @@ def test_notifier_config_union_rejects_unknown_type(): adapter = TypeAdapter(NotifierConfig) with pytest.raises(ValidationError): - adapter.validate_python({"type": "telegram", "webhook_url": "x"}) + adapter.validate_python({"type": "pagerduty", "service_key": "x"}) diff --git a/tests/test_telegram_notifier.py b/tests/test_telegram_notifier.py new file mode 100644 index 0000000..586095f --- /dev/null +++ b/tests/test_telegram_notifier.py @@ -0,0 +1,338 @@ +"""Tests for the Telegram Bot API notifier. + +Covers the typed config, builder integration, init wiring (env-var +fallbacks + missing-field errors), payload shape, target-id return, +preflight handling, and length truncation. +""" + +from __future__ import annotations + +import io +import json +import tempfile +from unittest.mock import MagicMock, patch + +import pytest + +from flow_doctor import ( + FlowDoctor, + TelegramNotifierConfig, + init, +) +from flow_doctor.core.config import NotifyChannelConfig +from flow_doctor.core.errors import ConfigError +from flow_doctor.core.models import ActionType, Report, Severity +from flow_doctor.notify.telegram import ( + _MAX_MESSAGE_LEN, + TelegramNotifier, + _truncate, +) + + +# --------------------------------------------------------------------------- +# TelegramNotifierConfig (typed) +# --------------------------------------------------------------------------- + + +def test_telegram_config_to_channel_config_round_trip(): + cfg = TelegramNotifierConfig( + bot_token="123:abc", + chat_id=-1001234567890, + message_thread_id=42, + parse_mode="MarkdownV2", + disable_notification=True, + ) + legacy = cfg.to_channel_config() + assert isinstance(legacy, NotifyChannelConfig) + assert legacy.type == "telegram" + assert legacy.bot_token == "123:abc" + assert legacy.chat_id == -1001234567890 + assert legacy.message_thread_id == 42 + assert legacy.parse_mode == "MarkdownV2" + assert legacy.disable_notification is True + + +def test_telegram_config_accepts_string_chat_id_for_public_channels(): + cfg = TelegramNotifierConfig(bot_token="t", chat_id="@my_public_channel") + assert cfg.chat_id == "@my_public_channel" + + +def test_telegram_config_default_parse_mode_is_markdown(): + cfg = TelegramNotifierConfig(bot_token="t", chat_id=1) + assert cfg.parse_mode == "Markdown" + assert cfg.disable_notification is False + assert cfg.message_thread_id is None + + +# --------------------------------------------------------------------------- +# Builder integration +# --------------------------------------------------------------------------- + + +def test_builder_accepts_telegram_notifier_config(): + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = ( + FlowDoctor.builder("morning-signal") + .with_store(path=f.name) + .add_notifier( + TelegramNotifierConfig(bot_token="123:abc", chat_id=-100) + ) + .build() + ) + assert len(fd.config.notify) == 1 + assert fd.config.notify[0].type == "telegram" + assert fd.config.notify[0].bot_token == "123:abc" + + +# --------------------------------------------------------------------------- +# Init wiring — missing fields surface as ConfigError +# --------------------------------------------------------------------------- + + +def test_init_rejects_telegram_without_bot_token_or_chat_id(monkeypatch): + monkeypatch.delenv("FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", raising=False) + monkeypatch.delenv("FLOW_DOCTOR_TELEGRAM_CHAT_ID", raising=False) + monkeypatch.delenv("TELEGRAM_BOT_TOKEN", raising=False) + monkeypatch.delenv("TELEGRAM_CHAT_ID", raising=False) + with tempfile.NamedTemporaryFile(suffix=".db") as f: + with pytest.raises(ConfigError) as exc: + init( + store={"type": "sqlite", "path": f.name}, + notify=[{"type": "telegram"}], + ) + msg = str(exc.value) + assert "bot_token" in msg + assert "chat_id" in msg + assert "@BotFather" in msg + + +def test_init_picks_telegram_creds_from_env(monkeypatch): + """Numeric env chat_id coerces to int so the bot API receives the + correct JSON type (negative ints for supergroups / channels).""" + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", "123:env-token") + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_CHAT_ID", "-1001234567890") + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = init( + store={"type": "sqlite", "path": f.name}, + notify=[{"type": "telegram"}], + ) + assert len(fd._notifiers) == 1 + notifier = fd._notifiers[0] + assert isinstance(notifier, TelegramNotifier) + assert notifier.bot_token == "123:env-token" + assert notifier.chat_id == -1001234567890 # coerced to int + + +def test_init_keeps_at_channel_chat_id_as_string(monkeypatch): + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", "t") + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_CHAT_ID", "@my_channel") + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = init( + store={"type": "sqlite", "path": f.name}, + notify=[{"type": "telegram"}], + ) + assert fd._notifiers[0].chat_id == "@my_channel" + + +# --------------------------------------------------------------------------- +# Payload shape +# --------------------------------------------------------------------------- + + +def _make_report(**overrides) -> Report: + defaults = dict( + flow_name="morning-signal", + error_message="boom", + severity=Severity.ERROR.value, + error_type="ValueError", + traceback=None, + ) + defaults.update(overrides) + return Report(**defaults) + + +def _fake_urlopen_response(body: dict, status: int = 200): + """Build a context-manager-shaped fake for urlopen().""" + resp = MagicMock() + resp.status = status + resp.read.return_value = json.dumps(body).encode("utf-8") + resp.__enter__ = lambda self: self + resp.__exit__ = lambda self, *a: False + return resp + + +def test_send_posts_to_correct_bot_api_url(): + notifier = TelegramNotifier(bot_token="123:abc", chat_id=-100) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + target = notifier.send(_make_report(), "morning-signal") + assert target == "telegram:-100" + req = mock_urlopen.call_args[0][0] + assert req.full_url == "https://api.telegram.org/bot123:abc/sendMessage" + assert req.get_method() == "POST" + + +def test_send_payload_includes_chat_id_text_and_parse_mode(): + notifier = TelegramNotifier(bot_token="t", chat_id=42) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + notifier.send(_make_report(), "morning-signal") + req = mock_urlopen.call_args[0][0] + payload = json.loads(req.data.decode("utf-8")) + assert payload["chat_id"] == 42 + assert payload["parse_mode"] == "Markdown" + assert "boom" in payload["text"] + assert "morning-signal" in payload["text"] + + +def test_send_payload_includes_message_thread_id_when_set(): + notifier = TelegramNotifier( + bot_token="t", chat_id=1, message_thread_id=99 + ) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + target = notifier.send(_make_report(), "morning-signal") + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert payload["message_thread_id"] == 99 + # message_thread_id appears in the target id so operators can see + # which forum topic an alert landed in. + assert target == "telegram:1:99" + + +def test_send_payload_omits_message_thread_id_when_unset(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + notifier.send(_make_report(), "morning-signal") + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert "message_thread_id" not in payload + + +def test_send_disable_notification_flag_passes_through(): + notifier = TelegramNotifier( + bot_token="t", chat_id=1, disable_notification=True + ) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + notifier.send(_make_report(), "morning-signal") + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert payload["disable_notification"] is True + + +def test_send_with_parse_mode_none_omits_field(): + notifier = TelegramNotifier(bot_token="t", chat_id=1, parse_mode=None) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + notifier.send(_make_report(), "morning-signal") + payload = json.loads(mock_urlopen.call_args[0][0].data.decode("utf-8")) + assert "parse_mode" not in payload + + +# --------------------------------------------------------------------------- +# Error paths — must NEVER raise upward +# --------------------------------------------------------------------------- + + +def test_send_returns_none_when_api_returns_ok_false(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response( + {"ok": False, "description": "Forbidden: bot was kicked"} + ) + target = notifier.send(_make_report(), "morning-signal") + assert target is None + + +def test_send_returns_none_when_urlopen_raises(): + notifier = TelegramNotifier(bot_token="t", chat_id=1) + with patch( + "flow_doctor.notify.telegram.urlopen", + side_effect=ConnectionError("network down"), + ): + target = notifier.send(_make_report(), "morning-signal") + assert target is None + + +# --------------------------------------------------------------------------- +# Truncation +# --------------------------------------------------------------------------- + + +def test_truncate_passes_short_text_unchanged(): + assert _truncate("hello") == "hello" + + +def test_truncate_caps_long_text_at_telegram_limit(): + long = "x" * (_MAX_MESSAGE_LEN + 500) + out = _truncate(long) + assert len(out) == _MAX_MESSAGE_LEN + assert out.endswith("[truncated]") + + +# --------------------------------------------------------------------------- +# Action type dispatch +# --------------------------------------------------------------------------- + + +def test_send_notifications_dispatches_to_telegram_action_type(monkeypatch): + """Verify the dispatch loop in _send_notifications correctly maps + a TelegramNotifier instance to ActionType.TELEGRAM_ALERT so the + persisted action row reflects the channel that actually fired.""" + import sqlite3 + + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_BOT_TOKEN", "t") + monkeypatch.setenv("FLOW_DOCTOR_TELEGRAM_CHAT_ID", "1") + + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = init( + store={"type": "sqlite", "path": f.name}, + notify=[{"type": "telegram"}], + ) + + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response({"ok": True}) + report_id = fd.report(ValueError("boom")) + + assert report_id is not None + # Inspect the actions table directly — the public storage API + # doesn't expose a per-report fetch but the dispatch decision + # lives in core/client.py and is the thing we want to verify. + with sqlite3.connect(f.name) as conn: + rows = conn.execute( + "SELECT action_type, target FROM actions WHERE report_id = ?", + (report_id,), + ).fetchall() + action_types = {row[0] for row in rows} + assert ActionType.TELEGRAM_ALERT.value in action_types + # And the persisted target should be the non-secret identifier + # the notifier returns (telegram:chat_id), never the bot token. + targets = {row[1] for row in rows if row[1] is not None} + assert any("telegram:" in t for t in targets) + assert not any("t" == t for t in targets) # never the raw token + + +# --------------------------------------------------------------------------- +# Preflight — bypassed by FLOW_DOCTOR_SKIP_PREFLIGHT +# --------------------------------------------------------------------------- + + +def test_validate_noops_when_skip_preflight_env_set(monkeypatch): + monkeypatch.setenv("FLOW_DOCTOR_SKIP_PREFLIGHT", "1") + notifier = TelegramNotifier(bot_token="t", chat_id=1) + # Should not touch the network. + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + notifier.validate() + assert mock_urlopen.call_count == 0 + + +def test_validate_raises_on_unauthorized_bot_token(monkeypatch): + monkeypatch.delenv("FLOW_DOCTOR_SKIP_PREFLIGHT", raising=False) + notifier = TelegramNotifier(bot_token="bad", chat_id=1) + with patch("flow_doctor.notify.telegram.urlopen") as mock_urlopen: + mock_urlopen.return_value = _fake_urlopen_response( + {"ok": False, "description": "Unauthorized"}, status=200 + ) + with pytest.raises(ConfigError) as exc: + notifier.validate() + assert "Unauthorized" in str(exc.value) + assert "BotFather" in str(exc.value)