Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ All notable changes to this project will be documented in this file.
### Breaking

### Changes
- Activator
- Cap tunnel ID allocation per device based on `max_users` and gracefully reject users when a device is full instead of allocating invalid tunnel IDs
- Client
- Surface rejection reason as a CLI error on `doublezero connect` instead of hanging indefinitely when the activator rejects a user (e.g., device full)
- Demote passive-mode liveness session-down log messages from Info to Debug to reduce log noise when no dataplane action is taken
- E2E Tests
- Add geoprobe E2E test (`TestE2E_GeoprobeDiscovery`) that exercises the full geolocation flow: deploy geolocation program, create probe onchain, start geoprobe-agent container, and verify the telemetry-agent discovers and measures the probe via TWAMP
Expand Down
55 changes: 48 additions & 7 deletions activator/src/idallocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@ use indexmap::IndexSet;
/// Allocator for `u16` identifiers that tracks which IDs are currently in use.
#[derive(Debug)]
pub struct IDAllocator {
/// Lowest ID that can be handed out; `next_available` starts its search here.
pub first: u16,
/// Optional inclusive upper bound on allocated IDs; `None` means no cap is enforced.
pub max: Option<u16>,
/// IDs currently in use, kept in insertion order.
pub assigned: IndexSet<u16>,
}

impl IDAllocator {
pub fn new(first: u16, assigned: Vec<u16>) -> Self {
Self {
first,
max: None,
assigned: assigned.into_iter().collect(),
}
}

pub fn with_max(first: u16, max: u16, assigned: Vec<u16>) -> Self {
Self {
first,
max: Some(max),
assigned: assigned.into_iter().collect(),
}
}
Expand All @@ -22,13 +32,23 @@ impl IDAllocator {
self.assigned.shift_remove(&id);
}

pub fn next_available(&mut self) -> u16 {
pub fn next_available(&mut self) -> Option<u16> {
let mut id = self.first;
while self.assigned.contains(&id) {
id += 1;
if let Some(max) = self.max {
if id > max {
return None;
}
}
}
if let Some(max) = self.max {
if id > max {
return None;
}
}
self.assigned.insert(id);
id
Some(id)
}

#[allow(dead_code)]
Expand Down Expand Up @@ -84,23 +104,23 @@ mod tests {
fn test_next_available_from_first() {
let mut allocator = IDAllocator::new(100, vec![101, 102]);
let id = allocator.next_available();
assert_eq!(id, 100);
assert_eq!(id, Some(100));
assert_eq!(allocator.display_assigned(), "101,102,100");
}

#[test]
fn test_next_available_fills_gap() {
let mut allocator = IDAllocator::new(100, vec![100, 101, 103]);
let id = allocator.next_available();
assert_eq!(id, 102);
assert_eq!(id, Some(102));
assert_eq!(allocator.display_assigned(), "100,101,103,102");
}

#[test]
fn test_next_available_after_all_taken() {
let mut allocator = IDAllocator::new(100, vec![100, 101, 102]);
let id = allocator.next_available();
assert_eq!(id, 103);
assert_eq!(id, Some(103));
assert_eq!(allocator.display_assigned(), "100,101,102,103");
}

Expand All @@ -109,7 +129,7 @@ mod tests {
let mut allocator = IDAllocator::new(100, vec![100, 101, 102]);
allocator.unassign(101);
let id = allocator.next_available();
assert_eq!(id, 101);
assert_eq!(id, Some(101));
assert_eq!(allocator.display_assigned(), "100,102,101");
}

Expand All @@ -119,10 +139,31 @@ mod tests {
assert_eq!(allocator.display_assigned(), "");

let id = allocator.next_available();
assert_eq!(id, 200);
assert_eq!(id, Some(200));
assert_eq!(allocator.display_assigned(), "200");
}

// A fully occupied capped range must not yield an ID past the cap.
#[test]
fn test_with_max_respects_upper_bound() {
    let already_taken = vec![500, 501, 502];
    let mut allocator = IDAllocator::with_max(500, 502, already_taken);
    let next = allocator.next_available();
    assert_eq!(next, None);
}

// A free slot inside the capped range is reused before giving up.
#[test]
fn test_with_max_fills_gap() {
    let already_taken = vec![500, 502];
    let mut allocator = IDAllocator::with_max(500, 502, already_taken);
    let next = allocator.next_available();
    assert_eq!(next, Some(501));
}

// Every ID up to and including the cap is handed out in order, then the
// allocator reports exhaustion.
#[test]
fn test_with_max_allocates_up_to_max() {
    let mut allocator = IDAllocator::with_max(500, 502, vec![]);
    for expected in [Some(500), Some(501), Some(502), None] {
        assert_eq!(allocator.next_available(), expected);
    }
}

#[test]
fn test_insertion_order_preserved() {
let mut allocator = IDAllocator::new(1, vec![5, 3, 7, 2]);
Expand Down
4 changes: 3 additions & 1 deletion activator/src/process/iface_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,9 @@ impl<'a> InterfaceMgr<'a> {
// Allocate segment routing ID if needed
if iface.node_segment_idx == 0 && iface.loopback_type == LoopbackType::Vpnv4 {
if let Some(ref mut segment_routing_ids) = self.segment_routing_ids {
iface.node_segment_idx = segment_routing_ids.next_available();
iface.node_segment_idx = segment_routing_ids
.next_available()
.expect("segment routing ID pool exhausted");
info!(
"Assigning segment routing ID {} to device {} interface {}",
iface.node_segment_idx, device.code, iface.name
Expand Down
2 changes: 1 addition & 1 deletion activator/src/process/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fn get_link_id(link: &Link, link_ids: &mut IDAllocator) -> u16 {
if link.tunnel_id != 0 {
link.tunnel_id
} else {
link_ids.next_available()
link_ids.next_available().expect("link ID pool exhausted")
}
}

Expand Down
15 changes: 14 additions & 1 deletion activator/src/process/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,20 @@ pub fn process_user_event(

write!(&mut log_msg, " tunnel_net: {} ", &tunnel_net).unwrap();

let tunnel_id = device_state.get_next_tunnel_id();
let tunnel_id = match device_state.get_next_tunnel_id() {
Some(id) => id,
None => {
log_reject(
client,
pubkey,
"Error: No available tunnel ID on device",
"No available tunnel ID",
&mut log_msg,
);
info!("{log_msg}");
return;
}
};

// Determine tunnel endpoint: if the client demanded a specific one, validate it;
// otherwise fall back to first-available (backwards compat with 0.0.0.0).
Expand Down
7 changes: 5 additions & 2 deletions activator/src/states/devicestate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl DeviceState {
.iter()
.map(|b| IPBlockAllocator::new((*b).into()))
.collect(),
tunnel_ids: IDAllocator::new(500, vec![]),
tunnel_ids: IDAllocator::with_max(500, 499 + device.max_users, vec![]),
tunnel_endpoints_in_use: HashMap::new(),
Comment on lines +53 to 54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't going to work if it's done at start time. If a device is currently drained (i.e. max users = 0), the activator restarts, and then the contributor changes max users to 128, the tunnel ID allocator will still be at 0.

Either leaving the implementation as is and relying on Steven's fix to deallocate tunnel IDs properly or, if activate requests are always serialized, using only onchain state to allocate the next tunnel both seem like better fixes.

}
}
Expand All @@ -68,6 +68,9 @@ impl DeviceState {
device.code, &device.public_ip, &device.dz_prefixes,
);
}
// Update tunnel ID cap if max_users changed.
self.tunnel_ids.max = Some(499 + device.max_users);

// Always refresh the device data so interfaces (e.g. UTE loopbacks
// added after initial load) are visible to get_available_tunnel_endpoint.
self.device = device.clone();
Expand All @@ -86,7 +89,7 @@ impl DeviceState {
None
}

pub fn get_next_tunnel_id(&mut self) -> u16 {
pub fn get_next_tunnel_id(&mut self) -> Option<u16> {
self.tunnel_ids.next_available()
}

Expand Down
28 changes: 17 additions & 11 deletions client/doublezero/src/command/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,22 +830,19 @@ impl ProvisioningCliCommand {
.with_min_delay(Duration::from_secs(1))
.with_max_delay(Duration::from_secs(32));

let get_activated_user = || {
let get_activated_or_rejected_user = || {
client
.get_user(GetUserCommand {
pubkey: *user_pubkey,
})
.and_then(|(pk, user)| {
if user.status != UserStatus::Activated {
Err(eyre::eyre!("User not activated yet"))
} else {
Ok((pk, user))
}
.and_then(|(pk, user)| match user.status {
UserStatus::Activated | UserStatus::Rejected => Ok((pk, user)),
_ => Err(eyre::eyre!("User not activated yet")),
})
.map_err(|e| eyre::eyre!(e.to_string()))
};

get_activated_user
get_activated_or_rejected_user
.retry(builder)
.notify(|_, dur| {
spinner.set_message(format!(
Expand Down Expand Up @@ -948,21 +945,30 @@ impl ProvisioningCliCommand {
user_pubkey: &Pubkey,
spinner: &ProgressBar,
) -> eyre::Result<()> {
spinner.println(format!(" {}", "User rejected"));
spinner.println("❌ User rejected");

spinner.set_message("Reading logs...");
std::thread::sleep(std::time::Duration::from_secs(10));
let msgs = client
.get_logs(user_pubkey)
.map_err(|_| eyre::eyre!("Unable to get logs"))?;

let mut reasons = Vec::new();
for mut msg in msgs {
if msg.starts_with("Program log: Error: ") {
spinner.println(format!(" {}", msg.split_off(20)));
let reason = msg.split_off(20);
spinner.println(format!(" Reason: {reason}"));
reasons.push(reason);
}
}

Ok(())
let detail = if reasons.is_empty() {
"no rejection reason available (check activator logs)".to_string()
} else {
reasons.join("; ")
};

Err(eyre::eyre!("User rejected: {detail}"))
}
}

Expand Down
Loading