Merged
116 changes: 116 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,112 @@

## Unreleased

## 0.5.0rc1 (2026-05-13)

Release-candidate cut of the "plug-and-play" release for internal soak.
pip skips pre-releases unless asked for them (`--pre`, or an exact pin
such as `pip install flow-doctor==0.5.0rc1`), so this build won't
accidentally land on consumers pinning `flow-doctor>=0.4` until
0.5.0 final ships. The content below is the planned 0.5.0 changelog
entry verbatim; 0.5.0 final will republish it once the rcN cycle
clears soak.

Three SOTA-target proposals from the plug-and-play planning doc
(Pydantic v2 config, typed contract + testing plugin, ecosystem polish)
land together. Existing 0.4.0 consumers keep working unchanged —
`flow_doctor.init(config_path=...)` is still supported through the
0.5.0 deprecation window. New consumers should adopt
`FlowDoctor.builder(...)` for typed, IDE-discoverable configuration
with no yaml required.

### Added

- **Pydantic v2 config models.** All 11 config dataclasses
(`FlowDoctorConfig`, `NotifyChannelConfig`, `RateLimitConfig`,
`DiagnosisConfig`, `RemediationConfig`, etc.) are now `pydantic.BaseModel`
subclasses via a shared `_ConfigModel` base. Field names and defaults are
preserved, so existing test fixtures and 0.4.0 callers keep working
unchanged. Adds `pydantic>=2.0` to runtime deps.
- **Typed per-channel notifier configs.** `SlackNotifierConfig`,
`EmailNotifierConfig`, `GitHubNotifierConfig`, `S3NotifierConfig` ship
as Pydantic models exposed as the discriminated union `NotifierConfig`
via `Field(discriminator="type")`. `EmailNotifierConfig.recipients`
accepts a CSV string or a list and normalizes via a `field_validator`.
- **`FlowDoctor.builder()` fluent API.** `FlowDoctor.builder(flow_name)`
returns a `FlowDoctorBuilder` with chainable `add_notifier / with_repo /
with_dedup / with_store / with_diagnosis / with_github / with_auto_fix /
with_remediation / with_handler / with_dependencies` methods plus
`build_config()` and `build(strict=True)`. Recommended entry point for
new code — typed, IDE-discoverable, no yaml.

```python
import os

from flow_doctor import FlowDoctor
from flow_doctor.notify import EmailNotifierConfig

fd = (
    FlowDoctor.builder("morning-signal")
    .add_notifier(EmailNotifierConfig(
        sender="x@y.com",
        recipients=["x@y.com"],
        smtp_password=os.environ["GMAIL_APP_PASSWORD"],
    ))
    .with_dedup(cooldown_minutes=60)
    .build()
)
```
- **`FlowDoctorProtocol` public contract.** `@runtime_checkable`
Protocol declaring `report() / guard() / monitor() / report_async()`.
Consumers type-hint against the Protocol and swap in test doubles
(e.g. `RecordingFlowDoctor`) with `mypy --strict` + `isinstance()`
verification.
- **`flow_doctor.context()` contextvars.** Per-task/-thread contextvars
for `flow_name` / `stage` / arbitrary extras. Inner scopes shadow
outer ones; the active snapshot is merged into every report's
`context` at `_build_context()` time. Deep call-stacks no longer
thread `context=...` explicitly.

```python
with flow_doctor.context(flow_name="morning-signal", stage="rank"):
    run_rank()  # any fd.report() inside picks up flow_name + stage
```
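The shadowing rule can be illustrated with the standard library alone. This is a minimal sketch, not flow-doctor's implementation: `_ACTIVE`, and the bodies of `context()` and `current_context()` below, are hypothetical stand-ins written for illustration. It also shows that `asyncio.to_thread()` copies the caller's context, so a worker thread sees the same snapshot.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from typing import Any, Dict, Iterator

# Hypothetical stand-in for the library's internal state (assumption).
_ACTIVE = contextvars.ContextVar("flow_doctor_context_sketch", default={})


@contextmanager
def context(**fields: Any) -> Iterator[None]:
    """Push a scope; inner keys shadow outer keys until the scope exits."""
    merged = {**_ACTIVE.get(), **fields}  # new dict, outer snapshot untouched
    token = _ACTIVE.set(merged)
    try:
        yield
    finally:
        _ACTIVE.reset(token)  # restore the outer snapshot


def current_context() -> Dict[str, Any]:
    """Return a copy of the active snapshot."""
    return dict(_ACTIVE.get())


def snapshots():
    """Capture the snapshot before, inside, and after a nested scope."""
    with context(flow_name="morning-signal", stage="ingest"):
        outer_before = current_context()
        with context(stage="rank", run_id="abc"):
            inner = current_context()
        outer_after = current_context()
    return outer_before, inner, outer_after


async def snapshot_in_worker_thread() -> Dict[str, Any]:
    """Read the snapshot from a thread started by asyncio.to_thread()."""
    with context(flow_name="morning-signal", stage="rank"):
        return await asyncio.to_thread(current_context)
```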
- **`FlowDoctor.report_async()`.** Async coroutine running the existing
sync pipeline via `asyncio.to_thread()`. `contextvars` inherit across
the thread boundary automatically.
- **`flow_doctor.testing` pytest plugin.** `RecordingFlowDoctor`
in-memory test double implementing `FlowDoctorProtocol` +
`ReportedIncident` dataclass with `.clear() / .last / .of_type(exc_name)`
ergonomic helpers. Pytest fixture `flow_doctor_recorder` registered
via `[project.entry-points.pytest11]` — downstreams `pip install
flow-doctor` and the fixture is auto-discoverable in any test file
with no imports.
- **`flow_doctor.otel.report_to_otel_span_event(report)`.** Pure-Python
OTel-compatible serialization. Maps `flow_name → resource.service.name`,
`context["stage"] → event.name`, exception fields → OTel exception
attributes, severity → severity_text + severity_number, created_at →
time_unix_nano, context dict flattened with `"context."` prefix.
No `opentelemetry-*` dep — the actual OTLP exporter is queued for
v0.6.0.
- **PEP 561 `py.typed` marker.** Ships in the wheel via
`[tool.setuptools.package-data]` so mypy / pyright treat flow-doctor's
annotations as authoritative in `--strict` mode.
- **PEP 702 `@deprecated` markers.** `flow_doctor.init()` carries a
runtime DeprecationWarning + static `__deprecated__` attribute
pointing at `FlowDoctor.builder()`. `NotifyChannelConfig` carries
the static-only marker (`category=None`) because the omnibus form
is still the internal lingua franca the builder folds typed configs
into. Adds `typing_extensions>=4.5` (PEP 702 backport for Python
3.9-3.12; stdlib in 3.13+).
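
The Protocol-plus-test-double pattern behind the `FlowDoctorProtocol` and `flow_doctor.testing` entries can be sketched with the standard library alone. `ReporterProtocol`, `RecordingReporter` and `run_stage` are hypothetical names, and the one-method signature is an assumption rather than flow-doctor's real contract. Note that `isinstance()` against a `@runtime_checkable` Protocol only checks that the methods exist, not their signatures, so the static check (`mypy --strict`) still carries most of the weight.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class ReporterProtocol(Protocol):
    """Hypothetical one-method cut of a reporting contract (assumption)."""

    def report(
        self, error: BaseException, *, context: Optional[Dict[str, Any]] = None
    ) -> None: ...


@dataclass
class RecordingReporter:
    """In-memory double: stores what was reported instead of notifying."""

    incidents: List[BaseException] = field(default_factory=list)

    def report(
        self, error: BaseException, *, context: Optional[Dict[str, Any]] = None
    ) -> None:
        self.incidents.append(error)

    @property
    def last(self) -> Optional[BaseException]:
        return self.incidents[-1] if self.incidents else None

    def of_type(self, exc_name: str) -> List[BaseException]:
        return [e for e in self.incidents if type(e).__name__ == exc_name]

    def clear(self) -> None:
        self.incidents.clear()


def run_stage(fd: ReporterProtocol) -> None:
    """Code under test depends on the Protocol, not a concrete class."""
    try:
        int("not-a-number")
    except ValueError as exc:
        fd.report(exc, context={"stage": "parse"})
```

In a test, the double is passed wherever the real client would go, and assertions run against `last` or `of_type(...)`.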

### Deprecated

- `flow_doctor.init(config_path=..., **kwargs)` is deprecated in favor
of `FlowDoctor.builder(...)`. Will be **removed in 0.6.0**. The yaml
shim continues to work through the 0.5.0 series.
- `NotifyChannelConfig` is deprecated for direct construction in favor
of the typed `SlackNotifierConfig` / `EmailNotifierConfig` /
`GitHubNotifierConfig` / `S3NotifierConfig`. Will be **removed in
0.6.0**. Static-only deprecation — no runtime warning is emitted
because the omnibus form is still the internal lingua franca.

### Fixed

- Dedup signatures for non-exception string reports now normalize
@@ -15,6 +121,16 @@
the cooldown window never engaged. Error codes and other semantic
numbers are preserved so distinct incidents remain distinct.

### Roadmap (deferred to 0.6.0)

- **OTLP exporter notifier.** Direct ship to an OpenTelemetry
collector via `opentelemetry-exporter-otlp`. Shape already ships in
0.5.0 via `flow_doctor.otel.report_to_otel_span_event`.
- **`pydantic-settings` BaseSettings env-var injection.** Pydantic-native
`FLOW_DOCTOR_*` autoload as a parallel path to the existing per-notifier
`_env_fallback` chain.
- **Hard removal of `flow_doctor.init()` and `NotifyChannelConfig`.**

## 0.3.0 (2026-04-10)

Two independent changes folded into one release because 0.2.0 was the
2 changes: 1 addition & 1 deletion flow_doctor/__init__.py
@@ -32,4 +32,4 @@
    "current_context",
    "init",
]
-__version__ = "0.4.0"
+__version__ = "0.5.0rc1"
6 changes: 6 additions & 0 deletions flow_doctor/core/client.py
@@ -12,6 +12,8 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

from typing_extensions import deprecated

if TYPE_CHECKING:
    from flow_doctor.core.builder import FlowDoctorBuilder

@@ -977,6 +979,10 @@ def digest(self, since: Optional[datetime] = None) -> Optional[str]:
return None


@deprecated(
    "flow_doctor.init() is deprecated; use FlowDoctor.builder() for typed, "
    "IDE-discoverable configuration. The yaml shim will be removed in 0.6.0."
)
def init(
    config_path: Optional[str] = None,
    *,
11 changes: 11 additions & 0 deletions flow_doctor/core/config.py
@@ -17,6 +17,7 @@

import yaml
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import deprecated

from flow_doctor.core.errors import ConfigError

@@ -34,6 +35,16 @@ class _ConfigModel(BaseModel):
    model_config = ConfigDict(extra="ignore", validate_assignment=False)


@deprecated(
    "NotifyChannelConfig is deprecated for direct use; construct a typed "
    "SlackNotifierConfig / EmailNotifierConfig / GitHubNotifierConfig / "
    "S3NotifierConfig from flow_doctor.notify instead. Will be removed in "
    "0.6.0. (Static-only deprecation — no runtime DeprecationWarning is "
    "emitted because the omnibus model is still the internal lingua franca "
    "that the builder folds typed configs into; only consumers constructing "
    "it explicitly trip the type-checker.)",
    category=None,
)
class NotifyChannelConfig(_ConfigModel):
    type: str  # "slack", "email", "github", or "s3"
    # Slack fields
157 changes: 157 additions & 0 deletions flow_doctor/otel.py
@@ -0,0 +1,157 @@
"""OpenTelemetry-compatible serialization for ``Report``.

Pure-Python adapter — no ``opentelemetry-*`` dependency. The actual
exporter (which would speak OTLP to a collector) is deferred to v0.6.0
so the optional ``opentelemetry-exporter-otlp`` dep family can land in
its own release cycle. What ships here is the shape that exporter will
emit, so callers running on Datadog / Honeycomb / Grafana Cloud /
Sentry-via-OTel can already convert reports to OTel ``SpanEvent``
dicts and ship them through their own collector today.

Field mapping (plan table):

================== =========================================
Flow Doctor field  OTel SpanEvent field
================== =========================================
flow_name          ``resource.service.name``
context["stage"]   ``event.name`` (falls back to "report")
error_type         ``exception.type`` (attribute)
error_message      ``exception.message`` (attribute)
traceback          ``exception.stacktrace`` (attribute)
severity           ``event.severity_text`` + ``severity_number``
created_at         ``time_unix_nano`` (top-level)
context (other)    flattened into ``attributes`` with
                   ``context.<key>`` dot-prefix
error_signature    ``flow_doctor.error_signature`` attribute
cascade_source     ``flow_doctor.cascade_source`` attribute
================== =========================================

OTel attribute values are restricted to ``str | bool | int | float``
(or homogeneous arrays of those). Nested dicts in ``context`` are
flattened with dot-separated keys; non-primitive values are coerced
to ``str(value)`` so the shape stays exporter-safe.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Mapping, Tuple

from flow_doctor.core.models import Report, Severity

# OTel severity_number table (subset relevant to us). Full table at
# https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
_SEVERITY_TEXT_TO_NUMBER: Dict[str, Tuple[str, int]] = {
    Severity.CRITICAL.value: ("FATAL", 21),
    Severity.ERROR.value: ("ERROR", 17),
    Severity.WARNING.value: ("WARN", 13),
}


def _coerce_attribute_value(v: Any) -> Any:
    """Coerce a Python value to something OTel attributes accept."""
    if isinstance(v, (str, bool, int, float)):
        return v
    if isinstance(v, (list, tuple)):
        # OTel arrays must be homogeneous. We promote to a stringified
        # list when items are heterogeneous, which is exporter-safe.
        coerced = [_coerce_attribute_value(item) for item in v]
        types = {type(x) for x in coerced}
        if len(types) <= 1:
            return coerced
        return [str(x) for x in coerced]
    if v is None:
        return ""
    return str(v)


def _flatten_context(
    ctx: Mapping[str, Any], prefix: str = "context"
) -> Iterable[Tuple[str, Any]]:
    """Yield ``(attribute_key, attribute_value)`` pairs flattened from a
    nested context dict. Nested dicts get dot-prefixed keys; lists get
    coerced to OTel-array-safe form via ``_coerce_attribute_value``."""
    for k, v in ctx.items():
        key = f"{prefix}.{k}" if prefix else k
        if isinstance(v, Mapping):
            yield from _flatten_context(v, prefix=key)
        else:
            yield key, _coerce_attribute_value(v)


def _to_unix_nano(ts: datetime) -> int:
    """Convert a (naive UTC by convention) ``datetime`` to nanoseconds
    since the Unix epoch."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return int(ts.timestamp() * 1_000_000_000)


def report_to_otel_span_event(report: Report) -> Dict[str, Any]:
    """Serialize a :class:`Report` to an OTel ``SpanEvent``-shaped dict.

    The result is JSON-safe and ready to fan-out to any OTel collector
    via a future ``OTelExporter`` notifier (v0.6.0) or via the caller's
    own collector today. Top-level shape::

        {
            "resource": {"service.name": "<flow_name>"},
            "name": "<event.name>",
            "time_unix_nano": 1735603200000000000,
            "severity_text": "ERROR",
            "severity_number": 17,
            "attributes": {
                "exception.type": "ValueError",
                "exception.message": "boom",
                "exception.stacktrace": "...",
                "flow_doctor.error_signature": "...",
                "context.stage": "ingest",
                "context.run_id": "abc",
            },
        }
    """
    context: Dict[str, Any] = report.context or {}
    stage = context.get("stage")
    event_name = stage if stage else "report"

    sev_text, sev_number = _SEVERITY_TEXT_TO_NUMBER.get(
        report.severity, (report.severity.upper(), 17)
    )

    attributes: Dict[str, Any] = {}
    if report.error_type:
        attributes["exception.type"] = report.error_type
    if report.error_message:
        attributes["exception.message"] = report.error_message
    if report.traceback:
        attributes["exception.stacktrace"] = report.traceback
    if report.error_signature:
        attributes["flow_doctor.error_signature"] = report.error_signature
    if report.cascade_source:
        attributes["flow_doctor.cascade_source"] = report.cascade_source
    if report.dedup_count and report.dedup_count != 1:
        attributes["flow_doctor.dedup_count"] = report.dedup_count
    if report.logs:
        attributes["flow_doctor.logs"] = report.logs

    # Flatten any remaining context dict entries (skip the stage we
    # already promoted to event.name + the flow_name which is on the
    # resource; keeping them only in attributes would duplicate).
    for k, v in _flatten_context(context):
        # ``context.flow_name`` and ``context.stage`` are promoted, so
        # skip duplicates.
        if k in ("context.flow_name", "context.stage"):
            continue
        attributes[k] = v

    return {
        "resource": {"service.name": report.flow_name},
        "name": event_name,
        "time_unix_nano": _to_unix_nano(report.created_at),
        "severity_text": sev_text,
        "severity_number": sev_number,
        "attributes": attributes,
    }


__all__ = ["report_to_otel_span_event"]
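
A note on `_to_unix_nano` in this file: `ts.timestamp()` returns a float, and a 64-bit float cannot represent every microsecond at present-day epoch values, so the trailing digits of `time_unix_nano` can drift by roughly a hundred nanoseconds. If exact microsecond fidelity ever matters, integer arithmetic on the `timedelta` avoids the rounding. A minimal sketch, assuming the same naive-means-UTC convention; `to_unix_nano_exact` and `_EPOCH` are hypothetical names, not part of this PR:

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware datetime (hypothetical helper constant).
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_nano_exact(ts: datetime) -> int:
    """Nanoseconds since the Unix epoch, computed without floats."""
    if ts.tzinfo is None:
        # Same convention as the original helper: naive means UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    # timedelta // timedelta yields an exact int count of microseconds.
    micros = (ts - _EPOCH) // timedelta(microseconds=1)
    return micros * 1_000
```

The result is always a multiple of 1000 because `datetime` itself stops at microsecond resolution.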
Empty file added flow_doctor/py.typed
Empty file.
15 changes: 14 additions & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "flow-doctor"
-version = "0.4.0"
+version = "0.5.0rc1"
description = "Pipeline error handler: capture, deduplicate, diagnose, and auto-fix failures."
readme = "README.md"
requires-python = ">=3.9"
@@ -27,6 +27,8 @@ classifiers = [
dependencies = [
    "pyyaml>=6.0",
    "pydantic>=2.0",
    # PEP 702 @deprecated backport for Python 3.9-3.12 (stdlib in 3.13+).
    "typing_extensions>=4.5",
]

[project.optional-dependencies]
@@ -56,5 +58,16 @@ flow_doctor_testing = "flow_doctor.testing._plugin"
[tool.setuptools.packages.find]
include = ["flow_doctor*"]

[tool.setuptools.package-data]
# Ship PEP 561 marker so mypy/pyright treat flow-doctor's annotations as
# authoritative when consumers depend on flow-doctor in --strict mode.
flow_doctor = ["py.typed"]

[tool.pytest.ini_options]
testpaths = ["tests"]
filterwarnings = [
    # Our own tests exercise the legacy flow_doctor.init() path. Downstream
    # consumers still see the DeprecationWarning at their call site; this
    # filter only suppresses the noise inside our suite.
    "ignore:flow_doctor.init.. is deprecated:DeprecationWarning",
]
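
The `..` in that filter is there because pytest treats the message part of an ini-file `filterwarnings` entry as a regular expression matched against the start of the warning text, so two dots stand in for the literal `()`. A minimal standard-library sketch of the same matching, using `warnings.filterwarnings` directly; `legacy_init` and `caught_with_filter` are hypothetical helpers, not flow-doctor code:

```python
import re
import warnings

# Message text copied from the @deprecated decorator on flow_doctor.init().
MESSAGE = (
    "flow_doctor.init() is deprecated; use FlowDoctor.builder() for typed, "
    "IDE-discoverable configuration. The yaml shim will be removed in 0.6.0."
)
# Message part of the pytest filter entry in pyproject.toml.
PATTERN = r"flow_doctor.init.. is deprecated"


def legacy_init() -> None:
    """Hypothetical stand-in that emits the same DeprecationWarning text."""
    warnings.warn(MESSAGE, DeprecationWarning, stacklevel=2)


def caught_with_filter(pattern: str) -> int:
    """Count warnings that survive an 'ignore' filter for `pattern`."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # Inserted in front of "always", so it wins when the regex matches.
        warnings.filterwarnings(
            "ignore", message=pattern, category=DeprecationWarning
        )
        legacy_init()
    return len(caught)
```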