Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions transfer_queue/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

_TRANSFER_QUEUE_CLIENT: Any = None
_TRANSFER_QUEUE_STORAGE: Any = None
_TRANSFER_QUEUE_CONTROLLER: Any = None


def _maybe_create_transferqueue_client(
Expand Down Expand Up @@ -101,9 +102,11 @@ def _init_from_existing() -> bool:
Returns:
True if successfully initialized from existing controller, False otherwise.
"""

global _TRANSFER_QUEUE_CONTROLLER
try:
controller = ray.get_actor("TransferQueueController")
if _TRANSFER_QUEUE_CONTROLLER is None:
_TRANSFER_QUEUE_CONTROLLER = ray.get_actor("TransferQueueController")

Comment thread
0oshowero0 marked this conversation as resolved.
except ValueError:
logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.")
return False
Expand All @@ -112,7 +115,7 @@ def _init_from_existing() -> bool:

conf = None
while conf is None:
conf = ray.get(controller.get_config.remote())
conf = ray.get(_TRANSFER_QUEUE_CONTROLLER.get_config.remote())
if conf is not None:
_maybe_create_transferqueue_client(conf)
logger.info("TransferQueueClient initialized.")
Expand Down Expand Up @@ -188,7 +191,8 @@ def init(conf: Optional[DictConfig] = None) -> None:

try:
# Ray will make sure actor with same name can only be created once
controller = TransferQueueController.options(name="TransferQueueController", lifetime="detached").remote( # type: ignore[attr-defined]
global _TRANSFER_QUEUE_CONTROLLER
_TRANSFER_QUEUE_CONTROLLER = TransferQueueController.options(name="TransferQueueController").remote( # type: ignore[attr-defined]
sampler=sampler, polling_mode=final_conf.controller.polling_mode
)
logger.info("TransferQueueController has been created.")
Expand All @@ -197,14 +201,14 @@ def init(conf: Optional[DictConfig] = None) -> None:
_init_from_existing()
return

controller_zmq_info = process_zmq_server_info(controller)
controller_zmq_info = process_zmq_server_info(_TRANSFER_QUEUE_CONTROLLER)
final_conf.controller.zmq_info = controller_zmq_info

# create distributed storage backends
final_conf = _maybe_create_transferqueue_storage(final_conf)

# store the config into controller
ray.get(controller.store_config.remote(final_conf))
ray.get(_TRANSFER_QUEUE_CONTROLLER.store_config.remote(final_conf))
logger.info(f"TransferQueue config: {final_conf}")

# create client
Expand All @@ -224,6 +228,7 @@ def close():
"""
global _TRANSFER_QUEUE_CLIENT
global _TRANSFER_QUEUE_STORAGE
global _TRANSFER_QUEUE_CONTROLLER
if _TRANSFER_QUEUE_CLIENT:
_TRANSFER_QUEUE_CLIENT.close()
_TRANSFER_QUEUE_CLIENT = None
Expand All @@ -237,6 +242,13 @@ def close():
except Exception:
pass

if _TRANSFER_QUEUE_CONTROLLER:
try:
ray.kill(_TRANSFER_QUEUE_CONTROLLER)
except Exception:
pass
_TRANSFER_QUEUE_CONTROLLER = None
Comment thread
0oshowero0 marked this conversation as resolved.

try:
controller = ray.get_actor("TransferQueueController")
ray.kill(controller)
Expand Down