Skip to content

Commit 0ccd463

Browse files
benjamingrruyadorno
authored andcommitted
stream: use bitmap in readable state
PR-URL: #49745 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Moshe Atlow <[email protected]>
1 parent 864fe56 commit 0ccd463

File tree

1 file changed

+91
-59
lines changed

1 file changed

+91
-59
lines changed

lib/internal/streams/readable.js

Lines changed: 91 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,75 @@ const nop = () => {};
8383

8484
const { errorOrDestroy } = destroyImpl;
8585

86+
const kObjectMode = 1 << 0;
87+
const kEnded = 1 << 1;
88+
const kEndEmitted = 1 << 2;
89+
const kReading = 1 << 3;
90+
const kConstructed = 1 << 4;
91+
const kSync = 1 << 5;
92+
const kNeedReadable = 1 << 6;
93+
const kEmittedReadable = 1 << 7;
94+
const kReadableListening = 1 << 8;
95+
const kResumeScheduled = 1 << 9;
96+
const kErrorEmitted = 1 << 10;
97+
const kEmitClose = 1 << 11;
98+
const kAutoDestroy = 1 << 12;
99+
const kDestroyed = 1 << 13;
100+
const kClosed = 1 << 14;
101+
const kCloseEmitted = 1 << 15;
102+
const kMultiAwaitDrain = 1 << 16;
103+
const kReadingMore = 1 << 17;
104+
const kDataEmitted = 1 << 18;
105+
106+
// TODO(benjamingr) it is likely slower to do it this way than with free functions
107+
function makeBitMapDescriptor(bit) {
108+
return {
109+
enumerable: false,
110+
get() { return (this.state & bit) !== 0; },
111+
set(value) {
112+
if (value) this.state |= bit;
113+
else this.state &= ~bit;
114+
},
115+
};
116+
}
117+
ObjectDefineProperties(ReadableState.prototype, {
118+
objectMode: makeBitMapDescriptor(kObjectMode),
119+
ended: makeBitMapDescriptor(kEnded),
120+
endEmitted: makeBitMapDescriptor(kEndEmitted),
121+
reading: makeBitMapDescriptor(kReading),
122+
// Stream is still being constructed and cannot be
123+
// destroyed until construction finished or failed.
124+
// Async construction is opt in, therefore we start as
125+
// constructed.
126+
constructed: makeBitMapDescriptor(kConstructed),
127+
// A flag to be able to tell if the event 'readable'/'data' is emitted
128+
// immediately, or on a later tick. We set this to true at first, because
129+
// any actions that shouldn't happen until "later" should generally also
130+
// not happen before the first read call.
131+
sync: makeBitMapDescriptor(kSync),
132+
// Whenever we return null, then we set a flag to say
133+
// that we're awaiting a 'readable' event emission.
134+
needReadable: makeBitMapDescriptor(kNeedReadable),
135+
emittedReadable: makeBitMapDescriptor(kEmittedReadable),
136+
readableListening: makeBitMapDescriptor(kReadableListening),
137+
resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
138+
// True if the error was already emitted and should not be thrown again.
139+
errorEmitted: makeBitMapDescriptor(kErrorEmitted),
140+
emitClose: makeBitMapDescriptor(kEmitClose),
141+
autoDestroy: makeBitMapDescriptor(kAutoDestroy),
142+
// Has it been destroyed.
143+
destroyed: makeBitMapDescriptor(kDestroyed),
144+
// Indicates whether the stream has finished destroying.
145+
closed: makeBitMapDescriptor(kClosed),
146+
// True if close has been emitted or would have been emitted
147+
// depending on emitClose.
148+
closeEmitted: makeBitMapDescriptor(kCloseEmitted),
149+
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
150+
// If true, a maybeReadMore has been scheduled.
151+
readingMore: makeBitMapDescriptor(kReadingMore),
152+
dataEmitted: makeBitMapDescriptor(kDataEmitted),
153+
});
154+
86155
function ReadableState(options, stream, isDuplex) {
87156
// Duplex streams are both readable and writable, but share
88157
// the same options object.
@@ -92,13 +161,15 @@ function ReadableState(options, stream, isDuplex) {
92161
if (typeof isDuplex !== 'boolean')
93162
isDuplex = stream instanceof Stream.Duplex;
94163

164+
// Bit map field to store ReadableState more effciently with 1 bit per field
165+
// instead of a V8 slot per field.
166+
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
95167
// Object stream flag. Used to make read(n) ignore n and to
96168
// make all the buffer merging and length checks go away.
97-
this.objectMode = !!(options && options.objectMode);
169+
if (options && options.objectMode) this.state |= kObjectMode;
98170

99-
if (isDuplex)
100-
this.objectMode = this.objectMode ||
101-
!!(options && options.readableObjectMode);
171+
if (isDuplex && options && options.readableObjectMode)
172+
this.state |= kObjectMode;
102173

103174
// The point at which it stops calling _read() to fill the buffer
104175
// Note: 0 is a valid value, means "don't call _read preemptively ever"
@@ -113,54 +184,22 @@ function ReadableState(options, stream, isDuplex) {
113184
this.length = 0;
114185
this.pipes = [];
115186
this.flowing = null;
116-
this.ended = false;
117-
this.endEmitted = false;
118-
this.reading = false;
119-
120-
// Stream is still being constructed and cannot be
121-
// destroyed until construction finished or failed.
122-
// Async construction is opt in, therefore we start as
123-
// constructed.
124-
this.constructed = true;
125187

126-
// A flag to be able to tell if the event 'readable'/'data' is emitted
127-
// immediately, or on a later tick. We set this to true at first, because
128-
// any actions that shouldn't happen until "later" should generally also
129-
// not happen before the first read call.
130-
this.sync = true;
131-
132-
// Whenever we return null, then we set a flag to say
133-
// that we're awaiting a 'readable' event emission.
134-
this.needReadable = false;
135-
this.emittedReadable = false;
136-
this.readableListening = false;
137-
this.resumeScheduled = false;
138188
this[kPaused] = null;
139189

140-
// True if the error was already emitted and should not be thrown again.
141-
this.errorEmitted = false;
142-
143190
// Should close be emitted on destroy. Defaults to true.
144-
this.emitClose = !options || options.emitClose !== false;
191+
if (options && options.emitClose === false) this.state &= ~kEmitClose;
145192

146193
// Should .destroy() be called after 'end' (and potentially 'finish').
147-
this.autoDestroy = !options || options.autoDestroy !== false;
194+
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;
148195

149-
// Has it been destroyed.
150-
this.destroyed = false;
151196

152197
// Indicates whether the stream has errored. When true no further
153198
// _read calls, 'data' or 'readable' events should occur. This is needed
154199
// since when autoDestroy is disabled we need a way to tell whether the
155200
// stream has failed.
156201
this.errored = null;
157202

158-
// Indicates whether the stream has finished destroying.
159-
this.closed = false;
160-
161-
// True if close has been emitted or would have been emitted
162-
// depending on emitClose.
163-
this.closeEmitted = false;
164203

165204
// Crypto is kind of old and crusty. Historically, its default string
166205
// encoding is 'binary' so we have to make this configurable.
@@ -177,12 +216,6 @@ function ReadableState(options, stream, isDuplex) {
177216
// Ref the piped dest which we need a drain event on it
178217
// type: null | Writable | Set<Writable>.
179218
this.awaitDrainWriters = null;
180-
this.multiAwaitDrain = false;
181-
182-
// If true, a maybeReadMore has been scheduled.
183-
this.readingMore = false;
184-
185-
this.dataEmitted = false;
186219

187220
this.decoder = null;
188221
this.encoding = null;
@@ -263,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
263296
const state = stream._readableState;
264297

265298
let err;
266-
if (!state.objectMode) {
299+
if ((state.state & kObjectMode) === 0) {
267300
if (typeof chunk === 'string') {
268301
encoding = encoding || state.defaultEncoding;
269302
if (state.encoding !== encoding) {
@@ -290,11 +323,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
290323
if (err) {
291324
errorOrDestroy(stream, err);
292325
} else if (chunk === null) {
293-
state.reading = false;
326+
state.state &= ~kReading;
294327
onEofChunk(stream, state);
295-
} else if (state.objectMode || (chunk && chunk.length > 0)) {
328+
} else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
296329
if (addToFront) {
297-
if (state.endEmitted)
330+
if ((state.state & kEndEmitted) !== 0)
298331
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
299332
else if (state.destroyed || state.errored)
300333
return false;
@@ -305,7 +338,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
305338
} else if (state.destroyed || state.errored) {
306339
return false;
307340
} else {
308-
state.reading = false;
341+
state.state &= ~kReading;
309342
if (state.decoder && !encoding) {
310343
chunk = state.decoder.write(chunk);
311344
if (state.objectMode || chunk.length !== 0)
@@ -317,7 +350,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
317350
}
318351
}
319352
} else if (!addToFront) {
320-
state.reading = false;
353+
state.state &= ~kReading;
321354
maybeReadMore(stream, state);
322355
}
323356

@@ -333,7 +366,7 @@ function addChunk(stream, state, chunk, addToFront) {
333366
stream.listenerCount('data') > 0) {
334367
// Use the guard to avoid creating `Set()` repeatedly
335368
// when we have multiple pipes.
336-
if (state.multiAwaitDrain) {
369+
if ((state.state & kMultiAwaitDrain) !== 0) {
337370
state.awaitDrainWriters.clear();
338371
} else {
339372
state.awaitDrainWriters = null;
@@ -349,7 +382,7 @@ function addChunk(stream, state, chunk, addToFront) {
349382
else
350383
state.buffer.push(chunk);
351384

352-
if (state.needReadable)
385+
if ((state.state & kNeedReadable) !== 0)
353386
emitReadable(stream);
354387
}
355388
maybeReadMore(stream, state);
@@ -404,7 +437,7 @@ function computeNewHighWaterMark(n) {
404437
function howMuchToRead(n, state) {
405438
if (n <= 0 || (state.length === 0 && state.ended))
406439
return 0;
407-
if (state.objectMode)
440+
if ((state.state & kObjectMode) !== 0)
408441
return 1;
409442
if (NumberIsNaN(n)) {
410443
// Only flow one buffer at a time.
@@ -435,7 +468,7 @@ Readable.prototype.read = function(n) {
435468
state.highWaterMark = computeNewHighWaterMark(n);
436469

437470
if (n !== 0)
438-
state.emittedReadable = false;
471+
state.state &= ~kEmittedReadable;
439472

440473
// If we're doing read(0) to trigger a readable event, but we
441474
// already have a bunch of data in the buffer, then just trigger
@@ -486,7 +519,7 @@ Readable.prototype.read = function(n) {
486519
// 3. Actually pull the requested chunks out of the buffer and return.
487520

488521
// if we need a readable event, then we need to do some reading.
489-
let doRead = state.needReadable;
522+
let doRead = (state.state & kNeedReadable) !== 0;
490523
debug('need readable', doRead);
491524

492525
// If we currently have less than the highWaterMark, then also read some.
@@ -504,20 +537,19 @@ Readable.prototype.read = function(n) {
504537
debug('reading, ended or constructing', doRead);
505538
} else if (doRead) {
506539
debug('do read');
507-
state.reading = true;
508-
state.sync = true;
540+
state.state |= kReading | kSync;
509541
// If the length is currently zero, then we *need* a readable event.
510542
if (state.length === 0)
511-
state.needReadable = true;
543+
state.state |= kNeedReadable;
512544

513545
// Call internal read method
514546
try {
515547
this._read(state.highWaterMark);
516548
} catch (err) {
517549
errorOrDestroy(this, err);
518550
}
551+
state.state &= ~kSync;
519552

520-
state.sync = false;
521553
// If _read pushed data synchronously, then `reading` will be false,
522554
// and we need to re-evaluate how much data we can return to the user.
523555
if (!state.reading)

0 commit comments

Comments
 (0)