Skip to content

Commit 7ed1e12

Browse files
committed
stream: fix timing relative to promises
PR-URL: nodejs#51070
1 parent bb2dd0e commit 7ed1e12

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

lib/internal/streams/destroy.js

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,13 @@ function _destroy(self, err, cb) {
112112
}
113113

114114
if (err) {
115-
process.nextTick(emitErrorCloseNT, self, err);
115+
queueMicrotask(() => {
116+
emitErrorCloseNT(self, err);
117+
});
116118
} else {
117-
process.nextTick(emitCloseNT, self);
119+
queueMicrotask(() => {
120+
emitCloseNT(self);
121+
});
118122
}
119123
}
120124
try {
@@ -233,7 +237,9 @@ function errorOrDestroy(stream, err, sync) {
233237
r.errored = err;
234238
}
235239
if (sync) {
236-
process.nextTick(emitErrorNT, stream, err);
240+
queueMicrotask(() => {
241+
emitErrorNT(stream, err);
242+
});
237243
} else {
238244
emitErrorNT(stream, err);
239245
}
@@ -262,7 +268,9 @@ function construct(stream, cb) {
262268
return;
263269
}
264270

265-
process.nextTick(constructNT, stream);
271+
queueMicrotask(() => {
272+
constructNT(stream);
273+
});
266274
}
267275

268276
function constructNT(stream) {
@@ -291,16 +299,22 @@ function constructNT(stream) {
291299
} else if (err) {
292300
errorOrDestroy(stream, err, true);
293301
} else {
294-
process.nextTick(emitConstructNT, stream);
302+
queueMicrotask(() => {
303+
emitConstructNT(stream);
304+
});
295305
}
296306
}
297307

298308
try {
299309
stream._construct((err) => {
300-
process.nextTick(onConstruct, err);
310+
queueMicrotask(() =>{
311+
onConstruct(err);
312+
});
301313
});
302314
} catch (err) {
303-
process.nextTick(onConstruct, err);
315+
queueMicrotask(() => {
316+
onConstruct(err);
317+
});
304318
}
305319
}
306320

@@ -318,11 +332,14 @@ function emitCloseLegacy(stream) {
318332

319333
function emitErrorCloseLegacy(stream, err) {
320334
stream.emit('error', err);
321-
process.nextTick(emitCloseLegacy, stream);
335+
queueMicrotask(() => {
336+
emitCloseLegacy(stream);
337+
});
322338
}
323339

324340
// Normalize destroy for legacy.
325341
function destroyer(stream, err) {
342+
process._rawDebug("### 0")
326343
if (!stream || isDestroyed(stream)) {
327344
return;
328345
}
@@ -345,9 +362,13 @@ function destroyer(stream, err) {
345362
// TODO: Don't lose err?
346363
stream.close();
347364
} else if (err) {
348-
process.nextTick(emitErrorCloseLegacy, stream, err);
365+
queueMicrotask(() => {
366+
emitErrorCloseLegacy(stream, err);
367+
});
349368
} else {
350-
process.nextTick(emitCloseLegacy, stream);
369+
queueMicrotask(() => {
370+
emitCloseLegacy(stream);
371+
});
351372
}
352373

353374
if (!stream.destroyed) {

test/parallel/test-stream-destroy.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,23 @@ const http = require('http');
118118
req.end('asd');
119119
});
120120
}
121+
122+
{
123+
// Destroy timing relative to Promise
124+
125+
new Promise(resolve => {
126+
const r = new Readable({ read() {} });
127+
destroy(r, new Error('asd'));
128+
resolve(r);
129+
}).then(common.mustCall(r => {
130+
r.on('error', common.mustCall());
131+
}));
132+
133+
new Promise(resolve => {
134+
const r = new Readable({ read() {} });
135+
r.destroy(new Error('asd'));
136+
resolve(r);
137+
}).then(common.mustCall(r => {
138+
r.on('error', common.mustCall());
139+
}));
140+
}

0 commit comments

Comments
 (0)