Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 38 additions & 4 deletions crates/pctx/src/commands/mcp/start.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use anyhow::Result;
use clap::Parser;
use pctx_code_mode::CodeMode;
use pctx_code_mode::{CodeMode, ExecutorPool, PoolConfig};
use pctx_config::Config;
use tracing::info;
use std::sync::Arc;
use tracing::{info, warn};

use pctx_mcp_server::PctxMcpServer;

#[derive(Debug, Clone, Parser)]
pub struct StartCmd {
/// Port to listen on
#[arg(short, long, default_value = "8080")]
#[arg(short, long, default_value = "8080", env = "PCTX_PORT")]
pub port: u16,

/// Host address to bind to (use 0.0.0.0 for external access)
#[arg(long, default_value = "127.0.0.1")]
#[arg(long, default_value = "127.0.0.1", env = "PCTX_HOST")]
pub host: String,

/// Don't show the server banner
Expand All @@ -27,6 +28,12 @@ pub struct StartCmd {
/// Use stateful MCP sessions (incompatible with --stdio)
#[arg(long, conflicts_with = "stdio")]
pub stateful_http: bool,

/// Number of worker processes in the executor pool.
/// Defaults to the number of logical CPUs, capped at 8.
/// Set to 0 to disable the pool and run in-process.
#[arg(long, env = "PCTX_WORKERS")]
pub workers: Option<usize>,
}

impl StartCmd {
Expand All @@ -53,7 +60,34 @@ impl StartCmd {
);
}

let worker_count = self.workers.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| n.get().min(8))
.unwrap_or(4)
});

let code_mode = StartCmd::load_code_mode(&cfg).await?;
let code_mode = if worker_count == 0 {
info!("Executor pool disabled (--workers 0), running in-process");
code_mode
} else {
match PoolConfig::from_current_exe(worker_count) {
Ok(pool_cfg) => match ExecutorPool::new(pool_cfg).await {
Ok(pool) => {
info!("Executor pool ready ({worker_count} workers)");
code_mode.with_executor_pool(Arc::new(pool))
}
Err(e) => {
warn!("Failed to start executor pool, falling back to in-process execution: {e}");
code_mode
}
},
Err(e) => {
warn!("Could not locate worker binary, falling back to in-process execution: {e}");
code_mode
}
}
};

let pctx_mcp = PctxMcpServer::new(&cfg, code_mode)
.with_banner(!self.no_banner)
Expand Down
44 changes: 40 additions & 4 deletions crates/pctx/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::Result;
use camino::Utf8PathBuf;
use clap::Parser;
use pctx_code_mode::{ExecutorPool, PoolConfig};
use pctx_session_server::{AppState, start_server};
use std::sync::Arc;
use tabled::{
Table,
builder::Builder,
Expand All @@ -13,7 +15,7 @@ use tabled::{
},
};
use terminal_size::terminal_size;
use tracing::info;
use tracing::{info, warn};
use url::Url;

use crate::utils::styles::fmt_dimmed;
Expand All @@ -23,11 +25,11 @@ const LOGO: &str = include_str!("../../../../assets/ascii-logo.txt");
#[derive(Debug, Clone, Parser)]
pub struct StartCmd {
/// Port to listen on
#[arg(short, long, default_value = "8080")]
#[arg(short, long, default_value = "8080", env = "PCTX_PORT")]
pub port: u16,

/// Host address to bind to (use 0.0.0.0 for external access)
#[arg(long, default_value = "127.0.0.1")]
#[arg(long, default_value = "127.0.0.1", env = "PCTX_HOST")]
pub host: String,

/// Path to session storage directory
Expand All @@ -42,14 +44,41 @@ pub struct StartCmd {
#[arg(long = "allowed-origin")]
pub allowed_origins: Vec<Url>,

/// Number of worker processes in the executor pool.
/// Defaults to the number of logical CPUs, capped at 8.
/// Set to 0 to disable the pool and run in-process.
#[arg(long, env = "PCTX_WORKERS")]
pub workers: Option<usize>,

/// Don't show the server banner
#[arg(long)]
pub no_banner: bool,
}

impl StartCmd {
pub(crate) async fn handle(&self) -> Result<()> {
let state = AppState::new_local();
let worker_count = self.workers.unwrap_or_else(default_workers);
let state = if worker_count == 0 {
info!("Executor pool disabled (--workers 0), running in-process");
AppState::new_local()
} else {
match PoolConfig::from_current_exe(worker_count) {
Ok(pool_cfg) => match ExecutorPool::new(pool_cfg).await {
Ok(pool) => {
info!("Executor pool ready ({worker_count} workers)");
AppState::new_local().with_executor_pool(Arc::new(pool))
}
Err(e) => {
warn!("Failed to start executor pool, falling back to in-process execution: {e}");
AppState::new_local()
}
},
Err(e) => {
warn!("Could not locate worker binary, falling back to in-process execution: {e}");
AppState::new_local()
}
}
};

self.print_banner();

Expand Down Expand Up @@ -133,3 +162,10 @@ impl StartCmd {
info!("pctx agent server listening at {rest_url}...");
}
}

/// Default worker count: logical CPUs, capped at 8.
fn default_workers() -> usize {
std::thread::available_parallelism()
.map(|n| n.get().min(8))
.unwrap_or(4)
}
78 changes: 71 additions & 7 deletions crates/pctx_code_mode/src/code_mode.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use pctx_codegen::{Tool, ToolSet};
use pctx_config::{ToolDisclosure, server::ServerConfig};
use pctx_executor::ExecutorPool;
use pctx_registry::PctxRegistry;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use tracing::{debug, info, instrument, warn};
Expand All @@ -28,11 +30,34 @@ pub struct CodeMode {

// Virtual filesystem for just-bash exploration
virtual_fs: HashMap<String, String>,

/// Optional process pool for parallel TypeScript execution.
///
/// When set, `execute_typescript` and `execute_bash` dispatch to a worker
/// process instead of acquiring the process-wide `V8_MUTEX`, allowing N
/// concurrent executions where N is the pool size.
///
/// Skipped during serialisation — pools must be re-attached after
/// deserialising a `CodeMode`.
#[serde(skip)]
executor: Option<Arc<ExecutorPool>>,
}

impl CodeMode {
// --------------- Builder functions ---------------

/// Attach a process pool so that `execute_typescript` and `execute_bash`
/// dispatch to worker processes instead of serialising through the
/// process-wide V8 lock.
///
/// The pool must outlive this `CodeMode` (hence `Arc`). Call this after
/// construction, before any executions.
#[must_use]
pub fn with_executor_pool(mut self, pool: Arc<ExecutorPool>) -> Self {
self.executor = Some(pool);
self
}

pub async fn with_server(mut self, server: &ServerConfig) -> Result<Self> {
self.add_server(server).await?;
Ok(self)
Expand Down Expand Up @@ -180,6 +205,13 @@ impl CodeMode {
tool_set.tools.len()
);

// Explicitly cancel the transport task before dropping so the OS-level
// TCP socket and epoll registration are released immediately rather than
// waiting for the Tokio task to be scheduled and exit on its own.
// Under concurrent load, many pending-cancellation tasks accumulate and
// exhaust the process file-descriptor limit (EMFILE / os error 24).
mcp_client.cancellation_token().cancel();

Ok((tool_set, server.clone()))
}

Expand Down Expand Up @@ -504,8 +536,24 @@ export default result;"#,

debug!(to_execute = %to_execute, "Executing bash in sandbox");

let execution_res =
pctx_executor::execute(&to_execute, pctx_executor::ExecuteOptions::new()).await?;
let execution_res = if let Some(pool) = &self.executor {
pool.execute(&to_execute, pctx_executor::ExecuteOptions::new())
.await?
} else {
tokio::task::spawn_blocking(move || -> Result<_> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| Error::Message(format!("Failed to create Tokio runtime: {e}")))?;
rt.block_on(pctx_executor::execute(
&to_execute,
pctx_executor::ExecuteOptions::new(),
))
.map_err(Into::into)
})
.await
.map_err(|e| Error::Message(format!("Task join error: {e}")))??
};

// Extract stdout and stderr from the bash result object
// The output field contains the result object: { stdout, stderr, exitCode }
Expand Down Expand Up @@ -666,11 +714,27 @@ export default result;"#,

debug!(to_execute = %to_execute, "Executing TypeScript in sandbox");

let execution_res = pctx_executor::execute(
&to_execute,
pctx_executor::ExecuteOptions::new().with_registry(registry),
)
.await?;
let execution_res = if let Some(pool) = &self.executor {
pool.execute(
&to_execute,
pctx_executor::ExecuteOptions::new().with_registry(registry),
)
.await?
} else {
tokio::task::spawn_blocking(move || -> Result<_> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| Error::Message(format!("Failed to create Tokio runtime: {e}")))?;
rt.block_on(pctx_executor::execute(
&to_execute,
pctx_executor::ExecuteOptions::new().with_registry(registry),
))
.map_err(Into::into)
})
.await
.map_err(|e| Error::Message(format!("Task join error: {e}")))??
};

if execution_res.success {
debug!("TypeScript execution completed successfully");
Expand Down
1 change: 1 addition & 0 deletions crates/pctx_code_mode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ pub use code_mode::CodeMode;
// Re-export config, codegen, registry, crates
pub use pctx_codegen as codegen;
pub use pctx_config as config;
pub use pctx_executor::{ExecutorPool, PoolConfig};
pub use pctx_registry as registry;

pub type Result<T> = std::result::Result<T, Error>;
Expand Down
7 changes: 7 additions & 0 deletions crates/pctx_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ repository = "https://github.com/portofcontext/pctx"
keywords = ["typescript", "executor", "pctx"]
categories = ["development-tools"]

[[bin]]
name = "pctx_worker"
path = "src/bin/worker.rs"

[dependencies]
serde = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -20,6 +24,9 @@ pctx_deno_transpiler = { version = "^0.1.1", path = "../pctx_deno_transpiler" }
pctx_code_execution_runtime = { version = "^0.2.0", path = "../pctx_code_execution_runtime" }
pctx_registry = { version = "^0.1.1", path = "../pctx_registry" }
pctx_type_check_runtime = { version = "^0.1.3", path = "../pctx_type_check_runtime" }
pctx_config = { version = "^0.1.5", path = "../pctx_config" }
tokio = { workspace = true, features = ["rt", "io-util", "sync", "process", "macros"] }
uuid = { workspace = true }
tempfile = "3"
regex = "1"
thiserror = { workspace = true }
Expand Down
Loading