From 573f78bb0452fba74988bf3e3f5a5e2f5b8877e1 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Wed, 6 May 2026 16:09:27 +0200 Subject: [PATCH 1/3] wait on metrics handle on error path --- lading/src/blackhole/tcp_rr.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index b81fc513c..edccbd634 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -161,6 +161,7 @@ impl TcpRr { .collect::>(), ); + let mut handles = Vec::with_capacity(num_threads as usize); let metrics_handle = { let tm = Arc::clone(&thread_metrics); let labels = self.metric_labels.clone(); @@ -169,6 +170,7 @@ impl TcpRr { metrics::run_metrics_thread(&tm, &labels, &flag); }) }; + handles.push(metrics_handle); // Pre-build thread 0's listener here so the BPF program is attached // to the reuseport group before any other thread calls bind(). This @@ -192,7 +194,6 @@ impl TcpRr { // instead of hanging forever. let (ready_tx, mut ready_rx) = mpsc::unbounded_channel::<()>(); - let mut handles = Vec::with_capacity(num_threads as usize); let mut thread0_listener = thread0_listener; for i in 0..num_threads { let request_size = self.config.request_size.get(); @@ -250,7 +251,6 @@ impl TcpRr { .expect("failed to set control listener nonblocking"); info!("control port listening on {control_addr}, waiting for generator"); - handles.push(metrics_handle); // Accept with shutdown awareness: poll accept in a loop. let flag = Arc::clone(&shutdown_flag); From 458fbd0363349b57d63d3e97d0a4d3eb94cc4523 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Wed, 6 May 2026 16:16:42 +0200 Subject: [PATCH 2/3] on error when accepting connection on control port tear down correctly --- lading/src/blackhole/tcp_rr.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index edccbd634..225f23a0f 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -260,31 +260,31 @@ impl TcpRr { flag.store(true, Relaxed); }); let mut generator_connected = false; - loop { + let controller_err = loop { if shutdown_flag.load(Relaxed) { info!("shutdown before generator connected"); - break; + break Ok(()); } match control_listener.accept() { Ok((_conn, peer)) => { info!("generator connected from {peer}, data threads running"); generator_connected = true; - break; + break Ok(()); } Err(ref e) if e.kind() == ErrorKind::WouldBlock => { tokio::time::sleep(Duration::from_millis(100)).await; } Err(e) => { - return Err(Error::Bind { + break Err(Error::Bind { addr: control_addr, source: Box::new(e), }); } } - } + }; drop(control_listener); - if generator_connected { + if controller_err.is_ok() && generator_connected { self.shutdown.recv().await; info!("shutdown signal received"); } @@ -292,7 +292,7 @@ impl TcpRr { thread::join_all(handles).map_err(|()| Error::ThreadPanicked)?; - Ok(()) + controller_err } } From c479cfda391a091d904724fb84f0d9f23d22dc41 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Wed, 6 May 2026 16:18:29 +0200 Subject: [PATCH 3/3] format --- lading/src/blackhole/tcp_rr.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index 225f23a0f..330a9331b 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -251,7 +251,6 @@ impl TcpRr { .expect("failed to set control listener nonblocking"); info!("control port listening on {control_addr}, waiting for generator"); - // Accept with shutdown awareness: poll accept in a loop. let flag = Arc::clone(&shutdown_flag); let shutdown_clone = self.shutdown.clone();