Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions crates/fkst-framework/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use anyhow::{Context, Result};
use host_conformance::HostConformanceOptions;
use path_resolver::PackageRoots;
use serde_json::Value as JsonValue;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};

mod boundary_resource;
Expand Down Expand Up @@ -48,6 +49,7 @@ mod sdk_log;
mod sdk_mark;
mod sdk_strings;
mod self_test;
mod spec_queues;
mod supervise;
mod test_runner;

Expand All @@ -56,7 +58,7 @@ mod test_env {
pub(crate) static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
}

use raise::RaiseBuffer;
use raise::{RaiseAuthority, RaiseBuffer, RAISED_AUTH_TOKEN_ENV};

enum CliCommand {
Run {
Expand Down Expand Up @@ -442,6 +444,8 @@ fn run_pipeline(
eprintln!("[framework] supervisor parent lost before pipeline start");
return Ok(125);
}
let raised_auth_token = std::env::var(RAISED_AUTH_TOKEN_ENV).ok();
std::env::remove_var(RAISED_AUTH_TOKEN_ENV);
let lua = mlua_init::new_lua();
let raise_buf = RaiseBuffer::new();
let owner_root = roots
Expand All @@ -452,15 +456,21 @@ fn run_pipeline(
let require_roots = roots.require_roots_for_owner(owner_root);
let graph_json_authorized =
sdk_graph::department_authorized(&roots, owner_root, &lua_path).unwrap_or(false);
let declared_produces =
department_declared_resolved_produces(&roots, owner_root, &owner_namespace, &lua_path)
.with_context(|| format!("resolve raise authority for {}", lua_path.display()))?;

mlua_init::register_framework_sdk(
&lua,
raise_buf.clone(),
roots.host_root(),
owner_root,
department_name_for_lua(&lua_path, owner_root, &owner_namespace),
roots.name_resolver(),
roots
.name_resolver()
.with_recorded_only_queues(declared_produces.clone()),
owner_namespace.clone(),
RaiseAuthority::new(declared_produces),
Some(roots.clone()),
graph_json_authorized,
)?;
Expand All @@ -478,10 +488,36 @@ fn run_pipeline(
}
};

raise_buf.emit_stdout();
if let Some(token) = raised_auth_token {
raise_buf.emit_authenticated_stdout(&token);
} else {
raise_buf.emit_stdout();
}
Ok(exit_code)
}

fn department_declared_resolved_produces(
roots: &PackageRoots,
owner_root: &Path,
owner_namespace: &str,
lua_path: &Path,
) -> Result<BTreeSet<String>> {
let lua_path = lua_path
.canonicalize()
.with_context(|| format!("canonicalize {}", lua_path.display()))?;
let require_roots = roots.require_roots_for_owner(owner_root);
let package_path = mlua_init::package_roots_path(require_roots.iter().map(PathBuf::as_path));
spec_queues::declared_resolved_produces(
roots,
owner_namespace,
owner_root,
&lua_path,
&package_path,
&mlua_init::LuaChunkCache::default(),
)
.map_err(anyhow::Error::from)
}

fn department_name_for_lua(
lua_path: &Path,
owner_root: &Path,
Expand Down
8 changes: 5 additions & 3 deletions crates/fkst-framework/src/mlua_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::SystemTime;
use crate::config_registry::ConfigContext;
use crate::external_command::MockCommandState;
use crate::path_resolver::{package_root_path, NameResolver, PackageRoots};
use crate::raise::RaiseBuffer;
use crate::raise::{RaiseAuthority, RaiseBuffer};

/// Create a Lua state with stdlib enabled.
pub fn new_lua() -> Lua {
Expand All @@ -27,6 +27,7 @@ pub fn register_framework_sdk(
dept: Option<String>,
resolver: NameResolver,
owner_namespace: String,
raise_authority: RaiseAuthority,
graph_roots: Option<PackageRoots>,
graph_json_authorized: bool,
) -> mlua::Result<()> {
Expand All @@ -43,7 +44,7 @@ pub fn register_framework_sdk(
crate::sdk_mark::register(lua, host_root)?;
crate::sdk_cache::register(lua, host_root)?;
crate::sdk_codex::register(lua, host_root, config, dept)?;
crate::raise::register(lua, raise_buf, resolver, owner_namespace)?;
crate::raise::register(lua, raise_buf, resolver, owner_namespace, raise_authority)?;
Ok(())
}

Expand All @@ -55,6 +56,7 @@ pub(crate) fn register_framework_sdk_with_runner(
dept: Option<String>,
resolver: NameResolver,
owner_namespace: String,
raise_authority: RaiseAuthority,
runner: Option<MockCommandState>,
graph_roots: Option<PackageRoots>,
graph_json_authorized: bool,
Expand All @@ -72,7 +74,7 @@ pub(crate) fn register_framework_sdk_with_runner(
crate::sdk_mark::register(lua, host_root)?;
crate::sdk_cache::register(lua, host_root)?;
crate::sdk_codex::register_with_runner(lua, host_root, config, dept, runner)?;
crate::raise::register(lua, raise_buf, resolver, owner_namespace)?;
crate::raise::register(lua, raise_buf, resolver, owner_namespace, raise_authority)?;
Ok(())
}

Expand Down
78 changes: 72 additions & 6 deletions crates/fkst-framework/src/raise.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
//! SDK: `raise(queue_name, payload)` — buffers in-process; emit RAISED stdout line on exit.
//! SDK: `raise(queue_name, payload)` — buffers in-process for trusted parent transport.
//!
//! Raise is best-effort, at-most-once, and derived-only. Durable intent goes through filesystem.
//! Emit format = `RAISED: <base64-url-encoded JSON [{queue, payload}, ...]>`.
//! External direct `run` compatibility still emits `RAISED: <base64-url-encoded JSON
//! [{queue, payload}, ...]>` when no supervise authentication token is supplied.

use base64::Engine;
use mlua::{Lua, LuaSerdeExt, Result};
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};

use crate::path_resolver::NameResolver;

pub(crate) const RAISED_AUTH_TOKEN_ENV: &str = "FKST_RAISED_AUTH_TOKEN";

#[derive(Serialize, Debug, Clone)]
struct RaisedEntry {
queue: String,
Expand All @@ -20,6 +24,26 @@ struct RaisedEntry {
#[derive(Clone, Default)]
pub struct RaiseBuffer(Arc<Mutex<Vec<RaisedEntry>>>);

#[derive(Clone, Debug)]
pub(crate) struct RaiseAuthority {
allowed_queues: BTreeSet<String>,
}

impl RaiseAuthority {
pub(crate) fn new(allowed_queues: BTreeSet<String>) -> Self {
Self { allowed_queues }
}

fn ensure_allowed(&self, queue: &str) -> Result<()> {
if self.allowed_queues.contains(queue) {
return Ok(());
}
Err(mlua::Error::external(format!(
"raise queue `{queue}` is not declared in department M.spec.produces"
)))
}
}

impl RaiseBuffer {
pub fn new() -> Self {
Self::default()
Expand All @@ -38,14 +62,25 @@ impl RaiseBuffer {
self.0.lock().unwrap().push(RaisedEntry { queue, payload });
}

pub fn emit_stdout(&self) {
pub fn encoded_frame(&self) -> Option<String> {
let entries = self.0.lock().unwrap().clone();
if entries.is_empty() {
return;
return None;
}
let json = serde_json::to_string(&entries).expect("serialize raise entries");
let b64 = base64::engine::general_purpose::URL_SAFE.encode(json.as_bytes());
println!("RAISED: {}", b64);
Some(base64::engine::general_purpose::URL_SAFE.encode(json.as_bytes()))
}

pub fn emit_stdout(&self) {
if let Some(b64) = self.encoded_frame() {
println!("RAISED: {}", b64);
}
}

pub(crate) fn emit_authenticated_stdout(&self, token: &str) {
if let Some(b64) = self.encoded_frame() {
println!("RAISED-AUTH: {token} {b64}");
}
}
}

Expand All @@ -54,6 +89,7 @@ pub fn register(
buf: RaiseBuffer,
resolver: NameResolver,
owner_namespace: String,
authority: RaiseAuthority,
) -> Result<()> {
let buf_clone = buf.clone();
lua.globals().set(
Expand All @@ -63,6 +99,7 @@ pub fn register(
let queue = resolver
.resolve(&owner_namespace, &queue)
.map_err(mlua::Error::external)?;
authority.ensure_allowed(&queue)?;
let p_json: JsonValue = lua.from_value(payload).unwrap_or(JsonValue::Null);
buf_clone.push(queue, p_json);
Ok(())
Expand All @@ -83,6 +120,7 @@ mod tests {
buf,
NameResolver::new(["pkg".to_string()]),
"pkg".to_string(),
RaiseAuthority::new(BTreeSet::from(["done".to_string()])),
)
.unwrap();
}
Expand All @@ -96,6 +134,7 @@ mod tests {
buf.clone(),
NameResolver::new(["pkg".to_string()]),
"pkg".to_string(),
RaiseAuthority::new(BTreeSet::from(["done".to_string()])),
)
.unwrap();
lua.load(r#"raise("done", {n=42})"#).exec().unwrap();
Expand All @@ -114,6 +153,7 @@ mod tests {
buf.clone(),
NameResolver::new(["pkg".to_string()]),
"pkg".to_string(),
RaiseAuthority::new(BTreeSet::from(["q1".to_string(), "q2".to_string()])),
)
.unwrap();
lua.load(
Expand Down Expand Up @@ -216,6 +256,7 @@ mod tests {
buf.clone(),
NameResolver::new(["pkg".to_string(), "host".to_string()]),
"pkg".to_string(),
RaiseAuthority::new(BTreeSet::from(["pkg.done".to_string()])),
)
.unwrap();

Expand Down Expand Up @@ -246,6 +287,7 @@ mod tests {
buf.clone(),
NameResolver::new(["pkg".to_string(), "host".to_string()]),
"pkg".to_string(),
RaiseAuthority::new(BTreeSet::from(["pkg.done".to_string()])),
)
.unwrap();

Expand All @@ -259,6 +301,30 @@ mod tests {
assert!(entries.is_empty());
}

#[test]
fn raise_rejects_resolved_queue_not_in_authority() {
let lua = Lua::new();
let buf = RaiseBuffer::new();
register(
&lua,
buf.clone(),
NameResolver::new(["pkg".to_string(), "host".to_string()]),
"pkg".to_string(),
RaiseAuthority::new(BTreeSet::from(["pkg.done".to_string()])),
)
.unwrap();

let err = lua.load(r#"raise("other", {})"#).exec().unwrap_err();

assert!(
err.to_string()
.contains("raise queue `pkg.other` is not declared"),
"got: {err}"
);
let entries = buf.0.lock().unwrap();
assert!(entries.is_empty());
}

#[test]
fn empty_buffer_emits_nothing() {
let buf = RaiseBuffer::new();
Expand Down
1 change: 1 addition & 0 deletions crates/fkst-framework/src/self_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ fn check_sdk_registration(host_root: &std::path::Path) -> Result<()> {
Some("pkg.self_test".to_string()),
NameResolver::new(["pkg".to_string()]),
"pkg".to_string(),
crate::raise::RaiseAuthority::new(Default::default()),
None,
false,
)
Expand Down
Loading
Loading