Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions crates/orbit-core/src/command/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use orbit_store::{global_executor_def_store, global_policy_def_store};
use crate::OrbitRuntime;
use crate::command::activity::seed_default_activities;
use crate::command::executor::seed_default_executors;
use crate::command::job::seed_default_jobs;
use crate::command::job::{refresh_stale_default_job_assets, seed_default_jobs};
use crate::command::policy::seed_default_policies;
use crate::command::skill::{
default_skill_ids, is_default_skill_file_for_root, seed_default_skills,
Expand Down Expand Up @@ -80,7 +80,8 @@ pub(crate) fn ensure_orbit_root_initialized(
..Default::default()
},
)?;
prepare_workspace_root_layout(workspace_root)?;
let workspace_layout = prepare_workspace_root_layout(workspace_root)?;
refresh_stale_default_job_assets(&workspace_layout.jobs_dir)?;
if RuntimeConfig::load_layered(global_root, global_root)?.scoring_enabled {
seed_scoreboard_templates(workspace_root)?;
}
Expand Down Expand Up @@ -205,9 +206,10 @@ pub fn init_workspace_at_root(
)?;
refreshed_skill_files = global_result.refreshed_skill_files;
created_skills_symlink = global_result.created_skills_symlink;
let refreshed_workspace_jobs = refresh_stale_default_job_assets(&layout.jobs_dir)?;
(
global_result.refreshed_default_activities,
global_result.refreshed_default_jobs,
global_result.refreshed_default_jobs + refreshed_workspace_jobs,
global_result.refreshed_default_executors,
global_result.refreshed_default_policies,
RuntimeConfig::load_layered(&global_root, &orbit_root)?.scoring_enabled,
Expand Down Expand Up @@ -721,6 +723,26 @@ mod tests {
assert_skill_link_exists(home.path().join(".claude").join("skills").join("orbit"));
}

#[test]
fn implicit_init_refreshes_stale_workspace_task_gate_pipeline() {
let global = tempdir().expect("global tempdir");
let workspace = tempdir().expect("workspace tempdir");
let global_root = global.path().join(".orbit");
let workspace_root = workspace.path().join(".orbit");
let workspace_job = workspace_root
.join("resources")
.join("jobs")
.join("task_gate_pipeline.yaml");
write_legacy_ttl_task_gate_pipeline(&workspace_job);

ensure_orbit_root_initialized(&global_root, &workspace_root)
.expect("implicit init refreshes workspace resources");

let contents = fs::read_to_string(&workspace_job).expect("read refreshed job");
assert!(contents.contains("release_reservation"));
assert!(contents.contains("target: activity:release_locks"));
}

#[test]
fn global_init_writes_role_settings_to_config_toml() {
let _guard = ENV_LOCK.lock().expect("lock env");
Expand Down Expand Up @@ -888,6 +910,40 @@ mod tests {
assert!(path.join("SKILL.md").exists());
}

fn write_legacy_ttl_task_gate_pipeline(path: &Path) {
let yaml = r#"schemaVersion: 2
kind: Job
metadata:
name: task_gate_pipeline
spec:
state: enabled
kind: workflow
max_active_runs: 10
steps:
- id: wait_for_window
loop:
max_iterations: 120
break_when: "{{ steps.reserve.output.reserved }} == true"
steps:
- id: reserve
target: activity:reserve_locks
- id: sleep_if_conflict
when: "{{ steps.reserve.output.reserved }} == false"
target: activity:sleep
- id: starvation_check
when: "{{ steps.reserve.output.reserved }} == false"
target: activity:gate_starvation_fail
- id: dispatch_pr
when: "{{ input.mode }} == pr && {{ steps.reserve.output.reserved }} == true"
target: activity:invoke_and_wait
- id: dispatch_local
when: "{{ input.mode }} == local && {{ steps.reserve.output.reserved }} == true"
target: activity:invoke_and_wait
"#;
fs::create_dir_all(path.parent().expect("job path has parent")).expect("create job dir");
fs::write(path, yaml).expect("write legacy task gate pipeline");
}

fn restore_home(previous_home: Option<std::ffi::OsString>) {
match previous_home {
Some(value) => unsafe {
Expand Down
210 changes: 205 additions & 5 deletions crates/orbit-core/src/command/job.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};

use orbit_common::types::{JobKind, JobRun, JobScheduleState, JobV2, OrbitError, load_job_asset};
use orbit_common::types::{
JobKind, JobRun, JobScheduleState, JobV2, JobV2Step, JobV2StepBody, OrbitError, load_job_asset,
};
use orbit_common::utility::fs::write_text_with_parent;
use serde_json::Value;

use crate::OrbitRuntime;

const TASK_GATE_PIPELINE_JOB: &str = "task_gate_pipeline";
const TASK_GATE_PIPELINE_FILE: &str = "task_gate_pipeline.yaml";

/// Shippable default workflow assets, seeded under
/// `<orbit_root>/resources/jobs/<name>.yaml` on `orbit init`. The five
/// entries here are the admission-controlled task shipment workflows
Expand Down Expand Up @@ -329,8 +334,10 @@ fn find_v2_job_asset_in_dir_inner(
/// copied out on `orbit init` so the job loader can discover it without
/// depending on a git checkout of this repo.
///
/// When `overwrite` is false, existing files are preserved — users who've
/// edited a previously-seeded workflow won't lose their changes on re-init.
/// When `overwrite` is false, existing files are preserved unless they match a
/// known stale default that must migrate to keep the shipped workflow contract.
/// Custom task-gate overrides that violate the release contract are reported as
/// stale rather than silently kept.
pub(crate) fn seed_default_jobs(jobs_dir: &Path, overwrite: bool) -> Result<usize, OrbitError> {
let mut count = 0usize;
for (name, content) in DEFAULT_JOB_FILES {
Expand All @@ -341,20 +348,130 @@ pub(crate) fn seed_default_jobs(jobs_dir: &Path, overwrite: bool) -> Result<usiz
write_text_with_parent(&path, content)?;
count += 1;
}
count += refresh_stale_default_job_assets(jobs_dir)?;
Ok(count)
}

/// Refresh known stale job defaults that may live in a resource directory from
/// older Orbit versions. This is intentionally narrow: custom overrides that
/// violate the task-gate reservation-release contract are reported as stale
/// instead of being silently rewritten.
pub(crate) fn refresh_stale_default_job_assets(jobs_dir: &Path) -> Result<usize, OrbitError> {
refresh_stale_task_gate_pipeline_asset(&jobs_dir.join(TASK_GATE_PIPELINE_FILE))
}

fn refresh_stale_task_gate_pipeline_asset(path: &Path) -> Result<usize, OrbitError> {
if !path.exists() {
return Ok(0);
}
if !path.is_file() {
return Err(OrbitError::InvalidInput(format!(
"expected task gate pipeline asset to be a file: {}",
path.display()
)));
}

let yaml = std::fs::read_to_string(path).map_err(|err| {
OrbitError::InvalidInput(format!("read stale-check asset {}: {err}", path.display()))
})?;
let asset = load_job_asset(&yaml).map_err(|err| {
OrbitError::InvalidInput(format!("parse stale-check asset {}: {err}", path.display()))
})?;
if asset.name != TASK_GATE_PIPELINE_JOB {
return Ok(0);
}
if task_gate_pipeline_releases_reservation(&asset.spec) {
return Ok(0);
}

if is_legacy_ttl_only_task_gate_pipeline(&asset.spec) {
let canonical = default_job_yaml(TASK_GATE_PIPELINE_JOB).ok_or_else(|| {
OrbitError::InvalidInput("missing canonical task_gate_pipeline asset".to_string())
})?;
write_text_with_parent(path, canonical)?;
return Ok(1);
}

Err(OrbitError::InvalidInput(format!(
"stale task_gate_pipeline override at {} keeps reservations until TTL; update it from the canonical asset or remove the override so the release_locks step is used",
path.display()
)))
}

fn default_job_yaml(job_name: &str) -> Option<&'static str> {
DEFAULT_JOB_FILES
.iter()
.find_map(|(name, yaml)| (*name == job_name).then_some(*yaml))
}

fn task_gate_pipeline_releases_reservation(job: &JobV2) -> bool {
let mut saw_child_wait = false;
for step in &job.steps {
if step_targets_activity(step, "invoke_and_wait") {
saw_child_wait = true;
}
if saw_child_wait && step_targets_activity(step, "release_locks") {
return true;
}
}
false
}

fn is_legacy_ttl_only_task_gate_pipeline(job: &JobV2) -> bool {
let root_step_ids = job
.steps
.iter()
.map(|step| step.id.as_str())
.collect::<Vec<_>>();
root_step_ids
== [
"wait_for_window",
"starvation_check",
"dispatch_pr",
"dispatch_local",
]
&& job
.steps
.iter()
.any(|step| step_targets_activity(step, "invoke_and_wait"))
}

fn step_targets_activity(step: &JobV2Step, activity_name: &str) -> bool {
match &step.body {
JobV2StepBody::TargetRef(target) => {
target.target.as_str().strip_prefix("activity:") == Some(activity_name)
}
JobV2StepBody::Target(target) => match &target.spec {
orbit_common::types::ActivityV2Spec::Deterministic(spec) => {
spec.action == activity_name
}
_ => false,
},
JobV2StepBody::Parallel { parallel } => parallel
.branches
.iter()
.any(|child| step_targets_activity(child, activity_name)),
JobV2StepBody::FanOut { fan_out, .. } => {
step_targets_activity(&fan_out.worker, activity_name)
}
JobV2StepBody::Loop { loop_ } => loop_
.steps
.iter()
.any(|child| step_targets_activity(child, activity_name)),
}
}

#[cfg(test)]
mod tests {
use super::*;

use orbit_common::types::activity_job::{V2ActivityCatalog, resolve_job_target_refs};
use orbit_common::types::{ActivityV2Spec, JobV2Step, JobV2StepBody, load_activity_asset};
use orbit_common::types::{ActivityV2Spec, load_activity_asset};
use serde_json::Value;
use std::collections::BTreeSet;
use tempfile::tempdir;

use crate::command::activity::DEFAULT_ACTIVITY_FILES;
use crate::command::activity::{DEFAULT_ACTIVITY_FILES, seed_default_activities};

fn test_runtime() -> (tempfile::TempDir, OrbitRuntime, PathBuf, PathBuf) {
let root = tempdir().expect("create tempdir");
Expand Down Expand Up @@ -391,6 +508,41 @@ spec:
std::fs::write(path, yaml).expect("write job yaml");
}

fn write_legacy_ttl_task_gate_pipeline(path: &Path) {
let yaml = r#"schemaVersion: 2
kind: Job
metadata:
name: task_gate_pipeline
spec:
state: enabled
kind: workflow
max_active_runs: 10
steps:
- id: wait_for_window
loop:
max_iterations: 120
break_when: "{{ steps.reserve.output.reserved }} == true"
steps:
- id: reserve
target: activity:reserve_locks
- id: sleep_if_conflict
when: "{{ steps.reserve.output.reserved }} == false"
target: activity:sleep
- id: starvation_check
when: "{{ steps.reserve.output.reserved }} == false"
target: activity:gate_starvation_fail
- id: dispatch_pr
when: "{{ input.mode }} == pr && {{ steps.reserve.output.reserved }} == true"
target: activity:invoke_and_wait
- id: dispatch_local
when: "{{ input.mode }} == local && {{ steps.reserve.output.reserved }} == true"
target: activity:invoke_and_wait
"#;
std::fs::create_dir_all(path.parent().expect("job path has parent"))
.expect("create job dir");
std::fs::write(path, yaml).expect("write legacy task gate pipeline");
}

fn default_activity_catalog() -> V2ActivityCatalog {
let mut catalog = V2ActivityCatalog::new();
for (name, yaml) in DEFAULT_ACTIVITY_FILES {
Expand Down Expand Up @@ -818,6 +970,54 @@ spec:
assert_eq!(spec.max_active_runs, 7);
}

#[test]
fn stale_workspace_task_gate_pipeline_refreshes_to_release_behavior() {
let (_root, runtime, global_root, workspace_root) = test_runtime();
seed_default_jobs(&global_root.join("resources/jobs"), true).expect("seed default jobs");
seed_default_activities(&global_root.join("resources/activities"), true)
.expect("seed default activities");
let workspace_job = workspace_root.join("resources/jobs/task_gate_pipeline.yaml");
write_legacy_ttl_task_gate_pipeline(&workspace_job);

let refreshed = refresh_stale_default_job_assets(&workspace_root.join("resources/jobs"))
.expect("refresh stale workspace job");
assert_eq!(refreshed, 1);

let entry = runtime
.show_job_catalog_entry("task_gate_pipeline")
.expect("catalog entry");
assert_eq!(entry.path, workspace_job);
assert!(
task_gate_pipeline_releases_reservation(&entry.spec),
"refreshed task_gate_pipeline must release reservations explicitly"
);

let catalog = runtime.v2_activity_catalog().expect("activity catalog");
let mut resolved = entry.spec.clone();
resolve_job_target_refs(&mut resolved, &catalog)
.expect("release_locks resolves from the active catalog");
}

#[test]
fn custom_stale_task_gate_pipeline_override_is_reported() {
let root = tempdir().expect("create tempdir");
let jobs_dir = root.path().join("resources/jobs");
write_job(
&jobs_dir.join("task_gate_pipeline.yaml"),
"task_gate_pipeline",
"custom_gate_without_release",
1,
);

let err = refresh_stale_default_job_assets(&jobs_dir)
.expect_err("custom stale override should be reported");
assert!(
err.to_string()
.contains("stale task_gate_pipeline override"),
"{err}"
);
}

#[test]
fn duplicate_jobs_within_one_catalog_directory_remain_invalid() {
let (_root, runtime, _global_root, workspace_root) = test_runtime();
Expand Down
3 changes: 3 additions & 0 deletions docs/design/activity-job/2_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ The seeded `list_backlog_tasks` deterministic activity starts `task_auto_pipelin

`task_gate_pipeline` reserves a bundle's context files before it dispatches `task_pr_pipeline` or `task_local_pipeline` through `invoke_and_wait`. The reservation remains active for the whole child run, so another gate run with overlapping selectors continues to receive a `reservation` conflict while the child is running. After `invoke_and_wait` returns a terminal child-run status (`succeeded`, `failed`, or `cancelled`), the seeded deterministic `release_locks` activity calls `orbit.task.locks.release` with the recorded reservation id. A wait-side `timeout` does not release the reservation because the child run may still be active; the original TTL remains the abandoned/crashed-run cleanup path. This lifecycle was tightened in [T20260430-26].

Default seeding treats the embedded `crates/orbit-core/assets/jobs/task_gate_pipeline.yaml` as canonical for the reservation-release contract. After [T20260505-3], implicit initialization and default job seeding refresh the known legacy TTL-only `task_gate_pipeline` asset when it appears in a global or workspace resource directory; a customized override that still lacks `release_locks` fails with a stale-override error instead of silently preserving queue-latency behavior.

---

## 5. Backend Resolution and Constraint Rules
Expand Down Expand Up @@ -454,5 +456,6 @@ Read-only history does not need the same dependencies as live execution. [T20260
- **[T20260430-30]** — Make `ship-auto` default text output human-readable while preserving JSON fields.
- **[T20260430-31]** — Require populated execution summaries before opening task PRs.
- **[T20260505-2]** — Admit accepted backlog friction reports in automatic backlog listing.
- **[T20260505-3]** — Refresh legacy TTL-only task-gate resource assets or report stale custom overrides.

> Resolve any task above with `orbit task show <ID>` or `git log --grep=<ID>`.
Loading
Loading