From c5476bc0273688469649ee3ed0a86d228f10776e Mon Sep 17 00:00:00 2001
From: mikemolinet
Date: Wed, 15 Apr 2026 16:13:31 -0700
Subject: [PATCH] fix: emit content_part.done and populate
 output_text.done.text per Responses API spec

The /v1/responses streaming handler violates the OpenAI Responses API
SSE lifecycle spec in two ways:

1. response.content_part.done is never emitted. Per the spec
   (https://platform.openai.com/docs/api-reference/responses-streaming),
   the event sequence for a text content part should be:
   content_part.added -> output_text.delta* -> output_text.done ->
   content_part.done -> output_item.done

2. response.output_text.done is emitted with text: "" instead of the
   accumulated output text. The spec requires the final content.

Accumulate delta tokens in a local variable at the streaming call site,
emit the missing response.content_part.done event with the accumulated
text in part.text, and populate output_text.done.text with the same
accumulated content. Gate the new content_part.done event on at least
one delta having been received, keeping the content-part added/done
lifecycle symmetric.

Adds one regression test in index.test.js that asserts:
- output_text.done.text equals the accumulated deltas
- content_part.done event is present with part.text populated
- correct ordering (output_text.done < content_part.done < output_item.done)

Closes #48
---
 index.js      | 19 ++++++++++++-
 index.test.js | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 95 insertions(+), 1 deletion(-)

diff --git a/index.js b/index.js
index e3532e9..a50cbdf 100644
--- a/index.js
+++ b/index.js
@@ -907,6 +907,9 @@ export function createProxyFetchHandler(client) {
   )
 
   let partIndex = 0
+  // Accumulate delta tokens so we can populate `text` on output_text.done and content_part.done per the
+  // OpenAI Responses API SSE spec (https://platform.openai.com/docs/api-reference/responses-streaming).
+  let accumulatedText = ""
   const runPromise = executePromptStreaming(
     client,
     model,
@@ -925,6 +928,7 @@ export function createProxyFetchHandler(client) {
           )
           partIndex++
         }
+        accumulatedText += delta
         queue.enqueue(
           sseEvent("response.output_text.delta", {
             type: "response.output_text.delta",
@@ -943,9 +947,22 @@ export function createProxyFetchHandler(client) {
       item_id: itemID,
       output_index: 0,
       content_index: 0,
-      text: "",
+      text: accumulatedText,
     }),
   )
+  if (partIndex > 0) {
+    // Only emit content_part.done if content_part.added was emitted (i.e. at least one delta arrived).
+    // Keeps the content-part lifecycle symmetric per the OpenAI Responses API spec.
+    queue.enqueue(
+      sseEvent("response.content_part.done", {
+        type: "response.content_part.done",
+        item_id: itemID,
+        output_index: 0,
+        content_index: 0,
+        part: { type: "output_text", text: accumulatedText, annotations: [] },
+      }),
+    )
+  }
   queue.enqueue(
     sseEvent("response.output_item.done", {
       type: "response.output_item.done",
diff --git a/index.test.js b/index.test.js
index f663ea0..b55b403 100644
--- a/index.test.js
+++ b/index.test.js
@@ -79,6 +79,21 @@ function createStreamingClient(chunks) {
   }
 }
 
+function parseSseStream(text) {
+  // Parses SSE `event: \ndata: \n\n` chunks into an ordered array.
+  // Local to this test file; not exported.
+  return text
+    .split("\n\n")
+    .filter((block) => block.trim())
+    .map((block) => {
+      const eventLine = block.match(/^event: (.+)$/m)
+      const dataLine = block.match(/^data: (.+)$/m)
+      if (!eventLine || !dataLine) return null
+      return { event: eventLine[1], data: JSON.parse(dataLine[1]) }
+    })
+    .filter(Boolean)
+}
+
 test("OPTIONS preflight returns CORS headers", async () => {
   const handler = createProxyFetchHandler(createClient())
   const request = new Request("http://127.0.0.1:4010/v1/models", {
@@ -1107,6 +1122,68 @@ test("POST /v1/responses stream: true returns SSE lifecycle events", async () =>
   assert.ok(text.includes("response.completed"))
 })
 
+test("POST /v1/responses stream: true emits content_part.done with accumulated text per OpenAI spec", async () => {
+  const events = [
+    {
+      type: "message.part.updated",
+      properties: {
+        part: { sessionID: "sess-123", type: "text" },
+        delta: "The answer",
+      },
+    },
+    {
+      type: "message.part.updated",
+      properties: {
+        part: { sessionID: "sess-123", type: "text" },
+        delta: " is 42.",
+      },
+    },
+    { type: "session.idle", properties: { sessionID: "sess-123" } },
+  ]
+
+  const handler = createProxyFetchHandler(createStreamingClient(events))
+  const request = new Request("http://127.0.0.1:4010/v1/responses", {
+    method: "POST",
+    headers: { "content-type": "application/json" },
+    body: JSON.stringify({
+      model: "gpt-4o",
+      stream: true,
+      input: "What is 6 times 7?",
+    }),
+  })
+
+  const response = await handler(request)
+  const text = await response.text()
+  const parsed = parseSseStream(text)
+  const names = parsed.map((e) => e.event)
+
+  // Discriminator 1 (gap #3): output_text.done.text must be the accumulated content
+  const outputTextDone = parsed.find((e) => e.event === "response.output_text.done")
+  assert.ok(outputTextDone, "response.output_text.done event must be present")
+  assert.equal(outputTextDone.data.text, "The answer is 42.")
+
+  // Discriminator 2 (gap #2): content_part.done must be present with populated part.text
+  const contentPartDone = parsed.find((e) => e.event === "response.content_part.done")
+  assert.ok(contentPartDone, "response.content_part.done event must be present")
+  assert.equal(contentPartDone.data.part.type, "output_text")
+  assert.equal(contentPartDone.data.part.text, "The answer is 42.")
+  assert.deepEqual(contentPartDone.data.part.annotations, [])
+
+  // Ordering: output_text.done -> content_part.done -> output_item.done
+  const idxOutputTextDone = names.indexOf("response.output_text.done")
+  const idxContentPartDone = names.indexOf("response.content_part.done")
+  const idxOutputItemDone = names.indexOf("response.output_item.done")
+  assert.ok(idxOutputTextDone >= 0, "output_text.done must be in the stream")
+  assert.ok(
+    idxContentPartDone > idxOutputTextDone,
+    "content_part.done must follow output_text.done",
+  )
+  assert.ok(
+    idxOutputItemDone > idxContentPartDone,
+    "output_item.done must follow content_part.done",
+  )
+})
+
 test("POST /v1/responses stream: true with session.error emits response.failed", async () => {
   const events = [
     {