Skip to content

Commit 6cd3731

Browse files
committed
NC | Concurrency & refactoring | Add delay, version move checks and GPFS refactoring
Signed-off-by: Romy <[email protected]>
1 parent e24e59a commit 6cd3731

File tree

4 files changed

+117
-42
lines changed

4 files changed

+117
-42
lines changed

config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ config.NSFS_SEM_WARNING_TIMEOUT = 10 * 60 * 1000;
766766
// number of rename retries in case of deleted destination directory
767767
config.NSFS_RENAME_RETRIES = 10;
768768
config.NSFS_MKDIR_PATH_RETRIES = 3;
769+
config.NSFS_DELETE_VERSION_RETRIES = 2;
769770

770771
config.NSFS_VERSIONING_ENABLED = true;
771772
config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true;

src/sdk/namespace_fs.js

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,9 +1391,10 @@ class NamespaceFS {
13911391
}
13921392

13931393
// 1. get latest version_id
1394-
// 2. if versioning is suspended -
1394+
// 2. if versioning is suspended -
13951395
// 2.1 if version ID of the latest version is null -
1396-
// 2.1.1 remove latest version
1396+
// 2.1.1. if it's POSIX backend - unlink the null version
1397+
// 2.1.2. if it's GPFS backend - nothing to do, the linkatif will override it
13971398
// 2.2 else (version ID of the latest version is unique or there is no latest version) -
13981399
// 2.2.1 remove a version (or delete marker) with null version ID from .versions/ (if exists)
13991400
// 3. if latest version exists -
@@ -1417,10 +1418,8 @@ class NamespaceFS {
14171418
const versioned_path = latest_ver_info && this._get_version_path(key, latest_ver_info.version_id_str);
14181419
const versioned_info = latest_ver_info && await this._get_version_info(fs_context, versioned_path);
14191420

1420-
gpfs_options = is_gpfs ?
1421-
await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
1422-
latest_ver_info, open_mode, undefined, versioned_info) :
1423-
undefined;
1421+
gpfs_options = await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
1422+
latest_ver_info, open_mode, undefined, versioned_info);
14241423
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
14251424
dbg.log1('Namespace_fs._move_to_dest_version:', latest_ver_info, new_ver_info, gpfs_options);
14261425

@@ -1439,13 +1438,20 @@ class NamespaceFS {
14391438
if (latest_ver_info &&
14401439
((this._is_versioning_enabled()) ||
14411440
(this._is_versioning_suspended() && latest_ver_info.version_id_str !== NULL_VERSION_ID))) {
1442-
dbg.log1('NamespaceFS._move_to_dest_version version ID of the latest version is a unique ID - the file will be moved it to .versions/ directory');
1441+
dbg.log1('NamespaceFS._move_to_dest_version version ID of the latet version is a unique ID - the file will be moved it to .versions/ directory');
1442+
await this._check_version_moved(fs_context, key, latest_ver_info.version_id_str, latest_ver_path);
14431443
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
14441444
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
1445-
gpfs_options && gpfs_options.move_to_versions, bucket_tmp_dir_path);
1445+
gpfs_options?.move_to_versions, bucket_tmp_dir_path);
14461446
}
14471447
try {
14481448
// move new version to latest_ver_path (key path)
1449+
if (latest_ver_info) {
1450+
await this._check_version_moved(fs_context, key, latest_ver_info.version_id_str, latest_ver_path);
1451+
} else {
1452+
const latest_ver_info_verification = await this._get_version_info(fs_context, latest_ver_path);
1453+
if (latest_ver_info_verification) throw new RpcError('VERSION_MOVED');
1454+
}
14491455
await native_fs_utils.safe_move(fs_context, new_ver_tmp_path, latest_ver_path, new_ver_info,
14501456
gpfs_options && gpfs_options.move_to_dst, bucket_tmp_dir_path);
14511457
} catch (err) {
@@ -1460,6 +1466,7 @@ class NamespaceFS {
14601466
dbg.warn(`NamespaceFS._move_to_dest_version retrying retries=${retries} should_retry=${should_retry}` +
14611467
` new_ver_tmp_path=${new_ver_tmp_path} latest_ver_path=${latest_ver_path}`, err);
14621468
if (!should_retry || retries <= 0) throw err;
1469+
await P.delay(100);
14631470
} finally {
14641471
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.move_to_dst, open_mode);
14651472
}
@@ -2680,9 +2687,18 @@ class NamespaceFS {
26802687
// if stat failed, undefined will return
26812688
}
26822689

2683-
// 1. if version exists in .versions/ folder - return its path
2684-
// 2. else if version is latest version - return latest version path
2685-
// 3. throw ENOENT error
2690+
/**
2691+
* _find_version_path returns the path of the version
2692+
* 1. if version_id is not defined, it returns the key file
2693+
* 2. else,
2694+
* 2.1. check version format
2695+
* 2.2. if the latest version exists and it matches the version_id parameter, the latest version path is returned
2696+
* 2.3. else, return the version path under .versions/
2697+
* @param {import('./nb').NativeFSContext} fs_context
2698+
* @param {{key: string, version_id?: string}} params
2699+
* @param {boolean} [return_md_path]
2700+
* @returns {Promise<string>}
2701+
*/
26862702
async _find_version_path(fs_context, { key, version_id }, return_md_path) {
26872703
const cur_ver_path = return_md_path ? this._get_file_md_path({ key }) : this._get_file_path({ key });
26882704
if (!version_id) return cur_ver_path;
@@ -2740,7 +2756,7 @@ class NamespaceFS {
27402756
*/
27412757
// we can use this function when versioning is enabled or suspended
27422758
async _delete_single_object_versioned(fs_context, key, version_id) {
2743-
let retries = config.NSFS_RENAME_RETRIES;
2759+
let retries = config.NSFS_DELETE_VERSION_RETRIES;
27442760
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
27452761
const latest_version_path = this._get_file_path({ key });
27462762
for (;;) {
@@ -2754,15 +2770,15 @@ class NamespaceFS {
27542770

27552771
const deleted_latest = file_path === latest_version_path;
27562772
if (deleted_latest) {
2757-
gpfs_options = is_gpfs ?
2758-
await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true) :
2759-
undefined;
2773+
gpfs_options = await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true);
27602774
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
27612775
await native_fs_utils.safe_unlink(fs_context, file_path, version_info,
27622776
gpfs_options?.delete_version, bucket_tmp_dir_path);
2777+
await this._check_version_moved(fs_context, key, version_id, file_path);
27632778
return { ...version_info, latest: true };
27642779
} else {
27652780
await native_fs_utils.unlink_ignore_enoent(fs_context, file_path);
2781+
await this._check_version_moved(fs_context, key, version_id, file_path);
27662782
}
27672783
return version_info;
27682784
} catch (err) {
@@ -2776,6 +2792,7 @@ class NamespaceFS {
27762792
// 3. concurrent delete of this version - will get ENOENT, doing a retry will return successfully
27772793
// after we will see that the version was already deleted
27782794
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
2795+
await P.delay(100);
27792796
} finally {
27802797
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
27812798
}
@@ -2886,6 +2903,7 @@ class NamespaceFS {
28862903
}
28872904
if (err.code !== 'ENOENT') throw err;
28882905
dbg.warn(`NamespaceFS: _promote_version_to_latest failed retries=${retries}`, err);
2906+
await P.delay(100);
28892907
}
28902908
}
28912909
}
@@ -2920,19 +2938,17 @@ class NamespaceFS {
29202938

29212939
dbg.log1('Namespace_fs._delete_latest_version:', latest_ver_info, versioned_path, versioned_info);
29222940
if (latest_ver_info) {
2923-
gpfs_options = is_gpfs ?
2924-
await this._open_files_gpfs(fs_context, latest_ver_path,
2925-
undefined, undefined, undefined, undefined, true, versioned_info) :
2926-
undefined;
2941+
gpfs_options = await this._open_files_gpfs(fs_context, latest_ver_path, undefined, undefined, undefined,
2942+
undefined, true, versioned_info);
29272943

29282944
const suspended_and_latest_is_not_null = this._is_versioning_suspended() &&
29292945
latest_ver_info.version_id_str !== NULL_VERSION_ID;
29302946
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
29312947
if (this._is_versioning_enabled() || suspended_and_latest_is_not_null) {
29322948
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
2933-
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
2934-
gpfs_options && gpfs_options.delete_version, bucket_tmp_dir_path);
2935-
if (suspended_and_latest_is_not_null) {
2949+
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
2950+
gpfs_options?.delete_version, bucket_tmp_dir_path);
2951+
if (suspended_and_latest_is_not_null) {
29362952
// remove a version (or delete marker) with null version ID from .versions/ (if exists)
29372953
await this._delete_null_version_from_versions_directory(params.key, fs_context);
29382954
}
@@ -2948,6 +2964,7 @@ class NamespaceFS {
29482964
retries -= 1;
29492965
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
29502966
dbg.warn(`NamespaceFS._delete_latest_version: Retrying retries=${retries} latest_ver_path=${latest_ver_path}`, err);
2967+
await P.delay(100);
29512968
} finally {
29522969
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
29532970
}
@@ -2965,29 +2982,29 @@ class NamespaceFS {
29652982
// This function removes an object version or delete marker with a null version ID inside the .versions/ directory
29662983
async _delete_null_version_from_versions_directory(key, fs_context) {
29672984
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
2968-
let retries = config.NSFS_RENAME_RETRIES;
29692985
const null_versioned_path = this._get_version_path(key, NULL_VERSION_ID);
29702986
await this._check_path_in_bucket_boundaries(fs_context, null_versioned_path);
2971-
2987+
let gpfs_options;
2988+
let retries = config.NSFS_RENAME_RETRIES;
29722989
for (;;) {
29732990
try {
29742991
const null_versioned_path_info = await this._get_version_info(fs_context, null_versioned_path);
29752992
dbg.log1('Namespace_fs._delete_null_version_from_versions_directory:', null_versioned_path, null_versioned_path_info);
2976-
if (null_versioned_path_info) {
2977-
const gpfs_options = is_gpfs ?
2978-
await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined, undefined, true) :
2979-
undefined;
2980-
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
2981-
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
2982-
gpfs_options?.delete_version, bucket_tmp_dir_path);
2993+
if (!null_versioned_path_info) return;
29832994

2984-
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options?.delete_version, undefined, true);
2985-
}
2995+
gpfs_options = await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined,
2996+
undefined, true);
2997+
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
2998+
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
2999+
gpfs_options?.delete_version, bucket_tmp_dir_path);
29863000
break;
29873001
} catch (err) {
29883002
retries -= 1;
29893003
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
29903004
dbg.warn(`NamespaceFS._delete_null_version_from_versions_directory Retrying retries=${retries} null_versioned_path=${null_versioned_path}`, err);
3005+
} finally {
3006+
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
3007+
await P.delay(100);
29913008
}
29923009
}
29933010
}
@@ -3025,6 +3042,7 @@ class NamespaceFS {
30253042
return delete_marker_version_id;
30263043
}
30273044
dbg.warn(`NamespaceFS: _create_delete_marker failed retries=${retries}`, err);
3045+
await P.delay(100);
30283046
} finally {
30293047
if (upload_params) await this.complete_object_upload_finally(undefined, undefined, upload_params.target_file, fs_context);
30303048
}
@@ -3070,6 +3088,8 @@ class NamespaceFS {
30703088
// eslint-disable-next-line max-params
30713089
async _open_files_gpfs(fs_context, src_path, dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info) {
30723090
dbg.log1('Namespace_fs._open_files_gpfs:', src_path, src_path && path.dirname(src_path), dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info);
3091+
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
3092+
if (!is_gpfs) return;
30733093

30743094
let src_file;
30753095
let dst_file;
@@ -3133,6 +3153,29 @@ class NamespaceFS {
31333153
}
31343154
}
31353155

3156+
/**
3157+
* _check_version_moved receives cur_path and checks if the version moved
3158+
* 1. if cur_path is in .versions/ - the function will check if the version moved to latest
3159+
* 2. if cur_path is the latest version - the function will check if the version moved to .versions/
3160+
* @param {import('./nb').NativeFSContext} fs_context
3161+
* @param {string} key
3162+
* @param {string} version_id
3163+
* @param {string} cur_path
3164+
*/
3165+
async _check_version_moved(fs_context, key, version_id, cur_path) {
3166+
const latest_version_path = this._get_file_path({ key });
3167+
const verisoned_path = this._get_version_path(key, version_id);
3168+
if (cur_path === latest_version_path) {
3169+
const verisoned_path_info = await this._get_version_info(fs_context, verisoned_path);
3170+
if (verisoned_path_info) throw error_utils.new_error_code('VERSION_MOVED', 'version file moved from latest to .versions/, retrying');
3171+
} else if (cur_path === verisoned_path) {
3172+
const latest_ver_info = await this._get_version_info(fs_context, latest_version_path);
3173+
if (latest_ver_info && latest_ver_info.version_id_str === version_id) throw error_utils.new_error_code('VERSION_MOVED', 'version file moved from .versions/ to latest, retrying');
3174+
} else {
3175+
throw new RpcError('INTERNAL_ERROR', `namespace_fs._check_version_moved failed, invalid cur_path=${cur_path}`);
3176+
}
3177+
}
3178+
31363179
async _throw_if_storage_class_not_supported(storage_class) {
31373180
if (!await this._is_storage_class_supported(storage_class)) {
31383181
throw new S3Error(S3Error.InvalidStorageClass);

src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
'use strict';
33

44
const path = require('path');
5-
const config = require('../../../../config');
65
const P = require('../../../util/promise');
76
const fs_utils = require('../../../util/fs_utils');
87
const NamespaceFS = require('../../../sdk/namespace_fs');
@@ -41,32 +40,31 @@ const nsfs = new NamespaceFS({
4140

4241
const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
4342
describe('test versioning concurrency', () => {
44-
const prior_value_of_nsfs_rename_retries = config.NSFS_RENAME_RETRIES;
4543

4644
beforeEach(async () => {
4745
await fs_utils.create_fresh_path(tmp_fs_path);
4846
});
4947

5048
afterEach(async () => {
5149
await fs_utils.folder_delete(tmp_fs_path);
52-
config.NSFS_RENAME_RETRIES = prior_value_of_nsfs_rename_retries;
5350
});
5451

5552
it('multiple puts of the same key', async () => {
5653
const bucket = 'bucket1';
5754
const key = 'key1';
5855
const failed_operations = [];
59-
for (let i = 0; i < 5; i++) {
56+
const num_of_concurrency = 10;
57+
for (let i = 0; i < num_of_concurrency; i++) {
6058
const random_data = Buffer.from(String(i));
6159
const body = buffer_utils.buffer_to_read_stream(random_data);
6260
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
6361
.catch(err => failed_operations.push(err));
6462
}
65-
await P.delay(1000);
63+
await P.delay(3000);
6664
expect(failed_operations.length).toBe(0);
6765
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
68-
expect(versions.objects.length).toBe(5);
69-
});
66+
expect(versions.objects.length).toBe(num_of_concurrency);
67+
}, 6000);
7068

7169
it('multiple delete version id and key', async () => {
7270
const bucket = 'bucket1';
@@ -163,7 +161,6 @@ describe('test versioning concurrency', () => {
163161
const failed_put_operations = [];
164162
const failed_head_operations = [];
165163
const number_of_iterations = 10;
166-
config.NSFS_RENAME_RETRIES = 40;
167164
for (let i = 0; i < number_of_iterations; i++) {
168165
const random_data = Buffer.from(String(i));
169166
const body = buffer_utils.buffer_to_read_stream(random_data);
@@ -181,6 +178,40 @@ describe('test versioning concurrency', () => {
181178
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
182179
expect(versions.objects.length).toBe(number_of_iterations + 1); // 1 version before + 10 versions concurrent
183180
});
181+
182+
it('concurrent delete objects by version id/latest', async () => {
183+
const bucket = 'bucket1';
184+
const key = 'key5';
185+
const delete_ver_res_arr = [];
186+
const delete_ver_err_arr = [];
187+
const delete_res_arr = [];
188+
const delete_err_arr = [];
189+
const initial_num_of_versions = 1;
190+
const versions_arr = await _upload_versions(bucket, key, initial_num_of_versions);
191+
192+
const version_id_to_delete = versions_arr[initial_num_of_versions - 1];
193+
const num_of_concurrency = initial_num_of_versions;
194+
for (let i = 0; i < num_of_concurrency; i++) {
195+
nsfs.delete_object({ bucket: bucket, key: key, version_id: version_id_to_delete }, DUMMY_OBJECT_SDK)
196+
.then(res => delete_ver_res_arr.push(res.deleted_version_id))
197+
.catch(err => delete_ver_err_arr.push(err));
198+
nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
199+
.then(res => delete_res_arr.push(res))
200+
.catch(err => delete_err_arr.push(err));
201+
}
202+
await P.delay(5000);
203+
expect(delete_ver_res_arr).toHaveLength(num_of_concurrency);
204+
expect(delete_res_arr).toHaveLength(num_of_concurrency);
205+
expect(delete_ver_err_arr).toHaveLength(0);
206+
expect(delete_err_arr).toHaveLength(0);
207+
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
208+
209+
expect(versions.objects).toHaveLength(num_of_concurrency);
210+
const num_of_delete_markers = (versions.objects.filter(version => version.delete_marker === true)).length;
211+
expect(num_of_delete_markers).toBe(num_of_concurrency);
212+
const num_of_latest_versions = (versions.objects.filter(version => version.is_latest === true)).length;
213+
expect(num_of_latest_versions).toBe(1);
214+
}, 6000);
184215
});
185216

186217
/**

0 commit comments

Comments
 (0)