Skip to content

5.15.8 | Bump version & backport #8435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "noobaa-core",
"version": "5.15.7",
"version": "5.15.8",
"license": "SEE LICENSE IN LICENSE",
"description": "",
"homepage": "https://github.com/noobaa/noobaa-core",
Expand Down
38 changes: 17 additions & 21 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ class NamespaceFS {
}

/**
* @param {nb.ObjectSDK} object_sdk
* @param {nb.ObjectSDK} object_sdk
* @returns {nb.NativeFSContext}
*/
prepare_fs_context(object_sdk) {
Expand Down Expand Up @@ -1056,7 +1056,9 @@ class NamespaceFS {
// end the stream
res.end();

await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal });
// in case of transform streams such as ChunkFS there is also a readable part. since we expect write stream
// and don't care about the readable part, set readable: false
await stream_utils.wait_finished(res, { readable: false, signal: object_sdk.abort_controller.signal });
object_sdk.throw_if_aborted();

dbg.log0('NamespaceFS: read_object_stream completed file', file_path, {
Expand Down Expand Up @@ -1174,9 +1176,7 @@ class NamespaceFS {
}

if (copy_res) {
if (copy_res === copy_status_enum.FALLBACK) {
params.copy_source.nsfs_copy_fallback();
} else {
if (copy_res !== copy_status_enum.FALLBACK) {
// open file after copy link/same inode should use read open mode
open_mode = config.NSFS_OPEN_READ_MODE;
if (copy_res === copy_status_enum.SAME_INODE) open_path = file_path;
Expand Down Expand Up @@ -1259,10 +1259,8 @@ class NamespaceFS {
let stat = await target_file.stat({ ...fs_context, disable_ctime_check: part_upload });
this._verify_encryption(params.encryption, this._get_encryption_info(stat));

// handle xattr
// assign user xattr on non copy / copy with xattr_copy header provided
const copy_xattr = params.copy_source && params.xattr_copy;
let fs_xattr = copy_xattr ? undefined : to_fs_xattr(params.xattr);
let fs_xattr = to_fs_xattr(params.xattr);

// assign noobaa internal xattr - content type, md5, versioning xattr
if (params.content_type) {
Expand Down Expand Up @@ -1305,7 +1303,6 @@ class NamespaceFS {

// when object is a dir, xattr are set on the folder itself and the content is in .folder file
if (is_dir_content) {
if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr);
await this._assign_dir_content_to_xattr(fs_context, fs_xattr, { ...params, size: stat.size }, copy_xattr);
}
stat = await nb_native().fs.stat({ ...fs_context, disable_ctime_check: part_upload }, file_path);
Expand All @@ -1317,12 +1314,11 @@ class NamespaceFS {
await native_fs_utils._make_path_dirs(file_path, fs_context);
const copy_xattr = params.copy_source && params.xattr_copy;

let fs_xattr = copy_xattr ? {} : to_fs_xattr(params.xattr) || {};
let fs_xattr = to_fs_xattr(params.xattr) || {};
if (params.content_type) {
fs_xattr = fs_xattr || {};
fs_xattr[XATTR_CONTENT_TYPE] = params.content_type;
}
if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr);

await this._assign_dir_content_to_xattr(fs_context, fs_xattr, params, copy_xattr);
// when .folder exist and it's no upload flow - .folder should be deleted if it exists
Expand All @@ -1338,13 +1334,6 @@ class NamespaceFS {
return upload_info;
}

async _get_copy_source_xattr(params, fs_context, fs_xattr) {
const is_source_dir = params.copy_source.key.endsWith('/');
const source_file_md_path = await this._find_version_path(fs_context, params.copy_source, is_source_dir);
const source_stat = await nb_native().fs.stat(fs_context, source_file_md_path);
return { ...source_stat.xattr, ...fs_xattr };
}

// move to dest GPFS (wt) / POSIX (w / undefined) - non part upload
async _move_to_dest(fs_context, source_path, dest_path, target_file, open_mode, key) {
let retries = config.NSFS_RENAME_RETRIES;
Expand Down Expand Up @@ -1475,7 +1464,7 @@ class NamespaceFS {
// Can be finetuned further on if needed and inserting the Semaphore logic inside
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
const { source_stream } = params;
const { source_stream, copy_source } = params;
try {
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
const md5_enabled = config.NSFS_CALCULATE_MD5 || (this.force_md5_etag ||
Expand All @@ -1491,8 +1480,14 @@ class NamespaceFS {
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
});
chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
if (copy_source) {
await this.read_object_stream(copy_source, object_sdk, chunk_fs);
} else if (params.source_params) {
await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
} else {
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
}
return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
} catch (error) {
dbg.error('_upload_stream had error: ', error);
Expand Down Expand Up @@ -1779,6 +1774,7 @@ class NamespaceFS {
upload_params.params.xattr = create_params_parsed.xattr;
upload_params.params.storage_class = create_params_parsed.storage_class;
upload_params.digest = MD5Async && (((await MD5Async.digest()).toString('hex')) + '-' + multiparts.length);
upload_params.params.content_type = create_params_parsed.content_type;

const upload_info = await this._finish_upload(upload_params);

Expand Down
33 changes: 18 additions & 15 deletions src/sdk/object_sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class ObjectSDK {
* in order to handle aborting requests gracefully. The `abort_controller` member will
* be used to signal async flows that abort was detected.
* @see {@link https://nodejs.org/docs/latest/api/globals.html#class-abortcontroller}
* @param {import('http').IncomingMessage} req
* @param {import('http').ServerResponse} res
* @param {import('http').IncomingMessage} req
* @param {import('http').ServerResponse} res
*/
setup_abort_controller(req, res) {
res.once('error', err => {
Expand Down Expand Up @@ -157,7 +157,7 @@ class ObjectSDK {
}

/**
* @param {string} name
* @param {string} name
* @returns {Promise<nb.Namespace>}
*/
async _get_bucket_namespace(name) {
Expand Down Expand Up @@ -268,7 +268,7 @@ class ObjectSDK {
return Boolean(fs_root_path || fs_root_path === '');
}

// validates requests for non nsfs buckets from accounts which are nsfs_only
// validates requests for non nsfs buckets from accounts which are nsfs_only
has_non_nsfs_bucket_access(account, ns) {
dbg.log1('validate_non_nsfs_bucket: ', account, ns?.write_resource?.resource);
if (!account) return false;
Expand Down Expand Up @@ -524,7 +524,7 @@ class ObjectSDK {
/**
* Calls the op and report time and error to stats collector.
* on_success can be added to update read/write stats (but on_success shouln't throw)
*
*
* @template T
* @param {{
* op_name: string;
Expand Down Expand Up @@ -642,7 +642,9 @@ class ObjectSDK {
params.content_type = source_md.content_type;
}
try {
if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith('noobaa-namespace'));
//omitBy iterates all xattr calling startsWith on them. this can include symbols such as XATTR_SORT_SYMBOL.
//in that case startsWith will not apply
if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith?.('noobaa-namespace'));
} catch (e) {
dbg.log3("Got an error while trying to omitBy param.xattr:", params.xattr, "error:", e);
}
Expand All @@ -658,19 +660,14 @@ class ObjectSDK {
params.copy_source.bucket = actual_source_ns.get_bucket(bucket);
params.copy_source.obj_id = source_md.obj_id;
params.copy_source.version_id = source_md.version_id;
if (source_ns instanceof NamespaceFS) {
params.copy_source.nsfs_copy_fallback = () => {
this._populate_nsfs_copy_fallback({ source_params, source_ns, params });
params.copy_source = null;
};
}
} else {
// source cannot be copied directly (different plaforms, accounts, etc.)
// set the source_stream to read from the copy source
// Source params need these for read operations
source_params.object_md = source_md;
source_params.obj_id = source_md.obj_id;
source_params.version_id = source_md.version_id;
source_params.bucket = actual_source_ns.get_bucket(bucket);
// param size is needed when doing an upload. Can be overrided during ranged writes
params.size = source_md.size;

Expand All @@ -684,7 +681,13 @@ class ObjectSDK {

// if the source namespace is NSFS then we need to pass the read_object_stream the read_stream
if (source_ns instanceof NamespaceFS) {
this._populate_nsfs_copy_fallback({ source_params, source_ns, params });
if (target_ns instanceof NamespaceFS) {
params.source_ns = actual_source_ns;
params.source_params = source_params;
} else {
//this._populate_nsfs_copy_fallback({ source_params, source_ns, params });
throw new Error('TODO fix _populate_nsfs_copy_fallback');
}
} else {
params.source_stream = await source_ns.read_object_stream(source_params, this);
}
Expand All @@ -701,9 +704,9 @@ class ObjectSDK {
}
}

// nsfs copy_object & server side copy consisted of link and a fallback to
// nsfs copy_object & server side copy consisted of link and a fallback to
// read stream and then upload stream
// nsfs copy object when can't server side copy - fallback directly
// nsfs copy object when can't server side copy - fallback directly
_populate_nsfs_copy_fallback({ source_ns, params, source_params }) {
const read_stream = new stream.PassThrough();
source_ns.read_object_stream(source_params, this, read_stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ s3tests_boto3/functional/test_s3.py::test_put_object_ifmatch_failed
s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_failed
s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_overwrite_existed_failed
s3tests_boto3/functional/test_s3.py::test_object_raw_authenticated_bucket_gone
s3tests_boto3/functional/test_s3.py::test_object_copy_to_itself_with_metadata
s3tests_boto3/functional/test_s3.py::test_object_copy_canned_acl
s3tests_boto3/functional/test_s3.py::test_object_copy_retaining_metadata
s3tests_boto3/functional/test_s3.py::test_object_copy_replacing_metadata
s3tests_boto3/functional/test_s3.py::test_object_copy_versioning_multipart_upload
s3tests_boto3/functional/test_s3.py::test_list_multipart_upload
s3tests_boto3/functional/test_s3.py::test_multipart_upload_missing_part
s3tests_boto3/functional/test_s3.py::test_multipart_upload_incorrect_etag
Expand Down
26 changes: 21 additions & 5 deletions src/test/unit_tests/test_namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1575,12 +1575,14 @@ mocha.describe('namespace_fs copy object', function() {
assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr), [XATTR_DIR_CONTENT]: `${read_md_res.size}` });
assert.equal(stream_content_type, read_md_res.content_type);

const copy_source = { bucket: upload_bkt, key: key1 };
await ns_tmp.upload_object({
bucket: upload_bkt,
key: key2,
copy_source: { bucket: upload_bkt, key: key1 },
copy_source: copy_source,
size: 100,
xattr_copy: true
xattr_copy: true,
xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk)
}, dummy_object_sdk);
const file_path2 = ns_tmp_bucket_path + '/' + key2;
xattr = await get_xattr(file_path2);
Expand Down Expand Up @@ -1615,12 +1617,14 @@ mocha.describe('namespace_fs copy object', function() {
assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr) });
assert.equal(stream_content_type, read_md_res.content_type);

const copy_source = { bucket: upload_bkt, key: src_key };
await ns_tmp.upload_object({
bucket: upload_bkt,
key: dst_key,
copy_source: { bucket: upload_bkt, key: src_key },
copy_source: copy_source,
size: 100,
xattr_copy: true,
xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk)
}, dummy_object_sdk);
const file_path2 = ns_tmp_bucket_path + '/' + dst_key;
xattr = await get_xattr(file_path2);
Expand Down Expand Up @@ -1656,12 +1660,14 @@ mocha.describe('namespace_fs copy object', function() {
assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr) });
assert.equal(stream_content_type, read_md_res.content_type);

const copy_source = { bucket: upload_bkt, key: src_key };
await ns_tmp.upload_object({
bucket: upload_bkt,
key: dst_key,
copy_source: { bucket: upload_bkt, key: src_key },
copy_source: copy_source,
size: 0,
xattr_copy: true
xattr_copy: true,
xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk)
}, dummy_object_sdk);
const file_path2 = ns_tmp_bucket_path + '/' + dst_key;
xattr = await get_xattr(file_path2);
Expand All @@ -1687,6 +1693,16 @@ mocha.describe('namespace_fs copy object', function() {

});

//simulates object_sdk.fix_copy_source_params filtering of source xattr for copy object tests
async function _get_source_copy_xattr(copy_source, source_ns, object_sdk) {
const read_md_res = await source_ns.read_object_md({
bucket: copy_source.bucket,
key: copy_source.key
}, object_sdk);
const res = _.omitBy(read_md_res.xattr, (val, name) => name.startsWith?.('noobaa-namespace'));
return res;
}

async function list_objects(ns, bucket, delimiter, prefix, dummy_object_sdk) {
const res = await ns.list_objects({
bucket: bucket,
Expand Down
Loading