From 30b6ba5d9563f44df09aad06d872afe59dcc3e01 Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Mon, 8 Jun 2026 14:24:13 +0800 Subject: [PATCH 1/2] Add a faq to yuanrong[doc] and set env_var for rh2d Signed-off-by: dpj135 <958208521@qq.com> --- .../openyuanrong_datasystem.md | 15 ++++++++++ .../storage/bootstrap/yuanrong_bootstrap.py | 4 +++ .../storage/clients/yuanrong_client.py | 28 ++++++++++++++----- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/docs/storage_backends/openyuanrong_datasystem.md b/docs/storage_backends/openyuanrong_datasystem.md index a0e346a..839ddc9 100644 --- a/docs/storage_backends/openyuanrong_datasystem.md +++ b/docs/storage_backends/openyuanrong_datasystem.md @@ -463,6 +463,21 @@ RuntimeError: code: [Out of memory], msg: [Shared memory no space in arena: ...] Solution: Increase `--shared_memory_size_mb` in `worker_args`, or reduce the data volume being cached. +### "Cannot retrieve stored data" Error on get/clear + +If you encounter an error like: +``` +ValueError: Cannot retrieve stored data because the backend that originally stored it is unavailable in the current process or node. Please check that the configuration and NPU resource availability are consistent across all processes and nodes. +``` + +This occurs when `kv_batch_get` or `kv_batch_clear` cannot find the storage backend that originally handled the data. The most common cause is a mismatch between the process that originally `put` the data and the process performing `get`/`clear`, such as: + +- Different `enable_yr_npu_transport` settings across processes or nodes (e.g., `true` vs `false`). +- NPU hardware or CANN/torch-npu unavailable on the `get`/`clear` process or node, even though the configuration is identical. +- When running inside Ray actors, the actor may not be assigned NPU resources (e.g., missing `"NPU": 1` in `.options(resources=...)`), preventing the NPU transport backend from initializing. + +Solution: Ensure that all processes and nodes use the same TransferQueue configuration and have consistent NPU resource availability. When using Ray actors, make sure NPU resources are properly allocated via `.options(resources={"NPU": 1})`. + ## Datasystem Logs diff --git a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py index 5682593..bfafe95 100644 --- a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py +++ b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py @@ -120,6 +120,10 @@ def start_datasystem_worker( if device_ids or enable_rdma or ucx_env_vars: env = os.environ.copy() if device_ids: + # Ensure direct copy for specified devices + env["DS_D2H_MEMCPY_POLICY"] = "direct" + env["DS_H2D_MEMCPY_POLICY"] = "direct" + env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids logger.info( f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})" diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 0aebf22..8e7c7cc 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -351,7 +351,9 @@ def put(self, keys: list[str], values: list[Any]) -> list[str]: if len(keys) != len(values): raise ValueError("Number of keys must match number of values") - routed_indexes = self._route_to_strategies(values, lambda strategy_, item_: strategy_.supports_put(item_)) + routed_indexes = self._route_to_strategies( + values, lambda strategy_, item_: strategy_.supports_put(item_), item_label="value" + ) # Define the 'put_task': Slicing the input list and calling the backend strategy. # The closure captures local 'keys' and 'values' for zero-overhead parameter passing. @@ -394,7 +396,7 @@ def get( strategy_tags = custom_backend_meta routed_indexes = self._route_to_strategies( - strategy_tags, lambda strategy_, item_: strategy_.supports_get(item_) + strategy_tags, lambda strategy_, item_: strategy_.supports_get(item_), item_label="backend_meta" ) # Define the 'get_task': handles slicing of keys, shapes, and dtypes simultaneously. @@ -426,7 +428,10 @@ def clear(self, keys: list[str], custom_backend_meta: list[str] | None = None) - strategy_tags = custom_backend_meta routed_indexes = self._route_to_strategies( - strategy_tags, lambda strategy_, item_: strategy_.supports_clear(item_), ignore_unmatched=True + strategy_tags, + lambda strategy_, item_: strategy_.supports_clear(item_), + ignore_unmatched=True, + item_label="backend_meta", ) def clear_task(strategy, indexes): @@ -439,7 +444,9 @@ def _route_to_strategies( self, items: list[Any], selector: Callable[[StorageStrategy, Any], bool], + *, ignore_unmatched: bool = False, + item_label: str = "value", ) -> dict[StorageStrategy, list[int]]: """Groups item indices by the first strategy that supports them. @@ -454,6 +461,8 @@ def _route_to_strategies( Signature: `(strategy: StorageStrategy, item: Any) -> bool`. ignore_unmatched: If True, items that don't match any strategy will be ignored (not included in output). If False, a ValueError will be raised for any unmatched item. + item_label: Description of what `items` represents, used in error messages. + "value" for put (user-provided data), "backend_meta" for get/clear (backend metadata). Returns: A dictionary mapping each active strategy to a list of indexes in `items` @@ -470,10 +479,15 @@ def _route_to_strategies( if ignore_unmatched: unmatched_count += 1 else: - raise ValueError( - f"No strategy supports item of type {type(item).__name__}: {item}. " - f"Available strategies: {[type(s).__name__ for s in self._strategies]}" - ) + if item_label == "backend_meta": + raise ValueError( + "Cannot retrieve stored data because the backend that originally " + "stored it is unavailable in the current process or node. Please " + "check that the configuration and NPU resource availability are " + "consistent across all processes and nodes." + ) + else: + raise ValueError(f"No strategy can handle {item_label} of type {type(item).__name__}.") if unmatched_count > 0: logger.warning(f"{unmatched_count} items were not matched to any strategy and will be ignored.") From 26392cf34b749aa3de3bed5ff1c68056d499204c Mon Sep 17 00:00:00 2001 From: dpj135 <958208521@qq.com> Date: Mon, 8 Jun 2026 20:57:22 +0800 Subject: [PATCH 2/2] Fixed comments and add some tests Signed-off-by: dpj135 <958208521@qq.com> --- .../openyuanrong_datasystem.md | 6 +-- tests/test_yuanrong_storage_client_e2e.py | 34 +++++++++++++ .../storage/bootstrap/yuanrong_bootstrap.py | 4 +- .../storage/clients/yuanrong_client.py | 50 +++++++++++++++---- 4 files changed, 79 insertions(+), 15 deletions(-) diff --git a/docs/storage_backends/openyuanrong_datasystem.md b/docs/storage_backends/openyuanrong_datasystem.md index 839ddc9..37c7a21 100644 --- a/docs/storage_backends/openyuanrong_datasystem.md +++ b/docs/storage_backends/openyuanrong_datasystem.md @@ -420,7 +420,7 @@ ps aux | grep datasystem_worker dscli stop --worker_address : # Force cleanup (use with caution) -pkill -f datasystem_worker +pkill -9 -f datasystem_worker ``` ### Multi-Process Initialization @@ -470,10 +470,10 @@ If you encounter an error like: ValueError: Cannot retrieve stored data because the backend that originally stored it is unavailable in the current process or node. Please check that the configuration and NPU resource availability are consistent across all processes and nodes. ``` -This occurs when `kv_batch_get` or `kv_batch_clear` cannot find the storage backend that originally handled the data. The most common cause is a mismatch between the process that originally `put` the data and the process performing `get`/`clear`, such as: +This occurs when `kv_batch_get` cannot find the storage backend that originally handled the data. The most common cause is a mismatch between the process that originally `put` the data and the process performing `get`, such as: - Different `enable_yr_npu_transport` settings across processes or nodes (e.g., `true` vs `false`). -- NPU hardware or CANN/torch-npu unavailable on the `get`/`clear` process or node, even though the configuration is identical. +- NPU hardware or CANN/torch-npu unavailable on the `get` process or node, even though the configuration is identical. - When running inside Ray actors, the actor may not be assigned NPU resources (e.g., missing `"NPU": 1` in `.options(resources=...)`), preventing the NPU transport backend from initializing. Solution: Ensure that all processes and nodes use the same TransferQueue configuration and have consistent NPU resource availability. When using Ray actors, make sure NPU resources are properly allocated via `.options(resources={"NPU": 1})`. diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py index 17b2f88..e439478 100644 --- a/tests/test_yuanrong_storage_client_e2e.py +++ b/tests/test_yuanrong_storage_client_e2e.py @@ -219,3 +219,37 @@ def test_mixed_flow(self, config): assert_tensors_equal(o, r) else: assert o == r + + def test_get_with_invalid_backend_meta_raises_error(self, config): + """Verify that get raises ValueError when backend_meta contains an unrecognized tag.""" + client = self.client_cls(config) + keys = ["k1"] + shapes = [[]] + dtypes = [None] + invalid_meta = ["99"] + with pytest.raises(ValueError, match="Cannot retrieve stored data"): + client.get(keys, shapes, dtypes, invalid_meta) + + def test_get_with_empty_backend_meta_raises_error(self, config): + """Verify that get raises ValueError when backend_meta contains empty tags (not previously stored).""" + client = self.client_cls(config) + keys = ["k1"] + shapes = [[]] + dtypes = [None] + empty_meta = [""] + with pytest.raises(ValueError, match="no backend metadata"): + client.get(keys, shapes, dtypes, empty_meta) + + def test_put_with_no_strategies_raises_error(self, config): + """Verify that put raises ValueError when no strategy supports the value type.""" + client = self.client_cls(config) + client._strategies = [] + with pytest.raises(ValueError, match=f"No storage backend can handle {self.client_cls.ROUTE_ITEM_AS_VALUE}"): + client.put(["k1"], [1]) + + def test_clear_with_empty_backend_meta_silent(self, config): + """Verify that clear silently skips keys with empty backend_meta (not previously stored).""" + client = self.client_cls(config) + empty_meta = [""] + # No exception, no warning — only debug log + client.clear(["k1"], empty_meta) diff --git a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py index bfafe95..42a7bfa 100644 --- a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py +++ b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py @@ -121,8 +121,8 @@ def start_datasystem_worker( env = os.environ.copy() if device_ids: # Ensure direct copy for specified devices - env["DS_D2H_MEMCPY_POLICY"] = "direct" - env["DS_H2D_MEMCPY_POLICY"] = "direct" + env.setdefault("DS_D2H_MEMCPY_POLICY", "direct") + env.setdefault("DS_H2D_MEMCPY_POLICY", "direct") env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids logger.info( diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 8e7c7cc..3512805 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -291,6 +291,11 @@ def mget_zero_copy(self, keys: list[str]) -> list[Any]: """ buffers = self._ds_client.get_buffers(keys) valid_indexes = [i for i, buf in enumerate(buffers) if buf is not None] + if valid_indexes and len(valid_indexes) < len(keys): + logger.warning( + f"{len(keys) - len(valid_indexes)} requested keys were not found in openYuanrong-datasystem storage. " + f"Returned results will contain None for these keys." + ) valid_bufs = [buffers[i] for i in valid_indexes] decoded_objs = batch_decode_from(valid_bufs) results = [None] * len(keys) @@ -310,6 +315,9 @@ class YuanrongStorageClient(StorageKVClient): - General objects (CPU tensors, str, bool, list, etc.) via GeneralKVClientAdapter with serialization. """ + ROUTE_ITEM_AS_VALUE = "value" + ROUTE_ITEM_AS_BACKEND_META = "backend_meta" + def __init__(self, config: dict[str, Any]): if not YUANRONG_DATASYSTEM_IMPORTED: raise ImportError("YuanRong DataSystem not installed.") @@ -331,7 +339,7 @@ def __init__(self, config: dict[str, Any]): self._strategies.append(strategy) if not self._strategies: - raise RuntimeError("No storage strategy available for YuanrongStorageClient") + raise RuntimeError("No storage backend available for YuanrongStorageClient") def put(self, keys: list[str], values: list[Any]) -> list[str]: """Stores multiple key-value pairs to remote storage. @@ -352,7 +360,7 @@ def put(self, keys: list[str], values: list[Any]) -> list[str]: raise ValueError("Number of keys must match number of values") routed_indexes = self._route_to_strategies( - values, lambda strategy_, item_: strategy_.supports_put(item_), item_label="value" + values, lambda strategy_, item_: strategy_.supports_put(item_), item_label=self.ROUTE_ITEM_AS_VALUE ) # Define the 'put_task': Slicing the input list and calling the backend strategy. @@ -394,9 +402,17 @@ def get( if not (len(keys) == len(shapes) == len(dtypes) == len(custom_backend_meta)): raise ValueError("Lengths of keys, shapes, dtypes, custom_backend_meta must match") + if any(not tag for tag in custom_backend_meta): + raise ValueError( + "Some keys have no backend metadata (empty string), indicating they " + "were not previously stored. Ensure all keys have been put before calling get." + ) + strategy_tags = custom_backend_meta routed_indexes = self._route_to_strategies( - strategy_tags, lambda strategy_, item_: strategy_.supports_get(item_), item_label="backend_meta" + strategy_tags, + lambda strategy_, item_: strategy_.supports_get(item_), + item_label=self.ROUTE_ITEM_AS_BACKEND_META, ) # Define the 'get_task': handles slicing of keys, shapes, and dtypes simultaneously. @@ -431,7 +447,7 @@ def clear(self, keys: list[str], custom_backend_meta: list[str] | None = None) - strategy_tags, lambda strategy_, item_: strategy_.supports_clear(item_), ignore_unmatched=True, - item_label="backend_meta", + item_label=self.ROUTE_ITEM_AS_BACKEND_META, ) def clear_task(strategy, indexes): @@ -446,7 +462,7 @@ def _route_to_strategies( selector: Callable[[StorageStrategy, Any], bool], *, ignore_unmatched: bool = False, - item_label: str = "value", + item_label: str, ) -> dict[StorageStrategy, list[int]]: """Groups item indices by the first strategy that supports them. @@ -462,13 +478,15 @@ def _route_to_strategies( ignore_unmatched: If True, items that don't match any strategy will be ignored (not included in output). If False, a ValueError will be raised for any unmatched item. item_label: Description of what `items` represents, used in error messages. - "value" for put (user-provided data), "backend_meta" for get/clear (backend metadata). + Use ROUTE_ITEM_AS_VALUE for put (user-provided data), + or ROUTE_ITEM_AS_BACKEND_META for get/clear (backend metadata). Returns: A dictionary mapping each active strategy to a list of indexes in `items` that it should handle. Every index appears exactly once. """ unmatched_count = 0 + warning_count = 0 routed_indexes: dict[StorageStrategy, list[int]] = {s: [] for s in self._strategies} for i, item in enumerate(items): for strategy in self._strategies: @@ -477,9 +495,11 @@ def _route_to_strategies( break else: if ignore_unmatched: + if item: # non-empty item → real tag, backend likely unavailable + warning_count += 1 unmatched_count += 1 else: - if item_label == "backend_meta": + if item_label == self.ROUTE_ITEM_AS_BACKEND_META: raise ValueError( "Cannot retrieve stored data because the backend that originally " "stored it is unavailable in the current process or node. Please " @@ -487,9 +507,19 @@ def _route_to_strategies( "consistent across all processes and nodes." ) else: - raise ValueError(f"No strategy can handle {item_label} of type {type(item).__name__}.") - if unmatched_count > 0: - logger.warning(f"{unmatched_count} items were not matched to any strategy and will be ignored.") + raise ValueError(f"No storage backend can handle {item_label} of type {type(item).__name__}.") + if warning_count > 0: + logger.warning( + f"{warning_count} stored items could not be processed because the backend " + f"that originally handled them may be unavailable in the current process or " + f"node. Please check that the configuration and NPU resource availability " + f"are consistent across all processes and nodes." + ) + if unmatched_count > warning_count: + logger.debug( + f"{unmatched_count - warning_count} items with empty {item_label} " + f"will be silently skipped (likely not previously stored)." + ) return routed_indexes