Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,69 @@ Two kinds:

Distinct from the **Component Catalog**: catalog is metadata for _registration_ (what UI shows, how to construct); Wiring is per-impl _behavior_ applied after construction.

## Runtime Context
## Runtime Services

Read-only bundle passed to component factories at construction time: connected brokers, configured LLM providers. Lets a component pluck the bits it needs (e.g. `Llm` reads its provider's `base_url`/`api_key`) without mutating `node.data` upstream. Empty for components with no external deps.
Typed bundle of every external service the runtime can hand to a component, threaded through component construction. Lives in `runtime/services/mod.rs` as `RuntimeServices`. One field per **Capability Trait** / **Service Registry**:

- `llm_registry: Arc<LlmRegistry>`
- `mqtt_publisher: Arc<dyn MqttPublisher>`
- _(future kinds — HTTP, OSC, WebSocket — accrete as new fields with no churn in unrelated components)_

Built once at application startup (`AppState::run`) and reused across every `flow_update`. `Clone` so it can be carried alongside a pending `FlowUpdate` on `AppState::pending_flow` and replayed on board-connect without losing live registries.

Replaces the former `RuntimeContext` struct: same role, more honest name. The corresponding ADR is [ADR-0002](docs/adr/0002-per-capability-service-traits.md).

## Component Deps

The associated `type Deps: FromServices` on the `ComponentBuilder` trait — each impl's typed record of the slice of **Runtime Services** it needs to construct.

- Components that need nothing declare `type Deps = ();` (the default-friendliest shape — `FromServices for ()` returns unit). 29 of the 32 Catalog impls do this.
- `Llm` declares `type Deps = Arc<LlmRegistry>`.
- `Mqtt` and `Figma` declare `type Deps = Arc<dyn MqttPublisher>`.

The component registry's `Factory` closure (`runtime/registry.rs::make_factory`) projects `<B::Deps as FromServices>::from_services(services)` at build time and hands the slice to `B::build`. Adding a new external kind = add a field to `RuntimeServices` + a `FromServices` impl for the new `Arc<..>` — zero touches in the 29 unaffected builders.

## Capability Trait

A Rust trait describing one external kind's outbound operations (e.g. [`LlmProvider`](#llm-provider)`::generate`, `MqttPublisher::publish`). Lives in `runtime/services/<kind>.rs`. Components depend on `Arc<dyn CapabilityTrait>` (or on the **Service Registry** that maps id → `Arc<dyn CapabilityTrait>`), never on the concrete HTTP client / broker library.

Each Capability Trait ships with two adapters from day one — a production impl (e.g. `HttpLlmProvider`) and a recording test impl (e.g. `RecordingLlmProvider`) — which is what makes the trait a real seam rather than a hypothetical one (same rule as **BoardHandle** + **TestIoLoop**).

See [ADR-0002](docs/adr/0002-per-capability-service-traits.md).

## Service Registry

Live, mutable map of `id → Arc<dyn CapabilityTrait>` for one capability kind (e.g. `LlmRegistry`, future `MqttRegistry`). Lives in `runtime/services/<kind>.rs`. The frontend's authoritative list is pushed in full via `sync(providers: Vec<(id, Arc<dyn ..>)>)`; existing in-flight calls against the previous instance run to completion, subsequent lookups see the new entry.

Components hold `Arc<Registry>` and resolve the **Capability Trait** by id at dispatch time, not at construction time. Consequence: credential / endpoint rotation takes effect on the next call, no component rebuild, no flow_update re-fire.

Replaces the parallel `LlmManager` + `RuntimeContext.providers` dual-state pattern.

## LLM Provider

The **Capability Trait** for any backend that can run an LLM completion against an OpenAI-compatible `/v1/chat/completions` request shape. Lives in `runtime/services/llm.rs`. Carries one method:

```rust
async fn generate(&self, request: LlmRequest) -> Result<LlmResponse, LlmError>;
```

`LlmRequest` is `{ model, system: Option<String>, prompt }` — template substitution is the caller's job, the provider sees the rendered text. `LlmResponse` is `{ text }` for now; token counts / finish reasons accrete only when a consumer needs them.

Production adapter: `HttpLlmProvider` (one `reqwest::Client` per instance for connection-pool reuse; empty `api_key` skips the `Authorization` header so local Ollama works). Test adapter: `RecordingLlmProvider` (records every request, returns scripted responses or errors from a FIFO queue, returns `LlmError::Cancelled` when the script is exhausted).

## MQTT Publisher

The **Capability Trait** for any backend that can publish a single MQTT message. Lives in `runtime/services/mqtt.rs`. One method:

```rust
async fn publish(&self, broker_id: &str, topic: &str, payload: &[u8], retain: bool) -> Result<(), MqttPublishError>;
```

`MqttPublishError::BrokerNotConnected` is distinguished from `PublishFailed` so callers (UI, logs) can prompt for a reconnect instead of surfacing a generic wire error.

Production adapter: `crate::mqtt::manager::MqttManager` (via `impl MqttPublisher for MqttManager` in `runtime/services/mqtt.rs`) — delegates to the existing broker pool, translating the legacy `String` error into the typed variant. Test adapter: `RecordingMqttPublisher` (records every `(broker_id, topic, payload, retain)` tuple and pops scripted errors from a FIFO queue).

`Mqtt` and `Figma` components hold `Arc<dyn MqttPublisher>` and call `publish(...)` directly from their `dispatch` arms via a Tokio-spawned task. Replaces the legacy `_mqtt_publish` reserved-event pattern (component emits a JSON-encoded publish request, `lib.rs` parses it and re-routes through a dedicated handler thread) — that path was retired in [ADR-0002](docs/adr/0002-per-capability-service-traits.md) Phase 3.

## Host Adapter

Expand Down
1 change: 1 addition & 0 deletions apps/web/src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apps/web/src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ thiserror = "1"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
boa_engine = "0.20"
dashmap = "6"
async-trait = "0.1"

[dev-dependencies]
proptest = "1"
Expand Down
135 changes: 40 additions & 95 deletions apps/web/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,13 @@ pub mod runtime;
pub use error::*;

use hardware::HardwareService;
use llm::LlmManager;
use mqtt::MqttManager;
use runtime::{FlowRuntime, FlowUpdate, RuntimeContext};
use runtime::services::{LlmRegistry, MqttPublisher, RuntimeServices};
use runtime::{FlowRuntime, FlowUpdate};
use std::sync::{Arc, Mutex, RwLock};
use tauri::{Emitter, Listener};
use tokio::sync::mpsc;
use tokio::sync::Mutex as TokioMutex;

/// MQTT publish request from flow components
#[derive(Debug, Clone)]
pub struct MqttPublishRequest {
pub component_id: String,
pub broker_id: String,
pub topic: String,
pub payload: String,
pub retain: bool,
}

/// Tracks active Figma MQTT subscriptions so they can be cleaned up on flow switch
#[derive(Debug, Clone)]
pub struct FigmaSubscription {
Expand All @@ -74,16 +63,23 @@ pub struct FigmaSubscription {
pub struct AppState {
pub hardware_service: Arc<Mutex<HardwareService>>,
pub flow_runtime: Arc<TokioMutex<FlowRuntime>>,
/// Pending flow update + its runtime context, applied when the board connects.
pub pending_flow: Arc<RwLock<Option<(FlowUpdate, RuntimeContext)>>>,
/// Pending flow update + the runtime services bundle live at
/// `flow_update` time, applied when the board connects.
pub pending_flow: Arc<RwLock<Option<(FlowUpdate, RuntimeServices)>>>,
/// Whether a Firmata board is connected
pub board_connected: Arc<RwLock<bool>>,
/// MQTT broker manager
pub mqtt_manager: MqttManager,
/// Channel for MQTT publish requests from flow components
pub mqtt_publish_tx: mpsc::UnboundedSender<MqttPublishRequest>,
/// LLM provider manager
pub llm_manager: LlmManager,
/// MQTT publish handle wired into the runtime via `RuntimeContext` so
/// `Mqtt` / `Figma` components publish straight through the manager
/// instead of emitting `_mqtt_publish` events for `lib.rs` to re-route
/// (ADR-0002 Phase 3).
pub mqtt_publisher: Arc<dyn MqttPublisher>,
/// Live LLM provider registry. Shared with `RuntimeContext` so components
/// resolve providers at dispatch time and pick up credential rotation
/// without rebuilding. Filled by the `flow_update` and
/// `llm_sync_providers` Tauri commands.
pub llm_registry: Arc<LlmRegistry>,
/// Active Figma MQTT subscriptions (cleaned up on flow switch)
pub figma_subscriptions: Arc<TokioMutex<Vec<FigmaSubscription>>>,
}
Expand All @@ -99,22 +95,22 @@ pub fn run() {
let flow_runtime = Arc::new(TokioMutex::new(FlowRuntime::new()));
let pending_flow = Arc::new(RwLock::new(None));
let board_connected = Arc::new(RwLock::new(false));

// Create channel for MQTT publish requests
let (mqtt_publish_tx, mqtt_publish_rx) = mpsc::unbounded_channel::<MqttPublishRequest>();

let mqtt_manager = MqttManager::new();
let mqtt_manager_for_publish = mqtt_manager.clone();
let llm_manager = LlmManager::new();
// `MqttManager` is `Clone` (its broker map lives behind `Arc<RwLock<..>>`),
// so the cloned instance handed to the dyn-trait `Arc` shares the same
// broker pool as the one held on `AppState`.
let mqtt_publisher: Arc<dyn MqttPublisher> = Arc::new(mqtt_manager.clone());
let llm_registry = Arc::new(LlmRegistry::new());

let app_state = AppState {
hardware_service: Arc::clone(&hardware_service),
flow_runtime: Arc::clone(&flow_runtime),
pending_flow: Arc::clone(&pending_flow),
board_connected: Arc::clone(&board_connected),
mqtt_manager,
mqtt_publish_tx: mqtt_publish_tx.clone(),
llm_manager,
mqtt_publisher,
llm_registry,
figma_subscriptions: Arc::new(TokioMutex::new(Vec::new())),
};

Expand Down Expand Up @@ -150,53 +146,30 @@ pub fn run() {
// Use blocking_lock() for sync context during setup
let event_rx = flow_runtime.blocking_lock().take_event_receiver();

// Set up event forwarding from flow runtime
// Set up event forwarding from flow runtime.
//
// Outbound MQTT publishes used to ride this channel under the
// `_mqtt_publish` reserved handle — components emitted the
// request as a JSON value, this thread parsed it and
// re-dispatched to a dedicated publish-handler thread. Since
// ADR-0002 Phase 3 that path is gone: `Mqtt` / `Figma` hold an
// `Arc<dyn MqttPublisher>` and publish directly. The event
// channel now carries only plain component events, all of
// which go straight to the frontend + executor.
let app_handle_events = app_handle.clone();
let flow_runtime_events = Arc::clone(&flow_runtime);
let mqtt_publish_tx_events = mqtt_publish_tx.clone();
std::thread::spawn(move || {
log::info!("Event forwarding thread started");
if let Some(mut rx) = event_rx {
log::info!("Event receiver obtained, waiting for events...");
while let Some(event) = rx.blocking_recv() {
log::trace!("Event: {} ({}) -> {:?}",
event.source, event.source_handle, event.value);

// Handle MQTT publish events specially
// These are emitted by MQTT publish nodes when they receive input
if event.source_handle.as_ref() == "_mqtt_publish" {
log::debug!("[MQTT] Publish event from component {}", event.source);

// Parse the JSON publish info from the event value
if let runtime::ComponentValue::String(json_str) = &event.value {
if let Ok(publish_info) = serde_json::from_str::<serde_json::Value>(json_str) {
let broker_id = publish_info.get("brokerId").and_then(|v| v.as_str()).unwrap_or("").to_string();
let topic = publish_info.get("topic").and_then(|v| v.as_str()).unwrap_or("").to_string();
let payload = publish_info.get("payload").and_then(|v| v.as_str()).unwrap_or("").to_string();
let retain = publish_info.get("retain").and_then(serde_json::Value::as_bool).unwrap_or(false);

log::info!("[MQTT] Publishing to broker={broker_id}, topic={topic}, payload={payload}, retain={retain}");

if !broker_id.is_empty() && !topic.is_empty() {
let _ = mqtt_publish_tx_events.send(MqttPublishRequest {
component_id: event.source.to_string(),
broker_id,
topic,
payload,
retain,
});
} else {
log::warn!("[MQTT] Publish request missing broker_id or topic");
}
} else {
log::error!("[MQTT] Failed to parse publish info JSON");
}
}
continue;
}

log::trace!(
"Event: {} ({}) -> {:?}",
event.source, event.source_handle, event.value
);

let _ = app_handle_events.emit("component-event", &event);

// This thread is std::thread::spawn, NOT tokio::spawn — blocking_lock() is safe
// here and will not stall the Tokio executor. The original try_lock() rationale
// ("avoid blocking the async runtime") was incorrect for this call site.
Expand All @@ -211,34 +184,6 @@ pub fn run() {
}
});

// Spawn MQTT publish handler thread
let mqtt_manager_publish = mqtt_manager_for_publish;
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let mut rx = mqtt_publish_rx;
log::info!("[MQTT] Publish handler thread started");
while let Some(request) = rx.recv().await {
log::info!("[MQTT] Processing publish request for component {}", request.component_id);

// For now, we need the broker_id and topic from the request
// In a full implementation, we'd look these up from the component
if !request.broker_id.is_empty() && !request.topic.is_empty() {
if let Err(e) = mqtt_manager_publish.publish(
&request.broker_id,
&request.topic,
request.payload.as_bytes(),
request.retain,
).await {
log::error!("[MQTT] Failed to publish: {e}");
}
} else {
log::warn!("[MQTT] Publish request missing broker_id or topic");
}
}
});
});

// Listen for board-state events from hardware monitor
let flow_runtime_board = Arc::clone(&flow_runtime);
let pending_flow_board = Arc::clone(&pending_flow_setup);
Expand All @@ -255,12 +200,12 @@ pub fn run() {
*board_connected_listener.write().unwrap_or_else(std::sync::PoisonError::into_inner) = true;

// Apply pending flow if any, which will also install the callback
if let Some((flow, ctx)) = pending_flow_board.write().unwrap_or_else(std::sync::PoisonError::into_inner).take() {
if let Some((flow, services)) = pending_flow_board.write().unwrap_or_else(std::sync::PoisonError::into_inner).take() {
log::info!("Applying pending flow: {} nodes, {} edges",
flow.nodes.len(), flow.edges.len());
// Use blocking_lock() for async mutex in sync callback context
let mut runtime = flow_runtime_board.blocking_lock();
if let Err(e) = runtime.update_flow(flow, &ctx) {
if let Err(e) = runtime.update_flow(flow, &services) {
log::error!("Failed to apply pending flow: {e}");
} else if let Err(e) = runtime.initialize_hardware() {
log::warn!("Failed to initialize hardware after pending flow: {e}");
Expand Down
27 changes: 18 additions & 9 deletions apps/web/src-tauri/src/llm/commands.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
//! Tauri commands for LLM provider management
//! Tauri commands for LLM provider management.
//!
//! `llm_sync_providers` is the frontend's primary sync path: it pushes the
//! authoritative list of provider records straight into the shared
//! [`crate::runtime::services::LlmRegistry`] held on [`crate::AppState`].
//! Once synced, every live `Llm` component sees the new credentials on its
//! next `dispatch("trigger")` — no flow_update re-fire required (ADR-0002).

use super::provider::ProviderConfig;
use crate::runtime::services::{HttpLlmProvider, LlmProvider};
use crate::AppState;
use std::sync::Arc;
use tauri::State;

/// Provider config payload from the frontend
#[derive(Debug, Clone, serde::Deserialize)]
pub struct SyncProviderConfig {
pub id: String,
#[allow(dead_code)]
pub name: String,
pub base_url: String,
pub api_key: String,
Expand All @@ -20,17 +28,18 @@ pub async fn llm_sync_providers(
state: State<'_, AppState>,
providers: Vec<SyncProviderConfig>,
) -> Result<(), String> {
let configs: Vec<ProviderConfig> = providers
let count = providers.len();
let entries: Vec<(String, Arc<dyn LlmProvider>)> = providers
.into_iter()
.map(|p| ProviderConfig {
id: p.id,
name: p.name,
base_url: p.base_url,
api_key: p.api_key,
.map(|p| {
let provider: Arc<dyn LlmProvider> =
Arc::new(HttpLlmProvider::new(p.base_url, p.api_key));
(p.id, provider)
})
.collect();

state.llm_manager.sync(configs).await;
state.llm_registry.sync(entries).await;
log::info!("[LLM] Synced {count} provider(s) into registry");
Ok(())
}

Expand Down
Loading
Loading