From 79b90e6a5a755c0de82ed4219013f1b8570c6750 Mon Sep 17 00:00:00 2001 From: Joseph Mearman Date: Tue, 16 Jun 2026 21:22:23 +0100 Subject: [PATCH 1/2] feat(p2p): propagate remote exec exit code end to end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The remote-exec subsystem previously lost the process exit status: the node's pump_session_output received ExecEvent::Exited and discarded it, and the wire carried only byte frames, so a one-shot 'cascade remote exec -- COMMAND' always exited 0 regardless of the remote process's outcome. Add a terminal BepMessage::ExecExit control frame (FROZEN wire discriminant 23) carrying the session id plus optional exit code and signal. The node pump sends it exactly once after the last ExecStream output frame, before returning Ok; it is not credit-gated (the session is over, so the backpressure window does not apply). The manager-side relay session routes an inbound ExecExit to the exec-stream consumer registered for (device_id, session). The consumer channel now carries an ExecStreamEvent enum (Output for byte frames, Exited for the terminal status) instead of a bare ExecStreamFrame, so a one-shot exec pump learns both the bytes and the exit status through one channel. The CLI's one-shot exec now exits with the remote process's code: a normal exit yields code, a signal-killed process maps to 128 + signal per shell convention, and an indeterminate exit (neither set) yields 1. The interactive shell ignores the code (it ends on stream close). The Exec variant docstring drops the 'does not surface exit code' note. The web terminal (cascade-web-api) is untouched: it consumes the local ExecProvider's ExecEvent::Exited directly and already surfaces the code. Tests: - protocol codec round-trip for ExecExit (present/absent code and signal, negative code, indeterminate) plus a malformed-sentinel rejection. 
- pump_session_output sends an ExecExit frame on ExecEvent::Exited (with a code, and with a signal) — two integration tests. - the manager routing (SyncEngine::handle_message) delivers an ExecExit to the registered consumer as the Exited variant, and drops it when the peer did not advertise the exec domain. - the exit-code mapping function (code wins, signal maps to 128+s, indeterminate maps to 1) and ExecStreamEvent::to_exit_code. Conformance: add ExecExit vectors (code and signal variants) to docs/conformance/frames.v1.json so the byte-exact conformance harness covers the new frame. Update docs/node-protocol.md with the new message type, body layout, and domain-governance references. --- crates/backend-p2p/src/lib.rs | 2 +- crates/backend-p2p/src/sync.rs | 31 ++-- crates/backend-p2p/src/sync/relay_session.rs | 72 +++++++-- crates/backend-p2p/src/sync_tests.rs | 94 ++++++++++++ crates/cascade/src/cli/remote.rs | 70 ++++++--- crates/p2p/src/exec_stream.rs | 148 ++++++++++++++++++- crates/p2p/src/protocol.rs | 88 ++++++++++- crates/p2p/src/protocol_tests.rs | 50 +++++++ crates/p2p/tests/exec_data_plane.rs | 118 +++++++++++++++ docs/conformance/frames.v1.json | 53 ++++++- docs/node-protocol.md | 21 ++- 11 files changed, 681 insertions(+), 66 deletions(-) diff --git a/crates/backend-p2p/src/lib.rs b/crates/backend-p2p/src/lib.rs index 6f93800..452a7a0 100644 --- a/crates/backend-p2p/src/lib.rs +++ b/crates/backend-p2p/src/lib.rs @@ -798,7 +798,7 @@ impl P2pBackend { &self, device_id: &str, session_id: u64, - ) -> tokio::sync::mpsc::UnboundedReceiver { + ) -> tokio::sync::mpsc::UnboundedReceiver { self.sync.subscribe_exec_stream(device_id, session_id).await } diff --git a/crates/backend-p2p/src/sync.rs b/crates/backend-p2p/src/sync.rs index 3938503..cbb4cae 100644 --- a/crates/backend-p2p/src/sync.rs +++ b/crates/backend-p2p/src/sync.rs @@ -51,7 +51,7 @@ use cascade_p2p::block::BlockHash; use cascade_p2p::candidate::{Candidate, CandidateKind}; use 
cascade_p2p::connection::ConnectionManager; use cascade_p2p::discovery::DiscoveredPeer; -use cascade_p2p::exec_stream::ExecStreamFrame; +use cascade_p2p::exec_stream::ExecStreamEvent; use cascade_p2p::framed::{FramedPeer, FramedSession, SessionReader, SessionWriter}; use cascade_p2p::identity::DeviceIdentity; use cascade_p2p::nat::{ @@ -727,13 +727,16 @@ pub struct SyncEngine { /// When a manager spawns a PTY or process on a remote node, the node /// streams the session's stdout/stderr back as /// [`BepMessage::ExecStream`] frames. This map holds the channel that - /// delivers those frames to the manager-side consumer (the CLI's exec / - /// shell commands). The session loop routes each inbound frame to the - /// matching consumer and sends a [`BepMessage::ExecStreamAck`] back so - /// the node's producer honours the backpressure window. A consumer that - /// drops its receiver is removed from the map on the next frame. + /// delivers those frames (as [`ExecStreamEvent`] items — both + /// [`ExecStreamEvent::Output`] byte frames and the terminal + /// [`ExecStreamEvent::Exited`] carrying the process exit status) to the + /// manager-side consumer (the CLI's exec / shell commands). The session + /// loop routes each inbound frame to the matching consumer and sends a + /// [`BepMessage::ExecStreamAck`] back so the node's producer honours the + /// backpressure window. A consumer that drops its receiver is removed from + /// the map on the next frame. exec_stream_consumers: - Arc>>>, + Arc>>>, } impl std::fmt::Debug for SyncEngine { @@ -2339,16 +2342,18 @@ impl SyncEngine { /// /// The session loop routes each [`BepMessage::ExecStream`] frame for the /// named `(device_id, session_id)` pair to this receiver as a typed - /// [`ExecStreamFrame`], and sends a [`BepMessage::ExecStreamAck`] back so - /// the node's producer keeps flowing. 
A manager calls this *before* - /// sending the `PtySpawn` / `ProcSpawn` that mints `session_id`, so the - /// first output frame does not race the registration. Dropping the - /// receiver removes the entry on the next frame. + /// [`ExecStreamEvent::Output`], sends a [`BepMessage::ExecStreamAck`] back + /// so the node's producer keeps flowing, and delivers the terminal + /// [`BepMessage::ExecExit`] as [`ExecStreamEvent::Exited`] so a one-shot + /// `exec` pump can exit with the remote process's exit code. A manager + /// calls this *before* sending the `PtySpawn` / `ProcSpawn` that mints + /// `session_id`, so the first output frame does not race the registration. + /// Dropping the receiver removes the entry on the next frame. pub async fn subscribe_exec_stream( &self, device_id: &str, session_id: u64, - ) -> mpsc::UnboundedReceiver { + ) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded_channel(); let mut consumers = self.exec_stream_consumers.lock().await; consumers.insert((device_id.to_owned(), session_id), tx); diff --git a/crates/backend-p2p/src/sync/relay_session.rs b/crates/backend-p2p/src/sync/relay_session.rs index 42581ea..40fe4ab 100644 --- a/crates/backend-p2p/src/sync/relay_session.rs +++ b/crates/backend-p2p/src/sync/relay_session.rs @@ -1210,11 +1210,13 @@ impl SyncEngine { let consumers = self.exec_stream_consumers.lock().await; consumers.get(&key).is_some_and(|sender| { sender - .send(cascade_p2p::exec_stream::ExecStreamFrame { - session, - stream: kind, - bytes: bytes.clone(), - }) + .send(cascade_p2p::exec_stream::ExecStreamEvent::Output( + cascade_p2p::exec_stream::ExecStreamFrame { + session, + stream: kind, + bytes: bytes.clone(), + }, + )) .is_ok() }) }; @@ -1262,16 +1264,62 @@ impl SyncEngine { ); Ok(()) } + BepMessage::ExecExit { + session, + code, + signal, + } => { + // A peer that did not advertise the exec capability domain must + // not send exec frames. 
Drop with a domain-violation log rather + // than tearing the session down. + if !peer_domains.contains(&CapabilityDomain::Exec) { + debug!( + target: "cascade::backend::p2p", + peer = %peer_device_id, + session, + "dropping exec exit frame — peer did not advertise exec capability", + ); + return Ok(()); + } + // Deliver the terminal exit status to the consumer registered for + // this (peer, session) pair as the Exited variant. Unlike byte + // frames this carries no sequence number and is not acked: it is + // a single control frame sent after the last output frame, so + // the backpressure window does not apply. + let key = (peer_device_id.to_owned(), session); + let consumer_alive = { + let consumers = self.exec_stream_consumers.lock().await; + consumers.get(&key).is_some_and(|sender| { + sender + .send(cascade_p2p::exec_stream::ExecStreamEvent::Exited { + code, + signal, + }) + .is_ok() + }) + }; + if !consumer_alive { + debug!( + target: "cascade::backend::p2p", + peer = %peer_device_id, + session, + "dropping exec exit frame — no live consumer", + ); + let mut consumers = self.exec_stream_consumers.lock().await; + consumers.remove(&key); + } + Ok(()) + } BepMessage::OplogHave { .. } | BepMessage::OplogRequest { .. } | BepMessage::OplogData { .. } => { - // This node does not advertise the oplog capability domain, so a - // peer must never negotiate it and never send these frames. Per - // the node protocol a frame for an un-negotiated domain is dropped - // (with a domain-violation log), never guessed at and never a - // reason to tear the session down — the peer can still serve - // content, management, and exec frames. Wiring oplog sync is a - // later step that lands with the entry-payload co-design. + // This node does not advertise the oplog capability domain, so + // a peer must never negotiate it and never send these frames. 
+ // Per the node protocol a frame for an un-negotiated domain is + // dropped (with a domain-violation log), never guessed at and + // never a reason to tear the session down — the peer can still + // serve content, management, and exec frames. Wiring oplog sync + // is a later step that lands with the entry-payload co-design. debug!( target: "cascade::backend::p2p", peer = %peer_device_id, diff --git a/crates/backend-p2p/src/sync_tests.rs b/crates/backend-p2p/src/sync_tests.rs index e5f0de6..c4de562 100644 --- a/crates/backend-p2p/src/sync_tests.rs +++ b/crates/backend-p2p/src/sync_tests.rs @@ -2794,3 +2794,97 @@ async fn heterogeneous_exec_capability_is_negotiated() { "A must see B as exec-incapable (B did not set advertise_exec)", ); } + +/// An inbound `BepMessage::ExecExit` is routed to the exec-stream consumer +/// registered for `(device_id, session)` as the +/// [`ExecStreamEvent::Exited`] variant, carrying the process's exit code and +/// signal. This is the manager-side half of the exit-code propagation chain. +#[tokio::test] +async fn handle_message_routes_exec_exit_to_consumer() { + use cascade_p2p::exec_stream::ExecStreamEvent; + + let (_dir, engine) = make_engine("test"); + + // Register a consumer for session 42 from peer "PEER". + let mut consumer = engine.subscribe_exec_stream("PEER", 42).await; + + let (outbound_tx, _outbound_rx) = mpsc::unbounded_channel::(); + let pending: Arc>>>> = + Arc::new(Mutex::new(HashMap::new())); + let manage_pending: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + // The peer advertised the exec domain. 
+ let peer_domains = vec![CapabilityDomain::Exec]; + + engine + .handle_message( + "PEER", + CallerAuthentication::TlsVerified, + BepMessage::ExecExit { + session: 42, + code: Some(7), + signal: None, + }, + &peer_domains, + &outbound_tx, + &pending, + &manage_pending, + ) + .await + .expect("ExecExit must be handled without error"); + + let event = tokio::time::timeout(std::time::Duration::from_secs(2), consumer.recv()) + .await + .expect("consumer must receive the Exited event") + .expect("channel must not close before delivery"); + assert_eq!( + event, + ExecStreamEvent::Exited { + code: Some(7), + signal: None, + } + ); +} + +/// A `BepMessage::ExecExit` from a peer that did not advertise the exec domain +/// is dropped (domain-violation) and never reaches the consumer. +#[tokio::test] +async fn handle_message_drops_exec_exit_without_exec_domain() { + let (_dir, engine) = make_engine("test"); + + let mut consumer = engine.subscribe_exec_stream("PEER", 1).await; + + let (outbound_tx, _outbound_rx) = mpsc::unbounded_channel::(); + let pending: Arc>>>> = + Arc::new(Mutex::new(HashMap::new())); + let manage_pending: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + // Peer advertised management only — no exec domain. + let peer_domains = vec![CapabilityDomain::Management]; + + engine + .handle_message( + "PEER", + CallerAuthentication::TlsVerified, + BepMessage::ExecExit { + session: 1, + code: Some(0), + signal: None, + }, + &peer_domains, + &outbound_tx, + &pending, + &manage_pending, + ) + .await + .expect("a dropped domain-violation frame is not an error"); + + // The consumer must not receive anything within a short window. 
+ let result = tokio::time::timeout(std::time::Duration::from_millis(100), consumer.recv()).await; + assert!( + result.is_err() || matches!(result, Ok(None)), + "consumer must not receive an exit from a non-exec peer, got {result:?}", + ); +} diff --git a/crates/cascade/src/cli/remote.rs b/crates/cascade/src/cli/remote.rs index c668fbe..dbcd8ec 100644 --- a/crates/cascade/src/cli/remote.rs +++ b/crates/cascade/src/cli/remote.rs @@ -147,9 +147,9 @@ pub enum RemoteCommand { }, /// Run a one-shot command on the remote node under a PTY. The command's /// stdout and stderr are streamed back and written to the terminal; the - /// CLI exits once the node closes the output stream. The exec data plane - /// does not yet carry the process exit code, so it is not reflected in the - /// CLI's own exit status. + /// CLI exits when the node signals the session ended, propagating the + /// remote process's exit code as its own exit status. A process killed by + /// a signal maps to `128 + signal` per the shell convention. /// /// Requires the dangerous `exec:pty` capability over the session's /// working directory, granted explicitly for a folder scope. @@ -566,7 +566,10 @@ async fn run_exec( let mut stream = backend.subscribe_exec_stream(device_id, session_id).await; // Drive the session: multiplex output draining with stdin forwarding - // (shell mode only) using tokio::select. + // (shell mode only) using tokio::select. For a one-shot `exec` the pump + // returns the remote process's exit code; the CLI propagates it as its own + // exit status. For an interactive `shell` the code is irrelevant (the + // session ends on stream close). let pump_result = pump_exec_session( &backend, device_id, @@ -585,12 +588,26 @@ async fn run_exec( .send_pty_signal(device_id, session_id, 15, token) .await; - pump_result + let exit_code = pump_result?; + if !is_shell && let Some(code) = exit_code { + // Propagate the remote process's exit status as this process's own. 
+ // std::process::exit runs no further destructors, but the session + // cleanup above already ran and stdout is flushed by the OS on exit. + std::process::exit(code); + } + Ok(()) } /// Pump the exec session: forward output to the terminal and (for shell mode) -/// forward local stdin to the remote PTY. Returns when the output stream -/// closes (the session ended) or stdin reaches EOF. +/// forward local stdin to the remote PTY. Returns when the session's +/// [`ExecStreamEvent::Exited`] arrives (the process exited) or, for a shell, +/// when stdin reaches EOF. +/// +/// Returns `Ok(Some(code))` when a one-shot `exec` received the remote +/// process's exit status (a shell-style code: `code` for a normal exit, +/// `128 + signal` for a signal kill, `1` for an indeterminate exit). Returns +/// `Ok(None)` for an interactive shell — its exit status is not the remote +/// process's. /// /// `token` is re-presented with every `PtyWrite` so a caller authenticated /// only by a capability token (no on-node grant) can drive an interactive @@ -599,26 +616,34 @@ async fn pump_exec_session( backend: &std::sync::Arc, device_id: &str, session_id: u64, - stream: &mut tokio::sync::mpsc::UnboundedReceiver, + stream: &mut tokio::sync::mpsc::UnboundedReceiver, is_shell: bool, token: Option, -) -> Result<()> { +) -> Result> { use std::io::IsTerminal as _; use std::io::Write as _; use tokio::io::AsyncReadExt as _; if !is_shell { // One-shot exec: drain output to the terminal. The remote process's - // exit code is not carried on the exec data plane today (there is no - // exit frame), so the CLI cannot forward it; the session is considered - // done when the node closes the output stream. + // exit code arrives as an ExecStreamEvent::Exited control frame after + // the last output frame; the CLI propagates it as its own exit status. 
let stdout = std::io::stdout(); - while let Some(frame) = stream.recv().await { - let mut handle = stdout.lock(); - handle.write_all(&frame.bytes)?; - handle.flush()?; + while let Some(event) = stream.recv().await { + match event { + cascade_p2p::exec_stream::ExecStreamEvent::Output(frame) => { + let mut handle = stdout.lock(); + handle.write_all(&frame.bytes)?; + handle.flush()?; + } + cascade_p2p::exec_stream::ExecStreamEvent::Exited { .. } => { + return Ok(event.to_exit_code()); + } + } } - return Ok(()); + // The stream closed without an Exited event (the node went away before + // delivering the exit frame). Treat it as a generic failure. + return Ok(Some(1)); } // Interactive shell: drive a real raw-mode PTY. The local terminal is put @@ -653,13 +678,16 @@ async fn pump_exec_session( loop { tokio::select! { // Output from the remote session. - frame = stream.recv() => match frame { - Some(frame) => { + event = stream.recv() => match event { + Some(cascade_p2p::exec_stream::ExecStreamEvent::Output(frame)) => { let mut handle = stdout.lock(); handle.write_all(&frame.bytes)?; handle.flush()?; } - None => break, + // The remote process exited: end the shell session. The exit + // code is ignored for the interactive shell — its own exit + // status is not the remote process's. + Some(cascade_p2p::exec_stream::ExecStreamEvent::Exited { .. }) | None => break, }, // Local stdin bytes -> remote PtyWrite. 
In raw mode each keystroke // arrives immediately; ^C arrives as 0x03 and the remote PTY raises @@ -691,7 +719,7 @@ async fn pump_exec_session( } drop(raw); - Ok(()) + Ok(None) } /// Restores the local terminal from raw mode when the shell session ends, diff --git a/crates/p2p/src/exec_stream.rs b/crates/p2p/src/exec_stream.rs index 4b4cd6c..247274a 100644 --- a/crates/p2p/src/exec_stream.rs +++ b/crates/p2p/src/exec_stream.rs @@ -89,7 +89,7 @@ impl WireStreamKind { /// which depends on this crate; the registration therefore lives there rather /// than here, where the data-plane type is defined), so the consumer never sees /// the wire sequence number or raw discriminant. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct ExecStreamFrame { /// The session the bytes belong to. pub session: u64, @@ -99,6 +99,64 @@ pub struct ExecStreamFrame { pub bytes: Vec, } +/// One item delivered to the manager-side exec-stream consumer over its +/// subscription channel. +/// +/// The consumer channel (handed out by `SyncEngine::subscribe_exec_stream`) used +/// to carry only [`ExecStreamFrame`] byte payloads; the terminal +/// [`BepMessage::ExecExit`] control frame is now delivered through the same +/// channel as the [`Self::Exited`] variant, so a one-shot `exec` pump can exit +/// with the remote process's exit code rather than only learning the session +/// ended when the byte stream closes. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExecStreamEvent { + /// A chunk of stdout/stderr bytes for the session. + Output(ExecStreamFrame), + /// The session's process exited. `code` is the normal exit code (if any); + /// `signal` is the POSIX signal that killed it (if any). Exactly one is + /// `Some` for a normal Unix exit; both `None` means the exit status was + /// indeterminate. A consumer that wants a shell-style exit status maps a + /// signal-killed process to `128 + signal`. + Exited { + /// The process exit code, if it exited normally. 
+ code: Option, + /// The POSIX signal number that terminated the process, if killed by a + /// signal. + signal: Option, + }, +} + +impl ExecStreamEvent { + /// The exit status a one-shot `exec` should propagate, following the shell + /// convention: a normal exit yields `code`, a signal-killed process yields + /// `128 + signal`, and an indeterminate exit (neither set) yields the + /// generic failure status `1`. + #[must_use] + pub const fn to_exit_code(&self) -> Option { + match self { + Self::Output(_) => None, + Self::Exited { code, signal } => Some(exit_code_from_code_signal(*code, *signal)), + } + } +} + +/// Map an optional exit `code` and optional `signal` to a shell-style process +/// exit status. Exposed so the CLI's one-shot `exec` and tests share one +/// definition. +/// +/// - `code` present -> `code`. +/// - `code` absent, `signal` present -> `128 + signal`. +/// - both absent -> `1` (generic failure: the process exited without a +/// determinable status). +#[must_use] +pub const fn exit_code_from_code_signal(code: Option, signal: Option) -> i32 { + match (code, signal) { + (Some(c), _) => c, + (None, Some(s)) => 128 + s, + (None, None) => 1, + } +} + /// Default credit window a consumer advertises, in bytes. /// /// Sized to the node-side output channel's buffering headroom: the local @@ -238,9 +296,12 @@ impl ExecStreamCredit { /// /// Returns when the session's output channel closes (the session ended and all /// pumps dropped their senders) or an [`ExecEvent::Exited`] arrives, whichever -/// comes first. An `Exited` event ends the stream cleanly: the function returns -/// `Ok(())` without tearing down the shared writer, leaving the caller to send a -/// `Close` or continue using the session for other traffic. +/// comes first. 
An `Exited` event ends the stream cleanly: the pump sends a +/// single [`BepMessage::ExecExit`] control frame carrying the process's exit +/// code and signal (NOT credit-gated — it is one terminal frame sent after the +/// last output frame), then returns `Ok(())` without tearing down the shared +/// writer, leaving the caller to send a `Close` or continue using the session +/// for other traffic. pub async fn pump_session_output( session: ExecSessionId, mut events: tokio::sync::mpsc::Receiver, @@ -280,7 +341,26 @@ pub async fn pump_session_output( credit.record_sent(seq, sent_bytes).await; seq = seq.wrapping_add(1); } - ExecEvent::Exited { .. } => return Ok(()), + ExecEvent::Exited { code, signal } => { + // Terminal control frame: send exactly once, after the last + // output frame. It is NOT credit-gated — it carries no sequence + // number and the manager routes it to the consumer without + // acking, so it must not be throttled by a slow consumer's + // window (the session is over; the only remaining business is + // to deliver the exit status). 
+ let exit_frame = BepMessage::ExecExit { + session: session.0, + code, + signal, + }; + writer + .lock() + .await + .send(&exit_frame) + .await + .context("sending exec exit frame")?; + return Ok(()); + } } } Ok(()) @@ -442,3 +522,61 @@ where } Ok(None) } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)] +mod tests { + use super::*; + + #[test] + fn exit_code_normal_exit_yields_code() { + assert_eq!(exit_code_from_code_signal(Some(0), None), 0); + assert_eq!(exit_code_from_code_signal(Some(42), None), 42); + assert_eq!(exit_code_from_code_signal(Some(-1), None), -1); + } + + #[test] + fn exit_code_signal_kill_yields_128_plus_signal() { + assert_eq!(exit_code_from_code_signal(None, Some(9)), 137); + assert_eq!(exit_code_from_code_signal(None, Some(15)), 143); + assert_eq!(exit_code_from_code_signal(None, Some(2)), 130); + } + + #[test] + fn exit_code_indeterminate_yields_generic_failure() { + assert_eq!(exit_code_from_code_signal(None, None), 1); + } + + #[test] + fn exit_code_code_takes_precedence_over_signal() { + // A normal exit code wins even if a signal is also carried (defensive: + // the node sets exactly one for a real Unix exit, but the mapping must + // not panic or surprise if both are present). 
+ assert_eq!(exit_code_from_code_signal(Some(3), Some(9)), 3); + } + + #[test] + fn event_to_exit_code_is_none_for_output() { + let frame = ExecStreamFrame { + session: 1, + stream: WireStreamKind::Stdout, + bytes: Vec::new(), + }; + assert_eq!(ExecStreamEvent::Output(frame).to_exit_code(), None); + } + + #[test] + fn event_to_exit_code_maps_exited() { + let exited = ExecStreamEvent::Exited { + code: Some(42), + signal: None, + }; + assert_eq!(exited.to_exit_code(), Some(42)); + + let killed = ExecStreamEvent::Exited { + code: None, + signal: Some(9), + }; + assert_eq!(killed.to_exit_code(), Some(137)); + } +} diff --git a/crates/p2p/src/protocol.rs b/crates/p2p/src/protocol.rs index 9625aaa..b622c2a 100644 --- a/crates/p2p/src/protocol.rs +++ b/crates/p2p/src/protocol.rs @@ -45,6 +45,16 @@ const MSG_OPLOG_HAVE: u32 = 20; const MSG_OPLOG_REQUEST: u32 = 21; /// Carry a contiguous range of opaque, signed oplog entries. FROZEN at 22. const MSG_OPLOG_DATA: u32 = 22; +/// Carry the exit status of a finished exec session. FROZEN at 23. +/// +/// Sent once by the node's exec output pump after the last +/// [`BepMessage::ExecStream`] output frame, on the session's terminal +/// [`cascade_exec::ExecEvent::Exited`]. It is a single control frame, not +/// credit-gated: the manager routes it to the exec-stream consumer registered +/// for `(device_id, session)` so a one-shot `exec` can exit with the remote +/// process's code (a signal-killed process carries `signal`, which the CLI maps +/// to `128 + signal` per shell convention). +const MSG_EXEC_EXIT: u32 = 23; /// The protocol version this implementation speaks. FROZEN; a version bump is /// gated by the conformance vectors in `docs/conformance/`. @@ -1069,6 +1079,26 @@ pub enum BepMessage { /// The credit window in bytes the consumer will accept past `ack_seq`. window: u32, }, + /// The exit status of a finished exec session. 
+ /// + /// Sent once by the node's exec output pump after the last + /// [`BepMessage::ExecStream`] output frame, on the session's terminal + /// [`cascade_exec::ExecEvent::Exited`]. It is a single control frame, not + /// credit-gated: the manager routes it to the exec-stream consumer + /// registered for `(device_id, session)` so a one-shot `exec` can exit with + /// the remote process's code. A signal-killed process carries `signal`; the + /// CLI maps that to `128 + signal` per shell convention. Exactly one of + /// `code` and `signal` is present for a normal Unix exit; both absent means + /// the exit status was indeterminate. + ExecExit { + /// The session that exited. + session: u64, + /// The process exit code, if it exited normally. + code: Option, + /// The POSIX signal number that terminated the process, if killed by a + /// signal. + signal: Option, + }, /// Advertise the latest sequence of the sender's single-writer oplog for a /// named peer. /// @@ -1150,6 +1180,7 @@ impl BepMessage { Self::Handshake { .. } => MSG_HANDSHAKE, Self::ExecStream { .. } => MSG_EXEC_STREAM, Self::ExecStreamAck { .. } => MSG_EXEC_STREAM_ACK, + Self::ExecExit { .. } => MSG_EXEC_EXIT, Self::OplogHave { .. } => MSG_OPLOG_HAVE, Self::OplogRequest { .. } => MSG_OPLOG_REQUEST, Self::OplogData { .. } => MSG_OPLOG_DATA, @@ -1168,9 +1199,9 @@ impl BepMessage { /// `IndexUpdate`, `Request`, `Response`). /// - `management` covers `ManageRequest` / `ManageResponse`. /// - `exec` covers the live stdio stream frames (`ExecStream`, - /// `ExecStreamAck`); exec *control* travels as `management` frames and is - /// governed by the management domain plus the exec capability grant, not by - /// this mapping. + /// `ExecStreamAck`, `ExecExit`); exec *control* travels as `management` + /// frames and is governed by the management domain plus the exec capability + /// grant, not by this mapping. /// - `oplog` covers `OplogHave` / `OplogRequest` / `OplogData`. 
/// - Everything else is transport plumbing (the handshake itself, keepalive, /// NAT-traversal and relay frames) that every peer speaks regardless of the @@ -1186,7 +1217,9 @@ impl BepMessage { Self::ManageRequest { .. } | Self::ManageResponse { .. } => { Some(CapabilityDomain::Management) } - Self::ExecStream { .. } | Self::ExecStreamAck { .. } => Some(CapabilityDomain::Exec), + Self::ExecStream { .. } | Self::ExecStreamAck { .. } | Self::ExecExit { .. } => { + Some(CapabilityDomain::Exec) + } Self::OplogHave { .. } | Self::OplogRequest { .. } | Self::OplogData { .. } => { Some(CapabilityDomain::Oplog) } @@ -1368,6 +1401,15 @@ pub fn encode_message(msg: &BepMessage) -> Result> { encode_u64(&mut body, *ack_seq); encode_u32(&mut body, *window); } + BepMessage::ExecExit { + session, + code, + signal, + } => { + encode_u64(&mut body, *session); + encode_opt_i32(&mut body, *code); + encode_opt_i32(&mut body, *signal); + } BepMessage::OplogHave { peer, head_seq } => { encode_string(&mut body, peer)?; encode_u64(&mut body, *head_seq); @@ -1711,6 +1753,19 @@ fn encode_opt_i64(buf: &mut Vec, val: Option) { } } +/// Encode an `Option` as a one-word presence sentinel followed, when +/// present, by the value. Mirrors [`encode_opt_i64`] for the `i32`-typed exit +/// code and signal carried by [`BepMessage::ExecExit`]. +fn encode_opt_i32(buf: &mut Vec, val: Option) { + match val { + None => encode_u32(buf, OPTION_NONE), + Some(v) => { + encode_u32(buf, OPTION_SOME); + encode_i32(buf, v); + } + } +} + /// Encode an `Option<&str>` as a one-word presence sentinel followed, when /// present, by the string. 
fn encode_opt_string(buf: &mut Vec, val: Option<&str>) -> Result<()> { @@ -1799,6 +1854,7 @@ pub fn decode_message(frame: &[u8]) -> Result { MSG_HANDSHAKE => decode_handshake(rest), MSG_EXEC_STREAM => decode_exec_stream(rest), MSG_EXEC_STREAM_ACK => decode_exec_stream_ack(rest), + MSG_EXEC_EXIT => decode_exec_exit(rest), MSG_OPLOG_HAVE => decode_oplog_have(rest), MSG_OPLOG_REQUEST => decode_oplog_request(rest), MSG_OPLOG_DATA => decode_oplog_data(rest), @@ -2140,6 +2196,17 @@ fn decode_exec_stream_ack(data: &[u8]) -> Result { }) } +fn decode_exec_exit(data: &[u8]) -> Result { + let (session, rest) = decode_u64(data)?; + let (code, rest) = decode_opt_i32(rest)?; + let (signal, _) = decode_opt_i32(rest)?; + Ok(BepMessage::ExecExit { + session, + code, + signal, + }) +} + fn decode_oplog_have(data: &[u8]) -> Result { let (peer, rest) = decode_string(data)?; let (head_seq, _) = decode_u64(rest)?; @@ -2185,6 +2252,19 @@ fn decode_opt_i64(data: &[u8]) -> Result<(Option, &[u8])> { } } +/// Decode an `Option` written by [`encode_opt_i32`]. +fn decode_opt_i32(data: &[u8]) -> Result<(Option, &[u8])> { + let (tag, rest) = decode_u32(data)?; + match tag { + OPTION_NONE => Ok((None, rest)), + OPTION_SOME => { + let (val, rest) = decode_i32(rest)?; + Ok((Some(val), rest)) + } + other => anyhow::bail!("invalid option sentinel {other}"), + } +} + /// Decode an `Option` written by [`encode_opt_string`]. fn decode_opt_string(data: &[u8]) -> Result<(Option, &[u8])> { let (tag, rest) = decode_u32(data)?; diff --git a/crates/p2p/src/protocol_tests.rs b/crates/p2p/src/protocol_tests.rs index 4201746..be4da15 100644 --- a/crates/p2p/src/protocol_tests.rs +++ b/crates/p2p/src/protocol_tests.rs @@ -1195,6 +1195,56 @@ fn encode_decode_exec_stream_ack_round_trip() { }); } +#[test] +fn encode_decode_exec_exit_round_trip() { + // Normal exit with a code, no signal. + round_trip(BepMessage::ExecExit { + session: 42, + code: Some(0), + signal: None, + }); + // Non-zero exit code. 
+ round_trip(BepMessage::ExecExit { + session: u64::MAX, + code: Some(42), + signal: None, + }); + // Signal kill: no code, signal present. + round_trip(BepMessage::ExecExit { + session: 7, + code: None, + signal: Some(9), + }); + // Indeterminate exit: neither set. + round_trip(BepMessage::ExecExit { + session: 1, + code: None, + signal: None, + }); + // Negative code. + round_trip(BepMessage::ExecExit { + session: 3, + code: Some(-1), + signal: None, + }); +} + +#[test] +fn exec_exit_rejects_bad_option_sentinel() { + let mut body = Vec::new(); + encode_u32(&mut body, MSG_EXEC_EXIT); + encode_u64(&mut body, 1); + // An invalid presence sentinel (2) for `code` must fail to decode. + encode_u32(&mut body, 2); + encode_i32(&mut body, 0); + encode_u32(&mut body, OPTION_NONE); + let body_len = u32::try_from(body.len()).unwrap(); + let mut frame = Vec::new(); + encode_u32(&mut frame, body_len); + frame.extend_from_slice(&body); + assert!(decode_message(&frame).is_err()); +} + // ── Exec management commands ── fn exec_request(command: ManageCommand) -> BepMessage { diff --git a/crates/p2p/tests/exec_data_plane.rs b/crates/p2p/tests/exec_data_plane.rs index d0dda33..422faba 100644 --- a/crates/p2p/tests/exec_data_plane.rs +++ b/crates/p2p/tests/exec_data_plane.rs @@ -346,3 +346,121 @@ async fn kill_tears_down_stream_cleanly() { "pump must return Ok after a clean kill teardown: {result:?}" ); } + +/// On `ExecEvent::Exited`, the pump sends exactly one `BepMessage::ExecExit` +/// control frame carrying the process's code/signal, after any output frames, +/// then returns `Ok(())`. The exit frame is not credit-gated and carries no +/// sequence number. 
+#[tokio::test] +async fn pump_emits_exec_exit_on_exited_event() { + use cascade_exec::ExecEvent; + + let id = ExecSessionId(99); + let (events_tx, events_rx) = mpsc::channel::(8); + + let (producer_t, consumer_t) = ChannelTransport::pair(); + let (_p_reader, p_writer) = FramedSession::new(producer_t).split(); + let (mut c_reader, _c_writer) = FramedSession::new(consumer_t).split(); + + let producer_writer = Arc::new(Mutex::new(p_writer)); + let credit = ExecStreamCredit::new(DEFAULT_CREDIT_WINDOW); + + // Send one output chunk, then the exit event. The pump must emit an + // ExecStream frame followed by an ExecExit frame. + events_tx + .send(ExecEvent::Output { + stream: cascade_exec::ExecStreamKind::Stdout, + bytes: b"hello".to_vec(), + }) + .await + .unwrap(); + events_tx + .send(ExecEvent::Exited { + code: Some(42), + signal: None, + }) + .await + .unwrap(); + + let pump = { + let writer = Arc::clone(&producer_writer); + let credit = Arc::clone(&credit); + tokio::spawn(async move { pump_session_output(id, events_rx, &writer, &credit).await }) + }; + + // First frame is the output chunk. + let output_frame = tokio::time::timeout(Duration::from_secs(5), c_reader.recv()) + .await + .expect("output frame should arrive") + .unwrap() + .expect("output frame must be Some"); + assert!(matches!(output_frame, BepMessage::ExecStream { bytes, .. } if bytes == b"hello")); + + // Second frame is the exit control frame carrying the code. 
+ let exit_frame = tokio::time::timeout(Duration::from_secs(5), c_reader.recv()) + .await + .expect("exit frame should arrive") + .unwrap() + .expect("exit frame must be Some"); + assert_eq!( + exit_frame, + BepMessage::ExecExit { + session: 99, + code: Some(42), + signal: None, + } + ); + + let result = tokio::time::timeout(Duration::from_secs(5), pump) + .await + .expect("pump should return after exit") + .unwrap(); + assert!(result.is_ok(), "pump must return Ok after exit: {result:?}"); +} + +/// A signal-killed process carries `signal` (and no `code`) through the exit +/// frame end to end. +#[tokio::test] +async fn pump_emits_exec_exit_with_signal() { + use cascade_exec::ExecEvent; + + let id = ExecSessionId(5); + let (events_tx, events_rx) = mpsc::channel::(8); + + let (producer_t, consumer_t) = ChannelTransport::pair(); + let (_p_reader, p_writer) = FramedSession::new(producer_t).split(); + let (mut c_reader, _c_writer) = FramedSession::new(consumer_t).split(); + + let producer_writer = Arc::new(Mutex::new(p_writer)); + let credit = ExecStreamCredit::new(DEFAULT_CREDIT_WINDOW); + + events_tx + .send(ExecEvent::Exited { + code: None, + signal: Some(9), + }) + .await + .unwrap(); + + let pump = { + let writer = Arc::clone(&producer_writer); + let credit = Arc::clone(&credit); + tokio::spawn(async move { pump_session_output(id, events_rx, &writer, &credit).await }) + }; + + let exit_frame = tokio::time::timeout(Duration::from_secs(5), c_reader.recv()) + .await + .expect("exit frame should arrive") + .unwrap() + .expect("exit frame must be Some"); + assert_eq!( + exit_frame, + BepMessage::ExecExit { + session: 5, + code: None, + signal: Some(9), + } + ); + + let _ = tokio::time::timeout(Duration::from_secs(5), pump).await; +} diff --git a/docs/conformance/frames.v1.json b/docs/conformance/frames.v1.json index 4f41b55..b526e52 100644 --- a/docs/conformance/frames.v1.json +++ b/docs/conformance/frames.v1.json @@ -8,7 +8,11 @@ "message": { "kind": "Handshake", 
"protocol_version": 1, - "domains": ["content", "management", "exec"] + "domains": [ + "content", + "management", + "exec" + ] }, "wire_hex": "00000018000000110000000100000003000000000000000100000002" }, @@ -27,7 +31,10 @@ "cols": 80, "rows": 24 }, - "scope": { "kind": "folder", "path": "/work" }, + "scope": { + "kind": "folder", + "path": "/work" + }, "token": null }, "wire_hex": "000000540000000f00000000000000010000000d00000001000000072f62696e2f7368000000000000000001000000052f776f726b00000000000000000000500000001800000001000000052f776f726b00000000000000" @@ -38,8 +45,14 @@ "message": { "kind": "ManageRequest", "request_id": 2, - "command": { "kind": "ProcKill", "session": 7 }, - "scope": { "kind": "folder", "path": "/work" }, + "command": { + "kind": "ProcKill", + "session": 7 + }, + "scope": { + "kind": "folder", + "path": "/work" + }, "token": null }, "wire_hex": "0000002c0000000f000000000000000200000013000000000000000700000001000000052f776f726b00000000000000" @@ -50,7 +63,10 @@ "message": { "kind": "ManageResponse", "request_id": 1, - "result": { "kind": "ExecSpawned", "session": 42 } + "result": { + "kind": "ExecSpawned", + "session": 42 + } }, "wire_hex": "0000001800000010000000000000000100000002000000000000002a" }, @@ -77,6 +93,28 @@ }, "wire_hex": "0000001800000013000000000000002a000000000000000100010000" }, + { + "name": "exec_exit_code", + "msg_type": 23, + "message": { + "kind": "ExecExit", + "session": 42, + "code": 42, + "signal": null + }, + "wire_hex": "0000001800000017000000000000002a000000010000002a00000000" + }, + { + "name": "exec_exit_signal", + "msg_type": 23, + "message": { + "kind": "ExecExit", + "session": 7, + "code": null, + "signal": 9 + }, + "wire_hex": "00000018000000170000000000000007000000000000000100000009" + }, { "name": "oplog_have", "msg_type": 20, @@ -104,7 +142,10 @@ "kind": "OplogData", "peer": "PEER", "from_seq": 1, - "ops_hex": ["6f7032", "6f7033"] + "ops_hex": [ + "6f7032", + "6f7033" + ] }, "wire_hex": 
"00000028000000160000000450454552000000000000000100000002000000036f703200000000036f703300" } diff --git a/docs/node-protocol.md b/docs/node-protocol.md index 2f0d097..a1b3e96 100644 --- a/docs/node-protocol.md +++ b/docs/node-protocol.md @@ -188,6 +188,7 @@ some) followed, when present, by the value. | 20 | `OplogHave` | oplog | yes | | 21 | `OplogRequest` | oplog | yes | | 22 | `OplogData` | oplog (envelope) | yes | +| 23 | `ExecExit` | exec | yes | Transport frames (the handshake itself, keepalive, NAT-traversal, and relay frames) are domain-independent: every peer speaks them regardless of the @@ -195,8 +196,8 @@ negotiated capability set. The remaining frames are governed by the domain in th table; the receiver refuses an inbound frame whose domain is not in the negotiated set. (Exec *control* travels as `management` frames and is governed by the management domain plus the exec capability grant, not by the exec domain -mapping; the exec domain governs only the `ExecStream`/`ExecStreamAck` stdio -frames.) +mapping; the exec domain governs only the `ExecStream`/`ExecStreamAck`/ +`ExecExit` stdio frames.) ### Handshake (type 17) @@ -245,6 +246,18 @@ Body: `u64 session`, `u64 ack_seq` (highest contiguous sequence accepted), `u32 window` (credit, in bytes, the consumer will accept past `ack_seq`). The producer must not send beyond the window. +### Exec exit (`ExecExit`, type 23) + +Body: `u64 session`, `Option<i32> code` (presence sentinel `0`=absent / `1`=present +then `i32`), `Option<i32> signal` (same encoding). Sent exactly once by the node's +exec output pump after the last `ExecStream` output frame, on the session's +terminal exit. It is a single control frame, not credit-gated: it carries no +sequence number and the manager routes it to the exec-stream consumer registered +for `(device_id, session)` without acking. 
Exactly one of `code`/`signal` is +present for a normal Unix exit; both absent means the exit status was +indeterminate (the CLI maps that to exit code `1`). A signal-killed process +carries `signal`; the CLI maps it to `128 + signal` per the shell convention. + ### Oplog sync (types 20–22) - `OplogHave` (20): string `peer`, `u64 head_seq`. @@ -274,8 +287,8 @@ consumed by every implementation's CI: human and the lowercase hex of its full `[len][type][body]` frame. A conformant codec must decode the hex to the message and re-encode the message to exactly the same hex. Covers the handshake, the exec `ManageCommand` verbs, the - `ExecStream`/`ExecStreamAck` stdio frames, and the oplog frames (with arbitrary - opaque entry bytes, since the entry payload is not frozen). + `ExecStream`/`ExecStreamAck`/`ExecExit` stdio frames, and the oplog frames + (with arbitrary opaque entry bytes, since the entry payload is not frozen). - `handshake.v1.json` — for each `(local domains, peer domains)` pair, the expected negotiated set and the domains whose frames the local node must refuse from that peer. Drives the heterogeneous-peer and graceful-degradation rules, From 89cdb22d52a211b3db80f7b81e44fb95f0b2eafa Mon Sep 17 00:00:00 2001 From: Joseph Mearman Date: Tue, 16 Jun 2026 21:36:28 +0100 Subject: [PATCH 2/2] fix(docs): resolve broken intra-doc link in pump_exec_session The doc comment on pump_exec_session referenced [`ExecStreamEvent::Exited`] by bare name, but the type is only in scope via its full path cascade_p2p::exec_stream::ExecStreamEvent. rustdoc reported the link as unresolved under -D warnings. Use the full path so the link resolves. 
--- crates/cascade/src/cli/remote.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/cascade/src/cli/remote.rs b/crates/cascade/src/cli/remote.rs index dbcd8ec..488ea81 100644 --- a/crates/cascade/src/cli/remote.rs +++ b/crates/cascade/src/cli/remote.rs @@ -600,8 +600,8 @@ async fn run_exec( /// Pump the exec session: forward output to the terminal and (for shell mode) /// forward local stdin to the remote PTY. Returns when the session's -/// [`ExecStreamEvent::Exited`] arrives (the process exited) or, for a shell, -/// when stdin reaches EOF. +/// [`cascade_p2p::exec_stream::ExecStreamEvent::Exited`] arrives (the process +/// exited) or, for a shell, when stdin reaches EOF. /// /// Returns `Ok(Some(code))` when a one-shot `exec` received the remote /// process's exit status (a shell-style code: `code` for a normal exit,