Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eleven-bulldogs-fetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@sveltejs/kit": patch
---

Pause Node streams as necessary when converting to ReadableStream
74 changes: 44 additions & 30 deletions packages/kit/src/node/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ function get_raw_body(req) {
return null;
}

if (req.destroyed) {
const readable = new ReadableStream();
readable.cancel();
return readable;
}

let size = 0;
let cancelled = false;

Expand All @@ -28,31 +34,34 @@ function get_raw_body(req) {
});

req.on('end', () => {
if (!cancelled) {
controller.close();
}
if (cancelled) return;
controller.close();
});
},

pull(controller) {
return new Promise((fulfil) => {
req.once('data', (chunk) => {
if (!cancelled) {
size += chunk.length;
if (size > length) {
controller.error(new Error('content-length exceeded'));
}
req.on('data', (chunk) => {
if (cancelled) return;

size += chunk.length;
if (size > length) {
controller.error(new Error('content-length exceeded'));
return;
}

controller.enqueue(chunk);
}
controller.enqueue(chunk);

fulfil();
});
if (controller.desiredSize === null || controller.desiredSize <= 0) {
req.pause();
}
});
},

cancel() {
pull() {
req.resume();
},

cancel(reason) {
cancelled = true;
req.destroy(reason);
}
});
}
Expand Down Expand Up @@ -104,16 +113,26 @@ export async function setResponse(res, response) {
res.writeHead(response.status, headers);

if (response.body) {
let cancelled = false;

const reader = response.body.getReader();

if (res.destroyed) {
reader.cancel();
return;
}

let cancelled = false;

res.on('close', () => {
reader.cancel();
cancelled = true;
});

const next = async () => {
res.on('error', (error) => {
reader.cancel(error);
cancelled = true;
});

for (;;) {
const { done, value } = await reader.read();

if (cancelled) return;
Expand All @@ -123,17 +142,12 @@ export async function setResponse(res, response) {
return;
}

res.write(Buffer.from(value), (error) => {
if (error) {
console.error('Error writing stream', error);
res.end();
} else {
next();
}
});
};
const ok = res.write(value);

next();
if (!ok) {
await new Promise((fulfil) => res.once('drain', fulfil));
}
}
} else {
res.end();
}
Expand Down
18 changes: 18 additions & 0 deletions packages/kit/test/apps/basics/src/routes/endpoint-input/sha256.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { createHash } from 'node:crypto';

/** @type {import('@sveltejs/kit').RequestHandler} */
export async function PUT({ request }) {
const hash = createHash('sha256');
const reader = request.body.getReader();

for (;;) {
const { done, value } = await reader.read();
if (done) break;
hash.update(value);
await new Promise((r) => setTimeout(r, 10));
}

return {
body: hash.digest('base64url')
};
}
36 changes: 29 additions & 7 deletions packages/kit/test/apps/basics/src/routes/endpoint-output/stream.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,36 @@
import { createHash, randomBytes } from 'node:crypto';

/** @type {import('@sveltejs/kit').RequestHandler} */
export function GET() {
const data = randomBytes(1024 * 256);
const digest = createHash('sha256').update(data).digest('base64url');

let length = 0;

return {
headers: {
'content-type': 'application/octet-stream'
'content-type': 'application/octet-stream',
digest: `sha-256=${digest}`
},
body: new ReadableStream({
start: (controller) => {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
}
})
body: new ReadableStream(
{
pull(controller) {
const offset = data.byteOffset + length;
const chunk =
data.byteLength - length > controller.desiredSize
? new Uint8Array(data.buffer, offset, controller.desiredSize)
: new Uint8Array(data.buffer, offset);

controller.enqueue(chunk);

length += chunk.byteLength;

if (length >= data.byteLength) {
controller.close();
}
}
},
{ highWaterMark: 1024 * 16 }
)
};
}
11 changes: 10 additions & 1 deletion packages/kit/test/apps/basics/test/server.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { expect } from '@playwright/test';
import { start_server, test } from '../../../utils.js';
import { createHash, randomBytes } from 'node:crypto';

/** @typedef {import('@playwright/test').Response} Response */

Expand Down Expand Up @@ -273,7 +274,15 @@ test.describe('Endpoints', () => {
test('body can be a binary ReadableStream', async ({ request }) => {
const response = await request.get('/endpoint-output/stream');
const body = await response.body();
expect(Array.from(body)).toEqual([1, 2, 3]);
const digest = createHash('sha256').update(body).digest('base64url');
expect(response.headers()['digest']).toEqual(`sha-256=${digest}`);
});

test('request body can be read slow', async ({ request }) => {
const data = randomBytes(1024 * 256);
const digest = createHash('sha256').update(data).digest('base64url');
const response = await request.put('/endpoint-input/sha256', { data });
expect(await response.text()).toEqual(digest);
});
});

Expand Down