Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docs/storage_backends/openyuanrong_datasystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ ps aux | grep datasystem_worker
dscli stop --worker_address <IP>:<PORT>

# Force cleanup (use with caution)
pkill -f datasystem_worker
pkill -9 -f datasystem_worker
```

### Multi-Process Initialization
Expand Down Expand Up @@ -463,6 +463,21 @@ RuntimeError: code: [Out of memory], msg: [Shared memory no space in arena: ...]

Solution: Increase `--shared_memory_size_mb` in `worker_args`, or reduce the data volume being cached.

### "Cannot retrieve stored data" Error on get/clear

If you encounter an error like:
```
ValueError: Cannot retrieve stored data because the backend that originally stored it is unavailable in the current process or node. Please check that the configuration and NPU resource availability are consistent across all processes and nodes.
```

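This occurs when `kv_batch_get` cannot find the storage backend that originally handled the data. During `clear`, the same condition is reported as a warning and the affected keys are skipped instead of raising an error. The most common cause is a mismatch between the process that originally `put` the data and the process performing `get`, such as: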

- Different `enable_yr_npu_transport` settings across processes or nodes (e.g., `true` vs `false`).
- NPU hardware or CANN/torch-npu is unavailable on the `get` process or node, even though the configuration is identical.
- When running inside Ray actors, the actor may not be assigned NPU resources (e.g., missing `"NPU": 1` in `.options(resources=...)`), preventing the NPU transport backend from initializing.

Solution: Ensure that all processes and nodes use the same TransferQueue configuration and have consistent NPU resource availability. When using Ray actors, make sure NPU resources are properly allocated via `.options(resources={"NPU": 1})`.


## Datasystem Logs

Expand Down
34 changes: 34 additions & 0 deletions tests/test_yuanrong_storage_client_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,37 @@ def test_mixed_flow(self, config):
assert_tensors_equal(o, r)
else:
assert o == r

def test_get_with_invalid_backend_meta_raises_error(self, config):
    """Check that ``get`` rejects a backend tag that no storage backend recognizes.

    The call is expected to fail with a ``ValueError`` whose message starts
    with "Cannot retrieve stored data".
    """
    storage = self.client_cls(config)
    # "99" stands in for a tag that no configured backend is expected to claim.
    unrecognized_tags = ["99"]
    with pytest.raises(ValueError, match="Cannot retrieve stored data"):
        storage.get(["k1"], [[]], [None], unrecognized_tags)

def test_get_with_empty_backend_meta_raises_error(self, config):
    """Check that ``get`` rejects keys whose backend tag is an empty string.

    An empty tag means the key was never stored, so the call is expected to
    fail with a ``ValueError`` mentioning "no backend metadata".
    """
    storage = self.client_cls(config)
    blank_tags = [""]
    with pytest.raises(ValueError, match="no backend metadata"):
        storage.get(["k1"], [[]], [None], blank_tags)

def test_put_with_no_strategies_raises_error(self, config):
    """Check that ``put`` raises ``ValueError`` when no backend can take the value."""
    storage = self.client_cls(config)
    # Drop every registered strategy so that routing cannot match anything.
    storage._strategies = []
    expected_message = f"No storage backend can handle {self.client_cls.ROUTE_ITEM_AS_VALUE}"
    with pytest.raises(ValueError, match=expected_message):
        storage.put(["k1"], [1])

def test_clear_with_empty_backend_meta_silent(self, config):
    """Check that ``clear`` tolerates keys whose backend tag is empty (never stored)."""
    storage = self.client_cls(config)
    blank_tags = [""]
    # The call must simply return: empty tags are skipped without raising
    # (the skip is meant to be reported at debug level only, not as a warning).
    storage.clear(["k1"], blank_tags)
4 changes: 4 additions & 0 deletions transfer_queue/storage/bootstrap/yuanrong_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ def start_datasystem_worker(
if device_ids or enable_rdma or ucx_env_vars:
env = os.environ.copy()
if device_ids:
# Ensure direct copy for specified devices
env.setdefault("DS_D2H_MEMCPY_POLICY", "direct")
env.setdefault("DS_H2D_MEMCPY_POLICY", "direct")

env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids
logger.info(
f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})"
Expand Down
64 changes: 54 additions & 10 deletions transfer_queue/storage/clients/yuanrong_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ def mget_zero_copy(self, keys: list[str]) -> list[Any]:
"""
buffers = self._ds_client.get_buffers(keys)
valid_indexes = [i for i, buf in enumerate(buffers) if buf is not None]
if valid_indexes and len(valid_indexes) < len(keys):
logger.warning(
f"{len(keys) - len(valid_indexes)} requested keys were not found in openYuanrong-datasystem storage. "
f"Returned results will contain None for these keys."
)
valid_bufs = [buffers[i] for i in valid_indexes]
decoded_objs = batch_decode_from(valid_bufs)
results = [None] * len(keys)
Expand All @@ -310,6 +315,9 @@ class YuanrongStorageClient(StorageKVClient):
- General objects (CPU tensors, str, bool, list, etc.) via GeneralKVClientAdapter with serialization.
"""

ROUTE_ITEM_AS_VALUE = "value"
ROUTE_ITEM_AS_BACKEND_META = "backend_meta"

def __init__(self, config: dict[str, Any]):
if not YUANRONG_DATASYSTEM_IMPORTED:
raise ImportError("YuanRong DataSystem not installed.")
Expand All @@ -331,7 +339,7 @@ def __init__(self, config: dict[str, Any]):
self._strategies.append(strategy)

if not self._strategies:
raise RuntimeError("No storage strategy available for YuanrongStorageClient")
raise RuntimeError("No storage backend available for YuanrongStorageClient")

def put(self, keys: list[str], values: list[Any]) -> list[str]:
"""Stores multiple key-value pairs to remote storage.
Expand All @@ -351,7 +359,9 @@ def put(self, keys: list[str], values: list[Any]) -> list[str]:
if len(keys) != len(values):
raise ValueError("Number of keys must match number of values")

routed_indexes = self._route_to_strategies(values, lambda strategy_, item_: strategy_.supports_put(item_))
routed_indexes = self._route_to_strategies(
values, lambda strategy_, item_: strategy_.supports_put(item_), item_label=self.ROUTE_ITEM_AS_VALUE
)

# Define the 'put_task': Slicing the input list and calling the backend strategy.
# The closure captures local 'keys' and 'values' for zero-overhead parameter passing.
Expand Down Expand Up @@ -392,9 +402,17 @@ def get(
if not (len(keys) == len(shapes) == len(dtypes) == len(custom_backend_meta)):
raise ValueError("Lengths of keys, shapes, dtypes, custom_backend_meta must match")

if any(not tag for tag in custom_backend_meta):
raise ValueError(
"Some keys have no backend metadata (empty string), indicating they "
"were not previously stored. Ensure all keys have been put before calling get."
)

strategy_tags = custom_backend_meta
routed_indexes = self._route_to_strategies(
strategy_tags, lambda strategy_, item_: strategy_.supports_get(item_)
strategy_tags,
lambda strategy_, item_: strategy_.supports_get(item_),
item_label=self.ROUTE_ITEM_AS_BACKEND_META,
)

# Define the 'get_task': handles slicing of keys, shapes, and dtypes simultaneously.
Expand Down Expand Up @@ -426,7 +444,10 @@ def clear(self, keys: list[str], custom_backend_meta: list[str] | None = None) -

strategy_tags = custom_backend_meta
routed_indexes = self._route_to_strategies(
strategy_tags, lambda strategy_, item_: strategy_.supports_clear(item_), ignore_unmatched=True
strategy_tags,
lambda strategy_, item_: strategy_.supports_clear(item_),
ignore_unmatched=True,
item_label=self.ROUTE_ITEM_AS_BACKEND_META,
)
Comment on lines +447 to 451

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Distinguishes between two cases when `ignore_unmatched` is True: keys that were never put into the datasystem, and keys that were put into the datasystem by a different storage client.


def clear_task(strategy, indexes):
Expand All @@ -439,7 +460,9 @@ def _route_to_strategies(
self,
items: list[Any],
selector: Callable[[StorageStrategy, Any], bool],
*,
ignore_unmatched: bool = False,
item_label: str,
) -> dict[StorageStrategy, list[int]]:
"""Groups item indices by the first strategy that supports them.

Expand All @@ -454,12 +477,16 @@ def _route_to_strategies(
Signature: `(strategy: StorageStrategy, item: Any) -> bool`.
ignore_unmatched: If True, items that don't match any strategy will be ignored (not included in output).
If False, a ValueError will be raised for any unmatched item.
item_label: Description of what `items` represents, used in error messages.
Use ROUTE_ITEM_AS_VALUE for put (user-provided data),
or ROUTE_ITEM_AS_BACKEND_META for get/clear (backend metadata).

Returns:
A dictionary mapping each active strategy to a list of indexes in `items`
that it should handle. Every index appears exactly once.
"""
unmatched_count = 0
warning_count = 0
routed_indexes: dict[StorageStrategy, list[int]] = {s: [] for s in self._strategies}
for i, item in enumerate(items):
for strategy in self._strategies:
Expand All @@ -468,14 +495,31 @@ def _route_to_strategies(
break
else:
if ignore_unmatched:
if item: # non-empty item → real tag, backend likely unavailable
warning_count += 1
unmatched_count += 1
else:
raise ValueError(
f"No strategy supports item of type {type(item).__name__}: {item}. "
f"Available strategies: {[type(s).__name__ for s in self._strategies]}"
)
if unmatched_count > 0:
logger.warning(f"{unmatched_count} items were not matched to any strategy and will be ignored.")
if item_label == self.ROUTE_ITEM_AS_BACKEND_META:
raise ValueError(
"Cannot retrieve stored data because the backend that originally "
"stored it is unavailable in the current process or node. Please "
"check that the configuration and NPU resource availability are "
"consistent across all processes and nodes."
)
Comment on lines +503 to +508
else:
raise ValueError(f"No storage backend can handle {item_label} of type {type(item).__name__}.")
if warning_count > 0:
logger.warning(
f"{warning_count} stored items could not be processed because the backend "
f"that originally handled them may be unavailable in the current process or "
f"node. Please check that the configuration and NPU resource availability "
f"are consistent across all processes and nodes."
)
if unmatched_count > warning_count:
logger.debug(
f"{unmatched_count - warning_count} items with empty {item_label} "
f"will be silently skipped (likely not previously stored)."
)

return routed_indexes

Expand Down
Loading