Skip to content

Commit 591616f

Browse files
committed
stream: writableNeedDrain
Don't write to a stream which already has a full buffer. Fixes: #35341
1 parent fb88257 commit 591616f

File tree

6 files changed

+60
-23
lines changed

6 files changed

+60
-23
lines changed

doc/api/stream.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,15 @@ This property contains the number of bytes (or objects) in the queue
573573
ready to be written. The value provides introspection data regarding
574574
the status of the `highWaterMark`.
575575

576+
##### `writable.writableNeedDrain`
577+
<!-- YAML
578+
added: REPLACEME
579+
-->
580+
581+
* {boolean}
582+
583+
Is `true` if buffer is full.
584+
576585
##### `writable.writableObjectMode`
577586
<!-- YAML
578587
added: v12.3.0

lib/_http_outgoing.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,11 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', {
649649
get: function() { return this.finished; }
650650
});
651651

652+
ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', {
653+
get: function() {
654+
return !this.destroyed && !this.finished && this[kNeedDrain];
655+
}
656+
});
652657

653658
const crlf_buf = Buffer.from('\r\n');
654659
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {

lib/internal/streams/duplex.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ ObjectDefineProperties(Duplex.prototype, {
8787
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
8888
writableEnded:
8989
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
90+
writableNeedDrain:
91+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain'),
9092

9193
destroyed: {
9294
get() {

lib/internal/streams/pipeline.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ async function pump(iterable, writable, finish) {
126126
}
127127
let error;
128128
try {
129+
if (writable.writableNeedDrain === true) {
130+
await EE.once(writable, 'drain');
131+
}
132+
129133
for await (const chunk of iterable) {
130134
if (!writable.write(chunk)) {
131135
if (writable.destroyed) return;

lib/internal/streams/readable.js

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -727,35 +727,44 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
727727
ondrain();
728728
}
729729

730+
function pause() {
731+
// If the user unpiped during `dest.write()`, it is possible
732+
// to get stuck in a permanently paused state if that write
733+
// also returned false.
734+
// => Check whether `dest` is still a piping destination.
735+
if (!cleanedUp) {
736+
if (state.pipes.length === 1 && state.pipes[0] === dest) {
737+
debug('false write response, pause', 0);
738+
state.awaitDrainWriters = dest;
739+
state.multiAwaitDrain = false;
740+
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
741+
debug('false write response, pause', state.awaitDrainWriters.size);
742+
state.awaitDrainWriters.add(dest);
743+
}
744+
src.pause();
745+
}
746+
if (!ondrain) {
747+
// When the dest drains, it reduces the awaitDrain counter
748+
// on the source. This would be more elegant with a .once()
749+
// handler in flow(), but adding and removing repeatedly is
750+
// too slow.
751+
ondrain = pipeOnDrain(src, dest);
752+
dest.on('drain', ondrain);
753+
}
754+
}
755+
730756
src.on('data', ondata);
757+
758+
if (dest.writableNeedDrain === true) {
759+
pause();
760+
}
761+
731762
function ondata(chunk) {
732763
debug('ondata');
733764
const ret = dest.write(chunk);
734765
debug('dest.write', ret);
735766
if (ret === false) {
736-
// If the user unpiped during `dest.write()`, it is possible
737-
// to get stuck in a permanently paused state if that write
738-
// also returned false.
739-
// => Check whether `dest` is still a piping destination.
740-
if (!cleanedUp) {
741-
if (state.pipes.length === 1 && state.pipes[0] === dest) {
742-
debug('false write response, pause', 0);
743-
state.awaitDrainWriters = dest;
744-
state.multiAwaitDrain = false;
745-
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
746-
debug('false write response, pause', state.awaitDrainWriters.size);
747-
state.awaitDrainWriters.add(dest);
748-
}
749-
src.pause();
750-
}
751-
if (!ondrain) {
752-
// When the dest drains, it reduces the awaitDrain counter
753-
// on the source. This would be more elegant with a .once()
754-
// handler in flow(), but adding and removing repeatedly is
755-
// too slow.
756-
ondrain = pipeOnDrain(src, dest);
757-
dest.on('drain', ondrain);
758-
}
767+
pause();
759768
}
760769
}
761770

lib/internal/streams/writable.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,14 @@ ObjectDefineProperties(Writable.prototype, {
805805
}
806806
},
807807

808+
writableNeedDrain: {
809+
get() {
810+
const wState = this._writableState;
811+
if (!wState) return false;
812+
return !this._writable.destroyed && !wState.ending && wState.needDrain;
813+
}
814+
},
815+
808816
writableHighWaterMark: {
809817
get() {
810818
return this._writableState && this._writableState.highWaterMark;

0 commit comments

Comments
 (0)