stream: Add option to Readable.take operator to not close the stream #46980

@rluvaton

What is the problem this feature will solve?

I will be able to do this:

const csvParsedStream = fs
	.createReadStream('file.csv')
	.compose(csvParse({ columns: false }));

const [columns] = await csvParsedStream
	.take(1)
	.toArray();

// Currently this is empty: take() has already closed the stream, so no data is left
const parsed = await csvParsedStream
   .map((row) => parseRowByColumns(row, columns))
   .toArray();

Another example (I know I could use [first, ...rest] here; this is only an illustration):

const a = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const [first] = await a.take(1).toArray();

console.log(first);
// 1

const rest = await a.toArray();
console.log(rest)
// []

What is the feature you are proposing to solve the problem?

Add a closeStream option to the take operator, with a default value of true, so that I can opt out of closing the stream:

const csvParsedStream = fs
	.createReadStream('file.csv')
	.compose(csvParse({ columns: false }));

const [columns] = await csvParsedStream
	.take(1, { closeStream: false })  // Right now this would close the stream, but we give it an option to not
	.toArray();

const parsed = await csvParsedStream.map((row) => parseRowByColumns(row, columns)).toArray();

What alternatives have you considered?

Get the first value from the stream with an async iterator, then read the rest:

let columns;
for await (const c of csvParsedStream.iterator({ destroyOnReturn: false })) {
	columns = c;

	break;
}

const parsed = await csvParsedStream
    .map((row) => parseRowByColumns(row, columns))
    .toArray();
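
The iterator-based workaround above can be wrapped in a small helper. This is only a sketch of the behavior being requested, not how take is implemented in Node.js: takeWithoutClosing is a hypothetical name, and the only stream API it relies on is readable.iterator({ destroyOnReturn: false }).

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper (not part of Node.js): reads up to `n` chunks from
// `stream` and leaves the stream open so the remaining data can still be read.
async function takeWithoutClosing(stream, n) {
  const taken = [];
  if (n <= 0) return taken;

  // destroyOnReturn: false prevents the stream from being destroyed
  // when we break out of the loop early.
  for await (const chunk of stream.iterator({ destroyOnReturn: false })) {
    taken.push(chunk);
    if (taken.length >= n) break;
  }
  return taken;
}

async function main() {
  const a = Readable.from([1, 2, 3, 4, 5]);

  const [first] = await takeWithoutClosing(a, 1);

  // The stream is still open, so the remaining items can be consumed.
  const rest = [];
  for await (const chunk of a) rest.push(chunk);

  console.log(first, rest);
  // 1 [ 2, 3, 4, 5 ]
}

main();
```

Returning a plain array keeps the sketch short. A real closeStream option would presumably return a stream so that further operators can be chained, which this helper does not attempt.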

Labels: feature request, stream
Status: Awaiting Triage