diff --git a/samcli/commands/local/lib/local_lambda.py b/samcli/commands/local/lib/local_lambda.py index 25720aab27b..1bd809f6cd4 100644 --- a/samcli/commands/local/lib/local_lambda.py +++ b/samcli/commands/local/lib/local_lambda.py @@ -271,6 +271,46 @@ def invoke( return headers + def invoke_streaming( + self, + function_identifier: str, + event: str, + tenant_id: Optional[str] = None, + stderr: Optional[StreamWriter] = None, + override_runtime: Optional[str] = None, + function: Optional[Function] = None, + ): + """ + Streaming counterpart of :meth:`invoke`. + + Resolves the function, then asks the underlying LambdaRuntime to + perform a streaming invocation. Returns a tuple + ``(response, cleanup)`` where ``response`` is the raw streaming + :class:`requests.Response` from the local Runtime Interface + Emulator (RIE) and ``cleanup`` is a callable the caller MUST run + when it has finished consuming the body. + + This is used by the API Gateway local service to forward + Lambda response-streaming output (e.g. Server-Sent Events + produced by ``awslambda.streamifyResponse`` in Node.js) to the + browser without buffering. + """ + if not function: + function = self.get_function(function_identifier, tenant_id) + config = self.get_invoke_config(function, override_runtime) + + return self.local_runtime.invoke_streaming( + config, + event, + tenant_id, + debug_context=self.debug_context, + stderr=stderr, + container_host=self.container_host, + container_host_interface=self.container_host_interface, + extra_hosts=self.extra_hosts, + container_dns=self.container_dns, + ) + def is_debugging(self) -> bool: """ Are we debugging the invoke? diff --git a/samcli/local/apigw/local_apigw_service.py b/samcli/local/apigw/local_apigw_service.py index 9a4d5c89840..110fd426aee 100644 --- a/samcli/local/apigw/local_apigw_service.py +++ b/samcli/local/apigw/local_apigw_service.py @@ -32,6 +32,10 @@ from samcli.local.apigw.path_converter import PathConverter from samcli.local.apigw.route import Route from samcli.local.apigw.service_error_responses import ServiceErrorResponses +from samcli.local.apigw.streaming_response import ( + build_streaming_flask_response, + is_streaming_response, +) from samcli.local.docker.exceptions import DockerContainerCreationFailedException from samcli.local.events.api_event import ( ContextHTTP, @@ -60,6 +64,12 @@ def to_url(self, value): class LocalApigwService(BaseLocalService): _DEFAULT_PORT = 3000 _DEFAULT_HOST = "127.0.0.1" + # Hard ceiling on the size of a buffered (non-streaming) Lambda + # response we are willing to materialize in memory. Slightly above + # AWS Lambda's documented 6 MB synchronous response payload limit so + # legitimate responses pass through unmodified, but stops a misbehaving + # or hung runtime from blowing the SAM CLI process up. + _BUFFERED_RESPONSE_BYTE_CAP = 8 * 1024 * 1024 def __init__( self, @@ -630,6 +640,140 @@ def _invoke_lambda_function( return lambda_response + def _route_uses_response_stream(self, route: Route) -> bool: + """Return ``True`` if the function backing ``route`` opts into + Lambda response streaming via ``FunctionUrlConfig.InvokeMode == + RESPONSE_STREAM``. Routes whose function does not declare it + (or whose Function can't be resolved) fall through to the + historical buffered invoke path.""" + if not route.function_name: + return False + provider = getattr(self.lambda_runner, "provider", None) + if provider is None: + return False + try: + function = provider.get(route.function_name) + except Exception: # pylint: disable=broad-except + return False + if function is None: + return False + url_config = getattr(function, "function_url_config", None) or {} + invoke_mode = url_config.get("InvokeMode") if isinstance(url_config, dict) else None + return isinstance(invoke_mode, str) and invoke_mode.upper() == "RESPONSE_STREAM" + + def _invoke_lambda_function_streaming( + self, + lambda_function_name: str, + event: dict, + tenant_id: Optional[str], + cors_headers: Dict[str, str], + ): + """ + Streaming counterpart of :meth:`_invoke_lambda_function`. Only + used for routes whose Lambda function opts in via + ``FunctionUrlConfig.InvokeMode == RESPONSE_STREAM``. + + Issues the invoke against the local RIE with ``stream=True`` so + the response body is exposed to us as it arrives. If the runtime + advertises a streaming response (via the + ``Lambda-Runtime-Function-Response-Mode`` header) we forward the + bytes through to the browser via + :func:`build_streaming_flask_response`. Otherwise we drain the + body, parse the standard API Gateway proxy JSON and return a + regular Flask response built from it — that case is rare for an + opted-in route (it would mean the function is misconfigured) but + keeps the behavior well-defined. + """ + event_str = json.dumps(event, sort_keys=True) + response, cleanup = self.lambda_runner.invoke_streaming( + lambda_function_name, event_str, tenant_id=tenant_id, stderr=self.stderr + ) + + if is_streaming_response(response): + LOG.debug("Lambda function returned a streaming response, forwarding chunks as they arrive") + try: + flask_response = build_streaming_flask_response( + response, + on_complete=cleanup, + extra_headers=cors_headers, + ) + except BaseException: + # build_streaming_flask_response synchronously iterates the + # upstream body to parse the http-integration JSON prelude. + # If that read fails (connection reset, RIE crash, malformed + # body) the Flask response is never built, ``call_on_close`` + # never fires, and the cleanup chain would leak the timeout + # timer, the container's concurrency slot, the open + # requests.Response, and (for cold runtimes) the container + # itself. Run cleanup explicitly here so a construction-time + # failure does not strand resources. + cleanup() + raise + # cleanup is now owned by the Flask response lifecycle + # (Response.call_on_close inside build_streaming_flask_response). + return flask_response + + # Buffered response: drain the body now so we can release the + # container as soon as possible, then route through the existing + # API Gateway parser. + # + # We use iter_content with a hard byte cap instead of + # response.content because the latter would happily allocate + # gigabytes if a misconfigured (or hung) runtime kept writing, + # and would block the container's concurrency slot for the + # entire read. + try: + chunks: list = [] + received = 0 + for chunk in response.iter_content(chunk_size=64 * 1024): + if not chunk: + continue + received += len(chunk) + if received > self._BUFFERED_RESPONSE_BYTE_CAP: + LOG.error( + "Buffered response from streaming-enabled Lambda %s exceeded %d bytes; aborting read", + lambda_function_name, + self._BUFFERED_RESPONSE_BYTE_CAP, + ) + raise LambdaResponseParseException( + f"Lambda response exceeds {self._BUFFERED_RESPONSE_BYTE_CAP} byte cap" + ) + chunks.append(chunk) + body_bytes = b"".join(chunks) + finally: + cleanup() + + # Preserve the historical behaviour of get_lambda_output() which + # round-trips through json.dumps to normalize ordering / encoding. + body_text: Union[str, bytes] + try: + body_text = json.dumps(json.loads(body_bytes), ensure_ascii=False) + except (ValueError, json.JSONDecodeError): + try: + body_text = body_bytes.decode("utf-8") + except UnicodeDecodeError: + body_text = body_bytes # type: ignore[assignment] + + if isinstance(body_text, str) and LambdaOutputParser.is_lambda_error_response(body_text): + raise LambdaResponseParseException + + # An opted-in route should always return a streaming response. + # If we end up here the function is misconfigured (declares + # RESPONSE_STREAM but returned a buffered body). Surface the + # raw body verbatim with CORS headers; we deliberately do NOT + # try to parse it as an API Gateway proxy response — callers + # that need that should remove InvokeMode=RESPONSE_STREAM. + LOG.warning( + "Lambda function %s is declared with InvokeMode=RESPONSE_STREAM but did not stream; " + "returning the raw body verbatim", + lambda_function_name, + ) + return self.service_response( + body_text if isinstance(body_text, str) else body_text.decode("utf-8", errors="replace"), + Headers(cors_headers), + 200, + ) + def _request_handler(self, **kwargs): """ We handle all requests to the host:port. The general flow of handling a request is as follows @@ -735,11 +879,57 @@ def _request_handler(self, **kwargs): return auth_service_error endpoint_service_error = None - try: - # Extract tenant-id from HTTP request header - tenant_id = request.headers.get("X-Amz-Tenant-Id") - # invoke the route's Lambda function + # Extract tenant-id from HTTP request header + tenant_id = request.headers.get("X-Amz-Tenant-Id") + + # The streaming pipeline is opt-in per function via + # FunctionUrlConfig.InvokeMode == RESPONSE_STREAM, mirroring the + # contract Lambda Function URLs use in production. Functions that + # don't declare it keep going through the historical buffered + # invoke path untouched. + use_streaming = self._route_uses_response_stream(route) + + if use_streaming: + streaming_response = None + try: + streaming_response = self._invoke_lambda_function_streaming( + route.function_name, route_lambda_event, tenant_id, cors_headers + ) + except TenantIdValidationError as e: + endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e)) + except FunctionNotFound: + endpoint_service_error = ServiceErrorResponses.lambda_not_found_response() + except UnsupportedInlineCodeError: + endpoint_service_error = ServiceErrorResponses.not_implemented_locally( + "Inline code is not supported for sam local commands. Please write your code in a separate file." + ) + except DockerContainerCreationFailedException as ex: + endpoint_service_error = ServiceErrorResponses.container_creation_failed(ex.message) + except MissingFunctionNameException as ex: + endpoint_service_error = ServiceErrorResponses.lambda_failure_response( + f"Failed to execute endpoint. Got an invalid function name ({str(ex)})", + ) + except LambdaResponseParseException: + # Mirror the buffered path: a malformed / oversized + # buffered fallback body inside the streaming pipeline + # must surface as the same body-failure response and + # status code, not as a generic invoke failure. + endpoint_service_error = ServiceErrorResponses.lambda_body_failure_response() + except Exception as ex: # pylint: disable=broad-except + # Catch-all for unexpected failures in the streaming + # invoke (e.g. requests.ConnectionError from the upstream + # stream POST). Surface as a Lambda failure response + # instead of a generic 500 from Flask. + LOG.error("Failed to invoke streaming Lambda function: %s", ex, exc_info=True) + endpoint_service_error = ServiceErrorResponses.lambda_failure_response() + + if endpoint_service_error: + return endpoint_service_error + return streaming_response + + try: + # invoke the route's Lambda function (buffered path, unchanged) lambda_response = self._invoke_lambda_function(route.function_name, route_lambda_event, tenant_id) except TenantIdValidationError as e: endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e)) diff --git a/samcli/local/apigw/streaming_response.py b/samcli/local/apigw/streaming_response.py new file mode 100644 index 00000000000..a5b109cf245 --- /dev/null +++ b/samcli/local/apigw/streaming_response.py @@ -0,0 +1,235 @@ +""" +Helpers for forwarding Lambda response streaming to the local API Gateway +client. + +When a Lambda function uses response streaming (e.g. via the Node.js +``awslambda.streamifyResponse``) the runtime POSTs its response body to the +Runtime API with the ``Lambda-Runtime-Function-Response-Mode: streaming`` +header. The streaming-enabled fork of the Runtime Interface Emulator +forwards this header to the invoke caller (SAM CLI) and emits the body +chunks straight through. This module turns that wire format into a +:class:`flask.Response` that streams chunks to the browser as they arrive, +parsing the optional Lambda HTTP-integration prelude (used by Function +URLs and API Gateway HTTP API) when present. +""" + +from __future__ import annotations + +import json +import logging +from typing import Callable, Dict, Iterable, Iterator, Optional, Tuple + +from flask import Response + +LOG = logging.getLogger(__name__) + +# HTTP header set by the runtime / RIE on streaming responses. +RESPONSE_MODE_HEADER = "Lambda-Runtime-Function-Response-Mode" +RESPONSE_MODE_STREAMING = "streaming" + +# Content type used by Lambda Function URLs / API Gateway HTTP API for +# streamed responses. The body starts with a JSON prelude describing the +# desired HTTP response status, headers and cookies, followed by 8 NUL +# bytes, followed by the raw body bytes. +HTTP_INTEGRATION_CONTENT_TYPE = "application/vnd.awslambda.http-integration-response" +_PRELUDE_DELIMITER = b"\x00" * 8 +# Cap the size of the JSON prelude we are willing to scan for. AWS's +# documented limit is much smaller than this, but we keep some headroom +# for unusual header sets. +_PRELUDE_MAX_BYTES = 64 * 1024 + +# How many bytes we ask urllib3 for on each read. Smaller values give +# slightly lower latency for tiny SSE frames at the cost of more syscalls; +# 4 KiB is a reasonable compromise that still lets a single TCP segment +# carrying multiple SSE events be delivered in one yield. +_STREAM_READ_SIZE = 4096 + + +def is_streaming_response(resp) -> bool: + """Return ``True`` if ``resp`` (a ``requests.Response``) carries the + streaming response-mode header.""" + return resp.headers.get(RESPONSE_MODE_HEADER, "").lower() == RESPONSE_MODE_STREAMING + + +def _read_streaming_chunks(resp) -> Iterator[bytes]: + """Yield raw bytes from a streaming response with minimal buffering. + + We bypass ``iter_content`` and pull from ``raw.read1`` (when + available) because we want each TCP segment to surface as soon as the + network delivers it; ``iter_content`` would otherwise wait until + ``chunk_size`` bytes have accumulated. + """ + raw = getattr(resp, "raw", None) + read1 = getattr(raw, "read1", None) if raw is not None else None + + if callable(read1): + while True: + try: + chunk = read1(_STREAM_READ_SIZE) + except Exception: # pragma: no cover - network noise + LOG.debug("Streaming read failed", exc_info=True) + break + if not chunk: + break + yield chunk + else: + for chunk in resp.iter_content(chunk_size=_STREAM_READ_SIZE): + if chunk: + yield chunk + + +def _peek_prelude(byte_iter: Iterator[bytes]) -> Tuple[Optional[bytes], Iterator[bytes]]: + """ + Pull bytes from ``byte_iter`` until we see the 8-NUL prelude + delimiter or hit ``_PRELUDE_MAX_BYTES`` without finding it. + + Returns a tuple ``(prelude, rest_iter)`` where: + + * ``prelude`` is the JSON-encoded prelude bytes (excluding the + delimiter), or ``None`` if no delimiter was found within the cap + (in which case the bytes already consumed are re-emitted at the + start of ``rest_iter``). + * ``rest_iter`` is an iterator that yields any leftover bytes + followed by the rest of the original iterator. + """ + buffered = bytearray() + for chunk in byte_iter: + buffered.extend(chunk) + idx = bytes(buffered).find(_PRELUDE_DELIMITER) + if idx != -1: + prelude = bytes(buffered[:idx]) + tail = bytes(buffered[idx + len(_PRELUDE_DELIMITER):]) + + def _rest_with_tail(): + if tail: + yield tail + yield from byte_iter + + return prelude, _rest_with_tail() + if len(buffered) >= _PRELUDE_MAX_BYTES: + break + + leftover = bytes(buffered) + + def _rest_with_leftover(): + if leftover: + yield leftover + yield from byte_iter + + return None, _rest_with_leftover() + + +def _parse_prelude(prelude_bytes: bytes) -> Tuple[int, Dict[str, str], list]: + """Parse the JSON prelude. Falls back to defaults for missing fields.""" + try: + decoded = json.loads(prelude_bytes.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + LOG.warning("Failed to parse Lambda HTTP-integration prelude: %s", exc) + return 200, {}, [] + + if not isinstance(decoded, dict): + return 200, {}, [] + + status_code = int(decoded.get("statusCode", 200)) + headers = decoded.get("headers") or {} + if not isinstance(headers, dict): + headers = {} + cookies = decoded.get("cookies") or [] + if not isinstance(cookies, list): + cookies = [] + # Normalize header values to strings (Lambda allows numbers too). + headers = {str(k): str(v) for k, v in headers.items()} + return status_code, headers, cookies + + +def build_streaming_flask_response( + resp, + on_complete: Callable[[], None], + extra_headers: Optional[Dict[str, str]] = None, +) -> Response: + """ + Build a Flask :class:`~flask.Response` that forwards a streaming + Lambda response body to the HTTP client as it arrives. + + Parameters + ---------- + resp : requests.Response + The streaming response returned by the local RIE invoke. Must + have been opened with ``stream=True``. + on_complete : Callable + Called exactly once after the response generator is fully + consumed (or aborted). Used to release container resources. + extra_headers : Optional[Dict[str, str]] + Additional headers (e.g. CORS) to add to the outgoing response. + + Returns + ------- + flask.Response + A response whose body is a generator producing the Lambda + function's streamed bytes. + """ + extra_headers = extra_headers or {} + upstream_content_type = resp.headers.get("Content-Type", "") + is_http_integration = HTTP_INTEGRATION_CONTENT_TYPE in upstream_content_type + + chunk_iter = _read_streaming_chunks(resp) + + status_code = 200 + response_headers: Dict[str, str] = {} + cookies: list = [] + body_iter: Iterable[bytes] = chunk_iter + + if is_http_integration: + prelude, rest_iter = _peek_prelude(chunk_iter) + if prelude is not None: + status_code, response_headers, cookies = _parse_prelude(prelude) + else: + LOG.warning( + "Streaming response declared http-integration content type but did not " + "contain the 8-NUL prelude delimiter within %d bytes; passing body through verbatim", + _PRELUDE_MAX_BYTES, + ) + body_iter = rest_iter + else: + # Non-integration streaming: pass content-type and other useful + # headers through unchanged. + if upstream_content_type: + response_headers["Content-Type"] = upstream_content_type + + # Make sure intermediaries do not buffer the response. Without this + # some reverse proxies hold the bytes back until the connection + # closes. + response_headers.setdefault("Cache-Control", "no-cache") + response_headers.setdefault("X-Accel-Buffering", "no") + response_headers.update(extra_headers) + + completion_called = {"done": False} + + def _safe_complete() -> None: + if completion_called["done"]: + return + completion_called["done"] = True + try: + on_complete() + except Exception: # pragma: no cover - best effort + LOG.debug("on_complete callback raised", exc_info=True) + + def _wrapped_body() -> Iterator[bytes]: + try: + for chunk in body_iter: + if chunk: + yield chunk + finally: + _safe_complete() + + flask_response = Response(_wrapped_body(), status=status_code) + for key, value in response_headers.items(): + flask_response.headers[key] = value + for cookie in cookies: + if isinstance(cookie, str): + flask_response.headers.add("Set-Cookie", cookie) + + # Hook werkzeug's "response close" lifecycle in case the consumer + # disconnects before the body generator finishes naturally. + flask_response.call_on_close(_safe_complete) + return flask_response diff --git a/samcli/local/docker/container.py b/samcli/local/docker/container.py index ca96dd6aeef..b2a80e57890 100644 --- a/samcli/local/docker/container.py +++ b/samcli/local/docker/container.py @@ -560,6 +560,99 @@ def _make_http_request(self, event, tenant_id=None) -> Tuple[Union[str, bytes], LOG.debug("Failed to deserialize response from RIE, returning the raw response as is") return resp.content, False + # ------------------------------------------------------------------ + # Response streaming support + # ------------------------------------------------------------------ + # The methods below provide a "streaming" companion to the + # _make_http_request / wait_for_http_response pair above. They are used + # by the local API Gateway service when a Lambda function returns its + # response progressively (e.g. via Node.js `awslambda.streamifyResponse` + # which talks the Lambda response streaming protocol). Instead of + # buffering the whole response from RIE, we keep the connection open + # with `stream=True` and hand the raw `requests.Response` back to the + # caller, which will iterate body chunks and forward them to the + # browser. + + def wait_for_streaming_response(self, event: str, tenant_id: Optional[str] = None): + """ + Block until the container's socket is ready, then POST `event` to + the RIE invoke endpoint. + + Returns a two-tuple ``(response, release)`` where: + + * ``response`` is a raw :class:`requests.Response` opened with + ``stream=True``. The caller is responsible for iterating it and + calling ``close()`` when done. + * ``release`` is a zero-arg callable that releases this + invocation's concurrency-semaphore permit. It is **idempotent + per call site**: each call to ``wait_for_streaming_response`` + gets its own closure with its own flag, so two concurrent + invocations against a container with ``AWS_LAMBDA_MAX_CONCURRENCY > 1`` + cannot accidentally overwrite each other's release state and + leak permits. + + This method intentionally does NOT use the ``@retry`` decorator + that ``wait_for_http_response`` does: retrying mid-stream would + misorder bytes already delivered to the client. Connection-level + retries are handled by ``_wait_for_socket_connection`` below. + """ + self.start_logs_thread_if_not_alive(None) + self._wait_for_socket_connection() + + # Honor the per-container concurrency semaphore. Note that for a + # streaming invoke the semaphore is held for the entire duration of + # the stream (not just the HTTP send), which is the correct + # behavior: a streaming function occupies the runtime slot until + # it finishes. The caller MUST invoke the returned ``release`` + # closure when iteration ends. + slot_held = False + if self._concurrency_semaphore: + self._concurrency_semaphore.acquire() + slot_held = True + + # Per-invocation release state. Storing this on ``self`` would + # cause cross-invocation overwrites at ``max_concurrency > 1``; + # closing over locals keeps each call site independent. + released = {"value": False} + release_lock = threading.Lock() + + def release() -> None: + if not slot_held: + return + with release_lock: + if released["value"]: + return + released["value"] = True + # Plain ``threading.Semaphore`` (not BoundedSemaphore) + # would silently let an over-release inflate the permit + # count past ``max_concurrency`` — the guard above is the + # only thing keeping the concurrency cap honest. + self._concurrency_semaphore.release() + + try: + response = self._make_streaming_http_request(event, tenant_id) + except Exception: + # If the request itself blew up, immediately release the + # slot we just acquired; otherwise we would leak permits. + release() + raise + + return response, release + + def _make_streaming_http_request(self, event: str, tenant_id: Optional[str] = None): + headers = {"Content-Type": "application/json"} + if tenant_id is not None: + headers["X-Amz-Tenant-Id"] = tenant_id + LOG.debug("Adding tenant-id header to streaming invoke: %s", tenant_id) + + return requests.post( + self.URL.format(host=self._container_host, port=self.rapid_port_host, function_name="function"), + data=event.encode("utf-8"), + headers=headers, + stream=True, + timeout=(self.RAPID_CONNECTION_TIMEOUT, None), + ) + def wait_for_result(self, full_path, event, stdout, stderr, start_timer=None, tenant_id=None): # NOTE(sriram-mv): Let logging happen in its own thread, so that a http request can be sent. # NOTE(sriram-mv): All logging is re-directed to stderr, so that only the lambda function return diff --git a/samcli/local/lambdafn/runtime.py b/samcli/local/lambdafn/runtime.py index 0473caa9b47..a38d04d0e67 100644 --- a/samcli/local/lambdafn/runtime.py +++ b/samcli/local/lambdafn/runtime.py @@ -361,6 +361,152 @@ def _on_invoke_done(self, container): self._container_manager.stop(container) self._clean_decompressed_paths() + @capture_parameter("runtimeMetric", "runtimes", 1, parameter_nested_identifier="runtime", as_list=True) + def invoke_streaming( + self, + function_config, + event, + tenant_id=None, + debug_context=None, + stderr: Optional[StreamWriter] = None, + container_host=None, + container_host_interface=None, + extra_hosts=None, + container_dns=None, + ): + """ + Streaming counterpart of :meth:`invoke`. + + Starts (or reuses, for warm runtimes) the container, arms a + streaming-specific timeout timer, then issues a single POST to + the RIE invoke endpoint with ``stream=True`` and returns a + tuple ``(response, cleanup)``: + + * ``response`` is the raw :class:`requests.Response` whose body + can be iterated chunk-by-chunk to forward Server-Sent Events + or any other progressively-emitted Lambda response to a + browser. + * ``cleanup`` is a callable the caller MUST invoke once it has + finished reading the body. It cancels the timeout timer, + releases the container's concurrency slot, closes the + response, and (for non-warm runtimes) stops the container. + It is safe to call multiple times. + + Why a dedicated timer instead of :meth:`_configure_interrupt`? + ``_configure_interrupt`` is overridden by :class:`WarmLambdaRuntime` + so that *its* ``timer_handler`` only logs — stopping the + container would defeat warm-container reuse for the buffered + path. But this method returns its :class:`requests.Response` + before the function finishes streaming, so for the streaming + path we need to actively unblock the consumer's + ``iter_content`` when the timeout elapses. The dedicated timer + here closes the upstream response (and only stops the container + for cold runtimes), which makes the consumer's body iterator + EOF and lets the standard ``cleanup`` chain run, while leaving + the warm-container's container reusable for the next request. + """ + if isinstance(function_config, type(None)): # pragma: no cover - defensive + raise ValueError("function_config is required") + + container = None + timer = None + timer_fired = threading.Event() + try: + container = self.create( + function_config, debug_context, container_host, container_host_interface, extra_hosts, container_dns + ) + container = self.run( + container, + function_config, + debug_context, + container_host, + container_host_interface, + extra_hosts, + container_dns, + ) + + if stderr is not None: + container.start_logs_thread_if_not_alive(stderr) + + response, release_slot = container.wait_for_streaming_response(event, tenant_id=tenant_id) + except Exception: + # Failed before we got a response: cancel any timer we + # armed and fall back to the standard cleanup path (stop + # the cold container, leave warm ones alone). + if container is not None: + try: + self._on_invoke_done(container) + except Exception: # pragma: no cover - best effort + LOG.debug("Cleanup after failed streaming invoke errored", exc_info=True) + raise + + is_warm_runtime = isinstance(self, WarmLambdaRuntime) + + def _on_timeout(): + timer_fired.set() + LOG.info( + "Streaming function '%s' timed out after %d seconds", + function_config.full_path, + function_config.timeout, + ) + # Closing the response forces the consumer's ``iter_content`` + # to raise / EOF, which fires the body generator's ``finally`` + # and triggers ``cleanup()`` below — that releases the slot + # and (for cold runtimes) stops the container. + try: + response.close() + except Exception: # pragma: no cover - best effort + LOG.debug("Closing response in timeout handler errored", exc_info=True) + # For cold runtimes we also stop the container right away + # so the function process is killed instead of being left + # to keep writing to a now-closed connection. We intentionally + # do NOT stop warm containers here: WarmLambdaRuntime caches + # them across requests, and stopping would leak a stale entry + # in ``_containers`` that the next invoke would try to reuse. + # RIE's own internal timeout still applies in the warm case. + if not is_warm_runtime: + try: + self._container_manager.stop(container) + except Exception: # pragma: no cover - best effort + LOG.debug("Stopping container in timeout handler errored", exc_info=True) + + # Only arm the timer when not debugging — the buffered path + # skips its timer in debug mode for the same reason. + if function_config.timeout and not debug_context: + timer = threading.Timer(function_config.timeout, _on_timeout) + timer.daemon = True + timer.start() + + cleanup_done = threading.Event() + + def cleanup(): + if cleanup_done.is_set(): + return + cleanup_done.set() + # Cancel the timeout timer first. If it already fired this + # is a no-op; if the stream finished normally this prevents + # the timer from firing late and closing an already-closed + # response. + if timer is not None: + try: + timer.cancel() + except Exception: # pragma: no cover - best effort + LOG.debug("Cancelling timeout timer errored", exc_info=True) + try: + response.close() + except Exception: # pragma: no cover - best effort + LOG.debug("Closing streaming RIE response errored", exc_info=True) + try: + release_slot() + except Exception: # pragma: no cover - best effort + LOG.debug("Releasing streaming slot errored", exc_info=True) + try: + self._on_invoke_done(container) + except Exception: # pragma: no cover - best effort + LOG.debug("Post-streaming container cleanup errored", exc_info=True) + + return response, cleanup + def _check_exit_state(self, container: Container): """ Check and validate the exit state of the invoke container. diff --git a/samcli/local/rapid/aws-lambda-rie-arm64 b/samcli/local/rapid/aws-lambda-rie-arm64 index 5abff92dcd2..ea5d4145a73 100755 Binary files a/samcli/local/rapid/aws-lambda-rie-arm64 and b/samcli/local/rapid/aws-lambda-rie-arm64 differ diff --git a/samcli/local/rapid/aws-lambda-rie-x86_64 b/samcli/local/rapid/aws-lambda-rie-x86_64 index 272f749d476..15c3d2a2d45 100755 Binary files a/samcli/local/rapid/aws-lambda-rie-x86_64 and b/samcli/local/rapid/aws-lambda-rie-x86_64 differ