From 7b6bec7dd3eae10cbb7b9db241140417c3ad2b86 Mon Sep 17 00:00:00 2001 From: Hunter B Date: Fri, 5 Jun 2026 21:55:53 -0700 Subject: [PATCH] feat(whaleflow): add usage telemetry to mock results --- CHANGELOG.md | 3 + crates/tui/CHANGELOG.md | 3 + crates/whaleflow/src/lib.rs | 112 ++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a7a228b6..d23626fc2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 authoring layer now compiles fail-closed model-authored workflow files into that typed IR, with `rlm_cache_change.star` and `issue_fix_tournament.star` examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670). + Leaf, branch, and workflow execution results now carry deterministic token + and cost telemetry fields that the mock executor can aggregate without live + provider calls or runtime sub-agent fanout (#2486). Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction. - Added a state-store v2 schema migration for WhaleFlow trace tables covering workflow, branch, leaf, control-node, and teacher-candidate runs. The diff --git a/crates/tui/CHANGELOG.md b/crates/tui/CHANGELOG.md index 5a7a228b6..d23626fc2 100644 --- a/crates/tui/CHANGELOG.md +++ b/crates/tui/CHANGELOG.md @@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 authoring layer now compiles fail-closed model-authored workflow files into that typed IR, with `rlm_cache_change.star` and `issue_fix_tournament.star` examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670). + Leaf, branch, and workflow execution results now carry deterministic token + and cost telemetry fields that the mock executor can aggregate without live + provider calls or runtime sub-agent fanout (#2486). Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction. - Added a state-store v2 schema migration for WhaleFlow trace tables covering workflow, branch, leaf, control-node, and teacher-candidate runs. The diff --git a/crates/whaleflow/src/lib.rs b/crates/whaleflow/src/lib.rs index b08338500..c2ce75767 100644 --- a/crates/whaleflow/src/lib.rs +++ b/crates/whaleflow/src/lib.rs @@ -439,6 +439,8 @@ pub struct BranchResult { pub task_id: String, pub status: WorkflowRunStatus, #[serde(default)] + pub usage: WorkflowUsage, + #[serde(default)] pub artifacts: Vec, #[serde(default)] pub notes: Option, @@ -450,11 +452,36 @@ pub struct LeafResult { pub task_id: String, pub status: WorkflowRunStatus, #[serde(default)] + pub usage: WorkflowUsage, + #[serde(default)] pub output: Option, #[serde(default)] pub artifacts: Vec, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct WorkflowUsage { + #[serde(default)] + pub input_tokens: u64, + #[serde(default)] + pub output_tokens: u64, + #[serde(default)] + pub cost_microusd: u64, +} + +impl WorkflowUsage { + #[must_use] + pub fn total_tokens(self) -> u64 { + self.input_tokens.saturating_add(self.output_tokens) + } + + fn add_assign(&mut self, other: Self) { + self.input_tokens = self.input_tokens.saturating_add(other.input_tokens); + self.output_tokens = self.output_tokens.saturating_add(other.output_tokens); + self.cost_microusd = self.cost_microusd.saturating_add(other.cost_microusd); + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ControlNodeResult { pub node_id: String, @@ -494,6 +521,8 @@ pub enum ControlNodeKind { pub struct WorkflowExecution { pub status: WorkflowRunStatus, #[serde(default)] + pub usage: WorkflowUsage, + #[serde(default)] pub leaf_results: Vec, #[serde(default)] pub branch_results: Vec, @@ -505,6 +534,7 @@ impl Default for WorkflowExecution { fn default() -> Self { Self { status: WorkflowRunStatus::Succeeded, + usage: WorkflowUsage::default(), leaf_results: Vec::new(), branch_results: Vec::new(), control_node_results: Vec::new(), @@ -522,6 +552,8 @@ impl WorkflowExecution { pub struct MockLeafOutcome { pub status: WorkflowRunStatus, #[serde(default)] + pub usage: WorkflowUsage, + #[serde(default)] pub output: Option, #[serde(default)] pub artifacts: Vec, @@ -531,6 +563,7 @@ impl MockLeafOutcome { pub fn succeeded(output: impl Into) -> Self { Self { status: WorkflowRunStatus::Succeeded, + usage: WorkflowUsage::default(), output: Some(output.into()), artifacts: Vec::new(), } @@ -539,10 +572,16 @@ impl MockLeafOutcome { pub fn failed(output: impl Into) -> Self { Self { status: WorkflowRunStatus::Failed, + usage: WorkflowUsage::default(), output: Some(output.into()), artifacts: Vec::new(), } } + + pub fn with_usage(mut self, usage: WorkflowUsage) -> Self { + self.usage = usage; + self + } } #[derive(Debug, Default, Clone)] @@ -670,6 +709,10 @@ impl MockWorkflowExecutor { } else { WorkflowRunStatus::Succeeded }; + let mut usage = WorkflowUsage::default(); + for result in &execution.leaf_results[before..] { + usage.add_assign(result.usage); + } if status == WorkflowRunStatus::Failed { execution.mark_failed(); } @@ -677,6 +720,7 @@ impl MockWorkflowExecutor { branch_id: spec.id.clone(), task_id: spec.id.clone(), status, + usage, artifacts: Vec::new(), notes: Some("mock branch set executed without runtime fanout".to_string()), }); @@ -698,10 +742,12 @@ impl MockWorkflowExecutor { if outcome.status != WorkflowRunStatus::Succeeded { execution.mark_failed(); } + execution.usage.add_assign(outcome.usage); execution.leaf_results.push(LeafResult { leaf_id: spec.id.clone(), task_id: spec.id.clone(), status: outcome.status, + usage: outcome.usage, output: outcome.output, artifacts: outcome.artifacts, }); @@ -1610,6 +1656,11 @@ mod tests { branch_id: "discover".to_string(), task_id: "scan".to_string(), status: WorkflowRunStatus::Succeeded, + usage: WorkflowUsage { + input_tokens: 100, + output_tokens: 25, + cost_microusd: 42, + }, artifacts: vec!["trace://branches/discover".to_string()], notes: Some("validated prompt surfaces".to_string()), }; @@ -1617,12 +1668,14 @@ mod tests { let json = serde_json::to_string(&result).expect("serialize branch result"); assert!(json.contains("\"status\":\"succeeded\"")); + assert!(json.contains("\"cost_microusd\":42")); let parsed: BranchResult = serde_json::from_str(&json).expect("parse branch result"); assert_eq!(parsed, result); let minimal: BranchResult = serde_json::from_str(r#"{"branch_id":"discover","task_id":"scan","status":"pending"}"#) .expect("parse minimal branch result"); + assert_eq!(minimal.usage, WorkflowUsage::default()); assert!(minimal.artifacts.is_empty()); assert_eq!(minimal.notes, None); } @@ -1633,6 +1686,11 @@ mod tests { leaf_id: "scan-readme".to_string(), task_id: "scan".to_string(), status: WorkflowRunStatus::Failed, + usage: WorkflowUsage { + input_tokens: 11, + output_tokens: 7, + cost_microusd: 3, + }, output: Some("README needs clearer setup steps".to_string()), artifacts: vec!["trace://leaves/scan-readme".to_string()], }; @@ -1640,6 +1698,7 @@ mod tests { let json = serde_json::to_string(&result).expect("serialize leaf result"); assert!(json.contains("\"status\":\"failed\"")); + assert!(json.contains("\"input_tokens\":11")); let parsed: LeafResult = serde_json::from_str(&json).expect("parse leaf result"); assert_eq!(parsed, result); @@ -1647,6 +1706,7 @@ mod tests { r#"{"leaf_id":"scan-readme","task_id":"scan","status":"pending"}"#, ) .expect("parse minimal leaf result"); + assert_eq!(minimal.usage, WorkflowUsage::default()); assert_eq!(minimal.output, None); assert!(minimal.artifacts.is_empty()); } @@ -1713,6 +1773,58 @@ mod tests { ); } + #[test] + fn mock_executor_aggregates_leaf_usage() { + let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec { + id: "discover".to_string(), + description: None, + parallel: true, + budget: BudgetSpec::default(), + permissions: PermissionSpec::default(), + model_policy: ModelPolicy::default(), + children: vec![leaf_node("scan-readme"), leaf_node("scan-tests")], + })]); + + let mut executor = MockWorkflowExecutor::new() + .with_leaf_outcome( + "scan-readme", + MockLeafOutcome::succeeded("readme ok").with_usage(WorkflowUsage { + input_tokens: 100, + output_tokens: 25, + cost_microusd: 500, + }), + ) + .with_leaf_outcome( + "scan-tests", + MockLeafOutcome::succeeded("tests ok").with_usage(WorkflowUsage { + input_tokens: 50, + output_tokens: 10, + cost_microusd: 250, + }), + ); + + let execution = executor.run(&workflow).expect("mock workflow should run"); + + assert_eq!( + execution.usage, + WorkflowUsage { + input_tokens: 150, + output_tokens: 35, + cost_microusd: 750, + } + ); + assert_eq!(execution.usage.total_tokens(), 185); + assert_eq!(execution.branch_results[0].usage, execution.usage); + assert_eq!( + execution + .leaf_results + .iter() + .map(|result| result.usage.cost_microusd) + .collect::>(), + vec![500, 250] + ); + } + #[test] fn loop_until_stops_on_pass() { let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {