From 7a79f80d4eaf0d47e934c8d0a722e9f688302d21 Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Thu, 12 Feb 2026 20:58:24 +0800 Subject: [PATCH 1/8] update _maybe_create_transferqueue_client to avoid potential ray conflicts Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 8e4f8c34..2a5154fe 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -47,7 +47,7 @@ def _maybe_create_transferqueue_client( global _TRANSFER_QUEUE_CLIENT if _TRANSFER_QUEUE_CLIENT is None: if conf is None: - raise ValueError("Missing config for initializing TransferQueueClient!") + return _init_from_existing() pid = os.getpid() _TRANSFER_QUEUE_CLIENT = TransferQueueClient( client_id=f"TransferQueueClient_{pid}", controller_info=conf.controller.zmq_info From d3e206803a062cbd80e4a6924a54012c9db5a57c Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Thu, 12 Feb 2026 22:06:52 +0800 Subject: [PATCH 2/8] fix return type declaration mismatch Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 2a5154fe..f2aac868 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -47,7 +47,8 @@ def _maybe_create_transferqueue_client( global _TRANSFER_QUEUE_CLIENT if _TRANSFER_QUEUE_CLIENT is None: if conf is None: - return _init_from_existing() + _init_from_existing() + return _TRANSFER_QUEUE_CLIENT pid = os.getpid() _TRANSFER_QUEUE_CLIENT = TransferQueueClient( client_id=f"TransferQueueClient_{pid}", controller_info=conf.controller.zmq_info From b859e4395944cd248d744556eff5135c091d3cc8 Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Fri, 13 Feb 2026 12:15:01 +0800 Subject: [PATCH 3/8] modify _init_from_existing() type as bool Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index f2aac868..56b8db05 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -47,8 +47,11 @@ def _maybe_create_transferqueue_client( global _TRANSFER_QUEUE_CLIENT if _TRANSFER_QUEUE_CLIENT is None: if conf is None: - _init_from_existing() + result = _init_from_existing() + assert result is True + assert _TRANSFER_QUEUE_CLIENT is not None return _TRANSFER_QUEUE_CLIENT + pid = os.getpid() _TRANSFER_QUEUE_CLIENT = TransferQueueClient( client_id=f"TransferQueueClient_{pid}", controller_info=conf.controller.zmq_info @@ -89,10 +92,18 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: return conf -def _init_from_existing() -> None: - """Initialize the TransferQueueClient from existing controller.""" +def _init_from_existing() -> bool: + """Initialize the TransferQueueClient from existing controller. + + Returns: + True if successfully initialized from existing controller, False otherwise. + """ + + try: + controller = ray.get_actor("TransferQueueController") + except ValueError: + return False - controller = ray.get_actor("TransferQueueController") logger.info("Found existing TransferQueueController instance. Connecting...") conf = None @@ -101,11 +112,13 @@ def _init_from_existing() -> None: if remote_conf is not None: _maybe_create_transferqueue_client(remote_conf) logger.info("TransferQueueClient initialized.") - return + return True logger.debug("Waiting for controller to initialize... Retrying in 1s") time.sleep(1) + return False + # ==================== Initialization API ==================== def init(conf: Optional[DictConfig] = None) -> None: @@ -139,13 +152,12 @@ def init(conf: Optional[DictConfig] = None) -> None: >>> metadata = tq.get_meta(...) >>> data = tq.get_data(metadata) """ - try: - _init_from_existing() - except ValueError: - logger.info("No TransferQueueController found. Starting first-time initialization...") - else: + if _init_from_existing(): return + # First-time initialize TransferQueue + logger.info("No TransferQueueController found. Starting first-time initialization...") + # First-time initialize TransferQueue # create config From e5e256860c2729173587fd0d1933372923bb45a5 Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Fri, 13 Feb 2026 14:10:21 +0800 Subject: [PATCH 4/8] try to fix review comments Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 56b8db05..ca399f71 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -114,6 +114,7 @@ def _init_from_existing() -> bool: logger.info("TransferQueueClient initialized.") return True + conf = remote_conf logger.debug("Waiting for controller to initialize... Retrying in 1s") time.sleep(1) @@ -158,8 +159,6 @@ def init(conf: Optional[DictConfig] = None) -> None: # First-time initialize TransferQueue logger.info("No TransferQueueController found. Starting first-time initialization...") - # First-time initialize TransferQueue - # create config final_conf = OmegaConf.create({}, flags={"allow_objects": True}) with pkg_resources.path("transfer_queue", "config.yaml") as p: From e3295eafcf10205bb2b55bac78572a9f3df9ba6a Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Fri, 13 Feb 2026 14:16:56 +0800 Subject: [PATCH 5/8] fix following comments Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index ca399f71..b053cbb6 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -102,6 +102,8 @@ def _init_from_existing() -> bool: try: controller = ray.get_actor("TransferQueueController") except ValueError: + logger.warning("Called _init_from_existing() but TransferQueue Controller has not been initialized yet." + "Please call tq.init() first.") return False logger.info("Found existing TransferQueueController instance. Connecting...") From 07c12f18cb941d639ac6020813a4c14eb1f15d40 Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Fri, 13 Feb 2026 14:23:08 +0800 Subject: [PATCH 6/8] improve while conf loop Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index b053cbb6..10dd0af7 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -110,13 +110,12 @@ def _init_from_existing() -> bool: conf = None while conf is None: - remote_conf = ray.get(controller.get_config.remote()) + conf = ray.get(controller.get_config.remote()) if remote_conf is not None: _maybe_create_transferqueue_client(remote_conf) logger.info("TransferQueueClient initialized.") return True - conf = remote_conf logger.debug("Waiting for controller to initialize... Retrying in 1s") time.sleep(1) From 52b7f140e05a283e20b78cacb2537d1d11e6261c Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Fri, 13 Feb 2026 14:26:42 +0800 Subject: [PATCH 7/8] bug fix Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 10dd0af7..a6bce374 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -102,8 +102,7 @@ def _init_from_existing() -> bool: try: controller = ray.get_actor("TransferQueueController") except ValueError: - logger.warning("Called _init_from_existing() but TransferQueue Controller has not been initialized yet." - "Please call tq.init() first.") + logger.warning("Called _init_from_existing() but TransferQueue Controller has not been initialized yet.") return False logger.info("Found existing TransferQueueController instance. Connecting...") From af81d1bc39bcf02068faec91e4cd2db74ea2e7f9 Mon Sep 17 00:00:00 2001 From: MissLittleFish Date: Fri, 13 Feb 2026 14:43:26 +0800 Subject: [PATCH 8/8] fix bug Signed-off-by: MissLittleFish --- transfer_queue/interface.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index a6bce374..d415a2fb 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -47,9 +47,10 @@ def _maybe_create_transferqueue_client( global _TRANSFER_QUEUE_CLIENT if _TRANSFER_QUEUE_CLIENT is None: if conf is None: - result = _init_from_existing() - assert result is True - assert _TRANSFER_QUEUE_CLIENT is not None + _init_from_existing() + assert _TRANSFER_QUEUE_CLIENT is not None, ( + "TransferQueueController has not been initialized yet. Please call init() first." + ) return _TRANSFER_QUEUE_CLIENT pid = os.getpid() @@ -102,7 +103,7 @@ def _init_from_existing() -> bool: try: controller = ray.get_actor("TransferQueueController") except ValueError: - logger.warning("Called _init_from_existing() but TransferQueue Controller has not been initialized yet.") + logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.") return False logger.info("Found existing TransferQueueController instance. Connecting...") @@ -110,8 +111,8 @@ def _init_from_existing() -> bool: conf = None while conf is None: conf = ray.get(controller.get_config.remote()) - if remote_conf is not None: - _maybe_create_transferqueue_client(remote_conf) + if conf is not None: + _maybe_create_transferqueue_client(conf) logger.info("TransferQueueClient initialized.") return True