diff --git a/crates/backend-p2p/src/lib.rs b/crates/backend-p2p/src/lib.rs index 6f93800..452a7a0 100644 --- a/crates/backend-p2p/src/lib.rs +++ b/crates/backend-p2p/src/lib.rs @@ -798,7 +798,7 @@ impl P2pBackend { &self, device_id: &str, session_id: u64, - ) -> tokio::sync::mpsc::UnboundedReceiver { + ) -> tokio::sync::mpsc::UnboundedReceiver { self.sync.subscribe_exec_stream(device_id, session_id).await } diff --git a/crates/backend-p2p/src/sync.rs b/crates/backend-p2p/src/sync.rs index 3938503..cbb4cae 100644 --- a/crates/backend-p2p/src/sync.rs +++ b/crates/backend-p2p/src/sync.rs @@ -51,7 +51,7 @@ use cascade_p2p::block::BlockHash; use cascade_p2p::candidate::{Candidate, CandidateKind}; use cascade_p2p::connection::ConnectionManager; use cascade_p2p::discovery::DiscoveredPeer; -use cascade_p2p::exec_stream::ExecStreamFrame; +use cascade_p2p::exec_stream::ExecStreamEvent; use cascade_p2p::framed::{FramedPeer, FramedSession, SessionReader, SessionWriter}; use cascade_p2p::identity::DeviceIdentity; use cascade_p2p::nat::{ @@ -727,13 +727,16 @@ pub struct SyncEngine { /// When a manager spawns a PTY or process on a remote node, the node /// streams the session's stdout/stderr back as /// [`BepMessage::ExecStream`] frames. This map holds the channel that - /// delivers those frames to the manager-side consumer (the CLI's exec / - /// shell commands). The session loop routes each inbound frame to the - /// matching consumer and sends a [`BepMessage::ExecStreamAck`] back so - /// the node's producer honours the backpressure window. A consumer that - /// drops its receiver is removed from the map on the next frame. + /// delivers those frames (as [`ExecStreamEvent`] items — both + /// [`ExecStreamEvent::Output`] byte frames and the terminal + /// [`ExecStreamEvent::Exited`] carrying the process exit status) to the + /// manager-side consumer (the CLI's exec / shell commands). The session + /// loop routes each inbound frame to the matching consumer and sends a + /// [`BepMessage::ExecStreamAck`] back so the node's producer honours the + /// backpressure window. A consumer that drops its receiver is removed from + /// the map on the next frame. exec_stream_consumers: - Arc>>>, + Arc>>>, } impl std::fmt::Debug for SyncEngine { @@ -2339,16 +2342,18 @@ impl SyncEngine { /// /// The session loop routes each [`BepMessage::ExecStream`] frame for the /// named `(device_id, session_id)` pair to this receiver as a typed - /// [`ExecStreamFrame`], and sends a [`BepMessage::ExecStreamAck`] back so - /// the node's producer keeps flowing. A manager calls this *before* - /// sending the `PtySpawn` / `ProcSpawn` that mints `session_id`, so the - /// first output frame does not race the registration. Dropping the - /// receiver removes the entry on the next frame. + /// [`ExecStreamEvent::Output`], sends a [`BepMessage::ExecStreamAck`] back + /// so the node's producer keeps flowing, and delivers the terminal + /// [`BepMessage::ExecExit`] as [`ExecStreamEvent::Exited`] so a one-shot + /// `exec` pump can exit with the remote process's exit code. A manager + /// calls this *before* sending the `PtySpawn` / `ProcSpawn` that mints + /// `session_id`, so the first output frame does not race the registration. + /// Dropping the receiver removes the entry on the next frame. pub async fn subscribe_exec_stream( &self, device_id: &str, session_id: u64, - ) -> mpsc::UnboundedReceiver { + ) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded_channel(); let mut consumers = self.exec_stream_consumers.lock().await; consumers.insert((device_id.to_owned(), session_id), tx); diff --git a/crates/backend-p2p/src/sync/relay_session.rs b/crates/backend-p2p/src/sync/relay_session.rs index 42581ea..40fe4ab 100644 --- a/crates/backend-p2p/src/sync/relay_session.rs +++ b/crates/backend-p2p/src/sync/relay_session.rs @@ -1210,11 +1210,13 @@ impl SyncEngine { let consumers = self.exec_stream_consumers.lock().await; consumers.get(&key).is_some_and(|sender| { sender - .send(cascade_p2p::exec_stream::ExecStreamFrame { - session, - stream: kind, - bytes: bytes.clone(), - }) + .send(cascade_p2p::exec_stream::ExecStreamEvent::Output( + cascade_p2p::exec_stream::ExecStreamFrame { + session, + stream: kind, + bytes: bytes.clone(), + }, + )) .is_ok() }) }; @@ -1262,16 +1264,62 @@ impl SyncEngine { ); Ok(()) } + BepMessage::ExecExit { + session, + code, + signal, + } => { + // A peer that did not advertise the exec capability domain must + // not send exec frames. Drop with a domain-violation log rather + // than tearing the session down. + if !peer_domains.contains(&CapabilityDomain::Exec) { + debug!( + target: "cascade::backend::p2p", + peer = %peer_device_id, + session, + "dropping exec exit frame — peer did not advertise exec capability", + ); + return Ok(()); + } + // Deliver the terminal exit status to the consumer registered for + // this (peer, session) pair as the Exited variant. Unlike byte + // frames this carries no sequence number and is not acked: it is + // a single control frame sent after the last output frame, so + // the backpressure window does not apply. + let key = (peer_device_id.to_owned(), session); + let consumer_alive = { + let consumers = self.exec_stream_consumers.lock().await; + consumers.get(&key).is_some_and(|sender| { + sender + .send(cascade_p2p::exec_stream::ExecStreamEvent::Exited { + code, + signal, + }) + .is_ok() + }) + }; + if !consumer_alive { + debug!( + target: "cascade::backend::p2p", + peer = %peer_device_id, + session, + "dropping exec exit frame — no live consumer", + ); + let mut consumers = self.exec_stream_consumers.lock().await; + consumers.remove(&key); + } + Ok(()) + } BepMessage::OplogHave { .. } | BepMessage::OplogRequest { .. } | BepMessage::OplogData { .. } => { - // This node does not advertise the oplog capability domain, so a - // peer must never negotiate it and never send these frames. Per - // the node protocol a frame for an un-negotiated domain is dropped - // (with a domain-violation log), never guessed at and never a - // reason to tear the session down — the peer can still serve - // content, management, and exec frames. Wiring oplog sync is a - // later step that lands with the entry-payload co-design. + // This node does not advertise the oplog capability domain, so + // a peer must never negotiate it and never send these frames. + // Per the node protocol a frame for an un-negotiated domain is + // dropped (with a domain-violation log), never guessed at and + // never a reason to tear the session down — the peer can still + // serve content, management, and exec frames. Wiring oplog sync + // is a later step that lands with the entry-payload co-design. debug!( target: "cascade::backend::p2p", peer = %peer_device_id, diff --git a/crates/backend-p2p/src/sync_tests.rs b/crates/backend-p2p/src/sync_tests.rs index e5f0de6..c4de562 100644 --- a/crates/backend-p2p/src/sync_tests.rs +++ b/crates/backend-p2p/src/sync_tests.rs @@ -2794,3 +2794,97 @@ async fn heterogeneous_exec_capability_is_negotiated() { "A must see B as exec-incapable (B did not set advertise_exec)", ); } + +/// An inbound `BepMessage::ExecExit` is routed to the exec-stream consumer +/// registered for `(device_id, session)` as the +/// [`ExecStreamEvent::Exited`] variant, carrying the process's exit code and +/// signal. This is the manager-side half of the exit-code propagation chain. +#[tokio::test] +async fn handle_message_routes_exec_exit_to_consumer() { + use cascade_p2p::exec_stream::ExecStreamEvent; + + let (_dir, engine) = make_engine("test"); + + // Register a consumer for session 42 from peer "PEER". + let mut consumer = engine.subscribe_exec_stream("PEER", 42).await; + + let (outbound_tx, _outbound_rx) = mpsc::unbounded_channel::(); + let pending: Arc>>>> = + Arc::new(Mutex::new(HashMap::new())); + let manage_pending: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + // The peer advertised the exec domain. + let peer_domains = vec![CapabilityDomain::Exec]; + + engine + .handle_message( + "PEER", + CallerAuthentication::TlsVerified, + BepMessage::ExecExit { + session: 42, + code: Some(7), + signal: None, + }, + &peer_domains, + &outbound_tx, + &pending, + &manage_pending, + ) + .await + .expect("ExecExit must be handled without error"); + + let event = tokio::time::timeout(std::time::Duration::from_secs(2), consumer.recv()) + .await + .expect("consumer must receive the Exited event") + .expect("channel must not close before delivery"); + assert_eq!( + event, + ExecStreamEvent::Exited { + code: Some(7), + signal: None, + } + ); +} + +/// A `BepMessage::ExecExit` from a peer that did not advertise the exec domain +/// is dropped (domain-violation) and never reaches the consumer. +#[tokio::test] +async fn handle_message_drops_exec_exit_without_exec_domain() { + let (_dir, engine) = make_engine("test"); + + let mut consumer = engine.subscribe_exec_stream("PEER", 1).await; + + let (outbound_tx, _outbound_rx) = mpsc::unbounded_channel::(); + let pending: Arc>>>> = + Arc::new(Mutex::new(HashMap::new())); + let manage_pending: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + // Peer advertised management only — no exec domain. + let peer_domains = vec![CapabilityDomain::Management]; + + engine + .handle_message( + "PEER", + CallerAuthentication::TlsVerified, + BepMessage::ExecExit { + session: 1, + code: Some(0), + signal: None, + }, + &peer_domains, + &outbound_tx, + &pending, + &manage_pending, + ) + .await + .expect("a dropped domain-violation frame is not an error"); + + // The consumer must not receive anything within a short window. + let result = tokio::time::timeout(std::time::Duration::from_millis(100), consumer.recv()).await; + assert!( + result.is_err() || matches!(result, Ok(None)), + "consumer must not receive an exit from a non-exec peer, got {result:?}", + ); +} diff --git a/crates/cascade/src/cli/remote.rs b/crates/cascade/src/cli/remote.rs index c668fbe..488ea81 100644 --- a/crates/cascade/src/cli/remote.rs +++ b/crates/cascade/src/cli/remote.rs @@ -147,9 +147,9 @@ pub enum RemoteCommand { }, /// Run a one-shot command on the remote node under a PTY. The command's /// stdout and stderr are streamed back and written to the terminal; the - /// CLI exits once the node closes the output stream. The exec data plane - /// does not yet carry the process exit code, so it is not reflected in the - /// CLI's own exit status. + /// CLI exits when the node signals the session ended, propagating the + /// remote process's exit code as its own exit status. A process killed by + /// a signal maps to `128 + signal` per the shell convention. /// /// Requires the dangerous `exec:pty` capability over the session's /// working directory, granted explicitly for a folder scope. @@ -566,7 +566,10 @@ async fn run_exec( let mut stream = backend.subscribe_exec_stream(device_id, session_id).await; // Drive the session: multiplex output draining with stdin forwarding - // (shell mode only) using tokio::select. + // (shell mode only) using tokio::select. For a one-shot `exec` the pump + // returns the remote process's exit code; the CLI propagates it as its own + // exit status. For an interactive `shell` the code is irrelevant (the + // session ends on stream close). let pump_result = pump_exec_session( &backend, device_id, @@ -585,12 +588,26 @@ async fn run_exec( .send_pty_signal(device_id, session_id, 15, token) .await; - pump_result + let exit_code = pump_result?; + if !is_shell && let Some(code) = exit_code { + // Propagate the remote process's exit status as this process's own. + // std::process::exit runs no further destructors, but the session + // cleanup above already ran and stdout is flushed by the OS on exit. + std::process::exit(code); + } + Ok(()) } /// Pump the exec session: forward output to the terminal and (for shell mode) -/// forward local stdin to the remote PTY. Returns when the output stream -/// closes (the session ended) or stdin reaches EOF. +/// forward local stdin to the remote PTY. Returns when the session's +/// [`cascade_p2p::exec_stream::ExecStreamEvent::Exited`] arrives (the process +/// exited) or, for a shell, when stdin reaches EOF. +/// +/// Returns `Ok(Some(code))` when a one-shot `exec` received the remote +/// process's exit status (a shell-style code: `code` for a normal exit, +/// `128 + signal` for a signal kill, `1` for an indeterminate exit). Returns +/// `Ok(None)` for an interactive shell — its exit status is not the remote +/// process's. /// /// `token` is re-presented with every `PtyWrite` so a caller authenticated /// only by a capability token (no on-node grant) can drive an interactive @@ -599,26 +616,34 @@ async fn pump_exec_session( backend: &std::sync::Arc, device_id: &str, session_id: u64, - stream: &mut tokio::sync::mpsc::UnboundedReceiver, + stream: &mut tokio::sync::mpsc::UnboundedReceiver, is_shell: bool, token: Option, -) -> Result<()> { +) -> Result> { use std::io::IsTerminal as _; use std::io::Write as _; use tokio::io::AsyncReadExt as _; if !is_shell { // One-shot exec: drain output to the terminal. The remote process's - // exit code is not carried on the exec data plane today (there is no - // exit frame), so the CLI cannot forward it; the session is considered - // done when the node closes the output stream. + // exit code arrives as an ExecStreamEvent::Exited control frame after + // the last output frame; the CLI propagates it as its own exit status. let stdout = std::io::stdout(); - while let Some(frame) = stream.recv().await { - let mut handle = stdout.lock(); - handle.write_all(&frame.bytes)?; - handle.flush()?; + while let Some(event) = stream.recv().await { + match event { + cascade_p2p::exec_stream::ExecStreamEvent::Output(frame) => { + let mut handle = stdout.lock(); + handle.write_all(&frame.bytes)?; + handle.flush()?; + } + cascade_p2p::exec_stream::ExecStreamEvent::Exited { .. } => { + return Ok(event.to_exit_code()); + } + } } - return Ok(()); + // The stream closed without an Exited event (the node went away before + // delivering the exit frame). Treat it as a generic failure. + return Ok(Some(1)); } // Interactive shell: drive a real raw-mode PTY. The local terminal is put @@ -653,13 +678,16 @@ async fn pump_exec_session( loop { tokio::select! { // Output from the remote session. - frame = stream.recv() => match frame { - Some(frame) => { + event = stream.recv() => match event { + Some(cascade_p2p::exec_stream::ExecStreamEvent::Output(frame)) => { let mut handle = stdout.lock(); handle.write_all(&frame.bytes)?; handle.flush()?; } - None => break, + // The remote process exited: end the shell session. The exit + // code is ignored for the interactive shell — its own exit + // status is not the remote process's. + Some(cascade_p2p::exec_stream::ExecStreamEvent::Exited { .. }) | None => break, }, // Local stdin bytes -> remote PtyWrite. In raw mode each keystroke // arrives immediately; ^C arrives as 0x03 and the remote PTY raises @@ -691,7 +719,7 @@ async fn pump_exec_session( } drop(raw); - Ok(()) + Ok(None) } /// Restores the local terminal from raw mode when the shell session ends, diff --git a/crates/p2p/src/exec_stream.rs b/crates/p2p/src/exec_stream.rs index 4b4cd6c..247274a 100644 --- a/crates/p2p/src/exec_stream.rs +++ b/crates/p2p/src/exec_stream.rs @@ -89,7 +89,7 @@ impl WireStreamKind { /// which depends on this crate; the registration therefore lives there rather /// than here, where the data-plane type is defined), so the consumer never sees /// the wire sequence number or raw discriminant. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct ExecStreamFrame { /// The session the bytes belong to. pub session: u64, @@ -99,6 +99,64 @@ pub struct ExecStreamFrame { pub bytes: Vec, } +/// One item delivered to the manager-side exec-stream consumer over its +/// subscription channel. +/// +/// The consumer channel (handed out by `SyncEngine::subscribe_exec_stream`) used +/// to carry only [`ExecStreamFrame`] byte payloads; the terminal +/// [`BepMessage::ExecExit`] control frame is now delivered through the same +/// channel as the [`Self::Exited`] variant, so a one-shot `exec` pump can exit +/// with the remote process's exit code rather than only learning the session +/// ended when the byte stream closes. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExecStreamEvent { + /// A chunk of stdout/stderr bytes for the session. + Output(ExecStreamFrame), + /// The session's process exited. `code` is the normal exit code (if any); + /// `signal` is the POSIX signal that killed it (if any). Exactly one is + /// `Some` for a normal Unix exit; both `None` means the exit status was + /// indeterminate. A consumer that wants a shell-style exit status maps a + /// signal-killed process to `128 + signal`. + Exited { + /// The process exit code, if it exited normally. + code: Option, + /// The POSIX signal number that terminated the process, if killed by a + /// signal. + signal: Option, + }, +} + +impl ExecStreamEvent { + /// The exit status a one-shot `exec` should propagate, following the shell + /// convention: a normal exit yields `code`, a signal-killed process yields + /// `128 + signal`, and an indeterminate exit (neither set) yields the + /// generic failure status `1`. + #[must_use] + pub const fn to_exit_code(&self) -> Option { + match self { + Self::Output(_) => None, + Self::Exited { code, signal } => Some(exit_code_from_code_signal(*code, *signal)), + } + } +} + +/// Map an optional exit `code` and optional `signal` to a shell-style process +/// exit status. Exposed so the CLI's one-shot `exec` and tests share one +/// definition. +/// +/// - `code` present -> `code`. +/// - `code` absent, `signal` present -> `128 + signal`. +/// - both absent -> `1` (generic failure: the process exited without a +/// determinable status). +#[must_use] +pub const fn exit_code_from_code_signal(code: Option, signal: Option) -> i32 { + match (code, signal) { + (Some(c), _) => c, + (None, Some(s)) => 128 + s, + (None, None) => 1, + } +} + /// Default credit window a consumer advertises, in bytes. /// /// Sized to the node-side output channel's buffering headroom: the local @@ -238,9 +296,12 @@ impl ExecStreamCredit { /// /// Returns when the session's output channel closes (the session ended and all /// pumps dropped their senders) or an [`ExecEvent::Exited`] arrives, whichever -/// comes first. An `Exited` event ends the stream cleanly: the function returns -/// `Ok(())` without tearing down the shared writer, leaving the caller to send a -/// `Close` or continue using the session for other traffic. +/// comes first. An `Exited` event ends the stream cleanly: the pump sends a +/// single [`BepMessage::ExecExit`] control frame carrying the process's exit +/// code and signal (NOT credit-gated — it is one terminal frame sent after the +/// last output frame), then returns `Ok(())` without tearing down the shared +/// writer, leaving the caller to send a `Close` or continue using the session +/// for other traffic. pub async fn pump_session_output( session: ExecSessionId, mut events: tokio::sync::mpsc::Receiver, @@ -280,7 +341,26 @@ pub async fn pump_session_output( credit.record_sent(seq, sent_bytes).await; seq = seq.wrapping_add(1); } - ExecEvent::Exited { .. } => return Ok(()), + ExecEvent::Exited { code, signal } => { + // Terminal control frame: send exactly once, after the last + // output frame. It is NOT credit-gated — it carries no sequence + // number and the manager routes it to the consumer without + // acking, so it must not be throttled by a slow consumer's + // window (the session is over; the only remaining business is + // to deliver the exit status). + let exit_frame = BepMessage::ExecExit { + session: session.0, + code, + signal, + }; + writer + .lock() + .await + .send(&exit_frame) + .await + .context("sending exec exit frame")?; + return Ok(()); + } } } Ok(()) @@ -442,3 +522,61 @@ where } Ok(None) } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)] +mod tests { + use super::*; + + #[test] + fn exit_code_normal_exit_yields_code() { + assert_eq!(exit_code_from_code_signal(Some(0), None), 0); + assert_eq!(exit_code_from_code_signal(Some(42), None), 42); + assert_eq!(exit_code_from_code_signal(Some(-1), None), -1); + } + + #[test] + fn exit_code_signal_kill_yields_128_plus_signal() { + assert_eq!(exit_code_from_code_signal(None, Some(9)), 137); + assert_eq!(exit_code_from_code_signal(None, Some(15)), 143); + assert_eq!(exit_code_from_code_signal(None, Some(2)), 130); + } + + #[test] + fn exit_code_indeterminate_yields_generic_failure() { + assert_eq!(exit_code_from_code_signal(None, None), 1); + } + + #[test] + fn exit_code_code_takes_precedence_over_signal() { + // A normal exit code wins even if a signal is also carried (defensive: + // the node sets exactly one for a real Unix exit, but the mapping must + // not panic or surprise if both are present). + assert_eq!(exit_code_from_code_signal(Some(3), Some(9)), 3); + } + + #[test] + fn event_to_exit_code_is_none_for_output() { + let frame = ExecStreamFrame { + session: 1, + stream: WireStreamKind::Stdout, + bytes: Vec::new(), + }; + assert_eq!(ExecStreamEvent::Output(frame).to_exit_code(), None); + } + + #[test] + fn event_to_exit_code_maps_exited() { + let exited = ExecStreamEvent::Exited { + code: Some(42), + signal: None, + }; + assert_eq!(exited.to_exit_code(), Some(42)); + + let killed = ExecStreamEvent::Exited { + code: None, + signal: Some(9), + }; + assert_eq!(killed.to_exit_code(), Some(137)); + } +} diff --git a/crates/p2p/src/protocol.rs b/crates/p2p/src/protocol.rs index 9625aaa..b622c2a 100644 --- a/crates/p2p/src/protocol.rs +++ b/crates/p2p/src/protocol.rs @@ -45,6 +45,16 @@ const MSG_OPLOG_HAVE: u32 = 20; const MSG_OPLOG_REQUEST: u32 = 21; /// Carry a contiguous range of opaque, signed oplog entries. FROZEN at 22. const MSG_OPLOG_DATA: u32 = 22; +/// Carry the exit status of a finished exec session. FROZEN at 23. +/// +/// Sent once by the node's exec output pump after the last +/// [`BepMessage::ExecStream`] output frame, on the session's terminal +/// [`cascade_exec::ExecEvent::Exited`]. It is a single control frame, not +/// credit-gated: the manager routes it to the exec-stream consumer registered +/// for `(device_id, session)` so a one-shot `exec` can exit with the remote +/// process's code (a signal-killed process carries `signal`, which the CLI maps +/// to `128 + signal` per shell convention). +const MSG_EXEC_EXIT: u32 = 23; /// The protocol version this implementation speaks. FROZEN; a version bump is /// gated by the conformance vectors in `docs/conformance/`. @@ -1069,6 +1079,26 @@ pub enum BepMessage { /// The credit window in bytes the consumer will accept past `ack_seq`. window: u32, }, + /// The exit status of a finished exec session. + /// + /// Sent once by the node's exec output pump after the last + /// [`BepMessage::ExecStream`] output frame, on the session's terminal + /// [`cascade_exec::ExecEvent::Exited`]. It is a single control frame, not + /// credit-gated: the manager routes it to the exec-stream consumer + /// registered for `(device_id, session)` so a one-shot `exec` can exit with + /// the remote process's code. A signal-killed process carries `signal`; the + /// CLI maps that to `128 + signal` per shell convention. Exactly one of + /// `code` and `signal` is present for a normal Unix exit; both absent means + /// the exit status was indeterminate. + ExecExit { + /// The session that exited. + session: u64, + /// The process exit code, if it exited normally. + code: Option, + /// The POSIX signal number that terminated the process, if killed by a + /// signal. + signal: Option, + }, /// Advertise the latest sequence of the sender's single-writer oplog for a /// named peer. /// @@ -1150,6 +1180,7 @@ impl BepMessage { Self::Handshake { .. } => MSG_HANDSHAKE, Self::ExecStream { .. } => MSG_EXEC_STREAM, Self::ExecStreamAck { .. } => MSG_EXEC_STREAM_ACK, + Self::ExecExit { .. } => MSG_EXEC_EXIT, Self::OplogHave { .. } => MSG_OPLOG_HAVE, Self::OplogRequest { .. } => MSG_OPLOG_REQUEST, Self::OplogData { .. } => MSG_OPLOG_DATA, @@ -1168,9 +1199,9 @@ impl BepMessage { /// `IndexUpdate`, `Request`, `Response`). /// - `management` covers `ManageRequest` / `ManageResponse`. /// - `exec` covers the live stdio stream frames (`ExecStream`, - /// `ExecStreamAck`); exec *control* travels as `management` frames and is - /// governed by the management domain plus the exec capability grant, not by - /// this mapping. + /// `ExecStreamAck`, `ExecExit`); exec *control* travels as `management` + /// frames and is governed by the management domain plus the exec capability + /// grant, not by this mapping. /// - `oplog` covers `OplogHave` / `OplogRequest` / `OplogData`. /// - Everything else is transport plumbing (the handshake itself, keepalive, /// NAT-traversal and relay frames) that every peer speaks regardless of the @@ -1186,7 +1217,9 @@ impl BepMessage { Self::ManageRequest { .. } | Self::ManageResponse { .. } => { Some(CapabilityDomain::Management) } - Self::ExecStream { .. } | Self::ExecStreamAck { .. } => Some(CapabilityDomain::Exec), + Self::ExecStream { .. } | Self::ExecStreamAck { .. } | Self::ExecExit { .. } => { + Some(CapabilityDomain::Exec) + } Self::OplogHave { .. } | Self::OplogRequest { .. } | Self::OplogData { .. } => { Some(CapabilityDomain::Oplog) } @@ -1368,6 +1401,15 @@ pub fn encode_message(msg: &BepMessage) -> Result> { encode_u64(&mut body, *ack_seq); encode_u32(&mut body, *window); } + BepMessage::ExecExit { + session, + code, + signal, + } => { + encode_u64(&mut body, *session); + encode_opt_i32(&mut body, *code); + encode_opt_i32(&mut body, *signal); + } BepMessage::OplogHave { peer, head_seq } => { encode_string(&mut body, peer)?; encode_u64(&mut body, *head_seq); @@ -1711,6 +1753,19 @@ fn encode_opt_i64(buf: &mut Vec, val: Option) { } } +/// Encode an `Option` as a one-word presence sentinel followed, when +/// present, by the value. Mirrors [`encode_opt_i64`] for the `i32`-typed exit +/// code and signal carried by [`BepMessage::ExecExit`]. +fn encode_opt_i32(buf: &mut Vec, val: Option) { + match val { + None => encode_u32(buf, OPTION_NONE), + Some(v) => { + encode_u32(buf, OPTION_SOME); + encode_i32(buf, v); + } + } +} + /// Encode an `Option<&str>` as a one-word presence sentinel followed, when /// present, by the string. fn encode_opt_string(buf: &mut Vec, val: Option<&str>) -> Result<()> { @@ -1799,6 +1854,7 @@ pub fn decode_message(frame: &[u8]) -> Result { MSG_HANDSHAKE => decode_handshake(rest), MSG_EXEC_STREAM => decode_exec_stream(rest), MSG_EXEC_STREAM_ACK => decode_exec_stream_ack(rest), + MSG_EXEC_EXIT => decode_exec_exit(rest), MSG_OPLOG_HAVE => decode_oplog_have(rest), MSG_OPLOG_REQUEST => decode_oplog_request(rest), MSG_OPLOG_DATA => decode_oplog_data(rest), @@ -2140,6 +2196,17 @@ fn decode_exec_stream_ack(data: &[u8]) -> Result { }) } +fn decode_exec_exit(data: &[u8]) -> Result { + let (session, rest) = decode_u64(data)?; + let (code, rest) = decode_opt_i32(rest)?; + let (signal, _) = decode_opt_i32(rest)?; + Ok(BepMessage::ExecExit { + session, + code, + signal, + }) +} + fn decode_oplog_have(data: &[u8]) -> Result { let (peer, rest) = decode_string(data)?; let (head_seq, _) = decode_u64(rest)?; @@ -2185,6 +2252,19 @@ fn decode_opt_i64(data: &[u8]) -> Result<(Option, &[u8])> { } } +/// Decode an `Option` written by [`encode_opt_i32`]. +fn decode_opt_i32(data: &[u8]) -> Result<(Option, &[u8])> { + let (tag, rest) = decode_u32(data)?; + match tag { + OPTION_NONE => Ok((None, rest)), + OPTION_SOME => { + let (val, rest) = decode_i32(rest)?; + Ok((Some(val), rest)) + } + other => anyhow::bail!("invalid option sentinel {other}"), + } +} + /// Decode an `Option` written by [`encode_opt_string`]. fn decode_opt_string(data: &[u8]) -> Result<(Option, &[u8])> { let (tag, rest) = decode_u32(data)?; diff --git a/crates/p2p/src/protocol_tests.rs b/crates/p2p/src/protocol_tests.rs index 4201746..be4da15 100644 --- a/crates/p2p/src/protocol_tests.rs +++ b/crates/p2p/src/protocol_tests.rs @@ -1195,6 +1195,56 @@ fn encode_decode_exec_stream_ack_round_trip() { }); } +#[test] +fn encode_decode_exec_exit_round_trip() { + // Normal exit with a code, no signal. + round_trip(BepMessage::ExecExit { + session: 42, + code: Some(0), + signal: None, + }); + // Non-zero exit code. + round_trip(BepMessage::ExecExit { + session: u64::MAX, + code: Some(42), + signal: None, + }); + // Signal kill: no code, signal present. + round_trip(BepMessage::ExecExit { + session: 7, + code: None, + signal: Some(9), + }); + // Indeterminate exit: neither set. + round_trip(BepMessage::ExecExit { + session: 1, + code: None, + signal: None, + }); + // Negative code. + round_trip(BepMessage::ExecExit { + session: 3, + code: Some(-1), + signal: None, + }); +} + +#[test] +fn exec_exit_rejects_bad_option_sentinel() { + let mut body = Vec::new(); + encode_u32(&mut body, MSG_EXEC_EXIT); + encode_u64(&mut body, 1); + // An invalid presence sentinel (2) for `code` must fail to decode. + encode_u32(&mut body, 2); + encode_i32(&mut body, 0); + encode_u32(&mut body, OPTION_NONE); + let body_len = u32::try_from(body.len()).unwrap(); + let mut frame = Vec::new(); + encode_u32(&mut frame, body_len); + frame.extend_from_slice(&body); + assert!(decode_message(&frame).is_err()); +} + // ── Exec management commands ── fn exec_request(command: ManageCommand) -> BepMessage { diff --git a/crates/p2p/tests/exec_data_plane.rs b/crates/p2p/tests/exec_data_plane.rs index d0dda33..422faba 100644 --- a/crates/p2p/tests/exec_data_plane.rs +++ b/crates/p2p/tests/exec_data_plane.rs @@ -346,3 +346,121 @@ async fn kill_tears_down_stream_cleanly() { "pump must return Ok after a clean kill teardown: {result:?}" ); } + +/// On `ExecEvent::Exited`, the pump sends exactly one `BepMessage::ExecExit` +/// control frame carrying the process's code/signal, after any output frames, +/// then returns `Ok(())`. The exit frame is not credit-gated and carries no +/// sequence number. +#[tokio::test] +async fn pump_emits_exec_exit_on_exited_event() { + use cascade_exec::ExecEvent; + + let id = ExecSessionId(99); + let (events_tx, events_rx) = mpsc::channel::(8); + + let (producer_t, consumer_t) = ChannelTransport::pair(); + let (_p_reader, p_writer) = FramedSession::new(producer_t).split(); + let (mut c_reader, _c_writer) = FramedSession::new(consumer_t).split(); + + let producer_writer = Arc::new(Mutex::new(p_writer)); + let credit = ExecStreamCredit::new(DEFAULT_CREDIT_WINDOW); + + // Send one output chunk, then the exit event. The pump must emit an + // ExecStream frame followed by an ExecExit frame. + events_tx + .send(ExecEvent::Output { + stream: cascade_exec::ExecStreamKind::Stdout, + bytes: b"hello".to_vec(), + }) + .await + .unwrap(); + events_tx + .send(ExecEvent::Exited { + code: Some(42), + signal: None, + }) + .await + .unwrap(); + + let pump = { + let writer = Arc::clone(&producer_writer); + let credit = Arc::clone(&credit); + tokio::spawn(async move { pump_session_output(id, events_rx, &writer, &credit).await }) + }; + + // First frame is the output chunk. + let output_frame = tokio::time::timeout(Duration::from_secs(5), c_reader.recv()) + .await + .expect("output frame should arrive") + .unwrap() + .expect("output frame must be Some"); + assert!(matches!(output_frame, BepMessage::ExecStream { bytes, .. } if bytes == b"hello")); + + // Second frame is the exit control frame carrying the code. + let exit_frame = tokio::time::timeout(Duration::from_secs(5), c_reader.recv()) + .await + .expect("exit frame should arrive") + .unwrap() + .expect("exit frame must be Some"); + assert_eq!( + exit_frame, + BepMessage::ExecExit { + session: 99, + code: Some(42), + signal: None, + } + ); + + let result = tokio::time::timeout(Duration::from_secs(5), pump) + .await + .expect("pump should return after exit") + .unwrap(); + assert!(result.is_ok(), "pump must return Ok after exit: {result:?}"); +} + +/// A signal-killed process carries `signal` (and no `code`) through the exit +/// frame end to end. +#[tokio::test] +async fn pump_emits_exec_exit_with_signal() { + use cascade_exec::ExecEvent; + + let id = ExecSessionId(5); + let (events_tx, events_rx) = mpsc::channel::(8); + + let (producer_t, consumer_t) = ChannelTransport::pair(); + let (_p_reader, p_writer) = FramedSession::new(producer_t).split(); + let (mut c_reader, _c_writer) = FramedSession::new(consumer_t).split(); + + let producer_writer = Arc::new(Mutex::new(p_writer)); + let credit = ExecStreamCredit::new(DEFAULT_CREDIT_WINDOW); + + events_tx + .send(ExecEvent::Exited { + code: None, + signal: Some(9), + }) + .await + .unwrap(); + + let pump = { + let writer = Arc::clone(&producer_writer); + let credit = Arc::clone(&credit); + tokio::spawn(async move { pump_session_output(id, events_rx, &writer, &credit).await }) + }; + + let exit_frame = tokio::time::timeout(Duration::from_secs(5), c_reader.recv()) + .await + .expect("exit frame should arrive") + .unwrap() + .expect("exit frame must be Some"); + assert_eq!( + exit_frame, + BepMessage::ExecExit { + session: 5, + code: None, + signal: Some(9), + } + ); + + let _ = tokio::time::timeout(Duration::from_secs(5), pump).await; +} diff --git a/docs/conformance/frames.v1.json b/docs/conformance/frames.v1.json index 4f41b55..b526e52 100644 --- a/docs/conformance/frames.v1.json +++ b/docs/conformance/frames.v1.json @@ -8,7 +8,11 @@ "message": { "kind": "Handshake", "protocol_version": 1, - "domains": ["content", "management", "exec"] + "domains": [ + "content", + "management", + "exec" + ] }, "wire_hex": "00000018000000110000000100000003000000000000000100000002" }, @@ -27,7 +31,10 @@ "cols": 80, "rows": 24 }, - "scope": { "kind": "folder", "path": "/work" }, + "scope": { + "kind": "folder", + "path": "/work" + }, "token": null }, "wire_hex": "000000540000000f00000000000000010000000d00000001000000072f62696e2f7368000000000000000001000000052f776f726b00000000000000000000500000001800000001000000052f776f726b00000000000000" @@ -38,8 +45,14 @@ "message": { "kind": "ManageRequest", "request_id": 2, - "command": { "kind": "ProcKill", "session": 7 }, - "scope": { "kind": "folder", "path": "/work" }, + "command": { + "kind": "ProcKill", + "session": 7 + }, + "scope": { + "kind": "folder", + "path": "/work" + }, "token": null }, "wire_hex": "0000002c0000000f000000000000000200000013000000000000000700000001000000052f776f726b00000000000000" @@ -50,7 +63,10 @@ "message": { "kind": "ManageResponse", "request_id": 1, - "result": { "kind": "ExecSpawned", "session": 42 } + "result": { + "kind": "ExecSpawned", + "session": 42 + } }, "wire_hex": "0000001800000010000000000000000100000002000000000000002a" }, @@ -77,6 +93,28 @@ }, "wire_hex": "0000001800000013000000000000002a000000000000000100010000" }, + { + "name": "exec_exit_code", + "msg_type": 23, + "message": { + "kind": "ExecExit", + "session": 42, + "code": 42, + "signal": null + }, + "wire_hex": "0000001800000017000000000000002a000000010000002a00000000" + }, + { + "name": "exec_exit_signal", + "msg_type": 23, + "message": { + "kind": "ExecExit", + "session": 7, + "code": null, + "signal": 9 + }, + "wire_hex": "00000018000000170000000000000007000000000000000100000009" + }, { "name": "oplog_have", "msg_type": 20, @@ -104,7 +142,10 @@ "kind": "OplogData", "peer": "PEER", "from_seq": 1, - "ops_hex": ["6f7032", "6f7033"] + "ops_hex": [ + "6f7032", + "6f7033" + ] }, "wire_hex": "00000028000000160000000450454552000000000000000100000002000000036f703200000000036f703300" } diff --git a/docs/node-protocol.md b/docs/node-protocol.md index 2f0d097..a1b3e96 100644 --- a/docs/node-protocol.md +++ b/docs/node-protocol.md @@ -188,6 +188,7 @@ some) followed, when present, by the value. | 20 | `OplogHave` | oplog | yes | | 21 | `OplogRequest` | oplog | yes | | 22 | `OplogData` | oplog (envelope) | yes | +| 23 | `ExecExit` | exec | yes | Transport frames (the handshake itself, keepalive, NAT-traversal, and relay frames) are domain-independent: every peer speaks them regardless of the @@ -195,8 +196,8 @@ negotiated capability set. The remaining frames are governed by the domain in th table; the receiver refuses an inbound frame whose domain is not in the negotiated set. (Exec *control* travels as `management` frames and is governed by the management domain plus the exec capability grant, not by the exec domain -mapping; the exec domain governs only the `ExecStream`/`ExecStreamAck` stdio -frames.) +mapping; the exec domain governs only the `ExecStream`/`ExecStreamAck`/ +`ExecExit` stdio frames.) ### Handshake (type 17) @@ -245,6 +246,18 @@ Body: `u64 session`, `u64 ack_seq` (highest contiguous sequence accepted), `u32 window` (credit, in bytes, the consumer will accept past `ack_seq`). The producer must not send beyond the window. +### Exec exit (`ExecExit`, type 23) + +Body: `u64 session`, `Option code` (presence sentinel `0`=absent / `1`=present +then `i32`), `Option signal` (same encoding). Sent exactly once by the node's +exec output pump after the last `ExecStream` output frame, on the session's +terminal exit. It is a single control frame, not credit-gated: it carries no +sequence number and the manager routes it to the exec-stream consumer registered +for `(device_id, session)` without acking. Exactly one of `code`/`signal` is +present for a normal Unix exit; both absent means the exit status was +indeterminate (the CLI maps that to exit code `1`). A signal-killed process +carries `signal`; the CLI maps it to `128 + signal` per the shell convention. + ### Oplog sync (types 20–22) - `OplogHave` (20): string `peer`, `u64 head_seq`. @@ -274,8 +287,8 @@ consumed by every implementation's CI: human and the lowercase hex of its full `[len][type][body]` frame. A conformant codec must decode the hex to the message and re-encode the message to exactly the same hex. Covers the handshake, the exec `ManageCommand` verbs, the - `ExecStream`/`ExecStreamAck` stdio frames, and the oplog frames (with arbitrary - opaque entry bytes, since the entry payload is not frozen). + `ExecStream`/`ExecStreamAck`/`ExecExit` stdio frames, and the oplog frames + (with arbitrary opaque entry bytes, since the entry payload is not frozen). - `handshake.v1.json` — for each `(local domains, peer domains)` pair, the expected negotiated set and the domains whose frames the local node must refuse from that peer. Drives the heterogeneous-peer and graceful-degradation rules,