Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions broker/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ interface SendOptions {
replyTo?: string;
expectsReply?: boolean;
messageId?: string;
contentType?: string;
payload?: Record<string, unknown>;
}

interface SendResult {
Expand Down Expand Up @@ -487,6 +489,8 @@ export class IntercomClient extends EventEmitter {
timestamp: Date.now(),
replyTo: options.replyTo,
expectsReply: options.expectsReply,
contentType: options.contentType,
payload: options.payload,
content: {
text: options.text,
attachments: options.attachments,
Expand Down
14 changes: 14 additions & 0 deletions config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export interface IntercomConfig {

/** Show reply hint in incoming messages (default: true) */
replyHint: boolean;

/** Timeout in ms for ask/contact_supervisor replies. Infinity = no timeout. Default: 600000 (10 minutes) */
replyTimeoutMs?: number;
}

const CONFIG_PATH = join(homedir(), ".pi/agent/intercom/config.json");
Expand All @@ -30,6 +33,7 @@ const defaults: IntercomConfig = {
confirmSend: false,
enabled: true,
replyHint: true,
replyTimeoutMs: 600_000,
};

export function loadConfig(): IntercomConfig {
Expand Down Expand Up @@ -100,6 +104,16 @@ export function loadConfig(): IntercomConfig {
config.status = parsedConfig.status;
}

if (Object.hasOwn(parsedConfig, "replyTimeoutMs")) {
if (typeof parsedConfig.replyTimeoutMs !== "number") {
throw new Error(`"replyTimeoutMs" must be a number`);
}
if (!Number.isFinite(parsedConfig.replyTimeoutMs) && parsedConfig.replyTimeoutMs !== Infinity) {
throw new Error(`"replyTimeoutMs" must be a finite number or Infinity`);
}
config.replyTimeoutMs = parsedConfig.replyTimeoutMs;
}

return config;
} catch (error) {
console.error(`Failed to load intercom config at ${CONFIG_PATH}:`, error);
Expand Down
164 changes: 154 additions & 10 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ function parseSubagentIntercomPayload(payload: unknown): { to: string; message:
return { to: record.to, message: record.message, ...(requestId ? { requestId } : {}) };
}
function resolveIntercomPresenceName(sessionName: string | undefined, sessionId: string): string {
// If PI_SUBAGENT_INTERCOM_SESSION_NAME is set, use it as the intercom presence name
// This allows fork parents to give children unique names to avoid broker session name collisions
const envOverride = process.env[SUBAGENT_INTERCOM_SESSION_NAME_ENV]?.trim();
if (envOverride) {
return envOverride;
}
const trimmedName = sessionName?.trim();
if (trimmedName) {
return trimmedName;
Expand Down Expand Up @@ -409,7 +415,16 @@ function previewText(value: unknown, maxLength = 72): string | undefined {
function firstTextContent(result: { content?: Array<{ type: string; text?: string }> }): string {
return result.content?.find((item) => item.type === "text" && typeof item.text === "string")?.text?.replace(/\*\*/g, "") ?? "";
}
// Track pi.events listeners for cleanup on reload.
// globalThis survives module re-import during /reload.
const _gt = globalThis as { __piIntercomHookUnsubs?: Array<() => void> };
const _hookUnsubs = _gt.__piIntercomHookUnsubs ??= [];

export default function piIntercomExtension(pi: ExtensionAPI) {
// Unsubscribe previous reload's listeners before registering new ones
for (const unsub of _hookUnsubs) unsub();
_hookUnsubs.length = 0;

let client: IntercomClient | null = null;
const config: IntercomConfig = loadConfig();
let runtimeContext: ExtensionContext | null = null;
Expand All @@ -436,19 +451,49 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
resolve: (message: Message) => void;
reject: (error: Error) => void;
} | null = null;
function waitForReply(from: string, replyTo: string, signal?: AbortSignal): Promise<Message> {
// Queue for sendAndWait calls — ensures only one reply waiter is active at a time.
// Concurrent calls chain onto this promise so they execute sequentially.
let sendAndWaitQueue: Promise<void> = Promise.resolve();
let pendingTypedHandler: Promise<void> | null = null;

// --- Typed message handler registry (hook system) ---
// Extensions register handlers for specific contentType values.
// pi-intercom routes incoming messages to the registered handler.
type TypedMessageHandler = (
ctx: ExtensionContext,
from: SessionInfo,
message: Message,
) => Promise<{ text: string } | undefined>;

const typedHandlers = new Map<string, TypedMessageHandler>();

function registerTypedHandler(contentType: string, handler: TypedMessageHandler): void {
if (typedHandlers.has(contentType)) {
console.error(`[pi-intercom] Warning: overwriting existing handler for contentType="${contentType}"`);
}
typedHandlers.set(contentType, handler);
}
// --- End typed handler registry ---

function waitForReply(from: string, replyTo: string, signal?: AbortSignal, timeoutMs?: number): Promise<Message> {
if (replyWaiter) {
return Promise.reject(new Error("Already waiting for a reply"));
}
if (signal?.aborted) {
return Promise.reject(new Error("Cancelled"));
}
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
rejectReplyWaiter(new Error(`No reply from "${from}" within 10 minutes`));
}, 10 * 60 * 1000);
const effectiveTimeout = timeoutMs ?? config.replyTimeoutMs ?? 600_000;
const timeout = effectiveTimeout === Infinity
? undefined
: setTimeout(() => {
const duration = effectiveTimeout >= 60000
? `${Math.round(effectiveTimeout / 60000)} minutes`
: `${Math.round(effectiveTimeout / 1000)} seconds`;
rejectReplyWaiter(new Error(`No reply from "${from}" within ${duration}`));
}, effectiveTimeout);
const cleanup = () => {
clearTimeout(timeout);
if (timeout !== undefined) clearTimeout(timeout);
signal?.removeEventListener("abort", onAbort);
if (replyWaiter?.replyTo === replyTo) {
replyWaiter = null;
Expand Down Expand Up @@ -642,6 +687,29 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
if (!liveContext) {
return;
}
// Route typed messages to registered handlers (before replyWaiter check)
// Sequential: wait for any previous typed handler to finish before starting the next.

// Generic registry-based routing
if (message.contentType && typedHandlers.has(message.contentType)) {
const handler = typedHandlers.get(message.contentType)!;
pendingTypedHandler = (pendingTypedHandler ?? Promise.resolve())
.then(async () => {
const liveCheck = () => getLiveContext(runtimeContext, runtimeGeneration);
const activeCtx = liveCheck();
if (!activeCtx?.hasUI) return;
try {
const reply = await handler(activeCtx, from, message);
const activeClient = client;
if (reply && activeClient?.isConnected()) {
await activeClient.send(from.id, { text: reply.text, replyTo: message.id });
}
} catch (err) {
log.error(`typed handler error (${message.contentType}):`, err);
}
});
return;
}
if (replyWaiter) {
const senderTarget = from.name || from.id;
const fromMatches = senderTarget.toLowerCase() === replyWaiter.from.toLowerCase()
Expand Down Expand Up @@ -892,21 +960,21 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
}
})();
}
pi.events.on(SUBAGENT_CONTROL_INTERCOM_EVENT, (payload) => {
_hookUnsubs.push(pi.events.on(SUBAGENT_CONTROL_INTERCOM_EVENT, (payload) => {
relaySubagentIntercomPayload(payload, {
sender: "subagent-control",
status: "needs_attention",
errorEntryType: "intercom_control_error",
});
});
pi.events.on(SUBAGENT_RESULT_INTERCOM_EVENT, (payload) => {
}));
_hookUnsubs.push(pi.events.on(SUBAGENT_RESULT_INTERCOM_EVENT, (payload) => {
relaySubagentIntercomPayload(payload, {
sender: "subagent-result",
status: "result",
errorEntryType: "intercom_result_error",
acknowledge: true,
});
});
}));
pi.on("session_start", (_event, ctx) => {
if (!config.enabled) {
return;
Expand Down Expand Up @@ -938,6 +1006,10 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
scheduleReconnect();
});
}, 0);
// Emit intercom:ready after all extensions have loaded.
// session_start fires after discoverAndLoadExtensions completes,
// so all consumer extensions have their listeners registered.
pi.events.emit("intercom:ready", intercomApi);
});

pi.on("session_shutdown", async () => {
Expand Down Expand Up @@ -1498,7 +1570,7 @@ Usage:
};
}
const questionId = randomUUID();
replyPromise = waitForReply(sendTo, questionId, _signal);
replyPromise = waitForReply(sendTo, questionId, _signal, config.replyTimeoutMs);
const sendResult = await connectedClient.send(sendTo, {
messageId: questionId,
text: message,
Expand Down Expand Up @@ -1775,4 +1847,76 @@ Usage:
description: "Open session intercom",
handler: async (ctx) => openIntercomOverlay(ctx),
});

// --- Typed message handler registration + sendAndWait via pi.events ---
// intercomApi is emitted via pi.events("intercom:ready") in session_start,
// which fires after ALL extensions have loaded — no load-order issues.

type SendAndWaitOptions = {
contentType: string;
payload: Record<string, unknown>;
text: string;
signal?: AbortSignal;
};

type SendAndWaitResult = { text: string };

async function sendAndWait(options: SendAndWaitOptions): Promise<SendAndWaitResult> {
const { contentType, payload, text, signal } = options;
const metadata = childOrchestratorMetadata;
if (!metadata) {
throw new Error("Not in subagent context — PI_SUBAGENT_ORCHESTRATOR_TARGET not set");
}

const activeClient = client;
if (!activeClient?.isConnected()) {
throw new Error("Intercom client not connected");
}

const sendTo = await resolveSessionTarget(activeClient, metadata.orchestratorTarget) ?? metadata.orchestratorTarget;
const messageId = randomUUID();

if (signal?.aborted) {
throw new Error("Cancelled");
}

// Queue behind any in-progress sendAndWait call.
// This ensures only one replyWaiter is active at a time.
// Concurrent callers wait for their turn, then execute sequentially.
let queueResolve!: () => void;
const queuePromise = new Promise<void>(r => { queueResolve = r; });
const previousQueue = sendAndWaitQueue;
sendAndWaitQueue = queuePromise;
await previousQueue;

try {
const replyPromise = waitForReply(sendTo, messageId, signal);

// Send typed message
const sendResult = await activeClient.send(sendTo, {
text,
contentType,
payload,
expectsReply: true,
messageId,
});

if (!sendResult.delivered) {
// Clean up reply waiter
if (replyWaiter) {
try { replyWaiter.reject(new Error(`Message not delivered: ${sendResult.reason ?? "unknown"}`)); } catch { }
}
throw new Error(`Message not delivered: ${sendResult.reason ?? "unknown"}`);
}

// Wait for reply
const reply = await replyPromise;
return { text: reply.content.text };
} finally {
// Always release the queue so the next queued call can proceed
queueResolve();
}
}

const intercomApi = { registerHandler: registerTypedHandler, sendAndWait };
}
4 changes: 4 additions & 0 deletions types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ export interface Message {
timestamp: number;
replyTo?: string;
expectsReply?: boolean;
contentType?: MessageContentType;
content: {
text: string;
attachments?: Attachment[];
};
payload?: Record<string, unknown>;
}

export type MessageContentType = string;

export interface Attachment {
type: "file" | "snippet" | "context";
name: string;
Expand Down