diff --git a/packages/gambit-core/decks/openai/codex-sdk/codex_client.ts b/packages/gambit-core/decks/openai/codex-sdk/codex_client.ts index 39e4c49a..7e78366c 100644 --- a/packages/gambit-core/decks/openai/codex-sdk/codex_client.ts +++ b/packages/gambit-core/decks/openai/codex-sdk/codex_client.ts @@ -13,7 +13,7 @@ type CodexEvent = | { type: "thread.started"; thread_id?: unknown } | { type: "item.completed"; - item?: { type?: unknown; text?: unknown }; + item?: { id?: unknown; type?: unknown; text?: unknown }; } | { type: string; [key: string]: unknown }; @@ -53,6 +53,9 @@ function parseCodexEvents(stdout: string): { if (!item || typeof item !== "object") continue; const rec = item as Record; if (rec.type !== "agent_message") continue; + if (typeof rec.id !== "string" || !rec.id.trim()) { + throw new Error("codex exec emitted agent_message without item.id"); + } if (typeof rec.text !== "string") continue; const next = rec.text.trim(); if (next) assistantText = next; diff --git a/src/cli.codex_smoke.test.ts b/src/cli.codex_smoke.test.ts index 10a0c8e1..f3dd32b5 100644 --- a/src/cli.codex_smoke.test.ts +++ b/src/cli.codex_smoke.test.ts @@ -97,7 +97,7 @@ if [ -z "\${CODEX_ARGS_LOG:-}" ]; then fi printf '%s\n' "$@" > "$CODEX_ARGS_LOG" echo '{"type":"thread.started","thread_id":"thread-smoke"}' -echo '{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}' +echo '{"type":"item.completed","item":{"id":"msg_1","type":"agent_message","text":"ok"}}' echo '{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}' `; await Deno.writeTextFile(binPath, script); diff --git a/src/providers/codex.test.ts b/src/providers/codex.test.ts index 1b01c195..b35698a0 100644 --- a/src/providers/codex.test.ts +++ b/src/providers/codex.test.ts @@ -1,4 +1,4 @@ -import { assertEquals, assertThrows } from "@std/assert"; +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; import { createCodexProvider, parseCodexArgsForTest } from "./codex.ts"; import type { ProviderTraceEvent, SavedState } from "@bolt-foundry/gambit-core"; @@ -15,7 +15,7 @@ Deno.test("codex provider starts thread and resumes with saved thread id", async ? [ JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "second reply" }, + item: { id: "msg_2", type: "agent_message", text: "second reply" }, }), JSON.stringify({ type: "turn.completed", @@ -26,7 +26,7 @@ Deno.test("codex provider starts thread and resumes with saved thread id", async JSON.stringify({ type: "thread.started", thread_id: threadId }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "first reply" }, + item: { id: "msg_1", type: "agent_message", text: "first reply" }, }), JSON.stringify({ type: "turn.completed", @@ -101,7 +101,11 @@ Deno.test("codex provider responses returns updatedState with thread metadata", JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "response mode reply" }, + item: { + id: "msg_1", + type: "agent_message", + text: "response mode reply", + }, }), ].join("\n"), ), @@ -135,7 +139,11 @@ Deno.test("codex provider updatedState does not carry prior traces", async () => JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "response mode reply" }, + item: { + id: "msg_1", + type: "agent_message", + text: "response mode reply", + }, }), ].join("\n"), ), @@ -172,7 +180,11 @@ Deno.test("codex provider responses forwards request.params to codex args", asyn JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "response mode reply" }, + item: { + id: "msg_1", + type: "agent_message", + text: "response mode reply", + }, }), ].join("\n"), ), @@ -210,7 +222,11 @@ Deno.test("codex provider responses forwards abort signal to command runner", as JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "response mode reply" }, + item: { + id: "msg_1", + type: "agent_message", + text: "response mode reply", + }, }), ].join("\n"), ), @@ -408,6 +424,52 @@ Deno.test("codex provider preserves multiple assistant message items in order", ); }); +Deno.test("codex provider requires assistant item ids from codex", async () => { + const provider = createCodexProvider({ + runCommand: ({ onStdoutLine }) => { + const lines = [ + JSON.stringify({ + type: "item.delta", + item: { type: "agent_message", text: "draft " }, + }), + JSON.stringify({ + type: "item.delta", + item: { type: "agent_message", text: "reply" }, + }), + JSON.stringify({ + type: "item.completed", + item: { type: "agent_message", text: "draft reply" }, + }), + ]; + lines.forEach((line) => onStdoutLine?.(line)); + return Promise.resolve({ + success: true, + code: 0, + stdout: enc.encode(lines.join("\n")), + stderr: new Uint8Array(), + }); + }, + }); + + await assertRejects( + async () => { + await provider.responses?.({ + request: { + model: "codex-cli/default", + stream: true, + input: [{ + type: "message", + role: "user", + content: [{ type: "input_text", text: "hi" }], + }], + }, + }); + }, + Error, + "missing required item.id", + ); +}); + Deno.test("codex provider streams completed-only assistant text once", async () => { const streamedText: Array = []; const provider = createCodexProvider({ @@ -471,7 +533,7 @@ Deno.test("codex provider emits tool traces for mcp tool events", async () => { }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "done" }, + item: { id: "msg_done", type: "agent_message", text: "done" }, }), ]; lines.forEach((line) => onStdoutLine?.(line)); @@ -544,7 +606,7 @@ Deno.test("codex provider emits tool traces for command execution events", async }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "done" }, + item: { id: "msg_done", type: "agent_message", text: "done" }, }), ]; lines.forEach((line) => onStdoutLine?.(line)); @@ -630,7 +692,7 @@ Deno.test("codex provider emits in-progress tool results for command execution d }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "done" }, + item: { id: "msg_done", type: "agent_message", text: "done" }, }), ]; lines.forEach((line) => onStdoutLine?.(line)); @@ -686,7 +748,7 @@ Deno.test("codex provider emits tool traces for file change events", async () => }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "done" }, + item: { id: "msg_done", type: "agent_message", text: "done" }, }), ]; lines.forEach((line) => onStdoutLine?.(line)); diff --git a/src/providers/codex.ts b/src/providers/codex.ts index ade66f3f..06ebc27d 100644 --- a/src/providers/codex.ts +++ b/src/providers/codex.ts @@ -448,6 +448,53 @@ type CodexAssistantStreamState = { emittedTerminalAssistantItemIds: Set; }; +function requireCodexAssistantItemId(input: { + payloadType: string; + record: Record; +}): string { + const itemId = typeof input.record.id === "string" + ? input.record.id.trim() + : ""; + if (itemId) return itemId; + throw new Error( + `Codex ${input.payloadType} agent_message is missing required item.id.`, + ); +} + +function resolveCodexAssistantItemIdentity(input: { + payloadType: string; + record: Record; + assistantState: Pick< + CodexAssistantStreamState, + "assistantOutputIndexByItemId" + >; + nextOutputIndexRef: { value: number }; +}): { + itemId: string; + outputIndex: number; +} { + const itemId = requireCodexAssistantItemId({ + payloadType: input.payloadType, + record: input.record, + }); + const existing = input.assistantState.assistantOutputIndexByItemId.get( + itemId, + ); + if (typeof existing === "number") { + return { + itemId, + outputIndex: existing, + }; + } + const next = input.nextOutputIndexRef.value; + input.nextOutputIndexRef.value += 1; + input.assistantState.assistantOutputIndexByItemId.set(itemId, next); + return { + itemId, + outputIndex: next, + }; +} + function emitCodexAssistantTextEvents(input: { event: Record; emit: (event: Record) => void; @@ -464,19 +511,12 @@ function emitCodexAssistantTextEvents(input: { const record = item as Record; if (record.type !== "agent_message") return; - const itemId = typeof record.id === "string" && record.id.trim().length > 0 - ? record.id.trim() - : `assistant_${input.nextOutputIndexRef.value}`; - const outputIndex = (() => { - const existing = input.assistantState.assistantOutputIndexByItemId.get( - itemId, - ); - if (typeof existing === "number") return existing; - const next = input.nextOutputIndexRef.value; - input.nextOutputIndexRef.value += 1; - input.assistantState.assistantOutputIndexByItemId.set(itemId, next); - return next; - })(); + const { itemId, outputIndex } = resolveCodexAssistantItemIdentity({ + payloadType, + record, + assistantState: input.assistantState, + nextOutputIndexRef: input.nextOutputIndexRef, + }); const text = extractCodexItemText(record); if (!text) return; @@ -748,6 +788,13 @@ function parseCodexStdout(stdout: string): { completionTokens: number; totalTokens: number; } | undefined; + const assistantState: CodexAssistantStreamState = { + streamedText: "", + sawAssistantTextStream: false, + assistantOutputIndexByItemId: new Map(), + emittedTerminalAssistantItemIds: new Set(), + }; + const nextOutputIndexRef = { value: 0 }; for (const line of stdout.split(/\r?\n/)) { const trimmed = line.trim(); @@ -767,10 +814,20 @@ function parseCodexStdout(stdout: string): { continue; } - if (parsed.type === "item.completed" || parsed.type === "item.done") { + if ( + parsed.type === "item.delta" || parsed.type === "item.completed" || + parsed.type === "item.done" + ) { const item = parsed.item as Record | undefined; if (!item || typeof item !== "object") continue; if (item.type !== "agent_message") continue; + const { itemId } = resolveCodexAssistantItemIdentity({ + payloadType: parsed.type, + record: item as Record, + assistantState, + nextOutputIndexRef, + }); + if (parsed.type === "item.delta") continue; const content = typeof item.text === "string" ? item.text.trim() : Array.isArray(item.content) @@ -785,9 +842,7 @@ function parseCodexStdout(stdout: string): { : ""; if (content) { assistantMessages.push({ - itemId: typeof item.id === "string" && item.id.trim().length > 0 - ? item.id.trim() - : null, + itemId, text: content, }); } diff --git a/src/providers/provider_conformance.test.ts b/src/providers/provider_conformance.test.ts index 969dead1..689184d6 100644 --- a/src/providers/provider_conformance.test.ts +++ b/src/providers/provider_conformance.test.ts @@ -170,7 +170,7 @@ Deno.test("provider conformance: codex responses forwards abort signal", async ( JSON.stringify({ type: "thread.started", thread_id: "thread-1" }), JSON.stringify({ type: "item.completed", - item: { type: "agent_message", text: "ok" }, + item: { id: "msg_1", type: "agent_message", text: "ok" }, }), ].join("\n"), ),