From c7cc8c09a7ff6f5433754755e96848eebbc7410d Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Fri, 13 Mar 2026 21:39:26 +0100 Subject: [PATCH 01/11] feat(pipeline): add PipelineData and trait We need an easy way for NFs that belong to a pipeline to access some common data (e.g. the generation Id of the current configuration applied) in an easy way, without needing to propagate or change APIs each time. This commit: - defines a PipelineData struct with an atomic u64 for the generation id. A struct is defined so that other fields can be added later. - extends trait NetWorkFunction so that the stages that require so can have an Arc to the PipelineData when added to the pipeline. - Modifies the method to add stages to the pipeline so that those NFs that implement the new method get access to the shared PipelineData. Signed-off-by: Fredi Raspall --- pipeline/src/lib.rs | 2 +- pipeline/src/pipeline.rs | 45 ++++++++++++++++++++++++++++++++++++++- pipeline/src/static_nf.rs | 6 ++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/pipeline/src/lib.rs b/pipeline/src/lib.rs index e8cf1914f..25f19328b 100644 --- a/pipeline/src/lib.rs +++ b/pipeline/src/lib.rs @@ -116,7 +116,7 @@ pub(crate) mod test_utils; #[allow(unused)] pub use dyn_nf::{DynNetworkFunction, nf_dyn}; #[allow(unused)] -pub use pipeline::{DynPipeline, StageId}; +pub use pipeline::{DynPipeline, PipelineData, StageId}; #[allow(unused)] pub use static_nf::{NetworkFunction, StaticChain}; diff --git a/pipeline/src/pipeline.rs b/pipeline/src/pipeline.rs index 6227ef687..8c23083d3 100644 --- a/pipeline/src/pipeline.rs +++ b/pipeline/src/pipeline.rs @@ -11,10 +11,37 @@ use net::buffer::PacketBufferMut; use net::packet::Packet; use ordermap::OrderMap; use std::any::Any; +use std::sync::Arc; +use std::sync::atomic::AtomicI64; /// A type that represents an Id for a stage or NF pub type StageId = Id>>; +/// Data associated to a `Pipeline` +#[derive(Default, Debug)] +pub struct PipelineData { + /// Current generation Id + pub genid: AtomicI64, +} +impl PipelineData { + #[must_use] + /// Build a new `PipelineData` object + pub fn new(genid: i64) -> Self { + Self { + genid: AtomicI64::new(genid), + } + } + /// Read the generation id + pub fn genid(&self) -> i64 { + self.genid.load(std::sync::atomic::Ordering::Relaxed) + } + /// Set the generation id + pub fn set_genid(&self, genid: i64) { + self.genid + .store(genid, std::sync::atomic::Ordering::Relaxed); + } +} + /// A dynamic pipeline that can be updated at runtime. /// /// This struct is used to create a dynamic pipeline that can be updated at runtime. @@ -25,6 +52,7 @@ pub type StageId = Id>>; #[derive(Default)] pub struct DynPipeline { nfs: OrderMap, Box>>, + data: Arc, } #[derive(Debug, thiserror::Error)] @@ -39,15 +67,30 @@ impl DynPipeline { pub fn new() -> Self { Self { nfs: OrderMap::new(), + data: Arc::from(PipelineData::default()), } } + #[must_use] + /// Set `PipelineData` to a `DynPipeline` + pub fn set_data(mut self, data: Arc) -> Self { + self.data = data; + self + } + + #[must_use] + /// Get `PipelineData` from a pipeline + pub fn get_data(&self) -> Arc { + self.data.clone() + } + /// Add a static network function to the pipeline. /// /// This method takes a [`NetworkFunction`] and adds it to the pipeline. /// #[must_use] - pub fn add_stage + 'static>(self, nf: NF) -> Self { + pub fn add_stage + 'static>(self, mut nf: NF) -> Self { + nf.set_data(self.data.clone()); self.add_stage_dyn(nf_dyn(nf)) } diff --git a/pipeline/src/static_nf.rs b/pipeline/src/static_nf.rs index e37221f28..9a4bb8d5d 100644 --- a/pipeline/src/static_nf.rs +++ b/pipeline/src/static_nf.rs @@ -4,6 +4,9 @@ use net::buffer::PacketBufferMut; use net::packet::Packet; use std::marker::PhantomData; +use std::sync::Arc; + +use crate::PipelineData; /// Trait for an object that processes a stream of packets. pub trait NetworkFunction { @@ -23,6 +26,9 @@ pub trait NetworkFunction { &'a mut self, input: Input, ) -> impl Iterator> + 'a; + + /// Let NFs access some `PipelineData` if they wish on their creation + fn set_data(&mut self, _data: Arc) {} } struct StaticChainImpl, NF2: NetworkFunction> { From 9d3a20308837914dba71f80074f91edd9ed9ff57 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Fri, 13 Mar 2026 23:26:26 +0100 Subject: [PATCH 02/11] feat(nat,flow-filter): impl set_data() for NAT and flow-filter Extend the masquerade, port-forwarding and flow-filter NFs with an Arc<> to pipeline data and implement the trait method set_data(). Signed-off-by: Fredi Raspall --- flow-filter/src/lib.rs | 9 ++++++++- nat/src/portfw/nf.rs | 8 +++++++- nat/src/stateful/mod.rs | 8 +++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 4dca8167c..9fe599801 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -21,11 +21,12 @@ use net::flows::FlowStatus; use net::flows::flow_info_item::ExtractRef; use net::headers::{Transport, TryIp, TryTransport}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; -use pipeline::NetworkFunction; +use pipeline::{NetworkFunction, PipelineData}; use std::collections::HashSet; use std::fmt::{Display, Write}; use std::net::IpAddr; use std::num::NonZero; +use std::sync::Arc; use tracing::{debug, error}; mod filter_rw; @@ -45,6 +46,7 @@ trace_target!("flow-filter", LevelFilter::INFO, &["pipeline"]); pub struct FlowFilter { name: String, tablesr: FlowFilterTableReader, + pipeline_data: Arc, } impl FlowFilter { @@ -53,6 +55,7 @@ impl FlowFilter { Self { name: name.to_string(), tablesr, + pipeline_data: Arc::from(PipelineData::default()), } } @@ -297,6 +300,10 @@ impl NetworkFunction for FlowFilter { packet.enforce() }) } + + fn set_data(&mut self, data: Arc) { + self.pipeline_data = data; + } } // Only used for Display diff --git a/nat/src/portfw/nf.rs b/nat/src/portfw/nf.rs index 90505d5ba..78fcffa5a 100644 --- a/nat/src/portfw/nf.rs +++ b/nat/src/portfw/nf.rs @@ -11,7 +11,7 @@ use net::flows::{ExtractMut, ExtractRef, FlowInfo}; use net::headers::{TryIp, TryTcp, TryTransport}; use net::ip::{NextHeader, UnicastIpAddr}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; -use pipeline::NetworkFunction; +use pipeline::{NetworkFunction, PipelineData}; use std::num::NonZero; use std::sync::Arc; use std::time::Instant; @@ -36,6 +36,7 @@ pub struct PortForwarder { name: String, flow_table: Arc, fwtable: PortFwTableReader, + pipeline_data: Arc, } impl PortForwarder { @@ -46,6 +47,7 @@ impl PortForwarder { name: name.to_string(), flow_table, fwtable, + pipeline_data: Arc::from(PipelineData::default()), } } @@ -369,4 +371,8 @@ impl NetworkFunction for PortForwarder { packet.enforce() }) } + + fn set_data(&mut self, data: Arc) { + self.pipeline_data = data; + } } diff --git a/nat/src/stateful/mod.rs b/nat/src/stateful/mod.rs index 27d575945..b0124ab13 100644 --- a/nat/src/stateful/mod.rs +++ b/nat/src/stateful/mod.rs @@ -23,7 +23,7 @@ use net::flows::{ExtractRef, FlowInfo}; use net::headers::{Net, Transport, TryIp, TryIpMut, TryTransportMut}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; use net::{FlowKey, FlowKeyData, IpProtoKey}; -use pipeline::NetworkFunction; +use pipeline::{NetworkFunction, PipelineData}; use std::fmt::{Debug, Display}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::time::{Duration, Instant}; @@ -84,6 +84,7 @@ pub struct StatefulNat { name: String, sessions: Arc, allocator: NatAllocatorReader, + pipeline_data: Arc, } impl StatefulNat { @@ -94,6 +95,7 @@ impl StatefulNat { name: name.to_string(), sessions, allocator, + pipeline_data: Arc::from(PipelineData::default()), } } @@ -579,6 +581,10 @@ impl NetworkFunction for StatefulNat { packet.enforce() }) } + + fn set_data(&mut self, data: Arc) { + self.pipeline_data = data; + } } #[cfg(test)] From 83ddfeff1144d0441a2f9e9df24c9817d7705e3b Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Sat, 14 Mar 2026 11:58:16 +0100 Subject: [PATCH 03/11] feat(dataplane): build pipeline with pipeline data Signed-off-by: Fredi Raspall --- dataplane/src/packet_processor/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dataplane/src/packet_processor/mod.rs b/dataplane/src/packet_processor/mod.rs index 2e0fdba0e..ed943e507 100644 --- a/dataplane/src/packet_processor/mod.rs +++ b/dataplane/src/packet_processor/mod.rs @@ -21,8 +21,8 @@ use nat::stateless::NatTablesWriter; use nat::{IcmpErrorHandler, StatefulNat, StatelessNat}; use net::buffer::PacketBufferMut; -use pipeline::DynPipeline; use pipeline::sample_nfs::PacketDumper; +use pipeline::{DynPipeline, PipelineData}; use routing::{Router, RouterError, RouterParams}; @@ -73,8 +73,10 @@ pub(crate) fn start_router( let nattabler_factory = nattablesw.get_reader_factory(); let natallocator_factory = natallocatorw.get_reader_factory(); let portfw_factory = portfw_w.reader().factory(); + let pdata = Arc::from(PipelineData::new(0)); let pipeline_builder = move || { + let pdata_clone = pdata.clone(); // Build network functions let stage_ingress = Ingress::new("Ingress", iftr_factory.handle()); let stage_egress = Egress::new("Egress", iftr_factory.handle(), atabler_factory.handle()); @@ -101,6 +103,7 @@ pub(crate) fn start_router( // Build the pipeline for a router. The composition of the pipeline (in stages) is currently // hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last DynPipeline::new() + .set_data(pdata_clone) .add_stage(stage_ingress) .add_stage(iprouter1) .add_stage(flow_lookup) From a2fee254147238e383698ef58028f6d9833aeb62 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Sat, 14 Mar 2026 12:17:22 +0100 Subject: [PATCH 04/11] feat(mgmnt,dataplane): pass pipeline data to mgmt Pass pipeline data to mgmt and update its generation id when a new config is applied. Signed-off-by: Fredi Raspall --- dataplane/src/main.rs | 1 + mgmt/Cargo.toml | 1 + mgmt/src/processor/proc.rs | 7 +++++++ mgmt/src/tests/mgmt.rs | 5 +++++ 4 files changed, 14 insertions(+) diff --git a/dataplane/src/main.rs b/dataplane/src/main.rs index 795775445..08820b907 100644 --- a/dataplane/src/main.rs +++ b/dataplane/src/main.rs @@ -208,6 +208,7 @@ fn main() { hostname: gwname.clone(), processor_params: ConfigProcessorParams { router_ctl: setup.router.get_ctl_tx(), + pipeline_data: pipeline_factory().get_data(), vpcmapw: setup.vpcmapw, nattablesw: setup.nattablesw, natallocatorw: setup.natallocatorw, diff --git a/mgmt/Cargo.toml b/mgmt/Cargo.toml index 6acefda0d..f39f381c6 100644 --- a/mgmt/Cargo.toml +++ b/mgmt/Cargo.toml @@ -28,6 +28,7 @@ k8s-less = { workspace = true } lpm = { workspace = true } nat = { workspace = true } net = { workspace = true } +pipeline = { workspace = true } rekon = { workspace = true } routing = { workspace = true } stats = { workspace = true } diff --git a/mgmt/src/processor/proc.rs b/mgmt/src/processor/proc.rs index 3bdbd4dfd..b66f02c36 100644 --- a/mgmt/src/processor/proc.rs +++ b/mgmt/src/processor/proc.rs @@ -26,6 +26,7 @@ use nat::portfw::build_port_forwarding_configuration; use nat::stateful::NatAllocatorWriter; use nat::stateless::NatTablesWriter; use nat::stateless::setup::build_nat_configuration; +use pipeline::PipelineData; use crate::processor::display::ConfigHistory; use crate::processor::gwconfigdb::GwConfigDatabase; @@ -77,6 +78,9 @@ pub struct ConfigProcessorParams { // channel to router pub router_ctl: RouterCtlSender, + // access data associated to pipeline + pub pipeline_data: Arc, + // writer for vpc mapping table pub vpcmapw: VpcMapWriter, @@ -623,6 +627,9 @@ impl ConfigProcessor { /* apply config in router */ apply_router_config(&kernel_vrfs, config, router_ctl).await?; + /* update the pipeline generation id, iff config was applied */ + self.proc_params.pipeline_data.set_genid(genid); + info!("Successfully applied config for genid {genid}"); Ok(()) } diff --git a/mgmt/src/tests/mgmt.rs b/mgmt/src/tests/mgmt.rs index f0741fc30..7b774baf0 100644 --- a/mgmt/src/tests/mgmt.rs +++ b/mgmt/src/tests/mgmt.rs @@ -17,6 +17,7 @@ pub mod test { use nat::stateless::NatTablesWriter; use net::eth::mac::Mac; use net::interface::Mtu; + use pipeline::PipelineData; use std::net::IpAddr; use std::net::Ipv4Addr; use std::str::FromStr; @@ -465,9 +466,13 @@ pub mod test { /* create VPC stats store (Arc) */ let vpc_stats_store = VpcStatsStore::new(); + /* pipeline data */ + let pipeline_data = Arc::from(PipelineData::default()); + /* build configuration of mgmt config processor */ let processor_config = ConfigProcessorParams { router_ctl, + pipeline_data, vpcmapw, nattablesw, natallocatorw, From dde454bf0df0d1fafae008cd1a910af83fdd3ab5 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Sat, 14 Mar 2026 11:59:22 +0100 Subject: [PATCH 05/11] feat(net): allow flow info to have a generation id Augment the flow_info object with a generation id. The value represents the generation id of the configuration under which the flow was created. Subsequent configuration changes will update the value to the current generation id if the flow is still allowed under that configuration. Signed-off-by: Fredi Raspall --- net/src/flows/flow_info.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/net/src/flows/flow_info.rs b/net/src/flows/flow_info.rs index 1e8100d32..8b29c8d9b 100644 --- a/net/src/flows/flow_info.rs +++ b/net/src/flows/flow_info.rs @@ -8,7 +8,7 @@ use concurrency::sync::RwLock; use concurrency::sync::Weak; use std::fmt::{Debug, Display}; use std::mem::MaybeUninit; -use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU8, Ordering}; use std::time::{Duration, Instant}; use super::{AtomicInstant, FlowInfoItem}; @@ -142,6 +142,7 @@ pub struct FlowInfoLocked { pub struct FlowInfo { expires_at: AtomicInstant, flowkey: Option, + genid: AtomicI64, status: AtomicFlowStatus, pub locked: RwLock, pub related: Option>, @@ -156,6 +157,7 @@ impl FlowInfo { Self { expires_at: AtomicInstant::new(expires_at), flowkey: None, + genid: AtomicI64::new(0), status: AtomicFlowStatus::from(FlowStatus::Active), locked: RwLock::new(FlowInfoLocked::default()), related: None, @@ -171,6 +173,16 @@ impl FlowInfo { self.flowkey.as_ref() } + /// Set the generation Id of a flow + pub fn set_genid(&self, genid: i64) { + self.genid.store(genid, Ordering::Relaxed); + } + + /// Read the generation Id of a flow. + pub fn genid(&self) -> i64 { + self.genid.load(Ordering::Relaxed) + } + /// We want to create a pair of `FlowInfo`s that are mutually related via a `Weak` references so that no lookup /// is needed to find one from the other. This is tricky because the `FlowInfo`s are shared and we /// need concurrent access to them. One option to build such relationships is to let those `Weak` @@ -224,6 +236,7 @@ impl FlowInfo { one_p.write(Self { expires_at: AtomicInstant::new(expires_at), flowkey: Some(key1), + genid: AtomicI64::new(0), status: AtomicFlowStatus::from(FlowStatus::Active), locked: RwLock::new(FlowInfoLocked::default()), related: Some(two_weak), @@ -231,6 +244,7 @@ impl FlowInfo { two_p.write(Self { expires_at: AtomicInstant::new(expires_at), flowkey: Some(key2), + genid: AtomicI64::new(0), status: AtomicFlowStatus::from(FlowStatus::Active), locked: RwLock::new(FlowInfoLocked::default()), related: Some(one_weak), @@ -369,6 +383,7 @@ impl Display for FlowInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let expires_at = self.expires_at.load(Ordering::Relaxed); let expires_in = expires_at.saturating_duration_since(Instant::now()); + let genid = self.genid(); writeln!(f)?; if let Ok(info) = self.locked.try_read() { write!(f, "{info}")?; @@ -383,7 +398,7 @@ impl Display for FlowInfo { writeln!( f, - " status: {:?}, expires in {}s, related: {has_related}", + " status: {:?}, expires in {}s, related: {has_related}, genid: {genid}", self.status, expires_in.as_secs(), ) From 4d09427fd547cb5f37a908b4558c0f32197585bd Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Sat, 14 Mar 2026 12:56:29 +0100 Subject: [PATCH 06/11] feat(nat): set generation id on flow creation When creating a flow for masquerading or port forwarding, label it with the current generation id. Signed-off-by: Fredi Raspall --- nat/src/portfw/nf.rs | 5 +++++ nat/src/stateful/mod.rs | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/nat/src/portfw/nf.rs b/nat/src/portfw/nf.rs index 78fcffa5a..8b4e76c96 100644 --- a/nat/src/portfw/nf.rs +++ b/nat/src/portfw/nf.rs @@ -118,6 +118,11 @@ impl PortForwarder { let timeout = Instant::now() + entry.init_timeout(); let (fw_flow, rev_flow) = FlowInfo::related_pair(timeout, fw_key, rev_key); + // label the flows with the current generation id + let genid = self.pipeline_data.genid(); + fw_flow.set_genid(genid); + rev_flow.set_genid(genid); + // set the flows in the FORWARD & REVERSE direction for subsequent packets let status = setup_forward_flow(&fw_key, &fw_flow, entry, new_dst_ip, new_dst_port); setup_reverse_flow(&rev_key, &rev_flow, entry, dst_ip, dst_port, status); diff --git a/nat/src/stateful/mod.rs b/nat/src/stateful/mod.rs index b0124ab13..611822bee 100644 --- a/nat/src/stateful/mod.rs +++ b/nat/src/stateful/mod.rs @@ -195,6 +195,10 @@ impl StatefulNat { ); let flow_info = FlowInfo::new(Self::session_timeout_time(idle_timeout)); + + // label the flow with the current generation id + flow_info.set_genid(self.pipeline_data.genid()); + if let Ok(mut write_guard) = flow_info.locked.write() { write_guard.nat_state = Some(Box::new(state)); write_guard.dst_vpcd = Some(Box::new(dst_vpcd)); From 04fdb26d451864543f5e7a85938ad1ab2de269b9 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Sat, 14 Mar 2026 16:01:26 +0100 Subject: [PATCH 07/11] feat(flow-filter): bypass flow filter if flow has state Modify flow filter so that packets are not unnecessarily re-evaluated if they refer to a flow and the flow was created with the current configuration. The logic is: - if packet refers to a flow and the flow's gen id is the same as that of the current configuration, we can bypass the flow filter since the flow should be allowed. - if packet refers to a flow but its gen id is lower than that of the current config, the packet needs to be re-evaluated; that is, we can't bypass the flow-filter. - if a packet referring to a flow is re-evaluated, then: - if the packet is allowed by the flow filter, set the flow gen id to the current (as the flow is allowed with the current config) for subsequent packets to bypass. - if the packet is not anymore allowed, drop it and invalidate the flow(s) it referred to. Signed-off-by: Fredi Raspall --- flow-filter/src/lib.rs | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 9fe599801..d0a6905ad 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -98,6 +98,45 @@ impl FlowFilter { Ok(Some(*dst_vpcd)) } + fn bypass_with_flow_info( + &self, + packet: &mut Packet, + genid: i64, + ) -> bool { + let Some(flow_info) = &packet.meta().flow_info else { + debug!("Packet does not contain any flow-info"); + return false; + }; + let flow_genid = flow_info.genid(); + if flow_genid < genid { + debug!("Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate..."); + return false; + } + let status = flow_info.status(); + if status != FlowStatus::Active { + debug!("Found flow-info but its status is {status}. Need to re-evaluate..."); + return false; + } + + let vpcd = flow_info + .locked + .read() + .unwrap() + .dst_vpcd + .as_ref() + .and_then(|d| d.extract_ref::()) + .copied(); + + debug!("Packet can bypass filter due to flow {flow_info}"); + + if set_nat_requirements_from_flow_info(packet).is_err() { + debug!("Failed to set nat requirements"); + return false; + } + packet.meta_mut().dst_vpcd = vpcd; + true + } + /// Process a packet. fn process_packet( &self, @@ -105,6 +144,12 @@ impl FlowFilter { packet: &mut Packet, ) { let nfi = &self.name; + let genid = self.pipeline_data.genid(); + + // bypass flow-filter if packet has flow-info and it is not outdated + if self.bypass_with_flow_info(packet, genid) { + return; + } let Some(net) = packet.try_ip() else { debug!("{nfi}: No IP headers found, dropping packet"); @@ -182,10 +227,29 @@ impl FlowFilter { // Drop the packet since we don't know destination and it is not an icmp error let Some(dst_vpcd) = dst_vpcd else { debug!("Could not determine dst vpcd. Dropping packet"); + // if packet referred to a flow, invalidate it + if let Some(flow_info) = packet.meta().flow_info.as_ref() { + flow_info.invalidate(); + flow_info + .related + .as_ref() + .and_then(|r| r.upgrade()) + .inspect(|r| r.invalidate()); + } packet.done(DoneReason::Filtered); return; }; + // packet is allowed. If it refers to a flow, update its genid, and that of the related flow if any + if let Some(flow_info) = &packet.meta().flow_info { + flow_info.set_genid(genid); + flow_info + .related + .as_ref() + .and_then(|r| r.upgrade()) + .inspect(|r| r.set_genid(genid)); + } + debug!("{nfi}: Flow {tuple} is allowed, setting packet dst_vpcd to {dst_vpcd}"); packet.meta_mut().dst_vpcd = Some(dst_vpcd); } From 65f7d6a558552efdb03f3daefb45a4ab6fa9d3e8 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Sat, 14 Mar 2026 16:15:52 +0100 Subject: [PATCH 08/11] feat(flow-filter): move tracing target declaration up Move the declaration of tracing target "port-frwarding" to the modules' root so that it governs all port-forwarding logs. Signed-off-by: Fredi Raspall --- nat/src/portfw/mod.rs | 3 +++ nat/src/portfw/nf.rs | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nat/src/portfw/mod.rs b/nat/src/portfw/mod.rs index 824414113..20d367e31 100644 --- a/nat/src/portfw/mod.rs +++ b/nat/src/portfw/mod.rs @@ -19,3 +19,6 @@ pub use portfwtable::access::{PortFwTableReader, PortFwTableReaderFactory, PortF pub use portfwtable::objects::{PortFwEntry, PortFwKey, PortFwTable}; pub use portfwtable::portrange::PortRange; pub use portfwtable::setup::build_port_forwarding_configuration; + +use tracectl::trace_target; +trace_target!("port-forwarding", LevelFilter::INFO, &["nat", "pipeline"]); diff --git a/nat/src/portfw/nf.rs b/nat/src/portfw/nf.rs index 8b4e76c96..efd01e3be 100644 --- a/nat/src/portfw/nf.rs +++ b/nat/src/portfw/nf.rs @@ -28,9 +28,6 @@ use crate::portfw::packet::{dnat_packet, nat_packet}; #[allow(unused)] use tracing::{debug, error, trace, warn}; -use tracectl::trace_target; -trace_target!("port-forwarding", LevelFilter::INFO, &["nat", "pipeline"]); - /// A port-forwarding network function pub struct PortForwarder { name: String, From f16ac83127e7e649accbf59b4ac5fda0be5df90c Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Mon, 16 Mar 2026 21:55:54 +0100 Subject: [PATCH 09/11] feat(nat): do not label flows with current genid on creation When creating flows, neither masquerade nor port-forwarding labels them, so that the flow gets a default generation id of 0. This way the flow-filter must always validate a flow after it is created thereby avoiding the potential race where a flow is created for generation N while the packet, processed before by the flow filter, gets validated for config N-1. This race stems from the fact that the generation id is stored in an atomic and could change in the interim between the validation and the flow creation. If config N would allow the flow but N+1 wouldn't we could otherwise be in the situation where we would allow a flow that should be denied (until the next reconfiguration) because NAT would create the flow validated by a prior config. Note: with this changes, neither masquerade nor port-forwarding need to access the pipeline data. However, we leave the Arc's for future use since their cost is negligible. Signed-off-by: Fredi Raspall --- nat/src/portfw/nf.rs | 5 ----- nat/src/stateful/mod.rs | 4 ---- net/src/flows/flow_info.rs | 7 +++++++ 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/nat/src/portfw/nf.rs b/nat/src/portfw/nf.rs index efd01e3be..72a6369cc 100644 --- a/nat/src/portfw/nf.rs +++ b/nat/src/portfw/nf.rs @@ -115,11 +115,6 @@ impl PortForwarder { let timeout = Instant::now() + entry.init_timeout(); let (fw_flow, rev_flow) = FlowInfo::related_pair(timeout, fw_key, rev_key); - // label the flows with the current generation id - let genid = self.pipeline_data.genid(); - fw_flow.set_genid(genid); - rev_flow.set_genid(genid); - // set the flows in the FORWARD & REVERSE direction for subsequent packets let status = setup_forward_flow(&fw_key, &fw_flow, entry, new_dst_ip, new_dst_port); setup_reverse_flow(&rev_key, &rev_flow, entry, dst_ip, dst_port, status); diff --git a/nat/src/stateful/mod.rs b/nat/src/stateful/mod.rs index 611822bee..b0124ab13 100644 --- a/nat/src/stateful/mod.rs +++ b/nat/src/stateful/mod.rs @@ -195,10 +195,6 @@ impl StatefulNat { ); let flow_info = FlowInfo::new(Self::session_timeout_time(idle_timeout)); - - // label the flow with the current generation id - flow_info.set_genid(self.pipeline_data.genid()); - if let Ok(mut write_guard) = flow_info.locked.write() { write_guard.nat_state = Some(Box::new(state)); write_guard.dst_vpcd = Some(Box::new(dst_vpcd)); diff --git a/net/src/flows/flow_info.rs b/net/src/flows/flow_info.rs index 8b29c8d9b..17b5f6566 100644 --- a/net/src/flows/flow_info.rs +++ b/net/src/flows/flow_info.rs @@ -138,6 +138,13 @@ pub struct FlowInfoLocked { pub port_fw_state: Option>, } +/// Object that represents a flow of packets. +/// `related` is a `Weak` reference to another flow that is related to this one (e.g. +/// a flow in the reverse direction). `FlowKey` is optional, but any flow we store in +/// the flow table gets a key automatically. `genid` is the last generation id where +/// this flow is valid (accepted by the flow-filter). As such, it increases on config +/// changes (if the flow is acceptable under a new configuration), or the flow should +/// no longer have status `Active`. #[derive(Debug)] pub struct FlowInfo { expires_at: AtomicInstant, From 48892fdf414a8ae49ebf6fa4e50d906ca48f74c3 Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Mon, 16 Mar 2026 22:33:33 +0100 Subject: [PATCH 10/11] feat(net,nat,flow-info): add methods to flow-info Add convenience methods to set the genid of a flow and its related flow and to invalidate flows in pairs. Signed-off-by: Fredi Raspall --- flow-filter/src/lib.rs | 14 ++------------ nat/src/portfw/flow_state.rs | 12 +++--------- net/src/flows/flow_info.rs | 22 ++++++++++++++++++++++ 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index d0a6905ad..9bd13b3b5 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -229,12 +229,7 @@ impl FlowFilter { debug!("Could not determine dst vpcd. Dropping packet"); // if packet referred to a flow, invalidate it if let Some(flow_info) = packet.meta().flow_info.as_ref() { - flow_info.invalidate(); - flow_info - .related - .as_ref() - .and_then(|r| r.upgrade()) - .inspect(|r| r.invalidate()); + flow_info.invalidate_pair(); } packet.done(DoneReason::Filtered); return; @@ -242,12 +237,7 @@ impl FlowFilter { // packet is allowed. If it refers to a flow, update its genid, and that of the related flow if any if let Some(flow_info) = &packet.meta().flow_info { - flow_info.set_genid(genid); - flow_info - .related - .as_ref() - .and_then(|r| r.upgrade()) - .inspect(|r| r.set_genid(genid)); + flow_info.set_genid_pair(genid); } debug!("{nfi}: Flow {tuple} is allowed, setting packet dst_vpcd to {dst_vpcd}"); diff --git a/nat/src/portfw/flow_state.rs b/nat/src/portfw/flow_state.rs index 97139806a..98ab65cde 100644 --- a/nat/src/portfw/flow_state.rs +++ b/nat/src/portfw/flow_state.rs @@ -217,15 +217,9 @@ pub(crate) fn get_packet_port_fw_state( /// Invalidate the flow that this packet matched and the related one if any. pub(crate) fn invalidate_flow_state(packet: &Packet) { - let Some(flow_info) = packet.meta().flow_info.as_ref() else { - return; - }; - flow_info.invalidate(); - flow_info - .related - .as_ref() - .and_then(Weak::upgrade) - .inspect(|related| related.invalidate()); + if let Some(flow_info) = packet.meta().flow_info.as_ref() { + flow_info.invalidate_pair(); + } } /// Update the port-forwarding state of a flow entry after processing a packet. diff --git a/net/src/flows/flow_info.rs b/net/src/flows/flow_info.rs index 17b5f6566..2b4bfa024 100644 --- a/net/src/flows/flow_info.rs +++ b/net/src/flows/flow_info.rs @@ -185,6 +185,15 @@ impl FlowInfo { self.genid.store(genid, Ordering::Relaxed); } + /// Set the generation Id of a flow and that of its related flow if any + pub fn set_genid_pair(&self, genid: i64) { + self.genid.store(genid, Ordering::Relaxed); + self.related + .as_ref() + .and_then(Weak::upgrade) + .inspect(|r| r.set_genid(genid)); + } + /// Read the generation Id of a flow. pub fn genid(&self) -> i64 { self.genid.load(Ordering::Relaxed) @@ -360,6 +369,19 @@ impl FlowInfo { self.update_status(FlowStatus::Cancelled); } + /// Invalidate a flow and also its related flow if any. + /// + /// # Thread Safety + /// + /// This method is thread-safe. + pub fn invalidate_pair(&self) { + self.invalidate(); + self.related + .as_ref() + .and_then(Weak::upgrade) + .inspect(|related| related.invalidate()); + } + /// Update the flow status. /// /// # Thread Safety From 70828ec00e284782ca37d382c6fbb89361898d8c Mon Sep 17 00:00:00 2001 From: Fredi Raspall Date: Mon, 16 Mar 2026 23:29:47 +0100 Subject: [PATCH 11/11] feat(stateful-nat): build flows in pairs Build the two flows for a stateful NAT session in pairs. This allows invalidating them both on a configuration change. Right now invalidation may not shorten their lifetime, but it will soon. Signed-off-by: Fredi Raspall --- nat/src/stateful/mod.rs | 75 +++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/nat/src/stateful/mod.rs b/nat/src/stateful/mod.rs index b0124ab13..1a0a9194a 100644 --- a/nat/src/stateful/mod.rs +++ b/nat/src/stateful/mod.rs @@ -22,7 +22,7 @@ use net::flow_key::{IcmpProtoKey, Uni}; use net::flows::{ExtractRef, FlowInfo}; use net::headers::{Net, Transport, TryIp, TryIpMut, TryTransportMut}; use net::packet::{DoneReason, Packet, VpcDiscriminant}; -use net::{FlowKey, FlowKeyData, IpProtoKey}; +use net::{FlowKey, IpProtoKey}; use pipeline::{NetworkFunction, PipelineData}; use std::fmt::{Debug, Display}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; @@ -173,28 +173,13 @@ impl StatefulNat { Instant::now() + timeout } - fn create_session( - &self, - flow_key: &FlowKey, + fn setup_flow_nat_state( + flow_info: &FlowInfo, state: NatFlowState, dst_vpcd: VpcDiscriminant, - idle_timeout: Duration, ) { - // Clear the destination VPC so we can make lookups without knowing it - let new_flow_key = FlowKey::Unidirectional(FlowKeyData::new( - flow_key.data().src_vpcd(), - *flow_key.data().src_ip(), - *flow_key.data().dst_ip(), - *flow_key.data().proto_key_info(), - )); - debug!( - "{}: Creating new flow session entry: {} -> {}", - self.name(), - new_flow_key.data(), - state - ); - - let flow_info = FlowInfo::new(Self::session_timeout_time(idle_timeout)); + let flow_key = flow_info.flowkey().unwrap_or_else(|| unreachable!()); + debug!("Setting up new flow: {flow_key} -> {state}"); if let Ok(mut write_guard) = flow_info.locked.write() { write_guard.nat_state = Some(Box::new(state)); write_guard.dst_vpcd = Some(Box::new(dst_vpcd)); @@ -202,7 +187,39 @@ impl StatefulNat { // flow info is just locally created unreachable!() } - self.sessions.insert(new_flow_key, flow_info); + } + + fn create_flow_pair( + &self, + packet: &mut Packet, + flow_key: &FlowKey, + alloc: AllocationResult>, + ) -> Result<(), StatefulNatError> { + // Given that at least one of alloc.src or alloc.dst is set, we should always have at least one timeout set. + let idle_timeout = alloc.idle_timeout().unwrap_or_else(|| unreachable!()); + + // src and dst vpc of this packet + let src_vpc_id = packet.meta().src_vpcd.unwrap_or_else(|| unreachable!()); + let dst_vpc_id = packet.meta().dst_vpcd.unwrap_or_else(|| unreachable!()); + + // build key for reverse flow + let reverse_key = Self::new_reverse_session(flow_key, &alloc, dst_vpc_id)?; + + // build NAT state for both flows + let (forward_state, reverse_state) = Self::new_states_from_alloc(alloc, idle_timeout); + + // build a flow pair from the keys (without NAT state) + let expires_at = Self::session_timeout_time(idle_timeout); + let (forward, reverse) = FlowInfo::related_pair(expires_at, *flow_key, reverse_key); + + // set up their NAT state + Self::setup_flow_nat_state(&forward, forward_state, dst_vpc_id); + Self::setup_flow_nat_state(&reverse, reverse_state, src_vpc_id); + + // insert in flow-table + self.sessions.insert_from_arc(*flow_key, &forward); + self.sessions.insert_from_arc(reverse_key, &reverse); + Ok(()) } #[allow(clippy::unnecessary_wraps)] @@ -433,7 +450,6 @@ impl StatefulNat { let flow_key = FlowKey::try_from(Uni(&*packet)).map_err(|_| StatefulNatError::TupleParseError)?; - let src_vpc_id = packet.meta().src_vpcd.unwrap_or_else(|| unreachable!()); let dst_vpc_id = packet.meta().dst_vpcd.unwrap_or_else(|| unreachable!()); // build extended flow key, with the dst vpc discriminant @@ -449,22 +465,9 @@ impl StatefulNat { debug!("{}: Allocated translation data: {alloc}", self.name()); - // Given that at least one of alloc.src or alloc.dst is set, we should always have at - // least one timeout set. - let idle_timeout = alloc.idle_timeout().unwrap_or_else(|| unreachable!()); - let translation_data = Self::get_translation_data(&alloc.src, &alloc.dst); - let reverse_flow_key = Self::new_reverse_session(&flow_key, &alloc, dst_vpc_id)?; - let (forward_state, reverse_state) = Self::new_states_from_alloc(alloc, idle_timeout); - - self.create_session(&flow_key, forward_state, dst_vpc_id, idle_timeout); - self.create_session( - &reverse_flow_key, - reverse_state.clone(), - src_vpc_id, - idle_timeout, - ); + self.create_flow_pair(packet, &flow_key, alloc)?; Self::stateful_translate::(self.name(), packet, &translation_data).and(Ok(true)) }