Skip to content

Commit fe34ee2

Browse files
committed
NC | Concurrency & refactoring | Add delay, version move checks and GPFS refactoring
Signed-off-by: Romy <[email protected]>
1 parent e24e59a commit fe34ee2

File tree

4 files changed

+120
-41
lines changed

4 files changed

+120
-41
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ config.NSFS_SEM_WARNING_TIMEOUT = 10 * 60 * 1000;
766766
// number of rename retries in case of deleted destination directory
767767
config.NSFS_RENAME_RETRIES = 10;
768768
config.NSFS_MKDIR_PATH_RETRIES = 3;
769+
config.NSFS_RANDOM_DELAY_BASE = 70;
769770

770771
config.NSFS_VERSIONING_ENABLED = true;
771772
config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true;

src/sdk/namespace_fs.js

Lines changed: 79 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const { v4: uuidv4 } = require('uuid');
1212
const P = require('../util/promise');
1313
const dbg = require('../util/debug_module')(__filename);
1414
const config = require('../../config');
15+
const crypto = require('crypto');
1516
const s3_utils = require('../endpoint/s3/s3_utils');
1617
const error_utils = require('../util/error_utils');
1718
const stream_utils = require('../util/stream_utils');
@@ -293,6 +294,16 @@ function to_fs_xattr(xattr) {
293294
return _.mapKeys(xattr, (val, key) => XATTR_USER_PREFIX + key);
294295
}
295296

297+
/**
298+
* get_random_delay returns a random delay number between base + min and max
299+
* @param {number} base
300+
* @param {number} min
301+
* @param {number} max
302+
* @returns {number}
303+
*/
304+
function get_random_delay(base, min, max) {
305+
return base + crypto.randomInt(min, max);
306+
}
296307

297308
/**
298309
* @typedef {{
@@ -1391,9 +1402,10 @@ class NamespaceFS {
13911402
}
13921403

13931404
// 1. get latest version_id
1394-
// 2. if versioning is suspended -
1405+
// 2. if versioning is suspended -
13951406
// 2.1 if version ID of the latest version is null -
1396-
// 2.1.1 remove latest version
1407+
// 2.1.1. if it's POSIX backend - unlink the null version
1408+
// 2.1.2. if it's GPFS backend - nothing to do, the linkatif will override it
13971409
// 2.2 else (version ID of the latest version is unique or there is no latest version) -
13981410
// 2.2.1 remove a version (or delete marker) with null version ID from .versions/ (if exists)
13991411
// 3. if latest version exists -
@@ -1417,10 +1429,8 @@ class NamespaceFS {
14171429
const versioned_path = latest_ver_info && this._get_version_path(key, latest_ver_info.version_id_str);
14181430
const versioned_info = latest_ver_info && await this._get_version_info(fs_context, versioned_path);
14191431

1420-
gpfs_options = is_gpfs ?
1421-
await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
1422-
latest_ver_info, open_mode, undefined, versioned_info) :
1423-
undefined;
1432+
gpfs_options = await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
1433+
latest_ver_info, open_mode, undefined, versioned_info);
14241434
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
14251435
dbg.log1('Namespace_fs._move_to_dest_version:', latest_ver_info, new_ver_info, gpfs_options);
14261436

@@ -1439,10 +1449,10 @@ class NamespaceFS {
14391449
if (latest_ver_info &&
14401450
((this._is_versioning_enabled()) ||
14411451
(this._is_versioning_suspended() && latest_ver_info.version_id_str !== NULL_VERSION_ID))) {
1442-
dbg.log1('NamespaceFS._move_to_dest_version version ID of the latest version is a unique ID - the file will be moved it to .versions/ directory');
1452+
dbg.log1('NamespaceFS._move_to_dest_version version ID of the latet version is a unique ID - the file will be moved it to .versions/ directory');
14431453
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
14441454
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
1445-
gpfs_options && gpfs_options.move_to_versions, bucket_tmp_dir_path);
1455+
gpfs_options?.move_to_versions, bucket_tmp_dir_path);
14461456
}
14471457
try {
14481458
// move new version to latest_ver_path (key path)
@@ -1460,6 +1470,7 @@ class NamespaceFS {
14601470
dbg.warn(`NamespaceFS._move_to_dest_version retrying retries=${retries} should_retry=${should_retry}` +
14611471
` new_ver_tmp_path=${new_ver_tmp_path} latest_ver_path=${latest_ver_path}`, err);
14621472
if (!should_retry || retries <= 0) throw err;
1473+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
14631474
} finally {
14641475
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.move_to_dst, open_mode);
14651476
}
@@ -2680,9 +2691,18 @@ class NamespaceFS {
26802691
// if stat failed, undefined will return
26812692
}
26822693

2683-
// 1. if version exists in .versions/ folder - return its path
2684-
// 2. else if version is latest version - return latest version path
2685-
// 3. throw ENOENT error
2694+
/**
2695+
* _find_version_path returns the path of the version
2696+
* 1. if version_id is not defined, it returns the key file
2697+
* 2. else,
2698+
* 2.1. check version format
2699+
* 2.2. check if latest version exists and it matches the versio_id parameter the latest version path returns
2700+
* 2.3. else, return the version path under .versions/
2701+
* @param {import('./nb').NativeFSContext} fs_context
2702+
* @param {{key: string, version_id?: string}} params
2703+
* @param {boolean} [return_md_path]
2704+
* @returns {Promise<string>}
2705+
*/
26862706
async _find_version_path(fs_context, { key, version_id }, return_md_path) {
26872707
const cur_ver_path = return_md_path ? this._get_file_md_path({ key }) : this._get_file_path({ key });
26882708
if (!version_id) return cur_ver_path;
@@ -2754,15 +2774,15 @@ class NamespaceFS {
27542774

27552775
const deleted_latest = file_path === latest_version_path;
27562776
if (deleted_latest) {
2757-
gpfs_options = is_gpfs ?
2758-
await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true) :
2759-
undefined;
2777+
gpfs_options = await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true);
27602778
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
27612779
await native_fs_utils.safe_unlink(fs_context, file_path, version_info,
27622780
gpfs_options?.delete_version, bucket_tmp_dir_path);
2781+
await this._check_version_moved(fs_context, key, version_id, file_path);
27632782
return { ...version_info, latest: true };
27642783
} else {
27652784
await native_fs_utils.unlink_ignore_enoent(fs_context, file_path);
2785+
await this._check_version_moved(fs_context, key, version_id, file_path);
27662786
}
27672787
return version_info;
27682788
} catch (err) {
@@ -2776,6 +2796,7 @@ class NamespaceFS {
27762796
// 3. concurrent delete of this version - will get ENOENT, doing a retry will return successfully
27772797
// after we will see that the version was already deleted
27782798
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
2799+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
27792800
} finally {
27802801
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
27812802
}
@@ -2886,6 +2907,7 @@ class NamespaceFS {
28862907
}
28872908
if (err.code !== 'ENOENT') throw err;
28882909
dbg.warn(`NamespaceFS: _promote_version_to_latest failed retries=${retries}`, err);
2910+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
28892911
}
28902912
}
28912913
}
@@ -2920,19 +2942,17 @@ class NamespaceFS {
29202942

29212943
dbg.log1('Namespace_fs._delete_latest_version:', latest_ver_info, versioned_path, versioned_info);
29222944
if (latest_ver_info) {
2923-
gpfs_options = is_gpfs ?
2924-
await this._open_files_gpfs(fs_context, latest_ver_path,
2925-
undefined, undefined, undefined, undefined, true, versioned_info) :
2926-
undefined;
2945+
gpfs_options = await this._open_files_gpfs(fs_context, latest_ver_path, undefined, undefined, undefined,
2946+
undefined, true, versioned_info);
29272947

29282948
const suspended_and_latest_is_not_null = this._is_versioning_suspended() &&
29292949
latest_ver_info.version_id_str !== NULL_VERSION_ID;
29302950
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
29312951
if (this._is_versioning_enabled() || suspended_and_latest_is_not_null) {
29322952
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
2933-
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
2934-
gpfs_options && gpfs_options.delete_version, bucket_tmp_dir_path);
2935-
if (suspended_and_latest_is_not_null) {
2953+
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
2954+
gpfs_options?.delete_version, bucket_tmp_dir_path);
2955+
if (suspended_and_latest_is_not_null) {
29362956
// remove a version (or delete marker) with null version ID from .versions/ (if exists)
29372957
await this._delete_null_version_from_versions_directory(params.key, fs_context);
29382958
}
@@ -2948,6 +2968,7 @@ class NamespaceFS {
29482968
retries -= 1;
29492969
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
29502970
dbg.warn(`NamespaceFS._delete_latest_version: Retrying retries=${retries} latest_ver_path=${latest_ver_path}`, err);
2971+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
29512972
} finally {
29522973
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
29532974
}
@@ -2965,29 +2986,29 @@ class NamespaceFS {
29652986
// This function removes an object version or delete marker with a null version ID inside .version/ directory
29662987
async _delete_null_version_from_versions_directory(key, fs_context) {
29672988
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
2968-
let retries = config.NSFS_RENAME_RETRIES;
29692989
const null_versioned_path = this._get_version_path(key, NULL_VERSION_ID);
29702990
await this._check_path_in_bucket_boundaries(fs_context, null_versioned_path);
2971-
2991+
let gpfs_options;
2992+
let retries = config.NSFS_RENAME_RETRIES;
29722993
for (;;) {
29732994
try {
29742995
const null_versioned_path_info = await this._get_version_info(fs_context, null_versioned_path);
29752996
dbg.log1('Namespace_fs._delete_null_version_from_versions_directory:', null_versioned_path, null_versioned_path_info);
2976-
if (null_versioned_path_info) {
2977-
const gpfs_options = is_gpfs ?
2978-
await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined, undefined, true) :
2979-
undefined;
2980-
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
2981-
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
2982-
gpfs_options?.delete_version, bucket_tmp_dir_path);
2997+
if (!null_versioned_path_info) return;
29832998

2984-
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options?.delete_version, undefined, true);
2985-
}
2999+
gpfs_options = await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined,
3000+
undefined, true);
3001+
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
3002+
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
3003+
gpfs_options?.delete_version, bucket_tmp_dir_path);
29863004
break;
29873005
} catch (err) {
29883006
retries -= 1;
29893007
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
29903008
dbg.warn(`NamespaceFS._delete_null_version_from_versions_directory Retrying retries=${retries} null_versioned_path=${null_versioned_path}`, err);
3009+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
3010+
} finally {
3011+
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
29913012
}
29923013
}
29933014
}
@@ -3025,6 +3046,7 @@ class NamespaceFS {
30253046
return delete_marker_version_id;
30263047
}
30273048
dbg.warn(`NamespaceFS: _create_delete_marker failed retries=${retries}`, err);
3049+
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
30283050
} finally {
30293051
if (upload_params) await this.complete_object_upload_finally(undefined, undefined, upload_params.target_file, fs_context);
30303052
}
@@ -3070,6 +3092,8 @@ class NamespaceFS {
30703092
// eslint-disable-next-line max-params
30713093
async _open_files_gpfs(fs_context, src_path, dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info) {
30723094
dbg.log1('Namespace_fs._open_files_gpfs:', src_path, src_path && path.dirname(src_path), dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info);
3095+
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
3096+
if (!is_gpfs) return;
30733097

30743098
let src_file;
30753099
let dst_file;
@@ -3133,6 +3157,29 @@ class NamespaceFS {
31333157
}
31343158
}
31353159

3160+
/**
3161+
* _check_version_moved recieves cur_path and checks if the version moved
3162+
* 1. if cur_path is in .versions/ - the function will check if the verison moved to latest
3163+
* 2. if cur_path is the latest version - the function will check if the version moved to .versions/
3164+
* @param {import('./nb').NativeFSContext} fs_context
3165+
* @param {string} key
3166+
* @param {string} version_id
3167+
* @param {string} cur_path
3168+
*/
3169+
async _check_version_moved(fs_context, key, version_id, cur_path) {
3170+
const latest_version_path = this._get_file_path({ key });
3171+
const verisoned_path = this._get_version_path(key, version_id);
3172+
if (cur_path === latest_version_path) {
3173+
const verisoned_path_info = await this._get_version_info(fs_context, verisoned_path);
3174+
if (verisoned_path_info) throw error_utils.new_error_code('VERSION_MOVED', 'version file moved from latest to .versions/, retrying');
3175+
} else if (cur_path === verisoned_path) {
3176+
const latest_ver_info = await this._get_version_info(fs_context, latest_version_path);
3177+
if (latest_ver_info && latest_ver_info.version_id_str === version_id) throw error_utils.new_error_code('VERSION_MOVED', 'version file moved from .versions/ to latest, retrying');
3178+
} else {
3179+
throw new RpcError('INTERNAL_ERROR', `namespace_fs._check_version_moved failed, invalid cur_path=${cur_path}`);
3180+
}
3181+
}
3182+
31363183
async _throw_if_storage_class_not_supported(storage_class) {
31373184
if (!await this._is_storage_class_supported(storage_class)) {
31383185
throw new S3Error(S3Error.InvalidStorageClass);

src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
'use strict';
33

44
const path = require('path');
5-
const config = require('../../../../config');
65
const P = require('../../../util/promise');
76
const fs_utils = require('../../../util/fs_utils');
87
const NamespaceFS = require('../../../sdk/namespace_fs');
@@ -41,32 +40,31 @@ const nsfs = new NamespaceFS({
4140

4241
const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
4342
describe('test versioning concurrency', () => {
44-
const prior_value_of_nsfs_rename_retries = config.NSFS_RENAME_RETRIES;
4543

4644
beforeEach(async () => {
4745
await fs_utils.create_fresh_path(tmp_fs_path);
4846
});
4947

5048
afterEach(async () => {
5149
await fs_utils.folder_delete(tmp_fs_path);
52-
config.NSFS_RENAME_RETRIES = prior_value_of_nsfs_rename_retries;
5350
});
5451

5552
it('multiple puts of the same key', async () => {
5653
const bucket = 'bucket1';
5754
const key = 'key1';
5855
const failed_operations = [];
59-
for (let i = 0; i < 5; i++) {
56+
const num_of_concurrency = 10;
57+
for (let i = 0; i < num_of_concurrency; i++) {
6058
const random_data = Buffer.from(String(i));
6159
const body = buffer_utils.buffer_to_read_stream(random_data);
6260
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
6361
.catch(err => failed_operations.push(err));
6462
}
65-
await P.delay(1000);
63+
await P.delay(3000);
6664
expect(failed_operations.length).toBe(0);
6765
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
68-
expect(versions.objects.length).toBe(5);
69-
});
66+
expect(versions.objects.length).toBe(num_of_concurrency);
67+
}, 6000);
7068

7169
it('multiple delete version id and key', async () => {
7270
const bucket = 'bucket1';
@@ -163,7 +161,6 @@ describe('test versioning concurrency', () => {
163161
const failed_put_operations = [];
164162
const failed_head_operations = [];
165163
const number_of_iterations = 10;
166-
config.NSFS_RENAME_RETRIES = 40;
167164
for (let i = 0; i < number_of_iterations; i++) {
168165
const random_data = Buffer.from(String(i));
169166
const body = buffer_utils.buffer_to_read_stream(random_data);
@@ -181,6 +178,40 @@ describe('test versioning concurrency', () => {
181178
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
182179
expect(versions.objects.length).toBe(number_of_iterations + 1); // 1 version before + 10 versions concurrent
183180
});
181+
182+
it('concurrent delete objects by version id/latest', async () => {
183+
const bucket = 'bucket1';
184+
const key = 'key5';
185+
const delete_ver_res_arr = [];
186+
const delete_ver_err_arr = [];
187+
const delete_res_arr = [];
188+
const delete_err_arr = [];
189+
const initial_num_of_versions = 1;
190+
const versions_arr = await _upload_versions(bucket, key, initial_num_of_versions);
191+
192+
const version_id_to_delete = versions_arr[initial_num_of_versions - 1];
193+
const num_of_concurrency = initial_num_of_versions;
194+
for (let i = 0; i < num_of_concurrency; i++) {
195+
nsfs.delete_object({ bucket: bucket, key: key, version_id: version_id_to_delete }, DUMMY_OBJECT_SDK)
196+
.then(res => delete_ver_res_arr.push(res.deleted_version_id))
197+
.catch(err => delete_ver_err_arr.push(err));
198+
nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
199+
.then(res => delete_res_arr.push(res))
200+
.catch(err => delete_err_arr.push(err));
201+
}
202+
await P.delay(5000);
203+
expect(delete_ver_res_arr).toHaveLength(num_of_concurrency);
204+
expect(delete_res_arr).toHaveLength(num_of_concurrency);
205+
expect(delete_ver_err_arr).toHaveLength(0);
206+
expect(delete_err_arr).toHaveLength(0);
207+
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
208+
209+
expect(versions.objects).toHaveLength(num_of_concurrency);
210+
const num_of_delete_markers = (versions.objects.filter(version => version.delete_marker === true)).length;
211+
expect(num_of_delete_markers).toBe(num_of_concurrency);
212+
const num_of_latest_versions = (versions.objects.filter(version => version.is_latest === true)).length;
213+
expect(num_of_latest_versions).toBe(1);
214+
}, 6000);
184215
});
185216

186217
/**

0 commit comments

Comments
 (0)