Skip to content

Commit 82de660

Browse files
MattiasBuelensRafaelGSS
authored andcommitted
stream: fix code style
PR-URL: #51168 Reviewed-By: Matteo Collina <[email protected]>
1 parent a41dca0 commit 82de660

File tree

5 files changed

+251
-282
lines changed

5 files changed

+251
-282
lines changed

lib/internal/webstreams/readablestream.js

Lines changed: 88 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ const {
1111
FunctionPrototypeCall,
1212
MathMin,
1313
NumberIsInteger,
14-
ObjectCreate,
1514
ObjectDefineProperties,
1615
ObjectSetPrototypeOf,
1716
Promise,
@@ -95,9 +94,9 @@ const {
9594
AsyncIterator,
9695
cloneAsUint8Array,
9796
copyArrayBuffer,
97+
createPromiseCallback,
9898
customInspect,
9999
dequeueValue,
100-
ensureIsPromise,
101100
enqueueValueWithSize,
102101
extractHighWaterMark,
103102
extractSizeAlgorithm,
@@ -251,19 +250,7 @@ class ReadableStream {
251250
markTransferMode(this, false, true);
252251
if (source === null)
253252
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
254-
this[kState] = {
255-
disturbed: false,
256-
reader: undefined,
257-
state: 'readable',
258-
storedError: undefined,
259-
stream: undefined,
260-
transfer: {
261-
writable: undefined,
262-
port1: undefined,
263-
port2: undefined,
264-
promise: undefined,
265-
},
266-
};
253+
this[kState] = createReadableStreamState();
267254

268255
this[kIsClosedPromise] = createDeferredPromise();
269256
this[kControllerErrorFunction] = () => {};
@@ -649,19 +636,7 @@ ObjectDefineProperties(ReadableStream, {
649636
function InternalTransferredReadableStream() {
650637
markTransferMode(this, false, true);
651638
this[kType] = 'ReadableStream';
652-
this[kState] = {
653-
disturbed: false,
654-
reader: undefined,
655-
state: 'readable',
656-
storedError: undefined,
657-
stream: undefined,
658-
transfer: {
659-
writable: undefined,
660-
port1: undefined,
661-
port2: undefined,
662-
promise: undefined,
663-
},
664-
};
639+
this[kState] = createReadableStreamState();
665640

666641
this[kIsClosedPromise] = createDeferredPromise();
667642
}
@@ -1230,41 +1205,58 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
12301205
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
12311206
});
12321207

1233-
function TeeReadableStream(start, pull, cancel) {
1208+
function InternalReadableStream(start, pull, cancel, highWaterMark, size) {
12341209
markTransferMode(this, false, true);
12351210
this[kType] = 'ReadableStream';
1236-
this[kState] = {
1237-
disturbed: false,
1238-
state: 'readable',
1239-
storedError: undefined,
1240-
stream: undefined,
1241-
transfer: {
1242-
writable: undefined,
1243-
port: undefined,
1244-
promise: undefined,
1245-
},
1246-
};
1211+
this[kState] = createReadableStreamState();
1212+
this[kIsClosedPromise] = createDeferredPromise();
1213+
const controller = new ReadableStreamDefaultController(kSkipThrow);
1214+
setupReadableStreamDefaultController(
1215+
this,
1216+
controller,
1217+
start,
1218+
pull,
1219+
cancel,
1220+
highWaterMark,
1221+
size);
1222+
}
1223+
1224+
ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype);
1225+
ObjectSetPrototypeOf(InternalReadableStream, ReadableStream);
1226+
1227+
function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) {
1228+
const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size);
1229+
1230+
// For spec compliance the InternalReadableStream must be a ReadableStream
1231+
stream.constructor = ReadableStream;
1232+
return stream;
1233+
}
1234+
1235+
function InternalReadableByteStream(start, pull, cancel) {
1236+
markTransferMode(this, false, true);
1237+
this[kType] = 'ReadableStream';
1238+
this[kState] = createReadableStreamState();
12471239
this[kIsClosedPromise] = createDeferredPromise();
1248-
setupReadableStreamDefaultControllerFromSource(
1240+
const controller = new ReadableByteStreamController(kSkipThrow);
1241+
setupReadableByteStreamController(
12491242
this,
1250-
ObjectCreate(null, {
1251-
start: { __proto__: null, value: start },
1252-
pull: { __proto__: null, value: pull },
1253-
cancel: { __proto__: null, value: cancel },
1254-
}),
1255-
1,
1256-
() => 1);
1243+
controller,
1244+
start,
1245+
pull,
1246+
cancel,
1247+
0,
1248+
undefined);
12571249
}
12581250

1259-
ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype);
1260-
ObjectSetPrototypeOf(TeeReadableStream, ReadableStream);
1251+
ObjectSetPrototypeOf(InternalReadableByteStream.prototype, ReadableStream.prototype);
1252+
ObjectSetPrototypeOf(InternalReadableByteStream, ReadableStream);
12611253

1262-
function createTeeReadableStream(start, pull, cancel) {
1263-
const tee = new TeeReadableStream(start, pull, cancel);
1254+
function createReadableByteStream(start, pull, cancel) {
1255+
const stream = new InternalReadableByteStream(start, pull, cancel);
12641256

1265-
// For spec compliance the Tee must be a ReadableStream
1266-
tee.constructor = ReadableStream;
1267-
return tee;
1257+
// For spec compliance the InternalReadableByteStream must be a ReadableStream
1258+
stream.constructor = ReadableStream;
1259+
return stream;
12681260
}
12691261

12701262
const isReadableStream =
@@ -1280,6 +1272,23 @@ const isReadableStreamBYOBReader =
12801272

12811273
// ---- ReadableStream Implementation
12821274

1275+
function createReadableStreamState() {
1276+
return {
1277+
__proto__: null,
1278+
disturbed: false,
1279+
reader: undefined,
1280+
state: 'readable',
1281+
storedError: undefined,
1282+
transfer: {
1283+
__proto__: null,
1284+
writable: undefined,
1285+
port1: undefined,
1286+
port2: undefined,
1287+
promise: undefined,
1288+
},
1289+
};
1290+
}
1291+
12831292
function readableStreamFromIterable(iterable) {
12841293
let stream;
12851294
const iteratorRecord = getIterator(iterable, 'async');
@@ -1319,16 +1328,12 @@ function readableStreamFromIterable(iterable) {
13191328
});
13201329
}
13211330

1322-
stream = new ReadableStream({
1323-
start: startAlgorithm,
1324-
pull: pullAlgorithm,
1325-
cancel: cancelAlgorithm,
1326-
}, {
1327-
size() {
1328-
return 1;
1329-
},
1330-
highWaterMark: 0,
1331-
});
1331+
stream = createReadableStream(
1332+
startAlgorithm,
1333+
pullAlgorithm,
1334+
cancelAlgorithm,
1335+
0,
1336+
);
13321337

13331338
return stream;
13341339
}
@@ -1654,9 +1659,9 @@ function readableStreamDefaultTee(stream, cloneForBranch2) {
16541659
}
16551660

16561661
branch1 =
1657-
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
1662+
createReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
16581663
branch2 =
1659-
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);
1664+
createReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);
16601665

16611666
PromisePrototypeThen(
16621667
reader[kState].close.promise,
@@ -1933,16 +1938,10 @@ function readableByteStreamTee(stream) {
19331938
return cancelDeferred.promise;
19341939
}
19351940

1936-
branch1 = new ReadableStream({
1937-
type: 'bytes',
1938-
pull: pull1Algorithm,
1939-
cancel: cancel1Algorithm,
1940-
});
1941-
branch2 = new ReadableStream({
1942-
type: 'bytes',
1943-
pull: pull2Algorithm,
1944-
cancel: cancel2Algorithm,
1945-
});
1941+
branch1 =
1942+
createReadableByteStream(nonOpStart, pull1Algorithm, cancel1Algorithm);
1943+
branch2 =
1944+
createReadableByteStream(nonOpStart, pull2Algorithm, cancel2Algorithm);
19461945

19471946
forwardReaderError(reader);
19481947

@@ -1993,10 +1992,7 @@ function readableStreamCancel(stream, reason) {
19931992
}
19941993

19951994
return PromisePrototypeThen(
1996-
ensureIsPromise(
1997-
stream[kState].controller[kCancel],
1998-
stream[kState].controller,
1999-
reason),
1995+
stream[kState].controller[kCancel](reason),
20001996
() => {});
20011997
}
20021998

@@ -2361,7 +2357,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) {
23612357
assert(!controller[kState].pullAgain);
23622358
controller[kState].pulling = true;
23632359
PromisePrototypeThen(
2364-
ensureIsPromise(controller[kState].pullAlgorithm, controller),
2360+
controller[kState].pullAlgorithm(controller),
23652361
() => {
23662362
controller[kState].pulling = false;
23672363
if (controller[kState].pullAgain) {
@@ -2391,12 +2387,9 @@ function readableStreamDefaultControllerError(controller, error) {
23912387

23922388
function readableStreamDefaultControllerCancelSteps(controller, reason) {
23932389
resetQueue(controller);
2394-
try {
2395-
const result = controller[kState].cancelAlgorithm(reason);
2396-
return result;
2397-
} finally {
2398-
readableStreamDefaultControllerClearAlgorithms(controller);
2399-
}
2390+
const result = controller[kState].cancelAlgorithm(reason);
2391+
readableStreamDefaultControllerClearAlgorithms(controller);
2392+
return result;
24002393
}
24012394

24022395
function readableStreamDefaultControllerPullSteps(controller, readRequest) {
@@ -2470,11 +2463,10 @@ function setupReadableStreamDefaultControllerFromSource(
24702463
FunctionPrototypeBind(start, source, controller) :
24712464
nonOpStart;
24722465
const pullAlgorithm = pull ?
2473-
FunctionPrototypeBind(pull, source, controller) :
2466+
createPromiseCallback('source.pull', pull, source) :
24742467
nonOpPull;
2475-
24762468
const cancelAlgorithm = cancel ?
2477-
FunctionPrototypeBind(cancel, source) :
2469+
createPromiseCallback('source.cancel', cancel, source) :
24782470
nonOpCancel;
24792471

24802472
setupReadableStreamDefaultController(
@@ -3102,7 +3094,7 @@ function readableByteStreamControllerCallPullIfNeeded(controller) {
31023094
assert(!controller[kState].pullAgain);
31033095
controller[kState].pulling = true;
31043096
PromisePrototypeThen(
3105-
ensureIsPromise(controller[kState].pullAlgorithm, controller),
3097+
controller[kState].pullAlgorithm(controller),
31063098
() => {
31073099
controller[kState].pulling = false;
31083100
if (controller[kState].pullAgain) {
@@ -3269,10 +3261,10 @@ function setupReadableByteStreamControllerFromSource(
32693261
FunctionPrototypeBind(start, source, controller) :
32703262
nonOpStart;
32713263
const pullAlgorithm = pull ?
3272-
FunctionPrototypeBind(pull, source, controller) :
3264+
createPromiseCallback('source.pull', pull, source, controller) :
32733265
nonOpPull;
32743266
const cancelAlgorithm = cancel ?
3275-
FunctionPrototypeBind(cancel, source) :
3267+
createPromiseCallback('source.cancel', cancel, source) :
32763268
nonOpCancel;
32773269

32783270
if (autoAllocateChunkSize === 0) {
@@ -3369,4 +3361,6 @@ module.exports = {
33693361
readableByteStreamControllerPullSteps,
33703362
setupReadableByteStreamController,
33713363
setupReadableByteStreamControllerFromSource,
3364+
createReadableStream,
3365+
createReadableByteStream,
33723366
};

0 commit comments

Comments
 (0)