Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,31 @@ print("exit code:", result.exit_code)
> Note: Commands run in the `/workspace` directory by default, this is not configurable


### Detached Commands

Pass `detached=True` to run a long-lived background process.

```python
# Start a background server
handle = await sandbox.exec("python server.py", detached=True)

# Wait for it to exit (e.g. after you stop it)
result = await handle.wait()
print("exit code:", result.exit_code)

# Send a signal to stop it
await handle.kill()
```

If the process is still running and you need a handle to it from a different context, use `get_command()` to re-attach by `exec_id`:

```python
handle = await sandbox.get_command(exec_id, on_output=lambda data, stream: print(data, end=""))
await handle.wait()
```

> Note: Only 5 background processes are allowed to run at once currently.

### File Management

```python
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dev = [

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
addopts = "--cov=src/aio_lib_sandbox --cov-report=xml --cov-report=term-missing"

[tool.coverage.run]
Expand Down
4 changes: 4 additions & 0 deletions src/aio_lib_sandbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .errors import (
SandboxClientError,
SandboxCommandNotFoundError,
SandboxInitializationError,
SandboxInvalidPortError,
SandboxNotFoundError,
Expand All @@ -21,6 +22,7 @@
from .sandbox import Sandbox
from .types import (
SANDBOX_SIZES,
DetachedCommandHandle,
EgressRule,
ExecResult,
ExecTask,
Expand All @@ -33,6 +35,7 @@

__all__ = [
"Sandbox",
"DetachedCommandHandle",
"ExecResult",
"ExecTask",
"WriteResult",
Expand All @@ -51,4 +54,5 @@
"SandboxUnauthorizedError",
"SandboxTimeoutError",
"SandboxWebSocketError",
"SandboxCommandNotFoundError",
]
4 changes: 3 additions & 1 deletion src/aio_lib_sandbox/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ class SandboxWebSocketError(SandboxSDKError):
"""WebSocket transport error."""


class SandboxCommandNotFoundError(SandboxSDKError):
"""Raised by ``get_command()`` when no running process matches the provided exec_id."""

class SandboxPortNotProvisionedError(SandboxClientError):
"""Port was not declared in ``create(ports=[...])`` and cannot be retrieved."""


class SandboxInvalidPortError(SandboxClientError):
"""Port value is not a valid integer in the range 1–65535."""
129 changes: 103 additions & 26 deletions src/aio_lib_sandbox/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
normalize_api_host,
sandbox_http_error,
)
from .ws import PendingExec, PendingFileOp, WsSession
from .ws import PendingExec, PendingFileOp, PendingGetOp, WsSession
from .errors import (
SandboxClientError,
SandboxInitializationError,
Expand All @@ -29,7 +29,7 @@
)
from .types import (
SANDBOX_SIZES,
ExecResult,
DetachedCommandHandle,
ExecTask,
FileEntry,
Policy,
Expand Down Expand Up @@ -257,29 +257,54 @@ def exec(
command: str,
*,
timeout: float | None = None,
detached: bool = False,
on_output: Callable[[str, str], None] | None = None,
stdin: str | bytes | None = None,
) -> ExecTask:
"""Execute a command inside the sandbox.

Returns an :class:`ExecTask` that can be ``await``-ed for the result.
The task's ``exec_id`` attribute can be used with
:meth:`write_stdin` / :meth:`close_stdin` before awaiting.
For **foreground** commands (default), returns an :class:`ExecTask` that
resolves to :class:`ExecResult` when the command exits.

For **detached** commands (``detached=True``), returns an
:class:`ExecTask` that resolves to a :class:`DetachedCommandHandle`
immediately once the process has started. Up to 5 detached commands may
run concurrently per sandbox.

``timeout`` is not supported with ``detached=True``.

Args:
command: Shell command to run.
timeout: Timeout in milliseconds (not seconds).
timeout: Timeout in milliseconds (foreground only).
detached: When ``True``, run as a detached background process.
on_output: Callback invoked with ``(data, stream)`` for each chunk.
stdin: Data to write to stdin at startup.

Returns:
An :class:`ExecTask` that resolves to an :class:`ExecResult`.
An :class:`ExecTask` that resolves to :class:`ExecResult` (foreground)
or :class:`DetachedCommandHandle` (detached).
"""
self.ensure_open()

if detached and timeout is not None:
raise SandboxClientError(
"timeout is not supported with detached=True"
)

exec_id = f"exec-{secrets.token_hex(12)}"
loop = asyncio.get_running_loop()
future: asyncio.Future[ExecResult] = loop.create_future()
pending = PendingExec(future=future, on_output=on_output)
future: asyncio.Future[Any] = loop.create_future()

wait_future: asyncio.Future[Any] | None = None
if detached:
wait_future = loop.create_future()

pending = PendingExec(
future=future,
on_output=on_output,
detached=detached,
wait_future=wait_future,
)

if timeout is not None:
pending.timeout_handle = loop.call_later(
Expand All @@ -292,18 +317,63 @@ def exec(

self.session.register_exec(exec_id, pending)

async def _run() -> ExecResult:
await self.session.send_frame(
{"type": "exec.run", "execId": exec_id, "command": command}
)
async def _run() -> Any:
frame: dict[str, Any] = {"type": "exec.run", "execId": exec_id, "command": command}
if detached:
frame["detached"] = True
await self.session.send_frame(frame)
pending.started.set()
if stdin is not None:
await self.write_stdin(exec_id, stdin)
await self.close_stdin(exec_id)
return await future
result = await future
if not detached:
return result
return DetachedCommandHandle(
exec_id=exec_id,
pid=result["pid"],
started_at=result["started_at"],
detached=True,
wait_future=wait_future,
sandbox_ref=self,
command=command,
)

return ExecTask(exec_id=exec_id, _task=loop.create_task(_run()))

async def get_command(
self,
exec_id: str,
*,
on_output: Callable[[str, str], None] | None = None,
) -> DetachedCommandHandle:
"""Re-attach to a detached command that is still running in the sandbox.

Sends an ``exec.get`` frame and resolves with a
:class:`DetachedCommandHandle` once the ``exec.info`` response arrives.

Args:
exec_id: The ``exec_id`` returned by the original :meth:`exec` call.
on_output: Callback invoked with ``(data, stream)`` for live output
from the re-attach point onward.

Returns:
A :class:`DetachedCommandHandle`.

Raises:
SandboxCommandNotFoundError: When no process with that ``exec_id``
is currently running (already exited or never existed).
"""
self.ensure_open()
loop = asyncio.get_running_loop()
future: asyncio.Future[DetachedCommandHandle] = loop.create_future()
self.session.register_get_op(
exec_id,
PendingGetOp(future=future, on_output=on_output, sandbox_ref=self),
)
await self.session.send_frame({"type": "exec.get", "execId": exec_id})
return await future

async def kill(self, exec_id: str, signal: str = "SIGTERM") -> None:
"""Send a signal to a running command."""
self.ensure_open()
Expand Down Expand Up @@ -420,19 +490,26 @@ async def destroy(self) -> dict[str, Any]:
base = self.management_endpoint or self.api_host
url = f"{base}/api/v1/namespaces/{self.namespace}/sandbox/{self.id}"
headers = {"Authorization": build_auth_header(self.api_key)}
if self.session:
self.session.begin_intentional_close()

async with httpx.AsyncClient(verify=self.verify_ssl) as client:
try:
resp = await client.delete(url, headers=headers)
except httpx.HTTPError as exc:
raise SandboxClientError(
f"Could not destroy sandbox '{self.id}': {exc}"
) from exc

if not resp.is_success:
msg = resp.text
detail = f"Could not destroy sandbox '{self.id}': {resp.status_code}{f' {msg}' if msg else ''}"
raise sandbox_http_error(resp.status_code, detail)
try:
async with httpx.AsyncClient(verify=self.verify_ssl) as client:
try:
resp = await client.delete(url, headers=headers)
except httpx.HTTPError as exc:
raise SandboxClientError(
f"Could not destroy sandbox '{self.id}': {exc}"
) from exc

if not resp.is_success:
msg = resp.text
detail = f"Could not destroy sandbox '{self.id}': {resp.status_code}{f' {msg}' if msg else ''}"
raise sandbox_http_error(resp.status_code, detail)
except Exception:
if self.session:
self.session.cancel_intentional_close()
raise

payload = resp.json()
self.status = payload.get("status", self.status)
Expand Down
59 changes: 57 additions & 2 deletions src/aio_lib_sandbox/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import asyncio
from collections.abc import Generator
from dataclasses import dataclass
from typing import Any, List, Union
from typing import TYPE_CHECKING, Any, List, Optional, Union

if TYPE_CHECKING:
pass


# ------------------------------------------------------------------
Expand All @@ -32,7 +35,8 @@ class ExecResult:
exec_id: str
stdout: str
stderr: str
exit_code: int
exit_code: int | None
destroyed: bool = False


@dataclass
Expand Down Expand Up @@ -71,6 +75,57 @@ def __await__(self) -> Generator[Any, None, ExecResult]:
return self._task.__await__()


class DetachedCommandHandle:
"""Handle for a detached background command.

Returned by ``await sandbox.exec(cmd, detached=True)`` once the process has
started. Use :meth:`wait` to block until the process exits,
:meth:`write_stdin` / :meth:`close_stdin` to interact with stdin, and
:meth:`kill` to terminate it.

Do not instantiate directly — use :meth:`Sandbox.exec` with
``detached=True`` or :meth:`Sandbox.get_command`.
"""

def __init__(
self,
exec_id: str,
pid: int,
started_at: int,
detached: bool,
wait_future: asyncio.Future[Any],
sandbox_ref: Any,
command: Optional[str] = None,
) -> None:
self.exec_id = exec_id
self.pid = pid
self.started_at = started_at
self.detached = detached
self.command = command
self._wait_future = wait_future
self._sandbox = sandbox_ref

async def wait(self) -> dict[str, Any]:
"""Wait for the process to exit.

Returns:
``{"exit_code": int}`` when the process terminates.
"""
return await self._wait_future

async def write_stdin(self, data: Union[str, bytes]) -> None:
"""Write data to the process stdin."""
await self._sandbox.write_stdin(self.exec_id, data)

async def close_stdin(self) -> None:
"""Close stdin, sending EOF to the process."""
await self._sandbox.close_stdin(self.exec_id)

async def kill(self, signal: str = "SIGTERM") -> None:
"""Send a signal to the process."""
await self._sandbox.kill(self.exec_id, signal)


# ------------------------------------------------------------------
# Network policy TypedDicts
# ------------------------------------------------------------------
Expand Down
Loading
Loading