Allow multiple readers at once #121
@@ -0,0 +1,66 @@

```js
import {isReadableStream} from 'is-stream';

export const getAsyncIterable = stream => {
	if (isReadableStream(stream, {checkOpen: false})) {
		return getStreamIterable(stream);
	}

	if (typeof stream?.[Symbol.asyncIterator] !== 'function') {
		throw new TypeError('The first argument must be a Readable, a ReadableStream, or an async iterable.');
	}

	return stream;
};

// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
const getStreamIterable = async function * (stream) {
	if (nodeImports === undefined) {
		await loadNodeImports();
	}

	const controller = new AbortController();
	const state = {};
	handleStreamEnd(stream, controller, state);

	try {
		for await (const [chunk] of nodeImports.events.on(stream, 'data', {
			signal: controller.signal,
			highWatermark: stream.readableHighWaterMark,
		})) {
			yield chunk;
		}
```
Collaborator (Author):

That PR fixes many subtle bugs with …

Owner:

Maybe target Node.js 20? I think we should target Node.js 20 in Execa anyway. Node.js 18 goes out of maintenance in April, which is not far away.

Collaborator (Author):

Unfortunately, I believe Node 18 is in maintenance for one more year: until April 2025. I also initially thought it was going out of maintenance next month. Maybe we got this impression because they ended support for Node 16 earlier than usual (September 2023 instead of April 2024) due to a security issue.

Owner:

Ugh. Yeah, I totally misread that...

Owner:

Maybe worth adding a TODO comment about how it could be simplified once Node.js 20 can be targeted.

Collaborator (Author):

I was initially using the … To support both Node 18 and 20, I switched to using the … So the code can remain the same even after removing support for Node 18.

I did too! I realized this a few weeks ago. I do think the early deprecation of Node 16 might be the cause of our confusion.
```js
	} catch (error) {
		// Stream failure, for example due to `stream.destroy(error)`
		if (state.error !== undefined) {
			throw state.error;
			// `error` event directly emitted on stream
		} else if (!controller.signal.aborted) {
			throw error;
			// Otherwise, stream completed successfully
		}
		// The `finally` block also runs when the caller throws, for example due to the `maxBuffer` option
	} finally {
		stream.destroy();
	}
};

const handleStreamEnd = async (stream, controller, state) => {
	try {
		await nodeImports.streamPromises.finished(stream, {cleanup: true, readable: true, writable: false, error: false});
	} catch (error) {
		state.error = error;
	} finally {
		controller.abort();
	}
};

// Use dynamic imports to support browsers
const loadNodeImports = async () => {
	const [events, streamPromises] = await Promise.all([
		import('node:events'),
		import('node:stream/promises'),
	]);
	nodeImports = {events, streamPromises};
};
```
Collaborator (Author):

The … However, browsers don't have those methods. They also don't need them, since they don't use …

This PR currently fixes this using dynamic imports. This is not great: besides being hacky, it has one practical issue, which is that when executing … This means that if another consumer has called …

One potential solution could be to use static imports and hope that browser users' bundlers ignore/delete imports that start with the `node:` prefix. Another potential solution could be to have a separate entry point for browser users. This might be cleaner and would allow us to add more Node-specific and browser-specific code, if we ever need to (related: #116).

I am really not sure what the best approach is here. What are your thoughts about this?

Collaborator (Author):

We might also combine the last two solutions?

Owner:

Unlikely scenario, but regardless, I think using separate browser and node exports in package.json may be the best solution in this case.

Collaborator (Author):

Sounds good.

Owner:

👍
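A minimal sketch of the separate browser/Node entry points discussed above, using conditional exports in package.json; the file names are hypothetical and not part of this PR:

```json
{
	"exports": {
		".": {
			"node": "./source/index.js",
			"default": "./source/index-browser.js"
		}
	}
}
```

With this shape, Node.js resolves the `node` condition (which could keep the `node:events` / `node:stream/promises` imports static), while browsers and bundlers fall back to `default` and never load Node-specific code.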
```js
let nodeImports;
```
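Not part of the diff, but a quick sketch of what the re-implemented iterable enables: two consumers iterating the same Node.js stream at once, each receiving every chunk. The `collect` helper and the sample stream are hypothetical; `getAsyncIterable` is the export from the file above.

```js
import {Readable} from 'node:stream';

// Hypothetical helper: drain an async iterable into an array.
const collect = async iterable => {
	const chunks = [];
	for await (const chunk of iterable) {
		chunks.push(chunk);
	}

	return chunks;
};

const stream = Readable.from(['foo', 'bar', 'baz']);

// Each call to `getAsyncIterable(stream)` attaches its own `data` listener via
// `events.on()`, so both consumers should see all three chunks, whereas the
// default stream iterable does not allow multiple readers at once.
const [first, second] = await Promise.all([
	collect(getAsyncIterable(stream)),
	collect(getAsyncIterable(stream)),
]);

console.log(first); // expected: ['foo', 'bar', 'baz']
console.log(second); // expected: ['foo', 'bar', 'baz']
```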
The `highWatermark` option is undocumented but it is quite nifty. It was added by nodejs/node#41276 for `readline.Interface`. I have opened nodejs/node#52078 since I'm not sure it was left undocumented on purpose.

Regardless, even if the option disappeared, it is only meant to reduce memory consumption (by automatically pausing/resuming the stream when too much data is buffering), so it would not break the usage.
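For intuition, here is a rough, hypothetical sketch of the kind of backpressure such an option provides (not the actual `events.on` internals; it counts buffered chunks rather than bytes for simplicity):

```js
// Pause the stream when too many chunks are buffered; resume once the consumer
// has drained the buffer back below the threshold.
const bufferChunks = (stream, highWaterMark = 16) => {
	const buffered = [];

	stream.on('data', chunk => {
		buffered.push(chunk);
		if (buffered.length >= highWaterMark) {
			stream.pause();
		}
	});

	// Called by the consumer to pull one buffered chunk.
	return () => {
		const chunk = buffered.shift();
		if (stream.isPaused() && buffered.length < highWaterMark) {
			stream.resume();
		}

		return chunk;
	};
};
```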
Apparently, they are already working on it! 🎉
nodejs/node#52080