diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs index 997077ac4402e..cfe21abca5bb9 100644 --- a/compiler/rustc_data_structures/src/marker.rs +++ b/compiler/rustc_data_structures/src/marker.rs @@ -63,7 +63,7 @@ already_send!( [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr] [std::io::Error][std::fs::File][std::panic::Location<'_>][rustc_arena::DroplessArena] [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap] - [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] + [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice][crate::sync::Registry] ); #[cfg(target_has_atomic = "64")] @@ -142,7 +142,7 @@ already_sync!( [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8] [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File][std::panic::Location<'_>] [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap] - [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] + [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice][crate::sync::Registry] ); // Use portable AtomicU64 for targets without native 64-bit atomics diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 3d5bc85278286..b71a59a0052c2 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -29,6 +29,7 @@ pub use parking_lot::{ MappedRwLockReadGuard as MappedReadGuard, MappedRwLockWriteGuard as MappedWriteGuard, RwLockReadGuard as ReadGuard, RwLockWriteGuard as WriteGuard, }; +pub use rustc_thread_pool::{Registry, WorkerLocal}; pub use self::atomic::AtomicU64; pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard}; @@ -42,14 +43,12 @@ pub use self::parallel::{ try_par_for_each_in, }; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; 
-pub use self::worker_local::{Registry, WorkerLocal}; pub use crate::marker::*; mod freeze; mod lock; mod parallel; mod vec; -mod worker_local; /// Keep the conditional imports together in a submodule, so that import-sorting /// doesn't split them up. diff --git a/compiler/rustc_data_structures/src/sync/worker_local.rs b/compiler/rustc_data_structures/src/sync/worker_local.rs deleted file mode 100644 index d75af00985047..0000000000000 --- a/compiler/rustc_data_structures/src/sync/worker_local.rs +++ /dev/null @@ -1,149 +0,0 @@ -use std::cell::{Cell, OnceCell}; -use std::num::NonZero; -use std::ops::Deref; -use std::ptr; -use std::sync::Arc; - -use parking_lot::Mutex; - -use crate::outline; -use crate::sync::CacheAligned; - -/// A pointer to the `RegistryData` which uniquely identifies a registry. -/// This identifier can be reused if the registry gets freed. -#[derive(Clone, Copy, PartialEq)] -struct RegistryId(*const RegistryData); - -impl RegistryId { - #[inline(always)] - /// Verifies that the current thread is associated with the registry and returns its unique - /// index within the registry. This panics if the current thread is not associated with this - /// registry. - /// - /// Note that there's a race possible where the identifier in `THREAD_DATA` could be reused - /// so this can succeed from a different registry. - fn verify(self) -> usize { - let (id, index) = THREAD_DATA.with(|data| (data.registry_id.get(), data.index.get())); - - if id == self { index } else { outline(|| panic!("Unable to verify registry association")) } - } -} - -struct RegistryData { - thread_limit: NonZero, - threads: Mutex, -} - -/// Represents a list of threads which can access worker locals. -#[derive(Clone)] -pub struct Registry(Arc); - -thread_local! { - /// The registry associated with the thread. - /// This allows the `WorkerLocal` type to clone the registry in its constructor. 
- static REGISTRY: OnceCell = const { OnceCell::new() }; -} - -struct ThreadData { - registry_id: Cell, - index: Cell, -} - -thread_local! { - /// A thread local which contains the identifier of `REGISTRY` but allows for faster access. - /// It also holds the index of the current thread. - static THREAD_DATA: ThreadData = const { ThreadData { - registry_id: Cell::new(RegistryId(ptr::null())), - index: Cell::new(0), - }}; -} - -impl Registry { - /// Creates a registry which can hold up to `thread_limit` threads. - pub fn new(thread_limit: NonZero) -> Self { - Registry(Arc::new(RegistryData { thread_limit, threads: Mutex::new(0) })) - } - - /// Gets the registry associated with the current thread. Panics if there's no such registry. - pub fn current() -> Self { - REGISTRY.with(|registry| registry.get().cloned().expect("No associated registry")) - } - - /// Registers the current thread with the registry so worker locals can be used on it. - /// Panics if the thread limit is hit or if the thread already has an associated registry. - pub fn register(&self) { - let mut threads = self.0.threads.lock(); - if *threads < self.0.thread_limit.get() { - REGISTRY.with(|registry| { - if registry.get().is_some() { - drop(threads); - panic!("Thread already has a registry"); - } - registry.set(self.clone()).ok(); - THREAD_DATA.with(|data| { - data.registry_id.set(self.id()); - data.index.set(*threads); - }); - *threads += 1; - }); - } else { - drop(threads); - panic!("Thread limit reached"); - } - } - - /// Gets the identifier of this registry. - fn id(&self) -> RegistryId { - RegistryId(&*self.0) - } -} - -/// Holds worker local values for each possible thread in a registry. You can only access the -/// worker local value through the `Deref` impl on the registry associated with the thread it was -/// created on. It will panic otherwise. 
-pub struct WorkerLocal { - locals: Box<[CacheAligned]>, - registry: Registry, -} - -// This is safe because the `deref` call will return a reference to a `T` unique to each thread -// or it will panic for threads without an associated local. So there isn't a need for `T` to do -// it's own synchronization. The `verify` method on `RegistryId` has an issue where the id -// can be reused, but `WorkerLocal` has a reference to `Registry` which will prevent any reuse. -unsafe impl Sync for WorkerLocal {} - -impl WorkerLocal { - /// Creates a new worker local where the `initial` closure computes the - /// value this worker local should take for each thread in the registry. - #[inline] - pub fn new T>(mut initial: F) -> WorkerLocal { - let registry = Registry::current(); - WorkerLocal { - locals: (0..registry.0.thread_limit.get()).map(|i| CacheAligned(initial(i))).collect(), - registry, - } - } - - /// Returns the worker-local values for each thread - #[inline] - pub fn into_inner(self) -> impl Iterator { - self.locals.into_vec().into_iter().map(|local| local.0) - } -} - -impl Deref for WorkerLocal { - type Target = T; - - #[inline(always)] - fn deref(&self) -> &T { - // This is safe because `verify` will only return values less than - // `self.registry.thread_limit` which is the size of the `self.locals` array. - unsafe { &self.locals.get_unchecked(self.registry.id().verify()).0 } - } -} - -impl Default for WorkerLocal { - fn default() -> Self { - WorkerLocal::new(|_| T::default()) - } -} diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index 24b23cc4199e9..7c462c5cc43f3 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -137,36 +137,36 @@ fn run_in_thread_with_globals) -> R + Send, R: extra_symbols: &[&'static str], f: F, ) -> R { - // The "thread pool" is a single spawned thread in the non-parallel - // compiler. 
We run on a spawned thread instead of the main thread (a) to + // For `WorkerLocal` to function properly we need to create a thread pool. + // Also we run on a spawned thread instead of the main thread (a) to // provide control over the stack size, and (b) to increase similarity with // the parallel compiler, in particular to ensure there is no accidental // sharing of data between the main thread and the compilation thread // (which might cause problems for the parallel compiler). - let builder = thread::Builder::new().name("rustc".to_string()).stack_size(thread_stack_size); + let builder = rustc_thread_pool::ThreadPoolBuilder::new() + .thread_name(|_| "rustc".to_string()) + .num_threads(1) + .stack_size(thread_stack_size); // We build the session globals and run `f` on the spawned thread, because // `SessionGlobals` does not impl `Send` in the non-parallel compiler. - thread::scope(|s| { - // `unwrap` is ok here because `spawn_scoped` only panics if the thread - // name contains null bytes. - let r = builder - .spawn_scoped(s, move || { - rustc_span::create_session_globals_then( - edition, - extra_symbols, - Some(sm_inputs), - || f(CurrentGcx::new(), Proxy::new()), - ) - }) - .unwrap() - .join(); - - match r { - Ok(v) => v, - Err(e) => std::panic::resume_unwind(e), - } - }) + // `unwrap` is ok here because `build_scoped`, just like `std::thread`, + // only panics if the thread name contains null bytes.
+ builder + .build_scoped( + |thread| thread.run(), + move |pool| { + pool.install(|| { + rustc_span::create_session_globals_then( + edition, + extra_symbols, + Some(sm_inputs), + || f(CurrentGcx::new(), Proxy::new()), + ) + }) + }, + ) + .unwrap() } pub(crate) fn run_in_thread_pool_with_globals< @@ -188,21 +188,8 @@ pub(crate) fn run_in_thread_pool_with_globals< let thread_stack_size = init_stack_size(thread_builder_diag); - let registry = sync::Registry::new(std::num::NonZero::new(threads).unwrap()); - let Some(proof) = sync::check_dyn_thread_safe() else { - return run_in_thread_with_globals( - thread_stack_size, - edition, - sm_inputs, - extra_symbols, - |current_gcx, jobserver_proxy| { - // Register the thread for use with the `WorkerLocal` type. - registry.register(); - - f(current_gcx, jobserver_proxy) - }, - ); + return run_in_thread_with_globals(thread_stack_size, edition, sm_inputs, extra_symbols, f); }; let current_gcx = proof.derive(CurrentGcx::new()); @@ -282,9 +269,6 @@ internal compiler error: query cycle handler thread panicked, aborting process"; .build_scoped( // Initialize each new worker thread when created. move |thread: rustc_thread_pool::ThreadBuilder| { - // Register the thread for use with the `WorkerLocal` type. 
- registry.register(); - rustc_span::set_session_globals_then(session_globals.into_inner(), || { thread.run() }) diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs index 9510c1842f86a..39bdf8c07457e 100644 --- a/compiler/rustc_thread_pool/src/registry.rs +++ b/compiler/rustc_thread_pool/src/registry.rs @@ -126,7 +126,7 @@ where } pub struct Registry { - thread_infos: Vec, + thread_infos: Box<[ThreadInfo]>, sleep: Sleep, injected_jobs: Injector, broadcasts: Mutex>>, @@ -989,7 +989,7 @@ impl WorkerThread { debug_assert!(self.local_deque_is_empty()); // otherwise, try to steal - let thread_infos = &self.registry.thread_infos.as_slice(); + let thread_infos = &*self.registry.thread_infos; let num_threads = thread_infos.len(); if num_threads <= 1 { return None; diff --git a/compiler/rustc_thread_pool/src/worker_local.rs b/compiler/rustc_thread_pool/src/worker_local.rs index 912001233bfea..23ffde39b8a9a 100644 --- a/compiler/rustc_thread_pool/src/worker_local.rs +++ b/compiler/rustc_thread_pool/src/worker_local.rs @@ -12,7 +12,7 @@ struct CacheAligned(T); /// You can only access the worker local value through the Deref impl /// on the thread pool it was constructed on. 
It will panic otherwise pub struct WorkerLocal { - locals: Vec>, + locals: Box<[CacheAligned]>, registry: Arc, } @@ -35,21 +35,15 @@ impl WorkerLocal { /// Returns the worker-local value for each thread #[inline] - pub fn into_inner(self) -> Vec { - self.locals.into_iter().map(|c| c.0).collect() + pub fn into_inner(self) -> impl Iterator { + self.locals.into_vec().into_iter().map(|local| local.0) } +} - fn current(&self) -> &T { - unsafe { - let worker_thread = WorkerThread::current(); - if worker_thread.is_null() - || !std::ptr::eq(&*(*worker_thread).registry, &*self.registry) - { - panic!("WorkerLocal can only be used on the thread pool it was created on") - } - &self.locals[(*worker_thread).index].0 - } - } +#[inline(never)] +#[cold] +fn panic_different_registry() -> ! { + panic!("WorkerLocal can only be used on the thread pool it was created on") } impl WorkerLocal> { @@ -70,6 +64,22 @@ impl Deref for WorkerLocal { #[inline(always)] fn deref(&self) -> &T { - self.current() + unsafe { + let worker_thread = WorkerThread::current(); + if worker_thread.is_null() + || !std::ptr::eq(&*(*worker_thread).registry, &*self.registry) + { + panic_different_registry() + } + // SAFETY: the check above ensures this worker belongs to `self.registry`, so its + // `index` is below the registry's thread count, the length of `self.locals`. + &self.locals.get_unchecked((*worker_thread).index).0 + } + } +} + +impl Default for WorkerLocal { + fn default() -> Self { + WorkerLocal::new(|_| Default::default()) } }