From 89a89898e77969473cea6e294a14f4b6b00a348d Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Thu, 26 Mar 2026 15:37:16 +0800 Subject: [PATCH 1/4] Added yr init Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/config.yaml | 13 +- transfer_queue/interface.py | 230 +++++++++++++++++++++++++++++++++++- 2 files changed, 238 insertions(+), 5 deletions(-) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 433c026d..318d49b7 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -48,7 +48,14 @@ backend: # For Yuanrong: Yuanrong: - # Port of local yuanrong datasystem worker + # Whether to let TQ automatically start etcd and datasystem services (default false) + auto_init: false + # etcd service address (used to start etcd when auto_init=true) + etcd_address: "127.0.0.1:2379" + # datasystem worker address (used to start dscli when auto_init=true) + worker_address: "127.0.0.1:31501" + # YuanrongStorageClient connection parameters (required) + host: "127.0.0.1" port: 31501 - # If enable npu transport - enable_yr_npu_transport: false + # Client name (optional, default is YuanrongStorageClient) + client_name: "YuanrongStorageClient" diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 7e1e23a5..6a1a65c3 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -16,7 +16,10 @@ import logging import math import os +import shutil +import socket import subprocess +import tempfile import time from importlib import resources from typing import Any, Optional @@ -103,11 +106,11 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: check = subprocess.run(["pgrep", "-f", "mooncake_master"], stdout=subprocess.PIPE, text=True) if check.returncode == 0: pids = check.stdout.strip().replace("\n", ", ") - logging.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...") + logger.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...") result = os.system('pkill -f "[m]ooncake_master"') if result == 0: - logging.info("Successfully killed existing mooncake_master processes.") + logger.info("Successfully killed existing mooncake_master processes.") else: raise RuntimeError(f"Failed to kill existing mooncake_master processes (exit code: {result}).") @@ -185,6 +188,193 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: f"Output:\n{error_msg}" ) _TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process + if conf.backend.storage_backend == "Yuanrong": + if conf.backend.Yuanrong.auto_init: + etcd_process = None + etcd_data_dir = None + + try: + # ========== Start etcd (inlined from _start_etcd) ========== + etcd_address = conf.backend.Yuanrong.etcd_address + # Parse host and port + if "://" in etcd_address: + # Remove protocol prefix if present + parsed = urlparse(etcd_address) + host = parsed.hostname + port = parsed.port + else: + # Assume host:port format + parts = etcd_address.split(":") + if len(parts) != 2: + raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port") + host = parts[0] + port = int(parts[1]) + + # Check if etcd is already running + check = subprocess.run(["pgrep", "-f", "etcd"], stdout=subprocess.PIPE, text=True) + if check.returncode == 0: + pids = check.stdout.strip().replace("\n", ", ") + logger.warning(f"Found existing etcd processes (PID: {pids}). TQ will not start a new one.") + # Try to connect to see if it's listening on our desired port + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(2) + result = sock.connect_ex((host, port)) + sock.close() + if result == 0: + logger.info(f"etcd is already listening on {host}:{port}") + etcd_process = None + etcd_data_dir = None + else: + logger.warning( + f"etcd process exists but not listening on {host}:{port}, will start new instance" + ) + # Continue to start new instance + except Exception as e: + logger.warning(f"Failed to check etcd port: {e}, will start new instance") + # Continue to start new instance + + if etcd_process is None: + # Create temporary data directory + etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_") + logger.info(f"Starting etcd with data directory: {etcd_data_dir}") + + cmd = [ + "etcd", + f"--data-dir={etcd_data_dir}", + f"--listen-client-urls=http://0.0.0.0:{port}", + f"--advertise-client-urls=http://{host}:{port}", + "--listen-peer-urls=http://0.0.0.0:2380", + f"--initial-advertise-peer-urls=http://{host}:2380", + "--initial-cluster=default=http://{}:2380".format(host), + ] + + log_file_path = "/tmp/etcd.log" + with open(log_file_path, "w") as log_file: + etcd_process = subprocess.Popen( + cmd, + stdout=log_file, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + universal_newlines=True, + start_new_session=True, + ) + time.sleep(3) # Wait for etcd to start + + if etcd_process.poll() is None: + logger.info( + f"etcd started, PID: {etcd_process.pid}. Logs at: {os.path.abspath(log_file_path)}" + ) + else: + error_msg = "" + try: + with open(log_file_path) as f: + error_msg = f.read() + except Exception as e: + error_msg = f"Failed to read log file: {e}" + + # Clean up data directory on failure + try: + shutil.rmtree(etcd_data_dir, ignore_errors=True) + except Exception: + pass + + raise RuntimeError( + f"etcd exited with error. Check {log_file_path} for detailed logs. Output:\n{error_msg}" + ) + + # Wait a moment for etcd to be ready if we started it + if etcd_process is not None: + time.sleep(2) + + # ========== Start datasystem worker (inlined from _start_dscli) ========== + worker_address = conf.backend.Yuanrong.worker_address + # Check if datasystem worker is already running (by checking worker port) + parts = worker_address.split(":") + if len(parts) != 2: + raise ValueError(f"Invalid worker_address format: {worker_address}. Expected host:port") + worker_host = parts[0] + worker_port = int(parts[1]) + + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(2) + result = sock.connect_ex((worker_host, worker_port)) + sock.close() + if result == 0: + logger.info(f"Datasystem worker already listening on {worker_address}") + else: + # Start datasystem worker + cmd = [ + "dscli", + "start", + "-w", + f"--worker_address={worker_address}", + f"--etcd_address={etcd_address}", + ] + + log_file_path = "/tmp/dscli.log" + with open(log_file_path, "w") as log_file: + process = subprocess.Popen( + cmd, + stdout=log_file, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + universal_newlines=True, + start_new_session=True, + ) + # Wait for dscli to start and exit (it starts worker and exits) + time.sleep(3) + + if process.poll() is not None: + # dscli exited, check if it succeeded by verifying port + time.sleep(2) # Give worker time to start + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(2) + result = sock.connect_ex((worker_host, worker_port)) + sock.close() + if result != 0: + error_msg = "" + try: + with open(log_file_path) as f: + error_msg = f.read() + except Exception as e: + error_msg = f"Failed to read log file: {e}" + raise RuntimeError( + f"Failed to start datasystem worker. Check {log_file_path} for logs. " + f"Output:\n{error_msg}" + ) + else: + logger.info(f"Datasystem worker started and listening on {worker_address}") + else: + # dscli is still running (unexpected), log warning + logger.warning( + f"dscli process still running (PID: {process.pid}). This may indicate an issue." + ) + except Exception as e: + raise RuntimeError(f"Failed to start datasystem worker: {e}") from e + + # Store processes and data directory + _TRANSFER_QUEUE_STORAGE["Yuanrong"] = { + "etcd": etcd_process, + "etcd_data_dir": etcd_data_dir, + "worker_address": worker_address, + "etcd_address": etcd_address, + } + logger.info("Yuanrong backend (etcd + datasystem) started successfully.") + + except Exception as e: + # Clean up on failure + if etcd_process is not None and etcd_process.poll() is None: + etcd_process.terminate() + if etcd_data_dir is not None: + try: + shutil.rmtree(etcd_data_dir, ignore_errors=True) + except Exception: + pass + raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e return conf @@ -346,6 +536,42 @@ def close(): logger.info("Successfully removed all existing keys in mooncake_master.") except Exception: pass + elif key == "Yuanrong": + # Stop etcd process and clean up data directory, stop datasystem worker via dscli + if isinstance(value, dict): + etcd_process = value.get("etcd") + etcd_data_dir = value.get("etcd_data_dir") + worker_address = value.get("worker_address") + + # Stop etcd if running + if etcd_process is not None and etcd_process.poll() is None: + etcd_process.terminate() + try: + etcd_process.wait(timeout=5) + except subprocess.TimeoutExpired: + etcd_process.kill() + + # Clean up etcd data directory + if etcd_data_dir is not None and os.path.exists(etcd_data_dir): + try: + shutil.rmtree(etcd_data_dir, ignore_errors=True) + logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}") + except Exception as e: + logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}") + + # Stop datasystem worker via dscli command + if worker_address: + try: + subprocess.run( + ["dscli", "stop", "--worker_address", worker_address], + timeout=5, + capture_output=True, + ) + logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop") + except Exception as e: + logger.warning(f"Failed to stop datasystem worker via dscli: {e}") + else: + logger.warning(f"Unexpected Yuanrong storage value: {value}") else: logger.warning(f"close for _TRANSFER_QUEUE_STORAGE with key {key} is not supported for now.") From 5e2e53443c6e92978cdc1963ffdaec38e7286925 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Sat, 28 Mar 2026 11:47:13 +0800 Subject: [PATCH 2/4] Reduced logics Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/config.yaml | 7 +- transfer_queue/interface.py | 244 +++++++++++++----------------------- 2 files changed, 90 insertions(+), 161 deletions(-) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 318d49b7..e2e7a53b 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -48,12 +48,11 @@ backend: # For Yuanrong: Yuanrong: - # Whether to let TQ automatically start etcd and datasystem services (default false) - auto_init: false + # Whether to let TQ automatically start etcd and datasystem services + auto_init: True # etcd service address (used to start etcd when auto_init=true) etcd_address: "127.0.0.1:2379" - # datasystem worker address (used to start dscli when auto_init=true) - worker_address: "127.0.0.1:31501" + # datasystem worker host and port (used to start dscli when auto_init=true) # YuanrongStorageClient connection parameters (required) host: "127.0.0.1" port: 31501 diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 6a1a65c3..ee481c12 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -17,7 +17,6 @@ import math import os import shutil -import socket import subprocess import tempfile import time @@ -192,169 +191,100 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: if conf.backend.Yuanrong.auto_init: etcd_process = None etcd_data_dir = None - + worker_address = None + if not shutil.which("etcd"): + raise RuntimeError( + "etcd executable not found in PATH. Please install etcd and make sure it's in the PATH." + ) + if not shutil.which("dscli"): + raise RuntimeError( + "dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`." + ) try: - # ========== Start etcd (inlined from _start_etcd) ========== - etcd_address = conf.backend.Yuanrong.etcd_address - # Parse host and port - if "://" in etcd_address: - # Remove protocol prefix if present - parsed = urlparse(etcd_address) - host = parsed.hostname - port = parsed.port - else: - # Assume host:port format - parts = etcd_address.split(":") - if len(parts) != 2: - raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port") - host = parts[0] - port = int(parts[1]) - - # Check if etcd is already running - check = subprocess.run(["pgrep", "-f", "etcd"], stdout=subprocess.PIPE, text=True) - if check.returncode == 0: - pids = check.stdout.strip().replace("\n", ", ") - logger.warning(f"Found existing etcd processes (PID: {pids}). TQ will not start a new one.") - # Try to connect to see if it's listening on our desired port - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(2) - result = sock.connect_ex((host, port)) - sock.close() - if result == 0: - logger.info(f"etcd is already listening on {host}:{port}") - etcd_process = None - etcd_data_dir = None - else: - logger.warning( - f"etcd process exists but not listening on {host}:{port}, will start new instance" - ) - # Continue to start new instance - except Exception as e: - logger.warning(f"Failed to check etcd port: {e}, will start new instance") - # Continue to start new instance - - if etcd_process is None: - # Create temporary data directory - etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_") - logger.info(f"Starting etcd with data directory: {etcd_data_dir}") - - cmd = [ - "etcd", - f"--data-dir={etcd_data_dir}", - f"--listen-client-urls=http://0.0.0.0:{port}", - f"--advertise-client-urls=http://{host}:{port}", - "--listen-peer-urls=http://0.0.0.0:2380", - f"--initial-advertise-peer-urls=http://{host}:2380", - "--initial-cluster=default=http://{}:2380".format(host), - ] - - log_file_path = "/tmp/etcd.log" - with open(log_file_path, "w") as log_file: - etcd_process = subprocess.Popen( - cmd, - stdout=log_file, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - universal_newlines=True, - start_new_session=True, - ) - time.sleep(3) # Wait for etcd to start - - if etcd_process.poll() is None: - logger.info( - f"etcd started, PID: {etcd_process.pid}. Logs at: {os.path.abspath(log_file_path)}" - ) - else: - error_msg = "" - try: - with open(log_file_path) as f: - error_msg = f.read() - except Exception as e: - error_msg = f"Failed to read log file: {e}" + # ========== Start etcd ========== + etcd_address = "127.0.0.1:2379" + try: + etcd_address = conf.backend.Yuanrong.etcd_address + except Exception: + pass - # Clean up data directory on failure - try: - shutil.rmtree(etcd_data_dir, ignore_errors=True) - except Exception: - pass + # Assume host:port format + parts = etcd_address.split(":") + if len(parts) != 2: + raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port") + host = parts[0] + port = int(parts[1]) + + # Create temporary data directory + etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_") + logger.info(f"Starting etcd with data directory: {etcd_data_dir}") + + cmd = [ + "etcd", + f"--data-dir={etcd_data_dir}", + f"--listen-client-urls=http://{host}:{port}", + f"--advertise-client-urls=http://{host}:{port}", + ] + + etcd_process = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + text=True, + bufsize=1, + universal_newlines=True, + start_new_session=True, + ) + time.sleep(3) # Wait for etcd to start + # TODO: check if etcd is healthy + etcd_is_healthy = etcd_process.poll() is None + # check + # + # + # + # + if not etcd_is_healthy: + # etcd exited immediately, indicate failure + # Clean up data directory on failure + try: + shutil.rmtree(etcd_data_dir, ignore_errors=True) + except Exception: + pass + raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}") - raise RuntimeError( - f"etcd exited with error. Check {log_file_path} for detailed logs. Output:\n{error_msg}" - ) + # Wait a moment for etcd to be ready + time.sleep(2) - # Wait a moment for etcd to be ready if we started it - if etcd_process is not None: - time.sleep(2) + # ========== Start datasystem worker ========== + # Assume host:port format + worker_host = conf.backend.Yuanrong.host + worker_port = conf.backend.Yuanrong.port + worker_address = worker_host + ":" + str(worker_port) - # ========== Start datasystem worker (inlined from _start_dscli) ========== - worker_address = conf.backend.Yuanrong.worker_address - # Check if datasystem worker is already running (by checking worker port) - parts = worker_address.split(":") - if len(parts) != 2: - raise ValueError(f"Invalid worker_address format: {worker_address}. Expected host:port") - worker_host = parts[0] - worker_port = int(parts[1]) + cmd = [ + "dscli", + "start", + "-w", + f"--worker_address={worker_address}", + f"--etcd_address={etcd_address}", + ] try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(2) - result = sock.connect_ex((worker_host, worker_port)) - sock.close() - if result == 0: - logger.info(f"Datasystem worker already listening on {worker_address}") - else: - # Start datasystem worker - cmd = [ - "dscli", - "start", - "-w", - f"--worker_address={worker_address}", - f"--etcd_address={etcd_address}", - ] - - log_file_path = "/tmp/dscli.log" - with open(log_file_path, "w") as log_file: - process = subprocess.Popen( - cmd, - stdout=log_file, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - universal_newlines=True, - start_new_session=True, - ) - # Wait for dscli to start and exit (it starts worker and exits) - time.sleep(3) - - if process.poll() is not None: - # dscli exited, check if it succeeded by verifying port - time.sleep(2) # Give worker time to start - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(2) - result = sock.connect_ex((worker_host, worker_port)) - sock.close() - if result != 0: - error_msg = "" - try: - with open(log_file_path) as f: - error_msg = f.read() - except Exception as e: - error_msg = f"Failed to read log file: {e}" - raise RuntimeError( - f"Failed to start datasystem worker. Check {log_file_path} for logs. " - f"Output:\n{error_msg}" - ) - else: - logger.info(f"Datasystem worker started and listening on {worker_address}") - else: - # dscli is still running (unexpected), log warning - logger.warning( - f"dscli process still running (PID: {process.pid}). This may indicate an issue." - ) - except Exception as e: - raise RuntimeError(f"Failed to start datasystem worker: {e}") from e + ds_result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + timeout=90, + ) + except subprocess.TimeoutExpired as err: + raise RuntimeError("dscli start timed out") from err + # Wait for dscli to start and exit (it starts worker and exits) + if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout: + logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.") + + else: + raise RuntimeError(f"Failed to start datasystem worker at {worker_address}.") # Store processes and data directory _TRANSFER_QUEUE_STORAGE["Yuanrong"] = { From e5c4588f6da7462f955b33a9cbd25f6a552d0593 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Sat, 28 Mar 2026 16:02:15 +0800 Subject: [PATCH 3/4] Simplified logics about Yuanrong startup Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/interface.py | 41 +++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index ee481c12..8b136f70 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -17,6 +17,7 @@ import math import os import shutil +import socket import subprocess import tempfile import time @@ -236,23 +237,18 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: start_new_session=True, ) time.sleep(3) # Wait for etcd to start - # TODO: check if etcd is healthy - etcd_is_healthy = etcd_process.poll() is None - # check - # - # - # - # - if not etcd_is_healthy: - # etcd exited immediately, indicate failure - # Clean up data directory on failure - try: - shutil.rmtree(etcd_data_dir, ignore_errors=True) - except Exception: - pass + + if etcd_process.poll() is None: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(2) + result = sock.connect_ex((host, port)) + sock.close() + if result != 0: + raise RuntimeError(f"etcd process started but not listening on {host}:{port}") + else: raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}") - # Wait a moment for etcd to be ready + logger.info(f"etcd started, PID: {etcd_process.pid}") time.sleep(2) # ========== Start datasystem worker ========== @@ -265,8 +261,10 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: "dscli", "start", "-w", - f"--worker_address={worker_address}", - f"--etcd_address={etcd_address}", + "--worker_address", + worker_address, + "--etcd_address", + etcd_address, ] try: @@ -278,13 +276,16 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: timeout=90, ) except subprocess.TimeoutExpired as err: - raise RuntimeError("dscli start timed out") from err + raise RuntimeError(f"dscli start timed out: {err}") from err # Wait for dscli to start and exit (it starts worker and exits) if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout: logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.") else: - raise RuntimeError(f"Failed to start datasystem worker at {worker_address}.") + raise RuntimeError( + f"Failed to start datasystem worker at {worker_address}. " + f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}" + ) # Store processes and data directory _TRANSFER_QUEUE_STORAGE["Yuanrong"] = { @@ -494,7 +495,7 @@ def close(): try: subprocess.run( ["dscli", "stop", "--worker_address", worker_address], - timeout=5, + timeout=90, capture_output=True, ) logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop") From 6bc4c613578e019bf47b4a006c6cdd95d846cf24 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Mon, 30 Mar 2026 16:01:00 +0800 Subject: [PATCH 4/4] Fixed comments Signed-off-by: dpj135 <958208521@qq.com> --- transfer_queue/config.yaml | 3 --- transfer_queue/interface.py | 19 +++++++++++++++++-- .../storage/clients/yuanrong_client.py | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index e2e7a53b..d5a3bac6 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -53,8 +53,5 @@ backend: # etcd service address (used to start etcd when auto_init=true) etcd_address: "127.0.0.1:2379" # datasystem worker host and port (used to start dscli when auto_init=true) - # YuanrongStorageClient connection parameters (required) host: "127.0.0.1" port: 31501 - # Client name (optional, default is YuanrongStorageClient) - client_name: "YuanrongStorageClient" diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 8b136f70..347bd764 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -300,6 +300,11 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: # Clean up on failure if etcd_process is not None and etcd_process.poll() is None: etcd_process.terminate() + try: + etcd_process.wait(timeout=5) + except subprocess.TimeoutExpired: + etcd_process.kill() + etcd_process.wait() if etcd_data_dir is not None: try: shutil.rmtree(etcd_data_dir, ignore_errors=True) @@ -481,6 +486,7 @@ def close(): etcd_process.wait(timeout=5) except subprocess.TimeoutExpired: etcd_process.kill() + etcd_process.wait() # Clean up etcd data directory if etcd_data_dir is not None and os.path.exists(etcd_data_dir): @@ -493,12 +499,21 @@ def close(): # Stop datasystem worker via dscli command if worker_address: try: - subprocess.run( + result = subprocess.run( ["dscli", "stop", "--worker_address", worker_address], timeout=90, capture_output=True, ) - logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop") + if result.returncode == 0: + logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop") + else: + error_msg = (result.stderr or result.stdout or b"").decode() + logger.warning( + f"Failed to stop datasystem worker at {worker_address}. " + f"Return code: {result.returncode}, Error: {error_msg}" + ) + except subprocess.TimeoutExpired as err: + logger.warning(f"dscli stop timed out for {worker_address}: {err}") except Exception as e: logger.warning(f"Failed to stop datasystem worker via dscli: {e}") else: diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 77a981eb..3b0de3ab 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -297,7 +297,7 @@ class GeneralKVClientAdapter(StorageStrategy): The serialization method uses '_decoder' and '_encoder' from 'transfer_queue.utils.serial_utils'. """ - PUT_KEYS_LIMIT: int = 2_000 + PUT_KEYS_LIMIT: int = 10_000 GET_CLEAR_KEYS_LIMIT: int = 10_000 # Header: number of entries (uint32, little-endian)