From b1b9c9df441c633c50d5819d9b0eb9276a5f27c6 Mon Sep 17 00:00:00 2001 From: Alexx Date: Sat, 16 May 2026 23:23:58 +0100 Subject: [PATCH] fix: absorb late socket errors after cleanup --- broker/client.test.ts | 76 +++++++++++++++++++++++++++++++++++++++++++ broker/client.ts | 2 ++ package.json | 2 +- 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 broker/client.test.ts diff --git a/broker/client.test.ts b/broker/client.test.ts new file mode 100644 index 0000000..ef9b649 --- /dev/null +++ b/broker/client.test.ts @@ -0,0 +1,76 @@ +import assert from "node:assert/strict"; +import { EventEmitter } from "node:events"; +import net from "node:net"; +import { test } from "node:test"; + +interface Registration { + cwd: string; + model: string; + pid: number; + startedAt: number; + lastActivity: number; +} + +class FakeSocket extends EventEmitter { + destroyed = false; + writable = true; + writableEnded = false; + writes: Buffer[] = []; + + write(chunk: Uint8Array | string): boolean { + this.writes.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + return true; + } + + destroy(): this { + this.destroyed = true; + this.writable = false; + this.writableEnded = true; + return this; + } + + end(): this { + this.writableEnded = true; + this.emit("close"); + return this; + } +} + +function registration(): Registration { + return { + cwd: process.cwd(), + model: "test", + pid: process.pid, + startedAt: Date.now(), + lastActivity: Date.now(), + }; +} + +test("connect keeps a guard for late socket errors after cleanup", async () => { + const originalConnect = net.connect; + let socket: FakeSocket | undefined; + + try { + (net as typeof net & { connect: () => FakeSocket }).connect = () => { + socket = new FakeSocket(); + return socket; + }; + + const { IntercomClient } = await import("./client.ts"); + const client = new IntercomClient(); + const firstError = Object.assign(new Error("first reset"), { code: "ECONNRESET" }); + const lateError = Object.assign(new Error("late reset"), { code: "ECONNRESET" }); + + const connectPromise = client.connect(registration()).catch((error: Error) => error); + assert.ok(socket); + + socket.emit("error", firstError); + assert.equal((await connectPromise).message, "first reset"); + assert.equal(socket.destroyed, true); + assert.equal(socket.listenerCount("error"), 1); + + assert.doesNotThrow(() => socket?.emit("error", lateError)); + } finally { + (net as typeof net & { connect: typeof originalConnect }).connect = originalConnect; + } +}); diff --git a/broker/client.ts b/broker/client.ts index a6647a7..14ac43c 100644 --- a/broker/client.ts +++ b/broker/client.ts @@ -251,6 +251,8 @@ export class IntercomClient extends EventEmitter { socket.on("close", onClose); socket.on("error", onSocketError); + // Keep a permanent guard for late read errors emitted during or after destroy(). + socket.on("error", () => {}); this.once("_registered", onRegistered); try { diff --git a/package.json b/package.json index 5538cb1..5f1086d 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "skills/**/*" ], "scripts": { - "test": "tsx --test broker/paths.test.ts broker/spawn.test.ts reply-tracker.test.ts intercom.integration.test.ts test/inline-message.test.ts" + "test": "tsx --test broker/client.test.ts broker/paths.test.ts broker/spawn.test.ts reply-tracker.test.ts intercom.integration.test.ts test/inline-message.test.ts" }, "keywords": [ "pi-package"