11import copy
22import threading
33import time
4+ import traceback
45from typing import Dict , List , Union
56
67import cherrypy
@@ -158,6 +159,16 @@ def create_sub_job(
158159 dive_params : DiveDatasetList ,
159160 cliItem : CLIItem ,
160161):
162+
163+ if not dive_params or not isinstance (dive_params , dict ):
164+ raise ValueError ("Invalid or missing dive_params for sub-job" )
165+ if slicer_params is None :
166+ raise ValueError ("slicer_params is None" )
167+
168+ required_keys = ['DIVEVideo' , 'DIVEDataset' , 'DIVEMetadata' ]
169+ for k in required_keys :
170+ if k not in dive_params :
171+ raise KeyError (f"Missing required key '{ k } ' in dive_params" )
161172 dataset_params = copy .deepcopy (slicer_params )
162173 dataset_params ['DIVEVideo' ] = dive_params ['DIVEVideo' ]
163174 dataset_params ['DIVEDataset' ] = dive_params ['DIVEDataset' ]
@@ -166,7 +177,7 @@ def create_sub_job(
166177 dataset_params ['DIVEMetadataRoot' ] = base_params ['DIVEMetadataRoot' ]
167178 dataset_params ['girderToken' ] = base_params ['girderToken' ]
168179 dataset_params ['girderApiUrl' ] = base_params ['girderApiUrl' ]
169- name = dive_params [ 'DIVEDatasetName' ]
180+ name = dive_params . get ( 'DIVEDatasetName' , 'Unknown Dataset' )
170181 Token ().createToken (user = user )
171182 subJob = cliSubHandler (cliItem , dataset_params , user , token )
172183 baseJob = Job ().updateJob (
@@ -178,36 +189,58 @@ def create_sub_job(
178189
179190
180191def metadata_filter_slicer_cli_task (baseJob : Task ):
192+ try :
193+ params : DIVEMetadataSlicerCLITaskParams = baseJob .get ('kwargs' , {}).get ('params' )
194+ if not params :
195+ raise ValueError ("Missing or invalid job parameters in baseJob" )
181196
182- params : DIVEMetadataSlicerCLITaskParams = baseJob ['kwargs' ]['params' ]
197+ dive_dataset_list = params .get ('dataset_list' )
198+ cli_item_id = params .get ('cli_item' )
199+ slicer_params = params .get ('slicer_params' )
200+ userId = params .get ('userId' )
183201
184- dive_dataset_list = params ['dataset_list' ]
185- cli_item_id = params ['cli_item' ]
186- slicer_params = params ['slicer_params' ]
187- userId = params ['userId' ]
188- user = User ().load (userId , force = True )
189- token = Token ().createToken (user = user )
190- # Using the base jobs get the
191- baseJob = Job ().updateJob (
192- baseJob ,
193- log = 'Started DiveMetadata processing\n ' ,
194- status = JobStatus .RUNNING ,
195- )
202+ if not dive_dataset_list or not isinstance (dive_dataset_list , list ):
203+ raise ValueError ("Missing or invalid dataset_list" )
204+ if not cli_item_id :
205+ raise ValueError ("Missing CLI item ID" )
206+ if not userId :
207+ raise ValueError ("Missing userId" )
196208
197- cliItem = CLIItem .find (cli_item_id , user )
198- total_count = len (dive_dataset_list )
199- # try:
200- scheduled = 0
201- done = False
202- lastSubJob = None
209+ user = User ().load (userId , force = True )
210+ if not user :
211+ raise ValueError (f"Could not load user { userId } " )
212+
213+ token = Token ().createToken (user = user )
214+ if not token :
215+ raise ValueError ("Failed to create token" )
216+
217+ baseJob = Job ().updateJob (
218+ baseJob , log = 'Started DiveMetadata processing\n ' , status = JobStatus .RUNNING
219+ )
220+
221+ cliItem = CLIItem .find (cli_item_id , user )
222+ if not cliItem :
223+ raise ValueError (f"CLIItem { cli_item_id } not found" )
224+
225+ total_count = len (dive_dataset_list )
226+ scheduled = 0
227+ lastSubJob = None
228+ done = False
229+
230+ Job ().updateJob (
231+ baseJob ,
232+ log = f'Found { total_count } DIVE datasets to process\n ' ,
233+ status = JobStatus .RUNNING ,
234+ )
203235
204- try :
205236 while not done :
206237 baseJob = Job ().load (id = baseJob ['_id' ], force = True )
207238 if lastSubJob :
208239 lastSubJob = Job ().load (lastSubJob ['_id' ], force = True )
240+
209241 if not baseJob or baseJob ['status' ] in {JobStatus .CANCELED , JobStatus .ERROR }:
210242 break
243+
211244 if lastSubJob is None or lastSubJob ['status' ] in {
212245 JobStatus .SUCCESS ,
213246 JobStatus .ERROR ,
@@ -216,31 +249,48 @@ def metadata_filter_slicer_cli_task(baseJob: Task):
216249 if scheduled >= total_count :
217250 done = True
218251 break
252+
219253 dive_dataset_params = dive_dataset_list [scheduled ]
220- if not done :
221- # We are running in a girder context, but girder_worker
222- # uses cherrypy.request.app to detect this, so we have to
223- # fake it.
224- lastSubJob = create_sub_job (
225- baseJob , user , token , params , slicer_params , dive_dataset_params , cliItem
226- )
254+ if not dive_dataset_params :
227255 Job ().updateJob (
228256 baseJob ,
229- log = f'Scheduling job { scheduled } of { total_count } on dataset: { dive_dataset_list [scheduled ]["DIVEDatasetName" ]} \n ' ,
230- progressCurrent = scheduled ,
231- progressTotal = total_count ,
232- status = JobStatus .RUNNING ,
257+ log = f'Error: DIVE dataset params None at index { scheduled } \n ' ,
258+ status = JobStatus .ERROR ,
233259 )
234- scheduled += 1
235- continue
260+ break
261+
262+ lastSubJob = create_sub_job (
263+ baseJob , user , token , params , slicer_params , dive_dataset_params , cliItem
264+ )
265+ Job ().updateJob (
266+ baseJob ,
267+ log = f'Scheduling job { scheduled } of { total_count } on dataset: '
268+ f'{ dive_dataset_params .get ("DIVEDatasetName" , "Unknown" )} \n ' ,
269+ progressCurrent = scheduled ,
270+ progressTotal = total_count ,
271+ status = JobStatus .RUNNING ,
272+ )
273+ scheduled += 1
274+ continue
275+
236276 time .sleep (0.1 )
277+
237278 except Exception as exc :
238279 Job ().updateJob (
239280 baseJob ,
240- log = f'Error During DIVEMetadata Slicer CLI Processing Item: { dive_dataset_list [scheduled ]} \n ' ,
281+ log = f'Error During DIVEMetadata Slicer CLI Processing Item:\n \
282+ base Params: { params } \n \
283+ Slicer Params: { slicer_params } \n \
284+ DIVE Dataset Params: { dive_dataset_list [scheduled ]} \n ' ,
241285 status = JobStatus .ERROR ,
242286 )
243287 Job ().updateJob (baseJob , log = 'Exception: %r\n ' % exc )
288+ Job ().updateJob (
289+ baseJob ,
290+ log = f"Error During DIVEMetadata Slicer CLI Processing:\n " f"{ traceback .format_exc ()} " ,
291+ status = JobStatus .ERROR ,
292+ )
293+ return
244294
245295 Job ().updateJob (
246296 baseJob , log = 'Finished DIVE Metadata Batch CLI Processing' , status = JobStatus .SUCCESS
0 commit comments