diff --git a/tutorial/basic.ipynb b/tutorial/basic.ipynb new file mode 100644 index 00000000..aaa5d448 --- /dev/null +++ b/tutorial/basic.ipynb @@ -0,0 +1,1006 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# TransferQueue Tutorial — Basic Usage\n", + "\n", + "This notebook walks through the core **Key-Value (KV) interface** of\n", + "[TransferQueue](https://github.com/Ascend/TransferQueue), an asynchronous\n", + "streaming data management module for efficient post-training workflows.\n", + "\n", + "**What you will learn:**\n", + "\n", + "1. Initialise TransferQueue (with Ray)\n", + "2. Store a single sample — `kv_put`\n", + "3. Store a batch of samples — `kv_batch_put`\n", + "4. Retrieve data — `kv_batch_get`\n", + "5. List stored keys & tags — `kv_list`\n", + "6. Partial-key and partial-field retrieval\n", + "7. Updating fields incrementally\n", + "8. Working with nested (variable-length) tensors\n", + "9. Storing variable-size image data\n", + "10. Storing non-tensor data (`NonTensorData` / `NonTensorStack`)\n", + "11. Multiple partitions\n", + "12. Clean up — `kv_clear` / `close`\n", + "\n", + "> **Prerequisites:** `pip install TransferQueue` (or install from source). \n", + "> Ray will be started automatically in this notebook." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Initialization\n", + "\n", + "TransferQueue runs on top of [Ray](https://www.ray.io/). \n", + "We start Ray, then call `tq.init()` with a minimal configuration that uses the\n", + "built-in **SimpleStorage** backend." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-25 15:57:42,882\tINFO worker.py:2014 -- Started a local Ray instance. View the dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265 \u001b[39m\u001b[22m\n", + "/opt/miniconda3/envs/verl/lib/python3.11/site-packages/ray/_private/worker.py:2062: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[33m(raylet)\u001b[0m It looks like you're creating a detached actor in an anonymous namespace. In order to access this actor in the future, you will need to explicitly connect to this namespace with ray.init(namespace=\"d7384875-7cae-4308-b16c-c92541dc9f07\", ...)\n", + "TransferQueue is ready!\n" + ] + } + ], + "source": [ + "import ray\n", + "import torch\n", + "from omegaconf import OmegaConf\n", + "from tensordict import TensorDict\n", + "from tensordict.tensorclass import NonTensorStack\n", + "\n", + "import transfer_queue as tq\n", + "\n", + "ray.init(ignore_reinit_error=True)\n", + "\n", + "config = OmegaConf.create(\n", + " {\n", + " \"controller\": {\"polling_mode\": True},\n", + " \"backend\": {\n", + " \"storage_backend\": \"SimpleStorage\",\n", + " \"SimpleStorage\": {\n", + " \"total_storage_size\": 200,\n", + " \"num_data_storage_units\": 2,\n", + " },\n", + " },\n", + " }\n", + ")\n", + "\n", + "tq.init(config)\n", + "print(\"TransferQueue is ready!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Store a Single Sample — `kv_put`\n", + "\n", + "`kv_put` stores **one** key-value pair. \n", + "- `key` — a unique string identifier for the sample \n", + "- `partition_id` — a logical namespace (like a table name) \n", + "- `fields` — a `dict` of tensors **or** a `TensorDict` \n", + "- `tag` — optional metadata dict attached to the key" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Stored sample_0\n" + ] + } + ], + "source": [ + "tq.kv_put(\n", + " key=\"sample_0\",\n", + " partition_id=\"train\",\n", + " fields={\"input_ids\": torch.tensor([1, 2, 3, 4])},\n", + " tag={\"source\": \"wikipedia\", \"score\": 0.95},\n", + ")\n", + "print(\"Stored sample_0\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can also pass a pre-built `TensorDict` directly (the batch dimension\n", + "must be 1 for `kv_put`):" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Stored sample_1\n" + ] + } + ], + "source": [ + "fields_td = TensorDict(\n", + " {\n", + " \"input_ids\": torch.tensor([[5, 6, 7, 8]]),\n", + " \"attention_mask\": torch.ones(1, 4, dtype=torch.long),\n", + " },\n", + " batch_size=1,\n", + ")\n", + "\n", + "tq.kv_put(\n", + " key=\"sample_1\",\n", + " partition_id=\"train\",\n", + " fields=fields_td,\n", + " tag={\"source\": \"books\", \"score\": 0.88},\n", + ")\n", + "print(\"Stored sample_1\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Store a Batch of Samples — `kv_batch_put`\n", + "\n", + "When you have multiple samples, `kv_batch_put` is more efficient than\n", + "calling `kv_put` in a loop. The `fields` TensorDict must have\n", + "`batch_size == len(keys)`." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Stored 3 samples in one call\n" + ] + } + ], + "source": [ + "keys = [\"batch_0\", \"batch_1\", \"batch_2\"]\n", + "\n", + "fields = TensorDict(\n", + " {\n", + " \"input_ids\": torch.tensor([[10, 20], [30, 40], [50, 60]]),\n", + " \"attention_mask\": torch.ones(3, 2, dtype=torch.long),\n", + " },\n", + " batch_size=3,\n", + ")\n", + "\n", + "tags = [\n", + " {\"split\": \"train\", \"idx\": 0},\n", + " {\"split\": \"train\", \"idx\": 1},\n", + " {\"split\": \"train\", \"idx\": 2},\n", + "]\n", + "\n", + "tq.kv_batch_put(keys=keys, partition_id=\"train\", fields=fields, tags=tags)\n", + "print(f\"Stored {len(keys)} samples in one call\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Retrieve Data — `kv_batch_get`\n", + "\n", + "Retrieve samples by key(s). The result is always a `TensorDict`." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "sample_0 → TensorDict(\n", + " fields={\n", + " input_ids: Tensor(shape=torch.Size([1, 4]), device=cpu, dtype=torch.int64, is_shared=False)},\n", + " batch_size=torch.Size([1]),\n", + " device=None,\n", + " is_shared=False)\n", + "input_ids: tensor([[1, 2, 3, 4]])\n" + ] + } + ], + "source": [ + "# Retrieve a single key (pass a string)\n", + "result = tq.kv_batch_get(keys=\"sample_0\", partition_id=\"train\")\n", + "print(\"sample_0 →\", result)\n", + "print(\"input_ids:\", result[\"input_ids\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "batch result → TensorDict(\n", + " fields={\n", + " attention_mask: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.int64, is_shared=False),\n", + " input_ids: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.int64, is_shared=False)},\n", + " batch_size=torch.Size([3]),\n", + " device=None,\n", + " is_shared=False)\n", + "input_ids:\n", + " tensor([[10, 20],\n", + " [30, 40],\n", + " [50, 60]])\n", + "attention_mask:\n", + " tensor([[1, 1],\n", + " [1, 1],\n", + " [1, 1]])\n" + ] + } + ], + "source": [ + "# Retrieve multiple keys at once\n", + "result = tq.kv_batch_get(keys=keys, partition_id=\"train\")\n", + "print(\"batch result →\", result)\n", + "print(\"input_ids:\\n\", result[\"input_ids\"])\n", + "print(\"attention_mask:\\n\", result[\"attention_mask\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. List Keys & Tags — `kv_list`\n", + "\n", + "`kv_list` returns a nested dict:\n", + "```\n", + "{ partition_id: { key: tag_dict, ... }, ... }\n", + "```\n", + "Pass `partition_id` to filter, or omit it to see everything." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Partition: train\n", + " sample_0: {'source': 'wikipedia', 'score': 0.95}\n", + " sample_1: {'source': 'books', 'score': 0.88}\n", + " batch_0: {'split': 'train', 'idx': 0}\n", + " batch_1: {'split': 'train', 'idx': 1}\n", + " batch_2: {'split': 'train', 'idx': 2}\n" + ] + } + ], + "source": [ + "info = tq.kv_list(partition_id=\"train\")\n", + "\n", + "for partition, key_tags in info.items():\n", + " print(f\"\\nPartition: {partition}\")\n", + " for key, tag in key_tags.items():\n", + " print(f\" {key}: {tag}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Partial-Key and Partial-Field Retrieval\n", + "\n", + "You don't have to retrieve *all* keys or *all* fields at once." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 6a. Partial Keys\n", + "\n", + "Just pass a subset of the keys you stored." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Partial-key input_ids:\n", + " tensor([[10, 20],\n", + " [50, 60]])\n" + ] + } + ], + "source": [ + "partial = tq.kv_batch_get(keys=[\"batch_0\", \"batch_2\"], partition_id=\"train\")\n", + "print(\"Partial-key input_ids:\\n\", partial[\"input_ids\"])\n", + "assert partial[\"input_ids\"].shape[0] == 2 # only 2 rows" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 6b. Partial Fields\n", + "\n", + "Use the `fields` argument to select specific columns." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Fields returned: ['input_ids']\n", + "Fields returned: ['attention_mask', 'input_ids']\n" + ] + } + ], + "source": [ + "# Retrieve only input_ids (single field)\n", + "result = tq.kv_batch_get(keys=\"sample_1\", partition_id=\"train\", fields=\"input_ids\")\n", + "print(\"Fields returned:\", list(result.keys()))\n", + "assert \"input_ids\" in result.keys()\n", + "assert \"attention_mask\" not in result.keys()\n", + "\n", + "# Retrieve a specific set of fields\n", + "result = tq.kv_batch_get(\n", + " keys=\"sample_1\",\n", + " partition_id=\"train\",\n", + " fields=[\"input_ids\", \"attention_mask\"],\n", + ")\n", + "print(\"Fields returned:\", list(result.keys()))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. Updating Fields Incrementally\n", + "\n", + "TransferQueue tracks each field (column) independently per key (row). \n", + "You can **add new fields** to existing keys with another `kv_put` /\n", + "`kv_batch_put` call — the earlier fields are preserved." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "All fields now: ['attention_mask', 'input_ids', 'response']\n", + "response:\n", + " tensor([[100, 200],\n", + " [300, 400],\n", + " [500, 600]])\n" + ] + } + ], + "source": [ + "# Add a \"response\" field to the batch keys\n", + "response_fields = TensorDict(\n", + " {\"response\": torch.tensor([[100, 200], [300, 400], [500, 600]])},\n", + " batch_size=3,\n", + ")\n", + "\n", + "tq.kv_batch_put(keys=keys, partition_id=\"train\", fields=response_fields)\n", + "\n", + "result = tq.kv_batch_get(keys=keys, partition_id=\"train\")\n", + "print(\"All fields now:\", list(result.keys()))\n", + "print(\"response:\\n\", result[\"response\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. Working with Nested (Variable-Length) Tensors\n", + "\n", + "In many NLP and RL workloads each sample has a **different sequence length**\n", + "(e.g. generated responses). PyTorch represents these as\n", + "[nested tensors](https://pytorch.org/docs/stable/nested.html) with the\n", + "**jagged layout** (`layout=torch.jagged`), and TransferQueue handles them\n", + "natively.\n", + "\n", + "> **Note:** Because individual samples have different shapes, you must use\n", + "> `kv_batch_put` (not `kv_put`) to store nested tensors." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Stored 3 samples with variable-length responses\n" + ] + } + ], + "source": [ + "nested_keys = [\"nested_0\", \"nested_1\", \"nested_2\"]\n", + "\n", + "# Each sample has a different sequence length\n", + "nested_responses = torch.nested.as_nested_tensor(\n", + " [\n", + " torch.tensor([10, 11, 12]), # length 3\n", + " torch.tensor([20]), # length 1\n", + " torch.tensor([30, 31]), # length 2\n", + " ],\n", + " layout=torch.jagged,\n", + ")\n", + "\n", + "nested_fields = TensorDict(\n", + " {\n", + " \"input_ids\": torch.tensor([[1, 2], [3, 4], [5, 6]]),\n", + " \"response\": nested_responses,\n", + " },\n", + " batch_size=3,\n", + ")\n", + "\n", + "tq.kv_batch_put(\n", + " keys=nested_keys,\n", + " partition_id=\"train\",\n", + " fields=nested_fields,\n", + " tags=[{\"len\": 3}, {\"len\": 1}, {\"len\": 2}],\n", + ")\n", + "print(\"Stored 3 samples with variable-length responses\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "input_ids:\n", + " tensor([[1, 2],\n", + " [3, 4],\n", + " [5, 6]])\n", + "\n", + "response (nested tensor):\n", + " sample 0: tensor([10, 11, 12]) (length 3)\n", + " sample 1: tensor([20]) (length 1)\n", + " sample 2: tensor([30, 31]) (length 2)\n" + ] + } + ], + "source": [ + "# Retrieve all nested samples\n", + "result = tq.kv_batch_get(keys=nested_keys, partition_id=\"train\")\n", + "print(\"input_ids:\\n\", result[\"input_ids\"])\n", + "print(\"\\nresponse (nested tensor):\")\n", + "for i, sample in enumerate(result[\"response\"]):\n", + " print(f\" sample {i}: {sample} (length {sample.shape[0]})\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Partial-key retrieval works the same way with nested tensors — only the\n", + "requested samples are returned:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Partial-key input_ids:\n", + " tensor([[1, 2],\n", + " [5, 6]])\n", + "\n", + "Partial-key responses:\n", + " sample 0: tensor([10, 11, 12])\n", + " sample 1: tensor([30, 31])\n", + "\n", + "Assertions passed!\n" + ] + } + ], + "source": [ + "# Retrieve only the first and last sample\n", + "partial = tq.kv_batch_get(keys=[\"nested_0\", \"nested_2\"], partition_id=\"train\")\n", + "print(\"Partial-key input_ids:\\n\", partial[\"input_ids\"])\n", + "print(\"\\nPartial-key responses:\")\n", + "for i, sample in enumerate(partial[\"response\"]):\n", + " print(f\" sample {i}: {sample}\")\n", + "\n", + "# Verify correctness\n", + "assert torch.equal(partial[\"response\"][0], torch.tensor([10, 11, 12]))\n", + "assert torch.equal(partial[\"response\"][1], torch.tensor([30, 31]))\n", + "print(\"\\nAssertions passed!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Higher-dimensional nested tensors work too. Here each sample is a 3D\n", + "tensor with a variable first dimension (e.g. a different number of\n", + "attention heads or generated candidates):" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "sample 0 hidden_states shape: torch.Size([2, 3, 4])\n", + "sample 1 hidden_states shape: torch.Size([5, 3, 4])\n", + "sample 2 hidden_states shape: torch.Size([1, 3, 4])\n" + ] + } + ], + "source": [ + "nested_3d_keys = [\"nd3d_0\", \"nd3d_1\", \"nd3d_2\"]\n", + "\n", + "nested_3d = torch.nested.as_nested_tensor(\n", + " [\n", + " torch.randn(2, 3, 4), # 2 heads\n", + " torch.randn(5, 3, 4), # 5 heads\n", + " torch.randn(1, 3, 4), # 1 head\n", + " ],\n", + " layout=torch.jagged,\n", + ")\n", + "\n", + "fields_3d = TensorDict(\n", + " {\n", + " \"input_ids\": torch.tensor([[1, 2], [3, 4], [5, 6]]),\n", + " \"hidden_states\": nested_3d,\n", + " },\n", + " batch_size=3,\n", + ")\n", + "\n", + "tq.kv_batch_put(keys=nested_3d_keys, partition_id=\"train\", fields=fields_3d)\n", + "\n", + "result_3d = tq.kv_batch_get(keys=nested_3d_keys, partition_id=\"train\")\n", + "for i, sample in enumerate(result_3d[\"hidden_states\"]):\n", + " print(f\"sample {i} hidden_states shape: {sample.shape}\")\n", + "\n", + "# Clean up nested-tensor keys\n", + "tq.kv_clear(keys=nested_keys, partition_id=\"train\")\n", + "tq.kv_clear(keys=nested_3d_keys, partition_id=\"train\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 9. Storing Variable-Size Image Data\n", + "\n", + "A common multimodal scenario: each sample in a batch contains a\n", + "**different number of images**, and each image has a **different\n", + "resolution**. We can model this with a list of nested tensors — one\n", + "nested tensor per sample — wrapped inside a `TensorDict`.\n", + "\n", + "Since the data is doubly ragged (variable count *and* variable size),\n", + "we store each image as a flattened 1-D tensor and pack all images per\n", + "sample into a single jagged nested tensor. This way every sample is\n", + "one element of the batch, yet images retain their individual sizes." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Stored 3 samples with variable numbers of variable-size images\n" + ] + } + ], + "source": [ + "image_keys = [\"img_sample_0\", \"img_sample_1\", \"img_sample_2\"]\n", + "\n", + "# Sample 0: 2 images — 3×32×32 (RGB, 32×32) and 3×64×64\n", + "sample_0_images = [torch.randn(3, 32, 32), torch.randn(3, 64, 64)]\n", + "\n", + "# Sample 1: 1 image — 3×48×48\n", + "sample_1_images = [torch.randn(3, 48, 48)]\n", + "\n", + "# Sample 2: 3 images — 3×16×16, 3×24×24, 3×32×64\n", + "sample_2_images = [torch.randn(3, 16, 16), torch.randn(3, 24, 24), torch.randn(3, 32, 64)]\n", + "\n", + "\n", + "# Flatten each image to 1-D so they can live in a single jagged nested tensor per sample\n", + "def flatten_images(images):\n", + " return torch.cat([img.flatten() for img in images])\n", + "\n", + "\n", + "pixel_data = torch.nested.as_nested_tensor(\n", + " [\n", + " flatten_images(sample_0_images), # 3*32*32 + 3*64*64 = 15360\n", + " flatten_images(sample_1_images), # 3*48*48 = 6912\n", + " flatten_images(sample_2_images),\n", + " ], # 3*16*16 + 3*24*24 + 3*32*64 = 8736\n", + " layout=torch.jagged,\n", + ")\n", + "\n", + "# Store the number of pixels per image so we can reconstruct later\n", + "image_shapes = torch.nested.as_nested_tensor(\n", + " [\n", + " torch.tensor([[3, 32, 32], [3, 64, 64]]), # 2 images\n", + " torch.tensor([[3, 48, 48]]), # 1 image\n", + " torch.tensor([[3, 16, 16], [3, 24, 24], [3, 32, 64]]), # 3 images\n", + " ],\n", + " layout=torch.jagged,\n", + ")\n", + "\n", + "fields_img = TensorDict(\n", + " {\n", + " \"prompt\": torch.tensor([[101, 102], [201, 202], [301, 302]]),\n", + " \"pixel_data\": pixel_data,\n", + " \"image_shapes\": image_shapes,\n", + " },\n", + " batch_size=3,\n", + ")\n", + "\n", + "tags_img = [\n", + " {\"num_images\": 2},\n", + " {\"num_images\": 1},\n", + " {\"num_images\": 3},\n", + "]\n", + "\n", + "tq.kv_batch_put(keys=image_keys, partition_id=\"train\", fields=fields_img, tags=tags_img)\n", + "print(\"Stored 3 samples with variable numbers of variable-size images\")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Sample 0: 2 image(s) → [(3, 32, 32), (3, 64, 64)]\n", + "Sample 1: 1 image(s) → [(3, 48, 48)]\n", + "Sample 2: 3 image(s) → [(3, 16, 16), (3, 24, 24), (3, 32, 64)]\n" + ] + } + ], + "source": [ + "# Retrieve and reconstruct the images\n", + "result_img = tq.kv_batch_get(keys=image_keys, partition_id=\"train\")\n", + "\n", + "for i in range(result_img.batch_size[0]):\n", + " shapes = result_img[\"image_shapes\"][i] # (num_images, 3) tensor\n", + " pixels = result_img[\"pixel_data\"][i] # flat 1-D tensor of all pixels\n", + " num_images = shapes.shape[0]\n", + "\n", + " offset = 0\n", + " reconstructed = []\n", + " for j in range(num_images):\n", + " c, h, w = shapes[j].tolist()\n", + " numel = c * h * w\n", + " img = pixels[offset : offset + numel].reshape(c, h, w)\n", + " reconstructed.append(img)\n", + " offset += numel\n", + "\n", + " print(f\"Sample {i}: {num_images} image(s) → {[tuple(img.shape) for img in reconstructed]}\")\n", + "\n", + "# Clean up\n", + "tq.kv_clear(keys=image_keys, partition_id=\"train\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 10. Storing Non-Tensor Data — `NonTensorData` / `NonTensorStack`\n", + "\n", + "Not every field is a numeric tensor. Prompts, file paths, JSON metadata,\n", + "or arbitrary Python objects can be stored as **non-tensor data** using\n", + "tensordict's `NonTensorData` and `NonTensorStack`.\n", + "\n", + "- `NonTensorData` wraps a **single** Python object (string, dict, list, …).\n", + "- `NonTensorStack` wraps a **batch** of Python objects — one per sample.\n", + "\n", + "TransferQueue serialises them transparently alongside regular tensors." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Stored 3 samples with non-tensor fields\n" + ] + } + ], + "source": [ + "nt_keys = [\"nt_sample_0\", \"nt_sample_1\", \"nt_sample_2\"]\n", + "\n", + "# Build a TensorDict that mixes tensors with non-tensor data\n", + "nt_fields = TensorDict(\n", + " {\n", + " \"input_ids\": torch.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9]]),\n", + " # NonTensorStack: one string per sample in the batch\n", + " \"prompt_text\": NonTensorStack(\n", + " \"Summarise the following article.\",\n", + " \"Translate to French:\",\n", + " \"Write a poem about rain.\",\n", + " ),\n", + " # You can also store richer Python objects (dicts, lists, …)\n", + " \"metadata\": NonTensorStack(\n", + " {\"source\": \"wiki\", \"lang\": \"en\"},\n", + " {\"source\": \"books\", \"lang\": \"fr\"},\n", + " {\"source\": \"user\", \"lang\": \"en\"},\n", + " ),\n", + " },\n", + " batch_size=3,\n", + ")\n", + "\n", + "tq.kv_batch_put(keys=nt_keys, partition_id=\"train\", fields=nt_fields)\n", + "print(\"Stored 3 samples with non-tensor fields\")" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "input_ids:\n", + " tensor([[1, 2, 3],\n", + " [4, 5, 6],\n", + " [7, 8, 9]])\n", + "\n", + "prompt_text (NonTensorStack):\n", + " [0] 'Summarise the following article.'\n", + " [1] 'Translate to French:'\n", + " [2] 'Write a poem about rain.'\n", + "\n", + "metadata (NonTensorStack of dicts):\n", + " [0] {'source': 'wiki', 'lang': 'en'}\n", + " [1] {'source': 'books', 'lang': 'fr'}\n", + " [2] {'source': 'user', 'lang': 'en'}\n", + "\n", + "Single-key non-tensor field: label=[['positive']]\n" + ] + } + ], + "source": [ + "# Retrieve and inspect non-tensor fields\n", + "result_nt = tq.kv_batch_get(keys=nt_keys, partition_id=\"train\")\n", + "\n", + "print(\"input_ids:\\n\", result_nt[\"input_ids\"])\n", + "\n", + "print(\"\\nprompt_text (NonTensorStack):\")\n", + "for i, text in enumerate(list(result_nt[\"prompt_text\"])):\n", + " print(f\" [{i}] {text!r}\")\n", + "\n", + "print(\"\\nmetadata (NonTensorStack of dicts):\")\n", + "for i, meta in enumerate(list(result_nt[\"metadata\"])):\n", + " print(f\" [{i}] {meta}\")\n", + "\n", + "# You can also add a NonTensorData field to a single key via kv_put\n", + "tq.kv_put(\n", + " key=\"single_nt\",\n", + " partition_id=\"train\",\n", + " fields={\"label\": NonTensorStack(\"positive\"), \"score\": torch.tensor([0.99])},\n", + ")\n", + "single = tq.kv_batch_get(keys=\"single_nt\", partition_id=\"train\")\n", + "print(f\"\\nSingle-key non-tensor field: label={list(single['label'])}\")\n", + "\n", + "# Clean up\n", + "tq.kv_clear(keys=nt_keys, partition_id=\"train\")\n", + "tq.kv_clear(keys=\"single_nt\", partition_id=\"train\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 11. Multiple Partitions\n", + "\n", + "Partitions provide logical isolation — the same key name can exist in\n", + "different partitions without conflict." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "All partitions: ['train', 'validation']\n" + ] + } + ], + "source": [ + "tq.kv_put(\n", + " key=\"val_sample_0\",\n", + " partition_id=\"validation\",\n", + " fields={\"input_ids\": torch.tensor([99, 98, 97])},\n", + " tag={\"split\": \"val\"},\n", + ")\n", + "\n", + "all_info = tq.kv_list() # no partition_id → list everything\n", + "print(\"All partitions:\", list(all_info.keys()))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 12. Clean Up — `kv_clear` and `close`\n", + "\n", + "Remove specific keys with `kv_clear`, then shut down the system with `tq.close()`." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "All keys cleared.\n", + "Remaining keys across all partitions: 0\n" + ] + } + ], + "source": [ + "# Clear individual keys\n", + "tq.kv_clear(keys=\"sample_0\", partition_id=\"train\")\n", + "tq.kv_clear(keys=\"sample_1\", partition_id=\"train\")\n", + "tq.kv_clear(keys=keys, partition_id=\"train\")\n", + "tq.kv_clear(keys=\"val_sample_0\", partition_id=\"validation\")\n", + "print(\"All keys cleared.\")\n", + "\n", + "remaining = tq.kv_list()\n", + "total_keys = sum(len(v) for v in remaining.values())\n", + "print(f\"Remaining keys across all partitions: {total_keys}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "TransferQueue and Ray shut down.\n" + ] + } + ], + "source": [ + "tq.close()\n", + "ray.shutdown()\n", + "print(\"TransferQueue and Ray shut down.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Summary\n", + "\n", + "| Operation | Function | Notes |\n", + "|---|---|---|\n", + "| Init | `tq.init(config)` | Call once; subsequent processes auto-connect |\n", + "| Put single | `tq.kv_put(key, partition_id, fields, tag)` | `fields` can be a plain dict |\n", + "| Put batch | `tq.kv_batch_put(keys, partition_id, fields, tags)` | `fields` must be a `TensorDict` |\n", + "| Get | `tq.kv_batch_get(keys, partition_id, fields=None)` | Returns a `TensorDict` |\n", + "| List | `tq.kv_list(partition_id=None)` | Returns `{partition: {key: tag}}` |\n", + "| Clear | `tq.kv_clear(keys, partition_id)` | Removes keys + data |\n", + "| Close | `tq.close()` | Tears down controller & storage |\n", + "\n", + "For **async** variants, use `async_kv_put`, `async_kv_batch_put`,\n", + "`async_kv_batch_get`, `async_kv_list`, and `async_kv_clear`.\n", + "\n", + "For low-level, metadata-based access, see `tq.get_client()` and the\n", + "[official tutorials](https://github.com/Ascend/TransferQueue/tree/main/tutorial)." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.14" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}