Skip to content

fs: allow correct handling of burst in fs-events with AsyncIterator #58490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,13 @@ Path is a directory.
An attempt has been made to read a file whose size is larger than the maximum
allowed size for a `Buffer`.

<a id="ERR_FS_WATCH_QUEUE_OVERFLOW"></a>

### `ERR_FS_WATCH_QUEUE_OVERFLOW`

The number of file system events queued without being handled exceeded the size specified in
`maxQueue` in `fs.watch()`.

<a id="ERR_HTTP2_ALTSVC_INVALID_ORIGIN"></a>

### `ERR_HTTP2_ALTSVC_INVALID_ORIGIN`
Expand Down
5 changes: 5 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,11 @@ added:
filename passed to the listener. **Default:** `'utf8'`.
* `signal` {AbortSignal} An {AbortSignal} used to signal when the watcher
should stop.
* `maxQueue` {number} Specifies the number of events to queue between iterations
of the {AsyncIterator} returned. **Default:** `2048`.
* `overflow` {string} Either `'ignore'` or `'throw'` when there are more events to be
queued than `maxQueue` allows. `'ignore'` means overflow events are dropped and a
warning is emitted, while `'throw'` means to throw an exception. **Default:** `'ignore'`.
* Returns: {AsyncIterator} of objects with the properties:
* `eventType` {string} The type of change
* `filename` {string|Buffer|null} The name of the file changed.
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,7 @@ E('ERR_FS_CP_SYMLINK_TO_SUBDIRECTORY',
E('ERR_FS_CP_UNKNOWN', 'Cannot copy an unknown file type', SystemError);
E('ERR_FS_EISDIR', 'Path is a directory', SystemError, HideStackFramesError);
E('ERR_FS_FILE_TOO_LARGE', 'File size (%s) is greater than 2 GiB', RangeError);
E('ERR_FS_WATCH_QUEUE_OVERFLOW', 'fs.watch() queued more than %d events', Error);
E('ERR_HTTP2_ALTSVC_INVALID_ORIGIN',
'HTTP/2 ALTSVC frames require a valid origin', TypeError);
E('ERR_HTTP2_ALTSVC_LENGTH',
Expand Down
47 changes: 39 additions & 8 deletions lib/internal/fs/watchers.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
'use strict';

const {
ArrayPrototypePush,
ArrayPrototypeShift,
Error,
FunctionPrototypeCall,
ObjectDefineProperty,
ObjectSetPrototypeOf,
Expand All @@ -12,9 +15,11 @@ const {
AbortError,
UVException,
codes: {
ERR_FS_WATCH_QUEUE_OVERFLOW,
ERR_INVALID_ARG_VALUE,
},
} = require('internal/errors');

const {
kEmptyObject,
} = require('internal/util');
Expand Down Expand Up @@ -45,6 +50,8 @@ const {
validateBoolean,
validateObject,
validateUint32,
validateInteger,
validateOneOf,
} = require('internal/validators');

const {
Expand Down Expand Up @@ -309,11 +316,15 @@ async function* watch(filename, options = kEmptyObject) {
persistent = true,
recursive = false,
encoding = 'utf8',
maxQueue = 2048,
overflow = 'ignore',
signal,
} = options;

validateBoolean(persistent, 'options.persistent');
validateBoolean(recursive, 'options.recursive');
validateInteger(maxQueue, 'options.maxQueue');
validateOneOf(overflow, 'options.overflow', ['ignore', 'error']);
validateAbortSignal(signal, 'options.signal');

if (encoding && !isEncoding(encoding)) {
Expand All @@ -325,10 +336,11 @@ async function* watch(filename, options = kEmptyObject) {
throw new AbortError(undefined, { cause: signal.reason });

const handle = new FSEvent();
let { promise, resolve, reject } = PromiseWithResolvers();
let { promise, resolve } = PromiseWithResolvers();
const queue = [];
const oncancel = () => {
handle.close();
reject(new AbortError(undefined, { cause: signal?.reason }));
resolve();
};

try {
Expand All @@ -345,11 +357,20 @@ async function* watch(filename, options = kEmptyObject) {
});
error.filename = filename;
handle.close();
reject(error);
ArrayPrototypePush(queue, error);
resolve();
return;
}

resolve({ eventType, filename });
if (queue.length < maxQueue) {
ArrayPrototypePush(queue, { __proto__: null, eventType, filename });
resolve();
} else if (overflow === 'error') {
queue.length = 0;
ArrayPrototypePush(queue, new ERR_FS_WATCH_QUEUE_OVERFLOW(maxQueue));
resolve();
} else {
process.emitWarning('fs.watch maxQueue exceeded');
}
};

const err = handle.start(path, persistent, recursive, encoding);
Expand All @@ -367,10 +388,20 @@ async function* watch(filename, options = kEmptyObject) {
}

while (!signal?.aborted) {
yield await promise;
({ promise, resolve, reject } = PromiseWithResolvers());
await promise;
while (queue.length) {
const item = ArrayPrototypeShift(queue);
if (item instanceof Error) {
throw item;
} else {
yield item;
}
}
({ promise, resolve } = PromiseWithResolvers());
}
if (signal?.aborted) {
throw new AbortError(undefined, { cause: signal?.reason });
}
throw new AbortError(undefined, { cause: signal?.reason });
} finally {
handle.close();
signal?.removeEventListener('abort', oncancel);
Expand Down
3 changes: 3 additions & 0 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ test-domain-throw-error-then-throw-from-uncaught-exception-handler: PASS, FLAKY
test-domain-with-abort-on-uncaught-exception: PASS, FLAKY
# https://github.com/nodejs/node/issues/54346
test-esm-loader-hooks-inspect-wait: PASS, FLAKY
test-fs-promises-watch-iterator: SKIP
# https://github.com/nodejs/node/issues/50050
test-tick-processor-arguments: SKIP
# https://github.com/nodejs/node/issues/54534
Expand All @@ -85,6 +86,7 @@ test-runner-run-watch: PASS, FLAKY
[$system==freebsd]
# https://github.com/nodejs/node/issues/54346
test-esm-loader-hooks-inspect-wait: PASS, FLAKY
test-fs-promises-watch-iterator: SKIP

[$system==aix]
# https://github.com/nodejs/node/issues/54346
Expand All @@ -95,6 +97,7 @@ test-esm-loader-hooks-inspect-wait: PASS, FLAKY
test-child-process-fork-net-server: SKIP
test-cli-node-options: SKIP
test-cluster-shared-leak: SKIP
test-fs-promises-watch-iterator: SKIP
test-http-writable-true-after-close: SKIP
test-http2-connect-method: SKIP
test-net-error-twice: SKIP
Expand Down
57 changes: 57 additions & 0 deletions test/parallel/test-fs-promises-watch-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';
// This tests that when there is a burst of fs watch events, the events
// emitted after the consumer receives the initial event and before the
// control returns back to fs.watch() can be queued up and show up
// in the next iteration.
const common = require('../common');
const { watch, writeFile } = require('fs/promises');
const fs = require('fs');
const assert = require('assert');
const { join } = require('path');
const { setTimeout } = require('timers/promises');
const tmpdir = require('../common/tmpdir');

class WatchTestCase {
constructor(dirName, files) {
this.dirName = dirName;
this.files = files;
}
get dirPath() { return tmpdir.resolve(this.dirName); }
filePath(fileName) { return join(this.dirPath, fileName); }

async run() {
await Promise.all([this.watchFiles(), this.writeFiles()]);
assert(!this.files.length);
}
async watchFiles() {
const watcher = watch(this.dirPath);
for await (const evt of watcher) {
const idx = this.files.indexOf(evt.filename);
if (idx < 0) continue;
this.files.splice(idx, 1);
await setTimeout(common.platformTimeout(100));
if (!this.files.length) break;
}
}
async writeFiles() {
for (const fileName of [...this.files]) {
await writeFile(this.filePath(fileName), Date.now() + fileName.repeat(1e4));
}
await setTimeout(common.platformTimeout(100));
}
}

const kCases = [
// Watch on a directory should callback with a filename on supported systems
new WatchTestCase(
'watch1',
['foo', 'bar', 'baz']
),
];

tmpdir.refresh();

for (const testCase of kCases) {
fs.mkdirSync(testCase.dirPath);
testCase.run().then(common.mustCall());
}
19 changes: 19 additions & 0 deletions test/parallel/test-fs-promises-watch.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,25 @@ assert.rejects(
},
{ code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall());

assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const _ of watch('', { maxQueue: 'silly' })) { }
},
{ code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall());
assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const _ of watch('', { overflow: 1 })) { }
},
{ code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall());
assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const _ of watch('', { overflow: 'barf' })) { }
},
{ code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall());

(async () => {
const ac = new AbortController();
const { signal } = ac;
Expand Down
Loading