[feat] provide save/load checkpoint interfaces#124
Conversation
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
1 similar comment
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
| CHECKPOINT_DUMP_RESPONSE = "CHECKPOINT_DUMP_RESPONSE" | ||
| CHECKPOINT_RESTORE = "CHECKPOINT_RESTORE" | ||
| CHECKPOINT_RESTORE_RESPONSE = "CHECKPOINT_RESTORE_RESPONSE" | ||
| SAVE_LOAD_CKPT_ERROR = "SAVE_LOAD_CKPT_ERROR" |
| CHECKPOINT_DUMP = "CHECKPOINT_DUMP" | ||
| CHECKPOINT_DUMP_RESPONSE = "CHECKPOINT_DUMP_RESPONSE" | ||
| CHECKPOINT_RESTORE = "CHECKPOINT_RESTORE" | ||
| CHECKPOINT_RESTORE_RESPONSE = "CHECKPOINT_RESTORE_RESPONSE" |
There was a problem hiding this comment.
Distinguish between controller zmq event & simple storage zmq event
| @property | ||
| def storage_checkpoint_required(self) -> bool: | ||
| """Whether storage contents must be checkpointed for correct restore. | ||
|
|
||
| Returns True for in-memory backends (e.g. SimpleStorage) where data | ||
| is lost on restart and must be serialized. Returns False for persistent | ||
| KV backends (e.g. MooncakeStore, Yuanrong) where data survives restarts | ||
| and only controller metadata needs to be saved. | ||
|
|
||
| Subclasses should override this to reflect their actual persistence model. | ||
| """ | ||
| return False |
There was a problem hiding this comment.
This property is strange
| async def dump_checkpoint(self, output_dir: str) -> list[dict]: | ||
| """Dump all storage units to files in output_dir. | ||
|
|
||
| Returns: | ||
| List of dicts, each with keys: position, storage_unit_id. | ||
|
|
||
| Raises: | ||
| NotImplementedError: If this storage backend does not support checkpoint. | ||
| """ | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support checkpoint") | ||
|
|
||
| async def restore_checkpoint(self, checkpoint_dir: str, su_info_list: list[dict]) -> None: | ||
| """Restore all storage units from files in checkpoint_dir. | ||
|
|
||
| Args: | ||
| checkpoint_dir: Path to the checkpoint directory containing storage unit files. | ||
| su_info_list: Ordered list of storage unit info dicts from metadata.json. | ||
|
|
There was a problem hiding this comment.
These interfaces are not universal to other backends
| logger.error(f"[{self.controller_id}]: dump_to_file failed: {e}") | ||
| return False | ||
|
|
||
| def restore_from_file(self, path: str) -> bool: |
There was a problem hiding this comment.
Unify the interface name
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
| def _put_batch(keys, partition_id, input_ids, attention_mask, tags=None): | ||
| fields = TensorDict( | ||
| {"input_ids": input_ids, "attention_mask": attention_mask}, | ||
| batch_size=len(keys), | ||
| ) | ||
| if tags is None: | ||
| tags = [{} for _ in keys] | ||
| tq.kv_batch_put(keys=keys, partition_id=partition_id, fields=fields, tags=tags) | ||
|
|
||
|
|
||
| def _get_batch(keys, partition_id): | ||
| return tq.kv_batch_get(keys=keys, partition_id=partition_id) |
There was a problem hiding this comment.
Just use TQ api. Don't warp it
| @@ -0,0 +1,395 @@ | |||
| # Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. | |||
There was a problem hiding this comment.
Need to refactor the test logics.
- Define test data
- Put
- Save
- check saved states
- Load
- check load states
There was a problem hiding this comment.
We can split the unit tests in test_controller.py, test_simple_storage_unit.py
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Previously, load_checkpoint restored controller state first, then validated storage unit count. A mismatch would raise ValueError after the controller had already been overwritten, leaving the system in an inconsistent state. Fix: introduce validate_checkpoint on the storage manager that checks file existence and SU count without touching any state. interface.py now runs this validation before any restoration begins. Also adds an assertion to test_load_raises_on_storage_unit_count_mismatch to verify controller state is unchanged after a failed load. Signed-off-by: yxstev <zhangyixiang9@huawei.com>
CLA Signature Passdodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍 |
Summary
tq.save_checkpoint(checkpoint_dir, *, include_storage, metadata)andtq.load_checkpoint(checkpoint_dir)as top-level public APIs.tmpdirectory that is renamed on success and deleted on failure, ensuring no partial checkpoint is left on diskglobal_idx % num_units) so storage unit IDs regenerated across restarts are handled correctlyCheckpoint layout:
Test plan
pytest tests/e2e/test_checkpoint_e2e.py -vinclude_storage=Falsesaves only controller state