Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 105 additions & 20 deletions internal/lambda/rapidcore/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"math"
"net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -288,6 +289,46 @@ func (s *Server) SetInternalStateGetter(cb interop.InternalStateGetter) {
s.InternalStateGetter = cb
}

// streamingCopy copies src to dst in chunks of at most 4 KiB and, when dst
// implements http.Flusher, calls Flush after every successful Write so that
// each chunk is pushed downstream as soon as it has been read instead of
// being held back until end-of-stream. It is the pass-through used for
// streaming responses (for example SSE frames the runtime emits one at a
// time).
//
// A hand-written loop is used instead of io.Copy because io.Copy never
// flushes the destination between writes.
//
// It returns the number of bytes accepted by dst and the first error
// encountered. io.EOF from src marks the normal end of the stream and is
// reported as a nil error. A Write that accepts fewer bytes than offered
// without reporting an error yields io.ErrShortWrite. A Write that reports an
// impossible count (negative, or larger than the chunk it was given) is
// treated as having written nothing and yields an error, so a misbehaving
// writer can neither inflate the returned total nor drive it negative.
func streamingCopy(dst io.Writer, src io.Reader) (int64, error) {
	const bufSize = 4096
	buf := make([]byte, bufSize)
	// Resolved once: a nil flusher simply means dst cannot be flushed.
	flusher, _ := dst.(http.Flusher)
	var total int64
	for {
		nr, rerr := src.Read(buf)
		// Per the io.Reader contract, consume the nr bytes before looking
		// at rerr: the final chunk may arrive together with io.EOF.
		if nr > 0 {
			nw, werr := dst.Write(buf[:nr])
			if nw < 0 || nw > nr {
				// The writer violated the io.Writer contract; do not trust
				// the count it reported.
				if werr == nil {
					werr = fmt.Errorf("invalid write result: %d bytes reported for a %d-byte chunk", nw, nr)
				}
				nw = 0
			}
			total += int64(nw)
			if werr != nil {
				return total, werr
			}
			if nw < nr {
				return total, io.ErrShortWrite
			}
			if flusher != nil {
				flusher.Flush()
			}
		}
		if rerr == io.EOF {
			return total, nil
		}
		if rerr != nil {
			return total, rerr
		}
	}
}

func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[string]string, payload io.Reader, trailers http.Header, request *interop.CancellableRequest, runtimeCalledResponse bool) error {
if s.invokeCtx == nil || invokeID != s.invokeCtx.Token.InvokeID {
return interop.ErrInvalidInvokeID
Expand All @@ -309,37 +350,81 @@ func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[strin
reportedErr = err
}
} else {
data, err := io.ReadAll(payload)
if err != nil {
return fmt.Errorf("Failed to read response on %s: %s", invokeID, err)
// Determine whether the runtime told us this is a streaming
// response. Streaming runtimes (e.g. Node.js awslambda.streamifyResponse)
// post chunks of bytes over time and rely on the platform to forward
// each chunk to the caller as it arrives. The header lives in
// additionalHeaders because the runtime API handler
// (rapi/handler/invocationresponse.go) parses it from the runtime's
// POST /response request.
// The runtime API handler stores the response mode the runtime sent
// (e.g. Node.js "streamifyResponse" sends `streaming`, lowercase),
// while the interop constant has the capitalized form `Streaming`.
// Compare case-insensitively so we recognize streaming responses
// regardless of which casing the runtime used.
functionResponseMode := additionalHeaders[directinvoke.FunctionResponseModeHeader]
isStreaming := strings.EqualFold(functionResponseMode, string(interop.FunctionResponseModeStreaming))

startReadingResponseMonoTimeMs := metering.Monotime()

// Set Content-Type before any byte is written so the reply stream
// can commit headers on the first Write.
if ct, ok := additionalHeaders[directinvoke.ContentTypeHeader]; ok && ct != "" {
s.invokeCtx.ReplyStream.Header().Add(directinvoke.ContentTypeHeader, ct)
}
if len(data) > interop.MaxPayloadSize {
return &interop.ErrorResponseTooLarge{
ResponseSize: len(data),
MaxResponseSize: interop.MaxPayloadSize,
if isStreaming {
// Advertise the function response mode to the caller (SAM CLI)
// so it can switch into streaming pass-through itself.
s.invokeCtx.ReplyStream.Header().Add(directinvoke.FunctionResponseModeHeader, functionResponseMode)
}

var written int64
var copyErr error
if isStreaming {
// Pump bytes from the runtime's /response request body straight
// through to the caller. A small buffer is used so even tiny
// chunks (e.g. SSE frames a few dozen bytes long) flow with
// minimal latency. Each Write on ReplyStream is flushed by the
// rie.ResponseWriterProxy when an http.Flusher is available.
//
// Streaming responses do not enforce interop.MaxPayloadSize:
// Lambda response streaming is explicitly designed for payloads
// larger than 6MB and for unbounded SSE-style streams.
written, copyErr = streamingCopy(s.invokeCtx.ReplyStream, payload)
} else {
// Buffered legacy path: collect the whole body, enforce the
// 6MB response cap, and emit it in one Write.
data, err := io.ReadAll(payload)
if err != nil {
return fmt.Errorf("Failed to read response on %s: %s", invokeID, err)
}
if len(data) > interop.MaxPayloadSize {
return &interop.ErrorResponseTooLarge{
ResponseSize: len(data),
MaxResponseSize: interop.MaxPayloadSize,
}
}
n, err := s.invokeCtx.ReplyStream.Write(data)
written = int64(n)
copyErr = err
}
if copyErr != nil {
return fmt.Errorf("Failed to write response to %s: %s", invokeID, copyErr)
}

startReadingResponseMonoTimeMs := metering.Monotime()
s.invokeCtx.ReplyStream.Header().Add(directinvoke.ContentTypeHeader, additionalHeaders[directinvoke.ContentTypeHeader])
written, err := s.invokeCtx.ReplyStream.Write(data)
if err != nil {
return fmt.Errorf("Failed to write response to %s: %s", invokeID, err)
responseMode := interop.FunctionResponseModeBuffered
if isStreaming {
responseMode = interop.FunctionResponseModeStreaming
}

s.sendResponseChan <- &interop.InvokeResponseMetrics{
ProducedBytes: int64(written),
ProducedBytes: written,
StartReadingResponseMonoTimeMs: startReadingResponseMonoTimeMs,
FinishReadingResponseMonoTimeMs: metering.Monotime(),
TimeShapedNs: int64(-1),
OutboundThroughputBps: int64(-1),
// FIXME:
// The runtime tells whether the function response mode is streaming or not.
// Ideally, we would want to use that value here. Since I'm just rebasing, I will leave
// as-is, but we should use that instead of relying on our memory to set this here
// because we "know" it's a streaming code path.
FunctionResponseMode: interop.FunctionResponseModeBuffered,
RuntimeCalledResponse: runtimeCalledResponse,
FunctionResponseMode: responseMode,
RuntimeCalledResponse: runtimeCalledResponse,
}
}

Expand Down
23 changes: 21 additions & 2 deletions internal/lambda/rie/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,12 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i
}
fmt.Println("START RequestId: " + invokePayload.ID + " Version: " + functionVersion)

// If we write to 'w' directly and waitUntilRelease fails, we won't be able to propagate error anymore
invokeResp := &ResponseWriterProxy{}
// We forward writes to 'w' through the proxy so that response chunks
// produced by the runtime (Lambda response streaming / SSE) are flushed
// to the caller as soon as they arrive. Headers and status are staged
// inside the proxy and only committed to 'w' on its first Write, so the
// error code paths below can still emit a synthetic error response as
// long as invokeResp.Started is false.
invokeResp := &ResponseWriterProxy{Underlying: w}
if err := sandbox.Invoke(invokeResp, invokePayload); err != nil {
switch err {

Expand All @@ -165,6 +169,11 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i
w.WriteHeader(http.StatusInternalServerError)
return
case rapidcore.ErrInitDoneFailed:
if invokeResp.Started {
// Streaming response was already partially emitted; the
// connection is the only signal we can give the caller now.
return
}
w.WriteHeader(http.StatusBadGateway)
w.Write(invokeResp.Body)
return
Expand All @@ -188,6 +197,9 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i
return
// AwaitRelease errors:
case rapidcore.ErrInvokeDoneFailed:
if invokeResp.Started {
return
}
w.WriteHeader(http.StatusBadGateway)
w.Write(invokeResp.Body)
return
Expand All @@ -208,6 +220,13 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i

printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration)

// If streaming has already started, the runtime's /response body has
// been forwarded chunk-by-chunk to the caller through invokeResp; do
// not attempt to (re)write headers/status nor double-emit the body.
if invokeResp.Started {
return
}

if invokeResp.StatusCode != 0 {
w.WriteHeader(invokeResp.StatusCode)
}
Expand Down
92 changes: 89 additions & 3 deletions internal/lambda/rie/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,93 @@ func (t ErrorType) String() string {
return fmt.Sprintf("Cannot stringify standalone.ErrorType.%d", int(t))
}

// ResponseWriterProxy is the http.ResponseWriter wrapper used between the
// rapidcore Server and the real caller-facing connection.
//
// Historically it acted as a tiny buffer: it captured a single status code and
// a single body slab so that, after the invoke returned, the rie.InvokeHandler
// could decide whether to forward the captured response or to overwrite it
// with an error of its own. That design forced the runtime's /response body to
// be fully read before any byte was written to the caller, which made
// Lambda response streaming (e.g. Server-Sent Events) impossible.
//
// The proxy can now optionally hold an Underlying http.ResponseWriter. When
// it is set, the first Write copies the staged headers and status onto the
// underlying writer, and every Write is forwarded straight to it and followed
// by a Flush if the writer implements http.Flusher. Error paths that fire
// BEFORE any data has been written (Started == false) can therefore still
// emit a synthetic response on the real writer. Once Started is true, the
// rie.InvokeHandler must not attempt to set headers or status again on the
// underlying writer.
type ResponseWriterProxy struct {
// Body holds the bytes captured by Write when there is no Underlying
// writer to forward to. It is kept for backward compatibility with the
// error code paths in rie.InvokeHandler that re-emit invokeResp.Body on
// failure. When Underlying is set, Write forwards the bytes instead and
// nothing is accumulated here, which avoids unbounded memory growth.
Body []byte
// StatusCode is the staged HTTP status. Zero means "not set"; a non-zero
// value is sent to Underlying via WriteHeader on the first Write.
// NOTE(review): presumably assigned by WriteHeader — confirm.
StatusCode int

// Underlying, when non-nil, is the real http.ResponseWriter we forward
// writes to. Setting this enables true response streaming: every Write
// passes straight through and triggers a Flush when supported.
Underlying http.ResponseWriter

// headers accumulates Header().Add(...) calls until the first Write.
// On the first Write they are copied onto Underlying.Header().
headers http.Header

// Started is set to true by the first Write that is forwarded to
// Underlying. Once true, headers and status are locked.
Started bool
}

// Header returns the header map that callers should mutate.
//
// Before streaming has started (or when there is no Underlying writer) this
// is a map owned by the proxy, created lazily on first use and kept across
// calls so that headers added now are still present when Write later
// promotes them. Once Started is true and an Underlying writer exists, the
// underlying writer's own header map is returned instead.
func (w *ResponseWriterProxy) Header() http.Header {
	if w.Started && w.Underlying != nil {
		return w.Underlying.Header()
	}
	if w.headers == nil {
		w.headers = make(http.Header)
	}
	return w.headers
}

// Write implements io.Writer / http.ResponseWriter.
//
// With an Underlying writer, the first call promotes the staged headers and
// the staged status code (if non-zero) onto it and marks the proxy as
// Started. Every call then forwards b to the underlying writer and flushes
// it when it implements http.Flusher, so that callers (e.g. browsers
// consuming SSE) see each chunk as soon as the runtime emits it. The byte
// count and error of the underlying Write are returned unchanged.
//
// Without an Underlying writer, b is appended to Body and len(b) is
// returned, so callers that treat a short write as an error keep working.
func (w *ResponseWriterProxy) Write(b []byte) (int, error) {
	if w.Underlying != nil {
		if !w.Started {
			// Promote staged headers + status onto the underlying writer.
			// Headers must be in place before WriteHeader/Write commits them.
			if w.headers != nil {
				underlyingHeaders := w.Underlying.Header()
				for k, vs := range w.headers {
					for _, v := range vs {
						underlyingHeaders.Add(k, v)
					}
				}
			}
			if w.StatusCode != 0 {
				w.Underlying.WriteHeader(w.StatusCode)
			}
			w.Started = true
		}
		n, err := w.Underlying.Write(b)
		if f, ok := w.Underlying.(http.Flusher); ok {
			f.Flush()
		}
		return n, err
	}

	// No streaming target: buffer everything. The bytes are copied (append)
	// rather than aliased, because the caller may reuse b after Write returns.
	w.Body = append(w.Body, b...)
	return len(b), nil
}

func (w *ResponseWriterProxy) WriteHeader(statusCode int) {
Expand All @@ -43,3 +118,14 @@ func (w *ResponseWriterProxy) WriteHeader(statusCode int) {
// IsError reports whether a status code has been recorded and that code is
// outside the 2xx class. A zero StatusCode (nothing recorded) is not an error.
func (w *ResponseWriterProxy) IsError() bool {
	code := w.StatusCode
	if code == 0 {
		return false
	}
	statusClass := code / 100
	return statusClass != 2
}

// Flush implements http.Flusher by delegating to Underlying when that writer
// supports flushing. It does nothing when there is no Underlying writer or
// when the underlying writer is not an http.Flusher.
func (w *ResponseWriterProxy) Flush() {
	// The comma-ok assertion is false for a nil interface value, so a
	// missing Underlying needs no separate check.
	flusher, canFlush := w.Underlying.(http.Flusher)
	if !canFlush {
		return
	}
	flusher.Flush()
}