Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compiler/rustc_data_structures/src/marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ already_send!(
[std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr]
[std::io::Error][std::fs::File][std::panic::Location<'_>][rustc_arena::DroplessArena]
[jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice][crate::sync::Registry]
);

#[cfg(target_has_atomic = "64")]
Expand Down Expand Up @@ -142,7 +142,7 @@ already_sync!(
[std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
[std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File][std::panic::Location<'_>]
[jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice][crate::sync::Registry]
);

// Use portable AtomicU64 for targets without native 64-bit atomics
Expand Down
3 changes: 1 addition & 2 deletions compiler/rustc_data_structures/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub use parking_lot::{
MappedRwLockReadGuard as MappedReadGuard, MappedRwLockWriteGuard as MappedWriteGuard,
RwLockReadGuard as ReadGuard, RwLockWriteGuard as WriteGuard,
};
pub use rustc_thread_pool::{Registry, WorkerLocal};

pub use self::atomic::AtomicU64;
pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
Expand All @@ -42,14 +43,12 @@ pub use self::parallel::{
try_par_for_each_in,
};
pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec};
pub use self::worker_local::{Registry, WorkerLocal};
pub use crate::marker::*;

mod freeze;
mod lock;
mod parallel;
mod vec;
mod worker_local;

/// Keep the conditional imports together in a submodule, so that import-sorting
/// doesn't split them up.
Expand Down
149 changes: 0 additions & 149 deletions compiler/rustc_data_structures/src/sync/worker_local.rs

This file was deleted.

64 changes: 24 additions & 40 deletions compiler/rustc_interface/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,36 +137,36 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R:
extra_symbols: &[&'static str],
f: F,
) -> R {
// The "thread pool" is a single spawned thread in the non-parallel
// compiler. We run on a spawned thread instead of the main thread (a) to
// For `WorkerLocal` to function properly we need to create a thread pool.
// Also we run on a spawned thread instead of the main thread (a) to
// provide control over the stack size, and (b) to increase similarity with
// the parallel compiler, in particular to ensure there is no accidental
// sharing of data between the main thread and the compilation thread
// (which might cause problems for the parallel compiler).
let builder = thread::Builder::new().name("rustc".to_string()).stack_size(thread_stack_size);
let builder = rustc_thread_pool::ThreadPoolBuilder::new()
.thread_name(|_| "rustc".to_string())
.num_threads(1)
.stack_size(thread_stack_size);

// We build the session globals and run `f` on the spawned thread, because
// `SessionGlobals` does not impl `Send` in the non-parallel compiler.
thread::scope(|s| {
// `unwrap` is ok here because `spawn_scoped` only panics if the thread
// name contains null bytes.
let r = builder
.spawn_scoped(s, move || {
rustc_span::create_session_globals_then(
edition,
extra_symbols,
Some(sm_inputs),
|| f(CurrentGcx::new(), Proxy::new()),
)
})
.unwrap()
.join();

match r {
Ok(v) => v,
Err(e) => std::panic::resume_unwind(e),
}
})
// `unwrap` is ok here because `build_scoped` just as `std::thread`
// only panics if the thread name contains null bytes.
builder
.build_scoped(
|thread| thread.run(),
move |pool| {
pool.install(|| {
rustc_span::create_session_globals_then(
edition,
extra_symbols,
Some(sm_inputs),
|| f(CurrentGcx::new(), Proxy::new()),
)
})
},
)
.unwrap()
}

pub(crate) fn run_in_thread_pool_with_globals<
Expand All @@ -188,21 +188,8 @@ pub(crate) fn run_in_thread_pool_with_globals<

let thread_stack_size = init_stack_size(thread_builder_diag);

let registry = sync::Registry::new(std::num::NonZero::new(threads).unwrap());

let Some(proof) = sync::check_dyn_thread_safe() else {
return run_in_thread_with_globals(
thread_stack_size,
edition,
sm_inputs,
extra_symbols,
|current_gcx, jobserver_proxy| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

f(current_gcx, jobserver_proxy)
},
);
return run_in_thread_with_globals(thread_stack_size, edition, sm_inputs, extra_symbols, f);
};

let current_gcx = proof.derive(CurrentGcx::new());
Expand Down Expand Up @@ -282,9 +269,6 @@ internal compiler error: query cycle handler thread panicked, aborting process";
.build_scoped(
// Initialize each new worker thread when created.
move |thread: rustc_thread_pool::ThreadBuilder| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

rustc_span::set_session_globals_then(session_globals.into_inner(), || {
thread.run()
})
Expand Down
4 changes: 2 additions & 2 deletions compiler/rustc_thread_pool/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ where
}

pub struct Registry {
thread_infos: Vec<ThreadInfo>,
thread_infos: Box<[ThreadInfo]>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
broadcasts: Mutex<Vec<Worker<JobRef>>>,
Expand Down Expand Up @@ -989,7 +989,7 @@ impl WorkerThread {
debug_assert!(self.local_deque_is_empty());

// otherwise, try to steal
let thread_infos = &self.registry.thread_infos.as_slice();
let thread_infos = &*self.registry.thread_infos;
let num_threads = thread_infos.len();
if num_threads <= 1 {
return None;
Expand Down
40 changes: 25 additions & 15 deletions compiler/rustc_thread_pool/src/worker_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct CacheAligned<T>(T);
/// You can only access the worker local value through the Deref impl
/// on the thread pool it was constructed on. It will panic otherwise
pub struct WorkerLocal<T> {
locals: Vec<CacheAligned<T>>,
locals: Box<[CacheAligned<T>]>,
registry: Arc<Registry>,
}

Expand All @@ -35,21 +35,15 @@ impl<T> WorkerLocal<T> {

/// Returns the worker-local value for each thread
#[inline]
pub fn into_inner(self) -> Vec<T> {
self.locals.into_iter().map(|c| c.0).collect()
pub fn into_inner(self) -> impl Iterator<Item = T> {
self.locals.into_vec().into_iter().map(|local| local.0)
}
}

fn current(&self) -> &T {
unsafe {
let worker_thread = WorkerThread::current();
if worker_thread.is_null()
|| !std::ptr::eq(&*(*worker_thread).registry, &*self.registry)
{
panic!("WorkerLocal can only be used on the thread pool it was created on")
}
&self.locals[(*worker_thread).index].0
}
}
#[inline(never)]
#[cold]
fn panic_different_registry() -> ! {
panic!("WorkerLocal can only be used on the thread pool it was created on")
}

impl<T> WorkerLocal<Vec<T>> {
Expand All @@ -70,6 +64,22 @@ impl<T> Deref for WorkerLocal<T> {

#[inline(always)]
fn deref(&self) -> &T {
self.current()
unsafe {
let worker_thread = WorkerThread::current();
if worker_thread.is_null()
|| !std::ptr::eq(&*(*worker_thread).registry, &*self.registry)
{
panic_different_registry()
}
// SAFETY: `verify` will only return values less than
// `self.registry.num_threads` which is the size of the `self.locals` array.
&self.locals.get_unchecked((*worker_thread).index).0
}
}
}

impl<T: Default> Default for WorkerLocal<T> {
fn default() -> Self {
WorkerLocal::new(|_| Default::default())
}
}
Loading