From 45e7852da015d8ce1b32e8191915e5ab61279ee1 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Thu, 5 Mar 2026 11:18:24 +0800 Subject: [PATCH 1/2] convert detached actor to normal actor Signed-off-by: 0oshowero0 --- transfer_queue/interface.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 0bfc5916..5b18e794 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -39,6 +39,7 @@ _TRANSFER_QUEUE_CLIENT: Any = None _TRANSFER_QUEUE_STORAGE: Any = None +_TRANSFER_QUEUE_CONTROLLER: Any = None def _maybe_create_transferqueue_client( @@ -101,9 +102,11 @@ def _init_from_existing() -> bool: Returns: True if successfully initialized from existing controller, False otherwise. """ - + global _TRANSFER_QUEUE_CONTROLLER try: - controller = ray.get_actor("TransferQueueController") + if _TRANSFER_QUEUE_CONTROLLER is None: + _TRANSFER_QUEUE_CONTROLLER = ray.get_actor("TransferQueueController") + except ValueError: logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.") return False @@ -112,7 +115,7 @@ def _init_from_existing() -> bool: conf = None while conf is None: - conf = ray.get(controller.get_config.remote()) + conf = ray.get(_TRANSFER_QUEUE_CONTROLLER.get_config.remote()) if conf is not None: _maybe_create_transferqueue_client(conf) logger.info("TransferQueueClient initialized.") @@ -188,7 +191,8 @@ def init(conf: Optional[DictConfig] = None) -> None: try: # Ray will make sure actor with same name can only be created once - controller = TransferQueueController.options(name="TransferQueueController", lifetime="detached").remote( # type: ignore[attr-defined] + global _TRANSFER_QUEUE_CONTROLLER + _TRANSFER_QUEUE_CONTROLLER = TransferQueueController.options(name="TransferQueueController").remote( # type: ignore[attr-defined] sampler=sampler, polling_mode=final_conf.controller.polling_mode ) logger.info("TransferQueueController has been created.") @@ -197,14 +201,14 @@ def init(conf: Optional[DictConfig] = None) -> None: _init_from_existing() return - controller_zmq_info = process_zmq_server_info(controller) + controller_zmq_info = process_zmq_server_info(_TRANSFER_QUEUE_CONTROLLER) final_conf.controller.zmq_info = controller_zmq_info # create distributed storage backends final_conf = _maybe_create_transferqueue_storage(final_conf) # store the config into controller - ray.get(controller.store_config.remote(final_conf)) + ray.get(_TRANSFER_QUEUE_CONTROLLER.store_config.remote(final_conf)) logger.info(f"TransferQueue config: {final_conf}") # create client @@ -224,6 +228,7 @@ def close(): """ global _TRANSFER_QUEUE_CLIENT global _TRANSFER_QUEUE_STORAGE + global _TRANSFER_QUEUE_CONTROLLER if _TRANSFER_QUEUE_CLIENT: _TRANSFER_QUEUE_CLIENT.close() _TRANSFER_QUEUE_CLIENT = None @@ -237,6 +242,10 @@ def close(): except Exception: pass + if _TRANSFER_QUEUE_CONTROLLER: + ray.kill(_TRANSFER_QUEUE_CONTROLLER) + _TRANSFER_QUEUE_CONTROLLER = None + try: controller = ray.get_actor("TransferQueueController") ray.kill(controller) From 47cbab9f0890b4b9ff8864f5fc156566dd622bb1 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Thu, 5 Mar 2026 11:30:50 +0800 Subject: [PATCH 2/2] fix Signed-off-by: 0oshowero0 --- transfer_queue/interface.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 5b18e794..97f25758 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -243,7 +243,10 @@ def close(): pass if _TRANSFER_QUEUE_CONTROLLER: - ray.kill(_TRANSFER_QUEUE_CONTROLLER) + try: + ray.kill(_TRANSFER_QUEUE_CONTROLLER) + except Exception: + pass _TRANSFER_QUEUE_CONTROLLER = None try: