Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions src/qiki/services/operator_console/clients/nats_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,19 @@ async def subscribe_control_responses(
self,
callback: Callable[[Dict[str, Any]], Awaitable[None]],
) -> None:
"""Subscribe to control responses emitted by FastStream bridge."""
"""
Subscribe to control response messages from the FastStream bridge and forward each message to the provided callback.

Parameters:
callback (Callable[[Dict[str, Any]], Awaitable[None]]): Async function invoked for every received message. It will be called with a dictionary containing:
- stream: `"CONTROL_RESPONSES"`
- timestamp: ISO 8601 timestamp string when the message was processed
- subject: the NATS subject of the message (or `None` if unavailable)
- data: the decoded JSON payload of the message

Raises:
RuntimeError: If the client is not connected to NATS.
"""
if not self.nc:
raise RuntimeError("Not connected to NATS")

Expand Down Expand Up @@ -303,11 +315,34 @@ async def subscribe_qiki_proposals(
self,
callback: Callable[[Dict[str, Any]], Awaitable[None]],
) -> None:
"""Subscribe to QIKI proposals batches (core NATS)."""
"""
Subscribe to QIKI proposal batches from core NATS and forward each message to the provided callback.

The callback is invoked with a dictionary containing the keys:
- `stream`: the string "QIKI_PROPOSALS"
- `timestamp`: ISO 8601 timestamp when the message was processed
- `subject`: the NATS subject on which the message was received (or None)
- `data`: the decoded JSON payload

Parameters:
callback (Callable[[Dict[str, Any]], Awaitable[None]]): Async function called for each incoming message with the described dictionary payload.

Raises:
RuntimeError: If the NATS client is not connected.
"""
if not self.nc:
raise RuntimeError("Not connected to NATS")

async def message_handler(msg):
"""
Handle an incoming NATS message for QIKI proposals by decoding JSON and invoking the subscription callback with a structured payload.

Parameters:
msg: NATS message object whose `data` is a JSON-encoded bytes payload and which may have a `subject` attribute.

Notes:
On processing failure, an error message is printed.
"""
try:
data = json.loads(msg.data.decode())
await callback(
Expand All @@ -331,7 +366,14 @@ async def message_handler(msg):
raise

async def get_jetstream_info(self) -> Dict[str, Any]:
"""Get JetStream account info."""
"""
Retrieve high-level JetStream account statistics.

Returns a dictionary with keys "memory", "storage", "streams", and "consumers" containing the corresponding account values. If an error occurs while fetching info, an empty dict is returned.

Raises:
RuntimeError: If the client is not connected to JetStream.
"""
if not self.js:
raise RuntimeError("Not connected to JetStream")

Expand Down Expand Up @@ -378,4 +420,4 @@ async def track_callback(data):


if __name__ == "__main__":
asyncio.run(test_client())
asyncio.run(test_client())
168 changes: 166 additions & 2 deletions src/qiki/services/operator_console/main_orion.py

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion src/qiki/services/operator_console/tests/test_qiki_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,23 @@ def test_parse_qiki_intent_shell_command() -> None:

class _FakeNats:
def __init__(self) -> None:
"""
Initialize the fake NATS publisher and prepare an empty list of captured publishes.

The `published` attribute records published messages as a list of tuples (subject, payload),
where `subject` is a string and `payload` is a dict representing the published command.
"""
self.published: list[tuple[str, dict]] = []

async def publish_command(self, subject: str, command: dict) -> None:
"""
Record a published command by appending the (subject, command) tuple to the `published` list.

Parameters:
subject (str): NATS subject under which the command was published.
command (dict): Payload of the published command.

"""
self.published.append((subject, command))


Expand Down Expand Up @@ -66,4 +80,4 @@ async def test_shell_command_does_not_publish_intent() -> None:
await app._run_command("help")
await asyncio.sleep(0)

assert app.nats_client.published == [] # type: ignore[attr-defined]
assert app.nats_client.published == [] # type: ignore[attr-defined]
56 changes: 53 additions & 3 deletions src/qiki/services/q_core_agent/core/neural_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ class _LLMProposalsResponseV1(BaseModel):

def _strip_actions_for_proposals_only(payload: Dict[str, Any]) -> Dict[str, Any]:
"""
Defensive hardening: some models may try to add "actions"/"proposed_actions" even if we
instruct "proposals-only". We always drop them before schema validation.
Remove any "actions" or "proposed_actions" keys from each proposal in the payload.

If the payload contains a "proposals" list of dictionaries, this function removes those keys from each dictionary in-place and returns the (possibly modified) payload.

Returns:
payload (Dict[str, Any]): The original payload with "actions" and "proposed_actions" removed from proposal items when present.
"""
proposals = payload.get("proposals")
if isinstance(proposals, list):
Expand All @@ -59,6 +63,20 @@ class NeuralEngine(INeuralEngine):
"""

def __init__(self, context: "AgentContext", config: "QCoreAgentConfig"):
"""
Initialize the NeuralEngine instance and load OpenAI-related configuration from environment variables.

Parameters:
context (AgentContext): Runtime agent context used by the engine.
config (QCoreAgentConfig): Agent configuration providing flags such as mock_neural_proposals_enabled.

Details:
- Saves provided context and config on the instance.
- Reads and stores OpenAI settings from environment variables:
OPENAI_API_KEY, OPENAI_MODEL, OPENAI_BASE_URL, OPENAI_TIMEOUT_S,
OPENAI_MAX_OUTPUT_TOKENS, OPENAI_MAX_RETRIES, OPENAI_TEMPERATURE.
- Sets the mock proposals enabled flag from config and logs initialization.
"""
self.context = context
self.config = config
self.mock_neural_proposals_enabled = config.mock_neural_proposals_enabled
Expand All @@ -80,6 +98,14 @@ def __init__(self, context: "AgentContext", config: "QCoreAgentConfig"):
)

def generate_proposals(self, context: "AgentContext") -> List[Proposal]:
"""
Generate a list of Proposal objects using either a mock implementation, a diagnostics fallback, or proposals produced by the OpenAI-backed LLM.

When mock proposals are enabled, returns a single predefined mock Proposal. If no OpenAI API key is configured, returns a single diagnostics Proposal indicating LLM unavailability. If an OpenAI call fails, returns a single diagnostics Proposal containing a truncated (first 200 characters) error message and zeroed priority/confidence. On success, converts each LLM proposal into a Proposal where the justification is formed as "title: justification", priority and confidence are taken from the LLM output, and the proposal type is mapped to ProposalTypeEnum (defaults to PLANNING if unknown).

Returns:
List[Proposal]: A list containing either mock, diagnostics, or translated LLM proposals.
"""
logger.debug("Generating proposals from Neural Engine.")
proposals: List[Proposal] = []

Expand Down Expand Up @@ -148,6 +174,15 @@ def generate_proposals(self, context: "AgentContext") -> List[Proposal]:
def _generate_openai_proposals(
self, context: "AgentContext"
) -> _LLMProposalsResponseV1:
"""
Request 1–3 structured proposals from the configured OpenAI Responses API using a minimal agent context and return them validated and sanitized.

Parameters:
context (AgentContext): Agent state used to build the minimal user context supplied to the LLM.

Returns:
_LLMProposalsResponseV1: A validated and sanitized response containing 1–3 proposals, each with `title`, `justification`, `priority`, `confidence`, and `type`.
"""
client = OpenAIResponsesClient(
api_key=self._openai_api_key,
model=self._openai_model,
Expand Down Expand Up @@ -181,6 +216,21 @@ def _generate_openai_proposals(
return _LLMProposalsResponseV1.model_validate(parsed)

def _build_min_context(self, context: "AgentContext") -> Dict[str, Any]:
"""
Build a minimal serializable context dictionary extracted from the agent runtime context for use in LLM prompts.

The returned dictionary contains the following keys:
- bios_status: a serializable representation of context.bios_status or None.
- fsm_state: a serializable representation of context.fsm_state or None.
- guard_events: a list of up to the first 20 serializable guard event objects (empty list if none).
- world_snapshot: a serializable representation of context.world_snapshot or None.

Serialization rules applied to each value:
- protobuf Message objects are converted to dicts while preserving proto field names.
- objects with a `model_dump()` method use that output.
- objects with a `dict()` method use that output.
- None is preserved; any other value is returned as-is.
"""
def _dump(value: Any) -> Any:
if value is None:
return None
Expand All @@ -200,4 +250,4 @@ def _dump(value: Any) -> Any:
for event in (getattr(context, "guard_events", None) or [])[:20]
],
"world_snapshot": _dump(getattr(context, "world_snapshot", None)),
}
}
58 changes: 57 additions & 1 deletion src/qiki/services/q_core_agent/core/openai_responses_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ def _http_post_json(
body: Dict[str, Any],
timeout_s: float,
) -> Dict[str, Any]:
"""
Send a JSON POST to the given URL and return the parsed JSON response.

Parameters:
url (str): Destination URL for the POST request.
headers (Dict[str, str]): HTTP headers to include (will be merged with Content-Type).
body (Dict[str, Any]): JSON-serializable payload to send as the request body.
timeout_s (float): Request timeout in seconds.

Returns:
Dict[str, Any]: Parsed JSON object from the response body.

Raises:
OpenAIResponsesError: If the response body is not valid JSON.
"""
request = urllib.request.Request(
url=url,
data=json.dumps(body).encode("utf-8"),
Expand All @@ -33,6 +48,20 @@ def _http_post_json(


def _extract_output_text(response: Dict[str, Any]) -> str:
"""
Extracts the first non-empty "output_text" string from an OpenAI-like response.

Scans the top-level response's "output" list for the first item of type "message", then the first content chunk of type "output_text" whose "text" field is a non-empty string, and returns that string.

Parameters:
response (Dict[str, Any]): The parsed response mapping expected to contain an "output" list of message objects with "content" lists of chunks.

Returns:
str: The first non-empty output text found.

Raises:
OpenAIResponsesError: If the top-level "output" is missing or not a list, or if no suitable "output_text" chunk with non-empty text is found.
"""
output = response.get("output")
if not isinstance(output, list):
raise OpenAIResponsesError("OpenAI response missing 'output' array")
Expand Down Expand Up @@ -74,6 +103,22 @@ def create_response_json_schema(
user_json: Dict[str, Any],
json_schema: Dict[str, Any],
) -> Dict[str, Any]:
"""
Create a response from the configured Responses API constrained by a JSON schema.

Sends a POST to the client's /responses endpoint with a system prompt, the provided user JSON (sent as a JSON string), and a JSON Schema to enforce on the model's output. The method will retry transient HTTP and network errors up to the client's max_retries using exponential backoff with jitter.

Parameters:
system_prompt (str): System-role prompt text to include in the request.
user_json (dict): JSON-serializable object to include as the user's input; it will be JSON-dumped into the request.
json_schema (dict): JSON Schema that the model's output must conform to; sent in strict `json_schema` format.

Returns:
dict: The parsed JSON response payload returned by the Responses API.

Raises:
OpenAIResponsesError: If the request fails (HTTP error, network error, or exhausted retries) or the API returns an error detail.
"""
url = f"{self.base_url.rstrip('/')}/responses"
headers = {"Authorization": f"Bearer {self.api_key}"}

Expand Down Expand Up @@ -132,6 +177,18 @@ def create_response_json_schema(


def parse_response_json(*, response: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse the model response and return the JSON object produced by the model.

Parameters:
response (Dict[str, Any]): Raw response structure from the OpenAI-like Responses API.

Returns:
Dict[str, Any]: The parsed JSON object extracted from the model output.

Raises:
OpenAIResponsesError: If the model output is not valid JSON or if the parsed JSON is not an object.
"""
text = _extract_output_text(response)
try:
data = json.loads(text)
Expand All @@ -140,4 +197,3 @@ def parse_response_json(*, response: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(data, dict):
raise OpenAIResponsesError("Model output JSON must be an object")
return data

16 changes: 15 additions & 1 deletion src/qiki/services/q_core_agent/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,14 @@ def test_neural_engine_openai_success_returns_proposals(monkeypatch):
from qiki.services.q_core_agent.core import neural_engine as neural_engine_module

def fake_create_response_json_schema(*args, **kwargs):
"""
Return a deterministic mock OpenAI Responses payload containing two proposal entries.

The returned structure mimics the JSON schema produced by OpenAIResponsesClient.create_response_json_schema: an outer dict with an "output" list containing a single message whose "content" includes an "output_text" entry. The output_text is a JSON string whose parsed object has a "proposals" list with two proposal objects (one DIAGNOSTICS and one PLANNING) including keys: title, justification, priority, confidence, and type.

Returns:
dict: A mock response payload with the described "output" structure and embedded JSON-stringified proposals.
"""
_ = args, kwargs
return {
"output": [
Expand Down Expand Up @@ -490,6 +498,12 @@ def test_neural_engine_openai_error_returns_stub(monkeypatch):
from qiki.services.q_core_agent.core import neural_engine as neural_engine_module

def fake_create_response_json_schema(*args, **kwargs):
"""
Simulates an OpenAI responses client raising a rate-limit error for testing.

Raises:
neural_engine_module.OpenAIResponsesError: Always raised with message "rate limited".
"""
_ = args, kwargs
raise neural_engine_module.OpenAIResponsesError("rate limited")

Expand Down Expand Up @@ -548,4 +562,4 @@ def test_tick_orchestrator_handles_exception_and_recovers(mock_data_provider):
mock_agent._handle_fsm.assert_not_called()
mock_agent._evaluate_proposals.assert_not_called()
mock_agent._make_decision.assert_not_called()
assert mock_agent.tick_id == 1
assert mock_agent.tick_id == 1
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@


def _make_engine() -> NeuralEngine:
"""
Create a NeuralEngine preconfigured for local testing.

Returns:
A NeuralEngine instance configured with tick_interval=1, log_level="INFO",
recovery_delay=1, proposal_confidence_threshold=0.8,
mock_neural_proposals_enabled=False, and grpc_server_address="localhost:50051".
"""
config = QCoreAgentConfig(
tick_interval=1,
log_level="INFO",
Expand Down Expand Up @@ -63,6 +71,17 @@ def test_neuralengine_strips_actions_from_llm_payload(monkeypatch) -> None:
monkeypatch.setenv("OPENAI_API_KEY", "test-key")

def _fake_create_response_json_schema(self, *, system_prompt, user_json, json_schema): # noqa: ARG001
"""
Produce a fake OpenAIResponsesClient.create_response_json_schema-style payload whose `output` contains a single message with `output_text` embedding a JSON string of proposals that include both `actions` and `proposed_actions`.

Parameters:
system_prompt (str): Ignored; present to match the real method signature.
user_json (dict): Ignored; present to match the real method signature.
json_schema (dict): Ignored; present to match the real method signature.

Returns:
dict: A simulated response where `output` is a list containing one message whose `content` includes an `output_text` entry. The `text` value is a JSON string with a `proposals` array; each proposal includes `title`, `justification`, `priority`, `confidence`, `type`, and explicit `actions` and `proposed_actions` fields.
"""
return {
"output": [
{
Expand Down Expand Up @@ -92,4 +111,4 @@ def _fake_create_response_json_schema(self, *, system_prompt, user_json, json_sc
proposals = engine.generate_proposals(AgentContext())

assert proposals
assert all(len(p.proposed_actions) == 0 for p in proposals)
assert all(len(p.proposed_actions) == 0 for p in proposals)
13 changes: 12 additions & 1 deletion src/qiki/shared/models/orion_qiki_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ class ProposalV1(_StrictModel):
@field_validator("proposed_actions")
@classmethod
def _must_be_empty_in_stage_a(cls, v: list[Any]) -> list[Any]:
"""
Validate that the `proposed_actions` list is empty for Stage A.

Parameters:
v (list[Any]): The `proposed_actions` list to validate.

Returns:
list[Any]: The same list when valid.

Raises:
ValueError: If `v` is non-empty.
"""
if v:
raise ValueError("proposed_actions must be empty in Stage A")
return v
Expand All @@ -58,4 +70,3 @@ class ProposalsBatchV1(_StrictModel):
ts: int
proposals: list[ProposalV1] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)

Loading