1717import math
1818import os
1919import shutil
20+ import socket
2021import subprocess
2122import tempfile
2223import time
@@ -236,23 +237,18 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
236237 start_new_session = True ,
237238 )
238239 time .sleep (3 ) # Wait for etcd to start
239- # TODO: check if etcd is healthy
240- etcd_is_healthy = etcd_process .poll () is None
241- # check
242- #
243- #
244- #
245- #
246- if not etcd_is_healthy :
247- # etcd exited immediately, indicate failure
248- # Clean up data directory on failure
249- try :
250- shutil .rmtree (etcd_data_dir , ignore_errors = True )
251- except Exception :
252- pass
240+
241+ if etcd_process .poll () is None :
242+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
243+ sock .settimeout (2 )
244+ result = sock .connect_ex ((host , port ))
245+ sock .close ()
246+ if result != 0 :
247+ raise RuntimeError (f"etcd process started but not listening on { host } :{ port } " )
248+ else :
253249 raise RuntimeError (f"etcd exited immediately with return code { etcd_process .returncode } " )
254250
255- # Wait a moment for etcd to be ready
251+ logger . info ( f" etcd started, PID: { etcd_process . pid } " )
256252 time .sleep (2 )
257253
258254 # ========== Start datasystem worker ==========
@@ -265,8 +261,10 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
265261 "dscli" ,
266262 "start" ,
267263 "-w" ,
268- f"--worker_address={ worker_address } " ,
269- f"--etcd_address={ etcd_address } " ,
264+ "--worker_address" ,
265+ worker_address ,
266+ "--etcd_address" ,
267+ etcd_address ,
270268 ]
271269
272270 try :
@@ -278,13 +276,16 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
278276 timeout = 90 ,
279277 )
280278 except subprocess .TimeoutExpired as err :
281- raise RuntimeError ("dscli start timed out" ) from err
279+ raise RuntimeError (f "dscli start timed out: { err } " ) from err
282280 # Wait for dscli to start and exit (it starts worker and exits)
283281 if ds_result .returncode == 0 and "[ OK ]" in ds_result .stdout :
284282 logger .info (f"dscli started Yuanrong datasystem worker at { worker_address } successfully." )
285283
286284 else :
287- raise RuntimeError (f"Failed to start datasystem worker at { worker_address } ." )
285+ raise RuntimeError (
286+ f"Failed to start datasystem worker at { worker_address } . "
287+ f"Return code: { ds_result .returncode } , Output: { ds_result .stdout } "
288+ )
288289
289290 # Store processes and data directory
290291 _TRANSFER_QUEUE_STORAGE ["Yuanrong" ] = {
@@ -494,7 +495,7 @@ def close():
494495 try :
495496 subprocess .run (
496497 ["dscli" , "stop" , "--worker_address" , worker_address ],
497- timeout = 5 ,
498+ timeout = 90 ,
498499 capture_output = True ,
499500 )
500501 logger .info (f"Stopped datasystem worker at { worker_address } via dscli stop" )
0 commit comments