From bd77ce73ec3b1994755da345fef5f03f7ddf2de9 Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Fri, 9 Jan 2026 05:57:05 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`feature?= =?UTF-8?q?/neuralengine-openai-proposals-pr5`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @sonra44. * https://github.com/sonra44/QIKI_DTMP/pull/67#issuecomment-3724542532 The following files were modified: * `src/qiki/services/operator_console/clients/nats_client.py` * `src/qiki/services/operator_console/main_orion.py` * `src/qiki/services/operator_console/tests/test_qiki_routing.py` * `src/qiki/services/q_core_agent/core/neural_engine.py` * `src/qiki/services/q_core_agent/core/openai_responses_client.py` * `src/qiki/services/q_core_agent/tests/test_agent.py` * `src/qiki/services/q_core_agent/tests/test_neuralengine_proposals_only.py` * `src/qiki/shared/models/orion_qiki_protocol.py` * `tests/unit/test_orion_qiki_protocol_v1.py` --- .../operator_console/clients/nats_client.py | 50 +++++- .../services/operator_console/main_orion.py | 168 +++++++++++++++++- .../tests/test_qiki_routing.py | 16 +- .../q_core_agent/core/neural_engine.py | 56 +++++- .../core/openai_responses_client.py | 58 +++++- .../services/q_core_agent/tests/test_agent.py | 16 +- .../tests/test_neuralengine_proposals_only.py | 21 ++- src/qiki/shared/models/orion_qiki_protocol.py | 13 +- tests/unit/test_orion_qiki_protocol_v1.py | 9 +- 9 files changed, 392 insertions(+), 15 deletions(-) diff --git a/src/qiki/services/operator_console/clients/nats_client.py b/src/qiki/services/operator_console/clients/nats_client.py index 33f64b43..97574e24 100644 --- a/src/qiki/services/operator_console/clients/nats_client.py +++ b/src/qiki/services/operator_console/clients/nats_client.py @@ -272,7 +272,19 @@ async def subscribe_control_responses( self, callback: Callable[[Dict[str, Any]], Awaitable[None]], ) -> None: - """Subscribe to control responses emitted by FastStream bridge.""" + """ + Subscribe to control response messages from the FastStream bridge and forward each message to the provided callback. + + Parameters: + callback (Callable[[Dict[str, Any]], Awaitable[None]]): Async function invoked for every received message. It will be called with a dictionary containing: + - stream: `"CONTROL_RESPONSES"` + - timestamp: ISO 8601 timestamp string when the message was processed + - subject: the NATS subject of the message (or `None` if unavailable) + - data: the decoded JSON payload of the message + + Raises: + RuntimeError: If the client is not connected to NATS. + """ if not self.nc: raise RuntimeError("Not connected to NATS") @@ -303,11 +315,34 @@ async def subscribe_qiki_proposals( self, callback: Callable[[Dict[str, Any]], Awaitable[None]], ) -> None: - """Subscribe to QIKI proposals batches (core NATS).""" + """ + Subscribe to QIKI proposal batches from core NATS and forward each message to the provided callback. + + The callback is invoked with a dictionary containing the keys: + - `stream`: the string "QIKI_PROPOSALS" + - `timestamp`: ISO 8601 timestamp when the message was processed + - `subject`: the NATS subject on which the message was received (or None) + - `data`: the decoded JSON payload + + Parameters: + callback (Callable[[Dict[str, Any]], Awaitable[None]]): Async function called for each incoming message with the described dictionary payload. + + Raises: + RuntimeError: If the NATS client is not connected. + """ if not self.nc: raise RuntimeError("Not connected to NATS") async def message_handler(msg): + """ + Handle an incoming NATS message for QIKI proposals by decoding JSON and invoking the subscription callback with a structured payload. + + Parameters: + msg: NATS message object whose `data` is a JSON-encoded bytes payload and which may have a `subject` attribute. + + Notes: + On processing failure, an error message is printed. + """ try: data = json.loads(msg.data.decode()) await callback( @@ -331,7 +366,14 @@ async def message_handler(msg): raise async def get_jetstream_info(self) -> Dict[str, Any]: - """Get JetStream account info.""" + """ + Retrieve high-level JetStream account statistics. + + Returns a dictionary with keys "memory", "storage", "streams", and "consumers" containing the corresponding account values. If an error occurs while fetching info, an empty dict is returned. + + Raises: + RuntimeError: If the client is not connected to JetStream. + """ if not self.js: raise RuntimeError("Not connected to JetStream") @@ -378,4 +420,4 @@ async def track_callback(data): if __name__ == "__main__": - asyncio.run(test_client()) + asyncio.run(test_client()) \ No newline at end of file diff --git a/src/qiki/services/operator_console/main_orion.py b/src/qiki/services/operator_console/main_orion.py index de41680a..2b066693 100644 --- a/src/qiki/services/operator_console/main_orion.py +++ b/src/qiki/services/operator_console/main_orion.py @@ -521,6 +521,11 @@ def title_with_state(app: OrionAppSpec) -> str: class OrionKeybar(Static): def render(self) -> str: + """ + Builds the keybar string showing per-app hotkeys and context-sensitive command hints. + + The returned string contains formatted, bracketed tokens for each registered app (hotkey label and menu label) followed by contextual hints (e.g., navigation, event controls, proposals/rules actions) adapted to the current active screen. If the computed string is longer than the available width, it is truncated and ends with a trailing ellipsis (`…`). + """ active_screen = getattr(self.app, "active_screen", "system") extra: list[str] = [f"{I18N.bidi('Tab', 'Табуляция')} {I18N.bidi('Focus', 'Фокус')}"] extra.append( @@ -674,6 +679,11 @@ class OrionApp(App): active_screen = reactive("system") def __init__(self) -> None: + """ + Initialize the OrionApp instance and prepare internal state, stores, configuration limits, and auxiliary services. + + Sets up in-memory stores for tracks, events, console, summary, power, diagnostics, mission, and proposals; selection tracking per app; a SnapshotStore; environment-driven limits and overrides (max rows, TTLs, command length, output/bottom-bar sizing); the file-backed rules repository and placeholders for incident rules and store; optional PPI scope renderer when available; and then loads incident rules and updates the initial system snapshot. + """ super().__init__() self.nats_client: Optional[NATSClient] = None self._tracks_by_id: dict[str, tuple[dict[str, Any], float]] = {} @@ -1666,6 +1676,21 @@ def passes(inc: Any) -> bool: pass def compose(self) -> ComposeResult: + """ + Builds the application's widget hierarchy and screen layout for the ORION operator console. + + Constructs and yields the top-level UI: a header, workspace (sidebar, inspector, bottom command/output bar), and individual screens with their panels and DataTable widgets. Included screens and components: + - Header (OrionHeader) + - Workspace containing OrionSidebar and OrionInspector + - Bottom bar with output log, command input, and keybar + - System dashboard panels and radar PPI + radar table + - Events, Console, Summary, Power, Diagnostics, and Mission screens with their tables + - Proposals screen with a proposals table + - Rules screen with a toolbar (reload button and hints) and rules table + + Returns: + ComposeResult: an iterable yielding the constructed widgets for composition. + """ with Vertical(id="orion-root"): yield OrionHeader(id="orion-header") with Container(id="orion-workspace"): @@ -1803,6 +1828,11 @@ def compose(self) -> ComposeResult: yield rules_table async def on_mount(self) -> None: + """ + Initialize the application UI, seed all panels and tables, start background refresh timers, and establish NATS connectivity. + + Performs initial screen activation and UI setup (system header, radar, events, console, summary, power, diagnostics, mission, proposals, rules), updates the system snapshot and command placeholder, refreshes the inspector and responsive layout, starts the NATS client, and schedules periodic refresh callbacks for header, radar, summary, diagnostics, and mission. Also attempts to focus the command input. + """ self.action_show_screen("system") self._init_system_panels() self._seed_system_panels() @@ -2306,6 +2336,11 @@ def _seed_diagnostics_table(self) -> None: table.add_row("—", I18N.NA, I18N.NA, I18N.NA, key="seed") def _seed_mission_table(self) -> None: + """ + Initialize the mission table UI with a default seed row and reset related mission selection state. + + Clears any existing rows in the mission DataTable, resets the internal mission lookup and selection for the "mission" app, and inserts a single placeholder row indicating no available mission data. + """ try: table = self.query_one("#mission-table", DataTable) except Exception: @@ -2316,6 +2351,11 @@ def _seed_mission_table(self) -> None: table.add_row("—", I18N.NA, I18N.NA, key="seed") def _seed_proposals_table(self) -> None: + """ + Initialize the proposals data table and reset related in-memory state. + + Clears the proposals table UI, empties the internal proposals lookup, removes any current proposals selection, and inserts a single placeholder row indicating no available proposals. If the proposals table widget cannot be found, the function returns without side effects. + """ try: table = self.query_one("#proposals-table", DataTable) except Exception: @@ -2326,6 +2366,11 @@ def _seed_proposals_table(self) -> None: table.add_row(I18N.NA, I18N.NA, I18N.NA, key="seed") def _render_proposals_table(self) -> None: + """ + Populate the proposals DataTable (#proposals-table) with the current proposals from internal store. + + Clears any existing rows, then queries the widget with id "#proposals-table". If the table widget is not found the method returns silently. If there are no proposals, inserts a single seed row with NA placeholders. Otherwise, sorts proposals by priority (descending) then confidence (descending) and adds a row per proposal with columns: Priority, Confidence (formatted to two decimal places when numeric), and Title. Uses localized `I18N.NA` for missing values and uses each proposal's id as the row key. + """ try: table = self.query_one("#proposals-table", DataTable) except Exception: @@ -2337,6 +2382,15 @@ def _render_proposals_table(self) -> None: return def sort_key(item: tuple[str, dict[str, Any]]) -> tuple[int, float]: + """ + Compute a sort key that orders items by descending `priority` then descending `confidence`. + + Parameters: + item (tuple[str, dict[str, Any]]): A pair of (id, payload) where `payload` may contain numeric `priority` and `confidence` fields. + + Returns: + tuple[int, float]: A tuple (-priority, -confidence) where missing or non-numeric values are treated as 0 so that higher priority/confidence sort before lower ones. + """ payload = item[1] try: pr = int(payload.get("priority") or 0) @@ -2361,6 +2415,11 @@ def sort_key(item: tuple[str, dict[str, Any]]) -> tuple[int, float]: ) def _seed_rules_table(self) -> None: + """ + Seed the rules data table with a placeholder row and clear any existing rules selection. + + Clears the internal selection for the "rules" app, finds the DataTable with id "#rules-table", removes all rows, and inserts a single placeholder row labeled as not available. If the table cannot be found, the method returns without error. + """ try: table = self.query_one("#rules-table", DataTable) except Exception: @@ -2414,6 +2473,11 @@ def match_summary(rule: Any) -> str: ) async def _init_nats(self) -> None: + """ + Initialize the NATS client, establish connectivity, and subscribe to the application's NATS topics. + + On successful connection this sets self.nats_client, marks self.nats_connected True, logs the outcome, and updates the system snapshot. If the initial connection attempt fails the method marks self.nats_connected False, logs the failure, updates the system snapshot, and returns early. Topic subscriptions (telemetry, tracks, events, control responses, QIKI proposals) are attempted in a best-effort fashion; subscription failures are logged but do not raise exceptions or abort other subscriptions. + """ self.nats_client = NATSClient() try: await self.nats_client.connect() @@ -2464,6 +2528,11 @@ async def _init_nats(self) -> None: ) def _refresh_header(self) -> None: + """ + Update the top header with the most recent telemetry snapshot and refresh the inspector when viewing the system screen. + + Looks up the latest telemetry EventEnvelope from the snapshot store and, if present and valid, forwards its payload and derived metadata (NATS connectivity, age, and freshness) to the OrionHeader for display. If no telemetry is available or an error occurs, the method does nothing. When the active screen is "system", the inspector view is refreshed after the header update. + """ telemetry_env = self._snapshots.get_last("telemetry") if telemetry_env is None or not isinstance(telemetry_env.payload, dict): return @@ -2482,6 +2551,11 @@ def _refresh_header(self) -> None: self._refresh_inspector() def _refresh_inspector(self) -> None: + """ + Update the inspector pane to reflect the current selection and active screen. + + Builds the Summary, Fields, Raw data, and Actions sections for the right-hand inspector based on the current selection context and active screen (e.g., events, radar, rules, proposals, mission, console). If no selection exists, shows a "No selection" summary. Adds connectivity and telemetry age information. Safely returns without error if the inspector widget cannot be found. + """ try: inspector = self.query_one("#orion-inspector", OrionInspector) except Exception: @@ -3030,6 +3104,11 @@ def _apply_rule_enabled_change(self, rule_id: str, enabled: bool) -> None: ) def action_toggle_selected_rule_enabled(self) -> None: + """ + Toggle the enabled state of the currently selected incident rule after user confirmation. + + Looks up the selected rule in the rules screen; if a valid rule is selected, prompts the user to confirm toggling its enabled flag and, on confirmation, applies the change via the internal rule update handler. Logs informational messages for missing selection, unknown rule id, or cancellation. + """ if self.active_screen != "rules": return if isinstance(self.focused, Input): @@ -3067,6 +3146,14 @@ def after(decision: bool) -> None: self.push_screen(ConfirmDialog(prompt), after) def _ingest_proposals_batch(self, batch: ProposalsBatchV1) -> None: + """ + Ingest a ProposalsBatchV1 into the app's proposal stores and refresh the proposals table. + + Appends the batch (as a dict) to the rolling batch history (kept to 30 entries), updates the proposals-by-key map using each proposal's `proposal_id` (skipping proposals with no id), enforces the configured maximum visible proposals rows by removing the oldest entries when exceeded, and triggers a UI refresh of the proposals table. + + Parameters: + batch (ProposalsBatchV1): The incoming proposals batch to ingest. + """ self._proposals_batches.append(batch.model_dump()) if len(self._proposals_batches) > 30: self._proposals_batches = self._proposals_batches[-30:] @@ -3084,6 +3171,14 @@ def _ingest_proposals_batch(self, batch: ProposalsBatchV1) -> None: self._render_proposals_table() async def handle_proposals_data(self, data: dict) -> None: + """ + Process an incoming proposals message: validate, ingest the proposals batch, log each proposal title, and refresh the inspector when the proposals screen is active. + + If the payload fails validation, logs a warning and returns without mutating state. + + Parameters: + data (dict): Raw message payload expected to contain a "data" key with the proposals batch structure. + """ payload = data.get("data", {}) if isinstance(data, dict) else {} try: batch = ProposalsBatchV1.model_validate(payload) @@ -3103,6 +3198,14 @@ async def handle_proposals_data(self, data: dict) -> None: self._refresh_inspector() async def handle_control_response(self, data: dict) -> None: + """ + Log a human-readable, localized summary of a control response payload. + + Parses the incoming message (expected to be a dict with a "data" dict) and extracts a success indicator, request identifier, and an optional status/message from an inner payload. Formats those values using localization helpers and emits a single console log entry summarizing the control response. Handles missing or malformed input gracefully by substituting localized "N/A" or stringified values as appropriate. + + Parameters: + data (dict): Incoming control response envelope or raw payload; typically a dict containing a "data" key with the response body. If not a dict or missing expected fields, the function logs a best-effort summary with placeholders. + """ payload = data.get("data", {}) if isinstance(data, dict) else {} if not isinstance(payload, dict): payload = {} @@ -3126,6 +3229,14 @@ async def handle_control_response(self, data: dict) -> None: ) def action_show_screen(self, screen: str) -> None: + """ + Switch the UI to the specified screen and update visible panels and dependent views. + + Sets the application's active screen state, marks the corresponding sidebar entry active, shows the chosen screen's panel while hiding other screens, and refreshes any screen-specific tables and the inspector. If the provided screen name is not recognized, logs an "Unknown screen" message and does nothing. + + Parameters: + screen (str): Canonical screen name to activate (e.g., "system", "radar", "events", "console", "summary", "power", "diagnostics", "mission", "proposals", "rules"). + """ if screen not in {app.screen for app in ORION_APPS}: self._console_log(f"{I18N.bidi('Unknown screen', 'Неизвестный экран')}: {screen}", level="info") return @@ -3171,7 +3282,11 @@ def on_button_pressed(self, event: Button.Pressed) -> None: self._render_rules_table() def action_cycle_focus(self) -> None: - """Cycle focus: Sidebar → Workspace → Inspector → Command.""" + """ + Cycle keyboard focus through the primary UI regions in this order: sidebar → workspace → inspector → command. + + Skips any region that is not present and advances to the next available widget. The workspace region targeted depends on the currently active screen (for example, when the active screen is "radar" the radar table is focused; when "proposals" the proposals table is focused). + """ def safe_query(selector: str) -> Optional[Static]: try: @@ -3225,9 +3340,27 @@ def action_focus_command(self) -> None: pass def action_help(self) -> None: + """ + Display the application's help dialog. + + Shows the comprehensive help overlay with available commands, application screens, quick actions, and glossary entries. + """ self._show_help() def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: + """ + Update the current selection context when a data table row is highlighted. + + This handler inspects the highlighted row and, depending on the source table, constructs + and sets an appropriate SelectionContext (app_id, key, kind, source, created_at_epoch, + payload, ids). It ignores seed rows, rows with invalid keys, and unsupported tables. + Supported tables and resulting selection kinds: proposals, rules, events (incidents), + console, summary, power, diagnostics, mission, and radar (tracks). + + Parameters: + event (DataTable.RowHighlighted): The row-highlight event containing the data_table + reference, the row_key for the highlighted row, and cursor_row for position-based lookup. + """ if event.data_table.id == "proposals-table": try: row_key = str(event.row_key) @@ -3618,6 +3751,14 @@ def display_aliases(app: OrionAppSpec) -> str: ) async def _run_command(self, raw: str) -> None: + """ + Parse and execute a raw operator-console command string, performing the corresponding UI action or control operation. + + This method interprets the provided command text and dispatches side-effecting actions such as publishing a QIKI intent, toggling events live/paused, acknowledging or clearing incidents, setting event type/text filters, reloading incident rules, switching screens, sending simulation commands, and other console operations; unrecognized commands are logged as unknown. Commands and localized synonyms (English/Russian) are supported and selection/defaulting behavior is applied when arguments are omitted. + + Parameters: + raw (str): The raw command text entered by the operator; leading/trailing whitespace is ignored. + """ cmd = (raw or "").strip() if not cmd: return @@ -3856,6 +3997,14 @@ async def _run_command(self, raw: str) -> None: self._console_log(f"{I18N.bidi('Unknown command', 'Неизвестная команда')}: {cmd}", level="info") async def _publish_qiki_intent(self, text: str) -> None: + """ + Publish a QIKI intent containing a short text and a minimal runtime snapshot to the QIKI intent stream. + + Constructs an IntentV1 payload that includes the provided text, current active screen and selection, a compact set of telemetry vitals, and a top-N incident summary, then publishes it to the QIKI intent subject and logs the outcome. If `text` is empty or the NATS client is not available, the function returns without publishing. + + Parameters: + text (str): The intent text to send to QIKI. + """ if not text: return if not self.nats_client: @@ -3867,6 +4016,16 @@ async def _publish_qiki_intent(self, text: str) -> None: return def nested_get(d: Any, path: str) -> Any: + """ + Retrieve a value from a nested mapping using a dot-separated key path. + + Parameters: + d (Any): The root mapping to traverse; expected to be a dict-like object. + path (str): Dot-separated sequence of keys (e.g. "a.b.c"). Empty or None yields the original mapping. + + Returns: + Any: The value found at the given path, or `None` if any intermediate value is not a dict or a key is missing. + """ node = d for part in (path or "").split("."): if not part: @@ -3971,6 +4130,11 @@ def nested_get(d: Any, path: str) -> Any: ) def _update_command_placeholder(self) -> None: + """ + Set the command input's placeholder to a localized hint string describing available commands. + + The placeholder includes short examples and localized labels for help, screen switching, simulation start, and QIKI intents (both `q: ` and `// `). If the command input widget cannot be found, the method does nothing. + """ try: dock = self.query_one("#command-dock", Input) except Exception: @@ -4075,4 +4239,4 @@ async def _publish_sim_command(self, cmd_name: str) -> None: if __name__ == "__main__": - OrionApp().run() + OrionApp().run() \ No newline at end of file diff --git a/src/qiki/services/operator_console/tests/test_qiki_routing.py b/src/qiki/services/operator_console/tests/test_qiki_routing.py index 71fef6e2..090a39f7 100644 --- a/src/qiki/services/operator_console/tests/test_qiki_routing.py +++ b/src/qiki/services/operator_console/tests/test_qiki_routing.py @@ -35,9 +35,23 @@ def test_parse_qiki_intent_shell_command() -> None: class _FakeNats: def __init__(self) -> None: + """ + Initialize the fake NATS publisher and prepare an empty list of captured publishes. + + The `published` attribute records published messages as a list of tuples (subject, payload), + where `subject` is a string and `payload` is a dict representing the published command. + """ self.published: list[tuple[str, dict]] = [] async def publish_command(self, subject: str, command: dict) -> None: + """ + Record a published command by appending the (subject, command) tuple to the `published` list. + + Parameters: + subject (str): NATS subject under which the command was published. + command (dict): Payload of the published command. + + """ self.published.append((subject, command)) @@ -66,4 +80,4 @@ async def test_shell_command_does_not_publish_intent() -> None: await app._run_command("help") await asyncio.sleep(0) - assert app.nats_client.published == [] # type: ignore[attr-defined] + assert app.nats_client.published == [] # type: ignore[attr-defined] \ No newline at end of file diff --git a/src/qiki/services/q_core_agent/core/neural_engine.py b/src/qiki/services/q_core_agent/core/neural_engine.py index 2f3433bc..47b28023 100644 --- a/src/qiki/services/q_core_agent/core/neural_engine.py +++ b/src/qiki/services/q_core_agent/core/neural_engine.py @@ -40,8 +40,12 @@ class _LLMProposalsResponseV1(BaseModel): def _strip_actions_for_proposals_only(payload: Dict[str, Any]) -> Dict[str, Any]: """ - Defensive hardening: some models may try to add "actions"/"proposed_actions" even if we - instruct "proposals-only". We always drop them before schema validation. + Remove any "actions" or "proposed_actions" keys from each proposal in the payload. + + If the payload contains a "proposals" list of dictionaries, this function removes those keys from each dictionary in-place and returns the (possibly modified) payload. + + Returns: + payload (Dict[str, Any]): The original payload with "actions" and "proposed_actions" removed from proposal items when present. """ proposals = payload.get("proposals") if isinstance(proposals, list): @@ -59,6 +63,20 @@ class NeuralEngine(INeuralEngine): """ def __init__(self, context: "AgentContext", config: "QCoreAgentConfig"): + """ + Initialize the NeuralEngine instance and load OpenAI-related configuration from environment variables. + + Parameters: + context (AgentContext): Runtime agent context used by the engine. + config (QCoreAgentConfig): Agent configuration providing flags such as mock_neural_proposals_enabled. + + Details: + - Saves provided context and config on the instance. + - Reads and stores OpenAI settings from environment variables: + OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL, OPENAI_TIMEOUT_S, + OPENAI_MAX_OUTPUT_TOKENS, OPENAI_MAX_RETRIES, OPENAI_TEMPERATURE. + - Sets the mock proposals enabled flag from config and logs initialization. + """ self.context = context self.config = config self.mock_neural_proposals_enabled = config.mock_neural_proposals_enabled @@ -80,6 +98,14 @@ def __init__(self, context: "AgentContext", config: "QCoreAgentConfig"): ) def generate_proposals(self, context: "AgentContext") -> List[Proposal]: + """ + Generate a list of Proposal objects using either a mock implementation, a diagnostics fallback, or proposals produced by the OpenAI-backed LLM. + + When mock proposals are enabled, returns a single predefined mock Proposal. If no OpenAI API key is configured, returns a single diagnostics Proposal indicating LLM unavailability. If an OpenAI call fails, returns a single diagnostics Proposal containing a truncated (first 200 characters) error message and zeroed priority/confidence. On success, converts each LLM proposal into a Proposal where the justification is formed as "title: justification", priority and confidence are taken from the LLM output, and the proposal type is mapped to ProposalTypeEnum (defaults to PLANNING if unknown). + + Returns: + List[Proposal]: A list containing either mock, diagnostics, or translated LLM proposals. + """ logger.debug("Generating proposals from Neural Engine.") proposals: List[Proposal] = [] @@ -148,6 +174,15 @@ def generate_proposals(self, context: "AgentContext") -> List[Proposal]: def _generate_openai_proposals( self, context: "AgentContext" ) -> _LLMProposalsResponseV1: + """ + Request 1–3 structured proposals from the configured OpenAI Responses API using a minimal agent context and return them validated and sanitized. + + Parameters: + context (AgentContext): Agent state used to build the minimal user context supplied to the LLM. + + Returns: + _LLMProposalsResponseV1: A validated and sanitized response containing 1–3 proposals, each with `title`, `justification`, `priority`, `confidence`, and `type`. + """ client = OpenAIResponsesClient( api_key=self._openai_api_key, model=self._openai_model, @@ -181,6 +216,21 @@ def _generate_openai_proposals( return _LLMProposalsResponseV1.model_validate(parsed) def _build_min_context(self, context: "AgentContext") -> Dict[str, Any]: + """ + Build a minimal serializable context dictionary extracted from the agent runtime context for use in LLM prompts. + + The returned dictionary contains the following keys: + - bios_status: a serializable representation of context.bios_status or None. + - fsm_state: a serializable representation of context.fsm_state or None. + - guard_events: a list of up to the first 20 serializable guard event objects (empty list if none). + - world_snapshot: a serializable representation of context.world_snapshot or None. + + Serialization rules applied to each value: + - protobuf Message objects are converted to dicts while preserving proto field names. + - objects with a `model_dump()` method use that output. + - objects with a `dict()` method use that output. + - None is preserved; any other value is returned as-is. + """ def _dump(value: Any) -> Any: if value is None: return None @@ -200,4 +250,4 @@ def _dump(value: Any) -> Any: for event in (getattr(context, "guard_events", None) or [])[:20] ], "world_snapshot": _dump(getattr(context, "world_snapshot", None)), - } + } \ No newline at end of file diff --git a/src/qiki/services/q_core_agent/core/openai_responses_client.py b/src/qiki/services/q_core_agent/core/openai_responses_client.py index 0ca1aa66..a253277b 100644 --- a/src/qiki/services/q_core_agent/core/openai_responses_client.py +++ b/src/qiki/services/q_core_agent/core/openai_responses_client.py @@ -18,6 +18,21 @@ def _http_post_json( body: Dict[str, Any], timeout_s: float, ) -> Dict[str, Any]: + """ + Send a JSON POST to the given URL and return the parsed JSON response. + + Parameters: + url (str): Destination URL for the POST request. + headers (Dict[str, str]): HTTP headers to include (will be merged with Content-Type). + body (Dict[str, Any]): JSON-serializable payload to send as the request body. + timeout_s (float): Request timeout in seconds. + + Returns: + Dict[str, Any]: Parsed JSON object from the response body. + + Raises: + OpenAIResponsesError: If the response body is not valid JSON. + """ request = urllib.request.Request( url=url, data=json.dumps(body).encode("utf-8"), @@ -33,6 +48,20 @@ def _http_post_json( def _extract_output_text(response: Dict[str, Any]) -> str: + """ + Extracts the first non-empty "output_text" string from an OpenAI-like response. + + Scans the top-level response's "output" list for the first item of type "message", then the first content chunk of type "output_text" whose "text" field is a non-empty string, and returns that string. + + Parameters: + response (Dict[str, Any]): The parsed response mapping expected to contain an "output" list of message objects with "content" lists of chunks. + + Returns: + str: The first non-empty output text found. + + Raises: + OpenAIResponsesError: If the top-level "output" is missing or not a list, or if no suitable "output_text" chunk with non-empty text is found. + """ output = response.get("output") if not isinstance(output, list): raise OpenAIResponsesError("OpenAI response missing 'output' array") @@ -74,6 +103,22 @@ def create_response_json_schema( user_json: Dict[str, Any], json_schema: Dict[str, Any], ) -> Dict[str, Any]: + """ + Create a response from the configured Responses API constrained by a JSON schema. + + Sends a POST to the client's /responses endpoint with a system prompt, the provided user JSON (sent as a JSON string), and a JSON Schema to enforce on the model's output. The method will retry transient HTTP and network errors up to the client's max_retries using exponential backoff with jitter. + + Parameters: + system_prompt (str): System-role prompt text to include in the request. + user_json (dict): JSON-serializable object to include as the user's input; it will be JSON-dumped into the request. + json_schema (dict): JSON Schema that the model's output must conform to; sent in strict `json_schema` format. + + Returns: + dict: The parsed JSON response payload returned by the Responses API. + + Raises: + OpenAIResponsesError: If the request fails (HTTP error, network error, or exhausted retries) or the API returns an error detail. + """ url = f"{self.base_url.rstrip('/')}/responses" headers = {"Authorization": f"Bearer {self.api_key}"} @@ -132,6 +177,18 @@ def create_response_json_schema( def parse_response_json(*, response: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse the model response and return the JSON object produced by the model. + + Parameters: + response (Dict[str, Any]): Raw response structure from the OpenAI-like Responses API. + + Returns: + Dict[str, Any]: The parsed JSON object extracted from the model output. + + Raises: + OpenAIResponsesError: If the model output is not valid JSON or if the parsed JSON is not an object. + """ text = _extract_output_text(response) try: data = json.loads(text) @@ -140,4 +197,3 @@ def parse_response_json(*, response: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(data, dict): raise OpenAIResponsesError("Model output JSON must be an object") return data - diff --git a/src/qiki/services/q_core_agent/tests/test_agent.py b/src/qiki/services/q_core_agent/tests/test_agent.py index 988edd75..6888f86c 100644 --- a/src/qiki/services/q_core_agent/tests/test_agent.py +++ b/src/qiki/services/q_core_agent/tests/test_agent.py @@ -432,6 +432,14 @@ def test_neural_engine_openai_success_returns_proposals(monkeypatch): from qiki.services.q_core_agent.core import neural_engine as neural_engine_module def fake_create_response_json_schema(*args, **kwargs): + """ + Return a deterministic mock OpenAI Responses payload containing two proposal entries. + + The returned structure mimics the JSON schema produced by OpenAIResponsesClient.create_response_json_schema: an outer dict with an "output" list containing a single message whose "content" includes an "output_text" entry. The output_text is a JSON string whose parsed object has a "proposals" list with two proposal objects (one DIAGNOSTICS and one PLANNING) including keys: title, justification, priority, confidence, and type. + + Returns: + dict: A mock response payload with the described "output" structure and embedded JSON-stringified proposals. + """ _ = args, kwargs return { "output": [ @@ -490,6 +498,12 @@ def test_neural_engine_openai_error_returns_stub(monkeypatch): from qiki.services.q_core_agent.core import neural_engine as neural_engine_module def fake_create_response_json_schema(*args, **kwargs): + """ + Simulates an OpenAI responses client raising a rate-limit error for testing. + + Raises: + neural_engine_module.OpenAIResponsesError: Always raised with message "rate limited". + """ _ = args, kwargs raise neural_engine_module.OpenAIResponsesError("rate limited") @@ -548,4 +562,4 @@ def test_tick_orchestrator_handles_exception_and_recovers(mock_data_provider): mock_agent._handle_fsm.assert_not_called() mock_agent._evaluate_proposals.assert_not_called() mock_agent._make_decision.assert_not_called() - assert mock_agent.tick_id == 1 + assert mock_agent.tick_id == 1 \ No newline at end of file diff --git a/src/qiki/services/q_core_agent/tests/test_neuralengine_proposals_only.py b/src/qiki/services/q_core_agent/tests/test_neuralengine_proposals_only.py index 6ec0a06b..b4702e6a 100644 --- a/src/qiki/services/q_core_agent/tests/test_neuralengine_proposals_only.py +++ b/src/qiki/services/q_core_agent/tests/test_neuralengine_proposals_only.py @@ -10,6 +10,14 @@ def _make_engine() -> NeuralEngine: + """ + Create a NeuralEngine preconfigured for local testing. + + Returns: + A NeuralEngine instance configured with tick_interval=1, log_level="INFO", + recovery_delay=1, proposal_confidence_threshold=0.8, + mock_neural_proposals_enabled=False, and grpc_server_address="localhost:50051". + """ config = QCoreAgentConfig( tick_interval=1, log_level="INFO", @@ -63,6 +71,17 @@ def test_neuralengine_strips_actions_from_llm_payload(monkeypatch) -> None: monkeypatch.setenv("OPENAI_API_KEY", "test-key") def _fake_create_response_json_schema(self, *, system_prompt, user_json, json_schema): # noqa: ARG001 + """ + Produce a fake OpenAIResponsesClient.create_response_json_schema-style payload whose `output` contains a single message with `output_text` embedding a JSON string of proposals that include both `actions` and `proposed_actions`. + + Parameters: + system_prompt (str): Ignored; present to match the real method signature. + user_json (dict): Ignored; present to match the real method signature. + json_schema (dict): Ignored; present to match the real method signature. + + Returns: + dict: A simulated response where `output` is a list containing one message whose `content` includes an `output_text` entry. The `text` value is a JSON string with a `proposals` array; each proposal includes `title`, `justification`, `priority`, `confidence`, `type`, and explicit `actions` and `proposed_actions` fields. + """ return { "output": [ { @@ -92,4 +111,4 @@ def _fake_create_response_json_schema(self, *, system_prompt, user_json, json_sc proposals = engine.generate_proposals(AgentContext()) assert proposals - assert all(len(p.proposed_actions) == 0 for p in proposals) + assert all(len(p.proposed_actions) == 0 for p in proposals) \ No newline at end of file diff --git a/src/qiki/shared/models/orion_qiki_protocol.py b/src/qiki/shared/models/orion_qiki_protocol.py index 6a9ab668..ca56e73e 100644 --- a/src/qiki/shared/models/orion_qiki_protocol.py +++ b/src/qiki/shared/models/orion_qiki_protocol.py @@ -48,6 +48,18 @@ class ProposalV1(_StrictModel): @field_validator("proposed_actions") @classmethod def _must_be_empty_in_stage_a(cls, v: list[Any]) -> list[Any]: + """ + Validate that the `proposed_actions` list is empty for Stage A. + + Parameters: + v (list[Any]): The `proposed_actions` list to validate. + + Returns: + list[Any]: The same list when valid. + + Raises: + ValueError: If `v` is non-empty. + """ if v: raise ValueError("proposed_actions must be empty in Stage A") return v @@ -58,4 +70,3 @@ class ProposalsBatchV1(_StrictModel): ts: int proposals: list[ProposalV1] = Field(default_factory=list) metadata: dict[str, Any] = Field(default_factory=dict) - diff --git a/tests/unit/test_orion_qiki_protocol_v1.py b/tests/unit/test_orion_qiki_protocol_v1.py index bd579c97..c9a0342a 100644 --- a/tests/unit/test_orion_qiki_protocol_v1.py +++ b/tests/unit/test_orion_qiki_protocol_v1.py @@ -34,6 +34,13 @@ def test_intent_v1_roundtrip() -> None: def test_intent_v1_requires_fields() -> None: + """ + Verifies that validating an IntentV1 payload fails when required fields are missing. + + Attempts to validate a minimal payload containing only `version` and `text` and expects a `ValidationError`. + Raises: + ValidationError: If the payload is missing required fields. + """ with pytest.raises(ValidationError): IntentV1.model_validate({"version": 1, "text": "x"}) @@ -111,4 +118,4 @@ def test_strict_extra_fields_rejected() -> None: "snapshot_min": {}, "extra": "nope", } - ) + ) \ No newline at end of file