From fc8f5bfc4a92838db7952db144554f700c82ed37 Mon Sep 17 00:00:00 2001 From: Quill Agent <261289082+quillaid@users.noreply.github.com> Date: Sat, 13 Jun 2026 19:05:53 +0000 Subject: [PATCH] fix(jupyter): support IPC connection files --- cli/js/jupyter_kernel.js | 39 +++++++++----- tests/integration/jupyter_client.rs | 6 ++- tests/integration/jupyter_tests.rs | 80 ++++++++++++++++++++++++++++- 3 files changed, 109 insertions(+), 16 deletions(-) diff --git a/cli/js/jupyter_kernel.js b/cli/js/jupyter_kernel.js index 1610d20f95301d..4806068d3e105e 100644 --- a/cli/js/jupyter_kernel.js +++ b/cli/js/jupyter_kernel.js @@ -337,8 +337,19 @@ async function recvMultipart(conn) { * REP socket server (heartbeat). * For each connected peer, echo back every received message. */ -async function runHeartbeat(port, ip) { - const listener = Deno.listen({ hostname: ip, port }); +function listenOptions(info, port) { + switch (info.transport ?? "tcp") { + case "tcp": + return { hostname: info.ip, port }; + case "ipc": + return { transport: "unix", path: `${info.ip}-${port}` }; + default: + throw new TypeError(`Unsupported Jupyter transport: ${info.transport}`); + } +} + +async function runHeartbeat(info, port) { + const listener = Deno.listen(listenOptions(info, port)); while (true) { const conn = await listener.accept(); (async () => { @@ -389,9 +400,9 @@ function makeQueue() { * every connected peer via `sendAll`). */ class RouterSocket { - constructor(port, ip) { + constructor(info, port) { this.port = port; - this.ip = ip; + this.info = info; this.incoming = makeQueue(); this.peers = new Map(); // peerId (string) -> conn this._listen(); @@ -399,7 +410,7 @@ class RouterSocket { _listen() { (async () => { - const listener = Deno.listen({ hostname: this.ip, port: this.port }); + const listener = Deno.listen(listenOptions(this.info, this.port)); while (true) { const conn = await listener.accept(); this._handlePeer(conn); @@ -468,16 +479,16 @@ class RouterSocket { * Sends the same frames to all connected subscribers. */ class PubSocket { - constructor(port, ip) { + constructor(info, port) { this.port = port; - this.ip = ip; + this.info = info; this.conns = new Set(); this._listen(); } _listen() { (async () => { - const listener = Deno.listen({ hostname: this.ip, port: this.port }); + const listener = Deno.listen(listenOptions(this.info, this.port)); while (true) { const conn = await listener.accept(); (async () => { @@ -525,17 +536,17 @@ class PubSocket { async function startJupyterKernel() { const info = JSON.parse(op_jupyter_get_connection_info()); - const { ip, key, hb_port, shell_port, control_port, stdin_port, iopub_port } = + const { key, hb_port, shell_port, control_port, stdin_port, iopub_port } = info; const session = crypto.randomUUID(); // Start heartbeat (purely async, fire-and-forget) - runHeartbeat(hb_port, ip); + runHeartbeat(info, hb_port); - const shell = new RouterSocket(shell_port, ip); - const control = new RouterSocket(control_port, ip); - const iopub = new PubSocket(iopub_port, ip); - const stdin = new RouterSocket(stdin_port, ip); + const shell = new RouterSocket(info, shell_port); + const control = new RouterSocket(info, control_port); + const iopub = new PubSocket(info, iopub_port); + const stdin = new RouterSocket(info, stdin_port); let executionCount = 0; let currentParentHeader = {}; diff --git a/tests/integration/jupyter_client.rs b/tests/integration/jupyter_client.rs index 2955d46d47eca7..5985e1ea20838d 100644 --- a/tests/integration/jupyter_client.rs +++ b/tests/integration/jupyter_client.rs @@ -14,7 +14,11 @@ use zeromq::SocketSend; use zeromq::ZmqMessage; fn endpoint(addr: &str) -> String { - format!("tcp://{addr}") + if addr.contains("://") { + addr.to_string() + } else { + format!("tcp://{addr}") + } } fn frames_to_message(frames: &[Bytes]) -> ZmqMessage { diff --git a/tests/integration/jupyter_tests.rs b/tests/integration/jupyter_tests.rs index 67f5c596592092..42df5ad4c1d2f6 100644 --- a/tests/integration/jupyter_tests.rs +++ b/tests/integration/jupyter_tests.rs @@ -43,7 +43,10 @@ struct ConnectionSpec { impl ConnectionSpec { fn endpoint(&self, port: u16) -> String { - format!("{}:{}", self.ip, port) + match self.transport.as_str() { + "ipc" => format!("ipc://{}-{}", self.ip, port), + _ => format!("{}:{}", self.ip, port), + } } } @@ -82,6 +85,22 @@ impl ConnectionSpec { listeners, ) } + + #[cfg(unix)] + fn new_ipc(ip: String) -> Self { + Self { + key: "".into(), + signature_scheme: "hmac-sha256".into(), + transport: "ipc".into(), + ip, + hb_port: 1, + control_port: 2, + shell_port: 3, + stdin_port: 4, + iopub_port: 5, + kernel_name: "deno".into(), + } + } } const DELIMITER: &[u8] = b""; @@ -395,6 +414,10 @@ impl Drop for JupyterServerProcess { } async fn server_ready_on(addr: &str) -> bool { + if let Some(path) = addr.strip_prefix("ipc://") { + return tokio::fs::metadata(path).await.is_ok(); + } + matches!( timeout( Duration::from_millis(1000), @@ -425,6 +448,41 @@ async fn setup_server() -> (TestContext, ConnectionSpec, JupyterServerProcess) { setup_server_with_key("").await } +#[cfg(unix)] +async fn setup_ipc_server() +-> (TestContext, ConnectionSpec, JupyterServerProcess) { + let context = TestContextBuilder::new().use_temp_cwd().build(); + let socket_prefix = context + .temp_dir() + .path() + .join("deno-jupyter-ipc") + .to_string_lossy() + .to_string(); + let conn = ConnectionSpec::new_ipc(socket_prefix); + let conn_file = context.temp_dir().path().join("connection.json"); + conn_file.write_json(&conn); + + let mut process = context + .new_command() + .args_vec(vec![ + "jupyter", + "--kernel", + "--conn", + conn_file.to_string().as_str(), + ]) + .spawn() + .unwrap(); + + for _ in 0..40 { + if process.try_wait().unwrap().is_none() && server_ready(&conn).await { + return (context, conn, JupyterServerProcess(Some(process))); + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + + panic!("Failed to start Jupyter IPC server"); +} + async fn setup_server_with_key( key: &str, ) -> (TestContext, ConnectionSpec, JupyterServerProcess) { @@ -487,6 +545,26 @@ async fn setup() -> (TestContext, JupyterClient, JupyterServerProcess) { (context, client, process) } +#[cfg(unix)] +#[test] +async fn jupyter_ipc_kernel_info() -> Result<()> { + let (_ctx, conn, _process) = setup_ipc_server().await; + let client = JupyterClient::new(&conn).await; + client.io_subscribe("").await?; + client.send_heartbeat(b"ping").await?; + let msg = client.recv_heartbeat().await?; + assert_eq!(msg, Bytes::from_static(b"ping")); + + client + .send(Control, "kernel_info_request", json!({})) + .await?; + let msg = client.recv(Control).await?; + assert_eq!(msg.header.msg_type, "kernel_info_reply"); + assert_json_subset(msg.content, json!({ "status": "ok" })); + + Ok(()) +} + #[test] async fn jupyter_heartbeat_echoes() -> Result<()> { let (_ctx, client, _process) = setup().await;