Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions cli/js/jupyter_kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,19 @@ async function recvMultipart(conn) {
* REP socket server (heartbeat).
* For each connected peer, echo back every received message.
*/
async function runHeartbeat(port, ip) {
const listener = Deno.listen({ hostname: ip, port });
function listenOptions(info, port) {
switch (info.transport ?? "tcp") {
case "tcp":
return { hostname: info.ip, port };
case "ipc":
return { transport: "unix", path: `${info.ip}-${port}` };
default:
throw new TypeError(`Unsupported Jupyter transport: ${info.transport}`);
}
}

async function runHeartbeat(info, port) {
const listener = Deno.listen(listenOptions(info, port));
while (true) {
const conn = await listener.accept();
(async () => {
Expand Down Expand Up @@ -389,17 +400,17 @@ function makeQueue() {
* every connected peer via `sendAll`).
*/
class RouterSocket {
constructor(port, ip) {
constructor(info, port) {
this.port = port;
this.ip = ip;
this.info = info;
this.incoming = makeQueue();
this.peers = new Map(); // peerId (string) -> conn
this._listen();
}

_listen() {
(async () => {
const listener = Deno.listen({ hostname: this.ip, port: this.port });
const listener = Deno.listen(listenOptions(this.info, this.port));
while (true) {
const conn = await listener.accept();
this._handlePeer(conn);
Expand Down Expand Up @@ -468,16 +479,16 @@ class RouterSocket {
* Sends the same frames to all connected subscribers.
*/
class PubSocket {
constructor(port, ip) {
constructor(info, port) {
this.port = port;
this.ip = ip;
this.info = info;
this.conns = new Set();
this._listen();
}

_listen() {
(async () => {
const listener = Deno.listen({ hostname: this.ip, port: this.port });
const listener = Deno.listen(listenOptions(this.info, this.port));
while (true) {
const conn = await listener.accept();
(async () => {
Expand Down Expand Up @@ -525,17 +536,17 @@ class PubSocket {

async function startJupyterKernel() {
const info = JSON.parse(op_jupyter_get_connection_info());
const { ip, key, hb_port, shell_port, control_port, stdin_port, iopub_port } =
const { key, hb_port, shell_port, control_port, stdin_port, iopub_port } =
info;
const session = crypto.randomUUID();

// Start heartbeat (purely async, fire-and-forget)
runHeartbeat(hb_port, ip);
runHeartbeat(info, hb_port);

const shell = new RouterSocket(shell_port, ip);
const control = new RouterSocket(control_port, ip);
const iopub = new PubSocket(iopub_port, ip);
const stdin = new RouterSocket(stdin_port, ip);
const shell = new RouterSocket(info, shell_port);
const control = new RouterSocket(info, control_port);
const iopub = new PubSocket(info, iopub_port);
const stdin = new RouterSocket(info, stdin_port);

let executionCount = 0;
let currentParentHeader = {};
Expand Down
6 changes: 5 additions & 1 deletion tests/integration/jupyter_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ use zeromq::SocketSend;
use zeromq::ZmqMessage;

fn endpoint(addr: &str) -> String {
format!("tcp://{addr}")
if addr.contains("://") {
addr.to_string()
} else {
format!("tcp://{addr}")
}
}

fn frames_to_message(frames: &[Bytes]) -> ZmqMessage {
Expand Down
80 changes: 79 additions & 1 deletion tests/integration/jupyter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ struct ConnectionSpec {

impl ConnectionSpec {
fn endpoint(&self, port: u16) -> String {
format!("{}:{}", self.ip, port)
match self.transport.as_str() {
"ipc" => format!("ipc://{}-{}", self.ip, port),
_ => format!("{}:{}", self.ip, port),
}
}
}

Expand Down Expand Up @@ -82,6 +85,22 @@ impl ConnectionSpec {
listeners,
)
}

#[cfg(unix)]
fn new_ipc(ip: String) -> Self {
Self {
key: "".into(),
signature_scheme: "hmac-sha256".into(),
transport: "ipc".into(),
ip,
hb_port: 1,
control_port: 2,
shell_port: 3,
stdin_port: 4,
iopub_port: 5,
kernel_name: "deno".into(),
}
}
}

const DELIMITER: &[u8] = b"<IDS|MSG>";
Expand Down Expand Up @@ -395,6 +414,10 @@ impl Drop for JupyterServerProcess {
}

async fn server_ready_on(addr: &str) -> bool {
if let Some(path) = addr.strip_prefix("ipc://") {
return tokio::fs::metadata(path).await.is_ok();
}

matches!(
timeout(
Duration::from_millis(1000),
Expand Down Expand Up @@ -425,6 +448,41 @@ async fn setup_server() -> (TestContext, ConnectionSpec, JupyterServerProcess) {
setup_server_with_key("").await
}

#[cfg(unix)]
async fn setup_ipc_server()
-> (TestContext, ConnectionSpec, JupyterServerProcess) {
let context = TestContextBuilder::new().use_temp_cwd().build();
let socket_prefix = context
.temp_dir()
.path()
.join("deno-jupyter-ipc")
.to_string_lossy()
.to_string();
let conn = ConnectionSpec::new_ipc(socket_prefix);
let conn_file = context.temp_dir().path().join("connection.json");
conn_file.write_json(&conn);

let mut process = context
.new_command()
.args_vec(vec![
"jupyter",
"--kernel",
"--conn",
conn_file.to_string().as_str(),
])
.spawn()
.unwrap();

for _ in 0..40 {
if process.try_wait().unwrap().is_none() && server_ready(&conn).await {
return (context, conn, JupyterServerProcess(Some(process)));
}
tokio::time::sleep(Duration::from_millis(250)).await;
}

panic!("Failed to start Jupyter IPC server");
}

async fn setup_server_with_key(
key: &str,
) -> (TestContext, ConnectionSpec, JupyterServerProcess) {
Expand Down Expand Up @@ -487,6 +545,26 @@ async fn setup() -> (TestContext, JupyterClient, JupyterServerProcess) {
(context, client, process)
}

#[cfg(unix)]
#[test]
async fn jupyter_ipc_kernel_info() -> Result<()> {
let (_ctx, conn, _process) = setup_ipc_server().await;
let client = JupyterClient::new(&conn).await;
client.io_subscribe("").await?;
client.send_heartbeat(b"ping").await?;
let msg = client.recv_heartbeat().await?;
assert_eq!(msg, Bytes::from_static(b"ping"));

client
.send(Control, "kernel_info_request", json!({}))
.await?;
let msg = client.recv(Control).await?;
assert_eq!(msg.header.msg_type, "kernel_info_reply");
assert_json_subset(msg.content, json!({ "status": "ok" }));

Ok(())
}

#[test]
async fn jupyter_heartbeat_echoes() -> Result<()> {
let (_ctx, client, _process) = setup().await;
Expand Down