diff --git a/crates/kira/Cargo.toml b/crates/kira/Cargo.toml index 99bc51a1..859d3d05 100644 --- a/crates/kira/Cargo.toml +++ b/crates/kira/Cargo.toml @@ -22,11 +22,11 @@ symphonia = { version = "0.5.4", optional = true, default-features = false } triple_buffer = "8.1.1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies.cpal] -version = "0.17.0" +version = "0.17.3" optional = true [target.'cfg(target_arch = "wasm32")'.dependencies.cpal] -version = "0.17.0" +version = "0.17.3" optional = true features = ["wasm-bindgen"] diff --git a/crates/kira/src/backend/cpal/desktop/stream_manager.rs b/crates/kira/src/backend/cpal/desktop/stream_manager.rs index 14ca03f1..75e7aa27 100644 --- a/crates/kira/src/backend/cpal/desktop/stream_manager.rs +++ b/crates/kira/src/backend/cpal/desktop/stream_manager.rs @@ -5,6 +5,7 @@ use std::{ Arc, Mutex, atomic::{AtomicBool, AtomicU64, Ordering}, }, + thread::JoinHandle, time::Duration, }; @@ -36,6 +37,16 @@ pub(super) struct StreamManagerController { should_drop: Arc, num_stream_errors_discarded: Arc, handled_stream_error_consumer: Mutex>, + thread_handle: Option>, +} + +impl Drop for StreamManagerController { + fn drop(&mut self) { + self.should_drop.store(true, Ordering::SeqCst); + if let Some(handle) = self.thread_handle.take() { + let _ = handle.join(); + } + } } impl StreamManagerController { @@ -86,7 +97,7 @@ impl StreamManager { let (mut initial_result_producer, mut initial_result_consumer) = RingBuffer::new(1); - std::thread::spawn(move || { + let handle = std::thread::spawn(move || { let mut stream_manager = StreamManager { state: State::Idle { renderer }, device_id: device_id(&device), @@ -109,7 +120,7 @@ impl StreamManager { } }; loop { - std::thread::sleep(CHECK_STREAM_INTERVAL); + std::thread::park_timeout(CHECK_STREAM_INTERVAL); if should_drop.load(Ordering::SeqCst) { break; } @@ -126,13 +137,14 @@ impl StreamManager { result?; break; } - std::thread::sleep(Duration::from_micros(100)); + std::thread::park_timeout(Duration::from_micros(100)); } Ok(StreamManagerController { should_drop: should_drop_clone, num_stream_errors_discarded: num_stream_errors_discarded_clone, handled_stream_error_consumer: Mutex::new(handled_stream_error_consumer), + thread_handle: Some(handle), }) } diff --git a/crates/kira/src/backend/cpal/desktop/stream_manager/send_on_drop.rs b/crates/kira/src/backend/cpal/desktop/stream_manager/send_on_drop.rs index 0dab5ed1..ab9ef8c2 100644 --- a/crates/kira/src/backend/cpal/desktop/stream_manager/send_on_drop.rs +++ b/crates/kira/src/backend/cpal/desktop/stream_manager/send_on_drop.rs @@ -42,8 +42,8 @@ impl DerefMut for SendOnDrop { impl Drop for SendOnDrop { fn drop(&mut self) { - self.producer - .push(self.data.take().unwrap()) - .expect("send on drop producer full"); + if let Some(data) = self.data.take() { + let _ = self.producer.push(data); + } } } diff --git a/crates/kira/src/sound/streaming/sound.rs b/crates/kira/src/sound/streaming/sound.rs index bc44b998..afa89200 100644 --- a/crates/kira/src/sound/streaming/sound.rs +++ b/crates/kira/src/sound/streaming/sound.rs @@ -3,9 +3,12 @@ pub(crate) mod decode_scheduler; #[cfg(test)] mod test; -use std::sync::{ - Arc, - atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering}, +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, AtomicPtr, AtomicU8, AtomicU64, Ordering}, + }, + thread::JoinHandle, }; use crate::{ @@ -28,6 +31,8 @@ pub(crate) struct Shared { position: AtomicU64, reached_end: AtomicBool, encountered_error: AtomicBool, + dropped: AtomicBool, + handle: AtomicPtr>, } impl Shared { @@ -38,6 +43,8 @@ impl Shared { state: AtomicU8::new(PlaybackState::Playing as u8), reached_end: AtomicBool::new(false), encountered_error: AtomicBool::new(false), + dropped: AtomicBool::new(false), + handle: AtomicPtr::default(), } } @@ -59,6 +66,11 @@ impl Shared { self.state.store(state as u8, Ordering::SeqCst); } + pub(crate) fn set_handle(&self, handle: JoinHandle<()>) { + self.handle + .store(Box::into_raw(Box::new(handle)), Ordering::SeqCst); + } + #[must_use] pub fn position(&self) -> f64 { f64::from_bits(self.position.load(Ordering::SeqCst)) @@ -89,6 +101,22 @@ pub(crate) struct StreamingSound { shared: Arc, } +impl Drop for StreamingSound { + fn drop(&mut self) { + self.shared.dropped.store(true, Ordering::Release); + let handle = self + .shared + .handle + .swap(std::ptr::NonNull::dangling().as_ptr(), Ordering::SeqCst); + if !handle.is_null() { + // SAFETY: pointer initially stored once from a Box + let handle = unsafe { Box::from_raw(handle) }; + handle.thread().unpark(); + let _ = handle.join(); + } + } +} + impl StreamingSound { #[must_use] pub(super) fn new( diff --git a/crates/kira/src/sound/streaming/sound/decode_scheduler.rs b/crates/kira/src/sound/streaming/sound/decode_scheduler.rs index d3428412..52e3f5ae 100644 --- a/crates/kira/src/sound/streaming/sound/decode_scheduler.rs +++ b/crates/kira/src/sound/streaming/sound/decode_scheduler.rs @@ -92,12 +92,13 @@ impl DecodeScheduler { } pub fn start(mut self) { - std::thread::spawn(move || { + let shared = self.shared.clone(); + let handle = std::thread::spawn(move || { loop { match self.run() { Ok(result) => match result { NextStep::Continue => {} - NextStep::Wait => std::thread::sleep(DECODER_THREAD_SLEEP_DURATION), + NextStep::Wait => std::thread::park_timeout(DECODER_THREAD_SLEEP_DURATION), NextStep::End => break, }, Err(error) => { @@ -107,11 +108,14 @@ impl DecodeScheduler { } } }); + shared.set_handle(handle); } pub fn run(&mut self) -> Result { // if the sound was manually stopped, end the thread - if self.shared.state() == PlaybackState::Stopped { + if self.shared.dropped.load(Ordering::Acquire) + || self.shared.state() == PlaybackState::Stopped + { return Ok(NextStep::End); } // if the frame ringbuffer is full, sleep for a bit