Commit d087351

👔 Create expected outputs log in longitudinal template flow

1 parent 5f16386

File tree: 4 files changed, +72 -207 lines


CPAC/pipeline/cpac_pipeline.py

Lines changed: 12 additions & 51 deletions
@@ -29,7 +29,6 @@

 import yaml
 import nipype
-from nipype import config, logging
 from flowdump import save_workflow_json, WorkflowJSONMeta
 from indi_aws import aws_utils, fetch_creds

@@ -198,6 +197,7 @@
 from CPAC.utils.monitoring import (
     FMLOGGER,
     getLogger,
+    init_loggers,
     log_nodes_cb,
     log_nodes_initial,
     LOGTAIL,
@@ -221,11 +221,11 @@


 def run_workflow(
-    sub_dict,
-    c,
-    run,
-    pipeline_timing_info=None,
-    p_name=None,
+    sub_dict: dict,
+    c: Configuration,
+    run: bool,
+    pipeline_timing_info: Optional[list] = None,
+    p_name: Optional[str] = None,
     plugin="MultiProc",
     plugin_args=None,
     test_config=False,
@@ -256,8 +256,6 @@ def run_workflow(
     0 for success
     1 for general failure
     """
-    from CPAC.utils.datasource import bidsier_prefix
-
     if plugin is not None and not isinstance(plugin, str):
         msg = (
             'CPAC.pipeline.cpac_pipeline.run_workflow requires a '
@@ -273,36 +271,7 @@ def run_workflow(
     subject_id, p_name, log_dir = set_subject(sub_dict, c)
     c["subject_id"] = subject_id

-    set_up_logger(
-        f"{subject_id}_expectedOutputs",
-        filename=f'{bidsier_prefix(c["subject_id"])}_' 'expectedOutputs.yml',
-        level="info",
-        log_dir=log_dir,
-        mock=True,
-        overwrite_existing=True,
-    )
-    if c.pipeline_setup["Debugging"]["verbose"]:
-        set_up_logger("CPAC.engine", level="debug", log_dir=log_dir, mock=True)
-
-    config.update_config(
-        {
-            "logging": {
-                "log_directory": log_dir,
-                "log_to_file": bool(
-                    getattr(c.pipeline_setup["log_directory"], "run_logging", True)
-                ),
-            },
-            "execution": {
-                "crashfile_format": "txt",
-                "resource_monitor_frequency": 0.2,
-                "stop_on_first_crash": c[
-                    "pipeline_setup", "system_config", "fail_fast"
-                ],
-            },
-        }
-    )
-    config.enable_resource_monitor()
-    logging.update_logging(config)
+    init_loggers(subject_id, c, log_dir, mock=True, longitudinal=False)

     # Start timing here
     pipeline_start_time = time.time()
@@ -555,6 +524,7 @@ def run_workflow(

     workflow_result = None
     exitcode = 0
+    cb_log_filename = os.path.join(log_dir, "callback.log")
     try:
         subject_info["resource_pool"] = []

@@ -566,8 +536,6 @@ def run_workflow(
         subject_info["status"] = "Running"

         # Create callback logger
-        cb_log_filename = os.path.join(log_dir, "callback.log")
-
         try:
             if not os.path.exists(os.path.dirname(cb_log_filename)):
                 os.makedirs(os.path.dirname(cb_log_filename))
@@ -598,8 +566,9 @@ def run_workflow(
             plugin = MultiProcPlugin(plugin_args)

         try:
-            # Actually run the pipeline now, for the current subject
-            workflow_result = workflow.run(plugin=plugin, plugin_args=plugin_args)
+            if run:
+                # Actually run the pipeline now, for the current subject
+                workflow_result = workflow.run(plugin=plugin, plugin_args=plugin_args)
         except UnicodeDecodeError:
             msg = (
                 "C-PAC migrated from Python 2 to Python 3 in v1.6.2 (see "
@@ -814,7 +783,7 @@ def run_workflow(
                 run_start=pipeline_start_datetime,
                 run_finish=strftime("%Y-%m-%d %H:%M:%S"),
                 output_check=check_outputs(
-                    c.pipeline_setup["output_directory"]["path"],
+                    c["pipeline_setup"]["output_directory"]["path"],
                     log_dir,
                     c.pipeline_setup["pipeline_name"],
                     c["subject_id"],
@@ -1073,14 +1042,6 @@ def build_T1w_registration_stack(
     space: Literal["longitudinal", "T1w"] = "T1w",
 ):
     """Build the T1w registration pipeline blocks."""
-    # if space == "longitudinal":
-    #     for using in cfg[
-    #         "registration_workflows", "anatomical_registration", "registration", "using"
-    #     ]:
-    #         if using.lower() != "fsl":
-    #             msg = f"{using} anatomical registration not yet implemented for longitudinal workflows."
-    #             raise NotImplementedError(msg)
-
     if not pipeline_blocks:
         pipeline_blocks = []
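
Note: `init_loggers` itself is defined in `CPAC/utils/monitoring` and its definition is not part of this diff. Reconstructed from the thirty lines it replaces above and from its two call sites, the helper plausibly looks like the sketch below; the handling of the `longitudinal` flag here is an assumption, not confirmed by this commit.

# Hypothetical reconstruction of init_loggers -- the real definition lives
# in CPAC.utils.monitoring and does not appear in this commit.
from nipype import config, logging

from CPAC.utils.configuration import Configuration
from CPAC.utils.datasource import bidsier_prefix
from CPAC.utils.monitoring import set_up_logger


def init_loggers(
    subject_id: str,
    c: Configuration,
    log_dir: str,
    mock: bool = False,
    longitudinal: bool = False,
) -> None:
    """Set up subject loggers and nipype logging config (sketch)."""
    # ASSUMPTION: the longitudinal flag only distinguishes the logger name;
    # the released helper may use it differently.
    infix = "_longitudinal" if longitudinal else ""
    set_up_logger(
        f"{subject_id}{infix}_expectedOutputs",
        filename=f"{bidsier_prefix(subject_id)}_expectedOutputs.yml",
        level="info",
        log_dir=log_dir,
        mock=mock,
        overwrite_existing=True,
    )
    if c.pipeline_setup["Debugging"]["verbose"]:
        set_up_logger("CPAC.engine", level="debug", log_dir=log_dir, mock=mock)
    config.update_config(
        {
            "logging": {
                "log_directory": log_dir,
                "log_to_file": bool(
                    getattr(c.pipeline_setup["log_directory"], "run_logging", True)
                ),
            },
            "execution": {
                "crashfile_format": "txt",
                "resource_monitor_frequency": 0.2,
                "stop_on_first_crash": c[
                    "pipeline_setup", "system_config", "fail_fast"
                ],
            },
        }
    )
    config.enable_resource_monitor()
    logging.update_logging(config)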

CPAC/pipeline/cpac_runner.py

Lines changed: 5 additions & 154 deletions
@@ -29,11 +29,11 @@
 from CPAC.utils.configuration import check_pname, Configuration, set_subject
 from CPAC.utils.configuration.yaml_template import upgrade_pipeline_to_1_8
 from CPAC.utils.ga import track_run
-from CPAC.utils.monitoring import failed_to_start, log_nodes_cb, WFLOGGER
+from CPAC.utils.monitoring import failed_to_start, init_loggers, log_nodes_cb, WFLOGGER


-# Run condor jobs
 def run_condor_jobs(c, config_file, subject_list_file, p_name):
+    """Run condor jobs."""
     # Import packages
     import subprocess
     from time import strftime
@@ -249,6 +249,8 @@ def run_T1w_longitudinal(sublist, cfg: Configuration, dry_run: bool = False):
     # sessions for each participant as value
     for subject_id, sub_list in subject_id_dict.items():
         if len(sub_list) > 1:
+            _, _, log_dir = set_subject(subject_id_dict, cfg)
+            init_loggers(subject_id, cfg, log_dir, mock=True, longitudinal=True)
             anat_longitudinal_wf(subject_id, sub_list, cfg, dry_run=dry_run)
         elif len(sub_list) == 1:
             warnings.warn(
@@ -491,161 +493,10 @@ def run(
     """

     # BEGIN LONGITUDINAL TEMPLATE PIPELINE
-    if (
-        hasattr(c, "longitudinal_template_generation")
-        and c.longitudinal_template_generation["run"]
-    ):
+    if c["longitudinal_template_generation", "run"]:
         run_T1w_longitudinal(sublist, c, dry_run=test_config)
         # TODO functional longitudinal pipeline

-    """
-    if valid_longitudinal_data:
-        rsc_file_list = []
-        for dirpath, dirnames, filenames in os.walk(c.pipeline_setup[
-                'output_directory']['path']):
-            for f in filenames:
-                # TODO is there a better way to check output folder name?
-                if f != '.DS_Store' and 'T1w_longitudinal_pipeline' in dirpath:
-                    rsc_file_list.append(os.path.join(dirpath, f))
-
-        subject_specific_dict = {subj: [] for subj in subject_id_dict.keys()}
-        session_specific_dict = {os.path.join(session['subject_id'], session['unique_id']): [] for session in sublist}
-        for rsc_path in rsc_file_list:
-            key = [s for s in session_specific_dict.keys() if s in rsc_path]
-            if key:
-                session_specific_dict[key[0]].append(rsc_path)
-            else:
-                subj = [s for s in subject_specific_dict.keys() if s in rsc_path]
-                if subj:
-                    subject_specific_dict[subj[0]].append(rsc_path)
-
-        # update individual-specific outputs:
-        # anatomical_brain, anatomical_brain_mask and anatomical_reorient
-        for key in session_specific_dict.keys():
-            for f in session_specific_dict[key]:
-                sub, ses = key.split('/')
-                ses_list = [subj for subj in sublist if sub in subj['subject_id'] and ses in subj['unique_id']]
-                if len(ses_list) > 1:
-                    raise Exception("There are several files containing " + f)
-                if len(ses_list) == 1:
-                    ses = ses_list[0]
-                    subj_id = ses['subject_id']
-                    tmp = f.split(c.pipeline_setup['output_directory']['path'])[-1]
-                    keys = tmp.split(os.sep)
-                    if keys[0] == '':
-                        keys = keys[1:]
-                    if len(keys) > 1:
-                        if ses.get('resource_pool') is None:
-                            ses['resource_pool'] = {
-                                keys[0].split(c.pipeline_setup['pipeline_name'] + '_')[-1]: {
-                                    keys[-2]: f
-                                }
-                            }
-                        else:
-                            strat_key = keys[0].split(c.pipeline_setup['pipeline_name'] + '_')[-1]
-                            if ses['resource_pool'].get(strat_key) is None:
-                                ses['resource_pool'].update({
-                                    strat_key: {
-                                        keys[-2]: f
-                                    }
-                                })
-                            else:
-                                ses['resource_pool'][strat_key].update({
-                                    keys[-2]: f
-                                })
-
-        for key in subject_specific_dict:
-            for f in subject_specific_dict[key]:
-                ses_list = [subj for subj in sublist if key in subj['anat']]
-                for ses in ses_list:
-                    tmp = f.split(c.pipeline_setup['output_directory']['path'])[-1]
-                    keys = tmp.split(os.sep)
-                    if keys[0] == '':
-                        keys = keys[1:]
-                    if len(keys) > 1:
-                        if ses.get('resource_pool') is None:
-                            ses['resource_pool'] = {
-                                keys[0].split(c.pipeline_setup['pipeline_name'] + '_')[-1]: {
-                                    keys[-2]: f
-                                }
-                            }
-                        else:
-                            strat_key = keys[0].split(c.pipeline_setup['pipeline_name'] + '_')[-1]
-                            if ses['resource_pool'].get(strat_key) is None:
-                                ses['resource_pool'].update({
-                                    strat_key: {
-                                        keys[-2]: f
-                                    }
-                                })
-                            else:
-                                if keys[-2] == 'anatomical_brain' or keys[-2] == 'anatomical_brain_mask' or keys[-2] == 'anatomical_skull_leaf':
-                                    pass
-                                elif 'apply_warp_anat_longitudinal_to_standard' in keys[-2] or 'fsl_apply_xfm_longitudinal' in keys[-2]:
-                                    # TODO update!!!
-                                    # it assumes session id == last key (ordered by session count instead of session id) + 1
-                                    # might cause problem if session id is not continuous
-                                    def replace_index(target1, target2, file_path):
-                                        index1 = file_path.index(target1)+len(target1)
-                                        index2 = file_path.index(target2)+len(target2)
-                                        file_str_list = list(file_path)
-                                        file_str_list[index1] = "*"
-                                        file_str_list[index2] = "*"
-                                        file_path_updated = "".join(file_str_list)
-                                        file_list = glob.glob(file_path_updated)
-                                        file_list.sort()
-                                        return file_list
-                                    if ses['unique_id'] == str(int(keys[-2][-1])+1):
-                                        if keys[-3] == 'seg_probability_maps':
-                                            f_list = replace_index('seg_probability_maps_', 'segment_prob_', f)
-                                            ses['resource_pool'][strat_key].update({
-                                                keys[-3]: f_list
-                                            })
-                                        elif keys[-3] == 'seg_partial_volume_files':
-                                            f_list = replace_index('seg_partial_volume_files_', 'segment_pve_', f)
-                                            ses['resource_pool'][strat_key].update({
-                                                keys[-3]: f_list
-                                            })
-                                        else:
-                                            ses['resource_pool'][strat_key].update({
-                                                keys[-3]: f # keys[-3]: 'anatomical_to_standard'
-                                            })
-                                elif keys[-2] != 'warp_list':
-                                    ses['resource_pool'][strat_key].update({
-                                        keys[-2]: f
-                                    })
-                                elif keys[-2] == 'warp_list':
-                                    if 'ses-'+ses['unique_id'] in tmp:
-                                        ses['resource_pool'][strat_key].update({
-                                            keys[-2]: f
-                                        })
-        for key in subject_specific_dict:
-            ses_list = [subj for subj in sublist if key in subj['anat']]
-            for ses in ses_list:
-                for reg_strat in strat_list:
-                    try:
-                        ss_strat_list = list(ses['resource_pool'])
-                        for strat_key in ss_strat_list:
-                            try:
-                                ses['resource_pool'][strat_key].update({
-                                    'registration_method': reg_strat['registration_method']
-                                })
-                            except KeyError:
-                                pass
-                    except KeyError:
-                        pass
-
-        yaml.dump(sublist, open(os.path.join(c.pipeline_setup['working_directory']['path'],'data_config_longitudinal.yml'), 'w'), default_flow_style=False)
-        WFLOGGER.info("\n\nLongitudinal pipeline completed.\n\n")
-
-        # skip main preprocessing
-        if (
-            not c.anatomical_preproc['run'] and
-            not c.functional_preproc['run']
-        ):
-            sys.exit()
-    """
-    # END LONGITUDINAL TEMPLATE PIPELINE
-
     # If it only allows one, run it linearly
     if c.pipeline_setup["system_config"]["num_participants_at_once"] == 1:
         for sub in sublist:
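
Note: the simplified condition relies on `Configuration.__getitem__` accepting a tuple of nested keys (the same pattern as `c["pipeline_setup", "system_config", "fail_fast"]` in cpac_pipeline.py above), so the `hasattr` guard is no longer needed once the key is guaranteed by the config defaults. A minimal stand-in for the access pattern, purely illustrative and not the real class:

# Toy stand-in for CPAC's Configuration tuple-key lookup -- illustrative only.
from typing import Any, Union


class TupleKeyedConfig:
    """Minimal example of nested lookup via a tuple of keys."""

    def __init__(self, d: dict) -> None:
        self._d = d

    def __getitem__(self, key: Union[str, tuple]) -> Any:
        if isinstance(key, tuple):
            value = self._d
            for k in key:  # walk one nesting level per tuple element
                value = value[k]
            return value
        return self._d[key]


cfg = TupleKeyedConfig({"longitudinal_template_generation": {"run": True}})
assert cfg["longitudinal_template_generation", "run"] is True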

CPAC/utils/monitoring/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -26,6 +26,7 @@
     FMLOGGER,
     getLogger,
     IFLOGGER,
+    init_loggers,
     set_up_logger,
     UTLOGGER,
     WFLOGGER,
@@ -44,6 +45,7 @@
     "FMLOGGER",
     "getLogger",
     "IFLOGGER",
+    "init_loggers",
     "LoggingHTTPServer",
     "LoggingRequestHandler",
     "log_nodes_cb",
