diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 8e4f8c34..d415a2fb 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -47,7 +47,12 @@ def _maybe_create_transferqueue_client( global _TRANSFER_QUEUE_CLIENT if _TRANSFER_QUEUE_CLIENT is None: if conf is None: - raise ValueError("Missing config for initializing TransferQueueClient!") + _init_from_existing() + assert _TRANSFER_QUEUE_CLIENT is not None, ( + "TransferQueueController has not been initialized yet. Please call init() first." + ) + return _TRANSFER_QUEUE_CLIENT + pid = os.getpid() _TRANSFER_QUEUE_CLIENT = TransferQueueClient( client_id=f"TransferQueueClient_{pid}", controller_info=conf.controller.zmq_info @@ -88,23 +93,34 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: return conf -def _init_from_existing() -> None: - """Initialize the TransferQueueClient from existing controller.""" +def _init_from_existing() -> bool: + """Initialize the TransferQueueClient from existing controller. + + Returns: + True if successfully initialized from existing controller, False otherwise. + """ + + try: + controller = ray.get_actor("TransferQueueController") + except ValueError: + logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.") + return False - controller = ray.get_actor("TransferQueueController") logger.info("Found existing TransferQueueController instance. Connecting...") conf = None while conf is None: - remote_conf = ray.get(controller.get_config.remote()) - if remote_conf is not None: - _maybe_create_transferqueue_client(remote_conf) + conf = ray.get(controller.get_config.remote()) + if conf is not None: + _maybe_create_transferqueue_client(conf) logger.info("TransferQueueClient initialized.") - return + return True logger.debug("Waiting for controller to initialize... Retrying in 1s") time.sleep(1) + return False + # ==================== Initialization API ==================== def init(conf: Optional[DictConfig] = None) -> None: @@ -138,14 +154,11 @@ def init(conf: Optional[DictConfig] = None) -> None: >>> metadata = tq.get_meta(...) >>> data = tq.get_data(metadata) """ - try: - _init_from_existing() - except ValueError: - logger.info("No TransferQueueController found. Starting first-time initialization...") - else: + if _init_from_existing(): return # First-time initialize TransferQueue + logger.info("No TransferQueueController found. Starting first-time initialization...") # create config final_conf = OmegaConf.create({}, flags={"allow_objects": True})