Skip to content

Commit d99602f

Browse files
committed
streams: implement finished() for webstreams
Refs: #39316
1 parent 91ca2d4 commit d99602f

File tree

7 files changed

+92
-36
lines changed

7 files changed

+92
-36
lines changed

lib/internal/streams/end-of-stream.js

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const {
2222
validateBoolean
2323
} = require('internal/validators');
2424

25-
const { Promise } = primordials;
25+
const { Promise, PromisePrototypeThen } = primordials;
2626

2727
const {
2828
isClosed,
@@ -38,6 +38,15 @@ const {
3838
willEmitClose: _willEmitClose,
3939
} = require('internal/streams/utils');
4040

41+
const {
42+
isBrandCheck,
43+
} = require('internal/webstreams/util');
44+
45+
const isReadableStream =
46+
isBrandCheck('ReadableStream');
47+
const isWritableStream =
48+
isBrandCheck('WritableStream');
49+
4150
function isRequest(stream) {
4251
return stream.setHeader && typeof stream.abort === 'function';
4352
}
@@ -62,8 +71,7 @@ function eos(stream, options, callback) {
6271
const writable = options.writable ?? isWritableNodeStream(stream);
6372

6473
if (!isNodeStream(stream)) {
65-
// TODO: Webstreams.
66-
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
74+
return eosWeb(stream, options, callback);
6775
}
6876

6977
const wState = stream._writableState;
@@ -255,6 +263,15 @@ function eos(stream, options, callback) {
255263
return cleanup;
256264
}
257265

266+
function eosWeb(stream, opts, callback) {
267+
PromisePrototypeThen(
268+
stream.streamClosed,
269+
() => callback.call(stream),
270+
(err) => callback.call(stream, err)
271+
);
272+
return nop;
273+
}
274+
258275
function finished(stream, opts) {
259276
let autoCleanup = false;
260277
if (opts === null) {

lib/internal/streams/utils.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ function isWritableEnded(stream) {
8383

8484
// Have emitted 'finish'.
8585
function isWritableFinished(stream, strict) {
86-
if (!isWritableNodeStream(stream)) return null;
86+
if (!isWritableNodeStream(stream)) return stream?.state === 'closed' ? true : null;
8787
if (stream.writableFinished === true) return true;
8888
const wState = stream._writableState;
8989
if (wState?.errored) return false;
@@ -106,7 +106,7 @@ function isReadableEnded(stream) {
106106

107107
// Have emitted 'end'.
108108
function isReadableFinished(stream, strict) {
109-
if (!isReadableNodeStream(stream)) return null;
109+
if (!isReadableNodeStream(stream)) stream?.state === 'closed' ? true : null;
110110
const rState = stream._readableState;
111111
if (rState?.errored) return false;
112112
if (typeof rState?.endEmitted !== 'boolean') return null;
@@ -155,7 +155,7 @@ function isFinished(stream, opts) {
155155

156156
function isWritableErrored(stream) {
157157
if (!isNodeStream(stream)) {
158-
return null;
158+
return stream?.state === 'errored' ? true : null;
159159
}
160160

161161
if (stream.writableErrored) {
@@ -167,7 +167,7 @@ function isWritableErrored(stream) {
167167

168168
function isReadableErrored(stream) {
169169
if (!isNodeStream(stream)) {
170-
return null;
170+
return stream?.state === 'errored' ? true : null;
171171
}
172172

173173
if (stream.readableErrored) {
@@ -179,7 +179,7 @@ function isReadableErrored(stream) {
179179

180180
function isClosed(stream) {
181181
if (!isNodeStream(stream)) {
182-
return null;
182+
return stream?.state === 'closed' ? true : null;
183183
}
184184

185185
if (typeof stream.closed === 'boolean') {

lib/internal/webstreams/readablestream.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ class ReadableStream {
231231
port1: undefined,
232232
port2: undefined,
233233
promise: undefined,
234-
}
234+
},
235+
streamClosed: createDeferredPromise(),
235236
};
236237

237238
// The spec requires handling of the strategy first
@@ -288,6 +289,12 @@ class ReadableStream {
288289
return isReadableStreamLocked(this);
289290
}
290291

292+
get streamClosed() {
293+
if (!isReadableStream(this))
294+
throw new ERR_INVALID_THIS('ReadableStream');
295+
return this[kState].streamClosed.promise;
296+
}
297+
291298
/**
292299
* @param {any} [reason]
293300
* @returns { Promise<void> }
@@ -1869,6 +1876,7 @@ function readableStreamCancel(stream, reason) {
18691876
function readableStreamClose(stream) {
18701877
assert(stream[kState].state === 'readable');
18711878
stream[kState].state = 'closed';
1879+
stream[kState].streamClosed?.resolve?.();
18721880

18731881
const {
18741882
reader,
@@ -1900,6 +1908,10 @@ function readableStreamError(stream, error) {
19001908

19011909
reader[kState].close.reject(error);
19021910
setPromiseHandled(reader[kState].close.promise);
1911+
if (stream[kState].streamClosed?.promise !== undefined) {
1912+
stream[kState].streamClosed?.reject?.(error);
1913+
setPromiseHandled(stream[kState].streamClosed?.promise);
1914+
}
19031915

19041916
if (readableStreamHasDefaultReader(stream)) {
19051917
for (let n = 0; n < reader[kState].readRequests.length; n++)

lib/internal/webstreams/writablestream.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ class WritableStream {
175175
port1: undefined,
176176
port2: undefined,
177177
promise: undefined,
178-
}
178+
},
179+
streamClosed: createDeferredPromise(),
179180
};
180181

181182
const size = extractSizeAlgorithm(strategy?.size);
@@ -201,6 +202,12 @@ class WritableStream {
201202
return isWritableStreamLocked(this);
202203
}
203204

205+
get streamClosed() {
206+
if (!isWritableStream(this))
207+
throw new ERR_INVALID_THIS('WritableStream');
208+
return this[kState].streamClosed.promise;
209+
}
210+
204211
/**
205212
* @param {any} reason
206213
* @returns {Promise<void>}
@@ -733,6 +740,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
733740
writer[kState].close.reject?.(stream[kState].storedError);
734741
setPromiseHandled(writer[kState].close.promise);
735742
}
743+
if (stream[kState].streamClosed?.promise !== undefined) {
744+
stream[kState].streamClosed.reject?.(stream[kState]?.storedError);
745+
setPromiseHandled(stream[kState].streamClosed?.promise);
746+
}
736747
}
737748

738749
function writableStreamMarkFirstWriteRequestInFlight(stream) {
@@ -839,6 +850,7 @@ function writableStreamFinishInFlightClose(stream) {
839850
stream[kState].state = 'closed';
840851
if (stream[kState].writer !== undefined)
841852
stream[kState].writer[kState].close.resolve?.();
853+
stream[kState].streamClosed?.resolve?.();
842854
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
843855
assert(stream[kState].storedError === undefined);
844856
}

test/parallel/test-stream-end-of-streams.js

Lines changed: 0 additions & 20 deletions
This file was deleted.

test/parallel/test-stream-finished.js

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,7 @@ const http = require('http');
260260
const streamLike = new EE();
261261
streamLike.readableEnded = true;
262262
streamLike.readable = true;
263-
assert.throws(
264-
() => {
265-
finished(streamLike, () => {});
266-
},
267-
{ code: 'ERR_INVALID_ARG_TYPE' }
268-
);
263+
finished(streamLike, common.mustCall());
269264
streamLike.emit('close');
270265
}
271266

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { ReadableStream, WritableStream } = require('stream/web');
6+
const { finished } = require('stream');
7+
8+
{
9+
const rs = new ReadableStream({
10+
start(controller) {
11+
controller.enqueue('asd');
12+
controller.close();
13+
},
14+
});
15+
finished(rs, common.mustSucceed());
16+
async function test() {
17+
const values = [];
18+
for await (const chunk of rs) {
19+
values.push(chunk);
20+
}
21+
assert.deepStrictEqual(values, ['asd']);
22+
}
23+
test();
24+
}
25+
26+
{
27+
let str = '';
28+
const ws = new WritableStream({
29+
write(chunk) {
30+
console.log(chunk);
31+
str += chunk;
32+
}
33+
});
34+
finished(ws, common.mustSucceed(() => {
35+
assert.strictEqual(str, 'asd');
36+
}));
37+
const writer = ws.getWriter();
38+
writer.write('asd');
39+
writer.close();
40+
}

0 commit comments

Comments
 (0)