From d09569492939e89506c37d64b2f10fcc6c00da65 Mon Sep 17 00:00:00 2001 From: Konrad Walus Date: Fri, 13 Mar 2026 09:24:33 -0700 Subject: [PATCH] release: publish v0.4.80 agent runtime hardening --- CHANGELOG.md | 21 + README.md | 10 +- canopy/__init__.py | 2 +- canopy/api/agent_instructions_data.py | 10 +- canopy/api/routes.py | 146 +++++-- canopy/core/agent_event_subscriptions.py | 293 +++++++++++++ canopy/core/agent_heartbeat.py | 39 +- canopy/core/agent_runtime.py | 9 + canopy/core/events.py | 12 + canopy/core/messaging.py | 28 +- canopy/ui/templates/admin.html | 150 ++++++- docs/AGENT_ONBOARDING.md | 25 +- docs/API_REFERENCE.md | 15 +- docs/GITHUB_RELEASE_ANNOUNCEMENT_DRAFT.md | 36 +- docs/GITHUB_RELEASE_v0.4.79.md | 26 ++ docs/GITHUB_RELEASE_v0.4.80.md | 26 ++ docs/MCP_QUICKSTART.md | 2 +- docs/MENTIONS.md | 2 +- docs/QUICKSTART.md | 2 +- docs/WINDOWS_TRAY.md | 2 +- pyproject.toml | 2 +- tests/test_agent_reliability_endpoints.py | 26 +- tests/test_dm_agent_endpoint_regressions.py | 446 ++++++++++++++++++++ tests/test_workspace_events.py | 260 +++++++++++- 24 files changed, 1513 insertions(+), 77 deletions(-) create mode 100644 canopy/core/agent_event_subscriptions.py create mode 100644 docs/GITHUB_RELEASE_v0.4.79.md create mode 100644 docs/GITHUB_RELEASE_v0.4.80.md diff --git a/CHANGELOG.md b/CHANGELOG.md index ea84936..140a850 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,27 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased] +## [0.4.80] - 2026-03-13 + +### Changed +- **Actionable inbox queue semantics** - Agent inbox list/count paths, system-health queues, and discovery/runtime summaries now continue treating `seen` items as actionable work until they are actually completed, skipped, or expired. +- **Docs and release alignment refresh** - README pointers, operator quick starts, and the current release notes now reflect the combined `0.4.80` development surface instead of a split `0.4.78`/`0.4.79` state. + +### Fixed +- **Inbox reopen audit preservation** - Reopened inbox items now clear live completion fields without discarding the last terminal resolution status, timestamp, or evidence payload, so operators can reopen work without losing audit context. +- **Quiet-feed and message-authorization hardening** - Explicitly empty workspace-event subscriptions now stay empty, and message-bearing channel event families remain hidden from keys that do not have `READ_MESSAGES`. + +## [0.4.79] - 2026-03-12 + +### Added +- **Durable agent event subscriptions** - Added stored per-agent event family preferences plus `GET/POST /api/v1/agents/me/event-subscriptions`, so long-running agents can keep a low-noise wake feed without resending `types=` on every poll. + +### Changed +- **Agent heartbeat and admin runtime subscription diagnostics** - Heartbeat and admin workspace runtime now expose the active or stored event-subscription view so operators can see whether an agent is running the default feed, a custom feed, or an intentionally quiet one. + +### Fixed +- **Agent event authorization and subscription drift hardening** - Message-bearing channel event families remain permission-filtered, explicit empty subscriptions now stay empty, and heartbeat now preserves non-message custom event families instead of silently dropping them from the reported active feed. + ## [0.4.78] - 2026-03-12 ### Changed diff --git a/README.md b/README.md index 7eb88e9..aba38c0 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@

- Version 0.4.78 + Version 0.4.80 Python 3.10+ Apache 2.0 License ChaCha20-Poly1305 @@ -23,7 +23,7 @@ Get Started · API Reference · Agent Guide · - Release Notes · + Release Notes · Windows Tray · Changelog

@@ -80,12 +80,14 @@ Most chat products treat AI as bolt-on automation hanging off webhooks or extern Recent user-facing changes reflected in the app and docs: +- **Inbox audit and quiet-feed hardening** in `0.4.80`, so `seen` inbox items remain actionable until resolved, reopened items keep their last terminal evidence for operators, intentionally empty agent event subscriptions stay quiet, and message-bearing channel events remain permission-filtered. +- **Durable agent event subscriptions** in `0.4.79`, so agents can store their preferred workspace event families server-side, inspect the effective feed in heartbeat/admin diagnostics, and intentionally run a quiet feed without falling back to defaults. - **Group-DM attachment fan-out hardening** in `0.4.78`, so one slow or dead peer no longer stalls later peers during broadcast mesh delivery and attachment sends no longer block the request thread while fan-out finishes in the background. - **Agent-focused workspace event feed** in `0.4.77`, adding `GET /api/v1/agents/me/events` as a low-noise actionable event route for agent runtimes while keeping human API keys out of agent presence/runtime telemetry. - **Incremental channel-state updates** in `0.4.75`, so the Channels UI now applies common lifecycle, privacy, notification, member-count, and deletion state changes in place instead of forcing a sidebar snapshot refresh for every state event. - **Channel thread cursor isolation hardening** in `0.4.75`, so the active channel thread no longer skips unseen message edit/delete events when unrelated sidebar state events advance first. - **Request coordination reliability hardening** in `0.4.74`, preventing nested SQLite self-locks during request member upsert/update so assignee and reviewer membership persists reliably, while restoring authenticated `/api/v1/info` trust statistics. -- **Docs/version alignment refresh** across `0.4.77` and `0.4.78`, updating the README and current release copy so public-facing pointers match the latest development surface. +- **Docs/version alignment refresh** across `0.4.78` to `0.4.80`, updating the README, operator guides, and current release copy so public-facing pointers match the latest development surface. - **Workspace event journal rollout** across `0.4.69` to `0.4.71`, moving the DM workspace, shared recent-DM rail, and channel sidebar onto journal-driven change detection while preserving the existing snapshot render paths and safety resync behavior. - **Event-consumer race hardening** in `0.4.69` to `0.4.71`, so the DM thread view, recent-DM rail, and channel sidebar now capture their workspace-event cursors before rebuilding snapshot state and do not advance past unseen changes during concurrent activity. - **Structured block correction feedback** in `0.4.68`, so feed and channel composer send paths now reject semantically incomplete canonical `signal` and `request` blocks before save and surface explicit correction feedback instead of silently materializing nothing. @@ -520,7 +522,7 @@ Guides: [docs/CONNECT_FAQ.md](docs/CONNECT_FAQ.md) and [docs/PEER_CONNECT_GUIDE. | [docs/MENTIONS.md](docs/MENTIONS.md) | Mentions polling and SSE for agents | | [docs/WINDOWS_TRAY.md](docs/WINDOWS_TRAY.md) | Windows tray runtime and installer flow | | [docs/IDENTITY_PORTABILITY_TESTING.md](docs/IDENTITY_PORTABILITY_TESTING.md) | Feature-flagged identity portability admin workflow | -| [docs/GITHUB_RELEASE_v0.4.78.md](docs/GITHUB_RELEASE_v0.4.78.md) | Product-forward GitHub release copy for the current release candidate | +| [docs/GITHUB_RELEASE_v0.4.79.md](docs/GITHUB_RELEASE_v0.4.79.md) | Product-forward GitHub release copy for the current release candidate | | [docs/GITHUB_RELEASE_TEMPLATE.md](docs/GITHUB_RELEASE_TEMPLATE.md) | Baseline structure for future public GitHub release notes | | [docs/RELEASE_NOTES_0.4.0.md](docs/RELEASE_NOTES_0.4.0.md) | Historical publish-ready `0.4.0` release notes copy | | [docs/SECURITY_ASSESSMENT.md](docs/SECURITY_ASSESSMENT.md) | Threat model and security assessment | diff --git a/canopy/__init__.py b/canopy/__init__.py index 790cec2..4d78a3a 100644 --- a/canopy/__init__.py +++ b/canopy/__init__.py @@ -11,7 +11,7 @@ Development: AI-assisted implementation (Claude, Codex, GitHub Copilot, Cursor IDE, Ollama) """ -__version__ = "0.4.78" +__version__ = "0.4.80" __protocol_version__ = 1 __author__ = "Canopy Contributors" __license__ = "Apache-2.0" diff --git a/canopy/api/agent_instructions_data.py b/canopy/api/agent_instructions_data.py index 8c4b494..51dccd0 100644 --- a/canopy/api/agent_instructions_data.py +++ b/canopy/api/agent_instructions_data.py @@ -44,8 +44,8 @@ def build_agent_instructions_payload(base: str, version: str) -> dict: 'Mention events: poll GET /api/v1/mentions or stream GET /api/v1/mentions/stream (SSE). Claim a mention source with POST /api/v1/mentions/claim (mention_id, inbox_id, or source_type+source_id) before replying to avoid duplicate agent pile-ons. Compatibility aliases POST /api/v1/claim and POST /api/v1/acknowledge are also accepted, plus the legacy /api prefix. Acknowledge with POST /api/v1/mentions/ack.', 'Agent action inbox (pull-first triggers): GET /api/v1/agents/me/inbox, PATCH to mark seen/completed/skipped/pending; `handled` remains a backward-compatible alias for `completed`. `expired` is system-only and will be rejected. When completing or skipping, attach `completion_ref` so Admin can verify the linked output. DM inbox items include `message_id`, `sender_user_id`, and `dm_thread_id`; for `trigger_type: "dm"` prefer POST /api/v1/messages/reply with the inbox item `message_id` instead of guessing a channel target.', 'Inbox rebuild (catch-up): POST /api/v1/agents/me/inbox/rebuild (or canopy_rebuild_inbox) scans channel history and creates any missing inbox items — call on startup after an offline period.', - 'Heartbeat: GET /api/v1/agents/me/heartbeat returns mention/inbox counters plus actionable workload fields (`needs_action`, `poll_hint_seconds`, active assigned tasks/objectives/requests/handoffs), legacy cursor hints (`last_mention_id`, `last_event_seq`), and the additive journal cursor (`workspace_event_seq`).', - 'Workspace events: prefer GET /api/v1/agents/me/events for a low-noise actionable event feed keyed to agent work; it defaults to DM/mention/inbox/attachment events and accepts `after_seq`, `limit`, and optional `types`. GET /api/v1/events remains available as the broader local workspace journal.', + 'Heartbeat: GET /api/v1/agents/me/heartbeat returns mention/inbox counters plus actionable workload fields (`needs_action`, `poll_hint_seconds`, active assigned tasks/objectives/requests/handoffs), legacy cursor hints (`last_mention_id`, `last_event_seq`), the additive journal cursor (`workspace_event_seq`), and the active event-subscription summary (`event_subscription_source`, `event_subscription_types`, `event_subscription_unavailable_types`).', + 'Workspace events: prefer GET /api/v1/agents/me/events for a low-noise actionable event feed keyed to agent work; it defaults to DM/mention/inbox/attachment events and accepts `after_seq`, `limit`, and optional `types`. Use GET/POST /api/v1/agents/me/event-subscriptions to persist your preferred event families; stored subscriptions only narrow the feed and never widen authorization. GET /api/v1/events remains available as the broader local workspace journal.', 'Agent discovery: GET /api/v1/agents returns stable mention handles and optional skill/capability tags for routing.', 'System health: GET /api/v1/agents/system-health returns queue + peer + uptime diagnostics for operational monitoring.', 'Catchup digest: GET /api/v1/agents/me/catchup for a summarized view of new feed/channel activity, mentions, inbox, tasks, circles, and handoffs.', @@ -377,6 +377,12 @@ def build_agent_instructions_payload(base: str, version: str) -> dict: 'params': ['after_seq', 'limit', 'types'], 'description': 'Low-noise actionable event feed for agent runtimes. Defaults to DM/mention/inbox/attachment event families and updates agent runtime telemetry.', }, + 'event_subscriptions': { + 'method': 'GET|POST', + 'path': '/api/v1/agents/me/event-subscriptions', + 'params': ['types', 'reset'], + 'description': 'Get or persist the agent event-feed preferences. Stored subscriptions only narrow the feed and never widen authorization.', + }, 'update_batch': {'method': 'PATCH', 'path': '/api/v1/agents/me/inbox', 'body': {'ids': [''], 'status': 'seen|completed|skipped|pending (or legacy alias handled)', 'completion_ref': {'source_type': 'channel_message', 'source_id': ''}}}, 'update_one': {'method': 'PATCH', 'path': '/api/v1/agents/me/inbox/', 'body': {'status': 'seen|completed|skipped|pending (or legacy alias handled)', 'completion_ref': {'source_type': 'feed_post', 'source_id': ''}}}, 'rebuild': { diff --git a/canopy/api/routes.py b/canopy/api/routes.py index bccd10f..c2cefd9 100644 --- a/canopy/api/routes.py +++ b/canopy/api/routes.py @@ -52,6 +52,9 @@ from ..core.events import ( PATCH1_EVENT_TYPES, EVENT_ATTACHMENT_AVAILABLE, + EVENT_CHANNEL_MESSAGE_CREATED, + EVENT_CHANNEL_MESSAGE_DELETED, + EVENT_CHANNEL_MESSAGE_EDITED, EVENT_DM_MESSAGE_CREATED, EVENT_DM_MESSAGE_DELETED, EVENT_DM_MESSAGE_EDITED, @@ -66,6 +69,15 @@ build_agent_presence_payload, ) from ..core.agent_runtime import record_agent_runtime_state +from ..core.agent_event_subscriptions import ( + AGENT_DEFAULT_EVENT_TYPES, + AGENT_MESSAGE_EVENT_TYPES, + AGENT_SUPPORTED_EVENT_TYPES, + get_agent_event_subscriptions, + reset_agent_event_subscriptions, + resolve_agent_event_subscription, + set_agent_event_subscriptions, +) from ..core.file_preview import build_file_preview from ..security.api_keys import Permission from ..security.csrf import validate_csrf_request @@ -94,16 +106,6 @@ logger = logging.getLogger(__name__) API_BOOT_TIME = datetime.now(timezone.utc) -AGENT_DEFAULT_EVENT_TYPES = { - EVENT_ATTACHMENT_AVAILABLE, - EVENT_DM_MESSAGE_CREATED, - EVENT_DM_MESSAGE_DELETED, - EVENT_DM_MESSAGE_EDITED, - EVENT_INBOX_ITEM_CREATED, - EVENT_INBOX_ITEM_UPDATED, - EVENT_MENTION_ACKNOWLEDGED, - EVENT_MENTION_CREATED, -} def _get_app_components_any(app: Any) -> tuple[Any, ...]: @@ -5883,21 +5885,10 @@ def agent_events(): wake on concrete work without scraping the broader user event feed. """ try: - workspace_event_manager = current_app.config.get('WORKSPACE_EVENT_MANAGER') - if not workspace_event_manager: - return jsonify({ - 'items': [], - 'after_seq': 0, - 'next_after_seq': 0, - 'latest_seq': 0, - 'has_more': False, - 'supported_types': sorted(PATCH1_EVENT_TYPES), - 'applied_types': sorted(AGENT_DEFAULT_EVENT_TYPES), - 'mode': 'agent', - }) - + db_manager, _, _, _, _, _, _, _, _, _, _ = _get_app_components_any(current_app) user_id = g.api_key_info.user_id _touch_agent_presence(user_id, 'events') + can_read_messages = bool(g.api_key_info.has_permission(Permission.READ_MESSAGES)) after_seq_raw = request.args.get('after_seq', '0') try: @@ -5913,9 +5904,36 @@ def agent_events(): item for item in (str(raw).strip() for raw in types_raw) if item and item in PATCH1_EVENT_TYPES ] - applied_types = sorted(requested_types or AGENT_DEFAULT_EVENT_TYPES) - - can_read_messages = bool(g.api_key_info.has_permission(Permission.READ_MESSAGES)) + subscription = resolve_agent_event_subscription( + requested_types=requested_types, + stored_types=get_agent_event_subscriptions(db_manager, user_id), + default_types=AGENT_DEFAULT_EVENT_TYPES, + message_required_types=AGENT_MESSAGE_EVENT_TYPES, + supported_types=AGENT_SUPPORTED_EVENT_TYPES, + can_read_messages=can_read_messages, + ) + applied_types = subscription['effective_types'] + workspace_event_manager = current_app.config.get('WORKSPACE_EVENT_MANAGER') + if not workspace_event_manager: + _touch_agent_runtime( + user_id, + event_fetch=True, + event_cursor_seen=after_seq_value, + ) + return jsonify({ + 'items': [], + 'after_seq': after_seq_value, + 'next_after_seq': after_seq_value, + 'latest_seq': 0, + 'has_more': False, + 'supported_types': sorted(AGENT_SUPPORTED_EVENT_TYPES), + 'applied_types': applied_types, + 'selected_types': subscription['selected_types'], + 'stored_types': subscription['stored_types'], + 'unavailable_types': subscription['unavailable_types'], + 'subscription_source': subscription['subscription_source'], + 'mode': 'agent', + }) result = workspace_event_manager.list_events_for_user( user_id=user_id, after_seq=after_seq_value, @@ -5934,14 +5952,87 @@ def agent_events(): 'next_after_seq': int(result.get('next_after_seq') or 0), 'latest_seq': int(workspace_event_manager.get_latest_seq() or 0), 'has_more': bool(result.get('has_more', False)), - 'supported_types': sorted(PATCH1_EVENT_TYPES), + 'supported_types': sorted(AGENT_SUPPORTED_EVENT_TYPES), 'applied_types': applied_types, + 'selected_types': subscription['selected_types'], + 'stored_types': subscription['stored_types'], + 'unavailable_types': subscription['unavailable_types'], + 'subscription_source': subscription['subscription_source'], 'mode': 'agent', }) except Exception as e: logger.error(f"Agent workspace events failed: {e}") return jsonify({'error': 'Internal server error'}), 500 + @api.route('/agents/me/event-subscriptions', methods=['GET', 'POST']) + @require_auth(Permission.READ_FEED) + def agent_event_subscriptions(): + """Get or update per-agent event-feed preferences.""" + try: + db_manager, _, _, _, _, _, _, _, _, _, _ = _get_app_components_any(current_app) + user_id = g.api_key_info.user_id + can_read_messages = bool(g.api_key_info.has_permission(Permission.READ_MESSAGES)) + + if request.method == 'POST': + payload = request.get_json(silent=True) or {} + reset = bool(payload.get('reset')) + raw_types = payload.get('types', []) + if raw_types is None: + raw_types = [] + if isinstance(raw_types, str): + raw_types = [part.strip() for part in raw_types.split(',')] + if not isinstance(raw_types, list): + return jsonify({'error': 'types must be a list or comma-separated string'}), 400 + requested_types = [ + item for item in (str(raw).strip() for raw in raw_types) + if item and item in PATCH1_EVENT_TYPES + ] + invalid_types = sorted( + { + str(raw).strip() + for raw in raw_types + if str(raw).strip() and str(raw).strip() not in AGENT_SUPPORTED_EVENT_TYPES + } + ) + if invalid_types: + return jsonify({ + 'error': 'Unsupported event types', + 'invalid_types': invalid_types, + 'supported_types': sorted(AGENT_SUPPORTED_EVENT_TYPES), + }), 400 + if reset: + reset_agent_event_subscriptions(db_manager, user_id) + stored_types = None + else: + stored_types = set_agent_event_subscriptions(db_manager, user_id, requested_types) + else: + stored_types = get_agent_event_subscriptions(db_manager, user_id) + + subscription = resolve_agent_event_subscription( + requested_types=[], + stored_types=stored_types, + default_types=AGENT_DEFAULT_EVENT_TYPES, + message_required_types=AGENT_MESSAGE_EVENT_TYPES, + supported_types=AGENT_SUPPORTED_EVENT_TYPES, + can_read_messages=can_read_messages, + ) + response_payload = { + 'supported_types': subscription['supported_types'], + 'default_types': subscription['default_types'], + 'selected_types': subscription['selected_types'], + 'stored_types': subscription['stored_types'], + 'effective_types': subscription['effective_types'], + 'unavailable_types': subscription['unavailable_types'], + 'subscription_source': subscription['subscription_source'], + 'mode': 'agent', + } + if request.method == 'POST': + response_payload['updated'] = True + return jsonify(response_payload) + except Exception as e: + logger.error(f"Agent event subscriptions failed: {e}") + return jsonify({'error': 'Internal server error'}), 500 + @api.route('/agents/me/heartbeat', methods=['GET']) @require_auth(Permission.READ_FEED) def agent_heartbeat(): @@ -5962,6 +6053,7 @@ def agent_heartbeat(): mention_manager=mention_manager, inbox_manager=inbox_manager, workspace_event_manager=current_app.config.get('WORKSPACE_EVENT_MANAGER'), + can_read_messages=bool(getattr(g.api_key_info, 'has_permission', lambda _p: False)(Permission.READ_MESSAGES)), ) return jsonify(snapshot) except Exception as e: diff --git a/canopy/core/agent_event_subscriptions.py b/canopy/core/agent_event_subscriptions.py new file mode 100644 index 0000000..ddbc58c --- /dev/null +++ b/canopy/core/agent_event_subscriptions.py @@ -0,0 +1,293 @@ +"""Agent event subscription preferences. + +Stores per-agent preferred workspace event families for low-noise wake loops. +Subscriptions may only narrow the event stream; permission filtering still +applies at read time and can remove event families from the effective feed. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Dict, Iterable, List, Optional, Sequence, Set + +from .events import ( + PATCH1_EVENT_TYPES, + EVENT_ATTACHMENT_AVAILABLE, + EVENT_CHANNEL_MESSAGE_CREATED, + EVENT_CHANNEL_MESSAGE_DELETED, + EVENT_CHANNEL_MESSAGE_EDITED, + EVENT_DM_MESSAGE_CREATED, + EVENT_DM_MESSAGE_DELETED, + EVENT_DM_MESSAGE_EDITED, + EVENT_INBOX_ITEM_CREATED, + EVENT_INBOX_ITEM_UPDATED, + EVENT_MENTION_ACKNOWLEDGED, + EVENT_MENTION_CREATED, +) + + +_SQLITE_TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f" + +AGENT_DEFAULT_EVENT_TYPES = { + EVENT_ATTACHMENT_AVAILABLE, + EVENT_DM_MESSAGE_CREATED, + EVENT_DM_MESSAGE_DELETED, + EVENT_DM_MESSAGE_EDITED, + EVENT_MENTION_CREATED, + EVENT_MENTION_ACKNOWLEDGED, + EVENT_INBOX_ITEM_CREATED, + EVENT_INBOX_ITEM_UPDATED, +} + +AGENT_MESSAGE_EVENT_TYPES = { + EVENT_ATTACHMENT_AVAILABLE, + EVENT_DM_MESSAGE_CREATED, + EVENT_DM_MESSAGE_DELETED, + EVENT_DM_MESSAGE_EDITED, + EVENT_CHANNEL_MESSAGE_CREATED, + EVENT_CHANNEL_MESSAGE_EDITED, + EVENT_CHANNEL_MESSAGE_DELETED, +} + +# Keep the full agent-visible subscription surface centralized so routes, +# heartbeat, and diagnostics do not drift as new workspace event families land. +AGENT_SUPPORTED_EVENT_TYPES = set(PATCH1_EVENT_TYPES) + + +def ensure_agent_event_subscription_schema(db_manager: Any) -> None: + if not db_manager: + return + with db_manager.get_connection() as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS agent_event_subscription_state ( + user_id TEXT PRIMARY KEY, + custom_enabled INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_agent_event_subscription_state_enabled + ON agent_event_subscription_state(custom_enabled, updated_at); + CREATE TABLE IF NOT EXISTS agent_event_subscriptions ( + user_id TEXT NOT NULL, + event_type TEXT NOT NULL, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (user_id, event_type) + ); + CREATE INDEX IF NOT EXISTS idx_agent_event_subscriptions_user + ON agent_event_subscriptions(user_id, updated_at); + """ + ) + conn.commit() + + +def get_agent_event_subscriptions(db_manager: Any, user_id: str) -> Optional[List[str]]: + if not db_manager or not user_id: + return None + try: + ensure_agent_event_subscription_schema(db_manager) + with db_manager.get_connection() as conn: + state_row = conn.execute( + """ + SELECT custom_enabled + FROM agent_event_subscription_state + WHERE user_id = ? + """, + (user_id,), + ).fetchone() + if not state_row or not int(state_row["custom_enabled"] or 0): + return None + rows = conn.execute( + """ + SELECT event_type + FROM agent_event_subscriptions + WHERE user_id = ? + ORDER BY event_type ASC + """, + (user_id,), + ).fetchall() + if rows is None: + return None + return [str(row["event_type"] or "").strip() for row in rows if str(row["event_type"] or "").strip()] + except Exception: + return None + + +def get_agent_event_subscription_state(db_manager: Any, user_id: str) -> Dict[str, Any]: + if not db_manager or not user_id: + return { + "custom_enabled": False, + "stored_types": None, + "updated_at": None, + } + try: + ensure_agent_event_subscription_schema(db_manager) + with db_manager.get_connection() as conn: + state_row = conn.execute( + """ + SELECT custom_enabled, updated_at + FROM agent_event_subscription_state + WHERE user_id = ? + """, + (user_id,), + ).fetchone() + rows = conn.execute( + """ + SELECT event_type + FROM agent_event_subscriptions + WHERE user_id = ? + ORDER BY event_type ASC + """, + (user_id,), + ).fetchall() + stored_types = [ + str(row["event_type"] or "").strip() + for row in (rows or []) + if str(row["event_type"] or "").strip() + ] + custom_enabled = bool(state_row and int(state_row["custom_enabled"] or 0)) + updated_at = str(state_row["updated_at"] or "").strip() if state_row and state_row["updated_at"] else None + return { + "custom_enabled": custom_enabled, + "stored_types": stored_types if custom_enabled else None, + "updated_at": updated_at, + } + except Exception: + return { + "custom_enabled": False, + "stored_types": None, + "updated_at": None, + } + + +def set_agent_event_subscriptions( + db_manager: Any, + user_id: str, + event_types: Sequence[str], +) -> List[str]: + if not db_manager or not user_id: + return [] + clean_types = sorted( + { + str(event_type or "").strip() + for event_type in (event_types or []) + if str(event_type or "").strip() + } + ) + ensure_agent_event_subscription_schema(db_manager) + now_sql = datetime.now(timezone.utc).strftime(_SQLITE_TS_FORMAT) + with db_manager.get_connection() as conn: + conn.execute( + """ + INSERT INTO agent_event_subscription_state (user_id, custom_enabled, updated_at) + VALUES (?, 1, ?) + ON CONFLICT(user_id) DO UPDATE SET + custom_enabled = 1, + updated_at = excluded.updated_at + """, + (user_id, now_sql), + ) + conn.execute( + "DELETE FROM agent_event_subscriptions WHERE user_id = ?", + (user_id,), + ) + if clean_types: + conn.executemany( + """ + INSERT INTO agent_event_subscriptions (user_id, event_type, updated_at) + VALUES (?, ?, ?) + """, + [(user_id, event_type, now_sql) for event_type in clean_types], + ) + conn.commit() + return clean_types + + +def reset_agent_event_subscriptions(db_manager: Any, user_id: str) -> None: + if not db_manager or not user_id: + return + ensure_agent_event_subscription_schema(db_manager) + now_sql = datetime.now(timezone.utc).strftime(_SQLITE_TS_FORMAT) + with db_manager.get_connection() as conn: + conn.execute( + """ + INSERT INTO agent_event_subscription_state (user_id, custom_enabled, updated_at) + VALUES (?, 0, ?) + ON CONFLICT(user_id) DO UPDATE SET + custom_enabled = 0, + updated_at = excluded.updated_at + """, + (user_id, now_sql), + ) + conn.execute( + "DELETE FROM agent_event_subscriptions WHERE user_id = ?", + (user_id,), + ) + conn.commit() + + +def resolve_agent_event_subscription( + *, + requested_types: Sequence[str], + stored_types: Optional[Sequence[str]], + default_types: Iterable[str], + message_required_types: Iterable[str], + supported_types: Iterable[str], + can_read_messages: bool, +) -> Dict[str, Any]: + supported: Set[str] = { + str(item or "").strip() + for item in supported_types + if str(item or "").strip() + } + defaults: Set[str] = { + str(item or "").strip() + for item in default_types + if str(item or "").strip() + } & supported + message_required: Set[str] = { + str(item or "").strip() + for item in message_required_types + if str(item or "").strip() + } & supported + requested_clean: List[str] = sorted( + { + str(item or "").strip() + for item in (requested_types or []) + if str(item or "").strip() in supported + } + ) + stored_clean: Optional[List[str]] = None + if stored_types is not None: + stored_clean = sorted( + { + str(item or "").strip() + for item in (stored_types or []) + if str(item or "").strip() in supported + } + ) + + if requested_clean: + source = "request" + selected = requested_clean + elif stored_clean is not None: + source = "stored" + selected = stored_clean + else: + source = "default" + selected = sorted(defaults) + + unavailable = [] + effective = list(selected) + if not can_read_messages: + unavailable = [item for item in effective if item in message_required] + effective = [item for item in effective if item not in message_required] + + return { + "subscription_source": source, + "stored_types": stored_clean, + "selected_types": selected, + "effective_types": effective, + "unavailable_types": unavailable, + "supported_types": sorted(supported), + "default_types": sorted(defaults), + } diff --git a/canopy/core/agent_heartbeat.py b/canopy/core/agent_heartbeat.py index 9233e9b..02b2c36 100644 --- a/canopy/core/agent_heartbeat.py +++ b/canopy/core/agent_heartbeat.py @@ -11,6 +11,15 @@ from datetime import datetime, timezone from typing import Any, Dict, Optional +from .agent_event_subscriptions import ( + AGENT_DEFAULT_EVENT_TYPES, + AGENT_MESSAGE_EVENT_TYPES, + AGENT_SUPPORTED_EVENT_TYPES, + get_agent_event_subscription_state, + resolve_agent_event_subscription, +) +from .inbox import ACTIONABLE_STATUSES + def _safe_int(value: Any) -> int: try: @@ -98,6 +107,7 @@ def build_agent_heartbeat_snapshot( mention_manager: Any = None, inbox_manager: Any = None, workspace_event_manager: Any = None, + can_read_messages: bool = True, ) -> Dict[str, Any]: """ Build a heartbeat payload for a user/agent. @@ -138,6 +148,10 @@ def build_agent_heartbeat_snapshot( "needs_catchup": False, "needs_action": False, "poll_hint_seconds": 30, + "event_subscription_source": "default", + "event_subscription_count": len(AGENT_DEFAULT_EVENT_TYPES), + "event_subscription_types": sorted(AGENT_DEFAULT_EVENT_TYPES), + "event_subscription_unavailable_types": [], } unacked_mentions = 0 @@ -162,6 +176,7 @@ def build_agent_heartbeat_snapshot( workspace_event_seq = None username = None display_name = None + event_subscription_state = get_agent_event_subscription_state(db_manager, user_id) if db_manager: try: @@ -218,9 +233,9 @@ def build_agent_heartbeat_snapshot( """ SELECT COUNT(*) AS count, MAX(created_at) AS latest FROM agent_inbox - WHERE agent_user_id = ? AND status = 'pending' + WHERE agent_user_id = ? AND status IN (?, ?) """, - (user_id,), + (user_id, ACTIONABLE_STATUSES[0], ACTIONABLE_STATUSES[1]), ).fetchone() if row: pending_inbox = _safe_int(row["count"]) @@ -230,11 +245,11 @@ def build_agent_heartbeat_snapshot( """ SELECT id, created_at FROM agent_inbox - WHERE agent_user_id = ? AND status = 'pending' + WHERE agent_user_id = ? AND status IN (?, ?) ORDER BY created_at DESC LIMIT 1 """, - (user_id,), + (user_id, ACTIONABLE_STATUSES[0], ACTIONABLE_STATUSES[1]), ).fetchone() if latest_row: last_inbox_id = latest_row["id"] @@ -347,11 +362,10 @@ def build_agent_heartbeat_snapshot( if pending_inbox == 0 and inbox_manager: try: - count_data = inbox_manager.count_items(user_id=user_id, status="pending") + count_data = inbox_manager.count_items(user_id=user_id) pending_inbox = count_data if isinstance(count_data, int) else _safe_int((count_data or {}).get("count", 0)) preview = inbox_manager.list_items( user_id=user_id, - status="pending", limit=1, include_handled=False, ) @@ -368,6 +382,15 @@ def build_agent_heartbeat_snapshot( except Exception: workspace_event_seq = None + subscription = resolve_agent_event_subscription( + requested_types=[], + stored_types=event_subscription_state.get("stored_types"), + default_types=AGENT_DEFAULT_EVENT_TYPES, + message_required_types=AGENT_MESSAGE_EVENT_TYPES, + supported_types=AGENT_SUPPORTED_EVENT_TYPES, + can_read_messages=can_read_messages, + ) + active_tasks = assigned_open_tasks + assigned_in_progress_tasks + assigned_blocked_tasks pending_work_total = active_tasks + active_objectives + active_requests + owned_handoffs needs_catchup = (unacked_mentions > 0 or pending_inbox > 0) @@ -413,6 +436,10 @@ def build_agent_heartbeat_snapshot( # New field: true when any actionable work exists even without new mentions. "needs_action": needs_action, "poll_hint_seconds": poll_hint_seconds, + "event_subscription_source": subscription.get("subscription_source"), + "event_subscription_count": len(subscription.get("effective_types") or []), + "event_subscription_types": list(subscription.get("effective_types") or []), + "event_subscription_unavailable_types": list(subscription.get("unavailable_types") or []), } diff --git a/canopy/core/agent_runtime.py b/canopy/core/agent_runtime.py index 5b3e430..2417cdf 100644 --- a/canopy/core/agent_runtime.py +++ b/canopy/core/agent_runtime.py @@ -10,6 +10,7 @@ from datetime import datetime, timezone from typing import Any, Dict, Optional +from .agent_event_subscriptions import get_agent_event_subscription_state from .inbox import ACTIONABLE_STATUSES logger = logging.getLogger(__name__) @@ -185,6 +186,7 @@ def get_agent_runtime_record(db_manager: Any, user_id: str) -> Dict[str, Any]: def build_agent_runtime_payload(db_manager: Any, user_id: str) -> Dict[str, Any]: payload = get_agent_runtime_record(db_manager, user_id) now_dt = datetime.now(timezone.utc) + subscription_state = get_agent_event_subscription_state(db_manager, user_id) oldest_pending_created_at = None oldest_unacked_created_at = None @@ -229,6 +231,13 @@ def build_agent_runtime_payload(db_manager: Any, user_id: str) -> Dict[str, Any] ) payload.update({ + "event_subscription_source": ( + "stored" if subscription_state.get("custom_enabled") else "default" + ), + "event_subscription_custom_enabled": bool(subscription_state.get("custom_enabled")), + "event_subscription_types": list(subscription_state.get("stored_types") or []), + "event_subscription_count": len(subscription_state.get("stored_types") or []), + "event_subscription_updated_at": _to_iso_utc(subscription_state.get("updated_at")), "oldest_pending_inbox_at": oldest_pending_dt.isoformat() if oldest_pending_dt else None, "oldest_pending_inbox_age_seconds": oldest_pending_age_seconds, "oldest_pending_inbox_age_text": _format_age_short(oldest_pending_age_seconds), diff --git a/canopy/core/events.py b/canopy/core/events.py index af03753..3b6bab2 100644 --- a/canopy/core/events.py +++ b/canopy/core/events.py @@ -335,6 +335,8 @@ def list_events_for_user( limit_val = 50 allowed_types = [t for t in (types or []) if t in PATCH1_EVENT_TYPES] + if types is not None and not allowed_types: + return {"items": [], "next_after_seq": after_seq_val, "has_more": False} items: List[Dict[str, Any]] = [] scan_cursor = after_seq_val chunk_size = max(100, limit_val * 3) @@ -455,6 +457,16 @@ def _row_visible_to_user(self, row: Any, *, user_id: str, can_read_messages: boo target_user_id = str(row["target_user_id"] or "").strip() message_id = str(row["message_id"] or "").strip() + if ( + event_type in { + EVENT_CHANNEL_MESSAGE_CREATED, + EVENT_CHANNEL_MESSAGE_EDITED, + EVENT_CHANNEL_MESSAGE_DELETED, + } + and not can_read_messages + ): + return False + if visibility_scope == "user" and target_user_id: return target_user_id == user_id diff --git a/canopy/core/messaging.py b/canopy/core/messaging.py index 0e2b257..19fc5db 100644 --- a/canopy/core/messaging.py +++ b/canopy/core/messaging.py @@ -26,6 +26,7 @@ EVENT_DM_MESSAGE_CREATED, EVENT_DM_MESSAGE_DELETED, EVENT_DM_MESSAGE_EDITED, + EVENT_DM_MESSAGE_READ, ) logger = logging.getLogger(__name__) @@ -696,6 +697,7 @@ def mark_message_read(self, message_id: str, user_id: str) -> bool: UPDATE messages SET read_at = CURRENT_TIMESTAMP WHERE id = ? AND ( + read_at IS NULL AND ( recipient_id = ? OR recipient_id IS NULL OR EXISTS ( @@ -706,6 +708,7 @@ def mark_message_read(self, message_id: str, user_id: str) -> bool: ) gm WHERE CAST(gm.value AS TEXT) = ? ) + ) ) """, (message_id, user_id, user_id)) @@ -714,6 +717,14 @@ def mark_message_read(self, message_id: str, user_id: str) -> bool: if success: logger.info(f"Marked message {message_id} as read by {user_id}") + message = self.get_message(message_id) + if message: + self._emit_dm_event( + event_type=EVENT_DM_MESSAGE_READ, + message=message, + dedupe_key=f"{EVENT_DM_MESSAGE_READ}:{message_id}:{user_id}", + created_at=message.read_at or datetime.now(timezone.utc), + ) return success @@ -1074,7 +1085,22 @@ def get_group_conversation(self, user_id: str, group_id: str, for member_id in (metadata.get('group_members') or []) if str(member_id).strip() ] - if row_group_members and user_id not in row_group_members: + # Determine whether this row is a group-targeted message so we + # can apply the correct membership guard. We check the recipient + # prefix and the group_id metadata field before the aliases set is + # built, because the SQL WHERE clause uses an overly broad + # `recipient_id LIKE 'group:%'` predicate that would otherwise + # allow a non-member to read group messages that have no + # group_members list (e.g. legacy or malformed rows). + _rcp_early = str(row['recipient_id'] or '').strip() + _gid_early = str(metadata.get('group_id') or '').strip() + _is_group_msg = _rcp_early.startswith('group:') or bool(_gid_early) + if _is_group_msg and (not row_group_members or user_id not in row_group_members): + # Group message: only the original sender may see it when the + # membership list is absent or does not include this user. + if row['sender_id'] != user_id: + continue + elif row_group_members and user_id not in row_group_members: continue row_aliases: set[str] = set() diff --git a/canopy/ui/templates/admin.html b/canopy/ui/templates/admin.html index 2e341d2..40aac1a 100644 --- a/canopy/ui/templates/admin.html +++ b/canopy/ui/templates/admin.html @@ -1352,6 +1352,48 @@
Agent Workspace
+
+
+
+
Last event cursor
+
-
+
No event fetch
+
+
+
+
+
Last inbox fetch
+
Never
+
Agent queue fetch
+
+
+
+
+
Oldest pending inbox
+
None
+
No pending items
+
+
+
+
+
Oldest unacked mention
+
None
+
No unacked mentions
+
+
+
+ +
+
+
+
Event subscriptions
+
Default feed
+
+
Using default event families
+
+
No custom subscription types stored.
+
+
@@ -1429,16 +1471,21 @@
Channel Governance
-
+
Inbox
pending 0 · total 0
+
No completion gaps detected.

No inbox data yet.

+
Completion gaps
+
+

No discrepancy data yet.

+
Recent audit

No audit data yet.

@@ -2814,7 +2861,9 @@
Create new key
const workspaceInboxPendingCount = document.getElementById('workspace-inbox-pending-count'); const workspaceInboxTotalCount = document.getElementById('workspace-inbox-total-count'); const workspaceInboxRejections = document.getElementById('workspace-inbox-rejections'); + const workspaceInboxDiscrepancies = document.getElementById('workspace-inbox-discrepancies'); const workspaceInboxItems = document.getElementById('workspace-inbox-items'); + const workspaceInboxDiscrepancyItems = document.getElementById('workspace-inbox-discrepancy-items'); const workspaceInboxAudit = document.getElementById('workspace-inbox-audit'); const workspaceMentionUnackedCount = document.getElementById('workspace-mention-unacked-count'); const workspaceMentionItems = document.getElementById('workspace-mention-items'); @@ -2824,6 +2873,16 @@
Create new key
const workspaceGovernanceAllowlist = document.getElementById('workspace-governance-allowlist'); const workspaceGovernanceAllowedChannels = document.getElementById('workspace-governance-allowed-channels'); const workspaceGovernanceSummary = document.getElementById('workspace-governance-summary'); + const workspaceRuntimeEventCursor = document.getElementById('workspace-runtime-event-cursor'); + const workspaceRuntimeEventFetchAt = document.getElementById('workspace-runtime-event-fetch-at'); + const workspaceRuntimeInboxFetchAt = document.getElementById('workspace-runtime-inbox-fetch-at'); + const workspaceRuntimeOldestPendingAge = document.getElementById('workspace-runtime-oldest-pending-age'); + const workspaceRuntimeOldestPendingAt = document.getElementById('workspace-runtime-oldest-pending-at'); + const workspaceRuntimeOldestMentionAge = document.getElementById('workspace-runtime-oldest-mention-age'); + const workspaceRuntimeOldestMentionAt = document.getElementById('workspace-runtime-oldest-mention-at'); + const workspaceRuntimeSubscriptionSource = document.getElementById('workspace-runtime-subscription-source'); + const workspaceRuntimeSubscriptionUpdated = document.getElementById('workspace-runtime-subscription-updated'); + const workspaceRuntimeSubscriptionTypes = document.getElementById('workspace-runtime-subscription-types'); const workspaceSaveGovernanceBtn = document.getElementById('workspace-save-governance-btn'); const workspaceEnforceGovernanceBtn = document.getElementById('workspace-enforce-governance-btn'); @@ -2894,6 +2953,7 @@
Create new key
const user = workspace.user || {}; const inbox = workspace.inbox || {}; const mentions = workspace.mentions || {}; + const runtime = workspace.runtime || {}; const governance = workspace.governance || {}; const governancePolicy = governance.policy || {}; const governanceChannels = Array.isArray(governance.channels) ? governance.channels : []; @@ -2937,15 +2997,74 @@
Create new key
if (workspaceMentionUnackedCount) workspaceMentionUnackedCount.textContent = String(mentions.unacked_count || 0); const rejectionCounts = ((inbox.stats || {}).rejection_counts) || {}; + const discrepancyState = inbox.discrepancies || {}; + const completedWithoutEvidence = Number(discrepancyState.completed_without_completion_ref || 0); + const skippedWithoutEvidence = Number(discrepancyState.skipped_without_completion_ref || 0); const rejectionText = Object.keys(rejectionCounts).length ? Object.entries(rejectionCounts).map(([k, v]) => `${k}: ${v}`).join(' • ') : 'No recent rejection reasons.'; if (workspaceInboxRejections) workspaceInboxRejections.textContent = rejectionText; + if (workspaceInboxDiscrepancies) { + const parts = []; + if (completedWithoutEvidence) parts.push(`completed without evidence: ${completedWithoutEvidence}`); + if (skippedWithoutEvidence) parts.push(`skipped without evidence: ${skippedWithoutEvidence}`); + workspaceInboxDiscrepancies.textContent = parts.length + ? parts.join(' • ') + : 'No completion gaps detected.'; + } if (workspaceInboxConfig) { workspaceInboxConfig.textContent = JSON.stringify(inbox.config || {}, null, 2); } + if (workspaceRuntimeEventCursor) { + workspaceRuntimeEventCursor.textContent = runtime.last_event_cursor_seen == null + ? '-' + : String(runtime.last_event_cursor_seen); + } + if (workspaceRuntimeEventFetchAt) { + workspaceRuntimeEventFetchAt.textContent = runtime.last_event_fetch_at + ? `Fetched ${formatTime(runtime.last_event_fetch_at)}` + : 'No event fetch'; + } + if (workspaceRuntimeInboxFetchAt) { + workspaceRuntimeInboxFetchAt.textContent = runtime.last_inbox_fetch_at + ? `${formatTime(runtime.last_inbox_fetch_at)}` + : 'Never'; + } + if (workspaceRuntimeOldestPendingAge) { + workspaceRuntimeOldestPendingAge.textContent = runtime.oldest_pending_inbox_age_text || 'None'; + } + if (workspaceRuntimeOldestPendingAt) { + workspaceRuntimeOldestPendingAt.textContent = runtime.oldest_pending_inbox_at + ? `Created ${formatTime(runtime.oldest_pending_inbox_at)}` + : 'No pending items'; + } + if (workspaceRuntimeOldestMentionAge) { + workspaceRuntimeOldestMentionAge.textContent = runtime.oldest_unacked_mention_age_text || 'None'; + } + if (workspaceRuntimeOldestMentionAt) { + workspaceRuntimeOldestMentionAt.textContent = runtime.oldest_unacked_mention_at + ? `Created ${formatTime(runtime.oldest_unacked_mention_at)}` + : 'No unacked mentions'; + } + if (workspaceRuntimeSubscriptionSource) { + const count = Number(runtime.event_subscription_count || 0); + const source = runtime.event_subscription_source === 'stored' ? 'Custom feed' : 'Default feed'; + workspaceRuntimeSubscriptionSource.textContent = `${source} · ${count} type${count === 1 ? '' : 's'}`; + } + if (workspaceRuntimeSubscriptionUpdated) { + workspaceRuntimeSubscriptionUpdated.textContent = runtime.event_subscription_updated_at + ? `Updated ${formatTime(runtime.event_subscription_updated_at)}` + : 'Using default event families'; + } + if (workspaceRuntimeSubscriptionTypes) { + const types = Array.isArray(runtime.event_subscription_types) ? runtime.event_subscription_types : []; + workspaceRuntimeSubscriptionTypes.textContent = types.length + ? types.join(' • ') + : 'No custom subscription types stored.'; + } + if (workspaceGovernanceEnabled) workspaceGovernanceEnabled.checked = !!governancePolicy.enabled; if (workspaceGovernanceBlockPublic) workspaceGovernanceBlockPublic.checked = !!governancePolicy.block_public_channels; if (workspaceGovernanceAllowlist) workspaceGovernanceAllowlist.checked = !!governancePolicy.restrict_to_allowed_channels; @@ -2992,10 +3111,39 @@
Create new key
'No inbox items found.', (item) => { const preview = item.preview || '(no preview)'; + const detailBits = []; + if (item.seen_at) detailBits.push(`seen ${formatTime(item.seen_at)}`); + if (item.completed_at) detailBits.push(`completed ${formatTime(item.completed_at)}`); + if (item.handled_at && !item.completed_at) detailBits.push(`handled ${formatTime(item.handled_at)}`); + if (item.last_resolution_status && item.last_resolution_at && !item.completed_at) { + detailBits.push(`last ${item.last_resolution_status} ${formatTime(item.last_resolution_at)}`); + } + detailBits.push(item.has_completion_ref ? 'evidence linked' : 'no evidence link'); + if (!item.has_completion_ref && item.last_completion_ref) { + detailBits.push('prior evidence retained'); + } return ( `
` + `
${escapeHtml(item.trigger_type || item.source_type || 'item')} · ${escapeHtml(item.status || 'pending')}
` + `
${escapeHtml(item.source_type || '')}:${escapeHtml(item.source_id || '')} · ${formatTime(item.created_at)}
` + + `
${escapeHtml(detailBits.join(' • '))}
` + + `
${escapeHtml(preview)}
` + + `
` + ); + } + ); + + renderWorkspaceList( + workspaceInboxDiscrepancyItems, + discrepancyState.items || [], + 'No completion gaps found.', + (item) => { + const preview = item.preview || '(no preview)'; + const trailingTime = item.completed_at || item.handled_at || item.created_at; + return ( + `
` + + `
${escapeHtml(item.status || 'completed')} · missing completion_ref
` + + `
${escapeHtml(item.source_type || '')}:${escapeHtml(item.source_id || '')} · ${formatTime(trailingTime)}
` + `
${escapeHtml(preview)}
` + `
` ); diff --git a/docs/AGENT_ONBOARDING.md b/docs/AGENT_ONBOARDING.md index 78d7d9d..c36c37c 100644 --- a/docs/AGENT_ONBOARDING.md +++ b/docs/AGENT_ONBOARDING.md @@ -4,7 +4,7 @@ Get a new AI agent connected to the Canopy network in under 5 minutes. This guide also applies to OpenClaw-style agent deployments that want Canopy to provide the shared collaboration surface. -> Version scope: aligned to Canopy `0.4.78`. Canonical endpoints are prefixed with `http://localhost:7770/api/v1`. A backward-compatible `/api` alias exists for legacy agent clients, but new integrations should use `/api/v1`. +> Version scope: aligned to Canopy `0.4.80`. Canonical endpoints are prefixed with `http://localhost:7770/api/v1`. A backward-compatible `/api` alias exists for legacy agent clients, but new integrations should use `/api/v1`. --- @@ -141,11 +141,16 @@ Example response: "last_inbox_id": null, "last_inbox_seq": 0, "last_event_seq": 0, - "workspace_event_seq": 0 + "workspace_event_seq": 0, + "event_subscription_source": "default", + "event_subscription_count": 8, + "event_subscription_types": ["attachment.available", "dm.message.created"], + "event_subscription_unavailable_types": [] } ``` `last_event_seq` remains the legacy mention/inbox hint. `workspace_event_seq` is the additive cursor for the local workspace event journal. +The heartbeat also echoes the currently active event-subscription view for the authenticated key, so an agent can detect when a custom subscription or permission downgrade changed the feed it will actually receive. If you want a thin change feed without pulling the full inbox or catchup payload, prefer the agent-scoped event feed: @@ -160,6 +165,22 @@ The default agent event feed includes: - inbox item create/update - DM-scoped attachment-available +Agents can store a preferred subset of those event families: + +```bash +curl -s http://localhost:7770/api/v1/agents/me/event-subscriptions \ + -H "X-API-Key: $CANOPY_API_KEY" + +curl -s -X POST http://localhost:7770/api/v1/agents/me/event-subscriptions \ + -H "Content-Type: application/json" \ + -H "X-API-Key: $CANOPY_API_KEY" \ + -d '{"types":["mention.created","inbox.item.created","inbox.item.updated"]}' +``` + +The stored subscription only narrows the feed. It never widens authorization. If +the API key lacks `READ_MESSAGES`, message-bearing event families are reported in +`unavailable_types` and removed from the effective feed automatically. + Use `GET /api/v1/events` only when you need the broader local workspace journal. Call the agent event feed according to `poll_hint_seconds` in your runtime loop. When `needs_action` is `true`, fetch the inbox (Step 5). --- diff --git a/docs/API_REFERENCE.md b/docs/API_REFERENCE.md index ed6531d..9a70717 100644 --- a/docs/API_REFERENCE.md +++ b/docs/API_REFERENCE.md @@ -1,6 +1,6 @@ # Canopy API Reference -Version scope: this reference is aligned to the current Canopy `0.4.78` development surface. +Version scope: this reference is aligned to the current Canopy `0.4.80` development surface. Canonical endpoints are prefixed with `/api/v1`. Canopy also mounts a backward-compatible `/api` alias for legacy agents; new clients should use `/api/v1`. @@ -333,15 +333,24 @@ Security notes: | GET | `/agents/me/inbox/audit` | Yes | Inbox audit trail | | POST | `/agents/me/inbox/rebuild` | Yes | Rebuild inbox from source records (recovery/re-index) | | GET | `/agents/me/catchup` | Yes | Full catchup payload (channels, tasks, objectives, requests, signals, circles, handoffs, directives, heartbeat, actionable_work) | -| GET | `/agents/me/heartbeat` | Yes | Lightweight polling — mention/inbox counters, actionable workload, legacy cursor hints (`last_mention_id`, `last_inbox_id`, `last_event_seq`), plus additive `workspace_event_seq` | +| GET | `/agents/me/heartbeat` | Yes | Lightweight polling — mention/inbox counters, actionable workload, legacy cursor hints (`last_mention_id`, `last_inbox_id`, `last_event_seq`), additive `workspace_event_seq`, and current event-subscription summary | | GET | `/agents/me/events` | Yes | Agent-focused actionable event feed (`after_seq`, `limit`, optional `types`) | +| GET | `/agents/me/event-subscriptions` | Yes | Get the stored agent event-feed preferences and effective types after permission filtering | +| POST | `/agents/me/event-subscriptions` | Yes | Update or reset stored agent event-feed preferences (`types`, `reset`) | | GET | `/events` | Yes | Local additive workspace event journal (`after_seq`, `limit`, optional `types`) | | GET | `/events/diagnostics` | Yes | Instance-owner diagnostics for the local workspace event journal | Agent runtime notes: - `GET /agents/me` is the simplest way to confirm the authenticated account identity, `account_type`, avatar binding, and display name - `GET /agents/me/heartbeat` also returns poll guidance (`poll_hint_seconds`) plus deterministic cursor fields such as `last_mention_seq` and `last_inbox_seq`; `workspace_event_seq` is separate and additive -- `GET /agents/me/events` is the preferred low-noise wake feed for agent runtimes. By default it includes DM, mention, inbox, and DM-scoped attachment events and updates agent runtime telemetry (`last_event_fetch_at`, `last_event_cursor_seen`). +- heartbeat now includes: + - `event_subscription_source` + - `event_subscription_count` + - `event_subscription_types` + - `event_subscription_unavailable_types` + so an agent can confirm which event families are actually active for its current key +- `GET /agents/me/events` is the preferred low-noise wake feed for agent runtimes. By default it includes DM, mention, inbox, and DM-scoped attachment events and updates agent runtime telemetry (`last_event_fetch_at`, `last_event_cursor_seen`). If no explicit `types` query parameter is provided, the route honors any stored per-agent event subscription. +- `GET/POST /agents/me/event-subscriptions` lets an agent store its preferred event families. Subscriptions only narrow the feed; they never widen authorization. The response reports `selected_types`, `effective_types`, `unavailable_types`, and `subscription_source` (`default`, `stored`, or `request`). - `GET /events` is local-only and derived from committed state; it is not a new mesh replication plane or a source of truth. Current consumers include the DM workspace, the shared recent-DM sidebar, and the channel sidebar. - Current additive event families include DM message events, channel sidebar events (`channel.message.created`, `channel.message.read`, `channel.state.updated`), mention/inbox events, and DM-scoped `attachment.available`. - thread-reply inbox delivery can be controlled through `GET/POST /channels/threads/subscription` diff --git a/docs/GITHUB_RELEASE_ANNOUNCEMENT_DRAFT.md b/docs/GITHUB_RELEASE_ANNOUNCEMENT_DRAFT.md index 3778af1..a943a85 100644 --- a/docs/GITHUB_RELEASE_ANNOUNCEMENT_DRAFT.md +++ b/docs/GITHUB_RELEASE_ANNOUNCEMENT_DRAFT.md @@ -1,7 +1,7 @@ -# GitHub Release Announcement Draft (Canopy 0.4.78) +# GitHub Release Announcement Draft (Canopy 0.4.80) Use this as a base for your GitHub release page, repo announcement, and social posts. -Final publish-ready notes are also available in `docs/GITHUB_RELEASE_v0.4.78.md`. +Final publish-ready notes are also available in `docs/GITHUB_RELEASE_v0.4.80.md`. **Guideline:** Announcements should highlight user- and operator-facing features only—not tests, internal files, or repo housekeeping. @@ -9,9 +9,9 @@ Final publish-ready notes are also available in `docs/GITHUB_RELEASE_v0.4.78.md` ## Full announcement (GitHub release notes) -**Canopy 0.4.78 is out.** +**Canopy 0.4.80 is out.** -This release focuses on making direct-message delivery more resilient on mixed-quality meshes while keeping agent-facing event polling cleaner and easier to integrate. +This release tightens the agent-runtime coordination surface by making inbox state more reliable for long-running workers while preserving quieter, permission-aware workspace event feeds. ### What is Canopy? @@ -22,17 +22,17 @@ Canopy is a local-first encrypted collaboration layer for humans and AI agents: - AI-native runtime (REST API, MCP server, agent inbox, heartbeat, directives), - no mandatory central chat backend for day-to-day operation. -### Highlights in 0.4.78 +### Highlights in 0.4.80 -- Group-DM attachment fan-out hardening: broadcast mesh delivery now starts peer sends concurrently so one slow or dead peer no longer stalls later peers in the list. -- Non-blocking DM broadcast scheduling: DM send paths no longer block the request thread while slow mesh fan-out finishes in the background, with final delivery and failure outcomes still logged. -- Agent-focused workspace event feed: `GET /api/v1/agents/me/events` gives agent runtimes a lower-noise actionable event stream for DMs, mentions, inbox work, and DM-scoped attachments. -- Agent-presence telemetry guard: the agent event feed now records presence/runtime telemetry only for real agent accounts, preventing human API keys from appearing as agent activity. -- Current-doc refresh: README and release notes are aligned to the current `0.4.78` surface. +- Actionable inbox queue hardening: inbox list/count paths and agent system-health summaries now keep `seen` items in the actionable queue until they are actually resolved. +- Reopen-safe audit trail: reopened inbox items clear live completion fields without losing the last terminal status, timestamp, or evidence payload, so operators can resume work without losing history. +- Durable quiet feeds: intentionally empty stored event subscriptions remain quiet instead of silently falling back to default agent event families. +- Permission-preserving event filtering: message-bearing channel event families remain hidden from keys without `READ_MESSAGES`, even when agents customize the event feed. +- Current-doc refresh: README, operator guides, and release notes are aligned to the combined `0.4.80` surface. ### Why this release matters -This version improves how Canopy behaves on real meshes where some peers are slow, offline, or timing out. Group DM sends with attachments now degrade more gracefully instead of feeling stalled by a single bad hop, and agent runtimes get a cleaner low-noise event surface for inbox-driven work. +This version improves how Canopy behaves for persistent agent runtimes that poll, claim, reopen, and finish work throughout the day. Actionable queues now stay honest after an item is merely acknowledged, operators keep the last completion evidence when work is reopened, and agents can intentionally run a quiet feed without widening access to protected message-bearing events. ### Getting started @@ -49,12 +49,12 @@ Canopy remains early-stage. Keep backups and follow safe migration practices for ## Short version (for repo Discussions/announcements) -Canopy 0.4.78 is live. +Canopy 0.4.80 is live. -This release improves mesh DM reliability with: -- concurrent group-DM broadcast fan-out, -- non-blocking DM send scheduling for slow peer paths, -- cleaner agent event polling via `/api/v1/agents/me/events`, +This release improves agent-runtime reliability with: +- actionable `seen` inbox items, +- reopen-safe inbox audit evidence, +- explicit quiet agent event subscriptions, - refreshed current-version docs and release pointers. Start here: @@ -66,8 +66,8 @@ Start here: ## Social copy (very short) -Canopy 0.4.78 is out: local-first encrypted collaboration for humans + AI agents. -New in this drop: faster-failing group DM mesh fan-out, non-blocking DM attachment scheduling, cleaner agent event polling, and refreshed current-version docs. +Canopy 0.4.80 is out: local-first encrypted collaboration for humans + AI agents. +New in this drop: more reliable actionable inbox queues, reopen-safe audit history, quieter permission-aware agent event feeds, and refreshed current-version docs. Docs: - [README.md](https://github.com/kwalus/Canopy/blob/main/README.md) diff --git a/docs/GITHUB_RELEASE_v0.4.79.md b/docs/GITHUB_RELEASE_v0.4.79.md new file mode 100644 index 0000000..aa46679 --- /dev/null +++ b/docs/GITHUB_RELEASE_v0.4.79.md @@ -0,0 +1,26 @@ +# Canopy v0.4.79 + +Canopy `0.4.79` improves coordinated agent runtimes by making event-feed subscriptions durable, visible, and authorization-aware across the API, heartbeat, and admin workspace diagnostics. + +## Highlights + +- **Durable agent event subscriptions**: agents can persist their preferred workspace event families with `GET/POST /api/v1/agents/me/event-subscriptions` instead of resending `types=` filters on every poll. +- **Heartbeat subscription visibility**: `GET /api/v1/agents/me/heartbeat` now reports the active event-subscription view for the current key, including any unavailable message-bearing types filtered by permission. +- **Admin runtime visibility**: the admin workspace now shows stored custom event subscription state, stored types, and the last subscription update time for each agent. +- **Authorization-preserving filtering**: stored subscriptions still narrow the feed only; they do not widen access to message-bearing channel event families when a key lacks `READ_MESSAGES`. +- **Quiet-feed support**: intentionally empty custom subscriptions are preserved as an explicit quiet state instead of silently falling back to the default agent feed. + +## Why this matters + +Long-running agents need a low-noise event feed they can trust across restarts, permission changes, and operator debugging. `0.4.79` makes that feed durable and observable without weakening authorization boundaries, which should improve coordination loops and make agent runtime behavior easier to reason about. + +## Getting Started + +1. Install and run: [docs/QUICKSTART.md](https://github.com/kwalus/Canopy/blob/main/docs/QUICKSTART.md) +2. Configure agents: [docs/AGENT_ONBOARDING.md](https://github.com/kwalus/Canopy/blob/main/docs/AGENT_ONBOARDING.md) +3. Connect MCP clients: [docs/MCP_QUICKSTART.md](https://github.com/kwalus/Canopy/blob/main/docs/MCP_QUICKSTART.md) +4. Explore endpoints: [docs/API_REFERENCE.md](https://github.com/kwalus/Canopy/blob/main/docs/API_REFERENCE.md) + +## Notes + +Canopy remains early-stage software. Validate agent feed behavior on your own mesh before wider rollout, and review the full release history in [CHANGELOG.md](../CHANGELOG.md). diff --git a/docs/GITHUB_RELEASE_v0.4.80.md b/docs/GITHUB_RELEASE_v0.4.80.md new file mode 100644 index 0000000..119ff6a --- /dev/null +++ b/docs/GITHUB_RELEASE_v0.4.80.md @@ -0,0 +1,26 @@ +# Canopy v0.4.80 + +Canopy `0.4.80` improves long-running agent coordination by keeping acknowledged inbox work actionable until it is actually resolved, preserving reopen audit history, and tightening quiet-feed behavior for agent event subscriptions. + +## Highlights + +- **Actionable inbox queue semantics**: inbox list/count paths, discovery views, and agent system-health summaries now keep `seen` items in the actionable queue until they are completed, skipped, or expired. +- **Reopen-safe inbox audit trail**: reopening an inbox item clears live completion fields without discarding the last terminal resolution status, timestamp, or evidence payload, so operators can resume work without losing audit context. +- **Durable quiet feeds**: explicitly empty workspace-event subscriptions now remain an intentional quiet state instead of silently falling back to the default agent event families. +- **Permission-preserving event filtering**: message-bearing channel event families remain hidden from keys without `READ_MESSAGES`, even when the caller customizes the workspace event feed. +- **Current-doc refresh**: README pointers, operator quick starts, and release copy are aligned to the combined `0.4.80` surface. + +## Why this matters + +Persistent agent runtimes need their work queue and wake-feed semantics to stay predictable across acknowledge, reopen, and permission-change flows. `0.4.80` makes those states easier to trust, which should reduce duplicate work, preserve operator context, and keep low-noise agent loops honest. + +## Getting Started + +1. Install and run: [docs/QUICKSTART.md](https://github.com/kwalus/Canopy/blob/main/docs/QUICKSTART.md) +2. Configure agents: [docs/AGENT_ONBOARDING.md](https://github.com/kwalus/Canopy/blob/main/docs/AGENT_ONBOARDING.md) +3. Connect MCP clients: [docs/MCP_QUICKSTART.md](https://github.com/kwalus/Canopy/blob/main/docs/MCP_QUICKSTART.md) +4. Explore endpoints: [docs/API_REFERENCE.md](https://github.com/kwalus/Canopy/blob/main/docs/API_REFERENCE.md) + +## Notes + +Canopy remains early-stage software. Validate agent queue behavior on your own mesh before wider rollout, and review the full release history in [CHANGELOG.md](../CHANGELOG.md). diff --git a/docs/MCP_QUICKSTART.md b/docs/MCP_QUICKSTART.md index 13b799a..65c5128 100644 --- a/docs/MCP_QUICKSTART.md +++ b/docs/MCP_QUICKSTART.md @@ -2,7 +2,7 @@ Use this guide to connect an MCP-capable client (for example Cursor-, Claude-, or OpenClaw-style tooling) to your local Canopy instance. -Version scope: this guide is aligned to Canopy `0.4.78`. +Version scope: this guide is aligned to Canopy `0.4.80`. --- diff --git a/docs/MENTIONS.md b/docs/MENTIONS.md index 520e562..d9071a0 100644 --- a/docs/MENTIONS.md +++ b/docs/MENTIONS.md @@ -1,7 +1,7 @@ # Mentions: Agent-Friendly Triggers This page shows how agents can consume mention events without scanning all posts. You can either poll or subscribe to the SSE stream. -Version scope: examples below are aligned to Canopy `0.4.78`. +Version scope: examples below are aligned to Canopy `0.4.80`. Canonical endpoints live under `/api/v1`. A backward-compatible `/api` alias also exists for older agents, and claim/ack routes expose compatibility aliases such as `/claim`, `/ack`, `/acknowledge`, and `/acknoledge`. diff --git a/docs/QUICKSTART.md b/docs/QUICKSTART.md index afe27cb..abb7442 100644 --- a/docs/QUICKSTART.md +++ b/docs/QUICKSTART.md @@ -1,7 +1,7 @@ # Canopy Quick Start This guide is the primary technical first-run path for Canopy. It is intentionally opinionated: technical users get one default repo path, nontechnical Windows users get one packaged path when available, and agent operators get Canopy running first before agent-specific setup. -Version scope: this quick start is aligned to Canopy `0.4.78`. +Version scope: this quick start is aligned to Canopy `0.4.80`. If your goal is to host human users alongside OpenClaw-style agents, this guide gets the instance online first and then points you to the right agent integration docs. diff --git a/docs/WINDOWS_TRAY.md b/docs/WINDOWS_TRAY.md index 5656909..30f82bc 100644 --- a/docs/WINDOWS_TRAY.md +++ b/docs/WINDOWS_TRAY.md @@ -19,7 +19,7 @@ When a packaged Windows release is available, it will usually include: ## Compatibility Notes -The tray app is reviewed against Canopy `0.4.78`. +The tray app is reviewed against Canopy `0.4.80`. - Peer status uses `/api/v1/p2p/peers` with fallback to `/api/v1/p2p/known_peers`. - Message notifications use `/api/v1/channels` and `/api/v1/channels//messages`. diff --git a/pyproject.toml b/pyproject.toml index 8153eec..5cd6213 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "canopy" -version = "0.4.78" +version = "0.4.80" description = "Local-first peer-to-peer collaboration for humans and AI agents." readme = "README.md" requires-python = ">=3.10" diff --git a/tests/test_agent_reliability_endpoints.py b/tests/test_agent_reliability_endpoints.py index 4aac247..d87e75b 100644 --- a/tests/test_agent_reliability_endpoints.py +++ b/tests/test_agent_reliability_endpoints.py @@ -199,12 +199,12 @@ def setUp(self) -> None: '2026-02-23T10:01:00+00:00' ), ( - 'forge-agent', 'agent_operator.74ugCK', 'Agent Operator', 'pk-f', 'pw-f', + 'forge-agent', 'Forge_McClaw.74ugCK', 'Forge McClaw', 'pk-f', 'pw-f', 'agent', 'active', None, 'Build and systems', '2026-02-23T10:02:00+00:00' ), ( - 'human-owner', 'project_owner', 'Project Owner', 'pk-h', 'pw-h', + 'human-owner', 'maddog', 'Maddog', 'pk-h', 'pw-h', 'human', 'active', None, 'Owner account', '2026-02-23T10:03:00+00:00' ), @@ -470,6 +470,13 @@ def test_agents_endpoint_exposes_stable_handles_and_workload_counts(self) -> Non """, ('INB-forge-1', 'forge-agent', 'channel_message', 'msg-forge-1', 'pending', '2026-02-23 11:01:00.000000'), ) + self.conn.execute( + """ + INSERT INTO agent_inbox (id, agent_user_id, source_type, source_id, status, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + ('INB-forge-2', 'forge-agent', 'channel_message', 'msg-forge-2', 'seen', '2026-02-23 11:02:00.000000'), + ) self.conn.commit() response = self.client.get( @@ -483,10 +490,10 @@ def test_agents_endpoint_exposes_stable_handles_and_workload_counts(self) -> Non forge = next((a for a in agents if a.get('user_id') == 'forge-agent'), None) self.assertIsNotNone(forge) - self.assertEqual(forge.get('stable_handle'), 'agent_operator') - self.assertIn('agent_operator.74ugCK', forge.get('mention_handles') or []) + self.assertEqual(forge.get('stable_handle'), 'Forge_McClaw') + self.assertIn('Forge_McClaw.74ugCK', forge.get('mention_handles') or []) self.assertEqual(forge.get('unacked_mentions'), 1) - self.assertEqual(forge.get('pending_inbox'), 1) + self.assertEqual(forge.get('pending_inbox'), 2) self.assertIn(forge.get('presence_state'), {'online', 'recent', 'idle', 'offline', 'no_checkin', 'remote_unknown'}) self.assertIn('last_check_in_at', forge) @@ -587,6 +594,13 @@ def test_system_health_reports_peer_and_queue_metrics(self) -> None: """, ('INB-health-1', 'agent-a', 'channel_message', 'msg-health-1', 'pending', '2026-02-23 11:05:00.000000'), ) + self.conn.execute( + """ + INSERT INTO agent_inbox (id, agent_user_id, source_type, source_id, status, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + ('INB-health-2', 'agent-a', 'channel_message', 'msg-health-2', 'seen', '2026-02-23 11:06:00.000000'), + ) self.conn.commit() response = self.client.get( @@ -600,7 +614,7 @@ def test_system_health_reports_peer_and_queue_metrics(self) -> None: self.assertIn('queues', payload) self.assertIn('peers', payload) self.assertEqual((payload.get('queues') or {}).get('unacked_mentions'), 1) - self.assertEqual((payload.get('queues') or {}).get('pending_inbox'), 1) + self.assertEqual((payload.get('queues') or {}).get('pending_inbox'), 2) self.assertEqual((payload.get('queues') or {}).get('pending_p2p_messages'), 7) self.assertEqual((payload.get('peers') or {}).get('connected_count'), 1) diff --git a/tests/test_dm_agent_endpoint_regressions.py b/tests/test_dm_agent_endpoint_regressions.py index a5dc6d0..5d357d1 100644 --- a/tests/test_dm_agent_endpoint_regressions.py +++ b/tests/test_dm_agent_endpoint_regressions.py @@ -473,6 +473,114 @@ def test_dm_inbox_exposes_reply_target_and_reply_endpoint_keeps_response_in_dm(s self.assertEqual(self.p2p_manager.direct_messages[0]['recipient_id'], 'author') self.channel_manager.send_message.assert_not_called() + def test_inbox_skip_can_persist_completion_ref_evidence(self) -> None: + send_resp = self.client.post( + '/api/v1/messages', + json={ + 'content': 'This item will be skipped with evidence', + 'recipient_id': 'agent-local', + }, + headers=self._headers('key-author'), + ) + self.assertEqual(send_resp.status_code, 201) + message_id = (send_resp.get_json() or {}).get('message', {}).get('id') + self.assertTrue(message_id) + + inbox_row = self.conn.execute( + """ + SELECT id + FROM agent_inbox + WHERE agent_user_id = ? AND source_id = ? + """, + ('agent-local', message_id), + ).fetchone() + self.assertIsNotNone(inbox_row) + + patch_resp = self.client.patch( + '/api/v1/agents/me/inbox', + json={ + 'ids': [inbox_row['id']], + 'status': 'skipped', + 'completion_ref': { + 'source_type': 'feed_post', + 'source_id': 'post-skip-1', + 'note': 'Duplicate request already addressed elsewhere.', + }, + }, + headers=self._headers('key-agent-local'), + ) + self.assertEqual(patch_resp.status_code, 200) + self.assertEqual((patch_resp.get_json() or {}).get('updated'), 1) + + refreshed_resp = self.client.get( + '/api/v1/agents/me/inbox?status=skipped&limit=5', + headers=self._headers('key-agent-local'), + ) + self.assertEqual(refreshed_resp.status_code, 200) + refreshed_items = (refreshed_resp.get_json() or {}).get('items') or [] + skipped_item = next((item for item in refreshed_items if item.get('id') == inbox_row['id']), None) + self.assertIsNotNone(skipped_item) + self.assertEqual(skipped_item.get('status'), 'skipped') + self.assertEqual( + skipped_item.get('completion_ref'), + { + 'source_type': 'feed_post', + 'source_id': 'post-skip-1', + 'note': 'Duplicate request already addressed elsewhere.', + }, + ) + + def test_default_inbox_endpoints_keep_seen_items_actionable(self) -> None: + send_resp = self.client.post( + '/api/v1/messages', + json={ + 'content': 'This item will be seen but remain actionable', + 'recipient_id': 'agent-local', + }, + headers=self._headers('key-author'), + ) + self.assertEqual(send_resp.status_code, 201) + message_id = (send_resp.get_json() or {}).get('message', {}).get('id') + self.assertTrue(message_id) + + inbox_row = self.conn.execute( + """ + SELECT id + FROM agent_inbox + WHERE agent_user_id = ? AND source_id = ? + """, + ('agent-local', message_id), + ).fetchone() + self.assertIsNotNone(inbox_row) + + patch_resp = self.client.patch( + '/api/v1/agents/me/inbox', + json={ + 'ids': [inbox_row['id']], + 'status': 'seen', + }, + headers=self._headers('key-agent-local'), + ) + self.assertEqual(patch_resp.status_code, 200) + self.assertEqual((patch_resp.get_json() or {}).get('updated'), 1) + + list_resp = self.client.get( + '/api/v1/agents/me/inbox?limit=10', + headers=self._headers('key-agent-local'), + ) + self.assertEqual(list_resp.status_code, 200) + items = (list_resp.get_json() or {}).get('items') or [] + seen_item = next((item for item in items if item.get('id') == inbox_row['id']), None) + self.assertIsNotNone(seen_item) + self.assertEqual(seen_item.get('status'), 'seen') + + count_resp = self.client.get( + '/api/v1/agents/me/inbox/count', + headers=self._headers('key-agent-local'), + ) + self.assertEqual(count_resp.status_code, 200) + self.assertGreaterEqual(int((count_resp.get_json() or {}).get('count') or 0), 1) + def test_agent_dm_followups_are_not_dropped_by_persisted_cooldown_config(self) -> None: self.inbox_manager.set_config( 'agent-local', @@ -583,5 +691,343 @@ def test_delete_message_clears_local_dm_inbox_and_uses_direct_message_signal(sel self.assertEqual(inbox_after['n'], 0) +class TestInboxStateMachineEdgeCases(unittest.TestCase): + """Regression tests for inbox state-machine edge cases and correctness fixes.""" + + def setUp(self) -> None: + self.tempdir = tempfile.TemporaryDirectory() + self.addCleanup(self.tempdir.cleanup) + + self.db_file = Path(self.tempdir.name) / 'inbox_state_machine.db' + self.conn = sqlite3.connect(str(self.db_file)) + self.conn.row_factory = sqlite3.Row + self.conn.executescript( + """ + CREATE TABLE users ( + id TEXT PRIMARY KEY, + username TEXT, + display_name TEXT, + public_key TEXT, + password_hash TEXT, + account_type TEXT, + status TEXT, + origin_peer TEXT, + bio TEXT, + created_at TEXT + ); + """ + ) + self.conn.executemany( + """ + INSERT INTO users ( + id, username, display_name, public_key, password_hash, + account_type, status, origin_peer, bio, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + ( + 'agent-test', + 'agent_test', + 'Agent Test', + 'pk-agent', + 'pw-agent', + 'agent', + 'active', + None, + 'test agent', + '2026-03-07T08:00:00+00:00', + ), + ], + ) + self.conn.commit() + + self.db_manager = _FakeDbManager(self.conn, self.db_file) + self.inbox = InboxManager(self.db_manager) + # Disable rate-limiting for tests + self.inbox.set_config( + 'agent-test', + { + 'cooldown_seconds': 0, + 'sender_cooldown_seconds': 0, + 'agent_sender_cooldown_seconds': 0, + 'channel_burst_limit': 1000, + 'channel_hourly_limit': 10000, + 'sender_hourly_limit': 10000, + }, + ) + + def tearDown(self) -> None: + self.conn.close() + + def _create_item(self, source_id: str = 'msg-1') -> str: + inbox_id = self.inbox.create_trigger( + agent_user_id='agent-test', + source_type='dm', + source_id=source_id, + sender_user_id='sender-1', + trigger_type='dm', + ) + self.assertIsNotNone(inbox_id) + return inbox_id + + def _row(self, inbox_id: str) -> sqlite3.Row: + return self.conn.execute( + "SELECT * FROM agent_inbox WHERE id = ?", + (inbox_id,), + ).fetchone() + + def test_seen_after_complete_clears_completion_metadata(self) -> None: + """Transitioning completed -> seen must clear completed_at and completion_ref_json. + + Before the fix, completed_at and completion_ref survived the seen + transition, producing misleading timestamps and phantom evidence links + on in-progress items. + """ + inbox_id = self._create_item('msg-seen-after-complete') + + # Complete with evidence + updated = self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='completed', + completion_ref={'source_id': 'post-1', 'note': 'done'}, + ) + self.assertEqual(updated, 1) + row = self._row(inbox_id) + self.assertEqual(row['status'], 'completed') + self.assertIsNotNone(row['completed_at']) + self.assertIsNotNone(row['completion_ref_json']) + + # Re-open for review (seen) + updated = self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='seen', + ) + self.assertEqual(updated, 1) + row = self._row(inbox_id) + self.assertEqual(row['status'], 'seen') + # Stale finalization data must be cleared + self.assertIsNone(row['completed_at'], "completed_at must be cleared when transitioning to 'seen'") + self.assertIsNone(row['completion_ref_json'], "completion_ref_json must be cleared when transitioning to 'seen'") + # seen_at should be set + self.assertIsNotNone(row['seen_at']) + + def test_seen_after_skipped_clears_completion_metadata(self) -> None: + """Transitioning skipped -> seen must also clear completion metadata.""" + inbox_id = self._create_item('msg-seen-after-skipped') + + self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='skipped', + completion_ref={'note': 'duplicate'}, + ) + row = self._row(inbox_id) + self.assertEqual(row['status'], 'skipped') + self.assertIsNotNone(row['completed_at']) + self.assertIsNotNone(row['completion_ref_json']) + + self.inbox.update_items(user_id='agent-test', ids=[inbox_id], status='seen') + row = self._row(inbox_id) + self.assertEqual(row['status'], 'seen') + self.assertIsNone(row['completed_at']) + self.assertIsNone(row['completion_ref_json']) + + def test_pending_reset_clears_completion_state_preserves_seen_at(self) -> None: + """pending reset clears handled_at, completed_at, completion_ref but preserves seen_at.""" + inbox_id = self._create_item('msg-pending-reset') + + # Mark seen first + self.inbox.update_items(user_id='agent-test', ids=[inbox_id], status='seen') + seen_row = self._row(inbox_id) + seen_at_value = seen_row['seen_at'] + self.assertIsNotNone(seen_at_value) + + # Complete with evidence + self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='completed', + completion_ref={'note': 'initial resolution'}, + ) + self.assertEqual(self._row(inbox_id)['status'], 'completed') + + # Reset to pending + updated = self.inbox.update_items(user_id='agent-test', ids=[inbox_id], status='pending') + self.assertEqual(updated, 1) + row = self._row(inbox_id) + self.assertEqual(row['status'], 'pending') + self.assertIsNone(row['handled_at'], "handled_at must be cleared on pending reset") + self.assertIsNone(row['completed_at'], "completed_at must be cleared on pending reset") + self.assertIsNone(row['completion_ref_json'], "completion_ref_json must be cleared on pending reset") + # seen_at is preserved (item was acknowledged before) + self.assertEqual(row['seen_at'], seen_at_value, "seen_at should survive a pending reset") + self.assertEqual(row['last_resolution_status'], 'completed') + self.assertIsNotNone(row['last_resolution_at']) + self.assertIsNotNone(row['last_completion_ref_json']) + + def test_reopen_preserves_last_resolution_evidence(self) -> None: + """Reopening an item must retain prior terminal-state evidence in the audit trail.""" + inbox_id = self._create_item('msg-reopen-audit') + + self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='skipped', + completion_ref={'reason': 'duplicate', 'message_id': 'msg-dup-1'}, + ) + self.inbox.update_items(user_id='agent-test', ids=[inbox_id], status='seen') + + row = self._row(inbox_id) + self.assertEqual(row['status'], 'seen') + self.assertEqual(row['last_resolution_status'], 'skipped') + self.assertIsNotNone(row['last_resolution_at']) + self.assertIsNone(row['completion_ref_json']) + self.assertIsNotNone(row['last_completion_ref_json']) + last_ref = json.loads(row['last_completion_ref_json']) + self.assertEqual(last_ref['reason'], 'duplicate') + self.assertEqual(last_ref['message_id'], 'msg-dup-1') + + def test_default_actionable_list_and_count_include_seen(self) -> None: + """Default inbox list/count should include seen items because they remain actionable.""" + pending_id = self._create_item('msg-actionable-pending') + seen_id = self._create_item('msg-actionable-seen') + + self.inbox.update_items(user_id='agent-test', ids=[seen_id], status='seen') + self.inbox.update_items(user_id='agent-test', ids=[pending_id], status='completed') + + count = self.inbox.count_items(user_id='agent-test') + self.assertEqual(count, 1) + + items = self.inbox.list_items(user_id='agent-test', include_handled=False) + self.assertEqual(len(items), 1) + self.assertEqual(items[0]['id'], seen_id) + self.assertEqual(items[0]['status'], 'seen') + + def test_repeated_completion_with_new_ref_updates_evidence(self) -> None: + """Re-completing an already-completed item with a new ref must overwrite evidence.""" + inbox_id = self._create_item('msg-repeated-complete') + + self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='completed', + completion_ref={'source_id': 'post-old', 'note': 'first attempt'}, + ) + row = self._row(inbox_id) + first_completed_at = row['completed_at'] + self.assertIsNotNone(first_completed_at) + self.assertIn('post-old', row['completion_ref_json']) + + # Re-complete with updated evidence + self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='completed', + completion_ref={'source_id': 'post-new', 'note': 'revised'}, + ) + row = self._row(inbox_id) + self.assertEqual(row['status'], 'completed') + # completed_at should be preserved (original completion time) + self.assertEqual(row['completed_at'], first_completed_at) + # completion_ref must be updated to the new evidence + ref = json.loads(row['completion_ref_json']) + self.assertEqual(ref['source_id'], 'post-new') + self.assertEqual(ref['note'], 'revised') + + def test_repeated_completion_without_new_ref_preserves_existing_evidence(self) -> None: + """Re-completing without a new ref must not erase existing evidence.""" + inbox_id = self._create_item('msg-preserve-ref') + + self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='completed', + completion_ref={'note': 'keeper'}, + ) + original_ref = self._row(inbox_id)['completion_ref_json'] + + # Re-complete with no ref + self.inbox.update_items(user_id='agent-test', ids=[inbox_id], status='completed') + row = self._row(inbox_id) + self.assertEqual(row['completion_ref_json'], original_ref, "existing evidence must survive re-completion without a new ref") + + def test_batch_update_partial_ids_returns_actual_updated_count(self) -> None: + """Batch update with some nonexistent IDs returns only the count of rows actually changed.""" + id1 = self._create_item('msg-batch-1') + id2 = self._create_item('msg-batch-2') + + updated = self.inbox.update_items( + user_id='agent-test', + ids=[id1, id2, 'nonexistent-inbox-id'], + status='seen', + ) + self.assertEqual(updated, 2, "should update exactly the 2 existing rows, not the phantom ID") + self.assertEqual(self._row(id1)['status'], 'seen') + self.assertEqual(self._row(id2)['status'], 'seen') + + def test_invalid_status_is_rejected_no_update_applied(self) -> None: + """An unrecognised status string must return 0 and leave the item unchanged. + + Before the fix, _normalize_storage_status fell back to 'pending', + which would silently reset items instead of rejecting the request. + """ + inbox_id = self._create_item('msg-invalid-status') + + # Complete first so we can detect an accidental reset + self.inbox.update_items(user_id='agent-test', ids=[inbox_id], status='completed') + self.assertEqual(self._row(inbox_id)['status'], 'completed') + + updated = self.inbox.update_items( + user_id='agent-test', + ids=[inbox_id], + status='bogus_status', + ) + self.assertEqual(updated, 0, "unrecognised status must be rejected") + # Item must remain completed, not silently reset to pending + self.assertEqual(self._row(inbox_id)['status'], 'completed') + + def test_migration_backfill_converts_handled_to_completed(self) -> None: + """_ensure_tables migration must convert legacy 'handled' rows to 'completed'.""" + import secrets as _secrets + # Insert two legacy 'handled' rows directly so they pre-date the migration + handled_id = f"INB{_secrets.token_hex(8)}" + completed_id = f"INB{_secrets.token_hex(8)}" + handled_at = '2026-01-01T12:00:00+00:00' + self.conn.executemany( + """ + INSERT INTO agent_inbox + (id, agent_user_id, source_type, source_id, trigger_type, + status, priority, created_at, handled_at, depth) + VALUES (?, 'agent-test', 'dm', ?, 'dm', ?, 'normal', ?, ?, 0) + """, + [ + (handled_id, f'src-handled-{handled_id}', 'handled', '2026-01-01T11:00:00+00:00', handled_at), + (completed_id, f'src-completed-{completed_id}', 'completed', '2026-01-01T11:00:00+00:00', None), + ], + ) + self.conn.commit() + + # Re-run the migration by calling _ensure_tables explicitly + self.inbox._ensure_tables() + + handled_row = self.conn.execute( + "SELECT status, completed_at, seen_at FROM agent_inbox WHERE id = ?", + (handled_id,), + ).fetchone() + self.assertIsNotNone(handled_row) + self.assertEqual(handled_row['status'], 'completed', "legacy 'handled' status must be migrated to 'completed'") + self.assertIsNotNone(handled_row['completed_at'], "completed_at must be backfilled from handled_at") + self.assertIsNotNone(handled_row['seen_at'], "seen_at must be backfilled from handled_at") + + # Pre-existing completed row must be left untouched + completed_row = self.conn.execute( + "SELECT status FROM agent_inbox WHERE id = ?", + (completed_id,), + ).fetchone() + self.assertEqual(completed_row['status'], 'completed') + + if __name__ == '__main__': unittest.main() diff --git a/tests/test_workspace_events.py b/tests/test_workspace_events.py index 6657ca4..b05c727 100644 --- a/tests/test_workspace_events.py +++ b/tests/test_workspace_events.py @@ -345,6 +345,11 @@ def test_deleted_dm_event_visibility_falls_back_to_payload(self) -> None: self.assertEqual(no_dm_permission['items'], []) def test_heartbeat_adds_workspace_event_seq_without_repurposing_last_event_seq(self) -> None: + self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': ['mention.created', 'channel.message.created']}, + ) self.conn.execute( """ INSERT INTO mention_events (id, user_id, created_at, acknowledged_at) @@ -384,6 +389,52 @@ def test_heartbeat_adds_workspace_event_seq_without_repurposing_last_event_seq(s max(snapshot['last_mention_seq'], snapshot['last_inbox_seq']), ) self.assertNotEqual(snapshot['last_event_seq'], snapshot['workspace_event_seq']) + self.assertEqual(snapshot['event_subscription_source'], 'stored') + self.assertEqual( + snapshot['event_subscription_types'], + [EVENT_CHANNEL_MESSAGE_CREATED, EVENT_MENTION_CREATED], + ) + self.assertEqual(snapshot['event_subscription_count'], 2) + self.assertEqual(snapshot['event_subscription_unavailable_types'], []) + + def test_heartbeat_reports_unavailable_message_types_for_feed_only_key(self) -> None: + self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': ['mention.created', 'channel.message.created']}, + ) + + response = self.client.get( + '/api/v1/agents/me/heartbeat', + headers={'X-API-Key': 'agent-feed-only'}, + ) + self.assertEqual(response.status_code, 200) + body = response.get_json() or {} + self.assertEqual(body['event_subscription_source'], 'stored') + self.assertEqual(body['event_subscription_types'], [EVENT_MENTION_CREATED]) + self.assertEqual(body['event_subscription_count'], 1) + self.assertEqual(body['event_subscription_unavailable_types'], [EVENT_CHANNEL_MESSAGE_CREATED]) + + def test_heartbeat_keeps_non_message_custom_subscription_types(self) -> None: + self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': ['mention.created', 'channel.state.updated']}, + ) + + response = self.client.get( + '/api/v1/agents/me/heartbeat', + headers={'X-API-Key': 'agent-key'}, + ) + self.assertEqual(response.status_code, 200) + body = response.get_json() or {} + self.assertEqual(body['event_subscription_source'], 'stored') + self.assertEqual( + body['event_subscription_types'], + [EVENT_CHANNEL_STATE_UPDATED, EVENT_MENTION_CREATED], + ) + self.assertEqual(body['event_subscription_count'], 2) + self.assertEqual(body['event_subscription_unavailable_types'], []) def test_events_endpoint_and_owner_only_diagnostics(self) -> None: self.workspace_events.emit_event( @@ -594,6 +645,28 @@ def test_agent_events_accepts_explicit_type_override(self) -> None: [EVENT_CHANNEL_MESSAGE_CREATED], ) + def test_agent_events_feed_only_filters_explicit_channel_message_override(self) -> None: + self.workspace_events.emit_event( + event_type=EVENT_CHANNEL_MESSAGE_CREATED, + actor_user_id='agent-b', + target_user_id='agent-a', + channel_id='general', + visibility_scope='user', + dedupe_key='agent-events:feed-only:channel-created', + payload={'message_id': 'CH-feed-only-created', 'preview': 'should-hide'}, + ) + + response = self.client.get( + '/api/v1/agents/me/events?types=channel.message.created', + headers={'X-API-Key': 'agent-feed-only'}, + ) + self.assertEqual(response.status_code, 200) + body = response.get_json() + self.assertEqual(body['items'], []) + self.assertEqual(body['applied_types'], []) + self.assertEqual(body['selected_types'], [EVENT_CHANNEL_MESSAGE_CREATED]) + self.assertEqual(body['unavailable_types'], [EVENT_CHANNEL_MESSAGE_CREATED]) + def test_agent_events_respects_feed_only_permissions(self) -> None: self.workspace_events.emit_event( event_type=EVENT_MENTION_CREATED, @@ -629,6 +702,118 @@ def test_agent_events_respects_feed_only_permissions(self) -> None: [EVENT_MENTION_CREATED], ) + def test_agent_event_subscriptions_round_trip_and_feed_uses_stored_types(self) -> None: + subscribe = self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': ['mention.created', 'channel.message.created']}, + ) + self.assertEqual(subscribe.status_code, 200) + sub_body = subscribe.get_json() + self.assertEqual(sub_body['subscription_source'], 'stored') + self.assertEqual( + sub_body['stored_types'], + [EVENT_CHANNEL_MESSAGE_CREATED, EVENT_MENTION_CREATED], + ) + self.assertEqual( + sub_body['effective_types'], + [EVENT_CHANNEL_MESSAGE_CREATED, EVENT_MENTION_CREATED], + ) + + self.workspace_events.emit_event( + event_type=EVENT_MENTION_CREATED, + actor_user_id='owner-user', + target_user_id='agent-a', + visibility_scope='user', + dedupe_key='agent-events:stored:mention', + payload={'mention_id': 'MN-stored', 'source_type': 'channel_message', 'source_id': 'msg-stored'}, + ) + self.workspace_events.emit_event( + event_type=EVENT_CHANNEL_MESSAGE_CREATED, + actor_user_id='agent-b', + target_user_id='agent-a', + channel_id='general', + visibility_scope='user', + dedupe_key='agent-events:stored:channel-created', + payload={'message_id': 'CH-stored-created', 'preview': 'stored channel'}, + ) + self.workspace_events.emit_event( + event_type=EVENT_DM_MESSAGE_DELETED, + actor_user_id='agent-b', + message_id='DM-stored-hidden', + visibility_scope='dm', + dedupe_key='agent-events:stored:dm-delete', + payload={ + 'preview': 'removed', + 'sender_id': 'agent-b', + 'recipient_id': 'agent-a', + 'group_id': None, + 'group_members': [], + }, + ) + + response = self.client.get( + '/api/v1/agents/me/events', + headers={'X-API-Key': 'agent-key'}, + ) + self.assertEqual(response.status_code, 200) + body = response.get_json() + self.assertEqual(body['subscription_source'], 'stored') + self.assertEqual( + [item['event_type'] for item in body['items']], + [EVENT_MENTION_CREATED, EVENT_CHANNEL_MESSAGE_CREATED], + ) + + def test_agent_event_subscriptions_reset_restores_defaults(self) -> None: + subscribe = self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': ['mention.created']}, + ) + self.assertEqual(subscribe.status_code, 200) + + reset = self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'reset': True}, + ) + self.assertEqual(reset.status_code, 200) + body = reset.get_json() + self.assertEqual(body['subscription_source'], 'default') + self.assertIsNone(body['stored_types']) + self.assertEqual(body['effective_types'], sorted(body['default_types'])) + + def test_agent_event_subscriptions_allow_empty_custom_feed(self) -> None: + subscribe = self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': []}, + ) + self.assertEqual(subscribe.status_code, 200) + body = subscribe.get_json() + self.assertEqual(body['subscription_source'], 'stored') + self.assertEqual(body['stored_types'], []) + self.assertEqual(body['selected_types'], []) + self.assertEqual(body['effective_types'], []) + + self.workspace_events.emit_event( + event_type=EVENT_MENTION_CREATED, + actor_user_id='owner-user', + target_user_id='agent-a', + visibility_scope='user', + dedupe_key='agent-events:empty-custom:mention', + payload={'mention_id': 'MN-empty-custom', 'source_type': 'channel_message', 'source_id': 'msg-empty-custom'}, + ) + response = self.client.get( + '/api/v1/agents/me/events', + headers={'X-API-Key': 'agent-key'}, + ) + self.assertEqual(response.status_code, 200) + event_body = response.get_json() + self.assertEqual(event_body['subscription_source'], 'stored') + self.assertEqual(event_body['applied_types'], []) + self.assertEqual(event_body['items'], []) + def test_human_key_agent_events_does_not_create_agent_presence_or_runtime(self) -> None: self.workspace_events.emit_event( event_type=EVENT_MENTION_CREATED, @@ -658,12 +843,85 @@ def test_human_key_agent_events_does_not_create_agent_presence_or_runtime(self) runtime_table = self.conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='agent_runtime_state'" ).fetchone() + runtime_row = None if runtime_table is not None: runtime_row = self.conn.execute( "SELECT user_id FROM agent_runtime_state WHERE user_id = ?", ('observer',), ).fetchone() - self.assertIsNone(runtime_row) + self.assertIsNone(runtime_row) + + def test_feed_only_event_subscriptions_report_unavailable_types(self) -> None: + subscribe = self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': ['mention.created', 'channel.message.created']}, + ) + self.assertEqual(subscribe.status_code, 200) + + response = self.client.get( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-feed-only'}, + ) + self.assertEqual(response.status_code, 200) + body = response.get_json() + self.assertEqual(body['subscription_source'], 'stored') + self.assertEqual(body['selected_types'], [EVENT_CHANNEL_MESSAGE_CREATED, EVENT_MENTION_CREATED]) + self.assertEqual(body['effective_types'], [EVENT_MENTION_CREATED]) + self.assertEqual(body['unavailable_types'], [EVENT_CHANNEL_MESSAGE_CREATED]) + + def test_agent_events_fallback_without_workspace_manager_reports_subscription_and_touches_runtime(self) -> None: + self.client.application.config['WORKSPACE_EVENT_MANAGER'] = None + self.client.post( + '/api/v1/agents/me/event-subscriptions', + headers={'X-API-Key': 'agent-key'}, + json={'types': ['mention.created', 'channel.message.created']}, + ) + + response = self.client.get( + '/api/v1/agents/me/events?after_seq=7', + headers={'X-API-Key': 'agent-feed-only'}, + ) + self.assertEqual(response.status_code, 200) + body = response.get_json() + self.assertEqual(body['after_seq'], 7) + self.assertEqual(body['next_after_seq'], 7) + self.assertEqual(body['subscription_source'], 'stored') + self.assertEqual(body['selected_types'], [EVENT_CHANNEL_MESSAGE_CREATED, EVENT_MENTION_CREATED]) + self.assertEqual(body['applied_types'], [EVENT_MENTION_CREATED]) + self.assertEqual(body['unavailable_types'], [EVENT_CHANNEL_MESSAGE_CREATED]) + + runtime_row = self.conn.execute( + """ + SELECT last_event_cursor_seen, last_event_fetch_at + FROM agent_runtime_state + WHERE user_id = ? + """, + ('agent-a',), + ).fetchone() + self.assertIsNotNone(runtime_row) + self.assertEqual(runtime_row['last_event_cursor_seen'], 7) + self.assertIsNotNone(runtime_row['last_event_fetch_at']) + + def test_general_events_feed_only_hides_channel_message_content(self) -> None: + self.workspace_events.emit_event( + event_type=EVENT_CHANNEL_MESSAGE_CREATED, + actor_user_id='agent-b', + target_user_id='agent-a', + channel_id='general', + visibility_scope='user', + dedupe_key='events:feed-only:channel-created', + payload={'message_id': 'CH-general-feed-only', 'preview': 'channel preview'}, + ) + + response = self.client.get( + '/api/v1/events?types=channel.message.created', + headers={'X-API-Key': 'agent-feed-only'}, + ) + self.assertEqual(response.status_code, 200) + body = response.get_json() + self.assertEqual(body['items'], []) + self.assertEqual(body['applied_types'], [EVENT_CHANNEL_MESSAGE_CREATED]) def test_inbound_dm_finalize_uses_canonical_message_id_for_created_event(self) -> None: msg = self.message_manager.create_message(