diff --git a/CONTEXT.md b/CONTEXT.md index 6205d34..cc145ff 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -134,9 +134,69 @@ Two kinds: Distinct from the **Component Catalog**: catalog is metadata for _registration_ (what UI shows, how to construct); Wiring is per-impl _behavior_ applied after construction. -## Runtime Context +## Runtime Services -Read-only bundle passed to component factories at construction time: connected brokers, configured LLM providers. Lets a component pluck the bits it needs (e.g. `Llm` reads its provider's `base_url`/`api_key`) without mutating `node.data` upstream. Empty for components with no external deps. +Typed bundle of every external service the runtime can hand to a component, threaded through component construction. Lives in `runtime/services/mod.rs` as `RuntimeServices`. One field per **Capability Trait** / **Service Registry**: + +- `llm_registry: Arc` +- `mqtt_publisher: Arc` +- _(future kinds — HTTP, OSC, WebSocket — accrete as new fields with no churn in unrelated components)_ + +Built once at application startup (`AppState::run`) and reused across every `flow_update`. `Clone` so it can be carried alongside a pending `FlowUpdate` on `AppState::pending_flow` and replayed on board-connect without losing live registries. + +Replaces the former `RuntimeContext` struct: same role, more honest name. The corresponding ADR is [ADR-0002](docs/adr/0002-per-capability-service-traits.md). + +## Component Deps + +The associated `type Deps: FromServices` on the `ComponentBuilder` trait — each impl's typed record of the slice of **Runtime Services** it needs to construct. + +- Components that need nothing declare `type Deps = ();` (the default-friendliest shape — `FromServices for ()` returns unit). 29 of the 32 Catalog impls do this. +- `Llm` declares `type Deps = Arc`. +- `Mqtt` and `Figma` declare `type Deps = Arc`. + +The component registry's `Factory` closure (`runtime/registry.rs::make_factory`) projects `::from_services(services)` at build time and hands the slice to `B::build`. Adding a new external kind = add a field to `RuntimeServices` + a `FromServices` impl for the new `Arc<..>` — zero touches in the 29 unaffected builders. + +## Capability Trait + +A Rust trait describing one external kind's outbound operations (e.g. [`LlmProvider`](#llm-provider)`::generate`, `MqttPublisher::publish`). Lives in `runtime/services/.rs`. Components depend on `Arc` (or on the **Service Registry** that maps id → `Arc`), never on the concrete HTTP client / broker library. + +Each Capability Trait ships with two adapters from day one — a production impl (e.g. `HttpLlmProvider`) and a recording test impl (e.g. `RecordingLlmProvider`) — which is what makes the trait a real seam rather than a hypothetical one (same rule as **BoardHandle** + **TestIoLoop**). + +See [ADR-0002](docs/adr/0002-per-capability-service-traits.md). + +## Service Registry + +Live, mutable map of `id → Arc` for one capability kind (e.g. `LlmRegistry`, future `MqttRegistry`). Lives in `runtime/services/.rs`. The frontend's authoritative list is pushed in full via `sync(providers: Vec<(id, Arc)>)`; existing in-flight calls against the previous instance run to completion, subsequent lookups see the new entry. + +Components hold `Arc` and resolve the **Capability Trait** by id at dispatch time, not at construction time. Consequence: credential / endpoint rotation takes effect on the next call, no component rebuild, no flow_update re-fire. + +Replaces the parallel `LlmManager` + `RuntimeContext.providers` dual-state pattern. + +## LLM Provider + +The **Capability Trait** for any backend that can run an LLM completion against an OpenAI-compatible `/v1/chat/completions` request shape. Lives in `runtime/services/llm.rs`. Carries one method: + +```rust +async fn generate(&self, request: LlmRequest) -> Result; +``` + +`LlmRequest` is `{ model, system: Option, prompt }` — template substitution is the caller's job, the provider sees the rendered text. `LlmResponse` is `{ text }` for now; token counts / finish reasons accrete only when a consumer needs them. + +Production adapter: `HttpLlmProvider` (one `reqwest::Client` per instance for connection-pool reuse; empty `api_key` skips the `Authorization` header so local Ollama works). Test adapter: `RecordingLlmProvider` (records every request, returns scripted responses or errors from a FIFO queue, returns `LlmError::Cancelled` when the script is exhausted). + +## MQTT Publisher + +The **Capability Trait** for any backend that can publish a single MQTT message. Lives in `runtime/services/mqtt.rs`. One method: + +```rust +async fn publish(&self, broker_id: &str, topic: &str, payload: &[u8], retain: bool) -> Result<(), MqttPublishError>; +``` + +`MqttPublishError::BrokerNotConnected` is distinguished from `PublishFailed` so callers (UI, logs) can prompt for a reconnect instead of surfacing a generic wire error. + +Production adapter: `crate::mqtt::manager::MqttManager` (via `impl MqttPublisher for MqttManager` in `runtime/services/mqtt.rs`) — delegates to the existing broker pool, translating the legacy `String` error into the typed variant. Test adapter: `RecordingMqttPublisher` (records every `(broker_id, topic, payload, retain)` tuple and pops scripted errors from a FIFO queue). + +`Mqtt` and `Figma` components hold `Arc` and call `publish(...)` directly from their `dispatch` arms via a Tokio-spawned task. Replaces the legacy `_mqtt_publish` reserved-event pattern (component emits a JSON-encoded publish request, `lib.rs` parses it and re-routes through a dedicated handler thread) — that path was retired in [ADR-0002](docs/adr/0002-per-capability-service-traits.md) Phase 3. ## Host Adapter diff --git a/apps/web/src-tauri/Cargo.lock b/apps/web/src-tauri/Cargo.lock index 2dd632a..bf6f397 100644 --- a/apps/web/src-tauri/Cargo.lock +++ b/apps/web/src-tauri/Cargo.lock @@ -2923,6 +2923,7 @@ dependencies = [ name = "microflow" version = "0.7.0" dependencies = [ + "async-trait", "boa_engine", "criterion", "dashmap", diff --git a/apps/web/src-tauri/Cargo.toml b/apps/web/src-tauri/Cargo.toml index 16fb520..f475f23 100644 --- a/apps/web/src-tauri/Cargo.toml +++ b/apps/web/src-tauri/Cargo.toml @@ -40,6 +40,7 @@ thiserror = "1" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } boa_engine = "0.20" dashmap = "6" +async-trait = "0.1" [dev-dependencies] proptest = "1" diff --git a/apps/web/src-tauri/src/lib.rs b/apps/web/src-tauri/src/lib.rs index e109590..80bd07d 100644 --- a/apps/web/src-tauri/src/lib.rs +++ b/apps/web/src-tauri/src/lib.rs @@ -45,24 +45,13 @@ pub mod runtime; pub use error::*; use hardware::HardwareService; -use llm::LlmManager; use mqtt::MqttManager; -use runtime::{FlowRuntime, FlowUpdate, RuntimeContext}; +use runtime::services::{LlmRegistry, MqttPublisher, RuntimeServices}; +use runtime::{FlowRuntime, FlowUpdate}; use std::sync::{Arc, Mutex, RwLock}; use tauri::{Emitter, Listener}; -use tokio::sync::mpsc; use tokio::sync::Mutex as TokioMutex; -/// MQTT publish request from flow components -#[derive(Debug, Clone)] -pub struct MqttPublishRequest { - pub component_id: String, - pub broker_id: String, - pub topic: String, - pub payload: String, - pub retain: bool, -} - /// Tracks active Figma MQTT subscriptions so they can be cleaned up on flow switch #[derive(Debug, Clone)] pub struct FigmaSubscription { @@ -74,16 +63,23 @@ pub struct FigmaSubscription { pub struct AppState { pub hardware_service: Arc>, pub flow_runtime: Arc>, - /// Pending flow update + its runtime context, applied when the board connects. - pub pending_flow: Arc>>, + /// Pending flow update + the runtime services bundle live at + /// `flow_update` time, applied when the board connects. + pub pending_flow: Arc>>, /// Whether a Firmata board is connected pub board_connected: Arc>, /// MQTT broker manager pub mqtt_manager: MqttManager, - /// Channel for MQTT publish requests from flow components - pub mqtt_publish_tx: mpsc::UnboundedSender, - /// LLM provider manager - pub llm_manager: LlmManager, + /// MQTT publish handle wired into the runtime via `RuntimeContext` so + /// `Mqtt` / `Figma` components publish straight through the manager + /// instead of emitting `_mqtt_publish` events for `lib.rs` to re-route + /// (ADR-0002 Phase 3). + pub mqtt_publisher: Arc, + /// Live LLM provider registry. Shared with `RuntimeContext` so components + /// resolve providers at dispatch time and pick up credential rotation + /// without rebuilding. Filled by the `flow_update` and + /// `llm_sync_providers` Tauri commands. + pub llm_registry: Arc, /// Active Figma MQTT subscriptions (cleaned up on flow switch) pub figma_subscriptions: Arc>>, } @@ -99,13 +95,13 @@ pub fn run() { let flow_runtime = Arc::new(TokioMutex::new(FlowRuntime::new())); let pending_flow = Arc::new(RwLock::new(None)); let board_connected = Arc::new(RwLock::new(false)); - - // Create channel for MQTT publish requests - let (mqtt_publish_tx, mqtt_publish_rx) = mpsc::unbounded_channel::(); let mqtt_manager = MqttManager::new(); - let mqtt_manager_for_publish = mqtt_manager.clone(); - let llm_manager = LlmManager::new(); + // `MqttManager` is `Clone` (its broker map lives behind `Arc>`), + // so the cloned instance handed to the dyn-trait `Arc` shares the same + // broker pool as the one held on `AppState`. + let mqtt_publisher: Arc = Arc::new(mqtt_manager.clone()); + let llm_registry = Arc::new(LlmRegistry::new()); let app_state = AppState { hardware_service: Arc::clone(&hardware_service), @@ -113,8 +109,8 @@ pub fn run() { pending_flow: Arc::clone(&pending_flow), board_connected: Arc::clone(&board_connected), mqtt_manager, - mqtt_publish_tx: mqtt_publish_tx.clone(), - llm_manager, + mqtt_publisher, + llm_registry, figma_subscriptions: Arc::new(TokioMutex::new(Vec::new())), }; @@ -150,53 +146,30 @@ pub fn run() { // Use blocking_lock() for sync context during setup let event_rx = flow_runtime.blocking_lock().take_event_receiver(); - // Set up event forwarding from flow runtime + // Set up event forwarding from flow runtime. + // + // Outbound MQTT publishes used to ride this channel under the + // `_mqtt_publish` reserved handle — components emitted the + // request as a JSON value, this thread parsed it and + // re-dispatched to a dedicated publish-handler thread. Since + // ADR-0002 Phase 3 that path is gone: `Mqtt` / `Figma` hold an + // `Arc` and publish directly. The event + // channel now carries only plain component events, all of + // which go straight to the frontend + executor. let app_handle_events = app_handle.clone(); let flow_runtime_events = Arc::clone(&flow_runtime); - let mqtt_publish_tx_events = mqtt_publish_tx.clone(); std::thread::spawn(move || { log::info!("Event forwarding thread started"); if let Some(mut rx) = event_rx { log::info!("Event receiver obtained, waiting for events..."); while let Some(event) = rx.blocking_recv() { - log::trace!("Event: {} ({}) -> {:?}", - event.source, event.source_handle, event.value); - - // Handle MQTT publish events specially - // These are emitted by MQTT publish nodes when they receive input - if event.source_handle.as_ref() == "_mqtt_publish" { - log::debug!("[MQTT] Publish event from component {}", event.source); - - // Parse the JSON publish info from the event value - if let runtime::ComponentValue::String(json_str) = &event.value { - if let Ok(publish_info) = serde_json::from_str::(json_str) { - let broker_id = publish_info.get("brokerId").and_then(|v| v.as_str()).unwrap_or("").to_string(); - let topic = publish_info.get("topic").and_then(|v| v.as_str()).unwrap_or("").to_string(); - let payload = publish_info.get("payload").and_then(|v| v.as_str()).unwrap_or("").to_string(); - let retain = publish_info.get("retain").and_then(serde_json::Value::as_bool).unwrap_or(false); - - log::info!("[MQTT] Publishing to broker={broker_id}, topic={topic}, payload={payload}, retain={retain}"); - - if !broker_id.is_empty() && !topic.is_empty() { - let _ = mqtt_publish_tx_events.send(MqttPublishRequest { - component_id: event.source.to_string(), - broker_id, - topic, - payload, - retain, - }); - } else { - log::warn!("[MQTT] Publish request missing broker_id or topic"); - } - } else { - log::error!("[MQTT] Failed to parse publish info JSON"); - } - } - continue; - } - + log::trace!( + "Event: {} ({}) -> {:?}", + event.source, event.source_handle, event.value + ); + let _ = app_handle_events.emit("component-event", &event); - + // This thread is std::thread::spawn, NOT tokio::spawn — blocking_lock() is safe // here and will not stall the Tokio executor. The original try_lock() rationale // ("avoid blocking the async runtime") was incorrect for this call site. @@ -211,34 +184,6 @@ pub fn run() { } }); - // Spawn MQTT publish handler thread - let mqtt_manager_publish = mqtt_manager_for_publish; - std::thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async move { - let mut rx = mqtt_publish_rx; - log::info!("[MQTT] Publish handler thread started"); - while let Some(request) = rx.recv().await { - log::info!("[MQTT] Processing publish request for component {}", request.component_id); - - // For now, we need the broker_id and topic from the request - // In a full implementation, we'd look these up from the component - if !request.broker_id.is_empty() && !request.topic.is_empty() { - if let Err(e) = mqtt_manager_publish.publish( - &request.broker_id, - &request.topic, - request.payload.as_bytes(), - request.retain, - ).await { - log::error!("[MQTT] Failed to publish: {e}"); - } - } else { - log::warn!("[MQTT] Publish request missing broker_id or topic"); - } - } - }); - }); - // Listen for board-state events from hardware monitor let flow_runtime_board = Arc::clone(&flow_runtime); let pending_flow_board = Arc::clone(&pending_flow_setup); @@ -255,12 +200,12 @@ pub fn run() { *board_connected_listener.write().unwrap_or_else(std::sync::PoisonError::into_inner) = true; // Apply pending flow if any, which will also install the callback - if let Some((flow, ctx)) = pending_flow_board.write().unwrap_or_else(std::sync::PoisonError::into_inner).take() { + if let Some((flow, services)) = pending_flow_board.write().unwrap_or_else(std::sync::PoisonError::into_inner).take() { log::info!("Applying pending flow: {} nodes, {} edges", flow.nodes.len(), flow.edges.len()); // Use blocking_lock() for async mutex in sync callback context let mut runtime = flow_runtime_board.blocking_lock(); - if let Err(e) = runtime.update_flow(flow, &ctx) { + if let Err(e) = runtime.update_flow(flow, &services) { log::error!("Failed to apply pending flow: {e}"); } else if let Err(e) = runtime.initialize_hardware() { log::warn!("Failed to initialize hardware after pending flow: {e}"); diff --git a/apps/web/src-tauri/src/llm/commands.rs b/apps/web/src-tauri/src/llm/commands.rs index 10cabf2..1e79d3f 100644 --- a/apps/web/src-tauri/src/llm/commands.rs +++ b/apps/web/src-tauri/src/llm/commands.rs @@ -1,13 +1,21 @@ -//! Tauri commands for LLM provider management +//! Tauri commands for LLM provider management. +//! +//! `llm_sync_providers` is the frontend's primary sync path: it pushes the +//! authoritative list of provider records straight into the shared +//! [`crate::runtime::services::LlmRegistry`] held on [`crate::AppState`]. +//! Once synced, every live `Llm` component sees the new credentials on its +//! next `dispatch("trigger")` — no flow_update re-fire required (ADR-0002). -use super::provider::ProviderConfig; +use crate::runtime::services::{HttpLlmProvider, LlmProvider}; use crate::AppState; +use std::sync::Arc; use tauri::State; /// Provider config payload from the frontend #[derive(Debug, Clone, serde::Deserialize)] pub struct SyncProviderConfig { pub id: String, + #[allow(dead_code)] pub name: String, pub base_url: String, pub api_key: String, @@ -20,17 +28,18 @@ pub async fn llm_sync_providers( state: State<'_, AppState>, providers: Vec, ) -> Result<(), String> { - let configs: Vec = providers + let count = providers.len(); + let entries: Vec<(String, Arc)> = providers .into_iter() - .map(|p| ProviderConfig { - id: p.id, - name: p.name, - base_url: p.base_url, - api_key: p.api_key, + .map(|p| { + let provider: Arc = + Arc::new(HttpLlmProvider::new(p.base_url, p.api_key)); + (p.id, provider) }) .collect(); - state.llm_manager.sync(configs).await; + state.llm_registry.sync(entries).await; + log::info!("[LLM] Synced {count} provider(s) into registry"); Ok(()) } diff --git a/apps/web/src-tauri/src/llm/manager.rs b/apps/web/src-tauri/src/llm/manager.rs deleted file mode 100644 index d9f17c5..0000000 --- a/apps/web/src-tauri/src/llm/manager.rs +++ /dev/null @@ -1,44 +0,0 @@ -//! LLM Provider Manager -//! -//! Stores provider configs keyed by ID. No persistent connections needed — -//! LLM calls are stateless HTTP requests resolved on demand. - -use super::provider::ProviderConfig; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; - -#[derive(Clone)] -pub struct LlmManager { - providers: Arc>>, -} - -impl LlmManager { - #[must_use] - pub fn new() -> Self { - Self { - providers: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Replace all providers with the new set (called on sync from frontend) - pub async fn sync(&self, configs: Vec) { - let mut map = self.providers.write().await; - map.clear(); - for config in configs { - map.insert(config.id.clone(), config); - } - log::info!("[LLM] Synced {} provider(s)", map.len()); - } - - /// Retrieve a provider config by ID - pub async fn get(&self, id: &str) -> Option { - self.providers.read().await.get(id).cloned() - } -} - -impl Default for LlmManager { - fn default() -> Self { - Self::new() - } -} diff --git a/apps/web/src-tauri/src/llm/mod.rs b/apps/web/src-tauri/src/llm/mod.rs index d06eb4a..d8e904d 100644 --- a/apps/web/src-tauri/src/llm/mod.rs +++ b/apps/web/src-tauri/src/llm/mod.rs @@ -1,5 +1,6 @@ -pub mod commands; -pub mod manager; -pub mod provider; +//! Tauri-facing LLM glue: the `llm_sync_providers` / `llm_test_provider` +//! commands. The runtime-side capability trait, registry, and provider impls +//! live in `runtime/services/llm.rs`; this module exists only to wire those +//! into Tauri's invoke handler. See `docs/adr/0002-per-capability-service-traits.md`. -pub use manager::LlmManager; +pub mod commands; diff --git a/apps/web/src-tauri/src/llm/provider.rs b/apps/web/src-tauri/src/llm/provider.rs deleted file mode 100644 index 5cc4b72..0000000 --- a/apps/web/src-tauri/src/llm/provider.rs +++ /dev/null @@ -1,14 +0,0 @@ -//! LLM Provider configuration - -use serde::{Deserialize, Serialize}; - -/// A configured LLM provider (`OpenRouter`, Ollama, `OpenAI`-compatible endpoint, etc.) -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ProviderConfig { - pub id: String, - pub name: String, - /// Base URL — e.g. `http://localhost:11434` or `https://openrouter.ai/api/v1` - pub base_url: String, - /// API key — empty for local Ollama, required for `OpenRouter` / `OpenAI` - pub api_key: String, -} diff --git a/apps/web/src-tauri/src/runtime/board/receipt.rs b/apps/web/src-tauri/src/runtime/board/receipt.rs index 3191d8a..ab5ec5e 100644 --- a/apps/web/src-tauri/src/runtime/board/receipt.rs +++ b/apps/web/src-tauri/src/runtime/board/receipt.rs @@ -43,14 +43,10 @@ impl CommandReceipt { /// Consume into an async future. `.await` from Tauri commands or any /// async context. - pub fn into_future( - self, - ) -> impl std::future::Future> + Send + 'static { - async move { - self.rx - .await - .unwrap_or(Err(HardwareError::Disconnected)) - } + pub async fn into_future(self) -> Result<(), HardwareError> { + self.rx + .await + .unwrap_or(Err(HardwareError::Disconnected)) } /// Drop the receipt without consuming the outcome. Makes fire-and-forget diff --git a/apps/web/src-tauri/src/runtime/builders.rs b/apps/web/src-tauri/src/runtime/builders.rs index 04acf2b..c9485ae 100644 --- a/apps/web/src-tauri/src/runtime/builders.rs +++ b/apps/web/src-tauri/src/runtime/builders.rs @@ -1,67 +1,82 @@ //! Catalog [`ComponentBuilder`] impls for every entry in `node-components.json`. //! -//! Each impl plugs a Component into the registry by declaring its `Config` -//! type and a `build` constructor. Almost all impls reduce to -//! `Ok(Self::new(id, config))`; the `Llm` entry uses the [`RuntimeContext`] to -//! resolve a provider record into its `base_url` / `api_key`. +//! Each impl plugs a Component into the registry by declaring its +//! [`Config`](ComponentBuilder::Config) type, its +//! [`Deps`](ComponentBuilder::Deps) — the typed slice of +//! [`super::services::RuntimeServices`] it needs — and a `build` +//! constructor. Almost all impls reduce to `Ok(Self::new(id, config))` +//! with `type Deps = ();` the three `external/` impls declare what they +//! actually pull from the services bundle: //! -//! Keeping these impls in one module avoids editing 30+ component files when -//! the registry contract evolves. The catalog flag `usesRuntimeContext` is no -//! longer load-bearing: every builder takes a `RuntimeContext` and the ones -//! that don't need it simply ignore the argument. +//! - `Llm` declares `Deps = Arc` so it can resolve providers +//! at dispatch time (ADR-0002 Phase 2). +//! - `Mqtt` / `Figma` declare `Deps = Arc` so they can +//! publish directly through the capability trait (ADR-0002 Phase 3). +//! +//! Keeping these impls in one module avoids editing 30+ component files +//! when the registry contract evolves. + +use std::sync::Arc; use crate::error::RuntimeError; use super::component::ComponentBuilder; -use super::context::RuntimeContext; +use super::services::{LlmRegistry, MqttPublisher}; // --- input -------------------------------------------------------------- impl ComponentBuilder for super::input::Button { type Config = super::input::ButtonConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::input::Hotkey { type Config = super::input::HotkeyConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::input::I2cDevice { type Config = super::input::I2cDeviceConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::input::Motion { type Config = super::input::MotionConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::input::Proximity { type Config = super::input::ProximityConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::input::Sensor { type Config = super::input::SensorConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::input::Switch { type Config = super::input::SwitchConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } @@ -70,63 +85,72 @@ impl ComponentBuilder for super::input::Switch { impl ComponentBuilder for super::output::Led { type Config = super::output::LedConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Matrix { type Config = super::output::MatrixConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Monitor { type Config = super::output::MonitorConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Piezo { type Config = super::output::PiezoConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Pixel { type Config = super::output::PixelConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Relay { type Config = super::output::RelayConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Rgb { type Config = super::output::RgbConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Servo { type Config = super::output::ServoConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::output::Stepper { type Config = super::output::StepperConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } @@ -135,42 +159,48 @@ impl ComponentBuilder for super::output::Stepper { impl ComponentBuilder for super::transformation::Calculate { type Config = super::transformation::CalculateConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::transformation::Compare { type Config = super::transformation::CompareConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::transformation::Function { type Config = super::transformation::FunctionConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::transformation::Gate { type Config = super::transformation::GateConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::transformation::RangeMap { type Config = super::transformation::RangeMapConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::transformation::Smooth { type Config = super::transformation::SmoothConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } @@ -179,21 +209,24 @@ impl ComponentBuilder for super::transformation::Smooth { impl ComponentBuilder for super::control::Counter { type Config = super::control::CounterConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::control::Delay { type Config = super::control::DelayConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::control::Trigger { type Config = super::control::TriggerConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } @@ -202,21 +235,24 @@ impl ComponentBuilder for super::control::Trigger { impl ComponentBuilder for super::generator::Constant { type Config = super::generator::ConstantConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::generator::Interval { type Config = super::generator::IntervalConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } impl ComponentBuilder for super::generator::Oscillator { type Config = super::generator::OscillatorConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { + type Deps = (); + fn build(id: String, config: Self::Config, _deps: Self::Deps) -> Result { Ok(Self::new(id, config)) } } @@ -225,33 +261,28 @@ impl ComponentBuilder for super::generator::Oscillator { impl ComponentBuilder for super::external::Figma { type Config = super::external::FigmaConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { - Ok(Self::new(id, config)) + type Deps = Arc; + fn build(id: String, config: Self::Config, deps: Self::Deps) -> Result { + Ok(Self::new(id, config, deps)) } } impl ComponentBuilder for super::external::Mqtt { type Config = super::external::MqttConfig; - fn build(id: String, config: Self::Config, _ctx: &RuntimeContext) -> Result { - Ok(Self::new(id, config)) + type Deps = Arc; + fn build(id: String, config: Self::Config, deps: Self::Deps) -> Result { + Ok(Self::new(id, config, deps)) } } -/// `Llm` is the one Catalog entry that consults [`RuntimeContext`]: it resolves -/// a frontend-side provider record (by id) into the LLM endpoint's `base_url` -/// and `api_key`. Previously this lived in a `from_data` codegen branch; now -/// it sits beside the other builders. +/// `Llm` is the one Catalog entry that needs the LLM registry: it pulls the +/// shared [`LlmRegistry`] handle and hands it to the component, which then +/// resolves its `provider_id` against the registry at dispatch time +/// (ADR-0002 Phase 2). impl ComponentBuilder for super::external::Llm { type Config = super::external::LlmConfig; - fn build( - id: String, - mut config: Self::Config, - ctx: &RuntimeContext, - ) -> Result { - if let Some(provider) = ctx.provider(&config.provider_id) { - config.base_url.clone_from(&provider.base_url); - config.api_key.clone_from(&provider.api_key); - } - Ok(Self::new(id, config)) + type Deps = Arc; + fn build(id: String, config: Self::Config, deps: Self::Deps) -> Result { + Ok(Self::new(id, config, deps)) } } diff --git a/apps/web/src-tauri/src/runtime/commands.rs b/apps/web/src-tauri/src/runtime/commands.rs index e54b9a2..6f78be9 100644 --- a/apps/web/src-tauri/src/runtime/commands.rs +++ b/apps/web/src-tauri/src/runtime/commands.rs @@ -3,7 +3,7 @@ //! `flow_update` and `component_call` commands use super::base::ComponentValue; -use super::context::{ProviderEntry, RuntimeContext}; +use super::services::{HttpLlmProvider, LlmProvider, RuntimeServices}; use super::wiring::SubscriberWiring; use super::FlowUpdate; use crate::AppState; @@ -74,14 +74,32 @@ pub async fn flow_update( } } - // Build the RuntimeContext that components like Llm consult during construction. - let ctx = RuntimeContext { - providers: providers.unwrap_or_default().into_iter().map(|p| ProviderEntry { - id: p.id, - base_url: p.base_url, - api_key: p.api_key, - }).collect(), - }; + // Sync any provider records straight into the shared LlmRegistry so + // components built by this flow_update — and any subsequent + // `Llm::dispatch("trigger")` — see live credentials. This replaces the + // legacy build-time snapshot into `LlmConfig.base_url/api_key`. See + // ADR-0002 § Decision D2. + if let Some(provider_configs) = providers { + let entries: Vec<(String, Arc)> = provider_configs + .into_iter() + .map(|p| { + let provider: Arc = + Arc::new(HttpLlmProvider::new(p.base_url, p.api_key)); + (p.id, provider) + }) + .collect(); + state.llm_registry.sync(entries).await; + } + + // RuntimeServices for component factories. The registry's factory + // closure projects each impl's typed `Deps` out of this bundle via + // `FromServices`, so components that need nothing pay nothing and + // components that need an `LlmRegistry` / `MqttPublisher` receive + // exactly the shared `Arc` they declared. + let services = RuntimeServices::new( + Arc::clone(&state.llm_registry), + Arc::clone(&state.mqtt_publisher), + ); // Apply the flow first so components are constructed and can describe their wiring. let board_connected = *state.board_connected.read().unwrap_or_else(std::sync::PoisonError::into_inner); @@ -89,7 +107,7 @@ pub async fn flow_update( let component_wirings: Vec<(String, SubscriberWiring)> = { let mut runtime = state.flow_runtime.lock().await; - runtime.update_flow(flow.clone(), &ctx)?; + runtime.update_flow(flow.clone(), &services)?; if board_connected { if let Err(e) = runtime.initialize_hardware() { @@ -100,7 +118,7 @@ pub async fn flow_update( // shared `WiringRegistry` indices. Nothing to reinstall here. } else { log::info!("Board not connected — storing pending flow for hardware init on connect"); - *state.pending_flow.write().unwrap_or_else(std::sync::PoisonError::into_inner) = Some((flow, ctx.clone())); + *state.pending_flow.write().unwrap_or_else(std::sync::PoisonError::into_inner) = Some((flow, services.clone())); } runtime.collect_subscriber_wirings() diff --git a/apps/web/src-tauri/src/runtime/component.rs b/apps/web/src-tauri/src/runtime/component.rs index bd153da..5e5a4b8 100644 --- a/apps/web/src-tauri/src/runtime/component.rs +++ b/apps/web/src-tauri/src/runtime/component.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use tokio::sync::mpsc; use super::board::BoardHandle; -use super::context::RuntimeContext; +use super::services::FromServices; /// Value that a component can hold #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -295,10 +295,21 @@ pub trait HardwareComponent: Component { /// Catalog factory contract used by [`ComponentRegistry`]. /// /// Every entry in the Component Catalog (`node-components.json`) implements -/// this trait. `build` consumes the deserialized `Config` plus the active -/// [`RuntimeContext`] and produces the concrete component. Deserialization -/// errors surface as [`RuntimeError::ConfigDeserialize`] inside the registry — -/// no silent `Default` fallback. +/// this trait. `build` consumes the deserialized [`Config`](Self::Config) +/// plus the typed [`Deps`](Self::Deps) the component needs (projected from +/// the active [`super::services::RuntimeServices`] by the registry) and +/// produces the concrete component. Deserialization errors surface as +/// [`RuntimeError::ConfigDeserialize`] inside the registry — no silent +/// `Default` fallback. +/// +/// `Deps` is the per-impl record of which external services the component +/// touches. Components that need nothing declare `type Deps = ();` — the +/// `()` impl of [`FromServices`] returns unit. The `Llm` component +/// declares `type Deps = Arc`; `Mqtt` / `Figma` declare +/// `type Deps = Arc`. Adding a new external kind = add +/// a field to `RuntimeServices` and a `FromServices` impl for the new +/// `Arc<..>`; the 29 unaffected builders keep their `type Deps = ()`. +/// See `CONTEXT.md` § Runtime Services and § Capability Trait. /// /// Hardware components are bound by `register_hardware::(name)` in the /// registry, which adds a `HardwareComponent` bound so a catalog @@ -308,13 +319,16 @@ pub trait ComponentBuilder: Component + Sized + 'static { /// The deserialized configuration shape declared by this component. type Config: serde::de::DeserializeOwned; - /// Construct the component from a config plus the active runtime context. - /// Most impls ignore `ctx`; the `Llm` node uses it to resolve a provider - /// referenced by `provider_id`. + /// The typed services this component needs from + /// [`super::services::RuntimeServices`]. Use `()` for components that + /// need nothing. + type Deps: FromServices; + + /// Construct the component from a config plus the projected `Deps`. fn build( id: String, config: Self::Config, - ctx: &RuntimeContext, + deps: Self::Deps, ) -> Result; } diff --git a/apps/web/src-tauri/src/runtime/context.rs b/apps/web/src-tauri/src/runtime/context.rs deleted file mode 100644 index 7f725ef..0000000 --- a/apps/web/src-tauri/src/runtime/context.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! Read-only bundle passed to component factories at construction time. -//! -//! Lets a component pluck external resources it needs (e.g. `Llm` reads its -//! provider's `base_url`/`api_key`) without mutating `node.data` upstream. -//! See `CONTEXT.md` § Runtime Context. - -#[derive(Debug, Clone)] -pub struct ProviderEntry { - pub id: String, - pub base_url: String, - pub api_key: String, -} - -#[derive(Debug, Default, Clone)] -pub struct RuntimeContext { - pub providers: Vec, -} - -impl RuntimeContext { - #[must_use] - pub fn empty() -> Self { Self::default() } - - #[must_use] - pub fn provider(&self, id: &str) -> Option<&ProviderEntry> { - self.providers.iter().find(|p| p.id == id) - } -} diff --git a/apps/web/src-tauri/src/runtime/external/figma.rs b/apps/web/src-tauri/src/runtime/external/figma.rs index 1207759..d2c5f51 100644 --- a/apps/web/src-tauri/src/runtime/external/figma.rs +++ b/apps/web/src-tauri/src/runtime/external/figma.rs @@ -7,13 +7,17 @@ //! - Subscribes to `microflow/${uniqueId}/figma/variable/${shortVarId}` and //! `microflow/${uniqueId}/app/variable/${shortVarId}` for incoming values. //! - Emits a "change" event downstream when a value arrives. -//! - Handles `dispatch` (set / toggle / true / false / increment / decrement / reset / red / -//! green / blue / opacity) by emitting a `_mqtt_publish` event that lib.rs intercepts. +//! - Handles `dispatch` (set / toggle / true / false / increment / decrement / +//! reset / red / green / blue / opacity) by computing a payload and calling +//! the held `Arc` directly. Replaces the legacy +//! `_mqtt_publish` event-out pattern (ADR-0002 Phase 3). use crate::runtime::base::{Component, ComponentBase, ComponentValue}; +use crate::runtime::services::MqttPublisher; use crate::runtime::wiring::SubscriberWiring; use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use std::sync::Arc; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -53,14 +57,21 @@ impl Default for FigmaConfig { pub struct Figma { base: ComponentBase, config: FigmaConfig, + /// Shared MQTT publish handle. Cloned into each spawned publish task. + publisher: Arc, + /// Tokio handle captured at construction so `dispatch` (sync) can spawn + /// the async publish call. + rt_handle: Option, } impl Figma { #[must_use] - pub fn new(id: String, config: FigmaConfig) -> Self { + pub fn new(id: String, config: FigmaConfig, publisher: Arc) -> Self { Self { base: ComponentBase::new(id, ComponentValue::String(String::new())), config, + publisher, + rt_handle: tokio::runtime::Handle::try_current().ok(), } } @@ -190,18 +201,37 @@ impl Figma { } } - /// Emit a `_mqtt_publish` event intercepted by lib.rs to actually send the MQTT message. - fn publish(&mut self, payload: String) { - let info = serde_json::json!({ - "brokerId": self.config.broker_id, - "topic": self.set_topic(), - "payload": payload, - "retain": false, + /// Push a JSON payload back into Figma over the configured broker. + /// + /// Spawns the async [`MqttPublisher::publish`] call onto the captured + /// Tokio handle and logs any wire failure — the component's `dispatch` + /// path stays synchronous and never blocks on the broker. + fn publish(&self, payload: String) { + let publisher = Arc::clone(&self.publisher); + let broker_id = self.config.broker_id.clone(); + let topic = self.set_topic(); + let component_id = Arc::clone(&self.base.id); + + let Some(handle) = &self.rt_handle else { + log::error!( + "[Figma] {component_id} no Tokio runtime available, cannot spawn publish" + ); + return; + }; + + handle.spawn(async move { + log::debug!( + "[Figma] {component_id} publish → broker={broker_id} topic={topic}" + ); + if let Err(e) = publisher + .publish(&broker_id, &topic, payload.as_bytes(), false) + .await + { + log::error!( + "[Figma] {component_id} publish failed (broker={broker_id} topic={topic}): {e}" + ); + } }); - self.base.emit_with_value( - "_mqtt_publish", - Cow::Owned(ComponentValue::String(info.to_string())), - ); } /// Called by commands.rs when a message arrives on any of this component's subscribed topics. @@ -316,3 +346,100 @@ impl Component for Figma { self.receive_message(topic, payload); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::runtime::services::RecordingMqttPublisher; + use std::time::Duration; + + async fn wait_for_publishes( + recorder: &RecordingMqttPublisher, + min: usize, + timeout: Duration, + ) -> Vec { + let deadline = tokio::time::Instant::now() + timeout; + loop { + let snap = recorder.recorded(); + if snap.len() >= min { + return snap; + } + if tokio::time::Instant::now() >= deadline { + return snap; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + fn config() -> FigmaConfig { + FigmaConfig { + broker_id: "broker-1".into(), + unique_id: "uid-1".into(), + variable_id: "VariableID:1:2".into(), + resolved_type: "BOOLEAN".into(), + ..FigmaConfig::default() + } + } + + #[tokio::test] + async fn dispatch_true_publishes_true_payload_to_set_topic() { + let recorder = Arc::new(RecordingMqttPublisher::new()); + let mut figma = Figma::new( + "node-1".into(), + config(), + recorder.clone() as Arc, + ); + + figma + .dispatch("true", ComponentValue::Bool(true)) + .expect("dispatch ok"); + + let recorded = wait_for_publishes(&recorder, 1, Duration::from_secs(1)).await; + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].broker_id, "broker-1"); + assert_eq!(recorded[0].topic, "microflow/uid-1/app/variable/1-2/set"); + assert_eq!(recorded[0].payload, b"true"); + assert!(!recorded[0].retain); + } + + #[tokio::test] + async fn dispatch_increment_publishes_summed_payload() { + let mut c = config(); + c.resolved_type = "FLOAT".into(); + let recorder = Arc::new(RecordingMqttPublisher::new()); + let mut figma = Figma::new( + "node-1".into(), + c, + recorder.clone() as Arc, + ); + // Seed current value to 5. + figma + .dispatch("set", ComponentValue::Number(5.0)) + .expect("set ok"); + // Drain the seed publish. + wait_for_publishes(&recorder, 1, Duration::from_secs(1)).await; + + figma + .dispatch("increment", ComponentValue::Number(3.0)) + .expect("increment ok"); + let recorded = wait_for_publishes(&recorder, 2, Duration::from_secs(1)).await; + assert_eq!(recorded.len(), 2); + assert_eq!(recorded[1].payload, b"8"); + } + + #[tokio::test] + async fn dispatch_unknown_method_does_not_publish() { + let recorder = Arc::new(RecordingMqttPublisher::new()); + let mut figma = Figma::new( + "node-1".into(), + config(), + recorder.clone() as Arc, + ); + let err = figma + .dispatch("definitely-not-a-method", ComponentValue::Bool(true)) + .expect_err("should fail"); + assert!(err.to_string().contains("Unknown method")); + tokio::time::sleep(Duration::from_millis(20)).await; + assert!(recorder.recorded().is_empty()); + } +} diff --git a/apps/web/src-tauri/src/runtime/external/llm.rs b/apps/web/src-tauri/src/runtime/external/llm.rs index ae74d70..c6c4f68 100644 --- a/apps/web/src-tauri/src/runtime/external/llm.rs +++ b/apps/web/src-tauri/src/runtime/external/llm.rs @@ -1,7 +1,7 @@ //! LLM Component - External //! -//! Calls an `OpenAI`-compatible LLM API (`OpenRouter`, Ollama, etc.) -//! and emits the text response downstream. +//! Resolves an [`LlmProvider`](crate::runtime::services::LlmProvider) by id +//! against the shared [`LlmRegistry`] and emits the text response downstream. //! //! # Handles //! @@ -10,18 +10,32 @@ //! - `thinking` (output, state): true while generating, false when idle //! - `done` (output, event): fires when generation completes successfully //! - `value` (output, value): emits the generated text response +//! - `error` (output, event): fires with an error message when generation fails +//! +//! Provider lookup happens *per dispatch* against [`LlmRegistry`], not at +//! `Llm::build` time — credential rotation via [`LlmRegistry::sync`] takes +//! effect on the next `trigger` without rebuilding the component. See +//! `docs/adr/0002-per-capability-service-traits.md`. use crate::runtime::base::{Component, ComponentBase, ComponentEvent, ComponentValue}; +use crate::runtime::services::{LlmError, LlmRegistry, LlmRequest}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; +/// Static config — the structural fields that describe *what* this node +/// generates. Credentials (`base_url`/`api_key`) live on the registry's +/// provider impls now, not on the node config. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct LlmConfig { + /// Human-facing provider kind label (`ollama`, `openrouter`, …). Not + /// load-bearing for the runtime; kept for parity with the frontend + /// schema and surfaced in logs. #[serde(default = "default_provider")] pub provider: String, - /// Frontend provider record id; resolved against `RuntimeContext` at construction. + /// Frontend provider record id; resolved against [`LlmRegistry`] at + /// dispatch time. #[serde(default)] pub provider_id: String, #[serde(default)] @@ -30,14 +44,9 @@ pub struct LlmConfig { pub prompt: String, #[serde(default)] pub system: String, - #[serde(default = "default_base_url")] - pub base_url: String, - #[serde(default)] - pub api_key: String, } fn default_provider() -> String { "ollama".to_string() } -fn default_base_url() -> String { "http://localhost:11434".to_string() } impl Default for LlmConfig { fn default() -> Self { @@ -47,8 +56,6 @@ impl Default for LlmConfig { model: String::new(), prompt: String::new(), system: String::new(), - base_url: default_base_url(), - api_key: String::new(), } } } @@ -62,17 +69,21 @@ pub struct Llm { rt_handle: Option, /// Abort handle for the currently running generation task running_task: Option, + /// Shared LLM provider registry. Cloned into each spawned task so the + /// lookup happens at dispatch time, not at construction time. + llm_registry: Arc, } impl Llm { #[must_use] - pub fn new(id: String, config: LlmConfig) -> Self { + pub fn new(id: String, config: LlmConfig, llm_registry: Arc) -> Self { Self { base: ComponentBase::new(id, ComponentValue::String(String::new())), config, template_vars: HashMap::new(), rt_handle: tokio::runtime::Handle::try_current().ok(), running_task: None, + llm_registry, } } @@ -103,9 +114,19 @@ impl Llm { abort.abort(); } - let config = self.config.clone(); let component_id = Arc::clone(&self.base.id); let event_sender = self.base.event_sender.clone(); + let registry = Arc::clone(&self.llm_registry); + let provider_id = self.config.provider_id.clone(); + let request = LlmRequest { + model: self.config.model.clone(), + system: if self.config.system.is_empty() { + None + } else { + Some(self.config.system.clone()) + }, + prompt, + }; let Some(handle) = &self.rt_handle else { log::error!("[Llm] {component_id} no Tokio runtime available, cannot spawn task"); @@ -125,54 +146,41 @@ impl Llm { } }; - let base = config.base_url.trim_end_matches('/'); - let url = format!("{base}/v1/chat/completions"); - - let mut messages = Vec::new(); - if !config.system.is_empty() { - messages.push(serde_json::json!({ "role": "system", "content": config.system })); - } - messages.push(serde_json::json!({ "role": "user", "content": prompt })); - - let body = serde_json::json!({ - "model": config.model, - "messages": messages, - "stream": false, - }); + let Some(provider) = registry.get(&provider_id).await else { + log::error!( + "[Llm] {component_id} provider '{provider_id}' not in registry" + ); + send("thinking", ComponentValue::Bool(false)); + send( + "error", + ComponentValue::String(format!( + "LLM provider '{provider_id}' not configured" + )), + ); + return; + }; - let client = reqwest::Client::new(); - let mut req = client.post(&url).json(&body); - if !config.api_key.is_empty() { - req = req.bearer_auth(&config.api_key); - } + log::info!( + "[Llm] {component_id} → provider={provider_id} model={}", + request.model + ); - log::info!("[Llm] {component_id} → POST {url}"); - - match req.send().await { - Ok(resp) => match resp.json::().await { - Ok(json) => { - let response = json - .get("choices") - .and_then(|c| c.get(0)) - .and_then(|c| c.get("message")) - .and_then(|m| m.get("content")) - .and_then(|s| s.as_str()) - .unwrap_or("") - .to_string(); - - log::info!("[Llm] {} response: {} chars", component_id, response.len()); - send("thinking", ComponentValue::Bool(false)); - send("value", ComponentValue::String(response)); - send("done", ComponentValue::Bool(true)); - } - Err(e) => { - log::error!("[Llm] {component_id} failed to parse response: {e}"); - send("thinking", ComponentValue::Bool(false)); - send("error", ComponentValue::String(e.to_string())); - } - }, + match provider.generate(request).await { + Ok(response) => { + log::info!( + "[Llm] {component_id} response: {} chars", + response.text.len() + ); + send("thinking", ComponentValue::Bool(false)); + send("value", ComponentValue::String(response.text)); + send("done", ComponentValue::Bool(true)); + } + Err(LlmError::Cancelled) => { + log::info!("[Llm] {component_id} cancelled"); + // No error event; the abort path disowned this task. + } Err(e) => { - log::error!("[Llm] {component_id} request to {url} failed: {e}"); + log::error!("[Llm] {component_id} generate failed: {e}"); send("thinking", ComponentValue::Bool(false)); send("error", ComponentValue::String(e.to_string())); } @@ -217,3 +225,162 @@ impl Component for Llm { log::info!("[Llm] {} destroyed", self.base.id); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::runtime::services::{LlmProvider, RecordingLlmProvider}; + use std::time::Duration; + use tokio::sync::mpsc; + + /// Drain events until either the `done` handle fires or the deadline + /// elapses. Returns the collected events in arrival order. + async fn drain_until_done( + rx: &mut mpsc::UnboundedReceiver, + timeout: Duration, + ) -> Vec { + let mut events = Vec::new(); + let deadline = tokio::time::Instant::now() + timeout; + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(20), rx.recv()).await { + Ok(Some(event)) => { + let handle = event.source_handle.clone(); + events.push(event); + if handle.as_ref() == "done" || handle.as_ref() == "error" { + return events; + } + } + Ok(None) => return events, + Err(_) => continue, + } + } + events + } + + #[tokio::test] + async fn dispatches_to_registry_provider_and_emits_value() { + let registry = Arc::new(LlmRegistry::new()); + let recorder = Arc::new(RecordingLlmProvider::new()); + recorder.script_ok("hi back"); + registry + .insert("test-provider".into(), recorder.clone() as Arc) + .await; + + let (tx, mut rx) = mpsc::unbounded_channel(); + let config = LlmConfig { + provider_id: "test-provider".into(), + model: "test-model".into(), + prompt: "hello".into(), + ..LlmConfig::default() + }; + + let mut llm = Llm::new("node-1".into(), config, Arc::clone(®istry)); + llm.set_event_sender(tx); + + llm.dispatch("trigger", ComponentValue::Bool(true)) + .expect("trigger ok"); + + let events = drain_until_done(&mut rx, Duration::from_secs(2)).await; + + let value = events.iter().find_map(|e| { + if e.source_handle.as_ref() == "value" { + if let ComponentValue::String(s) = &e.value { + return Some(s.clone()); + } + } + None + }); + assert_eq!(value.as_deref(), Some("hi back")); + assert!(events + .iter() + .any(|e| e.source_handle.as_ref() == "done")); + + let recorded = recorder.recorded(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].model, "test-model"); + assert_eq!(recorded[0].prompt, "hello"); + assert!(recorded[0].system.is_none()); + } + + #[tokio::test] + async fn emits_error_when_provider_not_in_registry() { + let registry = Arc::new(LlmRegistry::new()); // empty + let (tx, mut rx) = mpsc::unbounded_channel(); + let config = LlmConfig { + provider_id: "missing".into(), + ..LlmConfig::default() + }; + let mut llm = Llm::new("node-1".into(), config, Arc::clone(®istry)); + llm.set_event_sender(tx); + + llm.dispatch("trigger", ComponentValue::Bool(true)).unwrap(); + + let events = drain_until_done(&mut rx, Duration::from_secs(2)).await; + let err = events.iter().find(|e| e.source_handle.as_ref() == "error"); + assert!(err.is_some(), "expected error event, got {events:?}"); + if let Some(e) = err { + if let ComponentValue::String(msg) = &e.value { + assert!(msg.contains("missing")); + } else { + panic!("error event carried non-string value"); + } + } + } + + #[tokio::test] + async fn forwards_system_prompt_when_set() { + let registry = Arc::new(LlmRegistry::new()); + let recorder = Arc::new(RecordingLlmProvider::new()); + recorder.script_ok("ok"); + registry + .insert("p".into(), recorder.clone() as Arc) + .await; + + let (tx, mut rx) = mpsc::unbounded_channel(); + let config = LlmConfig { + provider_id: "p".into(), + system: "you are terse".into(), + prompt: "hi".into(), + ..LlmConfig::default() + }; + let mut llm = Llm::new("node-1".into(), config, Arc::clone(®istry)); + llm.set_event_sender(tx); + + llm.dispatch("trigger", ComponentValue::Bool(true)).unwrap(); + drain_until_done(&mut rx, Duration::from_secs(2)).await; + + let recorded = recorder.recorded(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].system.as_deref(), Some("you are terse")); + } + + #[tokio::test] + async fn substitutes_template_vars_into_prompt() { + let registry = Arc::new(LlmRegistry::new()); + let recorder = Arc::new(RecordingLlmProvider::new()); + recorder.script_ok("ok"); + registry + .insert("p".into(), recorder.clone() as Arc) + .await; + + let (tx, mut rx) = mpsc::unbounded_channel(); + let config = LlmConfig { + provider_id: "p".into(), + prompt: "hello {{name}}".into(), + ..LlmConfig::default() + }; + let mut llm = Llm::new("node-1".into(), config, Arc::clone(®istry)); + llm.set_event_sender(tx); + + // Set the template var via the {{var}} input port. + llm.dispatch("name", ComponentValue::String("world".into())) + .unwrap(); + // Trigger the generation. + llm.dispatch("trigger", ComponentValue::Bool(true)).unwrap(); + drain_until_done(&mut rx, Duration::from_secs(2)).await; + + let recorded = recorder.recorded(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].prompt, "hello world"); + } +} diff --git a/apps/web/src-tauri/src/runtime/external/mqtt.rs b/apps/web/src-tauri/src/runtime/external/mqtt.rs index 10ab8de..3780650 100644 --- a/apps/web/src-tauri/src/runtime/external/mqtt.rs +++ b/apps/web/src-tauri/src/runtime/external/mqtt.rs @@ -1,19 +1,24 @@ //! MQTT Component - External //! -//! Publishes and subscribes to MQTT topics for `IoT` connectivity. -//! +//! Publishes to MQTT brokers and surfaces incoming messages on subscribe nodes. +//! //! # Architecture -//! -//! The MQTT component works with the `MqttManager` through events: -//! - Subscribe nodes: `MqttManager` routes incoming messages to the component via `receive_message` -//! - Publish nodes: Component emits values that get published via `MqttManager` //! -//! The wiring happens in lib.rs where flow events are processed. +//! - **Subscribe nodes** — describe their interest via +//! [`Component::subscriber_wiring`] and receive messages via +//! [`Component::receive_raw_message`] / `receive_message`, routed by the +//! runtime against the broker manager. Unchanged in ADR-0002 Phase 3. +//! - **Publish nodes** — on `dispatch("trigger")`, the component calls +//! [`MqttPublisher::publish`] directly through its held `Arc` +//! handle. Replaces the legacy `_mqtt_publish` event-out pattern, which +//! walked through `lib.rs`'s event-forwarding thread and a dedicated +//! publish handler thread. See `docs/adr/0002-per-capability-service-traits.md`. use crate::runtime::base::{Component, ComponentBase, ComponentValue}; +use crate::runtime::services::MqttPublisher; use crate::runtime::wiring::SubscriberWiring; use serde::{Deserialize, Serialize}; -use std::borrow::Cow; +use std::sync::Arc; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -49,64 +54,72 @@ impl Default for MqttConfig { pub struct Mqtt { base: ComponentBase, config: MqttConfig, + /// Shared MQTT publish handle. Used only by `direction = "publish"` + /// nodes; subscribe nodes ignore it. + publisher: Arc, + /// Tokio handle captured at construction so `dispatch` (sync) can + /// spawn the async `publish` call. + rt_handle: Option, } impl Mqtt { - #[must_use] - pub fn new(id: String, config: MqttConfig) -> Self { + #[must_use] + pub fn new(id: String, config: MqttConfig, publisher: Arc) -> Self { Self { base: ComponentBase::new(id, ComponentValue::String(String::new())), config, + publisher, + rt_handle: tokio::runtime::Handle::try_current().ok(), } } /// Get the full config #[allow(dead_code)] - #[must_use] + #[must_use] pub fn config(&self) -> &MqttConfig { &self.config } /// Get the broker ID this component is configured for #[allow(dead_code)] - #[must_use] + #[must_use] pub fn broker_id(&self) -> &str { &self.config.broker_id } /// Get the topic this component subscribes to or publishes on #[allow(dead_code)] - #[must_use] + #[must_use] pub fn topic(&self) -> &str { &self.config.topic } /// Check if this is a subscribe node #[allow(dead_code)] - #[must_use] + #[must_use] pub fn is_subscribe(&self) -> bool { self.config.direction == "subscribe" } /// Check if this is a publish node - #[must_use] + #[must_use] pub fn is_publish(&self) -> bool { self.config.direction == "publish" } /// Get retain flag for publish #[allow(dead_code)] - #[must_use] + #[must_use] pub fn retain(&self) -> bool { self.config.retain } - /// Called when a message is received from the broker (for subscribe nodes) - /// This is called by the `MqttManager` integration in lib.rs - #[allow(dead_code)] + /// Called when a message is received from the broker (for subscribe nodes). + /// Invoked by `executor::route_mqtt_message` once the broker delivers a + /// payload on this component's subscribed topic. pub fn receive_message(&mut self, payload: &[u8]) { let value = String::from_utf8_lossy(payload).to_string(); - + // Try to parse as number first, then bool, then keep as string let component_value = if let Ok(num) = value.parse::() { ComponentValue::Number(num) @@ -119,7 +132,19 @@ impl Mqtt { }; self.base.value = component_value.clone(); - self.base.emit_with_value("message", Cow::Owned(component_value)); + self.base + .emit_with_value("message", std::borrow::Cow::Owned(component_value)); + } + + /// Render a [`ComponentValue`] into the byte payload an MQTT broker + /// forwards. Centralised so `dispatch` is testable on its own. + fn encode_payload(value: &ComponentValue) -> Vec { + match value { + ComponentValue::String(s) => s.clone().into_bytes(), + ComponentValue::Number(n) => n.to_string().into_bytes(), + ComponentValue::Bool(b) => b.to_string().into_bytes(), + _ => Vec::new(), + } } } @@ -147,26 +172,41 @@ impl Component for Mqtt { match method { "trigger" => { if !self.is_publish() { - return Err(crate::error::RuntimeError::ComponentError("This MQTT node is configured for subscribe, not publish".to_string())); + return Err(crate::error::RuntimeError::ComponentError( + "This MQTT node is configured for subscribe, not publish".to_string(), + )); } self.base.value = args.clone(); - let payload = match &args { - ComponentValue::String(s) => s.clone(), - ComponentValue::Number(n) => n.to_string(), - ComponentValue::Bool(b) => b.to_string(), - _ => String::new(), + let payload = Self::encode_payload(&args); + let publisher = Arc::clone(&self.publisher); + let broker_id = self.config.broker_id.clone(); + let topic = self.config.topic.clone(); + let retain = self.config.retain; + let component_id = Arc::clone(&self.base.id); + + let Some(handle) = &self.rt_handle else { + log::error!( + "[MQTT] {component_id} no Tokio runtime available, cannot spawn publish" + ); + return Ok(()); }; - let publish_info = serde_json::json!({ - "brokerId": self.config.broker_id, - "topic": self.config.topic, - "payload": payload, - "retain": self.config.retain, + handle.spawn(async move { + log::info!( + "[MQTT] {component_id} publish → broker={broker_id} topic={topic} retain={retain}" + ); + if let Err(e) = publisher + .publish(&broker_id, &topic, &payload, retain) + .await + { + log::error!( + "[MQTT] {component_id} publish failed (broker={broker_id} topic={topic}): {e}" + ); + } }); - self.base.emit_with_value("_mqtt_publish", Cow::Owned(ComponentValue::String(publish_info.to_string()))); Ok(()) } _ => Err(crate::error::RuntimeError::ComponentError(format!("Unknown method: {method}"))), @@ -177,3 +217,93 @@ impl Component for Mqtt { log::info!("[MQTT] Component {} destroyed", self.base.id); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::runtime::services::RecordingMqttPublisher; + use std::time::Duration; + + /// Spin until the recorder sees a publish call or the deadline passes. + async fn wait_for_publishes( + recorder: &RecordingMqttPublisher, + min: usize, + timeout: Duration, + ) -> Vec { + let deadline = tokio::time::Instant::now() + timeout; + loop { + let snap = recorder.recorded(); + if snap.len() >= min { + return snap; + } + if tokio::time::Instant::now() >= deadline { + return snap; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + #[tokio::test] + async fn publish_node_dispatches_to_publisher() { + let recorder = Arc::new(RecordingMqttPublisher::new()); + let config = MqttConfig { + direction: "publish".into(), + broker_id: "broker-1".into(), + topic: "sensors/light".into(), + retain: true, + ..MqttConfig::default() + }; + + let mut node = Mqtt::new( + "node-1".into(), + config, + recorder.clone() as Arc, + ); + + node.dispatch("trigger", ComponentValue::Number(42.0)) + .expect("dispatch ok"); + + let recorded = wait_for_publishes(&recorder, 1, Duration::from_secs(1)).await; + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].broker_id, "broker-1"); + assert_eq!(recorded[0].topic, "sensors/light"); + assert_eq!(recorded[0].payload, b"42"); + assert!(recorded[0].retain); + } + + #[tokio::test] + async fn subscribe_node_rejects_trigger_without_publishing() { + let recorder = Arc::new(RecordingMqttPublisher::new()); + let config = MqttConfig { + direction: "subscribe".into(), + broker_id: "broker-1".into(), + topic: "sensors/light".into(), + ..MqttConfig::default() + }; + + let mut node = Mqtt::new( + "node-1".into(), + config, + recorder.clone() as Arc, + ); + + let err = node + .dispatch("trigger", ComponentValue::Bool(true)) + .expect_err("subscribe should refuse trigger"); + assert!(err.to_string().contains("subscribe")); + // Give a tick to make sure no rogue spawn snuck a publish through. + tokio::time::sleep(Duration::from_millis(20)).await; + assert!(recorder.recorded().is_empty()); + } + + #[test] + fn encode_payload_covers_known_value_kinds() { + assert_eq!(Mqtt::encode_payload(&ComponentValue::Bool(true)), b"true"); + assert_eq!(Mqtt::encode_payload(&ComponentValue::Number(1.5)), b"1.5"); + assert_eq!( + Mqtt::encode_payload(&ComponentValue::String("hi".into())), + b"hi" + ); + assert!(Mqtt::encode_payload(&ComponentValue::Array(vec![])).is_empty()); + } +} diff --git a/apps/web/src-tauri/src/runtime/mod.rs b/apps/web/src-tauri/src/runtime/mod.rs index 1831a58..ea2aa91 100644 --- a/apps/web/src-tauri/src/runtime/mod.rs +++ b/apps/web/src-tauri/src/runtime/mod.rs @@ -34,7 +34,6 @@ pub mod board; mod builders; pub mod commands; pub mod component; -pub mod context; pub mod control; mod executor; pub mod external; @@ -45,13 +44,14 @@ pub mod pin_mode; mod registry; mod router; pub mod serde_utils; +pub mod services; pub mod transformation; mod types; pub mod wiring; mod wiring_registry; pub use base::{BoardConnection, BoardHandle, Component, ComponentEvent, ComponentValue, SerialPortWrapper}; -pub use context::{ProviderEntry, RuntimeContext}; +pub use services::RuntimeServices; pub use wiring::{ListenerWiring, SubscriberWiring}; pub use executor::FlowExecutor; // FlowEdge is re-exported for use in integration tests and external consumers @@ -250,7 +250,7 @@ impl FlowRuntime { /// /// Uses diff-based updates: unchanged nodes are kept, only modified/added/removed /// nodes are touched. This turns an O(n) operation into O(delta) for small edits. - pub fn update_flow(&mut self, update: FlowUpdate, ctx: &crate::runtime::context::RuntimeContext) -> Result<(), RuntimeError> { + pub fn update_flow(&mut self, update: FlowUpdate, services: &crate::runtime::services::RuntimeServices) -> Result<(), RuntimeError> { // Increment sequence FIRST - this ensures any events generated during // the update will have the new sequence number, and stale events from // the previous flow version will be filtered out by the executor @@ -345,7 +345,7 @@ impl FlowRuntime { &node.id, instance, &node.data, - ctx, + services, self.event_tx.clone(), board_handle.clone(), ) { diff --git a/apps/web/src-tauri/src/runtime/registry.rs b/apps/web/src-tauri/src/runtime/registry.rs index 2022d6e..6419cd8 100644 --- a/apps/web/src-tauri/src/runtime/registry.rs +++ b/apps/web/src-tauri/src/runtime/registry.rs @@ -19,21 +19,22 @@ use super::base::{BoardHandle, Component, ComponentEvent}; use super::component::{ComponentBuilder, HardwareComponent}; -use super::context::RuntimeContext; +use super::services::{FromServices, RuntimeServices}; use crate::error::RuntimeError; use serde::Deserialize; use std::sync::Arc; use tokio::sync::mpsc; -/// Boxed builder closure: deserializes a config off the catalog node `data` -/// and produces a constructed `Box`. Errors out with +/// Boxed builder closure: deserializes a config off the catalog node `data`, +/// projects the impl's typed `Deps` slice out of [`RuntimeServices`], and +/// produces a constructed `Box`. Errors out with /// [`RuntimeError::ConfigDeserialize`] if the JSON doesn't match the /// builder's `Config` type — no silent `Default` fallback. type Factory = Box< dyn Fn( String, &serde_json::Value, - &RuntimeContext, + &RuntimeServices, ) -> Result, RuntimeError> + Send + Sync, @@ -59,7 +60,7 @@ impl ComponentRegistry { id: &str, instance: &str, data: &serde_json::Value, - ctx: &RuntimeContext, + services: &RuntimeServices, event_sender: mpsc::UnboundedSender, board_handle: Arc, ) -> Result, RuntimeError> { @@ -67,7 +68,7 @@ impl ComponentRegistry { RuntimeError::ComponentNotFound(format!("Unknown component type: {instance}")) })?; - let mut component = factory(id.to_string(), data, ctx)?; + let mut component = factory(id.to_string(), data, services)?; component.set_event_sender(event_sender); // Initialize only when the component is hardware-bound and the board is @@ -135,15 +136,21 @@ fn assert_ports_match(name: &'static str, catalog_ports: &'static /// Build a [`Factory`] closure that captures the catalog name for use in /// [`RuntimeError::ConfigDeserialize`] when deserialization fails. +/// +/// The closure projects each impl's typed [`ComponentBuilder::Deps`] out of +/// the active [`RuntimeServices`] via [`FromServices`], so the builder +/// receives exactly the slice it declared (an `Arc` for +/// `Llm`, `()` for the 29 components that need nothing, etc.). fn make_factory(name: &'static str) -> Factory { - Box::new(move |id, data, ctx| { + Box::new(move |id, data, services| { let config = B::Config::deserialize(data).map_err(|e| { RuntimeError::ConfigDeserialize { component: name.to_string(), source: e, } })?; - let component = B::build(id, config, ctx)?; + let deps = ::from_services(services); + let component = B::build(id, config, deps)?; Ok(Box::new(component) as Box) }) } diff --git a/apps/web/src-tauri/src/runtime/services/llm.rs b/apps/web/src-tauri/src/runtime/services/llm.rs new file mode 100644 index 0000000..62efe8e --- /dev/null +++ b/apps/web/src-tauri/src/runtime/services/llm.rs @@ -0,0 +1,475 @@ +//! LLM **Capability Trait**, request/response value types, and the +//! `LlmRegistry` Service Registry. +//! +//! The trait is the seam: components depend on `Arc` (or +//! on `Arc` and resolve a provider by id at dispatch time), +//! never on `reqwest::Client` or a concrete endpoint. +//! +//! Two adapters ship with this module: +//! +//! - [`HttpLlmProvider`] — production. POSTs an OpenAI-compatible +//! `/v1/chat/completions` request, parses `choices[0].message.content`. +//! - [`RecordingLlmProvider`] — test. Records every inbound [`LlmRequest`] +//! and returns scripted [`LlmResponse`]s (or [`LlmError`]s). Mirrors the +//! `TestIoLoop` adapter for `BoardHandle` (CONTEXT.md § TestIoLoop): two +//! adapters is what makes the trait a real seam, not a hypothetical one. +//! +//! See `docs/adr/0002-per-capability-service-traits.md`. + +use async_trait::async_trait; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use thiserror::Error; +use tokio::sync::RwLock; + +/// A single LLM completion request. The OpenAI-compatible shape — `system` +/// is optional because some providers (e.g. local Ollama models without a +/// system slot) ignore it; callers that don't have a system prompt pass +/// `None` rather than an empty string so the request shape matches the +/// provider's expectation. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LlmRequest { + /// Model id understood by the provider (`gpt-4o-mini`, + /// `llama3.1:8b`, `claude-opus-4-7`, …). Opaque to the trait. + pub model: String, + /// Optional system prompt prepended to the message list. + pub system: Option, + /// User-role prompt content. Template substitution already applied by + /// the caller (the `Llm` component owns `{{var}}` resolution; the + /// provider sees the rendered string). + pub prompt: String, +} + +/// A single LLM completion response. +/// +/// Only the assistant text is exposed for now. Token counts, finish reasons, +/// and stop sequences are not part of the trait surface yet — they accrete +/// when the first consumer needs them. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LlmResponse { + pub text: String, +} + +/// Failure modes a `LlmProvider::generate` can surface. +/// +/// Distinct from [`crate::error::RuntimeError`] so a component can match on +/// the specific failure (cancelled vs. provider missing vs. wire failure) +/// without growing a `RuntimeError` variant per external kind. +#[derive(Error, Debug)] +pub enum LlmError { + /// Caller asked the [`LlmRegistry`] for a provider id that isn't present. + #[error("LLM provider '{0}' not found")] + ProviderNotFound(String), + + /// HTTP transport failure (DNS, TLS handshake, connection refused, + /// timeout, non-2xx status). Inner message is the upstream reason. + #[error("LLM request failed: {0}")] + Request(String), + + /// Response body didn't deserialize, or didn't contain the expected + /// `choices[0].message.content` path. + #[error("LLM response parse failed: {0}")] + Parse(String), + + /// The `RecordingLlmProvider` script was exhausted, or the production + /// task was aborted before the wire returned. Distinct from `Request` + /// so tests can assert on cancellation paths without matching on a + /// transport error message. + #[error("LLM request cancelled")] + Cancelled, +} + +/// Capability Trait for any backend that can run an LLM completion. +/// +/// Implementations must be `Send + Sync` because `Arc` is +/// shared across tokio tasks. Async via `async-trait` to keep the trait +/// object-safe under Rust 2021. +#[async_trait] +pub trait LlmProvider: Send + Sync { + /// Run one completion request. Cancellation is the caller's job — the + /// provider does not own the task; if the caller drops the awaiting + /// future the in-flight request is abandoned at the next `.await`. + async fn generate(&self, request: LlmRequest) -> Result; +} + +// --------------------------------------------------------------------------- +// Production: HttpLlmProvider +// --------------------------------------------------------------------------- + +/// Production [`LlmProvider`] backed by an OpenAI-compatible +/// `/v1/chat/completions` endpoint (OpenAI, OpenRouter, Ollama with +/// `/v1` enabled, vLLM, LM Studio, …). +/// +/// One instance per provider id — `base_url`+`api_key` are immutable on +/// the instance. Credential rotation = a new `HttpLlmProvider` inserted +/// into [`LlmRegistry`] under the same id. The `reqwest::Client` is shared +/// across calls so the connection pool is reused. +pub struct HttpLlmProvider { + base_url: String, + api_key: String, + client: reqwest::Client, +} + +impl HttpLlmProvider { + /// Build a provider against the given endpoint. `base_url` may include + /// or omit a trailing slash; the `/v1/chat/completions` suffix is + /// appended at request time. Empty `api_key` skips the + /// `Authorization: Bearer` header (useful for local Ollama). + #[must_use] + pub fn new(base_url: String, api_key: String) -> Self { + Self { + base_url, + api_key, + client: reqwest::Client::new(), + } + } +} + +#[async_trait] +impl LlmProvider for HttpLlmProvider { + async fn generate(&self, request: LlmRequest) -> Result { + let base = self.base_url.trim_end_matches('/'); + let url = format!("{base}/v1/chat/completions"); + + let mut messages = Vec::with_capacity(2); + if let Some(system) = &request.system { + if !system.is_empty() { + messages.push(serde_json::json!({ "role": "system", "content": system })); + } + } + messages.push(serde_json::json!({ "role": "user", "content": request.prompt })); + + let body = serde_json::json!({ + "model": request.model, + "messages": messages, + "stream": false, + }); + + let mut req = self.client.post(&url).json(&body); + if !self.api_key.is_empty() { + req = req.bearer_auth(&self.api_key); + } + + let resp = req + .send() + .await + .map_err(|e| LlmError::Request(e.to_string()))?; + + let json: serde_json::Value = resp + .json() + .await + .map_err(|e| LlmError::Parse(e.to_string()))?; + + let text = json + .get("choices") + .and_then(|c| c.get(0)) + .and_then(|c| c.get("message")) + .and_then(|m| m.get("content")) + .and_then(|s| s.as_str()) + .ok_or_else(|| { + LlmError::Parse(format!( + "missing choices[0].message.content in response: {json}" + )) + })? + .to_string(); + + Ok(LlmResponse { text }) + } +} + +// --------------------------------------------------------------------------- +// Test: RecordingLlmProvider +// --------------------------------------------------------------------------- + +/// Test [`LlmProvider`] that records every inbound request and returns +/// scripted outcomes from a FIFO queue. +/// +/// Mirrors the [`crate::runtime::board::TestIoLoop`] pattern: tests assert +/// what was sent and drive the component through both success and failure +/// paths without standing up an LLM endpoint. The second adapter beside +/// [`HttpLlmProvider`] is what makes [`LlmProvider`] a real seam. +/// +/// Behavioural contract: +/// +/// - `generate` pops the *front* of `scripted` and returns it; if empty, +/// returns [`LlmError::Cancelled`] so misconfigured tests fail loudly +/// rather than blocking on a fake await. +/// - `recorded` returns a snapshot of all requests received so far, in +/// call order. +pub struct RecordingLlmProvider { + recorded: Mutex>, + scripted: Mutex>>, +} + +impl RecordingLlmProvider { + #[must_use] + pub fn new() -> Self { + Self { + recorded: Mutex::new(Vec::new()), + scripted: Mutex::new(std::collections::VecDeque::new()), + } + } + + /// Push one scripted outcome onto the back of the queue. Calls consume + /// from the front. + pub fn script(&self, outcome: Result) { + self.scripted + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push_back(outcome); + } + + /// Convenience: script a successful text reply. + pub fn script_ok(&self, text: impl Into) { + self.script(Ok(LlmResponse { text: text.into() })); + } + + /// Snapshot of every request received, in call order. + #[must_use] + pub fn recorded(&self) -> Vec { + self.recorded + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + } +} + +impl Default for RecordingLlmProvider { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl LlmProvider for RecordingLlmProvider { + async fn generate(&self, request: LlmRequest) -> Result { + self.recorded + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(request); + self.scripted + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .pop_front() + .unwrap_or(Err(LlmError::Cancelled)) + } +} + +// --------------------------------------------------------------------------- +// LlmRegistry +// --------------------------------------------------------------------------- + +/// Live registry of [`LlmProvider`] implementations keyed by provider id. +/// +/// Replaces the parallel `LlmManager` + `RuntimeContext.providers` dual +/// state (ADR-0002). Components hold `Arc` and call +/// [`LlmRegistry::get`] per dispatch, so swapping a provider's +/// `base_url`/`api_key` (via [`LlmRegistry::sync`]) takes effect on the +/// next request — no component rebuild required. +/// +/// Internal storage is `tokio::sync::RwLock>` so syncs and +/// reads coexist with async tasks holding open generate calls. +pub struct LlmRegistry { + entries: RwLock>>, +} + +impl LlmRegistry { + #[must_use] + pub fn new() -> Self { + Self { + entries: RwLock::new(HashMap::new()), + } + } + + /// Insert or replace one entry. Existing in-flight calls against the + /// previous instance continue to completion; subsequent lookups see the + /// new one. + pub async fn insert(&self, id: String, provider: Arc) { + self.entries.write().await.insert(id, provider); + } + + /// Resolve an entry by id. + pub async fn get(&self, id: &str) -> Option> { + self.entries.read().await.get(id).cloned() + } + + /// Replace the entire registry atomically. Used by the frontend sync + /// path: the frontend's authoritative provider list is pushed in full, + /// so atomic replace gives "the set you sent is what's live." + pub async fn sync(&self, providers: Vec<(String, Arc)>) { + let mut map = self.entries.write().await; + map.clear(); + for (id, provider) in providers { + map.insert(id, provider); + } + } + + /// Snapshot of all currently registered provider ids. Order is + /// undefined (HashMap iteration). Mostly useful for tests and logging. + pub async fn ids(&self) -> Vec { + self.entries.read().await.keys().cloned().collect() + } +} + +impl Default for LlmRegistry { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn recording_provider_records_request_and_returns_scripted_response() { + let provider = RecordingLlmProvider::new(); + provider.script_ok("hello back"); + + let resp = provider + .generate(LlmRequest { + model: "test-model".into(), + system: Some("be terse".into()), + prompt: "hi".into(), + }) + .await + .expect("scripted ok"); + + assert_eq!(resp.text, "hello back"); + + let recorded = provider.recorded(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].model, "test-model"); + assert_eq!(recorded[0].system.as_deref(), Some("be terse")); + assert_eq!(recorded[0].prompt, "hi"); + } + + #[tokio::test] + async fn recording_provider_returns_cancelled_when_script_empty() { + let provider = RecordingLlmProvider::new(); + let err = provider + .generate(LlmRequest { + model: "m".into(), + system: None, + prompt: "p".into(), + }) + .await + .expect_err("script exhausted"); + assert!(matches!(err, LlmError::Cancelled)); + } + + #[tokio::test] + async fn recording_provider_returns_scripted_errors() { + let provider = RecordingLlmProvider::new(); + provider.script(Err(LlmError::Request("boom".into()))); + let err = provider + .generate(LlmRequest { + model: "m".into(), + system: None, + prompt: "p".into(), + }) + .await + .expect_err("scripted err"); + assert!(matches!(err, LlmError::Request(msg) if msg == "boom")); + } + + #[tokio::test] + async fn recording_provider_scripts_drain_fifo() { + let provider = RecordingLlmProvider::new(); + provider.script_ok("first"); + provider.script_ok("second"); + + let req = LlmRequest { + model: "m".into(), + system: None, + prompt: "p".into(), + }; + let a = provider.generate(req.clone()).await.unwrap(); + let b = provider.generate(req).await.unwrap(); + assert_eq!(a.text, "first"); + assert_eq!(b.text, "second"); + } + + #[tokio::test] + async fn registry_insert_and_get_roundtrip() { + let registry = LlmRegistry::new(); + let provider = Arc::new(RecordingLlmProvider::new()); + provider.script_ok("ok"); + + registry + .insert("alpha".into(), provider.clone() as Arc) + .await; + + let fetched = registry.get("alpha").await.expect("present"); + let resp = fetched + .generate(LlmRequest { + model: "m".into(), + system: None, + prompt: "p".into(), + }) + .await + .expect("ok"); + assert_eq!(resp.text, "ok"); + // Original handle should also see the recorded request. + assert_eq!(provider.recorded().len(), 1); + } + + #[tokio::test] + async fn registry_get_missing_returns_none() { + let registry = LlmRegistry::new(); + assert!(registry.get("nope").await.is_none()); + } + + #[tokio::test] + async fn registry_sync_replaces_all_entries_atomically() { + let registry = LlmRegistry::new(); + + let a = Arc::new(RecordingLlmProvider::new()) as Arc; + let b = Arc::new(RecordingLlmProvider::new()) as Arc; + registry.insert("a".into(), a).await; + registry.insert("b".into(), b).await; + assert_eq!(registry.ids().await.len(), 2); + + let c = Arc::new(RecordingLlmProvider::new()) as Arc; + registry.sync(vec![("c".into(), c)]).await; + + let ids = registry.ids().await; + assert_eq!(ids, vec!["c".to_string()]); + assert!(registry.get("a").await.is_none()); + assert!(registry.get("b").await.is_none()); + assert!(registry.get("c").await.is_some()); + } + + #[tokio::test] + async fn registry_insert_replaces_same_id() { + let registry = LlmRegistry::new(); + let first = Arc::new(RecordingLlmProvider::new()); + first.script_ok("from-first"); + let second = Arc::new(RecordingLlmProvider::new()); + second.script_ok("from-second"); + + registry + .insert("id".into(), first.clone() as Arc) + .await; + registry + .insert("id".into(), second.clone() as Arc) + .await; + + let fetched = registry.get("id").await.expect("present"); + let resp = fetched + .generate(LlmRequest { + model: "m".into(), + system: None, + prompt: "p".into(), + }) + .await + .unwrap(); + assert_eq!(resp.text, "from-second"); + assert!(first.recorded().is_empty()); + assert_eq!(second.recorded().len(), 1); + } +} diff --git a/apps/web/src-tauri/src/runtime/services/mod.rs b/apps/web/src-tauri/src/runtime/services/mod.rs new file mode 100644 index 0000000..d540f13 --- /dev/null +++ b/apps/web/src-tauri/src/runtime/services/mod.rs @@ -0,0 +1,157 @@ +//! Runtime Services — per-capability traits, registries, and the typed +//! bundle threaded through component construction. +//! +//! This module replaces the ad-hoc split between: +//! +//! - `RuntimeContext` + build-time provider snapshot (LLM), +//! - `_mqtt_publish` event-out + `lib.rs` interceptor (MQTT/Figma), +//! +//! with one shape: every external kind gets a **Capability Trait** plus a +//! **Service Registry** (LLM) or a direct publisher handle (MQTT). +//! Components hold the relevant `Arc` and resolve the backing +//! implementation at dispatch time, so credential rotation and broker +//! reconfiguration take effect without rebuilding components. +//! +//! [`RuntimeServices`] is the typed bundle handed to the component registry +//! per flow_update; each `ComponentBuilder` impl declares which slice of +//! it the component needs via its associated `Deps` type. The +//! [`FromServices`] trait does the projection. + +use std::sync::Arc; + +pub mod llm; +pub mod mqtt; + +pub use llm::{ + HttpLlmProvider, LlmError, LlmProvider, LlmRegistry, LlmRequest, LlmResponse, + RecordingLlmProvider, +}; +pub use mqtt::{MqttPublishError, MqttPublisher, RecordedPublish, RecordingMqttPublisher}; + +// --------------------------------------------------------------------------- +// RuntimeServices +// --------------------------------------------------------------------------- + +/// Typed bundle of every external service the runtime can hand to a +/// component. One field per **Capability Trait** / **Service Registry**. +/// +/// `Arc` everywhere — handing a `RuntimeServices` to a factory and then +/// projecting a `Deps` slice out of it is a series of cheap clones. +/// +/// Built once at application startup and reused across every flow update. +/// `Clone` so it can be carried alongside a pending `FlowUpdate` on +/// `AppState::pending_flow` and replayed on board-connect without losing +/// any of the live registries. +#[derive(Clone)] +pub struct RuntimeServices { + /// Live registry of [`LlmProvider`] implementations keyed by id. See + /// `CONTEXT.md` § LLM Provider, § Service Registry. + pub llm_registry: Arc, + /// Production MQTT publisher (the application's `MqttManager`, + /// adapted to [`MqttPublisher`]). See `CONTEXT.md` § MQTT Publisher. + pub mqtt_publisher: Arc, +} + +impl RuntimeServices { + /// Build a services bundle around the given shared handles. + #[must_use] + pub fn new( + llm_registry: Arc, + mqtt_publisher: Arc, + ) -> Self { + Self { + llm_registry, + mqtt_publisher, + } + } + + /// Build a services bundle with freshly-allocated, empty registries. + /// Used in unit tests that don't exercise external dispatch paths; + /// production call sites should clone shared handles from + /// `AppState` via [`RuntimeServices::new`]. + #[must_use] + pub fn empty() -> Self { + Self { + llm_registry: Arc::new(LlmRegistry::new()), + mqtt_publisher: Arc::new(RecordingMqttPublisher::new()), + } + } +} + +impl Default for RuntimeServices { + fn default() -> Self { + Self::empty() + } +} + +// --------------------------------------------------------------------------- +// FromServices +// --------------------------------------------------------------------------- + +/// Project a typed slice out of [`RuntimeServices`]. +/// +/// Every concrete `Deps` shape a [`crate::runtime::component::ComponentBuilder`] +/// declares must implement this trait, so the component registry's factory +/// closure can pull out the right slice without naming the specific impl: +/// +/// ```ignore +/// let deps = ::from_services(services); +/// B::build(id, config, deps) +/// ``` +/// +/// Adding a new external kind = add a new field to [`RuntimeServices`] and +/// add a `FromServices` impl for whatever `Arc` / registry handle +/// components reach for. Zero touches in the 29 builders that need +/// nothing — they keep their `type Deps = ()`. +pub trait FromServices { + fn from_services(services: &RuntimeServices) -> Self; +} + +/// Builders that need nothing from the services bundle declare +/// `type Deps = ()`. The impl is a `()` constructor. +impl FromServices for () { + fn from_services(_: &RuntimeServices) -> Self {} +} + +impl FromServices for Arc { + fn from_services(services: &RuntimeServices) -> Self { + Arc::clone(&services.llm_registry) + } +} + +impl FromServices for Arc { + fn from_services(services: &RuntimeServices) -> Self { + Arc::clone(&services.mqtt_publisher) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_services_unit_returns_unit() { + let services = RuntimeServices::empty(); + let () = <() as FromServices>::from_services(&services); + } + + #[test] + fn from_services_llm_registry_shares_arc() { + let services = RuntimeServices::empty(); + let a = as FromServices>::from_services(&services); + let b = as FromServices>::from_services(&services); + assert!(Arc::ptr_eq(&a, &b)); + assert!(Arc::ptr_eq(&a, &services.llm_registry)); + } + + #[test] + fn from_services_mqtt_publisher_shares_arc() { + let services = RuntimeServices::empty(); + let a = as FromServices>::from_services(&services); + assert!(Arc::ptr_eq(&a, &services.mqtt_publisher)); + } +} diff --git a/apps/web/src-tauri/src/runtime/services/mqtt.rs b/apps/web/src-tauri/src/runtime/services/mqtt.rs new file mode 100644 index 0000000..beb553b --- /dev/null +++ b/apps/web/src-tauri/src/runtime/services/mqtt.rs @@ -0,0 +1,241 @@ +//! MQTT publish **Capability Trait** and adapters. +//! +//! Components that publish to an MQTT broker (`Mqtt`, `Figma`) hold an +//! `Arc` and call `publish(...)` directly, replacing the +//! historic `_mqtt_publish` event-emission pattern that hopped through +//! `lib.rs`'s event-forwarding thread and a dedicated publish-handler +//! thread (see ADR-0002 § D3). +//! +//! Inbound subscription routing — collecting [`super::super::wiring::SubscriberWiring`] +//! and applying it to the manager — stays where it lives now. This trait +//! covers only the outbound publish path. +//! +//! Two adapters ship with this module: +//! +//! - [`crate::mqtt::manager::MqttManager`] is the production adapter; the +//! trait impl in this file delegates to its existing +//! `publish(broker_id, topic, payload, retain)` async method and +//! translates the legacy `String` error into [`MqttPublishError`] so +//! callers can match on the failure kind. +//! - [`RecordingMqttPublisher`] records every inbound publish call and +//! either returns `Ok(())` or pops a scripted error from a FIFO queue. +//! Mirrors the `RecordingLlmProvider` pattern — the second adapter is +//! what makes [`MqttPublisher`] a real seam. + +use async_trait::async_trait; +use std::sync::Mutex; +use thiserror::Error; + +use crate::mqtt::manager::MqttManager; + +/// Failure modes a `MqttPublisher::publish` can surface. +#[derive(Error, Debug)] +pub enum MqttPublishError { + /// The broker id is not in the manager's connected set. Distinguished + /// from `PublishFailed` so callers can prompt the user to (re)connect + /// rather than surface a generic wire error. + #[error("MQTT broker '{0}' not connected")] + BrokerNotConnected(String), + + /// Wire-level failure (broker rejected the publish, connection + /// dropped mid-flight, payload too large, …). Inner message is the + /// upstream reason. + #[error("MQTT publish failed: {0}")] + PublishFailed(String), +} + +/// Capability Trait for any backend that can publish a single MQTT message. +/// +/// `Send + Sync` because `Arc` is shared across tokio +/// tasks. Async via `async-trait` for dyn-safety under Rust 2021. +#[async_trait] +pub trait MqttPublisher: Send + Sync { + /// Publish one message to `topic` on the broker identified by + /// `broker_id`. `payload` is the raw bytes the broker forwards; + /// callers serialise their own value first. `retain` matches MQTT's + /// retain flag — the broker stores the last retained payload per + /// topic and replays it to new subscribers. + async fn publish( + &self, + broker_id: &str, + topic: &str, + payload: &[u8], + retain: bool, + ) -> Result<(), MqttPublishError>; +} + +// --------------------------------------------------------------------------- +// Production adapter: MqttManager +// --------------------------------------------------------------------------- + +#[async_trait] +impl MqttPublisher for MqttManager { + async fn publish( + &self, + broker_id: &str, + topic: &str, + payload: &[u8], + retain: bool, + ) -> Result<(), MqttPublishError> { + // The existing `MqttManager::publish` returns `Result<(), String>` + // with "Broker {id} not connected" as the only id-specific message + // it produces. Translate that into the typed variant; route every + // other string into `PublishFailed` so callers can still log / + // surface the upstream reason verbatim. + match MqttManager::publish(self, broker_id, topic, payload, retain).await { + Ok(()) => Ok(()), + Err(msg) if msg.contains("not connected") => { + Err(MqttPublishError::BrokerNotConnected(broker_id.to_string())) + } + Err(msg) => Err(MqttPublishError::PublishFailed(msg)), + } + } +} + +// --------------------------------------------------------------------------- +// Test adapter: RecordingMqttPublisher +// --------------------------------------------------------------------------- + +/// One captured publish call. The trait passes `&[u8]` for the payload but +/// the recorder snapshots it as an owned `Vec` so tests can inspect +/// later without lifetime constraints. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RecordedPublish { + pub broker_id: String, + pub topic: String, + pub payload: Vec, + pub retain: bool, +} + +/// Test [`MqttPublisher`] that records every publish call and either +/// returns `Ok(())` or pops a scripted error from a FIFO queue. +/// +/// Mirrors [`super::llm::RecordingLlmProvider`]: tests assert what was +/// sent (broker_id / topic / payload / retain) and drive components +/// through both success and failure paths without standing up a broker. +pub struct RecordingMqttPublisher { + recorded: Mutex>, + /// Scripted failures. Each entry pops one publish; when the queue is + /// empty, calls succeed (the default for tests that don't care about + /// failure paths). Use [`script_err`](Self::script_err) to inject a + /// specific failure. + scripted_errors: Mutex>, +} + +impl RecordingMqttPublisher { + #[must_use] + pub fn new() -> Self { + Self { + recorded: Mutex::new(Vec::new()), + scripted_errors: Mutex::new(std::collections::VecDeque::new()), + } + } + + /// Push one scripted error onto the back of the failure queue. The + /// next publish call consumes it and returns the error; the call is + /// still recorded. + pub fn script_err(&self, err: MqttPublishError) { + self.scripted_errors + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push_back(err); + } + + /// Snapshot of every call received, in call order. + #[must_use] + pub fn recorded(&self) -> Vec { + self.recorded + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + } +} + +impl Default for RecordingMqttPublisher { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl MqttPublisher for RecordingMqttPublisher { + async fn publish( + &self, + broker_id: &str, + topic: &str, + payload: &[u8], + retain: bool, + ) -> Result<(), MqttPublishError> { + self.recorded + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(RecordedPublish { + broker_id: broker_id.to_string(), + topic: topic.to_string(), + payload: payload.to_vec(), + retain, + }); + + if let Some(err) = self + .scripted_errors + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .pop_front() + { + Err(err) + } else { + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn recording_publisher_captures_calls_in_order() { + let pub_ = RecordingMqttPublisher::new(); + pub_.publish("b1", "t/one", b"a", false).await.unwrap(); + pub_.publish("b2", "t/two", b"bcd", true).await.unwrap(); + + let recorded = pub_.recorded(); + assert_eq!(recorded.len(), 2); + assert_eq!(recorded[0].broker_id, "b1"); + assert_eq!(recorded[0].topic, "t/one"); + assert_eq!(recorded[0].payload, b"a"); + assert!(!recorded[0].retain); + assert_eq!(recorded[1].broker_id, "b2"); + assert_eq!(recorded[1].topic, "t/two"); + assert_eq!(recorded[1].payload, b"bcd"); + assert!(recorded[1].retain); + } + + #[tokio::test] + async fn recording_publisher_records_even_when_scripted_to_fail() { + let pub_ = RecordingMqttPublisher::new(); + pub_.script_err(MqttPublishError::BrokerNotConnected("b1".into())); + let err = pub_ + .publish("b1", "t", b"x", false) + .await + .expect_err("scripted err"); + assert!(matches!(err, MqttPublishError::BrokerNotConnected(id) if id == "b1")); + // Call still recorded. + assert_eq!(pub_.recorded().len(), 1); + } + + #[tokio::test] + async fn recording_publisher_errors_drain_fifo() { + let pub_ = RecordingMqttPublisher::new(); + pub_.script_err(MqttPublishError::PublishFailed("first".into())); + pub_.script_err(MqttPublishError::PublishFailed("second".into())); + + let a = pub_.publish("b", "t", b"x", false).await.unwrap_err(); + let b = pub_.publish("b", "t", b"x", false).await.unwrap_err(); + let c = pub_.publish("b", "t", b"x", false).await; // queue empty → Ok + + assert!(matches!(a, MqttPublishError::PublishFailed(msg) if msg == "first")); + assert!(matches!(b, MqttPublishError::PublishFailed(msg) if msg == "second")); + assert!(c.is_ok()); + } +} diff --git a/docs/adr/0002-per-capability-service-traits.md b/docs/adr/0002-per-capability-service-traits.md new file mode 100644 index 0000000..15f3c4f --- /dev/null +++ b/docs/adr/0002-per-capability-service-traits.md @@ -0,0 +1,228 @@ +# ADR-0002 — Per-capability service traits over a single `RuntimeContext` bundle + +- **Status:** accepted +- **Date:** 2026-05-17 +- **Deciders:** sander + +## Context + +External components — components in `runtime/external/` that talk to MQTT brokers, +LLM endpoints, etc. — currently reach their backing resources through two +inconsistent patterns: + +1. **Snapshot-into-builder (LLM).** `flow_update` (`runtime/commands.rs:78`) + constructs a `RuntimeContext { providers: Vec }` from the + frontend payload. `ComponentBuilder::build` is called with `&RuntimeContext` + for every Catalog impl; only `Llm::build` actually consults it, to copy + `base_url`/`api_key` into `LlmConfig`. The other ~31 builders take `_ctx` + and ignore it. After construction, `Llm` owns its credentials and builds + `reqwest::Client::new()` inline inside `Llm::spawn_generate` + (`runtime/external/llm.rs:99-183`). There is no abstraction over the wire. + +2. **Event-out, manager-resolves (MQTT/Figma).** `Mqtt`/`Figma` know only a + `broker_id`. They emit a reserved `_mqtt_publish` event whose value carries + `{brokerId, topic, payload, retain}`. The event leaves the runtime via the + event channel, is intercepted in `lib.rs::run`'s event-forwarding thread + (`apps/web/src-tauri/src/lib.rs:167-196`), and re-dispatched onto an + `MqttPublishRequest` channel that another thread drains by calling + `state.mqtt_manager.publish(...)`. The component is decoupled from the + manager — at the cost of a stringly-typed event protocol and an extra + thread hop. + +This split has accreted three concrete pains: + +- **Dead writer / dual state.** `LlmManager` (`apps/web/src-tauri/src/llm/manager.rs`) + exists as `Arc>>`, synced from the + frontend via the `llm_sync_providers` Tauri command. **Nothing reads + `LlmManager.get()`.** The same provider data is *also* pushed through + `flow_update` into `RuntimeContext.providers`. Two sync paths, two stores, + zero readers of the manager. Engineer reading the code cannot tell which + is canonical. + +- **Stale-credential hazard via `pending_flow`.** `AppState.pending_flow: + Arc>>` (`lib.rs:78`). When the + board is disconnected at `flow_update` time, the `RuntimeContext` is stored + alongside the flow and replayed on board-connect (`lib.rs:262-264`). If a + provider's API key rotates between the two events, the `Llm` component is + built with stale credentials. `LlmConfig` then snapshots those credentials, + so the staleness sticks until the next `flow_update`. + +- **No test seam for LLM/MQTT.** `BoardHandle` has `TestIoLoop` as a paired + adapter at the `BoardCommand` channel (CONTEXT.md § TestIoLoop) — one + production adapter, one test adapter, which is what makes the seam real + (per the `LANGUAGE.md` rule of thumb). LLM and MQTT have no such pair. + Testing `Llm` today requires running a real OpenAI-compatible endpoint or + monkey-patching reqwest. Testing `Mqtt`/`Figma` requires standing up a + broker or stubbing the event interceptor in `lib.rs`. The trait does not + exist; there is nothing to substitute. + +The system will gain more external kinds (HTTP webhook, OSC, WebSocket, more +LLM providers). Each new kind under the current split forces another choice +between "snapshot into config" and "emit reserved event," another bespoke +manager, and another untested code path. + +## Decision + +Introduce a layered service-trait architecture, replacing both patterns with +one. + +``` +┌─ Layer 1: Capability traits (one per external kind) ──────┐ +│ trait LlmProvider: async fn generate(req) -> resp │ +│ trait MqttPublisher: async fn publish(topic, payload) │ +│ trait HttpCaller: ... (added when needed) │ +└────────────────────────────────────────────────────────────┘ +┌─ Layer 2: Service registries (one per kind) ──────────────┐ +│ LlmRegistry { RwLock>> } +│ MqttRegistry { RwLock>> } +│ Sync command refreshes entries; live components observe │ +│ updates without rebuild. │ +└────────────────────────────────────────────────────────────┘ +┌─ Layer 3: RuntimeServices (replaces RuntimeContext) ──────┐ +│ pub struct RuntimeServices { │ +│ llm: Arc, │ +│ mqtt: Arc, │ +│ ... │ +│ } │ +└────────────────────────────────────────────────────────────┘ +┌─ Layer 4: ComponentBuilder declares typed Deps ───────────┐ +│ trait ComponentBuilder { │ +│ type Deps = (); // default: needs none │ +│ fn build(id, config, deps: Self::Deps) -> Result │ +│ } │ +│ impl ComponentBuilder for Llm { type Deps = Arc; } +│ impl ComponentBuilder for Mqtt { type Deps = Arc; } +│ Registry factory projects Deps from RuntimeServices per impl +└────────────────────────────────────────────────────────────┘ +┌─ Layer 5: Dispatch-time lookup, not build-time snapshot ──┐ +│ Llm.dispatch("trigger") → llm_registry.get(&provider_id) │ +│ ?.generate(req).await │ +│ No more stale ctx in pending_flow. Key rotation takes │ +│ effect on the next call. │ +└────────────────────────────────────────────────────────────┘ +``` + +Five sub-decisions: + +- **D1 — Per-capability traits, not a single `ExternalGateway` trait.** Each + external kind gets its own trait surface. Adding HTTP/OSC/WebSocket adds a + new trait + registry; it does not grow an existing god-trait. Cost: one + trait per kind. Benefit: Open/Closed at the trait boundary. + +- **D2 — Live `Arc` registries, not value snapshots.** Components + hold `Arc` and look up the provider per call. Credential rotation + works without rebuilding components. The dead `LlmManager` becomes the + production `LlmRegistry`, finally read. + +- **D3 — Trait dispatch over event-emission for outbound calls.** `Mqtt` + publishes via `Arc` instead of emitting `_mqtt_publish` + events that `lib.rs` re-routes. The event interceptor retires once unused; + the runtime stops being a thread-hop router for external calls. + +- **D4 — Typed `ComponentBuilder::Deps`, not a universal `&RuntimeContext`.** + Each `ComponentBuilder` impl declares exactly the registries it needs. + Default `type Deps = ()` for the ~30 components that need nothing. The + registry factory projects the right `Deps` from `RuntimeServices` per impl. + `RuntimeContext` and `ProviderEntry` delete. + +- **D5 — Two adapters per trait from day one.** Production HTTP/MQTT impls, + plus a test impl (`RecordingLlmProvider`, `RecordingMqttPublisher`) that + records inbound calls and returns scripted outcomes. Mirrors the + `BoardHandle` + `TestIoLoop` pattern. The second adapter is what makes + each trait a *real* seam (per `LANGUAGE.md`). + +Roll out in four phases. Each compiles and ships: + +1. **Phase 1** — Add `runtime/services` module with the LLM capability trait, + registry, production HTTP impl, and recording test impl. Pure addition — + existing `Llm` component is untouched, existing `LlmManager` remains in + place. Lands the foundation and the test pair. +2. **Phase 2** — Migrate `Llm` component to hold `Arc` and + resolve the provider at dispatch time. Drop snapshot fields from + `LlmConfig`. Retire `LlmManager` in favour of `LlmRegistry`. +3. **Phase 3** — Same shape for MQTT: `MqttPublisher` trait, `MqttRegistry`, + migrate `Mqtt`/`Figma`. Retire the `_mqtt_publish` event interceptor. +4. **Phase 4** — Introduce `RuntimeServices` + `ComponentBuilder::Deps`. + Delete `RuntimeContext`, `ProviderEntry`. Update 31 pass-through builders + to drop `_ctx`. Change `AppState.pending_flow` to hold + `Arc`. + +## Consequences + +**Positive** + +- One pattern for external service access. Adding a new kind (HTTP, OSC, + WebSocket) is template work: define trait, define registry, add field to + `RuntimeServices`. No bespoke per-kind plumbing in `lib.rs`. +- Tests for `Llm`/`Mqtt`/`Figma` become unit tests against a recording + adapter. Component logic verifiable without standing up a broker, an LLM + endpoint, or the Tauri host. +- Provider/broker credential rotation takes effect on the next dispatch — + no component rebuild, no flow_update re-fire. +- `RuntimeContext` deletes; 31 pass-through `_ctx` params delete. + `ComponentBuilder` signature shrinks. +- `LlmManager` stops being dead code (becomes `LlmRegistry`). +- The `_mqtt_publish` event interceptor in `lib.rs::run` deletes; + `mqtt_publish_tx`/`MqttPublishRequest`/the dedicated publish thread go with + it. + +**Negative** + +- `ComponentBuilder` grows an associated `type Deps` with a default. The + registry factory closure (`runtime/registry.rs::Factory`) becomes generic + over `` and projects `Deps` per impl. Slightly more type plumbing in the + registry; offset by deleting `RuntimeContext` and 31 `_ctx` params. +- Async traits require the `async-trait` crate (or native async-in-trait + with `Pin>` boilerplate). New dep: `async-trait = "0.1"`. +- Two registries (`LlmRegistry`, `MqttRegistry`) become the long-lived + shared state, instead of one `RuntimeContext` value. Lifetime is the + app lifetime; held via `Arc` in `AppState` and forwarded to components. +- The `Mqtt` event-emission path was used by `Figma` too. Migrating Figma + in Phase 3 requires its `dispatch` arms to call `MqttPublisher::publish` + directly instead of emitting `_mqtt_publish`. The event handle disappears + from the `executor`'s `_`-prefix routing. + +**Neutral** + +- `ListenerWiring` / `SubscriberWiring` (CONTEXT.md § Wiring) are + unaffected. Wiring is descriptive data returned from components; this ADR + changes how *outbound* external calls are dispatched, not how inbound + subscriptions are described. +- `BoardHandle` is unaffected. Hardware seams already follow the + one-trait-one-test-adapter pattern; this ADR generalises that pattern to + the other external kinds. + +## Glossary + +New terms recorded in `CONTEXT.md`: + +- **Capability Trait** — a trait describing one external kind's operations + (e.g. `LlmProvider`, `MqttPublisher`). +- **Service Registry** — a live `Arc>>>` + of capability-trait implementations keyed by an id from the frontend. +- **Runtime Services** — the typed bundle of registries passed to + `ComponentBuilder::build` via the impl's associated `Deps`. +- **LLM Provider** — a `Capability Trait` for any backend that can run an + LLM completion against an OpenAI-compatible request shape. + +## References + +- `apps/web/src-tauri/src/runtime/context.rs` — `RuntimeContext` and + `ProviderEntry` (to delete in Phase 4). +- `apps/web/src-tauri/src/runtime/builders.rs` — 32 `ComponentBuilder` impls + (31 with `_ctx`, 1 reading `ctx`). +- `apps/web/src-tauri/src/runtime/external/llm.rs` — inline `reqwest::Client` + HTTP call (to retire in Phase 2). +- `apps/web/src-tauri/src/runtime/external/mqtt.rs`, + `apps/web/src-tauri/src/runtime/external/figma.rs` — `_mqtt_publish` emit + sites (to retire in Phase 3). +- `apps/web/src-tauri/src/lib.rs:167-196` — `_mqtt_publish` event + interceptor (to retire in Phase 3). +- `apps/web/src-tauri/src/llm/manager.rs` — `LlmManager` dead writer + (folds into `LlmRegistry` in Phase 2). +- `apps/web/src-tauri/src/mqtt/manager.rs` — `MqttManager` (becomes the + production `MqttPublisher` impl in Phase 3). +- `CONTEXT.md` § Runtime Context (to delete in Phase 4) and § BoardHandle / + § TestIoLoop (the seam pattern this ADR generalises). +- `ADR-0001` — Component trait flow separation (sets the precedent for + splitting an overloaded interface into typed seams).