Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,9 @@ export function createProxyFetchHandler(client) {
)

let partIndex = 0
// Accumulate delta tokens so we can populate `text` on output_text.done and content_part.done per the
// OpenAI Responses API SSE spec (https://platform.openai.com/docs/api-reference/responses-streaming).
let accumulatedText = ""
const runPromise = executePromptStreaming(
client,
model,
Expand All @@ -925,6 +928,7 @@ export function createProxyFetchHandler(client) {
)
partIndex++
}
accumulatedText += delta
queue.enqueue(
sseEvent("response.output_text.delta", {
type: "response.output_text.delta",
Expand All @@ -943,9 +947,22 @@ export function createProxyFetchHandler(client) {
item_id: itemID,
output_index: 0,
content_index: 0,
text: "",
text: accumulatedText,
}),
)
if (partIndex > 0) {
// Only emit content_part.done if content_part.added was emitted (i.e. at least one delta arrived).
// Keeps the content-part lifecycle symmetric per the OpenAI Responses API spec.
queue.enqueue(
sseEvent("response.content_part.done", {
type: "response.content_part.done",
item_id: itemID,
output_index: 0,
content_index: 0,
part: { type: "output_text", text: accumulatedText, annotations: [] },
}),
)
}
queue.enqueue(
sseEvent("response.output_item.done", {
type: "response.output_item.done",
Expand Down
77 changes: 77 additions & 0 deletions index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,21 @@ function createStreamingClient(chunks) {
}
}

function parseSseStream(text) {
  // Splits a raw SSE payload into an ordered list of { event, data } records.
  // Each record is an `event: <name>\ndata: <json>\n\n` block; blocks missing
  // either field (or consisting only of whitespace) are dropped.
  // Local to this test file; not exported.
  const records = []
  for (const block of text.split("\n\n")) {
    if (!block.trim()) continue
    const eventMatch = block.match(/^event: (.+)$/m)
    const dataMatch = block.match(/^data: (.+)$/m)
    if (eventMatch && dataMatch) {
      records.push({ event: eventMatch[1], data: JSON.parse(dataMatch[1]) })
    }
  }
  return records
}

test("OPTIONS preflight returns CORS headers", async () => {
const handler = createProxyFetchHandler(createClient())
const request = new Request("http://127.0.0.1:4010/v1/models", {
Expand Down Expand Up @@ -1107,6 +1122,68 @@ test("POST /v1/responses stream: true returns SSE lifecycle events", async () =>
assert.ok(text.includes("response.completed"))
})

test("POST /v1/responses stream: true emits content_part.done with accumulated text per OpenAI spec", async () => {
  // Two text deltas followed by session.idle; the proxy must stitch the deltas
  // back together for the *.done events.
  const streamedEvents = [
    {
      type: "message.part.updated",
      properties: {
        part: { sessionID: "sess-123", type: "text" },
        delta: "The answer",
      },
    },
    {
      type: "message.part.updated",
      properties: {
        part: { sessionID: "sess-123", type: "text" },
        delta: " is 42.",
      },
    },
    { type: "session.idle", properties: { sessionID: "sess-123" } },
  ]

  const handler = createProxyFetchHandler(createStreamingClient(streamedEvents))
  const response = await handler(
    new Request("http://127.0.0.1:4010/v1/responses", {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        model: "gpt-4o",
        stream: true,
        input: "What is 6 times 7?",
      }),
    }),
  )

  const parsed = parseSseStream(await response.text())
  const names = parsed.map((entry) => entry.event)

  // Discriminator 1 (gap #3): output_text.done.text must be the accumulated content
  const outputTextDone = parsed.find(
    (entry) => entry.event === "response.output_text.done",
  )
  assert.ok(outputTextDone, "response.output_text.done event must be present")
  assert.equal(outputTextDone.data.text, "The answer is 42.")

  // Discriminator 2 (gap #2): content_part.done must be present with populated part.text
  const contentPartDone = parsed.find(
    (entry) => entry.event === "response.content_part.done",
  )
  assert.ok(contentPartDone, "response.content_part.done event must be present")
  assert.equal(contentPartDone.data.part.type, "output_text")
  assert.equal(contentPartDone.data.part.text, "The answer is 42.")
  assert.deepEqual(contentPartDone.data.part.annotations, [])

  // Ordering: output_text.done -> content_part.done -> output_item.done
  const idxOutputTextDone = names.indexOf("response.output_text.done")
  const idxContentPartDone = names.indexOf("response.content_part.done")
  const idxOutputItemDone = names.indexOf("response.output_item.done")
  assert.ok(idxOutputTextDone >= 0, "output_text.done must be in the stream")
  assert.ok(
    idxContentPartDone > idxOutputTextDone,
    "content_part.done must follow output_text.done",
  )
  assert.ok(
    idxOutputItemDone > idxContentPartDone,
    "output_item.done must follow content_part.done",
  )
})

test("POST /v1/responses stream: true with session.error emits response.failed", async () => {
const events = [
{
Expand Down