@@ -31,7 +31,8 @@ const config_fs_options = { silent_if_missing: true };
31
31
32
32
const lifecycle_run_status = {
33
33
running_host : os . hostname ( ) , lifecycle_run_times : { } ,
34
- total_stats : _get_default_stats ( ) , buckets_statuses : { }
34
+ total_stats : _get_default_stats ( ) , buckets_statuses : { } ,
35
+ state : { is_finished : false }
35
36
} ;
36
37
37
38
let return_short_status = false ;
@@ -137,13 +138,19 @@ async function run_lifecycle(config_fs, disable_service_validation) {
137
138
*/
138
139
async function process_buckets ( config_fs , bucket_names , system_json ) {
139
140
const buckets_concurrency = 10 ; // TODO - think about it
140
- await P . map_with_concurrency ( buckets_concurrency , bucket_names , async bucket_name =>
141
- await _call_op_and_update_status ( {
142
- bucket_name,
143
- op_name : TIMED_OPS . PROCESS_BUCKET ,
144
- op_func : async ( ) => process_bucket ( config_fs , bucket_name , system_json )
145
- } )
146
- ) ;
141
+ while ( ! lifecycle_run_status . state . is_finished ) {
142
+ await P . map_with_concurrency ( buckets_concurrency , bucket_names , async bucket_name =>
143
+ await _call_op_and_update_status ( {
144
+ bucket_name,
145
+ op_name : TIMED_OPS . PROCESS_BUCKET ,
146
+ op_func : async ( ) => process_bucket ( config_fs , bucket_name , system_json )
147
+ } )
148
+ ) ;
149
+ lifecycle_run_status . state . is_finished = Object . values ( lifecycle_run_status . buckets_statuses ) . reduce (
150
+ ( acc , bucket ) => acc && ( bucket . state ?. is_finished ) ,
151
+ true
152
+ ) ;
153
+ }
147
154
}
148
155
149
156
/**
@@ -158,7 +165,10 @@ async function process_bucket(config_fs, bucket_name, system_json) {
158
165
const object_sdk = new NsfsObjectSDK ( '' , config_fs , account , bucket_json . versioning , config_fs . config_root , system_json ) ;
159
166
await object_sdk . _simple_load_requesting_account ( ) ;
160
167
const should_notify = notifications_util . should_notify_on_event ( bucket_json , notifications_util . OP_TO_EVENT . lifecycle_delete . name ) ;
161
- if ( ! bucket_json . lifecycle_configuration_rules ) return { } ;
168
+ if ( ! bucket_json . lifecycle_configuration_rules ) {
169
+ lifecycle_run_status . buckets_statuses [ bucket_json . name ] . state = { is_finished : true } ;
170
+ return ;
171
+ }
162
172
await process_rules ( config_fs , bucket_json , object_sdk , should_notify ) ;
163
173
}
164
174
@@ -170,16 +180,31 @@ async function process_bucket(config_fs, bucket_name, system_json) {
170
180
*/
171
181
async function process_rules ( config_fs , bucket_json , object_sdk , should_notify ) {
172
182
try {
173
- await P . all ( _ . map ( bucket_json . lifecycle_configuration_rules ,
174
- async ( lifecycle_rule , index ) =>
175
- await _call_op_and_update_status ( {
176
- bucket_name : bucket_json . name ,
177
- rule_id : lifecycle_rule . id ,
178
- op_name : TIMED_OPS . PROCESS_RULE ,
179
- op_func : async ( ) => process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk , should_notify )
180
- } )
181
- )
182
- ) ;
183
+ lifecycle_run_status . buckets_statuses [ bucket_json . name ] . state ??= { } ;
184
+ const bucket_state = lifecycle_run_status . buckets_statuses [ bucket_json . name ] . state ;
185
+ bucket_state . num_processed_objects = 0 ;
186
+ while ( ! bucket_state . is_finished && bucket_state . num_processed_objects < config . NC_LIFECYCLE_BUCKET_BATCH_SIZE ) {
187
+ await P . all ( _ . map ( bucket_json . lifecycle_configuration_rules ,
188
+ async ( lifecycle_rule , index ) =>
189
+ await _call_op_and_update_status ( {
190
+ bucket_name : bucket_json . name ,
191
+ rule_id : lifecycle_rule . id ,
192
+ op_name : TIMED_OPS . PROCESS_RULE ,
193
+ op_func : async ( ) => process_rule ( config_fs ,
194
+ lifecycle_rule ,
195
+ index ,
196
+ bucket_json ,
197
+ object_sdk ,
198
+ should_notify
199
+ )
200
+ } )
201
+ )
202
+ ) ;
203
+ bucket_state . is_finished = Object . values ( lifecycle_run_status . buckets_statuses [ bucket_json . name ] . rules_statuses ) . reduce (
204
+ ( acc , rule ) => acc && ( _ . isEmpty ( rule . state ) || rule . state . is_finished ) ,
205
+ true
206
+ ) ;
207
+ }
183
208
} catch ( err ) {
184
209
dbg . error ( 'process_rules failed with error' , err , err . code , err . message ) ;
185
210
}
@@ -268,7 +293,6 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
268
293
op_func : async ( ) => abort_mpus ( candidates , object_sdk )
269
294
} ) ;
270
295
}
271
- await update_lifecycle_rules_last_sync ( config_fs , bucket_json , rule_id , index ) ;
272
296
} catch ( err ) {
273
297
dbg . error ( 'process_rule failed with error' , err , err . code , err . message ) ;
274
298
}
@@ -350,8 +374,9 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
350
374
*/
351
375
async function get_candidates ( bucket_json , lifecycle_rule , object_sdk , fs_context ) {
352
376
const candidates = { abort_mpu_candidates : [ ] , delete_candidates : [ ] } ;
377
+ const rule_state = lifecycle_run_status . buckets_statuses [ bucket_json . name ] . rules_statuses [ lifecycle_rule . id ] ?. state || { } ;
353
378
if ( lifecycle_rule . expiration ) {
354
- candidates . delete_candidates = await get_candidates_by_expiration_rule ( lifecycle_rule , bucket_json , object_sdk ) ;
379
+ candidates . delete_candidates = await get_candidates_by_expiration_rule ( lifecycle_rule , bucket_json , object_sdk , rule_state ) ;
355
380
if ( lifecycle_rule . expiration . days || lifecycle_rule . expiration . expired_object_delete_marker ) {
356
381
const dm_candidates = await get_candidates_by_expiration_delete_marker_rule ( lifecycle_rule , bucket_json ) ;
357
382
candidates . delete_candidates = candidates . delete_candidates . concat ( dm_candidates ) ;
@@ -365,6 +390,7 @@ async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_contex
365
390
candidates . abort_mpu_candidates = await get_candidates_by_abort_incomplete_multipart_upload_rule (
366
391
lifecycle_rule , bucket_json , object_sdk , fs_context ) ;
367
392
}
393
+ lifecycle_run_status . buckets_statuses [ bucket_json . name ] . rules_statuses [ lifecycle_rule . id ] . state = rule_state ;
368
394
return candidates ;
369
395
}
370
396
@@ -376,15 +402,10 @@ async function get_candidates(bucket_json, lifecycle_rule, object_sdk, fs_contex
376
402
* @returns {boolean }
377
403
*/
378
404
function validate_rule_enabled ( rule , bucket ) {
379
- const now = Date . now ( ) ;
380
405
if ( rule . status !== 'Enabled' ) {
381
406
dbg . log0 ( 'validate_rule_enabled: SKIP bucket:' , bucket . name , '(bucket id:' , bucket . _id , ') rule' , util . inspect ( rule ) , 'not Enabled' ) ;
382
407
return false ;
383
408
}
384
- if ( rule . last_sync && now - rule . last_sync < config . LIFECYCLE_SCHEDULE_MIN ) {
385
- dbg . log0 ( 'validate_rule_enabled: SKIP bucket:' , bucket . name , '(bucket id:' , bucket . _id , ') rule' , util . inspect ( rule ) , 'now' , now , 'last_sync' , rule . last_sync , 'schedule min' , config . LIFECYCLE_SCHEDULE_MIN ) ;
386
- return false ;
387
- }
388
409
return true ;
389
410
}
390
411
@@ -410,12 +431,12 @@ function _get_lifecycle_object_info_from_list_object_entry(entry) {
410
431
* @param {Object } bucket_json
411
432
* @returns {Promise<Object[]> }
412
433
*/
413
- async function get_candidates_by_expiration_rule ( lifecycle_rule , bucket_json , object_sdk ) {
434
+ async function get_candidates_by_expiration_rule ( lifecycle_rule , bucket_json , object_sdk , rule_state ) {
414
435
const is_gpfs = nb_native ( ) . fs . gpfs ;
415
436
if ( is_gpfs && config . NC_LIFECYCLE_GPFS_ILM_ENABLED ) {
416
437
return get_candidates_by_expiration_rule_gpfs ( lifecycle_rule , bucket_json ) ;
417
438
} else {
418
- return get_candidates_by_expiration_rule_posix ( lifecycle_rule , bucket_json , object_sdk ) ;
439
+ return get_candidates_by_expiration_rule_posix ( lifecycle_rule , bucket_json , object_sdk , rule_state ) ;
419
440
}
420
441
}
421
442
@@ -436,21 +457,34 @@ async function get_candidates_by_expiration_rule_gpfs(lifecycle_rule, bucket_jso
436
457
* @param {Object } bucket_json
437
458
* @returns {Promise<Object[]> }
438
459
*/
439
- async function get_candidates_by_expiration_rule_posix ( lifecycle_rule , bucket_json , object_sdk ) {
460
+ async function get_candidates_by_expiration_rule_posix ( lifecycle_rule , bucket_json , object_sdk , rule_state ) {
440
461
const expiration = _get_expiration_time ( lifecycle_rule . expiration ) ;
441
462
if ( expiration < 0 ) return [ ] ;
442
463
const filter_func = _build_lifecycle_filter ( { filter : lifecycle_rule . filter , expiration} ) ;
443
464
444
465
const filtered_objects = [ ] ;
445
466
// TODO list_objects does not accept a filter and works in batch sizes of 1000. should handle batching
446
467
// also should maybe create a helper function or add argument for a filter in list object
447
- const objects_list = await object_sdk . list_objects ( { bucket : bucket_json . name , prefix : lifecycle_rule . filter ?. prefix } ) ;
468
+ const objects_list = await object_sdk . list_objects ( {
469
+ bucket : bucket_json . name ,
470
+ prefix : lifecycle_rule . filter ?. prefix ,
471
+ key_marker : rule_state . key_marker ,
472
+ limit : config . NC_LIFECYCLE_LIST_BATCH_SIZE
473
+ } ) ;
448
474
objects_list . objects . forEach ( obj => {
449
475
const object_info = _get_lifecycle_object_info_from_list_object_entry ( obj ) ;
450
476
if ( filter_func ( object_info ) ) {
451
477
filtered_objects . push ( { key : object_info . key } ) ;
452
478
}
453
479
} ) ;
480
+
481
+ const bucket_state = lifecycle_run_status . buckets_statuses [ bucket_json . name ] . state ;
482
+ bucket_state . num_processed_objects += objects_list . objects . length ;
483
+ if ( objects_list . is_truncated ) {
484
+ rule_state . key_marker = objects_list . next_marker ;
485
+ } else {
486
+ rule_state . is_finished = true ;
487
+ }
454
488
return filtered_objects ;
455
489
456
490
}
@@ -630,26 +664,6 @@ function _file_contain_tags(object_info, filter_tags) {
630
664
//////// STATUS HELPERS ////////
631
665
/////////////////////////////////
632
666
633
- /**
634
- * update_lifecycle_rules_last_sync updates the last sync time of the lifecycle rule
635
- * @param {import('../sdk/config_fs').ConfigFS } config_fs
636
- * @param {Object } bucket_json
637
- * @param {String } rule_id
638
- * @param {number } index
639
- * @returns {Promise<Void> }
640
- */
641
- async function update_lifecycle_rules_last_sync ( config_fs , bucket_json , rule_id , index ) {
642
- bucket_json . lifecycle_configuration_rules [ index ] . last_sync = Date . now ( ) ;
643
- const { num_objects_deleted = 0 , num_mpu_aborted = 0 } =
644
- lifecycle_run_status . buckets_statuses [ bucket_json . name ] . rules_statuses [ rule_id ] . rule_stats ;
645
- // if (res.num_objects_deleted >= config.LIFECYCLE_BATCH_SIZE) should_rerun = true; // TODO - think if needed and add something about mpu abort
646
- dbg . log0 ( 'nc_lifecycle.update_lifecycle_rules_last_sync Done bucket:' , bucket_json . name , '(bucket id:' , bucket_json . _id , ') done deletion of objects per rule' ,
647
- bucket_json . lifecycle_configuration_rules [ index ] ,
648
- 'time:' , bucket_json . lifecycle_configuration_rules [ index ] . last_sync ,
649
- 'num_objects_deleted:' , num_objects_deleted , 'num_mpu_aborted:' , num_mpu_aborted ) ;
650
- await config_fs . update_bucket_config_file ( bucket_json ) ;
651
- }
652
-
653
667
/**
654
668
* _call_op_and_update_status calls the op and report time and error to the lifecycle status.
655
669
*
0 commit comments