Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/backend-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ impl P2pBackend {
&self,
device_id: &str,
session_id: u64,
) -> tokio::sync::mpsc::UnboundedReceiver<cascade_p2p::exec_stream::ExecStreamFrame> {
) -> tokio::sync::mpsc::UnboundedReceiver<cascade_p2p::exec_stream::ExecStreamEvent> {
self.sync.subscribe_exec_stream(device_id, session_id).await
}

Expand Down
31 changes: 18 additions & 13 deletions crates/backend-p2p/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use cascade_p2p::block::BlockHash;
use cascade_p2p::candidate::{Candidate, CandidateKind};
use cascade_p2p::connection::ConnectionManager;
use cascade_p2p::discovery::DiscoveredPeer;
use cascade_p2p::exec_stream::ExecStreamFrame;
use cascade_p2p::exec_stream::ExecStreamEvent;
use cascade_p2p::framed::{FramedPeer, FramedSession, SessionReader, SessionWriter};
use cascade_p2p::identity::DeviceIdentity;
use cascade_p2p::nat::{
Expand Down Expand Up @@ -727,13 +727,16 @@ pub struct SyncEngine {
/// When a manager spawns a PTY or process on a remote node, the node
/// streams the session's stdout/stderr back as
/// [`BepMessage::ExecStream`] frames. This map holds the channel that
/// delivers those frames to the manager-side consumer (the CLI's exec /
/// shell commands). The session loop routes each inbound frame to the
/// matching consumer and sends a [`BepMessage::ExecStreamAck`] back so
/// the node's producer honours the backpressure window. A consumer that
/// drops its receiver is removed from the map on the next frame.
/// delivers those frames (as [`ExecStreamEvent`] items — both
/// [`ExecStreamEvent::Output`] byte frames and the terminal
/// [`ExecStreamEvent::Exited`] carrying the process exit status) to the
/// manager-side consumer (the CLI's exec / shell commands). The session
/// loop routes each inbound frame to the matching consumer and sends a
/// [`BepMessage::ExecStreamAck`] back so the node's producer honours the
/// backpressure window. A consumer that drops its receiver is removed from
/// the map on the next frame.
exec_stream_consumers:
Arc<Mutex<HashMap<(String, u64), mpsc::UnboundedSender<ExecStreamFrame>>>>,
Arc<Mutex<HashMap<(String, u64), mpsc::UnboundedSender<ExecStreamEvent>>>>,
}

impl std::fmt::Debug for SyncEngine {
Expand Down Expand Up @@ -2339,16 +2342,18 @@ impl SyncEngine {
///
/// The session loop routes each [`BepMessage::ExecStream`] frame for the
/// named `(device_id, session_id)` pair to this receiver as a typed
/// [`ExecStreamFrame`], and sends a [`BepMessage::ExecStreamAck`] back so
/// the node's producer keeps flowing. A manager calls this *before*
/// sending the `PtySpawn` / `ProcSpawn` that mints `session_id`, so the
/// first output frame does not race the registration. Dropping the
/// receiver removes the entry on the next frame.
/// [`ExecStreamEvent::Output`], sends a [`BepMessage::ExecStreamAck`] back
/// so the node's producer keeps flowing, and delivers the terminal
/// [`BepMessage::ExecExit`] as [`ExecStreamEvent::Exited`] so a one-shot
/// `exec` pump can exit with the remote process's exit code. A manager
/// calls this *before* sending the `PtySpawn` / `ProcSpawn` that mints
/// `session_id`, so the first output frame does not race the registration.
/// Dropping the receiver removes the entry on the next frame.
pub async fn subscribe_exec_stream(
&self,
device_id: &str,
session_id: u64,
) -> mpsc::UnboundedReceiver<cascade_p2p::exec_stream::ExecStreamFrame> {
) -> mpsc::UnboundedReceiver<cascade_p2p::exec_stream::ExecStreamEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let mut consumers = self.exec_stream_consumers.lock().await;
consumers.insert((device_id.to_owned(), session_id), tx);
Expand Down
72 changes: 60 additions & 12 deletions crates/backend-p2p/src/sync/relay_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,11 +1210,13 @@ impl SyncEngine {
let consumers = self.exec_stream_consumers.lock().await;
consumers.get(&key).is_some_and(|sender| {
sender
.send(cascade_p2p::exec_stream::ExecStreamFrame {
session,
stream: kind,
bytes: bytes.clone(),
})
.send(cascade_p2p::exec_stream::ExecStreamEvent::Output(
cascade_p2p::exec_stream::ExecStreamFrame {
session,
stream: kind,
bytes: bytes.clone(),
},
))
.is_ok()
})
};
Expand Down Expand Up @@ -1262,16 +1264,62 @@ impl SyncEngine {
);
Ok(())
}
BepMessage::ExecExit {
    session,
    code,
    signal,
} => {
    // A peer that did not advertise the exec capability domain must
    // not send exec frames. Drop with a domain-violation log rather
    // than tearing the session down.
    if !peer_domains.contains(&CapabilityDomain::Exec) {
        debug!(
            target: "cascade::backend::p2p",
            peer = %peer_device_id,
            session,
            "dropping exec exit frame — peer did not advertise exec capability",
        );
        return Ok(());
    }
    // Deliver the terminal exit status to the consumer registered for
    // this (peer, session) pair as the Exited variant. Unlike byte
    // frames this carries no sequence number and is not acked: it is
    // a single control frame sent after the last output frame, so
    // the backpressure window does not apply.
    //
    // The exit frame is the last frame of the session, so the consumer
    // entry is taken out of the map unconditionally. Relying on "the
    // next frame" to evict it would leave the sender registered
    // indefinitely, because no further frame arrives for this session.
    // The lock guard is a temporary and is released before the send.
    let key = (peer_device_id.to_owned(), session);
    let registered = self.exec_stream_consumers.lock().await.remove(&key);
    // An event queued on the unbounded channel stays readable after the
    // sender is dropped, so the consumer still observes `Exited` and
    // only then sees the channel close.
    let delivered = registered.is_some_and(|sender| {
        sender
            .send(cascade_p2p::exec_stream::ExecStreamEvent::Exited { code, signal })
            .is_ok()
    });
    if !delivered {
        debug!(
            target: "cascade::backend::p2p",
            peer = %peer_device_id,
            session,
            "dropping exec exit frame — no live consumer",
        );
    }
    Ok(())
}
BepMessage::OplogHave { .. }
| BepMessage::OplogRequest { .. }
| BepMessage::OplogData { .. } => {
// This node does not advertise the oplog capability domain, so a
// peer must never negotiate it and never send these frames. Per
// the node protocol a frame for an un-negotiated domain is dropped
// (with a domain-violation log), never guessed at and never a
// reason to tear the session down — the peer can still serve
// content, management, and exec frames. Wiring oplog sync is a
// later step that lands with the entry-payload co-design.
// This node does not advertise the oplog capability domain, so
// a peer must never negotiate it and never send these frames.
// Per the node protocol a frame for an un-negotiated domain is
// dropped (with a domain-violation log), never guessed at and
// never a reason to tear the session down — the peer can still
// serve content, management, and exec frames. Wiring oplog sync
// is a later step that lands with the entry-payload co-design.
debug!(
target: "cascade::backend::p2p",
peer = %peer_device_id,
Expand Down
94 changes: 94 additions & 0 deletions crates/backend-p2p/src/sync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2794,3 +2794,97 @@ async fn heterogeneous_exec_capability_is_negotiated() {
"A must see B as exec-incapable (B did not set advertise_exec)",
);
}

/// Manager-side half of the exit-code propagation chain: an inbound
/// `BepMessage::ExecExit` must reach the exec-stream consumer registered for
/// the same `(device_id, session)` pair as an [`ExecStreamEvent::Exited`]
/// carrying the process's exit code and signal unchanged.
#[tokio::test]
async fn handle_message_routes_exec_exit_to_consumer() {
    use cascade_p2p::exec_stream::ExecStreamEvent;

    const PEER: &str = "PEER";
    const SESSION: u64 = 42;

    let (_dir, engine) = make_engine("test");

    // Subscribe before the frame is handled so there is a consumer to route to.
    let mut exit_rx = engine.subscribe_exec_stream(PEER, SESSION).await;

    // Plumbing that `handle_message` requires but this test never inspects.
    // `_outbound_rx` stays bound so the outbound channel remains open.
    let (outbound_tx, _outbound_rx) = mpsc::unbounded_channel::<BepMessage>();
    let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Vec<u8>>>>> = Arc::default();
    let manage_pending: Arc<Mutex<HashMap<u64, oneshot::Sender<ManageResult>>>> =
        Arc::default();

    // The peer negotiated the exec domain, so its exit frame is legitimate.
    let peer_domains = vec![CapabilityDomain::Exec];
    let exit_frame = BepMessage::ExecExit {
        session: SESSION,
        code: Some(7),
        signal: None,
    };

    let handled = engine
        .handle_message(
            PEER,
            CallerAuthentication::TlsVerified,
            exit_frame,
            &peer_domains,
            &outbound_tx,
            &pending,
            &manage_pending,
        )
        .await;
    handled.expect("ExecExit must be handled without error");

    // Bound the wait so a routing regression fails fast instead of hanging.
    let delivery =
        tokio::time::timeout(std::time::Duration::from_secs(2), exit_rx.recv()).await;
    let event = delivery
        .expect("consumer must receive the Exited event")
        .expect("channel must not close before delivery");

    let expected = ExecStreamEvent::Exited {
        code: Some(7),
        signal: None,
    };
    assert_eq!(event, expected);
}

/// Domain-violation path: when the sending peer never advertised the exec
/// domain, its `BepMessage::ExecExit` is dropped without an error and the
/// registered consumer receives nothing.
#[tokio::test]
async fn handle_message_drops_exec_exit_without_exec_domain() {
    const PEER: &str = "PEER";
    const SESSION: u64 = 1;

    let (_dir, engine) = make_engine("test");

    let mut exit_rx = engine.subscribe_exec_stream(PEER, SESSION).await;

    // Plumbing that `handle_message` requires but this test never inspects.
    // `_outbound_rx` stays bound so the outbound channel remains open.
    let (outbound_tx, _outbound_rx) = mpsc::unbounded_channel::<BepMessage>();
    let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Vec<u8>>>>> = Arc::default();
    let manage_pending: Arc<Mutex<HashMap<u64, oneshot::Sender<ManageResult>>>> =
        Arc::default();

    // Management only: the exec domain is deliberately absent.
    let peer_domains = vec![CapabilityDomain::Management];
    let exit_frame = BepMessage::ExecExit {
        session: SESSION,
        code: Some(0),
        signal: None,
    };

    let handled = engine
        .handle_message(
            PEER,
            CallerAuthentication::TlsVerified,
            exit_frame,
            &peer_domains,
            &outbound_tx,
            &pending,
            &manage_pending,
        )
        .await;
    handled.expect("a dropped domain-violation frame is not an error");

    // Either the short wait times out or the channel reports closure; an
    // actual event in that window means the frame leaked through.
    let quiet_window = std::time::Duration::from_millis(100);
    let result = tokio::time::timeout(quiet_window, exit_rx.recv()).await;
    assert!(
        matches!(result, Err(_) | Ok(None)),
        "consumer must not receive an exit from a non-exec peer, got {result:?}",
    );
}
70 changes: 49 additions & 21 deletions crates/cascade/src/cli/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ pub enum RemoteCommand {
},
/// Run a one-shot command on the remote node under a PTY. The command's
/// stdout and stderr are streamed back and written to the terminal; the
/// CLI exits once the node closes the output stream. The exec data plane
/// does not yet carry the process exit code, so it is not reflected in the
/// CLI's own exit status.
/// CLI exits when the node signals the session ended, propagating the
/// remote process's exit code as its own exit status. A process killed by
/// a signal maps to `128 + signal` per the shell convention.
///
/// Requires the dangerous `exec:pty` capability over the session's
/// working directory, granted explicitly for a folder scope.
Expand Down Expand Up @@ -566,7 +566,10 @@ async fn run_exec(
let mut stream = backend.subscribe_exec_stream(device_id, session_id).await;

// Drive the session: multiplex output draining with stdin forwarding
// (shell mode only) using tokio::select.
// (shell mode only) using tokio::select. For a one-shot `exec` the pump
// returns the remote process's exit code; the CLI propagates it as its own
// exit status. For an interactive `shell` the code is irrelevant (the
// session ends on stream close).
let pump_result = pump_exec_session(
&backend,
device_id,
Expand All @@ -585,12 +588,26 @@ async fn run_exec(
.send_pty_signal(device_id, session_id, 15, token)
.await;

pump_result
let exit_code = pump_result?;
if !is_shell && let Some(code) = exit_code {
// Propagate the remote process's exit status as this process's own.
// std::process::exit runs no further destructors, but the session
// cleanup above already ran and stdout is flushed by the OS on exit.
std::process::exit(code);
}
Ok(())
}

/// Pump the exec session: forward output to the terminal and (for shell mode)
/// forward local stdin to the remote PTY. Returns when the output stream
/// closes (the session ended) or stdin reaches EOF.
/// forward local stdin to the remote PTY. Returns when the session's
/// [`cascade_p2p::exec_stream::ExecStreamEvent::Exited`] arrives (the process
/// exited) or, for a shell, when stdin reaches EOF.
///
/// Returns `Ok(Some(code))` when a one-shot `exec` received the remote
/// process's exit status (a shell-style code: `code` for a normal exit,
/// `128 + signal` for a signal kill, `1` for an indeterminate exit). Returns
/// `Ok(None)` for an interactive shell — its exit status is not the remote
/// process's.
///
/// `token` is re-presented with every `PtyWrite` so a caller authenticated
/// only by a capability token (no on-node grant) can drive an interactive
Expand All @@ -599,26 +616,34 @@ async fn pump_exec_session(
backend: &std::sync::Arc<cascade_backend_p2p::P2pBackend>,
device_id: &str,
session_id: u64,
stream: &mut tokio::sync::mpsc::UnboundedReceiver<cascade_p2p::exec_stream::ExecStreamFrame>,
stream: &mut tokio::sync::mpsc::UnboundedReceiver<cascade_p2p::exec_stream::ExecStreamEvent>,
is_shell: bool,
token: Option<String>,
) -> Result<()> {
) -> Result<Option<i32>> {
use std::io::IsTerminal as _;
use std::io::Write as _;
use tokio::io::AsyncReadExt as _;

if !is_shell {
// One-shot exec: drain output to the terminal. The remote process's
// exit code is not carried on the exec data plane today (there is no
// exit frame), so the CLI cannot forward it; the session is considered
// done when the node closes the output stream.
// exit code arrives as an ExecStreamEvent::Exited control frame after
// the last output frame; the CLI propagates it as its own exit status.
let stdout = std::io::stdout();
while let Some(frame) = stream.recv().await {
let mut handle = stdout.lock();
handle.write_all(&frame.bytes)?;
handle.flush()?;
while let Some(event) = stream.recv().await {
match event {
cascade_p2p::exec_stream::ExecStreamEvent::Output(frame) => {
let mut handle = stdout.lock();
handle.write_all(&frame.bytes)?;
handle.flush()?;
}
cascade_p2p::exec_stream::ExecStreamEvent::Exited { .. } => {
return Ok(event.to_exit_code());
}
}
}
return Ok(());
// The stream closed without an Exited event (the node went away before
// delivering the exit frame). Treat it as a generic failure.
return Ok(Some(1));
}

// Interactive shell: drive a real raw-mode PTY. The local terminal is put
Expand Down Expand Up @@ -653,13 +678,16 @@ async fn pump_exec_session(
loop {
tokio::select! {
// Output from the remote session.
frame = stream.recv() => match frame {
Some(frame) => {
event = stream.recv() => match event {
Some(cascade_p2p::exec_stream::ExecStreamEvent::Output(frame)) => {
let mut handle = stdout.lock();
handle.write_all(&frame.bytes)?;
handle.flush()?;
}
None => break,
// The remote process exited: end the shell session. The exit
// code is ignored for the interactive shell — its own exit
// status is not the remote process's.
Some(cascade_p2p::exec_stream::ExecStreamEvent::Exited { .. }) | None => break,
},
// Local stdin bytes -> remote PtyWrite. In raw mode each keystroke
// arrives immediately; ^C arrives as 0x03 and the remote PTY raises
Expand Down Expand Up @@ -691,7 +719,7 @@ async fn pump_exec_session(
}

drop(raw);
Ok(())
Ok(None)
}

/// Restores the local terminal from raw mode when the shell session ends,
Expand Down
Loading
Loading