diff --git a/README.md b/README.md index 4f336ab..6c6906b 100644 --- a/README.md +++ b/README.md @@ -545,7 +545,7 @@ Use the Docker Engine `/_ping` endpoint via HaRP’s ExApps HTTP frontend to con curl -fsS \ -H "harp-shared-key: " \ -H "docker-engine-port: 24000" \ - http://127.0.0.1:8780/exapps/app_api/v1.41/_ping + http://127.0.0.1:8780/exapps/app_api/v1.44/_ping ``` * `24000` is the **default** FRP remote port used by the HaRP container for the **built‑in/local** Docker Engine (enabled when `/var/run/docker.sock` is mounted). diff --git a/development/redeploy_host_k8s.sh b/development/redeploy_host_k8s.sh new file mode 100755 index 0000000..f8da05e --- /dev/null +++ b/development/redeploy_host_k8s.sh @@ -0,0 +1,88 @@ +#!/bin/sh +# SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Redeploy HaRP with Kubernetes backend for local development. +# +# Prerequisites: +# - kind cluster "nc-exapps" running (see docs/kubernetes-local-setup.md) +# - kubectl context set to kind-nc-exapps +# - Nextcloud Docker-Dev running with nginx proxy +# - nginx vhost configured to proxy /exapps/ to HaRP (see docs) + +set -e + +# ── Configuration ────────────────────────────────────────────────────── +KIND_CLUSTER="nc-exapps" +KIND_NODE="${KIND_CLUSTER}-control-plane" +K8S_CONTEXT="kind-${KIND_CLUSTER}" +K8S_NAMESPACE="nextcloud-exapps" +K8S_SA="harp-exapps" +NC_DOCKER_NETWORK="master_default" + +HP_SHARED_KEY="some_very_secure_password" +NC_INSTANCE_URL="http://nextcloud.local" +# ─────────────────────────────────────────────────────────────────────── + +echo "==> Obtaining K8s API server URL..." +K8S_API_SERVER=$(kubectl --context "$K8S_CONTEXT" config view --minify -o jsonpath='{.clusters[0].cluster.server}') +echo " API server: $K8S_API_SERVER" + +echo "==> Generating fresh bearer token for SA '$K8S_SA' (valid 1 year)..." 
+K8S_BEARER_TOKEN=$(kubectl --context "$K8S_CONTEXT" -n "$K8S_NAMESPACE" create token "$K8S_SA" --duration=8760h) +echo " Token generated (${#K8S_BEARER_TOKEN} chars)" + +# ── Ensure kind node can reach the Nextcloud Docker network ─────────── +echo "==> Connecting kind node '$KIND_NODE' to Docker network '$NC_DOCKER_NETWORK'..." +if docker network connect "$NC_DOCKER_NETWORK" "$KIND_NODE" 2>/dev/null; then + echo " Connected." +else + echo " Already connected (or network not found)." +fi + +# Detect the nginx proxy IP on NC_DOCKER_NETWORK for pod DNS resolution. +# Pods inside the kind cluster cannot resolve hostnames like "nextcloud.local" that only exist in the host's /etc/hosts. +# Try to inject hostAliases so that ExApp pods can reach Nextcloud. +echo "==> Detecting nginx proxy IP for host aliases..." +PROXY_IP=$(docker inspect master-proxy-1 \ + --format "{{(index .NetworkSettings.Networks \"$NC_DOCKER_NETWORK\").IPAddress}}" 2>/dev/null || true) +K8S_HOST_ALIASES="" +if [ -n "$PROXY_IP" ]; then + K8S_HOST_ALIASES="nextcloud.local:${PROXY_IP}" + echo " nextcloud.local -> $PROXY_IP" +else + echo " WARNING: Could not detect proxy IP. ExApp pods may not resolve nextcloud.local." +fi + +echo "==> Removing old HaRP container..." +docker container remove --force appapi-harp 2>/dev/null || true + +echo "==> Building HaRP image..." +docker build -t nextcloud-appapi-harp:local . + +echo "==> Starting HaRP container..." 
+docker run \ + -e HP_SHARED_KEY="$HP_SHARED_KEY" \ + -e NC_INSTANCE_URL="$NC_INSTANCE_URL" \ + -e HP_LOG_LEVEL="info" \ + -e HP_VERBOSE_START="1" \ + -e HP_K8S_ENABLED="true" \ + -e HP_K8S_API_SERVER="$K8S_API_SERVER" \ + -e HP_K8S_BEARER_TOKEN="$K8S_BEARER_TOKEN" \ + -e HP_K8S_NAMESPACE="$K8S_NAMESPACE" \ + -e HP_K8S_VERIFY_SSL="false" \ + -e HP_K8S_HOST_ALIASES="$K8S_HOST_ALIASES" \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v "$(pwd)/certs:/certs" \ + --name appapi-harp -h appapi-harp \ + --restart unless-stopped \ + --network=host \ + -d nextcloud-appapi-harp:local + +echo "==> HaRP container started. Waiting for health check..." +sleep 5 +if docker inspect appapi-harp --format '{{.State.Health.Status}}' 2>/dev/null | grep -q healthy; then + echo "==> HaRP is healthy!" +else + echo "==> HaRP still starting... check with: docker ps | grep harp" +fi diff --git a/haproxy_agent.py b/haproxy_agent.py index f0a35ca..3c6abc1 100644 --- a/haproxy_agent.py +++ b/haproxy_agent.py @@ -12,6 +12,7 @@ import os import re import socket +import ssl import tarfile import time from base64 import b64encode @@ -31,8 +32,26 @@ SPOA_ADDRESS = os.environ.get("HP_SPOA_ADDRESS", "127.0.0.1:9600") SPOA_HOST, SPOA_PORT = SPOA_ADDRESS.rsplit(":", 1) SPOA_PORT = int(SPOA_PORT) +# Kubernetes environment variables +K8S_ENABLED = os.environ.get("HP_K8S_ENABLED", "false").lower() in {"1", "true", "yes"} +K8S_NAMESPACE = os.environ.get("HP_K8S_NAMESPACE", "nextcloud-exapps") +K8S_API_SERVER = os.environ.get("HP_K8S_API_SERVER") # e.g. 
https://kubernetes.default.svc +K8S_CA_FILE = os.environ.get("HP_K8S_CA_FILE", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") +K8S_TOKEN = os.environ.get("HP_K8S_BEARER_TOKEN") +K8S_TOKEN_FILE = os.environ.get("HP_K8S_BEARER_TOKEN_FILE", "/var/run/secrets/kubernetes.io/serviceaccount/token") +K8S_VERIFY_SSL = os.environ.get("HP_K8S_VERIFY_SSL", "true").lower() != "false" +K8S_STORAGE_CLASS = os.environ.get("HP_K8S_STORAGE_CLASS", "") +K8S_DEFAULT_STORAGE_SIZE = os.environ.get("HP_K8S_DEFAULT_STORAGE_SIZE", "10Gi") +K8S_HOST_ALIASES_RAW = os.environ.get("HP_K8S_HOST_ALIASES", "") # "hostname:ip,hostname2:ip2" +if not K8S_API_SERVER and os.environ.get("KUBERNETES_SERVICE_HOST"): + host = os.environ["KUBERNETES_SERVICE_HOST"] + port = os.environ.get("KUBERNETES_SERVICE_PORT", "443") + K8S_API_SERVER = f"https://{host}:{port}" + +K8S_HTTP_TIMEOUT = aiohttp.ClientTimeout(total=60.0) +K8S_NAME_MAX_LENGTH = 63 # Set up the logging configuration -LOG_LEVEL = os.environ["HP_LOG_LEVEL"].upper() +LOG_LEVEL = os.environ.get("HP_LOG_LEVEL", "INFO").upper() logging.basicConfig(level=LOG_LEVEL) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(level=LOG_LEVEL) @@ -59,7 +78,8 @@ LOGGER.error( "Invalid value for HP_TRUSTED_PROXY_IPS: %s. Client IP detection from headers is disabled. " "The X-Forwarded-For and X-Real-IP headers will not be respected. 
" - "This can lead to the outer proxy's IP being blocked during a bruteforce attempt instead of the actual client's IP.", + "This can lead to the outer proxy's IP being blocked " + "during a bruteforce attempt instead of the actual client's IP.", e, ) TRUSTED_PROXIES = [] @@ -105,20 +125,51 @@ class NcUser(BaseModel): access_level: AccessLevel = Field(..., description="ADMIN(2), USER(1), or PUBLIC(0)") +def _sanitize_k8s_name(raw: str) -> str: + """Convert an arbitrary string into a DNS-1123 compatible name for Kubernetes.""" + name = raw.lower().replace("_", "-") + name = re.sub(r"[^a-z0-9-]", "-", name) + name = re.sub(r"-+", "-", name).strip("-") + if not name: + name = "exapp" + if len(name) > K8S_NAME_MAX_LENGTH: + name = name[:K8S_NAME_MAX_LENGTH].rstrip("-") + return name + + class ExAppName(BaseModel): name: str = Field(..., description="ExApp name.") instance_id: str = Field("", description="Nextcloud instance ID.") + role_suffix: str = Field("", description="Role suffix, e.g. 'rp', 'idx'.") @computed_field @property def exapp_container_name(self) -> str: - return f"nc_app_{self.instance_id}_{self.name}" if self.instance_id else f"nc_app_{self.name}" + base = f"nc_app_{self.instance_id}_{self.name}" if self.instance_id else f"nc_app_{self.name}" + if self.role_suffix: + base = f"{base}_{self.role_suffix}" + return base @computed_field @property def exapp_container_volume(self) -> str: return f"{self.exapp_container_name}_data" + @computed_field + @property + def exapp_k8s_name(self) -> str: + """K8s Deployment name.""" + return _sanitize_k8s_name(self.exapp_container_name) + + @computed_field + @property + def exapp_k8s_volume_name(self) -> str: + """K8s PVC name.""" + base = _sanitize_k8s_name(self.exapp_container_volume) + if len(base) > K8S_NAME_MAX_LENGTH: + base = base[:K8S_NAME_MAX_LENGTH].rstrip("-") + return base + class CreateExAppMounts(BaseModel): source: str = Field(...) 
@@ -137,6 +188,17 @@ class CreateExAppPayload(ExAppName): mount_points: list[CreateExAppMounts] = Field([], description="List of mount points for the container.") resource_limits: dict[str, Any] = Field({}, description="Resource limits for the container.") + @model_validator(mode="before") + @classmethod + def accept_k8s_friendly_payload(cls, data: Any) -> Any: + """Map 'image' -> 'image_id' and default network_mode for K8s payloads.""" + if isinstance(data, dict): + if "image_id" not in data and "image" in data: + data = {**data, "image_id": data["image"]} # Allow 'image' instead of 'image_id' + if "network_mode" not in data: + data = {**data, "network_mode": "bridge"} # Default network_mode (used only for Docker) + return data + class RemoveExAppPayload(ExAppName): remove_data: bool = Field(False, description="Flag indicating whether the Docker ExApp volume should be deleted.") @@ -147,6 +209,30 @@ class InstallCertificatesPayload(ExAppName): install_frp_certs: bool = Field(True, description="Flag to control installation of FRP certificates.") +class ExposeExAppPayload(ExAppName): + port: int = Field(..., ge=1, le=65535, description="Port on which the ExApp listens inside the Pod/container.") + expose_type: Literal["nodeport", "clusterip", "loadbalancer", "manual"] = Field("nodeport") + upstream_host: str | None = Field(None, description="Host override. 
Required for expose_type=manual.") + upstream_port: int | None = Field(None, ge=1, le=65535, description="Port override (manual only).") + service_port: int | None = Field(None, ge=1, le=65535, description="Service port (defaults to payload.port).") + node_port: int | None = Field(None, ge=30000, le=32767) + external_traffic_policy: Literal["Cluster", "Local"] | None = Field(None) + load_balancer_ip: str | None = Field(None) + service_annotations: dict[str, str] = Field(default_factory=dict) + service_labels: dict[str, str] = Field(default_factory=dict) + wait_timeout_seconds: float = Field(60.0, ge=0, le=600) + wait_interval_seconds: float = Field(1.0, ge=0.1, le=10.0) + node_address_type: Literal["InternalIP", "ExternalIP"] = Field("InternalIP") + node_name: str | None = Field(None) + node_label_selector: str | None = Field(None) + + @model_validator(mode="after") + def validate_expose_payload(self) -> Self: + if self.expose_type == "manual" and not self.upstream_host: + raise ValueError("upstream_host is required when expose_type='manual'") + return self + + ############################################################################### # In-memory caches ############################################################################### @@ -312,13 +398,36 @@ async def exapps_msg( if request_headers["harp-shared-key"] != SHARED_KEY: await record_ip_failure(client_ip) return reply.set_txn_var("bad_request", 1) - exapp_record = ExApp( - exapp_token="", - exapp_version=request_headers["ex-app-version"], - host=request_headers["ex-app-host"], - port=int(request_headers["ex-app-port"]), - ) authorization_app_api = request_headers["authorization-app-api"] + # Prefer cached upstream (K8s expose sets correct host/port in cache) + async with EXAPP_CACHE_LOCK: + cached = EXAPP_CACHE.get(exapp_id_lower) + if cached: + exapp_record = cached + else: + exapp_record = ExApp( + exapp_token="", + exapp_version=request_headers["ex-app-version"], + host=request_headers["ex-app-host"], 
+ port=int(request_headers["ex-app-port"]), + ) + # For K8s ExApps: resolve upstream from live Service + k8s_upstream = await _k8s_resolve_exapp_upstream(exapp_id_lower) + if k8s_upstream: + # Fetch full record (with token & routes) so cache is complete + try: + full_record = await nc_get_exapp(exapp_id_lower) + except Exception: + full_record = None + if full_record: + exapp_record = full_record + exapp_record.host, exapp_record.port = k8s_upstream + exapp_record.resolved_host = "" + LOGGER.info("Resolved K8s upstream for '%s': %s:%d", exapp_id, *k8s_upstream) + # Only cache if we have the full record (token + routes) + if full_record: + async with EXAPP_CACHE_LOCK: + EXAPP_CACHE[exapp_id_lower] = exapp_record if not exapp_record: async with EXAPP_CACHE_LOCK: @@ -330,6 +439,12 @@ async def exapps_msg( LOGGER.error("No such ExApp enabled: %s", exapp_id) await record_ip_failure(client_ip_str) return reply.set_txn_var("not_found", 1) + # For K8s ExApps: resolve upstream from live Service + k8s_upstream = await _k8s_resolve_exapp_upstream(exapp_id_lower) + if k8s_upstream: + exapp_record.host, exapp_record.port = k8s_upstream + exapp_record.resolved_host = "" + LOGGER.info("Resolved K8s upstream for '%s': %s:%d", exapp_id, *k8s_upstream) LOGGER.info("Received new ExApp record: %s", exapp_record) EXAPP_CACHE[exapp_id_lower] = exapp_record except ValidationError as e: @@ -1663,6 +1778,962 @@ def _get_certificate_update_command(os_info_content: str | None) -> list[str] | return None +############################################################################### +# Kubernetes helpers functions +############################################################################### + + +async def _parse_json_payload(request: web.Request, model: type[BaseModel]) -> Any: + """Parse JSON body and validate against a Pydantic model.""" + try: + payload_dict = await request.json() + except json.JSONDecodeError: + raise web.HTTPBadRequest(text="Invalid JSON body") from None + try: + 
return model.model_validate(payload_dict) + except ValidationError as e: + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + +def _k8s_error_msg(data: dict[str, Any] | None, text: str) -> str: + """Extract a human-readable message from a K8s API response.""" + if isinstance(data, dict): + return (data or {}).get("message", text) + return text + + +def _get_k8s_token() -> str | None: + """Get (and cache) the Kubernetes bearer token.""" + global K8S_TOKEN + if K8S_TOKEN: + return K8S_TOKEN.strip() + if K8S_TOKEN_FILE and os.path.exists(K8S_TOKEN_FILE): + try: + with open(K8S_TOKEN_FILE, encoding="utf-8") as f: + token = f.read().strip() + if token: + K8S_TOKEN = token + return token + except Exception as e: + LOGGER.error("Failed to read Kubernetes token file '%s': %s", K8S_TOKEN_FILE, e) + LOGGER.error( + "Kubernetes bearer token not found. " + "Set HP_K8S_BEARER_TOKEN or HP_K8S_BEARER_TOKEN_FILE when HP_K8S_ENABLED=true." + ) + return None + + +def _get_k8s_ssl_context() -> ssl.SSLContext | bool: + """Return SSL context (or False to disable verification) for K8s API.""" + if not K8S_API_SERVER or not K8S_API_SERVER.startswith("https"): + return False + if not K8S_VERIFY_SSL: + return False + try: + cafile = K8S_CA_FILE if K8S_CA_FILE and os.path.exists(K8S_CA_FILE) else None + return ssl.create_default_context(cafile=cafile) + except Exception as e: + LOGGER.warning("Failed to create SSL context for Kubernetes API: %s", e) + return ssl.create_default_context() + + +def _ensure_k8s_configured() -> None: + if not K8S_ENABLED: + LOGGER.error("Kubernetes backend requested but HP_K8S_ENABLED is not true.") + raise web.HTTPServiceUnavailable(text="Kubernetes backend is disabled in HaRP.") + if not K8S_API_SERVER: + LOGGER.error("Kubernetes backend requested but HP_K8S_API_SERVER is not configured.") + raise web.HTTPServiceUnavailable(text="Kubernetes API server is not configured.") + if not _get_k8s_token(): + raise 
web.HTTPServiceUnavailable(text="Kubernetes token is not configured.") + + +async def _k8s_request( + method: str, + path: str, + *, + query: dict[str, str] | None = None, + json_body: Any | None = None, + content_type: str | None = None, +) -> tuple[int, dict[str, Any] | None, str]: + """Low-level helper for talking to the Kubernetes API.""" + _ensure_k8s_configured() + token = _get_k8s_token() + headers: dict[str, str] = { + "Authorization": f"Bearer {token}", + "Accept": "application/json", + } + if json_body is not None: + headers["Content-Type"] = content_type or "application/json" + + url = f"{K8S_API_SERVER}{path}" + ssl_ctx = _get_k8s_ssl_context() + connector = aiohttp.TCPConnector(ssl=ssl_ctx) + + async with aiohttp.ClientSession(timeout=K8S_HTTP_TIMEOUT, connector=connector) as session: + try: + async with session.request(method.upper(), url, headers=headers, params=query, json=json_body) as resp: + text = await resp.text() + data: dict[str, Any] | None = None + if "application/json" in resp.headers.get("Content-Type", "") and text: + try: + data = json.loads(text) + except json.JSONDecodeError: + LOGGER.warning("Failed to parse JSON from Kubernetes API %s %s: %s", method, url, text[:200]) + return resp.status, data, text + except aiohttp.ClientError as e: + LOGGER.error("Error communicating with Kubernetes API (%s %s): %s", method, url, e) + raise web.HTTPServiceUnavailable(text="Error communicating with Kubernetes API") from e + + +def _k8s_parse_env(env_list: list[str]) -> list[dict[str, str]]: + """Convert ['KEY=VALUE', ...] 
to Kubernetes env entries.""" + result: list[dict[str, str]] = [] + for raw in env_list: + if not raw: + continue + if "=" in raw: + name, value = raw.split("=", 1) + result.append({"name": name, "value": value}) + else: + result.append({"name": raw, "value": ""}) # No '=', keep name and use empty value + return result + + +def _k8s_build_resources(resource_limits: dict[str, Any], compute_device: str = "cpu") -> dict[str, Any]: + """Convert resource_limits dict to K8s resources spec.""" + if not resource_limits and compute_device == "cpu": + return {} + limits: dict[str, str] = {} + requests: dict[str, str] = {} + + # Memory + mem_val = resource_limits.get("memory") + mem_str: str | None = None + if isinstance(mem_val, int) and mem_val > 0: + # bytes -> Mi (ceil) + mem_mi = (mem_val + (1024 * 1024 - 1)) // (1024 * 1024) + mem_str = f"{mem_mi}Mi" + elif isinstance(mem_val, str) and mem_val: + mem_str = mem_val # Already in K8s units, e.g. "512Mi" + + if mem_str: + limits["memory"] = mem_str + requests["memory"] = mem_str # conservative: same as limit + + # CPU + cpu_str: str | None = None + nano_cpus = resource_limits.get("nanoCPUs") + if isinstance(nano_cpus, int) and nano_cpus > 0: + milli = (nano_cpus * 1000 + 1_000_000_000 - 1) // 1_000_000_000 # 1e9 nanoCPUs = 1 CPU => millicores + milli = max(1, milli) + cpu_str = f"{milli}m" + else: + cpu_val = resource_limits.get("cpu") + if isinstance(cpu_val, str) and cpu_val: + cpu_str = cpu_val # Already in K8s units, e.g. 
"500m" + + if cpu_str: + limits["cpu"] = cpu_str + requests["cpu"] = cpu_str + + if compute_device == "cuda": + limits["nvidia.com/gpu"] = "1" + + res: dict[str, Any] = {} + if limits: + res["limits"] = limits + if requests: + res["requests"] = requests + return res + + +def _k8s_parse_host_aliases() -> list[dict[str, Any]]: + """Parse HP_K8S_HOST_ALIASES into K8s hostAliases format.""" + if not K8S_HOST_ALIASES_RAW.strip(): + return [] + + ip_to_hosts: dict[str, list[str]] = {} + for entry in K8S_HOST_ALIASES_RAW.split(","): + entry = entry.strip() + if not entry or ":" not in entry: + continue + hostname, ip_addr = entry.rsplit(":", 1) + hostname = hostname.strip() + ip_addr = ip_addr.strip() + if hostname and ip_addr: + ip_to_hosts.setdefault(ip_addr, []).append(hostname) + + return [{"ip": ip, "hostnames": hosts} for ip, hosts in ip_to_hosts.items()] + + +async def _k8s_ensure_coredns_host_aliases() -> None: + """Patch CoreDNS to resolve HP_K8S_HOST_ALIASES cluster-wide.""" + if not K8S_ENABLED or not K8S_HOST_ALIASES_RAW.strip(): + return + + host_aliases = _k8s_parse_host_aliases() + if not host_aliases: + return + + LOGGER.info("Ensuring CoreDNS resolves host aliases: %s", K8S_HOST_ALIASES_RAW) + + try: + status, data, _text = await _k8s_request( + "GET", "/api/v1/namespaces/kube-system/configmaps/coredns", + ) + if status != 200 or not data: + LOGGER.warning("Could not read CoreDNS ConfigMap (HTTP %d), skipping.", status) + return + + corefile = data.get("data", {}).get("Corefile", "") + if not corefile: + LOGGER.warning("CoreDNS ConfigMap has no Corefile entry, skipping.") + return + + hosts_lines: list[str] = [] + for alias in host_aliases: + for hostname in alias["hostnames"]: + hosts_lines.append(f" {alias['ip']} {hostname}") + hosts_block = "hosts {\n" + "\n".join(hosts_lines) + "\n fallthrough\n }" + + hosts_re = re.compile(r"hosts\s*\{[^}]*\}") + if hosts_re.search(corefile): + new_corefile = hosts_re.sub(hosts_block, corefile, count=1) + elif 
"forward ." in corefile: + new_corefile = corefile.replace("forward .", f"{hosts_block}\n forward .", 1) + else: + LOGGER.warning( + "CoreDNS Corefile has no 'hosts' block and no 'forward' directive, cannot patch." + ) + return + + if new_corefile == corefile: + LOGGER.info("CoreDNS already has correct host aliases, no patch needed.") + return + + status, _, _text = await _k8s_request( + "PATCH", + "/api/v1/namespaces/kube-system/configmaps/coredns", + json_body={"data": {"Corefile": new_corefile}}, + content_type="application/strategic-merge-patch+json", + ) + if status != 200: + LOGGER.warning("Failed to patch CoreDNS ConfigMap (HTTP %d): %s", status, _text[:200]) + return + + restart_annotation = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + status, _, _text = await _k8s_request( + "PATCH", + "/apis/apps/v1/namespaces/kube-system/deployments/coredns", + json_body={ + "spec": {"template": {"metadata": {"annotations": { + "harp.nextcloud.com/restartedAt": restart_annotation, + }}}} + }, + content_type="application/strategic-merge-patch+json", + ) + if status != 200: + LOGGER.warning("Failed to restart CoreDNS Deployment (HTTP %d): %s", status, _text[:200]) + return + + LOGGER.info("CoreDNS patched and restarted with host aliases: %s", K8S_HOST_ALIASES_RAW) + + except Exception as exc: + LOGGER.warning("Failed to configure CoreDNS host aliases (non-fatal): %s", exc) + + +def _k8s_build_deployment_manifest(payload: CreateExAppPayload, replicas: int) -> dict[str, Any]: + """Build a Deployment manifest from CreateExAppPayload.""" + deployment_name = payload.exapp_k8s_name + pvc_name = payload.exapp_k8s_volume_name + + # Base ExApp name (without role suffix) - used for grouping multi-role Deployments. 
+ base_exapp_name = _sanitize_k8s_name( + f"nc_app_{payload.instance_id}_{payload.name}" if payload.instance_id else f"nc_app_{payload.name}" + ) + + labels = { + "app": deployment_name, + "app.kubernetes.io/name": deployment_name, + "app.kubernetes.io/part-of": base_exapp_name, + "app.kubernetes.io/component": "exapp", + } + if payload.instance_id: + labels["app.kubernetes.io/instance"] = payload.instance_id + if payload.role_suffix: + labels["app.kubernetes.io/role"] = _sanitize_k8s_name(payload.role_suffix) + + container: dict[str, Any] = { + "name": "app", + "image": payload.image_id, + "imagePullPolicy": "IfNotPresent", + "env": _k8s_parse_env(payload.environment_variables), + } + + resources = _k8s_build_resources(payload.resource_limits, payload.compute_device) + if resources: + container["resources"] = resources + + # Main data volume + volumes = [ + { + "name": "data", + "persistentVolumeClaim": {"claimName": pvc_name}, + } + ] + volume_mounts = [ + { + "name": "data", + "mountPath": f"/{payload.exapp_container_volume}", + } + ] + + if payload.mount_points: + LOGGER.warning( + "Kubernetes backend currently ignores additional mount_points for ExApp '%s'.", + deployment_name, + ) + + container["volumeMounts"] = volume_mounts + + pod_spec: dict[str, Any] = {"containers": [container], "volumes": volumes} + host_aliases = _k8s_parse_host_aliases() + if host_aliases: + pod_spec["hostAliases"] = host_aliases + + return { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": {"name": deployment_name, "labels": labels}, + "spec": { + "replicas": replicas, + "selector": {"matchLabels": {"app": deployment_name}}, + "template": {"metadata": {"labels": labels}, "spec": pod_spec}, + }, + } + + +def _k8s_build_service_manifest( + payload: ExposeExAppPayload, service_type: Literal["NodePort", "ClusterIP", "LoadBalancer"] +) -> dict[str, Any]: + service_name = payload.exapp_k8s_name + base_exapp_name = _sanitize_k8s_name( + 
f"nc_app_{payload.instance_id}_{payload.name}" if payload.instance_id else f"nc_app_{payload.name}" + ) + labels = { + "app": service_name, + "app.kubernetes.io/name": service_name, + "app.kubernetes.io/part-of": base_exapp_name, + "app.kubernetes.io/component": "exapp", + **(payload.service_labels or {}), + } + if payload.instance_id: + labels.setdefault("app.kubernetes.io/instance", payload.instance_id) + if payload.role_suffix: + labels["app.kubernetes.io/role"] = _sanitize_k8s_name(payload.role_suffix) + + metadata: dict[str, Any] = {"name": service_name, "labels": labels} + if payload.service_annotations: + metadata["annotations"] = payload.service_annotations + + svc_port = payload.service_port or payload.port + port_entry: dict[str, Any] = { + "name": "http", + "port": svc_port, + "targetPort": payload.port, + } + if service_type == "NodePort" and payload.node_port: + port_entry["nodePort"] = payload.node_port + + spec: dict[str, Any] = { + "type": service_type, + "selector": {"app": service_name}, + "ports": [port_entry], + } + + if payload.external_traffic_policy and service_type in ("NodePort", "LoadBalancer"): + spec["externalTrafficPolicy"] = payload.external_traffic_policy + + if service_type == "LoadBalancer" and payload.load_balancer_ip: + spec["loadBalancerIP"] = payload.load_balancer_ip + + return { + "apiVersion": "v1", + "kind": "Service", + "metadata": metadata, + "spec": spec, + } + + +async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None: + """Look up K8s Service for an ExApp, return (host, port) or None.""" + if not K8S_ENABLED or not K8S_API_SERVER or not _get_k8s_token(): + return None + + try: + exapp = ExAppName(name=app_name) + except Exception: + return None + + svc: dict[str, Any] | None = None + + # Try the base Service name first (single-role / legacy ExApps). 
+ service_name = exapp.exapp_k8s_name + status, data, _ = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + if status == 200 and isinstance(data, dict): + svc = data + + # Fallback: search for any Service labelled for this ExApp (multi-role). + if svc is None: + base_k8s_name = _sanitize_k8s_name(f"nc_app_{app_name}") + label_selector = f"app.kubernetes.io/part-of={base_k8s_name}" + status, svc_list, _ = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services?labelSelector={label_selector}", + ) + if status == 200 and isinstance(svc_list, dict): + items = svc_list.get("items") or [] + if items: + svc = items[0] + service_name = (svc.get("metadata") or {}).get("name", service_name) + + if svc is None: + return None + + svc_type = (svc.get("spec") or {}).get("type", "ClusterIP") + try: + if svc_type == "NodePort": + port = _k8s_extract_nodeport(svc) + host = await _k8s_pick_node_address(preferred_type="InternalIP") + return (host, port) + if svc_type == "ClusterIP": + port = _k8s_extract_service_port(svc) + host = _k8s_service_dns_name(service_name, K8S_NAMESPACE) + return (host, port) + if svc_type == "LoadBalancer": + port = _k8s_extract_service_port(svc) + host = _k8s_extract_loadbalancer_host(svc) + if host: + return (host, port) + except Exception as e: + LOGGER.warning("Failed to resolve K8s upstream for '%s': %s", app_name, e) + return None + + +def _k8s_service_dns_name(service_name: str, namespace: str) -> str: + # Cluster domain suffix is typically .svc.cluster.local, but .svc is enough inside most resolvers. 
+ return f"{service_name}.{namespace}.svc" + + +async def _k8s_pick_node_address( + *, + preferred_type: Literal["InternalIP", "ExternalIP"], + node_name: str | None = None, + label_selector: str | None = None, +) -> str: + query = {"labelSelector": label_selector} if label_selector else None + status, nodes_data, text = await _k8s_request("GET", "/api/v1/nodes", query=query) + if status != 200 or not isinstance(nodes_data, dict): + raise web.HTTPServiceUnavailable( + text=f"Failed to list K8s nodes: Status {status}, {_k8s_error_msg(nodes_data, text)}" + ) + + items = nodes_data.get("items", []) + if node_name: + items = [n for n in items if n.get("metadata", {}).get("name") == node_name] + + if not items: + raise web.HTTPServiceUnavailable(text="No Kubernetes nodes found (after filtering).") + + def is_ready(node: dict[str, Any]) -> bool: + for cond in node.get("status", {}).get("conditions", []) or []: + if cond.get("type") == "Ready" and cond.get("status") == "True": + return True + return False + + ready_nodes = [n for n in items if is_ready(n)] + nodes = ready_nodes or items + + fallback_type = "ExternalIP" if preferred_type == "InternalIP" else "InternalIP" + address_type_order = [preferred_type, fallback_type, "Hostname"] + + for node in nodes: + for t in address_type_order: + for addr in node.get("status", {}).get("addresses", []) or []: + if addr.get("type") == t and addr.get("address"): + return str(addr["address"]) + + raise web.HTTPServiceUnavailable(text="Could not determine a node address (no InternalIP/ExternalIP/Hostname).") + + +def _k8s_extract_nodeport(service: dict[str, Any]) -> int: + ports = (service.get("spec") or {}).get("ports") or [] + if not ports or "nodePort" not in ports[0]: + raise web.HTTPServiceUnavailable(text="Service has no nodePort assigned.") + return int(ports[0]["nodePort"]) + + +def _k8s_extract_service_port(service: dict[str, Any]) -> int: + ports = (service.get("spec") or {}).get("ports") or [] + if not ports or "port" not 
in ports[0]: + raise web.HTTPServiceUnavailable(text="Service has no port defined.") + return int(ports[0]["port"]) + + +def _k8s_extract_loadbalancer_host(service: dict[str, Any]) -> str | None: + ingress = ((service.get("status") or {}).get("loadBalancer") or {}).get("ingress") or [] + if not ingress: + return None + first = ingress[0] or {} + return first.get("ip") or first.get("hostname") + + +async def _k8s_wait_for_loadbalancer_host(service_name: str, timeout_s: float, interval_s: float) -> str: + deadline = time.time() + max(0.0, timeout_s) + while True: + status, svc, text = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + if status != 200 or not isinstance(svc, dict): + raise web.HTTPServiceUnavailable( + text=f"Failed to read Service '{service_name}': Status {status}, {_k8s_error_msg(svc, text)}" + ) + + host = _k8s_extract_loadbalancer_host(svc) + if host: + return host + + if time.time() >= deadline: + raise web.HTTPServiceUnavailable( + text=f"Timed out waiting for LoadBalancer address for Service '{service_name}'" + ) + + await asyncio.sleep(interval_s) + + +############################################################################### +# Endpoints for AppAPI to work with the Kubernetes API +############################################################################### + + +async def k8s_exapp_exists(request: web.Request): + payload = await _parse_json_payload(request, ExAppName) + deployment_name = payload.exapp_k8s_name + + status, data, text = await _k8s_request( + "GET", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + ) + if status == 200: + return web.json_response({"exists": True}) + if status == 404: + return web.json_response({"exists": False}) + LOGGER.error("Error checking deployment '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error checking deployment '{deployment_name}': Status 
{status}") + + +async def k8s_exapp_create(request: web.Request): + payload = await _parse_json_payload(request, CreateExAppPayload) + deployment_name = payload.exapp_k8s_name + pvc_name = payload.exapp_k8s_volume_name + + LOGGER.info("Creating K8s resources for '%s' (ns=%s).", payload.name, K8S_NAMESPACE) + + pvc_manifest: dict[str, Any] = { + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": { + "name": pvc_name, + "labels": { + "app": deployment_name, + "app.kubernetes.io/name": deployment_name, + "app.kubernetes.io/component": "exapp", + }, + }, + "spec": { + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": K8S_DEFAULT_STORAGE_SIZE}}, + }, + } + if K8S_STORAGE_CLASS: + pvc_manifest["spec"]["storageClassName"] = K8S_STORAGE_CLASS + + status, data, text = await _k8s_request( + "POST", + f"/api/v1/namespaces/{K8S_NAMESPACE}/persistentvolumeclaims", + json_body=pvc_manifest, + ) + if status in (200, 201): + LOGGER.info("PVC '%s' created.", pvc_name) + elif status == 409: + LOGGER.info("PVC '%s' already exists.", pvc_name) + else: + LOGGER.error("Failed to create PVC '%s' (status %s): %s", pvc_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Failed to create PVC '{pvc_name}': Status {status}") + + deployment_manifest = _k8s_build_deployment_manifest(payload, replicas=0) + status, data, text = await _k8s_request( + "POST", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments", + json_body=deployment_manifest, + ) + if status in (200, 201): + LOGGER.info("Deployment '%s' created.", deployment_name) + return web.json_response({"name": deployment_name}, status=201) + if status == 409: + raise web.HTTPConflict(text=f"Deployment '{deployment_name}' already exists.") + LOGGER.error("Error creating deployment '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error creating deployment '{deployment_name}': Status {status}") + + 
async def k8s_exapp_start(request: web.Request):
    """Start an ExApp by scaling its Deployment to one replica."""
    payload = await _parse_json_payload(request, ExAppName)
    deployment_name = payload.exapp_k8s_name

    code, body, raw = await _k8s_request(
        "PATCH",
        f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}",
        json_body={"spec": {"replicas": 1}},
        content_type="application/strategic-merge-patch+json",
    )
    if code == 404:
        raise web.HTTPNotFound(text=f"Deployment '{deployment_name}' not found.")
    if code not in (200, 201):
        LOGGER.error("Error starting '%s' (status %s): %s", deployment_name, code, _k8s_error_msg(body, raw))
        raise web.HTTPServiceUnavailable(text=f"Error starting deployment '{deployment_name}': Status {code}")
    LOGGER.info("Deployment '%s' scaled to 1.", deployment_name)
    return web.HTTPNoContent()


async def k8s_exapp_stop(request: web.Request):
    """Stop an ExApp by scaling its Deployment down to zero replicas."""
    payload = await _parse_json_payload(request, ExAppName)
    deployment_name = payload.exapp_k8s_name

    code, body, raw = await _k8s_request(
        "PATCH",
        f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}",
        json_body={"spec": {"replicas": 0}},
        content_type="application/strategic-merge-patch+json",
    )
    if code == 404:
        raise web.HTTPNotFound(text=f"Deployment '{deployment_name}' not found.")
    if code not in (200, 201):
        LOGGER.error("Error stopping '%s' (status %s): %s", deployment_name, code, _k8s_error_msg(body, raw))
        raise web.HTTPServiceUnavailable(text=f"Error stopping deployment '{deployment_name}': Status {code}")
    LOGGER.info("Deployment '%s' scaled to 0.", deployment_name)
    return web.HTTPNoContent()


async def k8s_exapp_wait_for_start(request: web.Request):
    """Poll the ExApp's pod until it is Running and Ready.

    Timeout model: the short startup timeout only starts ticking once the
    container image has been pulled; while the image is still being pulled
    we wait up to a hard one-hour cap (image sizes and registry speed are
    unpredictable, but not infinite).
    """
    payload = await _parse_json_payload(request, ExAppName)
    deployment_name = payload.exapp_k8s_name
    label_selector = f"app={deployment_name}"

    startup_timeout = 90.0        # seconds, counted only after the image pull
    sleep_interval = 1.0          # polling period
    image_pull_max_wait = 3600.0  # absolute safety cap while still pulling

    LOGGER.info(
        "Waiting for Kubernetes pod(s) of deployment '%s' to become Ready "
        "(namespace=%s, startup_timeout=%.0fs, image_pull_max_wait=%.0fs).",
        deployment_name,
        K8S_NAMESPACE,
        startup_timeout,
        image_pull_max_wait,
    )

    last_phase: str | None = None
    last_reason: str | None = None
    last_message: str | None = None
    image_pulled = False
    started_at = time.monotonic()
    post_pull_start: float | None = None

    while True:
        elapsed = time.monotonic() - started_at

        # Before the pull completes only the hard cap applies; afterwards the
        # (much shorter) startup timeout takes over.
        if image_pulled:
            assert post_pull_start is not None
            if time.monotonic() - post_pull_start > startup_timeout:
                LOGGER.warning(
                    "Deployment '%s' did not become Ready within %.0fs after image pull.",
                    deployment_name,
                    startup_timeout,
                )
                break
        elif elapsed > image_pull_max_wait:
            LOGGER.warning(
                "Deployment '%s' image pull did not complete within %.0fs.",
                deployment_name,
                image_pull_max_wait,
            )
            break

        code, body, raw = await _k8s_request(
            "GET",
            f"/api/v1/namespaces/{K8S_NAMESPACE}/pods",
            query={"labelSelector": label_selector},
        )
        if code != 200:
            LOGGER.error(
                "Error listing pods for '%s' (status %s): %s", deployment_name, code, _k8s_error_msg(body, raw)
            )
            raise web.HTTPServiceUnavailable(
                text=f"Error listing pods for deployment '{deployment_name}': Status {code}"
            )

        pods = (body or {}).get("items", []) if isinstance(body, dict) else []
        if not pods:
            LOGGER.debug("No pods yet for deployment '%s' (elapsed %.0fs).", deployment_name, elapsed)
            last_phase = "Pending"
        else:
            # Single-replica deployments: inspecting the first pod suffices.
            pod_status = pods[0].get("status", {})
            phase = pod_status.get("phase", "Unknown")
            last_phase = phase
            ready = any(
                c.get("type") == "Ready" and c.get("status") == "True"
                for c in pod_status.get("conditions", [])
            )
            last_reason = pod_status.get("reason")
            last_message = pod_status.get("message")

            # waiting_reason distinguishes "still pulling the image" from
            # "image present but the app is failing to come up".
            container_statuses = pod_status.get("containerStatuses", [])
            waiting_reason = None
            if container_statuses:
                waiting_reason = container_statuses[0].get("state", {}).get("waiting", {}).get("reason")

            LOGGER.debug(
                "Pod status for '%s' (elapsed %.0fs): phase=%s, ready=%s, "
                "waiting_reason=%s, reason=%s, message=%s",
                deployment_name,
                elapsed,
                phase,
                ready,
                waiting_reason,
                last_reason,
                last_message,
            )

            if phase == "Running" and ready:
                LOGGER.info("Deployment '%s' pod is Running and Ready.", deployment_name)
                return web.json_response(
                    {
                        "started": True,
                        "status": "running",
                        "health": "ready",
                        "reason": last_reason,
                        "message": last_message,
                    }
                )

            if phase in ("Failed", "Unknown", "Succeeded"):
                LOGGER.warning(
                    "Deployment '%s' pod is in phase '%s', treating as not successfully started.",
                    deployment_name,
                    phase,
                )
                return web.json_response(
                    {
                        "started": False,
                        "status": phase,
                        "health": "not_ready",
                        "reason": last_reason,
                        "message": last_message,
                    }
                )

            # Image pull errors never self-heal here -- fail fast.
            if waiting_reason in ("ErrImagePull", "ImagePullBackOff", "InvalidImageName"):
                wait_msg = container_statuses[0].get("state", {}).get("waiting", {}).get("message", "")
                LOGGER.error(
                    "Deployment '%s' pod has image pull error: %s - %s",
                    deployment_name,
                    waiting_reason,
                    wait_msg,
                )
                return web.json_response(
                    {
                        "started": False,
                        "status": waiting_reason,
                        "health": "image_error",
                        "reason": waiting_reason,
                        "message": wait_msg,
                    }
                )

            if not image_pulled:
                # The image counts as pulled once the container leaves the
                # "being created" waiting states (e.g. it is now running,
                # terminated, or stuck in CrashLoopBackOff).
                still_pulling = (
                    not container_statuses
                    or waiting_reason in ("ContainerCreating", "PodInitializing", None)
                )
                if not still_pulling:
                    image_pulled = True
                    post_pull_start = time.monotonic()
                    LOGGER.info(
                        "Deployment '%s' image pull completed after %.0fs. "
                        "Starting startup countdown (%.0fs).",
                        deployment_name,
                        elapsed,
                        startup_timeout,
                    )

        await asyncio.sleep(sleep_interval)

    # Fell out of the loop: one of the two timeouts fired.
    return web.json_response(
        {
            "started": False,
            "status": last_phase or "unknown",
            "health": "timeout",
            "reason": last_reason,
            "message": last_message,
        }
    )


async def k8s_exapp_remove(request: web.Request):
    """Delete the ExApp's Deployment, Service and (optionally) its PVC."""
    payload = await _parse_json_payload(request, RemoveExAppPayload)
    deployment_name = payload.exapp_k8s_name
    pvc_name = payload.exapp_k8s_volume_name

    LOGGER.info(
        "Removing K8s resources for '%s' (ns=%s, remove_data=%s).", deployment_name, K8S_NAMESPACE, payload.remove_data
    )

    # 404 is tolerated everywhere below so that removal stays idempotent.
    code, body, raw = await _k8s_request(
        "DELETE", f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}",
    )
    if code not in (200, 202, 404):
        LOGGER.error(
            "Error removing deployment '%s' (status %s): %s", deployment_name, code, _k8s_error_msg(body, raw)
        )
        raise web.HTTPServiceUnavailable(text=f"Error removing deployment '{deployment_name}': Status {code}")

    if payload.remove_data:
        code, body, raw = await _k8s_request(
            "DELETE", f"/api/v1/namespaces/{K8S_NAMESPACE}/persistentvolumeclaims/{pvc_name}",
        )
        if code not in (200, 202, 404):
            LOGGER.error("Error removing PVC '%s' (status %s): %s", pvc_name, code, _k8s_error_msg(body, raw))
            raise web.HTTPServiceUnavailable(text=f"Error removing PVC '{pvc_name}': Status {code}")

    code, body, raw = await _k8s_request(
        "DELETE", f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{deployment_name}",
    )
    if code not in (200, 202, 404):
        LOGGER.error("Error removing Service '%s' (status %s): %s", deployment_name, code, _k8s_error_msg(body, raw))
        raise web.HTTPServiceUnavailable(text=f"Error removing Service '{deployment_name}': Status {code}")
    return web.HTTPNoContent()


async def k8s_exapp_install_certificates(request: web.Request):
    """No-op on K8s: certificates are handled via Secrets/volume mounts."""
    await _parse_json_payload(request, InstallCertificatesPayload)
    return web.HTTPNoContent()


async def k8s_exapp_expose(request: web.Request):
    """Expose an ExApp and point HaRP's upstream cache at it.

    expose_type "manual" uses the caller-supplied host/port; "nodeport",
    "clusterip" and "loadbalancer" create (idempotently) a Service of that
    type and derive the upstream address from it.
    """
    payload = await _parse_json_payload(request, ExposeExAppPayload)
    app_id = payload.name.lower()
    service_name = payload.exapp_k8s_name

    if payload.expose_type == "manual":
        upstream_host = payload.upstream_host
        upstream_port = int(payload.upstream_port or payload.port)
    else:
        _ensure_k8s_configured()

        type_map: dict[str, Literal["NodePort", "ClusterIP", "LoadBalancer"]] = {
            "nodeport": "NodePort", "clusterip": "ClusterIP", "loadbalancer": "LoadBalancer",
        }
        desired_type = type_map.get(payload.expose_type)
        if not desired_type:
            raise web.HTTPBadRequest(text=f"Unknown expose_type '{payload.expose_type}'")

        # Create the Service (409 means it already exists -- that is fine).
        code, body, raw = await _k8s_request(
            "POST", f"/api/v1/namespaces/{K8S_NAMESPACE}/services",
            json_body=_k8s_build_service_manifest(payload, desired_type),
        )
        if code not in (200, 201, 409):
            LOGGER.error(
                "Failed to create Service '%s' (status %s): %s", service_name, code, _k8s_error_msg(body, raw)
            )
            raise web.HTTPServiceUnavailable(text=f"Failed to create Service '{service_name}': Status {code}")

        # Read the Service back to learn the actually-allocated ports/addresses.
        code, svc, raw = await _k8s_request(
            "GET", f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}",
        )
        if code != 200 or not isinstance(svc, dict):
            LOGGER.error("Failed to read Service '%s' (status %s): %s", service_name, code, _k8s_error_msg(svc, raw))
            raise web.HTTPServiceUnavailable(text=f"Failed to read Service '{service_name}': Status {code}")

        if payload.expose_type == "nodeport":
            upstream_port = _k8s_extract_nodeport(svc)
            upstream_host = payload.upstream_host or await _k8s_pick_node_address(
                preferred_type=payload.node_address_type,
                node_name=payload.node_name,
                label_selector=payload.node_label_selector,
            )
        elif payload.expose_type == "clusterip":
            upstream_port = _k8s_extract_service_port(svc)
            upstream_host = payload.upstream_host or _k8s_service_dns_name(service_name, K8S_NAMESPACE)
        else:  # loadbalancer
            upstream_port = _k8s_extract_service_port(svc)
            upstream_host = payload.upstream_host or _k8s_extract_loadbalancer_host(svc)
            if not upstream_host:
                # LB provisioning is asynchronous; poll until an address appears.
                upstream_host = await _k8s_wait_for_loadbalancer_host(
                    service_name, timeout_s=payload.wait_timeout_seconds, interval_s=payload.wait_interval_seconds,
                )

    LOGGER.info("Expose '%s' (%s): upstream %s:%d", app_id, payload.expose_type, upstream_host, upstream_port)
    try:
        exapp_meta = await nc_get_exapp(app_id)
        if not exapp_meta:
            LOGGER.error("No ExApp metadata for '%s' in Nextcloud.", app_id)
            raise web.HTTPNotFound(text=f"No ExApp metadata for '{app_id}'")
    except web.HTTPException:
        raise
    except Exception as e:
        LOGGER.exception("Failed to fetch ExApp metadata for '%s'", app_id)
        raise web.HTTPServiceUnavailable(text=f"Failed to fetch metadata for '{app_id}'") from e

    # Rewire the in-memory upstream cache so new requests hit the new address.
    exapp_meta.host = upstream_host
    exapp_meta.port = int(upstream_port)
    exapp_meta.resolved_host = ""

    async with EXAPP_CACHE_LOCK:
        EXAPP_CACHE[app_id] = exapp_meta

    return web.json_response(
        {
            "appId": app_id,
            "host": upstream_host,
            "port": int(upstream_port),
            "exposeType": payload.expose_type,
            "serviceName": service_name,
            "namespace": K8S_NAMESPACE,
        }
    )


###############################################################################
# HTTP Server Setup
###############################################################################
app.router.add_post("/k8s/exapp/create", k8s_exapp_create) + app.router.add_post("/k8s/exapp/start", k8s_exapp_start) + app.router.add_post("/k8s/exapp/stop", k8s_exapp_stop) + app.router.add_post("/k8s/exapp/wait_for_start", k8s_exapp_wait_for_start) + app.router.add_post("/k8s/exapp/remove", k8s_exapp_remove) + app.router.add_post("/k8s/exapp/install_certificates", k8s_exapp_install_certificates) + app.router.add_post("/k8s/exapp/expose", k8s_exapp_expose) return app @@ -1708,6 +2789,9 @@ async def run_http_server(host="127.0.0.1", port=8200): async def main(): + # Ensure cluster-wide DNS for host aliases before starting servers. + await _k8s_ensure_coredns_host_aliases() + spoa_task = asyncio.create_task(SPOA_AGENT._run(host=SPOA_HOST, port=SPOA_PORT)) # noqa http_task = asyncio.create_task(run_http_server(host="127.0.0.1", port=8200))