Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions nemo_retriever/harness/HANDOFF.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,29 @@ Operator-oriented notes for `nemo_retriever` benchmark runs. Implementation deta

## Scope

- Standalone harness under `nemo_retriever` (not `tools/harness`).
- Invokes **`nemo_retriever.examples.graph_pipeline`** (`batch` / `inprocess` via `--run-mode`).
- LanceDB only; recall gating via `recall_required` in config.

## Key files
- Harness is standalone under `nemo_retriever` (not based on `tools/harness`).
- It wraps `nemo_retriever.examples.graph_pipeline`.
- Primary use case is benchmark orchestration for local/cluster runs without Docker orchestration.
- Vector DB is LanceDB only.
- Recall gating is supported and enforced by config (`recall_required`).

## Graph Refactor Integration (Apr 2026)

- Upstream main includes `53f3a8c5` (`Refactor step (#1778)`), which removed mode-specific runtime files:
- removed: `nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py`
- removed: `nemo_retriever/src/nemo_retriever/examples/inprocess_pipeline.py`
- removed: `nemo_retriever/src/nemo_retriever/ingest_modes/`
- added: `nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py`
- added: `nemo_retriever/src/nemo_retriever/graph_ingestor.py`
- added/active seam: `nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py`
- Any tuning or stability changes previously implemented in `batch_pipeline.py` or `ingest_modes/*` must now be ported to:
- CLI arg wiring: `examples/graph_pipeline.py`
- graph node override mapping: `graph/ingestor_runtime.py`
- embed operators/runtime: `text_embed/operators.py`, `text_embed/runtime.py`
- local model behavior: `model/local/llama_nemotron_embed_1b_v2_embedder.py`
- For embed OOM stabilization specifically, old `batch/inprocess` diffs should be treated as historical evidence only; the mergeable implementation surface is graph-only.

## Key Files

| Path | Role |
|------|------|
Expand Down Expand Up @@ -72,7 +90,8 @@ Prefer **`summary_metrics`** for dashboards (small set: pages, ingest timing, re
- Dataset paths under `/datasets/nv-ingest/...` may resolve to `/raid/$USER/...` when the former is missing.
- Optional **store** options (`store_images_uri`, `store_text`, `strip_base64`) map to graph CLI flags; relative `store_images_uri` is resolved under the run artifact directory. See `config.py` for env overrides (`HARNESS_STORE_*`).

## Backlog (maintainers / agents)
3. **TTY-backed subprocess retained**
- Harness runs graph pipeline through a PTY so Ray progress remains rich/pretty by default.

Ideas not committed to code; pick up or trim as priorities change.

Expand Down
2 changes: 2 additions & 0 deletions nemo_retriever/harness/test_configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ presets:
ocr_batch_size: 16
embed_workers: 3
embed_batch_size: 256
embed_inference_batch_size: 32
page_elements_cpus_per_actor: 1.0
ocr_cpus_per_actor: 1.0
embed_cpus_per_actor: 1.0
Expand All @@ -62,6 +63,7 @@ presets:
ocr_batch_size: 16
embed_workers: 3
embed_batch_size: 256
embed_inference_batch_size: 32
page_elements_cpus_per_actor: 1.0
ocr_cpus_per_actor: 1.0
embed_cpus_per_actor: 1.0
Expand Down
14 changes: 12 additions & 2 deletions nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,16 @@ def main(
page_elements_cpus_per_actor: Optional[float] = typer.Option(0.0, "--page-elements-cpus-per-actor"),
page_elements_gpus_per_actor: Optional[float] = typer.Option(None, "--page-elements-gpus-per-actor", max=1.0),
embed_actors: Optional[int] = typer.Option(0, "--embed-actors"),
embed_batch_size: Optional[int] = typer.Option(0, "--embed-batch-size"),
embed_batch_size: Optional[int] = typer.Option(
0,
"--embed-batch-size",
help="Ray Data batch size for embedding stage scheduling.",
),
embed_inference_batch_size: Optional[int] = typer.Option(
0,
"--embed-inference-batch-size",
help="Per-forward local embedding microbatch size (VRAM control knob).",
),
embed_cpus_per_actor: Optional[float] = typer.Option(0.0, "--embed-cpus-per-actor"),
embed_gpus_per_actor: Optional[float] = typer.Option(None, "--embed-gpus-per-actor", max=1.0),
pdf_split_batch_size: int = typer.Option(1, "--pdf-split-batch-size", min=1),
Expand Down Expand Up @@ -453,7 +462,8 @@ def main(
"structured_elements_modality": structured_elements_modality,
"embed_granularity": embed_granularity,
"batch_tuning": embed_batch_tuning,
"inference_batch_size": embed_batch_size or None,
"inference_batch_size": embed_inference_batch_size or embed_batch_size or None,
"embed_inference_batch_size": embed_inference_batch_size or None,
}.items()
if v is not None
}
Expand Down
84 changes: 74 additions & 10 deletions nemo_retriever/src/nemo_retriever/graph_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from __future__ import annotations

import json
import logging
import os
import sys
from typing import Any, Callable, Dict, List, Optional, Union

from nemo_retriever.graph import InprocessExecutor, RayDataExecutor
Expand All @@ -46,8 +46,11 @@
StoreParams,
TextChunkParams,
)
from nemo_retriever.utils.hf_cache import resolve_hf_cache_dir
from nemo_retriever.utils.remote_auth import resolve_remote_api_key

logger = logging.getLogger(__name__)


def _resolve_api_key(params: Any) -> Any:
"""Auto-resolve api_key from NVIDIA_API_KEY / NGC_API_KEY if not explicitly set."""
Expand Down Expand Up @@ -75,6 +78,66 @@ def _coerce(params: Any, kwargs: dict[str, Any], *, default_factory: Callable[[]
return params


def _runtime_env_vars(*, debug: bool) -> dict[str, str]:
    """Assemble the Ray runtime ``env_vars`` mapping for worker processes.

    Only string values are propagated; anything else (e.g. a cache resolver
    returning ``None``) is silently dropped so Ray never receives a
    non-string env value.
    """
    candidates = {
        "NEMO_RETRIEVER_HF_CACHE_DIR": resolve_hf_cache_dir(),
        "LOG_LEVEL": "DEBUG" if debug else "INFO",
    }
    filtered: dict[str, str] = {}
    for name, value in candidates.items():
        if isinstance(value, str):
            filtered[name] = value
    return filtered


def _resolve_object_store_memory_bytes() -> int | None:
    """Resolve an explicit Ray object store size (bytes) from the environment.

    Resolution order:
      1. ``RAY_OBJECT_STORE_MEMORY_BYTES`` — absolute byte count; must be a
         positive integer.
      2. ``RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION`` — fraction of total
         system memory in ``(0, 1]`` (values > 1.0 are clamped to 1.0), then
         capped at 98% of the ``/dev/shm`` mount when its size is detectable.

    Returns:
        Target object store size in bytes, or ``None`` when no usable override
        is configured (the caller then lets Ray apply its default sizing).
    """
    raw_bytes = os.getenv("RAY_OBJECT_STORE_MEMORY_BYTES")
    if raw_bytes is not None:
        try:
            value: int | None = int(raw_bytes)
        except ValueError:
            value = None
        if value is not None and value > 0:
            return value
        # Warn for non-positive values too, not only unparsable ones, so a
        # misconfiguration never silently falls through to the proportion path.
        logger.warning("Invalid RAY_OBJECT_STORE_MEMORY_BYTES=%r; expected positive integer bytes.", raw_bytes)

    raw_proportion = os.getenv("RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION")
    if raw_proportion is None:
        return None

    try:
        proportion = float(raw_proportion)
    except ValueError:
        logger.warning(
            "Invalid RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION=%r; expected float in (0, 1].",
            raw_proportion,
        )
        return None

    if proportion <= 0.0:
        return None
    if proportion > 1.0:
        logger.warning(
            "RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION=%s is > 1.0; clamping to 1.0.",
            raw_proportion,
        )
        proportion = 1.0

    try:
        # Total physical RAM; os.sysconf may be absent or fail on some platforms.
        total_memory_bytes = int(os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES"))
    except (ValueError, OSError, AttributeError):
        logger.warning("Could not detect system memory; skipping object store override.")
        return None

    target_bytes = int(total_memory_bytes * proportion)

    try:
        # Cap against the shared-memory mount size; a shm_total_bytes of 0
        # means /dev/shm could not be inspected and no cap is applied.
        shm_stats = os.statvfs("/dev/shm")
        shm_total_bytes = int(shm_stats.f_frsize * shm_stats.f_blocks)
    except (OSError, AttributeError, ValueError):
        shm_total_bytes = 0

    if shm_total_bytes > 0:
        target_bytes = min(target_bytes, int(shm_total_bytes * 0.98))

    return target_bytes if target_bytes > 0 else None


class GraphIngestor(ingestor):
"""Ingestor that constructs and executes operator graphs directly.

Expand Down Expand Up @@ -274,16 +337,17 @@ def ingest(self, params: Any = None, **kwargs: Any) -> Any:
import ray

if self._ray_address or not ray.is_initialized():
runtime_env = {
"env_vars": {
"VIRTUAL_ENV": os.path.dirname(os.path.dirname(sys.executable)),
},
ray_init_kwargs: dict[str, Any] = {
"address": self._ray_address,
"ignore_reinit_error": True,
"log_to_driver": bool(self._ray_log_to_driver),
"runtime_env": {"env_vars": _runtime_env_vars(debug=bool(self._debug))},
}
ray.init(
address=self._ray_address,
ignore_reinit_error=True,
runtime_env=runtime_env,
)
object_store_memory = _resolve_object_store_memory_bytes()
if object_store_memory is not None:
ray_init_kwargs["object_store_memory"] = object_store_memory
logger.info("Ray object_store_memory configured to %.1f GiB.", object_store_memory / (1024**3))
ray.init(**ray_init_kwargs)
cluster_resources = gather_cluster_resources(ray)

graph = build_graph(
Expand Down
4 changes: 4 additions & 0 deletions nemo_retriever/src/nemo_retriever/harness/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"ocr_batch_size",
"embed_workers",
"embed_batch_size",
"embed_inference_batch_size",
"page_elements_cpus_per_actor",
"ocr_cpus_per_actor",
"embed_cpus_per_actor",
Expand Down Expand Up @@ -106,6 +107,7 @@ class HarnessConfig:
ocr_batch_size: int = 16
embed_workers: int = 3
embed_batch_size: int = 256
embed_inference_batch_size: int = 32
page_elements_cpus_per_actor: float = 1.0
ocr_cpus_per_actor: float = 1.0
embed_cpus_per_actor: float = 1.0
Expand Down Expand Up @@ -183,6 +185,8 @@ def validate(self) -> list[str]:
min_val = 0 if name in _ZERO_ALLOWED_WORKERS else 1
if int(val) < min_val:
errors.append(f"{name} must be >= {min_val}")
elif name.endswith("_batch_size") and int(val) < 1:
errors.append(f"{name} must be >= 1")

return errors

Expand Down
2 changes: 2 additions & 0 deletions nemo_retriever/src/nemo_retriever/harness/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ def _build_command(
str(cfg.embed_workers),
"--embed-batch-size",
str(cfg.embed_batch_size),
"--embed-inference-batch-size",
str(cfg.embed_inference_batch_size),
"--page-elements-cpus-per-actor",
str(cfg.page_elements_cpus_per_actor),
"--ocr-cpus-per-actor",
Expand Down
Loading
Loading