Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/kira/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ symphonia = { version = "0.5.4", optional = true, default-features = false }
triple_buffer = "8.1.1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies.cpal]
version = "0.17.0"
version = "0.17.3"
optional = true

[target.'cfg(target_arch = "wasm32")'.dependencies.cpal]
version = "0.17.0"
version = "0.17.3"
optional = true
features = ["wasm-bindgen"]

Expand Down
18 changes: 15 additions & 3 deletions crates/kira/src/backend/cpal/desktop/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU64, Ordering},
},
thread::JoinHandle,
time::Duration,
};

Expand Down Expand Up @@ -36,6 +37,16 @@ pub(super) struct StreamManagerController {
should_drop: Arc<AtomicBool>,
num_stream_errors_discarded: Arc<AtomicU64>,
handled_stream_error_consumer: Mutex<Consumer<StreamError>>,
thread_handle: Option<JoinHandle<()>>,
}

impl Drop for StreamManagerController {
fn drop(&mut self) {
self.should_drop.store(true, Ordering::SeqCst);
if let Some(handle) = self.thread_handle.take() {
let _ = handle.join();
}
}
}

impl StreamManagerController {
Expand Down Expand Up @@ -86,7 +97,7 @@ impl StreamManager {

let (mut initial_result_producer, mut initial_result_consumer) = RingBuffer::new(1);

std::thread::spawn(move || {
let handle = std::thread::spawn(move || {
let mut stream_manager = StreamManager {
state: State::Idle { renderer },
device_id: device_id(&device),
Expand All @@ -109,7 +120,7 @@ impl StreamManager {
}
};
loop {
std::thread::sleep(CHECK_STREAM_INTERVAL);
std::thread::park_timeout(CHECK_STREAM_INTERVAL);
if should_drop.load(Ordering::SeqCst) {
break;
}
Expand All @@ -126,13 +137,14 @@ impl StreamManager {
result?;
break;
}
std::thread::sleep(Duration::from_micros(100));
std::thread::park_timeout(Duration::from_micros(100));
}

Ok(StreamManagerController {
should_drop: should_drop_clone,
num_stream_errors_discarded: num_stream_errors_discarded_clone,
handled_stream_error_consumer: Mutex::new(handled_stream_error_consumer),
thread_handle: Some(handle),
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ impl<T> DerefMut for SendOnDrop<T> {

impl<T> Drop for SendOnDrop<T> {
fn drop(&mut self) {
self.producer
.push(self.data.take().unwrap())
.expect("send on drop producer full");
if let Some(data) = self.data.take() {
let _ = self.producer.push(data);
}
}
}
34 changes: 31 additions & 3 deletions crates/kira/src/sound/streaming/sound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ pub(crate) mod decode_scheduler;
#[cfg(test)]
mod test;

use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicPtr, AtomicU8, AtomicU64, Ordering},
},
thread::JoinHandle,
};

use crate::{
Expand All @@ -28,6 +31,8 @@ pub(crate) struct Shared {
position: AtomicU64,
reached_end: AtomicBool,
encountered_error: AtomicBool,
dropped: AtomicBool,
handle: AtomicPtr<JoinHandle<()>>,
}

impl Shared {
Expand All @@ -38,6 +43,8 @@ impl Shared {
state: AtomicU8::new(PlaybackState::Playing as u8),
reached_end: AtomicBool::new(false),
encountered_error: AtomicBool::new(false),
dropped: AtomicBool::new(false),
handle: AtomicPtr::default(),
}
}

Expand All @@ -59,6 +66,11 @@ impl Shared {
self.state.store(state as u8, Ordering::SeqCst);
}

pub(crate) fn set_handle(&self, handle: JoinHandle<()>) {
self.handle
.store(Box::into_raw(Box::new(handle)), Ordering::SeqCst);
}

#[must_use]
pub fn position(&self) -> f64 {
f64::from_bits(self.position.load(Ordering::SeqCst))
Expand Down Expand Up @@ -89,6 +101,22 @@ pub(crate) struct StreamingSound {
shared: Arc<Shared>,
}

impl Drop for StreamingSound {
fn drop(&mut self) {
self.shared.dropped.store(true, Ordering::Release);
let handle = self
.shared
.handle
.swap(std::ptr::NonNull::dangling().as_ptr(), Ordering::SeqCst);
if !handle.is_null() {
// SAFETY: pointer initially stored once from a Box
let handle = unsafe { Box::from_raw(handle) };
handle.thread().unpark();
let _ = handle.join();
}
}
}

impl StreamingSound {
#[must_use]
pub(super) fn new<Error: Send + 'static>(
Expand Down
10 changes: 7 additions & 3 deletions crates/kira/src/sound/streaming/sound/decode_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ impl<Error: Send + 'static> DecodeScheduler<Error> {
}

pub fn start(mut self) {
std::thread::spawn(move || {
let shared = self.shared.clone();
let handle = std::thread::spawn(move || {
loop {
match self.run() {
Ok(result) => match result {
NextStep::Continue => {}
NextStep::Wait => std::thread::sleep(DECODER_THREAD_SLEEP_DURATION),
NextStep::Wait => std::thread::park_timeout(DECODER_THREAD_SLEEP_DURATION),
NextStep::End => break,
},
Err(error) => {
Expand All @@ -107,11 +108,14 @@ impl<Error: Send + 'static> DecodeScheduler<Error> {
}
}
});
shared.set_handle(handle);
}

pub fn run(&mut self) -> Result<NextStep, Error> {
// if the sound was manually stopped, end the thread
if self.shared.state() == PlaybackState::Stopped {
if self.shared.dropped.load(Ordering::Acquire)
|| self.shared.state() == PlaybackState::Stopped
{
return Ok(NextStep::End);
}
// if the frame ringbuffer is full, sleep for a bit
Expand Down
Loading