diff --git a/CHANGELOG.md b/CHANGELOG.md index ff6a99ad5..7a1c0e2d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 release-safe. The foundation now includes serializable branch, leaf, and control-node result records toward the #2668 TraceStore contract. Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction. +- Added a state-store v2 schema migration for WhaleFlow trace tables covering + workflow, branch, leaf, control-node, and teacher-candidate runs. The + migration creates persistence shape only; workflow execution and replay + remain deferred until the runtime semantics are safe (#2668). - Added an official VS Code extension Phase 0 scaffold with terminal launch, local runtime attach checks, status bar state, and a read-only Agent View preview backed by recent runtime thread summaries. This answers the VS Code diff --git a/crates/state/src/lib.rs b/crates/state/src/lib.rs index 8b75306be..2bf42ebb9 100644 --- a/crates/state/src/lib.rs +++ b/crates/state/src/lib.rs @@ -267,7 +267,7 @@ impl StateStore { fn init_schema(&self) -> Result<()> { let conn = self.conn()?; - let user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?; + let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?; if user_version == 0 { conn.execute_batch( r#" @@ -376,6 +376,104 @@ impl StateStore { "#, ) .context("failed to initialize thread schema")?; + user_version = 1; + } + if user_version < 2 { + conn.execute_batch( + r#" + BEGIN; + CREATE TABLE IF NOT EXISTS workflow_runs ( + id TEXT PRIMARY KEY, + workflow_id TEXT NOT NULL, + goal TEXT NOT NULL, + status TEXT NOT NULL, + input_hash TEXT, + started_at INTEGER NOT NULL, + completed_at INTEGER, + metadata_json TEXT NOT NULL DEFAULT '{}' + ); + CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at + ON workflow_runs(status, started_at DESC); + CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at + ON workflow_runs(workflow_id, started_at DESC); + + CREATE TABLE IF NOT EXISTS branch_runs ( + id TEXT PRIMARY KEY, + workflow_run_id TEXT NOT NULL, + branch_id TEXT NOT NULL, + node_id TEXT NOT NULL, + status TEXT NOT NULL, + started_at INTEGER NOT NULL, + completed_at INTEGER, + result_json TEXT NOT NULL DEFAULT '{}', + FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id + ON branch_runs(workflow_run_id); + CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id + ON branch_runs(branch_id); + + CREATE TABLE IF NOT EXISTS leaf_runs ( + id TEXT PRIMARY KEY, + workflow_run_id TEXT NOT NULL, + branch_run_id TEXT, + leaf_id TEXT NOT NULL, + task_id TEXT NOT NULL, + input_hash TEXT, + status TEXT NOT NULL, + output_json TEXT NOT NULL DEFAULT '{}', + artifacts_json TEXT NOT NULL DEFAULT '[]', + started_at INTEGER NOT NULL, + completed_at INTEGER, + FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE, + FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL + ); + CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id + ON leaf_runs(workflow_run_id); + CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup + ON leaf_runs(workflow_run_id, leaf_id, input_hash); + + CREATE TABLE IF NOT EXISTS control_node_runs ( + id TEXT PRIMARY KEY, + workflow_run_id TEXT NOT NULL, + node_id TEXT NOT NULL, + kind TEXT NOT NULL, + status TEXT NOT NULL, + selected_children_json TEXT NOT NULL DEFAULT '[]', + result_json TEXT NOT NULL DEFAULT '{}', + started_at INTEGER NOT NULL, + completed_at INTEGER, + FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id + ON control_node_runs(workflow_run_id); + CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id + ON control_node_runs(node_id); + + CREATE TABLE IF NOT EXISTS teacher_candidates ( + id TEXT PRIMARY KEY, + workflow_run_id TEXT NOT NULL, + control_node_run_id TEXT NOT NULL, + candidate_id TEXT NOT NULL, + branch_run_id TEXT, + score REAL, + passed INTEGER, + rationale_json TEXT NOT NULL DEFAULT '{}', + created_at INTEGER NOT NULL, + FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE, + FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE, + FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL + ); + CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id + ON teacher_candidates(workflow_run_id); + CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id + ON teacher_candidates(control_node_run_id); + + PRAGMA user_version = 2; + COMMIT; + "#, + ) + .context("failed to initialize workflow trace schema")?; } Ok(()) } diff --git a/crates/state/tests/parity_state.rs b/crates/state/tests/parity_state.rs index 2590b2a59..69481e080 100644 --- a/crates/state/tests/parity_state.rs +++ b/crates/state/tests/parity_state.rs @@ -12,6 +12,30 @@ fn temp_state_path(label: &str) -> PathBuf { )) } +fn assert_workflow_trace_schema(conn: &Connection) { + let user_version: u32 = conn + .query_row("PRAGMA user_version;", [], |row| row.get(0)) + .expect("read user_version"); + assert_eq!(user_version, 2); + + for table in [ + "workflow_runs", + "branch_runs", + "leaf_runs", + "control_node_runs", + "teacher_candidates", + ] { + let exists: bool = conn + .query_row( + "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)", + [table], + |row| row.get(0), + ) + .unwrap_or_else(|err| panic!("read sqlite_master for {table}: {err}")); + assert!(exists, "missing workflow trace table {table}"); + } +} + #[test] fn upsert_and_resume_thread_metadata() { let path = temp_state_path("upsert_resume"); @@ -157,6 +181,102 @@ fn init_schema_migration() { StateStore::open(Some(path.clone())).expect("open state store"); } +#[test] +fn fresh_schema_includes_workflow_trace_tables() { + let path = temp_state_path("fresh_schema_includes_workflow_trace_tables"); + + StateStore::open(Some(path.clone())).expect("open state store"); + + let conn = Connection::open(&path).expect("open state db"); + assert_workflow_trace_schema(&conn); +} + +#[test] +fn v1_schema_migrates_workflow_trace_tables() { + let path = temp_state_path("v1_schema_migrates_workflow_trace_tables"); + let conn = Connection::open(&path).expect("open state db"); + conn.execute_batch( + r#" + CREATE TABLE threads ( + id TEXT PRIMARY KEY, + rollout_path TEXT, + preview TEXT NOT NULL, + ephemeral INTEGER NOT NULL, + model_provider TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + status TEXT NOT NULL, + path TEXT, + cwd TEXT NOT NULL, + cli_version TEXT NOT NULL, + source TEXT NOT NULL, + title TEXT, + sandbox_policy TEXT, + approval_mode TEXT, + archived INTEGER NOT NULL DEFAULT 0, + archived_at INTEGER, + git_sha TEXT, + git_branch TEXT, + git_origin_url TEXT, + memory_mode TEXT, + current_leaf_id INTEGER + ); + CREATE TABLE messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + thread_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + item_json TEXT, + created_at INTEGER NOT NULL, + parent_entry_id INTEGER + ); + CREATE TABLE checkpoints ( + thread_id TEXT NOT NULL, + checkpoint_id TEXT NOT NULL, + state_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + PRIMARY KEY(thread_id, checkpoint_id) + ); + CREATE TABLE jobs ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + status TEXT NOT NULL, + progress INTEGER, + detail TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + CREATE TABLE thread_dynamic_tools ( + thread_id TEXT NOT NULL, + position INTEGER NOT NULL, + name TEXT NOT NULL, + description TEXT, + input_schema TEXT NOT NULL, + PRIMARY KEY (thread_id, position) + ); + INSERT INTO threads ( + id, preview, ephemeral, model_provider, created_at, updated_at, status, cwd, cli_version, source, archived + ) + VALUES ( + 'thread-test-1', 'hello', false, 'deepseek', 0, 0, 'running', '/tmp/project', '0.0.0-test', 'interactive', false + ); + PRAGMA user_version = 1; + "#, + ) + .expect("create v1 schema"); + drop(conn); + + let store = StateStore::open(Some(path.clone())).expect("open state store"); + let thread = store + .get_thread("thread-test-1") + .expect("read thread") + .expect("thread survives migration"); + assert_eq!(thread.preview, "hello"); + + let conn = Connection::open(&path).expect("open state db"); + assert_workflow_trace_schema(&conn); +} + #[test] fn init_schema_migration_same_second_messages() { let path = temp_state_path("init_schema_migration_same_second_messages"); diff --git a/crates/tui/CHANGELOG.md b/crates/tui/CHANGELOG.md index ff6a99ad5..7a1c0e2d0 100644 --- a/crates/tui/CHANGELOG.md +++ b/crates/tui/CHANGELOG.md @@ -40,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 release-safe. The foundation now includes serializable branch, leaf, and control-node result records toward the #2668 TraceStore contract. Thanks @AdityaVG13 for the WhaleFlow draft and cost-tracking direction. +- Added a state-store v2 schema migration for WhaleFlow trace tables covering + workflow, branch, leaf, control-node, and teacher-candidate runs. The + migration creates persistence shape only; workflow execution and replay + remain deferred until the runtime semantics are safe (#2668). - Added an official VS Code extension Phase 0 scaffold with terminal launch, local runtime attach checks, status bar state, and a read-only Agent View preview backed by recent runtime thread summaries. This answers the VS Code