From c0bbdcdb64c03d5566a3c37c87009ff406929e59 Mon Sep 17 00:00:00 2001 From: gf5901 Date: Mon, 23 Mar 2026 02:46:27 +0000 Subject: [PATCH 1/2] fix: set child subtasks to completed (not in_review) and improve comment reply resilience Child subtasks created for UI visibility during planning don't have their own PRs, so setting them to in_review was semantically wrong and confusing. Now only the parent task gets in_review when a PR is created; subtasks always get completed. Also improves comment reply handling: - Add COMMENT_REPLY_TEXT_ONLY_PROMPT for when worktree cannot be recreated, so the agent doesn't falsely claim it has code access - Add better logging/diagnostics throughout _get_or_create_reply_worktree when recreation fails, making silent tmpdir fallback more visible - Add _maybe_rebase_for_merge_conflicts: when a comment mentions merge conflicts and the worktree is recreated, attempt git rebase onto the base branch before handing off to the agent - Pass comment_body to worktree helper to enable conflict-aware rebase - Add 9 new tests covering subtask status, text-only prompt fallback, worktree recreation, and rebase logic Made-with: Cursor --- src/pipeline.py | 126 +++++++++++++-- tests/test_runner_helpers.py | 304 ++++++++++++++++++++++++++++++++++- 2 files changed, 412 insertions(+), 18 deletions(-) diff --git a/src/pipeline.py b/src/pipeline.py index ce4dfb4..0383ff1 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -191,6 +191,34 @@ def _build_role_options(): Respond concisely.""" ) +COMMENT_REPLY_TEXT_ONLY_PROMPT = ( + SECURITY_PREFIX + + """\ +You are responding to a comment on a task you previously worked on. A user has posted a new \ +comment that requires your attention. Note: you do NOT have access to the code or git worktree \ +for this task — the worktree was cleaned up after the PR was created, and could not be \ +recreated. You can only provide a text response. + +## Task +**%s** +%s +%s +## Latest Comment (from %s) +%s + +## Your Instructions +Respond to the comment above with guidance, explanations, or next steps. Since you cannot \ +read or edit files directly, focus on: +- Answering questions about the changes you made +- Explaining your approach or reasoning +- Suggesting specific commands or code changes the user can make manually +- If the comment asks for code changes (CI fixes, merge conflicts, etc.), provide detailed \ +instructions the user can follow, or suggest re-running the task + +Do NOT claim you can edit files or that you have access to the worktree — you don't. \ +Respond concisely.""" +) + HUMAN_TASK_REPLY_PROMPT = ( SECURITY_PREFIX + """\ @@ -667,7 +695,7 @@ def run_directive(store, project_id, directive_sk): return True -def _get_or_create_reply_worktree(task): +def _get_or_create_reply_worktree(task, comment_body=""): # type: (...) -> tuple """Return (wt_path, created_fresh) for the task's worktree. @@ -705,10 +733,22 @@ def _get_or_create_reply_worktree(task): try: repo_dir = _resolve_repo_dir(task) except Exception: + log.warning( + "Comment reply: cannot resolve repo dir for task %s (target_repo=%s) — " + "falling back to text-only reply", + task.id, + getattr(task, "target_repo", ""), + ) return None, False try: - _run_cmd(["git", "fetch", "origin"], cwd=repo_dir, timeout=60) + fetch = _run_cmd(["git", "fetch", "origin"], cwd=repo_dir, timeout=60) + if fetch.returncode != 0: + log.warning( + "Comment reply: git fetch failed for task %s: %s", + task.id, + fetch.stderr[:200], + ) slug = _slugify_branch(task.title) branch = "task/%s-%s" % (task.id, slug) WORKTREE_BASE.mkdir(parents=True, exist_ok=True) @@ -718,7 +758,16 @@ def _get_or_create_reply_worktree(task): ) if result.returncode == 0: log.info("Comment reply: re-created worktree at %s on branch %s", wt_path, branch) + _maybe_rebase_for_merge_conflicts(wt_path, repo_dir, comment_body) return wt_path, True + + log.info( + "Comment reply: worktree add on branch '%s' failed (rc=%d, stderr=%s) — " + "trying fresh branch from default", + branch, + result.returncode, + result.stderr[:200], + ) # Branch doesn't exist remotely — fall back to default branch from .worktree import _get_default_branch @@ -730,12 +779,53 @@ def _get_or_create_reply_worktree(task): if result2.returncode == 0: log.info("Comment reply: created fresh worktree at %s on %s", wt_path, default_branch) return wt_path, True + + log.warning( + "Comment reply: all worktree creation attempts failed for task %s — " + "falling back to text-only reply (last stderr=%s)", + task.id, + result2.stderr[:200], + ) except Exception: - log.exception("Comment reply: failed to create worktree for task %s", task.id) + log.exception( + "Comment reply: unexpected error creating worktree for task %s — " + "falling back to text-only reply", + task.id, + ) return None, False +def _maybe_rebase_for_merge_conflicts(wt_path, repo_dir, comment_body): + # type: (str, str, str) -> None + """If the comment mentions merge conflicts, attempt a rebase onto the base branch.""" + if not comment_body: + return + lower = comment_body.lower() + conflict_keywords = ("merge conflict", "merge conflicts", "conflicts with", "cannot be merged") + if not any(kw in lower for kw in conflict_keywords): + return + + from .worktree import _get_default_branch + + default_branch = _get_default_branch(repo_dir) + log.info("Comment reply: comment mentions merge conflicts — attempting rebase onto %s", default_branch) + rebase = _run_cmd( + ["git", "rebase", "origin/%s" % default_branch], + cwd=wt_path, + timeout=120, + ) + if rebase.returncode == 0: + log.info("Comment reply: rebase onto %s succeeded", default_branch) + else: + log.warning( + "Comment reply: rebase failed (rc=%d) — aborting rebase; agent will need to resolve manually. stderr=%s", + rebase.returncode, + rebase.stderr[:300], + ) + _run_cmd(["git", "rebase", "--abort"], cwd=wt_path, timeout=15) + + def _check_and_save_ci(store, task, pr_url, wt_path): # type: (DynamoTaskStore, Any, str, str) -> None """Poll CI on a PR after a comment-reply push and save the result to the task.""" @@ -931,6 +1021,16 @@ def run_comment_reply(store, task_id): pr_section = "\n## Connected PR\nThis task has an open PR: %s\n" % pr_url else: pr_section = "" + plog(task_id, "reply_start", "execute", "Responding to comment") + + # Human-assigned tasks (without a project — rare fallback) always get a + # text-only reply in a tmpdir; they should never create code or worktrees. + if is_human: + wt_path = None + created_fresh = False + else: + wt_path, created_fresh = _get_or_create_reply_worktree(task, comment_body=latest.body) + if is_human: prompt = HUMAN_TASK_REPLY_PROMPT % ( task.title, @@ -938,7 +1038,7 @@ def run_comment_reply(store, task_id): latest.author, latest.body, ) - else: + elif wt_path: prompt = COMMENT_REPLY_PROMPT % ( task.title, desc_snippet, @@ -946,16 +1046,14 @@ def run_comment_reply(store, task_id): latest.author, latest.body, ) - - plog(task_id, "reply_start", "execute", "Responding to comment") - - # Human-assigned tasks (without a project — rare fallback) always get a - # text-only reply in a tmpdir; they should never create code or worktrees. - if is_human: - wt_path = None - created_fresh = False else: - wt_path, created_fresh = _get_or_create_reply_worktree(task) + prompt = COMMENT_REPLY_TEXT_ONLY_PROMPT % ( + task.title, + desc_snippet, + pr_section, + latest.author, + latest.body, + ) try: if wt_path: @@ -1276,7 +1374,7 @@ def _run_one_inner(store, task): final_status = TaskStatus.IN_REVIEW if pr_created else TaskStatus.COMPLETED for sub in subtasks: - store.update_status(sub.id, final_status) + store.update_status(sub.id, TaskStatus.COMPLETED) store.update_status(task.id, final_status) _notify_pm_chat_task_terminal( task, diff --git a/tests/test_runner_helpers.py b/tests/test_runner_helpers.py index afa3d9e..c2ff899 100644 --- a/tests/test_runner_helpers.py +++ b/tests/test_runner_helpers.py @@ -1,7 +1,9 @@ """Tests for runner helper functions — prompts, slugs, model resolution, appending.""" +import json import os import subprocess +from unittest.mock import patch from src.task_store import Task, TaskPriority, TaskStatus @@ -973,7 +975,7 @@ def capture_agent(prompt, cwd=None, **kw): # Stub worktree helper so it would return a path if not blocked monkeypatch.setattr( "src.pipeline._get_or_create_reply_worktree", - lambda task: ("/tmp/fake-wt", False), + lambda task, comment_body="": ("/tmp/fake-wt", False), ) task = tmp_tasks.create(title="T") @@ -1005,7 +1007,7 @@ def test_commit_reply_changes_called_when_wt_present(self, tmp_tasks, monkeypatc ) monkeypatch.setattr( "src.pipeline._get_or_create_reply_worktree", - lambda task: ("/tmp/fake-wt", False), + lambda task, comment_body="": ("/tmp/fake-wt", False), ) task = tmp_tasks.create(title="T") @@ -1027,7 +1029,7 @@ def test_fresh_worktree_cleaned_up_after_reply(self, tmp_tasks, monkeypatch): monkeypatch.setattr("src.pipeline._commit_reply_changes", lambda *a, **kw: None) monkeypatch.setattr( "src.pipeline._get_or_create_reply_worktree", - lambda task: ("/tmp/fake-wt", True), # created_fresh=True + lambda task, comment_body="": ("/tmp/fake-wt", True), # created_fresh=True ) cleanup_calls = [] @@ -1055,7 +1057,7 @@ def test_preexisting_worktree_not_cleaned_up(self, tmp_tasks, monkeypatch): monkeypatch.setattr("src.pipeline._commit_reply_changes", lambda *a, **kw: None) monkeypatch.setattr( "src.pipeline._get_or_create_reply_worktree", - lambda task: ("/tmp/fake-wt", False), # created_fresh=False + lambda task, comment_body="": ("/tmp/fake-wt", False), # created_fresh=False ) cleanup_calls = [] @@ -1228,3 +1230,297 @@ def test_update_status_endpoint_returns_400_for_invalid_status(self, tmp_tasks, endpoint_update_status(task.id, StatusBody(status="nonsense")) ) assert response.status_code == 400 + + +class TestTextOnlyCommentReply: + """When the worktree cannot be recreated, the agent gets a text-only prompt.""" + + def _set_reply_pending(self, tmp_tasks, task_id): + tmp_tasks.set_reply_pending(task_id, True) + + def test_text_only_prompt_when_worktree_returns_none(self, tmp_tasks, monkeypatch): + """If _get_or_create_reply_worktree returns None, use COMMENT_REPLY_TEXT_ONLY_PROMPT.""" + from src.pipeline import run_comment_reply + + captured = {} + + def capture_agent(prompt, **kw): + import subprocess as _sp + + captured["prompt"] = prompt + captured["cwd"] = kw.get("cwd") + fake = _sp.CompletedProcess(args=[], returncode=0, stdout="text reply", stderr="") + return fake, 1.0, "", {} + + monkeypatch.setattr("src.pipeline.run_agent", capture_agent) + monkeypatch.setattr( + "src.pipeline._get_or_create_reply_worktree", + lambda task, comment_body="": (None, False), + ) + + task = tmp_tasks.create(title="My broken task", description="Some desc") + tmp_tasks.add_comment(task.id, "web", "Why is CI failing?") + self._set_reply_pending(tmp_tasks, task.id) + + result = run_comment_reply(tmp_tasks, task.id) + + assert result is True + assert "you do NOT have access to the code" in captured["prompt"] + assert "same git worktree" not in captured["prompt"] + assert "My broken task" in captured["prompt"] + assert "Why is CI failing?" in captured["prompt"] + + def test_code_access_prompt_when_worktree_exists(self, tmp_tasks, monkeypatch): + """If _get_or_create_reply_worktree returns a path, use the standard prompt.""" + from src.pipeline import run_comment_reply + + captured = {} + + def capture_agent(prompt, **kw): + import subprocess as _sp + + captured["prompt"] = prompt + fake = _sp.CompletedProcess(args=[], returncode=0, stdout="code reply", stderr="") + return fake, 1.0, "", {} + + monkeypatch.setattr("src.pipeline.run_agent", capture_agent) + monkeypatch.setattr("src.pipeline._commit_reply_changes", lambda *a, **kw: None) + monkeypatch.setattr( + "src.pipeline._get_or_create_reply_worktree", + lambda task, comment_body="": ("/tmp/fake-wt", False), + ) + + task = tmp_tasks.create(title="Good task", description="Works fine") + tmp_tasks.add_comment(task.id, "web", "Please fix this") + self._set_reply_pending(tmp_tasks, task.id) + + result = run_comment_reply(tmp_tasks, task.id) + + assert result is True + assert "same git worktree" in captured["prompt"] + assert "do NOT have access" not in captured["prompt"] + + +class TestSubtaskStatusOnPr: + """Child subtasks always get completed, only the parent gets in_review.""" + + @patch("src.pipeline._notify_pm_chat_task_terminal") + @patch("src.pipeline.trigger_unblocked_dependents") + @patch("src.pipeline._maybe_finalize_directive_batch") + @patch("src.pipeline.commit_and_create_pr") + @patch("src.pipeline.run_agent") + @patch("src.pipeline.create_worktree", return_value="/tmp/wt") + @patch("src.pipeline.ensure_repo") + @patch("src.pipeline.cleanup_worktree") + @patch("src.pipeline.plog") + def test_subtasks_completed_when_parent_gets_in_review( + self, + mock_plog, + mock_cleanup, + mock_ensure, + mock_create_wt, + mock_agent, + mock_pr, + mock_finalize, + mock_unblock, + mock_notify, + tmp_tasks, + monkeypatch, + ): + from src.pipeline import _run_one_inner + + parent = tmp_tasks.create( + title="Parent task", + priority="high", + description="Step one: build the frontend. Then deploy. After that run integration tests.", + ) + tmp_tasks.update_status(parent.id, TaskStatus.IN_PROGRESS) + parent = tmp_tasks.get(parent.id) + + plan_json = ( + '[{"title": "Step 1", "description": "Do A"}, ' + '{"title": "Step 2", "description": "Do B"}]' + ) + mock_agent.return_value = ( + subprocess.CompletedProcess(args=[], returncode=0, stdout="done"), + 10.0, + "", + {}, + ) + mock_pr.return_value = "https://github.com/user/repo/pull/42" + + monkeypatch.setattr("src.pipeline.AUTO_PLAN", True) + monkeypatch.setattr("src.pipeline.AUTO_DOCS", False) + monkeypatch.setattr("src.pipeline.AUTO_PR", True) + monkeypatch.setattr("src.pipeline._resolve_model", lambda t: None) + monkeypatch.setattr("src.pipeline.plan_task", lambda s, t, cwd=None: json.loads(plan_json)) + + _run_one_inner(tmp_tasks, parent) + + parent_updated = tmp_tasks.get(parent.id) + assert parent_updated.status == TaskStatus.IN_REVIEW + + subtasks = tmp_tasks.list_subtasks(parent.id) + assert len(subtasks) == 2 + for sub in subtasks: + assert sub.status == TaskStatus.COMPLETED + + @patch("src.pipeline._notify_pm_chat_task_terminal") + @patch("src.pipeline.trigger_unblocked_dependents") + @patch("src.pipeline._maybe_finalize_directive_batch") + @patch("src.pipeline.commit_and_create_pr") + @patch("src.pipeline.run_agent") + @patch("src.pipeline.create_worktree", return_value="/tmp/wt") + @patch("src.pipeline.ensure_repo") + @patch("src.pipeline.cleanup_worktree") + @patch("src.pipeline.plog") + def test_subtasks_completed_when_no_pr( + self, + mock_plog, + mock_cleanup, + mock_ensure, + mock_create_wt, + mock_agent, + mock_pr, + mock_finalize, + mock_unblock, + mock_notify, + tmp_tasks, + monkeypatch, + ): + from src.pipeline import _run_one_inner + + parent = tmp_tasks.create(title="No PR task", priority="high") + tmp_tasks.update_status(parent.id, TaskStatus.IN_PROGRESS) + parent = tmp_tasks.get(parent.id) + + plan_json = '[{"title": "Step 1", "description": "Do A"}]' + mock_agent.return_value = ( + subprocess.CompletedProcess(args=[], returncode=0, stdout="done"), + 10.0, + "", + {}, + ) + mock_pr.return_value = None + + monkeypatch.setattr("src.pipeline.AUTO_PLAN", False) + monkeypatch.setattr("src.pipeline.AUTO_DOCS", False) + monkeypatch.setattr("src.pipeline.AUTO_PR", True) + monkeypatch.setattr("src.pipeline._resolve_model", lambda t: None) + + _run_one_inner(tmp_tasks, parent) + + parent_updated = tmp_tasks.get(parent.id) + assert parent_updated.status == TaskStatus.COMPLETED + + +class TestGetOrCreateReplyWorktree: + """Tests for _get_or_create_reply_worktree worktree recreation logic.""" + + def test_returns_none_when_in_progress(self, tmp_tasks): + from src.pipeline import _get_or_create_reply_worktree + + task = tmp_tasks.create(title="Running task") + tmp_tasks.update_status(task.id, TaskStatus.IN_PROGRESS) + task = tmp_tasks.get(task.id) + + wt_path, created = _get_or_create_reply_worktree(task) + assert wt_path is None + assert created is False + + def test_attempts_branch_recreation(self, tmp_tasks, monkeypatch): + """When the worktree doesn't exist, it attempts to re-create on the task branch.""" + from unittest.mock import MagicMock + + from src.pipeline import _get_or_create_reply_worktree + + task = tmp_tasks.create(title="Test task", target_repo="my-repo") + tmp_tasks.update_status(task.id, TaskStatus.IN_REVIEW) + task = tmp_tasks.get(task.id) + + cmd_calls = [] + success = MagicMock(returncode=0, stdout="", stderr="") + + def track_cmd(cmd, cwd=None, timeout=None): + cmd_calls.append(cmd) + return success + + monkeypatch.setattr("src.pipeline._run_cmd", track_cmd) + monkeypatch.setattr("src.pipeline._resolve_repo_dir", lambda t: "/fake/repo") + monkeypatch.setattr("src.pipeline.WORKTREE_BASE", MagicMock()) + monkeypatch.setattr("src.pipeline.WORKTREE_BASE.__truediv__", lambda s, x: "/tmp/task-worktrees/" + x) + + import pathlib + + monkeypatch.setattr(pathlib.Path, "exists", lambda self: False) + + wt_path, created = _get_or_create_reply_worktree(task) + + git_cmds = [c for c in cmd_calls if "worktree" in str(c)] + assert len(git_cmds) >= 1 + assert "add" in str(git_cmds[0]) + + def test_passes_comment_body_for_rebase(self, tmp_tasks, monkeypatch): + """Comment body mentioning merge conflicts triggers rebase attempt.""" + from unittest.mock import MagicMock + + from src.pipeline import _maybe_rebase_for_merge_conflicts + + rebase_calls = [] + success = MagicMock(returncode=0, stdout="", stderr="") + + def track_cmd(cmd, cwd=None, timeout=None): + rebase_calls.append(cmd) + return success + + monkeypatch.setattr("src.pipeline._run_cmd", track_cmd) + monkeypatch.setattr( + "src.worktree._get_default_branch", + lambda repo: "main", + ) + + _maybe_rebase_for_merge_conflicts("/tmp/wt", "/fake/repo", "There are merge conflicts in this PR") + + rebase_cmd = [c for c in rebase_calls if "rebase" in str(c)] + assert len(rebase_cmd) == 1 + assert "origin/main" in str(rebase_cmd[0]) + + def test_no_rebase_without_conflict_keywords(self, tmp_tasks, monkeypatch): + """No rebase attempted if comment doesn't mention conflicts.""" + from unittest.mock import MagicMock + + from src.pipeline import _maybe_rebase_for_merge_conflicts + + rebase_calls = [] + + def track_cmd(cmd, cwd=None, timeout=None): + rebase_calls.append(cmd) + return MagicMock(returncode=0, stdout="", stderr="") + + monkeypatch.setattr("src.pipeline._run_cmd", track_cmd) + + _maybe_rebase_for_merge_conflicts("/tmp/wt", "/fake/repo", "Please fix the typo") + + assert len(rebase_calls) == 0 + + def test_rebase_abort_on_failure(self, tmp_tasks, monkeypatch): + """If rebase fails, it runs git rebase --abort.""" + from unittest.mock import MagicMock + + from src.pipeline import _maybe_rebase_for_merge_conflicts + + cmd_calls = [] + + def track_cmd(cmd, cwd=None, timeout=None): + cmd_calls.append(cmd) + if "rebase" in str(cmd) and "--abort" not in str(cmd): + return MagicMock(returncode=1, stdout="", stderr="CONFLICT in file.py") + return MagicMock(returncode=0, stdout="", stderr="") + + monkeypatch.setattr("src.pipeline._run_cmd", track_cmd) + monkeypatch.setattr("src.worktree._get_default_branch", lambda repo: "main") + + _maybe_rebase_for_merge_conflicts("/tmp/wt", "/fake/repo", "merge conflicts need resolving") + + abort_calls = [c for c in cmd_calls if "--abort" in str(c)] + assert len(abort_calls) == 1 From a9f2f82aa6991c9d5e7e93e2471ecd7d76a207f9 Mon Sep 17 00:00:00 2001 From: gf5901 Date: Mon, 23 Mar 2026 02:55:56 +0000 Subject: [PATCH 2/2] fix(ci): ruff unused var + force AUTH_ENABLED off in API tests - Remove unused plan_json in test_subtasks_completed_when_no_pr (ruff F841) - Patch web_mod.AUTH_ENABLED=False in test client so /api tests pass when host env sets AUTH_EMAIL/AUTH_PASSWORD (conftest setdefault does not override) Made-with: Cursor --- tests/test_api.py | 3 +++ tests/test_runner_helpers.py | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_api.py b/tests/test_api.py index 81e3acf..035e76a 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -20,6 +20,9 @@ def client(tmp_tasks, monkeypatch): monkeypatch.setattr(tasks_router, "_get_store", lambda: tmp_tasks) monkeypatch.setattr(web_mod, "trigger_runner", lambda task_id: None) monkeypatch.setattr(web_mod, "cancel_runner", lambda task_id: None) + # Tests assume auth is off; host env may set AUTH_EMAIL/PASSWORD (setdefault in conftest + # does not override). Force-disable so /api/* returns 200, not 401. + monkeypatch.setattr(web_mod, "AUTH_ENABLED", False) from src.web import app return TestClient(app, raise_server_exceptions=True) diff --git a/tests/test_runner_helpers.py b/tests/test_runner_helpers.py index c2ff899..f6af8c7 100644 --- a/tests/test_runner_helpers.py +++ b/tests/test_runner_helpers.py @@ -1394,7 +1394,6 @@ def test_subtasks_completed_when_no_pr( tmp_tasks.update_status(parent.id, TaskStatus.IN_PROGRESS) parent = tmp_tasks.get(parent.id) - plan_json = '[{"title": "Step 1", "description": "Do A"}]' mock_agent.return_value = ( subprocess.CompletedProcess(args=[], returncode=0, stdout="done"), 10.0,