From 4a7e53a41c855b9cee0889fd35944ab2d68d33c9 Mon Sep 17 00:00:00 2001 From: avtc Date: Tue, 12 May 2026 01:45:27 +0300 Subject: [PATCH] =?UTF-8?q?feat:=20add=20intercom=20extension=20API=20for?= =?UTF-8?q?=20typed=20subagent=E2=86=94root=20messaging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expose a public API (emitted via `intercom:ready` event) that allows other extensions to communicate with the root session (hasUI=true) through structured typed messages: - `registerHandler(contentType, handler)` — extensions register handlers for specific content types; pi-intercom auto-routes incoming messages and sends replies back to the caller - `sendAndWait({ contentType, payload, text, timeoutMs })` — queued, serialized RPC-style call from subagent to root session with configurable timeout - `contentType` + `payload` fields on Message for structured routing - `replyTimeoutMs` config option (default 10 min, supports Infinity) - Reload-safe listener cleanup via globalThis-backed unsubscribe array This decouples pi-intercom from specific extension integrations — any extension can now interact with the user through the root session without pi-intercom knowing about the use case. --- broker/client.ts | 4 ++ config.ts | 14 ++++ index.ts | 164 ++++++++++++++++++++++++++++++++++++++++++++--- types.ts | 4 ++ 4 files changed, 176 insertions(+), 10 deletions(-) diff --git a/broker/client.ts b/broker/client.ts index a6647a7..bd4ef88 100644 --- a/broker/client.ts +++ b/broker/client.ts @@ -13,6 +13,8 @@ interface SendOptions { replyTo?: string; expectsReply?: boolean; messageId?: string; + contentType?: string; + payload?: Record; } interface SendResult { @@ -487,6 +489,8 @@ export class IntercomClient extends EventEmitter { timestamp: Date.now(), replyTo: options.replyTo, expectsReply: options.expectsReply, + contentType: options.contentType, + payload: options.payload, content: { text: options.text, attachments: options.attachments, diff --git a/config.ts b/config.ts index 32e3127..b5c138c 100644 --- a/config.ts +++ b/config.ts @@ -20,6 +20,9 @@ export interface IntercomConfig { /** Show reply hint in incoming messages (default: true) */ replyHint: boolean; + + /** Timeout in ms for ask/contact_supervisor replies. Infinity = no timeout. Default: 600000 (10 minutes) */ + replyTimeoutMs?: number; } const CONFIG_PATH = join(homedir(), ".pi/agent/intercom/config.json"); @@ -30,6 +33,7 @@ const defaults: IntercomConfig = { confirmSend: false, enabled: true, replyHint: true, + replyTimeoutMs: 600_000, }; export function loadConfig(): IntercomConfig { @@ -100,6 +104,16 @@ export function loadConfig(): IntercomConfig { config.status = parsedConfig.status; } + if (Object.hasOwn(parsedConfig, "replyTimeoutMs")) { + if (typeof parsedConfig.replyTimeoutMs !== "number") { + throw new Error(`"replyTimeoutMs" must be a number`); + } + if (!Number.isFinite(parsedConfig.replyTimeoutMs) && parsedConfig.replyTimeoutMs !== Infinity) { + throw new Error(`"replyTimeoutMs" must be a finite number or Infinity`); + } + config.replyTimeoutMs = parsedConfig.replyTimeoutMs; + } + return config; } catch (error) { console.error(`Failed to load intercom config at ${CONFIG_PATH}:`, error); diff --git a/index.ts b/index.ts index 81340c0..6cf4d29 100644 --- a/index.ts +++ b/index.ts @@ -369,6 +369,12 @@ function parseSubagentIntercomPayload(payload: unknown): { to: string; message: return { to: record.to, message: record.message, ...(requestId ? { requestId } : {}) }; } function resolveIntercomPresenceName(sessionName: string | undefined, sessionId: string): string { + // If PI_SUBAGENT_INTERCOM_SESSION_NAME is set, use it as the intercom presence name + // This allows fork parents to give children unique names to avoid broker session name collisions + const envOverride = process.env[SUBAGENT_INTERCOM_SESSION_NAME_ENV]?.trim(); + if (envOverride) { + return envOverride; + } const trimmedName = sessionName?.trim(); if (trimmedName) { return trimmedName; @@ -409,7 +415,16 @@ function previewText(value: unknown, maxLength = 72): string | undefined { function firstTextContent(result: { content?: Array<{ type: string; text?: string }> }): string { return result.content?.find((item) => item.type === "text" && typeof item.text === "string")?.text?.replace(/\*\*/g, "") ?? ""; } +// Track pi.events listeners for cleanup on reload. +// globalThis survives module re-import during /reload. +const _gt = globalThis as { __piIntercomHookUnsubs?: Array<() => void> }; +const _hookUnsubs = _gt.__piIntercomHookUnsubs ??= []; + export default function piIntercomExtension(pi: ExtensionAPI) { + // Unsubscribe previous reload's listeners before registering new ones + for (const unsub of _hookUnsubs) unsub(); + _hookUnsubs.length = 0; + let client: IntercomClient | null = null; const config: IntercomConfig = loadConfig(); let runtimeContext: ExtensionContext | null = null; @@ -436,7 +451,31 @@ export default function piIntercomExtension(pi: ExtensionAPI) { resolve: (message: Message) => void; reject: (error: Error) => void; } | null = null; - function waitForReply(from: string, replyTo: string, signal?: AbortSignal): Promise { + // Queue for sendAndWait calls — ensures only one reply waiter is active at a time. + // Concurrent calls chain onto this promise so they execute sequentially. + let sendAndWaitQueue: Promise = Promise.resolve(); + let pendingTypedHandler: Promise | null = null; + + // --- Typed message handler registry (hook system) --- + // Extensions register handlers for specific contentType values. + // pi-intercom routes incoming messages to the registered handler. + type TypedMessageHandler = ( + ctx: ExtensionContext, + from: SessionInfo, + message: Message, + ) => Promise<{ text: string } | undefined>; + + const typedHandlers = new Map(); + + function registerTypedHandler(contentType: string, handler: TypedMessageHandler): void { + if (typedHandlers.has(contentType)) { + console.error(`[pi-intercom] Warning: overwriting existing handler for contentType="${contentType}"`); + } + typedHandlers.set(contentType, handler); + } + // --- End typed handler registry --- + + function waitForReply(from: string, replyTo: string, signal?: AbortSignal, timeoutMs?: number): Promise { if (replyWaiter) { return Promise.reject(new Error("Already waiting for a reply")); } @@ -444,11 +483,17 @@ export default function piIntercomExtension(pi: ExtensionAPI) { return Promise.reject(new Error("Cancelled")); } return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - rejectReplyWaiter(new Error(`No reply from "${from}" within 10 minutes`)); - }, 10 * 60 * 1000); + const effectiveTimeout = timeoutMs ?? config.replyTimeoutMs ?? 600_000; + const timeout = effectiveTimeout === Infinity + ? undefined + : setTimeout(() => { + const duration = effectiveTimeout >= 60000 + ? `${Math.round(effectiveTimeout / 60000)} minutes` + : `${Math.round(effectiveTimeout / 1000)} seconds`; + rejectReplyWaiter(new Error(`No reply from "${from}" within ${duration}`)); + }, effectiveTimeout); const cleanup = () => { - clearTimeout(timeout); + if (timeout !== undefined) clearTimeout(timeout); signal?.removeEventListener("abort", onAbort); if (replyWaiter?.replyTo === replyTo) { replyWaiter = null; @@ -642,6 +687,29 @@ export default function piIntercomExtension(pi: ExtensionAPI) { if (!liveContext) { return; } + // Route typed messages to registered handlers (before replyWaiter check) + // Sequential: wait for any previous typed handler to finish before starting the next. + + // Generic registry-based routing + if (message.contentType && typedHandlers.has(message.contentType)) { + const handler = typedHandlers.get(message.contentType)!; + pendingTypedHandler = (pendingTypedHandler ?? Promise.resolve()) + .then(async () => { + const liveCheck = () => getLiveContext(runtimeContext, runtimeGeneration); + const activeCtx = liveCheck(); + if (!activeCtx?.hasUI) return; + try { + const reply = await handler(activeCtx, from, message); + const activeClient = client; + if (reply && activeClient?.isConnected()) { + await activeClient.send(from.id, { text: reply.text, replyTo: message.id }); + } + } catch (err) { + log.error(`typed handler error (${message.contentType}):`, err); + } + }); + return; + } if (replyWaiter) { const senderTarget = from.name || from.id; const fromMatches = senderTarget.toLowerCase() === replyWaiter.from.toLowerCase() @@ -892,21 +960,21 @@ export default function piIntercomExtension(pi: ExtensionAPI) { } })(); } - pi.events.on(SUBAGENT_CONTROL_INTERCOM_EVENT, (payload) => { + _hookUnsubs.push(pi.events.on(SUBAGENT_CONTROL_INTERCOM_EVENT, (payload) => { relaySubagentIntercomPayload(payload, { sender: "subagent-control", status: "needs_attention", errorEntryType: "intercom_control_error", }); - }); - pi.events.on(SUBAGENT_RESULT_INTERCOM_EVENT, (payload) => { + })); + _hookUnsubs.push(pi.events.on(SUBAGENT_RESULT_INTERCOM_EVENT, (payload) => { relaySubagentIntercomPayload(payload, { sender: "subagent-result", status: "result", errorEntryType: "intercom_result_error", acknowledge: true, }); - }); + })); pi.on("session_start", (_event, ctx) => { if (!config.enabled) { return; @@ -938,6 +1006,10 @@ export default function piIntercomExtension(pi: ExtensionAPI) { scheduleReconnect(); }); }, 0); + // Emit intercom:ready after all extensions have loaded. + // session_start fires after discoverAndLoadExtensions completes, + // so all consumer extensions have their listeners registered. + pi.events.emit("intercom:ready", intercomApi); }); pi.on("session_shutdown", async () => { @@ -1498,7 +1570,7 @@ Usage: }; } const questionId = randomUUID(); - replyPromise = waitForReply(sendTo, questionId, _signal); + replyPromise = waitForReply(sendTo, questionId, _signal, config.replyTimeoutMs); const sendResult = await connectedClient.send(sendTo, { messageId: questionId, text: message, @@ -1775,4 +1847,76 @@ Usage: description: "Open session intercom", handler: async (ctx) => openIntercomOverlay(ctx), }); + + // --- Typed message handler registration + sendAndWait via pi.events --- + // intercomApi is emitted via pi.events("intercom:ready") in session_start, + // which fires after ALL extensions have loaded — no load-order issues. + + type SendAndWaitOptions = { + contentType: string; + payload: Record; + text: string; + signal?: AbortSignal; + }; + + type SendAndWaitResult = { text: string }; + + async function sendAndWait(options: SendAndWaitOptions): Promise { + const { contentType, payload, text, signal } = options; + const metadata = childOrchestratorMetadata; + if (!metadata) { + throw new Error("Not in subagent context — PI_SUBAGENT_ORCHESTRATOR_TARGET not set"); + } + + const activeClient = client; + if (!activeClient?.isConnected()) { + throw new Error("Intercom client not connected"); + } + + const sendTo = await resolveSessionTarget(activeClient, metadata.orchestratorTarget) ?? metadata.orchestratorTarget; + const messageId = randomUUID(); + + if (signal?.aborted) { + throw new Error("Cancelled"); + } + + // Queue behind any in-progress sendAndWait call. + // This ensures only one replyWaiter is active at a time. + // Concurrent callers wait for their turn, then execute sequentially. + let queueResolve!: () => void; + const queuePromise = new Promise(r => { queueResolve = r; }); + const previousQueue = sendAndWaitQueue; + sendAndWaitQueue = queuePromise; + await previousQueue; + + try { + const replyPromise = waitForReply(sendTo, messageId, signal); + + // Send typed message + const sendResult = await activeClient.send(sendTo, { + text, + contentType, + payload, + expectsReply: true, + messageId, + }); + + if (!sendResult.delivered) { + // Clean up reply waiter + if (replyWaiter) { + try { replyWaiter.reject(new Error(`Message not delivered: ${sendResult.reason ?? "unknown"}`)); } catch { } + } + throw new Error(`Message not delivered: ${sendResult.reason ?? "unknown"}`); + } + + // Wait for reply + const reply = await replyPromise; + return { text: reply.content.text }; + } finally { + // Always release the queue so the next queued call can proceed + queueResolve(); + } + } + + const intercomApi = { registerHandler: registerTypedHandler, sendAndWait }; } diff --git a/types.ts b/types.ts index f597c90..15be1c3 100644 --- a/types.ts +++ b/types.ts @@ -14,12 +14,16 @@ export interface Message { timestamp: number; replyTo?: string; expectsReply?: boolean; + contentType?: MessageContentType; content: { text: string; attachments?: Attachment[]; }; + payload?: Record; } +export type MessageContentType = string; + export interface Attachment { type: "file" | "snippet" | "context"; name: string;