Skip to content

Commit 73df09e

Browse files
committed
stream: support dispose in writable
1 parent 71d7707 commit 73df09e

File tree

4 files changed

+75
-23
lines changed

4 files changed

+75
-23
lines changed

lib/internal/streams/writable.js

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const {
3434
ObjectSetPrototypeOf,
3535
StringPrototypeToLowerCase,
3636
Symbol,
37+
SymbolAsyncDispose,
3738
SymbolHasInstance,
3839
} = primordials;
3940

@@ -44,6 +45,7 @@ const EE = require('events');
4445
const Stream = require('internal/streams/legacy').Stream;
4546
const { Buffer } = require('buffer');
4647
const destroyImpl = require('internal/streams/destroy');
48+
const eos = require('internal/streams/end-of-stream');
4749

4850
const {
4951
addAbortSignal,
@@ -54,16 +56,19 @@ const {
5456
getDefaultHighWaterMark,
5557
} = require('internal/streams/state');
5658
const {
57-
ERR_INVALID_ARG_TYPE,
58-
ERR_METHOD_NOT_IMPLEMENTED,
59-
ERR_MULTIPLE_CALLBACK,
60-
ERR_STREAM_CANNOT_PIPE,
61-
ERR_STREAM_DESTROYED,
62-
ERR_STREAM_ALREADY_FINISHED,
63-
ERR_STREAM_NULL_VALUES,
64-
ERR_STREAM_WRITE_AFTER_END,
65-
ERR_UNKNOWN_ENCODING,
66-
} = require('internal/errors').codes;
59+
AbortError,
60+
codes: {
61+
ERR_INVALID_ARG_TYPE,
62+
ERR_METHOD_NOT_IMPLEMENTED,
63+
ERR_MULTIPLE_CALLBACK,
64+
ERR_STREAM_CANNOT_PIPE,
65+
ERR_STREAM_DESTROYED,
66+
ERR_STREAM_ALREADY_FINISHED,
67+
ERR_STREAM_NULL_VALUES,
68+
ERR_STREAM_WRITE_AFTER_END,
69+
ERR_UNKNOWN_ENCODING,
70+
},
71+
} = require('internal/errors');
6772

6873
const { errorOrDestroy } = destroyImpl;
6974

@@ -477,7 +482,7 @@ function onwrite(stream, er) {
477482
// rather just increase a counter, to improve performance and avoid
478483
// memory allocations.
479484
if (state.afterWriteTickInfo !== null &&
480-
state.afterWriteTickInfo.cb === cb) {
485+
state.afterWriteTickInfo.cb === cb) {
481486
state.afterWriteTickInfo.count++;
482487
} else {
483488
state.afterWriteTickInfo = { count: 1, cb, stream, state };
@@ -538,9 +543,9 @@ function errorBuffer(state) {
538543
// If there's something in the buffer waiting, then process it.
539544
function clearBuffer(stream, state) {
540545
if (state.corked ||
541-
state.bufferProcessing ||
542-
state.destroyed ||
543-
!state.constructed) {
546+
state.bufferProcessing ||
547+
state.destroyed ||
548+
!state.constructed) {
544549
return;
545550
}
546551

@@ -661,15 +666,15 @@ Writable.prototype.end = function(chunk, encoding, cb) {
661666

662667
function needFinish(state) {
663668
return (state.ending &&
664-
!state.destroyed &&
665-
state.constructed &&
666-
state.length === 0 &&
667-
!state.errored &&
668-
state.buffered.length === 0 &&
669-
!state.finished &&
670-
!state.writing &&
671-
!state.errorEmitted &&
672-
!state.closeEmitted);
669+
!state.destroyed &&
670+
state.constructed &&
671+
state.length === 0 &&
672+
!state.errored &&
673+
state.buffered.length === 0 &&
674+
!state.finished &&
675+
!state.writing &&
676+
!state.errorEmitted &&
677+
!state.closeEmitted);
673678
}
674679

675680
function callFinal(stream, state) {
@@ -934,3 +939,12 @@ Writable.fromWeb = function(writableStream, options) {
934939
Writable.toWeb = function(streamWritable) {
935940
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
936941
};
942+
943+
Writable.prototype[SymbolAsyncDispose] = function() {
944+
let error;
945+
if (!this.destroyed) {
946+
error = this.readableEnded ? null : new AbortError();
947+
this.destroy(error);
948+
}
949+
return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))));
950+
};

test/parallel/test-stream-duplex-destroy.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,3 +255,17 @@ const assert = require('assert');
255255
duplex.on('close', common.mustCall());
256256
controller.abort();
257257
}
258+
{
259+
// Check Symbol.asyncDispose
260+
const duplex = new Duplex({
261+
write(chunk, enc, cb) { cb(); },
262+
read() {},
263+
});
264+
let count = 0;
265+
duplex.on('error', common.mustCall((e) => {
266+
assert.strictEqual(count++, 0); // Ensure not called twice
267+
assert.strictEqual(e.name, 'AbortError');
268+
}));
269+
duplex.on('close', common.mustCall());
270+
duplex[Symbol.asyncDispose]().then(common.mustCall());
271+
}

test/parallel/test-stream-transform-destroy.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,14 @@ const assert = require('assert');
141141

142142
transform.destroy();
143143
}
144+
145+
{
146+
const transform = new Transform({
147+
transform(chunk, enc, cb) {}
148+
});
149+
transform.on('error', common.mustCall((err) => {
150+
assert.strictEqual(err.name, "AbortError");
151+
}));
152+
transform.on('clocse', common.mustCall());
153+
transform[Symbol.asyncDispose]().then(common.mustCall());
154+
}

test/parallel/test-stream-writable-destroy.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,3 +487,16 @@ const assert = require('assert');
487487
}));
488488
s.destroy(_err);
489489
}
490+
491+
{
492+
493+
const write = new Writable({
494+
write(chunk, enc, cb) { cb(); }
495+
});
496+
497+
write.on('error', common.mustCall((e) => {
498+
assert.strictEqual(e.name, 'AbortError');
499+
assert.strictEqual(write.destroyed, true);
500+
}));
501+
write[Symbol.asyncDispose]().then(common.mustCall());
502+
}

0 commit comments

Comments
 (0)