Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions lading/src/blackhole/tcp_rr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl TcpRr {
.collect::<Vec<_>>(),
);

let mut handles = Vec::with_capacity(num_threads as usize);
let metrics_handle = {
let tm = Arc::clone(&thread_metrics);
let labels = self.metric_labels.clone();
Expand All @@ -169,6 +170,7 @@ impl TcpRr {
metrics::run_metrics_thread(&tm, &labels, &flag);
})
};
handles.push(metrics_handle);

// Pre-build thread 0's listener here so the BPF program is attached
// to the reuseport group before any other thread calls bind(). This
Expand All @@ -192,7 +194,6 @@ impl TcpRr {
// instead of hanging forever.
let (ready_tx, mut ready_rx) = mpsc::unbounded_channel::<()>();

let mut handles = Vec::with_capacity(num_threads as usize);
let mut thread0_listener = thread0_listener;
for i in 0..num_threads {
let request_size = self.config.request_size.get();
Expand Down Expand Up @@ -250,8 +251,6 @@ impl TcpRr {
.expect("failed to set control listener nonblocking");
info!("control port listening on {control_addr}, waiting for generator");

handles.push(metrics_handle);

// Accept with shutdown awareness: poll accept in a loop.
let flag = Arc::clone(&shutdown_flag);
let shutdown_clone = self.shutdown.clone();
Expand All @@ -260,39 +259,39 @@ impl TcpRr {
flag.store(true, Relaxed);
});
let mut generator_connected = false;
loop {
let controller_err = loop {
if shutdown_flag.load(Relaxed) {
info!("shutdown before generator connected");
break;
break Ok(());
}
match control_listener.accept() {
Ok((_conn, peer)) => {
info!("generator connected from {peer}, data threads running");
generator_connected = true;
break;
break Ok(());
}
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(e) => {
return Err(Error::Bind {
break Err(Error::Bind {
addr: control_addr,
source: Box::new(e),
});
}
}
}
};
drop(control_listener);

if generator_connected {
if controller_err.is_ok() && generator_connected {
self.shutdown.recv().await;
info!("shutdown signal received");
}
shutdown_flag.store(true, Relaxed);

thread::join_all(handles).map_err(|()| Error::ThreadPanicked)?;

Ok(())
controller_err
}
}

Expand Down
Loading