From 691bea78715370bc4ad8c6edb5bf63c224f6776a Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:05:04 +0000 Subject: [PATCH 1/9] refactor(flow-filter): Move FlowFilter's process_packet() to the top Keep the main logic at the top. This brings methods bypass_with_flow_info() and check_packet_flow_info() nearer to similar functions, lower down in the file, and keeps process_packet() with the core logic near the top of the file. Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 157 +++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 78 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 3a941928b..5e4c8e453 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -60,84 +60,6 @@ impl FlowFilter { } } - /// Attempt to determine destination vpc from packet's flow-info - fn check_packet_flow_info( - &self, - packet: &mut Packet, - ) -> Result, DoneReason> { - let nfi = &self.name; - - let Some(flow_info) = &packet.meta().flow_info else { - debug!("{nfi}: Packet does not contain any flow-info"); - return Ok(None); - }; - - let Ok(locked_info) = flow_info.locked.read() else { - debug!("{nfi}: Warning! failed to lock flow-info for packet, dropping packet"); - return Err(DoneReason::InternalFailure); - }; - - let vpcd = locked_info - .dst_vpcd - .as_ref() - .and_then(|d| d.extract_ref::()); - - let Some(dst_vpcd) = vpcd else { - debug!("{nfi}: No VPC discriminant found, dropping packet"); - return Err(DoneReason::Unroutable); - }; - - let status = flow_info.status(); - if status != FlowStatus::Active { - debug!( - "{nfi}: Found flow-info with dst_vpcd {dst_vpcd} but status {status}, dropping packet" - ); - return Err(DoneReason::Unroutable); - } - - debug!("{nfi}: dst_vpcd discriminant is {dst_vpcd} (from active flow-info entry)"); - Ok(Some(*dst_vpcd)) - } - - fn bypass_with_flow_info( - &self, - packet: &mut Packet, - genid: i64, - ) -> bool { - let Some(flow_info) = &packet.meta().flow_info else { - debug!("Packet does not contain any flow-info"); - return false; - }; - let flow_genid = flow_info.genid(); - if flow_genid < genid { - debug!("Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); - return false; - } - let status = flow_info.status(); - if status != FlowStatus::Active { - debug!("Found flow-info but its status is {status}. Need to re-evaluate..."); - return false; - } - - let vpcd = flow_info - .locked - .read() - .unwrap() - .dst_vpcd - .as_ref() - .and_then(|d| d.extract_ref::()) - .copied(); - - debug!("Packet can bypass filter due to flow {flow_info}"); - - if set_nat_requirements_from_flow_info(packet).is_err() { - debug!("Failed to set nat requirements"); - return false; - } - packet.meta_mut().dst_vpcd = vpcd; - true - } - /// Process a packet. fn process_packet( &self, @@ -244,6 +166,85 @@ impl FlowFilter { debug!("{nfi}: Flow {tuple} is allowed, setting packet dst_vpcd to {dst_vpcd}"); packet.meta_mut().dst_vpcd = Some(dst_vpcd); } + + /// Check if flow-info is up-to-date and allows bypassing the main filtering logic. + fn bypass_with_flow_info( + &self, + packet: &mut Packet, + genid: i64, + ) -> bool { + let Some(flow_info) = &packet.meta().flow_info else { + debug!("Packet does not contain any flow-info"); + return false; + }; + let flow_genid = flow_info.genid(); + if flow_genid < genid { + debug!("Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); + return false; + } + let status = flow_info.status(); + if status != FlowStatus::Active { + debug!("Found flow-info but its status is {status}. Need to re-evaluate..."); + return false; + } + + let vpcd = flow_info + .locked + .read() + .unwrap() + .dst_vpcd + .as_ref() + .and_then(|d| d.extract_ref::()) + .copied(); + + debug!("Packet can bypass filter due to flow {flow_info}"); + + if set_nat_requirements_from_flow_info(packet).is_err() { + debug!("Failed to set nat requirements"); + return false; + } + packet.meta_mut().dst_vpcd = vpcd; + true + } + + /// Attempt to determine destination VPC from packet's flow-info. + fn check_packet_flow_info( + &self, + packet: &mut Packet, + ) -> Result, DoneReason> { + let nfi = &self.name; + + let Some(flow_info) = &packet.meta().flow_info else { + debug!("{nfi}: Packet does not contain any flow-info"); + return Ok(None); + }; + + let Ok(locked_info) = flow_info.locked.read() else { + debug!("{nfi}: Warning! failed to lock flow-info for packet, dropping packet"); + return Err(DoneReason::InternalFailure); + }; + + let vpcd = locked_info + .dst_vpcd + .as_ref() + .and_then(|d| d.extract_ref::()); + + let Some(dst_vpcd) = vpcd else { + debug!("{nfi}: No VPC discriminant found, dropping packet"); + return Err(DoneReason::Unroutable); + }; + + let status = flow_info.status(); + if status != FlowStatus::Active { + debug!( + "{nfi}: Found flow-info with dst_vpcd {dst_vpcd} but status {status}, dropping packet" + ); + return Err(DoneReason::Unroutable); + } + + debug!("{nfi}: dst_vpcd discriminant is {dst_vpcd} (from active flow-info entry)"); + Ok(Some(*dst_vpcd)) + } } fn deal_with_multiple_matches( From 62f14c3eb74f130a144c336ce4c64dd1ee10446d Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:07:01 +0000 Subject: [PATCH 2/9] chore(flow-filter): Make logs in bypass_with_flow_info() use NF name For consistency with the other logs in the rest of the file, use the network function's name in logs from bypass_with_flow_info(). Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 5e4c8e453..a5b4b8d6c 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -173,18 +173,20 @@ impl FlowFilter { packet: &mut Packet, genid: i64, ) -> bool { + let nfi = &self.name; + let Some(flow_info) = &packet.meta().flow_info else { - debug!("Packet does not contain any flow-info"); + debug!("{nfi}: Packet does not contain any flow-info"); return false; }; let flow_genid = flow_info.genid(); if flow_genid < genid { - debug!("Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); + debug!("{nfi}: Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); return false; } let status = flow_info.status(); if status != FlowStatus::Active { - debug!("Found flow-info but its status is {status}. Need to re-evaluate..."); + debug!("{nfi}: Found flow-info but its status is {status}. Need to re-evaluate..."); return false; } @@ -197,10 +199,10 @@ impl FlowFilter { .and_then(|d| d.extract_ref::()) .copied(); - debug!("Packet can bypass filter due to flow {flow_info}"); + debug!("{nfi}: Packet can bypass filter due to flow {flow_info}"); if set_nat_requirements_from_flow_info(packet).is_err() { - debug!("Failed to set nat requirements"); + debug!("{nfi}: Failed to set nat requirements"); return false; } packet.meta_mut().dst_vpcd = vpcd; From e1386533ffc7e230b973d93eb27ff2fcb9fd247a Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:09:47 +0000 Subject: [PATCH 3/9] refactor(flow-filter): Make deal_with_multiple_matches() to FlowFilter Make the function a method of struct FlowFilter, for consistency with similar functions called by FlowFilter.process_packet(). Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 172 +++++++++++++++++++++-------------------- 1 file changed, 88 insertions(+), 84 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index a5b4b8d6c..84adfd60a 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -124,7 +124,7 @@ impl FlowFilter { debug!( "{nfi}: No flow table entry found for flow {tuple}, trying to figure out destination VPC anyway" ); - deal_with_multiple_matches(packet, &data_set, nfi, &tuple) + self.deal_with_multiple_matches(packet, &data_set, &tuple) } Err(reason) => { debug!("Will drop packet. Reason: {reason}"); @@ -247,98 +247,102 @@ impl FlowFilter { debug!("{nfi}: dst_vpcd discriminant is {dst_vpcd} (from active flow-info entry)"); Ok(Some(*dst_vpcd)) } -} -fn deal_with_multiple_matches( - packet: &mut Packet, - data_set: &HashSet, - nfi: &str, - tuple: &FlowTuple, -) -> Option { - // We should always have at least one matching RemoteData object applying to our packet. - debug_assert!( - !data_set.is_empty(), - "{nfi}: No matching RemoteData objects left for flow {tuple}" - ); - - // Do all matches have the same destination VPC? - let Some(first_vpcd) = data_set.iter().next().map(|d| d.vpcd) else { - debug!("{nfi}: Missing destination VPC information for flow {tuple}, dropping packet"); - return None; - }; - if data_set.iter().any(|d| d.vpcd != first_vpcd) { - debug!( - "{nfi}: Unable to decide what destination VPC to use for flow {tuple}, dropping packet" - ); - return None; - }; - - // data_set may actually contain RemoteData objects that do not apply to our packet, because the - // table lookup does not account for TCP vs. UDP, we only deal with the protocol when looking at - // NAT requirements. Here we filter out RemoteData objects that do not apply to our packet. - - let packet_proto = get_l4_proto(packet); - let data_set = data_set - .iter() - .filter(|d| d.applies_to(packet_proto)) - .collect::>(); - - if data_set.is_empty() { - debug!( - "{nfi}: No NAT requirement found for flow {tuple} after filtering by protocol, dropping packet" + /// Handle destination VPC retrieval and NAT requirements setting when multiple matches were + /// found, with no accompanying flow-info for the packet. + fn deal_with_multiple_matches( + &self, + packet: &mut Packet, + data_set: &HashSet, + tuple: &FlowTuple, + ) -> Option { + let nfi = &self.name; + + // We should always have at least one matching RemoteData object applying to our packet. + debug_assert!( + !data_set.is_empty(), + "{nfi}: No matching RemoteData objects left for flow {tuple}" ); - return None; - } - // Can we do something sensible from the NAT requirements? At the moment we allow prefix overlap - // only when port forwarding is used in conjunction with stateful NAT, so if we reach this case - // this is what we should have. + // Do all matches have the same destination VPC? + let Some(first_vpcd) = data_set.iter().next().map(|d| d.vpcd) else { + debug!("{nfi}: Missing destination VPC information for flow {tuple}, dropping packet"); + return None; + }; + if data_set.iter().any(|d| d.vpcd != first_vpcd) { + debug!( + "{nfi}: Unable to decide what destination VPC to use for flow {tuple}, dropping packet" + ); + return None; + }; - // Note: if data_set.len() == 1 we can trivially figure out the destination VPC and NAT - // requirement. - if data_set.len() == 1 { - let dst_data = data_set.iter().next().unwrap_or_else(|| unreachable!()); - set_nat_requirements(packet, dst_data); - return Some(first_vpcd); - } + // data_set may actually contain RemoteData objects that do not apply to our packet, because the + // table lookup does not account for TCP vs. UDP, we only deal with the protocol when looking at + // NAT requirements. Here we filter out RemoteData objects that do not apply to our packet. - if data_set.len() > 2 { - debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); - return None; - } + let packet_proto = get_l4_proto(packet); + let data_set = data_set + .iter() + .filter(|d| d.applies_to(packet_proto)) + .collect::>(); - // If we have stateful NAT and port masquerading on the source side, given that we haven't found - // a valid NAT entry, stateful NAT should take precedence so the packet can come out. - if let Some(dst_data) = data_set - .iter() - .find(|d| d.src_nat_req == Some(NatRequirement::Stateful)) - && data_set.iter().any(|d| { - let Some(NatRequirement::PortForwarding(requirement_proto)) = d.src_nat_req else { + if data_set.is_empty() { + debug!( + "{nfi}: No NAT requirement found for flow {tuple} after filtering by protocol, dropping packet" + ); + return None; + } + + // Can we do something sensible from the NAT requirements? At the moment we allow prefix overlap + // only when port forwarding is used in conjunction with stateful NAT, so if we reach this case + // this is what we should have. + + // Note: if data_set.len() == 1 we can trivially figure out the destination VPC and NAT + // requirement. + if data_set.len() == 1 { + let dst_data = data_set.iter().next().unwrap_or_else(|| unreachable!()); + set_nat_requirements(packet, dst_data); + return Some(first_vpcd); + } + + if data_set.len() > 2 { + debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); + return None; + } + + // If we have stateful NAT and port masquerading on the source side, given that we haven't found + // a valid NAT entry, stateful NAT should take precedence so the packet can come out. + if let Some(dst_data) = data_set + .iter() + .find(|d| d.src_nat_req == Some(NatRequirement::Stateful)) + && data_set.iter().any(|d| { + let Some(NatRequirement::PortForwarding(requirement_proto)) = d.src_nat_req else { + return false; + }; + requirement_proto.intersection(&packet_proto).is_some() + }) + { + set_nat_requirements(packet, dst_data); + return Some(first_vpcd); + } + // If we have stateful NAT and port masquerading on the destination side, given that we haven't + // found a valid NAT entry, port forwarding should take precedence. + if let Some(dst_data) = data_set.iter().find(|d| { + let Some(NatRequirement::PortForwarding(req_proto)) = d.dst_nat_req else { return false; }; - requirement_proto.intersection(&packet_proto).is_some() - }) - { - set_nat_requirements(packet, dst_data); - return Some(first_vpcd); - } - // If we have stateful NAT and port masquerading on the destination side, given that we haven't - // found a valid NAT entry, port forwarding should take precedence. - if let Some(dst_data) = data_set.iter().find(|d| { - let Some(NatRequirement::PortForwarding(req_proto)) = d.dst_nat_req else { - return false; - }; - req_proto.intersection(&packet_proto).is_some() - }) && data_set - .iter() - .any(|d| d.dst_nat_req == Some(NatRequirement::Stateful)) - { - set_nat_requirements(packet, dst_data); - return Some(first_vpcd); - } + req_proto.intersection(&packet_proto).is_some() + }) && data_set + .iter() + .any(|d| d.dst_nat_req == Some(NatRequirement::Stateful)) + { + set_nat_requirements(packet, dst_data); + return Some(first_vpcd); + } - debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); - None + debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); + None + } } impl NetworkFunction for FlowFilter { From c83d74649a742e51742f53d323d583e805f30285 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:19:34 +0000 Subject: [PATCH 4/9] refactor(flow-filter): Move methods to FlowFilter's implementation Keep restructuring the code in flow-filter/src/lib.rs, by moving processing function into FlowFilter's implementation. Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 94 +++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 46 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 84adfd60a..59b892f1d 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -103,7 +103,7 @@ impl FlowFilter { None } Some(VpcdLookupResult::Single(dst_data)) => { - set_nat_requirements(packet, &dst_data); + Self::set_nat_requirements(packet, &dst_data); Some(dst_data.vpcd) } Some(VpcdLookupResult::MultipleMatches(data_set)) => { @@ -113,7 +113,7 @@ impl FlowFilter { match self.check_packet_flow_info(packet) { Ok(Some(dst_vpcd)) => { - if set_nat_requirements_from_flow_info(packet).is_ok() { + if Self::set_nat_requirements_from_flow_info(packet).is_ok() { Some(dst_vpcd) } else { debug!("{nfi}: Failed to set NAT requirements from flow info"); @@ -201,7 +201,7 @@ impl FlowFilter { debug!("{nfi}: Packet can bypass filter due to flow {flow_info}"); - if set_nat_requirements_from_flow_info(packet).is_err() { + if Self::set_nat_requirements_from_flow_info(packet).is_err() { debug!("{nfi}: Failed to set nat requirements"); return false; } @@ -301,7 +301,7 @@ impl FlowFilter { // requirement. if data_set.len() == 1 { let dst_data = data_set.iter().next().unwrap_or_else(|| unreachable!()); - set_nat_requirements(packet, dst_data); + Self::set_nat_requirements(packet, dst_data); return Some(first_vpcd); } @@ -322,7 +322,7 @@ impl FlowFilter { requirement_proto.intersection(&packet_proto).is_some() }) { - set_nat_requirements(packet, dst_data); + Self::set_nat_requirements(packet, dst_data); return Some(first_vpcd); } // If we have stateful NAT and port masquerading on the destination side, given that we haven't @@ -336,13 +336,55 @@ impl FlowFilter { .iter() .any(|d| d.dst_nat_req == Some(NatRequirement::Stateful)) { - set_nat_requirements(packet, dst_data); + Self::set_nat_requirements(packet, dst_data); return Some(first_vpcd); } debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); None } + + /// Set NAT requirements on the packet based on the remote data object. + fn set_nat_requirements(packet: &mut Packet, data: &RemoteData) { + if data.requires_stateful_nat() { + packet.meta_mut().set_stateful_nat(true); + } + if data.requires_stateless_nat() { + packet.meta_mut().set_stateless_nat(true); + } + if data.requires_port_forwarding(get_l4_proto(packet)) { + packet.meta_mut().set_port_forwarding(true); + } + } + + /// Set NAT requirements on the packet based on packet's flow-info, if any. + fn set_nat_requirements_from_flow_info( + packet: &mut Packet, + ) -> Result<(), ()> { + let locked_info = packet + .meta() + .flow_info + .as_ref() + .ok_or(())? + .locked + .read() + .map_err(|_| ())?; + let needs_stateful_nat = locked_info.nat_state.is_some(); + let needs_port_forwarding = locked_info.port_fw_state.is_some(); + drop(locked_info); + + match (needs_stateful_nat, needs_port_forwarding) { + (true, false) => { + packet.meta_mut().set_stateful_nat(true); + Ok(()) + } + (false, true) => { + packet.meta_mut().set_port_forwarding(true); + Ok(()) + } + _ => Err(()), + } + } } impl NetworkFunction for FlowFilter { @@ -427,46 +469,6 @@ impl Display for FlowFilter { } } -fn set_nat_requirements(packet: &mut Packet, data: &RemoteData) { - if data.requires_stateful_nat() { - packet.meta_mut().set_stateful_nat(true); - } - if data.requires_stateless_nat() { - packet.meta_mut().set_stateless_nat(true); - } - if data.requires_port_forwarding(get_l4_proto(packet)) { - packet.meta_mut().set_port_forwarding(true); - } -} - -fn set_nat_requirements_from_flow_info( - packet: &mut Packet, -) -> Result<(), ()> { - let locked_info = packet - .meta() - .flow_info - .as_ref() - .ok_or(())? - .locked - .read() - .map_err(|_| ())?; - let needs_stateful_nat = locked_info.nat_state.is_some(); - let needs_port_forwarding = locked_info.port_fw_state.is_some(); - drop(locked_info); - - match (needs_stateful_nat, needs_port_forwarding) { - (true, false) => { - packet.meta_mut().set_stateful_nat(true); - Ok(()) - } - (false, true) => { - packet.meta_mut().set_port_forwarding(true); - Ok(()) - } - _ => Err(()), - } -} - pub(crate) fn get_l4_proto(packet: &Packet) -> L4Protocol { match packet.try_transport() { Some(Transport::Tcp(_)) => L4Protocol::Tcp, From e91419245c4c11048bcd983b354db366262dd091 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:30:49 +0000 Subject: [PATCH 5/9] fix(flow-filter): Fix comments in deal_with_multiple_matches() Fix comments: there's no such thing as "port masquerading", it's supposed to be "port forwarding". Reported-by: Fredi Raspall Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 59b892f1d..3dc84dccb 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -310,8 +310,8 @@ impl FlowFilter { return None; } - // If we have stateful NAT and port masquerading on the source side, given that we haven't found - // a valid NAT entry, stateful NAT should take precedence so the packet can come out. + // If we have masquerading and port forwarding on the source side, given that we haven't + // found a valid NAT entry, stateful NAT should take precedence so the packet can come out. if let Some(dst_data) = data_set .iter() .find(|d| d.src_nat_req == Some(NatRequirement::Stateful)) @@ -325,8 +325,8 @@ impl FlowFilter { Self::set_nat_requirements(packet, dst_data); return Some(first_vpcd); } - // If we have stateful NAT and port masquerading on the destination side, given that we haven't - // found a valid NAT entry, port forwarding should take precedence. + // If we have masquerading and port forwarding on the destination side, given that we + // haven't found a valid NAT entry, port forwarding should take precedence. if let Some(dst_data) = data_set.iter().find(|d| { let Some(NatRequirement::PortForwarding(req_proto)) = d.dst_nat_req else { return false; From c766c6dbf87b19661fc7b8495258129137299b7f Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:12:31 +0000 Subject: [PATCH 6/9] fix(flow-filter): Do not drop packet on expired flow When looking for an existing flow to determine where to send a packet, in the case when the flow-filter lookup returned an ambiguous result, we would discard the packet on finding an expired flow. This is incorrect: if the flow is expired, then we cannot use it to determine the destination VPC, but this is not a reason for dropping the packet. Instead we need to carry on and try to determine the destination VPC based on other means, potentially going through stateful NAT as a follow-up step and recreating an up-to-date flow. Also move the status check closer to the flow availability check itself. No need to attempt to take the lock to retrieve the destination VPC if the flow has expired. Fixes: 2eb6274ac5af ("feat(flow-filter): Update logic for port forwarding + masquerade overlap") Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 3dc84dccb..9a7a4d00b 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -221,6 +221,12 @@ impl FlowFilter { return Ok(None); }; + let status = flow_info.status(); + if status != FlowStatus::Active { + debug!("{nfi}: Found flow-info but its status is {status}, cannot use it"); + return Ok(None); + } + let Ok(locked_info) = flow_info.locked.read() else { debug!("{nfi}: Warning! failed to lock flow-info for packet, dropping packet"); return Err(DoneReason::InternalFailure); @@ -236,14 +242,6 @@ impl FlowFilter { return Err(DoneReason::Unroutable); }; - let status = flow_info.status(); - if status != FlowStatus::Active { - debug!( - "{nfi}: Found flow-info with dst_vpcd {dst_vpcd} but status {status}, dropping packet" - ); - return Err(DoneReason::Unroutable); - } - debug!("{nfi}: dst_vpcd discriminant is {dst_vpcd} (from active flow-info entry)"); Ok(Some(*dst_vpcd)) } From f415621020cb5c5705a585fc4192a59f0762e192 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:27:39 +0000 Subject: [PATCH 7/9] refactor(flow-info,net): Move flow info lookup/checks to Packet In flow-filter's main logic, both functions check_packet_flow_info() and bypass_with_flow_info() check for the availability, and then for the status of flow information for the packet being processed. And we may yet add another similar occurrence of these checks in future work. Let's move the two checks to a dedicated method from struct Packet in an effort to remove duplicated code. Note that we do not use the new method to check the status again after looking for flow availability in set_nat_requirements_from_flow_info(), because we've just checked it at the two locations where we called this function. Checking it again in that function: - Would be useless, as we already check it. - Might introduce a bug in the logic if the flow expired between the two checks, because in that case we'd end up dropping the packet instead of acting as if there was no flow (in fact, if the first checked returned an active flow and the flow expired after the first check, it's still OK to keep going with this packet). Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 18 ++---------------- net/src/packet/mod.rs | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 9a7a4d00b..8082cb08d 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -17,7 +17,6 @@ use crate::tables::{NatRequirement, RemoteData, VpcdLookupResult}; use indenter::indented; use lpm::prefix::L4Protocol; use net::buffer::PacketBufferMut; -use net::flows::FlowStatus; use net::flows::flow_info_item::ExtractRef; use net::headers::{Transport, TryIp, TryTransport}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; @@ -175,8 +174,7 @@ impl FlowFilter { ) -> bool { let nfi = &self.name; - let Some(flow_info) = &packet.meta().flow_info else { - debug!("{nfi}: Packet does not contain any flow-info"); + let Some(flow_info) = packet.active_flow_info() else { return false; }; let flow_genid = flow_info.genid(); @@ -184,11 +182,6 @@ impl FlowFilter { debug!("{nfi}: Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); return false; } - let status = flow_info.status(); - if status != FlowStatus::Active { - debug!("{nfi}: Found flow-info but its status is {status}. Need to re-evaluate..."); - return false; - } let vpcd = flow_info .locked @@ -216,17 +209,10 @@ impl FlowFilter { ) -> Result, DoneReason> { let nfi = &self.name; - let Some(flow_info) = &packet.meta().flow_info else { - debug!("{nfi}: Packet does not contain any flow-info"); + let Some(flow_info) = packet.active_flow_info() else { return Ok(None); }; - let status = flow_info.status(); - if status != FlowStatus::Active { - debug!("{nfi}: Found flow-info but its status is {status}, cannot use it"); - return Ok(None); - } - let Ok(locked_info) = flow_info.locked.read() else { debug!("{nfi}: Warning! failed to lock flow-info for packet, dropping packet"); return Err(DoneReason::InternalFailure); diff --git a/net/src/packet/mod.rs b/net/src/packet/mod.rs index f1d30e08f..64e5cdab7 100644 --- a/net/src/packet/mod.rs +++ b/net/src/packet/mod.rs @@ -16,6 +16,7 @@ pub mod test_utils; use crate::buffer::{Headroom, PacketBufferMut, Prepend, Tailroom, TrimFromStart}; use crate::eth::Eth; use crate::eth::EthError; +use crate::flows::{FlowInfo, FlowStatus}; use crate::headers::{ AbstractEmbeddedHeaders, AbstractEmbeddedHeadersMut, AbstractHeaders, AbstractHeadersMut, Headers, Net, Transport, TryEmbeddedHeaders, TryEmbeddedHeadersMut, TryHeaders, TryHeadersMut, @@ -27,11 +28,13 @@ use crate::udp::{Udp, UdpChecksum}; use crate::checksum::Checksum; use crate::vxlan::{Vxlan, VxlanEncap}; +use concurrency::sync::Arc; #[allow(unused_imports)] // re-export pub use hash::*; #[allow(unused_imports)] // re-export pub use meta::*; use std::num::NonZero; +use tracing::debug; pub mod utils; @@ -112,6 +115,20 @@ impl Packet { self.payload_len() + self.header_len().get() } + /// Return the active flow-info for the packet, if any. + pub fn active_flow_info(&self) -> Option<&Arc> { + let Some(flow_info) = &self.meta().flow_info else { + debug!("Packet does not contain any flow-info"); + return None; + }; + let status = flow_info.status(); + if status != FlowStatus::Active { + debug!("Found flow-info but its status is {status}, cannot use it"); + return None; + } + Some(flow_info) + } + #[inline] fn underlay_qos_from_outer_headers(headers: &Headers) -> (Option, Option) { match &headers.net { From b85525cfcd6583c241d884ac484fb560af79a1a2 Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Thu, 19 Mar 2026 11:42:15 +0000 Subject: [PATCH 8/9] refactor(flow-filter): Move dst_vpcd extraction to helper function In an effort to de-duplicate some of the code in the flow-filter processing, move the code to extract and cast the destination VPC discriminant from packet's flow information to a dedicated function that can be called in both check_packet_flow_info() and bypass_with_flow_info(). Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 8082cb08d..5c6d2bd25 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -17,6 +17,7 @@ use crate::tables::{NatRequirement, RemoteData, VpcdLookupResult}; use indenter::indented; use lpm::prefix::L4Protocol; use net::buffer::PacketBufferMut; +use net::flows::FlowInfo; use net::flows::flow_info_item::ExtractRef; use net::headers::{Transport, TryIp, TryTransport}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; @@ -183,14 +184,7 @@ impl FlowFilter { return false; } - let vpcd = flow_info - .locked - .read() - .unwrap() - .dst_vpcd - .as_ref() - .and_then(|d| d.extract_ref::()) - .copied(); + let vpcd = Self::dst_vpcd_from_flow_info(flow_info); debug!("{nfi}: Packet can bypass filter due to flow {flow_info}"); @@ -213,15 +207,7 @@ impl FlowFilter { return Ok(None); }; - let Ok(locked_info) = flow_info.locked.read() else { - debug!("{nfi}: Warning! failed to lock flow-info for packet, dropping packet"); - return Err(DoneReason::InternalFailure); - }; - - let vpcd = locked_info - .dst_vpcd - .as_ref() - .and_then(|d| d.extract_ref::()); + let vpcd = Self::dst_vpcd_from_flow_info(flow_info); let Some(dst_vpcd) = vpcd else { debug!("{nfi}: No VPC discriminant found, dropping packet"); @@ -229,7 +215,7 @@ impl FlowFilter { }; debug!("{nfi}: dst_vpcd discriminant is {dst_vpcd} (from active flow-info entry)"); - Ok(Some(*dst_vpcd)) + Ok(Some(dst_vpcd)) } /// Handle destination VPC retrieval and NAT requirements setting when multiple matches were @@ -369,6 +355,19 @@ impl FlowFilter { _ => Err(()), } } + + /// Extract the destination VPC discriminant from a flow-info entry. + /// Panic if the lock has been poisoned. + fn dst_vpcd_from_flow_info(flow_info: &Arc) -> Option { + flow_info + .locked + .read() + .unwrap() + .dst_vpcd + .as_ref() + .and_then(|d| d.extract_ref::()) + .copied() + } } impl NetworkFunction for FlowFilter { From 9c364e51a35e08c7bb79eace1f22c4d864fef69f Mon Sep 17 00:00:00 2001 From: Quentin Monnet Date: Wed, 18 Mar 2026 17:22:32 +0000 Subject: [PATCH 9/9] chore(flow-filter): Remove unused Display implementation for FlowFilter We already implement Display for FlowFilterTable, we don't need it for the network function object itself. Signed-off-by: Quentin Monnet --- flow-filter/src/lib.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 5c6d2bd25..5a8b712ed 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -14,7 +14,6 @@ //! peerings, get dropped. use crate::tables::{NatRequirement, RemoteData, VpcdLookupResult}; -use indenter::indented; use lpm::prefix::L4Protocol; use net::buffer::PacketBufferMut; use net::flows::FlowInfo; @@ -23,7 +22,7 @@ use net::headers::{Transport, TryIp, TryTransport}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; use pipeline::{NetworkFunction, PipelineData}; use std::collections::HashSet; -use std::fmt::{Display, Write}; +use std::fmt::Display; use std::net::IpAddr; use std::num::NonZero; use std::sync::Arc; @@ -441,17 +440,6 @@ impl Display for FlowTuple { } } -impl Display for FlowFilter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "{}:", self.name)?; - if let Some(table) = self.tablesr.enter() { - write!(indented(f).with_str(" "), "{}", *table) - } else { - writeln!(f, " [no table]") - } - } -} - pub(crate) fn get_l4_proto(packet: &Packet) -> L4Protocol { match packet.try_transport() { Some(Transport::Tcp(_)) => L4Protocol::Tcp,