diff --git a/.cursor/rules/project-conventions.mdc b/.cursor/rules/project-conventions.mdc index 544cc0c..56cdb73 100644 --- a/.cursor/rules/project-conventions.mdc +++ b/.cursor/rules/project-conventions.mdc @@ -53,7 +53,9 @@ Tasks are stored in DynamoDB with the following fields: - Plan results are appended as `## Plan (timestamp)` sections ## Task Triggering -A polling daemon (`run_poller.py`) runs on EC2 as a systemd service and checks DynamoDB every `POLL_INTERVAL` seconds (default 15) for pending tasks and `reply_pending` comments. It spawns `run_task.py` subprocesses to handle them — no SSM trigger from Lambda is needed. The Lambda API just writes to DynamoDB (e.g. sets status to `pending` or `reply_pending: true`) and returns. SSM is only used for `cancelRunner` (SIGTERM), directive decomposition, daily cycle triggers, and autopilot plan proposals. +A polling daemon (`run_poller.py`) runs on EC2 as a systemd service and checks DynamoDB every `POLL_INTERVAL` seconds (default 15) for pending tasks and `reply_pending` comments. It spawns `run_task.py` subprocesses to handle them — no SSM trigger from Lambda is needed. The Lambda API just writes to DynamoDB (e.g. sets status to `pending` or `reply_pending: true`) and returns. SSM is used for `cancelRunner` (SIGTERM), directive decomposition, daily cycle triggers, autopilot plan proposals, and PM sweeps. + +**PM sweep exception:** The poller does **not** spawn PM chat replies or human-assigned task replies. These are handled by the hourly Autopilot Lambda, which triggers `run_task.py --pm-reply ` via SSM for projects with `reply_pending: true`. Up to `MAX_CONCURRENT_RUNNERS` (default 2) agents can run in parallel. Each process acquires a numbered slot lockfile (`runner.lock.0`, `runner.lock.1`, …); if all slots are occupied it exits cleanly. `run_one` claims a task by immediately flipping it to `in_progress` then re-reading to confirm it won any race — if another runner already claimed it, the process bails out. 
@@ -79,13 +81,22 @@ Role assignment during planning: both `PLAN_PROMPT` and `PLAN_ONLY_PROMPT` inclu ## Comment-Triggered Agent Replies When a user posts a comment via `POST /api/tasks/{id}/comment`, the Lambda API sets `reply_pending: true` on the task in DynamoDB and returns. The EC2 poller (`run_poller.py`) detects `reply_pending` tasks every poll cycle and spawns `run_task.py --reply `. `run_comment_reply` in `runner.py` clears `reply_pending` when done. The UI polls every 3s while `reply_pending` is true and shows a spinning "Agent is composing a reply…" indicator. Agent comments are styled in indigo with a bot icon. `POST /api/tasks/{id}/reply` still exists for manual triggering (sets `reply_pending: true`). +**Exception — human-assigned tasks with a project:** The poller skips these. Instead, the comment API also sets `reply_pending: true` on the PROJECT record. The hourly Autopilot Lambda detects this and triggers `run_task.py --pm-reply ` via SSM, so the PM agent handles the response in its next sweep (see PM Agent Sweep below). + ## Human-Assigned Tasks -Tasks with `assignee: "human"` are for the human operator (e.g., "set up GA4 property", "provide API key"). The daily cycle creates them automatically when the agent identifies something it needs from the human. Lifecycle: -1. Daily cycle creates task with `assignee: "human"`, `status: "pending"` +Tasks with `assignee: "human"` are for the human operator (e.g., "set up GA4 property", "provide API key"). The daily cycle and PM agent create them automatically when the agent identifies something it needs from the human. Lifecycle: +1. Daily cycle or PM agent creates task with `assignee: "human"`, `status: "pending"` 2. Human sees them in "My Tasks" sidebar filter and "Your Tasks" section on ProjectDetail -3. Human does the work and sets status to `in_review` -4. 
Next daily cycle reviews: if done correctly → `completed`; if not → `pending` with an agent comment explaining what's missing -The EC2 poller ignores `assignee: "human"` tasks (they are not picked up for agent execution). The `human` count in the API counts endpoint includes all non-terminal human-assigned tasks. +3. Human does the work and sets status to `in_review` — or posts a comment answering the question +4. Next PM sweep or daily cycle reviews: if done correctly → `completed`; if not → `pending` with an agent comment explaining what's missing +The EC2 poller ignores `assignee: "human"` tasks (they are not picked up for agent execution). When a human comments on a human-assigned task with a `project_id`, the PM agent handles the reply during its hourly sweep (not the generic comment reply agent). The `human` count in the API counts endpoint includes all non-terminal human-assigned tasks. + +## PM Agent Sweep +The PM agent (`src/pm_agent.py`) runs on an hourly schedule, piggybacking on the Autopilot Lambda. It does **not** respond to chat messages or human task replies immediately. Instead: +1. Human posts a project chat message or comments on a human-assigned task → `reply_pending: true` set on the PROJECT record +2. Hourly Autopilot Lambda detects `reply_pending` on active projects → triggers `run_task.py --pm-reply ` via SSM +3. `run_pm_reply()` processes all queued chat messages AND reviews human-assigned tasks with pending replies in a single agent session +4. The PM can: acknowledge human responses, mark tasks complete, post follow-up questions, create new agent/human tasks ## Web UI — React SPA + API @@ -150,7 +161,7 @@ Projects are long-lived entities (`pk=PROJECT#`, `sk=PROJECT`) with a title, `infra/packages/metrics/` — scheduled Lambda (cron `0 6 * * ? *`) fetches metrics for active projects with KPIs. Adapters: PageSpeed Insights (free, no auth), GitHub (existing token). 
Writes `SNAPSHOT#` records to DynamoDB, updates KPI current values, triggers `run_daily_cycle` on EC2 via SSM. ### Autopilot Lambda -`infra/packages/autopilot/` — scheduled Lambda (cron hourly) queries active projects with `autopilot: true`. **Continuous** mode (`autopilot_mode: continuous`, not `cycle_paused`): triggers `run_task.py --propose-plan` on EC2. **Daily** mode: triggers only at **07:00 UTC** (same hourly cron; filtered in Lambda). EC2 `src/autopilot.py` enforces the same rules. +`infra/packages/autopilot/` — scheduled Lambda (cron hourly) queries active projects with `autopilot: true`. **Continuous** mode (`autopilot_mode: continuous`, not `cycle_paused`): triggers `run_task.py --propose-plan` on EC2. **Daily** mode: triggers only at **07:00 UTC** (same hourly cron; filtered in Lambda). EC2 `src/autopilot.py` enforces the same rules. The same Lambda also triggers **PM sweeps** for any active project with `reply_pending: true`, running `run_task.py --pm-reply ` via SSM. This processes queued project chat messages and reviews human-assigned tasks with pending replies in a single agent session. ### Autonomous Daily Cycle Projects can define KPIs (`kpis` array on the PROJECT record). When KPIs are present, the Metrics Lambda collects data daily and triggers `run_daily_cycle()` in `src/objectives.py` on EC2. The cycle: loads 14 days of snapshots + proposals + recent tasks + human tasks in review → calls the agent (opus) with a reflection prompt → parses structured JSON → creates PROP records (proposals) and `assignee: "human"` tasks. The agent also reviews human tasks marked `in_review`, either completing them or returning them to `pending` with a comment. 
diff --git a/.gitignore b/.gitignore index 150045e..b5301e8 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ frontend/node_modules/ frontend/dist/ .DS_Store Thumbs.db +.cursor/rules/local-*.mdc diff --git a/AGENTS.md b/AGENTS.md index 910c6a3..9c96b4f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -36,16 +36,16 @@ Tasks flow through: **create worktree → plan subtasks → execute → update d **Autonomous objectives pipeline**: Projects with KPIs get a daily autonomous cycle. The Metrics Lambda (`infra/packages/metrics/`) runs at 6 AM UTC, fetches metrics (PageSpeed Insights, GitHub, optionally GA4/Search Console), writes SNAPSHOT records, and triggers `run_task.py --daily-cycle ` on EC2 via SSM. The daily cycle (`src/objectives.py`) loads metric history, proposals, and recent tasks, then calls the agent (opus) with a reflection prompt. The agent returns structured proposals and human requests. Proposals queue for human approval in the web UI; approved proposals become tasks in the existing pipeline. See `docs/autonomous-objectives.md`. -**Autopilot pipeline**: Projects with `autopilot: true` use `autopilot_mode` **`daily`** (default) or **`continuous`**. The Autopilot Lambda (`infra/packages/autopilot/`) runs **hourly**; **daily** projects are only dispatched at **07:00 UTC** (filtered in Lambda; EC2 enforces the same). **Continuous** projects run while a cycle is active (`cycle_started_at`, `cycle_max_hours`, not `cycle_paused`). `run_task.py --propose-plan [--regenerate] [--plan-suffix]` on EC2 calls `src/autopilot.py`, which uses the agent (opus) and `./ctx`. **Daily**: `PLAN#YYYY-MM-DD`, human approves in the UI. **Continuous**: `PLAN#YYYY-MM-DDTHH:MM:SS` UTC, auto-approved tasks, pauses + Discord on time window, failures, or human-only backlog. Plan tasks use `directive_sk=PLAN#…`. Directives cancel pending plan tasks. +**Autopilot pipeline**: Projects with `autopilot: true` use `autopilot_mode` **`daily`** (default) or **`continuous`**. 
The Autopilot Lambda (`infra/packages/autopilot/`) runs **hourly**; **daily** projects are only dispatched at **07:00 UTC** (filtered in Lambda; EC2 enforces the same). **Continuous** projects run while a cycle is active (`cycle_started_at`, `cycle_max_hours`, not `cycle_paused`). `run_task.py --propose-plan [--regenerate] [--plan-suffix]` on EC2 calls `src/autopilot.py`, which uses the agent (opus) and `./ctx`. **Daily**: `PLAN#YYYY-MM-DD`, human approves in the UI. **Continuous**: `PLAN#YYYY-MM-DDTHH:MM:SS` UTC, auto-approved tasks, pauses + Discord on time window, failures, or human-only backlog. Plan tasks use `directive_sk=PLAN#…`. Directives cancel pending plan tasks. The same hourly Lambda also triggers **PM sweeps** for projects with `reply_pending: true` — processing queued chat messages and human-assigned task replies in a single agent session (`run_task.py --pm-reply `). Every pipeline event is logged to `pipeline.log` (structured JSONL) with timestamps, task IDs, stages, runtimes, and model info. The Activity page (`/activity`) exposes this in the UI. ## Task dispatch — EC2 polling -The Lambda API does **not** trigger task execution directly. It writes to DynamoDB (creates tasks as `pending`, sets `reply_pending: true` for comments) and returns. A polling daemon on EC2 (`run_poller.py`) checks DynamoDB every `POLL_INTERVAL` seconds (default 15) and spawns `run_task.py` subprocesses for pending tasks and reply-pending comments. +The Lambda API does **not** trigger task execution directly. It writes to DynamoDB (creates tasks as `pending`, sets `reply_pending: true` for comments) and returns. A polling daemon on EC2 (`run_poller.py`) checks DynamoDB every `POLL_INTERVAL` seconds (default 15) and spawns `run_task.py` subprocesses for pending tasks and reply-pending comments (excluding human-assigned tasks with a project, which are handled by the PM sweep). - **EC2**: Set `DYNAMO_TABLE=agent-tasks` in `.env`. Run `run_poller.py` as a systemd service. 
-- **SSM**: Only used for `cancelRunner` (SIGTERM to kill a running agent), directive decomposition, daily cycle triggers, and autopilot plan proposals. `EC2_INSTANCE_ID` (or SST secret `Ec2InstanceId`) is required for these operations. +- **SSM**: Used for `cancelRunner` (SIGTERM to kill a running agent), directive decomposition, daily cycle triggers, autopilot plan proposals, and PM sweeps. `EC2_INSTANCE_ID` (or SST secret `Ec2InstanceId`) is required for these operations. - **Activity**: Pipeline events are written to DynamoDB, so the Activity page (which reads from Dynamo via the API) shows task_start, execute_done, etc. ## Python tests diff --git a/docs/dynamo-schema.md b/docs/dynamo-schema.md index d147140..ddd65c7 100644 --- a/docs/dynamo-schema.md +++ b/docs/dynamo-schema.md @@ -166,7 +166,7 @@ Project metadata. `id` is 8-char hex. | cycle_pause_reason | string? | `time_expired` · `blocked` · `failures` · `manual` | | cycle_feedback | string? | Human notes for next planner pass | | next_check_at | string? | Agent-requested deferral (ISO); empty if none | -| reply_pending | bool? | When true, EC2 poller runs `run_task.py --pm-reply` for project-level PM chat | +| reply_pending | bool? | When true, hourly Autopilot Lambda triggers `run_task.py --pm-reply` via SSM for project-level PM chat | **GSI:** project-list-index (`proj_status`, `project_updated`). 
diff --git a/infra/packages/api/src/lib/types.ts b/infra/packages/api/src/lib/types.ts index 74d0959..c8a832f 100644 --- a/infra/packages/api/src/lib/types.ts +++ b/infra/packages/api/src/lib/types.ts @@ -87,7 +87,7 @@ export interface Project { cycle_feedback: string; /** Agent-requested “check back after” time (ISO); empty if none */ next_check_at: string; - /** PM chat: EC2 poller should run run_task.py --pm-reply */ + /** PM chat: hourly Autopilot Lambda triggers run_task.py --pm-reply via SSM */ reply_pending: boolean; } diff --git a/infra/packages/api/src/routes/tasks.ts b/infra/packages/api/src/routes/tasks.ts index 5d25831..a3b8fca 100644 --- a/infra/packages/api/src/routes/tasks.ts +++ b/infra/packages/api/src/routes/tasks.ts @@ -250,6 +250,14 @@ tasks.post("/tasks/:id/comment", async (c) => { if (!comment) return c.json({ error: "not found" }, 404); await db.setReplyPending(c.req.param("id"), true); + + // For human-assigned tasks with a project, also flag the project for PM sweep + // so the hourly autopilot Lambda triggers the PM to review the response. 
+ const task = await db.getTask(c.req.param("id")); + if (task?.assignee === "human" && task?.project_id) { + await db.updateProject(task.project_id, { reply_pending: true }); + } + return c.json({ author: comment.author, body: comment.body, diff --git a/infra/packages/autopilot/src/index.ts b/infra/packages/autopilot/src/index.ts index abf5155..bca698b 100644 --- a/infra/packages/autopilot/src/index.ts +++ b/infra/packages/autopilot/src/index.ts @@ -28,8 +28,17 @@ function shouldTriggerProposePlan(item: Record): boolean { return hour === 7; } -async function listActiveAutopilotProjectIds(): Promise { - const ids: string[] = []; +function shouldTriggerPmSweep(item: Record): boolean { + if (!item.project_id) return false; + return item.reply_pending === true; +} + +async function listActiveProjects(): Promise<{ + autopilotIds: string[]; + pmSweepIds: string[]; +}> { + const autopilotIds: string[] = []; + const pmSweepIds: string[] = []; let lastKey: Record | undefined; do { const resp = await ddb.send( @@ -42,13 +51,17 @@ async function listActiveAutopilotProjectIds(): Promise { }), ); for (const item of resp.Items ?? 
[]) { - if (shouldTriggerProposePlan(item as Record)) { - ids.push(item.project_id as string); + const rec = item as Record; + if (shouldTriggerProposePlan(rec)) { + autopilotIds.push(rec.project_id as string); + } + if (shouldTriggerPmSweep(rec)) { + pmSweepIds.push(rec.project_id as string); } } lastKey = resp.LastEvaluatedKey; } while (lastKey); - return ids; + return { autopilotIds, pmSweepIds }; } async function triggerProposePlan(projectId: string): Promise { @@ -76,14 +89,41 @@ async function triggerProposePlan(projectId: string): Promise { console.log(`Triggered autopilot propose-plan for project ${projectId}`); } +async function triggerPmSweep(projectId: string): Promise { + const instanceId = await resolveInstanceId(); + if (!instanceId) { + console.warn("Could not resolve EC2 instance — skipping PM sweep trigger"); + return; + } + const esc = (s: string) => s.replace(/'/g, "'\\''"); + await ssm.send( + new SendCommandCommand({ + InstanceIds: [instanceId], + DocumentName: "AWS-RunShellScript", + Parameters: { + commands: [ + `sudo -u ec2-user setsid ${VENV_PYTHON} ${RUN_TASK_SCRIPT} --pm-reply '${esc( + projectId, + )}' >/dev/null 2>&1 &`, + ], + workingDirectory: [WORK_DIR], + }, + TimeoutSeconds: 600, + }), + ); + console.log(`Triggered PM sweep for project ${projectId}`); +} + export async function handler(): Promise { const today = new Date().toISOString().slice(0, 10); console.log(`Autopilot plan Lambda running (UTC date ${today}, hourly)`); - const projectIds = await listActiveAutopilotProjectIds(); - console.log(`Found ${projectIds.length} active autopilot project(s)`); + const { autopilotIds, pmSweepIds } = await listActiveProjects(); + console.log( + `Found ${autopilotIds.length} autopilot project(s), ${pmSweepIds.length} project(s) needing PM sweep`, + ); - for (const id of projectIds) { + for (const id of autopilotIds) { try { await triggerProposePlan(id); } catch (err) { @@ -91,5 +131,16 @@ export async function handler(): Promise { } } + for 
(const id of pmSweepIds) { + if (autopilotIds.includes(id)) { + console.log(`PM sweep for ${id} — project already triggered for autopilot, triggering PM separately`); + } + try { + await triggerPmSweep(id); + } catch (err) { + console.error(`Failed to trigger PM sweep for ${id}:`, err); + } + } + console.log("Autopilot plan Lambda complete"); } diff --git a/run_poller.py b/run_poller.py index b3334a0..6e44945 100644 --- a/run_poller.py +++ b/run_poller.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Polling daemon that picks up pending tasks, task comment replies, and project PM chat from DynamoDB. +"""Polling daemon that picks up pending tasks and task comment replies from DynamoDB. Replaces SSM-based triggering from the Lambda API. Runs as a systemd service on EC2 and spawns run_task.py subprocesses — existing slot locking handles @@ -148,18 +148,16 @@ def main(): reply_tasks = store.list_reply_pending() for t in reply_tasks: + # Human-assigned tasks with a project are handled by the PM + # during the hourly autopilot sweep — don't spawn a generic reply. + if getattr(t, "assignee", "agent") == "human" and getattr(t, "project_id", ""): + continue _spawn(["--reply", t.id]) log.info("Spawned reply runner for task %s", t.id) - try: - from src.projects_dynamo import list_project_reply_pending - except ImportError: - list_project_reply_pending = None # type: ignore[assignment] - - if list_project_reply_pending is not None: - for pid in list_project_reply_pending(): - _spawn(["--pm-reply", pid]) - log.info("Spawned PM reply runner for project %s", pid) + # PM chat replies are processed during the hourly autopilot sweep, + # not immediately. The Lambda triggers --pm-reply for projects that + # have reply_pending=true or human tasks needing review. 
except Exception: log.exception("Poller iteration failed") diff --git a/src/AGENTS.md b/src/AGENTS.md index cebf08b..404c12b 100644 --- a/src/AGENTS.md +++ b/src/AGENTS.md @@ -21,7 +21,7 @@ bot.py — Discord bot with slash commands (discord.py) task_store.py — Shared task types (Task, enums, Comment) dynamo_store.py — DynamoDB-backed task store (`DynamoTaskStore`) projects_dynamo.py — DynamoDB helpers for project records (get_project, update_project, directives, PLAN# plans (date or UTC datetime suffix), MEMORY# agent notes, CHAT# project PM thread, `reply_pending`, `post_system_message`; `resolve_memory_by_ref` for CLI lookup by sk/suffix) -pm_agent.py — Project-level PM chat: `run_pm_reply()` with `./ctx` + JSON actions (`run_task.py --pm-reply `) +pm_agent.py — Project-level PM sweep (hourly via Autopilot Lambda): processes queued chat messages + reviews human-assigned tasks with pending replies in one session; `run_pm_reply()` with `./ctx` + JSON actions (`run_task.py --pm-reply `) objectives.py — Daily KPI cycle: lean prompt + `./ctx` CLI + optional read-only repo worktree; `run_daily_cycle()` context_cli.py — Dynamo-backed context tool for agents (`./ctx spec|kpis|snapshots|tasks|proposals|human-tasks|plans|memory`); also exports `write_ctx_script()` to inject `./ctx` into any agent working dir autopilot.py — Autopilot plan proposal (daily vs continuous): lean prompt + `./ctx` CLI (`propose_daily_plan`; EC2 `run_task.py --propose-plan [--regenerate] [--plan-suffix]`) diff --git a/src/dynamo_store.py b/src/dynamo_store.py index e779ee4..a60a870 100644 --- a/src/dynamo_store.py +++ b/src/dynamo_store.py @@ -319,6 +319,19 @@ def list_reply_pending(self) -> List[Task]: """Return tasks with reply_pending=true (typically 0-1 at a time).""" return self._scan_meta(FilterExpression=Attr("reply_pending").eq(True)) + def list_human_reply_pending_for_project(self, project_id: str) -> List[Task]: + """Return human-assigned tasks with reply_pending=true for a given 
project.""" + if not project_id: + return [] + return self._query_all( + IndexName="project-index", + KeyConditionExpression=Key("project_id").eq(project_id), + FilterExpression=( + Attr("assignee").eq("human") + & Attr("reply_pending").eq(True) + ), + ) + def delete(self, task_id: str) -> bool: pk = _pk(task_id) resp = self._table.query( diff --git a/src/pipeline.py b/src/pipeline.py index 3ef17bb..f826cb4 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -173,6 +173,10 @@ def _build_role_options(): that requires your attention. You are running inside the same git worktree where you did the \ original work, so all your previous changes are present. +## Task +**%s** +%s + ## Latest Comment (from %s) %s @@ -182,6 +186,30 @@ def _build_role_options(): information, acknowledge it and take any appropriate action. Respond concisely.""" ) +HUMAN_TASK_REPLY_PROMPT = ( + SECURITY_PREFIX + + """\ +You are an AI assistant managing a task that was assigned to a human operator. The task asked \ +the human to provide information, make a decision, or perform an action that only a human can do. \ +The human has now replied. Your job is to acknowledge their response and help move the task forward. + +## Task +**%s** +%s + +## Latest Comment (from %s) +%s + +## Your Instructions +1. Acknowledge the human's response and summarise the key decisions or information they provided. +2. If their response fully addresses the task, say so and recommend marking this task as completed. +3. If their response is partial or raises follow-up questions, note what is still outstanding. +4. Do NOT attempt to execute engineering work yourself (e.g. writing code, creating infrastructure, \ +making PRs). If engineering work is needed based on the human's response, suggest creating a \ +separate task for it — describe what that task should contain. +5. 
Keep your reply concise and actionable.""" +) + def pick_next_task(store): # type: (Any) -> Optional[Any] @@ -761,6 +789,16 @@ def run_comment_reply(store, task_id): log.warning("Comment reply: task %s not found", task_id) return False + # Human-assigned tasks with a project are handled by the PM during the + # hourly autopilot sweep — don't process them here. + is_human = getattr(task, "assignee", "agent") == "human" + if is_human and getattr(task, "project_id", ""): + log.info( + "Comment reply: task %s is human-assigned with project — deferring to PM sweep", + task_id, + ) + return False + # Atomically claim the reply by clearing reply_pending before doing any work. # If reply_pending is already false another process already claimed it — bail. if not task.reply_pending: @@ -782,14 +820,31 @@ def run_comment_reply(store, task_id): latest = user_comments[-1] - prompt = COMMENT_REPLY_PROMPT % ( - latest.author, - latest.body, - ) + desc_snippet = (task.description or "")[:500] + if is_human: + prompt = HUMAN_TASK_REPLY_PROMPT % ( + task.title, + desc_snippet, + latest.author, + latest.body, + ) + else: + prompt = COMMENT_REPLY_PROMPT % ( + task.title, + desc_snippet, + latest.author, + latest.body, + ) plog(task_id, "reply_start", "execute", "Responding to comment") - wt_path, created_fresh = _get_or_create_reply_worktree(task) + # Human-assigned tasks (without a project — rare fallback) always get a + # text-only reply in a tmpdir; they should never create code or worktrees. 
+ if is_human: + wt_path = None + created_fresh = False + else: + wt_path, created_fresh = _get_or_create_reply_worktree(task) try: if wt_path: diff --git a/src/pm_agent.py b/src/pm_agent.py index 0a5c26a..8870501 100644 --- a/src/pm_agent.py +++ b/src/pm_agent.py @@ -17,6 +17,7 @@ list_chat_messages, ) from .roles import ROLES +from .task_store import TaskStatus log = logging.getLogger(__name__) @@ -101,6 +102,7 @@ def _build_role_list() -> str: ## Recent chat (oldest first) {chat_block} +{human_tasks_block}\ ## Available tools Load context on demand from this directory: ./ctx spec # Full project spec (markdown) @@ -116,12 +118,16 @@ def _build_role_list() -> str: {role_list} ## Your task -Respond to the human's latest message in the chat. Use ./ctx when you need facts you do not \ -already have. You may create work by returning structured fields in your JSON response. +Review the chat and any human tasks with pending replies below. Use ./ctx when you need facts \ +you do not already have. You may create work by returning structured fields in your JSON response. + +For human tasks with pending replies: the human has responded to a task you previously assigned. \ +Review their response. If it fully addresses the task, include it in "complete_tasks". If it \ +needs more information, include it in "reply_to_tasks" with a follow-up question. 
Respond ONLY with valid JSON (no markdown fences, no extra text): {{ - "reply": "markdown message to the human (required)", + "reply": "markdown message to the human in the chat thread (required, even if empty string)", "create_agent_tasks": [ {{ "title": "short imperative title", @@ -136,25 +142,50 @@ def _build_role_list() -> str: "description": "clear acceptance criteria", "priority": "low|medium|high|urgent" }} + ], + "complete_tasks": ["task_id1", "task_id2"], + "reply_to_tasks": [ + {{ + "task_id": "the task id", + "comment": "your follow-up question or acknowledgement" + }} ] }} -Use empty arrays [] if you are not creating tasks. The "reply" field is always required. +Use empty arrays [] for any field you are not using. The "reply" field is always required \ +(use "" if you have nothing to say in chat but are acting on tasks). """ ) def run_pm_reply(store: Any, project_id: str) -> bool: - """Handle one PM chat turn: claim, run agent in ctx dir, parse JSON, create tasks, post reply.""" + """Handle one PM sweep: chat replies + human-task review in a single agent session.""" proj = get_project(project_id) if not proj: log.warning("pm_reply: project %s not found", project_id) return False - if not proj.get("reply_pending"): - log.info("pm_reply: reply_pending false for project %s — skip", project_id) + + has_chat = bool(proj.get("reply_pending")) + human_tasks = store.list_human_reply_pending_for_project(project_id) + + if not has_chat and not human_tasks: + log.info("pm_reply: nothing to do for project %s", project_id) return False - if not claim_project_pm_reply(project_id): + # Claim the chat reply_pending if set (atomic guard against concurrent runs). + if has_chat and not claim_project_pm_reply(project_id): log.info("pm_reply: lost claim race for project %s", project_id) + has_chat = False + + # Clear reply_pending on each human task we're about to process. 
+ claimed_tasks = [] # type: List[Any] + for ht in human_tasks: + store.set_reply_pending(ht.id, False) + ht_refreshed = store.get(ht.id) + if ht_refreshed and not ht_refreshed.reply_pending: + claimed_tasks.append(ht_refreshed) + + if not has_chat and not claimed_tasks: + log.info("pm_reply: lost all claims for project %s", project_id) return False proj = get_project(project_id) @@ -164,16 +195,8 @@ def run_pm_reply(store: Any, project_id: str) -> bool: title = str(proj.get("title", "Project")) target_repo = str(proj.get("target_repo", "") or "(not set)").strip() or "(not set)" messages = list_chat_messages(project_id, limit=30) - if not messages: - log.info("pm_reply: no chat messages for project %s", project_id) - add_chat_message( - project_id, - "pm-agent", - "I did not find a message to respond to.", - ) - return True - lines = [] + lines = [] # type: List[str] for m in messages: author = str(m.get("author", "")) body = str(m.get("body", "")).strip() @@ -181,10 +204,33 @@ def run_pm_reply(store: Any, project_id: str) -> bool: lines.append("- (%s) **%s**\n %s" % (ts, author, body)) chat_block = "\n".join(lines) if lines else "(no messages)" + # Build human tasks context block + ht_lines = [] # type: List[str] + for ht in claimed_tasks: + comments = store.get_comments(ht.id) + user_comments = [c for c in comments if c.author != "agent"] + latest_comment = user_comments[-1].body if user_comments else "(no reply yet)" + ht_lines.append( + "- **[%s]** %s (status: %s)\n" + " Description: %s\n" + " Human's latest reply: %s" + % (ht.id, ht.title, ht.status.value, (ht.description or "")[:300], latest_comment) + ) + + if ht_lines: + human_tasks_block = ( + "## Human tasks awaiting your review\n" + "These are tasks you previously assigned to the human. 
They have responded.\n" + "%s\n\n" % "\n".join(ht_lines) + ) + else: + human_tasks_block = "" + prompt = PM_AGENT_PROMPT.format( title=title, target_repo=target_repo, chat_block=chat_block, + human_tasks_block=human_tasks_block, role_list=_build_role_list(), ) @@ -280,6 +326,40 @@ def run_pm_reply(store: Any, project_id: str) -> bool: extras.append("Assigned you a task: **%s**" % norm["title"][:80]) except Exception: log.warning("pm_reply: could not create human task", exc_info=True) + + # Process task completions + complete_ids = parsed.get("complete_tasks", []) + if not isinstance(complete_ids, list): + complete_ids = [] + claimed_ids = {ht.id for ht in claimed_tasks} + for tid in complete_ids: + tid = str(tid).strip() + if tid not in claimed_ids: + log.warning("pm_reply: agent tried to complete task %s not in claimed set", tid) + continue + try: + store.update_status(tid, TaskStatus.COMPLETED) + store.add_comment(tid, "pm-agent", "Marked complete by PM — response accepted.") + extras.append("Completed task: **%s**" % tid) + except Exception: + log.warning("pm_reply: could not complete task %s", tid, exc_info=True) + + # Process task replies (follow-up questions) + reply_items = parsed.get("reply_to_tasks", []) + if not isinstance(reply_items, list): + reply_items = [] + for ri in reply_items: + if not isinstance(ri, dict): + continue + tid = str(ri.get("task_id", "")).strip() + comment = str(ri.get("comment", "")).strip() + if not tid or not comment or tid not in claimed_ids: + continue + try: + store.add_comment(tid, "pm-agent", comment) + extras.append("Replied to task: **%s**" % tid) + except Exception: + log.warning("pm_reply: could not reply to task %s", tid, exc_info=True) else: reply_body = agent_text.strip() @@ -289,7 +369,8 @@ def run_pm_reply(store: Any, project_id: str) -> bool: if extras: reply_body += "\n\n---\n" + "\n".join(extras) - add_chat_message(project_id, "pm-agent", reply_body) + if reply_body: + add_chat_message(project_id, "pm-agent", 
reply_body) plog( project_id, "pm_reply_done", diff --git a/tests/test_runner_helpers.py b/tests/test_runner_helpers.py index 702f1a5..afa3d9e 100644 --- a/tests/test_runner_helpers.py +++ b/tests/test_runner_helpers.py @@ -1072,6 +1072,91 @@ def test_preexisting_worktree_not_cleaned_up(self, tmp_tasks, monkeypatch): assert cleanup_calls == [] + def test_human_task_with_project_deferred_to_pm(self, tmp_tasks, monkeypatch): + """Human-assigned tasks with a project_id are deferred to the PM sweep.""" + from src.pipeline import run_comment_reply + + agent_calls = [] + + def counting_agent(*a, **kw): + import subprocess as _sp + + agent_calls.append(1) + fake = _sp.CompletedProcess(args=[], returncode=0, stdout="reply", stderr="") + return fake, 1.0, "", {} + + monkeypatch.setattr("src.pipeline.run_agent", counting_agent) + + task = tmp_tasks.create( + title="Confirm DNS setup", + description="Please confirm DNS", + assignee="human", + project_id="proj-123", + ) + tmp_tasks.add_comment(task.id, "web", "Yes, please proceed") + self._set_reply_pending(tmp_tasks, task.id) + + result = run_comment_reply(tmp_tasks, task.id) + assert result is False + assert agent_calls == [] + # reply_pending should still be true (not claimed) + assert tmp_tasks.get(task.id).reply_pending is True + + def test_human_task_without_project_uses_human_prompt(self, tmp_tasks, monkeypatch): + """Human-assigned tasks without a project_id use the HUMAN_TASK_REPLY_PROMPT.""" + from src.pipeline import run_comment_reply + + captured = {} + + def capture_agent(prompt, **kw): + import subprocess as _sp + + captured["prompt"] = prompt + captured["kw"] = kw + fake = _sp.CompletedProcess(args=[], returncode=0, stdout="acknowledged", stderr="") + return fake, 1.0, "", {} + + monkeypatch.setattr("src.pipeline.run_agent", capture_agent) + + task = tmp_tasks.create( + title="Provide API key", + description="Please provide the GA4 API key", + assignee="human", + ) + tmp_tasks.add_comment(task.id, "web", "Here 
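
The routing rule this change introduces (the poller and `run_comment_reply` skip human-assigned tasks that belong to a project, leaving them for the hourly PM sweep) can be summarised in a small standalone sketch. This is an illustration only, not code from the repository: `TaskView` and `route_reply` are hypothetical names, and the real checks use `getattr` on the store's task objects as shown in the diff above.

```python
from dataclasses import dataclass


@dataclass
class TaskView:
    """Hypothetical stand-in holding only the fields the routing rule reads."""

    id: str
    assignee: str = "agent"
    project_id: str = ""
    reply_pending: bool = False


def route_reply(task: TaskView) -> str:
    """Return which component should answer a pending comment on `task`.

    Assumed to mirror the checks in run_poller.py and run_comment_reply:
    human-assigned tasks with a project wait for the hourly PM sweep,
    everything else with reply_pending set goes to the generic reply runner.
    """
    if not task.reply_pending:
        return "none"
    if task.assignee == "human" and task.project_id:
        return "pm_sweep"
    return "poller"


if __name__ == "__main__":
    examples = [
        TaskView(id="a1", reply_pending=True),
        TaskView(id="b2", assignee="human", project_id="proj-123", reply_pending=True),
        TaskView(id="c3", assignee="human", reply_pending=True),
    ]
    for t in examples:
        print(t.id, route_reply(t))
```

A human-assigned task without a `project_id` still routes to the generic path, which matches the "rare fallback" branch in `run_comment_reply` that uses `HUMAN_TASK_REPLY_PROMPT`.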
is the key: abc123") + self._set_reply_pending(tmp_tasks, task.id) + + result = run_comment_reply(tmp_tasks, task.id) + assert result is True + assert "assigned to a human operator" in captured["prompt"] + assert "Provide API key" in captured["prompt"] + assert "Here is the key: abc123" in captured["prompt"] + assert "Do NOT attempt to execute engineering work" in captured["prompt"] + + def test_prompt_includes_task_context(self, tmp_tasks, monkeypatch): + """Both prompts now include the task title and description.""" + from src.pipeline import run_comment_reply + + captured = {} + + def capture_agent(prompt, **kw): + import subprocess as _sp + + captured["prompt"] = prompt + fake = _sp.CompletedProcess(args=[], returncode=0, stdout="ok", stderr="") + return fake, 1.0, "", {} + + monkeypatch.setattr("src.pipeline.run_agent", capture_agent) + + task = tmp_tasks.create(title="Fix the login bug", description="Users cannot log in") + tmp_tasks.add_comment(task.id, "web", "Still broken") + self._set_reply_pending(tmp_tasks, task.id) + + run_comment_reply(tmp_tasks, task.id) + + assert "Fix the login bug" in captured["prompt"] + assert "Users cannot log in" in captured["prompt"] + class TestCancelViaStatusUpdate: """cancel_runner is called when PATCH /status sets cancelled."""