diff --git a/Cargo.lock b/Cargo.lock index a5cdb266..6b74c97e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4289,6 +4289,7 @@ dependencies = [ "futures", "node_resolver", "pctx_code_execution_runtime", + "pctx_config", "pctx_deno_transpiler", "pctx_registry", "pctx_type_check_runtime", @@ -4303,6 +4304,7 @@ dependencies = [ "tokio", "tracing", "url", + "uuid", "windows-sys 0.59.0", ] diff --git a/crates/pctx/src/commands/mcp/start.rs b/crates/pctx/src/commands/mcp/start.rs index 09eb12da..7e823bfc 100644 --- a/crates/pctx/src/commands/mcp/start.rs +++ b/crates/pctx/src/commands/mcp/start.rs @@ -1,19 +1,20 @@ use anyhow::Result; use clap::Parser; -use pctx_code_mode::CodeMode; +use pctx_code_mode::{CodeMode, ExecutorPool, PoolConfig}; use pctx_config::Config; -use tracing::info; +use std::sync::Arc; +use tracing::{info, warn}; use pctx_mcp_server::PctxMcpServer; #[derive(Debug, Clone, Parser)] pub struct StartCmd { /// Port to listen on - #[arg(short, long, default_value = "8080")] + #[arg(short, long, default_value = "8080", env = "PCTX_PORT")] pub port: u16, /// Host address to bind to (use 0.0.0.0 for external access) - #[arg(long, default_value = "127.0.0.1")] + #[arg(long, default_value = "127.0.0.1", env = "PCTX_HOST")] pub host: String, /// Don't show the server banner @@ -27,6 +28,12 @@ pub struct StartCmd { /// Use stateful MCP sessions (incompatible with --stdio) #[arg(long, conflicts_with = "stdio")] pub stateful_http: bool, + + /// Number of worker processes in the executor pool. + /// Defaults to the number of logical CPUs, capped at 8. + /// Set to 0 to disable the pool and run in-process. + #[arg(long, env = "PCTX_WORKERS")] + pub workers: Option, } impl StartCmd { @@ -53,7 +60,34 @@ impl StartCmd { ); } + let worker_count = self.workers.unwrap_or_else(|| { + std::thread::available_parallelism() + .map(|n| n.get().min(8)) + .unwrap_or(4) + }); + let code_mode = StartCmd::load_code_mode(&cfg).await?; + let code_mode = if worker_count == 0 { + info!("Executor pool disabled (--workers 0), running in-process"); + code_mode + } else { + match PoolConfig::from_current_exe(worker_count) { + Ok(pool_cfg) => match ExecutorPool::new(pool_cfg).await { + Ok(pool) => { + info!("Executor pool ready ({worker_count} workers)"); + code_mode.with_executor_pool(Arc::new(pool)) + } + Err(e) => { + warn!("Failed to start executor pool, falling back to in-process execution: {e}"); + code_mode + } + }, + Err(e) => { + warn!("Could not locate worker binary, falling back to in-process execution: {e}"); + code_mode + } + } + }; let pctx_mcp = PctxMcpServer::new(&cfg, code_mode) .with_banner(!self.no_banner) diff --git a/crates/pctx/src/commands/start.rs b/crates/pctx/src/commands/start.rs index f4137df7..8e64aa00 100644 --- a/crates/pctx/src/commands/start.rs +++ b/crates/pctx/src/commands/start.rs @@ -1,7 +1,9 @@ use anyhow::Result; use camino::Utf8PathBuf; use clap::Parser; +use pctx_code_mode::{ExecutorPool, PoolConfig}; use pctx_session_server::{AppState, start_server}; +use std::sync::Arc; use tabled::{ Table, builder::Builder, @@ -13,7 +15,7 @@ use tabled::{ }, }; use terminal_size::terminal_size; -use tracing::info; +use tracing::{info, warn}; use url::Url; use crate::utils::styles::fmt_dimmed; @@ -23,11 +25,11 @@ const LOGO: &str = include_str!("../../../../assets/ascii-logo.txt"); #[derive(Debug, Clone, Parser)] pub struct StartCmd { /// Port to listen on - #[arg(short, long, default_value = "8080")] + #[arg(short, long, default_value = "8080", env = "PCTX_PORT")] pub port: u16, /// Host address to bind to (use 0.0.0.0 for external access) - #[arg(long, default_value = "127.0.0.1")] + #[arg(long, default_value = "127.0.0.1", env = "PCTX_HOST")] pub host: String, /// Path to session storage directory @@ -42,6 +44,12 @@ pub struct StartCmd { #[arg(long = "allowed-origin")] pub allowed_origins: Vec, + /// Number of worker processes in the executor pool. + /// Defaults to the number of logical CPUs, capped at 8. + /// Set to 0 to disable the pool and run in-process. + #[arg(long, env = "PCTX_WORKERS")] + pub workers: Option, + /// Don't show the server banner #[arg(long)] pub no_banner: bool, @@ -49,7 +57,28 @@ pub struct StartCmd { impl StartCmd { pub(crate) async fn handle(&self) -> Result<()> { - let state = AppState::new_local(); + let worker_count = self.workers.unwrap_or_else(default_workers); + let state = if worker_count == 0 { + info!("Executor pool disabled (--workers 0), running in-process"); + AppState::new_local() + } else { + match PoolConfig::from_current_exe(worker_count) { + Ok(pool_cfg) => match ExecutorPool::new(pool_cfg).await { + Ok(pool) => { + info!("Executor pool ready ({worker_count} workers)"); + AppState::new_local().with_executor_pool(Arc::new(pool)) + } + Err(e) => { + warn!("Failed to start executor pool, falling back to in-process execution: {e}"); + AppState::new_local() + } + }, + Err(e) => { + warn!("Could not locate worker binary, falling back to in-process execution: {e}"); + AppState::new_local() + } + } + }; self.print_banner(); @@ -133,3 +162,10 @@ impl StartCmd { info!("pctx agent server listening at {rest_url}..."); } } + +/// Default worker count: logical CPUs, capped at 8. +fn default_workers() -> usize { + std::thread::available_parallelism() + .map(|n| n.get().min(8)) + .unwrap_or(4) +} diff --git a/crates/pctx_code_mode/src/code_mode.rs b/crates/pctx_code_mode/src/code_mode.rs index 931f422e..9126b811 100644 --- a/crates/pctx_code_mode/src/code_mode.rs +++ b/crates/pctx_code_mode/src/code_mode.rs @@ -1,10 +1,12 @@ use pctx_codegen::{Tool, ToolSet}; use pctx_config::{ToolDisclosure, server::ServerConfig}; +use pctx_executor::ExecutorPool; use pctx_registry::PctxRegistry; use serde::{Deserialize, Serialize}; use serde_json::json; use std::{ collections::{HashMap, HashSet}, + sync::Arc, time::Duration, }; use tracing::{debug, info, instrument, warn}; @@ -28,11 +30,34 @@ pub struct CodeMode { // Virtual filesystem for just-bash exploration virtual_fs: HashMap, + + /// Optional process pool for parallel TypeScript execution. + /// + /// When set, `execute_typescript` and `execute_bash` dispatch to a worker + /// process instead of acquiring the process-wide `V8_MUTEX`, allowing N + /// concurrent executions where N is the pool size. + /// + /// Skipped during serialisation — pools must be re-attached after + /// deserialising a `CodeMode`. + #[serde(skip)] + executor: Option>, } impl CodeMode { // --------------- Builder functions --------------- + /// Attach a process pool so that `execute_typescript` and `execute_bash` + /// dispatch to worker processes instead of serialising through the + /// process-wide V8 lock. + /// + /// The pool must outlive this `CodeMode` (hence `Arc`). Call this after + /// construction, before any executions. + #[must_use] + pub fn with_executor_pool(mut self, pool: Arc) -> Self { + self.executor = Some(pool); + self + } + pub async fn with_server(mut self, server: &ServerConfig) -> Result { self.add_server(server).await?; Ok(self) @@ -180,6 +205,13 @@ impl CodeMode { tool_set.tools.len() ); + // Explicitly cancel the transport task before dropping so the OS-level + // TCP socket and epoll registration are released immediately rather than + // waiting for the Tokio task to be scheduled and exit on its own. + // Under concurrent load, many pending-cancellation tasks accumulate and + // exhaust the process file-descriptor limit (EMFILE / os error 24). + mcp_client.cancellation_token().cancel(); + Ok((tool_set, server.clone())) } @@ -504,8 +536,24 @@ export default result;"#, debug!(to_execute = %to_execute, "Executing bash in sandbox"); - let execution_res = - pctx_executor::execute(&to_execute, pctx_executor::ExecuteOptions::new()).await?; + let execution_res = if let Some(pool) = &self.executor { + pool.execute(&to_execute, pctx_executor::ExecuteOptions::new()) + .await? + } else { + tokio::task::spawn_blocking(move || -> Result<_> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| Error::Message(format!("Failed to create Tokio runtime: {e}")))?; + rt.block_on(pctx_executor::execute( + &to_execute, + pctx_executor::ExecuteOptions::new(), + )) + .map_err(Into::into) + }) + .await + .map_err(|e| Error::Message(format!("Task join error: {e}")))?? + }; // Extract stdout and stderr from the bash result object // The output field contains the result object: { stdout, stderr, exitCode } @@ -666,11 +714,27 @@ export default result;"#, debug!(to_execute = %to_execute, "Executing TypeScript in sandbox"); - let execution_res = pctx_executor::execute( - &to_execute, - pctx_executor::ExecuteOptions::new().with_registry(registry), - ) - .await?; + let execution_res = if let Some(pool) = &self.executor { + pool.execute( + &to_execute, + pctx_executor::ExecuteOptions::new().with_registry(registry), + ) + .await? + } else { + tokio::task::spawn_blocking(move || -> Result<_> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| Error::Message(format!("Failed to create Tokio runtime: {e}")))?; + rt.block_on(pctx_executor::execute( + &to_execute, + pctx_executor::ExecuteOptions::new().with_registry(registry), + )) + .map_err(Into::into) + }) + .await + .map_err(|e| Error::Message(format!("Task join error: {e}")))?? + }; if execution_res.success { debug!("TypeScript execution completed successfully"); diff --git a/crates/pctx_code_mode/src/lib.rs b/crates/pctx_code_mode/src/lib.rs index 51b8b3a0..e9974184 100644 --- a/crates/pctx_code_mode/src/lib.rs +++ b/crates/pctx_code_mode/src/lib.rs @@ -190,6 +190,7 @@ pub use code_mode::CodeMode; // Re-export config, codegen, registry, crates pub use pctx_codegen as codegen; pub use pctx_config as config; +pub use pctx_executor::{ExecutorPool, PoolConfig}; pub use pctx_registry as registry; pub type Result = std::result::Result; diff --git a/crates/pctx_executor/Cargo.toml b/crates/pctx_executor/Cargo.toml index 96d749ce..dec408cc 100644 --- a/crates/pctx_executor/Cargo.toml +++ b/crates/pctx_executor/Cargo.toml @@ -9,6 +9,10 @@ repository = "https://github.com/portofcontext/pctx" keywords = ["typescript", "executor", "pctx"] categories = ["development-tools"] +[[bin]] +name = "pctx_worker" +path = "src/bin/worker.rs" + [dependencies] serde = { workspace = true } serde_json = { workspace = true } @@ -20,6 +24,9 @@ pctx_deno_transpiler = { version = "^0.1.1", path = "../pctx_deno_transpiler" } pctx_code_execution_runtime = { version = "^0.2.0", path = "../pctx_code_execution_runtime" } pctx_registry = { version = "^0.1.1", path = "../pctx_registry" } pctx_type_check_runtime = { version = "^0.1.3", path = "../pctx_type_check_runtime" } +pctx_config = { version = "^0.1.5", path = "../pctx_config" } +tokio = { workspace = true, features = ["rt", "io-util", "sync", "process", "macros"] } +uuid = { workspace = true } tempfile = "3" regex = "1" thiserror = { workspace = true } diff --git a/crates/pctx_executor/src/bin/worker.rs b/crates/pctx_executor/src/bin/worker.rs new file mode 100644 index 00000000..cd161b34 --- /dev/null +++ b/crates/pctx_executor/src/bin/worker.rs @@ -0,0 +1,231 @@ +/// Worker process binary for the `pctx_executor` process pool. +/// +/// Each worker owns an exclusive V8 platform and processes one +/// [`ExecuteRequest`] at a time, eliminating V8's cross-thread sharing +/// constraint that serialises execution in the parent process. +/// +/// # Lifecycle +/// 1. Writes `{"type":"ready"}` to stdout once V8 is initialised. +/// 2. Reads one `ExecuteRequest` from stdin. +/// 3. Builds a [`PctxRegistry`] with: +/// - Real MCP connections (fresh per request). +/// - Synthetic proxy callbacks that bounce to the parent via IPC. +/// 4. Runs `pctx_executor::execute()` while concurrently reading +/// `CallbackResponse` messages from stdin (driven by `tokio::select!`). +/// 5. Writes the `ExecuteResult` back to stdout and returns to step 2. +use std::{collections::HashMap, sync::Arc}; + +use pctx_executor::{ + DenoExecutorError, ExecuteOptions, ExecuteResult, + ipc::{read_msg, write_msg}, + protocol::{CallbackRequest, ExecuteResultMsg, WorkerMessage, WorkerRequest}, +}; +use pctx_registry::{CallbackFn, PctxRegistry}; +use serde_json::Value; +use tokio::{ + io::{AsyncWriteExt, BufReader, BufWriter, stdin, stdout}, + sync::{Mutex, oneshot}, +}; +use uuid::Uuid; + +/// Pending callback invocations: `call_id` → `oneshot::Sender`. +type Pending = Arc>>>>; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + // Wrap stdout in Arc> so synthetic callbacks can write to it + // concurrently with the main loop (all on the same single thread, so the + // mutex never actually contends, but the types require it). + let stdout_shared = Arc::new(Mutex::new(BufWriter::new(stdout()))); + let mut stdin = BufReader::new(stdin()); + + // Signal ready – this also triggers the first V8 platform init via the + // LazyLock inside pctx_executor when execute() is first called. + send_msg(&stdout_shared, &WorkerMessage::Ready).await; + + loop { + let req: WorkerRequest = match read_msg(&mut stdin).await { + Ok(r) => r, + Err(_) => break, // EOF = parent closed the pipe; exit cleanly. + }; + + let WorkerRequest::Execute(exec_req) = req else { + // Unexpected message type before an Execute request – skip. + continue; + }; + + let request_id = exec_req.request_id; + let pending: Pending = Arc::new(Mutex::new(HashMap::new())); + + let registry = build_registry(&exec_req, pending.clone(), stdout_shared.clone()); + let code = exec_req.code.clone(); + + let result_msg = + run_execution(request_id, code, registry, &mut stdin, &pending).await; + + send_msg(&stdout_shared, &WorkerMessage::ExecuteResult(result_msg)).await; + } +} + +// --------------------------------------------------------------------------- +// Core execution loop +// --------------------------------------------------------------------------- + +/// Drive `execute()` and read `CallbackResponse` messages from stdin +/// concurrently on the single-thread runtime using `tokio::select!`. +/// +/// When a synthetic callback suspends waiting for a parent response, the +/// `select!` reads that response from stdin and satisfies the `oneshot` +/// channel, which wakes the Deno event loop. +async fn run_execution( + request_id: Uuid, + code: String, + registry: PctxRegistry, + stdin: &mut BufReader, + pending: &Pending, +) -> ExecuteResultMsg { + let opts = ExecuteOptions::new().with_registry(registry); + let exec_fut = pctx_executor::execute(&code, opts); + tokio::pin!(exec_fut); + + loop { + tokio::select! { + result = &mut exec_fut => { + return to_result_msg(request_id, result); + } + maybe_msg = read_msg::<_, WorkerRequest>(stdin) => { + match maybe_msg { + Ok(WorkerRequest::CallbackResponse(resp)) => { + let mut p = pending.lock().await; + if let Some(tx) = p.remove(&resp.callback_call_id) { + // Ignore send errors: the callback future may have + // been cancelled if execute() returned already. + let _ = tx.send(resp.result); + } + } + Ok(_) => {} // unexpected; ignore + Err(_) => break, // parent closed stdin + } + } + } + } + + // Reached only if stdin closed mid-execution. + to_result_msg( + request_id, + Err(DenoExecutorError::InternalError( + "parent process disconnected during execution".into(), + )), + ) +} + +// --------------------------------------------------------------------------- +// Registry construction +// --------------------------------------------------------------------------- + +/// Build a [`PctxRegistry`] for this execution request. +/// +/// Every tool ID — whether a Rust callback or an MCP server tool — is backed +/// by a synthetic IPC-proxy closure. When TypeScript invokes a tool, the +/// worker sends a [`CallbackRequest`] to the parent and suspends; the parent +/// calls its own `registry.invoke()` (which uses the live session-scoped +/// connection pool) and sends back a [`CallbackResponse`]. +/// +/// This means the worker never opens direct MCP connections, so it always +/// benefits from the parent's already-established and session-cached pool. +fn build_registry( + exec_req: &pctx_executor::protocol::ExecuteRequest, + pending: Pending, + stdout_shared: Arc>>, +) -> PctxRegistry { + let registry = PctxRegistry::default(); + + for tool_id in &exec_req.all_tool_ids { + let id = tool_id.clone(); + let pending = pending.clone(); + let out = stdout_shared.clone(); + + let cb: CallbackFn = Arc::new(move |args: Option| { + let id = id.clone(); + let pending = pending.clone(); + let out = out.clone(); + + Box::pin(async move { + let call_id = Uuid::new_v4(); + let (tx, rx) = oneshot::channel::>(); + + { + let mut p = pending.lock().await; + p.insert(call_id, tx); + } + + let cb_req = CallbackRequest { + callback_call_id: call_id, + callback_id: id.clone(), + args, + }; + send_msg(&out, &WorkerMessage::CallbackRequest(cb_req)).await; + + rx.await + .unwrap_or_else(|_| Err("callback channel dropped".into())) + }) + }); + + if let Err(e) = registry.add_callback(tool_id, cb) { + eprintln!("[pctx_worker] failed to register tool proxy {tool_id}: {e}"); + } + } + + registry +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Write a message to the shared stdout and flush. +async fn send_msg( + out: &Arc>>, + msg: &T, +) { + let mut guard = out.lock().await; + write_msg(&mut *guard, msg) + .await + .expect("worker: write to stdout"); + guard.flush().await.expect("worker: flush stdout"); +} + +/// Convert an `execute()` result into the wire-format [`ExecuteResultMsg`]. +fn to_result_msg( + request_id: Uuid, + result: pctx_executor::Result, +) -> ExecuteResultMsg { + match result { + Ok(r) => ExecuteResultMsg { + request_id, + success: r.success, + diagnostics: r.diagnostics, + runtime_error: r.runtime_error, + output: r.output, + stdout: r.stdout, + stderr: r.stderr, + events: r.trace.events, + }, + Err(e) => { + let msg = e.to_string(); + ExecuteResultMsg { + request_id, + success: false, + diagnostics: vec![], + runtime_error: Some(pctx_executor::ExecutionError { + message: msg.clone(), + stack: None, + }), + output: None, + stdout: String::new(), + stderr: msg, + events: vec![], + } + } + } +} diff --git a/crates/pctx_executor/src/ipc.rs b/crates/pctx_executor/src/ipc.rs new file mode 100644 index 00000000..cfb4f185 --- /dev/null +++ b/crates/pctx_executor/src/ipc.rs @@ -0,0 +1,40 @@ +/// Async JSON-lines IPC helpers shared by the pool manager and worker binary. +/// +/// Each message is a single line of JSON terminated by `\n`. The writer flushes +/// must be called by the caller after writing (to avoid buffering surprises). +use serde::{Serialize, de::DeserializeOwned}; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt}; + +/// Serialize `msg` as a single JSON line and write it to `writer`. +/// +/// Does **not** flush; the caller must flush when appropriate. +pub async fn write_msg(writer: &mut W, msg: &T) -> std::io::Result<()> +where + W: AsyncWrite + Unpin, + T: Serialize, +{ + let mut line = serde_json::to_string(msg) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + line.push('\n'); + writer.write_all(line.as_bytes()).await +} + +/// Read one `\n`-terminated JSON line from `reader` and deserialize it. +/// +/// Returns `Err` with `UnexpectedEof` if the stream ends before a complete line. +pub async fn read_msg(reader: &mut R) -> std::io::Result +where + R: AsyncBufRead + Unpin, + T: DeserializeOwned, +{ + let mut line = String::new(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "IPC channel closed (EOF)", + )); + } + serde_json::from_str(line.trim_end()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) +} diff --git a/crates/pctx_executor/src/lib.rs b/crates/pctx_executor/src/lib.rs index 58c3b330..534ff595 100644 --- a/crates/pctx_executor/src/lib.rs +++ b/crates/pctx_executor/src/lib.rs @@ -13,6 +13,11 @@ use thiserror::Error; use tracing::{debug, warn}; pub mod events; +pub mod ipc; +pub mod pool; +pub mod protocol; + +pub use pool::{ExecutorPool, PoolConfig}; /// Process-wide mutex to serialize all V8 isolate creation and usage. /// @@ -531,3 +536,6 @@ fn process_execution_results( pub fn version() -> &'static str { env!("CARGO_PKG_VERSION") } + +#[cfg(test)] +mod tests; diff --git a/crates/pctx_executor/src/pool.rs b/crates/pctx_executor/src/pool.rs new file mode 100644 index 00000000..7ab47a26 --- /dev/null +++ b/crates/pctx_executor/src/pool.rs @@ -0,0 +1,314 @@ +/// Process pool for parallel TypeScript execution. +/// +/// [`ExecutorPool`] manages N worker sub-processes, each with its own V8 +/// platform. Requests are dispatched round-robin so that N executions can run +/// truly in parallel, eliminating the contention of the process-wide +/// `V8_MUTEX` when all callers share a single process. +/// +/// # Callback proxying +/// +/// The worker process cannot hold Rust closures, so when TypeScript code calls +/// a callback registered in the parent's [`PctxRegistry`], the worker sends a +/// [`CallbackRequest`] IPC message and suspends. The pool's `execute()` loop +/// receives that message, invokes the real callback via the parent's registry, +/// and sends the result back as a [`CallbackResponse`]. The worker resumes +/// transparently. +/// +/// # Connection pool caching +/// +/// MCP connections made *inside* a worker are independent of the parent +/// process's connection pool. The [`ExecuteResult`] returned by +/// [`ExecutorPool::execute`] contains the *original* registry that was passed +/// in (so pool caching by the session / MCP server layers continues to work +/// for parent-side connections), but the worker will reconnect on every +/// request. Session-affinity routing is a planned follow-up to address this. +use std::{ + path::PathBuf, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::SystemTime, +}; + +use pctx_registry::PctxRegistry; +use tokio::{ + io::{AsyncWriteExt, BufReader, BufWriter}, + process::{Child, ChildStdin, ChildStdout, Command}, + sync::Mutex, +}; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +use crate::{ + DenoExecutorError, ExecuteOptions, ExecuteResult, + events::{ExecutionEvent, ExecutionTrace}, + ipc::{read_msg, write_msg}, + protocol::{CallbackResponse, ExecuteRequest, WorkerMessage, WorkerRequest}, +}; + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +/// Configuration for an [`ExecutorPool`]. +pub struct PoolConfig { + /// Number of worker processes to spawn (one V8 platform each). + pub worker_count: usize, + /// Path to the `pctx_worker` binary. + pub worker_binary: PathBuf, +} + +impl PoolConfig { + /// Build a config that locates `pctx_worker` as a sibling of the current + /// executable (i.e., same `target/debug` or `target/release` directory). + /// + /// # Errors + /// Returns an error if `std::env::current_exe()` fails. + pub fn from_current_exe(worker_count: usize) -> std::io::Result { + let exe = std::env::current_exe()?; + let dir = exe.parent().ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::NotFound, + "current executable has no parent directory", + ) + })?; + Ok(Self { + worker_count, + worker_binary: dir.join("pctx_worker"), + }) + } +} + +/// A pool of worker processes that execute TypeScript in parallel. +/// +/// Construct with [`ExecutorPool::new`], then call [`ExecutorPool::execute`] +/// as a drop-in replacement for [`crate::execute`]. +pub struct ExecutorPool { + workers: Vec>>, + next: AtomicUsize, +} + +impl std::fmt::Debug for ExecutorPool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExecutorPool") + .field("worker_count", &self.workers.len()) + .finish_non_exhaustive() + } +} + +impl ExecutorPool { + /// Spawn all worker processes and wait for each to signal readiness. + /// + /// # Errors + /// Returns an error if any worker fails to start or does not send a + /// `Ready` message within a reasonable time. + pub async fn new(config: PoolConfig) -> std::io::Result { + let mut workers = Vec::with_capacity(config.worker_count); + for i in 0..config.worker_count { + let handle = spawn_worker(&config.worker_binary).await.map_err(|e| { + std::io::Error::new( + e.kind(), + format!("failed to spawn worker {i}: {e}"), + ) + })?; + info!(worker = i, "Worker process ready"); + workers.push(Arc::new(Mutex::new(handle))); + } + Ok(Self { + workers, + next: AtomicUsize::new(0), + }) + } + + /// Execute TypeScript code on the next available worker. + /// + /// This is a drop-in replacement for [`crate::execute`]: same inputs, same + /// output. Callback invocations are proxied back to the parent's registry + /// transparently. + /// + /// # Errors + /// Returns [`DenoExecutorError::InternalError`] if the IPC channel fails + /// (e.g., the worker process crashed). + pub async fn execute( + &self, + code: &str, + options: ExecuteOptions, + ) -> crate::Result { + let worker_idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len(); + let mut worker = self.workers[worker_idx].lock().await; + + let started_at = SystemTime::now(); + let request_id = Uuid::new_v4(); + + let all_tool_ids = options.registry.ids(); + + let req = ExecuteRequest { + request_id, + code: code.to_string(), + all_tool_ids, + }; + + debug!( + worker = worker_idx, + %request_id, + code_len = code.len(), + "Dispatching to worker", + ); + + // Send the execute request. + write_msg(&mut worker.stdin, &WorkerRequest::Execute(req)) + .await + .map_err(|e| DenoExecutorError::InternalError(format!("IPC write: {e}")))?; + worker + .stdin + .flush() + .await + .map_err(|e| DenoExecutorError::InternalError(format!("IPC flush: {e}")))?; + + // Drive the message loop until the worker returns a result. + // Callback requests are intercepted and proxied here. + let result_msg = loop { + let msg: WorkerMessage = read_msg(&mut worker.stdout) + .await + .map_err(|e| DenoExecutorError::InternalError(format!("IPC read: {e}")))?; + + match msg { + WorkerMessage::ExecuteResult(r) => break r, + + WorkerMessage::CallbackRequest(cb) => { + debug!( + callback_id = %cb.callback_id, + call_id = %cb.callback_call_id, + "Proxying callback to parent registry", + ); + let args_obj = cb.args.as_ref().and_then(|v| v.as_object().cloned()); + let result = options + .registry + .invoke(&cb.callback_id, args_obj) + .await + .map_err(|e| e.to_string()); + + let resp = WorkerRequest::CallbackResponse(CallbackResponse { + callback_call_id: cb.callback_call_id, + result, + }); + write_msg(&mut worker.stdin, &resp) + .await + .map_err(|e| DenoExecutorError::InternalError(format!("IPC write: {e}")))?; + worker + .stdin + .flush() + .await + .map_err(|e| DenoExecutorError::InternalError(format!("IPC flush: {e}")))?; + } + + WorkerMessage::Ready => { + warn!("Received unexpected Ready message during execution; ignoring"); + } + } + }; + + let ended_at = SystemTime::now(); + + debug!( + worker = worker_idx, + %request_id, + success = result_msg.success, + "Worker execution complete", + ); + + Ok(reconstruct_result( + result_msg, + options.registry, + code, + started_at, + ended_at, + )) + } +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +struct WorkerHandle { + stdin: BufWriter, + stdout: BufReader, + /// Kept alive so the child process is not killed on drop. + _child: Child, +} + +/// Spawn one worker process and wait for the `Ready` handshake. +async fn spawn_worker(binary: &PathBuf) -> std::io::Result { + let mut child = Command::new(binary) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .spawn() + .map_err(|e| { + std::io::Error::new( + e.kind(), + format!("failed to spawn {}: {e}", binary.display()), + ) + })?; + + let stdin = BufWriter::new(child.stdin.take().expect("stdin piped")); + let stdout = BufReader::new(child.stdout.take().expect("stdout piped")); + + let mut handle = WorkerHandle { + stdin, + stdout, + _child: child, + }; + + // Wait for the worker to finish V8 platform initialisation. + let first_msg: WorkerMessage = read_msg(&mut handle.stdout).await.map_err(|e| { + std::io::Error::new( + e.kind(), + format!("worker did not send Ready signal: {e}"), + ) + })?; + + if !matches!(first_msg, WorkerMessage::Ready) { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "worker sent unexpected first message (expected Ready)", + )); + } + + Ok(handle) +} + + +/// Reconstruct an [`ExecuteResult`] from the worker's response message. +/// +/// The original registry is returned unchanged so the caller's connection-pool +/// caching continues to work for parent-side MCP connections. +fn reconstruct_result( + msg: crate::protocol::ExecuteResultMsg, + original_registry: PctxRegistry, + code: &str, + started_at: SystemTime, + ended_at: SystemTime, +) -> ExecuteResult { + // The events vec arriving from the worker already includes the TypeCheck + // event and all registry events, sorted by started_at. + let events: Vec = msg.events; + + ExecuteResult { + success: msg.success, + diagnostics: msg.diagnostics, + runtime_error: msg.runtime_error, + output: msg.output, + stdout: msg.stdout, + stderr: msg.stderr, + registry: original_registry, + trace: ExecutionTrace { + code: code.to_string(), + started_at, + ended_at, + events, + }, + } +} diff --git a/crates/pctx_executor/src/protocol.rs b/crates/pctx_executor/src/protocol.rs new file mode 100644 index 00000000..302ba72b --- /dev/null +++ b/crates/pctx_executor/src/protocol.rs @@ -0,0 +1,96 @@ +/// IPC message types exchanged between the pool manager (parent process) and +/// worker processes over newline-delimited JSON on stdin/stdout. +/// +/// Parent → Worker: [`WorkerRequest`] +/// Worker → Parent: [`WorkerMessage`] +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::{Diagnostic, ExecutionError, events::ExecutionEvent}; + +// --------------------------------------------------------------------------- +// Parent → Worker +// --------------------------------------------------------------------------- + +/// A message sent from the pool manager to a worker. +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WorkerRequest { + /// Begin executing the given code. + Execute(ExecuteRequest), + /// Return value for a callback the worker asked the parent to invoke. + CallbackResponse(CallbackResponse), +} + +/// Payload for a [`WorkerRequest::Execute`] message. +/// +/// Instead of sending MCP server configs and having the worker reconnect, +/// all tool IDs (both callbacks and MCP tools) are sent as `all_tool_ids`. +/// The worker creates an IPC-proxy stub for each ID so every tool call is +/// routed back to the parent's registry, which holds the live connection pool. +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteRequest { + /// Correlates this request with the eventual [`ExecuteResultMsg`]. + pub request_id: Uuid, + /// Fully-prepared TypeScript code (post code-generation wrapping). + pub code: String, + /// Every tool ID registered in the parent's registry — both MCP tool IDs + /// (formatted as `"server__tool"`) and callback IDs. The worker creates + /// an IPC-proxy callback for each so no direct connections leave the worker. + pub all_tool_ids: Vec, +} + +/// The parent's answer to a [`CallbackRequest`] the worker sent earlier. +#[derive(Debug, Serialize, Deserialize)] +pub struct CallbackResponse { + /// Matches the `callback_call_id` in the original [`CallbackRequest`]. + pub callback_call_id: Uuid, + /// `Ok(value)` on success, `Err(message)` on failure. + pub result: Result, +} + +// --------------------------------------------------------------------------- +// Worker → Parent +// --------------------------------------------------------------------------- + +/// A message sent from a worker to the pool manager. +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WorkerMessage { + /// V8 platform is initialised; the worker is ready to accept requests. + Ready, + /// Execution has finished (success or failure). + ExecuteResult(ExecuteResultMsg), + /// A callback registered in the parent must be invoked. + CallbackRequest(CallbackRequest), +} + +/// Payload for a [`WorkerMessage::ExecuteResult`] message. +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecuteResultMsg { + /// Matches the `request_id` from the originating [`ExecuteRequest`]. + pub request_id: Uuid, + pub success: bool, + /// Type-check diagnostics (non-empty only when type checking failed). + pub diagnostics: Vec, + /// Runtime error (set when execution threw an unhandled exception). + pub runtime_error: Option, + /// Default export value from the module. + pub output: Option, + pub stdout: String, + pub stderr: String, + /// Full ordered event log (TypeCheck + registry events), already sorted + /// by `started_at`. + pub events: Vec, +} + +/// A request from the worker to invoke a parent-side callback. +#[derive(Debug, Serialize, Deserialize)] +pub struct CallbackRequest { + /// Unique ID for this specific invocation (used to match the response). + pub callback_call_id: Uuid, + /// The callback ID as registered in the parent's [`PctxRegistry`]. + pub callback_id: String, + /// Arguments passed from TypeScript. + pub args: Option, +} diff --git a/crates/pctx_executor/src/tests/ipc.rs b/crates/pctx_executor/src/tests/ipc.rs new file mode 100644 index 00000000..fe4ba26e --- /dev/null +++ b/crates/pctx_executor/src/tests/ipc.rs @@ -0,0 +1,60 @@ +use serde_json::{Value, json}; +use tokio::io::{BufReader, duplex}; + +use crate::ipc::{read_msg, write_msg}; + +/// Write a value and read it back through an in-memory pipe. +#[tokio::test] +async fn round_trip_json_value() { + let (mut writer, reader) = duplex(4096); + let mut reader = BufReader::new(reader); + + let sent = json!({"hello": "world", "n": 42, "flag": true}); + write_msg(&mut writer, &sent).await.unwrap(); + + let received: Value = read_msg(&mut reader).await.unwrap(); + assert_eq!(sent, received); +} + +/// Multiple messages are read in order. +#[tokio::test] +async fn multiple_messages_in_order() { + let (mut writer, reader) = duplex(4096); + let mut reader = BufReader::new(reader); + + for i in 0u32..5 { + write_msg(&mut writer, &json!({"i": i})).await.unwrap(); + } + + for i in 0u32..5 { + let msg: Value = read_msg(&mut reader).await.unwrap(); + assert_eq!(msg["i"], i); + } +} + +/// Closing the write end produces `UnexpectedEof` on the read side. +#[tokio::test] +async fn eof_returns_unexpected_eof_error() { + let (writer, reader) = duplex(4096); + let mut reader = BufReader::new(reader); + + // Drop the writer immediately — no bytes written. + drop(writer); + + let err = read_msg::<_, Value>(&mut reader).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); +} + +/// Garbled bytes produce an `InvalidData` error, not a panic. +#[tokio::test] +async fn invalid_json_returns_invalid_data_error() { + use tokio::io::AsyncWriteExt; + + let (mut writer, reader) = duplex(4096); + let mut reader = BufReader::new(reader); + + writer.write_all(b"not valid json\n").await.unwrap(); + + let err = read_msg::<_, Value>(&mut reader).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); +} diff --git a/crates/pctx_executor/src/tests/mod.rs b/crates/pctx_executor/src/tests/mod.rs index 17b0d1a0..f036e5d0 100644 --- a/crates/pctx_executor/src/tests/mod.rs +++ b/crates/pctx_executor/src/tests/mod.rs @@ -18,8 +18,10 @@ mod callback_usage; mod concurrent_v8_stress; mod default_export_capture; mod diagnostic_filtering; +mod ipc; mod just_bash; mod mcp_client_usage; mod output_capture; +mod pool; mod runtime_execution; mod type_checking; diff --git a/crates/pctx_executor/src/tests/pool.rs b/crates/pctx_executor/src/tests/pool.rs new file mode 100644 index 00000000..1c15a196 --- /dev/null +++ b/crates/pctx_executor/src/tests/pool.rs @@ -0,0 +1,238 @@ +/// Integration tests for [`ExecutorPool`]. +/// +/// These tests spawn real `pctx_worker` sub-processes. `cargo test -p +/// pctx_executor` builds all binaries in the package (including +/// `pctx_worker`) before running tests, so the binary is always available +/// at `target/{profile}/pctx_worker` — one directory above the test +/// runner binary in `target/{profile}/deps/`. +/// +/// Pool tests do NOT need `#[serial]`: the V8 mutex lives inside each +/// worker process, not in the test process. +use std::sync::Arc; + +use serde_json::json; + +use crate::{ExecuteOptions, ExecutorPool, PoolConfig}; +use pctx_registry::PctxRegistry; + +/// Locate `pctx_worker` relative to the running test binary. +fn worker_pool(worker_count: usize) -> PoolConfig { + let test_exe = std::env::current_exe().expect("current_exe"); + // test binary: .../target/debug/deps/pctx_executor- + // worker binary: .../target/debug/pctx_worker + let bin_dir = test_exe + .parent() // .../deps + .and_then(|d| d.parent()) // .../debug (or release) + .expect("could not determine bin dir from test exe path"); + + PoolConfig { + worker_count, + worker_binary: bin_dir.join("pctx_worker"), + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Basic execution +// ───────────────────────────────────────────────────────────────────────────── + +#[tokio::test] +async fn pool_executes_simple_code() { + let pool = ExecutorPool::new(worker_pool(1)).await.expect("pool"); + let result = pool + .execute("export default 42;", ExecuteOptions::new()) + .await + .expect("execute"); + + assert!(result.success); + assert_eq!(result.output, Some(json!(42))); + assert!(result.runtime_error.is_none()); + assert!(result.diagnostics.is_empty()); +} + +#[tokio::test] +async fn pool_captures_stdout() { + let pool = ExecutorPool::new(worker_pool(1)).await.expect("pool"); + let result = pool + .execute( + r#"console.log("hello from worker"); export default null;"#, + ExecuteOptions::new(), + ) + .await + .expect("execute"); + + assert!(result.success); + assert!(result.stdout.contains("hello from worker")); +} + +#[tokio::test] +async fn pool_reports_runtime_error() { + let pool = ExecutorPool::new(worker_pool(1)).await.expect("pool"); + let result = pool + .execute( + r#"throw new Error("boom"); export default null;"#, + ExecuteOptions::new(), + ) + .await + .expect("execute returned Err — expected Ok with runtime_error set"); + + assert!(!result.success); + assert!(result.runtime_error.is_some()); + assert!( + result + .runtime_error + .as_ref() + .unwrap() + .message + .contains("boom") + ); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Callback proxying — the core new IPC path +// ───────────────────────────────────────────────────────────────────────────── + +#[tokio::test] +async fn pool_proxies_callback_to_parent() { + let registry = PctxRegistry::default(); + registry + .add_callback( + "math.add", + Arc::new(|args: Option| { + Box::pin(async move { + let a = args.as_ref().and_then(|v| v["a"].as_i64()).unwrap_or(0); + let b = args.as_ref().and_then(|v| v["b"].as_i64()).unwrap_or(0); + Ok(json!(a + b)) + }) + }), + ) + .expect("add_callback"); + + let pool = ExecutorPool::new(worker_pool(1)).await.expect("pool"); + let result = pool + .execute( + r#" +async function run() { + return await invokeInternal({ name: "math.add", arguments: { a: 7, b: 3 } }); +} +export default await run(); +"#, + ExecuteOptions::new().with_registry(registry), + ) + .await + .expect("execute"); + + assert!(result.success, "stderr: {}", result.stderr); + assert_eq!(result.output, Some(json!(10))); +} + +#[tokio::test] +async fn pool_proxies_multiple_callback_invocations() { + let registry = PctxRegistry::default(); + registry + .add_callback( + "counter.inc", + Arc::new(|args: Option| { + Box::pin(async move { + let n = args.as_ref().and_then(|v| v["n"].as_i64()).unwrap_or(0); + Ok(json!(n + 1)) + }) + }), + ) + .expect("add_callback"); + + let pool = ExecutorPool::new(worker_pool(1)).await.expect("pool"); + let result = pool + .execute( + r#" +async function run() { + let v = 0; + for (let i = 0; i < 5; i++) { + v = await invokeInternal({ name: "counter.inc", arguments: { n: v } }); + } + return v; +} +export default await run(); +"#, + ExecuteOptions::new().with_registry(registry), + ) + .await + .expect("execute"); + + assert!(result.success, "stderr: {}", result.stderr); + assert_eq!(result.output, Some(json!(5))); +} + +// ───────────────────────────────────────────────────────────────────────────── +// MCP tool proxying — no direct connections from the worker +// ───────────────────────────────────────────────────────────────────────────── + +/// Registers a tool with an MCP-style ID (`"server__tool"`) as a plain Rust +/// callback in the parent registry. The worker must proxy the call back to +/// the parent rather than attempting a live MCP connection. +/// +/// This is the regression test for the fix: previously the worker would try +/// to open a fresh HTTP connection to the MCP server, which fails in any +/// environment that doesn't have access to that host. Now the worker only +/// holds IPC-proxy stubs and the parent's registry handles the actual dispatch. +#[tokio::test] +async fn pool_proxies_mcp_style_tool_id_through_parent() { + let registry = PctxRegistry::default(); + + // Register with the same `"server__tool"` format the MCP registry uses. + // The callback simulates what the real MCP server would return. + registry + .add_callback( + "my_server__get_item", + Arc::new(|args: Option| { + Box::pin(async move { + let id = args.as_ref().and_then(|v| v["id"].as_str()).unwrap_or("?"); + Ok(json!({ "id": id, "name": "widget" })) + }) + }), + ) + .expect("add_callback"); + + let pool = ExecutorPool::new(worker_pool(1)).await.expect("pool"); + let result = pool + .execute( + r#" +async function run() { + return await invokeInternal({ + name: "my_server__get_item", + arguments: { id: "abc123" }, + }); +} +export default await run(); +"#, + ExecuteOptions::new().with_registry(registry), + ) + .await + .expect("execute"); + + assert!(result.success, "stderr: {}", result.stderr); + assert_eq!(result.output, Some(json!({ "id": "abc123", "name": "widget" }))); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Round-robin dispatch +// ───────────────────────────────────────────────────────────────────────────── + +#[tokio::test] +async fn pool_reuses_workers_across_executions() { + // A 2-worker pool running 6 executions: verifies the pool doesn't break + // after the first round of requests and workers handle reuse correctly. + let pool = ExecutorPool::new(worker_pool(2)).await.expect("pool"); + + for i in 0i32..6 { + let result = pool + .execute( + &format!("export default {i};"), + ExecuteOptions::new(), + ) + .await + .expect("execute"); + + assert!(result.success); + assert_eq!(result.output, Some(json!(i))); + } +} diff --git a/crates/pctx_mcp_server/src/service.rs b/crates/pctx_mcp_server/src/service.rs index ca1453b3..ef438b2e 100644 --- a/crates/pctx_mcp_server/src/service.rs +++ b/crates/pctx_mcp_server/src/service.rs @@ -177,38 +177,14 @@ impl PctxMcpService { &self, Parameters(input): Parameters, ) -> McpResult { - // Capture current tracing context to propagate to spawned thread - let current_span = tracing::Span::current(); - - let code_mode = self.code_mode.clone(); - let command = input.command; - - let execution_output = tokio::task::spawn_blocking(move || -> Result<_, anyhow::Error> { - // Enter the captured span context in the new thread - let _guard = current_span.enter(); - - // Create a new current-thread runtime for Deno ops that use deno_unsync - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| anyhow::anyhow!("Failed to create runtime: {e}"))?; - - rt.block_on(async { - code_mode - .execute_bash(&command) - .await - .map_err(|e| anyhow::anyhow!("Execution error: {e}")) - }) - }) - .await - .map_err(|e| { - error!("Task join failed: {e}"); - rmcp::ErrorData::internal_error(format!("Task join failed: {e}"), None) - })? - .map_err(|e| { - error!("Sandbox execution error: {e}"); - rmcp::ErrorData::internal_error(format!("Execution failed: {e}"), None) - })?; + let execution_output = self + .code_mode + .execute_bash(&input.command) + .await + .map_err(|e| { + error!("Bash execution error: {e}"); + rmcp::ErrorData::internal_error(format!("Execution failed: {e}"), None) + })?; Ok(CallToolResult::success(vec![Content::text( execution_output.to_string(), @@ -289,40 +265,16 @@ impl PctxMcpService { input: ExecuteTypescriptInput, session_id: Option, ) -> McpResult { - // Capture current tracing context to propagate to spawned thread - let current_span = tracing::Span::current(); - let registry = self.get_pctx_registry(session_id.as_deref()).await?; - let code_mode = self.code_mode.clone(); - let code = input.code; - let style = self.disclosure; - - let execution_output = tokio::task::spawn_blocking(move || -> Result<_, anyhow::Error> { - // Enter the captured span context in the new thread - let _guard = current_span.enter(); - - // Create a new current-thread runtime for Deno ops that use deno_unsync - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| anyhow::anyhow!("Failed to create runtime: {e}"))?; - - rt.block_on(async { - code_mode - .execute_typescript(&code, style, Some(registry)) - .await - .map_err(|e| anyhow::anyhow!("Execution error: {e}")) - }) - }) - .await - .map_err(|e| { - error!("Task join failed: {e}"); - rmcp::ErrorData::internal_error(format!("Task join failed: {e}"), None) - })? - .map_err(|e| { - error!("Sandbox execution error: {e}"); - rmcp::ErrorData::internal_error(format!("Execution failed: {e}"), None) - })?; + + let execution_output = self + .code_mode + .execute_typescript(&input.code, self.disclosure, Some(registry)) + .await + .map_err(|e| { + error!("Sandbox execution error: {e}"); + rmcp::ErrorData::internal_error(format!("Execution failed: {e}"), None) + })?; self.cache_pool(session_id.as_deref(), execution_output.registry.pool())?; diff --git a/crates/pctx_registry/src/registry.rs b/crates/pctx_registry/src/registry.rs index 1e7919bb..2223045a 100644 --- a/crates/pctx_registry/src/registry.rs +++ b/crates/pctx_registry/src/registry.rs @@ -71,6 +71,62 @@ pub struct PctxRegistry { } impl PctxRegistry { + /// Returns every MCP server registered in this registry as a list of + /// `(tool_names, ServerConfig)` pairs. + /// + /// Used by the process pool to reconstruct a registry inside a worker + /// process that only has access to serialisable data. + /// + /// # Panics + /// + /// Panics if either internal lock is poisoned. + pub fn mcp_servers(&self) -> Vec<(Vec, ServerConfig)> { + let actions = self.actions.read().unwrap(); + let servers = self.servers.read().unwrap(); + + let mut by_server: std::collections::HashMap> = + std::collections::HashMap::new(); + + for action in actions.values() { + if let RegistryAction::Mcp(mcp_id) = action { + by_server + .entry(mcp_id.sever_name.clone()) + .or_default() + .push(mcp_id.tool_name.clone()); + } + } + + by_server + .into_iter() + .filter_map(|(server_name, tool_names)| { + servers.get(&server_name).map(|cfg| (tool_names, cfg.clone())) + }) + .collect() + } + + /// Returns the IDs of every callback registered in this registry. + /// + /// Used by the process pool to tell a worker which callback IDs to proxy + /// back to the parent process. + /// + /// # Panics + /// + /// Panics if the internal lock is poisoned. + pub fn callback_ids(&self) -> Vec { + self.actions + .read() + .unwrap() + .iter() + .filter_map(|(id, action)| { + if matches!(action, RegistryAction::Callback(_)) { + Some(id.clone()) + } else { + None + } + }) + .collect() + } + /// Returns the ids of this Pctx Registry. /// /// # Panics @@ -406,3 +462,97 @@ impl std::fmt::Display for PctxRegistry { ) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use serde_json::json; + + use super::PctxRegistry; + + fn noop_callback() -> super::CallbackFn { + Arc::new(|_args| Box::pin(async { Ok(json!(null)) })) + } + + // ── callback_ids() ─────────────────────────────────────────────────────── + + #[test] + fn callback_ids_empty_when_no_callbacks_registered() { + let registry = PctxRegistry::default(); + assert!(registry.callback_ids().is_empty()); + } + + #[test] + fn callback_ids_returns_all_registered_ids() { + let registry = PctxRegistry::default(); + registry.add_callback("ns.foo", noop_callback()).unwrap(); + registry.add_callback("ns.bar", noop_callback()).unwrap(); + + let mut ids = registry.callback_ids(); + ids.sort(); + assert_eq!(ids, vec!["ns.bar", "ns.foo"]); + } + + #[test] + fn callback_ids_excludes_mcp_actions() { + // An MCP action registered via add_mcp should NOT appear in callback_ids. + use pctx_config::server::{HttpServerConfig, ServerConfig, ServerTransport}; + let registry = PctxRegistry::default(); + registry + .add_mcp( + &["my_tool".to_string()], + ServerConfig { + name: "my_server".to_string(), + transport: ServerTransport::Http(HttpServerConfig { + url: "http://localhost:1234".parse().unwrap(), + auth: None, + }), + }, + ) + .unwrap(); + registry.add_callback("cb.one", noop_callback()).unwrap(); + + let ids = registry.callback_ids(); + assert_eq!(ids, vec!["cb.one"]); + } + + // ── mcp_servers() ──────────────────────────────────────────────────────── + + #[test] + fn mcp_servers_empty_when_no_servers_registered() { + let registry = PctxRegistry::default(); + assert!(registry.mcp_servers().is_empty()); + } + + #[test] + fn mcp_servers_excludes_callbacks() { + let registry = PctxRegistry::default(); + registry.add_callback("cb.one", noop_callback()).unwrap(); + assert!(registry.mcp_servers().is_empty()); + } + + #[test] + fn mcp_servers_returns_tool_names_and_config() { + use pctx_config::server::{HttpServerConfig, ServerConfig, ServerTransport}; + let registry = PctxRegistry::default(); + let cfg = ServerConfig { + name: "svc".to_string(), + transport: ServerTransport::Http(HttpServerConfig { + url: "http://localhost:9999".parse().unwrap(), + auth: None, + }), + }; + registry + .add_mcp(&["tool_a".to_string(), "tool_b".to_string()], cfg.clone()) + .unwrap(); + + let servers = registry.mcp_servers(); + assert_eq!(servers.len(), 1); + + let (mut tool_names, returned_cfg) = servers.into_iter().next().unwrap(); + tool_names.sort(); + assert_eq!(tool_names, vec!["tool_a", "tool_b"]); + assert_eq!(returned_cfg.name, cfg.name); + } +} diff --git a/crates/pctx_session_server/src/routes.rs b/crates/pctx_session_server/src/routes.rs index 0467a3a9..3fc5ba3a 100644 --- a/crates/pctx_session_server/src/routes.rs +++ b/crates/pctx_session_server/src/routes.rs @@ -370,45 +370,21 @@ pub(crate) async fn execute_bash( }, ))?; - // Clone for the blocking task - let code_mode_clone = code_mode.clone(); - let bash_command = request.command.clone(); - - // Execute bash command in blocking context - let output = tokio::task::spawn_blocking(move || -> Result<_, anyhow::Error> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| anyhow::anyhow!("Failed to create runtime: {e}"))?; - - rt.block_on(code_mode_clone.execute_bash(&bash_command)) - .map_err(|e| anyhow::anyhow!("Execution error: {e}")) - }) - .await; + let code_mode = state.attach_pool(code_mode); - let exec_output = match output { - Ok(Ok(result)) => result, - Ok(Err(e)) => { - return Err(ApiError::new( + let exec_output = code_mode + .execute_bash(&request.command) + .await + .map_err(|e| { + ApiError::new( StatusCode::INTERNAL_SERVER_ERROR, ErrorData { code: ErrorCode::Execution, message: format!("Bash execution failed: {e}"), details: None, }, - )); - } - Err(e) => { - return Err(ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - ErrorData { - code: ErrorCode::Internal, - message: format!("Task join failed: {e}"), - details: None, - }, - )); - } - }; + ) + })?; Ok(Json(exec_output)) } diff --git a/crates/pctx_session_server/src/state/mod.rs b/crates/pctx_session_server/src/state/mod.rs index e6da2915..de630fad 100644 --- a/crates/pctx_session_server/src/state/mod.rs +++ b/crates/pctx_session_server/src/state/mod.rs @@ -1,5 +1,7 @@ use std::{collections::HashMap, sync::Arc}; +use pctx_code_mode::{CodeMode, ExecutorPool}; + use crate::{ LocalBackend, metadata::{NoopMetadata, SessionMetadata}, @@ -19,6 +21,9 @@ pub struct AppState { /// to [`SessionMetadata`] hooks. Implementations read whatever keys they /// need; the `pctx` binary has no opinion on their contents. pub env: Arc>, + /// Optional process pool for parallel TypeScript execution. + /// When `None`, execution falls back to the in-process single-threaded path. + pub pool: Option>, } impl AppState { @@ -28,6 +33,7 @@ impl AppState { backend: Arc::new(backend), metadata: Arc::new(NoopMetadata), env: Arc::new(std::env::vars().collect()), + pool: None, } } @@ -40,6 +46,29 @@ impl AppState { self.metadata = Arc::new(metadata); self } + + /// Attach an [`ExecutorPool`] for parallel TypeScript execution. + /// + /// When set, every `execute_typescript` / `execute_bash` call will be + /// dispatched to a worker subprocess rather than running in-process. + #[must_use] + pub fn with_executor_pool(mut self, pool: Arc) -> Self { + self.pool = Some(pool); + self + } + + /// Re-attach the pool to a freshly-fetched [`CodeMode`] instance. + /// + /// Because `executor` is `#[serde(skip)]`, it is always `None` after + /// deserialisation from the backend. Call this after every + /// `backend.get()` before passing `CodeMode` to an execution function. + pub fn attach_pool(&self, code_mode: CodeMode) -> CodeMode { + if let Some(pool) = &self.pool { + code_mode.with_executor_pool(pool.clone()) + } else { + code_mode + } + } } impl AppState { diff --git a/crates/pctx_session_server/src/websocket/handler.rs b/crates/pctx_session_server/src/websocket/handler.rs index 11ddd2e3..e6d5e2a7 100644 --- a/crates/pctx_session_server/src/websocket/handler.rs +++ b/crates/pctx_session_server/src/websocket/handler.rs @@ -29,7 +29,7 @@ use rmcp::{ }; use serde_json::json; use tokio::sync::mpsc; -use tracing::{debug, error, info, warn}; +use tracing::{Instrument, debug, error, info, warn}; use uuid::Uuid; use crate::AppState; @@ -47,7 +47,7 @@ pub async fn ws_handler( .await .unwrap_or_default() { - error!("Rejecting WebSocket connection: code mode session {code_mode_session} not found"); + warn!("Rejecting WebSocket connection: code mode session {code_mode_session} not found (stale session ID or server restarted)"); return ( StatusCode::NOT_FOUND, format!("Code mode session {code_mode_session} not found"), @@ -202,6 +202,9 @@ async fn handle_execute_code_request( debug!("Found CodeMode session with ID: {code_mode_session_id}"); + // Re-attach the executor pool (stripped by serde on backend fetch). + let code_mode = state.attach_pool(code_mode); + let execution_id = Uuid::new_v4(); // Build registry from the session's MCP servers, reusing the cached pool @@ -270,28 +273,17 @@ async fn handle_execute_code_request( execution_id = %execution_id, ); - tokio::spawn(async move { - let code_mode_clone = code_mode.clone(); + tokio::spawn( + async move { let code_to_exec = params.code.clone(); - let output = tokio::task::spawn_blocking(move || -> Result<_, anyhow::Error> { - let _guard = execution_span.enter(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| anyhow::anyhow!("Failed to create runtime: {e}"))?; - - rt.block_on(code_mode_clone.execute_typescript( - &code_to_exec, - params.disclosure, - Some(registry), - )) - .map_err(|e| anyhow::anyhow!("Execution error: {e}")) - }) - .await; + let output = code_mode + .execute_typescript(&code_to_exec, params.disclosure, Some(registry)) + .await + .map_err(|e| anyhow::anyhow!("Execution error: {e}")); let (msg, execution_res) = match output { - Ok(Ok(exec_output)) => { + Ok(exec_output) => { if let Err(e) = state .backend .set_pool(code_mode_session_id, exec_output.registry.pool()) @@ -307,22 +299,11 @@ async fn handle_execute_code_request( Ok(exec_output), ) } - Ok(Err(e)) => ( - WsJsonRpcMessage::error( - ErrorData { - code: ErrorCode::INTERNAL_ERROR, - message: format!("Execution failed: {e}").into(), - data: None, - }, - req_id, - ), - Err(anyhow!(e)), - ), Err(e) => ( WsJsonRpcMessage::error( ErrorData { code: ErrorCode::INTERNAL_ERROR, - message: format!("Task join failed: {e}").into(), + message: format!("Execution failed: {e}").into(), data: None, }, req_id, @@ -349,7 +330,9 @@ async fn handle_execute_code_request( if let Err(e) = sender.send(msg) { error!("Failed to send response: {e}"); } - }); + } + .instrument(execution_span), + ); Ok(()) }