NC | Concurrency & refactoring | Add delay, version move checks and GPFS refactoring #8419

Merged: 1 commit, merged on Oct 6, 2024
1 change: 1 addition & 0 deletions config.js
@@ -766,6 +766,7 @@ config.NSFS_SEM_WARNING_TIMEOUT = 10 * 60 * 1000;
// number of rename retries in case of deleted destination directory
config.NSFS_RENAME_RETRIES = 10;
config.NSFS_MKDIR_PATH_RETRIES = 3;
config.NSFS_RANDOM_DELAY_BASE = 70;

config.NSFS_VERSIONING_ENABLED = true;
config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true;
116 changes: 80 additions & 36 deletions src/sdk/namespace_fs.js
@@ -12,6 +12,7 @@ const { v4: uuidv4 } = require('uuid');
const P = require('../util/promise');
const dbg = require('../util/debug_module')(__filename);
const config = require('../../config');
const crypto = require('crypto');
const s3_utils = require('../endpoint/s3/s3_utils');
const error_utils = require('../util/error_utils');
const stream_utils = require('../util/stream_utils');
@@ -293,6 +294,16 @@ function to_fs_xattr(xattr) {
return _.mapKeys(xattr, (val, key) => XATTR_USER_PREFIX + key);
}

/**
* get_random_delay returns a random delay between base + min and base + max (upper bound exclusive)
* @param {number} base
* @param {number} min
* @param {number} max
* @returns {number}
*/
function get_random_delay(base, min, max) {
return base + crypto.randomInt(min, max);
}
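A minimal usage sketch (illustrative, not part of this diff): with the default config.NSFS_RANDOM_DELAY_BASE of 70 and crypto.randomInt(0, 50), which excludes its upper bound, each retry below sleeps between 70 and 119 ms.

// Illustrative sketch of the jittered retry delay used by the retry loops added below.
const delay_ms = get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50); // 70 <= delay_ms <= 119
await P.delay(delay_ms); // awaited inside the async retry loops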

/**
* @typedef {{
@@ -1391,9 +1402,10 @@ class NamespaceFS {
}

// 1. get latest version_id
// 2. if versioning is suspended -
// 2.1 if version ID of the latest version is null -
// 2.1.1 remove latest version
// 2.1.1. if it's POSIX backend - unlink the null version
// 2.1.2. if it's GPFS backend - nothing to do, the linkatif will override it
// 2.2 else (version ID of the latest version is unique or there is no latest version) -
// 2.2.1 remove a version (or delete marker) with null version ID from .versions/ (if exists)
// 3. if latest version exists -
@@ -1417,10 +1429,8 @@
const versioned_path = latest_ver_info && this._get_version_path(key, latest_ver_info.version_id_str);
const versioned_info = latest_ver_info && await this._get_version_info(fs_context, versioned_path);

gpfs_options = is_gpfs ?
await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
latest_ver_info, open_mode, undefined, versioned_info) :
undefined;
gpfs_options = await this._open_files_gpfs(fs_context, new_ver_tmp_path, latest_ver_path, upload_file,
latest_ver_info, open_mode, undefined, versioned_info);
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
dbg.log1('Namespace_fs._move_to_dest_version:', latest_ver_info, new_ver_info, gpfs_options);

@@ -1442,7 +1452,7 @@
dbg.log1('NamespaceFS._move_to_dest_version version ID of the latest version is a unique ID - the file will be moved to the .versions/ directory');
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
gpfs_options && gpfs_options.move_to_versions, bucket_tmp_dir_path);
gpfs_options?.move_to_versions, bucket_tmp_dir_path);
}
try {
// move new version to latest_ver_path (key path)
@@ -1457,9 +1467,10 @@
} catch (err) {
retries -= 1;
const should_retry = native_fs_utils.should_retry_link_unlink(is_gpfs, err);
dbg.warn(`NamespaceFS._move_to_dest_version retrying retries=${retries} should_retry=${should_retry}` +
dbg.warn(`NamespaceFS._move_to_dest_version error: retries=${retries} should_retry=${should_retry}` +
` new_ver_tmp_path=${new_ver_tmp_path} latest_ver_path=${latest_ver_path}`, err);
if (!should_retry || retries <= 0) throw err;
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
} finally {
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.move_to_dst, open_mode);
}
@@ -2680,9 +2691,18 @@ class NamespaceFS {
// if stat failed, undefined will return
}

// 1. if version exists in .versions/ folder - return its path
// 2. else if version is latest version - return latest version path
// 3. throw ENOENT error
/**
* _find_version_path returns the path of the requested version
* 1. if version_id is not defined, return the key file path
* 2. else,
* 2.1. validate the version_id format
* 2.2. if the latest version exists and matches the version_id parameter, return the latest version path
* 2.3. else, return the version path under .versions/
* @param {import('./nb').NativeFSContext} fs_context
* @param {{key: string, version_id?: string}} params
* @param {boolean} [return_md_path]
* @returns {Promise<string>}
*/
async _find_version_path(fs_context, { key, version_id }, return_md_path) {
const cur_ver_path = return_md_path ? this._get_file_md_path({ key }) : this._get_file_path({ key });
if (!version_id) return cur_ver_path;
@@ -2726,6 +2746,11 @@
}

/**
* _delete_single_object_versioned does the following -
* if the deleted version is the latest - try to delete it from the latest version location
* if the deleted version is in .versions/ - unlink the version
* we call _check_version_moved() because under concurrent puts the version might move between the latest location and .versions/
* if the version moved, we retry
* @param {nb.NativeFSContext} fs_context
* @param {string} key
* @param {string} version_id
@@ -2738,7 +2763,6 @@
* latest?: boolean;
* }>}
*/
// we can use this function when versioning is enabled or suspended
async _delete_single_object_versioned(fs_context, key, version_id) {
let retries = config.NSFS_RENAME_RETRIES;
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
@@ -2754,19 +2778,19 @@

const deleted_latest = file_path === latest_version_path;
if (deleted_latest) {
gpfs_options = is_gpfs ?
await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true) :
undefined;
gpfs_options = await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true);
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
await native_fs_utils.safe_unlink(fs_context, file_path, version_info,
gpfs_options?.delete_version, bucket_tmp_dir_path);
await this._check_version_moved(fs_context, key, version_id);
return { ...version_info, latest: true };
} else {
await native_fs_utils.unlink_ignore_enoent(fs_context, file_path);
await this._check_version_moved(fs_context, key, version_id);
}
return version_info;
} catch (err) {
dbg.warn(`NamespaceFS._delete_single_object_versioned: retrying retries=${retries} file_path=${file_path}`, err);
dbg.warn(`NamespaceFS._delete_single_object_versioned error: retries=${retries} file_path=${file_path}`, err);
retries -= 1;
// there are a few concurrency scenarios that might happen for which we should retry -
// 1. the version id is the latest, a concurrent put might move the version from being the latest to .versions/ -
@@ -2776,6 +2800,7 @@
// 3. concurrent delete of this version - will get ENOENT; retrying will return successfully
// after we see that the version was already deleted
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
} finally {
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
}
@@ -2878,14 +2903,15 @@
max_past_ver_info, bucket_tmp_dir_path);
break;
} catch (err) {
dbg.warn(`NamespaceFS: _promote_version_to_latest failed error: retries=${retries}`, err);
retries -= 1;
if (retries <= 0) throw err;
if (!native_fs_utils._is_gpfs(fs_context) && err.code === 'EEXIST') {
dbg.warn('Namespace_fs._delete_version_id: latest version exists - skipping');
return;
}
if (err.code !== 'ENOENT') throw err;
dbg.warn(`NamespaceFS: _promote_version_to_latest failed retries=${retries}`, err);
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
}
}
}
@@ -2920,18 +2946,16 @@

dbg.log1('Namespace_fs._delete_latest_version:', latest_ver_info, versioned_path, versioned_info);
if (latest_ver_info) {
gpfs_options = is_gpfs ?
await this._open_files_gpfs(fs_context, latest_ver_path,
undefined, undefined, undefined, undefined, true, versioned_info) :
undefined;
gpfs_options = await this._open_files_gpfs(fs_context, latest_ver_path, undefined, undefined, undefined,
undefined, true, versioned_info);

const suspended_and_latest_is_not_null = this._is_versioning_suspended() &&
latest_ver_info.version_id_str !== NULL_VERSION_ID;
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
if (this._is_versioning_enabled() || suspended_and_latest_is_not_null) {
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
gpfs_options && gpfs_options.delete_version, bucket_tmp_dir_path);
gpfs_options?.delete_version, bucket_tmp_dir_path);
if (suspended_and_latest_is_not_null) {
// remove a version (or delete marker) with null version ID from .versions/ (if exists)
await this._delete_null_version_from_versions_directory(params.key, fs_context);
@@ -2945,9 +2969,10 @@
}
break;
} catch (err) {
dbg.warn(`NamespaceFS._delete_latest_version: Retrying retries=${retries} latest_ver_path=${latest_ver_path}`, err);
dbg.warn(`NamespaceFS._delete_latest_version error: retries=${retries} latest_ver_path=${latest_ver_path}`, err);
retries -= 1;
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
} finally {
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
}
@@ -2965,29 +2990,29 @@
// This function removes an object version or delete marker with a null version ID inside .version/ directory
async _delete_null_version_from_versions_directory(key, fs_context) {
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
let retries = config.NSFS_RENAME_RETRIES;
const null_versioned_path = this._get_version_path(key, NULL_VERSION_ID);
await this._check_path_in_bucket_boundaries(fs_context, null_versioned_path);

let gpfs_options;
let retries = config.NSFS_RENAME_RETRIES;
for (;;) {
try {
const null_versioned_path_info = await this._get_version_info(fs_context, null_versioned_path);
dbg.log1('Namespace_fs._delete_null_version_from_versions_directory:', null_versioned_path, null_versioned_path_info);
if (null_versioned_path_info) {
const gpfs_options = is_gpfs ?
await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined, undefined, true) :
undefined;
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
gpfs_options?.delete_version, bucket_tmp_dir_path);
if (!null_versioned_path_info) return;

if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options?.delete_version, undefined, true);
}
gpfs_options = await this._open_files_gpfs(fs_context, null_versioned_path, undefined, undefined, undefined,
undefined, true);
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
await native_fs_utils.safe_unlink(fs_context, null_versioned_path, null_versioned_path_info,
gpfs_options?.delete_version, bucket_tmp_dir_path);
break;
} catch (err) {
dbg.warn(`NamespaceFS._delete_null_version_from_versions_directory error: retries=${retries} null_versioned_path=${null_versioned_path}`, err);
retries -= 1;
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
dbg.warn(`NamespaceFS._delete_null_version_from_versions_directory Retrying retries=${retries} null_versioned_path=${null_versioned_path}`, err);
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
} finally {
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
}
}
}
@@ -3018,13 +3043,14 @@
await nb_native().fs.rename(fs_context, upload_params.upload_path, file_path);
return delete_marker_version_id;
} catch (err) {
dbg.warn(`NamespaceFS: _create_delete_marker failed error: retries=${retries}`, err);
retries -= 1;
if (retries <= 0) throw err;
if (err.code === 'EEXIST') {
dbg.warn(`NamespaceFS: _create_delete_marker already exists, success`, err);
return delete_marker_version_id;
}
dbg.warn(`NamespaceFS: _create_delete_marker failed retries=${retries}`, err);
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
} finally {
if (upload_params) await this.complete_object_upload_finally(undefined, undefined, upload_params.target_file, fs_context);
}
@@ -3070,6 +3096,8 @@
// eslint-disable-next-line max-params
async _open_files_gpfs(fs_context, src_path, dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info) {
dbg.log1('Namespace_fs._open_files_gpfs:', src_path, src_path && path.dirname(src_path), dst_path, upload_or_dir_file, dst_ver_info, open_mode, delete_version, versioned_info);
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
if (!is_gpfs) return;

let src_file;
let dst_file;
@@ -3133,6 +3161,22 @@
}
}
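The GPFS refactoring moves the backend check into _open_files_gpfs itself: the helper now returns undefined early on non-GPFS backends, so the call sites above collapse from a ternary into a plain await. A sketch of the simplified call-site pattern, based on the call sites changed in this diff:

// Before the refactor, every caller guarded the GPFS-only helper itself:
//   gpfs_options = is_gpfs ?
//       await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true) :
//       undefined;
// After the refactor, callers simply await it; non-GPFS backends get undefined back:
gpfs_options = await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true);
if (gpfs_options) {
    // GPFS-only file handles are open here and are closed via _close_files_gpfs in the finally blocks.
}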

/**
* _check_version_moved receives a key and a version_id and checks whether the version still exists in one of its possible locations -
* the latest version location or the .versions/ directory
* @param {import('./nb').NativeFSContext} fs_context
* @param {string} key
* @param {string} version_id
*/
async _check_version_moved(fs_context, key, version_id) {
const latest_version_path = this._get_file_path({ key });
const versioned_path = this._get_version_path(key, version_id);
const versioned_path_info = await this._get_version_info(fs_context, versioned_path);
if (versioned_path_info) throw error_utils.new_error_code('VERSION_MOVED', `version file moved from latest ${latest_version_path} to .versions/ ${versioned_path}, retrying`);
const latest_ver_info = await this._get_version_info(fs_context, latest_version_path);
if (latest_ver_info && latest_ver_info.version_id_str === version_id) throw error_utils.new_error_code('VERSION_MOVED', `version file moved from .versions/ ${versioned_path} to latest ${latest_version_path}, retrying`);
}
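A condensed sketch of how _check_version_moved drives the retry loops above; it assumes should_retry_link_unlink (in native_fs_utils, not shown in this diff) treats the VERSION_MOVED error as retryable:

// Sketch: unlink a version, then verify a concurrent put did not move it; retry with jitter if it did.
let retries = config.NSFS_RENAME_RETRIES;
for (;;) {
    try {
        await native_fs_utils.unlink_ignore_enoent(fs_context, file_path);
        await this._check_version_moved(fs_context, key, version_id); // throws VERSION_MOVED if the version relocated
        break;
    } catch (err) {
        retries -= 1;
        if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
        await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
    }
}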

async _throw_if_storage_class_not_supported(storage_class) {
if (!await this._is_storage_class_supported(storage_class)) {
throw new S3Error(S3Error.InvalidStorageClass);
@@ -3,7 +3,6 @@
'use strict';

const path = require('path');
const config = require('../../../../config');
const P = require('../../../util/promise');
const fs_utils = require('../../../util/fs_utils');
const NamespaceFS = require('../../../sdk/namespace_fs');
@@ -42,23 +41,21 @@ const nsfs = new NamespaceFS({

const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
describe('test versioning concurrency', () => {
const prior_value_of_nsfs_rename_retries = config.NSFS_RENAME_RETRIES;

beforeEach(async () => {
await fs_utils.create_fresh_path(tmp_fs_path);
});

afterEach(async () => {
await fs_utils.folder_delete(tmp_fs_path);
config.NSFS_RENAME_RETRIES = prior_value_of_nsfs_rename_retries;
});

it('multiple puts of the same key', async () => {
const bucket = 'bucket1';
const key = 'key1';
const failed_operations = [];
const successful_operations = [];
const num_of_concurrency = 5;
const num_of_concurrency = 10;
for (let i = 0; i < num_of_concurrency; i++) {
const random_data = Buffer.from(String(i));
const body = buffer_utils.buffer_to_read_stream(random_data);
@@ -169,7 +166,6 @@
const failed_put_operations = [];
const failed_head_operations = [];
const number_of_iterations = 10;
config.NSFS_RENAME_RETRIES = 40;
for (let i = 0; i < number_of_iterations; i++) {
const random_data = Buffer.from(String(i));
const body = buffer_utils.buffer_to_read_stream(random_data);
@@ -187,6 +183,7 @@
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
expect(versions.objects.length).toBe(number_of_iterations + 1); // 1 version before + 10 versions concurrent
});

it('concurrent puts & delete latest objects', async () => {
const bucket = 'bucket1';
const key = 'key3';
@@ -255,8 +252,7 @@ describe('test versioning concurrency', () => {
expect(num_of_latest_versions).toBe(1);
}, 6000);

// currently being skipped because it's not passing - probably a bug that we need to fix
it.skip('concurrent delete objects by version id/latest', async () => {
it('concurrent delete objects by version id/latest', async () => {
const bucket = 'bucket1';
const key = 'key5';
const delete_ver_res_arr = [];