From 1e2a98956cb69cf1d8b9cf7bee87786ed6f401ca Mon Sep 17 00:00:00 2001 From: Alexis de Treglode Date: Fri, 15 May 2026 11:12:59 -0700 Subject: [PATCH] #175 feat: support Lambda response streaming end-to-end The non-direct invoke path used by the local Runtime Interface Emulator (the one SAM CLI talks to over /2015-03-31/functions/function/invocations) was buffering the runtime's /response body via io.ReadAll before writing a single slab back to the caller, and the rie.ResponseWriterProxy was storing that slab in a []byte. Both layers had to drain before the caller saw any byte, which made Server-Sent Events and other Lambda response-streaming workloads impossible to test locally. This change wires the real http.ResponseWriter all the way down: * rie.ResponseWriterProxy gains an Underlying http.ResponseWriter and becomes a streaming pass-through that copies staged headers/status on the first Write, then forwards every Write to the underlying writer and Flushes after each one. The body buffer is still kept for the pre-stream error paths in InvokeHandler. * rapidcore.Server.sendResponseUnsafe now detects streaming responses via additionalHeaders[Lambda-Runtime-Function-Response-Mode] (matched case-insensitively, since the runtime sends "streaming" and the interop constant is "Streaming") and uses a new streamingCopy helper that pipes the runtime's POST body through to the reply stream with a tiny 4KiB buffer and an explicit Flush after every Write. The buffered (legacy) branch is preserved and continues to enforce interop.MaxPayloadSize for non-streaming responses. * rie.InvokeHandler creates the proxy with the real ResponseWriter and skips the trailing WriteHeader/Write if streaming has already begun. Error code paths that would emit a synthetic body are gated on the new Started flag so we never try to overwrite an in-flight stream. 
End-to-end test: a Node.js handler using awslambda.streamifyResponse that emits one SSE frame per second for 10 seconds now reaches an EventSource client (and curl -N) one frame at a time, with the JSON HTTP-integration prelude correctly forwarded to the caller via the Content-Type and Lambda-Runtime-Function-Response-Mode headers. All existing unit tests under internal/lambda/... still pass. Co-authored-by: Cursor --- internal/lambda/rapidcore/server.go | 125 +++++++++++++++++++++++----- internal/lambda/rie/handlers.go | 23 ++++- internal/lambda/rie/util.go | 92 +++++++++++++++++++- 3 files changed, 215 insertions(+), 25 deletions(-) diff --git a/internal/lambda/rapidcore/server.go b/internal/lambda/rapidcore/server.go index b82d8276..b1bfa108 100644 --- a/internal/lambda/rapidcore/server.go +++ b/internal/lambda/rapidcore/server.go @@ -11,6 +11,7 @@ import ( "io" "math" "net/http" + "strings" "sync" "time" @@ -288,6 +289,46 @@ func (s *Server) SetInternalStateGetter(cb interop.InternalStateGetter) { s.InternalStateGetter = cb } +// streamingCopy pumps bytes from src to dst with a small intermediate buffer +// and an explicit Flush after every Write. This is the heart of the +// response-streaming pass-through used by the local Runtime Interface +// Emulator: every chunk the runtime writes to its /response HTTP body is +// promptly forwarded to the caller (e.g. SAM CLI proxy → browser) instead of +// being buffered until end-of-stream. +// +// The buffer is intentionally tiny so SSE frames (often only tens of bytes) +// are not coalesced into larger reads when the runtime flushes one event at +// a time. io.Copy's default 32KiB buffer would, in combination with TCP-level +// merging, defeat that goal. 
+func streamingCopy(dst io.Writer, src io.Reader) (int64, error) { + const bufSize = 4096 + buf := make([]byte, bufSize) + flusher, _ := dst.(http.Flusher) + var total int64 + for { + nr, rerr := src.Read(buf) + if nr > 0 { + nw, werr := dst.Write(buf[:nr]) + total += int64(nw) + if werr != nil { + return total, werr + } + if nw < nr { + return total, io.ErrShortWrite + } + if flusher != nil { + flusher.Flush() + } + } + if rerr == io.EOF { + return total, nil + } + if rerr != nil { + return total, rerr + } + } +} + func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[string]string, payload io.Reader, trailers http.Header, request *interop.CancellableRequest, runtimeCalledResponse bool) error { if s.invokeCtx == nil || invokeID != s.invokeCtx.Token.InvokeID { return interop.ErrInvalidInvokeID @@ -309,37 +350,81 @@ func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[strin reportedErr = err } } else { - data, err := io.ReadAll(payload) - if err != nil { - return fmt.Errorf("Failed to read response on %s: %s", invokeID, err) + // Determine whether the runtime told us this is a streaming + // response. Streaming runtimes (e.g. Node.js awslambda.streamifyResponse) + // post chunks of bytes over time and rely on the platform to forward + // each chunk to the caller as it arrives. The header lives in + // additionalHeaders because the runtime API handler + // (rapi/handler/invocationresponse.go) parses it from the runtime's + // POST /response request. + // The runtime API handler stores the response mode the runtime sent + // (e.g. Node.js "streamifyResponse" sends `streaming`, lowercase), + // while the interop constant has the capitalized form `Streaming`. + // Compare case-insensitively so we recognize streaming responses + // regardless of which casing the runtime used. 
+ functionResponseMode := additionalHeaders[directinvoke.FunctionResponseModeHeader] + isStreaming := strings.EqualFold(functionResponseMode, string(interop.FunctionResponseModeStreaming)) + + startReadingResponseMonoTimeMs := metering.Monotime() + + // Set Content-Type before any byte is written so the reply stream + // can commit headers on the first Write. + if ct, ok := additionalHeaders[directinvoke.ContentTypeHeader]; ok && ct != "" { + s.invokeCtx.ReplyStream.Header().Add(directinvoke.ContentTypeHeader, ct) } - if len(data) > interop.MaxPayloadSize { - return &interop.ErrorResponseTooLarge{ - ResponseSize: len(data), - MaxResponseSize: interop.MaxPayloadSize, + if isStreaming { + // Advertise the function response mode to the caller (SAM CLI) + // so it can switch into streaming pass-through itself. + s.invokeCtx.ReplyStream.Header().Add(directinvoke.FunctionResponseModeHeader, functionResponseMode) + } + + var written int64 + var copyErr error + if isStreaming { + // Pump bytes from the runtime's /response request body straight + // through to the caller. A small buffer is used so even tiny + // chunks (e.g. SSE frames a few dozen bytes long) flow with + // minimal latency. Each Write on ReplyStream is flushed by the + // rie.ResponseWriterProxy when an http.Flusher is available. + // + // Streaming responses do not enforce interop.MaxPayloadSize: + // Lambda response streaming is explicitly designed for payloads + // larger than 6MB and for unbounded SSE-style streams. + written, copyErr = streamingCopy(s.invokeCtx.ReplyStream, payload) + } else { + // Buffered legacy path: collect the whole body, enforce the + // 6MB response cap, and emit it in one Write. 
+ data, err := io.ReadAll(payload) + if err != nil { + return fmt.Errorf("Failed to read response on %s: %s", invokeID, err) } + if len(data) > interop.MaxPayloadSize { + return &interop.ErrorResponseTooLarge{ + ResponseSize: len(data), + MaxResponseSize: interop.MaxPayloadSize, + } + } + n, err := s.invokeCtx.ReplyStream.Write(data) + written = int64(n) + copyErr = err + } + if copyErr != nil { + return fmt.Errorf("Failed to write response to %s: %s", invokeID, copyErr) } - startReadingResponseMonoTimeMs := metering.Monotime() - s.invokeCtx.ReplyStream.Header().Add(directinvoke.ContentTypeHeader, additionalHeaders[directinvoke.ContentTypeHeader]) - written, err := s.invokeCtx.ReplyStream.Write(data) - if err != nil { - return fmt.Errorf("Failed to write response to %s: %s", invokeID, err) + responseMode := interop.FunctionResponseModeBuffered + if isStreaming { + responseMode = interop.FunctionResponseModeStreaming } s.sendResponseChan <- &interop.InvokeResponseMetrics{ - ProducedBytes: int64(written), + ProducedBytes: written, StartReadingResponseMonoTimeMs: startReadingResponseMonoTimeMs, FinishReadingResponseMonoTimeMs: metering.Monotime(), TimeShapedNs: int64(-1), OutboundThroughputBps: int64(-1), - // FIXME: - // The runtime tells whether the function response mode is streaming or not. - // Ideally, we would want to use that value here. Since I'm just rebasing, I will leave - // as-is, but we should use that instead of relying on our memory to set this here - // because we "know" it's a streaming code path. 
- FunctionResponseMode: interop.FunctionResponseModeBuffered, - RuntimeCalledResponse: runtimeCalledResponse, + FunctionResponseMode: responseMode, + RuntimeCalledResponse: runtimeCalledResponse, } } diff --git a/internal/lambda/rie/handlers.go b/internal/lambda/rie/handlers.go index 1abe1541..3b92ada7 100644 --- a/internal/lambda/rie/handlers.go +++ b/internal/lambda/rie/handlers.go @@ -151,8 +151,12 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i } fmt.Println("START RequestId: " + invokePayload.ID + " Version: " + functionVersion) - // If we write to 'w' directly and waitUntilRelease fails, we won't be able to propagate error anymore - invokeResp := &ResponseWriterProxy{} + // We forward writes to 'w' through the proxy so that response chunks + // produced by the runtime (Lambda response streaming / SSE) are flushed + // to the caller as soon as they arrive. The proxy still keeps a copy of + // the body it has not committed yet so that error code paths below can + // emit a synthetic error response when streaming has not started. + invokeResp := &ResponseWriterProxy{Underlying: w} if err := sandbox.Invoke(invokeResp, invokePayload); err != nil { switch err { @@ -165,6 +169,11 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i w.WriteHeader(http.StatusInternalServerError) return case rapidcore.ErrInitDoneFailed: + if invokeResp.Started { + // Streaming response was already partially emitted; the + // connection is the only signal we can give the caller now. 
+ return + } w.WriteHeader(http.StatusBadGateway) w.Write(invokeResp.Body) return @@ -188,6 +197,9 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i return // AwaitRelease errors: case rapidcore.ErrInvokeDoneFailed: + if invokeResp.Started { + return + } w.WriteHeader(http.StatusBadGateway) w.Write(invokeResp.Body) return @@ -208,6 +220,13 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration) + // If streaming has already started, the runtime's /response body has + // been forwarded chunk-by-chunk to the caller through invokeResp; do + // not attempt to (re)write headers/status nor double-emit the body. + if invokeResp.Started { + return + } + if invokeResp.StatusCode != 0 { w.WriteHeader(invokeResp.StatusCode) } diff --git a/internal/lambda/rie/util.go b/internal/lambda/rie/util.go index f7592c2b..863878e2 100644 --- a/internal/lambda/rie/util.go +++ b/internal/lambda/rie/util.go @@ -22,18 +22,93 @@ func (t ErrorType) String() string { return fmt.Sprintf("Cannot stringify standalone.ErrorType.%d", int(t)) } +// ResponseWriterProxy is the http.ResponseWriter wrapper used between the +// rapidcore Server and the real caller-facing connection. +// +// Historically it acted as a tiny buffer: it captured a single status code and +// a single body slab so that, after the invoke returned, the rie.InvokeHandler +// could decide whether to forward the captured response or to overwrite it +// with an error of its own. That design forced the runtime's /response body to +// be fully read before any byte was written to the caller, which made +// Lambda response streaming (e.g. Server-Sent Events) impossible. +// +// The proxy can now optionally hold an Underlying http.ResponseWriter. 
When +// set, writes are forwarded straight to the underlying writer and immediately +// flushed if the writer implements http.Flusher, while still keeping a copy of +// the headers/status that we accumulate before the first write so we can +// commit them on demand. Error paths that fire BEFORE any data has been +// streamed (Started == false) keep their existing behavior of being able to +// write a synthetic response. Once Started is true, the rie.InvokeHandler +// must not attempt to set headers or status again on the underlying writer. type ResponseWriterProxy struct { + // Body keeps a copy of any bytes written while no streaming target is + // attached (Underlying == nil). It is preserved + // for backward compatibility with error code paths in rie.InvokeHandler + // that re-emit invokeResp.Body on failure. Once Underlying is set, Write + // forwards bytes without accumulating them, avoiding unbounded memory growth. Body []byte StatusCode int + + // Underlying, when non-nil, is the real http.ResponseWriter we forward + // writes to. Setting this enables true response streaming: every Write + // passes straight through and triggers a Flush. + Underlying http.ResponseWriter + + // headers accumulates Header().Add(...) calls until the first Write. + // On the first Write they are copied onto Underlying.Header(). + headers http.Header + + // Started is set to true after the first Write that has been committed + // to Underlying. Once true, headers and status are locked. + Started bool } +// Header returns the staged headers map. When streaming has not yet started +// these are local to the proxy; once Started, they are merged onto the real +// writer's header map. 
func (w *ResponseWriterProxy) Header() http.Header { - return http.Header{} + if w.Started && w.Underlying != nil { + return w.Underlying.Header() + } + if w.headers == nil { + w.headers = make(http.Header) + } + return w.headers } +// Write forwards bytes to the underlying writer when available, flushing +// after each chunk so that callers (browsers consuming SSE) see chunks as +// soon as the runtime emits them. func (w *ResponseWriterProxy) Write(b []byte) (int, error) { - w.Body = b - return 0, nil + if w.Underlying != nil { + if !w.Started { + // Promote staged headers + status onto the underlying writer. + if w.headers != nil { + underlyingHeaders := w.Underlying.Header() + for k, vs := range w.headers { + for _, v := range vs { + underlyingHeaders.Add(k, v) + } + } + } + if w.StatusCode != 0 { + w.Underlying.WriteHeader(w.StatusCode) + } + w.Started = true + } + n, err := w.Underlying.Write(b) + if f, ok := w.Underlying.(http.Flusher); ok { + f.Flush() + } + return n, err + } + + // No streaming target: behave like the original buffer-everything proxy + // (note: the original returned (0, nil) which is technically a violation + // of io.Writer; we return len(b) to be correct, since callers like + // io.Copy interpret a short write as an error). + w.Body = append(w.Body, b...) + return len(b), nil } func (w *ResponseWriterProxy) WriteHeader(statusCode int) { @@ -43,3 +118,14 @@ func (w *ResponseWriterProxy) WriteHeader(statusCode int) { func (w *ResponseWriterProxy) IsError() bool { return w.StatusCode != 0 && w.StatusCode/100 != 2 } + +// Flush implements http.Flusher so that callers performing io.Copy on us +// (or wrapping us) can drive intermediate flushes too. +func (w *ResponseWriterProxy) Flush() { + if w.Underlying == nil { + return + } + if f, ok := w.Underlying.(http.Flusher); ok { + f.Flush() + } +}