From e179959bd8f1f0e5869a11fe73581297d920695a Mon Sep 17 00:00:00 2001 From: DizzyMii Date: Fri, 29 May 2026 12:00:03 -0600 Subject: [PATCH] fix(adapter-anthropic): flush trailing SSE buffer on stream close parseSSE only emitted events split on a blank line, so a final event such as message_stop that is not followed by a trailing "\n\n" was left in the buffer and dropped, so the terminal usage/end chunks were never yielded. Flush the remaining buffer after the read loop, matching the OpenAI-compat adapter. --- .changeset/fix-anthropic-sse-flush.md | 5 +++ packages/adapter-anthropic/src/index.ts | 17 +++++++++ .../adapter-anthropic/test/surface.test.ts | 36 +++++++++++++++++++ 3 files changed, 58 insertions(+) create mode 100644 .changeset/fix-anthropic-sse-flush.md diff --git a/.changeset/fix-anthropic-sse-flush.md b/.changeset/fix-anthropic-sse-flush.md new file mode 100644 index 0000000..2bd4f52 --- /dev/null +++ b/.changeset/fix-anthropic-sse-flush.md @@ -0,0 +1,5 @@ +--- +"@flint/adapter-anthropic": patch +--- + +Flush the trailing SSE buffer when the stream closes. The Anthropic adapter's `parseSSE` only emitted events split on `\n\n`, so a final event (such as `message_stop`) that arrives without a trailing blank line was left in the buffer and silently dropped — preventing the terminal `usage`/`end` chunks from being yielded. This mirrors the flush already performed by the OpenAI-compat adapter. diff --git a/packages/adapter-anthropic/src/index.ts b/packages/adapter-anthropic/src/index.ts index 9680e97..3a33cd2 100644 --- a/packages/adapter-anthropic/src/index.ts +++ b/packages/adapter-anthropic/src/index.ts @@ -288,6 +288,23 @@ async function* parseSSE( } } } + + // Flush any remaining buffer content after the stream closes. A final event + // (e.g. message_stop) may not be terminated by a trailing "\n\n". + if (buffer.trim()) { + let event = ''; + let data = ''; + for (const line of buffer.split('\n')) { + if (line.startsWith('event: ')) { + event = line.slice(7).trim(); + } else if (line.startsWith('data: ')) { + data = line.slice(6); + } + } + if (event && data) { + yield { event, data }; + } + } } finally { reader.releaseLock(); } diff --git a/packages/adapter-anthropic/test/surface.test.ts b/packages/adapter-anthropic/test/surface.test.ts index 008f2e5..ccd0c85 100644 --- a/packages/adapter-anthropic/test/surface.test.ts +++ b/packages/adapter-anthropic/test/surface.test.ts @@ -624,6 +624,42 @@ describe('anthropicAdapter — stream()', () => { expect(endChunk?.reason).toBe('end'); }); + it('emits the final event when the stream is not terminated by a blank line', async () => { + // Drop the trailing "\n\n" so message_stop sits in the buffer at stream close. + const sse = sseBody( + { + event: 'message_start', + data: { message: { usage: { input_tokens: 15, output_tokens: 0 } } }, + }, + { event: 'content_block_start', data: { index: 0, content_block: { type: 'text' } } }, + { + event: 'content_block_delta', + data: { index: 0, delta: { type: 'text_delta', text: 'Hi' } }, + }, + { event: 'content_block_stop', data: { index: 0 } }, + { + event: 'message_delta', + data: { delta: { stop_reason: 'end_turn' }, usage: { output_tokens: 3 } }, + }, + { event: 'message_stop', data: {} }, + ).replace(/\n\n$/, ''); + + const a = anthropicAdapter({ apiKey: 'test', fetch: mockFetch(sse) }); + const chunks: StreamChunk[] = []; + for await (const chunk of a.stream({ + model: 'claude-3-5-haiku-20241022', + messages: [{ role: 'user', content: 'Hi' }], + })) { + chunks.push(chunk); + } + + const endChunk = chunks.find((c) => c.type === 'end') as + | { type: 'end'; reason: string } + | undefined; + expect(endChunk).toBeDefined(); + expect(endChunk?.reason).toBe('end'); + }); + it('throws AdapterError on HTTP error response (stream)', async () => { const a = anthropicAdapter({ apiKey: 'test',