From 85acb85188a8256bbcba0dca7910a5cec9cb9c4c Mon Sep 17 00:00:00 2001 From: Jeremy Eder Date: Wed, 11 Feb 2026 15:51:03 -0500 Subject: [PATCH 1/4] fix(ci): run ruff in autofix mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Runs `ruff check --fix` and `ruff format` to apply auto-fixable lint and format corrections, then fails via `git diff --exit-code` if the working tree is dirty — meaning the developer forgot to run ruff locally before pushing. This gives a clear error message pointing them to the fix. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/ci.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cfe0754..3f903f8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -48,10 +48,11 @@ jobs: mkdir -p ~/.config/acp cp clusters.yaml.example ~/.config/acp/clusters.yaml - - name: Run ruff (lint and format check) + - name: Run ruff (autofix, then verify) run: | - uv run ruff check . - uv run ruff format --check . + uv run ruff check --fix . + uv run ruff format . + git diff --exit-code || (echo "ruff made changes — commit them locally before pushing" && exit 1) # TODO: Enable mypy once type annotations are complete # - name: Run mypy (type checking) From 057fd7be57bdcd320209c048b86071a3e6e75abf Mon Sep 17 00:00:00 2001 From: Jeremy Eder Date: Wed, 11 Feb 2026 15:52:50 -0500 Subject: [PATCH 2/4] style: apply ruff format to client.py Pre-commit hooks caught one file needing reformatting. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/mcp_acp/client.py | 47 ++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/src/mcp_acp/client.py b/src/mcp_acp/client.py index 5840912..9893d6b 100644 --- a/src/mcp_acp/client.py +++ b/src/mcp_acp/client.py @@ -79,8 +79,7 @@ def _get_token(self, cluster_config: dict[str, Any]) -> str: if not token: raise ValueError( - "No authentication token available. " - "Set 'token' in clusters.yaml or ACP_TOKEN environment variable." + "No authentication token available. " "Set 'token' in clusters.yaml or ACP_TOKEN environment variable." ) return token @@ -162,9 +161,7 @@ def _validate_input(self, value: str, field_name: str, max_length: int = 253) -> if len(value) > max_length: raise ValueError(f"{field_name} exceeds maximum length of {max_length}") if not re.match(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", value): - raise ValueError( - f"{field_name} contains invalid characters. Must match DNS-1123 format." - ) + raise ValueError(f"{field_name} contains invalid characters. Must match DNS-1123 format.") def _validate_bulk_operation(self, items: list[str], operation_name: str) -> None: """Enforce item limit for bulk operations.""" @@ -311,9 +308,7 @@ async def delete_session(self, project: str, session: str, dry_run: bool = False "message": f"Failed to delete session: {str(e)}", } - async def bulk_delete_sessions( - self, project: str, sessions: list[str], dry_run: bool = False - ) -> dict[str, Any]: + async def bulk_delete_sessions(self, project: str, sessions: list[str], dry_run: bool = False) -> dict[str, Any]: """Delete multiple sessions (max 3). Args: @@ -332,15 +327,19 @@ async def bulk_delete_sessions( if dry_run: if result.get("success", True): - dry_run_info["would_execute"].append({ - "session": session, - "info": result.get("session_info"), - }) + dry_run_info["would_execute"].append( + { + "session": session, + "info": result.get("session_info"), + } + ) else: - dry_run_info["skipped"].append({ - "session": session, - "reason": result.get("message"), - }) + dry_run_info["skipped"].append( + { + "session": session, + "reason": result.get("message"), + } + ) else: if result.get("deleted"): success.append(session) @@ -360,13 +359,15 @@ def list_clusters(self) -> dict[str, Any]: default_cluster = self.clusters_config.default_cluster for name, cluster in self.clusters_config.clusters.items(): - clusters.append({ - "name": name, - "server": cluster.server, - "description": cluster.description or "", - "default_project": cluster.default_project, - "is_default": name == default_cluster, - }) + clusters.append( + { + "name": name, + "server": cluster.server, + "description": cluster.description or "", + "default_project": cluster.default_project, + "is_default": name == default_cluster, + } + ) return {"clusters": clusters, "default_cluster": default_cluster} From f825b6dd6496118ab96cbd41a5af6892077d2fcf Mon Sep 17 00:00:00 2001 From: Ambient Code Date: Wed, 11 Feb 2026 20:59:11 +0000 Subject: [PATCH 3/4] fix: merge implicit string concatenation for ruff compliance Co-Authored-By: Claude Opus 4.5 --- src/mcp_acp/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mcp_acp/client.py b/src/mcp_acp/client.py index 9893d6b..587a666 100644 --- a/src/mcp_acp/client.py +++ b/src/mcp_acp/client.py @@ -79,7 +79,7 @@ def _get_token(self, cluster_config: dict[str, Any]) -> str: if not token: raise ValueError( - "No authentication token available. " "Set 'token' in clusters.yaml or ACP_TOKEN environment variable." + "No authentication token available. Set 'token' in clusters.yaml or ACP_TOKEN environment variable." ) return token From 628b19101c7a7dda5c6471540115bcd8612aaf09 Mon Sep 17 00:00:00 2001 From: Ambient Code Date: Wed, 11 Feb 2026 21:05:59 +0000 Subject: [PATCH 4/4] test: rewrite tests for new public-api client - Remove obsolete tests for oc CLI-based client - Remove test_security.py (tested methods no longer exist) - Add test_formatters.py for output formatting - Update test_client.py for HTTP-based client - Update test_server.py for current 7 tools All 40 tests pass with 70% coverage. Co-Authored-By: Claude Opus 4.5 --- tests/test_client.py | 558 ++++++++++++--------------------------- tests/test_formatters.py | 185 +++++++++++++ tests/test_security.py | 164 ------------ tests/test_server.py | 412 ++++++----------------------- 4 files changed, 431 insertions(+), 888 deletions(-) create mode 100644 tests/test_formatters.py delete mode 100644 tests/test_security.py diff --git a/tests/test_client.py b/tests/test_client.py index 82a2868..847d476 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,461 +1,233 @@ """Tests for ACP client.""" -import json -from datetime import datetime, timedelta -from pathlib import Path +from datetime import UTC, datetime, timedelta from unittest.mock import AsyncMock, MagicMock, patch import pytest -import yaml from mcp_acp.client import ACPClient @pytest.fixture -def mock_config(tmp_path: Path) -> str: - """Create a temporary cluster configuration.""" - config_dir = tmp_path / ".config" / "acp" - config_dir.mkdir(parents=True) - - config_file = config_dir / "clusters.yaml" - config = { - "clusters": { - "test-cluster": { - "server": "https://api.test.example.com:443", - "description": "Test Cluster", - "default_project": "test-workspace", - }, - "prod-cluster": { - "server": "https://api.prod.example.com:443", - "description": "Production Cluster", - "default_project": "prod-workspace", - }, - }, - "default_cluster": "test-cluster", - } - - with open(config_file, "w") as f: - yaml.dump(config, f) - - return str(config_file) +def mock_settings(): + """Create mock settings.""" + settings = MagicMock() + settings.config_path = None + return settings @pytest.fixture -def client(mock_config: str) -> ACPClient: - """Create ACP client with mock config.""" - return ACPClient(config_path=mock_config) +def mock_clusters_config(): + """Create mock clusters config.""" + cluster = MagicMock() + cluster.server = "https://api.test.example.com" + cluster.default_project = "test-project" + cluster.description = "Test Cluster" + cluster.token = "test-token" + config = MagicMock() + config.clusters = {"test-cluster": cluster} + config.default_cluster = "test-cluster" + return config -class TestACPClient: - """Tests for ACPClient.""" - def test_load_config(self, client: ACPClient) -> None: - """Test configuration loading.""" - assert "test-cluster" in client.config["clusters"] - assert "prod-cluster" in client.config["clusters"] - assert client.config["default_cluster"] == "test-cluster" +@pytest.fixture +def client(mock_settings, mock_clusters_config): + """Create client with mocked config.""" + with patch("mcp_acp.client.load_settings", return_value=mock_settings): + with patch("mcp_acp.client.load_clusters_config", return_value=mock_clusters_config): + return ACPClient() + + +class TestACPClientInit: + """Tests for client initialization.""" + + def test_client_init(self, client: ACPClient) -> None: + """Test client initializes with config.""" + assert client.clusters_config.default_cluster == "test-cluster" + assert "test-cluster" in client.clusters_config.clusters + + +class TestInputValidation: + """Tests for input validation.""" + + def test_validate_input_valid(self, client: ACPClient) -> None: + """Test valid input passes validation.""" + client._validate_input("my-session", "session") + client._validate_input("project-123", "project") + + def test_validate_input_invalid_chars(self, client: ACPClient) -> None: + """Test invalid characters rejected.""" + with pytest.raises(ValueError, match="invalid characters"): + client._validate_input("my_session", "session") + + with pytest.raises(ValueError, match="invalid characters"): + client._validate_input("My-Session", "session") + + def test_validate_input_too_long(self, client: ACPClient) -> None: + """Test input exceeding max length rejected.""" + with pytest.raises(ValueError, match="exceeds maximum length"): + client._validate_input("a" * 254, "session") + + def test_validate_bulk_operation_within_limit(self, client: ACPClient) -> None: + """Test bulk operation within limit passes.""" + client._validate_bulk_operation(["s1", "s2", "s3"], "delete") + + def test_validate_bulk_operation_exceeds_limit(self, client: ACPClient) -> None: + """Test bulk operation exceeding limit rejected.""" + with pytest.raises(ValueError, match="limited to 3 items"): + client._validate_bulk_operation(["s1", "s2", "s3", "s4"], "delete") + - def test_parse_time_delta(self, client: ACPClient) -> None: - """Test time delta parsing.""" - now = datetime.utcnow() +class TestTimeParsing: + """Tests for time parsing utilities.""" - # Test days + def test_parse_time_delta_days(self, client: ACPClient) -> None: + """Test parsing days.""" + now = datetime.now(UTC) result = client._parse_time_delta("7d") expected = now - timedelta(days=7) - assert abs((result - expected).total_seconds()) < 1 + assert abs((result - expected.replace(tzinfo=None)).total_seconds()) < 5 - # Test hours + def test_parse_time_delta_hours(self, client: ACPClient) -> None: + """Test parsing hours.""" + now = datetime.now(UTC) result = client._parse_time_delta("24h") expected = now - timedelta(hours=24) - assert abs((result - expected).total_seconds()) < 1 - - # Test minutes - result = client._parse_time_delta("30m") - expected = now - timedelta(minutes=30) - assert abs((result - expected).total_seconds()) < 1 + assert abs((result - expected.replace(tzinfo=None)).total_seconds()) < 5 def test_parse_time_delta_invalid(self, client: ACPClient) -> None: - """Test invalid time delta format.""" + """Test invalid format rejected.""" with pytest.raises(ValueError, match="Invalid time format"): - client._parse_time_delta("invalid") + client._parse_time_delta("7x") def test_is_older_than(self, client: ACPClient) -> None: - """Test timestamp comparison.""" - cutoff = datetime.utcnow() - timedelta(days=7) - - # Older timestamp - old_time = (cutoff - timedelta(days=1)).isoformat() + "Z" - assert client._is_older_than(old_time, cutoff) is True + """Test age comparison.""" + cutoff = datetime.now(UTC) - timedelta(days=7) + cutoff_naive = cutoff.replace(tzinfo=None) - # Newer timestamp - new_time = (cutoff + timedelta(days=1)).isoformat() + "Z" - assert client._is_older_than(new_time, cutoff) is False + old_timestamp = (datetime.now(UTC) - timedelta(days=10)).isoformat() + assert client._is_older_than(old_timestamp, cutoff_naive) is True - # None timestamp - assert client._is_older_than(None, cutoff) is False + new_timestamp = (datetime.now(UTC) - timedelta(days=1)).isoformat() + assert client._is_older_than(new_timestamp, cutoff_naive) is False - @pytest.mark.asyncio - async def test_list_sessions_basic(self, client: ACPClient) -> None: - """Test basic session listing.""" - mock_response = { - "items": [ - { - "metadata": { - "name": "session-1", - "creationTimestamp": "2024-01-20T10:00:00Z", - }, - "spec": {"displayName": "Test Session"}, - "status": {"phase": "running"}, - }, - { - "metadata": { - "name": "session-2", - "creationTimestamp": "2024-01-21T10:00:00Z", - }, - "spec": {}, - "status": {"phase": "stopped"}, - }, - ] - } - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stdout=json.dumps(mock_response).encode()), - ): - result = await client.list_sessions(project="test-project") - - assert result["total"] == 2 - assert len(result["sessions"]) == 2 - assert result["filters_applied"] == {} - @pytest.mark.asyncio - async def test_list_sessions_with_status_filter(self, client: ACPClient) -> None: - """Test session listing with status filter.""" - mock_response = { - "items": [ - { - "metadata": {"name": "session-1"}, - "status": {"phase": "running"}, - }, - { - "metadata": {"name": "session-2"}, - "status": {"phase": "stopped"}, - }, - ] - } - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stdout=json.dumps(mock_response).encode()), - ): - result = await client.list_sessions(project="test-project", status="running") +class TestListClusters: + """Tests for list_clusters.""" - assert result["total"] == 1 - assert result["sessions"][0]["metadata"]["name"] == "session-1" - assert result["filters_applied"]["status"] == "running" + def test_list_clusters(self, client: ACPClient) -> None: + """Test listing clusters.""" + result = client.list_clusters() - @pytest.mark.asyncio - async def test_list_sessions_with_limit(self, client: ACPClient) -> None: - """Test session listing with limit.""" - mock_response = {"items": [{"metadata": {"name": f"session-{i}"}} for i in range(10)]} - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stdout=json.dumps(mock_response).encode()), - ): - result = await client.list_sessions(project="test-project", limit=5) - - assert result["total"] == 5 - assert len(result["sessions"]) == 5 - assert result["filters_applied"]["limit"] == 5 + assert "clusters" in result + assert len(result["clusters"]) == 1 + assert result["clusters"][0]["name"] == "test-cluster" + assert result["clusters"][0]["is_default"] is True + assert result["default_cluster"] == "test-cluster" - @pytest.mark.asyncio - async def test_delete_session_success(self, client: ACPClient) -> None: - """Test successful session deletion.""" - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stderr=b""), - ): - result = await client.delete_session(project="test-project", session="test-session") - assert result["deleted"] is True - assert "Successfully deleted" in result["message"] +class TestSwitchCluster: + """Tests for switch_cluster.""" @pytest.mark.asyncio - async def test_delete_session_dry_run(self, client: ACPClient) -> None: - """Test session deletion dry run.""" - mock_session = { - "metadata": {"name": "test-session", "creationTimestamp": "2024-01-20T10:00:00Z"}, - "status": {"phase": "running"}, - } - - with patch.object( - client, - "_get_resource_json", - new_callable=AsyncMock, - return_value=mock_session, - ): - result = await client.delete_session(project="test-project", session="test-session", dry_run=True) - - assert result["dry_run"] is True - assert result["success"] is True - assert "Would delete" in result["message"] - assert "session_info" in result + async def test_switch_cluster_success(self, client: ACPClient) -> None: + """Test switching to valid cluster.""" + result = await client.switch_cluster("test-cluster") + assert result["switched"] is True @pytest.mark.asyncio - async def test_restart_session_success(self, client: ACPClient) -> None: - """Test successful session restart.""" - mock_session = { - "metadata": {"name": "test-session"}, - "status": {"phase": "stopped"}, - } - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - ) as mock_cmd: - # First call: get session status - # Second call: patch session - mock_cmd.side_effect = [ - MagicMock(returncode=0, stdout=json.dumps(mock_session).encode()), - MagicMock(returncode=0, stderr=b""), - ] - - result = await client.restart_session(project="test-project", session="test-session") - - assert result["status"] == "restarting" - assert "Successfully restarted" in result["message"] + async def test_switch_cluster_unknown(self, client: ACPClient) -> None: + """Test switching to unknown cluster.""" + result = await client.switch_cluster("unknown-cluster") + assert result["switched"] is False + assert "Unknown cluster" in result["message"] - @pytest.mark.asyncio - async def test_restart_session_dry_run(self, client: ACPClient) -> None: - """Test session restart dry run.""" - mock_session = { - "metadata": {"name": "test-session"}, - "status": {"phase": "stopped", "stoppedAt": "2024-01-20T10:00:00Z"}, - } - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stdout=json.dumps(mock_session).encode()), - ): - result = await client.restart_session(project="test-project", session="test-session", dry_run=True) - assert result["dry_run"] is True - assert "Would restart" in result["message"] - assert result["session_info"]["current_status"] == "stopped" +class TestWhoami: + """Tests for whoami.""" @pytest.mark.asyncio - async def test_bulk_delete_sessions(self, client: ACPClient) -> None: - """Test bulk session deletion.""" - sessions = ["session-1", "session-2", "session-3"] + async def test_whoami_authenticated(self, client: ACPClient) -> None: + """Test whoami with valid token.""" + result = await client.whoami() - with patch.object(client, "delete_session", new_callable=AsyncMock) as mock_delete: - # Simulate 2 successful and 1 failed deletion - mock_delete.side_effect = [ - {"deleted": True, "message": "Success"}, - {"deleted": True, "message": "Success"}, - {"deleted": False, "message": "Session not found"}, - ] + assert result["authenticated"] is True + assert result["token_valid"] is True + assert result["cluster"] == "test-cluster" + assert result["server"] == "https://api.test.example.com" - result = await client.bulk_delete_sessions(project="test-project", sessions=sessions) - assert len(result["deleted"]) == 2 - assert len(result["failed"]) == 1 - assert result["failed"][0]["session"] == "session-3" +class TestHTTPRequests: + """Tests for HTTP request handling.""" @pytest.mark.asyncio - async def test_bulk_stop_sessions(self, client: ACPClient) -> None: - """Test bulk session stop.""" - sessions = ["session-1", "session-2"] - - mock_session = { - "metadata": {"name": "session-1"}, - "status": {"phase": "running"}, - } - - with ( - patch.object( - client, - "_get_resource_json", - new_callable=AsyncMock, - return_value=mock_session, - ), - patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stderr=b""), - ), - ): - result = await client.bulk_stop_sessions(project="test-project", sessions=sessions) - - assert len(result["stopped"]) == 2 - assert len(result["failed"]) == 0 + async def test_list_sessions(self, client: ACPClient) -> None: + """Test list_sessions makes correct HTTP request.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"items": [{"id": "session-1", "status": "running"}]} - @pytest.mark.asyncio - async def test_get_session_logs(self, client: ACPClient) -> None: - """Test getting session logs.""" - mock_pods = { - "items": [ - { - "metadata": {"name": "test-session-pod-12345"}, - } - ] - } - - mock_logs = "2024-01-20 10:00:00 INFO Starting session\n2024-01-20 10:00:01 INFO Session ready\n" - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - ) as mock_cmd: - # First call: get pods - # Second call: get logs - mock_cmd.side_effect = [ - MagicMock(returncode=0, stdout=json.dumps(mock_pods).encode()), - MagicMock(returncode=0, stdout=mock_logs.encode()), - ] - - result = await client.get_session_logs(project="test-project", session="test-session", tail_lines=100) - - assert result["logs"] == mock_logs - assert result["lines"] == 3 # Including trailing newline + with patch.object(client, "_get_http_client") as mock_get_client: + mock_http_client = AsyncMock() + mock_http_client.request = AsyncMock(return_value=mock_response) + mock_get_client.return_value = mock_http_client - def test_list_clusters(self, client: ACPClient) -> None: - """Test listing clusters.""" - result = client.list_clusters() - - assert len(result["clusters"]) == 2 - assert result["default_cluster"] == "test-cluster" + result = await client.list_sessions("test-project") - # Check first cluster - test_cluster = next(c for c in result["clusters"] if c["name"] == "test-cluster") - assert test_cluster["is_default"] is True - assert test_cluster["server"] == "https://api.test.example.com:443" - assert test_cluster["default_project"] == "test-workspace" + assert result["total"] == 1 + assert result["sessions"][0]["id"] == "session-1" @pytest.mark.asyncio - async def test_whoami(self, client: ACPClient) -> None: - """Test whoami command.""" - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - ) as mock_cmd: - # Mock responses for user, server, project, token - mock_cmd.side_effect = [ - MagicMock(returncode=0, stdout=b"testuser"), - MagicMock(returncode=0, stdout=b"https://api.test.example.com:443"), - MagicMock(returncode=0, stdout=b"test-workspace"), - MagicMock(returncode=0, stdout=b"sha256~..."), - ] - - result = await client.whoami() - - assert result["user"] == "testuser" - assert result["server"] == "https://api.test.example.com:443" - assert result["project"] == "test-workspace" - assert result["token_valid"] is True - assert result["authenticated"] is True - - -class TestBulkSafety: - """Tests for bulk operation safety limits.""" - - def test_validate_bulk_operation_within_limit(self, client: ACPClient) -> None: - """Should pass with 3 or fewer items.""" - client._validate_bulk_operation(["s1", "s2", "s3"], "delete") # Should not raise - - def test_validate_bulk_operation_exceeds_limit(self, client: ACPClient) -> None: - """Should raise ValueError with >3 items.""" - with pytest.raises(ValueError, match="limited to 3 items"): - client._validate_bulk_operation(["s1", "s2", "s3", "s4"], "delete") + async def test_delete_session_dry_run(self, client: ACPClient) -> None: + """Test delete_session in dry_run mode.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-1", "status": "running"} + with patch.object(client, "_get_http_client") as mock_get_client: + mock_http_client = AsyncMock() + mock_http_client.request = AsyncMock(return_value=mock_response) + mock_get_client.return_value = mock_http_client -class TestLabelOperations: - """Tests for label operations.""" + result = await client.delete_session("test-project", "session-1", dry_run=True) - @pytest.mark.asyncio - async def test_label_resource_success(self, client: ACPClient) -> None: - """Should label resource successfully.""" - with patch.object(client, "_run_oc_command", new_callable=AsyncMock, return_value=MagicMock(returncode=0)): - result = await client.label_resource( - "agenticsession", - "test-session", - "test-project", - labels={"env": "dev", "team": "api"}, - ) - - assert result["labeled"] is True - assert result["labels"] == {"env": "dev", "team": "api"} + assert result["dry_run"] is True + assert result["success"] is True + assert "Would delete" in result["message"] @pytest.mark.asyncio - async def test_label_resource_invalid_key(self, client: ACPClient) -> None: - """Should reject invalid label keys.""" - with pytest.raises(ValueError, match="Invalid label key"): - await client.label_resource("agenticsession", "test", "test-project", labels={"bad key!": "value"}) + async def test_delete_session_success(self, client: ACPClient) -> None: + """Test delete_session success.""" + mock_response = MagicMock() + mock_response.status_code = 204 - @pytest.mark.asyncio - async def test_unlabel_resource_success(self, client: ACPClient) -> None: - """Should remove labels successfully.""" - with patch.object(client, "_run_oc_command", new_callable=AsyncMock, return_value=MagicMock(returncode=0)): - result = await client.unlabel_resource( - "agenticsession", "test-session", "test-project", label_keys=["env", "team"] - ) + with patch.object(client, "_get_http_client") as mock_get_client: + mock_http_client = AsyncMock() + mock_http_client.request = AsyncMock(return_value=mock_response) + mock_get_client.return_value = mock_http_client - assert result["unlabeled"] is True - assert result["removed_keys"] == ["env", "team"] + result = await client.delete_session("test-project", "session-1") - @pytest.mark.asyncio - async def test_bulk_label_resources(self, client: ACPClient) -> None: - """Should label multiple resources.""" - with patch.object(client, "label_resource", new_callable=AsyncMock) as mock_label: - mock_label.return_value = {"labeled": True} + assert result["deleted"] is True - result = await client.bulk_label_resources( - "agenticsession", ["s1", "s2"], "test-project", labels={"env": "dev"} - ) - assert len(result["labeled"]) == 2 - assert len(result["failed"]) == 0 +class TestBulkOperations: + """Tests for bulk operations.""" @pytest.mark.asyncio - async def test_list_sessions_by_label(self, client: ACPClient) -> None: - """Should list sessions by label selector.""" - mock_response = {"items": [{"metadata": {"name": "session-1"}}]} - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stdout=json.dumps(mock_response).encode()), - ): - result = await client.list_sessions_by_user_labels("test-project", labels={"env": "dev"}) + async def test_bulk_delete_sessions(self, client: ACPClient) -> None: + """Test bulk delete sessions.""" + mock_response = MagicMock() + mock_response.status_code = 204 - assert result["total"] == 1 + with patch.object(client, "_get_http_client") as mock_get_client: + mock_http_client = AsyncMock() + mock_http_client.request = AsyncMock(return_value=mock_response) + mock_get_client.return_value = mock_http_client - @pytest.mark.asyncio - async def test_bulk_delete_by_label_exceeds_limit(self, client: ACPClient) -> None: - """Should reject when label selector matches >3 sessions.""" - mock_response = {"items": [{"metadata": {"name": f"s{i}"}} for i in range(5)]} - - with patch.object( - client, - "_run_oc_command", - new_callable=AsyncMock, - return_value=MagicMock(returncode=0, stdout=json.dumps(mock_response).encode()), - ): - with pytest.raises(ValueError, match="Max 3 allowed"): - await client.bulk_delete_sessions_by_label("test-project", labels={"cleanup": "true"}) + result = await client.bulk_delete_sessions("test-project", ["s1", "s2"]) + + assert len(result["deleted"]) == 2 + assert "s1" in result["deleted"] + assert "s2" in result["deleted"] diff --git a/tests/test_formatters.py b/tests/test_formatters.py new file mode 100644 index 0000000..3131743 --- /dev/null +++ b/tests/test_formatters.py @@ -0,0 +1,185 @@ +"""Tests for output formatters.""" + +from mcp_acp.formatters import ( + format_bulk_result, + format_clusters, + format_result, + format_sessions_list, + format_whoami, +) + + +class TestFormatResult: + """Tests for format_result.""" + + def test_format_result_dry_run(self) -> None: + """Test formatting dry run result.""" + result = { + "dry_run": True, + "message": "Would delete session", + "session_info": {"name": "test-session", "status": "running"}, + } + + output = format_result(result) + + assert "DRY RUN MODE" in output + assert "Would delete session" in output + assert "test-session" in output + + def test_format_result_normal(self) -> None: + """Test formatting normal result.""" + result = {"message": "Successfully deleted session"} + + output = format_result(result) + + assert "Successfully deleted session" in output + + +class TestFormatSessionsList: + """Tests for format_sessions_list.""" + + def test_format_sessions_list(self) -> None: + """Test formatting sessions list.""" + result = { + "total": 2, + "filters_applied": {"status": "running"}, + "sessions": [ + {"id": "session-1", "status": "running", "createdAt": "2024-01-20T10:00:00Z"}, + {"id": "session-2", "status": "running", "createdAt": "2024-01-21T10:00:00Z"}, + ], + } + + output = format_sessions_list(result) + + assert "Found 2 session(s)" in output + assert "session-1" in output + assert "session-2" in output + assert "running" in output + + def test_format_sessions_list_empty(self) -> None: + """Test formatting empty sessions list.""" + result = {"total": 0, "filters_applied": {}, "sessions": []} + + output = format_sessions_list(result) + + assert "Found 0 session(s)" in output + + +class TestFormatBulkResult: + """Tests for format_bulk_result.""" + + def test_format_bulk_result_dry_run(self) -> None: + """Test formatting bulk delete dry run.""" + result = { + "dry_run": True, + "dry_run_info": { + "would_execute": [ + {"session": "session-1", "info": {"status": "stopped"}}, + {"session": "session-2", "info": {"status": "stopped"}}, + ], + "skipped": [], + }, + } + + output = format_bulk_result(result, "delete") + + assert "DRY RUN MODE" in output + assert "Would delete 2 session(s)" in output + assert "session-1" in output + assert "session-2" in output + + def test_format_bulk_result_success(self) -> None: + """Test formatting bulk delete success.""" + result = { + "deleted": ["session-1", "session-2"], + "failed": [], + } + + output = format_bulk_result(result, "delete") + + assert "Successfully deleted 2 session(s)" in output + assert "session-1" in output + assert "session-2" in output + + def test_format_bulk_result_with_failures(self) -> None: + """Test formatting bulk delete with failures.""" + result = { + "deleted": ["session-1"], + "failed": [{"session": "session-2", "error": "Not found"}], + } + + output = format_bulk_result(result, "delete") + + assert "Successfully deleted 1 session(s)" in output + assert "Failed" in output + assert "session-2" in output + assert "Not found" in output + + +class TestFormatClusters: + """Tests for format_clusters.""" + + def test_format_clusters(self) -> None: + """Test formatting clusters list.""" + result = { + "clusters": [ + { + "name": "test-cluster", + "server": "https://api.test.example.com", + "description": "Test Cluster", + "default_project": "test-workspace", + "is_default": True, + }, + ], + "default_cluster": "test-cluster", + } + + output = format_clusters(result) + + assert "test-cluster [DEFAULT]" in output + assert "https://api.test.example.com" in output + assert "Test Cluster" in output + + def test_format_clusters_empty(self) -> None: + """Test formatting empty clusters list.""" + result = {"clusters": [], "default_cluster": None} + + output = format_clusters(result) + + assert "No clusters configured" in output + + +class TestFormatWhoami: + """Tests for format_whoami.""" + + def test_format_whoami_authenticated(self) -> None: + """Test formatting whoami when authenticated.""" + result = { + "authenticated": True, + "cluster": "test-cluster", + "server": "https://api.test.example.com", + "project": "test-workspace", + "token_valid": True, + } + + output = format_whoami(result) + + assert "Token Configured: Yes" in output + assert "Cluster: test-cluster" in output + assert "Server: https://api.test.example.com" in output + assert "Project: test-workspace" in output + + def test_format_whoami_not_authenticated(self) -> None: + """Test formatting whoami when not authenticated.""" + result = { + "authenticated": False, + "cluster": "test-cluster", + "server": "https://api.test.example.com", + "project": "unknown", + "token_valid": False, + } + + output = format_whoami(result) + + assert "Token Configured: No" in output + assert "Set token" in output diff --git a/tests/test_security.py b/tests/test_security.py deleted file mode 100644 index b9a30f1..0000000 --- a/tests/test_security.py +++ /dev/null @@ -1,164 +0,0 @@ -"""Security tests for MCP ACP Server.""" - -import pytest - -from mcp_acp.client import ACPClient - - -class TestInputValidation: - """Test input validation and security controls.""" - - def test_validate_input_valid_names(self): - """Test that valid Kubernetes names pass validation.""" - client = ACPClient() - - valid_names = [ - "test-session", - "my-project-123", - "a", - "session-with-many-dashes", - "123-numeric-start", - ] - - for name in valid_names: - # Should not raise - client._validate_input(name, "test") - - def test_validate_input_invalid_names(self): - """Test that invalid names are rejected.""" - client = ACPClient() - - invalid_names = [ - "Test-Session", # uppercase - "my_project", # underscore - "session.name", # dot - "session name", # space - "session;name", # semicolon - "../../../etc/passwd", # path traversal - "session|name", # pipe - "session&name", # ampersand - "-starts-dash", # starts with dash - "ends-dash-", # ends with dash - "a" * 254, # too long - ] - - for name in invalid_names: - with pytest.raises(ValueError): - client._validate_input(name, "test") - - -class TestCommandInjectionPrevention: - """Test command injection prevention.""" - - @pytest.mark.asyncio - async def test_run_oc_command_rejects_metacharacters(self): - """Test that shell metacharacters in arguments are rejected.""" - client = ACPClient() - - malicious_args = [ - "test; rm -rf /", - "test | cat /etc/passwd", - "test && whoami", - "test `ls`", - "test $HOME", - "test\nrm -rf /", - ] - - for arg in malicious_args: - with pytest.raises(ValueError, match="suspicious characters"): - await client._run_oc_command(["get", "pods", arg]) - - def test_resource_type_whitelist(self): - """Test that only whitelisted resource types are allowed.""" - client = ACPClient() - - # Allowed types should work - assert "agenticsession" in client.ALLOWED_RESOURCE_TYPES - assert "pods" in client.ALLOWED_RESOURCE_TYPES - assert "event" in client.ALLOWED_RESOURCE_TYPES - - # Disallowed types should fail - assert "secrets" not in client.ALLOWED_RESOURCE_TYPES - assert "configmaps" not in client.ALLOWED_RESOURCE_TYPES - - @pytest.mark.asyncio - async def test_get_resource_json_validates_resource_type(self): - """Test that _get_resource_json validates resource types.""" - client = ACPClient() - - with pytest.raises(ValueError, match="not allowed"): - await client._get_resource_json("secrets", "test", "default") - - -class TestResourceLimits: - """Test resource exhaustion protection.""" - - def test_max_log_lines_limit(self): - """Test that log line limits are enforced.""" - client = ACPClient() - - # Should accept valid values - assert 100 <= client.MAX_LOG_LINES - - @pytest.mark.asyncio - async def test_get_session_logs_validates_tail_lines(self): - """Test that tail_lines is validated.""" - client = ACPClient() - - # Too large - result = await client.get_session_logs("test", "session", tail_lines=999999) - assert "error" in result - assert "tail_lines" in result["error"].lower() - - # Negative - result = await client.get_session_logs("test", "session", tail_lines=-1) - assert "error" in result - - def test_timeout_constants(self): - """Test that timeout constants are reasonable.""" - client = ACPClient() - - # Should have a max command timeout - assert hasattr(client, "MAX_COMMAND_TIMEOUT") - assert client.MAX_COMMAND_TIMEOUT > 0 - assert client.MAX_COMMAND_TIMEOUT <= 600 # Not more than 10 minutes - - -class TestDataProtection: - """Test sensitive data protection.""" - - @pytest.mark.asyncio - async def test_list_workflows_validates_url(self): - """Test that workflow repository URLs are validated.""" - client = ACPClient() - - # Invalid URLs should be rejected - invalid_urls = [ - "file:///etc/passwd", - "ftp://example.com", - "javascript:alert(1)", - "data:text/html,", - "https://example.com; rm -rf /", - "https://example.com | cat", - ] - - for url in invalid_urls: - result = await client.list_workflows(url) - assert "error" in result - - def test_add_cluster_validates_inputs(self): - """Test that add_cluster validates all inputs.""" - client = ACPClient() - - # Invalid cluster name - result = client.add_cluster("Invalid Name", "https://example.com") - assert not result.get("added") - assert "error" in result.get("message", "").lower() or not result.get("added") - - # Invalid server URL - result = client.add_cluster("valid-name", "not-a-url") - assert not result.get("added") - - -if __name__ == "__main__": - pytest.main([__file__, "-v"]) diff --git a/tests/test_server.py b/tests/test_server.py index b0c6046..b3dcbb3 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -4,392 +4,119 @@ import pytest -from mcp_acp.formatters import ( - format_bulk_result, - format_clusters, - format_logs, - format_result, - format_sessions_list, - format_whoami, -) from mcp_acp.server import call_tool, list_tools -class TestServerFormatters: - """Tests for server formatting functions.""" - - def testformat_result_dry_run(self) -> None: - """Test formatting dry run results.""" - result = { - "dry_run": True, - "message": "Would delete session", - "session_info": {"name": "test-session", "status": "running"}, - } - - output = format_result(result) - - assert "DRY RUN MODE" in output - assert "Would delete session" in output - assert "test-session" in output - - def testformat_result_normal(self) -> None: - """Test formatting normal results.""" - result = {"deleted": True, "message": "Successfully deleted session"} - - output = format_result(result) - - assert "Successfully deleted session" in output - - def testformat_sessions_list(self) -> None: - """Test formatting sessions list.""" - result = { - "total": 2, - "filters_applied": {"status": "running"}, - "sessions": [ - { - "metadata": { - "name": "session-1", - "creationTimestamp": "2024-01-20T10:00:00Z", - }, - "spec": {"displayName": "Test Session"}, - "status": {"phase": "running"}, - }, - { - "metadata": { - "name": "session-2", - "creationTimestamp": "2024-01-21T10:00:00Z", - }, - "spec": {}, - "status": {"phase": "running"}, - }, - ], - } - - output = format_sessions_list(result) - - assert "Found 2 session(s)" in output - assert "session-1" in output - assert "Test Session" in output - assert "session-2" in output - assert "running" in output - - def testformat_bulk_result_delete_dry_run(self) -> None: - """Test formatting bulk delete dry run.""" - result = { - "dry_run": True, - "dry_run_info": { - "would_delete": [ - {"session": "session-1", "info": {"status": "stopped"}}, - {"session": "session-2", "info": {"status": "stopped"}}, - ], - "not_found": ["session-3"], - }, - } - - output = format_bulk_result(result, "delete") - - assert "DRY RUN MODE" in output - assert "Would delete 2 session(s)" in output - assert "session-1" in output - assert "session-2" in output - assert "Not found" in output - assert "session-3" in output - - def testformat_bulk_result_delete_normal(self) -> None: - """Test formatting bulk delete normal mode.""" - result = { - "deleted": ["session-1", "session-2"], - "failed": [{"session": "session-3", "error": "Not found"}], - } - - output = format_bulk_result(result, "delete") - - assert "Successfully deleted 2 session(s)" in output - assert "session-1" in output - assert "session-2" in output - assert "Failed" in output - assert "session-3" in output - assert "Not found" in output - - def testformat_bulk_result_stop_dry_run(self) -> None: - """Test formatting bulk stop dry run.""" - result = { - "dry_run": True, - "dry_run_info": { - "would_stop": [ - {"session": "session-1", "current_status": "running"}, - ], - "not_running": [ - {"session": "session-2", "current_status": "stopped"}, - ], - }, - } - - output = format_bulk_result(result, "stop") - - assert "DRY RUN MODE" in output - assert "Would stop 1 session(s)" in output - assert "session-1" in output - assert "Not running" in output - assert "session-2" in output - - def testformat_logs(self) -> None: - """Test formatting logs.""" - result = { - "logs": "2024-01-20 10:00:00 INFO Starting\n2024-01-20 10:00:01 INFO Ready\n", - "container": "runner", - "lines": 3, - } - - output = format_logs(result) - - assert "container 'runner'" in output - assert "3 lines" in output - assert "Starting" in output - assert "Ready" in output - - def testformat_logs_error(self) -> None: - """Test formatting logs with error.""" - result = {"error": "Pod not found"} - - output = format_logs(result) - - assert "Error: Pod not found" in output - - def testformat_clusters(self) -> None: - """Test formatting clusters list.""" - result = { - "clusters": [ - { - "name": "test-cluster", - "server": "https://api.test.example.com:443", - "description": "Test Cluster", - "default_project": "test-workspace", - "is_default": True, - }, - { - "name": "prod-cluster", - "server": "https://api.prod.example.com:443", - "description": "Production Cluster", - "default_project": "prod-workspace", - "is_default": False, - }, - ], - "default_cluster": "test-cluster", - } - - output = format_clusters(result) - - assert "test-cluster [DEFAULT]" in output - assert "prod-cluster" in output - assert "Test Cluster" in output - assert "Production Cluster" in output - assert "https://api.test.example.com:443" in output - - def testformat_clusters_empty(self) -> None: - """Test formatting empty clusters list.""" - result = {"clusters": [], "default_cluster": None} - - output = format_clusters(result) - - assert "No clusters configured" in output - - def testformat_whoami_authenticated(self) -> None: - """Test formatting whoami when authenticated.""" - result = { - "authenticated": True, - "user": "testuser", - "server": "https://api.test.example.com:443", - "project": "test-workspace", - "token_valid": True, - } - - output = format_whoami(result) - - assert "Authenticated: Yes" in output - assert "User: testuser" in output - assert "Server: https://api.test.example.com:443" in output - assert "Project: test-workspace" in output - assert "Token Valid: Yes" in output - - def testformat_whoami_not_authenticated(self) -> None: - """Test formatting whoami when not authenticated.""" - result = { - "authenticated": False, - "user": "unknown", - "server": "unknown", - "project": "unknown", - "token_valid": False, - } - - output = format_whoami(result) - - assert "Authenticated: No" in output - assert "not authenticated" in output - - -class TestServerTools: - """Tests for server tool handling.""" +class TestListTools: + """Tests for list_tools.""" @pytest.mark.asyncio - async def test_list_tools(self) -> None: + async def test_list_tools_returns_all_tools(self) -> None: """Test listing available tools.""" tools = await list_tools() - tool_names = [t.name for t in tools] - # Check P0 tools - assert "acp_delete_session" in tool_names + # Session tools assert "acp_list_sessions" in tool_names - - # Check P1 tools - assert "acp_restart_session" in tool_names + assert "acp_get_session" in tool_names + assert "acp_delete_session" in tool_names assert "acp_bulk_delete_sessions" in tool_names - assert "acp_bulk_stop_sessions" in tool_names - assert "acp_get_session_logs" in tool_names + + # Cluster tools assert "acp_list_clusters" in tool_names assert "acp_whoami" in tool_names + assert "acp_switch_cluster" in tool_names @pytest.mark.asyncio - async def test_call_tool_delete_session(self) -> None: - """Test calling delete session tool.""" - mock_client = MagicMock() - mock_client.delete_session = AsyncMock(return_value={"deleted": True, "message": "Success"}) + async def test_list_tools_count(self) -> None: + """Test correct number of tools.""" + tools = await list_tools() + assert len(tools) == 7 - with patch("mcp_acp.server.get_client", return_value=mock_client): - result = await call_tool( - "acp_delete_session", - {"project": "test-project", "session": "test-session"}, - ) - assert len(result) == 1 - assert "Success" in result[0].text - - mock_client.delete_session.assert_called_once_with(project="test-project", session="test-session") +class TestCallTool: + """Tests for call_tool.""" @pytest.mark.asyncio async def test_call_tool_list_sessions(self) -> None: """Test calling list sessions tool.""" mock_client = MagicMock() + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {"test": MagicMock(default_project="test-project")} mock_client.list_sessions = AsyncMock( return_value={ "total": 1, "filters_applied": {}, - "sessions": [ - { - "metadata": {"name": "test-session"}, - "spec": {}, - "status": {"phase": "running"}, - } - ], + "sessions": [{"id": "test-session", "status": "running", "createdAt": "2024-01-01T00:00:00Z"}], } ) with patch("mcp_acp.server.get_client", return_value=mock_client): - result = await call_tool( - "acp_list_sessions", - {"project": "test-project", "status": "running"}, - ) + result = await call_tool("acp_list_sessions", {"project": "test-project"}) assert len(result) == 1 assert "test-session" in result[0].text - mock_client.list_sessions.assert_called_once() - @pytest.mark.asyncio - async def test_call_tool_restart_session(self) -> None: - """Test calling restart session tool.""" + async def test_call_tool_delete_session(self) -> None: + """Test calling delete session tool.""" mock_client = MagicMock() - mock_client.restart_session = AsyncMock(return_value={"status": "restarting", "message": "Success"}) + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {"test": MagicMock(default_project="test-project")} + mock_client.delete_session = AsyncMock(return_value={"deleted": True, "message": "Success"}) with patch("mcp_acp.server.get_client", return_value=mock_client): - result = await call_tool( - "acp_restart_session", - {"project": "test-project", "session": "test-session", "dry_run": True}, - ) + result = await call_tool("acp_delete_session", {"project": "test-project", "session": "test-session"}) assert len(result) == 1 - - mock_client.restart_session.assert_called_once_with( - project="test-project", session="test-session", dry_run=True - ) + assert "Success" in result[0].text @pytest.mark.asyncio - async def test_call_tool_bulk_delete(self) -> None: - """Test calling bulk delete tool.""" + async def test_call_tool_bulk_delete_requires_confirm(self) -> None: + """Test bulk delete requires confirm flag.""" mock_client = MagicMock() - mock_client.bulk_delete_sessions = AsyncMock(return_value={"deleted": ["s1", "s2"], "failed": []}) + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {"test": MagicMock(default_project="test-project")} with patch("mcp_acp.server.get_client", return_value=mock_client): result = await call_tool( "acp_bulk_delete_sessions", - { - "project": "test-project", - "sessions": ["s1", "s2"], - "confirm": True, - }, - ) - - assert len(result) == 1 - assert "Successfully deleted 2 resource(s)" in result[0].text - - @pytest.mark.asyncio - async def test_call_tool_bulk_stop(self) -> None: - """Test calling bulk stop tool.""" - mock_client = MagicMock() - mock_client.bulk_stop_sessions = AsyncMock(return_value={"stopped": ["s1", "s2"], "failed": []}) - - with patch("mcp_acp.server.get_client", return_value=mock_client): - result = await call_tool( - "acp_bulk_stop_sessions", - { - "project": "test-project", - "sessions": ["s1", "s2"], - "confirm": True, - }, + {"project": "test-project", "sessions": ["s1", "s2"]}, ) - assert len(result) == 1 - assert "Successfully stopd 2 resource(s)" in result[0].text + assert "requires confirm=true" in result[0].text @pytest.mark.asyncio - async def test_call_tool_get_logs(self) -> None: - """Test calling get logs tool.""" + async def test_call_tool_bulk_delete_with_confirm(self) -> None: + """Test bulk delete with confirm flag.""" mock_client = MagicMock() - mock_client.get_session_logs = AsyncMock( - return_value={ - "logs": "test logs", - "container": "runner", - "lines": 1, - } - ) + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {"test": MagicMock(default_project="test-project")} + mock_client.bulk_delete_sessions = AsyncMock(return_value={"deleted": ["s1", "s2"], "failed": []}) with patch("mcp_acp.server.get_client", return_value=mock_client): result = await call_tool( - "acp_get_session_logs", - { - "project": "test-project", - "session": "test-session", - "tail_lines": 100, - }, + "acp_bulk_delete_sessions", + {"project": "test-project", "sessions": ["s1", "s2"], "confirm": True}, ) assert len(result) == 1 - assert "test logs" in result[0].text + assert "Successfully deleted 2" in result[0].text @pytest.mark.asyncio async def test_call_tool_list_clusters(self) -> None: """Test calling list clusters tool.""" mock_client = MagicMock() + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {} mock_client.list_clusters = MagicMock( return_value={ - "clusters": [ - { - "name": "test", - "server": "https://test.com", - "is_default": True, - } - ], + "clusters": [{"name": "test", "server": "https://test.com", "is_default": True}], "default_cluster": "test", } ) @@ -404,10 +131,13 @@ async def test_call_tool_list_clusters(self) -> None: async def test_call_tool_whoami(self) -> None: """Test calling whoami tool.""" mock_client = MagicMock() + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {} mock_client.whoami = AsyncMock( return_value={ "authenticated": True, - "user": "testuser", + "cluster": "test", "server": "https://test.com", "project": "test-project", "token_valid": True, @@ -418,30 +148,50 @@ async def test_call_tool_whoami(self) -> None: result = await call_tool("acp_whoami", {}) assert len(result) == 1 - assert "testuser" in result[0].text + assert "test" in result[0].text @pytest.mark.asyncio - async def test_call_tool_error_handling(self) -> None: - """Test tool error handling.""" + async def test_call_tool_switch_cluster(self) -> None: + """Test calling switch cluster tool.""" mock_client = MagicMock() - mock_client.delete_session = AsyncMock(side_effect=Exception("Test error")) + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {} + mock_client.switch_cluster = AsyncMock( + return_value={"switched": True, "previous": "old", "current": "new", "message": "Switched"} + ) with patch("mcp_acp.server.get_client", return_value=mock_client): - result = await call_tool( - "acp_delete_session", - {"project": "test-project", "session": "test-session"}, - ) + result = await call_tool("acp_switch_cluster", {"cluster": "new"}) assert len(result) == 1 - assert "Error: Test error" in result[0].text + assert "Switched" in result[0].text @pytest.mark.asyncio async def test_call_tool_unknown(self) -> None: """Test calling unknown tool.""" mock_client = MagicMock() + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {} with patch("mcp_acp.server.get_client", return_value=mock_client): result = await call_tool("unknown_tool", {}) assert len(result) == 1 assert "Unknown tool" in result[0].text + + @pytest.mark.asyncio + async def test_call_tool_error_handling(self) -> None: + """Test tool error handling.""" + mock_client = MagicMock() + mock_client.clusters_config = MagicMock() + mock_client.clusters_config.default_cluster = "test" + mock_client.clusters_config.clusters = {"test": MagicMock(default_project="test-project")} + mock_client.delete_session = AsyncMock(side_effect=ValueError("Test error")) + + with patch("mcp_acp.server.get_client", return_value=mock_client): + result = await call_tool("acp_delete_session", {"project": "test-project", "session": "test"}) + + assert len(result) == 1 + assert "Test error" in result[0].text