diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 3a941928b..5a8b712ed 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -14,16 +14,15 @@ //! peerings, get dropped. use crate::tables::{NatRequirement, RemoteData, VpcdLookupResult}; -use indenter::indented; use lpm::prefix::L4Protocol; use net::buffer::PacketBufferMut; -use net::flows::FlowStatus; +use net::flows::FlowInfo; use net::flows::flow_info_item::ExtractRef; use net::headers::{Transport, TryIp, TryTransport}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; use pipeline::{NetworkFunction, PipelineData}; use std::collections::HashSet; -use std::fmt::{Display, Write}; +use std::fmt::Display; use std::net::IpAddr; use std::num::NonZero; use std::sync::Arc; @@ -60,84 +59,6 @@ impl FlowFilter { } } - /// Attempt to determine destination vpc from packet's flow-info - fn check_packet_flow_info( - &self, - packet: &mut Packet, - ) -> Result, DoneReason> { - let nfi = &self.name; - - let Some(flow_info) = &packet.meta().flow_info else { - debug!("{nfi}: Packet does not contain any flow-info"); - return Ok(None); - }; - - let Ok(locked_info) = flow_info.locked.read() else { - debug!("{nfi}: Warning! failed to lock flow-info for packet, dropping packet"); - return Err(DoneReason::InternalFailure); - }; - - let vpcd = locked_info - .dst_vpcd - .as_ref() - .and_then(|d| d.extract_ref::()); - - let Some(dst_vpcd) = vpcd else { - debug!("{nfi}: No VPC discriminant found, dropping packet"); - return Err(DoneReason::Unroutable); - }; - - let status = flow_info.status(); - if status != FlowStatus::Active { - debug!( - "{nfi}: Found flow-info with dst_vpcd {dst_vpcd} but status {status}, dropping packet" - ); - return Err(DoneReason::Unroutable); - } - - debug!("{nfi}: dst_vpcd discriminant is {dst_vpcd} (from active flow-info entry)"); - Ok(Some(*dst_vpcd)) - } - - fn bypass_with_flow_info( - &self, - packet: &mut Packet, - genid: i64, - ) -> bool { - let Some(flow_info) = &packet.meta().flow_info else { - debug!("Packet does not contain any flow-info"); - return false; - }; - let flow_genid = flow_info.genid(); - if flow_genid < genid { - debug!("Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); - return false; - } - let status = flow_info.status(); - if status != FlowStatus::Active { - debug!("Found flow-info but its status is {status}. Need to re-evaluate..."); - return false; - } - - let vpcd = flow_info - .locked - .read() - .unwrap() - .dst_vpcd - .as_ref() - .and_then(|d| d.extract_ref::()) - .copied(); - - debug!("Packet can bypass filter due to flow {flow_info}"); - - if set_nat_requirements_from_flow_info(packet).is_err() { - debug!("Failed to set nat requirements"); - return false; - } - packet.meta_mut().dst_vpcd = vpcd; - true - } - /// Process a packet. fn process_packet( &self, @@ -181,7 +102,7 @@ impl FlowFilter { None } Some(VpcdLookupResult::Single(dst_data)) => { - set_nat_requirements(packet, &dst_data); + Self::set_nat_requirements(packet, &dst_data); Some(dst_data.vpcd) } Some(VpcdLookupResult::MultipleMatches(data_set)) => { @@ -191,7 +112,7 @@ impl FlowFilter { match self.check_packet_flow_info(packet) { Ok(Some(dst_vpcd)) => { - if set_nat_requirements_from_flow_info(packet).is_ok() { + if Self::set_nat_requirements_from_flow_info(packet).is_ok() { Some(dst_vpcd) } else { debug!("{nfi}: Failed to set NAT requirements from flow info"); @@ -202,7 +123,7 @@ impl FlowFilter { debug!( "{nfi}: No flow table entry found for flow {tuple}, trying to figure out destination VPC anyway" ); - deal_with_multiple_matches(packet, &data_set, nfi, &tuple) + self.deal_with_multiple_matches(packet, &data_set, &tuple) } Err(reason) => { debug!("Will drop packet. Reason: {reason}"); @@ -244,98 +165,208 @@ impl FlowFilter { debug!("{nfi}: Flow {tuple} is allowed, setting packet dst_vpcd to {dst_vpcd}"); packet.meta_mut().dst_vpcd = Some(dst_vpcd); } -} -fn deal_with_multiple_matches( - packet: &mut Packet, - data_set: &HashSet, - nfi: &str, - tuple: &FlowTuple, -) -> Option { - // We should always have at least one matching RemoteData object applying to our packet. - debug_assert!( - !data_set.is_empty(), - "{nfi}: No matching RemoteData objects left for flow {tuple}" - ); - - // Do all matches have the same destination VPC? - let Some(first_vpcd) = data_set.iter().next().map(|d| d.vpcd) else { - debug!("{nfi}: Missing destination VPC information for flow {tuple}, dropping packet"); - return None; - }; - if data_set.iter().any(|d| d.vpcd != first_vpcd) { - debug!( - "{nfi}: Unable to decide what destination VPC to use for flow {tuple}, dropping packet" - ); - return None; - }; - - // data_set may actually contain RemoteData objects that do not apply to our packet, because the - // table lookup does not account for TCP vs. UDP, we only deal with the protocol when looking at - // NAT requirements. Here we filter out RemoteData objects that do not apply to our packet. - - let packet_proto = get_l4_proto(packet); - let data_set = data_set - .iter() - .filter(|d| d.applies_to(packet_proto)) - .collect::>(); - - if data_set.is_empty() { - debug!( - "{nfi}: No NAT requirement found for flow {tuple} after filtering by protocol, dropping packet" - ); - return None; - } + /// Check if flow-info is up-to-date and allows bypassing the main filtering logic. + fn bypass_with_flow_info( + &self, + packet: &mut Packet, + genid: i64, + ) -> bool { + let nfi = &self.name; + + let Some(flow_info) = packet.active_flow_info() else { + return false; + }; + let flow_genid = flow_info.genid(); + if flow_genid < genid { + debug!("{nfi}: Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); + return false; + } + + let vpcd = Self::dst_vpcd_from_flow_info(flow_info); - // Can we do something sensible from the NAT requirements? At the moment we allow prefix overlap - // only when port forwarding is used in conjunction with stateful NAT, so if we reach this case - // this is what we should have. + debug!("{nfi}: Packet can bypass filter due to flow {flow_info}"); - // Note: if data_set.len() == 1 we can trivially figure out the destination VPC and NAT - // requirement. - if data_set.len() == 1 { - let dst_data = data_set.iter().next().unwrap_or_else(|| unreachable!()); - set_nat_requirements(packet, dst_data); - return Some(first_vpcd); + if Self::set_nat_requirements_from_flow_info(packet).is_err() { + debug!("{nfi}: Failed to set nat requirements"); + return false; + } + packet.meta_mut().dst_vpcd = vpcd; + true } - if data_set.len() > 2 { - debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); - return None; + /// Attempt to determine destination VPC from packet's flow-info. + fn check_packet_flow_info( + &self, + packet: &mut Packet, + ) -> Result, DoneReason> { + let nfi = &self.name; + + let Some(flow_info) = packet.active_flow_info() else { + return Ok(None); + }; + + let vpcd = Self::dst_vpcd_from_flow_info(flow_info); + + let Some(dst_vpcd) = vpcd else { + debug!("{nfi}: No VPC discriminant found, dropping packet"); + return Err(DoneReason::Unroutable); + }; + + debug!("{nfi}: dst_vpcd discriminant is {dst_vpcd} (from active flow-info entry)"); + Ok(Some(dst_vpcd)) } - // If we have stateful NAT and port masquerading on the source side, given that we haven't found - // a valid NAT entry, stateful NAT should take precedence so the packet can come out. - if let Some(dst_data) = data_set - .iter() - .find(|d| d.src_nat_req == Some(NatRequirement::Stateful)) - && data_set.iter().any(|d| { - let Some(NatRequirement::PortForwarding(requirement_proto)) = d.src_nat_req else { + /// Handle destination VPC retrieval and NAT requirements setting when multiple matches were + /// found, with no accompanying flow-info for the packet. + fn deal_with_multiple_matches( + &self, + packet: &mut Packet, + data_set: &HashSet, + tuple: &FlowTuple, + ) -> Option { + let nfi = &self.name; + + // We should always have at least one matching RemoteData object applying to our packet. + debug_assert!( + !data_set.is_empty(), + "{nfi}: No matching RemoteData objects left for flow {tuple}" + ); + + // Do all matches have the same destination VPC? + let Some(first_vpcd) = data_set.iter().next().map(|d| d.vpcd) else { + debug!("{nfi}: Missing destination VPC information for flow {tuple}, dropping packet"); + return None; + }; + if data_set.iter().any(|d| d.vpcd != first_vpcd) { + debug!( + "{nfi}: Unable to decide what destination VPC to use for flow {tuple}, dropping packet" + ); + return None; + }; + + // data_set may actually contain RemoteData objects that do not apply to our packet, because the + // table lookup does not account for TCP vs. UDP, we only deal with the protocol when looking at + // NAT requirements. Here we filter out RemoteData objects that do not apply to our packet. + + let packet_proto = get_l4_proto(packet); + let data_set = data_set + .iter() + .filter(|d| d.applies_to(packet_proto)) + .collect::>(); + + if data_set.is_empty() { + debug!( + "{nfi}: No NAT requirement found for flow {tuple} after filtering by protocol, dropping packet" + ); + return None; + } + + // Can we do something sensible from the NAT requirements? At the moment we allow prefix overlap + // only when port forwarding is used in conjunction with stateful NAT, so if we reach this case + // this is what we should have. + + // Note: if data_set.len() == 1 we can trivially figure out the destination VPC and NAT + // requirement. + if data_set.len() == 1 { + let dst_data = data_set.iter().next().unwrap_or_else(|| unreachable!()); + Self::set_nat_requirements(packet, dst_data); + return Some(first_vpcd); + } + + if data_set.len() > 2 { + debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); + return None; + } + + // If we have masquerading and port forwarding on the source side, given that we haven't + // found a valid NAT entry, stateful NAT should take precedence so the packet can come out. + if let Some(dst_data) = data_set + .iter() + .find(|d| d.src_nat_req == Some(NatRequirement::Stateful)) + && data_set.iter().any(|d| { + let Some(NatRequirement::PortForwarding(requirement_proto)) = d.src_nat_req else { + return false; + }; + requirement_proto.intersection(&packet_proto).is_some() + }) + { + Self::set_nat_requirements(packet, dst_data); + return Some(first_vpcd); + } + // If we have masquerading and port forwarding on the destination side, given that we + // haven't found a valid NAT entry, port forwarding should take precedence. + if let Some(dst_data) = data_set.iter().find(|d| { + let Some(NatRequirement::PortForwarding(req_proto)) = d.dst_nat_req else { return false; }; - requirement_proto.intersection(&packet_proto).is_some() - }) - { - set_nat_requirements(packet, dst_data); - return Some(first_vpcd); + req_proto.intersection(&packet_proto).is_some() + }) && data_set + .iter() + .any(|d| d.dst_nat_req == Some(NatRequirement::Stateful)) + { + Self::set_nat_requirements(packet, dst_data); + return Some(first_vpcd); + } + + debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); + None } - // If we have stateful NAT and port masquerading on the destination side, given that we haven't - // found a valid NAT entry, port forwarding should take precedence. - if let Some(dst_data) = data_set.iter().find(|d| { - let Some(NatRequirement::PortForwarding(req_proto)) = d.dst_nat_req else { - return false; - }; - req_proto.intersection(&packet_proto).is_some() - }) && data_set - .iter() - .any(|d| d.dst_nat_req == Some(NatRequirement::Stateful)) - { - set_nat_requirements(packet, dst_data); - return Some(first_vpcd); + + /// Set NAT requirements on the packet based on the remote data object. + fn set_nat_requirements(packet: &mut Packet, data: &RemoteData) { + if data.requires_stateful_nat() { + packet.meta_mut().set_stateful_nat(true); + } + if data.requires_stateless_nat() { + packet.meta_mut().set_stateless_nat(true); + } + if data.requires_port_forwarding(get_l4_proto(packet)) { + packet.meta_mut().set_port_forwarding(true); + } + } + + /// Set NAT requirements on the packet based on packet's flow-info, if any. + fn set_nat_requirements_from_flow_info( + packet: &mut Packet, + ) -> Result<(), ()> { + let locked_info = packet + .meta() + .flow_info + .as_ref() + .ok_or(())? + .locked + .read() + .map_err(|_| ())?; + let needs_stateful_nat = locked_info.nat_state.is_some(); + let needs_port_forwarding = locked_info.port_fw_state.is_some(); + drop(locked_info); + + match (needs_stateful_nat, needs_port_forwarding) { + (true, false) => { + packet.meta_mut().set_stateful_nat(true); + Ok(()) + } + (false, true) => { + packet.meta_mut().set_port_forwarding(true); + Ok(()) + } + _ => Err(()), + } } - debug!("{nfi}: Unsupported NAT requirements for flow {tuple}"); - None + /// Extract the destination VPC discriminant from a flow-info entry. + /// Panic if the lock has been poisoned. + fn dst_vpcd_from_flow_info(flow_info: &Arc) -> Option { + flow_info + .locked + .read() + .unwrap() + .dst_vpcd + .as_ref() + .and_then(|d| d.extract_ref::()) + .copied() + } } impl NetworkFunction for FlowFilter { @@ -409,57 +440,6 @@ impl Display for FlowTuple { } } -impl Display for FlowFilter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "{}:", self.name)?; - if let Some(table) = self.tablesr.enter() { - write!(indented(f).with_str(" "), "{}", *table) - } else { - writeln!(f, " [no table]") - } - } -} - -fn set_nat_requirements(packet: &mut Packet, data: &RemoteData) { - if data.requires_stateful_nat() { - packet.meta_mut().set_stateful_nat(true); - } - if data.requires_stateless_nat() { - packet.meta_mut().set_stateless_nat(true); - } - if data.requires_port_forwarding(get_l4_proto(packet)) { - packet.meta_mut().set_port_forwarding(true); - } -} - -fn set_nat_requirements_from_flow_info( - packet: &mut Packet, -) -> Result<(), ()> { - let locked_info = packet - .meta() - .flow_info - .as_ref() - .ok_or(())? - .locked - .read() - .map_err(|_| ())?; - let needs_stateful_nat = locked_info.nat_state.is_some(); - let needs_port_forwarding = locked_info.port_fw_state.is_some(); - drop(locked_info); - - match (needs_stateful_nat, needs_port_forwarding) { - (true, false) => { - packet.meta_mut().set_stateful_nat(true); - Ok(()) - } - (false, true) => { - packet.meta_mut().set_port_forwarding(true); - Ok(()) - } - _ => Err(()), - } -} - pub(crate) fn get_l4_proto(packet: &Packet) -> L4Protocol { match packet.try_transport() { Some(Transport::Tcp(_)) => L4Protocol::Tcp, diff --git a/net/src/packet/mod.rs b/net/src/packet/mod.rs index f1d30e08f..64e5cdab7 100644 --- a/net/src/packet/mod.rs +++ b/net/src/packet/mod.rs @@ -16,6 +16,7 @@ pub mod test_utils; use crate::buffer::{Headroom, PacketBufferMut, Prepend, Tailroom, TrimFromStart}; use crate::eth::Eth; use crate::eth::EthError; +use crate::flows::{FlowInfo, FlowStatus}; use crate::headers::{ AbstractEmbeddedHeaders, AbstractEmbeddedHeadersMut, AbstractHeaders, AbstractHeadersMut, Headers, Net, Transport, TryEmbeddedHeaders, TryEmbeddedHeadersMut, TryHeaders, TryHeadersMut, @@ -27,11 +28,13 @@ use crate::udp::{Udp, UdpChecksum}; use crate::checksum::Checksum; use crate::vxlan::{Vxlan, VxlanEncap}; +use concurrency::sync::Arc; #[allow(unused_imports)] // re-export pub use hash::*; #[allow(unused_imports)] // re-export pub use meta::*; use std::num::NonZero; +use tracing::debug; pub mod utils; @@ -112,6 +115,20 @@ impl Packet { self.payload_len() + self.header_len().get() } + /// Return the active flow-info for the packet, if any. + pub fn active_flow_info(&self) -> Option<&Arc> { + let Some(flow_info) = &self.meta().flow_info else { + debug!("Packet does not contain any flow-info"); + return None; + }; + let status = flow_info.status(); + if status != FlowStatus::Active { + debug!("Found flow-info but its status is {status}, cannot use it"); + return None; + } + Some(flow_info) + } + #[inline] fn underlay_qos_from_outer_headers(headers: &Headers) -> (Option, Option) { match &headers.net {