diff --git a/gate/fixer.py b/gate/fixer.py index 3645197d..9be229d5 100644 --- a/gate/fixer.py +++ b/gate/fixer.py @@ -1014,7 +1014,9 @@ def _loop() -> None: def run(self) -> FixResult: """Execute the fix pipeline.""" if self._cancelled.is_set(): - return FixResult(success=False, summary="Cancelled before start") + return FixResult( + success=False, skipped=True, summary="Cancelled before start" + ) self._fix_start_monotonic = time.monotonic() self._start_watchdog() @@ -1046,15 +1048,21 @@ def run(self) -> FixResult: ) finding_count = len(actionable) - # Check fix attempt limits + # Check fix attempt limits — cooldown / soft / lifetime caps + # are policy decisions, not failures, so the result is marked + # ``skipped`` so reviews.jsonl can distinguish "we chose not to + # try" from "we tried and failed". ``log_fix_result`` maps that + # to ``decision: "fix_skipped"`` (audit P3.1, May 13). allowed, reason = state.check_fix_limits(self.pr_number, self.config, repo=self.repo) if not allowed: github.comment_pr( self.repo, self.pr_number, - f"**Gate Auto-Fix: Attempt limit reached** — {reason}", + f"**Gate Auto-Fix: skipped** — {reason}", + ) + return FixResult( + success=False, skipped=True, reason=reason, summary=reason ) - return FixResult(success=False, reason=reason, summary=reason) triage = self._read_json("triage.json") or {} risk_level = triage.get("risk_level", "medium") @@ -1140,7 +1148,9 @@ def run(self) -> FixResult: for iteration in range(1, MAX_ITERATIONS + 1): if self._cancelled.is_set(): - return FixResult(success=False, summary="Cancelled") + return FixResult( + success=False, skipped=True, summary="Cancelled" + ) write_live_log( self.pr_number, @@ -1290,7 +1300,12 @@ def run(self) -> FixResult: except FileNotFoundError as e: logger.warning(f"Fix pipeline aborted (workspace deleted): {e}") - return FixResult(success=False, summary="Workspace deleted (cancelled)") + # Worktree teardown is the cancellation cascade — not a + # real failure. Mark skipped so reviews.jsonl distinguishes + # this from iteration-exhausted / crash failures. + return FixResult( + success=False, skipped=True, summary="Workspace deleted (cancelled)" + ) except Exception as e: logger.exception(f"Fix pipeline failed for PR #{self.pr_number}") notify.fix_failed(self.pr_number, str(e), 0, self.repo) @@ -1607,10 +1622,15 @@ def _run_rereview(self) -> bool: result = run_with_retry( lambda: StructuredRunner().run( - "fix-rereview", assembled, self.workspace, self.config + "fix-rereview", + assembled, + self.workspace, + self.config, + cancelled=self._cancelled, ), "fix-rereview", self.config, + cancelled=self._cancelled, ) # Write result diff --git a/gate/fixer_polish.py b/gate/fixer_polish.py index 5d7989a7..961b61cb 100644 --- a/gate/fixer_polish.py +++ b/gate/fixer_polish.py @@ -511,10 +511,16 @@ def _run_fix_polish_audit(pipeline: "FixPipeline") -> dict | None: def _run() -> StageResult: return StructuredRunner().run( - "fix-polish", assembled, pipeline.workspace, pipeline.config + "fix-polish", + assembled, + pipeline.workspace, + pipeline.config, + cancelled=pipeline._cancelled, ) - result = run_with_retry(_run, "fix-polish", pipeline.config) + result = run_with_retry( + _run, "fix-polish", pipeline.config, cancelled=pipeline._cancelled + ) data = result.data or {} (pipeline.workspace / "fix-polish.json").write_text(json.dumps(data, indent=2)) clean = data.get("clean", True) diff --git a/gate/github.py b/gate/github.py index f87befa1..ddc84aaa 100644 --- a/gate/github.py +++ b/gate/github.py @@ -463,6 +463,23 @@ def _build_comment(verdict: dict, build: dict | None) -> str: return md +def _is_own_pr(pr_author: str, config: dict | None) -> bool: + """True when the PR was opened by the configured bot account. + + Used by ``post_review`` / ``approve_pr`` to short-circuit the + ``gh pr review`` round-trip that GitHub rejects with + ``GraphQL: Review Can not approve / request changes on your own + pull request``. The existing stderr-fallback (parse the error, + re-post as a comment) still acts as belt-and-suspenders for the + case where ``bot_account`` is misconfigured or ``pr_author`` is + empty (e.g. health.py orphan-cleanup paths). + """ + if not pr_author or not config: + return False + bot = (config.get("repo") or {}).get("bot_account") or "" + return bool(bot) and pr_author.lower() == bot.lower() + + def post_review( repo: str, pr_number: int, @@ -470,6 +487,7 @@ def post_review( build: dict | None, sha: str, config: dict | None = None, + pr_author: str = "", ) -> None: """Post a review to the PR. Always enforcement mode. @@ -478,12 +496,29 @@ def post_review( Also upserts a sticky human-readable summary comment (separate from the GitHub review object) so the team always has one canonical place to read Gate's verdict for this PR. + + ``pr_author`` (when supplied) plus ``config["repo"]["bot_account"]`` + let us detect bot-authored PRs up front and route directly to + ``comment_pr`` instead of making the ``gh pr review`` call that + GitHub rejects for self-reviews. The legacy stderr fallback is + preserved for the misconfigured / unknown-author case. """ comment = _build_comment(verdict, build) decision = verdict.get("decision", "approve") findings = verdict.get("findings", []) - if decision in ("approve", "approve_with_notes"): + own_pr = _is_own_pr(pr_author, config) + + if own_pr: + # Bot opened this PR — GitHub forbids self-reviews. Post the + # verdict as a comment instead of trying (and failing) the + # ``gh pr review`` call. No WARNING, no wasted round-trip, no + # race with the sticky-comment update. + comment_pr(repo, pr_number, comment) + logger.info( + f"PR #{pr_number}: bot-authored PR, posted review as comment ({decision})" + ) + elif decision in ("approve", "approve_with_notes"): try: _gh(["pr", "review", str(pr_number), "--repo", repo, "--approve", "--body", comment]) logger.info(f"PR #{pr_number} approved ({decision})") @@ -509,6 +544,11 @@ def post_review( else: raise + # Escalation runs for any non-approve verdict regardless of who + # opened the PR — humans still need to be paged when there's a + # critical finding or low-confidence verdict, and the label / + # reviewer assignment paths work the same on bot-authored PRs. + if decision not in ("approve", "approve_with_notes"): has_critical = any( f.get("severity") == "critical" and f.get("introduced_by_pr") is not False for f in findings @@ -631,8 +671,28 @@ def complete_check_run( # ── Simple PR Operations ──────────────────────────────────── -def approve_pr(repo: str, pr_number: int, body: str) -> None: - """Approve a PR, falling back to a comment if we own the PR.""" +def approve_pr( + repo: str, + pr_number: int, + body: str, + pr_author: str = "", + config: dict | None = None, +) -> None: + """Approve a PR, falling back to a comment if we own the PR. + + ``pr_author`` + ``config["repo"]["bot_account"]`` let us detect + bot-authored PRs and route directly to ``comment_pr`` without + incurring the failing ``gh pr review --approve`` round-trip. The + legacy stderr fallback below is retained as belt-and-suspenders + for the case where one or both args are unavailable (e.g. + health.py orphan-cleanup paths that don't have a config in hand). + """ + if _is_own_pr(pr_author, config): + comment_pr(repo, pr_number, body) + logger.info( + f"PR #{pr_number}: bot-authored PR, posted approval as comment" + ) + return try: _gh(["pr", "review", str(pr_number), "--repo", repo, "--approve", "--body", body]) except subprocess.CalledProcessError as e: diff --git a/gate/health.py b/gate/health.py index e830813f..9258cca5 100644 --- a/gate/health.py +++ b/gate/health.py @@ -376,9 +376,19 @@ def _process_marker(pr_dir: Path, default_repo: str, label: str) -> None: ) try: pr_number = int(pr_num) + # ``pr_author`` is intentionally omitted here: the + # active_review.json marker does not persist it (the + # orchestrator writes the marker before fetching PR + # info), and this orphan-cleanup path is a fail-open + # recovery that runs at most a few times per day. The + # stderr-fallback in ``approve_pr`` still handles bot + # PRs correctly — it just costs one extra GraphQL + # round-trip and a WARNING log line per orphaned bot + # PR, which is acceptable for a recovery path. github.approve_pr( repo, pr_number, "**Gate (error)** — review process died. Auto-approving.", + config=config, ) except ValueError: pass diff --git a/gate/logger.py b/gate/logger.py index a27c29f8..022a61a7 100644 --- a/gate/logger.py +++ b/gate/logger.py @@ -162,13 +162,23 @@ def log_fix_result( ) -> None: """Append a fix result entry to reviews.jsonl. - ``status`` may be one of ``"succeeded"``, ``"failed"``, or ``"no_op"``. - When not provided it is derived from ``fix_success`` (legacy callers - that have not been updated yet). ``no_op`` lets log consumers - distinguish "the fix pipeline intentionally did nothing" (e.g. a - graceful no-op on an approve_with_notes PR with no mechanical work) - from "the fix pipeline landed commits" and from "the fix pipeline - failed" (audit A10). + ``status`` may be one of ``"succeeded"``, ``"failed"``, ``"no_op"``, + or ``"skipped"``. When not provided it is derived from + ``fix_success`` (legacy callers that have not been updated yet). + + The four statuses let log consumers distinguish: + + - ``no_op`` — the pipeline intentionally did nothing (e.g. graceful + no-op on an approve_with_notes PR with no mechanical work) + - ``succeeded`` — the pipeline landed commits + - ``failed`` — the pipeline attempted real work and could not + produce a valid diff (iteration exhaustion, crash) + - ``skipped`` — the pipeline short-circuited before attempting any + work because of cancellation (supersede / operator cancel / + workspace teardown) or policy (cooldown, soft / lifetime limit). + ``fix_skipped`` is excluded from the success-rate denominator in + ``gate.reports`` the same way ``fix_no_op`` is, since neither + represents an actual fix attempt with an outcome. Hopper-mode kwargs (``pipeline_mode``, ``sub_scope_*``, ``wall_clock_seconds``, ``runaway_guard_hit``, ``fixed_count``, @@ -177,12 +187,13 @@ def log_fix_result( """ if status is None: status = "succeeded" if fix_success else "failed" - if status == "no_op": - decision = "fix_no_op" - elif status == "succeeded": - decision = "fix_succeeded" - else: - decision = "fix_failed" + decision_map = { + "succeeded": "fix_succeeded", + "no_op": "fix_no_op", + "skipped": "fix_skipped", + "failed": "fix_failed", + } + decision = decision_map.get(status, "fix_failed") entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "repo": repo, diff --git a/gate/orchestrator.py b/gate/orchestrator.py index 8bbaf17e..381e8fc6 100644 --- a/gate/orchestrator.py +++ b/gate/orchestrator.py @@ -254,6 +254,7 @@ def run(self) -> None: github.approve_pr( self.repo, self.pr_number, f"**Gate: {action}** — review skipped by label.", + pr_author=self.pr_author, config=self.config, ) github.complete_check_run( self.repo, self.check_run_id, @@ -275,6 +276,7 @@ def run(self) -> None: self.repo, self.pr_number, "**Gate (circuit breaker)** — last 3 reviews were errors. " "Auto-approving. Investigate the gate machine.", + pr_author=self.pr_author, config=self.config, ) github.complete_check_run( self.repo, self.check_run_id, @@ -377,6 +379,7 @@ def run(self) -> None: github.approve_pr( self.repo, self.pr_number, "**Gate (quota pause)** — auto-approved, quota low.", + pr_author=self.pr_author, config=self.config, ) github.complete_check_run( self.repo, self.check_run_id, @@ -398,6 +401,7 @@ def run(self) -> None: github.approve_pr( self.repo, self.pr_number, "**Gate (cycle limit)** — auto-approved after max review cycles.", + pr_author=self.pr_author, config=self.config, ) github.complete_check_run( self.repo, self.check_run_id, @@ -430,7 +434,33 @@ def run(self) -> None: self._update_check("Stage 2: Build verification...") write_live_log(self.pr_number, "Build starting", "stage", repo=self.repo) build_result = builder.run_build(self.workspace, config=self.config) - (self.workspace / "build.json").write_text(json.dumps(build_result, indent=2)) + # Guard the build.json write the same way ``_save_stage_result`` + # guards stage outputs. The queue can supersede a review + # mid-build, at which point the worktree may already be torn + # down — without this guard the write raises FileNotFoundError + # and the review exits as ``error`` instead of ``cancelled`` + # (observed on PR #318 May 11, 2026). The reason-split log + # message distinguishes a real cancel from a workspace-gone + # race so future audits don't conflate them. + if self._cancelled.is_set() or not self.workspace.exists(): + reason = "review cancelled" if self._cancelled.is_set() else "workspace gone" + logger.info(f"Skipping build.json write: {reason}") + self._emit("review_completed", review_id=review_id, decision="skip") + return + try: + (self.workspace / "build.json").write_text( + json.dumps(build_result, indent=2) + ) + except (FileNotFoundError, OSError) as e: + if self._cancelled.is_set() or not self.workspace.exists(): + logger.info( + f"build.json write failed after cancel/teardown: {e}" + ) + self._emit( + "review_completed", review_id=review_id, decision="skip" + ) + return + raise # === Fast-track check === fast_track = ( @@ -579,7 +609,7 @@ def run(self) -> None: # === POST REVIEW === github.post_review( self.repo, self.pr_number, verdict.data, build_result, self.head_sha, - config=self.config, + config=self.config, pr_author=self.pr_author, ) # Complete check run @@ -719,7 +749,20 @@ def run(self) -> None: # statuses API maps ``neutral`` → ``success`` state; the # title/description is the primary visible signal. fix_total = fix_result.fixed_count + fix_result.not_fixed_count - if not fix_result.success: + if fix_result.skipped: + # Cancellation (supersede / operator cancel / + # workspace teardown) or policy block (cooldown, + # soft / lifetime limit). Map to ``neutral`` so + # the PR check shows yellow rather than red and + # log as ``fix_skipped`` so reviews.jsonl + # success-rate isn't distorted by non-attempts. + fix_conclusion = "neutral" + check_title = ( + f"Gate Auto-Fix: skipped ({fix_result.reason})" + if fix_result.reason + else "Gate Auto-Fix: skipped" + ) + elif not fix_result.success: fix_conclusion = "failure" check_title = "Gate Auto-Fix: failed" elif is_no_op: @@ -756,7 +799,8 @@ def run(self) -> None: fix_result.summary, decision, repo=self.repo, fix_elapsed_seconds=fix_elapsed, status=( - "no_op" if is_no_op + "skipped" if fix_result.skipped + else "no_op" if is_no_op else ("succeeded" if fix_result.success else "failed") ), pipeline_mode=fix_result.pipeline_mode or None, @@ -847,6 +891,7 @@ def run(self) -> None: github.approve_pr( self.repo, self.pr_number, f"**Gate (error)** — review failed: {e}. Auto-approving.", + pr_author=self.pr_author, config=self.config, ) if self.check_run_id: github.complete_check_run( @@ -880,6 +925,7 @@ def _run_agent_stage(self, stage_name: str) -> StageResult: lambda: self._spawn_and_wait_agent(stage_name), stage_name, self.config, + cancelled=self._cancelled, ) def _spawn_and_wait_agent(self, stage_name: str) -> StageResult: @@ -904,9 +950,6 @@ def _spawn_and_wait_agent(self, stage_name: str) -> StageResult: deadline = time.monotonic() + timeout while time.monotonic() < deadline: - if self._cancelled.is_set(): - kill_window(pane_id) - return StageResult(stage=stage_name, success=False, data={}, cancelled=True) if result_file.exists(): try: envelope = json.loads(result_file.read_text()) @@ -919,7 +962,18 @@ def _spawn_and_wait_agent(self, stage_name: str) -> StageResult: ) except json.JSONDecodeError: pass - time.sleep(5) + # Wait up to 5s, but wake immediately on cancel. Using + # ``Event.wait`` instead of ``time.sleep`` cuts the + # worst-case cancellation latency in this loop from ~5s + # (next iteration's flag check) to ~0s (return as soon as + # cancel arrives). Net effect on PRs that get superseded + # mid-stage: orchestrator releases its executor slot and + # tears down the tmux window in <1s instead of up to 5s. + if self._cancelled.wait(5): + kill_window(pane_id) + with self._panes_lock: + self._active_panes.pop(stage_name, None) + return StageResult(stage=stage_name, success=False, data={}, cancelled=True) kill_window(pane_id) with self._panes_lock: @@ -932,9 +986,16 @@ def _run_structured_stage(self, stage_name: str) -> StageResult: vars_dict = prompt.build_vars(self.workspace, stage_name, self._env_vars(), self.config) assembled = prompt.safe_substitute(template, vars_dict, f"orchestrator-{stage_name}") return run_with_retry( - lambda: StructuredRunner().run(stage_name, assembled, self.workspace, self.config), + lambda: StructuredRunner().run( + stage_name, + assembled, + self.workspace, + self.config, + cancelled=self._cancelled, + ), stage_name, self.config, + cancelled=self._cancelled, ) # ── Helpers ────────────────────────────────────────────── @@ -996,14 +1057,22 @@ def _save_stage_result(self, stage_name: str, result: StageResult) -> None: if not self.workspace or not result.data: return if self._cancelled.is_set() or not self.workspace.exists(): - logger.info(f"Skipping {stage_name}.json write: review cancelled") + # Split the reason so the audit trail tells "operator/queue + # cancelled this review" apart from "the worktree got torn + # down out from under us mid-stage". Before this split the + # log line always said "review cancelled" regardless of + # which branch fired, which made the May 9 ~2h tails on + # PRs #326 / #330 look like a cancellation bug when the + # real cause was workspace teardown. + reason = "review cancelled" if self._cancelled.is_set() else "workspace gone" + logger.info(f"Skipping {stage_name}.json write: {reason}") return path = self.workspace / f"{stage_name}.json" try: path.write_text(json.dumps(result.data, indent=2)) except (FileNotFoundError, OSError) as e: - if self._cancelled.is_set(): - logger.info(f"{stage_name}.json write failed after cancel: {e}") + if self._cancelled.is_set() or not self.workspace.exists(): + logger.info(f"{stage_name}.json write failed after cancel/teardown: {e}") return raise diff --git a/gate/quota.py b/gate/quota.py index d24468d5..8de1a438 100644 --- a/gate/quota.py +++ b/gate/quota.py @@ -122,8 +122,21 @@ def _fail_open(reason: str, auth_drift: bool = False) -> dict: and the operator can tell "Anthropic is down" apart from "the token expired". A one-shot ntfy alert is fired at most once per 24h so we don't spam the operator (Group 4A). + + Auth-drift WARNINGs are gated on the same 24h marker as the ntfy + alert: the first fail-open in a 24h window logs at WARNING (with a + re-auth hint); subsequent calls in the same window log at DEBUG. + Without this, an expired token produces one WARNING per PR review + indefinitely (audit observed 201 WARNs over 30 days). """ - logger.warning(f"Quota check fail-open: {reason}") + if auth_drift and _auth_drift_marker_fresh(): + logger.debug(f"Quota check fail-open: {reason}") + else: + suffix = ( + " — run `claude auth login --method browser` to refresh" + if auth_drift else "" + ) + logger.warning(f"Quota check fail-open: {reason}{suffix}") if auth_drift: _maybe_alert_auth_drift(reason) return { @@ -143,6 +156,21 @@ def _auth_drift_marker_path() -> Path: return state_dir() / "quota-auth-drift-alerted.txt" +def _auth_drift_marker_fresh() -> bool: + """True if the auth-drift marker was written within the alert cooldown. + + Used by ``_fail_open`` to gate the WARNING-vs-DEBUG decision so an + operator with an expired token doesn't see one WARNING per PR + review. File-based so the gate survives process restarts. + """ + marker = _auth_drift_marker_path() + try: + last = float(marker.read_text().strip() or "0") + except (OSError, ValueError): + return False + return (datetime.now(timezone.utc).timestamp() - last) < _AUTH_DRIFT_ALERT_COOLDOWN_S + + def _maybe_alert_auth_drift(reason: str) -> None: """Fire a single auth-drift alert per 24h window.""" try: diff --git a/gate/reports.py b/gate/reports.py index 9d9bacea..9e529085 100644 --- a/gate/reports.py +++ b/gate/reports.py @@ -13,8 +13,10 @@ {``approve``, ``approve_with_notes``, ``request_changes``, ``error``, ``skip``, ``cancelled``}. * Fix-followup records — ``is_fix_followup: true`` and ``decision`` ∈ - {``fix_succeeded``, ``fix_failed``, ``fix_no_op``}. These do NOT - count as fresh reviews; they're aggregated separately. + {``fix_succeeded``, ``fix_failed``, ``fix_no_op``, ``fix_skipped``}. + These do NOT count as fresh reviews; they're aggregated separately. + ``fix_no_op`` and ``fix_skipped`` are excluded from the success-rate + denominator (neither represents a real fix attempt). Cost tracking is intentionally out of scope for Phase 1 — the ``reviews.jsonl`` writer doesn't emit token/cost fields today (see the @@ -78,7 +80,9 @@ def parse_since(spec: str) -> _dt.timedelta: "approve", "approve_with_notes", "request_changes", "error", "skip", "cancelled", }) -_FIX_DECISIONS = frozenset({"fix_succeeded", "fix_failed", "fix_no_op"}) +_FIX_DECISIONS = frozenset( + {"fix_succeeded", "fix_failed", "fix_no_op", "fix_skipped"} +) def _is_review(row: dict) -> bool: @@ -212,7 +216,13 @@ def summarize( ) succeeded = fix_outcomes.get("fix_succeeded", 0) failed = fix_outcomes.get("fix_failed", 0) - fix_attempts = succeeded + failed # exclude fix_no_op from rate denominator + # Exclude ``fix_no_op`` and ``fix_skipped`` from the rate + # denominator: neither represents an actual fix attempt that + # produced a verdict. ``fix_no_op`` is graceful-no-op + # (approve_with_notes with no mechanical work); ``fix_skipped`` + # is cancellation (supersede / operator cancel / workspace + # teardown) or policy block (cooldown, soft / lifetime limit). + fix_attempts = succeeded + failed fix_rate: float | None = None if fix_attempts: fix_rate = succeeded / fix_attempts diff --git a/gate/runner.py b/gate/runner.py index ef250af2..ee198f2d 100644 --- a/gate/runner.py +++ b/gate/runner.py @@ -512,8 +512,22 @@ def run( prompt_text: str, workspace: Path, config: dict, + cancelled: threading.Event | None = None, ) -> StageResult: - """Run Claude with --print and capture structured JSON output.""" + """Run Claude with --print and capture structured JSON output. + + When ``cancelled`` is provided, a watchdog thread polls the event + and terminates the Claude subprocess if cancellation is requested + mid-flight. Without this, a structured stage that took up to + ``timeouts.structured_stage_s`` (600s in production) seconds to + complete would block cancellation that long. With ``cancelled`` + the worst-case cancel latency is the watchdog poll interval + (~0.5s) plus the subprocess teardown grace period. + + Back-compat: callers passing ``cancelled=None`` (the default) get + the legacy blocking behavior — no watchdog thread, no + ``cancelled=True`` on the result. + """ cmd = ["claude", "--dangerously-skip-permissions", "--print"] model_key = stage.replace("-", "_") @@ -545,36 +559,15 @@ def run( timeout = config.get("timeouts", {}).get("structured_stage_s", 120) try: - proc = subprocess.run( + proc = subprocess.Popen( cmd, - input=prompt_text, - capture_output=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True, env=env, cwd=str(workspace), - timeout=timeout, ) - - if proc.returncode != 0: - stderr = proc.stderr or "" - if _is_rate_limited(stderr): - return StageResult( - stage=stage, success=False, data={}, is_rate_limited=True - ) - if _is_transient(stderr): - return StageResult( - stage=stage, success=False, data={}, is_transient=True - ) - return StageResult.fallback(stage) - - # Parse structured output from stdout - result = self._parse_output(proc.stdout, stage) - if not result: - return StageResult.fallback(stage) - return StageResult(stage=stage, success=True, data=result) - - except subprocess.TimeoutExpired: - return StageResult.fallback(stage) except FileNotFoundError: return StageResult( stage=stage, @@ -583,6 +576,68 @@ def run( error="claude command not found", ) + # Watchdog thread: terminate the subprocess if cancel arrives + # mid-flight. Only spawned when ``cancelled`` is provided so + # legacy call sites (no event) keep their cheaper code path. + stop_watchdog = threading.Event() + + def _watch() -> None: + while not stop_watchdog.wait(0.5): + if ( + cancelled is not None + and cancelled.is_set() + and proc.poll() is None + ): + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + return + + watchdog: threading.Thread | None = None + if cancelled is not None: + watchdog = threading.Thread( + target=_watch, name=f"struct-watch-{stage}", daemon=True + ) + watchdog.start() + + try: + try: + stdout, stderr = proc.communicate(input=prompt_text, timeout=timeout) + except subprocess.TimeoutExpired: + proc.kill() + stdout, stderr = proc.communicate() + return StageResult.fallback(stage) + finally: + stop_watchdog.set() + if watchdog is not None: + watchdog.join(timeout=1.0) + + # If cancellation set the watchdog to terminate the process, the + # return code will be non-zero with truncated output. Distinguish + # this from a real failure so callers don't retry. + if cancelled is not None and cancelled.is_set(): + return StageResult(stage=stage, success=False, data={}, cancelled=True) + + if proc.returncode != 0: + stderr_text = stderr or "" + if _is_rate_limited(stderr_text): + return StageResult( + stage=stage, success=False, data={}, is_rate_limited=True + ) + if _is_transient(stderr_text): + return StageResult( + stage=stage, success=False, data={}, is_transient=True + ) + return StageResult.fallback(stage) + + # Parse structured output from stdout + result = self._parse_output(stdout, stage) + if not result: + return StageResult.fallback(stage) + return StageResult(stage=stage, success=True, data=result) + def _parse_output(self, stdout: str, stage: str) -> dict | None: """Parse Claude's structured output. @@ -611,6 +666,7 @@ def run_with_retry( stage: str, config: dict, max_retries: int | None = None, + cancelled: threading.Event | None = None, ) -> StageResult: """Retry a stage with exponential backoff on rate limits/transient errors. @@ -621,13 +677,35 @@ def run_with_retry( stage: Stage name for fallback generation. config: Config dict with retry settings. max_retries: Override max retries (defaults to config value). + cancelled: Optional threading.Event. When provided, the retry + loop short-circuits between attempts and the back-off + sleeps wake immediately on ``set()``, so a supersede that + arrives during the exponential back-off doesn't burn the + full sleep (worst case today is ~5 min when stacked across + attempts). Back-compat: ``cancelled=None`` preserves the + legacy blocking ``time.sleep`` behavior exactly. """ retry_config = config.get("retry", {}) max_retries = max_retries or retry_config.get("max_retries", 4) base_delay = retry_config.get("base_delay_s", 60) transient_delay = retry_config.get("transient_base_delay_s", 10) + def _sleep_or_cancel(delay: float) -> bool: + """Sleep for ``delay`` seconds, returning True if cancelled. + + Uses ``Event.wait(delay)`` when a cancellation event is wired in + so the sleep can be interrupted; falls back to ``time.sleep`` + otherwise so legacy callers see byte-identical behavior. + """ + if cancelled is not None: + return cancelled.wait(delay) + time.sleep(delay) + return False + for attempt in range(1, max_retries + 1): + if cancelled is not None and cancelled.is_set(): + return StageResult(stage=stage, success=False, cancelled=True) + result = run_fn() if result.success: @@ -642,7 +720,8 @@ def run_with_retry( f"[{stage}] Rate limited, attempt {attempt}/{max_retries}, " f"waiting {delay:.0f}s" ) - time.sleep(delay) + if _sleep_or_cancel(delay): + return StageResult(stage=stage, success=False, cancelled=True) continue if result.is_transient and attempt < max_retries: @@ -651,7 +730,8 @@ def run_with_retry( f"[{stage}] Transient error, attempt {attempt}/{max_retries}, " f"waiting {delay:.0f}s" ) - time.sleep(delay) + if _sleep_or_cancel(delay): + return StageResult(stage=stage, success=False, cancelled=True) continue # Non-retryable error diff --git a/gate/schemas.py b/gate/schemas.py index a5789782..6b0e2bfa 100644 --- a/gate/schemas.py +++ b/gate/schemas.py @@ -319,6 +319,13 @@ class FixResult: sub_scope_empty: int = 0 wall_clock_seconds: int = 0 runaway_guard_hit: bool = False + # True when the fix pipeline intentionally short-circuited without + # attempting any real work — cancellation (supersede / operator + # cancel / workspace teardown) or policy block (cooldown, soft / + # lifetime limit). Loggers and check-run emitters treat these as + # ``fix_skipped`` rather than ``fix_failed`` so reviews.jsonl + # success-rate is not distorted by non-failures. + skipped: bool = False # Senior-authored commit message telemetry. ``"senior"`` when Gate # accepted the hopper-mode ``final_commit_message``, ``"synth"`` when # validation rejected it and Gate's template was used instead. Empty diff --git a/gate/tui.py b/gate/tui.py index 35cbf1f5..46fbe5ce 100644 --- a/gate/tui.py +++ b/gate/tui.py @@ -119,6 +119,12 @@ def _sanitize_pane_line(line: str, width: int = 72) -> str: # clean). Without an explicit entry it fell through to the "?" # fallback, which looked like an error in the reviews table. "fix_no_op": "⚒—", + # ``fix_skipped`` is emitted when the fix pipeline short-circuited + # before attempting work — cancellation (supersede / operator + # cancel / workspace teardown) or policy (cooldown, soft / lifetime + # limit). Excluded from the success-rate denominator in + # ``gate.reports`` since neither outcome is a real attempt. + "fix_skipped": "⚒○", } DECISION_COLORS = { @@ -130,6 +136,7 @@ def _sanitize_pane_line(line: str, width: int = 72) -> str: "fix_succeeded": "bright_green", "fix_failed": "bright_yellow", "fix_no_op": "dim", + "fix_skipped": "dim", } STAGE_COLORS = { @@ -1363,15 +1370,37 @@ def _poll_log(self, reviews: list[dict] | None = None) -> None: try: w = self.query_one(f"#{pane_info['widget_id']}", RichLog) container = w.parent - prev_sep = container.previous_sibling if container else None - if prev_sep and "log-pane-separator" in (prev_sep.classes or set()): - prev_sep.remove() - elif container: - next_sep = container.next_sibling - if next_sep and "log-pane-separator" in (next_sep.classes or set()): - next_sep.remove() - if container: - container.remove() + if container is None: + del self._log_panes[key] + continue + # Look up the container's position via the public + # ``children`` collection and remove an adjacent + # separator if present. Textual's Widget has no + # ``previous_sibling`` attribute (the audit observed + # 40+ AttributeError crashes from probing it); the + # prior code's intent — find the separator paired + # with this pane — survives via index lookup. + siblings = list(panes_container.children) + idx = siblings.index(container) + # Panes after the first are mounted preceded by a + # separator (see add path below). Removing the + # ``before`` separator is the canonical path; for + # the first pane (idx == 0) fall back to the + # following separator so we don't leave a stale + # leading divider. + if ( + idx > 0 + and "log-pane-separator" + in (siblings[idx - 1].classes or set()) + ): + siblings[idx - 1].remove() + elif ( + idx + 1 < len(siblings) + and "log-pane-separator" + in (siblings[idx + 1].classes or set()) + ): + siblings[idx + 1].remove() + container.remove() del self._log_panes[key] except Exception: logger.exception("Failed to remove log pane %s, rebuilding", key) diff --git a/tests/test_fixer.py b/tests/test_fixer.py index f0b1b3ac..1e6673af 100644 --- a/tests/test_fixer.py +++ b/tests/test_fixer.py @@ -1789,3 +1789,84 @@ def test_bootstrap_success_first_attempt( assert mock_bs.call_count == 1 assert pipe.codex_thread_id == "thread-abc" mock_notify.codex_unavailable.assert_not_called() + + +class TestFixResultSkipped: + """Audit P3.1: cancellation and policy-block branches return + ``FixResult(skipped=True)`` rather than the legacy + ``success=False``-with-no-skipped-flag that ``log_fix_result`` + mapped to ``decision: "fix_failed"``. PR #340 hit the cooldown + branch on May 13 2026 and was logged as a spurious ``fix_failed``, + which is what motivated this distinction. + """ + + def _make_pipeline(self, sample_config, tmp_path): + findings = [ + {"severity": "warning", "message": "test", "file": "a.ts", "line": 1} + ] + verdict = {"decision": "request_changes", "findings": findings} + (tmp_path / "verdict.json").write_text("{}") + (tmp_path / "triage.json").write_text("{}") + return FixPipeline( + pr_number=1, repo="a/b", workspace=tmp_path, + verdict=verdict, build={}, config=sample_config, + ) + + def test_cancelled_before_start_returns_skipped( + self, sample_config, tmp_path + ): + """Pre-set cancel event → ``skipped=True`` (was ``failed``).""" + pipe = self._make_pipeline(sample_config, tmp_path) + pipe._cancelled.set() + result = pipe.run() + assert result.success is False + assert result.skipped is True + assert "Cancelled" in result.summary + + @patch("gate.fixer.github") + @patch("gate.fixer.state.check_fix_limits") + def test_fix_cooldown_returns_skipped( + self, mock_limits, mock_gh, sample_config, tmp_path + ): + """``check_fix_limits`` returning False for cooldown maps to + ``skipped=True``. This is the exact PR #340 case.""" + mock_limits.return_value = (False, "Fix cooldown active (268s remaining)") + pipe = self._make_pipeline(sample_config, tmp_path) + result = pipe.run() + assert result.success is False + assert result.skipped is True + assert "cooldown" in result.reason.lower() + + @patch("gate.fixer.github") + @patch("gate.fixer.state.check_fix_limits") + def test_soft_limit_returns_skipped( + self, mock_limits, mock_gh, sample_config, tmp_path + ): + """Soft / lifetime limit hits are policy decisions, not + failures — share the same ``skipped`` mapping as the cooldown. + """ + mock_limits.return_value = (False, "Soft fix limit reached (3 this review cycle)") + pipe = self._make_pipeline(sample_config, tmp_path) + result = pipe.run() + assert result.success is False + assert result.skipped is True + + @patch("gate.fixer.github") + @patch("gate.fixer.state.check_fix_limits") + def test_lifetime_limit_returns_skipped( + self, mock_limits, mock_gh, sample_config, tmp_path + ): + mock_limits.return_value = (False, "Lifetime fix limit reached (6)") + pipe = self._make_pipeline(sample_config, tmp_path) + result = pipe.run() + assert result.success is False + assert result.skipped is True + + def test_skipped_default_false_on_real_failure_paths(self): + """Belt-and-suspenders: a default-constructed ``FixResult`` does + not accidentally claim ``skipped=True``. The flag must be + opt-in or the orchestrator will mis-route real failures. + """ + from gate.schemas import FixResult + result = FixResult(success=False, summary="something bad") + assert result.skipped is False diff --git a/tests/test_github.py b/tests/test_github.py index af19dfe8..377329cc 100644 --- a/tests/test_github.py +++ b/tests/test_github.py @@ -408,6 +408,53 @@ def test_swallows_other_errors_without_commenting( approve_pr("owner/repo", 42, "Looks good") mock_comment.assert_not_called() + # ── Bot-authored PR routing (audit P2.3) ───────────────── + + @patch("gate.github.comment_pr") + @patch("gate.github._gh") + def test_bot_authored_pr_skips_gh_review_and_comments_directly( + self, mock_gh, mock_comment + ): + """When ``pr_author == bot_account``, we know GitHub will reject + the self-review GraphQL call. Skip the wasted round-trip and + post as a comment directly. + """ + config = {"repo": {"bot_account": "openlawbot"}} + approve_pr( + "owner/repo", 42, "Looks good", + pr_author="openlawbot", config=config, + ) + mock_gh.assert_not_called() + mock_comment.assert_called_once_with("owner/repo", 42, "Looks good") + + @patch("gate.github.comment_pr") + @patch("gate.github._gh") + def test_human_pr_still_calls_gh_review(self, mock_gh, mock_comment): + config = {"repo": {"bot_account": "openlawbot"}} + mock_gh.return_value = "" + approve_pr( + "owner/repo", 42, "Looks good", + pr_author="alice", config=config, + ) + mock_gh.assert_called_once() + mock_comment.assert_not_called() + + @patch("gate.github.comment_pr") + @patch("gate.github._gh") + def test_empty_pr_author_falls_through_to_legacy_path( + self, mock_gh, mock_comment + ): + """``pr_author=""`` (the default) preserves legacy behavior — + the helper still attempts ``gh pr review`` and relies on the + stderr fallback if the bot is actually the author. This is the + path used by health.py orphan cleanup, which doesn't have + pr_author readily available. + """ + config = {"repo": {"bot_account": "openlawbot"}} + mock_gh.return_value = "" + approve_pr("owner/repo", 42, "Looks good", config=config) + mock_gh.assert_called_once() + class TestPostReview: @patch("gate.github.upsert_sticky_summary") @@ -435,6 +482,74 @@ def test_request_changes_falls_back_to_comment_when_own_pr_blocked( assert "Changes requested" in args[2] mock_sticky.assert_called_once_with("owner/repo", 42, verdict, None) + # ── Bot-authored PR routing (audit P2.3) ───────────────── + + @patch("gate.github.upsert_sticky_summary") + @patch("gate.github.comment_pr") + @patch("gate.github._gh") + def test_bot_authored_request_changes_skips_gh_review( + self, mock_gh, mock_comment, mock_sticky + ): + """Bot-authored PR + request_changes: post as a comment up front, + do not attempt the failing ``gh pr review --request-changes``. + Escalation still runs (label + reviewer) because humans need to + be paged regardless of who opened the PR. + """ + config = {"repo": {"bot_account": "openlawbot", "escalation_reviewers": ""}} + verdict = { + "decision": "request_changes", + "confidence": "high", + "summary": "Needs fixes", + "findings": [{"severity": "error", "file": "x.py", "message": "bug"}], + "stats": {"stages_run": 4}, + } + + post_review( + "owner/repo", 42, verdict, None, "deadbeef", + config=config, pr_author="openlawbot", + ) + + # No `pr review` calls at all — the bot path bypasses gh entirely + # for the review step. (Other `_gh` calls — e.g. escalation + # labels — are also gated, since no reviewers are configured.) + review_calls = [ + c for c in mock_gh.call_args_list + if c.args and c.args[0][:2] == ["pr", "review"] + ] + assert review_calls == [] + mock_comment.assert_called_once() + mock_sticky.assert_called_once() + + @patch("gate.github.upsert_sticky_summary") + @patch("gate.github.comment_pr") + @patch("gate.github._gh") + def test_bot_authored_approve_skips_gh_review( + self, mock_gh, mock_comment, mock_sticky + ): + """Bot-authored PR + approve: same routing — comment, no + ``gh pr review --approve`` round-trip. + """ + config = {"repo": {"bot_account": "openlawbot"}} + verdict = { + "decision": "approve", + "confidence": "high", + "summary": "LGTM", + "findings": [], + "stats": {"stages_run": 4}, + } + + post_review( + "owner/repo", 42, verdict, None, "deadbeef", + config=config, pr_author="openlawbot", + ) + + review_calls = [ + c for c in mock_gh.call_args_list + if c.args and c.args[0][:2] == ["pr", "review"] + ] + assert review_calls == [] + mock_comment.assert_called_once() + class TestCommentPr: @patch("gate.github._gh") diff --git a/tests/test_logger.py b/tests/test_logger.py index d2bb74de..5ac90421 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -188,6 +188,44 @@ def test_fix_result_omits_hopper_kwargs_when_not_set(self, tmp_path): ): assert key not in entry + def test_fix_result_skipped_writes_fix_skipped_decision(self, tmp_path): + """Audit P3.1: ``status="skipped"`` maps to ``decision: "fix_skipped"``. + + Cooldown / soft-limit / lifetime-limit / cancellation outcomes + previously logged as ``fix_failed`` (PR #340 nit on May 13). + After the fix, downstream consumers see them as a distinct + decision excluded from the success-rate denominator. + """ + logs_dir = tmp_path / "logs" + logs_dir.mkdir() + jsonl = logs_dir / "reviews.jsonl" + + with _patch_logs(logs_dir): + log_fix_result( + 340, False, "Fix cooldown active (268s remaining)", + "approve_with_notes", + repo="org/repo", + fix_elapsed_seconds=1, + status="skipped", + ) + entry = json.loads(jsonl.read_text().strip()) + assert entry["decision"] == "fix_skipped" + assert entry["is_fix_followup"] is True + assert entry["fix_summary"].startswith("Fix cooldown") + + def test_fix_result_no_op_unchanged_after_skipped_added(self, tmp_path): + """Regression: adding ``skipped`` to the decision map must not + silently change the legacy ``no_op`` mapping. + """ + logs_dir = tmp_path / "logs" + logs_dir.mkdir() + jsonl = logs_dir / "reviews.jsonl" + + with _patch_logs(logs_dir): + log_fix_result(7, True, "no mechanical work", "approve_with_notes", status="no_op") + entry = json.loads(jsonl.read_text().strip()) + assert entry["decision"] == "fix_no_op" + class TestWriteSidecarMeta: def test_writes_meta_file(self, tmp_path): diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 7e269592..765f308c 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -374,6 +374,55 @@ def test_writes_when_healthy(self, orchestrator, tmp_path): orchestrator._save_stage_result("triage", result) assert (tmp_path / "triage.json").exists() + # ── Misleading-message split (audit P2.2b) ─────────────── + + def test_log_message_says_cancelled_when_cancel_set( + self, orchestrator, tmp_path, caplog + ): + """Cancel-set branch logs ``review cancelled``.""" + import logging + + from gate.schemas import StageResult + + orchestrator.workspace = tmp_path + orchestrator._cancelled.set() + result = StageResult(stage="triage", success=True, data={"ok": True}) + + with caplog.at_level(logging.INFO, logger="gate.orchestrator"): + orchestrator._save_stage_result("triage", result) + + skip_msgs = [ + r.message for r in caplog.records + if "Skipping" in r.message and "triage.json" in r.message + ] + assert any("review cancelled" in m for m in skip_msgs) + + def test_log_message_says_workspace_gone_when_only_workspace_missing( + self, orchestrator, tmp_path, caplog + ): + """Workspace-missing branch logs ``workspace gone`` — the + misleading ``review cancelled`` message is what made the May 9 + 2-hour tails on PR #326/#330 look like a cancellation bug when + the real cause was workspace teardown. + """ + import logging + + from gate.schemas import StageResult + + # Workspace path that doesn't exist; cancel flag stays UNSET. + orchestrator.workspace = tmp_path / "ghost" + result = StageResult(stage="triage", success=True, data={"ok": True}) + + with caplog.at_level(logging.INFO, logger="gate.orchestrator"): + orchestrator._save_stage_result("triage", result) + + skip_msgs = [ + r.message for r in caplog.records + if "Skipping" in r.message and "triage.json" in r.message + ] + assert any("workspace gone" in m for m in skip_msgs) + assert not any("review cancelled" in m for m in skip_msgs) + # ── Gate 1: Labels ─────────────────────────────────────────── diff --git a/tests/test_quota.py b/tests/test_quota.py index 99d00eb0..cd716e08 100644 --- a/tests/test_quota.py +++ b/tests/test_quota.py @@ -1,8 +1,11 @@ """Tests for gate.quota module.""" +import logging +import time from unittest.mock import patch from gate.quota import ( + _AUTH_DRIFT_ALERT_COOLDOWN_S, _fail_open, _read_cache, _write_cache, @@ -17,6 +20,107 @@ def test_returns_quota_ok(self): assert result["quota_ok"] is True assert "fail-open" in result["reason"] + # ── Auth-drift 401 noise gating (audit P1.2) ───────────── + + def test_auth_drift_first_401_logs_warning_with_reauth_hint( + self, tmp_path, caplog + ): + """First 401 in a 24h window logs at WARNING with re-auth hint.""" + marker = tmp_path / "quota-auth-drift-alerted.txt" + with ( + patch("gate.quota._auth_drift_marker_path", lambda: marker), + # Skip the actual notify so caplog only captures our log. + patch("gate.quota._maybe_alert_auth_drift"), + caplog.at_level(logging.WARNING, logger="gate.quota"), + ): + result = _fail_open("HTTP 401 from usage API", auth_drift=True) + + assert result["auth_drift"] is True + warnings = [ + r for r in caplog.records + if r.levelname == "WARNING" and "fail-open" in r.message + ] + assert len(warnings) == 1 + assert "claude auth login" in warnings[0].message + + def test_auth_drift_repeat_401_within_cooldown_logs_debug( + self, tmp_path, caplog + ): + """Second+ 401 within 24h logs at DEBUG, not WARNING. + + Prevents the 200+ identical WARNINGs observed in production + (audit, May 13 2026: 201 fail-open lines in 30 days). + """ + marker = tmp_path / "quota-auth-drift-alerted.txt" + # Pretend we already alerted 5 minutes ago. + marker.write_text(str(time.time() - 300)) + + with ( + patch("gate.quota._auth_drift_marker_path", lambda: marker), + patch("gate.quota._maybe_alert_auth_drift"), + caplog.at_level(logging.DEBUG, logger="gate.quota"), + ): + _fail_open("HTTP 401 from usage API", auth_drift=True) + + # No WARNING for the fail-open line; the message landed at DEBUG. + warnings = [ + r for r in caplog.records + if r.levelname == "WARNING" and "fail-open" in r.message + ] + assert warnings == [] + debugs = [ + r for r in caplog.records + if r.levelname == "DEBUG" and "fail-open" in r.message + ] + assert len(debugs) == 1 + + def test_auth_drift_after_marker_expires_logs_warning_again( + self, tmp_path, caplog + ): + """When the marker is older than the 24h cooldown, the next 401 + is WARNING again. Ensures the operator still gets a daily ping. + """ + marker = tmp_path / "quota-auth-drift-alerted.txt" + # Marker is stale (older than the cooldown). + marker.write_text(str(time.time() - _AUTH_DRIFT_ALERT_COOLDOWN_S - 60)) + + with ( + patch("gate.quota._auth_drift_marker_path", lambda: marker), + patch("gate.quota._maybe_alert_auth_drift"), + caplog.at_level(logging.WARNING, logger="gate.quota"), + ): + _fail_open("HTTP 401 from usage API", auth_drift=True) + + warnings = [ + r for r in caplog.records + if r.levelname == "WARNING" and "fail-open" in r.message + ] + assert len(warnings) == 1 + + def test_non_auth_failure_always_warns_regardless_of_marker( + self, tmp_path, caplog + ): + """The marker only gates auth-drift fail-opens. Non-auth + failures (network errors, JSON parse errors, etc.) should still + log at WARNING every time so the operator can see them. + """ + marker = tmp_path / "quota-auth-drift-alerted.txt" + marker.write_text(str(time.time())) + + with ( + patch("gate.quota._auth_drift_marker_path", lambda: marker), + caplog.at_level(logging.WARNING, logger="gate.quota"), + ): + _fail_open("network error", auth_drift=False) + + warnings = [ + r for r in caplog.records + if r.levelname == "WARNING" and "fail-open" in r.message + ] + assert len(warnings) == 1 + # No re-auth hint on non-auth failures. + assert "claude auth login" not in warnings[0].message + class TestCache: def test_write_and_read(self, tmp_path): diff --git a/tests/test_reports.py b/tests/test_reports.py index 8fc85567..1c00b0dd 100644 --- a/tests/test_reports.py +++ b/tests/test_reports.py @@ -171,6 +171,47 @@ def test_fix_no_op_excluded_from_rate_denominator(self): report = summarize(rows) assert report.fix_success_rate == 1.0 + def test_fix_skipped_excluded_from_rate_denominator(self): + """Audit P3.1: ``fix_skipped`` (cooldown / cancellation / limit) + is not a real fix attempt and must not drag the success rate + down. Same treatment as ``fix_no_op``. + """ + rows = [ + _fix("fix_succeeded"), + _fix("fix_skipped"), + _fix("fix_skipped"), + _fix("fix_skipped"), + ] + report = summarize(rows) + assert report.fix_success_rate == 1.0 + assert report.fix_outcomes.get("fix_skipped") == 3 + + def test_fix_skipped_classified_as_fix_followup(self): + """``_is_fix_followup`` must recognise the new ``fix_skipped`` + decision so the row goes into the fix-outcomes bucket rather + than ``decisions_by_repo``. + """ + rows = [_review("approve"), _fix("fix_skipped")] + report = summarize(rows) + assert report.total_reviews == 1 + assert report.total_fix_followups == 1 + assert report.fix_outcomes.get("fix_skipped") == 1 + + def test_fix_rate_with_mixed_outcomes_excludes_skipped_and_no_op(self): + """Belt-and-suspenders: explicitly verify rate = 2/3, not 2/6, + when fix_succeeded=2, fix_failed=1, fix_no_op=2, fix_skipped=1. + """ + rows = [ + _fix("fix_succeeded"), + _fix("fix_succeeded"), + _fix("fix_failed"), + _fix("fix_no_op"), + _fix("fix_no_op"), + _fix("fix_skipped"), + ] + report = summarize(rows) + assert report.fix_success_rate == 2 / 3 + def test_review_time_avg_and_p95(self): rows = [_review("approve", seconds=10), _review("approve", seconds=20), _review("approve", seconds=30)] diff --git a/tests/test_runner.py b/tests/test_runner.py index e34a6698..5ff3b10d 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -5,7 +5,10 @@ """ import json -from unittest.mock import patch +import subprocess +import threading +import time +from unittest.mock import MagicMock, patch from gate.runner import ( ReviewRunner, @@ -183,47 +186,69 @@ def test_parse_output_empty(self): assert runner._parse_output("", "triage") is None assert runner._parse_output(" ", "triage") is None - @patch("gate.runner.subprocess.run") - def test_run_passes_prompt_via_stdin_not_argv(self, mock_run, tmp_path): + @patch("gate.runner.subprocess.Popen") + def test_run_passes_prompt_via_stdin_not_argv(self, mock_popen, tmp_path): """Regression test for ARG_MAX overflow on PRs with huge diffs. - Before this fix, the assembled prompt was appended to argv with - ``cmd.append(prompt_text)`` and any prompt larger than macOS - ARG_MAX (~1 MB) raised + Before the original ARG_MAX fix, the assembled prompt was + appended to argv with ``cmd.append(prompt_text)`` and any + prompt larger than macOS ARG_MAX (~1 MB) raised ``OSError: [Errno 7] Argument list too long: 'claude'`` inside ``_execute_child`` before claude even started — silently failing the structured stage with no actionable error. - adin-chat PR #261 (554 files, 5.5 MB diff) was the original repro. + adin-chat PR #261 (554 files, 5.5 MB diff) was the original + repro. After the May 2026 cancellation refactor (audit P2.1) + we run claude via ``Popen`` + ``communicate(input=...)`` so a + watchdog thread can ``terminate()`` mid-flight on cancel; + the prompt-via-stdin contract is preserved. """ - mock_run.return_value = type( - "P", (), {"returncode": 0, "stdout": '{"change_type":"x"}', "stderr": ""} - )() + proc = MagicMock() + proc.communicate.return_value = ('{"change_type":"x"}', "") + proc.returncode = 0 + mock_popen.return_value = proc + runner = StructuredRunner() prompt = "x" * 5_500_000 # 5.5 MB — same scale as PR #261 runner.run("triage", prompt, tmp_path, {"models": {}, "timeouts": {}}) - call = mock_run.call_args - cmd = call.args[0] + + popen_call = mock_popen.call_args + cmd = popen_call.args[0] assert prompt not in cmd, ( "prompt must NOT be appended to argv (would hit ARG_MAX on big PRs); " - "pass it via input= instead" + "pass it via communicate(input=...) instead" + ) + # The prompt is delivered through ``communicate(input=prompt)``, + # which requires ``stdin=PIPE`` on the Popen call so the child + # has a pipe to read from. Verify both halves of that contract. + assert popen_call.kwargs.get("stdin") == subprocess.PIPE, ( + "Popen must set stdin=PIPE so communicate(input=...) has somewhere to write" ) - assert call.kwargs.get("input") == prompt, "prompt must be piped via stdin" + proc.communicate.assert_called_once() + comm_kwargs = proc.communicate.call_args.kwargs + assert comm_kwargs.get("input") == prompt, "prompt must be piped via stdin" assert "--print" in cmd, "claude must run with --print so it reads stdin" - @patch("gate.runner.subprocess.run") - def test_run_does_not_set_stdin_kwarg(self, mock_run, tmp_path): - """``input=`` and ``stdin=`` are mutually exclusive in subprocess.run. + @patch("gate.runner.subprocess.Popen") + def test_run_sets_stdin_pipe_for_communicate(self, mock_popen, tmp_path): + """``Popen(stdin=PIPE)`` is required for ``communicate(input=...)``. - Setting both raises ValueError at runtime. Pin the contract so - nobody re-adds an explicit ``stdin=DEVNULL``. + Pins the contract that the prompt-via-stdin path uses a pipe + (not DEVNULL, not file). Replaces the prior ``test_run_does_not_ + set_stdin_kwarg`` regression which only made sense with + ``subprocess.run(input=...)``. """ - mock_run.return_value = type( - "P", (), {"returncode": 0, "stdout": "{}", "stderr": ""} - )() + proc = MagicMock() + proc.communicate.return_value = ("{}", "") + proc.returncode = 0 + mock_popen.return_value = proc + runner = StructuredRunner() runner.run("triage", "tiny prompt", tmp_path, {"models": {}, "timeouts": {}}) - assert "stdin" not in mock_run.call_args.kwargs + + assert mock_popen.call_args.kwargs.get("stdin") == subprocess.PIPE + assert mock_popen.call_args.kwargs.get("stdout") == subprocess.PIPE + assert mock_popen.call_args.kwargs.get("stderr") == subprocess.PIPE class TestRunWithRetry: @@ -284,6 +309,206 @@ def test_cancelled_result_not_retried(self, sample_config): result = run_with_retry(lambda: cancelled, "triage", sample_config) assert result.cancelled is True + # ── Cancellation event plumbing (audit P2.1) ───────────── + + def test_cancelled_event_short_circuits_before_first_attempt( + self, sample_config + ): + """Pre-set cancel event causes early return before run_fn runs. + + Mirrors the orchestrator behavior where a queue supersede can + flip ``_cancelled`` between stage selection and the stage's + first attempt. Without this gate the orchestrator burns one + full stage attempt's worth of time after the cancel. + """ + cancelled = threading.Event() + cancelled.set() + called = [0] + + def run_fn(): + called[0] += 1 + return StageResult(stage="triage", success=True, data={"key": "val"}) + + result = run_with_retry( + run_fn, "triage", sample_config, cancelled=cancelled + ) + assert called[0] == 0 + assert result.cancelled is True + assert result.success is False + + def test_cancelled_event_breaks_rate_limit_retry_sleep(self, sample_config): + """Setting the event mid retry-sleep wakes the wait immediately. + + Without ``Event.wait``, the rate-limit back-off uses + ``time.sleep(60+)`` which blocks supersede latency by minutes. + With the event, ``wait(delay)`` returns True the moment the + event is set from another thread. + """ + # Force a short base delay so the test can complete fast. + config = {"retry": {"max_retries": 2, "base_delay_s": 5}} + cancelled = threading.Event() + rate_limited = StageResult( + stage="triage", success=False, is_rate_limited=True + ) + + # Fire the cancel ~50ms into the rate-limit sleep. + def _cancel_soon(): + time.sleep(0.05) + cancelled.set() + + threading.Thread(target=_cancel_soon, daemon=True).start() + + start = time.monotonic() + result = run_with_retry( + lambda: rate_limited, "triage", config, cancelled=cancelled + ) + elapsed = time.monotonic() - start + + assert result.cancelled is True + assert elapsed < 1.0, ( + f"cancel during rate-limit sleep should wake in <1s, took {elapsed:.2f}s" + ) + + def test_cancelled_event_breaks_transient_retry_sleep(self, sample_config): + """Same shape as the rate-limit test but for the transient branch.""" + config = { + "retry": {"max_retries": 2, "base_delay_s": 60, "transient_base_delay_s": 5} + } + cancelled = threading.Event() + transient = StageResult(stage="triage", success=False, is_transient=True) + + def _cancel_soon(): + time.sleep(0.05) + cancelled.set() + + threading.Thread(target=_cancel_soon, daemon=True).start() + + start = time.monotonic() + result = run_with_retry( + lambda: transient, "triage", config, cancelled=cancelled + ) + elapsed = time.monotonic() - start + + assert result.cancelled is True + assert elapsed < 1.0, ( + f"cancel during transient sleep should wake in <1s, took {elapsed:.2f}s" + ) + + def test_cancelled_none_preserves_legacy_behavior(self, sample_config): + """Back-compat: ``cancelled=None`` (the default) uses time.sleep. + + Legacy callers that haven't been updated must see byte-identical + behavior. Verified by patching ``time.sleep`` and asserting it + was invoked (the Event.wait path would skip the patch). + """ + rate_limited = StageResult( + stage="triage", success=False, is_rate_limited=True + ) + + with patch("gate.runner.time.sleep") as mock_sleep: + run_with_retry(lambda: rate_limited, "triage", sample_config) + assert mock_sleep.called, "legacy callers must still use time.sleep" + + +class TestStructuredRunnerCancellation: + """Watchdog-based cancellation for the structured stage subprocess. + + Audit P2.1: ``StructuredRunner.run`` previously blocked up to + ``structured_stage_s`` (600s in production config) on a single + attempt. With the watchdog wired in, cancellation arriving mid-call + terminates the Claude subprocess within ~0.5s. + """ + + @patch("gate.runner.subprocess.Popen") + def test_cancelled_event_terminates_subprocess(self, mock_popen, tmp_path): + cancelled = threading.Event() + + proc = MagicMock() + # Simulate Claude taking forever: communicate() blocks until cancel + # is set, then unblocks (mimicking proc.terminate()). + comm_done = threading.Event() + + def _communicate(input=None, timeout=None): + # Wait for the watchdog to call terminate(), then return. + comm_done.wait(timeout=3.0) + return ("", "killed by cancel") + + def _terminate(): + comm_done.set() + + proc.communicate.side_effect = _communicate + proc.terminate.side_effect = _terminate + proc.kill = MagicMock() + proc.poll.return_value = None + proc.returncode = -15 # SIGTERM + proc.wait.return_value = -15 + mock_popen.return_value = proc + + # Fire cancel ~100ms after run() starts. + def _cancel_soon(): + time.sleep(0.1) + cancelled.set() + + threading.Thread(target=_cancel_soon, daemon=True).start() + + runner = StructuredRunner() + result = runner.run( + "triage", "prompt", tmp_path, + {"models": {}, "timeouts": {}}, + cancelled=cancelled, + ) + + assert result.cancelled is True + assert proc.terminate.called, "watchdog must call terminate() on cancel" + + @patch("gate.runner.subprocess.Popen") + def test_cancelled_after_completion_returns_normal_result( + self, mock_popen, tmp_path + ): + """If cancel arrives after communicate() returns cleanly, the + normal parse path runs — we don't retroactively mark success as + cancelled. + """ + cancelled = threading.Event() + + proc = MagicMock() + proc.communicate.return_value = ('{"change_type":"refactor"}', "") + proc.returncode = 0 + proc.poll.return_value = 0 + mock_popen.return_value = proc + + runner = StructuredRunner() + result = runner.run( + "triage", "prompt", tmp_path, + {"models": {}, "timeouts": {}}, + cancelled=cancelled, + ) + + # Cancel set AFTER run completes — should not affect result. + cancelled.set() + + assert result.cancelled is False + assert result.success is True + + @patch("gate.runner.subprocess.Popen") + def test_no_watchdog_when_cancelled_is_none(self, mock_popen, tmp_path): + """Legacy callers (``cancelled=None``) get the cheaper code path — + no watchdog thread, no terminate() machinery. + """ + proc = MagicMock() + proc.communicate.return_value = ('{"x": 1}', "") + proc.returncode = 0 + mock_popen.return_value = proc + + runner = StructuredRunner() + # We can't directly assert "no thread spawned" without + # introspection, but we can assert terminate is not called. + result = runner.run( + "triage", "prompt", tmp_path, {"models": {}, "timeouts": {}} + ) + assert proc.terminate.called is False + assert result.success is True + class TestReviewRunnerHandleSignalSweep: """Fix 2d: the runner's shutdown handler must ``pkill -TERM -P``