@@ -367,9 +367,22 @@ def run_workers(self, test_suite: TestSuite | TestGraph, params: Params) -> None
367367 slot_workers = sorted ([* graph .workers .values ()], key = lambda x : x .params ["name" ])
368368 to_traverse = [graph .traverse_object_trees (s , params ) for s in slot_workers ]
369369 try :
370- asyncio .run (self .run_test_nodes (to_traverse , self .job .timeout or 86400 ))
370+ self .loop .run_until_complete (
371+ asyncio .wait_for (
372+ asyncio .shield (asyncio .gather (* to_traverse )),
373+ self .job .timeout or 86400 ,
374+ )
375+ )
371376 except asyncio .TimeoutError as error :
372- logging .error ("Error during running workers: {error}" )
377+ logging .error (error )
378+ import stackscope
379+
380+ logging .critical (
381+ "Timeout exceeded. Printing stacks of all running coroutines:"
382+ )
383+ for task in asyncio .all_tasks ():
384+ coro = task .get_coro ()
385+ logging .critical (stackscope .extract (coro ))
373386 except KeyboardInterrupt as error :
374387 logging .info (str (error ))
375388 self .job .interrupted_reason = str (error )
@@ -399,11 +412,14 @@ def run_suite(self, job: Job, test_suite: TestSuite) -> set[str]:
399412 self .status_server = StatusServer (
400413 self .job .config .get ("run.status_server_listen" ), self .status_repo
401414 )
402- loop = asyncio .get_event_loop ()
403- loop .run_until_complete (self .status_server .create_server ())
404- asyncio .ensure_future (self .status_server .serve_forever ())
415+
416+ self .loop = asyncio .new_event_loop ()
417+ asyncio .set_event_loop (self .loop )
418+
419+ self .loop .run_until_complete (self .status_server .create_server ())
420+ self .loop .create_task (self .status_server .serve_forever ())
405421 # TODO: this needs more customization
406- asyncio . ensure_future (self ._update_status ())
422+ self . loop . create_task (self ._update_status ())
407423
408424 params = self .job .config ["param_dict" ]
409425 try :
0 commit comments