Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 112 additions & 22 deletions crates/fkst-framework/src/sdk_codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ static NEXT_PIPELINE_OWNER_ID: AtomicU64 = AtomicU64::new(1);
static NEXT_CODEX_RUN_ID: AtomicU64 = AtomicU64::new(1);

// both sync and async SDK calls share one immutable request description.
#[derive(Clone)]
struct CodexRequest {
run_id: String,
prompt: String,
Expand Down Expand Up @@ -348,9 +349,15 @@ pub(crate) fn register_with_runner(
lua.create_function(move |lua, opts: Table| {
crate::process_tree::ensure_supervisor_parent_alive()?;
let request = codex_request_from_opts(opts, dept.as_deref().map(str::to_string));
flush_raises_before_sync_codex(&raise_buf, raised_auth_token.as_deref());
run_codex_request(request, &host_root, &config, runner.as_ref().as_ref())?
.into_lua_table(lua)
run_codex_sync_request(
request,
&host_root,
&config,
runner.as_ref().as_ref(),
raise_buf.clone(),
raised_auth_token.as_ref().clone(),
)?
.into_lua_table(lua)
})?
})?;
lua.globals().set("spawn_codex", {
Expand Down Expand Up @@ -404,6 +411,35 @@ fn flush_raises_before_sync_codex(raise_buf: &RaiseBuffer, raised_auth_token: Op
}
}

fn run_codex_sync_request(
request: CodexRequest,
host_root: &Path,
config: &ConfigContext,
runner: Option<&MockCommandState>,
raise_buf: RaiseBuffer,
raised_auth_token: Option<String>,
) -> Result<CodexResult> {
if let Some(token) = raised_auth_token {
let timeout = codex_sync_timeout(request.timeout_seconds);
let timeout_request = request.clone();
let (tx, rx) = mpsc::channel();
std::thread::spawn(move || {
flush_raises_before_sync_codex(&raise_buf, Some(&token));
let _ = tx.send(());
});
match rx.recv_timeout(timeout) {
Ok(()) => {}
Err(RecvTimeoutError::Timeout) => return Ok(sync_timeout_result(&timeout_request)),
Err(RecvTimeoutError::Disconnected) => {
return Err(mlua::Error::external(
"spawn_codex_sync raise flush exited without result",
))
}
}
}
run_codex_request(request, host_root, config, runner)
}

// the same input names an overall wall-clock timeout with a bounded default.
fn codex_request_from_opts(opts: Table, runtime_dept: Option<String>) -> CodexRequest {
let prompt: String = opts.get("prompt").unwrap_or_default();
Expand Down Expand Up @@ -615,7 +651,10 @@ fn run_adoptable_codex_request(
.write(true)
.open(lock_path)
.map_err(mlua::Error::external)?;
flock(lock_file.as_raw_fd(), FlockArg::LockExclusive).map_err(mlua::Error::external)?;
let deadline = codex_deadline(request.timeout_seconds);
if !try_lock_until_deadline(lock_file.as_raw_fd(), deadline).map_err(mlua::Error::external)? {
return Ok(adoption_timeout_result(&request, &paths)?);
}
if let Some(result) = read_completed_adoption_result(&paths)? {
drop(lock_file);
return Ok(result);
Expand Down Expand Up @@ -982,10 +1021,7 @@ fn wait_for_adoption_result(
request: &CodexRequest,
paths: &CodexAdoptionPaths,
) -> Result<CodexResult> {
let deadline = Instant::now()
+ overall_timeout(request.timeout_seconds)
.unwrap_or_else(|| Duration::from_secs(DEFAULT_CODEX_TIMEOUT_SECONDS as u64))
+ CODEX_ADOPTION_TIMEOUT_GRACE;
let deadline = codex_deadline(request.timeout_seconds) + CODEX_ADOPTION_TIMEOUT_GRACE;
loop {
if let Some(result) = read_completed_adoption_result(paths)? {
return Ok(result);
Expand All @@ -994,23 +1030,72 @@ fn wait_for_adoption_result(
if let Some(record) = read_adoption_record(&paths.status)? {
kill_adoption_worker_group(&record);
}
let message = format!(
"adopted codex timed out waiting for result after {}s wall clock",
request.timeout_seconds
);
return Ok(CodexResult::failure(
"timeout",
message,
read_optional_string(&paths.stdout).map_err(mlua::Error::external)?,
read_optional_string(&paths.stderr).map_err(mlua::Error::external)?,
124,
request.log_path.to_string_lossy().into_owned(),
));
return Ok(adoption_timeout_result(request, paths)?);
}
std::thread::sleep(CODEX_ADOPTION_POLL);
}
}

fn adoption_timeout_result(
request: &CodexRequest,
paths: &CodexAdoptionPaths,
) -> Result<CodexResult> {
let message = format!(
"adopted codex timed out waiting for result after {}s wall clock",
request.timeout_seconds
);
Ok(CodexResult::failure(
"timeout",
message,
read_optional_string(&paths.stdout).map_err(mlua::Error::external)?,
read_optional_string(&paths.stderr).map_err(mlua::Error::external)?,
124,
request.log_path.to_string_lossy().into_owned(),
))
}

fn sync_timeout_result(request: &CodexRequest) -> CodexResult {
CodexResult::failure(
"timeout",
format!(
"spawn_codex_sync timed out after {}s wall clock",
request.timeout_seconds
),
String::new(),
String::new(),
124,
request.log_path.to_string_lossy().into_owned(),
)
}

fn codex_sync_timeout(timeout_seconds: i64) -> Duration {
overall_timeout(timeout_seconds)
.unwrap_or_else(|| Duration::from_secs(DEFAULT_CODEX_TIMEOUT_SECONDS as u64))
}

fn codex_deadline(timeout_seconds: i64) -> Instant {
Instant::now() + codex_sync_timeout(timeout_seconds)
}

fn try_lock_until_deadline(fd: std::os::fd::RawFd, deadline: Instant) -> anyhow::Result<bool> {
loop {
match flock(fd, FlockArg::LockExclusiveNonblock) {
Ok(()) => return Ok(true),
Err(err) if lock_is_busy(err) => {
if Instant::now() >= deadline {
return Ok(false);
}
std::thread::sleep(CODEX_ADOPTION_POLL);
}
Err(err) => return Err(err.into()),
}
}
}

fn lock_is_busy(err: nix::errno::Errno) -> bool {
err == nix::errno::Errno::EWOULDBLOCK || err == nix::errno::Errno::EAGAIN
}

#[cfg(unix)]
fn kill_adoption_worker_group(record: &CodexAdoptionRecord) {
if let Some(pid) = record.codex_pid {
Expand Down Expand Up @@ -1769,6 +1854,7 @@ fn wait_codex_with_timeout_and_tail(
} else {
killed_for_timeout = Some(message);
}
break Err("codex overall timeout reached".to_string());
}
Err(RecvTimeoutError::Disconnected) => {
break Err("codex wait channel closed before process exit".to_string());
Expand All @@ -1777,9 +1863,13 @@ fn wait_codex_with_timeout_and_tail(
};

for reader in readers {
let _ = reader.join();
if killed_for_timeout.is_none() {
let _ = reader.join();
}
}
if killed_for_timeout.is_none() {
let _ = waiter.join();
}
let _ = waiter.join();
drain_codex_events(&rx, &mut stdout, &mut stderr);
write_live_output_tail(output_tail_path.as_deref(), &stdout, &stderr);

Expand Down
127 changes: 127 additions & 0 deletions crates/fkst-framework/tests/sdk_codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ fn recv_result(rx: &std::sync::mpsc::Receiver<String>, label: &str) -> String {
.unwrap_or_else(|err| panic!("timed out waiting for {label}: {err}"))
}

#[cfg(unix)]
fn wait_for_process_exit(pid: i32, timeout: Duration) -> bool {
use nix::sys::signal::kill;
use nix::unistd::Pid;

let deadline = std::time::Instant::now() + timeout;
while std::time::Instant::now() < deadline {
if kill(Pid::from_raw(pid), None).is_err() {
return true;
}
std::thread::sleep(Duration::from_millis(25));
}
false
}

#[cfg(unix)]
fn continuous_output_codex_script(stream_redirect: &'static str) -> String {
format!(
Expand Down Expand Up @@ -933,6 +948,118 @@ printf 'adopted-%s' "$count"
);
}

#[cfg(unix)]
#[test]
fn spawn_codex_sync_adoption_quiet_child_returns_timeout_failure() {
let tmp = tempfile::tempdir().unwrap();
let bin_dir = tmp.path().join("bin");
let worktree = tmp.path().join("wt");
let pid_fifo = tmp.path().join("pid.fifo");
std::fs::create_dir_all(&worktree).unwrap();
make_fifo(&pid_fifo);
install_codex_script(
&bin_dir,
r#"#!/bin/sh
cat >/dev/null
printf '%s' "$$" > "$PID_FIFO"
while :; do sleep 10; done
"#,
);

let mut sandbox = ProcessSandbox::new();
sandbox.enter_cwd(tmp.path()).runtime_root(".fkst/runtime");
sandbox.prepend_path(&bin_dir);
sandbox.set_env("PID_FIFO", pid_fifo.to_string_lossy().into_owned());
sandbox.set_env(CODEX_WORKER_BIN_ENV, framework_bin());
sandbox.runtime_log_dir(tmp.path().join("runtime"));
let (_lock, _guard) = sandbox.enter();

let pid_reader = std::thread::spawn({
let pid_fifo = pid_fifo.clone();
move || read_fifo(&pid_fifo).trim().parse::<i32>().unwrap()
});

let lua = Lua::new();
register(&lua).unwrap();
let spawn: mlua::Function = lua.globals().get("spawn_codex_sync").unwrap();
let opts = lua_opts(&lua, "quiet adoption timeout");
opts.set("worktree", worktree.to_string_lossy().into_owned())
.unwrap();
opts.set("dedup_key", "quiet-timeout").unwrap();
opts.set("timeout", 1).unwrap();

let result: Table = spawn.call(opts).unwrap();
let child_pid = pid_reader.join().unwrap();
assert_eq!(result.get::<i64>("exit_code").unwrap(), 124);
assert_eq!(result.get::<String>("error_kind").unwrap(), "timeout");
assert!(result.get::<String>("error").unwrap().contains("timed out"));
assert!(wait_for_process_exit(child_pid, Duration::from_secs(3)));
}

#[cfg(unix)]
#[test]
fn spawn_codex_sync_adoption_lock_wait_is_bounded_by_timeout() {
let tmp = tempfile::tempdir().unwrap();
let bin_dir = tmp.path().join("bin");
let worktree = tmp.path().join("wt");
std::fs::create_dir_all(&worktree).unwrap();
install_codex_script(
&bin_dir,
r#"#!/bin/sh
cat >/dev/null
printf 'must-not-run'
"#,
);

let mut sandbox = ProcessSandbox::new();
sandbox.enter_cwd(tmp.path()).runtime_root(".fkst/runtime");
sandbox.prepend_path(&bin_dir);
sandbox.set_env(CODEX_WORKER_BIN_ENV, framework_bin());
sandbox.runtime_log_dir(tmp.path().join("runtime"));
let (_lock, _guard) = sandbox.enter();

let lua = Lua::new();
register(&lua).unwrap();
let spawn: mlua::Function = lua.globals().get("spawn_codex_sync").unwrap();
let first_opts = lua_opts(&lua, "stale lock");
first_opts
.set("worktree", worktree.to_string_lossy().into_owned())
.unwrap();
first_opts.set("dedup_key", "lock-timeout").unwrap();
first_opts.set("timeout", 30).unwrap();

let first: Table = spawn.call(first_opts).unwrap();
assert_eq!(first.get::<String>("stdout").unwrap(), "must-not-run");

let adoption_dir = tmp.path().join(".fkst/runtime/logs/codex-adoption");
let adoption_key_dir = std::fs::read_dir(&adoption_dir)
.unwrap()
.map(|entry| entry.unwrap().path())
.find(|path| path.is_dir())
.expect("adoption key dir");
let lock_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(adoption_key_dir.join("run.lock"))
.unwrap();
flock(lock_file.as_raw_fd(), FlockArg::LockExclusiveNonblock).unwrap();
std::fs::remove_file(adoption_key_dir.join("status.json")).unwrap();

let second_opts = lua_opts(&lua, "stale lock");
second_opts
.set("worktree", worktree.to_string_lossy().into_owned())
.unwrap();
second_opts.set("dedup_key", "lock-timeout").unwrap();
second_opts.set("timeout", 1).unwrap();
let started = std::time::Instant::now();
let second: Table = spawn.call(second_opts).unwrap();

assert!(started.elapsed() < Duration::from_secs(5));
assert_eq!(second.get::<i64>("exit_code").unwrap(), 124);
assert_eq!(second.get::<String>("error_kind").unwrap(), "timeout");
assert!(second.get::<String>("error").unwrap().contains("timed out"));
}

#[cfg(unix)]
#[test]
fn codex_runs_reads_running_adoption_record_without_status_log() {
Expand Down
Loading