diff --git a/apps/skit/src/server/mod.rs b/apps/skit/src/server/mod.rs index efa46a55..05f5e216 100644 --- a/apps/skit/src/server/mod.rs +++ b/apps/skit/src/server/mod.rs @@ -2008,6 +2008,7 @@ async fn get_pipeline_handler( // Fetch current node states without holding the pipeline lock. let node_states = session.get_node_states().await.unwrap_or_default(); let node_view_data = session.get_node_view_data().await.unwrap_or_default(); + let runtime_schemas = session.get_runtime_schemas().await.unwrap_or_default(); // Clone pipeline (short lock hold) and add runtime state to nodes. let mut api_pipeline = { @@ -2023,6 +2024,11 @@ async fn get_pipeline_handler( api_pipeline.view_data = Some(node_view_data); } + // Attach runtime param schemas so the UI can merge them with static schemas. + if !runtime_schemas.is_empty() { + api_pipeline.runtime_schemas = Some(runtime_schemas); + } + info!("Fetched pipeline with states for session '{}' via HTTP", session_id); Ok(Json(api_pipeline)) } diff --git a/apps/skit/src/session.rs b/apps/skit/src/session.rs index 75f13df1..c1ea704b 100644 --- a/apps/skit/src/session.rs +++ b/apps/skit/src/session.rs @@ -347,6 +347,34 @@ impl Session { ); }); + // Subscribe to runtime schema discovery notifications from the engine + let mut runtime_schema_rx = engine_handle + .subscribe_runtime_schemas() + .await + .map_err(|e| format!("Failed to subscribe to runtime schema updates: {e}"))?; + + // Spawn task to forward runtime schema updates to WebSocket clients + let session_id_for_schemas = session_id.clone(); + let event_tx_for_schemas = event_tx.clone(); + tokio::spawn(async move { + while let Some(update) = runtime_schema_rx.recv().await { + let event = ApiEvent { + message_type: MessageType::Event, + correlation_id: None, + payload: EventPayload::RuntimeSchemasUpdated { + session_id: session_id_for_schemas.clone(), + node_id: update.node_id, + schema: update.schema, + }, + }; + let _ = event_tx_for_schemas.send(BroadcastEvent::to_all(event)); + } + tracing::debug!( + session_id = %session_id_for_schemas, + "Runtime schema forwarding task ended" + ); + }); + // Subscribe to telemetry events from the engine let mut telemetry_rx = engine_handle .subscribe_telemetry() @@ -422,6 +450,19 @@ impl Session { self.engine_handle.get_node_view_data().await } + /// Gets the runtime param schema overrides for all nodes in this session. + /// + /// Only nodes whose `ProcessorNode::runtime_param_schema()` returned + /// `Some` after initialization will have entries. + /// + /// # Errors + /// + /// Returns an error if the engine handle's oneshot channel fails to receive a response, + /// which typically indicates the engine actor has stopped or panicked. + pub async fn get_runtime_schemas(&self) -> Result, String> { + self.engine_handle.get_runtime_schemas().await + } + /// Registers a new preview, enforcing the per-session limit. /// /// # Errors diff --git a/apps/skit/src/websocket.rs b/apps/skit/src/websocket.rs index 85fd9467..8944b524 100644 --- a/apps/skit/src/websocket.rs +++ b/apps/skit/src/websocket.rs @@ -277,7 +277,8 @@ pub async fn handle_websocket( | EventPayload::ConnectionAdded { session_id, .. } | EventPayload::ConnectionRemoved { session_id, .. } | EventPayload::NodeTelemetry { session_id, .. } - | EventPayload::NodeViewDataUpdated { session_id, .. } => { + | EventPayload::NodeViewDataUpdated { session_id, .. } + | EventPayload::RuntimeSchemasUpdated { session_id, .. } => { visible_session_ids.contains(session_id) } } diff --git a/apps/skit/src/websocket_handlers.rs b/apps/skit/src/websocket_handlers.rs index 58458eb2..7da72e5b 100644 --- a/apps/skit/src/websocket_handlers.rs +++ b/apps/skit/src/websocket_handlers.rs @@ -851,7 +851,12 @@ async fn handle_tune_node( } let mut pipeline = session.pipeline.lock().await; if let Some(node) = pipeline.nodes.get_mut(&node_id) { - node.params = Some(durable_params); + // Deep-merge the partial update into existing params so + // sibling keys are preserved (mirrors the async handler). + node.params = Some(match node.params.take() { + Some(existing) => deep_merge_json(existing, durable_params), + None => durable_params, + }); } else { warn!( node_id = %node_id, @@ -988,7 +993,15 @@ async fn handle_tune_node_fire_and_forget( } let mut pipeline = session.pipeline.lock().await; if let Some(node) = pipeline.nodes.get_mut(&node_id) { - node.params = Some(durable_params); + // Deep-merge the partial update into existing params so + // sibling keys are preserved. Without this, a partial + // nested update like `{ properties: { show: false } }` + // would overwrite the entire params, losing keys such + // as `fps`, `width`, or `properties.name`. + node.params = Some(match node.params.take() { + Some(existing) => deep_merge_json(existing, durable_params), + None => durable_params, + }); } else { warn!( node_id = %node_id, @@ -997,6 +1010,10 @@ async fn handle_tune_node_fire_and_forget( } } // Lock released here + // Broadcast the *partial delta* (not merged state) to all clients. + // Correct deep-merge on receive depends on each client having a + // valid base state, which is guaranteed because every client + // fetches the full pipeline on connect. let event = ApiEvent { message_type: MessageType::Event, correlation_id: None, @@ -1066,6 +1083,7 @@ async fn handle_get_pipeline( let node_states = session.get_node_states().await.unwrap_or_default(); let node_view_data = session.get_node_view_data().await.unwrap_or_default(); + let runtime_schemas = session.get_runtime_schemas().await.unwrap_or_default(); // Clone pipeline (short lock hold) and add runtime state to nodes. let mut api_pipeline = { @@ -1081,6 +1099,12 @@ async fn handle_get_pipeline( api_pipeline.view_data = Some(node_view_data); } + // Attach runtime param schemas so the UI can merge them with static schemas + // and render controls for dynamically discovered parameters. + if !runtime_schemas.is_empty() { + api_pipeline.runtime_schemas = Some(runtime_schemas); + } + info!( session_id = %session_id, node_count = api_pipeline.nodes.len(), @@ -1360,3 +1384,80 @@ fn handle_get_permissions(perms: &Permissions, role_name: &str) -> ResponsePaylo info!(role = %role_name, "Returning permissions for role"); ResponsePayload::Permissions { role: role_name.to_string(), permissions: perms.to_info() } } + +/// Recursively deep-merges `source` into `target`, returning the merged value. +/// Only JSON objects are merged recursively; arrays and scalars in `source` +/// replace the corresponding value in `target`. +fn deep_merge_json(target: serde_json::Value, source: serde_json::Value) -> serde_json::Value { + match (target, source) { + (serde_json::Value::Object(mut t_map), serde_json::Value::Object(s_map)) => { + for (key, s_val) in s_map { + let merged = match t_map.remove(&key) { + Some(t_val) => deep_merge_json(t_val, s_val), + None => s_val, + }; + t_map.insert(key, merged); + } + serde_json::Value::Object(t_map) + }, + // Non-object source replaces target wholesale. + (_, source) => source, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn deep_merge_preserves_sibling_keys() { + let target = json!({ + "fps": 30, + "width": 1920, + "properties": { "show": true, "name": "Alex" } + }); + let source = json!({ + "properties": { "show": false } + }); + let merged = deep_merge_json(target, source); + assert_eq!(merged["fps"], 30); + assert_eq!(merged["width"], 1920); + assert_eq!(merged["properties"]["show"], false); + assert_eq!(merged["properties"]["name"], "Alex"); + } + + #[test] + fn deep_merge_adds_new_keys() { + let target = json!({ "a": 1 }); + let source = json!({ "b": 2 }); + let merged = deep_merge_json(target, source); + assert_eq!(merged, json!({ "a": 1, "b": 2 })); + } + + #[test] + fn deep_merge_replaces_non_object() { + let target = json!({ "x": [1, 2, 3] }); + let source = json!({ "x": [4, 5] }); + let merged = deep_merge_json(target, source); + assert_eq!(merged["x"], json!([4, 5])); + } + + #[test] + fn deep_merge_nested_objects() { + let target = json!({ + "properties": { + "home_score": 0, + "away_score": 0, + "show": true + } + }); + let source = json!({ + "properties": { "home_score": 3 } + }); + let merged = deep_merge_json(target, source); + assert_eq!(merged["properties"]["home_score"], 3); + assert_eq!(merged["properties"]["away_score"], 0); + assert_eq!(merged["properties"]["show"], true); + } +} diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 8dd034cb..0fc0741b 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -465,6 +465,16 @@ pub enum EventPayload { /// RFC 3339 formatted timestamp for convenience timestamp: String, }, + // --- Runtime Schema Events --- + /// A node's runtime param schema has been discovered after initialization. + /// The UI should merge this with the static per-kind schema so controls + /// can render for dynamically discovered parameters (e.g. Slint properties). + RuntimeSchemasUpdated { + session_id: String, + node_id: String, + #[ts(type = "JsonValue")] + schema: serde_json::Value, + }, } pub type Event = Message; @@ -538,6 +548,13 @@ pub struct Pipeline { #[serde(default)] #[ts(type = "Record | null")] pub view_data: Option>, + /// Per-instance runtime param schema overrides discovered after node + /// initialization. Only populated in API responses for nodes whose + /// `ProcessorNode::runtime_param_schema()` returned `Some`. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + #[ts(type = "Record | null")] + pub runtime_schemas: Option>, } // Type aliases for backwards compatibility diff --git a/crates/api/src/yaml.rs b/crates/api/src/yaml.rs index bb00d737..5f8f00f4 100644 --- a/crates/api/src/yaml.rs +++ b/crates/api/src/yaml.rs @@ -497,7 +497,16 @@ fn compile_steps( nodes.insert(node_name, Node { kind: step.kind, params: step.params, state: None }); } - Pipeline { name, description, mode, client, nodes, connections, view_data: None } + Pipeline { + name, + description, + mode, + client, + nodes, + connections, + view_data: None, + runtime_schemas: None, + } } /// Known bidirectional node kinds that are allowed to participate in cycles. @@ -730,7 +739,16 @@ fn compile_dag( }) .collect(); - Ok(Pipeline { name, description, mode, client, nodes, connections, view_data: None }) + Ok(Pipeline { + name, + description, + mode, + client, + nodes, + connections, + view_data: None, + runtime_schemas: None, + }) } // --------------------------------------------------------------------------- diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 0f6ff0e8..df438925 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -405,6 +405,29 @@ pub trait ProcessorNode: Send + Sync { Ok(PinUpdate::NoChange) } + /// Return a runtime-discovered param schema after initialization. + /// + /// Plugins whose tunable parameters depend on runtime configuration + /// (e.g., properties discovered after compiling a `.slint` file) can + /// override this to return a JSON Schema fragment. The engine will + /// deep-merge it with the static `param_schema` from registration + /// and deliver the enriched schema to the UI. + /// + /// The returned value should be a JSON Schema `"type": "object"` with + /// a `"properties"` map. Each property can include `"tunable": true` + /// and an optional `"path"` override for dot-notation addressing. + /// + /// **Called once** — the engine queries this immediately after + /// [`initialize`](Self::initialize) and caches the result for the + /// lifetime of the node. There is currently no mechanism to refresh + /// the schema at runtime; if the underlying configuration changes + /// (e.g. a different `.slint` file), the node must be re-created. + /// + /// Default: `None` (use static schema only). + fn runtime_param_schema(&self) -> Option { + None + } + /// Tier 2: Runtime pin management capability. /// /// Returns true if this node supports adding/removing pins while running. diff --git a/crates/engine/benches/av1_compositor_pipeline.rs b/crates/engine/benches/av1_compositor_pipeline.rs index b4588672..67b52708 100644 --- a/crates/engine/benches/av1_compositor_pipeline.rs +++ b/crates/engine/benches/av1_compositor_pipeline.rs @@ -287,6 +287,7 @@ fn build_pipeline(args: &BenchArgs) -> streamkit_api::Pipeline { connections, view_data: None, client: None, + runtime_schemas: None, } } diff --git a/crates/engine/benches/compositor_pipeline.rs b/crates/engine/benches/compositor_pipeline.rs index 9b6717d6..a4c70188 100644 --- a/crates/engine/benches/compositor_pipeline.rs +++ b/crates/engine/benches/compositor_pipeline.rs @@ -239,6 +239,7 @@ fn build_pipeline(width: u32, height: u32, fps: u32, frame_count: u32) -> stream connections, view_data: None, client: None, + runtime_schemas: None, } } diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 0134cc3c..22c91f4d 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -11,7 +11,7 @@ use crate::{ constants::DEFAULT_SUBSCRIBER_CHANNEL_CAPACITY, dynamic_config::CONTROL_CAPACITY, - dynamic_messages::{PinConfigMsg, QueryMessage}, + dynamic_messages::{PinConfigMsg, QueryMessage, RuntimeSchemaUpdate}, dynamic_pin_distributor::PinDistributorActor, graph_builder, }; @@ -104,6 +104,15 @@ pub struct DynamicEngine { pub(super) node_view_data: HashMap, /// Subscribers that want to receive node view data updates pub(super) view_data_subscribers: Vec>, + /// Per-instance runtime param schema overrides discovered after node init. + /// Only populated for nodes whose `ProcessorNode::runtime_param_schema()` + /// returns `Some`. + pub(super) runtime_schemas: HashMap, + /// Subscribers that want to receive runtime schema discovery notifications. + /// Unbounded because schema discovery is one-per-node and low-frequency; + /// a bounded channel risks silently dropping a notification that leaves + /// the UI permanently stale. + pub(super) runtime_schema_subscribers: Vec>, // Metrics pub(super) nodes_active_gauge: opentelemetry::metrics::Gauge, pub(super) node_state_transitions_counter: opentelemetry::metrics::Counter, @@ -205,6 +214,14 @@ impl DynamicEngine { QueryMessage::GetNodeViewData { response_tx } => { let _ = response_tx.send(self.node_view_data.clone()).await; }, + QueryMessage::GetRuntimeSchemas { response_tx } => { + let _ = response_tx.send(self.runtime_schemas.clone()).await; + }, + QueryMessage::SubscribeRuntimeSchemas { response_tx } => { + let (tx, rx) = mpsc::unbounded_channel(); + self.runtime_schema_subscribers.push(tx); + let _ = response_tx.send(rx).await; + }, } } @@ -535,6 +552,18 @@ impl DynamicEngine { }, } + // Query runtime param schema after init (before spawning the run loop, + // which consumes the node via `Box`). + if let Some(schema) = node.runtime_param_schema() { + self.runtime_schemas.insert(node_id.to_string(), schema.clone()); + + // Notify subscribers so the UI can merge the schema immediately + // rather than waiting for a manual pipeline re-fetch. + let update = RuntimeSchemaUpdate { node_id: node_id.to_string(), schema }; + self.runtime_schema_subscribers + .retain(|subscriber| subscriber.send(update.clone()).is_ok()); + } + let (control_tx, control_rx) = mpsc::channel(CONTROL_CAPACITY); // 0. Capture pin metadata for runtime type validation @@ -1385,6 +1414,7 @@ impl DynamicEngine { self.node_pin_metadata.remove(node_id); self.pin_management_txs.remove(node_id); self.dynamic_pin_nodes.remove(node_id); + self.runtime_schemas.remove(node_id); self.connections.retain(|(to, _), (from, _)| to != node_id && from != node_id); self.node_kinds.remove(node_id); self.nodes_active_gauge.record(self.live_nodes.len() as u64, &[]); diff --git a/crates/engine/src/dynamic_handle.rs b/crates/engine/src/dynamic_handle.rs index f62d4748..48058e25 100644 --- a/crates/engine/src/dynamic_handle.rs +++ b/crates/engine/src/dynamic_handle.rs @@ -4,7 +4,7 @@ //! Public client handle for controlling a running dynamic engine. -use crate::dynamic_messages::QueryMessage; +use crate::dynamic_messages::{QueryMessage, RuntimeSchemaUpdate}; use std::collections::HashMap; use std::sync::Arc; use streamkit_core::control::EngineControlMessage; @@ -153,6 +153,43 @@ impl DynamicEngineHandle { response_rx.recv().await.ok_or_else(|| "Failed to receive response from engine".to_string()) } + /// Gets the runtime param schema overrides for all nodes in the pipeline. + /// + /// Only nodes whose `ProcessorNode::runtime_param_schema()` returned + /// `Some` after initialization will have entries. + /// + /// # Errors + /// + /// Returns an error if the engine actor has shut down or fails to respond. + pub async fn get_runtime_schemas(&self) -> Result, String> { + let (response_tx, mut response_rx) = mpsc::channel(1); + self.query_tx + .send(QueryMessage::GetRuntimeSchemas { response_tx }) + .await + .map_err(|_| "Engine actor has shut down".to_string())?; + + response_rx.recv().await.ok_or_else(|| "Failed to receive response from engine".to_string()) + } + + /// Subscribes to runtime param schema discovery notifications. + /// Returns a receiver that will receive updates whenever a node's + /// runtime schema is discovered after initialization. + /// + /// # Errors + /// + /// Returns an error if the engine actor has shut down or fails to respond. + pub async fn subscribe_runtime_schemas( + &self, + ) -> Result, String> { + let (response_tx, mut response_rx) = mpsc::channel(1); + self.query_tx + .send(QueryMessage::SubscribeRuntimeSchemas { response_tx }) + .await + .map_err(|_| "Engine actor has shut down".to_string())?; + + response_rx.recv().await.ok_or_else(|| "Failed to receive response from engine".to_string()) + } + /// Sends a shutdown signal to the engine and waits for it to complete. /// This ensures all nodes are properly stopped before returning. /// Can only be called once - subsequent calls will return an error. diff --git a/crates/engine/src/dynamic_messages.rs b/crates/engine/src/dynamic_messages.rs index 99ce078e..19dda819 100644 --- a/crates/engine/src/dynamic_messages.rs +++ b/crates/engine/src/dynamic_messages.rs @@ -45,15 +45,43 @@ impl std::fmt::Display for ConnectionId { } } +/// Notification emitted when a node's runtime param schema is discovered +/// after initialization (e.g. Slint component properties). +#[derive(Clone, Debug)] +pub struct RuntimeSchemaUpdate { + pub node_id: String, + pub schema: serde_json::Value, +} + /// Query messages for retrieving information from the engine without modifying state. pub enum QueryMessage { - GetNodeStates { response_tx: mpsc::Sender> }, - GetNodeStats { response_tx: mpsc::Sender> }, - SubscribeState { response_tx: mpsc::Sender> }, - SubscribeStats { response_tx: mpsc::Sender> }, - SubscribeTelemetry { response_tx: mpsc::Sender> }, - SubscribeViewData { response_tx: mpsc::Sender> }, - GetNodeViewData { response_tx: mpsc::Sender> }, + GetNodeStates { + response_tx: mpsc::Sender>, + }, + GetNodeStats { + response_tx: mpsc::Sender>, + }, + SubscribeState { + response_tx: mpsc::Sender>, + }, + SubscribeStats { + response_tx: mpsc::Sender>, + }, + SubscribeTelemetry { + response_tx: mpsc::Sender>, + }, + SubscribeViewData { + response_tx: mpsc::Sender>, + }, + GetNodeViewData { + response_tx: mpsc::Sender>, + }, + GetRuntimeSchemas { + response_tx: mpsc::Sender>, + }, + SubscribeRuntimeSchemas { + response_tx: mpsc::Sender>, + }, } // Re-export ConnectionMode from core for use by pin distributor diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 092252d5..83fba63e 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -36,6 +36,8 @@ mod dynamic_pin_distributor; pub use dynamic_config::DynamicEngineConfig; #[cfg(feature = "dynamic")] pub use dynamic_handle::DynamicEngineHandle; +#[cfg(feature = "dynamic")] +pub use dynamic_messages::RuntimeSchemaUpdate; pub use oneshot::{OneshotEngineConfig, OneshotInput, OneshotPipelineResult}; // Import constants and types (within dynamic module) @@ -200,6 +202,8 @@ impl Engine { telemetry_subscribers: Vec::new(), node_view_data: HashMap::new(), view_data_subscribers: Vec::new(), + runtime_schemas: HashMap::new(), + runtime_schema_subscribers: Vec::new(), nodes_active_gauge: meter .u64_gauge("engine.nodes.active") .with_description("Number of active nodes in the pipeline") diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index 4aa7b32f..45f62e94 100644 --- a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -55,6 +55,8 @@ fn create_test_engine() -> DynamicEngine { node_packets_discarded_counter: meter.u64_counter("test.discarded").build(), node_packets_errored_counter: meter.u64_counter("test.errored").build(), node_state_gauge: meter.u64_gauge("test.state").build(), + runtime_schemas: HashMap::new(), + runtime_schema_subscribers: Vec::new(), } } diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index 239ed4f7..7792d7e1 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -56,6 +56,8 @@ fn create_test_engine() -> DynamicEngine { node_packets_discarded_counter: meter.u64_counter("test.discarded").build(), node_packets_errored_counter: meter.u64_counter("test.errored").build(), node_state_gauge: meter.u64_gauge("test.state").build(), + runtime_schemas: HashMap::new(), + runtime_schema_subscribers: Vec::new(), } } diff --git a/crates/plugin-native/src/wrapper.rs b/crates/plugin-native/src/wrapper.rs index 43d7d866..ce89a815 100644 --- a/crates/plugin-native/src/wrapper.rs +++ b/crates/plugin-native/src/wrapper.rs @@ -201,6 +201,40 @@ impl ProcessorNode for NativeNodeWrapper { self.metadata.outputs.clone() } + fn runtime_param_schema(&self) -> Option { + let get_schema = self.state.api().get_runtime_param_schema?; + let handle = self.state.begin_call()?; + + let result = get_schema(handle); + + if !result.success { + // FFI call failed — log and return None. + if !result.error_message.is_null() { + let msg = unsafe { conversions::c_str_to_string(result.error_message) } + .unwrap_or_default(); + warn!(error = %msg, "Plugin runtime_param_schema failed"); + } + self.state.finish_call(); + return None; + } + + // success=true, null json_schema → plugin has no runtime schema. + if result.json_schema.is_null() { + self.state.finish_call(); + return None; + } + + // success=true, non-null json_schema → JSON string containing the schema. + // SAFETY: result.json_schema points to a thread-local CString set by + // error_to_c (used here as a generic "String → *const c_char" helper). + // We must copy the string BEFORE any other FFI call on this thread + // (including finish_call) that could invoke error_to_c again and + // overwrite the thread-local buffer. + let json_str = unsafe { conversions::c_str_to_string(result.json_schema) }.ok(); + self.state.finish_call(); + json_str.and_then(|s| serde_json::from_str(&s).ok()) + } + // The run method is complex by necessity - it's an async actor managing FFI calls, // control messages, and packet processing. Breaking it up would make the logic harder to follow. #[allow(clippy::too_many_lines)] diff --git a/examples/plugins/gain-native-c/gain_plugin.c b/examples/plugins/gain-native-c/gain_plugin.c index 6aee513b..490df696 100644 --- a/examples/plugins/gain-native-c/gain_plugin.c +++ b/examples/plugins/gain-native-c/gain_plugin.c @@ -295,7 +295,10 @@ static const CNativePluginAPI g_plugin_api = { .process_packet = gain_process_packet, .update_params = gain_update_params, .flush = gain_flush, - .destroy_instance = gain_destroy_instance + .destroy_instance = gain_destroy_instance, + .get_source_config = NULL, /* Not a source plugin */ + .tick = NULL, /* Not a source plugin */ + .get_runtime_param_schema = NULL /* No dynamic params */ }; /* Export the plugin entry point */ diff --git a/examples/plugins/gain-native-c/streamkit_plugin.h b/examples/plugins/gain-native-c/streamkit_plugin.h index 9a18c085..0f1178aa 100644 --- a/examples/plugins/gain-native-c/streamkit_plugin.h +++ b/examples/plugins/gain-native-c/streamkit_plugin.h @@ -9,7 +9,15 @@ * Plugins must export a single symbol `streamkit_native_plugin_api` that * returns a pointer to a CNativePluginAPI struct. * - * API Version: 2 + * API Version: 4 + * + * Version history: + * v1: Initial release — processor nodes. + * v2: Added telemetry callback parameters to process_packet and flush. + * v3: Added video types (CRawVideoFormat, CPixelFormat, CVideoFrame), + * source node support (get_source_config, tick, CSourceConfig, CTickResult). + * v4: Added get_runtime_param_schema (CSchemaResult) for dynamic runtime + * parameter discovery. */ #ifndef STREAMKIT_PLUGIN_H @@ -28,7 +36,7 @@ extern "C" { * ============================================================================ */ /** Current API version. Plugins and host check compatibility via this field. */ -#define STREAMKIT_NATIVE_PLUGIN_API_VERSION 2 +#define STREAMKIT_NATIVE_PLUGIN_API_VERSION 4 /* ============================================================================ * Core Types @@ -63,6 +71,43 @@ typedef struct CResult { const char* error_message; /**< NULL on success, error string on failure */ } CResult; +/** + * Result type for get_runtime_param_schema. + * + * Unlike CResult, this type has a dedicated json_schema field for the + * success payload so that plugin authors don't have to read a JSON + * string out of error_message. + * + * - success=true, json_schema=NULL → plugin has no runtime schema. + * - success=true, json_schema= → JSON Schema string. + * - success=false, error_message= → error description. + * + * All pointers are borrowed and must not be freed by the caller. + */ +typedef struct CSchemaResult { + bool success; + const char* error_message; /**< NULL on success, error string on failure */ + const char* json_schema; /**< NULL when no schema, JSON string on success */ +} CSchemaResult; + +/** Helper to create a CSchemaResult with no runtime schema */ +static inline CSchemaResult CSchemaResult_none(void) { + CSchemaResult r = {true, NULL, NULL}; + return r; +} + +/** Helper to create a CSchemaResult carrying a JSON schema */ +static inline CSchemaResult CSchemaResult_schema(const char* json) { + CSchemaResult r = {true, NULL, json}; + return r; +} + +/** Helper to create a CSchemaResult error */ +static inline CSchemaResult CSchemaResult_error(const char* msg) { + CSchemaResult r = {false, msg, NULL}; + return r; +} + /** Helper to create a successful result */ static inline CResult CResult_success(void) { CResult r = {true, NULL}; @@ -118,9 +163,34 @@ typedef enum CPacketType { PACKET_TYPE_CUSTOM = 4, PACKET_TYPE_BINARY = 5, PACKET_TYPE_ANY = 6, - PACKET_TYPE_PASSTHROUGH = 7 + PACKET_TYPE_PASSTHROUGH = 7, + PACKET_TYPE_RAW_VIDEO = 8, + PACKET_TYPE_ENCODED_VIDEO = 9 } CPacketType; +/** Pixel format discriminant for raw video frames. */ +typedef enum CPixelFormat { + PIXEL_FORMAT_RGBA8 = 0, + PIXEL_FORMAT_I420 = 1, + PIXEL_FORMAT_NV12 = 2 +} CPixelFormat; + +/** Raw video format metadata. */ +typedef struct CRawVideoFormat { + uint32_t width; /**< Frame width in pixels (0 = unspecified) */ + uint32_t height; /**< Frame height in pixels (0 = unspecified) */ + CPixelFormat pixel_format; +} CRawVideoFormat; + +/** Video frame data passed across the ABI boundary. */ +typedef struct CVideoFrame { + uint32_t width; + uint32_t height; + CPixelFormat pixel_format; + const uint8_t* data; + size_t data_len; +} CVideoFrame; + /** Encoding for custom packets. */ typedef enum CCustomEncoding { CUSTOM_ENCODING_JSON = 0, @@ -155,8 +225,9 @@ typedef struct CCustomPacket { */ typedef struct CPacketTypeInfo { CPacketType type_discriminant; - const CAudioFormat* audio_format; /**< Non-NULL only for RawAudio */ - const char* custom_type_id; /**< Non-NULL only for Custom */ + const CAudioFormat* audio_format; /**< Non-NULL only for RawAudio */ + const char* custom_type_id; /**< Non-NULL only for Custom */ + const CRawVideoFormat* raw_video_format; /**< Non-NULL only for RawVideo */ } CPacketTypeInfo; /** @@ -205,6 +276,35 @@ typedef struct CNodeMetadata { size_t categories_count; } CNodeMetadata; +/* ============================================================================ + * Source Node Types (v3) + * ============================================================================ */ + +/** + * Source node configuration returned by the plugin. + * + * Tells the host how to drive the tick loop for source nodes (nodes with no + * inputs that generate data on their own schedule). + */ +typedef struct CSourceConfig { + /** If true, this plugin is a source node (no inputs, host drives tick loop). */ + bool is_source; + /** Microseconds between ticks (e.g. 33333 for 30 fps). */ + uint64_t tick_interval_us; + /** If > 0, host stops after this many ticks. 0 = infinite. */ + uint64_t max_ticks; +} CSourceConfig; + +/** + * Result returned by the source tick function. + */ +typedef struct CTickResult { + /** Standard success/error result. */ + CResult result; + /** If true, the source is done producing output (finite mode). */ + bool done; +} CTickResult; + /* ============================================================================ * Callbacks * ============================================================================ */ @@ -291,6 +391,54 @@ typedef struct CNativePluginAPI { * @param handle Plugin instance handle */ void (*destroy_instance)(CPluginHandle handle); + + /* -- v3 additions ---------------------------------------------------- */ + + /** + * Query source configuration after instance creation (optional). + * + * NULL for processor plugins. When non-NULL, the returned + * CSourceConfig.is_source tells the host whether to use the tick + * loop instead of the input-driven processing loop. + * + * @param handle Plugin instance handle + * @return CSourceConfig describing tick behaviour + */ + CSourceConfig (*get_source_config)(CPluginHandle handle); + + /** + * Produce one unit of output (source plugins, optional). + * + * The host calls this at the interval specified by get_source_config. + * The plugin renders one frame/sample/etc. and sends it via + * output_callback. Returns CTickResult to signal continuation or + * completion. + * + * NULL for processor plugins. + * + * @param handle Plugin instance handle + * @param output_callback Callback to send output packets + * @param callback_data User data to pass to callback + * @return CTickResult (done flag + result) + */ + CTickResult (*tick)(CPluginHandle handle, COutputCallback output_callback, + void* callback_data); + + /* -- v4 additions ---------------------------------------------------- */ + + /** + * Get runtime-discovered param schema (optional). + * + * Plugins whose tunable parameters depend on runtime configuration + * (e.g. properties discovered after compiling a .slint file) can + * implement this to return a JSON Schema fragment. The host merges + * it with the static param_schema and delivers it to the UI. + * + * @param handle Plugin instance handle + * @return CSchemaResult: success+NULL = no schema, + * success+json = schema, !success = error + */ + CSchemaResult (*get_runtime_param_schema)(CPluginHandle handle); } CNativePluginAPI; /* ============================================================================ diff --git a/plugins/native/slint/src/slint_node.rs b/plugins/native/slint/src/slint_node.rs index 8f89ae10..88e59914 100644 --- a/plugins/native/slint/src/slint_node.rs +++ b/plugins/native/slint/src/slint_node.rs @@ -10,7 +10,9 @@ use streamkit_plugin_sdk_native::streamkit_core::types::{ }; use crate::config::SlintConfig; -use crate::slint_thread::{send_work, NodeId, SlintThreadResult, SlintWorkItem}; +use crate::slint_thread::{ + send_work, DiscoveredProperty, DiscoveredValueType, NodeId, SlintThreadResult, SlintWorkItem, +}; /// Slint UI video source plugin. /// @@ -24,6 +26,9 @@ pub struct SlintSourcePlugin { tick_count: u64, duration_us: u64, logger: Logger, + /// Properties discovered from the compiled `.slint` component at init. + /// Used to build the runtime param schema so the UI can render controls. + discovered_properties: Vec, } impl NativeSourceNode for SlintSourcePlugin { @@ -135,6 +140,7 @@ impl NativeSourceNode for SlintSourcePlugin { tick_count: 0, duration_us: 1_000_000 / 30, logger, + discovered_properties: Vec::new(), }); }; @@ -163,21 +169,26 @@ impl NativeSourceNode for SlintSourcePlugin { // Wait for init result. match result_rx.recv() { - Ok(SlintThreadResult::InitOk) => { + Ok(SlintThreadResult::InitOk { properties }) => { plugin_info!(logger, "Slint instance registered: {node_id}"); + Ok(Self { + config, + node_id, + result_rx, + tick_count: 0, + duration_us, + logger, + discovered_properties: properties, + }) }, Ok(SlintThreadResult::InitErr(e)) => { - return Err(format!("Slint instance creation failed: {e}")); + Err(format!("Slint instance creation failed: {e}")) }, Ok(SlintThreadResult::Frame { .. }) => { - return Err("Unexpected frame result during init".to_string()); - }, - Err(_) => { - return Err("Shared Slint thread channel closed during init".to_string()); + Err("Unexpected frame result during init".to_string()) }, + Err(_) => Err("Shared Slint thread channel closed during init".to_string()), } - - Ok(Self { config, node_id, result_rx, tick_count: 0, duration_us, logger }) } fn tick(&mut self, output: &OutputSender) -> Result { @@ -237,4 +248,39 @@ impl NativeSourceNode for SlintSourcePlugin { let _ = send_work(SlintWorkItem::Unregister { node_id: self.node_id }); plugin_info!(self.logger, "Slint instance unregistered: {}", self.node_id); } + + fn runtime_param_schema(&self) -> Option { + if self.discovered_properties.is_empty() { + return None; + } + + let mut props = serde_json::Map::new(); + for dp in &self.discovered_properties { + let type_str = match dp.value_type { + DiscoveredValueType::Bool => "boolean", + DiscoveredValueType::Number => "number", + DiscoveredValueType::String => "string", + }; + + let mut schema = serde_json::json!({ + "type": type_str, + "tunable": true, + "path": format!("properties.{}", dp.name), + "description": format!("Slint property: {}", dp.name), + }); + + // Include the initial value from the component so the UI can + // show the correct default state (e.g. a toggle that starts on). + if let Some(ref initial) = dp.initial_value { + schema["default"] = initial.clone(); + } + + props.insert(dp.name.clone(), schema); + } + + Some(serde_json::json!({ + "type": "object", + "properties": props, + })) + } } diff --git a/plugins/native/slint/src/slint_thread.rs b/plugins/native/slint/src/slint_thread.rs index d20b8cc6..c35b1599 100644 --- a/plugins/native/slint/src/slint_thread.rs +++ b/plugins/native/slint/src/slint_thread.rs @@ -25,13 +25,34 @@ use slint::platform::software_renderer::{ }; use slint::platform::WindowAdapter; use slint::{ComponentHandle, LogicalSize, SharedString}; -use slint_interpreter::{ComponentDefinition, ComponentInstance, Value}; +use slint_interpreter::{ComponentDefinition, ComponentInstance, Value, ValueType}; use crate::config::SlintConfig; /// Opaque identifier for a plugin instance on the shared Slint thread. pub type NodeId = uuid::Uuid; +/// Describes a single publicly declared property discovered from a compiled +/// `.slint` component. Used to build the runtime param schema. +#[derive(Debug, Clone)] +pub struct DiscoveredProperty { + pub name: String, + pub value_type: DiscoveredValueType, + /// The initial value of the property as declared in the `.slint` file. + /// Used as the `default` in the runtime JSON Schema so the UI can show + /// the correct initial state (e.g. a toggle that is `true` at startup). + pub initial_value: Option, +} + +/// Subset of `slint_interpreter::ValueType` that maps to JSON Schema types +/// the UI can render as controls. +#[derive(Debug, Clone, Copy)] +pub enum DiscoveredValueType { + Bool, + Number, + String, +} + /// Work item sent from a plugin's `tick()` to the shared Slint thread. pub enum SlintWorkItem { /// Register a new instance: compile its `.slint` file and create a component. @@ -53,7 +74,9 @@ pub enum SlintWorkItem { /// Result sent from the shared Slint thread back to a specific instance. pub enum SlintThreadResult { /// Init succeeded — the instance can start rendering. - InitOk, + /// Carries the list of publicly declared properties discovered from the + /// compiled `.slint` component (may be empty). + InitOk { properties: Vec }, /// Init failed with an error message. InitErr(String), /// A rendered frame. @@ -126,12 +149,19 @@ fn slint_thread_main(work_rx: std::sync::mpsc::Receiver) { SlintWorkItem::Register { node_id, config, result_tx } => { match create_slint_instance(&config, &mut platform_set) { Ok(instance) => { + // Discover publicly declared properties from the compiled + // component. Only types the UI can render as controls + // (bool, number, string) are included. + let properties = + discover_properties(&instance.definition, &instance.component); + tracing::info!( node_id = %node_id, slint_file = %config.slint_file, + discovered_properties = properties.len(), "Created Slint instance", ); - let _ = result_tx.send(SlintThreadResult::InitOk); + let _ = result_tx.send(SlintThreadResult::InitOk { properties }); instances.insert( node_id, InstanceState { @@ -341,6 +371,58 @@ fn create_slint_instance( Ok(SlintInstance { window, component, definition, buffer, width, frame_counter: 0 }) } +/// Enumerate the publicly declared properties of a compiled Slint component +/// and return those whose types map to JSON Schema primitives the UI can +/// render as controls (boolean → toggle, number → slider, string → text). +/// +/// Also reads the initial value of each property from the instantiated +/// component so the UI can show the correct initial state. +/// +/// **Limitation:** `.slint` files are assumed to be static for the lifetime +/// of the node. Property discovery happens once at initialization; if the +/// source file changes, the node must be re-created to pick up new properties. +fn discover_properties( + definition: &ComponentDefinition, + component: &ComponentInstance, +) -> Vec { + definition + .properties() + .filter_map(|(name, value_type)| { + let vt = match value_type { + ValueType::Bool => DiscoveredValueType::Bool, + ValueType::Number => DiscoveredValueType::Number, + ValueType::String => DiscoveredValueType::String, + // Image, Model, Struct, Brush, etc. are not tuneable. + _ => return None, + }; + // Read the initial value from the live component instance so the + // UI can display the correct default (e.g. clock_running = true). + let initial_value = component.get_property(&name).ok().and_then(|v| match v { + Value::Bool(b) => Some(serde_json::Value::Bool(b)), + Value::Number(n) => { + let json_num = serde_json::Number::from_f64(n); + if json_num.is_none() { + tracing::warn!( + property = %name, + value = %n, + "Slint property has NaN/Infinity value, dropping default" + ); + } + json_num.map(serde_json::Value::Number) + }, + Value::String(s) => Some(serde_json::Value::String(s.to_string())), + _ => None, + }); + // Slint normalizes identifiers to kebab-case internally + // (e.g. `clock_running` → `clock-running`). The rest of the + // StreamKit stack (YAML params, JSON UpdateParams, set_properties) + // uses snake_case, so convert back to match. + let name = name.replace('-', "_"); + Some(DiscoveredProperty { name, value_type: vt, initial_value }) + }) + .collect() +} + /// Render a single frame from the Slint instance, returning raw RGBA8 data. /// /// Applies property keyframe cycling. Timer/animation pumping is handled diff --git a/samples/pipelines/dynamic/video_moq_slint_scoreboard.yml b/samples/pipelines/dynamic/video_moq_slint_scoreboard.yml index 873e920c..9b0966b0 100644 --- a/samples/pipelines/dynamic/video_moq_slint_scoreboard.yml +++ b/samples/pipelines/dynamic/video_moq_slint_scoreboard.yml @@ -44,6 +44,12 @@ client: node: lower_third property: properties.show default: true + - label: "Clock Running" + type: toggle + node: scoreboard + property: properties.clock_running + group: Scoreboard + default: true - label: "Player Name" type: text node: lower_third @@ -102,6 +108,7 @@ nodes: home_score: 3 away_score: 1 clock_start: 754 + clock_running: true period: "2ND" lower_third: diff --git a/sdks/plugin-sdk/native/src/lib.rs b/sdks/plugin-sdk/native/src/lib.rs index 8ba275e9..572d10af 100644 --- a/sdks/plugin-sdk/native/src/lib.rs +++ b/sdks/plugin-sdk/native/src/lib.rs @@ -328,6 +328,19 @@ pub trait NativeProcessorNode: Sized + Send + 'static { /// Clean up resources (optional) fn cleanup(&mut self) {} + + /// Return a runtime-discovered param schema after initialization (optional). + /// + /// Plugins whose tunable parameters depend on runtime configuration + /// (e.g., properties discovered after compiling a `.slint` file) can + /// override this to return a JSON Schema fragment. The engine will + /// deep-merge it with the static `param_schema` from metadata and + /// deliver the enriched schema to the UI. + /// + /// Default: `None` (use static schema only). + fn runtime_param_schema(&self) -> Option { + None + } } /// Configuration for a source node's tick loop. @@ -432,6 +445,19 @@ pub trait NativeSourceNode: Sized + Send + 'static { /// Clean up resources (optional). fn cleanup(&mut self) {} + + /// Return a runtime-discovered param schema after initialization (optional). + /// + /// Source plugins whose tunable parameters depend on runtime configuration + /// (e.g., properties discovered after compiling a `.slint` file) can + /// override this to return a JSON Schema fragment. The engine will + /// deep-merge it with the static `param_schema` from metadata and + /// deliver the enriched schema to the UI. + /// + /// Default: `None` (use static schema only). + fn runtime_param_schema(&self) -> Option { + None + } } /// Optional trait for plugins that need shared resource management (e.g., ML models). @@ -512,6 +538,52 @@ pub trait ResourceSupport: NativeProcessorNode { } } +/// Internal helper macro: generates `__plugin_get_runtime_param_schema` and +/// `__plugin_destroy_instance` trampolines. Shared by both +/// `native_plugin_entry!` and `native_source_plugin_entry!` to avoid +/// identical duplicated implementations. +#[macro_export] +#[doc(hidden)] +macro_rules! __plugin_shared_ffi { + ($plugin_type:ty) => { + extern "C" fn __plugin_get_runtime_param_schema( + handle: $crate::types::CPluginHandle, + ) -> $crate::types::CSchemaResult { + if handle.is_null() { + return $crate::types::CSchemaResult::none(); + } + + let instance = unsafe { &*(handle as *const $plugin_type) }; + match instance.runtime_param_schema() { + None => $crate::types::CSchemaResult::none(), + Some(schema) => match serde_json::to_string(&schema) { + Ok(json) => { + // NOTE: error_to_c is a misnomer here — it's a generic + // "String → thread-local CString" helper reused for the + // success payload. A rename to e.g. `thread_local_c_str` + // would clarify intent but touches many call-sites. + let c_str = $crate::conversions::error_to_c(json); + $crate::types::CSchemaResult::schema(c_str) + }, + Err(e) => { + let err_msg = $crate::conversions::error_to_c(format!( + "Failed to serialize runtime param schema: {e}" + )); + $crate::types::CSchemaResult::error(err_msg) + }, + }, + } + } + + extern "C" fn __plugin_destroy_instance(handle: $crate::types::CPluginHandle) { + if !handle.is_null() { + let mut instance = unsafe { Box::from_raw(handle as *mut $plugin_type) }; + instance.cleanup(); + } + } + }; +} + /// Macro to generate C ABI exports for a plugin /// /// This macro should be called once per plugin with the type that implements @@ -564,6 +636,7 @@ macro_rules! native_plugin_entry { destroy_instance: __plugin_destroy_instance, get_source_config: None, tick: None, + get_runtime_param_schema: Some(__plugin_get_runtime_param_schema), }; &API } @@ -1024,12 +1097,7 @@ macro_rules! native_plugin_entry { } } - extern "C" fn __plugin_destroy_instance(handle: $crate::types::CPluginHandle) { - if !handle.is_null() { - let mut instance = unsafe { Box::from_raw(handle as *mut $plugin_type) }; - instance.cleanup(); - } - } + $crate::__plugin_shared_ffi!($plugin_type); }; } @@ -1091,6 +1159,7 @@ macro_rules! native_source_plugin_entry { destroy_instance: __plugin_destroy_instance, get_source_config: Some(__plugin_get_source_config), tick: Some(__plugin_tick), + get_runtime_param_schema: Some(__plugin_get_runtime_param_schema), }; &API } @@ -1553,11 +1622,6 @@ macro_rules! native_source_plugin_entry { } } - extern "C" fn __plugin_destroy_instance(handle: $crate::types::CPluginHandle) { - if !handle.is_null() { - let mut instance = unsafe { Box::from_raw(handle as *mut $plugin_type) }; - instance.cleanup(); - } - } + $crate::__plugin_shared_ffi!($plugin_type); }; } diff --git a/sdks/plugin-sdk/native/src/types.rs b/sdks/plugin-sdk/native/src/types.rs index 3c25ab81..dc549c99 100644 --- a/sdks/plugin-sdk/native/src/types.rs +++ b/sdks/plugin-sdk/native/src/types.rs @@ -11,9 +11,14 @@ use std::os::raw::{c_char, c_void}; /// API version number. Plugins and host check compatibility via this field. /// +/// v1: Initial release — processor nodes (get_metadata, create_instance, +/// process_packet, update_params, flush, destroy_instance). +/// v2: Added telemetry callback parameters to process_packet and flush. /// v3: Added video packet types (`RawVideo`, `EncodedVideo`), `CRawVideoFormat`, /// `CPixelFormat`, and source node support (`get_source_config`, `tick`). -pub const NATIVE_PLUGIN_API_VERSION: u32 = 3; +/// v4: Added `get_runtime_param_schema` (returns [`CSchemaResult`]) for +/// dynamic runtime parameter discovery. +pub const NATIVE_PLUGIN_API_VERSION: u32 = 4; /// Opaque handle to a plugin instance pub type CPluginHandle = *mut c_void; @@ -61,6 +66,46 @@ impl CResult { } } +/// Result type for `get_runtime_param_schema`. +/// +/// Unlike [`CResult`], this type has a dedicated `json_schema` field for the +/// success payload so that plugin authors don't have to read a JSON string +/// out of `error_message`. +/// +/// - `success=true`, `json_schema=NULL` → plugin has no runtime schema. +/// - `success=true`, `json_schema=` → JSON Schema string. +/// - `success=false`, `error_message=` → error description. +/// +/// # Ownership +/// +/// Both pointers are **borrowed** and must not be freed by the caller. +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct CSchemaResult { + pub success: bool, + /// Null-terminated error message on failure, NULL on success. + pub error_message: *const c_char, + /// Null-terminated JSON Schema string on success, NULL otherwise. + pub json_schema: *const c_char, +} + +impl CSchemaResult { + /// No runtime schema (success, both pointers NULL). + pub const fn none() -> Self { + Self { success: true, error_message: std::ptr::null(), json_schema: std::ptr::null() } + } + + /// Runtime schema available (success, json_schema carries the payload). + pub const fn schema(json: *const c_char) -> Self { + Self { success: true, error_message: std::ptr::null(), json_schema: json } + } + + /// Error during schema retrieval. + pub const fn error(msg: *const c_char) -> Self { + Self { success: false, error_message: msg, json_schema: std::ptr::null() } + } +} + /// Audio sample format #[repr(C)] #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -373,6 +418,18 @@ pub struct CNativePluginAPI { *mut c_void, ) -> CTickResult, >, + + // ── v4 additions ────────────────────────────────────────────────────── + /// Query runtime-discovered param schema after instance creation. + /// + /// Returns a [`CSchemaResult`] describing additional tunable parameters + /// discovered at runtime (e.g. properties from a compiled `.slint` + /// file). The host deep-merges this with the static `param_schema` + /// from metadata and delivers it to the UI. + /// + /// `None` when the plugin has no runtime-discovered parameters (the + /// common case — most plugins declare everything statically). + pub get_runtime_param_schema: Option CSchemaResult>, } /// Symbol name that plugins must export diff --git a/ui/src/components/FlowCanvas.tsx b/ui/src/components/FlowCanvas.tsx index 43820b49..930ac95e 100644 --- a/ui/src/components/FlowCanvas.tsx +++ b/ui/src/components/FlowCanvas.tsx @@ -165,6 +165,7 @@ export const FlowCanvas = = Record = ({ pipelineYaml, sessionId }) => { +const OverlayControls: React.FC = ({ pipelineYaml, sessionId, pipeline }) => { const { tuneNodeConfig } = useTuneNode(sessionId); + const nodeDefinitions = useSchemaStore((s) => s.nodeDefinitions); // Parse controls from the pipeline YAML's client section. - const controls: ControlConfig[] = useMemo( + const yamlControls: ControlConfig[] = useMemo( () => parseClientFromYaml(pipelineYaml)?.controls ?? [], [pipelineYaml] ); + // Generate schema-driven controls from runtime_schemas (same source as + // Monitor View) and merge with YAML controls. YAML controls take + // precedence when they target the same node+property — they carry + // hand-authored labels, groups, and range overrides. + const controls: ControlConfig[] = useMemo(() => { + if (!pipeline?.runtime_schemas) return yamlControls; + + // Build a set of node:property keys already covered by YAML controls + const yamlKeys = new Set(yamlControls.map((c) => `${c.node}:${c.property}`)); + + const schemaControls: ControlConfig[] = []; + for (const [nodeId, rawSchema] of Object.entries(pipeline.runtime_schemas)) { + const runtimeSchema = rawSchema as JsonSchema | undefined; + if (!runtimeSchema) continue; + + // Merge with static param_schema (if any) from node registry + const node = pipeline.nodes[nodeId]; + const nodeDef = node ? nodeDefinitions.find((d) => d.kind === node.kind) : undefined; + const baseSchema = nodeDef?.param_schema as JsonSchema | undefined; + const merged = deepMergeSchemas(baseSchema, runtimeSchema); + + // Convert tunable properties to ControlConfig entries + const generated = schemaToControlConfigs(nodeId, merged, nodeId); + for (const ctrl of generated) { + const key = `${ctrl.node}:${ctrl.property}`; + if (!yamlKeys.has(key)) { + schemaControls.push(ctrl); + yamlKeys.add(key); // prevent duplicates within runtime schemas + } + } + } + + return [...yamlControls, ...schemaControls]; + }, [yamlControls, pipeline, nodeDefinitions]); + // Build a send callback for a control. A new closure is created per // render, but child controls absorb this via onSendRef so it is safe. const makeSend = useCallback( diff --git a/ui/src/constants/timing.ts b/ui/src/constants/timing.ts index dfdcc755..dc90e0d6 100644 --- a/ui/src/constants/timing.ts +++ b/ui/src/constants/timing.ts @@ -18,3 +18,11 @@ * while leaving headroom for network RTT and server processing. */ export const PARAM_THROTTLE_MS = 33; + +/** + * Debounce delay for text input controls on node cards. + * + * 300ms gives the user time to finish typing before sending the value + * to the server, avoiding excessive partial updates. + */ +export const TEXT_DEBOUNCE_MS = 300; diff --git a/ui/src/hooks/useNumericSlider.ts b/ui/src/hooks/useNumericSlider.ts index fddf69d6..9a950d9e 100644 --- a/ui/src/hooks/useNumericSlider.ts +++ b/ui/src/hooks/useNumericSlider.ts @@ -13,11 +13,17 @@ import { useState, useEffect, useMemo, useRef } from 'react'; import { PARAM_THROTTLE_MS } from '@/constants/timing'; import { nodeParamsAtom } from '@/stores/sessionAtoms'; +import { readByPath } from '@/utils/controlProps'; export interface UseNumericSliderOptions { nodeId: string; sessionId?: string; paramKey: string; + /** + * Dot-notation path for reading/writing nested params. + * Defaults to `paramKey` when omitted. + */ + path?: string; min: number; max: number; step: number; @@ -68,6 +74,7 @@ export const useNumericSlider = (options: UseNumericSliderOptions): UseNumericSl nodeId, sessionId, paramKey, + path: pathOverride, min, max, step, @@ -78,10 +85,13 @@ export const useNumericSlider = (options: UseNumericSliderOptions): UseNumericSl throttleMs = PARAM_THROTTLE_MS, } = options; - // Get stored value from Jotai per-node atom (fine-grained reactivity) + const effectivePath = pathOverride ?? paramKey; + + // Get stored value from Jotai per-node atom (fine-grained reactivity). + // Use readByPath so nested paths (e.g. "properties.score") are resolved. const paramsKey = sessionId ? `${sessionId}\0${nodeId}` : nodeId; const nodeParams = useAtomValue(nodeParamsAtom(paramsKey)); - const storedValue = nodeParams[paramKey] as number | undefined; + const storedValue = readByPath(nodeParams, effectivePath) as number | undefined; // Determine effective value: stored > prop > default const effectiveValue = (() => { @@ -125,12 +135,12 @@ export const useNumericSlider = (options: UseNumericSliderOptions): UseNumericSl return throttle( (value: number) => { const transformedValue = transformValue ? transformValue(value) : value; - onParamChange(nodeId, paramKey, transformedValue); + onParamChange(nodeId, effectivePath, transformedValue); }, throttleMs, { leading: true, trailing: true } ); - }, [nodeId, onParamChange, paramKey, transformValue, throttleMs]); + }, [nodeId, onParamChange, effectivePath, transformValue, throttleMs]); // Cancel throttled function on unmount useEffect( diff --git a/ui/src/hooks/usePipeline.ts b/ui/src/hooks/usePipeline.ts index 8ec250c5..110e132f 100644 --- a/ui/src/hooks/usePipeline.ts +++ b/ui/src/hooks/usePipeline.ts @@ -12,8 +12,10 @@ import { sessionStore as defaultSessionStore, nodeParamsAtom, writeNodeParam, + writeNodeParams, clearNodeParams, } from '@/stores/sessionAtoms'; +import { dispatchParamUpdate } from '@/utils/controlProps'; import { hooksLogger } from '@/utils/logger'; import { parseYamlToPipeline, type EngineMode } from '@/utils/yamlPipeline'; @@ -104,7 +106,13 @@ export const usePipeline = () => { const regenerateYamlRef = useRef<() => void>(() => {}); const handleParamChange = useCallback((nodeId: string, paramName: string, value: unknown) => { - writeNodeParam(nodeId, paramName, value); + // Dot-notation paths (e.g. "properties.score") need to be stored as + // nested objects so readByPath can find them. Flat keys use the + // simple writeNodeParam helper. + dispatchParamUpdate(nodeId, paramName, value, writeNodeParam, (nid, config) => { + // writeNodeParams handles the deep-merge internally. + writeNodeParams(nid, config); + }); // Keep the YAML editor in sync with param changes made via the canvas // (e.g. compositor layer drag / slider). The guard prevents a feedback // loop when YAML editing triggers parseYamlToPipeline which stores the diff --git a/ui/src/hooks/useTuneNode.ts b/ui/src/hooks/useTuneNode.ts index d54b323b..8bd5f96b 100644 --- a/ui/src/hooks/useTuneNode.ts +++ b/ui/src/hooks/useTuneNode.ts @@ -6,9 +6,8 @@ import { useCallback } from 'react'; import { v4 as uuidv4 } from 'uuid'; import { getWebSocketService } from '@/services/websocket'; -import { sessionStore, nodeParamsAtom, nodeKey, writeNodeParams } from '@/stores/sessionAtoms'; +import { writeNodeParams } from '@/stores/sessionAtoms'; import type { Request, MessageType } from '@/types/types'; -import { deepMerge } from '@/utils/controlProps'; // Resolved once at module level — getWebSocketService returns a singleton, // so hoisting it avoids a new reference on every render and keeps @@ -31,13 +30,10 @@ export function useTuneNode(sessionId: string | null) { (nodeId: string, config: Record) => { if (!sessionId) return; - // Deep-merge the partial update into the current atom value so - // sibling nested properties are preserved (e.g. updating + // writeNodeParams deep-merges the partial update into the current + // atom value, preserving sibling nested properties (e.g. updating // properties.home_score doesn't clobber properties.away_score). - const k = nodeKey(sessionId, nodeId); - const current = sessionStore.get(nodeParamsAtom(k)); - const merged = deepMerge(current, config); - writeNodeParams(nodeId, merged, sessionId); + writeNodeParams(nodeId, config, sessionId); const request: Request = { type: 'request' as MessageType, diff --git a/ui/src/nodes/ConfigurableNode.tsx b/ui/src/nodes/ConfigurableNode.tsx index bbe87757..614652c7 100644 --- a/ui/src/nodes/ConfigurableNode.tsx +++ b/ui/src/nodes/ConfigurableNode.tsx @@ -3,25 +3,38 @@ // SPDX-License-Identifier: MPL-2.0 import styled from '@emotion/styled'; -import * as Tooltip from '@radix-ui/react-tooltip'; -import React from 'react'; +import React, { useCallback, useMemo, useState } from 'react'; import { NodeFrame } from '@/components/node/NodeFrame'; -import { LiveBadge, LiveDot } from '@/components/ui/LiveIndicator'; +import { LiveDot } from '@/components/ui/LiveIndicator'; import { useNumericSlider } from '@/hooks/useNumericSlider'; import { areNodePropsEqual } from '@/nodes/nodePropsEqual'; +import { + BooleanToggleControl, + TextInputControl, + ControlLabel, + ControlLabelText, + ControlDescription, +} from '@/nodes/SchemaControls'; import { perfOnRender } from '@/perf'; import type { InputPin, OutputPin, NodeState, NodeStats, NodeDefinition } from '@/types/types'; +import { readByPath } from '@/utils/controlProps'; import { type JsonSchemaProperty, type JsonSchema, isFiniteNumber, extractSliderConfigs, + extractToggleConfigs, + extractTextConfigs, decimalPlacesFromStep, formatNumber, } from '@/utils/jsonSchema'; import { nodesLogger } from '@/utils/logger'; +// Module-level map so expanded state survives topology rebuilds (which +// recreate ConfigurableNode React elements, resetting useState). +const expandedState = new Map(); + const ParamCount = styled.div` padding: 4px 0; font-size: 12px; @@ -30,7 +43,7 @@ const ParamCount = styled.div` border-top: 1px solid var(--sk-border); `; -const SliderGroup = styled.div` +const ControlGroup = styled.div` display: flex; flex-direction: column; gap: 8px; @@ -44,25 +57,6 @@ const SliderWrapper = styled.div` padding: 4px 0; `; -const SliderLabel = styled.div` - display: flex; - justify-content: space-between; - align-items: center; - gap: 6px; - font-size: 12px; - font-weight: 600; - color: var(--sk-text); -`; - -const SliderLabelText = styled.span` - flex: 0 0 auto; -`; - -const SliderDescription = styled.div` - font-size: 11px; - color: var(--sk-text-muted); -`; - const SliderValue = styled.span` font-variant-numeric: tabular-nums; color: var(--sk-text-muted); @@ -70,18 +64,6 @@ const SliderValue = styled.span` flex: 0 0 auto; `; -const TooltipContent = styled(Tooltip.Content)` - background: var(--sk-panel-bg); - border: 1px solid var(--sk-border); - border-radius: 6px; - padding: 8px 12px; - box-shadow: 0 4px 12px var(--sk-shadow); - font-size: 11px; - z-index: 1000; - max-width: 250px; - color: var(--sk-text); -`; - const SliderInput = styled.input` width: 100%; pointer-events: auto; @@ -101,6 +83,33 @@ const SliderMarks = styled.div` font-variant-numeric: tabular-nums; `; +const ControlsToggleBar = styled.button` + display: flex; + align-items: center; + gap: 6px; + width: 100%; + padding: 6px 0; + background: none; + border: none; + border-top: 1px solid var(--sk-border); + color: var(--sk-text-muted); + font-size: 11px; + cursor: pointer; + font-family: inherit; + text-align: left; + border-radius: 0; + + &:hover { + color: var(--sk-text); + } +`; + +const ChevronSvg = styled.svg<{ expanded: boolean }>` + transition: transform 0.15s ease; + transform: rotate(${(props) => (props.expanded ? '90deg' : '0deg')}); + flex-shrink: 0; +`; + interface ConfigurableNodeData { label: string; kind: string; @@ -126,14 +135,14 @@ interface NumericSliderControlProps { nodeId: string; sessionId?: string; paramKey: string; + /** Dot-notation path for reading/writing nested params. Defaults to `paramKey`. */ + path?: string; schema: JsonSchemaProperty; min: number; max: number; step: number; params: Record; onParamChange?: (nodeId: string, paramName: string, value: unknown) => void; - showLiveIndicator?: boolean; - isTunable: boolean; } // Helper: Compute fallback value for slider @@ -187,20 +196,23 @@ function formatMinMaxLabels( }; } +// --------------------------------------------------------------------------- +// Numeric slider control +// --------------------------------------------------------------------------- + const NumericSliderControl: React.FC = ({ nodeId, sessionId, paramKey, + path: pathOverride, schema, min, max, step, params, onParamChange, - showLiveIndicator = false, - isTunable, }) => { - const baseParam = params?.[paramKey]; + const baseParam = readByPath(params as Record, pathOverride ?? paramKey); const defaultValue = schema?.default; const fallback = computeFallbackValue(defaultValue, baseParam, min, max); @@ -211,6 +223,7 @@ const NumericSliderControl: React.FC = ({ nodeId, sessionId, paramKey, + path: pathOverride, min, max, step, @@ -228,29 +241,11 @@ const NumericSliderControl: React.FC = ({ return ( - - {paramKey} - {showLiveIndicator && isTunable && ( - - - - - - LIVE - - - - - Changes apply immediately to the running pipeline - - - - - - )} + + {paramKey} {formattedValue} - - {schema?.description && {schema.description}} + + {schema?.description && {schema.description}} = React.memo(function Co const properties = schema?.properties ?? {}; const totalParams = Object.keys(properties).length; - const sliderConfigs = extractSliderConfigs(schema); + const sliderConfigs = useMemo(() => extractSliderConfigs(schema), [schema]); + const toggleConfigs = useMemo(() => extractToggleConfigs(schema), [schema]); + const textConfigs = useMemo(() => extractTextConfigs(schema), [schema]); + const controlCount = toggleConfigs.length + sliderConfigs.length + textConfigs.length; + const hasControls = controlCount > 0; // Detect bidirectional nodes using the bidirectional property from node definition const isBidirectional = data.definition?.bidirectional ?? false; @@ -297,6 +296,15 @@ const ConfigurableNode: React.FC = React.memo(function Co // This prevents the LIVE badge from showing in design view (which has no sessionId) const showLiveIndicator = !!data.onParamChange && !!data.sessionId; + const [controlsExpanded, setControlsExpanded] = useState(() => expandedState.get(id) ?? false); + const toggleExpanded = useCallback(() => { + setControlsExpanded((prev) => { + const next = !prev; + expandedState.set(id, next); + return next; + }); + }, [id]); + const content = ( = React.memo(function Co sessionId={data.sessionId} isBidirectional={isBidirectional} > - {sliderConfigs.length > 0 && ( - - {sliderConfigs.map(({ key, schema: schemaProp, min, max, step, tunable }) => ( - - ))} - + {hasControls && ( + <> + + + + + + {controlCount} control{controlCount !== 1 ? 's' : ''} + + {showLiveIndicator && } + + {controlsExpanded && ( + + {toggleConfigs.map((config) => ( + + ))} + {sliderConfigs.map(({ key, path, schema: schemaProp, min, max, step }) => ( + + ))} + {textConfigs.map((config) => ( + + ))} + + )} + )} {totalParams > 0 ? ( diff --git a/ui/src/nodes/SchemaControls.tsx b/ui/src/nodes/SchemaControls.tsx new file mode 100644 index 00000000..f8508e57 --- /dev/null +++ b/ui/src/nodes/SchemaControls.tsx @@ -0,0 +1,322 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Schema-driven controls for Monitor view node cards. + * + * These components render toggle switches and text inputs directly on + * ConfigurableNode cards for parameters marked `tunable: true` in the + * node's `param_schema`. They complement the existing NumericSliderControl + * (which handles number/integer params) and use the same `TuneNodeAsync` + * wire protocol via `useTuneNode`. + */ + +import styled from '@emotion/styled'; +import { useAtomValue } from 'jotai/react'; +import React, { useCallback, useEffect, useRef, useState } from 'react'; + +import { TEXT_DEBOUNCE_MS } from '@/constants/timing'; +import { useTuneNode } from '@/hooks/useTuneNode'; +import { nodeParamsAtom } from '@/stores/sessionAtoms'; +import { buildParamUpdate, readByPath } from '@/utils/controlProps'; +import type { ToggleConfig, TextConfig } from '@/utils/jsonSchema'; + +// --------------------------------------------------------------------------- +// Styled components for toggle/text controls +// --------------------------------------------------------------------------- + +export const ControlLabel = styled.div` + display: flex; + justify-content: space-between; + align-items: center; + gap: 6px; + font-size: 12px; + font-weight: 600; + color: var(--sk-text); +`; + +export const ControlLabelText = styled.span` + flex: 0 0 auto; +`; + +export const ControlDescription = styled.div` + font-size: 11px; + color: var(--sk-text-muted); +`; + +// --------------------------------------------------------------------------- +// Toggle control styled components +// --------------------------------------------------------------------------- + +const ToggleRow = styled.div` + display: flex; + align-items: center; + justify-content: space-between; + gap: 8px; + padding: 4px 0; +`; + +const ToggleLabel = styled.span` + font-size: 12px; + font-weight: 600; + color: var(--sk-text); +`; + +const ToggleTrack = styled.button<{ checked: boolean }>` + position: relative; + width: 36px; + height: 20px; + border-radius: 10px; + border: 1px solid ${(props) => (props.checked ? 'var(--sk-primary)' : 'var(--sk-border)')}; + background: ${(props) => (props.checked ? 'var(--sk-primary)' : 'var(--sk-bg)')}; + cursor: pointer; + padding: 0; + transition: + background 0.15s, + border-color 0.15s; + flex-shrink: 0; + + &::after { + content: ''; + position: absolute; + top: 2px; + left: ${(props) => (props.checked ? '17px' : '2px')}; + width: 14px; + height: 14px; + border-radius: 50%; + background: ${(props) => + props.checked ? 'var(--sk-primary-contrast)' : 'var(--sk-text-muted)'}; + transition: left 0.15s; + } +`; + +// --------------------------------------------------------------------------- +// Text input control styled components +// --------------------------------------------------------------------------- + +const TextInputWrapper = styled.div` + display: flex; + flex-direction: column; + gap: 4px; + padding: 4px 0; +`; + +const CompactTextInput = styled.input` + width: 100%; + padding: 4px 8px; + font-size: 12px; + background: var(--sk-bg); + color: var(--sk-text); + border: 1px solid var(--sk-border); + border-radius: 4px; + font-family: inherit; + pointer-events: auto; + box-sizing: border-box; + + &:focus { + outline: none; + border-color: var(--sk-primary); + } + + &::placeholder { + color: var(--sk-text-muted); + } + + &:disabled { + cursor: not-allowed; + opacity: 0.5; + } +`; + +// --------------------------------------------------------------------------- +// Boolean toggle control +// --------------------------------------------------------------------------- + +interface BooleanToggleControlProps { + nodeId: string; + sessionId?: string; + config: ToggleConfig; + params: Record; +} + +export const BooleanToggleControl: React.FC = ({ + nodeId, + sessionId, + config, + params, +}) => { + const { tuneNodeConfig } = useTuneNode(sessionId ?? null); + + // Read from atom for live sync + const paramsKey = sessionId ? `${sessionId}\0${nodeId}` : nodeId; + const nodeParams = useAtomValue(nodeParamsAtom(paramsKey)); + + // Effective value: atom > props > default + const effectiveValue = (() => { + const stored = readByPath(nodeParams, config.path); + if (typeof stored === 'boolean') return stored; + const prop = readByPath(params as Record, config.path); + if (typeof prop === 'boolean') return prop; + if (typeof config.schema.default === 'boolean') return config.schema.default; + return false; + })(); + + const [checked, setChecked] = useState(effectiveValue); + + // Sync with external changes + useEffect(() => { + setChecked(effectiveValue); + }, [effectiveValue]); + + // Ref pattern: keep tuneNodeConfig ref stable so toggle handler identity + // doesn't change when sessionId (rarely) changes. + const tuneRef = useRef(tuneNodeConfig); + useEffect(() => { + tuneRef.current = tuneNodeConfig; + }, [tuneNodeConfig]); + + // Ref tracks latest checked to avoid stale closures if two clicks + // fire before React re-renders. + const checkedRef = useRef(checked); + checkedRef.current = checked; + + const handleToggle = useCallback(() => { + const next = !checkedRef.current; + checkedRef.current = next; + setChecked(next); + tuneRef.current(nodeId, buildParamUpdate(config.path, next)); + }, [nodeId, config.path]); + + const disabled = !sessionId; + + return ( + + {config.key} + + + ); +}; + +// --------------------------------------------------------------------------- +// Text input control +// --------------------------------------------------------------------------- + +interface TextInputControlProps { + nodeId: string; + sessionId?: string; + config: TextConfig; + params: Record; +} + +export const TextInputControl: React.FC = ({ + nodeId, + sessionId, + config, + params, +}) => { + const { tuneNodeConfig } = useTuneNode(sessionId ?? null); + + // Read from atom for live sync + const paramsKey = sessionId ? `${sessionId}\0${nodeId}` : nodeId; + const nodeParams = useAtomValue(nodeParamsAtom(paramsKey)); + + // Effective value: atom > props > default + const effectiveValue = (() => { + const stored = readByPath(nodeParams, config.path); + if (typeof stored === 'string') return stored; + const prop = readByPath(params as Record, config.path); + if (typeof prop === 'string') return prop; + if (typeof config.schema.default === 'string') return config.schema.default; + return ''; + })(); + + const [text, setText] = useState(effectiveValue); + // Ref tracks latest text for flushDebounce so its identity stays + // stable across keystrokes (no `text` in useCallback deps). + const textRef = useRef(text); + textRef.current = text; + + // Sync with external changes when not actively editing + const isEditingRef = useRef(false); + useEffect(() => { + if (!isEditingRef.current) { + setText(effectiveValue); + } + }, [effectiveValue]); + + // Ref pattern: keep tuneNodeConfig ref stable for the debounce closure. + const tuneRef = useRef(tuneNodeConfig); + useEffect(() => { + tuneRef.current = tuneNodeConfig; + }, [tuneNodeConfig]); + + const timerRef = useRef>(undefined); + + const debouncedSend = useCallback( + (value: string) => { + clearTimeout(timerRef.current); + timerRef.current = setTimeout(() => { + timerRef.current = undefined; + tuneRef.current(nodeId, buildParamUpdate(config.path, value)); + isEditingRef.current = false; + }, TEXT_DEBOUNCE_MS); + }, + [nodeId, config.path] + ); + + // Flush any pending debounce on blur/unmount so the last typed value + // is sent rather than silently dropped. Reads from textRef so the + // callback identity doesn't change on every keystroke. + const flushDebounce = useCallback(() => { + if (timerRef.current !== undefined) { + clearTimeout(timerRef.current); + timerRef.current = undefined; + tuneRef.current(nodeId, buildParamUpdate(config.path, textRef.current)); + isEditingRef.current = false; + } + }, [nodeId, config.path]); + + useEffect(() => () => flushDebounce(), [flushDebounce]); + + const handleChange = useCallback( + (e: React.ChangeEvent) => { + isEditingRef.current = true; + const value = e.target.value; + setText(value); + debouncedSend(value); + }, + [debouncedSend] + ); + + const disabled = !sessionId; + + return ( + + + {config.key} + + {config.schema.description && ( + {config.schema.description} + )} + + + ); +}; diff --git a/ui/src/panes/InspectorPane.tsx b/ui/src/panes/InspectorPane.tsx index 738965bd..03f07983 100644 --- a/ui/src/panes/InspectorPane.tsx +++ b/ui/src/panes/InspectorPane.tsx @@ -11,6 +11,8 @@ import { SKTooltip } from '@/components/Tooltip'; import { CheckboxWithLabel } from '@/components/ui/Checkbox'; import { nodeParamsAtom } from '@/stores/sessionAtoms'; import type { NodeDefinition, InputPin, OutputPin, PacketType } from '@/types/types'; +import { readByPath } from '@/utils/controlProps'; +import type { JsonSchema, JsonSchemaProperty } from '@/utils/jsonSchema'; import { formatPacketType, getPacketTypeColor, @@ -19,22 +21,6 @@ import { getPinCardinalityDescription, } from '@/utils/packetTypes'; -interface JsonSchemaProperty { - type?: string; - description?: string; - default?: unknown; - minimum?: number; - maximum?: number; - exclusiveMinimum?: number; - exclusiveMaximum?: number; - multipleOf?: number; - tunable?: boolean; -} - -interface JsonSchema { - properties?: Record; -} - const PaneWrapper = styled.div` display: flex; flex-direction: column; @@ -274,14 +260,24 @@ const InspectorPane: React.FC = ({ const paramsKey = node.data.sessionId ? `${node.data.sessionId}\0${node.id}` : node.id; const nodeParams = useAtomValue(nodeParamsAtom(paramsKey)); - const handleInputChange = (key: string, value: unknown) => { - onParamChange(node.id, key, value); + const handleInputChange = (key: string, value: unknown, schema?: JsonSchemaProperty) => { + // When a schema has a `path` override, build the nested payload and + // route it through onParamChange so it works in both Monitor view + // (where onParamChange → tuneNode) and design view (where + // onParamChange → onConfigChange). The path is passed as the + // paramName so the caller can build the correct UpdateParams. + if (schema?.path) { + onParamChange(node.id, schema.path, value); + } else { + onParamChange(node.id, key, value); + } }; const renderField = (key: string, schema: JsonSchemaProperty) => { + const readPath = schema.path ?? key; const currentValue = - (nodeParams as Record)[key] ?? - node.data.params?.[key] ?? + readByPath(nodeParams as Record, readPath) ?? + readByPath((node.data.params ?? {}) as Record, readPath) ?? schema.default ?? ''; const inputId = `param-${node.id}-${key}`; @@ -297,7 +293,7 @@ const InspectorPane: React.FC = ({ schema={schema} paramKey={key} readOnly={isDisabled} - onChange={(v) => handleInputChange(key, v)} + onChange={(v) => handleInputChange(key, v, schema)} /> ); case 'number': @@ -308,7 +304,7 @@ const InspectorPane: React.FC = ({ value={currentValue} schema={schema} readOnly={isDisabled} - onChange={(v) => handleInputChange(key, v)} + onChange={(v) => handleInputChange(key, v, schema)} /> ); case 'boolean': @@ -318,7 +314,7 @@ const InspectorPane: React.FC = ({ value={currentValue} schema={schema} readOnly={isDisabled} - onChange={(v) => handleInputChange(key, v)} + onChange={(v) => handleInputChange(key, v, schema)} /> ); default: @@ -328,7 +324,7 @@ const InspectorPane: React.FC = ({ value={currentValue} schema={schema} readOnly={isDisabled} - onChange={(v) => handleInputChange(key, v)} + onChange={(v) => handleInputChange(key, v, schema)} /> ); } diff --git a/ui/src/services/websocket.ts b/ui/src/services/websocket.ts index 75a2873e..a62aa1fc 100644 --- a/ui/src/services/websocket.ts +++ b/ui/src/services/websocket.ts @@ -36,6 +36,7 @@ type ConnectionAddedPayload = Extract; type NodeTelemetryPayload = Extract; type NodeViewDataUpdatedPayload = Extract; +type RuntimeSchemasUpdatedPayload = Extract; interface PendingRequest { resolve: (response: Response) => void; @@ -247,6 +248,9 @@ export class WebSocketService { case 'nodeviewdataupdated': this.handleNodeViewDataUpdated(payload); break; + case 'runtimeschemasupdated': + this.handleRuntimeSchemasUpdated(payload); + break; default: break; } @@ -396,6 +400,11 @@ export class WebSocketService { useSessionStore.getState().updateNodeViewData(session_id, node_id, data); } + private handleRuntimeSchemasUpdated(payload: RuntimeSchemasUpdatedPayload): void { + const { session_id, node_id, schema } = payload; + useSessionStore.getState().updateRuntimeSchema(session_id, node_id, schema); + } + private handleNodeTelemetry(payload: NodeTelemetryPayload): void { const telemetryEvent = parseTelemetryEvent({ session_id: payload.session_id, diff --git a/ui/src/stores/sessionAtoms.ts b/ui/src/stores/sessionAtoms.ts index dcc43585..7ec3c22d 100644 --- a/ui/src/stores/sessionAtoms.ts +++ b/ui/src/stores/sessionAtoms.ts @@ -18,6 +18,7 @@ import { atom, getDefaultStore } from 'jotai'; import { atomFamily } from 'jotai-family'; import type { NodeState, NodeStats, Pipeline } from '@/types/types'; +import { deepMerge } from '@/utils/controlProps'; // ── Default store reference ───────────────────────────────────────────────── @@ -49,7 +50,10 @@ export const nodeViewDataAtom = atomFamily((_key: string) => atom(undef /** Per-node params atom -- stores the full Record for a node. */ export const nodeParamsAtom = atomFamily((_key: string) => atom>({})); -/** Write a single node param to the Jotai atom. */ +/** Write a single flat-key node param to the Jotai atom. + * This performs a shallow merge — suitable for top-level scalar keys only + * (e.g. `gain_db`). For nested/dot-path updates, use `writeNodeParams` + * which deep-merges to preserve sibling properties. */ export function writeNodeParam( nodeId: string, key: string, @@ -77,7 +81,7 @@ export function writeNodeParams( cleaned[key] = value; } } - sessionStore.set(nodeParamsAtom(k), { ...current, ...cleaned }); + sessionStore.set(nodeParamsAtom(k), deepMerge(current, cleaned)); } /** Clear node params atom for a specific node. */ diff --git a/ui/src/stores/sessionStore.ts b/ui/src/stores/sessionStore.ts index c1853d8e..f0024d78 100644 --- a/ui/src/stores/sessionStore.ts +++ b/ui/src/stores/sessionStore.ts @@ -21,6 +21,7 @@ interface SessionStore { updateNodeState: (sessionId: string, nodeId: string, state: NodeState) => void; updateNodeStats: (sessionId: string, nodeId: string, stats: NodeStats) => void; updateNodeViewData: (sessionId: string, nodeId: string, data: unknown) => void; + updateRuntimeSchema: (sessionId: string, nodeId: string, schema: unknown) => void; setPipeline: (sessionId: string, pipeline: Pipeline) => void; updateNodeParams: (sessionId: string, nodeId: string, params: Record) => void; addNode: ( @@ -92,6 +93,22 @@ export const useSessionStore = create((set, get) => ({ return { sessions: newSessions }; }), + updateRuntimeSchema: (sessionId, nodeId, schema) => + set((prev) => { + const session = prev.sessions.get(sessionId); + if (!session || !session.pipeline) return prev; + + const existing = session.pipeline.runtime_schemas ?? {}; + const updatedPipeline: Pipeline = { + ...session.pipeline, + runtime_schemas: { ...existing, [nodeId]: schema }, + }; + + const newSessions = new Map(prev.sessions); + newSessions.set(sessionId, { ...session, pipeline: updatedPipeline }); + return { sessions: newSessions }; + }), + setPipeline: (sessionId, pipeline) => set((prev) => { const session = prev.sessions.get(sessionId); diff --git a/ui/src/types/generated/api-types.ts b/ui/src/types/generated/api-types.ts index 7a58734e..5c17c28d 100644 --- a/ui/src/types/generated/api-types.ts +++ b/ui/src/types/generated/api-types.ts @@ -327,7 +327,7 @@ timestamp_us: bigint | null, /** * RFC 3339 formatted timestamp for convenience */ -timestamp: string, }; +timestamp: string, } | { "event": "runtimeschemasupdated", session_id: string, node_id: string, schema: JsonValue, }; export type SessionInfo = { id: string, name: string | null, /** @@ -361,7 +361,13 @@ client?: ClientSection | null, nodes: Record, connections: Array | null, }; +view_data?: Record | null, +/** + * Per-instance runtime param schema overrides discovered after node + * initialization. Only populated in API responses for nodes whose + * `ProcessorNode::runtime_param_schema()` returned `Some`. + */ +runtime_schemas?: Record | null, }; export type SamplePipeline = { id: string, name: string, description: string, yaml: string, is_system: boolean, mode: string, /** @@ -656,7 +662,11 @@ property: string, */ group: string | null, /** - * Default value sent on first render / reset. + * Initial value for the UI widget. This is a **UI-only hint** — it + * seeds the local component state but is *not* sent to the server on + * mount. Pipeline authors should ensure defaults here match the + * node's own initial params to avoid a visual desync before the first + * user interaction. */ default: unknown, /** diff --git a/ui/src/utils/controlProps.test.ts b/ui/src/utils/controlProps.test.ts index 899f3375..068056d0 100644 --- a/ui/src/utils/controlProps.test.ts +++ b/ui/src/utils/controlProps.test.ts @@ -2,9 +2,9 @@ // // SPDX-License-Identifier: MPL-2.0 -import { describe, it, expect } from 'vitest'; +import { describe, it, expect, vi } from 'vitest'; -import { buildParamUpdate, deepMerge } from './controlProps'; +import { buildParamUpdate, deepMerge, dispatchParamUpdate, readByPath } from './controlProps'; describe('buildParamUpdate', () => { it('wraps a single-segment path as a flat key', () => { @@ -102,3 +102,62 @@ describe('deepMerge', () => { expect(target).toEqual({ properties: { score: 1 } }); }); }); + +describe('readByPath', () => { + it('reads a flat key', () => { + expect(readByPath({ gain_db: 1.5 }, 'gain_db')).toBe(1.5); + }); + + it('reads a two-segment nested path', () => { + expect(readByPath({ properties: { show: true } }, 'properties.show')).toBe(true); + }); + + it('reads a three-segment nested path', () => { + expect(readByPath({ a: { b: { c: 42 } } }, 'a.b.c')).toBe(42); + }); + + it('returns undefined for missing keys', () => { + expect(readByPath({}, 'missing')).toBeUndefined(); + expect(readByPath({}, 'a.b.c')).toBeUndefined(); + }); + + it('returns undefined when traversing through a non-object', () => { + expect(readByPath({ a: 'string' }, 'a.b')).toBeUndefined(); + expect(readByPath({ a: null }, 'a.b')).toBeUndefined(); + }); + + it('handles various value types', () => { + expect(readByPath({ key: false }, 'key')).toBe(false); + expect(readByPath({ key: 0 }, 'key')).toBe(0); + expect(readByPath({ key: '' }, 'key')).toBe(''); + }); + + it('is the inverse of buildParamUpdate for reading back', () => { + const update = buildParamUpdate('properties.home_score', 4); + expect(readByPath(update, 'properties.home_score')).toBe(4); + }); +}); + +describe('dispatchParamUpdate', () => { + it('routes flat keys through onFlat', () => { + const onFlat = vi.fn(); + const onNested = vi.fn(); + dispatchParamUpdate('node1', 'gain_db', 1.5, onFlat, onNested); + expect(onFlat).toHaveBeenCalledWith('node1', 'gain_db', 1.5); + expect(onNested).not.toHaveBeenCalled(); + }); + + it('routes dot-notation paths through onNested with buildParamUpdate result', () => { + const onFlat = vi.fn(); + const onNested = vi.fn(); + dispatchParamUpdate('node1', 'properties.show', true, onFlat, onNested); + expect(onNested).toHaveBeenCalledWith('node1', { properties: { show: true } }); + expect(onFlat).not.toHaveBeenCalled(); + }); + + it('handles multi-segment dot paths', () => { + const onNested = vi.fn(); + dispatchParamUpdate('node1', 'a.b.c', 42, vi.fn(), onNested); + expect(onNested).toHaveBeenCalledWith('node1', { a: { b: { c: 42 } } }); + }); +}); diff --git a/ui/src/utils/controlProps.ts b/ui/src/utils/controlProps.ts index ece8c618..62eb3a65 100644 --- a/ui/src/utils/controlProps.ts +++ b/ui/src/utils/controlProps.ts @@ -29,6 +29,52 @@ export function buildParamUpdate(path: string, value: unknown): Record, path: string): unknown { + const parts = path.split('.').filter(Boolean); + let current: unknown = obj; + for (const part of parts) { + if (current == null || typeof current !== 'object') return undefined; + current = (current as Record)[part]; + } + return current; +} + +/** + * Dispatches a param update through the correct handler based on whether + * the param name is a flat key or a dot-notation path. + * + * This centralises the `if (name.includes('.'))` branching that otherwise + * appears in every call-site (MonitorView, usePipeline, etc.). + * + * - **Flat keys** (e.g. `"gain_db"`) → `onFlat(nodeId, key, value)` + * - **Dot-paths** (e.g. `"properties.show"`) → `onNested(nodeId, partialConfig)` + * where `partialConfig` is produced by `buildParamUpdate`. + */ +export function dispatchParamUpdate( + nodeId: string, + paramName: string, + value: unknown, + onFlat: (nodeId: string, key: string, value: unknown) => void, + onNested: (nodeId: string, config: Record) => void +): void { + if (paramName.includes('.')) { + onNested(nodeId, buildParamUpdate(paramName, value)); + } else { + onFlat(nodeId, paramName, value); + } +} + function isPlainObject(v: unknown): v is Record { return typeof v === 'object' && v !== null && !Array.isArray(v); } diff --git a/ui/src/utils/jsonSchema.test.ts b/ui/src/utils/jsonSchema.test.ts new file mode 100644 index 00000000..f2954b47 --- /dev/null +++ b/ui/src/utils/jsonSchema.test.ts @@ -0,0 +1,466 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +import { describe, it, expect } from 'vitest'; + +import { + extractSliderConfigs, + extractToggleConfigs, + extractTextConfigs, + deepMergeSchemas, + schemaToControlConfigs, +} from './jsonSchema'; + +describe('extractToggleConfigs', () => { + it('returns boolean + tunable properties', () => { + const result = extractToggleConfigs({ + properties: { + show: { type: 'boolean', tunable: true, description: 'Show overlay' }, + }, + }); + expect(result).toEqual([ + { + key: 'show', + path: 'show', + schema: { type: 'boolean', tunable: true, description: 'Show overlay' }, + }, + ]); + }); + + it('excludes boolean properties without tunable', () => { + const result = extractToggleConfigs({ + properties: { + enabled: { type: 'boolean' }, + }, + }); + expect(result).toEqual([]); + }); + + it('excludes non-boolean tunable properties', () => { + const result = extractToggleConfigs({ + properties: { + gain: { type: 'number', tunable: true }, + name: { type: 'string', tunable: true }, + }, + }); + expect(result).toEqual([]); + }); + + it('uses schema path when provided', () => { + const result = extractToggleConfigs({ + properties: { + show: { + type: 'boolean', + tunable: true, + path: 'properties.show', + }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0].key).toBe('show'); + expect(result[0].path).toBe('properties.show'); + }); + + it('defaults path to key when not specified', () => { + const result = extractToggleConfigs({ + properties: { + mute: { type: 'boolean', tunable: true }, + }, + }); + expect(result[0].path).toBe('mute'); + }); + + it('returns empty array for undefined schema', () => { + expect(extractToggleConfigs(undefined)).toEqual([]); + }); + + it('returns empty array for schema without properties', () => { + expect(extractToggleConfigs({})).toEqual([]); + }); +}); + +describe('extractTextConfigs', () => { + it('returns string + tunable properties', () => { + const result = extractTextConfigs({ + properties: { + name: { type: 'string', tunable: true, description: 'Player name' }, + }, + }); + expect(result).toEqual([ + { + key: 'name', + path: 'name', + schema: { type: 'string', tunable: true, description: 'Player name' }, + }, + ]); + }); + + it('excludes string properties without tunable', () => { + const result = extractTextConfigs({ + properties: { + label: { type: 'string' }, + }, + }); + expect(result).toEqual([]); + }); + + it('excludes non-string tunable properties', () => { + const result = extractTextConfigs({ + properties: { + gain: { type: 'number', tunable: true }, + show: { type: 'boolean', tunable: true }, + }, + }); + expect(result).toEqual([]); + }); + + it('uses schema path when provided', () => { + const result = extractTextConfigs({ + properties: { + name: { + type: 'string', + tunable: true, + path: 'properties.name', + }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0].key).toBe('name'); + expect(result[0].path).toBe('properties.name'); + }); + + it('defaults path to key when not specified', () => { + const result = extractTextConfigs({ + properties: { + title: { type: 'string', tunable: true }, + }, + }); + expect(result[0].path).toBe('title'); + }); + + it('returns empty array for undefined schema', () => { + expect(extractTextConfigs(undefined)).toEqual([]); + }); + + it('excludes enum-constrained string properties', () => { + const result = extractTextConfigs({ + properties: { + mode: { + type: 'string', + tunable: true, + enum: ['fast', 'balanced', 'quality'], + }, + }, + }); + expect(result).toEqual([]); + }); + + it('includes string properties with empty enum array', () => { + const result = extractTextConfigs({ + properties: { + label: { type: 'string', tunable: true, enum: [] }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0].key).toBe('label'); + }); +}); + +describe('extractSliderConfigs — path field', () => { + it('defaults path to key when not specified', () => { + const result = extractSliderConfigs({ + properties: { + gain_db: { + type: 'number', + tunable: true, + minimum: -60, + maximum: 12, + }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0].key).toBe('gain_db'); + expect(result[0].path).toBe('gain_db'); + }); + + it('uses schema path when provided', () => { + const result = extractSliderConfigs({ + properties: { + score: { + type: 'integer', + tunable: true, + minimum: 0, + maximum: 99, + path: 'properties.score', + }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0].key).toBe('score'); + expect(result[0].path).toBe('properties.score'); + }); +}); + +describe('mixed schema extraction', () => { + const schema = { + properties: { + show: { type: 'boolean', tunable: true, default: true }, + name: { type: 'string', tunable: true, default: 'Player' }, + score: { + type: 'integer', + tunable: true, + minimum: 0, + maximum: 99, + default: 0, + }, + codec: { type: 'string', tunable: false }, + internal_flag: { type: 'boolean' }, + }, + }; + + it('extracts only boolean tunables as toggles', () => { + const toggles = extractToggleConfigs(schema); + expect(toggles).toHaveLength(1); + expect(toggles[0].key).toBe('show'); + }); + + it('extracts only string tunables as text inputs', () => { + const texts = extractTextConfigs(schema); + expect(texts).toHaveLength(1); + expect(texts[0].key).toBe('name'); + }); + + it('extracts only numeric tunables with bounds as sliders', () => { + const sliders = extractSliderConfigs(schema); + expect(sliders).toHaveLength(1); + expect(sliders[0].key).toBe('score'); + }); +}); + +// --------------------------------------------------------------------------- +// deepMergeSchemas +// --------------------------------------------------------------------------- + +describe('deepMergeSchemas', () => { + it('returns empty object when both are undefined', () => { + expect(deepMergeSchemas(undefined, undefined)).toEqual({}); + }); + + it('returns base when runtime is undefined', () => { + const base = { properties: { gain: { type: 'number', tunable: true } } }; + expect(deepMergeSchemas(base, undefined)).toEqual(base); + }); + + it('returns runtime when base is undefined', () => { + const runtime = { properties: { show: { type: 'boolean', tunable: true } } }; + expect(deepMergeSchemas(undefined, runtime)).toEqual(runtime); + }); + + it('preserves base properties not in runtime', () => { + const base = { + properties: { + fps: { type: 'integer', default: 30 }, + width: { type: 'integer', default: 640 }, + }, + }; + const runtime = { + properties: { + show: { type: 'boolean', tunable: true, path: 'properties.show' }, + }, + }; + const merged = deepMergeSchemas(base, runtime); + expect(merged.properties).toHaveProperty('fps'); + expect(merged.properties).toHaveProperty('width'); + expect(merged.properties).toHaveProperty('show'); + }); + + it('runtime properties override base properties with same key', () => { + const base = { + properties: { + show: { type: 'boolean', default: false }, + }, + }; + const runtime = { + properties: { + show: { type: 'boolean', tunable: true, path: 'properties.show' }, + }, + }; + const merged = deepMergeSchemas(base, runtime); + // Runtime fields win, but base-only fields (default) are preserved. + expect(merged.properties?.show).toEqual({ + type: 'boolean', + default: false, + tunable: true, + path: 'properties.show', + }); + }); + + it('preserves base minimum/maximum when runtime only adds tunable + path', () => { + const base = { + properties: { + score: { type: 'integer', minimum: 0, maximum: 99, default: 0 }, + }, + }; + const runtime = { + properties: { + score: { type: 'integer', tunable: true, path: 'properties.score' }, + }, + }; + const merged = deepMergeSchemas(base, runtime); + expect(merged.properties?.score).toEqual({ + type: 'integer', + minimum: 0, + maximum: 99, + default: 0, + tunable: true, + path: 'properties.score', + }); + }); + + it('merged schema works with extractors', () => { + const base = { + properties: { + fps: { type: 'integer', default: 30 }, + }, + }; + const runtime = { + properties: { + show: { type: 'boolean', tunable: true, path: 'properties.show' }, + score: { + type: 'number', + tunable: true, + minimum: 0, + maximum: 99, + path: 'properties.score', + }, + name: { type: 'string', tunable: true, path: 'properties.name' }, + }, + }; + const merged = deepMergeSchemas(base, runtime); + + expect(extractToggleConfigs(merged)).toHaveLength(1); + expect(extractToggleConfigs(merged)[0].path).toBe('properties.show'); + + expect(extractSliderConfigs(merged)).toHaveLength(1); + expect(extractSliderConfigs(merged)[0].path).toBe('properties.score'); + + expect(extractTextConfigs(merged)).toHaveLength(1); + expect(extractTextConfigs(merged)[0].path).toBe('properties.name'); + }); +}); + +describe('schemaToControlConfigs', () => { + it('converts boolean tunable properties to toggle ControlConfigs', () => { + const result = schemaToControlConfigs('scoreboard', { + properties: { + clock_running: { + type: 'boolean', + tunable: true, + path: 'properties.clock_running', + default: true, + }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0]).toMatchObject({ + label: 'Clock Running', + type: 'toggle', + node: 'scoreboard', + property: 'properties.clock_running', + default: true, + }); + }); + + it('converts number tunable properties to number ControlConfigs', () => { + const result = schemaToControlConfigs('scoreboard', { + properties: { + home_score: { + type: 'number', + tunable: true, + path: 'properties.home_score', + minimum: 0, + maximum: 99, + default: 0, + }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0]).toMatchObject({ + label: 'Home Score', + type: 'number', + node: 'scoreboard', + property: 'properties.home_score', + min: 0, + max: 99, + default: 0, + }); + }); + + it('converts string tunable properties to text ControlConfigs', () => { + const result = schemaToControlConfigs('scoreboard', { + properties: { + home_team: { + type: 'string', + tunable: true, + path: 'properties.home_team', + default: 'HOME', + }, + }, + }); + expect(result).toHaveLength(1); + expect(result[0]).toMatchObject({ + label: 'Home Team', + type: 'text', + node: 'scoreboard', + property: 'properties.home_team', + default: 'HOME', + }); + }); + + it('skips non-tunable properties', () => { + const result = schemaToControlConfigs('node', { + properties: { + fps: { type: 'number', default: 30 }, + width: { type: 'integer', default: 420 }, + }, + }); + expect(result).toEqual([]); + }); + + it('skips enum-constrained strings', () => { + const result = schemaToControlConfigs('node', { + properties: { + mode: { type: 'string', tunable: true, enum: ['fast', 'slow'] }, + }, + }); + expect(result).toEqual([]); + }); + + it('assigns group label when provided', () => { + const result = schemaToControlConfigs( + 'scoreboard', + { + properties: { + show: { type: 'boolean', tunable: true, path: 'properties.show' }, + }, + }, + 'Scoreboard' + ); + expect(result[0].group).toBe('Scoreboard'); + }); + + it('derives label from snake_case and kebab-case keys', () => { + const result = schemaToControlConfigs('node', { + properties: { + clock_running: { type: 'boolean', tunable: true }, + 'font-size': { type: 'number', tunable: true, minimum: 8, maximum: 72 }, + }, + }); + expect(result.map((c) => c.label)).toEqual(['Clock Running', 'Font Size']); + }); + + it('returns empty array for undefined schema', () => { + expect(schemaToControlConfigs('node', undefined)).toEqual([]); + }); +}); diff --git a/ui/src/utils/jsonSchema.ts b/ui/src/utils/jsonSchema.ts index b5c1504e..000c6fa0 100644 --- a/ui/src/utils/jsonSchema.ts +++ b/ui/src/utils/jsonSchema.ts @@ -7,6 +7,8 @@ * These functions help extract slider configurations from JSON schemas. */ +import type { ControlConfig } from '@/types/types'; + export interface JsonSchemaProperty { type?: string; description?: string; @@ -22,6 +24,14 @@ export interface JsonSchemaProperty { * If true, the parameter supports live updates via UpdateParams messages. */ tunable?: boolean; + /** + * Override the UpdateParams key path (dot-notation). + * Defaults to the property key when omitted. + * Example: `"path": "properties.show"` sends `{ properties: { show: value } }`. + */ + path?: string; + /** Enum values for select/dropdown controls. */ + enum?: unknown[]; } export interface JsonSchema { @@ -30,6 +40,8 @@ export interface JsonSchema { export interface SliderConfig { key: string; + /** Dot-notation path for UpdateParams. Defaults to `key`. */ + path: string; schema: JsonSchemaProperty; min: number; max: number; @@ -142,6 +154,7 @@ export const extractSliderConfigs = (schema: JsonSchema | undefined): SliderConf const step = inferStep(schemaProp, min, max); acc.push({ key, + path: schemaProp.path ?? key, schema: schemaProp, min, max, @@ -207,3 +220,205 @@ export const validateValue = (value: unknown, schema: JsonSchemaProperty): strin return null; // Valid }; + +// --------------------------------------------------------------------------- +// Schema merging — runtime enrichment +// --------------------------------------------------------------------------- + +/** + * Deep-merge a runtime param schema into a base (static) schema. + * + * The merge is shallow at the top level (only `properties` is merged) and + * **deep within each property**: when a runtime property key collides with + * a base property key, the two entries are spread together so that base + * fields (e.g. `default`, `minimum`, `maximum`) are preserved unless the + * runtime entry explicitly overrides them. + * + * This is used to combine the static `param_schema` from the node registry + * with per-instance runtime discoveries (e.g. Slint component properties). + */ +export const deepMergeSchemas = ( + base: JsonSchema | undefined, + runtime: JsonSchema | undefined +): JsonSchema => { + if (!runtime) return base ?? {}; + if (!base) return runtime; + + const baseProps = base.properties ?? {}; + const runtimeProps = runtime.properties ?? {}; + + // Property-level deep merge: for each key present in runtime, spread + // the base entry first (if any) then the runtime entry on top so that + // runtime fields win but base-only fields (default, min, max, …) survive. + const mergedProps: Record = { ...baseProps }; + for (const [key, runtimeEntry] of Object.entries(runtimeProps)) { + const baseEntry = baseProps[key]; + mergedProps[key] = baseEntry ? { ...baseEntry, ...runtimeEntry } : runtimeEntry; + } + + return { + ...base, + properties: mergedProps, + }; +}; + +// --------------------------------------------------------------------------- +// Toggle (boolean) config extraction +// --------------------------------------------------------------------------- + +export interface ToggleConfig { + key: string; + /** Dot-notation path for UpdateParams. Defaults to `key`. */ + path: string; + schema: JsonSchemaProperty; +} + +/** + * Extracts toggle configurations from a JSON schema. + * Returns configs for boolean properties marked `tunable: true`. + */ +export const extractToggleConfigs = (schema: JsonSchema | undefined): ToggleConfig[] => { + if (!schema) return []; + + const properties = schema.properties ?? {}; + + return Object.entries(properties).reduce((acc, [key, schemaProp]) => { + if (!schemaProp || schemaProp.type !== 'boolean' || !schemaProp.tunable) { + return acc; + } + acc.push({ + key, + path: schemaProp.path ?? key, + schema: schemaProp, + }); + return acc; + }, [] as ToggleConfig[]); +}; + +// --------------------------------------------------------------------------- +// Text (string) config extraction +// --------------------------------------------------------------------------- + +export interface TextConfig { + key: string; + /** Dot-notation path for UpdateParams. Defaults to `key`. */ + path: string; + schema: JsonSchemaProperty; +} + +/** + * Extracts text input configurations from a JSON schema. + * Returns configs for string properties marked `tunable: true`. + * Excludes enum-constrained strings (those would need a select/dropdown control). + */ +export const extractTextConfigs = (schema: JsonSchema | undefined): TextConfig[] => { + if (!schema) return []; + + const properties = schema.properties ?? {}; + + return Object.entries(properties).reduce((acc, [key, schemaProp]) => { + if ( + !schemaProp || + schemaProp.type !== 'string' || + !schemaProp.tunable || + (schemaProp.enum && schemaProp.enum.length > 0) + ) { + return acc; + } + acc.push({ + key, + path: schemaProp.path ?? key, + schema: schemaProp, + }); + return acc; + }, [] as TextConfig[]); +}; + +// --------------------------------------------------------------------------- +// Schema → ControlConfig conversion +// --------------------------------------------------------------------------- + +/** Derive a human-readable label: "clock_running" → "Clock Running". */ +function labelFromKey(key: string): string { + return key.replace(/[_-]/g, ' ').replace(/\b\w/g, (c) => c.toUpperCase()); +} + +/** Map a single tunable schema property to a ControlConfig, or null. */ +function propToControlConfig( + nodeId: string, + key: string, + prop: JsonSchemaProperty, + group: string | null +): ControlConfig | null { + const path = prop.path ?? key; + const label = labelFromKey(key); + const base = { label, node: nodeId, property: path, group, value: null }; + + switch (prop.type) { + case 'boolean': + return { + ...base, + type: 'toggle', + default: prop.default ?? false, + min: null, + max: null, + step: null, + }; + case 'number': + case 'integer': { + const min = resolveMinimum(prop) ?? 0; + const max = resolveMaximum(prop) ?? 100; + return { + ...base, + type: 'number', + default: prop.default ?? min, + min, + max, + step: inferStep(prop, min, max), + }; + } + case 'string': + if (prop.enum && prop.enum.length > 0) return null; + return { + ...base, + type: 'text', + default: prop.default ?? '', + min: null, + max: null, + step: null, + }; + default: + return null; + } +} + +/** + * Converts tunable properties from a merged JSON schema into `ControlConfig` + * entries suitable for `OverlayControls`. + * + * This bridges the gap between the schema-driven controls in Monitor View + * and the `client.controls` YAML controls in Stream View, allowing both + * views to render the same set of controls from a single source of truth. + * + * @param nodeId The pipeline node ID (e.g. "scoreboard"). + * @param schema The merged (base + runtime) JSON schema for the node. + * @param group Optional group label for all generated controls. + */ +export function schemaToControlConfigs( + nodeId: string, + schema: JsonSchema | undefined, + group?: string +): ControlConfig[] { + if (!schema?.properties) return []; + + const groupLabel = group ?? null; + const controls: ControlConfig[] = []; + + for (const [key, prop] of Object.entries(schema.properties)) { + if (!prop?.tunable) continue; + const ctrl = propToControlConfig(nodeId, key, prop, groupLabel); + if (ctrl) controls.push(ctrl); + } + + return controls; +} diff --git a/ui/src/views/MonitorView.tsx b/ui/src/views/MonitorView.tsx index 0634e26b..ce59967a 100644 --- a/ui/src/views/MonitorView.tsx +++ b/ui/src/views/MonitorView.tsx @@ -48,6 +48,7 @@ import { useReactFlowCommon } from '@/hooks/useReactFlowCommon'; import { useResolvedColorMode } from '@/hooks/useResolvedColorMode'; import { useSession } from '@/hooks/useSession'; import { useSessionList } from '@/hooks/useSessionList'; +import { useTuneNode } from '@/hooks/useTuneNode'; import { useWebSocket } from '@/hooks/useWebSocket'; import { getWebSocketService } from '@/services/websocket'; import { useLayoutStore } from '@/stores/layoutStore'; @@ -69,9 +70,11 @@ import type { InputPin, OutputPin, } from '@/types/types'; +import { dispatchParamUpdate } from '@/utils/controlProps'; import { topoLevelsFromPipeline, orderedNamesFromLevels } from '@/utils/dag'; import { deepEqual } from '@/utils/deepEqual'; -import { validateValue } from '@/utils/jsonSchema'; +import { deepMergeSchemas, validateValue } from '@/utils/jsonSchema'; +import type { JsonSchema, JsonSchemaProperty } from '@/utils/jsonSchema'; import { viewsLogger } from '@/utils/logger'; import { buildEdgesFromConnections, @@ -382,6 +385,11 @@ const MonitorViewContent: React.FC = () => { disconnectPins, } = useSession(selectedSessionId); + // Lightweight hook for dot-notation path updates: deep-merges locally + // into the atom and sends only the partial to the server (unlike + // useSession.tuneNodeConfig which shallow-merges and sends as-is). + const { tuneNodeConfig: tuneNodeConfigDeep } = useTuneNode(selectedSessionId); + // Use session-specific connection status if a session is selected, otherwise use global const isConnected = selectedSessionId ? sessionIsConnected : globalIsConnected; @@ -406,7 +414,14 @@ const MonitorViewContent: React.FC = () => { const conns = pipeline.connections .map((c: Connection) => `${c.from_node}:${c.from_pin}>${c.to_node}:${c.to_pin}`) .sort(); - const key = JSON.stringify([kinds, conns]); + // Include runtime schema keys so topology rebuilds when schemas arrive + // after the initial build (e.g. Slint property discovery). + // NOTE: Only keys are tracked, not content. If a schema's content changed + // for an existing key (hot-reload), the effect would NOT re-run. This is + // intentional — runtime_param_schema() is documented as immutable for the + // node's lifetime (see crates/core ProcessorNode trait docs). + const runtimeKeys = Object.keys(pipeline.runtime_schemas ?? {}).sort(); + const key = JSON.stringify([kinds, conns, runtimeKeys]); viewsLogger.debug('topoKey recalculated:', key.substring(0, 100)); return key; }, [pipeline]); @@ -455,7 +470,15 @@ const MonitorViewContent: React.FC = () => { } }, [selectedSessionId, selectedSession, isLoadingSessions]); - // Helper to validate parameter value against schema + // Helper to validate parameter value against schema. + // Uses the runtime-merged schema when available so that dynamically + // discovered parameters are validated correctly. + // + // Runtime-discovered properties (e.g. from Slint) are stored as flat + // keys in the merged schema (e.g. "show") with a `path` field containing + // the dot-notation wire path (e.g. "properties.show"). When paramKey is + // a dot-path, we search for a property whose `path` matches before + // falling back to a flat key lookup. const validateParamValue = useCallback( (nodeId: string, paramKey: string, value: unknown): string | null => { const node = pipeline?.nodes[nodeId]; @@ -464,15 +487,28 @@ const MonitorViewContent: React.FC = () => { const nodeDef = nodeDefinitions.find((d) => d.kind === node.kind); if (!nodeDef) return null; - const schema = nodeDef.param_schema as - | { - properties?: Record< - string, - { type?: string; minimum?: number; maximum?: number; multipleOf?: number } - >; + // Merge runtime schema (if any) so dynamically discovered properties + // are included in validation. + const runtimeSchema = pipeline?.runtime_schemas?.[nodeId] as JsonSchema | undefined; + const baseSchema = nodeDef.param_schema as JsonSchema | undefined; + const merged = runtimeSchema ? deepMergeSchemas(baseSchema, runtimeSchema) : baseSchema; + if (!merged?.properties) return null; + + // 1. Direct flat-key lookup (works for simple keys like "gain_db"). + let propSchema = merged.properties[paramKey] as JsonSchemaProperty | undefined; + + // 2. If paramKey is a dot-path (e.g. "properties.show"), search for a + // schema property whose `path` field matches. Runtime-discovered + // properties use this pattern. + if (!propSchema && paramKey.includes('.')) { + for (const entry of Object.values(merged.properties)) { + if (entry && (entry as JsonSchemaProperty).path === paramKey) { + propSchema = entry as JsonSchemaProperty; + break; } - | undefined; - const propSchema = schema?.properties?.[paramKey]; + } + } + if (!propSchema) return null; return validateValue(value, propSchema); @@ -498,9 +534,11 @@ const MonitorViewContent: React.FC = () => { return; } - tuneNode(nodeId, key, value); + // Dot-notation paths need nested payload (same deep-merge logic as + // stableOnParamChange — see comment there for details). + dispatchParamUpdate(nodeId, key, value, tuneNode, tuneNodeConfigDeep); }, - [toast, tuneNode] + [toast, tuneNode, tuneNodeConfigDeep] ); // Memoized label change handler (currently no-op) @@ -842,19 +880,32 @@ const MonitorViewContent: React.FC = () => { : null) ?? apiNode.state; // Get base pins from definition and resolve dynamic pins - const baseInputs = defByKind.get(apiNode.kind)?.inputs ?? []; - const baseOutputs = defByKind.get(apiNode.kind)?.outputs ?? []; - const nodeDefinition = defByKind.get(apiNode.kind); + const nodeDef = defByKind.get(apiNode.kind); + const baseInputs = nodeDef?.inputs ?? []; + const baseOutputs = nodeDef?.outputs ?? []; const { finalInputs, finalOutputs } = resolveDynamicPins( - nodeDefinition, + nodeDef, nodeName, pipeline, baseInputs, baseOutputs ); - const nodeDef = defByKind.get(apiNode.kind); + // Merge runtime param schema (if any) with the static per-kind schema. + // Runtime schemas are per-instance overrides discovered after node init + // (e.g. Slint component properties enumerated from the compiled .slint). + const runtimeSchema = pipeline.runtime_schemas?.[nodeName] as JsonSchema | undefined; + const effectiveNodeDef = + runtimeSchema && nodeDef + ? { + ...nodeDef, + param_schema: deepMergeSchemas( + nodeDef.param_schema as JsonSchema | undefined, + runtimeSchema + ), + } + : nodeDef; // Build node object using helper function const node = buildNodeObject({ @@ -864,7 +915,7 @@ const MonitorViewContent: React.FC = () => { nodeState, finalInputs, finalOutputs, - nodeDef, + nodeDef: effectiveNodeDef, stableOnParamChange, stableOnConfigChange, selectedSessionId, @@ -902,9 +953,13 @@ const MonitorViewContent: React.FC = () => { return; } - tuneNode(nodeId, paramName, value); + // Dot-notation paths (e.g. "properties.show") need buildParamUpdate to + // produce the correct nested UpdateParams payload. tuneNodeConfigDeep + // deep-merges locally into the atom (preserving sibling nested + // properties) and sends only the partial to the server. + dispatchParamUpdate(nodeId, paramName, value, tuneNode, tuneNodeConfigDeep); }, - [toast, tuneNode] + [toast, tuneNode, tuneNodeConfigDeep] ); // Stable callback for full-config updates (compositor nodes). diff --git a/ui/src/views/StreamView.tsx b/ui/src/views/StreamView.tsx index 182509f4..bca5103f 100644 --- a/ui/src/views/StreamView.tsx +++ b/ui/src/views/StreamView.tsx @@ -35,6 +35,7 @@ import { getApiUrl } from '@/services/base'; import { listDynamicSamples } from '@/services/samples'; import { createSession } from '@/services/sessions'; import { useSchemaStore, ensureSchemasLoaded } from '@/stores/schemaStore'; +import { useSessionStore } from '@/stores/sessionStore'; import type { Event } from '@/types/types'; import { getLogger } from '@/utils/logger'; import { extractMoqPeerSettings, applyMoqSettings } from '@/utils/moqPeerSettings'; @@ -604,6 +605,20 @@ const StreamView: React.FC = () => { useVideoCanvas(videoRenderer); const { muted, volume, toggleMute, changeVolume } = useAudioControls(audioEmitter); + // Read the pipeline (including runtime_schemas) from the session store, + // which is kept up-to-date by WebSocket events (RuntimeSchemasUpdated). + // This replaces the previous one-shot REST fetch with a 1.5s delay that + // could miss late-arriving schemas. + const livePipeline = useSessionStore( + useCallback( + (s) => { + if (!activeSessionId) return null; + return s.sessions.get(activeSessionId)?.pipeline ?? null; + }, + [activeSessionId] + ) + ); + // Validate active session still exists when navigating to this view useEffect(() => { const validateSession = async () => { @@ -1059,7 +1074,11 @@ const StreamView: React.FC = () => { {activeSessionId && viewState.pipelineYaml && ( - + )} {isStreaming && videoRenderer && !msePath && (