From 7ad990a2876dd7d8b04bc76f95308c8f7c4d7457 Mon Sep 17 00:00:00 2001 From: auric Date: Fri, 19 Jun 2026 06:00:01 +0800 Subject: [PATCH] =?UTF-8?q?auto-implement=20#135:=20Flush=20the=20raise=20?= =?UTF-8?q?buffer=20before=20a=20department=20blocks=20in=20spawn=5Fcodex?= =?UTF-8?q?=5Fsync=20(raises=20are=20invisible=20for=20the=20entire=20long?= =?UTF-8?q?=20codex=20run=20=E2=86=92=20false-terminal=20+=20double-spawn)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/fkst-framework/src/main.rs | 1 + crates/fkst-framework/src/mlua_init.rs | 21 ++- crates/fkst-framework/src/raise.rs | 29 +++ crates/fkst-framework/src/sdk_codex.rs | 25 ++- crates/fkst-framework/src/self_test.rs | 1 + .../fkst-framework/src/supervise/consumer.rs | 173 +++++++++++++++--- crates/fkst-framework/src/test_runner.rs | 2 + crates/fkst-framework/tests/sdk_codex.rs | 9 +- .../tests/release_probe_witness.rs | 5 +- 9 files changed, 229 insertions(+), 37 deletions(-) diff --git a/crates/fkst-framework/src/main.rs b/crates/fkst-framework/src/main.rs index 0d247f6..b69104c 100644 --- a/crates/fkst-framework/src/main.rs +++ b/crates/fkst-framework/src/main.rs @@ -473,6 +473,7 @@ fn run_pipeline( RaiseAuthority::new(declared_produces), Some(roots.clone()), graph_json_authorized, + raised_auth_token.clone(), )?; let exit_code = match mlua_init::run_dept_with_require_roots( diff --git a/crates/fkst-framework/src/mlua_init.rs b/crates/fkst-framework/src/mlua_init.rs index e3d53dc..bff8439 100644 --- a/crates/fkst-framework/src/mlua_init.rs +++ b/crates/fkst-framework/src/mlua_init.rs @@ -30,6 +30,7 @@ pub fn register_framework_sdk( raise_authority: RaiseAuthority, graph_roots: Option, graph_json_authorized: bool, + raised_auth_token: Option, ) -> mlua::Result<()> { let config = ConfigContext::from_host_root(host_root).map_err(mlua::Error::external)?; crate::rate_pool::RatePoolRegistry::from_config(&config).map_err(mlua::Error::external)?; @@ -43,7 +44,14 @@ pub fn register_framework_sdk( crate::sdk_git::register(lua, host_root, config.clone())?; crate::sdk_mark::register(lua, host_root)?; crate::sdk_cache::register(lua, host_root)?; - crate::sdk_codex::register(lua, host_root, config, dept)?; + crate::sdk_codex::register( + lua, + host_root, + config, + dept, + raise_buf.clone(), + raised_auth_token, + )?; crate::raise::register(lua, raise_buf, resolver, owner_namespace, raise_authority)?; Ok(()) } @@ -60,6 +68,7 @@ pub(crate) fn register_framework_sdk_with_runner( runner: Option, graph_roots: Option, graph_json_authorized: bool, + raised_auth_token: Option, ) -> mlua::Result<()> { let config = ConfigContext::from_host_root(host_root).map_err(mlua::Error::external)?; crate::rate_pool::RatePoolRegistry::from_config(&config).map_err(mlua::Error::external)?; @@ -73,7 +82,15 @@ pub(crate) fn register_framework_sdk_with_runner( crate::sdk_git::register_with_runner(lua, host_root, config.clone(), runner.clone())?; crate::sdk_mark::register(lua, host_root)?; crate::sdk_cache::register(lua, host_root)?; - crate::sdk_codex::register_with_runner(lua, host_root, config, dept, runner)?; + crate::sdk_codex::register_with_runner( + lua, + host_root, + config, + dept, + runner, + raise_buf.clone(), + raised_auth_token, + )?; crate::raise::register(lua, raise_buf, resolver, owner_namespace, raise_authority)?; Ok(()) } diff --git a/crates/fkst-framework/src/raise.rs b/crates/fkst-framework/src/raise.rs index ffa01e8..2d2cf92 100644 --- a/crates/fkst-framework/src/raise.rs +++ b/crates/fkst-framework/src/raise.rs @@ -9,6 +9,7 @@ use mlua::{Lua, LuaSerdeExt, Result}; use serde::Serialize; use serde_json::Value as JsonValue; use std::collections::BTreeSet; +use std::io::Write; use std::sync::{Arc, Mutex}; use crate::path_resolver::NameResolver; @@ -64,6 +65,15 @@ impl RaiseBuffer { pub fn encoded_frame(&self) -> Option { let entries = self.0.lock().unwrap().clone(); + Self::encode_entries(entries) + } + + pub(crate) fn drain_encoded_frame(&self) -> Option { + let entries = self.0.lock().unwrap().drain(..).collect(); + Self::encode_entries(entries) + } + + fn encode_entries(entries: Vec) -> Option { if entries.is_empty() { return None; } @@ -74,12 +84,21 @@ impl RaiseBuffer { pub fn emit_stdout(&self) { if let Some(b64) = self.encoded_frame() { println!("RAISED: {}", b64); + let _ = std::io::stdout().flush(); } } pub(crate) fn emit_authenticated_stdout(&self, token: &str) { if let Some(b64) = self.encoded_frame() { println!("RAISED-AUTH: {token} {b64}"); + let _ = std::io::stdout().flush(); + } + } + + pub(crate) fn drain_emit_authenticated_stdout(&self, token: &str) { + if let Some(b64) = self.drain_encoded_frame() { + println!("RAISED-AUTH: {token} {b64}"); + let _ = std::io::stdout().flush(); } } } @@ -331,4 +350,14 @@ mod tests { // Just verify no panic; output goes to real stdout in tests so we can't capture cleanly. buf.emit_stdout(); } + + #[test] + fn drain_encoded_frame_removes_buffered_entries() { + let buf = RaiseBuffer::new(); + buf.push("done".to_string(), serde_json::json!({"n": 1})); + + assert!(buf.drain_encoded_frame().is_some()); + assert!(buf.encoded_frame().is_none()); + assert!(buf.snapshot().is_empty()); + } } diff --git a/crates/fkst-framework/src/sdk_codex.rs b/crates/fkst-framework/src/sdk_codex.rs index e9f01ac..652ab8a 100644 --- a/crates/fkst-framework/src/sdk_codex.rs +++ b/crates/fkst-framework/src/sdk_codex.rs @@ -27,6 +27,7 @@ use crate::config_registry::{ConfigContext, ConfigKey}; use crate::external_command::{ format_command, MockCommandInvocation, MockCommandPlan, MockCommandResult, MockCommandState, }; +use crate::raise::RaiseBuffer; use crate::runtime_context; pub(crate) const CODEX_PERMIT_SLOTS_ENV: &str = "FKST_CODEX_PERMIT_SLOTS"; @@ -306,8 +307,18 @@ pub fn register( host_root: &Path, config: ConfigContext, dept: Option, + raise_buf: RaiseBuffer, + raised_auth_token: Option, ) -> Result<()> { - register_with_runner(lua, host_root, config, dept, None) + register_with_runner( + lua, + host_root, + config, + dept, + None, + raise_buf, + raised_auth_token, + ) } pub(crate) fn register_with_runner( @@ -316,6 +327,8 @@ pub(crate) fn register_with_runner( config: ConfigContext, dept: Option, runner: Option, + raise_buf: RaiseBuffer, + raised_auth_token: Option, ) -> Result<()> { let owner_id = NEXT_PIPELINE_OWNER_ID.fetch_add(1, Ordering::Relaxed); let next_task_id = Arc::new(AtomicU64::new(1)); @@ -323,15 +336,19 @@ pub(crate) fn register_with_runner( let config = Arc::new(config); let dept = Arc::new(dept); let runner = Arc::new(runner); + let raised_auth_token = Arc::new(raised_auth_token); lua.globals().set("spawn_codex_sync", { let host_root = Arc::clone(&host_root); let config = Arc::clone(&config); let dept = Arc::clone(&dept); let runner = Arc::clone(&runner); + let raise_buf = raise_buf.clone(); + let raised_auth_token = Arc::clone(&raised_auth_token); lua.create_function(move |lua, opts: Table| { crate::process_tree::ensure_supervisor_parent_alive()?; let request = codex_request_from_opts(opts, dept.as_deref().map(str::to_string)); + flush_raises_before_sync_codex(&raise_buf, raised_auth_token.as_deref()); run_codex_request(request, &host_root, &config, runner.as_ref().as_ref())? .into_lua_table(lua) })? @@ -381,6 +398,12 @@ pub(crate) fn register_with_runner( Ok(()) } +fn flush_raises_before_sync_codex(raise_buf: &RaiseBuffer, raised_auth_token: Option<&str>) { + if let Some(token) = raised_auth_token { + raise_buf.drain_emit_authenticated_stdout(token); + } +} + // the same input names an overall wall-clock timeout with a bounded default. fn codex_request_from_opts(opts: Table, runtime_dept: Option) -> CodexRequest { let prompt: String = opts.get("prompt").unwrap_or_default(); diff --git a/crates/fkst-framework/src/self_test.rs b/crates/fkst-framework/src/self_test.rs index 8b10d2e..1a4f0fb 100644 --- a/crates/fkst-framework/src/self_test.rs +++ b/crates/fkst-framework/src/self_test.rs @@ -142,6 +142,7 @@ fn check_sdk_registration(host_root: &std::path::Path) -> Result<()> { crate::raise::RaiseAuthority::new(Default::default()), None, false, + None, ) .context("register framework SDK")?; lua.load( diff --git a/crates/fkst-framework/src/supervise/consumer.rs b/crates/fkst-framework/src/supervise/consumer.rs index d5717c8..4d438b2 100644 --- a/crates/fkst-framework/src/supervise/consumer.rs +++ b/crates/fkst-framework/src/supervise/consumer.rs @@ -8,7 +8,7 @@ use super::delivery_types::{ use super::event_fanout::Fanout; use super::failure_fact::{dead_record_payload, delivery_failure_fact, FAILURE_FACT_QUEUE}; use super::journal::{optional_path, SupervisorJournal}; -use super::raised::parse_authenticated_raised; +use super::raised::{parse_authenticated_raised, parse_authenticated_raised_line}; use super::source_runner::parse_duration; use super::spawner::{spawn_framework_with_stdout_observer, SpawnResult, StdoutLineObserver}; use crate::path_resolver::PackageRoots; @@ -17,6 +17,7 @@ use fkst_common::config::{DepartmentDecl, RetryDecl}; use fkst_common::{Event, RuntimeKind}; use std::collections::BTreeMap; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -287,17 +288,46 @@ fn spawn_ephemeral( let dept_name = name.to_string(); let router = router.clone(); tokio::spawn(async move { - match spawn_and_report(&dept_name, &args, &journal).await { + let raised_observed = Arc::new(AtomicBool::new(false)); + let publish_error = Arc::new(std::sync::Mutex::new(None::)); + let stdout_observer = args.raised_auth_token.clone().map(|token| { + let router = router.clone(); + let raised_observed = Arc::clone(&raised_observed); + let publish_error = Arc::clone(&publish_error); + Arc::new(move |line: &str| -> anyhow::Result<()> { + let raised_events = parse_authenticated_raised_line(line, &token); + if !raised_events.is_empty() { + raised_observed.store(true, Ordering::Relaxed); + } + if let Err(err) = publish_ephemeral_raised_events(&router, raised_events) { + if let Ok(mut publish_error) = publish_error.lock() { + *publish_error = Some(err.to_string()); + } + } + Ok(()) + }) as StdoutLineObserver + }); + match spawn_and_report_with_stdout_observer(&dept_name, &args, stdout_observer, &journal) + .await + { Ok(result) => { if result.exit_code != 0 { return; } - if let Err(err) = publish_ephemeral_raised( - &router, - &result.stdout, - args.raised_auth_token.as_deref(), - ) { - error!(dept = %dept_name, error = %err, "publish raised failed"); + if let Ok(mut publish_error) = publish_error.lock() { + if let Some(err) = publish_error.take() { + error!(dept = %dept_name, error = %err, "publish raised failed"); + return; + } + } + if !raised_observed.load(Ordering::Relaxed) { + if let Err(err) = publish_ephemeral_raised( + &router, + &result.stdout, + args.raised_auth_token.as_deref(), + ) { + error!(dept = %dept_name, error = %err, "publish raised failed"); + } } } Err(err) => { @@ -419,17 +449,71 @@ async fn run_durable_record( args: SpawnArgs, journal: &SupervisorJournal, ) -> CompletedDelivery { - let result = spawn_and_report(dept_name, &args, journal).await; + let next_ordinal = Arc::new(std::sync::Mutex::new(0_usize)); + let publish_error = Arc::new(std::sync::Mutex::new(None::)); + let stdout_observer = args.raised_auth_token.clone().map(|token| { + let router = router.clone(); + let record = record.clone(); + let next_ordinal = Arc::clone(&next_ordinal); + let publish_error = Arc::clone(&publish_error); + Arc::new(move |line: &str| -> anyhow::Result<()> { + let raised_events = parse_authenticated_raised_line(line, &token); + match next_ordinal.lock() { + Ok(mut next_ordinal) => { + if let Err(err) = + publish_raised_events(&router, raised_events, &record, &mut next_ordinal) + { + if let Ok(mut publish_error) = publish_error.lock() { + *publish_error = Some(err.to_string()); + } + } + } + Err(_) => { + if let Ok(mut publish_error) = publish_error.lock() { + *publish_error = Some("raised ordinal lock poisoned".to_string()); + } + } + } + Ok(()) + }) as StdoutLineObserver + }); + let result = + spawn_and_report_with_stdout_observer(dept_name, &args, stdout_observer, journal).await; let failure = match result { Ok(result) if result.exit_code == 0 => { - publish_raised( - &router, - &result.stdout, - args.raised_auth_token.as_deref(), - &record, - ) - .err() - .map(|err| DeliveryFailure::permanent(format!("raised publish error: {err}"))) + let publish_error_message = match publish_error.lock() { + Ok(mut publish_error) => publish_error.take(), + Err(_) => Some("raised publish error lock poisoned".to_string()), + }; + if let Some(err) = publish_error_message { + return CompletedDelivery { + record, + failure: Some(DeliveryFailure::permanent(format!( + "raised publish error: {err}" + ))), + }; + } + let next_ordinal = match next_ordinal.lock() { + Ok(next_ordinal) => next_ordinal, + Err(_) => { + return CompletedDelivery { + record, + failure: Some(DeliveryFailure::permanent("raised ordinal lock poisoned")), + } + } + }; + if *next_ordinal == 0 { + publish_raised( + &router, + &result.stdout, + args.raised_auth_token.as_deref(), + &record, + ) + } else { + Ok(()) + } + .err() + .map(|err| DeliveryFailure::permanent(format!("raised publish error: {err}"))) } Ok(result) => Some(DeliveryFailure::from_spawn_result(&result)), Err(err) => { @@ -791,12 +875,26 @@ fn publish_raised( let Some(raised_auth_token) = raised_auth_token else { return Ok(()); }; - for (ordinal, mut raised_ev) in parse_authenticated_raised(stdout, raised_auth_token) - .into_iter() - .enumerate() - { + let mut next_ordinal = 0; + publish_raised_events( + router, + parse_authenticated_raised(stdout, raised_auth_token), + parent, + &mut next_ordinal, + ) +} + +fn publish_raised_events( + router: &DeliveryRouter, + raised_events: Vec, + parent: &DeliveryRecord, + next_ordinal: &mut usize, +) -> anyhow::Result<()> { + for mut raised_ev in raised_events { reject_reserved_raised_queue(&raised_ev)?; raised_ev.ts = parent.observed_at_ms; + let ordinal = *next_ordinal; + *next_ordinal = next_ordinal.saturating_add(1); router.publish(PublishEnvelope { event: raised_ev, source: parent.source.clone(), @@ -818,7 +916,17 @@ fn publish_ephemeral_raised( let Some(raised_auth_token) = raised_auth_token else { return Ok(()); }; - for raised_ev in parse_authenticated_raised(stdout, raised_auth_token) { + publish_ephemeral_raised_events( + router, + parse_authenticated_raised(stdout, raised_auth_token), + ) +} + +fn publish_ephemeral_raised_events( + router: &DeliveryRouter, + raised_events: Vec, +) -> anyhow::Result<()> { + for raised_ev in raised_events { reject_reserved_raised_queue(&raised_ev)?; router.publish(PublishEnvelope { event: raised_ev, @@ -1466,11 +1574,12 @@ mod tests { } #[tokio::test] - async fn durable_authenticated_raised_publishes_after_child_exit() { + async fn durable_authenticated_raised_publishes_before_child_exit() { let temp = TempDir::new().unwrap(); let binary = temp.path().join("fkst-framework"); let lua = temp.path().join("dept.lua"); let log_dir = temp.path().join("logs"); + let release = temp.path().join("release"); fs::write(&lua, "return {}\n").unwrap(); let trusted_raised = base64::engine::general_purpose::URL_SAFE.encode( serde_json::to_vec(&serde_json::json!([ @@ -1481,7 +1590,8 @@ mod tests { write_executable( &binary, &format!( - "printf 'RAISED: forged\\n'; printf 'RAISED-AUTH: trusted-token {trusted_raised}\\n'; exit 0" + "printf 'RAISED: forged\\n'; printf 'RAISED-AUTH: trusted-token {trusted_raised}\\n'; while [ ! -f '{}' ]; do sleep 0.05; done; exit 0", + release.display() ), ); @@ -1534,7 +1644,7 @@ mod tests { process_groups: ProcessGroupRegistry::default(), }; - let done = tokio::spawn(async move { + let handle = tokio::spawn(async move { run_durable_record( "worker", &spawned_router, @@ -1543,17 +1653,22 @@ mod tests { &SupervisorJournal::disabled(), ) .await - }) - .await - .unwrap(); + }); let event = timeout(Duration::from_secs(1), rx.recv()) .await - .expect("authenticated raise should publish after child exit") + .expect("authenticated raise should publish before child exit") .expect("done subscription should remain open"); assert_eq!(event.queue, "done"); assert_eq!(event.payload, serde_json::json!({"n": 2})); + fs::write(&release, "").unwrap(); + let done = handle.await.unwrap(); assert!(done.failure.is_none()); + let no_duplicate = timeout(Duration::from_millis(100), rx.recv()).await; + assert!( + no_duplicate.is_err(), + "streamed RAISED frame must not publish again after child exit" + ); } #[tokio::test] diff --git a/crates/fkst-framework/src/test_runner.rs b/crates/fkst-framework/src/test_runner.rs index 8b92f27..e1c5417 100644 --- a/crates/fkst-framework/src/test_runner.rs +++ b/crates/fkst-framework/src/test_runner.rs @@ -57,6 +57,7 @@ pub(crate) fn run_tests( Some(mock_commands.clone()), Some(roots.clone()), false, + None, ) .with_context(|| format!("register SDK for {}", relpath))?; register_test_sdk( @@ -586,6 +587,7 @@ fn run_department( Some(mock_commands), Some(roots.clone()), graph_json_authorized, + None, )?; if let Some(coverage) = &coverage { coverage.install(&dept_lua)?; diff --git a/crates/fkst-framework/tests/sdk_codex.rs b/crates/fkst-framework/tests/sdk_codex.rs index 0a95da3..c3fcb1d 100644 --- a/crates/fkst-framework/tests/sdk_codex.rs +++ b/crates/fkst-framework/tests/sdk_codex.rs @@ -6,10 +6,14 @@ mod boundary_resource; mod config_registry; #[path = "../src/external_command.rs"] mod external_command; +#[path = "../src/path_resolver.rs"] +mod path_resolver; #[path = "../src/process_tree.rs"] mod process_tree; #[path = "../src/provenance.rs"] mod provenance; +#[path = "../src/raise.rs"] +mod raise; #[path = "../src/rate_pool.rs"] mod rate_pool; #[path = "../src/rate_shim.rs"] @@ -18,10 +22,13 @@ mod rate_shim; mod runtime_context; #[path = "../src/sdk_codex.rs"] mod sdk_codex; +#[path = "../src/sdk_json.rs"] +mod sdk_json; mod support; use mlua::{AnyUserData, Function, Lua, Table}; use nix::fcntl::{flock, FlockArg}; +use raise::RaiseBuffer; use sdk_codex::{ acquire_permit, ensure_pool, CodexResult, CodexTaskHandle, CODEX_PERMIT_SLOTS_ENV, }; @@ -42,7 +49,7 @@ fn register_with_dept(lua: &Lua, dept: Option) -> mlua::Result<()> { let host_root = std::env::current_dir().map_err(mlua::Error::external)?; let config = config_registry::ConfigContext::from_host_root(&host_root) .map_err(mlua::Error::external)?; - sdk_codex::register(lua, &host_root, config, dept) + sdk_codex::register(lua, &host_root, config, dept, RaiseBuffer::new(), None) } #[cfg(unix)] diff --git a/crates/fkst-supervisor/tests/release_probe_witness.rs b/crates/fkst-supervisor/tests/release_probe_witness.rs index 8e204b5..cc22296 100644 --- a/crates/fkst-supervisor/tests/release_probe_witness.rs +++ b/crates/fkst-supervisor/tests/release_probe_witness.rs @@ -478,10 +478,7 @@ fn supervisor_framework_completes_release_probe_raised_cycle() { child_log.contains("DEPT=release_probe\n"), "log={child_log}" ); - assert!( - child_log.contains("RAISED_AUTH=enabled"), - "log={child_log}" - ); + assert!(child_log.contains("RAISED_AUTH=enabled"), "log={child_log}"); assert!(child_log.contains("EXIT=0\n"), "log={child_log}"); }