Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 86 additions & 16 deletions app/services/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,111 @@

logger = logging.getLogger(__name__)

#
def _is_running_in_container() -> bool:
"""
Used to check if we're running in a container or locally for development,
to determine how to resolve paths for Docker mounts.

Returns:
True when the current process appears to be running inside a container.
"""

if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"):
return True

cgroup_paths = ["/proc/self/cgroup", "/proc/1/cgroup"]
keywords = ("docker", "containerd", "kubepods", "podman", "lxc")

for path in cgroup_paths:
if not os.path.exists(path):
continue
try:
with open(path, "r", encoding="utf-8", errors="ignore") as handle:
contents = handle.read()
except OSError:
continue
if any(keyword in contents for keyword in keywords):
return True

return False


def _get_current_container(client):
"""Return the Docker container object for the current container."""
import socket

hostname = socket.gethostname()
try:
return client.containers.get(hostname)
except docker.errors.NotFound:
for container in client.containers.list(all=True):
if hostname == container.name or hostname in container.id:
return container
raise


def get_host_path_for_container_path(container_path: str) -> str:
"""
Resolves the host path corresponding to a given container path by inspecting
the current container's mounts using the Docker socket.

For local debugging (not in a container), returns the container_path as-is.

Args:
container_path (str): The absolute path inside the container to resolve.

Returns:
str: The corresponding absolute path on the host machine.
str: The corresponding absolute path on the host machine (or container_path if local).

Raises:
RuntimeError: If no mount is found covering the given container path.
Exception: If there is an error communicating with Docker or resolving the path.
RuntimeError: If no mount is found covering the given container path (in container only).
Exception: If there is an error communicating with Docker or resolving the path (in container only).
"""


# For local debugging, we assume the container_path is directly accessible on the host
if not _is_running_in_container():
logger.warning(
f"Running locally (not in container). Returning container_path as-is: {container_path}"
)
return container_path
Comment on lines +76 to +81

try:
client = docker.from_env()
import socket
hostname = socket.gethostname()
container = client.containers.get(hostname)
for mount in container.attrs["Mounts"]:

destination = mount.get("Destination", "")
if destination == container_path:
host_source = mount["Source"]
relative = os.path.relpath(container_path, destination)
return os.path.join(host_source, relative).replace("\\", "/")
container = _get_current_container(client)
container_path = os.path.normpath(container_path)

best_mount = None
best_destination = None
for mount in container.attrs.get("Mounts", []):
destination = mount.get("Destination")
if not destination:
continue
destination = os.path.normpath(destination)
if (
container_path == destination
or destination == os.sep
or container_path.startswith(destination + os.sep)
):
if best_destination is None or len(destination) > len(best_destination):
best_mount = mount
best_destination = destination

if best_mount is None:
raise RuntimeError(
f"No mount found covering container path: {container_path}"
)

host_source = best_mount["Source"]
relative = os.path.relpath(container_path, best_destination)
if relative == ".":
return host_source.replace("\\", "/")
return os.path.join(host_source, relative).replace("\\", "/")

except Exception as e:
logger.error(f"Could not resolve host path for {container_path}: {e}")
raise

raise RuntimeError(f"No mount found covering container path: {container_path}")


class LocalExecutor(SimulationExecutor):
def __init__(self, work_dir=None):
Expand Down
Loading