Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/gambit-core/decks/openai/codex-sdk/codex_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type CodexEvent =
| { type: "thread.started"; thread_id?: unknown }
| {
type: "item.completed";
item?: { type?: unknown; text?: unknown };
item?: { id?: unknown; type?: unknown; text?: unknown };
}
| { type: string; [key: string]: unknown };

Expand Down Expand Up @@ -53,6 +53,9 @@ function parseCodexEvents(stdout: string): {
if (!item || typeof item !== "object") continue;
const rec = item as Record<string, unknown>;
if (rec.type !== "agent_message") continue;
if (typeof rec.id !== "string" || !rec.id.trim()) {
throw new Error("codex exec emitted agent_message without item.id");
}
if (typeof rec.text !== "string") continue;
const next = rec.text.trim();
if (next) assistantText = next;
Expand Down
2 changes: 1 addition & 1 deletion src/cli.codex_smoke.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ if [ -z "\${CODEX_ARGS_LOG:-}" ]; then
fi
printf '%s\n' "$@" > "$CODEX_ARGS_LOG"
echo '{"type":"thread.started","thread_id":"thread-smoke"}'
echo '{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}'
echo '{"type":"item.completed","item":{"id":"msg_1","type":"agent_message","text":"ok"}}'
echo '{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}'
`;
await Deno.writeTextFile(binPath, script);
Expand Down
84 changes: 73 additions & 11 deletions src/providers/codex.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals, assertThrows } from "@std/assert";
import { assertEquals, assertRejects, assertThrows } from "@std/assert";
import { createCodexProvider, parseCodexArgsForTest } from "./codex.ts";
import type { ProviderTraceEvent, SavedState } from "@bolt-foundry/gambit-core";

Expand All @@ -15,7 +15,7 @@ Deno.test("codex provider starts thread and resumes with saved thread id", async
? [
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "second reply" },
item: { id: "msg_2", type: "agent_message", text: "second reply" },
}),
JSON.stringify({
type: "turn.completed",
Expand All @@ -26,7 +26,7 @@ Deno.test("codex provider starts thread and resumes with saved thread id", async
JSON.stringify({ type: "thread.started", thread_id: threadId }),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "first reply" },
item: { id: "msg_1", type: "agent_message", text: "first reply" },
}),
JSON.stringify({
type: "turn.completed",
Expand Down Expand Up @@ -101,7 +101,11 @@ Deno.test("codex provider responses returns updatedState with thread metadata",
JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "response mode reply" },
item: {
id: "msg_1",
type: "agent_message",
text: "response mode reply",
},
}),
].join("\n"),
),
Expand Down Expand Up @@ -135,7 +139,11 @@ Deno.test("codex provider updatedState does not carry prior traces", async () =>
JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "response mode reply" },
item: {
id: "msg_1",
type: "agent_message",
text: "response mode reply",
},
}),
].join("\n"),
),
Expand Down Expand Up @@ -172,7 +180,11 @@ Deno.test("codex provider responses forwards request.params to codex args", asyn
JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "response mode reply" },
item: {
id: "msg_1",
type: "agent_message",
text: "response mode reply",
},
}),
].join("\n"),
),
Expand Down Expand Up @@ -210,7 +222,11 @@ Deno.test("codex provider responses forwards abort signal to command runner", as
JSON.stringify({ type: "thread.started", thread_id: "thread-rsp" }),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "response mode reply" },
item: {
id: "msg_1",
type: "agent_message",
text: "response mode reply",
},
}),
].join("\n"),
),
Expand Down Expand Up @@ -408,6 +424,52 @@ Deno.test("codex provider preserves multiple assistant message items in order",
);
});

Deno.test("codex provider requires assistant item ids from codex", async () => {
const provider = createCodexProvider({
runCommand: ({ onStdoutLine }) => {
const lines = [
JSON.stringify({
type: "item.delta",
item: { type: "agent_message", text: "draft " },
}),
JSON.stringify({
type: "item.delta",
item: { type: "agent_message", text: "reply" },
}),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "draft reply" },
}),
];
lines.forEach((line) => onStdoutLine?.(line));
return Promise.resolve({
success: true,
code: 0,
stdout: enc.encode(lines.join("\n")),
stderr: new Uint8Array(),
});
},
});

await assertRejects(
async () => {
await provider.responses?.({
request: {
model: "codex-cli/default",
stream: true,
input: [{
type: "message",
role: "user",
content: [{ type: "input_text", text: "hi" }],
}],
},
});
},
Error,
"missing required item.id",
);
});

Deno.test("codex provider streams completed-only assistant text once", async () => {
const streamedText: Array<string> = [];
const provider = createCodexProvider({
Expand Down Expand Up @@ -471,7 +533,7 @@ Deno.test("codex provider emits tool traces for mcp tool events", async () => {
}),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "done" },
item: { id: "msg_done", type: "agent_message", text: "done" },
}),
];
lines.forEach((line) => onStdoutLine?.(line));
Expand Down Expand Up @@ -544,7 +606,7 @@ Deno.test("codex provider emits tool traces for command execution events", async
}),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "done" },
item: { id: "msg_done", type: "agent_message", text: "done" },
}),
];
lines.forEach((line) => onStdoutLine?.(line));
Expand Down Expand Up @@ -630,7 +692,7 @@ Deno.test("codex provider emits in-progress tool results for command execution d
}),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "done" },
item: { id: "msg_done", type: "agent_message", text: "done" },
}),
];
lines.forEach((line) => onStdoutLine?.(line));
Expand Down Expand Up @@ -686,7 +748,7 @@ Deno.test("codex provider emits tool traces for file change events", async () =>
}),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "done" },
item: { id: "msg_done", type: "agent_message", text: "done" },
}),
];
lines.forEach((line) => onStdoutLine?.(line));
Expand Down
89 changes: 72 additions & 17 deletions src/providers/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,53 @@ type CodexAssistantStreamState = {
emittedTerminalAssistantItemIds: Set<string>;
};

function requireCodexAssistantItemId(input: {
payloadType: string;
record: Record<string, JSONValue>;
}): string {
const itemId = typeof input.record.id === "string"
? input.record.id.trim()
: "";
if (itemId) return itemId;
throw new Error(
`Codex ${input.payloadType} agent_message is missing required item.id.`,
);
}

function resolveCodexAssistantItemIdentity(input: {
payloadType: string;
record: Record<string, JSONValue>;
assistantState: Pick<
CodexAssistantStreamState,
"assistantOutputIndexByItemId"
>;
nextOutputIndexRef: { value: number };
}): {
itemId: string;
outputIndex: number;
} {
const itemId = requireCodexAssistantItemId({
payloadType: input.payloadType,
record: input.record,
});
const existing = input.assistantState.assistantOutputIndexByItemId.get(
itemId,
);
if (typeof existing === "number") {
return {
itemId,
outputIndex: existing,
};
}
const next = input.nextOutputIndexRef.value;
input.nextOutputIndexRef.value += 1;
input.assistantState.assistantOutputIndexByItemId.set(itemId, next);
return {
itemId,
outputIndex: next,
};
}

function emitCodexAssistantTextEvents(input: {
event: Record<string, JSONValue>;
emit: (event: Record<string, JSONValue>) => void;
Expand All @@ -464,19 +511,12 @@ function emitCodexAssistantTextEvents(input: {
const record = item as Record<string, JSONValue>;
if (record.type !== "agent_message") return;

const itemId = typeof record.id === "string" && record.id.trim().length > 0
? record.id.trim()
: `assistant_${input.nextOutputIndexRef.value}`;
const outputIndex = (() => {
const existing = input.assistantState.assistantOutputIndexByItemId.get(
itemId,
);
if (typeof existing === "number") return existing;
const next = input.nextOutputIndexRef.value;
input.nextOutputIndexRef.value += 1;
input.assistantState.assistantOutputIndexByItemId.set(itemId, next);
return next;
})();
const { itemId, outputIndex } = resolveCodexAssistantItemIdentity({
payloadType,
record,
assistantState: input.assistantState,
nextOutputIndexRef: input.nextOutputIndexRef,
});
const text = extractCodexItemText(record);
if (!text) return;

Expand Down Expand Up @@ -748,6 +788,13 @@ function parseCodexStdout(stdout: string): {
completionTokens: number;
totalTokens: number;
} | undefined;
const assistantState: CodexAssistantStreamState = {
streamedText: "",
sawAssistantTextStream: false,
assistantOutputIndexByItemId: new Map<string, number>(),
emittedTerminalAssistantItemIds: new Set<string>(),
};
const nextOutputIndexRef = { value: 0 };

for (const line of stdout.split(/\r?\n/)) {
const trimmed = line.trim();
Expand All @@ -767,10 +814,20 @@ function parseCodexStdout(stdout: string): {
continue;
}

if (parsed.type === "item.completed" || parsed.type === "item.done") {
if (
parsed.type === "item.delta" || parsed.type === "item.completed" ||
parsed.type === "item.done"
) {
const item = parsed.item as Record<string, unknown> | undefined;
if (!item || typeof item !== "object") continue;
if (item.type !== "agent_message") continue;
const { itemId } = resolveCodexAssistantItemIdentity({
payloadType: parsed.type,
record: item as Record<string, JSONValue>,
assistantState,
nextOutputIndexRef,
});
if (parsed.type === "item.delta") continue;
const content = typeof item.text === "string"
? item.text.trim()
: Array.isArray(item.content)
Expand All @@ -785,9 +842,7 @@ function parseCodexStdout(stdout: string): {
: "";
if (content) {
assistantMessages.push({
itemId: typeof item.id === "string" && item.id.trim().length > 0
? item.id.trim()
: null,
itemId,
text: content,
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/providers/provider_conformance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Deno.test("provider conformance: codex responses forwards abort signal", async (
JSON.stringify({ type: "thread.started", thread_id: "thread-1" }),
JSON.stringify({
type: "item.completed",
item: { type: "agent_message", text: "ok" },
item: { id: "msg_1", type: "agent_message", text: "ok" },
}),
].join("\n"),
),
Expand Down
Loading