diff --git a/crates/js-component-bindgen/src/function_bindgen.rs b/crates/js-component-bindgen/src/function_bindgen.rs index 8404ce734..7259b6055 100644 --- a/crates/js-component-bindgen/src/function_bindgen.rs +++ b/crates/js-component-bindgen/src/function_bindgen.rs @@ -280,6 +280,9 @@ impl FunctionBindgen<'_> { /// let ret; /// ``` /// + /// This function returns as a first tuple parameter a list of + /// variables that should be created via let statements beforehand. + /// /// # Arguments /// /// * `amt` - number of results @@ -290,35 +293,37 @@ impl FunctionBindgen<'_> { amt: usize, results: &mut Vec, is_async: bool, - ) -> String { + ) -> (String, String) { let mut s = String::new(); + let mut vars_init = String::new(); match amt { 0 => { // Async functions with no returns still return async code, // which will be used as the initial callback result going into the async driver if is_async { - uwrite!(s, "let ret = ") - } else { - uwrite!(s, "let ret;") + uwrite!(s, "ret = ") } + uwriteln!(vars_init, "let ret;"); } 1 => { - uwrite!(s, "let ret = "); + uwrite!(s, "ret = "); results.push("ret".to_string()); + uwriteln!(vars_init, "let ret;"); } n => { - uwrite!(s, "var ["); + uwrite!(s, "["); for i in 0..n { if i > 0 { uwrite!(s, ", "); } - uwrite!(s, "ret{}", i); + uwrite!(s, "ret{i}"); results.push(format!("ret{i}")); + uwriteln!(vars_init, "let ret;"); } uwrite!(s, "] = "); } } - s + (vars_init, s) } fn bitcast(&mut self, cast: &Bitcast, op: &str) -> String { @@ -394,31 +399,8 @@ impl FunctionBindgen<'_> { let start_current_task_fn = self.intrinsic(Intrinsic::AsyncTask( AsyncTaskIntrinsic::CreateNewCurrentTask, )); - let global_task_map = self.intrinsic(Intrinsic::AsyncTask( - AsyncTaskIntrinsic::GlobalAsyncCurrentTaskMap, - )); let component_instance_idx = self.canon_opts.instance.as_u32(); - // If we're within an async function, wait for all top level previous tasks to finish before running - // to ensure that guests do not try to run two tasks at the same time. - if is_async && self.requires_async_porcelain { - uwriteln!( - self.src, - r#" - // All other tasks must finish before we can start this one - const taskMetas = {global_task_map}.get({component_instance_idx}); - if (taskMetas) {{ - const taskPromises = taskMetas - .filter(mt => mt.componentIdx === {component_instance_idx}) - .map(mt => mt.task) - .filter(t => !t.getParentSubtask()) - .map(t => t.exitPromise()); - await Promise.allSettled(taskPromises); - }} - "#, - ); - } - uwriteln!( self.src, r#" @@ -1508,13 +1490,31 @@ impl Bindgen for FunctionBindgen<'_> { // Output result binding preamble (e.g. 'var ret =', 'var [ ret0, ret1] = exports...() ') // along with the code to perofrm the call let sig_results_length = sig.results.len(); - let s = self.generate_result_assignment_lhs(sig_results_length, results, is_async); - - let (call_prefix, call_wrapper) = if self.requires_async_porcelain | self.is_async { - ("await ", Intrinsic::WithGlobalCurrentTaskMetaFnAsync.name()) - } else { - ("", Intrinsic::WithGlobalCurrentTaskMetaFn.name()) - }; + let (vars_init, assignment_lhs) = + self.generate_result_assignment_lhs(sig_results_length, results, is_async); + + let (call_prefix, call_wrapper, call_err_cleanup) = + if self.requires_async_porcelain | self.is_async { + ( + "await ", + Intrinsic::WithGlobalCurrentTaskMetaFnAsync.name(), + r#" + task.reject(err); + task.exit(); + return Promise.reject(err); + "#, + ) + } else { + ( + "", + Intrinsic::WithGlobalCurrentTaskMetaFn.name(), + r#" + task.setErrored(err); + task.exit(); + throw err; + "#, + ) + }; let args = if self.asmjs { let split_i64 = @@ -1559,12 +1559,18 @@ impl Bindgen for FunctionBindgen<'_> { uwriteln!( self.src, - r#"{s} {call_prefix} {call_wrapper}({{ - taskID: task.id(), - componentIdx: task.componentIdx(), - fn: () => {callee_invoke}, - }}); - "#, + r#" + {vars_init} + try {{ + {assignment_lhs} {call_prefix} {call_wrapper}({{ + taskID: task.id(), + componentIdx: task.componentIdx(), + fn: () => {callee_invoke}, + }}); + }} catch (err) {{ + {call_err_cleanup} + }} + "#, ); if self.tracing_enabled { @@ -1593,6 +1599,11 @@ impl Bindgen for FunctionBindgen<'_> { self.intrinsic(Intrinsic::AsyncTask(AsyncTaskIntrinsic::GetCurrentTask)); let component_instance_idx = self.canon_opts.instance.as_u32(); + // At first, use the global current task metadata, in case we are executing from + // inside a with-global-current-task wrapper + let get_global_current_task_meta_fn = + self.intrinsic(Intrinsic::GetGlobalCurrentTaskMetaFn); + uwriteln!( self.src, "{debug_log_fn}('{prefix} [Instruction::CallInterface] ({async_}, @ enter)');", @@ -1647,7 +1658,11 @@ impl Bindgen for FunctionBindgen<'_> { }}; taskCreation: {{ - parentTask = {current_task_get_fn}({component_instance_idx})?.task; + parentTask = {current_task_get_fn}( + {component_instance_idx}, + {get_global_current_task_meta_fn}({component_instance_idx})?.taskID, + )?.task; + if (!parentTask) {{ createTask(); break taskCreation; @@ -1701,17 +1716,34 @@ impl Bindgen for FunctionBindgen<'_> { } // Build the JS expression that calls the callee - let (call_prefix, call_wrapper) = if is_async || self.requires_async_porcelain { - ("await ", Intrinsic::WithGlobalCurrentTaskMetaFnAsync.name()) - } else { - ("", Intrinsic::WithGlobalCurrentTaskMetaFn.name()) - }; + let (call_prefix, call_wrapper, call_err_cleanup) = + if is_async || self.requires_async_porcelain { + ( + "await ", + Intrinsic::WithGlobalCurrentTaskMetaFnAsync.name(), + r#" + task.reject(err); + task.exit(); + return Promise.reject(err); + "#, + ) + } else { + ( + "", + Intrinsic::WithGlobalCurrentTaskMetaFn.name(), + r#" + task.reject(err); + task.exit(); + throw err; + "#, + ) + }; let call = format!( r#"{call_prefix} {call_wrapper}({{ - componentIdx: task.componentIdx(), - taskID: task.id(), - fn: () => {callee_fn_js}({callee_args_js}) - }}) + componentIdx: task.componentIdx(), + taskID: task.id(), + fn: () => {callee_fn_js}({callee_args_js}), + }}) "#, ); @@ -1719,12 +1751,22 @@ impl Bindgen for FunctionBindgen<'_> { // If configured to do *no* error handling at all or throw // error objects directly, we can simply perform the call ErrHandling::None | ErrHandling::ThrowResultErr => { - let s = self.generate_result_assignment_lhs( + let (vars_init, assignment_lhs) = self.generate_result_assignment_lhs( fn_wasm_result_count, results, is_async, ); - uwriteln!(self.src, "{s}{call};"); + uwriteln!( + self.src, + r#" + {vars_init} + try {{ + {assignment_lhs}{call}; + }} catch (err) {{ + {call_err_cleanup} + }} + "# + ); } // If configured to force all thrown errors into result objects, // then we add a try/catch around the call diff --git a/crates/js-component-bindgen/src/intrinsics/component.rs b/crates/js-component-bindgen/src/intrinsics/component.rs index a087ba832..037447d3c 100644 --- a/crates/js-component-bindgen/src/intrinsics/component.rs +++ b/crates/js-component-bindgen/src/intrinsics/component.rs @@ -303,8 +303,6 @@ impl ComponentIntrinsic { this.#locked = locked; }} - // TODO(fix): we might want to check for pre-locked status here, we should be deterministically - // going from locked -> unlocked and vice versa exclusiveLock() {{ {debug_log_fn}('[{component_async_state_class}#exclusiveLock()]', {{ locked: this.#locked, @@ -337,6 +335,54 @@ impl ComponentIntrinsic { this.#onExclusiveReleaseHandlers.push(fn); }} + // nextTaskPromise & nextTaskQueue are used to await current task completion and queues + // any tasks attempting to enter() and complete. + // + // see: nextTaskExecutionSlot() + // + // TODO(threads): this should be unnecessary once threads are properly implemented, + // as the task.enter() logic should suffice (it should be guaranteed that we cannot re-enter + // unless the task in question is the current task in the thread execution, and only one can + // run at a time) + #nextTaskPromise = Promise.resolve(true); + #nextTaskQueue = []; + + async nextTaskExecutionSlot(args) {{ + const {{ task }} = args; + + const placeholder = {{ + completed: false, + task, + promise: task.exitPromise().then(() => {{ + placeholder.completed = true; + }}), + }}; + this.#nextTaskQueue.push(placeholder); + + let next; + while (true) {{ + await this.#nextTaskPromise; + + next = this.#nextTaskQueue.find(placeholder => !placeholder.completed); + + // This task is next in the queue, we can continue + if (next === undefined || next === placeholder) {{ + this.#nextTaskPromise = next.promise; + if (this.#nextTaskQueue.length > 1000) {{ + this.#nextTaskQueue = this.#nextTaskQueue.filter(p => !p.completed); + if (this.#nextTaskQueue.length > 1000) {{ + {debug_log_fn}('[{component_async_state_class}#()nextTaskExecutionSlot] next task queue length > 1000 even after cleanup, tasks may be leaking'); + }} + }} + break; + }} + + // If we get here, this task was *not* next in the queue, continue waiting + // (at this point the task that *is* next will likely have already set itself + // as this.#nextTaskPromise) + }} + }} + #getSuspendedTaskMeta(taskID) {{ return this.#suspendedTasksByTaskID.get(taskID); }} diff --git a/crates/js-component-bindgen/src/intrinsics/mod.rs b/crates/js-component-bindgen/src/intrinsics/mod.rs index 1b93aeb26..519bac969 100644 --- a/crates/js-component-bindgen/src/intrinsics/mod.rs +++ b/crates/js-component-bindgen/src/intrinsics/mod.rs @@ -180,32 +180,48 @@ impl Intrinsic { Intrinsic::Host(i) => i.render(output, args), Intrinsic::GlobalAsyncDeterminism => { - output.push_str(&format!( - "const {var_name} = '{determinism}';\n", + uwriteln!( + output, + "const {var_name} = '{determinism}';", var_name = self.name(), determinism = args.determinism, - )); + ); } Intrinsic::CoinFlip => { - output.push_str(&format!( - "const {var_name} = () => {{ return Math.random() > 0.5; }};\n", + uwriteln!( + output, + "const {var_name} = () => {{ return Math.random() > 0.5; }};", var_name = self.name(), - )); + ); } Intrinsic::ConstantI32Min => output.push_str(&format!( "const {const_name} = -2_147_483_648;\n", const_name = self.name() )), - Intrinsic::ConstantI32Max => output.push_str(&format!( - "const {const_name} = 2_147_483_647;\n", - const_name = self.name() - )), + + Intrinsic::ConstantI32Max => { + uwriteln!( + output, + r#" + const {const_name} = 2_147_483_647; + "#, + const_name = self.name() + ) + } + Intrinsic::TypeCheckValidI32 => { let i32_const_min = Intrinsic::ConstantI32Min.name(); let i32_const_max = Intrinsic::ConstantI32Max.name(); - output.push_str(&format!("const {fn_name} = (n) => typeof n === 'number' && n >= {i32_const_min} && n <= {i32_const_max};\n", fn_name = self.name())) + + uwriteln!( + output, + r#" + const {fn_name} = (n) => typeof n === 'number' && n >= {i32_const_min} && n <= {i32_const_max}; + "#, + fn_name = self.name() + ); } Intrinsic::AsyncFunctionCtor => { @@ -231,25 +247,39 @@ impl Intrinsic { Intrinsic::Base64Compile => { if !args.no_nodejs_compat { - output.push_str(" - const base64Compile = str => WebAssembly.compile(typeof Buffer !== 'undefined' ? Buffer.from(str, 'base64') : Uint8Array.from(atob(str), b => b.charCodeAt(0))); - ") + uwriteln!( + output, + r#" + const base64Compile = str => WebAssembly.compile( + typeof Buffer !== 'undefined' + ? Buffer.from(str, 'base64') + : Uint8Array.from(atob(str), b => b.charCodeAt(0)) + ); + "# + ); } else { - output.push_str(" - const base64Compile = str => WebAssembly.compile(Uint8Array.from(atob(str), b => b.charCodeAt(0))); - ") + uwriteln!( + output, + r#" + const base64Compile = str => WebAssembly.compile(Uint8Array.from(atob(str), b => b.charCodeAt(0))); + "# + ); } } - Intrinsic::ClampGuest => output.push_str( - " - function clampGuest(i, min, max) { - if (i < min || i > max) \ - throw new TypeError(`must be between ${min} and ${max}`); - return i; - } - ", - ), + Intrinsic::ClampGuest => { + uwriteln!( + output, + r#" + function clampGuest(i, min, max) {{ + if (i < min || i > max) {{ + throw new TypeError(`must be between ${{min}} and ${{max}}`); + }} + return i; + }} + "# + ); + } Intrinsic::ComponentError => output.push_str( " @@ -847,7 +877,9 @@ impl Intrinsic { Self::GetGlobalCurrentTaskMetaFn => { let get_current_global_task_meta_fn = Self::GetGlobalCurrentTaskMetaFn.name(); let global_current_task_meta_obj = Self::GlobalCurrentTaskMeta.name(); - output.push_str(&format!( + + uwriteln!( + output, r#" function {get_current_global_task_meta_fn}(componentIdx) {{ const v = {global_current_task_meta_obj}[componentIdx]; @@ -855,13 +887,15 @@ impl Intrinsic { return {{ ...v }}; }} "#, - )); + ); } Self::SetGlobalCurrentTaskMetaFn => { - let set_global_current_task_meta_fn = Self::SetGlobalCurrentTaskMetaFn.name(); + let set_global_current_task_meta_fn = self.name(); let global_current_task_meta_obj = Self::GlobalCurrentTaskMeta.name(); - output.push_str(&format!( + + uwriteln!( + output, r#" function {set_global_current_task_meta_fn}(args) {{ if (!args) {{ throw new TypeError('args missing'); }} @@ -871,7 +905,7 @@ impl Intrinsic { return {global_current_task_meta_obj}[componentIdx] = {{ taskID, componentIdx }}; }} "#, - )); + ); } Self::WithGlobalCurrentTaskMetaFn => { diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs index 4ef382554..aa201fa88 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs @@ -1,10 +1,13 @@ //! Intrinsics that represent helpers that implement async tasks +use std::fmt::Write; + use crate::intrinsics::component::ComponentIntrinsic; use crate::intrinsics::conversion::ConversionIntrinsic; use crate::intrinsics::p3::waitable::WaitableIntrinsic; use crate::intrinsics::{Intrinsic, RenderIntrinsicsArgs}; use crate::source::Source; +use crate::uwriteln; /// This enum contains intrinsics that implement async tasks #[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] @@ -329,10 +332,13 @@ impl AsyncTaskIntrinsic { pub fn render(&self, output: &mut Source, _render_args: &RenderIntrinsicsArgs<'_>) { match self { Self::CurrentTaskMayBlock => { - output.push_str(&format!( - "const {var_name} = new WebAssembly.Global({{ value: 'i32', mutable: true }}, 0);\n", - var_name = self.name() - )); + let var_name = self.name(); + uwriteln!( + output, + r#" + const {var_name} = new WebAssembly.Global({{ value: 'i32', mutable: true }}, 0); + "# + ); } Self::GlobalAsyncCurrentTaskMap => { @@ -360,37 +366,40 @@ impl AsyncTaskIntrinsic { let type_check_i32 = Intrinsic::TypeCheckValidI32.name(); let get_global_current_task_meta_fn = Intrinsic::GetGlobalCurrentTaskMetaFn.name(); - output.push_str(&format!(r#" - function {context_set_fn}(ctx, value) {{ - const {{ componentIdx, slot }} = ctx; - if (componentIdx === undefined) {{ throw new TypeError("missing component idx"); }} - if (slot === undefined) {{ throw new TypeError("missing slot"); }} - if (!({type_check_i32}(value))) {{ throw new Error('invalid value for context set (not valid i32)'); }} - - const currentTaskMeta = {get_global_current_task_meta_fn}(componentIdx); - if (!currentTaskMeta) {{ - throw new Error(`missing/incomplete global current task meta for component idx [${{componentIdx}}] during context set`); - }} - const taskID = currentTaskMeta.taskID; - - const taskMeta = {current_task_get_fn}(componentIdx, taskID); - if (!taskMeta) {{ throw new Error('failed to retrieve current task'); }} - - let task = taskMeta.task; - if (!task) {{ throw new Error('invalid/missing current task in metadata while setting context'); }} - - {debug_log_fn}('[{context_set_fn}()] args', {{ - slot, - value, - storage: task.storage, - taskID: task.id(), - componentIdx: task.componentIdx(), - }}); - - if (slot < 0 || slot >= task.storage.length) {{ throw new Error('invalid slot for current task'); }} - task.storage[slot] = value; - }} - "#)); + uwriteln!( + output, + r#" + function {context_set_fn}(ctx, value) {{ + const {{ componentIdx, slot }} = ctx; + if (componentIdx === undefined) {{ throw new TypeError("missing component idx"); }} + if (slot === undefined) {{ throw new TypeError("missing slot"); }} + if (!({type_check_i32}(value))) {{ throw new Error('invalid value for context set (not valid i32)'); }} + + const currentTaskMeta = {get_global_current_task_meta_fn}(componentIdx); + if (!currentTaskMeta) {{ + throw new Error(`missing/incomplete global current task meta for component idx [${{componentIdx}}] during context set`); + }} + const taskID = currentTaskMeta.taskID; + + const taskMeta = {current_task_get_fn}(componentIdx, taskID); + if (!taskMeta) {{ throw new Error('failed to retrieve current task'); }} + + let task = taskMeta.task; + if (!task) {{ throw new Error('invalid/missing current task in metadata while setting context'); }} + + {debug_log_fn}('[{context_set_fn}()] args', {{ + slot, + value, + storage: task.storage, + taskID: task.id(), + componentIdx: task.componentIdx(), + }}); + + if (slot < 0 || slot >= task.storage.length) {{ throw new Error('invalid slot for current task'); }} + task.storage[slot] = value; + }} + "# + ); } Self::ContextGet => { @@ -399,36 +408,39 @@ impl AsyncTaskIntrinsic { let current_task_get_fn = Self::GetCurrentTask.name(); let get_global_current_task_meta_fn = Intrinsic::GetGlobalCurrentTaskMetaFn.name(); - output.push_str(&format!(r#" - function {context_get_fn}(ctx) {{ - const {{ componentIdx, slot }} = ctx; - if (componentIdx === undefined) {{ throw new TypeError("missing component idx"); }} - if (slot === undefined) {{ throw new TypeError("missing slot"); }} - - const currentTaskMeta = {get_global_current_task_meta_fn}(componentIdx); - if (!currentTaskMeta) {{ - throw new Error(`missing/incomplete global current task meta for component idx [${{componentIdx}}] during context set`); - }} - const taskID = currentTaskMeta.taskID; - - const taskMeta = {current_task_get_fn}(componentIdx, taskID); - if (!taskMeta) {{ throw new Error('failed to retrieve current task'); }} - - let task = taskMeta.task; - if (!task) {{ throw new Error('invalid/missing current task in metadata while getting context'); }} - - {debug_log_fn}('[{context_get_fn}()] args', {{ - slot, - storage: task.storage, - taskID: task.id(), - componentIdx: task.componentIdx(), - }}); - - if (slot < 0 || slot >= task.storage.length) {{ throw new Error('invalid slot for current task'); }} - - return task.storage[slot]; - }} - "#)); + uwriteln!( + output, + r#" + function {context_get_fn}(ctx) {{ + const {{ componentIdx, slot }} = ctx; + if (componentIdx === undefined) {{ throw new TypeError("missing component idx"); }} + if (slot === undefined) {{ throw new TypeError("missing slot"); }} + + const currentTaskMeta = {get_global_current_task_meta_fn}(componentIdx); + if (!currentTaskMeta) {{ + throw new Error(`missing/incomplete global current task meta for component idx [${{componentIdx}}] during context set`); + }} + const taskID = currentTaskMeta.taskID; + + const taskMeta = {current_task_get_fn}(componentIdx, taskID); + if (!taskMeta) {{ throw new Error('failed to retrieve current task'); }} + + let task = taskMeta.task; + if (!task) {{ throw new Error('invalid/missing current task in metadata while getting context'); }} + + {debug_log_fn}('[{context_get_fn}()] args', {{ + slot, + storage: task.storage, + taskID: task.id(), + componentIdx: task.componentIdx(), + }}); + + if (slot < 0 || slot >= task.storage.length) {{ throw new Error('invalid slot for current task'); }} + + return task.storage[slot]; + }} + "# + ); } // Equivalent of `task.return` @@ -782,12 +794,13 @@ impl AsyncTaskIntrinsic { let event_code_enum = Intrinsic::AsyncEventCodeEnum.name(); let task_class = Self::AsyncTaskClass.name(); let subtask_class = Self::AsyncSubtaskClass.name(); - let global_async_determinism = Intrinsic::GlobalAsyncDeterminism.name(); let coin_flip_fn = Intrinsic::CoinFlip.name(); let waitable_class = Intrinsic::Waitable(WaitableIntrinsic::WaitableClass).name(); let clear_current_task_fn = Intrinsic::AsyncTask(AsyncTaskIntrinsic::ClearCurrentTask).name(); + let with_global_current_task_meta_async_fn = + Intrinsic::WithGlobalCurrentTaskMetaFnAsync.name(); output.push_str(&format!(r#" class {task_class} {{ @@ -834,8 +847,6 @@ impl AsyncTaskIntrinsic { #parentSubtask = null; - #needsExclusiveLock = false; - #errHandling; #backpressurePromise; @@ -910,7 +921,6 @@ impl AsyncTaskIntrinsic { if (opts.parentSubtask) {{ this.#parentSubtask = opts.parentSubtask; }} - this.#needsExclusiveLock = this.isSync() || !this.hasCallback(); if (opts.errHandling) {{ this.#errHandling = opts.errHandling; }} }} @@ -984,8 +994,12 @@ impl AsyncTaskIntrinsic { }} async runCallbackFn(...args) {{ - if (!this.#callbackFn) {{ throw new Error('on callback function has been set for task'); }} - return await this.#callbackFn.apply(null, args); + if (!this.#callbackFn) {{ throw new Error('no callback function has been set for task'); }} + return {with_global_current_task_meta_async_fn}({{ + taskID: this.#id, + componentIdx: this.#componentIdx, + fn: () => {{ return this.#callbackFn.apply(null, args); }} + }}); }} getCalleeParams() {{ @@ -1016,6 +1030,11 @@ impl AsyncTaskIntrinsic { enterSync() {{ if (this.needsExclusiveLock()) {{ const cstate = {get_or_create_async_state_fn}(this.#componentIdx); + // TODO(???): it is *very possible* for a the line below to fail if + // an async function is already running (and holding the exclusive lock) + // + // It's not really possible to fix this unless we turn every sync export into + // an async export that will use the regular async enabled `enter()`. cstate.exclusiveLock(); }} return true; @@ -1026,6 +1045,7 @@ impl AsyncTaskIntrinsic { taskID: this.#id, componentIdx: this.#componentIdx, subtaskID: this.getParentSubtask()?.id(), + entryFnName: this.#entryFnName, }}); if (this.#entered) {{ @@ -1034,12 +1054,14 @@ impl AsyncTaskIntrinsic { const cstate = {get_or_create_async_state_fn}(this.#componentIdx); + await cstate.nextTaskExecutionSlot({{ task: this }}); + // If a task is either synchronous or host-provided (e.g. a host import, whether sync or async) // then we can avoid component-relevant tracking and immediately enter if (this.isSync() || opts?.isHost) {{ this.#entered = true; - // TODO(breaking): remove once manually-spccifying async fns is removed + // TODO(breaking): remove once manually-specifying async fns is removed // It is currently possible for an actually sync export to be specified // as async via JSPI if (this.#isManualAsync) {{ @@ -1049,11 +1071,15 @@ impl AsyncTaskIntrinsic { return this.#entered; }} - if (cstate.hasBackpressure()) {{ + // Perform intial backpressure check + if (cstate.hasBackpressure() || this.needsExclusiveLock() && cstate.isExclusivelyLocked()) {{ cstate.addBackpressureWaiter(); const result = await this.waitUntil({{ - readyFn: () => !cstate.hasBackpressure(), + readyFn: () => {{ + return !(cstate.hasBackpressure() + || this.needsExclusiveLock() && cstate.isExclusivelyLocked()); + }}, cancellable: true, }}); @@ -1065,7 +1091,32 @@ impl AsyncTaskIntrinsic { }} }} - if (this.needsExclusiveLock()) {{ cstate.exclusiveLock(); }} + // Lock the component state or keep trying until we can/do + try {{ + if (this.needsExclusiveLock()) {{ cstate.exclusiveLock(); }} + }} catch {{ + // Continuously attempt to lock until we can + while (cstate.hasBackpressure() || this.needsExclusiveLock() && cstate.isExclusivelyLocked()) {{ + try {{ + if (this.needsExclusiveLock()) {{ cstate.exclusiveLock(); }} + break; + }} catch(err) {{ + cstate.addBackpressureWaiter(); + const result = await this.waitUntil({{ + readyFn: () => {{ + return !(cstate.hasBackpressure() + || this.needsExclusiveLock() && cstate.isExclusivelyLocked()); + }}, + cancellable: true, + }}); + cstate.removeBackpressureWaiter(); + if (result === {task_class}.BlockResult.CANCELLED) {{ + this.cancel(); + return false; + }} + }} + }} + }} this.#entered = true; return this.#entered; @@ -1076,38 +1127,19 @@ impl AsyncTaskIntrinsic { isResolved() {{ return this.#state === {task_class}.State.RESOLVED; }} async waitUntil(opts) {{ - const {{ readyFn, waitableSetRep, cancellable }} = opts; - {debug_log_fn}('[{task_class}#waitUntil()] args', {{ taskID: this.#id, waitableSetRep, cancellable }}); - - const state = {get_or_create_async_state_fn}(this.#componentIdx); - const wset = state.handles.get(waitableSetRep); - - let event; + const {{ readyFn, cancellable }} = opts; + {debug_log_fn}('[{task_class}#waitUntil()] args', {{ taskID: this.#id, cancellable }}); - wset.incrementNumWaiting(); + // TODO(fix): check for cancel + // TODO(fix): determinism + // TODO(threads): add this thread to waiting list const keepGoing = await this.suspendUntil({{ - readyFn: () => {{ - const hasPendingEvent = wset.hasPendingEvent(); - const ready = readyFn(); - return ready && hasPendingEvent; - }}, + readyFn, cancellable, }}); - if (keepGoing) {{ - event = wset.getPendingEvent(); - }} else {{ - event = {{ - code: {event_code_enum}.TASK_CANCELLED, - payload0: 0, - payload1: 0, - }}; - }} - - wset.decrementNumWaiting(); - - return event; + return keepGoing; }} async yieldUntil(opts) {{ @@ -1148,9 +1180,8 @@ impl AsyncTaskIntrinsic { const ready = readyFn(); if (ready && {global_async_determinism} === 'random') {{ - // const coinFlip = {coin_flip_fn}(); - // if (coinFlip) {{ return true }} - return true; + const coinFlip = {coin_flip_fn}(); + if (coinFlip) {{ return true }} }} const keepGoing = await this.immediateSuspend({{ cancellable, readyFn }}); @@ -1275,8 +1306,7 @@ impl AsyncTaskIntrinsic { this.#state = {task_class}.State.PENDING_CANCEL; const cancelled = this.deliverPendingCancel({{ cancellable: true }}); -// TODO: do cleanup here to reset the machinery so we can run again? - + // TODO: do cleanup here to reset the machinery so we can run again? this.cancel({{ error: taskErr }}); }} @@ -1319,7 +1349,7 @@ impl AsyncTaskIntrinsic { }} }} - exit() {{ + exit(args) {{ {debug_log_fn}('[{task_class}#exit()]', {{ componentIdx: this.#componentIdx, taskID: this.#id, @@ -1354,8 +1384,10 @@ impl AsyncTaskIntrinsic { if (!state) {{ throw new Error('missing async state for component [' + this.#componentIdx + ']'); }} // Exempt the host from exclusive lock check - if (this.#componentIdx !== -1 && this.needsExclusiveLock() && !state.isExclusivelyLocked()) {{ - throw new Error(`task [${{this.#id}}] exit: component [${{this.#componentIdx}}] should have been exclusively locked`); + if (this.#componentIdx !== -1 && !args?.skipExclusiveLockCheck) {{ + if (this.needsExclusiveLock() && !state.isExclusivelyLocked()) {{ + throw new Error(`task [${{this.#id}}] exit: component [${{this.#componentIdx}}] should have been exclusively locked`); + }} }} state.exclusiveRelease(); @@ -1813,6 +1845,8 @@ impl AsyncTaskIntrinsic { let unpack_callback_result_fn = Self::UnpackCallbackResult.name(); let get_or_create_async_state_fn = Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); + let waitable_set_class = + Intrinsic::Waitable(WaitableIntrinsic::WaitableSetClass).name(); output.push_str(&format!(r#" async function {driver_loop_fn}(args) {{ @@ -1856,10 +1890,13 @@ impl AsyncTaskIntrinsic { throw new Error('invalid async return value, outside callback code range'); }} + const cstate = {get_or_create_async_state_fn}(componentIdx); + let eventCode; let index; let result; let asyncRes; + let wset; try {{ while (true) {{ if (callbackCode !== 0) {{ componentState.exclusiveRelease(); }} @@ -1872,7 +1909,7 @@ impl AsyncTaskIntrinsic { callbackFnName, taskID: task.id() }}); - task.exit(); + task.exit({{ skipExclusiveLockCheck: true }}); return; case 1: // YIELD @@ -1896,7 +1933,6 @@ impl AsyncTaskIntrinsic { break; case 2: // WAIT for a given waitable set - const cstate = {get_or_create_async_state_fn}(componentIdx); {debug_log_fn}('[{driver_loop_fn}()] waiting for event', {{ fnName, componentIdx, @@ -1905,11 +1941,18 @@ impl AsyncTaskIntrinsic { waitableSetRep, waitableSetTargets: cstate.handles.get(waitableSetRep).targets(), }}); - asyncRes = await task.waitUntil({{ + + wset = cstate.handles.get(waitableSetRep); + if (!(wset instanceof {waitable_set_class})) {{ + throw new Error(`non-waitable set returned from component state handles @ [${{waitableSetRep}}]`); + }} + + asyncRes = await wset.waitUntil({{ readyFn: () => !componentState.isExclusivelyLocked(), - waitableSetRep, + task, cancellable: true, }}); + {debug_log_fn}('[{driver_loop_fn}()] finished waiting for event', {{ fnName, componentIdx, @@ -1918,6 +1961,7 @@ impl AsyncTaskIntrinsic { waitableSetRep, asyncRes, }}); + break; default: diff --git a/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs b/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs index f61fc8c4a..51d5cb69a 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs @@ -151,6 +151,7 @@ impl WaitableIntrinsic { Self::WaitableSetClass => { let debug_log_fn = Intrinsic::DebugLog.name(); let waitable_set_class = Self::WaitableSetClass.name(); + let async_event_code_enum = Intrinsic::AsyncEventCodeEnum.name(); output.push_str(&format!( r#" @@ -225,6 +226,40 @@ impl WaitableIntrinsic { }} throw new Error('no waitables had a pending event'); }} + + async waitUntil(opts) {{ + {debug_log_fn}('[{waitable_set_class}#waitUntil()] args', {{ opts }}); + // TODO(threads): this task should be the thread + const {{ readyFn, task, cancellable }} = opts; + + let event; + + this.incrementNumWaiting(); + + const keepGoing = await task.suspendUntil({{ + readyFn: () => {{ + const hasPendingEvent = this.hasPendingEvent(); + const ready = readyFn(); + return ready && hasPendingEvent; + }}, + cancellable, + }}); + + if (keepGoing) {{ + event = this.getPendingEvent(); + }} else {{ + event = {{ + code: {async_event_code_enum}.TASK_CANCELLED, + payload0: 0, + payload1: 0, + }}; + }} + + this.decrementNumWaiting(); + + return event; + }} + }} "# )); @@ -356,7 +391,8 @@ impl WaitableIntrinsic { const state = {get_or_create_async_state_fn}(componentIdx); if (!state) {{throw new Error(`missing async state for component idx [${{componentIdx}}]`); }} - const rep = state.handles.insert(new {waitable_set_class}(componentIdx)); + const wset = new {waitable_set_class}(componentIdx); + const rep = state.handles.insert(wset); if (typeof rep !== 'number') {{ throw new Error(`invalid/missing waitable set rep [${{rep}}]`); }} {debug_log_fn}('[{waitable_set_new_fn}()] created waitable set', {{ componentIdx, rep }}); @@ -372,6 +408,10 @@ impl WaitableIntrinsic { Intrinsic::AsyncTask(AsyncTaskIntrinsic::GetCurrentTask).name(); let store_event_in_component_memory_fn = Intrinsic::Host(HostIntrinsic::StoreEventInComponentMemory).name(); + let get_or_create_async_state_fn = + Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); + let waitable_set_class = Self::WaitableSetClass.name(); + output.push_str(&format!(r#" async function {waitable_set_wait_fn}(ctx, waitableSetRep, resultPtr) {{ {debug_log_fn}('[{waitable_set_wait_fn}()] args', {{ ctx, waitableSetRep, resultPtr }}); @@ -393,7 +433,14 @@ impl WaitableIntrinsic { }} const memory = getMemoryFn(); - const event = await task.waitUntil({{ waitableSetRep, readyFn: () => true, cancellable: false }}); + + const cstate = {get_or_create_async_state_fn}(componentIdx); + const wset = await cstate.handles.get(waitableSetRep); + if (!(wset instanceof {waitable_set_class})) {{ + throw new Error(`non-waitable set returned from component state handles @ [${{waitableSetRep}}]`); + }} + + const event = await wset.waitUntil({{ readyFn: () => true, task, cancellable: false }}); return {store_event_in_component_memory_fn}({{ memory, ptr: resultPtr, event }}); }} "#)); @@ -535,7 +582,6 @@ impl WaitableIntrinsic { }} const waitable = waitableObj.getWaitable ? waitableObj.getWaitable() : waitableObj; if (!waitable.join) {{ - console.error("WAITABLE", {{ waitable, componentIdx, waitableRep, waitableSetRep }}); throw new Error("invalid waitable object, does not have join()"); }} diff --git a/packages/jco/test/p3/future-lifts.js b/packages/jco/test/p3/future-lifts.js index 5c3aee06a..d48e74b22 100644 --- a/packages/jco/test/p3/future-lifts.js +++ b/packages/jco/test/p3/future-lifts.js @@ -1,6 +1,6 @@ import { join } from "node:path"; -import { suite, test, assert, beforeAll, beforeEach, afterAll, expect } from "vitest"; +import { suite, test, assert, beforeAll, afterAll, expect } from "vitest"; import { setupAsyncTest } from "../helpers.js"; import { @@ -44,14 +44,7 @@ suite("future lifts", () => { esModule = setupRes.esModule; cleanup = setupRes.cleanup; - //console.log("OUTPUT DIR", setupRes.outputDir); - }); - - afterAll(async () => { - await cleanup(); - }); - beforeEach(async () => { instance = await esModule.instantiate(undefined, { ...new WASIShim().getImportObject(), "jco:test-components/resources": { @@ -60,6 +53,10 @@ suite("future lifts", () => { }); }); + afterAll(async () => { + await cleanup(); + }); + test.concurrent("bool", async () => { assert.instanceOf(instance["jco:test-components/get-future-async"].getFutureBool, AsyncFunction); const vals = [true, false]; @@ -435,7 +432,10 @@ suite("future lifts", () => { let numDisposed = 0; for (const resource of resources) { - assert.strictEqual(resource.getId(), await resource.getIdAsync()); + // sync functions can fail to lock the component state if run too soon after + // async functions + let expectedID = resource.getId(); + assert.strictEqual(expectedID, await resource.getIdAsync()); assert.doesNotThrow(() => resource[disposeSymbol]()); numDisposed += 1; assert.strictEqual( diff --git a/packages/jco/test/p3/stream-lifts.js b/packages/jco/test/p3/stream-lifts.js index 5f9a3c1a3..39b50ad44 100644 --- a/packages/jco/test/p3/stream-lifts.js +++ b/packages/jco/test/p3/stream-lifts.js @@ -1,13 +1,13 @@ import { join } from "node:path"; -import { suite, test, assert, beforeAll, beforeEach, afterAll, expect } from "vitest"; +import { suite, test, assert, beforeAll, afterAll, expect } from "vitest"; import { setupAsyncTest } from "../helpers.js"; import { AsyncFunction, LOCAL_TEST_COMPONENTS_DIR, checkStreamValues } from "../common.js"; import { WASIShim } from "@bytecodealliance/preview2-shim/instantiation"; suite("stream lifts", () => { - let esModule, cleanup, instance; + let esModule, cleanup, getInstance, instance; class ExampleResource { #id; @@ -39,22 +39,33 @@ suite("stream lifts", () => { esModule = setupRes.esModule; cleanup = setupRes.cleanup; - }); - - afterAll(async () => { - await cleanup(); - }); - beforeEach(async () => { + // We use a completely shared instance because sibling re-entrance + // is mediated by code in task.enter() instance = await esModule.instantiate(undefined, { ...new WASIShim().getImportObject(), "jco:test-components/resources": { ExampleResource, }, }); + getInstance = () => Promise.resolve(instance); + + // NOTE: To use an explicitly new instance per-test (more stable), uncomment the lines below + // + // getInstance = async () => esModule.instantiate(undefined, { + // ...new WASIShim().getImportObject(), + // "jco:test-components/resources": { + // ExampleResource, + // }, + // }); + }); + + afterAll(async () => { + await cleanup(); }); test.concurrent("bool", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamBool, AsyncFunction); const vals = [true, false]; const stream = await instance["jco:test-components/get-stream-async"].getStreamBool(vals); @@ -62,6 +73,7 @@ suite("stream lifts", () => { }); test.concurrent("u8/s8", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamU8, AsyncFunction); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamS8, AsyncFunction); @@ -89,6 +101,7 @@ suite("stream lifts", () => { }); test.concurrent("u16/s16", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamU16, AsyncFunction); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamS16, AsyncFunction); @@ -102,6 +115,7 @@ suite("stream lifts", () => { }); test.concurrent("u32/s32", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamU32, AsyncFunction); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamS32, AsyncFunction); @@ -115,6 +129,7 @@ suite("stream lifts", () => { }); test.concurrent("u64/s64", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamU64, AsyncFunction); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamS64, AsyncFunction); @@ -142,6 +157,7 @@ suite("stream lifts", () => { }); test.concurrent("f32/f64", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamF64, AsyncFunction); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamF32, AsyncFunction); @@ -169,6 +185,7 @@ suite("stream lifts", () => { }); test.concurrent("string", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamString, AsyncFunction); let vals = ["hello", "world", "!"]; @@ -177,6 +194,7 @@ suite("stream lifts", () => { }); test.concurrent("record", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamRecord, AsyncFunction); let vals = [ @@ -189,6 +207,7 @@ suite("stream lifts", () => { }); test.concurrent("variant", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamVariant, AsyncFunction); const vals = [ @@ -238,6 +257,7 @@ suite("stream lifts", () => { }); test.concurrent("tuple", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamTuple, AsyncFunction); let vals = [ @@ -250,6 +270,7 @@ suite("stream lifts", () => { }); test.concurrent("flags", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamFlags, AsyncFunction); let vals = [ @@ -262,6 +283,7 @@ suite("stream lifts", () => { }); test.concurrent("enum", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamEnum, AsyncFunction); let vals = ["first", "second", "third"]; @@ -270,6 +292,7 @@ suite("stream lifts", () => { }); test.concurrent("option", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamOptionString, AsyncFunction); let vals = ["present string", null]; @@ -288,6 +311,7 @@ suite("stream lifts", () => { }); test.concurrent("list", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamListU8, AsyncFunction); let vals = [[0x01, 0x02, 0x03, 0x04, 0x05], new Uint8Array([0x05, 0x04, 0x03, 0x02, 0x01]), []]; let stream = await instance["jco:test-components/get-stream-async"].getStreamListU8(vals); @@ -309,6 +333,7 @@ suite("stream lifts", () => { // TODO(fix): add tests for optimized UintXArrays (js_array_ty) test.concurrent("list", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamListString, AsyncFunction); let vals = [["first", "second", "third"], []]; let stream = await instance["jco:test-components/get-stream-async"].getStreamListString(vals); @@ -316,6 +341,7 @@ suite("stream lifts", () => { }); test.concurrent("list", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamFixedListU32, AsyncFunction); let vals = [ [1, 2, 3, 4, 5], @@ -327,6 +353,7 @@ suite("stream lifts", () => { }); test.concurrent("list", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamListRecord, AsyncFunction); let vals = [ [{ id: 1, idStr: "one" }], @@ -346,6 +373,7 @@ suite("stream lifts", () => { }); test.concurrent("result", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamResultString, AsyncFunction); let vals = [ { tag: "ok", val: "present string" }, @@ -356,20 +384,21 @@ suite("stream lifts", () => { }); test.concurrent("example-resource", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamExampleResourceOwn, AsyncFunction); const disposeSymbol = Symbol.dispose || Symbol.for("dispose"); let vals = [2, 1, 0]; let stream = await instance["jco:test-components/get-stream-async"].getStreamExampleResourceOwn(vals); const resources = []; - for (const expectedResourceId of vals) { - const { value: resource, done } = await stream.next(); - assert.isFalse(done); + const retrievedIds = []; + for await (const resource of stream) { assert.isNotNull(resource); assert.instanceOf(resource, instance["jco:test-components/get-stream-async"].ExampleGuestResource); - assert.strictEqual(resource.getId(), expectedResourceId); + retrievedIds.push(resource.getId()); resources.push(resource); } + assert.deepEqual(retrievedIds, vals); const finished = await stream.next(); assert.isTrue(finished.done); @@ -385,7 +414,10 @@ suite("stream lifts", () => { let numDisposed = 0; for (const resource of resources) { - assert.strictEqual(resource.getId(), await resource.getIdAsync()); + // NOTE: if runing async operations and sync operations too quickly, the sync operation *can* + // fail to lock the component async state. + let expectedID = resource.getId(); + assert.strictEqual(expectedID, await resource.getIdAsync()); assert.doesNotThrow(() => resource[disposeSymbol]()); numDisposed += 1; assert.strictEqual( @@ -396,6 +428,7 @@ suite("stream lifts", () => { }); test.concurrent("example-resource#get-id", async () => { + const instance = await getInstance(); assert.instanceOf( instance["jco:test-components/get-stream-async"].getStreamExampleResourceOwnAttr, AsyncFunction, @@ -412,6 +445,7 @@ suite("stream lifts", () => { }); test.concurrent("stream", async () => { + const instance = await getInstance(); assert.instanceOf(instance["jco:test-components/get-stream-async"].getStreamStreamString, AsyncFunction); let vals = ["first", "third", "second"]; let stream = await instance["jco:test-components/get-stream-async"].getStreamStreamString(vals); diff --git a/packages/jco/test/p3/stream-lowers.js b/packages/jco/test/p3/stream-lowers.js index e3f63b58f..60d163171 100644 --- a/packages/jco/test/p3/stream-lowers.js +++ b/packages/jco/test/p3/stream-lowers.js @@ -1,14 +1,14 @@ import { join } from "node:path"; import { ReadableStream } from "node:stream/web"; -import { suite, test, assert, beforeAll, beforeEach, afterAll } from "vitest"; +import { suite, test, assert, beforeAll, afterAll, describe } from "vitest"; import { setupAsyncTest } from "../helpers.js"; import { AsyncFunction, LOCAL_TEST_COMPONENTS_DIR, createReadableStreamFromValues } from "../common.js"; import { WASIShim } from "@bytecodealliance/preview2-shim/instantiation"; suite("stream lowers", () => { - let esModule, cleanup, instance; + let esModule, cleanup, getInstance, instance; class ExampleResource { #id; @@ -44,416 +44,572 @@ suite("stream lowers", () => { esModule = setupRes.esModule; cleanup = setupRes.cleanup; - }); - - afterAll(async () => { - await cleanup(); - }); - beforeEach(async () => { + // We use a completely shared instance because sibling re-entrance + // is mediated by code in task.enter() instance = await esModule.instantiate(undefined, { ...new WASIShim().getImportObject(), "jco:test-components/resources": { ExampleResource, }, }); - }); - - test.concurrent("sync passthrough", async () => { - assert.notInstanceOf(instance["jco:test-components/use-stream-sync"].streamPassthrough, AsyncFunction); - - let vals = [0, 5, 10]; - const readerStream = new ReadableStream({ - start(ctrl) { - vals.forEach((v) => ctrl.enqueue(v)); - ctrl.close(); - }, - }); + getInstance = () => Promise.resolve(instance); - let returnedStream = instance["jco:test-components/use-stream-sync"].streamPassthrough(readerStream); - - // NOTE: Returned streams conform to the async iterator protocol -- they *do not* confirm to - // any other interface, though an object that is a ReadableStream may have been passed in. + // NOTE: To use an explicitly new instance per-test (more stable), uncomment the lines below // - let returnedVals = []; - for await (const v of returnedStream) { - returnedVals.push(v); - } - assert.deepEqual(vals, returnedVals); - - // Test late writer -- component should block until a value is written, - // and we should handle a final value + done from an iterator properly - const lateStream = { - [Symbol.asyncIterator]() { - let returned = 0; - return { - async next() { - await new Promise((resolve) => setTimeout(resolve, 300)); - if (returned === 2) { - return { value: 42, done: true }; - } - returned += 1; - return { value: 42, done: false }; - }, - }; - }, - }; - returnedStream = instance["jco:test-components/use-stream-sync"].streamPassthrough(lateStream); + // getInstance = async () => esModule.instantiate(undefined, { + // ...new WASIShim().getImportObject(), + // "jco:test-components/resources": { + // ExampleResource, + // }, + // }); + }); - returnedVals = []; - for await (const v of returnedStream) { - returnedVals.push(v); - } - assert.deepEqual([42, 42, 42], returnedVals); + afterAll(async () => { + await cleanup(); }); - test.concurrent("async passthrough", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].streamPassthrough, AsyncFunction); + describe("sync", () => { + test("sync passthrough", async () => { + const instance = await getInstance(); + assert.notInstanceOf(instance["jco:test-components/use-stream-sync"].streamPassthrough, AsyncFunction); - let vals = [10, 5, 0]; - const readerStream = new ReadableStream({ - start(ctrl) { - vals.forEach((v) => ctrl.enqueue(v)); - ctrl.close(); - }, + let vals = [0, 5, 10]; + const readerStream = new ReadableStream({ + start(ctrl) { + vals.forEach((v) => ctrl.enqueue(v)); + ctrl.close(); + }, + }); + + let returnedStream = instance["jco:test-components/use-stream-sync"].streamPassthrough(readerStream); + + // NOTE: Returned streams conform to the async iterator protocol -- they *do not* confirm to + // any other interface, though an object that is a ReadableStream may have been passed in. + // + let returnedVals = []; + for await (const v of returnedStream) { + returnedVals.push(v); + } + assert.deepEqual(vals, returnedVals); + + // Test late writer -- component should block until a value is written, + // and we should handle a final value + done from an iterator properly + const lateStream = { + [Symbol.asyncIterator]() { + let returned = 0; + return { + async next() { + await new Promise((resolve) => setTimeout(resolve, 300)); + if (returned === 2) { + return { value: 42, done: true }; + } + returned += 1; + return { value: 42, done: false }; + }, + }; + }, + }; + returnedStream = instance["jco:test-components/use-stream-sync"].streamPassthrough(lateStream); + + returnedVals = []; + for await (const v of returnedStream) { + returnedVals.push(v); + } + assert.deepEqual([42, 42, 42], returnedVals); }); - - let stream = await instance["jco:test-components/use-stream-async"].streamPassthrough(readerStream); - let returnedVals = []; - for await (const v of stream) { - returnedVals.push(v); - } - assert.deepEqual(vals, returnedVals); }); - test.concurrent("bool", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesBool, AsyncFunction); + describe("async", () => { + test.concurrent("async passthrough", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].streamPassthrough, AsyncFunction); - let vals = [true, false]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesBool( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + let vals = [10, 5, 0]; + const readerStream = new ReadableStream({ + start(ctrl) { + vals.forEach((v) => ctrl.enqueue(v)); + ctrl.close(); + }, + }); + + let stream = await instance["jco:test-components/use-stream-async"].streamPassthrough(readerStream); + let returnedVals = []; + for await (const v of stream) { + returnedVals.push(v); + } + assert.deepEqual(vals, returnedVals); + }); - test.concurrent("u8/s8", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU8, AsyncFunction); - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS8, AsyncFunction); - - let vals = [0, 1, 255]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU8( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - - vals = [-128, 0, 1, 127]; - returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS8( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("bool", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesBool, AsyncFunction); - test.concurrent("u16/s16", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU16, AsyncFunction); - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS16, AsyncFunction); - - let vals = [0, 100, 65535]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU16( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - - vals = [-32_768, 0, 32_767]; - returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS16( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + let vals = [true, false]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesBool( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("u32/s32", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU32, AsyncFunction); - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS32, AsyncFunction); - - let vals = [10, 5, 0]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU32( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - - vals = [-32, 90001, 3200000]; - returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS32( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("u8/s8", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU8, AsyncFunction); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS8, AsyncFunction); + + let vals = [0, 1, 255]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU8( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + + vals = [-128, 0, 1, 127]; + returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS8( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("u64/s64", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU64, AsyncFunction); - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS64, AsyncFunction); - - let vals = [0n, 100n, 65535n]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU64( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - - vals = [-32_768n, 0n, 32_767n]; - returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS64( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("u16/s16", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU16, AsyncFunction); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS16, AsyncFunction); + + let vals = [0, 100, 65535]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU16( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + + vals = [-32_768, 0, 32_767]; + returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS16( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("f32/f64", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesF32, AsyncFunction); - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesF64, AsyncFunction); - - let vals = [-300.01235, -1.5, -0.0, 0.0, 1.5, 300.01235]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesF32( - createReadableStreamFromValues(vals), - ); - vals.entries().forEach(([idx, v]) => assert.closeTo(v, returnedVals[idx], 0.01)); - - vals = [-60000.01235, -1.5, -0.0, 0.0, 1.5, -60000.01235]; - returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesF32( - createReadableStreamFromValues(vals), - ); - vals.entries().forEach(([idx, v]) => assert.closeTo(v, returnedVals[idx], 0.01)); - }); + test.concurrent("u32/s32", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU32, AsyncFunction); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS32, AsyncFunction); + + let vals = [10, 5, 0]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU32( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + + vals = [-32, 90001, 3200000]; + returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS32( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("string", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesString, AsyncFunction); + test.concurrent("u64/s64", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesU64, AsyncFunction); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesS64, AsyncFunction); + + let vals = [0n, 100n, 65535n]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesU64( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + + vals = [-32_768n, 0n, 32_767n]; + returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesS64( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - let vals = ["hello", "world", "!"]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesString( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("f32/f64", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesF32, AsyncFunction); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesF64, AsyncFunction); + + let vals = [-300.01235, -1.5, -0.0, 0.0, 1.5, 300.01235]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesF32( + createReadableStreamFromValues(vals), + ); + vals.entries().forEach(([idx, v]) => assert.closeTo(v, returnedVals[idx], 0.01)); + + vals = [-60000.01235, -1.5, -0.0, 0.0, 1.5, -60000.01235]; + returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesF32( + createReadableStreamFromValues(vals), + ); + vals.entries().forEach(([idx, v]) => assert.closeTo(v, returnedVals[idx], 0.01)); + }); - test.concurrent("record", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesRecord, AsyncFunction); - - let vals = [ - { id: 3, idStr: "three" }, - { id: 2, idStr: "two" }, - { id: 1, idStr: "one" }, - ]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesRecord( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("string", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesString, AsyncFunction); - test.concurrent("variant", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesVariant, AsyncFunction); - - let vals = [ - { tag: "maybe-u32", val: 123 }, - { tag: "maybe-u32", val: null }, - { tag: "str", val: "string-value" }, - { tag: "num", val: 1 }, - ]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesVariant( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, [ - // TODO: wit type representation smoothing mismatch - { tag: "maybe-u32", val: { tag: "some", val: 123 } }, - { tag: "maybe-u32", val: { tag: "none" } }, - { tag: "str", val: "string-value" }, - { tag: "num", val: 1 }, - ]); - - vals = [{ tag: "float", val: 123.1 }]; - returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesVariant( - createReadableStreamFromValues(vals), - ); - assert.closeTo(returnedVals[0].val, 123.1, 0.01); - }); + let vals = ["hello", "world", "!"]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesString( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("tuple", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesTuple, AsyncFunction); - - let vals = [ - [1, -1, "one"], - [2, -2, "two"], - [3, -3, "two"], - ]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesTuple( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("record", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesRecord, AsyncFunction); - test.concurrent("flags", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesFlags, AsyncFunction); - - let vals = [ - { first: true, second: false, third: false }, - { first: false, second: true, third: false }, - { first: false, second: false, third: true }, - ]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesFlags( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + let vals = [ + { id: 3, idStr: "three" }, + { id: 2, idStr: "two" }, + { id: 1, idStr: "one" }, + ]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesRecord( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("enum", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesEnum, AsyncFunction); + test.concurrent("variant", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesVariant, AsyncFunction); + + let vals = [ + { tag: "maybe-u32", val: 123 }, + { tag: "maybe-u32", val: null }, + { tag: "str", val: "string-value" }, + { tag: "num", val: 1 }, + ]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesVariant( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, [ + // TODO: wit type representation smoothing mismatch + { tag: "maybe-u32", val: { tag: "some", val: 123 } }, + { tag: "maybe-u32", val: { tag: "none" } }, + { tag: "str", val: "string-value" }, + { tag: "num", val: 1 }, + ]); + + vals = [{ tag: "float", val: 123.1 }]; + returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesVariant( + createReadableStreamFromValues(vals), + ); + assert.closeTo(returnedVals[0].val, 123.1, 0.01); + }); - let vals = ["first", "second", "third"]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesEnum( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, ["first", "second", "third"]); - }); + test.concurrent("tuple", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesTuple, AsyncFunction); + + let vals = [ + [1, -1, "one"], + [2, -2, "two"], + [3, -3, "two"], + ]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesTuple( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("option", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesOptionString, AsyncFunction); - - let vals = ["present string", null]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesOptionString( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, [ - // TODO: wit type representation smoothing mismatch - { tag: "some", val: "present string" }, - { tag: "none" }, - ]); - }); + test.concurrent("flags", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesFlags, AsyncFunction); + + let vals = [ + { first: true, second: false, third: false }, + { first: false, second: true, third: false }, + { first: false, second: false, third: true }, + ]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesFlags( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("result", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesResultString, AsyncFunction); - - let vals = [{ tag: "ok", val: "present string" }, { tag: "err", val: "nope" }, "bare string (ok)"]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesResultString( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, [ - // TODO: wit type representation smoothing mismatch - { tag: "ok", val: "present string" }, - { tag: "err", val: "nope" }, - { tag: "ok", val: "bare string (ok)" }, - ]); - }); + test.concurrent("enum", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesEnum, AsyncFunction); - test.concurrent("list", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesListU8, AsyncFunction); - - let vals = [[0x01, 0x02, 0x03, 0x04, 0x05], new Uint8Array([0x05, 0x04, 0x03, 0x02, 0x01]), []]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesListU8( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, [ - // TODO: wit type representation smoothing mismatch - vals[0], - [...vals[1]], - [], - ]); - }); + let vals = ["first", "second", "third"]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesEnum( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, ["first", "second", "third"]); + }); - test.concurrent("list", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesListString, AsyncFunction); + test.concurrent("option", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesOptionString, + AsyncFunction, + ); + + let vals = ["present string", null]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesOptionString( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, [ + // TODO: wit type representation smoothing mismatch + { tag: "some", val: "present string" }, + { tag: "none" }, + ]); + }); - let vals = [["first", "second", "third"], []]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesListString( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("result", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesResultString, + AsyncFunction, + ); + + let vals = [{ tag: "ok", val: "present string" }, { tag: "err", val: "nope" }, "bare string (ok)"]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesResultString( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, [ + // TODO: wit type representation smoothing mismatch + { tag: "ok", val: "present string" }, + { tag: "err", val: "nope" }, + { tag: "ok", val: "bare string (ok)" }, + ]); + }); - test.concurrent("list>", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesFixedListU32, AsyncFunction); - - let vals = [ - [ - [1, 2, 3, 4, 5], - [0, 0, 0, 0, 0], - ], - [[0, 0, 0, 0, 0], new Uint32Array([0x05, 0x04, 0x03, 0x02, 0x01])], - ]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesFixedListU32( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, [ - // TODO(fix): wit type representation smoothing mismatch - [ - [1, 2, 3, 4, 5], - [0, 0, 0, 0, 0], - ], - [ - [0, 0, 0, 0, 0], - [0x05, 0x04, 0x03, 0x02, 0x01], - ], - ]); - }); + test.concurrent("list", async () => { + const instance = await getInstance(); + assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesListU8, AsyncFunction); + + let vals = [[0x01, 0x02, 0x03, 0x04, 0x05], new Uint8Array([0x05, 0x04, 0x03, 0x02, 0x01]), []]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesListU8( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, [ + // TODO: wit type representation smoothing mismatch + vals[0], + [...vals[1]], + [], + ]); + }); - test.concurrent("list", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesListRecord, AsyncFunction); + test.concurrent("list", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesListString, + AsyncFunction, + ); + + let vals = [["first", "second", "third"], []]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesListString( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - let vals = [ - [ - { id: 3, idStr: "three" }, - { id: 2, idStr: "two" }, - { id: 1, idStr: "one" }, - ], - [ - { id: 1, idStr: "one-one" }, - { id: 2, idStr: "two-two" }, - { id: 3, idStr: "three-three" }, - ], - ]; - let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesListRecord( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, vals); - }); + test.concurrent("list>", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesFixedListU32, + AsyncFunction, + ); + + let vals = [ + [ + [1, 2, 3, 4, 5], + [0, 0, 0, 0, 0], + ], + [[0, 0, 0, 0, 0], new Uint32Array([0x05, 0x04, 0x03, 0x02, 0x01])], + ]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesFixedListU32( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, [ + // TODO(fix): wit type representation smoothing mismatch + [ + [1, 2, 3, 4, 5], + [0, 0, 0, 0, 0], + ], + [ + [0, 0, 0, 0, 0], + [0x05, 0x04, 0x03, 0x02, 0x01], + ], + ]); + }); - test.concurrent("example-resource", async () => { - assert.instanceOf( - instance["jco:test-components/use-stream-async"].readStreamValuesExampleResourceOwn, - AsyncFunction, - ); - - let vals = [new ExampleResource(0), new ExampleResource(1), new ExampleResource(2)]; - await instance["jco:test-components/use-stream-async"].readStreamValuesExampleResourceOwn( - createReadableStreamFromValues(vals), - ); - // TODO(fix): we shoudl be able to ensure destructor call - // see: https://github.com/bytecodealliance/jco/issues/989 - // assert(vals.every(r => r.dropped)); - }); + test.concurrent("list", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesListRecord, + AsyncFunction, + ); + + let vals = [ + [ + { id: 3, idStr: "three" }, + { id: 2, idStr: "two" }, + { id: 1, idStr: "one" }, + ], + [ + { id: 1, idStr: "one-one" }, + { id: 2, idStr: "two-two" }, + { id: 3, idStr: "three-three" }, + ], + ]; + let returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesListRecord( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, vals); + }); - test.concurrent("example-resource#get-id", async () => { - assert.instanceOf( - instance["jco:test-components/use-stream-async"].readStreamValuesExampleResourceOwnAttr, - AsyncFunction, - ); - - let vals = [new ExampleResource(2), new ExampleResource(1), new ExampleResource(0)]; - const returnedVals = await instance[ - "jco:test-components/use-stream-async" - ].readStreamValuesExampleResourceOwnAttr(createReadableStreamFromValues(vals)); - assert.deepEqual(returnedVals, [2, 1, 0]); - }); + test.concurrent("example-resource", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesExampleResourceOwn, + AsyncFunction, + ); + + let vals = [new ExampleResource(0), new ExampleResource(1), new ExampleResource(2)]; + await instance["jco:test-components/use-stream-async"].readStreamValuesExampleResourceOwn( + createReadableStreamFromValues(vals), + ); + // TODO(fix): we shoudl be able to ensure destructor call + // see: https://github.com/bytecodealliance/jco/issues/989 + // assert(vals.every(r => r.dropped)); + }); - test.concurrent("stream", async () => { - assert.instanceOf(instance["jco:test-components/use-stream-async"].readStreamValuesStreamString, AsyncFunction); - - let vals = [ - createReadableStreamFromValues(["first", "stream", "values"]), - createReadableStreamFromValues(["second", "stream", "here"]), - createReadableStreamFromValues(["third", "values", "in stream"]), - ]; - const returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesStreamString( - createReadableStreamFromValues(vals), - ); - assert.deepEqual(returnedVals, [ - ["first", "stream", "values"], - ["second", "stream", "here"], - ["third", "values", "in stream"], - ]); + test.concurrent("example-resource#get-id", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesExampleResourceOwnAttr, + AsyncFunction, + ); + + let vals = [new ExampleResource(2), new ExampleResource(1), new ExampleResource(0)]; + const returnedVals = await instance[ + "jco:test-components/use-stream-async" + ].readStreamValuesExampleResourceOwnAttr(createReadableStreamFromValues(vals)); + assert.deepEqual(returnedVals, [2, 1, 0]); + }); + + test.concurrent("stream", async () => { + const instance = await getInstance(); + assert.instanceOf( + instance["jco:test-components/use-stream-async"].readStreamValuesStreamString, + AsyncFunction, + ); + + let vals = [ + createReadableStreamFromValues(["first", "stream", "values"]), + createReadableStreamFromValues(["second", "stream", "here"]), + createReadableStreamFromValues(["third", "values", "in stream"]), + ]; + const returnedVals = await instance["jco:test-components/use-stream-async"].readStreamValuesStreamString( + createReadableStreamFromValues(vals), + ); + assert.deepEqual(returnedVals, [ + ["first", "stream", "values"], + ["second", "stream", "here"], + ["third", "values", "in stream"], + ]); + }); }); }); + +// // NOTE: this suite of tests should *not* be run concurrently, as they performs sync operations +// // which cannot be mediated/scheduled in the same way as async functions (avoiding +// // which does *not* get mediated in the same way +// suite("stream lowers (sync)", () => { +// let esModule, cleanup, getInstance, instance; + +// beforeAll(async () => { +// const name = "stream-rx"; +// const setupRes = await setupAsyncTest({ +// asyncMode: "jspi", +// component: { +// name, +// path: join(LOCAL_TEST_COMPONENTS_DIR, `${name}.wasm`), +// skipInstantiation: true, +// }, +// jco: { +// transpile: { +// extraArgs: { +// minify: false, +// }, +// }, +// }, +// }); + +// esModule = setupRes.esModule; +// cleanup = setupRes.cleanup; + +// // We use a completely shared instance because sibling re-entrance +// // is mediated by code in task.enter() +// instance = await esModule.instantiate(undefined, { +// ...new WASIShim().getImportObject(), +// "jco:test-components/resources": { +// ExampleResource, +// }, +// }); +// getInstance = () => Promise.resolve(instance); + +// // NOTE: To use an explicitly new instance per-test (more stable), uncomment the lines below +// // +// // getInstance = async () => esModule.instantiate(undefined, { +// // ...new WASIShim().getImportObject(), +// // "jco:test-components/resources": { +// // ExampleResource, +// // }, +// // }); +// }); + +// afterAll(async () => { +// await cleanup(); +// }); + +// test("sync passthrough", async () => { +// const instance = await getInstance(); +// assert.notInstanceOf(instance["jco:test-components/use-stream-sync"].streamPassthrough, AsyncFunction); + +// let vals = [0, 5, 10]; +// const readerStream = new ReadableStream({ +// start(ctrl) { +// vals.forEach((v) => ctrl.enqueue(v)); +// ctrl.close(); +// }, +// }); + +// let returnedStream = instance["jco:test-components/use-stream-sync"].streamPassthrough(readerStream); + +// // NOTE: Returned streams conform to the async iterator protocol -- they *do not* confirm to +// // any other interface, though an object that is a ReadableStream may have been passed in. +// // +// let returnedVals = []; +// for await (const v of returnedStream) { +// returnedVals.push(v); +// } +// assert.deepEqual(vals, returnedVals); + +// // Test late writer -- component should block until a value is written, +// // and we should handle a final value + done from an iterator properly +// const lateStream = { +// [Symbol.asyncIterator]() { +// let returned = 0; +// return { +// async next() { +// await new Promise((resolve) => setTimeout(resolve, 300)); +// if (returned === 2) { +// return { value: 42, done: true }; +// } +// returned += 1; +// return { value: 42, done: false }; +// }, +// }; +// }, +// }; +// returnedStream = instance["jco:test-components/use-stream-sync"].streamPassthrough(lateStream); + +// returnedVals = []; +// for await (const v of returnedStream) { +// returnedVals.push(v); +// } +// assert.deepEqual([42, 42, 42], returnedVals); +// }); +// }); diff --git a/packages/jco/test/vitest.lts.ts b/packages/jco/test/vitest.lts.ts index 0e9ac8453..622919a10 100644 --- a/packages/jco/test/vitest.lts.ts +++ b/packages/jco/test/vitest.lts.ts @@ -2,7 +2,7 @@ import { availableParallelism } from "node:os"; import { defineConfig } from "vitest/config"; -const DEFAULT_TIMEOUT_MS = 1000 * 60 * 1; // 1m +const DEFAULT_TIMEOUT_MS = 1000 * 60 * 1; // 60s const CI_DEFAULT_TIMEOUT_MS = 1000 * 60 * 3; // 1m const REPORTERS = process.env.GITHUB_ACTIONS ? ["verbose", "github-actions"] : ["verbose"]; diff --git a/packages/jco/test/vitest.ts b/packages/jco/test/vitest.ts index 98acd6dcb..33514347b 100644 --- a/packages/jco/test/vitest.ts +++ b/packages/jco/test/vitest.ts @@ -2,7 +2,7 @@ import { availableParallelism } from "node:os"; import { defineConfig } from "vitest/config"; -const DEFAULT_TIMEOUT_MS = 1000 * 60 * 1; // 1m +const DEFAULT_TIMEOUT_MS = 1000 * 60 * 1; // 60s const CI_DEFAULT_TIMEOUT_MS = 1000 * 60 * 3; // 1m const REPORTERS = process.env.GITHUB_ACTIONS ? ["verbose", "github-actions"] : ["verbose"];