diff --git a/README.md b/README.md index b044c9e..762b655 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ MCP Jam, and long-running agents should use. Phone / Mobile Claude Code Session ┌──────────┐ aX Platform ┌──────────────────┐ │ @agent │───▶ SSE stream ───▶│ ax-channel │ - │ deploy │ next.paxai.app │ (MCP SDK) │ + │ deploy │ next.paxai.app │ (MCP stdio) │ │ status │ │ │ │ └──────────┘ │ ┌────▼────┐ │ ▲ │ │ Claude │ │ @@ -99,9 +99,6 @@ This is not a chat bridge. Every other channel (Telegram, Discord, iMessage) con **Works with any MCP client** — real-time push for Claude Code, polling via `get_messages` tool for Cursor, Gemini CLI, and others. ```bash -# Install -cd channel && bun install - # Bootstrap with CLI first. The user PAT stays in the trusted terminal. axctl login axctl token mint your_agent --audience both --expires 30 \ @@ -110,12 +107,9 @@ axctl token mint your_agent --audience both --expires 30 \ --no-print-token axctl profile verify your-agent -# Then run the channel from the generated agent runtime config. -mkdir -p ~/.claude/channels/ax-channel -printf 'AX_CONFIG_FILE=/home/ax-agent/agents/your_agent/.ax/config.toml\n' \ - > ~/.claude/channels/ax-channel/.env -printf 'AX_SPACE_ID=\n' >> ~/.claude/channels/ax-channel/.env -chmod 600 ~/.claude/channels/ax-channel/.env +# Then run the channel through the generated agent profile/config. +# For a fixed channel session, make the MCP server command explicit: +# eval "$(axctl profile env your-agent)" && exec axctl channel --agent your_agent --space-id # Run claude --dangerously-load-development-channels server:ax-channel @@ -123,8 +117,10 @@ claude --dangerously-load-development-channels server:ax-channel CLI and channel are paired: `axctl` handles bootstrap, profiles, token minting, messages, tasks, and context; `ax-channel` is the live delivery layer that wakes -Claude Code on mentions. See [channel/README.md](channel/README.md) for full -setup guide. +Claude Code on mentions. The channel publishes best-effort `agent_processing` +signals (`working` on delivery, `completed` after `reply`) so the Activity +Stream can show that the Claude Code session is active. See +[channel/README.md](channel/README.md) for full setup guide. ## Connect via Remote MCP @@ -455,7 +451,9 @@ present and fail if `matrix.ok` is false. | `ax send "msg" --file FILE` | Send a chat message with a polished attachment preview backed by context metadata | | `ax upload file FILE` | Upload file to context and emit a compact context-upload signal | | `ax context upload-file FILE` | Upload file to context storage only | +| `ax context fetch-url URL --upload` | Fetch a URL, upload it as a renderable context artifact, and store the source URL | | `ax context load KEY` | Load a context file into the private preview cache | +| `ax context preview KEY` | Agent-friendly alias for loading a protected artifact into the preview cache | | `ax context download KEY` | Download file from context | | `ax apps list` | List MCP app surfaces the CLI can signal | | `ax apps signal context --context-key KEY --to @agent` | Write a folded Context Explorer app signal | @@ -469,6 +467,13 @@ only for storage-only writes where no transcript signal is wanted. Use `ax upload file --no-message` when you still want the high-level upload command but intentionally do not want to notify the message stream. +For predictable rendering, use an artifact path for documents and media. Local +Markdown and fetched Markdown should both become `file_upload` context values: +`ax upload file ./article.md` for local files, or +`ax context fetch-url https://example.com/article.md --upload` for remote files. +Raw `ax context set` and default `ax context fetch-url` are for small key-value +context, not the document/artifact viewer. + Unread state is an API-backed per-user inbox signal. Use `ax messages list --unread` when checking what needs attention, and add `--mark-read` only when the returned messages have actually been handled. diff --git a/ax_cli/client.py b/ax_cli/client.py index d4247a8..c86b4a6 100644 --- a/ax_cli/client.py +++ b/ax_cli/client.py @@ -429,6 +429,30 @@ def send_message( r.raise_for_status() return self._parse_json(r) + def set_agent_processing_status( + self, + message_id: str, + status: str, + *, + agent_name: str | None = None, + space_id: str | None = None, + ) -> dict: + """POST /api/v1/agents/processing-status. + + Publishes the same lightweight `agent_processing` SSE event used by the + frontend to show that an agent received work and is active. This is + best-effort presence/progress, not durable task state. + """ + body: dict = {"message_id": message_id, "status": status} + if agent_name: + body["agent_name"] = agent_name + headers = self._with_agent(self.agent_id) + if space_id: + headers["X-Space-Id"] = space_id + r = self._http.post("/api/v1/agents/processing-status", json=body, headers=headers) + r.raise_for_status() + return self._parse_json(r) + def upload_file(self, file_path: str, *, space_id: str | None = None) -> dict: """POST /api/v1/uploads — upload a local file. diff --git a/ax_cli/commands/alerts.py b/ax_cli/commands/alerts.py new file mode 100644 index 0000000..7574051 --- /dev/null +++ b/ax_cli/commands/alerts.py @@ -0,0 +1,572 @@ +"""ax alerts — fire Activity Stream alert/reminder cards via message metadata. + +First-slice MVP (task dfef4c92): a thin wrapper over POST /api/v1/messages that +builds a ``metadata.alert`` + ``metadata.ui.cards[]`` envelope the existing +frontend already renders as an AlertCardBody. No backend schema changes; no +scheduler dependency; manual fire only. + +Design rule (per ChatGPT 2026-04-15): **tasks are the canonical reminder / +workflow object.** Alerts and reminders are Activity Stream *events* generated +from task reminder policies (or manually fired for ad-hoc alerts). The task +remains the source of truth; alert/reminder messages are receipts / wakeups +linked back via ``metadata.alert.source_task_id``. This CLI only produces +slice-1 manual events — recurring, SLA, and stale-task policies live on the +task object (follow-up work under 0dacbc1e + 68656c16 scheduler). + +Design notes: +- The card type is "alert" so AxMessageWidgets.getCardChrome picks the + ShieldAlert accent and AlertCardBody renders the alert detail block. +- We keep reminder metadata compact — no task-board widget initial_data. + A clickable source_task_id link is enough for the first demo. +- State transitions (ack/snooze/resolve) post a REPLY to the original alert + with ``metadata.alert_state_change``. Backend PATCH only accepts ``content`` + today — metadata updates are silently dropped — so state-change-as-reply + keeps the slice honest and produces an auditable stream event. A small + frontend follow-up can fold the reply into the parent card's state badge. +""" + +from __future__ import annotations + +import datetime as _dt +import uuid +from typing import Any, Optional + +import httpx +import typer + +from ..config import get_client, resolve_agent_name, resolve_space_id +from ..output import JSON_OPTION, console, print_json, print_kv + + +def _fail(message: str, *, exit_code: int = 1) -> None: + """Print an error and exit — alerts.py's own handle_error variant + that accepts a string (the shared ``handle_error`` only wraps + httpx.HTTPStatusError instances).""" + typer.echo(f"Error: {message}", err=True) + raise typer.Exit(exit_code) + + +def _print_kv(data: dict, *, title: str | None = None) -> None: + """print_kv with optional title prefix.""" + if title: + console.print(f"[bold cyan]{title}[/bold cyan]") + print_kv(data) + + +app = typer.Typer(name="alerts", help="Activity Stream alerts and task reminders", no_args_is_help=True) + + +_ALLOWED_SEVERITIES = {"info", "warn", "warning", "critical", "error"} +_ALLOWED_KINDS = {"alert", "reminder", "task_reminder"} +_ALLOWED_STATES = { + "triggered", + "acknowledged", + "snoozed", + "resolved", + "stale", + "escalated", +} + + +def _normalize_severity(value: str) -> str: + value = (value or "info").strip().lower() + if value == "warning": + return "warn" + if value == "error": + return "critical" + if value not in _ALLOWED_SEVERITIES: + raise typer.BadParameter("severity must be one of: info, warn, critical") + return value + + +def _normalize_kind(value: str) -> str: + value = (value or "alert").strip().lower() + if value == "task_reminder": + value = "reminder" + if value not in {"alert", "reminder"}: + raise typer.BadParameter("kind must be 'alert' or 'reminder'") + return value + + +def _normalize_state(value: str) -> str: + value = (value or "triggered").strip().lower() + if value not in _ALLOWED_STATES: + raise typer.BadParameter(f"state must be one of: {', '.join(sorted(_ALLOWED_STATES))}") + return value + + +def _strip_at(target: str | None) -> str | None: + if not target: + return None + return target.strip().lstrip("@") or None + + +def _iso_utc_now() -> str: + return _dt.datetime.now(_dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +_MIN_REASONABLE_YEAR = 2020 + + +def _validate_timestamp(value: str | None, *, flag: str) -> str | None: + """Reject obviously-broken timestamps (e.g. 2000-01-01 from a runner + with clock skew — caught in msg b9fb15b6 dogfood). + + Accepts None/empty (field optional). Returns the validated string. + Raises typer.BadParameter with a clear message on garbage. + """ + if not value: + return None + try: + # Normalize trailing Z to +00:00 for fromisoformat() on 3.10 + probe = value.strip().replace("Z", "+00:00") if value.endswith("Z") else value.strip() + parsed = _dt.datetime.fromisoformat(probe) + except ValueError as exc: + raise typer.BadParameter(f"{flag}: not a valid ISO-8601 timestamp: {value!r} ({exc})") + + if parsed.year < _MIN_REASONABLE_YEAR: + raise typer.BadParameter( + f"{flag}: timestamp {value!r} is before {_MIN_REASONABLE_YEAR} — " + f"this usually means the caller has a broken clock. Pass a real UTC ISO-8601 " + f"timestamp (e.g. 2026-04-16T17:00:00Z)." + ) + return value + + +def _build_alert_metadata( + *, + kind: str, + severity: str, + target: str | None, + reason: str, + source_task_id: str | None, + due_at: str | None, + remind_at: str | None, + expected_response: str | None, + response_required: bool, + evidence: str | None, + triggered_by_agent: str | None, + title: str | None, + state: str = "triggered", +) -> dict[str, Any]: + """Build the ``metadata`` block the frontend's AlertCardBody reads. + + Shape mirrors the dogfood message 1942cc2c but with the compact + reminder fields ChatGPT flagged (source_task_id, due_at, remind_at, + state) and no task-board widget hydration. + """ + card_title = title or (f"Reminder: {reason[:80]}" if kind == "reminder" else f"Alert: {reason[:80]}") + fired_at = _iso_utc_now() + card_id = f"alert:{uuid.uuid4()}" + + alert: dict[str, Any] = { + "kind": "task_reminder" if kind == "reminder" else "alert", + "severity": severity, + "source": "axctl_alerts", + "state": state, + "fired_at": fired_at, + "title": card_title, + "summary": reason, + "reason": reason, + "response_required": response_required, + } + if target: + alert["target_agent"] = target + alert["target"] = target + if source_task_id: + alert["source_task_id"] = source_task_id + if due_at: + alert["due_at"] = due_at + if remind_at: + alert["remind_at"] = remind_at + if expected_response: + alert["expected_response"] = expected_response + if evidence: + alert["context_key"] = evidence + if triggered_by_agent: + alert["triggered_by_agent_name"] = triggered_by_agent + + card_payload: dict[str, Any] = { + "title": card_title, + "summary": reason, + "severity": severity, + "alert": alert, + "intent": "alert", + } + if source_task_id: + card_payload["source_task_id"] = source_task_id + card_payload["resource_uri"] = f"ui://tasks/{source_task_id}" + + return { + "alert": alert, + "ui": { + "cards": [ + { + "card_id": card_id, + "type": "alert", + "version": 1, + "payload": card_payload, + } + ] + }, + } + + +def _resolve_target_from_task(client: Any, task_id: str) -> tuple[str | None, str | None]: + """Fetch a task and return (target_name, resolved_from). + + Preference: assignee → creator. Returns (None, None) on any failure — + callers should fall back to unassigned-but-still-fired behavior. + ``resolved_from`` is "assignee" or "creator" for display/logging. + """ + try: + r = client._http.get( + f"/api/v1/tasks/{task_id}", + headers=client._with_agent(None), + ) + r.raise_for_status() + wrapper = client._parse_json(r) + except Exception: + return None, None + + task = wrapper.get("task", wrapper) if isinstance(wrapper, dict) else {} + + # The backend returns ids, not names. Try to resolve via the agent + # roster — best-effort, skip if unreachable. + def _name_for(agent_id: str | None) -> str | None: + if not agent_id: + return None + try: + rr = client._http.get( + f"/api/v1/agents/{agent_id}", + headers=client._with_agent(None), + ) + rr.raise_for_status() + agent_wrapper = client._parse_json(rr) + agent = agent_wrapper.get("agent", agent_wrapper) if isinstance(agent_wrapper, dict) else {} + name = agent.get("name") or agent.get("username") or agent.get("handle") + return name.strip().lstrip("@") if isinstance(name, str) else None + except Exception: + return None + + assignee_name = _name_for(task.get("assignee_id")) + if assignee_name: + return assignee_name, "assignee" + creator_name = _name_for(task.get("creator_id")) + if creator_name: + return creator_name, "creator" + return None, None + + +def _format_mention_content(target: str | None, reason: str, kind: str) -> str: + label = "Reminder" if kind == "reminder" else "Alert" + prefix = f"@{target} " if target else "" + return f"{prefix}{label}: {reason}" + + +@app.command("send") +def send( + reason: str = typer.Argument(..., help="Short human-readable reason / summary"), + target: Optional[str] = typer.Option(None, "--target", "-t", help="@agent or username (no @ needed)"), + severity: str = typer.Option("info", "--severity", "-s", help="info | warn | critical"), + kind: str = typer.Option("alert", "--kind", "-k", help="alert | reminder"), + source_task: Optional[str] = typer.Option(None, "--source-task", help="Linked task id (clickable in card)"), + due_at: Optional[str] = typer.Option(None, "--due-at", help="ISO-8601 due timestamp (reminder)"), + remind_at: Optional[str] = typer.Option(None, "--remind-at", help="ISO-8601 remind-at timestamp (reminder)"), + expected_response: Optional[str] = typer.Option(None, "--expected-response", help="What response is expected"), + response_required: bool = typer.Option(False, "--response-required", help="Mark response as required"), + evidence: Optional[str] = typer.Option(None, "--evidence", help="Context key / URL pointing at evidence"), + title: Optional[str] = typer.Option(None, "--title", help="Override card title (defaults to reason)"), + channel: str = typer.Option("main", "--channel", "-c", help="Channel (default: main)"), + space_id: Optional[str] = typer.Option(None, "--space-id", help="Override default space"), + as_json: bool = JSON_OPTION, +) -> None: + """Fire an alert or reminder into the Activity Stream. + + Examples: + + ax alerts send "dev ALB regressed on /auth/me" --target @orion --severity critical + ax alerts send "review needed" --kind reminder --source-task dfef4c92 --remind-at 2026-04-16T17:00Z + """ + severity_n = _normalize_severity(severity) + kind_n = _normalize_kind(kind) + target_n = _strip_at(target) + + if kind_n == "reminder" and not source_task: + raise typer.BadParameter("--source-task is required for --kind reminder") + + # Clock-skew guard: reject nonsense timestamps (e.g. 2000-01-01 from a + # runner with a frozen/unset system clock — real case caught via msg + # b9fb15b6). Applies to both --due-at and --remind-at. + due_at = _validate_timestamp(due_at, flag="--due-at") + remind_at = _validate_timestamp(remind_at, flag="--remind-at") + + # Reminders that the recipient is expected to act on should default to + # response_required=true so the card shows a "Required" chip, unless the + # firer explicitly chose a one-shot-FYI (--kind alert). + if kind_n == "reminder" and not response_required: + response_required = True + + client = get_client() + try: + resolved_space = resolve_space_id(client, explicit=space_id) + except Exception as exc: + _fail(f"Space ID not resolvable: {exc}. Pass --space-id or configure default.", exit_code=2) + + try: + triggered_by = resolve_agent_name(client=client) + except Exception: + triggered_by = None + + # Task-linked design: when --source-task is given but no --target, + # default to the task's assignee, falling back to creator. This keeps + # tasks as the source of truth (per dfef4c92 / 0dacbc1e design rule) + # and means CLI reminders reach the right agent without manual targeting. + target_resolved_from = None + if source_task and not target_n: + target_n, target_resolved_from = _resolve_target_from_task(client, source_task) + + metadata = _build_alert_metadata( + kind=kind_n, + severity=severity_n, + target=target_n, + reason=reason, + source_task_id=source_task, + due_at=due_at, + remind_at=remind_at, + expected_response=expected_response, + response_required=response_required, + evidence=evidence, + triggered_by_agent=triggered_by, + title=title, + ) + + content = _format_mention_content(target_n, reason, kind_n) + + try: + result = client.send_message( + resolved_space, + content, + channel=channel, + metadata=metadata, + message_type="alert" if kind_n == "alert" else "reminder", + ) + except httpx.HTTPStatusError as exc: + _fail(f"send failed: {exc.response.status_code} {exc.response.text[:300]}", exit_code=1) + except (httpx.ConnectError, httpx.ReadError) as exc: + _fail(f"cannot reach aX API: {exc}", exit_code=1) + + if as_json: + print_json(result) + return + + # Response is either {"id": ...} or {"message": {"id": ...}} + msg: dict[str, Any] = result.get("message", result) if isinstance(result, dict) else {} + target_label = target_n or "-" + if target_resolved_from: + target_label = f"{target_n} (from task {target_resolved_from})" + _print_kv( + { + "id": msg.get("id", "?"), + "kind": kind_n, + "severity": severity_n, + "target": target_label, + "source_task": source_task or "-", + "state": "triggered", + }, + title=f"{'Reminder' if kind_n == 'reminder' else 'Alert'} fired", + ) + + +@app.command("reminder") +def reminder( + reason: str = typer.Argument(..., help="Short reminder text"), + source_task: str = typer.Option(..., "--source-task", help="Linked task id (required)"), + target: Optional[str] = typer.Option(None, "--target", "-t", help="@agent or username"), + severity: str = typer.Option("info", "--severity", "-s", help="info | warn | critical"), + due_at: Optional[str] = typer.Option(None, "--due-at", help="ISO-8601 due timestamp"), + remind_at: Optional[str] = typer.Option(None, "--remind-at", help="ISO-8601 remind-at timestamp"), + evidence: Optional[str] = typer.Option(None, "--evidence", help="Context key / URL"), + channel: str = typer.Option("main", "--channel", "-c", help="Channel"), + space_id: Optional[str] = typer.Option(None, "--space-id", help="Override default space"), + as_json: bool = JSON_OPTION, +) -> None: + """Shortcut for ``ax alerts send --kind reminder``.""" + # Delegate to send() with kind=reminder + send( # type: ignore[call-arg] + reason=reason, + target=target, + severity=severity, + kind="reminder", + source_task=source_task, + due_at=due_at, + remind_at=remind_at, + expected_response=None, + response_required=False, + evidence=evidence, + title=None, + channel=channel, + space_id=space_id, + as_json=as_json, + ) + + +def _post_state_change(message_id: str, new_state: str, *, as_json: bool = False) -> None: + """Post a state-change *reply* to an existing alert message. + + The backend's message PATCH endpoint (``MessageEditBody``) only accepts + ``content`` — metadata updates are silently dropped. So for the first + MVP slice we treat state transitions as first-class stream events: a + reply-message whose ``metadata.alert_state_change`` references the + parent alert. The frontend can fold these into the parent card's + state badge on render (that's a small follow-up PR). + + This keeps the slice honest about the backend constraint while still + producing an auditable, streamable state-change event. + """ + new_state = _normalize_state(new_state) + client = get_client() + + try: + r = client._http.get( + f"/api/v1/messages/{message_id}", + headers=client._with_agent(None), + ) + r.raise_for_status() + parent_wrapper = client._parse_json(r) + except httpx.HTTPStatusError as exc: + _fail(f"fetch parent failed: {exc.response.status_code}", exit_code=1) + except (httpx.ConnectError, httpx.ReadError) as exc: + _fail(f"cannot reach aX API: {exc}", exit_code=1) + + parent = parent_wrapper.get("message", parent_wrapper) if isinstance(parent_wrapper, dict) else {} + parent_metadata = parent.get("metadata") or {} + parent_alert = parent_metadata.get("alert") or {} + if not parent_alert: + _fail(f"message {message_id} has no metadata.alert — not an alert", exit_code=1) + + parent_space = parent.get("space_id") + if not parent_space: + _fail(f"message {message_id} has no space_id", exit_code=1) + + now = _iso_utc_now() + parent_kind = parent_alert.get("kind", "alert") + previous_state = parent_alert.get("state", "triggered") + state_change_metadata = { + "alert_state_change": { + "parent_message_id": message_id, + "new_state": new_state, + "previous_state": previous_state, + "changed_at": now, + "kind": parent_kind, + }, + "alert": { + # Mirror as a lightweight alert so existing card renderers that + # key on metadata.alert still pick up the transition as an event. + "kind": "alert_state_change", + "severity": parent_alert.get("severity", "info"), + "state": new_state, + "source": "axctl_alerts", + "parent_message_id": message_id, + "fired_at": now, + "title": f"{parent_kind} → {new_state}", + "summary": f"State changed from {previous_state} to {new_state}", + }, + } + + content = f"[{parent_kind} → {new_state}]" + + try: + result = client.send_message( + parent_space, + content, + parent_id=message_id, + metadata=state_change_metadata, + message_type="alert_state_change", + ) + except httpx.HTTPStatusError as exc: + detail = exc.response.text[:300] + hint = "" + if "Cannot reply to your own message" in detail: + hint = ( + "\nHint: alerts are acked/resolved by the *recipient*, not the firer. " + "Run this command as the target agent (or the user)." + ) + _fail( + f"state-change post failed: {exc.response.status_code} {detail}{hint}", + exit_code=1, + ) + except (httpx.ConnectError, httpx.ReadError) as exc: + _fail(f"cannot reach aX API: {exc}", exit_code=1) + + if as_json: + print_json(result) + return + + reply = result.get("message", result) if isinstance(result, dict) else {} + _print_kv( + { + "parent": message_id, + "reply": reply.get("id", "?"), + "new_state": new_state, + }, + title=f"Alert state → {new_state} (posted as reply)", + ) + + +@app.command("ack") +def ack( + message_id: str = typer.Argument(..., help="Alert message ID"), + as_json: bool = JSON_OPTION, +) -> None: + """Acknowledge an alert (state → acknowledged). + + Semantics: the *recipient* of an alert acks it, not the firer. Running + this on an alert you sent will fail with "Cannot reply to your own + message" — run it as the targeted agent or user instead. + + Today this posts a state-change reply linked to the parent alert + because the backend PATCH endpoint drops metadata updates. Once + 247f7bf0 lands (backend accepts metadata on PATCH), this becomes + an in-place state transition. + """ + _post_state_change(message_id, "acknowledged", as_json=as_json) + + +@app.command("resolve") +def resolve( + message_id: str = typer.Argument(..., help="Alert message ID"), + as_json: bool = JSON_OPTION, +) -> None: + """Resolve an alert (state → resolved). + + Semantics: the recipient (or an authorized responder) resolves — + not the firer. See ``ax alerts ack --help`` for the full note. + """ + _post_state_change(message_id, "resolved", as_json=as_json) + + +@app.command("snooze") +def snooze( + message_id: str = typer.Argument(..., help="Alert message ID"), + as_json: bool = JSON_OPTION, +) -> None: + """Snooze an alert (state → snoozed). + + Slice-2 scheduler will re-fire snoozed reminders at remind_at / next + cadence tick. For slice 1 this is purely a stream event — no re-fire yet. + """ + _post_state_change(message_id, "snoozed", as_json=as_json) + + +@app.command("state") +def set_state( + message_id: str = typer.Argument(..., help="Alert message ID"), + new_state: str = typer.Argument(..., help="triggered | acknowledged | resolved | stale | escalated"), + as_json: bool = JSON_OPTION, +) -> None: + """Set an arbitrary state on an existing alert. + + Subject to the same recipient-acks-not-firer rule as ``ack``/``resolve``. + """ + _post_state_change(message_id, new_state, as_json=as_json) diff --git a/ax_cli/commands/channel.py b/ax_cli/commands/channel.py index af8e90d..102ee82 100644 --- a/ax_cli/commands/channel.py +++ b/ax_cli/commands/channel.py @@ -9,10 +9,12 @@ import asyncio import contextlib import json +import os import sys import threading import time from dataclasses import dataclass +from pathlib import Path from typing import Any, Optional import httpx @@ -28,6 +30,22 @@ SERVER_NAME = "ax-channel" SERVER_VERSION = "0.1.0" SEEN_MAX = 500 +CHANNEL_ENV_PATH = Path.home() / ".claude" / "channels" / "ax-channel" / ".env" + + +def _load_channel_env(path: Path = CHANNEL_ENV_PATH) -> None: + """Load KEY=VALUE channel env defaults without overriding real env vars.""" + if not path.exists(): + return + for line in path.read_text().splitlines(): + raw = line.strip() + if not raw or raw.startswith("#") or "=" not in raw: + continue + key, value = raw.split("=", 1) + key = key.strip() + if not key or key in os.environ: + continue + os.environ[key] = value.strip().strip("\"'") @dataclass(slots=True) @@ -53,12 +71,14 @@ def __init__( space_id: str, queue_size: int, debug: bool, + processing_status: bool, ) -> None: self.client = client self.agent_name = agent_name self.agent_id = agent_id self.space_id = space_id self.debug = debug + self.processing_status = processing_status self.loop: asyncio.AbstractEventLoop | None = None self.mention_queue: asyncio.Queue[MentionEvent] = asyncio.Queue(maxsize=queue_size) self.initialized = asyncio.Event() @@ -67,6 +87,7 @@ def __init__( self._write_lock = asyncio.Lock() self._last_message_id: str | None = None self._reply_anchor_ids: set[str] = set() + self._pending_mentions: list[MentionEvent] = [] def log(self, message: str) -> None: if not self.debug: @@ -99,6 +120,31 @@ async def write_message(self, payload: dict[str, Any]) -> None: async def send_notification(self, method: str, params: dict[str, Any]) -> None: await self.write_message({"jsonrpc": "2.0", "method": method, "params": params}) + async def publish_processing_status(self, message_id: str, status: str) -> None: + """Best-effort Activity Stream signal for channel delivery/progress. + + This lets the frontend show the same inline "agent is working" affordance + for Claude Code channel sessions that it shows for other agent runtimes. + It is intentionally non-blocking: channel delivery/replies must still + work if the progress endpoint is unavailable. + """ + if not self.processing_status: + return + try: + + def _send_status(): + return self.client.set_agent_processing_status( + message_id, + status, + agent_name=self.agent_name, + space_id=self.space_id, + ) + + await asyncio.to_thread(_send_status) + self.log(f"processing status {status} for {message_id[:12]}") + except Exception as exc: # pragma: no cover - live best-effort path + self.log(f"processing status failed for {message_id[:12]}: {exc}") + async def send_response(self, request_id: Any, result: dict[str, Any]) -> None: await self.write_message({"jsonrpc": "2.0", "id": request_id, "result": result}) @@ -117,18 +163,21 @@ async def emit_mentions(self) -> None: self.log("emit_mentions: initialized done, sending notification") self._last_message_id = event.message_id + self._pending_mentions.append(event) + if len(self._pending_mentions) > SEEN_MAX: + self._pending_mentions = self._pending_mentions[-SEEN_MAX // 2 :] + ts = event.created_at or time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) meta: dict[str, Any] = { "chat_id": event.space_id, "message_id": event.message_id, - "parent_id": event.parent_id, - "conversation_id": event.conversation_id, "user": event.author, "sender": event.author, "source": "ax", "space_id": event.space_id, - "ts": event.created_at, - "raw_content": event.raw_content, + "ts": ts, } + if event.parent_id: + meta["parent_id"] = event.parent_id if event.attachments: meta["attachments"] = event.attachments await self.send_notification( @@ -138,6 +187,7 @@ async def emit_mentions(self) -> None: "meta": meta, }, ) + await self.publish_processing_status(event.message_id, "working") self.log(f"delivered mention {event.message_id} from {event.author}") finally: self.mention_queue.task_done() @@ -178,14 +228,64 @@ async def handle_tools_list(self, request_id: Any) -> None: }, "required": ["text"], }, - } + }, + { + "name": "get_messages", + "description": "Get pending aX channel messages for clients that need a polling fallback.", + "inputSchema": { + "type": "object", + "properties": { + "limit": { + "type": "number", + "description": "Max messages to return (default: 10).", + }, + "mark_read": { + "type": "boolean", + "description": "Remove returned messages from the pending fallback queue (default: true).", + }, + }, + }, + }, ] }, ) + async def handle_empty_list(self, request_id: Any, key: str) -> None: + await self.send_response(request_id, {key: []}) + + async def handle_get_messages(self, request_id: Any, arguments: dict[str, Any]) -> None: + try: + limit = max(1, int(arguments.get("limit") or 10)) + except (TypeError, ValueError): + limit = 10 + mark_read = arguments.get("mark_read") is not False + pending = self._pending_mentions[:limit] + if mark_read: + self._pending_mentions = self._pending_mentions[len(pending) :] + if not pending: + text = "No pending messages." + else: + text = json.dumps( + [ + { + "message_id": event.message_id, + "author": event.author, + "content": event.prompt, + "parent_id": event.parent_id, + "ts": event.created_at, + } + for event in pending + ], + indent=2, + ) + await self.send_response(request_id, {"content": [{"type": "text", "text": text}]}) + async def handle_tool_call(self, request_id: Any, params: dict[str, Any]) -> None: name = params.get("name") arguments = params.get("arguments") or {} + if name == "get_messages": + await self.handle_get_messages(request_id, arguments) + return if name != "reply": await self.send_error(request_id, -32601, f"Unknown tool: {name}") return @@ -242,6 +342,7 @@ def _send_as_agent(): message = data.get("message", data) sent_id = message.get("id") or data.get("id") _remember_reply_anchor(self._reply_anchor_ids, sent_id) + await self.publish_processing_status(reply_to, "completed") await self.send_response( request_id, { @@ -272,6 +373,12 @@ async def handle_request(self, request: dict[str, Any]) -> None: await self.handle_initialize(request_id) elif method == "tools/list": await self.handle_tools_list(request_id) + elif method == "resources/list": + await self.handle_empty_list(request_id, "resources") + elif method == "resources/templates/list": + await self.handle_empty_list(request_id, "resourceTemplates") + elif method == "prompts/list": + await self.handle_empty_list(request_id, "prompts") elif method == "tools/call": await self.handle_tool_call(request_id, params) elif method == "ping": @@ -458,8 +565,14 @@ def channel( space_id: Optional[str] = typer.Option(None, "--space-id", "-s", help="Space to bridge (default: from config)"), queue_size: int = typer.Option(50, "--queue-size", help="Max queued mentions before dropping"), debug: bool = typer.Option(False, "--debug", help="Log bridge activity to stderr"), + processing_status: bool = typer.Option( + True, + "--processing-status/--no-processing-status", + help="Publish agent_processing events when messages are delivered and replies complete.", + ), ): """Run an MCP stdio server that bridges aX mentions into Claude Code.""" + _load_channel_env() client = get_client() agent_name = agent or resolve_agent_name(client=client) if not agent_name: @@ -479,6 +592,7 @@ def channel( space_id=sid, queue_size=queue_size, debug=debug, + processing_status=processing_status, ) listener = threading.Thread(target=_sse_loop, args=(bridge,), daemon=True) diff --git a/ax_cli/commands/context.py b/ax_cli/commands/context.py index ec69278..4127035 100644 --- a/ax_cli/commands/context.py +++ b/ax_cli/commands/context.py @@ -340,27 +340,33 @@ def fetch_url( if upload or not is_text: # Download to temp file, then upload - suffix = Path(parsed.path).suffix or ".bin" - with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp: - tmp.write(resp.content) - tmp_path = tmp.name + safe_name = _safe_filename(default_key) + if "." not in safe_name: + safe_name = f"{safe_name}.bin" + tmp_dir = Path(tempfile.mkdtemp(prefix="ax-fetch-url-")) + tmp_path = tmp_dir / safe_name + tmp_path.write_bytes(resp.content) try: - upload_data = client.upload_file(tmp_path, space_id=sid) + upload_data = client.upload_file(str(tmp_path), space_id=sid) except httpx.HTTPStatusError as exc: handle_error(exc) finally: - Path(tmp_path).unlink(missing_ok=True) + tmp_path.unlink(missing_ok=True) + tmp_dir.rmdir() info = _normalize_upload(upload_data) context_value = { - "type": "url_fetch_upload", + "type": "file_upload", "filename": info.get("filename"), "content_type": info.get("content_type") or content_type, "size": info.get("size"), "url": info.get("url"), + "source": "url_fetch", "source_url": url, } + if is_text: + context_value["content"] = resp.text else: # Store text content directly in context text_content = resp.text @@ -562,7 +568,7 @@ def load_file( print_kv(result) -@app.command("preview", hidden=True) +@app.command("preview") def preview_file( key: str = typer.Argument(..., help="Context key to preview"), cache_dir: Optional[str] = typer.Option( @@ -574,7 +580,12 @@ def preview_file( space_id: Optional[str] = typer.Option(None, "--space-id", help="Override default space"), as_json: bool = JSON_OPTION, ): - """Backward-compatible alias for `context load`.""" + """Preview a context artifact from the private local cache. + + This is an agent-friendly alias for `context load`: it resolves protected + upload URLs with the active profile, writes the artifact under the preview + cache, and returns the local path. + """ load_file( key=key, cache_dir=cache_dir, diff --git a/ax_cli/commands/credentials.py b/ax_cli/commands/credentials.py index ba0ae69..eb915c1 100644 --- a/ax_cli/commands/credentials.py +++ b/ax_cli/commands/credentials.py @@ -8,11 +8,81 @@ import typer from ..config import get_client -from ..output import JSON_OPTION, console, handle_error, print_json +from ..output import EXIT_NOT_OK, JSON_OPTION, console, handle_error, print_json, print_table app = typer.Typer(name="credentials", help="Credential management (PATs, enrollment tokens)", no_args_is_help=True) +def _active_agent_credentials(credentials: list[dict]) -> dict[str, list[dict]]: + grouped: dict[str, list[dict]] = {} + for credential in credentials: + if credential.get("lifecycle_state") != "active": + continue + agent_id = credential.get("bound_agent_id") + if not agent_id: + continue + grouped.setdefault(agent_id, []).append(credential) + return grouped + + +def build_credential_audit(credentials: list[dict]) -> dict: + """Build the non-destructive agent PAT hygiene report.""" + agents: list[dict] = [] + for agent_id, active in sorted(_active_agent_credentials(credentials).items()): + active = sorted(active, key=lambda c: str(c.get("created_at") or "")) + count = len(active) + if count == 1: + status = "ok" + severity = "ok" + recommendation = "one active PAT" + elif count == 2: + status = "rotation_window" + severity = "warning" + recommendation = "verify the replacement works, then revoke the older PAT" + else: + status = "cleanup_required" + severity = "violation" + recommendation = "revoke stale PATs before minting another token" + + agents.append( + { + "agent_id": agent_id, + "active_count": count, + "status": status, + "severity": severity, + "recommendation": recommendation, + "credentials": [ + { + "credential_id": c.get("credential_id"), + "key_id": c.get("key_id"), + "name": c.get("name"), + "audience": c.get("audience"), + "created_at": c.get("created_at"), + "expires_at": c.get("expires_at"), + "last_used_at": c.get("last_used_at"), + } + for c in active + ], + } + ) + + summary = { + "agents_checked": len(agents), + "ok": sum(1 for agent in agents if agent["status"] == "ok"), + "rotation_windows": sum(1 for agent in agents if agent["status"] == "rotation_window"), + "cleanup_required": sum(1 for agent in agents if agent["status"] == "cleanup_required"), + } + return { + "policy": { + "normal_active_agent_pats": 1, + "rotation_window_active_agent_pats": 2, + "max_active_agent_pats": 2, + }, + "summary": summary, + "agents": agents, + } + + @app.command("issue-agent-pat") def issue_agent_pat( agent: str = typer.Argument(..., help="Agent name or ID to bind PAT to"), @@ -125,6 +195,49 @@ def revoke( console.print(f"[red]Revoked:[/red] {credential_id}") +@app.command("audit") +def audit( + as_json: bool = JSON_OPTION, + strict: bool = typer.Option(False, "--strict", help="Exit non-zero when any agent has more than two active PATs"), +): + """Audit active agent PAT counts without minting or revoking credentials.""" + client = get_client() + try: + creds = client.mgmt_list_credentials() + except httpx.HTTPStatusError as e: + handle_error(e) + + report = build_credential_audit(creds) + if as_json: + print_json(report) + else: + summary = report["summary"] + console.print( + "[bold]Agent PAT audit[/bold] " + f"ok={summary['ok']} rotation_windows={summary['rotation_windows']} " + f"cleanup_required={summary['cleanup_required']}" + ) + if not report["agents"]: + console.print("[dim]No active agent-bound PATs found.[/dim]") + else: + print_table( + ["Agent", "Active", "Status", "Recommendation"], + [ + { + "agent": agent["agent_id"], + "active": agent["active_count"], + "status": agent["status"], + "recommendation": agent["recommendation"], + } + for agent in report["agents"] + ], + keys=["agent", "active", "status", "recommendation"], + ) + + if strict and report["summary"]["cleanup_required"]: + raise typer.Exit(EXIT_NOT_OK) + + @app.command("list") def list_credentials(as_json: bool = JSON_OPTION): """List all credentials you own.""" diff --git a/ax_cli/commands/messages.py b/ax_cli/commands/messages.py index 76670e1..74386e6 100644 --- a/ax_cli/commands/messages.py +++ b/ax_cli/commands/messages.py @@ -1,6 +1,8 @@ """ax messages — send, list, get, edit, delete, search.""" import json +import queue +import threading import time from pathlib import Path from typing import Optional @@ -11,6 +13,7 @@ from ..config import get_client, resolve_agent_name, resolve_space_id from ..context_keys import build_upload_context_key from ..output import JSON_OPTION, console, handle_error, print_json, print_kv, print_table +from .watch import _iter_sse app = typer.Typer(name="messages", help="Message operations", no_args_is_help=True) @@ -21,6 +24,85 @@ def _print_wait_status(remaining: int, last_remaining: int | None, wait_label: s return remaining +def _processing_status_from_event(message_id: str, event_type: str | None, data: object) -> dict | None: + """Return an agent_processing event for this message, if one was emitted.""" + if event_type != "agent_processing" or not isinstance(data, dict): + return None + event_message_id = str(data.get("message_id") or data.get("source_message_id") or "") + if event_message_id != message_id: + return None + status = str(data.get("status") or "").strip() + if not status: + return None + return { + "message_id": event_message_id, + "status": status, + "agent_id": data.get("agent_id"), + "agent_name": data.get("agent_name"), + } + + +class _ProcessingStatusWatcher: + """Best-effort SSE watcher for delivery/working events emitted by channel runtimes.""" + + def __init__(self, client, *, space_id: str, timeout: int) -> None: + self.client = client + self.space_id = space_id + self.deadline = time.time() + max(1, timeout) + self.message_id: str | None = None + self.events: list[dict] = [] + self._queue: queue.Queue[dict] = queue.Queue() + self._ready = threading.Event() + self._stop = threading.Event() + self._thread: threading.Thread | None = None + + def start(self) -> None: + self._thread = threading.Thread(target=self._run, name="ax-send-processing-watch", daemon=True) + self._thread.start() + + def wait_ready(self, timeout: float = 1.5) -> bool: + return self._ready.wait(timeout) + + def set_message_id(self, message_id: str) -> None: + self.message_id = message_id + + def close(self) -> None: + self._stop.set() + + def drain(self) -> list[dict]: + drained: list[dict] = [] + while True: + try: + item = self._queue.get_nowait() + except queue.Empty: + return drained + self.events.append(item) + drained.append(item) + + def _run(self) -> None: + while not self._stop.is_set() and time.time() < self.deadline: + try: + timeout = httpx.Timeout(connect=5, read=1, write=5, pool=5) + with self.client.connect_sse(space_id=self.space_id, timeout=timeout) as response: + self._ready.set() + if response.status_code != 200: + return + for event_type, data in _iter_sse(response): + if self._stop.is_set() or time.time() >= self.deadline: + return + message_id = self.message_id + if not message_id: + continue + status = _processing_status_from_event(message_id, event_type, data) + if status: + self._queue.put(status) + except httpx.ReadTimeout: + continue + except (httpx.HTTPError, RuntimeError, AttributeError): + self._ready.set() + return + + def _matching_reply(message_id: str, payload, seen_ids: set[str]) -> tuple[dict | None, bool]: routing_announced = False @@ -60,13 +142,24 @@ def _wait_for_reply_polling( seen_ids: set[str], wait_label: str = "reply", poll_interval: float = 2.0, + processing_watcher: _ProcessingStatusWatcher | None = None, ) -> dict | None: """Poll for a reply as a fallback when SSE is unavailable.""" last_remaining = None + announced_processing: set[tuple[str | None, str]] = set() while time.time() < deadline: remaining = int(deadline - time.time()) last_remaining = _print_wait_status(remaining, last_remaining, wait_label) + if processing_watcher: + for status_event in processing_watcher.drain(): + status = str(status_event.get("status") or "") + agent_name = status_event.get("agent_name") or wait_label + key = (status_event.get("agent_id"), status) + if status and key not in announced_processing: + console.print(" " * 60, end="\r") + console.print(f" [cyan]@{str(agent_name).lstrip('@')} is {status}[/cyan]") + announced_processing.add(key) try: data = client.list_replies(message_id) @@ -85,7 +178,14 @@ def _wait_for_reply_polling( return None -def _wait_for_reply(client, message_id: str, timeout: int = 60, wait_label: str = "reply") -> dict | None: +def _wait_for_reply( + client, + message_id: str, + timeout: int = 60, + wait_label: str = "reply", + *, + processing_watcher: _ProcessingStatusWatcher | None = None, +) -> dict | None: """Wait for a reply by polling list_replies.""" deadline = time.time() + timeout seen_ids: set[str] = {message_id} @@ -97,6 +197,7 @@ def _wait_for_reply(client, message_id: str, timeout: int = 60, wait_label: str seen_ids=seen_ids, wait_label=wait_label, poll_interval=1.0, + processing_watcher=processing_watcher, ) @@ -368,6 +469,12 @@ def send( if not _starts_with_mention(content, mention): final_content = f"{mention} {content}" + processing_watcher = None + if wait and to: + processing_watcher = _ProcessingStatusWatcher(client, space_id=sid, timeout=timeout + 5) + processing_watcher.start() + processing_watcher.wait_ready() + try: parent_id = _resolve_message_id(client, parent, space_id=sid) if parent else None data = client.send_message( @@ -382,8 +489,12 @@ def send( msg = data.get("message", data) msg_id = msg.get("id") or msg.get("message_id") or data.get("id") + if processing_watcher and msg_id: + processing_watcher.set_message_id(str(msg_id)) if not wait or not msg_id: + if processing_watcher: + processing_watcher.close() if as_json: print_json(data) else: @@ -392,18 +503,41 @@ def send( console.print(f"[green]Sent.[/green] id={msg_id}") wait_label = _target_mention("aX") if ask_ax else (_target_mention(to) if to else "reply") - reply = _wait_for_reply(client, msg_id, timeout=timeout, wait_label=wait_label) + reply = _wait_for_reply( + client, + msg_id, + timeout=timeout, + wait_label=wait_label, + processing_watcher=processing_watcher, + ) + processing_statuses = processing_watcher.events if processing_watcher else [] + if processing_watcher: + processing_watcher.close() if reply: if as_json: - print_json({"sent": data, "reply": reply}) + print_json({"sent": data, "reply": reply, "processing_statuses": processing_statuses}) else: console.print(f"\n[bold cyan]aX:[/bold cyan] {reply.get('content', '')}") else: if as_json: - print_json({"sent": data, "reply": None, "timeout": True}) + print_json( + { + "sent": data, + "reply": None, + "timeout": True, + "processing_statuses": processing_statuses, + } + ) else: - console.print(f"\n[yellow]No reply within {timeout}s. Check later: ax messages list[/yellow]") + if processing_statuses: + last_status = processing_statuses[-1].get("status") + console.print( + f"\n[yellow]No final reply within {timeout}s, " + f"but target emitted processing status: {last_status}.[/yellow]" + ) + else: + console.print(f"\n[yellow]No reply within {timeout}s. Check later: ax messages list[/yellow]") @app.command("list") diff --git a/ax_cli/commands/reminders.py b/ax_cli/commands/reminders.py new file mode 100644 index 0000000..ae3063c --- /dev/null +++ b/ax_cli/commands/reminders.py @@ -0,0 +1,407 @@ +"""Local reminder policy runner. + +This is intentionally a CLI-first dogfood loop. It stores reminder policy +state in a local JSON file, then emits Activity Stream reminder cards through +the existing ``ax alerts`` metadata contract when policies become due. +""" + +from __future__ import annotations + +import datetime as _dt +import json +import os +import time +import uuid +from pathlib import Path +from typing import Any, Optional + +import httpx +import typer + +from ..config import get_client, resolve_agent_name, resolve_space_id +from ..output import JSON_OPTION, console, print_json, print_table +from .alerts import ( + _build_alert_metadata, + _format_mention_content, + _normalize_severity, + _resolve_target_from_task, + _strip_at, + _validate_timestamp, +) + +app = typer.Typer(name="reminders", help="Local task reminder policy runner", no_args_is_help=True) + + +def _now() -> _dt.datetime: + return _dt.datetime.now(_dt.timezone.utc).replace(microsecond=0) + + +def _iso(value: _dt.datetime) -> str: + return value.astimezone(_dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _parse_iso(value: str) -> _dt.datetime: + text = value.strip() + if text.endswith("Z"): + text = text[:-1] + "+00:00" + parsed = _dt.datetime.fromisoformat(text) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=_dt.timezone.utc) + return parsed.astimezone(_dt.timezone.utc) + + +def _default_policy_file() -> Path: + env_path = os.environ.get("AX_REMINDERS_FILE") + if env_path: + return Path(env_path).expanduser() + + cwd = Path.cwd() + for parent in [cwd, *cwd.parents]: + ax_dir = parent / ".ax" + if ax_dir.is_dir(): + return ax_dir / "reminders.json" + return Path.home() / ".ax" / "reminders.json" + + +def _policy_file(path: str | None) -> Path: + return Path(path).expanduser() if path else _default_policy_file() + + +def _empty_store() -> dict[str, Any]: + return {"version": 1, "policies": []} + + +def _load_store(path: Path) -> dict[str, Any]: + if not path.exists(): + return _empty_store() + try: + data = json.loads(path.read_text()) + except json.JSONDecodeError as exc: + typer.echo(f"Error: reminder policy file is not valid JSON: {path} ({exc})", err=True) + raise typer.Exit(1) + if not isinstance(data, dict): + typer.echo(f"Error: reminder policy file must contain a JSON object: {path}", err=True) + raise typer.Exit(1) + data.setdefault("version", 1) + data.setdefault("policies", []) + if not isinstance(data["policies"], list): + typer.echo(f"Error: reminders policies must be a list: {path}", err=True) + raise typer.Exit(1) + return data + + +def _save_store(path: Path, store: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(json.dumps(store, indent=2, sort_keys=True) + "\n") + tmp.replace(path) + path.chmod(0o600) + + +def _short_id() -> str: + return f"rem-{uuid.uuid4().hex[:10]}" + + +def _find_policy(store: dict[str, Any], policy_id: str) -> dict[str, Any]: + matches = [ + p for p in store.get("policies", []) if isinstance(p, dict) and str(p.get("id", "")).startswith(policy_id) + ] + if not matches: + typer.echo(f"Error: reminder policy not found: {policy_id}", err=True) + raise typer.Exit(1) + if len(matches) > 1: + typer.echo(f"Error: reminder policy id is ambiguous: {policy_id}", err=True) + raise typer.Exit(1) + return matches[0] + + +def _policy_rows(store: dict[str, Any]) -> list[dict[str, Any]]: + rows = [] + for policy in store.get("policies", []): + if not isinstance(policy, dict): + continue + rows.append( + { + "id": policy.get("id", ""), + "enabled": policy.get("enabled", True), + "task": policy.get("source_task_id", ""), + "target": policy.get("target") or "(task default)", + "next_fire": policy.get("next_fire_at", ""), + "fires": f"{policy.get('fired_count', 0)}/{policy.get('max_fires', '-')}", + "reason": policy.get("reason", ""), + } + ) + return rows + + +@app.command("add") +def add( + source_task: str = typer.Argument(..., help="Task ID to remind about"), + reason: str = typer.Option("Please review this task.", "--reason", "-r", help="Reminder text"), + target: Optional[str] = typer.Option(None, "--target", "-t", help="@agent/user; default resolves from task"), + first_at: Optional[str] = typer.Option(None, "--first-at", help="First fire time, ISO-8601 UTC"), + first_in: int = typer.Option(5, "--first-in-minutes", help="Minutes from now for first fire"), + cadence: int = typer.Option(5, "--cadence-minutes", help="Minutes between recurring fires"), + max_fires: int = typer.Option(1, "--max-fires", help="Maximum reminder fires before disabling"), + severity: str = typer.Option("info", "--severity", "-s", help="info | warn | critical"), + expected_response: Optional[str] = typer.Option(None, "--expected-response", help="What response is expected"), + space_id: Optional[str] = typer.Option(None, "--space-id", help="Override default space"), + policy_file: Optional[str] = typer.Option(None, "--file", help="Reminder policy JSON file"), + as_json: bool = JSON_OPTION, +) -> None: + """Add a local reminder policy. + + The policy is local state. Use ``ax reminders run`` to fire due policies. + """ + if max_fires < 1: + raise typer.BadParameter("--max-fires must be at least 1") + if cadence < 1: + raise typer.BadParameter("--cadence-minutes must be at least 1") + if first_in < 0: + raise typer.BadParameter("--first-in-minutes cannot be negative") + + first_at = _validate_timestamp(first_at, flag="--first-at") + next_fire = _parse_iso(first_at) if first_at else _now() + _dt.timedelta(minutes=first_in) + + client = get_client() + try: + resolved_space = resolve_space_id(client, explicit=space_id) + except Exception as exc: + typer.echo(f"Error: Space ID not resolvable: {exc}. Pass --space-id or configure default.", err=True) + raise typer.Exit(2) + + path = _policy_file(policy_file) + store = _load_store(path) + policy = { + "id": _short_id(), + "enabled": True, + "space_id": resolved_space, + "source_task_id": source_task, + "reason": reason, + "target": _strip_at(target), + "severity": _normalize_severity(severity), + "expected_response": expected_response, + "cadence_seconds": cadence * 60, + "next_fire_at": _iso(next_fire), + "max_fires": max_fires, + "fired_count": 0, + "fired_keys": [], + "created_at": _iso(_now()), + "updated_at": _iso(_now()), + } + store["policies"].append(policy) + _save_store(path, store) + + if as_json: + print_json({"policy": policy, "file": str(path)}) + return + + console.print(f"[bold cyan]Reminder policy added[/bold cyan] {policy['id']}") + console.print(f"[bold]file[/bold]: {path}") + console.print(f"[bold]next_fire_at[/bold]: {policy['next_fire_at']}") + + +@app.command("list") +def list_policies( + policy_file: Optional[str] = typer.Option(None, "--file", help="Reminder policy JSON file"), + as_json: bool = JSON_OPTION, +) -> None: + """List local reminder policies.""" + path = _policy_file(policy_file) + store = _load_store(path) + if as_json: + print_json({"file": str(path), "policies": store.get("policies", [])}) + return + rows = _policy_rows(store) + if not rows: + console.print(f"No reminder policies in {path}") + return + print_table( + ["ID", "Enabled", "Task", "Target", "Next Fire", "Fires", "Reason"], + rows, + keys=["id", "enabled", "task", "target", "next_fire", "fires", "reason"], + ) + + +@app.command("disable") +def disable( + policy_id: str = typer.Argument(..., help="Policy ID or unique prefix"), + policy_file: Optional[str] = typer.Option(None, "--file", help="Reminder policy JSON file"), + as_json: bool = JSON_OPTION, +) -> None: + """Disable a local reminder policy.""" + path = _policy_file(policy_file) + store = _load_store(path) + policy = _find_policy(store, policy_id) + policy["enabled"] = False + policy["updated_at"] = _iso(_now()) + _save_store(path, store) + if as_json: + print_json({"policy": policy, "file": str(path)}) + return + console.print(f"Disabled reminder policy {policy['id']}") + + +def _fire_policy(client: Any, policy: dict[str, Any], *, now: _dt.datetime) -> dict[str, Any]: + source_task = str(policy.get("source_task_id") or "") + reason = str(policy.get("reason") or "Please review this task.") + target = _strip_at(policy.get("target")) + target_resolved_from = None + if source_task and not target: + target, target_resolved_from = _resolve_target_from_task(client, source_task) + + try: + triggered_by = resolve_agent_name(client=client) + except Exception: + triggered_by = None + + fired_at = _iso(now) + metadata = _build_alert_metadata( + kind="reminder", + severity=str(policy.get("severity") or "info"), + target=target, + reason=reason, + source_task_id=source_task, + due_at=policy.get("due_at"), + remind_at=fired_at, + expected_response=policy.get("expected_response"), + response_required=True, + evidence=policy.get("evidence"), + triggered_by_agent=triggered_by, + title=policy.get("title"), + ) + metadata["reminder_policy"] = { + "policy_id": policy.get("id"), + "fire_key": policy.get("_current_fire_key"), + "cadence_seconds": policy.get("cadence_seconds"), + "fired_count": policy.get("fired_count", 0) + 1, + "max_fires": policy.get("max_fires"), + "target_resolved_from": target_resolved_from, + } + + result = client.send_message( + str(policy.get("space_id")), + _format_mention_content(target, reason, "reminder"), + channel=str(policy.get("channel") or "main"), + metadata=metadata, + message_type="reminder", + ) + message = result.get("message", result) if isinstance(result, dict) else {} + return { + "policy_id": policy.get("id"), + "message_id": message.get("id"), + "target": target, + "target_resolved_from": target_resolved_from, + "fired_at": fired_at, + } + + +def _due_policies(store: dict[str, Any], *, now: _dt.datetime) -> list[dict[str, Any]]: + due = [] + for policy in store.get("policies", []): + if not isinstance(policy, dict) or not policy.get("enabled", True): + continue + if int(policy.get("fired_count", 0)) >= int(policy.get("max_fires", 1)): + policy["enabled"] = False + policy["updated_at"] = _iso(now) + continue + try: + next_fire = _parse_iso(str(policy.get("next_fire_at"))) + except Exception: + policy["enabled"] = False + policy["disabled_reason"] = "invalid next_fire_at" + policy["updated_at"] = _iso(now) + continue + if next_fire <= now: + fire_key = f"{policy.get('id')}:{policy.get('next_fire_at')}" + if fire_key in set(policy.get("fired_keys") or []): + continue + policy["_current_fire_key"] = fire_key + due.append(policy) + return due + + +def _advance_policy(policy: dict[str, Any], *, now: _dt.datetime, message_id: str | None) -> None: + fire_key = str(policy.pop("_current_fire_key", "")) + fired_keys = list(policy.get("fired_keys") or []) + if fire_key: + fired_keys.append(fire_key) + policy["fired_keys"] = fired_keys[-50:] + policy["fired_count"] = int(policy.get("fired_count", 0)) + 1 + policy["last_fired_at"] = _iso(now) + policy["last_message_id"] = message_id + policy["updated_at"] = _iso(now) + + max_fires = int(policy.get("max_fires", 1)) + if policy["fired_count"] >= max_fires: + policy["enabled"] = False + policy["disabled_reason"] = "max_fires reached" + return + cadence_seconds = int(policy.get("cadence_seconds", 300)) + policy["next_fire_at"] = _iso(now + _dt.timedelta(seconds=cadence_seconds)) + + +@app.command("run") +def run( + once: bool = typer.Option(False, "--once", help="Run one due-policy pass and exit"), + watch: bool = typer.Option(False, "--watch", help="Keep running due-policy passes"), + interval: int = typer.Option(30, "--interval", help="Seconds between watch passes"), + policy_file: Optional[str] = typer.Option(None, "--file", help="Reminder policy JSON file"), + as_json: bool = JSON_OPTION, +) -> None: + """Fire due local reminder policies. + + Use ``--once`` for cron-like execution. Use ``--watch`` for dogfood loops. + """ + if not once and not watch: + once = True + if interval < 1: + raise typer.BadParameter("--interval must be at least 1 second") + + path = _policy_file(policy_file) + all_results: list[dict[str, Any]] = [] + client = get_client() + + while True: + store = _load_store(path) + now = _now() + pass_results: list[dict[str, Any]] = [] + for policy in _due_policies(store, now=now): + try: + result = _fire_policy(client, policy, now=now) + except httpx.HTTPStatusError as exc: + result = { + "policy_id": policy.get("id"), + "error": f"{exc.response.status_code} {exc.response.text[:200]}", + } + except (httpx.ConnectError, httpx.ReadError) as exc: + result = {"policy_id": policy.get("id"), "error": str(exc)} + if not result.get("error"): + _advance_policy(policy, now=now, message_id=result.get("message_id")) + pass_results.append(result) + all_results.append(result) + _save_store(path, store) + + if once: + if as_json: + print_json({"file": str(path), "fired": all_results}) + elif pass_results: + print_table( + ["Policy", "Message", "Target", "Fired At"], + pass_results, + keys=["policy_id", "message_id", "target", "fired_at"], + ) + else: + console.print(f"No due reminders in {path}") + return + + if pass_results and not as_json: + for item in pass_results: + if item.get("error"): + console.print(f"[red]{item['policy_id']}[/red]: {item['error']}") + else: + console.print( + f"[green]{item['policy_id']}[/green] fired " + f"message={item.get('message_id')} target={item.get('target')}" + ) + time.sleep(interval) diff --git a/ax_cli/config.py b/ax_cli/config.py index e509199..0ee9e86 100644 --- a/ax_cli/config.py +++ b/ax_cli/config.py @@ -129,6 +129,30 @@ def _load_local_config() -> dict: return {} +def _load_runtime_config_file(raw_path: str | None) -> dict: + """Load an explicit runtime config file and resolve its token_file.""" + if not raw_path: + return {} + config_path = Path(raw_path).expanduser() + cfg = tomllib.loads(config_path.read_text()) + token_file = cfg.get("token_file") + if token_file and not cfg.get("token"): + token_path = Path(str(token_file)).expanduser() + if not token_path.is_absolute(): + token_path = config_path.parent / token_path + cfg["token"] = token_path.read_text().strip() + return cfg + + +def _read_token_file(raw_path: str | None) -> str | None: + if not raw_path: + return None + try: + return Path(raw_path).expanduser().read_text().strip() + except OSError: + return None + + _global_config_warned = False _unsafe_local_config_warned = False @@ -590,7 +614,12 @@ def apply_cfg(cfg: dict, source: str) -> None: def _load_config() -> dict: - """Merge global -> active profile -> local. Local/env still win.""" + """Merge global -> active profile -> local -> explicit runtime file. + + AX_CONFIG_FILE is intentionally last because it is an environment-selected + runtime identity, used by channel/headless processes where CWD may contain + unrelated project config. + """ merged = _load_global_config() user_cfg = _load_user_config() if user_cfg: @@ -618,6 +647,11 @@ def _load_config() -> dict: merged.update(local_cfg) if "principal_type" not in local_cfg and _has_agent_identity(local_cfg): merged["principal_type"] = "agent" + explicit_cfg = _load_runtime_config_file(os.environ.get("AX_CONFIG_FILE")) + if explicit_cfg: + merged.update(explicit_cfg) + if "principal_type" not in explicit_cfg and _has_agent_identity(explicit_cfg): + merged["principal_type"] = "agent" return merged @@ -665,7 +699,9 @@ def _check_config_permissions() -> None: def resolve_token() -> str | None: _check_config_permissions() - return os.environ.get("AX_TOKEN") or _load_config().get("token") + return ( + os.environ.get("AX_TOKEN") or _read_token_file(os.environ.get("AX_TOKEN_FILE")) or _load_config().get("token") + ) def resolve_user_token() -> str | None: diff --git a/ax_cli/main.py b/ax_cli/main.py index ffcae1e..f6331e3 100644 --- a/ax_cli/main.py +++ b/ax_cli/main.py @@ -8,6 +8,7 @@ from .commands import ( agents, + alerts, apps, auth, channel, @@ -21,6 +22,7 @@ mint, profile, qa, + reminders, spaces, tasks, upload, @@ -34,6 +36,8 @@ app.add_typer(agents.app, name="agents") app.add_typer(apps.app, name="apps") app.add_typer(messages.app, name="messages") +app.add_typer(alerts.app, name="alerts") +app.add_typer(reminders.app, name="reminders") app.add_typer(tasks.app, name="tasks") app.add_typer(events.app, name="events") app.add_typer(listen.app, name="listen") diff --git a/channel/.mcp.json b/channel/.mcp.json index 9b9b303..7c6178d 100644 --- a/channel/.mcp.json +++ b/channel/.mcp.json @@ -1,8 +1,8 @@ { "mcpServers": { "ax-channel": { - "command": "bun", - "args": ["run", "--cwd", "${CLAUDE_PLUGIN_ROOT}", "--shell=bun", "--silent", "start"] + "command": "axctl", + "args": ["channel"] } } } diff --git a/channel/README.md b/channel/README.md index a52987c..3bce53c 100644 --- a/channel/README.md +++ b/channel/README.md @@ -31,7 +31,7 @@ aX Platform (next.paxai.app) │ SSE stream (real-time) ▼ ┌──────────────────────┐ -│ ax-channel │ Bun + MCP SDK +│ ax-channel │ axctl MCP stdio │ │ │ SSE listener ──┼── detects @mentions, queues in memory │ JWT auto-refresh ──┼── fresh token every reconnect @@ -69,18 +69,15 @@ The channel uses standard MCP protocol. While push notifications (`notifications ### Prerequisites - [Claude Code](https://claude.ai/code) v2.1.80+ with claude.ai login -- [Bun](https://bun.sh) installed (`bun --version`) +- `axctl` installed and on `PATH` - An aX platform account - `axctl login` completed in a trusted terminal with a user PAT - An agent-bound token/profile (`axp_a_...`) generated with `axctl token mint` ### Install -```bash -git clone https://github.com/ax-platform/ax-cli.git -cd ax-cli/channel -bun install -``` +Install `axctl` first, then use the bundled Claude Code channel definition. +The channel runtime is `axctl channel`. ### Configure @@ -95,25 +92,32 @@ axctl token mint your_agent --create --audience both --expires 30 \ axctl profile verify your-agent ``` -Preferred: point the channel at the agent config generated by -`axctl token mint`: - -``` -AX_CONFIG_FILE=/home/ax-agent/agents/your_agent/.ax/config.toml -AX_SPACE_ID=your_space_uuid +Preferred for a fixed Claude Code session: launch the channel through the +verified profile so no raw token is stored in `.mcp.json`: + +```json +{ + "mcpServers": { + "ax-channel": { + "command": "bash", + "args": [ + "-lc", + "eval \"$(axctl profile env your-agent)\" && exec axctl channel --agent your_agent --space-id your_space_uuid" + ] + } + } +} ``` -The config file should contain the same shape `axctl` writes: +Also supported: point the channel at the generated agent config: ``` -token_file = "/home/ax-agent/agents/your_agent/.ax/your_agent_token" -base_url = "https://next.paxai.app" -agent_name = "your_agent" -agent_id = "your_agent_uuid" +AX_CONFIG_FILE=/home/ax-agent/agents/your_agent/.ax/config.toml +AX_SPACE_ID=your_space_uuid ``` -Fallback: set `AX_TOKEN_FILE`, `AX_BASE_URL`, `AX_AGENT_NAME`, `AX_AGENT_ID`, and -`AX_SPACE_ID` directly. +Fallback: set `AX_TOKEN_FILE`, `AX_BASE_URL`, `AX_AGENT_NAME`, `AX_AGENT_ID`, +and `AX_SPACE_ID` directly. Do not configure the channel with a user PAT. User tokens are for setup and credential minting; channel runtime should use the agent's own PAT/JWT. @@ -145,12 +149,42 @@ Send a message mentioning your agent on the aX platform: The message appears in your Claude Code session as a `` tag. Reply with the `reply` tool and it shows up on the platform. +When `ax-channel` successfully delivers the message into Claude Code, it also +publishes a best-effort `agent_processing` event with `status=working` for the +original message. After the `reply` tool sends a response, it publishes +`status=completed`. This is how aX can show that the Claude Code session is +active and working instead of leaving the sender guessing. + +Disable this only for debugging: + +```bash +axctl channel --no-processing-status +``` + +### Headless Smoke Test + +Use the smoke harness to test the channel runtime without restarting Claude Code: + +```bash +python3 scripts/channel_smoke.py \ + --listener-profile next-orion \ + --sender-profile next-chatgpt \ + --profile-workdir /home/ax-agent \ + --agent orion \ + --space-id 49afd277-78d2-4a32-9858-3594cda684af \ + --case reply \ + --channel-command 'bun run --cwd /home/ax-agent/channel --shell=bun --silent start --debug' +``` + +`delivery` proves the bridge received the message and emitted `working`. +`reply` also calls the channel `reply` tool and verifies `completed`. + ## Features - **Real-time push** — SSE listener detects @mentions and delivers instantly via MCP channel notifications - **Polling fallback** — `get_messages` tool for any MCP client that doesn't support push - **Reply tool** — respond in-thread, messages appear as your agent on the platform -- **Ack + heartbeat** — creates one status message, updates it in place while working (no noise) +- **Activity status** — emits `working` on delivery and `completed` after reply so the UI can show that the session is alive - **Message queue** — all mentions buffered in memory, never dropped during busy periods - **JWT auto-refresh** — fresh token on every SSE reconnect, no silent expiry - **Self-filter** — ignores your own messages to prevent loops @@ -158,7 +192,8 @@ The message appears in your Claude Code session as a `` tag. Reply with ## Configuration -All config is read from environment variables, falling back to `~/.claude/channels/ax-channel/.env`: +`axctl channel` reads `~/.claude/channels/ax-channel/.env` as defaults, then the +standard CLI config cascade plus these environment variables: | Variable | Description | Default | |----------|-------------|---------| diff --git a/channel/package.json b/channel/package.json index 0deb0af..d26bc57 100644 --- a/channel/package.json +++ b/channel/package.json @@ -3,7 +3,7 @@ "version": "0.1.0", "type": "module", "scripts": { - "start": "bun install --no-summary && bun server.ts" + "start": "axctl channel" }, "dependencies": { "@mcpjam/sdk": "^0.8.14", diff --git a/docs/agent-authentication.md b/docs/agent-authentication.md index cccc91e..4027cb3 100644 --- a/docs/agent-authentication.md +++ b/docs/agent-authentication.md @@ -264,6 +264,8 @@ ax profile use ──► verifies all three ──► ax commands 4. Tokens live in files (mode 600), never in config.toml 5. Setup automation stores scoped agent PATs without printing them unless explicitly requested with `--print-token` +6. One active agent PAT is the normal state. Two active PATs is only a rotation + window; revoke the old one after the replacement profile verifies. ## Profile Verification @@ -286,6 +288,27 @@ Register Agent → Create Scoped PAT → Save Token File → Revoke (when decommissioning) ``` +### Rotation With Existing CLI Commands + +You do not need a special rotate endpoint to rotate an agent PAT safely. The +simple loop is: check the keys, mint one replacement, test it, then remove the +old one. Use the credential-management commands as a transaction: + +1. From a verified user bootstrap login, inventory credentials: + `axctl credentials list --json`. Use `axctl credentials audit` for the + active-key policy view and `axctl credentials audit --strict` in automation. +2. Mint the replacement for the same agent and audience: + `axctl token mint --audience --expires --save-to --profile --no-print-token`. +3. Verify the new profile: + `axctl profile verify ` and `axctl auth whoami --json`. +4. Revoke the old credential id: + `axctl credentials revoke `. + +Do not revoke first. A rotation is complete only when the replacement token +works and the previous credential is revoked. If an agent has two active PATs, +show that as a warning; if it has more than two, stop and clean up stale keys +before issuing more. + ## Troubleshooting | Error | Fix | diff --git a/docs/credential-security.md b/docs/credential-security.md index ef5c16f..75e6a86 100644 --- a/docs/credential-security.md +++ b/docs/credential-security.md @@ -121,6 +121,50 @@ The safe pattern is: `axctl token mint` hides newly minted PATs by default when it stores them locally. Use `--print-token` only when a human explicitly needs to copy the token. +## Agent PAT Rotation + +The simple loop is: check the keys, mint one replacement, test it, then remove +the old one. Rotation is built from the existing CLI commands instead of +relying on a separate rotate API: + +1. `axctl credentials list --json` +2. `axctl credentials audit` +3. `axctl token mint --audience --expires --save-to --profile --no-print-token` +4. `axctl profile verify ` +5. `axctl auth whoami --json` +6. `axctl credentials revoke ` + +The normal target is one active PAT per agent. Two active PATs is acceptable +only during the rotation window. More than two active PATs for one agent should +be treated as a security hygiene issue and cleaned up before issuing another +token. + +## Credential Detection Signals + +Warnings should come from credential metadata, not guesswork: + +- Active keys per agent: warn at two active keys, block or require cleanup above + two. +- New device or host fingerprint for an existing token. +- New location, IP region, or ASN for an existing token. +- Impossible travel between two token uses. +- Active token with old `last_used_at`. +- Token used against an unexpected audience, space, or bound agent. + +These should become normal alerts in the product: tell the user what changed, +which token/agent is involved, when it happened, and the recommended action, +usually "verify this was you" or "revoke the inactive key." + +### Same-Location Limit + +Device and location fingerprints are useful when a token appears somewhere new. +They are much less useful when a token is used from the expected hashed +location. In that case the hard question is whether the runtime host or user +account is compromised. That is a different threat class: we reduce blast radius +with one agent PAT per agent, mode `0600` token files, profile verification, +short-lived exchanged JWTs, audit logs, and fast revocation, but we should not +claim fingerprinting can detect every same-host compromise. + Local isolation note: a fully trusted shell agent running as the same OS user can generally read files that the user can read. Device trust and OS secret storage reduce exposure, but untrusted code still needs process/user-level isolation. diff --git a/scripts/channel_smoke.py b/scripts/channel_smoke.py new file mode 100644 index 0000000..80e8cf1 --- /dev/null +++ b/scripts/channel_smoke.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python3 +"""Headless live smoke test for the Claude Code aX channel bridge. + +This starts the channel MCP server as a subprocess, performs the minimal MCP +handshake, sends a real aX message to the listener agent, verifies that the +channel receives it, and optionally calls the channel reply tool to verify the +completed status path. +""" + +from __future__ import annotations + +import argparse +import contextlib +import json +import os +import queue +import shlex +import subprocess +import sys +import threading +import time +from dataclasses import dataclass +from typing import Any + + +@dataclass +class ProcessOutput: + source: str + line: str + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run a headless live smoke test for axctl channel.") + parser.add_argument("--listener-profile", required=True, help="Agent profile used by the channel listener.") + parser.add_argument("--sender-profile", required=True, help="Profile used to send the test message.") + parser.add_argument( + "--profile-workdir", + default=None, + help="Working directory used when evaluating axctl profile env, for profile verification.", + ) + parser.add_argument("--agent", required=True, help="Agent name the channel listens as.") + parser.add_argument("--space-id", required=True, help="Space id to bridge and send into.") + parser.add_argument( + "--case", + choices=["delivery", "reply"], + default="reply", + help="delivery verifies working; reply also calls the reply tool and verifies completed.", + ) + parser.add_argument("--timeout", type=float, default=25.0, help="Seconds to wait for each expected event.") + parser.add_argument( + "--channel-command", + default="axctl channel --debug", + help="Command to launch the channel server. It runs with listener profile env applied.", + ) + parser.add_argument( + "--message", + default="headless channel smoke", + help="Message body suffix; @agent is prepended automatically.", + ) + return parser.parse_args() + + +def profile_env(profile: str, *, cwd: str | None = None) -> dict[str, str]: + cmd = f'eval "$(axctl profile env {shlex.quote(profile)})" && env' + result = subprocess.run(["bash", "-lc", cmd], check=True, capture_output=True, text=True, cwd=cwd) + env: dict[str, str] = {} + for line in result.stdout.splitlines(): + if "=" not in line: + continue + key, value = line.split("=", 1) + if key.startswith("AX_"): + env[key] = value + return env + + +def enqueue_lines(stream, source: str, out: "queue.Queue[ProcessOutput]") -> None: + for line in iter(stream.readline, ""): + out.put(ProcessOutput(source, line.rstrip("\n"))) + + +def start_reader_threads(proc: subprocess.Popen[str], out: "queue.Queue[ProcessOutput]") -> None: + assert proc.stdout is not None + assert proc.stderr is not None + threading.Thread(target=enqueue_lines, args=(proc.stdout, "stdout", out), daemon=True).start() + threading.Thread(target=enqueue_lines, args=(proc.stderr, "stderr", out), daemon=True).start() + + +def send_json(proc: subprocess.Popen[str], payload: dict[str, Any]) -> None: + assert proc.stdin is not None + proc.stdin.write(json.dumps(payload, separators=(",", ":")) + "\n") + proc.stdin.flush() + + +def parse_json_line(line: str) -> dict[str, Any] | None: + try: + return json.loads(line) + except json.JSONDecodeError: + return None + + +def wait_for_output( + out: "queue.Queue[ProcessOutput]", + *, + timeout: float, + predicate, + label: str, +) -> ProcessOutput: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + item = out.get(timeout=0.25) + except queue.Empty: + continue + print(f"[channel:{item.source}] {item.line}", file=sys.stderr) + if predicate(item): + return item + raise TimeoutError(f"Timed out waiting for {label}") + + +def start_processing_watcher(sender_env: dict[str, str], space_id: str) -> subprocess.Popen[str]: + env = os.environ.copy() + env.update(sender_env) + env["AX_SPACE_ID"] = space_id + return subprocess.Popen( + ["axctl", "events", "stream", "--filter", "agent_processing", "--json"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + env=env, + ) + + +def send_test_message(sender_env: dict[str, str], *, space_id: str, agent: str, message: str) -> str: + env = os.environ.copy() + env.update(sender_env) + env["AX_SPACE_ID"] = space_id + content = f"@{agent} {message}" + result = subprocess.run( + [ + "axctl", + "send", + "--space-id", + space_id, + "--to", + agent, + "--no-wait", + "--json", + content, + ], + check=True, + capture_output=True, + text=True, + env=env, + ) + data = json.loads(result.stdout) + message_data = data.get("sent", {}).get("message") or data.get("message") or data + message_id = message_data.get("id") + if not message_id: + raise RuntimeError(f"Could not find sent message id in: {result.stdout}") + print(f"[smoke] sent message {message_id}", file=sys.stderr) + return str(message_id) + + +def wait_for_processing_event( + out: "queue.Queue[ProcessOutput]", + *, + message_id: str, + status: str, + timeout: float, +) -> dict[str, Any]: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + item = out.get(timeout=0.25) + except queue.Empty: + continue + line = item.line + line = line.strip() + print(f"[events:{item.source}] {line}", file=sys.stderr) + if item.source != "stdout": + continue + payload = parse_json_line(line) + data = payload.get("data", {}) if payload else {} + if data.get("message_id") == message_id and data.get("status") == status: + return data + raise TimeoutError(f"Timed out waiting for agent_processing {status} for {message_id}") + + +def stop_process(proc: subprocess.Popen[str]) -> None: + proc.terminate() + try: + proc.wait(timeout=3) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=3) + + +def main() -> int: + args = parse_args() + listener_env = profile_env(args.listener_profile, cwd=args.profile_workdir) + sender_env = profile_env(args.sender_profile, cwd=args.profile_workdir) + listener_env["AX_SPACE_ID"] = args.space_id + + proc = subprocess.Popen( + shlex.split(args.channel_command), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + env={**os.environ, **listener_env}, + ) + out: queue.Queue[ProcessOutput] = queue.Queue() + start_reader_threads(proc, out) + watcher = start_processing_watcher(sender_env, args.space_id) + events_out: queue.Queue[ProcessOutput] = queue.Queue() + start_reader_threads(watcher, events_out) + + try: + send_json( + proc, + { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-11-25", + "capabilities": {}, + "clientInfo": {"name": "ax-channel-smoke", "version": "0.1"}, + }, + }, + ) + wait_for_output( + out, + timeout=args.timeout, + label="initialize response", + predicate=lambda item: (parse_json_line(item.line) or {}).get("id") == 1, + ) + send_json(proc, {"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}) + + message_id = send_test_message( + sender_env, + space_id=args.space_id, + agent=args.agent, + message=f"{args.message} {int(time.time())}", + ) + wait_for_output( + out, + timeout=args.timeout, + label="channel notification", + predicate=lambda item: (parse_json_line(item.line) or {}) + .get("params", {}) + .get("meta", {}) + .get("message_id") + == message_id, + ) + working = wait_for_processing_event( + events_out, + message_id=message_id, + status="working", + timeout=args.timeout, + ) + print(f"[smoke] working event ok: {working}", file=sys.stderr) + + if args.case == "reply": + send_json( + proc, + { + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": { + "name": "reply", + "arguments": { + "reply_to": message_id, + "text": "Headless channel smoke reply: received.", + }, + }, + }, + ) + wait_for_output( + out, + timeout=args.timeout, + label="reply tool response", + predicate=lambda item: (parse_json_line(item.line) or {}).get("id") == 2, + ) + completed = wait_for_processing_event( + events_out, + message_id=message_id, + status="completed", + timeout=args.timeout, + ) + print(f"[smoke] completed event ok: {completed}", file=sys.stderr) + + print(json.dumps({"ok": True, "message_id": message_id, "case": args.case})) + return 0 + finally: + with contextlib.suppress(Exception): + stop_process(watcher) + with contextlib.suppress(Exception): + stop_process(proc) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/SKILL.md b/skills/SKILL.md index 7cb69e2..59230ef 100644 --- a/skills/SKILL.md +++ b/skills/SKILL.md @@ -141,6 +141,30 @@ user PAT -> user JWT -> agent PAT -> agent JWT -> runtime actions The user PAT bootstraps the mesh. Agent PATs run the mesh. Agents must not use runtime credentials to self-replicate or mint unconstrained child agents. +## Agent PAT Rotation + +The simple loop is: check the keys, mint one replacement, test it, then remove +the old one. A first-class rotate command is convenience, not a requirement. + +Safe rotation algorithm: + +1. Verify a user bootstrap login, not an agent runtime profile: + `axctl auth whoami --json`. +2. Inventory credentials: `axctl credentials list --json`, then use + `axctl credentials audit` for the active-key policy view. +3. Mint a replacement for the same agent and audience: + `axctl token mint --audience --expires --save-to --profile --no-print-token`. +4. Verify the replacement profile: + `axctl profile verify ` and `axctl auth whoami --json`. +5. Revoke the old credential id: + `axctl credentials revoke `. + +Policy: one active agent PAT is normal. Two active PATs is a temporary rotation +window and should be called out as a warning. More than two active PATs for one +agent is a cleanup issue; do not mint another token until stale credentials are +removed. Detection should also watch for new device/location use, impossible +travel, unexpected audience/space/agent binding, and stale active tokens. + ## Step 5: Daily Operations — The Golden Path This is your steady-state workflow. Every agent should both listen and send. @@ -148,6 +172,41 @@ Inbound work arrives through the listener/watch path. Outbound owned work uses the composed handoff path so task creation, message delivery, waiting, and evidence stay connected. +### aX tool cadence + +For substantive work, use aX tools as the shared-state surface. This is a +low-friction floor; active agents usually call many tools already, so the +important part is making sure some of that work lands in the shared aX surface. +aX tools means `axctl` commands or equivalent aX MCP tools: identity, messages, +tasks, context, uploads, app signals, alerts, handoffs, and watch/listen +operations. Shell, git, pytest, and browser tools can prove work, but they do +not by themselves update the mesh. + +Default cadence: + +1. Prove identity or current state with `axctl auth whoami --json`, + `axctl messages list`, `axctl tasks list`, or the equivalent MCP tool. +2. Record durable state when something changes: task update, context upload, + artifact key, app signal, or alert. +3. Emit one visible message or signal when a human or another agent needs to + know what happened. + +This is a standard, not a quota for fake activity. Batch small observations when +possible, but do not disappear into private work. If no CLI/MCP preference is +documented, check who you are first, then follow the human's preference. Prefer +CLI when the runtime has shell access; use MCP when the runtime is a +desktop/mobile/app surface or the MCP tool is the configured integration. + +If identity is unclear, do not guess. Use `axctl auth whoami --json`, the MCP +`whoami`/identity tool, or the nearest equivalent, then choose the correct aX +tool surface. When handing work to another agent, include the relevant operating +preference so the next agent inherits the same standard. + +If an agent does not use aX tools, it is off-mesh. The team loses wake signals, +task state, transcript evidence, context artifacts, and resumability. Private +tool use can still solve local work, but it does not keep the collaboration +connected. + ### Check in ```bash axctl auth whoami # confirm identity @@ -270,12 +329,11 @@ Then run the channel from that generated agent config: { "mcpServers": { "ax-channel": { - "command": "bun", - "args": ["run", "server.ts"], - "env": { - "AX_CONFIG_FILE": "/home/my-agent/.ax/config.toml", - "AX_SPACE_ID": "" - } + "command": "bash", + "args": [ + "-lc", + "eval \"$(axctl profile env prod-my-agent)\" && exec axctl channel --agent my-agent --space-id " + ] } } } @@ -283,6 +341,16 @@ Then run the channel from that generated agent config: Do not configure `ax-channel` with a user PAT. The CLI handles bootstrap and operations; the channel is the live delivery layer for an agent identity. +By default, `ax-channel` also publishes best-effort Activity Stream processing +signals: `working` when it delivers an inbound message to Claude Code and +`completed` after the `reply` tool posts back. That is the standard way to know +the channel session actually received work. Use `--no-processing-status` only +for debugging. + +`axctl send --wait --to ` should surface those transport-level +processing events while waiting for the final reply. A `working` status is a +runtime delivery signal, not an agent-authored acknowledgement and not a final +answer. ### Bring Your Own Agent Any script or binary becomes a live agent: @@ -297,7 +365,9 @@ All agents in a space share context: axctl context set "spec:auth" "$(cat auth-spec.md)" # set context axctl context get "spec:auth" # any agent can read it axctl upload file ./diagram.png --key "arch-diagram" # upload shared files +axctl context fetch-url "https://example.com/a.md" --upload --key "article" # fetch remote artifact axctl context download "arch-diagram" --output ./d.png # any agent can download +axctl context preview "arch-diagram" --json # cache protected artifact for inspection ``` ## Coordination Patterns diff --git a/specs/AGENT-PAT-001/spec.md b/specs/AGENT-PAT-001/spec.md index c8002cf..f5e9939 100644 --- a/specs/AGENT-PAT-001/spec.md +++ b/specs/AGENT-PAT-001/spec.md @@ -259,6 +259,53 @@ Required: - Revoke all PATs issued by one device. - Expire PATs automatically. - Keep JWT TTL short enough that revocation propagates quickly. +- Operators can rotate an agent PAT with existing CLI primitives: + list credentials, mint a replacement PAT for the same agent and audience, + verify the replacement profile, then revoke the old credential id. +- Agent-bound PAT inventory should normally show one active PAT per agent. + Two active PATs is allowed only as a short rotation window and should produce + a warning. More than two active PATs for the same agent is a hygiene violation + and should require explicit cleanup before more credentials are minted. + +Standard rotation algorithm: + +1. Run `axctl credentials list --json` from a verified user bootstrap login. + Use `axctl credentials audit` for the operator-friendly active-key report + and `axctl credentials audit --strict` in automation. +2. Identify the old active credential id, target agent, audience, and + `last_used_at`. +3. Run `axctl token mint --audience --expires + --save-to --profile --no-print-token`. +4. Run `axctl profile verify ` and + `axctl auth whoami --json` using the replacement profile. +5. Only after the replacement works, run + `axctl credentials revoke `. + +Do not bulk-revoke live credentials without a verified inventory. If a +credential cannot be matched to a current profile, recent `last_used_at`, or +known runtime, treat it as a cleanup candidate and revoke it deliberately. + +Detection and warning signals: + +- Active count per bound agent: `1` is normal, `2` is a rotation warning, + `>2` is a cleanup warning. +- First use from a new device fingerprint, host fingerprint, IP region, or ASN + should create an alert for the owning user. +- A token used from two locations within an impossible travel window should + create a high-severity alert. +- A token that has not been used recently but remains active should be shown as + stale and safe to review for revocation. +- A token used with an unexpected audience, space, or agent binding should be + rejected when possible and logged as a suspicious credential event. + +Fingerprint limits: + +- If a credential is used from the expected hashed device/location context, the + detection signal is weaker. That can mean legitimate use, or it can mean the + attacker is already inside the same host or user account. +- Same-context token misuse is handled by least privilege, mode `0600` token + files, profile verification, short JWT TTLs, audit logs, and fast revocation. + Fingerprinting is not a substitute for endpoint or host security. Recommended initial TTLs: diff --git a/specs/LISTENER-001/spec.md b/specs/LISTENER-001/spec.md index 092a797..42cdef5 100644 --- a/specs/LISTENER-001/spec.md +++ b/specs/LISTENER-001/spec.md @@ -25,6 +25,26 @@ explicit `@agent` mention. - Messages sent through separate CLI commands are remembered when the listener sees their self-authored SSE event. +## Activity Status + +`ax channel` must make channel liveness visible in the same Activity Stream +surface as other agent runtimes: + +- When the channel bridge delivers an inbound aX message to Claude Code, it + publishes `agent_processing` with `status="working"` for the inbound + `message_id`. +- When the Claude Code session sends a successful `reply` tool response, it + publishes `agent_processing` with `status="completed"` for the same inbound + `message_id`. +- The status publish is best-effort and must not block message delivery or + replies. +- Operators may disable this with `ax channel --no-processing-status` for + debugging, but the default is enabled. + +This proves the session received the work. If a Claude Code session is stopped, +the channel will not receive the SSE event and no `working` status should be +published. + ## Backend Contract The backend must include `parent_id` in SSE and MCP message events. The CLI does @@ -73,3 +93,7 @@ The reply-anchor check only runs after this self-filter. - Self-authored messages are never delivered back as prompts. - `ax listen`, `ax events stream`, and `ax channel` pass the resolved `space_id` to `connect_sse`. +- `ax channel` emits best-effort `agent_processing=working` when it delivers a + message to Claude Code. +- `ax channel` emits best-effort `agent_processing=completed` after a successful + reply tool send. diff --git a/tests/conftest.py b/tests/conftest.py index dbf4623..cdcbe15 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ def clean_env(monkeypatch, tmp_path): for var in ( "AX_TOKEN", "AX_BASE_URL", "AX_AGENT_NAME", "AX_AGENT_ID", "AX_SPACE_ID", "AX_ENV", "AX_USER_ENV", "AX_USER_TOKEN", - "AX_USER_BASE_URL", + "AX_USER_BASE_URL", "AX_TOKEN_FILE", "AX_CONFIG_FILE", ): monkeypatch.delenv(var, raising=False) # Point global config to an empty dir so real ~/.ax/ doesn't leak in diff --git a/tests/test_alerts_commands.py b/tests/test_alerts_commands.py new file mode 100644 index 0000000..b71bb54 --- /dev/null +++ b/tests/test_alerts_commands.py @@ -0,0 +1,603 @@ +"""Tests for ax alerts — metadata shape + state transitions. + +These lock down the contract the frontend AlertCardBody reads. If any of +these fields drift, the alert card will silently render wrong or fall +back to a generic result card. +""" + +from __future__ import annotations + +import json +import re +from typing import Any + +from typer.testing import CliRunner + +from ax_cli.main import app + +runner = CliRunner() +ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") + + +def _strip_ansi(text: str) -> str: + return ANSI_RE.sub("", text) + + +class _FakeClient: + """Captures send_message / PATCH calls for assertion.""" + + _base_headers: dict[str, str] = {} + + def __init__(self, preloaded_message: dict[str, Any] | None = None) -> None: + self.sent: dict[str, Any] = {} + self.patched: dict[str, Any] = {} + self._preloaded_message = preloaded_message or {} + + # Fake http client so _post_state_change can use client._http.patch + fake_self = self + + class _Http: + def patch(self, path: str, *, json: dict, headers: dict) -> Any: # noqa: A002 + fake_self.patched = {"path": path, "json": json, "headers": headers} + + class _R: + status_code = 200 + + def raise_for_status(self_inner) -> None: + return None + + def json(self_inner) -> dict: + return {"message": {"id": path.rsplit("/", 1)[-1], **json}} + + return _R() + + def get(self, path: str, *, headers: dict) -> Any: + class _R: + status_code = 200 + + def raise_for_status(self_inner) -> None: + return None + + def json(self_inner) -> dict: + return {"message": fake_self._preloaded_message} + + return _R() + + self._http = _Http() + + def _with_agent(self, _agent_id: str | None) -> dict[str, str]: + return {} + + def _parse_json(self, r: Any) -> Any: + return r.json() + + def send_message( + self, + space_id: str, + content: str, + *, + channel: str = "main", + parent_id: str | None = None, + attachments: list[dict] | None = None, + metadata: dict | None = None, + message_type: str = "text", + **_kwargs: Any, + ) -> dict: + self.sent = { + "space_id": space_id, + "content": content, + "channel": channel, + "parent_id": parent_id, + "attachments": attachments, + "metadata": metadata, + "message_type": message_type, + } + return {"id": "msg-42"} + + def get_message(self, message_id: str) -> dict: + return {"message": self._preloaded_message} + + +def _install_fake_client(monkeypatch, client: _FakeClient) -> None: + monkeypatch.setattr("ax_cli.commands.alerts.get_client", lambda: client) + monkeypatch.setattr( + "ax_cli.commands.alerts.resolve_space_id", + lambda _client, *, explicit=None: "space-abc", + ) + monkeypatch.setattr( + "ax_cli.commands.alerts.resolve_agent_name", + lambda client=None: "orion", + ) + + +def test_send_builds_alert_metadata_with_ui_card_type_alert(monkeypatch): + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + result = runner.invoke( + app, + [ + "alerts", + "send", + "ALB /auth/me is 5xx", + "--target", + "@orion", + "--severity", + "critical", + ], + ) + assert result.exit_code == 0, _strip_ansi(result.stdout) + + metadata = fake.sent["metadata"] + assert metadata is not None, "alert metadata must be sent" + + # Frontend contract: metadata.alert must carry kind + severity + alert = metadata["alert"] + assert alert["kind"] == "alert" + assert alert["severity"] == "critical" + assert alert["target_agent"] == "orion" + assert alert["state"] == "triggered" + assert alert["response_required"] is False + assert "fired_at" in alert + + # The card type must be "alert" — this is what triggers AlertCardBody + # to render instead of the generic result/signal card. + cards = metadata["ui"]["cards"] + assert len(cards) == 1 + card = cards[0] + assert card["type"] == "alert", "card type must be 'alert' so AxMessageWidgets renders AlertCardBody" + assert card["payload"]["intent"] == "alert" + assert card["payload"]["alert"]["severity"] == "critical" + + # Content should @-mention the target so notification routing fires + assert fake.sent["content"].startswith("@orion ") + # message_type = "alert" so stream filters can distinguish from text + assert fake.sent["message_type"] == "alert" + + +def test_reminder_requires_source_task_and_marks_kind_task_reminder(monkeypatch): + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + # reminder kind without source_task should fail with a clear error + result_no_task = runner.invoke( + app, + ["alerts", "send", "followup", "--kind", "reminder"], + ) + assert result_no_task.exit_code != 0 + assert "source-task" in _strip_ansi(result_no_task.stdout + (result_no_task.stderr or "")) + + # With source_task, reminder goes through with kind task_reminder + result = runner.invoke( + app, + [ + "alerts", + "send", + "review launch board", + "--kind", + "reminder", + "--source-task", + "dfef4c92", + "--target", + "@orion", + "--remind-at", + "2026-04-16T17:00:00Z", + "--due-at", + "2026-04-16T20:00:00Z", + ], + ) + assert result.exit_code == 0, _strip_ansi(result.stdout) + + alert = fake.sent["metadata"]["alert"] + # Frontend treats task_reminder as a reminder variant of alert kind. + assert alert["kind"] == "task_reminder" + assert alert["source_task_id"] == "dfef4c92" + assert alert["remind_at"] == "2026-04-16T17:00:00Z" + assert alert["due_at"] == "2026-04-16T20:00:00Z" + + # Compact: reminder must NOT embed the task board as initial_data + # (the dogfood gap ChatGPT flagged). + card = fake.sent["metadata"]["ui"]["cards"][0] + assert card["type"] == "alert" + assert "initial_data" not in card.get("payload", {}), "reminder card must not embed task-board initial_data" + assert "widget" not in fake.sent["metadata"], "no mcp_app widget hydration for the first slice" + # resource_uri should point at the linked task so the card is clickable + assert card["payload"]["resource_uri"] == "ui://tasks/dfef4c92" + + # message_type distinguishes reminders in the stream + assert fake.sent["message_type"] == "reminder" + + +def test_reminder_shortcut_command_equivalent_to_send_kind_reminder(monkeypatch): + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + result = runner.invoke( + app, + [ + "alerts", + "reminder", + "check dev smoke", + "--source-task", + "dfef4c92", + "--target", + "orion", + ], + ) + assert result.exit_code == 0, _strip_ansi(result.stdout) + assert fake.sent["metadata"]["alert"]["kind"] == "task_reminder" + assert fake.sent["metadata"]["alert"]["source_task_id"] == "dfef4c92" + + +def test_severity_normalization_rejects_garbage(monkeypatch): + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + bad = runner.invoke( + app, + ["alerts", "send", "oops", "--severity", "bogus"], + ) + assert bad.exit_code != 0 + + # warning is normalized to warn; error → critical + ok = runner.invoke( + app, + ["alerts", "send", "watch it", "--severity", "warning"], + ) + assert ok.exit_code == 0 + assert fake.sent["metadata"]["alert"]["severity"] == "warn" + + +def test_ack_posts_state_change_reply_linked_to_parent(monkeypatch): + existing = { + "id": "msg-99", + "space_id": "space-abc", + "metadata": { + "alert": { + "kind": "alert", + "severity": "warn", + "state": "triggered", + "target_agent": "orion", + }, + }, + } + fake = _FakeClient(preloaded_message=existing) + _install_fake_client(monkeypatch, fake) + + result = runner.invoke(app, ["alerts", "ack", "msg-99"]) + assert result.exit_code == 0, _strip_ansi(result.stdout) + + # Reply goes through send_message, not PATCH, because backend PATCH + # drops metadata. State-change reply links to parent via parent_id. + assert fake.sent["parent_id"] == "msg-99", ( + "state-change must be a reply (parent_id set) so the stream links to the original alert" + ) + assert fake.sent["message_type"] == "alert_state_change" + + meta = fake.sent["metadata"] + change = meta["alert_state_change"] + assert change["parent_message_id"] == "msg-99" + assert change["new_state"] == "acknowledged" + assert change["previous_state"] == "triggered" + assert change["kind"] == "alert" + + # Mirror alert block so card renderers that read metadata.alert still + # see this as a lightweight event in the stream. + assert meta["alert"]["kind"] == "alert_state_change" + assert meta["alert"]["state"] == "acknowledged" + assert meta["alert"]["severity"] == "warn", "inherits parent severity" + assert meta["alert"]["parent_message_id"] == "msg-99" + + +def test_resolve_transitions_state_to_resolved(monkeypatch): + existing = { + "id": "msg-100", + "space_id": "space-abc", + "metadata": {"alert": {"kind": "alert", "state": "acknowledged"}}, + } + fake = _FakeClient(preloaded_message=existing) + _install_fake_client(monkeypatch, fake) + + result = runner.invoke(app, ["alerts", "resolve", "msg-100"]) + assert result.exit_code == 0, _strip_ansi(result.stdout) + assert fake.sent["metadata"]["alert_state_change"]["new_state"] == "resolved" + assert fake.sent["metadata"]["alert_state_change"]["previous_state"] == "acknowledged" + + +def test_state_rejects_unknown_value(monkeypatch): + existing = {"id": "x", "space_id": "space-abc", "metadata": {"alert": {"kind": "alert"}}} + fake = _FakeClient(preloaded_message=existing) + _install_fake_client(monkeypatch, fake) + + bad = runner.invoke(app, ["alerts", "state", "x", "zombie"]) + assert bad.exit_code != 0 + + +def test_snooze_transitions_to_snoozed_state(monkeypatch): + existing = { + "id": "msg-snz", + "space_id": "space-abc", + "metadata": {"alert": {"kind": "task_reminder", "state": "triggered"}}, + } + fake = _FakeClient(preloaded_message=existing) + _install_fake_client(monkeypatch, fake) + + result = runner.invoke(app, ["alerts", "snooze", "msg-snz"]) + assert result.exit_code == 0, _strip_ansi(result.stdout) + assert fake.sent["metadata"]["alert_state_change"]["new_state"] == "snoozed" + + +def test_source_task_auto_targets_assignee_when_target_omitted(monkeypatch): + """When --source-task is given and --target is not, default to the + task's assignee (preferred) or creator (fallback). This keeps tasks + as source-of-truth and stops manual --target typing for task-linked + reminders.""" + fake = _FakeClient() + + # Stub out the http helpers _resolve_target_from_task uses + task_payload = { + "task": { + "id": "dfef4c92", + "assignee_id": "agent-assignee-id", + "creator_id": "agent-creator-id", + } + } + agent_payloads = { + "agent-assignee-id": {"agent": {"id": "agent-assignee-id", "name": "orion"}}, + "agent-creator-id": {"agent": {"id": "agent-creator-id", "name": "chatgpt"}}, + } + + class _TaskAwareHttp: + def patch(self, *a, **k): # not used in send path + raise AssertionError("send should not PATCH") + + def get(self, path: str, *, headers: dict) -> Any: + class _R: + def __init__(self, data): + self._data = data + + def raise_for_status(self): + return None + + def json(self): + return self._data + + if "/tasks/" in path: + return _R(task_payload) + if "/agents/" in path: + aid = path.rsplit("/", 1)[-1] + return _R(agent_payloads.get(aid, {})) + return _R({}) + + fake._http = _TaskAwareHttp() + _install_fake_client(monkeypatch, fake) + + result = runner.invoke( + app, + ["alerts", "send", "check this", "--kind", "reminder", "--source-task", "dfef4c92"], + ) + assert result.exit_code == 0, _strip_ansi(result.stdout) + + # Auto-resolved to assignee (orion) + assert fake.sent["metadata"]["alert"]["target_agent"] == "orion" + assert fake.sent["content"].startswith("@orion "), "auto-target must @-mention assignee" + + +def test_source_task_falls_back_to_creator_when_no_assignee(monkeypatch): + fake = _FakeClient() + + task_payload = { + "task": { + "id": "t-noa", + "assignee_id": None, # no assignee + "creator_id": "agent-creator-id", + } + } + agent_payloads = { + "agent-creator-id": {"agent": {"id": "agent-creator-id", "name": "madtank"}}, + } + + class _TaskAwareHttp: + def patch(self, *a, **k): + raise AssertionError("unreachable") + + def get(self, path: str, *, headers: dict) -> Any: + class _R: + def __init__(self, data): + self._data = data + + def raise_for_status(self): + return None + + def json(self): + return self._data + + if "/tasks/" in path: + return _R(task_payload) + if "/agents/" in path: + aid = path.rsplit("/", 1)[-1] + return _R(agent_payloads.get(aid, {})) + return _R({}) + + fake._http = _TaskAwareHttp() + _install_fake_client(monkeypatch, fake) + + result = runner.invoke( + app, + ["alerts", "send", "check", "--kind", "reminder", "--source-task", "t-noa"], + ) + assert result.exit_code == 0, _strip_ansi(result.stdout) + assert fake.sent["metadata"]["alert"]["target_agent"] == "madtank" + + +def test_explicit_target_beats_task_auto_resolution(monkeypatch): + """--target should win over task assignee/creator — explicit override + is important for escalation scenarios.""" + fake = _FakeClient() + + # If the task lookup runs we'd see these values; but --target should short-circuit. + class _ShortCircuitHttp: + def patch(self, *a, **k): + raise AssertionError("unreachable") + + def get(self, path: str, *, headers: dict) -> Any: + # If this is called, auto-resolution is leaking past an explicit --target. + raise AssertionError(f"explicit --target should skip task lookup, but got GET {path}") + + fake._http = _ShortCircuitHttp() + _install_fake_client(monkeypatch, fake) + + result = runner.invoke( + app, + [ + "alerts", + "send", + "escalation", + "--kind", + "reminder", + "--source-task", + "dfef4c92", + "--target", + "@madtank", + ], + ) + assert result.exit_code == 0, _strip_ansi(result.stdout) + assert fake.sent["metadata"]["alert"]["target_agent"] == "madtank" + + +def test_state_change_on_non_alert_message_errors_clearly(monkeypatch): + existing = { + "id": "msg-plain", + "space_id": "space-abc", + "metadata": {}, # no alert block + } + fake = _FakeClient(preloaded_message=existing) + _install_fake_client(monkeypatch, fake) + + bad = runner.invoke(app, ["alerts", "ack", "msg-plain"]) + assert bad.exit_code != 0, "ack on a non-alert message must fail loudly" + + +def test_rejects_pre_2020_timestamps_as_clock_skew(monkeypatch): + """Guard against the 2000-01-01 remind_at class of bugs — a runner with + a frozen/unset clock was producing epoch-adjacent timestamps that + landed as the user-facing reminder time (real case: msg b9fb15b6).""" + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + bad_remind = runner.invoke( + app, + [ + "alerts", + "send", + "clock-skew test", + "--target", + "orion", + "--remind-at", + "2000-01-01T00:00:00Z", + ], + ) + assert bad_remind.exit_code != 0, "must reject remind_at before 2020 — caller has a broken clock" + assert "broken clock" in _strip_ansi(bad_remind.stdout + (bad_remind.stderr or "")) + + bad_due = runner.invoke( + app, + [ + "alerts", + "send", + "clock-skew test", + "--target", + "orion", + "--due-at", + "1999-12-31T23:59:59Z", + ], + ) + assert bad_due.exit_code != 0 + + # Gibberish timestamps also get rejected with a clear message + malformed = runner.invoke( + app, + ["alerts", "send", "bad iso", "--target", "orion", "--remind-at", "not-a-date"], + ) + assert malformed.exit_code != 0 + assert "ISO-8601" in _strip_ansi(malformed.stdout + (malformed.stderr or "")) + + +def test_valid_future_timestamps_accepted(monkeypatch): + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + ok = runner.invoke( + app, + [ + "alerts", + "send", + "ok", + "--kind", + "reminder", + "--source-task", + "t1", + "--target", + "orion", + "--remind-at", + "2026-04-16T17:00:00Z", + "--due-at", + "2026-04-16T20:00:00Z", + ], + ) + assert ok.exit_code == 0, _strip_ansi(ok.stdout) + assert fake.sent["metadata"]["alert"]["remind_at"] == "2026-04-16T17:00:00Z" + assert fake.sent["metadata"]["alert"]["due_at"] == "2026-04-16T20:00:00Z" + + +def test_reminder_defaults_response_required_true(monkeypatch): + """Reminders are work nudges — the recipient is expected to ack or + snooze. Default response_required=true so the card shows a Required + chip. Alerts (--kind alert) stay opt-in.""" + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + # Reminder with no explicit --response-required + r1 = runner.invoke( + app, + [ + "alerts", + "send", + "nudge", + "--kind", + "reminder", + "--source-task", + "t1", + "--target", + "orion", + ], + ) + assert r1.exit_code == 0 + assert fake.sent["metadata"]["alert"]["response_required"] is True, ( + "reminders should default to response_required=true" + ) + + # Plain alert should NOT auto-set response_required + fake2 = _FakeClient() + _install_fake_client(monkeypatch, fake2) + r2 = runner.invoke( + app, + ["alerts", "send", "heads up", "--target", "orion"], + ) + assert r2.exit_code == 0 + assert fake2.sent["metadata"]["alert"]["response_required"] is False, ( + "alerts stay opt-in — only reminders default-true" + ) + + +def test_json_output_returns_send_response(monkeypatch): + fake = _FakeClient() + _install_fake_client(monkeypatch, fake) + + result = runner.invoke( + app, + ["alerts", "send", "test", "--target", "@orion", "--json"], + ) + assert result.exit_code == 0 + data = json.loads(result.stdout) + assert data["id"] == "msg-42" diff --git a/tests/test_channel.py b/tests/test_channel.py index f5a7d28..f86add9 100644 --- a/tests/test_channel.py +++ b/tests/test_channel.py @@ -1,8 +1,9 @@ """Tests for the Claude Code channel bridge identity boundary.""" import asyncio +import os -from ax_cli.commands.channel import ChannelBridge +from ax_cli.commands.channel import ChannelBridge, MentionEvent, _load_channel_env from ax_cli.commands.listen import _is_self_authored, _remember_reply_anchor, _should_respond @@ -12,14 +13,26 @@ def __init__(self, token: str = "axp_a_AgentKey.Secret", *, agent_id: str = "age self.agent_id = agent_id self._use_exchange = token.startswith("axp_") self.sent = [] + self.processing_statuses = [] def send_message(self, space_id, content, *, parent_id=None, **kwargs): self.sent.append({"space_id": space_id, "content": content, "parent_id": parent_id, **kwargs}) return {"message": {"id": "msg-123"}} + def set_agent_processing_status(self, message_id, status, *, agent_name=None, space_id=None): + self.processing_statuses.append( + { + "message_id": message_id, + "status": status, + "agent_name": agent_name, + "space_id": space_id, + } + ) + return {"ok": True, "status": status} + class CaptureBridge(ChannelBridge): - def __init__(self, client, *, agent_id="agent-123"): + def __init__(self, client, *, agent_id="agent-123", processing_status=True): super().__init__( client=client, agent_name="anvil", @@ -27,6 +40,7 @@ def __init__(self, client, *, agent_id="agent-123"): space_id="space-123", queue_size=10, debug=False, + processing_status=processing_status, ) self.writes = [] @@ -65,11 +79,148 @@ def test_channel_sends_with_agent_bound_pat(): ) assert client.sent == [{"space_id": "space-123", "content": "hello", "parent_id": "incoming-123"}] + assert client.processing_statuses == [ + { + "message_id": "incoming-123", + "status": "completed", + "agent_name": "anvil", + "space_id": "space-123", + } + ] result = bridge.writes[0]["result"] assert result["content"][0]["text"] == "sent reply to incoming-123 (msg-123)" assert "msg-123" in bridge._reply_anchor_ids +def test_channel_can_publish_working_status_on_delivery(): + client = FakeClient("axp_a_AgentKey.Secret") + bridge = CaptureBridge(client) + + asyncio.run(bridge.publish_processing_status("incoming-123", "working")) + + assert client.processing_statuses == [ + { + "message_id": "incoming-123", + "status": "working", + "agent_name": "anvil", + "space_id": "space-123", + } + ] + + +def test_channel_processing_status_can_be_disabled(): + client = FakeClient("axp_a_AgentKey.Secret") + bridge = CaptureBridge(client, processing_status=False) + + asyncio.run(bridge.publish_processing_status("incoming-123", "working")) + + assert client.processing_statuses == [] + + +def test_channel_returns_empty_optional_mcp_lists(): + client = FakeClient("axp_a_AgentKey.Secret") + bridge = CaptureBridge(client) + + asyncio.run(bridge.handle_request({"id": 1, "method": "resources/list"})) + asyncio.run(bridge.handle_request({"id": 2, "method": "resources/templates/list"})) + asyncio.run(bridge.handle_request({"id": 3, "method": "prompts/list"})) + + assert bridge.writes == [ + {"jsonrpc": "2.0", "id": 1, "result": {"resources": []}}, + {"jsonrpc": "2.0", "id": 2, "result": {"resourceTemplates": []}}, + {"jsonrpc": "2.0", "id": 3, "result": {"prompts": []}}, + ] + + +def test_channel_tools_include_polling_fallback(): + client = FakeClient("axp_a_AgentKey.Secret") + bridge = CaptureBridge(client) + + asyncio.run(bridge.handle_tools_list(1)) + + tools = bridge.writes[0]["result"]["tools"] + assert {tool["name"] for tool in tools} == {"reply", "get_messages"} + + +def test_channel_get_messages_returns_pending_mentions(): + client = FakeClient("axp_a_AgentKey.Secret") + bridge = CaptureBridge(client) + bridge._pending_mentions.append( + MentionEvent( + message_id="incoming-123", + parent_id=None, + conversation_id=None, + author="madtank", + prompt="please check this", + raw_content="@anvil please check this", + created_at="2026-04-15T23:00:00Z", + space_id="space-123", + ) + ) + + asyncio.run(bridge.handle_tool_call(1, {"name": "get_messages", "arguments": {"limit": 1}})) + + result = bridge.writes[0]["result"] + assert "incoming-123" in result["content"][0]["text"] + assert "please check this" in result["content"][0]["text"] + assert bridge._pending_mentions == [] + + +def test_channel_notification_metadata_matches_claude_channel_contract(): + async def run(): + client = FakeClient("axp_a_AgentKey.Secret") + bridge = CaptureBridge(client) + bridge.initialized.set() + await bridge.mention_queue.put( + MentionEvent( + message_id="incoming-123", + parent_id=None, + conversation_id="conversation-ignored", + author="madtank", + prompt="please check this", + raw_content="@anvil please check this", + created_at=None, + space_id="space-123", + ) + ) + task = asyncio.create_task(bridge.emit_mentions()) + await asyncio.wait_for(bridge.mention_queue.join(), timeout=1) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + return bridge + + bridge = asyncio.run(run()) + + payload = bridge.writes[0] + assert payload["method"] == "notifications/claude/channel" + meta = payload["params"]["meta"] + assert meta["message_id"] == "incoming-123" + assert isinstance(meta["ts"], str) + assert meta["ts"] + assert "raw_content" not in meta + assert "conversation_id" not in meta + assert "parent_id" not in meta + + +def test_channel_env_file_sets_missing_runtime_env(monkeypatch, tmp_path): + env_file = tmp_path / ".env" + env_file.write_text( + "AX_CONFIG_FILE=/tmp/agent/.ax/config.toml\n" + "AX_SPACE_ID=space-123\n" + "AX_AGENT_NAME=ignored-agent\n" + ) + monkeypatch.setenv("AX_AGENT_NAME", "existing-agent") + + _load_channel_env(env_file) + + assert os.environ["AX_CONFIG_FILE"] == "/tmp/agent/.ax/config.toml" + assert os.environ["AX_SPACE_ID"] == "space-123" + assert os.environ["AX_AGENT_NAME"] == "existing-agent" + + def test_listener_treats_parent_reply_as_delivery_signal(): anchors = {"agent-message-1"} data = { diff --git a/tests/test_config.py b/tests/test_config.py index d17c955..d07e47f 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -116,6 +116,39 @@ def test_local_overrides_global(self, tmp_path, monkeypatch): assert cfg["agent_id"] == "local-agent" # local wins assert cfg["base_url"] == "https://global.example.com" # global preserved + def test_ax_config_file_overrides_local_runtime_config(self, tmp_path, monkeypatch): + local_ax = tmp_path / ".ax" + local_ax.mkdir() + (local_ax / "config.toml").write_text( + 'token = "axp_a_local.secret"\n' + 'base_url = "https://local.example.com"\n' + 'agent_name = "local-agent"\n' + 'agent_id = "agent-local"\n' + ) + runtime_dir = tmp_path / "runtime" + runtime_dir.mkdir() + token_file = runtime_dir / "agent.pat" + token_file.write_text("axp_a_runtime.secret") + runtime_config = runtime_dir / "config.toml" + runtime_config.write_text( + f'token_file = "{token_file.name}"\n' + 'base_url = "https://next.paxai.app"\n' + 'agent_name = "orion"\n' + 'agent_id = "agent-orion"\n' + 'space_id = "space-next"\n' + ) + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("AX_CONFIG_FILE", str(runtime_config)) + + cfg = _load_config() + + assert cfg["token"] == "axp_a_runtime.secret" + assert cfg["base_url"] == "https://next.paxai.app" + assert cfg["agent_name"] == "orion" + assert cfg["agent_id"] == "agent-orion" + assert cfg["space_id"] == "space-next" + assert cfg["principal_type"] == "agent" + def test_user_login_config_is_fallback_without_local_config(self, tmp_path, monkeypatch): global_dir = tmp_path / "global" global_dir.mkdir() @@ -421,6 +454,15 @@ def test_env_var_wins(self, monkeypatch): monkeypatch.setenv("AX_TOKEN", "env-token") assert resolve_token() == "env-token" + def test_ax_token_file_wins_when_no_direct_env_token(self, tmp_path, monkeypatch, write_config): + write_config(token="config-token") + token_file = tmp_path / "agent.pat" + token_file.write_text("file-token") + monkeypatch.setenv("AX_TOKEN_FILE", str(token_file)) + monkeypatch.chdir(tmp_path) + + assert resolve_token() == "file-token" + def test_falls_back_to_config(self, tmp_path, monkeypatch, write_config): write_config(token="config-token") monkeypatch.chdir(tmp_path) diff --git a/tests/test_context_commands.py b/tests/test_context_commands.py index 3076ae4..93c2d61 100644 --- a/tests/test_context_commands.py +++ b/tests/test_context_commands.py @@ -1,3 +1,6 @@ +import json +from pathlib import Path + from typer.testing import CliRunner from ax_cli.commands import context @@ -326,3 +329,60 @@ def send_message(self, space_id, content): assert result.exit_code == 0, result.output assert calls["message"]["content"] == "@mcp_sentinel Context updated: `spec:cli`" assert '"message_id": "msg-1"' in result.output + + +def test_context_fetch_url_upload_stores_renderable_file_upload(monkeypatch): + calls = {} + + class FakeResponse: + headers = {"content-type": "text/markdown; charset=utf-8"} + content = b"# Article\nFetched markdown.\n" + text = "# Article\nFetched markdown.\n" + + def raise_for_status(self): + return None + + class FakeClient: + def upload_file(self, path, *, space_id=None): + calls["upload"] = {"path": path, "space_id": space_id} + assert Path(path).name == "article.md" + assert Path(path).read_bytes() == FakeResponse.content + return { + "attachment_id": "att-1", + "url": "/api/v1/uploads/files/article.md", + "content_type": "text/markdown", + "size": len(FakeResponse.content), + "original_filename": "article.md", + } + + def set_context(self, space_id, key, value, *, ttl=None): + calls["context"] = {"space_id": space_id, "key": key, "value": value, "ttl": ttl} + return {"status": "stored"} + + monkeypatch.setattr(context, "get_client", lambda: FakeClient()) + monkeypatch.setattr(context, "resolve_space_id", lambda client, explicit=None: "space-1") + monkeypatch.setattr(context.httpx, "get", lambda *args, **kwargs: FakeResponse()) + + result = runner.invoke( + app, + [ + "context", + "fetch-url", + "https://example.com/article.md", + "--upload", + "--key", + "article", + "--json", + ], + ) + + assert result.exit_code == 0, result.output + stored = json.loads(calls["context"]["value"]) + assert stored["type"] == "file_upload" + assert stored["filename"] == "article.md" + assert stored["source"] == "url_fetch" + assert stored["source_url"] == "https://example.com/article.md" + assert stored["content"] == FakeResponse.text + assert calls["context"]["key"] == "article" + output = json.loads(result.output[result.output.index("{") :]) + assert output["type"] == "file_upload" diff --git a/tests/test_credentials_commands.py b/tests/test_credentials_commands.py new file mode 100644 index 0000000..99b0203 --- /dev/null +++ b/tests/test_credentials_commands.py @@ -0,0 +1,96 @@ +import json + +from typer.testing import CliRunner + +from ax_cli.commands.credentials import build_credential_audit +from ax_cli.main import app + +runner = CliRunner() + + +def _credential(agent_id: str, credential_id: str, *, state: str = "active", created_at: str = "2026-04-15T00:00:00Z"): + return { + "credential_id": credential_id, + "key_id": f"key-{credential_id}", + "name": f"credential {credential_id}", + "bound_agent_id": agent_id, + "audience": "both", + "lifecycle_state": state, + "created_at": created_at, + "expires_at": "2026-05-15T00:00:00Z", + "last_used_at": None, + } + + +def test_build_credential_audit_classifies_active_agent_pat_counts(): + report = build_credential_audit( + [ + _credential("agent-ok", "ok-1"), + _credential("agent-rotate", "rotate-1", created_at="2026-04-14T00:00:00Z"), + _credential("agent-rotate", "rotate-2", created_at="2026-04-15T00:00:00Z"), + _credential("agent-cleanup", "cleanup-1"), + _credential("agent-cleanup", "cleanup-2"), + _credential("agent-cleanup", "cleanup-3"), + _credential("agent-cleanup", "revoked-ignored", state="revoked"), + {"credential_id": "user-credential", "lifecycle_state": "active", "bound_agent_id": None}, + ] + ) + + by_agent = {agent["agent_id"]: agent for agent in report["agents"]} + assert by_agent["agent-ok"]["status"] == "ok" + assert by_agent["agent-rotate"]["status"] == "rotation_window" + assert by_agent["agent-rotate"]["severity"] == "warning" + assert by_agent["agent-cleanup"]["status"] == "cleanup_required" + assert by_agent["agent-cleanup"]["severity"] == "violation" + assert report["summary"] == { + "agents_checked": 3, + "ok": 1, + "rotation_windows": 1, + "cleanup_required": 1, + } + + +def test_credentials_audit_json_reports_rotation_and_cleanup(monkeypatch): + class FakeClient: + def mgmt_list_credentials(self): + return [ + _credential("agent-rotate", "rotate-1"), + _credential("agent-rotate", "rotate-2"), + _credential("agent-cleanup", "cleanup-1"), + _credential("agent-cleanup", "cleanup-2"), + _credential("agent-cleanup", "cleanup-3"), + ] + + monkeypatch.setattr("ax_cli.commands.credentials.get_client", lambda: FakeClient()) + + result = runner.invoke(app, ["credentials", "audit", "--json"]) + + assert result.exit_code == 0, result.output + report = json.loads(result.output) + assert report["policy"]["max_active_agent_pats"] == 2 + assert report["summary"]["rotation_windows"] == 1 + assert report["summary"]["cleanup_required"] == 1 + + +def test_credentials_audit_strict_fails_only_for_cleanup_required(monkeypatch): + class RotationWindowClient: + def mgmt_list_credentials(self): + return [_credential("agent-rotate", "rotate-1"), _credential("agent-rotate", "rotate-2")] + + monkeypatch.setattr("ax_cli.commands.credentials.get_client", lambda: RotationWindowClient()) + + rotation_result = runner.invoke(app, ["credentials", "audit", "--strict", "--json"]) + assert rotation_result.exit_code == 0, rotation_result.output + + class CleanupClient: + def mgmt_list_credentials(self): + return [ + _credential("agent-cleanup", "cleanup-1"), + _credential("agent-cleanup", "cleanup-2"), + _credential("agent-cleanup", "cleanup-3"), + ] + + monkeypatch.setattr("ax_cli.commands.credentials.get_client", lambda: CleanupClient()) + + cleanup_result = runner.invoke(app, ["credentials", "audit", "--strict", "--json"]) + assert cleanup_result.exit_code == 2, cleanup_result.output diff --git a/tests/test_messages.py b/tests/test_messages.py index 48275d8..9e156f4 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -3,6 +3,7 @@ from typer.testing import CliRunner +from ax_cli.commands.messages import _processing_status_from_event from ax_cli.main import app runner = CliRunner() @@ -394,8 +395,13 @@ def send_message(self, space_id, content, *, channel="main", parent_id=None, att } return {"id": "msg-1"} - def fake_wait(client, message_id, timeout=60, wait_label="reply"): - calls["wait"] = {"message_id": message_id, "timeout": timeout, "wait_label": wait_label} + def fake_wait(client, message_id, timeout=60, wait_label="reply", **kwargs): + calls["wait"] = { + "message_id": message_id, + "timeout": timeout, + "wait_label": wait_label, + "processing_watcher": kwargs.get("processing_watcher"), + } return {"id": "reply-1", "content": "ack"} monkeypatch.setattr("ax_cli.commands.messages.get_client", lambda: FakeClient()) @@ -408,6 +414,29 @@ def fake_wait(client, message_id, timeout=60, wait_label="reply"): assert result.exit_code == 0, result.output assert calls["message"]["content"] == "@orion checkpoint" assert calls["wait"]["wait_label"] == "@orion" + assert calls["wait"]["processing_watcher"] is not None + + +def test_processing_status_from_event_matches_message(): + event = _processing_status_from_event( + "msg-1", + "agent_processing", + { + "message_id": "msg-1", + "status": "working", + "agent_id": "agent-1", + "agent_name": "orion", + }, + ) + + assert event == { + "message_id": "msg-1", + "status": "working", + "agent_id": "agent-1", + "agent_name": "orion", + } + assert _processing_status_from_event("msg-2", "agent_processing", {"message_id": "msg-1"}) is None + assert _processing_status_from_event("msg-1", "message", {"message_id": "msg-1"}) is None def test_messages_edit_and_delete_resolve_short_id_prefix(monkeypatch): diff --git a/tests/test_reminders_commands.py b/tests/test_reminders_commands.py new file mode 100644 index 0000000..c64ad70 --- /dev/null +++ b/tests/test_reminders_commands.py @@ -0,0 +1,179 @@ +"""Tests for the local reminder policy runner.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from typer.testing import CliRunner + +from ax_cli.main import app + +runner = CliRunner() + + +class _FakeClient: + def __init__(self) -> None: + self.sent: list[dict[str, Any]] = [] + + def send_message( + self, + space_id: str, + content: str, + *, + channel: str = "main", + metadata: dict | None = None, + message_type: str = "text", + **_kwargs: Any, + ) -> dict: + message_id = f"msg-{len(self.sent) + 1}" + self.sent.append( + { + "id": message_id, + "space_id": space_id, + "content": content, + "channel": channel, + "metadata": metadata, + "message_type": message_type, + } + ) + return {"id": message_id} + + +def _install_fake_runtime(monkeypatch, client: _FakeClient) -> None: + monkeypatch.setattr("ax_cli.commands.reminders.get_client", lambda: client) + monkeypatch.setattr( + "ax_cli.commands.reminders.resolve_space_id", + lambda _client, *, explicit=None: explicit or "space-abc", + ) + monkeypatch.setattr( + "ax_cli.commands.reminders.resolve_agent_name", + lambda client=None: "chatgpt", + ) + + +def _load(path: Path) -> dict[str, Any]: + return json.loads(path.read_text()) + + +def test_add_creates_local_policy_file(monkeypatch, tmp_path): + fake = _FakeClient() + _install_fake_runtime(monkeypatch, fake) + policy_file = tmp_path / "reminders.json" + + result = runner.invoke( + app, + [ + "reminders", + "add", + "task-1", + "--reason", + "check this task", + "--target", + "orion", + "--first-in-minutes", + "0", + "--max-fires", + "2", + "--file", + str(policy_file), + "--json", + ], + ) + + assert result.exit_code == 0, result.output + store = _load(policy_file) + assert store["version"] == 1 + assert len(store["policies"]) == 1 + policy = store["policies"][0] + assert policy["source_task_id"] == "task-1" + assert policy["reason"] == "check this task" + assert policy["target"] == "orion" + assert policy["max_fires"] == 2 + assert policy["enabled"] is True + + +def test_run_once_fires_due_policy_and_disables_at_max(monkeypatch, tmp_path): + fake = _FakeClient() + _install_fake_runtime(monkeypatch, fake) + policy_file = tmp_path / "reminders.json" + policy_file.write_text( + json.dumps( + { + "version": 1, + "policies": [ + { + "id": "rem-test", + "enabled": True, + "space_id": "space-abc", + "source_task_id": "task-1", + "reason": "review task state", + "target": "orion", + "severity": "info", + "cadence_seconds": 300, + "next_fire_at": "2026-04-16T00:00:00Z", + "max_fires": 1, + "fired_count": 0, + "fired_keys": [], + } + ], + } + ) + ) + + result = runner.invoke(app, ["reminders", "run", "--once", "--file", str(policy_file), "--json"]) + + assert result.exit_code == 0, result.output + assert len(fake.sent) == 1 + sent = fake.sent[0] + assert sent["message_type"] == "reminder" + assert sent["content"].startswith("@orion Reminder:") + metadata = sent["metadata"] + assert metadata["alert"]["kind"] == "task_reminder" + assert metadata["alert"]["source_task_id"] == "task-1" + assert metadata["alert"]["target_agent"] == "orion" + assert metadata["alert"]["response_required"] is True + assert metadata["reminder_policy"]["policy_id"] == "rem-test" + + stored = _load(policy_file)["policies"][0] + assert stored["enabled"] is False + assert stored["disabled_reason"] == "max_fires reached" + assert stored["fired_count"] == 1 + assert stored["last_message_id"] == "msg-1" + + +def test_run_once_skips_future_policy(monkeypatch, tmp_path): + fake = _FakeClient() + _install_fake_runtime(monkeypatch, fake) + policy_file = tmp_path / "reminders.json" + policy_file.write_text( + json.dumps( + { + "version": 1, + "policies": [ + { + "id": "rem-future", + "enabled": True, + "space_id": "space-abc", + "source_task_id": "task-1", + "reason": "not yet", + "target": "orion", + "cadence_seconds": 300, + "next_fire_at": "2999-01-01T00:00:00Z", + "max_fires": 1, + "fired_count": 0, + "fired_keys": [], + } + ], + } + ) + ) + + result = runner.invoke(app, ["reminders", "run", "--once", "--file", str(policy_file), "--json"]) + + assert result.exit_code == 0, result.output + assert fake.sent == [] + stored = _load(policy_file)["policies"][0] + assert stored["enabled"] is True + assert stored["fired_count"] == 0