Skip to content

Commit e66c4de

Browse files
ronagTrott
authored andcommitted
stream: invoke buffered write callbacks on error
Buffered write callbacks were only invoked upon error if `autoDestroy` was invoked. PR-URL: #30596 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 0a958f4 commit e66c4de

File tree

2 files changed

+69
-6
lines changed

2 files changed

+69
-6
lines changed

lib/_stream_writable.js

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,11 @@ function onwriteError(stream, state, er, cb) {
458458
--state.pendingcb;
459459

460460
cb(er);
461+
// Ensure callbacks are invoked even when autoDestroy is
462+
// not enabled. Passing `er` here doesn't make sense since
463+
// it's related to one specific write, not to the buffered
464+
// writes.
465+
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
461466
// This can emit error, but error must always follow cb.
462467
errorOrDestroy(stream, er);
463468
}
@@ -531,9 +536,29 @@ function afterWrite(stream, state, count, cb) {
531536
cb();
532537
}
533538

539+
if (state.destroyed) {
540+
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
541+
}
542+
534543
finishMaybe(stream, state);
535544
}
536545

546+
// If there's something in the buffer waiting, then invoke callbacks.
547+
function errorBuffer(state, err) {
548+
if (state.writing || !state.bufferedRequest) {
549+
return;
550+
}
551+
552+
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
553+
const len = state.objectMode ? 1 : entry.chunk.length;
554+
state.length -= len;
555+
entry.callback(err);
556+
}
557+
state.bufferedRequest = null;
558+
state.lastBufferedRequest = null;
559+
state.bufferedRequestCount = 0;
560+
}
561+
537562
// If there's something in the buffer waiting, then process it
538563
function clearBuffer(stream, state) {
539564
state.bufferProcessing = true;
@@ -820,12 +845,7 @@ const destroy = destroyImpl.destroy;
820845
Writable.prototype.destroy = function(err, cb) {
821846
const state = this._writableState;
822847
if (!state.destroyed) {
823-
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
824-
process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write'));
825-
}
826-
state.bufferedRequest = null;
827-
state.lastBufferedRequest = null;
828-
state.bufferedRequestCount = 0;
848+
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
829849
}
830850
destroy.call(this, err, cb);
831851
return this;

test/parallel/test-stream-writable-destroy.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,3 +364,46 @@ const assert = require('assert');
364364
}));
365365
write.destroy(new Error('asd'));
366366
}
367+
368+
{
369+
// Call buffered write callback with error
370+
371+
const write = new Writable({
372+
write(chunk, enc, cb) {
373+
process.nextTick(cb, new Error('asd'));
374+
},
375+
autoDestroy: false
376+
});
377+
write.cork();
378+
write.write('asd', common.mustCall((err) => {
379+
assert.strictEqual(err.message, 'asd');
380+
}));
381+
write.write('asd', common.mustCall((err) => {
382+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
383+
}));
384+
write.on('error', common.mustCall((err) => {
385+
assert.strictEqual(err.message, 'asd');
386+
}));
387+
write.uncork();
388+
}
389+
390+
{
391+
// Ensure callback order.
392+
393+
let state = 0;
394+
const write = new Writable({
395+
write(chunk, enc, cb) {
396+
// `setImmediate()` is used on purpose to ensure the callback is called
397+
// after `process.nextTick()` callbacks.
398+
setImmediate(cb);
399+
}
400+
});
401+
write.write('asd', common.mustCall(() => {
402+
assert.strictEqual(state++, 0);
403+
}));
404+
write.write('asd', common.mustCall((err) => {
405+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
406+
assert.strictEqual(state++, 1);
407+
}));
408+
write.destroy();
409+
}

0 commit comments

Comments
 (0)