diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7770a40..f41ef94 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,7 +43,7 @@ jobs: --target ${{ env.NO_STD_TARGET }} --no-dev-deps --feature-powerset - --skip yield,thread_local + --skip yield,thread_local,parking msrv: name: MSRV @@ -108,6 +108,8 @@ jobs: run: cargo run --example thread_local --features thread_local - name: Run lock_api example run: cargo run --example lock_api --features lock_api,barging + - name: Run parking with thread_local example + run: cargo run --example parking --features parking,thread_local linter: name: Linter diff --git a/Cargo.toml b/Cargo.toml index 54d1a4e..903c782 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,13 +15,18 @@ categories = ["algorithms", "concurrency", "no-std", "no-std::no-alloc"] keywords = ["mutex", "no_std", "spinlock", "synchronization"] [features] -# NOTE: Features `yield`, `thread_local` require std. +# NOTE: Features `yield`, `thread_local` and `parking` require std. yield = [] thread_local = [] barging = [] # NOTE: The `dep:` syntax requires Rust 1.60. 
+parking = ["dep:atomic-wait"] lock_api = ["dep:lock_api"] +[dependencies.atomic-wait] +version = "1" +optional = true + [dependencies.lock_api] version = "0.4" default-features = false @@ -42,6 +47,10 @@ check-cfg = ["cfg(loom)", "cfg(tarpaulin)", "cfg(tarpaulin_include)"] name = "barging" required-features = ["barging"] +[[example]] +name = "parking" +required-features = ["parking", "thread_local"] + [[example]] name = "thread_local" required-features = ["thread_local"] diff --git a/Makefile.toml b/Makefile.toml index 36b5797..831f103 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -23,7 +23,7 @@ args = [ "--feature-powerset", "--no-dev-deps", "--skip", - "yield,thread_local", + "yield,thread_local,parking", ] dependencies = ["install-no-std-target"] diff --git a/README.md b/README.md index 8d0046d..5a37aeb 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,8 @@ ![No_std][no_std-badge] MCS lock is a List-Based Queuing Lock that avoids network contention by having -threads spin on local memory locations. The main properties of this mechanism are: +threads spin and/or park on local memory locations. The main properties of this +mechanism are: - guarantees FIFO ordering of lock acquisitions; - spins on locally-accessible flag variables only; @@ -24,9 +25,10 @@ paper. And a simpler correctness proof of the MCS lock was proposed by ## Spinlock use cases It is noteworthy to mention that [spinlocks are usually not what you want]. The -majority of use cases are well covered by OS-based mutexes like [`std::sync::Mutex`] -and [`parking_lot::Mutex`]. These implementations will notify the system that the -waiting thread should be parked, freeing the processor to work on something else. +majority of use cases are well covered by OS-based mutexes like +[`std::sync::Mutex`] or [`parking_lot::Mutex`] or even this crate's [`parking`] +Mutexes. These implementations will notify the system that the waiting thread +should be parked, freeing the processor to work on something else. 
Spinlocks are only efficient in very few circumstances where the overhead of context switching or process rescheduling are greater than busy waiting @@ -160,6 +162,42 @@ fn main() { } ``` +## Locking with thread parking MCS locks + +This crate also supports MCS lock implementations that will put the blocking +threads to sleep. All `no_std` flavors: `raw`, `barging` have matching `Mutex` +types under the [`parking`] module, with corresponding paths and public APIs, +that are thread parking capable. These implementations are not `no_std` +compatible. See [`parking`] module for more information. + +```rust +use std::sync::Arc; +use std::thread; + +// Requires `parking` feature. +// Spins for a while then parks during contention. +use mcslock::parking::raw::{spins::Mutex, MutexNode}; + +// Requires `parking` and `thread_local` features. +mcslock::thread_local_parking_node! { static NODE } + +fn main() { + let mutex = Arc::new(Mutex::new(0)); + let c_mutex = Arc::clone(&mutex); + + thread::spawn(move || { + // Local node handles are provided by reference. + // Critical section must be defined as a closure. + c_mutex.lock_with_local_then(&NODE, |data| *data = 10); + }) + .join().expect("thread::spawn failed"); + + // A node may also be transparently allocated in the stack. + // Critical section must be defined as a closure. + assert_eq!(mutex.try_lock_then(|data| *data.unwrap()), 10); +} +``` + ## Features This crate dos not provide any default features. Features that can be enabled @@ -178,26 +216,37 @@ just simply busy-waits. This feature is not `no_std` compatible. ### thread_local -The `thread_local` feature enables [`raw::Mutex`] locking APIs that operate over -queue nodes that are stored at the thread local storage. These locking APIs -require a static reference to [`raw::LocalMutexNode`] keys. Keys must be generated -by the [`thread_local_node!`] macro. This feature also enables memory optimizations -for [`barging::Mutex`] locking operations. 
This feature is not `no_std` +The `thread_local` feature enables [`raw::Mutex`] and [`parking::raw::Mutex`] +locking APIs that operate over queue nodes that are stored at the thread local +storage. These locking APIs require a static reference to [`raw::LocalMutexNode`] +and [`parking::raw::LocalMutexNode`] keys respectively. Keys must be generated +by the [`thread_local_node!`] and [`thread_local_parking_node!`] macros. This +feature also enables memory optimizations for [`barging::Mutex`] and +[`parking::barging::Mutex`] locking operations. This feature is not `no_std` compatible. ### barging -The `barging` feature provides locking APIs that are compatible with the -[lock_api] crate. It does not require node allocations from the caller. -The [`barging`] module is suitable for `no_std` environments. This implementation -is not fair (does not guarantee FIFO), but can improve throughput when the lock -is heavily contended. +The `barging` feature provides locking APIs that are compatible with the [lock_api] +crate. It does not require node allocations from the caller. The [`barging`] module +is suitable for `no_std` environments, but [`parking::barging`] is not. This +implementation is not fair (does not guarantee FIFO), but can improve throughput +when the lock is heavily contended. ### lock_api This feature implements the [`RawMutex`] trait from the [lock_api] crate for -[`barging::Mutex`]. Aliases are provided by the [`barging::lock_api`] (`no_std`) -module. The `lock_api` public dependency is set to version 0.4. +both [`barging::Mutex`] and [`parking::barging::Mutex`]. Aliases are provided by +the [`barging::lock_api`] (`no_std`) and [`parking::barging::lock_api`] modules. +The [lock_api] public dependency is set to version 0.4. + +### parking + +The `parking` feature provides Mutex implementations that are capable of putting +blocking threads waiting for the lock to sleep. These implementations are +published under the [`parking`] module. 
Each `no_std` mutex flavors provided +by this crate have corresponding parking implementations under that module. +Users may select a out of the box parking policy at [`parking::park`]. ## Minimum Supported Rust Version (MSRV) @@ -260,10 +309,18 @@ each of your dependencies, including this one. [`raw::Mutex`]: https://docs.rs/mcslock/latest/mcslock/raw/struct.Mutex.html [`raw::MutexNode`]: https://docs.rs/mcslock/latest/mcslock/raw/struct.MutexNode.html [`raw::LocalMutexNode`]: https://docs.rs/mcslock/latest/mcslock/raw/struct.LocalMutexNode.html +[`parking`]: https://docs.rs/mcslock/latest/mcslock/parking/index.html +[`parking::park`]: https://docs.rs/mcslock/latest/mcslock/parking/park/index.html +[`parking::barging`]: https://docs.rs/mcslock/latest/mcslock/parking/barging/index.html +[`parking::lock_api`]: https://docs.rs/mcslock/latest/mcslock/parking/lock_api/index.html +[`parking::raw::Mutex`]: https://docs.rs/mcslock/latest/mcslock/parking/raw/struct.Mutex.html +[`parking::raw::LocalMutexNode`]: https://docs.rs/mcslock/latest/mcslock/parking/raw/struct.LocalMutexNode.html +[`parking::barging::Mutex`]: https://docs.rs/mcslock/latest/mcslock/parking/barging/struct.Mutex.html [`barging`]: https://docs.rs/mcslock/latest/mcslock/barging/index.html [`barging::lock_api`]: https://docs.rs/mcslock/latest/mcslock/barging/lock_api/index.html [`barging::Mutex`]: https://docs.rs/mcslock/latest/mcslock/barging/struct.Mutex.html [`thread_local_node!`]: https://docs.rs/mcslock/latest/mcslock/macro.thread_local_node.html +[`thread_local_parking_node!`]: https://docs.rs/mcslock/latest/mcslock/macro.thread_local_parking_node.html [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html [`std::thread::yield_now`]: https://doc.rust-lang.org/std/thread/fn.yield_now.html diff --git a/examples/parking.rs b/examples/parking.rs new file mode 100644 index 0000000..177ce19 --- /dev/null +++ b/examples/parking.rs @@ -0,0 +1,56 @@ +use std::sync::mpsc::channel; +use 
std::sync::Arc; +use std::thread; + +// Requires `parking` feature. +// `spins::Mutex` spins for a while then parks during contention. +use mcslock::parking::raw::{spins::Mutex, MutexNode}; + +// Requires that the `thread_local` feature is enabled. +mcslock::thread_local_parking_node! { + // * Allows multiple static declarations, must be separated with semicolons. + // * Visibility is optional (private by default). + // * Requires `static` keyword and a UPPER_SNAKE_CASE name. + pub static NODE; + static UNUSED_NODE; +} + +fn main() { + const N: usize = 10; + + // Spawn a few threads to increment a shared variable (non-atomically), and + // let the main thread know once all increments are done. + // + // Here we're using an Arc to share memory among threads, and the data inside + // the Arc is protected with a mutex. + let data = Arc::new(Mutex::new(0)); + + let (tx, rx) = channel(); + for _ in 0..N { + let (data, tx) = (data.clone(), tx.clone()); + thread::spawn(move || { + // A queue node must be mutably accessible. + let mut node = MutexNode::new(); + // The shared state can only be accessed once the lock is held. + // Our non-atomic increment is safe because we're the only thread + // which can access the shared state when the lock is held. + // + // We unwrap() the return value to assert that we are not expecting + // threads to ever fail while holding the lock. + data.lock_with_then(&mut node, |data| { + *data += 1; + if *data == N { + tx.send(()).unwrap(); + } + // The lock is unlocked here at the end of the closure scope. + }); + // The node can now be reused for other locking operations. + let _ = data.lock_with_then(&mut node, |data| *data); + }); + } + let _message = rx.recv(); + + // A thread local node is borrowed. 
+ let count = data.lock_with_local_then(&NODE, |data| *data); + assert_eq!(count, N); +} diff --git a/examples/thread_local.rs b/examples/thread_local.rs index f5e5fe0..3773821 100644 --- a/examples/thread_local.rs +++ b/examples/thread_local.rs @@ -35,6 +35,7 @@ fn main() { // threads to ever fail while holding the lock. // // Data is exclusively accessed by the guard argument. + // A thread local node is borrowed. data.lock_with_local_then(&NODE, |data| { *data += 1; if *data == N { @@ -46,6 +47,7 @@ fn main() { } let _message = rx.recv(); + // A thread local node is borrowed. let count = data.lock_with_local_then(&NODE, |data| *data); assert_eq!(count, N); } diff --git a/src/inner/raw/mod.rs b/src/inner/raw/mod.rs index 905cb15..e09ddf5 100644 --- a/src/inner/raw/mod.rs +++ b/src/inner/raw/mod.rs @@ -353,7 +353,7 @@ impl core::ops::Deref for MutexGuard<'_, T, L, W> { } #[cfg(not(all(loom, test)))] -#[cfg(not(tarpaulin_include))] +#[cfg(not(tarpaulin))] impl core::ops::DerefMut for MutexGuard<'_, T, L, W> { /// Mutably dereferences the guard to access the underlying data. #[inline(always)] diff --git a/src/lib.rs b/src/lib.rs index d0ecaad..6077f16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,8 @@ //! contention-free [lock] for mutual exclusion, referred to as MCS lock. //! //! MCS lock is a List-Based Queuing Lock that avoids network contention by -//! having threads spin on local memory locations. The main properties of this -//! mechanism are: +//! having threads spin and/or park on local memory locations. The main +//! properties of this mechanism are: //! //! - guarantees FIFO ordering of lock acquisitions; //! - spins on locally-accessible flag variables only; @@ -19,9 +19,9 @@ //! //! It is noteworthy to mention that [spinlocks are usually not what you want]. //! The majority of use cases are well covered by OS-based mutexes like -//! [`std::sync::Mutex`], [`parking_lot::Mutex`]. These implementations will -//! 
notify the system that the waiting thread should be parked, freeing the -//! processor to work on something else. +//! [`std::sync::Mutex`], [`parking_lot::Mutex`] or even this crate's [`parking`] +//! Mutexes. These implementations will notify the system that the waiting thread +//! should be parked, freeing the processor to work on something else. //! //! Spinlocks are only efficient in very few circunstances where the overhead //! of context switching or process rescheduling are greater than busy waiting @@ -129,6 +129,45 @@ //! } //! ``` //! +//! ## Locking with thread parking MCS locks +//! +//! This crate also supports MCS lock implementations that will put the blocking +//! threads to sleep. All `no_std` flavors: `raw`, `barging` have matching `Mutex` +//! types under the [`parking`] module, with corresponding paths and public APIs, +//! that are thread parking capable. These implementations are not `no_std` +//! compatible. See [`parking`] module for more information. +//! +//! ``` +//! # #[cfg(all(feature = "thread_local", feature = "parking"))] +//! # { +//! use std::sync::Arc; +//! use std::thread; +//! +//! // Requires `parking` feature. +//! // Spins for a while then parks during contention. +//! use mcslock::parking::raw::{spins::Mutex, MutexNode}; +//! +//! // Requires `parking` and `thread_local` features. +//! mcslock::thread_local_parking_node! { static NODE } +//! +//! let mutex = Arc::new(Mutex::new(0)); +//! let c_mutex = Arc::clone(&mutex); +//! +//! thread::spawn(move || { +//! // Local node handles are provided by reference. +//! // Critical section must be defined as a closure. +//! c_mutex.lock_with_local_then(&NODE, |data| *data = 10); +//! }) +//! .join().expect("thread::spawn failed"); +//! +//! // A node may also be transparently allocated in the stack. +//! // Critical section must be defined as a closure. +//! assert_eq!(mutex.try_lock_then(|data| *data.unwrap()), 10); +//! # } +//! 
# #[cfg(not(all(feature = "thread_local", feature = "parking")))] +//! # fn main() {} +//! ``` +//! //! ## Features //! //! This crate dos not provide any default features. Features that can be enabled @@ -147,12 +186,14 @@ //! //! ### thread_local //! -//! The `thread_local` feature enables [`raw::Mutex`] locking APIs that operate -//! over queue nodes that are stored at the thread local storage. These locking -//! APIs require a static reference to [`raw::LocalMutexNode`] keys. Keys must be -//! generated by the [`thread_local_node!`] macro. This feature also enables memory -//! optimizations for [`barging::Mutex`] locking operations. This feature is not -//! `no_std` compatible. +//! The `thread_local` feature enables [`raw::Mutex`] and [`parking::raw::Mutex`] +//! locking APIs that operate over queue nodes that are stored at the thread +//! local storage. These locking APIs require a static reference to +//! [`raw::LocalMutexNode`] and [`parking::raw::LocalMutexNode`] keys respectively. +//! Keys must be generated by the [`thread_local_node!`] and +//! [`thread_local_parking_node`] macros. This feature also enables memory +//! optimizations for [`barging::Mutex`] and [`parking::barging::Mutex`] locking +//! operations. This feature is not `no_std` compatible. //! //! This feature is not `no_std` compatible. //! @@ -160,15 +201,17 @@ //! //! The `barging` feature provides locking APIs that are compatible with the //! [lock_api] crate. It does not require node allocations from the caller. The -//! [`barging`] module is suitable for `no_std` environments. This implementation -//! is not fair (does not guarantee FIFO), but can improve throughput when the lock -//! is heavily contended. +//! [`barging`] module is suitable for `no_std` environments, but +//! [`parking::barging`] is not. These implementations are not fair (they do +//! not guarantee FIFO), but can improve throughput when the lock is heavily +//! contended. //! //! ### lock_api //! -//! 
This feature implements the [`RawMutex`] trait from the [lock_api] -//! crate for [`barging::Mutex`]. Aliases are provided by the [`barging::lock_api`] -//! (`no_std`) module. The `lock_api` public dependency is set to version 0.4. +//! This feature implements the [`RawMutex`] trait from the [lock_api] crate for +//! both [`barging::Mutex`] and [`parking::barging::Mutex`]. Aliases are provided by +//! the [`barging::lock_api`] (`no_std`) and [`parking::barging::lock_api`] modules. +//! The [lock_api] public dependency is set to version 0.4. //! //! ## Minimum Supported Rust Version (MSRV) //! @@ -180,6 +223,14 @@ //! breaking changes when this feature is enabled. Check [lock_api]'s documentation //! for their latest MSRV under version 0.4. //! +//! ### parking +//! +//! The `parking` feature provides Mutex implementations that are capable of +//! putting blocking threads waiting for the lock to sleep. These implementations +//! are published under the [`parking`] module. Each `no_std` mutex flavors +//! provided by this crate have corresponding parking implementations under that +//! module. Users may select a out of the box parking policy at [`parking::park`]. +//! //! ## Related projects //! //! These projects provide MCS lock implementations with different APIs, @@ -199,7 +250,6 @@ //! [lock_api]: https://docs.rs/lock_api/latest/lock_api //! [`RawMutex`]: https://docs.rs/lock_api/latest/lock_api/trait.RawMutex.html //! [`RawMutexFair`]: https://docs.rs/lock_api/latest/lock_api/trait.RawMutexFair.html -//! 
[`parking_lot::Mutex`]: https://docs.rs/parking_lot/latest/parking_lot/type.Mutex.html #![no_std] #![allow(clippy::doc_markdown)] @@ -210,7 +260,7 @@ #![warn(clippy::undocumented_unsafe_blocks)] #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(any(feature = "yield", feature = "thread_local", loom, test))] +#[cfg(any(feature = "yield", feature = "thread_local", feature = "parking", loom, test))] extern crate std; #[macro_use] @@ -228,6 +278,10 @@ pub(crate) mod lock; #[cfg_attr(docsrs, doc(cfg(feature = "barging")))] pub mod barging; +#[cfg(feature = "parking")] +#[cfg_attr(docsrs, doc(cfg(feature = "parking")))] +pub mod parking; + #[cfg(test)] pub(crate) mod test; diff --git a/src/lock.rs b/src/lock.rs index 5007753..530d8ad 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -3,8 +3,17 @@ use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use crate::cfg::atomic::AtomicBool; use crate::relax::Relax; +#[cfg(feature = "parking")] +use crate::parking::park::Park; + /// A `Lock` is some arbitrary data type used by a lock implementation to /// manage the state of the lock. +/// +/// The `no_std` implementations (eg `raw` and `barging`) can simply use a +/// `AtomicBool` to manage state. The parking variants thought, need platform +/// specific data types. We are currently using the `atomic_wait` crate for +/// easy parking integration. It uses `AtomicU32` as the data type for all +/// major platforms. pub trait Lock { /// Creates a new locked `Lock` instance. /// @@ -71,11 +80,27 @@ pub trait Wait { /// The relax operation excuted inside `unlock` busy-wait loops. type UnlockRelax: Relax; + /// The thread parking policy that will be executed during lock contention. + /// + /// Enabled only for thread parking capable policies. + #[cfg(feature = "parking")] + type Park: Park; + /// Returns a initialzed relax waiting policy. 
fn relax_policy() -> RelaxPolicy { let relax = Self::LockRelax::new(); RelaxPolicy { relax } } + + /// Returns a initialized thread parking waiting policy. + /// + /// Enabled only for thread parking capable policies. + #[cfg(feature = "parking")] + fn parking_policy() -> ParkingPolicy { + let relax = Self::LockRelax::new(); + let park = Self::Park::new(); + ParkingPolicy { relax, park } + } } /// A waiting policy that is only composed of a relax policy. @@ -83,6 +108,13 @@ pub struct RelaxPolicy { pub relax: W::LockRelax, } +/// A waiting policy that is composed of both relax and thread parking policies. +#[cfg(feature = "parking")] +pub struct ParkingPolicy { + pub relax: W::LockRelax, + pub park: W::Park, +} + impl Lock for AtomicBool { #[cfg(not(all(loom, test)))] #[allow(clippy::declare_interior_mutable_const)] diff --git a/src/parking/barging/lock_api/mod.rs b/src/parking/barging/lock_api/mod.rs new file mode 100644 index 0000000..d2e323d --- /dev/null +++ b/src/parking/barging/lock_api/mod.rs @@ -0,0 +1,218 @@ +//! Unfair MCS lock aliases for [`lock_api::Mutex`] with thread parking support. +//! +//! This module exports [`lock_api::Mutex`] and [`lock_api::MutexGuard`] type +//! aliases with a `barging` MCS lock and guard as their inner types. The +//! [`parking::barging::Mutex`] type will implement the [`lock_api::RawMutex`] +//! trait when this feature is enabled. The `barging` MCS lock is a unfair lock. +//! +//! This module provides an implementation that is **not** `no_std` compatible. +//! +//! The lock is hold for as long as its associated RAII guard is in scope. Once +//! the guard is dropped, the mutex is freed. Mutex guards are returned by +//! [`lock`] and [`try_lock`]. +//! +//! This Mutex is generic over the two layers of parking policies. User may +//! choose a policy as long as it implements the [`Park`] trait. The shared +//! lock parking policy is associated with the `Ps` generic paramater. The +//! 
handoff parking policy is then associated with the `Pq` generic parameter. +//! Backoff parking policies are usually prefered for shared lock contention, +//! while non-backoff parking policies are usually prefered for handoffs. +//! +//! There is a number of parking policies provided by the [`park`] module. The +//! following modules provide type aliases for [`lock_api::Mutex`] and +//! [`lock_api::MutexGuard`] associated with a parking policy. See their +//! documentation for more information. +//! +//! [`park`]: crate::parking::park +//! [`Park`]: crate::parking::park::Park +//! [`parking::barging::Mutex`]: crate::parking::barging::Mutex +//! +//! [lock_api]: https://crates.io/crates/lock_api +//! [`lock_api::Mutex`]: https://docs.rs/lock_api/latest/lock_api/struct.Mutex.html +//! [`lock_api::MutexGuard`]: https://docs.rs/lock_api/latest/lock_api/struct.MutexGuard.html +//! [`lock_api::RawMutex`]: https://docs.rs/lock_api/latest/lock_api/trait.RawMutex.html +//! [`lock`]: https://docs.rs/lock_api/latest/lock_api/struct.Mutex.html#method.lock +//! [`try_lock`]: https://docs.rs/lock_api/latest/lock_api/struct.Mutex.html#method.try_lock + +mod mutex; +pub use mutex::{Mutex, MutexGuard}; + +/// An unfair MCS lock that implements a `spin then park` parking policy. +/// +/// During lock contention, this lock spins while signaling the processor that +/// it is running a busy-wait spin-loop. +pub mod spins { + use super::mutex; + use crate::parking::park::SpinThenPark; + + /// A [`lock_api::Mutex`] that implements the [`SpinThenPark`] parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::lock_api::spins::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`lock_api::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`lock_api::MutexGuard`] that implements the [`SpinThenPark`] parking + /// policy. 
+ /// + /// [`lock_api::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, SpinThenPark, SpinThenPark>; + + /// An unfair MCS lock that implements a `spin with backoff then park` parking + /// policy. + /// + /// During lock contention, this lock will perform exponential backoff + /// while spinning, signaling the processor that it is running a busy-wait + /// spin-loop. + pub mod backoff { + use super::mutex; + use crate::parking::park::{SpinBackoffThenPark, SpinThenPark}; + + /// A [`lock_api::Mutex`] that implements the [`SpinBackoffThenPark`] + /// parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::lock_api::spins::backoff::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`lock_api::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`lock_api::MutexGuard`] that implements the [`SpinBackoffThenPark`] + /// parking policy. + /// + /// [`lock_api::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, SpinBackoffThenPark, SpinThenPark>; + } +} + +/// An unfair MCS lock that implements a `yield then park` parking policy. +/// +/// During lock contention, this lock will yield the current time slice to the +/// OS scheduler. +#[cfg(any(feature = "yield", loom, test))] +#[cfg_attr(docsrs, doc(cfg(feature = "yield")))] +pub mod yields { + use super::mutex; + use crate::parking::park::YieldThenPark; + + /// A [`lock_api::Mutex`] that implements the [`YieldThenPark`] parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::lock_api::yields::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`lock_api::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`lock_api::MutexGuard`] that implements the [`YieldThenPark`] parking + /// policy. 
+ /// + /// [`lock_api::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, YieldThenPark, YieldThenPark>; + + /// An unfair MCS lock that implements a `yield with backoff then park` + /// parking policy. + /// + /// During lock contention, this lock will perform exponential backoff while + /// spinning, up to a threshold, then yields back to the OS scheduler. + pub mod backoff { + use super::mutex; + use crate::parking::park::{YieldBackoffThenPark, YieldThenPark}; + + /// A [`lock_api::Mutex`] that implements the [`YieldBackoffThenPark`] + /// parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::lock_api::yields::backoff::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`lock_api::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`lock_api::MutexGuard`] that implements the [`YieldBackoffThenPark`] + /// parking policy. + /// + /// [`lock_api::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, YieldBackoffThenPark, YieldThenPark>; + } +} + +/// An unfair MCS lock that implements a `loop then park` parking policy. +/// +/// During lock contention, this lock will rapidly spin without telling the CPU +/// to do any power down. +pub mod loops { + use super::mutex; + use crate::parking::park::LoopThenPark; + + /// A [`lock_api::Mutex`] that implements the [`LoopThenPark`] parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::lock_api::loops::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`lock_api::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`lock_api::MutexGuard`] that implements the [`LoopThenPark`] parking + /// policy. 
+ /// + /// [`lock_api::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, LoopThenPark, LoopThenPark>; +} + +/// An unfair MCS lock that implements an `immediate park` parking policy. +/// +/// During lock contention, this lock will immediately put the thread to sleep. +pub mod immediate { + use super::mutex; + use crate::parking::park::ImmediatePark; + + /// A [`lock_api::Mutex`] that implements the [`ImmediatePark`] parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::lock_api::immediate::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`lock_api::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`lock_api::MutexGuard`] that implements the [`ImmediatePark`] parking + /// policy. + /// + /// [`lock_api::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, ImmediatePark, ImmediatePark>; +} diff --git a/src/parking/barging/lock_api/mutex.rs b/src/parking/barging/lock_api/mutex.rs new file mode 100644 index 0000000..6db2365 --- /dev/null +++ b/src/parking/barging/lock_api/mutex.rs @@ -0,0 +1,169 @@ +use crate::parking::barging; + +#[cfg(test)] +use crate::parking::park::Park; +#[cfg(test)] +use crate::test::{LockData, LockNew, LockThen, LockWithThen, TryLockThen, TryLockWithThen}; + +/// A [`lock_api::Mutex`] alias that wraps a [`parking::barging::Mutex`]. +/// +/// [`parking::barging::Mutex`]: crate::parking::barging::Mutex +/// [`lock_api::Mutex`]: https://docs.rs/lock_api/latest/lock_api/struct.Mutex.html +pub type Mutex = lock_api::Mutex, T>; + +/// A [`lock_api::MutexGuard`] alias that wraps a [`parking::barging::MutexGuard`]. 
+/// +/// [`parking::barging::MutexGuard`]: crate::parking::barging::MutexGuard +/// [`lock_api::MutexGuard`]: https://docs.rs/lock_api/latest/lock_api/struct.MutexGuard.html +pub type MutexGuard<'a, T, Ps, Pq> = lock_api::MutexGuard<'a, barging::Mutex<(), Ps, Pq>, T>; + +#[cfg(test)] +impl LockNew for Mutex { + type Target = T; + + fn new(value: Self::Target) -> Self + where + Self::Target: Sized, + { + Self::new(value) + } +} + +#[cfg(test)] +impl LockWithThen for Mutex { + // The barging mutex does not require queue node access. + type Node = (); + + type Guard<'a> + = MutexGuard<'a, T, Ps, Pq> + where + Self: 'a, + Self::Target: 'a; + + fn lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(MutexGuard<'_, T, Ps, Pq>) -> Ret, + { + f(self.lock()) + } +} + +#[cfg(test)] +impl TryLockWithThen for Mutex { + fn try_lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(Option>) -> Ret, + { + f(self.try_lock()) + } + + fn is_locked(&self) -> bool { + self.is_locked() + } +} + +#[cfg(test)] +impl LockData for Mutex { + fn into_inner(self) -> Self::Target + where + Self::Target: Sized, + { + self.into_inner() + } + + fn get_mut(&mut self) -> &mut Self::Target { + self.get_mut() + } +} + +#[cfg(test)] +impl LockThen for Mutex {} + +#[cfg(test)] +impl TryLockThen for Mutex {} + +#[cfg(test)] +mod test { + use crate::parking::barging::lock_api::immediate::Mutex; + use crate::test::tests; + + #[test] + fn lots_and_lots_lock() { + tests::lots_and_lots_lock::>(); + } + + #[test] + fn lots_and_lots_try_lock() { + tests::lots_and_lots_try_lock::>(); + } + + #[test] + fn lots_and_lots_mixed_lock() { + tests::lots_and_lots_mixed_lock::>(); + } + + #[test] + fn smoke() { + tests::smoke::>(); + } + + #[test] + fn test_guard_debug_display() { + tests::test_guard_debug_display::>(); + } + + #[test] + fn test_mutex_debug() { + tests::test_mutex_debug::>(); + } + + #[test] + fn test_mutex_from() { + tests::test_mutex_from::>(); + } + + 
#[test] + fn test_mutex_default() { + tests::test_mutex_default::>(); + } + + #[test] + fn test_try_lock() { + tests::test_try_lock::>(); + } + + #[test] + fn test_into_inner() { + tests::test_into_inner::>(); + } + + #[test] + fn test_into_inner_drop() { + tests::test_into_inner_drop::>(); + } + + #[test] + fn test_get_mut() { + tests::test_get_mut::>(); + } + + #[test] + fn test_lock_arc_nested() { + tests::test_lock_arc_nested::, Mutex<_>>(); + } + + #[test] + fn test_acquire_more_than_one_lock() { + tests::test_acquire_more_than_one_lock::>(); + } + + #[test] + fn test_lock_arc_access_in_unwind() { + tests::test_lock_arc_access_in_unwind::>(); + } + + #[test] + fn test_lock_unsized() { + tests::test_lock_unsized::>(); + } +} diff --git a/src/parking/barging/mod.rs b/src/parking/barging/mod.rs new file mode 100644 index 0000000..9b38fb3 --- /dev/null +++ b/src/parking/barging/mod.rs @@ -0,0 +1,236 @@ +//! Unfair MCS lock implementation with thread parking support. +//! +//! This implementation will have non-waiting threads race for the lock against +//! the front of the waiting queue thread. If the front of the queue thread +//! looses the race, it will simply go back to sleep, while holding its position +//! in the queue. By allowing barging instead of forcing FIFO, a higher +//! throughput can be achieved when the lock is heavily contended. +//! +//! This module provides an implementation that ** is not** `no_std` compatible, +//! it does not require queue nodes to be allocated by the callers, and so it +//! is compatible with the [lock_api] crate (see `lock_api` feature). +//! +//! The lock is hold for as long as its associated RAII guard is in scope. Once +//! the guard is dropped, the mutex is freed. Mutex guards are returned by +//! [`lock`] and [`try_lock`]. Guards are also accessible as the closure argument +//! for [`lock_then`] and [`try_lock_then`] methods. +//! +//! This Mutex is generic over the two layers of parking policies. User may +//! 
choose a policy as long as it implements the [`Park`] trait. The shared +//! lock parking policy is associated with the `Ps` generic paramater. The +//! handoff parking policy is then associated with the `Pq` generic parameter. +//! Backoff parking policies are usually prefered for shared lock contention, +//! while non-backoff parking policies are usually prefered for handoffs. +//! +//! There is a number of policies provided by the [`park`] module. The +//! following modules provide type aliases for [`Mutex`] and [`MutexGuard`] +//! associated a parking policy. See their documentation for more information. +//! +//! [`lock`]: Mutex::lock +//! [`try_lock`]: Mutex::try_lock +//! [`lock_then`]: Mutex::lock_then +//! [`try_lock_then`]: Mutex::try_lock_then +//! [`park`]: crate::parking::park +//! [`Park`]: crate::parking::park::Park +//! +//! [lock_api]: https://crates.io/crates/lock_api + +mod mutex; +pub use mutex::{Mutex, MutexGuard}; + +#[cfg(all(feature = "lock_api", not(loom)))] +#[cfg_attr(docsrs, doc(cfg(feature = "lock_api")))] +pub mod lock_api; + +#[cfg(feature = "thread_local")] +mod thread_local; + +/// A unfair MCS lock that implements a `spin then park` parking policy. +/// +/// During lock contention, and for a certain amount of attempts, this lock spins +/// while signaling the processor that it is running a busy-wait spin-loop. Once +/// all attempts have been tried, puts the thread to sleep. +pub mod spins { + use super::mutex; + use crate::parking::park::SpinThenPark; + + /// A [`parking::barging::Mutex`] that implements the [`SpinThenPark`] + /// parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::spins::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`parking::barging::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`parking::barging::MutexGuard`] that implements the [`SpinThenPark`] + /// parking policy. 
+ /// + /// [`parking::barging::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, SpinThenPark, SpinThenPark>; + + /// A unfair MCS lock that implements a `spin with backoff then park` + /// parking policy. + /// + /// During lock contention, and for a certain amount of attempts, this lock + /// will perform exponential backoff spinning, signaling the processor that + /// its is running a busy-wait spin-loop. Once all attempts have been tried, + /// puts the thread to sleep. + pub mod backoff { + use super::mutex; + use crate::parking::park::{SpinBackoffThenPark, SpinThenPark}; + + /// A [`parking::barging::Mutex`] that implements the + /// [`SpinBackoffThenPark`] parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::spins::backoff::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`parking::barging::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`parking::barging::MutexGuard`] that implements the + /// [`SpinBackoffThenPark`] parking policy. + /// + /// [`parking::barging::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, SpinBackoffThenPark, SpinThenPark>; + } +} + +/// A unfair MCS lock that implements a `yield then park` parking policy. +/// +/// During lock contention, and for a certain amount of attempts, this lock will +/// yield the current time slice to the OS scheduler. Once all attempts have +/// been tried, puts the thread to sleep. +#[cfg(any(feature = "yield", loom, test))] +#[cfg_attr(docsrs, doc(cfg(feature = "yield")))] +pub mod yields { + use super::mutex; + use crate::parking::park::YieldThenPark; + + /// A [`parking::barging::Mutex`] that implements the [`YieldThenPark`] + /// parking policy. 
+ /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::yields::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`parking::barging::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`parking::barging::MutexGuard`] that implements the [`YieldThenPark`] + /// parking policy. + /// + /// [`parking::barging::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, YieldThenPark, YieldThenPark>; + + /// A unfair MCS lock that implements a `yield with backoff then park` + /// parking policy. + /// + /// During lock contention, and for a certain amount of attempts, this lock + /// will perform exponential backoff while spinning, up to a threshold, then + /// yields back to the OS scheduler. Once all attempts have been tried, it + /// will then put the thread to sleep. + pub mod backoff { + use super::mutex; + use crate::parking::park::{YieldBackoffThenPark, YieldThenPark}; + + /// A [`parking::barging::Mutex`] that implements the + /// [`YieldBackoffThenPark`] parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::yields::backoff::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`parking::barging::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`parking::barging::MutexGuard`] that implements the + /// [`YieldBackoffThenPark`] parking policy. + /// + /// [`parking::barging::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, YieldBackoffThenPark, YieldThenPark>; + } +} + +/// A unfair MCS lock that implements a `loop then park` parking policy. +/// +/// During lock contention, and for a certain amount of attempts, this lock +/// will rapidly spin without telling the CPU to do any power down. Once all +/// attempts have been tried, it will then put the thread to sleep. 
+pub mod loops { + use super::mutex; + use crate::parking::park::LoopThenPark; + + /// A [`parking::barging::Mutex`] that implements the [`LoopThenPark`] + /// parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::loops::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`parking::barging::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`parking::barging::MutexGuard`] that implements the [`LoopThenPark`] + /// parking policy. + /// + /// [`parking::barging::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, LoopThenPark, LoopThenPark>; +} + +/// A unfair MCS lock that implements an `immediate park` parking policy. +/// +/// During lock contention, this lock will immediately put the thread to sleep. +pub mod immediate { + use super::mutex; + use crate::parking::park::ImmediatePark; + + /// A [`parking::barging::Mutex`] that implements the [`ImmediatePark`] + /// parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging::immediate::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// assert_eq!(*guard, 0); + /// ``` + /// [`parking::barging::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A [`parking::barging::MutexGuard`] that implements the [`ImmediatePark`] + /// parking policy. 
+ /// + /// [`parking::barging::MutexGuard`]: mutex::MutexGuard + pub type MutexGuard<'a, T> = mutex::MutexGuard<'a, T, ImmediatePark, ImmediatePark>; +} diff --git a/src/parking/barging/mutex.rs b/src/parking/barging/mutex.rs new file mode 100644 index 0000000..e4fc79b --- /dev/null +++ b/src/parking/barging/mutex.rs @@ -0,0 +1,757 @@ +use core::fmt::{self, Debug, Display, Formatter}; + +use crate::inner::barging as inner; +use crate::parking::park::{Park, ParkWait}; +use crate::parking::parker::Parker; + +#[cfg(test)] +use crate::test::{LockNew, LockThen, LockWithThen, TryLockThen, TryLockWithThen}; + +#[cfg(all(loom, test))] +use crate::loom::{Guard, GuardDeref, GuardDerefMut}; +#[cfg(all(loom, test))] +use crate::test::{AsDeref, AsDerefMut}; + +// The inner type of mutex, with a `futex` compatible atomic data. +type MutexInner = inner::Mutex, ParkWait>; + +/// A mutual exclusion primitive useful for protecting shared data. +/// +/// This mutex will block threads waiting for the lock to become available. The +/// mutex can also be statically initialized or created via a [`new`] +/// constructor. Each mutex has a type parameter which represents the data that +/// it is protecting. The data can only be accessed through the RAII guards +/// returned from [`lock`] and [`try_lock`], which guarantees that the data is only +/// ever accessed when the mutex is locked. +/// +/// If the `thread_local` feature is enabled (not `no_std` compatible), locking +/// operations that block ([`lock`] and [`lock_then`]) will access and modify +/// queue nodes stored at the thread local storage of the locking threads. Else, +/// these locking operations will allocate a queue node in the stack, for each +/// call (`no_std` compatible). 
+/// +/// # Examples +/// +/// ``` +/// use std::sync::Arc; +/// use std::thread; +/// use std::sync::mpsc::channel; +/// +/// use mcslock::parking::barging; +/// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; +/// +/// type Mutex = barging::Mutex; +/// +/// const N: usize = 10; +/// +/// // Spawn a few threads to increment a shared variable (non-atomically), and +/// // let the main thread know once all increments are done. +/// // +/// // Here we're using an Arc to share memory among threads, and the data inside +/// // the Arc is protected with a mutex. +/// let data = Arc::new(Mutex::new(0)); +/// +/// let (tx, rx) = channel(); +/// for _ in 0..N { +/// let (data, tx) = (data.clone(), tx.clone()); +/// thread::spawn(move || { +/// // The shared state can only be accessed once the lock is held. +/// // Our non-atomic increment is safe because we're the only thread +/// // which can access the shared state when the lock is held. +/// // +/// // We unwrap() the return value to assert that we are not expecting +/// // threads to ever fail while holding the lock. +/// let mut data = data.lock(); +/// *data += 1; +/// if *data == N { +/// tx.send(()).unwrap(); +/// } +/// // the lock is unlocked here when `data` goes out of scope. +/// }); +/// } +/// +/// rx.recv().unwrap(); +/// ``` +/// [`new`]: Mutex::new +/// [`lock`]: Mutex::lock +/// [`lock_then`]: Mutex::lock_then +/// [`try_lock`]: Mutex::try_lock +pub struct Mutex { + pub(super) inner: MutexInner, +} + +// SAFETY: `inner::Mutex` is `Send` if `T` is `Send`. +unsafe impl Send for Mutex {} +// SAFETY: `inner::Mutex` is `Sync` if `T` is `Send`. +unsafe impl Sync for Mutex {} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. 
+ /// + /// # Examples + /// + /// ``` + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// const MUTEX: Mutex = Mutex::new(0); + /// let mutex = Mutex::new(0); + /// ``` + #[cfg(not(all(loom, test)))] + #[inline] + pub const fn new(value: T) -> Self { + Self { inner: inner::Mutex::new(value) } + } + + /// Creates a new unlocked mutex with Loom primitives (non-const). + #[cfg(all(loom, test))] + #[cfg(not(tarpaulin))] + pub(super) fn new(value: T) -> Self { + Self { inner: inner::Mutex::new(value) } + } + + /// Consumes this mutex, returning the underlying data. + /// + /// # Examples + /// + /// ``` + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// let mutex = Mutex::new(0); + /// assert_eq!(mutex.into_inner(), 0); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl Mutex { + /// Acquires this mutex, blocking the current thread until it is able to do so. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon returning, the thread is the only thread with the lock + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + /// + /// This function will block if the lock is unavailable. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// *c_mutex.lock() = 10; + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(*mutex.lock(), 10); + /// ``` + #[inline] + pub fn lock(&self) -> MutexGuard<'_, T, Ps, Pq> { + self.lock_impl() + } + + /// Acquires this mutex and then runs the closure against its guard. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon acquiring the mutex, the user provided closure will be + /// executed against the mutex guard. Once the guard goes out of scope, it + /// will unlock the mutex. + /// + /// This function will block if the lock is unavailable. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// c_mutex.lock_then(|mut guard| *guard = 10); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(mutex.lock_then(|guard| *guard), 10); + /// ``` + /// + /// Compile fail: borrows of the guard or its data cannot escape the given + /// closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::barging::spins::Mutex; + /// + /// let mutex = Mutex::new(1); + /// let data = mutex.lock_then(|guard| &*guard); + /// ``` + #[inline] + pub fn lock_then(&self, f: F) -> Ret + where + F: FnOnce(MutexGuard<'_, T, Ps, Pq>) -> Ret, + { + f(self.lock()) + } +} + +impl Mutex { + /// The non 
`thread_local` enabled `lock` implementation. + /// + /// Implemented by `lock_with_stack_queue_node`. + #[cfg(not(feature = "thread_local"))] + #[cfg(not(tarpaulin_include))] + fn lock_impl(&self) -> MutexGuard<'_, T, Ps, Pq> { + self.lock_with_stack_queue_node() + } + + /// Underlying implementation of `lock` that is only enabled when the + /// `thread_local` feature is disabled. + /// + /// This implementation will allocate, access and modify a queue node for + /// each call, storing it at the current stack frame. + #[cfg(any(test, not(feature = "thread_local")))] + fn lock_with_stack_queue_node(&self) -> MutexGuard<'_, T, Ps, Pq> { + self.inner.lock_with_stack_queue_node().into() + } +} + +impl Mutex { + /// Attempts to acquire this mutex without blocking the thread. + /// + /// If the lock could not be acquired at this time, then [`None`] is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// let mut guard = c_mutex.try_lock(); + /// if let Some(mut guard) = guard { + /// *guard = 10; + /// } else { + /// println!("try_lock failed"); + /// } + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(*mutex.lock(), 10); + /// ``` + #[inline] + pub fn try_lock(&self) -> Option> { + self.inner.try_lock().map(From::from) + } + + /// Attempts to acquire this mutex and then runs a closure against its guard. + /// + /// If the lock could not be acquired at this time, then a [`None`] value is + /// given back as the closure argument. 
If the lock has been acquired, then + /// a [`Some`] value with the mutex guard is given instead. The lock will be + /// unlocked when the guard is dropped. + /// + /// This function does not block. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// c_mutex.try_lock_then(|guard| { + /// if let Some(mut guard) = guard { + /// *guard = 10; + /// } else { + /// println!("try_lock_then failed"); + /// } + /// }); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(mutex.lock_then(|guard| *guard), 10); + /// ``` + /// + /// Compile fail: borrows of the guard or its data cannot escape the given + /// closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::barging::spins::Mutex; + /// + /// let mutex = Mutex::new(1); + /// let data = mutex.try_lock_then(|guard| &*guard.unwrap()); + /// ``` + #[inline] + pub fn try_lock_then(&self, f: F) -> Ret + where + F: FnOnce(Option>) -> Ret, + { + f(self.try_lock()) + } + + /// Returns `true` if the lock is currently held. + /// + /// This method does not provide any synchronization guarantees, so its only + /// useful as a heuristic, and so must be considered not up to date. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// let mutex = Mutex::new(0); + /// let guard = mutex.lock(); + /// drop(guard); + /// + /// assert_eq!(mutex.is_locked(), false); + /// ``` + #[inline] + pub fn is_locked(&self) -> bool { + self.inner.is_locked() + } + + /// Returns a mutable reference to the underlying data. 
+ /// + /// Since this call borrows the `Mutex` mutably, no actual locking needs to + /// take place - the mutable borrow statically guarantees no locks exist. + /// + /// # Examples + /// + /// ``` + /// use mcslock::parking::barging; + /// use mcslock::parking::park::{SpinBackoffThenPark, SpinThenPark}; + /// + /// type Mutex = barging::Mutex; + /// + /// let mut mutex = Mutex::new(0); + /// *mutex.get_mut() = 10; + /// + /// assert_eq!(*mutex.lock(), 10); + /// ``` + #[cfg(not(all(loom, test)))] + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } +} + +impl Default for Mutex { + /// Creates a `Mutex`, with the `Default` value for `T`. + #[inline] + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl From for Mutex { + /// Creates a `Mutex` from a instance of `T`. + #[inline] + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl Debug for Mutex { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +/// A Mutex wrapper type that calls `lock_with_stack_queue_node` when +/// implementing testing traits. +#[cfg(test)] +struct MutexStackNode(Mutex); + +#[cfg(test)] +impl Default for MutexStackNode { + fn default() -> Self { + Self(Mutex::default()) + } +} + +#[cfg(test)] +impl From for MutexStackNode { + fn from(value: T) -> Self { + Self(Mutex::from(value)) + } +} + +#[cfg(test)] +impl Debug for MutexStackNode { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +#[cfg(test)] +impl LockNew for MutexStackNode { + type Target = T; + + fn new(value: Self::Target) -> Self + where + Self::Target: Sized, + { + Self(Mutex::new(value)) + } +} + +#[cfg(test)] +impl LockWithThen for MutexStackNode { + // The barging mutex does not require queue node access. 
+ type Node = (); + + type Guard<'a> + = MutexGuard<'a, T, Ps, Pq> + where + Self: 'a, + Self::Target: 'a; + + fn lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(MutexGuard<'_, T, Ps, Pq>) -> Ret, + { + f(self.0.lock_with_stack_queue_node()) + } +} + +#[cfg(test)] +impl TryLockWithThen for MutexStackNode { + fn try_lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(Option>) -> Ret, + { + self.0.try_lock_then(f) + } + + fn is_locked(&self) -> bool { + self.0.is_locked() + } +} + +#[cfg(all(not(loom), test))] +impl crate::test::LockData for MutexStackNode { + fn into_inner(self) -> Self::Target + where + Self::Target: Sized, + { + self.0.into_inner() + } + + fn get_mut(&mut self) -> &mut Self::Target { + self.0.get_mut() + } +} + +#[cfg(test)] +impl LockThen for MutexStackNode { + fn lock_then(&self, f: F) -> Ret + where + F: FnOnce(MutexGuard<'_, T, Ps, Pq>) -> Ret, + { + self.0.lock_then(f) + } +} + +#[cfg(test)] +impl TryLockThen for MutexStackNode {} + +// SAFETY: This `Mutex` implementation guarantees linearization of access and +// modification to the protected data in a concurrent, multithreaded context. +#[cfg(all(feature = "lock_api", not(loom)))] +unsafe impl lock_api::RawMutex for Mutex<(), Ps, Pq> { + type GuardMarker = lock_api::GuardSend; + + #[allow(clippy::declare_interior_mutable_const)] + const INIT: Self = Self::new(()); + + #[inline] + fn lock(&self) { + core::mem::forget(Self::lock(self)); + } + + #[inline] + fn try_lock(&self) -> bool { + Self::try_lock(self).map(core::mem::forget).is_some() + } + + #[inline] + unsafe fn unlock(&self) { + self.inner.unlock(); + } + + #[inline] + fn is_locked(&self) -> bool { + self.is_locked() + } +} + +// The inner type of mutex's guard, with a `futex` compatible atomic data. +type GuardInner<'a, T, Ps, Pq> = inner::MutexGuard<'a, T, Parker, ParkWait, ParkWait>; + +/// An RAII implementation of a "scoped lock" of a mutex. 
When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be access through this guard via its +/// [`Deref`] and [`DerefMut`] implementations. +/// +/// This structure is returned by [`lock`] and [`try_lock`] methods on [`Mutex`]. +/// It is also given as closure parameters by [`lock_then`] and [`try_lock_then`] +/// methods. +/// +/// [`Deref`]: core::ops::Deref +/// [`DerefMut`]: core::ops::DerefMut +/// [`lock`]: Mutex::lock +/// [`try_lock`]: Mutex::lock +/// [`lock_then`]: Mutex::lock_then +/// [`try_lock_then`]: Mutex::try_lock_then +#[must_use = "if unused the Mutex will immediately unlock"] +pub struct MutexGuard<'a, T: ?Sized, Ps, Pq> { + inner: GuardInner<'a, T, Ps, Pq>, +} + +// SAFETY: `inner::MutexGuard` is `Send` if `T` is `Send`. +unsafe impl Send for MutexGuard<'_, T, Ps, Pq> {} +// SAFETY: `inner::MutexGuard` is `Sync` if `T` is `Sync`. +unsafe impl Sync for MutexGuard<'_, T, Ps, Pq> {} + +#[doc(hidden)] +impl<'a, T: ?Sized, Ps, Pq> From> for MutexGuard<'a, T, Ps, Pq> { + fn from(inner: GuardInner<'a, T, Ps, Pq>) -> Self { + Self { inner } + } +} + +impl Debug for MutexGuard<'_, T, Ps, Pq> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +impl Display for MutexGuard<'_, T, Ps, Pq> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +#[cfg(not(all(loom, test)))] +impl core::ops::Deref for MutexGuard<'_, T, Ps, Pq> { + type Target = T; + + /// Dereferences the guard to access the underlying data. + #[inline(always)] + fn deref(&self) -> &T { + &self.inner + } +} + +#[cfg(not(all(loom, test)))] +impl core::ops::DerefMut for MutexGuard<'_, T, Ps, Pq> { + /// Mutably dereferences the guard to access the underlying data. + #[inline(always)] + fn deref_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +// SAFETY: A guard instance hold the lock locked, with exclusive access to the +// underlying data. 
+#[cfg(all(loom, test))] +#[cfg(not(tarpaulin))] +unsafe impl Guard for MutexGuard<'_, T, Ps, Pq> { + type Target = T; + + fn get(&self) -> &loom::cell::UnsafeCell { + self.inner.get() + } +} + +#[cfg(all(loom, test))] +#[cfg(not(tarpaulin))] +impl AsDeref for MutexGuard<'_, T, Ps, Pq> { + type Target = T; + + type Deref<'a> + = GuardDeref<'a, Self> + where + Self: 'a, + Self::Target: 'a; + + fn as_deref(&self) -> Self::Deref<'_> { + self.get_ref() + } +} + +#[cfg(all(loom, test))] +#[cfg(not(tarpaulin))] +impl AsDerefMut for MutexGuard<'_, T, Ps, Pq> { + type DerefMut<'a> + = GuardDerefMut<'a, Self> + where + Self: 'a, + Self::Target: 'a; + + fn as_deref_mut(&mut self) -> Self::DerefMut<'_> { + self.get_mut() + } +} + +#[cfg(all(not(loom), test))] +mod test { + use crate::parking::park::ImmediatePark; + use crate::test::tests; + + type Mutex = super::MutexStackNode; + + #[test] + fn node_waiter_drop_does_not_matter() { + tests::node_waiter_drop_does_not_matter::(); + } + + #[test] + fn lots_and_lots_lock() { + tests::lots_and_lots_lock::>(); + } + + #[test] + fn lots_and_lots_try_lock() { + tests::lots_and_lots_try_lock::>(); + } + + #[test] + fn lots_and_lots_mixed_lock() { + tests::lots_and_lots_mixed_lock::>(); + } + + #[test] + fn smoke() { + tests::smoke::>(); + } + + #[test] + fn test_guard_debug_display() { + tests::test_guard_debug_display::>(); + } + + #[test] + fn test_mutex_debug() { + tests::test_mutex_debug::>(); + } + + #[test] + fn test_mutex_from() { + tests::test_mutex_from::>(); + } + + #[test] + fn test_mutex_default() { + tests::test_mutex_default::>(); + } + + #[test] + fn test_try_lock() { + tests::test_try_lock::>(); + } + + #[test] + fn test_into_inner() { + tests::test_into_inner::>(); + } + + #[test] + fn test_into_inner_drop() { + tests::test_into_inner_drop::>(); + } + + #[test] + fn test_get_mut() { + tests::test_get_mut::>(); + } + + #[test] + fn test_lock_arc_nested() { + tests::test_lock_arc_nested::, Mutex<_>>(); + } + + #[test] + 
fn test_acquire_more_than_one_lock() { + tests::test_acquire_more_than_one_lock::>(); + } + + #[test] + fn test_lock_arc_access_in_unwind() { + tests::test_lock_arc_access_in_unwind::>(); + } + + #[test] + fn test_lock_unsized() { + tests::test_lock_unsized::>(); + } +} + +#[cfg(all(loom, test))] +mod model { + use crate::loom::models; + use crate::parking::park::ImmediatePark; + + type Mutex = super::MutexStackNode; + + #[test] + fn try_lock_join() { + models::try_lock_join::>(); + } + + #[test] + fn lock_join() { + models::lock_join::>(); + } + + #[test] + fn mixed_lock_join() { + models::mixed_lock_join::>(); + } +} diff --git a/src/parking/barging/thread_local.rs b/src/parking/barging/thread_local.rs new file mode 100644 index 0000000..e905718 --- /dev/null +++ b/src/parking/barging/thread_local.rs @@ -0,0 +1,136 @@ +use super::{Mutex, MutexGuard}; +use crate::parking::park::Park; + +#[cfg(test)] +use crate::test::{LockNew, LockThen, LockWithThen, TryLockThen, TryLockWithThen}; + +impl Mutex { + /// The `thread_local` enabled `lock` implementation. + /// + /// Implemented by `lock_with_local_queue_node`. + pub(super) fn lock_impl(&self) -> MutexGuard<'_, T, Ps, Pq> { + self.lock_with_local_queue_node() + } + + /// Underlying implementation of `lock` that is only enabled when the + /// `thread_local` feature is enabled. + /// + /// This implementation will access and modify queue nodes that are stored + /// in the thread local storage of the locking threads. That is, the number + /// of queue nodes is proportional at 1:1 to the number of locking threads. + fn lock_with_local_queue_node(&self) -> MutexGuard<'_, T, Ps, Pq> { + crate::thread_local_parking_node! { static NODE } + // SAFETY: The thread local node: `NODE` is not borrowed to any other + // locking operation for all duration of the `inner` borrow of it. 
+ unsafe { self.inner.lock_with_local_unchecked(&NODE.inner) }.into() + } +} + +/// A Mutex wrapper type that calls `lock_with_local_queue_node` when +/// implementing testing traits. +#[cfg(test)] +struct MutexLocalNode(Mutex); + +#[cfg(test)] +impl LockNew for MutexLocalNode { + type Target = T; + + fn new(value: Self::Target) -> Self + where + Self::Target: Sized, + { + Self(Mutex::new(value)) + } +} + +#[cfg(test)] +impl LockWithThen for MutexLocalNode { + // The barging mutex does not require queue node access. + type Node = (); + + type Guard<'a> + = MutexGuard<'a, T, Ps, Pq> + where + Self: 'a, + Self::Target: 'a; + + fn lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(MutexGuard<'_, T, Ps, Pq>) -> Ret, + { + f(self.0.lock_with_local_queue_node()) + } +} + +#[cfg(test)] +impl TryLockWithThen for MutexLocalNode { + fn try_lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(Option>) -> Ret, + { + self.0.try_lock_then(f) + } + + fn is_locked(&self) -> bool { + self.0.is_locked() + } +} + +#[cfg(test)] +impl LockThen for MutexLocalNode { + fn lock_then(&self, f: F) -> Ret + where + F: FnOnce(MutexGuard<'_, T, Ps, Pq>) -> Ret, + { + self.0.lock_then(f) + } +} + +#[cfg(test)] +impl TryLockThen for MutexLocalNode {} + +#[cfg(all(not(loom), test))] +mod test { + use crate::parking::park::ImmediatePark; + use crate::test::tests; + + type Mutex = super::MutexLocalNode; + + #[test] + fn lots_and_lots_lock() { + tests::lots_and_lots_lock::>(); + } + + #[test] + fn test_lock_arc_nested() { + tests::test_lock_arc_nested::, Mutex<_>>(); + } + + #[test] + fn test_acquire_more_than_one_lock() { + tests::test_acquire_more_than_one_lock::>(); + } + + #[test] + fn test_lock_arc_access_in_unwind() { + tests::test_lock_arc_access_in_unwind::>(); + } + + #[test] + fn test_lock_unsized() { + tests::test_lock_unsized::>(); + } +} + +#[cfg(all(loom, test))] +mod model { + use crate::loom::models; + use 
crate::parking::park::ImmediatePark; + + type Mutex = super::MutexLocalNode; + + #[test] + fn lock_join() { + models::lock_join::>(); + } +} diff --git a/src/parking/mod.rs b/src/parking/mod.rs new file mode 100644 index 0000000..e6e0536 --- /dev/null +++ b/src/parking/mod.rs @@ -0,0 +1,24 @@ +//! MCS lock implementations with thread parking support. +//! +//! This module provides implementations that **are not** `no_std` compatible. +//! +//! Each [`raw`] and [`barging`] modules implement the same protocols as their +//! root level counter parts and the same locking interfaces. The distinction +//! is that these Mutex implementations will transparently put the waiting +//! threads to sleep under some policy. Users are free to implement their own +//! policies or pick sensible ones under the [`park`] module. To define your +//! own policy, users must implement the [`Park`] trait. +//! +//! [`raw`]: crate::parking::raw +//! [`barging`]: crate::parking::barging +//! [`park`]: crate::parking::park +//! [`Park`]: crate::parking::park::Park + +pub mod park; +pub mod raw; + +#[cfg(feature = "barging")] +#[cfg_attr(docsrs, doc(cfg(feature = "barging")))] +pub mod barging; + +mod parker; diff --git a/src/parking/park.rs b/src/parking/park.rs new file mode 100644 index 0000000..4605afa --- /dev/null +++ b/src/parking/park.rs @@ -0,0 +1,470 @@ +//! Thread parking policies that determine the behaviour of locks when +//! encountering contention. +//! +//! When a thread is "parked", it essentially goes into a sleeping state until +//! it is awakened by the OS when a event or condition occurs. This is used to +//! prevent busy-waiting, where a thread continuously checks for a condition to +//! be true, wasting CPU resources. +//! +//! This crate integrates with the OS specific thread sleeping and awakening +//! infrastructure transparently. Users are then responsible solely to tell +//! _when_ should the current thread be put to sleep. The `Park` trait defines +//! 
the interface of which users will then conditionally request the current +//! waiting thread to be parked. + +use core::marker::PhantomData; + +use crate::cfg::debug_abort; +use crate::relax::{Loop, Relax, Spin, SpinBackoff}; + +#[cfg(any(feature = "yield", test))] +use crate::relax::{Yield, YieldBackoff}; + +pub(crate) use wait::{CantPark, ParkWait}; + +/// The thread parking waiting policy to be applied when the lock is contended. +/// +/// # Example +/// +/// ``` +/// // Requires `parking` feature. +/// use mcslock::parking::park::Park; +/// use mcslock::relax::Spin; +/// +/// #[derive(Default)] +/// struct SpinThenPark(u32); +/// +/// unsafe impl Park for SpinThenPark { +/// type Relax = Spin; +/// +/// #[inline(always)] +/// fn new() -> Self { +/// Self::default() +/// } +/// +/// #[inline(always)] +/// fn should_park(&self) -> bool { +/// self.0 >= 100 +/// } +/// +/// #[inline(always)] +/// fn on_failure(&mut self) { +/// self.0 += 1; +/// } +/// } +/// ``` +/// +/// # Safety +/// +/// All associated function implementations **must not** cause a thread exit, +/// such as envoking a uncaught [`core::panic!`] call, or any other operation +/// that will panic the thread. Exiting the thread will result in undefined +/// behiavior. +pub unsafe trait Park { + /// The relax operation that should be run during a period of contention. + type Relax: Relax; + + /// Returns the initial value for this parking policy. + fn new() -> Self; + + /// Hints whether or not should the parking operation be executed at this + /// time. + /// + /// Returning `false` means that the thread is not ready to be put to sleep + /// yet, there is some other event that should occur first. Returning `true` + /// indicates that the thread is no longer waiting for any event, and so it + /// is hinting that it should be parked. + fn should_park(&self) -> bool; + + /// Updates the inner state whenever the thread fails to acquire the lock. 
+ /// + /// This function will be called once whenever both `should_park` returns + /// `false` **and** the thread fails to acquire the lock. This will not be + /// called otherwise. + fn on_failure(&mut self); +} + +mod sealed { + /// The actual implementation of this crate's `Park` types. + pub trait ParkImpl { + type Relax: super::Relax; + + /// The actual `new` implementation. + fn new() -> Self; + + /// The actual `should_park` implementation. + fn should_park(&self) -> bool; + + /// The actual `on_failure` implementation. + fn on_failure(&mut self); + } +} +use sealed::ParkImpl; + +// SAFETY: All `new`, `should_park` and `on_failure` function implementations are +// protected with a process abort (under test with unwind on panic configuration) +// in case any of them where to panic the thread. +unsafe impl Park for P { + type Relax = P::Relax; + + #[inline(always)] + fn new() -> Self { + debug_abort::on_unwind(|| P::new()) + } + + #[inline(always)] + fn should_park(&self) -> bool { + debug_abort::on_unwind(|| P::should_park(self)) + } + + #[inline(always)] + fn on_failure(&mut self) { + debug_abort::on_unwind(|| P::on_failure(self)); + } +} + +/// A busy-wait spin-loop then thread sleeping policy. +/// +/// A thread parking policy that, while trying to acquire the lock, will initially +/// run a busy-wait spin-loop (signaling the CPU to power down) for a number of +/// attempts and then, if unsuccessful, requests for the current thread to be put +/// to sleep. +/// +/// The [`Spin`] relax strategy is executed during waiting loops. +pub struct SpinThenPark { + bounded: Bounded<{ Self::ATTEMPTS }>, +} + +impl SpinThenPark { + /// The maximum number of attempts this policy will run before being parked. 
+ const ATTEMPTS: Uint = DEFAULT_ATTEMPTS; +} + +impl ParkImpl for SpinThenPark { + type Relax = Spin; + + fn new() -> Self { + Self { bounded: Bounded::new() } + } + + fn should_park(&self) -> bool { + self.bounded.should_park() + } + + fn on_failure(&mut self) { + self.bounded.on_failure(); + } +} + +/// A no power down spin-loop then thread sleeping policy. +/// +/// A thread parking policy that, while trying to acquire the lock, will initially +/// rapidly spin in a loop (without signaling the CPU to power down) for a number +/// of attempts and then, if unsuccessful, requests for the current thread to be +/// put to sleep. +/// +/// The [`Loop`] relax strategy is executed during waiting loops. +pub struct LoopThenPark { + bounded: Bounded<{ Self::ATTEMPTS }>, +} + +impl LoopThenPark { + /// The maximum number of attempts this policy will run before being parked. + const ATTEMPTS: Uint = DEFAULT_ATTEMPTS; +} + +impl ParkImpl for LoopThenPark { + type Relax = Loop; + + fn new() -> Self { + Self { bounded: Bounded::new() } + } + + fn should_park(&self) -> bool { + self.bounded.should_park() + } + + fn on_failure(&mut self) { + self.bounded.on_failure(); + } +} + +/// A thread yielding then thread sleeping policy. +/// +/// A thread parking policy that, while trying to acquire the lock, will initially +/// request the OS to yield the current thread, for number of attempts and then, +/// if unsuccessful, requests for the current thread to be put to sleep. +/// +/// The [`Yield`] relax strategy is executed during waiting loops. +#[cfg(any(feature = "yield", test))] +#[cfg_attr(docsrs, doc(cfg(feature = "yield")))] +pub struct YieldThenPark { + bounded: Bounded<{ Self::ATTEMPTS }>, +} + +#[cfg(any(feature = "yield", test))] +impl YieldThenPark { + /// The maximum number of attempts this policy will run before being parked. 
+ const ATTEMPTS: Uint = DEFAULT_ATTEMPTS; +} + +#[cfg(any(feature = "yield", test))] +impl ParkImpl for YieldThenPark { + type Relax = Yield; + + fn new() -> Self { + Self { bounded: Bounded::new() } + } + + fn should_park(&self) -> bool { + self.bounded.should_park() + } + + fn on_failure(&mut self) { + self.bounded.on_failure(); + } +} + +/// Immediately requests the thread to be put to sleep. +/// +/// A thread parking policy that immediately requests that the current thread +/// should be put to sleep. No relax operation is executed during lock waiting +/// loops. During unlock waiting loops, the generic `R` type's relax strategy +/// will be executed. +/// +/// The default relax operation executed is [`Spin`]. +pub struct ImmediatePark { + relax: PhantomData, +} + +impl ParkImpl for ImmediatePark { + type Relax = R; + + fn new() -> Self { + Self { relax: PhantomData } + } + + fn should_park(&self) -> bool { + true + } + + #[cfg(not(tarpaulin_include))] + fn on_failure(&mut self) {} +} + +/// A spin-loop with exponential backoff then thread sleeping policy. +/// +/// A thread parking policy that, while trying to acquire the lock, will initially +/// run a busy-wait spin-loop with exponential backoff (signaling the CPU to +/// power down) for a number of attempts and then, if unsuccessful, requests +/// for the current thread to be put to sleep. +/// +/// The [`SpinBackoff`] relax strategy is executed during waiting loops. +pub struct SpinBackoffThenPark { + bounded: Bounded<{ Self::ATTEMPTS }>, +} + +impl SpinBackoffThenPark { + /// The maximum number of attempts this policy will run before being parked. 
+ const ATTEMPTS: Uint = DEFAULT_ATTEMPTS; +} + +impl ParkImpl for SpinBackoffThenPark { + type Relax = SpinBackoff; + + fn new() -> Self { + Self { bounded: Bounded::new() } + } + + fn should_park(&self) -> bool { + self.bounded.should_park() + } + + fn on_failure(&mut self) { + self.bounded.on_failure(); + } +} + +/// A spin-loop with exponential backoff, then thread yielding and finally thread +/// sleeping policy. +/// +/// A thread parking policy that, while trying to acquire the lock, will initially +/// run a busy-wait spin-loop with exponential backoff (signaling the CPU to +/// power down) up to a threshold, then requests the OS to yield the current +/// thread, for a number of attempts and finally, if unsuccessful, requests for +/// the current thread to be put to sleep. +/// +/// The [`YieldBackoff`] relax strategy is executed during waiting loops. +#[cfg(any(feature = "yield", test))] +#[cfg_attr(docsrs, doc(cfg(feature = "yield")))] +pub struct YieldBackoffThenPark { + bounded: Bounded<{ Self::ATTEMPTS }>, +} + +#[cfg(any(feature = "yield", test))] +impl YieldBackoffThenPark { + /// The maximum number of attempts this policy will run before being parked. + const ATTEMPTS: Uint = DEFAULT_ATTEMPTS; +} + +#[cfg(any(feature = "yield", test))] +impl ParkImpl for YieldBackoffThenPark { + type Relax = YieldBackoff; + + fn new() -> Self { + Self { bounded: Bounded::new() } + } + + fn should_park(&self) -> bool { + self.bounded.should_park() + } + + fn on_failure(&mut self) { + self.bounded.on_failure(); + } +} + +/// An unsigned integer type use as the inner type for [`Bounded`]. +/// +/// All `Bounded` related arithmetic operations (eg. sum) should only +/// use this same type as the right-hand and lef-hand side types. +type Uint = u32; + +/// A default number of attempts to acquire the lock before parking the thread. +const DEFAULT_ATTEMPTS: Uint = 100; + +/// A bounded parking policy that will block the thread for at most some number +/// of attempts. 
+struct Bounded { + attempts: Uint, +} + +impl Bounded { + const fn new() -> Self { + Self { attempts: 0 } + } + + const fn should_park(&self) -> bool { + self.attempts >= MAX + } + + #[allow(clippy::missing_const_for_fn)] // 1.91.0 + fn on_failure(&mut self) { + self.attempts += 1; + } +} + +mod wait { + use core::marker::PhantomData; + + use crate::lock::Wait; + use crate::parking::park::Park; + use crate::relax::Relax; + + use super::ParkImpl; + + /// A generic parking waiter, that implements [`Park`] so long as `P` + /// implements it too. + /// + /// This saves us from defining a blanket [`Wait`] impl for a generic `T` where + /// `T` implements [`Park`], because that would prevent us from implementing + /// `Wait` for `T` when it implements [`Relax`], since they would conflict. We + /// need both `Park` and `Relax` types to implement `Wait`. + pub struct ParkWait

(PhantomData

); + + impl Wait for ParkWait

{ + type LockRelax = P::Relax; + type UnlockRelax = P::Relax; + type Park = P; + } + + /// A parking policy that never requests the thread to be parked. + /// + /// Useful for `Relax` types so that they can implement the `Wait` trait, even + /// though they will never call any of the `Park` methods. + pub struct CantPark(PhantomData); + + #[cfg(not(tarpaulin_include))] + impl ParkImpl for CantPark { + type Relax = R; + + fn new() -> Self { + Self(PhantomData) + } + + fn should_park(&self) -> bool { + false + } + + fn on_failure(&mut self) {} + } +} + +#[cfg(all(not(loom), test))] +mod test { + use super::{Park, Uint}; + + fn parking_loop() -> (P, Uint) { + let mut parker = P::new(); + let mut counter = 0; + for _ in 0..=MAX.saturating_mul(10) { + while !parker.should_park() { + parker.on_failure(); + counter += 1; + } + } + (parker, counter) + } + + fn should_park_eventually() { + let (waiter, counter) = parking_loop::(); + assert!(waiter.should_park()); + assert_eq!(MAX, counter); + } + + fn should_park_immediately() { + let (waiter, counter) = parking_loop::(); + assert!(waiter.should_park()); + assert_eq!(0, counter); + } + + #[test] + fn spins() { + use super::SpinThenPark; + const MAX: Uint = SpinThenPark::ATTEMPTS; + should_park_eventually::(); + } + + #[test] + fn yields() { + use super::YieldThenPark; + const MAX: Uint = YieldThenPark::ATTEMPTS; + should_park_eventually::(); + } + + #[test] + fn loops() { + use super::LoopThenPark; + const MAX: Uint = LoopThenPark::ATTEMPTS; + should_park_eventually::(); + } + + #[test] + fn spin_backoff() { + use super::SpinBackoffThenPark; + const MAX: Uint = SpinBackoffThenPark::ATTEMPTS; + should_park_eventually::(); + } + + #[test] + fn yield_backoff() { + use super::YieldBackoffThenPark; + const MAX: Uint = YieldBackoffThenPark::ATTEMPTS; + should_park_eventually::(); + } + + #[test] + fn immediately() { + should_park_immediately::(); + } +} diff --git a/src/parking/parker.rs b/src/parking/parker.rs new file mode 
100644 index 0000000..ee7b9a3 --- /dev/null +++ b/src/parking/parker.rs @@ -0,0 +1,249 @@ +use crate::lock::{Lock, Wait}; +use crate::parking::park::Park; +use crate::relax::Relax; + +#[cfg(not(all(loom, test)))] +pub(super) use common::Parker; + +#[cfg(all(loom, test))] +pub(super) use loom::Parker; + +/// A trait that specifies the contract of use for Parker implementations. +/// +/// Currently, this crate leverages `atomic_wait`'s API for parking purposes, +/// that provides unified, cross platform wait and wake functionality. This +/// makes implementation simpler, at the cost of losing platform specific +/// features. Should we choose to integrate with system's interfaces in the +/// future, each Parker implementation should follow this same contract. +pub trait ParkerT { + /// Creates a new locked `Parker` instance. + /// + /// It's expected for an implementing type to be compile-time evaluable. + #[cfg(not(all(loom, test)))] + const LOCKED: Self; + + /// Creates a new unlocked `Parker` instance. + /// + /// It's expected for an implementing type to be compile-time evaluable. + #[cfg(not(all(loom, test)))] + const UNLOCKED: Self; + + /// Creates a new locked `Parker` instance with Loom primitives (non-const). + /// + /// Loom primitives are not compile-time evaluable. + #[cfg(all(loom, test))] + #[cfg(not(tarpaulin))] + fn locked() -> Self; + + /// Creates a new unlocked `Parker` instance with Loom primitives (non-const). + /// + /// Loom primitives are not compile-time evaluable. + #[cfg(all(loom, test))] + #[cfg(not(tarpaulin))] + fn unlocked() -> Self; + + /// Returns `true` if the lock is currently held. + /// + /// This function does not guarantee strong ordering, only atomicity. + fn is_locked_relaxed(&self) -> bool; + + /// Tries to lock this mutex with acquire load. + /// + /// Returns `true` if successfully moved from unlocked state to locked + /// state, `false` otherwise.
+ fn try_lock_acquire(&self) -> bool; + + /// Tries to lock this mutex with acquire load and weak exchange. + /// + /// Returns `true` if successfully moved from unlocked state to locked + /// state, `false` otherwise. + fn try_lock_acquire_weak(&self) -> bool; + + /// Blocks unless or until the current thread's token is made available. + /// + /// Implementors of this function are expected to call the platform's + /// specific APIs for thread parking and to also implement a mechanism + /// to safeguard against spurious wake-ups if they are possible. That is, + /// this function should only unblock once a corresponding unpark call has + /// been issued to this parked thread. + fn park_loop_relaxed(&self); + + /// Atomically makes the handle’s token available if it is not already. + /// + /// Implementors of this function are expected to call the platform's + /// specific API for thread unparking. + fn unpark_release(&self); + } + + /// Parks the current thread under the specified policy and `futex` compatible + /// atomic value. + /// + /// If the current thread manages to acquire the lock within the limit of + /// attempts, then just return and unblock the thread, without ever requesting + /// the OS to put the thread to sleep. + /// + /// Else, if the limit has been reached and the lock remains locked, then park + /// the thread and protect it from being awakened by spurious wakeups.
+fn park_current_thread_relaxed(parker: &P) { + let mut parking_policy = W::parking_policy(); + while !parking_policy.park.should_park() { + let true = parker.is_locked_relaxed() else { return }; + parking_policy.park.on_failure(); + parking_policy.relax.relax(); + } + parker.park_loop_relaxed(); +} + +impl Lock for Parker { + #[cfg(not(all(loom, test)))] + #[allow(clippy::declare_interior_mutable_const)] + const LOCKED: Self = ParkerT::LOCKED; + + #[cfg(not(all(loom, test)))] + #[allow(clippy::declare_interior_mutable_const)] + const UNLOCKED: Self = ParkerT::UNLOCKED; + + #[cfg(all(loom, test))] + #[cfg(not(tarpaulin))] + fn locked() -> Self { + ParkerT::locked() + } + + #[cfg(all(loom, test))] + #[cfg(not(tarpaulin))] + fn unlocked() -> Self { + ParkerT::unlocked() + } + + fn try_lock_acquire(&self) -> bool { + ParkerT::try_lock_acquire(self) + } + + fn try_lock_acquire_weak(&self) -> bool { + ParkerT::try_lock_acquire_weak(self) + } + + fn wait_lock_relaxed(&self) { + park_current_thread_relaxed::(self); + } + + fn is_locked_relaxed(&self) -> bool { + ParkerT::is_locked_relaxed(self) + } + + fn notify_release(&self) { + ParkerT::unpark_release(self); + } +} + +#[cfg(not(all(loom, test)))] +mod common { + use core::ptr; + use core::sync::atomic::AtomicU32; + use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + + use super::ParkerT; + + #[derive(Debug)] + pub struct Parker { + state: AtomicU32, + } + + const UNLOCKED: u32 = 0; + const LOCKED: u32 = 1; + + impl ParkerT for Parker { + #[allow(clippy::declare_interior_mutable_const)] + const LOCKED: Self = { + let state = AtomicU32::new(LOCKED); + Self { state } + }; + + #[allow(clippy::declare_interior_mutable_const)] + const UNLOCKED: Self = { + let state = AtomicU32::new(UNLOCKED); + Self { state } + }; + + fn try_lock_acquire(&self) -> bool { + self.state.compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed).is_ok() + } + + fn try_lock_acquire_weak(&self) -> bool { + 
self.state.compare_exchange_weak(UNLOCKED, LOCKED, Acquire, Relaxed).is_ok() + } + + fn is_locked_relaxed(&self) -> bool { + self.state.load(Relaxed) == LOCKED + } + + fn park_loop_relaxed(&self) { + while self.state.load(Relaxed) == LOCKED { + atomic_wait::wait(&self.state, LOCKED); + } + } + + fn unpark_release(&self) { + let state = &self.state; + // TODO: 1.82.0 supports native syntax: + // let ptr = &raw const self.state; + let ptr = ptr::addr_of!(*state); + state.store(UNLOCKED, Release); + atomic_wait::wake_one(ptr); + } + } +} + +#[cfg(all(loom, test))] +#[cfg(not(tarpaulin))] +mod loom { + use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + + use loom::sync::atomic::AtomicBool; + use loom::thread; + + use super::ParkerT; + + #[derive(Debug)] + pub struct Parker { + locked: AtomicBool, + } + + const UNLOCKED: bool = false; + const LOCKED: bool = true; + + impl ParkerT for Parker { + fn locked() -> Self { + let locked = AtomicBool::new(LOCKED); + Self { locked } + } + + fn unlocked() -> Self { + let locked = AtomicBool::new(UNLOCKED); + Self { locked } + } + + fn try_lock_acquire(&self) -> bool { + self.locked.compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed).is_ok() + } + + fn try_lock_acquire_weak(&self) -> bool { + self.locked.compare_exchange_weak(UNLOCKED, LOCKED, Acquire, Relaxed).is_ok() + } + + fn is_locked_relaxed(&self) -> bool { + self.locked.load(Relaxed) == LOCKED + } + + fn park_loop_relaxed(&self) { + while self.locked.load(Relaxed) == LOCKED { + thread::yield_now(); + } + } + + fn unpark_release(&self) { + self.locked.store(UNLOCKED, Release); + thread::yield_now(); + } + } +} diff --git a/src/parking/raw/mod.rs b/src/parking/raw/mod.rs new file mode 100644 index 0000000..7bb80e6 --- /dev/null +++ b/src/parking/raw/mod.rs @@ -0,0 +1,198 @@ +//! MCS lock implementation with thread parking support. +//! +//! The `raw` implementation of MCS lock is fair, that is, it guarantees that +//! 
threads that have waited for longer will be scheduled first (FIFO). Each +//! waiting thread will spin and park against its own, locally-accessible atomic +//! lock state, which then avoids the network contention of the state access. +//! +//! This module provides an implementation that **is not** `no_std` compatible, +//! and it also requires that queue nodes must be allocated by the callers. +//! Queue nodes are represented by the [`MutexNode`] type. +//! +//! The lock is held for all the duration of the locking closure scope provided +//! to [`Mutex`]'s [`try_lock_then`], [`try_lock_with_then`], [`lock_then`] and +//! [`lock_with_then`] methods. +//! +//! This Mutex is generic over the parking policy. User may choose a policy as +//! long as it implements the [`Park`] trait. +//! +//! There are a number of parking policies provided by the [`park`] module. The +//! following modules provide type aliases for [`Mutex`] associated with a parking +//! policy. See their documentation for more information. +//! +//! [`try_lock_then`]: Mutex::try_lock_then +//! [`try_lock_with_then`]: Mutex::try_lock_with_then +//! [`lock_then`]: Mutex::lock_then +//! [`lock_with_then`]: Mutex::lock_with_then +//! [`park`]: crate::parking::park +//! [`Park`]: crate::parking::park::Park + +mod mutex; +pub use mutex::{Mutex, MutexNode}; + +#[cfg(feature = "thread_local")] +#[cfg_attr(docsrs, doc(cfg(feature = "thread_local")))] +mod thread_local; + +#[cfg(feature = "thread_local")] +pub use thread_local::LocalMutexNode; + +/// A MCS lock that implements a `spin then park` parking policy. +/// +/// During lock contention, and for a certain amount of attempts, this lock spins +/// while signaling the processor that it is running a busy-wait spin-loop. Once +/// all attempts have been tried, puts the thread to sleep. +pub mod spins { + use super::mutex; + use crate::parking::park::SpinThenPark; + + /// A [`parking::raw::Mutex`] that implements the [`SpinThenPark`] + /// parking policy.
+ /// + /// # Example + /// + /// ``` + /// use mcslock::parking::raw::{spins::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(0); + /// let mut node = MutexNode::new(); + /// let value = mutex.lock_with_then(&mut node, |data| *data); + /// assert_eq!(value, 0); + /// ``` + /// [`parking::raw::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A MCS lock that implements a `spin with backoff then park` + /// policy. + /// + /// During lock contention, and for a certain amount of attempts, this lock + /// will perform exponential backoff spinning, signaling the processor that + /// its is running a busy-wait spin-loop. Once all attempts have been tried, + /// puts the thread to sleep. + pub mod backoff { + use super::mutex; + use crate::parking::park::SpinBackoffThenPark; + + /// A [`parking::raw::Mutex`] that implements the [`SpinBackoffThenPark`] + /// parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::raw::{spins::backoff::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(0); + /// let mut node = MutexNode::new(); + /// let value = mutex.lock_with_then(&mut node, |data| *data); + /// assert_eq!(value, 0); + /// ``` + /// [`parking::raw::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + } +} + +/// A MCS lock that implements a `yield then park` parking policy. +/// +/// During lock contention, and for a certain amount of attempts, this lock will +/// yield the current time slice to the OS scheduler. Once all attempts have +/// been tried, puts the thread to sleep. +#[cfg(any(feature = "yield", loom, test))] +#[cfg_attr(docsrs, doc(cfg(feature = "yield")))] +pub mod yields { + use super::mutex; + use crate::parking::park::YieldThenPark; + + /// A [`parking::raw::Mutex`] that implements the [`YieldThenPark`] parking + /// policy. 
+ /// + /// # Example + /// + /// ``` + /// use mcslock::parking::raw::{yields::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(0); + /// let mut node = MutexNode::new(); + /// let value = mutex.lock_with_then(&mut node, |data| *data); + /// assert_eq!(value, 0); + /// ``` + /// [`parking::raw::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + + /// A MCS lock that implements a `yield with backoff then park` + /// parking policy. + /// + /// During lock contention, and for a certain amount of attempts, this lock + /// will perform exponential backoff while spinning, up to a threshold, then + /// yields back to the OS scheduler. Once all attempts have been tried, it + /// will then put the thread to sleep. + pub mod backoff { + use super::mutex; + use crate::parking::park::YieldBackoffThenPark; + + /// A [`parking::raw::Mutex`] that implements the [`YieldBackoffThenPark`] + /// parking policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::raw::{yields::backoff::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(0); + /// let mut node = MutexNode::new(); + /// let value = mutex.lock_with_then(&mut node, |data| *data); + /// assert_eq!(value, 0); + /// ``` + /// [`parking::raw::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; + } +} + +/// A MCS lock that implements a `loop then park` parking policy. +/// +/// During lock contention, and for a certain amount of attempts, this lock +/// will rapidly spin without telling the CPU to do any power down. Once all +/// attempts have been tried, it will then put the thread to sleep. +pub mod loops { + use super::mutex; + use crate::parking::park::LoopThenPark; + + /// A [`parking::raw::Mutex`] that implements the [`LoopThenPark`] parking + /// policy. 
+ /// + /// # Example + /// + /// ``` + /// use mcslock::parking::raw::{loops::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(0); + /// let mut node = MutexNode::new(); + /// let value = mutex.lock_with_then(&mut node, |data| *data); + /// assert_eq!(value, 0); + /// ``` + /// [`parking::raw::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; +} + +/// A MCS lock that implements an `immediate park` parking policy. +/// +/// During lock contention, this lock will immediately put the thread to sleep. +pub mod immediate { + use super::mutex; + use crate::parking::park::ImmediatePark; + + /// A [`parking::raw::Mutex`] that implements the [`ImmediatePark`] parking + /// policy. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::raw::{immediate::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(0); + /// let mut node = MutexNode::new(); + /// let value = mutex.lock_with_then(&mut node, |data| *data); + /// assert_eq!(value, 0); + /// ``` + /// [`parking::raw::Mutex`]: mutex::Mutex + pub type Mutex = mutex::Mutex; +} diff --git a/src/parking/raw/mutex.rs b/src/parking/raw/mutex.rs new file mode 100644 index 0000000..efb2804 --- /dev/null +++ b/src/parking/raw/mutex.rs @@ -0,0 +1,674 @@ +use core::fmt::{self, Debug, Formatter}; +use core::ops::{Deref, DerefMut}; + +use crate::inner::raw as inner; +use crate::parking::park::{Park, ParkWait}; +use crate::parking::parker::Parker; + +#[cfg(test)] +use crate::test::{LockNew, LockThen, LockWithThen, TryLockThen, TryLockWithThen}; + +/// A locally-accessible record for forming the waiting queue. +/// +/// `MutexNode` is an opaque type that holds metadata for the [`Mutex`]'s +/// waiting queue. To acquire a MCS lock, an instance of queue node must be +/// reachable and mutably borrowed for the duration of some associate locking +/// closure. Once the closure goes out of scope, a node instance can be reused +/// as the backing allocation for another lock acquisition. 
See [`lock_with_then`] +/// and [`try_lock_with_then`] methods on [`Mutex`]. +/// +/// [`lock_with_then`]: Mutex::lock_with_then +/// [`try_lock_with_then`]: Mutex::try_lock_with_then +#[derive(Debug)] +#[repr(transparent)] +pub struct MutexNode { + inner: inner::MutexNode, +} + +impl MutexNode { + /// Creates new `MutexNode` instance. + /// + /// # Examples + /// + /// ``` + /// use mcslock::parking::raw::MutexNode; + /// + /// let node = MutexNode::new(); + /// ``` + #[must_use] + #[inline(always)] + pub const fn new() -> Self { + Self { inner: inner::MutexNode::new() } + } +} + +#[cfg(not(tarpaulin_include))] +#[doc(hidden)] +impl Deref for MutexNode { + type Target = inner::MutexNode; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[doc(hidden)] +impl DerefMut for MutexNode { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +#[cfg(not(tarpaulin_include))] +impl Default for MutexNode { + #[inline(always)] + fn default() -> Self { + Self::new() + } +} + +/// A mutual exclusion primitive useful for protecting shared data. +/// +/// This mutex will block threads waiting for the lock to become available. The +/// mutex can also be statically initialized or created via a [`new`] +/// constructor. Each mutex has a type parameter which represents the data that +/// it is protecting. The data can only be accessed through closure parameters +/// provided by [`lock_then`], [`lock_with_then`], [`try_lock_then`] and +/// [`try_lock_with_then`] that guarantees that the data is only ever accessed +/// when the mutex is locked. 
+/// +/// # Examples +/// +/// ``` +/// use std::sync::Arc; +/// use std::thread; +/// use std::sync::mpsc::channel; +/// +/// use mcslock::parking::raw::{self, MutexNode}; +/// use mcslock::parking::park::SpinThenPark; +/// +/// type Mutex = raw::Mutex; +/// +/// const N: usize = 10; +/// +/// // Spawn a few threads to increment a shared variable (non-atomically), and +/// // let the main thread know once all increments are done. +/// // +/// // Here we're using an Arc to share memory among threads, and the data inside +/// // the Arc is protected with a mutex. +/// let data = Arc::new(Mutex::new(0)); +/// +/// let (tx, rx) = channel(); +/// for _ in 0..N { +/// let (data, tx) = (data.clone(), tx.clone()); +/// thread::spawn(move || { +/// // A queue node must be mutably accessible. +/// let mut node = MutexNode::new(); +/// // The shared state can only be accessed once the lock is held. +/// // Our non-atomic increment is safe because we're the only thread +/// // which can access the shared state when the lock is held. +/// // +/// // We unwrap() the return value to assert that we are not expecting +/// // threads to ever fail while holding the lock. +/// data.lock_with_then(&mut node, |data| { +/// *data += 1; +/// if *data == N { +/// tx.send(()).unwrap(); +/// } +/// // The lock is unlocked here at the end of the closure scope. +/// }); +/// }); +/// } +/// +/// rx.recv().unwrap(); +/// ``` +/// [`new`]: Mutex::new +/// [`lock_then`]: Mutex::lock_then +/// [`lock_with_then`]: Mutex::lock_with_then +/// [`try_lock_then`]: Mutex::try_lock_then +/// [`try_lock_with_then`]: Mutex::try_lock_with_then +pub struct Mutex { + pub(super) inner: inner::Mutex>, +} + +// SAFETY: `inner::Mutex` is `Send` if `T` is `Send`. +unsafe impl Send for Mutex {} +// SAFETY: `inner::Mutex` is `Sync` if `T` is `Send`. +unsafe impl Sync for Mutex {} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. 
+ /// + /// # Examples + /// + /// ``` + /// use mcslock::parking::raw; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// const MUTEX: Mutex = Mutex::new(0); + /// let mutex = Mutex::new(0); + /// ``` + #[cfg(not(all(loom, test)))] + #[inline] + pub const fn new(value: T) -> Self { + Self { inner: inner::Mutex::new(value) } + } + + /// Creates a new unlocked mutex with Loom primitives (non-const). + #[cfg(all(loom, test))] + #[cfg(not(tarpaulin_include))] + fn new(value: T) -> Self { + Self { inner: inner::Mutex::new(value) } + } + + /// Consumes this mutex, returning the underlying data. + /// + /// # Examples + /// + /// ``` + /// use mcslock::parking::raw; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// let mutex = Mutex::new(0); + /// assert_eq!(mutex.into_inner(), 0); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl Mutex { + /// Attempts to acquire this mutex and then runs a closure against the + /// proteced data. + /// + /// If the lock could not be acquired at this time, then [`None`] is returned. + /// Otherwise, an [`Some`] with a `&mut T` is returned. The lock will be + /// unlocked once the closure goes out of scope. + /// + /// This function transparently allocates a [`MutexNode`] in the stack for + /// each call, and so it will not reuse the same node for other calls. + /// Consider callig [`try_lock_with_then`] if you want to reuse node + /// allocations. + /// + /// This function does not block. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// c_mutex.try_lock_then(|data| { + /// if let Some(data) = data { + /// *data = 10; + /// } else { + /// println!("try_lock failed"); + /// } + /// }); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// let value = mutex.lock_then(|data| *data); + /// assert_eq!(value, 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::spins::Mutex; + /// + /// let mutex = Mutex::new(1); + /// let borrow = mutex.try_lock_then(|data| &*data.unwrap()); + /// ``` + /// [`try_lock_with_then`]: Mutex::try_lock_with_then + #[inline] + pub fn try_lock_then(&self, f: F) -> Ret + where + F: FnOnce(Option<&mut T>) -> Ret, + { + self.try_lock_with_then(&mut MutexNode::new(), f) + } + + /// Attempts to acquire this mutex and then runs a closure against the + /// protected data. + /// + /// If the lock could not be acquired at this time, then [`None`] is returned. + /// Otherwise, an [`Some`] with a `&mut T` is returned. The lock will be + /// unlocked once the closure goes out of scope. + /// + /// To acquire a MCS lock through this function, it's also required a mutably + /// borrowed queue node, which is a record that keeps a link for forming the + /// queue, see [`MutexNode`]. + /// + /// This function does not block. 
+ /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw::{self, MutexNode}; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// let mut node = MutexNode::new(); + /// c_mutex.try_lock_with_then(&mut node, |data| { + /// if let Some(data) = data { + /// *data = 10; + /// } else { + /// println!("try_lock failed"); + /// } + /// }); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// let mut node = MutexNode::new(); + /// let value = mutex.lock_with_then(&mut node, |data| *data); + /// assert_eq!(value, 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::{spins::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(1); + /// let mut node = MutexNode::new(); + /// let borrow = mutex.try_lock_with_then(&mut node, |data| &*data.unwrap()); + /// ``` + #[inline] + pub fn try_lock_with_then<'a, F, Ret>(&'a self, node: &'a mut MutexNode, f: F) -> Ret + where + F: FnOnce(Option<&mut T>) -> Ret, + { + self.inner.try_lock_with_then(&mut node.inner, f) + } + + /// Acquires this mutex and then runs the closure against the protected data. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon acquiring the mutex, the user provided closure will be + /// executed against the mutex proteced data. Once the closure goes out of + /// scope, it will unlock the mutex. + /// + /// This function transparently allocates a [`MutexNode`] in the stack for + /// each call, and so it will not reuse the same node for other calls. + /// Consider callig [`lock_with_then`] if you want to reuse node + /// allocations. + /// + /// This function will block if the lock is unavailable. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// c_mutex.lock_then(|data| *data = 10); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(mutex.lock_then(|data| *data), 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::spins::Mutex; + /// + /// let mutex = Mutex::new(1); + /// let borrow = mutex.lock_then(|data| &*data); + /// ``` + /// [`lock_with_then`]: Mutex::lock_with_then + #[inline] + pub fn lock_then(&self, f: F) -> Ret + where + F: FnOnce(&mut T) -> Ret, + { + self.lock_with_then(&mut MutexNode::new(), f) + } + + /// Acquires this mutex and then runs the closure against the proteced data. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon acquiring the mutex, the user provided closure will be + /// executed against the mutex proteced data. Once the closure goes out of + /// scope, it will unlock the mutex. + /// + /// To acquire a MCS lock through this function, it's also required a mutably + /// borrowed queue node, which is a record that keeps a link for forming the + /// queue, see [`MutexNode`]. + /// + /// This function will block if the lock is unavailable. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw::{self, MutexNode}; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// let mut node = MutexNode::new(); + /// c_mutex.lock_with_then(&mut node, |data| { + /// *data = 10; + /// }); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// let mut node = MutexNode::new(); + /// assert_eq!(mutex.lock_with_then(&mut node, |data| *data), 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::{spins::Mutex, MutexNode}; + /// + /// let mutex = Mutex::new(1); + /// let mut node = MutexNode::new(); + /// let borrow = mutex.lock_with_then(&mut node, |data| &*data); + /// ``` + #[inline] + pub fn lock_with_then<'a, F, Ret>(&'a self, node: &'a mut MutexNode, f: F) -> Ret + where + F: FnOnce(&mut T) -> Ret, + { + self.inner.lock_with_then(&mut node.inner, f) + } +} + +impl Mutex { + /// Returns `true` if the lock is currently held. + /// + /// This method does not provide any synchronization guarantees, so its only + /// useful as a heuristic, and so must be considered not up to date. + /// + /// # Example + /// + /// ``` + /// use mcslock::parking::raw; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// let mutex = Mutex::new(0); + /// + /// mutex.lock_then(|_data| { + /// assert_eq!(mutex.is_locked(), true); + /// }); + /// + /// assert_eq!(mutex.is_locked(), false); + /// ``` + #[inline] + pub fn is_locked(&self) -> bool { + self.inner.is_locked() + } + + /// Returns a mutable reference to the underlying data. 
+ /// + /// Since this call borrows the `Mutex` mutably, no actual locking needs to + /// take place - the mutable borrow statically guarantees no locks exist. + /// + /// # Examples + /// + /// ``` + /// use mcslock::parking::raw; + /// use mcslock::parking::park::SpinThenPark; + /// + /// type Mutex = raw::Mutex; + /// + /// let mut mutex = Mutex::new(0); + /// *mutex.get_mut() = 10; + /// + /// assert_eq!(mutex.lock_then(|data| *data), 10); + /// ``` + #[cfg(not(all(loom, test)))] + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } +} + +impl Default for Mutex { + /// Creates a `Mutex`, with the `Default` value for `T`. + #[inline] + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl From for Mutex { + /// Creates a `Mutex` from a instance of `T`. + #[inline] + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl Debug for Mutex { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +#[cfg(test)] +impl LockNew for Mutex { + type Target = T; + + fn new(value: Self::Target) -> Self + where + Self::Target: Sized, + { + Self::new(value) + } +} + +#[cfg(test)] +impl LockWithThen for Mutex { + type Node = MutexNode; + + type Guard<'a> + = &'a mut Self::Target + where + Self: 'a, + Self::Target: 'a; + + fn lock_with_then(&self, node: &mut Self::Node, f: F) -> Ret + where + F: FnOnce(&mut Self::Target) -> Ret, + { + self.lock_with_then(node, f) + } +} + +#[cfg(test)] +impl TryLockWithThen for Mutex { + fn try_lock_with_then(&self, node: &mut Self::Node, f: F) -> Ret + where + F: FnOnce(Option<&mut Self::Target>) -> Ret, + { + self.try_lock_with_then(node, f) + } + + fn is_locked(&self) -> bool { + self.is_locked() + } +} + +#[cfg(all(not(loom), test))] +impl crate::test::LockData for Mutex { + fn into_inner(self) -> Self::Target + where + Self::Target: Sized, + { + self.into_inner() + } + + fn get_mut(&mut self) -> &mut Self::Target { + self.get_mut() + } +} + +#[cfg(test)] 
+impl LockThen for Mutex { + fn lock_then(&self, f: F) -> Ret + where + F: FnOnce(&mut Self::Target) -> Ret, + { + self.lock_then(f) + } +} + +#[cfg(test)] +impl TryLockThen for Mutex { + fn try_lock_then(&self, f: F) -> Ret + where + F: FnOnce(Option<&mut Self::Target>) -> Ret, + { + self.try_lock_then(f) + } +} + +#[cfg(all(not(loom), test))] +mod test { + use crate::parking::raw::{immediate, yields}; + use crate::test::tests; + + type Mutex = immediate::Mutex; + + #[test] + fn node_waiter_drop_does_not_matter() { + tests::node_waiter_drop_does_not_matter::(); + } + + #[test] + fn lots_and_lots_lock_yield_backoff_then_park() { + tests::lots_and_lots_lock::>(); + } + + #[test] + fn lots_and_lots_try_lock_yield_backoff_then_park() { + tests::lots_and_lots_try_lock::>(); + } + + #[test] + fn lots_and_lots_mixed_lock_yield_backoff_then_park() { + tests::lots_and_lots_mixed_lock::>(); + } + + #[test] + fn smoke() { + tests::smoke::>(); + } + + #[test] + fn test_mutex_debug() { + tests::test_mutex_debug::>(); + } + + #[test] + fn test_mutex_from() { + tests::test_mutex_from::>(); + } + + #[test] + fn test_mutex_default() { + tests::test_mutex_default::>(); + } + + #[test] + fn test_try_lock() { + tests::test_try_lock::>(); + } + + #[test] + fn test_into_inner() { + tests::test_into_inner::>(); + } + + #[test] + fn test_into_inner_drop() { + tests::test_into_inner_drop::>(); + } + + #[test] + fn test_get_mut() { + tests::test_get_mut::>(); + } + + #[test] + fn test_lock_arc_nested() { + tests::test_lock_arc_nested::, Mutex<_>>(); + } + + #[test] + fn test_acquire_more_than_one_lock() { + tests::test_acquire_more_than_one_lock::>(); + } + + #[test] + fn test_lock_arc_access_in_unwind() { + tests::test_lock_arc_access_in_unwind::>(); + } + + #[test] + fn test_lock_unsized() { + tests::test_lock_unsized::>(); + } +} + +#[cfg(all(loom, test))] +mod model { + use crate::loom::models; + use crate::parking::raw::immediate::Mutex; + + #[test] + fn try_lock_join() { + 
models::try_lock_join::>(); + } + + #[test] + fn lock_join() { + models::lock_join::>(); + } + + #[test] + fn mixed_lock_join() { + models::mixed_lock_join::>(); + } +} diff --git a/src/parking/raw/thread_local.rs b/src/parking/raw/thread_local.rs new file mode 100644 index 0000000..0d72942 --- /dev/null +++ b/src/parking/raw/thread_local.rs @@ -0,0 +1,743 @@ +use core::cell::RefCell; + +use super::{Mutex, MutexNode}; +use crate::cfg::thread::LocalKey; +use crate::inner::raw as inner; +use crate::parking::park::Park; + +#[cfg(test)] +use crate::test::{LockNew, LockThen, LockWithThen, TryLockThen, TryLockWithThen}; + +type Key = &'static LocalMutexNode; + +/// Declares a new [`parking::LocalMutexNode`] key, which is a handle to the +/// thread local node of the currently running thread. +/// +/// The macro wraps any number of static declarations and make them thread +/// local. Each provided name is associated with a single thread local key. The +/// keys are wrapped and managed by the [`LocalMutexNode`] type, which are the +/// actual handles meant to be used with the `lock_with_local_then` API family +/// from [`raw::Mutex`]. Handles are provided by reference to functions. +/// +/// See: [`try_lock_with_local_then`], [`lock_with_local_then`], +/// [`try_lock_with_local_then_unchecked`] or [`lock_with_local_then_unchecked`]. +/// +/// The thread local node declaration generated by this macro avoids lazy +/// initialization and does not need to be dropped, which enables a more +/// efficient underlying implementation. See [`std::thread_local!`] macro. +/// +/// # Sintax +/// +/// * Allows multiple static declarations, must be separated with semicolons. +/// * Visibility is optional (private by default). +/// * Requires `static` keyword and a **UPPER_SNAKE_CASE** name. +/// +/// # Example +/// +/// ``` +/// use mcslock::parking::raw::spins::Mutex; +/// +/// // Multiple declarations allowed. +/// mcslock::thread_local_parking_node! 
{ +/// pub static NODE; +/// static OTHER_NODE; +/// } +/// +/// let mutex = Mutex::new(0); +/// // Keys are provided to APIs by reference. +/// mutex.lock_with_local_then(&NODE, |data| *data = 10); +/// assert_eq!(mutex.lock_with_local_then(&NODE, |data| *data), 10); +/// ``` +/// [`raw::Mutex`]: Mutex +/// [`parking::LocalMutexNode`]: LocalMutexNode +/// [`std::thread_local!`]: https://doc.rust-lang.org/std/macro.thread_local.html +/// [`try_lock_with_local_then`]: Mutex::try_lock_with_local_then +/// [`lock_with_local_then`]: Mutex::lock_with_local_then +/// [`try_lock_with_local_then_unchecked`]: Mutex::try_lock_with_local_then_unchecked +/// [`lock_with_local_then_unchecked`]: Mutex::lock_with_local_then_unchecked +#[macro_export] +macro_rules! thread_local_parking_node { + // Empty (base for recursion). + () => {}; + // Process multiple declarations (recursive). + ($vis:vis static $node:ident; $($rest:tt)*) => { + $crate::__thread_local_node_inner! { $vis $node, parking::raw } + $crate::thread_local_parking_node! { $($rest)* } + }; + // Process single declaration. + ($vis:vis static $node:ident) => { + $crate::__thread_local_node_inner! { $vis $node, parking::raw } + }; +} + +/// A handle to a [`MutexNode`] stored at the thread local storage. +/// +/// Thread local nodes can be claimed for temporary, exclusive access during +/// runtime for locking purposes. Node handles refer to the node stored at +/// the current running thread. +/// +/// Just like `MutexNode`, this is an opaque type that holds metadata for the +/// [`parking::raw::Mutex`]'s waiting queue. You must declare a thread local node +/// with the [`thread_local_parking_node!`] macro, and provide the generated +/// handle to the appropriate [`parking::raw::Mutex`] locking APIs. Attempting to +/// lock a mutex with a thread local node that already is in use for the locking +/// thread will cause a panic. Handles are provided by reference to functions. 
+/// +/// See: [`try_lock_with_local_then`], [`lock_with_local_then`], +/// [`try_lock_with_local_then_unchecked`] or [`lock_with_local_then_unchecked`]. +/// +/// [`MutexNode`]: MutexNode +/// [`parking::raw::Mutex`]: Mutex +/// [`try_lock_with_local_then`]: Mutex::try_lock_with_local_then +/// [`lock_with_local_then`]: Mutex::lock_with_local_then +/// [`try_lock_with_local_then_unchecked`]: Mutex::try_lock_with_local_then_unchecked +/// [`lock_with_local_then_unchecked`]: Mutex::lock_with_local_then_unchecked +#[derive(Debug)] +#[repr(transparent)] +pub struct LocalMutexNode { + pub(crate) inner: inner::LocalMutexNode, +} + +#[cfg(not(tarpaulin_include))] +impl LocalMutexNode { + /// Creates a new `LocalMutexNode` key from the provided thread local node + /// key. + /// + /// This function is **NOT** part of the public API and so must not be + /// called directly by user's code. It is subjected to changes **WITHOUT** + /// prior notice or accompanied with relevant SemVer changes. + #[cfg(not(all(loom, test)))] + #[doc(hidden)] + #[must_use] + #[inline(always)] + pub const fn __new(key: LocalKey>) -> Self { + Self { inner: inner::LocalMutexNode::new(key) } + } + + /// Creates a new Loom based `LocalMutexNode` key from the provided thread + /// local node key (non-const). + #[cfg(all(loom, test))] + pub(crate) const fn new(key: &'static LocalKey>) -> Self { + Self { inner: inner::LocalMutexNode::new(key) } + } +} + +impl Mutex { + /// Attempts to acquire this mutex and then runs a closure against the + /// protected data. + /// + /// If the lock could not be acquired at this time, then a [`None`] value is + /// given back as the closure argument. If the lock has been acquired, then + /// a [`Some`] value with the mutex proteced data is given instead. The lock + /// will be unlocked when the closure scope ends. 
+ /// + /// To acquire a MCS lock through this function, it's also required a + /// queue node, which is a record that keeps a link for forming the queue, + /// to be stored in the current locking thread local storage. See + /// [`LocalMutexNode`] and [`thread_local_parking_node!`]. + /// + /// This function does not block. + /// + /// # Panics + /// + /// Will panic if the thread local node is already mutably borrowed. + /// + /// Panics if the key currently has its destructor running, and it **may** + /// panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// c_mutex.try_lock_with_local_then(&NODE, |data| { + /// if let Some(data) = data { + /// *data = 10; + /// } else { + /// println!("try_lock_with_local failed"); + /// } + /// }); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(mutex.lock_with_local_then(&NODE, |data| *data), 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(1); + /// let borrow = mutex.try_lock_with_local_then(&NODE, |data| &*data.unwrap()); + /// ``` + /// + /// Panic: thread local node cannot be borrowed more than once at the same + /// time: + /// + #[doc = concat!("```should_panic(expected = ", already_borrowed_error!(), ")")] + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! 
{ static NODE } + /// + /// let mutex = Mutex::new(0); + /// + /// mutex.lock_with_local_then(&NODE, |_data| { + /// // `NODE` is already mutably borrowed in this thread by the + /// // enclosing `lock_with_local_then`, the borrow is live for the full + /// // duration of this closure scope. + /// let mutex = Mutex::new(()); + /// mutex.try_lock_with_local_then(&NODE, |_data| ()); + /// }); + #[doc = "```"] + #[inline] + #[track_caller] + pub fn try_lock_with_local_then(&self, node: Key, f: F) -> Ret + where + F: FnOnce(Option<&mut T>) -> Ret, + { + self.inner.try_lock_with_local_then(&node.inner, f) + } + + /// Attempts to acquire this mutex and then runs a closure against the + /// protected data. + /// + /// If the lock could not be acquired at this time, then a [`None`] value is + /// given back as the closure argument. If the lock has been acquired, then + /// a [`Some`] value with the mutex protected data is given instead. The lock + /// will be unlocked when the closure scope ends. + /// + /// To acquire a MCS lock through this function, it's also required a + /// queue node, which is a record that keeps a link for forming the queue, + /// to be stored in the current locking thread local storage. See + /// [`LocalMutexNode`] and [`thread_local_parking_node!`]. + /// + /// This function does not block. + /// + /// # Safety + /// + /// Unlike [`try_lock_with_local_then`], this method is unsafe because it does + /// not check if the current thread local node is already mutably borrowed. + /// If the current thread local node is already borrowed, calling this + /// function is undefined behavior. + /// + /// # Panics + /// + /// Panics if the key currently has its destructor running, and it **may** + /// panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! 
{ static NODE } + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || unsafe { + /// c_mutex.try_lock_with_local_then_unchecked(&NODE, |guard| { + /// if let Some(mut guard) = guard { + /// *guard = 10; + /// } else { + /// println!("try_lock_with_local_then_unchecked failed"); + /// } + /// }); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(mutex.lock_with_local_then(&NODE, |guard| *guard), 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(1); + /// let data = unsafe { + /// mutex.try_lock_with_local_then_unchecked(&NODE, |d| &*d.unwrap()) + /// }; + /// ``` + /// + /// Undefined behavior: thread local node cannot be borrowed more than once + /// at the same time: + /// + /// ```no_run + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(0); + /// + /// mutex.lock_with_local_then(&NODE, |_data| unsafe { + /// // UB: `NODE` is already mutably borrowed in this thread by the + /// // enclosing `lock_with_local`, the borrow is live for the full + /// // duration of this closure scope. + /// let mutex = Mutex::new(()); + /// mutex.try_lock_with_local_then_unchecked(&NODE, |_data| ()); + /// }); + /// ``` + /// [`try_lock_with_local_then`]: Mutex::try_lock_with_local_then + #[inline] + pub unsafe fn try_lock_with_local_then_unchecked(&self, node: Key, f: F) -> Ret + where + F: FnOnce(Option<&mut T>) -> Ret, + { + // SAFETY: Caller guaranteed that we have exclusive access over `node`. + unsafe { self.inner.try_lock_with_local_then_unchecked(&node.inner, f) } + } + + /// Acquires this mutex and then runs the closure against the protected data. 
+ /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon acquiring the mutex, the user provided closure will be + /// executed against the mutex protected data. Once the closure goes out of + /// scope, it will unlock the mutex. + /// + /// To acquire a MCS lock through this function, it's also required a + /// queue node, which is a record that keeps a link for forming the queue, + /// to be stored in the current locking thread local storage. See + /// [`LocalMutexNode`] and [`thread_local_parking_node!`]. + /// + /// This function will block if the lock is unavailable. + /// + /// # Panics + /// + /// Will panic if the thread local node is already mutably borrowed. + /// + /// Panics if the key currently has its destructor running, and it **may** + /// panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// c_mutex.lock_with_local_then(&NODE, |data| *data = 10); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(mutex.lock_with_local_then(&NODE, |data| *data), 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! 
{ static NODE } + /// + /// let mutex = Mutex::new(1); + /// let borrow = mutex.lock_with_local(&NODE, |guard| &*guard); + /// ``` + /// + /// Panic: thread local node cannot be borrowed more than once at the same + /// time: + /// + #[doc = concat!("```should_panic(expected = ", already_borrowed_error!(), ")")] + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(0); + /// + /// mutex.lock_with_local_then(&NODE, |_data| { + /// // `NODE` is already mutably borrowed in this thread by the enclosing + /// // `lock_with_local_then`, the borrow is live for the full duration + /// // of this closure scope. + /// let mutex = Mutex::new(()); + /// mutex.lock_with_local_then(&NODE, |_data| ()); + /// }); + #[doc = "```"] + #[inline] + #[track_caller] + pub fn lock_with_local_then(&self, node: Key, f: F) -> Ret + where + F: FnOnce(&mut T) -> Ret, + { + self.inner.lock_with_local_then(&node.inner, f) + } + + /// Acquires this mutex and then runs the closure against the protected data. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon acquiring the mutex, the user provided closure will be + /// executed against the mutex protected data. Once the closure goes out of + /// scope, it will unlock the mutex. + /// + /// To acquire a MCS lock through this function, it's also required a + /// queue node, which is a record that keeps a link for forming the queue, + /// to be stored in the current locking thread local storage. See + /// [`LocalMutexNode`] and [`thread_local_parking_node!`]. + /// + /// This function will block if the lock is unavailable. + /// + /// # Safety + /// + /// Unlike [`lock_with_local_then`], this method is unsafe because it does not + /// check if the current thread local node is already mutably borrowed. 
If + /// the current thread local node is already borrowed, calling this + /// function is undefined behavior. + /// + /// # Panics + /// + /// Panics if the key currently has its destructor running, and it **may** + /// panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::thread; + /// + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || unsafe { + /// c_mutex.lock_with_local_then_unchecked(&NODE, |data| *data = 10); + /// }) + /// .join().expect("thread::spawn failed"); + /// + /// assert_eq!(mutex.lock_with_local_then(&NODE, |data| *data), 10); + /// ``` + /// + /// Compile fail: borrows of the data cannot escape the given closure: + /// + /// ```compile_fail,E0515 + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(1); + /// let data = unsafe { + /// mutex.lock_with_local_then_unchecked(&NODE, |data| &*data) + /// }; + /// ``` + /// + /// Undefined behavior: thread local node cannot be borrowed more than once + /// at the same time: + /// + /// ```no_run + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(0); + /// + /// mutex.lock_with_local_then(&NODE, |_data| unsafe { + /// // UB: `NODE` is already mutably borrowed in this thread by the + /// // enclosing `lock_with_local_then`, the borrow is live for the full + /// // duration of this closure scope. 
+ /// let mutex = Mutex::new(()); + /// mutex.lock_with_local_then_unchecked(&NODE, |_data| ()); + /// }); + /// ``` + /// [`lock_with_local_then`]: Mutex::lock_with_local_then + #[inline] + pub unsafe fn lock_with_local_then_unchecked(&self, node: Key, f: F) -> Ret + where + F: FnOnce(&mut T) -> Ret, + { + // SAFETY: Caller guaranteed that we have exclusive access over `node`. + unsafe { self.inner.lock_with_local_then_unchecked(&node.inner, f) } + } + + /// Mutable borrows must not escape the closure. + /// + /// ```compile_fail + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(1); + /// let borrow = mutex.lock_with_local_then(&NODE, |data| data); + /// ``` + /// + /// ```compile_fail,E0521 + /// use std::thread; + /// use mcslock::parking::raw::spins::Mutex; + /// + /// mcslock::thread_local_parking_node! { static NODE } + /// + /// let mutex = Mutex::new(1); + /// mutex.lock_with_local_then(&NODE, |data| { + /// thread::spawn(move || { + /// let data = data; + /// }); + /// }); + /// ``` + #[cfg(doctest)] + #[cfg(not(tarpaulin_include))] + const fn __borrows_must_not_escape_closure() {} +} + +// A thread local node declaration used for testing. +// +// NOTE: Be mindfull of usage since it is a module global name. +#[cfg(test)] +#[cfg(not(tarpaulin_include))] +thread_local_parking_node! { static TEST_NODE } + +/// A Mutex wrapper type that calls `lock_with_local_then` and +/// `try_lock_with_local_then` when implementing testing traits. +#[cfg(test)] +struct MutexPanic(Mutex); + +#[cfg(test)] +impl LockNew for MutexPanic { + type Target = T; + + fn new(value: Self::Target) -> Self + where + Self::Target: Sized, + { + Self(Mutex::new(value)) + } +} + +#[cfg(test)] +impl LockWithThen for MutexPanic { + // A thread local node is transparently accessed instead. 
+ type Node = (); + + type Guard<'a> + = &'a mut Self::Target + where + Self: 'a, + Self::Target: 'a; + + fn lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(&mut Self::Target) -> Ret, + { + self.0.lock_with_local_then(&TEST_NODE, f) + } +} + +#[cfg(test)] +impl TryLockWithThen for MutexPanic { + fn try_lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(Option<&mut Self::Target>) -> Ret, + { + self.0.try_lock_with_local_then(&TEST_NODE, f) + } + + fn is_locked(&self) -> bool { + self.0.is_locked() + } +} + +#[cfg(test)] +impl LockThen for MutexPanic {} + +#[cfg(test)] +impl TryLockThen for MutexPanic {} + +/// A Mutex wrapper type that calls `lock_with_local_then_unchecked` and +/// `try_lock_with_local_then_unchecked` when implementing testing traits. +#[cfg(test)] +struct MutexUnchecked(Mutex); + +#[cfg(test)] +impl LockNew for MutexUnchecked { + type Target = T; + + fn new(value: Self::Target) -> Self + where + Self::Target: Sized, + { + Self(Mutex::new(value)) + } +} + +#[cfg(test)] +impl LockWithThen for MutexUnchecked { + // A thread local node is transparently accessed instead. + type Node = (); + + type Guard<'a> + = &'a mut Self::Target + where + Self: 'a, + Self::Target: 'a; + + fn lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(&mut Self::Target) -> Ret, + { + // SAFETY: caller must guarantee that this thread local node is not + // already mutably borrowed for some other lock acquisition. + unsafe { self.0.lock_with_local_then_unchecked(&TEST_NODE, f) } + } +} + +#[cfg(test)] +impl TryLockWithThen for MutexUnchecked { + fn try_lock_with_then(&self, (): &mut Self::Node, f: F) -> Ret + where + F: FnOnce(Option<&mut Self::Target>) -> Ret, + { + // SAFETY: caller must guarantee that this thread local node is not + // already mutably borrowed for some other lock acquisition. 
+ unsafe { self.0.try_lock_with_local_then_unchecked(&TEST_NODE, f) } + } + + fn is_locked(&self) -> bool { + self.0.is_locked() + } +} + +#[cfg(test)] +impl LockThen for MutexUnchecked {} + +#[cfg(test)] +impl TryLockThen for MutexUnchecked {} + +#[cfg(all(not(loom), test))] +mod test { + use crate::parking::park::ImmediatePark; + use crate::parking::raw::MutexNode; + use crate::test::tests; + + type MutexPanic = super::MutexPanic; + type MutexUnchecked = super::MutexUnchecked; + + #[test] + fn ref_cell_node_drop_does_not_matter() { + use core::{cell::RefCell, mem}; + assert!(!mem::needs_drop::>()); + } + + #[test] + fn lots_and_lots_lock() { + tests::lots_and_lots_lock::>(); + } + + #[test] + fn lots_and_lots_lock_unchecked() { + tests::lots_and_lots_lock::>(); + } + + #[test] + fn smoke() { + tests::smoke::>(); + } + + #[test] + fn smoke_unchecked() { + tests::smoke::>(); + } + + #[test] + fn test_try_lock() { + tests::test_try_lock::>(); + } + + #[test] + fn test_try_lock_unchecked() { + tests::test_try_lock::>(); + } + + #[test] + #[should_panic = already_borrowed_error!()] + fn test_lock_arc_nested() { + tests::test_lock_arc_nested::, MutexPanic<_>>(); + } + + #[test] + #[should_panic = already_borrowed_error!()] + fn test_acquire_more_than_one_lock() { + tests::test_acquire_more_than_one_lock::>(); + } + + #[test] + fn test_lock_arc_access_in_unwind() { + tests::test_lock_arc_access_in_unwind::>(); + } + + #[test] + fn test_lock_arc_access_in_unwind_unchecked() { + tests::test_lock_arc_access_in_unwind::>(); + } + + #[test] + fn test_lock_unsized() { + tests::test_lock_unsized::>(); + } + + #[test] + fn test_lock_unsized_unchecked() { + tests::test_lock_unsized::>(); + } +} + +#[cfg(all(loom, test))] +mod model { + use crate::loom::models; + use crate::parking::park::ImmediatePark; + + type MutexPanic = super::MutexPanic; + type MutexUnchecked = super::MutexUnchecked; + + #[test] + fn try_lock_join_panic() { + models::try_lock_join::>(); + } + + #[test] + fn 
lock_join_panic() { + models::lock_join::>(); + } + + #[test] + fn mixed_lock_join_panic() { + models::mixed_lock_join::>(); + } + + #[test] + fn try_lock_join_unchecked() { + models::try_lock_join::>(); + } + + #[test] + fn lock_join_unchecked() { + models::lock_join::>(); + } + + #[test] + fn mixed_lock_join_unchecked() { + models::mixed_lock_join::>(); + } +} diff --git a/src/raw/thread_local.rs b/src/raw/thread_local.rs index 3ad3b90..a242197 100644 --- a/src/raw/thread_local.rs +++ b/src/raw/thread_local.rs @@ -454,7 +454,7 @@ impl Mutex { /// ```no_run /// use mcslock::raw::spins::Mutex; /// - /// mcslock::thread_local_node!{ static NODE } + /// mcslock::thread_local_node! { static NODE } /// /// let mutex = Mutex::new(0); /// @@ -505,7 +505,7 @@ impl Mutex { const fn __borrows_must_not_escape_closure() {} } -// A thread local node definition used for testing. +// A thread local node declaration used for testing. // // NOTE: Be mindfull of usage since it is a module global name. #[cfg(test)] diff --git a/src/relax.rs b/src/relax.rs index bcd11da..eefec63 100644 --- a/src/relax.rs +++ b/src/relax.rs @@ -301,18 +301,23 @@ mod wait { use crate::lock::Wait; use crate::relax::Relax; + #[cfg(feature = "parking")] + use crate::parking::park::CantPark; + /// A generic relaxed waiter, that implements [`Relax`] so long as `R` /// implements it too. /// /// This saves us from defining a blanket [`Wait`] impl for a generic `T` where /// `T` implements [`Relax`], because that would prevent us from implementing - /// `Wait` for `T` where `T` implements some other target trait, since they - /// would conflict. + /// `Wait` for `T` when it implements [`Park`], since they would conflict. We + /// need both `Relax` and `Park` types to implement `Wait`. 
pub struct RelaxWait(PhantomData); impl Wait for RelaxWait { type LockRelax = R; type UnlockRelax = R; + #[cfg(feature = "parking")] + type Park = CantPark; } } diff --git a/src/thread_local.rs b/src/thread_local.rs index 2aadb0d..3664239 100644 --- a/src/thread_local.rs +++ b/src/thread_local.rs @@ -1,4 +1,5 @@ -/// Non-recursive, inner definition of `thread_local_node!`. +/// Non-recursive, inner definition of `thread_local_node!` and +/// `thread_local_parking_node!`. /// /// This macro is **NOT** part of the public API and so must not be called /// directly by user's code. It is subjected to changes **WITHOUT** prior @@ -19,7 +20,8 @@ macro_rules! __thread_local_node_inner { }; } -/// Non-recursive, Loom based inner definition of `thread_local_node!`. +/// Non-recursive, Loom based inner definition of `thread_local_node!` and +/// `thread_local_parking_node!`. /// /// This node declaration uses Loom primitives and it can't be evaluated at /// compile-time since Loom does not support that feature. Loom's `thread_local!`