Commit 1168f81

NSFS | Replace ChunkFS with FileWriter
Signed-off-by: Romy <[email protected]>
1 parent: c03e592

8 files changed: +203 −165 lines

config.js
Lines changed: 2 additions & 0 deletions

@@ -882,6 +882,8 @@ config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;
 // anonymous account name
 config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';
 
+config.NSFS_UPLOAD_STREAM_MAX_BUFFER_SIZE = 64 * 1024 * 1024;
+
 ////////////////////////////
 // NSFS NON CONTAINERIZED //
 ////////////////////////////
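Note on the new constant: NSFS_UPLOAD_STREAM_MAX_BUFFER_SIZE caps how much upload data may sit buffered in memory (64 MiB) before it is flushed to storage. Its consumer is the new src/util/file_writer.js, which this view does not render, so the following is only a minimal sketch of the usual shape of such a cap in a Node.js Writable: batch incoming chunks and flush once the buffered total reaches the limit. flush_to_target is a hypothetical stand-in, not a NooBaa API.

const { Writable } = require('stream');

// Sketch only: cap modeled on config.NSFS_UPLOAD_STREAM_MAX_BUFFER_SIZE.
const MAX_BUFFER_SIZE = 64 * 1024 * 1024;

class BatchingWriter extends Writable {
    constructor({ flush_to_target }) {
        super();
        this.flush_to_target = flush_to_target; // async (buffers) => void, hypothetical
        this.buffers = [];
        this.buffered_bytes = 0;
    }
    async _flush_buffers() {
        if (!this.buffered_bytes) return;
        const batch = this.buffers;
        this.buffers = [];
        this.buffered_bytes = 0;
        await this.flush_to_target(batch);
    }
    _write(chunk, encoding, callback) {
        this.buffers.push(chunk);
        this.buffered_bytes += chunk.length;
        // Flush eagerly once the cap is reached; otherwise keep batching.
        if (this.buffered_bytes >= MAX_BUFFER_SIZE) {
            this._flush_buffers().then(() => callback(), callback);
        } else {
            callback();
        }
    }
    _final(callback) {
        this._flush_buffers().then(() => callback(), callback);
    }
}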

src/sdk/namespace_fs.js
Lines changed: 31 additions & 21 deletions

@@ -19,7 +19,7 @@ const stream_utils = require('../util/stream_utils');
 const buffer_utils = require('../util/buffer_utils');
 const size_utils = require('../util/size_utils');
 const native_fs_utils = require('../util/native_fs_utils');
-const ChunkFS = require('../util/chunk_fs');
+const FileWriter = require('../util/file_writer');
 const LRUCache = require('../util/lru_cache');
 const nb_native = require('../util/nb_native');
 const RpcError = require('../rpc/rpc_error');

@@ -1564,36 +1564,46 @@ class NamespaceFS {
     // Can be finetuned further on if needed and inserting the Semaphore logic inside
     // Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
     async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
-        const { source_stream, copy_source } = params;
+        const { copy_source } = params;
         try {
-            // Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
-            const md5_enabled = this._is_force_md5_enabled(object_sdk);
-            const chunk_fs = new ChunkFS({
+            const md5_enabled =
+                config.NSFS_CALCULATE_MD5 ||
+                this.force_md5_etag ||
+                object_sdk?.requesting_account?.force_md5_etag;
+            const file_writer = new FileWriter({
                 target_file,
                 fs_context,
-                stats: this.stats,
-                namespace_resource_id: this.namespace_resource_id,
-                md5_enabled,
                 offset,
+                md5_enabled,
+                stats: this.stats,
                 bucket: params.bucket,
-                large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
+                large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size,
+                namespace_resource_id: this.namespace_resource_id,
             });
-            chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
-            chunk_fs.on('finish', arg => dbg.error('namespace_fs._upload_stream: finish occured on stream ChunkFS: ', arg));
-            chunk_fs.on('close', arg => dbg.error('namespace_fs._upload_stream: close occured on stream ChunkFS: ', arg));
+            file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occured on FileWriter: ', err));
+            file_writer.on('finish', arg => dbg.error('namespace_fs._upload_stream: finish occured on stream FileWriter: ', arg));
+            file_writer.on('close', arg => dbg.error('namespace_fs._upload_stream: close occured on stream FileWriter: ', arg));
+
+            const parser = params.source_stream?.socket?.parser;
+            if (parser) {
+                const { HTTPParser } = process.binding('http_parser');
+                parser[HTTPParser.kOnStreamAlloc] = length => {
+                    const bp = multi_buffer_pool.get_buffers_pool(length);
+                    const { buffer, callback } = bp.get_buffer_nowait();
+                    // @ts-ignore
+                    buffer.buffer_pool_release_callback = callback;
+                    return buffer;
+                };
+            }
             if (copy_source) {
-                // ChunkFS is a Transform stream, however read_object_stream expects a write stream. call resume to close the read part
-                // we need to close both read and write parts for Transform stream to properly close and release resorces
-                chunk_fs.resume();
-                await this.read_object_stream(copy_source, object_sdk, chunk_fs);
+                await this.read_object_stream(copy_source, object_sdk, file_writer);
             } else if (params.source_params) {
-                chunk_fs.resume();
-                await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
+                await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer);
             } else {
-                await stream_utils.pipeline([source_stream, chunk_fs]);
-                await stream_utils.wait_finished(chunk_fs);
+                await stream_utils.pipeline([params.source_stream, file_writer]);
+                await stream_utils.wait_finished(file_writer);
             }
-            return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
+            return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
         } catch (error) {
             dbg.error('_upload_stream had error: ', error);
             throw error;
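Two notes on this hunk. First, md5_enabled is now computed inline from config.NSFS_CALCULATE_MD5, this.force_md5_etag, and object_sdk?.requesting_account?.force_md5_etag, replacing the _is_force_md5_enabled helper call. Second, when the source stream exposes its HTTP parser, HTTPParser.kOnStreamAlloc is overridden so that request bodies are read straight into pooled buffers instead of fresh per-read allocations, and each buffer carries a buffer_pool_release_callback for returning it to the pool; process.binding('http_parser') is an internal, undocumented Node.js API, which is why the hook is applied only behind the if (parser) guard. The sketch below illustrates the { buffer, callback } contract that get_buffer_nowait() is used with above; it is an illustration of the idea, not the actual buffer_utils implementation.

// Sketch only: a tiny pool with get_buffer_nowait()-style semantics.
class TinyBufferPool {
    constructor(buf_size, count) {
        this.buf_size = buf_size;
        this.free = Array.from({ length: count }, () => Buffer.allocUnsafe(buf_size));
    }
    get_buffer_nowait() {
        // Never block the caller: fall back to a plain allocation when empty.
        const buffer = this.free.pop() || Buffer.allocUnsafe(this.buf_size);
        const callback = () => this.free.push(buffer); // release back to the pool
        return { buffer, callback };
    }
}

const pool = new TinyBufferPool(8 * 1024 * 1024, 4);
const { buffer, callback } = pool.get_buffer_nowait();
// ...fill `buffer` with incoming data and hand it down the pipeline...
callback(); // the consumer releases the buffer when done with it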

src/test/unit_tests/index.js
Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder');
 require('./test_mirror_writer');
 require('./test_namespace_fs');
 require('./test_ns_list_objects');
-require('./test_chunk_fs');
+require('./test_file_writer');
 require('./test_namespace_fs_mpu');
 require('./test_nb_native_fs');
 require('./test_s3select');

src/test/unit_tests/nc_index.js
Lines changed: 1 addition & 1 deletion

@@ -7,7 +7,7 @@ coretest.setup();
 
 require('./test_namespace_fs');
 require('./test_ns_list_objects');
-require('./test_chunk_fs');
+require('./test_file_writer');
 require('./test_namespace_fs_mpu');
 require('./test_nb_native_fs');
 require('./test_nc_nsfs_cli');

src/test/unit_tests/test_chunk_fs.js renamed to src/test/unit_tests/test_file_writer.js
Lines changed: 8 additions & 8 deletions

@@ -3,24 +3,24 @@
 'use strict';
 
 const mocha = require('mocha');
-const chunk_fs_hashing = require('../../tools/chunk_fs_hashing');
+const file_writer_hashing = require('../../tools/file_writer_hashing');
 
-mocha.describe('ChunkFS', function() {
+mocha.describe('FileWriter', function() {
     const RUN_TIMEOUT = 10 * 60 * 1000;
 
-    mocha.it('Concurrent ChunkFS with hash target', async function() {
+    mocha.it('Concurrent FileWriter with hash target', async function() {
         const self = this;
         self.timeout(RUN_TIMEOUT);
-        await chunk_fs_hashing.hash_target();
+        await file_writer_hashing.hash_target();
     });
 
-    mocha.it('Concurrent ChunkFS with file target', async function() {
+    mocha.it('Concurrent FileWriter with file target', async function() {
        const self = this;
        self.timeout(RUN_TIMEOUT);
-        await chunk_fs_hashing.file_target();
+        await file_writer_hashing.file_target();
     });
 
-    mocha.it('Concurrent ChunkFS with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L', async function() {
+    mocha.it('Concurrent FileWriter with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L', async function() {
         const self = this;
         self.timeout(RUN_TIMEOUT);
         // The goal of this test is to produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L

@@ -29,6 +29,6 @@ mocha.describe('ChunkFS', function() {
         // chunk size = 100, total_chunks_size after having 1024 chunks is = 100 * 1024 < config.NSFS_BUF_SIZE_L
         const chunk_size = 100;
         const parts_s = 50;
-        await chunk_fs_hashing.file_target(chunk_size, parts_s);
+        await file_writer_hashing.file_target(chunk_size, parts_s);
     });
 });
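A quick worked check of the third test's comment: with chunk_size = 100 bytes, reaching 1024 chunks means only 100 × 1024 = 102,400 bytes (about 100 KiB) in flight, well under config.NSFS_BUF_SIZE_L, so the test exercises the writer's behavior when the chunk count crosses 1024 while the total byte count stays small.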

src/tools/chunk_fs_hashing.js renamed to src/tools/file_writer_hashing.js
Lines changed: 15 additions & 23 deletions

@@ -3,7 +3,7 @@
 
 const crypto = require('crypto');
 const assert = require('assert');
-const ChunkFS = require('../util/chunk_fs');
+const FileWriter = require('../util/file_writer');
 const config = require('../../config');
 const nb_native = require('../util/nb_native');
 const stream_utils = require('../util/stream_utils');

@@ -19,7 +19,7 @@ const PARTS = Number(argv.parts) || 1000;
 const CONCURRENCY = Number(argv.concurrency) || 20;
 const CHUNK = Number(argv.chunk) || 16 * 1024;
 const PART_SIZE = Number(argv.part_size) || 20 * 1024 * 1024;
-const F_PREFIX = argv.dst_folder || '/tmp/chunk_fs_hashing/';
+const F_PREFIX = argv.dst_folder || '/tmp/file_writer_hashing/';
 
 const DEFAULT_FS_CONFIG = {
     uid: Number(argv.uid) || process.getuid(),

@@ -28,12 +28,6 @@ const DEFAULT_FS_CONFIG = {
     warn_threshold_ms: 100,
 };
 
-const DUMMY_RPC = {
-    object: {
-        update_endpoint_stats: (...params) => null
-    }
-};
-
 const XATTR_USER_PREFIX = 'user.';
 // TODO: In order to verify validity add content_md5_mtime as well
 const XATTR_MD5_KEY = XATTR_USER_PREFIX + 'content_md5';

@@ -75,25 +69,24 @@ async function hash_target() {
             }
         }());
         const target = new TargetHash();
-        const chunk_fs = new ChunkFS({
+        const file_writer = new FileWriter({
             target_file: target,
             fs_context: DEFAULT_FS_CONFIG,
-            rpc_client: DUMMY_RPC,
             namespace_resource_id: 'MajesticSloth'
         });
-        await stream_utils.pipeline([source_stream, chunk_fs]);
-        await stream_utils.wait_finished(chunk_fs);
+        await stream_utils.pipeline([source_stream, file_writer]);
+        await stream_utils.wait_finished(file_writer);
         const write_hash = target.digest();
         console.log(
             'Hash target',
-            `NativeMD5=${chunk_fs.digest}`,
+            `NativeMD5=${file_writer.digest}`,
             `DataWriteCryptoMD5=${write_hash}`,
             `DataOriginMD5=${content_md5}`,
         );
         assert.strictEqual(content_md5, write_hash);
         if (config.NSFS_CALCULATE_MD5) {
-            assert.strictEqual(chunk_fs.digest, content_md5);
-            assert.strictEqual(chunk_fs.digest, write_hash);
+            assert.strictEqual(file_writer.digest, content_md5);
+            assert.strictEqual(file_writer.digest, write_hash);
         }
     });
 }

@@ -113,32 +106,31 @@ async function file_target(chunk_size = CHUNK, parts = PARTS) {
                 yield data.slice(i, i + chunk_size);
             }
         }());
-        const chunk_fs = new ChunkFS({
+        const file_writer = new FileWriter({
             target_file,
             fs_context: DEFAULT_FS_CONFIG,
-            rpc_client: DUMMY_RPC,
             namespace_resource_id: 'MajesticSloth'
         });
-        await stream_utils.pipeline([source_stream, chunk_fs]);
-        await stream_utils.wait_finished(chunk_fs);
+        await stream_utils.pipeline([source_stream, file_writer]);
+        await stream_utils.wait_finished(file_writer);
         if (XATTR) {
             await target_file.replacexattr(
                 DEFAULT_FS_CONFIG,
-                assign_md5_to_fs_xattr(chunk_fs.digest, {})
+                assign_md5_to_fs_xattr(file_writer.digest, {})
             );
         }
         if (FSYNC) await target_file.fsync(DEFAULT_FS_CONFIG);
         const write_hash = crypto.createHash('md5').update(fs.readFileSync(F_TARGET)).digest('hex');
         console.log(
             'File target',
-            `NativeMD5=${chunk_fs.digest}`,
+            `NativeMD5=${file_writer.digest}`,
             `DataWriteMD5=${write_hash}`,
             `DataOriginMD5=${content_md5}`,
         );
         assert.strictEqual(content_md5, write_hash);
         if (config.NSFS_CALCULATE_MD5) {
-            assert.strictEqual(chunk_fs.digest, content_md5);
-            assert.strictEqual(chunk_fs.digest, write_hash);
+            assert.strictEqual(file_writer.digest, content_md5);
+            assert.strictEqual(file_writer.digest, write_hash);
         }
         // Leave parts on error
         fs.rmSync(F_TARGET);
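Both hash_target and file_target exercise the same round-trip pattern: generate random data, compute an MD5 over the origin, pipe the data through the writer, then assert that the writer's native digest matches both the origin digest and a digest of what was actually written. Below is a distilled version of that pattern using only core Node.js modules; the real tool pipes through FileWriter and additionally exercises xattrs and fsync.

const crypto = require('crypto');
const assert = require('assert');
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

async function verify_round_trip(chunk_size = 1024, parts = 16) {
    const data = crypto.randomBytes(chunk_size * parts);
    const origin_md5 = crypto.createHash('md5').update(data).digest('hex');

    // Stand-in for the writer's target: hash whatever actually arrives.
    const write_hash = crypto.createHash('md5');
    const sink = new Writable({
        write(chunk, encoding, callback) {
            write_hash.update(chunk);
            callback();
        }
    });

    // Feed the data in chunk_size slices, as the tool's generator does.
    await pipeline(Readable.from(function* () {
        for (let i = 0; i < data.length; i += chunk_size) {
            yield data.slice(i, i + chunk_size);
        }
    }()), sink);

    assert.strictEqual(write_hash.digest('hex'), origin_md5);
}

verify_round_trip().then(() => console.log('round trip OK'));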

src/util/chunk_fs.js

Lines changed: 0 additions & 111 deletions
This file was deleted.
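The deleted ChunkFS was a Transform stream, which is why the resume() workarounds removed from _upload_stream above were needed; its replacement is a plain Writable. The new src/util/file_writer.js itself (presumably the eighth changed file) is not rendered in this view, so the skeleton below is only a guess at its surface, inferred from the call sites in this commit: constructed with { target_file, fs_context, offset, md5_enabled, stats, bucket, large_buf_size, namespace_resource_id } and exposing digest and total_bytes once finished. target_file.writev(fs_context, buffers) is an assumed call, not a verified API.

const crypto = require('crypto');
const { Writable } = require('stream');

// Skeleton guess at the FileWriter surface; not the actual implementation.
class FileWriterSketch extends Writable {
    constructor({ target_file, fs_context, offset = 0, md5_enabled = false }) {
        super();
        this.target_file = target_file;
        this.fs_context = fs_context;
        this.offset = offset;
        this.total_bytes = 0;
        this.digest = undefined;
        this.md5 = md5_enabled ? crypto.createHash('md5') : undefined;
    }
    _write(chunk, encoding, callback) {
        if (this.md5) this.md5.update(chunk);
        this.total_bytes += chunk.length;
        // Assumed write call; the real writer batches pooled buffers and
        // tracks stats, offsets, and bucket context.
        Promise.resolve(this.target_file.writev(this.fs_context, [chunk]))
            .then(() => callback(), callback);
    }
    _final(callback) {
        if (this.md5) this.digest = this.md5.digest('hex');
        callback();
    }
}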
