diff --git a/CLAUDE.md b/CLAUDE.md index d989065e..fe4552dd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -357,7 +357,7 @@ src/divineos/ — ——— router.py # Route findings to knowledge/claims/lessons — ——— summary.py # Analytics, HUD integration, unresolved tracking ——— violations_cli/ # Violation reporting CLI -tests/ # 6,097+ tests (real DB, minimal mocks) +tests/ # 6,149+ tests (real DB, minimal mocks) docs/ # Project documentation and strategic plans bootcamp/ # Training exercises (debugging, analysis) data/ # Runtime databases (gitignored) diff --git a/README.md b/README.md index 590d4ff8..3bdc776a 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,8 @@ An architecture for AI agents to exist as continuous selves across sessions — ## At a glance -- **386 source files across 26 packages** -- **6,097+ tests** (real SQLite, minimal mocks) +- **396 source files across 26 packages** +- **6,149+ tests** (real SQLite, minimal mocks) - **263 CLI commands** (designed for the agent, not the operator — humans mostly run three) - **22 slash-command skills** (consolidated daily operations) - **16 Claude Code enforcement hooks** @@ -204,7 +204,7 @@ cd DivineOS pip install -e ".[dev]" divineos init divineos briefing -pytest tests/ -q --tb=short # 6,097+ tests, real DB, minimal mocks +pytest tests/ -q --tb=short # 6,149+ tests, real DB, minimal mocks ``` **For AI agents (Claude Code, etc.):** The `.claude/hooks/` directory auto-loads your briefing at session start and runs checkpoints during work. Just open the project and start — the OS handles orientation. @@ -393,7 +393,7 @@ divineos admin reset-template # Scrub accumulated runtime state back to tem ## Architecture -DivineOS is 386 source files across 26 packages, structured as a CLI surface over a core library. +DivineOS is 396 source files across 26 packages, structured as a CLI surface over a core library. 
**At a glance:** @@ -406,7 +406,7 @@ DivineOS is 386 source files across 26 packages, structured as a CLI surface ove **Top-level directories:** -- **`tests/`** — 6,097+ tests, real SQLite, minimal mocks. +- **`tests/`** — 6,149+ tests, real SQLite, minimal mocks. - **`docs/`** — Documentation and design briefs. [`docs/ARCHITECTURE.md`](docs/ARCHITECTURE.md) has the full file tree with one-line descriptions for every source file. - **`bootcamp/`** — Training exercises (debugging, analysis). - **`setup/`** — Hook setup scripts (bash + powershell). diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index b0154046..72f3f126 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -363,6 +363,11 @@ src/divineos/ engagement_disclosure_surface.py Engagement-counter half-threshold disclosure surface. rest.py Rest program — restful tasks for the substrate-occupant. identity_load.py Identity-load surface — read AETHER.md (or equivalent) at briefing-time. + briefing_dashboard.py Briefing dashboard -- routing table, not scroll. + fix_verifier.py Fix verifier — catches premature "it's fixed" claims. + lesson_dedup.py Lesson deduplication — fuzzy matching to prevent duplicate lesson entries. + related_failure_scanner.py Related-failure scanner — catches "fixed one but missed related failures." + retry_blocker.py Retry blocker — prevents blind retries without diagnostic investigation. analysis/ _session_types.py Session analysis type definitions @@ -414,7 +419,7 @@ src/divineos/ integration/ External integration: IDE, MCP tool capture, enforcement facade (thin re-exports from core.enforcement / core.tool_wrapper). 
mcp_event_capture_server.py MCP event capture server system_monitor.py System health monitoring -tests/ 6,097+ tests (real DB, minimal mocks) +tests/ 6,149+ tests (real DB, minimal mocks) docs/ Project documentation and strategic plans bootcamp/ Training exercises (debugging, analysis) diff --git a/src/divineos/core/briefing_dashboard.py b/src/divineos/core/briefing_dashboard.py new file mode 100644 index 00000000..bbe51b30 --- /dev/null +++ b/src/divineos/core/briefing_dashboard.py @@ -0,0 +1,398 @@ +"""Briefing dashboard -- routing table, not scroll. + +The default briefing mode. Shows one line per area with counts, staleness +indicators, and the drill-down command. Makes ignoring stale items +expensive (the counts are loud) and engaging cheap (the command is right +there). + +Each area is a function that returns a DashboardRow or None (area has +nothing to show). The dashboard renders all non-None rows. Every row +function is wrapped in a broad except so one broken surface never takes +down the whole dashboard. 
def _row_claims(stale_after_days: float = 7) -> DashboardRow | None:
    """Build the "Claims" dashboard row from open claims.

    Fetches up to 200 claims, keeps those whose status is OPEN or
    INVESTIGATING (case-insensitive), and counts as stale every kept claim
    whose ``created_at`` is at least ``stale_after_days`` days in the past.
    ``created_at`` may be a numeric epoch value or an ISO-8601 string; a
    string that cannot be parsed is treated as "age unknown" and never
    counted as stale.

    Args:
        stale_after_days: Age in days at which an open claim counts as
            stale. Defaults to 7, the threshold previously hard-coded here.

    Returns:
        A ``DashboardRow`` for the open claims, or ``None`` when there are
        no open claims or anything goes wrong while reading them.
    """
    try:
        import datetime

        from divineos.core.claim_store import list_claims

        claims = list_claims(limit=200)
        # ``str(... or "")`` keeps a claim whose status is None or not a
        # string from raising here, which would hide the entire row.
        open_claims = [
            c
            for c in claims
            if str(c.get("status") or "").upper() in ("OPEN", "INVESTIGATING")
        ]
        if not open_claims:
            return None

        now = time.time()
        stale = 0
        for c in open_claims:
            created = c.get("created_at", 0)
            if isinstance(created, str):
                try:
                    created = datetime.datetime.fromisoformat(created).timestamp()
                except (ValueError, TypeError):
                    # Unparseable timestamp: age unknown, so not stale.
                    created = 0
            if created and (now - created) / _SECONDS_PER_DAY >= stale_after_days:
                stale += 1

        return DashboardRow(
            area="Claims",
            count=len(open_claims),
            stale_count=stale,
            drill_down="divineos claims list",
        )
    except _ERRORS:
        # One broken surface must never take down the whole dashboard.
        return None
def _row_preregs() -> DashboardRow | None:
    """Build the "Pre-registrations" dashboard row.

    A pre-registration is open when its ``outcome`` is ``"OPEN"`` (a missing
    outcome is read as OPEN). An open entry is overdue when it has a
    non-zero ``review_date_ts`` that lies before the current time. Entries
    may be dicts or attribute-style objects; ``_safe_get`` handles both.

    Returns:
        A ``DashboardRow`` whose ``stale_count`` is the overdue count, or
        ``None`` when nothing is open or any error occurs.
    """
    try:
        from divineos.core.pre_registrations.store import list_pre_registrations

        pending = [
            entry
            for entry in list_pre_registrations()
            if _safe_get(entry, "outcome", "OPEN") == "OPEN"
        ]
        if not pending:
            return None

        current_time = time.time()
        # ``or 0`` folds None / empty values into "no review date set".
        review_times = [float(_safe_get(entry, "review_date_ts", 0) or 0) for entry in pending]
        past_due = sum(1 for ts in review_times if ts and ts < current_time)

        return DashboardRow(
            area="Pre-registrations",
            count=len(pending),
            stale_count=past_due,
            drill_down="divineos prereg list",
            detail="overdue" if past_due else "",
        )
    except _ERRORS:
        return None
def _row_gate_failures() -> DashboardRow | None:
    """Build the "Gate failures" dashboard row for the last 24 hours.

    Older failures are treated as historical noise and ignored. Every
    failure inside the window counts as stale, so the row is always flagged
    when it appears.

    Returns:
        A ``DashboardRow`` for the recent gate failures, or ``None`` when
        there are none in the window or any error occurs.
    """
    try:
        from divineos.core.failure_diagnostics import recent_failures

        gate_events = recent_failures("gate")
        if not gate_events:
            return None

        # Anything with a timestamp before this moment is outside the window.
        oldest_allowed = time.time() - _SECONDS_PER_DAY
        fresh = sum(1 for event in gate_events if event.get("timestamp", 0) >= oldest_allowed)
        if fresh == 0:
            return None

        return DashboardRow(
            area="Gate failures",
            count=fresh,
            stale_count=fresh,
            drill_down="divineos briefing --full",
            detail="silent fail-open events (last 24h)",
        )
    except _ERRORS:
        return None
def render_dashboard() -> str:
    """Render the routing-table dashboard as one newline-joined string.

    Calls every function in ``_ROW_FNS`` in order and keeps the rows that
    are not ``None``; a function that raises is skipped. The output is a
    header, then either an "all clear" line or two lines per row (summary
    and drill-down command), then a fixed footer of pointers.
    """
    collected: list[DashboardRow] = []
    for build_row in _ROW_FNS:
        try:
            candidate = build_row()
        except _ERRORS:
            # A broken surface is dropped; the rest still render.
            continue
        if candidate is not None:
            collected.append(candidate)

    out = ["", "=== BRIEFING DASHBOARD ===", ""]

    if collected:
        if any(entry.stale_count > 0 for entry in collected):
            out += [" !! Stale items need attention (marked with !!)", ""]
        for entry in collected:
            stale_note = f" ({entry.stale_count} stale !!)" if entry.stale_count else ""
            detail_note = f" -- {entry.detail}" if entry.detail else ""
            out.append(f" {entry.area}: {entry.count}{stale_note}{detail_note}")
            out.append(f" -> {entry.drill_down}")
    else:
        out.append(" All clear -- no open items.")

    out.extend(
        [
            "",
            " Cold-start map: LOADOUT.md",
            " Bio: divineos bio show",
            " Full briefing: divineos briefing --full",
            "",
        ]
    )
    return "\n".join(out)
def _resolutions_path() -> Any:
    """Return the resolutions JSONL location under the directory from ``_ensure_hud_dir()``."""
    return _ensure_hud_dir() / _RESOLUTIONS_FILE


def _load_resolutions() -> dict[float, dict[str, Any]]:
    """Load resolution records keyed by correction timestamp.

    Reads the append-only resolutions file one JSON line at a time. When
    several records share a ``correction_timestamp`` the last one in the
    file wins, so a later status transition supersedes an earlier one.

    Bad records are skipped individually rather than failing the load:
    blank lines, lines that are not valid JSON, JSON values that are not
    objects, and records whose ``correction_timestamp`` is not a number.

    Returns:
        Mapping of correction timestamp to its latest resolution record.
        Empty when the file does not exist or cannot be read.
    """
    p = _resolutions_path()
    if not p.exists():
        return {}
    out: dict[float, dict[str, Any]] = {}
    try:
        with p.open(encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line such as ``[1, 2]`` or ``"text"`` is valid JSON but
                # not a record; calling ``.get`` on it would raise.
                if not isinstance(rec, dict):
                    continue
                key = rec.get("correction_timestamp", 0.0)
                # Lookups use the correction's numeric timestamp, so a
                # non-numeric key could never match, and an unhashable one
                # would abort the whole load.
                if isinstance(key, bool) or not isinstance(key, (int, float)):
                    continue
                out[key] = rec
    except _CORR_ERRORS:
        # I/O or decoding failure: report no resolutions rather than a
        # partial view of the file.
        return {}
    return out
def format_for_briefing(limit: int = 5) -> str:
    """Render OPEN corrections for the briefing surface.

    Only corrections returned by ``open_corrections()`` are shown, at most
    ``limit`` of them, each with a timestamp, an age label and its full
    text. If any open correction has reached ``STALE_DAYS``, a warning and
    a resolve hint come first. A trailing line reports how many open
    corrections were left out.

    Returns:
        The rendered section, or an empty string when nothing is open.
    """
    pending = open_corrections()
    if not pending:
        return ""

    visible = pending[:limit]
    overdue = sum(1 for item in pending if item.get("age_days", 0) >= STALE_DAYS)

    out = ["", "# Open Corrections (read raw -- the user's exact words)", ""]
    if overdue:
        out.extend(
            [
                f" !! {overdue} correction(s) unresolved for {STALE_DAYS}+ days",
                ' Resolve with: divineos correction resolve --evidence "what addressed it"',
                "",
            ]
        )

    for number, item in enumerate(visible, 1):
        stamp = time.strftime("%Y-%m-%d %H:%M", time.localtime(item.get("timestamp", 0)))
        age_text = _age_label(item.get("age_days", 0))
        body = (item.get("text") or "").strip()
        out.append(f" [{number}] [{stamp}] ({age_text})")
        # An empty body still emits one (blank) text line.
        out.extend(f" {piece}" for piece in (body.splitlines() or [body]))
        out.append("")

    hidden = len(pending) - len(visible)
    if hidden > 0:
        out.append(f" ... and {hidden} more. Run: divineos corrections --open")
        out.append("")

    return "\n".join(out)
Running tests (pytest, Bash with test commands) or re-running the + failed command clears the marker. + +## Marker file + +``~/.divineos/pending_verification.json`` — simple JSON with the +fix details. Auto-expires after 10 minutes. +""" + +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import Any + +from divineos.core.paths import marker_path as _marker_path + +VERIFICATION_EXPIRY_SECONDS = 600 # 10 minutes + + +def _marker_file() -> Path: + return _marker_path("pending_verification.json") + + +def mark_fix_attempted(file_path: str, error_context: str = "") -> None: + """Record that a fix was attempted — verification is now expected.""" + path = _marker_file() + path.parent.mkdir(parents=True, exist_ok=True) + data = { + "timestamp": time.time(), + "file_path": file_path, + "error_context": error_context[:200], + } + path.write_text(json.dumps(data), encoding="utf-8") + + +def clear_verification() -> None: + """Clear the pending verification marker (tests ran or command re-run).""" + path = _marker_file() + if path.exists(): + path.unlink(missing_ok=True) + + +def check_verification_needed(tool_name: str) -> str | None: + """Check if the agent is moving on without verifying a fix. + + Returns advisory message if pending, None otherwise. + """ + if tool_name not in ("Edit", "Write", "MultiEdit", "NotebookEdit"): + return None + + path = _marker_file() + if not path.exists(): + return None + + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + + ts = data.get("timestamp", 0) + if time.time() - ts > VERIFICATION_EXPIRY_SECONDS: + path.unlink(missing_ok=True) + return None + + file_name = Path(data.get("file_path", "")).name + age = int(time.time() - ts) + + return ( + f"VERIFY-FIX REMINDER: You edited {file_name} {age}s ago as a fix, " + f"but haven't verified it works yet. Run tests or re-run the " + f"failed command before moving on. 
" + f"(Lesson x4: 'claimed fixed but the error came back.')" + ) + + +def is_verification_command(tool_name: str, tool_input: dict[str, Any]) -> bool: + """True if this tool call counts as fix verification.""" + if tool_name == "Bash": + cmd = tool_input.get("command", "") + verification_prefixes = ( + "pytest", + "python -m pytest", + "python -m unittest", + "npm test", + "cargo test", + "go test", + "make test", + "bash scripts/precommit", + ) + for prefix in verification_prefixes: + if cmd.startswith(prefix): + return True + # Re-running the same kind of command that failed + # is also verification (checking if the fix worked) + return False diff --git a/src/divineos/core/lesson_dedup.py b/src/divineos/core/lesson_dedup.py new file mode 100644 index 00000000..018bce3a --- /dev/null +++ b/src/divineos/core/lesson_dedup.py @@ -0,0 +1,114 @@ +"""Lesson deduplication — fuzzy matching to prevent duplicate lesson entries. + +The lesson store had 5 groups of exact duplicates and 3 groups of +semantic duplicates (e.g. "retried 2x", "retried 11x", "retried +without investigating" — same failure, different text). The extraction +pipeline's content_hash dedup only catches exact matches. + +This module adds fuzzy matching so semantically-equivalent lessons +merge instead of multiplying. + +## Algorithm + +1. Normalize: lowercase, strip numbers, strip session IDs, collapse + whitespace. +2. Compute word-level Jaccard similarity between the normalized + candidate and each existing active/improving lesson. +3. If similarity >= MERGE_THRESHOLD (0.6), return the existing lesson + for merging instead of creating a new one. + +## Why Jaccard and not embeddings + +- No external dependencies (no torch, no API calls). +- Fast enough to run in the extraction pipeline hot path. +- The failure mode we're catching (same behavioral pattern, different + wording) has high word overlap by construction — the agent describes + the same mistake with mostly the same words each time. 
+- 0.6 threshold catches "retried 2x" ≈ "retried 11x" (high overlap) + while separating genuinely different lessons (low overlap). +""" + +from __future__ import annotations + +import re +from typing import Any + +# Similarity threshold for merging. 0.6 = 60% word overlap. +# Tuned empirically against the 5 known duplicate groups: +# "retried 2x" vs "retried 11x" → ~0.75 (caught) +# "edited without reading" vs "broke tests" → ~0.15 (not caught) +MERGE_THRESHOLD = 0.6 + +# Patterns to strip during normalization +_SESSION_ID_RE = re.compile(r"[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}") +_NUMBERS_RE = re.compile(r"\b\d+\w*\b") +_MULTI_SPACE = re.compile(r"\s+") +_PUNCTUATION = re.compile(r"[^\w\s]") + + +def _normalize(text: str) -> set[str]: + """Normalize lesson text to a word set for comparison.""" + t = text.lower() + # Strip session IDs — they make otherwise-identical lessons look different + t = _SESSION_ID_RE.sub("", t) + # Strip bare numbers — "2x" vs "11x" shouldn't differentiate + t = _NUMBERS_RE.sub("", t) + # Strip punctuation — "errors," and "errors" should match + t = _PUNCTUATION.sub("", t) + # Collapse whitespace + t = _MULTI_SPACE.sub(" ", t).strip() + # Split into word set, filter short words + words = {w for w in t.split() if len(w) > 2} + return words + + +def _jaccard(a: set[str], b: set[str]) -> float: + """Jaccard similarity between two word sets.""" + if not a or not b: + return 0.0 + intersection = len(a & b) + union = len(a | b) + return intersection / union if union > 0 else 0.0 + + +def find_duplicate( + candidate: str, + existing_lessons: list[dict[str, Any]], + threshold: float = MERGE_THRESHOLD, +) -> dict[str, Any] | None: + """Find an existing lesson that is a fuzzy match for the candidate. + + Args: + candidate: The text of the new lesson being considered. + existing_lessons: List of lesson dicts with at least 'description' + and 'lesson_id' keys. + threshold: Jaccard similarity threshold for merging. 
+ + Returns: + The best-matching existing lesson dict if similarity >= threshold, + or None if no match found. + """ + if not candidate or not existing_lessons: + return None + + candidate_words = _normalize(candidate) + if len(candidate_words) < 3: + # Too short to meaningfully compare + return None + + best_match = None + best_score = 0.0 + + for lesson in existing_lessons: + desc = lesson.get("description", "") + if not desc: + continue + lesson_words = _normalize(desc) + score = _jaccard(candidate_words, lesson_words) + if score > best_score: + best_score = score + best_match = lesson + + if best_score >= threshold and best_match is not None: + return best_match + return None diff --git a/src/divineos/core/related_failure_scanner.py b/src/divineos/core/related_failure_scanner.py new file mode 100644 index 00000000..5363ac35 --- /dev/null +++ b/src/divineos/core/related_failure_scanner.py @@ -0,0 +1,139 @@ +"""Related-failure scanner — catches "fixed one but missed related failures." + +Lesson x8 (second most repeated): "I fixed one problem but missed +related failures. Check all affected areas after a fix." + +## Architecture + +After an Edit tool succeeds, this module checks whether the old_string +pattern appears in other files in the same codebase. If it does, the +PostToolUse hook surfaces an advisory: "You fixed this in file X — +but the same pattern exists in files Y and Z." + +This is advisory (soft-advise), not blocking. The agent gets the +information and decides whether the other occurrences need fixing. +Blocking would be too aggressive — sometimes the "same pattern" in +other files is intentionally different. + +## How it works + +1. PostToolUse hook calls ``scan_for_related()`` after a successful Edit. +2. The scanner greps for the old_string (or a simplified version of it) + across ``src/`` and ``tests/``. +3. If matches are found in OTHER files, it returns an advisory message. +4. The hook surfaces the advisory via ``_make_soft_advise()``. 
+ +## Performance + +Only runs on Edit (not Write, not Bash). Only greps if the old_string +is >= 10 chars (short strings produce too many false matches). Limits +results to 5 files to keep the message readable. +""" + +from __future__ import annotations + +import subprocess +from pathlib import Path + +# Don't scan for patterns shorter than this — too many false matches. +MIN_PATTERN_LENGTH = 10 + +# Maximum files to report in the advisory. +MAX_REPORTED_FILES = 5 + + +def scan_for_related( + file_path: str, + old_string: str, + repo_root: str | None = None, +) -> str | None: + """Check if old_string appears in other files. + + Returns an advisory message if matches found, None otherwise. + """ + if not old_string or len(old_string.strip()) < MIN_PATTERN_LENGTH: + return None + + # Use the first meaningful line of the old_string as search pattern. + # Full multi-line patterns are too specific to match elsewhere. + lines = [ln.strip() for ln in old_string.strip().splitlines() if ln.strip()] + if not lines: + return None + + # Pick the most distinctive line (longest, avoiding common boilerplate) + search_line = max(lines, key=len) + if len(search_line) < MIN_PATTERN_LENGTH: + return None + + # Determine repo root + if repo_root is None: + try: + result = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + capture_output=True, + text=True, + timeout=5, + ) + repo_root = result.stdout.strip() if result.returncode == 0 else "." + except (subprocess.TimeoutExpired, OSError): + repo_root = "." 
+ + # Run ripgrep or grep for the pattern + try: + result = subprocess.run( + [ + "rg", + "--files-with-matches", + "--fixed-strings", + "--glob", + "*.py", + "--max-count", + "1", + search_line[:80], + ], + capture_output=True, + text=True, + timeout=10, + cwd=repo_root, + ) + if result.returncode != 0: + return None + matched_files = result.stdout.strip().splitlines() + except (subprocess.TimeoutExpired, FileNotFoundError, OSError): + # Fallback to grep if rg not available + try: + result = subprocess.run( + ["grep", "-rl", "--include=*.py", search_line[:60], "src/", "tests/"], + capture_output=True, + text=True, + timeout=10, + cwd=repo_root, + ) + if result.returncode != 0: + return None + matched_files = result.stdout.strip().splitlines() + except (subprocess.TimeoutExpired, FileNotFoundError, OSError): + return None + + # Normalize and exclude the file we just edited + norm_edited = Path(file_path).resolve() + other_files = [] + for f in matched_files: + norm_f = (Path(repo_root) / f).resolve() + if norm_f != norm_edited: + other_files.append(f) + + if not other_files: + return None + + shown = other_files[:MAX_REPORTED_FILES] + extra = len(other_files) - len(shown) + file_list = ", ".join(shown) + extra_note = f" (+{extra} more)" if extra else "" + + return ( + f"RELATED-PATTERN CHECK: The pattern you just changed in " + f"{Path(file_path).name} also appears in: {file_list}{extra_note}. " + f"Check whether those files need the same fix. " + f"(Lesson x8: 'fixed one but missed related failures.')" + ) diff --git a/src/divineos/core/retry_blocker.py b/src/divineos/core/retry_blocker.py new file mode 100644 index 00000000..7b17fd7a --- /dev/null +++ b/src/divineos/core/retry_blocker.py @@ -0,0 +1,197 @@ +"""Retry blocker — prevents blind retries without diagnostic investigation. + +Lesson x11 (most repeated behavioral failure): "I retried a failed +action without investigating the cause." This module is the riverbank. 
# A recorded failure stops blocking retries after this many seconds.
FAILURE_EXPIRY_SECONDS = 300
# Ring-buffer cap: only the newest N failure records are persisted.
MAX_TRACKED_FAILURES = 10

# Captures the subcommand that follows "divineos" anywhere in a shell command.
_DIVINEOS_SUBCMD_RE = re.compile(r"\bdivineos\s+(\w[\w-]*)")


def _tracker_path() -> Path:
    """Return the location of the ``retry_tracker.json`` marker file."""
    return _marker_path_under_home("retry_tracker.json")


def _load_tracker() -> list[dict[str, Any]]:
    """Load the unexpired failure records from the tracker file.

    Returns an empty list when the file is missing, unreadable, not valid
    UTF-8/JSON, or does not hold a list. Malformed records (non-dict, or a
    non-numeric ``timestamp``) are dropped rather than raised on, so a
    damaged tracker can never crash the gate that reads it.
    """
    path = _tracker_path()
    try:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return []
    if not isinstance(data, list):
        return []

    now = time.time()
    fresh: list[dict[str, Any]] = []
    for entry in data:
        if not isinstance(entry, dict):
            continue
        stamp = entry.get("timestamp", 0)
        # bool is an int subclass; a JSON true/false is not a usable timestamp.
        if isinstance(stamp, bool) or not isinstance(stamp, (int, float)):
            continue
        if now - stamp < FAILURE_EXPIRY_SECONDS:
            fresh.append(entry)
    return fresh


def _save_tracker(entries: list[dict[str, Any]]) -> None:
    """Persist the newest ``MAX_TRACKED_FAILURES`` entries, creating the directory."""
    path = _tracker_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(entries[-MAX_TRACKED_FAILURES:]), encoding="utf-8")


def _command_signature(tool_name: str, tool_input: dict[str, Any]) -> str:
    """Extract a similarity signature for retry detection.

    Two calls are "substantially similar" if they produce the same
    signature. Over-inclusive by design — false positives are cheap
    (agent just has to read something first), false negatives are
    expensive (blind retry loop continues).

    Signature shape:
      * file-editing tools: ``"<tool>:<target path>"``
      * Bash: ``"Bash:<first three whitespace-separated words>"``
      * anything else: ``"<tool>:<first 60 chars of the first non-empty
        string argument, by sorted key>"``, or just ``"<tool>"``.
    """
    if tool_name in ("Edit", "Write", "MultiEdit", "NotebookEdit"):
        # NotebookEdit names its target "notebook_path" rather than
        # "file_path"; without this every notebook edit shares one signature.
        target = tool_input.get("file_path") or tool_input.get("notebook_path") or ""
        return f"{tool_name}:{target}"
    if tool_name == "Bash":
        cmd = tool_input.get("command")
        words = cmd.split()[:3] if isinstance(cmd, str) else []
        return f"Bash:{' '.join(words)}"
    # For other tools, use tool name + first string argument
    for _key, value in sorted(tool_input.items()):
        if isinstance(value, str) and value:
            return f"{tool_name}:{value[:60]}"
    return tool_name


def record_failure(tool_name: str, tool_input: dict[str, Any], error: str = "") -> None:
    """Record a tool failure. Called by PostToolUse on error.

    Appends an uninvestigated record (signature, tool name, first 200
    characters of ``error``) to the tracker file.
    """
    entries = _load_tracker()
    entries.append(
        {
            "timestamp": time.time(),
            "signature": _command_signature(tool_name, tool_input),
            "tool_name": tool_name,
            # Tolerate error=None from callers that have no message.
            "error_snippet": (error or "")[:200],
            "investigated": False,
        }
    )
    _save_tracker(entries)


def mark_investigated() -> None:
    """Mark all failures as investigated. Called when a diagnostic runs."""
    entries = _load_tracker()
    if not entries:
        return
    for entry in entries:
        entry["investigated"] = True
    _save_tracker(entries)


def clear_all() -> None:
    """Remove the tracker file entirely (no error if it is already gone)."""
    _tracker_path().unlink(missing_ok=True)


def check_retry(tool_name: str, tool_input: dict[str, Any]) -> str | None:
    """Check if this tool call is a blind retry of a recent failure.

    Returns a denial message if an unexpired, uninvestigated failure with
    the same signature exists (describing the most recent one), otherwise
    ``None``.
    """
    entries = _load_tracker()
    if not entries:
        return None

    sig = _command_signature(tool_name, tool_input)
    # Walk newest-first so the message describes the latest matching failure.
    last = next(
        (
            e
            for e in reversed(entries)
            if e.get("signature") == sig and not e.get("investigated", False)
        ),
        None,
    )
    if last is None:
        return None

    err = last.get("error_snippet", "")
    age = int(time.time() - last.get("timestamp", 0))
    error_note = f"Error was: {err}. " if err else ""

    return (
        f"BLOCKED: This looks like a retry of a command that failed {age}s ago "
        f"without investigation in between. "
        f"{error_note}"
        f"Investigate first — read the error, check the file, understand why "
        f"it failed. Diagnostic commands (Read, Grep, Glob, git diff/log/status, "
        f"divineos ask/recall/context) clear this block automatically."
    )


# --- Diagnostic detection ---

# Tools that are read-only by nature; any call to them counts as investigating.
_DIAGNOSTIC_TOOLS = frozenset({"Read", "Grep", "Glob"})

# Bash commands starting with one of these count as investigation.
# NOTE(review): prefix matching also accepts compound commands such as
# "cat a > b" or "find . -delete" — confirm that is acceptable for a gate
# described as narrow.
_DIAGNOSTIC_BASH_PREFIXES = (
    "git log",
    "git diff",
    "git status",
    "git show",
    "cat ",
    "head ",
    "tail ",
    "ls ",
    "find ",
    "python -c",
    "type ",
)

# ``divineos <subcommand>`` invocations that count as investigation.
_DIAGNOSTIC_DIVINEOS = frozenset(
    {
        "ask",
        "recall",
        "context",
        "briefing",
        "inspect",
        "body",
        "health",
        "verify",
    }
)


def is_diagnostic_command(tool_name: str, tool_input: dict[str, Any]) -> bool:
    """True if this tool call counts as diagnostic investigation.

    Read/Grep/Glob always qualify. A Bash call qualifies when its command
    (ignoring leading whitespace) starts with a known read-only prefix, or
    mentions ``divineos <subcommand>`` with a diagnostic subcommand.
    """
    if tool_name in _DIAGNOSTIC_TOOLS:
        return True
    if tool_name != "Bash":
        return False

    cmd = tool_input.get("command")
    if not isinstance(cmd, str):
        return False
    # Leading indentation must not hide an otherwise diagnostic command.
    cmd = cmd.lstrip()
    if cmd.startswith(_DIAGNOSTIC_BASH_PREFIXES):
        return True
    found = _DIVINEOS_SUBCMD_RE.search(cmd)
    return bool(found and found.group(1) in _DIAGNOSTIC_DIVINEOS)
class TestRenderDashboard:
    """Rendering checks for the dashboard, each run against a throwaway DB path."""

    @staticmethod
    def _use_temp_db(tmp_path, monkeypatch):
        """Point DIVINEOS_DB at a per-test file under ``tmp_path``."""
        monkeypatch.setenv("DIVINEOS_DB", str(tmp_path / "test.db"))

    def test_renders_without_error(self, tmp_path, monkeypatch):
        self._use_temp_db(tmp_path, monkeypatch)
        rendered = render_dashboard()
        assert isinstance(rendered, str)

    def test_shows_all_clear_when_empty(self, tmp_path, monkeypatch):
        self._use_temp_db(tmp_path, monkeypatch)
        rendered = render_dashboard()
        assert "All clear" in rendered or "DASHBOARD" in rendered

    def test_shows_corrections_when_present(self, tmp_path, monkeypatch):
        self._use_temp_db(tmp_path, monkeypatch)
        from divineos.core.corrections import log_correction

        log_correction("test correction")
        rendered = render_dashboard()
        assert "Corrections" in rendered
        assert "divineos corrections --open" in rendered

    def test_shows_stale_warning(self, tmp_path, monkeypatch):
        self._use_temp_db(tmp_path, monkeypatch)
        import json
        import time
        from divineos.core.corrections import _path

        # Append a correction dated five days in the past straight to the log.
        five_days_ago = time.time() - 5 * 86400
        record = {"text": "old", "timestamp": five_days_ago, "session_id": ""}
        with _path().open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(record) + "\n")

        rendered = render_dashboard()
        assert "stale" in rendered
        assert "!!" in rendered

    def test_full_briefing_pointer(self, tmp_path, monkeypatch):
        self._use_temp_db(tmp_path, monkeypatch)
        from divineos.core.corrections import log_correction

        log_correction("something")
        rendered = render_dashboard()
        assert "divineos briefing --full" in rendered

    def test_resolved_corrections_not_counted(self, tmp_path, monkeypatch):
        self._use_temp_db(tmp_path, monkeypatch)
        from divineos.core.corrections import log_correction, resolve_correction

        logged = log_correction("resolved one")
        resolve_correction(logged["timestamp"], evidence="done")
        rendered = render_dashboard()
        assert "Corrections" not in rendered
class TestNormalize:
    """``_normalize`` reduces lesson text to a comparable collection of words."""

    def test_strips_numbers(self):
        words = _normalize("I retried a failed action 11x without investigating")
        assert "11x" not in words  # number stripped, 'x' too short

    def test_strips_session_ids(self):
        words = _normalize("session 4517c734-1fe1-4ad0-b0e0-4e4e4300953b failed")
        # UUID should be stripped
        assert "4517c734-1fe1-4ad0-b0e0-4e4e4300953b" not in " ".join(words)

    def test_lowercase(self):
        words = _normalize("RETRIED Failed ACTION")
        assert "retried" in words
        assert "failed" in words

    def test_filters_short_words(self):
        words = _normalize("I am a bad AI")
        # 'I', 'am', 'a' are <= 2 chars, filtered
        assert "bad" in words
        # Assert the filtering itself; checking only that "bad" survives would
        # still pass if no word were ever dropped.
        for short in ("i", "am", "a"):
            assert short not in words


class TestJaccard:
    """``_jaccard`` is intersection size over union size of two sets."""

    def test_identical_sets(self):
        assert _jaccard({"a", "b", "c"}, {"a", "b", "c"}) == 1.0

    def test_disjoint_sets(self):
        assert _jaccard({"a", "b"}, {"c", "d"}) == 0.0

    def test_partial_overlap(self):
        # {a,b,c} & {b,c,d} = {b,c}, union = {a,b,c,d}
        assert _jaccard({"a", "b", "c"}, {"b", "c", "d"}) == 0.5

    def test_empty_set(self):
        assert _jaccard(set(), {"a"}) == 0.0


class TestFindDuplicate:
    """``find_duplicate`` returns the matching existing lesson dict, or None."""

    def test_catches_retry_variants(self):
        """The core use case: 'retried 2x' and 'retried 11x' are the same lesson."""
        existing = [
            {
                "lesson_id": "abc",
                "description": "I retried a failed action without investigating the cause. Investigate errors, dont blindly retry.",
            },
        ]
        candidate = "I retried a failed action 2x without investigating the cause. I need to investigate errors, not blindly retry"
        match = find_duplicate(candidate, existing)
        assert match is not None
        assert match["lesson_id"] == "abc"

    def test_different_lessons_not_matched(self):
        """Genuinely different lessons should not match."""
        existing = [
            {
                "lesson_id": "abc",
                "description": "I retried a failed action without investigating the cause.",
            },
        ]
        candidate = "I edited files without reading them first. I must read before I edit."
        match = find_duplicate(candidate, existing)
        assert match is None

    def test_empty_existing(self):
        match = find_duplicate("some lesson", [])
        assert match is None

    def test_short_candidate_skipped(self):
        """Very short candidates can't meaningfully compare."""
        existing = [{"lesson_id": "abc", "description": "I retried without investigating."}]
        match = find_duplicate("bad", existing)
        assert match is None

    def test_best_match_returned(self):
        """When multiple lessons match, the best one is returned."""
        existing = [
            {
                "lesson_id": "low",
                "description": "I upset the user by acting without pausing to understand the situation.",
            },
            {
                "lesson_id": "high",
                "description": "I retried a failed action without investigating the cause. Investigate errors, dont blindly retry.",
            },
        ]
        candidate = "I retried a failed action without investigating the cause. I need to investigate errors, not blindly retry."
        match = find_duplicate(candidate, existing)
        assert match is not None
        assert match["lesson_id"] == "high"
@pytest.fixture(autouse=True)
def _clean_tracker(tmp_path, monkeypatch):
    """Give every test its own empty tracker file.

    Redirects DIVINEOS_HOME to ``tmp_path`` before touching the tracker so
    the suite never deletes or rewrites the developer's real
    ``retry_tracker.json``.
    NOTE(review): relies on ``marker_path`` honouring DIVINEOS_HOME, as the
    fix-verifier tests already assume — confirm against ``core.paths``.
    """
    monkeypatch.setenv("DIVINEOS_HOME", str(tmp_path))
    clear_all()
    yield
    clear_all()


class TestCommandSignature:
    def test_edit_uses_file_path(self):
        sig = _command_signature("Edit", {"file_path": "/foo/bar.py", "old_string": "x"})
        assert sig == "Edit:/foo/bar.py"

    def test_bash_uses_first_three_words(self):
        sig = _command_signature("Bash", {"command": "pytest tests/ -q --tb=short"})
        assert sig == "Bash:pytest tests/ -q"

    def test_bash_short_command(self):
        sig = _command_signature("Bash", {"command": "ls"})
        assert sig == "Bash:ls"

    def test_other_tool_uses_first_string_arg(self):
        sig = _command_signature("Grep", {"pattern": "foo.*bar", "path": "/src"})
        # Keys are visited in sorted order, so "path" wins over "pattern".
        assert sig == "Grep:/src"


class TestRecordAndCheck:
    def test_first_attempt_not_blocked(self):
        """First attempt at a command is never blocked."""
        result = check_retry("Edit", {"file_path": "/foo.py"})
        assert result is None

    def test_retry_after_failure_blocked(self):
        """Same command after failure without investigation is blocked."""
        record_failure("Edit", {"file_path": "/foo.py"}, "SyntaxError")
        result = check_retry("Edit", {"file_path": "/foo.py"})
        assert result is not None
        assert "BLOCKED" in result
        assert "SyntaxError" in result

    def test_different_command_not_blocked(self):
        """Different command after failure is not blocked."""
        record_failure("Edit", {"file_path": "/foo.py"}, "error")
        result = check_retry("Edit", {"file_path": "/bar.py"})
        assert result is None

    def test_investigation_clears_block(self):
        """Marking as investigated clears the retry block."""
        record_failure("Edit", {"file_path": "/foo.py"}, "error")
        mark_investigated()
        result = check_retry("Edit", {"file_path": "/foo.py"})
        assert result is None

    def test_clear_all_removes_tracker(self):
        record_failure("Edit", {"file_path": "/foo.py"}, "error")
        clear_all()
        result = check_retry("Edit", {"file_path": "/foo.py"})
        assert result is None


class TestDiagnosticDetection:
    def test_read_is_diagnostic(self):
        assert is_diagnostic_command("Read", {"file_path": "/foo.py"})

    def test_grep_is_diagnostic(self):
        assert is_diagnostic_command("Grep", {"pattern": "foo"})

    def test_glob_is_diagnostic(self):
        assert is_diagnostic_command("Glob", {"pattern": "*.py"})

    def test_git_diff_is_diagnostic(self):
        assert is_diagnostic_command("Bash", {"command": "git diff src/"})

    def test_divineos_ask_is_diagnostic(self):
        assert is_diagnostic_command("Bash", {"command": "divineos ask 'retry'"})

    def test_edit_is_not_diagnostic(self):
        assert not is_diagnostic_command("Edit", {"file_path": "/foo.py"})

    def test_write_is_not_diagnostic(self):
        assert not is_diagnostic_command("Write", {"file_path": "/foo.py"})

    def test_bash_edit_is_not_diagnostic(self):
        assert not is_diagnostic_command("Bash", {"command": "sed -i 's/foo/bar/' file.py"})


class TestExpiry:
    def test_old_failures_expire(self):
        """Failures older than FAILURE_EXPIRY_SECONDS are pruned."""
        record_failure("Edit", {"file_path": "/foo.py"}, "error")

        # Manually age the entry; the tracker is written as UTF-8, so read
        # and write it the same way instead of using the platform default.
        path = _tracker_path()
        data = json.loads(path.read_text(encoding="utf-8"))
        data[0]["timestamp"] = time.time() - 400  # > 300s expiry
        path.write_text(json.dumps(data), encoding="utf-8")

        result = check_retry("Edit", {"file_path": "/foo.py"})
        assert result is None  # expired, not blocked