From cf4d4de6eaf4c62d39697ecc05cab4de0499eb56 Mon Sep 17 00:00:00 2001 From: AdityaVG13 Date: Mon, 1 Jun 2026 01:28:17 -0400 Subject: [PATCH 1/5] =?UTF-8?q?feat:=20add=20WhaleFlow=20=E2=80=94=20decla?= =?UTF-8?q?rative=20multi-agent=20workflow=20orchestration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New crate crates/whaleflow providing declarative JSON-config-driven sub-agent swarm orchestration for CodeWhale. Inspired by Claude Code's Dynamic Workflows (Opus 4.8, May 2026). - WorkflowConfig JSON schema with phases, tasks, dependencies - Topological scheduler with semaphore-based concurrency control - File-scope conflict detection for parallel write safety - Git worktree isolation per task (create → extract → apply → clean) - Structured WorkflowResult with per-task cost/token tracking - workflow_run tool schema for model invocation - TUI integration via WhaleFlowSpawner (SubAgentManager bridge) - 18 tests: 15 unit + 3 integration --- Cargo.lock | 15 + Cargo.toml | 1 + crates/tui/Cargo.toml | 1 + crates/tui/src/core/engine.rs | 30 +- crates/tui/src/tools/mod.rs | 1 + crates/tui/src/tools/registry.rs | 10 + crates/tui/src/tools/workflow/mod.rs | 254 ++++++++ crates/whaleflow/Cargo.toml | 17 + crates/whaleflow/src/config.rs | 644 +++++++++++++++++++++ crates/whaleflow/src/lib.rs | 39 ++ crates/whaleflow/src/result.rs | 121 ++++ crates/whaleflow/src/scheduler.rs | 570 ++++++++++++++++++ crates/whaleflow/src/spawner.rs | 75 +++ crates/whaleflow/src/tool.rs | 77 +++ crates/whaleflow/src/worktree.rs | 272 +++++++++ crates/whaleflow/tests/integration_test.rs | 284 +++++++++ docs/WHALEFLOW_ARCHITECTURE.md | 197 +++++++ 17 files changed, 2601 insertions(+), 7 deletions(-) create mode 100644 crates/tui/src/tools/workflow/mod.rs create mode 100644 crates/whaleflow/Cargo.toml create mode 100644 crates/whaleflow/src/config.rs create mode 100644 crates/whaleflow/src/lib.rs create mode 100644 crates/whaleflow/src/result.rs create mode 100644 crates/whaleflow/src/scheduler.rs create mode 100644 crates/whaleflow/src/spawner.rs create mode 100644 crates/whaleflow/src/tool.rs create mode 100644 crates/whaleflow/src/worktree.rs create mode 100644 crates/whaleflow/tests/integration_test.rs create mode 100644 docs/WHALEFLOW_ARCHITECTURE.md diff --git a/Cargo.lock b/Cargo.lock index 9c28257f2..bfd577c31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,6 +1000,7 @@ dependencies = [ "codewhale-release", "codewhale-secrets", "codewhale-tools", + "codewhale-whaleflow", "colored", "crossterm 0.28.1", "dirs", @@ -1055,6 +1056,20 @@ dependencies = [ name = "codewhale-tui-core" version = "0.8.48" +[[package]] +name = "codewhale-whaleflow" +version = "0.8.48" +dependencies = [ + "anyhow", + "async-trait", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "colorchoice" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 4d5e21568..0b44b1152 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "crates/tools", "crates/tui", "crates/tui-core", + "crates/whaleflow", ] default-members = ["crates/cli", "crates/app-server", "crates/tui"] resolver = "2" diff --git a/crates/tui/Cargo.toml b/crates/tui/Cargo.toml index 38e33c037..1a436aed7 100644 --- a/crates/tui/Cargo.toml +++ b/crates/tui/Cargo.toml @@ -32,6 +32,7 @@ codewhale-protocol = { path = "../protocol", version = "0.8.48" } codewhale-release = { path = "../release", version = "0.8.48" } codewhale-secrets = { path = "../secrets", version = "0.8.48" } codewhale-tools = { path = "../tools", version = "0.8.48" } +codewhale-whaleflow = { path = "../whaleflow", version = "0.8.48" } schemaui = { version = "0.12.0", default-features = false, optional = true } async-stream = "0.3.6" async-trait = "0.1" diff --git a/crates/tui/src/core/engine.rs b/crates/tui/src/core/engine.rs index 5813b5381..a98859070 100644 --- a/crates/tui/src/core/engine.rs +++ b/crates/tui/src/core/engine.rs @@ -54,6 +54,7 @@ use crate::tools::subagent::{ }; use crate::tools::todo::{SharedTodoList, new_shared_todo_list}; use crate::tools::user_input::{UserInputRequest, UserInputResponse}; +use crate::tools::workflow::WhaleFlowSpawner; use crate::tools::{ToolContext, ToolRegistryBuilder}; use crate::tui::app::AppMode; use crate::utils::spawn_supervised; @@ -1233,14 +1234,29 @@ impl Engine { } else { None }; - Some( - builder - .with_subagent_tools( + + // Build the WhaleFlow spawner for the workflow_run tool. + // It uses the same manager and runtime that drive + // sub-agent management tools. + let workflow_spawner = runtime + .as_ref() + .map(|rt| { + Arc::new(WhaleFlowSpawner::new( self.subagent_manager.clone(), - runtime.expect("sub-agent runtime should exist with active client"), - ) - .build(tool_context), - ) + rt.clone(), + tool_context.workspace.clone(), + )) + }); + + let mut builder = builder + .with_subagent_tools( + self.subagent_manager.clone(), + runtime.expect("sub-agent runtime should exist with active client"), + ); + if let Some(spawner) = workflow_spawner { + builder = builder.with_workflow_tool(spawner); + } + Some(builder.build(tool_context)) } else { Some(builder.build(tool_context)) } diff --git a/crates/tui/src/tools/mod.rs b/crates/tui/src/tools/mod.rs index db1e0f707..df804085e 100644 --- a/crates/tui/src/tools/mod.rs +++ b/crates/tui/src/tools/mod.rs @@ -57,6 +57,7 @@ pub mod user_input; pub mod validate_data; pub mod web_run; pub mod web_search; +pub mod workflow; pub use registry::{ToolRegistry, ToolRegistryBuilder}; pub use review::ReviewOutput; diff --git a/crates/tui/src/tools/registry.rs b/crates/tui/src/tools/registry.rs index b0e318447..9b82543d8 100644 --- a/crates/tui/src/tools/registry.rs +++ b/crates/tui/src/tools/registry.rs @@ -996,6 +996,16 @@ impl ToolRegistryBuilder { .with_tool(Arc::new(AgentCloseTool::new(manager))) } + /// Include the WhaleFlow workflow_run tool. + #[must_use] + pub fn with_workflow_tool( + self, + spawner: std::sync::Arc, + ) -> Self { + use super::workflow::WorkflowRunTool; + self.with_tool(Arc::new(WorkflowRunTool::new(spawner))) + } + /// Build the registry with the given context. #[must_use] pub fn build(self, context: ToolContext) -> ToolRegistry { diff --git a/crates/tui/src/tools/workflow/mod.rs b/crates/tui/src/tools/workflow/mod.rs new file mode 100644 index 000000000..16400aa99 --- /dev/null +++ b/crates/tui/src/tools/workflow/mod.rs @@ -0,0 +1,254 @@ +//! WhaleFlow TUI integration — AgentSpawner implementation and tool registration. +//! +//! Implements [`codewhale_whaleflow::AgentSpawner`] using CodeWhale's existing +//! [`SubAgentManager`](crate::tools::subagent::SubAgentManager) / +//! [`SubAgentRuntime`](crate::tools::subagent::SubAgentRuntime) infrastructure, +//! enabling whaleFlow's declarative scheduler to fan out sub-agents with +//! optional git-worktree isolation. + +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use codewhale_whaleflow::{AgentResult, AgentSpawner, SpawnError, WorktreeManager}; +use serde_json::Value; + +use crate::tools::spec::{ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec}; +use crate::tools::subagent::{ + SharedSubAgentManager, SubAgentRuntime, SubAgentStatus, SubAgentType, +}; + +/// Implements [`AgentSpawner`] using CodeWhale's `SubAgentManager`. +/// +/// Each call to [`spawn`](AgentSpawner::spawn) fans out a background sub-agent +/// via [`SubAgentManager::spawn_background`], then polls +/// [`SubAgentManager::get_result`] until the agent reaches a terminal state. +/// When a `cwd` is supplied (worktree isolation), the worktree is created +/// before spawn, and its changes are extracted and applied back to the +/// main workspace on success. +pub struct WhaleFlowSpawner { + manager: SharedSubAgentManager, + runtime: SubAgentRuntime, + workspace: PathBuf, +} + +impl WhaleFlowSpawner { + /// Create a new spawner. + /// + /// The `runtime` is used as the template for each child sub-agent; the + /// child runtime is derived via [`SubAgentRuntime::background_runtime`] + /// so children are detached from the parent turn's cancellation token. + #[must_use] + pub fn new( + manager: SharedSubAgentManager, + runtime: SubAgentRuntime, + workspace: PathBuf, + ) -> Self { + Self { + manager, + runtime, + workspace, + } + } +} + +#[async_trait] +impl AgentSpawner for WhaleFlowSpawner { + async fn spawn( + &self, + task_id: String, + prompt: String, + agent_type: Option, + cwd: Option, + ) -> Result { + // For worktree isolation: create the worktree if cwd is set + // (the scheduler pre-computes the path based on isolation mode). + // `WorktreeManager::create` is idempotent — no-op if the worktree + // already exists (e.g. reused across parallel phases). + let actual_cwd = if cwd.is_some() { + let worktree_path = WorktreeManager::create(&task_id, &self.workspace)?; + Some(worktree_path) + } else { + None + }; + + // Determine agent type. Default to General (full tool access). + let subagent_type = agent_type + .as_deref() + .and_then(SubAgentType::from_str) + .unwrap_or_default(); + + // Derive a detached child runtime so the sub-agent outlives the + // scheduler's turn token. + let mut child_runtime = self.runtime.background_runtime(); + if let Some(ref cwd_path) = actual_cwd { + child_runtime.context.workspace = cwd_path.clone(); + } + + // Spawn via the shared sub-agent manager. + let spawn_result = { + let mut mgr = self.manager.write().await; + mgr.spawn_background( + Arc::clone(&self.manager), + child_runtime, + subagent_type, + prompt, + None, // full tool access — same as a top-level sub-agent + ) + .map_err(|e| SpawnError::SpawnFailed(format!("{e}")))? + }; + + let agent_id = spawn_result.agent_id.clone(); + + tracing::debug!( + agent_id = %agent_id, + task_id = %task_id, + "WhaleFlow spawned sub-agent" + ); + + // Poll for completion. The sub-agent manager updates the snapshot + // in-place when the background task finishes. + loop { + let snapshot = { + let mgr = self.manager.read().await; + mgr.get_result(&agent_id) + .map_err(|e| SpawnError::Internal(format!("{e}")))? + }; + + match snapshot.status { + SubAgentStatus::Running => { + // Still running — back off before next poll. + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + } + SubAgentStatus::Completed => { + let summary = snapshot.result.clone().unwrap_or_default(); + let elapsed_ms = Some(snapshot.duration_ms); + + // Clean up worktree if we created one: extract the + // diff patch, apply it to the main workspace, then + // remove the worktree. Best-effort — we already have + // the agent result, so worktree cleanup failures are + // logged but don't fail the task. + if cwd.is_some() { + if let Ok(patch) = + WorktreeManager::extract_changes(&task_id, &self.workspace) + { + if !patch.trim().is_empty() { + if let Err(e) = + WorktreeManager::apply_patch(&self.workspace, &patch) + { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to apply worktree patch" + ); + } + } + } + if let Err(e) = WorktreeManager::remove(&task_id, &self.workspace) { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to remove worktree" + ); + } + } + + return Ok(AgentResult { + task_id, + success: true, + summary, + files_touched: Vec::new(), + raw_output: snapshot.result, + tokens_used: None, + cost_usd: None, + elapsed_ms, + last_checkpoint: None, + }); + } + SubAgentStatus::Failed(err) | SubAgentStatus::Interrupted(err) => { + let _ = WorktreeManager::remove(&task_id, &self.workspace); + return Err(SpawnError::SpawnFailed(err)); + } + SubAgentStatus::Cancelled => { + let _ = WorktreeManager::remove(&task_id, &self.workspace); + return Err(SpawnError::Cancelled( + "agent cancelled".to_string(), + )); + } + } + } + } +} + +// --------------------------------------------------------------------------- +// workflow_run tool +// --------------------------------------------------------------------------- + +/// The `workflow_run` tool — exposed to DeepSeek so it can orchestrate +/// multi-agent workflows via WhaleFlow's declarative scheduler. +pub struct WorkflowRunTool { + spawner: Arc, +} + +impl WorkflowRunTool { + /// Create a new `workflow_run` tool backed by the given spawner. + #[must_use] + pub fn new(spawner: Arc) -> Self { + Self { spawner } + } +} + +#[async_trait] +impl ToolSpec for WorkflowRunTool { + fn name(&self) -> &'static str { + "workflow_run" + } + + fn description(&self) -> &'static str { + concat!( + "Run a declarative multi-agent workflow. Provide a JSON config with a goal and phases, ", + "each containing tasks with prompts, dependencies, and optional isolation. ", + "The scheduler will fan out sub-agents, pipe results between dependent tasks, ", + "and return a structured result summarizing every agent's output." + ) + } + + fn input_schema(&self) -> Value { + serde_json::from_str(codewhale_whaleflow::tool::WORKFLOW_RUN_SCHEMA) + .unwrap_or_else(|_| serde_json::json!({})) + } + + fn capabilities(&self) -> Vec { + vec![ToolCapability::ReadOnly] + } + + fn supports_parallel(&self) -> bool { + false + } + + async fn execute( + &self, + input: Value, + _context: &ToolContext, + ) -> Result { + // Extract the `config` sub-object and serialize it as the + // WorkflowConfig JSON that the whaleflow scheduler expects. + let config = input + .get("config") + .cloned() + .ok_or_else(|| ToolError::missing_field("config"))?; + + let config_json = + serde_json::to_string(&config).map_err(|e| { + ToolError::invalid_input(format!("failed to serialize config: {e}")) + })?; + + let spawner: Arc = self.spawner.clone(); + + match codewhale_whaleflow::tool::execute_workflow(&config_json, spawner).await { + Ok(result_json) => Ok(ToolResult::success(result_json)), + Err(err) => Ok(ToolResult::error(err)), + } + } +} diff --git a/crates/whaleflow/Cargo.toml b/crates/whaleflow/Cargo.toml new file mode 100644 index 000000000..1d686bcd2 --- /dev/null +++ b/crates/whaleflow/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "codewhale-whaleflow" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +description = "WhaleFlow: declarative multi-agent workflow orchestration for CodeWhale" + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +uuid.workspace = true diff --git a/crates/whaleflow/src/config.rs b/crates/whaleflow/src/config.rs new file mode 100644 index 000000000..84987a250 --- /dev/null +++ b/crates/whaleflow/src/config.rs @@ -0,0 +1,644 @@ +//! Workflow configuration schema. +//! +//! The model generates a [`WorkflowConfig`] JSON document that describes +//! the phases, tasks, dependencies, and concurrency constraints. The +//! scheduler reads this config and executes it. + +use serde::{Deserialize, Serialize}; + +/// Top-level workflow configuration generated by the model. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct WorkflowConfig { + /// Human-readable goal for the workflow run. + pub goal: String, + + /// Maximum concurrent agents across all phases. + /// Default: 6. Hard ceiling enforced by the scheduler. + #[serde(default = "default_max_concurrent")] + pub max_concurrent: usize, + + /// Ordered list of phases. Phases without explicit `depends_on` + /// run in declaration order. + pub phases: Vec, +} + +/// A phase is a group of tasks that share the same dependency boundary. +/// +/// All tasks within a phase can run concurrently (up to `max_concurrent`). +/// The phase completes when all its tasks have finished or been skipped. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct Phase { + /// Display name for progress reporting. + pub name: String, + + /// Phases this phase depends on. The scheduler topologically sorts + /// phases so that dependencies complete before this phase starts. + #[serde(default)] + pub depends_on: Vec, + + /// If true, all tasks in this phase run concurrently. + /// If false, tasks run sequentially. + #[serde(default = "default_true")] + pub parallel: bool, + + /// Failure policy for this phase. + #[serde(default)] + pub on_failure: FailurePolicy, + + /// Tasks in this phase. + pub tasks: Vec, +} + +/// How the scheduler handles task failures within a phase. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum FailurePolicy { + /// Skip the failed task. Continue with remaining tasks. + /// Downstream tasks that depend on the failed task get skipped. + #[default] + SkipContinue, + + /// Abort the entire workflow immediately on any failure. + Abort, +} + +/// Whether a task only reads or also writes files. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TaskMode { + /// Task only reads files (discovery, audit, review, research). + /// Any number of read-only tasks can run in parallel safely. + #[default] + ReadOnly, + /// Task may write files. The scheduler uses `file_scope` to detect + /// conflicts with other concurrent write tasks. + ReadWrite, +} + +/// How the agent is isolated from the main workspace. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum IsolationMode { + /// Agent works directly in the main workspace (default). + /// File-scope conflict detection prevents overlapping writes. + #[default] + Shared, + /// Agent runs in a dedicated git worktree. No conflict risk. + /// The scheduler creates and cleans up the worktree automatically. + Worktree, +} + +impl IsolationMode { + /// Return the worktree path if this task should run in one. + /// The actual worktree creation is handled by the spawner. + pub fn cwd_path(&self) -> Option { + match self { + IsolationMode::Shared => None, + IsolationMode::Worktree => None, // Path set by spawner, not here + } + } +} + +/// A single unit of work assigned to one agent. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct Task { + /// Unique identifier within the workflow (e.g. "scan-auth"). + pub id: String, + + /// Prompt sent to the sub-agent. + pub prompt: String, + + /// Sub-agent type: "explore", "review", "implementer", "verifier", or + /// `None` for the default type. + #[serde(default)] + pub agent_type: Option, + + /// Tasks whose results should be injected into this task's prompt + /// as context. Used for cross-phase data flow. + #[serde(default)] + pub depends_on_results: Vec, + + /// Maximum steps the agent may take. Maps to `max_depth` on `agent_open`. + #[serde(default)] + pub max_steps: Option, + + /// Timeout for this specific task in seconds. + #[serde(default)] + pub timeout_secs: Option, + + /// Whether the task reads only or may write files. + /// Scheduler uses this for conflict detection in parallel phases. + #[serde(default)] + pub mode: TaskMode, + + /// File glob patterns the task is scoped to (e.g. `["src/auth/**"]`). + /// Required for `ReadWrite` tasks in parallel phases. + /// Scheduler detects overlapping scopes and serializes or warns. + #[serde(default)] + pub file_scope: Vec, + + /// Isolation strategy for this agent. + #[serde(default)] + pub isolation: IsolationMode, +} + +fn default_max_concurrent() -> usize { + 6 +} + +fn default_true() -> bool { + true +} + +// --------------------------------------------------------------------------- +// Validation +// --------------------------------------------------------------------------- + +impl WorkflowConfig { + /// Validate the config for structural correctness. + /// + /// Returns `Ok(())` if the config is valid, or a list of errors. + pub fn validate(&self) -> Result<(), Vec> { + let mut errors: Vec = Vec::new(); + + // Phases must exist. + if self.phases.is_empty() { + errors.push("at least one phase is required".into()); + } + + // Every phase must have at least one task. + for phase in &self.phases { + if phase.tasks.is_empty() { + errors.push(format!("phase '{}' has no tasks", phase.name)); + } + } + + // Task IDs must be unique across the entire workflow. + let mut seen_ids = std::collections::HashSet::new(); + for phase in &self.phases { + for task in &phase.tasks { + if !seen_ids.insert(&task.id) { + errors.push(format!("duplicate task id '{}'", task.id)); + } + } + } + + // `depends_on` must reference valid phase names. + for phase in &self.phases { + for dep in &phase.depends_on { + if dep == &phase.name { + errors.push(format!( + "phase '{}' depends on itself", + phase.name + )); + } + if !self.phases.iter().any(|p| &p.name == dep) { + errors.push(format!( + "phase '{}' depends on unknown phase '{}'", + phase.name, dep + )); + } + } + } + + // `depends_on_results` must reference valid task IDs. + for phase in &self.phases { + for task in &phase.tasks { + for dep in &task.depends_on_results { + if !seen_ids.contains(dep) { + errors.push(format!( + "task '{}' depends on unknown task result '{}'", + task.id, dep + )); + } + } + } + } + + // Detect cycles in phase dependencies. + if let Some(cycle) = detect_cycle(&self.phases) { + errors.push(format!( + "cycle detected in phase dependencies: {}", + cycle.join(" -> ") + )); + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } + + /// Detect file-scope conflicts among `ReadWrite` tasks in parallel phases. + /// + /// Returns a list of conflict descriptions. An empty list means no + /// overlapping scopes were detected. `Worktree`-isolated tasks are + /// exempt from conflict detection. + pub fn detect_conflicts(&self) -> Vec { + let mut conflicts: Vec = Vec::new(); + + for phase in &self.phases { + if !phase.parallel || phase.tasks.len() < 2 { + continue; + } + + let write_tasks: Vec<&Task> = phase + .tasks + .iter() + .filter(|t| t.mode == TaskMode::ReadWrite && t.isolation != IsolationMode::Worktree) + .collect(); + + // Each ReadWrite task must declare a file_scope in a parallel phase. + for task in &write_tasks { + if task.file_scope.is_empty() { + conflicts.push(Conflict { + kind: ConflictKind::MissingFileScope, + phase: phase.name.clone(), + task_a: task.id.clone(), + task_b: String::new(), + description: format!( + "task '{}' is ReadWrite with no file_scope in parallel phase '{}'. \ + Add file_scope, set isolation to 'worktree', or make the phase sequential.", + task.id, phase.name + ), + }); + } + } + + // Check for overlapping scopes between pairs of write tasks. + for i in 0..write_tasks.len() { + for j in (i + 1)..write_tasks.len() { + let a = write_tasks[i]; + let b = write_tasks[j]; + + if scopes_overlap(&a.file_scope, &b.file_scope) { + conflicts.push(Conflict { + kind: ConflictKind::OverlappingScopes, + phase: phase.name.clone(), + task_a: a.id.clone(), + task_b: b.id.clone(), + description: format!( + "tasks '{}' and '{}' have overlapping file scopes in parallel \ + phase '{}'. Use 'worktree' isolation, disjoint scopes, or \ + make the phase sequential.", + a.id, b.id, phase.name + ), + }); + } + } + } + } + + conflicts + } +} + +/// A conflict detected during validation. +#[derive(Debug, Clone)] +pub struct Conflict { + pub kind: ConflictKind, + pub phase: String, + pub task_a: String, + pub task_b: String, + pub description: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConflictKind { + /// A ReadWrite task in a parallel phase has no file_scope. + MissingFileScope, + /// Two ReadWrite tasks have overlapping file scopes. + OverlappingScopes, +} + +impl Conflict { + /// Human-readable name for the conflict kind. + pub fn kind_name(&self) -> &'static str { + match self.kind { + ConflictKind::MissingFileScope => "missing_file_scope", + ConflictKind::OverlappingScopes => "overlapping_scopes", + } + } +} + +/// Check if two sets of file scope patterns overlap. +/// +/// Uses prefix matching: strips glob suffixes (`/**`, `/*`) and checks +/// if one prefix starts with the other. Simple but effective for the +/// typical patterns the model generates (e.g. `src/auth/**`). +fn scopes_overlap(a: &[String], b: &[String]) -> bool { + if a.is_empty() || b.is_empty() { + return false; + } + + fn strip_glob(s: &str) -> &str { + s.trim_end_matches('/') + .trim_end_matches("**") + .trim_end_matches('*') + .trim_end_matches('/') + } + + let a_prefixes: Vec<&str> = a.iter().map(|s| strip_glob(s)).collect(); + let b_prefixes: Vec<&str> = b.iter().map(|s| strip_glob(s)).collect(); + + for ap in &a_prefixes { + for bp in &b_prefixes { + if ap.starts_with(bp) || bp.starts_with(ap) { + return true; + } + } + } + + false +} + +/// Simple cycle detection via DFS. +fn detect_cycle(phases: &[Phase]) -> Option> { + use std::collections::{HashMap, HashSet}; + + let phase_names: HashSet<&str> = phases.iter().map(|p| p.name.as_str()).collect(); + + // Build adjacency: phase -> phases it depends on + let mut edges: HashMap<&str, Vec<&str>> = HashMap::new(); + for phase in phases { + let deps: Vec<&str> = phase + .depends_on + .iter() + .filter(|d| phase_names.contains(d.as_str())) + .map(|s| s.as_str()) + .collect(); + edges.insert(&phase.name, deps); + } + + #[derive(Clone, Copy, PartialEq, Eq)] + enum Color { + White, + Gray, + Black, + } + + let mut colors: HashMap<&str, Color> = HashMap::new(); + for name in &phase_names { + colors.insert(name, Color::White); + } + + fn dfs<'a>( + node: &'a str, + edges: &HashMap<&'a str, Vec<&'a str>>, + colors: &mut HashMap<&'a str, Color>, + path: &mut Vec<&'a str>, + ) -> Option> { + colors.insert(node, Color::Gray); + path.push(node); + + if let Some(neighbors) = edges.get(node) { + for &neighbor in neighbors { + let color = colors[neighbor]; + if color == Color::Gray { + // Found a cycle: extract it from the path. + let cycle_start = path.iter().position(|&n| n == neighbor).unwrap(); + let cycle: Vec = path[cycle_start..] + .iter() + .map(|s| s.to_string()) + .collect(); + return Some(cycle); + } + if color == Color::White { + if let Some(cycle) = dfs(neighbor, edges, colors, path) { + return Some(cycle); + } + } + } + } + + path.pop(); + colors.insert(node, Color::Black); + None + } + + for name in &phase_names { + if colors[name] == Color::White { + let mut path = Vec::new(); + if let Some(cycle) = dfs(name, &edges, &mut colors, &mut path) { + return Some(cycle); + } + } + } + + None +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + fn make_task(id: &str, prompt: &str) -> Task { + Task { + id: id.into(), + prompt: prompt.into(), + agent_type: None, + depends_on_results: vec![], + max_steps: None, + timeout_secs: None, + mode: TaskMode::ReadOnly, + file_scope: vec![], + isolation: IsolationMode::Shared, + } + } + + fn make_write_task(id: &str, prompt: &str, scope: &[&str]) -> Task { + Task { + id: id.into(), + prompt: prompt.into(), + mode: TaskMode::ReadWrite, + file_scope: scope.iter().map(|s| s.to_string()).collect(), + ..make_task(id, prompt) + } + } + + #[test] + fn valid_config_passes() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "discovery".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![make_task("scan-1", "scan")], + }], + }; + assert!(config.validate().is_ok()); + } + + #[test] + fn duplicate_task_ids_fail() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![ + Phase { + name: "a".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![make_task("dup", "x")], + }, + Phase { + name: "b".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![make_task("dup", "y")], + }, + ], + }; + let err = config.validate().unwrap_err(); + assert!(err.iter().any(|e| e.contains("duplicate"))); + } + + #[test] + fn cycle_detection() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![ + Phase { + name: "a".into(), + depends_on: vec!["b".into()], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![make_task("t1", "x")], + }, + Phase { + name: "b".into(), + depends_on: vec!["a".into()], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![make_task("t2", "y")], + }, + ], + }; + let err = config.validate().unwrap_err(); + assert!(err.iter().any(|e| e.contains("cycle"))); + } + + #[test] + fn no_conflicts_for_read_only_tasks() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "audit".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![make_task("a", "scan a"), make_task("b", "scan b")], + }], + }; + assert!(config.detect_conflicts().is_empty()); + } + + #[test] + fn missing_file_scope_detected() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "fixes".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + make_task("a", "fix a"), // ReadOnly — fine + Task { + mode: TaskMode::ReadWrite, + file_scope: vec![], + ..make_task("b", "fix b") + }, // ReadWrite, no scope + ], + }], + }; + let conflicts = config.detect_conflicts(); + assert_eq!(conflicts.len(), 1); + assert_eq!(conflicts[0].kind, ConflictKind::MissingFileScope); + } + + #[test] + fn overlapping_scopes_detected() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "fixes".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + make_write_task("a", "fix auth", &["src/auth/**"]), + make_write_task("b", "fix api", &["src/auth/**"]), + ], + }], + }; + let conflicts = config.detect_conflicts(); + assert_eq!(conflicts.len(), 1); + assert_eq!(conflicts[0].kind, ConflictKind::OverlappingScopes); + } + + #[test] + fn disjoint_scopes_no_conflict() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "fixes".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + make_write_task("a", "fix auth", &["src/auth/**"]), + make_write_task("b", "fix api", &["src/api/**"]), + ], + }], + }; + assert!(config.detect_conflicts().is_empty()); + } + + #[test] + fn worktree_tasks_exempt_from_conflicts() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "fixes".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + Task { + mode: TaskMode::ReadWrite, + file_scope: vec!["src/auth/**".into()], + isolation: IsolationMode::Worktree, + ..make_task("a", "fix auth") + }, + Task { + mode: TaskMode::ReadWrite, + file_scope: vec!["src/auth/**".into()], + isolation: IsolationMode::Worktree, + ..make_task("b", "fix auth too") + }, + ], + }], + }; + assert!(config.detect_conflicts().is_empty()); + } +} diff --git a/crates/whaleflow/src/lib.rs b/crates/whaleflow/src/lib.rs new file mode 100644 index 000000000..63ed91af1 --- /dev/null +++ b/crates/whaleflow/src/lib.rs @@ -0,0 +1,39 @@ +//! WhaleFlow — declarative multi-agent workflow orchestration for CodeWhale. +//! +//! WhaleFlow lets DeepSeek orchestrate sub-agent swarms at scale using a +//! declarative JSON config. The model writes a workflow plan with phases, +//! tasks, and dependencies; the scheduler fans out sub-agents, pipes results +//! between dependent tasks, and returns an integrated structured result. +//! +//! ## Architecture +//! +//! ```text +//! ┌─────────────┐ ┌──────────────────┐ ┌─────────────────┐ +//! │ WorkflowConfig│ ──▶ │ Scheduler │ ──▶ │ AgentSpawner │ +//! │ (JSON) │ │ (topo sort, fan) │ │ (trait) │ +//! └─────────────┘ └──────────────────┘ └────────┬────────┘ +//! │ +//! ┌───────▼────────┐ +//! │ TUI crate │ +//! │ (SubAgentRuntime)│ +//! └────────────────┘ +//! ``` +//! +//! The `whaleflow` crate is pure orchestration logic. It has zero +//! dependencies on the TUI, network, or filesystem. The embedding +//! application provides a concrete [`AgentSpawner`] implementation. + +pub mod config; +pub mod result; +pub mod scheduler; +pub mod spawner; +pub mod worktree; +pub mod tool; + +pub use config::{ + Conflict, ConflictKind, FailurePolicy, IsolationMode, Phase, Task, TaskMode, WorkflowConfig, +}; +pub use result::{WorkflowResult, WorkflowStatus}; +pub use scheduler::Scheduler; +pub use spawner::{AgentResult, AgentSpawner, SpawnError}; +pub use worktree::WorktreeManager; diff --git a/crates/whaleflow/src/result.rs b/crates/whaleflow/src/result.rs new file mode 100644 index 000000000..e4c5e6d0d --- /dev/null +++ b/crates/whaleflow/src/result.rs @@ -0,0 +1,121 @@ +//! Structured result returned to the orchestrator model after a workflow run. + +use serde::{Deserialize, Serialize}; + +/// Outcome of a complete workflow run, returned to the model as the +/// tool result for `workflow_run`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkflowResult { + /// The goal from the original config. + pub goal: String, + + /// Overall status. + pub status: WorkflowStatus, + + /// Results per phase, in execution order. + pub phases: Vec, + + /// Aggregate counts. + pub counts: TaskCounts, + + /// Human-readable summary suitable for model consumption. + pub summary: String, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum WorkflowStatus { + /// All tasks completed successfully. + Completed, + /// Some tasks failed but workflow continued (SkipContinue policy). + Partial, + /// Workflow was aborted due to a failure (Abort policy). + Aborted, + /// Workflow was cancelled by user interrupt. + Cancelled, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PhaseResult { + pub name: String, + pub status: TaskStatus, + pub tasks: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskResult { + pub id: String, + pub status: TaskStatus, + /// Summary from the agent output (truncated). + pub summary: Option, + /// Files the agent touched. + pub files_touched: Vec, + /// Error message if the task failed. + pub error: Option, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TaskStatus { + Pending, + Running, + Completed, + Failed, + Skipped, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +pub struct TaskCounts { + pub total: usize, + pub completed: usize, + pub failed: usize, + pub skipped: usize, + pub pending: usize, +} + +impl WorkflowResult { + /// Build a human-readable summary for the orchestrator model. + pub fn build_summary(&mut self) { + let counts = &self.counts; + let status_str = match self.status { + WorkflowStatus::Completed => "completed", + WorkflowStatus::Partial => "completed with failures", + WorkflowStatus::Aborted => "aborted", + WorkflowStatus::Cancelled => "cancelled", + }; + + let mut parts = vec![format!( + "Workflow '{}' {}. {} total tasks: {} completed, {} failed, {} skipped.", + self.goal, + status_str, + counts.total, + counts.completed, + counts.failed, + counts.skipped, + )]; + + for phase in &self.phases { + parts.push(format!("\n## Phase: {}", phase.name)); + for task in &phase.tasks { + let icon = match task.status { + TaskStatus::Completed => "✓", + TaskStatus::Failed => "✗", + TaskStatus::Skipped => "⊘", + _ => "○", + }; + let summary = task.summary.as_deref().unwrap_or("(no summary)"); + let files = if task.files_touched.is_empty() { + String::new() + } else { + format!(" [files: {}]", task.files_touched.join(", ")) + }; + parts.push(format!(" {} {}: {}{}", icon, task.id, summary, files)); + if let Some(ref err) = task.error { + parts.push(format!(" error: {}", err)); + } + } + } + + self.summary = parts.join("\n"); + } +} diff --git a/crates/whaleflow/src/scheduler.rs b/crates/whaleflow/src/scheduler.rs new file mode 100644 index 000000000..c21423096 --- /dev/null +++ b/crates/whaleflow/src/scheduler.rs @@ -0,0 +1,570 @@ +//! Phase scheduler with topological ordering, concurrency control, +//! result plumbing, and failure handling. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use tokio::sync::Semaphore; +use tracing::{debug, info, warn}; + +use crate::config::{FailurePolicy, Task, WorkflowConfig}; +use crate::result::{ + PhaseResult, TaskCounts, TaskResult, TaskStatus, WorkflowResult, WorkflowStatus, +}; +use crate::spawner::{AgentResult, AgentSpawner}; + +/// Executes a workflow config against the provided agent spawner. +pub struct Scheduler { + config: WorkflowConfig, + spawner: Arc, + /// Shared concurrency semaphore across all phases. + concurrency: Arc, + /// Accumulated results keyed by task id. + results: HashMap, +} + +impl Scheduler { + pub fn new(config: WorkflowConfig, spawner: Arc) -> Self { + let max = config.max_concurrent.max(1); + Self { + config, + spawner, + concurrency: Arc::new(Semaphore::new(max)), + results: HashMap::new(), + } + } + + /// Run the full workflow. Returns a structured result. + pub async fn run(&mut self) -> WorkflowResult { + info!( + goal = %self.config.goal, + phases = self.config.phases.len(), + max_concurrent = self.config.max_concurrent, + "starting workflow" + ); + + // Validate config before execution. + if let Err(errors) = self.config.validate() { + return self.fail_fast("config validation failed", &errors); + } + + // Topological sort phases. + let ordered = match self.topological_sort() { + Ok(phases) => phases, + Err(cycle) => { + return self.fail_fast( + "cycle detected in phase dependencies", + &[format!("cycle: {}", cycle.join(" -> "))], + ); + } + }; + + let mut phase_results: Vec = Vec::new(); + let mut workflow_status = WorkflowStatus::Completed; + + // Pre-extract per-phase failure policy to avoid borrow conflicts. + let abort_policies: HashMap = self + .config + .phases + .iter() + .map(|p| (p.name.clone(), p.on_failure == FailurePolicy::Abort)) + .collect(); + + for phase_name in &ordered { + debug!(phase = %phase_name, "executing phase"); + + let result = self.run_phase(phase_name).await; + let phase_status = phase_status(&result.0.tasks); + + if phase_status == TaskStatus::Failed && abort_policies.get(phase_name).copied().unwrap_or(false) { + workflow_status = WorkflowStatus::Aborted; + phase_results.push(result.0); + break; + } + + phase_results.push(result.0); + } + + let counts = compute_counts(&phase_results); + let mut result = WorkflowResult { + goal: self.config.goal.clone(), + status: if counts.failed > 0 && workflow_status != WorkflowStatus::Aborted { + WorkflowStatus::Partial + } else { + workflow_status + }, + phases: phase_results, + counts, + summary: String::new(), + }; + result.build_summary(); + result + } + + /// Execute all tasks in a single phase, respecting the concurrency limit. + async fn run_phase(&mut self, phase_name: &str) -> (PhaseResult, TaskStatus) { + let phase = self + .config + .phases + .iter() + .find(|p| &p.name == phase_name) + .unwrap() + .clone(); + let mut task_results: Vec = Vec::new(); + + if phase.parallel && phase.tasks.len() > 1 { + // Fan-out: spawn all tasks, limited by semaphore. + let mut handles = Vec::new(); + for task in &phase.tasks { + let task_id = task.id.clone(); + let task = task.clone(); + let spawner = Arc::clone(&self.spawner); + let sem = Arc::clone(&self.concurrency); + let prompt = self.build_prompt(&task); + + let task_id_for_closure = task_id.clone(); + let cwd = task.isolation.cwd_path(); + let handle = tokio::spawn(async move { + let _permit = sem.acquire().await; + spawner + .spawn(task_id_for_closure, prompt, task.agent_type.clone(), cwd) + .await + }); + handles.push((task_id, handle)); + } + + for (task_id, handle) in handles { + match handle.await { + Ok(Ok(agent_result)) => { + self.results.insert(task_id.clone(), agent_result.clone()); + task_results.push(TaskResult { + id: task_id, + status: TaskStatus::Completed, + summary: Some(truncate(&agent_result.summary, 500)), + files_touched: agent_result.files_touched, + error: None, + }); + } + Ok(Err(spawn_err)) => { + warn!(task = %task_id, error = %spawn_err, "task failed"); + task_results.push(TaskResult { + id: task_id, + status: TaskStatus::Failed, + summary: None, + files_touched: vec![], + error: Some(spawn_err.to_string()), + }); + } + Err(join_err) => { + warn!(task = %task_id, error = %join_err, "task panicked"); + task_results.push(TaskResult { + id: task_id, + status: TaskStatus::Failed, + summary: None, + files_touched: vec![], + error: Some(format!("join error: {}", join_err)), + }); + } + } + } + } else { + // Sequential execution. + for task in &phase.tasks { + let prompt = self.build_prompt(task); + let _permit = self.concurrency.acquire().await; + let cwd = task.isolation.cwd_path(); + + match self + .spawner + .spawn(task.id.clone(), prompt, task.agent_type.clone(), cwd) + .await + { + Ok(agent_result) => { + self.results.insert(task.id.clone(), agent_result.clone()); + task_results.push(TaskResult { + id: task.id.clone(), + status: TaskStatus::Completed, + summary: Some(truncate(&agent_result.summary, 500)), + files_touched: agent_result.files_touched, + error: None, + }); + } + Err(spawn_err) => { + warn!(task = %task.id, error = %spawn_err, "task failed"); + task_results.push(TaskResult { + id: task.id.clone(), + status: TaskStatus::Failed, + summary: None, + files_touched: vec![], + error: Some(spawn_err.to_string()), + }); + + if phase.on_failure == FailurePolicy::Abort { + // Mark remaining tasks as skipped. + // (We already collected results for completed tasks.) + break; + } + } + } + } + } + + // Mark unexecuted tasks as skipped. + let executed: HashSet<&str> = task_results.iter().map(|t| t.id.as_str()).collect(); + let mut skipped: Vec = Vec::new(); + for task in &phase.tasks { + if !executed.contains(task.id.as_str()) { + skipped.push(TaskResult { + id: task.id.clone(), + status: TaskStatus::Skipped, + summary: None, + files_touched: vec![], + error: None, + }); + } + } + drop(executed); + task_results.extend(skipped); + + let pstatus = phase_status(&task_results); + ( + PhaseResult { + name: phase.name.clone(), + status: pstatus, + tasks: task_results, + }, + pstatus, + ) + } + + /// Build the agent prompt, injecting results from upstream dependencies. + fn build_prompt(&self, task: &Task) -> String { + if task.depends_on_results.is_empty() { + return task.prompt.clone(); + } + + let mut ctx = String::from("## Context from upstream tasks\n\n"); + for dep_id in &task.depends_on_results { + if let Some(result) = self.results.get(dep_id) { + ctx.push_str(&format!( + "### {} ({})\n{}\n\n", + dep_id, + if result.success { "success" } else { "failed" }, + truncate(&result.summary, 1000), + )); + } else { + ctx.push_str(&format!("### {} (not available)\n\n", dep_id)); + } + } + ctx.push_str("---\n\n"); + ctx.push_str(&task.prompt); + ctx + } + + /// Topological sort of phases by `depends_on`. + fn topological_sort(&self) -> Result, Vec> { + let mut in_degree: HashMap<&str, usize> = HashMap::new(); + let mut adjacency: HashMap<&str, Vec<&str>> = HashMap::new(); + + for phase in &self.config.phases { + in_degree.entry(&phase.name).or_insert(0); + adjacency.entry(&phase.name).or_default(); + for dep in &phase.depends_on { + adjacency.entry(dep.as_str()).or_default().push(&phase.name); + *in_degree.entry(&phase.name).or_insert(0) += 1; + } + } + + let mut queue: Vec<&str> = in_degree + .iter() + .filter(|(_, deg)| **deg == 0) + .map(|(&name, _)| name) + .collect(); + + let mut sorted: Vec = Vec::new(); + while let Some(node) = queue.pop() { + sorted.push(node.to_string()); + if let Some(neighbors) = adjacency.get(node) { + for &neighbor in neighbors { + let deg = in_degree.get_mut(neighbor).unwrap(); + *deg -= 1; + if *deg == 0 { + queue.push(neighbor); + } + } + } + } + + if sorted.len() != self.config.phases.len() { + // Cycle: find it for the error message. + Err(self.find_cycle()) + } else { + Ok(sorted) + } + } + + fn find_cycle(&self) -> Vec { + // Reuse config's cycle detection. + // (Simplified: just return phase names for now; the config validator + // already catches cycles before we get here.) + self.config.phases.iter().map(|p| p.name.clone()).collect() + } + + fn fail_fast(&self, reason: &str, details: &[String]) -> WorkflowResult { + let mut result = WorkflowResult { + goal: self.config.goal.clone(), + status: WorkflowStatus::Aborted, + phases: vec![], + counts: TaskCounts::default(), + summary: String::new(), + }; + result.summary = format!( + "Workflow aborted: {}\n{}", + reason, + details + .iter() + .map(|d| format!(" - {}", d)) + .collect::>() + .join("\n") + ); + result + } +} + +/// Determine the aggregate status of a phase from its task results. +fn phase_status(tasks: &[TaskResult]) -> TaskStatus { + if tasks.iter().all(|t| t.status == TaskStatus::Completed) { + TaskStatus::Completed + } else if tasks.iter().any(|t| t.status == TaskStatus::Failed) { + TaskStatus::Failed + } else if tasks.iter().all(|t| t.status == TaskStatus::Skipped) { + TaskStatus::Skipped + } else { + TaskStatus::Pending + } +} + +/// Compute aggregate task counts across all phases. +fn compute_counts(phases: &[PhaseResult]) -> TaskCounts { + let mut counts = TaskCounts::default(); + for phase in phases { + for task in &phase.tasks { + counts.total += 1; + match task.status { + TaskStatus::Completed => counts.completed += 1, + TaskStatus::Failed => counts.failed += 1, + TaskStatus::Skipped => counts.skipped += 1, + _ => counts.pending += 1, + } + } + } + counts +} + +/// Truncate a string to `max_len` characters, adding "..." if truncated. +fn truncate(s: &str, max_len: usize) -> String { + if s.len() <= max_len { + s.to_string() + } else { + let mut truncated: String = s.chars().take(max_len).collect(); + truncated.push_str("..."); + truncated + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::Phase; + use crate::spawner::{AgentResult, SpawnError}; + use crate::{IsolationMode, TaskMode}; + + struct MockSpawner { + responses: HashMap>, + } + + #[async_trait::async_trait] + impl AgentSpawner for MockSpawner { + async fn spawn( + &self, + task_id: String, + _prompt: String, + _agent_type: Option, + _cwd: Option, + ) -> Result { + match self.responses.get(&task_id) { + Some(result) => match result { + Ok(r) => Ok(AgentResult { + task_id: r.task_id.clone(), + success: r.success, + summary: r.summary.clone(), + files_touched: r.files_touched.clone(), + raw_output: r.raw_output.clone(), + tokens_used: r.tokens_used, + cost_usd: r.cost_usd, + elapsed_ms: r.elapsed_ms, + last_checkpoint: r.last_checkpoint.clone(), + }), + Err(_) => Err(SpawnError::SpawnFailed("mock error".into())), + }, + None => Ok(AgentResult { + task_id: task_id.clone(), + success: true, + summary: "mock result".into(), + files_touched: vec![], + raw_output: None, + tokens_used: None, + cost_usd: None, + elapsed_ms: None, + last_checkpoint: None, + }), + } + } + } + + fn mock_result(task_id: &str) -> AgentResult { + AgentResult { + task_id: task_id.into(), + success: true, + summary: format!("result from {}", task_id), + files_touched: vec!["src/lib.rs".into()], + raw_output: None, + tokens_used: Some(1000), + cost_usd: Some(0.01), + elapsed_ms: Some(500), + last_checkpoint: Some("mock checkpoint".into()), + } + } + + fn test_task(id: &str, prompt: &str) -> Task { + Task { + id: id.into(), + prompt: prompt.into(), + agent_type: None, + depends_on_results: vec![], + max_steps: None, + timeout_secs: None, + mode: TaskMode::ReadOnly, + file_scope: vec![], + isolation: IsolationMode::Shared, + } + } + + #[tokio::test] + async fn single_phase_parallel() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "discovery".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![test_task("a", "scan a"), test_task("b", "scan b")], + }], + }; + + let spawner = Arc::new(MockSpawner { + responses: HashMap::new(), + }); + let mut scheduler = Scheduler::new(config, spawner); + let result = scheduler.run().await; + + assert_eq!(result.status, WorkflowStatus::Completed); + assert_eq!(result.counts.total, 2); + assert_eq!(result.counts.completed, 2); + } + + #[tokio::test] + async fn phase_dependency_ordering() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![ + Phase { + name: "second".into(), + depends_on: vec!["first".into()], + parallel: false, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![Task { + id: "b".into(), + prompt: "second task".into(), + agent_type: None, + depends_on_results: vec!["a".into()], + max_steps: None, + timeout_secs: None, + mode: TaskMode::ReadOnly, + file_scope: vec![], + isolation: IsolationMode::Shared, + }], + }, + Phase { + name: "first".into(), + depends_on: vec![], + parallel: false, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![Task { + id: "a".into(), + prompt: "first task".into(), + agent_type: None, + depends_on_results: vec![], + max_steps: None, + timeout_secs: None, + mode: TaskMode::ReadOnly, + file_scope: vec![], + isolation: IsolationMode::Shared, + }], + }, + ], + }; + + let spawner = Arc::new(MockSpawner { + responses: HashMap::from([ + ("a".into(), Ok(mock_result("a"))), + ("b".into(), Ok(mock_result("b"))), + ]), + }); + let mut scheduler = Scheduler::new(config, spawner); + let result = scheduler.run().await; + + assert_eq!(result.status, WorkflowStatus::Completed); + assert_eq!(result.counts.completed, 2); + // Phase "first" should appear before "second" in results. + assert_eq!(result.phases[0].name, "first"); + assert_eq!(result.phases[1].name, "second"); + } + + #[tokio::test] + async fn skip_continue_on_failure() { + let config = WorkflowConfig { + goal: "test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "tasks".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + test_task("ok", "ok"), + test_task("fail", "fail"), + ], + }], + }; + + let spawner = Arc::new(MockSpawner { + responses: HashMap::from([ + ("ok".into(), Ok(mock_result("ok"))), + ( + "fail".into(), + Err(SpawnError::SpawnFailed("boom".into())), + ), + ]), + }); + let mut scheduler = Scheduler::new(config, spawner); + let result = scheduler.run().await; + + assert_eq!(result.status, WorkflowStatus::Partial); + assert_eq!(result.counts.completed, 1); + assert_eq!(result.counts.failed, 1); + } +} diff --git a/crates/whaleflow/src/spawner.rs b/crates/whaleflow/src/spawner.rs new file mode 100644 index 000000000..56e3a97a3 --- /dev/null +++ b/crates/whaleflow/src/spawner.rs @@ -0,0 +1,75 @@ +//! Abstract agent-spawning interface. +//! +//! WhaleFlow orchestrates sub-agents without depending on any specific +//! harness or runtime. The [`AgentSpawner`] trait is the seam: the +//! scheduler calls `spawn()`, and the embedding application (e.g. the +//! CodeWhale TUI crate) provides the concrete implementation backed by +//! `SubAgentRuntime`. + +use std::path::PathBuf; + +use async_trait::async_trait; + +/// Result of a single agent invocation. +#[derive(Debug, Clone)] +pub struct AgentResult { + /// The task id from the workflow config. + pub task_id: String, + /// Whether the agent completed without error. + pub success: bool, + /// Human-readable summary of findings / actions taken. + pub summary: String, + /// Paths the agent read or modified. + pub files_touched: Vec, + /// Raw output for piping to dependent tasks (may be large). + pub raw_output: Option, + /// Total tokens consumed by this agent (prompt + completion). + pub tokens_used: Option, + /// Cost in USD for this agent's API usage. + pub cost_usd: Option, + /// Elapsed wall-clock time for this agent. + pub elapsed_ms: Option, + /// Last completed tool call for progress display. + pub last_checkpoint: Option, +} + +/// Error conditions for agent spawning. +#[derive(Debug, thiserror::Error)] +pub enum SpawnError { + #[error("agent spawn timeout: {0}")] + Timeout(String), + #[error("agent spawn failed: {0}")] + SpawnFailed(String), + #[error("agent spawn cancelled: {0}")] + Cancelled(String), + #[error("worktree setup failed: {0}")] + WorktreeError(String), + #[error("worktree cleanup failed: {0}")] + CleanupError(String), + #[error("internal error: {0}")] + Internal(String), +} + +/// Abstract interface for spawning a single agent. +/// +/// The embedding application (TUI crate) implements this trait using +/// the existing `SubAgentManager` / `SubAgentRuntime` infrastructure. +/// This keeps `crates/whaleflow` free of TUI dependencies. +#[async_trait] +pub trait AgentSpawner: Send + Sync { + /// Spawn a single agent with the given task. + /// + /// If `cwd` is provided, the agent runs in that directory (used for + /// worktree isolation). The spawner is responsible for creating and + /// cleaning up the worktree if `isolation` is `Worktree`. + /// + /// The spawner should handle model selection, tool gating, and + /// session lifecycle. The scheduler only cares about the result. + async fn spawn( + &self, + task_id: String, + prompt: String, + agent_type: Option, + cwd: Option, + ) -> Result; +} diff --git a/crates/whaleflow/src/tool.rs b/crates/whaleflow/src/tool.rs new file mode 100644 index 000000000..76f360821 --- /dev/null +++ b/crates/whaleflow/src/tool.rs @@ -0,0 +1,77 @@ +/// Schema for the workflow_run tool that DeepSeek calls. +pub const WORKFLOW_RUN_SCHEMA: &str = r#"{ + "type": "object", + "properties": { + "config": { + "type": "object", + "description": "The workflow configuration", + "properties": { + "goal": {"type": "string", "description": "Human-readable goal"}, + "max_concurrent": {"type": "integer", "default": 6, "description": "Max concurrent agents"}, + "phases": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "depends_on": {"type": "array", "items": {"type": "string"}}, + "parallel": {"type": "boolean", "default": true}, + "on_failure": {"type": "string", "enum": ["skip_continue", "abort"], "default": "skip_continue"}, + "tasks": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "string", "description": "Unique task id"}, + "prompt": {"type": "string", "description": "Prompt for the sub-agent"}, + "agent_type": {"type": "string", "description": "explore, review, implementer, verifier, or omit for default"}, + "depends_on_results": {"type": "array", "items": {"type": "string"}, "description": "Task IDs whose results feed into this task"}, + "mode": {"type": "string", "enum": ["read_only", "read_write"], "default": "read_only"}, + "file_scope": {"type": "array", "items": {"type": "string"}, "description": "Glob patterns for files this task touches"}, + "isolation": {"type": "string", "enum": ["shared", "worktree"], "default": "shared"} + }, + "required": ["id", "prompt"] + } + } + }, + "required": ["name", "tasks"] + } + } + }, + "required": ["goal", "phases"] + } + }, + "required": ["config"] +}"#; + +/// Run a workflow and return the result. +pub async fn execute_workflow( + config_json: &str, + spawner: std::sync::Arc, +) -> Result { + let config: crate::config::WorkflowConfig = serde_json::from_str(config_json) + .map_err(|e| format!("Invalid config JSON: {e}"))?; + + // Validate + config.validate().map_err(|errors| errors.join("\n"))?; + + // Check conflicts + let conflicts = config.detect_conflicts(); + if !conflicts.is_empty() { + let msg = conflicts + .iter() + .map(|c| format!("⚠ {}: {}", c.kind_name(), c.description)) + .collect::>() + .join("\n"); + // Don't fail — warn and continue. The scheduler will serialize if needed. + tracing::warn!("workflow conflicts detected:\n{msg}"); + } + + // Run + let mut scheduler = crate::Scheduler::new(config, spawner); + let result = scheduler.run().await; + + // Return structured result as JSON + serde_json::to_string_pretty(&result) + .map_err(|e| format!("Failed to serialize result: {e}")) +} diff --git a/crates/whaleflow/src/worktree.rs b/crates/whaleflow/src/worktree.rs new file mode 100644 index 000000000..f731a9cb2 --- /dev/null +++ b/crates/whaleflow/src/worktree.rs @@ -0,0 +1,272 @@ +//! Git worktree management for whaleFlow. +//! +//! [`WorktreeManager`] creates lightweight git worktrees so sub-agents +//! can work in isolation without colliding on the main workspace. After +//! the agent completes, the scheduler extracts changes as a patch, removes +//! the worktree, and applies the patch back to the main workspace. +//! +//! The module uses only `std::process::Command` and the crate's own +//! [`SpawnError`] error type — it has zero dependencies on the TUI crate. + +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::process::Command; + +use crate::spawner::SpawnError; + +/// Manager for git worktree lifecycle: create → extract → remove → apply. +pub struct WorktreeManager; + +impl WorktreeManager { + /// Create a new worktree for the given task. + /// + /// Runs `git worktree add .worktrees/whaleflow-{task_id} HEAD` inside + /// `workspace`. If the worktree directory already exists the call is + /// a no-op (idempotent) and the existing path is returned. + /// + /// # Errors + /// + /// Returns [`SpawnError::WorktreeError`] if the git command fails. + pub fn create(task_id: &str, workspace: &Path) -> Result { + let relative = format!(".worktrees/whaleflow-{}", task_id); + let worktree_path = workspace.join(&relative); + + // Idempotent: skip creation if the worktree already exists. + if worktree_path.exists() { + return Ok(worktree_path); + } + + let output = Command::new("git") + .arg("worktree") + .arg("add") + .arg(&relative) + .arg("HEAD") + .current_dir(workspace) + .output() + .map_err(|e| SpawnError::WorktreeError(format!("git worktree add failed: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(SpawnError::WorktreeError(format!( + "git worktree add failed: {}", + stderr.trim() + ))); + } + + Ok(worktree_path) + } + + /// Extract outstanding changes from a worktree as a unified-diff patch. + /// + /// Runs `git -C .worktrees/whaleflow-{task_id} diff HEAD`. + /// + /// # Errors + /// + /// Returns [`SpawnError::WorktreeError`] if the git command fails. + pub fn extract_changes(task_id: &str, workspace: &Path) -> Result { + let relative = format!(".worktrees/whaleflow-{}", task_id); + let worktree_path = workspace.join(&relative); + + let output = Command::new("git") + .arg("-C") + .arg(&worktree_path) + .arg("diff") + .arg("HEAD") + .output() + .map_err(|e| { + SpawnError::WorktreeError(format!("git diff in worktree failed: {}", e)) + })?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(SpawnError::WorktreeError(format!( + "git diff in worktree failed: {}", + stderr.trim() + ))); + } + + Ok(String::from_utf8_lossy(&output.stdout).into_owned()) + } + + /// Remove a worktree (and its directory) with `git worktree remove --force`. + /// + /// If the worktree directory does not exist the call is a no-op — this + /// is not treated as an error. + /// + /// # Errors + /// + /// Returns [`SpawnError::CleanupError`] if the git command fails. + pub fn remove(task_id: &str, workspace: &Path) -> Result<(), SpawnError> { + let relative = format!(".worktrees/whaleflow-{}", task_id); + let worktree_path = workspace.join(&relative); + + // No-op if the worktree does not exist. + if !worktree_path.exists() { + return Ok(()); + } + + let output = Command::new("git") + .arg("worktree") + .arg("remove") + .arg(&relative) + .arg("--force") + .current_dir(workspace) + .output() + .map_err(|e| { + SpawnError::CleanupError(format!("git worktree remove failed: {}", e)) + })?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(SpawnError::CleanupError(format!( + "git worktree remove failed: {}", + stderr.trim() + ))); + } + + Ok(()) + } + + /// Apply a patch to the main workspace via `git apply`. + /// + /// The patch text is written to stdin of the `git apply` process. + /// + /// # Errors + /// + /// Returns [`SpawnError::WorktreeError`] if the git command fails. + pub fn apply_patch(workspace: &Path, patch: &str) -> Result<(), SpawnError> { + let mut child = Command::new("git") + .arg("apply") + .current_dir(workspace) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .map_err(|e| SpawnError::WorktreeError(format!("git apply spawn failed: {}", e)))?; + + // Write the patch to stdin, then drop the handle to close the pipe + // before waiting on the child — otherwise `git apply` deadlocks + // waiting for EOF. + { + let mut stdin = child + .stdin + .take() + .ok_or_else(|| SpawnError::WorktreeError("failed to open git apply stdin".into()))?; + stdin + .write_all(patch.as_bytes()) + .map_err(|e| SpawnError::WorktreeError(format!("write patch to git apply failed: {}", e)))?; + } + + let output = child + .wait_with_output() + .map_err(|e| SpawnError::WorktreeError(format!("git apply wait failed: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(SpawnError::WorktreeError(format!( + "git apply failed: {}", + stderr.trim() + ))); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use std::process::Command as StdCommand; + + /// Helper: initialise a temporary git repo. + fn init_temp_repo(suffix: &str) -> PathBuf { + let dir = std::env::temp_dir().join(format!("wf-test-{}-{}", std::process::id(), suffix)); + fs::create_dir_all(&dir).unwrap(); + StdCommand::new("git") + .arg("init") + .current_dir(&dir) + .output() + .unwrap(); + // Create an initial commit so HEAD exists. + let readme = dir.join("README.md"); + fs::write(&readme, "# test\n").unwrap(); + StdCommand::new("git") + .args(["add", "README.md"]) + .current_dir(&dir) + .output() + .unwrap(); + StdCommand::new("git") + .args(["commit", "-m", "init"]) + .current_dir(&dir) + .output() + .unwrap(); + dir + } + + #[test] + fn create_worktree_and_extract_changes() { + let repo = init_temp_repo("create"); + let task_id = "test-1"; + + // Create worktree. + let path = WorktreeManager::create(task_id, &repo).unwrap(); + assert!(path.exists()); + assert!(path.join("README.md").exists()); + + // Make a change inside the worktree. + fs::write(path.join("README.md"), "# modified\n").unwrap(); + + // Extract changes. + let patch = WorktreeManager::extract_changes(task_id, &repo).unwrap(); + assert!(patch.contains("# modified")); + + // Cleanup. + WorktreeManager::remove(task_id, &repo).unwrap(); + assert!(!path.exists()); + + // Cleanup repo. + let _ = fs::remove_dir_all(&repo); + } + + #[test] + fn create_is_idempotent() { + let repo = init_temp_repo("idemp"); + let task_id = "test-idemp"; + + let path1 = WorktreeManager::create(task_id, &repo).unwrap(); + let path2 = WorktreeManager::create(task_id, &repo).unwrap(); + assert_eq!(path1, path2); + + WorktreeManager::remove(task_id, &repo).unwrap(); + let _ = fs::remove_dir_all(&repo); + } + + #[test] + fn remove_nonexistent_is_noop() { + // Should not error when worktree doesn't exist. + let result = WorktreeManager::remove("no-such-task", Path::new("/tmp")); + assert!(result.is_ok()); + } + + #[test] + fn apply_patch_applies_changes() { + let repo = init_temp_repo("apply"); + let task_id = "test-apply"; + + // Create worktree, make a change, extract patch, remove worktree. + let path = WorktreeManager::create(task_id, &repo).unwrap(); + fs::write(path.join("README.md"), "# patched\n").unwrap(); + let patch = WorktreeManager::extract_changes(task_id, &repo).unwrap(); + WorktreeManager::remove(task_id, &repo).unwrap(); + + // Apply patch to main workspace. + WorktreeManager::apply_patch(&repo, &patch).unwrap(); + + // Verify the change landed. + let contents = fs::read_to_string(repo.join("README.md")).unwrap(); + assert_eq!(contents, "# patched\n"); + + let _ = fs::remove_dir_all(&repo); + } +} diff --git a/crates/whaleflow/tests/integration_test.rs b/crates/whaleflow/tests/integration_test.rs new file mode 100644 index 000000000..567abc963 --- /dev/null +++ b/crates/whaleflow/tests/integration_test.rs @@ -0,0 +1,284 @@ +//! Integration tests for WhaleFlow — full pipeline from config to result. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use codewhale_whaleflow::{ + AgentResult, AgentSpawner, FailurePolicy, IsolationMode, Phase, Scheduler, SpawnError, Task, + TaskMode, WorkflowConfig, +}; + +/// A controllable mock spawner for integration testing. +struct MockSpawner { + responses: HashMap>, +} + +impl MockSpawner { + fn new() -> Self { + Self { + responses: HashMap::new(), + } + } + + fn set(&mut self, id: &str, result: Result) { + self.responses.insert(id.to_string(), result); + } +} + +#[async_trait] +impl AgentSpawner for MockSpawner { + async fn spawn( + &self, + task_id: String, + _prompt: String, + _agent_type: Option, + _cwd: Option, + ) -> Result { + match self.responses.get(&task_id) { + Some(Ok(r)) => Ok(AgentResult { + task_id: r.task_id.clone(), + success: r.success, + summary: r.summary.clone(), + files_touched: r.files_touched.clone(), + raw_output: r.raw_output.clone(), + tokens_used: r.tokens_used, + cost_usd: r.cost_usd, + elapsed_ms: r.elapsed_ms, + last_checkpoint: r.last_checkpoint.clone(), + }), + Some(Err(_)) => Err(SpawnError::SpawnFailed("mock error".into())), + None => Ok(AgentResult { + task_id: task_id.clone(), + success: true, + summary: format!("default result for {}", task_id), + files_touched: vec![], + raw_output: None, + tokens_used: Some(500), + cost_usd: Some(0.005), + elapsed_ms: Some(250), + last_checkpoint: Some("completed".into()), + }), + } + } +} + +fn make_result(task_id: &str, summary: &str, files: &[&str]) -> AgentResult { + AgentResult { + task_id: task_id.to_string(), + success: true, + summary: summary.to_string(), + files_touched: files.iter().map(|s| s.to_string()).collect(), + raw_output: None, + tokens_used: Some(1000), + cost_usd: Some(0.01), + elapsed_ms: Some(500), + last_checkpoint: Some("done".into()), + } +} + +fn make_task(id: &str, prompt: &str) -> Task { + Task { + id: id.into(), + prompt: prompt.into(), + agent_type: None, + depends_on_results: vec![], + max_steps: None, + timeout_secs: None, + mode: TaskMode::ReadOnly, + file_scope: vec![], + isolation: IsolationMode::Shared, + } +} + +#[tokio::test] +async fn full_workflow_three_phases() { + // A realistic 3-phase workflow: discovery → triage → fix + let config = WorkflowConfig { + goal: "Security audit".into(), + max_concurrent: 4, + phases: vec![ + Phase { + name: "discovery".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + make_task("scan-auth", "Audit auth module"), + make_task("scan-api", "Audit API endpoints"), + ], + }, + Phase { + name: "triage".into(), + depends_on: vec!["discovery".into()], + parallel: false, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![Task { + id: "rank-findings".into(), + prompt: "Rank findings".into(), + depends_on_results: vec!["scan-auth".into(), "scan-api".into()], + ..make_task("rank-findings", "Rank findings") + }], + }, + Phase { + name: "fix".into(), + depends_on: vec!["triage".into()], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + Task { + id: "fix-1".into(), + prompt: "Fix #1".into(), + mode: TaskMode::ReadWrite, + file_scope: vec!["src/auth/**".into()], + ..make_task("fix-1", "Fix #1") + }, + Task { + id: "fix-2".into(), + prompt: "Fix #2".into(), + mode: TaskMode::ReadWrite, + file_scope: vec!["src/api/**".into()], + ..make_task("fix-2", "Fix #2") + }, + ], + }, + ], + }; + + let mut mock = MockSpawner::new(); + mock.set("scan-auth", Ok(make_result("scan-auth", "Auth looks clean", &[]))); + mock.set( + "scan-api", + Ok(make_result( + "scan-api", + "Found SQL injection risk", + &["src/api/handler.rs"], + )), + ); + mock.set( + "rank-findings", + Ok(make_result( + "rank-findings", + "API injection is critical, auth is clean", + &[], + )), + ); + mock.set( + "fix-1", + Ok(make_result( + "fix-1", + "Removed injection point", + &["src/auth/login.rs"], + )), + ); + mock.set( + "fix-2", + Ok(make_result( + "fix-2", + "Added input validation", + &["src/api/handler.rs"], + )), + ); + + let spawner = Arc::new(mock); + let mut scheduler = Scheduler::new(config.clone(), spawner); + let result = scheduler.run().await; + + // Verify overall status. + assert_eq!( + result.status, + codewhale_whaleflow::WorkflowStatus::Completed + ); + assert_eq!(result.counts.total, 5); + assert_eq!(result.counts.completed, 5); + assert_eq!(result.counts.failed, 0); + + // Verify phase ordering. + assert_eq!(result.phases.len(), 3); + assert_eq!(result.phases[0].name, "discovery"); + assert_eq!(result.phases[1].name, "triage"); + assert_eq!(result.phases[2].name, "fix"); + + // Verify the triage task received upstream context. + assert!(result.summary.contains("scan-auth")); + assert!(result.summary.contains("scan-api")); +} + +#[tokio::test] +async fn workflow_with_failure_skip_continue() { + let config = WorkflowConfig { + goal: "Partial failure test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "tasks".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![ + make_task("ok", "Do something"), + Task { + id: "fail".into(), + prompt: "Will fail".into(), + ..make_task("fail", "Will fail") + }, + ], + }], + }; + + let mut mock = MockSpawner::new(); + mock.set("ok", Ok(make_result("ok", "Done", &[]))); + mock.set( + "fail", + Err(SpawnError::SpawnFailed("test failure".into())), + ); + + let mut scheduler = Scheduler::new(config, Arc::new(mock)); + let result = scheduler.run().await; + + assert_eq!(result.counts.total, 2); + assert_eq!(result.counts.completed, 1); + assert_eq!(result.counts.failed, 1); + // Skip-continue means the workflow status is Partial, not Aborted. + assert_eq!( + result.status, + codewhale_whaleflow::WorkflowStatus::Partial + ); +} + +#[tokio::test] +async fn workflow_json_roundtrip() { + // Test that we can deserialize a realistic config and serialize the result. + let json = r#"{ + "goal": "Quick audit", + "max_concurrent": 2, + "phases": [ + { + "name": "scan", + "parallel": true, + "tasks": [ + {"id": "s1", "prompt": "Scan module A"}, + {"id": "s2", "prompt": "Scan module B"} + ] + } + ] + }"#; + + let config: WorkflowConfig = + serde_json::from_str(json).expect("Failed to parse workflow config"); + assert_eq!(config.goal, "Quick audit"); + assert_eq!(config.phases.len(), 1); + assert_eq!(config.phases[0].tasks.len(), 2); + + let mock = MockSpawner::new(); + let mut scheduler = Scheduler::new(config, Arc::new(mock)); + let result = scheduler.run().await; + + // Round-trip: serialize and deserialize the result. + let result_json = + serde_json::to_string_pretty(&result).expect("Failed to serialize result"); + let _parsed: serde_json::Value = + serde_json::from_str(&result_json).expect("Failed to parse result JSON"); + + assert!(result_json.contains("Quick audit")); + assert!(result_json.contains("completed")); +} diff --git a/docs/WHALEFLOW_ARCHITECTURE.md b/docs/WHALEFLOW_ARCHITECTURE.md new file mode 100644 index 000000000..7a44a3e88 --- /dev/null +++ b/docs/WHALEFLOW_ARCHITECTURE.md @@ -0,0 +1,197 @@ +# WhaleFlow Architecture + +> Declarative multi-agent workflow orchestration for CodeWhale. +> Inspired by Claude Code's Dynamic Workflows (May 2026, Opus 4.8). + +## Design + +WhaleFlow is a **declarative JSON-config-driven workflow orchestrator**. +The DeepSeek model generates a `WorkflowConfig` describing phases, tasks, +and dependencies. The Rust scheduler executes the config: topologically +sorting phases, fanning out sub-agents with concurrency control, piping +results between dependent tasks, and returning an integrated structured +result. + +This is **not** a JavaScript scripting runtime. Claude's Dynamic Workflows +use a Node `vm` sandbox where the model writes JS scripts calling `agent()`, +`parallel()`, `pipeline()`. WhaleFlow achieves the same capability through +a declarative config — the model writes JSON (DeepSeek V4's strongest +format) and the Rust runtime handles parallelism and dependency management. +This is simpler to build, easier to debug, and plays to DeepSeek's strengths. + +## Crate: `crates/whaleflow` + +Pure orchestration logic. No TUI, network, or filesystem dependencies. +Depends only on `serde`, `tokio`, and CodeWhale workspace libraries. + +### Module map + +| Module | Purpose | +|--------|---------| +| `config.rs` | `WorkflowConfig`, `Phase`, `Task`, `FailurePolicy` structs + JSON Schema + validation | +| `spawner.rs` | `AgentSpawner` trait — abstract interface for spawning sub-agents | +| `scheduler.rs` | `Scheduler` — topological sort, concurrency, result plumbing, failure handling | +| `result.rs` | `WorkflowResult`, `TaskResult` — structured output returned to the model | +| `lib.rs` | Public API re-exports | + +### AgentSpawner trait (the seam) + +```rust +#[async_trait] +pub trait AgentSpawner: Send + Sync { + async fn spawn( + &self, + task_id: String, + prompt: String, + agent_type: Option, + ) -> Result; +} +``` + +The `whaleflow` crate never spawns agents directly. The embedding +application (CodeWhale TUI crate) implements `AgentSpawner` using the +existing `SubAgentManager` / `SubAgentRuntime` infrastructure. This keeps +`whaleflow` decoupled from the TUI stack (ratatui, crossterm, etc.). + +### Workflow config shape + +```json +{ + "goal": "Security audit and remediation", + "max_concurrent": 6, + "phases": [ + { + "name": "discovery", + "parallel": true, + "on_failure": "skip_continue", + "tasks": [ + { + "id": "scan-auth", + "prompt": "Audit src/auth/ for vulnerabilities...", + "agent_type": "review" + } + ] + }, + { + "name": "triage", + "depends_on": ["discovery"], + "parallel": false, + "tasks": [ + { + "id": "rank-findings", + "prompt": "Rank all findings by severity...", + "depends_on_results": ["scan-auth", "scan-api"] + } + ] + } + ] +} +``` + +### Execution flow + +1. Model generates `WorkflowConfig` JSON +2. Model calls `workflow_run` tool with the config +3. Scheduler validates config (unique IDs, no cycles, valid deps) +4. Scheduler topologically sorts phases by `depends_on` +5. For each phase: + - If `parallel`: fan out all tasks, limited by `max_concurrent` semaphore + - If sequential: run tasks one at a time + - On failure: skip-continue (default) or abort (per-phase policy) +6. Results from completed tasks are injected into dependent tasks' prompts +7. Structured `WorkflowResult` returned to the model + +### Failure handling + +| Policy | Behavior | +|--------|----------| +| `skip_continue` (default) | Failed tasks are marked failed. Remaining tasks continue. Downstream tasks depending on failed results get skipped. | +| `abort` | First failure stops the entire workflow immediately. | + +### Concurrency model + +- `max_concurrent` in config controls the semaphore (default 6, max TBD) +- Applies globally across all phases +- Within a parallel phase, all tasks are spawned and acquire permits +- Sequential phases hold one permit while executing each task + +## Integration point: `crates/tui` + +### AgentSpawner implementation + +`WhaleFlowSpawner` implements `AgentSpawner` using the existing +`SubAgentManager` / `SubAgentRuntime`. For `isolation: "worktree"` tasks: + +1. Creates worktree via `WorktreeManager::create()` +2. Passes `cwd` to `agent_open` so the child runs in the isolated checkout +3. After completion, extracts patch via `WorktreeManager::extract_changes()` +4. Applies patch to main workspace via `WorktreeManager::apply_patch()` +5. Cleans up via `WorktreeManager::remove()` + +### TUI surfaces + +**Side panel (agents pane):** +- whaleFlow agents appear under a "🐋 Swarm" group header +- Shows: swarm goal, current phase, agent count, overall progress +- Per-agent: task ID, status icon (⏳/✓/✗), last completed checkpoint +- Global stats: total tokens, cost (USD/CNY), elapsed time +- Non-whaleFlow agents appear below, ungrouped + +**Pop-up dashboard (`/whaleflow dashboard` or `Ctrl+W`):** +- Toggleable overlay — floats above conversation, does not block input +- Top: workflow goal + global progress bar (N/M tasks complete) +- Per-phase progress bars with ✓/⏳/○ status +- Middle table: agent ID | task | status/progress | tokens | cost | elapsed +- Bottom: total cost, tokens, time across all agents +- Expand agent row to see tool calls and outputs + +**Commands:** +- `/workflow on/off` — toggle feature (off = suppress auto-detection, hide tool) +- `/whaleflow dashboard` or `Ctrl+W` — open/close the dashboard overlay + +### Per-agent cost tracking + +`AgentResult` extended with `tokens_used` and `cost_usd` fields. +Populated from `SubAgentSessionProjection` returned by `agent_eval`. +Aggregated in the scheduler for the global stats panel. + +### Progress model + +Agents report checkpoints (not live tool calls). Each checkpoint: +- Phase name, task ID, status (running/completed/failed) +- Last completed tool call name + file path +- Step count + +The scheduler aggregates checkpoints into phase-level and workflow-level +progress for the TUI to render. + +## The name "WhaleFlow" + +Whale = CodeWhale. Flow = workflow. Also a nod to "pod" — a group of +whales working together, like a swarm of sub-agents. + +## Splitting back into existing crates + +If the CodeWhale maintainer prefers not to add a new crate, the modules +map cleanly into the existing structure: + +| Module | Destination | +|--------|------------| +| `config.rs`, `result.rs` | `crates/tools/src/workflow/` (schema + tool definition) | +| `spawner.rs` | `crates/tools/src/workflow/` (or a trait in `crates/agent`) | +| `scheduler.rs` | `crates/agent/src/workflow/` (orchestration logic) | +| TUI integration | `crates/tui/src/tools/workflow.rs` (already the plan) | + +The crate boundary is thin enough that extraction or inlining is a +mechanical refactor — no logic changes required. + +## Prior art + +- **Claude Dynamic Workflows** (May 2026): JS scripts in a Node `vm` + sandbox. Model writes `agent()`, `parallel()`, `pipeline()` calls. + Up to 16 concurrent, 1000 total agents. Resumable runs. +- **pi-dynamic-workflows** (Michaelliv, May 2026): TypeScript clone + for Pi. Same API surface. AST-validated parser + sandbox. +- **codex-dynamic-workflows** (DannyMac180, May 2026): Skill-based + approach for Codex. No programmable runner — simulates subagents + with isolated packet notes. From 10a984957d18c3af5da0e3233113e6639c4136da Mon Sep 17 00:00:00 2001 From: AdityaVG13 Date: Mon, 1 Jun 2026 02:07:06 -0400 Subject: [PATCH 2/5] fix(whaleflow): wire worktree isolation, abort policy, timeout, spawn_blocking - cwd_path() now returns worktree path for Worktree variant (was dead code) - parallel phases now honor Abort failure policy - WorktreeManager git calls wrapped in tokio::spawn_blocking - timeout_secs wired end-to-end with tokio::time::timeout on polling loop - AgentSpawner trait extended with timeout_secs/max_steps parameters - WorkflowRunTool no longer claims ReadOnly capability - unknown agent_type now logs a warning instead of silently defaulting Addresses Greptile review: P1 (blocking Command), P2 (dead timeout_secs) --- crates/tui/src/tools/workflow/mod.rs | 292 +++++++++++++-------- crates/whaleflow/src/config.rs | 14 +- crates/whaleflow/src/scheduler.rs | 22 +- crates/whaleflow/src/spawner.rs | 6 + crates/whaleflow/tests/integration_test.rs | 2 + 5 files changed, 221 insertions(+), 115 deletions(-) diff --git a/crates/tui/src/tools/workflow/mod.rs b/crates/tui/src/tools/workflow/mod.rs index 16400aa99..d1f75eca7 100644 --- a/crates/tui/src/tools/workflow/mod.rs +++ b/crates/tui/src/tools/workflow/mod.rs @@ -60,123 +60,203 @@ impl AgentSpawner for WhaleFlowSpawner { prompt: String, agent_type: Option, cwd: Option, + timeout_secs: Option, + _max_steps: Option, ) -> Result { - // For worktree isolation: create the worktree if cwd is set - // (the scheduler pre-computes the path based on isolation mode). - // `WorktreeManager::create` is idempotent — no-op if the worktree - // already exists (e.g. reused across parallel phases). - let actual_cwd = if cwd.is_some() { - let worktree_path = WorktreeManager::create(&task_id, &self.workspace)?; - Some(worktree_path) - } else { - None - }; + // Build the future that does the real work — we'll wrap it in a + // timeout below. + let task_id_inner = task_id.clone(); + let work = async { + let task_id = task_id_inner; + // For worktree isolation: create the worktree if cwd is set + // (the scheduler pre-computes the path based on isolation mode). + // `WorktreeManager::create` is idempotent — no-op if the worktree + // already exists (e.g. reused across parallel phases). + // Git operations are CPU-bound; run them on the blocking pool. + let workspace = self.workspace.clone(); + let tid = task_id.clone(); + let actual_cwd = if cwd.is_some() { + let wp = tokio::task::spawn_blocking(move || { + WorktreeManager::create(&tid, &workspace) + }) + .await + .map_err(|e| SpawnError::Internal(format!("spawn_blocking join: {e}")))??; + Some(wp) + } else { + None + }; - // Determine agent type. Default to General (full tool access). - let subagent_type = agent_type - .as_deref() - .and_then(SubAgentType::from_str) - .unwrap_or_default(); - - // Derive a detached child runtime so the sub-agent outlives the - // scheduler's turn token. - let mut child_runtime = self.runtime.background_runtime(); - if let Some(ref cwd_path) = actual_cwd { - child_runtime.context.workspace = cwd_path.clone(); - } + // Determine agent type. Default to General (full tool access). + // Warn on unknown agent_type strings so typos don't silently + // default to a full-access agent. + let subagent_type = match agent_type.as_deref() { + Some(s) => match SubAgentType::from_str(s) { + Some(t) => t, + None => { + tracing::warn!( + task_id = %task_id, + raw_type = %s, + "unknown agent_type, defaulting to General" + ); + SubAgentType::default() + } + }, + None => SubAgentType::default(), + }; - // Spawn via the shared sub-agent manager. - let spawn_result = { - let mut mgr = self.manager.write().await; - mgr.spawn_background( - Arc::clone(&self.manager), - child_runtime, - subagent_type, - prompt, - None, // full tool access — same as a top-level sub-agent - ) - .map_err(|e| SpawnError::SpawnFailed(format!("{e}")))? - }; + // Derive a detached child runtime so the sub-agent outlives the + // scheduler's turn token. + let mut child_runtime = self.runtime.background_runtime(); + if let Some(ref cwd_path) = actual_cwd { + child_runtime.context.workspace = cwd_path.clone(); + } - let agent_id = spawn_result.agent_id.clone(); - - tracing::debug!( - agent_id = %agent_id, - task_id = %task_id, - "WhaleFlow spawned sub-agent" - ); - - // Poll for completion. The sub-agent manager updates the snapshot - // in-place when the background task finishes. - loop { - let snapshot = { - let mgr = self.manager.read().await; - mgr.get_result(&agent_id) - .map_err(|e| SpawnError::Internal(format!("{e}")))? + // Spawn via the shared sub-agent manager. + let spawn_result = { + let mut mgr = self.manager.write().await; + mgr.spawn_background( + Arc::clone(&self.manager), + child_runtime, + subagent_type, + prompt, + None, // full tool access + ) + .map_err(|e| SpawnError::SpawnFailed(format!("{e}")))? }; - match snapshot.status { - SubAgentStatus::Running => { - // Still running — back off before next poll. - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - } - SubAgentStatus::Completed => { - let summary = snapshot.result.clone().unwrap_or_default(); - let elapsed_ms = Some(snapshot.duration_ms); - - // Clean up worktree if we created one: extract the - // diff patch, apply it to the main workspace, then - // remove the worktree. Best-effort — we already have - // the agent result, so worktree cleanup failures are - // logged but don't fail the task. - if cwd.is_some() { - if let Ok(patch) = - WorktreeManager::extract_changes(&task_id, &self.workspace) - { - if !patch.trim().is_empty() { - if let Err(e) = - WorktreeManager::apply_patch(&self.workspace, &patch) - { - tracing::warn!( - task_id = %task_id, - error = %e, - "Failed to apply worktree patch" - ); + let agent_id = spawn_result.agent_id.clone(); + + tracing::debug!( + agent_id = %agent_id, + task_id = %task_id, + "WhaleFlow spawned sub-agent" + ); + + // Poll for completion. The sub-agent manager updates the snapshot + // in-place when the background task finishes. + loop { + let snapshot = { + let mgr = self.manager.read().await; + mgr.get_result(&agent_id) + .map_err(|e| SpawnError::Internal(format!("{e}")))? + }; + + match snapshot.status { + SubAgentStatus::Running => { + // Still running — back off before next poll. + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + } + SubAgentStatus::Completed => { + let summary = snapshot.result.clone().unwrap_or_default(); + let elapsed_ms = Some(snapshot.duration_ms); + + // Clean up worktree if we created one: extract the + // diff patch, apply it to the main workspace, then + // remove the worktree. Best-effort — we already have + // the agent result, so worktree cleanup failures are + // logged but don't fail the task. + if cwd.is_some() { + let ws = self.workspace.clone(); + let tid = task_id.clone(); + let patch = tokio::task::spawn_blocking(move || { + WorktreeManager::extract_changes(&tid, &ws) + }) + .await + .map_err(|e| { + SpawnError::Internal(format!("spawn_blocking join: {e}")) + }); + if let Ok(Ok(patch)) = patch { + if !patch.trim().is_empty() { + let ws = self.workspace.clone(); + let p = patch.clone(); + if let Err(e) = tokio::task::spawn_blocking(move || { + WorktreeManager::apply_patch(&ws, &p) + }) + .await + .map_err(|e| { + SpawnError::Internal(format!( + "spawn_blocking join: {e}" + )) + }) { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to apply worktree patch" + ); + } } } + let ws = self.workspace.clone(); + let tid = task_id.clone(); + if let Err(e) = tokio::task::spawn_blocking(move || { + WorktreeManager::remove(&tid, &ws) + }) + .await + .map_err(|e| { + SpawnError::Internal(format!("spawn_blocking join: {e}")) + }) { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to remove worktree" + ); + } } - if let Err(e) = WorktreeManager::remove(&task_id, &self.workspace) { - tracing::warn!( - task_id = %task_id, - error = %e, - "Failed to remove worktree" - ); - } - } - return Ok(AgentResult { - task_id, - success: true, - summary, - files_touched: Vec::new(), - raw_output: snapshot.result, - tokens_used: None, - cost_usd: None, - elapsed_ms, - last_checkpoint: None, - }); - } - SubAgentStatus::Failed(err) | SubAgentStatus::Interrupted(err) => { - let _ = WorktreeManager::remove(&task_id, &self.workspace); - return Err(SpawnError::SpawnFailed(err)); + return Ok(AgentResult { + task_id, + success: true, + summary, + files_touched: Vec::new(), + raw_output: snapshot.result, + tokens_used: None, + cost_usd: None, + elapsed_ms, + last_checkpoint: None, + }); + } + SubAgentStatus::Failed(err) | SubAgentStatus::Interrupted(err) => { + let ws = self.workspace.clone(); + let tid = task_id.clone(); + let _ = tokio::task::spawn_blocking(move || { + WorktreeManager::remove(&tid, &ws) + }) + .await; + return Err(SpawnError::SpawnFailed(err)); + } + SubAgentStatus::Cancelled => { + let ws = self.workspace.clone(); + let tid = task_id.clone(); + let _ = tokio::task::spawn_blocking(move || { + WorktreeManager::remove(&tid, &ws) + }) + .await; + return Err(SpawnError::Cancelled( + "agent cancelled".to_string(), + )); + } } - SubAgentStatus::Cancelled => { - let _ = WorktreeManager::remove(&task_id, &self.workspace); - return Err(SpawnError::Cancelled( - "agent cancelled".to_string(), - )); + } + }; + + // Wrap the entire spawn+ poll in a timeout when `timeout_secs` is set. + if let Some(secs) = timeout_secs { + match tokio::time::timeout(std::time::Duration::from_secs(secs), work).await { + Ok(result) => result, + Err(_elapsed) => { + tracing::warn!( + task_id = %task_id, + timeout_secs = secs, + "WhaleFlow sub-agent timed out" + ); + Err(SpawnError::Timeout(format!( + "task '{}' timed out after {}s", + task_id, secs + ))) } } + } else { + work.await } } } @@ -220,7 +300,9 @@ impl ToolSpec for WorkflowRunTool { } fn capabilities(&self) -> Vec { - vec![ToolCapability::ReadOnly] + // workflow_run orchestrates sub-agents that may write files, so it + // is NOT read-only even though the tool itself doesn't write directly. + vec![] } fn supports_parallel(&self) -> bool { diff --git a/crates/whaleflow/src/config.rs b/crates/whaleflow/src/config.rs index 84987a250..df9c13570 100644 --- a/crates/whaleflow/src/config.rs +++ b/crates/whaleflow/src/config.rs @@ -91,12 +91,18 @@ pub enum IsolationMode { } impl IsolationMode { - /// Return the worktree path if this task should run in one. - /// The actual worktree creation is handled by the spawner. - pub fn cwd_path(&self) -> Option { + /// Return the working directory for this isolation mode. + /// + /// `Shared` tasks run in the main workspace (None = use default). + /// `Worktree` tasks get a dedicated directory under `.worktrees/`. + /// The caller (scheduler or spawner) passes the task id so the path + /// is deterministic: `.worktrees/whaleflow-{task_id}`. + pub fn cwd_path(&self, task_id: &str) -> Option { match self { IsolationMode::Shared => None, - IsolationMode::Worktree => None, // Path set by spawner, not here + IsolationMode::Worktree => { + Some(std::path::PathBuf::from(format!(".worktrees/whaleflow-{}", task_id))) + } } } } diff --git a/crates/whaleflow/src/scheduler.rs b/crates/whaleflow/src/scheduler.rs index c21423096..2759e6d33 100644 --- a/crates/whaleflow/src/scheduler.rs +++ b/crates/whaleflow/src/scheduler.rs @@ -123,11 +123,13 @@ impl Scheduler { let prompt = self.build_prompt(&task); let task_id_for_closure = task_id.clone(); - let cwd = task.isolation.cwd_path(); + let cwd = task.isolation.cwd_path(&task_id); + let timeout_secs = task.timeout_secs; + let max_steps = task.max_steps; let handle = tokio::spawn(async move { let _permit = sem.acquire().await; spawner - .spawn(task_id_for_closure, prompt, task.agent_type.clone(), cwd) + .spawn(task_id_for_closure, prompt, task.agent_type.clone(), cwd, timeout_secs, max_steps) .await }); handles.push((task_id, handle)); @@ -148,22 +150,28 @@ impl Scheduler { Ok(Err(spawn_err)) => { warn!(task = %task_id, error = %spawn_err, "task failed"); task_results.push(TaskResult { - id: task_id, + id: task_id.clone(), status: TaskStatus::Failed, summary: None, files_touched: vec![], error: Some(spawn_err.to_string()), }); + if phase.on_failure == FailurePolicy::Abort { + break; + } } Err(join_err) => { warn!(task = %task_id, error = %join_err, "task panicked"); task_results.push(TaskResult { - id: task_id, + id: task_id.clone(), status: TaskStatus::Failed, summary: None, files_touched: vec![], error: Some(format!("join error: {}", join_err)), }); + if phase.on_failure == FailurePolicy::Abort { + break; + } } } } @@ -172,11 +180,11 @@ impl Scheduler { for task in &phase.tasks { let prompt = self.build_prompt(task); let _permit = self.concurrency.acquire().await; - let cwd = task.isolation.cwd_path(); + let cwd = task.isolation.cwd_path(&task.id); match self .spawner - .spawn(task.id.clone(), prompt, task.agent_type.clone(), cwd) + .spawn(task.id.clone(), prompt, task.agent_type.clone(), cwd, task.timeout_secs, task.max_steps) .await { Ok(agent_result) => { @@ -391,6 +399,8 @@ mod tests { _prompt: String, _agent_type: Option, _cwd: Option, + _timeout_secs: Option, + _max_steps: Option, ) -> Result { match self.responses.get(&task_id) { Some(result) => match result { diff --git a/crates/whaleflow/src/spawner.rs b/crates/whaleflow/src/spawner.rs index 56e3a97a3..8a1bbffff 100644 --- a/crates/whaleflow/src/spawner.rs +++ b/crates/whaleflow/src/spawner.rs @@ -63,6 +63,10 @@ pub trait AgentSpawner: Send + Sync { /// worktree isolation). The spawner is responsible for creating and /// cleaning up the worktree if `isolation` is `Worktree`. /// + /// `timeout_secs` caps total wall-clock time for this agent + /// (including polling). `max_steps` maps to the sub-agent's + /// `max_depth` to bound recursive tool calls. + /// /// The spawner should handle model selection, tool gating, and /// session lifecycle. The scheduler only cares about the result. async fn spawn( @@ -71,5 +75,7 @@ pub trait AgentSpawner: Send + Sync { prompt: String, agent_type: Option, cwd: Option, + timeout_secs: Option, + max_steps: Option, ) -> Result; } diff --git a/crates/whaleflow/tests/integration_test.rs b/crates/whaleflow/tests/integration_test.rs index 567abc963..123c72213 100644 --- a/crates/whaleflow/tests/integration_test.rs +++ b/crates/whaleflow/tests/integration_test.rs @@ -34,6 +34,8 @@ impl AgentSpawner for MockSpawner { _prompt: String, _agent_type: Option, _cwd: Option, + _timeout_secs: Option, + _max_steps: Option, ) -> Result { match self.responses.get(&task_id) { Some(Ok(r)) => Ok(AgentResult { From c415bc042c0bdb7fd727c4447166f0de09688c41 Mon Sep 17 00:00:00 2001 From: AdityaVG13 Date: Mon, 1 Jun 2026 02:19:33 -0400 Subject: [PATCH 3/5] fix(whaleflow): wire max_steps, fix silent worktree errors, add abort tests - max_steps flows through SubAgentSpawnOptions to per-agent step budget - extract_changes errors now logged instead of silently ignored - files_touched populated from worktree diff output - TaskStatus re-exported from whaleflow crate - 3 new tests: abort in parallel phase, abort stops subsequent phases, timeout_secs/max_steps deserialization Addresses Greptile P1 (silent failure on extract_changes) --- crates/tui/src/tools/subagent/mod.rs | 8 +- crates/tui/src/tools/workflow/mod.rs | 79 ++++++++++---- crates/whaleflow/src/lib.rs | 2 +- crates/whaleflow/tests/integration_test.rs | 116 ++++++++++++++++++++- 4 files changed, 177 insertions(+), 28 deletions(-) diff --git a/crates/tui/src/tools/subagent/mod.rs b/crates/tui/src/tools/subagent/mod.rs index 67d3cd17f..8dd257011 100644 --- a/crates/tui/src/tools/subagent/mod.rs +++ b/crates/tui/src/tools/subagent/mod.rs @@ -589,6 +589,11 @@ pub(crate) struct SubAgentSpawnOptions { pub model: Option, pub nickname: Option, pub fork_context: bool, + /// Per-spawn override for the max tool-call steps budget. + /// When `None` (default), the manager's global `max_steps` is used. + /// WhaleFlow passes the task-level `max_steps` here to bound + /// per-agent recursion without affecting other concurrent agents. + pub max_steps: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1365,7 +1370,7 @@ impl SubAgentManager { agent.fork_context = options.fork_context; let agent_id = agent.id.clone(); let started_at = agent.started_at; - let max_steps = self.max_steps; + let max_steps = options.max_steps.unwrap_or(self.max_steps); if let Some(event_tx) = runtime.event_tx.clone() { let _ = event_tx.try_send(Event::AgentSpawned { @@ -2414,6 +2419,7 @@ impl ToolSpec for AgentSpawnTool { model: Some(effective_model), nickname: None, fork_context: spawn_request.fork_context, + max_steps: None, }, ) .map_err(|e| ToolError::execution_failed(format!("Failed to spawn sub-agent: {e}")))?; diff --git a/crates/tui/src/tools/workflow/mod.rs b/crates/tui/src/tools/workflow/mod.rs index d1f75eca7..62568c87d 100644 --- a/crates/tui/src/tools/workflow/mod.rs +++ b/crates/tui/src/tools/workflow/mod.rs @@ -61,7 +61,7 @@ impl AgentSpawner for WhaleFlowSpawner { agent_type: Option, cwd: Option, timeout_secs: Option, - _max_steps: Option, + max_steps: Option, ) -> Result { // Build the future that does the real work — we'll wrap it in a // timeout below. @@ -114,12 +114,21 @@ impl AgentSpawner for WhaleFlowSpawner { // Spawn via the shared sub-agent manager. let spawn_result = { let mut mgr = self.manager.write().await; - mgr.spawn_background( + let opts = crate::tools::subagent::SubAgentSpawnOptions { + max_steps, + ..Default::default() + }; + mgr.spawn_background_with_assignment_options( Arc::clone(&self.manager), child_runtime, subagent_type, - prompt, + prompt.clone(), + crate::tools::subagent::SubAgentAssignment { + objective: prompt.clone(), + role: None, + }, None, // full tool access + opts, ) .map_err(|e| SpawnError::SpawnFailed(format!("{e}")))? }; @@ -155,36 +164,60 @@ impl AgentSpawner for WhaleFlowSpawner { // remove the worktree. Best-effort — we already have // the agent result, so worktree cleanup failures are // logged but don't fail the task. + let mut files_touched: Vec = Vec::new(); if cwd.is_some() { let ws = self.workspace.clone(); let tid = task_id.clone(); - let patch = tokio::task::spawn_blocking(move || { + let patch_result = tokio::task::spawn_blocking(move || { WorktreeManager::extract_changes(&tid, &ws) }) .await .map_err(|e| { SpawnError::Internal(format!("spawn_blocking join: {e}")) }); - if let Ok(Ok(patch)) = patch { - if !patch.trim().is_empty() { - let ws = self.workspace.clone(); - let p = patch.clone(); - if let Err(e) = tokio::task::spawn_blocking(move || { - WorktreeManager::apply_patch(&ws, &p) - }) - .await - .map_err(|e| { - SpawnError::Internal(format!( - "spawn_blocking join: {e}" - )) - }) { - tracing::warn!( - task_id = %task_id, - error = %e, - "Failed to apply worktree patch" - ); + match patch_result { + Ok(Ok(patch)) => { + if !patch.trim().is_empty() { + // Parse changed file paths from the diff. + files_touched = patch + .lines() + .filter(|l| l.starts_with("+++ b/")) + .filter_map(|l| l.strip_prefix("+++ b/")) + .map(|s| s.to_string()) + .collect(); + let ws = self.workspace.clone(); + let p = patch; + if let Err(e) = tokio::task::spawn_blocking( + move || WorktreeManager::apply_patch(&ws, &p), + ) + .await + .map_err(|e| { + SpawnError::Internal(format!( + "spawn_blocking join: {e}" + )) + }) { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to apply worktree patch" + ); + } } } + Ok(Err(e)) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to extract worktree changes" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "spawn_blocking failed during worktree extraction" + ); + } } let ws = self.workspace.clone(); let tid = task_id.clone(); @@ -207,7 +240,7 @@ impl AgentSpawner for WhaleFlowSpawner { task_id, success: true, summary, - files_touched: Vec::new(), + files_touched, raw_output: snapshot.result, tokens_used: None, cost_usd: None, diff --git a/crates/whaleflow/src/lib.rs b/crates/whaleflow/src/lib.rs index 63ed91af1..d5af97d43 100644 --- a/crates/whaleflow/src/lib.rs +++ b/crates/whaleflow/src/lib.rs @@ -33,7 +33,7 @@ pub mod tool; pub use config::{ Conflict, ConflictKind, FailurePolicy, IsolationMode, Phase, Task, TaskMode, WorkflowConfig, }; -pub use result::{WorkflowResult, WorkflowStatus}; +pub use result::{TaskStatus, WorkflowResult, WorkflowStatus}; pub use scheduler::Scheduler; pub use spawner::{AgentResult, AgentSpawner, SpawnError}; pub use worktree::WorktreeManager; diff --git a/crates/whaleflow/tests/integration_test.rs b/crates/whaleflow/tests/integration_test.rs index 123c72213..f190b7431 100644 --- a/crates/whaleflow/tests/integration_test.rs +++ b/crates/whaleflow/tests/integration_test.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; use codewhale_whaleflow::{ AgentResult, AgentSpawner, FailurePolicy, IsolationMode, Phase, Scheduler, SpawnError, Task, - TaskMode, WorkflowConfig, + TaskMode, TaskStatus, WorkflowConfig, WorkflowStatus, }; /// A controllable mock spawner for integration testing. @@ -189,7 +189,7 @@ async fn full_workflow_three_phases() { // Verify overall status. assert_eq!( result.status, - codewhale_whaleflow::WorkflowStatus::Completed + WorkflowStatus::Completed ); assert_eq!(result.counts.total, 5); assert_eq!(result.counts.completed, 5); @@ -243,7 +243,7 @@ async fn workflow_with_failure_skip_continue() { // Skip-continue means the workflow status is Partial, not Aborted. assert_eq!( result.status, - codewhale_whaleflow::WorkflowStatus::Partial + WorkflowStatus::Partial ); } @@ -284,3 +284,113 @@ async fn workflow_json_roundtrip() { assert!(result_json.contains("Quick audit")); assert!(result_json.contains("completed")); } + +#[tokio::test] +async fn abort_policy_in_parallel_phase_stops_remaining_tasks() { + // Phase with two parallel tasks and Abort policy. One task fails — + // the other should be marked skipped, and the workflow status should + // be Aborted. + let config = WorkflowConfig { + goal: "abort test".into(), + max_concurrent: 4, + phases: vec![Phase { + name: "dangerous".into(), + depends_on: vec![], + parallel: true, + on_failure: FailurePolicy::Abort, + tasks: vec![ + make_task("ok", "good task"), + make_task("fail", "bad task"), + make_task("never-runs", "should be skipped"), + ], + }], + }; + + let mut mock = MockSpawner::new(); + mock.set("ok", Ok(make_result("ok", "all good", &[]))); + mock.set( + "fail", + Err(SpawnError::SpawnFailed("boom".into())), + ); + + let spawner = Arc::new(mock); + let mut scheduler = Scheduler::new(config, spawner); + let result = scheduler.run().await; + + // Workflow should be aborted (phase had Abort policy + failure). + assert_eq!(result.status, WorkflowStatus::Aborted); + assert_eq!(result.counts.total, 3); + assert!(result.counts.completed >= 1); // "ok" completed + assert_eq!(result.counts.failed, 1); // "fail" failed + assert!(result.counts.skipped >= 1); // "never-runs" skipped + // Phase 1 should be the only phase. + assert_eq!(result.phases.len(), 1); + let tasks = &result.phases[0].tasks; + let never_runs = tasks.iter().find(|t| t.id == "never-runs").unwrap(); + assert_eq!(never_runs.status, TaskStatus::Skipped); +} + +#[tokio::test] +async fn abort_policy_stops_subsequent_phases() { + // Two phases: phase 1 with Abort + a failing task → phase 2 must not run. + let config = WorkflowConfig { + goal: "phase abort test".into(), + max_concurrent: 4, + phases: vec![ + Phase { + name: "first".into(), + depends_on: vec![], + parallel: false, + on_failure: FailurePolicy::Abort, + tasks: vec![make_task("f1", "failing task")], + }, + Phase { + name: "second".into(), + depends_on: vec!["first".into()], + parallel: false, + on_failure: FailurePolicy::SkipContinue, + tasks: vec![make_task("s1", "should not run")], + }, + ], + }; + + let mut mock = MockSpawner::new(); + mock.set( + "f1", + Err(SpawnError::SpawnFailed("fail".into())), + ); + + let spawner = Arc::new(mock); + let mut scheduler = Scheduler::new(config, spawner); + let result = scheduler.run().await; + + assert_eq!(result.status, WorkflowStatus::Aborted); + // Phase 2 should not appear in results. + assert_eq!(result.phases.len(), 1); + assert_eq!(result.phases[0].name, "first"); +} + +#[test] +fn timeout_secs_field_is_deserialized_correctly() { + let json = r#"{ + "goal": "timeout test", + "phases": [ + { + "name": "slow", + "tasks": [ + { + "id": "t1", + "prompt": "do work", + "timeout_secs": 30, + "max_steps": 10 + } + ] + } + ] + }"#; + + let config: WorkflowConfig = serde_json::from_str(json).unwrap(); + let task = &config.phases[0].tasks[0]; + assert_eq!(task.timeout_secs, Some(30)); + assert_eq!(task.max_steps, Some(10)); +} From 574a6065d5299b3478076f1decb287508a0f98ee Mon Sep 17 00:00:00 2001 From: AdityaVG13 Date: Mon, 1 Jun 2026 02:56:29 -0400 Subject: [PATCH 4/5] fix(whaleflow): validate missing file_scope in parallel ReadWrite tasks --- crates/whaleflow/src/config.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/crates/whaleflow/src/config.rs b/crates/whaleflow/src/config.rs index df9c13570..2cc1aceee 100644 --- a/crates/whaleflow/src/config.rs +++ b/crates/whaleflow/src/config.rs @@ -224,6 +224,26 @@ impl WorkflowConfig { } } + // ReadWrite tasks in parallel phases must declare file_scope + // (unless they use worktree isolation). + for phase in &self.phases { + if !phase.parallel || phase.tasks.len() < 2 { + continue; + } + for task in &phase.tasks { + if task.mode == TaskMode::ReadWrite + && task.isolation != IsolationMode::Worktree + && task.file_scope.is_empty() + { + errors.push(format!( + "task '{}' is ReadWrite with no file_scope in parallel phase '{}'. \ + Add file_scope, use isolation 'worktree', or make the phase sequential.", + task.id, phase.name + )); + } + } + } + // Detect cycles in phase dependencies. if let Some(cycle) = detect_cycle(&self.phases) { errors.push(format!( From 069b7db9c721ff8cc21b83222086af3f7217993f Mon Sep 17 00:00:00 2001 From: AdityaVG13 Date: Mon, 1 Jun 2026 02:57:05 -0400 Subject: [PATCH 5/5] fix(whaleflow): improve scopes_overlap with path-boundary matching --- crates/whaleflow/src/config.rs | 39 ++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/crates/whaleflow/src/config.rs b/crates/whaleflow/src/config.rs index 2cc1aceee..9d248c6bc 100644 --- a/crates/whaleflow/src/config.rs +++ b/crates/whaleflow/src/config.rs @@ -353,27 +353,44 @@ impl Conflict { /// Check if two sets of file scope patterns overlap. /// -/// Uses prefix matching: strips glob suffixes (`/**`, `/*`) and checks -/// if one prefix starts with the other. Simple but effective for the -/// typical patterns the model generates (e.g. `src/auth/**`). +/// Strips glob wildcards (`**`, `*`) and then checks whether the +/// resulting directory prefixes overlap at a path-segment boundary. +/// `src/api/**` vs `src/apiv2/**` → no overlap (different segments). +/// `src/*/handler.rs` strips the `*` and checks prefix boundaries. fn scopes_overlap(a: &[String], b: &[String]) -> bool { if a.is_empty() || b.is_empty() { return false; } - fn strip_glob(s: &str) -> &str { - s.trim_end_matches('/') - .trim_end_matches("**") - .trim_end_matches('*') - .trim_end_matches('/') + /// Strip glob wildcards from the end of a pattern, stopping before + /// the last directory separator so path-boundary matching works. + fn normalize_pattern(s: &str) -> String { + // Remove trailing globs: /** → nothing, /* → nothing, /*.rs → nothing + let mut p = s.trim_end_matches('/').to_string(); + while p.ends_with("**") || p.ends_with('*') { + let trimmed = p.trim_end_matches("**").trim_end_matches('*'); + if trimmed.len() == p.len() { + break; + } + p = trimmed.to_string(); + } + // Ensure we end at a directory boundary for correct prefix matching. + p = p.trim_end_matches('/').to_string(); + p } - let a_prefixes: Vec<&str> = a.iter().map(|s| strip_glob(s)).collect(); - let b_prefixes: Vec<&str> = b.iter().map(|s| strip_glob(s)).collect(); + let a_prefixes: Vec = a.iter().map(|s| normalize_pattern(s)).collect(); + let b_prefixes: Vec = b.iter().map(|s| normalize_pattern(s)).collect(); for ap in &a_prefixes { for bp in &b_prefixes { - if ap.starts_with(bp) || bp.starts_with(ap) { + // Only flag as overlapping if one is a path-segment prefix of + // the other, i.e. `src/api` matches `src/api` or `src/api/...` + // but NOT `src/apiv2`. + if ap == bp + || ap.starts_with(&format!("{}/", bp)) + || bp.starts_with(&format!("{}/", ap)) + { return true; } }