From bacd71f1580cd18b7ca287ca68ddb73e6c394c90 Mon Sep 17 00:00:00 2001
From: Roderick van Domburg
Date: Wed, 12 Nov 2025 21:17:16 -0500
Subject: [PATCH] feat: implement input stream support for web backends

Both backends were returning empty configs or panicking when trying to use
microphone input. Now both properly implement getUserMedia() with async
support.
---
 CHANGELOG.md               |   2 +
 Cargo.toml                 |  16 +++
 src/host/emscripten/mod.rs | 263 ++++++++++++++++++++++++++++++-----
 src/host/webaudio/mod.rs   | 278 ++++++++++++++++++++++++++++++++-----
 4 files changed, 487 insertions(+), 72 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 907acc27c..c8212a77a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -26,6 +26,7 @@
 - CoreAudio: Make `Stream` implement `Send`.
 - CoreAudio: Remove `Clone` impl from `Stream`.
 - Emscripten: Add `BufferSize::Fixed` validation against supported range.
+- Emscripten: Implement input stream support using `getUserMedia()`.
 - iOS: Fix example by properly activating audio session.
 - iOS: Add complete AVAudioSession integration for device enumeration and buffer size control.
 - JACK: Add `BufferSize::Fixed` validation to reject requests that don't match server buffer size.
@@ -36,6 +37,7 @@
 - Wasm: Removed optional `wee-alloc` feature for security reasons.
 - Wasm: Make `Stream` implement `Send`.
 - WebAudio: Add `BufferSize::Fixed` validation against supported range.
+- WebAudio: Implement input stream support using `getUserMedia()`.

 # Version 0.16.0 (2025-06-07)

diff --git a/Cargo.toml b/Cargo.toml
index 0ff38048c..875451a75 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,6 +94,14 @@ web-sys = { version = "0.3.35", features = [
     "AudioBufferSourceNode",
     "AudioNode",
     "AudioDestinationNode",
+    "AudioProcessingEvent",
+    "MediaDevices",
+    "MediaStream",
+    "MediaStreamAudioSourceNode",
+    "MediaStreamConstraints",
+    "MediaTrackConstraints",
+    "Navigator",
+    "ScriptProcessorNode",
     "Window",
     "AudioContextState",
 ] }
@@ -108,6 +116,14 @@ web-sys = { version = "0.3.35", features = [
     "AudioBufferSourceNode",
     "AudioNode",
     "AudioDestinationNode",
+    "AudioProcessingEvent",
+    "MediaDevices",
+    "MediaStream",
+    "MediaStreamAudioSourceNode",
+    "MediaStreamConstraints",
+    "MediaTrackConstraints",
+    "Navigator",
+    "ScriptProcessorNode",
     "Window",
     "AudioContextState",
 ] }
diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs
index dafc041af..97fe9caa9 100644
--- a/src/host/emscripten/mod.rs
+++ b/src/host/emscripten/mod.rs
@@ -1,16 +1,17 @@
 use js_sys::Float32Array;
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use wasm_bindgen::prelude::*;
 use wasm_bindgen::JsCast;
 use wasm_bindgen_futures::{spawn_local, JsFuture};
-use web_sys::AudioContext;
+use web_sys::{AudioContext, AudioProcessingEvent, MediaStream, MediaStreamConstraints};

 use crate::traits::{DeviceTrait, HostTrait, StreamTrait};
 use crate::{
-    BufferSize, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError, DevicesError,
-    InputCallbackInfo, OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat,
-    SampleRate, StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig,
-    SupportedStreamConfigRange, SupportedStreamConfigsError,
+    BackendSpecificError, BufferSize, BuildStreamError, Data, DefaultStreamConfigError,
+    DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, PauseStreamError,
+    PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, SupportedBufferSize,
+    SupportedStreamConfig, SupportedStreamConfigRange, SupportedStreamConfigsError,
 };

 // The emscripten backend currently works by instantiating an `AudioContext` object per `Stream`.
@@ -69,6 +70,35 @@ impl Devices {
     }
 }

+/// Helper function to generate supported stream configurations.
+/// Emscripten/WebAudio supports the same configurations for both input and output streams.
+fn supported_configs() -> ::std::vec::IntoIter<SupportedStreamConfigRange> {
+    let buffer_size = SupportedBufferSize::Range {
+        min: MIN_BUFFER_SIZE,
+        max: MAX_BUFFER_SIZE,
+    };
+    let configs: Vec<_> = (MIN_CHANNELS..=MAX_CHANNELS)
+        .map(|channels| SupportedStreamConfigRange {
+            channels,
+            min_sample_rate: MIN_SAMPLE_RATE,
+            max_sample_rate: MAX_SAMPLE_RATE,
+            buffer_size: buffer_size.clone(),
+            sample_format: SUPPORTED_SAMPLE_FORMAT,
+        })
+        .collect();
+    configs.into_iter()
+}
+
+/// Helper function to get the default configuration.
+/// Emscripten/WebAudio uses the same logic for both input and output.
+fn default_config() -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
+    let config = supported_configs()
+        .max_by(|a, b| a.cmp_default_heuristics(b))
+        .ok_or(DefaultStreamConfigError::DeviceNotAvailable)?
+        .with_sample_rate(DEFAULT_SAMPLE_RATE);
+    Ok(config)
+}
+
 impl Device {
     #[inline]
     fn name(&self) -> Result<String, DeviceNameError> {
@@ -79,43 +109,22 @@ impl Device {
     fn supported_input_configs(
         &self,
     ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
-        unimplemented!();
+        Ok(supported_configs())
     }

     #[inline]
     fn supported_output_configs(
         &self,
     ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
-        let buffer_size = SupportedBufferSize::Range {
-            min: MIN_BUFFER_SIZE,
-            max: MAX_BUFFER_SIZE,
-        };
-        let configs: Vec<_> = (MIN_CHANNELS..=MAX_CHANNELS)
-            .map(|channels| SupportedStreamConfigRange {
-                channels,
-                min_sample_rate: MIN_SAMPLE_RATE,
-                max_sample_rate: MAX_SAMPLE_RATE,
-                buffer_size: buffer_size.clone(),
-                sample_format: SUPPORTED_SAMPLE_FORMAT,
-            })
-            .collect();
-        Ok(configs.into_iter())
+        Ok(supported_configs())
     }

     fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
-        unimplemented!();
+        default_config()
     }

     fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
-        const EXPECT: &str = "expected at least one valid webaudio stream config";
-        let config = self
-            .supported_output_configs()
-            .expect(EXPECT)
-            .max_by(|a, b| a.cmp_default_heuristics(b))
-            .unwrap()
-            .with_sample_rate(DEFAULT_SAMPLE_RATE);
-
-        Ok(config)
+        default_config()
     }
 }

@@ -172,17 +181,17 @@ impl DeviceTrait for Device {

     fn build_input_stream_raw<D, E>(
         &self,
-        _config: &StreamConfig,
-        _sample_format: SampleFormat,
-        _data_callback: D,
-        _error_callback: E,
+        config: &StreamConfig,
+        sample_format: SampleFormat,
+        data_callback: D,
+        error_callback: E,
         _timeout: Option<Duration>,
     ) -> Result<Self::Stream, BuildStreamError>
     where
         D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
         E: FnMut(StreamError) + Send + 'static,
     {
-        unimplemented!()
+        build_input_stream_emscripten(config, sample_format, data_callback, error_callback)
     }

     fn build_output_stream_raw<D, E>(
@@ -390,9 +399,191 @@ impl Iterator for Devices {
     }
 }

+fn build_input_stream_emscripten<D, E>(
+    config: &StreamConfig,
+    sample_format: SampleFormat,
+    data_callback: D,
+    error_callback: E,
+) -> Result<Stream, BuildStreamError>
+where
+    D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
+    E: FnMut(StreamError) + Send + 'static,
+{
+    if sample_format != SUPPORTED_SAMPLE_FORMAT {
+        return Err(BuildStreamError::StreamConfigNotSupported);
+    }
+
+    let n_channels = config.channels as usize;
+    let buffer_size_frames = match config.buffer_size {
+        BufferSize::Fixed(v) => {
+            if !(MIN_BUFFER_SIZE..=MAX_BUFFER_SIZE).contains(&v) {
+                return Err(BuildStreamError::StreamConfigNotSupported);
+            }
+            v as usize
+        }
+        BufferSize::Default => DEFAULT_BUFFER_SIZE,
+    };
+    let buffer_size_samples = buffer_size_frames * n_channels;
+
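+    // Units note: `buffer_size_frames` counts sample frames, while
+    // `buffer_size_samples` counts interleaved `f32` samples across all
+    // channels (e.g. 512 stereo frames = 1024 samples); the temporary
+    // capture buffer below is sized in samples.
+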
+    // Create the AudioContext
+    let audio_ctxt = AudioContext::new().map_err(|err| {
+        let description = format!("Failed to create AudioContext: {:?}", err);
+        BuildStreamError::from(BackendSpecificError { description })
+    })?;
+
+    // Get the window and navigator objects
+    let window = web_sys::window().ok_or_else(|| {
+        let description = "Failed to get window object".to_string();
+        BuildStreamError::from(BackendSpecificError { description })
+    })?;
+
+    let navigator = window.navigator();
+    let media_devices = navigator.media_devices().map_err(|err| {
+        let description = format!("Failed to get media devices: {:?}", err);
+        BuildStreamError::from(BackendSpecificError { description })
+    })?;
+
+    // Create constraints for getUserMedia
+    let constraints = MediaStreamConstraints::new();
+    constraints.set_audio(&JsValue::TRUE);
+    constraints.set_video(&JsValue::FALSE);
+
+    // Get the media stream asynchronously
+    let get_user_media_promise = media_devices
+        .get_user_media_with_constraints(&constraints)
+        .map_err(|err| {
+            let description = format!("Failed to call getUserMedia: {:?}", err);
+            BuildStreamError::from(BackendSpecificError { description })
+        })?;
+
+    // Prepare variables that will be moved into the async closure
+    let audio_ctxt_clone = audio_ctxt.clone();
+    let data_callback = Arc::new(Mutex::new(data_callback));
+    let error_callback = Arc::new(Mutex::new(error_callback));
+
+    // Spawn async task to handle the getUserMedia Promise
+    let future = async move {
+        match JsFuture::from(get_user_media_promise).await {
+            Ok(stream_js) => {
+                // Convert JsValue to MediaStream
+                let media_stream: MediaStream = match stream_js.dyn_into() {
+                    Ok(stream) => stream,
+                    Err(err) => {
+                        let mut error_cb = error_callback.lock().unwrap();
+                        let description = format!("Failed to convert to MediaStream: {:?}", err);
+                        error_cb(StreamError::BackendSpecific {
+                            err: BackendSpecificError { description },
+                        });
+                        return;
+                    }
+                };
+
+                // Create MediaStreamAudioSourceNode
+                let source = match audio_ctxt_clone.create_media_stream_source(&media_stream) {
+                    Ok(s) => s,
+                    Err(err) => {
+                        let mut error_cb = error_callback.lock().unwrap();
+                        let description =
+                            format!("Failed to create MediaStreamAudioSourceNode: {:?}", err);
+                        error_cb(StreamError::BackendSpecific {
+                            err: BackendSpecificError { description },
+                        });
+                        return;
+                    }
+                };
+
+                // Create ScriptProcessorNode for capturing audio
+                let processor = match audio_ctxt_clone.create_script_processor_with_buffer_size_and_number_of_input_channels_and_number_of_output_channels(
+                    buffer_size_frames as u32,
+                    n_channels as u32,
+                    0, // No output channels needed for input stream
+                ) {
+                    Ok(p) => p,
+                    Err(err) => {
+                        let mut error_cb = error_callback.lock().unwrap();
+                        let description = format!("Failed to create ScriptProcessorNode: {:?}", err);
+                        error_cb(StreamError::BackendSpecific {
+                            err: BackendSpecificError { description },
+                        });
+                        return;
+                    }
+                };
+
+                // Set up the onaudioprocess callback
+                let mut temporary_buffer = vec![0f32; buffer_size_samples];
+                let onaudioprocess_closure =
+                    Closure::wrap(Box::new(move |event: AudioProcessingEvent| {
+                        let input_buffer = match event.input_buffer() {
+                            Ok(buf) => buf,
+                            Err(_) => return, // Skip this callback if we can't get the buffer
+                        };
+                        let now = event.playback_time();
+
+                        // Interleave the input channels into our temporary buffer
+                        for channel in 0..n_channels {
+                            if let Ok(channel_data) = input_buffer.get_channel_data(channel as u32)
+                            {
+                                for (i, sample) in channel_data.iter().enumerate() {
+                                    if i < buffer_size_frames {
+                                        temporary_buffer[i * n_channels + channel] = *sample;
+                                    }
+                                }
+                            }
+                        }
+
+                        // Call the user's data callback
+                        let len = temporary_buffer.len();
+                        let data = temporary_buffer.as_mut_ptr() as *mut ();
+                        let data = unsafe { Data::from_parts(data, len, sample_format) };
+                        let mut callback = data_callback.lock().unwrap();
+                        let capture = crate::StreamInstant::from_secs_f64(now);
+                        let timestamp = crate::InputStreamTimestamp {
+                            callback: capture,
+                            capture,
+                        };
+                        let info = InputCallbackInfo { timestamp };
+                        callback(&data, &info);
+                    }) as Box<dyn FnMut(AudioProcessingEvent)>);
+
+                processor.set_onaudioprocess(Some(onaudioprocess_closure.as_ref().unchecked_ref()));
+                onaudioprocess_closure.forget(); // Keep closure alive
+
+                // Connect: source -> processor -> destination
+                if let Err(err) = source.connect_with_audio_node(&processor) {
+                    let mut error_cb = error_callback.lock().unwrap();
+                    let description = format!("Failed to connect source to processor: {:?}", err);
+                    error_cb(StreamError::BackendSpecific {
+                        err: BackendSpecificError { description },
+                    });
+                    return;
+                }
+
+                // Connect processor to destination (required for onaudioprocess to fire in some browsers)
+                let _ = processor.connect_with_audio_node(&audio_ctxt_clone.destination());
+            }
+            Err(err) => {
+                let mut error_cb = error_callback.lock().unwrap();
+                let description = format!("getUserMedia failed: {:?}", err);
+                error_cb(StreamError::BackendSpecific {
+                    err: BackendSpecificError { description },
+                });
+            }
+        }
+    };
+
+    // Spawn the future
+    spawn_local(future);
+
+    Ok(Stream { audio_ctxt })
+}
+
 #[inline]
 fn default_input_device() -> Option<Device> {
-    unimplemented!();
+    if is_webaudio_available() {
+        Some(Device)
+    } else {
+        None
+    }
 }

 #[inline]
diff --git a/src/host/webaudio/mod.rs b/src/host/webaudio/mod.rs
index ec4ed560f..bd7bb2c00 100644
--- a/src/host/webaudio/mod.rs
+++ b/src/host/webaudio/mod.rs
@@ -4,7 +4,10 @@ extern crate web_sys;

 use self::wasm_bindgen::prelude::*;
 use self::wasm_bindgen::JsCast;
-use self::web_sys::{AudioContext, AudioContextOptions};
+use self::web_sys::{
+    AudioContext, AudioContextOptions, AudioProcessingEvent, MediaStream,
+    MediaStreamAudioSourceNode, MediaStreamConstraints, ScriptProcessorNode,
+};
 use crate::traits::{DeviceTrait, HostTrait, StreamTrait};
 use crate::{
     BackendSpecificError, BufferSize, BuildStreamError, Data, DefaultStreamConfigError,
@@ -84,6 +87,35 @@ impl Devices {
     }
 }

+/// Helper function to generate supported stream configurations.
+/// WebAudio supports the same configurations for both input and output streams.
+fn supported_configs() -> ::std::vec::IntoIter<SupportedStreamConfigRange> {
+    let buffer_size = SupportedBufferSize::Range {
+        min: MIN_BUFFER_SIZE,
+        max: MAX_BUFFER_SIZE,
+    };
+    let configs: Vec<_> = (MIN_CHANNELS..=MAX_CHANNELS)
+        .map(|channels| SupportedStreamConfigRange {
+            channels,
+            min_sample_rate: MIN_SAMPLE_RATE,
+            max_sample_rate: MAX_SAMPLE_RATE,
+            buffer_size: buffer_size.clone(),
+            sample_format: SUPPORTED_SAMPLE_FORMAT,
+        })
+        .collect();
+    configs.into_iter()
+}
+
+/// Helper function to get the default configuration.
+/// WebAudio uses the same logic for both input and output.
+fn default_config() -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
+    let config = supported_configs()
+        .max_by(|a, b| a.cmp_default_heuristics(b))
+        .ok_or(DefaultStreamConfigError::DeviceNotAvailable)?
+        .with_sample_rate(DEFAULT_SAMPLE_RATE);
+    Ok(config)
+}
+
 impl Device {
     #[inline]
     fn name(&self) -> Result<String, DeviceNameError> {
@@ -94,47 +126,24 @@ impl Device {
     fn supported_input_configs(
         &self,
     ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
-        // TODO
-        Ok(Vec::new().into_iter())
+        Ok(supported_configs())
     }

     #[inline]
     fn supported_output_configs(
         &self,
     ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
-        let buffer_size = SupportedBufferSize::Range {
-            min: MIN_BUFFER_SIZE,
-            max: MAX_BUFFER_SIZE,
-        };
-        let configs: Vec<_> = (MIN_CHANNELS..=MAX_CHANNELS)
-            .map(|channels| SupportedStreamConfigRange {
-                channels,
-                min_sample_rate: MIN_SAMPLE_RATE,
-                max_sample_rate: MAX_SAMPLE_RATE,
-                buffer_size: buffer_size.clone(),
-                sample_format: SUPPORTED_SAMPLE_FORMAT,
-            })
-            .collect();
-        Ok(configs.into_iter())
+        Ok(supported_configs())
     }

     #[inline]
     fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
-        // TODO
-        Err(DefaultStreamConfigError::StreamTypeNotSupported)
+        default_config()
     }

     #[inline]
     fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
-        const EXPECT: &str = "expected at least one valid webaudio stream config";
-        let config = self
-            .supported_output_configs()
-            .expect(EXPECT)
-            .max_by(|a, b| a.cmp_default_heuristics(b))
-            .unwrap()
-            .with_sample_rate(DEFAULT_SAMPLE_RATE);
-
-        Ok(config)
+        default_config()
     }
 }

@@ -174,18 +183,21 @@ impl DeviceTrait for Device {

     fn build_input_stream_raw<D, E>(
         &self,
-        _config: &StreamConfig,
-        _sample_format: SampleFormat,
-        _data_callback: D,
-        _error_callback: E,
+        config: &StreamConfig,
+        sample_format: SampleFormat,
+        data_callback: D,
+        error_callback: E,
         _timeout: Option<Duration>,
     ) -> Result<Self::Stream, BuildStreamError>
     where
         D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
         E: FnMut(StreamError) + Send + 'static,
     {
-        // TODO
-        Err(BuildStreamError::StreamConfigNotSupported)
+        if !valid_config(config, sample_format) {
+            return Err(BuildStreamError::StreamConfigNotSupported);
+        }
+
+        build_input_stream(config, sample_format, data_callback, error_callback)
     }

     /// Create an output stream.
@@ -482,8 +494,11 @@ impl Iterator for Devices {

 #[inline]
 fn default_input_device() -> Option<Device> {
-    // TODO
-    None
+    if is_webaudio_available() {
+        Some(Device)
+    } else {
+        None
+    }
 }

 #[inline]
@@ -495,6 +510,197 @@ fn default_output_device() -> Option<Device> {
     }
 }

+fn build_input_stream<D, E>(
+    config: &StreamConfig,
+    sample_format: SampleFormat,
+    data_callback: D,
+    error_callback: E,
+) -> Result<Stream, BuildStreamError>
+where
+    D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
+    E: FnMut(StreamError) + Send + 'static,
+{
+    let n_channels = config.channels as usize;
+    let buffer_size_frames = match config.buffer_size {
+        BufferSize::Fixed(v) => {
+            if !(MIN_BUFFER_SIZE..=MAX_BUFFER_SIZE).contains(&v) {
+                return Err(BuildStreamError::StreamConfigNotSupported);
+            }
+            v as usize
+        }
+        BufferSize::Default => DEFAULT_BUFFER_SIZE,
+    };
+    let buffer_size_samples = buffer_size_frames * n_channels;
+
+    // Create the AudioContext
+    let mut stream_opts = AudioContextOptions::new();
+    stream_opts.sample_rate(config.sample_rate.0 as f32);
+    let ctx = AudioContext::new_with_context_options(&stream_opts).map_err(|err| {
+        let description = format!("{:?}", err);
+        let err = BackendSpecificError { description };
+        BuildStreamError::from(err)
+    })?;
+
+    let ctx = Arc::new(ctx);
+
+    // Container for managing lifecycle of closures
+    let on_ended_closures: Vec<Arc<RwLock<Option<Closure<dyn FnMut()>>>>> = Vec::new();
+
+    // Get the window and navigator objects
+    let window = web_sys::window().ok_or_else(|| {
+        let description = "Failed to get window object".to_string();
+        BuildStreamError::from(BackendSpecificError { description })
+    })?;
+
+    let navigator = window.navigator();
+    let media_devices = navigator.media_devices().map_err(|err| {
+        let description = format!("Failed to get media devices: {:?}", err);
+        BuildStreamError::from(BackendSpecificError { description })
+    })?;
+
+    // Create constraints for getUserMedia
+    let constraints = MediaStreamConstraints::new();
+    constraints.set_audio(&JsValue::TRUE);
+    constraints.set_video(&JsValue::FALSE);
+
+    // Get the media stream asynchronously
+    let get_user_media_promise = media_devices
+        .get_user_media_with_constraints(&constraints)
+        .map_err(|err| {
+            let description = format!("Failed to call getUserMedia: {:?}", err);
+            BuildStreamError::from(BackendSpecificError { description })
+        })?;
+
+    // Prepare variables that will be moved into the async closure
+    let ctx_clone = ctx.clone();
+    let data_callback = Arc::new(Mutex::new(data_callback));
+    let error_callback = Arc::new(Mutex::new(error_callback));
+
+    // Spawn async task to handle the getUserMedia Promise
+    let future = async move {
+        use wasm_bindgen_futures::JsFuture;
+
+        match JsFuture::from(get_user_media_promise).await {
+            Ok(stream_js) => {
+                // Convert JsValue to MediaStream
+                let media_stream: MediaStream = match stream_js.dyn_into() {
+                    Ok(stream) => stream,
+                    Err(err) => {
+                        let mut error_cb = error_callback.lock().unwrap();
+                        let description = format!("Failed to convert to MediaStream: {:?}", err);
+                        error_cb(StreamError::BackendSpecific {
+                            err: BackendSpecificError { description },
+                        });
+                        return;
+                    }
+                };
+
+                // Create MediaStreamAudioSourceNode
+                let source = match ctx_clone.create_media_stream_source(&media_stream) {
+                    Ok(s) => s,
+                    Err(err) => {
+                        let mut error_cb = error_callback.lock().unwrap();
+                        let description =
+                            format!("Failed to create MediaStreamAudioSourceNode: {:?}", err);
+                        error_cb(StreamError::BackendSpecific {
+                            err: BackendSpecificError { description },
+                        });
+                        return;
+                    }
+                };
+
+                // Create ScriptProcessorNode for capturing audio
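+                // ScriptProcessorNode is deprecated in favor of AudioWorklet,
+                // but it remains the simplest widely-supported way to pull PCM
+                // from a MediaStream without shipping a separate worklet module.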
+                // Arguments: buffer_size, input_channels, output_channels
+                let processor = match ctx_clone.create_script_processor_with_buffer_size_and_number_of_input_channels_and_number_of_output_channels(
+                    buffer_size_frames as u32,
+                    n_channels as u32,
+                    0, // No output channels needed for input stream
+                ) {
+                    Ok(p) => p,
+                    Err(err) => {
+                        let mut error_cb = error_callback.lock().unwrap();
+                        let description = format!("Failed to create ScriptProcessorNode: {:?}", err);
+                        error_cb(StreamError::BackendSpecific {
+                            err: BackendSpecificError { description },
+                        });
+                        return;
+                    }
+                };
+
+                // Set up the onaudioprocess callback
+                let mut temporary_buffer = vec![0f32; buffer_size_samples];
+                let onaudioprocess_closure =
+                    Closure::wrap(Box::new(move |event: web_sys::AudioProcessingEvent| {
+                        let input_buffer = match event.input_buffer() {
+                            Ok(buf) => buf,
+                            Err(_) => return, // Skip this callback if we can't get the buffer
+                        };
+                        let now = event.playback_time();
+
+                        // Interleave the input channels into our temporary buffer
+                        for channel in 0..n_channels {
+                            if let Ok(channel_data) = input_buffer.get_channel_data(channel as u32)
+                            {
+                                for (i, sample) in channel_data.iter().enumerate() {
+                                    if i < buffer_size_frames {
+                                        temporary_buffer[i * n_channels + channel] = *sample;
+                                    }
+                                }
+                            }
+                        }
+
+                        // Call the user's data callback
+                        let len = temporary_buffer.len();
+                        let data = temporary_buffer.as_mut_ptr() as *mut ();
+                        let data = unsafe { Data::from_parts(data, len, sample_format) };
+                        let mut callback = data_callback.lock().unwrap();
+                        let capture = crate::StreamInstant::from_secs_f64(now);
+                        let timestamp = crate::InputStreamTimestamp {
+                            callback: capture,
+                            capture,
+                        };
+                        let info = InputCallbackInfo { timestamp };
+                        callback(&data, &info);
+                    }) as Box<dyn FnMut(web_sys::AudioProcessingEvent)>);
+
+                processor.set_onaudioprocess(Some(onaudioprocess_closure.as_ref().unchecked_ref()));
+                onaudioprocess_closure.forget(); // Keep closure alive
+
+                // Connect: source -> processor
+                if let Err(err) = source.connect_with_audio_node(&processor) {
+                    let mut error_cb = error_callback.lock().unwrap();
+                    let description = format!("Failed to connect source to processor: {:?}", err);
+                    error_cb(StreamError::BackendSpecific {
+                        err: BackendSpecificError { description },
+                    });
+                    return;
+                }
+
+                // Connecting the processor to the destination is not strictly
+                // needed for input-only streams, but some browsers will not
+                // fire onaudioprocess unless the processor is connected.
+                let _ = processor.connect_with_audio_node(&ctx_clone.destination());
+            }
+            Err(err) => {
+                let mut error_cb = error_callback.lock().unwrap();
+                let description = format!("getUserMedia failed: {:?}", err);
+                error_cb(StreamError::BackendSpecific {
+                    err: BackendSpecificError { description },
+                });
+            }
+        }
+    };
+
+    // Spawn the future
+    wasm_bindgen_futures::spawn_local(future);
+
+    Ok(Stream {
+        ctx,
+        on_ended_closures,
+        config: config.clone(),
+        buffer_size_frames,
+    })
+}
+
 // Detects whether the `AudioContext` global variable is available.
 fn is_webaudio_available() -> bool {
     js_sys::Reflect::get(&js_sys::global(), &JsValue::from("AudioContext"))