diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index b81fc513c..330a9331b 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -161,6 +161,7 @@ impl TcpRr { .collect::>(), ); + let mut handles = Vec::with_capacity(num_threads as usize); let metrics_handle = { let tm = Arc::clone(&thread_metrics); let labels = self.metric_labels.clone(); @@ -169,6 +170,7 @@ impl TcpRr { metrics::run_metrics_thread(&tm, &labels, &flag); }) }; + handles.push(metrics_handle); // Pre-build thread 0's listener here so the BPF program is attached // to the reuseport group before any other thread calls bind(). This @@ -192,7 +194,6 @@ impl TcpRr { // instead of hanging forever. let (ready_tx, mut ready_rx) = mpsc::unbounded_channel::<()>(); - let mut handles = Vec::with_capacity(num_threads as usize); let mut thread0_listener = thread0_listener; for i in 0..num_threads { let request_size = self.config.request_size.get(); @@ -250,8 +251,6 @@ impl TcpRr { .expect("failed to set control listener nonblocking"); info!("control port listening on {control_addr}, waiting for generator"); - handles.push(metrics_handle); - // Accept with shutdown awareness: poll accept in a loop. let flag = Arc::clone(&shutdown_flag); let shutdown_clone = self.shutdown.clone(); @@ -260,31 +259,31 @@ impl TcpRr { flag.store(true, Relaxed); }); let mut generator_connected = false; - loop { + let controller_err = loop { if shutdown_flag.load(Relaxed) { info!("shutdown before generator connected"); - break; + break Ok(()); } match control_listener.accept() { Ok((_conn, peer)) => { info!("generator connected from {peer}, data threads running"); generator_connected = true; - break; + break Ok(()); } Err(ref e) if e.kind() == ErrorKind::WouldBlock => { tokio::time::sleep(Duration::from_millis(100)).await; } Err(e) => { - return Err(Error::Bind { + break Err(Error::Bind { addr: control_addr, source: Box::new(e), }); } } - } + }; drop(control_listener); - if generator_connected { + if controller_err.is_ok() && generator_connected { self.shutdown.recv().await; info!("shutdown signal received"); } @@ -292,7 +291,7 @@ impl TcpRr { thread::join_all(handles).map_err(|()| Error::ThreadPanicked)?; - Ok(()) + controller_err } }