diff --git a/README.md b/README.md
index 5f293da..fb3e198 100644
--- a/README.md
+++ b/README.md
@@ -22,10 +22,8 @@ npm install fetch-blob
- internal Buffer.from was replaced with TextEncoder/Decoder
- internal buffers was replaced with Uint8Arrays
- CommonJS was replaced with ESM
- - The node stream returned by calling `blob.stream()` was replaced with a simple generator function that yields Uint8Array (Breaking change)
- (Read "Differences from other blobs" for more info.)
-
- All of this changes have made it dependency free of any core node modules, so it would be possible to just import it using http-import from a CDN without any bundling
+ - The node stream returned by calling `blob.stream()` was replaced with WHATWG streams
+ - (Read "Differences from other blobs" for more info.)
@@ -36,48 +34,12 @@ npm install fetch-blob
- This blob version is more arbitrary, it can be constructed with blob parts that isn't a instance of itself
it has to look and behave as a blob to be accepted as a blob part.
- The benefit of this is that you can create other types of blobs that don't contain any internal data that has to be read in other ways, such as the `BlobDataItem` created in `from.js` that wraps a file path into a blob-like item and read lazily (nodejs plans to [implement this][fs-blobs] as well)
- - The `blob.stream()` is the most noticeable differences. It returns a AsyncGeneratorFunction that yields Uint8Arrays
-
- The reasoning behind `Blob.prototype.stream()` is that NodeJS readable stream
- isn't spec compatible with whatwg streams and we didn't want to import the hole whatwg stream polyfill for node
- or browserify NodeJS streams for the browsers and picking any flavor over the other. So we decided to opted out
- of any stream and just implement the bear minium of what both streams have in common which is the asyncIterator
- that both yields Uint8Array. this is the most isomorphic way with the use of `for-await-of` loops.
- It would be redundant to convert anything to whatwg streams and than convert it back to
- node streams since you work inside of Node.
- It will probably stay like this until nodejs get native support for whatwg[1][https://github.com/nodejs/whatwg-stream] streams and whatwg stream add the node
- equivalent for `Readable.from(iterable)`[2](https://github.com/whatwg/streams/issues/1018)
-
- But for now if you really need a Node Stream then you can do so using this transformation
+ - The `blob.stream()` is the most noticeable difference. It returns a WHATWG stream now. To keep it as a Node stream you would have to do:
+
```js
import {Readable} from 'stream'
const stream = Readable.from(blob.stream())
```
- But if you don't need it to be a stream then you can just use the asyncIterator part of it that is isomorphic.
- ```js
- for await (const chunk of blob.stream()) {
- console.log(chunk) // uInt8Array
- }
- ```
- If you need to make some feature detection to fix this different behavior
- ```js
- if (Blob.prototype.stream?.constructor?.name === 'AsyncGeneratorFunction') {
- // not spec compatible, monkey patch it...
- // (Alternative you could extend the Blob and use super.stream())
- let orig = Blob.prototype.stream
- Blob.prototype.stream = function () {
- const iterator = orig.call(this)
- return new ReadableStream({
- async pull (ctrl) {
- const next = await iterator.next()
- return next.done ? ctrl.close() : ctrl.enqueue(next.value)
- }
- })
- }
- }
- ```
- Possible feature whatwg version: `ReadableStream.from(iterator)`
- It's also possible to delete this method and instead use `.slice()` and `.arrayBuffer()` since it has both a public and private stream method
## Usage
@@ -100,12 +62,8 @@ const blob = new Blob(['hello, world'])
await blob.text()
await blob.arrayBuffer()
for await (let chunk of blob.stream()) { ... }
-
-// turn the async iterator into a node stream
-stream.Readable.from(blob.stream())
-
-// turn the async iterator into a whatwg stream (feature)
-globalThis.ReadableStream.from(blob.stream())
+blob.stream().getReader().read()
+blob.stream().getReader({mode: 'byob'}).read(view)
```
### Blob part backed up by filesystem
diff --git a/from.js b/from.js
index fd81a1d..22408fb 100644
--- a/from.js
+++ b/from.js
@@ -1,10 +1,11 @@
-import {statSync, createReadStream} from 'fs';
-import {stat} from 'fs/promises';
+import {statSync, createReadStream, promises as fs} from 'fs';
import {basename} from 'path';
import File from './file.js';
import Blob from './index.js';
import {MessageChannel} from 'worker_threads';
+const {stat} = fs;
+
const DOMException = globalThis.DOMException || (() => {
const port = new MessageChannel().port1
const ab = new ArrayBuffer(0)
@@ -86,12 +87,10 @@ class BlobDataItem {
if (mtimeMs > this.lastModified) {
throw new DOMException('The requested file could not be read, typically due to permission problems that have occurred after a reference to a file was acquired.', 'NotReadableError');
}
- if (this.size) {
- yield * createReadStream(this.#path, {
- start: this.#start,
- end: this.#start + this.size - 1
- });
- }
+ yield * createReadStream(this.#path, {
+ start: this.#start,
+ end: this.#start + this.size - 1
+ });
}
get [Symbol.toStringTag]() {
diff --git a/index.js b/index.js
index e7b419f..9a453f6 100644
--- a/index.js
+++ b/index.js
@@ -1,7 +1,43 @@
+
+// TODO (jimmywarting): in the future use conditional loading with top level await (requires 14.x)
+// Node has recently added WHATWG streams into core; we want to use that instead when it becomes available.
+
+import * as stream from 'web-streams-polyfill/dist/ponyfill.es2018.js'
+
+const ReadableStream = globalThis.ReadableStream || stream.ReadableStream
+const ByteLengthQueuingStrategy = globalThis.ByteLengthQueuingStrategy || stream.ByteLengthQueuingStrategy
+
+/** @typedef {import('buffer').Blob} NodeBlob */
+
+// Fix buffer.Blob's missing stream implementation
+import('buffer').then(m => {
+ if (m.Blob && !m.Blob.prototype.stream) {
+ m.Blob.prototype.stream = function name(params) {
+ let position = 0;
+ const blob = this;
+ const stratergy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE });
+
+ return new ReadableStream({
+ type: "bytes",
+ async pull(ctrl) {
+ const chunk = blob.slice(position, Math.min(blob.size, position + POOL_SIZE));
+ const buffer = await chunk.arrayBuffer();
+ position += buffer.byteLength;
+ ctrl.enqueue(new Uint8Array(buffer))
+
+ if (position === blob.size) {
+ ctrl.close()
+ }
+ }
+ }, stratergy)
+ }
+ }
+}, () => {})
+
// 64 KiB (same size chrome slice theirs blob into Uint8array's)
const POOL_SIZE = 65536;
-/** @param {(Blob | Uint8Array)[]} parts */
+/** @param {(Blob | NodeBlob | Uint8Array)[]} parts */
async function * toIterator (parts, clone = true) {
for (let part of parts) {
if ('stream' in part) {
@@ -20,6 +56,7 @@ async function * toIterator (parts, clone = true) {
yield part;
}
} else {
+ /* c8 ignore start */
// For blobs that have arrayBuffer but no stream method (nodes buffer.Blob)
let position = 0;
while (position !== part.size) {
@@ -28,6 +65,7 @@ async function * toIterator (parts, clone = true) {
position += buffer.byteLength;
yield new Uint8Array(buffer);
}
+ /* c8 ignore end */
}
}
}
@@ -116,6 +154,11 @@ export default class Blob {
* @return {Promise}
*/
async arrayBuffer() {
+ // Easier way... Just a unnecessary overhead
+ // const view = new Uint8Array(this.size);
+ // await this.stream().getReader({mode: 'byob'}).read(view);
+ // return view.buffer;
+
const data = new Uint8Array(this.size);
let offset = 0;
for await (const chunk of toIterator(this.#parts, false)) {
@@ -126,14 +169,17 @@ export default class Blob {
return data.buffer;
}
- /**
- * The Blob stream() implements partial support of the whatwg stream
- * by only being async iterable.
- *
- * @returns {AsyncGenerator}
- */
- async * stream() {
- yield * toIterator(this.#parts, true);
+ stream() {
+ const it = toIterator(this.#parts, true);
+ const stratergy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE });
+
+ return new ReadableStream({
+ type: "bytes",
+ async pull(ctrl) {
+ const chunk = await it.next();
+ chunk.done ? ctrl.close() : ctrl.enqueue(chunk.value);
+ }
+ }, stratergy)
}
/**
@@ -157,6 +203,11 @@ export default class Blob {
let added = 0;
for (const part of parts) {
+ // don't add the overflow to new blobParts
+ if (added >= span) {
+ break;
+ }
+
const size = ArrayBuffer.isView(part) ? part.byteLength : part.size;
if (relativeStart && size <= relativeStart) {
// Skip the beginning and change the relative
@@ -174,11 +225,6 @@ export default class Blob {
}
blobParts.push(chunk);
relativeStart = 0; // All next sequential parts should start at 0
-
- // don't add the overflow to new blobParts
- if (added >= span) {
- break;
- }
}
}
@@ -195,7 +241,9 @@ export default class Blob {
static [Symbol.hasInstance](object) {
return (
- typeof object?.constructor === 'function' &&
+ object &&
+ typeof object === 'object' &&
+ typeof object.constructor === 'function' &&
(
typeof object.stream === 'function' ||
typeof object.arrayBuffer === 'function'
diff --git a/package.json b/package.json
index 7594f2d..ae44c87 100644
--- a/package.json
+++ b/package.json
@@ -75,5 +75,8 @@
"type": "paypal",
"url": "https://paypal.me/jimmywarting"
}
- ]
+ ],
+ "dependencies": {
+ "web-streams-polyfill": "^3.0.3"
+ }
}
diff --git a/test.js b/test.js
index a3d843b..4865aeb 100644
--- a/test.js
+++ b/test.js
@@ -160,7 +160,7 @@ test('blob part backed up by filesystem', async t => {
test('Reading after modified should fail', async t => {
const blob = blobFromSync('./LICENSE');
await new Promise(resolve => {
- setTimeout(resolve, 100);
+ setTimeout(resolve, 500);
});
fs.closeSync(fs.openSync('./LICENSE', 'a'));
const error = await t.throwsAsync(blob.text());
@@ -174,7 +174,7 @@ test('Reading after modified should fail', async t => {
// The lastModifiedDate is deprecated and removed from spec
t.false('lastModifiedDate' in file);
const mod = file.lastModified - Date.now();
- t.true(mod <= 0 && mod >= -100); // Close to tolerance: 0.100ms
+ t.true(mod <= 0 && mod >= -500); // Close to tolerance: 500ms
});
test('Reading file after modified should fail', async t => {
@@ -241,7 +241,7 @@ test('Parts are immutable', async t => {
test('Blobs are immutable', async t => {
const buf = new Uint8Array([97]);
const blob = new Blob([buf]);
- const chunk = await blob.stream().next();
+ const chunk = await blob.stream().getReader().read();
t.is(chunk.value[0], 97);
chunk.value[0] = 98;
t.is(await blob.text(), 'a');
@@ -344,3 +344,9 @@ test('new File() throws with too few args', t => {
message: 'Failed to construct \'File\': 2 arguments required, but only 0 present.'
});
});
+
+test('can slice zero sized blobs', async t => {
+ const blob = new Blob();
+ const txt = await blob.slice(0, 0).text();
+ t.is(txt, '');
+});