Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions crates/fkst-common/src/durable_layout.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Durable delivery path resolver.

use anyhow::{anyhow, Result};
use std::fmt::Write as _;
use std::path::{Component, Path, PathBuf};

pub const DURABLE_ROOT_ENV: &str = "FKST_DURABLE_ROOT";
Expand Down Expand Up @@ -36,6 +37,11 @@ impl DurableLayout {
pub fn delivery_db_path(&self) -> PathBuf {
self.root.join("delivery.redb")
}

// Supervise owns this transient live-observe endpoint while the store is open.
pub fn observe_socket_path(&self) -> PathBuf {
PathBuf::from("/tmp").join(format!("fkst-observe-{}.sock", stable_hex_hash(&self.root)))
}
}

fn reject_traversal(path: &Path) -> Result<()> {
Expand All @@ -50,6 +56,17 @@ fn reject_traversal(path: &Path) -> Result<()> {
Ok(())
}

fn stable_hex_hash(path: &Path) -> String {
let mut hash = 0xcbf29ce484222325_u64;
for byte in path.as_os_str().to_string_lossy().as_bytes() {
hash ^= u64::from(*byte);
hash = hash.wrapping_mul(0x100000001b3);
}
let mut out = String::with_capacity(16);
write!(&mut out, "{hash:016x}").expect("writing to string should not fail");
out
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -111,6 +128,20 @@ mod tests {
);
}

#[test]
fn observe_socket_path_is_short_and_stable() {
let layout = DurableLayout::new("/tmp/fkst-durable/repo-a").unwrap();
assert_eq!(layout.observe_socket_path(), layout.observe_socket_path());
assert!(layout.observe_socket_path().starts_with("/tmp"));
assert!(layout
.observe_socket_path()
.file_name()
.unwrap()
.to_string_lossy()
.ends_with(".sock"));
assert!(layout.observe_socket_path().to_string_lossy().len() < 100);
}

#[test]
fn traversal_is_rejected() {
assert!(DurableLayout::new("../durable").is_err());
Expand Down
10 changes: 9 additions & 1 deletion crates/fkst-framework/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! CLI: `fkst-framework conformance --project-root <path>`
//! CLI: `fkst-framework test --project-root <path> [--package-root <path> ...] [--report-json <path>]`
//! CLI: `fkst-framework init-package-repo [--ref <substrate-ref>] [--force]`
//! CLI: `fkst-framework observe --durable-root <path> [--json] [--limit <n>]`
//! CLI: `fkst-framework --self-test`
//! Exit codes:
//! 0 = pipeline ok
Expand All @@ -28,6 +29,7 @@ mod host_conformance;
mod init_package_repo;
mod lua_coverage;
mod mlua_init;
mod observe;
mod path_resolver;
mod process_tree;
mod provenance;
Expand Down Expand Up @@ -84,6 +86,7 @@ enum CliCommand {
},
Test(TestCli),
InitPackageRepo(init_package_repo::InitPackageRepoOptions),
Observe(observe::ObserveOptions),
CodexWorker(sdk_codex::CodexWorkerOptions),
SelfTest(SelfTestCli),
}
Expand All @@ -93,7 +96,7 @@ fn parse_args() -> Result<CliCommand> {
let mut args_iter = args.into_iter();
let sub = args_iter.next().ok_or_else(|| {
anyhow::anyhow!(
"usage: fkst-framework run <lua> --project-root <path> --package-root <path> [--package-root <path> ...] [--owner-namespace <id>] --event <json> | fkst-framework supervise --project-root <path> --framework-bin <path> [--package-root <path> ...] | fkst-framework conformance --project-root <path> [--package-root <path> ...] | fkst-framework config --project-root <path> [--package-root <path> ...] | fkst-framework boundary-resources | fkst-framework rate-acquire <pool> | fkst-framework rate-exec <pool> -- <program> [args...] | fkst-framework test --project-root <path> [--package-root <path> ...] [--report-json <path>] | fkst-framework init-package-repo [--ref <substrate-ref>] [--force] | fkst-framework --self-test"
"usage: fkst-framework run <lua> --project-root <path> --package-root <path> [--package-root <path> ...] [--owner-namespace <id>] --event <json> | fkst-framework supervise --project-root <path> --framework-bin <path> [--package-root <path> ...] | fkst-framework conformance --project-root <path> [--package-root <path> ...] | fkst-framework config --project-root <path> [--package-root <path> ...] | fkst-framework boundary-resources | fkst-framework rate-acquire <pool> | fkst-framework rate-exec <pool> -- <program> [args...] | fkst-framework test --project-root <path> [--package-root <path> ...] [--report-json <path>] | fkst-framework init-package-repo [--ref <substrate-ref>] [--force] | fkst-framework observe --durable-root <path> [--json] [--limit <n>] | fkst-framework --self-test"
)
})?;
if sub == "--self-test" {
Expand Down Expand Up @@ -171,6 +174,10 @@ fn parse_args() -> Result<CliCommand> {
&rest,
)?));
}
if sub == "observe" {
let rest = args_iter.collect::<Vec<_>>();
return Ok(CliCommand::Observe(observe::parse_args(&rest)?));
}
if sub == "run" {
let lua_path: PathBuf = args_iter
.next()
Expand Down Expand Up @@ -675,6 +682,7 @@ fn run() -> Result<i32> {
test_runner::run_tests(options.roots, options.report_json, options.coverage)
}
CliCommand::InitPackageRepo(options) => init_package_repo::run(options),
CliCommand::Observe(options) => observe::run(options),
CliCommand::CodexWorker(options) => sdk_codex::run_codex_worker(options),
CliCommand::SelfTest(options) => match self_test::run() {
Ok(()) => {
Expand Down
254 changes: 254 additions & 0 deletions crates/fkst-framework/src/observe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use crate::supervise::delivery_store::{
DeliveryObserveOptions, DeliveryObserveSnapshot, DeliveryStore,
};
use anyhow::{Context, Result};
use fkst_common::DurableLayout;
use serde::{Deserialize, Serialize};
use std::io::{BufReader, Read, Write};
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};

const DEFAULT_LIMIT: usize = 500;
const MAX_LIMIT: usize = 10_000;

#[derive(Clone, Debug)]
pub(crate) struct ObserveOptions {
pub(crate) durable_root: PathBuf,
pub(crate) json: bool,
pub(crate) limit: usize,
}

pub(crate) fn parse_args(args: &[String]) -> Result<ObserveOptions> {
let mut durable_root: Option<PathBuf> = None;
let mut json = false;
let mut limit = DEFAULT_LIMIT;
let mut i = 0;
while i < args.len() {
match args[i].as_str() {
"--durable-root" => {
if durable_root.is_some() {
anyhow::bail!("duplicate --durable-root");
}
i += 1;
durable_root = Some(next_value(args, i, "--durable-root")?.into());
}
"--json" => json = true,
"--limit" => {
i += 1;
let raw = next_value(args, i, "--limit")?;
limit = raw
.parse::<usize>()
.with_context(|| format!("invalid --limit value `{raw}`"))?;
if limit == 0 || limit > MAX_LIMIT {
anyhow::bail!("--limit must be between 1 and {MAX_LIMIT}");
}
}
other => anyhow::bail!("unknown observe argument: {}", other),
}
i += 1;
}
Ok(ObserveOptions {
durable_root: durable_root.ok_or_else(|| anyhow::anyhow!("missing --durable-root"))?,
json,
limit,
})
}

pub(crate) fn run(options: ObserveOptions) -> Result<i32> {
let layout = DurableLayout::new(&options.durable_root)?;
let database = layout.delivery_db_path();
let snapshot = match request_live_snapshot(&layout, options.limit)? {
Some(snapshot) => Ok(snapshot),
None => {
let store = DeliveryStore::open_existing(&database)?;
store.observe_snapshot(
layout.durable_root(),
&database,
&DeliveryObserveOptions {
now_ms: now_ms(),
limit: options.limit,
},
)
}
}?;
if options.json {
println!("{}", serde_json::to_string_pretty(&snapshot)?);
} else {
print_human(&snapshot);
}
Ok(0)
}

pub(crate) fn socket_path(layout: &DurableLayout) -> PathBuf {
layout.observe_socket_path()
}

pub(crate) fn request_live_snapshot(
layout: &DurableLayout,
limit: usize,
) -> Result<Option<DeliveryObserveSnapshot>> {
let path = socket_path(layout);
if !path.exists() {
return Ok(None);
}
let mut stream = match UnixStream::connect(&path) {
Ok(stream) => stream,
Err(err) if is_absent_socket_error(&err) => return Ok(None),
Err(err) => {
return Err(err)
.with_context(|| format!("connect live observe socket `{}`", path.display()))
}
};
let request = ObserveSocketRequest {
limit,
now_ms: now_ms(),
};
serde_json::to_writer(&mut stream, &request)?;
stream.write_all(b"\n")?;
stream.flush()?;

let mut response = String::new();
BufReader::new(stream).read_to_string(&mut response)?;
if response.trim().is_empty() {
anyhow::bail!("live observe socket returned an empty response");
}
let response: ObserveSocketResponse = serde_json::from_str(&response).with_context(|| {
format!(
"decode live observe socket response from `{}`",
path.display()
)
})?;
match response {
ObserveSocketResponse::Ok { snapshot } => Ok(Some(snapshot)),
ObserveSocketResponse::Err { error } => {
anyhow::bail!("live observe socket failed: {error}")
}
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub(crate) struct ObserveSocketRequest {
pub(crate) limit: usize,
pub(crate) now_ms: u64,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub(crate) enum ObserveSocketResponse {
Ok { snapshot: DeliveryObserveSnapshot },
Err { error: String },
}

fn is_absent_socket_error(err: &std::io::Error) -> bool {
matches!(
err.kind(),
std::io::ErrorKind::NotFound | std::io::ErrorKind::ConnectionRefused
)
}

fn print_human(snapshot: &DeliveryObserveSnapshot) {
println!(
"durable_root={} database={} generated_at_ms={}",
snapshot.source.durable_root, snapshot.source.database, snapshot.generated_at_ms
);
println!("{}", snapshot.source.history_semantics);
println!("queues");
if snapshot.queues.is_empty() {
println!(" none");
} else {
for queue in &snapshot.queues {
let oldest = queue
.oldest_pending_age_ms
.map(|value| value.to_string())
.unwrap_or_else(|| "-".to_string());
println!(
" queue={} depth={} pending={} in_flight={} retrying={} oldest_pending_age_ms={}",
queue.queue, queue.depth, queue.pending, queue.in_flight, queue.retrying, oldest
);
}
}

println!("deliveries");
if snapshot.deliveries.is_empty() {
println!(" none");
} else {
for delivery in &snapshot.deliveries {
println!(
" id={} queue={} dept={} status={:?} attempt={} lease_generation={} not_before_ms={} digest={}",
delivery.delivery_id,
delivery.queue,
delivery.dept,
delivery.status,
delivery.attempt,
delivery.lease_generation,
delivery.not_before_ms,
delivery.payload.digest
);
}
}

println!("dead_letters");
if snapshot.dead_letters.is_empty() {
println!(" none");
} else {
for dead in &snapshot.dead_letters {
println!(
" id={} queue={} dept={} attempts={} permanent={} replayable={} dead_at_ms={} digest={}",
dead.delivery_id,
dead.queue,
dead.dept,
dead.attempts,
dead.permanent,
dead.replayable,
dead.dead_at_ms,
dead.payload.digest
);
}
}

if snapshot.truncated.deliveries || snapshot.truncated.dead_letters {
println!(
"truncated deliveries={} dead_letters={}",
snapshot.truncated.deliveries, snapshot.truncated.dead_letters
);
}
}

fn next_value(args: &[String], index: usize, flag: &str) -> Result<String> {
args.get(index)
.cloned()
.filter(|value| !value.is_empty())
.ok_or_else(|| anyhow::anyhow!("missing {} value", flag))
}

fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
.min(u64::MAX as u128) as u64
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parse_requires_durable_root() {
let err = parse_args(&[]).unwrap_err();
assert!(format!("{err:#}").contains("missing --durable-root"));
}

#[test]
fn parse_rejects_unbounded_limit() {
let err = parse_args(&[
"--durable-root".to_string(),
"/tmp/fkst-durable".to_string(),
"--limit".to_string(),
"10001".to_string(),
])
.unwrap_err();
assert!(format!("{err:#}").contains("--limit must be between"));
}
}
Loading
Loading