Skip to content

Backports | 5.18.2 | NC | Lifecycle #8900

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,9 @@ config.NC_LIFECYCLE_RUN_DELAY_LIMIT_MINS = 2;
/** @type {'UTC' | 'LOCAL'} */
config.NC_LIFECYCLE_TZ = 'LOCAL';

config.NC_LIFECYCLE_LIST_BATCH_SIZE = 1000;
config.NC_LIFECYCLE_BUCKET_BATCH_SIZE = 10000;

config.NC_LIFECYCLE_GPFS_ILM_ENABLED = false;
////////// GPFS //////////
config.GPFS_DOWN_DELAY = 1000;
Expand Down
59 changes: 54 additions & 5 deletions src/manage_nsfs/health.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

const dbg = require('../util/debug_module')(__filename);
const _ = require('lodash');
const path = require('path');
const P = require('../util/promise');
const config = require('../../config');
const os_util = require('../util/os_utils');
Expand Down Expand Up @@ -104,6 +105,7 @@ class NSFSHealth {
this.all_bucket_details = options.all_bucket_details;
this.all_connection_details = options.all_connection_details;
this.notif_storage_threshold = options.notif_storage_threshold;
this.lifecycle = options.lifecycle;
this.config_fs = options.config_fs;
}

Expand Down Expand Up @@ -133,6 +135,7 @@ class NSFSHealth {
let account_details;
let connection_details;
let notif_storage_threshold_details;
let latest_lifecycle_run_status;
const endpoint_response_code = (endpoint_state && endpoint_state.response?.response_code) || 'UNKNOWN_ERROR';
const health_check_params = { service_status, pid, endpoint_response_code, config_directory_status };
const service_health = this._calc_health_status(health_check_params);
Expand All @@ -141,6 +144,8 @@ class NSFSHealth {
if (this.all_account_details) account_details = await this.get_account_status();
if (this.all_connection_details) connection_details = await this.get_connection_status();
if (this.notif_storage_threshold) notif_storage_threshold_details = this.get_notif_storage_threshold_status();
if (this.lifecycle) latest_lifecycle_run_status = await this.get_lifecycle_health_status();

const health = {
service_name: NOOBAA_SERVICE_NAME,
status: service_health,
Expand All @@ -164,7 +169,8 @@ class NSFSHealth {
error_type: health_errors_tyes.PERSISTENT,
},
connections_status: connection_details,
notif_storage_threshold_details
notif_storage_threshold_details,
latest_lifecycle_run_status
}
};
if (!this.all_account_details) delete health.checks.accounts_status;
Expand Down Expand Up @@ -333,10 +339,10 @@ class NSFSHealth {
};
}

async validate_config_dir_exists(path, type) {
const config_root_type_exists = await this.config_fs.validate_config_dir_exists(path);
async validate_config_dir_exists(config_dir_path, type) {
const config_root_type_exists = await this.config_fs.validate_config_dir_exists(config_dir_path);
if (!config_root_type_exists) {
dbg.log1(`Config directory type - ${type} is missing, ${path}`);
dbg.log1(`Config directory type - ${type} is missing, ${config_dir_path}`);
return {
invalid_storages: [],
valid_storages: []
Expand Down Expand Up @@ -446,6 +452,48 @@ class NSFSHealth {
return res;
}

/////////////////////////////
// LIFECYCLE HEALTH STATUS //
/////////////////////////////

/**
* get_lifecycle_health_status returns the lifecycle rules status based on the status of the latest lifecycle worker run
* on the same host
* @returns {Promise<object>}
*/
async get_lifecycle_health_status() {
const latest_lifecycle_run_status = await this.get_latest_lifecycle_run_status({ silent_if_missing: true });
if (!latest_lifecycle_run_status) return {};
return {
total_stats: latest_lifecycle_run_status.total_stats,
lifecycle_run_times: latest_lifecycle_run_status.lifecycle_run_times,
errors: latest_lifecycle_run_status.errors
};
}


/**
* get_latest_lifecycle_run_status returns the latest lifecycle run status
* latest run can be found by maxing the lifecycle log entry names, log entry name is the lifecycle_run_{timestamp}.json of the run
* @param {{silent_if_missing: boolean}} options
* @returns {Promise<object | undefined >}
*/
async get_latest_lifecycle_run_status(options) {
const { silent_if_missing = false } = options;
try {
const lifecycle_log_entries = await nb_native().fs.readdir(this.config_fs.fs_context, config.NC_LIFECYCLE_LOGS_DIR);
const latest_lifecycle_run = _.maxBy(lifecycle_log_entries, entry => entry.name);
const latest_lifecycle_run_status_path = path.join(config.NC_LIFECYCLE_LOGS_DIR, latest_lifecycle_run.name);
const latest_lifecycle_run_status = await this.config_fs.get_config_data(latest_lifecycle_run_status_path, options);
return latest_lifecycle_run_status;
} catch (err) {
if (err.code === 'ENOENT' && silent_if_missing) {
return;
}
throw err;
}
}

/**
* get_config_file_data_or_error_object return an object containing config_data or err_obj if error occurred
* @param {string} type
Expand Down Expand Up @@ -613,10 +661,11 @@ async function get_health_status(argv, config_fs) {
const all_bucket_details = get_boolean_or_string_value(argv.all_bucket_details);
const all_connection_details = get_boolean_or_string_value(argv.all_connection_details);
const notif_storage_threshold = get_boolean_or_string_value(argv.notif_storage_threshold);
const lifecycle = get_boolean_or_string_value(argv.lifecycle);

if (deployment_type === 'nc') {
const health = new NSFSHealth({ https_port,
all_account_details, all_bucket_details, all_connection_details, notif_storage_threshold, config_fs });
all_account_details, all_bucket_details, all_connection_details, notif_storage_threshold, lifecycle, config_fs });
const health_status = await health.nc_nsfs_health();
write_stdout_response(ManageCLIResponse.HealthStatus, health_status);
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/manage_nsfs/manage_nsfs_constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const VALID_OPTIONS_GLACIER = {
};

const VALID_OPTIONS_DIAGNOSE = {
'health': new Set([ 'https_port', 'deployment_type', 'all_account_details', 'all_bucket_details', 'all_connection_details', 'notif_storage_threshold', ...CLI_MUTUAL_OPTIONS]),
'health': new Set([ 'https_port', 'deployment_type', 'all_account_details', 'all_bucket_details', 'all_connection_details', 'notif_storage_threshold', 'lifecycle', ...CLI_MUTUAL_OPTIONS]),
'gather-logs': new Set([ CONFIG_ROOT_FLAG]),
'metrics': new Set([CONFIG_ROOT_FLAG])
};
Expand Down Expand Up @@ -150,6 +150,7 @@ const OPTION_TYPE = {
notif_storage_threshold: 'boolean',
https_port: 'number',
debug: 'number',
lifecycle: 'boolean',
// upgrade options
expected_version: 'string',
expected_hosts: 'string',
Expand Down
4 changes: 2 additions & 2 deletions src/manage_nsfs/manage_nsfs_events_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ NoobaaEvent.LIFECYCLE_FAILED = Object.freeze({
event_type: 'ERROR',
scope: 'NODE',
severity: 'ERROR',
state: 'DEGRADED',
state: 'HEALTHY',
});

NoobaaEvent.LIFECYCLE_TIMEOUT = Object.freeze({
Expand All @@ -455,7 +455,7 @@ NoobaaEvent.LIFECYCLE_TIMEOUT = Object.freeze({
event_type: 'ERROR',
scope: 'NODE',
severity: 'ERROR',
state: 'DEGRADED',
state: 'HEALTHY',
});

exports.NoobaaEvent = NoobaaEvent;
1 change: 1 addition & 0 deletions src/manage_nsfs/manage_nsfs_help_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ Flags:
--all_account_details <boolean> (optional) Set a flag for returning all account details.
--all_bucket_details <boolean> (optional) Set a flag for returning all bucket details.
--all_connection_details <boolean> (optional) Set a flag for returning all connection details.
--lifecycle <boolean> (optional) Set a flag for returning lifecycle details on the current host.
--debug <number> (optional) Use for increasing the log verbosity of health cli commands.
--config_root <string> (optional) Set Configuration files path for Noobaa standalon NSFS. (default config.NSFS_NC_DEFAULT_CONF_DIR)

Expand Down
114 changes: 64 additions & 50 deletions src/manage_nsfs/nc_lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const config_fs_options = { silent_if_missing: true };

const lifecycle_run_status = {
running_host: os.hostname(), lifecycle_run_times: {},
total_stats: _get_default_stats(), buckets_statuses: {}
total_stats: _get_default_stats(), buckets_statuses: {},
state: {is_finished: false}
};

let return_short_status = false;
Expand Down Expand Up @@ -137,13 +138,19 @@ async function run_lifecycle(config_fs, disable_service_validation) {
*/
async function process_buckets(config_fs, bucket_names, system_json) {
    const buckets_concurrency = 10; // TODO - think about it
    // keep running batch passes over all buckets until every bucket reports that
    // its lifecycle processing is finished (per-bucket state is set by process_bucket/process_rules)
    while (!lifecycle_run_status.state.is_finished) {
        await P.map_with_concurrency(buckets_concurrency, bucket_names, async bucket_name =>
            _call_op_and_update_status({
                bucket_name,
                op_name: TIMED_OPS.PROCESS_BUCKET,
                op_func: async () => process_bucket(config_fs, bucket_name, system_json)
            })
        );
        // the run is finished only when all buckets are finished; a bucket whose
        // state was not populated yet counts as unfinished
        lifecycle_run_status.state.is_finished = Object.values(lifecycle_run_status.buckets_statuses)
            .every(bucket_status => Boolean(bucket_status.state?.is_finished));
    }
}

/**
Expand All @@ -158,7 +165,10 @@ async function process_bucket(config_fs, bucket_name, system_json) {
const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json);
await object_sdk._simple_load_requesting_account();
const should_notify = notifications_util.should_notify_on_event(bucket_json, notifications_util.OP_TO_EVENT.lifecycle_delete.name);
if (!bucket_json.lifecycle_configuration_rules) return {};
if (!bucket_json.lifecycle_configuration_rules) {
lifecycle_run_status.buckets_statuses[bucket_json.name].state = {is_finished: true};
return;
}
await process_rules(config_fs, bucket_json, object_sdk, should_notify);
}

Expand All @@ -170,16 +180,31 @@ async function process_bucket(config_fs, bucket_name, system_json) {
*/
async function process_rules(config_fs, bucket_json, object_sdk, should_notify) {
try {
await P.all(_.map(bucket_json.lifecycle_configuration_rules,
async (lifecycle_rule, index) =>
await _call_op_and_update_status({
bucket_name: bucket_json.name,
rule_id: lifecycle_rule.id,
op_name: TIMED_OPS.PROCESS_RULE,
op_func: async () => process_rule(config_fs, lifecycle_rule, index, bucket_json, object_sdk, should_notify)
})
)
);
lifecycle_run_status.buckets_statuses[bucket_json.name].state ??= {};
const bucket_state = lifecycle_run_status.buckets_statuses[bucket_json.name].state;
bucket_state.num_processed_objects = 0;
while (!bucket_state.is_finished && bucket_state.num_processed_objects < config.NC_LIFECYCLE_BUCKET_BATCH_SIZE) {
await P.all(_.map(bucket_json.lifecycle_configuration_rules,
async (lifecycle_rule, index) =>
await _call_op_and_update_status({
bucket_name: bucket_json.name,
rule_id: lifecycle_rule.id,
op_name: TIMED_OPS.PROCESS_RULE,
op_func: async () => process_rule(config_fs,
lifecycle_rule,
index,
bucket_json,
object_sdk,
should_notify
)
})
)
);
bucket_state.is_finished = Object.values(lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses).reduce(
(acc, rule) => acc && (_.isEmpty(rule.state) || rule.state.is_finished),
true
);
}
} catch (err) {
dbg.error('process_rules failed with error', err, err.code, err.message);
}
Expand Down Expand Up @@ -268,7 +293,6 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
op_func: async () => abort_mpus(candidates, object_sdk)
});
}
await update_lifecycle_rules_last_sync(config_fs, bucket_json, rule_id, index);
} catch (err) {
dbg.error('process_rule failed with error', err, err.code, err.message);
}
Expand Down Expand Up @@ -350,8 +374,9 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
*/
async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
const candidates = { abort_mpu_candidates: [], delete_candidates: [] };
const rule_state = lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses[lifecycle_rule.id]?.state || {};
if (lifecycle_rule.expiration) {
candidates.delete_candidates = await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk);
candidates.delete_candidates = await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk, rule_state);
if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
const dm_candidates = await get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, bucket_json);
candidates.delete_candidates = candidates.delete_candidates.concat(dm_candidates);
Expand All @@ -365,6 +390,7 @@ async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_contex
candidates.abort_mpu_candidates = await get_candidates_by_abort_incomplete_multipart_upload_rule(
lifecycle_rule, bucket_json, object_sdk, fs_context);
}
lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses[lifecycle_rule.id].state = rule_state;
return candidates;
}

Expand All @@ -376,15 +402,10 @@ async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_contex
* @returns {boolean}
*/
function validate_rule_enabled(rule, bucket) {
const now = Date.now();
if (rule.status !== 'Enabled') {
dbg.log0('validate_rule_enabled: SKIP bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule), 'not Enabled');
return false;
}
if (rule.last_sync && now - rule.last_sync < config.LIFECYCLE_SCHEDULE_MIN) {
dbg.log0('validate_rule_enabled: SKIP bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule), 'now', now, 'last_sync', rule.last_sync, 'schedule min', config.LIFECYCLE_SCHEDULE_MIN);
return false;
}
return true;
}

Expand All @@ -410,12 +431,12 @@ function _get_lifecycle_object_info_from_list_object_entry(entry) {
* @param {Object} bucket_json
* @returns {Promise<Object[]>}
*/
async function get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk) {
async function get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk, rule_state) {
const is_gpfs = nb_native().fs.gpfs;
if (is_gpfs && config.NC_LIFECYCLE_GPFS_ILM_ENABLED) {
return get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_json);
} else {
return get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk);
return get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk, rule_state);
}
}

Expand All @@ -436,21 +457,34 @@ async function get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_jso
* @param {Object} bucket_json
* @returns {Promise<Object[]>}
*/
async function get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk) {
async function get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk, rule_state) {
const expiration = _get_expiration_time(lifecycle_rule.expiration);
if (expiration < 0) return [];
const filter_func = _build_lifecycle_filter({filter: lifecycle_rule.filter, expiration});

const filtered_objects = [];
// TODO list_objects does not accept a filter and works in batch sizes of 1000. should handle batching
// also should maybe create a helper function or add argument for a filter in list object
const objects_list = await object_sdk.list_objects({bucket: bucket_json.name, prefix: lifecycle_rule.filter?.prefix});
const objects_list = await object_sdk.list_objects({
bucket: bucket_json.name,
prefix: lifecycle_rule.filter?.prefix,
key_marker: rule_state.key_marker,
limit: config.NC_LIFECYCLE_LIST_BATCH_SIZE
});
objects_list.objects.forEach(obj => {
const object_info = _get_lifecycle_object_info_from_list_object_entry(obj);
if (filter_func(object_info)) {
filtered_objects.push({key: object_info.key});
}
});

const bucket_state = lifecycle_run_status.buckets_statuses[bucket_json.name].state;
bucket_state.num_processed_objects += objects_list.objects.length;
if (objects_list.is_truncated) {
rule_state.key_marker = objects_list.next_marker;
} else {
rule_state.is_finished = true;
}
return filtered_objects;

}
Expand Down Expand Up @@ -630,26 +664,6 @@ function _file_contain_tags(object_info, filter_tags) {
//////// STATUS HELPERS ////////
/////////////////////////////////

/**
* update_lifecycle_rules_last_sync updates the last sync time of the lifecycle rule
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @param {Object} bucket_json
* @param {String} rule_id
* @param {number} index
* @returns {Promise<Void>}
*/
async function update_lifecycle_rules_last_sync(config_fs, bucket_json, rule_id, index) {
bucket_json.lifecycle_configuration_rules[index].last_sync = Date.now();
const { num_objects_deleted = 0, num_mpu_aborted = 0 } =
lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses[rule_id].rule_stats;
// if (res.num_objects_deleted >= config.LIFECYCLE_BATCH_SIZE) should_rerun = true; // TODO - think if needed and add something about mpu abort
dbg.log0('nc_lifecycle.update_lifecycle_rules_last_sync Done bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') done deletion of objects per rule',
bucket_json.lifecycle_configuration_rules[index],
'time:', bucket_json.lifecycle_configuration_rules[index].last_sync,
'num_objects_deleted:', num_objects_deleted, 'num_mpu_aborted:', num_mpu_aborted);
await config_fs.update_bucket_config_file(bucket_json);
}

/**
* _call_op_and_update_status calls the op and report time and error to the lifecycle status.
*
Expand Down
Loading