Flow Doctor

Pipeline error handler for Python. Captures exceptions, deduplicates failure signatures, optionally diagnoses root causes with LLMs, routes alerts (Telegram / Slack / email / GitHub / S3 / custom), and can generate fix PRs.

Typed, IDE-discoverable configuration. Pydantic v2 models + a fluent FlowDoctor.builder() mean you don't need a yaml file — and when you have one, the schema is enforced at load time.

Fail-loud by default. Configuration errors — missing tokens, unresolved ${VAR} references, misconfigured notifiers — raise ConfigError at construction time instead of silently degrading. A silently-degraded error monitor defeats the purpose.

from flow_doctor import FlowDoctor, TelegramNotifierConfig

fd = (
    FlowDoctor.builder("morning-signal")
    .add_notifier(TelegramNotifierConfig())  # creds from FLOW_DOCTOR_TELEGRAM_*
    .with_dedup(cooldown_minutes=60)
    .build()
)

with fd.guard():
    run_pipeline()  # exceptions captured, deduplicated, routed, re-raised

How It Works

Exception → Capture → Dedup → Diagnose (LLM, opt) → Notify (Telegram/...) → Fix PR (opt)
  1. Capture — exception, traceback, logs, contextvars (flow_name, stage)
  2. Dedup — same error signature within cooldown window is suppressed (normalized to ignore reqIds, UUIDs, contract symbols, etc.)
  3. Cascade — if a declared upstream dependency also failed, tag it and skip diagnosis
  4. Diagnose (opt) — check the knowledge base (free), then call Claude if rate limit allows
  5. Notify — Telegram / Slack / email / GitHub issue / S3 changelog (rate-limited with daily digest fallback)
  6. Fix (opt) — human adds flow-doctor:fix label on a filed issue, triggering automated fix PR generation
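
Steps 1 and 2 can be pictured with a small standard-library sketch. It is not flow-doctor's implementation: the regexes, the `signature` and `DedupWindow` names, and the use of SHA-256 are assumptions made purely for illustration.

```python
import hashlib
import re
import time

# Hypothetical normalization rules; the library's real patterns may differ.
_VOLATILE = [
    (re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.I), "<uuid>"),
    (re.compile(r"\breqId[=: ]+\d+", re.I), "reqId=<n>"),
    (re.compile(r"\d+"), "<n>"),
]


def signature(exc_type: str, message: str) -> str:
    """Hash the exception type plus the message with volatile tokens replaced."""
    for pattern, placeholder in _VOLATILE:
        message = pattern.sub(placeholder, message)
    return hashlib.sha256(f"{exc_type}:{message}".encode()).hexdigest()[:16]


class DedupWindow:
    """Suppress repeats of a signature seen within the cooldown window."""

    def __init__(self, cooldown_minutes: int = 60, clock=time.monotonic):
        self._cooldown = cooldown_minutes * 60
        self._clock = clock
        self._last_seen = {}

    def should_report(self, sig: str) -> bool:
        now = self._clock()
        last = self._last_seen.get(sig)
        if last is not None and now - last < self._cooldown:
            return False  # duplicate inside the window: suppress
        self._last_seen[sig] = now
        return True
```

Two messages that differ only in a request ID or UUID hash to the same signature, so the second one is suppressed until the cooldown expires. The injectable `clock` parameter exists only to make the window easy to test.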

Installation

While 0.5.0 is in the rc cycle:

pip install --pre flow-doctor                          # core only
pip install --pre "flow-doctor[diagnosis]"             # + LLM diagnosis (anthropic SDK)
pip install --pre "flow-doctor[diagnosis,remediation]" # + auto-remediation (boto3)
pip install --pre "flow-doctor[all]"                   # everything

The --pre flag is required while the version tag has an rc suffix; drop it once 0.5.0 final ships. Pinning flow-doctor==0.5.0rc2 works regardless.

Quick Start — FlowDoctor.builder() (recommended)

The builder is typed, IDE-discoverable, and works without a yaml file. Notifier credentials fall through to FLOW_DOCTOR_* env vars when not passed inline.

from flow_doctor import FlowDoctor, TelegramNotifierConfig

fd = (
    FlowDoctor.builder("morning-signal")
    .add_notifier(TelegramNotifierConfig())  # bot_token + chat_id from env
    .with_dedup(cooldown_minutes=60)
    .build()
)

Three idiomatic ways to use the resulting FlowDoctor in your pipeline:

# 1. Context manager — exception is captured + re-raised
with fd.guard():
    run_pipeline()

# 2. Decorator
@fd.monitor
def lambda_handler(event, context):
    run_pipeline()

# 3. Direct reporting — never crashes the caller
try:
    run_pipeline()
except Exception as e:
    fd.report(e, context={"date": "2026-05-13"})

Async pipelines:

async def run():
    try:
        await pipeline()
    except Exception as exc:
        await fd.report_async(exc)

Contextvars propagate automatically. Stamp flow_name / stage once and any fd.report() inside picks them up — no need to thread context=... through every layer:

import flow_doctor

with flow_doctor.context(flow_name="morning-signal", stage="rank"):
    run_rank()  # any fd.report() inside auto-records flow_name + stage
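
For readers unfamiliar with the mechanism, here is a minimal sketch of how ambient context can be built on the standard `contextvars` module. The `_ambient` variable and the `context` / `current_context` helpers below are hypothetical stand-ins, not flow-doctor's internals.

```python
import contextlib
import contextvars

# Hypothetical module-level variable holding the ambient context.
_ambient = contextvars.ContextVar("ambient", default={})


@contextlib.contextmanager
def context(**fields):
    """Merge fields into the ambient context for the duration of the block."""
    token = _ambient.set({**_ambient.get(), **fields})
    try:
        yield
    finally:
        _ambient.reset(token)  # restore whatever was active before


def current_context() -> dict:
    """What a report call deep in the call stack would read."""
    return dict(_ambient.get())
```

Nested scopes override individual keys and restore them on exit. Because `ContextVar` values are tracked per thread and per asyncio task, concurrent pipelines would not see each other's fields.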

Notifier configs (typed)

Five first-class notifiers ship today, each with its own Pydantic config exposed via the discriminated union NotifierConfig:

Config | Channel | Setup
TelegramNotifierConfig | Telegram Bot API | @BotFather → /newbot → bot token + chat_id. Recommended default.
SlackNotifierConfig | Slack incoming webhook | Slack app → incoming webhook URL
EmailNotifierConfig | SMTP (Gmail / any) | sender + recipients + SMTP password (Gmail App Password works)
GitHubNotifierConfig | GitHub issues | PAT with Issues: write on the target repo
S3NotifierConfig | System-wide changelog corpus | Bucket + subsystem; IAM allows s3:PutObject on the prefix

Mix freely:

from flow_doctor import (
    EmailNotifierConfig,
    FlowDoctor,
    GitHubNotifierConfig,
    TelegramNotifierConfig,
)

fd = (
    FlowDoctor.builder("alpha-engine-predictor")
    .add_notifier(TelegramNotifierConfig(message_thread_id=42))   # forum topic
    .add_notifier(GitHubNotifierConfig(repo="me/alpha-engine"))   # token from env
    .add_notifier(EmailNotifierConfig(sender="me@x.com",
                                       recipients=["me@x.com"]))
    .build()
)

Telegram — recommended default

Why Telegram leads the examples:

  • Two-minute setup. Message @BotFather, send /newbot, and save the token. Then DM your bot, call GET https://api.telegram.org/bot<TOKEN>/getUpdates, and read result[0].message.chat.id.
  • Per-flow routing for free. One bot, N channels via chat_id, or N forum-topic threads via message_thread_id in a supergroup.
  • Mobile push is automatic. No "did the email go to spam" mystery.
  • Token rotation is one @BotFather call. No app password / SES verified identity / Slack workspace admin.

export FLOW_DOCTOR_TELEGRAM_BOT_TOKEN=1234567890:ABC...
export FLOW_DOCTOR_TELEGRAM_CHAT_ID=-1001234567890  # negative for supergroups/channels

from flow_doctor import FlowDoctor, TelegramNotifierConfig

FlowDoctor.builder("pipeline").add_notifier(TelegramNotifierConfig()).build()

Testing — flow_doctor.testing pytest plugin

The plugin is auto-discovered (registered via [project.entry-points.pytest11]). Downstream tests get a flow_doctor_recorder fixture with no imports required.

def test_pipeline_reports_db_errors(flow_doctor_recorder):
    run_pipeline_that_should_fail(flow_doctor_recorder)
    assert len(flow_doctor_recorder.reports) == 1
    assert flow_doctor_recorder.last.exc_type == "DBError"
    assert flow_doctor_recorder.last.ambient_context["stage"] == "ingest"

flow_doctor_recorder is a RecordingFlowDoctor — it implements FlowDoctorProtocol, so wherever production code expects a FlowDoctorProtocol you can swap it in directly. It also snapshots any active flow_doctor.context() scope onto each captured incident's ambient_context field.

Helpers on the recorder: .clear(), .last, .of_type(exc_name), plus full async support via await recorder.report_async(...).

Type-checked contract — FlowDoctorProtocol

from flow_doctor import FlowDoctorProtocol

def make_pipeline(fd: FlowDoctorProtocol):
    with fd.guard():
        ...

FlowDoctorProtocol is decorated with @runtime_checkable, so isinstance(fd, FlowDoctorProtocol) works at runtime as well as at type-check time. Combined with the shipped py.typed marker, mypy --strict and pyright treat flow-doctor's annotations as authoritative.
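
To show what a runtime-checkable protocol buys you, here is a generic sketch using only `typing`. `ReporterProtocol` and `ListReporter` are hypothetical names invented for this example; they are not part of flow-doctor.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ReporterProtocol(Protocol):
    """Hypothetical stand-in for a reporting contract."""

    def report(self, exc: BaseException) -> None: ...


class ListReporter:
    """A test double: no inheritance needed, only a matching method."""

    def __init__(self) -> None:
        self.reports = []

    def report(self, exc: BaseException) -> None:
        self.reports.append(exc)
```

`isinstance(ListReporter(), ReporterProtocol)` is true even though `ListReporter` never subclasses the protocol. Note that the runtime check only confirms the methods exist; signature compatibility is still the type checker's job.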

OpenTelemetry — flow_doctor.otel

Pure-Python adapter that serializes a Report into an OTel SpanEvent-shaped dict — ready to ship to a collector via your existing OTLP exporter today. No opentelemetry-* dependency in this release; the bundled OTLP exporter notifier is on the 0.6.0 roadmap.

from flow_doctor.otel import report_to_otel_span_event

span_event = report_to_otel_span_event(report)
# {
#   "resource": {"service.name": "<flow_name>"},
#   "name": "<stage>",
#   "time_unix_nano": ...,
#   "severity_text": "ERROR", "severity_number": 17,
#   "attributes": {
#     "exception.type": "ValueError",
#     "exception.message": "...",
#     "exception.stacktrace": "...",
#     "flow_doctor.error_signature": "...",
#     "context.run_id": "...",
#   },
# }

Configuration

Inline kwargs / builder (recommended)

See the Quick Start. No yaml required.

YAML file (legacy / multi-environment)

flow_name: my-pipeline
repo: owner/repo

notify:
  - type: telegram
    bot_token: ${FLOW_DOCTOR_TELEGRAM_BOT_TOKEN}
    chat_id: -1001234567890
    message_thread_id: 42      # optional: forum-topic routing
  - type: github
    repo: owner/repo
  - type: s3
    bucket: my-changelog-bucket
    subsystem: predictor       # one of the documented vocab values

store:
  type: sqlite
  path: flow_doctor.db

diagnosis:
  enabled: true
  model: claude-sonnet-4-6-20250514
  api_key: ${ANTHROPIC_API_KEY}
  timeout_seconds: 30
  max_daily_cost_usd: 1.00

github:
  token: ${GITHUB_TOKEN}
  labels: [flow-doctor]

rate_limits:
  max_diagnosed_per_day: 3
  max_issues_per_day: 3
  dedup_cooldown_minutes: 60

dependencies:
  - upstream-pipeline

remediation:
  enabled: true
  dry_run: true
  auto_remediate_min_confidence: 0.9

auto_fix:
  enabled: true
  confidence_threshold: 0.90
  test_command: "python -m pytest tests/ -x -q"
  scope:
    allow: ["src/", "lib/"]
    deny: ["*.yaml", "*.yml"]

# Deprecated since 0.5.0; will be removed in 0.6.0. Use FlowDoctor.builder() instead.
import flow_doctor

fd = flow_doctor.init(config_path="flow-doctor.yaml")

${VAR} references resolve from the process environment at load time. Unresolved references raise ConfigError — no silent passthrough.
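
The fail-loud behaviour described above can be sketched in a few lines. This is an illustration only: the `resolve` helper, its regex, and the local `ConfigError` class are assumptions, not the library's loader.

```python
import os
import re

_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


class ConfigError(Exception):
    """Stand-in for the library's ConfigError (illustration only)."""


def resolve(value: str, env=None) -> str:
    """Substitute ${VAR} references, raising if any is unset or empty."""
    env = os.environ if env is None else env
    missing = [name for name in _VAR.findall(value) if not env.get(name)]
    if missing:
        raise ConfigError(f"unresolved environment variables: {', '.join(missing)}")
    return _VAR.sub(lambda match: env[match.group(1)], value)
```

Collecting every missing name before raising means a single error message lists everything that needs to be set, rather than failing one variable at a time.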

Environment Variables

flow-doctor reads credentials from environment variables as its primary configuration mechanism. Every notifier has a documented fallback chain: explicit value → FLOW_DOCTOR_* canonical name → common conventions.

Canonical contract

Variable | Used by | Fallback chain | Required when
FLOW_DOCTOR_TELEGRAM_BOT_TOKEN | Telegram notifier | FLOW_DOCTOR_TELEGRAM_BOT_TOKEN → TELEGRAM_BOT_TOKEN | Telegram notifier config has no explicit bot_token field
FLOW_DOCTOR_TELEGRAM_CHAT_ID | Telegram notifier | FLOW_DOCTOR_TELEGRAM_CHAT_ID → TELEGRAM_CHAT_ID | Telegram notifier config has no explicit chat_id field
FLOW_DOCTOR_GITHUB_TOKEN | GitHub notifier, auto-fix PR creator | FLOW_DOCTOR_GITHUB_TOKEN → GH_TOKEN → GITHUB_TOKEN | Any GitHub notifier or auto-fix is configured
FLOW_DOCTOR_GITHUB_REPO | GitHub notifier | FLOW_DOCTOR_GITHUB_REPO | GitHub notifier config has no explicit repo field
FLOW_DOCTOR_SMTP_PASSWORD | Email notifier | FLOW_DOCTOR_SMTP_PASSWORD → GMAIL_APP_PASSWORD | SMTP requires auth
FLOW_DOCTOR_SMTP_SENDER | Email notifier | FLOW_DOCTOR_SMTP_SENDER → EMAIL_SENDER | Email notifier config has no explicit sender field
FLOW_DOCTOR_SMTP_RECIPIENTS | Email notifier | FLOW_DOCTOR_SMTP_RECIPIENTS → EMAIL_RECIPIENTS | Email notifier config has no explicit recipients field
FLOW_DOCTOR_SLACK_WEBHOOK | Slack notifier | FLOW_DOCTOR_SLACK_WEBHOOK → SLACK_WEBHOOK_URL | Slack notifier config has no explicit webhook_url field
FLOW_DOCTOR_S3_BUCKET | S3 notifier | FLOW_DOCTOR_S3_BUCKET → CHANGELOG_BUCKET | S3 notifier config has no explicit bucket field
FLOW_DOCTOR_ANTHROPIC_API_KEY | LLM diagnosis, auto-fix generator | FLOW_DOCTOR_ANTHROPIC_API_KEY → ANTHROPIC_API_KEY | diagnosis.enabled: true or auto-fix is on
FLOW_DOCTOR_SKIP_PREFLIGHT | All notifiers' validate() | (literal) | Set to 1 in tests / offline boot to bypass token/preflight network calls

Precedence for every field is: explicit value in kwargs/yaml → canonical FLOW_DOCTOR_* env var → convention fallbacks in the order listed. The first non-empty value wins. Missing values raise ConfigError at construction time naming the specific field and the env vars that would satisfy it.
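
The precedence rule can be expressed as a short lookup function. The sketch below is an assumption about shape, not the library's code: `resolve_field` and the local `ConfigError` are hypothetical, while the GitHub token chain is taken from the table above.

```python
import os
from typing import Mapping, Optional, Sequence


class ConfigError(Exception):
    """Stand-in for the library's ConfigError (illustration only)."""


def resolve_field(
    field: str,
    explicit: Optional[str],
    env_names: Sequence[str],
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Return the explicit value, else the first non-empty env var in order."""
    env = os.environ if env is None else env
    if explicit:
        return explicit
    for name in env_names:
        value = env.get(name, "")
        if value:
            return value
    raise ConfigError(
        f"missing {field!r}: pass it explicitly or set one of {', '.join(env_names)}"
    )


# Canonical name first, then the convention fallbacks in documented order.
GITHUB_TOKEN_CHAIN = ("FLOW_DOCTOR_GITHUB_TOKEN", "GH_TOKEN", "GITHUB_TOKEN")
```

An empty canonical variable is skipped rather than treated as a value, which matches the "first non-empty value wins" rule, and the error message names both the field and the variables that would satisfy it.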

Env-var-only quickstart — Telegram

Two env vars, two lines of Python, working alerts on the next exception:

export FLOW_DOCTOR_TELEGRAM_BOT_TOKEN=1234567890:ABC...
export FLOW_DOCTOR_TELEGRAM_CHAT_ID=-1001234567890

from flow_doctor import FlowDoctor, TelegramNotifierConfig

fd = FlowDoctor.builder("pipeline").add_notifier(TelegramNotifierConfig()).build()

Strict mode and degraded mode

FlowDoctor.builder().build() and flow_doctor.init() both default to strict=True. Any configuration error (missing required field, unresolved ${VAR}, unknown notifier type) raises ConfigError and prevents startup. This is the recommended default — a non-running flow-doctor is a loud failure; a silently-degraded flow-doctor is a silent one.

If you genuinely want best-effort init that logs errors but keeps running with no notifiers, opt in explicitly:

fd = FlowDoctor.builder("pipeline").build(strict=False)

Logging-handler integration

Attach to Python's logging system if you want every WARNING+ log to flow through dedup + diagnosis + notify without touching call sites:

import logging
import flow_doctor

fd = flow_doctor.FlowDoctor.builder("pipeline").add_notifier(
    flow_doctor.TelegramNotifierConfig()
).build()

handler = fd.get_handler(level=logging.WARNING)
logging.getLogger().addHandler(handler)

logger = logging.getLogger(__name__)

logger.warning("Upstream data is 48h stale")  # → captured + routed
logger.error("S3 backup failed: AccessDenied")
logger.exception("Pipeline crashed")

The handler is non-blocking: emit() enqueues work and returns immediately; a background thread calls fd.report() asynchronously.
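
The same enqueue-and-return pattern is available in the standard library, which makes it easy to see in isolation. The sketch below uses `logging.handlers.QueueHandler` and `QueueListener`; `CollectingHandler` is a hypothetical sink standing in for the dedup and notify work, and nothing here claims to mirror how flow-doctor's own handler is written.

```python
import logging
import logging.handlers
import queue


class CollectingHandler(logging.Handler):
    """Hypothetical slow sink; stands in for dedup + notify."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log_queue = queue.Queue()
sink = CollectingHandler()

# The listener owns a background thread that drains the queue into the sink.
listener = logging.handlers.QueueListener(log_queue, sink)
listener.start()

demo_logger = logging.getLogger("queue-demo")
demo_logger.propagate = False
# The logging call only enqueues the record, so the caller never waits on the sink.
demo_logger.addHandler(logging.handlers.QueueHandler(log_queue))
demo_logger.warning("Upstream data is 48h stale")

listener.stop()  # processes everything already queued, then joins the thread
```

In a long-running service you would typically call `listener.stop()` at shutdown so queued records are flushed before the process exits.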

Log capture

Attach recent logs to the next error report for richer diagnosis context:

with fd.capture_logs(level=logging.INFO):
    logger.info("Starting scan with 900 tickers...")
    run_pipeline()
    # All captured logs are attached to the next fd.report() call
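
One way such a capture scope can work is to attach a temporary buffering handler for the duration of the block. The following is a sketch under that assumption; the standalone `capture_logs` function and `_BufferHandler` class are hypothetical and differ from the `fd.capture_logs` method shown above.

```python
import contextlib
import logging


class _BufferHandler(logging.Handler):
    """Collect formatted records in memory."""

    def __init__(self, level):
        super().__init__(level)
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


@contextlib.contextmanager
def capture_logs(logger, level=logging.INFO):
    """Buffer records at or above `level` while the block runs."""
    handler = _BufferHandler(level)
    previous_level = logger.level
    logger.addHandler(handler)
    # Lower the logger's threshold if needed so `level` records get through.
    logger.setLevel(min(level, logger.getEffectiveLevel()))
    try:
        yield handler.lines
    finally:
        logger.removeHandler(handler)
        logger.setLevel(previous_level)
```

Restoring the previous level and removing the handler in `finally` keeps the capture from leaking into later code even when the pipeline raises.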

Features

Error capture and dedup

  • Traceback extraction with frame-based signature hashing
  • Configurable cooldown window (default 60 min) — same error captured once, not spammed
  • Variable-token normalization: reqIds, conIds, contract symbols, UUIDs, AWS request IDs are stripped before hashing, so a library logging the same error against N objects collapses to one signature
  • Cascade detection tags downstream failures caused by upstream dependency outages
  • Automatic secret scrubbing (AWS keys, Bearer tokens, passwords in URLs)
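
As a rough picture of secret scrubbing, the sketch below redacts the three kinds of secret named in the last bullet. The patterns and the `scrub` function are illustrative assumptions; the library's actual rules are not shown in this README and are likely broader.

```python
import re

# Hypothetical patterns, applied in order.
_SCRUBBERS = [
    # AWS access key IDs: "AKIA" followed by 16 uppercase letters or digits.
    (re.compile(r"\bAKIA[0-9A-Z]{16}\b"), "<aws-access-key>"),
    # HTTP bearer tokens.
    (re.compile(r"(?i)\bBearer\s+[A-Za-z0-9._~+/=-]+"), "Bearer <token>"),
    # Passwords embedded in URLs: scheme://user:password@host
    (re.compile(r"(://[^/\s:@]+):[^@\s/]+@"), r"\1:<password>@"),
]


def scrub(text: str) -> str:
    """Replace recognizable secrets before text is stored or sent anywhere."""
    for pattern, replacement in _SCRUBBERS:
        text = pattern.sub(replacement, text)
    return text
```

Scrubbing has to run before both hashing and notification, otherwise a secret could still leak through an alert body or an LLM prompt.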

LLM diagnosis

  • Structured root cause analysis via Claude: category, confidence, affected files, remediation
  • Six categories: TRANSIENT, DATA, CODE, CONFIG, EXTERNAL, INFRA
  • Knowledge base caching — known patterns matched for free before calling the LLM
  • Git context assembly (recent commits, changed files) for better diagnosis accuracy
  • Daily cost cap (default $1.00) and rate limiting (default 3 diagnoses/day)
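
The interaction between the cost cap and the rate limit is easiest to see in code. This is a minimal sketch assuming both limits reset at the start of each calendar day; `DailyBudget` and `try_spend` are hypothetical names, and the defaults mirror the ones quoted above.

```python
import datetime as dt
from typing import Callable


class DailyBudget:
    """Allow a call only while both the daily count and cost caps hold."""

    def __init__(
        self,
        max_calls: int = 3,
        max_cost_usd: float = 1.00,
        today: Callable[[], dt.date] = dt.date.today,
    ) -> None:
        self._max_calls = max_calls
        self._max_cost = max_cost_usd
        self._today = today
        self._day = today()
        self._calls = 0
        self._spent = 0.0

    def try_spend(self, estimated_cost_usd: float) -> bool:
        day = self._today()
        if day != self._day:  # new day: reset both counters
            self._day, self._calls, self._spent = day, 0, 0.0
        if self._calls >= self._max_calls:
            return False
        if self._spent + estimated_cost_usd > self._max_cost:
            return False
        self._calls += 1
        self._spent += estimated_cost_usd
        return True
```

Whichever cap is hit first blocks the call, so three cheap diagnoses exhaust the count even if most of the dollar budget is unspent.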

Notifications

  • Telegram — Bot API, per-chat / per-thread routing, mobile push (recommended default)
  • Slack — webhook-based alerts with severity emoji + diagnosis snippet
  • Email — SMTP (Gmail/any) with detailed body
  • GitHub issues — auto-filed with diagnosis, traceback, captured logs, machine-readable metadata
  • S3 — writes schema-1.0.0 entries to a system-wide changelog corpus
  • Daily digest — summarizes rate-limited / suppressed errors at end of day
  • Custom notifiers — subclass flow_doctor.notify.base.Notifier; the abstract base is a public extension point

Auto-Fix PRs

Human-in-the-loop: a human reviews a filed issue's diagnosis, adds a flow-doctor:fix label, and a GitHub Actions workflow generates a validated fix PR.

  1. An error occurs and Flow Doctor creates a GitHub issue with structured diagnosis
  2. A human reviews the diagnosis and adds the flow-doctor:fix label
  3. GitHub Actions triggers flow-doctor generate-fix
  4. The CLI generates a diff via LLM, validates against scope rules, runs tests
  5. If tests pass, a PR is opened. If tests fail, a comment explains what went wrong.

Safety gates — fix generation is skipped when:

  • Confidence below threshold (default 90%)
  • Category is EXTERNAL or INFRA (nothing to fix in code)
  • Config issue involves credentials/secrets
  • Generated diff touches files outside configured scope
  • Tests fail after applying the fix
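
The gates above amount to a sequence of early exits. The sketch below encodes them in order; the `Diagnosis` dataclass, `in_scope`, and `skip_reason` are hypothetical, and treating `allow` entries as path prefixes and `deny` entries as globs is an assumption based on the example YAML.

```python
from dataclasses import dataclass
from fnmatch import fnmatch
from typing import Optional, Sequence


@dataclass
class Diagnosis:
    category: str
    confidence: float
    involves_secrets: bool = False


def in_scope(path: str, allow: Sequence[str], deny: Sequence[str]) -> bool:
    """Deny globs win; otherwise the path must sit under an allowed prefix."""
    if any(fnmatch(path, pattern) for pattern in deny):
        return False
    return any(path.startswith(prefix) for prefix in allow)


def skip_reason(
    diagnosis: Diagnosis,
    touched_files: Sequence[str],
    tests_passed: bool,
    *,
    threshold: float = 0.90,
    allow: Sequence[str] = ("src/", "lib/"),
    deny: Sequence[str] = ("*.yaml", "*.yml"),
) -> Optional[str]:
    """Return why the fix is skipped, or None if every gate passes."""
    if diagnosis.confidence < threshold:
        return "confidence below threshold"
    if diagnosis.category in {"EXTERNAL", "INFRA"}:
        return "category not fixable in code"
    if diagnosis.category == "CONFIG" and diagnosis.involves_secrets:
        return "config issue involves credentials"
    if not all(in_scope(path, allow, deny) for path in touched_files):
        return "diff touches files outside scope"
    if not tests_passed:
        return "tests failed"
    return None
```

Returning a reason string rather than a bare boolean makes it straightforward to post an explanatory comment on the issue when a fix is skipped.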

Remediation playbooks

Define patterns that map failure signatures to automated actions:

from flow_doctor.remediation.playbook import (
    Playbook, PlaybookPattern, RemediationAction, RemediationType,
)

my_playbook = Playbook(patterns=[
    PlaybookPattern(
        name="service_down",
        description="App service not responding",
        category="INFRA",
        message_pattern=r"(connection refused|service unavailable)",
        action=RemediationAction(
            action_type=RemediationType.RESTART_SERVICE,
            description="Restart the app service",
            commands=["sudo systemctl restart myapp"],
            ssm_target="app-server",
        ),
    ),
])

Auto-Fix CLI

flow-doctor generate-fix \
  --issue-number 42 \
  --repo owner/repo \
  --token $GITHUB_TOKEN \
  --config flow-doctor.yaml \
  --dry-run

GitHub Actions workflow (copy to your repo at .github/workflows/flow-doctor-fix.yml):

name: Flow Doctor Fix
on:
  issues:
    types: [labeled]
jobs:
  generate-fix:
    if: github.event.label.name == 'flow-doctor:fix'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install --pre "flow-doctor[diagnosis]"
      - run: |
          python -m flow_doctor.fix.cli generate-fix \
            --issue-number ${{ github.event.issue.number }} \
            --repo ${{ github.repository }} \
            --token $GITHUB_TOKEN
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}

Migrating from 0.4.x

flow_doctor.init(config_path=...) and direct construction of NotifyChannelConfig are both @deprecated (PEP 702) in 0.5.0. Both still work — they'll be removed in 0.6.0. Migration:

# 0.4.x — still works in 0.5.x, but mypy/pyright will flag it
fd = flow_doctor.init(config_path="flow-doctor.yaml")

# 0.5.x — typed, IDE-discoverable, no yaml
fd = (
    FlowDoctor.builder("pipeline")
    .add_notifier(TelegramNotifierConfig())
    .build()
)

The 0.5.x yaml shim is API-compatible with 0.4.x configs; existing yaml files keep working through the deprecation window.

Architecture

flow_doctor/
  core/           # Client, builder, config (Pydantic v2), models, dedup,
                  # rate limiting, scrubber, logging handler, contextvars
  _protocol.py    # FlowDoctorProtocol public contract
  notify/         # Telegram, Slack, Email, GitHub, S3 — concrete notifiers
                  # + typed Pydantic config models (discriminated union)
  diagnosis/      # LLM provider, context assembly, knowledge base, git context
  digest/         # Daily digest generator
  fix/            # Auto-fix: LLM generator, scope guard, test validator, PR creator, CLI
  remediation/    # Decision gate, executor, playbook patterns
  storage/        # SQLite backend (thread-safe, per-thread connections)
  testing/        # RecordingFlowDoctor + pytest plugin (auto-discovered)
  otel.py         # Report → OTel SpanEvent serialization adapter
  py.typed        # PEP 561 marker — annotations are authoritative for mypy/pyright

Development

git clone https://github.com/cipher813/flow-doctor.git
cd flow-doctor
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev,diagnosis]"

python -m pytest tests/ -x -q             # 376 tests
python -m pytest tests/ --cov=flow_doctor # coverage report
python examples/smoke_test.py              # end-to-end smoke test

License

MIT
