Skip to content

fs: add support for async iterators to fs.writeFile #38525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3879,6 +3879,9 @@ details.
<!-- YAML
added: v0.1.29
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/38525
description: add support for async iterators to `fs.writeFile`.
- version: v16.0.0
pr-url: https://github.com/nodejs/node/pull/37460
description: The error returned may be an `AggregateError` if more than one
Expand Down Expand Up @@ -3916,7 +3919,8 @@ changes:
-->

* `file` {string|Buffer|URL|integer} filename or file descriptor
* `data` {string|Buffer|TypedArray|DataView|Object}
* `data` {string|Buffer|TypedArray|DataView|Object
|AsyncIterable|Iterable|Stream}
* `options` {Object|string}
* `encoding` {string|null} **Default:** `'utf8'`
* `mode` {integer} **Default:** `0o666`
Expand Down
72 changes: 57 additions & 15 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const {
StringPrototypeCharCodeAt,
StringPrototypeIndexOf,
StringPrototypeSlice,
SymbolAsyncIterator,
SymbolIterator,
} = primordials;

const { fs: constants } = internalBinding('constants');
Expand Down Expand Up @@ -85,6 +87,7 @@ const {
const { FSReqCallback } = binding;
const { toPathIfFileURL } = require('internal/url');
const internalUtil = require('internal/util');
const { isCustomIterable } = require('internal/streams/utils');
const {
constants: {
kIoMaxLength,
Expand Down Expand Up @@ -828,12 +831,12 @@ function write(fd, buffer, offset, length, position, callback) {
} else {
position = length;
}
length = 'utf8';
length = length || 'utf8';
}

const str = String(buffer);
validateEncoding(str, length);
callback = maybeCallback(position);
callback = maybeCallback(callback || position);

const req = new FSReqCallback();
req.oncomplete = wrapper;
Expand Down Expand Up @@ -2039,7 +2042,8 @@ function lutimesSync(path, atime, mtime) {
handleErrorFromBinding(ctx);
}

function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
function writeAll(
fd, isUserFd, buffer, offset, length, signal, encoding, callback) {
if (signal?.aborted) {
const abortError = new AbortError();
if (isUserFd) {
Expand All @@ -2051,16 +2055,16 @@ function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
}
return;
}
// write(fd, buffer, offset, length, position, callback)

if (isCustomIterable(buffer)) {
writeAllCustomIterable(
fd, isUserFd, buffer, offset, length, signal, encoding, callback)
.catch((reason) => { throw reason; });
return;
}
fs.write(fd, buffer, offset, length, null, (writeErr, written) => {
if (writeErr) {
if (isUserFd) {
callback(writeErr);
} else {
fs.close(fd, (err) => {
callback(aggregateTwoErrors(err, writeErr));
});
}
handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback);
} else if (written === length) {
if (isUserFd) {
callback(null);
Expand All @@ -2070,11 +2074,43 @@ function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
} else {
offset += written;
length -= written;
writeAll(fd, isUserFd, buffer, offset, length, signal, callback);
writeAll(
fd, isUserFd, buffer, offset, length, signal, encoding, callback);
}
});
}

async function writeAllCustomIterable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use async with a callback. Mixing those two can only lead to bugs. I would recommend having a promisified version of fs.write() and just use async/await.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina Thanks for your suggestion 👍 . I'm curious about the advantages of having a promisified version of fs.write() and using async/await. Could you elaborate on why this approach might be better than mixing async with a callback? I'm interested in understanding the potential benefits and how it could prevent bugs.

fd, isUserFd, buffer, offset, length, signal, encoding, callback) {
const result = await buffer.next();
if (result.done) {
fs.close(fd, callback);
return;
}
const resultValue = result.value.toString();
fs.write(fd, resultValue, undefined,
isArrayBufferView(buffer) ? resultValue.byteLength : encoding,
null, (writeErr, _) => {
if (writeErr) {
handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback);
} else {
writeAll(fd, isUserFd, buffer, offset,
Copy link
Contributor

@Linkgoron Linkgoron May 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there's a mistake here that also exists in the fs/promises version, where the code assumes that the whole chunk has been written but it might have only been partially written.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think that it makes more sense to call writeAllCustomIterable instead of writeAll - though in that case, it would make sense to add an abort signal check to the start of the method.

length, signal, encoding, callback);
}
}
);
}

function handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback) {
if (isUserFd) {
callback(writeErr);
} else {
fs.close(fd, (err) => {
callback(aggregateTwoErrors(err, writeErr));
});
}
}

/**
* Asynchronously writes data to the file.
* @param {string | Buffer | URL | number} path
Expand All @@ -2093,15 +2129,20 @@ function writeFile(path, data, options, callback) {
options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'w' });
const flag = options.flag || 'w';

if (!isArrayBufferView(data)) {
if (!isArrayBufferView(data) && !isCustomIterable(data)) {
validateStringAfterArrayBufferView(data, 'data');
data = Buffer.from(String(data), options.encoding || 'utf8');
}

if (isCustomIterable(data)) {
data = data[SymbolIterator]?.() ?? data[SymbolAsyncIterator]?.();
}

if (isFd(path)) {
const isUserFd = true;
const signal = options.signal;
writeAll(path, isUserFd, data, 0, data.byteLength, signal, callback);
writeAll(path, isUserFd, data,
0, data.byteLength, signal, options.encoding, callback);
return;
}

Expand All @@ -2114,7 +2155,8 @@ function writeFile(path, data, options, callback) {
} else {
const isUserFd = false;
const signal = options.signal;
writeAll(fd, isUserFd, data, 0, data.byteLength, signal, callback);
writeAll(fd, isUserFd, data,
0, data.byteLength, signal, options.encoding, callback);
}
});
}
Expand Down
6 changes: 1 addition & 5 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const pathModule = require('path');
const { promisify } = require('internal/util');
const { EventEmitterMixin } = require('internal/event_target');
const { watch } = require('internal/fs/watchers');
const { isIterable } = require('internal/streams/utils');
const { isCustomIterable } = require('internal/streams/utils');
const assert = require('internal/assert');

const kHandle = Symbol('kHandle');
Expand Down Expand Up @@ -730,10 +730,6 @@ async function writeFile(path, data, options) {
writeFileHandle(fd, data, options.signal, options.encoding), fd.close);
}

function isCustomIterable(obj) {
return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string';
}

async function appendFile(path, data, options) {
options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'a' });
options = copyObject(options);
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
SymbolAsyncIterator,
SymbolIterator,
} = primordials;
const { isArrayBufferView } = require('internal/util/types');

function isReadable(obj) {
return !!(obj && typeof obj.pipe === 'function' &&
Expand All @@ -27,7 +28,12 @@ function isIterable(obj, isAsync) {
typeof obj[SymbolIterator] === 'function';
}

function isCustomIterable(obj) {
return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string';
}

module.exports = {
isCustomIterable,
isIterable,
isReadable,
isStream,
Expand Down
88 changes: 88 additions & 0 deletions test/parallel/test-fs-write-file-async-iterators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fs = require('fs');
const join = require('path').join;
const { Readable } = require('stream');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

{
const filenameIterable = join(tmpdir.path, 'testIterable.txt');
const iterable = {
expected: 'abc',
*[Symbol.iterator]() {
yield 'a';
yield 'b';
yield 'c';
}
};

fs.writeFile(filenameIterable, iterable, common.mustSucceed(() => {
const data = fs.readFileSync(filenameIterable, 'utf-8');
assert.strictEqual(iterable.expected, data);
}));
}

{
const filenameBufferIterable = join(tmpdir.path, 'testBufferIterable.txt');
const bufferIterable = {
expected: 'abc',
*[Symbol.iterator]() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
};

fs.writeFile(
filenameBufferIterable, bufferIterable, common.mustSucceed(() => {
const data = fs.readFileSync(filenameBufferIterable, 'utf-8');
assert.strictEqual(bufferIterable.expected, data);
})
);
}


{
const filenameAsyncIterable = join(tmpdir.path, 'testAsyncIterable.txt');
const asyncIterable = {
expected: 'abc',
*[Symbol.asyncIterator]() {
yield 'a';
yield 'b';
yield 'c';
}
};

fs.writeFile(filenameAsyncIterable, asyncIterable, common.mustSucceed(() => {
const data = fs.readFileSync(filenameAsyncIterable, 'utf-8');
assert.strictEqual(asyncIterable.expected, data);
}));
}

{
const filenameStream = join(tmpdir.path, 'testStream.txt');
const stream = Readable.from(['a', 'b', 'c']);
const expected = 'abc';

fs.writeFile(filenameStream, stream, common.mustSucceed(() => {
const data = fs.readFileSync(filenameStream, 'utf-8');
assert.strictEqual(expected, data);
}));
}

{
const filenameStreamWithEncoding =
join(tmpdir.path, 'testStreamWithEncoding.txt');
const stream = Readable.from(['ümlaut', ' ', 'sechzig']);
const expected = 'ümlaut sechzig';

fs.writeFile(
filenameStreamWithEncoding, stream, 'latin1', common.mustSucceed(() => {
const data = fs.readFileSync(filenameStreamWithEncoding, 'latin1');
assert.strictEqual(expected, data);
})
);
}