diff --git a/.claude/commands/test-vpc-agent.md b/.claude/commands/test-vpc-agent.md new file mode 100644 index 0000000..3e70824 --- /dev/null +++ b/.claude/commands/test-vpc-agent.md @@ -0,0 +1,101 @@ +--- +description: Run the VPC-agent end-to-end test loop against the local FastAPI mock backend (Kind or direct-celery) +argument-hint: "[--kind | --code] [--scenarios path/to/file.json] [--timeout 180] [--manual]" +--- + +# Run the VPC-agent end-to-end test loop + +You are running the test suite for the local VPC agent against the FastAPI mock backend at `mock_backend/`. The mock simulates DrDroid cloud — it issues a bearer token, records every inbound request as JSONL, and lets you seed connection-test + playbook-task scenarios. + +User args (may be empty): `$ARGUMENTS` + +## Step 0: pick the path + +Two paths exist. Choose based on what the user is changing: + +- **`--kind` (default when uncertain on a branch that touches Dockerfile, base image, requirements.txt, helm/, or deploy_local.sh):** runs the *real* agent Docker image inside Kind alongside a sibling mock pod. Use this for image migrations (e.g. Debian→Alpine), helm changes, dep upgrades that affect runtime. +- **`--code` (default for pure Python changes — connector logic, playbook tasks, business code):** runs celery directly on the host against a host-bound mock. Faster (~30s vs minutes for image build). + +If the user passed `--manual`, skip both and print the three-terminal manual flow from `mock_backend/README.md`. Stop. + +If neither flag is set, infer: `git diff main --name-only | grep -E '^(Dockerfile|requirements\.txt|helm/|deploy_local\.sh|mock_backend/Dockerfile)'` — any hit ⇒ default `--kind`, else default `--code`. + +## Step 1: preflight + +Path-specific. Run the relevant checks in parallel. + +**For `--kind`:** +- `kind get clusters` — check the binary works; cluster will be created if missing. +- `kubectl version --client` — fail fast if missing. 
+- `docker info` — Docker Desktop must be running. +- `cat credentials/secrets.yaml | head -5` from the repo root — should be non-empty. Empty file means scenarios that reference real connectors will fail with "Connector not found". Warn but proceed. + +**For `--code`:** +- `redis-cli ping` — must return `PONG`. If not, suggest `docker run -d --rm --name drd-redis -p 6379:6379 redis:alpine` and ask before running. +- `cat credentials/secrets.yaml | head -5` (same as above). +- `lsof -i :8080` — must be free. If taken, pass `MOCK_BACKEND_PORT=` through. +- `python -c 'import django, celery'` from the repo root — agent Python deps importable. If not, ask the user about `pip install -r requirements.txt`. + +## Step 2: run the loop + +### `--kind` + +```bash +cd "$(git rev-parse --show-toplevel)" +./mock_backend/deploy_kind.sh +``` + +Pass `--scenarios ` and `--timeout ` through. The script: +1. ensures Kind cluster `drd-local` + `drdroid` namespace +2. builds `mock_backend/Dockerfile` and `kind load`s it +3. applies `mock_backend/k8s/manifests.yaml` (Deployment + Service) +4. waits for readiness, port-forwards `:18080` +5. mints a token via `/admin/tokens` +6. shells out to `./deploy_local.sh ` with `DRD_CLOUD_API_HOST=http://drd-vpc-agent-mock.drdroid.svc.cluster.local:8080` so agent pods talk to the mock over cluster DNS +7. seeds scenarios + runs `e2e.py` + +The user does NOT bring a token — the mock issues it. `--reset-only` tears down the agent helm release and mock manifest. + +### `--code` + +```bash +cd "$(git rev-parse --show-toplevel)" +./mock_backend/run_e2e.sh +``` + +Same shape (start mock → mint token → boot agent → seed → validate) but agent runs as celery host processes. Pass `SCENARIOS_FILE=` and `E2E_TIMEOUT=` through. + +Both scripts stream to stderr. Let them run; don't wrap in another background subprocess. + +## Step 3: report + +On PASS: one or two lines. + +On FAIL: open the relevant logs. 
+ +**For `--kind`:** +- `kubectl -n drdroid get pods` — check pod state. +- `kubectl -n drdroid logs deploy/drd-vpc-agent-mock --tail 100` +- `kubectl -n drdroid logs -l app=celery-worker --tail 100` +- `kubectl -n drdroid logs -l app=celery-beat --tail 100` +- `kubectl -n drdroid describe pod ` if any pod is in CrashLoopBackOff. +- While `deploy_kind.sh` is running, the mock admin API is up at `:18080` — `curl -s http://127.0.0.1:18080/admin/recorded` for buckets, `curl -s http://127.0.0.1:18080/admin/recorded/` for events. + +**For `--code`:** +- `mock_backend/data/mock-backend.log` +- `mock_backend/data/agent-logs/{beat,worker,worker-exec}.log` +- `mock_backend/data/recorded/*.jsonl` — the actual wire traffic. + +Common failure modes: +- Agent pod CrashLoopBackOff right after `deploy_local.sh`: usually `secrets.yaml` is empty/malformed, or the alpine image is missing a binary the agent runtime needs (this is exactly what we're testing for on `experiment/alpine-base` — surface the traceback; don't try to fix it). +- "agent did not poll X": worker isn't draining. Check broker connectivity (redis pod for kind; host redis for code path); check worker log for traceback. +- "playbook_tasks executed without errors: FAIL" with `Connector not found`: scenario's `connector_name` isn't in `credentials/secrets.yaml`. Tell the user; don't edit secrets yourself. +- 401 in mock log: token env didn't reach the agent. For kind, run `kubectl -n drdroid get secret drd-cloud-secret -o yaml` and base64-decode the token field on its own (`base64 -D` on macOS, `base64 -d` on Linux); piping the whole YAML document into `base64` does not decode it. For code, check the env exported by `run_e2e.sh`. + +Don't speculate beyond the logs. If genuinely ambiguous after reading them, ask the user. + +## Notes + +- The test harness lives at `mock_backend/`. Code is checked in; `mock_backend/data/` is gitignored. +- This skill exists because the agent talks to `DRD_CLOUD_API_HOST` for ping/registration/connection tests/asset metadata/playbook tasks, and exercising those without mocking means hitting production. 
+- If the user wants to add new scenarios, point them at `mock_backend/scenarios/default.json` (commented inline) or `mock_backend/README.md`. diff --git a/.gitignore b/.gitignore index 85e6d1d..992e0d9 100644 --- a/.gitignore +++ b/.gitignore @@ -182,3 +182,8 @@ helm/values.local.yaml # Local copy of debug toolkit for testing drdroid-debug-toolkit/ deploy_local.sh + +# Per-user Claude Code settings (project-scoped slash commands under +# .claude/commands/ should be committed; this file holds local permission +# allowlists that vary by machine). +.claude/settings.local.json diff --git a/mock_backend/.gitignore b/mock_backend/.gitignore new file mode 100644 index 0000000..0fb4b93 --- /dev/null +++ b/mock_backend/.gitignore @@ -0,0 +1,5 @@ +data/ +__pycache__/ +*.pyc +.venv/ +venv/ diff --git a/mock_backend/Dockerfile b/mock_backend/Dockerfile new file mode 100644 index 0000000..71b34ae --- /dev/null +++ b/mock_backend/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.12-alpine + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py storage.py ./ + +ENV PYTHONUNBUFFERED=1 +EXPOSE 8080 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080", "--log-level", "info"] diff --git a/mock_backend/README.md b/mock_backend/README.md new file mode 100644 index 0000000..8677393 --- /dev/null +++ b/mock_backend/README.md @@ -0,0 +1,199 @@ +# mock_backend — local stand-in for DrDroid cloud + +A FastAPI service that speaks the exact wire protocol the VPC agent +expects. The agent has no idea it's not the real backend. 
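As a rough illustration of what "speaking the wire protocol" involves, the sketch below builds the two requests a client would need to get started: minting a token through the admin API, then presenting it as a bearer token on the startup ping. The endpoint paths, the `label` field, and the `{token, label}` response shape are taken from this README and `app.py`; the helper names (`build_mint_token_request`, `build_ping_request`, `mint_and_ping`) and the `127.0.0.1:8080` default are hypothetical and only for illustration. The real agent's client code is not shown in this change and may differ.

```python
import json
import urllib.request

# Assumption: the mock is bound on the host at this address (as in the
# manual three-terminal flow). Under Kind the port-forward is on :18080.
DEFAULT_HOST = "http://127.0.0.1:8080"


def build_mint_token_request(host: str, label: str = "agent") -> urllib.request.Request:
    """Build (but do not send) POST /admin/tokens with an optional label."""
    body = json.dumps({"label": label}).encode("utf-8")
    return urllib.request.Request(
        f"{host}/admin/tokens",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_ping_request(host: str, token: str) -> urllib.request.Request:
    """Build (but do not send) the startup GET ping with bearer auth."""
    return urllib.request.Request(
        f"{host}/connectors/proxy/ping",
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )


def mint_and_ping(host: str = DEFAULT_HOST) -> dict:
    """Hypothetical helper: mint a token, then ping the mock with it.

    Requires a running mock; a missing or invalid token makes the ping
    fail with HTTP 401, which urllib raises as an HTTPError.
    """
    with urllib.request.urlopen(build_mint_token_request(host), timeout=5) as resp:
        token = json.load(resp)["token"]
    with urllib.request.urlopen(build_ping_request(host, token), timeout=5) as resp:
        return json.load(resp)
```

With the mock running, calling `mint_and_ping()` should leave one event in the `ping_get` recording bucket, which can be inspected through `GET /admin/recorded/ping_get`.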
+ +## What it does + +| Surface | Purpose | +|---|---| +| `GET /connectors/proxy/ping` | startup reachability check (agent app `ready()`) | +| `POST /connectors/proxy/ping` | periodic heartbeat with pod health (every 50s) | +| `POST /connectors/proxy/register` | connector registration | +| `POST /connectors/proxy/connector/connection/tests` | poll for connection-test requests (every 10s) | +| `POST /connectors/proxy/connector/connection/results` | result upload | +| `POST /connectors/proxy/connector/metadata/register` | asset-metadata batches from extractors | +| `POST /playbooks-engine/proxy/execution/tasks` | poll for tasks (every 1s) — incl. `ASSET_REFRESH` | +| `POST /playbooks-engine/proxy/execution/results` | task / asset_refresh result upload | + +Auth is `Authorization: Bearer `. Tokens are issued by the mock +itself — the agent only knows it via `DRD_CLOUD_API_TOKEN`. + +Every inbound request (body + headers + query + the response we sent) is +recorded as a JSONL event under `data/recorded/.jsonl`. + +## Layout + +``` +mock_backend/ +├── app.py # FastAPI app: proxy + admin + catch-all +├── storage.py # JSON persistence (tokens, recordings, queues) +├── seed_scenarios.py # push a scenarios file into the admin API +├── e2e.py # poll recordings + assert agent behaviour +├── scenarios/default.json # default seeded scenarios +├── run_mock.sh # foreground: mock + token +├── run_agent.sh # foreground: celery beat/workers pointed at the mock +├── run_e2e.sh # one-shot loop: mock → agent → seed → validate +└── data/ # gitignored — recordings, queues, logs +``` + +## Three ways to run it + +### 1. Test the actual Docker image in Kind (recommended for image migrations) + +This is what you want when validating things like the Debian → Alpine +base swap. The agent runs from its real image inside Kind; the mock +runs as a sibling pod in the same `drdroid` namespace. 
+ +```bash +# from repo root +./mock_backend/deploy_kind.sh +``` + +**Build-time note:** the alpine agent image takes ~20 min to build on +first run because several deps (psycopg2-binary, pymongo, sqlalchemy, +cryptography, lxml) have no musllinux wheels and compile from source. +By default `deploy_kind.sh` **auto-reuses** an existing +`drd-vpc-agent:local-` image — subsequent iterations are ~30s. + +| When | Flag | +|---|---| +| Agent code changed → must rebuild | `--rebuild-agent` (or `--rebuild`) | +| Only mock/scenarios changed → reuse agent | (default) | +| Force-skip everything (fastest) | `--reuse-image` | +| Mock backend code changed | (auto-detected — mock rebuilds when src is newer than image) | + +What it does: +1. ensures the `drd-local` Kind cluster + `drdroid` namespace +2. builds `mock_backend/Dockerfile` → `drd-vpc-agent-mock:local-latest` +3. `kind load`s it and applies `mock_backend/k8s/manifests.yaml` +4. waits for readiness, opens a `kubectl port-forward` on `:18080` +5. mints a token via `POST /admin/tokens` +6. shells out to `./deploy_local.sh ` with + `DRD_CLOUD_API_HOST=http://drd-vpc-agent-mock.drdroid.svc.cluster.local:8080` + so the agent pods talk to the mock over cluster DNS +7. seeds `scenarios/default.json` +8. runs `e2e.py` and reports PASS/FAIL + +You don't bring a token — the mock issues one. Reset everything between +runs with `./mock_backend/deploy_kind.sh --reset-only`. + +### 2. Skip Kind, run agent directly via celery (fast feedback on code) + +```bash +# Prereq: redis on :6379, repo Python deps importable +./mock_backend/run_e2e.sh +``` + +Exit 0 = PASS, 1 = FAIL. Logs in `mock_backend/data/`. + +### 3. Manual three-terminal loop + +```bash +# terminal 1 — mock +./mock_backend/run_mock.sh +# (note the printed DRD_CLOUD_API_TOKEN / DRD_CLOUD_API_HOST) + +# terminal 2 — agent +DRD_CLOUD_API_TOKEN=... 
DRD_CLOUD_API_HOST=http://127.0.0.1:8080 \ + ./mock_backend/run_agent.sh + +# terminal 3 — push scenarios + validate +.venv/bin/python mock_backend/seed_scenarios.py --reset +.venv/bin/python mock_backend/e2e.py +``` + +## Admin API (test control) + +| Method | Path | Purpose | +|---|---|---| +| `POST` | `/admin/tokens` | mint a token (returns `{token, label}`) | +| `GET` | `/admin/tokens` | list tokens | +| `DELETE` | `/admin/tokens/{token}` | revoke a token | +| `POST` | `/admin/queues/connection-tests` | enqueue a connection test (or `{items:[…]}` batch) | +| `POST` | `/admin/queues/playbook-tasks` | enqueue a playbook task / `ASSET_REFRESH` | +| `GET` | `/admin/queues/{name}` | peek pending | +| `DELETE` | `/admin/queues/{name}` | clear queue | +| `GET` | `/admin/recorded` | list recording buckets | +| `GET` | `/admin/recorded/{bucket}` | list events in a bucket | +| `POST` | `/admin/reset?keep_tokens=true` | wipe queues + recordings | + +Recording buckets emitted by the proxy endpoints: +`ping_get`, `ping_post`, `register_connectors`, `connection_tests_poll`, +`connection_tests_results`, `metadata_register`, `playbook_tasks_poll`, +`playbook_tasks_results`, `unmatched`. + +## Scenarios + +Edit `scenarios/default.json`. Two arrays: + +- `connection_tests`: each item is `{connector_name, request_id?}`. The + agent looks up `connector_name` in `credentials/secrets.yaml`, runs a + real connection test, and posts the result back. +- `playbook_tasks`: each item is a `playbook_task_execution` dict. 
Two + task shapes are exercised by default: + + **Real kubectl tasks** (canonical proto3 JSON — `dict_to_proto` parses + this directly into `PlaybookTask`): + ```json + { + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get pods -n drdroid -o json"} + } + } + } + ``` + These run inside the agent pod via `KubectlApiProcessor` — no + connector configured in `secrets.yaml` is needed because the helm + chart sets `NATIVE_KUBERNETES_API_MODE=true` and `kubectl` is in the + agent's image. The agent's own RBAC (`drdroid-k8s-cluster-role`) + permits pods/services/events/namespaces. + + **`ASSET_REFRESH`** (intercepted before proto parsing — wrapper-form + `{"value": …}` here is intentional): + ```json + { + "task": { + "drd_proxy_agent": { + "type": "ASSET_REFRESH", + "asset_refresh": { + "connector_name": {"value": "native_k8s"}, + "connector_type": {"value": 47}, + "extractor_method": {"value": "extract_pods"} + } + } + } + } + ``` + `connector_type` is the protobuf `Source` enum int (KUBERNETES = 47). + Drop `extractor_method` to run every `extract_*` method. + +Underscore-prefixed keys (`_doc`, `$schema_note`) are ignored — handy for +inline documentation. + +### What `e2e.py` checks for kubectl tasks + +For each kubectl scenario it pulls the agent's reported output from the +recordings and asserts: + +1. The command's `output` is parseable JSON. +2. A stable, deployment-managed item is present (e.g. the mock-backend + pod must show up in `get pods -n drdroid`; the `drdroid` namespace + in `get namespaces`). +3. **Ground-truth match** — if `kubectl` is on the host PATH and points + at the same Kind cluster, the validator runs the same command on the + host and asserts the *set of names* matches what the agent reported. + Events are checked for "non-empty on both sides" only (events churn). + +## Notes + +- The mock holds queues in flat JSON files (`data/queues/*.json`); each + poll drains the file. 
Restart-safe. +- `data/` is gitignored. Tokens live there too — they don't survive a + hard reset (`POST /admin/reset?keep_tokens=false`). +- `scripts/start-celery-worker.sh` reads `CELERY_QUEUE`; we run a beat + + a `celery` worker + an `exec` worker. The asset_extraction queue is + only enabled when `NATIVE_KUBERNETES_API_MODE=true`. diff --git a/mock_backend/app.py b/mock_backend/app.py new file mode 100644 index 0000000..505438b --- /dev/null +++ b/mock_backend/app.py @@ -0,0 +1,316 @@ +"""FastAPI mock of the DrDroid cloud backend. + +Speaks the exact wire protocol the VPC agent expects — the agent has no +idea this isn't the real backend. Three concerns are layered on the same +FastAPI app: + +1. /connectors/proxy/* and /playbooks-engine/proxy/* — the real-shaped + endpoints. Auth via `Authorization: Bearer `. Every payload is + recorded. +2. /admin/* — test-control surface: issue tokens, enqueue scenarios + (connection tests, playbook tasks, asset_refresh), inspect / reset + recordings. +3. /health — orchestration probe. + +Run: `uvicorn app:app --host 0.0.0.0 --port 8080` (see run_mock.sh). 
+""" +from __future__ import annotations + +import os +from typing import Any + +from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request +from fastapi.responses import JSONResponse + +import storage + +app = FastAPI(title="DrDroid VPC-Agent Mock Backend", version="0.1.0") + + +# --- Auth ---------------------------------------------------------------- + +def _bearer(authorization: str | None) -> str | None: + if not authorization: + return None + parts = authorization.split(None, 1) + if len(parts) != 2 or parts[0].lower() != "bearer": + return None + return parts[1].strip() + + +def require_token(authorization: str | None = Header(default=None)) -> str: + token = _bearer(authorization) + if not token or not storage.is_valid_token(token): + raise HTTPException(status_code=401, detail="invalid or missing bearer token") + return token + + +async def _capture(request: Request) -> dict: + """Pull body + headers + query into a serializable dict for recording.""" + try: + body = await request.json() + except Exception: + try: + body = (await request.body()).decode("utf-8", errors="replace") + except Exception: + body = None + return { + "method": request.method, + "path": request.url.path, + "query": dict(request.query_params), + "headers": {k: v for k, v in request.headers.items() if k.lower() != "authorization"}, + "body": body, + } + + +# --- Health -------------------------------------------------------------- + +@app.get("/health") +def health() -> dict: + tokens = storage.list_tokens() + return { + "status": "ok", + "tokens_issued": len(tokens), + "pending": { + "connection_tests": len(storage.queue_peek("connection_tests")), + "playbook_tasks": len(storage.queue_peek("playbook_tasks")), + }, + "recorded_buckets": storage.list_buckets(), + } + + +# --- Agent: startup + heartbeat ----------------------------------------- + +@app.get("/connectors/proxy/ping") +async def ping_get( + request: Request, + commit_hash: str = Query(default=""), + token: 
str = Depends(require_token), +): + captured = await _capture(request) + storage.record("ping_get", {"token_suffix": token[-4:], **captured}) + return {"status": "ok", "commit_hash": commit_hash} + + +@app.post("/connectors/proxy/ping") +async def ping_post(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + storage.record("ping_post", {"token_suffix": token[-4:], **captured}) + return {"status": "ok"} + + +# --- Agent: connector registration -------------------------------------- + +@app.post("/connectors/proxy/register") +async def register_connectors( + request: Request, + commit_hash: str = Query(default=""), + token: str = Depends(require_token), +): + captured = await _capture(request) + storage.record( + "register_connectors", + {"token_suffix": token[-4:], "commit_hash": commit_hash, **captured}, + ) + return {"status": "ok"} + + +# --- Agent: connection-test poll/result --------------------------------- + +@app.post("/connectors/proxy/connector/connection/tests") +async def connection_tests_poll(request: Request, token: str = Depends(require_token)): + pending = storage.queue_take_all("connection_tests") + response_body = {"requests": pending} + captured = await _capture(request) + storage.record( + "connection_tests_poll", + {"token_suffix": token[-4:], "delivered": pending, **captured}, + ) + return response_body + + +@app.post("/connectors/proxy/connector/connection/results") +async def connection_tests_result(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + storage.record( + "connection_tests_results", + {"token_suffix": token[-4:], **captured}, + ) + return {"status": "ok"} + + +# --- Agent: asset metadata batches -------------------------------------- + +@app.post("/connectors/proxy/connector/metadata/register") +async def metadata_register(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + body = captured.get("body") 
if isinstance(captured.get("body"), dict) else {} + storage.record( + "metadata_register", + { + "token_suffix": token[-4:], + "connector": body.get("connector"), + "model_type": body.get("model_type"), + "refresh_id": body.get("refresh_id"), + "has_more": body.get("has_more"), + "asset_count": len(body.get("assets", []) or []), + **captured, + }, + ) + return {"status": "ok"} + + +# --- Agent: playbook task poll/result ----------------------------------- + +@app.post("/playbooks-engine/proxy/execution/tasks") +async def playbook_tasks_poll(request: Request, token: str = Depends(require_token)): + pending = storage.queue_take_all("playbook_tasks") + response_body = {"playbook_task_executions": pending} + captured = await _capture(request) + storage.record( + "playbook_tasks_poll", + {"token_suffix": token[-4:], "delivered": pending, **captured}, + ) + return response_body + + +@app.post("/playbooks-engine/proxy/execution/results") +async def playbook_tasks_result(request: Request, token: str = Depends(require_token)): + captured = await _capture(request) + body = captured.get("body") if isinstance(captured.get("body"), dict) else {} + logs = body.get("playbook_task_execution_logs", []) or [] + summarised_logs = [] + for log in logs: + result = log.get("result") or {} + summarised_logs.append( + { + "request_id": log.get("proxy_execution_request_id"), + "has_error": "error" in result, + "error": (result.get("error") or {}).get("value") if isinstance(result.get("error"), dict) else result.get("error"), + "result_keys": list(result.keys()), + } + ) + storage.record( + "playbook_tasks_results", + { + "token_suffix": token[-4:], + "log_count": len(logs), + "summary": summarised_logs, + **captured, + }, + ) + return {"status": "ok"} + + +# --- Admin (test-control) ----------------------------------------------- + +@app.post("/admin/tokens") +def admin_issue_token(payload: dict | None = None) -> dict: + label = (payload or {}).get("label") + token = 
storage.issue_token(label) + return {"token": token, "label": label or "agent"} + + +@app.get("/admin/tokens") +def admin_list_tokens() -> dict: + return {"tokens": storage.list_tokens()} + + +@app.delete("/admin/tokens/{token}") +def admin_revoke_token(token: str) -> dict: + return {"revoked": storage.revoke_token(token)} + + +@app.post("/admin/queues/connection-tests") +def admin_enqueue_connection_test(payload: dict) -> dict: + """Body: {"connector_name": "...", "request_id": "optional-uuid"}. + + Or {"items": [{...}, ...]} for batch. + """ + items = payload.get("items") + if items is None: + items = [payload] + normalised = [] + import uuid as _uuid + for it in items: + if "connector_name" not in it: + raise HTTPException(400, "connector_name required") + normalised.append({ + "request_id": it.get("request_id") or str(_uuid.uuid4()), + "connector_name": it["connector_name"], + }) + storage.queue_extend("connection_tests", normalised) + return {"enqueued": normalised} + + +@app.post("/admin/queues/playbook-tasks") +def admin_enqueue_playbook_task(payload: dict) -> dict: + """Body: a single playbook_task_execution dict, or {"items": [...]}. 
+ + Asset-refresh shape: + { + "task": { + "drd_proxy_agent": { + "type": "ASSET_REFRESH", + "asset_refresh": { + "connector_name": {"value": "my_k8s"}, + "connector_type": {"value": 47}, + "extractor_method": {"value": "extract_pods"} + } + } + } + } + """ + items = payload.get("items") + if items is None: + items = [payload] + import time as _time, uuid as _uuid + normalised = [] + for it in items: + normalised.append({ + "proxy_execution_request_id": it.get("proxy_execution_request_id") or str(_uuid.uuid4()), + "task": it.get("task", {}), + "time_range": it.get("time_range") or { + "time_geq": int(_time.time()) - 300, + "time_lt": int(_time.time()), + }, + "execution_global_variable_set": it.get("execution_global_variable_set", {}), + }) + storage.queue_extend("playbook_tasks", normalised) + return {"enqueued": normalised} + + +@app.get("/admin/queues/{name}") +def admin_queue_peek(name: str) -> dict: + return {"name": name, "items": storage.queue_peek(name)} + + +@app.delete("/admin/queues/{name}") +def admin_queue_clear(name: str) -> dict: + storage.queue_clear(name) + return {"cleared": name} + + +@app.get("/admin/recorded") +def admin_recorded_buckets() -> dict: + return {"buckets": storage.list_buckets()} + + +@app.get("/admin/recorded/{bucket}") +def admin_recorded(bucket: str) -> JSONResponse: + return JSONResponse({"bucket": bucket, "events": storage.read_records(bucket)}) + + +@app.post("/admin/reset") +def admin_reset(keep_tokens: bool = Query(default=True)) -> dict: + storage.reset_all(keep_tokens=keep_tokens) + return {"reset": True, "tokens_kept": keep_tokens} + + +# --- Catch-all to surface unexpected agent calls ------------------------ + +@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"]) +async def catchall(full_path: str, request: Request) -> JSONResponse: + captured = await _capture(request) + storage.record("unmatched", {"path": "/" + full_path, **captured}) + return JSONResponse({"error": "unmatched path", 
"path": "/" + full_path}, status_code=404) diff --git a/mock_backend/deploy_kind.sh b/mock_backend/deploy_kind.sh new file mode 100755 index 0000000..8b6dcea --- /dev/null +++ b/mock_backend/deploy_kind.sh @@ -0,0 +1,282 @@ +#!/usr/bin/env bash +# Full Kind-based test loop: agent runs from its real Docker image, mock +# backend runs as a sibling pod in the same namespace. +# +# By default it reuses the agent image when it already exists locally — +# the alpine first-build is ~20min, but subsequent test iterations are +# ~30s. Pass --rebuild-agent (or --rebuild) when the agent code changed. +# +# Usage: +# ./mock_backend/deploy_kind.sh # build (or reuse) + deploy + seed + validate +# ./mock_backend/deploy_kind.sh --reuse-image # force-skip both rebuilds (fastest) +# ./mock_backend/deploy_kind.sh --rebuild # force a full rebuild of both images +# ./mock_backend/deploy_kind.sh --rebuild-agent # rebuild agent only (after code changes) +# ./mock_backend/deploy_kind.sh --rebuild-mock # rebuild mock only +# ./mock_backend/deploy_kind.sh --no-validate # skip e2e.py +# ./mock_backend/deploy_kind.sh --reset-only # tear down mock + agent, leave cluster +# +# Token + host are auto-wired — you don't bring them. The script prints +# everything you need (token, URLs, kubectl tail commands) on success. 
+set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(dirname "$SCRIPT_DIR")" + +KIND_CLUSTER="${KIND_CLUSTER_NAME:-drd-local}" +NS="drdroid" +MOCK_IMAGE="drd-vpc-agent-mock:local-latest" +MOCK_SVC_HOST="http://drd-vpc-agent-mock.${NS}.svc.cluster.local:8080" +PORT_FWD_LOCAL_PORT="${MOCK_PORT_FORWARD:-18080}" + +VALIDATE=1 +RESET_ONLY=0 +REUSE_AGENT_IMAGE=auto # auto | always | never +REUSE_MOCK_IMAGE=auto +SCENARIOS_FILE="${SCENARIOS_FILE:-$SCRIPT_DIR/scenarios/default.json}" +E2E_TIMEOUT="${E2E_TIMEOUT:-180}" + +while [[ $# -gt 0 ]]; do + case "$1" in + --no-validate) VALIDATE=0; shift ;; + --reset-only) RESET_ONLY=1; shift ;; + --reuse-image) REUSE_AGENT_IMAGE=always; REUSE_MOCK_IMAGE=always; shift ;; + --rebuild) REUSE_AGENT_IMAGE=never; REUSE_MOCK_IMAGE=never; shift ;; + --rebuild-agent) REUSE_AGENT_IMAGE=never; shift ;; + --rebuild-mock) REUSE_MOCK_IMAGE=never; shift ;; + --scenarios) SCENARIOS_FILE="$2"; shift 2 ;; + --timeout) E2E_TIMEOUT="$2"; shift 2 ;; + -h|--help) sed -n '2,24p' "$0"; exit 0 ;; + *) echo "unknown arg: $1" >&2; exit 2 ;; + esac +done + +log() { echo "[deploy_kind] $*" >&2; } + +# Reset path ------------------------------------------------------------- +if [[ "$RESET_ONLY" -eq 1 ]]; then + log "uninstalling agent helm release" + helm status drd-vpc-agent -n "$NS" >/dev/null 2>&1 && helm uninstall drd-vpc-agent -n "$NS" || true + log "deleting mock manifest" + kubectl delete -f "$SCRIPT_DIR/k8s/manifests.yaml" --ignore-not-found + log "done" + exit 0 +fi + +# 1. cluster ------------------------------------------------------------- +if ! kind get clusters 2>/dev/null | grep -q "^${KIND_CLUSTER}$"; then + log "creating Kind cluster '${KIND_CLUSTER}'" + kind create cluster --name "${KIND_CLUSTER}" +fi +kubectl config use-context "kind-${KIND_CLUSTER}" >/dev/null +kubectl create namespace "$NS" --dry-run=client -o yaml | kubectl apply -f - >/dev/null + +# 2. 
build + load mock image -------------------------------------------- +should_rebuild_mock=1 +if [[ "$REUSE_MOCK_IMAGE" == "always" ]]; then + should_rebuild_mock=0 +elif [[ "$REUSE_MOCK_IMAGE" == "auto" ]] && docker image inspect "$MOCK_IMAGE" >/dev/null 2>&1; then + # Auto-reuse only if no source files are newer than the image. + IMAGE_TS=$(docker image inspect -f '{{.Created}}' "$MOCK_IMAGE" 2>/dev/null | xargs -I{} date -j -f '%Y-%m-%dT%H:%M:%S' "{}" +%s 2>/dev/null || echo 0) + NEWEST_SRC=$(find "$SCRIPT_DIR" -maxdepth 2 \( -name '*.py' -o -name 'requirements.txt' -o -name 'Dockerfile' \) -not -path '*/.venv/*' -not -path '*/data/*' -print0 2>/dev/null | xargs -0 stat -f '%m' 2>/dev/null | sort -n | tail -1) + if [[ -n "$IMAGE_TS" && -n "$NEWEST_SRC" && "$NEWEST_SRC" -le "$IMAGE_TS" ]]; then + should_rebuild_mock=0 + fi +fi + +if [[ "$should_rebuild_mock" -eq 1 ]]; then + log "building mock image: $MOCK_IMAGE" + docker build -t "$MOCK_IMAGE" "$SCRIPT_DIR" >/dev/null +else + log "reusing existing mock image: $MOCK_IMAGE" +fi + +log "kind load $MOCK_IMAGE (idempotent)" +kind load docker-image "$MOCK_IMAGE" --name "${KIND_CLUSTER}" >/dev/null 2>&1 || true + +# 3. apply mock manifests ------------------------------------------------ +log "applying mock manifests" +kubectl apply -f "$SCRIPT_DIR/k8s/manifests.yaml" >/dev/null + +# Force a pod recycle so a new image is picked up on rebuild. +kubectl -n "$NS" rollout restart deployment/drd-vpc-agent-mock >/dev/null + +# 4. wait for readiness -------------------------------------------------- +log "waiting for mock pod to be ready" +kubectl -n "$NS" rollout status deployment/drd-vpc-agent-mock --timeout=120s >/dev/null + +# 5. port-forward briefly + mint a token -------------------------------- +log "minting token via port-forward on :${PORT_FWD_LOCAL_PORT}" +kubectl -n "$NS" port-forward svc/drd-vpc-agent-mock "${PORT_FWD_LOCAL_PORT}:8080" >/tmp/drd-mock-pf.log 2>&1 & +PF_PID=$! 
+trap 'kill "$PF_PID" 2>/dev/null || true' EXIT INT TERM + +# Wait until the forward is live. +for _ in $(seq 1 50); do + if curl -sf "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/health" >/dev/null; then break; fi + sleep 0.2 +done +if ! curl -sf "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/health" >/dev/null; then + log "port-forward never came up — see /tmp/drd-mock-pf.log" + exit 1 +fi + +# Wipe queues + recordings between runs but keep tokens (so a long-running +# agent in the cluster wouldn't lose its credential — defensive). +curl -sf -X POST "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/admin/reset?keep_tokens=true" >/dev/null + +TOKEN=$(curl -sf -X POST "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/admin/tokens" \ + -H 'content-type: application/json' -d '{"label":"kind-agent"}' \ + | python3 -c 'import json,sys;print(json.load(sys.stdin)["token"])') +log "token: ${TOKEN:0:20}…" + +# 5b. ensure credentials-secret exists ---------------------------------- +# The helm chart mounts `credentials-secret` into the celery pods, but +# its manifest at helm/credentials-secret.yaml is NOT under templates/ +# so `helm install` never creates it. Without this secret kubelet hangs +# the pods in PodInitializing forever (FailedMount). +# +# Build it from credentials/secrets.yaml on the host. Empty file is fine +# — the helm chart just needs the volume to exist. Idempotent. +log "ensuring credentials-secret exists in $NS" +SECRETS_FILE="$REPO_DIR/credentials/secrets.yaml" +if [[ ! -f "$SECRETS_FILE" ]]; then + # Create an empty file rather than failing — most local test setups + # don't need any real connector creds since NATIVE_KUBERNETES_API_MODE + # is on. + log " no credentials/secrets.yaml found; creating empty placeholder" + : > "$SECRETS_FILE" +fi +kubectl -n "$NS" create secret generic credentials-secret \ + --from-file=secrets.yaml="$SECRETS_FILE" \ + --dry-run=client -o yaml | kubectl apply -f - >/dev/null + +# 6. 
build (or reuse) the agent image + deploy via helm ----------------- +# Self-contained — does not depend on the user's local deploy_local.sh +# (which is gitignored and per-machine). Logic: +# a. compute image tag from git commit +# b. decide skip-build (auto: skip when image already exists; --rebuild-agent forces) +# c. docker build + kind load (when not skipping) +# d. helm uninstall any prior release +# e. write helm/values.local.yaml with token + host + image tag +# f. helm upgrade --install +COMMIT_HASH=$(cd "$REPO_DIR" && git rev-parse --short HEAD 2>/dev/null || echo "local-dev") +AGENT_IMAGE_NAME="drd-vpc-agent" +AGENT_IMAGE_TAG="local-${COMMIT_HASH}" +AGENT_FULL_IMAGE="${AGENT_IMAGE_NAME}:${AGENT_IMAGE_TAG}" + +SKIP_AGENT_BUILD=0 +case "$REUSE_AGENT_IMAGE" in + always) SKIP_AGENT_BUILD=1 ;; + auto) + if docker image inspect "$AGENT_FULL_IMAGE" >/dev/null 2>&1; then + log "auto-reuse: $AGENT_FULL_IMAGE already exists; skipping agent build" + log " (pass --rebuild-agent to force rebuild after agent code changes)" + SKIP_AGENT_BUILD=1 + fi ;; + never) SKIP_AGENT_BUILD=0 ;; +esac + +cd "$REPO_DIR" + +if [[ "$SKIP_AGENT_BUILD" -eq 0 ]]; then + log "building agent image: $AGENT_FULL_IMAGE (this can take ~20min on first alpine build)" + docker build \ + --build-arg COMMIT_HASH="$COMMIT_HASH" \ + -t "$AGENT_FULL_IMAGE" \ + -t "${AGENT_IMAGE_NAME}:local-latest" \ + "$REPO_DIR" + log "kind load $AGENT_FULL_IMAGE" + kind load docker-image "$AGENT_FULL_IMAGE" --name "${KIND_CLUSTER}" >/dev/null +else + # Image exists locally; ensure it's also loaded into kind (idempotent NOP if already there). + kind load docker-image "$AGENT_FULL_IMAGE" --name "${KIND_CLUSTER}" >/dev/null 2>&1 || true +fi + +# Uninstall any previous helm release so we get a clean roll. 
+if helm status drd-vpc-agent -n "$NS" >/dev/null 2>&1; then + log "uninstalling previous helm release" + helm uninstall drd-vpc-agent -n "$NS" >/dev/null +fi + +# Write a values override file pointing helm at the mock + the local image. +VALUES_LOCAL="$REPO_DIR/helm/values.local.yaml" +{ + echo "global:" + echo " DRD_CLOUD_API_TOKEN: \"$TOKEN\"" + echo " DRD_CLOUD_API_HOST: \"$MOCK_SVC_HOST\"" + cat < "$VALUES_LOCAL" + +# The chart's configmap.yaml is at helm/ root (not under templates/), so +# helm install doesn't apply it. Same with credentials-secret (already +# handled above). +log "applying helm/configmap.yaml" +kubectl apply -f "$REPO_DIR/helm/configmap.yaml" -n "$NS" >/dev/null + +log "helm upgrade --install drd-vpc-agent" +helm upgrade --install drd-vpc-agent "$REPO_DIR/helm" \ + -n "$NS" \ + -f "$REPO_DIR/helm/values.yaml" \ + -f "$VALUES_LOCAL" + +# Give the agent a few seconds to call /connectors/proxy/ping etc. +sleep 5 + +# 7. seed scenarios ------------------------------------------------------ +log "seeding scenarios from $SCENARIOS_FILE" +# Use the venv created by run_mock.sh / run_e2e.sh if it exists, otherwise +# fall back to system python — both will work, only httpx is needed. 
+if [[ -x "$SCRIPT_DIR/.venv/bin/python" ]]; then + PY="$SCRIPT_DIR/.venv/bin/python" +else + PY="python3" + "$PY" -c 'import httpx' 2>/dev/null || "$PY" -m pip install --quiet httpx +fi +"$PY" "$SCRIPT_DIR/seed_scenarios.py" \ + --host "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}" \ + --file "$SCENARIOS_FILE" + +echo +echo "==============================================================" +echo "Mock backend : $MOCK_SVC_HOST (in-cluster)" +echo "Mock admin : http://127.0.0.1:${PORT_FWD_LOCAL_PORT} (port-forward, while this script lives)" +echo "Token : $TOKEN" +echo "Agent pods : kubectl -n ${NS} get pods" +echo "Agent logs : kubectl -n ${NS} logs -l app=celery-worker -f" +echo "Mock logs : kubectl -n ${NS} logs deploy/drd-vpc-agent-mock -f" +echo "Inspect calls : curl -s http://127.0.0.1:${PORT_FWD_LOCAL_PORT}/admin/recorded | jq" +echo "==============================================================" +echo + +if [[ "$VALIDATE" -eq 1 ]]; then + log "running e2e validator (timeout ${E2E_TIMEOUT}s)" + set +e + "$PY" "$SCRIPT_DIR/e2e.py" \ + --host "http://127.0.0.1:${PORT_FWD_LOCAL_PORT}" \ + --file "$SCENARIOS_FILE" \ + --timeout "$E2E_TIMEOUT" + RC=$? + set -e + if [[ $RC -eq 0 ]]; then + log "PASS" + else + log "FAIL: see logs above. Pods + mock are still running for inspection." + fi + exit $RC +else + log "skipping validation (--no-validate). Port-forward holds while this script runs; Ctrl-C to release." + wait "$PF_PID" +fi diff --git a/mock_backend/e2e.py b/mock_backend/e2e.py new file mode 100644 index 0000000..7a3ad2e --- /dev/null +++ b/mock_backend/e2e.py @@ -0,0 +1,414 @@ +"""Lightweight end-to-end validator. + +Assumes the mock is running and an agent has been pointed at it. After +seeding scenarios this script polls the recordings and asserts the agent +exhibited the expected behaviours: a startup ping, a periodic heartbeat, drained +queues, and results reported back for everything we enqueued. + +Designed to be run after `run_e2e.sh` brings everything up. 
Returns +non-zero exit on failure with a human-readable diff. +""" +from __future__ import annotations + +import argparse +import json +import shutil +import subprocess +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path + +import httpx + + +@dataclass +class CheckResult: + name: str + ok: bool + detail: str = "" + + def render(self) -> str: + marker = "PASS" if self.ok else "FAIL" + return f" [{marker}] {self.name}" + (f" — {self.detail}" if self.detail else "") + + +@dataclass +class Report: + checks: list[CheckResult] = field(default_factory=list) + + def add(self, name: str, ok: bool, detail: str = "") -> None: + self.checks.append(CheckResult(name, ok, detail)) + + @property + def ok(self) -> bool: + return all(c.ok for c in self.checks) + + def render(self) -> str: + lines = [c.render() for c in self.checks] + lines.append("") + lines.append(f" TOTAL: {sum(1 for c in self.checks if c.ok)}/{len(self.checks)} passed") + return "\n".join(lines) + + +def _get(client: httpx.Client, path: str) -> dict: + r = client.get(path) + r.raise_for_status() + return r.json() + + +def _events(client: httpx.Client, bucket: str) -> list[dict]: + return _get(client, f"/admin/recorded/{bucket}").get("events", []) + + +def wait_for(client: httpx.Client, bucket: str, predicate, timeout: float, label: str) -> tuple[bool, list[dict]]: + deadline = time.time() + timeout + last: list[dict] = [] + while time.time() < deadline: + last = _events(client, bucket) + if predicate(last): + return True, last + time.sleep(1.0) + return False, last + + +def _kubectl_command_from_scenario(scenario: dict) -> str | None: + """Return the kubectl command string from a scenario, or None if it's not a Kubectl COMMAND task.""" + task = scenario.get("task") or {} + if task.get("source") != "KUBERNETES": + return None + k = task.get("kubernetes") or {} + if k.get("type") != "COMMAND": + return None + cmd = (k.get("command") or {}).get("command") + return cmd if 
isinstance(cmd, str) else None + + +def _find_kubectl_output(results_events: list[dict], command: str) -> str | None: + """Walk recorded playbook_tasks_results events and return the agent's + output string for the given command (None if not found).""" + for ev in results_events: + body = ev.get("body") or {} + for log in body.get("playbook_task_execution_logs", []) or []: + result = log.get("result") or {} + bash = result.get("bash_command_output") or {} + for co in bash.get("command_outputs", []) or []: + # MessageToDict gives bare strings for StringValue; older + # encodings might give {"value": "..."} — handle both. + cmd_field = co.get("command") + if isinstance(cmd_field, dict): + cmd_field = cmd_field.get("value") + if cmd_field == command or (cmd_field or "").strip() == command.strip(): + out = co.get("output") + if isinstance(out, dict): + out = out.get("value") + return out + return None + + +def _host_kubectl(args: list[str]) -> tuple[bool, str]: + """Run kubectl on the host. Returns (ok, stdout-or-error).""" + if not shutil.which("kubectl"): + return False, "host kubectl not in PATH" + try: + proc = subprocess.run(["kubectl", *args], capture_output=True, text=True, timeout=15) + except subprocess.TimeoutExpired: + return False, "host kubectl timed out" + if proc.returncode != 0: + return False, f"exit {proc.returncode}: {proc.stderr.strip()[:200]}" + return True, proc.stdout + + +def _names_from_list(blob: str) -> set[str]: + """Pull `metadata.name` out of a `kubectl get ... 
-o json` listing.""" + try: + data = json.loads(blob) + except json.JSONDecodeError: + return set() + return {(it.get("metadata") or {}).get("name") for it in data.get("items", []) if (it.get("metadata") or {}).get("name")} + + +def _check_kubectl_scenarios(client: httpx.Client, expected_pt: list[dict], report: Report, timeout: float) -> None: + """For each kubectl scenario: assert agent reported a parseable JSON + output, the expected stable item is present, and the set of names + matches the host's `kubectl` snapshot when available.""" + kubectl_scenarios = [(s, _kubectl_command_from_scenario(s)) for s in expected_pt] + kubectl_scenarios = [(s, c) for s, c in kubectl_scenarios if c] + if not kubectl_scenarios: + return + + # Wait for results to be in. The earlier "results posted back" check + # already gated on this, but we re-pull fresh recordings here. + deadline = time.time() + min(timeout, 60) + results: list[dict] = [] + while time.time() < deadline: + results = _events(client, "playbook_tasks_results") + if any(_find_kubectl_output(results, c) is not None for _, c in kubectl_scenarios): + break + time.sleep(1.0) + + host_kubectl_available = shutil.which("kubectl") is not None + if not host_kubectl_available: + report.add( + "host kubectl available for ground-truth comparison", + False, + "kubectl not in PATH — agent outputs will be self-checked only", + ) + + for scenario, command in kubectl_scenarios: + output = _find_kubectl_output(results, command) + if output is None: + report.add(f"agent reported output for: {command}", False, "no matching command_output in recordings") + continue + + # Parseability — every command in our default set asks for `-o json`. + try: + parsed = json.loads(output) + except json.JSONDecodeError as e: + report.add(f"agent output is valid JSON: {command}", False, f"json error: {e}") + continue + report.add(f"agent output is valid JSON: {command}", True) + + # Self-checks (deployment-stable invariants we set up ourselves). 
+ if "get pods -n drdroid" in command: + agent_names = {(it.get("metadata") or {}).get("name") for it in parsed.get("items", [])} + agent_names = {n for n in agent_names if n} + mock_pod = any(n.startswith("drd-vpc-agent-mock") for n in agent_names) + report.add("mock backend pod visible in agent's `get pods -n drdroid`", mock_pod, + f"saw {len(agent_names)} pods: {sorted(agent_names)[:5]}…") + celery_pod = any(n.startswith("drd-vpc-agent-celery") or "celery" in n for n in agent_names) + report.add("agent celery pod visible in `get pods -n drdroid`", celery_pod, + f"pod set: {sorted(agent_names)}") + elif "get svc -n drdroid" in command: + agent_names = {(it.get("metadata") or {}).get("name") for it in parsed.get("items", [])} + agent_names = {n for n in agent_names if n} + report.add("mock backend service visible in agent's `get svc -n drdroid`", + "drd-vpc-agent-mock" in agent_names, + f"saw services: {sorted(agent_names)}") + elif "get namespaces" in command: + agent_names = {(it.get("metadata") or {}).get("name") for it in parsed.get("items", [])} + agent_names = {n for n in agent_names if n} + report.add("`drdroid` namespace visible in agent's `get namespaces`", + "drdroid" in agent_names, + f"saw namespaces: {sorted(agent_names)}") + elif "get events -n drdroid" in command: + event_count = len(parsed.get("items", [])) + report.add("agent's `get events -n drdroid` returned events", + event_count > 0, + f"saw {event_count} events") + + # Host-side ground truth (skipped if host has no kubectl context for this cluster). 
+ if not host_kubectl_available: + continue + + agent_set = _names_from_list(output) + host_args: list[str] | None = None + if "get pods -n drdroid" in command: + host_args = ["get", "pods", "-n", "drdroid", "-o", "json"] + elif "get svc -n drdroid" in command: + host_args = ["get", "svc", "-n", "drdroid", "-o", "json"] + elif "get namespaces" in command: + host_args = ["get", "namespaces", "-o", "json"] + elif "get events -n drdroid" in command: + host_args = ["get", "events", "-n", "drdroid", "-o", "json"] + if host_args is None: + continue + + ok, host_blob = _host_kubectl(host_args) + if not ok: + report.add(f"host ground-truth: {' '.join(host_args)}", False, host_blob) + continue + host_set = _names_from_list(host_blob) + + # Quiet clusters should agree on names. Allow tiny drift for events + # (the only churn-y resource) — for that one, just ensure both sides + # are non-empty. + if "events" in host_args: + ok = bool(agent_set or len(json.loads(output).get("items", []))) and bool(host_set or len(json.loads(host_blob).get("items", []))) + report.add(f"host vs agent both report events for `{' '.join(host_args)}`", ok) + else: + missing_in_agent = host_set - agent_set + extra_in_agent = agent_set - host_set + ok = not missing_in_agent and not extra_in_agent + detail = f"agent={sorted(agent_set)} host={sorted(host_set)}" + if not ok: + detail = f"missing_in_agent={sorted(missing_in_agent)} extra_in_agent={sorted(extra_in_agent)} | {detail}" + report.add(f"host kubectl set matches agent's: `{' '.join(host_args)}`", ok, detail) + + +def _configured_connector_names() -> set[str]: + """Mirror seed_scenarios.loaded_connector_names so e2e.py and the + seeder agree on which connection-tests will actually be queued.""" + secrets_path = Path(__file__).resolve().parent.parent / "credentials" / "secrets.yaml" + if not secrets_path.exists(): + return set() + try: + import yaml # type: ignore + except ImportError: + return set() + try: + data = 
yaml.safe_load(secrets_path.read_text()) or {} + except yaml.YAMLError: + return set() + return set(data.keys()) if isinstance(data, dict) else set() + + +def run(host: str, scenarios_path: Path, timeout: float) -> Report: + report = Report() + scenarios = json.loads(scenarios_path.read_text()) + expected_ct = [c for c in scenarios.get("connection_tests", []) if any(not k.startswith("_") and not k.startswith("$") for k in c)] + # Match seed_scenarios.py: skip connection-tests whose connector + # isn't configured locally — the agent silently drops them. + configured = _configured_connector_names() + expected_ct = [c for c in expected_ct if c.get("connector_name") in configured] + expected_pt = [p for p in scenarios.get("playbook_tasks", []) if any(not k.startswith("_") and not k.startswith("$") for k in p)] + + with httpx.Client(base_url=host, timeout=10.0) as client: + # 1. startup ping. + ok, evts = wait_for(client, "ping_get", lambda e: len(e) >= 1, timeout, "startup ping") + report.add( + "agent issued startup ping (GET /connectors/proxy/ping)", + ok, + f"saw {len(evts)} ping_get events", + ) + + # 2. periodic heartbeat — POST /connectors/proxy/ping (every 50s in + # default schedule, so allow a generous window). + ok, evts = wait_for(client, "ping_post", lambda e: len(e) >= 1, max(timeout, 70.0), "heartbeat") + report.add( + "agent sent periodic heartbeat (POST /connectors/proxy/ping)", + ok, + f"saw {len(evts)} ping_post events", + ) + + # 3. connection-test loop — agent polled and reported. 
+ ok, polls = wait_for(client, "connection_tests_poll", lambda e: len(e) >= 1, timeout, "ct poll") + report.add( + "agent polled connection_tests endpoint", + ok, + f"saw {len(polls)} polls", + ) + + if expected_ct: + expected_request_ids = set() + ct_drained = False + ct_deadline = time.time() + timeout + while time.time() < ct_deadline: + polls = _events(client, "connection_tests_poll") + for ev in polls: + for delivered in ev.get("delivered") or []: + if delivered.get("request_id"): + expected_request_ids.add(delivered["request_id"]) + # Wait for as many request_ids to be delivered as we enqueued. + if len(expected_request_ids) >= len(expected_ct): + ct_drained = True + break + time.sleep(1.0) + report.add( + f"connection_tests drained ({len(expected_ct)} enqueued)", + ct_drained, + f"agent received {len(expected_request_ids)} request_ids", + ) + + results = _events(client, "connection_tests_results") + seen_results = set() + for ev in results: + body = ev.get("body") or {} + for r in body.get("results", []) or []: + if r.get("request_id"): + seen_results.add(r["request_id"]) + report.add( + "agent posted connection_tests results back", + expected_request_ids.issubset(seen_results), + f"reported {len(seen_results & expected_request_ids)}/{len(expected_request_ids)} request_ids", + ) + + # 4. playbook-task loop. 
+ ok, polls = wait_for(client, "playbook_tasks_poll", lambda e: len(e) >= 1, timeout, "pt poll") + report.add( + "agent polled playbook_tasks endpoint", + ok, + f"saw {len(polls)} polls", + ) + + if expected_pt: + seen_request_ids: set[str] = set() + seen_deadline = time.time() + timeout + while time.time() < seen_deadline: + polls = _events(client, "playbook_tasks_poll") + for ev in polls: + for delivered in ev.get("delivered") or []: + if delivered.get("proxy_execution_request_id"): + seen_request_ids.add(delivered["proxy_execution_request_id"]) + if len(seen_request_ids) >= len(expected_pt): + break + time.sleep(1.0) + report.add( + f"playbook_tasks drained ({len(expected_pt)} enqueued)", + len(seen_request_ids) >= len(expected_pt), + f"agent received {len(seen_request_ids)} request_ids", + ) + + # Result endpoint should fire once per task. + result_ids: set[str] = set() + errored: set[str] = set()  # a set: every poll re-reads all events, so a list would collect duplicates + res_deadline = time.time() + timeout + while time.time() < res_deadline: + for ev in _events(client, "playbook_tasks_results"): + for s in ev.get("summary") or []: + if s.get("request_id"): + result_ids.add(s["request_id"]) + if s.get("has_error"): + errored.add(f"{s.get('request_id')}: {s.get('error')}") + if seen_request_ids.issubset(result_ids): + break + time.sleep(1.0) + report.add( + "agent posted playbook_tasks results back", + seen_request_ids.issubset(result_ids), + f"reported {len(seen_request_ids & result_ids)}/{len(seen_request_ids)} request_ids", + ) + if errored: + report.add( + "playbook tasks executed without errors", + False, + "errors: " + "; ".join(sorted(errored)[:3]) + (f" (+{len(errored)-3})" if len(errored) > 3 else ""), + ) + + # Asset refresh tasks should drive metadata/register batches. 
+ has_asset_refresh = any( + (p.get("task") or {}).get("drd_proxy_agent", {}).get("type") == "ASSET_REFRESH" + for p in expected_pt + ) + if has_asset_refresh: + metadata_events = _events(client, "metadata_register") + report.add( + "ASSET_REFRESH produced metadata register batches", + len(metadata_events) >= 1, + f"saw {len(metadata_events)} batches", + ) + + # Real kubectl tasks: assert the agent's reported output is + # parseable, contains expected stable items, and matches a + # host-side `kubectl` snapshot when available. + _check_kubectl_scenarios(client, expected_pt, report, timeout) + + return report + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--host", default="http://localhost:8080") + ap.add_argument("--file", default=str(Path(__file__).parent / "scenarios/default.json")) + ap.add_argument("--timeout", type=float, default=90.0, + help="seconds to wait for each phase before failing") + args = ap.parse_args() + + print(f"[e2e] mock={args.host} scenarios={args.file} timeout={args.timeout}s", file=sys.stderr) + report = run(args.host, Path(args.file), args.timeout) + print(report.render()) + return 0 if report.ok else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/mock_backend/k8s/manifests.yaml b/mock_backend/k8s/manifests.yaml new file mode 100644 index 0000000..c4af4f0 --- /dev/null +++ b/mock_backend/k8s/manifests.yaml @@ -0,0 +1,70 @@ +# Mock backend deployed inside the drdroid namespace alongside the agent. +# Agent pods reach it via in-cluster DNS at: +# http://drd-vpc-agent-mock.drdroid.svc.cluster.local:8080 +# The host reaches its admin API by `kubectl port-forward`. 
+--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: drd-vpc-agent-mock + namespace: drdroid + labels: + app: drd-vpc-agent-mock +spec: + replicas: 1 + selector: + matchLabels: + app: drd-vpc-agent-mock + template: + metadata: + labels: + app: drd-vpc-agent-mock + spec: + containers: + - name: mock + image: drd-vpc-agent-mock:local-latest + imagePullPolicy: Never + ports: + - name: http + containerPort: 8080 + readinessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 1 + periodSeconds: 2 + livenessProbe: + httpGet: + path: /health + port: http + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + requests: + cpu: "50m" + memory: "64Mi" + limits: + cpu: "500m" + memory: "256Mi" + volumeMounts: + - name: data + mountPath: /app/data + volumes: + - name: data + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: drd-vpc-agent-mock + namespace: drdroid + labels: + app: drd-vpc-agent-mock +spec: + type: ClusterIP + selector: + app: drd-vpc-agent-mock + ports: + - name: http + port: 8080 + targetPort: http diff --git a/mock_backend/requirements.txt b/mock_backend/requirements.txt new file mode 100644 index 0000000..7082f16 --- /dev/null +++ b/mock_backend/requirements.txt @@ -0,0 +1,4 @@ +fastapi>=0.110 +uvicorn[standard]>=0.27 +pyyaml>=6.0 +httpx>=0.27 diff --git a/mock_backend/run_agent.sh b/mock_backend/run_agent.sh new file mode 100755 index 0000000..328bf1c --- /dev/null +++ b/mock_backend/run_agent.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Boot the VPC agent (celery beat + worker + exec-worker) pointed at the +# mock backend. Token + host are required — get them from run_mock.sh. +# +# DRD_CLOUD_API_TOKEN=... DRD_CLOUD_API_HOST=http://127.0.0.1:8080 ./run_agent.sh +# +# This expects: redis on $REDIS_URL (default localhost:6379), the agent's +# Python deps installed in the active environment, and credentials/secrets.yaml +# populated with at least the connectors referenced in your scenarios. 
+ +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(dirname "$SCRIPT_DIR")" + +: "${DRD_CLOUD_API_TOKEN:?set DRD_CLOUD_API_TOKEN — get one from run_mock.sh}" +: "${DRD_CLOUD_API_HOST:=http://127.0.0.1:8080}" +: "${CELERY_BROKER_URL:=redis://localhost:6379/0}" +: "${CELERY_RESULT_BACKEND:=redis://localhost:6379/0}" +: "${REDIS_URL:=redis://localhost:6379/0}" +: "${VPC_AGENT_COMMIT_HASH:=mock-test}" + +export DRD_CLOUD_API_TOKEN DRD_CLOUD_API_HOST CELERY_BROKER_URL CELERY_RESULT_BACKEND REDIS_URL VPC_AGENT_COMMIT_HASH + +cd "$REPO_DIR" + +LOG_DIR="$SCRIPT_DIR/data/agent-logs" +mkdir -p "$LOG_DIR" + +echo "[run_agent] migrating sqlite…" >&2 +python manage.py migrate >/dev/null + +echo "[run_agent] starting celery beat" >&2 +celery -A agent beat -l INFO --pidfile="$LOG_DIR/beat.pid" >"$LOG_DIR/beat.log" 2>&1 & +BEAT_PID=$! + +echo "[run_agent] starting celery worker (default queue)" >&2 +celery -A agent worker -l INFO -Q celery --concurrency=2 \ + --pidfile="$LOG_DIR/worker.pid" >"$LOG_DIR/worker.log" 2>&1 & +WORKER_PID=$! + +echo "[run_agent] starting celery worker (exec queue)" >&2 +CELERY_QUEUE=exec celery -A agent worker -l INFO -Q exec --concurrency=2 \ + --pidfile="$LOG_DIR/worker-exec.pid" >"$LOG_DIR/worker-exec.log" 2>&1 & +EXEC_PID=$! + +trap 'kill $BEAT_PID $WORKER_PID $EXEC_PID 2>/dev/null || true' EXIT INT TERM + +echo "==============================================================" >&2 +echo "[run_agent] agent running. logs in $LOG_DIR" >&2 +echo "[run_agent] beat=$BEAT_PID worker=$WORKER_PID exec=$EXEC_PID" >&2 +echo "[run_agent] press Ctrl-C to stop" >&2 +echo "==============================================================" >&2 + +wait diff --git a/mock_backend/run_e2e.sh b/mock_backend/run_e2e.sh new file mode 100755 index 0000000..e4bf6d7 --- /dev/null +++ b/mock_backend/run_e2e.sh @@ -0,0 +1,105 @@ +#!/usr/bin/env bash +# One-shot end-to-end loop: +# 1. start the FastAPI mock in the background +# 2. 
issue a token, export DRD_CLOUD_API_HOST/TOKEN +# 3. start celery beat + workers in the background +# 4. seed scenarios +# 5. wait for the agent to drain everything +# 6. validate via e2e.py +# 7. tear everything down (PASS/FAIL exit code) +# +# Requirements: +# - redis available at $REDIS_URL (default redis://localhost:6379/0) +# - VPC agent's Python deps importable from the current shell +# (e.g. `pip install -r requirements.txt` from repo root, or activate venv) +# - credentials/secrets.yaml populated with the connectors referenced +# in your scenarios file +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(dirname "$SCRIPT_DIR")" + +PORT="${MOCK_BACKEND_PORT:-8080}" +SCENARIOS_FILE="${SCENARIOS_FILE:-$SCRIPT_DIR/scenarios/default.json}" +E2E_TIMEOUT="${E2E_TIMEOUT:-120}" + +cd "$SCRIPT_DIR" + +# 1. mock backend --------------------------------------------------------- + +VENV="$SCRIPT_DIR/.venv" +if [[ ! -d "$VENV" ]]; then + echo "[e2e] creating mock-backend venv" >&2 + python3 -m venv "$VENV" + "$VENV/bin/pip" install --upgrade pip >/dev/null + "$VENV/bin/pip" install -r requirements.txt >/dev/null +fi + +LOG_DIR="$SCRIPT_DIR/data/agent-logs" +MOCK_LOG="$SCRIPT_DIR/data/mock-backend.log" +mkdir -p "$LOG_DIR" +: > "$MOCK_LOG" + +"$VENV/bin/uvicorn" app:app --host 0.0.0.0 --port "$PORT" --log-level warning >>"$MOCK_LOG" 2>&1 & +MOCK_PID=$! +trap 'echo "[e2e] tearing down" >&2; kill $MOCK_PID 2>/dev/null || true; pkill -P $$ 2>/dev/null || true' EXIT INT TERM + +for _ in $(seq 1 50); do + curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null && break + sleep 0.1 +done +curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null || { echo "[e2e] mock did not start — see $MOCK_LOG" >&2; exit 1; } +echo "[e2e] mock up on :${PORT}" >&2 + +# 2. 
token + env --------------------------------------------------------- + +TOKEN=$(curl -sf -X POST "http://127.0.0.1:${PORT}/admin/tokens" \ + -H 'content-type: application/json' -d '{"label":"e2e"}' \ + | "$VENV/bin/python" -c 'import json,sys;print(json.load(sys.stdin)["token"])') +echo "[e2e] issued token ${TOKEN:0:14}…" >&2 + +export DRD_CLOUD_API_TOKEN="$TOKEN" +export DRD_CLOUD_API_HOST="http://127.0.0.1:${PORT}" +export VPC_AGENT_COMMIT_HASH="${VPC_AGENT_COMMIT_HASH:-e2e-test}" +export CELERY_BROKER_URL="${CELERY_BROKER_URL:-redis://localhost:6379/0}" +export CELERY_RESULT_BACKEND="${CELERY_RESULT_BACKEND:-redis://localhost:6379/0}" +export REDIS_URL="${REDIS_URL:-redis://localhost:6379/0}" + +# 3. wipe prior recordings & start agent --------------------------------- + +curl -sf -X POST "http://127.0.0.1:${PORT}/admin/reset?keep_tokens=true" >/dev/null + +cd "$REPO_DIR" +echo "[e2e] migrating sqlite" >&2 +python manage.py migrate >/dev/null + +echo "[e2e] starting celery beat + workers" >&2 +celery -A agent beat -l INFO --pidfile="$LOG_DIR/beat.pid" >"$LOG_DIR/beat.log" 2>&1 & +celery -A agent worker -l INFO -Q celery --concurrency=2 --pidfile="$LOG_DIR/worker.pid" >"$LOG_DIR/worker.log" 2>&1 & +celery -A agent worker -l INFO -Q exec --concurrency=2 --pidfile="$LOG_DIR/worker-exec.pid" >"$LOG_DIR/worker-exec.log" 2>&1 & + +# Give celery a moment to register beat/worker schedules. +sleep 4 + +# 4. seed scenarios ------------------------------------------------------ + +echo "[e2e] seeding $SCENARIOS_FILE" >&2 +"$VENV/bin/python" "$SCRIPT_DIR/seed_scenarios.py" \ + --host "http://127.0.0.1:${PORT}" --file "$SCENARIOS_FILE" + +# 5. validate ------------------------------------------------------------ + +set +e +"$VENV/bin/python" "$SCRIPT_DIR/e2e.py" \ + --host "http://127.0.0.1:${PORT}" \ + --file "$SCENARIOS_FILE" \ + --timeout "$E2E_TIMEOUT" +RC=$? 
+set -e + +if [[ $RC -eq 0 ]]; then + echo "[e2e] PASS" >&2 +else + echo "[e2e] FAIL: see $LOG_DIR/*.log and $MOCK_LOG" >&2 +fi +exit $RC diff --git a/mock_backend/run_mock.sh b/mock_backend/run_mock.sh new file mode 100755 index 0000000..2b80593 --- /dev/null +++ b/mock_backend/run_mock.sh @@ -0,0 +1,82 @@ +#!/usr/bin/env bash +# Bring up the mock backend and print a freshly-issued bearer token. +# +# ./run_mock.sh [--port 8080] [--host 0.0.0.0] [--no-token] [--print-token-only] +# +# By default the token is logged to stderr and the script blocks on uvicorn. +# With --print-token-only the token goes to stdout and the script exits: +# TOKEN=$(./run_mock.sh --print-token-only) + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PORT="${MOCK_BACKEND_PORT:-8080}" +HOST="${MOCK_BACKEND_HOST:-0.0.0.0}" +ISSUE_TOKEN=1 +PRINT_TOKEN_ONLY=0 + +while [[ $# -gt 0 ]]; do + case "$1" in + --port) PORT="$2"; shift 2 ;; + --host) HOST="$2"; shift 2 ;; + --no-token) ISSUE_TOKEN=0; shift ;; + --print-token-only) PRINT_TOKEN_ONLY=1; shift ;; + -h|--help) + sed -n '2,8p' "$0" + exit 0 + ;; + *) echo "unknown arg: $1" >&2; exit 2 ;; + esac +done + +cd "$SCRIPT_DIR" + +# Lazy install: keep deps off the agent's environment by using a venv next to this dir. +VENV="$SCRIPT_DIR/.venv" +if [[ ! -d "$VENV" ]]; then + echo "[mock_backend] creating venv at $VENV" >&2 + python3 -m venv "$VENV" + "$VENV/bin/pip" install --upgrade pip >/dev/null + "$VENV/bin/pip" install -r requirements.txt >/dev/null +fi + +PYTHON="$VENV/bin/python" +UVICORN="$VENV/bin/uvicorn" + +# Kick off uvicorn in the background so we can issue a token, then bring it +# to the foreground with `wait`. +LOG_FILE="$SCRIPT_DIR/data/mock-backend.log" +mkdir -p "$SCRIPT_DIR/data" +"$UVICORN" app:app --host "$HOST" --port "$PORT" --log-level info >"$LOG_FILE" 2>&1 & +UVICORN_PID=$! +trap 'kill "$UVICORN_PID" 2>/dev/null || true' EXIT INT TERM + +# Wait for /health to come up. 
+for _ in $(seq 1 50); do + if curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null; then break; fi + sleep 0.1 +done +if ! curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null; then + echo "[mock_backend] failed to start — see $LOG_FILE" >&2 + exit 1 +fi + +if [[ "$ISSUE_TOKEN" -eq 1 ]]; then + TOKEN=$(curl -sf -X POST "http://127.0.0.1:${PORT}/admin/tokens" \ + -H 'content-type: application/json' -d '{"label":"agent"}' \ + | "$PYTHON" -c 'import json,sys;print(json.load(sys.stdin)["token"])') + if [[ "$PRINT_TOKEN_ONLY" -eq 1 ]]; then + # In token-only mode we exit immediately so the caller can eat stdout. + echo "$TOKEN" + kill "$UVICORN_PID" 2>/dev/null || true + exit 0 + fi + echo "==============================================================" >&2 + echo "[mock_backend] up at http://127.0.0.1:${PORT}" >&2 + echo "[mock_backend] token: $TOKEN" >&2 + echo "[mock_backend] export: DRD_CLOUD_API_TOKEN=$TOKEN" >&2 + echo "[mock_backend] export: DRD_CLOUD_API_HOST=http://127.0.0.1:${PORT}" >&2 + echo "==============================================================" >&2 +fi + +wait "$UVICORN_PID" diff --git a/mock_backend/scenarios/default.json b/mock_backend/scenarios/default.json new file mode 100644 index 0000000..4db2646 --- /dev/null +++ b/mock_backend/scenarios/default.json @@ -0,0 +1,69 @@ +{ + "$schema_note": "Loaded by seed_scenarios.py. Each item is sent through the matching /admin/queues/* endpoint after the mock comes up. Edit freely — no code change needed.", + + "connection_tests": [ + { + "connector_name": "connector_name_1", + "_doc": "Should match a connector key in credentials/secrets.yaml. Agent will run a real test against the configured creds and post the result back. Drop or replace if your secrets.yaml is empty." + } + ], + + "playbook_tasks": [ + { + "_doc": "ASSET_REFRESH: works with NATIVE_KUBERNETES_API_MODE=true (helm default) — no connector_name needed in secrets.yaml. 
Pulls every extract_* method from the kubernetes metadata extractor.", + "task": { + "drd_proxy_agent": { + "type": "ASSET_REFRESH", + "asset_refresh": { + "connector_name": {"value": "native_k8s"}, + "connector_type": {"value": 47} + } + } + } + }, + + { + "_doc": "Kubectl: list all pods in the drdroid namespace. e2e.py asserts the JSON parses, includes the mock backend's pod, and matches the host's kubectl snapshot.", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get pods -n drdroid -o json"} + } + } + }, + + { + "_doc": "Kubectl: list services in drdroid namespace. e2e.py asserts the mock's service shows up.", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get svc -n drdroid -o json"} + } + } + }, + + { + "_doc": "Kubectl: list cluster namespaces. e2e.py asserts drdroid is present and the set matches the host's view.", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get namespaces -o json"} + } + } + }, + + { + "_doc": "Kubectl: list events in drdroid namespace. 
e2e.py asserts the JSON parses and events list is non-empty (a fresh deploy always emits events).", + "task": { + "source": "KUBERNETES", + "kubernetes": { + "type": "COMMAND", + "command": {"command": "kubectl get events -n drdroid -o json"} + } + } + } + ] +} diff --git a/mock_backend/seed_scenarios.py b/mock_backend/seed_scenarios.py new file mode 100644 index 0000000..891b389 --- /dev/null +++ b/mock_backend/seed_scenarios.py @@ -0,0 +1,80 @@ +"""Push a scenarios JSON file into a running mock via its admin API.""" +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path + +import httpx + +REPO_ROOT = Path(__file__).resolve().parent.parent +SECRETS_PATH = REPO_ROOT / "credentials" / "secrets.yaml" + + +def loaded_connector_names() -> set[str]: + """Return the set of connector names defined in credentials/secrets.yaml. + + The agent's connection-test executor silently drops requests for + connectors not in this set (no result posted), so seeding such tests + leaves the mock waiting forever. Filter at seed time. 
+ """ + if not SECRETS_PATH.exists(): + return set() + try: + import yaml # type: ignore + except ImportError: + return set() + try: + data = yaml.safe_load(SECRETS_PATH.read_text()) or {} + except yaml.YAMLError: + return set() + if not isinstance(data, dict): + return set() + return set(data.keys()) + + +def seed(host: str, scenarios_path: Path, reset: bool = False) -> None: + data = json.loads(scenarios_path.read_text()) + with httpx.Client(base_url=host, timeout=10.0) as client: + if reset: + r = client.post("/admin/reset", params={"keep_tokens": "true"}) + r.raise_for_status() + print(f"reset: {r.json()}") + + ct = [c for c in data.get("connection_tests", []) if not str(next(iter(c), "")).startswith("$")] + ct = [c for c in ct if not all(k.startswith("_") or k.startswith("$") for k in c)] + # Drop connection-tests for connectors that don't exist in + # credentials/secrets.yaml — the agent will silently no-op them. + configured = loaded_connector_names() + skipped = [c for c in ct if c.get("connector_name") not in configured] + ct = [c for c in ct if c.get("connector_name") in configured] + if skipped: + names = ", ".join(c.get("connector_name", "?") for c in skipped) + print(f"connection_tests skipped (no matching connector in secrets.yaml): {names}") + if ct: + r = client.post("/admin/queues/connection-tests", json={"items": ct}) + r.raise_for_status() + print(f"connection_tests enqueued: {len(r.json().get('enqueued', []))}") + + pt = data.get("playbook_tasks", []) or [] + pt = [p for p in pt if any(not k.startswith("_") and not k.startswith("$") for k in p)] + if pt: + r = client.post("/admin/queues/playbook-tasks", json={"items": pt}) + r.raise_for_status() + print(f"playbook_tasks enqueued: {len(r.json().get('enqueued', []))}") + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--host", default="http://localhost:8080") + ap.add_argument("--file", default=str(Path(__file__).parent / "scenarios/default.json")) + 
ap.add_argument("--reset", action="store_true", help="clear queues + recordings before seeding") + args = ap.parse_args() + + seed(args.host, Path(args.file), reset=args.reset) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/mock_backend/storage.py b/mock_backend/storage.py new file mode 100644 index 0000000..133ec9e --- /dev/null +++ b/mock_backend/storage.py @@ -0,0 +1,195 @@ +"""Persistence + queues for the mock backend. + +Every inbound payload from the agent is appended to a JSONL file under +data/recorded/.jsonl so tests can replay and assert on the wire +traffic. Pending work (connection tests, playbook tasks) is held in JSON +files under data/queues/ that the admin API mutates and the polling +endpoints drain. +""" +from __future__ import annotations + +import json +import secrets +import threading +import time +import uuid +from pathlib import Path +from typing import Any + + +DATA_DIR = Path(__file__).parent / "data" +RECORDED_DIR = DATA_DIR / "recorded" +QUEUES_DIR = DATA_DIR / "queues" +TOKENS_FILE = DATA_DIR / "tokens.json" + +_lock = threading.Lock() + + +def _ensure_dirs() -> None: + RECORDED_DIR.mkdir(parents=True, exist_ok=True) + QUEUES_DIR.mkdir(parents=True, exist_ok=True) + + +def _load_json(path: Path, default: Any) -> Any: + if not path.exists(): + return default + try: + return json.loads(path.read_text()) + except json.JSONDecodeError: + return default + + +def _dump_json(path: Path, value: Any) -> None: + path.write_text(json.dumps(value, indent=2, default=str)) + + +# --- Token store --------------------------------------------------------- + +def issue_token(label: str | None = None) -> str: + """Mint a fresh bearer token. 
Returned plaintext is the only auth secret + the agent needs.""" + with _lock: + _ensure_dirs() + tokens = _load_json(TOKENS_FILE, {}) + token = "mock-" + secrets.token_urlsafe(24) + tokens[token] = { + "label": label or "agent", + "issued_at": time.time(), + } + _dump_json(TOKENS_FILE, tokens) + return token + + +def list_tokens() -> dict[str, dict]: + with _lock: + return _load_json(TOKENS_FILE, {}) + + +def revoke_token(token: str) -> bool: + with _lock: + tokens = _load_json(TOKENS_FILE, {}) + if token in tokens: + del tokens[token] + _dump_json(TOKENS_FILE, tokens) + return True + return False + + +def is_valid_token(token: str) -> bool: + return token in list_tokens() + + +# --- Inbound recording --------------------------------------------------- + +def record(bucket: str, payload: dict) -> dict: + """Append a single event to data/recorded/.jsonl. + + `payload` should already include the request body, headers, query + params, and the response we returned — anything a test may want to + assert on later. 
+ """ + with _lock: + _ensure_dirs() + event = { + "id": str(uuid.uuid4()), + "ts": time.time(), + "bucket": bucket, + **payload, + } + path = RECORDED_DIR / f"{bucket}.jsonl" + with path.open("a") as f: + f.write(json.dumps(event, default=str) + "\n") + return event + + +def read_records(bucket: str) -> list[dict]: + path = RECORDED_DIR / f"{bucket}.jsonl" + if not path.exists(): + return [] + out = [] + for line in path.read_text().splitlines(): + line = line.strip() + if not line: + continue + try: + out.append(json.loads(line)) + except json.JSONDecodeError: + continue + return out + + +def list_buckets() -> list[str]: + if not RECORDED_DIR.exists(): + return [] + return sorted(p.stem for p in RECORDED_DIR.glob("*.jsonl")) + + +def clear_records() -> None: + with _lock: + if RECORDED_DIR.exists(): + for f in RECORDED_DIR.glob("*.jsonl"): + f.unlink() + + +# --- Pending-work queues ------------------------------------------------- +# Two named queues: +# - "connection_tests": items shaped {request_id, connector_name} +# - "playbook_tasks": items shaped {proxy_execution_request_id, task, time_range, ...} +# `take_all` drains and returns every queued item — the agent's polling +# tasks consume the whole batch each tick. 
+ +def _queue_path(name: str) -> Path: + _ensure_dirs() + return QUEUES_DIR / f"{name}.json" + + +def queue_push(name: str, item: dict) -> dict: + with _lock: + path = _queue_path(name) + items = _load_json(path, []) + items.append(item) + _dump_json(path, items) + return item + + +def queue_extend(name: str, items: list[dict]) -> int: + with _lock: + path = _queue_path(name) + existing = _load_json(path, []) + existing.extend(items) + _dump_json(path, existing) + return len(items) + + +def queue_take_all(name: str) -> list[dict]: + with _lock: + path = _queue_path(name) + items = _load_json(path, []) + if items: + _dump_json(path, []) + return items + + +def queue_peek(name: str) -> list[dict]: + return _load_json(_queue_path(name), []) + + +def queue_clear(name: str | None = None) -> None: + with _lock: + if name: + p = _queue_path(name) + if p.exists(): + p.unlink() + return + if QUEUES_DIR.exists(): + for f in QUEUES_DIR.glob("*.json"): + f.unlink() + + +def reset_all(keep_tokens: bool = True) -> None: + """Wipe recordings and queues. Keep tokens by default so an in-flight + agent doesn't lose its credential mid-test.""" + clear_records() + queue_clear() + if not keep_tokens and TOKENS_FILE.exists(): + TOKENS_FILE.unlink()
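The recording and queue semantics in `storage.py` (one JSON object per line per bucket, corrupt lines skipped on read, and `queue_take_all` draining the whole batch) can be sketched in isolation. This is a minimal standalone illustration, not the module itself: `append_event`, `read_events`, `take_all`, and `demo` are hypothetical names that only mirror the behavior of `record`, `read_records`, and `queue_take_all`, and it omits the lock and the `id`/`ts` fields for brevity.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def append_event(recorded_dir: Path, bucket: str, payload: dict) -> None:
    """Append one JSON object per line to <recorded_dir>/<bucket>.jsonl."""
    recorded_dir.mkdir(parents=True, exist_ok=True)
    with (recorded_dir / f"{bucket}.jsonl").open("a") as f:
        f.write(json.dumps({"bucket": bucket, **payload}, default=str) + "\n")


def read_events(recorded_dir: Path, bucket: str) -> list[dict]:
    """Parse a bucket file, skipping blank and undecodable lines."""
    path = recorded_dir / f"{bucket}.jsonl"
    if not path.exists():
        return []
    events = []
    for line in path.read_text().splitlines():
        if not line.strip():
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue
    return events


def take_all(queue_file: Path) -> list[dict]:
    """Return every queued item and leave an empty list behind."""
    items = json.loads(queue_file.read_text()) if queue_file.exists() else []
    if items:
        queue_file.write_text("[]")
    return items


def demo() -> dict:
    """Exercise the helpers against a throwaway directory."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        recorded = root / "recorded"
        append_event(recorded, "connection_tests", {"status": "ok"})
        # Simulate a partially written line; readers should tolerate it.
        with (recorded / "connection_tests.jsonl").open("a") as f:
            f.write("not json\n")

        queue_file = root / "connection_tests.json"
        queue_file.write_text(json.dumps([{"connector_name": "connector_name_1"}]))

        return {
            "events": read_events(recorded, "connection_tests"),
            "first_drain": take_all(queue_file),
            "second_drain": take_all(queue_file),
        }


if __name__ == "__main__":
    print(demo())
```

The second drain returning an empty list is the property the polling endpoints appear to rely on: each tick consumes the whole batch, so a seeded item is handed to the agent at most once.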