From 2f2981a3df48aeb7f4401307c98f3e2910cdbadc Mon Sep 17 00:00:00 2001 From: Dipesh Mittal Date: Tue, 28 Apr 2026 09:04:51 +0530 Subject: [PATCH] test: add FastAPI mock backend + Kind-based e2e harness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lets the local VPC agent be tested against a stand-in for the DrDroid cloud backend without hitting prod. The mock issues its own bearer token, records every inbound request as JSONL, and lets tests seed connection-test + playbook-task scenarios via an admin API. What's in mock_backend/: - app.py / storage.py: FastAPI service implementing the agent's wire contract — GET/POST /connectors/proxy/ping, /register, /connector/connection/{tests,results}, /connector/metadata/register, /playbooks-engine/proxy/execution/{tasks,results}. - Dockerfile + k8s/manifests.yaml: tiny python:3.12-alpine image deployed as a Service in the drdroid namespace; agent pods reach it over cluster DNS at drd-vpc-agent-mock.drdroid.svc.cluster.local:8080. - deploy_kind.sh: one-shot orchestrator. Ensures Kind cluster, builds (or auto-reuses) the agent image, mints a token, applies the credentials-secret the helm chart needs, deploys the agent via helm, seeds scenarios, runs e2e.py. Self-contained — does not depend on the per-user gitignored deploy_local.sh. - e2e.py: validator. Asserts startup ping, heartbeat, queue drain, result round-trip, and ASSET_REFRESH metadata batches. For each kubectl scenario it parses the agent's output, asserts stable cluster invariants are present, and (when host kubectl is available) diffs the set of metadata.name values against a host snapshot. - scenarios/default.json: 1 ASSET_REFRESH + 4 real kubectl tasks (get pods/svc/namespaces/events on drdroid). Connection-tests are filtered by what's in credentials/secrets.yaml so empty configs don't trigger spurious failures. Project-scoped slash command at .claude/commands/test-vpc-agent.md so /test-vpc-agent runs the full loop. .claude/settings.local.json added to .gitignore. Validated end-to-end on Kind: 20/20 checks pass, including exact host-vs-agent name-set match for pods, services, and namespaces. Co-Authored-By: Claude Opus 4.7 (1M context) --- .claude/commands/test-vpc-agent.md | 101 +++++++ .gitignore | 5 + mock_backend/.gitignore | 5 + mock_backend/Dockerfile | 13 + mock_backend/README.md | 199 +++++++++++++ mock_backend/app.py | 316 +++++++++++++++++++++ mock_backend/deploy_kind.sh | 282 +++++++++++++++++++ mock_backend/e2e.py | 414 ++++++++++++++++++++++++++++ mock_backend/k8s/manifests.yaml | 70 +++++ mock_backend/requirements.txt | 4 + mock_backend/run_agent.sh | 55 ++++ mock_backend/run_e2e.sh | 105 +++++++ mock_backend/run_mock.sh | 82 ++++++ mock_backend/scenarios/default.json | 69 +++++ mock_backend/seed_scenarios.py | 80 ++++++ mock_backend/storage.py | 195 +++++++++++++ 16 files changed, 1995 insertions(+) create mode 100644 .claude/commands/test-vpc-agent.md create mode 100644 mock_backend/.gitignore create mode 100644 mock_backend/Dockerfile create mode 100644 mock_backend/README.md create mode 100644 mock_backend/app.py create mode 100755 mock_backend/deploy_kind.sh create mode 100644 mock_backend/e2e.py create mode 100644 mock_backend/k8s/manifests.yaml create mode 100644 mock_backend/requirements.txt create mode 100755 mock_backend/run_agent.sh create mode 100755 mock_backend/run_e2e.sh create mode 100755 mock_backend/run_mock.sh create mode 100644 mock_backend/scenarios/default.json create mode 100644 mock_backend/seed_scenarios.py create mode 100644 mock_backend/storage.py diff --git a/.claude/commands/test-vpc-agent.md b/.claude/commands/test-vpc-agent.md new file mode 100644 index 0000000..3e70824 --- /dev/null +++ b/.claude/commands/test-vpc-agent.md @@ -0,0 +1,101 @@ +--- +description: Run the VPC-agent end-to-end test loop against the local FastAPI mock backend (Kind or direct-celery) +argument-hint: "[--kind | --code] [--scenarios path/to/file.json] [--timeout 180] [--manual]" +--- + +# Run the VPC-agent end-to-end test loop + +You are running the test suite for the local VPC agent against the FastAPI mock backend at `mock_backend/`. The mock simulates DrDroid cloud — it issues a bearer token, records every inbound request as JSONL, and lets you seed connection-test + playbook-task scenarios. + +User args (may be empty): `$ARGUMENTS` + +## Step 0: pick the path + +Two paths exist. Choose based on what the user is changing: + +- **`--kind` (default when uncertain on a branch that touches Dockerfile, base image, requirements.txt, helm/, or deploy_local.sh):** runs the *real* agent Docker image inside Kind alongside a sibling mock pod. Use this for image migrations (e.g. Debian→Alpine), helm changes, dep upgrades that affect runtime. +- **`--code` (default for pure Python changes — connector logic, playbook tasks, business code):** runs celery directly on the host against a host-bound mock. Faster (~30s vs minutes for image build). + +If the user passed `--manual`, skip both and print the three-terminal manual flow from `mock_backend/README.md`. Stop. + +If neither flag is set, infer: `git diff main --name-only | grep -E '^(Dockerfile|requirements\.txt|helm/|deploy_local\.sh|mock_backend/Dockerfile)'` — any hit ⇒ default `--kind`, else default `--code`. + +## Step 1: preflight + +Path-specific. Run the relevant checks in parallel. + +**For `--kind`:** +- `kind get clusters` — check the binary works; cluster will be created if missing. +- `kubectl version --client` — fail fast if missing. +- `docker info` — Docker Desktop must be running. +- `cat credentials/secrets.yaml | head -5` from the repo root — should be non-empty. Empty file means scenarios that reference real connectors will fail with "Connector not found". Warn but proceed. + +**For `--code`:** +- `redis-cli ping` — must return `PONG`. If not, suggest `docker run -d --rm --name drd-redis -p 6379:6379 redis:alpine` and ask before running. +- `cat credentials/secrets.yaml | head -5` (same as above). +- `lsof -i :8080` — must be free. If taken, pass `MOCK_BACKEND_PORT=` through. +- `python -c 'import django, celery'` from the repo root — agent Python deps importable. If not, ask the user about `pip install -r requirements.txt`. + +## Step 2: run the loop + +### `--kind` + +```bash +cd /Users/dipeshmittal/drdroid/drd-vpc-agent +./mock_backend/deploy_kind.sh +``` + +Pass `--scenarios ` and `--timeout ` through. The script: +1. ensures Kind cluster `drd-local` + `drdroid` namespace +2. builds `mock_backend/Dockerfile` and `kind load`s it +3. applies `mock_backend/k8s/manifests.yaml` (Deployment + Service) +4. waits for readiness, port-forwards `:18080` +5. mints a token via `/admin/tokens` +6. shells out to `./deploy_local.sh ` with `DRD_CLOUD_API_HOST=http://drd-vpc-agent-mock.drdroid.svc.cluster.local:8080` so agent pods talk to the mock over cluster DNS +7. seeds scenarios + runs `e2e.py` + +The user does NOT bring a token — the mock issues it. `--reset-only` tears down the agent helm release and mock manifest. + +### `--code` + +```bash +cd /Users/dipeshmittal/drdroid/drd-vpc-agent +./mock_backend/run_e2e.sh +``` + +Same shape (start mock → mint token → boot agent → seed → validate) but agent runs as celery host processes. Pass `SCENARIOS_FILE=` and `E2E_TIMEOUT=` through. + +Both scripts stream to stderr. Let them run; don't wrap in another background subprocess. + +## Step 3: report + +On PASS: one or two lines. + +On FAIL: open the relevant logs. + +**For `--kind`:** +- `kubectl -n drdroid get pods` — check pod state. +- `kubectl -n drdroid logs deploy/drd-vpc-agent-mock --tail 100` +- `kubectl -n drdroid logs -l app=celery-worker --tail 100` +- `kubectl -n drdroid logs -l app=celery-beat --tail 100` +- `kubectl -n drdroid describe pod ` if any pod is in CrashLoopBackOff. +- While `deploy_kind.sh` is running, the mock admin API is up at `:18080` — `curl -s http://127.0.0.1:18080/admin/recorded` for buckets, `curl -s http://127.0.0.1:18080/admin/recorded/` for events. + +**For `--code`:** +- `mock_backend/data/mock-backend.log` +- `mock_backend/data/agent-logs/{beat,worker,worker-exec}.log` +- `mock_backend/data/recorded/*.jsonl` — the actual wire traffic. + +Common failure modes: +- Agent pod CrashLoopBackOff right after `deploy_local.sh`: usually `secrets.yaml` is empty/malformed, or the alpine image is missing a binary the agent runtime needs (this is exactly what we're testing for on `experiment/alpine-base` — surface the traceback; don't try to fix it). +- "agent did not poll X": worker isn't draining. Check broker connectivity (redis pod for kind; host redis for code path); check worker log for traceback. +- "playbook_tasks executed without errors: FAIL" with `Connector not found`: scenario's `connector_name` isn't in `credentials/secrets.yaml`. Tell the user; don't edit secrets yourself. +- 401 in mock log: token env didn't reach the agent. For kind, check `kubectl -n drdroid get secret drd-cloud-secret -o yaml | base64 -D` (token field). For code, check the env exported by `run_e2e.sh`. + +Don't speculate beyond the logs. If genuinely ambiguous after reading them, ask the user. + +## Notes + +- The test harness lives at `mock_backend/`. Code is checked in; `mock_backend/data/` is gitignored. +- This skill exists because the agent talks to `DRD_CLOUD_API_HOST` for ping/registration/connection tests/asset metadata/playbook tasks, and exercising those without mocking means hitting production. +- If the user wants to add new scenarios, point them at `mock_backend/scenarios/default.json` (commented inline) or `mock_backend/README.md`. diff --git a/.gitignore b/.gitignore index 85e6d1d..992e0d9 100644 --- a/.gitignore +++ b/.gitignore @@ -182,3 +182,8 @@ helm/values.local.yaml # Local copy of debug toolkit for testing drdroid-debug-toolkit/ deploy_local.sh + +# Per-user Claude Code settings (project-scoped slash commands under +# .claude/commands/ should be committed; this file holds local permission +# allowlists that vary by machine). +.claude/settings.local.json diff --git a/mock_backend/.gitignore b/mock_backend/.gitignore new file mode 100644 index 0000000..0fb4b93 --- /dev/null +++ b/mock_backend/.gitignore @@ -0,0 +1,5 @@ +data/ +__pycache__/ +*.pyc +.venv/ +venv/ diff --git a/mock_backend/Dockerfile b/mock_backend/Dockerfile new file mode 100644 index 0000000..71b34ae --- /dev/null +++ b/mock_backend/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.12-alpine + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py storage.py ./ + +ENV PYTHONUNBUFFERED=1 +EXPOSE 8080 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080", "--log-level", "info"] diff --git a/mock_backend/README.md b/mock_backend/README.md new file mode 100644 index 0000000..8677393 --- /dev/null +++ b/mock_backend/README.md @@ -0,0 +1,199 @@ +# mock_backend — local stand-in for DrDroid cloud + +A FastAPI service that speaks the exact wire protocol the VPC agent +expects. The agent has no idea it's not the real backend. + +## What it does + +| Surface | Purpose | +|---|---| +| `GET /connectors/proxy/ping` | startup reachability check (agent app `ready()`) | +| `POST /connectors/proxy/ping` | periodic heartbeat with pod health (every 50s) | +| `POST /connectors/proxy/register` | connector registration | +| `POST /connectors/proxy/connector/connection/tests` | poll for connection-test requests (every 10s) | +| `POST /connectors/proxy/connector/connection/results` | result upload | +| `POST /connectors/proxy/connector/metadata/register` | asset-metadata batches from extractors | +| `POST /playbooks-engine/proxy/execution/tasks` | poll for tasks (every 1s) — incl. `ASSET_REFRESH` | +| `POST /playbooks-engine/proxy/execution/results` | task / asset_refresh result upload | + +Auth is `Authorization: Bearer `. Tokens are issued by the mock +itself — the agent only knows it via `DRD_CLOUD_API_TOKEN`. + +Every inbound request (body + headers + query + the response we sent) is +recorded as a JSONL event under `data/recorded/.jsonl`. + +## Layout + +``` +mock_backend/ +├── app.py # FastAPI app: proxy + admin + catch-all +├── storage.py # JSON persistence (tokens, recordings, queues) +├── seed_scenarios.py # push a scenarios file into the admin API +├── e2e.py # poll recordings + assert agent behaviour +├── scenarios/default.json # default seeded scenarios +├── run_mock.sh # foreground: mock + token +├── run_agent.sh # foreground: celery beat/workers pointed at the mock +├── run_e2e.sh # one-shot loop: mock → agent → seed → validate +└── data/ # gitignored — recordings, queues, logs +``` + +## Three ways to run it + +### 1. Test the actual Docker image in Kind (recommended for image migrations) + +This is what you want when validating things like the Debian → Alpine +base swap. The agent runs from its real image inside Kind; the mock +runs as a sibling pod in the same `drdroid` namespace. + +```bash +# from repo root +./mock_backend/deploy_kind.sh +``` + +**Build-time note:** the alpine agent image takes ~20 min to build on +first run because several deps (psycopg2-binary, pymongo, sqlalchemy, +cryptography, lxml) have no musllinux wheels and compile from source. +By default `deploy_kind.sh` **auto-reuses** an existing +`drd-vpc-agent:local-` image — subsequent iterations are ~30s. + +| When | Flag | +|---|---| +| Agent code changed → must rebuild | `--rebuild-agent` (or `--rebuild`) | +| Only mock/scenarios changed → reuse agent | (default) | +| Force-skip everything (fastest) | `--reuse-image` | +| Mock backend code changed | (auto-detected — mock rebuilds when src is newer than image) | + +What it does: +1. ensures the `drd-local` Kind cluster + `drdroid` namespace +2. builds `mock_backend/Dockerfile` → `drd-vpc-agent-mock:local-latest` +3. `kind load`s it and applies `mock_backend/k8s/manifests.yaml` +4. waits for readiness, opens a `kubectl port-forward` on `:18080` +5. mints a token via `POST /admin/tokens` +6. shells out to `./deploy_local.sh ` with + `DRD_CLOUD_API_HOST=http://drd-vpc-agent-mock.drdroid.svc.cluster.local:8080` + so the agent pods talk to the mock over cluster DNS +7. seeds `scenarios/default.json` +8. runs `e2e.py` and reports PASS/FAIL + +You don't bring a token — the mock issues one. Reset everything between +runs with `./mock_backend/deploy_kind.sh --reset-only`. + +### 2. Skip Kind, run agent directly via celery (fast feedback on code) + +```bash +# Prereq: redis on :6379, repo Python deps importable +./mock_backend/run_e2e.sh +``` + +Exit 0 = PASS, 1 = FAIL. Logs in `mock_backend/data/`. + +### 3. Manual three-terminal loop + +```bash +# terminal 1 — mock +./mock_backend/run_mock.sh +# (note the printed DRD_CLOUD_API_TOKEN / DRD_CLOUD_API_HOST) + +# terminal 2 — agent +DRD_CLOUD_API_TOKEN=... DRD_CLOUD_API_HOST=http://127.0.0.1:8080 \ + ./mock_backend/run_agent.sh + +# terminal 3 — push scenarios + validate +.venv/bin/python mock_backend/seed_scenarios.py --reset +.venv/bin/python mock_backend/e2e.py +``` + +## Admin API (test control) + +| Method | Path | Purpose | +|---|---|---| +| `POST` | `/admin/tokens` | mint a token (returns `{token, label}`) | +| `GET` | `/admin/tokens` | list tokens | +| `DELETE` | `/admin/tokens/{token}` | revoke a token | +| `POST` | `/admin/queues/connection-tests` | enqueue a connection test (or `{items:[…]}` batch) | +| `POST` | `/admin/queues/playbook-tasks` | enqueue a playbook task / `ASSET_REFRESH` | +| `GET` | `/admin/queues/{name}` | peek pending | +| `DELETE` | `/admin/queues/{name}` | clear queue | +| `GET` | `/admin/recorded` | list recording buckets | +| `GET` | `/admin/recorded/{bucket}` | list events in a bucket | +| `POST` | `/admin/reset?keep_tokens=true` | wipe queues + recordings | + +Recording buckets emitted by the proxy endpoints: +`ping_get`, `ping_post`, `register_connectors`, `connection_tests_poll`, +`connection_tests_results`, `metadata_register`, `playbook_tasks_poll`, +`playbook_tasks_results`, `unmatched`. + +## Scenarios + +Edit `scenarios/default.json`. Two arrays: + +- `connection_tests`: each item is `{connector_name, request_id?}`. The + agent looks up `connector_name` in `credentials/secrets.yaml`, runs a + real connection test, and posts the result back. +- `playbook_tasks`: each item is a `playbook_task_execution` dict. Two + task shapes are exercised by default: + + **Real kubectl tasks** (canonical proto3 JSON — `dict_to_proto` parses + this directly into `PlaybookTask`): + ```json + { + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get pods -n drdroid -o json"} + } + } + } + ``` + These run inside the agent pod via `KubectlApiProcessor` — no + connector configured in `secrets.yaml` is needed because the helm + chart sets `NATIVE_KUBERNETES_API_MODE=true` and `kubectl` is in the + agent's image. The agent's own RBAC (`drdroid-k8s-cluster-role`) + permits pods/services/events/namespaces. + + **`ASSET_REFRESH`** (intercepted before proto parsing — wrapper-form + `{"value": …}` here is intentional): + ```json + { + "task": { + "drd_proxy_agent": { + "type": "ASSET_REFRESH", + "asset_refresh": { + "connector_name": {"value": "native_k8s"}, + "connector_type": {"value": 47}, + "extractor_method": {"value": "extract_pods"} + } + } + } + } + ``` + `connector_type` is the protobuf `Source` enum int (KUBERNETES = 47). + Drop `extractor_method` to run every `extract_*` method. + +Underscore-prefixed keys (`_doc`, `$schema_note`) are ignored — handy for +inline documentation. + +### What `e2e.py` checks for kubectl tasks + +For each kubectl scenario it pulls the agent's reported output from the +recordings and asserts: + +1. The command's `output` is parseable JSON. +2. A stable, deployment-managed item is present (e.g. the mock-backend + pod must show up in `get pods -n drdroid`; the `drdroid` namespace + in `get namespaces`). +3. **Ground-truth match** — if `kubectl` is on the host PATH and points + at the same Kind cluster, the validator runs the same command on the + host and asserts the *set of names* matches what the agent reported. + Events are checked for "non-empty on both sides" only (events churn). + +## Notes + +- The mock holds queues in flat JSON files (`data/queues/*.json`); each + poll drains the file. Restart-safe. +- `data/` is gitignored. Tokens live there too — they don't survive a + hard reset (`POST /admin/reset?keep_tokens=false`). +- `scripts/start-celery-worker.sh` reads `CELERY_QUEUE`; we run a beat + + a `celery` worker + an `exec` worker. The asset_extraction queue is + only enabled when `NATIVE_KUBERNETES_API_MODE=true`. diff --git a/mock_backend/app.py b/mock_backend/app.py new file mode 100644 index 0000000..505438b --- /dev/null +++ b/mock_backend/app.py @@ -0,0 +1,316 @@ +"""FastAPI mock of the DrDroid cloud backend. + +Speaks the exact wire protocol the VPC agent expects — the agent has no +idea this isn't the real backend. Three concerns are layered on the same +FastAPI app: + +1. /connectors/proxy/* and /playbooks-engine/proxy/* — the real-shaped + endpoints. Auth via `Authorization: Bearer `. Every payload is + recorded. +2. /admin/* — test-control surface: issue tokens, enqueue scenarios + (connection tests, playbook tasks, asset_refresh), inspect / reset + recordings. +3. /health — orchestration probe. + +Run: `uvicorn app:app --host 0.0.0.0 --port 8080` (see run_mock.sh). +""" +from __future__ import annotations + +import os +from typing import Any + +from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request +from fastapi.responses import JSONResponse + +import storage + +app = FastAPI(title="DrDroid VPC-Agent Mock Backend", version="0.1.0") + + +# --- Auth ---------------------------------------------------------------- + +def _bearer(authorization: str | None) -> str | None: + if not authorization: + return None + parts = authorization.split(None, 1) + if len(parts) != 2 or parts[0].lower() != "bearer": + return None + return parts[1].strip() + + +def require_token(authorization: str | None = Header(default=None)) -> str: + token = _bearer(authorization) + if not token or not storage.is_valid_token(token): + raise HTTPException(status_code=401, detail="invalid or missing bearer token") + return token + + +async def _capture(request: Request) -> dict: + """Pull body + headers + query into a serializable dict for recording.""" + try: + body = await request.json() + except Exception: + try: + body = (await request.body()).decode("utf-8", errors="replace") + except Exception: + body = None + return { + "method": request.method, + "path": request.url.path, + "query": dict(request.query_params), + "headers": {k: v for k, v in request.headers.items() if k.lower() != "authorization"}, + "body": body, + } + + +# --- Health -------------------------------------------------------------- + +@app.get("/health") +def health() -> dict: + tokens = storage.list_tokens() + return { + "status": "ok", + "tokens_issued": len(tokens), + "pending": { + "connection_tests": len(storage.queue_peek("connection_tests")), + "playbook_tasks": len(storage.queue_peek("playbook_tasks")), + }, + "recorded_buckets": storage.list_buckets(), + } + + +# --- Agent: startup + heartbeat ----------------------------------------- + +@app.get("/connectors/proxy/ping") +async def ping_get( + request: Request, + commit_hash: str = Query(default=""), + token: str = Depends(require_token), +): + captured = await _capture(request) + storage.record("ping_get", {"token_suffix": token[-4:], **captured}) + return {"status": "ok", "commit_hash": commit_hash} + + +@app.post("/connectors/proxy/ping") +async def ping_post(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + storage.record("ping_post", {"token_suffix": token[-4:], **captured}) + return {"status": "ok"} + + +# --- Agent: connector registration -------------------------------------- + +@app.post("/connectors/proxy/register") +async def register_connectors( + request: Request, + commit_hash: str = Query(default=""), + token: str = Depends(require_token), +): + captured = await _capture(request) + storage.record( + "register_connectors", + {"token_suffix": token[-4:], "commit_hash": commit_hash, **captured}, + ) + return {"status": "ok"} + + +# --- Agent: connection-test poll/result --------------------------------- + +@app.post("/connectors/proxy/connector/connection/tests") +async def connection_tests_poll(request: Request, token: str = Depends(require_token)): + pending = storage.queue_take_all("connection_tests") + response_body = {"requests": pending} + captured = await _capture(request) + storage.record( + "connection_tests_poll", + {"token_suffix": token[-4:], "delivered": pending, **captured}, + ) + return response_body + + +@app.post("/connectors/proxy/connector/connection/results") +async def connection_tests_result(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + storage.record( + "connection_tests_results", + {"token_suffix": token[-4:], **captured}, + ) + return {"status": "ok"} + + +# --- Agent: asset metadata batches -------------------------------------- + +@app.post("/connectors/proxy/connector/metadata/register") +async def metadata_register(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + body = captured.get("body") if isinstance(captured.get("body"), dict) else {} + storage.record( + "metadata_register", + { + "token_suffix": token[-4:], + "connector": body.get("connector"), + "model_type": body.get("model_type"), + "refresh_id": body.get("refresh_id"), + "has_more": body.get("has_more"), + "asset_count": len(body.get("assets", []) or []), + **captured, + }, + ) + return {"status": "ok"} + + +# --- Agent: playbook task poll/result ----------------------------------- + +@app.post("/playbooks-engine/proxy/execution/tasks") +async def playbook_tasks_poll(request: Request, token: str = Depends(require_token)): + pending = storage.queue_take_all("playbook_tasks") + response_body = {"playbook_task_executions": pending} + captured = await _capture(request) + storage.record( + "playbook_tasks_poll", + {"token_suffix": token[-4:], "delivered": pending, **captured}, + ) + return response_body + + +@app.post("/playbooks-engine/proxy/execution/results") +async def playbook_tasks_result(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + body = captured.get("body") if isinstance(captured.get("body"), dict) else {} + logs = body.get("playbook_task_execution_logs", []) or [] + summarised_logs = [] + for log in logs: + result = log.get("result") or {} + summarised_logs.append( + { + "request_id": log.get("proxy_execution_request_id"), + "has_error": "error" in result, + "error": (result.get("error") or {}).get("value") if isinstance(result.get("error"), dict) else result.get("error"), + "result_keys": list(result.keys()), + } + ) + storage.record( + "playbook_tasks_results", + { + "token_suffix": token[-4:], + "log_count": len(logs), + "summary": summarised_logs, + **captured, + }, + ) + return {"status": "ok"} + + +# --- Admin (test-control) ----------------------------------------------- + +@app.post("/admin/tokens") +def admin_issue_token(payload: dict | None = None) -> dict: + label = (payload or {}).get("label") + token = storage.issue_token(label) + return {"token": token, "label": label or "agent"} + + +@app.get("/admin/tokens") +def admin_list_tokens() -> dict: + return {"tokens": storage.list_tokens()} + + +@app.delete("/admin/tokens/{token}") +def admin_revoke_token(token: str) -> dict: + return {"revoked": storage.revoke_token(token)} + + +@app.post("/admin/queues/connection-tests") +def admin_enqueue_connection_test(payload: dict) -> dict: + """Body: {"connector_name": "...", "request_id": "optional-uuid"}. + + Or {"items": [{...}, ...]} for batch. + """ + items = payload.get("items") + if items is None: + items = [payload] + normalised = [] + import uuid as _uuid + for it in items: + if "connector_name" not in it: + raise HTTPException(400, "connector_name required") + normalised.append({ + "request_id": it.get("request_id") or str(_uuid.uuid4()), + "connector_name": it["connector_name"], + }) + storage.queue_extend("connection_tests", normalised) + return {"enqueued": normalised} + + +@app.post("/admin/queues/playbook-tasks") +def admin_enqueue_playbook_task(payload: dict) -> dict: + """Body: a single playbook_task_execution dict, or {"items": [...]}. + + Asset-refresh shape: + { + "task": { + "drd_proxy_agent": { + "type": "ASSET_REFRESH", + "asset_refresh": { + "connector_name": {"value": "my_k8s"}, + "connector_type": {"value": 47}, + "extractor_method": {"value": "extract_pods"} + } + } + } + } + """ + items = payload.get("items") + if items is None: + items = [payload] + import time as _time, uuid as _uuid + normalised = [] + for it in items: + normalised.append({ + "proxy_execution_request_id": it.get("proxy_execution_request_id") or str(_uuid.uuid4()), + "task": it.get("task", {}), + "time_range": it.get("time_range") or { + "time_geq": int(_time.time()) - 300, + "time_lt": int(_time.time()), + }, + "execution_global_variable_set": it.get("execution_global_variable_set", {}), + }) + storage.queue_extend("playbook_tasks", normalised) + return {"enqueued": normalised} + + +@app.get("/admin/queues/{name}") +def admin_queue_peek(name: str) -> dict: + return {"name": name, "items": storage.queue_peek(name)} + + +@app.delete("/admin/queues/{name}") +def admin_queue_clear(name: str) -> dict: + storage.queue_clear(name) + return {"cleared": name} + + +@app.get("/admin/recorded") +def admin_recorded_buckets() -> dict: + return {"buckets": storage.list_buckets()} + + +@app.get("/admin/recorded/{bucket}") +def admin_recorded(bucket: str) -> JSONResponse: + return JSONResponse({"bucket": bucket, "events": storage.read_records(bucket)}) + + +@app.post("/admin/reset") +def admin_reset(keep_tokens: bool = Query(default=True)) -> dict: + storage.reset_all(keep_tokens=keep_tokens) + return {"reset": True, "tokens_kept": keep_tokens} + + +# --- Catch-all to surface unexpected agent calls ------------------------ + +@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"]) +async def catchall(full_path: str, request: Request) -> JSONResponse: + captured = await _capture(request) + storage.record("unmatched", {"path": "/" + full_path, **captured}) + return JSONResponse({"error": "unmatched path", "path": "/" + full_path}, status_code=404) diff --git a/mock_backend/deploy_kind.sh b/mock_backend/deploy_kind.sh new file mode 100755 index 0000000..8b6dcea --- /dev/null +++ b/mock_backend/deploy_kind.sh @@ -0,0 +1,282 @@ +#!/usr/bin/env bash +# Full Kind-based test loop: agent runs from its real Docker image, mock +# backend runs as a sibling pod in the same namespace. +# +# By default it reuses the agent image when it already exists locally — +# the alpine first-build is ~20min, but subsequent test iterations are +# ~30s. Pass --rebuild-agent (or --rebuild) when the agent code changed. +# +# Usage: +# ./mock_backend/deploy_kind.sh # build (or reuse) + deploy + seed + validate +# ./mock_backend/deploy_kind.sh --reuse-image # force-skip both rebuilds (fastest) +# ./mock_backend/deploy_kind.sh --rebuild # force a full rebuild of both images +# ./mock_backend/deploy_kind.sh --rebuild-agent # rebuild agent only (after code changes) +# ./mock_backend/deploy_kind.sh --rebuild-mock # rebuild mock only +# ./mock_backend/deploy_kind.sh --no-validate # skip e2e.py +# ./mock_backend/deploy_kind.sh --reset-only # tear down mock + agent, leave cluster +# +# Token + host are auto-wired — you don't bring them. The script prints +# everything you need (token, URLs, kubectl tail commands) on success. +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(dirname "$SCRIPT_DIR")" + +KIND_CLUSTER="${KIND_CLUSTER_NAME:-drd-local}" +NS="drdroid" +MOCK_IMAGE="drd-vpc-agent-mock:local-latest" +MOCK_SVC_HOST="http://drd-vpc-agent-mock.${NS}.svc.cluster.local:8080" +PORT_FWD_LOCAL_PORT="${MOCK_PORT_FORWARD:-18080}" + +VALIDATE=1 +RESET_ONLY=0 +REUSE_AGENT_IMAGE=auto # auto | always | never +REUSE_MOCK_IMAGE=auto +SCENARIOS_FILE="${SCENARIOS_FILE:-$SCRIPT_DIR/scenarios/default.json}" +E2E_TIMEOUT="${E2E_TIMEOUT:-180}" + +while [[ $# -gt 0 ]]; do + case "$1" in + --no-validate) VALIDATE=0; shift ;; + --reset-only) RESET_ONLY=1; shift ;; + --reuse-image) REUSE_AGENT_IMAGE=always; REUSE_MOCK_IMAGE=always; shift ;; + --rebuild) REUSE_AGENT_IMAGE=never; REUSE_MOCK_IMAGE=never; shift ;; + --rebuild-agent) REUSE_AGENT_IMAGE=never; shift ;; + --rebuild-mock) REUSE_MOCK_IMAGE=never; shift ;; + --scenarios) SCENARIOS_FILE="$2"; shift 2 ;; + --timeout) E2E_TIMEOUT="$2"; shift 2 ;; + -h|--help) sed -n '2,24p' "$0"; exit 0 ;; + *) echo "unknown arg: $1" >&2; exit 2 ;; + esac +done + +log() { echo "[deploy_kind] $*" >&2; } + +# Reset path ------------------------------------------------------------- +if [[ "$RESET_ONLY" -eq 1 ]]; then + log "uninstalling agent helm release" + helm status drd-vpc-agent -n "$NS" >/dev/null 2>&1 && helm uninstall drd-vpc-agent -n "$NS" || true + log "deleting mock manifest" + kubectl delete -f "$SCRIPT_DIR/k8s/manifests.yaml" --ignore-not-found + log "done" + exit 0 +fi + +# 1. cluster ------------------------------------------------------------- +if ! kind get clusters 2>/dev/null | grep -q "^${KIND_CLUSTER}$"; then + log "creating Kind cluster '${KIND_CLUSTER}'" + kind create cluster --name "${KIND_CLUSTER}" +fi +kubectl config use-context "kind-${KIND_CLUSTER}" >/dev/null +kubectl create namespace "$NS" --dry-run=client -o yaml | kubectl apply -f - >/dev/null + +# 2. build + load mock image -------------------------------------------- +should_rebuild_mock=1 +if [[ "$REUSE_MOCK_IMAGE" == "always" ]]; then + should_rebuild_mock=0 +elif [[ "$REUSE_MOCK_IMAGE" == "auto" ]] && docker image inspect "$MOCK_IMAGE" >/dev/null 2>&1; then + # Auto-reuse only if no source files are newer than the image. + IMAGE_TS=$(docker image inspect -f '{{.Created}}' "$MOCK_IMAGE" 2>/dev/null | xargs -I{} date -j -f '%Y-%m-%dT%H:%M:%S' "{}" +%s 2>/dev/null || echo 0) + NEWEST_SRC=$(find "$SCRIPT_DIR" -maxdepth 2 \( -name '*.py' -o -name 'requirements.txt' -o -name 'Dockerfile' \) -not -path '*/.venv/*' -not -path '*/data/*' -print0 2>/dev/null | xargs -0 stat -f '%m' 2>/dev/null | sort -n | tail -1) + if [[ -n "$IMAGE_TS" && -n "$NEWEST_SRC" && "$NEWEST_SRC" -le "$IMAGE_TS" ]]; then + should_rebuild_mock=0 + fi +fi + +if [[ "$should_rebuild_mock" -eq 1 ]]; then + log "building mock image: $MOCK_IMAGE" + docker build -t "$MOCK_IMAGE" "$SCRIPT_DIR" >/dev/null +else + log "reusing existing mock image: $MOCK_IMAGE" +fi + +log "kind load $MOCK_IMAGE (idempotent)" +kind load docker-image "$MOCK_IMAGE" --name "${KIND_CLUSTER}" >/dev/null 2>&1 || true + +# 3. apply mock manifests ------------------------------------------------ +log "applying mock manifests" +kubectl apply -f "$SCRIPT_DIR/k8s/manifests.yaml" >/dev/null + +# Force a pod recycle so a new image is picked up on rebuild. +kubectl -n "$NS" rollout restart deployment/drd-vpc-agent-mock >/dev/null + +# 4. wait for readiness -------------------------------------------------- +log "waiting for mock pod to be ready" +kubectl -n "$NS" rollout status deployment/drd-vpc-agent-mock --timeout=120s >/dev/null + +# 5. port-forward briefly + mint a token -------------------------------- +log "minting token via port-forward on :${PORT_FWD_LOCAL_PORT}" +kubectl -n "$NS" port-forward svc/drd-vpc-agent-mock "${PORT_FWD_LOCAL_PORT}:8080" >/tmp/drd-mock-pf.log 2>&1 & +PF_PID=$! +trap 'kill "$PF_PID" 2>/dev/null || true' EXIT INT TERM + +# Wait until the forward is live. +for _ in $(seq 1 50); do + if curl -sf "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/health" >/dev/null; then break; fi + sleep 0.2 +done +if ! curl -sf "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/health" >/dev/null; then + log "port-forward never came up — see /tmp/drd-mock-pf.log" + exit 1 +fi + +# Wipe queues + recordings between runs but keep tokens (so a long-running +# agent in the cluster wouldn't lose its credential — defensive). +curl -sf -X POST "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/admin/reset?keep_tokens=true" >/dev/null + +TOKEN=$(curl -sf -X POST "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/admin/tokens" \ + -H 'content-type: application/json' -d '{"label":"kind-agent"}' \ + | python3 -c 'import json,sys;print(json.load(sys.stdin)["token"])') +log "token: ${TOKEN:0:20}…" + +# 5b. ensure credentials-secret exists ---------------------------------- +# The helm chart mounts `credentials-secret` into the celery pods, but +# its manifest at helm/credentials-secret.yaml is NOT under templates/ +# so `helm install` never creates it. Without this secret kubelet hangs +# the pods in PodInitializing forever (FailedMount). +# +# Build it from credentials/secrets.yaml on the host. Empty file is fine +# — the helm chart just needs the volume to exist. Idempotent. +log "ensuring credentials-secret exists in $NS" +SECRETS_FILE="$REPO_DIR/credentials/secrets.yaml" +if [[ ! -f "$SECRETS_FILE" ]]; then + # Create an empty file rather than failing — most local test setups + # don't need any real connector creds since NATIVE_KUBERNETES_API_MODE + # is on. + log " no credentials/secrets.yaml found; creating empty placeholder" + : > "$SECRETS_FILE" +fi +kubectl -n "$NS" create secret generic credentials-secret \ + --from-file=secrets.yaml="$SECRETS_FILE" \ + --dry-run=client -o yaml | kubectl apply -f - >/dev/null + +# 6. build (or reuse) the agent image + deploy via helm ----------------- +# Self-contained — does not depend on the user's local deploy_local.sh +# (which is gitignored and per-machine). Logic: +# a. compute image tag from git commit +# b. decide skip-build (auto: skip when image already exists; --rebuild-agent forces) +# c. docker build + kind load (when not skipping) +# d. helm uninstall any prior release +# e. write helm/values.local.yaml with token + host + image tag +# f. helm upgrade --install +COMMIT_HASH=$(cd "$REPO_DIR" && git rev-parse --short HEAD 2>/dev/null || echo "local-dev") +AGENT_IMAGE_NAME="drd-vpc-agent" +AGENT_IMAGE_TAG="local-${COMMIT_HASH}" +AGENT_FULL_IMAGE="${AGENT_IMAGE_NAME}:${AGENT_IMAGE_TAG}" + +SKIP_AGENT_BUILD=0 +case "$REUSE_AGENT_IMAGE" in + always) SKIP_AGENT_BUILD=1 ;; + auto) + if docker image inspect "$AGENT_FULL_IMAGE" >/dev/null 2>&1; then + log "auto-reuse: $AGENT_FULL_IMAGE already exists; skipping agent build" + log " (pass --rebuild-agent to force rebuild after agent code changes)" + SKIP_AGENT_BUILD=1 + fi ;; + never) SKIP_AGENT_BUILD=0 ;; +esac + +cd "$REPO_DIR" + +if [[ "$SKIP_AGENT_BUILD" -eq 0 ]]; then + log "building agent image: $AGENT_FULL_IMAGE (this can take ~20min on first alpine build)" + docker build \ + --build-arg COMMIT_HASH="$COMMIT_HASH" \ + -t "$AGENT_FULL_IMAGE" \ + -t "${AGENT_IMAGE_NAME}:local-latest" \ + "$REPO_DIR" + log "kind load $AGENT_FULL_IMAGE" + kind load docker-image "$AGENT_FULL_IMAGE" --name "${KIND_CLUSTER}" >/dev/null +else + # Image exists locally; ensure it's also loaded into kind (idempotent NOP if already there). + kind load docker-image "$AGENT_FULL_IMAGE" --name "${KIND_CLUSTER}" >/dev/null 2>&1 || true +fi + +# Uninstall any previous helm release so we get a clean roll. +if helm status drd-vpc-agent -n "$NS" >/dev/null 2>&1; then + log "uninstalling previous helm release" + helm uninstall drd-vpc-agent -n "$NS" >/dev/null +fi + +# Write a values override file pointing helm at the mock + the local image. +VALUES_LOCAL="$REPO_DIR/helm/values.local.yaml" +{ + echo "global:" + echo " DRD_CLOUD_API_TOKEN: \"$TOKEN\"" + echo " DRD_CLOUD_API_HOST: \"$MOCK_SVC_HOST\"" + cat < "$VALUES_LOCAL" + +# The chart's configmap.yaml is at helm/ root (not under templates/), so +# helm install doesn't apply it. Same with credentials-secret (already +# handled above). +log "applying helm/configmap.yaml" +kubectl apply -f "$REPO_DIR/helm/configmap.yaml" -n "$NS" >/dev/null + +log "helm upgrade --install drd-vpc-agent" +helm upgrade --install drd-vpc-agent "$REPO_DIR/helm" \ + -n "$NS" \ + -f "$REPO_DIR/helm/values.yaml" \ + -f "$VALUES_LOCAL" + +# Give the agent a few seconds to call /connectors/proxy/ping etc. +sleep 5 + +# 7. seed scenarios ------------------------------------------------------ +log "seeding scenarios from $SCENARIOS_FILE" +# Use the venv created by run_mock.sh / run_e2e.sh if it exists, otherwise +# fall back to system python — both will work, only httpx is needed. +if [[ -x "$SCRIPT_DIR/.venv/bin/python" ]]; then + PY="$SCRIPT_DIR/.venv/bin/python" +else + PY="python3" + "$PY" -c 'import httpx' 2>/dev/null || pip install --quiet httpx +fi +"$PY" "$SCRIPT_DIR/seed_scenarios.py" \ + --host "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}" \ + --file "$SCENARIOS_FILE" + +echo +echo "==============================================================" +echo "Mock backend : $MOCK_SVC_HOST (in-cluster)" +echo "Mock admin : http://127.0.0.1:${PORT_FWD_LOCAL_PORT} (port-forward, while this script lives)" +echo "Token : $TOKEN" +echo "Agent pods : kubectl -n ${NS} get pods" +echo "Agent logs : kubectl -n ${NS} logs -l app=celery-worker -f" +echo "Mock logs : kubectl -n ${NS} logs deploy/drd-vpc-agent-mock -f" +echo "Inspect calls : curl -s http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/admin/recorded | jq" +echo "==============================================================" +echo + +if [[ "$VALIDATE" -eq 1 ]]; then + log "running e2e validator (timeout ${E2E_TIMEOUT}s)" + set +e + "$PY" "$SCRIPT_DIR/e2e.py" \ + --host "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}" \ + --file "$SCENARIOS_FILE" \ + --timeout "$E2E_TIMEOUT" + RC=$? + set -e + if [[ $RC -eq 0 ]]; then + log "PASS" + else + log "FAIL — see logs above. Pods + mock are still running for inspection." + fi + exit $RC +else + log "skipping validation (--no-validate). Port-forward holds while this script runs; Ctrl-C to release." + wait "$PF_PID" +fi diff --git a/mock_backend/e2e.py b/mock_backend/e2e.py new file mode 100644 index 0000000..7a3ad2e --- /dev/null +++ b/mock_backend/e2e.py @@ -0,0 +1,414 @@ +"""Lightweight end-to-end validator. + +Assumes the mock is running and an agent has been pointed at it. After +seeding scenarios this script polls the recordings and asserts the agent +exhibited the expected behaviours: startup ping, periodic heartbeat, drained +the queues, and reported back results for everything we enqueued. + +Designed to be run after `run_e2e.sh` brings everything up. Returns +non-zero exit on failure with a human-readable diff. +""" +from __future__ import annotations + +import argparse +import json +import shutil +import subprocess +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path + +import httpx + + +@dataclass +class CheckResult: + name: str + ok: bool + detail: str = "" + + def render(self) -> str: + marker = "PASS" if self.ok else "FAIL" + return f" [{marker}] {self.name}" + (f" — {self.detail}" if self.detail else "") + + +@dataclass +class Report: + checks: list[CheckResult] = field(default_factory=list) + + def add(self, name: str, ok: bool, detail: str = "") -> None: + self.checks.append(CheckResult(name, ok, detail)) + + @property + def ok(self) -> bool: + return all(c.ok for c in self.checks) + + def render(self) -> str: + lines = [c.render() for c in self.checks] + lines.append("") + lines.append(f" TOTAL: {sum(1 for c in self.checks if c.ok)}/{len(self.checks)} passed") + return "\n".join(lines) + + +def _get(client: httpx.Client, path: str) -> dict: + r = client.get(path) + r.raise_for_status() + return r.json() + + +def _events(client: httpx.Client, bucket: str) -> list[dict]: + return _get(client, f"/admin/recorded/{bucket}").get("events", []) + + +def wait_for(client: httpx.Client, bucket: str, predicate, timeout: float, label: str) -> tuple[bool, list[dict]]: + deadline = time.time() + timeout + last: list[dict] = [] + while time.time() < deadline: + last = _events(client, bucket) + if predicate(last): + return True, last + time.sleep(1.0) + return False, last + + +def _kubectl_command_from_scenario(scenario: dict) -> str | None: + """Return the kubectl command string from a scenario, or None if it's not a Kubectl COMMAND task.""" + task = scenario.get("task") or {} + if task.get("source") != "KUBERNETES": + return None + k = task.get("kubernetes") or {} + if k.get("type") != "COMMAND": + return None + cmd = (k.get("command") or {}).get("command") + return cmd if isinstance(cmd, str) else None + + +def _find_kubectl_output(results_events: list[dict], command: str) -> str | None: + """Walk recorded playbook_tasks_results events and return the agent's + output string for the given command (None if not found).""" + for ev in results_events: + body = ev.get("body") or {} + for log in body.get("playbook_task_execution_logs", []) or []: + result = log.get("result") or {} + bash = result.get("bash_command_output") or {} + for co in bash.get("command_outputs", []) or []: + # MessageToDict gives bare strings for StringValue; older + # encodings might give {"value": "..."} — handle both. + cmd_field = co.get("command") + if isinstance(cmd_field, dict): + cmd_field = cmd_field.get("value") + if cmd_field == command or (cmd_field or "").strip() == command.strip(): + out = co.get("output") + if isinstance(out, dict): + out = out.get("value") + return out + return None + + +def _host_kubectl(args: list[str]) -> tuple[bool, str]: + """Run kubectl on the host. Returns (ok, stdout-or-error).""" + if not shutil.which("kubectl"): + return False, "host kubectl not in PATH" + try: + proc = subprocess.run(["kubectl", *args], capture_output=True, text=True, timeout=15) + except subprocess.TimeoutExpired: + return False, "host kubectl timed out" + if proc.returncode != 0: + return False, f"exit {proc.returncode}: {proc.stderr.strip()[:200]}" + return True, proc.stdout + + +def _names_from_list(blob: str) -> set[str]: + """Pull `metadata.name` out of a `kubectl get ... -o json` listing.""" + try: + data = json.loads(blob) + except json.JSONDecodeError: + return set() + return {(it.get("metadata") or {}).get("name") for it in data.get("items", []) if (it.get("metadata") or {}).get("name")} + + +def _check_kubectl_scenarios(client: httpx.Client, expected_pt: list[dict], report: Report, timeout: float) -> None: + """For each kubectl scenario: assert agent reported a parseable JSON + output, the expected stable item is present, and the set of names + matches the host's `kubectl` snapshot when available.""" + kubectl_scenarios = [(s, _kubectl_command_from_scenario(s)) for s in expected_pt] + kubectl_scenarios = [(s, c) for s, c in kubectl_scenarios if c] + if not kubectl_scenarios: + return + + # Wait for results to be in. The earlier "results posted back" check + # already gated on this, but we re-pull fresh recordings here. + deadline = time.time() + min(timeout, 60) + results: list[dict] = [] + while time.time() < deadline: + results = _events(client, "playbook_tasks_results") + if any(_find_kubectl_output(results, c) is not None for _, c in kubectl_scenarios): + break + time.sleep(1.0) + + host_kubectl_available = shutil.which("kubectl") is not None + if not host_kubectl_available: + report.add( + "host kubectl available for ground-truth comparison", + False, + "kubectl not in PATH — agent outputs will be self-checked only", + ) + + for scenario, command in kubectl_scenarios: + output = _find_kubectl_output(results, command) + if output is None: + report.add(f"agent reported output for: {command}", False, "no matching command_output in recordings") + continue + + # Parseability — every command in our default set asks for `-o json`. + try: + parsed = json.loads(output) + except json.JSONDecodeError as e: + report.add(f"agent output is valid JSON: {command}", False, f"json error: {e}") + continue + report.add(f"agent output is valid JSON: {command}", True) + + # Self-checks (deployment-stable invariants we set up ourselves). + if "get pods -n drdroid" in command: + agent_names = {(it.get("metadata") or {}).get("name") for it in parsed.get("items", [])} + agent_names = {n for n in agent_names if n} + mock_pod = any(n.startswith("drd-vpc-agent-mock") for n in agent_names) + report.add("mock backend pod visible in agent's `get pods -n drdroid`", mock_pod, + f"saw {len(agent_names)} pods: {sorted(agent_names)[:5]}…") + celery_pod = any(n.startswith("drd-vpc-agent-celery") or "celery" in n for n in agent_names) + report.add("agent celery pod visible in `get pods -n drdroid`", celery_pod, + f"pod set: {sorted(agent_names)}") + elif "get svc -n drdroid" in command: + agent_names = {(it.get("metadata") or {}).get("name") for it in parsed.get("items", [])} + agent_names = {n for n in agent_names if n} + report.add("mock backend service visible in agent's `get svc -n drdroid`", + "drd-vpc-agent-mock" in agent_names, + f"saw services: {sorted(agent_names)}") + elif "get namespaces" in command: + agent_names = {(it.get("metadata") or {}).get("name") for it in parsed.get("items", [])} + agent_names = {n for n in agent_names if n} + report.add("`drdroid` namespace visible in agent's `get namespaces`", + "drdroid" in agent_names, + f"saw namespaces: {sorted(agent_names)}") + elif "get events -n drdroid" in command: + event_count = len(parsed.get("items", [])) + report.add("agent's `get events -n drdroid` returned events", + event_count > 0, + f"saw {event_count} events") + + # Host-side ground truth (skipped if host has no kubectl context for this cluster). + if not host_kubectl_available: + continue + + agent_set = _names_from_list(output) + host_args: list[str] | None = None + if "get pods -n drdroid" in command: + host_args = ["get", "pods", "-n", "drdroid", "-o", "json"] + elif "get svc -n drdroid" in command: + host_args = ["get", "svc", "-n", "drdroid", "-o", "json"] + elif "get namespaces" in command: + host_args = ["get", "namespaces", "-o", "json"] + elif "get events -n drdroid" in command: + host_args = ["get", "events", "-n", "drdroid", "-o", "json"] + if host_args is None: + continue + + ok, host_blob = _host_kubectl(host_args) + if not ok: + report.add(f"host ground-truth: {' '.join(host_args)}", False, host_blob) + continue + host_set = _names_from_list(host_blob) + + # Quiet clusters should agree on names. Allow tiny drift for events + # (the only churn-y resource) — for that one, just ensure both sides + # are non-empty. + if "events" in host_args: + ok = bool(agent_set or len(json.loads(output).get("items", []))) and bool(host_set or len(json.loads(host_blob).get("items", []))) + report.add(f"host vs agent both report events for `{' '.join(host_args)}`", ok) + else: + missing_in_agent = host_set - agent_set + extra_in_agent = agent_set - host_set + ok = not missing_in_agent and not extra_in_agent + detail = f"agent={sorted(agent_set)} host={sorted(host_set)}" + if not ok: + detail = f"missing_in_agent={sorted(missing_in_agent)} extra_in_agent={sorted(extra_in_agent)} | {detail}" + report.add(f"host kubectl set matches agent's: `{' '.join(host_args)}`", ok, detail) + + +def _configured_connector_names() -> set[str]: + """Mirror seed_scenarios.loaded_connector_names so e2e.py and the + seeder agree on which connection-tests will actually be queued.""" + secrets_path = Path(__file__).resolve().parent.parent / "credentials" / "secrets.yaml" + if not secrets_path.exists(): + return set() + try: + import yaml # type: ignore + except ImportError: + return set() + try: + data = yaml.safe_load(secrets_path.read_text()) or {} + except yaml.YAMLError: + return set() + return set(data.keys()) if isinstance(data, dict) else set() + + +def run(host: str, scenarios_path: Path, timeout: float) -> Report: + report = Report() + scenarios = json.loads(scenarios_path.read_text()) + expected_ct = [c for c in scenarios.get("connection_tests", []) if any(not k.startswith("_") and not k.startswith("$") for k in c)] + # Match seed_scenarios.py: skip connection-tests whose connector + # isn't configured locally — the agent silently drops them. + configured = _configured_connector_names() + expected_ct = [c for c in expected_ct if c.get("connector_name") in configured] + expected_pt = [p for p in scenarios.get("playbook_tasks", []) if any(not k.startswith("_") and not k.startswith("$") for k in p)] + + with httpx.Client(base_url=host, timeout=10.0) as client: + # 1. startup ping. + ok, evts = wait_for(client, "ping_get", lambda e: len(e) >= 1, timeout, "startup ping") + report.add( + "agent issued startup ping (GET /connectors/proxy/ping)", + ok, + f"saw {len(evts)} ping_get events", + ) + + # 2. periodic heartbeat — POST /connectors/proxy/ping (every 50s in + # default schedule, so allow a generous window). + ok, evts = wait_for(client, "ping_post", lambda e: len(e) >= 1, max(timeout, 70.0), "heartbeat") + report.add( + "agent sent periodic heartbeat (POST /connectors/proxy/ping)", + ok, + f"saw {len(evts)} ping_post events", + ) + + # 3. connection-test loop — agent polled and reported. + ok, polls = wait_for(client, "connection_tests_poll", lambda e: len(e) >= 1, timeout, "ct poll") + report.add( + "agent polled connection_tests endpoint", + ok, + f"saw {len(polls)} polls", + ) + + if expected_ct: + expected_request_ids = set() + ct_drained = False + ct_deadline = time.time() + timeout + while time.time() < ct_deadline: + polls = _events(client, "connection_tests_poll") + for ev in polls: + for delivered in ev.get("delivered") or []: + if delivered.get("request_id"): + expected_request_ids.add(delivered["request_id"]) + # Wait for as many request_ids to be delivered as we enqueued. + if len(expected_request_ids) >= len(expected_ct): + ct_drained = True + break + time.sleep(1.0) + report.add( + f"connection_tests drained ({len(expected_ct)} enqueued)", + ct_drained, + f"agent received {len(expected_request_ids)} request_ids", + ) + + results = _events(client, "connection_tests_results") + seen_results = set() + for ev in results: + body = ev.get("body") or {} + for r in body.get("results", []) or []: + if r.get("request_id"): + seen_results.add(r["request_id"]) + report.add( + "agent posted connection_tests results back", + expected_request_ids.issubset(seen_results), + f"reported {len(seen_results & expected_request_ids)}/{len(expected_request_ids)} request_ids", + ) + + # 4. playbook-task loop. + ok, polls = wait_for(client, "playbook_tasks_poll", lambda e: len(e) >= 1, timeout, "pt poll") + report.add( + "agent polled playbook_tasks endpoint", + ok, + f"saw {len(polls)} polls", + ) + + if expected_pt: + seen_request_ids: set[str] = set() + seen_deadline = time.time() + timeout + while time.time() < seen_deadline: + polls = _events(client, "playbook_tasks_poll") + for ev in polls: + for delivered in ev.get("delivered") or []: + if delivered.get("proxy_execution_request_id"): + seen_request_ids.add(delivered["proxy_execution_request_id"]) + if len(seen_request_ids) >= len(expected_pt): + break + time.sleep(1.0) + report.add( + f"playbook_tasks drained ({len(expected_pt)} enqueued)", + len(seen_request_ids) >= len(expected_pt), + f"agent received {len(seen_request_ids)} request_ids", + ) + + # Result endpoint should fire once per task. + result_ids: set[str] = set() + errored: list[str] = [] + res_deadline = time.time() + timeout + while time.time() < res_deadline: + for ev in _events(client, "playbook_tasks_results"): + for s in ev.get("summary") or []: + if s.get("request_id"): + result_ids.add(s["request_id"]) + if s.get("has_error"): + errored.append(f"{s['request_id']}: {s.get('error')}") + if seen_request_ids.issubset(result_ids): + break + time.sleep(1.0) + report.add( + "agent posted playbook_tasks results back", + seen_request_ids.issubset(result_ids), + f"reported {len(seen_request_ids & result_ids)}/{len(seen_request_ids)} request_ids", + ) + if errored: + report.add( + "playbook tasks executed without errors", + False, + "errors: " + "; ".join(errored[:3]) + (f" (+{len(errored)-3})" if len(errored) > 3 else ""), + ) + + # Asset refresh tasks should drive metadata/register batches. + has_asset_refresh = any( + (p.get("task") or {}).get("drd_proxy_agent", {}).get("type") == "ASSET_REFRESH" + for p in expected_pt + ) + if has_asset_refresh: + metadata_events = _events(client, "metadata_register") + report.add( + "ASSET_REFRESH produced metadata register batches", + len(metadata_events) >= 1, + f"saw {len(metadata_events)} batches", + ) + + # Real kubectl tasks: assert the agent's reported output is + # parseable, contains expected stable items, and matches a + # host-side `kubectl` snapshot when available. + _check_kubectl_scenarios(client, expected_pt, report, timeout) + + return report + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--host", default="http://localhost:8080") + ap.add_argument("--file", default=str(Path(__file__).parent / "scenarios/default.json")) + ap.add_argument("--timeout", type=float, default=90.0, + help="seconds to wait for each phase before failing") + args = ap.parse_args() + + print(f"[e2e] mock={args.host} scenarios={args.file} timeout={args.timeout}s", file=sys.stderr) + report = run(args.host, Path(args.file), args.timeout) + print(report.render()) + return 0 if report.ok else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/mock_backend/k8s/manifests.yaml b/mock_backend/k8s/manifests.yaml new file mode 100644 index 0000000..c4af4f0 --- /dev/null +++ b/mock_backend/k8s/manifests.yaml @@ -0,0 +1,70 @@ +# Mock backend deployed inside the drdroid namespace alongside the agent. +# Agent pods reach it via in-cluster DNS at: +# http://drd-vpc-agent-mock.drdroid.svc.cluster.local:8080 +# The host reaches its admin API by `kubectl port-forward`. +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: drd-vpc-agent-mock + namespace: drdroid + labels: + app: drd-vpc-agent-mock +spec: + replicas: 1 + selector: + matchLabels: + app: drd-vpc-agent-mock + template: + metadata: + labels: + app: drd-vpc-agent-mock + spec: + containers: + - name: mock + image: drd-vpc-agent-mock:local-latest + imagePullPolicy: Never + ports: + - name: http + containerPort: 8080 + readinessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 1 + periodSeconds: 2 + livenessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + requests: + cpu: "50m" + memory: "64Mi" + limits: + cpu: "500m" + memory: "256Mi" + volumeMounts: + - name: data + mountPath: /app/data + volumes: + - name: data + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: drd-vpc-agent-mock + namespace: drdroid + labels: + app: drd-vpc-agent-mock +spec: + type: ClusterIP + selector: + app: drd-vpc-agent-mock + ports: + - name: http + port: 8080 + targetPort: http diff --git a/mock_backend/requirements.txt b/mock_backend/requirements.txt new file mode 100644 index 0000000..7082f16 --- /dev/null +++ b/mock_backend/requirements.txt @@ -0,0 +1,4 @@ +fastapi>=0.110 +uvicorn[standard]>=0.27 +pyyaml>=6.0 +httpx>=0.27 diff --git a/mock_backend/run_agent.sh b/mock_backend/run_agent.sh new file mode 100755 index 0000000..328bf1c --- /dev/null +++ b/mock_backend/run_agent.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Boot the VPC agent (celery beat + worker + exec-worker) pointed at the +# mock backend. Token + host are required — get them from run_mock.sh. +# +# DRD_CLOUD_API_TOKEN=... DRD_CLOUD_API_HOST=http://127.0.0.1:8080 ./run_agent.sh +# +# This expects: redis on $REDIS_URL (default localhost:6379), the agent's +# Python deps installed in the active environment, and credentials/secrets.yaml +# populated with at least the connectors referenced in your scenarios. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(dirname "$SCRIPT_DIR")" + +: "${DRD_CLOUD_API_TOKEN:?set DRD_CLOUD_API_TOKEN — get one from run_mock.sh}" +: "${DRD_CLOUD_API_HOST:=http://127.0.0.1:8080}" +: "${CELERY_BROKER_URL:=redis://localhost:6379/0}" +: "${CELERY_RESULT_BACKEND:=redis://localhost:6379/0}" +: "${REDIS_URL:=redis://localhost:6379/0}" +: "${VPC_AGENT_COMMIT_HASH:=mock-test}" + +export DRD_CLOUD_API_TOKEN DRD_CLOUD_API_HOST CELERY_BROKER_URL CELERY_RESULT_BACKEND REDIS_URL VPC_AGENT_COMMIT_HASH + +cd "$REPO_DIR" + +LOG_DIR="$SCRIPT_DIR/data/agent-logs" +mkdir -p "$LOG_DIR" + +echo "[run_agent] migrating sqlite…" >&2 +python manage.py migrate >/dev/null + +echo "[run_agent] starting celery beat" >&2 +celery -A agent beat -l INFO --pidfile="$LOG_DIR/beat.pid" >"$LOG_DIR/beat.log" 2>&1 & +BEAT_PID=$! + +echo "[run_agent] starting celery worker (default queue)" >&2 +celery -A agent worker -l INFO -Q celery --concurrency=2 \ + --pidfile="$LOG_DIR/worker.pid" >"$LOG_DIR/worker.log" 2>&1 & +WORKER_PID=$! + +echo "[run_agent] starting celery worker (exec queue)" >&2 +CELERY_QUEUE=exec celery -A agent worker -l INFO -Q exec --concurrency=2 \ + --pidfile="$LOG_DIR/worker-exec.pid" >"$LOG_DIR/worker-exec.log" 2>&1 & +EXEC_PID=$! + +trap 'kill $BEAT_PID $WORKER_PID $EXEC_PID 2>/dev/null || true' EXIT INT TERM + +echo "==============================================================" >&2 +echo "[run_agent] agent running. logs in $LOG_DIR" >&2 +echo "[run_agent] beat=$BEAT_PID worker=$WORKER_PID exec=$EXEC_PID" >&2 +echo "[run_agent] press Ctrl-C to stop" >&2 +echo "==============================================================" >&2 + +wait diff --git a/mock_backend/run_e2e.sh b/mock_backend/run_e2e.sh new file mode 100755 index 0000000..e4bf6d7 --- /dev/null +++ b/mock_backend/run_e2e.sh @@ -0,0 +1,105 @@ +#!/usr/bin/env bash +# One-shot end-to-end loop: +# 1. start the FastAPI mock in the background +# 2. issue a token, export DRD_CLOUD_API_HOST/TOKEN +# 3. start celery beat + workers in the background +# 4. seed scenarios +# 5. wait for the agent to drain everything +# 6. validate via e2e.py +# 7. tear everything down (PASS/FAIL exit code) +# +# Requirements: +# - redis available at $REDIS_URL (default redis://localhost:6379/0) +# - VPC agent's Python deps importable from the current shell +# (e.g. `pip install -r requirements.txt` from repo root, or activate venv) +# - credentials/secrets.yaml populated with the connectors referenced +# in your scenarios file +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(dirname "$SCRIPT_DIR")" + +PORT="${MOCK_BACKEND_PORT:-8080}" +SCENARIOS_FILE="${SCENARIOS_FILE:-$SCRIPT_DIR/scenarios/default.json}" +E2E_TIMEOUT="${E2E_TIMEOUT:-120}" + +cd "$SCRIPT_DIR" + +# 1. mock backend --------------------------------------------------------- + +VENV="$SCRIPT_DIR/.venv" +if [[ ! -d "$VENV" ]]; then + echo "[e2e] creating mock-backend venv" >&2 + python3 -m venv "$VENV" + "$VENV/bin/pip" install --upgrade pip >/dev/null + "$VENV/bin/pip" install -r requirements.txt >/dev/null +fi + +LOG_DIR="$SCRIPT_DIR/data/agent-logs" +MOCK_LOG="$SCRIPT_DIR/data/mock-backend.log" +mkdir -p "$LOG_DIR" +: > "$MOCK_LOG" + +"$VENV/bin/uvicorn" app:app --host 0.0.0.0 --port "$PORT" --log-level warning >>"$MOCK_LOG" 2>&1 & +MOCK_PID=$! +trap 'echo "[e2e] tearing down" >&2; kill $MOCK_PID 2>/dev/null || true; pkill -P $$ 2>/dev/null || true' EXIT INT TERM + +for _ in $(seq 1 50); do + curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null && break + sleep 0.1 +done +curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null || { echo "[e2e] mock did not start — see $MOCK_LOG" >&2; exit 1; } +echo "[e2e] mock up on :${PORT}" >&2 + +# 2. token + env --------------------------------------------------------- + +TOKEN=$(curl -sf -X POST "http://127.0.0.1:${PORT}/admin/tokens" \ + -H 'content-type: application/json' -d '{"label":"e2e"}' \ + | "$VENV/bin/python" -c 'import json,sys;print(json.load(sys.stdin)["token"])') +echo "[e2e] issued token ${TOKEN:0:14}…" >&2 + +export DRD_CLOUD_API_TOKEN="$TOKEN" +export DRD_CLOUD_API_HOST="http://127.0.0.1:${PORT}" +export VPC_AGENT_COMMIT_HASH="${VPC_AGENT_COMMIT_HASH:-e2e-test}" +export CELERY_BROKER_URL="${CELERY_BROKER_URL:-redis://localhost:6379/0}" +export CELERY_RESULT_BACKEND="${CELERY_RESULT_BACKEND:-redis://localhost:6379/0}" +export REDIS_URL="${REDIS_URL:-redis://localhost:6379/0}" + +# 3. wipe prior recordings & start agent --------------------------------- + +curl -sf -X POST "http://127.0.0.1:${PORT}/admin/reset?keep_tokens=true" >/dev/null + +cd "$REPO_DIR" +echo "[e2e] migrating sqlite" >&2 +python manage.py migrate >/dev/null + +echo "[e2e] starting celery beat + workers" >&2 +celery -A agent beat -l INFO --pidfile="$LOG_DIR/beat.pid" >"$LOG_DIR/beat.log" 2>&1 & +celery -A agent worker -l INFO -Q celery --concurrency=2 --pidfile="$LOG_DIR/worker.pid" >"$LOG_DIR/worker.log" 2>&1 & +celery -A agent worker -l INFO -Q exec --concurrency=2 --pidfile="$LOG_DIR/worker-exec.pid" >"$LOG_DIR/worker-exec.log" 2>&1 & + +# Give celery a moment to register beat/worker schedules. +sleep 4 + +# 4. seed scenarios ------------------------------------------------------ + +echo "[e2e] seeding $SCENARIOS_FILE" >&2 +"$VENV/bin/python" "$SCRIPT_DIR/seed_scenarios.py" \ + --host "http://127.0.0.1:${PORT}" --file "$SCENARIOS_FILE" + +# 5. validate ------------------------------------------------------------ + +set +e +"$VENV/bin/python" "$SCRIPT_DIR/e2e.py" \ + --host "http://127.0.0.1:${PORT}" \ + --file "$SCENARIOS_FILE" \ + --timeout "$E2E_TIMEOUT" +RC=$? +set -e + +if [[ $RC -eq 0 ]]; then + echo "[e2e] PASS" >&2 +else + echo "[e2e] FAIL — see $LOG_DIR/*.log and $MOCK_LOG" >&2 +fi +exit $RC diff --git a/mock_backend/run_mock.sh b/mock_backend/run_mock.sh new file mode 100755 index 0000000..2b80593 --- /dev/null +++ b/mock_backend/run_mock.sh @@ -0,0 +1,82 @@ +#!/usr/bin/env bash +# Bring up the mock backend and print a freshly-issued bearer token. +# +# ./run_mock.sh [--port 8080] [--no-token] +# +# The token is printed to stdout. Capture it like: +# TOKEN=$(./run_mock.sh --print-token-only) +# (in another terminal — the script blocks on uvicorn while serving) + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PORT="${MOCK_BACKEND_PORT:-8080}" +HOST="${MOCK_BACKEND_HOST:-0.0.0.0}" +ISSUE_TOKEN=1 +PRINT_TOKEN_ONLY=0 + +while [[ $# -gt 0 ]]; do + case "$1" in + --port) PORT="$2"; shift 2 ;; + --host) HOST="$2"; shift 2 ;; + --no-token) ISSUE_TOKEN=0; shift ;; + --print-token-only) PRINT_TOKEN_ONLY=1; shift ;; + -h|--help) + sed -n '2,12p' "$0" + exit 0 + ;; + *) echo "unknown arg: $1" >&2; exit 2 ;; + esac +done + +cd "$SCRIPT_DIR" + +# Lazy install — keep deps off the agent's environment by using a venv next to this dir. +VENV="$SCRIPT_DIR/.venv" +if [[ ! -d "$VENV" ]]; then + echo "[mock_backend] creating venv at $VENV" >&2 + python3 -m venv "$VENV" + "$VENV/bin/pip" install --upgrade pip >/dev/null + "$VENV/bin/pip" install -r requirements.txt >/dev/null +fi + +PYTHON="$VENV/bin/python" +UVICORN="$VENV/bin/uvicorn" + +# Kick off uvicorn in the background so we can issue a token, then bring it +# to the foreground with `wait`. +LOG_FILE="$SCRIPT_DIR/data/mock-backend.log" +mkdir -p "$SCRIPT_DIR/data" +"$UVICORN" app:app --host "$HOST" --port "$PORT" --log-level info >"$LOG_FILE" 2>&1 & +UVICORN_PID=$! +trap 'kill "$UVICORN_PID" 2>/dev/null || true' EXIT INT TERM + +# Wait for /health to come up. +for _ in $(seq 1 50); do + if curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null; then break; fi + sleep 0.1 +done +if ! curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null; then + echo "[mock_backend] failed to start — see $LOG_FILE" >&2 + exit 1 +fi + +if [[ "$ISSUE_TOKEN" -eq 1 ]]; then + TOKEN=$(curl -sf -X POST "http://127.0.0.1:${PORT}/admin/tokens" \ + -H 'content-type: application/json' -d '{"label":"agent"}' \ + | "$PYTHON" -c 'import json,sys;print(json.load(sys.stdin)["token"])') + if [[ "$PRINT_TOKEN_ONLY" -eq 1 ]]; then + # In token-only mode we exit immediately so the caller can eat stdout. + echo "$TOKEN" + kill "$UVICORN_PID" 2>/dev/null || true + exit 0 + fi + echo "==============================================================" >&2 + echo "[mock_backend] up at http://127.0.0.1:${PORT}" >&2 + echo "[mock_backend] token: $TOKEN" >&2 + echo "[mock_backend] export: DRD_CLOUD_API_TOKEN=$TOKEN" >&2 + echo "[mock_backend] export: DRD_CLOUD_API_HOST=http://127.0.0.1:${PORT}" >&2 + echo "==============================================================" >&2 +fi + +wait "$UVICORN_PID" diff --git a/mock_backend/scenarios/default.json b/mock_backend/scenarios/default.json new file mode 100644 index 0000000..4db2646 --- /dev/null +++ b/mock_backend/scenarios/default.json @@ -0,0 +1,69 @@ +{ + "$schema_note": "Loaded by seed_scenarios.py. Each item is sent through the matching /admin/queues/* endpoint after the mock comes up. Edit freely — no code change needed.", + + "connection_tests": [ + { + "connector_name": "connector_name_1", + "_doc": "Should match a connector key in credentials/secrets.yaml. Agent will run a real test against the configured creds and post the result back. Drop or replace if your secrets.yaml is empty." + } + ], + + "playbook_tasks": [ + { + "_doc": "ASSET_REFRESH: works with NATIVE_KUBERNETES_API_MODE=true (helm default) — no connector_name needed in secrets.yaml. Pulls every extract_* method from the kubernetes metadata extractor.", + "task": { + "drd_proxy_agent": { + "type": "ASSET_REFRESH", + "asset_refresh": { + "connector_name": {"value": "native_k8s"}, + "connector_type": {"value": 47} + } + } + } + }, + + { + "_doc": "Kubectl: list all pods in the drdroid namespace. e2e.py asserts the JSON parses, includes the mock backend's pod, and matches the host's kubectl snapshot.", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get pods -n drdroid -o json"} + } + } + }, + + { + "_doc": "Kubectl: list services in drdroid namespace. e2e.py asserts the mock's service shows up.", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get svc -n drdroid -o json"} + } + } + }, + + { + "_doc": "Kubectl: list cluster namespaces. e2e.py asserts drdroid is present and the set matches the host's view.", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get namespaces -o json"} + } + } + }, + + { + "_doc": "Kubectl: list events in drdroid namespace. e2e.py asserts the JSON parses and events list is non-empty (a fresh deploy always emits events).", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get events -n drdroid -o json"} + } + } + } + ] +} diff --git a/mock_backend/seed_scenarios.py b/mock_backend/seed_scenarios.py new file mode 100644 index 0000000..891b389 --- /dev/null +++ b/mock_backend/seed_scenarios.py @@ -0,0 +1,80 @@ +"""Push a scenarios JSON file into a running mock via its admin API.""" +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path + +import httpx + +REPO_ROOT = Path(__file__).resolve().parent.parent +SECRETS_PATH = REPO_ROOT / "credentials" / "secrets.yaml" + + +def loaded_connector_names() -> set[str]: + """Return the set of connector names defined in credentials/secrets.yaml. + + The agent's connection-test executor silently drops requests for + connectors not in this set (no result posted), so seeding such tests + leaves the mock waiting forever. Filter at seed time. + """ + if not SECRETS_PATH.exists(): + return set() + try: + import yaml # type: ignore + except ImportError: + return set() + try: + data = yaml.safe_load(SECRETS_PATH.read_text()) or {} + except yaml.YAMLError: + return set() + if not isinstance(data, dict): + return set() + return set(data.keys()) + + +def seed(host: str, scenarios_path: Path, reset: bool = False) -> None: + data = json.loads(scenarios_path.read_text()) + with httpx.Client(base_url=host, timeout=10.0) as client: + if reset: + r = client.post("/admin/reset", params={"keep_tokens": "true"}) + r.raise_for_status() + print(f"reset: {r.json()}") + + ct = [c for c in data.get("connection_tests", []) if not str(next(iter(c), "")).startswith("$")] + ct = [c for c in ct if not all(k.startswith("_") or k.startswith("$") for k in c)] + # Drop connection-tests for connectors that don't exist in + # credentials/secrets.yaml — the agent will silently no-op them. + configured = loaded_connector_names() + skipped = [c for c in ct if c.get("connector_name") not in configured] + ct = [c for c in ct if c.get("connector_name") in configured] + if skipped: + names = ", ".join(c.get("connector_name", "?") for c in skipped) + print(f"connection_tests skipped (no matching connector in secrets.yaml): {names}") + if ct: + r = client.post("/admin/queues/connection-tests", json={"items": ct}) + r.raise_for_status() + print(f"connection_tests enqueued: {len(r.json().get('enqueued', []))}") + + pt = data.get("playbook_tasks", []) or [] + pt = [p for p in pt if any(not k.startswith("_") and not k.startswith("$") for k in p)] + if pt: + r = client.post("/admin/queues/playbook-tasks", json={"items": pt}) + r.raise_for_status() + print(f"playbook_tasks enqueued: {len(r.json().get('enqueued', []))}") + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--host", default="http://localhost:8080") + ap.add_argument("--file", default=str(Path(__file__).parent / "scenarios/default.json")) + ap.add_argument("--reset", action="store_true", help="clear queues + recordings before seeding") + args = ap.parse_args() + + seed(args.host, Path(args.file), reset=args.reset) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/mock_backend/storage.py b/mock_backend/storage.py new file mode 100644 index 0000000..133ec9e --- /dev/null +++ b/mock_backend/storage.py @@ -0,0 +1,195 @@ +"""Persistence + queues for the mock backend. + +Every inbound payload from the agent is appended to a JSONL file under +data/recorded/.jsonl so tests can replay and assert on the wire +traffic. Pending work (connection tests, playbook tasks) is held in JSON +files under data/queues/ that the admin API mutates and the polling +endpoints drain. +""" +from __future__ import annotations + +import json +import secrets +import threading +import time +import uuid +from pathlib import Path +from typing import Any + + +DATA_DIR = Path(__file__).parent / "data" +RECORDED_DIR = DATA_DIR / "recorded" +QUEUES_DIR = DATA_DIR / "queues" +TOKENS_FILE = DATA_DIR / "tokens.json" + +_lock = threading.Lock() + + +def _ensure_dirs() -> None: + RECORDED_DIR.mkdir(parents=True, exist_ok=True) + QUEUES_DIR.mkdir(parents=True, exist_ok=True) + + +def _load_json(path: Path, default: Any) -> Any: + if not path.exists(): + return default + try: + return json.loads(path.read_text()) + except json.JSONDecodeError: + return default + + +def _dump_json(path: Path, value: Any) -> None: + path.write_text(json.dumps(value, indent=2, default=str)) + + +# --- Token store --------------------------------------------------------- + +def issue_token(label: str | None = None) -> str: + """Mint a fresh bearer token. Returned plaintext is the only auth secret + the agent needs.""" + with _lock: + _ensure_dirs() + tokens = _load_json(TOKENS_FILE, {}) + token = "mock-" + secrets.token_urlsafe(24) + tokens[token] = { + "label": label or "agent", + "issued_at": time.time(), + } + _dump_json(TOKENS_FILE, tokens) + return token + + +def list_tokens() -> dict[str, dict]: + with _lock: + return _load_json(TOKENS_FILE, {}) + + +def revoke_token(token: str) -> bool: + with _lock: + tokens = _load_json(TOKENS_FILE, {}) + if token in tokens: + del tokens[token] + _dump_json(TOKENS_FILE, tokens) + return True + return False + + +def is_valid_token(token: str) -> bool: + return token in list_tokens() + + +# --- Inbound recording --------------------------------------------------- + +def record(bucket: str, payload: dict) -> dict: + """Append a single event to data/recorded/.jsonl. + + `payload` should already include the request body, headers, query + params, and the response we returned — anything a test may want to + assert on later. + """ + with _lock: + _ensure_dirs() + event = { + "id": str(uuid.uuid4()), + "ts": time.time(), + "bucket": bucket, + **payload, + } + path = RECORDED_DIR / f"{bucket}.jsonl" + with path.open("a") as f: + f.write(json.dumps(event, default=str) + "\n") + return event + + +def read_records(bucket: str) -> list[dict]: + path = RECORDED_DIR / f"{bucket}.jsonl" + if not path.exists(): + return [] + out = [] + for line in path.read_text().splitlines(): + line = line.strip() + if not line: + continue + try: + out.append(json.loads(line)) + except json.JSONDecodeError: + continue + return out + + +def list_buckets() -> list[str]: + if not RECORDED_DIR.exists(): + return [] + return sorted(p.stem for p in RECORDED_DIR.glob("*.jsonl")) + + +def clear_records() -> None: + with _lock: + if RECORDED_DIR.exists(): + for f in RECORDED_DIR.glob("*.jsonl"): + f.unlink() + + +# --- Pending-work queues ------------------------------------------------- +# Two named queues: +# - "connection_tests": items shaped {request_id, connector_name} +# - "playbook_tasks": items shaped {proxy_execution_request_id, task, time_range, ...} +# `take_all` drains and returns every queued item — the agent's polling +# tasks consume the whole batch each tick. + +def _queue_path(name: str) -> Path: + _ensure_dirs() + return QUEUES_DIR / f"{name}.json" + + +def queue_push(name: str, item: dict) -> dict: + with _lock: + path = _queue_path(name) + items = _load_json(path, []) + items.append(item) + _dump_json(path, items) + return item + + +def queue_extend(name: str, items: list[dict]) -> int: + with _lock: + path = _queue_path(name) + existing = _load_json(path, []) + existing.extend(items) + _dump_json(path, existing) + return len(items) + + +def queue_take_all(name: str) -> list[dict]: + with _lock: + path = _queue_path(name) + items = _load_json(path, []) + if items: + _dump_json(path, []) + return items + + +def queue_peek(name: str) -> list[dict]: + return _load_json(_queue_path(name), []) + + +def queue_clear(name: str | None = None) -> None: + with _lock: + if name: + p = _queue_path(name) + if p.exists(): + p.unlink() + return + if QUEUES_DIR.exists(): + for f in QUEUES_DIR.glob("*.json"): + f.unlink() + + +def reset_all(keep_tokens: bool = True) -> None: + """Wipe recordings and queues. Keep tokens by default so an in-flight + agent doesn't lose its credential mid-test.""" + clear_records() + queue_clear() + if not keep_tokens and TOKENS_FILE.exists(): + TOKENS_FILE.unlink()