Skip to content

Commit d5d6d5d

Browse files
committed
add maxQueue and overflow options
1 parent 6701104 commit d5d6d5d

File tree

1 file changed

+34
-11
lines changed

1 file changed

+34
-11
lines changed

lib/internal/fs/watchers.js

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
'use strict';
22

33
const {
4+
ArrayPrototypePush,
5+
ArrayPrototypeShift,
46
FunctionPrototypeCall,
57
ObjectDefineProperty,
8+
ObjectPrototypeHasOwnProperty,
69
ObjectSetPrototypeOf,
710
PromiseWithResolvers,
811
Symbol,
@@ -14,13 +17,13 @@ const {
1417
codes: {
1518
ERR_INVALID_ARG_VALUE,
1619
},
20+
genericNodeError,
1721
} = require('internal/errors');
22+
1823
const {
1924
kEmptyObject,
2025
} = require('internal/util');
2126

22-
const FixedQueue = require('internal/fixed_queue');
23-
2427
const {
2528
kFsStatsFieldsNumber,
2629
StatWatcher: _StatWatcher,
@@ -47,6 +50,8 @@ const {
4750
validateBoolean,
4851
validateObject,
4952
validateUint32,
53+
validateInteger,
54+
validateOneOf,
5055
} = require('internal/validators');
5156

5257
const {
@@ -311,11 +316,15 @@ async function* watch(filename, options = kEmptyObject) {
311316
persistent = true,
312317
recursive = false,
313318
encoding = 'utf8',
319+
maxQueue = 2048,
320+
overflow = 'swallow',
314321
signal,
315322
} = options;
316323

317324
validateBoolean(persistent, 'options.persistent');
318325
validateBoolean(recursive, 'options.recursive');
326+
validateInteger(maxQueue, 'options.maxQueue');
327+
validateOneOf(overflow, 'options.overflow', ['swallow', 'throw']);
319328
validateAbortSignal(signal, 'options.signal');
320329

321330
if (encoding && !isEncoding(encoding)) {
@@ -328,7 +337,7 @@ async function* watch(filename, options = kEmptyObject) {
328337

329338
const handle = new FSEvent();
330339
let { promise, resolve } = PromiseWithResolvers();
331-
const queue = new FixedQueue();
340+
const queue = [];
332341
const oncancel = () => {
333342
handle.close();
334343
resolve();
@@ -348,12 +357,21 @@ async function* watch(filename, options = kEmptyObject) {
348357
});
349358
error.filename = filename;
350359
handle.close();
351-
queue.push(error);
360+
ArrayPrototypePush(queue, error);
352361
resolve();
353362
return;
354363
}
355-
queue.push({ eventType, filename });
356-
resolve();
364+
if (queue.length < maxQueue) {
365+
ArrayPrototypePush(queue, { eventType, filename });
366+
resolve();
367+
} else if (overflow === 'throw') {
368+
const overflowError = genericNodeError('fs.watch maxQueue exceeded');
369+
queue.length = 0;
370+
ArrayPrototypePush(queue, overflowError);
371+
resolve();
372+
} else {
373+
process.emitWarning('fs.watch maxQueue exceeded');
374+
}
357375
};
358376

359377
const err = handle.start(path, persistent, recursive, encoding);
@@ -372,13 +390,18 @@ async function* watch(filename, options = kEmptyObject) {
372390

373391
while (!signal?.aborted) {
374392
await promise;
375-
while (!queue.isEmpty()) {
376-
const item = queue.shift();
377-
if (item instanceof UVException) throw item;
378-
yield item;
393+
while (queue.length) {
394+
const item = ArrayPrototypeShift(queue);
395+
if (
396+
ObjectPrototypeHasOwnProperty(item, 'eventType') &&
397+
ObjectPrototypeHasOwnProperty(item, 'filename')
398+
) {
399+
yield item;
400+
} else {
401+
throw item;
402+
}
379403
}
380404
({ promise, resolve } = PromiseWithResolvers());
381-
382405
}
383406
if (signal?.aborted) {
384407
throw new AbortError(undefined, { cause: signal?.reason });

0 commit comments

Comments
 (0)