From 8effc1d1eef1800e65529d3392cad57bcda7ad06 Mon Sep 17 00:00:00 2001
From: Ambient Code
Date: Wed, 11 Feb 2026 18:10:04 +0000
Subject: [PATCH] feat: authenticate via public-api gateway instead of
 OpenShift API

Replace direct OpenShift API authentication with HTTP requests to the
public-api gateway service. This simplifies the MCP server by removing
the oc CLI dependency and aligns with the platform's security model
where the public-api is the single entry point for all clients.

Changes:
- client.py: Rewrite to use httpx for HTTP requests to public-api
- settings.py: Update ClusterConfig to point to public-api URL
- server.py: Reduce to 7 supported tools (list, get, delete sessions)
- formatters.py: Remove unused formatters
- pyproject.toml: Replace aiohttp with httpx, update description

The public-api provides:
- GET/POST/DELETE /v1/sessions endpoints
- Bearer token auth via Authorization header
- Project context via X-Ambient-Project header

Co-Authored-By: Claude Opus 4.5
---
 pyproject.toml            |   10 +-
 src/mcp_acp/client.py     | 1926 +++++--------------------------
 src/mcp_acp/formatters.py |  311 +-----
 src/mcp_acp/server.py     |  831 +++-------------
 src/mcp_acp/settings.py   |   39 +-
 5 files changed, 418 insertions(+), 2699 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index ec8af6c..81b33f6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,7 +1,7 @@
 [project]
 name = "mcp-acp"
 version = "0.1.3"
-description = "MCP server for Ambient Code Platform (ACP) session management on OpenShift - list, delete, debug, and manage AgenticSession resources"
+description = "MCP server for Ambient Code Platform (ACP) session management - list, delete, debug, and manage AgenticSession resources via the public API"
 readme = "README.md"
 requires-python = ">=3.12"
 authors = [
@@ -12,11 +12,9 @@ keywords = [
     "acp",
     "ambient",
     "ambient-code-platform",
-    "ambient-container-platform",
-    "openshift",
-    "kubernetes",
     "agentic-session",
-    "agenticsession"
+    "agenticsession",
+    "public-api"
 ]
 classifiers = [
     "Development Status :: 3 - Alpha",
@@ -32,7 +30,7 @@ dependencies = [
     "pydantic-settings>=2.0.0",
     "structlog>=25.0.0",
     "python-dotenv>=1.0.0",
-    "aiohttp>=3.8.0",
+    "httpx>=0.27.0",
     "pyyaml>=6.0",
     "python-dateutil>=2.8.0",
 ]
diff --git a/src/mcp_acp/client.py b/src/mcp_acp/client.py
index dc4cc7e..5840912 100644
--- a/src/mcp_acp/client.py
+++ b/src/mcp_acp/client.py
@@ -1,74 +1,54 @@
-"""ACP client wrapper for OpenShift CLI operations."""
+"""ACP client for Ambient Code Platform public API.

-import asyncio
-import json
+This client communicates with the public-api gateway service which provides
+a simplified REST API for managing AgenticSessions.
+"""
+
+import os
 import re
-import secrets
-import subprocess
-from collections.abc import Callable
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any

-import yaml
+import httpx

 from mcp_acp.settings import Settings, load_clusters_config, load_settings
 from utils.pylogger import get_python_logger

-# Initialize structured logger
 logger = get_python_logger()


 class ACPClient:
-    """Client for interacting with ACP via OpenShift CLI.
+    """Client for interacting with Ambient Code Platform via public API.

     Attributes:
         settings: Global settings instance
         clusters_config: Cluster configuration instance
-        config: Raw cluster configuration (for backward compatibility)
     """

-    # Security constants
-    ALLOWED_RESOURCE_TYPES = {"agenticsession", "pods", "event"}  # Whitelist
-    MAX_BULK_ITEMS = 3  # Maximum items allowed in bulk operations
-    LABEL_PREFIX = "acp.ambient-code.ai/label-"  # Label prefix for ACP labels
-    MAX_COMMAND_TIMEOUT = 120  # Maximum command timeout in seconds
-    MAX_LOG_LINES = 10000  # Maximum log lines to retrieve
+    MAX_BULK_ITEMS = 3
+    DEFAULT_TIMEOUT = 30.0

     def __init__(self, config_path: str | None = None, settings: Settings | None = None):
         """Initialize ACP client.

         Args:
-            config_path: Path to clusters.yaml config file (deprecated, use settings)
+            config_path: Path to clusters.yaml config file
             settings: Settings instance. If not provided, loads default settings.
         """
-        # Load or use provided settings
         self.settings = settings or load_settings()

-        # Override config path if provided (for backward compatibility)
         if config_path:
            self.settings.config_path = Path(config_path)

-        # Load cluster configuration
         try:
             self.clusters_config = load_clusters_config(self.settings)
         except Exception as e:
             logger.error("cluster_config_load_failed", error=str(e))
             raise

-        # Backward compatibility: expose raw config
-        self.config = {
-            "clusters": {
-                name: {
-                    "server": cluster.server,
-                    "default_project": cluster.default_project,
-                    "description": cluster.description,
-                }
-                for name, cluster in self.clusters_config.clusters.items()
-            },
-            "default_cluster": self.clusters_config.default_cluster,
-        }
-        self.config_path = str(self.settings.config_path)
+        self._http_client: httpx.AsyncClient | None = None

         logger.info(
             "acp_client_initialized",
@@ -76,501 +56,164 @@ def __init__(self, config_path: str | None = None, settings: Settings | None = N
             default_cluster=self.clusters_config.default_cluster,
         )

-    # Note: _load_config and _validate_config removed - now handled by Pydantic settings
-
-    def _validate_input(self, value: str, field_name: str, max_length: int = 253) -> None:
-        """Validate input to prevent injection attacks.
-
-        Args:
-            value: Value to validate
-            field_name: Field name for error messages
-            max_length: Maximum allowed length
+    def _get_cluster_config(self, cluster_name: str | None = None) -> dict[str, Any]:
+        """Get cluster configuration."""
+        name = cluster_name or self.clusters_config.default_cluster
+        if not name:
+            raise ValueError("No cluster specified and no default_cluster configured")

-        Raises:
-            ValueError: If validation fails
-        """
-        if not isinstance(value, str):
-            raise ValueError(f"{field_name} must be a string")
-        if len(value) > max_length:
-            raise ValueError(f"{field_name} exceeds maximum length of {max_length}")
-        # Validate Kubernetes naming conventions (DNS-1123 subdomain)
-        if not re.match(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", value):
-            raise ValueError(f"{field_name} contains invalid characters. Must match: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")
+        cluster = self.clusters_config.clusters.get(name)
+        if not cluster:
+            raise ValueError(f"Cluster '{name}' not found in configuration")

-    def _validate_bulk_operation(self, items: list[str], operation_name: str) -> None:
-        """Enforce 3-item limit for safety on bulk operations.
+ return { + "server": cluster.server, + "default_project": cluster.default_project, + "description": cluster.description, + "token": cluster.token, + } - Args: - items: List of items to operate on - operation_name: Operation name for error message + def _get_token(self, cluster_config: dict[str, Any]) -> str: + """Get authentication token for a cluster.""" + token = cluster_config.get("token") or os.getenv("ACP_TOKEN") - Raises: - ValueError: If item count exceeds limit - """ - if len(items) > self.MAX_BULK_ITEMS: + if not token: raise ValueError( - f"Bulk {operation_name} limited to {self.MAX_BULK_ITEMS} items for safety. " - f"You requested {len(items)} items. Split into multiple operations." + "No authentication token available. " + "Set 'token' in clusters.yaml or ACP_TOKEN environment variable." ) - async def _run_oc_command( - self, - args: list[str], - capture_output: bool = True, - parse_json: bool = False, - timeout: int | None = None, - ) -> subprocess.CompletedProcess | dict[str, Any]: - """Run an oc command asynchronously with security controls. - - Args: - args: Command arguments (will be validated) - capture_output: Whether to capture stdout/stderr - parse_json: If True, parse stdout as JSON and return dict - timeout: Command timeout in seconds (default: MAX_COMMAND_TIMEOUT) - - Returns: - CompletedProcess result or parsed JSON dict - - Raises: - asyncio.TimeoutError: If command exceeds timeout - ValueError: If arguments contain suspicious content - """ - # Security: Validate arguments don't contain shell metacharacters - for arg in args: - if not isinstance(arg, str): - raise ValueError(f"All arguments must be strings, got {type(arg)}") - # Detect potential command injection - if any(char in arg for char in [";", "|", "&", "$", "`", "\n", "\r"]): - raise ValueError(f"Argument contains suspicious characters: {arg}") - - cmd = ["oc"] + args - effective_timeout = timeout or self.MAX_COMMAND_TIMEOUT - - if capture_output: - try: - process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - # Security: Prevent shell injection - ) - stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=effective_timeout) - result = subprocess.CompletedProcess( - args=cmd, - returncode=process.returncode or 0, - stdout=stdout, - stderr=stderr, - ) - - if parse_json and result.returncode == 0: - try: - return json.loads(result.stdout.decode()) - except json.JSONDecodeError as e: - raise ValueError(f"Failed to parse JSON response: {e}") from e - - return result - except TimeoutError: - # Kill the process if it times out - try: - process.kill() - await process.wait() - except Exception: - pass - raise TimeoutError(f"Command timed out after {effective_timeout}s") from None - else: - # For non-captured output, use subprocess.run with timeout - try: - result = await asyncio.wait_for( - asyncio.to_thread(subprocess.run, cmd, capture_output=False, timeout=effective_timeout), - timeout=effective_timeout + 5, # Extra buffer - ) - return result - except subprocess.TimeoutExpired: - raise TimeoutError(f"Command timed out after {effective_timeout}s") from None - - async def _get_resource_json(self, resource_type: str, name: str, namespace: str) -> dict[str, Any]: - """Get a Kubernetes resource as JSON dict. 
- - Args: - resource_type: Resource type (e.g., 'agenticsession') - name: Resource name - namespace: Namespace - - Returns: - Resource as JSON dict - - Raises: - ValueError: If inputs are invalid - Exception: If resource not found or command fails - """ - # Security: Validate inputs - if resource_type not in self.ALLOWED_RESOURCE_TYPES: - raise ValueError(f"Resource type '{resource_type}' not allowed") - self._validate_input(name, "resource name") - self._validate_input(namespace, "namespace") - - result = await self._run_oc_command(["get", resource_type, name, "-n", namespace, "-o", "json"]) - - if result.returncode != 0: - raise Exception(f"Failed to get {resource_type} '{name}': {result.stderr.decode()}") - - return json.loads(result.stdout.decode()) - - async def _list_resources_json( - self, resource_type: str, namespace: str, selector: str | None = None - ) -> list[dict[str, Any]]: - """List Kubernetes resources as JSON dicts. - - Args: - resource_type: Resource type (e.g., 'agenticsession') - namespace: Namespace - selector: Optional label selector - - Returns: - List of resources as JSON dicts - - Raises: - ValueError: If inputs are invalid - Exception: If command fails - """ - # Security: Validate inputs - if resource_type not in self.ALLOWED_RESOURCE_TYPES: - raise ValueError(f"Resource type '{resource_type}' not allowed") - self._validate_input(namespace, "namespace") - if selector and not re.match(r"^[a-zA-Z0-9=,_.\-/]+$", selector): - raise ValueError(f"Invalid label selector format: {selector}") - - args = ["get", resource_type, "-n", namespace, "-o", "json"] - if selector: - args.extend(["-l", selector]) - - result = await self._run_oc_command(args) - - if result.returncode != 0: - raise Exception(f"Failed to list {resource_type}: {result.stderr.decode()}") - - data = json.loads(result.stdout.decode()) - return data.get("items", []) - - async def _validate_session_for_dry_run(self, project: str, session: str, operation: str) -> dict[str, Any]: - """Validate session exists for dry-run and return session info. - - Args: - project: Project/namespace name - session: Session name - operation: Operation name for message (e.g., "delete", "restart") - - Returns: - Dict with dry_run response including session_info if found - """ - try: - session_data = await self._get_resource_json("agenticsession", session, project) - - return { - "dry_run": True, - "success": True, - "message": f"Would {operation} session '{session}' in project '{project}'", - "session_info": { - "name": session_data.get("metadata", {}).get("name"), - "status": session_data.get("status", {}).get("phase"), - "created": session_data.get("metadata", {}).get("creationTimestamp"), - "stopped_at": session_data.get("status", {}).get("stoppedAt"), - }, - } - except Exception: - return { - "dry_run": True, - "success": False, - "message": f"Session '{session}' not found in project '{project}'", - } - - async def _bulk_operation( - self, - project: str, - sessions: list[str], - operation_fn: Callable, - success_key: str, - dry_run: bool = False, - ) -> dict[str, Any]: - """Generic bulk operation handler. 
- - Args: - project: Project/namespace name - sessions: List of session names - operation_fn: Async function to call for each session - success_key: Key name for successful operations in response - dry_run: Preview mode - - Returns: - Standardized bulk operation response - """ - # Enforce 3-item limit - self._validate_bulk_operation(sessions, success_key) - - success = [] - failed = [] - dry_run_info = {"would_execute": [], "skipped": []} - - for session in sessions: - result = await operation_fn(project, session, dry_run=dry_run) - - if dry_run: - if result.get("success", True): - dry_run_info["would_execute"].append( - { - "session": session, - "info": result.get("session_info"), - } - ) - else: - dry_run_info["skipped"].append( - { - "session": session, - "reason": result.get("message"), - } - ) - else: - if result.get(success_key, result.get("success")): - success.append(session) - else: - failed.append({"session": session, "error": result.get("message")}) - - response = {success_key: success, "failed": failed} - if dry_run: - response["dry_run"] = True - response["dry_run_info"] = dry_run_info - - return response - - async def label_resource( - self, - resource_type: str, - name: str, - project: str, - labels: dict[str, str], - dry_run: bool = False, - ) -> dict[str, Any]: - """Add/update labels on any resource (generic, works for sessions/workspaces/etc). - - Args: - resource_type: Resource type (agenticsession, namespace, etc) - name: Resource name - project: Project/namespace name - labels: Label key-value pairs (e.g., {'env': 'prod'}) - dry_run: Preview mode - - Returns: - Dict with labeling status - """ - # Validate resource type - if resource_type not in self.ALLOWED_RESOURCE_TYPES: - raise ValueError(f"Resource type '{resource_type}' not allowed") - - # Simple validation + prefix (let K8s do heavy lifting) - k8s_labels = {} - for key, value in labels.items(): - # Basic format check - if not key.replace("-", "").replace("_", "").isalnum(): - raise ValueError(f"Invalid label key: {key}") - if not value.replace("-", "").replace("_", "").replace(".", "").isalnum(): - raise ValueError(f"Invalid label value: {value}") - if len(key) > 63 or len(value) > 63: - raise ValueError("Label key/value must be ≤63 characters") - - # Add prefix - k8s_labels[f"{self.LABEL_PREFIX}{key}"] = value - - if dry_run: - return { - "dry_run": True, - "resource": name, - "labels": labels, - "message": f"Would label {resource_type} '{name}'", - } - - # Apply with --overwrite (handles add & update) - label_args = [f"{k}={v}" for k, v in k8s_labels.items()] - result = await self._run_oc_command(["label", resource_type, name, "-n", project, "--overwrite"] + label_args) - - if result.returncode == 0: - return {"labeled": True, "resource": name, "labels": labels} - else: - return {"labeled": False, "message": result.stderr.decode()} + return token - async def unlabel_resource( - self, - resource_type: str, - name: str, - project: str, - label_keys: list[str], - dry_run: bool = False, - ) -> dict[str, Any]: - """Remove specific labels from a resource. 
- - Args: - resource_type: Resource type (agenticsession, namespace, etc) - name: Resource name - project: Project/namespace name - label_keys: List of label keys to remove (without prefix) - dry_run: Preview mode - - Returns: - Dict with unlabeling status - """ - # Validate resource type - if resource_type not in self.ALLOWED_RESOURCE_TYPES: - raise ValueError(f"Resource type '{resource_type}' not allowed") - - # Build prefixed keys - prefixed_keys = [f"{self.LABEL_PREFIX}{key}" for key in label_keys] - - if dry_run: - return { - "dry_run": True, - "resource": name, - "label_keys": label_keys, - "message": f"Would remove labels from {resource_type} '{name}'", - } - - # Remove labels using oc label with '-' suffix - label_args = [f"{k}-" for k in prefixed_keys] - result = await self._run_oc_command(["label", resource_type, name, "-n", project] + label_args) - - if result.returncode == 0: - return {"unlabeled": True, "resource": name, "removed_keys": label_keys} - else: - return {"unlabeled": False, "message": result.stderr.decode()} + async def _get_http_client(self) -> httpx.AsyncClient: + """Get or create HTTP client.""" + if self._http_client is None or self._http_client.is_closed: + self._http_client = httpx.AsyncClient( + timeout=httpx.Timeout(self.DEFAULT_TIMEOUT), + follow_redirects=True, + ) + return self._http_client - async def bulk_label_resources( + async def _request( self, - resource_type: str, - names: list[str], + method: str, + path: str, project: str, - labels: dict[str, str], - dry_run: bool = False, + cluster_name: str | None = None, + json_data: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, ) -> dict[str, Any]: - """Label multiple resources with same labels (max 3). - - Args: - resource_type: Resource type - names: List of resource names - project: Project/namespace name - labels: Label key-value pairs - dry_run: Preview mode - - Returns: - Dict with bulk labeling results - """ - # Enforce limit - self._validate_bulk_operation(names, "label") - - success = [] - failed = [] + """Make an HTTP request to the public API.""" + cluster_config = self._get_cluster_config(cluster_name) + token = self._get_token(cluster_config) + base_url = cluster_config["server"] + + url = f"{base_url}{path}" + headers = { + "Authorization": f"Bearer {token}", + "X-Ambient-Project": project, + "Content-Type": "application/json", + "Accept": "application/json", + } - for name in names: - result = await self.label_resource(resource_type, name, project, labels, dry_run) - if result.get("labeled", result.get("success")): - success.append(name) - else: - failed.append({"resource": name, "error": result.get("message")}) + client = await self._get_http_client() - return { - "labeled": success, - "failed": failed, - "dry_run": dry_run, - } + try: + response = await client.request( + method=method, + url=url, + headers=headers, + json=json_data, + params=params, + ) - async def bulk_unlabel_resources( - self, - resource_type: str, - names: list[str], - project: str, - label_keys: list[str], - dry_run: bool = False, - ) -> dict[str, Any]: - """Remove labels from multiple resources (max 3). 
+ if response.status_code >= 400: + try: + error_data = response.json() + error_msg = error_data.get("error", f"HTTP {response.status_code}") + except Exception: + error_msg = f"HTTP {response.status_code}: {response.text}" + + logger.warning( + "api_request_failed", + method=method, + path=path, + status_code=response.status_code, + error=error_msg, + ) + raise ValueError(error_msg) - Args: - resource_type: Resource type - names: List of resource names - project: Project/namespace name - label_keys: List of label keys to remove - dry_run: Preview mode + if response.status_code == 204: + return {"success": True} - Returns: - Dict with bulk unlabeling results - """ - # Enforce limit - self._validate_bulk_operation(names, "unlabel") + return response.json() - success = [] - failed = [] + except httpx.TimeoutException as e: + logger.error("api_request_timeout", method=method, path=path, error=str(e)) + raise TimeoutError(f"Request timed out: {path}") from e + except httpx.RequestError as e: + logger.error("api_request_error", method=method, path=path, error=str(e)) + raise ValueError(f"Request failed: {str(e)}") from e - for name in names: - result = await self.unlabel_resource(resource_type, name, project, label_keys, dry_run) - if result.get("unlabeled", result.get("success")): - success.append(name) - else: - failed.append({"resource": name, "error": result.get("message")}) + def _validate_input(self, value: str, field_name: str, max_length: int = 253) -> None: + """Validate input to prevent injection attacks.""" + if not isinstance(value, str): + raise ValueError(f"{field_name} must be a string") + if len(value) > max_length: + raise ValueError(f"{field_name} exceeds maximum length of {max_length}") + if not re.match(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", value): + raise ValueError( + f"{field_name} contains invalid characters. Must match DNS-1123 format." + ) - return { - "unlabeled": success, - "failed": failed, - "dry_run": dry_run, - } + def _validate_bulk_operation(self, items: list[str], operation_name: str) -> None: + """Enforce item limit for bulk operations.""" + if len(items) > self.MAX_BULK_ITEMS: + raise ValueError( + f"Bulk {operation_name} limited to {self.MAX_BULK_ITEMS} items. " + f"You requested {len(items)}. Split into multiple operations." + ) async def list_sessions( self, project: str, status: str | None = None, - has_display_name: bool | None = None, older_than: str | None = None, sort_by: str | None = None, limit: int | None = None, - label_selector: str | None = None, ) -> dict[str, Any]: - """List sessions with enhanced filtering. + """List sessions with filtering. 

         Args:
             project: Project/namespace name
             status: Filter by status (running, stopped, creating, failed)
-            has_display_name: Filter by display name presence
             older_than: Filter by age (e.g., "7d", "24h")
             sort_by: Sort field (created, stopped, name)
             limit: Maximum results
-            label_selector: Kubernetes label selector (e.g., 'acp.ambient-code.ai/label-env=prod')
-
-        Returns:
-            Dict with sessions list and metadata
         """
-        sessions = await self._list_resources_json("agenticsession", project, selector=label_selector)
+        self._validate_input(project, "project")
+
+        response = await self._request("GET", "/v1/sessions", project)
+        sessions = response.get("items", [])

-        # Build filter predicates
         filters = []
         filters_applied = {}

         if status:
-            filters.append(lambda s: s.get("status", {}).get("phase", "").lower() == status.lower())
+            filters.append(lambda s: (s.get("status") or "").lower() == status.lower())
             filters_applied["status"] = status

-        if has_display_name is not None:
-            filters.append(lambda s: bool(s.get("spec", {}).get("displayName")) == has_display_name)
-            filters_applied["has_display_name"] = has_display_name
-
         if older_than:
             cutoff_time = self._parse_time_delta(older_than)
-            filters.append(lambda s: self._is_older_than(s.get("metadata", {}).get("creationTimestamp"), cutoff_time))
+            filters.append(lambda s: self._is_older_than(s.get("createdAt"), cutoff_time))
             filters_applied["older_than"] = older_than

-        # Single-pass filter
         filtered = [s for s in sessions if all(f(s) for f in filters)]

-        # Sort
         if sort_by:
             filtered = self._sort_sessions(filtered, sort_by)
             filters_applied["sort_by"] = sort_by

-        # Limit
         if limit and limit > 0:
             filtered = filtered[:limit]
             filters_applied["limit"] = limit
@@ -581,42 +224,12 @@ async def list_sessions(
             "filters_applied": filters_applied,
         }

-    async def list_sessions_by_user_labels(
-        self,
-        project: str,
-        labels: dict[str, str],
-        **kwargs,
-    ) -> dict[str, Any]:
-        """List sessions by user-friendly labels (convenience wrapper).
-
-        Args:
-            project: Project/namespace name
-            labels: User-friendly label key-value pairs
-            **kwargs: Additional arguments passed to list_sessions
-
-        Returns:
-            Dict with sessions list
-        """
-        # Build K8s label selector from user labels
-        label_parts = [f"{self.LABEL_PREFIX}{k}={v}" for k, v in labels.items()]
-        label_selector = ",".join(label_parts)
-
-        return await self.list_sessions(project=project, label_selector=label_selector, **kwargs)
-
     def _sort_sessions(self, sessions: list[dict], sort_by: str) -> list[dict]:
-        """Sort sessions by field.
-
-        Args:
-            sessions: List of session dicts
-            sort_by: Sort field (created, stopped, name)
-
-        Returns:
-            Sorted list
-        """
+        """Sort sessions by field."""
         sort_keys = {
-            "created": lambda s: s.get("metadata", {}).get("creationTimestamp", ""),
-            "stopped": lambda s: s.get("status", {}).get("stoppedAt", ""),
-            "name": lambda s: s.get("metadata", {}).get("name", ""),
+            "created": lambda s: s.get("createdAt", ""),
+            "stopped": lambda s: s.get("completedAt", ""),
+            "name": lambda s: s.get("id", ""),
         }

         key_fn = sort_keys.get(sort_by)
@@ -625,1261 +238,190 @@ def _sort_sessions(self, sessions: list[dict], sort_by: str) -> list[dict]:
         return sessions

     def _parse_time_delta(self, time_str: str) -> datetime:
-        """Parse time delta string (e.g., '7d', '24h') to datetime.
-
-        Args:
-            time_str: Time delta string
-
-        Returns:
-            Datetime representing the cutoff time
-        """
+        """Parse time delta string to datetime."""
-        match = re.match(r"(\d+)([dhm])", time_str.lower())
+        match = re.fullmatch(r"(\d+)([dhm])", time_str.lower())
         if not match:
-            raise ValueError(f"Invalid time format: {time_str}. Use format like '7d', '24h', '30m'")
+            raise ValueError(f"Invalid time format: {time_str}. Use '7d', '24h', or '30m'")

         value, unit = int(match.group(1)), match.group(2)
-        now = datetime.utcnow()
+        now = datetime.now(timezone.utc)

-        if unit == "d":
-            return now - timedelta(days=value)
-        elif unit == "h":
-            return now - timedelta(hours=value)
-        elif unit == "m":
-            return now - timedelta(minutes=value)
-
-        raise ValueError(f"Unknown time unit: {unit}")
+        deltas = {"d": timedelta(days=value), "h": timedelta(hours=value), "m": timedelta(minutes=value)}
+        return now - deltas[unit]

     def _is_older_than(self, timestamp_str: str | None, cutoff: datetime) -> bool:
-        """Check if timestamp is older than cutoff.
-
-        Args:
-            timestamp_str: ISO format timestamp string
-            cutoff: Cutoff datetime
-
-        Returns:
-            True if older than cutoff
-        """
+        """Check if timestamp is older than cutoff."""
         if not timestamp_str:
             return False
-
-        # Parse ISO format timestamp
         timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
-        return timestamp.replace(tzinfo=None) < cutoff
+        if timestamp.tzinfo is None:
+            timestamp = timestamp.replace(tzinfo=timezone.utc)
+        return timestamp < cutoff

-    async def delete_session(self, project: str, session: str, dry_run: bool = False) -> dict[str, Any]:
-        """Delete a session.
+    async def get_session(self, project: str, session: str) -> dict[str, Any]:
+        """Get a session by ID.

         Args:
             project: Project/namespace name
-            session: Session name
-            dry_run: Preview without deleting
-
-        Returns:
-            Dict with deletion status
+            session: Session ID
         """
-        if dry_run:
-            return await self._validate_session_for_dry_run(project, session, "delete")
-
-        result = await self._run_oc_command(["delete", "agenticsession", session, "-n", project])
-
-        if result.returncode != 0:
-            return {
-                "deleted": False,
-                "message": f"Failed to delete session: {result.stderr.decode()}",
-            }
+        self._validate_input(project, "project")
+        self._validate_input(session, "session")

-        return {
-            "deleted": True,
-            "message": f"Successfully deleted session '{session}' from project '{project}'",
-        }
+        return await self._request("GET", f"/v1/sessions/{session}", project)

-    async def restart_session(self, project: str, session: str, dry_run: bool = False) -> dict[str, Any]:
-        """Restart a stopped session.
+    async def delete_session(self, project: str, session: str, dry_run: bool = False) -> dict[str, Any]:
+        """Delete a session.
Args: project: Project/namespace name session: Session name - dry_run: Preview without restarting - - Returns: - Dict with restart status + dry_run: Preview without deleting """ - try: - session_data = await self._get_resource_json("agenticsession", session, project) - current_status = session_data.get("status", {}).get("phase", "unknown") + self._validate_input(project, "project") + self._validate_input(session, "session") - if dry_run: + if dry_run: + try: + session_data = await self._request("GET", f"/v1/sessions/{session}", project) return { - "status": current_status, "dry_run": True, "success": True, - "message": f"Would restart session '{session}' (current status: {current_status})", + "message": f"Would delete session '{session}' in project '{project}'", "session_info": { - "name": session_data.get("metadata", {}).get("name"), - "current_status": current_status, - "stopped_at": session_data.get("status", {}).get("stoppedAt"), + "name": session_data.get("id"), + "status": session_data.get("status"), + "created": session_data.get("createdAt"), }, } - - # Restart by patching the stopped field to false - patch = {"spec": {"stopped": False}} - result = await self._run_oc_command( - [ - "patch", - "agenticsession", - session, - "-n", - project, - "--type=merge", - "-p", - json.dumps(patch), - ] - ) - - if result.returncode != 0: + except ValueError: return { - "status": "error", - "message": f"Failed to restart session: {result.stderr.decode()}", + "dry_run": True, + "success": False, + "message": f"Session '{session}' not found in project '{project}'", } + try: + await self._request("DELETE", f"/v1/sessions/{session}", project) return { - "status": "restarting", - "message": f"Successfully restarted session '{session}' in project '{project}'", + "deleted": True, + "message": f"Successfully deleted session '{session}' from project '{project}'", + } + except ValueError as e: + return { + "deleted": False, + "message": f"Failed to delete session: {str(e)}", } - except Exception as e: - return {"status": "error", "message": str(e)} - async def bulk_delete_sessions(self, project: str, sessions: list[str], dry_run: bool = False) -> dict[str, Any]: - """Delete multiple sessions. + async def bulk_delete_sessions( + self, project: str, sessions: list[str], dry_run: bool = False + ) -> dict[str, Any]: + """Delete multiple sessions (max 3). Args: project: Project/namespace name sessions: List of session names dry_run: Preview without deleting - - Returns: - Dict with deletion results """ - return await self._bulk_operation(project, sessions, self.delete_session, "deleted", dry_run) - - async def bulk_stop_sessions(self, project: str, sessions: list[str], dry_run: bool = False) -> dict[str, Any]: - """Stop multiple running sessions. 
- - Args: - project: Project/namespace name - sessions: List of session names - dry_run: Preview without stopping + self._validate_bulk_operation(sessions, "delete") - Returns: - Dict with stop results - """ + success = [] + failed = [] + dry_run_info = {"would_execute": [], "skipped": []} - async def stop_session(project: str, session: str, dry_run: bool = False) -> dict[str, Any]: - """Internal stop session helper.""" - try: - session_data = await self._get_resource_json("agenticsession", session, project) - current_status = session_data.get("status", {}).get("phase") - - if dry_run: - return { - "dry_run": True, - "success": current_status == "running", - "message": f"Session status: {current_status}", - "session_info": { - "name": session, - "status": current_status, - }, - } - - # Stop the session - patch = {"spec": {"stopped": True}} - result = await self._run_oc_command( - [ - "patch", - "agenticsession", - session, - "-n", - project, - "--type=merge", - "-p", - json.dumps(patch), - ] - ) + for session in sessions: + result = await self.delete_session(project, session, dry_run=dry_run) - if result.returncode == 0: - return {"stopped": True, "message": "Success"} + if dry_run: + if result.get("success", True): + dry_run_info["would_execute"].append({ + "session": session, + "info": result.get("session_info"), + }) + else: + dry_run_info["skipped"].append({ + "session": session, + "reason": result.get("message"), + }) + else: + if result.get("deleted"): + success.append(session) else: - return { - "stopped": False, - "message": result.stderr.decode(), - } - except Exception as e: - return {"stopped": False, "success": False, "message": str(e)} + failed.append({"session": session, "error": result.get("message")}) - return await self._bulk_operation(project, sessions, stop_session, "stopped", dry_run) + response = {"deleted": success, "failed": failed} + if dry_run: + response["dry_run"] = True + response["dry_run_info"] = dry_run_info - async def bulk_restart_sessions(self, project: str, sessions: list[str], dry_run: bool = False) -> dict[str, Any]: - """Restart multiple stopped sessions (max 3). + return response - Args: - project: Project/namespace name - sessions: List of session names - dry_run: Preview mode + def list_clusters(self) -> dict[str, Any]: + """List configured clusters.""" + clusters = [] + default_cluster = self.clusters_config.default_cluster - Returns: - Dict with restart results - """ - # Enforce limit - self._validate_bulk_operation(sessions, "restart") - - success = [] - failed = [] - - for session in sessions: - result = await self.restart_session(project, session, dry_run) - if result.get("status") == "restarting" or result.get("success"): - success.append(session) - else: - failed.append({"session": session, "error": result.get("message")}) - - return { - "restarted": success, - "failed": failed, - "dry_run": dry_run, - } - - async def bulk_delete_sessions_by_label( - self, - project: str, - labels: dict[str, str], - dry_run: bool = False, - ) -> dict[str, Any]: - """Delete sessions matching label selector (max 3). 
- - Args: - project: Project/namespace name - labels: Label key-value pairs - dry_run: Preview mode - - Returns: - Dict with deletion results - """ - # Get sessions by label - result = await self.list_sessions_by_user_labels(project, labels) - sessions = result.get("sessions", []) - - if not sessions: - return { - "deleted": [], - "failed": [], - "message": f"No sessions found with labels {labels}", - } - - session_names = [s["metadata"]["name"] for s in sessions] - - # Early validation with helpful error - if len(session_names) > self.MAX_BULK_ITEMS: - raise ValueError( - f"Label selector matches {len(session_names)} sessions. " - f"Max {self.MAX_BULK_ITEMS} allowed. Refine your labels to be more specific." - ) - - # Enhanced dry-run output - if dry_run: - return { - "dry_run": True, - "matched_sessions": session_names, - "matched_count": len(session_names), - "label_selector": ",".join([f"{self.LABEL_PREFIX}{k}={v}" for k, v in labels.items()]), - "message": f"Would delete {len(session_names)} sessions. Review matched_sessions before confirming.", - } - - # Use existing bulk_delete_sessions - return await self.bulk_delete_sessions(project, session_names, dry_run=dry_run) - - async def bulk_stop_sessions_by_label( - self, - project: str, - labels: dict[str, str], - dry_run: bool = False, - ) -> dict[str, Any]: - """Stop sessions matching label selector (max 3). - - Args: - project: Project/namespace name - labels: Label key-value pairs - dry_run: Preview mode - - Returns: - Dict with stop results - """ - # Get sessions by label - result = await self.list_sessions_by_user_labels(project, labels) - sessions = result.get("sessions", []) - - if not sessions: - return { - "stopped": [], - "failed": [], - "message": f"No sessions found with labels {labels}", - } - - session_names = [s["metadata"]["name"] for s in sessions] - - # Early validation with helpful error - if len(session_names) > self.MAX_BULK_ITEMS: - raise ValueError( - f"Label selector matches {len(session_names)} sessions. " - f"Max {self.MAX_BULK_ITEMS} allowed. Refine your labels to be more specific." - ) - - # Enhanced dry-run output - if dry_run: - return { - "dry_run": True, - "matched_sessions": session_names, - "matched_count": len(session_names), - "label_selector": ",".join([f"{self.LABEL_PREFIX}{k}={v}" for k, v in labels.items()]), - "message": f"Would stop {len(session_names)} sessions. Review matched_sessions before confirming.", - } - - # Use existing bulk_stop_sessions - return await self.bulk_stop_sessions(project, session_names, dry_run=dry_run) - - async def bulk_restart_sessions_by_label( - self, - project: str, - labels: dict[str, str], - dry_run: bool = False, - ) -> dict[str, Any]: - """Restart sessions matching label selector (max 3). - - Args: - project: Project/namespace name - labels: Label key-value pairs - dry_run: Preview mode - - Returns: - Dict with restart results - """ - # Get sessions by label - result = await self.list_sessions_by_user_labels(project, labels) - sessions = result.get("sessions", []) - - if not sessions: - return { - "restarted": [], - "failed": [], - "message": f"No sessions found with labels {labels}", - } - - session_names = [s["metadata"]["name"] for s in sessions] - - # Early validation with helpful error - if len(session_names) > self.MAX_BULK_ITEMS: - raise ValueError( - f"Label selector matches {len(session_names)} sessions. " - f"Max {self.MAX_BULK_ITEMS} allowed. Refine your labels to be more specific." 
- ) - - # Enhanced dry-run output - if dry_run: - return { - "dry_run": True, - "matched_sessions": session_names, - "matched_count": len(session_names), - "label_selector": ",".join([f"{self.LABEL_PREFIX}{k}={v}" for k, v in labels.items()]), - "message": f"Would restart {len(session_names)} sessions. Review matched_sessions before confirming.", - } - - # Use existing bulk_restart_sessions - return await self.bulk_restart_sessions(project, session_names, dry_run=dry_run) - - async def get_session_logs( - self, - project: str, - session: str, - container: str | None = None, - tail_lines: int | None = None, - ) -> dict[str, Any]: - """Get logs for a session. - - Args: - project: Project/namespace name - session: Session name - container: Container name (optional) - tail_lines: Number of lines to retrieve - - Returns: - Dict with logs - """ - # Security: Validate inputs - try: - self._validate_input(project, "project") - self._validate_input(session, "session") - if container: - # Container names have slightly different naming rules - if not re.match(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", container): - raise ValueError(f"Invalid container name: {container}") - # Security: Limit tail_lines to prevent DoS - if tail_lines and (tail_lines < 1 or tail_lines > self.MAX_LOG_LINES): - raise ValueError(f"tail_lines must be between 1 and {self.MAX_LOG_LINES}") - - # Find the pod for this session - pods = await self._list_resources_json("pods", project, selector=f"agenticsession={session}") - - if not pods: - return {"logs": "", "error": f"No pods found for session '{session}'"} - - pod_name = pods[0].get("metadata", {}).get("name") - - # Build logs command - logs_args = ["logs", pod_name, "-n", project] - if container: - logs_args.extend(["-c", container]) - if tail_lines: - logs_args.extend(["--tail", str(tail_lines)]) - else: - # Default limit to prevent memory exhaustion - logs_args.extend(["--tail", str(1000)]) - - result = await self._run_oc_command(logs_args) - - if result.returncode != 0: - return { - "logs": "", - "error": f"Failed to retrieve logs: {result.stderr.decode()}", - } - - return { - "logs": result.stdout.decode(), - "container": container or "default", - "lines": len(result.stdout.decode().split("\n")), - } - except ValueError as e: - return {"logs": "", "error": str(e)} - except Exception as e: - return {"logs": "", "error": f"Unexpected error: {str(e)}"} - - def list_clusters(self) -> dict[str, Any]: - """List configured clusters. - - Returns: - Dict with clusters list - """ - clusters = [] - config = self.config - default_cluster = config.get("default_cluster") - - for name, cluster_config in config.get("clusters", {}).items(): - clusters.append( - { - "name": name, - "server": cluster_config.get("server"), - "description": cluster_config.get("description", ""), - "default_project": cluster_config.get("default_project"), - "is_default": name == default_cluster, - } - ) + for name, cluster in self.clusters_config.clusters.items(): + clusters.append({ + "name": name, + "server": cluster.server, + "description": cluster.description or "", + "default_project": cluster.default_project, + "is_default": name == default_cluster, + }) return {"clusters": clusters, "default_cluster": default_cluster} async def whoami(self) -> dict[str, Any]: - """Get current user and cluster information. 
- - Returns: - Dict with user info - """ - # Get current user - user_result = await self._run_oc_command(["whoami"]) - user = user_result.stdout.decode().strip() if user_result.returncode == 0 else "unknown" - - # Get current server - server_result = await self._run_oc_command(["whoami", "--show-server"]) - server = server_result.stdout.decode().strip() if server_result.returncode == 0 else "unknown" - - # Get current project - project_result = await self._run_oc_command(["project", "-q"]) - project = project_result.stdout.decode().strip() if project_result.returncode == 0 else "unknown" - - # Get token info - token_result = await self._run_oc_command(["whoami", "-t"]) - token_valid = token_result.returncode == 0 - - # Try to get token expiry (if available) - token_expires = None - if token_valid: - # Get token and decode to check expiry - # Note: token variable intentionally unused - for future enhancement - try: - # Try to get token info from oc - token_info_result = await self._run_oc_command(["whoami", "--show-token"]) - if token_info_result.returncode == 0: - # Note: OpenShift doesn't provide expiry via CLI easily - # This is a placeholder for future enhancement - token_expires = None - except Exception: - pass - - # Get current cluster name (if available from config) - cluster = "unknown" - cluster_config_data = None - for name, cluster_config in self.config.get("clusters", {}).items(): - if cluster_config.get("server") == server: - cluster = name - cluster_config_data = cluster_config - break - - # Prefer default_project from config over current oc project - # This ensures we use the configured project even if oc is set to a different one - if cluster_config_data and cluster_config_data.get("default_project"): - project = cluster_config_data.get("default_project") - - return { - "user": user, - "cluster": cluster, - "server": server, - "project": project, - "token_expires": token_expires, - "token_valid": token_valid, - "authenticated": user != "unknown" and server != "unknown", - } - - # P2 Feature: Clone Session - async def clone_session( - self, project: str, source_session: str, new_display_name: str, dry_run: bool = False - ) -> dict[str, Any]: - """Clone a session with its configuration. 
- - Args: - project: Project/namespace name - source_session: Source session name to clone - new_display_name: Display name for new session - dry_run: Preview without creating - - Returns: - Dict with cloned session info - """ + """Get current configuration status.""" try: - # Get source session - source_data = await self._get_resource_json("agenticsession", source_session, project) - - if dry_run: - return { - "dry_run": True, - "success": True, - "message": f"Would clone session '{source_session}' with display name '{new_display_name}'", - "source_info": { - "name": source_data.get("metadata", {}).get("name"), - "display_name": source_data.get("spec", {}).get("displayName"), - "repos": source_data.get("spec", {}).get("repos", []), - "workflow": source_data.get("spec", {}).get("workflow"), - }, - } - - # Create new session from source spec - new_spec = source_data.get("spec", {}).copy() - new_spec["displayName"] = new_display_name - new_spec["stopped"] = False # Start new session as running - - # Create session manifest - manifest = { - "apiVersion": "vteam.ambient-code/v1alpha1", - "kind": "AgenticSession", - "metadata": { - "generateName": f"{source_session}-clone-", - "namespace": project, - }, - "spec": new_spec, - } + cluster_config = self._get_cluster_config() + cluster_name = self.clusters_config.default_cluster or "unknown" - # Apply manifest using secure temporary file - import os - import tempfile - - # Security: Use secure temp file with proper permissions (0600) - fd, manifest_file = tempfile.mkstemp(suffix=".yaml", prefix=f"acp-clone-{secrets.token_hex(8)}-") try: - # Write to file descriptor with secure permissions - with os.fdopen(fd, "w") as f: - yaml.dump(manifest, f) - - result = await self._run_oc_command(["create", "-f", manifest_file, "-o", "json"]) - - if result.returncode != 0: - return { - "cloned": False, - "message": f"Failed to clone session: {result.stderr.decode()}", - } - - created_data = json.loads(result.stdout.decode()) - new_session_name = created_data.get("metadata", {}).get("name") - - return { - "cloned": True, - "session": new_session_name, - "message": f"Successfully cloned session '{source_session}' to '{new_session_name}'", - } - finally: - # Ensure cleanup even if operation fails - try: - os.unlink(manifest_file) - except OSError: - pass - - except Exception as e: - return {"cloned": False, "message": str(e)} - - # P2 Feature: Get Session Transcript - async def get_session_transcript(self, project: str, session: str, format: str = "json") -> dict[str, Any]: - """Get session transcript/conversation history. 
- - Args: - project: Project/namespace name - session: Session name - format: Output format ("json" or "markdown") - - Returns: - Dict with transcript data - """ - try: - session_data = await self._get_resource_json("agenticsession", session, project) - - # Get events which contain the conversation - # Note: events variable intentionally unused - for future enhancement - # events = await self._list_resources_json( - # "event", project, selector=f"involvedObject.name={session}" - # ) - - # Extract transcript from session status if available - transcript_data = session_data.get("status", {}).get("transcript") or [] - - if format == "markdown": - # Convert to markdown format - markdown = f"# Session Transcript: {session}\n\n" - for idx, entry in enumerate(transcript_data): - role = entry.get("role", "unknown") - content = entry.get("content", "") - timestamp = entry.get("timestamp", "") - markdown += f"## Message {idx + 1} - {role}\n" - if timestamp: - markdown += f"*{timestamp}*\n\n" - markdown += f"{content}\n\n" - markdown += "---\n\n" - - return { - "transcript": markdown, - "format": "markdown", - "message_count": len(transcript_data), - } - else: - # Return as JSON - return { - "transcript": transcript_data, - "format": "json", - "message_count": len(transcript_data), - } - - except Exception as e: - return {"transcript": None, "error": str(e)} - - # P2 Feature: Update Session - async def update_session( - self, - project: str, - session: str, - display_name: str | None = None, - timeout: int | None = None, - dry_run: bool = False, - ) -> dict[str, Any]: - """Update session metadata. - - Args: - project: Project/namespace name - session: Session name - display_name: New display name - timeout: New timeout in seconds - dry_run: Preview without updating + self._get_token(cluster_config) + token_valid = True + except ValueError: + token_valid = False - Returns: - Dict with update status - """ - try: - session_data = await self._get_resource_json("agenticsession", session, project) - - if dry_run: - updates = {} - if display_name: - updates["displayName"] = display_name - if timeout: - updates["timeout"] = timeout - - return { - "dry_run": True, - "success": True, - "message": f"Would update session '{session}'", - "updates": updates, - "current": { - "displayName": session_data.get("spec", {}).get("displayName"), - "timeout": session_data.get("spec", {}).get("timeout"), - }, - } - - # Build patch - patch = {"spec": {}} - if display_name: - patch["spec"]["displayName"] = display_name - if timeout: - patch["spec"]["timeout"] = timeout - - if not patch["spec"]: - return {"updated": False, "message": "No updates specified"} - - result = await self._run_oc_command( - [ - "patch", - "agenticsession", - session, - "-n", - project, - "--type=merge", - "-p", - json.dumps(patch), - "-o", - "json", - ] - ) - - if result.returncode != 0: - return { - "updated": False, - "message": f"Failed to update session: {result.stderr.decode()}", - } - - updated_data = json.loads(result.stdout.decode()) - - return { - "updated": True, - "session": updated_data, - "message": f"Successfully updated session '{session}'", - } - - except Exception as e: - return {"updated": False, "message": str(e)} - - # P2 Feature: Export Session - async def export_session(self, project: str, session: str) -> dict[str, Any]: - """Export session configuration and transcript. 
- - Args: - project: Project/namespace name - session: Session name - - Returns: - Dict with exported session data - """ - try: - session_data = await self._get_resource_json("agenticsession", session, project) - - # Get transcript - transcript_result = await self.get_session_transcript(project, session, format="json") - - export_data = { - "config": { - "name": session_data.get("metadata", {}).get("name"), - "displayName": session_data.get("spec", {}).get("displayName"), - "repos": session_data.get("spec", {}).get("repos", []), - "workflow": session_data.get("spec", {}).get("workflow"), - "llmConfig": session_data.get("spec", {}).get("llmConfig", {}), - }, - "transcript": transcript_result.get("transcript", []), - "metadata": { - "created": session_data.get("metadata", {}).get("creationTimestamp"), - "status": session_data.get("status", {}).get("phase"), - "stoppedAt": session_data.get("status", {}).get("stoppedAt"), - "messageCount": transcript_result.get("message_count", 0), - }, - } - - return { - "exported": True, - "data": export_data, - "message": f"Successfully exported session '{session}'", - } - - except Exception as e: - return {"exported": False, "error": str(e)} - - # P3 Feature: Get Session Metrics - async def get_session_metrics(self, project: str, session: str) -> dict[str, Any]: - """Get session metrics and statistics. - - Args: - project: Project/namespace name - session: Session name - - Returns: - Dict with session metrics - """ - try: - session_data = await self._get_resource_json("agenticsession", session, project) - - # Get transcript for analysis - transcript_result = await self.get_session_transcript(project, session, format="json") - transcript = transcript_result.get("transcript") or [] - - # Calculate metrics - token_count = 0 - message_count = len(transcript) if transcript else 0 - tool_calls = {} - - for entry in transcript: - # Count tokens (approximate) - content = entry.get("content", "") - token_count += len(content.split()) * 1.3 # Rough estimate - - # Count tool calls - if "tool_calls" in entry: - for tool_call in entry.get("tool_calls", []): - tool_name = tool_call.get("name", "unknown") - tool_calls[tool_name] = tool_calls.get(tool_name, 0) + 1 - - # Calculate duration - created = session_data.get("metadata", {}).get("creationTimestamp") - stopped = session_data.get("status", {}).get("stoppedAt") - - duration_seconds = 0 - if created and stopped: - try: - from datetime import datetime - - created_dt = datetime.fromisoformat(created.replace("Z", "+00:00")) - stopped_dt = datetime.fromisoformat(stopped.replace("Z", "+00:00")) - duration_seconds = int((stopped_dt - created_dt).total_seconds()) - except Exception: - pass - - return { - "token_count": int(token_count), - "duration_seconds": duration_seconds, - "tool_calls": tool_calls, - "message_count": message_count, - "status": session_data.get("status", {}).get("phase"), - } - - except Exception as e: - return {"error": str(e)} - - # P3 Feature: List Workflows - async def list_workflows(self, repo_url: str | None = None) -> dict[str, Any]: - """List available workflows from repository. 
- - Args: - repo_url: Repository URL (defaults to ootb-ambient-workflows) - - Returns: - Dict with workflows list - """ - if not repo_url: - repo_url = "https://github.com/ambient-code/ootb-ambient-workflows" - - # Security: Validate repo URL format - if not isinstance(repo_url, str): - return {"workflows": [], "error": "Repository URL must be a string"} - if not (repo_url.startswith("https://") or repo_url.startswith("http://")): - return {"workflows": [], "error": "Repository URL must use http:// or https://"} - # Prevent command injection through URL - if any(char in repo_url for char in [";", "|", "&", "$", "`", "\n", "\r", " "]): - return {"workflows": [], "error": "Invalid characters in repository URL"} - - try: - # Clone repo to temp directory - import shutil - import tempfile - - # Security: Use secure temp directory with random name - temp_dir = tempfile.mkdtemp(prefix=f"acp-workflows-{secrets.token_hex(8)}-") - - try: - # Clone the repo using secure subprocess - process = await asyncio.create_subprocess_exec( - "git", - "clone", - "--depth", - "1", - "--", - repo_url, - temp_dir, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await asyncio.wait_for( - process.communicate(), - timeout=60, # 60 second timeout for git clone - ) - - if process.returncode != 0: - return { - "workflows": [], - "error": f"Failed to clone repository: {stderr.decode()}", - } - - # Find workflow files - workflows = [] - workflows_dir = Path(temp_dir) / "workflows" - - if workflows_dir.exists(): - # Limit to prevent DoS - file_count = 0 - max_files = 100 - for workflow_file in workflows_dir.glob("**/*.yaml"): - if file_count >= max_files: - break - file_count += 1 - - # Security: Validate file is within expected directory - try: - workflow_file.resolve().relative_to(workflows_dir.resolve()) - except ValueError: - continue # Skip files outside workflows directory - - # Read workflow to get metadata - try: - with open(workflow_file) as f: - workflow_data = yaml.safe_load(f) - if not isinstance(workflow_data, dict): - workflow_data = {} - - workflows.append( - { - "name": workflow_file.stem, - "path": str(workflow_file.relative_to(workflows_dir)), - "description": ( - workflow_data.get("description", "") - if isinstance(workflow_data.get("description"), str) - else "" - ), - } - ) - except (yaml.YAMLError, OSError): - # Skip invalid workflow files - continue - - return { - "workflows": workflows, - "repo_url": repo_url, - "count": len(workflows), - } - - finally: - # Clean up temp directory securely - try: - shutil.rmtree(temp_dir, ignore_errors=True) - except Exception: - pass - - except TimeoutError: - return {"workflows": [], "error": "Repository clone timed out"} - except Exception as e: - return {"workflows": [], "error": f"Unexpected error: {str(e)}"} - - # P3 Feature: Create Session from Template - async def create_session_from_template( - self, - project: str, - template: str, - display_name: str, - repos: list[str] | None = None, - dry_run: bool = False, - ) -> dict[str, Any]: - """Create session from predefined template. 
- - Args: - project: Project/namespace name - template: Template name (triage, bugfix, feature, exploration) - display_name: Display name for session - repos: Optional list of repository URLs - dry_run: Preview without creating - - Returns: - Dict with session creation status - """ - # Define templates - templates = { - "triage": { - "workflow": "triage", - "llmConfig": {"model": "claude-sonnet-4", "temperature": 0.7}, - "description": "Triage and analyze issues", - }, - "bugfix": { - "workflow": "bugfix", - "llmConfig": {"model": "claude-sonnet-4", "temperature": 0.3}, - "description": "Fix bugs and issues", - }, - "feature": { - "workflow": "feature-development", - "llmConfig": {"model": "claude-sonnet-4", "temperature": 0.5}, - "description": "Develop new features", - }, - "exploration": { - "workflow": "codebase-exploration", - "llmConfig": {"model": "claude-sonnet-4", "temperature": 0.8}, - "description": "Explore codebase", - }, - } - - if template not in templates: - return { - "created": False, - "message": f"Unknown template: {template}. Available: {', '.join(templates.keys())}", - } - - template_config = templates[template] - - if dry_run: return { - "dry_run": True, - "success": True, - "message": f"Would create session from template '{template}'", - "template_config": template_config, - "display_name": display_name, - "repos": repos or [], + "cluster": cluster_name, + "server": cluster_config.get("server", "unknown"), + "project": cluster_config.get("default_project", "unknown"), + "token_valid": token_valid, + "authenticated": token_valid, } - - try: - # Create session manifest - manifest = { - "apiVersion": "vteam.ambient-code/v1alpha1", - "kind": "AgenticSession", - "metadata": { - "generateName": f"{template}-", - "namespace": project, - }, - "spec": { - "displayName": display_name, - "workflow": template_config["workflow"], - "llmConfig": template_config["llmConfig"], - "repos": repos or [], - }, - } - - # Apply manifest using secure temporary file - import os - import tempfile - - # Security: Use secure temp file with proper permissions (0600) - fd, manifest_file = tempfile.mkstemp(suffix=".yaml", prefix=f"acp-template-{secrets.token_hex(8)}-") - try: - # Write to file descriptor with secure permissions - with os.fdopen(fd, "w") as f: - yaml.dump(manifest, f) - - result = await self._run_oc_command(["create", "-f", manifest_file, "-o", "json"]) - - if result.returncode != 0: - return { - "created": False, - "message": f"Failed to create session: {result.stderr.decode()}", - } - - created_data = json.loads(result.stdout.decode()) - session_name = created_data.get("metadata", {}).get("name") - - return { - "created": True, - "session": session_name, - "message": f"Successfully created session '{session_name}' from template '{template}'", - } - finally: - # Ensure cleanup even if operation fails - try: - os.unlink(manifest_file) - except OSError: - pass - - except Exception as e: - return {"created": False, "message": str(e)} - - # Auth Feature: Login - async def login(self, cluster: str, web: bool = True, token: str | None = None) -> dict[str, Any]: - """Authenticate to OpenShift cluster. 
- - Args: - cluster: Cluster alias name or server URL - web: Use web login flow - token: Direct token authentication - - Returns: - Dict with login status - """ - # Look up cluster in config - server = cluster - if cluster in self.config.get("clusters", {}): - server = self.config["clusters"][cluster]["server"] - - try: - if token: - # Token-based login - result = await self._run_oc_command( - ["login", "--token", token, "--server", server], - capture_output=False, - ) - elif web: - # Web-based login - result = await self._run_oc_command( - ["login", "--web", "--server", server], - capture_output=False, - ) - else: - return { - "authenticated": False, - "message": "Either 'web' or 'token' must be provided", - } - - if result.returncode != 0: - return { - "authenticated": False, - "message": "Login failed", - } - - # Get user info after login - whoami_result = await self.whoami() - + except ValueError as e: return { - "authenticated": True, - "user": whoami_result.get("user"), - "cluster": cluster, - "server": server, - "message": f"Successfully logged in to {cluster}", + "cluster": "unknown", + "server": "unknown", + "project": "unknown", + "token_valid": False, + "authenticated": False, + "error": str(e), } - except Exception as e: - return {"authenticated": False, "message": str(e)} - - # Auth Feature: Switch Cluster async def switch_cluster(self, cluster: str) -> dict[str, Any]: """Switch to a different cluster context. Args: cluster: Cluster alias name - - Returns: - Dict with switch status """ - if cluster not in self.config.get("clusters", {}): + if cluster not in self.clusters_config.clusters: return { "switched": False, "message": f"Unknown cluster: {cluster}. Use acp_list_clusters to see available clusters.", } - cluster_config = self.config["clusters"][cluster] - server = cluster_config["server"] - - try: - # Get current context - current_whoami = await self.whoami() - previous_cluster = current_whoami.get("cluster", "unknown") - - # Switch context (assumes already authenticated) - result = await self._run_oc_command( - ["login", "--server", server], - capture_output=False, - ) - - if result.returncode != 0: - return { - "switched": False, - "message": f"Failed to switch to {cluster}. You may need to login first.", - } - - # Get new user info - new_whoami = await self.whoami() - - return { - "switched": True, - "previous": previous_cluster, - "current": cluster, - "user": new_whoami.get("user"), - "message": f"Switched from {previous_cluster} to {cluster}", - } - - except Exception as e: - return {"switched": False, "message": str(e)} + previous_cluster = self.clusters_config.default_cluster + self.clusters_config.default_cluster = cluster - # Auth Feature: Add Cluster - def add_cluster( - self, - name: str, - server: str, - description: str | None = None, - default_project: str | None = None, - set_default: bool = False, - ) -> dict[str, Any]: - """Add a new cluster to configuration. 
- - Args: - name: Cluster alias name - server: Server URL - description: Optional description - default_project: Optional default project - set_default: Set as default cluster - - Returns: - Dict with add status - """ - try: - # Security: Validate inputs - if not isinstance(name, str) or not re.match(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", name): - return {"added": False, "message": "Invalid cluster name format"} - if not isinstance(server, str) or not (server.startswith("https://") or server.startswith("http://")): - return {"added": False, "message": "Server must be a valid HTTP/HTTPS URL"} - if description and (not isinstance(description, str) or len(description) > 500): - return { - "added": False, - "message": "Description must be a string under 500 characters", - } - if default_project: - try: - self._validate_input(default_project, "default_project") - except ValueError as e: - return {"added": False, "message": str(e)} - - # Update config - if "clusters" not in self.config: - self.config["clusters"] = {} - - self.config["clusters"][name] = { - "server": server, - "description": description or "", - "default_project": default_project, - } - - if set_default: - self.config["default_cluster"] = name - - # Save config securely - config_file = Path(self.config_path) - config_file.parent.mkdir(parents=True, exist_ok=True) - - # Security: Write with restricted permissions - with open(config_file, "w") as f: - yaml.dump(self.config, f) - # Set file permissions to 0600 (owner read/write only) - import os - - os.chmod(config_file, 0o600) - - return { - "added": True, - "cluster": { - "name": name, - "server": server, - "description": description, - "default_project": default_project, - "is_default": set_default, - }, - "message": f"Successfully added cluster '{name}'", - } + return { + "switched": True, + "previous": previous_cluster, + "current": cluster, + "message": f"Switched from {previous_cluster} to {cluster}", + } - except Exception as e: - return {"added": False, "message": f"Failed to add cluster: {str(e)}"} + async def close(self) -> None: + """Close the HTTP client.""" + if self._http_client and not self._http_client.is_closed: + await self._http_client.aclose() diff --git a/src/mcp_acp/formatters.py b/src/mcp_acp/formatters.py index 1901f63..5a5cafa 100644 --- a/src/mcp_acp/formatters.py +++ b/src/mcp_acp/formatters.py @@ -5,14 +5,7 @@ def format_result(result: dict[str, Any]) -> str: - """Format a simple result dictionary. - - Args: - result: Result dictionary from operation - - Returns: - Formatted string for display - """ + """Format a simple result dictionary.""" if result.get("dry_run"): output = "DRY RUN MODE - No changes made\n\n" output += result.get("message", "") @@ -24,67 +17,38 @@ def format_result(result: dict[str, Any]) -> str: def format_sessions_list(result: dict[str, Any]) -> str: - """Format sessions list with filtering info. 
-
-    Args:
-        result: Result dictionary with sessions list
-
-    Returns:
-        Formatted string for display
-    """
+    """Format sessions list with filtering info."""
     output = f"Found {result['total']} session(s)"
 
     filters = result.get("filters_applied", {})
     if filters:
-        output += f"\nFilters applied: {json.dumps(filters, indent=2)}"
+        output += f"\nFilters applied: {json.dumps(filters)}"
 
     output += "\n\nSessions:\n"
     for session in result["sessions"]:
-        metadata = session.get("metadata", {})
-        spec = session.get("spec", {})
-        status = session.get("status", {})
-
-        name = metadata.get("name", "unknown")
-        display_name = spec.get("displayName", "")
-        phase = status.get("phase", "unknown")
-        created = metadata.get("creationTimestamp", "unknown")
-
-        output += f"\n- {name}"
-        if display_name:
-            output += f' ("{display_name}")'
-        output += f"\n  Status: {phase}\n  Created: {created}\n"
+        # Handle both public-api DTO format and raw K8s format
+        session_id = session.get("id") or session.get("metadata", {}).get("name", "unknown")
+        # "status" is a plain string in the public-api DTO but a nested dict in
+        # the raw K8s object, so unwrap the dict case explicitly; an or-chain
+        # would pass the truthy dict straight through to the output
+        status = session.get("status")
+        if isinstance(status, dict):
+            status = status.get("phase", "unknown")
+        elif not status:
+            status = "unknown"
+        created = session.get("createdAt") or session.get("metadata", {}).get("creationTimestamp", "unknown")
+        task = session.get("task", "")
+
+        output += f"\n- {session_id}"
+        output += f"\n  Status: {status}"
+        output += f"\n  Created: {created}"
+        if task:
+            output += f"\n  Task: {task[:50]}{'...' if len(task) > 50 else ''}"
+        output += "\n"
 
     return output
 
 
 def format_bulk_result(result: dict[str, Any], operation: str) -> str:
-    """Format bulk operation results.
-
-    Args:
-        result: Result dictionary from bulk operation
-        operation: Operation name (e.g., "delete", "stop")
-
-    Returns:
-        Formatted string for display
-    """
+    """Format bulk operation results."""
     if result.get("dry_run"):
         output = "DRY RUN MODE - No changes made\n\n"
 
-        # Handle enhanced dry-run for label-based operations
-        if "matched_sessions" in result:
-            matched = result.get("matched_sessions", [])
-            output += f"Matched {result.get('matched_count', len(matched))} sessions with label selector:\n"
-            output += f"  {result.get('label_selector', 'N/A')}\n\n"
-            if matched:
-                output += "Matched sessions:\n"
-                for session in matched:
-                    output += f"  - {session}\n"
-            output += f"\n{result.get('message', '')}\n"
-            return output
-
         dry_run_info = result.get("dry_run_info", {})
         would_execute = dry_run_info.get("would_execute", [])
         skipped = dry_run_info.get("skipped", [])
@@ -92,10 +56,8 @@ def format_bulk_result(result: dict[str, Any], operation: str) -> str:
         output += f"Would {operation} {len(would_execute)} session(s):\n"
         for item in would_execute:
             output += f"  - {item['session']}\n"
-            if item.get("info"):
-                info = item["info"]
-                if "status" in info:
-                    output += f"    Status: {info['status']}\n"
+            if item.get("info") and "status" in item["info"]:
+                output += f"    Status: {item['info']['status']}\n"
 
         if skipped:
             output += f"\nSkipped ({len(skipped)} session(s)):\n"
@@ -107,22 +69,12 @@ def format_bulk_result(result: dict[str, Any], operation: str) -> str:
         return output
 
-    # Normal mode
-    # Map operation to success key
-    success_key_map = {
-        "delete": "deleted",
-        "stop": "stopped",
-        "restart": "restarted",
-        "label": "labeled",
-        "unlabel": "unlabeled",
-    }
+    success_key_map = {"delete": "deleted", "stop": "stopped", "restart": "restarted"}
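+    # Past-tense keys ("deleted", "stopped", "restarted") match the result
+    # keys the client returns for successful items; unmapped operations fall
+    # back to the raw operation name below.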
"session" in str(failed) else "resource(s)" - output = f"Successfully {operation}d {len(success)} {resource_type}" + output = f"Successfully {operation}d {len(success)} session(s)" if success: output += ":\n" @@ -137,39 +89,8 @@ def format_bulk_result(result: dict[str, Any], operation: str) -> str: return output -def format_logs(result: dict[str, Any]) -> str: - """Format session logs. - - Args: - result: Result dictionary with logs - - Returns: - Formatted string for display - """ - if "error" in result: - error_msg = result["error"] - # Check if this is an expected state rather than an error - error_lower = error_msg.lower() - if any(phrase in error_lower for phrase in ["no pods found", "not found", "no running pods"]): - return f"No logs available: {error_msg}\n\nNote: This is expected for stopped sessions or sessions without active pods." - return f"Error retrieving logs: {error_msg}" - - output = f"Logs from container '{result.get('container', 'default')}'" - output += f" ({result.get('lines', 0)} lines):\n\n" - output += result.get("logs", "") - - return output - - def format_clusters(result: dict[str, Any]) -> str: - """Format clusters list. - - Args: - result: Result dictionary with clusters list - - Returns: - Formatted string for display - """ + """Format clusters list.""" clusters = result.get("clusters", []) default = result.get("default_cluster") @@ -198,190 +119,20 @@ def format_clusters(result: dict[str, Any]) -> str: def format_whoami(result: dict[str, Any]) -> str: - """Format whoami information. - - Args: - result: Result dictionary with auth info - - Returns: - Formatted string for display - """ - output = "Current Authentication Status:\n\n" + """Format whoami information.""" + output = "Configuration Status:\n\n" authenticated = result.get("authenticated", False) - output += f"Authenticated: {'Yes' if authenticated else 'No'}\n" + output += f"Token Configured: {'Yes' if authenticated else 'No'}\n" - if authenticated: - output += f"User: {result.get('user', 'unknown')}\n" - output += f"Cluster: {result.get('cluster', 'unknown')}\n" - output += f"Server: {result.get('server', 'unknown')}\n" - output += f"Project: {result.get('project', 'unknown')}\n" + output += f"Cluster: {result.get('cluster', 'unknown')}\n" + output += f"Server: {result.get('server', 'unknown')}\n" + output += f"Project: {result.get('project', 'unknown')}\n" - token_valid = result.get("token_valid", False) - output += f"Token Valid: {'Yes' if token_valid else 'No'}\n" + if not authenticated: + output += "\nSet token in clusters.yaml or ACP_TOKEN environment variable.\n" - if result.get("token_expires"): - output += f"Token Expires: {result['token_expires']}\n" - else: - output += "\nYou are not authenticated. Use 'acp_login' to authenticate.\n" + if result.get("error"): + output += f"\nError: {result['error']}\n" return output - - -def format_transcript(result: dict[str, Any]) -> str: - """Format session transcript. - - Args: - result: Result dictionary with transcript - - Returns: - Formatted string for display - """ - if "error" in result: - error_msg = result["error"] - error_lower = error_msg.lower() - # Check if this is an expected state (no transcript available) - if any(phrase in error_lower for phrase in ["no transcript", "transcript not found", "no data"]): - return f"No transcript available: {error_msg}\n\nNote: Sessions may not have transcript data if they are newly created, stopped, or haven't processed messages yet." 
- return f"Error retrieving transcript: {error_msg}" - - format_type = result.get("format", "json") - message_count = result.get("message_count", 0) - - if message_count == 0: - return "Session Transcript: No messages yet.\n\nNote: This session may be newly created or hasn't processed any messages." - - if format_type == "markdown": - output = f"Session Transcript ({message_count} messages):\n\n" - output += result.get("transcript", "") - return output - else: - output = f"Session Transcript ({message_count} messages):\n\n" - output += json.dumps(result.get("transcript", []), indent=2) - return output - - -def format_metrics(result: dict[str, Any]) -> str: - """Format session metrics. - - Args: - result: Result dictionary with metrics - - Returns: - Formatted string for display - """ - if "error" in result: - error_msg = result["error"] - error_lower = error_msg.lower() - # Check if this is an expected state (no metrics available) - if any(phrase in error_lower for phrase in ["no transcript", "no data", "not found"]): - return f"No metrics available: {error_msg}\n\nNote: Metrics are calculated from transcript data. Sessions without transcript data (new, stopped, or inactive sessions) will not have metrics." - return f"Error retrieving metrics: {error_msg}" - - output = "Session Metrics:\n\n" - message_count = result.get("message_count", 0) - - if message_count == 0: - output += "No metrics available yet.\n\nNote: This session has no message history. Metrics will be available after the session processes messages." - return output - - output += f"Message Count: {message_count}\n" - output += f"Token Count (approx): {result.get('token_count', 0)}\n" - output += f"Duration: {result.get('duration_seconds', 0)} seconds\n" - output += f"Status: {result.get('status', 'unknown')}\n" - - tool_calls = result.get("tool_calls", {}) - if tool_calls: - output += "\nTool Usage:\n" - for tool_name, count in sorted(tool_calls.items(), key=lambda x: x[1], reverse=True): - output += f" - {tool_name}: {count}\n" - - return output - - -def format_workflows(result: dict[str, Any]) -> str: - """Format workflows list. - - Args: - result: Result dictionary with workflows - - Returns: - Formatted string for display - """ - if "error" in result: - error_msg = result["error"] - error_lower = error_msg.lower() - # Check if this is an expected state (no workflows found) - if any(phrase in error_lower for phrase in ["no workflows", "not found", "no .github/workflows"]): - return f"No workflows found: {error_msg}\n\nNote: This repository may not have GitHub Actions workflows configured yet." - return f"Error retrieving workflows: {error_msg}" - - workflows = result.get("workflows", []) - repo_url = result.get("repo_url", "") - count = result.get("count", 0) - - if not workflows: - return f"No workflows found in {repo_url}\n\nNote: This repository does not have any GitHub Actions workflows in .github/workflows/" - - output = f"Available Workflows ({count} found):\n" - output += f"Repository: {repo_url}\n\n" - - for workflow in workflows: - output += f"- {workflow['name']}\n" - output += f" Path: {workflow['path']}\n" - if workflow.get("description"): - output += f" Description: {workflow['description']}\n" - output += "\n" - - return output - - -def format_export(result: dict[str, Any]) -> str: - """Format session export data. 
- - Args: - result: Result dictionary with export data - - Returns: - Formatted string for display - """ - if "error" in result: - error_msg = result["error"] - error_lower = error_msg.lower() - # Check if this is a partial export (some data unavailable) - if any(phrase in error_lower for phrase in ["no transcript", "no data", "partially exported"]): - return f"Partial export: {error_msg}\n\nNote: Some session data may be unavailable for stopped or inactive sessions. Exported data reflects what was accessible." - return f"Error exporting session: {error_msg}" - - if not result.get("exported"): - return result.get("message", "Export failed") - - data = result.get("data", {}) - output = "Session Export:\n\n" - output += "Configuration:\n" - output += json.dumps(data.get("config", {}), indent=2) - output += "\n\nMetadata:\n" - output += json.dumps(data.get("metadata", {}), indent=2) - - transcript = data.get("transcript", []) - transcript_count = len(transcript) - output += f"\n\nTranscript: {transcript_count} messages" - - if transcript_count == 0: - output += " (no transcript data available - this is expected for new/stopped sessions)" - - output += "\n\n" + result.get("message", "") - - return output - - -def format_cluster_operation(result: dict[str, Any]) -> str: - """Format cluster operation results (add, switch, login). - - Args: - result: Result dictionary from cluster operation - - Returns: - Formatted string for display - """ - return result.get("message", json.dumps(result, indent=2)) diff --git a/src/mcp_acp/server.py b/src/mcp_acp/server.py index 4169c04..b97b199 100644 --- a/src/mcp_acp/server.py +++ b/src/mcp_acp/server.py @@ -2,7 +2,6 @@ import asyncio import os -from collections.abc import Callable from typing import Any from mcp.server import Server @@ -14,122 +13,21 @@ from .client import ACPClient from .formatters import ( format_bulk_result, - format_cluster_operation, format_clusters, - format_export, - format_logs, - format_metrics, format_result, format_sessions_list, - format_transcript, format_whoami, - format_workflows, ) -# Initialize structured logger logger = get_python_logger() -# Create MCP server instance app = Server("mcp-acp") -# Global client instance _client: ACPClient | None = None -# Schema fragments for reuse -SCHEMA_FRAGMENTS = { - "project": { - "type": "string", - "description": "Project/namespace name (optional - uses default_project from clusters.yaml if not provided)", - }, - "session": { - "type": "string", - "description": "Session name", - }, - "dry_run": { - "type": "boolean", - "description": "Preview without actually executing (default: false)", - "default": False, - }, - "sessions_list": { - "type": "array", - "items": {"type": "string"}, - "description": "List of session names", - }, - "container": { - "type": "string", - "description": "Container name (e.g., 'runner', 'sidecar')", - }, - "tail_lines": { - "type": "integer", - "description": "Number of lines to retrieve from the end", - "minimum": 1, - }, - "display_name": { - "type": "string", - "description": "Display name for the session", - }, - "cluster": { - "type": "string", - "description": "Cluster alias name or server URL", - }, - "repos_list": { - "type": "array", - "items": {"type": "string"}, - "description": "List of repository URLs", - }, - "labels_dict": { - "type": "object", - "description": "Label key-value pairs (e.g., {'env': 'prod'})", - "additionalProperties": {"type": "string"}, - }, - "label_keys_list": { - "type": "array", - "items": {"type": "string"}, - 
"description": "List of label keys to remove (without prefix)", - }, - "resource_type": { - "type": "string", - "description": "Resource type (agenticsession, namespace, etc)", - }, - "confirm": { - "type": "boolean", - "description": "Required for destructive bulk ops (default: false)", - "default": False, - }, -} - - -def create_tool_schema(properties: dict[str, Any], required: list[str]) -> dict[str, Any]: - """Build tool input schema from property references. - - Args: - properties: Dict mapping property names to fragment keys or schema dicts - required: List of required property names - - Returns: - JSON schema dict - """ - schema_properties = {} - for prop_name, fragment_key in properties.items(): - if isinstance(fragment_key, str) and fragment_key in SCHEMA_FRAGMENTS: - # Reference to a schema fragment - use a copy to avoid mutation - schema_properties[prop_name] = SCHEMA_FRAGMENTS[fragment_key].copy() - elif isinstance(fragment_key, dict): - # Inline schema definition - use a copy to avoid mutation - schema_properties[prop_name] = fragment_key.copy() - else: - # String reference not in fragments - treat as-is - schema_properties[prop_name] = fragment_key - - return { - "type": "object", - "properties": schema_properties, - "required": required, - } - def get_client() -> ACPClient: - """Get or create ACP client instance with error handling.""" + """Get or create ACP client instance.""" global _client if _client is None: config_path = os.getenv("ACP_CLUSTER_CONFIG") @@ -146,57 +44,25 @@ def get_client() -> ACPClient: return _client -async def _check_confirmation_then_execute(fn: Callable, args: dict[str, Any], operation: str) -> Any: - """Enforce confirmation at server layer (not client). - - Args: - fn: Function to execute - args: Function arguments - operation: Operation name for error message - - Returns: - Result from function - - Raises: - ValueError: If confirmation not provided for non-dry-run operations - """ - if not args.get("dry_run") and not args.get("confirm"): - raise ValueError(f"Bulk {operation} requires explicit confirmation.\nAdd confirm=true to proceed.") - return await fn(**args) - - @app.list_tools() async def list_tools() -> list[Tool]: - """List available ACP (Ambient Code Platform) tools for managing AgenticSession resources on OpenShift/Kubernetes.""" + """List available ACP tools for managing AgenticSession resources.""" return [ - # P0 Priority Tools - Tool( - name="acp_delete_session", - description="Delete an ACP (Ambient Code Platform) AgenticSession from an OpenShift project/namespace. Supports dry-run mode for safe preview before deletion.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "session": "session", - "dry_run": "dry_run", - }, - required=["session"], - ), - ), Tool( name="acp_list_sessions", - description="List and filter ACP (Ambient Code Platform) AgenticSessions in an OpenShift project. Filter by status (running/stopped/failed), age, display name, labels. Sort and limit results.", - inputSchema=create_tool_schema( - properties={ - "project": "project", + description="List and filter AgenticSessions in a project. Filter by status (running/stopped/failed), age. 
Sort and limit results.", + inputSchema={ + "type": "object", + "properties": { + "project": { + "type": "string", + "description": "Project/namespace name (uses default if not provided)", + }, "status": { "type": "string", "description": "Filter by status", "enum": ["running", "stopped", "creating", "failed"], }, - "has_display_name": { - "type": "boolean", - "description": "Filter by display name presence", - }, "older_than": { "type": "string", "description": "Filter by age (e.g., '7d', '24h', '30m')", @@ -211,611 +77,194 @@ async def list_tools() -> list[Tool]: "description": "Maximum number of results", "minimum": 1, }, - "label_selector": { - "type": "string", - "description": "K8s label selector (e.g., 'acp.ambient-code.ai/label-env=prod,acp.ambient-code.ai/label-team=api')", - }, - }, - required=[], - ), - ), - # P1 Priority Tools - Tool( - name="acp_restart_session", - description="Restart a stopped session. Supports dry-run mode.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "session": "session", - "dry_run": "dry_run", - }, - required=["session"], - ), - ), - Tool( - name="acp_bulk_delete_sessions", - description="Delete multiple sessions (max 3). DESTRUCTIVE: requires confirm=true. Use dry_run=true first!", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "sessions": "sessions_list", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["sessions"], - ), - ), - Tool( - name="acp_bulk_stop_sessions", - description="Stop multiple running sessions (max 3). Requires confirm=true. Use dry_run=true first!", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "sessions": "sessions_list", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["sessions"], - ), - ), - Tool( - name="acp_get_session_logs", - description="Retrieve container logs for a session for debugging purposes.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "session": "session", - "container": "container", - "tail_lines": "tail_lines", }, - required=["session"], - ), - ), - Tool( - name="acp_list_clusters", - description="List configured cluster aliases from clusters.yaml configuration.", - inputSchema={"type": "object", "properties": {}}, + "required": [], + }, ), Tool( - name="acp_whoami", - description="Get current authentication status and user information.", - inputSchema={"type": "object", "properties": {}}, - ), - # Label Management Tools - Tool( - name="acp_label_resource", - description="Add/update labels on any ACP resource. Works for sessions, workspaces, future types. Uses --overwrite.", - inputSchema=create_tool_schema( - properties={ - "resource_type": "resource_type", - "name": "session", - "project": "project", - "labels": "labels_dict", - "dry_run": "dry_run", - }, - required=["resource_type", "name", "project", "labels"], - ), - ), - Tool( - name="acp_unlabel_resource", - description="Remove specific labels from any ACP resource.", - inputSchema=create_tool_schema( - properties={ - "resource_type": "resource_type", - "name": "session", - "project": "project", - "label_keys": "label_keys_list", - "dry_run": "dry_run", - }, - required=["resource_type", "name", "project", "label_keys"], - ), - ), - Tool( - name="acp_bulk_label_resources", - description="Label multiple resources (max 3) with same labels. 
Requires confirm=true.", - inputSchema=create_tool_schema( - properties={ - "resource_type": "resource_type", - "names": "sessions_list", - "project": "project", - "labels": "labels_dict", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["resource_type", "names", "project", "labels"], - ), - ), - Tool( - name="acp_bulk_unlabel_resources", - description="Remove labels from multiple resources (max 3). Requires confirm=true.", - inputSchema=create_tool_schema( - properties={ - "resource_type": "resource_type", - "names": "sessions_list", - "project": "project", - "label_keys": "label_keys_list", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["resource_type", "names", "project", "label_keys"], - ), - ), - Tool( - name="acp_list_sessions_by_label", - description="List sessions filtered by user-friendly labels (convenience wrapper, auto-prefixes labels).", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "labels": "labels_dict", - "status": { + name="acp_get_session", + description="Get details of a specific session by ID.", + inputSchema={ + "type": "object", + "properties": { + "project": { "type": "string", - "description": "Filter by status (running, stopped, etc)", - }, - "limit": { - "type": "integer", - "description": "Limit results", + "description": "Project/namespace name (uses default if not provided)", }, - }, - required=["project", "labels"], - ), - ), - Tool( - name="acp_bulk_delete_sessions_by_label", - description="Delete sessions (max 3) matching label selector. DESTRUCTIVE: requires confirm=true.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "labels": "labels_dict", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["project", "labels"], - ), - ), - Tool( - name="acp_bulk_stop_sessions_by_label", - description="Stop sessions (max 3) matching label selector. Requires confirm=true.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "labels": "labels_dict", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["project", "labels"], - ), - ), - Tool( - name="acp_bulk_restart_sessions", - description="Restart multiple stopped sessions (max 3). Requires confirm=true.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "sessions": "sessions_list", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["project", "sessions"], - ), - ), - Tool( - name="acp_bulk_restart_sessions_by_label", - description="Restart sessions (max 3) matching label selector. Requires confirm=true.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "labels": "labels_dict", - "confirm": "confirm", - "dry_run": "dry_run", - }, - required=["project", "labels"], - ), - ), - # P2 Priority Tools - Tool( - name="acp_clone_session", - description="Clone a session with its configuration. 
Supports dry-run mode.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "source_session": "session", - "new_display_name": "display_name", - "dry_run": "dry_run", - }, - required=["source_session", "new_display_name"], - ), - ), - Tool( - name="acp_get_session_transcript", - description="Get session transcript/conversation history.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "session": "session", - "format": { + "session": { "type": "string", - "description": "Output format", - "enum": ["json", "markdown"], - "default": "json", + "description": "Session ID", }, }, - required=["session"], - ), + "required": ["session"], + }, ), Tool( - name="acp_update_session", - description="Update session metadata (display name, timeout). Supports dry-run mode.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "session": "session", - "display_name": "display_name", - "timeout": { - "type": "integer", - "description": "Timeout in seconds", + name="acp_delete_session", + description="Delete an AgenticSession. Supports dry-run mode for preview.", + inputSchema={ + "type": "object", + "properties": { + "project": { + "type": "string", + "description": "Project/namespace name (uses default if not provided)", }, - "dry_run": "dry_run", - }, - required=["session"], - ), - ), - Tool( - name="acp_export_session", - description="Export session configuration and transcript for archival.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "session": "session", - }, - required=["session"], - ), - ), - # P3 Priority Tools - Tool( - name="acp_get_session_metrics", - description="Get session metrics (token usage, duration, tool calls).", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "session": "session", - }, - required=["session"], - ), - ), - Tool( - name="acp_list_workflows", - description="List available workflows from repository.", - inputSchema=create_tool_schema( - properties={ - "repo_url": { + "session": { "type": "string", - "description": "Repository URL (defaults to ootb-ambient-workflows)", + "description": "Session name", + }, + "dry_run": { + "type": "boolean", + "description": "Preview without deleting (default: false)", + "default": False, }, }, - required=[], - ), + "required": ["session"], + }, ), Tool( - name="acp_create_session_from_template", - description="Create session from predefined template (triage, bugfix, feature, exploration). Supports dry-run mode.", - inputSchema=create_tool_schema( - properties={ - "project": "project", - "template": { + name="acp_bulk_delete_sessions", + description="Delete multiple sessions (max 3). DESTRUCTIVE: requires confirm=true. 
Use dry_run=true first!", + inputSchema={ + "type": "object", + "properties": { + "project": { "type": "string", - "description": "Template name", - "enum": ["triage", "bugfix", "feature", "exploration"], + "description": "Project/namespace name (uses default if not provided)", }, - "display_name": "display_name", - "repos": "repos_list", - "dry_run": "dry_run", - }, - required=["template", "display_name"], - ), - ), - # Auth Enhancement Tools - Tool( - name="acp_login", - description="Authenticate to OpenShift cluster via web or token.", - inputSchema=create_tool_schema( - properties={ - "cluster": "cluster", - "web": { + "sessions": { + "type": "array", + "items": {"type": "string"}, + "description": "List of session names", + }, + "confirm": { "type": "boolean", - "description": "Use web login flow (default: true)", - "default": True, + "description": "Required for destructive operations (default: false)", + "default": False, }, - "token": { - "type": "string", - "description": "Direct token for authentication", + "dry_run": { + "type": "boolean", + "description": "Preview without deleting (default: false)", + "default": False, }, }, - required=["cluster"], - ), + "required": ["sessions"], + }, ), Tool( - name="acp_switch_cluster", - description="Switch to a different cluster context.", - inputSchema=create_tool_schema( - properties={ - "cluster": "cluster", - }, - required=["cluster"], - ), + name="acp_list_clusters", + description="List configured cluster aliases from clusters.yaml.", + inputSchema={"type": "object", "properties": {}}, + ), + Tool( + name="acp_whoami", + description="Get current configuration and authentication status.", + inputSchema={"type": "object", "properties": {}}, ), Tool( - name="acp_add_cluster", - description="Add a new cluster to configuration.", - inputSchema=create_tool_schema( - properties={ - "name": { + name="acp_switch_cluster", + description="Switch to a different cluster context.", + inputSchema={ + "type": "object", + "properties": { + "cluster": { "type": "string", "description": "Cluster alias name", }, - "server": { - "type": "string", - "description": "Server URL", - }, - "description": { - "type": "string", - "description": "Optional description", - }, - "default_project": { - "type": "string", - "description": "Optional default project", - }, - "set_default": { - "type": "boolean", - "description": "Set as default cluster", - "default": False, - }, }, - required=["name", "server"], - ), + "required": ["cluster"], + }, ), ] -# Async wrapper functions for confirmation-protected bulk operations -def create_bulk_wrappers(client: ACPClient) -> dict[str, Callable]: - """Create async wrapper functions for bulk operations with confirmation. 
- - Args: - client: ACP client instance - - Returns: - Dict of wrapper function names to async functions - """ - - async def bulk_delete_wrapper(**args): - return await _check_confirmation_then_execute(client.bulk_delete_sessions, args, "delete") - - async def bulk_stop_wrapper(**args): - return await _check_confirmation_then_execute(client.bulk_stop_sessions, args, "stop") - - async def bulk_delete_by_label_wrapper(**args): - return await _check_confirmation_then_execute(client.bulk_delete_sessions_by_label, args, "delete") - - async def bulk_stop_by_label_wrapper(**args): - return await _check_confirmation_then_execute(client.bulk_stop_sessions_by_label, args, "stop") - - async def bulk_restart_wrapper(**args): - return await _check_confirmation_then_execute(client.bulk_restart_sessions, args, "restart") - - async def bulk_restart_by_label_wrapper(**args): - return await _check_confirmation_then_execute(client.bulk_restart_sessions_by_label, args, "restart") - - return { - "bulk_delete": bulk_delete_wrapper, - "bulk_stop": bulk_stop_wrapper, - "bulk_delete_by_label": bulk_delete_by_label_wrapper, - "bulk_stop_by_label": bulk_stop_by_label_wrapper, - "bulk_restart": bulk_restart_wrapper, - "bulk_restart_by_label": bulk_restart_by_label_wrapper, - } - - -# Tool dispatch table: maps tool names to (handler, formatter) pairs -def create_dispatch_table(client: ACPClient) -> dict[str, tuple[Callable, Callable]]: - """Create tool dispatch table. - - Args: - client: ACP client instance - - Returns: - Dict mapping tool names to (handler, formatter) tuples - """ - bulk_wrappers = create_bulk_wrappers(client) - - return { - "acp_delete_session": ( - client.delete_session, - format_result, - ), - "acp_list_sessions": ( - client.list_sessions, - format_sessions_list, - ), - "acp_restart_session": ( - client.restart_session, - format_result, - ), - "acp_bulk_delete_sessions": ( - bulk_wrappers["bulk_delete"], - lambda r: format_bulk_result(r, "delete"), - ), - "acp_bulk_stop_sessions": ( - bulk_wrappers["bulk_stop"], - lambda r: format_bulk_result(r, "stop"), - ), - "acp_get_session_logs": ( - client.get_session_logs, - format_logs, - ), - "acp_list_clusters": ( - client.list_clusters, - format_clusters, - ), - "acp_whoami": ( - client.whoami, - format_whoami, - ), - # Label Management Tools - "acp_label_resource": ( - client.label_resource, - format_result, - ), - "acp_unlabel_resource": ( - client.unlabel_resource, - format_result, - ), - "acp_bulk_label_resources": ( - lambda **args: _check_confirmation_then_execute(client.bulk_label_resources, args, "label"), - lambda r: format_bulk_result(r, "label"), - ), - "acp_bulk_unlabel_resources": ( - lambda **args: _check_confirmation_then_execute(client.bulk_unlabel_resources, args, "unlabel"), - lambda r: format_bulk_result(r, "unlabel"), - ), - "acp_list_sessions_by_label": ( - client.list_sessions_by_user_labels, - format_sessions_list, - ), - "acp_bulk_delete_sessions_by_label": ( - bulk_wrappers["bulk_delete_by_label"], - lambda r: format_bulk_result(r, "delete"), - ), - "acp_bulk_stop_sessions_by_label": ( - bulk_wrappers["bulk_stop_by_label"], - lambda r: format_bulk_result(r, "stop"), - ), - "acp_bulk_restart_sessions": ( - bulk_wrappers["bulk_restart"], - lambda r: format_bulk_result(r, "restart"), - ), - "acp_bulk_restart_sessions_by_label": ( - bulk_wrappers["bulk_restart_by_label"], - lambda r: format_bulk_result(r, "restart"), - ), - # P2 Tools - "acp_clone_session": ( - client.clone_session, - format_result, - ), - 
"acp_get_session_transcript": ( - client.get_session_transcript, - format_transcript, - ), - "acp_update_session": ( - client.update_session, - format_result, - ), - "acp_export_session": ( - client.export_session, - format_export, - ), - # P3 Tools - "acp_get_session_metrics": ( - client.get_session_metrics, - format_metrics, - ), - "acp_list_workflows": ( - client.list_workflows, - format_workflows, - ), - "acp_create_session_from_template": ( - client.create_session_from_template, - format_result, - ), - # Auth Tools - "acp_login": ( - client.login, - format_cluster_operation, - ), - "acp_switch_cluster": ( - client.switch_cluster, - format_cluster_operation, - ), - "acp_add_cluster": ( - client.add_cluster, - format_cluster_operation, - ), - } - - -# Tools that don't require a project parameter (cluster-level or config operations) TOOLS_WITHOUT_PROJECT = { "acp_list_clusters", "acp_whoami", - "acp_login", "acp_switch_cluster", - "acp_add_cluster", - "acp_list_workflows", } @app.call_tool() async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: - """Handle tool calls with dispatch table. - - Args: - name: Tool name - arguments: Tool arguments - - Returns: - List of text content responses - """ + """Handle tool calls.""" import time start_time = time.time() - # Security: Sanitize arguments for logging (remove sensitive data) safe_args = {k: v for k, v in arguments.items() if k not in ["token", "password", "secret"]} logger.info("tool_call_started", tool=name, arguments=safe_args) client = get_client() - dispatch_table = create_dispatch_table(client) try: - handler, formatter = dispatch_table.get(name, (None, None)) + # Auto-fill project from default if not provided + if name not in TOOLS_WITHOUT_PROJECT and not arguments.get("project"): + cluster_name = client.clusters_config.default_cluster + if cluster_name: + cluster = client.clusters_config.clusters.get(cluster_name) + if cluster and cluster.default_project: + arguments["project"] = cluster.default_project + logger.info("project_autofilled", project=cluster.default_project) + + # Dispatch to handler + if name == "acp_list_sessions": + result = await client.list_sessions( + project=arguments.get("project", ""), + status=arguments.get("status"), + older_than=arguments.get("older_than"), + sort_by=arguments.get("sort_by"), + limit=arguments.get("limit"), + ) + text = format_sessions_list(result) + + elif name == "acp_get_session": + result = await client.get_session( + project=arguments.get("project", ""), + session=arguments["session"], + ) + text = format_result(result) + + elif name == "acp_delete_session": + result = await client.delete_session( + project=arguments.get("project", ""), + session=arguments["session"], + dry_run=arguments.get("dry_run", False), + ) + text = format_result(result) + + elif name == "acp_bulk_delete_sessions": + if not arguments.get("dry_run") and not arguments.get("confirm"): + raise ValueError("Bulk delete requires confirm=true. 
Use dry_run=true to preview first.") + result = await client.bulk_delete_sessions( + project=arguments.get("project", ""), + sessions=arguments["sessions"], + dry_run=arguments.get("dry_run", False), + ) + text = format_bulk_result(result, "delete") + + elif name == "acp_list_clusters": + result = client.list_clusters() + text = format_clusters(result) + + elif name == "acp_whoami": + result = await client.whoami() + text = format_whoami(result) + + elif name == "acp_switch_cluster": + result = await client.switch_cluster(arguments["cluster"]) + text = format_result(result) - if not handler: + else: logger.warning("unknown_tool_requested", tool=name) return [TextContent(type="text", text=f"Unknown tool: {name}")] - # Auto-fill project from default_project if not provided or empty - # Only for tools that actually use project parameter - if name not in TOOLS_WITHOUT_PROJECT and not arguments.get("project"): - # Get default project from current cluster config - default_cluster = client.config.get("default_cluster") - if default_cluster: - cluster_config = client.config.get("clusters", {}).get(default_cluster, {}) - default_project = cluster_config.get("default_project") - if default_project: - arguments["project"] = default_project - logger.info("project_autofilled", project=default_project, cluster=default_cluster) - - # Call handler (async or sync) - if asyncio.iscoroutinefunction(handler): - result = await handler(**arguments) - else: - result = handler(**arguments) - - # Log execution time elapsed = time.time() - start_time logger.info("tool_call_completed", tool=name, elapsed_seconds=round(elapsed, 2)) - # Check for errors in result - if isinstance(result, dict): - if result.get("error"): - logger.warning("tool_returned_error", tool=name, error=result.get("error")) - elif not result.get("success", True) and "message" in result: - logger.warning("tool_failed", tool=name, message=result.get("message")) - - return [TextContent(type="text", text=formatter(result))] + return [TextContent(type="text", text=text)] except ValueError as e: - # Validation errors - these are expected for invalid input elapsed = time.time() - start_time logger.warning("tool_validation_error", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e)) return [TextContent(type="text", text=f"Validation Error: {str(e)}")] @@ -825,13 +274,7 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: return [TextContent(type="text", text=f"Timeout Error: {str(e)}")] except Exception as e: elapsed = time.time() - start_time - logger.error( - "tool_unexpected_error", - tool=name, - elapsed_seconds=round(elapsed, 2), - error=str(e), - exc_info=True, - ) + logger.error("tool_unexpected_error", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e), exc_info=True) return [TextContent(type="text", text=f"Error: {str(e)}")] diff --git a/src/mcp_acp/settings.py b/src/mcp_acp/settings.py index 23810a9..8beb701 100644 --- a/src/mcp_acp/settings.py +++ b/src/mcp_acp/settings.py @@ -15,18 +15,23 @@ class ClusterConfig(BaseSettings): - """Configuration for a single OpenShift cluster. + """Configuration for a single Ambient Code Platform cluster. 
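+
+    The server URL points at the public-api gateway route rather than the
+    OpenShift API server. A minimal clusters.yaml entry built from these
+    fields might look like this (cluster name, URL, and project are
+    illustrative):
+
+        clusters:
+          prod:
+            server: https://public-api-ambient.apps.cluster.example.com
+            default_project: my-project
+            token: <bearer token; optional if supplied via environment>
+        default_cluster: prod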
Attributes: - server: OpenShift API server URL + server: Public API gateway URL (the frontend route that exposes the public-api service) default_project: Default project/namespace to use description: Optional human-readable description + token: Optional authentication token (can also be set via environment variable) """ server: str = Field( ..., - description="OpenShift API server URL", - json_schema_extra={"example": "https://api.cluster.example.com:6443"}, + description="Public API gateway URL", + json_schema_extra={"example": "https://public-api-ambient.apps.cluster.example.com"}, + ) + token: str | None = Field( + default=None, + description="Authentication token (Bearer token for API access)", ) default_project: str = Field( ..., @@ -45,7 +50,8 @@ def validate_server_url(cls, v: str) -> str: """Validate server URL format.""" if not v.startswith(("https://", "http://")): raise ValueError("Server URL must start with https:// or http://") - return v + # Strip trailing slash for consistency + return v.rstrip("/") @field_validator("default_project") @classmethod @@ -61,7 +67,7 @@ def validate_project_name(cls, v: str) -> str: class ClustersConfig(BaseSettings): - """Configuration for all OpenShift clusters. + """Configuration for all Ambient Code Platform clusters. Attributes: clusters: Dictionary of cluster configurations @@ -144,9 +150,6 @@ class Settings(BaseSettings): Attributes: config_path: Path to clusters.yaml configuration file log_level: Logging level - timeout_default: Default timeout for oc commands (seconds) - max_log_lines: Maximum log lines to retrieve - max_file_size: Maximum file size for exports (bytes) """ config_path: Path = Field( @@ -158,24 +161,6 @@ class Settings(BaseSettings): description="Logging level", json_schema_extra={"enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]}, ) - timeout_default: int = Field( - default=300, - ge=1, - le=3600, - description="Default timeout for oc commands (seconds)", - ) - max_log_lines: int = Field( - default=10000, - ge=1, - le=100000, - description="Maximum log lines to retrieve", - ) - max_file_size: int = Field( - default=10 * 1024 * 1024, # 10MB - ge=1024, - le=100 * 1024 * 1024, # 100MB - description="Maximum file size for exports (bytes)", - ) @field_validator("log_level") @classmethod