diff --git a/README.md b/README.md index 3712ca9..4c3f570 100644 --- a/README.md +++ b/README.md @@ -96,6 +96,31 @@ print("exit code:", result.exit_code) > Note: Commands run in the `/workspace` directory by default, this is not configurable +### Detached Commands + +Pass `detached=True` to run a long-lived background process. + +```python +# Start a background server +handle = await sandbox.exec("python server.py", detached=True) + +# Wait for it to exit (e.g. after you stop it) +result = await handle.wait() +print("exit code:", result.exit_code) + +# Send a signal to stop it +await handle.kill() +``` + +If the process is still running and you need a handle to it from a different context, use `get_command()` to re-attach by `exec_id`: + +```python +handle = await sandbox.get_command(exec_id, on_output=lambda data, stream: print(data, end="")) +await handle.wait() +``` + +> Note: Only 5 background processes are allowed to run at once currently. + ### File Management ```python diff --git a/pyproject.toml b/pyproject.toml index 4a54b86..d77a6c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dev = [ [tool.pytest.ini_options] asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" addopts = "--cov=src/aio_lib_sandbox --cov-report=xml --cov-report=term-missing" [tool.coverage.run] diff --git a/src/aio_lib_sandbox/__init__.py b/src/aio_lib_sandbox/__init__.py index 50d3b57..51dd099 100644 --- a/src/aio_lib_sandbox/__init__.py +++ b/src/aio_lib_sandbox/__init__.py @@ -9,6 +9,7 @@ from .errors import ( SandboxClientError, + SandboxCommandNotFoundError, SandboxInitializationError, SandboxInvalidPortError, SandboxNotFoundError, @@ -21,6 +22,7 @@ from .sandbox import Sandbox from .types import ( SANDBOX_SIZES, + DetachedCommandHandle, EgressRule, ExecResult, ExecTask, @@ -33,6 +35,7 @@ __all__ = [ "Sandbox", + "DetachedCommandHandle", "ExecResult", "ExecTask", "WriteResult", @@ -51,4 +54,5 @@ "SandboxUnauthorizedError", "SandboxTimeoutError", "SandboxWebSocketError", + "SandboxCommandNotFoundError", ] diff --git a/src/aio_lib_sandbox/errors.py b/src/aio_lib_sandbox/errors.py index 65da889..89f93f0 100644 --- a/src/aio_lib_sandbox/errors.py +++ b/src/aio_lib_sandbox/errors.py @@ -34,9 +34,11 @@ class SandboxWebSocketError(SandboxSDKError): """WebSocket transport error.""" +class SandboxCommandNotFoundError(SandboxSDKError): + """Raised by ``get_command()`` when no running process matches the provided exec_id.""" + class SandboxPortNotProvisionedError(SandboxClientError): """Port was not declared in ``create(ports=[...])`` and cannot be retrieved.""" - class SandboxInvalidPortError(SandboxClientError): """Port value is not a valid integer in the range 1–65535.""" diff --git a/src/aio_lib_sandbox/sandbox.py b/src/aio_lib_sandbox/sandbox.py index dd3af2c..e4f142b 100644 --- a/src/aio_lib_sandbox/sandbox.py +++ b/src/aio_lib_sandbox/sandbox.py @@ -19,7 +19,7 @@ normalize_api_host, sandbox_http_error, ) -from .ws import PendingExec, PendingFileOp, WsSession +from .ws import PendingExec, PendingFileOp, PendingGetOp, WsSession from .errors import ( SandboxClientError, SandboxInitializationError, @@ -29,7 +29,7 @@ ) from .types import ( SANDBOX_SIZES, - ExecResult, + DetachedCommandHandle, ExecTask, FileEntry, Policy, @@ -257,29 +257,54 @@ def exec( command: str, *, timeout: float | None = None, + detached: bool = False, on_output: Callable[[str, str], None] | None = None, stdin: str | bytes | None = None, ) -> ExecTask: """Execute a command inside the sandbox. - Returns an :class:`ExecTask` that can be ``await``-ed for the result. - The task's ``exec_id`` attribute can be used with - :meth:`write_stdin` / :meth:`close_stdin` before awaiting. + For **foreground** commands (default), returns an :class:`ExecTask` that + resolves to :class:`ExecResult` when the command exits. + + For **detached** commands (``detached=True``), returns an + :class:`ExecTask` that resolves to a :class:`DetachedCommandHandle` + immediately once the process has started. Up to 5 detached commands may + run concurrently per sandbox. + + ``timeout`` is not supported with ``detached=True``. Args: command: Shell command to run. - timeout: Timeout in milliseconds (not seconds). + timeout: Timeout in milliseconds (foreground only). + detached: When ``True``, run as a detached background process. on_output: Callback invoked with ``(data, stream)`` for each chunk. stdin: Data to write to stdin at startup. Returns: - An :class:`ExecTask` that resolves to an :class:`ExecResult`. + An :class:`ExecTask` that resolves to :class:`ExecResult` (foreground) + or :class:`DetachedCommandHandle` (detached). """ self.ensure_open() + + if detached and timeout is not None: + raise SandboxClientError( + "timeout is not supported with detached=True" + ) + exec_id = f"exec-{secrets.token_hex(12)}" loop = asyncio.get_running_loop() - future: asyncio.Future[ExecResult] = loop.create_future() - pending = PendingExec(future=future, on_output=on_output) + future: asyncio.Future[Any] = loop.create_future() + + wait_future: asyncio.Future[Any] | None = None + if detached: + wait_future = loop.create_future() + + pending = PendingExec( + future=future, + on_output=on_output, + detached=detached, + wait_future=wait_future, + ) if timeout is not None: pending.timeout_handle = loop.call_later( @@ -292,18 +317,63 @@ def exec( self.session.register_exec(exec_id, pending) - async def _run() -> ExecResult: - await self.session.send_frame( - {"type": "exec.run", "execId": exec_id, "command": command} - ) + async def _run() -> Any: + frame: dict[str, Any] = {"type": "exec.run", "execId": exec_id, "command": command} + if detached: + frame["detached"] = True + await self.session.send_frame(frame) pending.started.set() if stdin is not None: await self.write_stdin(exec_id, stdin) await self.close_stdin(exec_id) - return await future + result = await future + if not detached: + return result + return DetachedCommandHandle( + exec_id=exec_id, + pid=result["pid"], + started_at=result["started_at"], + detached=True, + wait_future=wait_future, + sandbox_ref=self, + command=command, + ) return ExecTask(exec_id=exec_id, _task=loop.create_task(_run())) + async def get_command( + self, + exec_id: str, + *, + on_output: Callable[[str, str], None] | None = None, + ) -> DetachedCommandHandle: + """Re-attach to a detached command that is still running in the sandbox. + + Sends an ``exec.get`` frame and resolves with a + :class:`DetachedCommandHandle` once the ``exec.info`` response arrives. + + Args: + exec_id: The ``exec_id`` returned by the original :meth:`exec` call. + on_output: Callback invoked with ``(data, stream)`` for live output + from the re-attach point onward. + + Returns: + A :class:`DetachedCommandHandle`. + + Raises: + SandboxCommandNotFoundError: When no process with that ``exec_id`` + is currently running (already exited or never existed). + """ + self.ensure_open() + loop = asyncio.get_running_loop() + future: asyncio.Future[DetachedCommandHandle] = loop.create_future() + self.session.register_get_op( + exec_id, + PendingGetOp(future=future, on_output=on_output, sandbox_ref=self), + ) + await self.session.send_frame({"type": "exec.get", "execId": exec_id}) + return await future + async def kill(self, exec_id: str, signal: str = "SIGTERM") -> None: """Send a signal to a running command.""" self.ensure_open() @@ -420,19 +490,26 @@ async def destroy(self) -> dict[str, Any]: base = self.management_endpoint or self.api_host url = f"{base}/api/v1/namespaces/{self.namespace}/sandbox/{self.id}" headers = {"Authorization": build_auth_header(self.api_key)} + if self.session: + self.session.begin_intentional_close() - async with httpx.AsyncClient(verify=self.verify_ssl) as client: - try: - resp = await client.delete(url, headers=headers) - except httpx.HTTPError as exc: - raise SandboxClientError( - f"Could not destroy sandbox '{self.id}': {exc}" - ) from exc - - if not resp.is_success: - msg = resp.text - detail = f"Could not destroy sandbox '{self.id}': {resp.status_code}{f' {msg}' if msg else ''}" - raise sandbox_http_error(resp.status_code, detail) + try: + async with httpx.AsyncClient(verify=self.verify_ssl) as client: + try: + resp = await client.delete(url, headers=headers) + except httpx.HTTPError as exc: + raise SandboxClientError( + f"Could not destroy sandbox '{self.id}': {exc}" + ) from exc + + if not resp.is_success: + msg = resp.text + detail = f"Could not destroy sandbox '{self.id}': {resp.status_code}{f' {msg}' if msg else ''}" + raise sandbox_http_error(resp.status_code, detail) + except Exception: + if self.session: + self.session.cancel_intentional_close() + raise payload = resp.json() self.status = payload.get("status", self.status) diff --git a/src/aio_lib_sandbox/types.py b/src/aio_lib_sandbox/types.py index b8f8890..35a7510 100644 --- a/src/aio_lib_sandbox/types.py +++ b/src/aio_lib_sandbox/types.py @@ -8,7 +8,10 @@ import asyncio from collections.abc import Generator from dataclasses import dataclass -from typing import Any, List, Union +from typing import TYPE_CHECKING, Any, List, Optional, Union + +if TYPE_CHECKING: + pass # ------------------------------------------------------------------ @@ -32,7 +35,8 @@ class ExecResult: exec_id: str stdout: str stderr: str - exit_code: int + exit_code: int | None + destroyed: bool = False @dataclass @@ -71,6 +75,57 @@ def __await__(self) -> Generator[Any, None, ExecResult]: return self._task.__await__() +class DetachedCommandHandle: + """Handle for a detached background command. + + Returned by ``await sandbox.exec(cmd, detached=True)`` once the process has + started. Use :meth:`wait` to block until the process exits, + :meth:`write_stdin` / :meth:`close_stdin` to interact with stdin, and + :meth:`kill` to terminate it. + + Do not instantiate directly — use :meth:`Sandbox.exec` with + ``detached=True`` or :meth:`Sandbox.get_command`. + """ + + def __init__( + self, + exec_id: str, + pid: int, + started_at: int, + detached: bool, + wait_future: asyncio.Future[Any], + sandbox_ref: Any, + command: Optional[str] = None, + ) -> None: + self.exec_id = exec_id + self.pid = pid + self.started_at = started_at + self.detached = detached + self.command = command + self._wait_future = wait_future + self._sandbox = sandbox_ref + + async def wait(self) -> dict[str, Any]: + """Wait for the process to exit. + + Returns: + ``{"exit_code": int}`` when the process terminates. + """ + return await self._wait_future + + async def write_stdin(self, data: Union[str, bytes]) -> None: + """Write data to the process stdin.""" + await self._sandbox.write_stdin(self.exec_id, data) + + async def close_stdin(self) -> None: + """Close stdin, sending EOF to the process.""" + await self._sandbox.close_stdin(self.exec_id) + + async def kill(self, signal: str = "SIGTERM") -> None: + """Send a signal to the process.""" + await self._sandbox.kill(self.exec_id, signal) + + # ------------------------------------------------------------------ # Network policy TypedDicts # ------------------------------------------------------------------ diff --git a/src/aio_lib_sandbox/ws.py b/src/aio_lib_sandbox/ws.py index 0b51be9..062cfbd 100644 --- a/src/aio_lib_sandbox/ws.py +++ b/src/aio_lib_sandbox/ws.py @@ -22,11 +22,12 @@ from .frames import is_auth_ack, parse_frame from .errors import ( SandboxClientError, + SandboxCommandNotFoundError, SandboxTimeoutError, SandboxUnauthorizedError, SandboxWebSocketError, ) -from .types import ExecResult, FileEntry, WriteResult +from .types import DetachedCommandHandle, ExecResult, FileEntry, WriteResult logger = logging.getLogger("aio_lib_sandbox") @@ -38,12 +39,16 @@ @dataclass class PendingExec: - future: asyncio.Future[ExecResult] + future: asyncio.Future[Any] started: asyncio.Event = field(default_factory=asyncio.Event) stdout: str = "" stderr: str = "" on_output: Callable[[str, str], None] | None = None timeout_handle: asyncio.TimerHandle | None = None + # Detached-process fields + detached: bool = False + resolved: bool = False # True once the outer future has been set (exec.detached) + wait_future: asyncio.Future[Any] | None = None # resolves on exec.exit for detached @dataclass @@ -51,6 +56,15 @@ class PendingFileOp: future: asyncio.Future[Any] +@dataclass +class PendingGetOp: + """Pending exec.get operation (resolves when exec.info arrives).""" + + future: asyncio.Future[Any] + on_output: Callable[[str, str], None] | None = None + sandbox_ref: Any = None + + # --------------------------------------------------------------------------- # WebSocket session # --------------------------------------------------------------------------- @@ -78,7 +92,9 @@ def __init__( self.ws: websockets.ClientConnection | None = None self.pending_execs: dict[str, PendingExec] = {} self.pending_file_ops: dict[str, PendingFileOp] = {} + self.pending_get_ops: dict[str, PendingGetOp] = {} self.listener_task: asyncio.Task[None] | None = None + self.intentional_close = False # ------------------------------------------------------------------ # Connection @@ -131,14 +147,28 @@ def ensure_open(self) -> None: if self.ws is None: raise SandboxWebSocketError(f"Sandbox '{self.id}' is not connected") - async def close(self) -> None: + def begin_intentional_close(self) -> None: + """Mark the next WebSocket close as expected by sandbox teardown.""" + self.intentional_close = True + + def cancel_intentional_close(self) -> None: + """Clear a previously requested intentional close.""" + self.intentional_close = False + + async def close(self, *, intentional: bool = False) -> None: """Cancel the listener task and close the socket.""" + if intentional: + self.begin_intentional_close() + if self.intentional_close: + self.resolve_all_on_intentional_close() if self.listener_task: self.listener_task.cancel() self.listener_task = None if self.ws: await self.ws.close() self.ws = None + if self.intentional_close: + self.intentional_close = False # ------------------------------------------------------------------ # Pending operation management @@ -150,6 +180,9 @@ def register_exec(self, exec_id: str, pending: PendingExec) -> None: def register_file_op(self, exec_id: str, pending: PendingFileOp) -> None: self.pending_file_ops[exec_id] = pending + def register_get_op(self, exec_id: str, pending: PendingGetOp) -> None: + self.pending_get_ops[exec_id] = pending + def reject_pending( self, store: dict[str, Any], exec_id: str, error: Exception ) -> None: @@ -161,11 +194,73 @@ def reject_pending( if not pending.future.done(): pending.future.set_exception(error) + def reject_exec(self, exec_id: str, error: Exception) -> None: + """Reject a pending exec, honouring the detached/resolved state.""" + pending = self.pending_execs.pop(exec_id, None) + if pending is None: + return + if pending.timeout_handle: + pending.timeout_handle.cancel() + if pending.detached and pending.resolved: + # Outer future already resolved; reject the wait() future instead. + if pending.wait_future and not pending.wait_future.done(): + pending.wait_future.set_exception(error) + else: + if not pending.future.done(): + pending.future.set_exception(error) + def reject_all(self, error: Exception) -> None: for eid in list(self.pending_execs): - self.reject_pending(self.pending_execs, eid, error) + self.reject_exec(eid, error) for eid in list(self.pending_file_ops): self.reject_pending(self.pending_file_ops, eid, error) + for eid in list(self.pending_get_ops): + pending = self.pending_get_ops.pop(eid, None) + if pending and not pending.future.done(): + pending.future.set_exception(error) + + def resolve_exec_on_intentional_close(self, exec_id: str) -> None: + """Resolve a pending exec during an intentional sandbox shutdown.""" + pending = self.pending_execs.pop(exec_id, None) + if pending is None: + return + if pending.timeout_handle: + pending.timeout_handle.cancel() + + wait_result = {"exit_code": None, "destroyed": True} + if pending.detached: + if not pending.future.done(): + pending.resolved = True + pending.future.set_result( + {"pid": None, "started_at": None, "destroyed": True} + ) + if pending.wait_future and not pending.wait_future.done(): + pending.wait_future.set_result(wait_result) + return + + if not pending.future.done(): + pending.future.set_result( + ExecResult( + exec_id=exec_id, + stdout=pending.stdout, + stderr=pending.stderr, + exit_code=None, + destroyed=True, + ) + ) + + def resolve_all_on_intentional_close(self) -> None: + """Drain tracked operations without errors during sandbox destroy.""" + for eid in list(self.pending_execs): + self.resolve_exec_on_intentional_close(eid) + for eid in list(self.pending_file_ops): + pending = self.pending_file_ops.pop(eid, None) + if pending and not pending.future.done(): + pending.future.set_result(None) + for eid in list(self.pending_get_ops): + pending = self.pending_get_ops.pop(eid, None) + if pending and not pending.future.done(): + pending.future.set_result(None) async def wait_for_exec_start(self, exec_id: str) -> None: pending = self.pending_execs.get(exec_id) @@ -198,18 +293,32 @@ async def listen(self) -> None: if frame is None or is_auth_ack(frame, self.id): continue exec_id = frame.get("execId") - if exec_id in self.pending_file_ops: + ftype = frame.get("type") + + # exec.info is always routed to pending_get_ops. + # Error frames for pending get ops also go there (before exec map check). + if ftype == "exec.info" or ( + ftype == "error" and exec_id in self.pending_get_ops + ): + self.handle_get_frame(frame) + elif exec_id in self.pending_file_ops: self.handle_file_frame(frame) elif exec_id in self.pending_execs: self.handle_exec_frame(frame) except websockets.ConnectionClosed as exc: + if self.intentional_close: + self.resolve_all_on_intentional_close() + return + close_code = exc.rcvd.code if exc.rcvd is not None else 1006 self.reject_all( SandboxWebSocketError( - f"Sandbox '{self.id}' WebSocket closed with code {exc.code}" + f"Sandbox '{self.id}' WebSocket closed with code {close_code}" ) ) finally: self.ws = None + if self.intentional_close: + self.intentional_close = False # ------------------------------------------------------------------ # Frame handlers @@ -234,13 +343,40 @@ def handle_exec_frame(self, frame: dict[str, Any]) -> None: pending.on_output(data, stream) return + # Detached ack: resolve the outer future with the raw pid/startedAt data. + # Sandbox._run() wraps this into a DetachedCommandHandle after awaiting. + # The entry stays in pending_execs to receive subsequent output and exec.exit. + if ftype == "exec.detached": + if pending.timeout_handle: + pending.timeout_handle.cancel() + pending.timeout_handle = None + pending.resolved = True + if not pending.future.done(): + pending.future.set_result({"pid": frame.get("pid", 0), "started_at": frame.get("startedAt", 0)}) + return + if ftype == "exec.exit": - self.resolve_exec(exec_id, frame) + self.pending_execs.pop(exec_id, None) + if pending.timeout_handle: + pending.timeout_handle.cancel() + if pending.detached and pending.resolved: + # Detached: outer future already resolved; drive the wait() future. + if pending.wait_future and not pending.wait_future.done(): + pending.wait_future.set_result({"exit_code": frame.get("exitCode", -1)}) + else: + if not pending.future.done(): + pending.future.set_result( + ExecResult( + exec_id=exec_id, + stdout=pending.stdout, + stderr=pending.stderr, + exit_code=frame.get("exitCode", -1), + ) + ) return if ftype == "error": - self.reject_pending( - self.pending_execs, + self.reject_exec( exec_id, SandboxClientError(frame.get("message", f"Command '{exec_id}' failed")), ) @@ -292,23 +428,105 @@ def handle_file_frame(self, frame: dict[str, Any]) -> None: # Resolution helpers # ------------------------------------------------------------------ - def resolve_exec(self, exec_id: str, frame: dict[str, Any]) -> None: - pending = self.pending_execs.pop(exec_id, None) + def resolve_file_op(self, exec_id: str, result: Any) -> None: + pending = self.pending_file_ops.pop(exec_id, None) + if pending and not pending.future.done(): + pending.future.set_result(result) + + def handle_get_frame(self, frame: dict[str, Any]) -> None: + """Handle exec.info (response to exec.get) and related error frames.""" + exec_id = frame.get("execId") + pending = self.pending_get_ops.get(exec_id) if pending is None: return - if pending.timeout_handle: - pending.timeout_handle.cancel() + + if frame.get("type") == "exec.info": + self.resolve_get_op(frame, pending) + elif frame.get("type") == "error": + self.reject_get_op(frame, pending) + + def resolve_get_op(self, frame: dict[str, Any], pending: PendingGetOp) -> None: + """Resolve a pending exec.get by building a command handle and settling the caller's future.""" + exec_id = frame.get("execId") + self.pending_get_ops.pop(exec_id, None) + wait_future = self.resolve_exec_entry(frame, pending) + command_obj = self.build_command_object(frame, wait_future, pending.sandbox_ref) if not pending.future.done(): - pending.future.set_result( - ExecResult( - exec_id=exec_id, - stdout=pending.stdout, - stderr=pending.stderr, - exit_code=frame.get("exitCode", -1), + pending.future.set_result(command_obj) + + def reject_get_op(self, frame: dict[str, Any], pending: PendingGetOp) -> None: + """Reject a pending exec.get with a not-found error.""" + exec_id = frame.get("execId") + self.pending_get_ops.pop(exec_id, None) + if not pending.future.done(): + pending.future.set_exception( + SandboxCommandNotFoundError( + frame.get("message", f"No running process for execId '{exec_id}'") ) ) - def resolve_file_op(self, exec_id: str, result: Any) -> None: - pending = self.pending_file_ops.pop(exec_id, None) - if pending and not pending.future.done(): - pending.future.set_result(result) + def resolve_exec_entry(self, frame: dict[str, Any], pending: PendingGetOp) -> "asyncio.Future[Any]": + """Return the wait future for the exec — reusing an existing entry from the same + session, or registering a fresh reattached entry for a new/previous connection.""" + exec_id = frame.get("execId") + existing = self.pending_execs.get(exec_id) + if existing: + self.merge_on_output_callback(existing, pending.on_output) + return existing.wait_future + return self.register_reattached_exec(frame, pending.on_output) + + def merge_on_output_callback( + self, + existing: PendingExec, + on_output: Callable[[str, str], None] | None, + ) -> None: + """Append ``on_output`` to an existing exec entry's callback chain, + preserving the previous handler.""" + if not on_output: + return + prev = existing.on_output + def merged(data: str, stream: str, _prev=prev, _new=on_output) -> None: + if _prev: + _prev(data, stream) + _new(data, stream) + existing.on_output = merged + + def register_reattached_exec( + self, + frame: dict[str, Any], + on_output: Callable[[str, str], None] | None, + ) -> "asyncio.Future[Any]": + """Create a fresh ``pending_execs`` entry for a process reattached from a + previous connection and return its wait future.""" + exec_id = frame.get("execId") + loop = asyncio.get_running_loop() + wait_future: asyncio.Future[Any] = loop.create_future() + placeholder: asyncio.Future[Any] = loop.create_future() + monitoring = PendingExec( + future=placeholder, + on_output=on_output, + detached=frame.get("detached", False), + resolved=True, + wait_future=wait_future, + ) + placeholder.set_result(None) + self.pending_execs[exec_id] = monitoring + return wait_future + + def build_command_object( + self, + frame: dict[str, Any], + wait_future: "asyncio.Future[Any]", + sandbox: Any, + ) -> DetachedCommandHandle: + """Build the :class:`DetachedCommandHandle` returned to the caller of ``get_command``.""" + exec_id = frame.get("execId") + return DetachedCommandHandle( + exec_id=exec_id, + pid=frame.get("pid", 0), + started_at=frame.get("startedAt", 0), + detached=frame.get("detached", False), + wait_future=wait_future, + sandbox_ref=sandbox, + command=frame.get("command"), + ) diff --git a/tests/test_sandbox.py b/tests/test_sandbox.py index 4f0cd82..a014b81 100644 --- a/tests/test_sandbox.py +++ b/tests/test_sandbox.py @@ -5,12 +5,16 @@ import asyncio import base64 +import json from unittest.mock import AsyncMock, MagicMock, patch +import httpx import pytest +import websockets from aio_lib_sandbox import ( SANDBOX_SIZES, + DetachedCommandHandle, ExecResult, FileEntry, Sandbox, @@ -18,6 +22,7 @@ ) from aio_lib_sandbox.errors import ( SandboxClientError, + SandboxCommandNotFoundError, SandboxInitializationError, SandboxInvalidPortError, SandboxNotFoundError, @@ -27,8 +32,8 @@ SandboxWebSocketError, ) from aio_lib_sandbox.frames import normalize_size +from aio_lib_sandbox.ws import PendingExec, PendingFileOp, PendingGetOp, WsSession from aio_lib_sandbox.sandbox import _parse_preview_urls -from aio_lib_sandbox.ws import PendingExec, PendingFileOp, WsSession # --------------------------------------------------------------------------- # Helpers @@ -77,6 +82,22 @@ def _frame(ws, payload: dict) -> None: # call handle_exec_frame / handle_file_frame directly. +class _AsyncFrameStream: + def __init__(self, *frames): + self.frames = list(frames) + + def __aiter__(self): + return self + + async def __anext__(self): + if not self.frames: + raise StopAsyncIteration + frame = self.frames.pop(0) + if isinstance(frame, BaseException): + raise frame + return frame + + # --------------------------------------------------------------------------- # normalize_size # --------------------------------------------------------------------------- @@ -172,6 +193,9 @@ async def test_create_calls_api_and_connects(self, monkeypatch): "status": "ready", "token": "tok-new", "maxLifetime": 3600, + "previewUrls": { + "3000": "https://sb-new-3000.preview.example.net", + }, } with patch("aio_lib_sandbox.sandbox.api_request", new=AsyncMock(return_value=payload)) as mock_req, \ @@ -185,6 +209,9 @@ async def test_create_calls_api_and_connects(self, monkeypatch): assert sandbox.id == "sb-new" assert sandbox.status == "ready" + assert sandbox.preview_urls == { + 3000: "https://sb-new-3000.preview.example.net", + } mock_req.assert_called_once() mock_connect.assert_called_once() @@ -292,6 +319,174 @@ async def test_create_forwards_ports_and_parses_preview_urls(self): assert sandbox.get_url(3000) == "https://sb-ports-3000.preview.example.net" +# --------------------------------------------------------------------------- +# WebSocket connection +# --------------------------------------------------------------------------- + + +class TestWebSocketConnection: + @pytest.mark.asyncio + async def test_connect_opens_socket_authenticates_and_starts_listener(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + verify_ssl=False, + ) + ws = AsyncMock() + + async def noop_listen(): + return None + + with patch("aio_lib_sandbox.ws.websockets.connect", new=AsyncMock(return_value=ws)) as connect, \ + patch.object(session, "authenticate", new=AsyncMock()) as authenticate, \ + patch.object(session, "listen", new=noop_listen): + await session.connect() + await session.listener_task + + assert session.ws is ws + connect.assert_awaited_once() + assert connect.await_args.kwargs["ssl"].check_hostname is False + authenticate.assert_awaited_once() + + @pytest.mark.asyncio + async def test_connect_is_idempotent_when_socket_exists(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + ) + session.ws = AsyncMock() + + with patch("aio_lib_sandbox.ws.websockets.connect", new=AsyncMock()) as connect: + await session.connect() + + connect.assert_not_called() + + @pytest.mark.asyncio + async def test_connect_wraps_websocket_connect_errors(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + ) + + with patch( + "aio_lib_sandbox.ws.websockets.connect", + new=AsyncMock(side_effect=OSError("network down")), + ): + with pytest.raises(SandboxWebSocketError, match="Could not connect sandbox"): + await session.connect() + + @pytest.mark.asyncio + async def test_authenticate_rejects_non_ack_frame(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + ) + session.ws = AsyncMock() + session.ws.recv = AsyncMock(return_value=json.dumps({"type": "auth.error"})) + + with pytest.raises(SandboxUnauthorizedError, match="rejected"): + await session.authenticate() + + session.ws.send.assert_awaited_once_with( + json.dumps({"type": "auth", "token": "tok-abc"}) + ) + + def test_ensure_open_raises_when_socket_missing(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + ) + + with pytest.raises(SandboxWebSocketError, match="is not connected"): + session.ensure_open() + + @pytest.mark.asyncio + async def test_listen_routes_frames_and_clears_socket(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + ) + session.ws = _AsyncFrameStream( + "not-json", + json.dumps({"type": "auth.ok", "sandboxId": "sb-test"}), + json.dumps({"type": "exec.info", "execId": "get-1"}), + json.dumps({"type": "file.content", "execId": "file-1"}), + json.dumps({"type": "exec.output", "execId": "exec-1"}), + ) + session.pending_get_ops["get-1"] = PendingGetOp( + future=asyncio.get_running_loop().create_future() + ) + session.pending_file_ops["file-1"] = PendingFileOp( + future=asyncio.get_running_loop().create_future() + ) + session.pending_execs["exec-1"] = PendingExec( + future=asyncio.get_running_loop().create_future() + ) + session.handle_get_frame = MagicMock() + session.handle_file_frame = MagicMock() + session.handle_exec_frame = MagicMock() + + await session.listen() + + session.handle_get_frame.assert_called_once_with( + {"type": "exec.info", "execId": "get-1"} + ) + session.handle_file_frame.assert_called_once_with( + {"type": "file.content", "execId": "file-1"} + ) + session.handle_exec_frame.assert_called_once_with( + {"type": "exec.output", "execId": "exec-1"} + ) + assert session.ws is None + + @pytest.mark.asyncio + async def test_listen_rejects_pending_on_unintentional_close(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + ) + future = asyncio.get_running_loop().create_future() + session.pending_execs["exec-1"] = PendingExec(future=future) + session.ws = _AsyncFrameStream(websockets.ConnectionClosedError(None, None)) + + await session.listen() + + with pytest.raises(SandboxWebSocketError, match="closed with code 1006"): + await future + assert session.ws is None + + @pytest.mark.asyncio + async def test_listen_resolves_pending_on_intentional_close(self): + session = WsSession( + sandbox_id="sb-test", + endpoint="wss://runtime.example.net/ws", + token="tok-abc", + ) + future = asyncio.get_running_loop().create_future() + session.pending_execs["exec-1"] = PendingExec(future=future) + session.intentional_close = True + session.ws = _AsyncFrameStream(websockets.ConnectionClosedError(None, None)) + + await session.listen() + + assert await future == ExecResult( + exec_id="exec-1", + stdout="", + stderr="", + exit_code=None, + destroyed=True, + ) + assert session.ws is None + assert session.intentional_close is False + + # --------------------------------------------------------------------------- # Sandbox.get() # --------------------------------------------------------------------------- @@ -453,6 +648,14 @@ def test_exec_raises_when_not_connected(self): with pytest.raises(SandboxWebSocketError): sandbox.exec("cmd") + def test_handle_exec_frame_ignores_missing_pending_exec(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + sandbox.session.handle_exec_frame( + {"type": "exec.output", "execId": "exec-missing", "data": "ignored"} + ) + # --------------------------------------------------------------------------- # File operations @@ -563,6 +766,28 @@ async def test_list_files_empty(self): result = await future assert result == [] + def test_handle_file_frame_ignores_missing_pending_operation(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + sandbox.session.handle_file_frame( + {"type": "file.content", "execId": "file-missing", "content": "ignored"} + ) + + @pytest.mark.asyncio + async def test_file_error_frame_rejects_pending_operation(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + future = asyncio.get_running_loop().create_future() + sandbox.session.pending_file_ops["file-err"] = PendingFileOp(future=future) + + sandbox.session.handle_file_frame( + {"type": "error", "execId": "file-err", "message": "read failed"} + ) + + with pytest.raises(SandboxClientError, match="read failed"): + await future + # --------------------------------------------------------------------------- # get_url @@ -622,6 +847,62 @@ async def test_destroy_calls_delete_and_closes(self): assert sandbox.status == "destroyed" ws.close.assert_called_once() + @pytest.mark.asyncio + async def test_destroy_resolves_pending_foreground_exec(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"status": "destroyed"} + + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client.delete = AsyncMock(return_value=mock_resp) + + task = sandbox.exec("sleep 100") + await asyncio.sleep(0) + + with patch("aio_lib_sandbox.sandbox.httpx.AsyncClient", return_value=mock_client): + await sandbox.destroy() + + result = await task + assert result == ExecResult( + exec_id=task.exec_id, + stdout="", + stderr="", + exit_code=None, + destroyed=True, + ) + + @pytest.mark.asyncio + async def test_destroy_resolves_detached_wait(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + mock_resp = MagicMock() + mock_resp.is_success = True + mock_resp.json.return_value = {"status": "destroyed"} + + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client.delete = AsyncMock(return_value=mock_resp) + + task = sandbox.exec("sleep infinity", detached=True) + await asyncio.sleep(0) + sandbox.session.handle_exec_frame( + {"type": "exec.detached", "execId": task.exec_id, "pid": 1234, "startedAt": 100} + ) + handle = await task + wait_task = asyncio.create_task(handle.wait()) + + with patch("aio_lib_sandbox.sandbox.httpx.AsyncClient", return_value=mock_client): + await sandbox.destroy() + + assert await wait_task == {"exit_code": None, "destroyed": True} + @pytest.mark.asyncio async def test_destroy_raises_on_401(self): sandbox = _make_sandbox() @@ -641,6 +922,22 @@ async def test_destroy_raises_on_401(self): with pytest.raises(SandboxUnauthorizedError): await sandbox.destroy() + @pytest.mark.asyncio + async def test_destroy_wraps_http_client_errors_and_clears_intentional_close(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client.delete = AsyncMock(side_effect=httpx.HTTPError("boom")) + + with patch("aio_lib_sandbox.sandbox.httpx.AsyncClient", return_value=mock_client): + with pytest.raises(SandboxClientError, match="Could not destroy sandbox"): + await sandbox.destroy() + + assert sandbox.session.intentional_close is False + # --------------------------------------------------------------------------- # WebSocket close drains pending operations @@ -669,6 +966,174 @@ async def test_reject_all_pending_on_close(self): with pytest.raises(SandboxWebSocketError): await file_future + @pytest.mark.asyncio + async def test_reject_all_rejects_pending_get_operations(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + loop = asyncio.get_running_loop() + get_future = loop.create_future() + sandbox.session.pending_get_ops["exec-1"] = PendingGetOp(future=get_future) + + sandbox.session.reject_all(SandboxWebSocketError("closed")) + + with pytest.raises(SandboxWebSocketError, match="closed"): + await get_future + + @pytest.mark.asyncio + async def test_register_file_op_tracks_pending_operation(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + pending = PendingFileOp(future=asyncio.get_running_loop().create_future()) + + sandbox.session.register_file_op("file-1", pending) + + assert sandbox.session.pending_file_ops["file-1"] is pending + + def test_reject_pending_ignores_missing_operation(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + sandbox.session.reject_pending( + sandbox.session.pending_file_ops, + "file-missing", + SandboxWebSocketError("closed"), + ) + + assert sandbox.session.pending_file_ops == {} + + @pytest.mark.asyncio + async def test_close_with_intentional_flag_resolves_pending_and_cancels_listener(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + exec_future = asyncio.get_running_loop().create_future() + sandbox.session.pending_execs["exec-1"] = PendingExec(future=exec_future) + sandbox.session.listener_task = asyncio.create_task(asyncio.sleep(60)) + + await sandbox.session.close(intentional=True) + + assert await exec_future == ExecResult( + exec_id="exec-1", + stdout="", + stderr="", + exit_code=None, + destroyed=True, + ) + assert sandbox.session.listener_task is None + assert sandbox.session.ws is None + assert sandbox.session.intentional_close is False + ws.close.assert_awaited_once() + + @pytest.mark.asyncio + async def test_resolve_all_on_intentional_close_resolves_file_and_get_ops(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + loop = asyncio.get_running_loop() + file_future = loop.create_future() + get_future = loop.create_future() + sandbox.session.pending_file_ops["file-1"] = PendingFileOp(future=file_future) + sandbox.session.pending_get_ops["exec-1"] = PendingGetOp(future=get_future) + + sandbox.session.resolve_all_on_intentional_close() + + assert await file_future is None + assert await get_future is None + + @pytest.mark.asyncio + async def test_resolve_detached_before_ack_on_intentional_close(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + task = sandbox.exec("sleep infinity", detached=True) + await asyncio.sleep(0) + + sandbox.session.resolve_exec_on_intentional_close(task.exec_id) + + handle = await task + assert handle.pid is None + assert handle.started_at is None + assert await handle.wait() == {"exit_code": None, "destroyed": True} + + def test_resolve_exec_on_intentional_close_ignores_missing_exec(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + sandbox.session.resolve_exec_on_intentional_close("exec-missing") + + @pytest.mark.asyncio + async def test_resolve_exec_on_intentional_close_cancels_timeout_handle(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + future = asyncio.get_running_loop().create_future() + timeout_handle = MagicMock() + sandbox.session.pending_execs["exec-1"] = PendingExec( + future=future, + timeout_handle=timeout_handle, + ) + + sandbox.session.resolve_exec_on_intentional_close("exec-1") + + timeout_handle.cancel.assert_called_once() + assert await future == ExecResult( + exec_id="exec-1", + stdout="", + stderr="", + exit_code=None, + destroyed=True, + ) + + def test_reject_exec_ignores_missing_exec(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + sandbox.session.reject_exec("exec-missing", SandboxWebSocketError("closed")) + + @pytest.mark.asyncio + async def test_reject_exec_cancels_timeout_handle(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + future = asyncio.get_running_loop().create_future() + timeout_handle = MagicMock() + sandbox.session.pending_execs["exec-1"] = PendingExec( + future=future, + timeout_handle=timeout_handle, + ) + + sandbox.session.reject_exec("exec-1", SandboxWebSocketError("closed")) + + timeout_handle.cancel.assert_called_once() + with pytest.raises(SandboxWebSocketError, match="closed"): + await future + + @pytest.mark.asyncio + async def test_wait_for_exec_start_blocks_until_started(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + loop = asyncio.get_running_loop() + pending = PendingExec(future=loop.create_future()) + sandbox.session.pending_execs["exec-1"] = pending + + waiter = asyncio.create_task(sandbox.session.wait_for_exec_start("exec-1")) + await asyncio.sleep(0) + assert not waiter.done() + + pending.started.set() + await waiter + + def test_timeout_exec_still_rejects_when_no_running_loop(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + loop = asyncio.new_event_loop() + try: + future = loop.create_future() + sandbox.session.pending_execs["exec-1"] = PendingExec(future=future) + + sandbox.session.timeout_exec("exec-1", "sleep 10", 1000) + + assert future.done() + with pytest.raises(SandboxTimeoutError): + future.result() + finally: + loop.close() + # --------------------------------------------------------------------------- # Policy passthrough (mirrors aio-lib-runtime-python tests) @@ -739,9 +1204,325 @@ async def _mock_req(method, url, *, api_key, body=None, **kw): # --------------------------------------------------------------------------- -# _parse_preview_urls +# Detached exec # --------------------------------------------------------------------------- +class TestDetachedExec: + @pytest.mark.asyncio + async def test_exec_detached_resolves_with_handle_on_exec_detached(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + + task = sandbox.exec("npm run dev", detached=True) + exec_id = task.exec_id + + await asyncio.sleep(0) + + sandbox.session.handle_exec_frame( + {"type": "exec.detached", "execId": exec_id, "pid": 9999, "startedAt": 1234567890} + ) + + handle = await task + assert isinstance(handle, DetachedCommandHandle) + assert handle.exec_id == exec_id + assert handle.pid == 9999 + assert handle.started_at == 1234567890 + assert handle.detached is True + + @pytest.mark.asyncio + async def test_exec_detached_wait_resolves_on_exec_exit(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + + task = sandbox.exec("sleep 100", detached=True) + exec_id = task.exec_id + + await asyncio.sleep(0) + sandbox.session.handle_exec_frame( + {"type": "exec.detached", "execId": exec_id, "pid": 1234, "startedAt": 1000} + ) + + handle = await task + + wait_coro = handle.wait() + sandbox.session.handle_exec_frame( + {"type": "exec.exit", "execId": exec_id, "exitCode": 0} + ) + + result = await wait_coro + assert result["exit_code"] == 0 + + @pytest.mark.asyncio + async def test_exec_detached_output_frames_delivered_after_detached_ack(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + + chunks = [] + task = sandbox.exec("npm run dev", detached=True, on_output=lambda d, s: chunks.append((d, s))) + exec_id = task.exec_id + + await asyncio.sleep(0) + sandbox.session.handle_exec_frame( + {"type": "exec.detached", "execId": exec_id, "pid": 9000, "startedAt": 1} + ) + await task + + sandbox.session.handle_exec_frame( + {"type": "exec.output", "execId": exec_id, "stream": "stdout", "data": "compiled\n"} + ) + assert chunks == [("compiled\n", "stdout")] + + @pytest.mark.asyncio + async def test_exec_detached_error_after_ack_rejects_wait(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + + task = sandbox.exec("bad-cmd", detached=True) + exec_id = task.exec_id + + await asyncio.sleep(0) + sandbox.session.handle_exec_frame( + {"type": "exec.detached", "execId": exec_id, "pid": 1, "startedAt": 1} + ) + handle = await task + + wait_coro = handle.wait() + sandbox.session.handle_exec_frame( + {"type": "error", "execId": exec_id, "message": "process crashed"} + ) + + with pytest.raises(SandboxClientError, match="process crashed"): + await wait_coro + + def test_exec_detached_with_timeout_raises(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + with pytest.raises(SandboxClientError, match="timeout"): + sandbox.exec("cmd", detached=True, timeout=1000) + + @pytest.mark.asyncio + async def test_detached_handle_forwards_stdin_and_kill_helpers(self): + sandbox = _make_sandbox() + wait_future = asyncio.get_running_loop().create_future() + handle = DetachedCommandHandle( + exec_id="exec-1", + pid=1234, + started_at=100, + detached=True, + wait_future=wait_future, + sandbox_ref=sandbox, + ) + sandbox.write_stdin = AsyncMock() + sandbox.close_stdin = AsyncMock() + sandbox.kill = AsyncMock() + + await handle.write_stdin("input") + await handle.close_stdin() + await handle.kill("SIGKILL") + + sandbox.write_stdin.assert_awaited_once_with("exec-1", "input") + sandbox.close_stdin.assert_awaited_once_with("exec-1") + sandbox.kill.assert_awaited_once_with("exec-1", "SIGKILL") + + @pytest.mark.asyncio + async def test_detached_ack_cancels_timeout_handle(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + loop = asyncio.get_running_loop() + future = loop.create_future() + wait_future = loop.create_future() + timeout_handle = MagicMock() + sandbox.session.pending_execs["exec-1"] = PendingExec( + future=future, + timeout_handle=timeout_handle, + detached=True, + wait_future=wait_future, + ) + + sandbox.session.handle_exec_frame( + {"type": "exec.detached", "execId": "exec-1", "pid": 1234, "startedAt": 100} + ) + + timeout_handle.cancel.assert_called_once() + assert sandbox.session.pending_execs["exec-1"].timeout_handle is None + + @pytest.mark.asyncio + async def test_exec_exit_cancels_timeout_handle(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + future = asyncio.get_running_loop().create_future() + timeout_handle = MagicMock() + sandbox.session.pending_execs["exec-1"] = PendingExec( + future=future, + timeout_handle=timeout_handle, + ) + + sandbox.session.handle_exec_frame( + {"type": "exec.exit", "execId": "exec-1", "exitCode": 0} + ) + + timeout_handle.cancel.assert_called_once() + assert (await future).exit_code == 0 + + +# --------------------------------------------------------------------------- +# get_command +# --------------------------------------------------------------------------- + +class TestGetCommand: + @pytest.mark.asyncio + async def test_get_command_resolves_with_handle_on_exec_info(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + + loop = asyncio.get_event_loop() + coro = sandbox.get_command("exec-d1e2f3a4") + task = loop.create_task(coro) + + await asyncio.sleep(0) + + sandbox.session.handle_get_frame({ + "type": "exec.info", + "execId": "exec-d1e2f3a4", + "command": "npm run dev", + "pid": 5678, + "startedAt": 1711036812, + "detached": True, + }) + + handle = await task + assert isinstance(handle, DetachedCommandHandle) + assert handle.exec_id == "exec-d1e2f3a4" + assert handle.command == "npm run dev" + assert handle.pid == 5678 + assert handle.started_at == 1711036812 + + @pytest.mark.asyncio + async def test_get_command_wait_resolves_on_exec_exit(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + + loop = asyncio.get_event_loop() + coro = sandbox.get_command("exec-reattach") + get_task = loop.create_task(coro) + + await asyncio.sleep(0) + sandbox.session.handle_get_frame({ + "type": "exec.info", + "execId": "exec-reattach", + "command": "sleep 60", + "pid": 1111, + "startedAt": 100, + "detached": True, + }) + + handle = await get_task + wait_coro = handle.wait() + + sandbox.session.handle_exec_frame( + {"type": "exec.exit", "execId": "exec-reattach", "exitCode": 143} + ) + + result = await wait_coro + assert result["exit_code"] == 143 + + @pytest.mark.asyncio + async def test_get_command_raises_command_not_found_on_error(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + + loop = asyncio.get_event_loop() + coro = sandbox.get_command("exec-gone") + get_task = loop.create_task(coro) + + await asyncio.sleep(0) + sandbox.session.handle_get_frame({ + "type": "error", + "execId": "exec-gone", + "code": "NOT_FOUND", + "message": "no running process for execId", + }) + + with pytest.raises(SandboxCommandNotFoundError): + await get_task + + @pytest.mark.asyncio + async def test_get_command_reuses_existing_exec_and_merges_output_callbacks(self): + sandbox = _make_sandbox() + ws = _inject_ws(sandbox) + ws.send = AsyncMock() + original_chunks = [] + reattached_chunks = [] + + task = sandbox.exec( + "npm run dev", + detached=True, + on_output=lambda data, stream: original_chunks.append((data, stream)), + ) + await asyncio.sleep(0) + sandbox.session.handle_exec_frame( + {"type": "exec.detached", "execId": task.exec_id, "pid": 1234, "startedAt": 100} + ) + original_handle = await task + + get_task = asyncio.create_task( + sandbox.get_command( + task.exec_id, + on_output=lambda data, stream: reattached_chunks.append((data, stream)), + ) + ) + await asyncio.sleep(0) + sandbox.session.handle_get_frame({ + "type": "exec.info", + "execId": task.exec_id, + "command": "npm run dev", + "pid": 1234, + "startedAt": 100, + "detached": True, + }) + reattached_handle = await get_task + + assert reattached_handle._wait_future is original_handle._wait_future + sandbox.session.handle_exec_frame( + {"type": "exec.output", "execId": task.exec_id, "stream": "stderr", "data": "ready\n"} + ) + assert original_chunks == [("ready\n", "stderr")] + assert reattached_chunks == [("ready\n", "stderr")] + + @pytest.mark.asyncio + async def test_merge_on_output_callback_ignores_missing_new_callback(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + chunks = [] + pending = PendingExec( + future=asyncio.get_running_loop().create_future(), + on_output=lambda data, stream: chunks.append((data, stream)), + ) + + sandbox.session.merge_on_output_callback(pending, None) + pending.on_output("line\n", "stdout") + + assert chunks == [("line\n", "stdout")] + + def test_handle_get_frame_ignores_missing_pending_operation(self): + sandbox = _make_sandbox() + _inject_ws(sandbox) + + sandbox.session.handle_get_frame( + {"type": "exec.info", "execId": "exec-missing"} + ) + +# --------------------------------------------------------------------------- +# _parse_preview_urls +# --------------------------------------------------------------------------- class TestParsePreviewUrls: def test_returns_empty_for_non_dict(self):