From bbee0161dbef654132daf003f4336b3a80158835 Mon Sep 17 00:00:00 2001 From: francesco Date: Mon, 13 Apr 2026 16:00:31 +0200 Subject: [PATCH 1/4] feat: handling backpressure without timer --- src/response.js | 44 +++++++++++++++++--------------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/src/response.js b/src/response.js index cc081ab2..ac07d678 100644 --- a/src/response.js +++ b/src/response.js @@ -72,8 +72,7 @@ module.exports = class Response extends Writable { #socket = null; #ended = false; #pendingChunks = []; - #lastWriteChunkTime = 0; - #writeTimeout = null; + #flushScheduled = false; req; constructor(res, req, app) { super(); @@ -158,32 +157,21 @@ module.exports = class Response extends Writable { if (this.chunkedTransfer) { this.#pendingChunks.push(chunk); - const size = this.#pendingChunks.reduce((acc, chunk) => acc + chunk.byteLength, 0); - const now = performance.now(); - // the first chunk is sent immediately (!this.#lastWriteChunkTime) - // the other chunks are sent when watermark is reached (size >= HIGH_WATERMARK) - // or if elapsed 50ms of last send (now - this.#lastWriteChunkTime > 50) - if (!this.#lastWriteChunkTime || size >= HIGH_WATERMARK || now - this.#lastWriteChunkTime > 50) { - this._res.write(Buffer.concat(this.#pendingChunks, size)); - this.#pendingChunks = []; - this.#lastWriteChunkTime = now; - if(this.#writeTimeout) { - clearTimeout(this.#writeTimeout); - this.#writeTimeout = null; - } - } else if(!this.#writeTimeout) { - this.#writeTimeout = setTimeout(() => { - this.#writeTimeout = null; - if(!this.finished && !this.aborted) this._res.cork(() => { - if(this.#pendingChunks.length) { - const size = this.#pendingChunks.reduce((acc, chunk) => acc + chunk.byteLength, 0); - this._res.write(Buffer.concat(this.#pendingChunks, size)); + if (!this.#flushScheduled) { + this.#flushScheduled = true; + queueMicrotask(() => { + this.#flushScheduled = false; + if (this.finished || this.aborted || !this.#pendingChunks.length) return; + this._res.cork(() => { + if (this.#pendingChunks.length) { + const buf = this.#pendingChunks.length === 1 + ? this.#pendingChunks[0] + : Buffer.concat(this.#pendingChunks); this.#pendingChunks = []; - this.#lastWriteChunkTime = performance.now(); + this._res.write(buf); } }); - }, 50); - this.#writeTimeout.unref(); + }); } this.writingChunk = false; callback(null); @@ -313,9 +301,11 @@ module.exports = class Response extends Writable { this._res.endWithoutBody(contentLength.toString()); } else { if(this.#pendingChunks.length) { - this._res.write(Buffer.concat(this.#pendingChunks)); + const buf = this.#pendingChunks.length === 1 + ? this.#pendingChunks[0] + : Buffer.concat(this.#pendingChunks); this.#pendingChunks = []; - this.lastWriteChunkTime = 0; + this._res.write(buf); } if(data instanceof Buffer) { data = data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength); From 3eebf6ce7988ee37bbb9f59985b28d38392dabbb Mon Sep 17 00:00:00 2001 From: francesco Date: Mon, 13 Apr 2026 16:09:59 +0200 Subject: [PATCH 2/4] fix: brotli --- src/response.js | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/response.js b/src/response.js index ac07d678..904a355b 100644 --- a/src/response.js +++ b/src/response.js @@ -157,20 +157,19 @@ module.exports = class Response extends Writable { if (this.chunkedTransfer) { this.#pendingChunks.push(chunk); + const size = this.#pendingChunks.reduce((acc, c) => acc + c.byteLength, 0); + if (size >= HIGH_WATERMARK) { + this._res.write(Buffer.concat(this.#pendingChunks, size)); + this.#pendingChunks = []; + } if (!this.#flushScheduled) { this.#flushScheduled = true; queueMicrotask(() => { this.#flushScheduled = false; if (this.finished || this.aborted || !this.#pendingChunks.length) return; - this._res.cork(() => { - if (this.#pendingChunks.length) { - const buf = this.#pendingChunks.length === 1 - ? this.#pendingChunks[0] - : Buffer.concat(this.#pendingChunks); - this.#pendingChunks = []; - this._res.write(buf); - } - }); + const size = this.#pendingChunks.reduce((acc, c) => acc + c.byteLength, 0); + this._res.write(Buffer.concat(this.#pendingChunks, size)); + this.#pendingChunks = []; }); } this.writingChunk = false; From 26f7823ad59f4a911f56eef403186e76c8ff27c7 Mon Sep 17 00:00:00 2001 From: francesco Date: Mon, 13 Apr 2026 16:23:55 +0200 Subject: [PATCH 3/4] feat: add test for compression --- tests/test-compression-server.js | 34 ++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 tests/test-compression-server.js diff --git a/tests/test-compression-server.js b/tests/test-compression-server.js new file mode 100644 index 00000000..b2804349 --- /dev/null +++ b/tests/test-compression-server.js @@ -0,0 +1,34 @@ +const express = require('../src/index.js'); +const compression = require('compression'); +const app = express(); + +// force encoding by query param ?compress= +app.use((req, res, next) => { + const enc = req.query.compress; + if (enc) { + req.headers['accept-encoding'] = enc; + } + next(); +}); + +app.use(compression({ threshold: 1 })); +app.use(express.static('tests/parts')); + +const PORT = 13335; +app.listen(PORT, () => { + const base = `http://localhost:${PORT}`; + const files = ['small-file.json', 'medium-file.json', 'large-file.json']; + const encodings = ['gzip', 'br', 'deflate']; + + console.log(`\n Server on ${base}\n`); + console.log(' === Links ===\n'); + + for (const file of files) { + console.log(` ${file}`); + console.log(` ${'identity'.padEnd(8)} : ${base}/${file}?compress=identity`); + for (const enc of encodings) { + console.log(` ${enc.padEnd(8)} : ${base}/${file}?compress=${enc}`); + } + console.log(); + } +}); From f624dd2f52fb3585f57628e2091975465c429890 Mon Sep 17 00:00:00 2001 From: francesco Date: Mon, 13 Apr 2026 16:35:43 +0200 Subject: [PATCH 4/4] fix: uws warning --- src/response.js | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/response.js b/src/response.js index 904a355b..65fcc833 100644 --- a/src/response.js +++ b/src/response.js @@ -164,13 +164,17 @@ module.exports = class Response extends Writable { } if (!this.#flushScheduled) { this.#flushScheduled = true; - queueMicrotask(() => { + setTimeout(() => { this.#flushScheduled = false; if (this.finished || this.aborted || !this.#pendingChunks.length) return; - const size = this.#pendingChunks.reduce((acc, c) => acc + c.byteLength, 0); - this._res.write(Buffer.concat(this.#pendingChunks, size)); - this.#pendingChunks = []; - }); + this._res.cork(() => { + if (this.#pendingChunks.length) { + const s = this.#pendingChunks.reduce((acc, c) => acc + c.byteLength, 0); + this._res.write(Buffer.concat(this.#pendingChunks, s)); + this.#pendingChunks = []; + } + }); + }, 0); } this.writingChunk = false; callback(null);