From bb452c7427b80d600a2f77c529af58d8677c0fba Mon Sep 17 00:00:00 2001 From: auric Date: Fri, 19 Jun 2026 07:39:08 +0800 Subject: [PATCH] auto-implement #139: Codex overall-timeout (3600s) not enforced: implement codex hung 11h on quiet `scripts/run.sh test`, freezing the devloop pipeline (open_pr 0 success); suspect #135 flush blocks t --- crates/fkst-framework/src/sdk_codex.rs | 134 +++++++++++++++++++---- crates/fkst-framework/tests/sdk_codex.rs | 127 +++++++++++++++++++++ 2 files changed, 239 insertions(+), 22 deletions(-) diff --git a/crates/fkst-framework/src/sdk_codex.rs b/crates/fkst-framework/src/sdk_codex.rs index 652ab8a..3d125aa 100644 --- a/crates/fkst-framework/src/sdk_codex.rs +++ b/crates/fkst-framework/src/sdk_codex.rs @@ -48,6 +48,7 @@ static NEXT_PIPELINE_OWNER_ID: AtomicU64 = AtomicU64::new(1); static NEXT_CODEX_RUN_ID: AtomicU64 = AtomicU64::new(1); // both sync and async SDK calls share one immutable request description. +#[derive(Clone)] struct CodexRequest { run_id: String, prompt: String, @@ -348,9 +349,15 @@ pub(crate) fn register_with_runner( lua.create_function(move |lua, opts: Table| { crate::process_tree::ensure_supervisor_parent_alive()?; let request = codex_request_from_opts(opts, dept.as_deref().map(str::to_string)); - flush_raises_before_sync_codex(&raise_buf, raised_auth_token.as_deref()); - run_codex_request(request, &host_root, &config, runner.as_ref().as_ref())? - .into_lua_table(lua) + run_codex_sync_request( + request, + &host_root, + &config, + runner.as_ref().as_ref(), + raise_buf.clone(), + raised_auth_token.as_ref().clone(), + )? + .into_lua_table(lua) })? })?; lua.globals().set("spawn_codex", { @@ -404,6 +411,35 @@ fn flush_raises_before_sync_codex(raise_buf: &RaiseBuffer, raised_auth_token: Op } } +fn run_codex_sync_request( + request: CodexRequest, + host_root: &Path, + config: &ConfigContext, + runner: Option<&MockCommandState>, + raise_buf: RaiseBuffer, + raised_auth_token: Option, +) -> Result { + if let Some(token) = raised_auth_token { + let timeout = codex_sync_timeout(request.timeout_seconds); + let timeout_request = request.clone(); + let (tx, rx) = mpsc::channel(); + std::thread::spawn(move || { + flush_raises_before_sync_codex(&raise_buf, Some(&token)); + let _ = tx.send(()); + }); + match rx.recv_timeout(timeout) { + Ok(()) => {} + Err(RecvTimeoutError::Timeout) => return Ok(sync_timeout_result(&timeout_request)), + Err(RecvTimeoutError::Disconnected) => { + return Err(mlua::Error::external( + "spawn_codex_sync raise flush exited without result", + )) + } + } + } + run_codex_request(request, host_root, config, runner) +} + // the same input names an overall wall-clock timeout with a bounded default. fn codex_request_from_opts(opts: Table, runtime_dept: Option) -> CodexRequest { let prompt: String = opts.get("prompt").unwrap_or_default(); @@ -615,7 +651,10 @@ fn run_adoptable_codex_request( .write(true) .open(lock_path) .map_err(mlua::Error::external)?; - flock(lock_file.as_raw_fd(), FlockArg::LockExclusive).map_err(mlua::Error::external)?; + let deadline = codex_deadline(request.timeout_seconds); + if !try_lock_until_deadline(lock_file.as_raw_fd(), deadline).map_err(mlua::Error::external)? { + return Ok(adoption_timeout_result(&request, &paths)?); + } if let Some(result) = read_completed_adoption_result(&paths)? { drop(lock_file); return Ok(result); @@ -982,10 +1021,7 @@ fn wait_for_adoption_result( request: &CodexRequest, paths: &CodexAdoptionPaths, ) -> Result { - let deadline = Instant::now() - + overall_timeout(request.timeout_seconds) - .unwrap_or_else(|| Duration::from_secs(DEFAULT_CODEX_TIMEOUT_SECONDS as u64)) - + CODEX_ADOPTION_TIMEOUT_GRACE; + let deadline = codex_deadline(request.timeout_seconds) + CODEX_ADOPTION_TIMEOUT_GRACE; loop { if let Some(result) = read_completed_adoption_result(paths)? { return Ok(result); @@ -994,23 +1030,72 @@ fn wait_for_adoption_result( if let Some(record) = read_adoption_record(&paths.status)? { kill_adoption_worker_group(&record); } - let message = format!( - "adopted codex timed out waiting for result after {}s wall clock", - request.timeout_seconds - ); - return Ok(CodexResult::failure( - "timeout", - message, - read_optional_string(&paths.stdout).map_err(mlua::Error::external)?, - read_optional_string(&paths.stderr).map_err(mlua::Error::external)?, - 124, - request.log_path.to_string_lossy().into_owned(), - )); + return Ok(adoption_timeout_result(request, paths)?); } std::thread::sleep(CODEX_ADOPTION_POLL); } } +fn adoption_timeout_result( + request: &CodexRequest, + paths: &CodexAdoptionPaths, +) -> Result { + let message = format!( + "adopted codex timed out waiting for result after {}s wall clock", + request.timeout_seconds + ); + Ok(CodexResult::failure( + "timeout", + message, + read_optional_string(&paths.stdout).map_err(mlua::Error::external)?, + read_optional_string(&paths.stderr).map_err(mlua::Error::external)?, + 124, + request.log_path.to_string_lossy().into_owned(), + )) +} + +fn sync_timeout_result(request: &CodexRequest) -> CodexResult { + CodexResult::failure( + "timeout", + format!( + "spawn_codex_sync timed out after {}s wall clock", + request.timeout_seconds + ), + String::new(), + String::new(), + 124, + request.log_path.to_string_lossy().into_owned(), + ) +} + +fn codex_sync_timeout(timeout_seconds: i64) -> Duration { + overall_timeout(timeout_seconds) + .unwrap_or_else(|| Duration::from_secs(DEFAULT_CODEX_TIMEOUT_SECONDS as u64)) +} + +fn codex_deadline(timeout_seconds: i64) -> Instant { + Instant::now() + codex_sync_timeout(timeout_seconds) +} + +fn try_lock_until_deadline(fd: std::os::fd::RawFd, deadline: Instant) -> anyhow::Result { + loop { + match flock(fd, FlockArg::LockExclusiveNonblock) { + Ok(()) => return Ok(true), + Err(err) if lock_is_busy(err) => { + if Instant::now() >= deadline { + return Ok(false); + } + std::thread::sleep(CODEX_ADOPTION_POLL); + } + Err(err) => return Err(err.into()), + } + } +} + +fn lock_is_busy(err: nix::errno::Errno) -> bool { + err == nix::errno::Errno::EWOULDBLOCK || err == nix::errno::Errno::EAGAIN +} + #[cfg(unix)] fn kill_adoption_worker_group(record: &CodexAdoptionRecord) { if let Some(pid) = record.codex_pid { @@ -1769,6 +1854,7 @@ fn wait_codex_with_timeout_and_tail( } else { killed_for_timeout = Some(message); } + break Err("codex overall timeout reached".to_string()); } Err(RecvTimeoutError::Disconnected) => { break Err("codex wait channel closed before process exit".to_string()); @@ -1777,9 +1863,13 @@ fn wait_codex_with_timeout_and_tail( }; for reader in readers { - let _ = reader.join(); + if killed_for_timeout.is_none() { + let _ = reader.join(); + } + } + if killed_for_timeout.is_none() { + let _ = waiter.join(); } - let _ = waiter.join(); drain_codex_events(&rx, &mut stdout, &mut stderr); write_live_output_tail(output_tail_path.as_deref(), &stdout, &stderr); diff --git a/crates/fkst-framework/tests/sdk_codex.rs b/crates/fkst-framework/tests/sdk_codex.rs index c3fcb1d..3c1891d 100644 --- a/crates/fkst-framework/tests/sdk_codex.rs +++ b/crates/fkst-framework/tests/sdk_codex.rs @@ -100,6 +100,21 @@ fn recv_result(rx: &std::sync::mpsc::Receiver, label: &str) -> String { .unwrap_or_else(|err| panic!("timed out waiting for {label}: {err}")) } +#[cfg(unix)] +fn wait_for_process_exit(pid: i32, timeout: Duration) -> bool { + use nix::sys::signal::kill; + use nix::unistd::Pid; + + let deadline = std::time::Instant::now() + timeout; + while std::time::Instant::now() < deadline { + if kill(Pid::from_raw(pid), None).is_err() { + return true; + } + std::thread::sleep(Duration::from_millis(25)); + } + false +} + #[cfg(unix)] fn continuous_output_codex_script(stream_redirect: &'static str) -> String { format!( @@ -933,6 +948,118 @@ printf 'adopted-%s' "$count" ); } +#[cfg(unix)] +#[test] +fn spawn_codex_sync_adoption_quiet_child_returns_timeout_failure() { + let tmp = tempfile::tempdir().unwrap(); + let bin_dir = tmp.path().join("bin"); + let worktree = tmp.path().join("wt"); + let pid_fifo = tmp.path().join("pid.fifo"); + std::fs::create_dir_all(&worktree).unwrap(); + make_fifo(&pid_fifo); + install_codex_script( + &bin_dir, + r#"#!/bin/sh +cat >/dev/null +printf '%s' "$$" > "$PID_FIFO" +while :; do sleep 10; done +"#, + ); + + let mut sandbox = ProcessSandbox::new(); + sandbox.enter_cwd(tmp.path()).runtime_root(".fkst/runtime"); + sandbox.prepend_path(&bin_dir); + sandbox.set_env("PID_FIFO", pid_fifo.to_string_lossy().into_owned()); + sandbox.set_env(CODEX_WORKER_BIN_ENV, framework_bin()); + sandbox.runtime_log_dir(tmp.path().join("runtime")); + let (_lock, _guard) = sandbox.enter(); + + let pid_reader = std::thread::spawn({ + let pid_fifo = pid_fifo.clone(); + move || read_fifo(&pid_fifo).trim().parse::().unwrap() + }); + + let lua = Lua::new(); + register(&lua).unwrap(); + let spawn: mlua::Function = lua.globals().get("spawn_codex_sync").unwrap(); + let opts = lua_opts(&lua, "quiet adoption timeout"); + opts.set("worktree", worktree.to_string_lossy().into_owned()) + .unwrap(); + opts.set("dedup_key", "quiet-timeout").unwrap(); + opts.set("timeout", 1).unwrap(); + + let result: Table = spawn.call(opts).unwrap(); + let child_pid = pid_reader.join().unwrap(); + assert_eq!(result.get::("exit_code").unwrap(), 124); + assert_eq!(result.get::("error_kind").unwrap(), "timeout"); + assert!(result.get::("error").unwrap().contains("timed out")); + assert!(wait_for_process_exit(child_pid, Duration::from_secs(3))); +} + +#[cfg(unix)] +#[test] +fn spawn_codex_sync_adoption_lock_wait_is_bounded_by_timeout() { + let tmp = tempfile::tempdir().unwrap(); + let bin_dir = tmp.path().join("bin"); + let worktree = tmp.path().join("wt"); + std::fs::create_dir_all(&worktree).unwrap(); + install_codex_script( + &bin_dir, + r#"#!/bin/sh +cat >/dev/null +printf 'must-not-run' +"#, + ); + + let mut sandbox = ProcessSandbox::new(); + sandbox.enter_cwd(tmp.path()).runtime_root(".fkst/runtime"); + sandbox.prepend_path(&bin_dir); + sandbox.set_env(CODEX_WORKER_BIN_ENV, framework_bin()); + sandbox.runtime_log_dir(tmp.path().join("runtime")); + let (_lock, _guard) = sandbox.enter(); + + let lua = Lua::new(); + register(&lua).unwrap(); + let spawn: mlua::Function = lua.globals().get("spawn_codex_sync").unwrap(); + let first_opts = lua_opts(&lua, "stale lock"); + first_opts + .set("worktree", worktree.to_string_lossy().into_owned()) + .unwrap(); + first_opts.set("dedup_key", "lock-timeout").unwrap(); + first_opts.set("timeout", 30).unwrap(); + + let first: Table = spawn.call(first_opts).unwrap(); + assert_eq!(first.get::("stdout").unwrap(), "must-not-run"); + + let adoption_dir = tmp.path().join(".fkst/runtime/logs/codex-adoption"); + let adoption_key_dir = std::fs::read_dir(&adoption_dir) + .unwrap() + .map(|entry| entry.unwrap().path()) + .find(|path| path.is_dir()) + .expect("adoption key dir"); + let lock_file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(adoption_key_dir.join("run.lock")) + .unwrap(); + flock(lock_file.as_raw_fd(), FlockArg::LockExclusiveNonblock).unwrap(); + std::fs::remove_file(adoption_key_dir.join("status.json")).unwrap(); + + let second_opts = lua_opts(&lua, "stale lock"); + second_opts + .set("worktree", worktree.to_string_lossy().into_owned()) + .unwrap(); + second_opts.set("dedup_key", "lock-timeout").unwrap(); + second_opts.set("timeout", 1).unwrap(); + let started = std::time::Instant::now(); + let second: Table = spawn.call(second_opts).unwrap(); + + assert!(started.elapsed() < Duration::from_secs(5)); + assert_eq!(second.get::("exit_code").unwrap(), 124); + assert_eq!(second.get::("error_kind").unwrap(), "timeout"); + assert!(second.get::("error").unwrap().contains("timed out")); +} + #[cfg(unix)] #[test] fn codex_runs_reads_running_adoption_record_without_status_log() {