From a5626b345101d1120aac0e6e2bf5a4c187972f50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Sat, 10 Jul 2021 21:11:01 +0200 Subject: [PATCH 1/6] use whatwg streams --- README.md | 54 +++++---------------------------------- from.js | 10 +++----- index.js | 72 ++++++++++++++++++++++++++++++++++++++++++---------- package.json | 5 +++- test.js | 13 +++++++--- 5 files changed, 82 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index 5f293da..fb3e198 100644 --- a/README.md +++ b/README.md @@ -22,10 +22,8 @@ npm install fetch-blob - internal Buffer.from was replaced with TextEncoder/Decoder - internal buffers was replaced with Uint8Arrays - CommonJS was replaced with ESM - - The node stream returned by calling `blob.stream()` was replaced with a simple generator function that yields Uint8Array (Breaking change) - (Read "Differences from other blobs" for more info.) - - All of this changes have made it dependency free of any core node modules, so it would be possible to just import it using http-import from a CDN without any bundling + - The node stream returned by calling `blob.stream()` was replaced with whatwg streams + - (Read "Differences from other blobs" for more info.) @@ -36,48 +34,12 @@ npm install fetch-blob - This blob version is more arbitrary, it can be constructed with blob parts that isn't a instance of itself it has to look and behave as a blob to be accepted as a blob part. - The benefit of this is that you can create other types of blobs that don't contain any internal data that has to be read in other ways, such as the `BlobDataItem` created in `from.js` that wraps a file path into a blob-like item and read lazily (nodejs plans to [implement this][fs-blobs] as well) - - The `blob.stream()` is the most noticeable differences. It returns a AsyncGeneratorFunction that yields Uint8Arrays - - The reasoning behind `Blob.prototype.stream()` is that NodeJS readable stream - isn't spec compatible with whatwg streams and we didn't want to import the hole whatwg stream polyfill for node - or browserify NodeJS streams for the browsers and picking any flavor over the other. So we decided to opted out - of any stream and just implement the bear minium of what both streams have in common which is the asyncIterator - that both yields Uint8Array. this is the most isomorphic way with the use of `for-await-of` loops. - It would be redundant to convert anything to whatwg streams and than convert it back to - node streams since you work inside of Node. - It will probably stay like this until nodejs get native support for whatwg[1][https://github.com/nodejs/whatwg-stream] streams and whatwg stream add the node - equivalent for `Readable.from(iterable)`[2](https://github.com/whatwg/streams/issues/1018) - - But for now if you really need a Node Stream then you can do so using this transformation + - The `blob.stream()` is the most noticeable differences. It returns a WHATWG stream now. to keep it as a node stream you would have to do: + ```js import {Readable} from 'stream' const stream = Readable.from(blob.stream()) ``` - But if you don't need it to be a stream then you can just use the asyncIterator part of it that is isomorphic. - ```js - for await (const chunk of blob.stream()) { - console.log(chunk) // uInt8Array - } - ``` - If you need to make some feature detection to fix this different behavior - ```js - if (Blob.prototype.stream?.constructor?.name === 'AsyncGeneratorFunction') { - // not spec compatible, monkey patch it... - // (Alternative you could extend the Blob and use super.stream()) - let orig = Blob.prototype.stream - Blob.prototype.stream = function () { - const iterator = orig.call(this) - return new ReadableStream({ - async pull (ctrl) { - const next = await iterator.next() - return next.done ? ctrl.close() : ctrl.enqueue(next.value) - } - }) - } - } - ``` - Possible feature whatwg version: `ReadableStream.from(iterator)` - It's also possible to delete this method and instead use `.slice()` and `.arrayBuffer()` since it has both a public and private stream method ## Usage @@ -100,12 +62,8 @@ const blob = new Blob(['hello, world']) await blob.text() await blob.arrayBuffer() for await (let chunk of blob.stream()) { ... } - -// turn the async iterator into a node stream -stream.Readable.from(blob.stream()) - -// turn the async iterator into a whatwg stream (feature) -globalThis.ReadableStream.from(blob.stream()) +blob.stream().getReader().read() +blob.stream().getReader({mode: 'byob'}).read(view) ``` ### Blob part backed up by filesystem diff --git a/from.js b/from.js index fd81a1d..bd7dba3 100644 --- a/from.js +++ b/from.js @@ -86,12 +86,10 @@ class BlobDataItem { if (mtimeMs > this.lastModified) { throw new DOMException('The requested file could not be read, typically due to permission problems that have occurred after a reference to a file was acquired.', 'NotReadableError'); } - if (this.size) { - yield * createReadStream(this.#path, { - start: this.#start, - end: this.#start + this.size - 1 - }); - } + yield * createReadStream(this.#path, { + start: this.#start, + end: this.#start + this.size - 1 + }); } get [Symbol.toStringTag]() { diff --git a/index.js b/index.js index e7b419f..8eb7cca 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,43 @@ + +// TODO (jimmywarting): in the feature use conditional loading with top level await (requires 14.x) +// Node has recently added whatwg stream into core, want to use that instead when it becomes avalible. + +import * as stream from 'web-streams-polyfill/dist/ponyfill.es2018.js' + +const ReadableStream = globalThis.ReadableStream || stream.ReadableStream +const ByteLengthQueuingStrategy = globalThis.ByteLengthQueuingStrategy || stream.ReadableStream + +/** @typedef {import('buffer').Blob} NodeBlob} */ + +// Fix buffer.Blob's missing stream implantation +import('buffer').then(m => { + if (m.Blob && !m.Blob.prototype.stream) { + m.Blob.prototype.stream = function name(params) { + let position = 0; + const blob = this; + const stratergy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE }); + + return new ReadableStream({ + type: "bytes", + async pull(ctrl) { + const chunk = blob.slice(position, Math.min(blob.size, position + POOL_SIZE)); + const buffer = await chunk.arrayBuffer(); + position += buffer.byteLength; + ctrl.enqueue(new Uint8Array(buffer)) + + if (position === blob.size) { + ctrl.close() + } + } + }, stratergy) + } + } +}, () => {}) + // 64 KiB (same size chrome slice theirs blob into Uint8array's) const POOL_SIZE = 65536; -/** @param {(Blob | Uint8Array)[]} parts */ +/** @param {(Blob | NodeBlob | Uint8Array)[]} parts */ async function * toIterator (parts, clone = true) { for (let part of parts) { if ('stream' in part) { @@ -116,6 +152,11 @@ export default class Blob { * @return {Promise} */ async arrayBuffer() { + // Easier way... Just a unnecessary overhead + // const view = new Uint8Array(this.size); + // await this.stream().getReader({mode: 'byob'}).read(view); + // return view.buffer; + const data = new Uint8Array(this.size); let offset = 0; for await (const chunk of toIterator(this.#parts, false)) { @@ -126,14 +167,17 @@ export default class Blob { return data.buffer; } - /** - * The Blob stream() implements partial support of the whatwg stream - * by only being async iterable. - * - * @returns {AsyncGenerator} - */ - async * stream() { - yield * toIterator(this.#parts, true); + stream() { + const it = toIterator(this.#parts, true); + const stratergy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE }); + + return new ReadableStream({ + type: "bytes", + async pull(ctrl) { + const chunk = await it.next(); + chunk.done ? ctrl.close() : ctrl.enqueue(chunk.value); + } + }, stratergy) } /** @@ -157,6 +201,11 @@ export default class Blob { let added = 0; for (const part of parts) { + // don't add the overflow to new blobParts + if (added >= span) { + break; + } + const size = ArrayBuffer.isView(part) ? part.byteLength : part.size; if (relativeStart && size <= relativeStart) { // Skip the beginning and change the relative @@ -174,11 +223,6 @@ export default class Blob { } blobParts.push(chunk); relativeStart = 0; // All next sequential parts should start at 0 - - // don't add the overflow to new blobParts - if (added >= span) { - break; - } } } diff --git a/package.json b/package.json index 7594f2d..ae44c87 100644 --- a/package.json +++ b/package.json @@ -75,5 +75,8 @@ "type": "paypal", "url": "https://paypal.me/jimmywarting" } - ] + ], + "dependencies": { + "web-streams-polyfill": "^3.0.3" + } } diff --git a/test.js b/test.js index a3d843b..2eed348 100644 --- a/test.js +++ b/test.js @@ -160,7 +160,7 @@ test('blob part backed up by filesystem', async t => { test('Reading after modified should fail', async t => { const blob = blobFromSync('./LICENSE'); await new Promise(resolve => { - setTimeout(resolve, 100); + setTimeout(resolve, 500); }); fs.closeSync(fs.openSync('./LICENSE', 'a')); const error = await t.throwsAsync(blob.text()); @@ -174,7 +174,7 @@ test('Reading after modified should fail', async t => { // The lastModifiedDate is deprecated and removed from spec t.false('lastModifiedDate' in file); const mod = file.lastModified - Date.now(); - t.true(mod <= 0 && mod >= -100); // Close to tolerance: 0.100ms + t.true(mod <= 0 && mod >= -500); // Close to tolerance: 0.500ms }); test('Reading file after modified should fail', async t => { @@ -241,7 +241,7 @@ test('Parts are immutable', async t => { test('Blobs are immutable', async t => { const buf = new Uint8Array([97]); const blob = new Blob([buf]); - const chunk = await blob.stream().next(); + const chunk = await blob.stream().getReader().read(); t.is(chunk.value[0], 97); chunk.value[0] = 98; t.is(await blob.text(), 'a'); @@ -344,3 +344,10 @@ test('new File() throws with too few args', t => { message: 'Failed to construct \'File\': 2 arguments required, but only 0 present.' }); }); + +test('can slice zero sized blobs', async t => { + // const blob = fileFromPathSync('./package.json') // or + const blob = new Blob(); + const txt = await blob.slice(0, 0).text(); + t.is(txt, ''); +}); From c281ad21f0b32f5c02048e35da6f55be3492c664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Sat, 10 Jul 2021 21:51:03 +0200 Subject: [PATCH 2/6] rm comment --- test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test.js b/test.js index 2eed348..4865aeb 100644 --- a/test.js +++ b/test.js @@ -346,7 +346,6 @@ test('new File() throws with too few args', t => { }); test('can slice zero sized blobs', async t => { - // const blob = fileFromPathSync('./package.json') // or const blob = new Blob(); const txt = await blob.slice(0, 0).text(); t.is(txt, ''); From 10e2a25be8712343064cbae1c22939ec66492bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Thu, 15 Jul 2021 22:29:20 +0200 Subject: [PATCH 3/6] could not use optional changing yet --- index.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 8eb7cca..28042d4 100644 --- a/index.js +++ b/index.js @@ -239,7 +239,8 @@ export default class Blob { static [Symbol.hasInstance](object) { return ( - typeof object?.constructor === 'function' && + typeof object === 'object' && + typeof object.constructor === 'function' && ( typeof object.stream === 'function' || typeof object.arrayBuffer === 'function' From 817fd03490950c0d64dfcc2849bab811517b6238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Thu, 15 Jul 2021 22:31:39 +0200 Subject: [PATCH 4/6] check option too --- index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/index.js b/index.js index 28042d4..6979248 100644 --- a/index.js +++ b/index.js @@ -239,6 +239,7 @@ export default class Blob { static [Symbol.hasInstance](object) { return ( + object && typeof object === 'object' && typeof object.constructor === 'function' && ( From cfb51571b1f11eb461901dd4763e219a6d494a4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Thu, 15 Jul 2021 22:35:55 +0200 Subject: [PATCH 5/6] could not use fs/promise in v12 either --- from.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/from.js b/from.js index bd7dba3..02f0828 100644 --- a/from.js +++ b/from.js @@ -1,10 +1,11 @@ -import {statSync, createReadStream} from 'fs'; -import {stat} from 'fs/promises'; +import {statSync, createReadStream, promises as fs} from 'fs'; import {basename} from 'path'; import File from './file.js'; import Blob from './index.js'; import {MessageChannel} from 'worker_threads'; +const {stat} = fs + const DOMException = globalThis.DOMException || (() => { const port = new MessageChannel().port1 const ab = new ArrayBuffer(0) From dbfefe8115577dc86eb44475e47d6d4e91c5d3e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Thu, 15 Jul 2021 22:41:53 +0200 Subject: [PATCH 6/6] cuz it never happend --- from.js | 2 +- index.js | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/from.js b/from.js index 02f0828..22408fb 100644 --- a/from.js +++ b/from.js @@ -4,7 +4,7 @@ import File from './file.js'; import Blob from './index.js'; import {MessageChannel} from 'worker_threads'; -const {stat} = fs +const {stat} = fs; const DOMException = globalThis.DOMException || (() => { const port = new MessageChannel().port1 diff --git a/index.js b/index.js index 6979248..9a453f6 100644 --- a/index.js +++ b/index.js @@ -56,6 +56,7 @@ async function * toIterator (parts, clone = true) { yield part; } } else { + /* c8 ignore start */ // For blobs that have arrayBuffer but no stream method (nodes buffer.Blob) let position = 0; while (position !== part.size) { @@ -64,6 +65,7 @@ async function * toIterator (parts, clone = true) { position += buffer.byteLength; yield new Uint8Array(buffer); } + /* c8 ignore end */ } } }