From a5219f14ecd66a6f63adeafc366dc742c88ba6f6 Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Thu, 16 Apr 2026 23:30:55 +0800 Subject: [PATCH 1/9] feat(codex): add app-server turn steering client --- cli/src/codex/appServerTypes.ts | 10 ++++++++++ cli/src/codex/codexAppServerClient.ts | 12 +++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/cli/src/codex/appServerTypes.ts b/cli/src/codex/appServerTypes.ts index fdb7fcf6b..15b3f719d 100644 --- a/cli/src/codex/appServerTypes.ts +++ b/cli/src/codex/appServerTypes.ts @@ -144,3 +144,13 @@ export interface TurnInterruptResponse { ok: boolean; [key: string]: unknown; } + +export interface TurnSteerParams { + threadId: string; + expectedTurnId: string; + input: UserInput[]; +} + +export interface TurnSteerResponse { + [key: string]: unknown; +} diff --git a/cli/src/codex/codexAppServerClient.ts b/cli/src/codex/codexAppServerClient.ts index b45b4976b..ccbc5e829 100644 --- a/cli/src/codex/codexAppServerClient.ts +++ b/cli/src/codex/codexAppServerClient.ts @@ -11,7 +11,9 @@ import type { TurnStartParams, TurnStartResponse, TurnInterruptParams, - TurnInterruptResponse + TurnInterruptResponse, + TurnSteerParams, + TurnSteerResponse } from './appServerTypes'; type JsonRpcLiteRequest = { @@ -164,6 +166,14 @@ export class CodexAppServerClient { return response as TurnInterruptResponse; } + async steerTurn(params: TurnSteerParams, options?: { signal?: AbortSignal }): Promise { + const response = await this.sendRequest('turn/steer', params, { + signal: options?.signal, + timeoutMs: CodexAppServerClient.DEFAULT_TIMEOUT_MS + }); + return response as TurnSteerResponse; + } + async disconnect(): Promise { if (!this.connected) { return; From 7be5f63dc05a429dfcf1b72ca5aca3ae869df32e Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Thu, 16 Apr 2026 23:31:21 +0800 Subject: [PATCH 2/9] feat(codex): steer queued messages into active turns --- cli/src/codex/codexRemoteLauncher.test.ts | 82 ++++++++++++++++++++++- cli/src/codex/codexRemoteLauncher.ts | 77 +++++++++++++++++++++ 2 files changed, 156 insertions(+), 3 deletions(-) diff --git a/cli/src/codex/codexRemoteLauncher.test.ts b/cli/src/codex/codexRemoteLauncher.test.ts index 6d1b2c570..13327fde0 100644 --- a/cli/src/codex/codexRemoteLauncher.test.ts +++ b/cli/src/codex/codexRemoteLauncher.test.ts @@ -5,7 +5,12 @@ import type { EnhancedMode } from './loop'; const harness = vi.hoisted(() => ({ notifications: [] as Array<{ method: string; params: unknown }>, registerRequestCalls: [] as string[], - initializeCalls: [] as unknown[] + initializeCalls: [] as unknown[], + startTurnCalls: [] as unknown[], + steerTurnCalls: [] as unknown[], + notificationHandler: null as ((method: string, params: unknown) => void) | null, + startTurnImpl: null as null | (() => Promise<{ turn: Record }>), + steerTurnImpl: null as null | (() => Promise>) })); vi.mock('./codexAppServerClient', () => { @@ -21,6 +26,7 @@ vi.mock('./codexAppServerClient', () => { setNotificationHandler(handler: ((method: string, params: unknown) => void) | null): void { this.notificationHandler = handler; + harness.notificationHandler = handler; } registerRequestHandler(method: string): void { @@ -36,6 +42,10 @@ vi.mock('./codexAppServerClient', () => { } async startTurn(): Promise<{ turn: Record }> { + harness.startTurnCalls.push({}); + if (harness.startTurnImpl) { + return harness.startTurnImpl() as Promise<{ turn: Record }>; + } const started = { turn: {} }; harness.notifications.push({ method: 'turn/started', params: started }); this.notificationHandler?.('turn/started', started); @@ -47,6 +57,14 @@ vi.mock('./codexAppServerClient', () => { return { turn: {} }; } + async steerTurn(params: unknown): Promise> { + harness.steerTurnCalls.push(params); + if (harness.steerTurnImpl) { + return harness.steerTurnImpl(); + } + return {}; + } + async interruptTurn(): Promise> { return {}; } @@ -80,10 +98,12 @@ function createMode(): EnhancedMode { }; } -function createSessionStub() { +function createSessionStub(options?: { closeQueue?: boolean }) { const queue = new MessageQueue2((mode) => JSON.stringify(mode)); queue.push('hello from launcher test', createMode()); - queue.close(); + if (options?.closeQueue ?? true) { + queue.close(); + } const sessionEvents: Array<{ type: string; [key: string]: unknown }> = []; const codexMessages: unknown[] = []; @@ -163,11 +183,34 @@ function createSessionStub() { }; } +function waitFor(condition: () => boolean, timeoutMs = 1000): Promise { + const startedAt = Date.now(); + return new Promise((resolve, reject) => { + const tick = () => { + if (condition()) { + resolve(); + return; + } + if (Date.now() - startedAt > timeoutMs) { + reject(new Error('Timed out waiting for condition')); + return; + } + setTimeout(tick, 10); + }; + tick(); + }); +} + describe('codexRemoteLauncher', () => { afterEach(() => { harness.notifications = []; harness.registerRequestCalls = []; harness.initializeCalls = []; + harness.startTurnCalls = []; + harness.steerTurnCalls = []; + harness.notificationHandler = null; + harness.startTurnImpl = null; + harness.steerTurnImpl = null; }); it('finishes a turn and emits ready when task lifecycle events omit turn_id', async () => { @@ -198,4 +241,37 @@ describe('codexRemoteLauncher', () => { expect(thinkingChanges).toContain(true); expect(session.thinking).toBe(false); }); + + it('does not start a second turn while the first turn is still active', async () => { + harness.startTurnImpl = async () => { + const started = { turn: { id: 'turn-1' } }; + harness.notifications.push({ method: 'turn/started', params: started }); + harness.notificationHandler?.('turn/started', started); + return { turn: { id: 'turn-1' } }; + }; + + const { session, sessionEvents, thinkingChanges } = createSessionStub({ closeQueue: false }); + const launcherPromise = codexRemoteLauncher(session as never); + + await waitFor(() => harness.startTurnCalls.length === 1); + + session.queue.push('second message', createMode()); + session.queue.close(); + + await waitFor(() => harness.steerTurnCalls.length === 1); + expect(harness.startTurnCalls).toHaveLength(1); + + const completed = { status: 'Completed', turn: { id: 'turn-1' } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + + const exitReason = await launcherPromise; + + expect(exitReason).toBe('exit'); + expect(harness.startTurnCalls).toHaveLength(1); + expect(harness.steerTurnCalls).toHaveLength(1); + expect(sessionEvents.filter((event) => event.type === 'ready').length).toBeGreaterThanOrEqual(1); + expect(thinkingChanges).toContain(true); + expect(session.thinking).toBe(false); + }); }); diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index 53024deec..c42abd95e 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -240,7 +240,73 @@ class CodexRemoteLauncher extends RemoteLauncherBase { let scheduleReadyAfterTurn: (() => void) | null = null; let clearReadyAfterTurnTimer: (() => void) | null = null; let turnInFlight = false; + let steeringInFlight = false; + let activeTurnModeHash: string | null = null; let allowAnonymousTerminalEvent = false; + let resolveTurnSettled: (() => void) | null = null; + + const settleTurnInFlight = () => { + if (!resolveTurnSettled) { + return; + } + const resolve = resolveTurnSettled; + resolveTurnSettled = null; + resolve(); + }; + + const maybeSteerQueuedMessages = async () => { + if (this.shouldExit || turnInFlight === false || steeringInFlight) { + return; + } + if (!this.currentThreadId || !this.currentTurnId || !activeTurnModeHash) { + return; + } + if (session.queue.size() === 0) { + return; + } + + const nextItem = session.queue.queue[0]; + if (!nextItem || nextItem.isolate || nextItem.modeHash !== activeTurnModeHash) { + return; + } + + steeringInFlight = true; + let batch: QueuedMessage | null = null; + + try { + batch = await session.queue.waitForMessagesAndGetAsString(); + if (!batch) { + return; + } + if (batch.isolate || batch.hash !== activeTurnModeHash) { + session.queue.unshift(batch.message, batch.mode); + return; + } + + messageBuffer.addMessage(batch.message, 'user'); + await appServerClient.steerTurn({ + threadId: this.currentThreadId, + expectedTurnId: this.currentTurnId, + input: [{ type: 'text', text: batch.message }] + }, { + signal: this.abortController.signal + }); + } catch (error) { + if (batch) { + session.queue.unshift(batch.message, batch.mode); + } + logger.debug('[Codex] Failed to steer active turn; keeping message queued for next turn', error); + } finally { + steeringInFlight = false; + if (turnInFlight && session.queue.size() > 0) { + void maybeSteerQueuedMessages(); + } + } + }; + + session.queue.setOnMessage(() => { + void maybeSteerQueuedMessages(); + }); const handleCodexEvent = (msg: Record) => { const msgType = asString(msg.type); @@ -282,6 +348,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { return; } this.currentTurnId = null; + activeTurnModeHash = null; allowAnonymousTerminalEvent = false; } @@ -330,7 +397,9 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } if (isTerminalEvent) { turnInFlight = false; + activeTurnModeHash = null; allowAnonymousTerminalEvent = false; + settleTurnInFlight(); if (session.thinking) { logger.debug('thinking completed'); session.onThinkingChange(false); @@ -688,7 +757,11 @@ class CodexRemoteLauncher extends RemoteLauncherBase { cliOverrides: session.codexCliOverrides }); turnInFlight = true; + activeTurnModeHash = message.hash; allowAnonymousTerminalEvent = false; + const turnSettled = new Promise((resolve) => { + resolveTurnSettled = resolve; + }); const turnResponse = await appServerClient.startTurn(turnParams, { signal: this.abortController.signal }); @@ -700,12 +773,15 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } else if (!this.currentTurnId) { allowAnonymousTerminalEvent = true; } + await turnSettled; } catch (error) { logger.warn('Error in codex session:', error); const isAbortError = error instanceof Error && error.name === 'AbortError'; turnInFlight = false; + activeTurnModeHash = null; allowAnonymousTerminalEvent = false; this.currentTurnId = null; + settleTurnInFlight(); if (isAbortError) { messageBuffer.addMessage('Aborted by user', 'status'); @@ -739,6 +815,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { protected async cleanup(): Promise { logger.debug('[codex-remote]: cleanup start'); + this.session.queue.setOnMessage(null); try { await this.appServerClient.disconnect(); } catch (error) { From f9b38bcc9a4d4071bc023de7275a0a68a1d4eb0d Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 12:09:58 +0800 Subject: [PATCH 3/9] fix(codex): settle steering after turn id and disconnects --- cli/src/codex/codexAppServerClient.ts | 27 ++++++++-- cli/src/codex/codexRemoteLauncher.test.ts | 63 +++++++++++++++++++++++ cli/src/codex/codexRemoteLauncher.ts | 10 ++++ 3 files changed, 96 insertions(+), 4 deletions(-) diff --git a/cli/src/codex/codexAppServerClient.ts b/cli/src/codex/codexAppServerClient.ts index ccbc5e829..d84003102 100644 --- a/cli/src/codex/codexAppServerClient.ts +++ b/cli/src/codex/codexAppServerClient.ts @@ -66,6 +66,7 @@ export class CodexAppServerClient { private readonly pending = new Map(); private readonly requestHandlers = new Map(); private notificationHandler: ((method: string, params: unknown) => void) | null = null; + private disconnectHandler: ((error: Error) => void) | null = null; private protocolError: Error | null = null; static readonly DEFAULT_TIMEOUT_MS = 14 * 24 * 60 * 60 * 1000; @@ -98,23 +99,27 @@ export class CodexAppServerClient { this.process.on('exit', (code, signal) => { const message = `Codex app-server exited (code=${code ?? 'null'}, signal=${signal ?? 'null'})`; + const error = new Error(message); logger.debug(message); - this.rejectAllPending(new Error(message)); + this.rejectAllPending(error); this.connected = false; this.resetParserState(); this.process = null; + this.notifyDisconnected(error); }); this.process.on('error', (error) => { logger.debug('[CodexAppServer] Process error', error); const message = error instanceof Error ? error.message : String(error); - this.rejectAllPending(new Error( + const disconnectError = new Error( `Failed to spawn codex app-server: ${message}. Is it installed and on PATH?`, { cause: error } - )); + ); + this.rejectAllPending(disconnectError); this.connected = false; this.resetParserState(); this.process = null; + this.notifyDisconnected(disconnectError); }); this.connected = true; @@ -125,6 +130,10 @@ export class CodexAppServerClient { this.notificationHandler = handler; } + setDisconnectHandler(handler: ((error: Error) => void) | null): void { + this.disconnectHandler = handler; + } + registerRequestHandler(method: string, handler: RequestHandler): void { this.requestHandlers.set(method, handler); } @@ -190,14 +199,24 @@ export class CodexAppServerClient { } catch (error) { logger.debug('[CodexAppServer] Error while stopping process', error); } finally { - this.rejectAllPending(new Error('Codex app-server disconnected')); + const disconnectError = new Error('Codex app-server disconnected'); + this.rejectAllPending(disconnectError); this.connected = false; this.resetParserState(); + this.notifyDisconnected(disconnectError); } logger.debug('[CodexAppServer] Disconnected'); } + private notifyDisconnected(error: Error): void { + try { + this.disconnectHandler?.(error); + } catch (handlerError) { + logger.debug('[CodexAppServer] Disconnect handler error', handlerError); + } + } + private async sendRequest( method: string, params?: unknown, diff --git a/cli/src/codex/codexRemoteLauncher.test.ts b/cli/src/codex/codexRemoteLauncher.test.ts index 13327fde0..6895c6b14 100644 --- a/cli/src/codex/codexRemoteLauncher.test.ts +++ b/cli/src/codex/codexRemoteLauncher.test.ts @@ -9,6 +9,7 @@ const harness = vi.hoisted(() => ({ startTurnCalls: [] as unknown[], steerTurnCalls: [] as unknown[], notificationHandler: null as ((method: string, params: unknown) => void) | null, + disconnectHandler: null as ((error: Error) => void) | null, startTurnImpl: null as null | (() => Promise<{ turn: Record }>), steerTurnImpl: null as null | (() => Promise>) })); @@ -33,6 +34,10 @@ vi.mock('./codexAppServerClient', () => { harness.registerRequestCalls.push(method); } + setDisconnectHandler(handler: ((error: Error) => void) | null): void { + harness.disconnectHandler = handler; + } + async startThread(): Promise<{ thread: { id: string }; model: string }> { return { thread: { id: 'thread-anonymous' }, model: 'gpt-5.4' }; } @@ -209,6 +214,7 @@ describe('codexRemoteLauncher', () => { harness.startTurnCalls = []; harness.steerTurnCalls = []; harness.notificationHandler = null; + harness.disconnectHandler = null; harness.startTurnImpl = null; harness.steerTurnImpl = null; }); @@ -274,4 +280,61 @@ describe('codexRemoteLauncher', () => { expect(thinkingChanges).toContain(true); expect(session.thinking).toBe(false); }); + + it('steers queued messages once the active turn id becomes known', async () => { + let releaseTurnStart!: () => void; + harness.startTurnImpl = async () => { + await new Promise((resolve) => { + releaseTurnStart = resolve; + }); + return { turn: { id: 'turn-late-id' } }; + }; + + const { session, sessionEvents, thinkingChanges } = createSessionStub({ closeQueue: false }); + const launcherPromise = codexRemoteLauncher(session as never); + + await waitFor(() => harness.startTurnCalls.length === 1); + + session.queue.push('second message before turn id', createMode()); + session.queue.close(); + expect(harness.steerTurnCalls).toHaveLength(0); + + releaseTurnStart(); + await waitFor(() => harness.steerTurnCalls.length === 1); + expect(harness.startTurnCalls).toHaveLength(1); + + const completed = { status: 'Completed', turn: { id: 'turn-late-id' } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + + const exitReason = await launcherPromise; + + expect(exitReason).toBe('exit'); + expect(harness.startTurnCalls).toHaveLength(1); + expect(harness.steerTurnCalls).toHaveLength(1); + expect(sessionEvents.filter((event) => event.type === 'ready').length).toBeGreaterThanOrEqual(1); + expect(thinkingChanges).not.toContain(true); + expect(session.thinking).toBe(false); + }); + + it('settles an active turn when the app-server disconnects before a terminal event', async () => { + harness.startTurnImpl = async () => { + const started = { turn: { id: 'turn-disconnect' } }; + harness.notifications.push({ method: 'turn/started', params: started }); + harness.notificationHandler?.('turn/started', started); + return { turn: { id: 'turn-disconnect' } }; + }; + + const { session, thinkingChanges } = createSessionStub(); + const launcherPromise = codexRemoteLauncher(session as never); + + await waitFor(() => harness.disconnectHandler !== null && session.thinking); + harness.disconnectHandler?.(new Error('app-server disconnected in test')); + + const exitReason = await launcherPromise; + + expect(exitReason).toBe('exit'); + expect(thinkingChanges).toContain(true); + expect(session.thinking).toBe(false); + }); }); diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index c42abd95e..51a36690d 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -328,6 +328,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { if (turnId) { this.currentTurnId = turnId; allowAnonymousTerminalEvent = false; + void maybeSteerQueuedMessages(); } else if (!this.currentTurnId) { allowAnonymousTerminalEvent = true; } @@ -603,6 +604,14 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } }); + appServerClient.setDisconnectHandler(() => { + turnInFlight = false; + activeTurnModeHash = null; + allowAnonymousTerminalEvent = false; + this.currentTurnId = null; + settleTurnInFlight(); + }); + const { server: happyServer, mcpServers } = await buildHapiMcpBridge(session.client); this.happyServer = happyServer; @@ -770,6 +779,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { const turnId = asString(turn?.id); if (turnId) { this.currentTurnId = turnId; + void maybeSteerQueuedMessages(); } else if (!this.currentTurnId) { allowAnonymousTerminalEvent = true; } From 24f0286abf44b2b5512d5cf9713cd1afd894e9e7 Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 20:26:36 +0800 Subject: [PATCH 4/9] fix(codex): avoid disconnect callback during shutdown --- cli/src/codex/codexAppServerClient.test.ts | 69 ++++++++++++++++++++++ cli/src/codex/codexAppServerClient.ts | 5 +- 2 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 cli/src/codex/codexAppServerClient.test.ts diff --git a/cli/src/codex/codexAppServerClient.test.ts b/cli/src/codex/codexAppServerClient.test.ts new file mode 100644 index 000000000..ed3d23be7 --- /dev/null +++ b/cli/src/codex/codexAppServerClient.test.ts @@ -0,0 +1,69 @@ +import { EventEmitter } from 'node:events'; +import { afterEach, describe, expect, it, vi } from 'vitest'; + +const harness = vi.hoisted(() => ({ + child: null as (EventEmitter & { + stdout: EventEmitter & { setEncoding: (encoding: string) => void }; + stderr: EventEmitter & { setEncoding: (encoding: string) => void }; + stdin: { end: () => void }; + }) | null, + killCalls: 0 +})); + +vi.mock('node:child_process', () => ({ + spawn: vi.fn(() => { + const child = new EventEmitter() as EventEmitter & { + stdout: EventEmitter & { setEncoding: (encoding: string) => void }; + stderr: EventEmitter & { setEncoding: (encoding: string) => void }; + stdin: { end: () => void }; + }; + child.stdout = Object.assign(new EventEmitter(), { setEncoding: () => {} }); + child.stderr = Object.assign(new EventEmitter(), { setEncoding: () => {} }); + child.stdin = { end: vi.fn() }; + harness.child = child; + return child; + }) +})); + +vi.mock('@/utils/process', () => ({ + killProcessByChildProcess: vi.fn(async (child: EventEmitter) => { + harness.killCalls += 1; + child.emit('exit', 0, null); + }) +})); + +import { CodexAppServerClient } from './codexAppServerClient'; + +describe('CodexAppServerClient disconnect handler', () => { + afterEach(() => { + harness.child = null; + harness.killCalls = 0; + }); + + it('notifies once when the app-server exits unexpectedly', async () => { + const client = new CodexAppServerClient(); + let disconnects = 0; + client.setDisconnectHandler(() => { + disconnects += 1; + }); + + await client.connect(); + harness.child?.emit('exit', 1, null); + + expect(disconnects).toBe(1); + }); + + it('does not notify the disconnect handler during intentional shutdown', async () => { + const client = new CodexAppServerClient(); + let disconnects = 0; + client.setDisconnectHandler(() => { + disconnects += 1; + }); + + await client.connect(); + await client.disconnect(); + + expect(harness.killCalls).toBe(1); + expect(disconnects).toBe(0); + }); +}); diff --git a/cli/src/codex/codexAppServerClient.ts b/cli/src/codex/codexAppServerClient.ts index d84003102..65265fd6d 100644 --- a/cli/src/codex/codexAppServerClient.ts +++ b/cli/src/codex/codexAppServerClient.ts @@ -190,6 +190,7 @@ export class CodexAppServerClient { const child = this.process; this.process = null; + this.disconnectHandler = null; try { child?.stdin.end(); @@ -199,11 +200,9 @@ export class CodexAppServerClient { } catch (error) { logger.debug('[CodexAppServer] Error while stopping process', error); } finally { - const disconnectError = new Error('Codex app-server disconnected'); - this.rejectAllPending(disconnectError); + this.rejectAllPending(new Error('Codex app-server disconnected')); this.connected = false; this.resetParserState(); - this.notifyDisconnected(disconnectError); } logger.debug('[CodexAppServer] Disconnected'); From 20f447bba00156fa25db74ca4befe3daab2d38f1 Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 20:39:11 +0800 Subject: [PATCH 5/9] fix(codex): reset thread after app-server disconnect --- cli/src/codex/codexRemoteLauncher.test.ts | 34 +++++++++++++++++++---- cli/src/codex/codexRemoteLauncher.ts | 2 ++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/cli/src/codex/codexRemoteLauncher.test.ts b/cli/src/codex/codexRemoteLauncher.test.ts index 6895c6b14..9c29fd8ac 100644 --- a/cli/src/codex/codexRemoteLauncher.test.ts +++ b/cli/src/codex/codexRemoteLauncher.test.ts @@ -6,6 +6,8 @@ const harness = vi.hoisted(() => ({ notifications: [] as Array<{ method: string; params: unknown }>, registerRequestCalls: [] as string[], initializeCalls: [] as unknown[], + startThreadCalls: [] as unknown[], + resumeThreadCalls: [] as unknown[], startTurnCalls: [] as unknown[], steerTurnCalls: [] as unknown[], notificationHandler: null as ((method: string, params: unknown) => void) | null, @@ -38,11 +40,13 @@ vi.mock('./codexAppServerClient', () => { harness.disconnectHandler = handler; } - async startThread(): Promise<{ thread: { id: string }; model: string }> { + async startThread(params: unknown): Promise<{ thread: { id: string }; model: string }> { + harness.startThreadCalls.push(params); return { thread: { id: 'thread-anonymous' }, model: 'gpt-5.4' }; } - async resumeThread(): Promise<{ thread: { id: string }; model: string }> { + async resumeThread(params: unknown): Promise<{ thread: { id: string }; model: string }> { + harness.resumeThreadCalls.push(params); return { thread: { id: 'thread-anonymous' }, model: 'gpt-5.4' }; } @@ -211,6 +215,8 @@ describe('codexRemoteLauncher', () => { harness.notifications = []; harness.registerRequestCalls = []; harness.initializeCalls = []; + harness.startThreadCalls = []; + harness.resumeThreadCalls = []; harness.startTurnCalls = []; harness.steerTurnCalls = []; harness.notificationHandler = null; @@ -317,23 +323,39 @@ describe('codexRemoteLauncher', () => { expect(session.thinking).toBe(false); }); - it('settles an active turn when the app-server disconnects before a terminal event', async () => { + it('re-establishes thread lifecycle after disconnecting during an active turn', async () => { + let turnStarts = 0; harness.startTurnImpl = async () => { - const started = { turn: { id: 'turn-disconnect' } }; + turnStarts += 1; + const turnId = `turn-${turnStarts}`; + const started = { turn: { id: turnId } }; harness.notifications.push({ method: 'turn/started', params: started }); harness.notificationHandler?.('turn/started', started); - return { turn: { id: 'turn-disconnect' } }; + + if (turnStarts === 2) { + const completed = { status: 'Completed', turn: { id: turnId } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + } + + return { turn: {} }; }; - const { session, thinkingChanges } = createSessionStub(); + const { session, thinkingChanges } = createSessionStub({ closeQueue: false }); const launcherPromise = codexRemoteLauncher(session as never); await waitFor(() => harness.disconnectHandler !== null && session.thinking); harness.disconnectHandler?.(new Error('app-server disconnected in test')); + session.queue.push('message after app-server reconnect', createMode()); + session.queue.close(); + + await waitFor(() => harness.startTurnCalls.length === 2); const exitReason = await launcherPromise; expect(exitReason).toBe('exit'); + expect(harness.startThreadCalls).toHaveLength(1); + expect(harness.resumeThreadCalls).toHaveLength(1); expect(thinkingChanges).toContain(true); expect(session.thinking).toBe(false); }); diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index 51a36690d..9af9d72f0 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -609,6 +609,8 @@ class CodexRemoteLauncher extends RemoteLauncherBase { activeTurnModeHash = null; allowAnonymousTerminalEvent = false; this.currentTurnId = null; + this.currentThreadId = null; + hasThread = false; settleTurnInFlight(); }); From d2982c7ae411d07d53a9279399a4216fb230f95f Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 20:48:11 +0800 Subject: [PATCH 6/9] fix(codex): suspend steering after failed retry --- .../agent/backends/acp/AcpSdkBackend.test.ts | 6 +-- cli/src/codex/codexRemoteLauncher.test.ts | 47 +++++++++++++++++++ cli/src/codex/codexRemoteLauncher.ts | 10 +++- 3 files changed, 58 insertions(+), 5 deletions(-) diff --git a/cli/src/agent/backends/acp/AcpSdkBackend.test.ts b/cli/src/agent/backends/acp/AcpSdkBackend.test.ts index e4c8bb5e4..5ed281da8 100644 --- a/cli/src/agent/backends/acp/AcpSdkBackend.test.ts +++ b/cli/src/agent/backends/acp/AcpSdkBackend.test.ts @@ -98,7 +98,7 @@ describe('AcpSdkBackend', () => { }); }, 0); - await sleep(5); + await sleep(1); setTimeout(() => { backendInternal.handleSessionUpdate({ @@ -111,7 +111,7 @@ describe('AcpSdkBackend', () => { status: 'in_progress' } }); - }, 3); + }, 1); setTimeout(() => { backendInternal.handleSessionUpdate({ @@ -123,7 +123,7 @@ describe('AcpSdkBackend', () => { rawOutput: { ok: true } } }); - }, 6); + }, 2); return { stopReason: 'end_turn' }; }, diff --git a/cli/src/codex/codexRemoteLauncher.test.ts b/cli/src/codex/codexRemoteLauncher.test.ts index 9c29fd8ac..1e76507e3 100644 --- a/cli/src/codex/codexRemoteLauncher.test.ts +++ b/cli/src/codex/codexRemoteLauncher.test.ts @@ -287,6 +287,53 @@ describe('codexRemoteLauncher', () => { expect(session.thinking).toBe(false); }); + it('keeps failed steering messages queued for the next turn without retrying the active turn', async () => { + let turnStarts = 0; + harness.startTurnImpl = async () => { + turnStarts += 1; + const turnId = `turn-${turnStarts}`; + const started = { turn: { id: turnId } }; + harness.notifications.push({ method: 'turn/started', params: started }); + harness.notificationHandler?.('turn/started', started); + + if (turnStarts === 2) { + const completed = { status: 'Completed', turn: { id: turnId } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + } + + return { turn: {} }; + }; + harness.steerTurnImpl = async () => { + throw new Error('steer rejected'); + }; + + const { session } = createSessionStub({ closeQueue: false }); + const launcherPromise = codexRemoteLauncher(session as never); + + await waitFor(() => harness.startTurnCalls.length === 1); + + session.queue.push('second message', createMode()); + await waitFor(() => harness.steerTurnCalls.length === 1); + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(harness.steerTurnCalls).toHaveLength(1); + expect(harness.startTurnCalls).toHaveLength(1); + + const completed = { status: 'Completed', turn: { id: 'turn-1' } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + session.queue.close(); + + await waitFor(() => harness.startTurnCalls.length === 2); + const exitReason = await launcherPromise; + + expect(exitReason).toBe('exit'); + expect(harness.steerTurnCalls).toHaveLength(1); + expect(harness.startTurnCalls).toHaveLength(2); + expect(session.thinking).toBe(false); + }); + it('steers queued messages once the active turn id becomes known', async () => { let releaseTurnStart!: () => void; harness.startTurnImpl = async () => { diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index 9af9d72f0..4b4a12a02 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -241,6 +241,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { let clearReadyAfterTurnTimer: (() => void) | null = null; let turnInFlight = false; let steeringInFlight = false; + let steeringSuspendedForTurn = false; let activeTurnModeHash: string | null = null; let allowAnonymousTerminalEvent = false; let resolveTurnSettled: (() => void) | null = null; @@ -255,7 +256,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { }; const maybeSteerQueuedMessages = async () => { - if (this.shouldExit || turnInFlight === false || steeringInFlight) { + if (this.shouldExit || turnInFlight === false || steeringInFlight || steeringSuspendedForTurn) { return; } if (!this.currentThreadId || !this.currentTurnId || !activeTurnModeHash) { @@ -294,11 +295,12 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } catch (error) { if (batch) { session.queue.unshift(batch.message, batch.mode); + steeringSuspendedForTurn = true; } logger.debug('[Codex] Failed to steer active turn; keeping message queued for next turn', error); } finally { steeringInFlight = false; - if (turnInFlight && session.queue.size() > 0) { + if (!steeringSuspendedForTurn && turnInFlight && session.queue.size() > 0) { void maybeSteerQueuedMessages(); } } @@ -398,6 +400,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } if (isTerminalEvent) { turnInFlight = false; + steeringSuspendedForTurn = false; activeTurnModeHash = null; allowAnonymousTerminalEvent = false; settleTurnInFlight(); @@ -606,6 +609,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { appServerClient.setDisconnectHandler(() => { turnInFlight = false; + steeringSuspendedForTurn = false; activeTurnModeHash = null; allowAnonymousTerminalEvent = false; this.currentTurnId = null; @@ -768,6 +772,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { cliOverrides: session.codexCliOverrides }); turnInFlight = true; + steeringSuspendedForTurn = false; activeTurnModeHash = message.hash; allowAnonymousTerminalEvent = false; const turnSettled = new Promise((resolve) => { @@ -790,6 +795,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { logger.warn('Error in codex session:', error); const isAbortError = error instanceof Error && error.name === 'AbortError'; turnInFlight = false; + steeringSuspendedForTurn = false; activeTurnModeHash = null; allowAnonymousTerminalEvent = false; this.currentTurnId = null; From cf2ac3690946f45bfd633cc8ae8e41f588cbc575 Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 20:56:58 +0800 Subject: [PATCH 7/9] fix(codex): surface app-server disconnect as turn failure --- cli/src/codex/codexRemoteLauncher.test.ts | 3 ++- cli/src/codex/codexRemoteLauncher.ts | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cli/src/codex/codexRemoteLauncher.test.ts b/cli/src/codex/codexRemoteLauncher.test.ts index 1e76507e3..7e680b179 100644 --- a/cli/src/codex/codexRemoteLauncher.test.ts +++ b/cli/src/codex/codexRemoteLauncher.test.ts @@ -388,7 +388,7 @@ describe('codexRemoteLauncher', () => { return { turn: {} }; }; - const { session, thinkingChanges } = createSessionStub({ closeQueue: false }); + const { session, sessionEvents, thinkingChanges } = createSessionStub({ closeQueue: false }); const launcherPromise = codexRemoteLauncher(session as never); await waitFor(() => harness.disconnectHandler !== null && session.thinking); @@ -403,6 +403,7 @@ describe('codexRemoteLauncher', () => { expect(exitReason).toBe('exit'); expect(harness.startThreadCalls).toHaveLength(1); expect(harness.resumeThreadCalls).toHaveLength(1); + expect(sessionEvents).toContainEqual({ type: 'message', message: 'Process exited unexpectedly' }); expect(thinkingChanges).toContain(true); expect(session.thinking).toBe(false); }); diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index 4b4a12a02..8228157e6 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -244,6 +244,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { let steeringSuspendedForTurn = false; let activeTurnModeHash: string | null = null; let allowAnonymousTerminalEvent = false; + let appServerDisconnectError: Error | null = null; let resolveTurnSettled: (() => void) | null = null; const settleTurnInFlight = () => { @@ -607,7 +608,8 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } }); - appServerClient.setDisconnectHandler(() => { + appServerClient.setDisconnectHandler((error) => { + appServerDisconnectError = error; turnInFlight = false; steeringSuspendedForTurn = false; activeTurnModeHash = null; @@ -771,6 +773,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { }, cliOverrides: session.codexCliOverrides }); + appServerDisconnectError = null; turnInFlight = true; steeringSuspendedForTurn = false; activeTurnModeHash = message.hash; @@ -791,6 +794,9 @@ class CodexRemoteLauncher extends RemoteLauncherBase { allowAnonymousTerminalEvent = true; } await turnSettled; + if (appServerDisconnectError) { + throw appServerDisconnectError; + } } catch (error) { logger.warn('Error in codex session:', error); const isAbortError = error instanceof Error && error.name === 'AbortError'; From 13b26f8b87c1c92685eb3310c64910584e330b0a Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 21:11:01 +0800 Subject: [PATCH 8/9] fix(codex): drop aborted steering batches --- cli/src/codex/codexRemoteLauncher.test.ts | 64 +++++++++++++++++++++-- cli/src/codex/codexRemoteLauncher.ts | 12 +++-- 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/cli/src/codex/codexRemoteLauncher.test.ts b/cli/src/codex/codexRemoteLauncher.test.ts index 7e680b179..a9f6ada11 100644 --- a/cli/src/codex/codexRemoteLauncher.test.ts +++ b/cli/src/codex/codexRemoteLauncher.test.ts @@ -13,7 +13,10 @@ const harness = vi.hoisted(() => ({ notificationHandler: null as ((method: string, params: unknown) => void) | null, disconnectHandler: null as ((error: Error) => void) | null, startTurnImpl: null as null | (() => Promise<{ turn: Record }>), - steerTurnImpl: null as null | (() => Promise>) + steerTurnImpl: null as null | (( + params: unknown, + options?: { signal?: AbortSignal } + ) => Promise>) })); vi.mock('./codexAppServerClient', () => { @@ -66,10 +69,13 @@ vi.mock('./codexAppServerClient', () => { return { turn: {} }; } - async steerTurn(params: unknown): Promise> { + async steerTurn( + params: unknown, + options?: { signal?: AbortSignal } + ): Promise> { harness.steerTurnCalls.push(params); if (harness.steerTurnImpl) { - return harness.steerTurnImpl(); + return harness.steerTurnImpl(params, options); } return {}; } @@ -334,6 +340,58 @@ describe('codexRemoteLauncher', () => { expect(session.thinking).toBe(false); }); + it('drops an in-flight steering batch when abort clears the queue', async () => { + let turnStarts = 0; + harness.startTurnImpl = async () => { + turnStarts += 1; + const turnId = `turn-${turnStarts}`; + const started = { turn: { id: turnId } }; + harness.notifications.push({ method: 'turn/started', params: started }); + harness.notificationHandler?.('turn/started', started); + + if (turnStarts > 1) { + const completed = { status: 'Completed', turn: { id: turnId } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + } + + return { turn: { id: turnId } }; + }; + harness.steerTurnImpl = async (_params, options) => { + return await new Promise>((_resolve, reject) => { + options?.signal?.addEventListener('abort', () => { + const error = new Error('Request aborted'); + error.name = 'AbortError'; + reject(error); + }, { once: true }); + }); + }; + + const { session, rpcHandlers } = createSessionStub({ closeQueue: false }); + const launcherPromise = codexRemoteLauncher(session as never); + + await waitFor(() => harness.startTurnCalls.length === 1); + + session.queue.push('message to steer then abort', createMode()); + await waitFor(() => harness.steerTurnCalls.length === 1); + + const abortHandler = rpcHandlers.get('abort'); + expect(abortHandler).toBeDefined(); + await abortHandler?.({}); + + const completed = { status: 'Completed', turn: { id: 'turn-1' } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + session.queue.close(); + + const exitReason = await launcherPromise; + + expect(exitReason).toBe('exit'); + expect(harness.steerTurnCalls).toHaveLength(1); + expect(harness.startTurnCalls).toHaveLength(1); + expect(session.thinking).toBe(false); + }); + it('steers queued messages once the active turn id becomes known', async () => { let releaseTurnStart!: () => void; harness.startTurnImpl = async () => { diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index 8228157e6..736e8a1fa 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -274,6 +274,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase { steeringInFlight = true; let batch: QueuedMessage | null = null; + let steerSignal: AbortSignal | null = null; try { batch = await session.queue.waitForMessagesAndGetAsString(); @@ -286,19 +287,24 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } messageBuffer.addMessage(batch.message, 'user'); + steerSignal = this.abortController.signal; await appServerClient.steerTurn({ threadId: this.currentThreadId, expectedTurnId: this.currentTurnId, input: [{ type: 'text', text: batch.message }] }, { - signal: this.abortController.signal + signal: steerSignal }); } catch (error) { - if (batch) { + const isAbortError = error instanceof Error && error.name === 'AbortError'; + const wasAborted = Boolean(steerSignal?.aborted); + if (batch && !isAbortError && !wasAborted && !this.shouldExit) { session.queue.unshift(batch.message, batch.mode); steeringSuspendedForTurn = true; + logger.debug('[Codex] Failed to steer active turn; keeping message queued for next turn', error); + } else { + logger.debug('[Codex] Steering aborted; dropping queued message', error); } - logger.debug('[Codex] Failed to steer active turn; keeping message queued for next turn', error); } finally { steeringInFlight = false; if (!steeringSuspendedForTurn && turnInFlight && session.queue.size() > 0) { From deadd1b4d302ad5616178da07da709c337195381 Mon Sep 17 00:00:00 2001 From: Liu-KM Date: Mon, 20 Apr 2026 21:25:26 +0800 Subject: [PATCH 9/9] fix(codex): keep steering queued until accepted --- cli/src/codex/codexRemoteLauncher.test.ts | 40 ++++++++++++ cli/src/codex/codexRemoteLauncher.ts | 22 ++++--- cli/src/utils/MessageQueue2.ts | 76 ++++++++++++++++++----- 3 files changed, 113 insertions(+), 25 deletions(-) diff --git a/cli/src/codex/codexRemoteLauncher.test.ts b/cli/src/codex/codexRemoteLauncher.test.ts index a9f6ada11..93e8f767b 100644 --- a/cli/src/codex/codexRemoteLauncher.test.ts +++ b/cli/src/codex/codexRemoteLauncher.test.ts @@ -293,6 +293,46 @@ describe('codexRemoteLauncher', () => { expect(session.thinking).toBe(false); }); + it('keeps a steering batch queued until the steer RPC succeeds', async () => { + let resolveSteer!: () => void; + harness.startTurnImpl = async () => { + const started = { turn: { id: 'turn-1' } }; + harness.notifications.push({ method: 'turn/started', params: started }); + harness.notificationHandler?.('turn/started', started); + return { turn: { id: 'turn-1' } }; + }; + harness.steerTurnImpl = async () => { + return await new Promise>((resolve) => { + resolveSteer = () => resolve({}); + }); + }; + + const { session } = createSessionStub({ closeQueue: false }); + const launcherPromise = codexRemoteLauncher(session as never); + + await waitFor(() => harness.startTurnCalls.length === 1); + + session.queue.push('second message awaiting steer acceptance', createMode()); + await waitFor(() => harness.steerTurnCalls.length === 1); + + expect(session.queue.size()).toBe(1); + + resolveSteer(); + await waitFor(() => session.queue.size() === 0); + + const completed = { status: 'Completed', turn: { id: 'turn-1' } }; + harness.notifications.push({ method: 'turn/completed', params: completed }); + harness.notificationHandler?.('turn/completed', completed); + session.queue.close(); + + const exitReason = await launcherPromise; + + expect(exitReason).toBe('exit'); + expect(harness.startTurnCalls).toHaveLength(1); + expect(harness.steerTurnCalls).toHaveLength(1); + expect(session.thinking).toBe(false); + }); + it('keeps failed steering messages queued for the next turn without retrying the active turn', async () => { let turnStarts = 0; harness.startTurnImpl = async () => { diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index 736e8a1fa..e8552827c 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -16,6 +16,7 @@ import { AppServerEventConverter } from './utils/appServerEventConverter'; import { registerAppServerPermissionHandlers } from './utils/appServerPermissionAdapter'; import { buildThreadStartParams, buildTurnStartParams } from './utils/appServerConfig'; import { shouldIgnoreTerminalEvent } from './utils/terminalEventGuard'; +import type { PeekedMessageBatch } from '@/utils/MessageQueue2'; import { RemoteLauncherBase, type RemoteLauncherDisplayContext, @@ -24,6 +25,7 @@ import { type HappyServer = Awaited>['server']; type QueuedMessage = { message: string; mode: EnhancedMode; isolate: boolean; hash: string }; +type PeekedQueuedMessage = PeekedMessageBatch; class CodexRemoteLauncher extends RemoteLauncherBase { private readonly session: CodexSession; @@ -273,20 +275,18 @@ class CodexRemoteLauncher extends RemoteLauncherBase { } steeringInFlight = true; - let batch: QueuedMessage | null = null; + let batch: PeekedQueuedMessage | null = null; let steerSignal: AbortSignal | null = null; try { - batch = await session.queue.waitForMessagesAndGetAsString(); + batch = session.queue.peekMessagesAndGetAsString(); if (!batch) { return; } if (batch.isolate || batch.hash !== activeTurnModeHash) { - session.queue.unshift(batch.message, batch.mode); return; } - messageBuffer.addMessage(batch.message, 'user'); steerSignal = this.abortController.signal; await appServerClient.steerTurn({ threadId: this.currentThreadId, @@ -295,13 +295,19 @@ class CodexRemoteLauncher extends RemoteLauncherBase { }, { signal: steerSignal }); + if (steerSignal.aborted || this.shouldExit) { + return; + } + if (session.queue.consumePeekedBatch(batch)) { + messageBuffer.addMessage(batch.message, 'user'); + } else { + logger.debug('[Codex] Steered batch was no longer queued after steer success'); + } } catch (error) { - const isAbortError = error instanceof Error && error.name === 'AbortError'; const wasAborted = Boolean(steerSignal?.aborted); - if (batch && !isAbortError && !wasAborted && !this.shouldExit) { - session.queue.unshift(batch.message, batch.mode); + if (batch && !wasAborted && !this.shouldExit) { steeringSuspendedForTurn = true; - logger.debug('[Codex] Failed to steer active turn; keeping message queued for next turn', error); + logger.debug('[Codex] Failed to steer active turn; leaving message queued for next turn', error); } else { logger.debug('[Codex] Steering aborted; dropping queued message', error); } diff --git a/cli/src/utils/MessageQueue2.ts b/cli/src/utils/MessageQueue2.ts index 6ba5fcdd1..b89dbfef4 100644 --- a/cli/src/utils/MessageQueue2.ts +++ b/cli/src/utils/MessageQueue2.ts @@ -1,12 +1,23 @@ import { logger } from "@/ui/logger"; -interface QueueItem { +export interface QueueItem { message: string; mode: T; modeHash: string; isolate?: boolean; // If true, this message must be processed alone } +export type MessageBatch = { + message: string; + mode: T; + isolate: boolean; + hash: string; +}; + +export type PeekedMessageBatch = MessageBatch & { + items: QueueItem[]; +}; + /** * A mode-aware message queue that stores messages with their modes. * Returns consistent batches of messages with the same mode. @@ -221,7 +232,7 @@ export class MessageQueue2 { * Wait for messages and return all messages with the same mode as a single string * Returns { message: string, mode: T } or null if aborted/closed */ - async waitForMessagesAndGetAsString(abortSignal?: AbortSignal): Promise<{ message: string, mode: T, isolate: boolean, hash: string } | null> { + async waitForMessagesAndGetAsString(abortSignal?: AbortSignal): Promise | null> { // If we have messages, return them immediately if (this.queue.length > 0) { return this.collectBatch(); @@ -245,41 +256,72 @@ export class MessageQueue2 { /** * Collect a batch of messages with the same mode, respecting isolation requirements */ - private collectBatch(): { message: string, mode: T, hash: string, isolate: boolean } | null { + /** + * Peek at the next batch without removing it from the queue. + * Call consumePeekedBatch() after the downstream consumer accepts it. + */ + peekMessagesAndGetAsString(): PeekedMessageBatch | null { if (this.queue.length === 0) { return null; } const firstItem = this.queue[0]; - const sameModeMessages: string[] = []; - let mode = firstItem.mode; - let isolate = firstItem.isolate ?? false; + const items: QueueItem[] = []; + const mode = firstItem.mode; + const isolate = firstItem.isolate ?? false; const targetModeHash = firstItem.modeHash; // If the first message requires isolation, only process it alone if (firstItem.isolate) { - const item = this.queue.shift()!; - sameModeMessages.push(item.message); - logger.debug(`[MessageQueue2] Collected isolated message with mode hash: ${targetModeHash}`); + items.push(firstItem); + logger.debug(`[MessageQueue2] Peeked isolated message with mode hash: ${targetModeHash}`); } else { // Collect all messages with the same mode until we hit an isolated message - while (this.queue.length > 0 && - this.queue[0].modeHash === targetModeHash && - !this.queue[0].isolate) { - const item = this.queue.shift()!; - sameModeMessages.push(item.message); + while (items.length < this.queue.length && + this.queue[items.length].modeHash === targetModeHash && + !this.queue[items.length].isolate) { + items.push(this.queue[items.length]); } - logger.debug(`[MessageQueue2] Collected batch of ${sameModeMessages.length} messages with mode hash: ${targetModeHash}`); + logger.debug(`[MessageQueue2] Peeked batch of ${items.length} messages with mode hash: ${targetModeHash}`); } // Join all messages with newlines - const combinedMessage = sameModeMessages.join('\n'); + const combinedMessage = items.map((item) => item.message).join('\n'); return { message: combinedMessage, mode, hash: targetModeHash, - isolate + isolate, + items + }; + } + + /** + * Remove a previously peeked batch only if it is still at the front. + * Returns false when the queue changed, for example after reset(). + */ + consumePeekedBatch(batch: PeekedMessageBatch): boolean { + for (let index = 0; index < batch.items.length; index += 1) { + if (this.queue[index] !== batch.items[index]) { + return false; + } + } + this.queue.splice(0, batch.items.length); + return true; + } + + private collectBatch(): MessageBatch | null { + const batch = this.peekMessagesAndGetAsString(); + if (!batch) { + return null; + } + this.consumePeekedBatch(batch); + return { + message: batch.message, + mode: batch.mode, + hash: batch.hash, + isolate: batch.isolate }; }