Skip to content

Commit 30c3b22

Browse files
committed
stream: fix deadlock when pipeing to full sink
When piping a paused Readable to a full Writable we didn't register a drain listener which cause the src to never resume. Refs: #48666
1 parent 8244e6c commit 30c3b22

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

lib/internal/streams/readable.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,8 +756,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
756756
debug('false write response, pause', state.awaitDrainWriters.size);
757757
state.awaitDrainWriters.add(dest);
758758
}
759+
759760
src.pause();
760761
}
762+
}
763+
764+
function registerDrain () {
761765
if (!ondrain) {
762766
// When the dest drains, it reduces the awaitDrain counter
763767
// on the source. This would be more elegant with a .once()
@@ -775,6 +779,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
775779
debug('dest.write', ret);
776780
if (ret === false) {
777781
pause();
782+
registerDrain();
778783
}
779784
}
780785

@@ -825,6 +830,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
825830
if (state.flowing) {
826831
pause();
827832
}
833+
registerDrain();
828834
} else if (!state.flowing) {
829835
debug('pipe resume');
830836
src.resume();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"use strict";
2+
3+
const common = require('../common');
4+
const { Readable, Writable } = require('stream');
5+
6+
// https://github.com/nodejs/node/issues/48666
7+
(async () => {
8+
// Prepare src that is internally ended, with buffered data pending
9+
const src = new Readable({ read() {} });
10+
src.push(Buffer.alloc(100));
11+
src.push(null);
12+
src.pause();
13+
14+
// Give it time to settle
15+
await new Promise((resolve) => setImmediate(resolve));
16+
17+
const dst = new Writable({
18+
highWaterMark: 1000,
19+
write(buf, enc, cb) {
20+
process.nextTick(cb);
21+
}
22+
});
23+
24+
dst.write(Buffer.alloc(1000)); // Fill write buffer
25+
dst.on('finish', common.mustCall());
26+
src.pipe(dst);
27+
})();

0 commit comments

Comments
 (0)