From 83729d27bac368333563a843950b1d2fa43d8e96 Mon Sep 17 00:00:00 2001 From: Chris O'Neil Date: Wed, 22 Apr 2026 16:41:36 +0100 Subject: [PATCH] fix(node): handle ant-node auto-upgrade transparently ant-node replaces its binary on disk and later exits so a service manager can restart it. The daemon was classifying that exit as a clean stop, leaving nodes marked `Stopped` with a stale version and tempting users to restart them manually. Nodes spawned by the daemon now run with `--stop-on-upgrade`, and the supervisor polls each running node's on-disk binary version every 60s. When the disk version drifts from the registry, the node transitions to a new `NodeStatus::UpgradeScheduled` variant (with `pending_version` on the status payload) and `NodeEvent::UpgradeScheduled` fires. On process exit, the supervisor respawns the node against the new binary, refreshes `NodeConfig.version` in the registry, and emits `NodeEvent::NodeUpgraded`. `Stopped` is now reserved for user-initiated stops only. Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 1 + ant-cli/src/commands/node/status.rs | 11 +- ant-core/src/node/binary.rs | 5 +- ant-core/src/node/daemon/server.rs | 41 ++- ant-core/src/node/daemon/supervisor.rs | 342 ++++++++++++++++++++++++- ant-core/src/node/events.rs | 42 +++ ant-core/src/node/mod.rs | 1 + ant-core/src/node/types.rs | 44 ++++ 8 files changed, 463 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index d6647de..fc94dea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /target .cargo/config.toml .claude/plans/ +.claude/scheduled_tasks.lock .claude/settings.local.json .claude/skills/ .planning/ diff --git a/ant-cli/src/commands/node/status.rs b/ant-cli/src/commands/node/status.rs index 06f8031..31fc6e1 100644 --- a/ant-cli/src/commands/node/status.rs +++ b/ant-cli/src/commands/node/status.rs @@ -47,12 +47,19 @@ impl StatusArgs { NodeStatus::Starting => format!("{} {}", "●".yellow(), "Starting".yellow()), NodeStatus::Stopping => format!("{} {}", "●".yellow(), "Stopping".yellow()), NodeStatus::Errored => format!("{} {}", "●".red(), "Errored".red()), + NodeStatus::UpgradeScheduled => { + format!("{} {}", "●".cyan(), "Upgrade scheduled".cyan()) + } + }; + let version_display = match &node.pending_version { + Some(pending) => format!("{} → {}", node.version, pending), + None => node.version.clone(), }; println!( - " {:<4} {:<14} {:<10} {}", + " {:<4} {:<14} {:<18} {}", node.node_id.to_string().bold(), node.name, - node.version.dimmed(), + version_display.dimmed(), status_display ); } diff --git a/ant-core/src/node/binary.rs b/ant-core/src/node/binary.rs index 220cd62..67e1a3e 100644 --- a/ant-core/src/node/binary.rs +++ b/ant-core/src/node/binary.rs @@ -399,7 +399,10 @@ pub fn extract_zip(data: &[u8], install_dir: &Path, binary_name: &str) -> Result } /// Extract the version string from a node binary by running ` --version`. -async fn extract_version(binary_path: &Path) -> Result { +/// +/// `pub(crate)` so the supervisor can poll the on-disk binary's version to detect +/// auto-upgrade state without duplicating the parse logic. +pub(crate) async fn extract_version(binary_path: &Path) -> Result { let output = tokio::process::Command::new(binary_path) .arg("--version") .output() diff --git a/ant-core/src/node/daemon/server.rs b/ant-core/src/node/daemon/server.rs index d101d43..5b9c7e6 100644 --- a/ant-core/src/node/daemon/server.rs +++ b/ant-core/src/node/daemon/server.rs @@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken; use crate::error::Result; use crate::node::binary::NoopProgress; -use crate::node::daemon::supervisor::Supervisor; +use crate::node::daemon::supervisor::{spawn_upgrade_monitor, Supervisor, UPGRADE_POLL_INTERVAL}; use crate::node::events::NodeEvent; use crate::node::registry::NodeRegistry; use crate::node::types::{ @@ -26,7 +26,7 @@ use crate::node::types::{ /// Shared application state for the daemon HTTP server. pub struct AppState { - pub registry: RwLock, + pub registry: Arc>, pub supervisor: Arc>, pub event_tx: broadcast::Sender, pub start_time: Instant, @@ -53,15 +53,28 @@ pub async fn start( .local_addr() .map_err(|e| crate::error::Error::BindError(e.to_string()))?; + let registry = Arc::new(RwLock::new(registry)); + let supervisor = Arc::new(RwLock::new(Supervisor::new(event_tx.clone()))); + let state = Arc::new(AppState { - registry: RwLock::new(registry), - supervisor: Arc::new(RwLock::new(Supervisor::new(event_tx.clone()))), + registry: registry.clone(), + supervisor: supervisor.clone(), event_tx, start_time: Instant::now(), config: config.clone(), bound_port: bound_addr.port(), }); + // Background task: probe each Running node's on-disk binary for version drift caused by + // ant-node's auto-upgrade, and flip them to UpgradeScheduled so the supervisor knows the + // next exit is expected. + spawn_upgrade_monitor( + registry, + supervisor, + UPGRADE_POLL_INTERVAL, + shutdown.clone(), + ); + let app = build_router(state.clone()); // Write port and PID files @@ -173,12 +186,15 @@ async fn get_nodes_status(State(state): State>) -> Json total_running += 1, + NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => { + total_running += 1 + } _ => total_stopped += 1, } let pid = supervisor.node_pid(config.id); let uptime_secs = supervisor.node_uptime_secs(config.id); + let pending_version = supervisor.node_pending_version(config.id); nodes.push(NodeStatusSummary { node_id: config.id, @@ -187,6 +203,7 @@ async fn get_nodes_status(State(state): State>) -> Json Ok(Json(started)), Err(crate::error::Error::NodeAlreadyRunning(id)) => { let pid = supervisor.node_pid(id); @@ -364,6 +387,7 @@ async fn post_start_all(State(state): State>) -> Json>) -> Json started.push(result), Err(crate::error::Error::NodeAlreadyRunning(id)) => { already_running.push(id); diff --git a/ant-core/src/node/daemon/supervisor.rs b/ant-core/src/node/daemon/supervisor.rs index 18d1c41..6dee0b5 100644 --- a/ant-core/src/node/daemon/supervisor.rs +++ b/ant-core/src/node/daemon/supervisor.rs @@ -3,14 +3,20 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{broadcast, RwLock}; +use tokio_util::sync::CancellationToken; use crate::error::{Error, Result}; +use crate::node::binary::extract_version; use crate::node::events::NodeEvent; use crate::node::process::spawn::spawn_node; +use crate::node::registry::NodeRegistry; use crate::node::types::{ NodeConfig, NodeStarted, NodeStatus, NodeStopFailed, NodeStopped, StopNodeResult, }; +/// How often the upgrade-detection task polls each running node's binary for a version change. +pub const UPGRADE_POLL_INTERVAL: Duration = Duration::from_secs(60); + /// Maximum restart attempts before marking a node as errored. const MAX_CRASHES_BEFORE_ERRORED: u32 = 5; @@ -37,6 +43,8 @@ struct NodeRuntime { started_at: Option, restart_count: u32, first_crash_at: Option, + /// When `status == UpgradeScheduled`, the target version the on-disk binary now reports. + pending_version: Option, } impl Supervisor { @@ -55,6 +63,7 @@ impl Supervisor { &mut self, config: &NodeConfig, supervisor_ref: Arc>, + registry_ref: Arc>, ) -> Result { let node_id = config.id; @@ -96,6 +105,7 @@ impl Supervisor { started_at: None, restart_count: 0, first_crash_at: None, + pending_version: None, }, ); return Err(Error::ProcessSpawn(format!( @@ -118,6 +128,7 @@ impl Supervisor { started_at: Some(Instant::now()), restart_count: 0, first_crash_at: None, + pending_version: None, }, ); @@ -133,7 +144,7 @@ impl Supervisor { let event_tx = self.event_tx.clone(); let config = config.clone(); tokio::spawn(async move { - monitor_node(child, config, supervisor_ref, event_tx).await; + monitor_node(child, config, supervisor_ref, registry_ref, event_tx).await; }); Ok(result) @@ -241,6 +252,34 @@ impl Supervisor { .and_then(|s| s.started_at.map(|t| t.elapsed().as_secs())) } + /// The target version when the node is in `UpgradeScheduled` state, otherwise `None`. + pub fn node_pending_version(&self, node_id: u32) -> Option { + self.node_states + .get(&node_id) + .and_then(|s| s.pending_version.clone()) + } + + /// Transition a Running node into `UpgradeScheduled` with the target version. + /// + /// Only affects nodes currently in `Running`: any other state is left alone (a stopped + /// node legitimately has an out-of-date binary; a node already in UpgradeScheduled has + /// already been marked). Returns `true` if the transition happened. + fn mark_upgrade_scheduled(&mut self, node_id: u32, pending_version: String) -> bool { + let Some(state) = self.node_states.get_mut(&node_id) else { + return false; + }; + if state.status != NodeStatus::Running { + return false; + } + state.status = NodeStatus::UpgradeScheduled; + state.pending_version = Some(pending_version.clone()); + let _ = self.event_tx.send(NodeEvent::UpgradeScheduled { + node_id, + pending_version, + }); + true + } + /// Check whether a node is running. pub fn is_running(&self, node_id: u32) -> bool { self.node_states @@ -255,7 +294,10 @@ impl Supervisor { let mut errored = 0u32; for state in self.node_states.values() { match state.status { - NodeStatus::Running | NodeStatus::Starting => running += 1, + // UpgradeScheduled means the process is still running; count it with running. + NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => { + running += 1 + } NodeStatus::Stopped | NodeStatus::Stopping => stopped += 1, NodeStatus::Errored => errored += 1, } @@ -319,6 +361,73 @@ impl Supervisor { } } +/// Periodically probe each Running node's on-disk binary for a version change. +/// +/// When a node's binary-on-disk reports a different version than was recorded in the registry +/// at `ant node add` time, ant-node has replaced the binary in place as part of its auto-upgrade +/// flow and will restart the process shortly. We flip the node to `UpgradeScheduled` with the +/// target version, which lets `ant node status` render the in-between state and lets +/// `monitor_node` reclassify the upcoming clean exit as an expected restart rather than a crash. +/// +/// The task exits when `shutdown` is cancelled. +pub fn spawn_upgrade_monitor( + registry: Arc>, + supervisor: Arc>, + interval: Duration, + shutdown: CancellationToken, +) { + tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval); + // Skip the immediate first tick — we don't want to probe while nodes are still in the + // Starting -> Running transition. + ticker.tick().await; + + loop { + tokio::select! { + _ = shutdown.cancelled() => return, + _ = ticker.tick() => {}, + } + + // Collect a snapshot of (node_id, binary_path, recorded_version, current_pending) + // to release the locks before running --version subprocesses (which take time). + let candidates: Vec<(u32, std::path::PathBuf, String, Option)> = { + let reg = registry.read().await; + let sup = supervisor.read().await; + reg.list() + .into_iter() + .filter_map(|config| match sup.node_status(config.id) { + Ok(NodeStatus::Running) => Some(( + config.id, + config.binary_path.clone(), + config.version.clone(), + sup.node_pending_version(config.id), + )), + _ => None, + }) + .collect() + }; + + for (node_id, binary_path, recorded_version, current_pending) in candidates { + let observed = match extract_version(&binary_path).await { + Ok(v) => v, + // Transient failures (e.g. binary mid-replacement) — skip this round. + Err(_) => continue, + }; + if observed == recorded_version { + continue; + } + if current_pending.as_deref() == Some(observed.as_str()) { + continue; + } + supervisor + .write() + .await + .mark_upgrade_scheduled(node_id, observed); + } + } + }); +} + /// Build CLI arguments for the node binary from a NodeConfig. pub fn build_node_args(config: &NodeConfig) -> Vec { let mut args = vec![ @@ -349,6 +458,12 @@ pub fn build_node_args(config: &NodeConfig) -> Vec { args.push(peer.clone()); } + // The daemon's supervisor is the service manager. Tell ant-node not to spawn its own + // replacement on auto-upgrade; instead, exit cleanly and let us respawn. Without this, + // ant-node's default spawn-grandchild-then-exit flow races for the node's port during + // the parent's graceful shutdown and the grandchild fails to bind. + args.push("--stop-on-upgrade".to_string()); + args } @@ -368,8 +483,9 @@ async fn spawn_node_from_config(config: &NodeConfig) -> Result>, + registry: Arc>, event_tx: broadcast::Sender, ) { let node_id = config.id; @@ -378,27 +494,81 @@ async fn monitor_node( // Wait for the process to exit let exit_status = child.wait().await; - // Check if the node was intentionally stopped - { + // Check whether this is a scheduled upgrade restart or an intentional stop. + let status_at_exit = { let sup = supervisor.read().await; - if let Ok(status) = sup.node_status(node_id) { - if status == NodeStatus::Stopped || status == NodeStatus::Stopping { - return; + sup.node_status(node_id).ok() + }; + + match status_at_exit { + Some(NodeStatus::Stopped) | Some(NodeStatus::Stopping) => return, + Some(NodeStatus::UpgradeScheduled) => { + // ant-node cleanly exited after replacing its binary in place. Respawn + // directly (no backoff, no crash counter) and refresh the recorded version. + match respawn_upgraded_node(&mut config, &supervisor, ®istry, &event_tx).await { + Ok(new_child) => { + child = new_child; + continue; + } + Err(e) => { + let _ = event_tx.send(NodeEvent::NodeErrored { + node_id, + message: format!("Failed to respawn after upgrade: {e}"), + }); + let mut sup = supervisor.write().await; + sup.update_state(node_id, NodeStatus::Errored, None); + return; + } } } + _ => {} } let exit_code = exit_status.ok().and_then(|s| s.code()); + // A process-reported exit that wasn't user-initiated (Stopping was filtered above) is + // either an auto-upgrade (exit 0 after ant-node replaced its binary) or a crash. In + // neither case should the node be parked in `Stopped` — that state is reserved for + // intentional user stops. + // + // Distinguish upgrade from crash by checking whether the on-disk binary's version + // drifted from the registry. Between replacing its binary and actually exiting, + // ant-node can hold the process open for anywhere from seconds to minutes, depending + // on in-flight work and its own config. The periodic version poll will usually have + // flipped the node to `UpgradeScheduled` well before the exit, but when the window is + // short we cannot rely on that — hence this synchronous re-check here. if exit_code == Some(0) { - // Clean exit - let mut sup = supervisor.write().await; - sup.update_state(node_id, NodeStatus::Stopped, None); - let _ = event_tx.send(NodeEvent::NodeStopped { node_id }); - return; + if let Ok(disk_version) = extract_version(&config.binary_path).await { + if disk_version != config.version { + { + let mut sup = supervisor.write().await; + sup.mark_upgrade_scheduled(node_id, disk_version.clone()); + } + match respawn_upgraded_node(&mut config, &supervisor, ®istry, &event_tx) + .await + { + Ok(new_child) => { + child = new_child; + continue; + } + Err(e) => { + let _ = event_tx.send(NodeEvent::NodeErrored { + node_id, + message: format!("Failed to respawn after upgrade: {e}"), + }); + let mut sup = supervisor.write().await; + sup.update_state(node_id, NodeStatus::Errored, None); + return; + } + } + } + } + // Exit 0 but the binary didn't change — fall through to the crash / restart path. + // We report the crash with the exit code preserved; the crash counter guards + // against infinite restart loops if the process keeps exiting immediately. } - // Crash + // Crash (or clean exit that wasn't an upgrade) let _ = event_tx.send(NodeEvent::NodeCrashed { node_id, exit_code }); let (should_restart, attempt, backoff) = { @@ -459,6 +629,61 @@ async fn monitor_node( } } +/// Respawn a node whose `UpgradeScheduled` status tells us the exit was expected. +/// +/// On success: persists the new version to the registry, updates the in-memory config clone, +/// clears pending_version, sets status back to Running, and fires `NodeUpgraded`. +async fn respawn_upgraded_node( + config: &mut NodeConfig, + supervisor: &Arc>, + registry: &Arc>, + event_tx: &broadcast::Sender, +) -> Result { + let node_id = config.id; + let old_version = config.version.clone(); + + let new_child = spawn_node_from_config(config).await?; + let pid = new_child + .id() + .ok_or_else(|| Error::ProcessSpawn("Failed to get PID after upgrade respawn".into()))?; + + // Read the new version from the replaced binary. If this fails we still consider the respawn + // successful; we just don't refresh the recorded version this round. + let new_version = extract_version(&config.binary_path).await.ok(); + + if let Some(ref version) = new_version { + config.version = version.clone(); + let mut reg = registry.write().await; + if let Ok(stored) = reg.get_mut(node_id) { + stored.version = version.clone(); + let _ = reg.save(); + } + } + + { + let mut sup = supervisor.write().await; + if let Some(state) = sup.node_states.get_mut(&node_id) { + state.status = NodeStatus::Running; + state.pid = Some(pid); + state.started_at = Some(Instant::now()); + state.pending_version = None; + state.restart_count = 0; + state.first_crash_at = None; + } + } + + let _ = event_tx.send(NodeEvent::NodeStarted { node_id, pid }); + if let Some(version) = new_version { + let _ = event_tx.send(NodeEvent::NodeUpgraded { + node_id, + old_version, + new_version: version, + }); + } + + Ok(new_child) +} + /// Timeout for graceful shutdown before force-killing. const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); @@ -627,6 +852,7 @@ mod tests { assert!(args.contains(&"--bootstrap".to_string())); assert!(args.contains(&"peer1".to_string())); assert!(args.contains(&"peer2".to_string())); + assert!(args.contains(&"--stop-on-upgrade".to_string())); } #[test] @@ -655,6 +881,7 @@ mod tests { assert!(!args.contains(&"--port".to_string())); assert!(!args.contains(&"--metrics-port".to_string())); assert!(!args.contains(&"--bootstrap".to_string())); + assert!(args.contains(&"--stop-on-upgrade".to_string())); } #[test] @@ -671,6 +898,7 @@ mod tests { started_at: Some(Instant::now()), restart_count: 0, first_crash_at: None, + pending_version: None, }, ); @@ -714,6 +942,7 @@ mod tests { started_at: Some(Instant::now()), restart_count: 0, first_crash_at: None, + pending_version: None, }, ); sup.node_states.insert( @@ -724,6 +953,7 @@ mod tests { started_at: None, restart_count: 0, first_crash_at: None, + pending_version: None, }, ); sup.node_states.insert( @@ -734,6 +964,7 @@ mod tests { started_at: None, restart_count: 5, first_crash_at: None, + pending_version: None, }, ); @@ -743,6 +974,86 @@ mod tests { assert_eq!(errored, 1); } + #[test] + fn mark_upgrade_scheduled_only_affects_running_nodes() { + let (tx, mut rx) = broadcast::channel(16); + let mut sup = Supervisor::new(tx); + + sup.node_states.insert( + 1, + NodeRuntime { + status: NodeStatus::Running, + pid: Some(111), + started_at: Some(Instant::now()), + restart_count: 0, + first_crash_at: None, + pending_version: None, + }, + ); + sup.node_states.insert( + 2, + NodeRuntime { + status: NodeStatus::Stopped, + pid: None, + started_at: None, + restart_count: 0, + first_crash_at: None, + pending_version: None, + }, + ); + + // Running node: transitions to UpgradeScheduled with pending_version set and event fires. + let affected = sup.mark_upgrade_scheduled(1, "0.10.11-rc.1".to_string()); + assert!(affected); + assert_eq!(sup.node_status(1).unwrap(), NodeStatus::UpgradeScheduled); + assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1")); + match rx.try_recv() { + Ok(NodeEvent::UpgradeScheduled { + node_id, + pending_version, + }) => { + assert_eq!(node_id, 1); + assert_eq!(pending_version, "0.10.11-rc.1"); + } + other => panic!("expected UpgradeScheduled event, got {other:?}"), + } + + // Stopped node: untouched, no event fired. + let affected = sup.mark_upgrade_scheduled(2, "0.10.11-rc.1".to_string()); + assert!(!affected); + assert_eq!(sup.node_status(2).unwrap(), NodeStatus::Stopped); + assert!(sup.node_pending_version(2).is_none()); + + // Already-UpgradeScheduled node: calling again is a no-op. + let affected = sup.mark_upgrade_scheduled(1, "0.10.12".to_string()); + assert!(!affected); + // Pending version is the original one set. + assert_eq!(sup.node_pending_version(1).as_deref(), Some("0.10.11-rc.1")); + } + + #[test] + fn node_counts_counts_upgrade_scheduled_as_running() { + let (tx, _rx) = broadcast::channel(16); + let mut sup = Supervisor::new(tx); + + sup.node_states.insert( + 1, + NodeRuntime { + status: NodeStatus::UpgradeScheduled, + pid: Some(111), + started_at: Some(Instant::now()), + restart_count: 0, + first_crash_at: None, + pending_version: Some("0.10.11-rc.1".to_string()), + }, + ); + + let (running, stopped, errored) = sup.node_counts(); + assert_eq!(running, 1); + assert_eq!(stopped, 0); + assert_eq!(errored, 0); + } + #[tokio::test] async fn stop_node_not_found() { let (tx, _rx) = broadcast::channel(16); @@ -765,6 +1076,7 @@ mod tests { started_at: None, restart_count: 0, first_crash_at: None, + pending_version: None, }, ); @@ -786,6 +1098,7 @@ mod tests { started_at: Some(Instant::now()), restart_count: 0, first_crash_at: None, + pending_version: None, }, ); // Node 2: already stopped @@ -797,6 +1110,7 @@ mod tests { started_at: None, restart_count: 0, first_crash_at: None, + pending_version: None, }, ); diff --git a/ant-core/src/node/events.rs b/ant-core/src/node/events.rs index cc5cf40..4b2de63 100644 --- a/ant-core/src/node/events.rs +++ b/ant-core/src/node/events.rs @@ -44,6 +44,19 @@ pub enum NodeEvent { version: String, path: PathBuf, }, + /// Emitted when the supervisor detects that a node's on-disk binary has been + /// replaced by its auto-upgrade, ahead of the node process restarting. + UpgradeScheduled { + node_id: u32, + pending_version: String, + }, + /// Emitted after the supervisor has respawned a node against its replaced binary and + /// observed the new version. + NodeUpgraded { + node_id: u32, + old_version: String, + new_version: String, + }, } impl NodeEvent { @@ -60,6 +73,8 @@ impl NodeEvent { NodeEvent::DownloadStarted { .. } => "download_started", NodeEvent::DownloadProgress { .. } => "download_progress", NodeEvent::DownloadComplete { .. } => "download_complete", + NodeEvent::UpgradeScheduled { .. } => "upgrade_scheduled", + NodeEvent::NodeUpgraded { .. } => "node_upgraded", } } } @@ -108,4 +123,31 @@ mod tests { let deserialized: NodeEvent = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.event_type(), "download_progress"); } + + #[test] + fn upgrade_scheduled_event_serializes() { + let event = NodeEvent::UpgradeScheduled { + node_id: 2, + pending_version: "0.10.11-rc.1".to_string(), + }; + let json = serde_json::to_string(&event).unwrap(); + assert!(json.contains("\"type\":\"upgrade_scheduled\"")); + assert!(json.contains("\"node_id\":2")); + assert!(json.contains("\"pending_version\":\"0.10.11-rc.1\"")); + assert_eq!(event.event_type(), "upgrade_scheduled"); + } + + #[test] + fn node_upgraded_event_serializes() { + let event = NodeEvent::NodeUpgraded { + node_id: 3, + old_version: "0.10.1".to_string(), + new_version: "0.10.11-rc.1".to_string(), + }; + let json = serde_json::to_string(&event).unwrap(); + assert!(json.contains("\"type\":\"node_upgraded\"")); + assert!(json.contains("\"old_version\":\"0.10.1\"")); + assert!(json.contains("\"new_version\":\"0.10.11-rc.1\"")); + assert_eq!(event.event_type(), "node_upgraded"); + } } diff --git a/ant-core/src/node/mod.rs b/ant-core/src/node/mod.rs index 9392479..63c3eaa 100644 --- a/ant-core/src/node/mod.rs +++ b/ant-core/src/node/mod.rs @@ -212,6 +212,7 @@ pub fn node_status_offline(registry_path: &Path) -> Result { status: NodeStatus::Stopped, pid: None, uptime_secs: None, + pending_version: None, }) .collect(); let total_stopped = nodes.len() as u32; diff --git a/ant-core/src/node/types.rs b/ant-core/src/node/types.rs index 855ec4b..5e83dd3 100644 --- a/ant-core/src/node/types.rs +++ b/ant-core/src/node/types.rs @@ -70,6 +70,10 @@ pub enum NodeStatus { Running, Stopping, Errored, + /// The node's on-disk binary has been replaced by an auto-upgrade, but the process has not + /// yet restarted. The supervisor is waiting for the current process to exit and will then + /// respawn it against the new binary. + UpgradeScheduled, } /// Persisted configuration for a single node. @@ -100,6 +104,10 @@ pub struct NodeInfo { pub status: NodeStatus, pub pid: Option, pub uptime_secs: Option, + /// Set only when `status == UpgradeScheduled`: the new version that the replaced on-disk + /// binary reports. Omitted otherwise. + #[serde(skip_serializing_if = "Option::is_none")] + pub pending_version: Option, } /// Result of a daemon start operation. @@ -348,6 +356,10 @@ pub struct NodeStatusSummary { /// Seconds since the node process started (only set when running). #[serde(skip_serializing_if = "Option::is_none")] pub uptime_secs: Option, + /// Set only when `status == UpgradeScheduled`: the new version that the replaced on-disk + /// binary reports. Omitted otherwise. + #[serde(skip_serializing_if = "Option::is_none")] + pub pending_version: Option, } /// Result of querying node status across all registered nodes. @@ -396,6 +408,32 @@ mod tests { assert_eq!(json, "\"running\""); } + #[test] + fn node_status_upgrade_scheduled_serializes() { + let json = serde_json::to_string(&NodeStatus::UpgradeScheduled).unwrap(); + assert_eq!(json, "\"upgrade_scheduled\""); + let parsed: NodeStatus = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed, NodeStatus::UpgradeScheduled); + } + + #[test] + fn node_status_summary_with_pending_version() { + let summary = NodeStatusSummary { + node_id: 7, + name: "antnode-7".to_string(), + version: "0.10.1".to_string(), + status: NodeStatus::UpgradeScheduled, + pid: Some(4242), + uptime_secs: Some(3600), + pending_version: Some("0.10.11-rc.1".to_string()), + }; + let json = serde_json::to_string(&summary).unwrap(); + assert!(json.contains("\"status\":\"upgrade_scheduled\"")); + assert!(json.contains("\"pending_version\":\"0.10.11-rc.1\"")); + let roundtrip: NodeStatusSummary = serde_json::from_str(&json).unwrap(); + assert_eq!(roundtrip.pending_version.as_deref(), Some("0.10.11-rc.1")); + } + #[test] fn port_range_single_len() { let pr = PortRange::Single(8080); @@ -478,6 +516,7 @@ mod tests { status: NodeStatus::Running, pid: Some(1234), uptime_secs: Some(60), + pending_version: None, }, NodeStatusSummary { node_id: 2, @@ -486,6 +525,7 @@ mod tests { status: NodeStatus::Stopped, pid: None, uptime_secs: None, + pending_version: None, }, ], total_running: 1, @@ -510,6 +550,7 @@ mod tests { status: NodeStatus::Running, pid: Some(5678), uptime_secs: Some(120), + pending_version: None, }; let json = serde_json::to_string(&summary).unwrap(); assert!(json.contains("\"node_id\":1")); @@ -518,6 +559,7 @@ mod tests { assert!(json.contains("\"status\":\"running\"")); assert!(json.contains("\"pid\":5678")); assert!(json.contains("\"uptime_secs\":120")); + assert!(!json.contains("pending_version")); // None fields should be omitted let stopped = NodeStatusSummary { @@ -527,10 +569,12 @@ mod tests { status: NodeStatus::Stopped, pid: None, uptime_secs: None, + pending_version: None, }; let json_stopped = serde_json::to_string(&stopped).unwrap(); assert!(!json_stopped.contains("pid")); assert!(!json_stopped.contains("uptime_secs")); + assert!(!json_stopped.contains("pending_version")); } #[test]