diff --git a/lib/events.js b/lib/events.js index f722b17aecae0d..57dc6a47614b07 100644 --- a/lib/events.js +++ b/lib/events.js @@ -1085,6 +1085,7 @@ function on(emitter, event, options) { error = err; eventTargetAgnosticRemoveListener(emitter, event, eventHandler); eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); + return PromiseResolve(createIterResult(undefined, true)); }, [SymbolAsyncIterator]() { diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index 5c9cb94ced5817..2f15d0f21d12d6 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -50,7 +50,7 @@ const { getStringWidth, stripVTControlCharacters, } = require('internal/util/inspect'); -const EventEmitter = require('events'); +const { EventEmitter } = require('events'); const { charLengthAt, charLengthLeft, @@ -140,6 +140,41 @@ function InterfaceConstructor(input, output, completer, terminal) { FunctionPrototypeCall(EventEmitter, this); + { + if (Readable === undefined) { + Readable = require('stream').Readable; + } + const readable = new Readable({ + objectMode: true, + read: () => { + this.resume(); + }, + destroy: (err, cb) => { + this.off('line', lineListener); + this.off('close', closeListener); + this.close(); + cb(err); + }, + }); + const lineListener = (input) => { + if (!readable.push(input)) { + // TODO(rexagod): drain to resume flow + this.pause(); + } + }; + const closeListener = () => { + readable.push(null); + }; + const errorListener = (err) => { + readable.destroy(err); + }; + this.on('error', errorListener); + this.on('line', lineListener); + this.on('close', closeListener); + this[kLineObjectStream] = readable; + } + + let history; let historySize; let removeHistoryDuplicates = false; @@ -280,7 +315,6 @@ function InterfaceConstructor(input, output, completer, terminal) { self[kRefreshLine](); } - this[kLineObjectStream] = undefined; input.on('error', onerror); @@ -1324,39 +1358,6 @@ class Interface extends InterfaceConstructor { * @returns {InterfaceAsyncIterator} */ [SymbolAsyncIterator]() { - if (this[kLineObjectStream] === undefined) { - if (Readable === undefined) { - Readable = require('stream').Readable; - } - const readable = new Readable({ - objectMode: true, - read: () => { - this.resume(); - }, - destroy: (err, cb) => { - this.off('line', lineListener); - this.off('close', closeListener); - this.close(); - cb(err); - }, - }); - const lineListener = (input) => { - if (!readable.push(input)) { - // TODO(rexagod): drain to resume flow - this.pause(); - } - }; - const closeListener = () => { - readable.push(null); - }; - const errorListener = (err) => { - readable.destroy(err); - }; - this.on('error', errorListener); - this.on('line', lineListener); - this.on('close', closeListener); - this[kLineObjectStream] = readable; - } return this[kLineObjectStream][SymbolAsyncIterator](); } diff --git a/test/parallel/test-readline-buffers.mjs b/test/parallel/test-readline-buffers.mjs new file mode 100644 index 00000000000000..eb0ac0e557945d --- /dev/null +++ b/test/parallel/test-readline-buffers.mjs @@ -0,0 +1,30 @@ +import '../common/index.mjs'; + +import { createInterface } from 'readline'; +import { Readable } from 'stream'; +import { setImmediate } from 'timers/promises'; +import { deepStrictEqual } from 'assert'; + +const stream = Readable.from(async function* () { + for (let i = 0; i < 50; i++) { + yield new Buffer.from(i + '\r\n', 'utf-8'); + await Promise.resolve(); + } +}(), { objectMode: false }); + +// Promises don't keep the event loop alive so to avoid the process exiting +// below we create an interval; +const interval = setInterval(() => { }, 1000); + +const rl = createInterface({ + input: stream, + crlfDelay: Infinity +}); + +await setImmediate(); +const result = []; +for await (const item of rl) { + result.push(item); +} +deepStrictEqual(result, Object.keys(Array(50).fill())); +clearInterval(interval);