import { test, describe } from "node:test";
import assert from "node:assert/strict";
import type Anthropic from "@anthropic-ai/sdk";
import { ManagedRegistry } from "./managed.js";
import type { Provider, ProviderResult, ProviderUsage } from "../providers/types.js";
import type { ToolDefinition } from "../types.js";

/** Monotonic counter so every fake tool_use block gets a distinct id. */
let idN = 0;

/** Zeroed usage record; `o` overrides individual fields. */
function usage(o: Partial<ProviderUsage> = {}): ProviderUsage {
  return { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0, ...o };
}

/** Fake `tool_use` content block asking for tool `name` with `input`. */
function toolUse(name: string, input: unknown = {}): Anthropic.ContentBlock {
  return { type: "tool_use", id: "t" + ++idN, name, input } as Anthropic.ContentBlock;
}

/** Fake `text` content block. */
function text(t: string): Anthropic.ContentBlock {
  return { type: "text", text: t } as Anthropic.ContentBlock;
}

/** One provider turn: the given content, a stop reason, and zero usage. */
function turn(content: Anthropic.ContentBlock[], stopReason: string): ProviderResult {
  return { content, stopReason, usage: usage() };
}

/**
 * Provider stub that replays `queue` one entry per `runTurn` call and throws
 * once the script is exhausted, so an unexpected extra turn fails loudly.
 */
function scripted(queue: ProviderResult[]): Provider {
  let i = 0;
  return {
    name: "fake",
    async runTurn() {
      if (i < queue.length) return queue[i++]!;
      throw new Error("scripted provider drained");
    },
  };
}

const editTool: ToolDefinition = {
  name: "edit", // in DEFAULT_MUTATING_TOOLS → triggers approval-pause
  description: "edit a file",
  inputSchema: { type: "object" } as Anthropic.Tool.InputSchema,
  async execute() {
    return "edited";
  },
};

/**
 * Poll `fn` every 10 ms until it returns true; reject after `ms` milliseconds.
 * A predicate that throws rejects the promise instead of surfacing as an
 * uncaught exception from the timer callback.
 */
function waitFor(fn: () => boolean, ms = 3000): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const t0 = Date.now();
    const iv = setInterval(() => {
      try {
        if (fn()) {
          clearInterval(iv);
          resolve();
        } else if (Date.now() - t0 > ms) {
          clearInterval(iv);
          reject(new Error("waitFor timeout"));
        }
      } catch (err) {
        clearInterval(iv);
        reject(err);
      }
    }, 10);
  });
}

describe("ManagedAgent — approval-paused tool flow", () => {
  test("pauses on a mutating tool, resumes on approve, records activity", async () => {
    const reg = new ManagedRegistry();
    const v = reg.start({
      task: "fix it",
      cwd: "/tmp/proj",
      systemPrompt: "sys",
      tools: [editTool],
      provider: scripted([
        turn([toolUse("edit", { file_path: "foo.ts" })], "tool_use"),
        turn([text("done")], "end_turn"),
      ]),
    });
    const a = reg.get(v.id)!;

    await waitFor(() => a.view().pending?.tool === "edit");
    assert.equal(a.view().state, "waiting");
    assert.equal(a.view().stateReason, "permission");

    assert.equal(reg.decide(v.id, true), true);
    await waitFor(() => {
      const x = a.view();
      return x.state === "waiting" && !x.pending;
    });

    const view = a.view();
    assert.ok(view.lastTools.includes("edit"), "tool recorded");
    assert.ok(view.filesTouched.includes("foo.ts"), "file recorded");
    assert.equal(view.lastText, "done");
    assert.ok(view.turnCount >= 1);
    reg.cancel(v.id);
  });

  test("deny lets the agent continue (tool not run)", async () => {
    let ran = false;
    const reg = new ManagedRegistry();
    const v = reg.start({
      task: "x",
      cwd: "/tmp",
      systemPrompt: "sys",
      tools: [
        {
          ...editTool,
          async execute() {
            ran = true;
            return "ran";
          },
        },
      ],
      provider: scripted([turn([toolUse("edit", {})], "tool_use"), turn([text("ok")], "end_turn")]),
    });
    const a = reg.get(v.id)!;
    await waitFor(() => !!a.view().pending);
    reg.decide(v.id, false);
    await waitFor(() => a.view().state === "waiting" && !a.view().pending);
    assert.equal(ran, false, "denied tool must not execute");
    reg.cancel(v.id);
  });
});

describe("ManagedAgent — follow-ups + cancel", () => {
  test("send injects a follow-up that continues the agent", async () => {
    const reg = new ManagedRegistry();
    const v = reg.start({
      task: "first",
      cwd: "/tmp",
      systemPrompt: "sys",
      tools: [],
      provider: scripted([turn([text("first-done")], "end_turn"), turn([text("second-done")], "end_turn")]),
    });
    const a = reg.get(v.id)!;
    await waitFor(() => a.view().state === "waiting" && a.view().lastText === "first-done");
    reg.send(v.id, "now do more");
    await waitFor(() => a.view().lastText === "second-done");
    reg.cancel(v.id);
  });

  test("cancel stops the agent (state done)", async () => {
    const reg = new ManagedRegistry();
    const v = reg.start({
      task: "x",
      cwd: "/tmp",
      systemPrompt: "sys",
      tools: [],
      provider: scripted([turn([text("idle")], "end_turn")]),
    });
    const a = reg.get(v.id)!;
    await waitFor(() => a.view().state === "waiting");
    assert.equal(reg.cancel(v.id), true);
    await waitFor(() => a.view().state === "done");
    reg.cancel(v.id); // idempotent — stays done, no throw
    assert.equal(a.view().state, "done");
  });

  test("send/decide/cancel on an unknown id → false", () => {
    const reg = new ManagedRegistry();
    assert.equal(reg.send("nope", "x"), false);
    assert.equal(reg.decide("nope", true), false);
    assert.equal(reg.cancel("nope"), false);
  });
});
/**
 * Managed agents — the controllable unit of the agent control plane.
 *
 * Unlike an externally-started CLI (observe + cancel only — different identity
 * space, no control channel), a managed agent is one LISA runs itself by looping
 * `runAgent`: it has a stable id, streams live progress, takes follow-up
 * commands, pauses on every mutating tool for the user's approve/deny, and
 * cancels cleanly. This is how the user "sends commands to a running agent."
 *
 * Tool policy (per product decision): FULL tools, but every mutating call
 * (DEFAULT_MUTATING_TOOLS / DEFAULT_MUTATING_ACTIONS) blocks on a UI decision —
 * so an unattended managed agent can't write/exec without explicit approval.
 */
import { EventEmitter } from "node:events";
import { runAgent } from "../agent.js";
import { DEFAULT_MODEL } from "../llm.js";
import { providerForModel } from "../providers/registry.js";
import {
  isMutatingCall,
  DEFAULT_MUTATING_TOOLS,
  DEFAULT_MUTATING_ACTIONS,
  type ApprovalConfig,
} from "../approval.js";
import type { Provider } from "../providers/types.js";
import type { AgentEvent, StoredMessage, ToolDefinition } from "../types.js";

export type ManagedState = "working" | "waiting" | "error" | "done";

/** Public, serializable snapshot of a managed agent. */
export interface ManagedView {
  id: string;
  project: string;
  cwd: string;
  model: string;
  state: ManagedState;
  stateReason: string;
  lastMtime: number;
  turnCount: number;
  tokens: { input: number; output: number };
  lastTools: string[];
  filesTouched: string[];
  lastText: string;
  /** Set while paused awaiting approve/deny of a mutating tool. */
  pending?: { tool: string };
}

export interface ManagedStartOpts {
  task: string;
  cwd: string;
  systemPrompt: string;
  tools: ToolDefinition[];
  model?: string;
  /** Injectable provider (tests); defaults to providerForModel(model). */
  provider?: Provider;
  /** Override the clock (tests). */
  now?: () => number;
}

/** How many recent tool names / file paths a snapshot retains. */
const MAX_TOOLS = 6;
const MAX_FILES = 10;
/** Longest error text stored in `stateReason`. */
const MAX_REASON = 80;
const APPROVAL_CFG: ApprovalConfig = {
  mode: "ask-mutating",
  mutatingTools: DEFAULT_MUTATING_TOOLS,
  mutatingActions: DEFAULT_MUTATING_ACTIONS,
};

/** Process-wide sequence number that keeps agent ids distinct. */
let counter = 0;

/** Human-readable text for anything that can be thrown (not only `Error`). */
function errorText(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * One self-run agent: feeds queued commands to `runAgent` one at a time,
 * tracks progress for snapshots, and parks on approve/deny for mutating tools.
 * The run loop starts from the constructor.
 */
export class ManagedAgent {
  readonly id: string;
  readonly cwd: string;
  readonly project: string;
  readonly model: string;
  state: ManagedState = "working";
  stateReason = "starting";
  lastMtime: number;
  turnCount = 0;
  private tokensIn = 0;
  private tokensOut = 0;
  lastTools: string[] = [];
  filesTouched: string[] = [];
  lastText = "";
  pending?: { tool: string; resolve: (allow: boolean) => void };
  /** Set by the registry to broadcast snapshots. */
  onChange: () => void = () => {};

  private history: StoredMessage[] = [];
  private readonly tools: ToolDefinition[];
  private readonly systemPrompt: string;
  private readonly provider: Provider;
  private readonly ac = new AbortController();
  private readonly now: () => number;
  /** Commands not yet handed to `runAgent`. */
  private queue: string[] = [];
  /** Present only while the loop is parked in `nextInput()`. */
  private wake?: () => void;

  constructor(opts: ManagedStartOpts) {
    // The id seed uses the real clock on purpose, even when `opts.now` is injected.
    this.id = "m" + (++counter).toString(36) + "-" + Date.now().toString(36).slice(-4);
    this.cwd = opts.cwd;
    this.project = opts.cwd.split("/").filter(Boolean).pop() || opts.cwd;
    this.model = opts.model ?? DEFAULT_MODEL;
    this.provider = opts.provider ?? providerForModel(this.model);
    this.tools = opts.tools;
    this.systemPrompt = opts.systemPrompt;
    this.now = opts.now ?? Date.now;
    this.lastMtime = this.now();
    this.queue.push(opts.task);
    void this.runLoop().catch((err: unknown) => {
      // Presumably only reachable when an onChange listener throws; record the
      // failure directly (no re-notify) instead of leaving an unhandled rejection.
      this.state = "error";
      this.stateReason = errorText(err).slice(0, MAX_REASON);
    });
  }

  /** Queue a follow-up command; wakes the loop if it's idle. No-op after cancel. */
  send(text: string): void {
    if (this.ac.signal.aborted) return;
    this.queue.push(text);
    this.wake?.();
  }

  /** Resolve a pending approve/deny. Returns false if nothing is pending. */
  decide(allow: boolean): boolean {
    const request = this.pending;
    if (!request) return false;
    // Resolve first: it clears `pending` synchronously, so the "working"
    // snapshot broadcast below no longer carries a stale permission flag.
    request.resolve(allow);
    this.setState("working", "running");
    return true;
  }

  /** Abort the run; unblocks any pending tool/input. Idempotent. */
  cancel(): void {
    if (this.ac.signal.aborted) return;
    this.ac.abort();
    this.pending?.resolve(false);
    this.wake?.();
    this.setState("done", "cancelled");
  }

  /** Detached snapshot: the arrays are copies and `pending` omits its resolver. */
  view(): ManagedView {
    return {
      id: this.id,
      project: this.project,
      cwd: this.cwd,
      model: this.model,
      state: this.state,
      stateReason: this.stateReason,
      lastMtime: this.lastMtime,
      turnCount: this.turnCount,
      tokens: { input: this.tokensIn, output: this.tokensOut },
      lastTools: [...this.lastTools],
      filesTouched: [...this.filesTouched],
      lastText: this.lastText,
      ...(this.pending ? { pending: { tool: this.pending.tool } } : {}),
    };
  }

  // ── internals ──

  /** Take one command at a time and run it to completion until cancelled. */
  private async runLoop(): Promise<void> {
    while (!this.ac.signal.aborted) {
      const message = await this.nextInput();
      // Re-check the signal: cancel() may land while the await is resuming.
      if (message === null || this.ac.signal.aborted) break;
      this.setState("working", "running");
      try {
        const result = await runAgent({
          provider: this.provider,
          systemPrompt: this.systemPrompt,
          tools: this.tools,
          toolCtx: { cwd: this.cwd, signal: this.ac.signal, log: () => {} },
          history: this.history,
          userMessage: message,
          model: this.model,
          maxIterations: 64,
          onEvent: (e) => this.onAgentEvent(e),
          approval: (tool, input) => this.approve(tool, input),
        });
        this.history = result.history;
        this.tokensIn += result.inputTokens;
        this.tokensOut += result.outputTokens;
        if (result.finalText) this.lastText = result.finalText;
        // A run that settles after cancel() must not flip "done" back to "waiting".
        if (this.ac.signal.aborted) break;
        this.setState("waiting", result.stopReason);
      } catch (err) {
        if (this.ac.signal.aborted) break;
        this.setState("error", errorText(err).slice(0, MAX_REASON));
      }
    }
    if (this.state !== "done") this.setState("done", "done");
  }

  /** Resolve with the next queued command, or null when cancelled. */
  private nextInput(): Promise<string | null> {
    if (this.ac.signal.aborted) return Promise.resolve(null);
    if (this.queue.length) return Promise.resolve(this.queue.shift()!);
    return new Promise<string | null>((resolve) => {
      this.wake = () => {
        this.wake = undefined;
        resolve(this.ac.signal.aborted ? null : this.queue.shift() ?? null);
      };
    });
  }

  /** Approval callback: auto-allow safe tools; block mutating ones on a UI decision. */
  private approve(tool: string, input: unknown): Promise<{ allow: boolean; reason?: string }> {
    if (!isMutatingCall(APPROVAL_CFG, tool, input)) return Promise.resolve({ allow: true });
    // Nobody can answer once cancelled; deny instead of parking forever.
    if (this.ac.signal.aborted) return Promise.resolve({ allow: false, reason: "cancelled" });
    return new Promise((resolve) => {
      this.pending = {
        tool,
        resolve: (allow) => {
          this.pending = undefined;
          resolve(allow ? { allow: true } : { allow: false, reason: "denied in Lisa" });
        },
      };
      this.setState("waiting", "permission");
    });
  }

  /** Fold a streamed agent event into the progress fields, then broadcast. */
  private onAgentEvent(e: AgentEvent): void {
    if (e.type === "turn_start") this.turnCount++;
    if (e.type === "tool_call_start") {
      if (e.toolName) {
        this.lastTools.push(e.toolName);
        if (this.lastTools.length > MAX_TOOLS) this.lastTools.shift();
      }
      const input = e.toolInput as Record<string, unknown> | undefined;
      // First of the three keys present on the tool input is taken as the path.
      const p = input && (input.file_path ?? input.path ?? input.filename);
      if (typeof p === "string" && p) {
        this.filesTouched.push(p);
        if (this.filesTouched.length > MAX_FILES) this.filesTouched.shift();
      }
    }
    // Keep only the trailing 2000 characters of streamed text.
    if (e.type === "text_delta" && e.text) this.lastText = (this.lastText + e.text).slice(-2000);
    this.touch();
  }

  private setState(s: ManagedState, reason: string): void {
    this.state = s;
    this.stateReason = reason;
    this.touch();
  }

  /** Stamp the modification time and notify the registry. */
  private touch(): void {
    this.lastMtime = this.now();
    this.onChange();
  }
}
/**
 * How many finished ("done") agents the registry keeps around for inspection.
 * Each agent holds its full message history, so retaining every finished one
 * for the life of the process would grow without bound.
 */
const MAX_FINISHED_AGENTS = 32;

/** Process-wide registry of managed agents; emits "update" with a ManagedView. */
export class ManagedRegistry extends EventEmitter {
  private agents = new Map<string, ManagedAgent>();

  /** Create and register an agent (its run loop starts itself); returns its first snapshot. */
  start(opts: ManagedStartOpts): ManagedView {
    const a = new ManagedAgent(opts);
    a.onChange = () => this.emit("update", a.view());
    this.agents.set(a.id, a);
    this.pruneFinished();
    this.emit("update", a.view());
    return a.view();
  }

  /** Queue a follow-up for agent `id`; false when the id is unknown. */
  send(id: string, text: string): boolean {
    const a = this.agents.get(id);
    if (!a) return false;
    a.send(text);
    return true;
  }

  /** Answer a pending approval; false when the id is unknown or nothing is pending. */
  decide(id: string, allow: boolean): boolean {
    return this.agents.get(id)?.decide(allow) ?? false;
  }

  /** Cancel agent `id`; false only when the id is unknown (repeat cancels return true). */
  cancel(id: string): boolean {
    const a = this.agents.get(id);
    if (!a) return false;
    a.cancel();
    return true;
  }

  get(id: string): ManagedAgent | undefined {
    return this.agents.get(id);
  }

  /** Snapshots of every registered agent, in registration order. */
  list(): ManagedView[] {
    return [...this.agents.values()].map((a) => a.view());
  }

  /** Drop the oldest finished agents once more than MAX_FINISHED_AGENTS are retained. */
  private pruneFinished(): void {
    // Map iteration follows insertion order, so `finished` is oldest-first.
    const finished = [...this.agents.values()].filter((a) => a.state === "done");
    const excess = finished.length - MAX_FINISHED_AGENTS;
    for (let i = 0; i < excess; i++) this.agents.delete(finished[i]!.id);
  }
}

export const managedRegistry = new ManagedRegistry();
/**
 * Managed-agent observer — surfaces LISA's own managed agents in the
 * orchestrator hub alongside observed CLIs, so they appear in the roster
 * (island / GUI / menu bar) with live progress and a pendingPermission flag.
 * Reads the in-process managedRegistry; emits on its "update" event.
 */
import { EventEmitter } from "node:events";
import { registerIntegration } from "../registry.js";
import { managedRegistry, type ManagedView } from "../../agents/managed.js";
import type { AgentIntegrationConfig, AgentObserver, AgentSession } from "../types.js";

/** Map a registry snapshot onto the hub's session shape. */
function toSession(v: ManagedView): AgentSession {
  return {
    agent: "managed",
    sessionId: v.id,
    project: v.project,
    cwd: v.cwd,
    state: v.state,
    stateReason: v.stateReason,
    lastMtime: v.lastMtime,
    activity: {
      turnCount: v.turnCount,
      lastTools: v.lastTools,
      filesTouched: v.filesTouched,
      tokens: v.tokens,
      // Only present while the agent is parked on an approve/deny decision.
      ...(v.pending ? { pendingPermission: v.pending.tool } : {}),
    },
  };
}

/** Forwards managedRegistry "update" events to the hub as AgentSession values. */
export class ManagedObserver extends EventEmitter implements AgentObserver {
  readonly agent = "managed";
  private emitFn: ((s: AgentSession) => void) | null = null;
  // Stable function identity so the same listener can be removed again.
  private readonly onUpdate = (v: ManagedView) => this.emitFn?.(toSession(v));

  /** Begin forwarding updates to `emit`. Safe to call more than once. */
  async start(emit: (s: AgentSession) => void): Promise<void> {
    this.emitFn = emit;
    // Remove before adding: EventEmitter registers the same function once per
    // on() call, so a repeated start() would otherwise emit every update twice
    // and a single stop() would leave one listener attached.
    managedRegistry.off("update", this.onUpdate);
    managedRegistry.on("update", this.onUpdate);
  }

  /** Current sessions, read straight from the registry. */
  list(): AgentSession[] {
    return managedRegistry.list().map(toSession);
  }

  /** Stop forwarding and release the emit callback. */
  async stop(): Promise<void> {
    managedRegistry.off("update", this.onUpdate);
    this.emitFn = null;
  }
}

registerIntegration("managed", (_cfg: AgentIntegrationConfig) => new ManagedObserver());
delegated agent working in ${cwd}, launched by the user through Lisa. ` + + `Complete the user's task using the available tools, then report what you did concisely. ` + + `Mutating actions (writes, shell, etc.) pause for the user's approval — keep going after each decision.`; + const view = managedRegistry.start({ + task: payload.task, + cwd, + systemPrompt, + tools, + model: typeof payload.model === "string" ? payload.model : opts.model, + }); + res.writeHead(200, { "content-type": "application/json" }); + res.end(JSON.stringify({ ok: true, agent: view })); + return; + } + if (req.method === "POST" && url.startsWith("/api/agents/managed/")) { + const rest = url.slice("/api/agents/managed/".length); + const slash = rest.indexOf("/"); + const id = slash >= 0 ? rest.slice(0, slash) : rest; + const action = slash >= 0 ? rest.slice(slash + 1) : ""; + let mBody = ""; + for await (const chunk of req) mBody += chunk.toString("utf8"); + let payload: { text?: unknown; allow?: unknown } = {}; + try { payload = mBody ? JSON.parse(mBody) : {}; } catch { /* tolerate empty/none */ } + let ok = false; + if (action === "send" && typeof payload.text === "string") ok = managedRegistry.send(id, payload.text); + else if (action === "cancel") ok = managedRegistry.cancel(id); + else if (action === "approve") ok = managedRegistry.decide(id, payload.allow !== false); + else { + res.writeHead(400, { "content-type": "text/plain" }); + res.end("action must be send|cancel|approve"); + return; + } + res.writeHead(ok ? 200 : 404, { "content-type": "application/json" }); + res.end(JSON.stringify({ ok })); + return; + } + // Recent ambient sense events, for the island's "recently sensed" list. if (req.method === "GET" && url === "/api/sense/recent") { const events = readSenseEvents().slice(-30).reverse(); // newest first