diff --git a/README.md b/README.md index 37b67cc1..19be3321 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,26 @@ try { ``` +You can provide a custom stream using the `transformRequest` option. This is useful for environments like Google Cloud Functions where the request body has already been consumed: + +```js +const { Readable } = require('node:stream') + +// In environments like Google Cloud Functions/Firebase, `request.rawBody` +// contains the full raw request body buffer/string. +fastify.register(require('@fastify/multipart'), { + transformRequest: (request) => Readable.from(request.rawBody) +}) + +// In a standard Fastify/Node.js server, `request.raw` is already a readable +// stream for the incoming HTTP request, so you can usually omit +// `transformRequest` entirely or simply return `request.raw`: +// +// fastify.register(require('@fastify/multipart'), { +// transformRequest: (request) => request.raw +// }) +``` + Additionally, you can pass per-request options to the `req.file`, `req.files`, `req.saveRequestFiles` or `req.parts` function. ```js diff --git a/index.js b/index.js index 283510dc..415a1661 100644 --- a/index.js +++ b/index.js @@ -10,7 +10,7 @@ const { generateId } = require('./lib/generateId') const createError = require('@fastify/error') const streamToNull = require('./lib/stream-consumer') const deepmergeAll = require('@fastify/deepmerge')({ all: true }) -const { PassThrough, Readable } = require('node:stream') +const { PassThrough, Readable, pipeline } = require('node:stream') const { pipeline: pump } = require('node:stream/promises') const secureJSON = require('secure-json-parse') @@ -26,6 +26,7 @@ const InvalidMultipartContentTypeError = createError('FST_INVALID_MULTIPART_CONT const InvalidJSONFieldError = createError('FST_INVALID_JSON_FIELD_ERROR', 'a request field is not a valid JSON as declared by its Content-Type', 406) const FileBufferNotFoundError = createError('FST_FILE_BUFFER_NOT_FOUND', 'the file buffer was not found', 500) const NoFormData = createError('FST_NO_FORM_DATA', 'FormData is not available', 500) +const InvalidTransformRequestError = createError('FST_INVALID_TRANSFORM_REQUEST', 'transformRequest must return a readable stream', 500) function setMultipart (req, _payload, done) { req[kMultipart] = true @@ -171,6 +172,8 @@ function fastifyMultipart (fastify, options, done) { ? options.throwFileSizeLimit : true + const transformRequest = typeof options.transformRequest === 'function' ? options.transformRequest : (request) => request + fastify.decorate('multipartErrors', { PartsLimitError, FilesLimitError, @@ -297,7 +300,12 @@ function fastifyMultipart (fastify, options, done) { process.nextTick(() => cleanup(err)) }) - request.pipe(bb) + const stream = transformRequest(request) + if (!stream || typeof stream.pipe !== 'function') throw new InvalidTransformRequestError() + + pipeline(stream, bb, (err) => { + cleanup(err) + }) function onField (name, fieldValue, fieldnameTruncated, valueTruncated, encoding, contentType) { // don't overwrite prototypes @@ -435,8 +443,6 @@ function fastifyMultipart (fastify, options, done) { } function cleanup (err) { - request.unpipe(bb) - if ((err || request.aborted) && currentFile) { currentFile.destroy() currentFile = null diff --git a/test/multipart-transform-request.test.js b/test/multipart-transform-request.test.js new file mode 100644 index 00000000..f310c365 --- /dev/null +++ b/test/multipart-transform-request.test.js @@ -0,0 +1,396 @@ +'use strict' + +const test = require('node:test') +const { Transform } = require('node:stream') +const Fastify = require('fastify') +const multipart = require('..') +const http = require('node:http') + +const boundary = '----TestBoundary' +const payload = Buffer.from( + `--${boundary}\r\n` + + 'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n' + + 'Content-Type: text/plain\r\n' + + '\r\n' + + 'test content\r\n' + + `--${boundary}--\r\n` +) + +test('should transformRequest called when option passed', function (t, done) { + t.plan(4) + + let transformRequestCalled = false + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + transformRequestCalled = true + return request + } + }) + + fastify.post('/', async function (req, reply) { + const file = await req.file() + t.assert.strictEqual(file.filename, 'test.txt') + const content = await file.toBuffer() + t.assert.strictEqual(content.toString(), 'test content') + + t.assert.ok(transformRequestCalled, 'transformRequest should have been called') + reply.code(200).send() + }) + + fastify.listen({ port: 0 }, function () { + const opts = { + protocol: 'http:', + hostname: 'localhost', + port: fastify.server.address().port, + path: '/', + headers: { 'content-type': `multipart/form-data; boundary=${boundary}` }, + method: 'POST' + } + + const req = http.request(opts, (res) => { + t.assert.strictEqual(res.statusCode, 200) + res.resume() + res.on('end', () => { + done() + }) + }) + + req.end(payload) + }) +}) + +test('should transform the request stream', function (t, done) { + t.plan(3) + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + // custom transfrom function to change content to uppercase + const upperCaseTr = new Transform({ + transform (chunk, encoding, callback) { + callback(null, chunk.toString().toUpperCase()) + } + }) + return request.pipe(upperCaseTr) + } + }) + + fastify.post('/', async (req) => { + const file = await req.file() + t.assert.strictEqual(file.filename, 'TEST.TXT') + + const content = await file.toBuffer() + t.assert.strictEqual(content.toString().trim(), 'TEST CONTENT') + + return { ok: true } + }) + + fastify.listen({ port: 0 }, () => { + const req = http.request({ + port: fastify.server.address().port, + method: 'POST', + headers: { 'content-type': `multipart/form-data; boundary=${boundary.toUpperCase()}` } + }, (res) => { + t.assert.strictEqual(res.statusCode, 200) + res.resume() + res.on('end', done) + }) + req.end(payload) + }) +}) + +test('should handle transformRequest throwing an error', function (t, done) { + t.plan(2) + + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + throw new Error('transformRequest failed') + } + }) + + fastify.post('/', async function (req, reply) { + const file = await req.file() + await file.toBuffer() + reply.code(200).send() + }) + + fastify.listen({ port: 0 }, function () { + const opts = { + protocol: 'http:', + hostname: 'localhost', + port: fastify.server.address().port, + path: '/', + headers: { 'content-type': `multipart/form-data; boundary=${boundary}` }, + method: 'POST' + } + + const req = http.request(opts, (res) => { + t.assert.strictEqual(res.statusCode, 500) + res.resume() + res.on('end', () => { + t.assert.ok('res ended successfully') + done() + }) + }) + + req.end(payload) + }) +}) + +test('should throw transformRequest returning undefined', function (t, done) { + t.plan(3) + + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + return undefined + } + }) + + fastify.post('/', async function (req, reply) { + t.assert.ok(req.isMultipart()) + + try { + const file = await req.file() + await file.toBuffer() + reply.code(200).send() + } catch (error) { + t.assert.strictEqual(error.code, 'FST_INVALID_TRANSFORM_REQUEST') + reply.code(500).send() + } + }) + + fastify.listen({ port: 0 }, function () { + const opts = { + protocol: 'http:', + hostname: 'localhost', + port: fastify.server.address().port, + path: '/', + headers: { 'content-type': `multipart/form-data; boundary=${boundary}` }, + method: 'POST' + } + + const req = http.request(opts, (res) => { + t.assert.strictEqual(res.statusCode, 500) + res.resume() + res.on('end', done) + }) + + req.end(payload) + }) +}) + +test('should throw transformRequest returning null', function (t, done) { + t.plan(3) + + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + return null + } + }) + + fastify.post('/', async function (req, reply) { + t.assert.ok(req.isMultipart()) + + try { + const file = await req.file() + await file.toBuffer() + reply.code(200).send() + } catch (error) { + t.assert.strictEqual(error.code, 'FST_INVALID_TRANSFORM_REQUEST') + reply.code(500).send() + } + }) + + fastify.listen({ port: 0 }, function () { + const opts = { + protocol: 'http:', + hostname: 'localhost', + port: fastify.server.address().port, + path: '/', + headers: { 'content-type': `multipart/form-data; boundary=${boundary}` }, + method: 'POST' + } + + const req = http.request(opts, (res) => { + t.assert.strictEqual(res.statusCode, 500) + res.resume() + res.on('end', done) + }) + + req.end(payload) + }) +}) + +test('should throw transformRequest returning non-streaming data', function (t, done) { + t.plan(3) + + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + return 'non-streaming data' + } + }) + + fastify.post('/', async function (req, reply) { + t.assert.ok(req.isMultipart()) + + try { + const file = await req.file() + await file.toBuffer() + reply.code(200).send() + } catch (error) { + t.assert.strictEqual(error.code, 'FST_INVALID_TRANSFORM_REQUEST') + reply.code(500).send() + } + }) + + fastify.listen({ port: 0 }, function () { + const opts = { + protocol: 'http:', + hostname: 'localhost', + port: fastify.server.address().port, + path: '/', + headers: { 'content-type': `multipart/form-data; boundary=${boundary}` }, + method: 'POST' + } + + const req = http.request(opts, (res) => { + t.assert.strictEqual(res.statusCode, 500) + res.resume() + res.on('end', done) + }) + + req.end(payload) + }) +}) + +test('should handle transformed stream closing prematurely', function (t, done) { + t.plan(3) + + const largeContent = Buffer.alloc(1024, 'x').toString() + const largePayload = Buffer.from( + `--${boundary}\r\n` + + 'Content-Disposition: form-data; name="file"; filename="large.txt"\r\n' + + 'Content-Type: text/plain\r\n' + + '\r\n' + + largeContent + '\r\n' + + `--${boundary}--\r\n` + ) + + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + const prematureStream = new Transform({ + transform (chunk, encoding, callback) { + process.nextTick(() => this.destroy()) + callback(null, chunk) + } + }) + return request.pipe(prematureStream) + } + }) + + fastify.post('/', async function (req, reply) { + t.assert.ok(req.isMultipart()) + + try { + const file = await req.file() + await file.toBuffer() + reply.code(200).send() + } catch (error) { + t.assert.strictEqual(error.code, 'ERR_STREAM_PREMATURE_CLOSE') + reply.code(500).send() + } + }) + + fastify.listen({ port: 0 }, function () { + let finished = false + const safeDone = () => { + if (!finished) { + finished = true + done() + } + } + + const req = http.request({ + port: fastify.server.address().port, + method: 'POST', + headers: { 'content-type': `multipart/form-data; boundary=${boundary}` } + }, (res) => { + t.assert.strictEqual(res.statusCode, 500) + res.resume() + res.on('end', done) + }) + + req.on('error', (err) => { + if (!finished) { + t.assert.ok(err.message.includes('EPIPE') || err.message.includes('reset')) + safeDone() + } + }) + + req.end(largePayload) + }) +}) + +test('should handle transformed stream emitting an error during processing', function (t, done) { + t.plan(3) + + const fastify = Fastify() + t.after(() => fastify.close()) + + fastify.register(multipart, { + transformRequest: (request) => { + const errStream = new Transform({ + transform (chunk, encoding, callback) { + callback(new Error('stream processing failed')) + } + }) + return request.pipe(errStream) + } + }) + + fastify.post('/', async function (req, reply) { + t.assert.ok(req.isMultipart()) + + try { + const file = await req.file() + await file.toBuffer() + reply.code(200).send() + } catch (error) { + t.assert.strictEqual(error.message, 'stream processing failed') + reply.code(500).send() + } + }) + + fastify.listen({ port: 0 }, function () { + const req = http.request({ + port: fastify.server.address().port, + method: 'POST', + headers: { 'content-type': `multipart/form-data; boundary=${boundary}` } + }, (res) => { + t.assert.strictEqual(res.statusCode, 500) + res.resume() + res.on('end', done) + }) + + req.end(payload) + }) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 67f7cbc0..f5696140 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -163,6 +163,16 @@ declare namespace fastifyMultipart { */ parts?: number; }; + + /** + * Provide a custom stream for multipart parsing + * + * By default, the request stream is used directly + * + * Useful for environments like Google Cloud Functions + * where the request body has already been consumed + */ + transformRequest?: (request: Readable) => Readable; } export interface FastifyMultipartOptions extends FastifyMultipartBaseOptions { diff --git a/types/index.test-d.ts b/types/index.test-d.ts index 078cee6c..f6226550 100644 --- a/types/index.test-d.ts +++ b/types/index.test-d.ts @@ -2,7 +2,7 @@ import fastify from 'fastify' import fastifyMultipart, { MultipartValue, MultipartFields, MultipartFile } from '..' import * as util from 'node:util' -import { pipeline } from 'node:stream' +import { pipeline, Readable } from 'node:stream' import * as fs from 'node:fs' import { expectError, expectType } from 'tsd' import { FastifyErrorConstructor } from '@fastify/error' @@ -19,6 +19,10 @@ const runServer = async () => { limits: { parts: 500 }, + transformRequest: (request) => { + expectType(request) + return request + }, onFile: (part: MultipartFile) => { console.log(part) }