Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
bf1313f
feat(ui): schema-driven boolean and text controls on node cards
streamkit-devin Apr 3, 2026
3df959e
fix(ui): support dot-notation path in sliders and InspectorPane readback
streamkit-devin Apr 3, 2026
479c883
fix(ui): deep-merge nested param updates to preserve sibling properties
streamkit-devin Apr 3, 2026
2a922cf
fix(ui): use useTuneNode for dot-path updates, fix slider prop fallback
streamkit-devin Apr 3, 2026
a5a925d
fix(ui): move side effect out of useState updater, fix design-view do…
streamkit-devin Apr 3, 2026
9efe938
refactor(ui): address review feedback on schema-driven controls
streamkit-devin Apr 3, 2026
5e7acff
fix(ui): use textRef in flushDebounce to avoid stale closure on every…
streamkit-devin Apr 3, 2026
bce6cba
fix(ui): update checkedRef in handleToggle to prevent double-click st…
streamkit-devin Apr 3, 2026
e036cf1
fix(ui): clear debounce timer ref after firing to prevent duplicate W…
streamkit-devin Apr 3, 2026
c7b3543
feat: runtime dynamic param schema enrichment
streamkit-devin Apr 3, 2026
3eb599f
fix: bump NATIVE_PLUGIN_API_VERSION to 4, fix ValueType variants
streamkit-devin Apr 3, 2026
1d92fc1
refactor(sdk): replace CResult with CSchemaResult for get_runtime_par…
streamkit-devin Apr 4, 2026
e388fd3
style: format dynamic_messages.rs
streamkit-devin Apr 4, 2026
c932992
fix(ui): property-level deep merge in deepMergeSchemas
streamkit-devin Apr 4, 2026
9508bad
docs(core): document runtime_param_schema() immutability assumption
streamkit-devin Apr 4, 2026
7b8070d
refactor(ui): move TEXT_DEBOUNCE_MS to constants/timing.ts
streamkit-devin Apr 4, 2026
3794344
fix(engine): broadcast RuntimeSchemasUpdated WS event on schema disco…
streamkit-devin Apr 4, 2026
7613424
Merge branch 'main' into devin/1775244734-schema-driven-controls
streamer45 Apr 4, 2026
b2766a2
feat(ui): collapsible controls on node cards for cleaner pipeline view
streamkit-devin Apr 4, 2026
e5fff3d
fix(ui): improve controls toggle alignment, z-index, and selection
streamkit-devin Apr 4, 2026
d965f2a
fix: deep-merge partial params on server and client, rebuild on runti…
streamkit-devin Apr 4, 2026
aed9493
style: format deep_merge_json
streamkit-devin Apr 4, 2026
53def42
fix: address review findings — FFI safety, dedup, schema drop warning
streamkit-devin Apr 4, 2026
bad7ee6
fix: persist expanded state, dot-path validation, unbounded schema ch…
streamkit-devin Apr 4, 2026
f390882
style: format TypeScript files
streamkit-devin Apr 4, 2026
948a104
fix: move expandedState declaration after imports to fix ESLint impor…
streamkit-devin Apr 4, 2026
ac79c20
fix: toggle initial state from runtime schema defaults, lighter LIVE …
streamkit-devin Apr 4, 2026
8e6c73a
style: format slint_thread.rs for CI
streamkit-devin Apr 4, 2026
d34220e
fix: normalize Slint property names and unify Stream/Monitor controls
streamkit-devin Apr 4, 2026
c58bbde
fix: add clock_running to scoreboard YAML params and controls
streamkit-devin Apr 4, 2026
a7cfb51
fix: resolve dot-path validation by matching schema path field
streamkit-devin Apr 4, 2026
6ac6c7b
fix: deep-merge params in sync TuneNode handler to match async handler
streamkit-devin Apr 4, 2026
05affdd
Merge branch 'main' into devin/1775244734-schema-driven-controls
streamer45 Apr 4, 2026
0d803d8
fix: use WS-driven store for StreamView pipeline, add SAFETY docs and…
streamkit-devin Apr 4, 2026
c1748c6
style: fix Rust fmt and remove unused fetchPipeline export
streamkit-devin Apr 4, 2026
fe1a3ed
refactor: remove redundant double deep-merge in useTuneNode and usePi…
streamkit-devin Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/skit/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,7 @@ async fn get_pipeline_handler(
// Fetch current node states without holding the pipeline lock.
let node_states = session.get_node_states().await.unwrap_or_default();
let node_view_data = session.get_node_view_data().await.unwrap_or_default();
let runtime_schemas = session.get_runtime_schemas().await.unwrap_or_default();

// Clone pipeline (short lock hold) and add runtime state to nodes.
let mut api_pipeline = {
Expand All @@ -2023,6 +2024,11 @@ async fn get_pipeline_handler(
api_pipeline.view_data = Some(node_view_data);
}

// Attach runtime param schemas so the UI can merge them with static schemas.
if !runtime_schemas.is_empty() {
api_pipeline.runtime_schemas = Some(runtime_schemas);
}

info!("Fetched pipeline with states for session '{}' via HTTP", session_id);
Ok(Json(api_pipeline))
}
Expand Down
41 changes: 41 additions & 0 deletions apps/skit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,34 @@ impl Session {
);
});

// Subscribe to runtime schema discovery notifications from the engine
let mut runtime_schema_rx = engine_handle
.subscribe_runtime_schemas()
.await
.map_err(|e| format!("Failed to subscribe to runtime schema updates: {e}"))?;

// Spawn task to forward runtime schema updates to WebSocket clients
let session_id_for_schemas = session_id.clone();
let event_tx_for_schemas = event_tx.clone();
tokio::spawn(async move {
while let Some(update) = runtime_schema_rx.recv().await {
let event = ApiEvent {
message_type: MessageType::Event,
correlation_id: None,
payload: EventPayload::RuntimeSchemasUpdated {
session_id: session_id_for_schemas.clone(),
node_id: update.node_id,
schema: update.schema,
},
};
let _ = event_tx_for_schemas.send(BroadcastEvent::to_all(event));
}
tracing::debug!(
session_id = %session_id_for_schemas,
"Runtime schema forwarding task ended"
);
});

// Subscribe to telemetry events from the engine
let mut telemetry_rx = engine_handle
.subscribe_telemetry()
Expand Down Expand Up @@ -422,6 +450,19 @@ impl Session {
self.engine_handle.get_node_view_data().await
}

/// Gets the runtime param schema overrides for all nodes in this session.
///
/// Only nodes whose `ProcessorNode::runtime_param_schema()` returned
/// `Some` after initialization will have entries.
///
/// # Errors
///
/// Returns an error if the engine handle's oneshot channel fails to receive a response,
/// which typically indicates the engine actor has stopped or panicked.
pub async fn get_runtime_schemas(&self) -> Result<HashMap<String, serde_json::Value>, String> {
self.engine_handle.get_runtime_schemas().await
}

/// Registers a new preview, enforcing the per-session limit.
///
/// # Errors
Expand Down
3 changes: 2 additions & 1 deletion apps/skit/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ pub async fn handle_websocket(
| EventPayload::ConnectionAdded { session_id, .. }
| EventPayload::ConnectionRemoved { session_id, .. }
| EventPayload::NodeTelemetry { session_id, .. }
| EventPayload::NodeViewDataUpdated { session_id, .. } => {
| EventPayload::NodeViewDataUpdated { session_id, .. }
| EventPayload::RuntimeSchemasUpdated { session_id, .. } => {
visible_session_ids.contains(session_id)
}
}
Expand Down
105 changes: 103 additions & 2 deletions apps/skit/src/websocket_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,12 @@ async fn handle_tune_node(
}
let mut pipeline = session.pipeline.lock().await;
if let Some(node) = pipeline.nodes.get_mut(&node_id) {
node.params = Some(durable_params);
// Deep-merge the partial update into existing params so
// sibling keys are preserved (mirrors the async handler).
node.params = Some(match node.params.take() {
Some(existing) => deep_merge_json(existing, durable_params),
None => durable_params,
});
} else {
warn!(
node_id = %node_id,
Expand Down Expand Up @@ -988,7 +993,15 @@ async fn handle_tune_node_fire_and_forget(
}
let mut pipeline = session.pipeline.lock().await;
if let Some(node) = pipeline.nodes.get_mut(&node_id) {
node.params = Some(durable_params);
// Deep-merge the partial update into existing params so
// sibling keys are preserved. Without this, a partial
// nested update like `{ properties: { show: false } }`
// would overwrite the entire params, losing keys such
// as `fps`, `width`, or `properties.name`.
node.params = Some(match node.params.take() {
Some(existing) => deep_merge_json(existing, durable_params),
None => durable_params,
});
} else {
warn!(
node_id = %node_id,
Expand All @@ -997,6 +1010,10 @@ async fn handle_tune_node_fire_and_forget(
}
} // Lock released here

// Broadcast the *partial delta* (not merged state) to all clients.
// Correct deep-merge on receive depends on each client having a
// valid base state, which is guaranteed because every client
// fetches the full pipeline on connect.
let event = ApiEvent {
message_type: MessageType::Event,
correlation_id: None,
Expand Down Expand Up @@ -1066,6 +1083,7 @@ async fn handle_get_pipeline(

let node_states = session.get_node_states().await.unwrap_or_default();
let node_view_data = session.get_node_view_data().await.unwrap_or_default();
let runtime_schemas = session.get_runtime_schemas().await.unwrap_or_default();

// Clone pipeline (short lock hold) and add runtime state to nodes.
let mut api_pipeline = {
Expand All @@ -1081,6 +1099,12 @@ async fn handle_get_pipeline(
api_pipeline.view_data = Some(node_view_data);
}

// Attach runtime param schemas so the UI can merge them with static schemas
// and render controls for dynamically discovered parameters.
if !runtime_schemas.is_empty() {
api_pipeline.runtime_schemas = Some(runtime_schemas);
}

info!(
session_id = %session_id,
node_count = api_pipeline.nodes.len(),
Expand Down Expand Up @@ -1360,3 +1384,80 @@ fn handle_get_permissions(perms: &Permissions, role_name: &str) -> ResponsePaylo
info!(role = %role_name, "Returning permissions for role");
ResponsePayload::Permissions { role: role_name.to_string(), permissions: perms.to_info() }
}

/// Recursively deep-merges `source` into `target`, returning the merged value.
/// Only JSON objects are merged recursively; arrays and scalars in `source`
/// replace the corresponding value in `target`.
fn deep_merge_json(target: serde_json::Value, source: serde_json::Value) -> serde_json::Value {
match (target, source) {
(serde_json::Value::Object(mut t_map), serde_json::Value::Object(s_map)) => {
for (key, s_val) in s_map {
let merged = match t_map.remove(&key) {
Some(t_val) => deep_merge_json(t_val, s_val),
None => s_val,
};
t_map.insert(key, merged);
}
serde_json::Value::Object(t_map)
},
// Non-object source replaces target wholesale.
(_, source) => source,
}
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

#[test]
fn deep_merge_preserves_sibling_keys() {
let target = json!({
"fps": 30,
"width": 1920,
"properties": { "show": true, "name": "Alex" }
});
let source = json!({
"properties": { "show": false }
});
let merged = deep_merge_json(target, source);
assert_eq!(merged["fps"], 30);
assert_eq!(merged["width"], 1920);
assert_eq!(merged["properties"]["show"], false);
assert_eq!(merged["properties"]["name"], "Alex");
}

#[test]
fn deep_merge_adds_new_keys() {
let target = json!({ "a": 1 });
let source = json!({ "b": 2 });
let merged = deep_merge_json(target, source);
assert_eq!(merged, json!({ "a": 1, "b": 2 }));
}

#[test]
fn deep_merge_replaces_non_object() {
let target = json!({ "x": [1, 2, 3] });
let source = json!({ "x": [4, 5] });
let merged = deep_merge_json(target, source);
assert_eq!(merged["x"], json!([4, 5]));
}

#[test]
fn deep_merge_nested_objects() {
let target = json!({
"properties": {
"home_score": 0,
"away_score": 0,
"show": true
}
});
let source = json!({
"properties": { "home_score": 3 }
});
let merged = deep_merge_json(target, source);
assert_eq!(merged["properties"]["home_score"], 3);
assert_eq!(merged["properties"]["away_score"], 0);
assert_eq!(merged["properties"]["show"], true);
}
}
17 changes: 17 additions & 0 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,16 @@ pub enum EventPayload {
/// RFC 3339 formatted timestamp for convenience
timestamp: String,
},
// --- Runtime Schema Events ---
/// A node's runtime param schema has been discovered after initialization.
/// The UI should merge this with the static per-kind schema so controls
/// can render for dynamically discovered parameters (e.g. Slint properties).
RuntimeSchemasUpdated {
session_id: String,
node_id: String,
#[ts(type = "JsonValue")]
schema: serde_json::Value,
},
}

pub type Event = Message<EventPayload>;
Expand Down Expand Up @@ -538,6 +548,13 @@ pub struct Pipeline {
#[serde(default)]
#[ts(type = "Record<string, JsonValue> | null")]
pub view_data: Option<HashMap<String, serde_json::Value>>,
/// Per-instance runtime param schema overrides discovered after node
/// initialization. Only populated in API responses for nodes whose
/// `ProcessorNode::runtime_param_schema()` returned `Some`.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[ts(type = "Record<string, JsonValue> | null")]
pub runtime_schemas: Option<HashMap<String, serde_json::Value>>,
}

// Type aliases for backwards compatibility
Expand Down
22 changes: 20 additions & 2 deletions crates/api/src/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,16 @@ fn compile_steps(
nodes.insert(node_name, Node { kind: step.kind, params: step.params, state: None });
}

Pipeline { name, description, mode, client, nodes, connections, view_data: None }
Pipeline {
name,
description,
mode,
client,
nodes,
connections,
view_data: None,
runtime_schemas: None,
}
}

/// Known bidirectional node kinds that are allowed to participate in cycles.
Expand Down Expand Up @@ -730,7 +739,16 @@ fn compile_dag(
})
.collect();

Ok(Pipeline { name, description, mode, client, nodes, connections, view_data: None })
Ok(Pipeline {
name,
description,
mode,
client,
nodes,
connections,
view_data: None,
runtime_schemas: None,
})
}

// ---------------------------------------------------------------------------
Expand Down
23 changes: 23 additions & 0 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,29 @@ pub trait ProcessorNode: Send + Sync {
Ok(PinUpdate::NoChange)
}

/// Return a runtime-discovered param schema after initialization.
///
/// Plugins whose tunable parameters depend on runtime configuration
/// (e.g., properties discovered after compiling a `.slint` file) can
/// override this to return a JSON Schema fragment. The engine will
/// deep-merge it with the static `param_schema` from registration
/// and deliver the enriched schema to the UI.
///
/// The returned value should be a JSON Schema `"type": "object"` with
/// a `"properties"` map. Each property can include `"tunable": true`
/// and an optional `"path"` override for dot-notation addressing.
///
/// **Called once** — the engine queries this immediately after
/// [`initialize`](Self::initialize) and caches the result for the
/// lifetime of the node. There is currently no mechanism to refresh
/// the schema at runtime; if the underlying configuration changes
/// (e.g. a different `.slint` file), the node must be re-created.
///
/// Default: `None` (use static schema only).
fn runtime_param_schema(&self) -> Option<serde_json::Value> {
None
}

/// Tier 2: Runtime pin management capability.
///
/// Returns true if this node supports adding/removing pins while running.
Expand Down
1 change: 1 addition & 0 deletions crates/engine/benches/av1_compositor_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ fn build_pipeline(args: &BenchArgs) -> streamkit_api::Pipeline {
connections,
view_data: None,
client: None,
runtime_schemas: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/engine/benches/compositor_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ fn build_pipeline(width: u32, height: u32, fps: u32, frame_count: u32) -> stream
connections,
view_data: None,
client: None,
runtime_schemas: None,
}
}

Expand Down
Loading
Loading