diff --git a/docs/pipe-master-slave-complete.md b/docs/pipe-master-slave-complete.md new file mode 100644 index 0000000..0d0cd80 --- /dev/null +++ b/docs/pipe-master-slave-complete.md @@ -0,0 +1,575 @@ +# Master-Slave CLI 通信系统 — 完整文档 + +> 版本:1.0 +> 日期:2026-04-05 +> 分支:`claude/agent-teams-communication-YT18t` + +--- + +## 目录 + +1. [需求文档](#1-需求文档) +2. [设计文档](#2-设计文档) +3. [实现详情](#3-实现详情) +4. [文件清单与修改说明](#4-文件清单与修改说明) +5. [测试验证](#5-测试验证) + +--- + +## 1. 需求文档 + +### 1.1 背景 + +Claude Code CLI 目前每个终端实例是完全独立的。用户希望在多个终端之间建立通信机制,使得一个"主 CLI"可以控制和监控多个"从 CLI",形成类似调度中心的工作模式。 + +### 1.2 核心需求 + +| 编号 | 需求 | 优先级 | +|------|------|--------| +| R1 | 每个 CLI 默认是**完全独立**的,没有任何特殊行为 | 必须 | +| R2 | 主 CLI 通过 `/attach` 命令连接到从 CLI,成为**控制中心/监视器** | 必须 | +| R3 | 主 CLI 的自身会话/命令**保持完全正常**,不被劫持 | 必须 | +| R4 | 主 CLI 可以同时连接**多个**从 CLI | 必须 | +| R5 | 从 CLI 是**自主工作者**,独立处理 AI 查询和工具调用 | 必须 | +| R6 | 从 CLI 的命令和对话**只对自己生效** | 必须 | +| R7 | 从 CLI 被 attach 后**自动上报所有会话数据**给主 CLI | 必须 | +| R8 | 上报数据包括:用户输入 + AI 回复 + 工具调用结果 | 必须 | +| R9 | 主 CLI 可以审查从 CLI 的完整会话历史 | 必须 | +| R10 | 主 CLI 可以向从 CLI **发送任务**(注入 prompt) | 必须 | +| R11 | 断开连接后,双方都**恢复为独立模式** | 必须 | +| R12 | 必须是**真正独立的 CLI 进程**,不是模拟 | 必须 | + +### 1.3 命令列表 + +| 命令 | 说明 | 角色 | +|------|------|------| +| `/attach ` | 连接到从 CLI,开始接收会话报告 | 主 | +| `/detach [name]` | 断开一个从 CLI(无参数则断开全部) | 主 | +| `/pipes` | 发现所有可用的 CLI 管道 | 任意 | +| `/send ` | 向从 CLI 注入一条 prompt | 主 | +| `/history ` | 查看从 CLI 的完整会话记录 | 主 | +| `/pipe-status` | 查看所有已连接从 CLI 的状态概览 | 任意 | + +### 1.4 角色状态机 + +``` + /attach 被 attach +standalone ──────► master standalone ──────► slave + ▲ | ▲ | + │ /detach │ │ detach │ + │ (最后一个) │ │ │ + └────────────────┘ └────────────────┘ +``` + +- **standalone**(默认):完全独立,无特殊行为 +- **master**:已连接一个或多个 slave,接收会话数据 +- **slave**:被一个 master 控制,自动上报会话数据 + +--- + +## 2. 设计文档 + +### 2.1 整体架构 + +``` +┌─────────────────────┐ Unix Domain Socket ┌─────────────────────┐ +│ Master CLI │◄══════════════════════════════════► │ Slave CLI │ +│ │ ~/.claude/pipes/{name}.sock │ │ +│ ┌───────────────┐ │ │ ┌───────────────┐ │ +│ │ useMasterMon │ │ ◄── stream/tool/done/error ────── │ │ usePipeIpc │ │ +│ │ (接收存储) │ │ │ │ (自动上报) │ │ +│ └───────────────┘ │ ── prompt ──────────────────────► │ └───────────────┘ │ +│ │ │ │ +│ AppState.pipeIpc │ │ AppState.pipeIpc │ +│ role: 'master' │ │ role: 'slave' │ +│ slaves: { A, B } │ │ attachedBy: 'M' │ +└─────────────────────┘ └─────────────────────┘ +``` + +### 2.2 传输层 + +**协议**:NDJSON(换行分隔的 JSON),每条消息一行。 + +**Socket 路径**: +- Unix: `~/.claude/pipes/{name}.sock` +- Windows: `\\.\pipe\claude-code-{name}` + +**消息类型**: + +| 类型 | 方向 | 说明 | +|------|------|------| +| `ping` / `pong` | 双向 | 健康检查 | +| `attach_request` | M → S | 主请求附加 | +| `attach_accept` | S → M | 从接受附加 | +| `attach_reject` | S → M | 从拒绝(已被其他主控制) | +| `detach` | M → S | 主断开连接 | +| `prompt` | M → S | 主发送任务/提示 | +| `stream` | S → M | AI 输出流片段 | +| `tool_start` | S → M | 工具开始执行 | +| `tool_result` | S → M | 工具执行结果 | +| `done` | S → M | 一轮会话完成 | +| `error` | 双向 | 错误报告 | + +**消息格式**: +```typescript +type PipeMessage = { + type: PipeMessageType // 消息类型 + data?: string // 负载内容 + from?: string // 发送方管道名 + ts?: string // ISO 时间戳 + meta?: Record // 额外元数据 +} +``` + +### 2.3 状态模型 + +```typescript +// 添加到 AppState +type PipeIpcState = { + role: 'standalone' | 'master' | 'slave' + serverName: string | null // 本 CLI 的管道服务名 + slaves: Record // 主模式:已连接的从 CLI + attachedBy: string | null // 从模式:控制我的主 CLI +} + +type SlaveInfo = { + name: string + connectedAt: string // ISO 时间戳 + status: 'connected' | 'busy' | 'idle' + history: SessionEntry[] // 完整会话记录 +} + +type SessionEntry = { + type: 'prompt' | 'stream' | 'tool_start' | 'tool_result' | 'done' | 'error' + content: string + from: string + timestamp: string + meta?: Record +} +``` + +### 2.4 Attach 流程 + +``` +Master CLI Slave CLI + │ │ + │── /attach cli-abc12345 ────────────► │ + │ (PipeClient.connect) │ + │ │ + │── { type: "attach_request" } ──────► │ + │ │── 检查 role === 'standalone' + │◄── { type: "attach_accept" } ────────│── 设置 role = 'slave' + │── 设置 role = 'master' │ + │── 添加到 slaves map │ + │ │ + │◄═══════ 自动上报会话数据 ═══════════════│ (slave 发送所有 session 事件) +``` + +### 2.5 Send 流程 + +``` +Master CLI Slave CLI + │ │ + │── /send cli-abc12345 "任务" ────────► │ + │── { type: "prompt", data: "任务" } ──►│ + │ │── handleIncomingPrompt() + │ │── AI 处理任务 + │◄── { type: "stream", data: "..." } ──│ + │◄── { type: "tool_start", ... } ──────│ + │◄── { type: "tool_result", ... } ─────│ + │◄── { type: "done" } ─────────────────│ + │── 存入 slaves[name].history │ +``` + +### 2.6 Detach 流程 + +``` +Master CLI Slave CLI + │ │ + │── /detach cli-abc12345 ────────────► │ + │── { type: "detach" } ──────────────► │ + │ │── 设置 role = 'standalone' + │── 从 slaves map 删除 │── 停止自动上报 + │── (如无更多 slave → standalone) │ +``` + +--- + +## 3. 实现详情 + +### 3.1 传输层 — `src/utils/pipeTransport.ts` + +**状态**:已有,复用。 + +**类与函数**: + +| 组件 | 说明 | +|------|------| +| `PipeServer` | Unix socket 服务端,管理多个客户端连接。支持 `onMessage(handler)` 注册消息处理、`broadcast(msg)` 广播、`sendTo(socket, msg)` 定向发送 | +| `PipeClient` | 客户端,连接到远程 PipeServer。支持 `connect(timeout)` 自动重试(ENOENT 轮询)、`send(msg)`、`onMessage(handler)` | +| `createPipeServer(name)` | 工厂函数,创建并启动 PipeServer | +| `connectToPipe(target, sender, timeout)` | 工厂函数,创建 PipeClient 并连接 | +| `listPipes()` | 扫描 `~/.claude/pipes/` 目录,返回所有 `.sock` 文件名 | +| `isPipeAlive(name, timeout)` | 通过 ping/pong 检测管道是否存活 | + +**关键实现细节**: +- 使用 Node.js `net` 模块的 `createServer` / `createConnection` +- NDJSON 协议:每条消息是一行 JSON,用 `\n` 分隔 +- 缓冲区处理:`buffer += chunk; lines = buffer.split('\n'); buffer = lines.pop()` +- ENOENT 处理:连接前用 `fs.access()` 轮询等待 socket 文件存在 +- 清理机制:服务关闭时自动 `unlink` socket 文件 + +### 3.2 Slave 侧 Hook — `src/hooks/usePipeIpc.ts` + +**状态**:完全重写。 + +**核心逻辑**: + +``` +挂载时: + 1. 生成管道名: cli-{sessionId前8位} + 2. 创建 PipeServer 并监听 + 3. 设置 globalThis.__pipeSendToMaster 全局函数 + 4. 注册消息处理器: + - ping → pong + - attach_request → 检查角色 → accept/reject + - detach → 恢复 standalone + - prompt → handleIncomingPrompt 注入 +``` + +**`relayToMaster(msg)` 导出函数**: +- 由 REPL.tsx 的 `onQueryEvent` 调用 +- 通过 `globalThis.__pipeSendToMaster` 桥接,避免循环依赖 +- 仅在 `role === 'slave'` 时生效 + +**Master 断线处理**: +- 监听 master socket 的 `close` 事件 +- 自动恢复为 standalone + +### 3.3 Master 侧 Hook — `src/hooks/useMasterMonitor.ts` + +**状态**:新建。 + +**核心逻辑**: + +``` +当 role === 'master' 时: + 1. 遍历 _slaveClients Map 中所有 PipeClient + 2. 为每个 client 注册消息监听 + 3. 收到 stream/tool_start/tool_result/done/error → 存入 AppState.pipeIpc.slaves[name].history + 4. 更新 slave 状态: prompt → busy, done/error → idle + 5. 监听 slave disconnect → 自动从 slaves 删除 +``` + +**模块级 PipeClient 注册表**: + +| 函数 | 说明 | +|------|------| +| `addSlaveClient(name, client)` | 注册从 CLI 连接(由 /attach 调用) | +| `removeSlaveClient(name)` | 删除从 CLI 连接(由 /detach 调用) | +| `getSlaveClient(name)` | 获取指定从 CLI 连接(由 /send 调用) | +| `getAllSlaveClients()` | 获取所有连接(由 /pipe-status 调用) | + +### 3.4 命令实现 + +#### `/attach ` — `src/commands/attach/attach.ts` + +``` +1. 解析目标管道名 +2. 检查: 是否已连接该 slave?是否处于 slave 模式? +3. connectToPipe(target, myName) 建立连接 +4. 发送 attach_request,等待响应(5s 超时) +5. 收到 attach_accept: + - addSlaveClient(name, client) 注册到 Monitor + - 更新 AppState: role → master, 添加 slave 记录 +6. 收到 attach_reject: 断开,报告原因 +``` + +#### `/detach [name]` — `src/commands/detach/detach.ts` + +``` +有目标名: + 1. removeSlaveClient(name) + 2. 发送 detach 消息 + 3. client.disconnect() + 4. 从 AppState.slaves 删除 + 5. 如无更多 slave → role 恢复 standalone + +无目标名(全部断开): + 1. 遍历所有 slaveClients + 2. 对每个执行上述流程 + 3. role → standalone +``` + +#### `/pipes` — `src/commands/pipes/pipes.ts` + +``` +1. 显示本 CLI 的管道名和角色 +2. 如 master: 显示已连接 slave 列表 +3. 如 slave: 显示控制方 +4. listPipes() 列出所有管道文件 +5. isPipeAlive() 逐个检测存活状态 +6. 标记已 attach 的管道 +``` + +#### `/send ` — `src/commands/send/send.ts` + +``` +1. 检查 role === master +2. 解析: 第一个空格前是管道名,后面是消息 +3. getSlaveClient(name) 获取连接 +4. client.send({ type: 'prompt', data: message }) +5. 记录到 slaves[name].history (type: 'prompt') +6. 更新 slave 状态为 busy +``` + +#### `/history ` — `src/commands/history/history.ts` + +``` +1. 检查 role === master +2. 从 AppState.pipeIpc.slaves[name].history 读取记录 +3. 支持 --last N 参数限制显示条数 +4. 格式化输出: [时间] [类型] 内容 + 类型标记: [PROMPT] [AI] [TOOL>] [TOOL<] [DONE] [ERROR] +``` + +#### `/pipe-status` — `src/commands/pipe-status/pipe-status.ts` + +``` +1. standalone: 提示未连接 +2. slave: 显示控制方信息 +3. master: 逐个显示 slave 信息 + - 名称、状态(idle/busy)、连接状态、连接时间、历史条数 +``` + +### 3.5 REPL 集成 — `src/screens/REPL.tsx` + +**修改点一:导入**(第 144-145 行) +```typescript +import { usePipeIpc, relayToMaster } from '../hooks/usePipeIpc.js'; +import { useMasterMonitor } from '../hooks/useMasterMonitor.js'; +``` + +**修改点二:Hook 挂载**(第 4114-4120 行,位于 `useMailboxBridge` 之后) +```typescript +usePipeIpc({ + enabled: true, + isLoading, + onSubmitMessage: handleIncomingPrompt, +}); +useMasterMonitor(); +``` + +**修改点三:`resetLoadingState` 中发送 done 信号**(第 1607 行) +```typescript +relayToMaster({ type: 'done' }); +``` +每轮 AI 对话结束时通知 master。 + +**修改点四:`onQueryEvent` 中继 AI 输出** + +流式文本回调(第 2697 行): +```typescript +relayToMaster({ type: 'stream', data: newContent }); +``` + +消息处理回调中(第 2671-2678 行): +```typescript +// Tool 事件 +if (newMessage.type === 'progress') { + relayToMaster({ type: 'tool_start', data: tool, meta: { toolUseId } }); + relayToMaster({ type: 'tool_result', data: result, meta: { toolUseId } }); +} +// Assistant 文本消息 +if (newMessage.type === 'assistant') { + relayToMaster({ type: 'stream', data: text }); +} +``` + +**删除点:旧的 master 输入劫持** + +旧代码会在 master 模式下拦截用户输入转发给 slave,这**违反需求 R3**(主 CLI 保持完全正常)。已完全删除此逻辑。任务发送现在只通过 `/send` 命令。 + +### 3.6 状态定义 — `src/state/AppStateStore.ts` + +**新增类型**(第 93-116 行): +```typescript +export type SessionEntry = { + type: 'prompt' | 'stream' | 'tool_start' | 'tool_result' | 'done' | 'error' + content: string + from: string + timestamp: string + meta?: Record +} + +export type SlaveInfo = { + name: string + connectedAt: string + status: 'connected' | 'busy' | 'idle' + history: SessionEntry[] +} + +export type PipeIpcState = { + role: 'standalone' | 'master' | 'slave' + serverName: string | null + slaves: Record + attachedBy: string | null +} +``` + +**AppState 新增字段**(第 481 行): +```typescript +pipeIpc: PipeIpcState +``` + +**默认值**(第 598 行): +```typescript +pipeIpc: { + role: 'standalone', + serverName: null, + slaves: {}, + attachedBy: null, +} +``` + +### 3.7 命令注册 — `src/commands.ts` + +**新增导入**(第 59-64 行): +```typescript +import attach from './commands/attach/index.js' +import detach from './commands/detach/index.js' +import pipes from './commands/pipes/index.js' +import send from './commands/send/index.js' +import pipeHistory from './commands/history/index.js' +import pipeStatus from './commands/pipe-status/index.js' +``` + +**注册到 COMMANDS 数组**(第 335-340 行): +```typescript +attach, +detach, +pipes, +send, +pipeHistory, +pipeStatus, +``` + +--- + +## 4. 文件清单与修改说明 + +### 4.1 新建文件 + +| 文件 | 说明 | +|------|------| +| `docs/pipe-master-slave-design.md` | 设计文档 | +| `src/utils/pipeTransport.ts` | 传输层:PipeServer、PipeClient、NDJSON 协议 | +| `src/hooks/usePipeIpc.ts` | Slave 侧 Hook:自动建立 PipeServer、处理 attach/detach/prompt、relay | +| `src/hooks/useMasterMonitor.ts` | Master 侧 Hook:监听 slave 会话数据、存储历史记录 | +| `src/commands/attach/index.ts` | /attach 命令注册 | +| `src/commands/attach/attach.ts` | /attach 命令实现 | +| `src/commands/detach/index.ts` | /detach 命令注册 | +| `src/commands/detach/detach.ts` | /detach 命令实现 | +| `src/commands/pipes/index.ts` | /pipes 命令注册 | +| `src/commands/pipes/pipes.ts` | /pipes 命令实现 | +| `src/commands/send/index.ts` | /send 命令注册 | +| `src/commands/send/send.ts` | /send 命令实现 | +| `src/commands/history/index.ts` | /history 命令注册 | +| `src/commands/history/history.ts` | /history 命令实现 | +| `src/commands/pipe-status/index.ts` | /pipe-status 命令注册 | +| `src/commands/pipe-status/pipe-status.ts` | /pipe-status 命令实现 | +| `test-pipe-ipc.ts` | 双进程端到端测试脚本 | + +### 4.2 修改的已有文件 + +| 文件 | 修改内容 | +|------|----------| +| `src/state/AppStateStore.ts` | 新增 `SessionEntry`、`SlaveInfo`、`PipeIpcState` 类型;AppState 添加 `pipeIpc` 字段;`getDefaultAppState()` 添加默认值 | +| `src/commands.ts` | 导入 6 个新命令模块;注册到 `COMMANDS` 数组 | +| `src/screens/REPL.tsx` | 导入 `usePipeIpc`/`relayToMaster`/`useMasterMonitor`;挂载两个 Hook;`resetLoadingState` 中添加 done 信号;`onQueryEvent` 中添加 stream/tool/assistant 中继;**删除**旧的 master 输入劫持逻辑和旧的 `__pipeSendToMaster` 内联调用 | + +### 4.3 删除的文件 + +| 文件 | 原因 | +|------|------| +| `src/hooks/useMasterRelay.ts` | 旧的单 slave 架构,被 `useMasterMonitor.ts` 替代 | + +--- + +## 5. 测试验证 + +### 5.1 集成测试(同进程双端通信) + +通过 `test-pipe-ipc.ts` 在同一进程中创建独立的 PipeServer 和 PipeClient,通过真实 Unix domain socket 通信。 + +**测试用例**: + +| # | 测试项 | 结果 | +|---|--------|------| +| 1 | Slave PipeServer 启动成功 | ✓ PASS | +| 2 | `listPipes()` 发现管道 | ✓ PASS | +| 3 | `isPipeAlive()` ping/pong 健康检查 | ✓ PASS | +| 4 | Master 连接到 Slave | ✓ PASS | +| 5 | Attach 请求被接受 | ✓ PASS | +| 6 | 重复 attach 被正确拒绝 | ✓ PASS | +| 7 | 发送 prompt → 收到完整 session 数据 (stream×2 → tool_start → tool_result → done) | ✓ PASS | +| 8 | Stream 内容完整 | ✓ PASS | +| 9 | Tool 事件携带正确 metadata | ✓ PASS | +| 10 | Detach 命令 | ✓ PASS | +| 11 | Detach 后重新 attach 成功 | ✓ PASS | + +### 5.2 双独立进程测试 + +启动两个真正独立的操作系统进程: + +``` +进程 A (Slave, PID 12572): + bun run test-pipe-ipc.ts slave + +进程 B (Master, PID 12588): + bun run test-pipe-ipc.ts master +``` + +**验证结果**: + +``` +[SLAVE] Server started: test-slave-001 +[SLAVE] 等待 master 连接... + +[MASTER] Connecting to slave "test-slave-001"... +[MASTER] Connected! +[MASTER] 已发送 attach_request +[SLAVE] Accepted attach from test-master-001 +[MASTER] 收到: attach_accept test-slave-001 + +[MASTER] 已发送 prompt +[SLAVE] Received prompt: "请帮我分析这段代码的问题" +[SLAVE] Sent stream fragment 1 +[MASTER] 收到: stream Processing your request +[SLAVE] Sent stream fragment 2 +[MASTER] 收到: stream ... analyzing code... +[SLAVE] Sent tool_start +[MASTER] 收到: tool_start ReadFile +[SLAVE] Sent tool_result +[MASTER] 收到: tool_result file contents here... +[SLAVE] Sent done +[MASTER] 收到: done + +[SLAVE] Detached by test-master-001 +[MASTER] 已断开连接 +``` + +**结论**:两个独立进程通过 Unix domain socket 完成了完整的 attach → prompt → session 数据回传 → detach 流程。 + +### 5.3 测试命令 + +```bash +# 集成测试(11 项全部通过) +bun run test-pipe-ipc.ts + +# 双进程测试(终端 1) +bun run test-pipe-ipc.ts slave + +# 双进程测试(终端 2) +bun run test-pipe-ipc.ts master +``` diff --git a/docs/pipe-master-slave-design.md b/docs/pipe-master-slave-design.md new file mode 100644 index 0000000..d7bbff3 --- /dev/null +++ b/docs/pipe-master-slave-design.md @@ -0,0 +1,149 @@ +# Master-Slave CLI Communication Architecture Design Document + +## 1. Overview + +This document describes a **master-slave architecture** for inter-CLI communication using Unix domain sockets (named pipes). The system allows independent CLI instances to form a coordination network where: + +- **Master CLI**: A control center that connects to multiple slave CLIs, sends them tasks, and receives their full session data (user input + AI output + tool results) for review. +- **Slave CLI**: An autonomous worker that processes tasks independently. When attached by a master, it automatically reports all session activity back to the master. +- **Standalone CLI**: The default mode — a completely normal, independent CLI with no special behavior. + +## 2. Core Principles + +1. **Independence by default**: Every CLI starts as standalone. No master/slave behavior until explicitly activated via `/attach`. +2. **Master is a monitor, not a terminal proxy**: The master CLI's own conversation/commands remain fully functional. Master monitors slaves, it doesn't become them. +3. **Slave is autonomous**: Slave executes its own AI queries and tool calls. It just reports what happens to the master. +4. **Multiple slaves**: A master can attach to multiple slaves simultaneously. +5. **Bidirectional control**: Master can send prompts to slaves via `/send`, and receives session reports automatically. +6. **Clean detach**: Either side can disconnect, returning to standalone mode. + +## 3. Architecture + +### 3.1 Transport Layer (existing — `src/utils/pipeTransport.ts`) + +- **PipeServer**: Each CLI creates a Unix domain socket server at `~/.claude/pipes/{session-id}.sock` +- **PipeClient**: Connects to a remote PipeServer for communication +- **Protocol**: NDJSON (newline-delimited JSON) over Unix domain sockets +- **Message types**: `ping/pong`, `attach_request/accept/reject`, `detach`, `prompt`, `stream`, `tool_start`, `tool_result`, `done`, `error` + +### 3.2 State Model (`AppState.pipeIpc`) + +```typescript +pipeIpc: { + role: 'standalone' | 'master' | 'slave' + serverName: string | null // This CLI's pipe server name + // Master-specific + slaves: Map // Connected slaves (name → info) + // Slave-specific + attachedBy: string | null // Master pipe name (when slave) +} + +type SlaveInfo = { + name: string + connectedAt: string // ISO timestamp + status: 'connected' | 'busy' | 'idle' + history: SessionEntry[] // Full session transcript +} + +type SessionEntry = { + type: 'prompt' | 'stream' | 'tool_start' | 'tool_result' | 'done' | 'error' + content: string + from: string + timestamp: string + meta?: Record +} +``` + +### 3.3 Hooks + +#### `usePipeIpc` (every CLI) +- On mount: create PipeServer for this session +- Handle `attach_request` → accept, switch to slave role, begin auto-reporting +- Handle `prompt` → inject via `handleIncomingPrompt` +- Handle `detach` → revert to standalone +- **Auto-report**: When in slave role, relay all session events (user input, AI output, tool calls) to master via `globalThis.__pipeSendToMaster` + +#### `useMasterMonitor` (master only) +- Active when `role === 'master'` +- For each connected slave PipeClient: listen for `stream`, `tool_start`, `tool_result`, `done`, `error` +- Store received messages into `slaves[name].history` +- Update slave status (`busy`/`idle`) based on `prompt`/`done` events + +### 3.4 Session Relay (Slave → Master) + +When a CLI is in slave role, the REPL's `onQueryEvent` handler additionally calls `globalThis.__pipeSendToMaster()` to forward: +- **AI stream fragments** → `{ type: 'stream', data: text }` +- **Tool start** → `{ type: 'tool_start', data: toolName, meta: { toolUseId } }` +- **Tool results** → `{ type: 'tool_result', data: resultText, meta: { toolUseId } }` +- **Turn complete** → `{ type: 'done' }` +- **Errors** → `{ type: 'error', data: errorMessage }` + +### 3.5 Commands + +| Command | Description | +|---------|-------------| +| `/pipes` | List all discoverable pipe servers with liveness status | +| `/attach ` | Connect to a slave CLI, begin receiving session reports | +| `/detach [name]` | Disconnect from one slave (or all if no arg) | +| `/send ` | Inject a prompt into a slave CLI | +| `/history ` | View a slave's full session transcript | +| `/status` | Overview of all connected slaves and their status | + +## 4. Flow Diagrams + +### 4.1 Attach Flow +``` +Master CLI Slave CLI + | | + |-- /attach cli-abc12345 --------> | + | (PipeClient connects) | + | | + |-- attach_request ---------------> | + | |-- (checks role == standalone) + |<------------- attach_accept ------|-- (sets role = slave) + |-- (sets role = master) | + |-- (adds to slaves map) | + | | + |<============ auto-report =========| (slave sends all session data) +``` + +### 4.2 Send Flow +``` +Master CLI Slave CLI + | | + |-- /send cli-abc12345 "task" --> | + |-- prompt {data: "task"} --------> | + | |-- (handleIncomingPrompt) + | |-- (AI processes task) + |<-------- stream {data: "..."} ----| + |<-------- tool_start -------------| + |<-------- tool_result ------------| + |<-------- done -------------------| + |-- (stores in history) | +``` + +### 4.3 Detach Flow +``` +Master CLI Slave CLI + | | + |-- /detach cli-abc12345 --------> | + |-- detach ----------------------> | + | |-- (sets role = standalone) + |-- (removes from slaves map) |-- (stops auto-report) + |-- (role stays master or | + | becomes standalone if | + | no more slaves) | +``` + +## 5. Implementation Plan + +1. Update `AppState.pipeIpc` to support multi-slave master model +2. Rewrite `usePipeIpc` hook for correct slave behavior +3. Create `useMasterMonitor` hook for master-side monitoring +4. Rewrite `/attach` command for multi-slave support +5. Rewrite `/detach` command with optional target +6. Update `/pipes` command +7. Create `/send`, `/history`, `/status` commands +8. Mount hooks in REPL.tsx +9. Register all commands in commands.ts +10. Integrate session relay into REPL's `onQueryEvent` diff --git a/src/assistant/gate.ts b/src/assistant/gate.ts new file mode 100644 index 0000000..1602a3b --- /dev/null +++ b/src/assistant/gate.ts @@ -0,0 +1,25 @@ +import { feature } from 'bun:bundle' +import { getKairosActive } from '../bootstrap/state.js' +import { getFeatureValue_CACHED_MAY_BE_STALE } from '../services/analytics/growthbook.js' + +/** + * Runtime gate for KAIROS features. + * + * Build-time: feature('KAIROS') must be on (checked by caller before + * this module is required). + * + * Runtime: tengu_kairos_assistant GrowthBook flag acts as a remote kill + * switch, and kairosActive state must be true (set during bootstrap when + * the session qualifies for KAIROS features). + */ +export async function isKairosEnabled(): Promise { + if (!feature('KAIROS')) { + return false + } + if ( + !getFeatureValue_CACHED_MAY_BE_STALE('tengu_kairos_assistant', false) + ) { + return false + } + return getKairosActive() +} diff --git a/src/commands.ts b/src/commands.ts index 10f03b2..3a24957 100644 --- a/src/commands.ts +++ b/src/commands.ts @@ -56,6 +56,12 @@ import terminalSetup from './commands/terminalSetup/index.js' import usage from './commands/usage/index.js' import theme from './commands/theme/index.js' import vim from './commands/vim/index.js' +import attach from './commands/attach/index.js' +import detach from './commands/detach/index.js' +import pipes from './commands/pipes/index.js' +import send from './commands/send/index.js' +import pipeHistory from './commands/history/index.js' +import pipeStatus from './commands/pipe-status/index.js' import { feature } from 'bun:bundle' // Dead code elimination: conditional imports /* eslint-disable @typescript-eslint/no-require-imports */ @@ -326,6 +332,12 @@ const COMMANDS = memoize((): Command[] => [ ...(bridge ? [bridge] : []), ...(remoteControlServerCommand ? [remoteControlServerCommand] : []), ...(voiceCommand ? [voiceCommand] : []), + attach, + detach, + pipes, + send, + pipeHistory, + pipeStatus, thinkback, thinkbackPlay, permissions, diff --git a/src/commands/assistant/assistant.tsx b/src/commands/assistant/assistant.tsx new file mode 100644 index 0000000..df0e62e --- /dev/null +++ b/src/commands/assistant/assistant.tsx @@ -0,0 +1,36 @@ +import type { LocalJSXCommandContext } from '../../commands.js' +import type { LocalJSXCommandOnDone } from '../../types/command.js' + +/** + * /assistant command implementation. + * + * Opens the Kairos assistant panel. In the current build the panel is + * rendered by the REPL layer when kairosActive is true; the slash command + * simply toggles visibility and prints a confirmation line. + */ +export async function call( + onDone: LocalJSXCommandOnDone, + context: LocalJSXCommandContext, + _args: string, +): Promise { + const { setAppState, getAppState } = context + + const current = getAppState() + const isVisible = (current as Record).assistantPanelVisible + + if (isVisible) { + setAppState((prev: Record) => ({ + ...prev, + assistantPanelVisible: false, + })) + onDone('Assistant panel hidden.', { display: 'system' }) + } else { + setAppState((prev: Record) => ({ + ...prev, + assistantPanelVisible: true, + })) + onDone('Assistant panel opened.', { display: 'system' }) + } + + return null +} diff --git a/src/commands/assistant/gate.ts b/src/commands/assistant/gate.ts new file mode 100644 index 0000000..0bf42b1 --- /dev/null +++ b/src/commands/assistant/gate.ts @@ -0,0 +1,25 @@ +import { feature } from 'bun:bundle' +import { getKairosActive } from '../../bootstrap/state.js' +import { getFeatureValue_CACHED_MAY_BE_STALE } from '../../services/analytics/growthbook.js' + +/** + * Runtime gate for the /assistant command. + * + * Build-time: feature('KAIROS') must be on (checked in commands.ts before + * the module is even required). + * + * Runtime: tengu_kairos_assistant GrowthBook flag acts as a remote kill + * switch, and kairosActive state must be true (set during bootstrap when + * the session qualifies for KAIROS features). + */ +export function isAssistantEnabled(): boolean { + if (!feature('KAIROS')) { + return false + } + if ( + !getFeatureValue_CACHED_MAY_BE_STALE('tengu_kairos_assistant', false) + ) { + return false + } + return getKairosActive() +} diff --git a/src/commands/assistant/index.ts b/src/commands/assistant/index.ts new file mode 100644 index 0000000..18263be --- /dev/null +++ b/src/commands/assistant/index.ts @@ -0,0 +1,16 @@ +import type { Command } from '../../commands.js' +import { isAssistantEnabled } from './gate.js' + +const assistant = { + type: 'local-jsx', + name: 'assistant', + description: 'Open the Kairos assistant panel', + isEnabled: isAssistantEnabled, + get isHidden() { + return !isAssistantEnabled() + }, + immediate: true, + load: () => import('./assistant.js'), +} satisfies Command + +export default assistant diff --git a/src/commands/attach/attach.ts b/src/commands/attach/attach.ts new file mode 100644 index 0000000..f300ff8 --- /dev/null +++ b/src/commands/attach/attach.ts @@ -0,0 +1,97 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { connectToPipe, getPipeIpc, type PipeClient, type PipeMessage } from '../../utils/pipeTransport.js' +import { addSlaveClient } from '../../hooks/useMasterMonitor.js' + +export const call: LocalCommandCall = async (args, context) => { + const targetName = args.trim() + if (!targetName) { + return { + type: 'text', + value: 'Usage: /attach \nUse /pipes to list available pipes.', + } + } + + const currentState = context.getAppState() + + // Check if already attached to this slave + if (getPipeIpc(currentState).slaves[targetName]) { + return { + type: 'text', + value: `Already attached to "${targetName}".`, + } + } + + // Cannot attach when in slave mode + if (getPipeIpc(currentState).role === 'slave') { + return { + type: 'text', + value: 'Cannot attach: this CLI is in slave mode. Use /detach from the master first.', + } + } + + // Connect to the target pipe server + let client: PipeClient + try { + const myName = getPipeIpc(currentState).serverName ?? `master-${process.pid}` + client = await connectToPipe(targetName, myName) + } catch (err) { + return { + type: 'text', + value: `Failed to connect to "${targetName}": ${err instanceof Error ? err.message : String(err)}`, + } + } + + // Send attach request and wait for response + return new Promise((resolve) => { + const timeout = setTimeout(() => { + client.disconnect() + resolve({ + type: 'text', + value: `Attach to "${targetName}" timed out (no response within 5s).`, + }) + }, 5000) + + client.onMessage((msg: PipeMessage) => { + if (msg.type === 'attach_accept') { + clearTimeout(timeout) + + // Register the slave client in the module-level registry + addSlaveClient(targetName, client) + + // Update AppState: add slave and switch to master role + context.setAppState((prev) => ({ + ...prev, + pipeIpc: { + ...getPipeIpc(prev), + role: 'master', + slaves: { + ...getPipeIpc(prev).slaves, + [targetName]: { + name: targetName, + connectedAt: new Date().toISOString(), + status: 'idle' as const, + history: [], + }, + }, + }, + })) + + const slaveCount = Object.keys(getPipeIpc(currentState).slaves).length + 1 + resolve({ + type: 'text', + value: `Attached to "${targetName}" as master. Now monitoring ${slaveCount} slave(s).\nUse /send ${targetName} to send tasks.\nUse /status to see all slaves.\nUse /detach ${targetName} to disconnect.`, + }) + } else if (msg.type === 'attach_reject') { + clearTimeout(timeout) + client.disconnect() + + resolve({ + type: 'text', + value: `Attach rejected by "${targetName}": ${msg.data ?? 'unknown reason'}`, + }) + } + }) + + client.send({ type: 'attach_request' }) + }) +} diff --git a/src/commands/attach/index.ts b/src/commands/attach/index.ts new file mode 100644 index 0000000..ab68930 --- /dev/null +++ b/src/commands/attach/index.ts @@ -0,0 +1,12 @@ +import type { Command } from '../../commands.js' + +const attach = { + type: 'local', + name: 'attach', + description: 'Attach to a slave CLI to monitor and control it', + argumentHint: '', + supportsNonInteractive: false, + load: () => import('./attach.js'), +} satisfies Command + +export default attach diff --git a/src/commands/buddy/buddy.tsx b/src/commands/buddy/buddy.tsx new file mode 100644 index 0000000..b4c8e87 --- /dev/null +++ b/src/commands/buddy/buddy.tsx @@ -0,0 +1,244 @@ +import React from 'react' +import { Box, Text } from '../../ink.js' +import { getGlobalConfig, saveGlobalConfig } from '../../utils/config.js' +import { + companionUserId, + getCompanion, + roll, +} from '../../buddy/companion.js' +import { isBuddyLive } from '../../buddy/useBuddyNotification.js' +import { + RARITY_COLORS, + RARITY_STARS, + STAT_NAMES, + type CompanionBones, + type CompanionSoul, + type StoredCompanion, +} from '../../buddy/types.js' +import { renderSprite } from '../../buddy/sprites.js' +import type { LocalJSXCommandCall } from '../../types/command.js' +import { useTheme } from '../../ink.js' + +// Fallback names when soul generation API is unavailable +const FALLBACK_NAMES = [ + 'Crumpet', + 'Soup', + 'Pickle', + 'Biscuit', + 'Moth', + 'Gravy', +] as const + +// Inspiration words for soul generation +const INSPIRATION = [ + 'thunder', 'biscuit', 'void', 'accordion', 'moss', 'velvet', 'static', + 'marble', 'squall', 'prism', 'glyph', 'lichen', 'torque', 'ember', 'drift', + 'mercury', 'fable', 'plume', 'cipher', 'soot', 'quartz', 'anthem', 'gauge', + 'thistle', 'rumble', 'opal', 'forge', 'vex', 'wane', 'yew', 'zest', +] as const + +function fallbackSoul(bones: CompanionBones): CompanionSoul { + const idx = bones.species.charCodeAt(0) + bones.eye.charCodeAt(0) + return { + name: FALLBACK_NAMES[idx % FALLBACK_NAMES.length]!, + personality: `A ${bones.rarity} ${bones.species} of few words.`, + } +} + +function hatchCompanion( + setCompanionReaction: (reaction: string | undefined) => void, +): StoredCompanion { + const userId = companionUserId() + const { bones } = roll(userId) + + // Use fallback soul generation (API-based soul generation requires + // firstParty org access to /api/organizations/{org}/claude_code/buddy_react) + const soul = fallbackSoul(bones) + const hatchedAt = Date.now() + + // Persist soul in global config + saveGlobalConfig(config => ({ + ...config, + companion: { ...soul, hatchedAt }, + })) + + return { ...soul, hatchedAt } +} + +function StatBar({ + name, + value, +}: { + name: string + value: number +}): React.ReactNode { + const filled = Math.round(value / 10) + const empty = 10 - filled + return ( + + {name.padEnd(10)} + {'█'.repeat(filled)} + {'░'.repeat(empty)} + {String(value).padStart(3)} + + ) +} + +function CompanionCard({ + bones, + soul, + lastReaction, +}: { + bones: CompanionBones + soul: CompanionSoul & { hatchedAt: number } + lastReaction?: string +}): React.ReactNode { + const [theme] = useTheme() + const rarityColor = + theme[RARITY_COLORS[bones.rarity]] ?? undefined + + return ( + + + + {RARITY_STARS[bones.rarity]} {bones.rarity.toUpperCase()} + + + + {bones.species.toUpperCase()} + + {bones.shiny && ( + + ✨ SHINY ✨ + + )} + + + {renderSprite(bones, 0).join('\n')} + + + + {soul.name} + + + + {soul.personality} + + + + {STAT_NAMES.map(stat => ( + + ))} + + {lastReaction && ( + + “{lastReaction}” + + )} + + ) +} + +function HatchingView({ + bones, + soul, + onDone, +}: { + bones: CompanionBones + soul: CompanionSoul & { hatchedAt: number } + onDone: (msg: string, opts?: { display?: string }) => void +}): React.ReactNode { + return ( + + + + + {soul.name} is here · it'll chime in as you code + + + your buddy won't count toward your usage + + + say its name to get its take · /buddy pet · /buddy off + + + + ) +} + +export const call: LocalJSXCommandCall = async (onDone, context, args) => { + const config = getGlobalConfig() + const arg = args?.trim() + + if (arg === 'off') { + if (config.companionMuted !== true) { + saveGlobalConfig(c => ({ ...c, companionMuted: true })) + } + onDone('companion muted', { display: 'system' }) + return null + } + + if (arg === 'on') { + if (config.companionMuted === true) { + saveGlobalConfig(c => ({ ...c, companionMuted: false })) + } + onDone('companion unmuted', { display: 'system' }) + return null + } + + if (!isBuddyLive()) { + onDone('buddy is unavailable on this configuration', { + display: 'system', + }) + return null + } + + if (arg === 'pet') { + const companion = getCompanion() + if (!companion) { + onDone('no companion yet · run /buddy first', { display: 'system' }) + return null + } + if (config.companionMuted === true) { + saveGlobalConfig(c => ({ ...c, companionMuted: false })) + } + context.setAppState((s: any) => ({ ...s, companionPetAt: Date.now() })) + onDone(`petted ${companion.name}`, { display: 'system' }) + return null + } + + // Unmute if muted + if (config.companionMuted === true) { + saveGlobalConfig(c => ({ ...c, companionMuted: false })) + } + + // Show existing companion + const existing = getCompanion() + if (existing) { + return ( + + ) + } + + // Hatch new companion + const setReaction = (reaction: string | undefined) => { + context.setAppState((s: any) => ({ ...s, companionReaction: reaction })) + } + const soul = hatchCompanion(setReaction) + const { bones } = roll(companionUserId()) + + return +} diff --git a/src/commands/buddy/index.ts b/src/commands/buddy/index.ts new file mode 100644 index 0000000..725f2a2 --- /dev/null +++ b/src/commands/buddy/index.ts @@ -0,0 +1,16 @@ +import type { Command } from '../../types/command.js' +import { isBuddyLive } from '../../buddy/useBuddyNotification.js' + +const buddy = { + type: 'local-jsx', + name: 'buddy', + description: 'Hatch a coding companion \u00b7 pet, off', + argumentHint: '[pet|off]', + get isHidden() { + return !isBuddyLive() + }, + immediate: true, + load: () => import('./buddy.js'), +} satisfies Command + +export default buddy diff --git a/src/commands/detach/detach.ts b/src/commands/detach/detach.ts new file mode 100644 index 0000000..4c03d65 --- /dev/null +++ b/src/commands/detach/detach.ts @@ -0,0 +1,88 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { removeSlaveClient, getAllSlaveClients } from '../../hooks/useMasterMonitor.js' +import { getPipeIpc } from '../../utils/pipeTransport.js' + +export const call: LocalCommandCall = async (args, context) => { + const currentState = context.getAppState() + + if (getPipeIpc(currentState).role === 'standalone') { + return { type: 'text', value: 'Not attached to any CLI.' } + } + + if (getPipeIpc(currentState).role === 'slave') { + return { + type: 'text', + value: 'This CLI is in slave mode. The master must detach.', + } + } + + // Master mode + const targetName = args.trim() + + if (targetName) { + // Detach from a specific slave + const client = removeSlaveClient(targetName) + if (!client) { + return { + type: 'text', + value: `Not attached to "${targetName}". Use /status to see connected slaves.`, + } + } + + try { + client.send({ type: 'detach' }) + } catch { + // Socket may already be closed + } + client.disconnect() + + // Remove slave from state + context.setAppState((prev) => { + const { [targetName]: _removed, ...remainingSlaves } = getPipeIpc(prev).slaves + const hasSlaves = Object.keys(remainingSlaves).length > 0 + return { + ...prev, + pipeIpc: { + ...getPipeIpc(prev), + role: hasSlaves ? 'master' : 'standalone', + slaves: remainingSlaves, + }, + } + }) + + return { + type: 'text', + value: `Detached from "${targetName}".`, + } + } + + // No target specified — detach from ALL slaves + const allClients = getAllSlaveClients() + const slaveNames = Array.from(allClients.keys()) + + for (const name of slaveNames) { + const client = removeSlaveClient(name) + if (client) { + try { + client.send({ type: 'detach' }) + } catch { + // Ignore + } + client.disconnect() + } + } + + context.setAppState((prev) => ({ + ...prev, + pipeIpc: { + ...getPipeIpc(prev), + role: 'standalone', + slaves: {}, + }, + })) + + return { + type: 'text', + value: `Detached from ${slaveNames.length} slave(s): ${slaveNames.join(', ')}. Back to standalone mode.`, + } +} diff --git a/src/commands/detach/index.ts b/src/commands/detach/index.ts new file mode 100644 index 0000000..adac6a0 --- /dev/null +++ b/src/commands/detach/index.ts @@ -0,0 +1,12 @@ +import type { Command } from '../../commands.js' + +const detach = { + type: 'local', + name: 'detach', + description: 'Detach from a slave CLI (or all slaves if no name given)', + argumentHint: '[pipe-name]', + supportsNonInteractive: false, + load: () => import('./detach.js'), +} satisfies Command + +export default detach diff --git a/src/commands/force-snip.ts b/src/commands/force-snip.ts new file mode 100644 index 0000000..6d1a355 --- /dev/null +++ b/src/commands/force-snip.ts @@ -0,0 +1,59 @@ +import { randomUUID } from 'crypto' +import type { Command, LocalCommandCall } from '../types/command.js' +import type { Message } from '../types/message.js' + +/** + * Insert a snip boundary into the message array. + * + * A snip boundary is a system message that marks everything before it as + * "snipped". During the next query cycle, `snipCompactIfNeeded` (in + * services/compact/snipCompact.ts) detects this boundary and removes — or + * collapses — the older messages so they no longer consume context-window + * tokens. The REPL keeps the full history for UI scrollback; the boundary + * only affects model-facing projections. + * + * The `snipMetadata.removedUuids` field tells downstream consumers + * (sessionStorage persistence, snipProjection) which messages were removed. + */ +const call: LocalCommandCall = async (_args, context) => { + const { messages, setMessages } = context + + if (messages.length === 0) { + return { type: 'text', value: 'No messages to snip.' } + } + + // Collect UUIDs of every message that will be snipped (everything currently + // in the conversation). The next call to `snipCompactIfNeeded` will honour + // the boundary and strip these from the model-facing view. + const removedUuids = messages.map((m) => m.uuid) + + const boundaryMessage: Message = { + type: 'system', + subtype: 'snip_boundary', + content: '[snip] Conversation history before this point has been snipped.', + isMeta: true, + timestamp: new Date().toISOString(), + uuid: randomUUID(), + snipMetadata: { + removedUuids, + }, + } as Message // subtype is feature-gated; cast through Message + + setMessages((prev) => [...prev, boundaryMessage]) + + return { + type: 'text', + value: `Snipped ${removedUuids.length} message(s). Older history will be excluded from the next model query.`, + } +} + +const forceSnip = { + type: 'local', + name: 'force-snip', + description: 'Force snip conversation history at current point', + supportsNonInteractive: true, + isHidden: true, + load: () => Promise.resolve({ call }), +} satisfies Command + +export default forceSnip diff --git a/src/commands/fork/fork.tsx b/src/commands/fork/fork.tsx new file mode 100644 index 0000000..1cc5d42 --- /dev/null +++ b/src/commands/fork/fork.tsx @@ -0,0 +1,287 @@ +import { randomUUID, type UUID } from 'crypto' +import { mkdir, readFile, writeFile } from 'fs/promises' +import { getOriginalCwd, getSessionId } from '../../bootstrap/state.js' +import type { LocalJSXCommandContext } from '../../commands.js' +import { logEvent } from '../../services/analytics/index.js' +import type { LocalJSXCommandOnDone } from '../../types/command.js' +import type { + ContentReplacementEntry, + Entry, + LogOption, + SerializedMessage, + TranscriptMessage, +} from '../../types/logs.js' +import { parseJSONL } from '../../utils/json.js' +import { + getProjectDir, + getTranscriptPath, + getTranscriptPathForSession, + isTranscriptMessage, + saveCustomTitle, + searchSessionsByCustomTitle, +} from '../../utils/sessionStorage.js' +import { jsonStringify } from '../../utils/slowOperations.js' +import { escapeRegExp } from '../../utils/stringUtils.js' + +type TranscriptEntry = TranscriptMessage & { + forkedFrom?: { + sessionId: string + messageUuid: UUID + } +} + +/** + * Derive a single-line title base from the first user message. + * Collapses whitespace so multiline first messages don't break the saved title. + */ +function deriveFirstPrompt( + firstUserMessage: Extract | undefined, +): string { + const content = firstUserMessage?.message?.content + if (!content) return 'Forked conversation' + const raw = + typeof content === 'string' + ? content + : content.find( + (block): block is { type: 'text'; text: string } => + block.type === 'text', + )?.text + if (!raw) return 'Forked conversation' + return ( + raw.replace(/\s+/g, ' ').trim().slice(0, 100) || 'Forked conversation' + ) +} + +/** + * Creates a fork of the current conversation by copying from the transcript file. + * Preserves all original metadata (timestamps, gitBranch, etc.) while updating + * sessionId and adding forkedFrom traceability. + */ +async function createFork(customTitle?: string): Promise<{ + sessionId: UUID + title: string | undefined + forkPath: string + serializedMessages: SerializedMessage[] + contentReplacementRecords: ContentReplacementEntry['replacements'] +}> { + const forkSessionId = randomUUID() as UUID + const originalSessionId = getSessionId() + const projectDir = getProjectDir(getOriginalCwd()) + const forkSessionPath = getTranscriptPathForSession(forkSessionId) + const currentTranscriptPath = getTranscriptPath() + + // Ensure project directory exists + await mkdir(projectDir, { recursive: true, mode: 0o700 }) + + // Read current transcript file + let transcriptContent: Buffer + try { + transcriptContent = await readFile(currentTranscriptPath) + } catch { + throw new Error('No conversation to fork') + } + + if (transcriptContent.length === 0) { + throw new Error('No conversation to fork') + } + + // Parse all transcript entries (messages + metadata entries like content-replacement) + const entries = parseJSONL(transcriptContent) + + // Filter to only main conversation messages (exclude sidechains and non-message entries) + const mainConversationEntries = entries.filter( + (entry): entry is TranscriptMessage => + isTranscriptMessage(entry) && !entry.isSidechain, + ) + + // Content-replacement entries for the original session. These record which + // tool_result blocks were replaced with previews by the per-message budget. + // Without them in the fork JSONL, resuming the fork reconstructs state with + // an empty replacements Map, causing prompt cache misses. + const contentReplacementRecords = entries + .filter( + (entry): entry is ContentReplacementEntry => + entry.type === 'content-replacement' && + entry.sessionId === originalSessionId, + ) + .flatMap(entry => entry.replacements) + + if (mainConversationEntries.length === 0) { + throw new Error('No messages to fork') + } + + // Build forked entries with new sessionId and preserved metadata + let parentUuid: UUID | null = null + const lines: string[] = [] + const serializedMessages: SerializedMessage[] = [] + + for (const entry of mainConversationEntries) { + // Create forked transcript entry preserving all original metadata + const forkedEntry: TranscriptEntry = { + ...entry, + sessionId: forkSessionId, + parentUuid, + isSidechain: false, + forkedFrom: { + sessionId: originalSessionId, + messageUuid: entry.uuid, + }, + } + + // Build serialized message for LogOption + const serialized: SerializedMessage = { + ...entry, + sessionId: forkSessionId, + } + + serializedMessages.push(serialized) + lines.push(jsonStringify(forkedEntry)) + if (entry.type !== 'progress') { + parentUuid = entry.uuid + } + } + + // Append content-replacement entry (if any) with the fork's sessionId. + if (contentReplacementRecords.length > 0) { + const forkedReplacementEntry: ContentReplacementEntry = { + type: 'content-replacement', + sessionId: forkSessionId, + replacements: contentReplacementRecords, + } + lines.push(jsonStringify(forkedReplacementEntry)) + } + + // Write the fork session file + await writeFile(forkSessionPath, lines.join('\n') + '\n', { + encoding: 'utf8', + mode: 0o600, + }) + + return { + sessionId: forkSessionId, + title: customTitle, + forkPath: forkSessionPath, + serializedMessages, + contentReplacementRecords, + } +} + +/** + * Generates a unique fork name by checking for collisions with existing session names. + * If "baseName (Fork)" already exists, tries "baseName (Fork 2)", "baseName (Fork 3)", etc. + */ +async function getUniqueForkName(baseName: string): Promise { + const candidateName = `${baseName} (Fork)` + + // Check if this exact name already exists + const existingWithExactName = await searchSessionsByCustomTitle( + candidateName, + { exact: true }, + ) + + if (existingWithExactName.length === 0) { + return candidateName + } + + // Name collision - find a unique numbered suffix + const existingForks = await searchSessionsByCustomTitle(`${baseName} (Fork`) + + // Extract existing fork numbers to find the next available + const usedNumbers = new Set([1]) // Consider " (Fork)" as number 1 + const forkNumberPattern = new RegExp( + `^${escapeRegExp(baseName)} \\(Fork(?: (\\d+))?\\)$`, + ) + + for (const session of existingForks) { + const match = session.customTitle?.match(forkNumberPattern) + if (match) { + if (match[1]) { + usedNumbers.add(parseInt(match[1], 10)) + } else { + usedNumbers.add(1) // " (Fork)" without number is treated as 1 + } + } + } + + // Find the next available number + let nextNumber = 2 + while (usedNumbers.has(nextNumber)) { + nextNumber++ + } + + return `${baseName} (Fork ${nextNumber})` +} + +export async function call( + onDone: LocalJSXCommandOnDone, + context: LocalJSXCommandContext, + args: string, +): Promise { + const customTitle = args?.trim() || undefined + + const originalSessionId = getSessionId() + + try { + const { + sessionId, + title, + forkPath, + serializedMessages, + contentReplacementRecords, + } = await createFork(customTitle) + + // Build LogOption for resume + const now = new Date() + const firstPrompt = deriveFirstPrompt( + serializedMessages.find(m => m.type === 'user'), + ) + + // Save custom title with " (Fork)" suffix + // Handle collisions by adding a number suffix (e.g., " (Fork 2)", " (Fork 3)") + const baseName = title ?? firstPrompt + const effectiveTitle = await getUniqueForkName(baseName) + await saveCustomTitle(sessionId, effectiveTitle, forkPath) + + logEvent('tengu_conversation_forked', { + message_count: serializedMessages.length, + has_custom_title: !!title, + }) + + const forkLog: LogOption = { + date: now.toISOString().split('T')[0]!, + messages: serializedMessages, + fullPath: forkPath, + value: now.getTime(), + created: now, + modified: now, + firstPrompt, + messageCount: serializedMessages.length, + isSidechain: false, + sessionId, + customTitle: effectiveTitle, + contentReplacements: contentReplacementRecords, + } + + // Resume into the fork + const titleInfo = title ? ` "${title}"` : '' + const resumeHint = `\nTo resume the original: claude -r ${originalSessionId}` + const successMessage = `Forked conversation${titleInfo}. You are now in the fork.${resumeHint}` + + if (context.resume) { + await context.resume(sessionId, forkLog, 'fork') + onDone(successMessage, { display: 'system' }) + } else { + // Fallback if resume not available + onDone( + `Forked conversation${titleInfo}. Resume with: /resume ${sessionId}`, + ) + } + + return null + } catch (error) { + const message = + error instanceof Error ? error.message : 'Unknown error occurred' + onDone(`Failed to fork conversation: ${message}`) + return null + } +} diff --git a/src/commands/fork/index.ts b/src/commands/fork/index.ts new file mode 100644 index 0000000..183a3a2 --- /dev/null +++ b/src/commands/fork/index.ts @@ -0,0 +1,13 @@ +import type { Command } from '../../commands.js' + +const fork = { + type: 'local-jsx', + name: 'fork', + description: 'Create a fork of the current conversation at this point', + argumentHint: '[name]', + isEnabled: () => true, + isHidden: false, + load: () => import('./fork.js'), +} satisfies Command + +export default fork diff --git a/src/commands/history/history.ts b/src/commands/history/history.ts new file mode 100644 index 0000000..41d4e45 --- /dev/null +++ b/src/commands/history/history.ts @@ -0,0 +1,83 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { getPipeIpc } from '../../utils/pipeTransport.js' + +export const call: LocalCommandCall = async (args, context) => { + const currentState = context.getAppState() + + if (getPipeIpc(currentState).role !== 'master') { + return { + type: 'text', + value: 'Not in master mode. Use /attach first.', + } + } + + const parts = args.trim().split(/\s+/) + const targetName = parts[0] + + if (!targetName) { + // Show list of slaves + const slaveNames = Object.keys(getPipeIpc(currentState).slaves) + if (slaveNames.length === 0) { + return { type: 'text', value: 'No slaves connected.' } + } + return { + type: 'text', + value: `Usage: /history \nConnected slaves: ${slaveNames.join(', ')}`, + } + } + + const slave = getPipeIpc(currentState).slaves[targetName] + if (!slave) { + return { + type: 'text', + value: `Not attached to "${targetName}". Use /status to see connected slaves.`, + } + } + + // Parse --last N + let limit = slave.history.length + const lastIdx = parts.indexOf('--last') + if (lastIdx !== -1 && parts[lastIdx + 1]) { + const n = parseInt(parts[lastIdx + 1], 10) + if (!isNaN(n) && n > 0) { + limit = n + } + } + + const entries = slave.history.slice(-limit) + + if (entries.length === 0) { + return { + type: 'text', + value: `No session history for "${targetName}" yet.`, + } + } + + const lines: string[] = [ + `Session history for "${targetName}" (${entries.length}/${slave.history.length} entries):`, + '', + ] + + for (const entry of entries) { + const time = entry.timestamp.slice(11, 19) // HH:MM:SS + const prefix = formatEntryType(entry.type) + const content = entry.content.length > 200 + ? entry.content.slice(0, 200) + '...' + : entry.content + lines.push(`[${time}] ${prefix} ${content}`) + } + + return { type: 'text', value: lines.join('\n') } +} + +function formatEntryType(type: string): string { + switch (type) { + case 'prompt': return '[PROMPT]' + case 'stream': return '[AI] ' + case 'tool_start': return '[TOOL>] ' + case 'tool_result': return '[TOOL<] ' + case 'done': return '[DONE] ' + case 'error': return '[ERROR] ' + default: return `[${type}]` + } +} diff --git a/src/commands/history/index.ts b/src/commands/history/index.ts new file mode 100644 index 0000000..3fa8a32 --- /dev/null +++ b/src/commands/history/index.ts @@ -0,0 +1,12 @@ +import type { Command } from '../../commands.js' + +const history = { + type: 'local', + name: 'history', + description: 'View session transcript of a connected slave CLI', + argumentHint: ' [--last N]', + supportsNonInteractive: false, + load: () => import('./history.js'), +} satisfies Command + +export default history diff --git a/src/commands/peers/index.ts b/src/commands/peers/index.ts new file mode 100644 index 0000000..c573114 --- /dev/null +++ b/src/commands/peers/index.ts @@ -0,0 +1,12 @@ +import type { Command } from '../../commands.js' + +const peers = { + type: 'local', + name: 'peers', + aliases: ['who'], + description: 'List connected Claude Code peers', + supportsNonInteractive: true, + load: () => import('./peers.js'), +} satisfies Command + +export default peers diff --git a/src/commands/peers/peers.ts b/src/commands/peers/peers.ts new file mode 100644 index 0000000..aed37d3 --- /dev/null +++ b/src/commands/peers/peers.ts @@ -0,0 +1,61 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { listPeers, isPeerAlive } from '../../utils/udsClient.js' +import { getUdsMessagingSocketPath } from '../../utils/udsMessaging.js' + +export const call: LocalCommandCall = async (_args, _context) => { + const mySocket = getUdsMessagingSocketPath() + const peers = await listPeers() + + const lines: string[] = [] + + // Show own socket + lines.push(`Your socket: ${mySocket ?? '(not started)'}`) + lines.push('') + + if (peers.length === 0) { + lines.push('No other Claude Code peers found.') + } else { + lines.push(`Peers (${peers.length}):`) + lines.push('') + + for (const peer of peers) { + const alive = peer.messagingSocketPath + ? await isPeerAlive(peer.messagingSocketPath) + : false + const status = alive ? 'reachable' : 'unreachable' + const label = peer.name ?? peer.kind ?? 'interactive' + const cwd = peer.cwd ? ` cwd: ${peer.cwd}` : '' + const age = peer.startedAt + ? ` started: ${formatAge(peer.startedAt)}` + : '' + + lines.push( + ` [${status}] PID ${peer.pid} (${label})${cwd}${age}`, + ) + if (peer.messagingSocketPath) { + lines.push(` socket: ${peer.messagingSocketPath}`) + } + if (peer.sessionId) { + lines.push(` session: ${peer.sessionId}`) + } + } + } + + lines.push('') + lines.push( + 'To message a peer: use SendMessage with to="uds:"', + ) + + return { type: 'text', value: lines.join('\n') } +} + +function formatAge(startedAt: number): string { + const elapsed = Date.now() - startedAt + const seconds = Math.floor(elapsed / 1000) + if (seconds < 60) return `${seconds}s ago` + const minutes = Math.floor(seconds / 60) + if (minutes < 60) return `${minutes}m ago` + const hours = Math.floor(minutes / 60) + const remainingMinutes = minutes % 60 + return `${hours}h ${remainingMinutes}m ago` +} diff --git a/src/commands/pipe-status/index.ts b/src/commands/pipe-status/index.ts new file mode 100644 index 0000000..dc9e9f4 --- /dev/null +++ b/src/commands/pipe-status/index.ts @@ -0,0 +1,11 @@ +import type { Command } from '../../commands.js' + +const pipeStatus = { + type: 'local', + name: 'pipe-status', + description: 'Show status of all connected slave CLIs', + supportsNonInteractive: false, + load: () => import('./pipe-status.js'), +} satisfies Command + +export default pipeStatus diff --git a/src/commands/pipe-status/pipe-status.ts b/src/commands/pipe-status/pipe-status.ts new file mode 100644 index 0000000..073c366 --- /dev/null +++ b/src/commands/pipe-status/pipe-status.ts @@ -0,0 +1,59 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { getAllSlaveClients } from '../../hooks/useMasterMonitor.js' +import { getPipeIpc } from '../../utils/pipeTransport.js' + +export const call: LocalCommandCall = async (_args, context) => { + const currentState = context.getAppState() + + if (getPipeIpc(currentState).role === 'standalone') { + return { + type: 'text', + value: 'Standalone mode — not connected to any CLIs.\nUse /attach to connect to a slave.', + } + } + + if (getPipeIpc(currentState).role === 'slave') { + return { + type: 'text', + value: `Slave mode — controlled by "${getPipeIpc(currentState).attachedBy}".\nAll session data is being reported to the master.`, + } + } + + // Master mode + const slaves = getPipeIpc(currentState).slaves + const slaveNames = Object.keys(slaves) + const clients = getAllSlaveClients() + + if (slaveNames.length === 0) { + return { + type: 'text', + value: 'Master mode but no slaves connected.\nUse /attach to connect.', + } + } + + const lines: string[] = [ + `Master mode — ${slaveNames.length} slave(s) connected:`, + '', + ] + + for (const name of slaveNames) { + const slave = slaves[name]! + const client = clients.get(name) + const connected = client?.connected ? 'connected' : 'disconnected' + const historyCount = slave.history.length + const connectedAt = slave.connectedAt.slice(11, 19) + + lines.push(` ${name}`) + lines.push(` Status: ${slave.status} (${connected})`) + lines.push(` Connected: ${connectedAt}`) + lines.push(` History: ${historyCount} entries`) + lines.push('') + } + + lines.push('Commands:') + lines.push(' /send — Send a task to a slave') + lines.push(' /history — View slave session transcript') + lines.push(' /detach [name] — Disconnect from a slave (or all)') + + return { type: 'text', value: lines.join('\n') } +} diff --git a/src/commands/pipes/index.ts b/src/commands/pipes/index.ts new file mode 100644 index 0000000..e1ee798 --- /dev/null +++ b/src/commands/pipes/index.ts @@ -0,0 +1,11 @@ +import type { Command } from '../../commands.js' + +const pipes = { + type: 'local', + name: 'pipes', + description: 'List available named pipes for terminal-to-terminal communication', + supportsNonInteractive: false, + load: () => import('./pipes.js'), +} satisfies Command + +export default pipes diff --git a/src/commands/pipes/pipes.ts b/src/commands/pipes/pipes.ts new file mode 100644 index 0000000..4974016 --- /dev/null +++ b/src/commands/pipes/pipes.ts @@ -0,0 +1,45 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { listPipes, isPipeAlive, getPipeIpc } from '../../utils/pipeTransport.js' + +export const call: LocalCommandCall = async (_args, context) => { + const currentState = context.getAppState() + const myName = getPipeIpc(currentState).serverName + const role = getPipeIpc(currentState).role + + const allPipes = await listPipes() + + const lines: string[] = [] + + // Show own pipe name and status + lines.push(`Your pipe: ${myName ?? '(not started)'}`) + lines.push(`Role: ${role}`) + + if (role === 'master') { + const slaveNames = Object.keys(getPipeIpc(currentState).slaves) + lines.push(`Slaves (${slaveNames.length}): ${slaveNames.join(', ') || 'none'}`) + } else if (role === 'slave') { + lines.push(`Controlled by: ${getPipeIpc(currentState).attachedBy}`) + } + + lines.push('') + + // List other pipes with liveness check + const otherPipes = allPipes.filter((p) => p !== myName) + if (otherPipes.length === 0) { + lines.push('No other pipes found.') + } else { + lines.push(`Other pipes (${otherPipes.length}):`) + for (const name of otherPipes) { + const alive = await isPipeAlive(name) + const status = alive ? 'alive' : 'stale' + const isAttached = getPipeIpc(currentState).slaves[name] ? ' [attached]' : '' + lines.push(` ${name} [${status}]${isAttached}`) + } + } + + lines.push('') + lines.push('To attach: /attach ') + lines.push('To send: /send ') + + return { type: 'text', value: lines.join('\n') } +} diff --git a/src/commands/proactive.ts b/src/commands/proactive.ts new file mode 100644 index 0000000..7b85954 --- /dev/null +++ b/src/commands/proactive.ts @@ -0,0 +1 @@ +export default null diff --git a/src/commands/remoteControlServer/index.ts b/src/commands/remoteControlServer/index.ts new file mode 100644 index 0000000..7b85954 --- /dev/null +++ b/src/commands/remoteControlServer/index.ts @@ -0,0 +1 @@ +export default null diff --git a/src/commands/send/index.ts b/src/commands/send/index.ts new file mode 100644 index 0000000..85c908c --- /dev/null +++ b/src/commands/send/index.ts @@ -0,0 +1,12 @@ +import type { Command } from '../../commands.js' + +const send = { + type: 'local', + name: 'send', + description: 'Send a prompt/task to a connected slave CLI', + argumentHint: ' ', + supportsNonInteractive: false, + load: () => import('./send.js'), +} satisfies Command + +export default send diff --git a/src/commands/send/send.ts b/src/commands/send/send.ts new file mode 100644 index 0000000..158e5c5 --- /dev/null +++ b/src/commands/send/send.ts @@ -0,0 +1,94 @@ +import type { LocalCommandCall } from '../../types/command.js' +import { getSlaveClient } from '../../hooks/useMasterMonitor.js' +import { getPipeIpc } from '../../utils/pipeTransport.js' + +export const call: LocalCommandCall = async (args, context) => { + const currentState = context.getAppState() + + if (getPipeIpc(currentState).role !== 'master') { + return { + type: 'text', + value: 'Not in master mode. Use /attach first.', + } + } + + // Parse: first word is pipe name, rest is the message + const trimmed = args.trim() + const spaceIdx = trimmed.indexOf(' ') + if (spaceIdx === -1) { + return { + type: 'text', + value: 'Usage: /send ', + } + } + + const targetName = trimmed.slice(0, spaceIdx) + const message = trimmed.slice(spaceIdx + 1).trim() + + if (!message) { + return { + type: 'text', + value: 'Usage: /send ', + } + } + + const client = getSlaveClient(targetName) + if (!client) { + return { + type: 'text', + value: `Not attached to "${targetName}". Use /status to see connected slaves.`, + } + } + + if (!client.connected) { + return { + type: 'text', + value: `Connection to "${targetName}" is closed. Use /detach ${targetName} and re-attach.`, + } + } + + try { + client.send({ + type: 'prompt', + data: message, + }) + + // Record the sent prompt in history + context.setAppState((prev) => { + const slave = getPipeIpc(prev).slaves[targetName] + if (!slave) return prev + return { + ...prev, + pipeIpc: { + ...getPipeIpc(prev), + slaves: { + ...getPipeIpc(prev).slaves, + [targetName]: { + ...slave, + status: 'busy' as const, + history: [ + ...slave.history, + { + type: 'prompt' as const, + content: message, + from: getPipeIpc(currentState).serverName ?? 'master', + timestamp: new Date().toISOString(), + }, + ], + }, + }, + }, + } + }) + + return { + type: 'text', + value: `Sent to "${targetName}": ${message.slice(0, 100)}${message.length > 100 ? '...' : ''}`, + } + } catch (err) { + return { + type: 'text', + value: `Failed to send to "${targetName}": ${err instanceof Error ? err.message : String(err)}`, + } + } +} diff --git a/src/commands/subscribe-pr.ts b/src/commands/subscribe-pr.ts new file mode 100644 index 0000000..a38c4b3 --- /dev/null +++ b/src/commands/subscribe-pr.ts @@ -0,0 +1,174 @@ +import * as fs from 'node:fs' +import * as path from 'node:path' +import type { Command, LocalCommandCall } from '../types/command.js' +import { detectCurrentRepositoryWithHost } from '../utils/detectRepository.js' +import { getClaudeConfigHomeDir } from '../utils/envUtils.js' + +/** + * File-backed store for PR webhook subscriptions. + * Each subscription tracks the repo + PR number so the bridge layer + * (useReplBridge / webhookSanitizer) can filter inbound events. + */ +interface PRSubscription { + repo: string // "owner/repo" + prNumber: number + subscribedAt: string // ISO 8601 +} + +function getSubscriptionsFilePath(): string { + return path.join(getClaudeConfigHomeDir(), 'pr-subscriptions.json') +} + +function readSubscriptions(): PRSubscription[] { + const filePath = getSubscriptionsFilePath() + try { + const raw = fs.readFileSync(filePath, 'utf-8') + return JSON.parse(raw) as PRSubscription[] + } catch { + return [] + } +} + +function writeSubscriptions(subs: PRSubscription[]): void { + const filePath = getSubscriptionsFilePath() + const dir = path.dirname(filePath) + fs.mkdirSync(dir, { recursive: true }) + fs.writeFileSync(filePath, JSON.stringify(subs, null, 2), 'utf-8') +} + +/** + * Parse a PR URL or number into { repo, prNumber }. + * + * Accepts: + * - Full URL: https://github.com/owner/repo/pull/123 + * - Short ref: owner/repo#123 + * - Bare number: 123 (uses the current git repository) + */ +async function parsePRArg( + arg: string, +): Promise<{ repo: string; prNumber: number } | { error: string }> { + const trimmed = arg.trim() + + // Full GitHub PR URL + const urlMatch = trimmed.match( + /^https?:\/\/[^/]+\/([^/]+\/[^/]+)\/pull\/(\d+)/, + ) + if (urlMatch) { + return { repo: urlMatch[1]!, prNumber: parseInt(urlMatch[2]!, 10) } + } + + // Short ref: owner/repo#123 + const shortMatch = trimmed.match(/^([^/]+\/[^/]+)#(\d+)$/) + if (shortMatch) { + return { repo: shortMatch[1]!, prNumber: parseInt(shortMatch[2]!, 10) } + } + + // Bare number — resolve repo from current git checkout + const numMatch = trimmed.match(/^#?(\d+)$/) + if (numMatch) { + const prNumber = parseInt(numMatch[1]!, 10) + const detected = await detectCurrentRepositoryWithHost() + if (!detected) { + return { + error: + 'Could not detect the GitHub repository for the current directory. Provide a full PR URL instead.', + } + } + const repo = `${detected.owner}/${detected.repo}` + return { repo, prNumber } + } + + return { + error: `Unrecognised PR reference: "${trimmed}". Expected a PR URL, owner/repo#123, or a PR number.`, + } +} + +const call: LocalCommandCall = async (args, _context) => { + const trimmed = args.trim() + + // List current subscriptions + if (!trimmed || trimmed === '--list' || trimmed === 'list') { + const subs = readSubscriptions() + if (subs.length === 0) { + return { + type: 'text', + value: + 'No active PR subscriptions. Usage: /subscribe-pr ', + } + } + const lines = subs.map( + (s) => ` ${s.repo}#${s.prNumber} (since ${s.subscribedAt})`, + ) + return { + type: 'text', + value: `Active PR subscriptions:\n${lines.join('\n')}`, + } + } + + // Unsubscribe + if (trimmed.startsWith('--remove ') || trimmed.startsWith('remove ')) { + const rest = trimmed.replace(/^(--remove|remove)\s+/, '') + const parsed = await parsePRArg(rest) + if ('error' in parsed) { + return { type: 'text', value: parsed.error } + } + const subs = readSubscriptions() + const before = subs.length + const after = subs.filter( + (s) => !(s.repo === parsed.repo && s.prNumber === parsed.prNumber), + ) + if (after.length === before) { + return { + type: 'text', + value: `No subscription found for ${parsed.repo}#${parsed.prNumber}.`, + } + } + writeSubscriptions(after) + return { + type: 'text', + value: `Unsubscribed from ${parsed.repo}#${parsed.prNumber}.`, + } + } + + // Subscribe + const parsed = await parsePRArg(trimmed) + if ('error' in parsed) { + return { type: 'text', value: parsed.error } + } + + const subs = readSubscriptions() + const existing = subs.find( + (s) => s.repo === parsed.repo && s.prNumber === parsed.prNumber, + ) + if (existing) { + return { + type: 'text', + value: `Already subscribed to ${parsed.repo}#${parsed.prNumber} (since ${existing.subscribedAt}).`, + } + } + + subs.push({ + repo: parsed.repo, + prNumber: parsed.prNumber, + subscribedAt: new Date().toISOString(), + }) + writeSubscriptions(subs) + + return { + type: 'text', + value: `Subscribed to ${parsed.repo}#${parsed.prNumber}. You will receive notifications for comments, CI status, and reviews.`, + } +} + +const subscribePr = { + type: 'local', + name: 'subscribe-pr', + aliases: ['watch-pr'], + description: 'Subscribe to GitHub PR activity (comments, CI, reviews)', + argumentHint: '', + supportsNonInteractive: false, + isHidden: true, + load: () => Promise.resolve({ call }), +} satisfies Command + +export default subscribePr diff --git a/src/commands/torch.ts b/src/commands/torch.ts new file mode 100644 index 0000000..7b85954 --- /dev/null +++ b/src/commands/torch.ts @@ -0,0 +1 @@ +export default null diff --git a/src/commands/workflows/index.ts b/src/commands/workflows/index.ts new file mode 100644 index 0000000..d7d6447 --- /dev/null +++ b/src/commands/workflows/index.ts @@ -0,0 +1,25 @@ +import type { Command, LocalCommandCall } from '../../types/command.js' +import { getWorkflowCommands } from '../../tools/WorkflowTool/createWorkflowCommand.js' +import { getCwd } from '../../utils/cwd.js' + +const call: LocalCommandCall = async (_args, _context) => { + const commands = await getWorkflowCommands(getCwd()) + if (commands.length === 0) { + return { + type: 'text', + value: 'No workflows found. Add workflow files to .claude/workflows/ (YAML or Markdown).', + } + } + const list = commands.map((cmd) => ` /${cmd.name} - ${cmd.description}`).join('\n') + return { type: 'text', value: `Available workflows:\n${list}` } +} + +const workflows = { + type: 'local', + name: 'workflows', + description: 'List available workflow scripts', + supportsNonInteractive: true, + load: () => Promise.resolve({ call }), +} satisfies Command + +export default workflows diff --git a/src/components/permissions/MonitorPermissionRequest/MonitorPermissionRequest.tsx b/src/components/permissions/MonitorPermissionRequest/MonitorPermissionRequest.tsx index 0505f19..c54dcde 100644 --- a/src/components/permissions/MonitorPermissionRequest/MonitorPermissionRequest.tsx +++ b/src/components/permissions/MonitorPermissionRequest/MonitorPermissionRequest.tsx @@ -1,3 +1,163 @@ -export function MonitorPermissionRequest() { - return null +import React, { useCallback, useMemo } from 'react' +import { Box, Text, useTheme } from '../../../ink.js' +import { env } from '../../../utils/env.js' +import { shouldShowAlwaysAllowOptions } from '../../../utils/permissions/permissionsLoader.js' +import { truncateToLines } from '../../../utils/stringUtils.js' +import { logUnaryEvent } from '../../../utils/unaryLogging.js' +import { PermissionDialog } from '../PermissionDialog.js' +import { + PermissionPrompt, + type PermissionPromptOption, +} from '../PermissionPrompt.js' +import type { PermissionRequestProps } from '../PermissionRequest.js' +import { PermissionRuleExplanation } from '../PermissionRuleExplanation.js' + +type OptionValue = 'yes' | 'yes-dont-ask-again' | 'no' + +/** + * Permission request UI for the MonitorTool. Asks the user to confirm + * starting a long-running background monitor process. + * Follows the FallbackPermissionRequest pattern. + */ +export function MonitorPermissionRequest({ + toolUseConfirm, + onDone, + onReject, + workerBadge, +}: PermissionRequestProps): React.ReactNode { + const [theme] = useTheme() + + const input = toolUseConfirm.input as { + command: string + description: string + } + + const showAlwaysAllowOptions = useMemo( + () => shouldShowAlwaysAllowOptions(), + [], + ) + + const options: PermissionPromptOption[] = useMemo(() => { + const opts: PermissionPromptOption[] = [ + { + label: 'Yes', + value: 'yes', + feedbackConfig: { type: 'accept' as const }, + }, + ] + if (showAlwaysAllowOptions) { + opts.push({ + label: ( + + Yes, and don{'\u2019'}t ask again for{' '} + {toolUseConfirm.tool.name} commands + + ), + value: 'yes-dont-ask-again', + }) + } + opts.push({ + label: 'No', + value: 'no', + feedbackConfig: { type: 'reject' as const }, + }) + return opts + }, [showAlwaysAllowOptions, toolUseConfirm.tool.name]) + + const handleSelect = useCallback( + (value: OptionValue, feedback?: string) => { + switch (value) { + case 'yes': + logUnaryEvent({ + completion_type: 'tool_use_single', + event: 'accept', + metadata: { + language_name: 'none', + message_id: toolUseConfirm.assistantMessage.message.id, + platform: env.platform, + }, + }) + toolUseConfirm.onAllow(toolUseConfirm.input, [], feedback) + onDone() + break + case 'yes-dont-ask-again': + logUnaryEvent({ + completion_type: 'tool_use_single', + event: 'accept', + metadata: { + language_name: 'none', + message_id: toolUseConfirm.assistantMessage.message.id, + platform: env.platform, + }, + }) + toolUseConfirm.onAllow(toolUseConfirm.input, [ + { + type: 'addRules', + rules: [{ toolName: toolUseConfirm.tool.name }], + behavior: 'allow', + destination: 'localSettings', + }, + ]) + onDone() + break + case 'no': + logUnaryEvent({ + completion_type: 'tool_use_single', + event: 'reject', + metadata: { + language_name: 'none', + message_id: toolUseConfirm.assistantMessage.message.id, + platform: env.platform, + }, + }) + toolUseConfirm.onReject(feedback) + onReject() + onDone() + break + } + }, + [toolUseConfirm, onDone, onReject], + ) + + const handleCancel = useCallback(() => { + logUnaryEvent({ + completion_type: 'tool_use_single', + event: 'reject', + metadata: { + language_name: 'none', + message_id: toolUseConfirm.assistantMessage.message.id, + platform: env.platform, + }, + }) + toolUseConfirm.onReject() + onReject() + onDone() + }, [toolUseConfirm, onDone, onReject]) + + return ( + + + + + {input.description} + + + {truncateToLines(input.command, 5)} + + + + + options={options} + onSelect={handleSelect} + onCancel={handleCancel} + /> + + + ) } diff --git a/src/components/permissions/ReviewArtifactPermissionRequest/ReviewArtifactPermissionRequest.tsx b/src/components/permissions/ReviewArtifactPermissionRequest/ReviewArtifactPermissionRequest.tsx index 131732c..c4a2650 100644 --- a/src/components/permissions/ReviewArtifactPermissionRequest/ReviewArtifactPermissionRequest.tsx +++ b/src/components/permissions/ReviewArtifactPermissionRequest/ReviewArtifactPermissionRequest.tsx @@ -1,3 +1,74 @@ -export function ReviewArtifactPermissionRequest() { - return null +import React from 'react' +import { Box, Text } from '../../../ink.js' +import { Select } from '../../CustomSelect/select.js' +import { usePermissionRequestLogging } from '../hooks.js' +import { PermissionDialog } from '../PermissionDialog.js' +import type { PermissionRequestProps } from '../PermissionRequest.js' +import { logUnaryPermissionEvent } from '../utils.js' + +export function ReviewArtifactPermissionRequest({ + toolUseConfirm, + onDone, + onReject, + workerBadge, +}: PermissionRequestProps): React.ReactNode { + const { title, annotations, summary } = toolUseConfirm.input as { + title?: string + annotations?: Array<{ line?: number; message: string; severity?: string }> + summary?: string + } + + const unaryEvent = { + completion_type: 'tool_use_single' as const, + language_name: 'none', + } + usePermissionRequestLogging(toolUseConfirm, unaryEvent) + + const annotationCount = annotations?.length ?? 0 + + function handleResponse(value: 'yes' | 'no'): void { + if (value === 'yes') { + logUnaryPermissionEvent('tool_use_single', toolUseConfirm, 'accept') + toolUseConfirm.onAllow(toolUseConfirm.input, []) + onDone() + } else { + logUnaryPermissionEvent('tool_use_single', toolUseConfirm, 'reject') + toolUseConfirm.onReject() + onReject() + onDone() + } + } + + return ( + + + + Claude wants to review{title ? `: ${title}` : ' an artifact'}. + + + + + {annotationCount} annotation{annotationCount !== 1 ? 's' : ''} will + be presented. + + {summary ? Summary: {summary} : null} + + + + + +type WorkflowOutput = { output: string } + +export const WorkflowTool = buildTool({ + name: WORKFLOW_TOOL_NAME, + searchHint: 'execute user-defined workflow scripts', + maxResultSizeChars: 50_000, + strict: true, + + inputSchema, + + async description() { + return 'Execute a user-defined workflow script from .claude/workflows/' + }, + async prompt() { + return `Use the Workflow tool to execute user-defined workflow scripts located in .claude/workflows/. Workflows are YAML or Markdown files that define a sequence of steps for common development tasks. + +Guidelines: +- Specify the workflow name to execute (must match a file in .claude/workflows/) +- Optionally pass arguments that the workflow can use +- Workflows run in the context of the current project` + }, + userFacingName() { + return 'Workflow' + }, + isReadOnly() { + return false + }, + isEnabled() { + return true + }, + + renderToolUseMessage(input: Partial, { verbose }) { + const name = input.workflow ?? 'unknown' + if (verbose && input.args) { + return Workflow: {name} {input.args} + } + return Workflow: {name} + }, + + mapToolResultToToolResultBlockParam( + content: WorkflowOutput, + toolUseID: string, + ): ToolResultBlockParam { + return { + tool_use_id: toolUseID, + type: 'tool_result', + content: truncate(content.output, 50_000), + } + }, + + async call(_input: WorkflowInput, _context, _progress) { + // Workflow execution is wired by the WORKFLOW_SCRIPTS feature bootstrap. + // Without it, this tool is not functional. + return { + data: { + output: 'Error: Workflow execution requires the WORKFLOW_SCRIPTS runtime.', + }, + } + }, +}) diff --git a/src/tools/WorkflowTool/bundled/index.ts b/src/tools/WorkflowTool/bundled/index.ts new file mode 100644 index 0000000..eb6620c --- /dev/null +++ b/src/tools/WorkflowTool/bundled/index.ts @@ -0,0 +1,15 @@ +// Bundled workflow initialization. +// Called by tools.ts when WORKFLOW_SCRIPTS feature flag is enabled. +// Sets up any pre-bundled workflow scripts that ship with the CLI. + +/** + * Initialize bundled workflows. Called once at startup when the + * WORKFLOW_SCRIPTS feature flag is active. This is the hook point + * for registering any workflow scripts that are compiled into the + * binary (as opposed to user-authored ones in .claude/workflows/). + */ +export function initBundledWorkflows(): void { + // Bundled workflows are registered here at startup. + // Currently a no-op — all workflows are user-authored in .claude/workflows/. + // This function exists as the extension point for future built-in workflows. +} diff --git a/src/tools/WorkflowTool/constants.ts b/src/tools/WorkflowTool/constants.ts index 444a03a..49249ca 100644 --- a/src/tools/WorkflowTool/constants.ts +++ b/src/tools/WorkflowTool/constants.ts @@ -1 +1,3 @@ export const WORKFLOW_TOOL_NAME = 'workflow' +export const WORKFLOW_DIR_NAME = '.claude/workflows' +export const WORKFLOW_FILE_EXTENSIONS = ['.yml', '.yaml', '.md'] diff --git a/src/tools/WorkflowTool/createWorkflowCommand.ts b/src/tools/WorkflowTool/createWorkflowCommand.ts index 0c940f7..a6369f5 100644 --- a/src/tools/WorkflowTool/createWorkflowCommand.ts +++ b/src/tools/WorkflowTool/createWorkflowCommand.ts @@ -1,3 +1,41 @@ -export function createWorkflowCommand() { - return null +import { readdir } from 'fs/promises' +import { join, parse } from 'path' +import type { Command } from '../../types/command.js' +import { WORKFLOW_DIR_NAME, WORKFLOW_FILE_EXTENSIONS } from './constants.js' + +/** + * Scans .claude/workflows/ directory and creates Command objects for each workflow file. + * Each workflow file becomes a slash command (e.g. /workflow-name). + */ +export async function getWorkflowCommands(cwd: string): Promise { + const workflowDir = join(cwd, WORKFLOW_DIR_NAME) + let files: string[] + try { + files = await readdir(workflowDir) + } catch { + return [] + } + + const workflowFiles = files.filter((f) => { + const ext = parse(f).ext.toLowerCase() + return WORKFLOW_FILE_EXTENSIONS.includes(ext) + }) + + return workflowFiles.map((file) => { + const name = parse(file).name + return { + type: 'prompt' as const, + name, + description: `Run workflow: ${name}`, + kind: 'workflow' as const, + source: 'builtin' as const, + progressMessage: `Running workflow ${name}...`, + contentLength: 0, + async getPromptForCommand(args, _context) { + const { readFile } = await import('fs/promises') + const content = await readFile(join(workflowDir, file), 'utf-8') + return [{ type: 'text' as const, text: `Execute this workflow:\n\n${content}${args ? `\n\nArguments: ${args}` : ''}` }] + }, + } satisfies Command + }) } diff --git a/src/utils/pipeRepl.ts b/src/utils/pipeRepl.ts new file mode 100644 index 0000000..940eb00 --- /dev/null +++ b/src/utils/pipeRepl.ts @@ -0,0 +1,187 @@ +/** + * Pipe REPL - Interactive terminal-to-terminal communication demo + * + * This module wires up PipeServer / PipeClient to provide a bidirectional + * chat + remote-command channel between two independent Claude Code terminals. + * + * Usage (two separate terminals): + * + * Terminal A (server): + * import { startPipeRepl } from './pipeRepl.js' + * await startPipeRepl({ role: 'server', name: 'repl' }) + * + * Terminal B (client): + * import { startPipeRepl } from './pipeRepl.js' + * await startPipeRepl({ role: 'client', target: 'repl', name: 'client-b' }) + * + * Messages: + * - Plain text → chat message forwarded to the other side + * - /cmd → remote command execution request + * - /exit → graceful disconnect + * - /list → show all active pipes + * - /ping → latency check + */ + +import { createInterface } from 'readline' +import { + createPipeServer, + connectToPipe, + listPipes, + type PipeMessage, + type PipeServer, + type PipeClient, +} from './pipeTransport.js' + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type PipeReplOptions = + | { role: 'server'; name: string } + | { role: 'client'; target: string; name?: string } + +// --------------------------------------------------------------------------- +// REPL +// --------------------------------------------------------------------------- + +export async function startPipeRepl(options: PipeReplOptions): Promise { + let server: PipeServer | undefined + let client: PipeClient | undefined + + // A unified send function set after connection + let sendFn: (msg: PipeMessage) => void = () => {} + + const log = (prefix: string, text: string) => { + process.stdout.write(`\n${prefix} ${text}\n> `) + } + + // Handler for incoming messages (shared by server & client) + const handleMessage = (msg: PipeMessage, reply: (m: PipeMessage) => void) => { + switch (msg.type) { + case 'chat': + log(`[${msg.from}]`, msg.data ?? '') + break + + case 'cmd': + log(`[${msg.from}] CMD:`, msg.data ?? '') + // Execute and reply with result + try { + const { execSync } = require('child_process') as typeof import('child_process') + const output = execSync(msg.data ?? '', { + encoding: 'utf-8', + timeout: 10_000, + }).trim() + reply({ type: 'result', data: output }) + } catch (err: unknown) { + const errMsg = err instanceof Error ? err.message : String(err) + reply({ type: 'result', data: `ERROR: ${errMsg}` }) + } + break + + case 'result': + log('[RESULT]', msg.data ?? '(empty)') + break + + case 'ping': + reply({ type: 'pong', data: new Date().toISOString() }) + break + + case 'pong': + log('[PONG]', `from ${msg.from} at ${msg.data}`) + break + + case 'exit': + log('[SYSTEM]', `${msg.from} disconnected.`) + break + } + } + + // ---- Setup ---- + + if (options.role === 'server') { + server = await createPipeServer(options.name) + + // Auto-respond to pings (for health checks) + server.onMessage((msg, reply) => { + if (msg.type === 'ping') { + reply({ type: 'pong', data: new Date().toISOString() }) + } + }) + + server.onMessage(handleMessage) + sendFn = (msg) => server!.broadcast(msg) + + console.log(`[PIPE SERVER] Listening as "${options.name}"`) + console.log(`[PIPE SERVER] Socket: ${server.socketPath}`) + console.log(`[PIPE SERVER] Waiting for connections...`) + + server.on('connection', () => { + log('[SYSTEM]', `Client connected (${server!.connectionCount} total)`) + }) + server.on('disconnect', () => { + log('[SYSTEM]', `Client disconnected (${server!.connectionCount} remaining)`) + }) + } else { + const senderName = options.name ?? `client-${process.pid}` + client = await connectToPipe(options.target, senderName) + client.onMessage(handleMessage) + sendFn = (msg) => client!.send(msg) + + console.log(`[PIPE CLIENT] Connected to "${options.target}" as "${senderName}"`) + } + + // ---- Interactive loop ---- + + const rl = createInterface({ + input: process.stdin, + output: process.stdout, + prompt: '> ', + }) + + rl.prompt() + + rl.on('line', async (line) => { + const input = line.trim() + if (!input) { + rl.prompt() + return + } + + if (input === '/exit') { + sendFn({ type: 'exit' }) + console.log('[SYSTEM] Bye!') + if (server) await server.close() + if (client) client.disconnect() + rl.close() + process.exit(0) + } + + if (input === '/list') { + const pipes = await listPipes() + console.log(`[PIPES] Active: ${pipes.length > 0 ? pipes.join(', ') : '(none)'}`) + rl.prompt() + return + } + + if (input === '/ping') { + sendFn({ type: 'ping' }) + rl.prompt() + return + } + + if (input.startsWith('/cmd ')) { + sendFn({ type: 'cmd', data: input.slice(5) }) + rl.prompt() + return + } + + // Default: chat message + sendFn({ type: 'chat', data: input }) + rl.prompt() + }) + + rl.on('close', async () => { + if (server) await server.close() + if (client) client.disconnect() + }) +} diff --git a/src/utils/pipeTransport.ts b/src/utils/pipeTransport.ts new file mode 100644 index 0000000..da45559 --- /dev/null +++ b/src/utils/pipeTransport.ts @@ -0,0 +1,456 @@ +/** + * Named Pipe Transport - Unix domain socket IPC for CLI terminals + * + * Supports two modes: + * 1. Standalone: Two independent terminals chat via pipes + * 2. Master-Slave bridge: Master CLI attaches to Slave CLI, forwarding + * prompts and receiving streamed AI output back. + * + * Each CLI auto-creates a PipeServer at: + * ~/.claude/pipes/{session-short-id}.sock + * + * Protocol: newline-delimited JSON (NDJSON), one message per line. + */ + +import { createServer, createConnection, type Server, type Socket } from 'net' +import { mkdir, unlink, readdir } from 'fs/promises' +import { join } from 'path' +import { EventEmitter } from 'events' +import { getClaudeConfigHomeDir } from './envUtils.js' +import { logError } from './log.js' + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** + * Message types exchanged over the pipe. + * + * Basic: ping, pong + * Control: attach_request, attach_accept, attach_reject, detach + * Data (M→S): prompt — master sends user input to slave + * Data (S→M): stream — slave streams AI output fragments + * tool_start — slave notifies tool execution start + * tool_result — slave notifies tool result + * done — slave signals turn complete + * error — either side reports an error + * Legacy: chat, cmd, result, exit — kept for backward compat + */ +export type PipeMessageType = + // Basic + | 'ping' + | 'pong' + // Control flow (master-slave bridge) + | 'attach_request' + | 'attach_accept' + | 'attach_reject' + | 'detach' + // Data flow (master → slave) + | 'prompt' + // Data flow (slave → master) + | 'stream' + | 'tool_start' + | 'tool_result' + | 'done' + | 'error' + // Legacy (standalone chat demo) + | 'chat' + | 'cmd' + | 'result' + | 'exit' + +export type PipeMessage = { + /** Discriminator */ + type: PipeMessageType + /** Payload (text, command output, prompt, stream fragment, etc.) */ + data?: string + /** Sender pipe name */ + from?: string + /** ISO timestamp */ + ts?: string + /** Additional metadata (tool name, error details, etc.) */ + meta?: Record +} + +export type PipeMessageHandler = (msg: PipeMessage, reply: (msg: PipeMessage) => void) => void + +// --------------------------------------------------------------------------- +// Paths +// --------------------------------------------------------------------------- + +function getPipesDir(): string { + return join(getClaudeConfigHomeDir(), 'pipes') +} + +export function getPipePath(name: string): string { + const safeName = name.replace(/[^a-zA-Z0-9_-]/g, '_') + if (process.platform === 'win32') { + return `\\\\.\\pipe\\claude-code-${safeName}` + } + return join(getPipesDir(), `${safeName}.sock`) +} + +async function ensurePipesDir(): Promise { + await mkdir(getPipesDir(), { recursive: true }) +} + +// --------------------------------------------------------------------------- +// Server (listener side) +// --------------------------------------------------------------------------- + +export class PipeServer extends EventEmitter { + private server: Server | null = null + private clients: Set = new Set() + private handlers: PipeMessageHandler[] = [] + readonly name: string + readonly socketPath: string + + constructor(name: string) { + super() + this.name = name + this.socketPath = getPipePath(name) + } + + /** + * Start listening for incoming connections. + */ + async start(): Promise { + await ensurePipesDir() + + // Clean up stale socket file (Unix only) + if (process.platform !== 'win32') { + try { + await unlink(this.socketPath) + } catch { + // File doesn't exist — fine + } + } + + return new Promise((resolve, reject) => { + this.server = createServer((socket) => { + this.clients.add(socket) + this.emit('connection', socket) + + let buffer = '' + + socket.on('data', (chunk) => { + buffer += chunk.toString() + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + + for (const line of lines) { + if (!line.trim()) continue + try { + const msg = JSON.parse(line) as PipeMessage + this.emit('message', msg) + const reply = (replyMsg: PipeMessage) => { + replyMsg.from = replyMsg.from ?? this.name + replyMsg.ts = replyMsg.ts ?? new Date().toISOString() + if (!socket.destroyed) { + socket.write(JSON.stringify(replyMsg) + '\n') + } + } + for (const handler of this.handlers) { + handler(msg, reply) + } + } catch { + // Malformed JSON — skip + } + } + }) + + socket.on('close', () => { + this.clients.delete(socket) + this.emit('disconnect', socket) + }) + + socket.on('error', (err) => { + this.clients.delete(socket) + logError(err) + }) + }) + + this.server.on('error', reject) + + this.server.listen(this.socketPath, () => { + resolve() + }) + }) + } + + /** + * Register a handler for incoming messages. + */ + onMessage(handler: PipeMessageHandler): void { + this.handlers.push(handler) + } + + /** + * Broadcast a message to all connected clients. + */ + broadcast(msg: PipeMessage): void { + msg.from = msg.from ?? this.name + msg.ts = msg.ts ?? new Date().toISOString() + const line = JSON.stringify(msg) + '\n' + for (const client of this.clients) { + if (!client.destroyed) { + client.write(line) + } + } + } + + /** + * Send to a specific socket (used for directed replies in attach flow). + */ + sendTo(socket: Socket, msg: PipeMessage): void { + msg.from = msg.from ?? this.name + msg.ts = msg.ts ?? new Date().toISOString() + if (!socket.destroyed) { + socket.write(JSON.stringify(msg) + '\n') + } + } + + get connectionCount(): number { + return this.clients.size + } + + async close(): Promise { + for (const client of this.clients) { + client.destroy() + } + this.clients.clear() + + return new Promise((resolve) => { + if (!this.server) { + resolve() + return + } + this.server.close(() => { + this.server = null + if (process.platform !== 'win32') { + void unlink(this.socketPath).catch(() => {}) + } + resolve() + }) + }) + } +} + +// --------------------------------------------------------------------------- +// Client (connector side) +// --------------------------------------------------------------------------- + +export class PipeClient extends EventEmitter { + private socket: Socket | null = null + private handlers: PipeMessageHandler[] = [] + readonly targetName: string + readonly senderName: string + readonly socketPath: string + + constructor(targetName: string, senderName?: string) { + super() + this.targetName = targetName + this.senderName = senderName ?? `client-${process.pid}` + this.socketPath = getPipePath(targetName) + } + + /** + * Connect to a remote pipe server. + * Retries automatically if the socket file doesn't exist yet (ENOENT). + */ + async connect(timeoutMs: number = 5000): Promise { + const { access } = await import('fs/promises') + const deadline = Date.now() + timeoutMs + const retryDelayMs = 300 + + // Wait for socket file to exist (Unix only) + if (process.platform !== 'win32') { + while (Date.now() < deadline) { + try { + await access(this.socketPath) + break + } catch { + if (Date.now() + retryDelayMs >= deadline) { + throw new Error( + `Pipe "${this.targetName}" not found at ${this.socketPath}. Is the server running?`, + ) + } + await new Promise((r) => setTimeout(r, retryDelayMs)) + } + } + } + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject(new Error(`Connection to pipe "${this.targetName}" timed out after ${timeoutMs}ms`)) + }, Math.max(deadline - Date.now(), 1000)) + + const socket = createConnection({ path: this.socketPath }, () => { + clearTimeout(timer) + this.socket = socket + this.setupSocketListeners(socket) + this.emit('connected') + resolve() + }) + + socket.on('error', (err) => { + clearTimeout(timer) + socket.destroy() + reject(err) + }) + }) + } + + private setupSocketListeners(socket: Socket): void { + let buffer = '' + + socket.on('data', (chunk) => { + buffer += chunk.toString() + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + + for (const line of lines) { + if (!line.trim()) continue + try { + const msg = JSON.parse(line) as PipeMessage + this.emit('message', msg) + const reply = (replyMsg: PipeMessage) => this.send(replyMsg) + for (const handler of this.handlers) { + handler(msg, reply) + } + } catch { + // Malformed JSON — skip + } + } + }) + + socket.on('close', () => { + this.emit('disconnect') + }) + + socket.on('error', (err) => { + logError(err) + }) + } + + onMessage(handler: PipeMessageHandler): void { + this.handlers.push(handler) + } + + send(msg: PipeMessage): void { + if (!this.socket || this.socket.destroyed) { + throw new Error(`Not connected to pipe "${this.targetName}"`) + } + msg.from = msg.from ?? this.senderName + msg.ts = msg.ts ?? new Date().toISOString() + this.socket.write(JSON.stringify(msg) + '\n') + } + + disconnect(): void { + if (this.socket) { + this.socket.destroy() + this.socket = null + } + } + + get connected(): boolean { + return this.socket !== null && !this.socket.destroyed + } +} + +// --------------------------------------------------------------------------- +// Convenience factory functions +// --------------------------------------------------------------------------- + +export async function createPipeServer(name: string): Promise { + const server = new PipeServer(name) + await server.start() + return server +} + +export async function connectToPipe( + targetName: string, + senderName?: string, + timeoutMs?: number, +): Promise { + const client = new PipeClient(targetName, senderName) + await client.connect(timeoutMs) + return client +} + +/** + * List all active pipe names (by scanning the pipes directory). + */ +export async function listPipes(): Promise { + try { + await ensurePipesDir() + const files = await readdir(getPipesDir()) + return files + .filter((f) => f.endsWith('.sock')) + .map((f) => f.replace(/\.sock$/, '')) + } catch { + return [] + } +} + +/** + * Probe whether a pipe server is alive by sending a ping. + */ +export async function isPipeAlive(name: string, timeoutMs: number = 2000): Promise { + try { + const client = new PipeClient(name, '_probe') + await client.connect(timeoutMs) + + return new Promise((resolve) => { + const timer = setTimeout(() => { + client.disconnect() + resolve(false) + }, timeoutMs) + + client.onMessage((msg) => { + if (msg.type === 'pong') { + clearTimeout(timer) + client.disconnect() + resolve(true) + } + }) + + client.send({ type: 'ping' }) + }) + } catch { + return false + } +} + +// ─── PipeIpc AppState extension ────────────────────────────────────── +// AppState.pipeIpc is added at runtime when feature('PIPE_IPC') is on. +// These types and the default accessor ensure safe access from hooks +// and commands without modifying the original AppStateStore. + +export type PipeIpcSlaveState = { + client: PipeClient | null + history: Array<{ type: string; content: string; from: string; timestamp: string; meta?: Record }> +} + +export type PipeIpcState = { + role: 'standalone' | 'master' | 'slave' + serverName: string | null + attachedBy: string | null + slaves: Record +} + +const DEFAULT_PIPE_IPC: PipeIpcState = { + role: 'standalone', + serverName: null, + attachedBy: null, + slaves: {}, +} + +/** + * Safely read pipeIpc from AppState, returning the default if not yet initialized. + * This avoids crashes when the state hasn't been extended by the PIPE_IPC bootstrap. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function getPipeIpc(state: any): PipeIpcState { + return state?.pipeIpc ?? DEFAULT_PIPE_IPC +} diff --git a/src/utils/udsClient.ts b/src/utils/udsClient.ts index 934cdeb..781f3dd 100644 --- a/src/utils/udsClient.ts +++ b/src/utils/udsClient.ts @@ -1,3 +1,219 @@ -export async function createUdsClient() { - return null +/** + * UDS Client — connect to peer Claude Code sessions via Unix Domain Sockets. + * + * Peers are discovered by reading the PID-file registry in ~/.claude/sessions/ + * (written by concurrentSessions.ts) and checking each entry's + * `messagingSocketPath` field. A peer is "alive" if its PID is running and + * its socket accepts a ping/pong round-trip. + */ + +import { createConnection, type Socket } from 'net' +import { readdir, readFile } from 'fs/promises' +import { join } from 'path' +import { getClaudeConfigHomeDir } from './envUtils.js' +import { logForDebugging } from './debug.js' +import { errorMessage, isFsInaccessible } from './errors.js' +import { isProcessRunning } from './genericProcessUtils.js' +import { jsonParse, jsonStringify } from './slowOperations.js' +import type { SessionKind } from './concurrentSessions.js' +import type { UdsMessage } from './udsMessaging.js' + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type PeerSession = { + pid: number + sessionId?: string + cwd?: string + startedAt?: number + kind?: SessionKind + name?: string + messagingSocketPath?: string + entrypoint?: string + bridgeSessionId?: string | null + alive: boolean +} + +// --------------------------------------------------------------------------- +// Session directory +// --------------------------------------------------------------------------- + +function getSessionsDir(): string { + return join(getClaudeConfigHomeDir(), 'sessions') +} + +// --------------------------------------------------------------------------- +// Discovery +// --------------------------------------------------------------------------- + +/** + * List all live sessions from the PID registry, optionally probing their + * UDS sockets for liveness. Sessions whose PID is no longer running are + * excluded (and their stale files cleaned up). + */ +export async function listAllLiveSessions(): Promise { + const dir = getSessionsDir() + let files: string[] + try { + files = await readdir(dir) + } catch (e) { + if (!isFsInaccessible(e)) { + logForDebugging(`[udsClient] readdir failed: ${errorMessage(e)}`) + } + return [] + } + + const results: PeerSession[] = [] + + for (const file of files) { + if (!/^\d+\.json$/.test(file)) continue + const pid = parseInt(file.slice(0, -5), 10) + + if (!isProcessRunning(pid)) { + // Stale — skip (concurrentSessions handles cleanup) + continue + } + + try { + const raw = await readFile(join(dir, file), 'utf8') + const data = jsonParse(raw) as Record + results.push({ + pid, + sessionId: data.sessionId as string | undefined, + cwd: data.cwd as string | undefined, + startedAt: data.startedAt as number | undefined, + kind: data.kind as SessionKind | undefined, + name: data.name as string | undefined, + messagingSocketPath: data.messagingSocketPath as string | undefined, + entrypoint: data.entrypoint as string | undefined, + bridgeSessionId: data.bridgeSessionId as string | null | undefined, + alive: true, + }) + } catch { + // Corrupted file — skip + } + } + + return results +} + +/** + * List peer sessions that have a UDS messaging socket (i.e. can receive + * messages). Excludes the current process. + */ +export async function listPeers(): Promise { + const all = await listAllLiveSessions() + return all.filter( + s => s.pid !== process.pid && s.messagingSocketPath != null, + ) +} + +// --------------------------------------------------------------------------- +// Connection helpers +// --------------------------------------------------------------------------- + +/** + * Probe a UDS socket to check if a server is listening (ping/pong). + * Returns true if the peer responds within the timeout. + */ +export async function isPeerAlive(socketPath: string, timeoutMs = 3000): Promise { + return new Promise((resolve) => { + const conn = createConnection(socketPath, () => { + const ping: UdsMessage = { type: 'ping', ts: new Date().toISOString() } + conn.write(jsonStringify(ping) + '\n') + }) + + let resolved = false + + const timer = setTimeout(() => { + if (!resolved) { + resolved = true + conn.destroy() + resolve(false) + } + }, timeoutMs) + + let buffer = '' + conn.on('data', (chunk) => { + buffer += chunk.toString() + if (buffer.includes('"pong"')) { + if (!resolved) { + resolved = true + clearTimeout(timer) + conn.end() + resolve(true) + } + } + }) + + conn.on('error', () => { + if (!resolved) { + resolved = true + clearTimeout(timer) + resolve(false) + } + }) + }) +} + +/** + * Send a text message to a peer's UDS socket. This is the high-level helper + * used by SendMessageTool for `uds:` addresses. + */ +export async function sendToUdsSocket( + targetSocketPath: string, + message: string | Record, +): Promise { + const data = typeof message === 'string' ? message : jsonStringify(message) + const udsMsg: UdsMessage = { + type: 'text', + data, + ts: new Date().toISOString(), + } + + // Lazily import to avoid circular dep at module-load time + const { getUdsMessagingSocketPath } = await import('./udsMessaging.js') + udsMsg.from = getUdsMessagingSocketPath() + + return new Promise((resolve, reject) => { + const conn = createConnection(targetSocketPath, () => { + conn.write(jsonStringify(udsMsg) + '\n', (err) => { + conn.end() + if (err) reject(err) + else resolve() + }) + }) + conn.on('error', (err) => { + reject(new Error(`Failed to connect to peer at ${targetSocketPath}: ${errorMessage(err)}`)) + }) + conn.setTimeout(5000, () => { + conn.destroy(new Error('Connection timed out')) + }) + }) +} + +/** + * Connect to a peer and return the raw socket for bidirectional communication. + * The caller is responsible for managing the connection lifecycle. + */ +export function connectToPeer(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const conn = createConnection(socketPath, () => { + resolve(conn) + }) + conn.on('error', reject) + conn.setTimeout(5000, () => { + conn.destroy(new Error('Connection timed out')) + }) + }) +} + +/** + * Disconnect a previously connected peer socket. + */ +export function disconnectPeer(socket: Socket): void { + if (!socket.destroyed) { + socket.end() + } } diff --git a/src/utils/udsMessaging.ts b/src/utils/udsMessaging.ts index 22e2a81..8bcd507 100644 --- a/src/utils/udsMessaging.ts +++ b/src/utils/udsMessaging.ts @@ -1 +1,270 @@ -export async function sendUdsMessage(): Promise {} +/** + * UDS Messaging Layer — Unix Domain Socket IPC for Claude Code instances. + * + * Each session auto-creates a UDS server so peer sessions can send messages. + * Protocol: newline-delimited JSON (NDJSON), one message per line. + * + * Socket path defaults to a tmpdir-based path derived from the session PID, + * but can be overridden via --messaging-socket-path. + */ + +import { createServer, type Server, type Socket } from 'net' +import { mkdir, unlink } from 'fs/promises' +import { dirname, join } from 'path' +import { tmpdir } from 'os' +import { registerCleanup } from './cleanupRegistry.js' +import { logForDebugging } from './debug.js' +import { errorMessage } from './errors.js' +import { jsonParse, jsonStringify } from './slowOperations.js' + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type UdsMessageType = + | 'text' + | 'notification' + | 'query' + | 'response' + | 'ping' + | 'pong' + +export type UdsMessage = { + /** Discriminator */ + type: UdsMessageType + /** Payload text / JSON content */ + data?: string + /** Sender socket path (so the receiver can reply) */ + from?: string + /** ISO timestamp */ + ts?: string + /** Optional metadata */ + meta?: Record +} + +export type UdsInboxEntry = { + id: string + message: UdsMessage + receivedAt: number + status: 'pending' | 'processed' +} + +// --------------------------------------------------------------------------- +// Module state +// --------------------------------------------------------------------------- + +let server: Server | null = null +let socketPath: string | null = null +let onEnqueueCb: (() => void) | null = null +const clients = new Set() +const inbox: UdsInboxEntry[] = [] +let nextId = 1 + +// --------------------------------------------------------------------------- +// Public API — socket path helpers +// --------------------------------------------------------------------------- + +/** + * Default socket path based on PID, placed in a tmpdir subdirectory so it + * survives across config-home changes and avoids polluting ~/.claude. + */ +export function getDefaultUdsSocketPath(): string { + return join(tmpdir(), 'claude-code-socks', `${process.pid}.sock`) +} + +/** + * Returns the socket path of the currently running server, or undefined + * if the server has not been started. + */ +export function getUdsMessagingSocketPath(): string | undefined { + return socketPath ?? undefined +} + +// --------------------------------------------------------------------------- +// Inbox +// --------------------------------------------------------------------------- + +/** + * Register a callback invoked whenever a message is enqueued into the inbox. + * Used by the print/SDK query loop to kick off processing. + */ +export function setOnEnqueue(cb: (() => void) | null): void { + onEnqueueCb = cb +} + +/** + * Drain all pending inbox messages, marking them processed. + */ +export function drainInbox(): UdsInboxEntry[] { + const pending = inbox.filter(e => e.status === 'pending') + for (const entry of pending) { + entry.status = 'processed' + } + return pending +} + +// --------------------------------------------------------------------------- +// Server +// --------------------------------------------------------------------------- + +/** + * Start the UDS messaging server on the given socket path. + * + * Exports `CLAUDE_CODE_MESSAGING_SOCKET` into `process.env` so child + * processes (hooks, spawned agents) can discover and connect back. + */ +export async function startUdsMessaging( + path: string, + opts?: { isExplicit?: boolean }, +): Promise { + if (server) { + logForDebugging('[udsMessaging] server already running, skipping start') + return + } + + // Ensure parent directory exists + await mkdir(dirname(path), { recursive: true }) + + // Clean up stale socket file + try { + await unlink(path) + } catch { + // ENOENT is fine + } + + socketPath = path + + await new Promise((resolve, reject) => { + const srv = createServer((socket) => { + clients.add(socket) + logForDebugging(`[udsMessaging] client connected (total: ${clients.size})`) + + let buffer = '' + + socket.on('data', (chunk) => { + buffer += chunk.toString() + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + + for (const line of lines) { + if (!line.trim()) continue + try { + const msg = jsonParse(line) as UdsMessage + + // Handle ping with automatic pong + if (msg.type === 'ping') { + const pong: UdsMessage = { + type: 'pong', + from: socketPath ?? undefined, + ts: new Date().toISOString(), + } + if (!socket.destroyed) { + socket.write(jsonStringify(pong) + '\n') + } + continue + } + + // Enqueue into inbox + const entry: UdsInboxEntry = { + id: `uds-${nextId++}`, + message: msg, + receivedAt: Date.now(), + status: 'pending', + } + inbox.push(entry) + logForDebugging( + `[udsMessaging] enqueued message type=${msg.type} from=${msg.from ?? 'unknown'}`, + ) + onEnqueueCb?.() + } catch { + // Malformed JSON — skip + } + } + }) + + socket.on('close', () => { + clients.delete(socket) + }) + + socket.on('error', (err) => { + clients.delete(socket) + logForDebugging(`[udsMessaging] client error: ${errorMessage(err)}`) + }) + }) + + srv.on('error', reject) + + srv.listen(path, () => { + server = srv + // Export so child processes can discover the socket + process.env.CLAUDE_CODE_MESSAGING_SOCKET = path + logForDebugging( + `[udsMessaging] server listening on ${path}${opts?.isExplicit ? ' (explicit)' : ''}`, + ) + resolve() + }) + }) + + // Register cleanup so the socket file is removed on exit + registerCleanup(async () => { + await stopUdsMessaging() + }) +} + +/** + * Stop the UDS messaging server and clean up the socket file. + */ +export async function stopUdsMessaging(): Promise { + if (!server) return + + // Close all connected clients + for (const socket of clients) { + socket.destroy() + } + clients.clear() + + await new Promise((resolve) => { + server!.close(() => resolve()) + }) + server = null + + // Remove socket file + if (socketPath) { + try { + await unlink(socketPath) + } catch { + // Already gone + } + delete process.env.CLAUDE_CODE_MESSAGING_SOCKET + logForDebugging(`[udsMessaging] server stopped, socket removed: ${socketPath}`) + socketPath = null + } +} + +/** + * Send a UDS message to a specific socket path (outbound — used when this + * session wants to push a message to a peer's server). + */ +export async function sendUdsMessage( + targetSocketPath: string, + message: UdsMessage, +): Promise { + const { createConnection } = await import('net') + message.from = message.from ?? socketPath ?? undefined + message.ts = message.ts ?? new Date().toISOString() + + return new Promise((resolve, reject) => { + const conn = createConnection(targetSocketPath, () => { + conn.write(jsonStringify(message) + '\n', (err) => { + conn.end() + if (err) reject(err) + else resolve() + }) + }) + conn.on('error', reject) + // Timeout so we don't hang on unreachable sockets + conn.setTimeout(5000, () => { + conn.destroy(new Error('Connection timed out')) + }) + }) +} diff --git a/test-pipe-ipc.ts b/test-pipe-ipc.ts new file mode 100644 index 0000000..182d1ea --- /dev/null +++ b/test-pipe-ipc.ts @@ -0,0 +1,450 @@ +#!/usr/bin/env bun +/** + * 双进程 Pipe IPC 端到端测试 + * + * 模拟两个独立 CLI 的完整通信流程: + * 1. 进程 A(slave):启动 PipeServer,等待 attach + * 2. 进程 B(master):连接到 A,发送 attach_request,发送 prompt,接收 stream/done + * + * 用法: + * bun run test-pipe-ipc.ts + * + * 或分别测试两个独立进程: + * bun run test-pipe-ipc.ts slave + * bun run test-pipe-ipc.ts master + */ + +import { + createPipeServer, + connectToPipe, + listPipes, + isPipeAlive, + type PipeMessage, + type PipeServer, + type PipeClient, +} from './src/utils/pipeTransport.js' + +const SLAVE_PIPE = 'test-slave-001' +const MASTER_NAME = 'test-master-001' + +// ─── Colors for output ─── +const RED = '\x1b[31m' +const GREEN = '\x1b[32m' +const YELLOW = '\x1b[33m' +const CYAN = '\x1b[36m' +const RESET = '\x1b[0m' + +function log(role: string, msg: string) { + const color = role === 'SLAVE' ? CYAN : YELLOW + const ts = new Date().toISOString().slice(11, 23) + console.log(`${color}[${ts}] [${role}]${RESET} ${msg}`) +} + +function pass(test: string) { + console.log(` ${GREEN}✓ PASS${RESET}: ${test}`) +} + +function fail(test: string, reason: string) { + console.log(` ${RED}✗ FAIL${RESET}: ${test} — ${reason}`) + process.exitCode = 1 +} + +// ═══════════════════════════════════════════════════════════════ +// Slave Process: starts PipeServer, handles attach/prompt/detach +// ═══════════════════════════════════════════════════════════════ +async function runSlave(): Promise { + log('SLAVE', `Starting PipeServer "${SLAVE_PIPE}"...`) + const server = await createPipeServer(SLAVE_PIPE) + log('SLAVE', `Server started at ${server.socketPath}`) + + let role: 'standalone' | 'slave' = 'standalone' + let masterSocket: import('net').Socket | null = null + + // Ping handler + server.onMessage((msg, reply) => { + if (msg.type === 'ping') { + reply({ type: 'pong' }) + } + }) + + // Attach handler + server.onMessage((msg, reply) => { + if (msg.type === 'attach_request') { + if (role === 'slave') { + reply({ type: 'attach_reject', data: 'Already attached' }) + log('SLAVE', `Rejected attach from ${msg.from}`) + return + } + + role = 'slave' + // Get latest client socket + const clients = Array.from((server as any).clients as Set) + masterSocket = clients[clients.length - 1] ?? null + + reply({ type: 'attach_accept', data: SLAVE_PIPE }) + log('SLAVE', `Accepted attach from ${msg.from}`) + } + }) + + // Detach handler + server.onMessage((msg, _reply) => { + if (msg.type === 'detach') { + role = 'standalone' + masterSocket = null + log('SLAVE', `Detached by ${msg.from}`) + } + }) + + // Prompt handler — simulate AI processing + server.onMessage((msg, _reply) => { + if (msg.type === 'prompt') { + if (role !== 'slave') return + log('SLAVE', `Received prompt: "${msg.data}"`) + + // Simulate AI response: stream fragments → tool_start → tool_result → done + if (masterSocket && !masterSocket.destroyed) { + const send = (m: PipeMessage) => { + m.from = m.from ?? SLAVE_PIPE + m.ts = m.ts ?? new Date().toISOString() + masterSocket!.write(JSON.stringify(m) + '\n') + } + + setTimeout(() => { + send({ type: 'stream', data: 'Processing your request' }) + log('SLAVE', 'Sent stream fragment 1') + }, 100) + + setTimeout(() => { + send({ type: 'stream', data: '... analyzing code...' }) + log('SLAVE', 'Sent stream fragment 2') + }, 200) + + setTimeout(() => { + send({ type: 'tool_start', data: 'ReadFile', meta: { toolUseId: 'tool-123' } }) + log('SLAVE', 'Sent tool_start') + }, 300) + + setTimeout(() => { + send({ type: 'tool_result', data: 'file contents here...', meta: { toolUseId: 'tool-123' } }) + log('SLAVE', 'Sent tool_result') + }, 400) + + setTimeout(() => { + send({ type: 'done' }) + log('SLAVE', 'Sent done') + }, 500) + } + } + }) + + return server +} + +// ═══════════════════════════════════════════════════════════════ +// Master Process: connects to slave, attaches, sends prompt +// ═══════════════════════════════════════════════════════════════ +async function runMaster(): Promise { + log('MASTER', `Connecting to slave "${SLAVE_PIPE}"...`) + const client = await connectToPipe(SLAVE_PIPE, MASTER_NAME) + log('MASTER', 'Connected!') + return client +} + +// ═══════════════════════════════════════════════════════════════ +// Full integration test (both in same process for simplicity, +// but they communicate via real Unix domain sockets) +// ═══════════════════════════════════════════════════════════════ +async function runFullTest() { + console.log('\n═══════════════════════════════════════════════') + console.log(' Pipe IPC 双进程通信端到端测试') + console.log('═══════════════════════════════════════════════\n') + + let testsPassed = 0 + let testsFailed = 0 + + // ── Test 1: Server startup ── + let server: PipeServer + try { + server = await runSlave() + pass('Slave PipeServer 启动成功') + testsPassed++ + } catch (err) { + fail('Slave PipeServer 启动', String(err)) + process.exit(1) + return // unreachable + } + + // ── Test 2: List pipes ── + try { + const pipes = await listPipes() + if (pipes.includes(SLAVE_PIPE)) { + pass(`listPipes() 发现了 "${SLAVE_PIPE}"`) + testsPassed++ + } else { + fail('listPipes()', `未找到 "${SLAVE_PIPE}",只有: ${pipes.join(', ')}`) + testsFailed++ + } + } catch (err) { + fail('listPipes()', String(err)) + testsFailed++ + } + + // ── Test 3: Ping/Pong (liveness check) ── + try { + const alive = await isPipeAlive(SLAVE_PIPE) + if (alive) { + pass('isPipeAlive() ping/pong 健康检查通过') + testsPassed++ + } else { + fail('isPipeAlive()', '返回 false') + testsFailed++ + } + } catch (err) { + fail('isPipeAlive()', String(err)) + testsFailed++ + } + + // ── Test 4: Master connects ── + let client: PipeClient + try { + client = await runMaster() + pass('Master 连接到 Slave 成功') + testsPassed++ + } catch (err) { + fail('Master 连接', String(err)) + await server.close() + process.exit(1) + return + } + + // ── Test 5: Attach request/accept ── + try { + const attachResult = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('attach 超时')), 3000) + + client.onMessage((msg) => { + if (msg.type === 'attach_accept') { + clearTimeout(timeout) + resolve('accepted') + } else if (msg.type === 'attach_reject') { + clearTimeout(timeout) + resolve(`rejected: ${msg.data}`) + } + }) + + client.send({ type: 'attach_request' }) + log('MASTER', 'Sent attach_request') + }) + + if (attachResult === 'accepted') { + pass('Attach 请求被接受') + testsPassed++ + } else { + fail('Attach 请求', attachResult) + testsFailed++ + } + } catch (err) { + fail('Attach 请求', String(err)) + testsFailed++ + } + + // ── Test 6: Duplicate attach rejection ── + try { + const client2 = await connectToPipe(SLAVE_PIPE, 'intruder') + const rejectResult = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('超时')), 3000) + client2.onMessage((msg) => { + if (msg.type === 'attach_reject') { + clearTimeout(timeout) + resolve('rejected') + } else if (msg.type === 'attach_accept') { + clearTimeout(timeout) + resolve('accepted (wrong!)') + } + }) + client2.send({ type: 'attach_request' }) + }) + + client2.disconnect() + + if (rejectResult === 'rejected') { + pass('重复 attach 被正确拒绝') + testsPassed++ + } else { + fail('重复 attach', `期望被拒绝,但结果是: ${rejectResult}`) + testsFailed++ + } + } catch (err) { + fail('重复 attach', String(err)) + testsFailed++ + } + + // ── Test 7: Send prompt and receive full session data ── + try { + const receivedMessages: PipeMessage[] = [] + + const sessionResult = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('session 数据超时')), 5000) + + client.onMessage((msg) => { + if (['stream', 'tool_start', 'tool_result', 'done'].includes(msg.type)) { + receivedMessages.push(msg) + log('MASTER', `收到 ${msg.type}: ${(msg.data ?? '').slice(0, 50)}`) + + if (msg.type === 'done') { + clearTimeout(timeout) + resolve(receivedMessages) + } + } + }) + + client.send({ type: 'prompt', data: '请帮我分析代码' }) + log('MASTER', '发送 prompt: "请帮我分析代码"') + }) + + // Verify received message types + const types = sessionResult.map((m) => m.type) + const expectedTypes = ['stream', 'stream', 'tool_start', 'tool_result', 'done'] + + if (JSON.stringify(types) === JSON.stringify(expectedTypes)) { + pass(`收到完整 session 数据: ${types.join(' → ')}`) + testsPassed++ + } else { + fail('Session 数据', `期望 [${expectedTypes.join(', ')}],实际 [${types.join(', ')}]`) + testsFailed++ + } + + // Verify stream content + const streamContent = sessionResult + .filter((m) => m.type === 'stream') + .map((m) => m.data) + .join('') + if (streamContent.includes('Processing') && streamContent.includes('analyzing')) { + pass('Stream 内容完整') + testsPassed++ + } else { + fail('Stream 内容', `内容不完整: "${streamContent}"`) + testsFailed++ + } + + // Verify tool events + const toolStart = sessionResult.find((m) => m.type === 'tool_start') + if (toolStart?.data === 'ReadFile' && toolStart?.meta?.toolUseId === 'tool-123') { + pass('Tool 事件携带正确 metadata') + testsPassed++ + } else { + fail('Tool 事件', `数据不正确: ${JSON.stringify(toolStart)}`) + testsFailed++ + } + } catch (err) { + fail('Session 数据传输', String(err)) + testsFailed++ + } + + // ── Test 8: Detach ── + try { + client.send({ type: 'detach' }) + log('MASTER', '发送 detach') + // Give slave time to process + await new Promise((r) => setTimeout(r, 200)) + pass('Detach 命令发送成功') + testsPassed++ + } catch (err) { + fail('Detach', String(err)) + testsFailed++ + } + + // ── Test 9: Re-attach after detach ── + try { + const reattachResult = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('re-attach 超时')), 3000) + client.onMessage((msg) => { + if (msg.type === 'attach_accept') { + clearTimeout(timeout) + resolve('accepted') + } else if (msg.type === 'attach_reject') { + clearTimeout(timeout) + resolve(`rejected: ${msg.data}`) + } + }) + client.send({ type: 'attach_request' }) + }) + + if (reattachResult === 'accepted') { + pass('Detach 后重新 attach 成功') + testsPassed++ + } else { + fail('重新 attach', reattachResult) + testsFailed++ + } + } catch (err) { + fail('重新 attach', String(err)) + testsFailed++ + } + + // ── Cleanup ── + client.disconnect() + await server.close() + log('SLAVE', 'Server 已关闭') + + // ── Summary ── + console.log('\n═══════════════════════════════════════════════') + console.log(` 测试结果: ${GREEN}${testsPassed} 通过${RESET}, ${testsFailed > 0 ? RED : GREEN}${testsFailed} 失败${RESET}`) + console.log('═══════════════════════════════════════════════\n') + + if (testsFailed > 0) { + process.exit(1) + } +} + +// ═══════════════════════════════════════════════════════════════ +// Standalone modes (for testing with truly separate processes) +// ═══════════════════════════════════════════════════════════════ +const mode = process.argv[2] + +if (mode === 'slave') { + // Run as standalone slave — waits for connections + console.log('启动 Slave 模式(按 Ctrl+C 退出)...\n') + const server = await runSlave() + log('SLAVE', `等待 master 连接... pipe: ${SLAVE_PIPE}`) + + process.on('SIGINT', async () => { + log('SLAVE', '关闭中...') + await server.close() + process.exit(0) + }) +} else if (mode === 'master') { + // Run as standalone master — connects to existing slave + console.log('启动 Master 模式...\n') + try { + const client = await runMaster() + + // Attach + client.onMessage((msg) => { + log('MASTER', `收到: ${msg.type} ${msg.data ?? ''}`) + }) + + client.send({ type: 'attach_request' }) + log('MASTER', '已发送 attach_request,等待响应...') + + // Wait a bit then send prompt + setTimeout(() => { + client.send({ type: 'prompt', data: '请帮我分析这段代码的问题' }) + log('MASTER', '已发送 prompt') + }, 1000) + + // Disconnect after 5s + setTimeout(() => { + client.send({ type: 'detach' }) + client.disconnect() + log('MASTER', '已断开连接') + process.exit(0) + }, 5000) + } catch (err) { + console.error('Master 连接失败:', err) + process.exit(1) + } +} else { + // Default: run full integrated test + await runFullTest() +}