Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions .cursor/rules/project-conventions.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ Tasks are stored in DynamoDB with the following fields:
- Plan results are appended as `## Plan (timestamp)` sections

## Task Triggering
A polling daemon (`run_poller.py`) runs on EC2 as a systemd service and checks DynamoDB every `POLL_INTERVAL` seconds (default 15) for pending tasks and `reply_pending` comments. It spawns `run_task.py` subprocesses to handle them — no SSM trigger from Lambda is needed. The Lambda API just writes to DynamoDB (e.g. sets status to `pending` or `reply_pending: true`) and returns. SSM is only used for `cancelRunner` (SIGTERM), directive decomposition, daily cycle triggers, and autopilot plan proposals.
A polling daemon (`run_poller.py`) runs on EC2 as a systemd service and checks DynamoDB every `POLL_INTERVAL` seconds (default 15) for pending tasks and `reply_pending` comments. It spawns `run_task.py` subprocesses to handle them — no SSM trigger from Lambda is needed. The Lambda API just writes to DynamoDB (e.g. sets status to `pending` or `reply_pending: true`) and returns. SSM is used for `cancelRunner` (SIGTERM), directive decomposition, daily cycle triggers, autopilot plan proposals, and PM sweeps.

**PM sweep exception:** The poller does **not** spawn PM chat replies or human-assigned task replies. These are handled by the hourly Autopilot Lambda, which triggers `run_task.py --pm-reply <project_id>` via SSM for projects with `reply_pending: true`.

Up to `MAX_CONCURRENT_RUNNERS` (default 2) agents can run in parallel. Each process acquires a numbered slot lockfile (`runner.lock.0`, `runner.lock.1`, …); if all slots are occupied it exits cleanly. `run_one` claims a task by immediately flipping it to `in_progress` then re-reading to confirm it won any race — if another runner already claimed it, the process bails out.

Expand All @@ -79,13 +81,22 @@ Role assignment during planning: both `PLAN_PROMPT` and `PLAN_ONLY_PROMPT` inclu
## Comment-Triggered Agent Replies
When a user posts a comment via `POST /api/tasks/{id}/comment`, the Lambda API sets `reply_pending: true` on the task in DynamoDB and returns. The EC2 poller (`run_poller.py`) detects `reply_pending` tasks every poll cycle and spawns `run_task.py --reply <task_id>`. `run_comment_reply` in `runner.py` clears `reply_pending` when done. The UI polls every 3s while `reply_pending` is true and shows a spinning "Agent is composing a reply…" indicator. Agent comments are styled in indigo with a bot icon. `POST /api/tasks/{id}/reply` still exists for manual triggering (sets `reply_pending: true`).

**Exception — human-assigned tasks with a project:** The poller skips these. Instead, the comment API also sets `reply_pending: true` on the PROJECT record. The hourly Autopilot Lambda detects this and triggers `run_task.py --pm-reply <project_id>` via SSM, so the PM agent handles the response in its next sweep (see PM Agent Sweep below).

## Human-Assigned Tasks
Tasks with `assignee: "human"` are for the human operator (e.g., "set up GA4 property", "provide API key"). The daily cycle creates them automatically when the agent identifies something it needs from the human. Lifecycle:
1. Daily cycle creates task with `assignee: "human"`, `status: "pending"`
Tasks with `assignee: "human"` are for the human operator (e.g., "set up GA4 property", "provide API key"). The daily cycle and PM agent create them automatically when the agent identifies something it needs from the human. Lifecycle:
1. Daily cycle or PM agent creates task with `assignee: "human"`, `status: "pending"`
2. Human sees them in "My Tasks" sidebar filter and "Your Tasks" section on ProjectDetail
3. Human does the work and sets status to `in_review`
4. Next daily cycle reviews: if done correctly → `completed`; if not → `pending` with an agent comment explaining what's missing
The EC2 poller ignores `assignee: "human"` tasks (they are not picked up for agent execution). The `human` count in the API counts endpoint includes all non-terminal human-assigned tasks.
3. Human does the work and sets status to `in_review` — or posts a comment answering the question
4. Next PM sweep or daily cycle reviews: if done correctly → `completed`; if not → `pending` with an agent comment explaining what's missing
The EC2 poller ignores `assignee: "human"` tasks (they are not picked up for agent execution). When a human comments on a human-assigned task with a `project_id`, the PM agent handles the reply during its hourly sweep (not the generic comment reply agent). The `human` count in the API counts endpoint includes all non-terminal human-assigned tasks.

## PM Agent Sweep
The PM agent (`src/pm_agent.py`) runs on an hourly schedule, piggybacking on the Autopilot Lambda. It does **not** respond to chat messages or human task replies immediately. Instead:
1. Human posts a project chat message or comments on a human-assigned task → `reply_pending: true` set on the PROJECT record
2. Hourly Autopilot Lambda detects `reply_pending` on active projects → triggers `run_task.py --pm-reply <project_id>` via SSM
3. `run_pm_reply()` processes all queued chat messages AND reviews human-assigned tasks with pending replies in a single agent session
4. The PM can: acknowledge human responses, mark tasks complete, post follow-up questions, create new agent/human tasks

## Web UI — React SPA + API

Expand Down Expand Up @@ -150,7 +161,7 @@ Projects are long-lived entities (`pk=PROJECT#<id>`, `sk=PROJECT`) with a title,
`infra/packages/metrics/` — scheduled Lambda (cron `0 6 * * ? *`) fetches metrics for active projects with KPIs. Adapters: PageSpeed Insights (free, no auth), GitHub (existing token). Writes `SNAPSHOT#<date>` records to DynamoDB, updates KPI current values, triggers `run_daily_cycle` on EC2 via SSM.

### Autopilot Lambda
`infra/packages/autopilot/` — scheduled Lambda (cron hourly) queries active projects with `autopilot: true`. **Continuous** mode (`autopilot_mode: continuous`, not `cycle_paused`): triggers `run_task.py --propose-plan` on EC2. **Daily** mode: triggers only at **07:00 UTC** (same hourly cron; filtered in Lambda). EC2 `src/autopilot.py` enforces the same rules.
`infra/packages/autopilot/` — scheduled Lambda (cron hourly) queries active projects with `autopilot: true`. **Continuous** mode (`autopilot_mode: continuous`, not `cycle_paused`): triggers `run_task.py --propose-plan` on EC2. **Daily** mode: triggers only at **07:00 UTC** (same hourly cron; filtered in Lambda). EC2 `src/autopilot.py` enforces the same rules. The same Lambda also triggers **PM sweeps** for any active project with `reply_pending: true`, running `run_task.py --pm-reply <project_id>` via SSM. This processes queued project chat messages and reviews human-assigned tasks with pending replies in a single agent session.

### Autonomous Daily Cycle
Projects can define KPIs (`kpis` array on the PROJECT record). When KPIs are present, the Metrics Lambda collects data daily and triggers `run_daily_cycle()` in `src/objectives.py` on EC2. The cycle: loads 14 days of snapshots + proposals + recent tasks + human tasks in review → calls the agent (opus) with a reflection prompt → parses structured JSON → creates PROP records (proposals) and `assignee: "human"` tasks. The agent also reviews human tasks marked `in_review`, either completing them or returning them to `pending` with a comment.
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ frontend/node_modules/
frontend/dist/
.DS_Store
Thumbs.db
.cursor/rules/local-*.mdc
6 changes: 3 additions & 3 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ Tasks flow through: **create worktree → plan subtasks → execute → update d

**Autonomous objectives pipeline**: Projects with KPIs get a daily autonomous cycle. The Metrics Lambda (`infra/packages/metrics/`) runs at 6 AM UTC, fetches metrics (PageSpeed Insights, GitHub, optionally GA4/Search Console), writes SNAPSHOT records, and triggers `run_task.py --daily-cycle <project_id>` on EC2 via SSM. The daily cycle (`src/objectives.py`) loads metric history, proposals, and recent tasks, then calls the agent (opus) with a reflection prompt. The agent returns structured proposals and human requests. Proposals queue for human approval in the web UI; approved proposals become tasks in the existing pipeline. See `docs/autonomous-objectives.md`.

**Autopilot pipeline**: Projects with `autopilot: true` use `autopilot_mode` **`daily`** (default) or **`continuous`**. The Autopilot Lambda (`infra/packages/autopilot/`) runs **hourly**; **daily** projects are only dispatched at **07:00 UTC** (filtered in Lambda; EC2 enforces the same). **Continuous** projects run while a cycle is active (`cycle_started_at`, `cycle_max_hours`, not `cycle_paused`). `run_task.py --propose-plan <project_id> [--regenerate] [--plan-suffix]` on EC2 calls `src/autopilot.py`, which uses the agent (opus) and `./ctx`. **Daily**: `PLAN#YYYY-MM-DD`, human approves in the UI. **Continuous**: `PLAN#YYYY-MM-DDTHH:MM:SS` UTC, auto-approved tasks, pauses + Discord on time window, failures, or human-only backlog. Plan tasks use `directive_sk=PLAN#…`. Directives cancel pending plan tasks.
**Autopilot pipeline**: Projects with `autopilot: true` use `autopilot_mode` **`daily`** (default) or **`continuous`**. The Autopilot Lambda (`infra/packages/autopilot/`) runs **hourly**; **daily** projects are only dispatched at **07:00 UTC** (filtered in Lambda; EC2 enforces the same). **Continuous** projects run while a cycle is active (`cycle_started_at`, `cycle_max_hours`, not `cycle_paused`). `run_task.py --propose-plan <project_id> [--regenerate] [--plan-suffix]` on EC2 calls `src/autopilot.py`, which uses the agent (opus) and `./ctx`. **Daily**: `PLAN#YYYY-MM-DD`, human approves in the UI. **Continuous**: `PLAN#YYYY-MM-DDTHH:MM:SS` UTC, auto-approved tasks, pauses + Discord on time window, failures, or human-only backlog. Plan tasks use `directive_sk=PLAN#…`. Directives cancel pending plan tasks. The same hourly Lambda also triggers **PM sweeps** for projects with `reply_pending: true` — processing queued chat messages and human-assigned task replies in a single agent session (`run_task.py --pm-reply <project_id>`).

Every pipeline event is logged to `pipeline.log` (structured JSONL) with timestamps, task IDs, stages, runtimes, and model info. The Activity page (`/activity`) exposes this in the UI.

## Task dispatch — EC2 polling

The Lambda API does **not** trigger task execution directly. It writes to DynamoDB (creates tasks as `pending`, sets `reply_pending: true` for comments) and returns. A polling daemon on EC2 (`run_poller.py`) checks DynamoDB every `POLL_INTERVAL` seconds (default 15) and spawns `run_task.py` subprocesses for pending tasks and reply-pending comments.
The Lambda API does **not** trigger task execution directly. It writes to DynamoDB (creates tasks as `pending`, sets `reply_pending: true` for comments) and returns. A polling daemon on EC2 (`run_poller.py`) checks DynamoDB every `POLL_INTERVAL` seconds (default 15) and spawns `run_task.py` subprocesses for pending tasks and reply-pending comments (excluding human-assigned tasks with a project, which are handled by the PM sweep).

- **EC2**: Set `DYNAMO_TABLE=agent-tasks` in `.env`. Run `run_poller.py` as a systemd service.
- **SSM**: Only used for `cancelRunner` (SIGTERM to kill a running agent), directive decomposition, daily cycle triggers, and autopilot plan proposals. `EC2_INSTANCE_ID` (or SST secret `Ec2InstanceId`) is required for these operations.
- **SSM**: Used for `cancelRunner` (SIGTERM to kill a running agent), directive decomposition, daily cycle triggers, autopilot plan proposals, and PM sweeps. `EC2_INSTANCE_ID` (or SST secret `Ec2InstanceId`) is required for these operations.
- **Activity**: Pipeline events are written to DynamoDB, so the Activity page (which reads from Dynamo via the API) shows task_start, execute_done, etc.

## Python tests
Expand Down
2 changes: 1 addition & 1 deletion docs/dynamo-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ Project metadata. `id` is 8-char hex.
| cycle_pause_reason | string? | `time_expired` · `blocked` · `failures` · `manual` |
| cycle_feedback | string? | Human notes for next planner pass |
| next_check_at | string? | Agent-requested deferral (ISO); empty if none |
| reply_pending | bool? | When true, EC2 poller runs `run_task.py --pm-reply` for project-level PM chat |
| reply_pending | bool? | When true, hourly Autopilot Lambda triggers `run_task.py --pm-reply` via SSM for project-level PM chat |

**GSI:** project-list-index (`proj_status`, `project_updated`).

Expand Down
2 changes: 1 addition & 1 deletion infra/packages/api/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export interface Project {
cycle_feedback: string;
/** Agent-requested “check back after” time (ISO); empty if none */
next_check_at: string;
/** PM chat: EC2 poller should run run_task.py --pm-reply */
/** PM chat: hourly Autopilot Lambda triggers run_task.py --pm-reply via SSM */
reply_pending: boolean;
}

Expand Down
8 changes: 8 additions & 0 deletions infra/packages/api/src/routes/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ tasks.post("/tasks/:id/comment", async (c) => {
if (!comment) return c.json({ error: "not found" }, 404);

await db.setReplyPending(c.req.param("id"), true);

// For human-assigned tasks with a project, also flag the project for PM sweep
// so the hourly autopilot Lambda triggers the PM to review the response.
const task = await db.getTask(c.req.param("id"));
if (task?.assignee === "human" && task?.project_id) {
await db.updateProject(task.project_id, { reply_pending: true });
}

return c.json({
author: comment.author,
body: comment.body,
Expand Down
67 changes: 59 additions & 8 deletions infra/packages/autopilot/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ function shouldTriggerProposePlan(item: Record<string, unknown>): boolean {
return hour === 7;
}

async function listActiveAutopilotProjectIds(): Promise<string[]> {
const ids: string[] = [];
function shouldTriggerPmSweep(item: Record<string, unknown>): boolean {
if (!item.project_id) return false;
return item.reply_pending === true;
}

async function listActiveProjects(): Promise<{
autopilotIds: string[];
pmSweepIds: string[];
}> {
const autopilotIds: string[] = [];
const pmSweepIds: string[] = [];
let lastKey: Record<string, unknown> | undefined;
do {
const resp = await ddb.send(
Expand All @@ -42,13 +51,17 @@ async function listActiveAutopilotProjectIds(): Promise<string[]> {
}),
);
for (const item of resp.Items ?? []) {
if (shouldTriggerProposePlan(item as Record<string, unknown>)) {
ids.push(item.project_id as string);
const rec = item as Record<string, unknown>;
if (shouldTriggerProposePlan(rec)) {
autopilotIds.push(rec.project_id as string);
}
if (shouldTriggerPmSweep(rec)) {
pmSweepIds.push(rec.project_id as string);
}
}
lastKey = resp.LastEvaluatedKey;
} while (lastKey);
return ids;
return { autopilotIds, pmSweepIds };
}

async function triggerProposePlan(projectId: string): Promise<void> {
Expand Down Expand Up @@ -76,20 +89,58 @@ async function triggerProposePlan(projectId: string): Promise<void> {
console.log(`Triggered autopilot propose-plan for project ${projectId}`);
}

async function triggerPmSweep(projectId: string): Promise<void> {
const instanceId = await resolveInstanceId();
if (!instanceId) {
console.warn("Could not resolve EC2 instance — skipping PM sweep trigger");
return;
}
const esc = (s: string) => s.replace(/'/g, "'\\''");
await ssm.send(
new SendCommandCommand({
InstanceIds: [instanceId],
DocumentName: "AWS-RunShellScript",
Parameters: {
commands: [
`sudo -u ec2-user setsid ${VENV_PYTHON} ${RUN_TASK_SCRIPT} --pm-reply '${esc(
projectId,
)}' >/dev/null 2>&1 &`,
],
workingDirectory: [WORK_DIR],
},
TimeoutSeconds: 600,
}),
);
console.log(`Triggered PM sweep for project ${projectId}`);
}

export async function handler(): Promise<void> {
const today = new Date().toISOString().slice(0, 10);
console.log(`Autopilot plan Lambda running (UTC date ${today}, hourly)`);

const projectIds = await listActiveAutopilotProjectIds();
console.log(`Found ${projectIds.length} active autopilot project(s)`);
const { autopilotIds, pmSweepIds } = await listActiveProjects();
console.log(
`Found ${autopilotIds.length} autopilot project(s), ${pmSweepIds.length} project(s) needing PM sweep`,
);

for (const id of projectIds) {
for (const id of autopilotIds) {
try {
await triggerProposePlan(id);
} catch (err) {
console.error(`Failed to trigger propose-plan for ${id}:`, err);
}
}

for (const id of pmSweepIds) {
if (autopilotIds.includes(id)) {
console.log(`PM sweep for ${id} — project already triggered for autopilot, triggering PM separately`);
}
try {
await triggerPmSweep(id);
} catch (err) {
console.error(`Failed to trigger PM sweep for ${id}:`, err);
}
}

console.log("Autopilot plan Lambda complete");
}
18 changes: 8 additions & 10 deletions run_poller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Polling daemon that picks up pending tasks, task comment replies, and project PM chat from DynamoDB.
"""Polling daemon that picks up pending tasks and task comment replies from DynamoDB.

Replaces SSM-based triggering from the Lambda API. Runs as a systemd service
on EC2 and spawns run_task.py subprocesses — existing slot locking handles
Expand Down Expand Up @@ -148,18 +148,16 @@ def main():

reply_tasks = store.list_reply_pending()
for t in reply_tasks:
# Human-assigned tasks with a project are handled by the PM
# during the hourly autopilot sweep — don't spawn a generic reply.
if getattr(t, "assignee", "agent") == "human" and getattr(t, "project_id", ""):
continue
_spawn(["--reply", t.id])
log.info("Spawned reply runner for task %s", t.id)

try:
from src.projects_dynamo import list_project_reply_pending
except ImportError:
list_project_reply_pending = None # type: ignore[assignment]

if list_project_reply_pending is not None:
for pid in list_project_reply_pending():
_spawn(["--pm-reply", pid])
log.info("Spawned PM reply runner for project %s", pid)
# PM chat replies are processed during the hourly autopilot sweep,
# not immediately. The Lambda triggers --pm-sweep for projects that
# have reply_pending=true or human tasks needing review.

except Exception:
log.exception("Poller iteration failed")
Expand Down
2 changes: 1 addition & 1 deletion src/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bot.py — Discord bot with slash commands (discord.py)
task_store.py — Shared task types (Task, enums, Comment)
dynamo_store.py — DynamoDB-backed task store (`DynamoTaskStore`)
projects_dynamo.py — DynamoDB helpers for project records (get_project, update_project, directives, PLAN# plans (date or UTC datetime suffix), MEMORY# agent notes, CHAT# project PM thread, `reply_pending`, `post_system_message`; `resolve_memory_by_ref` for CLI lookup by sk/suffix)
pm_agent.py — Project-level PM chat: `run_pm_reply()` with `./ctx` + JSON actions (`run_task.py --pm-reply <project_id>`)
pm_agent.py — Project-level PM sweep (hourly via Autopilot Lambda): processes queued chat messages + reviews human-assigned tasks with pending replies in one session; `run_pm_reply()` with `./ctx` + JSON actions (`run_task.py --pm-reply <project_id>`)
objectives.py — Daily KPI cycle: lean prompt + `./ctx` CLI + optional read-only repo worktree; `run_daily_cycle()`
context_cli.py — Dynamo-backed context tool for agents (`./ctx spec|kpis|snapshots|tasks|proposals|human-tasks|plans|memory`); also exports `write_ctx_script()` to inject `./ctx` into any agent working dir
autopilot.py — Autopilot plan proposal (daily vs continuous): lean prompt + `./ctx` CLI (`propose_daily_plan`; EC2 `run_task.py --propose-plan [--regenerate] [--plan-suffix]`)
Expand Down
Loading
Loading