Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/qiki/services/operator_console/clients/nats_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
RADAR_TRACKS_SR,
SYSTEM_TELEMETRY,
RESPONSES_CONTROL,
QIKI_PROPOSALS_V1,
)


Expand Down Expand Up @@ -297,6 +298,37 @@ async def message_handler(msg):
except Exception as e:
print(f"❌ Failed to subscribe to CONTROL_RESPONSES: {e}")
raise

async def subscribe_qiki_proposals(
self,
callback: Callable[[Dict[str, Any]], Awaitable[None]],
) -> None:
"""Subscribe to QIKI proposals batches (core NATS)."""
if not self.nc:
raise RuntimeError("Not connected to NATS")

async def message_handler(msg):
try:
data = json.loads(msg.data.decode())
await callback(
{
"stream": "QIKI_PROPOSALS",
"timestamp": datetime.now().isoformat(),
"subject": getattr(msg, "subject", None),
"data": data,
}
)
except Exception as e:
print(f"Error processing QIKI_PROPOSALS message: {e}")

subject = os.getenv("QIKI_PROPOSALS_SUBJECT", QIKI_PROPOSALS_V1)
try:
sub = await self.nc.subscribe(subject, cb=message_handler)
self.subscriptions["QIKI_PROPOSALS"] = sub
print(f"✅ Subscribed to QIKI_PROPOSALS: {subject}")
except Exception as e:
print(f"❌ Failed to subscribe to QIKI_PROPOSALS: {e}")
raise

async def get_jetstream_info(self) -> Dict[str, Any]:
"""Get JetStream account info."""
Expand Down
Loading