Merged
2 changes: 2 additions & 0 deletions .gitignore
@@ -36,3 +36,5 @@ scripts/

# Internal planning docs (not for public repo)
private/
.coverage
htmlcov/
56 changes: 56 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,62 @@

## Unreleased

## 0.5.0rc3 (2026-05-13)

Cleanup pass before the morning-signal cutover.

### Added

- **`TelegramNotifier.send_raw(text, *, parse_mode=, disable_notification=)`** —
adjacent flow-doctor subsystems can now POST arbitrary text through
the same bot + chat + thread + Markdown routing the structured
`send()` path uses, without conforming to the `Report` shape.
Returns the standard non-secret `"telegram:<chat_id>[:<thread>]"`
target identifier (or `None` on failure — never raises).
`parse_mode=None` and `disable_notification=False` are honoured as
explicit overrides via a sentinel default; omit either argument to
inherit the instance default.
- **`RemediationConfig.telegram_bot_token` + `telegram_chat_id` +
`telegram_message_thread_id`** — first-class Telegram fields for the
remediation pipeline. `_init_remediation` builds a real
`TelegramNotifier` from these (with the `FLOW_DOCTOR_TELEGRAM_*`
env-var fallback chain) and hands it to `RemediationExecutor`.
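
The sentinel-default behaviour described for `send_raw` can be sketched as follows. This is a minimal illustration, not the library's code: `SketchNotifier`, `build_payload`, and `_UNSET` are hypothetical names, and the instance defaults shown are assumptions chosen for the example.

```python
from typing import Any, Dict, Optional

# Hypothetical sentinel; the real sentinel name is not shown in this changelog.
_UNSET: Any = object()


class SketchNotifier:
    """Illustrative stand-in, not the real TelegramNotifier."""

    def __init__(self, parse_mode: Optional[str] = "Markdown",
                 disable_notification: bool = True) -> None:
        # Instance defaults (assumed values, for illustration only).
        self.parse_mode = parse_mode
        self.disable_notification = disable_notification

    def build_payload(self, text: str, *, parse_mode: Any = _UNSET,
                      disable_notification: Any = _UNSET) -> Dict[str, Any]:
        # Omitted argument: inherit the instance default.
        # Explicit None / False: honoured as a real override.
        mode = self.parse_mode if parse_mode is _UNSET else parse_mode
        silent = (self.disable_notification
                  if disable_notification is _UNSET
                  else disable_notification)
        payload: Dict[str, Any] = {"text": text, "disable_notification": silent}
        if mode is not None:
            payload["parse_mode"] = mode
        return payload
```

A plain `None` default could not distinguish "caller passed `None`" from "caller passed nothing", which is why a dedicated sentinel object is compared with `is`.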

### Changed

- **`RemediationExecutor`** now accepts a `telegram_notifier:
TelegramNotifier | None` kwarg in addition to the legacy
`telegram_webhook_url`. When both are supplied, the notifier wins.
Remediation pings going through it pick up Markdown rendering,
threading, target-id audit (`actions.target` row), and the same
`validate()` preflight as the rest of the notifier surface.
- `examples/smoke_test.py` rewritten to lead with
`FlowDoctor.builder()` + `TelegramNotifierConfig` (instead of the
now-`@deprecated` `flow_doctor.init()`). Adds smoke checks for
`flow_doctor.context()` propagation, `report_async()` from an
asyncio context, and `flow_doctor.otel.report_to_otel_span_event`
serialization. All checks run offline (`FLOW_DOCTOR_SKIP_PREFLIGHT=1`,
fake credentials, and a SQLite store at a temp path).
- `[tool.coverage.run]` section added to `pyproject.toml`. Use the
canonical `python -m coverage run -m pytest && python -m coverage
report` instead of `pytest --cov=` — the latter misreports
module-level statement coverage under editable installs because
pytest-cov instruments after the import has already happened.
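
The "notifier wins" rule stated for `RemediationExecutor` can be pictured with a small routing helper. This is a sketch under assumptions: `choose_route`, `SendFn`, and `fake_send` are hypothetical names, and the real executor's internals are not shown in this diff.

```python
from typing import Callable, Optional

# Hypothetical alias: a callable that sends text and returns a target id or None.
SendFn = Callable[[str], Optional[str]]


def choose_route(notifier_send: Optional[SendFn],
                 webhook_url: Optional[str]) -> str:
    """Return the route a remediation ping would take under the stated rule."""
    if notifier_send is not None:
        return "notifier"  # the notifier wins whenever it is supplied
    if webhook_url:
        return "webhook"   # legacy fallback for older configs
    return "none"          # no Telegram notification configured


def fake_send(text: str) -> Optional[str]:
    # Stand-in for a real send; returns a non-secret target identifier.
    return "telegram:-1001234567890"
```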

### Deprecated

- **`RemediationConfig.telegram_webhook_url`** is now soft-deprecated.
Kept for 0.4.x YAML back-compat through the 0.5.x series; consumers
should migrate to `telegram_bot_token` + `telegram_chat_id` (with
optional `telegram_message_thread_id`). Will be removed in 0.6.0.
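
For consumers auditing their own configs before 0.6.0, one possible check is sketched below. It assumes nothing about how the library itself signals the deprecation: `SketchRemediationConfig` only mirrors the four field names from this changelog, and `check_legacy_fields` is a hypothetical helper.

```python
import warnings
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class SketchRemediationConfig:
    """Illustrative mirror of the Telegram-related fields only."""
    telegram_bot_token: Optional[str] = None
    telegram_chat_id: Optional[Union[int, str]] = None
    telegram_message_thread_id: Optional[int] = None
    telegram_webhook_url: Optional[str] = None  # legacy, slated for removal


def check_legacy_fields(cfg: SketchRemediationConfig) -> bool:
    """Warn and return True when the legacy webhook field is still set."""
    if cfg.telegram_webhook_url is None:
        return False
    warnings.warn(
        "telegram_webhook_url is deprecated; migrate to "
        "telegram_bot_token + telegram_chat_id",
        DeprecationWarning,
        stacklevel=2,
    )
    return True
```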

### Coverage

Suite: 393/393 pass (376 prior + 17 new for remediation-Telegram
migration). Project-wide coverage 84% (canonical measurement; the
pytest-cov number that previously read 67% was a tool quirk, not a
real regression).

## 0.5.0rc2 (2026-05-13)

Adds Telegram as the **recommended default notifier** for new consumers.
146 changes: 121 additions & 25 deletions examples/smoke_test.py
@@ -1,28 +1,79 @@
"""
Smoke test for Flow Doctor Phase 1.
Run from the flow-doctor repo root:
Smoke test for Flow Doctor (0.5.0rc+ surface).

Exercises the recommended FlowDoctor.builder() entry point, the typed
TelegramNotifierConfig (without actually firing — no real bot token
required), the flow_doctor.context() contextvars layer, the
report_async() coroutine, and the historical report-handling features
(guard / monitor / dedup / capture_logs / secret scrubbing /
never-crash-the-caller). All sqlite, no network.

Run from the flow-doctor repo root::

python examples/smoke_test.py
"""
import flow_doctor

from __future__ import annotations

import asyncio
import logging
import os
import tempfile

import flow_doctor
from flow_doctor import (
    EmailNotifierConfig,
    FlowDoctor,
    FlowDoctorProtocol,
    TelegramNotifierConfig,
)

db_path = os.path.join(tempfile.gettempdir(), "fd_smoke_test.db")
# Clean up from prior runs
if os.path.exists(db_path):
os.remove(db_path)

fd = flow_doctor.init(
flow_name="test-flow",
repo="your-org/your-repo",
owner="@your-username",
store=f"sqlite:///{db_path}",
# Skip notifier preflight network calls — this smoke test runs offline
# and the fake credentials below would otherwise trip /getMe and the
# Gmail SMTP banner. Downstream consumers should NOT set this in prod.
os.environ["FLOW_DOCTOR_SKIP_PREFLIGHT"] = "1"

# ---------------------------------------------------------------------------
# Build a FlowDoctor with the recommended (0.5.0rc+) builder API.
# Telegram is the recommended default; we wire fake creds since the
# smoke test stays offline.
# ---------------------------------------------------------------------------
fd: FlowDoctorProtocol = (
    FlowDoctor.builder("smoke-test")
    .with_repo("your-org/your-repo", owner="@your-username")
    .with_store(path=db_path)
    .with_dedup(cooldown_minutes=60)
    .add_notifier(
        TelegramNotifierConfig(
            bot_token="123:fake-smoke-token",
            chat_id=-1001234567890,
            # message_thread_id=42,  # optional: forum-topic routing
        )
    )
    # A second notifier to demonstrate the discriminated union, also
    # with fake creds since we stay offline.
    .add_notifier(
        EmailNotifierConfig(
            sender="alerts@example.com",
            recipients=["oncall@example.com"],
            smtp_password="fake-app-password",
        )
    )
    .build()
)

print("=" * 60)
print("FLOW DOCTOR PHASE 1 — SMOKE TEST")
print("FLOW DOCTOR — SMOKE TEST (0.5.0rc+ surface)")
print("=" * 60)
print(f"Built via: FlowDoctor.builder() → satisfies FlowDoctorProtocol: "
f"{isinstance(fd, FlowDoctorProtocol)}")
print(f"Notifiers: {len(fd.config.notify)} "
f"({', '.join(n.type for n in fd.config.notify)})")


# --- Test 1: Exception report ---
print("\n--- Test 1: Exception report ---")
@@ -33,6 +84,7 @@
print(f" Report ID: {report_id}")
assert report_id is not None, "Expected a report ID"


# --- Test 2: guard() re-raises ---
print("\n--- Test 2: guard() context manager ---")
try:
@@ -43,6 +95,7 @@
else:
raise AssertionError("guard() should have re-raised")


# --- Test 3: monitor() decorator ---
print("\n--- Test 3: @monitor decorator ---")

@@ -57,6 +110,7 @@ def failing_function():
else:
raise AssertionError("monitor() should have re-raised")


# --- Test 4: Dedup suppression ---
print("\n--- Test 4: Dedup (5 identical errors → 1 report) ---")
dedup_results = []
@@ -73,13 +127,39 @@ def failing_function():
dedup_count = sum(1 for r in dedup_results if r is None)
print(f" → {new_count} new, {dedup_count} deduped")

# --- Test 5: Non-exception warning ---
print("\n--- Test 5: Non-exception warning ---")
report_id = fd.report("Scanner returned 0 candidates", severity="warning")
print(f" Warning report ID: {report_id}")

# --- Test 6: capture_logs() ---
print("\n--- Test 6: Log capture ---")
# --- Test 5: flow_doctor.context() contextvars propagation ---
print("\n--- Test 5: flow_doctor.context() ambient propagation ---")
with flow_doctor.context(flow_name="smoke-test-am", stage="ingest", run_id="run-42"):
    try:
        raise RuntimeError("ingest stage failed")
    except Exception as e:
        report_id = fd.report(e)

# Verify the context landed on the persisted report.
recent = fd.history(limit=1)[0]
print(f" Report flow_name: {recent.context.get('flow_name')}")
print(f" Report stage: {recent.context.get('stage')}")
print(f" Report run_id: {recent.context.get('run_id')}")
assert recent.context.get("stage") == "ingest"
assert recent.context.get("run_id") == "run-42"


# --- Test 6: report_async() ---
print("\n--- Test 6: report_async() from an asyncio context ---")
async def async_pipeline():
    try:
        raise TimeoutError("async pipeline tripped a deadline")
    except Exception as e:
        return await fd.report_async(e)

async_report_id = asyncio.run(async_pipeline())
print(f" Async report ID: {async_report_id}")
assert async_report_id is not None


# --- Test 7: capture_logs() ---
print("\n--- Test 7: Log capture ---")
logger = logging.getLogger("test.scanner")
with fd.capture_logs(level=logging.INFO):
logger.info("Starting scanner with 900 tickers")
@@ -90,8 +170,9 @@ def failing_function():
report_id = fd.report(e)
print(f" Report with logs: {report_id}")

# --- Test 7: Secret scrubbing ---
print("\n--- Test 7: Secret scrubbing ---")

# --- Test 8: Secret scrubbing ---
print("\n--- Test 8: Secret scrubbing ---")
try:
api_key = "AKIAIOSFODNN7EXAMPLE"
raise RuntimeError(f"S3 auth failed with key {api_key}")
@@ -105,12 +186,13 @@ def failing_function():
)
print(f" Scrubbed report ID: {report_id}")

# --- Test 8: report() never crashes ---
print("\n--- Test 8: report() never crashes caller ---")
# Create an fd with a broken store path
broken_fd = flow_doctor.init(
flow_name="broken-flow",
store="sqlite:////nonexistent/impossible/path/db.sqlite",

# --- Test 9: report() never crashes its caller ---
print("\n--- Test 9: report() never crashes caller ---")
broken_fd = (
    FlowDoctor.builder("broken-flow")
    .with_store(path="/nonexistent/impossible/path/db.sqlite")
    .build(strict=False)  # degraded mode: store init will fail loudly
)
try:
raise RuntimeError("this should not crash")
@@ -119,11 +201,25 @@ def failing_function():
print(f" Broken store report result: {result} (None is OK)")
print(" Caller survived — report() did not propagate")


# --- Test 10: OTel serialization ---
print("\n--- Test 10: OTel SpanEvent serialization ---")
from flow_doctor.otel import report_to_otel_span_event

span_event = report_to_otel_span_event(recent) # from test 5
print(f" resource.service.name: {span_event['resource']['service.name']}")
print(f" event.name: {span_event['name']}")
print(f" severity_text: {span_event['severity_text']}")
print(f" attributes.context.run_id: "
f"{span_event['attributes'].get('context.run_id')}")


# --- History ---
print("\n--- Report History ---")
for r in fd.history(limit=20):
dedup_str = f" (dedup x{r.dedup_count})" if r.dedup_count > 1 else ""
print(f" [{r.severity:8s}] {r.error_type or 'msg'}: {r.error_message[:60]}{dedup_str}")
print(f" [{r.severity:8s}] {r.error_type or 'msg'}: "
f"{r.error_message[:60]}{dedup_str}")

print("\n" + "=" * 60)
print("ALL SMOKE TESTS PASSED")
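
The ambient propagation that Test 5 checks can be approximated with the standard `contextvars` module. The sketch below is not flow-doctor's implementation: `ambient_context`, `current_fields`, and `stage` are hypothetical names, used only to show how fields set in a `with` block stay visible across an `await` and are restored on exit.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from typing import Any, Dict, Iterator

# Hypothetical module-level variable holding the ambient fields.
_ambient: contextvars.ContextVar = contextvars.ContextVar(
    "ambient_fields", default={}
)


@contextmanager
def ambient_context(**fields: Any) -> Iterator[None]:
    # Merge new fields over whatever is already ambient, without mutating it.
    token = _ambient.set({**_ambient.get(), **fields})
    try:
        yield
    finally:
        # Restore the previous value even if the body raised.
        _ambient.reset(token)


def current_fields() -> Dict[str, Any]:
    # Return a copy so callers cannot mutate the ambient state.
    return dict(_ambient.get())


async def stage() -> Dict[str, Any]:
    with ambient_context(stage="ingest", run_id="run-42"):
        await asyncio.sleep(0)  # fields survive the suspension point
        return current_fields()
```

Because `asyncio.run` executes the coroutine in a copy of the current context, values set inside `stage()` do not leak back into the caller.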
2 changes: 1 addition & 1 deletion flow_doctor/__init__.py
@@ -34,4 +34,4 @@
"current_context",
"init",
]
__version__ = "0.5.0rc2"
__version__ = "0.5.0rc3"
26 changes: 26 additions & 0 deletions flow_doctor/core/client.py
@@ -387,9 +387,35 @@ def _init_remediation(self, config: FlowDoctorConfig) -> None:
gate_config.market_close_hour = 0

self._decision_gate = DecisionGate(config=gate_config, store=self._store)

# Preferred Telegram path (since 0.5.0rc3): build a real
# TelegramNotifier from the config and hand it to the
# executor. Falls back to the legacy webhook URL when only
# that's configured (for 0.4.x yaml back-compat).
telegram_notifier = None
tg_token = (
    config.remediation.telegram_bot_token
    or _env_fallback("telegram_bot_token")
)
tg_chat = config.remediation.telegram_chat_id
if tg_chat is None:
    env_chat = _env_fallback("telegram_chat_id")
    if env_chat is not None and env_chat.lstrip("-").isdigit():
        tg_chat = int(env_chat)
    elif env_chat is not None:
        tg_chat = env_chat
if tg_token and tg_chat not in (None, ""):
    from flow_doctor.notify.telegram import TelegramNotifier
    telegram_notifier = TelegramNotifier(
        bot_token=tg_token,
        chat_id=tg_chat,
        message_thread_id=config.remediation.telegram_message_thread_id,
    )

self._remediation_executor = RemediationExecutor(
    dry_run=config.remediation.dry_run,
    store=self._store,
    telegram_notifier=telegram_notifier,
    telegram_webhook_url=config.remediation.telegram_webhook_url,
)
except Exception as e:
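
The chat-id resolution in the hunk above (config value first, then environment, with numeric strings coerced to `int`) can be tried in isolation with a sketch like the one below. `resolve_chat_id` is a hypothetical helper, and the variable name `FLOW_DOCTOR_TELEGRAM_CHAT_ID` is an assumption based on the `FLOW_DOCTOR_TELEGRAM_*` prefix mentioned in the changelog. The sketch also swaps the `lstrip("-").isdigit()` test for an anchored regex, so malformed input such as `--5` stays a string instead of reaching `int()`.

```python
import os
import re
from typing import Mapping, Optional, Union

ChatId = Union[int, str]

# Assumed env-var name; only the FLOW_DOCTOR_TELEGRAM_* prefix is documented.
ENV_VAR = "FLOW_DOCTOR_TELEGRAM_CHAT_ID"

# Optional single minus sign followed by ASCII digits only.
_INT_RE = re.compile(r"-?[0-9]+")


def resolve_chat_id(configured: Optional[ChatId],
                    env: Mapping[str, str] = os.environ) -> Optional[ChatId]:
    """Prefer the configured value, else fall back to the environment."""
    if configured is not None:
        return configured
    raw = env.get(ENV_VAR)
    if not raw:
        return None  # unset or empty: no chat configured
    if _INT_RE.fullmatch(raw):
        return int(raw)  # numeric ids, including negative group ids
    return raw  # e.g. an "@channelusername" style id
```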
18 changes: 18 additions & 0 deletions flow_doctor/core/config.py
@@ -142,6 +142,21 @@ class RemediationConfig(_ConfigModel):
max_auto_remediations_per_day: int = 2
max_auto_remediations_per_failure: int = 2
market_hours_lockout: bool = True
# Telegram routing for remediation auto-action pings. Preferred
# path since 0.5.0rc3 — the executor will build a real
# ``TelegramNotifier`` from these fields and route every remediation
# action's success / failure through it, picking up bot-token /
# chat-id / threading / Markdown rendering / target-id audit for
# free. Leave unset to skip Telegram notification entirely.
telegram_bot_token: Optional[str] = None
telegram_chat_id: Optional[Union[int, str]] = None
telegram_message_thread_id: Optional[int] = None
# Legacy webhook URL — kept for back-compat with 0.4.x configs.
# Was a misnomer (Telegram doesn't have user-installable webhooks
# the way Slack does), and the executor's bespoke POST format
# didn't compose with the rest of the notifier surface. New
# consumers should use ``telegram_bot_token`` + ``telegram_chat_id``
# above; this field will be removed in 0.6.0.
telegram_webhook_url: Optional[str] = None
s3_audit_bucket: Optional[str] = None
s3_audit_prefix: str = "flow-doctor/audit"
@@ -456,6 +471,9 @@ def load_config(
_defaults.max_auto_remediations_per_failure)),
market_hours_lockout=rem_raw.get(
"market_hours_lockout", _defaults.market_hours_lockout),
telegram_bot_token=rem_raw.get("telegram_bot_token"),
telegram_chat_id=rem_raw.get("telegram_chat_id"),
telegram_message_thread_id=rem_raw.get("telegram_message_thread_id"),
telegram_webhook_url=rem_raw.get("telegram_webhook_url"),
s3_audit_bucket=rem_raw.get("s3_audit_bucket"),
s3_audit_prefix=rem_raw.get(