Skip to content

Commit b9248c9

Browse files
committed
stream: readable use bitmap accessors
PR-URL: #50350
1 parent 8f742bb commit b9248c9

File tree

1 file changed

+31
-27
lines changed

1 file changed

+31
-27
lines changed

lib/internal/streams/readable.js

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
435435
function readableAddChunkUnshiftValue(stream, state, chunk) {
436436
if ((state[kState] & kEndEmitted) !== 0)
437437
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
438-
else if (state.destroyed || state.errored)
438+
else if ((state[kState] & (kDestroyed | kErrored)) !== 0)
439439
return false;
440440
else
441441
addChunk(stream, state, chunk, true);
@@ -604,7 +604,7 @@ function computeNewHighWaterMark(n) {
604604
// This function is designed to be inlinable, so please take care when making
605605
// changes to the function body.
606606
function howMuchToRead(n, state) {
607-
if (n <= 0 || (state.length === 0 && state.ended))
607+
if (n <= 0 || (state.length === 0 && (state[kState] & kEnded) !== 0))
608608
return 0;
609609
if ((state[kState] & kObjectMode) !== 0)
610610
return 1;
@@ -648,7 +648,7 @@ Readable.prototype.read = function(n) {
648648
state.length >= state.highWaterMark :
649649
state.length > 0) ||
650650
(state[kState] & kEnded) !== 0)) {
651-
debug('read: emitReadable', state.length, (state[kState] & kEnded) !== 0);
651+
debug('read: emitReadable');
652652
if (state.length === 0 && (state[kState] & kEnded) !== 0)
653653
endReadable(this);
654654
else
@@ -806,7 +806,7 @@ function emitReadable(stream) {
806806
function emitReadable_(stream) {
807807
const state = stream._readableState;
808808
debug('emitReadable_');
809-
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || state.ended)) {
809+
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || (state[kState] & kEnded) !== 0)) {
810810
stream.emit('readable');
811811
state[kState] &= ~kEmittedReadable;
812812
}
@@ -887,7 +887,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
887887
const state = this._readableState;
888888

889889
if (state.pipes.length === 1) {
890-
if (!state.multiAwaitDrain) {
890+
if ((state[kState] & kMultiAwaitDrain) === 0) {
891891
state[kState] |= kMultiAwaitDrain;
892892
state.awaitDrainWriters = new SafeSet(
893893
state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
@@ -903,7 +903,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
903903
dest !== process.stderr;
904904

905905
const endFn = doEnd ? onend : unpipe;
906-
if (state.endEmitted)
906+
if ((state[kState] & kEndEmitted) !== 0)
907907
process.nextTick(endFn);
908908
else
909909
src.once('end', endFn);
@@ -962,7 +962,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
962962
if (state.pipes.length === 1 && state.pipes[0] === dest) {
963963
debug('false write response, pause', 0);
964964
state.awaitDrainWriters = dest;
965-
state.multiAwaitDrain = false;
965+
state[kState] &= ~kMultiAwaitDrain;
966966
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
967967
debug('false write response, pause', state.awaitDrainWriters.size);
968968
state.awaitDrainWriters.add(dest);
@@ -1034,7 +1034,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
10341034

10351035
if (dest.writableNeedDrain === true) {
10361036
pause();
1037-
} else if (!state.flowing) {
1037+
} else if ((state[kState] & kFlowing) === 0) {
10381038
debug('pipe resume');
10391039
src.resume();
10401040
}
@@ -1052,7 +1052,7 @@ function pipeOnDrain(src, dest) {
10521052
if (state.awaitDrainWriters === dest) {
10531053
debug('pipeOnDrain', 1);
10541054
state.awaitDrainWriters = null;
1055-
} else if (state.multiAwaitDrain) {
1055+
} else if ((state[kState] & kMultiAwaitDrain) !== 0) {
10561056
debug('pipeOnDrain', state.awaitDrainWriters.size);
10571057
state.awaitDrainWriters.delete(dest);
10581058
}
@@ -1107,20 +1107,20 @@ Readable.prototype.on = function(ev, fn) {
11071107
if (ev === 'data') {
11081108
// Update readableListening so that resume() may be a no-op
11091109
// a few lines down. This is needed to support once('readable').
1110-
state.readableListening = this.listenerCount('readable') > 0;
1110+
state[kState] |= this.listenerCount('readable') > 0 ? kReadableListening : 0;
11111111

11121112
// Try start flowing on next tick if stream isn't explicitly paused.
1113-
if (state.flowing !== false)
1113+
if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) {
11141114
this.resume();
1115+
}
11151116
} else if (ev === 'readable') {
1116-
if (!state.endEmitted && !state.readableListening) {
1117-
state.readableListening = state.needReadable = true;
1118-
state.flowing = false;
1119-
state.emittedReadable = false;
1120-
debug('on readable', state.length, state.reading);
1117+
if ((state[kState] & (kEndEmitted | kReadableListening)) === 0) {
1118+
state[kState] |= kReadableListening | kNeedReadable | kHasFlowing;
1119+
state[kState] &= ~(kFlowing | kEmittedReadable);
1120+
debug('on readable');
11211121
if (state.length) {
11221122
emitReadable(this);
1123-
} else if (!state.reading) {
1123+
} else if ((state[kState] & kReading) === 0) {
11241124
process.nextTick(nReadingNextTick, this);
11251125
}
11261126
}
@@ -1167,7 +1167,12 @@ Readable.prototype.removeAllListeners = function(ev) {
11671167

11681168
function updateReadableListening(self) {
11691169
const state = self._readableState;
1170-
state.readableListening = self.listenerCount('readable') > 0;
1170+
1171+
if (self.listenerCount('readable') > 0) {
1172+
state[kState] |= kReadableListening;
1173+
} else {
1174+
state[kState] &= ~kReadableListening;
1175+
}
11711176

11721177
if ((state[kState] & (kHasPaused | kPaused | kResumeScheduled)) === (kHasPaused | kResumeScheduled)) {
11731178
// Flowing needs to be set to true now, otherwise
@@ -1197,7 +1202,7 @@ Readable.prototype.resume = function() {
11971202
// for readable, but we still have to call
11981203
// resume().
11991204
state[kState] |= kHasFlowing;
1200-
if (!state.readableListening) {
1205+
if ((state[kState] & kReadableListening) === 0) {
12011206
state[kState] |= kFlowing;
12021207
} else {
12031208
state[kState] &= ~kFlowing;
@@ -1210,8 +1215,8 @@ Readable.prototype.resume = function() {
12101215
};
12111216

12121217
function resume(stream, state) {
1213-
if (!state.resumeScheduled) {
1214-
state.resumeScheduled = true;
1218+
if ((state[kState] & kResumeScheduled) === 0) {
1219+
state[kState] |= kResumeScheduled;
12151220
process.nextTick(resume_, stream, state);
12161221
}
12171222
}
@@ -1232,7 +1237,7 @@ function resume_(stream, state) {
12321237
Readable.prototype.pause = function() {
12331238
const state = this._readableState;
12341239
debug('call pause');
1235-
if (state.flowing !== false) {
1240+
if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) {
12361241
debug('pause');
12371242
state[kState] |= kHasFlowing;
12381243
state[kState] &= ~kFlowing;
@@ -1572,20 +1577,19 @@ function fromList(n, state) {
15721577
function endReadable(stream) {
15731578
const state = stream._readableState;
15741579

1575-
debug('endReadable', (state[kState] & kEndEmitted) !== 0);
1580+
debug('endReadable');
15761581
if ((state[kState] & kEndEmitted) === 0) {
15771582
state[kState] |= kEnded;
15781583
process.nextTick(endReadableNT, state, stream);
15791584
}
15801585
}
15811586

15821587
function endReadableNT(state, stream) {
1583-
debug('endReadableNT', state.endEmitted, state.length);
1588+
debug('endReadableNT');
15841589

15851590
// Check that we didn't get one last unshift.
1586-
if (!state.errored && !state.closeEmitted &&
1587-
!state.endEmitted && state.length === 0) {
1588-
state.endEmitted = true;
1591+
if ((state[kState] & (kErrored | kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) {
1592+
state[kState] |= kEndEmitted;
15891593
stream.emit('end');
15901594

15911595
if (stream.writable && stream.allowHalfOpen === false) {

0 commit comments

Comments
 (0)