Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dataplane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ fn main() {
hostname: gwname.clone(),
processor_params: ConfigProcessorParams {
router_ctl: setup.router.get_ctl_tx(),
pipeline_data: pipeline_factory().get_data(),
vpcmapw: setup.vpcmapw,
nattablesw: setup.nattablesw,
natallocatorw: setup.natallocatorw,
Expand Down
5 changes: 4 additions & 1 deletion dataplane/src/packet_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use nat::stateless::NatTablesWriter;
use nat::{IcmpErrorHandler, StatefulNat, StatelessNat};

use net::buffer::PacketBufferMut;
use pipeline::DynPipeline;
use pipeline::sample_nfs::PacketDumper;
use pipeline::{DynPipeline, PipelineData};

use routing::{Router, RouterError, RouterParams};

Expand Down Expand Up @@ -73,8 +73,10 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
let nattabler_factory = nattablesw.get_reader_factory();
let natallocator_factory = natallocatorw.get_reader_factory();
let portfw_factory = portfw_w.reader().factory();
let pdata = Arc::from(PipelineData::new(0));

let pipeline_builder = move || {
let pdata_clone = pdata.clone();
// Build network functions
let stage_ingress = Ingress::new("Ingress", iftr_factory.handle());
let stage_egress = Egress::new("Egress", iftr_factory.handle(), atabler_factory.handle());
Expand All @@ -101,6 +103,7 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
// Build the pipeline for a router. The composition of the pipeline (in stages) is currently
// hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last
DynPipeline::new()
.set_data(pdata_clone)
.add_stage(stage_ingress)
.add_stage(iprouter1)
.add_stage(flow_lookup)
Expand Down
63 changes: 62 additions & 1 deletion flow-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use net::flows::FlowStatus;
use net::flows::flow_info_item::ExtractRef;
use net::headers::{Transport, TryIp, TryTransport};
use net::packet::{DoneReason, Packet, VpcDiscriminant};
use pipeline::NetworkFunction;
use pipeline::{NetworkFunction, PipelineData};
use std::collections::HashSet;
use std::fmt::{Display, Write};
use std::net::IpAddr;
use std::num::NonZero;
use std::sync::Arc;
use tracing::{debug, error};

mod filter_rw;
Expand All @@ -45,6 +46,7 @@ trace_target!("flow-filter", LevelFilter::INFO, &["pipeline"]);
pub struct FlowFilter {
name: String,
tablesr: FlowFilterTableReader,
pipeline_data: Arc<PipelineData>,
}

impl FlowFilter {
Expand All @@ -53,6 +55,7 @@ impl FlowFilter {
Self {
name: name.to_string(),
tablesr,
pipeline_data: Arc::from(PipelineData::default()),
}
}

Expand Down Expand Up @@ -95,13 +98,58 @@ impl FlowFilter {
Ok(Some(*dst_vpcd))
}

fn bypass_with_flow_info<Buf: PacketBufferMut>(
&self,
packet: &mut Packet<Buf>,
genid: i64,
) -> bool {
let Some(flow_info) = &packet.meta().flow_info else {
debug!("Packet does not contain any flow-info");
return false;
};
let flow_genid = flow_info.genid();
if flow_genid < genid {
debug!("Packet has flow-info ({flow_genid} < {genid}). Need to re-evaluate...");
return false;
}
let status = flow_info.status();
if status != FlowStatus::Active {
debug!("Found flow-info but its status is {status}. Need to re-evaluate...");
return false;
}

let vpcd = flow_info
.locked
.read()
.unwrap()
.dst_vpcd
.as_ref()
.and_then(|d| d.extract_ref::<VpcDiscriminant>())
.copied();

debug!("Packet can bypass filter due to flow {flow_info}");

if set_nat_requirements_from_flow_info(packet).is_err() {
debug!("Failed to set nat requirements");
return false;
}
packet.meta_mut().dst_vpcd = vpcd;
true
}

/// Process a packet.
fn process_packet<Buf: PacketBufferMut>(
&self,
tablesr: &left_right::ReadGuard<'_, FlowFilterTable>,
packet: &mut Packet<Buf>,
) {
let nfi = &self.name;
let genid = self.pipeline_data.genid();

// bypass flow-filter if packet has flow-info and it is not outdated
if self.bypass_with_flow_info(packet, genid) {
return;
}

let Some(net) = packet.try_ip() else {
debug!("{nfi}: No IP headers found, dropping packet");
Expand Down Expand Up @@ -179,10 +227,19 @@ impl FlowFilter {
// Drop the packet since we don't know destination and it is not an icmp error
let Some(dst_vpcd) = dst_vpcd else {
debug!("Could not determine dst vpcd. Dropping packet");
// if packet referred to a flow, invalidate it
if let Some(flow_info) = packet.meta().flow_info.as_ref() {
flow_info.invalidate_pair();
}
packet.done(DoneReason::Filtered);
return;
};

// packet is allowed. If it refers to a flow, update its genid, and that of the related flow if any
if let Some(flow_info) = &packet.meta().flow_info {
flow_info.set_genid_pair(genid);
}

debug!("{nfi}: Flow {tuple} is allowed, setting packet dst_vpcd to {dst_vpcd}");
packet.meta_mut().dst_vpcd = Some(dst_vpcd);
}
Expand Down Expand Up @@ -297,6 +354,10 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for FlowFilter {
packet.enforce()
})
}

fn set_data(&mut self, data: Arc<PipelineData>) {
self.pipeline_data = data;
}
}

// Only used for Display
Expand Down
1 change: 1 addition & 0 deletions mgmt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ k8s-less = { workspace = true }
lpm = { workspace = true }
nat = { workspace = true }
net = { workspace = true }
pipeline = { workspace = true }
rekon = { workspace = true }
routing = { workspace = true }
stats = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions mgmt/src/processor/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use nat::portfw::build_port_forwarding_configuration;
use nat::stateful::NatAllocatorWriter;
use nat::stateless::NatTablesWriter;
use nat::stateless::setup::build_nat_configuration;
use pipeline::PipelineData;

use crate::processor::display::ConfigHistory;
use crate::processor::gwconfigdb::GwConfigDatabase;
Expand Down Expand Up @@ -77,6 +78,9 @@ pub struct ConfigProcessorParams {
// channel to router
pub router_ctl: RouterCtlSender,

// access data associated to pipeline
pub pipeline_data: Arc<PipelineData>,

// writer for vpc mapping table
pub vpcmapw: VpcMapWriter<VpcMapName>,

Expand Down Expand Up @@ -623,6 +627,9 @@ impl ConfigProcessor {
/* apply config in router */
apply_router_config(&kernel_vrfs, config, router_ctl).await?;

/* update the pipeline generation id, iff config was applied */
self.proc_params.pipeline_data.set_genid(genid);

info!("Successfully applied config for genid {genid}");
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions mgmt/src/tests/mgmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod test {
use nat::stateless::NatTablesWriter;
use net::eth::mac::Mac;
use net::interface::Mtu;
use pipeline::PipelineData;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::str::FromStr;
Expand Down Expand Up @@ -465,9 +466,13 @@ pub mod test {
/* create VPC stats store (Arc) */
let vpc_stats_store = VpcStatsStore::new();

/* pipeline data */
let pipeline_data = Arc::from(PipelineData::default());

/* build configuration of mgmt config processor */
let processor_config = ConfigProcessorParams {
router_ctl,
pipeline_data,
vpcmapw,
nattablesw,
natallocatorw,
Expand Down
12 changes: 3 additions & 9 deletions nat/src/portfw/flow_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,9 @@ pub(crate) fn get_packet_port_fw_state<Buf: PacketBufferMut>(

/// Invalidate the flow that this packet matched and the related one if any.
pub(crate) fn invalidate_flow_state<Buf: PacketBufferMut>(packet: &Packet<Buf>) {
let Some(flow_info) = packet.meta().flow_info.as_ref() else {
return;
};
flow_info.invalidate();
flow_info
.related
.as_ref()
.and_then(Weak::upgrade)
.inspect(|related| related.invalidate());
if let Some(flow_info) = packet.meta().flow_info.as_ref() {
flow_info.invalidate_pair();
}
}

/// Update the port-forwarding state of a flow entry after processing a packet.
Expand Down
3 changes: 3 additions & 0 deletions nat/src/portfw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ pub use portfwtable::access::{PortFwTableReader, PortFwTableReaderFactory, PortF
pub use portfwtable::objects::{PortFwEntry, PortFwKey, PortFwTable};
pub use portfwtable::portrange::PortRange;
pub use portfwtable::setup::build_port_forwarding_configuration;

use tracectl::trace_target;
trace_target!("port-forwarding", LevelFilter::INFO, &["nat", "pipeline"]);
11 changes: 7 additions & 4 deletions nat/src/portfw/nf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use net::flows::{ExtractMut, ExtractRef, FlowInfo};
use net::headers::{TryIp, TryTcp, TryTransport};
use net::ip::{NextHeader, UnicastIpAddr};
use net::packet::{DoneReason, Packet, VpcDiscriminant};
use pipeline::NetworkFunction;
use pipeline::{NetworkFunction, PipelineData};
use std::num::NonZero;
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -28,14 +28,12 @@ use crate::portfw::packet::{dnat_packet, nat_packet};
#[allow(unused)]
use tracing::{debug, error, trace, warn};

use tracectl::trace_target;
trace_target!("port-forwarding", LevelFilter::INFO, &["nat", "pipeline"]);

/// A port-forwarding network function
pub struct PortForwarder {
name: String,
flow_table: Arc<FlowTable>,
fwtable: PortFwTableReader,
pipeline_data: Arc<PipelineData>,
}

impl PortForwarder {
Expand All @@ -46,6 +44,7 @@ impl PortForwarder {
name: name.to_string(),
flow_table,
fwtable,
pipeline_data: Arc::from(PipelineData::default()),
}
}

Expand Down Expand Up @@ -369,4 +368,8 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for PortForwarder {
packet.enforce()
})
}

fn set_data(&mut self, data: Arc<PipelineData>) {
self.pipeline_data = data;
}
}
Loading
Loading