-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(local): forward Lambda response streaming through start-api #9028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,10 @@ | |
| from samcli.local.apigw.path_converter import PathConverter | ||
| from samcli.local.apigw.route import Route | ||
| from samcli.local.apigw.service_error_responses import ServiceErrorResponses | ||
| from samcli.local.apigw.streaming_response import ( | ||
| build_streaming_flask_response, | ||
| is_streaming_response, | ||
| ) | ||
| from samcli.local.docker.exceptions import DockerContainerCreationFailedException | ||
| from samcli.local.events.api_event import ( | ||
| ContextHTTP, | ||
|
|
@@ -60,6 +64,12 @@ def to_url(self, value): | |
| class LocalApigwService(BaseLocalService): | ||
| _DEFAULT_PORT = 3000 | ||
| _DEFAULT_HOST = "127.0.0.1" | ||
| # Hard ceiling on the size of a buffered (non-streaming) Lambda | ||
| # response we are willing to materialize in memory. Slightly above | ||
| # AWS Lambda's documented 6 MB synchronous response payload limit so | ||
| # legitimate responses pass through unmodified, but stops a misbehaving | ||
| # or hung runtime from blowing the SAM CLI process up. | ||
| _BUFFERED_RESPONSE_BYTE_CAP = 8 * 1024 * 1024 | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -630,6 +640,140 @@ def _invoke_lambda_function( | |
|
|
||
| return lambda_response | ||
|
|
||
def _route_uses_response_stream(self, route: Route) -> bool:
    """
    Decide whether ``route`` should be served through the streaming
    invoke path.

    A route qualifies only when the function backing it declares
    ``FunctionUrlConfig.InvokeMode == RESPONSE_STREAM`` (compared
    case-insensitively). Every other outcome returns ``False`` so the
    caller stays on the historical buffered invoke path: no function
    name on the route, no ``provider`` attribute on the lambda runner,
    a lookup that raises or returns ``None``, a URL config that is not
    a dict, or an ``InvokeMode`` that is missing or not a string.
    """
    function_name = route.function_name
    if not function_name:
        return False

    function_provider = getattr(self.lambda_runner, "provider", None)
    if function_provider is None:
        return False

    try:
        resolved = function_provider.get(function_name)
    except Exception:  # pylint: disable=broad-except
        # Best effort: an unresolvable function simply does not stream.
        return False
    if resolved is None:
        return False

    # A missing or falsy config is treated the same as an empty dict.
    config = getattr(resolved, "function_url_config", None) or {}
    if not isinstance(config, dict):
        return False

    mode = config.get("InvokeMode")
    if not isinstance(mode, str):
        return False
    return mode.upper() == "RESPONSE_STREAM"
|
|
||
def _invoke_lambda_function_streaming(
    self,
    lambda_function_name: str,
    event: dict,
    tenant_id: Optional[str],
    cors_headers: Dict[str, str],
):
    """
    Streaming counterpart of :meth:`_invoke_lambda_function`. Only
    used for routes whose Lambda function opts in via
    ``FunctionUrlConfig.InvokeMode == RESPONSE_STREAM``.

    Issues the invoke through ``lambda_runner.invoke_streaming`` so
    the response body is exposed to us as it arrives. If
    :func:`is_streaming_response` reports a streaming response we
    forward the bytes to the client via
    :func:`build_streaming_flask_response`, which takes ownership of
    ``cleanup`` through its ``on_complete`` argument.

    Otherwise the function is misconfigured (it declares
    RESPONSE_STREAM but returned a buffered body). In that case the
    body is drained under a hard byte cap, normalized, and returned
    verbatim with the CORS headers and status 200. It is deliberately
    NOT parsed as an API Gateway proxy response.

    Parameters
    ----------
    lambda_function_name : str
        Name of the function to invoke.
    event : dict
        Event payload; serialized with ``json.dumps(..., sort_keys=True)``.
    tenant_id : Optional[str]
        Tenant id forwarded to the runner.
    cors_headers : Dict[str, str]
        Headers attached to both the streaming and the buffered response.

    Returns
    -------
    The response object built by :func:`build_streaming_flask_response`
    or by ``self.service_response``.

    Raises
    ------
    LambdaResponseParseException
        If the buffered body exceeds ``_BUFFERED_RESPONSE_BYTE_CAP`` or
        ``LambdaOutputParser.is_lambda_error_response`` flags it.
    """
    event_str = json.dumps(event, sort_keys=True)
    response, cleanup = self.lambda_runner.invoke_streaming(
        lambda_function_name, event_str, tenant_id=tenant_id, stderr=self.stderr
    )

    try:
        if is_streaming_response(response):
            LOG.debug("Lambda function returned a streaming response, forwarding chunks as they arrive")
            # On success cleanup is owned by the Flask response
            # lifecycle (handed over via on_complete).
            return build_streaming_flask_response(
                response,
                on_complete=cleanup,
                extra_headers=cors_headers,
            )
    except BaseException:
        # Either inspecting the response or building the streaming
        # Flask response failed (connection reset, RIE crash, malformed
        # body). No Flask response exists to trigger cleanup on close,
        # so run it explicitly; otherwise the resources held by the
        # invoke would be stranded.
        cleanup()
        raise

    # Buffered response: drain the body now so we can release the
    # container as soon as possible.
    #
    # We use iter_content with a hard byte cap instead of
    # response.content because the latter would happily allocate
    # gigabytes if a misconfigured (or hung) runtime kept writing,
    # and would block the container's concurrency slot for the
    # entire read.
    try:
        chunks: list = []
        received = 0
        for chunk in response.iter_content(chunk_size=64 * 1024):
            if not chunk:
                continue
            received += len(chunk)
            if received > self._BUFFERED_RESPONSE_BYTE_CAP:
                LOG.error(
                    "Buffered response from streaming-enabled Lambda %s exceeded %d bytes; aborting read",
                    lambda_function_name,
                    self._BUFFERED_RESPONSE_BYTE_CAP,
                )
                raise LambdaResponseParseException(
                    f"Lambda response exceeds {self._BUFFERED_RESPONSE_BYTE_CAP} byte cap"
                )
            chunks.append(chunk)
        body_bytes = b"".join(chunks)
    finally:
        # Always release the invoke's resources, including on the
        # cap-exceeded path.
        cleanup()

    # Preserve the historical behaviour of get_lambda_output() which
    # round-trips through json.dumps to normalize ordering / encoding.
    body_text: Union[str, bytes]
    try:
        body_text = json.dumps(json.loads(body_bytes), ensure_ascii=False)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both
        # ValueError subclasses, so this covers every non-JSON body.
        try:
            body_text = body_bytes.decode("utf-8")
        except UnicodeDecodeError:
            body_text = body_bytes  # type: ignore[assignment]

    # The error-response check only applies to text bodies; a body
    # that is not valid UTF-8 skips it.
    if isinstance(body_text, str) and LambdaOutputParser.is_lambda_error_response(body_text):
        raise LambdaResponseParseException

    # An opted-in route should always return a streaming response.
    # If we end up here the function is misconfigured (declares
    # RESPONSE_STREAM but returned a buffered body). Surface the
    # raw body verbatim with CORS headers; we deliberately do NOT
    # try to parse it as an API Gateway proxy response — callers
    # that need that should remove InvokeMode=RESPONSE_STREAM.
    LOG.warning(
        "Lambda function %s is declared with InvokeMode=RESPONSE_STREAM but did not stream; "
        "returning the raw body verbatim",
        lambda_function_name,
    )
    return self.service_response(
        body_text if isinstance(body_text, str) else body_text.decode("utf-8", errors="replace"),
        Headers(cors_headers),
        200,
    )
|
|
||
| def _request_handler(self, **kwargs): | ||
| """ | ||
| We handle all requests to the host:port. The general flow of handling a request is as follows | ||
|
|
@@ -735,11 +879,57 @@ def _request_handler(self, **kwargs): | |
| return auth_service_error | ||
|
|
||
| endpoint_service_error = None | ||
| try: | ||
| # Extract tenant-id from HTTP request header | ||
| tenant_id = request.headers.get("X-Amz-Tenant-Id") | ||
|
|
||
| # invoke the route's Lambda function | ||
| # Extract tenant-id from HTTP request header | ||
| tenant_id = request.headers.get("X-Amz-Tenant-Id") | ||
|
|
||
| # The streaming pipeline is opt-in per function via | ||
| # FunctionUrlConfig.InvokeMode == RESPONSE_STREAM, mirroring the | ||
| # contract Lambda Function URLs use in production. Functions that | ||
| # don't declare it keep going through the historical buffered | ||
| # invoke path untouched. | ||
| use_streaming = self._route_uses_response_stream(route) | ||
|
|
||
| if use_streaming: | ||
| streaming_response = None | ||
| try: | ||
| streaming_response = self._invoke_lambda_function_streaming( | ||
| route.function_name, route_lambda_event, tenant_id, cors_headers | ||
| ) | ||
| except TenantIdValidationError as e: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [ERROR_HANDLING] The new streaming branch's
Add an explicit handler before the catch-all to mirror the buffered path: except MissingFunctionNameException as ex:
endpoint_service_error = ServiceErrorResponses.lambda_failure_response(
f"Failed to execute endpoint. Got an invalid function name ({str(ex)})",
)
except LambdaResponseParseException:
endpoint_service_error = ServiceErrorResponses.lambda_body_failure_response()
except Exception as ex: # pylint: disable=broad-except
...
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in f2e262f. Added an explicit |
||
| endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e)) | ||
| except FunctionNotFound: | ||
| endpoint_service_error = ServiceErrorResponses.lambda_not_found_response() | ||
| except UnsupportedInlineCodeError: | ||
| endpoint_service_error = ServiceErrorResponses.not_implemented_locally( | ||
| "Inline code is not supported for sam local commands. Please write your code in a separate file." | ||
| ) | ||
| except DockerContainerCreationFailedException as ex: | ||
| endpoint_service_error = ServiceErrorResponses.container_creation_failed(ex.message) | ||
| except MissingFunctionNameException as ex: | ||
| endpoint_service_error = ServiceErrorResponses.lambda_failure_response( | ||
| f"Failed to execute endpoint. Got an invalid function name ({str(ex)})", | ||
| ) | ||
| except LambdaResponseParseException: | ||
| # Mirror the buffered path: a malformed / oversized | ||
| # buffered fallback body inside the streaming pipeline | ||
| # must surface as the same body-failure response and | ||
| # status code, not as a generic invoke failure. | ||
| endpoint_service_error = ServiceErrorResponses.lambda_body_failure_response() | ||
| except Exception as ex: # pylint: disable=broad-except | ||
| # Catch-all for unexpected failures in the streaming | ||
| # invoke (e.g. requests.ConnectionError from the upstream | ||
| # stream POST). Surface as a Lambda failure response | ||
| # instead of a generic 500 from Flask. | ||
| LOG.error("Failed to invoke streaming Lambda function: %s", ex, exc_info=True) | ||
| endpoint_service_error = ServiceErrorResponses.lambda_failure_response() | ||
|
|
||
| if endpoint_service_error: | ||
| return endpoint_service_error | ||
| return streaming_response | ||
|
|
||
| try: | ||
| # invoke the route's Lambda function (buffered path, unchanged) | ||
| lambda_response = self._invoke_lambda_function(route.function_name, route_lambda_event, tenant_id) | ||
| except TenantIdValidationError as e: | ||
| endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[RESOURCE_MANAGEMENT] -668
`response.content` on a `stream=True` response materializes the entire body in memory in one shot. For a buffered Lambda response that's the same memory profile as today, but on an unexpectedly-large body (e.g. a function that returns megabytes despite not being streaming-tagged) this now reads it all before the slot is released, blocking the container's concurrency slot for longer than necessary and pushing the Python process memory up. Consider whether you want a size cap here, or at least an explicit timeout — the streaming POST uses `timeout=(connect, None)`, so a hung non-streaming runtime that never closes the body would hold the slot indefinitely. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in d82da42. The buffered drain in
`_invoke_lambda_function_streaming` now uses `response.iter_content(chunk_size=64KiB)` against a hard `_BUFFERED_RESPONSE_BYTE_CAP = 8 MiB` (slightly above Lambda's 6 MB sync response limit). If a misconfigured runtime exceeds the cap we abort the read, log, and raise `LambdaResponseParseException` so the container slot is released and a user-facing failure response is returned. This path is also now only reached for routes that opted into `RESPONSE_STREAM` and accidentally returned a buffered body, so the blast radius is small.