Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions source/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ const getStreamIterable = async function * (stream) {
handleStreamEnd(stream, controller, state);

try {
for await (const [chunk] of nodeImports.events.on(stream, 'data', {
signal: controller.signal,
highWatermark: stream.readableHighWaterMark,
})) {
for await (const [chunk] of nodeImports.events.on(stream, 'data', {signal: controller.signal})) {
yield chunk;
}
} catch (error) {
Expand Down
25 changes: 0 additions & 25 deletions test/stream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {once} from 'node:events';
import {version} from 'node:process';
import {Readable, Duplex} from 'node:stream';
import {finished} from 'node:stream/promises';
import {scheduler, setTimeout as pSetTimeout} from 'node:timers/promises';
Expand Down Expand Up @@ -268,30 +267,6 @@ const testMultipleReads = async (t, wait) => {
test('Handles multiple successive fast reads', testMultipleReads, () => scheduler.yield());
test('Handles multiple successive slow reads', testMultipleReads, () => pSetTimeout(100));

// The `highWaterMark` option was added to `once()` by Node 20.
// See https://github.com/nodejs/node/pull/41276
const nodeMajor = version.split('.')[0].slice(1);
if (nodeMajor >= 20) {
test('Pause stream when too much data at once', async t => {
const stream = new Readable({
read: onetime(function () {
this.push('.');
this.push('.');
this.push('.');
this.push('.');
this.push(null);
}),
highWaterMark: 2,
});
const [result] = await Promise.all([
getStream(stream),
once(stream, 'pause'),
]);
t.is(result, '....');
assertSuccess(t, stream, Readable);
});
}

test('Can call twice at the same time', async t => {
const stream = Readable.from(fixtureMultiString);
const [result, secondResult] = await Promise.all([
Expand Down