Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/target
.cargo/config.toml
.claude/plans/
.claude/scheduled_tasks.lock
.claude/settings.local.json
.claude/skills/
.planning/
11 changes: 9 additions & 2 deletions ant-cli/src/commands/node/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,19 @@ impl StatusArgs {
NodeStatus::Starting => format!("{} {}", "●".yellow(), "Starting".yellow()),
NodeStatus::Stopping => format!("{} {}", "●".yellow(), "Stopping".yellow()),
NodeStatus::Errored => format!("{} {}", "●".red(), "Errored".red()),
NodeStatus::UpgradeScheduled => {
format!("{} {}", "●".cyan(), "Upgrade scheduled".cyan())
}
};
let version_display = match &node.pending_version {
Some(pending) => format!("{} → {}", node.version, pending),
None => node.version.clone(),
};
println!(
" {:<4} {:<14} {:<10} {}",
" {:<4} {:<14} {:<18} {}",
node.node_id.to_string().bold(),
node.name,
node.version.dimmed(),
version_display.dimmed(),
status_display
);
}
Expand Down
5 changes: 4 additions & 1 deletion ant-core/src/node/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,10 @@ pub fn extract_zip(data: &[u8], install_dir: &Path, binary_name: &str) -> Result
}

/// Extract the version string from a node binary by running `<binary> --version`.
async fn extract_version(binary_path: &Path) -> Result<String> {
///
/// `pub(crate)` so the supervisor can poll the on-disk binary's version to detect
/// auto-upgrade state without duplicating the parse logic.
pub(crate) async fn extract_version(binary_path: &Path) -> Result<String> {
let output = tokio::process::Command::new(binary_path)
.arg("--version")
.output()
Expand Down
41 changes: 34 additions & 7 deletions ant-core/src/node/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken;

use crate::error::Result;
use crate::node::binary::NoopProgress;
use crate::node::daemon::supervisor::Supervisor;
use crate::node::daemon::supervisor::{spawn_upgrade_monitor, Supervisor, UPGRADE_POLL_INTERVAL};
use crate::node::events::NodeEvent;
use crate::node::registry::NodeRegistry;
use crate::node::types::{
Expand All @@ -26,7 +26,7 @@ use crate::node::types::{

/// Shared application state for the daemon HTTP server.
pub struct AppState {
pub registry: RwLock<NodeRegistry>,
pub registry: Arc<RwLock<NodeRegistry>>,
pub supervisor: Arc<RwLock<Supervisor>>,
pub event_tx: broadcast::Sender<NodeEvent>,
pub start_time: Instant,
Expand All @@ -53,15 +53,28 @@ pub async fn start(
.local_addr()
.map_err(|e| crate::error::Error::BindError(e.to_string()))?;

let registry = Arc::new(RwLock::new(registry));
let supervisor = Arc::new(RwLock::new(Supervisor::new(event_tx.clone())));

let state = Arc::new(AppState {
registry: RwLock::new(registry),
supervisor: Arc::new(RwLock::new(Supervisor::new(event_tx.clone()))),
registry: registry.clone(),
supervisor: supervisor.clone(),
event_tx,
start_time: Instant::now(),
config: config.clone(),
bound_port: bound_addr.port(),
});

// Background task: probe each Running node's on-disk binary for version drift caused by
// ant-node's auto-upgrade, and flip any drifted node to UpgradeScheduled so the supervisor
// knows that node's next exit is expected.
spawn_upgrade_monitor(
registry,
supervisor,
UPGRADE_POLL_INTERVAL,
shutdown.clone(),
);

let app = build_router(state.clone());

// Write port and PID files
Expand Down Expand Up @@ -173,12 +186,15 @@ async fn get_nodes_status(State(state): State<Arc<AppState>>) -> Json<NodeStatus
.unwrap_or(NodeStatus::Stopped);

match status {
NodeStatus::Running | NodeStatus::Starting => total_running += 1,
NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
total_running += 1
}
_ => total_stopped += 1,
}

let pid = supervisor.node_pid(config.id);
let uptime_secs = supervisor.node_uptime_secs(config.id);
let pending_version = supervisor.node_pending_version(config.id);

nodes.push(NodeStatusSummary {
node_id: config.id,
Expand All @@ -187,6 +203,7 @@ async fn get_nodes_status(State(state): State<Arc<AppState>>) -> Json<NodeStatus
status,
pid,
uptime_secs,
pending_version,
});
}

Expand Down Expand Up @@ -217,12 +234,14 @@ async fn get_node_detail(
let status = supervisor.node_status(id).unwrap_or(NodeStatus::Stopped);
let pid = supervisor.node_pid(id);
let uptime_secs = supervisor.node_uptime_secs(id);
let pending_version = supervisor.node_pending_version(id);

Ok(Json(NodeInfo {
config,
status,
pid,
uptime_secs,
pending_version,
}))
}

Expand Down Expand Up @@ -328,7 +347,11 @@ async fn post_start_node(
));
}

match supervisor.start_node(&config, supervisor_ref).await {
let registry_ref = state.registry.clone();
match supervisor
.start_node(&config, supervisor_ref, registry_ref)
.await
{
Ok(started) => Ok(Json(started)),
Err(crate::error::Error::NodeAlreadyRunning(id)) => {
let pid = supervisor.node_pid(id);
Expand Down Expand Up @@ -364,6 +387,7 @@ async fn post_start_all(State(state): State<Arc<AppState>>) -> Json<StartNodeRes
let mut already_running = Vec::new();

let supervisor_ref = state.supervisor.clone();
let registry_ref = state.registry.clone();

for config in &configs {
let mut supervisor = state.supervisor.write().await;
Expand All @@ -372,7 +396,10 @@ async fn post_start_all(State(state): State<Arc<AppState>>) -> Json<StartNodeRes
continue;
}

match supervisor.start_node(config, supervisor_ref.clone()).await {
match supervisor
.start_node(config, supervisor_ref.clone(), registry_ref.clone())
.await
{
Ok(result) => started.push(result),
Err(crate::error::Error::NodeAlreadyRunning(id)) => {
already_running.push(id);
Expand Down
Loading
Loading