Skip to content
6 changes: 3 additions & 3 deletions cli/src/agent/backends/acp/AcpSdkBackend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ describe('AcpSdkBackend', () => {
});
}, 0);

await sleep(5);
await sleep(1);

setTimeout(() => {
backendInternal.handleSessionUpdate({
Expand All @@ -111,7 +111,7 @@ describe('AcpSdkBackend', () => {
status: 'in_progress'
}
});
}, 3);
}, 1);

setTimeout(() => {
backendInternal.handleSessionUpdate({
Expand All @@ -123,7 +123,7 @@ describe('AcpSdkBackend', () => {
rawOutput: { ok: true }
}
});
}, 6);
}, 2);

return { stopReason: 'end_turn' };
},
Expand Down
10 changes: 10 additions & 0 deletions cli/src/codex/appServerTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,13 @@ export interface TurnInterruptResponse {
ok: boolean;
[key: string]: unknown;
}

export interface TurnSteerParams {
threadId: string;
expectedTurnId: string;
input: UserInput[];
}

export interface TurnSteerResponse {
[key: string]: unknown;
}
69 changes: 69 additions & 0 deletions cli/src/codex/codexAppServerClient.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { EventEmitter } from 'node:events';
import { afterEach, describe, expect, it, vi } from 'vitest';

const harness = vi.hoisted(() => ({
child: null as (EventEmitter & {
stdout: EventEmitter & { setEncoding: (encoding: string) => void };
stderr: EventEmitter & { setEncoding: (encoding: string) => void };
stdin: { end: () => void };
}) | null,
killCalls: 0
}));

vi.mock('node:child_process', () => ({
spawn: vi.fn(() => {
const child = new EventEmitter() as EventEmitter & {
stdout: EventEmitter & { setEncoding: (encoding: string) => void };
stderr: EventEmitter & { setEncoding: (encoding: string) => void };
stdin: { end: () => void };
};
child.stdout = Object.assign(new EventEmitter(), { setEncoding: () => {} });
child.stderr = Object.assign(new EventEmitter(), { setEncoding: () => {} });
child.stdin = { end: vi.fn() };
harness.child = child;
return child;
})
}));

vi.mock('@/utils/process', () => ({
killProcessByChildProcess: vi.fn(async (child: EventEmitter) => {
harness.killCalls += 1;
child.emit('exit', 0, null);
})
}));

import { CodexAppServerClient } from './codexAppServerClient';

describe('CodexAppServerClient disconnect handler', () => {
afterEach(() => {
harness.child = null;
harness.killCalls = 0;
});

it('notifies once when the app-server exits unexpectedly', async () => {
const client = new CodexAppServerClient();
let disconnects = 0;
client.setDisconnectHandler(() => {
disconnects += 1;
});

await client.connect();
harness.child?.emit('exit', 1, null);

expect(disconnects).toBe(1);
});

it('does not notify the disconnect handler during intentional shutdown', async () => {
const client = new CodexAppServerClient();
let disconnects = 0;
client.setDisconnectHandler(() => {
disconnects += 1;
});

await client.connect();
await client.disconnect();

expect(harness.killCalls).toBe(1);
expect(disconnects).toBe(0);
});
});
36 changes: 32 additions & 4 deletions cli/src/codex/codexAppServerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import type {
TurnStartParams,
TurnStartResponse,
TurnInterruptParams,
TurnInterruptResponse
TurnInterruptResponse,
TurnSteerParams,
TurnSteerResponse
} from './appServerTypes';

type JsonRpcLiteRequest = {
Expand Down Expand Up @@ -64,6 +66,7 @@ export class CodexAppServerClient {
private readonly pending = new Map<number, PendingRequest>();
private readonly requestHandlers = new Map<string, RequestHandler>();
private notificationHandler: ((method: string, params: unknown) => void) | null = null;
private disconnectHandler: ((error: Error) => void) | null = null;
private protocolError: Error | null = null;

static readonly DEFAULT_TIMEOUT_MS = 14 * 24 * 60 * 60 * 1000;
Expand Down Expand Up @@ -96,23 +99,27 @@ export class CodexAppServerClient {

this.process.on('exit', (code, signal) => {
const message = `Codex app-server exited (code=${code ?? 'null'}, signal=${signal ?? 'null'})`;
const error = new Error(message);
logger.debug(message);
this.rejectAllPending(new Error(message));
this.rejectAllPending(error);
this.connected = false;
this.resetParserState();
this.process = null;
this.notifyDisconnected(error);
});

this.process.on('error', (error) => {
logger.debug('[CodexAppServer] Process error', error);
const message = error instanceof Error ? error.message : String(error);
this.rejectAllPending(new Error(
const disconnectError = new Error(
`Failed to spawn codex app-server: ${message}. Is it installed and on PATH?`,
{ cause: error }
));
);
this.rejectAllPending(disconnectError);
this.connected = false;
this.resetParserState();
this.process = null;
this.notifyDisconnected(disconnectError);
});

this.connected = true;
Expand All @@ -123,6 +130,10 @@ export class CodexAppServerClient {
this.notificationHandler = handler;
}

setDisconnectHandler(handler: ((error: Error) => void) | null): void {
this.disconnectHandler = handler;
}

registerRequestHandler(method: string, handler: RequestHandler): void {
this.requestHandlers.set(method, handler);
}
Expand Down Expand Up @@ -164,13 +175,22 @@ export class CodexAppServerClient {
return response as TurnInterruptResponse;
}

async steerTurn(params: TurnSteerParams, options?: { signal?: AbortSignal }): Promise<TurnSteerResponse> {
const response = await this.sendRequest('turn/steer', params, {
signal: options?.signal,
timeoutMs: CodexAppServerClient.DEFAULT_TIMEOUT_MS
});
return response as TurnSteerResponse;
}

async disconnect(): Promise<void> {
if (!this.connected) {
return;
}

const child = this.process;
this.process = null;
this.disconnectHandler = null;

try {
child?.stdin.end();
Expand All @@ -188,6 +208,14 @@ export class CodexAppServerClient {
logger.debug('[CodexAppServer] Disconnected');
}

private notifyDisconnected(error: Error): void {
try {
this.disconnectHandler?.(error);
} catch (handlerError) {
logger.debug('[CodexAppServer] Disconnect handler error', handlerError);
}
}

private async sendRequest(
method: string,
params?: unknown,
Expand Down
Loading
Loading