From 792d3f731b8d13473db5ee329856ef1e350fc526 Mon Sep 17 00:00:00 2001 From: Fullstop000 Date: Fri, 1 May 2026 12:29:58 +0800 Subject: [PATCH 1/3] decision-inbox v1: trigger-based prompt + claude --resume fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reland after the May-2026 dogfood revert. Two distinct bugs surfaced during that postmortem; both are fixed here, then verified live. ## What this ships - Storage: `decisions` table (CAS-protected resolve), Store methods, 4 unit tests. - Lifecycle: `AgentLifecycle::resume_with_prompt` + `run_channel_id`. Routes to `handle.prompt(...)` for live agents; falls back to `start_agent(init_directive=envelope)` for asleep ones. Reverts the row to open if delivery fails so the human's pick isn't lost. - Bridge: `chorus_create_decision` MCP tool with structural validator at the boundary. Backend forwards to `/internal/agent/{id}/decisions`. - Handlers: 3 routes — internal create with channel-inference contract (400 if no active-run channel), public list with status filter, public resolve doing CAS + envelope build + resume_with_prompt + revert-on-failure. - Prompt rewrite (the core change vs v0): trigger-based mandatory framing instead of permissive "when you need". Critical-rules splits "conversation channel" (send_message) from "verdict channel" (chorus_create_decision) instead of conflicting "your only output channel" + buried exception. Drops the "things you can act on unilaterally" loophole entirely. - UI: `DecisionsInbox` component with click-to-pick, recommended-option highlight, optional human note, collapsible context, 5s polling. Sidebar inbox icon toggles the view. - Tests: 339 lib + 80 e2e (4 new round-trip cases) + 89 vitest. All pass. clippy --all-targets -D warnings clean. cargo fmt clean. 
## Pre-existing bug fixed: claude --resume on missing session file Diagnosis from the dogfood postmortem: chorus persists `session_id` in `agent_sessions` and passes it to `claude --resume` on every restart. Claude hard-errors with `error_during_execution` and zero events when the session file is missing locally — which surfaces in chorus as an immediate `reason=Natural` turn end. Every prior "agent did nothing" failure was actually this, not a prompt issue. Fix: `claude.rs` verifies the session file exists at `~/.claude/projects/<encoded-cwd>/<session_id>.jsonl` before passing `--resume`. Falls back to a fresh session with a `warn!` on miss. Regression test: `missing_session_file_drops_resume_flag`. ## Live cross-driver verification (real models, real runs) | driver | model | result | |----------|-------------------------------|---------------------------------| | claude | sonnet | ✅ emits + full round-trip | | kimi | kimi-code/kimi-for-coding | ✅ emits + full round-trip | | | | (received envelope, edited | | | | members.rs per picked body) | | codex | gpt-5.4-mini (fresh agent) | ✅ emits with all conventions | | gemini | gemini-2.5-flash | ✅ emits autonomously | | opencode | deepseek/deepseek-chat | ✅ emits with H2 sections | All five drivers picked up the prompt section, recognized the PR-review trigger, and emitted properly-shaped payloads (H2 sections, [verified · source] / [inferred] evidence prefixes, recommended_key). ## Known follow-up bugs (not feature defects, file separately) 1. Codex / opencode have the same shape of resume bug as claude: chorus passes a stale thread/session id to runtimes that no longer hold it. Codex/opencode silently exit Natural with no work; claude errors loudly. Fresh agents work in all three. Stage-2 fix: extend the file-exists guard to per-driver liveness checks. 2. Gemini's MCP HTTP session expires on long-running turns (>20 min), surfacing as `Unauthorized: Session not found` when `chorus_create_decision` finally fires. 
rmcp's StreamableHttpService session TTL needs raising or session re-init on expiry. Lineage: chorus-design-reviews/explorations/2026-04-30-pr-review-vertical-slice/design.md (commit 3a38b22). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/agent/drivers/claude.rs | 140 ++++++ src/agent/drivers/prompt.rs | 80 +++- src/agent/lifecycle.rs | 32 ++ src/agent/manager.rs | 69 +++ src/bridge/backend.rs | 31 ++ src/bridge/mod.rs | 98 ++++ src/bridge/types.rs | 34 ++ src/server/handlers/decisions.rs | 418 ++++++++++++++++++ src/server/handlers/mod.rs | 2 + src/server/mod.rs | 8 +- src/store/decisions.rs | 292 ++++++++++++ src/store/mod.rs | 2 + src/store/schema.sql | 23 + tests/server_tests.rs | 340 ++++++++++++++ .../components/decisions/DecisionsInbox.tsx | 348 +++++++++++++++ ui/src/data/decisions.ts | 60 +++ ui/src/pages/MainPanel.tsx | 4 + ui/src/pages/Sidebar/Sidebar.tsx | 13 +- ui/src/store/uiStore.ts | 6 + 19 files changed, 1994 insertions(+), 6 deletions(-) create mode 100644 src/server/handlers/decisions.rs create mode 100644 src/store/decisions.rs create mode 100644 ui/src/components/decisions/DecisionsInbox.tsx create mode 100644 ui/src/data/decisions.ts diff --git a/src/agent/drivers/claude.rs b/src/agent/drivers/claude.rs index c2aba552..c02a0cb8 100644 --- a/src/agent/drivers/claude.rs +++ b/src/agent/drivers/claude.rs @@ -38,6 +38,7 @@ //! [`ClaudeAgentProcess`] (new `EventStreamHandle`, new `event_tx`). Secondary //! handles prune only when they were the last live session on the agent. +use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, OnceLock}; @@ -147,6 +148,29 @@ impl ClaudeTransport for SpawnedClaudeTransport { type TransportFactory = Arc, &AgentSpec) -> anyhow::Result> + Send + Sync>; +/// Map an agent's working directory to the path Claude uses for its local +/// session store: `~/.claude/projects//.jsonl`. 
+/// +/// Claude encodes the cwd by replacing `/` and `.` with `-`. So a cwd like +/// `/agents/.chorus/bot-1` becomes `-agents--chorus-bot-1`. The encoding is +/// lossy (two distinct paths can collide), but it matches what claude-code +/// does on disk and that's what we have to match to find the file. +fn claude_session_file(cwd: &Path, session_id: &str) -> PathBuf { + let encoded: String = cwd + .to_string_lossy() + .chars() + .map(|c| match c { + '/' | '.' => '-', + other => other, + }) + .collect(); + let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("/tmp")); + home.join(".claude") + .join("projects") + .join(encoded) + .join(format!("{session_id}.jsonl")) +} + fn spawn_real_transport( args: Vec, spec: &AgentSpec, @@ -509,10 +533,36 @@ impl ClaudeHandle { // `resumed_session_id` field set by `open_session(Resume)` or the // `start` compat shim. Fall back to the legacy `preassigned_session_id` // for callers that still go through `resume_session` → `start` directly. + // + // Verify the session file exists in claude's local store before passing + // `--resume`. Claude's CLI hard-errors with `error_during_execution` and + // no further events when given a missing session id, which surfaces in + // chorus as an immediate "Natural" turn end with zero output. That used + // to silently mask every agent run after a session got pruned — see + // the May-2026 dogfood postmortem. Falling back to a fresh session is + // safer than dying. 
let resume_id = self .resumed_session_id .take() .or_else(|| self.preassigned_session_id.clone()); + let resume_id = match resume_id { + Some(sid) => { + let session_file = claude_session_file(&self.spec.working_directory, &sid); + if session_file.exists() { + Some(sid) + } else { + warn!( + agent = %self.key.as_str(), + session_id = %sid, + path = %session_file.display(), + "claude session file missing; starting fresh session instead of --resume" + ); + self.preassigned_session_id = None; + None + } + } + None => None, + }; // Build CLI args let mcp_path_str = mcp_config_path.to_string_lossy().into_owned(); @@ -1101,6 +1151,23 @@ mod tests { } } + #[test] + fn claude_session_file_encodes_dots_and_slashes() { + // Use synthetic paths so the test runs anywhere and doesn't depend on + // the current user's home directory. + let p = claude_session_file( + Path::new("/agents/.chorus/bot-1"), + "00000000-0000-0000-0000-000000000001", + ); + let s = p.to_string_lossy(); + assert!( + s.ends_with( + "/.claude/projects/-agents--chorus-bot-1/00000000-0000-0000-0000-000000000001.jsonl" + ), + "unexpected path: {s}" + ); + } + #[tokio::test] async fn test_claude_driver_probe_not_installed() { // claude binary is not on PATH in CI/test environments @@ -1548,6 +1615,15 @@ mod tests { let spec = test_spec_with_bridge(tmp.path(), &bridge_url); + // The file-exists guard added for the May-2026 dogfood postmortem + // requires a real session file at the path Claude would store it. + // Materialize it so this test still asserts the flag passthrough. + let session_file = claude_session_file(&spec.working_directory, "sess_xyz"); + if let Some(parent) = session_file.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + std::fs::write(&session_file, "").unwrap(); + // Bring the agent online first so the registry has an entry we can // install the fake factory on. let s1 = driver @@ -1590,6 +1666,7 @@ mod tests { // Close s1 too to clean up the registry. 
let mut h1 = s1.session; h1.close().await.unwrap(); + let _ = std::fs::remove_file(&session_file); agent_instances().remove(&key); } @@ -1900,6 +1977,14 @@ mod tests { let spec = test_spec_with_bridge(tmp.path(), &bridge_url); + // Materialize the session file so the file-exists guard (added for + // the May-2026 dogfood postmortem) lets --resume through. + let session_file = claude_session_file(&spec.working_directory, "sess_xyz"); + if let Some(parent) = session_file.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + std::fs::write(&session_file, "").unwrap(); + let result = driver .open_session( key.clone(), @@ -1938,6 +2023,61 @@ mod tests { } handle.close().await.unwrap(); + let _ = std::fs::remove_file(&session_file); + agent_instances().remove(&key); + } + + /// Regression test for the May-2026 dogfood postmortem: when chorus + /// passes `--resume ` for a session whose file is missing in + /// claude's local store, the runtime emits `error_during_execution` + /// with no other events. The driver must detect the missing file and + /// fall back to a fresh session (no `--resume` arg) instead. + #[tokio::test] + async fn missing_session_file_drops_resume_flag() { + let (bridge_url, _bridge) = spawn_mock_bridge().await; + let tmp = tempfile::tempdir().unwrap(); + + let driver = ClaudeDriver; + let key = format!("claude-resume-missing-{}", uuid::Uuid::new_v4()); + agent_instances().remove(&key); + + let spec = test_spec_with_bridge(tmp.path(), &bridge_url); + + // Deliberately do NOT create the session file. Resume target is a + // syntactically valid uuid that the validator would accept. 
+ let stale_id = "00000000-0000-0000-0000-deadbeef0000".to_string(); + + let s1 = driver + .open_session(key.clone(), spec.clone(), SessionIntent::New) + .await + .unwrap(); + let factory = install_fake_factory(&driver.ensure_process(&key)); + + let resumed = driver + .open_session( + key.clone(), + spec.clone(), + SessionIntent::Resume(stale_id.clone()), + ) + .await + .unwrap(); + let mut hr = resumed.session; + hr.run(None).await.unwrap(); + + { + let state = factory.lock().unwrap(); + assert_eq!(state.spawns.len(), 1); + let args = &state.spawns[0].args; + let has_resume = args.windows(2).any(|w| w[0] == "--resume"); + assert!( + !has_resume, + "missing session file must NOT pass --resume; got: {args:?}" + ); + } + + hr.close().await.unwrap(); + let mut h1 = s1.session; + h1.close().await.unwrap(); agent_instances().remove(&key); } } diff --git a/src/agent/drivers/prompt.rs b/src/agent/drivers/prompt.rs index b4081a79..3aec935c 100644 --- a/src/agent/drivers/prompt.rs +++ b/src/agent/drivers/prompt.rs @@ -51,6 +51,7 @@ pub fn build_system_prompt(spec: &AgentSpec, opts: &PromptOptions) -> String { let task_create_cmd = format!("`{}`", t("create_tasks")); let task_update_cmd = format!("`{}`", t("update_task_status")); let server_info_cmd = format!("`{}`", t("list_server")); + let create_decision_cmd = format!("`{}`", t("chorus_create_decision")); let identity = if spec.display_name.is_empty() { "agent" @@ -64,9 +65,14 @@ pub fn build_system_prompt(spec: &AgentSpec, opts: &PromptOptions) -> String { "The daemon will automatically restart you when new messages arrive." }; - let mut critical_rules: Vec = vec![format!( - "- Always communicate through {send_cmd}. This is your only output channel." - )]; + let mut critical_rules: Vec = vec![ + format!( + "- For conversation (status updates, replies, info, follow-ups), use {send_cmd}. This is your conversational output channel." 
+ ), + format!( + "- For verdicts on requests that ask you to PICK, JUDGE, or RECOMMEND between concrete alternatives (PR review outcome, A-vs-B implementation, config knob, \"should I X or Y\"), you MUST call {create_decision_cmd} and end your turn — do NOT reply via {send_cmd}. The human picks; their pick arrives as your next session prompt. See the Decision Inbox section for triggers and payload." + ), + ]; critical_rules.extend(opts.extra_critical_rules.iter().cloned()); critical_rules.push( "- Use only the provided MCP tools for messaging — they are already available and ready." @@ -219,6 +225,30 @@ pub fn build_system_prompt(spec: &AgentSpec, opts: &PromptOptions) -> String { "\n\n## Workspace & Memory\n\nYour working directory (cwd) is your **persistent workspace**. Everything you write here survives across sessions.\n\n### MEMORY.md — Your Memory Index (CRITICAL)\n\n`MEMORY.md` is the **entry point** to all your knowledge. It is the first file read on every startup (including after context compression). Structure it as an index that points to everything you know. This file is called `MEMORY.md` (not tied to any specific runtime) — keep it updated after every significant interaction or learning.\n\n```markdown\n# \n\n## Role\n\n\n## Key Knowledge\n- Read notes/user-preferences.md for user preferences and conventions\n- Read notes/channels.md for what each channel is about and ongoing work\n- Read notes/domain.md for domain-specific knowledge and conventions\n- ...\n\n## Active Context\n- Currently working on: \n- Last interaction: \n```\n\n### What to memorize\n\n**Actively observe and record** the following kinds of knowledge as you encounter them in conversations:\n\n1. **User preferences** — How the user likes things done, communication style, coding conventions, tool preferences, recurring patterns in their requests.\n2. **World/project context** — The project structure, tech stack, architectural decisions, team conventions, deployment patterns.\n3. 
**Domain knowledge** — Domain-specific terminology, conventions, best practices you learn through tasks.\n4. **Work history** — What has been done, decisions made and why, problems solved, approaches that worked or failed.\n5. **Channel context** — What each channel is about, who participates, what's being discussed, ongoing tasks per channel.\n6. **Other agents** — What other agents do, their specialties, collaboration patterns, how to work with them effectively.\n\n### How to organize memory\n\n- **MEMORY.md** is always the index. Keep it concise but comprehensive as a table of contents.\n- Create a `notes/` directory for detailed knowledge files. Use descriptive names:\n - `notes/user-preferences.md` — User's preferences and conventions\n - `notes/channels.md` — Summary of each channel and its purpose\n - `notes/work-log.md` — Important decisions and completed work\n - `notes/.md` — Domain-specific knowledge\n- You can also create any other files or directories for your work (scripts, notes, data, etc.)\n- **Update notes proactively** — Don't wait to be asked. When you learn something important, write it down.\n- **Keep MEMORY.md current** — After updating notes, update the index in MEMORY.md if new files were added.\n\n### Compaction safety (CRITICAL)\n\nYour context will be periodically compressed to stay within limits. When this happens, you lose your in-context conversation history but MEMORY.md is always re-read. 
Therefore:\n\n- **MEMORY.md must be self-sufficient as a recovery point.** After reading it, you should be able to understand who you are, what you know, and what you were working on.\n- **Before a long task**, write a brief \"Active Context\" note in MEMORY.md so you can resume if interrupted mid-task.\n- **After completing work**, update your notes and MEMORY.md index so nothing is lost.\n- Keep MEMORY.md complete enough that context compression preserves: which channel is about what, what tasks are in progress, what the user has asked for, and what other agents are doing." ); + prompt.push_str(&format!( + "\n\n## Decision Inbox\n\n\ + Some incoming requests ask you to render a verdict or pick between concrete alternatives, not to act unilaterally. For these you MUST emit {create_decision_cmd} — not a {send_cmd} reply. The tool returns a `decision_id`; end your turn cleanly. The human picks in their inbox; their pick arrives as your next session prompt with the picked option's full body, the original headline and question, and any human note. 
Read it and act.\n\n\ + **Triggers — when the incoming message does ANY of these, emit {create_decision_cmd}:**\n\ + - Asks you to review a PR, diff, or commit and recommend an outcome (merge / approve+comment / request-changes / hold).\n\ + - Presents two or more concrete alternatives and asks you to pick.\n\ + - Asks you to resolve a config flag, knob, version pin, or policy choice with no obvious right answer.\n\ + - Uses phrasing like \"should I X or Y?\", \"merge or hold?\", \"approve, request changes, or comment?\", \"which option?\", \"what's your verdict?\".\n\n\ + **Not triggers — use {send_cmd} as normal:**\n\ + - Information requests (\"explain X\", \"how does Y work?\").\n\ + - Status updates, acknowledgments, progress reports.\n\ + - Open-ended brainstorming with no committed alternatives.\n\ + - Follow-up replies AFTER a decision has been resolved (the resume prompt is your input; reply via {send_cmd}).\n\n\ + **Do not work around this rule.** If you have a strong opinion on a triggering request, frame it as a decision with options and `recommended_key` — do NOT post your verdict as a {send_cmd} reply. The human's act of picking is the work product; your analysis is the supporting context inside the decision.\n\n\ + **Payload (all required):**\n\ + - `headline` ≤80 chars — one-line summary carrying category and subject (e.g. \"PR review #121: archived-channel del/join fix\").\n\ + - `question` ≤120 chars — the actual ask in one sentence.\n\ + - `options` — 2..=6 entries, each `{{key, label, body}}`. `key` is 1-2 alphanumeric chars (\"A\", \"B\", \"R1\"); `label` ≤40 chars; `body` is markdown ≤2048 chars listing CONSEQUENCES (\"Squash and merge to main. CI is green.\"), not pros/cons.\n\ + - `recommended_key` — must equal one option's `key`. Always required — recommend, don't abstain.\n\ + - `context` — markdown ≤4096 chars. 
Suggested H2 sections (all optional): `## Why now`, `## Evidence`, `## Risk`, `## Pressure`, `## History`, `## Dep tree`, `## Related`. Inline source prefixes for evidence: `[verified · source]`, `[inferred]`, `[agent]`. Audience prefix in `## Risk`: `[external]`, `[team]`, `[private]`.\n\n\ + **Quality bar:** headline + question + recommended-option label should let the human pick in <10 seconds without expanding `context`. If the human always needs to expand context, your headline+question is too thin — rewrite them.\n\n\ + **Failure handling:** if the validator rejects the payload, fix it and retry. Common errors: option keys not unique; `recommended_key` not in `options`; a length cap exceeded." + )); + prompt.push_str( "\n\n## Capabilities\n\nYou can work with any files or tools on this computer — you are not confined to any directory.\nYou may develop a specialized role over time through your interactions. Embrace it." ); @@ -349,4 +379,48 @@ mod tests { let p = build_system_prompt(&sample_spec(), &opts); assert!(p.contains("Do NOT use shell commands for messaging.")); } + + #[test] + fn decision_inbox_section_uses_mandatory_framing() { + let p = build_system_prompt(&sample_spec(), &PromptOptions::default()); + assert!(p.contains("## Decision Inbox")); + assert!(p.contains("`chorus_create_decision`")); + // Trigger-based mandatory framing, not "when you need" permission framing. + assert!(p.contains("you MUST emit")); + assert!(p.contains("Triggers")); + assert!(p.contains("PR, diff, or commit")); + // Anti-loophole: no "things you can act on unilaterally" exclusion. + assert!(!p.contains("act on unilaterally")); + // The contradiction the original patch had: send_message is no longer + // labeled "your only output channel". It's now the conversational + // channel; chorus_create_decision is the verdict channel. 
+ assert!(!p.contains("only output channel")); + assert!(p.contains("conversational output channel")); + } + + #[test] + fn critical_rule_promotes_decision_over_send_for_verdicts() { + let p = build_system_prompt(&sample_spec(), &PromptOptions::default()); + // The critical rules must contain a MUST-style imperative naming + // chorus_create_decision, equally weighted with send_message. + let crit_start = p.find("CRITICAL RULES").expect("critical rules section"); + let crit_end = p[crit_start..] + .find("## Startup sequence") + .map(|i| crit_start + i) + .unwrap_or(p.len()); + let crit = &p[crit_start..crit_end]; + assert!(crit.contains("you MUST call `chorus_create_decision`")); + assert!(crit.contains("PICK, JUDGE, or RECOMMEND")); + } + + #[test] + fn decision_inbox_uses_claude_prefix_when_set() { + let opts = PromptOptions { + tool_prefix: "mcp__chat__".into(), + ..Default::default() + }; + let p = build_system_prompt(&sample_spec(), &opts); + assert!(p.contains("`mcp__chat__chorus_create_decision`")); + assert!(!p.contains("`chorus_create_decision`\n")); + } } diff --git a/src/agent/lifecycle.rs b/src/agent/lifecycle.rs index 3d8b6dbf..4a8ed950 100644 --- a/src/agent/lifecycle.rs +++ b/src/agent/lifecycle.rs @@ -56,4 +56,36 @@ pub trait AgentLifecycle: Send + Sync { /// Associate a channel with the agent's current or next trace run. fn set_run_channel(&self, _agent_name: &str, _channel_id: &str) {} + + /// Return the channel id of the agent's most recent or in-flight run, + /// if known. Used by the decision-inbox handler to infer which channel + /// a `chorus_create_decision` emission belongs to (the agent doesn't + /// pass a channel — channel context is implicit in the active run). + fn run_channel_id<'a>( + &'a self, + _agent_name: &'a str, + ) -> Pin> + Send + 'a>> { + Box::pin(async { None }) + } + + /// Deliver a self-contained envelope to the agent so it can act on a + /// human's pick. 
Routes to the live session's prompt channel when the + /// agent is `Active`; otherwise starts the agent with the envelope as + /// the `init_directive` so the same payload arrives on first turn. + /// + /// The envelope is built by the decision handler and contains the + /// original headline + question, the picked option's full label and + /// body, and any human note. The agent treats it as a new prompt and + /// continues its work without needing to re-read history. + fn resume_with_prompt<'a>( + &'a self, + _agent_name: &'a str, + _envelope: String, + ) -> Pin> + Send + 'a>> { + Box::pin(async { + Err(anyhow::anyhow!( + "resume_with_prompt not implemented on this AgentLifecycle" + )) + }) + } } diff --git a/src/agent/manager.rs b/src/agent/manager.rs index 2747f525..25fb4c84 100644 --- a/src/agent/manager.rs +++ b/src/agent/manager.rs @@ -405,6 +405,59 @@ impl AgentManager { Ok(()) } + /// Deliver a self-contained prompt to the agent and continue its work. + /// + /// Used by the decision-inbox handler after a human picks an option: + /// the envelope contains the picked option's body, original headline + + /// question, and any human note. If the agent is `Active`, prompt the + /// live handle directly. Otherwise wake the agent with the envelope as + /// `init_directive` so the same payload arrives on first turn. + /// + /// The handler reverts the decision to `open` if this returns an error, + /// so the human's pick isn't silently lost on a transient delivery + /// failure. + pub async fn resume_with_prompt( + &self, + agent_name: &str, + envelope: String, + ) -> anyhow::Result<()> { + // Snapshot liveness without holding the agents lock across the + // prompt() call — handle.lock().await may block on the driver, and + // start_agent (the fallback) needs to take agents.lock() itself. 
+ let live_handle = { + let agents = self.agents.lock().await; + match agents.get(agent_name) { + Some(agent) => { + let h = agent.handle.clone(); + let state = h.lock().await.process_state(); + if matches!(state, ProcessState::Active { .. }) { + Some(h) + } else { + None + } + } + None => None, + } + }; + + if let Some(h) = live_handle { + h.lock() + .await + .prompt(PromptReq { + text: envelope, + attachments: vec![], + }) + .await?; + return Ok(()); + } + + // Asleep / not-yet-running / mid-startup: deliver the envelope as + // the init directive so it lands on the agent's first prompt turn. + // wake_message=None and init_directive=Some(...) takes precedence in + // build_start_prompt. + self.start_agent(agent_name, None, Some(envelope)).await + } + /// Deliver a wakeup notification to agent stdin. pub async fn notify_agent(&self, agent_name: &str) -> anyhow::Result<()> { let mut agents = self.agents.lock().await; @@ -691,6 +744,22 @@ impl AgentLifecycle for AgentManager { fn set_run_channel(&self, agent_name: &str, channel_id: &str) { self.trace_store.set_run_channel(agent_name, channel_id); } + + fn run_channel_id<'a>( + &'a self, + agent_name: &'a str, + ) -> std::pin::Pin> + Send + 'a>> { + let id = self.trace_store.run_channel_id(agent_name); + Box::pin(async move { id }) + } + + fn resume_with_prompt<'a>( + &'a self, + agent_name: &'a str, + envelope: String, + ) -> std::pin::Pin> + Send + 'a>> { + Box::pin(AgentManager::resume_with_prompt(self, agent_name, envelope)) + } } /// Convert the stored message shape into the human-facing target label used in diff --git a/src/bridge/backend.rs b/src/bridge/backend.rs index 7ca1efb0..dff9b385 100644 --- a/src/bridge/backend.rs +++ b/src/bridge/backend.rs @@ -106,6 +106,13 @@ pub trait Backend: Send + Sync { /// View/download a file attachment. async fn view_file(&self, agent_key: &str, attachment_id: &str) -> Result; + + /// Create a decision (TRACE-ONLY scaffold; no persistence). 
+ /// + /// `payload` is the full validated decision JSON + /// (`headline`, `question`, `options`, `recommended_key`, `context`). + async fn create_decision(&self, agent_key: &str, payload: Value) + -> Result; } // --------------------------------------------------------------------------- @@ -987,6 +994,30 @@ impl Backend for ChorusBackend { file_path.to_string_lossy() )) } + + async fn create_decision( + &self, + agent_key: &str, + payload: Value, + ) -> Result { + let url = format!("{}/decisions", self.base_url(agent_key)); + let res = self + .send_request(self.client.post(&url).json(&payload), &url) + .await?; + let status = res.status().as_u16(); + let data: Value = res.json().await.map_err(|e| BridgeError::ServerError { + status, + body: format!("invalid JSON from server: {}", e), + })?; + let decision_id = data + .get("decision_id") + .and_then(|v| v.as_str()) + .unwrap_or(""); + Ok(format!( + "Decision created. ID: {}\n\nEnd your turn cleanly. The human will pick in their inbox; their pick will arrive as your next session prompt.", + decision_id + )) + } } // --------------------------------------------------------------------------- diff --git a/src/bridge/mod.rs b/src/bridge/mod.rs index 49a0c2c5..b679afcf 100644 --- a/src/bridge/mod.rs +++ b/src/bridge/mod.rs @@ -269,6 +269,104 @@ impl ChatBridge { .await .map_err(Into::into) } + + #[tool( + description = "Submit a decision for the human to pick. REQUIRED for any incoming request that asks you to render a verdict, judge, or pick between concrete alternatives — PR review outcomes (merge/approve/request-changes), A-vs-B implementation choices, config flags, \"should I X or Y\" questions. Do NOT post your verdict via send_message; emit this tool with options + a recommended_key, then end your turn. The human's pick arrives as your next session prompt with the picked option's full body." 
+ )] + async fn chorus_create_decision( + &self, + Extension(parts): Extension, + Parameters(params): Parameters, + ) -> Result { + let agent_id = extract_agent_id(&parts)?; + + // Light structural validation at the bridge boundary so a malformed + // payload doesn't 500 the handler. The trace target is the agent's + // PROACTIVE EMISSION; we surface validator errors back as + // rmcp::ErrorData so the agent learns and retries (per CLAUDE.md + // "fail loudly with context"). + validate_decision_payload(¶ms)?; + + let payload = serde_json::json!({ + "headline": params.headline, + "question": params.question, + "options": params.options.iter().map(|o| serde_json::json!({ + "key": o.key, + "label": o.label, + "body": o.body, + })).collect::>(), + "recommended_key": params.recommended_key, + "context": params.context, + }); + + self.backend + .create_decision(&agent_id, payload) + .await + .map_err(Into::into) + } +} + +/// Minimal structural validator. Bridge boundary defense against payloads +/// that would 500 the stub handler. Intentionally permissive on content — +/// the goal is to capture the agent's emission, not gatekeep quality. 
+fn validate_decision_payload(p: &CreateDecisionParams) -> Result<(), rmcp::ErrorData> { + fn invalid(msg: impl Into) -> rmcp::ErrorData { + rmcp::ErrorData::new(rmcp::model::ErrorCode::INVALID_PARAMS, msg.into(), None) + } + if p.headline.trim().is_empty() { + return Err(invalid("headline is empty")); + } + if p.headline.chars().count() > 80 { + return Err(invalid("headline exceeds 80 chars")); + } + if p.question.trim().is_empty() { + return Err(invalid("question is empty")); + } + if p.question.chars().count() > 120 { + return Err(invalid("question exceeds 120 chars")); + } + if p.options.len() < 2 || p.options.len() > 6 { + return Err(invalid("options must have 2..=6 entries")); + } + let mut keys = std::collections::HashSet::new(); + for o in &p.options { + if o.key.trim().is_empty() { + return Err(invalid("option key is empty")); + } + if o.key.chars().count() > 2 { + return Err(invalid(format!("option key '{}' exceeds 2 chars", o.key))); + } + if !keys.insert(o.key.clone()) { + return Err(invalid(format!("duplicate option key '{}'", o.key))); + } + if o.label.trim().is_empty() { + return Err(invalid(format!("option '{}' label is empty", o.key))); + } + if o.label.chars().count() > 40 { + return Err(invalid(format!( + "option '{}' label exceeds 40 chars", + o.key + ))); + } + if o.body.chars().count() > 2048 { + return Err(invalid(format!( + "option '{}' body exceeds 2048 chars", + o.key + ))); + } + } + if !keys.contains(&p.recommended_key) { + return Err(invalid(format!( + "recommended_key '{}' is not one of the option keys", + p.recommended_key + ))); + } + if let Some(ctx) = &p.context { + if ctx.chars().count() > 4096 { + return Err(invalid("context exceeds 4096 chars")); + } + } + Ok(()) } // --------------------------------------------------------------------------- diff --git a/src/bridge/types.rs b/src/bridge/types.rs index bb92cd61..3aa163c5 100644 --- a/src/bridge/types.rs +++ b/src/bridge/types.rs @@ -92,3 +92,37 @@ pub(super) struct 
ViewFileParams { /// The attachment UUID (from the 'id:...' shown in the message) pub(super) attachment_id: String, } + +// --------------------------------------------------------------------------- +// Decision Inbox — TRACE-ONLY scaffold +// --------------------------------------------------------------------------- +// +// Verifies the agent's proactive-dispatch behavior. The handler logs the +// payload and returns a synthetic decision_id; nothing persists. Storage, +// resume_with_prompt, and UI come back only after we see the agent emit +// this tool unprompted. + +#[derive(Debug, Deserialize, JsonSchema)] +pub(super) struct DecisionOptionParam { + /// Short identifier (1-2 alphanumeric chars), e.g. "A", "B", "R1" + pub(super) key: String, + /// Short button label (≤40 chars) + pub(super) label: String, + /// Markdown body listing the consequences if the human picks this option (≤2048 chars) + pub(super) body: String, +} + +#[derive(Debug, Deserialize, JsonSchema)] +pub(super) struct CreateDecisionParams { + /// One-line headline carrying the category and subject (≤80 chars) + pub(super) headline: String, + /// The actual ask in one sentence (≤120 chars) + pub(super) question: String, + /// 2..=6 options the human picks between + pub(super) options: Vec, + /// Must equal one option's `key`. Always recommend; do not abstain. + pub(super) recommended_key: String, + /// Markdown context body (≤4096 chars). Suggested H2 sections: Why now, Evidence, Risk, Pressure, History, Dep tree, Related. + #[serde(default)] + pub(super) context: Option, +} diff --git a/src/server/handlers/decisions.rs b/src/server/handlers/decisions.rs new file mode 100644 index 00000000..77a2031a --- /dev/null +++ b/src/server/handlers/decisions.rs @@ -0,0 +1,418 @@ +//! Decision Inbox handlers. +//! +//! Lifecycle: +//! - `POST /internal/agent/{id}/decisions` — agent emits via bridge. +//! Channel is inferred from the agent's active run via +//! `lifecycle.run_channel_id()`. 
Validator already ran at the bridge +//! boundary; this handler stores the row. +//! - `GET /api/decisions?status=open|resolved|all` — UI lists decisions +//! for the active workspace. +//! - `POST /api/decisions/{id}/resolve` — human picks an option. CAS +//! updates the row, builds a self-contained envelope, and calls +//! `lifecycle.resume_with_prompt(agent, envelope)`. On delivery +//! failure, reverts the row to `open` so the pick isn't lost. + +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::Json; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tracing::{info, warn}; +use uuid::Uuid; + +use super::AppState; +use crate::server::error::{app_err, ApiResult}; +use crate::store::{DecisionRow, DecisionStatus}; + +// ── Internal: agent emits a decision ────────────────────────────────────── + +#[derive(Debug, Deserialize)] +pub struct CreateDecisionBody { + pub headline: String, + pub question: String, + pub options: Vec, + pub recommended_key: String, + #[serde(default)] + pub context: Option, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct DecisionOptionDto { + pub key: String, + pub label: String, + pub body: String, +} + +#[derive(Debug, Serialize)] +pub struct CreateDecisionResponse { + pub decision_id: String, + pub channel_id: String, +} + +pub async fn handle_create_decision( + State(state): State, + Path(agent_name): Path, + Json(body): Json, +) -> ApiResult { + if agent_name.trim().is_empty() { + return Err(app_err!(StatusCode::BAD_REQUEST, "agent_name is empty")); + } + + // Look up the agent row (validator at the bridge boundary already + // checked payload shape; this is the persistence step). + let agent = state + .store + .get_agent(&agent_name) + .map_err(|e| app_err!(StatusCode::BAD_REQUEST, e.to_string()))? 
+ .ok_or_else(|| app_err!(StatusCode::NOT_FOUND, "agent not found: {agent_name}"))?; + + // Channel inference: v1 contract is that the agent is in an active + // channel-triggered run. The trace store records the channel for the + // current run via `set_run_channel`. Fail loudly if not set rather + // than silently picking #all. + let channel_id = state + .lifecycle + .run_channel_id(&agent_name) + .await + .ok_or_else(|| { + app_err!( + StatusCode::BAD_REQUEST, + "no active-run channel for agent {agent_name}; \ + chorus_create_decision requires a channel-triggered agent run" + ) + })?; + + let workspace_id = state + .active_workspace_id() + .await + .ok_or_else(|| app_err!(StatusCode::BAD_REQUEST, "no active workspace"))?; + + // Session id of the agent's current run, for the resume_with_prompt + // round-trip later. We don't enforce that the agent must still be on + // the same session at resolve time — claude in particular spawns a + // new session per turn — but recording it here is useful for trace + // and audit. 
+ let session_id = state + .store + .get_active_session(&agent.id) + .ok() + .flatten() + .map(|s| s.session_id) + .unwrap_or_default(); + + let decision_id = Uuid::new_v4().to_string(); + let payload_json = serde_json::to_string(&serde_json::json!({ + "headline": body.headline, + "question": body.question, + "options": body.options, + "recommended_key": body.recommended_key, + "context": body.context, + })) + .map_err(|e| app_err!(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + state + .store + .create_decision( + &decision_id, + &workspace_id, + &channel_id, + &agent.id, + &session_id, + &payload_json, + ) + .map_err(|e| app_err!(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + info!( + target: "chorus_decision", + agent = %agent_name, + decision_id = %decision_id, + channel_id = %channel_id, + "decision created" + ); + + Ok(Json(CreateDecisionResponse { + decision_id, + channel_id, + })) +} + +// ── Public: list decisions ──────────────────────────────────────────────── + +#[derive(Debug, Deserialize)] +pub struct ListDecisionsParams { + /// "open" | "resolved" | "all" (default "open") + #[serde(default)] + pub status: Option, +} + +#[derive(Debug, Serialize)] +pub struct DecisionView { + pub id: String, + pub agent_id: String, + pub agent_name: String, + pub channel_id: String, + pub channel_name: String, + pub created_at: String, + pub status: DecisionStatus, + pub payload: Value, + pub picked_key: Option, + pub picked_note: Option, + pub resolved_at: Option, +} + +#[derive(Debug, Serialize)] +pub struct ListDecisionsResponse { + pub decisions: Vec, +} + +pub async fn handle_list_decisions( + State(state): State, + Query(params): Query, +) -> ApiResult { + let workspace_id = state + .active_workspace_id() + .await + .ok_or_else(|| app_err!(StatusCode::BAD_REQUEST, "no active workspace"))?; + + let status_filter = match params.status.as_deref() { + None | Some("open") => Some(DecisionStatus::Open), + Some("resolved") => 
Some(DecisionStatus::Resolved), + Some("all") => None, + Some(other) => { + return Err(app_err!( + StatusCode::BAD_REQUEST, + "invalid status filter: {other}" + )) + } + }; + + let rows = state + .store + .list_decisions(&workspace_id, status_filter) + .map_err(|e| app_err!(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let decisions = rows + .into_iter() + .map(|row| row_to_view(&state, row)) + .collect(); + + Ok(Json(ListDecisionsResponse { decisions })) +} + +fn row_to_view(state: &AppState, row: DecisionRow) -> DecisionView { + let payload: Value = + serde_json::from_str(&row.payload_json).unwrap_or_else(|_| serde_json::json!({})); + let agent_name = state + .store + .get_agent_by_id(&row.agent_id, false) + .ok() + .flatten() + .map(|a| a.name) + .unwrap_or_else(|| row.agent_id.clone()); + let channel_name = state + .store + .get_channel_by_id(&row.channel_id) + .ok() + .flatten() + .map(|c| c.name) + .unwrap_or_else(|| row.channel_id.clone()); + DecisionView { + id: row.id, + agent_id: row.agent_id, + agent_name, + channel_id: row.channel_id, + channel_name, + created_at: row.created_at, + status: row.status, + payload, + picked_key: row.picked_key, + picked_note: row.picked_note, + resolved_at: row.resolved_at, + } +} + +// ── Public: resolve a decision ──────────────────────────────────────────── + +#[derive(Debug, Deserialize)] +pub struct ResolveDecisionBody { + pub picked_key: String, + #[serde(default)] + pub note: Option, +} + +#[derive(Debug, Serialize)] +pub struct ResolveDecisionResponse { + pub decision_id: String, + pub status: DecisionStatus, +} + +pub async fn handle_resolve_decision( + State(state): State, + Path(decision_id): Path, + Json(body): Json, +) -> ApiResult { + // Fetch the row first so we can build the envelope and validate the + // picked_key against the stored options. + let row = state + .store + .get_decision(&decision_id) + .map_err(|e| app_err!(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
+ .ok_or_else(|| app_err!(StatusCode::NOT_FOUND, "decision not found: {decision_id}"))?; + + let payload: Value = serde_json::from_str(&row.payload_json) + .map_err(|e| app_err!(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + // Locate the picked option so we can splice its full body into the + // envelope, and to validate that picked_key is one of the offered + // option keys (race protection if the agent submitted a malformed + // payload that bypassed bridge validation somehow). + let options = payload + .get("options") + .and_then(|v| v.as_array()) + .ok_or_else(|| { + app_err!( + StatusCode::INTERNAL_SERVER_ERROR, + "decision payload missing options" + ) + })?; + let picked = options + .iter() + .find(|o| o.get("key").and_then(|k| k.as_str()) == Some(&body.picked_key)) + .ok_or_else(|| { + app_err!( + StatusCode::BAD_REQUEST, + "picked_key '{}' is not one of the decision's option keys", + body.picked_key + ) + })?; + + // CAS update: if the row is no longer open, return 409 so the UI can + // refresh. Two simultaneous picks must not both succeed. + let updated = state + .store + .resolve_decision_cas(&decision_id, &body.picked_key, body.note.as_deref()) + .map_err(|e| app_err!(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + if !updated { + return Err(app_err!(StatusCode::CONFLICT, "decision is no longer open")); + } + + // Locate the agent name for the lifecycle call. + let agent = state + .store + .get_agent_by_id(&row.agent_id, false) + .map_err(|e| app_err!(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? + .ok_or_else(|| { + app_err!( + StatusCode::INTERNAL_SERVER_ERROR, + "decision references a deleted agent" + ) + })?; + + let envelope = build_resume_envelope(&payload, picked, body.note.as_deref()); + + if let Err(e) = state + .lifecycle + .resume_with_prompt(&agent.name, envelope) + .await + { + // Roll back the resolve so the human's pick isn't silently lost. 
+ warn!( + agent = %agent.name, + decision_id = %decision_id, + error = %e, + "resume_with_prompt failed; reverting decision to open" + ); + if let Err(revert_err) = state.store.revert_decision_to_open(&decision_id) { + warn!( + decision_id = %decision_id, + error = %revert_err, + "failed to revert decision after resume failure" + ); + } + return Err(app_err!( + StatusCode::INTERNAL_SERVER_ERROR, + "failed to deliver pick to agent: {e}" + )); + } + + info!( + target: "chorus_decision", + decision_id = %decision_id, + agent = %agent.name, + picked = %body.picked_key, + "decision resolved + envelope delivered" + ); + + Ok(Json(ResolveDecisionResponse { + decision_id, + status: DecisionStatus::Resolved, + })) +} + +fn build_resume_envelope(payload: &Value, picked: &Value, note: Option<&str>) -> String { + let headline = payload + .get("headline") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let question = payload + .get("question") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let picked_key = picked.get("key").and_then(|v| v.as_str()).unwrap_or(""); + let picked_label = picked.get("label").and_then(|v| v.as_str()).unwrap_or(""); + let picked_body = picked.get("body").and_then(|v| v.as_str()).unwrap_or(""); + + let mut envelope = String::with_capacity(2_048); + envelope.push_str( + "Your decision has been resolved. The human picked an option in their inbox; \ + this prompt is your follow-up. 
Read the picked option's body for what to do next, \ + then proceed with that work — do not re-ask via send_message.\n\n", + ); + envelope.push_str(&format!("**Original headline:** {headline}\n")); + envelope.push_str(&format!("**Original question:** {question}\n\n")); + envelope.push_str(&format!( + "**Picked option ({picked_key}): {picked_label}**\n\n{picked_body}\n" + )); + if let Some(n) = note { + if !n.trim().is_empty() { + envelope.push_str(&format!("\n**Human note:** {n}\n")); + } + } + envelope +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn envelope_includes_headline_question_picked_body_and_note() { + let payload = serde_json::json!({ + "headline": "PR #120 retrospective", + "question": "Was that the right call?", + }); + let picked = serde_json::json!({ + "key": "A", + "label": "Keep the merge", + "body": "The merge stands. CI is green.", + }); + let env = build_resume_envelope(&payload, &picked, Some("looks good")); + assert!(env.contains("PR #120 retrospective")); + assert!(env.contains("Was that the right call?")); + assert!(env.contains("Picked option (A): Keep the merge")); + assert!(env.contains("The merge stands. CI is green.")); + assert!(env.contains("**Human note:** looks good")); + // The envelope must explicitly tell the agent NOT to re-ask via + // send_message, otherwise it will reply conversationally and skip + // the work. 
+ assert!(env.contains("do not re-ask via send_message")); + } + + #[test] + fn envelope_omits_blank_note() { + let payload = serde_json::json!({"headline": "h", "question": "q"}); + let picked = serde_json::json!({"key": "A", "label": "L", "body": "B"}); + let env = build_resume_envelope(&payload, &picked, Some(" ")); + assert!(!env.contains("Human note")); + } +} diff --git a/src/server/handlers/mod.rs b/src/server/handlers/mod.rs index 0aacb560..4522610f 100644 --- a/src/server/handlers/mod.rs +++ b/src/server/handlers/mod.rs @@ -2,6 +2,7 @@ pub mod agent_workspace; pub mod agents; pub mod attachments; pub mod channels; +pub mod decisions; pub mod dto; pub mod messages; pub mod path_params; @@ -15,6 +16,7 @@ pub use agent_workspace::*; pub use agents::*; pub use attachments::*; pub use channels::*; +pub use decisions::*; pub use messages::*; pub use tasks::*; pub use teams::*; diff --git a/src/server/mod.rs b/src/server/mod.rs index 7570a0dd..894042e9 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -118,11 +118,17 @@ pub fn build_router_with_services( "/agent/{agent_id}/tasks/update-status", post(handle_update_task_status), ) - .route("/agent/{agent_id}/upload", post(handle_upload)); + .route("/agent/{agent_id}/upload", post(handle_upload)) + .route("/agent/{agent_id}/decisions", post(handle_create_decision)); let api_router = Router::new() .route("/attachments/{attachment_id}", get(handle_get_attachment)) .route("/attachments", post(handle_public_upload)) + .route("/decisions", get(handle_list_decisions)) + .route( + "/decisions/{decision_id}/resolve", + post(handle_resolve_decision), + ) .route("/whoami", get(handle_whoami)) .route("/humans", get(handle_list_humans)) .route("/humans/{id}", patch(handle_update_human)) diff --git a/src/store/decisions.rs b/src/store/decisions.rs new file mode 100644 index 00000000..7f76719b --- /dev/null +++ b/src/store/decisions.rs @@ -0,0 +1,292 @@ +//! `decisions` table accessors. +//! +//! 
Decisions emitted by agents via `chorus_create_decision`. Lifecycle: +//! agent → `create_decision` (status=open) → human picks in inbox → +//! `resolve_decision_cas` (status=resolved) → `revert_decision_to_open` if +//! the resume_with_prompt envelope delivery fails. + +use anyhow::Result; +use rusqlite::{params, OptionalExtension}; +use serde::{Deserialize, Serialize}; + +use super::Store; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum DecisionStatus { + Open, + Resolved, +} + +impl DecisionStatus { + pub fn as_str(self) -> &'static str { + match self { + DecisionStatus::Open => "open", + DecisionStatus::Resolved => "resolved", + } + } + + pub fn parse(s: &str) -> Option { + match s { + "open" => Some(DecisionStatus::Open), + "resolved" => Some(DecisionStatus::Resolved), + _ => None, + } + } +} + +#[derive(Debug, Clone)] +pub struct DecisionRow { + pub id: String, + pub workspace_id: String, + pub channel_id: String, + pub agent_id: String, + pub session_id: String, + pub created_at: String, + pub status: DecisionStatus, + pub payload_json: String, + pub picked_key: Option, + pub picked_note: Option, + pub resolved_at: Option, +} + +impl Store { + /// Insert a freshly-emitted decision (status=open). 
+ pub fn create_decision( + &self, + id: &str, + workspace_id: &str, + channel_id: &str, + agent_id: &str, + session_id: &str, + payload_json: &str, + ) -> Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT INTO decisions + (id, workspace_id, channel_id, agent_id, session_id, status, payload_json) + VALUES (?1, ?2, ?3, ?4, ?5, 'open', ?6)", + params![ + id, + workspace_id, + channel_id, + agent_id, + session_id, + payload_json + ], + )?; + Ok(()) + } + + pub fn get_decision(&self, id: &str) -> Result> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, workspace_id, channel_id, agent_id, session_id, + created_at, status, payload_json, picked_key, picked_note, resolved_at + FROM decisions WHERE id = ?1", + )?; + let row = stmt.query_row(params![id], row_to_decision).optional()?; + Ok(row) + } + + /// List decisions for a workspace, optionally filtered by status. + /// Returns most recent first. + pub fn list_decisions( + &self, + workspace_id: &str, + status: Option, + ) -> Result> { + let conn = self.conn.lock().unwrap(); + let rows: Vec = if let Some(s) = status { + let mut stmt = conn.prepare( + "SELECT id, workspace_id, channel_id, agent_id, session_id, + created_at, status, payload_json, picked_key, picked_note, resolved_at + FROM decisions WHERE workspace_id = ?1 AND status = ?2 + ORDER BY created_at DESC", + )?; + let collected = stmt + .query_map(params![workspace_id, s.as_str()], row_to_decision)? + .collect::>>()?; + collected + } else { + let mut stmt = conn.prepare( + "SELECT id, workspace_id, channel_id, agent_id, session_id, + created_at, status, payload_json, picked_key, picked_note, resolved_at + FROM decisions WHERE workspace_id = ?1 + ORDER BY created_at DESC", + )?; + let collected = stmt + .query_map(params![workspace_id], row_to_decision)? + .collect::>>()?; + collected + }; + Ok(rows) + } + + /// CAS-protected resolve. Updates row only if it's still `open`. 
+ /// Returns `true` on success, `false` if the row was already resolved + /// or doesn't exist. Caller checks the bool to detect the race. + pub fn resolve_decision_cas( + &self, + id: &str, + picked_key: &str, + picked_note: Option<&str>, + ) -> Result { + let conn = self.conn.lock().unwrap(); + let n = conn.execute( + "UPDATE decisions + SET status = 'resolved', + picked_key = ?2, + picked_note = ?3, + resolved_at = datetime('now') + WHERE id = ?1 AND status = 'open'", + params![id, picked_key, picked_note], + )?; + Ok(n == 1) + } + + /// Roll a resolved decision back to open. Used when `resume_with_prompt` + /// fails to deliver the envelope — the human's pick should not be lost + /// silently, so we re-arm the decision and surface the error. + pub fn revert_decision_to_open(&self, id: &str) -> Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "UPDATE decisions + SET status = 'open', + picked_key = NULL, + picked_note = NULL, + resolved_at = NULL + WHERE id = ?1 AND status = 'resolved'", + params![id], + )?; + Ok(()) + } +} + +fn row_to_decision(row: &rusqlite::Row) -> rusqlite::Result { + let status_str: String = row.get(6)?; + let status = DecisionStatus::parse(&status_str).ok_or_else(|| { + rusqlite::Error::FromSqlConversionFailure( + 6, + rusqlite::types::Type::Text, + format!("invalid decision status: {status_str}").into(), + ) + })?; + Ok(DecisionRow { + id: row.get(0)?, + workspace_id: row.get(1)?, + channel_id: row.get(2)?, + agent_id: row.get(3)?, + session_id: row.get(4)?, + created_at: row.get(5)?, + status, + payload_json: row.get(7)?, + picked_key: row.get(8)?, + picked_note: row.get(9)?, + resolved_at: row.get(10)?, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::Store; + use tempfile::TempDir; + + fn fresh_store() -> (TempDir, Store) { + let tmp = TempDir::new().unwrap(); + let db_path = tmp.path().join("chorus.db"); + let store = Store::open(db_path.to_str().unwrap()).unwrap(); + (tmp, store) + } + + 
fn seed_agent_and_workspace(store: &Store) -> (String, String) { + // Minimal seed so the FK on agent_id is satisfied. + let conn = store.conn.lock().unwrap(); + let workspace_id = "ws-1".to_string(); + conn.execute( + "INSERT INTO workspaces (id, name, slug) VALUES (?1, 'test', 'test')", + params![workspace_id], + ) + .unwrap(); + let agent_id = "agent-1".to_string(); + conn.execute( + "INSERT INTO agents (id, workspace_id, name, display_name, runtime, model) + VALUES (?1, ?2, 'bot', 'Bot', 'claude', 'sonnet')", + params![agent_id, workspace_id], + ) + .unwrap(); + (workspace_id, agent_id) + } + + #[test] + fn create_and_get_round_trip() { + let (_tmp, store) = fresh_store(); + let (ws, ag) = seed_agent_and_workspace(&store); + store + .create_decision("d1", &ws, "ch1", &ag, "sess1", r#"{"k":"v"}"#) + .unwrap(); + let row = store.get_decision("d1").unwrap().unwrap(); + assert_eq!(row.id, "d1"); + assert_eq!(row.status, DecisionStatus::Open); + assert_eq!(row.payload_json, r#"{"k":"v"}"#); + assert!(row.picked_key.is_none()); + } + + #[test] + fn list_filters_by_status() { + let (_tmp, store) = fresh_store(); + let (ws, ag) = seed_agent_and_workspace(&store); + store + .create_decision("a", &ws, "c", &ag, "s", "{}") + .unwrap(); + store + .create_decision("b", &ws, "c", &ag, "s", "{}") + .unwrap(); + assert!(store.resolve_decision_cas("a", "X", None).unwrap()); + let open = store + .list_decisions(&ws, Some(DecisionStatus::Open)) + .unwrap(); + assert_eq!(open.len(), 1); + assert_eq!(open[0].id, "b"); + let resolved = store + .list_decisions(&ws, Some(DecisionStatus::Resolved)) + .unwrap(); + assert_eq!(resolved.len(), 1); + assert_eq!(resolved[0].id, "a"); + let all = store.list_decisions(&ws, None).unwrap(); + assert_eq!(all.len(), 2); + } + + #[test] + fn cas_returns_false_on_double_resolve() { + let (_tmp, store) = fresh_store(); + let (ws, ag) = seed_agent_and_workspace(&store); + store + .create_decision("d1", &ws, "c", &ag, "s", "{}") + .unwrap(); + 
assert!(store.resolve_decision_cas("d1", "A", None).unwrap()); + // Second pick must fail without erroring. + assert!(!store.resolve_decision_cas("d1", "B", None).unwrap()); + let row = store.get_decision("d1").unwrap().unwrap(); + assert_eq!(row.picked_key.as_deref(), Some("A")); + } + + #[test] + fn revert_reopens_resolved_row() { + let (_tmp, store) = fresh_store(); + let (ws, ag) = seed_agent_and_workspace(&store); + store + .create_decision("d1", &ws, "c", &ag, "s", "{}") + .unwrap(); + store.resolve_decision_cas("d1", "A", Some("note")).unwrap(); + store.revert_decision_to_open("d1").unwrap(); + let row = store.get_decision("d1").unwrap().unwrap(); + assert_eq!(row.status, DecisionStatus::Open); + assert!(row.picked_key.is_none()); + assert!(row.picked_note.is_none()); + assert!(row.resolved_at.is_none()); + // CAS must work again after revert. + assert!(store.resolve_decision_cas("d1", "B", None).unwrap()); + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs index b471affb..d89dc566 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -1,6 +1,7 @@ pub mod agents; pub mod attachments; pub mod channels; +pub mod decisions; pub mod humans; pub mod inbox; pub mod messages; @@ -24,6 +25,7 @@ pub use agents::AgentRecordUpsert; pub use agents::{Agent, AgentEnvVar}; pub use attachments::Attachment; pub use channels::{Channel, ChannelListParams, ChannelMember, ChannelMemberProfile, ChannelType}; +pub use decisions::{DecisionRow, DecisionStatus}; pub use humans::Human; pub use inbox::{InboxConversationNotificationView, InboxConversationStateView}; pub use messages::{ diff --git a/src/store/schema.sql b/src/store/schema.sql index 088f27f7..26c2c072 100644 --- a/src/store/schema.sql +++ b/src/store/schema.sql @@ -259,3 +259,26 @@ CREATE TABLE IF NOT EXISTS agent_sessions ( ); CREATE INDEX IF NOT EXISTS idx_agent_sessions_agent_active ON agent_sessions(agent_id, is_active); + +-- Decisions emitted by agents via chorus_create_decision MCP tool. 
+-- One row per emission. Status transitions Open → Resolved are CAS-protected +-- (UPDATE WHERE status='open') so two simultaneous picks can't both succeed. +-- Stores the full payload as JSON in TEXT (SQLite has no JSONB); shape is +-- enforced at the validator boundary, not the schema. +CREATE TABLE IF NOT EXISTS decisions ( + id TEXT PRIMARY KEY NOT NULL, + workspace_id TEXT NOT NULL, + channel_id TEXT NOT NULL, + agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE, + session_id TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + status TEXT NOT NULL CHECK(status IN ('open','resolved')), + payload_json TEXT NOT NULL, + picked_key TEXT, + picked_note TEXT, + resolved_at TEXT +); +CREATE INDEX IF NOT EXISTS idx_decisions_workspace_status + ON decisions(workspace_id, status, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_decisions_agent_status + ON decisions(agent_id, status); diff --git a/tests/server_tests.rs b/tests/server_tests.rs index 6962c3da..cd560ed1 100644 --- a/tests/server_tests.rs +++ b/tests/server_tests.rs @@ -110,6 +110,14 @@ struct MockLifecycle { /// Tracks which agents are currently "running" so that process_state, /// start_agent, and stop_agent share one source of truth. running: Mutex>, + /// Records every (agent_name, envelope) pair delivered via + /// `resume_with_prompt`. Decision-inbox round-trip tests assert the + /// envelope reached the agent. + resumed_with: Mutex>, + /// Per-agent channel id returned by `run_channel_id`. Tests preset + /// this to simulate the trace_store's "current channel for current + /// run" record without spinning a real AgentManager. + run_channels: Mutex>, } struct MockRuntimeStatusProvider { @@ -165,6 +173,19 @@ impl MockLifecycle { fn mark_running(&self, agent_name: &str) { self.running.lock().unwrap().insert(agent_name.to_string()); } + + /// Pre-populate the run-channel mapping so handlers that call + /// `lifecycle.run_channel_id(agent)` see this value during the test. 
+ fn set_run_channel(&self, agent_name: &str, channel_id: &str) { + self.run_channels + .lock() + .unwrap() + .insert(agent_name.to_string(), channel_id.to_string()); + } + + fn resumed_calls(&self) -> Vec<(String, String)> { + self.resumed_with.lock().unwrap().clone() + } } impl AgentLifecycle for MockLifecycle { @@ -234,6 +255,28 @@ impl AgentLifecycle for MockLifecycle { fn get_all_agent_activity_states(&self) -> Vec<(String, String, String)> { activity_log::all_activity_states(&self.activity_logs) } + + fn run_channel_id<'a>( + &'a self, + agent_name: &'a str, + ) -> Pin> + Send + 'a>> { + let id = self.run_channels.lock().unwrap().get(agent_name).cloned(); + Box::pin(async move { id }) + } + + fn resume_with_prompt<'a>( + &'a self, + agent_name: &'a str, + envelope: String, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + self.resumed_with + .lock() + .unwrap() + .push((agent_name.to_string(), envelope)); + Ok(()) + }) + } } impl AgentLifecycle for FailStartLifecycle { @@ -3512,3 +3555,300 @@ async fn test_manual_restart_does_not_fire_intro_directive() { "restart must not pass an init directive — only first-time creation does" ); } + +// ────────────────────────────────────────────────────────────────────────── +// Decision Inbox e2e +// ────────────────────────────────────────────────────────────────────────── + +/// Round-trip: agent emits a decision via the bridge endpoint, human picks +/// an option via the public API, the resume envelope reaches the agent +/// via `lifecycle.resume_with_prompt` with the picked option's body and +/// the original headline + question + human note inlined. +#[tokio::test] +async fn decision_round_trip_agent_creates_human_resolves_agent_resumed() { + let (store, app, lifecycle) = setup_with_lifecycle(); + + // Channel for inference. Bot1 must be in an active run with this + // channel set, otherwise the create endpoint correctly 400s. 
+ let channel = store + .create_channel( + "engineering", + Some("Engineering"), + ChannelType::Channel, + None, + ) + .unwrap(); + join_channel_silent(&store, "engineering", "bot1", "agent"); + lifecycle.set_run_channel("bot1", &channel); + + // 1. agent → bridge → POST /internal/agent/bot1/decisions + let create_body = serde_json::json!({ + "headline": "PR #120 retro: archived-channel del/join", + "question": "Was the merge the right call, or should we revert?", + "options": [ + {"key": "A", "label": "Keep the merge", "body": "The merge stands. Tests pass."}, + {"key": "B", "label": "Revert and add tests", "body": "Revert, add the integration test, re-merge."}, + ], + "recommended_key": "A", + "context": "## Why now\nUser asked.\n", + }); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/internal/agent/bot1/decisions") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&create_body).unwrap())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK, "decision create must 200"); + let body = axum::body::to_bytes(resp.into_body(), 1_000_000) + .await + .unwrap(); + let created: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let decision_id = created["decision_id"].as_str().unwrap().to_string(); + assert_eq!(created["channel_id"].as_str().unwrap(), channel); + + // 2. human → GET /api/decisions?status=open returns the new row. 
+ let resp = app + .clone() + .oneshot( + Request::builder() + .uri("/api/decisions?status=open") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = axum::body::to_bytes(resp.into_body(), 1_000_000) + .await + .unwrap(); + let listed: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let decisions = listed["decisions"].as_array().unwrap(); + assert_eq!(decisions.len(), 1); + assert_eq!(decisions[0]["id"].as_str().unwrap(), decision_id); + assert_eq!(decisions[0]["agent_name"].as_str().unwrap(), "bot1"); + assert_eq!( + decisions[0]["channel_name"].as_str().unwrap(), + "engineering" + ); + + // 3. human → POST /api/decisions/{id}/resolve picks B with a note. + let resolve_body = serde_json::json!({"picked_key": "B", "note": "needs tests first"}); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/decisions/{decision_id}/resolve")) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&resolve_body).unwrap())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK, "resolve must 200"); + + // 4. resume_with_prompt fired with the right payload. + let calls = lifecycle.resumed_calls(); + assert_eq!( + calls.len(), + 1, + "resume_with_prompt must be called exactly once" + ); + let (agent, envelope) = &calls[0]; + assert_eq!(agent, "bot1"); + assert!( + envelope.contains("PR #120 retro"), + "envelope must include original headline; got: {envelope}" + ); + assert!(envelope.contains("Was the merge the right call")); + assert!(envelope.contains("Picked option (B): Revert and add tests")); + assert!(envelope.contains("Revert, add the integration test")); + assert!(envelope.contains("needs tests first")); + + // 5. The decision row is now resolved, so /open lists nothing. 
+ let resp = app + .clone() + .oneshot( + Request::builder() + .uri("/api/decisions?status=open") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let body = axum::body::to_bytes(resp.into_body(), 1_000_000) + .await + .unwrap(); + let listed: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(listed["decisions"].as_array().unwrap().len(), 0); +} + +#[tokio::test] +async fn decision_resolve_double_pick_returns_409() { + let (store, app, lifecycle) = setup_with_lifecycle(); + let channel = store + .create_channel("eng", Some("Eng"), ChannelType::Channel, None) + .unwrap(); + join_channel_silent(&store, "eng", "bot1", "agent"); + lifecycle.set_run_channel("bot1", &channel); + + let create_body = serde_json::json!({ + "headline": "h", + "question": "q", + "options": [ + {"key": "A", "label": "L", "body": "B"}, + {"key": "B", "label": "L2", "body": "B2"}, + ], + "recommended_key": "A", + }); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/internal/agent/bot1/decisions") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&create_body).unwrap())) + .unwrap(), + ) + .await + .unwrap(); + let body = axum::body::to_bytes(resp.into_body(), 1_000_000) + .await + .unwrap(); + let created: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let decision_id = created["decision_id"].as_str().unwrap().to_string(); + + // First pick: 200 + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/decisions/{decision_id}/resolve")) + .header("content-type", "application/json") + .body(Body::from( + serde_json::to_vec(&serde_json::json!({"picked_key": "A"})).unwrap(), + )) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + // Second pick: 409 — CAS-protected. 
+ let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/decisions/{decision_id}/resolve")) + .header("content-type", "application/json") + .body(Body::from( + serde_json::to_vec(&serde_json::json!({"picked_key": "B"})).unwrap(), + )) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::CONFLICT, + "second pick on a resolved decision must 409" + ); +} + +#[tokio::test] +async fn decision_create_without_active_channel_returns_400() { + // No set_run_channel — the channel-inference contract must fail + // loudly rather than silently routing to #all. + let (_store, app, _lifecycle) = setup_with_lifecycle(); + + let create_body = serde_json::json!({ + "headline": "h", + "question": "q", + "options": [ + {"key": "A", "label": "L", "body": "B"}, + {"key": "B", "label": "L2", "body": "B2"}, + ], + "recommended_key": "A", + }); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/internal/agent/bot1/decisions") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&create_body).unwrap())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = axum::body::to_bytes(resp.into_body(), 1_000_000) + .await + .unwrap(); + let err: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let msg = err["error"].as_str().unwrap(); + assert!( + msg.contains("active-run channel"), + "error must name the missing channel context; got: {msg}" + ); +} + +#[tokio::test] +async fn decision_resolve_unknown_picked_key_returns_400() { + let (store, app, lifecycle) = setup_with_lifecycle(); + let channel = store + .create_channel("eng", Some("Eng"), ChannelType::Channel, None) + .unwrap(); + join_channel_silent(&store, "eng", "bot1", "agent"); + lifecycle.set_run_channel("bot1", &channel); + + let create_body = serde_json::json!({ + "headline": "h", + "question": "q", + "options": [ + {"key": "A", "label": "L", "body": "B"}, 
+ {"key": "B", "label": "L2", "body": "B2"}, + ], + "recommended_key": "A", + }); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/internal/agent/bot1/decisions") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&create_body).unwrap())) + .unwrap(), + ) + .await + .unwrap(); + let body = axum::body::to_bytes(resp.into_body(), 1_000_000) + .await + .unwrap(); + let created: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let decision_id = created["decision_id"].as_str().unwrap().to_string(); + + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/decisions/{decision_id}/resolve")) + .header("content-type", "application/json") + .body(Body::from( + serde_json::to_vec(&serde_json::json!({"picked_key": "Z"})).unwrap(), + )) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); +} diff --git a/ui/src/components/decisions/DecisionsInbox.tsx b/ui/src/components/decisions/DecisionsInbox.tsx new file mode 100644 index 00000000..6291b53f --- /dev/null +++ b/ui/src/components/decisions/DecisionsInbox.tsx @@ -0,0 +1,348 @@ +import { useCallback, useEffect, useMemo, useState } from 'react' +import { + type DecisionStatusFilter, + type DecisionView, + listDecisions, + resolveDecision, +} from '../../data/decisions' + +const POLL_INTERVAL_MS = 5_000 + +export function DecisionsInbox(): JSX.Element { + const [filter, setFilter] = useState('open') + const [decisions, setDecisions] = useState([]) + const [focused, setFocused] = useState(null) + const [error, setError] = useState(null) + const [picking, setPicking] = useState(null) + const [note, setNote] = useState('') + + const refresh = useCallback(async () => { + try { + const r = await listDecisions(filter) + setDecisions(r.decisions) + setError(null) + // If the focused decision dropped off the list (e.g., resolved by + // someone else), clear the focus so the right 
pane goes back to its + // empty state. + if (focused && !r.decisions.find((d) => d.id === focused)) { + setFocused(null) + } + } catch (e) { + setError(e instanceof Error ? e.message : String(e)) + } + }, [filter, focused]) + + useEffect(() => { + void refresh() + const id = setInterval(() => void refresh(), POLL_INTERVAL_MS) + return () => clearInterval(id) + }, [refresh]) + + const focusedDecision = useMemo( + () => decisions.find((d) => d.id === focused) ?? null, + [decisions, focused], + ) + + async function pickOption(decisionId: string, optionKey: string) { + setPicking(decisionId + ':' + optionKey) + try { + await resolveDecision(decisionId, optionKey, note.trim() || undefined) + setNote('') + await refresh() + } catch (e) { + setError(e instanceof Error ? e.message : String(e)) + } finally { + setPicking(null) + } + } + + return ( +
+
+

Decision Inbox

+
+ {(['open', 'resolved', 'all'] as DecisionStatusFilter[]).map((s) => ( + + ))} +
+
+ + {error &&
{error}
} + +
+
    + {decisions.length === 0 && ( +
  • No {filter === 'all' ? '' : filter} decisions.
  • + )} + {decisions.map((d) => ( +
  • setFocused(d.id)} + style={{ + ...styles.row, + ...(focused === d.id ? styles.rowActive : {}), + }} + > +
    {d.payload.headline || '(no headline)'}
    +
    + {d.agent_name} · #{d.channel_name} · {d.status} +
    +
  • + ))} +
+ +
+ {!focusedDecision ? ( +
Select a decision on the left.
+ ) : ( + void pickOption(focusedDecision.id, key)} + picking={picking} + /> + )} +
+
+
+ ) +} + +interface DecisionDetailProps { + decision: DecisionView + note: string + onChangeNote: (s: string) => void + onPick: (key: string) => void + picking: string | null +} + +function DecisionDetail(props: DecisionDetailProps) { + const { decision, note, onChangeNote, onPick, picking } = props + const { payload } = decision + const isResolved = decision.status === 'resolved' + + return ( +
+

{payload.headline}

+

{payload.question}

+
+ From {decision.agent_name} in #{decision.channel_name} ·{' '} + {decision.created_at} +
+ +
+ {payload.options.map((opt) => { + const recommended = opt.key === payload.recommended_key + const pickedThisOne = decision.picked_key === opt.key + const id = decision.id + ':' + opt.key + return ( +
+
+ {opt.key} + {opt.label} + {recommended && recommended} +
+
{opt.body}
+ {!isResolved && ( + + )} +
+ ) + })} +
+ + {!isResolved && ( +
+ +