diff --git a/src/qiki/services/operator_console/clients/nats_client.py b/src/qiki/services/operator_console/clients/nats_client.py index 3ce92794..33f64b43 100644 --- a/src/qiki/services/operator_console/clients/nats_client.py +++ b/src/qiki/services/operator_console/clients/nats_client.py @@ -24,6 +24,7 @@ RADAR_TRACKS_SR, SYSTEM_TELEMETRY, RESPONSES_CONTROL, + QIKI_PROPOSALS_V1, ) @@ -297,6 +298,37 @@ async def message_handler(msg): except Exception as e: print(f"❌ Failed to subscribe to CONTROL_RESPONSES: {e}") raise + + async def subscribe_qiki_proposals( + self, + callback: Callable[[Dict[str, Any]], Awaitable[None]], + ) -> None: + """Subscribe to QIKI proposals batches (core NATS).""" + if not self.nc: + raise RuntimeError("Not connected to NATS") + + async def message_handler(msg): + try: + data = json.loads(msg.data.decode()) + await callback( + { + "stream": "QIKI_PROPOSALS", + "timestamp": datetime.now().isoformat(), + "subject": getattr(msg, "subject", None), + "data": data, + } + ) + except Exception as e: + print(f"Error processing QIKI_PROPOSALS message: {e}") + + subject = os.getenv("QIKI_PROPOSALS_SUBJECT", QIKI_PROPOSALS_V1) + try: + sub = await self.nc.subscribe(subject, cb=message_handler) + self.subscriptions["QIKI_PROPOSALS"] = sub + print(f"✅ Subscribed to QIKI_PROPOSALS: {subject}") + except Exception as e: + print(f"❌ Failed to subscribe to QIKI_PROPOSALS: {e}") + raise async def get_jetstream_info(self) -> Dict[str, Any]: """Get JetStream account info.""" diff --git a/src/qiki/services/operator_console/main_orion.py b/src/qiki/services/operator_console/main_orion.py index 7df73ebe..de41680a 100644 --- a/src/qiki/services/operator_console/main_orion.py +++ b/src/qiki/services/operator_console/main_orion.py @@ -2,10 +2,12 @@ from dataclasses import dataclass from dataclasses import asdict +import asyncio import json import math import os import time +from hashlib import sha256 from typing import Any, Optional from pydantic import ValidationError @@ -23,8 +25,16 @@ from qiki.services.operator_console.core.incidents import IncidentStore from qiki.services.operator_console.ui import i18n as I18N from qiki.shared.models.core import CommandMessage, MessageMetadata +from qiki.shared.models.orion_qiki_protocol import ( + EnvironmentMode, + IntentV1, + LangHint, + SelectionV1, + ProposalV1, + ProposalsBatchV1, +) from qiki.shared.models.telemetry import TelemetrySnapshotModel -from qiki.shared.nats_subjects import COMMANDS_CONTROL, QIKI_INTENTS +from qiki.shared.nats_subjects import COMMANDS_CONTROL, QIKI_INTENT_V1 try: from qiki.services.operator_console.ui.charts import PpiScopeRenderer @@ -289,6 +299,13 @@ def freshness_label(self, type_name: str, *, now_epoch: Optional[float] = None) hotkey_label="F8", aliases=("mission", "миссия", "tasks", "задачи", "task", "задача"), ), + OrionAppSpec( + screen="proposals", + title=I18N.bidi("Proposals", "Предложения"), + hotkey="ctrl+o", + hotkey_label="Ctrl+O", + aliases=("proposals", "предложения", "proposal", "предложение"), + ), OrionAppSpec( screen="rules", title=I18N.bidi("Rules", "Правила"), @@ -307,6 +324,7 @@ def freshness_label(self, type_name: str, *, now_epoch: Optional[float] = None) "power": I18N.bidi("Power", "Пит"), "diagnostics": I18N.bidi("Diag", "Диагн"), "mission": I18N.bidi("Mission", "Миссия"), + "proposals": I18N.bidi("Props", "Предл"), "rules": I18N.bidi("Rules", "Прав"), } @@ -508,7 +526,7 @@ def render(self) -> str: extra.append( f"{I18N.bidi('QIKI', 'QIKI')} {I18N.bidi('intent', 'намерение')}: q: | //" ) - if active_screen in {"radar", "events", "console", "summary", "rules"}: + if active_screen in {"radar", "events", "console", "summary", "rules", "proposals"}: extra.append( f"{I18N.bidi('Up/Down arrows', 'Стрелки вверх/вниз')} {I18N.bidi('Selection', 'Выбор')}" ) @@ -516,6 +534,8 @@ def render(self) -> str: extra.append(f"A {I18N.bidi('Acknowledge incident', 'Подтвердить инцидент')}") extra.append(f"X {I18N.bidi('Clear acknowledged', 'Очистить подтвержденные')}") extra.append(f"R {I18N.bidi('Mark read', 'Отметить прочитанным')}") + if active_screen == "proposals": + extra.append(f"{I18N.bidi('Inspector', 'Инспектор')}: {I18N.bidi('select proposal', 'выбрать предложение')}") if active_screen == "rules": extra.append(f"T {I18N.bidi('Toggle enabled', 'Переключить включено')}") extra.extend( @@ -606,6 +626,7 @@ class OrionApp(App): #power-table { height: 1fr; } #diagnostics-table { height: 1fr; } #mission-table { height: 1fr; } + #proposals-table { height: 1fr; } #rules-toolbar { height: 3; padding: 0 1; border: round #303030; background: #050505; } #rules-hint { padding: 0 1; color: #a0a0a0; background: #050505; } #rules-toggle-hint { padding: 0 1; color: #a0a0a0; background: #050505; } @@ -668,6 +689,9 @@ def __init__(self) -> None: self._power_by_key: dict[str, dict[str, Any]] = {} self._diagnostics_by_key: dict[str, dict[str, Any]] = {} self._mission_by_key: dict[str, dict[str, Any]] = {} + self._proposals_by_key: dict[str, dict[str, Any]] = {} + self._proposals_batches: list[dict[str, Any]] = [] + self._max_proposals_rows: int = int(os.getenv("OPERATOR_CONSOLE_MAX_PROPOSALS_ROWS", "200")) self._selection_by_app: dict[str, SelectionContext] = {} self._snapshots = SnapshotStore() self._events_filter_type: Optional[str] = None @@ -1746,6 +1770,15 @@ def compose(self) -> ComposeResult: mission_table.add_column(I18N.bidi("Value", "Значение"), width=60) yield mission_table + with Container(id="screen-proposals"): + proposals_table: DataTable = DataTable(id="proposals-table") + proposals_table.add_columns( + I18N.bidi("Priority", "Приоритет"), + I18N.bidi("Confidence", "Уверенность"), + I18N.bidi("Title", "Заголовок"), + ) + yield proposals_table + with Container(id="screen-rules"): with Horizontal(id="rules-toolbar"): yield Button(I18N.bidi("Reload rules", "Перезагрузить правила"), id="rules-reload") @@ -1781,6 +1814,7 @@ async def on_mount(self) -> None: self._seed_power_table() self._seed_diagnostics_table() self._seed_mission_table() + self._seed_proposals_table() self._seed_rules_table() self._update_system_snapshot() self._update_command_placeholder() @@ -2281,6 +2315,51 @@ def _seed_mission_table(self) -> None: table.clear() table.add_row("—", I18N.NA, I18N.NA, key="seed") + def _seed_proposals_table(self) -> None: + try: + table = self.query_one("#proposals-table", DataTable) + except Exception: + return + self._proposals_by_key = {} + self._selection_by_app.pop("proposals", None) + table.clear() + table.add_row(I18N.NA, I18N.NA, I18N.NA, key="seed") + + def _render_proposals_table(self) -> None: + try: + table = self.query_one("#proposals-table", DataTable) + except Exception: + return + table.clear() + rows = list(self._proposals_by_key.items()) + if not rows: + table.add_row(I18N.NA, I18N.NA, I18N.NA, key="seed") + return + + def sort_key(item: tuple[str, dict[str, Any]]) -> tuple[int, float]: + payload = item[1] + try: + pr = int(payload.get("priority") or 0) + except Exception: + pr = 0 + try: + conf = float(payload.get("confidence") or 0.0) + except Exception: + conf = 0.0 + return (-pr, -conf) + + rows.sort(key=sort_key) + for proposal_id, payload in rows: + pr = payload.get("priority") + conf = payload.get("confidence") + title = payload.get("title") + table.add_row( + str(pr if pr is not None else I18N.NA), + (f"{float(conf):.2f}" if isinstance(conf, (int, float)) else I18N.NA), + str(title or I18N.NA), + key=proposal_id, + ) + def _seed_rules_table(self) -> None: try: table = self.query_one("#rules-table", DataTable) @@ -2376,6 +2455,14 @@ async def _init_nats(self) -> None: f"⚠️ {I18N.bidi('Control responses subscribe failed', 'Подписка ответов управления не удалась')}: {e}" ) + try: + await self.nats_client.subscribe_qiki_proposals(self.handle_proposals_data) + self._log_msg(f"💡 {I18N.bidi('Subscribed', 'Подписка')}: {I18N.bidi('QIKI proposals', 'предложения QIKI')}") + except Exception as e: + self._log_msg( + f"⚠️ {I18N.bidi('QIKI proposals subscribe failed', 'Подписка предложений QIKI не удалась')}: {e}" + ) + def _refresh_header(self) -> None: telemetry_env = self._snapshots.get_last("telemetry") if telemetry_env is None or not isinstance(telemetry_env.payload, dict): @@ -2582,6 +2669,21 @@ def app_title(screen: str) -> str: (I18N.bidi("Message", "Сообщение"), I18N.fmt_na(payload.get("message"))), ] ) + if ctx.app_id == "proposals" and isinstance(ctx.payload, dict): + payload = ctx.payload + summary_rows.extend( + [ + (I18N.bidi("Proposal ID", "ID предложения"), I18N.fmt_na(payload.get("proposal_id"))), + (I18N.bidi("Priority", "Приоритет"), I18N.fmt_na(payload.get("priority"))), + (I18N.bidi("Confidence", "Уверенность"), I18N.fmt_na(payload.get("confidence"))), + ] + ) + fields_rows.extend( + [ + (I18N.bidi("Title", "Заголовок"), I18N.fmt_na(payload.get("title"))), + (I18N.bidi("Justification", "Обоснование"), I18N.fmt_na(payload.get("justification"))), + ] + ) if ctx.app_id == "mission": fields_rows.append( ( @@ -2623,6 +2725,9 @@ def app_title(screen: str) -> str: f"T — {I18N.bidi('Toggle enabled (with confirmation)', 'Переключить включено (с подтверждением)')}" ) + if self.active_screen == "proposals": + actions.append(f"{I18N.bidi('Proposals', 'Предложения')}: {I18N.bidi('separate from incidents', 'отдельно от инцидентов')}") + nats = I18N.yes_no(self.nats_connected) if isinstance(self.nats_connected, bool) else I18N.NA summary_rows.append((I18N.bidi("NATS connectivity", "Связь с NATS"), nats)) telemetry_age_s = self._snapshots.age_s("telemetry") @@ -2961,6 +3066,42 @@ def after(decision: bool) -> None: self.push_screen(ConfirmDialog(prompt), after) + def _ingest_proposals_batch(self, batch: ProposalsBatchV1) -> None: + self._proposals_batches.append(batch.model_dump()) + if len(self._proposals_batches) > 30: + self._proposals_batches = self._proposals_batches[-30:] + + for proposal in batch.proposals: + pid = str(getattr(proposal, "proposal_id", "") or "").strip() + if not pid: + continue + self._proposals_by_key[pid] = proposal.model_dump() + + if self._max_proposals_rows > 0 and len(self._proposals_by_key) > self._max_proposals_rows: + for k in list(self._proposals_by_key.keys())[: len(self._proposals_by_key) - self._max_proposals_rows]: + self._proposals_by_key.pop(k, None) + + self._render_proposals_table() + + async def handle_proposals_data(self, data: dict) -> None: + payload = data.get("data", {}) if isinstance(data, dict) else {} + try: + batch = ProposalsBatchV1.model_validate(payload) + except Exception as exc: + self._calm_log( + f"⚠️ {I18N.bidi('Bad proposals payload', 'Плохие предложения')}: {exc}", + level="warn", + ) + return + + self._ingest_proposals_batch(batch) + for proposal in batch.proposals: + title = str(getattr(proposal, "title", "") or I18N.NA) + self._calm_log(f"💡 QIKI: {title}", level="info") + + if self.active_screen == "proposals": + self._refresh_inspector() + async def handle_control_response(self, data: dict) -> None: payload = data.get("data", {}) if isinstance(data, dict) else {} if not isinstance(payload, dict): @@ -2993,7 +3134,18 @@ def action_show_screen(self, screen: str) -> None: self.query_one("#orion-sidebar", OrionSidebar).set_active(screen) except Exception: pass - for sid in ("system", "radar", "events", "console", "summary", "power", "diagnostics", "mission", "rules"): + for sid in ( + "system", + "radar", + "events", + "console", + "summary", + "power", + "diagnostics", + "mission", + "proposals", + "rules", + ): try: self.query_one(f"#screen-{sid}", Container).display = sid == screen except Exception: @@ -3046,6 +3198,8 @@ def safe_query(selector: str) -> Optional[Static]: workspace = safe_query("#diagnostics-table") elif self.active_screen == "mission": workspace = safe_query("#mission-table") + elif self.active_screen == "proposals": + workspace = safe_query("#proposals-table") else: workspace = safe_query("#panel-nav") @@ -3074,6 +3228,29 @@ def action_help(self) -> None: self._show_help() def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: + if event.data_table.id == "proposals-table": + try: + row_key = str(event.row_key) + except Exception: + return + if row_key == "seed": + return + payload = self._proposals_by_key.get(row_key) + if not isinstance(payload, dict): + return + self._set_selection( + SelectionContext( + app_id="proposals", + key=row_key, + kind="proposal", + source="qiki", + created_at_epoch=time.time(), + payload=payload, + ids=(row_key,), + ) + ) + return + if event.data_table.id == "rules-table": try: row_key = str(event.row_key) @@ -3453,7 +3630,9 @@ async def _run_command(self, raw: str) -> None: level="info", ) return - await self._publish_qiki_intent(qiki_text) + # Fire-and-forget: ORION must never block the operator loop on agent I/O. + task = asyncio.create_task(self._publish_qiki_intent(qiki_text)) + task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None) return self._console_log(f"{I18N.bidi('command', 'команда')}> {cmd}", level="info") @@ -3679,23 +3858,115 @@ async def _run_command(self, raw: str) -> None: async def _publish_qiki_intent(self, text: str) -> None: if not text: return - self._console_log(f"{I18N.bidi('QIKI intent', 'Намерение QIKI')}> {text}", level="info") if not self.nats_client: - self._console_log(f"❌ {I18N.bidi('NATS not initialized', 'NATS не инициализирован')}", level="error") + self._console_log( + f"❌ {I18N.bidi('NATS not initialized', 'NATS не инициализирован')}: " + f"{I18N.bidi('intent not sent', 'намерение не отправлено')}", + level="error", + ) return + + def nested_get(d: Any, path: str) -> Any: + node = d + for part in (path or "").split("."): + if not part: + continue + if not isinstance(node, dict): + return None + node = node.get(part) + return node + + # Minimal snapshot: vitals + screen + selection + top incidents. + vitals: dict[str, Any] = {} + try: + normalized = TelemetrySnapshotModel.normalize_payload(self.latest_telemetry or {}) + except Exception: + normalized = {} + if isinstance(normalized, dict) and normalized: + vitals = { + "battery_pct": normalized.get("battery"), + "soc_pct": nested_get(normalized, "power.soc_pct"), + "hull_integrity": nested_get(normalized, "hull.integrity"), + "radiation_usvh": normalized.get("radiation_usvh"), + "temp_external_c": normalized.get("temp_external_c"), + "temp_core_c": normalized.get("temp_core_c"), + "online": nested_get(normalized, "link.online"), + } + + ctx = self._selection_by_app.get(self.active_screen) + sel_kind = "none" + sel_id: Optional[str] = None + if ctx is not None: + k = (ctx.kind or "").strip().lower() + if k in {"event", "incident", "track", "snapshot"}: + sel_kind = k + sel_id = (ctx.key or "").strip() or None + + incidents_top: list[dict[str, Any]] = [] + if self._incident_store is not None: + self._incident_store.refresh() + incidents = list(self._incident_store.list_incidents()) + severity_rank = {"A": 0, "C": 1, "W": 2, "I": 3} + incidents.sort( + key=lambda inc: ( + bool(getattr(inc, "acked", False)), + severity_rank.get(str(getattr(inc, "severity", "W")), 9), + -float(getattr(inc, "last_seen", 0.0) or 0.0), + ) + ) + limit = int(os.getenv("OPERATOR_CONSOLE_INTENT_TOP_INCIDENTS", "10") or 10) + now = time.time() + for inc in incidents[: max(0, limit)]: + incidents_top.append( + { + "incident_id": getattr(inc, "incident_id", None), + "rule_id": getattr(inc, "rule_id", None), + "title": getattr(inc, "title", None), + "severity": getattr(inc, "severity", None), + "state": getattr(inc, "state", None), + "acked": bool(getattr(inc, "acked", False)), + "count": int(getattr(inc, "count", 0) or 0), + "age_s": max(0.0, now - float(getattr(inc, "last_seen", now) or now)), + "source": getattr(inc, "source", None), + "subject": getattr(inc, "subject", None), + } + ) + + env_raw = (os.getenv("OPERATOR_CONSOLE_ENVIRONMENT_MODE") or "FACTORY").strip().upper() + env_mode = EnvironmentMode.FACTORY if env_raw != "MISSION" else EnvironmentMode.MISSION + + app_spec = next((a for a in ORION_APPS if a.screen == self.active_screen), ORION_APPS[0]) + intent = IntentV1( + text=text, + lang_hint=LangHint.AUTO, + screen=menu_label(app_spec), + selection=SelectionV1(kind=sel_kind, id=sel_id), + ts=int(time.time() * 1000), + environment_mode=env_mode, + snapshot_min={ + "vitals": vitals, + "active_screen": self.active_screen, + "selection": {"kind": sel_kind, "id": sel_id}, + "incidents_top": incidents_top, + }, + ) + + intent_payload = intent.model_dump() + canonical = json.dumps(intent_payload, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8") + digest = sha256(canonical).hexdigest()[:8] try: await self.nats_client.publish_command( - QIKI_INTENTS, - { - "text": text, - "source": "operator-console", - "ts_epoch": time.time(), - }, + QIKI_INTENT_V1, + intent_payload, + ) + self._console_log( + f"📤 {I18N.bidi('Intent sent', 'Намерение отправлено')}: " + f"{I18N.bidi('hash', 'хэш')}={digest} {I18N.bidi('ts', 'время')}={intent.ts}", + level="info", ) - self._console_log(f"📤 {I18N.bidi('Sent to QIKI', 'Отправлено в QIKI')}: {QIKI_INTENTS}", level="info") except Exception as e: self._console_log( - f"❌ {I18N.bidi('Failed to send', 'Не удалось отправить')}: {e}", + f"❌ {I18N.bidi('Failed to send intent', 'Не удалось отправить намерение')}: {e}", level="error", ) @@ -3710,7 +3981,7 @@ def _update_command_placeholder(self) -> None: f"{I18N.bidi('help', 'помощь')} | " f"{I18N.bidi('screen', 'экран')} /<имя> | " f"simulation.start/симуляция.старт | " - f"{I18N.bidi('QIKI', 'QIKI')} q: " + f"{I18N.bidi('QIKI', 'QIKI')} q: | // " ) @staticmethod diff --git a/src/qiki/services/operator_console/tests/test_qiki_proposals_display.py b/src/qiki/services/operator_console/tests/test_qiki_proposals_display.py new file mode 100644 index 00000000..58ce49bd --- /dev/null +++ b/src/qiki/services/operator_console/tests/test_qiki_proposals_display.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import pytest + +from qiki.services.operator_console.main_orion import OrionApp +from qiki.shared.models.orion_qiki_protocol import ProposalsBatchV1, ProposalV1 + + +def test_ingest_proposals_batch_updates_store() -> None: + app = OrionApp() + assert app._proposals_by_key == {} + + batch = ProposalsBatchV1( + ts=1700000000000, + proposals=[ + ProposalV1( + proposal_id="P1", + title="Title 1", + justification="Just 1", + priority=80, + confidence=0.7, + ), + ProposalV1( + proposal_id="P2", + title="Title 2", + justification="Just 2", + priority=20, + confidence=0.5, + ), + ], + metadata={"source": "test"}, + ) + + app._ingest_proposals_batch(batch) + assert "P1" in app._proposals_by_key + assert app._proposals_by_key["P1"]["title"] == "Title 1" + + +@pytest.mark.asyncio +async def test_handle_proposals_data_invalid_does_not_crash() -> None: + app = OrionApp() + await app.handle_proposals_data({"data": {"version": 1}}) diff --git a/src/qiki/services/operator_console/tests/test_qiki_routing.py b/src/qiki/services/operator_console/tests/test_qiki_routing.py index ed83f04d..71fef6e2 100644 --- a/src/qiki/services/operator_console/tests/test_qiki_routing.py +++ b/src/qiki/services/operator_console/tests/test_qiki_routing.py @@ -1,6 +1,12 @@ from __future__ import annotations +import asyncio + +import pytest + from qiki.services.operator_console.main_orion import OrionApp +from qiki.shared.models.orion_qiki_protocol import IntentV1 +from qiki.shared.nats_subjects import QIKI_INTENT_V1 def test_parse_qiki_intent_prefix_q_colon() -> None: @@ -25,3 +31,39 @@ def test_parse_qiki_intent_shell_command() -> None: is_qiki, text = OrionApp._parse_qiki_intent("clear") assert is_qiki is False assert text is None + + +class _FakeNats: + def __init__(self) -> None: + self.published: list[tuple[str, dict]] = [] + + async def publish_command(self, subject: str, command: dict) -> None: + self.published.append((subject, command)) + + +@pytest.mark.asyncio +async def test_qiki_prefix_publishes_intent_v1() -> None: + app = OrionApp() + app.nats_client = _FakeNats() # type: ignore[assignment] + + await app._run_command("q: scan 360") + await asyncio.sleep(0) + + assert len(app.nats_client.published) == 1 # type: ignore[attr-defined] + subject, payload = app.nats_client.published[0] # type: ignore[attr-defined] + assert subject == QIKI_INTENT_V1 + + intent = IntentV1.model_validate(payload) + assert intent.environment_mode is not None + assert intent.screen + + +@pytest.mark.asyncio +async def test_shell_command_does_not_publish_intent() -> None: + app = OrionApp() + app.nats_client = _FakeNats() # type: ignore[assignment] + + await app._run_command("help") + await asyncio.sleep(0) + + assert app.nats_client.published == [] # type: ignore[attr-defined] diff --git a/src/qiki/shared/models/orion_qiki_protocol.py b/src/qiki/shared/models/orion_qiki_protocol.py new file mode 100644 index 00000000..6a9ab668 --- /dev/null +++ b/src/qiki/shared/models/orion_qiki_protocol.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from enum import Enum +from typing import Any, Literal, Optional + +from pydantic import BaseModel, ConfigDict, Field, field_validator + + +class _StrictModel(BaseModel): + model_config = ConfigDict(extra="forbid", validate_assignment=True) + + +class LangHint(str, Enum): + AUTO = "auto" + RU = "ru" + EN = "en" + + +class EnvironmentMode(str, Enum): + FACTORY = "FACTORY" + MISSION = "MISSION" + + +class SelectionV1(_StrictModel): + kind: Literal["event", "incident", "track", "snapshot", "none"] = "none" + id: Optional[str] = None + + +class IntentV1(_StrictModel): + version: Literal[1] = 1 + text: str + lang_hint: LangHint = LangHint.AUTO + screen: str + selection: SelectionV1 = Field(default_factory=SelectionV1) + ts: int + environment_mode: EnvironmentMode = EnvironmentMode.FACTORY + snapshot_min: dict[str, Any] = Field(default_factory=dict) + + +class ProposalV1(_StrictModel): + proposal_id: str + title: str + justification: str + priority: int = Field(ge=0, le=100) + confidence: float = Field(ge=0.0, le=1.0) + proposed_actions: list[Any] = Field(default_factory=list) + + @field_validator("proposed_actions") + @classmethod + def _must_be_empty_in_stage_a(cls, v: list[Any]) -> list[Any]: + if v: + raise ValueError("proposed_actions must be empty in Stage A") + return v + + +class ProposalsBatchV1(_StrictModel): + version: Literal[1] = 1 + ts: int + proposals: list[ProposalV1] = Field(default_factory=list) + metadata: dict[str, Any] = Field(default_factory=dict) + diff --git a/src/qiki/shared/nats_subjects.py b/src/qiki/shared/nats_subjects.py index cbfc22af..9c07ce07 100644 --- a/src/qiki/shared/nats_subjects.py +++ b/src/qiki/shared/nats_subjects.py @@ -30,7 +30,11 @@ RESPONSES_CONTROL = "qiki.responses.control" # QIKI interaction subjects (operator intents, agent replies) -QIKI_INTENTS = "qiki.intents" +QIKI_INTENT_V1 = "qiki.intent.v1" +QIKI_PROPOSALS_V1 = "qiki.proposals.v1" + +# Backward-compat alias: prefer QIKI_INTENT_V1. +QIKI_INTENTS = QIKI_INTENT_V1 # Events subjects EVENTS_V1_WILDCARD = "qiki.events.v1.>" diff --git a/tests/unit/test_orion_qiki_protocol_v1.py b/tests/unit/test_orion_qiki_protocol_v1.py new file mode 100644 index 00000000..bd579c97 --- /dev/null +++ b/tests/unit/test_orion_qiki_protocol_v1.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import json + +import pytest +from pydantic import ValidationError + +from qiki.shared.models.orion_qiki_protocol import ( + EnvironmentMode, + IntentV1, + LangHint, + ProposalV1, + ProposalsBatchV1, + SelectionV1, +) + + +def test_intent_v1_roundtrip() -> None: + payload = IntentV1( + text="scan 360", + lang_hint=LangHint.EN, + screen="Events/События", + selection=SelectionV1(kind="incident", id="INC|sensor|core"), + ts=1700000000000, + environment_mode=EnvironmentMode.FACTORY, + snapshot_min={"nats": True, "unread": 3}, + ) + dumped = payload.model_dump() + reloaded = IntentV1.model_validate(dumped) + assert reloaded.version == 1 + assert reloaded.text == "scan 360" + assert reloaded.selection.kind == "incident" + assert reloaded.snapshot_min["unread"] == 3 + + +def test_intent_v1_requires_fields() -> None: + with pytest.raises(ValidationError): + IntentV1.model_validate({"version": 1, "text": "x"}) + + +def test_proposal_v1_actions_must_be_empty_in_stage_a() -> None: + ok = ProposalV1( + proposal_id="P1", + title="Title", + justification="Justification", + priority=50, + confidence=0.6, + proposed_actions=[], + ) + assert ok.proposed_actions == [] + + with pytest.raises(ValidationError): + ProposalV1( + proposal_id="P2", + title="Title", + justification="Justification", + priority=50, + confidence=0.6, + proposed_actions=[{"op": "do"}], + ) + + +def test_batch_v1_json_roundtrip() -> None: + batch = ProposalsBatchV1( + ts=1700000000000, + proposals=[ + ProposalV1( + proposal_id="P1", + title="T", + justification="J", + priority=10, + confidence=0.9, + ) + ], + metadata={"request_id": "RID"}, + ) + raw = batch.model_dump_json() + parsed = ProposalsBatchV1.model_validate_json(raw) + assert parsed.version == 1 + assert parsed.proposals[0].proposal_id == "P1" + assert parsed.metadata["request_id"] == "RID" + + +def test_version_compatibility_strict() -> None: + payload = { + "version": 2, + "ts": 1700000000000, + "proposals": [], + "metadata": {}, + } + with pytest.raises(ValidationError): + ProposalsBatchV1.model_validate(payload) + + # Ensure we can still deserialize strict v1 payloads even if they came as JSON. + raw = json.dumps({"version": 1, "ts": 1700000000000, "proposals": [], "metadata": {}}, ensure_ascii=False) + parsed = ProposalsBatchV1.model_validate_json(raw) + assert parsed.version == 1 + + +def test_strict_extra_fields_rejected() -> None: + with pytest.raises(ValidationError): + IntentV1.model_validate( + { + "version": 1, + "text": "x", + "lang_hint": "auto", + "screen": "System/Система", + "selection": {"kind": "none"}, + "ts": 1700000000000, + "environment_mode": "FACTORY", + "snapshot_min": {}, + "extra": "nope", + } + )