Skip to content

Commit 5e2e534

Browse files
committed
Reduced logics
Signed-off-by: dpj135 <958208521@qq.com>
1 parent 89a8989 commit 5e2e534

2 files changed

Lines changed: 90 additions & 161 deletions

File tree

transfer_queue/config.yaml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,11 @@ backend:
4848

4949
# For Yuanrong:
5050
Yuanrong:
51-
# Whether to let TQ automatically start etcd and datasystem services (default false)
52-
auto_init: false
51+
# Whether to let TQ automatically start etcd and datasystem services
52+
auto_init: True
5353
# etcd service address (used to start etcd when auto_init=true)
5454
etcd_address: "127.0.0.1:2379"
55-
# datasystem worker address (used to start dscli when auto_init=true)
56-
worker_address: "127.0.0.1:31501"
55+
# datasystem worker host and port (used to start dscli when auto_init=true)
5756
# YuanrongStorageClient connection parameters (required)
5857
host: "127.0.0.1"
5958
port: 31501

transfer_queue/interface.py

Lines changed: 87 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import math
1818
import os
1919
import shutil
20-
import socket
2120
import subprocess
2221
import tempfile
2322
import time
@@ -192,169 +191,100 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
192191
if conf.backend.Yuanrong.auto_init:
193192
etcd_process = None
194193
etcd_data_dir = None
195-
194+
worker_address = None
195+
if not shutil.which("etcd"):
196+
raise RuntimeError(
197+
"etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
198+
)
199+
if not shutil.which("dscli"):
200+
raise RuntimeError(
201+
"dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
202+
)
196203
try:
197-
# ========== Start etcd (inlined from _start_etcd) ==========
198-
etcd_address = conf.backend.Yuanrong.etcd_address
199-
# Parse host and port
200-
if "://" in etcd_address:
201-
# Remove protocol prefix if present
202-
parsed = urlparse(etcd_address)
203-
host = parsed.hostname
204-
port = parsed.port
205-
else:
206-
# Assume host:port format
207-
parts = etcd_address.split(":")
208-
if len(parts) != 2:
209-
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
210-
host = parts[0]
211-
port = int(parts[1])
212-
213-
# Check if etcd is already running
214-
check = subprocess.run(["pgrep", "-f", "etcd"], stdout=subprocess.PIPE, text=True)
215-
if check.returncode == 0:
216-
pids = check.stdout.strip().replace("\n", ", ")
217-
logger.warning(f"Found existing etcd processes (PID: {pids}). TQ will not start a new one.")
218-
# Try to connect to see if it's listening on our desired port
219-
try:
220-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
221-
sock.settimeout(2)
222-
result = sock.connect_ex((host, port))
223-
sock.close()
224-
if result == 0:
225-
logger.info(f"etcd is already listening on {host}:{port}")
226-
etcd_process = None
227-
etcd_data_dir = None
228-
else:
229-
logger.warning(
230-
f"etcd process exists but not listening on {host}:{port}, will start new instance"
231-
)
232-
# Continue to start new instance
233-
except Exception as e:
234-
logger.warning(f"Failed to check etcd port: {e}, will start new instance")
235-
# Continue to start new instance
236-
237-
if etcd_process is None:
238-
# Create temporary data directory
239-
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
240-
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")
241-
242-
cmd = [
243-
"etcd",
244-
f"--data-dir={etcd_data_dir}",
245-
f"--listen-client-urls=http://0.0.0.0:{port}",
246-
f"--advertise-client-urls=http://{host}:{port}",
247-
"--listen-peer-urls=http://0.0.0.0:2380",
248-
f"--initial-advertise-peer-urls=http://{host}:2380",
249-
"--initial-cluster=default=http://{}:2380".format(host),
250-
]
251-
252-
log_file_path = "/tmp/etcd.log"
253-
with open(log_file_path, "w") as log_file:
254-
etcd_process = subprocess.Popen(
255-
cmd,
256-
stdout=log_file,
257-
stderr=subprocess.STDOUT,
258-
text=True,
259-
bufsize=1,
260-
universal_newlines=True,
261-
start_new_session=True,
262-
)
263-
time.sleep(3) # Wait for etcd to start
264-
265-
if etcd_process.poll() is None:
266-
logger.info(
267-
f"etcd started, PID: {etcd_process.pid}. Logs at: {os.path.abspath(log_file_path)}"
268-
)
269-
else:
270-
error_msg = ""
271-
try:
272-
with open(log_file_path) as f:
273-
error_msg = f.read()
274-
except Exception as e:
275-
error_msg = f"Failed to read log file: {e}"
204+
# ========== Start etcd ==========
205+
etcd_address = "127.0.0.1:2379"
206+
try:
207+
etcd_address = conf.backend.Yuanrong.etcd_address
208+
except Exception:
209+
pass
276210

277-
# Clean up data directory on failure
278-
try:
279-
shutil.rmtree(etcd_data_dir, ignore_errors=True)
280-
except Exception:
281-
pass
211+
# Assume host:port format
212+
parts = etcd_address.split(":")
213+
if len(parts) != 2:
214+
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
215+
host = parts[0]
216+
port = int(parts[1])
217+
218+
# Create temporary data directory
219+
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
220+
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")
221+
222+
cmd = [
223+
"etcd",
224+
f"--data-dir={etcd_data_dir}",
225+
f"--listen-client-urls=http://{host}:{port}",
226+
f"--advertise-client-urls=http://{host}:{port}",
227+
]
228+
229+
etcd_process = subprocess.Popen(
230+
cmd,
231+
stdout=subprocess.DEVNULL,
232+
stderr=subprocess.DEVNULL,
233+
text=True,
234+
bufsize=1,
235+
universal_newlines=True,
236+
start_new_session=True,
237+
)
238+
time.sleep(3) # Wait for etcd to start
239+
# TODO: add a real health check; for now etcd is treated as healthy if its process is still running
240+
etcd_is_healthy = etcd_process.poll() is None
241+
# check
242+
#
243+
#
244+
#
245+
#
246+
if not etcd_is_healthy:
247+
# etcd exited immediately, which indicates a startup failure
248+
# Clean up data directory on failure
249+
try:
250+
shutil.rmtree(etcd_data_dir, ignore_errors=True)
251+
except Exception:
252+
pass
253+
raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}")
282254

283-
raise RuntimeError(
284-
f"etcd exited with error. Check {log_file_path} for detailed logs. Output:\n{error_msg}"
285-
)
255+
# Wait a moment for etcd to be ready
256+
time.sleep(2)
286257

287-
# Wait a moment for etcd to be ready if we started it
288-
if etcd_process is not None:
289-
time.sleep(2)
258+
# ========== Start datasystem worker ==========
259+
# Build the worker address (host:port) from the configured host and port
260+
worker_host = conf.backend.Yuanrong.host
261+
worker_port = conf.backend.Yuanrong.port
262+
worker_address = worker_host + ":" + str(worker_port)
290263

291-
# ========== Start datasystem worker (inlined from _start_dscli) ==========
292-
worker_address = conf.backend.Yuanrong.worker_address
293-
# Check if datasystem worker is already running (by checking worker port)
294-
parts = worker_address.split(":")
295-
if len(parts) != 2:
296-
raise ValueError(f"Invalid worker_address format: {worker_address}. Expected host:port")
297-
worker_host = parts[0]
298-
worker_port = int(parts[1])
264+
cmd = [
265+
"dscli",
266+
"start",
267+
"-w",
268+
f"--worker_address={worker_address}",
269+
f"--etcd_address={etcd_address}",
270+
]
299271

300272
try:
301-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
302-
sock.settimeout(2)
303-
result = sock.connect_ex((worker_host, worker_port))
304-
sock.close()
305-
if result == 0:
306-
logger.info(f"Datasystem worker already listening on {worker_address}")
307-
else:
308-
# Start datasystem worker
309-
cmd = [
310-
"dscli",
311-
"start",
312-
"-w",
313-
f"--worker_address={worker_address}",
314-
f"--etcd_address={etcd_address}",
315-
]
316-
317-
log_file_path = "/tmp/dscli.log"
318-
with open(log_file_path, "w") as log_file:
319-
process = subprocess.Popen(
320-
cmd,
321-
stdout=log_file,
322-
stderr=subprocess.STDOUT,
323-
text=True,
324-
bufsize=1,
325-
universal_newlines=True,
326-
start_new_session=True,
327-
)
328-
# Wait for dscli to start and exit (it starts worker and exits)
329-
time.sleep(3)
330-
331-
if process.poll() is not None:
332-
# dscli exited, check if it succeeded by verifying port
333-
time.sleep(2) # Give worker time to start
334-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
335-
sock.settimeout(2)
336-
result = sock.connect_ex((worker_host, worker_port))
337-
sock.close()
338-
if result != 0:
339-
error_msg = ""
340-
try:
341-
with open(log_file_path) as f:
342-
error_msg = f.read()
343-
except Exception as e:
344-
error_msg = f"Failed to read log file: {e}"
345-
raise RuntimeError(
346-
f"Failed to start datasystem worker. Check {log_file_path} for logs. "
347-
f"Output:\n{error_msg}"
348-
)
349-
else:
350-
logger.info(f"Datasystem worker started and listening on {worker_address}")
351-
else:
352-
# dscli is still running (unexpected), log warning
353-
logger.warning(
354-
f"dscli process still running (PID: {process.pid}). This may indicate an issue."
355-
)
356-
except Exception as e:
357-
raise RuntimeError(f"Failed to start datasystem worker: {e}") from e
273+
ds_result = subprocess.run(
274+
cmd,
275+
stdout=subprocess.PIPE,
276+
stderr=subprocess.STDOUT,
277+
text=True,
278+
timeout=90,
279+
)
280+
except subprocess.TimeoutExpired as err:
281+
raise RuntimeError("dscli start timed out") from err
282+
# dscli starts the worker and then exits; verify its return code and output
283+
if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout:
284+
logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.")
285+
286+
else:
287+
raise RuntimeError(f"Failed to start datasystem worker at {worker_address}.")
358288

359289
# Store processes and data directory
360290
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {

0 commit comments

Comments
 (0)