diff --git a/crates/fkst-framework/src/main.rs b/crates/fkst-framework/src/main.rs index 5ecbe43..0d247f6 100644 --- a/crates/fkst-framework/src/main.rs +++ b/crates/fkst-framework/src/main.rs @@ -18,6 +18,7 @@ use anyhow::{Context, Result}; use host_conformance::HostConformanceOptions; use path_resolver::PackageRoots; use serde_json::Value as JsonValue; +use std::collections::BTreeSet; use std::path::{Path, PathBuf}; mod boundary_resource; @@ -48,6 +49,7 @@ mod sdk_log; mod sdk_mark; mod sdk_strings; mod self_test; +mod spec_queues; mod supervise; mod test_runner; @@ -56,7 +58,7 @@ mod test_env { pub(crate) static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); } -use raise::RaiseBuffer; +use raise::{RaiseAuthority, RaiseBuffer, RAISED_AUTH_TOKEN_ENV}; enum CliCommand { Run { @@ -442,6 +444,8 @@ fn run_pipeline( eprintln!("[framework] supervisor parent lost before pipeline start"); return Ok(125); } + let raised_auth_token = std::env::var(RAISED_AUTH_TOKEN_ENV).ok(); + std::env::remove_var(RAISED_AUTH_TOKEN_ENV); let lua = mlua_init::new_lua(); let raise_buf = RaiseBuffer::new(); let owner_root = roots @@ -452,6 +456,9 @@ fn run_pipeline( let require_roots = roots.require_roots_for_owner(owner_root); let graph_json_authorized = sdk_graph::department_authorized(&roots, owner_root, &lua_path).unwrap_or(false); + let declared_produces = + department_declared_resolved_produces(&roots, owner_root, &owner_namespace, &lua_path) + .with_context(|| format!("resolve raise authority for {}", lua_path.display()))?; mlua_init::register_framework_sdk( &lua, @@ -459,8 +466,11 @@ fn run_pipeline( roots.host_root(), owner_root, department_name_for_lua(&lua_path, owner_root, &owner_namespace), - roots.name_resolver(), + roots + .name_resolver() + .with_recorded_only_queues(declared_produces.clone()), owner_namespace.clone(), + RaiseAuthority::new(declared_produces), Some(roots.clone()), graph_json_authorized, )?; @@ -478,10 +488,36 @@ fn run_pipeline( } }; - raise_buf.emit_stdout(); + if let Some(token) = raised_auth_token { + raise_buf.emit_authenticated_stdout(&token); + } else { + raise_buf.emit_stdout(); + } Ok(exit_code) } +fn department_declared_resolved_produces( + roots: &PackageRoots, + owner_root: &Path, + owner_namespace: &str, + lua_path: &Path, +) -> Result> { + let lua_path = lua_path + .canonicalize() + .with_context(|| format!("canonicalize {}", lua_path.display()))?; + let require_roots = roots.require_roots_for_owner(owner_root); + let package_path = mlua_init::package_roots_path(require_roots.iter().map(PathBuf::as_path)); + spec_queues::declared_resolved_produces( + roots, + owner_namespace, + owner_root, + &lua_path, + &package_path, + &mlua_init::LuaChunkCache::default(), + ) + .map_err(anyhow::Error::from) +} + fn department_name_for_lua( lua_path: &Path, owner_root: &Path, diff --git a/crates/fkst-framework/src/mlua_init.rs b/crates/fkst-framework/src/mlua_init.rs index 6d4d258..e3d53dc 100644 --- a/crates/fkst-framework/src/mlua_init.rs +++ b/crates/fkst-framework/src/mlua_init.rs @@ -11,7 +11,7 @@ use std::time::SystemTime; use crate::config_registry::ConfigContext; use crate::external_command::MockCommandState; use crate::path_resolver::{package_root_path, NameResolver, PackageRoots}; -use crate::raise::RaiseBuffer; +use crate::raise::{RaiseAuthority, RaiseBuffer}; /// Create a Lua state with stdlib enabled. pub fn new_lua() -> Lua { @@ -27,6 +27,7 @@ pub fn register_framework_sdk( dept: Option, resolver: NameResolver, owner_namespace: String, + raise_authority: RaiseAuthority, graph_roots: Option, graph_json_authorized: bool, ) -> mlua::Result<()> { @@ -43,7 +44,7 @@ pub fn register_framework_sdk( crate::sdk_mark::register(lua, host_root)?; crate::sdk_cache::register(lua, host_root)?; crate::sdk_codex::register(lua, host_root, config, dept)?; - crate::raise::register(lua, raise_buf, resolver, owner_namespace)?; + crate::raise::register(lua, raise_buf, resolver, owner_namespace, raise_authority)?; Ok(()) } @@ -55,6 +56,7 @@ pub(crate) fn register_framework_sdk_with_runner( dept: Option, resolver: NameResolver, owner_namespace: String, + raise_authority: RaiseAuthority, runner: Option, graph_roots: Option, graph_json_authorized: bool, @@ -72,7 +74,7 @@ pub(crate) fn register_framework_sdk_with_runner( crate::sdk_mark::register(lua, host_root)?; crate::sdk_cache::register(lua, host_root)?; crate::sdk_codex::register_with_runner(lua, host_root, config, dept, runner)?; - crate::raise::register(lua, raise_buf, resolver, owner_namespace)?; + crate::raise::register(lua, raise_buf, resolver, owner_namespace, raise_authority)?; Ok(()) } diff --git a/crates/fkst-framework/src/raise.rs b/crates/fkst-framework/src/raise.rs index 0775a33..ffa01e8 100644 --- a/crates/fkst-framework/src/raise.rs +++ b/crates/fkst-framework/src/raise.rs @@ -1,16 +1,20 @@ -//! SDK: `raise(queue_name, payload)` — buffers in-process; emit RAISED stdout line on exit. +//! SDK: `raise(queue_name, payload)` — buffers in-process for trusted parent transport. //! //! Raise is best-effort, at-most-once, and derived-only. Durable intent goes through filesystem. -//! Emit format = `RAISED: `. +//! External direct `run` compatibility still emits `RAISED: ` when no supervise authentication token is supplied. use base64::Engine; use mlua::{Lua, LuaSerdeExt, Result}; use serde::Serialize; use serde_json::Value as JsonValue; +use std::collections::BTreeSet; use std::sync::{Arc, Mutex}; use crate::path_resolver::NameResolver; +pub(crate) const RAISED_AUTH_TOKEN_ENV: &str = "FKST_RAISED_AUTH_TOKEN"; + #[derive(Serialize, Debug, Clone)] struct RaisedEntry { queue: String, @@ -20,6 +24,26 @@ struct RaisedEntry { #[derive(Clone, Default)] pub struct RaiseBuffer(Arc>>); +#[derive(Clone, Debug)] +pub(crate) struct RaiseAuthority { + allowed_queues: BTreeSet, +} + +impl RaiseAuthority { + pub(crate) fn new(allowed_queues: BTreeSet) -> Self { + Self { allowed_queues } + } + + fn ensure_allowed(&self, queue: &str) -> Result<()> { + if self.allowed_queues.contains(queue) { + return Ok(()); + } + Err(mlua::Error::external(format!( + "raise queue `{queue}` is not declared in department M.spec.produces" + ))) + } +} + impl RaiseBuffer { pub fn new() -> Self { Self::default() @@ -38,14 +62,25 @@ impl RaiseBuffer { self.0.lock().unwrap().push(RaisedEntry { queue, payload }); } - pub fn emit_stdout(&self) { + pub fn encoded_frame(&self) -> Option { let entries = self.0.lock().unwrap().clone(); if entries.is_empty() { - return; + return None; } let json = serde_json::to_string(&entries).expect("serialize raise entries"); - let b64 = base64::engine::general_purpose::URL_SAFE.encode(json.as_bytes()); - println!("RAISED: {}", b64); + Some(base64::engine::general_purpose::URL_SAFE.encode(json.as_bytes())) + } + + pub fn emit_stdout(&self) { + if let Some(b64) = self.encoded_frame() { + println!("RAISED: {}", b64); + } + } + + pub(crate) fn emit_authenticated_stdout(&self, token: &str) { + if let Some(b64) = self.encoded_frame() { + println!("RAISED-AUTH: {token} {b64}"); + } } } @@ -54,6 +89,7 @@ pub fn register( buf: RaiseBuffer, resolver: NameResolver, owner_namespace: String, + authority: RaiseAuthority, ) -> Result<()> { let buf_clone = buf.clone(); lua.globals().set( @@ -63,6 +99,7 @@ pub fn register( let queue = resolver .resolve(&owner_namespace, &queue) .map_err(mlua::Error::external)?; + authority.ensure_allowed(&queue)?; let p_json: JsonValue = lua.from_value(payload).unwrap_or(JsonValue::Null); buf_clone.push(queue, p_json); Ok(()) @@ -83,6 +120,7 @@ mod tests { buf, NameResolver::new(["pkg".to_string()]), "pkg".to_string(), + RaiseAuthority::new(BTreeSet::from(["done".to_string()])), ) .unwrap(); } @@ -96,6 +134,7 @@ mod tests { buf.clone(), NameResolver::new(["pkg".to_string()]), "pkg".to_string(), + RaiseAuthority::new(BTreeSet::from(["done".to_string()])), ) .unwrap(); lua.load(r#"raise("done", {n=42})"#).exec().unwrap(); @@ -114,6 +153,7 @@ mod tests { buf.clone(), NameResolver::new(["pkg".to_string()]), "pkg".to_string(), + RaiseAuthority::new(BTreeSet::from(["q1".to_string(), "q2".to_string()])), ) .unwrap(); lua.load( @@ -216,6 +256,7 @@ mod tests { buf.clone(), NameResolver::new(["pkg".to_string(), "host".to_string()]), "pkg".to_string(), + RaiseAuthority::new(BTreeSet::from(["pkg.done".to_string()])), ) .unwrap(); @@ -246,6 +287,7 @@ mod tests { buf.clone(), NameResolver::new(["pkg".to_string(), "host".to_string()]), "pkg".to_string(), + RaiseAuthority::new(BTreeSet::from(["pkg.done".to_string()])), ) .unwrap(); @@ -259,6 +301,30 @@ mod tests { assert!(entries.is_empty()); } + #[test] + fn raise_rejects_resolved_queue_not_in_authority() { + let lua = Lua::new(); + let buf = RaiseBuffer::new(); + register( + &lua, + buf.clone(), + NameResolver::new(["pkg".to_string(), "host".to_string()]), + "pkg".to_string(), + RaiseAuthority::new(BTreeSet::from(["pkg.done".to_string()])), + ) + .unwrap(); + + let err = lua.load(r#"raise("other", {})"#).exec().unwrap_err(); + + assert!( + err.to_string() + .contains("raise queue `pkg.other` is not declared"), + "got: {err}" + ); + let entries = buf.0.lock().unwrap(); + assert!(entries.is_empty()); + } + #[test] fn empty_buffer_emits_nothing() { let buf = RaiseBuffer::new(); diff --git a/crates/fkst-framework/src/self_test.rs b/crates/fkst-framework/src/self_test.rs index 607916e..8b10d2e 100644 --- a/crates/fkst-framework/src/self_test.rs +++ b/crates/fkst-framework/src/self_test.rs @@ -139,6 +139,7 @@ fn check_sdk_registration(host_root: &std::path::Path) -> Result<()> { Some("pkg.self_test".to_string()), NameResolver::new(["pkg".to_string()]), "pkg".to_string(), + crate::raise::RaiseAuthority::new(Default::default()), None, false, ) diff --git a/crates/fkst-framework/src/spec_queues.rs b/crates/fkst-framework/src/spec_queues.rs new file mode 100644 index 0000000..c35d68e --- /dev/null +++ b/crates/fkst-framework/src/spec_queues.rs @@ -0,0 +1,90 @@ +use crate::path_resolver::PackageRoots; +use mlua::{Table, Value}; +use std::collections::BTreeSet; +use std::path::Path; + +pub(crate) fn declared_resolved_produces( + roots: &PackageRoots, + owner_namespace: &str, + owner_root: &Path, + lua_path: &Path, + package_path: &str, + chunk_cache: &crate::mlua_init::LuaChunkCache, +) -> mlua::Result> { + let raw = declared_spec_queues(owner_root, lua_path, package_path, chunk_cache, "produces")?; + let resolver = roots.name_resolver().with_recorded_only_queues( + raw.iter() + .filter(|queue| queue.contains('.')) + .cloned() + .collect(), + ); + raw.into_iter() + .map(|queue| { + resolver + .resolve(owner_namespace, &queue) + .map_err(mlua::Error::external) + }) + .collect() +} + +pub(crate) fn declared_qualified_spec_queues( + owner_root: &Path, + lua_path: &Path, + package_path: &str, + chunk_cache: &crate::mlua_init::LuaChunkCache, + field: &str, +) -> mlua::Result> { + let raw = declared_spec_queues(owner_root, lua_path, package_path, chunk_cache, field)?; + Ok(raw + .into_iter() + .filter(|queue| queue.contains('.')) + .collect()) +} + +fn declared_spec_queues( + owner_root: &Path, + lua_path: &Path, + package_path: &str, + chunk_cache: &crate::mlua_init::LuaChunkCache, + field: &str, +) -> mlua::Result> { + if !is_department_entrypoint(owner_root, lua_path) { + return Ok(BTreeSet::new()); + } + + let lua = crate::mlua_init::new_lua(); + crate::mlua_init::set_package_path_string(&lua, package_path)?; + let value = match chunk_cache.eval_cached_chunk(&lua, lua_path) { + Ok(value) => value, + Err(_) => return Ok(BTreeSet::new()), + }; + let Value::Table(module) = value else { + return Ok(BTreeSet::new()); + }; + let Some(spec) = module.get::>("spec")? else { + return Ok(BTreeSet::new()); + }; + let Some(queues) = spec.get::>(field)? else { + return Ok(BTreeSet::new()); + }; + let mut declared = BTreeSet::new(); + for value in queues.sequence_values::() { + declared.insert(value?); + } + Ok(declared) +} + +fn is_department_entrypoint(owner_root: &Path, lua_path: &Path) -> bool { + let Ok(relative) = lua_path.strip_prefix(owner_root) else { + return false; + }; + let components = relative.components().collect::>(); + match components.as_slice() { + [std::path::Component::Normal(departments), std::path::Component::Normal(name), std::path::Component::Normal(main)] => { + *departments == std::ffi::OsStr::new("departments") + && !name.is_empty() + && *main == std::ffi::OsStr::new("main.lua") + } + _ => false, + } +} diff --git a/crates/fkst-framework/src/supervise/consumer.rs b/crates/fkst-framework/src/supervise/consumer.rs index 8f94b3f..d5717c8 100644 --- a/crates/fkst-framework/src/supervise/consumer.rs +++ b/crates/fkst-framework/src/supervise/consumer.rs @@ -8,7 +8,7 @@ use super::delivery_types::{ use super::event_fanout::Fanout; use super::failure_fact::{dead_record_payload, delivery_failure_fact, FAILURE_FACT_QUEUE}; use super::journal::{optional_path, SupervisorJournal}; -use super::raised::{parse_raised, parse_raised_line}; +use super::raised::parse_authenticated_raised; use super::source_runner::parse_duration; use super::spawner::{spawn_framework_with_stdout_observer, SpawnResult, StdoutLineObserver}; use crate::path_resolver::PackageRoots; @@ -17,9 +17,7 @@ use fkst_common::config::{DepartmentDecl, RetryDecl}; use fkst_common::{Event, RuntimeKind}; use std::collections::BTreeMap; use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::sync::Mutex; use std::time::Duration; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -291,7 +289,14 @@ fn spawn_ephemeral( tokio::spawn(async move { match spawn_and_report(&dept_name, &args, &journal).await { Ok(result) => { - if let Err(err) = publish_ephemeral_raised(&router, &result.stdout) { + if result.exit_code != 0 { + return; + } + if let Err(err) = publish_ephemeral_raised( + &router, + &result.stdout, + args.raised_auth_token.as_deref(), + ) { error!(dept = %dept_name, error = %err, "publish raised failed"); } } @@ -414,18 +419,18 @@ async fn run_durable_record( args: SpawnArgs, journal: &SupervisorJournal, ) -> CompletedDelivery { - let publish_state = Arc::new(StreamingRaiseState::new(router.clone(), record.clone())); - let result = spawn_and_report_with_stdout_observer( - dept_name, - &args, - Some(streaming_raise_observer(publish_state.clone())), - journal, - ) - .await; + let result = spawn_and_report(dept_name, &args, journal).await; let failure = match result { - Ok(result) if result.exit_code == 0 => publish_state - .first_error() - .map(|err| DeliveryFailure::permanent(format!("raised publish error: {err}"))), + Ok(result) if result.exit_code == 0 => { + publish_raised( + &router, + &result.stdout, + args.raised_auth_token.as_deref(), + &record, + ) + .err() + .map(|err| DeliveryFailure::permanent(format!("raised publish error: {err}"))) + } Ok(result) => Some(DeliveryFailure::from_spawn_result(&result)), Err(err) => { error!( @@ -777,13 +782,19 @@ fn publish_failure_fact(router: &DeliveryRouter, event: Event) { } } -#[cfg(test)] fn publish_raised( router: &DeliveryRouter, stdout: &str, + raised_auth_token: Option<&str>, parent: &DeliveryRecord, ) -> anyhow::Result<()> { - for (ordinal, mut raised_ev) in parse_raised(stdout).into_iter().enumerate() { + let Some(raised_auth_token) = raised_auth_token else { + return Ok(()); + }; + for (ordinal, mut raised_ev) in parse_authenticated_raised(stdout, raised_auth_token) + .into_iter() + .enumerate() + { reject_reserved_raised_queue(&raised_ev)?; raised_ev.ts = parent.observed_at_ms; router.publish(PublishEnvelope { @@ -799,83 +810,15 @@ fn publish_raised( Ok(()) } -fn publish_raised_events( +fn publish_ephemeral_raised( router: &DeliveryRouter, - events: Vec, - parent: &DeliveryRecord, - next_ordinal: &AtomicUsize, + stdout: &str, + raised_auth_token: Option<&str>, ) -> anyhow::Result<()> { - for mut raised_ev in events { - reject_reserved_raised_queue(&raised_ev)?; - raised_ev.ts = parent.observed_at_ms; - let ordinal = next_ordinal.fetch_add(1, Ordering::Relaxed); - router.publish(PublishEnvelope { - event: raised_ev, - source: parent.source.clone(), - cron_payload: None, - derived: Some(DerivedDelivery { - parent_delivery_id: parent.delivery_id.clone(), - ordinal, - }), - })?; - } - Ok(()) -} - -struct StreamingRaiseState { - router: DeliveryRouter, - parent: DeliveryRecord, - next_ordinal: AtomicUsize, - first_error: Mutex>, -} - -impl StreamingRaiseState { - fn new(router: DeliveryRouter, parent: DeliveryRecord) -> Self { - Self { - router, - parent, - next_ordinal: AtomicUsize::new(0), - first_error: Mutex::new(None), - } - } - - fn publish_line(&self, line: &str) { - let events = parse_raised_line(line); - if events.is_empty() { - return; - } - // A child may fail after an already streamed raise; durable consumers own - // idempotence, so immediate publication intentionally keeps at-least-once semantics. - if let Err(err) = - publish_raised_events(&self.router, events, &self.parent, &self.next_ordinal) - { - let mut first_error = self - .first_error - .lock() - .unwrap_or_else(|poisoned| poisoned.into_inner()); - if first_error.is_none() { - *first_error = Some(err.to_string()); - } - } - } - - fn first_error(&self) -> Option { - self.first_error - .lock() - .unwrap_or_else(|poisoned| poisoned.into_inner()) - .clone() - } -} - -fn streaming_raise_observer(state: Arc) -> StdoutLineObserver { - Arc::new(move |line| { - state.publish_line(line); - Ok(()) - }) -} - -fn publish_ephemeral_raised(router: &DeliveryRouter, stdout: &str) -> anyhow::Result<()> { - for raised_ev in parse_raised(stdout) { + let Some(raised_auth_token) = raised_auth_token else { + return Ok(()); + }; + for raised_ev in parse_authenticated_raised(stdout, raised_auth_token) { reject_reserved_raised_queue(&raised_ev)?; router.publish(PublishEnvelope { event: raised_ev, @@ -930,6 +873,7 @@ fn spawn_args( stall_window, codex_permit_slots, log_dir: log_dir.to_path_buf(), + raised_auth_token: Some(ulid::Ulid::new().to_string()), owner_namespace: decl.owner_namespace.clone(), process_groups, }) @@ -944,6 +888,7 @@ struct SpawnArgs { stall_window: Duration, codex_permit_slots: usize, log_dir: PathBuf, + raised_auth_token: Option, owner_namespace: String, process_groups: ProcessGroupRegistry, } @@ -973,6 +918,7 @@ async fn spawn_and_report_with_stdout_observer( dept_name, &args.log_dir, args.process_groups.clone(), + args.raised_auth_token.as_deref(), stdout_observer, ) .await?; @@ -1354,6 +1300,12 @@ mod tests { DeliveryRouter::new(&cfg, fanout, None, None) } + fn authenticated_raised_stdout(token: &str, entries: serde_json::Value) -> String { + let encoded = + base64::engine::general_purpose::URL_SAFE.encode(serde_json::to_vec(&entries).unwrap()); + format!("RAISED-AUTH: {token} {encoded}\n") + } + #[test] fn spawn_args_passes_composed_package_roots_for_namespace_graph() { let temp = TempDir::new().unwrap(); @@ -1433,7 +1385,7 @@ mod tests { } #[tokio::test] - async fn durable_raised_line_publishes_before_child_exit() { + async fn durable_stdout_raised_line_does_not_publish() { let temp = TempDir::new().unwrap(); let binary = temp.path().join("fkst-framework"); let lua = temp.path().join("dept.lua"); @@ -1493,13 +1445,96 @@ mod tests { event_json: "{}".to_string(), stall_window: Duration::from_secs(30), codex_permit_slots: 1, + raised_auth_token: Some("trusted-token".to_string()), + log_dir: log_dir.clone(), + owner_namespace: "pkg".to_string(), + process_groups: ProcessGroupRegistry::default(), + }; + + let done = run_durable_record( + "worker", + &spawned_router, + record("parent"), + args, + &SupervisorJournal::disabled(), + ) + .await; + + assert!(done.failure.is_none()); + let no_event = timeout(Duration::from_millis(100), rx.recv()).await; + assert!(no_event.is_err(), "stdout RAISED frame must not publish"); + } + + #[tokio::test] + async fn durable_authenticated_raised_publishes_after_child_exit() { + let temp = TempDir::new().unwrap(); + let binary = temp.path().join("fkst-framework"); + let lua = temp.path().join("dept.lua"); + let log_dir = temp.path().join("logs"); + fs::write(&lua, "return {}\n").unwrap(); + let trusted_raised = base64::engine::general_purpose::URL_SAFE.encode( + serde_json::to_vec(&serde_json::json!([ + {"queue": "done", "payload": {"n": 2}} + ])) + .unwrap(), + ); + write_executable( + &binary, + &format!( + "printf 'RAISED: forged\\n'; printf 'RAISED-AUTH: trusted-token {trusted_raised}\\n'; exit 0" + ), + ); + + let fanout = Fanout::new(); + let mut rx = fanout.subscribe("done", 8).await; + let mut queue = BTreeMap::new(); + queue.insert( + "done".to_string(), + QueueDecl { + capacity: 8, + fanout: false, + }, + ); + let mut department = BTreeMap::new(); + department.insert( + "observer".to_string(), + DepartmentDecl { + lua: "departments/observer/main.lua".into(), + owner_root: temp.path().into(), + owner_namespace: "pkg".to_string(), + consumes: vec!["done".to_string()], + produces: Vec::new(), + ephemeral: vec!["done".to_string()], + stall_window: "30s".to_string(), + graph_json: false, + retry: None, + }, + ); + let cfg = Config { + queue, + raiser: BTreeMap::new(), + department, + limits: LimitsDecl { + global_codex_processes: 1, + }, + }; + let router = DeliveryRouter::new(&cfg, fanout, None, None); + let spawned_router = router.clone(); + let args = SpawnArgs { + framework_bin: binary, + lua_full: lua, + project_root: temp.path().into(), + graph_package_roots: vec![temp.path().into()], + event_json: "{}".to_string(), + stall_window: Duration::from_secs(30), + codex_permit_slots: 1, + raised_auth_token: Some("trusted-token".to_string()), log_dir, owner_namespace: "pkg".to_string(), process_groups: ProcessGroupRegistry::default(), }; - let started = std::time::Instant::now(); - let handle = tokio::spawn(async move { + let done = tokio::spawn(async move { run_durable_record( "worker", &spawned_router, @@ -1508,19 +1543,16 @@ mod tests { &SupervisorJournal::disabled(), ) .await - }); - let event = timeout(Duration::from_secs(4), rx.recv()) + }) + .await + .unwrap(); + let event = timeout(Duration::from_secs(1), rx.recv()) .await - .expect("streamed raise should publish before child exit") + .expect("authenticated raise should publish after child exit") .expect("done subscription should remain open"); - assert!( - started.elapsed() < Duration::from_secs(5), - "raise was not visible before the child sleep elapsed" - ); assert_eq!(event.queue, "done"); assert_eq!(event.payload, serde_json::json!({"n": 2})); - let done = handle.await.unwrap(); assert!(done.failure.is_none()); } @@ -2260,17 +2292,14 @@ mod tests { fn raised_output_cannot_publish_engine_failure_fact_queue() { let router = router_with_failure_fact_and_fanout(Fanout::new()); let parent = record("parent"); - let stdout = format!( - "RAISED: {}\n", - base64::engine::general_purpose::URL_SAFE.encode( - serde_json::to_vec(&serde_json::json!([ - {"queue": "fkst.failure_fact", "payload": {"n": 1}} - ])) - .unwrap() - ) + let raised = authenticated_raised_stdout( + "trusted-token", + serde_json::json!([ + {"queue": "fkst.failure_fact", "payload": {"n": 1}} + ]), ); - let err = publish_raised(&router, &stdout, &parent).unwrap_err(); + let err = publish_raised(&router, &raised, Some("trusted-token"), &parent).unwrap_err(); assert!( err.to_string() @@ -2350,19 +2379,16 @@ mod tests { let temp = TempDir::new().unwrap(); let store = Arc::new(DeliveryStore::open(temp.path().join("delivery.redb")).unwrap()); let router = DeliveryRouter::new(&cfg, Fanout::new(), Some(store), None); - let stdout = format!( - "RAISED: {}\n", - base64::engine::general_purpose::URL_SAFE.encode( - serde_json::to_vec(&serde_json::json!([ - {"queue": "next", "payload": {"n": 2}} - ])) - .unwrap() - ) + let raised = authenticated_raised_stdout( + "trusted-token", + serde_json::json!([ + {"queue": "next", "payload": {"n": 2}} + ]), ); let mut parent = record("parent"); parent.source = None; - let err = publish_raised(&router, &stdout, &parent).unwrap_err(); + let err = publish_raised(&router, &raised, Some("trusted-token"), &parent).unwrap_err(); assert!(err.to_string().contains("requires source_ref"), "{err}"); } @@ -2404,18 +2430,15 @@ mod tests { let store = Arc::new(DeliveryStore::open(temp.path().join("delivery.redb")).unwrap()); let router = DeliveryRouter::new(&cfg, Fanout::new(), Some(store.clone()), None); let parent = record("parent"); - let stdout = format!( - "RAISED: {}\n", - base64::engine::general_purpose::URL_SAFE.encode( - serde_json::to_vec(&serde_json::json!([ - {"queue": "next", "payload": {"n": 2}} - ])) - .unwrap() - ) + let raised = authenticated_raised_stdout( + "trusted-token", + serde_json::json!([ + {"queue": "next", "payload": {"n": 2}} + ]), ); - publish_raised(&router, &stdout, &parent).unwrap(); - publish_raised(&router, &stdout, &parent).unwrap(); + publish_raised(&router, &raised, Some("trusted-token"), &parent).unwrap(); + publish_raised(&router, &raised, Some("trusted-token"), &parent).unwrap(); let leased = store .lease_for_dept( diff --git a/crates/fkst-framework/src/supervise/raised.rs b/crates/fkst-framework/src/supervise/raised.rs index db60ae6..cc57994 100644 --- a/crates/fkst-framework/src/supervise/raised.rs +++ b/crates/fkst-framework/src/supervise/raised.rs @@ -1,9 +1,10 @@ -//! RAISED stdout protocol parser. +//! RAISED frame parser. //! -//! Framework prints exactly one final line `RAISED: ` on stdout -//! before exit. The JSON decodes to `[{queue, payload}, ...]`. Multiple RAISED lines → -//! last wins. No RAISED line → empty list, no error. Malformed base64/JSON → log -//! warning, treat as empty (don't crash supervisor). +//! Direct `run` compatibility prints one final line `RAISED: +//! ` on stdout before exit. Supervise requires a +//! parent-generated `RAISED-AUTH: ` frame, and +//! `run` removes the token from the Lua-visible environment before loading user +//! code. //! //! Scanning from end of stdout buffer prevents log lines like //! `log.info("RAISED: foo")` from being mistaken for the actual protocol. @@ -21,12 +22,7 @@ struct RaisedEntry { payload: Value, } -pub fn parse_raised_line(line: &str) -> Vec { - if !line.trim_start().starts_with("RAISED: ") { - return Vec::new(); - } - - let b64_part = line.trim_start().trim_start_matches("RAISED: ").trim(); +fn decode_raised_payload(b64_part: &str) -> Vec { let decoded_bytes = match base64::engine::general_purpose::URL_SAFE.decode(b64_part) { Ok(b) => b, Err(e) => { @@ -47,9 +43,42 @@ pub fn parse_raised_line(line: &str) -> Vec { .collect() } -/// Parse stdout into a list of (queue, Event) tuples. Returns empty vec if no RAISED line. +pub fn parse_raised_line(line: &str) -> Vec { + let Some(b64_part) = line.trim_start().strip_prefix("RAISED: ") else { + return Vec::new(); + }; + + decode_raised_payload(b64_part.trim()) +} + +pub fn parse_authenticated_raised_line(line: &str, token: &str) -> Vec { + let Some(rest) = line.trim_start().strip_prefix("RAISED-AUTH: ") else { + return Vec::new(); + }; + let Some((frame_token, b64_part)) = rest.trim().split_once(' ') else { + warn!("RAISED-AUTH line missing payload"); + return Vec::new(); + }; + if frame_token != token { + warn!("RAISED-AUTH line token mismatch"); + return Vec::new(); + } + decode_raised_payload(b64_part.trim()) +} + +pub fn parse_authenticated_raised(stdout: &str, token: &str) -> Vec { + let last = stdout + .lines() + .rev() + .find(|line| line.trim_start().starts_with("RAISED-AUTH: ")); + let Some(line) = last else { + return Vec::new(); + }; + parse_authenticated_raised_line(line, token) +} + +/// Parse stdout into a list of events. Returns empty vec if no RAISED line. pub fn parse_raised(stdout: &str) -> Vec { - // Find the LAST line starting with "RAISED: " (after any trailing whitespace). let last = stdout .lines() .rev() @@ -90,6 +119,28 @@ mod tests { assert_eq!(events[0].payload, serde_json::json!({"n": 1})); } + #[test] + fn authenticated_raised_requires_matching_token() { + let payload = encode(r#"[{"queue":"done","payload":{"n":1}}]"#); + let stdout = format!( + "RAISED-AUTH: wrong {}\nRAISED-AUTH: expected {}\n", + payload, payload + ); + + let events = parse_authenticated_raised(&stdout, "expected"); + assert_eq!(events.len(), 1); + assert_eq!(events[0].queue, "done"); + assert!(parse_authenticated_raised(&stdout, "wrong").is_empty()); + } + + #[test] + fn authenticated_raised_ignores_plain_raised() { + let payload = encode(r#"[{"queue":"done","payload":{"n":1}}]"#); + let stdout = format!("RAISED: {}\n", payload); + + assert!(parse_authenticated_raised(&stdout, "expected").is_empty()); + } + #[test] fn multiple_raised_lines_last_wins() { let first = encode(r#"[{"queue":"first","payload":null}]"#); diff --git a/crates/fkst-framework/src/supervise/spawner.rs b/crates/fkst-framework/src/supervise/spawner.rs index ba7b540..078abe3 100644 --- a/crates/fkst-framework/src/supervise/spawner.rs +++ b/crates/fkst-framework/src/supervise/spawner.rs @@ -18,6 +18,7 @@ use tokio::task::JoinHandle; use tracing::info; static NEXT_FRAMEWORK_CHILD_LOG_ID: AtomicU64 = AtomicU64::new(1); +const RAISED_AUTH_TOKEN_ENV: &str = "FKST_RAISED_AUTH_TOKEN"; pub struct SpawnResult { pub pid: u32, @@ -69,6 +70,7 @@ pub async fn spawn_framework( log_dir, process_groups, None, + None, ) .await } @@ -85,6 +87,7 @@ pub async fn spawn_framework_with_stdout_observer( child_label: &str, log_dir: &Path, process_groups: ProcessGroupRegistry, + raised_auth_token: Option<&str>, stdout_observer: Option, ) -> Result { let start = std::time::Instant::now(); @@ -118,6 +121,9 @@ pub async fn spawn_framework_with_stdout_observer( crate::provenance::current_pkg_versions_summary() )); log.write_line(&format!("DEPT={child_label}")); + if raised_auth_token.is_some() { + log.write_line("RAISED_AUTH=enabled"); + } let mut cmd = Command::new(binary); cmd.arg("run") @@ -142,6 +148,9 @@ pub async fn spawn_framework_with_stdout_observer( "FKST_SUPERVISOR_PID", crate::process_tree::current_pid().to_string(), ); + if let Some(token) = raised_auth_token { + cmd.env(RAISED_AUTH_TOKEN_ENV, token); + } cmd.current_dir(host_root); // Set a new process group before exec so framework becomes its own group leader. diff --git a/crates/fkst-framework/src/test_runner.rs b/crates/fkst-framework/src/test_runner.rs index 643bb86..8b92f27 100644 --- a/crates/fkst-framework/src/test_runner.rs +++ b/crates/fkst-framework/src/test_runner.rs @@ -53,6 +53,7 @@ pub(crate) fn run_tests( None, roots.name_resolver(), file.owner_namespace.clone(), + crate::raise::RaiseAuthority::new(Default::default()), Some(mock_commands.clone()), Some(roots.clone()), false, @@ -564,7 +565,8 @@ fn run_department( qualified_consumes, ) .map_err(mlua::Error::external)?; - let qualified_produces = cache.declared_qualified_produces(owner_root, &lua_path)?; + let declared_produces = + cache.declared_resolved_produces(owner_root, owner_namespace, &lua_path)?; let package_path = cache.package_path_string(&require_roots)?; let dept_lua = crate::mlua_init::new_lua(); @@ -578,8 +580,9 @@ fn run_department( department_name_for_lua(&lua_path, owner_root, owner_namespace), roots .name_resolver() - .with_recorded_only_queues(qualified_produces), + .with_recorded_only_queues(declared_produces.clone()), owner_namespace.to_string(), + crate::raise::RaiseAuthority::new(declared_produces), Some(mock_commands), Some(roots.clone()), graph_json_authorized, @@ -694,9 +697,10 @@ impl TestRunCache { Ok(path) } - fn declared_qualified_produces( + fn declared_resolved_produces( &self, owner_root: &Path, + owner_namespace: &str, lua_path: &Path, ) -> mlua::Result> { let lua_path = lua_path.canonicalize().map_err(mlua::Error::external)?; @@ -711,7 +715,9 @@ impl TestRunCache { } let require_roots = self.require_roots_for_owner(owner_root)?; let package_path = self.package_path_string(&require_roots)?; - let produces = declared_qualified_produces( + let produces = crate::spec_queues::declared_resolved_produces( + &self.roots, + owner_namespace, owner_root, &lua_path, &package_path, @@ -780,52 +786,13 @@ fn declared_qualified_consumes( package_path: &str, chunk_cache: &crate::mlua_init::LuaChunkCache, ) -> mlua::Result> { - declared_qualified_spec_queues(owner_root, lua_path, package_path, chunk_cache, "consumes") -} - -fn declared_qualified_produces( - owner_root: &Path, - lua_path: &Path, - package_path: &str, - chunk_cache: &crate::mlua_init::LuaChunkCache, -) -> mlua::Result> { - declared_qualified_spec_queues(owner_root, lua_path, package_path, chunk_cache, "produces") -} - -fn declared_qualified_spec_queues( - owner_root: &Path, - lua_path: &Path, - package_path: &str, - chunk_cache: &crate::mlua_init::LuaChunkCache, - field: &str, -) -> mlua::Result> { - if !is_department_entrypoint(owner_root, lua_path) { - return Ok(BTreeSet::new()); - } - - let lua = crate::mlua_init::new_lua(); - crate::mlua_init::set_package_path_string(&lua, package_path)?; - let value = match chunk_cache.eval_cached_chunk(&lua, lua_path) { - Ok(value) => value, - Err(_) => return Ok(BTreeSet::new()), - }; - let Value::Table(module) = value else { - return Ok(BTreeSet::new()); - }; - let Some(spec) = module.get::>("spec")? else { - return Ok(BTreeSet::new()); - }; - let Some(queues) = spec.get::>(field)? else { - return Ok(BTreeSet::new()); - }; - let mut qualified = BTreeSet::new(); - for value in queues.sequence_values::() { - let value = value?; - if value.contains('.') { - qualified.insert(value); - } - } - Ok(qualified) + crate::spec_queues::declared_qualified_spec_queues( + owner_root, + lua_path, + package_path, + chunk_cache, + "consumes", + ) } fn normalize_run_department_event_queue( @@ -855,21 +822,6 @@ fn normalize_run_department_event_queue( Ok(event) } -fn is_department_entrypoint(owner_root: &Path, lua_path: &Path) -> bool { - let Ok(relative) = lua_path.strip_prefix(owner_root) else { - return false; - }; - let components = relative.components().collect::>(); - match components.as_slice() { - [std::path::Component::Normal(departments), std::path::Component::Normal(name), std::path::Component::Normal(main)] => { - *departments == std::ffi::OsStr::new("departments") - && !name.is_empty() - && *main == std::ffi::OsStr::new("main.lua") - } - _ => false, - } -} - fn department_name_for_lua( lua_path: &Path, owner_root: &Path, diff --git a/crates/fkst-framework/tests/self_test_cli.rs b/crates/fkst-framework/tests/self_test_cli.rs index 781f979..2c41cd8 100644 --- a/crates/fkst-framework/tests/self_test_cli.rs +++ b/crates/fkst-framework/tests/self_test_cli.rs @@ -139,10 +139,13 @@ fn self_test_coverage_runs_lua_tests_and_writes_artifacts() { std::fs::write( tmp.path().join("departments/probe/main.lua"), r#" +local M = {} +M.spec = { produces = { "done" } } function pipeline(event) local value = event.payload.value .. "-covered" raise("done", { value = value }) end +return M "#, ) .unwrap(); @@ -255,6 +258,8 @@ fn run_project_root_controls_host_facts_and_git_sdk_when_cwd_differs() { &lua, format!( r#" +local M = {{}} +M.spec = {{ produces = {{ "done" }} }} function pipeline(event) local count = git_log_count("sdk git host fact regression", "1970-01-01T00:00:00Z") local worktree = setup_worktree("host-root-test") @@ -264,6 +269,7 @@ function pipeline(event) f:close() raise("done", {{ count = count }}) end +return M "#, witness.to_string_lossy() ), diff --git a/crates/fkst-framework/tests/test_runner_cli.rs b/crates/fkst-framework/tests/test_runner_cli.rs index 267f5d5..8ca9b73 100644 --- a/crates/fkst-framework/tests/test_runner_cli.rs +++ b/crates/fkst-framework/tests/test_runner_cli.rs @@ -201,6 +201,43 @@ return M assert_eq!(raises[0]["payload"]["source"], "ok"); } +#[test] +fn production_run_uses_same_resolved_produces_contract_as_test() { + let temp = tempfile::tempdir().unwrap(); + let package = temp.path().join("pkg"); + fs::create_dir_all(package.join("departments/probe")).unwrap(); + let probe = package.join("departments/probe/main.lua"); + fs::write( + &probe, + r#" +local M = {} +M.spec = { produces = { "pkg.seen" } } +function pipeline(event) + raise("pkg.seen", { value = event.payload.value }) +end +return M +"#, + ) + .unwrap(); + + let output = run_command(&package, &probe) + .arg("--package-root") + .arg(&package) + .output() + .unwrap(); + + assert_eq!( + output.status.code(), + Some(0), + "stdout: {}\nstderr: {}", + stdout(&output), + stderr(&output) + ); + let raises = raised_entries(&output); + assert_eq!(raises[0]["queue"], "pkg.seen"); + assert_eq!(raises[0]["payload"]["value"], "ok"); +} + #[test] fn production_run_resolves_owner_locale_catalog() { let temp = tempfile::tempdir().unwrap(); @@ -223,9 +260,12 @@ fn production_run_resolves_owner_locale_catalog() { fs::write( &probe, r#" +local M = {} +M.spec = { produces = { "checked" } } function pipeline(event) raise("checked", { message = t("result.summary", { name = event.payload.value }) }) end +return M "#, ) .unwrap(); @@ -266,12 +306,15 @@ fn production_run_composed_namespace_roots_do_not_authorize_sibling_require() { fs::write( &probe, r#" +local M = {} +M.spec = { produces = { "checked" } } function pipeline(event) local ok, err = pcall(require, "sibling_only") assert(not ok, "sibling module leaked into owner package.path") assert(string.find(err, "module 'sibling_only' not found", 1, true), err) raise("checked", { isolated = true }) end +return M "#, ) .unwrap(); @@ -672,10 +715,13 @@ fn test_runner_isolates_each_test_file_to_its_owner_root() { fs::write( package.join("departments/probe/main.lua"), r#" +local M = {} +M.spec = { produces = { "seen" } } local core = require("core") function pipeline(event) raise("seen", { value = core.value, expected = event.payload.expected }) end +return M "#, ) .unwrap(); @@ -741,10 +787,13 @@ fn test_runner_host_department_uses_package_standard_asset() { fs::write( host.path().join("departments/probe/main.lua"), r#" +local M = {} +M.spec = { produces = { "seen" } } local standard = require("fkst.standard_asset") function pipeline(event) raise("seen", { value = standard.value() }) end +return M "#, ) .unwrap(); @@ -1705,6 +1754,8 @@ fn test_coverage_writes_json_and_lcov_for_production_lua_lines() { fs::write( host.path().join("departments/probe/main.lua"), r#" +local M = {} +M.spec = { produces = { "done" } } local helper = require("helper") function pipeline(event) @@ -1716,6 +1767,7 @@ function pipeline(event) assert(ok, err) raise("done", { value = value }) end +return M "#, ) .unwrap(); @@ -1802,10 +1854,13 @@ fn test_runner_mocks_external_commands_fail_closed_and_isolates_tests() { fs::write( host.path().join("departments/probe/main.lua"), r#" +local M = {} +M.spec = { produces = { "seen" } } function pipeline(event) local result = exec_sync(event.payload.cmd) raise("seen", { stdout = result.stdout, exit_code = result.exit_code }) end +return M "#, ) .unwrap(); @@ -2123,10 +2178,13 @@ fn production_run_does_not_require_from_host_cwd_when_owner_lacks_module() { fs::write( &probe, r#" +local M = {} +M.spec = { produces = { "seen" } } local core = require("core") function pipeline(event) raise("seen", { value = core.value }) end +return M "#, ) .unwrap(); @@ -2166,10 +2224,13 @@ fn production_exec_sync_returns_typed_boundary_error_class() { fs::write( &probe, r#" +local M = {} +M.spec = { produces = { "seen" } } function pipeline(event) local result = exec_sync(event.payload.cmd) raise("seen", { exit_code = result.exit_code, error_class = result.error_class }) end +return M "#, ) .unwrap(); @@ -2244,10 +2305,13 @@ fn run_accepts_host_owner_with_multiple_package_root_flags_as_require_roots() { fs::write( &probe, r#" +local M = {} +M.spec = { produces = { "seen" } } local core = require("core") function pipeline(event) raise("seen", { value = core.value }) end +return M "#, ) .unwrap(); @@ -2316,10 +2380,13 @@ fn run_single_package_entrypoints_are_equivalent() { fs::write( &probe, r#" +local M = {} +M.spec = { produces = { "seen" } } local core = require("core") function pipeline(event) raise("seen", { core = core.value, input = event.payload.value }) end +return M "#, ) .unwrap(); diff --git a/crates/fkst-supervisor/tests/release_probe_witness.rs b/crates/fkst-supervisor/tests/release_probe_witness.rs index e798a11..8e204b5 100644 --- a/crates/fkst-supervisor/tests/release_probe_witness.rs +++ b/crates/fkst-supervisor/tests/release_probe_witness.rs @@ -157,7 +157,7 @@ fn wait_for_witness( ) } -fn wait_for_child_log_with_raised( +fn wait_for_child_log_with_raised_auth( deadline: Instant, child: &mut Child, supervisor_log: &Path, @@ -168,7 +168,7 @@ fn wait_for_child_log_with_raised( child, supervisor_log, child_log_dir, - "RAISED child log", + "raised auth child log", || { let entries = fs::read_dir(child_log_dir).ok()?; for entry in entries.flatten() { @@ -179,7 +179,9 @@ fn wait_for_child_log_with_raised( let Ok(content) = fs::read_to_string(&path) else { continue; }; - if content.contains("RAISED: ") { + if content.contains("DEPT=release_probe\n") + && content.contains("RAISED_AUTH=enabled") + { return Some(content); } } @@ -453,7 +455,7 @@ fn supervisor_framework_completes_release_probe_raised_cycle() { &child_log_dir, &witness, ); - let child_log = wait_for_child_log_with_raised( + let child_log = wait_for_child_log_with_raised_auth( deadline, supervisor.child_mut(), &supervisor_log, @@ -476,7 +478,10 @@ fn supervisor_framework_completes_release_probe_raised_cycle() { child_log.contains("DEPT=release_probe\n"), "log={child_log}" ); - assert!(child_log.contains("RAISED: "), "log={child_log}"); + assert!( + child_log.contains("RAISED_AUTH=enabled"), + "log={child_log}" + ); assert!(child_log.contains("EXIT=0\n"), "log={child_log}"); } diff --git a/examples/minimal-package/tests/run_department_subject.lua b/examples/minimal-package/tests/run_department_subject.lua deleted file mode 100644 index dfe5d87..0000000 --- a/examples/minimal-package/tests/run_department_subject.lua +++ /dev/null @@ -1,7 +0,0 @@ -function pipeline(event) - raise("example_event", { - from = "run_department_subject", - source_queue = event.queue, - source_raiser = event.payload and event.payload.raiser or "", - }) -end diff --git a/examples/minimal-package/tests/run_department_test.lua b/examples/minimal-package/tests/run_department_test.lua index c6abc02..1f39771 100644 --- a/examples/minimal-package/tests/run_department_test.lua +++ b/examples/minimal-package/tests/run_department_test.lua @@ -2,7 +2,7 @@ local t = fkst.test return { test_run_department_captures_raises = function() - local result = t.run_department("tests/run_department_subject.lua", { + local result = t.run_department("departments/producer/main.lua", { queue = "tick", payload = { raiser = "integration-test" }, ts = 123, @@ -10,7 +10,7 @@ return { t.eq(result.exit_code, 0) t.eq(result.raises[1].queue, "example_event") - t.eq(result.raises[1].payload.from, "run_department_subject") + t.eq(result.raises[1].payload.from, "producer") t.eq(result.raises[1].payload.source_queue, "tick") t.eq(result.raises[1].payload.source_raiser, "integration-test") t.is_nil(result.raises[2])