Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 95 additions & 1 deletion pulse-binding/src/mainloop/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@

use std::os::raw::{c_ulong, c_void};
use std::rc::Rc;
use std::sync::{Arc, Mutex};
#[cfg(not(windows))]
use libc::pollfd;
#[cfg(windows)]
Expand Down Expand Up @@ -268,6 +269,85 @@ impl IterateResult {
pub struct Mainloop {
/// The ref-counted inner data.
pub _inner: Rc<MainloopInner<MainloopInternal>>,

/// Shared with [`MainloopWaker`] clones; invalidated on drop.
waker_handle: Arc<MainloopWakerHandle>,
}

/// Storage shared between a [`Mainloop`] and its [`MainloopWaker`] clones. The
/// pointer is nulled by `Mainloop::drop` under the mutex, before the underlying
/// main loop is freed, so wakers can never observe a freed pointer.
struct MainloopWakerHandle {
ptr: Mutex<*mut MainloopInternal>,
}

// SAFETY: `pa_mainloop_wakeup` is documented as thread safe; the pointer is
// never used outside that call, serialised by the mutex.
unsafe impl Send for MainloopWakerHandle {}
unsafe impl Sync for MainloopWakerHandle {}

/// A `Send + Sync + Clone` handle for waking a [`Mainloop`] from any thread.
///
/// Obtain one from [`Mainloop::waker()`]. [`wakeup()`](Self::wakeup) wraps the
/// thread safe `pa_mainloop_wakeup` C function, and is a no-op once the owning
/// [`Mainloop`] has been dropped.
///
/// # Example
///
/// ```rust,no_run
/// use std::thread;
/// use std::time::Duration;
/// use libpulse_binding::mainloop::standard::Mainloop;
///
/// let mut mainloop = Mainloop::new().unwrap();
/// let waker = mainloop.waker();
///
/// thread::spawn(move || {
/// thread::sleep(Duration::from_secs(1));
/// waker.wakeup();
/// });
///
/// mainloop.iterate(true);
/// ```
#[derive(Clone)]
pub struct MainloopWaker {
handle: Arc<MainloopWakerHandle>,
}

impl std::fmt::Debug for MainloopWaker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MainloopWaker").finish_non_exhaustive()
}
}

impl MainloopWaker {
/// Interrupts a running poll inside the owning [`Mainloop`].
///
/// Safe to call from any thread. Becomes a no-op once the owning `Mainloop`
/// has been dropped.
#[inline]
pub fn wakeup(&self) {
// poisoning is not reachable in practice (only FFI and a pointer write
// run under this lock), but treat it as a no-op for consistency.
if let Ok(guard) = self.handle.ptr.lock() {
if !guard.is_null() {
// SAFETY: thread safe per PulseAudio docs; the pointer is kept
// valid for the duration of this call by the mutex.
unsafe { capi::pa_mainloop_wakeup(*guard); }
}
}
}
}

impl Drop for Mainloop {
fn drop(&mut self) {
// invalidate outstanding wakers before the inner `Rc` drop frees the
// main loop. concurrent `wakeup` calls hold this lock for the duration
// of `pa_mainloop_wakeup`, so no stale pointer survives this release.
if let Ok(mut guard) = self.waker_handle.ptr.lock() {
*guard = std::ptr::null_mut();
}
}
}

impl MainloopTrait for Mainloop {
Expand Down Expand Up @@ -301,7 +381,18 @@ impl Mainloop {
MainloopInner::<MainloopInternal>::new(ptr, std::mem::transmute(api_ptr),
MainloopInner::<MainloopInternal>::drop_actual, true)
};
Some(Self { _inner: Rc::new(ml_inner) })
Some(Self {
_inner: Rc::new(ml_inner),
waker_handle: Arc::new(MainloopWakerHandle { ptr: Mutex::new(ptr) }),
})
}

/// Returns a [`Send`] handle for interrupting a blocking
/// [`iterate()`](Self::iterate) / [`poll()`](Self::poll) call from another
/// thread. See [`MainloopWaker`].
#[inline]
pub fn waker(&self) -> MainloopWaker {
MainloopWaker { handle: Arc::clone(&self.waker_handle) }
}

/// Prepares for a single iteration of the main loop.
Expand Down Expand Up @@ -420,6 +511,9 @@ impl Mainloop {
}

/// Interrupts a running poll (for threaded systems).
///
/// To wake the main loop from a different thread, use [`waker()`](Self::waker)
/// to obtain a [`Send`] handle.
#[inline]
pub fn wakeup(&mut self) {
unsafe { capi::pa_mainloop_wakeup(self._inner.get_ptr()); }
Expand Down