Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions samcli/commands/local/lib/local_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,46 @@ def invoke(

return headers

def invoke_streaming(
self,
function_identifier: str,
event: str,
tenant_id: Optional[str] = None,
stderr: Optional[StreamWriter] = None,
override_runtime: Optional[str] = None,
function: Optional[Function] = None,
):
"""
Streaming counterpart of :meth:`invoke`.

Resolves the function, then asks the underlying LambdaRuntime to
perform a streaming invocation. Returns a tuple
``(response, cleanup)`` where ``response`` is the raw streaming
:class:`requests.Response` from the local Runtime Interface
Emulator (RIE) and ``cleanup`` is a callable the caller MUST run
when it has finished consuming the body.

This is used by the API Gateway local service to forward
Lambda response-streaming output (e.g. Server-Sent Events
produced by ``awslambda.streamifyResponse`` in Node.js) to the
browser without buffering.
"""
if not function:
function = self.get_function(function_identifier, tenant_id)
config = self.get_invoke_config(function, override_runtime)

return self.local_runtime.invoke_streaming(
config,
event,
tenant_id,
debug_context=self.debug_context,
stderr=stderr,
container_host=self.container_host,
container_host_interface=self.container_host_interface,
extra_hosts=self.extra_hosts,
container_dns=self.container_dns,
)

def is_debugging(self) -> bool:
"""
Are we debugging the invoke?
Expand Down
198 changes: 194 additions & 4 deletions samcli/local/apigw/local_apigw_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
from samcli.local.apigw.path_converter import PathConverter
from samcli.local.apigw.route import Route
from samcli.local.apigw.service_error_responses import ServiceErrorResponses
from samcli.local.apigw.streaming_response import (
build_streaming_flask_response,
is_streaming_response,
)
from samcli.local.docker.exceptions import DockerContainerCreationFailedException
from samcli.local.events.api_event import (
ContextHTTP,
Expand Down Expand Up @@ -60,6 +64,12 @@ def to_url(self, value):
class LocalApigwService(BaseLocalService):
_DEFAULT_PORT = 3000
_DEFAULT_HOST = "127.0.0.1"
# Hard ceiling on the size of a buffered (non-streaming) Lambda
# response we are willing to materialize in memory. Slightly above
# AWS Lambda's documented 6 MB synchronous response payload limit so
# legitimate responses pass through unmodified, but stops a misbehaving
# or hung runtime from blowing the SAM CLI process up.
_BUFFERED_RESPONSE_BYTE_CAP = 8 * 1024 * 1024

def __init__(
self,
Expand Down Expand Up @@ -630,6 +640,140 @@ def _invoke_lambda_function(

return lambda_response

def _route_uses_response_stream(self, route: Route) -> bool:
"""Return ``True`` if the function backing ``route`` opts into
Lambda response streaming via ``FunctionUrlConfig.InvokeMode ==
RESPONSE_STREAM``. Routes whose function does not declare it
(or whose Function can't be resolved) fall through to the
historical buffered invoke path."""
if not route.function_name:
return False
provider = getattr(self.lambda_runner, "provider", None)
if provider is None:
return False
try:
function = provider.get(route.function_name)
except Exception: # pylint: disable=broad-except
return False
if function is None:
return False
url_config = getattr(function, "function_url_config", None) or {}
invoke_mode = url_config.get("InvokeMode") if isinstance(url_config, dict) else None
return isinstance(invoke_mode, str) and invoke_mode.upper() == "RESPONSE_STREAM"

def _invoke_lambda_function_streaming(
self,
lambda_function_name: str,
event: dict,
tenant_id: Optional[str],
cors_headers: Dict[str, str],
):
"""
Streaming counterpart of :meth:`_invoke_lambda_function`. Only
used for routes whose Lambda function opts in via
``FunctionUrlConfig.InvokeMode == RESPONSE_STREAM``.

Issues the invoke against the local RIE with ``stream=True`` so
the response body is exposed to us as it arrives. If the runtime
advertises a streaming response (via the
``Lambda-Runtime-Function-Response-Mode`` header) we forward the
bytes through to the browser via
:func:`build_streaming_flask_response`. Otherwise we drain the
body, parse the standard API Gateway proxy JSON and return a
regular Flask response built from it — that case is rare for an
opted-in route (it would mean the function is misconfigured) but
keeps the behavior well-defined.
"""
event_str = json.dumps(event, sort_keys=True)
response, cleanup = self.lambda_runner.invoke_streaming(
lambda_function_name, event_str, tenant_id=tenant_id, stderr=self.stderr
)

if is_streaming_response(response):
LOG.debug("Lambda function returned a streaming response, forwarding chunks as they arrive")
try:
flask_response = build_streaming_flask_response(
response,
on_complete=cleanup,
extra_headers=cors_headers,
)
except BaseException:
# build_streaming_flask_response synchronously iterates the
# upstream body to parse the http-integration JSON prelude.
# If that read fails (connection reset, RIE crash, malformed
# body) the Flask response is never built, ``call_on_close``
# never fires, and the cleanup chain would leak the timeout
# timer, the container's concurrency slot, the open
# requests.Response, and (for cold runtimes) the container
# itself. Run cleanup explicitly here so a construction-time
# failure does not strand resources.
cleanup()
raise
# cleanup is now owned by the Flask response lifecycle
# (Response.call_on_close inside build_streaming_flask_response).
return flask_response

# Buffered response: drain the body now so we can release the
# container as soon as possible, then route through the existing
# API Gateway parser.
#
# We use iter_content with a hard byte cap instead of
# response.content because the latter would happily allocate
# gigabytes if a misconfigured (or hung) runtime kept writing,
# and would block the container's concurrency slot for the
# entire read.
try:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[RESOURCE_MANAGEMENT] -668

try:
   body_bytes = response.content
finally:
   cleanup()

response.content on a stream=True response materializes the entire body in memory in one shot. For a buffered Lambda response that's the same memory profile as today, but on an unexpectedly-large body (e.g. a function that returns megabytes despite not being streaming-tagged) this now reads it all before the slot is released, blocking the container's concurrency slot for longer than necessary and pushing the Python process memory up. Consider whether you want a size cap here, or at least an explicit timeout — the streaming POST uses timeout=(connect, None), so a hung non-streaming runtime that never closes the body would hold the slot indefinitely.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in d82da42. The buffered drain in _invoke_lambda_function_streaming now uses response.iter_content(chunk_size=64KiB) against a hard _BUFFERED_RESPONSE_BYTE_CAP = 8 MiB (slightly above Lambda's 6 MB sync response limit). If a misconfigured runtime exceeds the cap we abort the read, log, and raise LambdaResponseParseException so the container slot is released and a user-facing failure response is returned. This path is also now only reached for routes that opted into RESPONSE_STREAM and accidentally returned a buffered body, so the blast radius is small.

chunks: list = []
received = 0
for chunk in response.iter_content(chunk_size=64 * 1024):
if not chunk:
continue
received += len(chunk)
if received > self._BUFFERED_RESPONSE_BYTE_CAP:
LOG.error(
"Buffered response from streaming-enabled Lambda %s exceeded %d bytes; aborting read",
lambda_function_name,
self._BUFFERED_RESPONSE_BYTE_CAP,
)
raise LambdaResponseParseException(
f"Lambda response exceeds {self._BUFFERED_RESPONSE_BYTE_CAP} byte cap"
)
chunks.append(chunk)
body_bytes = b"".join(chunks)
finally:
cleanup()

# Preserve the historical behaviour of get_lambda_output() which
# round-trips through json.dumps to normalize ordering / encoding.
body_text: Union[str, bytes]
try:
body_text = json.dumps(json.loads(body_bytes), ensure_ascii=False)
except (ValueError, json.JSONDecodeError):
try:
body_text = body_bytes.decode("utf-8")
except UnicodeDecodeError:
body_text = body_bytes # type: ignore[assignment]

if isinstance(body_text, str) and LambdaOutputParser.is_lambda_error_response(body_text):
raise LambdaResponseParseException

# An opted-in route should always return a streaming response.
# If we end up here the function is misconfigured (declares
# RESPONSE_STREAM but returned a buffered body). Surface the
# raw body verbatim with CORS headers; we deliberately do NOT
# try to parse it as an API Gateway proxy response — callers
# that need that should remove InvokeMode=RESPONSE_STREAM.
LOG.warning(
"Lambda function %s is declared with InvokeMode=RESPONSE_STREAM but did not stream; "
"returning the raw body verbatim",
lambda_function_name,
)
return self.service_response(
body_text if isinstance(body_text, str) else body_text.decode("utf-8", errors="replace"),
Headers(cors_headers),
200,
)

def _request_handler(self, **kwargs):
"""
We handle all requests to the host:port. The general flow of handling a request is as follows
Expand Down Expand Up @@ -735,11 +879,57 @@ def _request_handler(self, **kwargs):
return auth_service_error

endpoint_service_error = None
try:
# Extract tenant-id from HTTP request header
tenant_id = request.headers.get("X-Amz-Tenant-Id")

# invoke the route's Lambda function
# Extract tenant-id from HTTP request header
tenant_id = request.headers.get("X-Amz-Tenant-Id")

# The streaming pipeline is opt-in per function via
# FunctionUrlConfig.InvokeMode == RESPONSE_STREAM, mirroring the
# contract Lambda Function URLs use in production. Functions that
# don't declare it keep going through the historical buffered
# invoke path untouched.
use_streaming = self._route_uses_response_stream(route)

if use_streaming:
streaming_response = None
try:
streaming_response = self._invoke_lambda_function_streaming(
route.function_name, route_lambda_event, tenant_id, cors_headers
)
except TenantIdValidationError as e:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ERROR_HANDLING] The new streaming branch's try/except catches TenantIdValidationError, FunctionNotFound, UnsupportedInlineCodeError, DockerContainerCreationFailedException, MissingFunctionNameException, and a catch-all Exception — but not LambdaResponseParseException. The buffered path right below it does catch it explicitly and returns ServiceErrorResponses.lambda_body_failure_response() (HTTP 500).

_invoke_lambda_function_streaming raises LambdaResponseParseException in two places it added in this PR — when the buffered drain trips _BUFFERED_RESPONSE_BYTE_CAP (line 722) and when the body looks like a Lambda error envelope (line 742). On streaming-enabled routes those failures will fall into except Exception and surface as lambda_failure_response() (HTTP 502), while the same condition on a non-streaming route surfaces as HTTP 500. Same root cause, different status code and log message depending purely on whether the route opted into InvokeMode=RESPONSE_STREAM.

Add an explicit handler before the catch-all to mirror the buffered path:

except MissingFunctionNameException as ex:
   endpoint_service_error = ServiceErrorResponses.lambda_failure_response(
       f"Failed to execute endpoint. Got an invalid function name ({str(ex)})",
   )
except LambdaResponseParseException:
   endpoint_service_error = ServiceErrorResponses.lambda_body_failure_response()
except Exception as ex:  # pylint: disable=broad-except
   ...

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f2e262f. Added an explicit except LambdaResponseParseException clause before the catch-all in the streaming branch of _request_handler, returning ServiceErrorResponses.lambda_body_failure_response() (HTTP 500). This matches the buffered path exactly, so the _BUFFERED_RESPONSE_BYTE_CAP violation and the Lambda-error-envelope detection in _invoke_lambda_function_streaming now produce the same HTTP status regardless of whether the route opted into InvokeMode=RESPONSE_STREAM.

endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e))
except FunctionNotFound:
endpoint_service_error = ServiceErrorResponses.lambda_not_found_response()
except UnsupportedInlineCodeError:
endpoint_service_error = ServiceErrorResponses.not_implemented_locally(
"Inline code is not supported for sam local commands. Please write your code in a separate file."
)
except DockerContainerCreationFailedException as ex:
endpoint_service_error = ServiceErrorResponses.container_creation_failed(ex.message)
except MissingFunctionNameException as ex:
endpoint_service_error = ServiceErrorResponses.lambda_failure_response(
f"Failed to execute endpoint. Got an invalid function name ({str(ex)})",
)
except LambdaResponseParseException:
# Mirror the buffered path: a malformed / oversized
# buffered fallback body inside the streaming pipeline
# must surface as the same body-failure response and
# status code, not as a generic invoke failure.
endpoint_service_error = ServiceErrorResponses.lambda_body_failure_response()
except Exception as ex: # pylint: disable=broad-except
# Catch-all for unexpected failures in the streaming
# invoke (e.g. requests.ConnectionError from the upstream
# stream POST). Surface as a Lambda failure response
# instead of a generic 500 from Flask.
LOG.error("Failed to invoke streaming Lambda function: %s", ex, exc_info=True)
endpoint_service_error = ServiceErrorResponses.lambda_failure_response()

if endpoint_service_error:
return endpoint_service_error
return streaming_response

try:
# invoke the route's Lambda function (buffered path, unchanged)
lambda_response = self._invoke_lambda_function(route.function_name, route_lambda_event, tenant_id)
except TenantIdValidationError as e:
endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e))
Expand Down
Loading