Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions broker/client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import assert from "node:assert/strict";
import { EventEmitter } from "node:events";
import net from "node:net";
import { test } from "node:test";

interface Registration {
cwd: string;
model: string;
pid: number;
startedAt: number;
lastActivity: number;
}

class FakeSocket extends EventEmitter {
destroyed = false;
writable = true;
writableEnded = false;
writes: Buffer[] = [];

write(chunk: Uint8Array | string): boolean {
this.writes.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
return true;
}

destroy(): this {
this.destroyed = true;
this.writable = false;
this.writableEnded = true;
return this;
}

end(): this {
this.writableEnded = true;
this.emit("close");
return this;
}
}

function registration(): Registration {
return {
cwd: process.cwd(),
model: "test",
pid: process.pid,
startedAt: Date.now(),
lastActivity: Date.now(),
};
}

test("connect keeps a guard for late socket errors after cleanup", async () => {
const originalConnect = net.connect;
let socket: FakeSocket | undefined;

try {
(net as typeof net & { connect: () => FakeSocket }).connect = () => {
socket = new FakeSocket();
return socket;
};

const { IntercomClient } = await import("./client.ts");
const client = new IntercomClient();
const firstError = Object.assign(new Error("first reset"), { code: "ECONNRESET" });
const lateError = Object.assign(new Error("late reset"), { code: "ECONNRESET" });

const connectPromise = client.connect(registration()).catch((error: Error) => error);
assert.ok(socket);

socket.emit("error", firstError);
assert.equal((await connectPromise).message, "first reset");
assert.equal(socket.destroyed, true);
assert.equal(socket.listenerCount("error"), 1);

assert.doesNotThrow(() => socket?.emit("error", lateError));
} finally {
(net as typeof net & { connect: typeof originalConnect }).connect = originalConnect;
}
});
2 changes: 2 additions & 0 deletions broker/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ export class IntercomClient extends EventEmitter {
socket.on("close", onClose);

socket.on("error", onSocketError);
// Keep a permanent guard for late read errors emitted during or after destroy().
socket.on("error", () => {});
this.once("_registered", onRegistered);

try {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"skills/**/*"
],
"scripts": {
"test": "tsx --test broker/paths.test.ts broker/spawn.test.ts reply-tracker.test.ts intercom.integration.test.ts test/inline-message.test.ts"
"test": "tsx --test broker/client.test.ts broker/paths.test.ts broker/spawn.test.ts reply-tracker.test.ts intercom.integration.test.ts test/inline-message.test.ts"
},
"keywords": [
"pi-package"
Expand Down