From f54eba8066699dcb59d4f52ecc3c7eb05f351cf8 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 13 May 2026 12:18:31 -0700 Subject: [PATCH 1/4] Migrate core/config.py dataclasses to Pydantic v2 Replaces all 11 @dataclass declarations in flow_doctor/core/config.py with pydantic.BaseModel via a shared _ConfigModel base. Field names and defaults are preserved so existing test fixtures and consumers (alpha-engine calls flow_doctor.init(config_path=...)) keep working unchanged. The migration is the foundation for the FlowDoctor.builder() fluent API and Pydantic BaseSettings-driven env-var injection landing in subsequent commits on this branch (per private/plug-and-play-260513.md Proposal 1). load_config() gains pass-through branches that accept already-constructed Pydantic instances in addition to dicts, so the upcoming builder can hand typed sub-configs straight to it without re-serializing through YAML. Adds pydantic>=2.0 to [project.dependencies]; alpha-engine + morning-signal already run Pydantic 2.x per the plan's compat audit. 296/296 tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- flow_doctor/core/config.py | 136 ++++++++++++++++++++++--------------- pyproject.toml | 1 + 2 files changed, 83 insertions(+), 54 deletions(-) diff --git a/flow_doctor/core/config.py b/flow_doctor/core/config.py index 05f7e3a..efba190 100644 --- a/flow_doctor/core/config.py +++ b/flow_doctor/core/config.py @@ -1,20 +1,40 @@ -"""Configuration: YAML file + inline Python kwargs.""" +"""Configuration: YAML file + inline Python kwargs. + +Backed by Pydantic v2 ``BaseModel`` (since v0.5.0) — field names + defaults +match the prior ``@dataclass``-based shape, so callers constructing +``FlowDoctorConfig(...)``, ``NotifyChannelConfig(...)``, etc. by keyword +keep working unchanged. The benefit of the migration is type validation +at construction time and a stable foundation for the typed builder API +and the Pydantic ``BaseSettings`` env-var contract. 
+""" from __future__ import annotations import os import re -from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional import yaml +from pydantic import BaseModel, ConfigDict, Field from flow_doctor.core.errors import ConfigError -@dataclass -class NotifyChannelConfig: +class _ConfigModel(BaseModel): + """Base for flow-doctor config models. + + ``extra="ignore"`` matches the prior dataclass behaviour (unknown + keys in inline kwargs / yaml were silently dropped). ``validate_assignment`` + is off so test/runtime code that mutates an attribute after construction + (e.g. ``cfg.market_hours_lockout = False``) keeps working the same way + it did with dataclasses. + """ + + model_config = ConfigDict(extra="ignore", validate_assignment=False) + + +class NotifyChannelConfig(_ConfigModel): type: str # "slack", "email", "github", or "s3" # Slack fields webhook_url: Optional[str] = None @@ -39,16 +59,14 @@ class NotifyChannelConfig: default_resolution_type: Optional[str] = None -@dataclass -class StoreConfig: +class StoreConfig(_ConfigModel): type: str = "sqlite" path: str = "flow_doctor.db" bucket: Optional[str] = None prefix: Optional[str] = None -@dataclass -class RateLimitConfig: +class RateLimitConfig(_ConfigModel): max_diagnosed_per_day: int = 3 max_issues_per_day: int = 3 max_alerts_per_day: int = 5 @@ -57,8 +75,7 @@ class RateLimitConfig: dedup_cooldown_minutes: int = 60 -@dataclass -class DiagnosisConfig: +class DiagnosisConfig(_ConfigModel): enabled: bool = False provider: str = "anthropic" model: str = "claude-sonnet-4-6-20250514" @@ -68,30 +85,26 @@ class DiagnosisConfig: max_daily_cost_usd: float = 1.00 # Hard cap on daily LLM spend -@dataclass -class GitHubConfig: +class GitHubConfig(_ConfigModel): token: Optional[str] = None - labels: List[str] = field(default_factory=lambda: ["flow-doctor"]) + labels: List[str] = Field(default_factory=lambda: ["flow-doctor"]) -@dataclass -class ScopeConfig: - allow: 
List[str] = field(default_factory=list) - deny: List[str] = field(default_factory=list) +class ScopeConfig(_ConfigModel): + allow: List[str] = Field(default_factory=list) + deny: List[str] = Field(default_factory=list) -@dataclass -class AutoFixConfig: +class AutoFixConfig(_ConfigModel): enabled: bool = False confidence_threshold: float = 0.90 - scope: ScopeConfig = field(default_factory=ScopeConfig) + scope: ScopeConfig = Field(default_factory=ScopeConfig) test_command: str = "python -m pytest tests/ -x -q" dry_run: bool = True model: Optional[str] = None -@dataclass -class RemediationConfig: +class RemediationConfig(_ConfigModel): enabled: bool = False dry_run: bool = True # Log actions without executing auto_remediate_min_confidence: float = 0.9 @@ -121,33 +134,31 @@ class RemediationConfig: # payment processors, medical software). Issue-filing still works # for these repos — only code modifications are blocked. Matches # GitHub-style "owner/name" or bare "name" (case-insensitive). - deny_repos: List[str] = field(default_factory=list) + deny_repos: List[str] = Field(default_factory=list) -@dataclass -class HandlerConfig: +class HandlerConfig(_ConfigModel): level: str = "ERROR" - include_patterns: List[str] = field(default_factory=list) - exclude_patterns: List[str] = field(default_factory=list) + include_patterns: List[str] = Field(default_factory=list) + exclude_patterns: List[str] = Field(default_factory=list) queue_size: int = 100 -@dataclass -class FlowDoctorConfig: +class FlowDoctorConfig(_ConfigModel): flow_name: str = "default" repo: Optional[str] = None owner: Optional[str] = None - notify: List[NotifyChannelConfig] = field(default_factory=list) - store: StoreConfig = field(default_factory=StoreConfig) - rate_limits: RateLimitConfig = field(default_factory=RateLimitConfig) - dependencies: List[str] = field(default_factory=list) + notify: List[NotifyChannelConfig] = Field(default_factory=list) + store: StoreConfig = Field(default_factory=StoreConfig) + 
rate_limits: RateLimitConfig = Field(default_factory=RateLimitConfig) + dependencies: List[str] = Field(default_factory=list) dedup_cooldown_minutes: int = 60 - diagnosis: DiagnosisConfig = field(default_factory=DiagnosisConfig) - github: GitHubConfig = field(default_factory=GitHubConfig) - auto_fix: AutoFixConfig = field(default_factory=AutoFixConfig) - remediation: RemediationConfig = field(default_factory=RemediationConfig) + diagnosis: DiagnosisConfig = Field(default_factory=DiagnosisConfig) + github: GitHubConfig = Field(default_factory=GitHubConfig) + auto_fix: AutoFixConfig = Field(default_factory=AutoFixConfig) + remediation: RemediationConfig = Field(default_factory=RemediationConfig) handler: Optional[HandlerConfig] = None - extra: Dict[str, Any] = field(default_factory=dict) + extra: Dict[str, Any] = Field(default_factory=dict) _ENV_VAR_RE = re.compile(r"\$\{([^}]+)\}") @@ -309,6 +320,10 @@ def load_config( notify = _parse_notify_shorthand(notify_raw) elif notify_raw and isinstance(notify_raw[0], dict): notify = _parse_notify_dicts(notify_raw) + elif notify_raw and isinstance(notify_raw[0], NotifyChannelConfig): + # Inline kwargs may pass already-constructed NotifyChannelConfig + # instances (the builder does this); accept them verbatim. 
+ notify = list(notify_raw) else: notify = [] else: @@ -319,21 +334,26 @@ def load_config( # Parse rate limits rl_raw = raw.get("rate_limits", {}) - rate_limits = RateLimitConfig( - max_diagnosed_per_day=rl_raw.get("max_diagnosed_per_day", 3), - max_issues_per_day=rl_raw.get("max_issues_per_day", 3), - max_alerts_per_day=rl_raw.get("max_alerts_per_day", 5), - daily_digest=rl_raw.get("daily_digest", True), - digest_time=rl_raw.get("digest_time", "17:00"), - dedup_cooldown_minutes=rl_raw.get("dedup_cooldown_minutes", - raw.get("dedup_cooldown_minutes", 60)), - ) + if isinstance(rl_raw, RateLimitConfig): + rate_limits = rl_raw + else: + rate_limits = RateLimitConfig( + max_diagnosed_per_day=rl_raw.get("max_diagnosed_per_day", 3), + max_issues_per_day=rl_raw.get("max_issues_per_day", 3), + max_alerts_per_day=rl_raw.get("max_alerts_per_day", 5), + daily_digest=rl_raw.get("daily_digest", True), + digest_time=rl_raw.get("digest_time", "17:00"), + dedup_cooldown_minutes=rl_raw.get("dedup_cooldown_minutes", + raw.get("dedup_cooldown_minutes", 60)), + ) dedup_cooldown = raw.get("dedup_cooldown_minutes", rate_limits.dedup_cooldown_minutes) # Parse diagnosis config diag_raw = raw.get("diagnosis", {}) - if isinstance(diag_raw, dict): + if isinstance(diag_raw, DiagnosisConfig): + diagnosis_config = diag_raw + elif isinstance(diag_raw, dict): diag_raw = _resolve_dict(diag_raw) diagnosis_config = DiagnosisConfig( enabled=diag_raw.get("enabled", False), @@ -349,7 +369,9 @@ def load_config( # Parse github config gh_raw = raw.get("github", {}) - if isinstance(gh_raw, dict): + if isinstance(gh_raw, GitHubConfig): + github_config = gh_raw + elif isinstance(gh_raw, dict): gh_raw = _resolve_dict(gh_raw) github_config = GitHubConfig( token=gh_raw.get("token"), @@ -360,7 +382,9 @@ def load_config( # Parse auto_fix config af_raw = raw.get("auto_fix", {}) - if isinstance(af_raw, dict): + if isinstance(af_raw, AutoFixConfig): + auto_fix_config = af_raw + elif isinstance(af_raw, dict): af_raw 
= _resolve_dict(af_raw) scope_raw = af_raw.get("scope", {}) scope_config = ScopeConfig( @@ -380,11 +404,13 @@ def load_config( # Parse remediation config rem_raw = raw.get("remediation", {}) - if isinstance(rem_raw, dict): + if isinstance(rem_raw, RemediationConfig): + remediation_config = rem_raw + elif isinstance(rem_raw, dict): rem_raw = _resolve_dict(rem_raw) - # Defaults here match the RemediationConfig dataclass defaults + # Defaults here match the RemediationConfig model defaults # (not inlined) so there's one source of truth. If you change a - # default, change it in the dataclass at the top of this file. + # default, change it in the model at the top of this file. _defaults = RemediationConfig() deny_repos_raw = rem_raw.get("deny_repos", []) if isinstance(deny_repos_raw, str): @@ -418,7 +444,9 @@ def load_config( # Parse handler config handler_raw = raw.get("handler") - if isinstance(handler_raw, dict): + if isinstance(handler_raw, HandlerConfig): + handler_config = handler_raw + elif isinstance(handler_raw, dict): handler_raw = _resolve_dict(handler_raw) handler_config = HandlerConfig( level=handler_raw.get("level", "ERROR"), diff --git a/pyproject.toml b/pyproject.toml index e020949..ca24e86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ classifiers = [ ] dependencies = [ "pyyaml>=6.0", + "pydantic>=2.0", ] [project.optional-dependencies] From f7488ae871284be9b43e054c0b3fc48d4cce99b9 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 13 May 2026 12:21:10 -0700 Subject: [PATCH 2/4] Add typed per-channel notifier configs + FlowDoctor.builder() Introduces the SOTA-facing entry point for new consumers (plan Proposal 1): - flow_doctor/notify/configs.py defines SlackNotifierConfig, EmailNotifierConfig, GitHubNotifierConfig, S3NotifierConfig as Pydantic v2 models, exposed as the discriminated union NotifierConfig via Field(discriminator="type"). 
EmailNotifierConfig.recipients accepts either a CSV string or a list and normalizes via a field_validator on the way down to the legacy NotifyChannelConfig. - flow_doctor/core/builder.py adds FlowDoctorBuilder with fluent add_notifier / with_repo / with_dedup / with_store / with_diagnosis / with_github / with_auto_fix / with_remediation / with_handler / with_dependencies methods plus build_config() and build(strict=True). - FlowDoctor.builder(flow_name) classmethod is the recommended entry point for new code; flow_doctor.init(config_path=...) still works unchanged for 0.4.0 callers. The new typed configs and builder are re-exported from the flow_doctor and flow_doctor.notify package roots. 18 new tests cover: per-channel config -> legacy NotifyChannelConfig round-trip, list-to-CSV recipients normalization, fluent chaining, unspecified-section defaults, the morning-signal cutover use case, and the discriminated-union deserialization path (TypeAdapter + ValidationError for unknown channel types). Suite: 314/314 pass (296 baseline + 18 new). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- flow_doctor/__init__.py | 14 ++ flow_doctor/core/builder.py | 190 ++++++++++++++++++++++++ flow_doctor/core/client.py | 33 ++++- flow_doctor/notify/__init__.py | 17 +++ flow_doctor/notify/configs.py | 130 ++++++++++++++++ tests/test_builder.py | 263 +++++++++++++++++++++++++++++++++ 6 files changed, 646 insertions(+), 1 deletion(-) create mode 100644 flow_doctor/core/builder.py create mode 100644 flow_doctor/notify/configs.py create mode 100644 tests/test_builder.py diff --git a/flow_doctor/__init__.py b/flow_doctor/__init__.py index bcfdc46..f1d6110 100644 --- a/flow_doctor/__init__.py +++ b/flow_doctor/__init__.py @@ -1,16 +1,30 @@ """Flow Doctor -- call-site error handler for pipeline reliability.""" +from flow_doctor.core.builder import FlowDoctorBuilder from flow_doctor.core.client import FlowDoctor, init from flow_doctor.core.errors import ConfigError, FlowDoctorError from flow_doctor.core.handler import FlowDoctorHandler from flow_doctor.core.models import Severity +from flow_doctor.notify.configs import ( + EmailNotifierConfig, + GitHubNotifierConfig, + NotifierConfig, + S3NotifierConfig, + SlackNotifierConfig, +) __all__ = [ "ConfigError", + "EmailNotifierConfig", "FlowDoctor", + "FlowDoctorBuilder", "FlowDoctorError", "FlowDoctorHandler", + "GitHubNotifierConfig", + "NotifierConfig", + "S3NotifierConfig", "Severity", + "SlackNotifierConfig", "init", ] __version__ = "0.4.0" diff --git a/flow_doctor/core/builder.py b/flow_doctor/core/builder.py new file mode 100644 index 0000000..a57a282 --- /dev/null +++ b/flow_doctor/core/builder.py @@ -0,0 +1,190 @@ +"""Typed fluent builder for ``FlowDoctor`` — no yaml required. + +The builder is the SOTA-facing replacement for ``flow_doctor.init()``. +Each method returns ``self`` so calls chain, each accepts a typed +Pydantic sub-config so IDE autocomplete and ``mypy --strict`` work +end-to-end. 
``init(config_path=...)`` keeps working for back-compat +(see private/plug-and-play-260513.md Proposal 1). +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, List, Optional, Union + +from flow_doctor.core.config import ( + AutoFixConfig, + DiagnosisConfig, + FlowDoctorConfig, + GitHubConfig, + HandlerConfig, + NotifyChannelConfig, + RateLimitConfig, + RemediationConfig, + StoreConfig, +) +from flow_doctor.notify.configs import ( + EmailNotifierConfig, + GitHubNotifierConfig, + S3NotifierConfig, + SlackNotifierConfig, +) + +if TYPE_CHECKING: + from flow_doctor.core.client import FlowDoctor + + +_PerTypeNotifier = Union[ + SlackNotifierConfig, + EmailNotifierConfig, + GitHubNotifierConfig, + S3NotifierConfig, +] + + +class FlowDoctorBuilder: + """Fluent builder for :class:`FlowDoctor`. + + Typical usage:: + + fd = ( + FlowDoctor.builder("morning-signal") + .add_notifier(EmailNotifierConfig( + sender="x@y.com", + recipients=["x@y.com"], + smtp_password=os.environ["GMAIL_APP_PASSWORD"], + )) + .with_dedup(cooldown_minutes=60) + .build() + ) + + Every ``with_*`` / ``add_*`` method returns ``self`` so calls chain. + ``build()`` materializes a ``FlowDoctorConfig`` and constructs a + ``FlowDoctor``; ``build_config()`` returns just the config (useful + for tests and for handing a config off to a custom subclass). 
+ """ + + def __init__(self, flow_name: str): + self._flow_name = flow_name + self._repo: Optional[str] = None + self._owner: Optional[str] = None + self._notifiers: List[NotifyChannelConfig] = [] + self._store: Optional[StoreConfig] = None + self._dedup_cooldown_minutes: Optional[int] = None + self._rate_limits: Optional[RateLimitConfig] = None + self._diagnosis: Optional[DiagnosisConfig] = None + self._github: Optional[GitHubConfig] = None + self._auto_fix: Optional[AutoFixConfig] = None + self._remediation: Optional[RemediationConfig] = None + self._handler: Optional[HandlerConfig] = None + self._dependencies: List[str] = [] + + def add_notifier( + self, + cfg: Union[_PerTypeNotifier, NotifyChannelConfig], + ) -> "FlowDoctorBuilder": + """Append a notifier. + + Accepts a typed per-channel config (``SlackNotifierConfig``, + ``EmailNotifierConfig``, ``GitHubNotifierConfig``, + ``S3NotifierConfig``) or the legacy omnibus + ``NotifyChannelConfig`` (for back-compat with 0.4.0 callers). + """ + if isinstance(cfg, NotifyChannelConfig): + self._notifiers.append(cfg) + else: + self._notifiers.append(cfg.to_channel_config()) + return self + + def with_repo( + self, repo: str, *, owner: Optional[str] = None + ) -> "FlowDoctorBuilder": + self._repo = repo + if owner is not None: + self._owner = owner + return self + + def with_dedup(self, *, cooldown_minutes: int) -> "FlowDoctorBuilder": + self._dedup_cooldown_minutes = cooldown_minutes + return self + + def with_store( + self, + *, + type: str = "sqlite", + path: str = "flow_doctor.db", + bucket: Optional[str] = None, + prefix: Optional[str] = None, + ) -> "FlowDoctorBuilder": + self._store = StoreConfig( + type=type, path=path, bucket=bucket, prefix=prefix + ) + return self + + def with_rate_limits(self, cfg: RateLimitConfig) -> "FlowDoctorBuilder": + self._rate_limits = cfg + return self + + def with_diagnosis(self, cfg: DiagnosisConfig) -> "FlowDoctorBuilder": + self._diagnosis = cfg + return self + + def 
with_github(self, cfg: GitHubConfig) -> "FlowDoctorBuilder": + self._github = cfg + return self + + def with_auto_fix(self, cfg: AutoFixConfig) -> "FlowDoctorBuilder": + self._auto_fix = cfg + return self + + def with_remediation(self, cfg: RemediationConfig) -> "FlowDoctorBuilder": + self._remediation = cfg + return self + + def with_handler(self, cfg: HandlerConfig) -> "FlowDoctorBuilder": + self._handler = cfg + return self + + def with_dependencies(self, deps: List[str]) -> "FlowDoctorBuilder": + self._dependencies = list(deps) + return self + + def build_config(self) -> FlowDoctorConfig: + """Materialize the ``FlowDoctorConfig`` without instantiating + ``FlowDoctor``. Useful for tests + for handing the config to + a custom subclass.""" + kwargs: dict = { + "flow_name": self._flow_name, + "notify": list(self._notifiers), + } + if self._repo is not None: + kwargs["repo"] = self._repo + if self._owner is not None: + kwargs["owner"] = self._owner + if self._store is not None: + kwargs["store"] = self._store + if self._rate_limits is not None: + kwargs["rate_limits"] = self._rate_limits + if self._dedup_cooldown_minutes is not None: + kwargs["dedup_cooldown_minutes"] = self._dedup_cooldown_minutes + if self._diagnosis is not None: + kwargs["diagnosis"] = self._diagnosis + if self._github is not None: + kwargs["github"] = self._github + if self._auto_fix is not None: + kwargs["auto_fix"] = self._auto_fix + if self._remediation is not None: + kwargs["remediation"] = self._remediation + if self._handler is not None: + kwargs["handler"] = self._handler + if self._dependencies: + kwargs["dependencies"] = self._dependencies + return FlowDoctorConfig(**kwargs) + + def build(self, *, strict: bool = True) -> "FlowDoctor": + """Materialize the config and construct a :class:`FlowDoctor`.""" + from flow_doctor.core.client import FlowDoctor + + return FlowDoctor(self.build_config(), strict=strict) + + +__all__ = ["FlowDoctorBuilder"] diff --git a/flow_doctor/core/client.py 
b/flow_doctor/core/client.py index 123df30..298bf9a 100644 --- a/flow_doctor/core/client.py +++ b/flow_doctor/core/client.py @@ -10,7 +10,10 @@ import traceback as tb_module from contextlib import contextmanager from datetime import datetime -from typing import Any, Callable, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional + +if TYPE_CHECKING: + from flow_doctor.core.builder import FlowDoctorBuilder from flow_doctor.core.config import FlowDoctorConfig, load_config from flow_doctor.core.dedup import ( @@ -73,6 +76,34 @@ def emit(self, record: logging.LogRecord) -> None: class FlowDoctor: """Main Flow Doctor client.""" + @classmethod + def builder(cls, flow_name: str) -> "FlowDoctorBuilder": + """Return a fluent builder for constructing a ``FlowDoctor``. + + Preferred entry point for new code — typed, IDE-discoverable, + no yaml required. See :class:`flow_doctor.core.builder.FlowDoctorBuilder` + for the full API. + + Example:: + + from flow_doctor import FlowDoctor + from flow_doctor.notify import EmailNotifierConfig + + fd = ( + FlowDoctor.builder("morning-signal") + .add_notifier(EmailNotifierConfig( + sender="x@y.com", + recipients=["x@y.com"], + smtp_password=os.environ["GMAIL_APP_PASSWORD"], + )) + .with_dedup(cooldown_minutes=60) + .build() + ) + """ + from flow_doctor.core.builder import FlowDoctorBuilder + + return FlowDoctorBuilder(flow_name) + def __init__(self, config: FlowDoctorConfig, *, strict: bool = True): """Initialize a FlowDoctor instance. 
diff --git a/flow_doctor/notify/__init__.py b/flow_doctor/notify/__init__.py index e69de29..1214560 100644 --- a/flow_doctor/notify/__init__.py +++ b/flow_doctor/notify/__init__.py @@ -0,0 +1,17 @@ +"""Notifier package — concrete notifiers + per-channel typed configs.""" + +from flow_doctor.notify.configs import ( + EmailNotifierConfig, + GitHubNotifierConfig, + NotifierConfig, + S3NotifierConfig, + SlackNotifierConfig, +) + +__all__ = [ + "EmailNotifierConfig", + "GitHubNotifierConfig", + "NotifierConfig", + "S3NotifierConfig", + "SlackNotifierConfig", +] diff --git a/flow_doctor/notify/configs.py b/flow_doctor/notify/configs.py new file mode 100644 index 0000000..4244864 --- /dev/null +++ b/flow_doctor/notify/configs.py @@ -0,0 +1,130 @@ +"""Per-channel typed notifier configs (Pydantic v2 discriminated union). + +These are the SOTA-facing replacement for the omnibus +``flow_doctor.core.config.NotifyChannelConfig``. Each notifier channel +gets its own model with only the fields it actually consumes — so +IDE autocomplete and ``mypy --strict`` surface required fields at the +construction site, and a misplaced ``token=`` on an Email config is a +type error rather than a silent typo. + +The omnibus ``NotifyChannelConfig`` is still the internal lingua franca +that ``FlowDoctor._init_notifiers`` consumes. Each typed config exposes +``to_channel_config()`` so the builder can fold typed inputs back into +the legacy shape without touching the init code path. 
+""" + +from __future__ import annotations + +from typing import Annotated, List, Literal, Optional, Union + +from pydantic import BaseModel, ConfigDict, Field, field_validator + +from flow_doctor.core.config import NotifyChannelConfig + + +class _NotifierConfigBase(BaseModel): + model_config = ConfigDict(extra="ignore", validate_assignment=False) + + +class SlackNotifierConfig(_NotifierConfigBase): + type: Literal["slack"] = "slack" + webhook_url: Optional[str] = None + channel: Optional[str] = None + + def to_channel_config(self) -> NotifyChannelConfig: + return NotifyChannelConfig( + type="slack", + webhook_url=self.webhook_url, + channel=self.channel, + ) + + +class EmailNotifierConfig(_NotifierConfigBase): + type: Literal["email"] = "email" + sender: Optional[str] = None + # The legacy NotifyChannelConfig.recipients is a CSV string; accept + # either ``"a@x, b@y"`` or ``["a@x", "b@y"]`` at the typed surface + # and normalize to CSV on the way down to the omnibus form. + recipients: Optional[Union[str, List[str]]] = None + smtp_host: str = "smtp.gmail.com" + smtp_port: int = 587 + smtp_password: Optional[str] = None + + @field_validator("recipients", mode="after") + @classmethod + def _normalize_recipients( + cls, v: Optional[Union[str, List[str]]] + ) -> Optional[str]: + if v is None: + return None + if isinstance(v, list): + cleaned = [str(item).strip() for item in v if item] + return ", ".join(cleaned) if cleaned else None + return v + + def to_channel_config(self) -> NotifyChannelConfig: + return NotifyChannelConfig( + type="email", + sender=self.sender, + recipients=self.recipients, # already CSV after the validator + smtp_host=self.smtp_host, + smtp_port=self.smtp_port, + smtp_password=self.smtp_password, + ) + + +class GitHubNotifierConfig(_NotifierConfigBase): + type: Literal["github"] = "github" + repo: Optional[str] = None + token: Optional[str] = None + labels: Optional[List[str]] = None + + def to_channel_config(self) -> NotifyChannelConfig: + 
return NotifyChannelConfig( + type="github", + repo=self.repo, + token=self.token, + labels=self.labels, + ) + + +class S3NotifierConfig(_NotifierConfigBase): + type: Literal["s3"] = "s3" + bucket: Optional[str] = None + subsystem: Optional[str] = None + entry_prefix: str = "changelog/entries" + default_root_cause_category: str = "code_bug" + default_resolution_type: Optional[str] = None + + def to_channel_config(self) -> NotifyChannelConfig: + return NotifyChannelConfig( + type="s3", + bucket=self.bucket, + subsystem=self.subsystem, + entry_prefix=self.entry_prefix, + default_root_cause_category=self.default_root_cause_category, + default_resolution_type=self.default_resolution_type, + ) + + +# Discriminated union of all typed notifier configs. Consumers can +# type-hint as ``NotifierConfig`` and Pydantic will pick the right +# concrete model based on the ``type`` field. +NotifierConfig = Annotated[ + Union[ + SlackNotifierConfig, + EmailNotifierConfig, + GitHubNotifierConfig, + S3NotifierConfig, + ], + Field(discriminator="type"), +] + + +__all__ = [ + "EmailNotifierConfig", + "GitHubNotifierConfig", + "NotifierConfig", + "S3NotifierConfig", + "SlackNotifierConfig", +] diff --git a/tests/test_builder.py b/tests/test_builder.py new file mode 100644 index 0000000..537f5ee --- /dev/null +++ b/tests/test_builder.py @@ -0,0 +1,263 @@ +"""Tests for the FlowDoctor.builder() fluent API + typed notifier configs. + +Mirrors the morning-signal cutover use case in private/plug-and-play-260513.md: +a downstream consumer constructs a FlowDoctor without yaml, in 5 typed lines. 
+""" + +from __future__ import annotations + +import tempfile + +import pytest + +from flow_doctor import ( + EmailNotifierConfig, + FlowDoctor, + FlowDoctorBuilder, + GitHubNotifierConfig, + S3NotifierConfig, + SlackNotifierConfig, +) +from flow_doctor.core.config import ( + DiagnosisConfig, + FlowDoctorConfig, + GitHubConfig, + NotifyChannelConfig, + RateLimitConfig, +) + + +# --------------------------------------------------------------------------- +# Per-type notifier configs +# --------------------------------------------------------------------------- + + +def test_email_notifier_config_normalizes_list_to_csv(): + cfg = EmailNotifierConfig( + sender="x@y.com", + recipients=["a@y.com", "b@y.com", " c@y.com "], + smtp_password="secret", + ) + assert cfg.recipients == "a@y.com, b@y.com, c@y.com" + + +def test_email_notifier_config_accepts_csv_string_unchanged(): + cfg = EmailNotifierConfig( + sender="x@y.com", recipients="a@y.com, b@y.com" + ) + assert cfg.recipients == "a@y.com, b@y.com" + + +def test_email_notifier_config_to_channel_config_round_trip(): + cfg = EmailNotifierConfig( + sender="x@y.com", + recipients=["a@y.com"], + smtp_password="secret", + smtp_host="smtp.example.com", + smtp_port=25, + ) + legacy = cfg.to_channel_config() + assert isinstance(legacy, NotifyChannelConfig) + assert legacy.type == "email" + assert legacy.sender == "x@y.com" + assert legacy.recipients == "a@y.com" + assert legacy.smtp_host == "smtp.example.com" + assert legacy.smtp_port == 25 + assert legacy.smtp_password == "secret" + + +def test_slack_notifier_config_to_channel_config(): + cfg = SlackNotifierConfig(webhook_url="https://hooks", channel="#ops") + legacy = cfg.to_channel_config() + assert legacy.type == "slack" + assert legacy.webhook_url == "https://hooks" + assert legacy.channel == "#ops" + + +def test_github_notifier_config_to_channel_config(): + cfg = GitHubNotifierConfig( + repo="owner/repo", token="ghs_xxx", labels=["bug", "flow-doctor"] + ) + legacy = 
cfg.to_channel_config() + assert legacy.type == "github" + assert legacy.repo == "owner/repo" + assert legacy.token == "ghs_xxx" + assert legacy.labels == ["bug", "flow-doctor"] + + +def test_s3_notifier_config_to_channel_config_preserves_changelog_fields(): + cfg = S3NotifierConfig( + bucket="alpha-engine-research", + subsystem="predictor", + entry_prefix="changelog/entries", + default_root_cause_category="data_quality", + default_resolution_type="config", + ) + legacy = cfg.to_channel_config() + assert legacy.type == "s3" + assert legacy.bucket == "alpha-engine-research" + assert legacy.subsystem == "predictor" + assert legacy.entry_prefix == "changelog/entries" + assert legacy.default_root_cause_category == "data_quality" + assert legacy.default_resolution_type == "config" + + +# --------------------------------------------------------------------------- +# Builder +# --------------------------------------------------------------------------- + + +def test_builder_returns_builder_instance(): + builder = FlowDoctor.builder("test-flow") + assert isinstance(builder, FlowDoctorBuilder) + + +def test_builder_methods_return_self_for_chaining(): + builder = FlowDoctor.builder("test-flow") + assert builder.with_repo("owner/repo") is builder + assert builder.with_dedup(cooldown_minutes=30) is builder + assert ( + builder.add_notifier(SlackNotifierConfig(webhook_url="https://x")) + is builder + ) + + +def test_builder_build_config_assembles_typed_inputs(): + config = ( + FlowDoctor.builder("morning-signal") + .with_repo("owner/repo", owner="@brian") + .add_notifier( + EmailNotifierConfig( + sender="x@y.com", + recipients=["x@y.com"], + smtp_password="secret", + ) + ) + .with_dedup(cooldown_minutes=60) + .build_config() + ) + assert isinstance(config, FlowDoctorConfig) + assert config.flow_name == "morning-signal" + assert config.repo == "owner/repo" + assert config.owner == "@brian" + assert config.dedup_cooldown_minutes == 60 + assert len(config.notify) == 1 + notif = 
config.notify[0] + assert notif.type == "email" + assert notif.sender == "x@y.com" + assert notif.recipients == "x@y.com" + assert notif.smtp_password == "secret" + + +def test_builder_accepts_legacy_notify_channel_config(): + """Back-compat: existing 0.4.0 callers that construct a raw + NotifyChannelConfig can still pass it to add_notifier().""" + legacy = NotifyChannelConfig(type="slack", webhook_url="https://x") + config = FlowDoctor.builder("test").add_notifier(legacy).build_config() + assert config.notify[0] is legacy + + +def test_builder_unspecified_sections_use_defaults(): + config = FlowDoctor.builder("min").build_config() + assert config.flow_name == "min" + assert config.notify == [] + assert config.store.type == "sqlite" + assert config.diagnosis.enabled is False + assert config.remediation.enabled is False + + +def test_builder_with_rate_limits_overrides_defaults(): + config = ( + FlowDoctor.builder("test") + .with_rate_limits(RateLimitConfig(max_alerts_per_day=42)) + .build_config() + ) + assert config.rate_limits.max_alerts_per_day == 42 + + +def test_builder_with_diagnosis_and_github_compose(): + config = ( + FlowDoctor.builder("test") + .with_diagnosis(DiagnosisConfig(enabled=True, api_key="sk-xxx")) + .with_github(GitHubConfig(token="ghs_xxx")) + .build_config() + ) + assert config.diagnosis.enabled is True + assert config.diagnosis.api_key == "sk-xxx" + assert config.github.token == "ghs_xxx" + + +def test_builder_with_dependencies_replaces_list(): + config = ( + FlowDoctor.builder("test") + .with_dependencies(["pkg-a", "pkg-b"]) + .build_config() + ) + assert config.dependencies == ["pkg-a", "pkg-b"] + + +def test_builder_build_constructs_flow_doctor_instance(): + """End-to-end: builder.build() returns a working FlowDoctor. + + Uses sqlite at a temp path + no notifiers so we don't touch the network. 
+ """ + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = ( + FlowDoctor.builder("test-flow") + .with_store(path=f.name) + .build() + ) + assert fd.config.flow_name == "test-flow" + assert fd.config.store.path == f.name + # report() should not crash with zero notifiers configured + report_id = fd.report("synthetic", severity="error") + assert report_id is not None + + +def test_builder_multiple_notifiers_preserved_in_order(): + config = ( + FlowDoctor.builder("test") + .add_notifier(SlackNotifierConfig(webhook_url="https://slack")) + .add_notifier(EmailNotifierConfig(sender="x@y.com", recipients="x@y.com")) + .add_notifier(GitHubNotifierConfig(repo="o/r", token="ghs_xxx")) + .build_config() + ) + assert [n.type for n in config.notify] == ["slack", "email", "github"] + + +# --------------------------------------------------------------------------- +# Discriminated union round-trip +# --------------------------------------------------------------------------- + + +def test_notifier_config_discriminated_union_picks_correct_type(): + """When deserializing a dict, Pydantic must pick the right concrete + config based on the ``type`` discriminator.""" + from pydantic import TypeAdapter + + from flow_doctor.notify import NotifierConfig + + adapter = TypeAdapter(NotifierConfig) + parsed = adapter.validate_python( + {"type": "email", "sender": "x@y.com", "recipients": "x@y.com"} + ) + assert isinstance(parsed, EmailNotifierConfig) + assert parsed.sender == "x@y.com" + + parsed = adapter.validate_python( + {"type": "slack", "webhook_url": "https://x", "channel": "#ops"} + ) + assert isinstance(parsed, SlackNotifierConfig) + assert parsed.channel == "#ops" + + +def test_notifier_config_union_rejects_unknown_type(): + """Discriminator union must surface unknown channel types as a + pydantic ValidationError, not a silent fallback.""" + from pydantic import TypeAdapter, ValidationError + + from flow_doctor.notify import NotifierConfig + + adapter = 
TypeAdapter(NotifierConfig) + with pytest.raises(ValidationError): + adapter.validate_python({"type": "telegram", "webhook_url": "x"}) From 870e40df9657680ac49796b0842eb3c8ec632f8c Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 13 May 2026 12:26:25 -0700 Subject: [PATCH 3/4] Add FlowDoctorProtocol, contextvars, and async report_async API First slice of Proposal 2 from the plan: - flow_doctor/_protocol.py defines @runtime_checkable FlowDoctorProtocol declaring report() / guard() / monitor() / report_async() as the cross-version public contract. Consumers can now type-hint against the Protocol and swap in test doubles (RecordingFlowDoctor lands in the next commit) with mypy / isinstance verification. - flow_doctor/core/_context.py defines per-task/-thread contextvars for flow_name + stage + arbitrary extras, exposed via flow_doctor.context(). Inner scopes shadow outer ones; the active snapshot is merged into every report's context payload at _build_context() time. Deep call-stacks no longer need to thread context=... explicitly. - FlowDoctor.report_async() coroutine runs the existing sync pipeline via asyncio.to_thread() so async callers don't block the event loop. contextvars are inherited across the thread boundary automatically via contextvars.copy_context() (asyncio.to_thread does this for free). Suite: 322/322 pass (8 new). Tests cover: runtime Protocol satisfaction for FlowDoctor + arbitrary stubs, contextvars propagation onto reports, inner-scope shadowing, post-scope reset (no leaks across tests), and contextvar inheritance into report_async's worker thread. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- flow_doctor/__init__.py | 5 + flow_doctor/_protocol.py | 68 ++++++++++++++ flow_doctor/core/_context.py | 102 ++++++++++++++++++++ flow_doctor/core/client.py | 49 ++++++++++ tests/test_protocol_and_context.py | 145 +++++++++++++++++++++++++++++ 5 files changed, 369 insertions(+) create mode 100644 flow_doctor/_protocol.py create mode 100644 flow_doctor/core/_context.py create mode 100644 tests/test_protocol_and_context.py diff --git a/flow_doctor/__init__.py b/flow_doctor/__init__.py index f1d6110..e1761aa 100644 --- a/flow_doctor/__init__.py +++ b/flow_doctor/__init__.py @@ -1,5 +1,7 @@ """Flow Doctor -- call-site error handler for pipeline reliability.""" +from flow_doctor._protocol import FlowDoctorProtocol +from flow_doctor.core._context import context, current_context from flow_doctor.core.builder import FlowDoctorBuilder from flow_doctor.core.client import FlowDoctor, init from flow_doctor.core.errors import ConfigError, FlowDoctorError @@ -20,11 +22,14 @@ "FlowDoctorBuilder", "FlowDoctorError", "FlowDoctorHandler", + "FlowDoctorProtocol", "GitHubNotifierConfig", "NotifierConfig", "S3NotifierConfig", "Severity", "SlackNotifierConfig", + "context", + "current_context", "init", ] __version__ = "0.4.0" diff --git a/flow_doctor/_protocol.py b/flow_doctor/_protocol.py new file mode 100644 index 0000000..3708e91 --- /dev/null +++ b/flow_doctor/_protocol.py @@ -0,0 +1,68 @@ +"""Public ``FlowDoctorProtocol`` contract. + +Consumers type-hint against the Protocol rather than the concrete +``FlowDoctor`` class so they can swap in test doubles (e.g. +``RecordingFlowDoctor``) and let ``mypy --strict`` verify the contract. + +The Protocol is intentionally tight — it declares only the surface +consumers can rely on across versions. Internal helpers (``status()``, +``digest()``, ``history()``, ``get_handler()``, etc.) are NOT part of +the Protocol so we can evolve them without bumping the major version. 
+""" + +from __future__ import annotations + +from typing import ( + Any, + Awaitable, + Callable, + ContextManager, + Dict, + Optional, + Protocol, + Union, + runtime_checkable, +) + + +@runtime_checkable +class FlowDoctorProtocol(Protocol): + """The cross-version public contract of a flow-doctor instance.""" + + def report( + self, + error: Union[BaseException, str, None] = None, + *, + severity: str = "error", + context: Optional[Dict[str, Any]] = None, + logs: Optional[str] = None, + message: Optional[str] = None, + ) -> Optional[str]: + """Report an error or message. Returns a report id, or None + if suppressed by dedup. Never raises on the caller's behalf.""" + ... + + def guard(self) -> ContextManager[None]: + """Context manager that reports + re-raises any exception + raised within its block.""" + ... + + def monitor(self, func: Optional[Callable] = None, **kwargs: Any) -> Any: + """Decorator form of ``guard()``.""" + ... + + def report_async( + self, + error: Union[BaseException, str, None] = None, + *, + severity: str = "error", + context: Optional[Dict[str, Any]] = None, + logs: Optional[str] = None, + message: Optional[str] = None, + ) -> Awaitable[Optional[str]]: + """Async counterpart of :meth:`report` — fire-and-forget from + async pipelines without blocking the event loop.""" + ... + + +__all__ = ["FlowDoctorProtocol"] diff --git a/flow_doctor/core/_context.py b/flow_doctor/core/_context.py new file mode 100644 index 0000000..4c48c69 --- /dev/null +++ b/flow_doctor/core/_context.py @@ -0,0 +1,102 @@ +"""Context-variable plumbing for ``flow_name`` / ``stage`` propagation. + +Deep call-stacks shouldn't have to thread ``context={"stage": "..."}`` +through every layer to land it on a Flow Doctor report. The contextvars +defined here let consumers wrap a section of work in +``flow_doctor.context(flow_name=..., stage=...)`` and any +``fd.report(...)`` inside auto-picks up those values without explicit +plumbing. 
``ContextVar`` is per-task in asyncio and per-thread in sync +code, so the propagation is correct under concurrency. +""" + +from __future__ import annotations + +import contextvars +from contextlib import contextmanager +from typing import Any, Dict, Iterator, Optional + +_flow_name_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar( + "flow_doctor.flow_name", default=None +) +_stage_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar( + "flow_doctor.stage", default=None +) +# Arbitrary extras for advanced consumers — merged as top-level keys into +# the report's ``context`` payload alongside ``flow_name``/``stage``. +_extra_var: contextvars.ContextVar[Optional[Dict[str, Any]]] = contextvars.ContextVar( + "flow_doctor.extra", default=None +) + + +def current_flow_name() -> Optional[str]: + return _flow_name_var.get() + + +def current_stage() -> Optional[str]: + return _stage_var.get() + + +def current_extra() -> Optional[Dict[str, Any]]: + return _extra_var.get() + + +def current_context() -> Dict[str, Any]: + """Snapshot the current Flow Doctor contextvars as a dict, suitable + for merging into a report's ``context`` payload.""" + ctx: Dict[str, Any] = {} + fn = _flow_name_var.get() + if fn: + ctx["flow_name"] = fn + stg = _stage_var.get() + if stg: + ctx["stage"] = stg + extra = _extra_var.get() + if extra: + ctx.update(extra) + return ctx + + +@contextmanager +def context( + *, + flow_name: Optional[str] = None, + stage: Optional[str] = None, + **extra: Any, +) -> Iterator[None]: + """Push ``flow_name`` / ``stage`` / arbitrary extras onto the + current execution context for the duration of the ``with`` block. + + Nesting is supported — inner contexts shadow outer ones for the keys + they specify.
Other keys remain visible from the outer scope:: + + with flow_doctor.context(flow_name="morning-signal", stage="ingest"): + run_ingest() + with flow_doctor.context(stage="rank"): + run_rank() + # any fd.report() inside picks up flow_name="morning-signal", + # stage="rank" + """ + tokens = [] + if flow_name is not None: + tokens.append(_flow_name_var.set(flow_name)) + if stage is not None: + tokens.append(_stage_var.set(stage)) + if extra: + merged = dict(_extra_var.get() or {}) + merged.update(extra) + tokens.append(_extra_var.set(merged)) + try: + yield + finally: + # Reset in reverse order so nested ``set`` calls unwind correctly. + for token in reversed(tokens): + token.var.reset(token) + + +__all__ = [ + "context", + "current_context", + "current_extra", + "current_flow_name", + "current_stage", +] diff --git a/flow_doctor/core/client.py b/flow_doctor/core/client.py index 298bf9a..29d74ec 100644 --- a/flow_doctor/core/client.py +++ b/flow_doctor/core/client.py @@ -378,6 +378,41 @@ def report( print(f"[flow-doctor] Internal error in report(): {exc}", file=sys.stderr) return None + async def report_async( + self, + error: Any = None, + *, + severity: str = Severity.ERROR.value, + context: Optional[Dict[str, Any]] = None, + logs: Optional[str] = None, + message: Optional[str] = None, + ) -> Optional[str]: + """Async counterpart of :meth:`report`. + + Offloads the sync persist / notify / diagnosis work to a thread + so an async caller's event loop stays unblocked. The thread + inherits the current ``contextvars`` automatically (``asyncio.to_thread`` + uses ``contextvars.copy_context()``), so any ``flow_doctor.context()`` + scope active in the caller propagates to the recorded report. 
+ """ + import asyncio + + try: + return await asyncio.to_thread( + self._do_report, + error, + severity=severity, + context=context, + logs=logs, + message=message, + ) + except Exception as exc: + print( + f"[flow-doctor] Internal error in report_async(): {exc}", + file=sys.stderr, + ) + return None + def _do_report( self, error: Any, @@ -488,6 +523,20 @@ def _build_context(self, user_context: Optional[Dict[str, Any]]) -> Dict[str, An env_subset[key] = os.environ[key] ctx["environment"] = self._scrubber.scrub_env_vars(env_subset) + # Auto-pick contextvars stamped via flow_doctor.context(...). + # Inner scopes shadow outer ones; the keys land at the top level + # so downstream notifiers and digests can surface ``stage`` + # without crawling into ``user``. + from flow_doctor.core._context import current_context as _current_context + + ambient = _current_context() + if ambient: + # ``flow_name`` here overrides the static config value when the + # caller explicitly stamped a different flow_name — useful for + # multi-tenant pipelines that share a single FlowDoctor. 
+ for k, v in ambient.items(): + ctx[k] = v + # User-supplied context if user_context: ctx["user"] = self._scrubber.scrub_dict(user_context) diff --git a/tests/test_protocol_and_context.py b/tests/test_protocol_and_context.py new file mode 100644 index 0000000..718076f --- /dev/null +++ b/tests/test_protocol_and_context.py @@ -0,0 +1,145 @@ +"""Tests for FlowDoctorProtocol, flow_doctor.context(), and report_async().""" + +from __future__ import annotations + +import asyncio +import tempfile + +import flow_doctor +from flow_doctor import FlowDoctor, FlowDoctorProtocol + + +def _make_fd(db_path: str) -> FlowDoctor: + return FlowDoctor.builder("ctx-test").with_store(path=db_path).build() + + +# --------------------------------------------------------------------------- +# FlowDoctorProtocol +# --------------------------------------------------------------------------- + + +def test_flow_doctor_satisfies_protocol_at_runtime(): + """FlowDoctor (concrete) must pass a runtime isinstance() check + against the Protocol, so consumers can swap in test doubles.""" + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _make_fd(f.name) + assert isinstance(fd, FlowDoctorProtocol) + + +def test_protocol_is_runtime_checkable(): + """``@runtime_checkable`` lets us isinstance-check non-FlowDoctor + objects that happen to implement the surface — important for the + RecordingFlowDoctor double landing in a follow-up commit.""" + + class _StubDoctor: + def report(self, error=None, *, severity="error", context=None, logs=None, message=None): + return "stub" + + def guard(self): + from contextlib import contextmanager + + @contextmanager + def _cm(): + yield + + return _cm() + + def monitor(self, func=None, **kwargs): + return func + + async def report_async(self, error=None, *, severity="error", context=None, logs=None, message=None): + return "stub-async" + + assert isinstance(_StubDoctor(), FlowDoctorProtocol) + + +# 
--------------------------------------------------------------------------- +# flow_doctor.context() + contextvars propagation +# --------------------------------------------------------------------------- + + +def test_context_manager_propagates_flow_name_and_stage_to_report(): + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _make_fd(f.name) + with flow_doctor.context(flow_name="morning-signal", stage="ingest"): + report_id = fd.report(ValueError("boom")) + assert report_id is not None + reports = fd.history() + assert reports[0].context.get("flow_name") == "morning-signal" + assert reports[0].context.get("stage") == "ingest" + + +def test_context_nesting_inner_scope_shadows_outer(): + """Inner context wins for keys it specifies; unspecified keys + fall through from the outer scope.""" + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _make_fd(f.name) + with flow_doctor.context(flow_name="morning-signal", stage="ingest"): + with flow_doctor.context(stage="rank"): + report_id = fd.report(ValueError("inner")) + assert report_id is not None + reports = fd.history() + ctx = reports[0].context + assert ctx.get("flow_name") == "morning-signal" # from outer + assert ctx.get("stage") == "rank" # shadowed by inner + + +def test_context_resets_after_exit(): + """After the ``with`` block exits, contextvars must be back to + the prior state — leaks between tests would be a correctness bug.""" + from flow_doctor.core._context import current_flow_name, current_stage + + assert current_flow_name() is None + assert current_stage() is None + with flow_doctor.context(flow_name="x", stage="y"): + assert current_flow_name() == "x" + assert current_stage() == "y" + assert current_flow_name() is None + assert current_stage() is None + + +def test_context_extra_kwargs_merged_into_report_context(): + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _make_fd(f.name) + with flow_doctor.context(flow_name="t", run_id="run-42"): + fd.report("synthetic") + 
ctx = fd.history()[0].context + assert ctx.get("flow_name") == "t" + assert ctx.get("run_id") == "run-42" + + +# --------------------------------------------------------------------------- +# report_async() +# --------------------------------------------------------------------------- + + +def test_report_async_persists_a_report(): + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _make_fd(f.name) + + async def _run(): + return await fd.report_async(ValueError("async boom")) + + report_id = asyncio.run(_run()) + assert report_id is not None + reports = fd.history() + assert reports[0].error_type == "ValueError" + + +def test_report_async_inherits_contextvars_across_thread_boundary(): + """asyncio.to_thread (used internally by report_async) snapshots + contextvars via contextvars.copy_context() before dispatching the + worker. The flow_doctor.context() values set in the calling task + must therefore land on the persisted report.""" + with tempfile.NamedTemporaryFile(suffix=".db") as f: + fd = _make_fd(f.name) + + async def _run(): + with flow_doctor.context(flow_name="async-flow", stage="rank"): + return await fd.report_async("rank failure") + + report_id = asyncio.run(_run()) + assert report_id is not None + ctx = fd.history()[0].context + assert ctx.get("flow_name") == "async-flow" + assert ctx.get("stage") == "rank" From 42646ba3867798e06f9ceb27c328a3a091bd6bc2 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 13 May 2026 12:28:12 -0700 Subject: [PATCH 4/4] Add flow_doctor.testing pytest plugin + RecordingFlowDoctor Closes the testing-module half of Proposal 2: - flow_doctor/testing/_recording.py defines ReportedIncident dataclass (error / severity / context / logs / message + exc_type/exc_message derived from BaseException + ambient_context snapshotted from any flow_doctor.context() scope at report time) and RecordingFlowDoctor which implements FlowDoctorProtocol with report() / report_async() / guard() / monitor() recording calls in-memory. 
Ergonomic helpers: .clear(), .last, .of_type(exc_name). - flow_doctor/testing/_plugin.py exposes a `flow_doctor_recorder` pytest fixture (fresh per test). - pyproject.toml registers the plugin via [project.entry-points.pytest11], so downstreams pip install flow-doctor and get the fixture auto-discovered with no imports in their tests. 13 new tests cover: Protocol satisfaction at runtime, exception/string metadata capture, ambient flow_doctor.context() propagation onto the recorded ambient_context dict, .clear()/.of_type()/.last helpers, guard() and monitor() decorator behaviour, async report_async() round- trip, fresh-per-test fixture isolation, and pytest11 entry-point wiring (the fixture arrives without an explicit import in the consumer test). Suite: 335/335 pass (322 prior + 13 new). Co-Authored-By: Claude Opus 4.7 (1M context) --- flow_doctor/testing/__init__.py | 23 +++++ flow_doctor/testing/_plugin.py | 29 ++++++ flow_doctor/testing/_recording.py | 140 ++++++++++++++++++++++++++ pyproject.toml | 3 + tests/test_testing_recorder.py | 160 ++++++++++++++++++++++++++++++ 5 files changed, 355 insertions(+) create mode 100644 flow_doctor/testing/__init__.py create mode 100644 flow_doctor/testing/_plugin.py create mode 100644 flow_doctor/testing/_recording.py create mode 100644 tests/test_testing_recorder.py diff --git a/flow_doctor/testing/__init__.py b/flow_doctor/testing/__init__.py new file mode 100644 index 0000000..7eb0cc8 --- /dev/null +++ b/flow_doctor/testing/__init__.py @@ -0,0 +1,23 @@ +"""Test utilities for downstream consumers of flow-doctor. + +This package ships a ``RecordingFlowDoctor`` test double that implements +:class:`flow_doctor.FlowDoctorProtocol` and a pytest plugin that +registers a ``flow_doctor_recorder`` fixture. Downstreams just +``pip install flow-doctor`` — no import required in their test files +— and the fixture is auto-discovered via ``entry_points.pytest11``. 
+ +Direct imports are still supported for projects that prefer to be +explicit:: + + from flow_doctor.testing import RecordingFlowDoctor, ReportedIncident +""" + +from flow_doctor.testing._recording import ( + RecordingFlowDoctor, + ReportedIncident, +) + +__all__ = [ + "RecordingFlowDoctor", + "ReportedIncident", +] diff --git a/flow_doctor/testing/_plugin.py b/flow_doctor/testing/_plugin.py new file mode 100644 index 0000000..ffeca0c --- /dev/null +++ b/flow_doctor/testing/_plugin.py @@ -0,0 +1,29 @@ +"""Pytest plugin entry point. + +Registered via ``[project.entry-points.pytest11]`` in ``pyproject.toml``. +Once ``flow-doctor`` is installed, the ``flow_doctor_recorder`` fixture +is auto-discoverable in any pytest test file without an ``import``. +""" + +from __future__ import annotations + +import pytest + +from flow_doctor.testing._recording import RecordingFlowDoctor + + +@pytest.fixture +def flow_doctor_recorder() -> RecordingFlowDoctor: + """A fresh :class:`RecordingFlowDoctor` per test. + + Usage:: + + def test_pipeline_reports_db_errors(flow_doctor_recorder): + run_pipeline_that_should_fail(flow_doctor_recorder) + assert len(flow_doctor_recorder.reports) == 1 + assert flow_doctor_recorder.last.exc_type == "DBError" + """ + return RecordingFlowDoctor() + + +__all__ = ["flow_doctor_recorder"] diff --git a/flow_doctor/testing/_recording.py b/flow_doctor/testing/_recording.py new file mode 100644 index 0000000..1bd4ed2 --- /dev/null +++ b/flow_doctor/testing/_recording.py @@ -0,0 +1,140 @@ +"""``RecordingFlowDoctor`` — in-memory test double for downstream tests. + +Records every ``report()`` / ``report_async()`` call as a +:class:`ReportedIncident` so consumer test files can make crisp +assertions on the behaviour of their pipelines under failure. + +Implements :class:`flow_doctor.FlowDoctorProtocol`, so wherever +production code expects a ``FlowDoctorProtocol`` you can drop in +a ``RecordingFlowDoctor`` and ``mypy --strict`` stays clean. 
+""" + +from __future__ import annotations + +import functools +from contextlib import contextmanager +from dataclasses import dataclass, field +from typing import Any, Callable, ContextManager, Dict, Iterator, List, Optional + + +@dataclass +class ReportedIncident: + """One captured ``report()`` call. Comparable for ergonomic asserts.""" + + error: Any = None + severity: str = "error" + context: Optional[Dict[str, Any]] = None + logs: Optional[str] = None + message: Optional[str] = None + exc_type: Optional[str] = None + exc_message: Optional[str] = None + # Populated automatically from any ``flow_doctor.context(...)`` scope + # active when ``report()`` was called. + ambient_context: Dict[str, Any] = field(default_factory=dict) + + +class RecordingFlowDoctor: + """In-memory recorder satisfying :class:`FlowDoctorProtocol`.""" + + def __init__(self) -> None: + self.reports: List[ReportedIncident] = [] + self._id_counter = 0 + + # ----- public protocol ------------------------------------------------- + + def report( + self, + error: Any = None, + *, + severity: str = "error", + context: Optional[Dict[str, Any]] = None, + logs: Optional[str] = None, + message: Optional[str] = None, + ) -> Optional[str]: + from flow_doctor.core._context import current_context + + exc_type: Optional[str] = None + exc_message: Optional[str] = None + if isinstance(error, BaseException): + exc_type = type(error).__qualname__ + exc_message = str(error) + incident = ReportedIncident( + error=error, + severity=severity, + context=context, + logs=logs, + message=message, + exc_type=exc_type, + exc_message=exc_message, + ambient_context=dict(current_context()), + ) + self.reports.append(incident) + self._id_counter += 1 + return f"recorded-{self._id_counter}" + + async def report_async( + self, + error: Any = None, + *, + severity: str = "error", + context: Optional[Dict[str, Any]] = None, + logs: Optional[str] = None, + message: Optional[str] = None, + ) -> Optional[str]: + # No event-loop 
work to do — recording is in-memory. We keep the + # method async so the Protocol contract is satisfied and consumer + # ``await fd.report_async(...)`` calls in tests round-trip cleanly. + return self.report( + error, + severity=severity, + context=context, + logs=logs, + message=message, + ) + + @contextmanager + def guard(self) -> Iterator[None]: + try: + yield + except Exception as exc: + self.report(exc) + raise + + def monitor(self, func: Optional[Callable] = None, **kwargs: Any) -> Any: + if func is None: + + def decorator(f: Callable) -> Callable: + return self._wrap_monitor(f) + + return decorator + return self._wrap_monitor(func) + + def _wrap_monitor(self, func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args: Any, **kw: Any) -> Any: + try: + return func(*args, **kw) + except Exception as exc: + self.report(exc) + raise + + return wrapper + + # ----- ergonomic helpers for tests ------------------------------------ + + def clear(self) -> None: + """Reset captured reports — useful for table-driven tests.""" + self.reports.clear() + self._id_counter = 0 + + @property + def last(self) -> Optional[ReportedIncident]: + """Most recently captured incident, or None if empty.""" + return self.reports[-1] if self.reports else None + + def of_type(self, exc_type: str) -> List[ReportedIncident]: + """Filter captured incidents by exception type name.""" + return [r for r in self.reports if r.exc_type == exc_type] + + +__all__ = ["RecordingFlowDoctor", "ReportedIncident"] diff --git a/pyproject.toml b/pyproject.toml index ca24e86..c2e5b73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,9 @@ Issues = "https://github.com/cipher813/flow-doctor/issues" [project.scripts] flow-doctor = "flow_doctor.fix.cli:main" +[project.entry-points.pytest11] +flow_doctor_testing = "flow_doctor.testing._plugin" + [tool.setuptools.packages.find] include = ["flow_doctor*"] diff --git a/tests/test_testing_recorder.py b/tests/test_testing_recorder.py new file 
mode 100644 index 0000000..2197136 --- /dev/null +++ b/tests/test_testing_recorder.py @@ -0,0 +1,160 @@ +"""Tests for the flow_doctor.testing pytest plugin + RecordingFlowDoctor. + +The plugin is registered via [project.entry-points.pytest11] in +pyproject.toml. The fixture is exercised here both via direct import and +via the auto-discovered fixture name to verify the entry-point wiring. +""" + +from __future__ import annotations + +import asyncio + +import pytest + +import flow_doctor +from flow_doctor import FlowDoctorProtocol +from flow_doctor.testing import RecordingFlowDoctor, ReportedIncident + + +# --------------------------------------------------------------------------- +# Protocol satisfaction +# --------------------------------------------------------------------------- + + +def test_recording_flow_doctor_satisfies_protocol(): + """RecordingFlowDoctor must runtime-isinstance against the Protocol + so production code typed as FlowDoctorProtocol accepts it as a drop-in.""" + rec = RecordingFlowDoctor() + assert isinstance(rec, FlowDoctorProtocol) + + +# --------------------------------------------------------------------------- +# report() capture +# --------------------------------------------------------------------------- + + +def test_report_captures_exception_metadata(): + rec = RecordingFlowDoctor() + rid = rec.report(ValueError("bad input")) + assert rid == "recorded-1" + assert len(rec.reports) == 1 + incident = rec.last + assert isinstance(incident, ReportedIncident) + assert incident.exc_type == "ValueError" + assert incident.exc_message == "bad input" + assert incident.severity == "error" + + +def test_report_captures_string_message(): + rec = RecordingFlowDoctor() + rec.report("synthetic warning", severity="warning") + assert rec.last.exc_type is None + assert rec.last.error == "synthetic warning" + assert rec.last.severity == "warning" + + +def test_report_captures_explicit_context_and_logs(): + rec = RecordingFlowDoctor() + rec.report( + 
RuntimeError("x"), + context={"order_id": 42}, + logs="prior log line", + ) + assert rec.last.context == {"order_id": 42} + assert rec.last.logs == "prior log line" + + +def test_clear_resets_state(): + rec = RecordingFlowDoctor() + rec.report("a") + rec.report("b") + rec.clear() + assert rec.reports == [] + assert rec.report("c") == "recorded-1" + + +def test_of_type_filters_by_exception_class(): + rec = RecordingFlowDoctor() + rec.report(ValueError("v")) + rec.report(RuntimeError("r")) + rec.report(ValueError("v2")) + assert len(rec.of_type("ValueError")) == 2 + assert len(rec.of_type("RuntimeError")) == 1 + + +# --------------------------------------------------------------------------- +# Ambient context propagation +# --------------------------------------------------------------------------- + + +def test_report_captures_ambient_flow_doctor_context(): + rec = RecordingFlowDoctor() + with flow_doctor.context(flow_name="morning-signal", stage="rank"): + rec.report(ValueError("rank failure")) + assert rec.last.ambient_context == { + "flow_name": "morning-signal", + "stage": "rank", + } + + +# --------------------------------------------------------------------------- +# guard() + monitor() +# --------------------------------------------------------------------------- + + +def test_guard_reports_and_reraises(): + rec = RecordingFlowDoctor() + with pytest.raises(KeyError): + with rec.guard(): + raise KeyError("missing") + assert rec.last.exc_type == "KeyError" + + +def test_monitor_decorator_reports_and_reraises(): + rec = RecordingFlowDoctor() + + @rec.monitor + def fail(): + raise ZeroDivisionError("nope") + + with pytest.raises(ZeroDivisionError): + fail() + assert rec.last.exc_type == "ZeroDivisionError" + + +# --------------------------------------------------------------------------- +# report_async() +# --------------------------------------------------------------------------- + + +def test_report_async_captures_in_recorder(): + rec = RecordingFlowDoctor() 
+ + async def _run(): + await rec.report_async(ValueError("async")) + + asyncio.run(_run()) + assert rec.last.exc_type == "ValueError" + + +# --------------------------------------------------------------------------- +# Pytest plugin auto-fixture +# --------------------------------------------------------------------------- + + +def test_pytest_plugin_fixture_is_auto_discovered(flow_doctor_recorder): + """``flow_doctor_recorder`` arrives without an explicit ``import`` — + this exercises the [project.entry-points.pytest11] wiring.""" + assert isinstance(flow_doctor_recorder, RecordingFlowDoctor) + flow_doctor_recorder.report(ValueError("fixture-supplied")) + assert flow_doctor_recorder.last.exc_type == "ValueError" + + +def test_pytest_plugin_fixture_is_fresh_per_test_a(flow_doctor_recorder): + flow_doctor_recorder.report("test-a") + assert len(flow_doctor_recorder.reports) == 1 + + +def test_pytest_plugin_fixture_is_fresh_per_test_b(flow_doctor_recorder): + """If the fixture leaked state from test_a, this assertion would fail.""" + assert flow_doctor_recorder.reports == []