Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
authoring layer now compiles fail-closed model-authored workflow files into
that typed IR, with `rlm_cache_change.star` and `issue_fix_tournament.star`
examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670).
Leaf, branch, and workflow execution results now carry deterministic token
and cost telemetry fields that the mock executor can aggregate without live
provider calls or runtime sub-agent fanout (#2486).
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
workflow, branch, leaf, control-node, and teacher-candidate runs. The
Expand Down
3 changes: 3 additions & 0 deletions crates/tui/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
authoring layer now compiles fail-closed model-authored workflow files into
that typed IR, with `rlm_cache_change.star` and `issue_fix_tournament.star`
examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670).
Leaf, branch, and workflow execution results now carry deterministic token
and cost telemetry fields that the mock executor can aggregate without live
provider calls or runtime sub-agent fanout (#2486).
Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction.
- Added a state-store v2 schema migration for WhaleFlow trace tables covering
workflow, branch, leaf, control-node, and teacher-candidate runs. The
Expand Down
112 changes: 112 additions & 0 deletions crates/whaleflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ pub struct BranchResult {
pub task_id: String,
pub status: WorkflowRunStatus,
#[serde(default)]
pub usage: WorkflowUsage,
#[serde(default)]
pub artifacts: Vec<String>,
#[serde(default)]
pub notes: Option<String>,
Expand All @@ -450,11 +452,36 @@ pub struct LeafResult {
pub task_id: String,
pub status: WorkflowRunStatus,
#[serde(default)]
pub usage: WorkflowUsage,
#[serde(default)]
pub output: Option<String>,
#[serde(default)]
pub artifacts: Vec<String>,
}

/// Token and cost telemetry attached to leaf, branch, and workflow results.
///
/// Any field missing from serialized input falls back to the struct's `Default`
/// value, which is zero for every counter.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowUsage {
    /// Count of input tokens.
    pub input_tokens: u64,
    /// Count of output tokens.
    pub output_tokens: u64,
    /// Cost counter; the field name indicates micro-USD units.
    pub cost_microusd: u64,
}

impl WorkflowUsage {
    /// Returns input plus output tokens, clamping at `u64::MAX` instead of overflowing.
    #[must_use]
    pub fn total_tokens(self) -> u64 {
        let Self {
            input_tokens,
            output_tokens,
            ..
        } = self;
        input_tokens.saturating_add(output_tokens)
    }

    /// Folds `other` into `self`, one counter at a time, using saturating addition.
    fn add_assign(&mut self, other: Self) {
        *self = Self {
            input_tokens: self.input_tokens.saturating_add(other.input_tokens),
            output_tokens: self.output_tokens.saturating_add(other.output_tokens),
            cost_microusd: self.cost_microusd.saturating_add(other.cost_microusd),
        };
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ControlNodeResult {
pub node_id: String,
Expand Down Expand Up @@ -494,6 +521,8 @@ pub enum ControlNodeKind {
pub struct WorkflowExecution {
pub status: WorkflowRunStatus,
#[serde(default)]
pub usage: WorkflowUsage,
#[serde(default)]
pub leaf_results: Vec<LeafResult>,
#[serde(default)]
pub branch_results: Vec<BranchResult>,
Expand All @@ -505,6 +534,7 @@ impl Default for WorkflowExecution {
fn default() -> Self {
Self {
status: WorkflowRunStatus::Succeeded,
usage: WorkflowUsage::default(),
leaf_results: Vec::new(),
branch_results: Vec::new(),
control_node_results: Vec::new(),
Expand All @@ -522,6 +552,8 @@ impl WorkflowExecution {
pub struct MockLeafOutcome {
pub status: WorkflowRunStatus,
#[serde(default)]
pub usage: WorkflowUsage,
#[serde(default)]
pub output: Option<String>,
#[serde(default)]
pub artifacts: Vec<String>,
Expand All @@ -531,6 +563,7 @@ impl MockLeafOutcome {
pub fn succeeded(output: impl Into<String>) -> Self {
Self {
status: WorkflowRunStatus::Succeeded,
usage: WorkflowUsage::default(),
output: Some(output.into()),
artifacts: Vec::new(),
}
Expand All @@ -539,10 +572,16 @@ impl MockLeafOutcome {
/// Builds an outcome with `Failed` status that carries `output` as its text,
/// default (zeroed) usage, and no artifacts.
pub fn failed(output: impl Into<String>) -> Self {
    let text: String = output.into();
    Self {
        artifacts: Vec::new(),
        output: Some(text),
        usage: WorkflowUsage::default(),
        status: WorkflowRunStatus::Failed,
    }
}

pub fn with_usage(mut self, usage: WorkflowUsage) -> Self {
self.usage = usage;
self
}
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -670,13 +709,18 @@ impl MockWorkflowExecutor {
} else {
WorkflowRunStatus::Succeeded
};
let mut usage = WorkflowUsage::default();
for result in &execution.leaf_results[before..] {
usage.add_assign(result.usage);
}
if status == WorkflowRunStatus::Failed {
execution.mark_failed();
}
execution.branch_results.push(BranchResult {
branch_id: spec.id.clone(),
task_id: spec.id.clone(),
status,
usage,
artifacts: Vec::new(),
notes: Some("mock branch set executed without runtime fanout".to_string()),
});
Expand All @@ -698,10 +742,12 @@ impl MockWorkflowExecutor {
if outcome.status != WorkflowRunStatus::Succeeded {
execution.mark_failed();
}
execution.usage.add_assign(outcome.usage);
execution.leaf_results.push(LeafResult {
leaf_id: spec.id.clone(),
task_id: spec.id.clone(),
status: outcome.status,
usage: outcome.usage,
output: outcome.output,
artifacts: outcome.artifacts,
});
Expand Down Expand Up @@ -1610,19 +1656,26 @@ mod tests {
branch_id: "discover".to_string(),
task_id: "scan".to_string(),
status: WorkflowRunStatus::Succeeded,
usage: WorkflowUsage {
input_tokens: 100,
output_tokens: 25,
cost_microusd: 42,
},
artifacts: vec!["trace://branches/discover".to_string()],
notes: Some("validated prompt surfaces".to_string()),
};

let json = serde_json::to_string(&result).expect("serialize branch result");

assert!(json.contains("\"status\":\"succeeded\""));
assert!(json.contains("\"cost_microusd\":42"));
let parsed: BranchResult = serde_json::from_str(&json).expect("parse branch result");
assert_eq!(parsed, result);

let minimal: BranchResult =
serde_json::from_str(r#"{"branch_id":"discover","task_id":"scan","status":"pending"}"#)
.expect("parse minimal branch result");
assert_eq!(minimal.usage, WorkflowUsage::default());
assert!(minimal.artifacts.is_empty());
assert_eq!(minimal.notes, None);
}
Expand All @@ -1633,20 +1686,27 @@ mod tests {
leaf_id: "scan-readme".to_string(),
task_id: "scan".to_string(),
status: WorkflowRunStatus::Failed,
usage: WorkflowUsage {
input_tokens: 11,
output_tokens: 7,
cost_microusd: 3,
},
output: Some("README needs clearer setup steps".to_string()),
artifacts: vec!["trace://leaves/scan-readme".to_string()],
};

let json = serde_json::to_string(&result).expect("serialize leaf result");

assert!(json.contains("\"status\":\"failed\""));
assert!(json.contains("\"input_tokens\":11"));
let parsed: LeafResult = serde_json::from_str(&json).expect("parse leaf result");
assert_eq!(parsed, result);

let minimal: LeafResult = serde_json::from_str(
r#"{"leaf_id":"scan-readme","task_id":"scan","status":"pending"}"#,
)
.expect("parse minimal leaf result");
assert_eq!(minimal.usage, WorkflowUsage::default());
assert_eq!(minimal.output, None);
assert!(minimal.artifacts.is_empty());
}
Expand Down Expand Up @@ -1713,6 +1773,58 @@ mod tests {
);
}

/// Runs a two-leaf branch set through the mock executor and checks that the
/// usage configured on each leaf outcome is reported per leaf and summed onto
/// both the branch result and the overall execution.
#[test]
fn mock_executor_aggregates_leaf_usage() {
    // Usage attached to each mocked leaf, plus the total expected after aggregation.
    let readme_usage = WorkflowUsage {
        input_tokens: 100,
        output_tokens: 25,
        cost_microusd: 500,
    };
    let tests_usage = WorkflowUsage {
        input_tokens: 50,
        output_tokens: 10,
        cost_microusd: 250,
    };
    let expected_total = WorkflowUsage {
        input_tokens: 150,
        output_tokens: 35,
        cost_microusd: 750,
    };

    let discover = BranchSpec {
        id: "discover".to_string(),
        description: None,
        parallel: true,
        budget: BudgetSpec::default(),
        permissions: PermissionSpec::default(),
        model_policy: ModelPolicy::default(),
        children: vec![leaf_node("scan-readme"), leaf_node("scan-tests")],
    };
    let workflow = workflow_spec(vec![WorkflowNode::BranchSet(discover)]);

    let readme_outcome = MockLeafOutcome::succeeded("readme ok").with_usage(readme_usage);
    let tests_outcome = MockLeafOutcome::succeeded("tests ok").with_usage(tests_usage);
    let mut executor = MockWorkflowExecutor::new()
        .with_leaf_outcome("scan-readme", readme_outcome)
        .with_leaf_outcome("scan-tests", tests_outcome);

    let execution = executor.run(&workflow).expect("mock workflow should run");

    // Workflow-level totals.
    assert_eq!(execution.usage, expected_total);
    assert_eq!(execution.usage.total_tokens(), 185);

    // The single branch covers every leaf, so its usage equals the workflow usage.
    assert_eq!(execution.branch_results[0].usage, execution.usage);

    // Per-leaf costs are kept individually, in execution order.
    let leaf_costs: Vec<_> = execution
        .leaf_results
        .iter()
        .map(|leaf| leaf.usage.cost_microusd)
        .collect();
    assert_eq!(leaf_costs, vec![500, 250]);
}

#[test]
fn loop_until_stops_on_pass() {
let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {
Expand Down
Loading