diff --git a/CHANGELOG.md b/CHANGELOG.md index 602a4921e..dd4d86d93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,10 @@ All notable changes to this project will be documented in this file. ### Breaking ### Changes +- Activator + - Cap tunnel ID allocation per device based on `max_users` and gracefully reject users when a device is full instead of allocating invalid tunnel IDs - Client + - Surface rejection reason as a CLI error on `doublezero connect` instead of hanging indefinitely when the activator rejects a user (e.g., device full) - Demote passive-mode liveness session-down log messages from Info to Debug to reduce log noise when no dataplane action is taken - E2E Tests - Add geoprobe E2E test (`TestE2E_GeoprobeDiscovery`) that exercises the full geolocation flow: deploy geolocation program, create probe onchain, start geoprobe-agent container, and verify the telemetry-agent discovers and measures the probe via TWAMP diff --git a/activator/src/idallocator.rs b/activator/src/idallocator.rs index c94ac02bb..0b7689fe6 100644 --- a/activator/src/idallocator.rs +++ b/activator/src/idallocator.rs @@ -3,6 +3,7 @@ use indexmap::IndexSet; #[derive(Debug)] pub struct IDAllocator { pub first: u16, + pub max: Option, pub assigned: IndexSet, } @@ -10,6 +11,15 @@ impl IDAllocator { pub fn new(first: u16, assigned: Vec) -> Self { Self { first, + max: None, + assigned: assigned.into_iter().collect(), + } + } + + pub fn with_max(first: u16, max: u16, assigned: Vec) -> Self { + Self { + first, + max: Some(max), assigned: assigned.into_iter().collect(), } } @@ -22,13 +32,23 @@ impl IDAllocator { self.assigned.shift_remove(&id); } - pub fn next_available(&mut self) -> u16 { + pub fn next_available(&mut self) -> Option { let mut id = self.first; while self.assigned.contains(&id) { id += 1; + if let Some(max) = self.max { + if id > max { + return None; + } + } + } + if let Some(max) = self.max { + if id > max { + return None; + } } self.assigned.insert(id); - id + Some(id) } #[allow(dead_code)] @@ -84,7 +104,7 @@ mod tests { fn test_next_available_from_first() { let mut allocator = IDAllocator::new(100, vec![101, 102]); let id = allocator.next_available(); - assert_eq!(id, 100); + assert_eq!(id, Some(100)); assert_eq!(allocator.display_assigned(), "101,102,100"); } @@ -92,7 +112,7 @@ mod tests { fn test_next_available_fills_gap() { let mut allocator = IDAllocator::new(100, vec![100, 101, 103]); let id = allocator.next_available(); - assert_eq!(id, 102); + assert_eq!(id, Some(102)); assert_eq!(allocator.display_assigned(), "100,101,103,102"); } @@ -100,7 +120,7 @@ mod tests { fn test_next_available_after_all_taken() { let mut allocator = IDAllocator::new(100, vec![100, 101, 102]); let id = allocator.next_available(); - assert_eq!(id, 103); + assert_eq!(id, Some(103)); assert_eq!(allocator.display_assigned(), "100,101,102,103"); } @@ -109,7 +129,7 @@ mod tests { let mut allocator = IDAllocator::new(100, vec![100, 101, 102]); allocator.unassign(101); let id = allocator.next_available(); - assert_eq!(id, 101); + assert_eq!(id, Some(101)); assert_eq!(allocator.display_assigned(), "100,102,101"); } @@ -119,10 +139,31 @@ mod tests { assert_eq!(allocator.display_assigned(), ""); let id = allocator.next_available(); - assert_eq!(id, 200); + assert_eq!(id, Some(200)); assert_eq!(allocator.display_assigned(), "200"); } + #[test] + fn test_with_max_respects_upper_bound() { + let mut allocator = IDAllocator::with_max(500, 502, vec![500, 501, 502]); + assert_eq!(allocator.next_available(), None); + } + + #[test] + fn test_with_max_fills_gap() { + let mut allocator = IDAllocator::with_max(500, 502, vec![500, 502]); + assert_eq!(allocator.next_available(), Some(501)); + } + + #[test] + fn test_with_max_allocates_up_to_max() { + let mut allocator = IDAllocator::with_max(500, 502, vec![]); + assert_eq!(allocator.next_available(), Some(500)); + assert_eq!(allocator.next_available(), Some(501)); + assert_eq!(allocator.next_available(), Some(502)); + assert_eq!(allocator.next_available(), None); + } + #[test] fn test_insertion_order_preserved() { let mut allocator = IDAllocator::new(1, vec![5, 3, 7, 2]); diff --git a/activator/src/process/iface_mgr.rs b/activator/src/process/iface_mgr.rs index 02d0f1733..d76304c3e 100644 --- a/activator/src/process/iface_mgr.rs +++ b/activator/src/process/iface_mgr.rs @@ -210,7 +210,9 @@ impl<'a> InterfaceMgr<'a> { // Allocate segment routing ID if needed if iface.node_segment_idx == 0 && iface.loopback_type == LoopbackType::Vpnv4 { if let Some(ref mut segment_routing_ids) = self.segment_routing_ids { - iface.node_segment_idx = segment_routing_ids.next_available(); + iface.node_segment_idx = segment_routing_ids + .next_available() + .expect("segment routing ID pool exhausted"); info!( "Assigning segment routing ID {} to device {} interface {}", iface.node_segment_idx, device.code, iface.name diff --git a/activator/src/process/link.rs b/activator/src/process/link.rs index fee82e46b..93d38cb9c 100644 --- a/activator/src/process/link.rs +++ b/activator/src/process/link.rs @@ -100,7 +100,7 @@ fn get_link_id(link: &Link, link_ids: &mut IDAllocator) -> u16 { if link.tunnel_id != 0 { link.tunnel_id } else { - link_ids.next_available() + link_ids.next_available().expect("link ID pool exhausted") } } diff --git a/activator/src/process/user.rs b/activator/src/process/user.rs index 782d0b649..9736aa01c 100644 --- a/activator/src/process/user.rs +++ b/activator/src/process/user.rs @@ -100,7 +100,20 @@ pub fn process_user_event( write!(&mut log_msg, " tunnel_net: {} ", &tunnel_net).unwrap(); - let tunnel_id = device_state.get_next_tunnel_id(); + let tunnel_id = match device_state.get_next_tunnel_id() { + Some(id) => id, + None => { + log_reject( + client, + pubkey, + "Error: No available tunnel ID on device", + "No available tunnel ID", + &mut log_msg, + ); + info!("{log_msg}"); + return; + } + }; // Determine tunnel endpoint: if the client demanded a specific one, validate it; // otherwise fall back to first-available (backwards compat with 0.0.0.0). diff --git a/activator/src/states/devicestate.rs b/activator/src/states/devicestate.rs index 381412c64..52daf6ffc 100644 --- a/activator/src/states/devicestate.rs +++ b/activator/src/states/devicestate.rs @@ -50,7 +50,7 @@ impl DeviceState { .iter() .map(|b| IPBlockAllocator::new((*b).into())) .collect(), - tunnel_ids: IDAllocator::new(500, vec![]), + tunnel_ids: IDAllocator::with_max(500, 499 + device.max_users, vec![]), tunnel_endpoints_in_use: HashMap::new(), } } @@ -68,6 +68,9 @@ impl DeviceState { device.code, &device.public_ip, &device.dz_prefixes, ); } + // Update tunnel ID cap if max_users changed. + self.tunnel_ids.max = Some(499 + device.max_users); + // Always refresh the device data so interfaces (e.g. UTE loopbacks // added after initial load) are visible to get_available_tunnel_endpoint. self.device = device.clone(); @@ -86,7 +89,7 @@ impl DeviceState { None } - pub fn get_next_tunnel_id(&mut self) -> u16 { + pub fn get_next_tunnel_id(&mut self) -> Option { self.tunnel_ids.next_available() } diff --git a/client/doublezero/src/command/connect.rs b/client/doublezero/src/command/connect.rs index 2f91b0708..e327677be 100644 --- a/client/doublezero/src/command/connect.rs +++ b/client/doublezero/src/command/connect.rs @@ -830,22 +830,19 @@ impl ProvisioningCliCommand { .with_min_delay(Duration::from_secs(1)) .with_max_delay(Duration::from_secs(32)); - let get_activated_user = || { + let get_activated_or_rejected_user = || { client .get_user(GetUserCommand { pubkey: *user_pubkey, }) - .and_then(|(pk, user)| { - if user.status != UserStatus::Activated { - Err(eyre::eyre!("User not activated yet")) - } else { - Ok((pk, user)) - } + .and_then(|(pk, user)| match user.status { + UserStatus::Activated | UserStatus::Rejected => Ok((pk, user)), + _ => Err(eyre::eyre!("User not activated yet")), }) .map_err(|e| eyre::eyre!(e.to_string())) }; - get_activated_user + get_activated_or_rejected_user .retry(builder) .notify(|_, dur| { spinner.set_message(format!( @@ -948,7 +945,7 @@ impl ProvisioningCliCommand { user_pubkey: &Pubkey, spinner: &ProgressBar, ) -> eyre::Result<()> { - spinner.println(format!(" {}", "User rejected")); + spinner.println("❌ User rejected"); spinner.set_message("Reading logs..."); std::thread::sleep(std::time::Duration::from_secs(10)); @@ -956,13 +953,22 @@ impl ProvisioningCliCommand { .get_logs(user_pubkey) .map_err(|_| eyre::eyre!("Unable to get logs"))?; + let mut reasons = Vec::new(); for mut msg in msgs { if msg.starts_with("Program log: Error: ") { - spinner.println(format!(" {}", msg.split_off(20))); + let reason = msg.split_off(20); + spinner.println(format!(" Reason: {reason}")); + reasons.push(reason); } } - Ok(()) + let detail = if reasons.is_empty() { + "no rejection reason available (check activator logs)".to_string() + } else { + reasons.join("; ") + }; + + Err(eyre::eyre!("User rejected: {detail}")) } }