diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index cf096839d47..3be12f26cc4 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -31,6 +31,7 @@ use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_manager::TransactionOffset; +use crate::util::adaptive_recv::AdaptiveUnboundedReceiver; use crate::util::jobs::{AllocatedJobCore, CorePinner, LoadBalanceOnDropGuard}; use crate::worker_metrics::WORKER_METRICS; use core::any::type_name; @@ -53,8 +54,9 @@ use std::cell::Cell; use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, LazyLock}; +use std::time::Duration; use std::time::Instant; -use tokio::sync::{oneshot, Mutex as AsyncMutex, Notify}; +use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex, Notify}; use tracing::Instrument; use v8::script_compiler::{compile_module, Source}; use v8::{ @@ -114,6 +116,8 @@ static V8_RUNTIME_GLOBAL: LazyLock = LazyLock::new(V8RuntimeInne static NEXT_JS_INSTANCE_ID: AtomicU64 = AtomicU64::new(1); const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096; pub(crate) const V8_WORKER_KIND_INSTANCE_LANE: &str = "instance_lane"; +const JS_REPLY_RELAY_BASELINE_LINGER: Duration = Duration::from_micros(25); +const JS_REPLY_RELAY_MAX_LINGER: Duration = Duration::from_micros(100); thread_local! { // Note, `on_module_thread` runs host closures on a single JS module thread. @@ -163,6 +167,37 @@ impl JsWorkerKind { } } +trait JsReplyTask { + fn run(self: Box); +} + +impl JsReplyTask for F +where + F: FnOnce() + Send + 'static, +{ + fn run(self: Box) { + (*self)(); + } +} + +type JsReplyTaskBox = Box; +type JsReplyRelayTx = mpsc::UnboundedSender; + +/// Spawns a worker for relaying replys to client connection handlers. +/// When the JS worker thread finishes executing a request, it pushes a +/// reply to this worker which then forwards it on to the correct +/// client connection handler. +fn spawn_js_reply_relay() -> JsReplyRelayTx { + let (tx, rx) = mpsc::unbounded_channel::(); + tokio::spawn(async move { + let mut rx = AdaptiveUnboundedReceiver::new(rx, JS_REPLY_RELAY_BASELINE_LINGER, JS_REPLY_RELAY_MAX_LINGER); + while let Some(task) = rx.recv().await { + task.run(); + } + }); + tx +} + pub(crate) fn assert_not_on_js_module_thread(label: &str) { ON_JS_MODULE_THREAD.with(|entered| { assert!( @@ -230,6 +265,7 @@ impl V8RuntimeInner { .v8_instance_lane_queue_length .with_label_values(&database_identity), ); + let lane_reply_relay = spawn_js_reply_relay(); // Validate/create the module and spawn the first instance. let mcc = Either::Right(mcc); @@ -243,6 +279,7 @@ impl V8RuntimeInner { heap_policy, JsWorkerKind::InstanceLane, lane_queue.clone(), + Some(lane_reply_relay.clone()), ) .await?; let module = JsModule { @@ -252,6 +289,7 @@ impl V8RuntimeInner { core_pinner, heap_policy, lane_queue, + lane_reply_relay, }; Ok(ModuleWithInstance::Js { module, init_inst }) @@ -266,6 +304,7 @@ pub struct JsModule { core_pinner: CorePinner, heap_policy: V8HeapPolicyConfig, lane_queue: Arc, + lane_reply_relay: JsReplyRelayTx, } impl JsModule { @@ -281,7 +320,12 @@ impl JsModule { self.common.info().clone() } - async fn create_instance_with_queue(&self, request_queue: Arc) -> JsInstance { + async fn create_instance_with_queue( + &self, + request_queue: Arc, + worker_kind: JsWorkerKind, + reply_relay: Option, + ) -> JsInstance { let program = self.program.clone(); let common = self.common.clone(); let load_balance_guard = self.load_balance_guard.clone(); @@ -295,8 +339,9 @@ impl JsModule { load_balance_guard, core_pinner, heap_policy, - JsWorkerKind::Pooled, + worker_kind, request_queue, + reply_relay, ) .await .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); @@ -307,11 +352,17 @@ impl JsModule { // We use a rendezvous channel for pooled instances, because they are checked // out one request at a time and subsequently returned to the pool, unlike the // long lived instance used for executing reducers. - self.create_instance_with_queue(JsWorkerQueue::bounded(0)).await + self.create_instance_with_queue(JsWorkerQueue::bounded(0), JsWorkerKind::Pooled, None) + .await } async fn create_lane_instance(&self) -> JsInstance { - self.create_instance_with_queue(self.lane_queue.clone()).await + self.create_instance_with_queue( + self.lane_queue.clone(), + JsWorkerKind::InstanceLane, + Some(self.lane_reply_relay.clone()), + ) + .await } } @@ -806,9 +857,22 @@ enum JsWorkerRequest { static_assert_size!(CallReducerParams, 192); -fn send_worker_reply(ctx: &str, reply_tx: JsReplyTx, value: T) { - if reply_tx.send(value).is_err() { - log::error!("should have receiver for `{ctx}` response"); +fn send_worker_reply(reply_relay: Option<&JsReplyRelayTx>, ctx: &'static str, reply_tx: JsReplyTx, value: T) +where + T: Send + 'static, +{ + let send_reply = move || { + if reply_tx.send(value).is_err() { + log::error!("should have receiver for `{ctx}` response"); + } + }; + + if let Some(reply_relay) = reply_relay { + if reply_relay.send(Box::new(send_reply)).is_err() { + log::error!("reply relay dropped while handling `{ctx}` response"); + } + } else { + send_reply(); } } @@ -1324,6 +1388,7 @@ async fn spawn_instance_worker( heap_policy: V8HeapPolicyConfig, worker_kind: JsWorkerKind, request_queue: Arc, + reply_relay: Option, ) -> anyhow::Result<(ModuleCommon, JsInstance)> { // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); @@ -1437,14 +1502,14 @@ async fn spawn_instance_worker( policy, } => { let res = instance_common.update_database(program, old_module_info, policy, &mut inst); - send_worker_reply("update_database", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "update_database", reply_tx, res); } JsWorkerRequest::CallReducer { reply_tx, params } => { let (res, trapped) = call_reducer(None, params); if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("call_reducer", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "call_reducer", reply_tx, res); should_exit = trapped; } JsWorkerRequest::CallView { reply_tx, cmd } => { @@ -1452,7 +1517,7 @@ async fn spawn_instance_worker( if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("call_view", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "call_view", reply_tx, res); should_exit = trapped; } JsWorkerRequest::CallProcedure { reply_tx, params } => { @@ -1463,12 +1528,12 @@ async fn spawn_instance_worker( if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("call_procedure", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "call_procedure", reply_tx, res); should_exit = trapped; } JsWorkerRequest::ClearAllClients(reply_tx) => { let res = instance_common.clear_all_clients(); - send_worker_reply("clear_all_clients", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "clear_all_clients", reply_tx, res); } JsWorkerRequest::CallIdentityConnected { reply_tx, @@ -1481,7 +1546,7 @@ async fn spawn_instance_worker( if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("call_identity_connected", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "call_identity_connected", reply_tx, res); should_exit = trapped; } JsWorkerRequest::CallIdentityDisconnected { @@ -1500,7 +1565,7 @@ async fn spawn_instance_worker( if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("call_identity_disconnected", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "call_identity_disconnected", reply_tx, res); should_exit = trapped; } JsWorkerRequest::DisconnectClient { reply_tx, client_id } => { @@ -1509,7 +1574,7 @@ async fn spawn_instance_worker( if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("disconnect_client", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "disconnect_client", reply_tx, res); should_exit = trapped; } JsWorkerRequest::InitDatabase { reply_tx, program } => { @@ -1518,7 +1583,7 @@ async fn spawn_instance_worker( if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("init_database", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "init_database", reply_tx, res); should_exit = trapped; } JsWorkerRequest::CallScheduledFunction { reply_tx, params } => { @@ -1529,7 +1594,7 @@ async fn spawn_instance_worker( if trapped { worker_state_in_thread.mark_trapped(); } - send_worker_reply("call_scheduled_function", reply_tx, res); + send_worker_reply(reply_relay.as_ref(), "call_scheduled_function", reply_tx, res); should_exit = trapped; } }