NC | lifecycle | expire objects with batching #8897

Merged (1 commit, Mar 25, 2025)
config.js (3 changes: 3 additions & 0 deletions)
@@ -1032,6 +1032,9 @@ config.NC_LIFECYCLE_RUN_DELAY_LIMIT_MINS = 2;
 /** @type {'UTC' | 'LOCAL'} */
 config.NC_LIFECYCLE_TZ = 'LOCAL';
 
+config.NC_LIFECYCLE_LIST_BATCH_SIZE = 1000;
+config.NC_LIFECYCLE_BUCKET_BATCH_SIZE = 10000;
+
 config.NC_LIFECYCLE_GPFS_ILM_ENABLED = false;
 ////////// GPFS //////////
 config.GPFS_DOWN_DELAY = 1000;
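Note: the two new settings work as a pair. NC_LIFECYCLE_LIST_BATCH_SIZE caps how many keys a single list_objects call returns, while NC_LIFECYCLE_BUCKET_BATCH_SIZE caps how many objects one bucket processes before control returns to the outer per-bucket loop (see nc_lifecycle.js below). An illustrative check of the defaults, not code from this PR:

    // With the defaults above, one bucket is scanned in at most
    // ceil(10000 / 1000) = 10 list pages per pass of the outer loop.
    const pages_per_pass = Math.ceil(
        config.NC_LIFECYCLE_BUCKET_BATCH_SIZE / config.NC_LIFECYCLE_LIST_BATCH_SIZE
    ); // -> 10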
src/manage_nsfs/manage_nsfs_events_utils.js (4 changes: 2 additions & 2 deletions)
@@ -444,7 +444,7 @@ NoobaaEvent.LIFECYCLE_FAILED = Object.freeze({
     event_type: 'ERROR',
     scope: 'NODE',
     severity: 'ERROR',
-    state: 'DEGRADED',
+    state: 'HEALTHY',
 });
 
 NoobaaEvent.LIFECYCLE_TIMEOUT = Object.freeze({
@@ -455,7 +455,7 @@ NoobaaEvent.LIFECYCLE_TIMEOUT = Object.freeze({
     event_type: 'ERROR',
     scope: 'NODE',
     severity: 'ERROR',
-    state: 'DEGRADED',
+    state: 'HEALTHY',
 });
 
 exports.NoobaaEvent = NoobaaEvent;
src/manage_nsfs/nc_lifecycle.js (114 changes: 64 additions & 50 deletions)
@@ -31,7 +31,8 @@ const config_fs_options = { silent_if_missing: true };
 
 const lifecycle_run_status = {
     running_host: os.hostname(), lifecycle_run_times: {},
-    total_stats: _get_default_stats(), buckets_statuses: {}
+    total_stats: _get_default_stats(), buckets_statuses: {},
+    state: {is_finished: false}
 };
 
 let return_short_status = false;
@@ -137,13 +138,19 @@ async function run_lifecycle(config_fs, disable_service_validation) {
  */
 async function process_buckets(config_fs, bucket_names, system_json) {
     const buckets_concurrency = 10; // TODO - think about it
-    await P.map_with_concurrency(buckets_concurrency, bucket_names, async bucket_name =>
-        await _call_op_and_update_status({
-            bucket_name,
-            op_name: TIMED_OPS.PROCESS_BUCKET,
-            op_func: async () => process_bucket(config_fs, bucket_name, system_json)
-        })
-    );
+    while (!lifecycle_run_status.state.is_finished) {
+        await P.map_with_concurrency(buckets_concurrency, bucket_names, async bucket_name =>
+            await _call_op_and_update_status({
+                bucket_name,
+                op_name: TIMED_OPS.PROCESS_BUCKET,
+                op_func: async () => process_bucket(config_fs, bucket_name, system_json)
+            })
+        );
+        lifecycle_run_status.state.is_finished = Object.values(lifecycle_run_status.buckets_statuses).reduce(
+            (acc, bucket) => acc && (bucket.state?.is_finished),
+            true
+        );
+    }
 }
 
 /**
@@ -158,7 +165,10 @@ async function process_bucket(config_fs, bucket_name, system_json) {
     const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json);
     await object_sdk._simple_load_requesting_account();
     const should_notify = notifications_util.should_notify_on_event(bucket_json, notifications_util.OP_TO_EVENT.lifecycle_delete.name);
-    if (!bucket_json.lifecycle_configuration_rules) return {};
+    if (!bucket_json.lifecycle_configuration_rules) {
+        lifecycle_run_status.buckets_statuses[bucket_json.name].state = {is_finished: true};
+        return;
+    }
     await process_rules(config_fs, bucket_json, object_sdk, should_notify);
 }
 
@@ -170,16 +180,31 @@ async function process_bucket(config_fs, bucket_name, system_json) {
  */
 async function process_rules(config_fs, bucket_json, object_sdk, should_notify) {
     try {
-        await P.all(_.map(bucket_json.lifecycle_configuration_rules,
-            async (lifecycle_rule, index) =>
-                await _call_op_and_update_status({
-                    bucket_name: bucket_json.name,
-                    rule_id: lifecycle_rule.id,
-                    op_name: TIMED_OPS.PROCESS_RULE,
-                    op_func: async () => process_rule(config_fs, lifecycle_rule, index, bucket_json, object_sdk, should_notify)
-                })
-            )
-        );
+        lifecycle_run_status.buckets_statuses[bucket_json.name].state ??= {};
+        const bucket_state = lifecycle_run_status.buckets_statuses[bucket_json.name].state;
+        bucket_state.num_processed_objects = 0;
+        while (!bucket_state.is_finished && bucket_state.num_processed_objects < config.NC_LIFECYCLE_BUCKET_BATCH_SIZE) {
+            await P.all(_.map(bucket_json.lifecycle_configuration_rules,
+                async (lifecycle_rule, index) =>
+                    await _call_op_and_update_status({
+                        bucket_name: bucket_json.name,
+                        rule_id: lifecycle_rule.id,
+                        op_name: TIMED_OPS.PROCESS_RULE,
+                        op_func: async () => process_rule(config_fs,
+                            lifecycle_rule,
+                            index,
+                            bucket_json,
+                            object_sdk,
+                            should_notify
+                        )
+                    })
+                )
+            );
+            bucket_state.is_finished = Object.values(lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses).reduce(
+                (acc, rule) => acc && (_.isEmpty(rule.state) || rule.state.is_finished),
+                true
+            );
+        }
     } catch (err) {
         dbg.error('process_rules failed with error', err, err.code, err.message);
     }
@@ -268,7 +293,6 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
                 op_func: async () => abort_mpus(candidates, object_sdk)
             });
         }
-        await update_lifecycle_rules_last_sync(config_fs, bucket_json, rule_id, index);
     } catch (err) {
         dbg.error('process_rule failed with error', err, err.code, err.message);
     }
@@ -350,8 +374,9 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
  */
 async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
     const candidates = { abort_mpu_candidates: [], delete_candidates: [] };
+    const rule_state = lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses[lifecycle_rule.id]?.state || {};
     if (lifecycle_rule.expiration) {
-        candidates.delete_candidates = await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk);
+        candidates.delete_candidates = await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk, rule_state);
         if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
             const dm_candidates = await get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, bucket_json);
             candidates.delete_candidates = candidates.delete_candidates.concat(dm_candidates);
@@ -365,6 +390,7 @@ async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_contex
         candidates.abort_mpu_candidates = await get_candidates_by_abort_incomplete_multipart_upload_rule(
             lifecycle_rule, bucket_json, object_sdk, fs_context);
     }
+    lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses[lifecycle_rule.id].state = rule_state;
     return candidates;
 }
 
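Note: get_candidates now threads a per-rule state object through the candidate-gathering path: it is read from rules_statuses (or starts empty), the expiration listing records the pagination marker on it, and it is written back so the next batch resumes where this one stopped. A condensed sketch of that round-trip, using only names that appear in the hunks above:

    // Resume from the previous batch, or start fresh for this rule.
    const rule_state = lifecycle_run_status.buckets_statuses[bucket_json.name]
        .rules_statuses[lifecycle_rule.id]?.state || {};
    // ...the listing sets rule_state.key_marker / rule_state.is_finished...
    // Persist for the next batch of this rule.
    lifecycle_run_status.buckets_statuses[bucket_json.name]
        .rules_statuses[lifecycle_rule.id].state = rule_state;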
@@ -376,15 +402,10 @@ async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_contex
  * @returns {boolean}
  */
 function validate_rule_enabled(rule, bucket) {
-    const now = Date.now();
     if (rule.status !== 'Enabled') {
         dbg.log0('validate_rule_enabled: SKIP bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule), 'not Enabled');
         return false;
     }
-    if (rule.last_sync && now - rule.last_sync < config.LIFECYCLE_SCHEDULE_MIN) {
-        dbg.log0('validate_rule_enabled: SKIP bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule), 'now', now, 'last_sync', rule.last_sync, 'schedule min', config.LIFECYCLE_SCHEDULE_MIN);
-        return false;
-    }
     return true;
 }
 
@@ -410,12 +431,12 @@ function _get_lifecycle_object_info_from_list_object_entry(entry) {
  * @param {Object} bucket_json
  * @returns {Promise<Object[]>}
  */
-async function get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk) {
+async function get_candidates_by_expiration_rule(lifecycle_rule, bucket_json, object_sdk, rule_state) {
     const is_gpfs = nb_native().fs.gpfs;
     if (is_gpfs && config.NC_LIFECYCLE_GPFS_ILM_ENABLED) {
         return get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_json);
     } else {
-        return get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk);
+        return get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk, rule_state);
     }
 }
 
@@ -436,21 +457,34 @@ async function get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_jso
  * @param {Object} bucket_json
  * @returns {Promise<Object[]>}
  */
-async function get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk) {
+async function get_candidates_by_expiration_rule_posix(lifecycle_rule, bucket_json, object_sdk, rule_state) {
     const expiration = _get_expiration_time(lifecycle_rule.expiration);
     if (expiration < 0) return [];
     const filter_func = _build_lifecycle_filter({filter: lifecycle_rule.filter, expiration});
 
     const filtered_objects = [];
     // TODO list_objects does not accept a filter and works in batch sizes of 1000. should handle batching
     // also should maybe create a helper function or add argument for a filter in list object
-    const objects_list = await object_sdk.list_objects({bucket: bucket_json.name, prefix: lifecycle_rule.filter?.prefix});
+    const objects_list = await object_sdk.list_objects({
+        bucket: bucket_json.name,
+        prefix: lifecycle_rule.filter?.prefix,
+        key_marker: rule_state.key_marker,
+        limit: config.NC_LIFECYCLE_LIST_BATCH_SIZE
+    });
     objects_list.objects.forEach(obj => {
         const object_info = _get_lifecycle_object_info_from_list_object_entry(obj);
         if (filter_func(object_info)) {
             filtered_objects.push({key: object_info.key});
         }
     });
+
+    const bucket_state = lifecycle_run_status.buckets_statuses[bucket_json.name].state;
+    bucket_state.num_processed_objects += objects_list.objects.length;
+    if (objects_list.is_truncated) {
+        rule_state.key_marker = objects_list.next_marker;
+    } else {
+        rule_state.is_finished = true;
+    }
     return filtered_objects;
 
 }
@@ -630,26 +664,6 @@ function _file_contain_tags(object_info, filter_tags) {
 //////// STATUS HELPERS ////////
 /////////////////////////////////
 
-/**
- * update_lifecycle_rules_last_sync updates the last sync time of the lifecycle rule
- * @param {import('../sdk/config_fs').ConfigFS} config_fs
- * @param {Object} bucket_json
- * @param {String} rule_id
- * @param {number} index
- * @returns {Promise<Void>}
- */
-async function update_lifecycle_rules_last_sync(config_fs, bucket_json, rule_id, index) {
-    bucket_json.lifecycle_configuration_rules[index].last_sync = Date.now();
-    const { num_objects_deleted = 0, num_mpu_aborted = 0 } =
-        lifecycle_run_status.buckets_statuses[bucket_json.name].rules_statuses[rule_id].rule_stats;
-    // if (res.num_objects_deleted >= config.LIFECYCLE_BATCH_SIZE) should_rerun = true; // TODO - think if needed and add something about mpu abort
-    dbg.log0('nc_lifecycle.update_lifecycle_rules_last_sync Done bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') done deletion of objects per rule',
-        bucket_json.lifecycle_configuration_rules[index],
-        'time:', bucket_json.lifecycle_configuration_rules[index].last_sync,
-        'num_objects_deleted:', num_objects_deleted, 'num_mpu_aborted:', num_mpu_aborted);
-    await config_fs.update_bucket_config_file(bucket_json);
-}
-
 /**
  * _call_op_and_update_status calls the op and report time and error to the lifecycle status.
  *
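Note: taken together, the changes implement a three-level batching loop: list_objects pages through a bucket with key_marker/next_marker in NC_LIFECYCLE_LIST_BATCH_SIZE chunks, process_rules caps each bucket at NC_LIFECYCLE_BUCKET_BATCH_SIZE objects per pass, and process_buckets keeps re-running all buckets until every bucket reports is_finished. A minimal standalone sketch of that pattern (not code from this PR; handle_batch is a hypothetical callback, and the list_objects result shape of objects/is_truncated/next_marker mirrors the calls shown above):

    async function run_in_batches(object_sdk, bucket_name, prefix, handle_batch) {
        const state = { key_marker: undefined, is_finished: false };
        while (!state.is_finished) { // outer pass, as in process_buckets
            let num_processed = 0; // reset per pass, as in process_rules
            while (!state.is_finished && num_processed < config.NC_LIFECYCLE_BUCKET_BATCH_SIZE) {
                const res = await object_sdk.list_objects({
                    bucket: bucket_name,
                    prefix,
                    key_marker: state.key_marker,
                    limit: config.NC_LIFECYCLE_LIST_BATCH_SIZE
                });
                num_processed += res.objects.length;
                await handle_batch(res.objects);
                if (res.is_truncated) state.key_marker = res.next_marker;
                else state.is_finished = true;
            }
            // in the real code, other buckets are processed here before the next pass
        }
    }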