Backports to 5.17.2 #8605

Merged: 4 commits, merged on Dec 17, 2024

2 changes: 2 additions & 0 deletions config.js
@@ -872,6 +872,8 @@ config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;
// anonymous account name
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';

config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;

////////////////////////////
// NSFS NON CONTAINERIZED //
////////////////////////////
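For reference, a minimal sketch of how an upload path might consult a memory threshold like the one added above; the helper name and decision logic are illustrative assumptions, not code from this PR:

const config = require('./config');

// Hypothetical helper: bodies at or below the threshold can be held in a single
// memory buffer, while larger bodies should be streamed to avoid memory pressure.
function choose_upload_strategy(content_length) {
    if (content_length <= config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD) return 'buffer_in_memory';
    return 'stream_to_target';
}

console.log(choose_upload_strategy(4 * 1024 * 1024));  // 'buffer_in_memory'
console.log(choose_upload_strategy(64 * 1024 * 1024)); // 'stream_to_target'
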
12 changes: 11 additions & 1 deletion src/endpoint/endpoint.js
@@ -109,7 +109,17 @@ async function main(options = {}) {
// the primary just forks and returns, workers will continue to serve
fork_count = options.forks ?? config.ENDPOINT_FORKS;
const metrics_port = options.metrics_port || config.EP_METRICS_SERVER_PORT;
- if (fork_utils.start_workers(metrics_port, fork_count)) return;
+ /**
+  * Note that main() can run in two modes:
+  * 1. Only the primary process runs main() (fork count is 0 or undefined), so everything
+  *    implemented here runs in that single process.
+  * 2. A primary process with multiple forks (IMPORTANT): logic that is relevant only to
+  *    the primary process must be implemented in fork_utils.start_workers, because the
+  *    primary process returns right after start_workers while the forks continue
+  *    executing the rest of this function.
+  */
+ const is_workers_started_from_primary = await fork_utils.start_workers(metrics_port, fork_count);
+ if (is_workers_started_from_primary) return;

const http_port = options.http_port || config.ENDPOINT_PORT;
const https_port = options.https_port || config.ENDPOINT_SSL_PORT;
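To make the two modes concrete, here is an illustrative sketch of the primary/worker contract using Node's cluster module directly; fork_utils.start_workers wraps similar logic, but this shape is an assumption, not the actual implementation:

const cluster = require('cluster');

async function start_workers_sketch(metrics_port, fork_count) {
    if (fork_count > 0 && cluster.isPrimary) {
        // Primary-only setup belongs here, before returning true,
        // because main() returns as soon as this resolves to true.
        for (let i = 0; i < fork_count; i += 1) cluster.fork();
        return true;
    }
    // Forks (and the no-fork case) get false and continue through main().
    return false;
}
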
15 changes: 15 additions & 0 deletions src/endpoint/s3/s3_errors.js
@@ -534,6 +534,21 @@ S3Error.InvalidEncodingType = Object.freeze({
message: 'Invalid Encoding Method specified in Request',
http_code: 400,
});
S3Error.AuthorizationQueryParametersError = Object.freeze({
code: 'AuthorizationQueryParametersError',
message: 'X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds',
http_code: 400,
});
S3Error.RequestExpired = Object.freeze({
code: 'AccessDenied',
message: 'Request has expired',
http_code: 403,
});
S3Error.RequestNotValidYet = Object.freeze({
code: 'AccessDenied',
message: 'request is not valid yet',
http_code: 403,
});

////////////////////////////////////////////////////////////////
// S3 Select //
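A minimal sketch of how query-signature validation might map onto the three errors added above; the helper and its parameters are illustrative, and only the new S3Error(...) pattern is taken from this codebase:

const S3Error = require('./s3_errors').S3Error;
const MAX_EXPIRES_SECONDS = 7 * 24 * 60 * 60; // 604800 seconds = one week

function validate_presigned_window(amz_date_ms, expires_seconds, now_ms = Date.now()) {
    // X-Amz-Expires above the one-week limit is rejected outright.
    if (expires_seconds > MAX_EXPIRES_SECONDS) throw new S3Error(S3Error.AuthorizationQueryParametersError);
    // A signature dated in the future is not valid yet.
    if (now_ms < amz_date_ms) throw new S3Error(S3Error.RequestNotValidYet);
    // Past the signing date plus the expiry window, the request has expired.
    if (now_ms > amz_date_ms + (expires_seconds * 1000)) throw new S3Error(S3Error.RequestExpired);
}
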
12 changes: 10 additions & 2 deletions src/manage_nsfs/diagnose.js
@@ -59,9 +59,17 @@ async function gather_metrics() {
const buffer = await buffer_utils.read_stream_join(res);
const body = buffer.toString('utf8');
metrics_output = JSON.parse(body);
+ if (!metrics_output) throw new Error('received empty metrics response', { cause: res.statusCode });
+ write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
+ } else if (res.statusCode >= 500 && res.rawHeaders.includes('application/json')) {
+     const buffer = await buffer_utils.read_stream_join(res);
+     const body = buffer.toString('utf8');
+     const error_output = JSON.parse(body);
+     if (!error_output) throw new Error('received empty metrics response', { cause: res.statusCode });
+     throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, ...error_output });
+ } else {
+     throw new Error('received empty metrics response', { cause: res.statusCode });
+ }
- if (!metrics_output) throw new Error('recieved empty metrics response', { cause: res.statusCode });
- write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
} catch (err) {
dbg.warn('could not receive metrics response', err);
throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, cause: err?.errors?.[0] || err });
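One note on the content-type check above: res.rawHeaders is a flat [name, value, name, value, ...] array, so includes('application/json') is a loose match against any header value. An equivalent, more explicit check would be (a sketch, not part of this PR):

function is_json_response(res) {
    // IncomingMessage lowercases header names in res.headers.
    return (res.headers['content-type'] || '').startsWith('application/json');
}
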
29 changes: 16 additions & 13 deletions src/sdk/namespace_fs.js
@@ -19,7 +19,7 @@ const stream_utils = require('../util/stream_utils');
const buffer_utils = require('../util/buffer_utils');
const size_utils = require('../util/size_utils');
const native_fs_utils = require('../util/native_fs_utils');
- const ChunkFS = require('../util/chunk_fs');
+ const FileWriter = require('../util/file_writer');
const LRUCache = require('../util/lru_cache');
const nb_native = require('../util/nb_native');
const RpcError = require('../rpc/rpc_error');
@@ -1563,30 +1563,33 @@ class NamespaceFS {
// Can be finetuned further on if needed and inserting the Semaphore logic inside
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
- const { source_stream, copy_source } = params;
+ const { copy_source } = params;
try {
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
const md5_enabled = this._is_force_md5_enabled(object_sdk);
- const chunk_fs = new ChunkFS({
+ const file_writer = new FileWriter({
      target_file,
      fs_context,
-     stats: this.stats,
-     namespace_resource_id: this.namespace_resource_id,
-     md5_enabled,
      offset,
+     md5_enabled,
+     stats: this.stats,
      bucket: params.bucket,
-     large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
+     large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size,
+     namespace_resource_id: this.namespace_resource_id,
  });
- chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
+ file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occurred on FileWriter: ', err));
+ file_writer.on('finish', arg => dbg.log1('namespace_fs._upload_stream: finish occurred on stream FileWriter: ', arg));
+ file_writer.on('close', arg => dbg.log1('namespace_fs._upload_stream: close occurred on stream FileWriter: ', arg));

if (copy_source) {
- await this.read_object_stream(copy_source, object_sdk, chunk_fs);
+ await this.read_object_stream(copy_source, object_sdk, file_writer);
} else if (params.source_params) {
- await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
+ await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer);
} else {
- await stream_utils.pipeline([source_stream, chunk_fs]);
- await stream_utils.wait_finished(chunk_fs);
+ await stream_utils.pipeline([params.source_stream, file_writer]);
+ await stream_utils.wait_finished(file_writer);
}
- return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
+ return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
} catch (error) {
dbg.error('_upload_stream had error: ', error);
throw error;
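Condensed from the diff above, the FileWriter usage pattern is: construct the writer over the open target file, pipe the source stream into it, wait for it to finish, then read the digest and byte count. A stripped-down restatement, with the constructor options reduced to the ones shown in this change:

const FileWriter = require('../util/file_writer');
const stream_utils = require('../util/stream_utils');

async function upload_stream_sketch({ source_stream, target_file, fs_context, offset, md5_enabled }) {
    const file_writer = new FileWriter({ target_file, fs_context, offset, md5_enabled });
    file_writer.on('error', err => console.error('upload_stream_sketch failed:', err));
    // pipeline() streams the data; wait_finished() resolves once all writes are flushed.
    await stream_utils.pipeline([source_stream, file_writer]);
    await stream_utils.wait_finished(file_writer);
    return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
}
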
27 changes: 20 additions & 7 deletions src/server/analytic_services/prometheus_reporting.js
@@ -61,12 +61,7 @@ async function start_server(
const server = http.createServer(async (req, res) => {
// Serve all metrics on the root path for system that do have one or more fork running.
if (fork_enabled) {
- const metrics = await aggregatorRegistry.clusterMetrics();
- if (req.url === '' || req.url === '/') {
-     res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
-     res.end(metrics);
-     return;
- }
+ // we would like this part to be first, as clusterMetrics might fail
if (req.url === '/metrics/nsfs_stats') {
res.writeHead(200, { 'Content-Type': 'text/plain' });
const nsfs_report = {
@@ -77,6 +72,24 @@
res.end(JSON.stringify(nsfs_report));
return;
}
+ let metrics;
+ try {
+     metrics = await aggregatorRegistry.clusterMetrics();
+ } catch (err) {
+     dbg.error('start_server: Could not get the metrics, got an error', err);
+     res.writeHead(504, { 'Content-Type': 'application/json' });
+     const reply = JSON.stringify({
+         error: 'Internal server error - timeout',
+         message: 'Looks like the server is taking a long time to respond (Could not get the metrics)',
+     });
+     res.end(reply);
+     return;
+ }
+ if (req.url === '' || req.url === '/') {
+     res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
+     res.end(metrics);
+     return;
+ }
// Serve report's metrics on the report name path
const report_name = req.url.substr(1);
const single_metrics = export_single_metrics(metrics, report_name);
@@ -165,7 +178,7 @@ async function metrics_nsfs_stats_handler() {
op_stats_counters: op_stats_counters,
fs_worker_stats_counters: fs_worker_stats_counters
};
- dbg.log1(`_create_nsfs_report: nsfs_report ${nsfs_report}`);
+ dbg.log1('_create_nsfs_report: nsfs_report', nsfs_report);
return JSON.stringify(nsfs_report);
}

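An illustrative way to probe the reordered behavior (the port and base URL are assumptions; see EP_METRICS_SERVER_PORT in config.js): the nsfs stats route should answer even when aggregating fork metrics fails and the root path returns the 504 JSON above.

const fetch = require('node-fetch');

async function probe_metrics(base_url = 'http://localhost:7004') { // port is an assumption
    const stats_res = await fetch(`${base_url}/metrics/nsfs_stats`);
    console.log('nsfs_stats:', stats_res.status); // expected 200, served before clusterMetrics runs
    const root_res = await fetch(`${base_url}/`);
    console.log('all metrics:', root_res.status); // 200, or 504 with a JSON body if clusterMetrics failed
}

probe_metrics().catch(console.error);
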
2 changes: 1 addition & 1 deletion src/test/unit_tests/index.js
@@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder');
require('./test_mirror_writer');
require('./test_namespace_fs');
require('./test_ns_list_objects');
- require('./test_chunk_fs');
+ require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_s3select');
2 changes: 2 additions & 0 deletions src/test/unit_tests/nc_coretest.js
@@ -96,6 +96,8 @@ function setup(options = {}) {
});

// TODO - run health
// wait 2 seconds before announcing nc coretest is ready
await P.delay(2000);
await announce(`nc coretest ready... (took ${((Date.now() - start) / 1000).toFixed(1)} sec)`);
});

2 changes: 1 addition & 1 deletion src/test/unit_tests/nc_index.js
@@ -7,7 +7,7 @@ coretest.setup();

require('./test_namespace_fs');
require('./test_ns_list_objects');
- require('./test_chunk_fs');
+ require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_nc_nsfs_cli');
134 changes: 133 additions & 1 deletion src/test/unit_tests/test_bucketspace.js
@@ -1,5 +1,5 @@
/* Copyright (C) 2020 NooBaa */
- /*eslint max-lines: ["error", 2200]*/
+ /*eslint max-lines: ["error", 2500]*/
/*eslint max-lines-per-function: ["error", 1300]*/
/*eslint max-statements: ["error", 80, { "ignoreTopLevelFunctions": true }]*/
'use strict';
@@ -12,9 +12,15 @@ const util = require('util');
const http = require('http');
const mocha = require('mocha');
const assert = require('assert');
const http_utils = require('../../util/http_utils');
const config = require('../../../config');
const fs_utils = require('../../util/fs_utils');
const { JSON_SUFFIX } = require('../../sdk/config_fs');
const fetch = require('node-fetch');
const P = require('../../util/promise');
const cloud_utils = require('../../util/cloud_utils');
const SensitiveString = require('../../util/sensitive_string');
const S3Error = require('../../../src/endpoint/s3/s3_errors').S3Error;
const test_utils = require('../system_tests/test_utils');
const { stat, open } = require('../../util/nb_native')().fs;
const { get_process_fs_context } = require('../../util/native_fs_utils');
@@ -2116,3 +2122,129 @@ async function delete_anonymous_account(accounts_dir_path, account_config_path) {
console.log('Anonymous account Deleted');
}

mocha.describe('Presigned URL tests', function() {
this.timeout(50000); // eslint-disable-line no-invalid-this
const nsr = 'presigned_url_nsr';
const account_name = 'presigned_url_account';
const fs_path = path.join(TMP_PATH, 'presigned_url_tests/');
const presigned_url_bucket = 'presigned-url-bucket';
const presigned_url_object = 'presigned-url-object.txt';
const presigned_body = 'presigned_body';
let s3_client;
let access_key;
let secret_key;
CORETEST_ENDPOINT = coretest.get_http_address();
let valid_default_presigned_url;
let presigned_url_params;

mocha.before(async function() {
await fs_utils.create_fresh_path(fs_path);
await rpc_client.pool.create_namespace_resource({ name: nsr, nsfs_config: { fs_root_path: fs_path } });
const new_buckets_path = is_nc_coretest ? fs_path : '/';
const nsfs_account_config = {
uid: process.getuid(), gid: process.getgid(), new_buckets_path, nsfs_only: true
};
const account_params = { ...new_account_params, email: `${account_name}@noobaa.io`, name: account_name, default_resource: nsr, nsfs_account_config };
const res = await rpc_client.account.create_account(account_params);
access_key = res.access_keys[0].access_key;
secret_key = res.access_keys[0].secret_key;
s3_client = generate_s3_client(access_key.unwrap(), secret_key.unwrap(), CORETEST_ENDPOINT);
await s3_client.createBucket({ Bucket: presigned_url_bucket });
await s3_client.putObject({ Bucket: presigned_url_bucket, Key: presigned_url_object, Body: presigned_body });

presigned_url_params = {
bucket: new SensitiveString(presigned_url_bucket),
key: presigned_url_object,
endpoint: CORETEST_ENDPOINT,
access_key: access_key,
secret_key: secret_key
};
valid_default_presigned_url = cloud_utils.get_signed_url(presigned_url_params);
});

mocha.after(async function() {
if (!is_nc_coretest) return;
await s3_client.deleteObject({ Bucket: presigned_url_bucket, Key: presigned_url_object });
await s3_client.deleteBucket({ Bucket: presigned_url_bucket });
await rpc_client.account.delete_account({ email: `${account_name}@noobaa.io` });
await fs_utils.folder_delete(fs_path);
});

it('fetch valid presigned URL - 604800 seconds - epoch expiry - should return object data', async () => {
const data = await fetchData(valid_default_presigned_url);
assert.equal(data, presigned_body);
});

it('fetch valid presigned URL - 604800 seconds - should return object data - with valid date + expiry in seconds', async () => {
const now = new Date();
const valid_url_with_date = valid_default_presigned_url + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + 604800;
const data = await fetchData(valid_url_with_date);
assert.equal(data, presigned_body);
});

it('fetch invalid presigned URL - 604800 seconds - epoch expiry + with future date', async () => {
const now = new Date();
// Add one hour (3600000 milliseconds)
const one_hour_in_ms = 60 * 60 * 1000;
const one_hour_from_now = new Date(now.getTime() + one_hour_in_ms);
const future_presigned_url = valid_default_presigned_url + '&X-Amz-Date=' + one_hour_from_now.toISOString();
const expected_err = new S3Error(S3Error.RequestNotValidYet);
await assert_throws_async(fetchData(future_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - 604800 expiry seconds + with future date', async () => {
const now = new Date();
// Add one hour (3600000 milliseconds)
const one_hour_in_ms = 60 * 60 * 1000;
const one_hour_from_now = new Date(now.getTime() + one_hour_in_ms);
const future_presigned_url = valid_default_presigned_url + '&X-Amz-Date=' + one_hour_from_now.toISOString() + '&X-Amz-Expires=' + 604800;
const expected_err = new S3Error(S3Error.RequestNotValidYet);
await assert_throws_async(fetchData(future_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - 604800 seconds - epoch expiry - URL expired', async () => {
const expired_presigned_url = cloud_utils.get_signed_url(presigned_url_params, 1);
// wait for 2 seconds before fetching the url
await P.delay(2000);
const expected_err = new S3Error(S3Error.RequestExpired);
await assert_throws_async(fetchData(expired_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - 604800 expiry seconds - URL expired', async () => {
const now = new Date();
const expired_presigned_url = cloud_utils.get_signed_url(presigned_url_params, 1) + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + 1;
// wait for 2 seconds before fetching the url
await P.delay(2000);
const expected_err = new S3Error(S3Error.RequestExpired);
await assert_throws_async(fetchData(expired_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - epoch expiry - expiry bigger than limit', async () => {
const invalid_expiry = 604800 + 10;
const invalid_expiry_presigned_url = cloud_utils.get_signed_url(presigned_url_params, invalid_expiry);
const expected_err = new S3Error(S3Error.AuthorizationQueryParametersError);
await assert_throws_async(fetchData(invalid_expiry_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - expiry bigger than limit', async () => {
const now = new Date();
const invalid_expiry = 604800 + 10;
const invalid_expiry_presigned_url = cloud_utils.get_signed_url(presigned_url_params, invalid_expiry) + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + invalid_expiry;
const expected_err = new S3Error(S3Error.AuthorizationQueryParametersError);
await assert_throws_async(fetchData(invalid_expiry_presigned_url), expected_err.message);
});
});

async function fetchData(presigned_url) {
const response = await fetch(presigned_url, { agent: new http.Agent({ keepAlive: false }) });
let data;
if (!response.ok) {
data = (await response.text()).trim();
const err_json = (await http_utils.parse_xml_to_js(data)).Error;
const err = new Error(err_json.Message);
err.code = err_json.Code;
throw err;
}
data = await response.text();
return data.trim();
}
34 changes: 0 additions & 34 deletions src/test/unit_tests/test_chunk_fs.js

This file was deleted.
