Skip to content

fs: add support for async iterators to fs.writeFile #38525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3879,6 +3879,9 @@ details.
<!-- YAML
added: v0.1.29
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/38525
description: add support for async iterators to `fs.writeFile`.
- version: v16.0.0
pr-url: https://github.com/nodejs/node/pull/37460
description: The error returned may be an `AggregateError` if more than one
Expand Down Expand Up @@ -3916,7 +3919,8 @@ changes:
-->

* `file` {string|Buffer|URL|integer} filename or file descriptor
* `data` {string|Buffer|TypedArray|DataView|Object}
* `data` {string|Buffer|TypedArray|DataView|Object
|AsyncIterable|Iterable|Stream}
* `options` {Object|string}
* `encoding` {string|null} **Default:** `'utf8'`
* `mode` {integer} **Default:** `0o666`
Expand Down
122 changes: 100 additions & 22 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {
ArrayPrototypePush,
BigIntPrototypeToString,
MathMax,
MathMin,
Number,
ObjectCreate,
ObjectDefineProperties,
Expand All @@ -44,6 +45,8 @@ const {
StringPrototypeCharCodeAt,
StringPrototypeIndexOf,
StringPrototypeSlice,
SymbolAsyncIterator,
SymbolIterator,
} = primordials;

const { fs: constants } = internalBinding('constants');
Expand Down Expand Up @@ -85,10 +88,12 @@ const {
const { FSReqCallback } = binding;
const { toPathIfFileURL } = require('internal/url');
const internalUtil = require('internal/util');
const { isCustomIterable } = require('internal/streams/utils');
const {
constants: {
kIoMaxLength,
kMaxUserId,
kWriteFileMaxChunkSize,
},
copyObject,
Dirent,
Expand Down Expand Up @@ -828,12 +833,12 @@ function write(fd, buffer, offset, length, position, callback) {
} else {
position = length;
}
length = 'utf8';
length = length || 'utf8';
}

const str = String(buffer);
validateEncoding(str, length);
callback = maybeCallback(position);
callback = maybeCallback(callback || position);

const req = new FSReqCallback();
req.oncomplete = wrapper;
Expand Down Expand Up @@ -2039,28 +2044,24 @@ function lutimesSync(path, atime, mtime) {
handleErrorFromBinding(ctx);
}

function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
function writeAll(
fd, isUserFd, buffer, offset, length, signal, encoding, callback) {
if (signal?.aborted) {
const abortError = new AbortError();
if (isUserFd) {
callback(abortError);
} else {
fs.close(fd, (err) => {
callback(aggregateTwoErrors(err, abortError));
handleWriteAllErrorCallback(fd, isUserFd, new AbortError(), callback);
return;
}

if (isCustomIterable(buffer)) {
writeAllCustomIterable(
fd, isUserFd, buffer, offset, length, signal, encoding, callback)
.catch((reason) => {
handleWriteAllErrorCallback(fd, isUserFd, reason, callback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would schedule this via process.nextTick. If this throws synchronously for whatever reason, it will lead to an unhandled rejection.

});
}
return;
}
// write(fd, buffer, offset, length, position, callback)
fs.write(fd, buffer, offset, length, null, (writeErr, written) => {
if (writeErr) {
if (isUserFd) {
callback(writeErr);
} else {
fs.close(fd, (err) => {
callback(aggregateTwoErrors(err, writeErr));
});
}
handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback);
} else if (written === length) {
if (isUserFd) {
callback(null);
Expand All @@ -2070,11 +2071,82 @@ function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
} else {
offset += written;
length -= written;
writeAll(fd, isUserFd, buffer, offset, length, signal, callback);
writeAll(
fd, isUserFd, buffer, offset, length, signal, encoding, callback);
}
});
}

async function writeAllCustomIterable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use async with a callback. Mixing those two can only lead to bugs. I would recommend having a promisified version of fs.write() and just use async/await.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina Thanks for your suggestion 👍 . I'm curious about the advantages of having a promisified version of fs.write() and using async/await. Could you elaborate on why this approach might be better than mixing async with a callback? I'm interested in understanding the potential benefits and how it could prevent bugs.

fd, isUserFd, buffer, offset, length, signal, encoding, callback) {
if (signal?.aborted) {
handleWriteAllErrorCallback(fd, isUserFd, new AbortError(), callback);
return;
}

const result = await buffer.next();
if (result.done) {
if (isUserFd) {
callback(null);
} else {
fs.close(fd, callback);
}
return;
}
if (signal?.aborted) {
handleWriteAllErrorCallback(fd, isUserFd, new AbortError(), callback);
return;
}
const resultValue = isArrayBufferView(result.value) ?
result.value : Buffer.from(String(result.value), encoding);
const remaining = resultValue.byteLength;
const writeSize = MathMin(kWriteFileMaxChunkSize, remaining);
fs.write(fd, resultValue, resultValue.byteLength - remaining, writeSize,
null, (writeErr, written) => {
handleWriteAllCustomIterableCallback(
fd, isUserFd, buffer, resultValue,
resultValue.byteLength - remaining, writeSize,
signal, encoding, writeErr, remaining, written, callback);
}
);
}

function handleWriteAllCustomIterableCallback(fd, isUserFd, buffer, resultValue,
offset, length, signal, encoding,
writeErr, remaining, written,
callback) {
if (writeErr) {
handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback);
return;
}

remaining -= written;
if (remaining > 0) {
const writeSize = MathMin(kWriteFileMaxChunkSize, remaining);
fs.write(fd, resultValue,
resultValue.byteLength - remaining, writeSize,
null, (writeErr, written) => {
handleWriteAllCustomIterableCallback(
fd, isUserFd, buffer, resultValue, offset, length,
signal, encoding, writeErr, remaining, written, callback);
});
return;
}

writeAllCustomIterable(
fd, isUserFd, buffer, offset, length, signal, encoding, callback);
}

function handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback) {
if (isUserFd) {
callback(writeErr);
} else {
fs.close(fd, (err) => {
callback(aggregateTwoErrors(err, writeErr));
});
}
}

/**
* Asynchronously writes data to the file.
* @param {string | Buffer | URL | number} path
Expand All @@ -2093,15 +2165,20 @@ function writeFile(path, data, options, callback) {
options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'w' });
const flag = options.flag || 'w';

if (!isArrayBufferView(data)) {
if (!isArrayBufferView(data) && !isCustomIterable(data)) {
validateStringAfterArrayBufferView(data, 'data');
data = Buffer.from(String(data), options.encoding || 'utf8');
}

if (isCustomIterable(data)) {
data = data[SymbolIterator]?.() ?? data[SymbolAsyncIterator]?.();
}

if (isFd(path)) {
const isUserFd = true;
const signal = options.signal;
writeAll(path, isUserFd, data, 0, data.byteLength, signal, callback);
writeAll(path, isUserFd, data,
0, data.byteLength, signal, options.encoding, callback);
return;
}

Expand All @@ -2114,7 +2191,8 @@ function writeFile(path, data, options, callback) {
} else {
const isUserFd = false;
const signal = options.signal;
writeAll(fd, isUserFd, data, 0, data.byteLength, signal, callback);
writeAll(fd, isUserFd, data,
0, data.byteLength, signal, options.encoding, callback);
}
});
}
Expand Down
6 changes: 1 addition & 5 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const pathModule = require('path');
const { promisify } = require('internal/util');
const { EventEmitterMixin } = require('internal/event_target');
const { watch } = require('internal/fs/watchers');
const { isIterable } = require('internal/streams/utils');
const { isCustomIterable } = require('internal/streams/utils');
const assert = require('internal/assert');

const kHandle = Symbol('kHandle');
Expand Down Expand Up @@ -730,10 +730,6 @@ async function writeFile(path, data, options) {
writeFileHandle(fd, data, options.signal, options.encoding), fd.close);
}

function isCustomIterable(obj) {
return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string';
}

async function appendFile(path, data, options) {
options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'a' });
options = copyObject(options);
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
SymbolAsyncIterator,
SymbolIterator,
} = primordials;
const { isArrayBufferView } = require('internal/util/types');

function isReadable(obj) {
return !!(obj && typeof obj.pipe === 'function' &&
Expand All @@ -27,7 +28,12 @@ function isIterable(obj, isAsync) {
typeof obj[SymbolIterator] === 'function';
}

function isCustomIterable(obj) {
return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string';
}

module.exports = {
isCustomIterable,
isIterable,
isReadable,
isStream,
Expand Down
Loading