Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions flow_doctor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
"""Flow Doctor -- call-site error handler for pipeline reliability."""

from flow_doctor._protocol import FlowDoctorProtocol
from flow_doctor.core._context import context, current_context
from flow_doctor.core.builder import FlowDoctorBuilder
from flow_doctor.core.client import FlowDoctor, init
from flow_doctor.core.errors import ConfigError, FlowDoctorError
from flow_doctor.core.handler import FlowDoctorHandler
from flow_doctor.core.models import Severity
from flow_doctor.notify.configs import (
EmailNotifierConfig,
GitHubNotifierConfig,
NotifierConfig,
S3NotifierConfig,
SlackNotifierConfig,
)

__all__ = [
"ConfigError",
"EmailNotifierConfig",
"FlowDoctor",
"FlowDoctorBuilder",
"FlowDoctorError",
"FlowDoctorHandler",
"FlowDoctorProtocol",
"GitHubNotifierConfig",
"NotifierConfig",
"S3NotifierConfig",
"Severity",
"SlackNotifierConfig",
"context",
"current_context",
"init",
]
__version__ = "0.4.0"
68 changes: 68 additions & 0 deletions flow_doctor/_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Public ``FlowDoctorProtocol`` contract.

Consumers type-hint against the Protocol rather than the concrete
``FlowDoctor`` class so they can swap in test doubles (e.g.
``RecordingFlowDoctor``) and let ``mypy --strict`` verify the contract.

The Protocol is intentionally tight — it declares only the surface
consumers can rely on across versions. Internal helpers (``status()``,
``digest()``, ``history()``, ``get_handler()``, etc.) are NOT part of
the Protocol so we can evolve them without bumping the major version.
"""

from __future__ import annotations

from typing import (
Any,
Awaitable,
Callable,
ContextManager,
Dict,
Optional,
Protocol,
Union,
runtime_checkable,
)


@runtime_checkable
class FlowDoctorProtocol(Protocol):
"""The cross-version public contract of a flow-doctor instance."""

def report(
self,
error: Union[BaseException, str, None] = None,
*,
severity: str = "error",
context: Optional[Dict[str, Any]] = None,
logs: Optional[str] = None,
message: Optional[str] = None,
) -> Optional[str]:
"""Report an error or message. Returns a report id, or None
if suppressed by dedup. Never raises on the caller's behalf."""
...

def guard(self) -> ContextManager[None]:
"""Context manager that reports + re-raises any exception
raised within its block."""
...

def monitor(self, func: Optional[Callable] = None, **kwargs: Any) -> Any:
"""Decorator form of ``guard()``."""
...

def report_async(
self,
error: Union[BaseException, str, None] = None,
*,
severity: str = "error",
context: Optional[Dict[str, Any]] = None,
logs: Optional[str] = None,
message: Optional[str] = None,
) -> Awaitable[Optional[str]]:
"""Async counterpart of :meth:`report` — fire-and-forget from
async pipelines without blocking the event loop."""
...


__all__ = ["FlowDoctorProtocol"]
102 changes: 102 additions & 0 deletions flow_doctor/core/_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Context-variable plumbing for ``flow_name`` / ``stage`` propagation.

Deep call-stacks shouldn't have to thread ``context={"stage": "..."}``
through every layer to land it on a Flow Doctor report. The contextvars
defined here let consumers wrap a section of work in
``flow_doctor.context(flow_name=..., stage=...)`` and any
``fd.report(...)`` inside auto-picks up those values without explicit
plumbing. ``ContextVar`` is per-task in asyncio and per-thread in sync
code, so the propagation is correct under concurrency.
"""

from __future__ import annotations

import contextvars
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Optional

_flow_name_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
"flow_doctor.flow_name", default=None
)
_stage_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
"flow_doctor.stage", default=None
)
# Arbitrary extras for advanced consumers — appears under ``context.extra``
# on the persisted Report alongside ``flow_name``/``stage``.
_extra_var: contextvars.ContextVar[Optional[Dict[str, Any]]] = contextvars.ContextVar(
"flow_doctor.extra", default=None
)


def current_flow_name() -> Optional[str]:
return _flow_name_var.get()


def current_stage() -> Optional[str]:
return _stage_var.get()


def current_extra() -> Optional[Dict[str, Any]]:
return _extra_var.get()


def current_context() -> Dict[str, Any]:
"""Snapshot the current Flow Doctor contextvars as a dict, suitable
for merging into a report's ``context`` payload."""
ctx: Dict[str, Any] = {}
fn = _flow_name_var.get()
if fn:
ctx["flow_name"] = fn
stg = _stage_var.get()
if stg:
ctx["stage"] = stg
extra = _extra_var.get()
if extra:
ctx.update(extra)
return ctx


@contextmanager
def context(
*,
flow_name: Optional[str] = None,
stage: Optional[str] = None,
**extra: Any,
) -> Iterator[None]:
"""Push ``flow_name`` / ``stage`` / arbitrary extras onto the
current execution context for the duration of the ``with`` block.

Nesting is supported — inner contexts shadow outer ones for the keys
they specify. Other keys remain visible from the outer scope::

with flow_doctor.context(flow_name="morning-signal", stage="ingest"):
run_ingest()
with flow_doctor.context(stage="rank"):
run_rank()
# any fd.report() inside picks up flow_name="morning-signal",
# stage="rank"
"""
tokens = []
if flow_name is not None:
tokens.append(_flow_name_var.set(flow_name))
if stage is not None:
tokens.append(_stage_var.set(stage))
if extra:
merged = dict(_extra_var.get() or {})
merged.update(extra)
tokens.append(_extra_var.set(merged))
try:
yield
finally:
# Reset in reverse order so nested ``set`` calls unwind correctly.
for token in reversed(tokens):
token.var.reset(token)


__all__ = [
"context",
"current_context",
"current_extra",
"current_flow_name",
"current_stage",
]
190 changes: 190 additions & 0 deletions flow_doctor/core/builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""Typed fluent builder for ``FlowDoctor`` — no yaml required.

The builder is the SOTA-facing replacement for ``flow_doctor.init()``.
Each method returns ``self`` so calls chain, each accepts a typed
Pydantic sub-config so IDE autocomplete and ``mypy --strict`` work
end-to-end. ``init(config_path=...)`` keeps working for back-compat
(see private/plug-and-play-260513.md Proposal 1).
"""

from __future__ import annotations

from typing import TYPE_CHECKING, List, Optional, Union

from flow_doctor.core.config import (
AutoFixConfig,
DiagnosisConfig,
FlowDoctorConfig,
GitHubConfig,
HandlerConfig,
NotifyChannelConfig,
RateLimitConfig,
RemediationConfig,
StoreConfig,
)
from flow_doctor.notify.configs import (
EmailNotifierConfig,
GitHubNotifierConfig,
S3NotifierConfig,
SlackNotifierConfig,
)

if TYPE_CHECKING:
from flow_doctor.core.client import FlowDoctor


_PerTypeNotifier = Union[
SlackNotifierConfig,
EmailNotifierConfig,
GitHubNotifierConfig,
S3NotifierConfig,
]


class FlowDoctorBuilder:
"""Fluent builder for :class:`FlowDoctor`.

Typical usage::

fd = (
FlowDoctor.builder("morning-signal")
.add_notifier(EmailNotifierConfig(
sender="x@y.com",
recipients=["x@y.com"],
smtp_password=os.environ["GMAIL_APP_PASSWORD"],
))
.with_dedup(cooldown_minutes=60)
.build()
)

Every ``with_*`` / ``add_*`` method returns ``self`` so calls chain.
``build()`` materializes a ``FlowDoctorConfig`` and constructs a
``FlowDoctor``; ``build_config()`` returns just the config (useful
for tests and for handing a config off to a custom subclass).
"""

def __init__(self, flow_name: str):
self._flow_name = flow_name
self._repo: Optional[str] = None
self._owner: Optional[str] = None
self._notifiers: List[NotifyChannelConfig] = []
self._store: Optional[StoreConfig] = None
self._dedup_cooldown_minutes: Optional[int] = None
self._rate_limits: Optional[RateLimitConfig] = None
self._diagnosis: Optional[DiagnosisConfig] = None
self._github: Optional[GitHubConfig] = None
self._auto_fix: Optional[AutoFixConfig] = None
self._remediation: Optional[RemediationConfig] = None
self._handler: Optional[HandlerConfig] = None
self._dependencies: List[str] = []

def add_notifier(
self,
cfg: Union[_PerTypeNotifier, NotifyChannelConfig],
) -> "FlowDoctorBuilder":
"""Append a notifier.

Accepts a typed per-channel config (``SlackNotifierConfig``,
``EmailNotifierConfig``, ``GitHubNotifierConfig``,
``S3NotifierConfig``) or the legacy omnibus
``NotifyChannelConfig`` (for back-compat with 0.4.0 callers).
"""
if isinstance(cfg, NotifyChannelConfig):
self._notifiers.append(cfg)
else:
self._notifiers.append(cfg.to_channel_config())
return self

def with_repo(
self, repo: str, *, owner: Optional[str] = None
) -> "FlowDoctorBuilder":
self._repo = repo
if owner is not None:
self._owner = owner
return self

def with_dedup(self, *, cooldown_minutes: int) -> "FlowDoctorBuilder":
self._dedup_cooldown_minutes = cooldown_minutes
return self

def with_store(
self,
*,
type: str = "sqlite",
path: str = "flow_doctor.db",
bucket: Optional[str] = None,
prefix: Optional[str] = None,
) -> "FlowDoctorBuilder":
self._store = StoreConfig(
type=type, path=path, bucket=bucket, prefix=prefix
)
return self

def with_rate_limits(self, cfg: RateLimitConfig) -> "FlowDoctorBuilder":
self._rate_limits = cfg
return self

def with_diagnosis(self, cfg: DiagnosisConfig) -> "FlowDoctorBuilder":
self._diagnosis = cfg
return self

def with_github(self, cfg: GitHubConfig) -> "FlowDoctorBuilder":
self._github = cfg
return self

def with_auto_fix(self, cfg: AutoFixConfig) -> "FlowDoctorBuilder":
self._auto_fix = cfg
return self

def with_remediation(self, cfg: RemediationConfig) -> "FlowDoctorBuilder":
self._remediation = cfg
return self

def with_handler(self, cfg: HandlerConfig) -> "FlowDoctorBuilder":
self._handler = cfg
return self

def with_dependencies(self, deps: List[str]) -> "FlowDoctorBuilder":
self._dependencies = list(deps)
return self

def build_config(self) -> FlowDoctorConfig:
"""Materialize the ``FlowDoctorConfig`` without instantiating
``FlowDoctor``. Useful for tests + for handing the config to
a custom subclass."""
kwargs: dict = {
"flow_name": self._flow_name,
"notify": list(self._notifiers),
}
if self._repo is not None:
kwargs["repo"] = self._repo
if self._owner is not None:
kwargs["owner"] = self._owner
if self._store is not None:
kwargs["store"] = self._store
if self._rate_limits is not None:
kwargs["rate_limits"] = self._rate_limits
if self._dedup_cooldown_minutes is not None:
kwargs["dedup_cooldown_minutes"] = self._dedup_cooldown_minutes
if self._diagnosis is not None:
kwargs["diagnosis"] = self._diagnosis
if self._github is not None:
kwargs["github"] = self._github
if self._auto_fix is not None:
kwargs["auto_fix"] = self._auto_fix
if self._remediation is not None:
kwargs["remediation"] = self._remediation
if self._handler is not None:
kwargs["handler"] = self._handler
if self._dependencies:
kwargs["dependencies"] = self._dependencies
return FlowDoctorConfig(**kwargs)

def build(self, *, strict: bool = True) -> "FlowDoctor":
"""Materialize the config and construct a :class:`FlowDoctor`."""
from flow_doctor.core.client import FlowDoctor

return FlowDoctor(self.build_config(), strict=strict)


__all__ = ["FlowDoctorBuilder"]
Loading
Loading