Skip to content
575 changes: 575 additions & 0 deletions docs/pipe-master-slave-complete.md

Large diffs are not rendered by default.

149 changes: 149 additions & 0 deletions docs/pipe-master-slave-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Master-Slave CLI Communication Architecture Design Document

## 1. Overview

This document describes a **master-slave architecture** for inter-CLI communication using Unix domain sockets (named pipes). The system allows independent CLI instances to form a coordination network where:

- **Master CLI**: A control center that connects to multiple slave CLIs, sends them tasks, and receives their full session data (user input + AI output + tool results) for review.
- **Slave CLI**: An autonomous worker that processes tasks independently. When attached by a master, it automatically reports all session activity back to the master.
- **Standalone CLI**: The default mode — a completely normal, independent CLI with no special behavior.

## 2. Core Principles

1. **Independence by default**: Every CLI starts as standalone. No master/slave behavior until explicitly activated via `/attach`.
2. **Master is a monitor, not a terminal proxy**: The master CLI's own conversation/commands remain fully functional. Master monitors slaves, it doesn't become them.
3. **Slave is autonomous**: Slave executes its own AI queries and tool calls. It just reports what happens to the master.
4. **Multiple slaves**: A master can attach to multiple slaves simultaneously.
5. **Bidirectional control**: Master can send prompts to slaves via `/send`, and receives session reports automatically.
6. **Clean detach**: Either side can disconnect, returning to standalone mode.

## 3. Architecture

### 3.1 Transport Layer (existing — `src/utils/pipeTransport.ts`)

- **PipeServer**: Each CLI creates a Unix domain socket server at `~/.claude/pipes/{session-id}.sock`
- **PipeClient**: Connects to a remote PipeServer for communication
- **Protocol**: NDJSON (newline-delimited JSON) over Unix domain sockets
- **Message types**: `ping/pong`, `attach_request/accept/reject`, `detach`, `prompt`, `stream`, `tool_start`, `tool_result`, `done`, `error`

### 3.2 State Model (`AppState.pipeIpc`)

```typescript
pipeIpc: {
role: 'standalone' | 'master' | 'slave'
serverName: string | null // This CLI's pipe server name
// Master-specific
slaves: Map<string, SlaveInfo> // Connected slaves (name → info)
// Slave-specific
attachedBy: string | null // Master pipe name (when slave)
}

type SlaveInfo = {
name: string
connectedAt: string // ISO timestamp
status: 'connected' | 'busy' | 'idle'
history: SessionEntry[] // Full session transcript
}

type SessionEntry = {
type: 'prompt' | 'stream' | 'tool_start' | 'tool_result' | 'done' | 'error'
content: string
from: string
timestamp: string
meta?: Record<string, unknown>
}
```

### 3.3 Hooks

#### `usePipeIpc` (every CLI)
- On mount: create PipeServer for this session
- Handle `attach_request` → accept, switch to slave role, begin auto-reporting
- Handle `prompt` → inject via `handleIncomingPrompt`
- Handle `detach` → revert to standalone
- **Auto-report**: When in slave role, relay all session events (user input, AI output, tool calls) to master via `globalThis.__pipeSendToMaster`

#### `useMasterMonitor` (master only)
- Active when `role === 'master'`
- For each connected slave PipeClient: listen for `stream`, `tool_start`, `tool_result`, `done`, `error`
- Store received messages into `slaves[name].history`
- Update slave status (`busy`/`idle`) based on `prompt`/`done` events

### 3.4 Session Relay (Slave → Master)

When a CLI is in slave role, the REPL's `onQueryEvent` handler additionally calls `globalThis.__pipeSendToMaster()` to forward:
- **AI stream fragments** → `{ type: 'stream', data: text }`
- **Tool start** → `{ type: 'tool_start', data: toolName, meta: { toolUseId } }`
- **Tool results** → `{ type: 'tool_result', data: resultText, meta: { toolUseId } }`
- **Turn complete** → `{ type: 'done' }`
- **Errors** → `{ type: 'error', data: errorMessage }`

### 3.5 Commands

| Command | Description |
|---------|-------------|
| `/pipes` | List all discoverable pipe servers with liveness status |
| `/attach <name>` | Connect to a slave CLI, begin receiving session reports |
| `/detach [name]` | Disconnect from one slave (or all if no arg) |
| `/send <name> <msg>` | Inject a prompt into a slave CLI |
| `/history <name>` | View a slave's full session transcript |
| `/status` | Overview of all connected slaves and their status |

## 4. Flow Diagrams

### 4.1 Attach Flow
```
Master CLI Slave CLI
| |
|-- /attach cli-abc12345 --------> |
| (PipeClient connects) |
| |
|-- attach_request ---------------> |
| |-- (checks role == standalone)
|<------------- attach_accept ------|-- (sets role = slave)
|-- (sets role = master) |
|-- (adds to slaves map) |
| |
|<============ auto-report =========| (slave sends all session data)
```

### 4.2 Send Flow
```
Master CLI Slave CLI
| |
|-- /send cli-abc12345 "task" --> |
|-- prompt {data: "task"} --------> |
| |-- (handleIncomingPrompt)
| |-- (AI processes task)
|<-------- stream {data: "..."} ----|
|<-------- tool_start -------------|
|<-------- tool_result ------------|
|<-------- done -------------------|
|-- (stores in history) |
```

### 4.3 Detach Flow
```
Master CLI Slave CLI
| |
|-- /detach cli-abc12345 --------> |
|-- detach ----------------------> |
| |-- (sets role = standalone)
|-- (removes from slaves map) |-- (stops auto-report)
|-- (role stays master or |
| becomes standalone if |
| no more slaves) |
```

## 5. Implementation Plan

1. Update `AppState.pipeIpc` to support multi-slave master model
2. Rewrite `usePipeIpc` hook for correct slave behavior
3. Create `useMasterMonitor` hook for master-side monitoring
4. Rewrite `/attach` command for multi-slave support
5. Rewrite `/detach` command with optional target
6. Update `/pipes` command
7. Create `/send`, `/history`, `/status` commands
8. Mount hooks in REPL.tsx
9. Register all commands in commands.ts
10. Integrate session relay into REPL's `onQueryEvent`
25 changes: 25 additions & 0 deletions src/assistant/gate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { feature } from 'bun:bundle'
import { getKairosActive } from '../bootstrap/state.js'
import { getFeatureValue_CACHED_MAY_BE_STALE } from '../services/analytics/growthbook.js'

/**
* Runtime gate for KAIROS features.
*
* Build-time: feature('KAIROS') must be on (checked by caller before
* this module is required).
*
* Runtime: tengu_kairos_assistant GrowthBook flag acts as a remote kill
* switch, and kairosActive state must be true (set during bootstrap when
* the session qualifies for KAIROS features).
*/
export async function isKairosEnabled(): Promise<boolean> {
if (!feature('KAIROS')) {
return false
}
if (
!getFeatureValue_CACHED_MAY_BE_STALE('tengu_kairos_assistant', false)
) {
return false
}
return getKairosActive()
}
12 changes: 12 additions & 0 deletions src/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ import terminalSetup from './commands/terminalSetup/index.js'
import usage from './commands/usage/index.js'
import theme from './commands/theme/index.js'
import vim from './commands/vim/index.js'
import attach from './commands/attach/index.js'
import detach from './commands/detach/index.js'
import pipes from './commands/pipes/index.js'
import send from './commands/send/index.js'
import pipeHistory from './commands/history/index.js'
import pipeStatus from './commands/pipe-status/index.js'
import { feature } from 'bun:bundle'
// Dead code elimination: conditional imports
/* eslint-disable @typescript-eslint/no-require-imports */
Expand Down Expand Up @@ -326,6 +332,12 @@ const COMMANDS = memoize((): Command[] => [
...(bridge ? [bridge] : []),
...(remoteControlServerCommand ? [remoteControlServerCommand] : []),
...(voiceCommand ? [voiceCommand] : []),
attach,
detach,
pipes,
send,
pipeHistory,
pipeStatus,
thinkback,
thinkbackPlay,
permissions,
Expand Down
36 changes: 36 additions & 0 deletions src/commands/assistant/assistant.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import type { LocalJSXCommandContext } from '../../commands.js'
import type { LocalJSXCommandOnDone } from '../../types/command.js'

/**
* /assistant command implementation.
*
* Opens the Kairos assistant panel. In the current build the panel is
* rendered by the REPL layer when kairosActive is true; the slash command
* simply toggles visibility and prints a confirmation line.
*/
export async function call(
onDone: LocalJSXCommandOnDone,
context: LocalJSXCommandContext,
_args: string,
): Promise<React.ReactNode> {
const { setAppState, getAppState } = context

const current = getAppState()
const isVisible = (current as Record<string, unknown>).assistantPanelVisible

if (isVisible) {
setAppState((prev: Record<string, unknown>) => ({
...prev,
assistantPanelVisible: false,
}))
onDone('Assistant panel hidden.', { display: 'system' })
} else {
setAppState((prev: Record<string, unknown>) => ({
...prev,
assistantPanelVisible: true,
}))
onDone('Assistant panel opened.', { display: 'system' })
}

return null
}
25 changes: 25 additions & 0 deletions src/commands/assistant/gate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { feature } from 'bun:bundle'
import { getKairosActive } from '../../bootstrap/state.js'
import { getFeatureValue_CACHED_MAY_BE_STALE } from '../../services/analytics/growthbook.js'

/**
* Runtime gate for the /assistant command.
*
* Build-time: feature('KAIROS') must be on (checked in commands.ts before
* the module is even required).
*
* Runtime: tengu_kairos_assistant GrowthBook flag acts as a remote kill
* switch, and kairosActive state must be true (set during bootstrap when
* the session qualifies for KAIROS features).
*/
export function isAssistantEnabled(): boolean {
if (!feature('KAIROS')) {
return false
}
if (
!getFeatureValue_CACHED_MAY_BE_STALE('tengu_kairos_assistant', false)
) {
return false
}
return getKairosActive()
}
16 changes: 16 additions & 0 deletions src/commands/assistant/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import type { Command } from '../../commands.js'
import { isAssistantEnabled } from './gate.js'

const assistant = {
type: 'local-jsx',
name: 'assistant',
description: 'Open the Kairos assistant panel',
isEnabled: isAssistantEnabled,
get isHidden() {
return !isAssistantEnabled()
},
immediate: true,
load: () => import('./assistant.js'),
} satisfies Command

export default assistant
97 changes: 97 additions & 0 deletions src/commands/attach/attach.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import type { LocalCommandCall } from '../../types/command.js'
import { connectToPipe, getPipeIpc, type PipeClient, type PipeMessage } from '../../utils/pipeTransport.js'
import { addSlaveClient } from '../../hooks/useMasterMonitor.js'

export const call: LocalCommandCall = async (args, context) => {
const targetName = args.trim()
if (!targetName) {
return {
type: 'text',
value: 'Usage: /attach <pipe-name>\nUse /pipes to list available pipes.',
}
}

const currentState = context.getAppState()

// Check if already attached to this slave
if (getPipeIpc(currentState).slaves[targetName]) {
return {
type: 'text',
value: `Already attached to "${targetName}".`,
}
}

// Cannot attach when in slave mode
if (getPipeIpc(currentState).role === 'slave') {
return {
type: 'text',
value: 'Cannot attach: this CLI is in slave mode. Use /detach from the master first.',
}
}

// Connect to the target pipe server
let client: PipeClient
try {
const myName = getPipeIpc(currentState).serverName ?? `master-${process.pid}`
client = await connectToPipe(targetName, myName)
} catch (err) {
return {
type: 'text',
value: `Failed to connect to "${targetName}": ${err instanceof Error ? err.message : String(err)}`,
}
}

// Send attach request and wait for response
return new Promise((resolve) => {
const timeout = setTimeout(() => {
client.disconnect()
resolve({
type: 'text',
value: `Attach to "${targetName}" timed out (no response within 5s).`,
})
}, 5000)

client.onMessage((msg: PipeMessage) => {
if (msg.type === 'attach_accept') {
clearTimeout(timeout)

// Register the slave client in the module-level registry
addSlaveClient(targetName, client)

// Update AppState: add slave and switch to master role
context.setAppState((prev) => ({
...prev,
pipeIpc: {
...getPipeIpc(prev),
role: 'master',
slaves: {
...getPipeIpc(prev).slaves,
[targetName]: {
name: targetName,
connectedAt: new Date().toISOString(),
status: 'idle' as const,
history: [],
},
},
},
}))

const slaveCount = Object.keys(getPipeIpc(currentState).slaves).length + 1
resolve({
type: 'text',
value: `Attached to "${targetName}" as master. Now monitoring ${slaveCount} slave(s).\nUse /send ${targetName} <message> to send tasks.\nUse /status to see all slaves.\nUse /detach ${targetName} to disconnect.`,
})
} else if (msg.type === 'attach_reject') {
clearTimeout(timeout)
client.disconnect()

resolve({
type: 'text',
value: `Attach rejected by "${targetName}": ${msg.data ?? 'unknown reason'}`,
})
}
})

client.send({ type: 'attach_request' })
})
}
12 changes: 12 additions & 0 deletions src/commands/attach/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Command } from '../../commands.js'

const attach = {
type: 'local',
name: 'attach',
description: 'Attach to a slave CLI to monitor and control it',
argumentHint: '<pipe-name>',
supportsNonInteractive: false,
load: () => import('./attach.js'),
} satisfies Command

export default attach
Loading