diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 0bfc5916..97f25758 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -39,6 +39,7 @@ _TRANSFER_QUEUE_CLIENT: Any = None _TRANSFER_QUEUE_STORAGE: Any = None +_TRANSFER_QUEUE_CONTROLLER: Any = None def _maybe_create_transferqueue_client( @@ -101,9 +102,11 @@ def _init_from_existing() -> bool: Returns: True if successfully initialized from existing controller, False otherwise. """ - + global _TRANSFER_QUEUE_CONTROLLER try: - controller = ray.get_actor("TransferQueueController") + if _TRANSFER_QUEUE_CONTROLLER is None: + _TRANSFER_QUEUE_CONTROLLER = ray.get_actor("TransferQueueController") + except ValueError: logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.") return False @@ -112,7 +115,7 @@ def _init_from_existing() -> bool: conf = None while conf is None: - conf = ray.get(controller.get_config.remote()) + conf = ray.get(_TRANSFER_QUEUE_CONTROLLER.get_config.remote()) if conf is not None: _maybe_create_transferqueue_client(conf) logger.info("TransferQueueClient initialized.") @@ -188,7 +191,8 @@ def init(conf: Optional[DictConfig] = None) -> None: try: # Ray will make sure actor with same name can only be created once - controller = TransferQueueController.options(name="TransferQueueController", lifetime="detached").remote( # type: ignore[attr-defined] + global _TRANSFER_QUEUE_CONTROLLER + _TRANSFER_QUEUE_CONTROLLER = TransferQueueController.options(name="TransferQueueController").remote( # type: ignore[attr-defined] sampler=sampler, polling_mode=final_conf.controller.polling_mode ) logger.info("TransferQueueController has been created.") @@ -197,14 +201,14 @@ def init(conf: Optional[DictConfig] = None) -> None: _init_from_existing() return - controller_zmq_info = process_zmq_server_info(controller) + controller_zmq_info = process_zmq_server_info(_TRANSFER_QUEUE_CONTROLLER) final_conf.controller.zmq_info = controller_zmq_info # create distributed storage backends final_conf = _maybe_create_transferqueue_storage(final_conf) # store the config into controller - ray.get(controller.store_config.remote(final_conf)) + ray.get(_TRANSFER_QUEUE_CONTROLLER.store_config.remote(final_conf)) logger.info(f"TransferQueue config: {final_conf}") # create client @@ -224,6 +228,7 @@ def close(): """ global _TRANSFER_QUEUE_CLIENT global _TRANSFER_QUEUE_STORAGE + global _TRANSFER_QUEUE_CONTROLLER if _TRANSFER_QUEUE_CLIENT: _TRANSFER_QUEUE_CLIENT.close() _TRANSFER_QUEUE_CLIENT = None @@ -237,6 +242,13 @@ def close(): except Exception: pass + if _TRANSFER_QUEUE_CONTROLLER: + try: + ray.kill(_TRANSFER_QUEUE_CONTROLLER) + except Exception: + pass + _TRANSFER_QUEUE_CONTROLLER = None + try: controller = ray.get_actor("TransferQueueController") ray.kill(controller)