diff --git a/nemo_retriever/docs/cli/README.md b/nemo_retriever/docs/cli/README.md
new file mode 100644
index 000000000..d0aef882e
--- /dev/null
+++ b/nemo_retriever/docs/cli/README.md
@@ -0,0 +1,74 @@
+# Retriever CLI — Replacement Examples for `nv-ingest-cli`
+
+This folder contains `retriever` command-line examples that deliver the same
+end-user outcomes as the `nv-ingest-cli` examples in
+`nv-ingest/docs/`, `nv-ingest/api/`, `nv-ingest/client/`, and `nv-ingest/deploy/`.
+
+The original `nv-ingest-cli` documentation is **not removed** — these files sit
+alongside it as a new-CLI counterpart you can link to or migrate to.
+
+## Key shape difference
+
+`nv-ingest-cli` is a **single command that talks to a running REST service on
+`localhost:7670`** and composes work via repeated `--task extract|split|caption|embed|dedup|filter|udf`.
+
+`retriever` is a **multi-subcommand Typer app**. Most of the old CLI examples
+map to `retriever pipeline run INPUT_PATH`, which runs the graph pipeline
+locally (in-process or via Ray) and writes results to LanceDB and, optionally,
+to Parquet / object storage. Other subcommands cover focused tasks:
+
+| Old intent | New subcommand |
+|------------|----------------|
+| Extract + embed + store a batch of documents | `retriever pipeline run` |
+| Run an ad-hoc PDF extraction stage | `retriever pdf stage` |
+| Run an HTML / text / audio / chart stage | `retriever html run`, `retriever txt run`, `retriever audio extract`, `retriever chart run` |
+| Upload stage output to LanceDB | `retriever vector-store stage` |
+| Query LanceDB + compute recall@k | `retriever recall vdb-recall` |
+| Run a QA evaluation sweep | `retriever eval run` |
+| Serve / submit to the online REST API | `retriever online serve` / `retriever online stream-pdf` |
+| Benchmark stage throughput | `retriever benchmark {split,extract,audio-extract,page-elements,ocr,all}` |
+| Benchmark orchestration | `retriever harness {run,sweep,nightly,summary,compare}` |
+
+## Contents
+
+| New file | Replaces example(s) in |
+|----------|------------------------|
+| [`retriever_cli.md`](retriever_cli.md) | `nv-ingest/docs/docs/extraction/nv-ingest_cli.md` and the rebranded mirror `cli-reference.md` |
+| [`quickstart.md`](quickstart.md) | `nv-ingest/docs/docs/extraction/quickstart-guide.md` (the `nv-ingest-cli` section) |
+| [`pdf-split-tuning.md`](pdf-split-tuning.md) | `nv-ingest/docs/docs/extraction/v2-api-guide.md` (CLI example) |
+| [`smoke-test.md`](smoke-test.md) | `nv-ingest/api/api_tests/smoke_test.sh` |
+| [`cli-client-usage.md`](cli-client-usage.md) | `nv-ingest/client/client_examples/examples/cli_client_usage.ipynb` |
+| [`pdf-blueprint.md`](pdf-blueprint.md) | `nv-ingest/deploy/pdf-blueprint.ipynb` (CLI cell) |
+| [`benchmarking.md`](benchmarking.md) | `nv-ingest/docs/docs/extraction/benchmarking.md` and `nv-ingest/tools/harness/README.md` |
+
+## Gaps with no retriever-CLI equivalent (kept out of this folder)
+
+The following `nv-ingest-cli` examples are **not** migrated here because the
+new CLI does not yet expose an equivalent — continue to use `nv-ingest-cli`
+for these cases:
+
+- `--task 'udf:{…}'` — user-defined functions
+ (`nv-ingest/docs/docs/extraction/user-defined-functions.md`,
+ `nv-ingest/examples/udfs/README.md`). `retriever` does not expose UDFs.
+- `--task 'filter:{content_type:"image", min_size:…, min_aspect_ratio:…, max_aspect_ratio:…}'`.
+ The image scale/aspect-ratio filter stage is not reproduced in the new CLI.
+- Bare service submission (`nv-ingest-cli --doc foo.pdf` with no extract tasks
+ and full content-type metadata returned by the service). `retriever online submit`
+ is currently a stub — only `retriever online stream-pdf` is implemented.
+- `gen_dataset.py` dataset creation with enumeration and sampling.
+- `--collect_profiling_traces --zipkin_host --zipkin_port`. Zipkin tracing is
+  not supported; `--runtime-metrics-dir` / `--runtime-metrics-prefix` provide
+  a different flavor of runtime metrics instead.
+
+## Conventions used in the examples
+
+- Input paths assume you invoke `retriever` from the `nv-ingest/nemo_retriever`
+ directory (or point at absolute paths).
+- `--save-intermediate <dir>` writes the extraction DataFrame as Parquet for
+  inspection. LanceDB output goes to `--lancedb-uri` (defaults to `./lancedb`).
+- `--store-images-uri <uri>` stores extracted images to a local path or an
+ fsspec URI (e.g. `s3://bucket/prefix`).
+- `--run-mode inprocess` skips Ray and is ideal for single-file demos and CI;
+ `--run-mode batch` (the default) uses Ray Data for throughput.
+
+Run `retriever pipeline run --help` for the authoritative flag list.
diff --git a/nemo_retriever/docs/cli/benchmarking.md b/nemo_retriever/docs/cli/benchmarking.md
new file mode 100644
index 000000000..8f8bad19c
--- /dev/null
+++ b/nemo_retriever/docs/cli/benchmarking.md
@@ -0,0 +1,98 @@
+# Benchmarking with the `retriever` CLI
+
+This page is the `retriever`-CLI counterpart to
+`nv-ingest/docs/docs/extraction/benchmarking.md` and
+`nv-ingest/tools/harness/README.md`.
+
+The old benchmarking workflow is driven by `tools/harness` and
+`uv run nv-ingest-harness-run`. The `retriever` CLI exposes the harness (and
+per-stage micro-benchmarks) as first-class subcommands, so you can run
+benchmarks without `uv run` or a separate harness repo.
+
+## Harness (end-to-end benchmarks)
+
+Old:
+
+```bash
+cd tools/harness
+uv sync
+uv run nv-ingest-harness-run --case=e2e --dataset=bo767
+uv run nv-ingest-harness-run --case=e2e --dataset=/path/to/your/data
+```
+
+New — the harness is a subcommand on the main CLI (full parity):
+
+```bash
+retriever harness run --case=e2e --dataset=bo767
+retriever harness run --case=e2e --dataset=/path/to/your/data
+```
+
+Related commands (browse with `--help`):
+
+```bash
+retriever harness --help # run, sweep, nightly, summary, compare
+retriever harness run --help
+retriever harness sweep --help
+retriever harness nightly --help
+retriever harness summary --help
+retriever harness compare --help
+```
+
+### Harness with image / text storage
+
+Old:
+
+```bash
+retriever harness run --dataset bo20 --preset single_gpu \
+ --override store_images_uri=stored_images --override store_text=true
+```
+
+New (unchanged — this form is already the `retriever` CLI):
+
+```bash
+retriever harness run --dataset bo20 --preset single_gpu \
+ --override store_images_uri=stored_images --override store_text=true
+```
+
+When `store_images_uri` is a relative path it resolves to
+`artifact_dir/stored_images/` per run; absolute paths and fsspec URIs
+(e.g. `s3://bucket/prefix`) are passed through unchanged.
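
The resolution rule above can be sketched in Python. This is an illustration of the documented behavior, not the harness's actual code; `artifact_dir` stands in for the per-run artifact directory:

```python
import os
from urllib.parse import urlparse


def resolve_store_images_uri(uri: str, artifact_dir: str) -> str:
    """Sketch: resolve store_images_uri the way the harness docs describe."""
    if urlparse(uri).scheme:   # fsspec URI such as s3://bucket/prefix
        return uri
    if os.path.isabs(uri):     # absolute paths pass through unchanged
        return uri
    # relative paths land under the per-run artifact directory
    return os.path.join(artifact_dir, uri)
```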
+
+## Per-stage micro-benchmarks
+
+The new CLI also exposes stage-level throughput benchmarks that had no direct
+counterpart in `nv-ingest-cli`:
+
+```bash
+retriever benchmark --help # split, extract, audio-extract, page-elements, ocr, all
+retriever benchmark split --help
+retriever benchmark extract --help
+retriever benchmark audio-extract --help
+retriever benchmark page-elements --help
+retriever benchmark ocr --help
+retriever benchmark all --help
+```
+
+Example — benchmark the PDF extraction actor:
+
+```bash
+retriever benchmark extract ./data/pdf_corpus \
+ --pdf-extract-batch-size 8 \
+ --pdf-extract-actors 4
+```
+
+Each benchmark reports rows/sec (or chunk rows/sec for audio) for its actor.
+Use these when you want focused numbers for a single stage instead of an
+end-to-end run.
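
If you want the same headline number for a code path of your own, rows/sec is just processed rows over wall-clock time. A minimal stdlib sketch, where `process` is any callable you want to time:

```python
import time


def rows_per_sec(process, rows) -> float:
    """Time a callable over a batch of rows and report its throughput."""
    start = time.perf_counter()
    process(rows)
    elapsed = time.perf_counter() - start
    return len(rows) / elapsed
```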
+
+## Parity notes
+
+- The harness use-cases in the old docs (`--case=e2e`, `--dataset=bo767`,
+ `--dataset=/path/...`, `--override ...`) are preserved verbatim — only the
+ launcher changes (`retriever harness run …` instead of
+ `uv run nv-ingest-harness-run …`).
+- If you have a repo-local `uv` environment, `uv run retriever harness run …`
+ still works.
+- Stage benchmarks (`retriever benchmark …`) are net-new relative to the old
+ `nv-ingest-cli` examples — they are the recommended way to profile
+ individual actors before tuning `pipeline run` flags.
diff --git a/nemo_retriever/docs/cli/cli-client-usage.md b/nemo_retriever/docs/cli/cli-client-usage.md
new file mode 100644
index 000000000..2ebdb43d0
--- /dev/null
+++ b/nemo_retriever/docs/cli/cli-client-usage.md
@@ -0,0 +1,126 @@
+# `retriever` CLI — Client-Usage Walk-through
+
+This page is the `retriever`-CLI counterpart to
+`nv-ingest/client/client_examples/examples/cli_client_usage.ipynb`.
+
+The original notebook walks through `nv-ingest-cli` by:
+
+1. Printing `--help`.
+2. Submitting a single PDF with `extract + dedup + filter` tasks.
+3. Submitting a dataset of PDFs with the same task set.
+
+The equivalent `retriever` workflow is shown below. You can drop these cells
+into a new notebook (e.g. `retriever_client_usage.ipynb`) alongside the old
+one.
+
+## 1. Help
+
+```bash
+retriever --help
+retriever pipeline run --help
+```
+
+Top-level `--help` lists the subcommand tree; `pipeline run --help` shows the
+ingest-specific flags you will actually use in this walk-through.
+
+## 2. Submit a single PDF
+
+Old notebook cell:
+
+```bash
+nv-ingest-cli \
+ --doc ${SAMPLE_PDF0} \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_text": true, "extract_images": true, "extract_tables": true, "extract_tables_method": "yolox"}' \
+ --task='dedup:{"content_type": "image", "filter": true}' \
+ --task='filter:{"content_type": "image", "min_size": 128, "max_aspect_ratio": 5.0, "min_aspect_ratio": 0.2, "filter": true}' \
+ --client_host=${REDIS_HOST} \
+ --client_port=${REDIS_PORT} \
+ --output_directory=${OUTPUT_DIRECTORY_SINGLE}
+```
+
+New:
+
+```bash
+retriever pipeline run "${SAMPLE_PDF0}" \
+ --input-type pdf \
+ --method pdfium \
+ --extract-text --extract-tables --extract-charts \
+ --dedup --dedup-iou-thres 0.45 \
+ --store-images-uri "${OUTPUT_DIRECTORY_SINGLE}/images" \
+ --strip-base64 \
+ --save-intermediate "${OUTPUT_DIRECTORY_SINGLE}"
+```
+
+### Parity notes
+
+- `extract_tables_method:"yolox"` is not a CLI selector — the pipeline picks
+ its table/structure detectors automatically. Tables are still extracted.
+- `dedup:{content_type:"image", filter:true}` maps to `--dedup` (with
+ `--dedup-iou-thres` for the IoU threshold).
+- `filter:{content_type:"image", min_size, min/max_aspect_ratio, filter:true}`
+ **has no parity.** There is no image scale/aspect-ratio filter in the
+ `retriever` CLI today. If that matters, drop to the Python API or keep the
+ old `nv-ingest-cli` for that example.
+- `extract_images:true` is implicitly satisfied by `--store-images-uri`
+ (images are extracted and persisted to the URI).
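
If you need the old image filter's effect today, one option is to post-filter the extraction rows yourself. A rough stdlib sketch of the old predicate; treating `min_size` as the smaller image dimension is an assumption about the old task's semantics:

```python
def keep_image(width: int, height: int,
               min_size: int = 128,
               min_aspect_ratio: float = 0.2,
               max_aspect_ratio: float = 5.0) -> bool:
    """Approximation of the old filter task's image predicate."""
    aspect_ratio = width / height
    return (min(width, height) >= min_size
            and min_aspect_ratio <= aspect_ratio <= max_aspect_ratio)
```

Apply it to the image rows of the Parquet output (width/height column names vary; check your extraction DataFrame) and drop the rows it rejects.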
+
+## 3. Submit a dataset of PDFs
+
+Old notebook cell:
+
+```bash
+nv-ingest-cli \
+ --dataset ${BATCH_FILE} \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_text": true, "extract_images": true, "extract_tables": true, "extract_tables_method": "yolox"}' \
+ --task='dedup:{"content_type": "image", "filter": true}' \
+ --task='filter:{"content_type": "image", "min_size": 128, "max_aspect_ratio": 5.0, "min_aspect_ratio": 0.2, "filter": true}' \
+ --client_host=${REDIS_HOST} \
+ --client_port=${REDIS_PORT} \
+ --output_directory=${OUTPUT_DIRECTORY_BATCH}
+```
+
+New — point `retriever` at a directory of PDFs instead of a dataset JSON:
+
+```bash
+# Assume $PDF_DIR is a directory holding your batch of PDFs.
+retriever pipeline run "${PDF_DIR}" \
+ --input-type pdf \
+ --method pdfium \
+ --extract-text --extract-tables --extract-charts \
+ --dedup --dedup-iou-thres 0.45 \
+ --store-images-uri "${OUTPUT_DIRECTORY_BATCH}/images" \
+ --strip-base64 \
+ --save-intermediate "${OUTPUT_DIRECTORY_BATCH}"
+```
+
+### Parity notes
+
+- The `dataset.json` (`sampled_files`) format and `gen_dataset.py` sampler
+ are not reproduced. Materialize a directory (or glob) containing the files
+ you want to process.
+- The `--shuffle_dataset` knob is not present; set Ray block / batch sizes
+ via `--pdf-split-batch`, `--pdf-split-batch-size`, etc. for throughput.
+
+## 4. Inspect results
+
+```python
+import os
+
+import pyarrow.parquet as pq
+import lancedb
+
+# Match the shell variable used in the cells above.
+OUTPUT_DIRECTORY_BATCH = os.environ["OUTPUT_DIRECTORY_BATCH"]
+
+# Parquet extraction dumps written by --save-intermediate:
+df = pq.read_table(OUTPUT_DIRECTORY_BATCH).to_pandas()
+print(df[["source_id", "text", "content_type"]].head())
+
+# LanceDB rows (default table name "nv-ingest"):
+db = lancedb.connect("./lancedb")
+tbl = db.open_table("nv-ingest")
+print(tbl.to_pandas().head())
+```
+
+## Migration summary
+
+| Old notebook cell | New `retriever` form | Parity |
+|-------------------|----------------------|--------|
+| `!nv-ingest-cli --help` | `!retriever --help` (plus `retriever pipeline run --help`) | Full |
+| Single-file extract + dedup + filter | `retriever pipeline run … --dedup …` | Partial — no image-size/aspect filter, `extract_tables_method` auto-selected |
+| Dataset extract + dedup + filter | `retriever pipeline run …` | Partial — no `dataset.json` loader; use a directory |
diff --git a/nemo_retriever/docs/cli/pdf-blueprint.md b/nemo_retriever/docs/cli/pdf-blueprint.md
new file mode 100644
index 000000000..a82788201
--- /dev/null
+++ b/nemo_retriever/docs/cli/pdf-blueprint.md
@@ -0,0 +1,92 @@
+# PDF Blueprint — `retriever` CLI Replacement
+
+This page is the `retriever`-CLI counterpart to the CLI cell in
+`nv-ingest/deploy/pdf-blueprint.ipynb`.
+
+## Original blueprint cell
+
+```bash
+nv-ingest-cli \
+ --doc nv-ingest/data/multimodal_test.pdf \
+ --output_directory ./processed_docs \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_tables": "true", "extract_images": "true", "extract_charts": "true"}' \
+ --client_host=host.docker.internal \
+ --client_port=7670
+```
+
+This submits the blueprint's multimodal sample PDF to the running ingest
+service and asks for text + tables + charts + images.
+
+## `retriever` equivalent
+
+```bash
+retriever pipeline run nv-ingest/data/multimodal_test.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --extract-text --extract-tables --extract-charts \
+ --store-images-uri ./processed_docs/images \
+ --strip-base64 \
+ --save-intermediate ./processed_docs
+```
+
+### What you get (end-user outcome)
+
+- The same multimodal content (text, table markdown, chart descriptions,
+ extracted images) is produced.
+- Text / table / chart rows land in LanceDB at `./lancedb/nv-ingest.lance`.
+- Parquet extraction rows are written under `./processed_docs/`.
+- Extracted images are written under `./processed_docs/images/`, referenced by
+ `content_url` in the row metadata.
+
+### Notebook-friendly form
+
+To keep the notebook self-contained, prefix the shell cell with `!`:
+
+```bash
+!retriever pipeline run nv-ingest/data/multimodal_test.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --extract-text --extract-tables --extract-charts \
+ --store-images-uri ./processed_docs/images \
+ --strip-base64 \
+ --save-intermediate ./processed_docs
+```
+
+And inspect the results in the next cell:
+
+```python
+import pyarrow.parquet as pq
+import lancedb
+
+df = pq.read_table("./processed_docs").to_pandas()
+print(df[["source_id", "content_type"]].value_counts())
+
+db = lancedb.connect("./lancedb")
+tbl = db.open_table("nv-ingest")
+print(tbl.to_pandas().head())
+```
+
+## Migrating the blueprint `pip install` cell
+
+The blueprint also installs `nv-ingest-client==25.9.0`. For the `retriever`
+path, install `nemo-retriever` instead (see `nemo_retriever/README.md` for
+current pinned versions):
+
+```bash
+pip install "nemo-retriever==26.3.0" \
+ nv-ingest-client==26.3.0 nv-ingest==26.3.0 nv-ingest-api==26.3.0 \
+ pymilvus[bulk_writer,model] \
+ minio \
+ tritonclient \
+ langchain_milvus
+```
+
+## Parity notes
+
+- `client_host=host.docker.internal` / `client_port=7670` are irrelevant here:
+ `retriever pipeline run` is in-process, so the blueprint no longer needs a
+ running `nv-ingest-ms-runtime` container for the CLI cell.
+- If you still want the blueprint to hit a live service (for example to
+ exercise the REST API), replace the CLI cell with a `retriever online serve`
+ container plus `retriever online stream-pdf` for per-page NDJSON output.
+ Note that `retriever online submit` is currently a stub.
diff --git a/nemo_retriever/docs/cli/pdf-split-tuning.md b/nemo_retriever/docs/cli/pdf-split-tuning.md
new file mode 100644
index 000000000..5b2625ef4
--- /dev/null
+++ b/nemo_retriever/docs/cli/pdf-split-tuning.md
@@ -0,0 +1,68 @@
+# PDF Page-Batch Tuning with the `retriever` CLI
+
+This page is the `retriever`-CLI counterpart to the CLI example in
+`nv-ingest/docs/docs/extraction/v2-api-guide.md`.
+
+## Background
+
+The old V2 API introduced per-request PDF splitting via `--api_version v2 --pdf_split_page_count N`:
+PDFs larger than `N` pages were split and processed in parallel inside the
+service.
+
+`retriever` has **no V1/V2 API concept** — the graph pipeline is always the
+current generation. Page-batch parallelism is tuned directly on the pipeline
+via `--pdf-split-batch` (page-batch granularity) and `--pdf-split-batch-size`
+(Ray block size).
+
+## Replacement example
+
+Old (V2 API with 64 pages per chunk):
+
+```bash
+nv-ingest-cli \
+ --api_version v2 \
+ --pdf_split_page_count 64 \
+ --doc large_document.pdf \
+ --task 'extract:{"document_type":"pdf", "extract_text":true}' \
+ --output_directory ./results
+```
+
+New:
+
+```bash
+retriever pipeline run large_document.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --extract-text --no-extract-tables --no-extract-charts \
+ --pdf-split-batch 64 \
+ --save-intermediate ./results
+```
+
+### What `--pdf-split-batch` does
+
+`--pdf-split-batch` controls how many pages are grouped into one batch passed
+downstream to the OCR / page-elements / extraction actors. Smaller values give
+more parallelism but more overhead; larger values amortize overhead but limit
+concurrency — the same trade-off the V2 `--pdf_split_page_count` flag exposed.
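
The fan-out is easy to reason about; a one-line sketch of batch count as a function of `--pdf-split-batch`:

```python
import math


def page_batches(total_pages: int, batch_size: int) -> int:
    """How many downstream work items one PDF produces for a given batch size."""
    return math.ceil(total_pages / batch_size)
```

For example, a 1,000-page PDF with `--pdf-split-batch 64` yields 16 batches; halving the batch size to 32 doubles the fan-out to 32 batches.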
+
+For very wide fan-out you can also increase the number of concurrent actors:
+
+```bash
+retriever pipeline run large_document.pdf \
+ --input-type pdf \
+ --pdf-split-batch 64 \
+ --page-elements-actors 4 \
+ --ocr-actors 4 \
+ --embed-actors 2 \
+ --save-intermediate ./results
+```
+
+## Parity notes
+
+- The pipeline always runs through the current ingest graph — there is no
+ `--api_version` flag.
+- `--pdf-split-batch` is a batching knob on the local Ray pipeline, not a
+ server-side splitter. End-user outcome (parallel extraction of a large PDF)
+ is preserved.
+- Use `retriever pipeline run --help` to see related tuning flags
+ (`--pdf-split-batch-size`, `--pdf-extract-batch-size`, etc.).
diff --git a/nemo_retriever/docs/cli/quickstart.md b/nemo_retriever/docs/cli/quickstart.md
new file mode 100644
index 000000000..0d2a35554
--- /dev/null
+++ b/nemo_retriever/docs/cli/quickstart.md
@@ -0,0 +1,83 @@
+# Quick Start — `retriever` CLI
+
+This page is the `retriever`-CLI counterpart to the CLI section of
+`nv-ingest/docs/docs/extraction/quickstart-guide.md`.
+
+## Replacement for the quickstart CLI example
+
+The original quickstart example submits a single PDF to the running service
+and asks for text, tables, charts, and images:
+
+```bash
+nv-ingest-cli \
+ --doc ./data/multimodal_test.pdf \
+ --output_directory ./processed_docs \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_tables": "true", "extract_images": "true", "extract_charts": "true"}' \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+The `retriever` equivalent runs the full pipeline locally — extraction,
+embedding, and LanceDB upload — and produces the same multimodal outputs:
+
+```bash
+retriever pipeline run ./data/multimodal_test.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --extract-text --extract-tables --extract-charts \
+ --store-images-uri ./processed_docs/images \
+ --save-intermediate ./processed_docs
+```
+
+### What you get
+
+- Extracted text, table markdown, and chart descriptions as rows in the
+ LanceDB table at `./lancedb/nv-ingest.lance` (default `--lancedb-uri`).
+- Per-document extraction rows as Parquet under `./processed_docs/` (from
+ `--save-intermediate`).
+- Extracted images on disk under `./processed_docs/images/` (from
+ `--store-images-uri`). The `content_url` column points at these paths.
+- Progress, timing, and stage-level logs on stderr.
+
+### Inspect the results
+
+```bash
+ls ./processed_docs
+ls ./processed_docs/images
+ls ./lancedb
+```
+
+For programmatic access:
+
+```python
+import pyarrow.parquet as pq
+import lancedb
+
+df = pq.read_table("./processed_docs").to_pandas()
+print(df.head())
+
+db = lancedb.connect("./lancedb")
+tbl = db.open_table("nv-ingest")
+print(tbl.to_pandas().head())
+```
+
+Or query via the Retriever Python client (same workflow as the library
+quickstart in `nemo_retriever/README.md`):
+
+```python
+from nemo_retriever.retriever import Retriever
+
+retriever = Retriever(lancedb_uri="lancedb", lancedb_table="nv-ingest", top_k=5)
+hits = retriever.query(
+ "Given their activities, which animal is responsible for the typos?"
+)
+```
+
+## Notes on running larger datasets
+
+- Pass a directory for batch ingestion:
+ `retriever pipeline run ./data/pdf_corpus --input-type pdf …`.
+- For faster throughput on a multi-GPU node, keep `--run-mode batch` (default,
+ Ray-based) and tune `--pdf-split-batch`, `--embed-actors`,
+ `--embed-batch-size`, `--ocr-actors`, and `--page-elements-actors`.
+- For debugging or CI, use `--run-mode inprocess` to avoid starting Ray.
diff --git a/nemo_retriever/docs/cli/retriever_cli.md b/nemo_retriever/docs/cli/retriever_cli.md
new file mode 100644
index 000000000..59f7bc801
--- /dev/null
+++ b/nemo_retriever/docs/cli/retriever_cli.md
@@ -0,0 +1,265 @@
+# Use the Retriever Command Line Interface
+
+This page is the `retriever`-CLI counterpart to
+`nv-ingest/docs/docs/extraction/nv-ingest_cli.md` and
+`nv-ingest/docs/docs/extraction/cli-reference.md`.
+
+The original `nv-ingest-cli` docs remain valid for service-based ingestion.
+The examples below show how to obtain the same end-user outcomes with the new
+`retriever` Typer app (the `retriever` executable installed with the
+`nemo-retriever` package).
+
+> **Shape difference.** `nv-ingest-cli` is a single command that submits
+> `--task ...` definitions to a running REST service. `retriever` is a
+> multi-subcommand app. The main replacement for document ingestion is
+> `retriever pipeline run INPUT_PATH`, which runs the full graph pipeline
+> locally (in-process or Ray) and writes rows to LanceDB + optional Parquet.
+
+To check the installed version:
+
+```bash
+retriever --version
+```
+
+To list the top-level subcommands:
+
+```bash
+retriever --help
+```
+
+To see the full `pipeline run` flag surface (the closest analogue to the old
+single-command CLI):
+
+```bash
+retriever pipeline run --help
+```
+
+## Examples
+
+### Example: Text / PDF file — extract with defaults
+
+Old (`nv-ingest-cli`, returns full metadata over the service):
+
+```bash
+nv-ingest-cli \
+ --doc ./data/test.pdf \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+New (`retriever`, local pipeline; text, tables, and charts are extracted by
+default):
+
+```bash
+retriever pipeline run ./data/test.pdf \
+ --input-type pdf \
+ --run-mode inprocess \
+ --save-intermediate ./processed_docs
+```
+
+**Parity note.** Results are written to LanceDB (`./lancedb` by default) and,
+with `--save-intermediate`, to a Parquet file under `./processed_docs`.
+The old per-content-type `*.metadata.json` tree is not produced; use
+`pyarrow.parquet.read_table` or LanceDB queries to inspect rows.
+
+### Example: PDF with splitting only
+
+The old `--task='split'` example submitted a split-only job to the service:
+
+```bash
+nv-ingest-cli \
+ --doc ./data/test.pdf \
+ --output_directory ./processed_docs \
+ --task='split' \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+With `retriever`, splitting is intrinsic to the pipeline. Control text-level
+chunking via `--text-chunk`, and control PDF page-batch sizing via
+`--pdf-split-batch`:
+
+```bash
+retriever pipeline run ./data/test.pdf \
+ --input-type pdf \
+ --no-extract-tables --no-extract-charts \
+ --text-chunk --text-chunk-max-tokens 512 --text-chunk-overlap-tokens 64 \
+ --save-intermediate ./processed_docs
+```
+
+**Parity note.** There is no "split-only, no extraction" mode. If you only
+care about chunk boundaries, run the pipeline with extraction narrowed to text
+and inspect the resulting chunks.
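
To see what the two chunking knobs mean, here is a token-window sketch. It illustrates max-tokens/overlap windowing in general, not the pipeline's actual chunker:

```python
def chunk_tokens(tokens, max_tokens=512, overlap_tokens=64):
    """Split a token list into windows of max_tokens that overlap by overlap_tokens."""
    step = max_tokens - overlap_tokens
    return [tokens[i:i + max_tokens] for i in range(0, len(tokens), step)]
```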
+
+### Example: PDF with splitting and extraction
+
+Old:
+
+```bash
+nv-ingest-cli \
+ --doc ./data/test.pdf \
+ --output_directory ./processed_docs \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium"}' \
+ --task='extract:{"document_type": "docx", "extract_method": "python_docx"}' \
+ --task='split' \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+New — run once per input type (PDF and docx/pptx use different `--input-type`):
+
+```bash
+retriever pipeline run ./data/test.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --text-chunk --text-chunk-max-tokens 512 \
+ --save-intermediate ./processed_docs
+```
+
+```bash
+retriever pipeline run ./data/test.docx \
+ --input-type doc \
+ --text-chunk --text-chunk-max-tokens 512 \
+ --save-intermediate ./processed_docs
+```
+
+**Parity note.** `--input-type doc` matches `*.docx` and `*.pptx`
+(see `src/nemo_retriever/pipeline/__main__.py::_resolve_file_patterns`). Mixed
+PDF + docx in a single invocation is not supported; invoke once per type.
+
+### Example: PDF with a custom page-batch size
+
+Old:
+
+```bash
+nv-ingest-cli \
+ --doc ./data/test.pdf \
+ --output_directory ./processed_docs \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_text": "true"}' \
+ --pdf_split_page_count 64 \
+ --api_version v2 \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+New:
+
+```bash
+retriever pipeline run ./data/test.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --extract-text --no-extract-tables --no-extract-charts \
+ --pdf-split-batch 64 \
+ --save-intermediate ./processed_docs
+```
+
+**Parity note.** There is no v1/v2 concept in `retriever`; `--pdf-split-batch`
+is the batching knob used by the Ray pipeline.
+
+### Example: Caption images
+
+Old:
+
+```bash
+nv-ingest-cli \
+ --doc ./data/test.pdf \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_images": "true"}' \
+ --task='caption:{"prompt": "Caption the content of this image:", "reasoning": true}' \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+New:
+
+```bash
+retriever pipeline run ./data/test.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --caption \
+ --caption-model-name nvidia/NVIDIA-Nemotron-Nano-VL-8B-V2 \
+ --caption-invoke-url https://integrate.api.nvidia.com/v1/chat/completions \
+ --api-key "${NVIDIA_API_KEY}" \
+ --store-images-uri ./processed_docs/images \
+ --save-intermediate ./processed_docs
+```
+
+**Parity gaps.**
+
+- The `reasoning: true` option is not exposed as a CLI flag.
+- A custom `prompt` is not exposed either; the caption stage uses its default
+ prompt. For prompt or reasoning control, drop to the Python API
+ (`nemo_retriever.ingestor.Ingestor.caption(...)`).
+- If you do not set a caption endpoint / local GPU profile, the caption stage
+ is skipped at runtime — matching the old behavior.
+
+### Example: Process a directory of documents
+
+Old (dataset file with sampled entries):
+
+```bash
+nv-ingest-cli \
+ --dataset dataset.json \
+ --output_directory ./processed_docs \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium"}' \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+New (point `retriever` at a directory; it globs files for the given
+`--input-type`):
+
+```bash
+retriever pipeline run ./data/pdf_corpus \
+ --input-type pdf \
+ --method pdfium \
+ --save-intermediate ./processed_docs
+```
+
+**Parity gap.** The `dataset.json` (`sampled_files`) schema and
+`gen_dataset.py` sampler are not reproduced. Materialize a directory (or glob)
+that contains the files you want.
+
+### Example: Upload extracted images to object storage
+
+Old: extraction + a MinIO upload configured on the service side.
+
+```bash
+nv-ingest-cli \
+ --doc ./data/test.pdf \
+ --output_directory ./processed_docs \
+ --task='extract:{"document_type": "pdf", "extract_method": "pdfium"}' \
+ --client_host=localhost \
+ --client_port=7670
+```
+
+New — use the first-class `--store-images-uri` flag (local path, `s3://…`, or
+any fsspec URI):
+
+```bash
+retriever pipeline run ./data/test.pdf \
+ --input-type pdf \
+ --method pdfium \
+ --store-images-uri s3://my-bucket/images \
+ --strip-base64 \
+ --save-intermediate ./processed_docs
+```
+
+**Parity note.** `--strip-base64` removes inline base64 from rows after the
+image has been persisted, so downstream consumers follow `content_url`
+references — analogous to `--save_images_separately` on the old CLI.
+
+## Where results live
+
+- **LanceDB table.** `--lancedb-uri lancedb` (default). Query via
+ `retriever recall vdb-recall …` or the `nemo_retriever.retriever.Retriever`
+ Python class.
+- **Parquet (optional).** `--save-intermediate <dir>` writes the extraction
+ DataFrame for inspection.
+- **Images (optional).** `--store-images-uri <uri>` writes extracted images to
+ a local path or object store.
+
+## Errors and exit codes
+
+`retriever pipeline run` exits **0** on success, **non-zero** on Typer
+validation errors (bad `--input-type`, missing `INPUT_PATH`, etc.) or pipeline
+failures. Use `--debug` or `--log-file <path>` for detailed diagnostics.
diff --git a/nemo_retriever/docs/cli/smoke-test.md b/nemo_retriever/docs/cli/smoke-test.md
new file mode 100644
index 000000000..e5b5f4cd0
--- /dev/null
+++ b/nemo_retriever/docs/cli/smoke-test.md
@@ -0,0 +1,162 @@
+# Multi-format Smoke Test with the `retriever` CLI
+
+This page is the `retriever`-CLI counterpart to
+`nv-ingest/api/api_tests/smoke_test.sh`.
+
+The original script loops over formats (pdf, jpeg, png, tiff, bmp, wav, pptx,
+docx) and submits each file to the running ingest service via
+`nv-ingest-cli`, counting PASS/FAIL and printing a table.
+
+With `retriever`, each format is driven by a single `--input-type` dispatch.
+The same `exit_code` logic works — `retriever` returns non-zero on failure.
+
+## Replacement script
+
+```bash
+#!/bin/bash
+# smoke_test_retriever.sh — multi-format parity smoke test for `retriever`.
+
+set -u
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
+BASE_DIR="$( dirname "$SCRIPT_DIR" )"
+
+OUTPUT_DIR="./processed/smoke_test_retriever"
+DATA_DIR="${BASE_DIR}/../data"
+mkdir -p "$OUTPUT_DIR"
+
+LOG_DIR="./extraction_logs_retriever"
+mkdir -p "$LOG_DIR"
+LOG_FILE="$LOG_DIR/extraction_test_$(date +%Y%m%d_%H%M%S).log"
+TEMP_LOG_DIR="$LOG_DIR/temp_logs"
+mkdir -p "$TEMP_LOG_DIR"
+
+echo "====== Multimodal Extraction Test (retriever) - $(date) ======" | tee -a "$LOG_FILE"
+
+run_extraction() {
+ local file_ext=$1
+ local input_type=$2
+ local test_file="$DATA_DIR/multimodal_test.$file_ext"
+ local temp_log="$TEMP_LOG_DIR/${file_ext}_extraction.log"
+ local result_file="$TEMP_LOG_DIR/${file_ext}_result"
+
+ {
+ echo "====== Testing $file_ext (input-type=$input_type) ======"
+ echo "File: $test_file"
+ echo "Started at: $(date)"
+
+ if [ ! -f "$test_file" ]; then
+ echo "ERROR: File $test_file not found. Skipping test."
+ echo "1" > "$result_file"
+ return 1
+ fi
+
+ start_time=$(date +%s)
+
+ retriever pipeline run "$test_file" \
+ --input-type "$input_type" \
+ --run-mode inprocess \
+ --extract-text --extract-tables --extract-charts --extract-infographics \
+ --save-intermediate "$OUTPUT_DIR/${file_ext}" \
+ --lancedb-uri "$OUTPUT_DIR/lancedb_${file_ext}" \
+ 2>&1
+
+ exit_code=$?
+ end_time=$(date +%s)
+ duration=$((end_time - start_time))
+
+ echo "Exit code: $exit_code"
+ echo "Duration: $duration seconds"
+ echo "--------------------------------------------------"
+
+ echo "$exit_code" > "$result_file"
+ } > "$temp_log" 2>&1
+
+ exit_code=$(cat "$result_file")
+ return $exit_code
+}
+
+declare -A formats=(
+ ["pdf"]="pdf"
+ ["jpeg"]="image"
+ ["png"]="image"
+ ["tiff"]="image"
+ ["bmp"]="image"
+ ["wav"]="audio"
+ ["pptx"]="doc"
+ ["docx"]="doc"
+)
+
+total_formats=0
+successful_formats=0
+declare -A test_results=()
+declare -A test_durations=()
+# Track background jobs: pid list plus pid -> format mapping.
+pids=()
+declare -A format_pids=()
+
+for ext in "${!formats[@]}"; do
+ input_type="${formats[$ext]}"
+ ((total_formats++))
+
+ run_extraction "$ext" "$input_type" &
+ pid=$!
+ pids+=($pid)
+ format_pids["$pid"]="$ext"
+done
+
+for pid in "${pids[@]}"; do
+ wait $pid
+ exit_code=$?
+ format="${format_pids[$pid]}"
+
+ temp_log="$TEMP_LOG_DIR/${format}_extraction.log"
+ duration="N/A"
+ if [ -f "$temp_log" ]; then
+ d=$(grep "Duration:" "$temp_log" | awk '{print $2}')
+ [ -n "$d" ] && duration="$d"
+ fi
+ test_durations["$format"]="$duration"
+
+ if [ $exit_code -eq 0 ]; then
+ ((successful_formats++))
+ test_results["$format"]="PASS"
+ else
+ test_results["$format"]="FAIL"
+ echo "====== FAILED: $format extraction ======" | tee -a "$LOG_FILE"
+ tee -a "$LOG_FILE" < "$temp_log"
+ fi
+done
+
+echo "====== SUMMARY ======" | tee -a "$LOG_FILE"
+echo "Total formats tested: $total_formats" | tee -a "$LOG_FILE"
+echo "Successful extractions: $successful_formats" | tee -a "$LOG_FILE"
+echo "Failed extractions: $((total_formats - successful_formats))" | tee -a "$LOG_FILE"
+
+if [ $successful_formats -eq $total_formats ]; then
+ echo "All tests passed successfully!" | tee -a "$LOG_FILE"
+ exit 0
+else
+ exit 1
+fi
+```
+
+## Format-to-`--input-type` mapping
+
+| File extension | `--input-type` |
+|----------------|----------------|
+| `.pdf` | `pdf` |
+| `.jpg`, `.jpeg`, `.png`, `.tiff`, `.bmp` | `image` |
+| `.mp3`, `.wav`, `.m4a` | `audio` |
+| `.docx`, `.pptx` | `doc` |
+| `.txt` | `txt` |
+| `.html` | `html` |
+
+(See `nemo_retriever/src/nemo_retriever/pipeline/__main__.py::_resolve_file_patterns`.)
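
For scripts that dispatch on file extension, the table above can be folded into
a small helper. This is a sketch: the `ext_to_input_type` name is ours, and only
the extension-to-type mapping itself comes from the table.

```shell
# Map a file extension to the `--input-type` value used by
# `retriever pipeline run`. The mapping mirrors the table above; the
# helper itself is illustrative and not part of the retriever CLI.
ext_to_input_type() {
  # Lowercase the extension so ".PDF" and ".pdf" behave the same.
  ext=$(printf '%s' "$1" | tr '[:upper:]' '[:lower:]')
  case "$ext" in
    pdf)                   echo "pdf" ;;
    jpg|jpeg|png|tiff|bmp) echo "image" ;;
    mp3|wav|m4a)           echo "audio" ;;
    docx|pptx)             echo "doc" ;;
    txt)                   echo "txt" ;;
    html)                  echo "html" ;;
    *)                     echo "unknown"; return 1 ;;
  esac
}
```

Usage: `retriever pipeline run "$f" --input-type "$(ext_to_input_type "${f##*.}")"`.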
+
+## Parity notes
+
+- The old script required a running ingest service at `localhost:7670`. The
+ `retriever` run is self-contained — no service needed. This is usually an
+ improvement for CI smoke tests.
+- PASS / FAIL semantics and the per-format log layout are preserved.
+- `--run-mode inprocess` is recommended for smoke tests: it skips Ray startup
+ and gives a cleaner failure mode.
+- Each format gets its own `--lancedb-uri` so parallel runs do not contend on
+ the same LanceDB directory.
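
The per-format isolation in the last note boils down to deriving a unique
LanceDB directory from the format name. A minimal sketch, assuming a helper
name and path layout of our own choosing; only the isolation requirement comes
from the note above:

```shell
# Build a per-format LanceDB URI under a base output directory, so that
# parallel `retriever pipeline run` invocations never share a store.
# Illustrative helper, not part of the retriever CLI.
lancedb_uri_for() {
  base=${1%/}   # drop a trailing slash from the base dir, if any
  printf '%s/lancedb_%s\n' "$base" "$2"
}
```

e.g. `retriever pipeline run "$f" --lancedb-uri "$(lancedb_uri_for "$OUTPUT_DIR" pdf)"`.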
diff --git a/nemo_retriever/src/nemo_retriever/adapters/cli/main.py b/nemo_retriever/src/nemo_retriever/adapters/cli/main.py
index 1d2ad5bfc..4603980d3 100644
--- a/nemo_retriever/src/nemo_retriever/adapters/cli/main.py
+++ b/nemo_retriever/src/nemo_retriever/adapters/cli/main.py
@@ -17,6 +17,7 @@
from nemo_retriever.local import app as local_app
from nemo_retriever.online import __main__ as online_main
from nemo_retriever.pdf import app as pdf_app
+from nemo_retriever.pipeline import __main__ as pipeline_main
from nemo_retriever.recall import app as recall_app
from nemo_retriever.txt import __main__ as txt_main
from nemo_retriever.vector_store import app as vector_store_app
@@ -37,6 +38,7 @@
app.add_typer(txt_main.app, name="txt")
app.add_typer(html_main.app, name="html")
app.add_typer(online_main.app, name="online")
+app.add_typer(pipeline_main.app, name="pipeline")
def _version_callback(value: bool) -> None:
diff --git a/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py b/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
index 3e1091778..30bd29b53 100644
--- a/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
+++ b/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
@@ -2,767 +2,27 @@
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
-"""Graph-based ingestion pipeline using the operator graph API directly.
+"""Backward-compat shim for the graph ingestion pipeline.
-This script uses :class:`~nemo_retriever.graph_ingestor.GraphIngestor` which
-builds an operator :class:`~nemo_retriever.graph.Graph` via
-:func:`~nemo_retriever.graph.ingestor_runtime.build_graph` (Ray Data)
-or :func:`~nemo_retriever.graph.ingestor_runtime.build_inprocess_graph`
-(single-process pandas) and then calls the appropriate executor.
+The implementation was moved to :mod:`nemo_retriever.pipeline` and is exposed
+as the ``retriever pipeline run`` CLI subcommand.
-Run with::
+This module re-exports the same Typer :data:`app` and keeps the
+``python -m nemo_retriever.examples.graph_pipeline`` entry point
+working so existing callers (notably
+:mod:`nemo_retriever.harness.run`) do not need to change.
- source /opt/retriever_runtime/bin/activate
- python -m nemo_retriever.examples.graph_pipeline [OPTIONS]
+New code should invoke the pipeline via one of the following:
-Examples::
-
- # Batch mode (Ray) with PDF extraction + embedding
- python -m nemo_retriever.examples.graph_pipeline /data/pdfs \\
- --run-mode batch \\
- --embed-invoke-url http://localhost:8000/v1
-
- # In-process mode (no Ray) for quick local testing
- python -m nemo_retriever.examples.graph_pipeline /data/pdfs \\
- --run-mode inprocess \\
- --ocr-invoke-url http://localhost:9000/v1
-
- # Save extraction Parquet for full-page markdown (e.g. page index + export)
- python -m nemo_retriever.examples.graph_pipeline /data/pdfs \\
- --lancedb-uri lancedb \\
- --save-intermediate /path/to/extracted_parquet_dir
+* ``retriever pipeline run [OPTIONS]``
+* ``python -m nemo_retriever.pipeline [OPTIONS]``
+* ``from nemo_retriever.pipeline import app`` (Typer app) or
+ ``from nemo_retriever.pipeline import run`` (command callable)
"""
from __future__ import annotations
-import json
-import logging
-import os
-import time
-from pathlib import Path
-from typing import Optional, TextIO
-
-import typer
-
-from nemo_retriever.audio import asr_params_from_env
-from nemo_retriever.graph_ingestor import GraphIngestor
-from nemo_retriever.params import AudioChunkParams
-from nemo_retriever.params import CaptionParams
-from nemo_retriever.params import DedupParams
-from nemo_retriever.params import EmbedParams
-from nemo_retriever.params import ExtractParams
-from nemo_retriever.params import StoreParams
-from nemo_retriever.params import TextChunkParams
-from nemo_retriever.model import VL_EMBED_MODEL, VL_RERANK_MODEL
-from nemo_retriever.params.models import BatchTuningParams
-from nemo_retriever.utils.input_files import resolve_input_patterns
-from nemo_retriever.utils.remote_auth import resolve_remote_api_key
-from nemo_retriever.vector_store.lancedb_store import handle_lancedb
-
-logger = logging.getLogger(__name__)
-app = typer.Typer()
-
-LANCEDB_URI = "lancedb"
-LANCEDB_TABLE = "nv-ingest"
-
-
-# ---------------------------------------------------------------------------
-# Logging helpers
-# ---------------------------------------------------------------------------
-
-
-class _TeeStream:
- def __init__(self, primary: TextIO, mirror: TextIO) -> None:
- self._primary = primary
- self._mirror = mirror
-
- def write(self, data: str) -> int:
- self._primary.write(data)
- self._mirror.write(data)
- return len(data)
-
- def flush(self) -> None:
- self._primary.flush()
- self._mirror.flush()
-
- def isatty(self) -> bool:
- return bool(getattr(self._primary, "isatty", lambda: False)())
-
- def fileno(self) -> int:
- return int(getattr(self._primary, "fileno")())
-
- def writable(self) -> bool:
- return bool(getattr(self._primary, "writable", lambda: True)())
-
- @property
- def encoding(self) -> str:
- return str(getattr(self._primary, "encoding", "utf-8"))
-
-
-def _configure_logging(log_file: Optional[Path], *, debug: bool = False) -> tuple[Optional[TextIO], TextIO, TextIO]:
- original_stdout = os.sys.stdout
- original_stderr = os.sys.stderr
- log_level = logging.DEBUG if debug else logging.INFO
- if log_file is None:
- logging.basicConfig(
- level=log_level,
- format="%(asctime)s %(levelname)s %(name)s: %(message)s",
- force=True,
- )
- return None, original_stdout, original_stderr
-
- target = Path(log_file).expanduser().resolve()
- target.parent.mkdir(parents=True, exist_ok=True)
- fh = open(target, "a", encoding="utf-8", buffering=1)
- os.sys.stdout = _TeeStream(os.sys.__stdout__, fh)
- os.sys.stderr = _TeeStream(os.sys.__stderr__, fh)
- logging.basicConfig(
- level=log_level,
- format="%(asctime)s %(levelname)s %(name)s: %(message)s",
- handlers=[logging.StreamHandler(os.sys.stdout)],
- force=True,
- )
- logger.info("Writing combined pipeline logs to %s", str(target))
- return fh, original_stdout, original_stderr
-
-
-def _ensure_lancedb_table(uri: str, table_name: str) -> None:
- from nemo_retriever.vector_store.lancedb_utils import lancedb_schema
- import lancedb
- import pyarrow as pa
-
- Path(uri).mkdir(parents=True, exist_ok=True)
- db = lancedb.connect(uri)
- try:
- db.open_table(table_name)
- return
- except Exception:
- pass
- schema = lancedb_schema()
- empty = pa.table({f.name: [] for f in schema}, schema=schema)
- db.create_table(table_name, data=empty, schema=schema, mode="create")
-
-
-def _write_runtime_summary(
- runtime_metrics_dir: Optional[Path],
- runtime_metrics_prefix: Optional[str],
- payload: dict[str, object],
-) -> None:
- if runtime_metrics_dir is None and not runtime_metrics_prefix:
- return
-
- target_dir = Path(runtime_metrics_dir or Path.cwd()).expanduser().resolve()
- target_dir.mkdir(parents=True, exist_ok=True)
- prefix = (runtime_metrics_prefix or "run").strip() or "run"
- target = target_dir / f"{prefix}.runtime.summary.json"
- target.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
-
-
-def _count_input_units(result_df) -> int:
- if "source_id" in result_df.columns:
- return int(result_df["source_id"].nunique())
- if "source_path" in result_df.columns:
- return int(result_df["source_path"].nunique())
- return int(len(result_df.index))
-
-
-def _resolve_file_patterns(input_path: Path, input_type: str) -> list[str]:
- import glob as _glob
-
- input_path = Path(input_path)
- if input_path.is_file():
- return [str(input_path)]
- if not input_path.is_dir():
- raise typer.BadParameter(f"Path does not exist: {input_path}")
-
- if input_type not in {"pdf", "doc", "txt", "html", "image", "audio"}:
- raise typer.BadParameter(f"Unsupported --input-type: {input_type!r}")
-
- patterns = resolve_input_patterns(input_path, input_type)
- matched = [p for p in patterns if _glob.glob(p, recursive=True)]
- if not matched:
- raise typer.BadParameter(f"No files found for input_type={input_type!r} in {input_path}")
- return matched
-
-
-# ---------------------------------------------------------------------------
-# CLI command
-# ---------------------------------------------------------------------------
-
-
-@app.command()
-def main(
- ctx: typer.Context,
- input_path: Path = typer.Argument(
- ...,
- help="File or directory of documents to ingest.",
- path_type=Path,
- ),
- run_mode: str = typer.Option(
- "batch",
- "--run-mode",
- help="Execution mode: 'batch' (Ray Data) or 'inprocess' (pandas, no Ray).",
- ),
- debug: bool = typer.Option(False, "--debug/--no-debug", help="Enable debug-level logging."),
- dpi: int = typer.Option(300, "--dpi", min=72, help="Render DPI for PDF page images."),
- input_type: str = typer.Option(
- "pdf", "--input-type", help="Input type: 'pdf', 'doc', 'txt', 'html', 'image', or 'audio'."
- ),
- method: str = typer.Option("pdfium", "--method", help="PDF text extraction method."),
- extract_text: bool = typer.Option(True, "--extract-text/--no-extract-text"),
- extract_tables: bool = typer.Option(True, "--extract-tables/--no-extract-tables"),
- extract_charts: bool = typer.Option(True, "--extract-charts/--no-extract-charts"),
- extract_infographics: bool = typer.Option(False, "--extract-infographics/--no-extract-infographics"),
- extract_page_as_image: bool = typer.Option(True, "--extract-page-as-image/--no-extract-page-as-image"),
- use_graphic_elements: bool = typer.Option(False, "--use-graphic-elements"),
- use_table_structure: bool = typer.Option(False, "--use-table-structure"),
- table_output_format: Optional[str] = typer.Option(None, "--table-output-format"),
- # Remote endpoints
- api_key: Optional[str] = typer.Option(None, "--api-key", help="Bearer token for remote NIM endpoints."),
- page_elements_invoke_url: Optional[str] = typer.Option(None, "--page-elements-invoke-url"),
- ocr_invoke_url: Optional[str] = typer.Option(None, "--ocr-invoke-url"),
- graphic_elements_invoke_url: Optional[str] = typer.Option(None, "--graphic-elements-invoke-url"),
- table_structure_invoke_url: Optional[str] = typer.Option(None, "--table-structure-invoke-url"),
- embed_invoke_url: Optional[str] = typer.Option(None, "--embed-invoke-url"),
- # Embedding
- embed_model_name: str = typer.Option(VL_EMBED_MODEL, "--embed-model-name"),
- embed_modality: str = typer.Option("text", "--embed-modality"),
- embed_granularity: str = typer.Option("element", "--embed-granularity"),
- text_elements_modality: Optional[str] = typer.Option(None, "--text-elements-modality"),
- structured_elements_modality: Optional[str] = typer.Option(None, "--structured-elements-modality"),
- # Dedup / caption
- dedup: Optional[bool] = typer.Option(None, "--dedup/--no-dedup"),
- dedup_iou_threshold: float = typer.Option(0.45, "--dedup-iou-threshold"),
- caption: bool = typer.Option(False, "--caption/--no-caption"),
- caption_invoke_url: Optional[str] = typer.Option(None, "--caption-invoke-url"),
- caption_model_name: str = typer.Option("nvidia/NVIDIA-Nemotron-Nano-12B-v2-VL-BF16", "--caption-model-name"),
- caption_device: Optional[str] = typer.Option(None, "--caption-device"),
- caption_context_text_max_chars: int = typer.Option(0, "--caption-context-text-max-chars"),
- caption_gpu_memory_utilization: float = typer.Option(0.5, "--caption-gpu-memory-utilization"),
- caption_gpus_per_actor: Optional[float] = typer.Option(None, "--caption-gpus-per-actor", max=1.0),
- caption_temperature: float = typer.Option(1.0, "--caption-temperature"),
- caption_top_p: Optional[float] = typer.Option(None, "--caption-top-p"),
- caption_max_tokens: int = typer.Option(1024, "--caption-max-tokens"),
- # Text chunking
- store_images_uri: Optional[str] = typer.Option(
- None, "--store-images-uri", help="Store extracted images to this URI."
- ),
- store_text: bool = typer.Option(False, "--store-text/--no-store-text", help="Also store extracted text."),
- strip_base64: bool = typer.Option(True, "--strip-base64/--no-strip-base64", help="Strip base64 after storing."),
- text_chunk: bool = typer.Option(False, "--text-chunk"),
- text_chunk_max_tokens: Optional[int] = typer.Option(None, "--text-chunk-max-tokens"),
- text_chunk_overlap_tokens: Optional[int] = typer.Option(None, "--text-chunk-overlap-tokens"),
- # Ray / batch tuning
- # NOTE: *_gpus_per_actor defaults are None (not 0.0) so we can distinguish
- # "not set → use heuristic" from "explicitly 0 → no GPU". Other tuning
- # defaults use 0/0.0 because those values are never valid explicit choices.
- ray_address: Optional[str] = typer.Option(None, "--ray-address"),
- ray_log_to_driver: bool = typer.Option(True, "--ray-log-to-driver/--no-ray-log-to-driver"),
- ocr_actors: Optional[int] = typer.Option(0, "--ocr-actors"),
- ocr_batch_size: Optional[int] = typer.Option(0, "--ocr-batch-size"),
- ocr_cpus_per_actor: Optional[float] = typer.Option(0.0, "--ocr-cpus-per-actor"),
- ocr_gpus_per_actor: Optional[float] = typer.Option(None, "--ocr-gpus-per-actor", max=1.0),
- page_elements_actors: Optional[int] = typer.Option(0, "--page-elements-actors"),
- page_elements_batch_size: Optional[int] = typer.Option(0, "--page-elements-batch-size"),
- page_elements_cpus_per_actor: Optional[float] = typer.Option(0.0, "--page-elements-cpus-per-actor"),
- page_elements_gpus_per_actor: Optional[float] = typer.Option(None, "--page-elements-gpus-per-actor", max=1.0),
- embed_actors: Optional[int] = typer.Option(0, "--embed-actors"),
- embed_batch_size: Optional[int] = typer.Option(0, "--embed-batch-size"),
- embed_cpus_per_actor: Optional[float] = typer.Option(0.0, "--embed-cpus-per-actor"),
- embed_gpus_per_actor: Optional[float] = typer.Option(None, "--embed-gpus-per-actor", max=1.0),
- pdf_split_batch_size: int = typer.Option(1, "--pdf-split-batch-size", min=1),
- pdf_extract_batch_size: Optional[int] = typer.Option(0, "--pdf-extract-batch-size"),
- pdf_extract_tasks: Optional[int] = typer.Option(0, "--pdf-extract-tasks"),
- pdf_extract_cpus_per_task: Optional[float] = typer.Option(0.0, "--pdf-extract-cpus-per-task"),
- nemotron_parse_actors: Optional[int] = typer.Option(0, "--nemotron-parse-actors"),
- nemotron_parse_gpus_per_actor: Optional[float] = typer.Option(
- None, "--nemotron-parse-gpus-per-actor", min=0.0, max=1.0
- ),
- nemotron_parse_batch_size: Optional[int] = typer.Option(0, "--nemotron-parse-batch-size"),
- # LanceDB / evaluation
- lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri"),
- save_intermediate: Optional[Path] = typer.Option(
- None,
- "--save-intermediate",
- help="Directory to write extraction results as Parquet (for full-page markdown / page index).",
- path_type=Path,
- file_okay=False,
- dir_okay=True,
- ),
- hybrid: bool = typer.Option(False, "--hybrid/--no-hybrid"),
- query_csv: Path = typer.Option("./data/bo767_query_gt.csv", "--query-csv", path_type=Path),
- recall_match_mode: str = typer.Option("pdf_page", "--recall-match-mode"),
- audio_match_tolerance_secs: float = typer.Option(2.0, "--audio-match-tolerance-secs", min=0.0),
- segment_audio: bool = typer.Option(False, "--segment-audio/--no-segment-audio"),
- audio_split_type: str = typer.Option("size", "--audio-split-type"),
- audio_split_interval: int = typer.Option(500000, "--audio-split-interval", min=1),
- evaluation_mode: str = typer.Option("recall", "--evaluation-mode"),
- reranker: Optional[bool] = typer.Option(False, "--reranker/--no-reranker"),
- reranker_model_name: str = typer.Option(VL_RERANK_MODEL, "--reranker-model-name"),
- beir_loader: Optional[str] = typer.Option(None, "--beir-loader"),
- beir_dataset_name: Optional[str] = typer.Option(None, "--beir-dataset-name"),
- beir_split: str = typer.Option("test", "--beir-split"),
- beir_query_language: Optional[str] = typer.Option(None, "--beir-query-language"),
- beir_doc_id_field: str = typer.Option("pdf_basename", "--beir-doc-id-field"),
- beir_k: list[int] = typer.Option([], "--beir-k"),
- recall_details: bool = typer.Option(True, "--recall-details/--no-recall-details"),
- runtime_metrics_dir: Optional[Path] = typer.Option(None, "--runtime-metrics-dir", path_type=Path),
- runtime_metrics_prefix: Optional[str] = typer.Option(None, "--runtime-metrics-prefix"),
- detection_summary_file: Optional[Path] = typer.Option(None, "--detection-summary-file", path_type=Path),
- log_file: Optional[Path] = typer.Option(None, "--log-file", path_type=Path, dir_okay=False),
-) -> None:
- _ = ctx
- log_handle, original_stdout, original_stderr = _configure_logging(log_file, debug=bool(debug))
- try:
- if run_mode not in {"batch", "inprocess"}:
- raise ValueError(f"Unsupported --run-mode: {run_mode!r}")
- if recall_match_mode not in {"pdf_page", "pdf_only", "audio_segment"}:
- raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode!r}")
- if audio_split_type not in {"size", "time", "frame"}:
- raise ValueError(f"Unsupported --audio-split-type: {audio_split_type!r}")
- if evaluation_mode not in {"recall", "beir"}:
- raise ValueError(f"Unsupported --evaluation-mode: {evaluation_mode!r}")
-
- if run_mode == "batch":
- os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0"
-
- lancedb_uri = str(Path(lancedb_uri).expanduser().resolve())
- _ensure_lancedb_table(lancedb_uri, LANCEDB_TABLE)
-
- remote_api_key = resolve_remote_api_key(api_key)
- extract_remote_api_key = remote_api_key
- embed_remote_api_key = remote_api_key
- caption_remote_api_key = remote_api_key
-
- # Warn if remote URLs configured without an API key
- if (
- any(
- (
- page_elements_invoke_url,
- ocr_invoke_url,
- graphic_elements_invoke_url,
- table_structure_invoke_url,
- embed_invoke_url,
- )
- )
- and remote_api_key is None
- ):
- logger.warning("Remote endpoint URL(s) were configured without an API key.")
-
- # Zero out GPU fractions when a remote URL replaces the local model
- if page_elements_invoke_url and float(page_elements_gpus_per_actor or 0.0) != 0.0:
- logger.warning("Forcing page-elements GPUs to 0.0 because --page-elements-invoke-url is set.")
- page_elements_gpus_per_actor = 0.0
- if ocr_invoke_url and float(ocr_gpus_per_actor or 0.0) != 0.0:
- logger.warning("Forcing OCR GPUs to 0.0 because --ocr-invoke-url is set.")
- ocr_gpus_per_actor = 0.0
- if embed_invoke_url and float(embed_gpus_per_actor or 0.0) != 0.0:
- logger.warning("Forcing embed GPUs to 0.0 because --embed-invoke-url is set.")
- embed_gpus_per_actor = 0.0
-
- file_patterns = _resolve_file_patterns(Path(input_path), input_type)
-
- # ------------------------------------------------------------------
- # Build extraction params
- # ------------------------------------------------------------------
- extract_batch_tuning = BatchTuningParams(
- **{
- k: v
- for k, v in {
- "pdf_split_batch_size": pdf_split_batch_size,
- "pdf_extract_batch_size": pdf_extract_batch_size or None,
- "pdf_extract_workers": pdf_extract_tasks or None,
- "pdf_extract_num_cpus": pdf_extract_cpus_per_task or None,
- "page_elements_batch_size": page_elements_batch_size or None,
- "page_elements_workers": page_elements_actors or None,
- "page_elements_cpus_per_actor": page_elements_cpus_per_actor or None,
- "gpu_page_elements": (
- 0.0
- if page_elements_invoke_url
- else (page_elements_gpus_per_actor if page_elements_gpus_per_actor is not None else None)
- ),
- "ocr_inference_batch_size": ocr_batch_size or None,
- "ocr_workers": ocr_actors or None,
- "ocr_cpus_per_actor": ocr_cpus_per_actor or None,
- "gpu_ocr": (
- 0.0 if ocr_invoke_url else (ocr_gpus_per_actor if ocr_gpus_per_actor is not None else None)
- ),
- "nemotron_parse_batch_size": nemotron_parse_batch_size or None,
- "nemotron_parse_workers": nemotron_parse_actors or None,
- "gpu_nemotron_parse": (
- nemotron_parse_gpus_per_actor if nemotron_parse_gpus_per_actor is not None else None
- ),
- }.items()
- if v is not None
- }
- )
- extract_params = ExtractParams(
- **{
- k: v
- for k, v in {
- "method": method,
- "dpi": int(dpi),
- "extract_text": extract_text,
- "extract_tables": extract_tables,
- "extract_charts": extract_charts,
- "extract_infographics": extract_infographics,
- "extract_page_as_image": extract_page_as_image,
- "api_key": extract_remote_api_key,
- "page_elements_invoke_url": page_elements_invoke_url,
- "ocr_invoke_url": ocr_invoke_url,
- "graphic_elements_invoke_url": graphic_elements_invoke_url,
- "table_structure_invoke_url": table_structure_invoke_url,
- "use_graphic_elements": use_graphic_elements,
- "use_table_structure": use_table_structure,
- "table_output_format": table_output_format,
- "inference_batch_size": page_elements_batch_size or None,
- "batch_tuning": extract_batch_tuning,
- }.items()
- if v is not None
- }
- )
-
- # ------------------------------------------------------------------
- # Build embedding params
- # ------------------------------------------------------------------
- embed_batch_tuning = BatchTuningParams(
- **{
- k: v
- for k, v in {
- "embed_batch_size": embed_batch_size or None,
- "embed_workers": embed_actors or None,
- "embed_cpus_per_actor": embed_cpus_per_actor or None,
- "gpu_embed": (
- 0.0
- if embed_invoke_url
- else (embed_gpus_per_actor if embed_gpus_per_actor is not None else None)
- ),
- }.items()
- if v is not None
- }
- )
- embed_params = EmbedParams(
- **{
- k: v
- for k, v in {
- "model_name": embed_model_name,
- "embed_invoke_url": embed_invoke_url,
- "api_key": embed_remote_api_key,
- "embed_modality": embed_modality,
- "text_elements_modality": text_elements_modality,
- "structured_elements_modality": structured_elements_modality,
- "embed_granularity": embed_granularity,
- "batch_tuning": embed_batch_tuning,
- "inference_batch_size": embed_batch_size or None,
- }.items()
- if v is not None
- }
- )
- text_chunk_params = TextChunkParams(
- max_tokens=text_chunk_max_tokens or 1024,
- overlap_tokens=text_chunk_overlap_tokens if text_chunk_overlap_tokens is not None else 150,
- )
-
- # ------------------------------------------------------------------
- # Build GraphIngestor and configure pipeline stages
- # ------------------------------------------------------------------
- logger.info("Building graph pipeline (run_mode=%s) for %s ...", run_mode, input_path)
-
- node_overrides = {}
- if caption_gpus_per_actor is not None:
- node_overrides["CaptionActor"] = {"num_gpus": caption_gpus_per_actor}
-
- ingestor = GraphIngestor(run_mode=run_mode, ray_address=ray_address, node_overrides=node_overrides or None)
- ingestor = ingestor.files(file_patterns)
-
- # Extraction stage
- if input_type == "txt":
- ingestor = ingestor.extract_txt(text_chunk_params)
- elif input_type == "html":
- ingestor = ingestor.extract_html(text_chunk_params)
- elif input_type == "image":
- ingestor = ingestor.extract_image_files(extract_params)
- elif input_type == "audio":
- asr_params = asr_params_from_env().model_copy(update={"segment_audio": bool(segment_audio)})
- ingestor = ingestor.extract_audio(
- params=AudioChunkParams(split_type=audio_split_type, split_interval=int(audio_split_interval)),
- asr_params=asr_params,
- )
- else:
- # "pdf" or "doc"
- ingestor = ingestor.extract(extract_params)
-
- # Optional post-extraction stages
- enable_text_chunk = text_chunk or text_chunk_max_tokens is not None or text_chunk_overlap_tokens is not None
- if enable_text_chunk:
- ingestor = ingestor.split(text_chunk_params)
-
- enable_caption = caption or caption_invoke_url is not None
- enable_dedup = dedup if dedup is not None else enable_caption
- if enable_dedup:
- ingestor = ingestor.dedup(DedupParams(iou_threshold=dedup_iou_threshold))
-
- if enable_caption:
- ingestor = ingestor.caption(
- CaptionParams(
- endpoint_url=caption_invoke_url,
- api_key=caption_remote_api_key,
- model_name=caption_model_name,
- device=caption_device,
- context_text_max_chars=caption_context_text_max_chars,
- gpu_memory_utilization=caption_gpu_memory_utilization,
- temperature=caption_temperature,
- top_p=caption_top_p,
- max_tokens=caption_max_tokens,
- )
- )
-
- if store_images_uri is not None:
- ingestor = ingestor.store(
- StoreParams(
- storage_uri=store_images_uri,
- store_text=store_text,
- strip_base64=strip_base64,
- )
- )
-
- ingestor = ingestor.embed(embed_params)
-
- # ------------------------------------------------------------------
- # Execute the graph via the executor
- # ------------------------------------------------------------------
- logger.info("Starting ingestion of %s ...", input_path)
- ingest_start = time.perf_counter()
-
- # GraphIngestor.ingest() builds the Graph, creates the executor,
- # and calls executor.ingest(file_patterns) returning:
- # batch mode -> materialized ray.data.Dataset
- # inprocess mode -> pandas.DataFrame
- result = ingestor.ingest()
-
- ingestion_only_total_time = time.perf_counter() - ingest_start
-
- # ------------------------------------------------------------------
- # Collect results
- # ------------------------------------------------------------------
- if run_mode == "batch":
- import ray
-
- ray_download_start = time.perf_counter()
- ingest_local_results = result.take_all()
- ray_download_time = time.perf_counter() - ray_download_start
-
- import pandas as pd
-
- result_df = pd.DataFrame(ingest_local_results)
- num_rows = _count_input_units(result_df)
- else:
- import pandas as pd
-
- result_df = result
- ingest_local_results = result_df.to_dict("records")
- ray_download_time = 0.0
- num_rows = _count_input_units(result_df)
-
- if save_intermediate is not None:
- out_dir = Path(save_intermediate).expanduser().resolve()
- out_dir.mkdir(parents=True, exist_ok=True)
- out_path = out_dir / "extraction.parquet"
- result_df.to_parquet(out_path, index=False)
- logger.info("Wrote extraction Parquet for intermediate use: %s", out_path)
-
- if detection_summary_file is not None:
- from nemo_retriever.utils.detection_summary import (
- collect_detection_summary_from_df,
- write_detection_summary,
- )
-
- write_detection_summary(
- Path(detection_summary_file),
- collect_detection_summary_from_df(result_df),
- )
-
- # ------------------------------------------------------------------
- # Write to LanceDB
- # ------------------------------------------------------------------
- lancedb_write_start = time.perf_counter()
- handle_lancedb(ingest_local_results, lancedb_uri, LANCEDB_TABLE, hybrid=hybrid, mode="overwrite")
- lancedb_write_time = time.perf_counter() - lancedb_write_start
-
- # ------------------------------------------------------------------
- # Recall / BEIR evaluation
- # ------------------------------------------------------------------
- import lancedb as _lancedb_mod
-
- db = _lancedb_mod.connect(lancedb_uri)
- table = db.open_table(LANCEDB_TABLE)
-
- if int(table.count_rows()) == 0:
- logger.warning("LanceDB table is empty; skipping %s evaluation.", evaluation_mode)
- _write_runtime_summary(
- runtime_metrics_dir,
- runtime_metrics_prefix,
- {
- "run_mode": run_mode,
- "input_path": str(Path(input_path).resolve()),
- "input_pages": int(num_rows),
- "num_pages": int(num_rows),
- "num_rows": int(len(result_df.index)),
- "ingestion_only_secs": float(ingestion_only_total_time),
- "ray_download_secs": float(ray_download_time),
- "lancedb_write_secs": float(lancedb_write_time),
- "evaluation_secs": 0.0,
- "total_secs": float(time.perf_counter() - ingest_start),
- "evaluation_mode": evaluation_mode,
- "evaluation_metrics": {},
- "recall_details": bool(recall_details),
- "lancedb_uri": str(lancedb_uri),
- "lancedb_table": str(LANCEDB_TABLE),
- },
- )
- if run_mode == "batch":
- ray.shutdown()
- return
-
- from nemo_retriever.model import resolve_embed_model
- from nemo_retriever.utils.detection_summary import print_run_summary
-
- _recall_model = resolve_embed_model(str(embed_model_name))
- evaluation_label = "Recall"
- evaluation_total_time = 0.0
- evaluation_metrics: dict[str, float] = {}
- evaluation_query_count: Optional[int] = None
-
- if evaluation_mode == "beir":
- if not beir_loader:
- raise ValueError("--beir-loader is required when --evaluation-mode=beir")
- if not beir_dataset_name:
- raise ValueError("--beir-dataset-name is required when --evaluation-mode=beir")
-
- from nemo_retriever.recall.beir import BeirConfig, evaluate_lancedb_beir
-
- cfg = BeirConfig(
- lancedb_uri=str(lancedb_uri),
- lancedb_table=str(LANCEDB_TABLE),
- embedding_model=_recall_model,
- loader=str(beir_loader),
- dataset_name=str(beir_dataset_name),
- split=str(beir_split),
- query_language=beir_query_language,
- doc_id_field=str(beir_doc_id_field),
- ks=tuple(beir_k) if beir_k else (1, 3, 5, 10),
- embedding_http_endpoint=embed_invoke_url,
- embedding_api_key=embed_remote_api_key or "",
- hybrid=hybrid,
- reranker=bool(reranker),
- reranker_model_name=str(reranker_model_name),
- )
- evaluation_start = time.perf_counter()
- beir_dataset, _raw_hits, _run, evaluation_metrics = evaluate_lancedb_beir(cfg)
- evaluation_total_time = time.perf_counter() - evaluation_start
- evaluation_label = "BEIR"
- evaluation_query_count = len(beir_dataset.query_ids)
- else:
- query_csv_path = Path(query_csv)
- if not query_csv_path.exists():
- logger.warning("Query CSV not found at %s; skipping recall evaluation.", query_csv_path)
- _write_runtime_summary(
- runtime_metrics_dir,
- runtime_metrics_prefix,
- {
- "run_mode": run_mode,
- "input_path": str(Path(input_path).resolve()),
- "input_pages": int(num_rows),
- "num_pages": int(num_rows),
- "num_rows": int(len(result_df.index)),
- "ingestion_only_secs": float(ingestion_only_total_time),
- "ray_download_secs": float(ray_download_time),
- "lancedb_write_secs": float(lancedb_write_time),
- "evaluation_secs": 0.0,
- "total_secs": float(time.perf_counter() - ingest_start),
- "evaluation_mode": evaluation_mode,
- "evaluation_metrics": {},
- "recall_details": bool(recall_details),
- "lancedb_uri": str(lancedb_uri),
- "lancedb_table": str(LANCEDB_TABLE),
- },
- )
- if run_mode == "batch":
- ray.shutdown()
- return
-
- from nemo_retriever.recall.core import RecallConfig, retrieve_and_score
-
- recall_cfg = RecallConfig(
- lancedb_uri=str(lancedb_uri),
- lancedb_table=str(LANCEDB_TABLE),
- embedding_model=_recall_model,
- embedding_http_endpoint=embed_invoke_url,
- embedding_api_key=embed_remote_api_key or "",
- top_k=10,
- ks=(1, 5, 10),
- hybrid=hybrid,
- match_mode=recall_match_mode,
- audio_match_tolerance_secs=float(audio_match_tolerance_secs),
- reranker=reranker_model_name if reranker else None,
- embed_modality=embed_modality,
- )
- evaluation_start = time.perf_counter()
- _df_query, _gold, _raw_hits, _retrieved_keys, evaluation_metrics = retrieve_and_score(
- query_csv=query_csv_path, cfg=recall_cfg
- )
- evaluation_total_time = time.perf_counter() - evaluation_start
- evaluation_query_count = len(_df_query.index)
-
- total_time = time.perf_counter() - ingest_start
-
- _write_runtime_summary(
- runtime_metrics_dir,
- runtime_metrics_prefix,
- {
- "run_mode": run_mode,
- "input_path": str(Path(input_path).resolve()),
- "input_pages": int(num_rows),
- "num_pages": int(num_rows),
- "num_rows": int(len(result_df.index)),
- "ingestion_only_secs": float(ingestion_only_total_time),
- "ray_download_secs": float(ray_download_time),
- "lancedb_write_secs": float(lancedb_write_time),
- "evaluation_secs": float(evaluation_total_time),
- "total_secs": float(total_time),
- "evaluation_mode": evaluation_mode,
- "evaluation_metrics": dict(evaluation_metrics),
- "evaluation_count": evaluation_query_count,
- "recall_details": bool(recall_details),
- "lancedb_uri": str(lancedb_uri),
- "lancedb_table": str(LANCEDB_TABLE),
- },
- )
-
- if run_mode == "batch":
- ray.shutdown()
-
- print_run_summary(
- num_rows,
- Path(input_path),
- hybrid,
- lancedb_uri,
- LANCEDB_TABLE,
- total_time,
- ingestion_only_total_time,
- ray_download_time,
- lancedb_write_time,
- evaluation_total_time,
- evaluation_metrics,
- evaluation_label=evaluation_label,
- evaluation_count=evaluation_query_count,
- )
- finally:
- os.sys.stdout = original_stdout
- os.sys.stderr = original_stderr
- if log_handle is not None:
- log_handle.close()
-
+from nemo_retriever.pipeline.__main__ import app
if __name__ == "__main__":
app()
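
The replacement `__main__` module added below tees stdout/stderr into an optional log file via a small `_TeeStream` wrapper. A minimal stdlib sketch of that pattern, using `io.StringIO` as a stand-in for the log file (names here are illustrative, not the real module):

```python
# Sketch of the stdout-tee pattern: every write goes to the real stream
# and to a mirror (an io.StringIO standing in for the log file handle).
import io
import sys


class TeeStream:
    def __init__(self, primary, mirror):
        self._primary = primary
        self._mirror = mirror

    def write(self, data: str) -> int:
        self._primary.write(data)
        self._mirror.write(data)
        return len(data)

    def flush(self) -> None:
        self._primary.flush()
        self._mirror.flush()


log = io.StringIO()
original = sys.stdout
sys.stdout = TeeStream(original, log)
try:
    print("hello from the pipeline")
finally:
    sys.stdout = original  # always restore, mirroring the CLI's finally block

assert "hello from the pipeline" in log.getvalue()
```

The restore-in-`finally` step matters: leaving a dangling tee after an exception would keep mirroring every later write into a closed or stale file handle.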
diff --git a/nemo_retriever/src/nemo_retriever/pipeline/__init__.py b/nemo_retriever/src/nemo_retriever/pipeline/__init__.py
new file mode 100644
index 000000000..ba321a0ad
--- /dev/null
+++ b/nemo_retriever/src/nemo_retriever/pipeline/__init__.py
@@ -0,0 +1,42 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
+# All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""End-to-end ingestion pipeline subcommand for the ``retriever`` CLI.
+
+This package wraps :class:`~nemo_retriever.graph_ingestor.GraphIngestor` with a
+Typer application that exposes every knob needed to run a full PDF / doc /
+txt / html / image / audio ingestion job, write results to LanceDB, and
+optionally run a recall or BEIR evaluation.
+
+It is registered on the ``retriever`` CLI as the ``pipeline`` subcommand::
+
+ retriever pipeline run [OPTIONS]
+
+The implementation historically lived in
+``nemo_retriever/examples/graph_pipeline.py``; that module is now a thin
+backward-compat shim that re-exports the same Typer app from
+:mod:`nemo_retriever.pipeline.__main__`.
+
+``app`` and ``run`` are exposed via lazy attribute access so that
+``python -m nemo_retriever.pipeline`` can import the ``__main__`` module
+cleanly, without triggering a "found in sys.modules" re-import warning.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+__all__ = ["app", "run"]
+
+
+if TYPE_CHECKING:
+ from .__main__ import app, run # noqa: F401
+
+
+def __getattr__(name: str) -> Any:
+ if name in {"app", "run"}:
+ from . import __main__ as _main
+
+ return getattr(_main, name)
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
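
The `__getattr__` hook above is the PEP 562 module-level lazy-attribute pattern. A self-contained sketch of how it behaves, built with an in-memory module (all names below are illustrative, not the real package):

```python
# Sketch of the PEP 562 lazy module attribute pattern used by
# pipeline/__init__.py, demonstrated on a synthetic in-memory module.
import sys
import types

lazy_mod = types.ModuleType("demo_pipeline")
lazy_mod.__all__ = ["app"]


def _lazy_getattr(name: str):
    # Stands in for deferring `from .__main__ import app` to first access.
    if name == "app":
        return "the-typer-app"
    raise AttributeError(f"module 'demo_pipeline' has no attribute {name!r}")


lazy_mod.__getattr__ = _lazy_getattr  # PEP 562 hook lives in the module dict
sys.modules["demo_pipeline"] = lazy_mod

import demo_pipeline

assert demo_pipeline.app == "the-typer-app"  # resolved lazily on first access
```

Because normal attribute lookup on the module fails before the hook fires, eagerly-set names keep working unchanged, and only genuinely missing names pay the deferred-import cost.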
diff --git a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py
new file mode 100644
index 000000000..8f28eafc5
--- /dev/null
+++ b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py
@@ -0,0 +1,1084 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
+# All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""Typer CLI for the end-to-end graph ingestion pipeline.
+
+Registered on the ``retriever`` CLI as the ``pipeline`` subcommand.
+
+Examples::
+
+ # Batch mode (Ray) with PDF extraction + embedding
+ retriever pipeline run /data/pdfs \\
+ --run-mode batch \\
+ --embed-invoke-url http://localhost:8000/v1
+
+ # In-process mode (no Ray) for quick local testing
+ retriever pipeline run /data/pdfs \\
+ --run-mode inprocess \\
+ --ocr-invoke-url http://localhost:9000/v1
+
+ # Save extraction Parquet for full-page markdown (page index / export)
+ retriever pipeline run /data/pdfs \\
+ --lancedb-uri lancedb \\
+ --save-intermediate /path/to/extracted_parquet_dir
+"""
+
+from __future__ import annotations
+
+import glob as _glob
+import json
+import logging
+import os
+import time
+from pathlib import Path
+from typing import Any, Optional, TextIO
+
+import typer
+
+from nemo_retriever.audio import asr_params_from_env
+from nemo_retriever.graph_ingestor import GraphIngestor
+from nemo_retriever.model import VL_EMBED_MODEL, VL_RERANK_MODEL
+from nemo_retriever.params import (
+ AudioChunkParams,
+ CaptionParams,
+ DedupParams,
+ EmbedParams,
+ ExtractParams,
+ StoreParams,
+ TextChunkParams,
+)
+from nemo_retriever.params.models import BatchTuningParams
+from nemo_retriever.utils.remote_auth import resolve_remote_api_key
+from nemo_retriever.vector_store.lancedb_store import handle_lancedb
+
+logger = logging.getLogger(__name__)
+
+app = typer.Typer(help="End-to-end graph-based ingestion pipeline (extract -> embed -> LanceDB).")
+
+LANCEDB_URI = "lancedb"
+LANCEDB_TABLE = "nv-ingest"
+
+# Help panel labels (keep stable so --help groupings read consistently).
+_PANEL_IO = "I/O and Execution"
+_PANEL_EXTRACT = "PDF / Document Extraction"
+_PANEL_REMOTE = "Remote NIM Endpoints"
+_PANEL_EMBED = "Embedding"
+_PANEL_DEDUP_CAPTION = "Dedup and Caption"
+_PANEL_STORE_CHUNK = "Storage and Text Chunking"
+_PANEL_AUDIO = "Audio"
+_PANEL_RAY = "Ray / Batch Tuning"
+_PANEL_LANCEDB = "LanceDB and Outputs"
+_PANEL_EVAL = "Evaluation (Recall / BEIR)"
+_PANEL_OBS = "Observability"
+
+
+# ---------------------------------------------------------------------------
+# Logging helpers
+# ---------------------------------------------------------------------------
+
+
+class _TeeStream:
+ """Mirror stdout/stderr writes into a second stream (e.g. a log file)."""
+
+ def __init__(self, primary: TextIO, mirror: TextIO) -> None:
+ self._primary = primary
+ self._mirror = mirror
+
+ def write(self, data: str) -> int:
+ self._primary.write(data)
+ self._mirror.write(data)
+ return len(data)
+
+ def flush(self) -> None:
+ self._primary.flush()
+ self._mirror.flush()
+
+ def isatty(self) -> bool:
+ return bool(getattr(self._primary, "isatty", lambda: False)())
+
+ def fileno(self) -> int:
+ return int(getattr(self._primary, "fileno")())
+
+ def writable(self) -> bool:
+ return bool(getattr(self._primary, "writable", lambda: True)())
+
+ @property
+ def encoding(self) -> str:
+ return str(getattr(self._primary, "encoding", "utf-8"))
+
+
+def _configure_logging(log_file: Optional[Path], *, debug: bool = False) -> tuple[Optional[TextIO], TextIO, TextIO]:
+ original_stdout = os.sys.stdout
+ original_stderr = os.sys.stderr
+ log_level = logging.DEBUG if debug else logging.INFO
+ if log_file is None:
+ logging.basicConfig(
+ level=log_level,
+ format="%(asctime)s %(levelname)s %(name)s: %(message)s",
+ force=True,
+ )
+ return None, original_stdout, original_stderr
+
+ target = Path(log_file).expanduser().resolve()
+ target.parent.mkdir(parents=True, exist_ok=True)
+ fh = open(target, "a", encoding="utf-8", buffering=1)
+ os.sys.stdout = _TeeStream(os.sys.__stdout__, fh)
+ os.sys.stderr = _TeeStream(os.sys.__stderr__, fh)
+ logging.basicConfig(
+ level=log_level,
+ format="%(asctime)s %(levelname)s %(name)s: %(message)s",
+ handlers=[logging.StreamHandler(os.sys.stdout)],
+ force=True,
+ )
+ logger.info("Writing combined pipeline logs to %s", str(target))
+ return fh, original_stdout, original_stderr
+
+
+# ---------------------------------------------------------------------------
+# Small utilities (LanceDB, summaries, file patterns)
+# ---------------------------------------------------------------------------
+
+
+def _ensure_lancedb_table(uri: str, table_name: str) -> None:
+ from nemo_retriever.vector_store.lancedb_utils import lancedb_schema
+ import lancedb
+ import pyarrow as pa
+
+ Path(uri).mkdir(parents=True, exist_ok=True)
+ db = lancedb.connect(uri)
+ try:
+ db.open_table(table_name)
+ return
+ except ValueError as e:
+        # LanceDB raises a plain ValueError for a missing table (there is no
+        # TableNotFoundError); match the substring it uses internally in db.py.
+ if f"Table '{table_name}' was not found" not in str(e):
+ raise
+ schema = lancedb_schema()
+ empty = pa.table({f.name: [] for f in schema}, schema=schema)
+ db.create_table(table_name, data=empty, schema=schema, mode="create")
+
+
+def _write_runtime_summary(
+ runtime_metrics_dir: Optional[Path],
+ runtime_metrics_prefix: Optional[str],
+ payload: dict[str, object],
+) -> None:
+ if runtime_metrics_dir is None and not runtime_metrics_prefix:
+ return
+
+ target_dir = Path(runtime_metrics_dir or Path.cwd()).expanduser().resolve()
+ target_dir.mkdir(parents=True, exist_ok=True)
+ prefix = (runtime_metrics_prefix or "run").strip() or "run"
+ target = target_dir / f"{prefix}.runtime.summary.json"
+ target.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
+
+
+def _count_input_units(result_df) -> int:
+ if "source_id" in result_df.columns:
+ return int(result_df["source_id"].nunique())
+ if "source_path" in result_df.columns:
+ return int(result_df["source_path"].nunique())
+ return int(len(result_df.index))
+
+
+def _resolve_file_patterns(input_path: Path, input_type: str) -> list[str]:
+ input_path = Path(input_path)
+ if input_path.is_file():
+ return [str(input_path)]
+ if not input_path.is_dir():
+ raise typer.BadParameter(f"Path does not exist: {input_path}")
+
+ ext_map = {
+ "pdf": ["*.pdf"],
+ "doc": ["*.docx", "*.pptx"],
+ "txt": ["*.txt"],
+ "html": ["*.html"],
+ "image": ["*.jpg", "*.jpeg", "*.png", "*.tiff", "*.bmp"],
+ "audio": ["*.mp3", "*.wav", "*.m4a"],
+ }
+ exts = ext_map.get(input_type)
+ if exts is None:
+ raise typer.BadParameter(f"Unsupported --input-type: {input_type!r}")
+
+ patterns = [str(input_path / ext) for ext in exts]
+ matched = [p for p in patterns if _glob.glob(p)]
+ if not matched:
+ raise typer.BadParameter(f"No files found for input_type={input_type!r} in {input_path}")
+ return matched
+
+
+# ---------------------------------------------------------------------------
+# Parameter builders (split out from the old monolithic main())
+# ---------------------------------------------------------------------------
+
+
+def _build_extract_params(
+ *,
+ method: str,
+ dpi: int,
+ extract_text: bool,
+ extract_tables: bool,
+ extract_charts: bool,
+ extract_infographics: bool,
+ extract_page_as_image: bool,
+ use_graphic_elements: bool,
+ use_table_structure: bool,
+ table_output_format: Optional[str],
+ extract_remote_api_key: Optional[str],
+ page_elements_invoke_url: Optional[str],
+ ocr_invoke_url: Optional[str],
+ graphic_elements_invoke_url: Optional[str],
+ table_structure_invoke_url: Optional[str],
+ pdf_split_batch_size: int,
+ pdf_extract_batch_size: Optional[int],
+ pdf_extract_tasks: Optional[int],
+ pdf_extract_cpus_per_task: Optional[float],
+ page_elements_actors: Optional[int],
+ page_elements_batch_size: Optional[int],
+ page_elements_cpus_per_actor: Optional[float],
+ page_elements_gpus_per_actor: Optional[float],
+ ocr_actors: Optional[int],
+ ocr_batch_size: Optional[int],
+ ocr_cpus_per_actor: Optional[float],
+ ocr_gpus_per_actor: Optional[float],
+ nemotron_parse_actors: Optional[int],
+ nemotron_parse_batch_size: Optional[int],
+ nemotron_parse_gpus_per_actor: Optional[float],
+) -> ExtractParams:
+ """Assemble :class:`ExtractParams` plus its :class:`BatchTuningParams`."""
+
+ extract_batch_tuning = BatchTuningParams(
+ **{
+ k: v
+ for k, v in {
+ "pdf_split_batch_size": pdf_split_batch_size,
+ "pdf_extract_batch_size": pdf_extract_batch_size or None,
+ "pdf_extract_workers": pdf_extract_tasks or None,
+ "pdf_extract_num_cpus": pdf_extract_cpus_per_task or None,
+ "page_elements_batch_size": page_elements_batch_size or None,
+ "page_elements_workers": page_elements_actors or None,
+ "page_elements_cpus_per_actor": page_elements_cpus_per_actor or None,
+ "gpu_page_elements": (
+ 0.0
+ if page_elements_invoke_url
+ else (page_elements_gpus_per_actor if page_elements_gpus_per_actor is not None else None)
+ ),
+ "ocr_inference_batch_size": ocr_batch_size or None,
+ "ocr_workers": ocr_actors or None,
+ "ocr_cpus_per_actor": ocr_cpus_per_actor or None,
+ "gpu_ocr": (
+ 0.0 if ocr_invoke_url else (ocr_gpus_per_actor if ocr_gpus_per_actor is not None else None)
+ ),
+ "nemotron_parse_batch_size": nemotron_parse_batch_size or None,
+ "nemotron_parse_workers": nemotron_parse_actors or None,
+ "gpu_nemotron_parse": (
+ nemotron_parse_gpus_per_actor if nemotron_parse_gpus_per_actor is not None else None
+ ),
+ }.items()
+ if v is not None
+ }
+ )
+ return ExtractParams(
+ **{
+ k: v
+ for k, v in {
+ "method": method,
+ "dpi": int(dpi),
+ "extract_text": extract_text,
+ "extract_tables": extract_tables,
+ "extract_charts": extract_charts,
+ "extract_infographics": extract_infographics,
+ "extract_page_as_image": extract_page_as_image,
+ "api_key": extract_remote_api_key,
+ "page_elements_invoke_url": page_elements_invoke_url,
+ "ocr_invoke_url": ocr_invoke_url,
+ "graphic_elements_invoke_url": graphic_elements_invoke_url,
+ "table_structure_invoke_url": table_structure_invoke_url,
+ "use_graphic_elements": use_graphic_elements,
+ "use_table_structure": use_table_structure,
+ "table_output_format": table_output_format,
+ "inference_batch_size": page_elements_batch_size or None,
+ "batch_tuning": extract_batch_tuning,
+ }.items()
+ if v is not None
+ }
+ )
+
+
+def _build_embed_params(
+ *,
+ embed_model_name: str,
+ embed_invoke_url: Optional[str],
+ embed_remote_api_key: Optional[str],
+ embed_modality: str,
+ text_elements_modality: Optional[str],
+ structured_elements_modality: Optional[str],
+ embed_granularity: str,
+ embed_actors: Optional[int],
+ embed_batch_size: Optional[int],
+ embed_cpus_per_actor: Optional[float],
+ embed_gpus_per_actor: Optional[float],
+) -> EmbedParams:
+ """Assemble :class:`EmbedParams` plus its :class:`BatchTuningParams`."""
+
+ embed_batch_tuning = BatchTuningParams(
+ **{
+ k: v
+ for k, v in {
+ "embed_batch_size": embed_batch_size or None,
+ "embed_workers": embed_actors or None,
+ "embed_cpus_per_actor": embed_cpus_per_actor or None,
+ "gpu_embed": (
+ 0.0 if embed_invoke_url else (embed_gpus_per_actor if embed_gpus_per_actor is not None else None)
+ ),
+ }.items()
+ if v is not None
+ }
+ )
+ return EmbedParams(
+ **{
+ k: v
+ for k, v in {
+ "model_name": embed_model_name,
+ "embed_invoke_url": embed_invoke_url,
+ "api_key": embed_remote_api_key,
+ "embed_modality": embed_modality,
+ "text_elements_modality": text_elements_modality,
+ "structured_elements_modality": structured_elements_modality,
+ "embed_granularity": embed_granularity,
+ "batch_tuning": embed_batch_tuning,
+ "inference_batch_size": embed_batch_size or None,
+ }.items()
+ if v is not None
+ }
+ )
+
+
+def _build_ingestor(
+ *,
+ run_mode: str,
+ ray_address: Optional[str],
+ file_patterns: list[str],
+ input_type: str,
+ extract_params: ExtractParams,
+ embed_params: EmbedParams,
+ text_chunk_params: TextChunkParams,
+ enable_text_chunk: bool,
+ enable_dedup: bool,
+ enable_caption: bool,
+ dedup_iou_threshold: float,
+ caption_invoke_url: Optional[str],
+ caption_remote_api_key: Optional[str],
+ caption_model_name: str,
+ caption_device: Optional[str],
+ caption_context_text_max_chars: int,
+ caption_gpu_memory_utilization: float,
+ caption_gpus_per_actor: Optional[float],
+ store_images_uri: Optional[str],
+ store_text: bool,
+ strip_base64: bool,
+ segment_audio: bool,
+ audio_split_type: str,
+ audio_split_interval: int,
+) -> GraphIngestor:
+ """Construct a :class:`GraphIngestor` with all requested stages attached."""
+
+ node_overrides: dict[str, dict[str, Any]] = {}
+ if caption_gpus_per_actor is not None:
+ node_overrides["CaptionActor"] = {"num_gpus": caption_gpus_per_actor}
+
+ ingestor = GraphIngestor(
+ run_mode=run_mode,
+ ray_address=ray_address,
+ node_overrides=node_overrides or None,
+ )
+ ingestor = ingestor.files(file_patterns)
+
+ # Extraction stage is selected by input type.
+ if input_type == "txt":
+ ingestor = ingestor.extract_txt(text_chunk_params)
+ elif input_type == "html":
+ ingestor = ingestor.extract_html(text_chunk_params)
+ elif input_type == "image":
+ ingestor = ingestor.extract_image_files(extract_params)
+ elif input_type == "audio":
+ asr_params = asr_params_from_env().model_copy(update={"segment_audio": bool(segment_audio)})
+ ingestor = ingestor.extract_audio(
+ params=AudioChunkParams(split_type=audio_split_type, split_interval=int(audio_split_interval)),
+ asr_params=asr_params,
+ )
+ else:
+ # "pdf" or "doc"
+ ingestor = ingestor.extract(extract_params)
+
+ if enable_text_chunk:
+ ingestor = ingestor.split(text_chunk_params)
+
+ if enable_dedup:
+ ingestor = ingestor.dedup(DedupParams(iou_threshold=dedup_iou_threshold))
+
+ if enable_caption:
+ ingestor = ingestor.caption(
+ CaptionParams(
+ endpoint_url=caption_invoke_url,
+ api_key=caption_remote_api_key,
+ model_name=caption_model_name,
+ device=caption_device,
+ context_text_max_chars=caption_context_text_max_chars,
+ gpu_memory_utilization=caption_gpu_memory_utilization,
+ )
+ )
+
+ if store_images_uri is not None:
+ ingestor = ingestor.store(
+ StoreParams(
+ storage_uri=store_images_uri,
+ store_text=store_text,
+ strip_base64=strip_base64,
+ )
+ )
+
+ return ingestor.embed(embed_params)
+
+
+def _collect_results(run_mode: str, result: Any) -> tuple[list[dict[str, Any]], Any, float, int]:
+ """Materialize the graph result into a list of records + DataFrame.
+
+ Returns ``(records, result_df, ray_download_secs, num_input_units)``.
+ """
+
+ import pandas as pd
+
+ if run_mode == "batch":
+ ray_download_start = time.perf_counter()
+ records = result.take_all()
+ ray_download_time = time.perf_counter() - ray_download_start
+ result_df = pd.DataFrame(records)
+ else:
+ result_df = result
+ records = result_df.to_dict("records")
+ ray_download_time = 0.0
+
+ return records, result_df, float(ray_download_time), _count_input_units(result_df)
+
+
+def _run_evaluation(
+ *,
+ evaluation_mode: str,
+ lancedb_uri: str,
+ embed_model_name: str,
+ embed_invoke_url: Optional[str],
+ embed_remote_api_key: Optional[str],
+ embed_modality: str,
+ query_csv: Path,
+ recall_match_mode: str,
+ audio_match_tolerance_secs: float,
+ hybrid: bool,
+ reranker: Optional[bool],
+ reranker_model_name: str,
+ beir_loader: Optional[str],
+ beir_dataset_name: Optional[str],
+ beir_split: str,
+ beir_query_language: Optional[str],
+ beir_doc_id_field: str,
+ beir_k: list[int],
+) -> tuple[str, float, dict[str, float], Optional[int], bool]:
+ """Run recall or BEIR evaluation.
+
+ Returns ``(label, elapsed_secs, metrics, query_count, ran)``. When the
+ query CSV is missing in recall mode, ``ran`` is ``False`` and the caller
+ should skip metric recording.
+ """
+
+ from nemo_retriever.model import resolve_embed_model
+
+ embed_model = resolve_embed_model(str(embed_model_name))
+
+ if evaluation_mode == "beir":
+ if not beir_loader:
+ raise ValueError("--beir-loader is required when --evaluation-mode=beir")
+ if not beir_dataset_name:
+ raise ValueError("--beir-dataset-name is required when --evaluation-mode=beir")
+
+ from nemo_retriever.recall.beir import BeirConfig, evaluate_lancedb_beir
+
+ cfg = BeirConfig(
+ lancedb_uri=str(lancedb_uri),
+ lancedb_table=str(LANCEDB_TABLE),
+ embedding_model=embed_model,
+ loader=str(beir_loader),
+ dataset_name=str(beir_dataset_name),
+ split=str(beir_split),
+ query_language=beir_query_language,
+ doc_id_field=str(beir_doc_id_field),
+ ks=tuple(beir_k) if beir_k else (1, 3, 5, 10),
+ embedding_http_endpoint=embed_invoke_url,
+ embedding_api_key=embed_remote_api_key or "",
+ hybrid=hybrid,
+ reranker=bool(reranker),
+ reranker_model_name=str(reranker_model_name),
+ )
+ evaluation_start = time.perf_counter()
+ beir_dataset, _raw_hits, _run, metrics = evaluate_lancedb_beir(cfg)
+ return "BEIR", time.perf_counter() - evaluation_start, metrics, len(beir_dataset.query_ids), True
+
+ # Default: recall eval against a query CSV.
+ query_csv_path = Path(query_csv)
+ if not query_csv_path.exists():
+ logger.warning("Query CSV not found at %s; skipping recall evaluation.", query_csv_path)
+ return "Recall", 0.0, {}, None, False
+
+ from nemo_retriever.recall.core import RecallConfig, retrieve_and_score
+
+ recall_cfg = RecallConfig(
+ lancedb_uri=str(lancedb_uri),
+ lancedb_table=str(LANCEDB_TABLE),
+ embedding_model=embed_model,
+ embedding_http_endpoint=embed_invoke_url,
+ embedding_api_key=embed_remote_api_key or "",
+ top_k=10,
+ ks=(1, 5, 10),
+ hybrid=hybrid,
+ match_mode=recall_match_mode,
+ audio_match_tolerance_secs=float(audio_match_tolerance_secs),
+ reranker=reranker_model_name if reranker else None,
+ embed_modality=embed_modality,
+ )
+ evaluation_start = time.perf_counter()
+ df_query, _gold, _raw_hits, _retrieved_keys, metrics = retrieve_and_score(query_csv=query_csv_path, cfg=recall_cfg)
+ return "Recall", time.perf_counter() - evaluation_start, metrics, len(df_query.index), True
+
+
+# ---------------------------------------------------------------------------
+# Typer command: `retriever pipeline run`
+# ---------------------------------------------------------------------------
+
+
+@app.command("run")
+def run(
+ ctx: typer.Context,
+ input_path: Path = typer.Argument(
+ ...,
+ help="File or directory of documents to ingest.",
+ path_type=Path,
+ ),
+ # --- I/O and execution ------------------------------------------------
+ run_mode: str = typer.Option(
+ "batch",
+ "--run-mode",
+ help="Execution mode: 'batch' (Ray Data) or 'inprocess' (pandas, no Ray).",
+ rich_help_panel=_PANEL_IO,
+ ),
+ input_type: str = typer.Option(
+ "pdf",
+ "--input-type",
+ help="Input type: 'pdf', 'doc', 'txt', 'html', 'image', or 'audio'.",
+ rich_help_panel=_PANEL_IO,
+ ),
+ debug: bool = typer.Option(
+ False, "--debug/--no-debug", help="Enable debug-level logging.", rich_help_panel=_PANEL_IO
+ ),
+ log_file: Optional[Path] = typer.Option(
+ None, "--log-file", path_type=Path, dir_okay=False, rich_help_panel=_PANEL_IO
+ ),
+ # --- PDF / document extraction ---------------------------------------
+ method: str = typer.Option(
+ "pdfium", "--method", help="PDF text extraction method.", rich_help_panel=_PANEL_EXTRACT
+ ),
+ dpi: int = typer.Option(
+ 300, "--dpi", min=72, help="Render DPI for PDF page images.", rich_help_panel=_PANEL_EXTRACT
+ ),
+ extract_text: bool = typer.Option(True, "--extract-text/--no-extract-text", rich_help_panel=_PANEL_EXTRACT),
+ extract_tables: bool = typer.Option(True, "--extract-tables/--no-extract-tables", rich_help_panel=_PANEL_EXTRACT),
+ extract_charts: bool = typer.Option(True, "--extract-charts/--no-extract-charts", rich_help_panel=_PANEL_EXTRACT),
+ extract_infographics: bool = typer.Option(
+ False, "--extract-infographics/--no-extract-infographics", rich_help_panel=_PANEL_EXTRACT
+ ),
+ extract_page_as_image: bool = typer.Option(
+ True,
+ "--extract-page-as-image/--no-extract-page-as-image",
+ rich_help_panel=_PANEL_EXTRACT,
+ ),
+ use_graphic_elements: bool = typer.Option(False, "--use-graphic-elements", rich_help_panel=_PANEL_EXTRACT),
+ use_table_structure: bool = typer.Option(False, "--use-table-structure", rich_help_panel=_PANEL_EXTRACT),
+ table_output_format: Optional[str] = typer.Option(None, "--table-output-format", rich_help_panel=_PANEL_EXTRACT),
+ # --- Remote NIM endpoints --------------------------------------------
+ api_key: Optional[str] = typer.Option(
+ None,
+ "--api-key",
+ help="Bearer token for remote NIM endpoints.",
+ rich_help_panel=_PANEL_REMOTE,
+ ),
+ page_elements_invoke_url: Optional[str] = typer.Option(
+ None, "--page-elements-invoke-url", rich_help_panel=_PANEL_REMOTE
+ ),
+ ocr_invoke_url: Optional[str] = typer.Option(None, "--ocr-invoke-url", rich_help_panel=_PANEL_REMOTE),
+ graphic_elements_invoke_url: Optional[str] = typer.Option(
+ None, "--graphic-elements-invoke-url", rich_help_panel=_PANEL_REMOTE
+ ),
+ table_structure_invoke_url: Optional[str] = typer.Option(
+ None, "--table-structure-invoke-url", rich_help_panel=_PANEL_REMOTE
+ ),
+ embed_invoke_url: Optional[str] = typer.Option(None, "--embed-invoke-url", rich_help_panel=_PANEL_REMOTE),
+ # --- Embedding --------------------------------------------------------
+ embed_model_name: str = typer.Option(VL_EMBED_MODEL, "--embed-model-name", rich_help_panel=_PANEL_EMBED),
+ embed_modality: str = typer.Option("text", "--embed-modality", rich_help_panel=_PANEL_EMBED),
+ embed_granularity: str = typer.Option("element", "--embed-granularity", rich_help_panel=_PANEL_EMBED),
+ text_elements_modality: Optional[str] = typer.Option(
+ None, "--text-elements-modality", rich_help_panel=_PANEL_EMBED
+ ),
+ structured_elements_modality: Optional[str] = typer.Option(
+ None, "--structured-elements-modality", rich_help_panel=_PANEL_EMBED
+ ),
+ # --- Dedup / caption -------------------------------------------------
+ dedup: Optional[bool] = typer.Option(None, "--dedup/--no-dedup", rich_help_panel=_PANEL_DEDUP_CAPTION),
+ dedup_iou_threshold: float = typer.Option(0.45, "--dedup-iou-threshold", rich_help_panel=_PANEL_DEDUP_CAPTION),
+ caption: bool = typer.Option(False, "--caption/--no-caption", rich_help_panel=_PANEL_DEDUP_CAPTION),
+ caption_invoke_url: Optional[str] = typer.Option(
+ None, "--caption-invoke-url", rich_help_panel=_PANEL_DEDUP_CAPTION
+ ),
+ caption_model_name: str = typer.Option(
+ "nvidia/NVIDIA-Nemotron-Nano-12B-v2-VL-BF16",
+ "--caption-model-name",
+ rich_help_panel=_PANEL_DEDUP_CAPTION,
+ ),
+ caption_device: Optional[str] = typer.Option(None, "--caption-device", rich_help_panel=_PANEL_DEDUP_CAPTION),
+ caption_context_text_max_chars: int = typer.Option(
+ 0, "--caption-context-text-max-chars", rich_help_panel=_PANEL_DEDUP_CAPTION
+ ),
+ caption_gpu_memory_utilization: float = typer.Option(
+ 0.5, "--caption-gpu-memory-utilization", rich_help_panel=_PANEL_DEDUP_CAPTION
+ ),
+ caption_gpus_per_actor: Optional[float] = typer.Option(
+ None, "--caption-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_DEDUP_CAPTION
+ ),
+ # --- Storage and text chunking --------------------------------------
+ store_images_uri: Optional[str] = typer.Option(
+ None,
+ "--store-images-uri",
+ help="Store extracted images to this URI.",
+ rich_help_panel=_PANEL_STORE_CHUNK,
+ ),
+ store_text: bool = typer.Option(
+ False,
+ "--store-text/--no-store-text",
+ help="Also store extracted text.",
+ rich_help_panel=_PANEL_STORE_CHUNK,
+ ),
+ strip_base64: bool = typer.Option(
+ True,
+ "--strip-base64/--no-strip-base64",
+ help="Strip base64 after storing.",
+ rich_help_panel=_PANEL_STORE_CHUNK,
+ ),
+ text_chunk: bool = typer.Option(False, "--text-chunk", rich_help_panel=_PANEL_STORE_CHUNK),
+ text_chunk_max_tokens: Optional[int] = typer.Option(
+ None, "--text-chunk-max-tokens", rich_help_panel=_PANEL_STORE_CHUNK
+ ),
+ text_chunk_overlap_tokens: Optional[int] = typer.Option(
+ None, "--text-chunk-overlap-tokens", rich_help_panel=_PANEL_STORE_CHUNK
+ ),
+ # --- Ray / batch tuning ---------------------------------------------
+ # *_gpus_per_actor defaults are None (not 0.0) so we can distinguish
+ # "not set -> use heuristic" from "explicitly 0 -> no GPU". Other tuning
+ # defaults use 0/0.0 because those values are never valid explicit choices.
+ ray_address: Optional[str] = typer.Option(None, "--ray-address", rich_help_panel=_PANEL_RAY),
+ ray_log_to_driver: bool = typer.Option(
+ True, "--ray-log-to-driver/--no-ray-log-to-driver", rich_help_panel=_PANEL_RAY
+ ),
+ ocr_actors: Optional[int] = typer.Option(0, "--ocr-actors", rich_help_panel=_PANEL_RAY),
+ ocr_batch_size: Optional[int] = typer.Option(0, "--ocr-batch-size", rich_help_panel=_PANEL_RAY),
+ ocr_cpus_per_actor: Optional[float] = typer.Option(0.0, "--ocr-cpus-per-actor", rich_help_panel=_PANEL_RAY),
+ ocr_gpus_per_actor: Optional[float] = typer.Option(
+ None, "--ocr-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_RAY
+ ),
+ page_elements_actors: Optional[int] = typer.Option(0, "--page-elements-actors", rich_help_panel=_PANEL_RAY),
+ page_elements_batch_size: Optional[int] = typer.Option(0, "--page-elements-batch-size", rich_help_panel=_PANEL_RAY),
+ page_elements_cpus_per_actor: Optional[float] = typer.Option(
+ 0.0, "--page-elements-cpus-per-actor", rich_help_panel=_PANEL_RAY
+ ),
+ page_elements_gpus_per_actor: Optional[float] = typer.Option(
+ None, "--page-elements-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_RAY
+ ),
+ embed_actors: Optional[int] = typer.Option(0, "--embed-actors", rich_help_panel=_PANEL_RAY),
+ embed_batch_size: Optional[int] = typer.Option(0, "--embed-batch-size", rich_help_panel=_PANEL_RAY),
+ embed_cpus_per_actor: Optional[float] = typer.Option(0.0, "--embed-cpus-per-actor", rich_help_panel=_PANEL_RAY),
+ embed_gpus_per_actor: Optional[float] = typer.Option(
+ None, "--embed-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_RAY
+ ),
+ pdf_split_batch_size: int = typer.Option(1, "--pdf-split-batch-size", min=1, rich_help_panel=_PANEL_RAY),
+ pdf_extract_batch_size: Optional[int] = typer.Option(0, "--pdf-extract-batch-size", rich_help_panel=_PANEL_RAY),
+ pdf_extract_tasks: Optional[int] = typer.Option(0, "--pdf-extract-tasks", rich_help_panel=_PANEL_RAY),
+ pdf_extract_cpus_per_task: Optional[float] = typer.Option(
+ 0.0, "--pdf-extract-cpus-per-task", rich_help_panel=_PANEL_RAY
+ ),
+ nemotron_parse_actors: Optional[int] = typer.Option(0, "--nemotron-parse-actors", rich_help_panel=_PANEL_RAY),
+ nemotron_parse_gpus_per_actor: Optional[float] = typer.Option(
+ None,
+ "--nemotron-parse-gpus-per-actor",
+ min=0.0,
+ max=1.0,
+ rich_help_panel=_PANEL_RAY,
+ ),
+ nemotron_parse_batch_size: Optional[int] = typer.Option(
+ 0, "--nemotron-parse-batch-size", rich_help_panel=_PANEL_RAY
+ ),
+ # --- Audio ----------------------------------------------------------
+ segment_audio: bool = typer.Option(False, "--segment-audio/--no-segment-audio", rich_help_panel=_PANEL_AUDIO),
+ audio_split_type: str = typer.Option("size", "--audio-split-type", rich_help_panel=_PANEL_AUDIO),
+ audio_split_interval: int = typer.Option(500000, "--audio-split-interval", min=1, rich_help_panel=_PANEL_AUDIO),
+ audio_match_tolerance_secs: float = typer.Option(
+ 2.0, "--audio-match-tolerance-secs", min=0.0, rich_help_panel=_PANEL_AUDIO
+ ),
+ # --- LanceDB / outputs ---------------------------------------------
+ lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri", rich_help_panel=_PANEL_LANCEDB),
+ save_intermediate: Optional[Path] = typer.Option(
+ None,
+ "--save-intermediate",
+ help="Directory to write extraction results as Parquet (for full-page markdown / page index).",
+ path_type=Path,
+ file_okay=False,
+ dir_okay=True,
+ rich_help_panel=_PANEL_LANCEDB,
+ ),
+ hybrid: bool = typer.Option(False, "--hybrid/--no-hybrid", rich_help_panel=_PANEL_LANCEDB),
+ detection_summary_file: Optional[Path] = typer.Option(
+ None, "--detection-summary-file", path_type=Path, rich_help_panel=_PANEL_LANCEDB
+ ),
+ runtime_metrics_dir: Optional[Path] = typer.Option(
+ None, "--runtime-metrics-dir", path_type=Path, rich_help_panel=_PANEL_OBS
+ ),
+ runtime_metrics_prefix: Optional[str] = typer.Option(None, "--runtime-metrics-prefix", rich_help_panel=_PANEL_OBS),
+ # --- Evaluation -----------------------------------------------------
+ evaluation_mode: str = typer.Option("recall", "--evaluation-mode", rich_help_panel=_PANEL_EVAL),
+ query_csv: Path = typer.Option(
+ "./data/bo767_query_gt.csv",
+ "--query-csv",
+ path_type=Path,
+ rich_help_panel=_PANEL_EVAL,
+ ),
+ recall_match_mode: str = typer.Option("pdf_page", "--recall-match-mode", rich_help_panel=_PANEL_EVAL),
+ recall_details: bool = typer.Option(True, "--recall-details/--no-recall-details", rich_help_panel=_PANEL_EVAL),
+ reranker: Optional[bool] = typer.Option(False, "--reranker/--no-reranker", rich_help_panel=_PANEL_EVAL),
+ reranker_model_name: str = typer.Option(VL_RERANK_MODEL, "--reranker-model-name", rich_help_panel=_PANEL_EVAL),
+ beir_loader: Optional[str] = typer.Option(None, "--beir-loader", rich_help_panel=_PANEL_EVAL),
+ beir_dataset_name: Optional[str] = typer.Option(None, "--beir-dataset-name", rich_help_panel=_PANEL_EVAL),
+ beir_split: str = typer.Option("test", "--beir-split", rich_help_panel=_PANEL_EVAL),
+ beir_query_language: Optional[str] = typer.Option(None, "--beir-query-language", rich_help_panel=_PANEL_EVAL),
+ beir_doc_id_field: str = typer.Option("pdf_basename", "--beir-doc-id-field", rich_help_panel=_PANEL_EVAL),
+ beir_k: list[int] = typer.Option([], "--beir-k", rich_help_panel=_PANEL_EVAL),
+) -> None:
+ """Run the end-to-end graph ingestion pipeline against ``INPUT_PATH``."""
+
+ _ = ctx
+ log_handle, original_stdout, original_stderr = _configure_logging(log_file, debug=bool(debug))
+ try:
+ if run_mode not in {"batch", "inprocess"}:
+ raise ValueError(f"Unsupported --run-mode: {run_mode!r}")
+ if recall_match_mode not in {"pdf_page", "pdf_only", "audio_segment"}:
+ raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode!r}")
+ if audio_split_type not in {"size", "time", "frame"}:
+ raise ValueError(f"Unsupported --audio-split-type: {audio_split_type!r}")
+ if evaluation_mode not in {"recall", "beir"}:
+ raise ValueError(f"Unsupported --evaluation-mode: {evaluation_mode!r}")
+
+ if run_mode == "batch":
+ os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0"
+
+ lancedb_uri = str(Path(lancedb_uri).expanduser().resolve())
+ _ensure_lancedb_table(lancedb_uri, LANCEDB_TABLE)
+
+ remote_api_key = resolve_remote_api_key(api_key)
+ extract_remote_api_key = remote_api_key
+ embed_remote_api_key = remote_api_key
+ caption_remote_api_key = remote_api_key
+
+ if (
+ any(
+ (
+ page_elements_invoke_url,
+ ocr_invoke_url,
+ graphic_elements_invoke_url,
+ table_structure_invoke_url,
+ embed_invoke_url,
+ )
+ )
+ and remote_api_key is None
+ ):
+ logger.warning("Remote endpoint URL(s) were configured without an API key.")
+
+ # Zero out GPU fractions when a remote URL replaces the local model.
+ if page_elements_invoke_url and float(page_elements_gpus_per_actor or 0.0) != 0.0:
+ logger.warning("Forcing page-elements GPUs to 0.0 because --page-elements-invoke-url is set.")
+ page_elements_gpus_per_actor = 0.0
+ if ocr_invoke_url and float(ocr_gpus_per_actor or 0.0) != 0.0:
+ logger.warning("Forcing OCR GPUs to 0.0 because --ocr-invoke-url is set.")
+ ocr_gpus_per_actor = 0.0
+ if embed_invoke_url and float(embed_gpus_per_actor or 0.0) != 0.0:
+ logger.warning("Forcing embed GPUs to 0.0 because --embed-invoke-url is set.")
+ embed_gpus_per_actor = 0.0
+
+ file_patterns = _resolve_file_patterns(Path(input_path), input_type)
+
+ extract_params = _build_extract_params(
+ method=method,
+ dpi=dpi,
+ extract_text=extract_text,
+ extract_tables=extract_tables,
+ extract_charts=extract_charts,
+ extract_infographics=extract_infographics,
+ extract_page_as_image=extract_page_as_image,
+ use_graphic_elements=use_graphic_elements,
+ use_table_structure=use_table_structure,
+ table_output_format=table_output_format,
+ extract_remote_api_key=extract_remote_api_key,
+ page_elements_invoke_url=page_elements_invoke_url,
+ ocr_invoke_url=ocr_invoke_url,
+ graphic_elements_invoke_url=graphic_elements_invoke_url,
+ table_structure_invoke_url=table_structure_invoke_url,
+ pdf_split_batch_size=pdf_split_batch_size,
+ pdf_extract_batch_size=pdf_extract_batch_size,
+ pdf_extract_tasks=pdf_extract_tasks,
+ pdf_extract_cpus_per_task=pdf_extract_cpus_per_task,
+ page_elements_actors=page_elements_actors,
+ page_elements_batch_size=page_elements_batch_size,
+ page_elements_cpus_per_actor=page_elements_cpus_per_actor,
+ page_elements_gpus_per_actor=page_elements_gpus_per_actor,
+ ocr_actors=ocr_actors,
+ ocr_batch_size=ocr_batch_size,
+ ocr_cpus_per_actor=ocr_cpus_per_actor,
+ ocr_gpus_per_actor=ocr_gpus_per_actor,
+ nemotron_parse_actors=nemotron_parse_actors,
+ nemotron_parse_batch_size=nemotron_parse_batch_size,
+ nemotron_parse_gpus_per_actor=nemotron_parse_gpus_per_actor,
+ )
+
+ embed_params = _build_embed_params(
+ embed_model_name=embed_model_name,
+ embed_invoke_url=embed_invoke_url,
+ embed_remote_api_key=embed_remote_api_key,
+ embed_modality=embed_modality,
+ text_elements_modality=text_elements_modality,
+ structured_elements_modality=structured_elements_modality,
+ embed_granularity=embed_granularity,
+ embed_actors=embed_actors,
+ embed_batch_size=embed_batch_size,
+ embed_cpus_per_actor=embed_cpus_per_actor,
+ embed_gpus_per_actor=embed_gpus_per_actor,
+ )
+
+ text_chunk_params = TextChunkParams(
+ max_tokens=text_chunk_max_tokens or 1024,
+ overlap_tokens=text_chunk_overlap_tokens if text_chunk_overlap_tokens is not None else 150,
+ )
+
+ enable_text_chunk = text_chunk or text_chunk_max_tokens is not None or text_chunk_overlap_tokens is not None
+ enable_caption = caption or caption_invoke_url is not None
+ enable_dedup = dedup if dedup is not None else enable_caption
+
+ logger.info("Building graph pipeline (run_mode=%s) for %s ...", run_mode, input_path)
+ ingestor = _build_ingestor(
+ run_mode=run_mode,
+ ray_address=ray_address,
+ file_patterns=file_patterns,
+ input_type=input_type,
+ extract_params=extract_params,
+ embed_params=embed_params,
+ text_chunk_params=text_chunk_params,
+ enable_text_chunk=enable_text_chunk,
+ enable_dedup=enable_dedup,
+ enable_caption=enable_caption,
+ dedup_iou_threshold=dedup_iou_threshold,
+ caption_invoke_url=caption_invoke_url,
+ caption_remote_api_key=caption_remote_api_key,
+ caption_model_name=caption_model_name,
+ caption_device=caption_device,
+ caption_context_text_max_chars=caption_context_text_max_chars,
+ caption_gpu_memory_utilization=caption_gpu_memory_utilization,
+ caption_gpus_per_actor=caption_gpus_per_actor,
+ store_images_uri=store_images_uri,
+ store_text=store_text,
+ strip_base64=strip_base64,
+ segment_audio=segment_audio,
+ audio_split_type=audio_split_type,
+ audio_split_interval=audio_split_interval,
+ )
+
+ # --- Execute ---------------------------------------------------
+ logger.info("Starting ingestion of %s ...", input_path)
+ ingest_start = time.perf_counter()
+ raw_result = ingestor.ingest()
+ ingestion_only_total_time = time.perf_counter() - ingest_start
+
+ ingest_local_results, result_df, ray_download_time, num_rows = _collect_results(run_mode, raw_result)
+
+ if save_intermediate is not None:
+ out_dir = Path(save_intermediate).expanduser().resolve()
+ out_dir.mkdir(parents=True, exist_ok=True)
+ out_path = out_dir / "extraction.parquet"
+ result_df.to_parquet(out_path, index=False)
+ logger.info("Wrote extraction Parquet for intermediate use: %s", out_path)
+
+ if detection_summary_file is not None:
+ from nemo_retriever.utils.detection_summary import (
+ collect_detection_summary_from_df,
+ write_detection_summary,
+ )
+
+ write_detection_summary(
+ Path(detection_summary_file),
+ collect_detection_summary_from_df(result_df),
+ )
+
+ # --- Write to LanceDB ----------------------------------------
+ lancedb_write_start = time.perf_counter()
+ handle_lancedb(ingest_local_results, lancedb_uri, LANCEDB_TABLE, hybrid=hybrid, mode="overwrite")
+ lancedb_write_time = time.perf_counter() - lancedb_write_start
+
+ # --- Evaluation ---------------------------------------------
+ import lancedb as _lancedb_mod
+
+ db = _lancedb_mod.connect(lancedb_uri)
+ table = db.open_table(LANCEDB_TABLE)
+
+ if int(table.count_rows()) == 0:
+ logger.warning("LanceDB table is empty; skipping %s evaluation.", evaluation_mode)
+ _write_runtime_summary(
+ runtime_metrics_dir,
+ runtime_metrics_prefix,
+ {
+ "run_mode": run_mode,
+ "input_path": str(Path(input_path).resolve()),
+ "input_pages": int(num_rows),
+ "num_pages": int(num_rows),
+ "num_rows": int(len(result_df.index)),
+ "ingestion_only_secs": float(ingestion_only_total_time),
+ "ray_download_secs": float(ray_download_time),
+ "lancedb_write_secs": float(lancedb_write_time),
+ "evaluation_secs": 0.0,
+ "total_secs": float(time.perf_counter() - ingest_start),
+ "evaluation_mode": evaluation_mode,
+ "evaluation_metrics": {},
+ "recall_details": bool(recall_details),
+ "lancedb_uri": str(lancedb_uri),
+ "lancedb_table": str(LANCEDB_TABLE),
+ },
+ )
+ if run_mode == "batch":
+ import ray
+
+ ray.shutdown()
+ return
+
+ evaluation_label, evaluation_total_time, evaluation_metrics, evaluation_query_count, ran = _run_evaluation(
+ evaluation_mode=evaluation_mode,
+ lancedb_uri=lancedb_uri,
+ embed_model_name=embed_model_name,
+ embed_invoke_url=embed_invoke_url,
+ embed_remote_api_key=embed_remote_api_key,
+ embed_modality=embed_modality,
+ query_csv=query_csv,
+ recall_match_mode=recall_match_mode,
+ audio_match_tolerance_secs=audio_match_tolerance_secs,
+ hybrid=hybrid,
+ reranker=reranker,
+ reranker_model_name=reranker_model_name,
+ beir_loader=beir_loader,
+ beir_dataset_name=beir_dataset_name,
+ beir_split=beir_split,
+ beir_query_language=beir_query_language,
+ beir_doc_id_field=beir_doc_id_field,
+ beir_k=beir_k,
+ )
+
+ if not ran:
+ _write_runtime_summary(
+ runtime_metrics_dir,
+ runtime_metrics_prefix,
+ {
+ "run_mode": run_mode,
+ "input_path": str(Path(input_path).resolve()),
+ "input_pages": int(num_rows),
+ "num_pages": int(num_rows),
+ "num_rows": int(len(result_df.index)),
+ "ingestion_only_secs": float(ingestion_only_total_time),
+ "ray_download_secs": float(ray_download_time),
+ "lancedb_write_secs": float(lancedb_write_time),
+ "evaluation_secs": 0.0,
+ "total_secs": float(time.perf_counter() - ingest_start),
+ "evaluation_mode": evaluation_mode,
+ "evaluation_metrics": {},
+ "recall_details": bool(recall_details),
+ "lancedb_uri": str(lancedb_uri),
+ "lancedb_table": str(LANCEDB_TABLE),
+ },
+ )
+ if run_mode == "batch":
+ import ray
+
+ ray.shutdown()
+ return
+
+ total_time = time.perf_counter() - ingest_start
+
+ _write_runtime_summary(
+ runtime_metrics_dir,
+ runtime_metrics_prefix,
+ {
+ "run_mode": run_mode,
+ "input_path": str(Path(input_path).resolve()),
+ "input_pages": int(num_rows),
+ "num_pages": int(num_rows),
+ "num_rows": int(len(result_df.index)),
+ "ingestion_only_secs": float(ingestion_only_total_time),
+ "ray_download_secs": float(ray_download_time),
+ "lancedb_write_secs": float(lancedb_write_time),
+ "evaluation_secs": float(evaluation_total_time),
+ "total_secs": float(total_time),
+ "evaluation_mode": evaluation_mode,
+ "evaluation_metrics": dict(evaluation_metrics),
+ "evaluation_count": evaluation_query_count,
+ "recall_details": bool(recall_details),
+ "lancedb_uri": str(lancedb_uri),
+ "lancedb_table": str(LANCEDB_TABLE),
+ },
+ )
+
+ if run_mode == "batch":
+ import ray
+
+ ray.shutdown()
+
+ from nemo_retriever.utils.detection_summary import print_run_summary
+
+ print_run_summary(
+ num_rows,
+ Path(input_path),
+ hybrid,
+ lancedb_uri,
+ LANCEDB_TABLE,
+ total_time,
+ ingestion_only_total_time,
+ ray_download_time,
+ lancedb_write_time,
+ evaluation_total_time,
+ evaluation_metrics,
+ evaluation_label=evaluation_label,
+ evaluation_count=evaluation_query_count,
+ )
+ finally:
+        import sys
+
+        sys.stdout = original_stdout
+        sys.stderr = original_stderr
+ if log_handle is not None:
+ log_handle.close()
+
+
+def main() -> None:
+ """Entrypoint for ``python -m nemo_retriever.pipeline``."""
+ app()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/nemo_retriever/tests/test_pipeline_helpers.py b/nemo_retriever/tests/test_pipeline_helpers.py
new file mode 100644
index 000000000..4c1d404c0
--- /dev/null
+++ b/nemo_retriever/tests/test_pipeline_helpers.py
@@ -0,0 +1,642 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
+# All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""Unit tests for the private helpers in :mod:`nemo_retriever.pipeline`.
+
+These tests target the non-trivial pieces of logic that the ``retriever
+pipeline run`` command relies on:
+
+* ``_resolve_file_patterns`` — file/dir/glob resolution and input-type map.
+* ``_build_extract_params`` — translation from CLI flags to ``ExtractParams``
+ and the nested ``BatchTuningParams``.
+* ``_build_embed_params`` — translation from CLI flags to ``EmbedParams``.
+* ``_collect_results`` — Ray ``take_all`` vs. in-process DataFrame
+ handling + ``_count_input_units`` fallback.
+* ``_ensure_lancedb_table`` — idempotent LanceDB table creation.
+
+They also exercise the lazy attribute access in
+``nemo_retriever.pipeline.__init__`` so the package-level ``app`` / ``run``
+exports stay wired to ``__main__``.
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+import pandas as pd
+import pyarrow as pa
+import pytest
+import typer
+
+import nemo_retriever.pipeline as pipeline_pkg
+from nemo_retriever.params import EmbedParams, ExtractParams
+from nemo_retriever.pipeline.__main__ import (
+ _build_embed_params,
+ _build_extract_params,
+ _collect_results,
+ _count_input_units,
+ _ensure_lancedb_table,
+ _resolve_file_patterns,
+)
+
+
+# =============================================================================
+# Package-level lazy exports (pipeline/__init__.py)
+# =============================================================================
+
+
+class TestPipelinePackageExports:
+ """The ``nemo_retriever.pipeline`` package forwards ``app`` and ``run``."""
+
+ def test_app_is_forwarded_from_main(self):
+ from nemo_retriever.pipeline.__main__ import app as main_app
+
+ assert pipeline_pkg.app is main_app
+
+ def test_run_is_forwarded_from_main(self):
+ from nemo_retriever.pipeline.__main__ import run as main_run
+
+ assert pipeline_pkg.run is main_run
+
+ def test_unknown_attribute_raises_attribute_error(self):
+ with pytest.raises(AttributeError, match="no attribute 'does_not_exist'"):
+ _ = pipeline_pkg.does_not_exist # type: ignore[attr-defined]
+
+ def test_dunder_all(self):
+ assert set(pipeline_pkg.__all__) == {"app", "run"}
+
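The lazy forwarding these tests exercise follows the PEP 562 module `__getattr__` pattern. A minimal, self-contained sketch of that pattern is below; the module name `pkg` and the forwarded string values are stand-ins for illustration, not the real `nemo_retriever.pipeline` objects.

```python
import types

# Sketch of a PEP 562 lazy-export module (hypothetical module "pkg");
# attribute lookup falls back to __getattr__ only when the name is not
# already in the module dict.
pkg = types.ModuleType("pkg")
pkg.__all__ = ["app", "run"]


def _lazy_getattr(name: str):
    # On first access, resolve (here: fabricate) the real object and cache
    # it on the module so later lookups bypass __getattr__ entirely.
    if name in pkg.__all__:
        value = f"<{name} forwarded from __main__>"
        setattr(pkg, name, value)
        return value
    raise AttributeError(f"module {pkg.__name__!r} has no attribute {name!r}")


pkg.__getattr__ = _lazy_getattr

print(pkg.app)  # resolved lazily on first access, then cached
```

After the first access, `"app"` lives in `vars(pkg)`, so `__getattr__` is not consulted again for it.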
+
+# =============================================================================
+# _resolve_file_patterns
+# =============================================================================
+
+
+class TestResolveFilePatterns:
+ """File / directory / input-type resolution."""
+
+ # --- single-file short-circuit ----------------------------------------
+ def test_single_existing_file_returned_verbatim(self, tmp_path: Path) -> None:
+ pdf = tmp_path / "doc.pdf"
+ pdf.write_bytes(b"%PDF-1.4 test")
+
+ # input_type does not matter when the path is an existing file.
+ result = _resolve_file_patterns(pdf, "pdf")
+ assert result == [str(pdf)]
+
+ def test_single_file_ignores_input_type_mismatch(self, tmp_path: Path) -> None:
+ """A single file path is returned as-is, even if the extension doesn't match."""
+ f = tmp_path / "arbitrary.pdf"
+ f.write_bytes(b"not really a pdf")
+
+ # We still get [str(file)] back even though input_type is wildly wrong.
+ result = _resolve_file_patterns(f, "audio")
+ assert result == [str(f)]
+
+ # --- directory: per-input-type extension maps --------------------------
+ @pytest.mark.parametrize(
+ "input_type,files,expected_patterns",
+ [
+ ("pdf", ["a.pdf"], ["*.pdf"]),
+ ("txt", ["notes.txt"], ["*.txt"]),
+ ("html", ["page.html"], ["*.html"]),
+ ("image", ["a.jpg"], ["*.jpg"]),
+ ("image", ["b.jpeg"], ["*.jpeg"]),
+ ("image", ["c.png"], ["*.png"]),
+ ("image", ["d.tiff"], ["*.tiff"]),
+ ("image", ["e.bmp"], ["*.bmp"]),
+ ("audio", ["clip.mp3"], ["*.mp3"]),
+ ("audio", ["clip.wav"], ["*.wav"]),
+ ("audio", ["clip.m4a"], ["*.m4a"]),
+ ],
+ )
+ def test_directory_match_by_input_type(
+ self,
+ tmp_path: Path,
+ input_type: str,
+ files: list[str],
+ expected_patterns: list[str],
+ ) -> None:
+ for name in files:
+ (tmp_path / name).write_bytes(b"x")
+
+ result = _resolve_file_patterns(tmp_path, input_type)
+
+        # The returned globs must match the expected per-type patterns exactly.
+        assert result == [str(tmp_path / p) for p in expected_patterns]
+
+ def test_doc_input_type_matches_both_docx_and_pptx(self, tmp_path: Path) -> None:
+ (tmp_path / "a.docx").write_bytes(b"x")
+ (tmp_path / "b.pptx").write_bytes(b"x")
+
+ result = _resolve_file_patterns(tmp_path, "doc")
+
+ assert result == [str(tmp_path / "*.docx"), str(tmp_path / "*.pptx")]
+
+ def test_doc_input_type_only_one_family_present(self, tmp_path: Path) -> None:
+ """When only docx files are present, we still only get the docx glob back."""
+ (tmp_path / "a.docx").write_bytes(b"x")
+
+ result = _resolve_file_patterns(tmp_path, "doc")
+
+ # Only the matching glob survives the non-empty filter.
+ assert result == [str(tmp_path / "*.docx")]
+
+ def test_image_input_type_multiple_extensions_filtered(self, tmp_path: Path) -> None:
+ """Only extensions that actually match get returned; empty globs are filtered out."""
+ (tmp_path / "a.jpg").write_bytes(b"x")
+ (tmp_path / "b.png").write_bytes(b"x")
+
+ result = _resolve_file_patterns(tmp_path, "image")
+
+ assert set(result) == {str(tmp_path / "*.jpg"), str(tmp_path / "*.png")}
+ # No unmatched patterns slip through.
+ assert not any(p.endswith("*.tiff") for p in result)
+ assert not any(p.endswith("*.bmp") for p in result)
+ assert not any(p.endswith("*.jpeg") for p in result)
+
+ # --- error paths -------------------------------------------------------
+ def test_nonexistent_path_raises_bad_parameter(self, tmp_path: Path) -> None:
+ missing = tmp_path / "does_not_exist"
+ with pytest.raises(typer.BadParameter, match="Path does not exist"):
+ _resolve_file_patterns(missing, "pdf")
+
+ def test_unsupported_input_type_raises_bad_parameter(self, tmp_path: Path) -> None:
+ with pytest.raises(typer.BadParameter, match="Unsupported --input-type"):
+ _resolve_file_patterns(tmp_path, "parquet")
+
+ def test_directory_with_no_matches_raises_bad_parameter(self, tmp_path: Path) -> None:
+ # Directory exists but contains nothing matching the pdf glob.
+ (tmp_path / "sidecar.json").write_bytes(b"{}")
+
+ with pytest.raises(typer.BadParameter, match="No files found"):
+ _resolve_file_patterns(tmp_path, "pdf")
+
+ def test_accepts_string_path_not_just_pathlib(self, tmp_path: Path) -> None:
+ (tmp_path / "sample.txt").write_bytes(b"x")
+
+ # Upstream callers sometimes pass a ``str``; the helper coerces to Path.
+ result = _resolve_file_patterns(str(tmp_path), "txt") # type: ignore[arg-type]
+
+ assert result == [str(Path(tmp_path) / "*.txt")]
+
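The behaviour pinned down above (single files short-circuit verbatim; directories expand to per-extension globs; empty globs are filtered out) can be sketched as follows. This is a hedged stand-in for `_resolve_file_patterns`, with an illustrative extension map covering only a few input types, and it returns an empty list where the real helper raises `typer.BadParameter`.

```python
import glob
import tempfile
from pathlib import Path

# Illustrative extension map (subset of the real one, for the sketch only).
_EXT_MAP = {"pdf": ["*.pdf"], "txt": ["*.txt"], "doc": ["*.docx", "*.pptx"]}


def resolve_file_patterns(path: Path, input_type: str) -> list[str]:
    path = Path(path)
    if path.is_file():
        return [str(path)]  # single existing files are returned verbatim
    candidates = [str(path / pat) for pat in _EXT_MAP[input_type]]
    # Keep only globs that match at least one file (the real helper raises
    # when nothing matches; this sketch just returns an empty list).
    return [g for g in candidates if glob.glob(g)]


with tempfile.TemporaryDirectory() as d:
    (Path(d) / "a.docx").write_bytes(b"x")
    print(resolve_file_patterns(Path(d), "doc"))  # only the *.docx glob
```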
+
+# =============================================================================
+# _build_extract_params
+# =============================================================================
+
+
+_EXTRACT_BASE = dict(
+ method="pdfium",
+ dpi=300,
+ extract_text=True,
+ extract_tables=True,
+ extract_charts=True,
+ extract_infographics=False,
+ extract_page_as_image=True,
+ use_graphic_elements=False,
+ use_table_structure=False,
+ table_output_format=None,
+ extract_remote_api_key=None,
+ page_elements_invoke_url=None,
+ ocr_invoke_url=None,
+ graphic_elements_invoke_url=None,
+ table_structure_invoke_url=None,
+ pdf_split_batch_size=1,
+ pdf_extract_batch_size=0,
+ pdf_extract_tasks=0,
+ pdf_extract_cpus_per_task=0.0,
+ page_elements_actors=0,
+ page_elements_batch_size=0,
+ page_elements_cpus_per_actor=0.0,
+ page_elements_gpus_per_actor=None,
+ ocr_actors=0,
+ ocr_batch_size=0,
+ ocr_cpus_per_actor=0.0,
+ ocr_gpus_per_actor=None,
+ nemotron_parse_actors=0,
+ nemotron_parse_batch_size=0,
+ nemotron_parse_gpus_per_actor=None,
+)
+
+
+class TestBuildExtractParams:
+ """Translation from CLI flags to ``ExtractParams`` + ``BatchTuningParams``."""
+
+ def test_returns_extract_params(self):
+ params = _build_extract_params(**_EXTRACT_BASE)
+ assert isinstance(params, ExtractParams)
+ assert params.method == "pdfium"
+ assert params.dpi == 300
+ assert params.extract_text is True
+ assert params.extract_tables is True
+ assert params.extract_charts is True
+ assert params.extract_infographics is False
+ assert params.extract_page_as_image is True
+ assert params.use_graphic_elements is False
+ assert params.use_table_structure is False
+
+ def test_dpi_coerced_to_int(self):
+ # CLI passes an int, but the helper explicitly coerces — cover the branch.
+ overrides = {**_EXTRACT_BASE, "dpi": 150.7} # type: ignore[dict-item]
+ params = _build_extract_params(**overrides)
+ assert params.dpi == 150
+ assert isinstance(params.dpi, int)
+
+ def test_zero_batch_sizes_are_dropped(self):
+ """``... or None`` should drop CLI ``0`` defaults so pydantic defaults win."""
+ params = _build_extract_params(**_EXTRACT_BASE)
+
+ tuning = params.batch_tuning
+ # The BatchTuningParams defaults (non-None) should remain.
+ assert tuning.pdf_extract_batch_size == 4 # pydantic default, not 0
+ assert tuning.page_elements_batch_size == 24
+ # Workers stay None (CLI 0 -> None -> pydantic Optional default None).
+ assert tuning.pdf_extract_workers is None
+ assert tuning.page_elements_workers is None
+ assert tuning.ocr_workers is None
+
+ def test_pdf_split_batch_size_is_always_applied(self):
+ # pdf_split_batch_size lacks ``or None`` so ``1`` gets through verbatim.
+ params = _build_extract_params(**_EXTRACT_BASE)
+ assert params.batch_tuning.pdf_split_batch_size == 1
+
+ overrides = {**_EXTRACT_BASE, "pdf_split_batch_size": 8}
+ params = _build_extract_params(**overrides)
+ assert params.batch_tuning.pdf_split_batch_size == 8
+
+ def test_explicit_worker_counts_flow_through(self):
+ overrides = {
+ **_EXTRACT_BASE,
+ "pdf_extract_tasks": 3,
+ "pdf_extract_batch_size": 16,
+ "pdf_extract_cpus_per_task": 2.5,
+ "page_elements_actors": 4,
+ "page_elements_batch_size": 32,
+ "page_elements_cpus_per_actor": 0.75,
+ "ocr_actors": 2,
+ "ocr_batch_size": 12,
+ "ocr_cpus_per_actor": 1.25,
+ "nemotron_parse_actors": 1,
+ "nemotron_parse_batch_size": 6,
+ }
+ params = _build_extract_params(**overrides)
+
+ tuning = params.batch_tuning
+ assert tuning.pdf_extract_workers == 3
+ assert tuning.pdf_extract_batch_size == 16
+ assert tuning.pdf_extract_num_cpus == 2.5
+ assert tuning.page_elements_workers == 4
+ assert tuning.page_elements_batch_size == 32
+ assert tuning.page_elements_cpus_per_actor == 0.75
+ assert tuning.ocr_workers == 2
+ assert tuning.ocr_inference_batch_size == 12
+ assert tuning.ocr_cpus_per_actor == 1.25
+ assert tuning.nemotron_parse_workers == 1
+ assert tuning.nemotron_parse_batch_size == 6
+
+ # --- GPU gating: remote URL forces gpu_* = 0.0 -----------------------
+ def test_page_elements_invoke_url_forces_zero_gpu(self):
+ overrides = {
+ **_EXTRACT_BASE,
+ "page_elements_invoke_url": "http://pe.example/v1",
+ "page_elements_gpus_per_actor": 0.5, # would be 0.5 without URL
+ }
+ params = _build_extract_params(**overrides)
+ assert params.batch_tuning.gpu_page_elements == 0.0
+ assert params.page_elements_invoke_url == "http://pe.example/v1"
+
+ def test_ocr_invoke_url_forces_zero_gpu(self):
+ overrides = {
+ **_EXTRACT_BASE,
+ "ocr_invoke_url": "http://ocr.example/v1",
+ "ocr_gpus_per_actor": 0.75,
+ }
+ params = _build_extract_params(**overrides)
+ assert params.batch_tuning.gpu_ocr == 0.0
+ assert params.ocr_invoke_url == "http://ocr.example/v1"
+
+ def test_local_gpu_values_flow_through_when_no_remote_url(self):
+ overrides = {
+ **_EXTRACT_BASE,
+ "page_elements_gpus_per_actor": 0.25,
+ "ocr_gpus_per_actor": 0.5,
+ "nemotron_parse_gpus_per_actor": 0.33,
+ }
+ params = _build_extract_params(**overrides)
+ assert params.batch_tuning.gpu_page_elements == 0.25
+ assert params.batch_tuning.gpu_ocr == 0.5
+ assert params.batch_tuning.gpu_nemotron_parse == 0.33
+
+ def test_gpu_none_values_dropped_when_no_remote_url(self):
+ """With no remote URL and gpu=None, the key is dropped and pydantic default applies."""
+ overrides = {
+ **_EXTRACT_BASE,
+ "page_elements_gpus_per_actor": None,
+ "ocr_gpus_per_actor": None,
+ }
+ params = _build_extract_params(**overrides)
+ # BatchTuningParams declares these as Optional[float] with default None.
+ assert params.batch_tuning.gpu_page_elements is None
+ assert params.batch_tuning.gpu_ocr is None
+
+ # --- api_key + endpoints pass through --------------------------------
+ def test_api_key_and_endpoints_pass_through(self):
+ overrides = {
+ **_EXTRACT_BASE,
+ "extract_remote_api_key": "nvapi-secret",
+ "page_elements_invoke_url": "http://pe/v1",
+ "ocr_invoke_url": "http://ocr/v1",
+ "graphic_elements_invoke_url": "http://ge/v1",
+ "table_structure_invoke_url": "http://ts/v1",
+ "table_output_format": "markdown",
+ }
+ params = _build_extract_params(**overrides)
+ assert params.api_key == "nvapi-secret"
+ assert params.page_elements_invoke_url == "http://pe/v1"
+ assert params.ocr_invoke_url == "http://ocr/v1"
+ assert params.graphic_elements_invoke_url == "http://ge/v1"
+ assert params.table_structure_invoke_url == "http://ts/v1"
+ # The model validator auto-enables feature flags when URLs are set.
+ assert params.use_graphic_elements is True
+ assert params.use_table_structure is True
+ assert params.table_output_format == "markdown"
+
+ def test_none_values_are_filtered_before_pydantic(self):
+ """Keys whose CLI value is ``None`` must be omitted, not forwarded as ``None``."""
+ overrides = {
+ **_EXTRACT_BASE,
+ "table_output_format": None,
+ "extract_remote_api_key": None,
+ }
+ params = _build_extract_params(**overrides)
+ # ``table_output_format=None`` gets dropped and the model validator fills
+ # it in based on ``use_table_structure`` (False → "pseudo_markdown").
+ assert params.table_output_format == "pseudo_markdown"
+ # api_key is truly None since it wasn't forced.
+ assert params.api_key is None
+
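The "`... or None` drops CLI zeros so pydantic defaults win" translation tested above can be sketched in isolation. `Tuning` and `build_tuning` here are hypothetical stand-ins (with made-up defaults) for `BatchTuningParams` and `_build_extract_params`, not the real classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Tuning:
    pdf_extract_batch_size: int = 4        # model default wins when CLI sends 0
    pdf_extract_workers: Optional[int] = None


def build_tuning(batch_size: int = 0, workers: int = 0) -> Tuning:
    # Falsy CLI values (0) are turned into None, then all None keys are
    # dropped before construction so the field defaults apply.
    raw = {
        "pdf_extract_batch_size": batch_size or None,
        "pdf_extract_workers": workers or None,
    }
    return Tuning(**{k: v for k, v in raw.items() if v is not None})


print(build_tuning())                          # defaults survive the CLI zeros
print(build_tuning(batch_size=16, workers=3))  # explicit values flow through
```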
+
+# =============================================================================
+# _build_embed_params
+# =============================================================================
+
+
+_EMBED_BASE = dict(
+ embed_model_name="nvidia/test-embed",
+ embed_invoke_url=None,
+ embed_remote_api_key=None,
+ embed_modality="text",
+ text_elements_modality=None,
+ structured_elements_modality=None,
+ embed_granularity="element",
+ embed_actors=0,
+ embed_batch_size=0,
+ embed_cpus_per_actor=0.0,
+ embed_gpus_per_actor=None,
+)
+
+
+class TestBuildEmbedParams:
+ """Translation from CLI flags to ``EmbedParams`` + ``BatchTuningParams``."""
+
+ def test_returns_embed_params(self):
+ params = _build_embed_params(**_EMBED_BASE)
+ assert isinstance(params, EmbedParams)
+ assert params.model_name == "nvidia/test-embed"
+ assert params.embed_modality == "text"
+ assert params.embed_granularity == "element"
+
+ def test_zero_batch_size_dropped_to_pydantic_default(self):
+ params = _build_embed_params(**_EMBED_BASE)
+ # ``embed_batch_size or None`` drops 0; pydantic default (256) wins.
+ assert params.batch_tuning.embed_batch_size == 256
+ # Same for embed_workers (Optional, default None).
+ assert params.batch_tuning.embed_workers is None
+ # inference_batch_size comes from the same ``or None`` pattern;
+ # pydantic default is 32.
+ assert params.inference_batch_size == 32
+
+ def test_explicit_batch_params_flow_through(self):
+ overrides = {
+ **_EMBED_BASE,
+ "embed_actors": 3,
+ "embed_batch_size": 64,
+ "embed_cpus_per_actor": 1.5,
+ }
+ params = _build_embed_params(**overrides)
+ assert params.batch_tuning.embed_workers == 3
+ assert params.batch_tuning.embed_batch_size == 64
+ assert params.batch_tuning.embed_cpus_per_actor == 1.5
+ # ``inference_batch_size`` is also pinned from ``embed_batch_size``.
+ assert params.inference_batch_size == 64
+
+ def test_embed_invoke_url_forces_zero_gpu(self):
+ overrides = {
+ **_EMBED_BASE,
+ "embed_invoke_url": "http://embed.example/v1",
+ "embed_gpus_per_actor": 0.5,
+ "embed_remote_api_key": "nvapi-xyz",
+ }
+ params = _build_embed_params(**overrides)
+ assert params.batch_tuning.gpu_embed == 0.0
+ assert params.embed_invoke_url == "http://embed.example/v1"
+ assert params.api_key == "nvapi-xyz"
+
+ def test_local_embed_gpu_value_flows_through(self):
+ overrides = {**_EMBED_BASE, "embed_gpus_per_actor": 0.25}
+ params = _build_embed_params(**overrides)
+ assert params.batch_tuning.gpu_embed == 0.25
+
+ def test_local_embed_gpu_none_dropped(self):
+ overrides = {**_EMBED_BASE, "embed_gpus_per_actor": None}
+ params = _build_embed_params(**overrides)
+ assert params.batch_tuning.gpu_embed is None
+
+ def test_modality_overrides_pass_through(self):
+ overrides = {
+ **_EMBED_BASE,
+ "embed_modality": "text_image",
+ "text_elements_modality": "text",
+ "structured_elements_modality": "image",
+ "embed_granularity": "element",
+ }
+ params = _build_embed_params(**overrides)
+ assert params.embed_modality == "text_image"
+ assert params.text_elements_modality == "text"
+ assert params.structured_elements_modality == "image"
+
+ def test_invalid_modality_raises_pydantic_validation_error(self):
+ overrides = {**_EMBED_BASE, "embed_modality": "image_text"}
+        with pytest.raises(ValueError, match="text_image"):
+ _build_embed_params(**overrides)
+
+
+# =============================================================================
+# _collect_results (and _count_input_units)
+# =============================================================================
+
+
+class _FakeRayDataset:
+ """Minimal stand-in for a Ray Data dataset produced by ``ingestor.ingest()``."""
+
+ def __init__(self, records: list[dict[str, Any]]) -> None:
+ self._records = records
+ self.take_all_calls = 0
+
+ def take_all(self) -> list[dict[str, Any]]:
+ self.take_all_calls += 1
+ return list(self._records)
+
+
+class TestCollectResults:
+ """Ray (batch) and pandas (inprocess) result materialization."""
+
+ def test_batch_mode_calls_take_all_and_builds_dataframe(self):
+ rows = [
+ {"source_id": "a", "text": "hello"},
+ {"source_id": "a", "text": "world"},
+ {"source_id": "b", "text": "!"},
+ ]
+ fake = _FakeRayDataset(rows)
+
+ records, df, download_time, num_units = _collect_results("batch", fake)
+
+ assert fake.take_all_calls == 1
+ assert records == rows
+ assert isinstance(df, pd.DataFrame)
+ assert list(df.columns) == ["source_id", "text"]
+ assert len(df) == 3
+ # ``source_id`` has two distinct values → that is the unit count.
+ assert num_units == 2
+ # Elapsed time is measured with ``perf_counter``; it must be non-negative.
+ assert download_time >= 0.0
+
+ def test_batch_mode_handles_empty_result(self):
+ fake = _FakeRayDataset([])
+ records, df, download_time, num_units = _collect_results("batch", fake)
+ assert records == []
+ assert df.empty
+ # Empty DataFrame has no columns → falls through to len(df.index) == 0.
+ assert num_units == 0
+ assert download_time >= 0.0
+
+ def test_inprocess_mode_accepts_dataframe_directly(self):
+ rows = [
+ {"source_id": "a", "text": "x"},
+ {"source_id": "b", "text": "y"},
+ ]
+ df_in = pd.DataFrame(rows)
+
+ records, df_out, download_time, num_units = _collect_results("inprocess", df_in)
+
+ # The DataFrame is passed through unchanged (same object).
+ assert df_out is df_in
+ assert records == rows
+ # inprocess mode never incurs Ray download time.
+ assert download_time == 0.0
+ assert num_units == 2
+
+
+class TestCountInputUnits:
+ """``_count_input_units`` fallback chain: source_id -> source_path -> len."""
+
+ def test_prefers_source_id(self):
+ df = pd.DataFrame(
+ {
+ "source_id": ["a", "a", "b"],
+ "source_path": ["/p1", "/p1", "/p2"],
+ "text": ["x", "y", "z"],
+ }
+ )
+ assert _count_input_units(df) == 2
+
+ def test_falls_back_to_source_path(self):
+ df = pd.DataFrame(
+ {
+ "source_path": ["/p1", "/p2", "/p2", "/p3"],
+ "text": ["x", "y", "z", "w"],
+ }
+ )
+ assert _count_input_units(df) == 3
+
+ def test_falls_back_to_len_when_no_source_columns(self):
+ df = pd.DataFrame({"text": ["x", "y", "z", "w"]})
+ assert _count_input_units(df) == 4
+
+ def test_empty_dataframe_without_columns(self):
+ df = pd.DataFrame()
+ assert _count_input_units(df) == 0
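+
+
+# The fallback chain above can be restated as a short illustrative sketch
+# (not the module's actual implementation): prefer distinct ``source_id``
+# values, then distinct ``source_path`` values, then the plain row count.
+def _count_input_units_sketch(df: pd.DataFrame) -> int:
+    if "source_id" in df.columns:
+        return int(df["source_id"].nunique())
+    if "source_path" in df.columns:
+        return int(df["source_path"].nunique())
+    return len(df.index)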
+
+
+# =============================================================================
+# _ensure_lancedb_table
+# =============================================================================
+
+
+class TestEnsureLancedbTable:
+ """Idempotent creation of the LanceDB table used by ``pipeline run``."""
+
+ def test_creates_uri_directory_when_missing(self, tmp_path: Path):
+ uri = tmp_path / "new_lancedb"
+ assert not uri.exists()
+
+ _ensure_lancedb_table(str(uri), "nv-ingest")
+
+ assert uri.exists() and uri.is_dir()
+
+ def test_creates_table_with_lancedb_schema(self, tmp_path: Path):
+ import lancedb
+ from nemo_retriever.vector_store.lancedb_utils import lancedb_schema
+
+ uri = tmp_path / "lancedb"
+ _ensure_lancedb_table(str(uri), "nv-ingest")
+
+ db = lancedb.connect(str(uri))
+ # Table exists and is empty.
+ tbl = db.open_table("nv-ingest")
+ assert tbl.count_rows() == 0
+
+ # The schema matches the canonical lancedb_schema() pa schema.
+ expected = lancedb_schema()
+ actual = tbl.schema
+ assert actual.names == expected.names
+ for name in expected.names:
+ assert actual.field(name).type == expected.field(name).type
+
+ def test_is_idempotent_when_table_already_exists(self, tmp_path: Path):
+ import lancedb
+
+ uri = tmp_path / "lancedb"
+ _ensure_lancedb_table(str(uri), "nv-ingest")
+
+ # Seed one row so we can confirm the table was not recreated/emptied.
+ db = lancedb.connect(str(uri))
+ tbl = db.open_table("nv-ingest")
+ schema = tbl.schema
+ payload = {f.name: [None] for f in schema}
+ payload["page_number"] = [1]
+ payload["text"] = ["seed"]
+ row = pa.table(payload, schema=schema)
+ tbl.add(row)
+ assert tbl.count_rows() == 1
+
+ # Second call must be a no-op — the existing data must survive.
+ _ensure_lancedb_table(str(uri), "nv-ingest")
+ tbl = lancedb.connect(str(uri)).open_table("nv-ingest")
+ assert tbl.count_rows() == 1
+
+ def test_respects_custom_table_name(self, tmp_path: Path):
+ import lancedb
+
+ uri = tmp_path / "lancedb"
+ _ensure_lancedb_table(str(uri), "custom-name")
+
+ db = lancedb.connect(str(uri))
+ # Prefer the non-deprecated ``list_tables`` API; fall back to
+ # ``table_names`` on older LanceDB releases. Both return either a flat
+ # sequence of names or a pydantic-style response with a ``tables`` attr.
+ raw = db.list_tables() if hasattr(db, "list_tables") else db.table_names()
+ names = getattr(raw, "tables", None) or list(raw)
+ assert "custom-name" in names