diff --git a/nemo_retriever/docs/cli/README.md b/nemo_retriever/docs/cli/README.md new file mode 100644 index 000000000..d0aef882e --- /dev/null +++ b/nemo_retriever/docs/cli/README.md @@ -0,0 +1,74 @@ +# Retriever CLI — Replacement Examples for `nv-ingest-cli` + +This folder contains `retriever` command-line examples that deliver the same +end-user outcomes as the `nv-ingest-cli` examples in +`nv-ingest/docs/`, `nv-ingest/api/`, `nv-ingest/client/`, and `nv-ingest/deploy/`. + +The original `nv-ingest-cli` documentation is **not removed** — these files sit +alongside it as a new-CLI counterpart you can link to or migrate to. + +## Key shape difference + +`nv-ingest-cli` is a **single command that talks to a running REST service on +`localhost:7670`** and composes work via repeated `--task extract|split|caption|embed|dedup|filter|udf`. + +`retriever` is a **multi-subcommand Typer app**. Most of the old CLI examples +map to `retriever pipeline run INPUT_PATH`, which runs the graph pipeline +locally (in-process or via Ray) and writes results to LanceDB and, optionally, +to Parquet / object storage. 
Other subcommands cover focused tasks: + +| Old intent | New subcommand | +|------------|----------------| +| Extract + embed + store a batch of documents | `retriever pipeline run` | +| Run an ad-hoc PDF extraction stage | `retriever pdf stage` | +| Run an HTML / text / audio / chart stage | `retriever html run`, `retriever txt run`, `retriever audio extract`, `retriever chart run` | +| Upload stage output to LanceDB | `retriever vector-store stage` | +| Query LanceDB + compute recall@k | `retriever recall vdb-recall` | +| Run a QA evaluation sweep | `retriever eval run` | +| Serve / submit to the online REST API | `retriever online serve` / `retriever online stream-pdf` | +| Benchmark stage throughput | `retriever benchmark {split,extract,audio-extract,page-elements,ocr,all}` | +| Benchmark orchestration | `retriever harness {run,sweep,nightly,summary,compare}` | + +## Contents + +| New file | Replaces example(s) in | +|----------|------------------------| +| [`retriever_cli.md`](retriever_cli.md) | `nv-ingest/docs/docs/extraction/nv-ingest_cli.md` and the rebranded mirror `cli-reference.md` | +| [`quickstart.md`](quickstart.md) | `nv-ingest/docs/docs/extraction/quickstart-guide.md` (the `nv-ingest-cli` section) | +| [`pdf-split-tuning.md`](pdf-split-tuning.md) | `nv-ingest/docs/docs/extraction/v2-api-guide.md` (CLI example) | +| [`smoke-test.md`](smoke-test.md) | `nv-ingest/api/api_tests/smoke_test.sh` | +| [`cli-client-usage.md`](cli-client-usage.md) | `nv-ingest/client/client_examples/examples/cli_client_usage.ipynb` | +| [`pdf-blueprint.md`](pdf-blueprint.md) | `nv-ingest/deploy/pdf-blueprint.ipynb` (CLI cell) | +| [`benchmarking.md`](benchmarking.md) | `nv-ingest/docs/docs/extraction/benchmarking.md` and `nv-ingest/tools/harness/README.md` | + +## Gaps with no retriever-CLI equivalent (kept out of this folder) + +The following `nv-ingest-cli` examples are **not** migrated here because the +new CLI does not yet expose an equivalent — continue to use 
`nv-ingest-cli` +for these cases: + +- `--task 'udf:{…}'` — user-defined functions + (`nv-ingest/docs/docs/extraction/user-defined-functions.md`, + `nv-ingest/examples/udfs/README.md`). `retriever` does not expose UDFs. +- `--task 'filter:{content_type:"image", min_size:…, min_aspect_ratio:…, max_aspect_ratio:…}'`. + The image scale/aspect-ratio filter stage is not reproduced in the new CLI. +- Bare service submission (`nv-ingest-cli --doc foo.pdf` with no extract tasks + and full content-type metadata returned by the service). `retriever online submit` + is currently a stub — only `retriever online stream-pdf` is implemented. +- `gen_dataset.py` dataset creation with enumeration and sampling. +- `--collect_profiling_traces --zipkin_host --zipkin_port`. Use + `--runtime-metrics-dir` / `--runtime-metrics-prefix` instead for a different + metrics flavor. + +## Conventions used in the examples + +- Input paths assume you invoke `retriever` from the `nv-ingest/nemo_retriever` + directory (or point at absolute paths). +- `--save-intermediate ` writes the extraction DataFrame as Parquet for + inspection. LanceDB output goes to `--lancedb-uri` (defaults to `./lancedb`). +- `--store-images-uri ` stores extracted images to a local path or an + fsspec URI (e.g. `s3://bucket/prefix`). +- `--run-mode inprocess` skips Ray and is ideal for single-file demos and CI; + `--run-mode batch` (the default) uses Ray Data for throughput. + +Run `retriever pipeline run --help` for the authoritative flag list. diff --git a/nemo_retriever/docs/cli/benchmarking.md b/nemo_retriever/docs/cli/benchmarking.md new file mode 100644 index 000000000..8f8bad19c --- /dev/null +++ b/nemo_retriever/docs/cli/benchmarking.md @@ -0,0 +1,98 @@ +# Benchmarking with the `retriever` CLI + +This page is the `retriever`-CLI counterpart to +`nv-ingest/docs/docs/extraction/benchmarking.md` and +`nv-ingest/tools/harness/README.md`. 
+ +The old benchmarking workflow is driven by `tools/harness` and +`uv run nv-ingest-harness-run`. The `retriever` CLI exposes the harness (and +per-stage micro-benchmarks) as first-class subcommands, so you can run +benchmarks without `uv run` or a separate harness repo. + +## Harness (end-to-end benchmarks) + +Old: + +```bash +cd tools/harness +uv sync +uv run nv-ingest-harness-run --case=e2e --dataset=bo767 +uv run nv-ingest-harness-run --case=e2e --dataset=/path/to/your/data +``` + +New — the harness is a subcommand on the main CLI (full parity): + +```bash +retriever harness run --case=e2e --dataset=bo767 +retriever harness run --case=e2e --dataset=/path/to/your/data +``` + +Related commands (browse with `--help`): + +```bash +retriever harness --help # run, sweep, nightly, summary, compare +retriever harness run --help +retriever harness sweep --help +retriever harness nightly --help +retriever harness summary --help +retriever harness compare --help +``` + +### Harness with image / text storage + +Old: + +```bash +retriever harness run --dataset bo20 --preset single_gpu \ + --override store_images_uri=stored_images --override store_text=true +``` + +New (unchanged — this form is already the `retriever` CLI): + +```bash +retriever harness run --dataset bo20 --preset single_gpu \ + --override store_images_uri=stored_images --override store_text=true +``` + +When `store_images_uri` is a relative path it resolves to +`artifact_dir/stored_images/` per run; absolute paths and fsspec URIs +(e.g. `s3://bucket/prefix`) are passed through unchanged. 
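That resolution rule is easy to pin down in code. The helper below is a sketch of the behavior described above — it is not the harness's actual implementation, and the function name is ours:

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def resolve_store_uri(value: str, artifact_dir: str) -> str:
    """Sketch of how a store_images_uri override is resolved."""
    if urlparse(value).scheme:       # fsspec URI such as s3://bucket/prefix
        return value                 # passed through unchanged
    path = PurePosixPath(value)
    if path.is_absolute():           # absolute paths also pass through
        return str(path)
    # Relative paths resolve under the per-run artifact directory.
    return str(PurePosixPath(artifact_dir) / path)
```

For example, `resolve_store_uri("stored_images", "/runs/2026-01-01")` resolves under the run's artifact directory, while `s3://bucket/prefix` passes through untouched.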
+ +## Per-stage micro-benchmarks + +The new CLI also exposes stage-level throughput benchmarks that had no direct +counterpart in `nv-ingest-cli`: + +```bash +retriever benchmark --help # split, extract, audio-extract, page-elements, ocr, all +retriever benchmark split --help +retriever benchmark extract --help +retriever benchmark audio-extract --help +retriever benchmark page-elements --help +retriever benchmark ocr --help +retriever benchmark all --help +``` + +Example — benchmark the PDF extraction actor: + +```bash +retriever benchmark extract ./data/pdf_corpus \ + --pdf-extract-batch-size 8 \ + --pdf-extract-actors 4 +``` + +Each benchmark reports rows/sec (or chunk rows/sec for audio) for its actor. +Use these when you want focused numbers for a single stage instead of an +end-to-end run. + +## Parity notes + +- The harness use-cases in the old docs (`--case=e2e`, `--dataset=bo767`, + `--dataset=/path/...`, `--override ...`) are preserved verbatim — only the + launcher changes (`retriever harness run …` instead of + `uv run nv-ingest-harness-run …`). +- If you have a repo-local `uv` environment, `uv run retriever harness run …` + still works. +- Stage benchmarks (`retriever benchmark …`) are net-new relative to the old + `nv-ingest-cli` examples — they are the recommended way to profile + individual actors before tuning `pipeline run` flags. diff --git a/nemo_retriever/docs/cli/cli-client-usage.md b/nemo_retriever/docs/cli/cli-client-usage.md new file mode 100644 index 000000000..2ebdb43d0 --- /dev/null +++ b/nemo_retriever/docs/cli/cli-client-usage.md @@ -0,0 +1,126 @@ +# `retriever` CLI — Client-Usage Walk-through + +This page is the `retriever`-CLI counterpart to +`nv-ingest/client/client_examples/examples/cli_client_usage.ipynb`. + +The original notebook walks through `nv-ingest-cli` by: + +1. Printing `--help`. +2. Submitting a single PDF with `extract + dedup + filter` tasks. +3. Submitting a dataset of PDFs with the same task set. 
+ +The equivalent `retriever` workflow is shown below. You can drop these cells +into a new notebook (e.g. `retriever_client_usage.ipynb`) alongside the old +one. + +## 1. Help + +```bash +retriever --help +retriever pipeline run --help +``` + +Top-level `--help` lists the subcommand tree; `pipeline run --help` shows the +ingest-specific flags you will actually use in this walk-through. + +## 2. Submit a single PDF + +Old notebook cell: + +```bash +nv-ingest-cli \ + --doc ${SAMPLE_PDF0} \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_text": true, "extract_images": true, "extract_tables": true, "extract_tables_method": "yolox"}' \ + --task='dedup:{"content_type": "image", "filter": true}' \ + --task='filter:{"content_type": "image", "min_size": 128, "max_aspect_ratio": 5.0, "min_aspect_ratio": 0.2, "filter": true}' \ + --client_host=${REDIS_HOST} \ + --client_port=${REDIS_PORT} \ + --output_directory=${OUTPUT_DIRECTORY_SINGLE} +``` + +New: + +```bash +retriever pipeline run "${SAMPLE_PDF0}" \ + --input-type pdf \ + --method pdfium \ + --extract-text --extract-tables --extract-charts \ + --dedup --dedup-iou-thres 0.45 \ + --store-images-uri "${OUTPUT_DIRECTORY_SINGLE}/images" \ + --strip-base64 \ + --save-intermediate "${OUTPUT_DIRECTORY_SINGLE}" +``` + +### Parity notes + +- `extract_tables_method:"yolox"` is not a CLI selector — the pipeline picks + its table/structure detectors automatically. Tables are still extracted. +- `dedup:{content_type:"image", filter:true}` maps to `--dedup` (with + `--dedup-iou-thres` for the IoU threshold). +- `filter:{content_type:"image", min_size, min/max_aspect_ratio, filter:true}` + **has no parity.** There is no image scale/aspect-ratio filter in the + `retriever` CLI today. If that matters, drop to the Python API or keep the + old `nv-ingest-cli` for that example. +- `extract_images:true` is implicitly satisfied by `--store-images-uri` + (images are extracted and persisted to the URI). + +## 3. 
Submit a dataset of PDFs + +Old notebook cell: + +```bash +nv-ingest-cli \ + --dataset ${BATCH_FILE} \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_text": true, "extract_images": true, "extract_tables": true, "extract_tables_method": "yolox"}' \ + --task='dedup:{"content_type": "image", "filter": true}' \ + --task='filter:{"content_type": "image", "min_size": 128, "max_aspect_ratio": 5.0, "min_aspect_ratio": 0.2, "filter": true}' \ + --client_host=${REDIS_HOST} \ + --client_port=${REDIS_PORT} \ + --output_directory=${OUTPUT_DIRECTORY_BATCH} +``` + +New — point `retriever` at a directory of PDFs instead of a dataset JSON: + +```bash +# Assume $PDF_DIR is a directory holding your batch of PDFs. +retriever pipeline run "${PDF_DIR}" \ + --input-type pdf \ + --method pdfium \ + --extract-text --extract-tables --extract-charts \ + --dedup --dedup-iou-thres 0.45 \ + --store-images-uri "${OUTPUT_DIRECTORY_BATCH}/images" \ + --strip-base64 \ + --save-intermediate "${OUTPUT_DIRECTORY_BATCH}" +``` + +### Parity notes + +- The `dataset.json` (`sampled_files`) format and `gen_dataset.py` sampler + are not reproduced. Materialize a directory (or glob) containing the files + you want to process. +- The `--shuffle_dataset` knob is not present; set Ray block / batch sizes + via `--pdf-split-batch`, `--pdf-split-batch-size`, etc. for throughput. + +## 4. 
Inspect results + +```python +import pyarrow.parquet as pq +import lancedb + +# Parquet extraction dumps written by --save-intermediate: +df = pq.read_table(OUTPUT_DIRECTORY_BATCH).to_pandas() +print(df[["source_id", "text", "content_type"]].head()) + +# LanceDB rows (default table name "nv-ingest"): +db = lancedb.connect("./lancedb") +tbl = db.open_table("nv-ingest") +print(tbl.to_pandas().head()) +``` + +## Migration summary + +| Old notebook cell | New `retriever` form | Parity | +|-------------------|----------------------|--------| +| `!nv-ingest-cli --help` | `!retriever --help` (plus `retriever pipeline run --help`) | Full | +| Single-file extract + dedup + filter | `retriever pipeline run … --dedup …` | Partial — no image-size/aspect filter, `extract_tables_method` auto-selected | +| Dataset extract + dedup + filter | `retriever pipeline run …` | Partial — no `dataset.json` loader; use a directory | diff --git a/nemo_retriever/docs/cli/pdf-blueprint.md b/nemo_retriever/docs/cli/pdf-blueprint.md new file mode 100644 index 000000000..a82788201 --- /dev/null +++ b/nemo_retriever/docs/cli/pdf-blueprint.md @@ -0,0 +1,92 @@ +# PDF Blueprint — `retriever` CLI Replacement + +This page is the `retriever`-CLI counterpart to the CLI cell in +`nv-ingest/deploy/pdf-blueprint.ipynb`. + +## Original blueprint cell + +```bash +nv-ingest-cli \ + --doc nv-ingest/data/multimodal_test.pdf \ + --output_directory ./processed_docs \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_tables": "true", "extract_images": "true", "extract_charts": "true"}' \ + --client_host=host.docker.internal \ + --client_port=7670 +``` + +This submits the blueprint's multimodal sample PDF to the running ingest +service and asks for text + tables + charts + images. 
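When auditing old notebook cells before migrating them, note that each `--task` payload is plain JSON after the task name. A small helper (ours, not part of either CLI) makes the options explicit:

```python
import json


def parse_task(task_arg: str):
    """Split an nv-ingest-cli --task argument into (name, options) — sketch."""
    name, _, payload = task_arg.partition(":")
    return name, (json.loads(payload) if payload else {})


name, opts = parse_task(
    'extract:{"document_type": "pdf", "extract_method": "pdfium", '
    '"extract_tables": "true", "extract_images": "true", "extract_charts": "true"}'
)
# name == "extract"; opts["extract_method"] == "pdfium"
```

Bare task names with no payload (e.g. `split`) parse to an empty options dict.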
+ +## `retriever` equivalent + +```bash +retriever pipeline run nv-ingest/data/multimodal_test.pdf \ + --input-type pdf \ + --method pdfium \ + --extract-text --extract-tables --extract-charts \ + --store-images-uri ./processed_docs/images \ + --strip-base64 \ + --save-intermediate ./processed_docs +``` + +### What you get (end-user outcome) + +- The same multimodal content (text, table markdown, chart descriptions, + extracted images) is produced. +- Text / table / chart rows land in LanceDB at `./lancedb/nv-ingest.lance`. +- Parquet extraction rows are written under `./processed_docs/`. +- Extracted images are written under `./processed_docs/images/`, referenced by + `content_url` in the row metadata. + +### Notebook-friendly form + +To keep the notebook self-contained, prefix the shell cell with `!`: + +```bash +!retriever pipeline run nv-ingest/data/multimodal_test.pdf \ + --input-type pdf \ + --method pdfium \ + --extract-text --extract-tables --extract-charts \ + --store-images-uri ./processed_docs/images \ + --strip-base64 \ + --save-intermediate ./processed_docs +``` + +And inspect the results in the next cell: + +```python +import pyarrow.parquet as pq +import lancedb + +df = pq.read_table("./processed_docs").to_pandas() +print(df[["source_id", "content_type"]].value_counts()) + +db = lancedb.connect("./lancedb") +tbl = db.open_table("nv-ingest") +print(tbl.to_pandas().head()) +``` + +## Migrating the blueprint `pip install` cell + +The blueprint also installs `nv-ingest-client==25.9.0`. 
For the `retriever` +path, install `nemo-retriever` instead (see `nemo_retriever/README.md` for +current pinned versions): + +```bash +pip install "nemo-retriever==26.3.0" \ + nv-ingest-client==26.3.0 nv-ingest==26.3.0 nv-ingest-api==26.3.0 \ + pymilvus[bulk_writer,model] \ + minio \ + tritonclient \ + langchain_milvus +``` + +## Parity notes + +- `client_host=host.docker.internal` / `client_port=7670` are irrelevant here: + `retriever pipeline run` is in-process, so the blueprint no longer needs a + running `nv-ingest-ms-runtime` container for the CLI cell. +- If you still want the blueprint to hit a live service (for example to + exercise the REST API), replace the CLI cell with a `retriever online serve` + container plus `retriever online stream-pdf` for per-page NDJSON output. + Note that `retriever online submit` is currently a stub. diff --git a/nemo_retriever/docs/cli/pdf-split-tuning.md b/nemo_retriever/docs/cli/pdf-split-tuning.md new file mode 100644 index 000000000..5b2625ef4 --- /dev/null +++ b/nemo_retriever/docs/cli/pdf-split-tuning.md @@ -0,0 +1,68 @@ +# PDF Page-Batch Tuning with the `retriever` CLI + +This page is the `retriever`-CLI counterpart to the CLI example in +`nv-ingest/docs/docs/extraction/v2-api-guide.md`. + +## Background + +The old V2 API introduced per-request PDF splitting via `--api_version v2 --pdf_split_page_count N`: +PDFs larger than `N` pages were split and processed in parallel inside the +service. + +`retriever` has **no V1/V2 API concept** — the graph pipeline is always the +current generation. Page-batch parallelism is tuned directly on the pipeline +via `--pdf-split-batch` (page-batch granularity) and `--pdf-split-batch-size` +(Ray block size). 
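To picture what a page-batch size does to a large document, here is a sketch of the arithmetic (the helper is ours, not pipeline code):

```python
import math


def page_batches(total_pages: int, batch_size: int):
    """Group a document's pages into batches of at most batch_size — sketch."""
    n = math.ceil(total_pages / batch_size)
    return [min(batch_size, total_pages - i * batch_size) for i in range(n)]
```

A 200-page PDF with `--pdf-split-batch 64` yields four work units: `[64, 64, 64, 8]`. Smaller batch sizes mean more units (more parallelism, more overhead); larger ones mean fewer.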
+ +## Replacement example + +Old (V2 API with 64 pages per chunk): + +```bash +nv-ingest-cli \ + --api_version v2 \ + --pdf_split_page_count 64 \ + --doc large_document.pdf \ + --task 'extract:{"document_type":"pdf", "extract_text":true}' \ + --output_directory ./results +``` + +New: + +```bash +retriever pipeline run large_document.pdf \ + --input-type pdf \ + --method pdfium \ + --extract-text --no-extract-tables --no-extract-charts \ + --pdf-split-batch 64 \ + --save-intermediate ./results +``` + +### What `--pdf-split-batch` does + +`--pdf-split-batch` controls how many pages are grouped into one batch passed +downstream to the OCR / page-elements / extraction actors. Smaller values give +more parallelism but more overhead; larger values amortize overhead but limit +concurrency — the same trade-off the V2 `--pdf_split_page_count` flag exposed. + +For very wide fan-out you can also increase the number of concurrent actors: + +```bash +retriever pipeline run large_document.pdf \ + --input-type pdf \ + --pdf-split-batch 64 \ + --page-elements-actors 4 \ + --ocr-actors 4 \ + --embed-actors 2 \ + --save-intermediate ./results +``` + +## Parity notes + +- The pipeline always runs through the current ingest graph — there is no + `--api_version` flag. +- `--pdf-split-batch` is a batching knob on the local Ray pipeline, not a + server-side splitter. End-user outcome (parallel extraction of a large PDF) + is preserved. +- Use `retriever pipeline run --help` to see related tuning flags + (`--pdf-split-batch-size`, `--pdf-extract-batch-size`, etc.). diff --git a/nemo_retriever/docs/cli/quickstart.md b/nemo_retriever/docs/cli/quickstart.md new file mode 100644 index 000000000..0d2a35554 --- /dev/null +++ b/nemo_retriever/docs/cli/quickstart.md @@ -0,0 +1,83 @@ +# Quick Start — `retriever` CLI + +This page is the `retriever`-CLI counterpart to the CLI section of +`nv-ingest/docs/docs/extraction/quickstart-guide.md`. 
+ +## Replacement for the quickstart CLI example + +The original quickstart example submits a single PDF to the running service +and asks for text, tables, charts, and images: + +```bash +nv-ingest-cli \ + --doc ./data/multimodal_test.pdf \ + --output_directory ./processed_docs \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_tables": "true", "extract_images": "true", "extract_charts": "true"}' \ + --client_host=localhost \ + --client_port=7670 +``` + +The `retriever` equivalent runs the full pipeline locally — extraction, +embedding, and LanceDB upload — and produces the same multimodal outputs: + +```bash +retriever pipeline run ./data/multimodal_test.pdf \ + --input-type pdf \ + --method pdfium \ + --extract-text --extract-tables --extract-charts \ + --store-images-uri ./processed_docs/images \ + --save-intermediate ./processed_docs +``` + +### What you get + +- Extracted text, table markdown, and chart descriptions as rows in the + LanceDB table at `./lancedb/nv-ingest.lance` (default `--lancedb-uri`). +- Per-document extraction rows as Parquet under `./processed_docs/` (from + `--save-intermediate`). +- Extracted images on disk under `./processed_docs/images/` (from + `--store-images-uri`). The `content_url` column points at these paths. +- Progress, timing, and stage-level logs on stderr. 
+ +### Inspect the results + +```bash +ls ./processed_docs +ls ./processed_docs/images +ls ./lancedb +``` + +For programmatic access: + +```python +import pyarrow.parquet as pq +import lancedb + +df = pq.read_table("./processed_docs").to_pandas() +print(df.head()) + +db = lancedb.connect("./lancedb") +tbl = db.open_table("nv-ingest") +print(tbl.to_pandas().head()) +``` + +Or query via the Retriever Python client (same workflow as the library +quickstart in `nemo_retriever/README.md`): + +```python +from nemo_retriever.retriever import Retriever + +retriever = Retriever(lancedb_uri="lancedb", lancedb_table="nv-ingest", top_k=5) +hits = retriever.query( + "Given their activities, which animal is responsible for the typos?" +) +``` + +## Notes on running larger datasets + +- Pass a directory for batch ingestion: + `retriever pipeline run ./data/pdf_corpus --input-type pdf …`. +- For faster throughput on a multi-GPU node, keep `--run-mode batch` (default, + Ray-based) and tune `--pdf-split-batch`, `--embed-actors`, + `--embed-batch-size`, `--ocr-actors`, and `--page-elements-actors`. +- For debugging or CI, use `--run-mode inprocess` to avoid starting Ray. diff --git a/nemo_retriever/docs/cli/retriever_cli.md b/nemo_retriever/docs/cli/retriever_cli.md new file mode 100644 index 000000000..59f7bc801 --- /dev/null +++ b/nemo_retriever/docs/cli/retriever_cli.md @@ -0,0 +1,265 @@ +# Use the Retriever Command Line Interface + +This page is the `retriever`-CLI counterpart to +`nv-ingest/docs/docs/extraction/nv-ingest_cli.md` and +`nv-ingest/docs/docs/extraction/cli-reference.md`. + +The original `nv-ingest-cli` docs remain valid for service-based ingestion. +The examples below show how to obtain the same end-user outcomes with the new +`retriever` Typer app (the `retriever` executable installed with the +`nemo-retriever` package). + +> **Shape difference.** `nv-ingest-cli` is a single command that submits +> `--task ...` definitions to a running REST service. 
`retriever` is a +> multi-subcommand app. The main replacement for document ingestion is +> `retriever pipeline run INPUT_PATH`, which runs the full graph pipeline +> locally (in-process or Ray) and writes rows to LanceDB + optional Parquet. + +To check the installed version: + +```bash +retriever --version +``` + +To list the top-level subcommands: + +```bash +retriever --help +``` + +To see the full `pipeline run` flag surface (the closest analogue to the old +single-command CLI): + +```bash +retriever pipeline run --help +``` + +## Examples + +### Example: Text / PDF file — extract with defaults + +Old (`nv-ingest-cli`, returns full metadata over the service): + +```bash +nv-ingest-cli \ + --doc ./data/test.pdf \ + --client_host=localhost \ + --client_port=7670 +``` + +New (`retriever`, local pipeline; text, tables, and charts are extracted by +default): + +```bash +retriever pipeline run ./data/test.pdf \ + --input-type pdf \ + --run-mode inprocess \ + --save-intermediate ./processed_docs +``` + +**Parity note.** Results are written to LanceDB (`./lancedb` by default) and, +with `--save-intermediate`, to a Parquet file under `./processed_docs`. +The old per-content-type `*.metadata.json` tree is not produced; use +`pyarrow.parquet.read_table` or LanceDB queries to inspect rows. + +### Example: PDF with splitting only + +The old `--task='split'` example submitted a split-only job to the service: + +```bash +nv-ingest-cli \ + --doc ./data/test.pdf \ + --output_directory ./processed_docs \ + --task='split' \ + --client_host=localhost \ + --client_port=7670 +``` + +With `retriever`, splitting is intrinsic to the pipeline. 
Control text-level +chunking via `--text-chunk`, and control PDF page-batch sizing via +`--pdf-split-batch`: + +```bash +retriever pipeline run ./data/test.pdf \ + --input-type pdf \ + --no-extract-tables --no-extract-charts \ + --text-chunk --text-chunk-max-tokens 512 --text-chunk-overlap-tokens 64 \ + --save-intermediate ./processed_docs +``` + +**Parity note.** There is no "split-only, no extraction" mode. If you only +care about chunk boundaries, run the pipeline with extraction narrowed to text +and inspect the resulting chunks. + +### Example: PDF with splitting and extraction + +Old: + +```bash +nv-ingest-cli \ + --doc ./data/test.pdf \ + --output_directory ./processed_docs \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium"}' \ + --task='extract:{"document_type": "docx", "extract_method": "python_docx"}' \ + --task='split' \ + --client_host=localhost \ + --client_port=7670 +``` + +New — run once per input type (PDF and docx/pptx use different `--input-type`): + +```bash +retriever pipeline run ./data/test.pdf \ + --input-type pdf \ + --method pdfium \ + --text-chunk --text-chunk-max-tokens 512 \ + --save-intermediate ./processed_docs +``` + +```bash +retriever pipeline run ./data/test.docx \ + --input-type doc \ + --text-chunk --text-chunk-max-tokens 512 \ + --save-intermediate ./processed_docs +``` + +**Parity note.** `--input-type doc` matches `*.docx` and `*.pptx` +(see `src/nemo_retriever/pipeline/__main__.py::_resolve_file_patterns`). Mixed +PDF + docx in a single invocation is not supported; invoke once per type. 
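The `--text-chunk-max-tokens` / `--text-chunk-overlap-tokens` pair above follows the usual sliding-window pattern. A sketch of the spans it implies (illustrative only — the pipeline's actual splitter is internal and may differ at boundaries):

```python
def chunk_spans(n_tokens: int, max_tokens: int = 512, overlap: int = 64):
    """Token spans from fixed-size chunking with overlap — sketch."""
    step = max_tokens - overlap
    spans, start = [], 0
    while True:
        end = min(start + max_tokens, n_tokens)
        spans.append((start, end))
        if end == n_tokens:      # final chunk reaches the end of the text
            break
        start += step            # next chunk starts `overlap` tokens early
    return spans
```

For a 1000-token document with the defaults above, this yields three overlapping chunks.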
+ +### Example: PDF with a custom page-batch size + +Old: + +```bash +nv-ingest-cli \ + --doc ./data/test.pdf \ + --output_directory ./processed_docs \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_text": "true"}' \ + --pdf_split_page_count 64 \ + --api_version v2 \ + --client_host=localhost \ + --client_port=7670 +``` + +New: + +```bash +retriever pipeline run ./data/test.pdf \ + --input-type pdf \ + --method pdfium \ + --extract-text --no-extract-tables --no-extract-charts \ + --pdf-split-batch 64 \ + --save-intermediate ./processed_docs +``` + +**Parity note.** There is no v1/v2 concept in `retriever`; `--pdf-split-batch` +is the batching knob used by the Ray pipeline. + +### Example: Caption images + +Old: + +```bash +nv-ingest-cli \ + --doc ./data/test.pdf \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium", "extract_images": "true"}' \ + --task='caption:{"prompt": "Caption the content of this image:", "reasoning": true}' \ + --client_host=localhost \ + --client_port=7670 +``` + +New: + +```bash +retriever pipeline run ./data/test.pdf \ + --input-type pdf \ + --method pdfium \ + --caption \ + --caption-model-name nvidia/NVIDIA-Nemotron-Nano-VL-8B-V2 \ + --caption-invoke-url https://integrate.api.nvidia.com/v1/chat/completions \ + --api-key "${NVIDIA_API_KEY}" \ + --store-images-uri ./processed_docs/images \ + --save-intermediate ./processed_docs +``` + +**Parity gaps.** + +- The `reasoning: true` option is not exposed as a CLI flag. +- A custom `prompt` is not exposed either; the caption stage uses its default + prompt. For prompt or reasoning control, drop to the Python API + (`nemo_retriever.ingestor.Ingestor.caption(...)`). +- If you do not set a caption endpoint / local GPU profile, the caption stage + is skipped at runtime — matching the old behavior. 
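For orientation, the caption endpoint above is an OpenAI-style chat-completions API. The payload below is an illustrative sketch of the kind of request such endpoints accept — treat the exact field shapes as assumptions and defer to the endpoint's own documentation:

```python
import base64


def caption_payload(image_bytes: bytes, model: str,
                    prompt: str = "Caption the content of this image:"):
    """Build an illustrative chat-completions request for a VLM endpoint."""
    b64 = base64.b64encode(image_bytes).decode("ascii")
    return {
        "model": model,
        "messages": [{
            "role": "user",
            # Some NVIDIA VLM endpoints accept inline base64 images like this;
            # the inline-HTML form is an assumption here, not a guarantee.
            "content": f'{prompt} <img src="data:image/png;base64,{b64}" />',
        }],
        "max_tokens": 256,
    }
```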
+ +### Example: Process a directory of documents + +Old (dataset file with sampled entries): + +```bash +nv-ingest-cli \ + --dataset dataset.json \ + --output_directory ./processed_docs \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium"}' \ + --client_host=localhost \ + --client_port=7670 +``` + +New (point `retriever` at a directory; it globs files for the given +`--input-type`): + +```bash +retriever pipeline run ./data/pdf_corpus \ + --input-type pdf \ + --method pdfium \ + --save-intermediate ./processed_docs +``` + +**Parity gap.** The `dataset.json` (`sampled_files`) schema and +`gen_dataset.py` sampler are not reproduced. Materialize a directory (or glob) +that contains the files you want. + +### Example: Upload extracted images to object storage + +Old: extraction + a MinIO upload configured on the service side. + +```bash +nv-ingest-cli \ + --doc ./data/test.pdf \ + --output_directory ./processed_docs \ + --task='extract:{"document_type": "pdf", "extract_method": "pdfium"}' \ + --client_host=localhost \ + --client_port=7670 +``` + +New — use the first-class `--store-images-uri` flag (local path, `s3://…`, or +any fsspec URI): + +```bash +retriever pipeline run ./data/test.pdf \ + --input-type pdf \ + --method pdfium \ + --store-images-uri s3://my-bucket/images \ + --strip-base64 \ + --save-intermediate ./processed_docs +``` + +**Parity note.** `--strip-base64` removes inline base64 from rows after the +image has been persisted, so downstream consumers follow `content_url` +references — analogous to `--save_images_separately` on the old CLI. + +## Where results live + +- **LanceDB table.** `--lancedb-uri lancedb` (default). Query via + `retriever recall vdb-recall …` or the `nemo_retriever.retriever.Retriever` + Python class. +- **Parquet (optional).** `--save-intermediate ` writes the extraction + DataFrame for inspection. +- **Images (optional).** `--store-images-uri ` writes extracted images to + a local path or object store. 
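When driving ingestion from a script or CI job, build on the process exit status. A minimal generic wrapper (the helper is ours; the commented invocation assumes `retriever` is on your `PATH`):

```python
import subprocess
import sys


def run_cli(cmd: list[str]) -> int:
    """Run a command and return its exit code (0 = success)."""
    return subprocess.run(cmd).returncode


# Hypothetical usage:
# code = run_cli(["retriever", "pipeline", "run", "./data/test.pdf",
#                 "--input-type", "pdf", "--run-mode", "inprocess"])
# if code != 0:
#     sys.exit(code)
```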
+ +## Errors and exit codes + +`retriever pipeline run` exits **0** on success, **non-zero** on Typer +validation errors (bad `--input-type`, missing `INPUT_PATH`, etc.) or pipeline +failures. Use `--debug` or `--log-file ` for detailed diagnostics. diff --git a/nemo_retriever/docs/cli/smoke-test.md b/nemo_retriever/docs/cli/smoke-test.md new file mode 100644 index 000000000..e5b5f4cd0 --- /dev/null +++ b/nemo_retriever/docs/cli/smoke-test.md @@ -0,0 +1,162 @@ +# Multi-format Smoke Test with the `retriever` CLI + +This page is the `retriever`-CLI counterpart to +`nv-ingest/api/api_tests/smoke_test.sh`. + +The original script loops over formats (pdf, jpeg, png, tiff, bmp, wav, pptx, +docx) and submits each file to the running ingest service via +`nv-ingest-cli`, counting PASS/FAIL and printing a table. + +With `retriever`, each format is driven by a single `--input-type` dispatch. +The same `exit_code` logic works — `retriever` returns non-zero on failure. + +## Replacement script + +```bash +#!/bin/bash +# smoke_test_retriever.sh — multi-format parity smoke test for `retriever`. + +set -u + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +BASE_DIR="$( dirname "$SCRIPT_DIR" )" + +OUTPUT_DIR="./processed/smoke_test_retriever" +DATA_DIR="${BASE_DIR}/../data" +mkdir -p "$OUTPUT_DIR" + +LOG_DIR="./extraction_logs_retriever" +mkdir -p "$LOG_DIR" +LOG_FILE="$LOG_DIR/extraction_test_$(date +%Y%m%d_%H%M%S).log" +TEMP_LOG_DIR="$LOG_DIR/temp_logs" +mkdir -p "$TEMP_LOG_DIR" + +echo "====== Multimodal Extraction Test (retriever) - $(date) ======" | tee -a "$LOG_FILE" + +run_extraction() { + local file_ext=$1 + local input_type=$2 + local test_file="$DATA_DIR/multimodal_test.$file_ext" + local temp_log="$TEMP_LOG_DIR/${file_ext}_extraction.log" + local result_file="$TEMP_LOG_DIR/${file_ext}_result" + + { + echo "====== Testing $file_ext (input-type=$input_type) ======" + echo "File: $test_file" + echo "Started at: $(date)" + + if [ ! 
-f "$test_file" ]; then + echo "ERROR: File $test_file not found. Skipping test." + echo "1" > "$result_file" + return 1 + fi + + start_time=$(date +%s) + + retriever pipeline run "$test_file" \ + --input-type "$input_type" \ + --run-mode inprocess \ + --extract-text --extract-tables --extract-charts --extract-infographics \ + --save-intermediate "$OUTPUT_DIR/${file_ext}" \ + --lancedb-uri "$OUTPUT_DIR/lancedb_${file_ext}" \ + 2>&1 + + exit_code=$? + end_time=$(date +%s) + duration=$((end_time - start_time)) + + echo "Exit code: $exit_code" + echo "Duration: $duration seconds" + echo "--------------------------------------------------" + + echo "$exit_code" > "$result_file" + } > "$temp_log" 2>&1 + + exit_code=$(cat "$result_file") + return $exit_code +} + +declare -A formats=( + ["pdf"]="pdf" + ["jpeg"]="image" + ["png"]="image" + ["tiff"]="image" + ["bmp"]="image" + ["wav"]="audio" + ["pptx"]="doc" + ["docx"]="doc" +) + +total_formats=0 +successful_formats=0 +declare -A test_results=() +declare -A test_durations=() + +for ext in "${!formats[@]}"; do + input_type="${formats[$ext]}" + ((total_formats++)) + + run_extraction "$ext" "$input_type" & + pid=$! + pids+=($pid) + format_pids["$pid"]="$ext" +done + +for pid in "${pids[@]}"; do + wait $pid + exit_code=$? 
+    format="${format_pids[$pid]}"
+
+    temp_log="$TEMP_LOG_DIR/${format}_extraction.log"
+    duration="N/A"
+    if [ -f "$temp_log" ]; then
+        duration=$(grep "Duration:" "$temp_log" | awk '{print $2}')
+        test_durations["$format"]="$duration"
+    fi
+
+    if [ $exit_code -eq 0 ]; then
+        ((successful_formats++))
+        test_results["$format"]="PASS"
+    else
+        test_results["$format"]="FAIL"
+        echo "====== FAILED: $format extraction ======" | tee -a "$LOG_FILE"
+        cat "$temp_log" | tee -a "$LOG_FILE"
+    fi
+done
+
+echo "====== SUMMARY ======" | tee -a "$LOG_FILE"
+printf "%-10s %-8s %s\n" "FORMAT" "RESULT" "SECONDS" | tee -a "$LOG_FILE"
+for format in "${!test_results[@]}"; do
+    printf "%-10s %-8s %s\n" "$format" "${test_results[$format]}" "${test_durations[$format]:-N/A}" | tee -a "$LOG_FILE"
+done
+echo "Total formats tested: $total_formats" | tee -a "$LOG_FILE"
+echo "Successful extractions: $successful_formats" | tee -a "$LOG_FILE"
+echo "Failed extractions: $((total_formats - successful_formats))" | tee -a "$LOG_FILE"
+
+if [ $successful_formats -eq $total_formats ]; then
+    echo "All tests passed successfully!" | tee -a "$LOG_FILE"
+    exit 0
+else
+    echo "Some extractions failed. See $LOG_FILE for details." | tee -a "$LOG_FILE"
+    exit 1
+fi
+```
+
+## Format-to-`--input-type` mapping
+
+| File extension | `--input-type` |
+|----------------|----------------|
+| `.pdf` | `pdf` |
+| `.jpg`, `.jpeg`, `.png`, `.tiff`, `.bmp` | `image` |
+| `.mp3`, `.wav`, `.m4a` | `audio` |
+| `.docx`, `.pptx` | `doc` |
+| `.txt` | `txt` |
+| `.html` | `html` |
+
+(See `nemo_retriever/src/nemo_retriever/pipeline/__main__.py::_resolve_file_patterns`.)
+
+## Parity notes
+
+- The old script required a running ingest service at `localhost:7670`. The
+  `retriever` run is self-contained — no service needed. This is usually an
+  improvement for CI smoke tests.
+- PASS / FAIL semantics and the per-format log layout are preserved.
+- `--run-mode inprocess` is recommended for smoke tests: it skips Ray startup
+  and gives a cleaner failure mode.
+- Each format gets its own `--lancedb-uri` so parallel runs do not contend on
+  the same LanceDB directory.
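
For CI jobs that prefer Python over bash, the same dispatch-and-count logic can be sketched as below. This is a hypothetical helper, not part of the repo: it assumes the `retriever` CLI is on `PATH` and reuses only the flags documented above (`--input-type`, `--run-mode`, `--lancedb-uri`); the function and constant names are illustrative.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

# Mirrors the format-to-`--input-type` table above.
EXTENSION_TO_INPUT_TYPE = {
    ".pdf": "pdf",
    ".jpg": "image", ".jpeg": "image", ".png": "image",
    ".tiff": "image", ".bmp": "image",
    ".mp3": "audio", ".wav": "audio", ".m4a": "audio",
    ".docx": "doc", ".pptx": "doc",
    ".txt": "txt",
    ".html": "html",
}


def resolve_input_type(path: str) -> str:
    """Map a file to the --input-type value the smoke test would pass."""
    ext = Path(path).suffix.lower()
    if ext not in EXTENSION_TO_INPUT_TYPE:
        raise ValueError(f"no --input-type mapping for {ext!r}")
    return EXTENSION_TO_INPUT_TYPE[ext]


def run_one(path: str, out_dir: str = "processed/smoke_test_retriever") -> bool:
    """Run one file through `retriever pipeline run`; True iff exit code is 0."""
    stem = Path(path).suffix.lstrip(".")
    proc = subprocess.run(
        [
            "retriever", "pipeline", "run", path,
            "--input-type", resolve_input_type(path),
            "--run-mode", "inprocess",
            # Per-format LanceDB dirs, so parallel runs do not contend.
            "--lancedb-uri", f"{out_dir}/lancedb_{stem}",
        ],
        capture_output=True,
        text=True,
    )
    return proc.returncode == 0


def smoke_test(files: list[str], max_workers: int = 4) -> dict[str, str]:
    """Run files in parallel and return a {file: "PASS" | "FAIL"} table."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(run_one, files))
    return {f: ("PASS" if ok else "FAIL") for f, ok in zip(files, results)}
```

As in the bash version, overall success is simply "every entry is PASS"; a caller can `sys.exit(0 if all(...) else 1)` on the returned table.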
diff --git a/nemo_retriever/src/nemo_retriever/adapters/cli/main.py b/nemo_retriever/src/nemo_retriever/adapters/cli/main.py index 1d2ad5bfc..4603980d3 100644 --- a/nemo_retriever/src/nemo_retriever/adapters/cli/main.py +++ b/nemo_retriever/src/nemo_retriever/adapters/cli/main.py @@ -17,6 +17,7 @@ from nemo_retriever.local import app as local_app from nemo_retriever.online import __main__ as online_main from nemo_retriever.pdf import app as pdf_app +from nemo_retriever.pipeline import __main__ as pipeline_main from nemo_retriever.recall import app as recall_app from nemo_retriever.txt import __main__ as txt_main from nemo_retriever.vector_store import app as vector_store_app @@ -37,6 +38,7 @@ app.add_typer(txt_main.app, name="txt") app.add_typer(html_main.app, name="html") app.add_typer(online_main.app, name="online") +app.add_typer(pipeline_main.app, name="pipeline") def _version_callback(value: bool) -> None: diff --git a/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py b/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py index 3e1091778..30bd29b53 100644 --- a/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py +++ b/nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py @@ -2,767 +2,27 @@ # All rights reserved. # SPDX-License-Identifier: Apache-2.0 -"""Graph-based ingestion pipeline using the operator graph API directly. +"""Backward-compat shim for the graph ingestion pipeline. -This script uses :class:`~nemo_retriever.graph_ingestor.GraphIngestor` which -builds an operator :class:`~nemo_retriever.graph.Graph` via -:func:`~nemo_retriever.graph.ingestor_runtime.build_graph` (Ray Data) -or :func:`~nemo_retriever.graph.ingestor_runtime.build_inprocess_graph` -(single-process pandas) and then calls the appropriate executor. +The implementation was moved to :mod:`nemo_retriever.pipeline` and is exposed +as the ``retriever pipeline run`` CLI subcommand. 
-Run with::
+
+This module re-exports the same Typer :data:`app` and keeps the
+``python -m nemo_retriever.examples.graph_pipeline`` entry point
+working so existing callers (notably
+:mod:`nemo_retriever.harness.run`) do not need to change.
 
-    source /opt/retriever_runtime/bin/activate
-    python -m nemo_retriever.examples.graph_pipeline [OPTIONS]
+New code should invoke the pipeline via one of the following:
 
-Examples::
-
-    # Batch mode (Ray) with PDF extraction + embedding
-    python -m nemo_retriever.examples.graph_pipeline /data/pdfs \\
-        --run-mode batch \\
-        --embed-invoke-url http://localhost:8000/v1
-
-    # In-process mode (no Ray) for quick local testing
-    python -m nemo_retriever.examples.graph_pipeline /data/pdfs \\
-        --run-mode inprocess \\
-        --ocr-invoke-url http://localhost:9000/v1
-
-    # Save extraction Parquet for full-page markdown (e.g. page index + export)
-    python -m nemo_retriever.examples.graph_pipeline /data/pdfs \\
-        --lancedb-uri lancedb \\
-        --save-intermediate /path/to/extracted_parquet_dir
+* ``retriever pipeline run INPUT_PATH [OPTIONS]``
+* ``python -m nemo_retriever.pipeline INPUT_PATH [OPTIONS]``
+* ``from nemo_retriever.pipeline import app`` (Typer app) or
+  ``from nemo_retriever.pipeline import run`` (command callable)
 """
 
 from __future__ import annotations
 
-import json
-import logging
-import os
-import time
-from pathlib import Path
-from typing import Optional, TextIO
-
-import typer
-
-from nemo_retriever.audio import asr_params_from_env
-from nemo_retriever.graph_ingestor import GraphIngestor
-from nemo_retriever.params import AudioChunkParams
-from nemo_retriever.params import CaptionParams
-from nemo_retriever.params import DedupParams
-from nemo_retriever.params import EmbedParams
-from nemo_retriever.params import ExtractParams
-from nemo_retriever.params import StoreParams
-from nemo_retriever.params import TextChunkParams
-from nemo_retriever.model import VL_EMBED_MODEL, VL_RERANK_MODEL
-from nemo_retriever.params.models import BatchTuningParams
-from nemo_retriever.utils.input_files import resolve_input_patterns -from nemo_retriever.utils.remote_auth import resolve_remote_api_key -from nemo_retriever.vector_store.lancedb_store import handle_lancedb - -logger = logging.getLogger(__name__) -app = typer.Typer() - -LANCEDB_URI = "lancedb" -LANCEDB_TABLE = "nv-ingest" - - -# --------------------------------------------------------------------------- -# Logging helpers -# --------------------------------------------------------------------------- - - -class _TeeStream: - def __init__(self, primary: TextIO, mirror: TextIO) -> None: - self._primary = primary - self._mirror = mirror - - def write(self, data: str) -> int: - self._primary.write(data) - self._mirror.write(data) - return len(data) - - def flush(self) -> None: - self._primary.flush() - self._mirror.flush() - - def isatty(self) -> bool: - return bool(getattr(self._primary, "isatty", lambda: False)()) - - def fileno(self) -> int: - return int(getattr(self._primary, "fileno")()) - - def writable(self) -> bool: - return bool(getattr(self._primary, "writable", lambda: True)()) - - @property - def encoding(self) -> str: - return str(getattr(self._primary, "encoding", "utf-8")) - - -def _configure_logging(log_file: Optional[Path], *, debug: bool = False) -> tuple[Optional[TextIO], TextIO, TextIO]: - original_stdout = os.sys.stdout - original_stderr = os.sys.stderr - log_level = logging.DEBUG if debug else logging.INFO - if log_file is None: - logging.basicConfig( - level=log_level, - format="%(asctime)s %(levelname)s %(name)s: %(message)s", - force=True, - ) - return None, original_stdout, original_stderr - - target = Path(log_file).expanduser().resolve() - target.parent.mkdir(parents=True, exist_ok=True) - fh = open(target, "a", encoding="utf-8", buffering=1) - os.sys.stdout = _TeeStream(os.sys.__stdout__, fh) - os.sys.stderr = _TeeStream(os.sys.__stderr__, fh) - logging.basicConfig( - level=log_level, - format="%(asctime)s %(levelname)s %(name)s: 
%(message)s", - handlers=[logging.StreamHandler(os.sys.stdout)], - force=True, - ) - logger.info("Writing combined pipeline logs to %s", str(target)) - return fh, original_stdout, original_stderr - - -def _ensure_lancedb_table(uri: str, table_name: str) -> None: - from nemo_retriever.vector_store.lancedb_utils import lancedb_schema - import lancedb - import pyarrow as pa - - Path(uri).mkdir(parents=True, exist_ok=True) - db = lancedb.connect(uri) - try: - db.open_table(table_name) - return - except Exception: - pass - schema = lancedb_schema() - empty = pa.table({f.name: [] for f in schema}, schema=schema) - db.create_table(table_name, data=empty, schema=schema, mode="create") - - -def _write_runtime_summary( - runtime_metrics_dir: Optional[Path], - runtime_metrics_prefix: Optional[str], - payload: dict[str, object], -) -> None: - if runtime_metrics_dir is None and not runtime_metrics_prefix: - return - - target_dir = Path(runtime_metrics_dir or Path.cwd()).expanduser().resolve() - target_dir.mkdir(parents=True, exist_ok=True) - prefix = (runtime_metrics_prefix or "run").strip() or "run" - target = target_dir / f"{prefix}.runtime.summary.json" - target.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") - - -def _count_input_units(result_df) -> int: - if "source_id" in result_df.columns: - return int(result_df["source_id"].nunique()) - if "source_path" in result_df.columns: - return int(result_df["source_path"].nunique()) - return int(len(result_df.index)) - - -def _resolve_file_patterns(input_path: Path, input_type: str) -> list[str]: - import glob as _glob - - input_path = Path(input_path) - if input_path.is_file(): - return [str(input_path)] - if not input_path.is_dir(): - raise typer.BadParameter(f"Path does not exist: {input_path}") - - if input_type not in {"pdf", "doc", "txt", "html", "image", "audio"}: - raise typer.BadParameter(f"Unsupported --input-type: {input_type!r}") - - patterns = resolve_input_patterns(input_path, 
input_type) - matched = [p for p in patterns if _glob.glob(p, recursive=True)] - if not matched: - raise typer.BadParameter(f"No files found for input_type={input_type!r} in {input_path}") - return matched - - -# --------------------------------------------------------------------------- -# CLI command -# --------------------------------------------------------------------------- - - -@app.command() -def main( - ctx: typer.Context, - input_path: Path = typer.Argument( - ..., - help="File or directory of documents to ingest.", - path_type=Path, - ), - run_mode: str = typer.Option( - "batch", - "--run-mode", - help="Execution mode: 'batch' (Ray Data) or 'inprocess' (pandas, no Ray).", - ), - debug: bool = typer.Option(False, "--debug/--no-debug", help="Enable debug-level logging."), - dpi: int = typer.Option(300, "--dpi", min=72, help="Render DPI for PDF page images."), - input_type: str = typer.Option( - "pdf", "--input-type", help="Input type: 'pdf', 'doc', 'txt', 'html', 'image', or 'audio'." 
- ), - method: str = typer.Option("pdfium", "--method", help="PDF text extraction method."), - extract_text: bool = typer.Option(True, "--extract-text/--no-extract-text"), - extract_tables: bool = typer.Option(True, "--extract-tables/--no-extract-tables"), - extract_charts: bool = typer.Option(True, "--extract-charts/--no-extract-charts"), - extract_infographics: bool = typer.Option(False, "--extract-infographics/--no-extract-infographics"), - extract_page_as_image: bool = typer.Option(True, "--extract-page-as-image/--no-extract-page-as-image"), - use_graphic_elements: bool = typer.Option(False, "--use-graphic-elements"), - use_table_structure: bool = typer.Option(False, "--use-table-structure"), - table_output_format: Optional[str] = typer.Option(None, "--table-output-format"), - # Remote endpoints - api_key: Optional[str] = typer.Option(None, "--api-key", help="Bearer token for remote NIM endpoints."), - page_elements_invoke_url: Optional[str] = typer.Option(None, "--page-elements-invoke-url"), - ocr_invoke_url: Optional[str] = typer.Option(None, "--ocr-invoke-url"), - graphic_elements_invoke_url: Optional[str] = typer.Option(None, "--graphic-elements-invoke-url"), - table_structure_invoke_url: Optional[str] = typer.Option(None, "--table-structure-invoke-url"), - embed_invoke_url: Optional[str] = typer.Option(None, "--embed-invoke-url"), - # Embedding - embed_model_name: str = typer.Option(VL_EMBED_MODEL, "--embed-model-name"), - embed_modality: str = typer.Option("text", "--embed-modality"), - embed_granularity: str = typer.Option("element", "--embed-granularity"), - text_elements_modality: Optional[str] = typer.Option(None, "--text-elements-modality"), - structured_elements_modality: Optional[str] = typer.Option(None, "--structured-elements-modality"), - # Dedup / caption - dedup: Optional[bool] = typer.Option(None, "--dedup/--no-dedup"), - dedup_iou_threshold: float = typer.Option(0.45, "--dedup-iou-threshold"), - caption: bool = typer.Option(False, 
"--caption/--no-caption"), - caption_invoke_url: Optional[str] = typer.Option(None, "--caption-invoke-url"), - caption_model_name: str = typer.Option("nvidia/NVIDIA-Nemotron-Nano-12B-v2-VL-BF16", "--caption-model-name"), - caption_device: Optional[str] = typer.Option(None, "--caption-device"), - caption_context_text_max_chars: int = typer.Option(0, "--caption-context-text-max-chars"), - caption_gpu_memory_utilization: float = typer.Option(0.5, "--caption-gpu-memory-utilization"), - caption_gpus_per_actor: Optional[float] = typer.Option(None, "--caption-gpus-per-actor", max=1.0), - caption_temperature: float = typer.Option(1.0, "--caption-temperature"), - caption_top_p: Optional[float] = typer.Option(None, "--caption-top-p"), - caption_max_tokens: int = typer.Option(1024, "--caption-max-tokens"), - # Text chunking - store_images_uri: Optional[str] = typer.Option( - None, "--store-images-uri", help="Store extracted images to this URI." - ), - store_text: bool = typer.Option(False, "--store-text/--no-store-text", help="Also store extracted text."), - strip_base64: bool = typer.Option(True, "--strip-base64/--no-strip-base64", help="Strip base64 after storing."), - text_chunk: bool = typer.Option(False, "--text-chunk"), - text_chunk_max_tokens: Optional[int] = typer.Option(None, "--text-chunk-max-tokens"), - text_chunk_overlap_tokens: Optional[int] = typer.Option(None, "--text-chunk-overlap-tokens"), - # Ray / batch tuning - # NOTE: *_gpus_per_actor defaults are None (not 0.0) so we can distinguish - # "not set → use heuristic" from "explicitly 0 → no GPU". Other tuning - # defaults use 0/0.0 because those values are never valid explicit choices. 
- ray_address: Optional[str] = typer.Option(None, "--ray-address"), - ray_log_to_driver: bool = typer.Option(True, "--ray-log-to-driver/--no-ray-log-to-driver"), - ocr_actors: Optional[int] = typer.Option(0, "--ocr-actors"), - ocr_batch_size: Optional[int] = typer.Option(0, "--ocr-batch-size"), - ocr_cpus_per_actor: Optional[float] = typer.Option(0.0, "--ocr-cpus-per-actor"), - ocr_gpus_per_actor: Optional[float] = typer.Option(None, "--ocr-gpus-per-actor", max=1.0), - page_elements_actors: Optional[int] = typer.Option(0, "--page-elements-actors"), - page_elements_batch_size: Optional[int] = typer.Option(0, "--page-elements-batch-size"), - page_elements_cpus_per_actor: Optional[float] = typer.Option(0.0, "--page-elements-cpus-per-actor"), - page_elements_gpus_per_actor: Optional[float] = typer.Option(None, "--page-elements-gpus-per-actor", max=1.0), - embed_actors: Optional[int] = typer.Option(0, "--embed-actors"), - embed_batch_size: Optional[int] = typer.Option(0, "--embed-batch-size"), - embed_cpus_per_actor: Optional[float] = typer.Option(0.0, "--embed-cpus-per-actor"), - embed_gpus_per_actor: Optional[float] = typer.Option(None, "--embed-gpus-per-actor", max=1.0), - pdf_split_batch_size: int = typer.Option(1, "--pdf-split-batch-size", min=1), - pdf_extract_batch_size: Optional[int] = typer.Option(0, "--pdf-extract-batch-size"), - pdf_extract_tasks: Optional[int] = typer.Option(0, "--pdf-extract-tasks"), - pdf_extract_cpus_per_task: Optional[float] = typer.Option(0.0, "--pdf-extract-cpus-per-task"), - nemotron_parse_actors: Optional[int] = typer.Option(0, "--nemotron-parse-actors"), - nemotron_parse_gpus_per_actor: Optional[float] = typer.Option( - None, "--nemotron-parse-gpus-per-actor", min=0.0, max=1.0 - ), - nemotron_parse_batch_size: Optional[int] = typer.Option(0, "--nemotron-parse-batch-size"), - # LanceDB / evaluation - lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri"), - save_intermediate: Optional[Path] = typer.Option( - None, - 
"--save-intermediate", - help="Directory to write extraction results as Parquet (for full-page markdown / page index).", - path_type=Path, - file_okay=False, - dir_okay=True, - ), - hybrid: bool = typer.Option(False, "--hybrid/--no-hybrid"), - query_csv: Path = typer.Option("./data/bo767_query_gt.csv", "--query-csv", path_type=Path), - recall_match_mode: str = typer.Option("pdf_page", "--recall-match-mode"), - audio_match_tolerance_secs: float = typer.Option(2.0, "--audio-match-tolerance-secs", min=0.0), - segment_audio: bool = typer.Option(False, "--segment-audio/--no-segment-audio"), - audio_split_type: str = typer.Option("size", "--audio-split-type"), - audio_split_interval: int = typer.Option(500000, "--audio-split-interval", min=1), - evaluation_mode: str = typer.Option("recall", "--evaluation-mode"), - reranker: Optional[bool] = typer.Option(False, "--reranker/--no-reranker"), - reranker_model_name: str = typer.Option(VL_RERANK_MODEL, "--reranker-model-name"), - beir_loader: Optional[str] = typer.Option(None, "--beir-loader"), - beir_dataset_name: Optional[str] = typer.Option(None, "--beir-dataset-name"), - beir_split: str = typer.Option("test", "--beir-split"), - beir_query_language: Optional[str] = typer.Option(None, "--beir-query-language"), - beir_doc_id_field: str = typer.Option("pdf_basename", "--beir-doc-id-field"), - beir_k: list[int] = typer.Option([], "--beir-k"), - recall_details: bool = typer.Option(True, "--recall-details/--no-recall-details"), - runtime_metrics_dir: Optional[Path] = typer.Option(None, "--runtime-metrics-dir", path_type=Path), - runtime_metrics_prefix: Optional[str] = typer.Option(None, "--runtime-metrics-prefix"), - detection_summary_file: Optional[Path] = typer.Option(None, "--detection-summary-file", path_type=Path), - log_file: Optional[Path] = typer.Option(None, "--log-file", path_type=Path, dir_okay=False), -) -> None: - _ = ctx - log_handle, original_stdout, original_stderr = _configure_logging(log_file, debug=bool(debug)) 
- try: - if run_mode not in {"batch", "inprocess"}: - raise ValueError(f"Unsupported --run-mode: {run_mode!r}") - if recall_match_mode not in {"pdf_page", "pdf_only", "audio_segment"}: - raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode!r}") - if audio_split_type not in {"size", "time", "frame"}: - raise ValueError(f"Unsupported --audio-split-type: {audio_split_type!r}") - if evaluation_mode not in {"recall", "beir"}: - raise ValueError(f"Unsupported --evaluation-mode: {evaluation_mode!r}") - - if run_mode == "batch": - os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0" - - lancedb_uri = str(Path(lancedb_uri).expanduser().resolve()) - _ensure_lancedb_table(lancedb_uri, LANCEDB_TABLE) - - remote_api_key = resolve_remote_api_key(api_key) - extract_remote_api_key = remote_api_key - embed_remote_api_key = remote_api_key - caption_remote_api_key = remote_api_key - - # Warn if remote URLs configured without an API key - if ( - any( - ( - page_elements_invoke_url, - ocr_invoke_url, - graphic_elements_invoke_url, - table_structure_invoke_url, - embed_invoke_url, - ) - ) - and remote_api_key is None - ): - logger.warning("Remote endpoint URL(s) were configured without an API key.") - - # Zero out GPU fractions when a remote URL replaces the local model - if page_elements_invoke_url and float(page_elements_gpus_per_actor or 0.0) != 0.0: - logger.warning("Forcing page-elements GPUs to 0.0 because --page-elements-invoke-url is set.") - page_elements_gpus_per_actor = 0.0 - if ocr_invoke_url and float(ocr_gpus_per_actor or 0.0) != 0.0: - logger.warning("Forcing OCR GPUs to 0.0 because --ocr-invoke-url is set.") - ocr_gpus_per_actor = 0.0 - if embed_invoke_url and float(embed_gpus_per_actor or 0.0) != 0.0: - logger.warning("Forcing embed GPUs to 0.0 because --embed-invoke-url is set.") - embed_gpus_per_actor = 0.0 - - file_patterns = _resolve_file_patterns(Path(input_path), input_type) - - # 
------------------------------------------------------------------ - # Build extraction params - # ------------------------------------------------------------------ - extract_batch_tuning = BatchTuningParams( - **{ - k: v - for k, v in { - "pdf_split_batch_size": pdf_split_batch_size, - "pdf_extract_batch_size": pdf_extract_batch_size or None, - "pdf_extract_workers": pdf_extract_tasks or None, - "pdf_extract_num_cpus": pdf_extract_cpus_per_task or None, - "page_elements_batch_size": page_elements_batch_size or None, - "page_elements_workers": page_elements_actors or None, - "page_elements_cpus_per_actor": page_elements_cpus_per_actor or None, - "gpu_page_elements": ( - 0.0 - if page_elements_invoke_url - else (page_elements_gpus_per_actor if page_elements_gpus_per_actor is not None else None) - ), - "ocr_inference_batch_size": ocr_batch_size or None, - "ocr_workers": ocr_actors or None, - "ocr_cpus_per_actor": ocr_cpus_per_actor or None, - "gpu_ocr": ( - 0.0 if ocr_invoke_url else (ocr_gpus_per_actor if ocr_gpus_per_actor is not None else None) - ), - "nemotron_parse_batch_size": nemotron_parse_batch_size or None, - "nemotron_parse_workers": nemotron_parse_actors or None, - "gpu_nemotron_parse": ( - nemotron_parse_gpus_per_actor if nemotron_parse_gpus_per_actor is not None else None - ), - }.items() - if v is not None - } - ) - extract_params = ExtractParams( - **{ - k: v - for k, v in { - "method": method, - "dpi": int(dpi), - "extract_text": extract_text, - "extract_tables": extract_tables, - "extract_charts": extract_charts, - "extract_infographics": extract_infographics, - "extract_page_as_image": extract_page_as_image, - "api_key": extract_remote_api_key, - "page_elements_invoke_url": page_elements_invoke_url, - "ocr_invoke_url": ocr_invoke_url, - "graphic_elements_invoke_url": graphic_elements_invoke_url, - "table_structure_invoke_url": table_structure_invoke_url, - "use_graphic_elements": use_graphic_elements, - "use_table_structure": use_table_structure, 
- "table_output_format": table_output_format, - "inference_batch_size": page_elements_batch_size or None, - "batch_tuning": extract_batch_tuning, - }.items() - if v is not None - } - ) - - # ------------------------------------------------------------------ - # Build embedding params - # ------------------------------------------------------------------ - embed_batch_tuning = BatchTuningParams( - **{ - k: v - for k, v in { - "embed_batch_size": embed_batch_size or None, - "embed_workers": embed_actors or None, - "embed_cpus_per_actor": embed_cpus_per_actor or None, - "gpu_embed": ( - 0.0 - if embed_invoke_url - else (embed_gpus_per_actor if embed_gpus_per_actor is not None else None) - ), - }.items() - if v is not None - } - ) - embed_params = EmbedParams( - **{ - k: v - for k, v in { - "model_name": embed_model_name, - "embed_invoke_url": embed_invoke_url, - "api_key": embed_remote_api_key, - "embed_modality": embed_modality, - "text_elements_modality": text_elements_modality, - "structured_elements_modality": structured_elements_modality, - "embed_granularity": embed_granularity, - "batch_tuning": embed_batch_tuning, - "inference_batch_size": embed_batch_size or None, - }.items() - if v is not None - } - ) - text_chunk_params = TextChunkParams( - max_tokens=text_chunk_max_tokens or 1024, - overlap_tokens=text_chunk_overlap_tokens if text_chunk_overlap_tokens is not None else 150, - ) - - # ------------------------------------------------------------------ - # Build GraphIngestor and configure pipeline stages - # ------------------------------------------------------------------ - logger.info("Building graph pipeline (run_mode=%s) for %s ...", run_mode, input_path) - - node_overrides = {} - if caption_gpus_per_actor is not None: - node_overrides["CaptionActor"] = {"num_gpus": caption_gpus_per_actor} - - ingestor = GraphIngestor(run_mode=run_mode, ray_address=ray_address, node_overrides=node_overrides or None) - ingestor = ingestor.files(file_patterns) - - # 
Extraction stage - if input_type == "txt": - ingestor = ingestor.extract_txt(text_chunk_params) - elif input_type == "html": - ingestor = ingestor.extract_html(text_chunk_params) - elif input_type == "image": - ingestor = ingestor.extract_image_files(extract_params) - elif input_type == "audio": - asr_params = asr_params_from_env().model_copy(update={"segment_audio": bool(segment_audio)}) - ingestor = ingestor.extract_audio( - params=AudioChunkParams(split_type=audio_split_type, split_interval=int(audio_split_interval)), - asr_params=asr_params, - ) - else: - # "pdf" or "doc" - ingestor = ingestor.extract(extract_params) - - # Optional post-extraction stages - enable_text_chunk = text_chunk or text_chunk_max_tokens is not None or text_chunk_overlap_tokens is not None - if enable_text_chunk: - ingestor = ingestor.split(text_chunk_params) - - enable_caption = caption or caption_invoke_url is not None - enable_dedup = dedup if dedup is not None else enable_caption - if enable_dedup: - ingestor = ingestor.dedup(DedupParams(iou_threshold=dedup_iou_threshold)) - - if enable_caption: - ingestor = ingestor.caption( - CaptionParams( - endpoint_url=caption_invoke_url, - api_key=caption_remote_api_key, - model_name=caption_model_name, - device=caption_device, - context_text_max_chars=caption_context_text_max_chars, - gpu_memory_utilization=caption_gpu_memory_utilization, - temperature=caption_temperature, - top_p=caption_top_p, - max_tokens=caption_max_tokens, - ) - ) - - if store_images_uri is not None: - ingestor = ingestor.store( - StoreParams( - storage_uri=store_images_uri, - store_text=store_text, - strip_base64=strip_base64, - ) - ) - - ingestor = ingestor.embed(embed_params) - - # ------------------------------------------------------------------ - # Execute the graph via the executor - # ------------------------------------------------------------------ - logger.info("Starting ingestion of %s ...", input_path) - ingest_start = time.perf_counter() - - # 
GraphIngestor.ingest() builds the Graph, creates the executor, - # and calls executor.ingest(file_patterns) returning: - # batch mode -> materialized ray.data.Dataset - # inprocess mode -> pandas.DataFrame - result = ingestor.ingest() - - ingestion_only_total_time = time.perf_counter() - ingest_start - - # ------------------------------------------------------------------ - # Collect results - # ------------------------------------------------------------------ - if run_mode == "batch": - import ray - - ray_download_start = time.perf_counter() - ingest_local_results = result.take_all() - ray_download_time = time.perf_counter() - ray_download_start - - import pandas as pd - - result_df = pd.DataFrame(ingest_local_results) - num_rows = _count_input_units(result_df) - else: - import pandas as pd - - result_df = result - ingest_local_results = result_df.to_dict("records") - ray_download_time = 0.0 - num_rows = _count_input_units(result_df) - - if save_intermediate is not None: - out_dir = Path(save_intermediate).expanduser().resolve() - out_dir.mkdir(parents=True, exist_ok=True) - out_path = out_dir / "extraction.parquet" - result_df.to_parquet(out_path, index=False) - logger.info("Wrote extraction Parquet for intermediate use: %s", out_path) - - if detection_summary_file is not None: - from nemo_retriever.utils.detection_summary import ( - collect_detection_summary_from_df, - write_detection_summary, - ) - - write_detection_summary( - Path(detection_summary_file), - collect_detection_summary_from_df(result_df), - ) - - # ------------------------------------------------------------------ - # Write to LanceDB - # ------------------------------------------------------------------ - lancedb_write_start = time.perf_counter() - handle_lancedb(ingest_local_results, lancedb_uri, LANCEDB_TABLE, hybrid=hybrid, mode="overwrite") - lancedb_write_time = time.perf_counter() - lancedb_write_start - - # ------------------------------------------------------------------ - # Recall / 
BEIR evaluation - # ------------------------------------------------------------------ - import lancedb as _lancedb_mod - - db = _lancedb_mod.connect(lancedb_uri) - table = db.open_table(LANCEDB_TABLE) - - if int(table.count_rows()) == 0: - logger.warning("LanceDB table is empty; skipping %s evaluation.", evaluation_mode) - _write_runtime_summary( - runtime_metrics_dir, - runtime_metrics_prefix, - { - "run_mode": run_mode, - "input_path": str(Path(input_path).resolve()), - "input_pages": int(num_rows), - "num_pages": int(num_rows), - "num_rows": int(len(result_df.index)), - "ingestion_only_secs": float(ingestion_only_total_time), - "ray_download_secs": float(ray_download_time), - "lancedb_write_secs": float(lancedb_write_time), - "evaluation_secs": 0.0, - "total_secs": float(time.perf_counter() - ingest_start), - "evaluation_mode": evaluation_mode, - "evaluation_metrics": {}, - "recall_details": bool(recall_details), - "lancedb_uri": str(lancedb_uri), - "lancedb_table": str(LANCEDB_TABLE), - }, - ) - if run_mode == "batch": - ray.shutdown() - return - - from nemo_retriever.model import resolve_embed_model - from nemo_retriever.utils.detection_summary import print_run_summary - - _recall_model = resolve_embed_model(str(embed_model_name)) - evaluation_label = "Recall" - evaluation_total_time = 0.0 - evaluation_metrics: dict[str, float] = {} - evaluation_query_count: Optional[int] = None - - if evaluation_mode == "beir": - if not beir_loader: - raise ValueError("--beir-loader is required when --evaluation-mode=beir") - if not beir_dataset_name: - raise ValueError("--beir-dataset-name is required when --evaluation-mode=beir") - - from nemo_retriever.recall.beir import BeirConfig, evaluate_lancedb_beir - - cfg = BeirConfig( - lancedb_uri=str(lancedb_uri), - lancedb_table=str(LANCEDB_TABLE), - embedding_model=_recall_model, - loader=str(beir_loader), - dataset_name=str(beir_dataset_name), - split=str(beir_split), - query_language=beir_query_language, - 
doc_id_field=str(beir_doc_id_field), - ks=tuple(beir_k) if beir_k else (1, 3, 5, 10), - embedding_http_endpoint=embed_invoke_url, - embedding_api_key=embed_remote_api_key or "", - hybrid=hybrid, - reranker=bool(reranker), - reranker_model_name=str(reranker_model_name), - ) - evaluation_start = time.perf_counter() - beir_dataset, _raw_hits, _run, evaluation_metrics = evaluate_lancedb_beir(cfg) - evaluation_total_time = time.perf_counter() - evaluation_start - evaluation_label = "BEIR" - evaluation_query_count = len(beir_dataset.query_ids) - else: - query_csv_path = Path(query_csv) - if not query_csv_path.exists(): - logger.warning("Query CSV not found at %s; skipping recall evaluation.", query_csv_path) - _write_runtime_summary( - runtime_metrics_dir, - runtime_metrics_prefix, - { - "run_mode": run_mode, - "input_path": str(Path(input_path).resolve()), - "input_pages": int(num_rows), - "num_pages": int(num_rows), - "num_rows": int(len(result_df.index)), - "ingestion_only_secs": float(ingestion_only_total_time), - "ray_download_secs": float(ray_download_time), - "lancedb_write_secs": float(lancedb_write_time), - "evaluation_secs": 0.0, - "total_secs": float(time.perf_counter() - ingest_start), - "evaluation_mode": evaluation_mode, - "evaluation_metrics": {}, - "recall_details": bool(recall_details), - "lancedb_uri": str(lancedb_uri), - "lancedb_table": str(LANCEDB_TABLE), - }, - ) - if run_mode == "batch": - ray.shutdown() - return - - from nemo_retriever.recall.core import RecallConfig, retrieve_and_score - - recall_cfg = RecallConfig( - lancedb_uri=str(lancedb_uri), - lancedb_table=str(LANCEDB_TABLE), - embedding_model=_recall_model, - embedding_http_endpoint=embed_invoke_url, - embedding_api_key=embed_remote_api_key or "", - top_k=10, - ks=(1, 5, 10), - hybrid=hybrid, - match_mode=recall_match_mode, - audio_match_tolerance_secs=float(audio_match_tolerance_secs), - reranker=reranker_model_name if reranker else None, - embed_modality=embed_modality, - ) - 
evaluation_start = time.perf_counter() - _df_query, _gold, _raw_hits, _retrieved_keys, evaluation_metrics = retrieve_and_score( - query_csv=query_csv_path, cfg=recall_cfg - ) - evaluation_total_time = time.perf_counter() - evaluation_start - evaluation_query_count = len(_df_query.index) - - total_time = time.perf_counter() - ingest_start - - _write_runtime_summary( - runtime_metrics_dir, - runtime_metrics_prefix, - { - "run_mode": run_mode, - "input_path": str(Path(input_path).resolve()), - "input_pages": int(num_rows), - "num_pages": int(num_rows), - "num_rows": int(len(result_df.index)), - "ingestion_only_secs": float(ingestion_only_total_time), - "ray_download_secs": float(ray_download_time), - "lancedb_write_secs": float(lancedb_write_time), - "evaluation_secs": float(evaluation_total_time), - "total_secs": float(total_time), - "evaluation_mode": evaluation_mode, - "evaluation_metrics": dict(evaluation_metrics), - "evaluation_count": evaluation_query_count, - "recall_details": bool(recall_details), - "lancedb_uri": str(lancedb_uri), - "lancedb_table": str(LANCEDB_TABLE), - }, - ) - - if run_mode == "batch": - ray.shutdown() - - print_run_summary( - num_rows, - Path(input_path), - hybrid, - lancedb_uri, - LANCEDB_TABLE, - total_time, - ingestion_only_total_time, - ray_download_time, - lancedb_write_time, - evaluation_total_time, - evaluation_metrics, - evaluation_label=evaluation_label, - evaluation_count=evaluation_query_count, - ) - finally: - os.sys.stdout = original_stdout - os.sys.stderr = original_stderr - if log_handle is not None: - log_handle.close() - +from nemo_retriever.pipeline.__main__ import app if __name__ == "__main__": app() diff --git a/nemo_retriever/src/nemo_retriever/pipeline/__init__.py b/nemo_retriever/src/nemo_retriever/pipeline/__init__.py new file mode 100644 index 000000000..ba321a0ad --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/pipeline/__init__.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA 
CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""End-to-end ingestion pipeline subcommand for the ``retriever`` CLI. + +This package wraps :class:`~nemo_retriever.graph_ingestor.GraphIngestor` with a +Typer application that exposes every knob needed to run a full PDF / doc / +txt / html / image / audio ingestion job, write results to LanceDB, and +optionally evaluate recall or BEIR. + +It is registered on the ``retriever`` CLI as the ``pipeline`` subcommand:: + + retriever pipeline run [OPTIONS] + +The implementation historically lived in +``nemo_retriever/examples/graph_pipeline.py``; that module is now a thin +backward-compat shim that re-exports the same Typer app from +:mod:`nemo_retriever.pipeline.__main__`. + +``app`` and ``run`` are exposed via lazy attribute access so that +``python -m nemo_retriever.pipeline`` can import the ``__main__`` module +cleanly (without a re-import warning). +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +__all__ = ["app", "run"] + + +if TYPE_CHECKING: + from .__main__ import app, run # noqa: F401 + + +def __getattr__(name: str) -> Any: + if name in {"app", "run"}: + from . import __main__ as _main + + return getattr(_main, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py new file mode 100644 index 000000000..8f28eafc5 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py @@ -0,0 +1,1084 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Typer CLI for the end-to-end graph ingestion pipeline. + +Registered on the ``retriever`` CLI as the ``pipeline`` subcommand. 
+ +Examples:: + + # Batch mode (Ray) with PDF extraction + embedding + retriever pipeline run /data/pdfs \\ + --run-mode batch \\ + --embed-invoke-url http://localhost:8000/v1 + + # In-process mode (no Ray) for quick local testing + retriever pipeline run /data/pdfs \\ + --run-mode inprocess \\ + --ocr-invoke-url http://localhost:9000/v1 + + # Save extraction Parquet for full-page markdown (page index / export) + retriever pipeline run /data/pdfs \\ + --lancedb-uri lancedb \\ + --save-intermediate /path/to/extracted_parquet_dir +""" + +from __future__ import annotations + +import glob as _glob +import json +import logging +import os +import time +from pathlib import Path +from typing import Any, Optional, TextIO + +import typer + +from nemo_retriever.audio import asr_params_from_env +from nemo_retriever.graph_ingestor import GraphIngestor +from nemo_retriever.model import VL_EMBED_MODEL, VL_RERANK_MODEL +from nemo_retriever.params import ( + AudioChunkParams, + CaptionParams, + DedupParams, + EmbedParams, + ExtractParams, + StoreParams, + TextChunkParams, +) +from nemo_retriever.params.models import BatchTuningParams +from nemo_retriever.utils.remote_auth import resolve_remote_api_key +from nemo_retriever.vector_store.lancedb_store import handle_lancedb + +logger = logging.getLogger(__name__) + +app = typer.Typer(help="End-to-end graph-based ingestion pipeline (extract -> embed -> LanceDB).") + +LANCEDB_URI = "lancedb" +LANCEDB_TABLE = "nv-ingest" + +# Help panel labels (keep stable so --help groupings read consistently). 
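The `_configure_logging` helper defined just below relies on `logging.basicConfig(..., force=True)` (Python 3.8+), so reconfiguring replaces, rather than duplicates, the root handlers. A minimal standalone sketch of that behavior (buffer and logger names are illustrative only):

```python
import io
import logging

buf = io.StringIO()
# force=True removes any existing root handlers before installing the new one,
# so calling this repeatedly never stacks duplicate handlers.
logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(name)s: %(message)s",
    handlers=[logging.StreamHandler(buf)],
    force=True,
)
logging.getLogger("pipeline").info("hello")
assert "INFO pipeline: hello" in buf.getvalue()
```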
+_PANEL_IO = "I/O and Execution" +_PANEL_EXTRACT = "PDF / Document Extraction" +_PANEL_REMOTE = "Remote NIM Endpoints" +_PANEL_EMBED = "Embedding" +_PANEL_DEDUP_CAPTION = "Dedup and Caption" +_PANEL_STORE_CHUNK = "Storage and Text Chunking" +_PANEL_AUDIO = "Audio" +_PANEL_RAY = "Ray / Batch Tuning" +_PANEL_LANCEDB = "LanceDB and Outputs" +_PANEL_EVAL = "Evaluation (Recall / BEIR)" +_PANEL_OBS = "Observability" + + +# --------------------------------------------------------------------------- +# Logging helpers +# --------------------------------------------------------------------------- + + +class _TeeStream: + """Mirror stdout/stderr writes into a second stream (e.g. a log file).""" + + def __init__(self, primary: TextIO, mirror: TextIO) -> None: + self._primary = primary + self._mirror = mirror + + def write(self, data: str) -> int: + self._primary.write(data) + self._mirror.write(data) + return len(data) + + def flush(self) -> None: + self._primary.flush() + self._mirror.flush() + + def isatty(self) -> bool: + return bool(getattr(self._primary, "isatty", lambda: False)()) + + def fileno(self) -> int: + return int(getattr(self._primary, "fileno")()) + + def writable(self) -> bool: + return bool(getattr(self._primary, "writable", lambda: True)()) + + @property + def encoding(self) -> str: + return str(getattr(self._primary, "encoding", "utf-8")) + + +def _configure_logging(log_file: Optional[Path], *, debug: bool = False) -> tuple[Optional[TextIO], TextIO, TextIO]: + original_stdout = os.sys.stdout + original_stderr = os.sys.stderr + log_level = logging.DEBUG if debug else logging.INFO + if log_file is None: + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + force=True, + ) + return None, original_stdout, original_stderr + + target = Path(log_file).expanduser().resolve() + target.parent.mkdir(parents=True, exist_ok=True) + fh = open(target, "a", encoding="utf-8", buffering=1) + os.sys.stdout = 
_TeeStream(os.sys.__stdout__, fh) + os.sys.stderr = _TeeStream(os.sys.__stderr__, fh) + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + handlers=[logging.StreamHandler(os.sys.stdout)], + force=True, + ) + logger.info("Writing combined pipeline logs to %s", str(target)) + return fh, original_stdout, original_stderr + + +# --------------------------------------------------------------------------- +# Small utilities (LanceDB, summaries, file patterns) +# --------------------------------------------------------------------------- + + +def _ensure_lancedb_table(uri: str, table_name: str) -> None: + from nemo_retriever.vector_store.lancedb_utils import lancedb_schema + import lancedb + import pyarrow as pa + + Path(uri).mkdir(parents=True, exist_ok=True) + db = lancedb.connect(uri) + try: + db.open_table(table_name) + return + except ValueError as e: + # lancedb has no TableNotFoundError; missing tables raise ValueError, same + # substring LanceDB uses internally in db.py for this case. 
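`_ensure_lancedb_table` opens the table and falls back to creating it only when LanceDB raises its missing-table `ValueError`, re-raising anything else. The open-or-create shape, sketched against a hypothetical in-memory `FakeDB` stand-in rather than a real LanceDB connection:

```python
class FakeDB:
    """Stand-in for a lancedb connection: open_table raises ValueError when missing."""

    def __init__(self):
        self.tables = {}

    def open_table(self, name):
        if name not in self.tables:
            raise ValueError(f"Table '{name}' was not found")
        return self.tables[name]

    def create_table(self, name, data):
        self.tables[name] = data
        return data


def ensure_table(db, name, empty_data=None):
    try:
        return db.open_table(name)
    except ValueError as e:
        # Only swallow the missing-table case; propagate every other ValueError.
        if f"Table '{name}' was not found" not in str(e):
            raise
        return db.create_table(name, empty_data if empty_data is not None else [])


db = FakeDB()
t = ensure_table(db, "nv-ingest")       # first call creates the table
assert db.open_table("nv-ingest") is t  # later calls open the same object
```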
+ if f"Table '{table_name}' was not found" not in str(e): + raise + schema = lancedb_schema() + empty = pa.table({f.name: [] for f in schema}, schema=schema) + db.create_table(table_name, data=empty, schema=schema, mode="create") + + +def _write_runtime_summary( + runtime_metrics_dir: Optional[Path], + runtime_metrics_prefix: Optional[str], + payload: dict[str, object], +) -> None: + if runtime_metrics_dir is None and not runtime_metrics_prefix: + return + + target_dir = Path(runtime_metrics_dir or Path.cwd()).expanduser().resolve() + target_dir.mkdir(parents=True, exist_ok=True) + prefix = (runtime_metrics_prefix or "run").strip() or "run" + target = target_dir / f"{prefix}.runtime.summary.json" + target.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") + + +def _count_input_units(result_df) -> int: + if "source_id" in result_df.columns: + return int(result_df["source_id"].nunique()) + if "source_path" in result_df.columns: + return int(result_df["source_path"].nunique()) + return int(len(result_df.index)) + + +def _resolve_file_patterns(input_path: Path, input_type: str) -> list[str]: + input_path = Path(input_path) + if input_path.is_file(): + return [str(input_path)] + if not input_path.is_dir(): + raise typer.BadParameter(f"Path does not exist: {input_path}") + + ext_map = { + "pdf": ["*.pdf"], + "doc": ["*.docx", "*.pptx"], + "txt": ["*.txt"], + "html": ["*.html"], + "image": ["*.jpg", "*.jpeg", "*.png", "*.tiff", "*.bmp"], + "audio": ["*.mp3", "*.wav", "*.m4a"], + } + exts = ext_map.get(input_type) + if exts is None: + raise typer.BadParameter(f"Unsupported --input-type: {input_type!r}") + + patterns = [str(input_path / ext) for ext in exts] + matched = [p for p in patterns if _glob.glob(p)] + if not matched: + raise typer.BadParameter(f"No files found for input_type={input_type!r} in {input_path}") + return matched + + +# --------------------------------------------------------------------------- +# Parameter builders (split 
out from the old monolithic main()) +# --------------------------------------------------------------------------- + + +def _build_extract_params( + *, + method: str, + dpi: int, + extract_text: bool, + extract_tables: bool, + extract_charts: bool, + extract_infographics: bool, + extract_page_as_image: bool, + use_graphic_elements: bool, + use_table_structure: bool, + table_output_format: Optional[str], + extract_remote_api_key: Optional[str], + page_elements_invoke_url: Optional[str], + ocr_invoke_url: Optional[str], + graphic_elements_invoke_url: Optional[str], + table_structure_invoke_url: Optional[str], + pdf_split_batch_size: int, + pdf_extract_batch_size: Optional[int], + pdf_extract_tasks: Optional[int], + pdf_extract_cpus_per_task: Optional[float], + page_elements_actors: Optional[int], + page_elements_batch_size: Optional[int], + page_elements_cpus_per_actor: Optional[float], + page_elements_gpus_per_actor: Optional[float], + ocr_actors: Optional[int], + ocr_batch_size: Optional[int], + ocr_cpus_per_actor: Optional[float], + ocr_gpus_per_actor: Optional[float], + nemotron_parse_actors: Optional[int], + nemotron_parse_batch_size: Optional[int], + nemotron_parse_gpus_per_actor: Optional[float], +) -> ExtractParams: + """Assemble :class:`ExtractParams` plus its :class:`BatchTuningParams`.""" + + extract_batch_tuning = BatchTuningParams( + **{ + k: v + for k, v in { + "pdf_split_batch_size": pdf_split_batch_size, + "pdf_extract_batch_size": pdf_extract_batch_size or None, + "pdf_extract_workers": pdf_extract_tasks or None, + "pdf_extract_num_cpus": pdf_extract_cpus_per_task or None, + "page_elements_batch_size": page_elements_batch_size or None, + "page_elements_workers": page_elements_actors or None, + "page_elements_cpus_per_actor": page_elements_cpus_per_actor or None, + "gpu_page_elements": ( + 0.0 + if page_elements_invoke_url + else (page_elements_gpus_per_actor if page_elements_gpus_per_actor is not None else None) + ), + "ocr_inference_batch_size": 
ocr_batch_size or None, + "ocr_workers": ocr_actors or None, + "ocr_cpus_per_actor": ocr_cpus_per_actor or None, + "gpu_ocr": ( + 0.0 if ocr_invoke_url else (ocr_gpus_per_actor if ocr_gpus_per_actor is not None else None) + ), + "nemotron_parse_batch_size": nemotron_parse_batch_size or None, + "nemotron_parse_workers": nemotron_parse_actors or None, + "gpu_nemotron_parse": ( + nemotron_parse_gpus_per_actor if nemotron_parse_gpus_per_actor is not None else None + ), + }.items() + if v is not None + } + ) + return ExtractParams( + **{ + k: v + for k, v in { + "method": method, + "dpi": int(dpi), + "extract_text": extract_text, + "extract_tables": extract_tables, + "extract_charts": extract_charts, + "extract_infographics": extract_infographics, + "extract_page_as_image": extract_page_as_image, + "api_key": extract_remote_api_key, + "page_elements_invoke_url": page_elements_invoke_url, + "ocr_invoke_url": ocr_invoke_url, + "graphic_elements_invoke_url": graphic_elements_invoke_url, + "table_structure_invoke_url": table_structure_invoke_url, + "use_graphic_elements": use_graphic_elements, + "use_table_structure": use_table_structure, + "table_output_format": table_output_format, + "inference_batch_size": page_elements_batch_size or None, + "batch_tuning": extract_batch_tuning, + }.items() + if v is not None + } + ) + + +def _build_embed_params( + *, + embed_model_name: str, + embed_invoke_url: Optional[str], + embed_remote_api_key: Optional[str], + embed_modality: str, + text_elements_modality: Optional[str], + structured_elements_modality: Optional[str], + embed_granularity: str, + embed_actors: Optional[int], + embed_batch_size: Optional[int], + embed_cpus_per_actor: Optional[float], + embed_gpus_per_actor: Optional[float], +) -> EmbedParams: + """Assemble :class:`EmbedParams` plus its :class:`BatchTuningParams`.""" + + embed_batch_tuning = BatchTuningParams( + **{ + k: v + for k, v in { + "embed_batch_size": embed_batch_size or None, + "embed_workers": embed_actors 
or None, + "embed_cpus_per_actor": embed_cpus_per_actor or None, + "gpu_embed": ( + 0.0 if embed_invoke_url else (embed_gpus_per_actor if embed_gpus_per_actor is not None else None) + ), + }.items() + if v is not None + } + ) + return EmbedParams( + **{ + k: v + for k, v in { + "model_name": embed_model_name, + "embed_invoke_url": embed_invoke_url, + "api_key": embed_remote_api_key, + "embed_modality": embed_modality, + "text_elements_modality": text_elements_modality, + "structured_elements_modality": structured_elements_modality, + "embed_granularity": embed_granularity, + "batch_tuning": embed_batch_tuning, + "inference_batch_size": embed_batch_size or None, + }.items() + if v is not None + } + ) + + +def _build_ingestor( + *, + run_mode: str, + ray_address: Optional[str], + file_patterns: list[str], + input_type: str, + extract_params: ExtractParams, + embed_params: EmbedParams, + text_chunk_params: TextChunkParams, + enable_text_chunk: bool, + enable_dedup: bool, + enable_caption: bool, + dedup_iou_threshold: float, + caption_invoke_url: Optional[str], + caption_remote_api_key: Optional[str], + caption_model_name: str, + caption_device: Optional[str], + caption_context_text_max_chars: int, + caption_gpu_memory_utilization: float, + caption_gpus_per_actor: Optional[float], + store_images_uri: Optional[str], + store_text: bool, + strip_base64: bool, + segment_audio: bool, + audio_split_type: str, + audio_split_interval: int, +) -> GraphIngestor: + """Construct a :class:`GraphIngestor` with all requested stages attached.""" + + node_overrides: dict[str, dict[str, Any]] = {} + if caption_gpus_per_actor is not None: + node_overrides["CaptionActor"] = {"num_gpus": caption_gpus_per_actor} + + ingestor = GraphIngestor( + run_mode=run_mode, + ray_address=ray_address, + node_overrides=node_overrides or None, + ) + ingestor = ingestor.files(file_patterns) + + # Extraction stage is selected by input type. 
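Both builders above funnel their keyword arguments through the same drop-`None` dict comprehension, so that model defaults apply for any CLI flag the user left unset. The pattern in isolation (the `Params` class here is a hypothetical stand-in for the real parameter models):

```python
def filter_none(**kwargs):
    """Keep only keyword arguments that were explicitly set (non-None)."""
    return {k: v for k, v in kwargs.items() if v is not None}


class Params:
    def __init__(self, batch_size=32, workers=4):
        self.batch_size = batch_size
        self.workers = workers


# None values fall away, so the class defaults win for unset flags.
p = Params(**filter_none(batch_size=64, workers=None))
assert (p.batch_size, p.workers) == (64, 4)
```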
+ if input_type == "txt": + ingestor = ingestor.extract_txt(text_chunk_params) + elif input_type == "html": + ingestor = ingestor.extract_html(text_chunk_params) + elif input_type == "image": + ingestor = ingestor.extract_image_files(extract_params) + elif input_type == "audio": + asr_params = asr_params_from_env().model_copy(update={"segment_audio": bool(segment_audio)}) + ingestor = ingestor.extract_audio( + params=AudioChunkParams(split_type=audio_split_type, split_interval=int(audio_split_interval)), + asr_params=asr_params, + ) + else: + # "pdf" or "doc" + ingestor = ingestor.extract(extract_params) + + if enable_text_chunk: + ingestor = ingestor.split(text_chunk_params) + + if enable_dedup: + ingestor = ingestor.dedup(DedupParams(iou_threshold=dedup_iou_threshold)) + + if enable_caption: + ingestor = ingestor.caption( + CaptionParams( + endpoint_url=caption_invoke_url, + api_key=caption_remote_api_key, + model_name=caption_model_name, + device=caption_device, + context_text_max_chars=caption_context_text_max_chars, + gpu_memory_utilization=caption_gpu_memory_utilization, + ) + ) + + if store_images_uri is not None: + ingestor = ingestor.store( + StoreParams( + storage_uri=store_images_uri, + store_text=store_text, + strip_base64=strip_base64, + ) + ) + + return ingestor.embed(embed_params) + + +def _collect_results(run_mode: str, result: Any) -> tuple[list[dict[str, Any]], Any, float, int]: + """Materialize the graph result into a list of records + DataFrame. + + Returns ``(records, result_df, ray_download_secs, num_input_units)``. 
+ """ + + import pandas as pd + + if run_mode == "batch": + ray_download_start = time.perf_counter() + records = result.take_all() + ray_download_time = time.perf_counter() - ray_download_start + result_df = pd.DataFrame(records) + else: + result_df = result + records = result_df.to_dict("records") + ray_download_time = 0.0 + + return records, result_df, float(ray_download_time), _count_input_units(result_df) + + +def _run_evaluation( + *, + evaluation_mode: str, + lancedb_uri: str, + embed_model_name: str, + embed_invoke_url: Optional[str], + embed_remote_api_key: Optional[str], + embed_modality: str, + query_csv: Path, + recall_match_mode: str, + audio_match_tolerance_secs: float, + hybrid: bool, + reranker: Optional[bool], + reranker_model_name: str, + beir_loader: Optional[str], + beir_dataset_name: Optional[str], + beir_split: str, + beir_query_language: Optional[str], + beir_doc_id_field: str, + beir_k: list[int], +) -> tuple[str, float, dict[str, float], Optional[int], bool]: + """Run recall or BEIR evaluation. + + Returns ``(label, elapsed_secs, metrics, query_count, ran)``. When the + query CSV is missing in recall mode, ``ran`` is ``False`` and the caller + should skip metric recording. 
+ """ + + from nemo_retriever.model import resolve_embed_model + + embed_model = resolve_embed_model(str(embed_model_name)) + + if evaluation_mode == "beir": + if not beir_loader: + raise ValueError("--beir-loader is required when --evaluation-mode=beir") + if not beir_dataset_name: + raise ValueError("--beir-dataset-name is required when --evaluation-mode=beir") + + from nemo_retriever.recall.beir import BeirConfig, evaluate_lancedb_beir + + cfg = BeirConfig( + lancedb_uri=str(lancedb_uri), + lancedb_table=str(LANCEDB_TABLE), + embedding_model=embed_model, + loader=str(beir_loader), + dataset_name=str(beir_dataset_name), + split=str(beir_split), + query_language=beir_query_language, + doc_id_field=str(beir_doc_id_field), + ks=tuple(beir_k) if beir_k else (1, 3, 5, 10), + embedding_http_endpoint=embed_invoke_url, + embedding_api_key=embed_remote_api_key or "", + hybrid=hybrid, + reranker=bool(reranker), + reranker_model_name=str(reranker_model_name), + ) + evaluation_start = time.perf_counter() + beir_dataset, _raw_hits, _run, metrics = evaluate_lancedb_beir(cfg) + return "BEIR", time.perf_counter() - evaluation_start, metrics, len(beir_dataset.query_ids), True + + # Default: recall eval against a query CSV. 
+ query_csv_path = Path(query_csv) + if not query_csv_path.exists(): + logger.warning("Query CSV not found at %s; skipping recall evaluation.", query_csv_path) + return "Recall", 0.0, {}, None, False + + from nemo_retriever.recall.core import RecallConfig, retrieve_and_score + + recall_cfg = RecallConfig( + lancedb_uri=str(lancedb_uri), + lancedb_table=str(LANCEDB_TABLE), + embedding_model=embed_model, + embedding_http_endpoint=embed_invoke_url, + embedding_api_key=embed_remote_api_key or "", + top_k=10, + ks=(1, 5, 10), + hybrid=hybrid, + match_mode=recall_match_mode, + audio_match_tolerance_secs=float(audio_match_tolerance_secs), + reranker=reranker_model_name if reranker else None, + embed_modality=embed_modality, + ) + evaluation_start = time.perf_counter() + df_query, _gold, _raw_hits, _retrieved_keys, metrics = retrieve_and_score(query_csv=query_csv_path, cfg=recall_cfg) + return "Recall", time.perf_counter() - evaluation_start, metrics, len(df_query.index), True + + +# --------------------------------------------------------------------------- +# Typer command: `retriever pipeline run` +# --------------------------------------------------------------------------- + + +@app.command("run") +def run( + ctx: typer.Context, + input_path: Path = typer.Argument( + ..., + help="File or directory of documents to ingest.", + path_type=Path, + ), + # --- I/O and execution ------------------------------------------------ + run_mode: str = typer.Option( + "batch", + "--run-mode", + help="Execution mode: 'batch' (Ray Data) or 'inprocess' (pandas, no Ray).", + rich_help_panel=_PANEL_IO, + ), + input_type: str = typer.Option( + "pdf", + "--input-type", + help="Input type: 'pdf', 'doc', 'txt', 'html', 'image', or 'audio'.", + rich_help_panel=_PANEL_IO, + ), + debug: bool = typer.Option( + False, "--debug/--no-debug", help="Enable debug-level logging.", rich_help_panel=_PANEL_IO + ), + log_file: Optional[Path] = typer.Option( + None, "--log-file", path_type=Path, 
dir_okay=False, rich_help_panel=_PANEL_IO + ), + # --- PDF / document extraction --------------------------------------- + method: str = typer.Option( + "pdfium", "--method", help="PDF text extraction method.", rich_help_panel=_PANEL_EXTRACT + ), + dpi: int = typer.Option( + 300, "--dpi", min=72, help="Render DPI for PDF page images.", rich_help_panel=_PANEL_EXTRACT + ), + extract_text: bool = typer.Option(True, "--extract-text/--no-extract-text", rich_help_panel=_PANEL_EXTRACT), + extract_tables: bool = typer.Option(True, "--extract-tables/--no-extract-tables", rich_help_panel=_PANEL_EXTRACT), + extract_charts: bool = typer.Option(True, "--extract-charts/--no-extract-charts", rich_help_panel=_PANEL_EXTRACT), + extract_infographics: bool = typer.Option( + False, "--extract-infographics/--no-extract-infographics", rich_help_panel=_PANEL_EXTRACT + ), + extract_page_as_image: bool = typer.Option( + True, + "--extract-page-as-image/--no-extract-page-as-image", + rich_help_panel=_PANEL_EXTRACT, + ), + use_graphic_elements: bool = typer.Option(False, "--use-graphic-elements", rich_help_panel=_PANEL_EXTRACT), + use_table_structure: bool = typer.Option(False, "--use-table-structure", rich_help_panel=_PANEL_EXTRACT), + table_output_format: Optional[str] = typer.Option(None, "--table-output-format", rich_help_panel=_PANEL_EXTRACT), + # --- Remote NIM endpoints -------------------------------------------- + api_key: Optional[str] = typer.Option( + None, + "--api-key", + help="Bearer token for remote NIM endpoints.", + rich_help_panel=_PANEL_REMOTE, + ), + page_elements_invoke_url: Optional[str] = typer.Option( + None, "--page-elements-invoke-url", rich_help_panel=_PANEL_REMOTE + ), + ocr_invoke_url: Optional[str] = typer.Option(None, "--ocr-invoke-url", rich_help_panel=_PANEL_REMOTE), + graphic_elements_invoke_url: Optional[str] = typer.Option( + None, "--graphic-elements-invoke-url", rich_help_panel=_PANEL_REMOTE + ), + table_structure_invoke_url: Optional[str] = 
typer.Option( + None, "--table-structure-invoke-url", rich_help_panel=_PANEL_REMOTE + ), + embed_invoke_url: Optional[str] = typer.Option(None, "--embed-invoke-url", rich_help_panel=_PANEL_REMOTE), + # --- Embedding -------------------------------------------------------- + embed_model_name: str = typer.Option(VL_EMBED_MODEL, "--embed-model-name", rich_help_panel=_PANEL_EMBED), + embed_modality: str = typer.Option("text", "--embed-modality", rich_help_panel=_PANEL_EMBED), + embed_granularity: str = typer.Option("element", "--embed-granularity", rich_help_panel=_PANEL_EMBED), + text_elements_modality: Optional[str] = typer.Option( + None, "--text-elements-modality", rich_help_panel=_PANEL_EMBED + ), + structured_elements_modality: Optional[str] = typer.Option( + None, "--structured-elements-modality", rich_help_panel=_PANEL_EMBED + ), + # --- Dedup / caption ------------------------------------------------- + dedup: Optional[bool] = typer.Option(None, "--dedup/--no-dedup", rich_help_panel=_PANEL_DEDUP_CAPTION), + dedup_iou_threshold: float = typer.Option(0.45, "--dedup-iou-threshold", rich_help_panel=_PANEL_DEDUP_CAPTION), + caption: bool = typer.Option(False, "--caption/--no-caption", rich_help_panel=_PANEL_DEDUP_CAPTION), + caption_invoke_url: Optional[str] = typer.Option( + None, "--caption-invoke-url", rich_help_panel=_PANEL_DEDUP_CAPTION + ), + caption_model_name: str = typer.Option( + "nvidia/NVIDIA-Nemotron-Nano-12B-v2-VL-BF16", + "--caption-model-name", + rich_help_panel=_PANEL_DEDUP_CAPTION, + ), + caption_device: Optional[str] = typer.Option(None, "--caption-device", rich_help_panel=_PANEL_DEDUP_CAPTION), + caption_context_text_max_chars: int = typer.Option( + 0, "--caption-context-text-max-chars", rich_help_panel=_PANEL_DEDUP_CAPTION + ), + caption_gpu_memory_utilization: float = typer.Option( + 0.5, "--caption-gpu-memory-utilization", rich_help_panel=_PANEL_DEDUP_CAPTION + ), + caption_gpus_per_actor: Optional[float] = typer.Option( + None, 
"--caption-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_DEDUP_CAPTION + ), + # --- Storage and text chunking -------------------------------------- + store_images_uri: Optional[str] = typer.Option( + None, + "--store-images-uri", + help="Store extracted images to this URI.", + rich_help_panel=_PANEL_STORE_CHUNK, + ), + store_text: bool = typer.Option( + False, + "--store-text/--no-store-text", + help="Also store extracted text.", + rich_help_panel=_PANEL_STORE_CHUNK, + ), + strip_base64: bool = typer.Option( + True, + "--strip-base64/--no-strip-base64", + help="Strip base64 after storing.", + rich_help_panel=_PANEL_STORE_CHUNK, + ), + text_chunk: bool = typer.Option(False, "--text-chunk", rich_help_panel=_PANEL_STORE_CHUNK), + text_chunk_max_tokens: Optional[int] = typer.Option( + None, "--text-chunk-max-tokens", rich_help_panel=_PANEL_STORE_CHUNK + ), + text_chunk_overlap_tokens: Optional[int] = typer.Option( + None, "--text-chunk-overlap-tokens", rich_help_panel=_PANEL_STORE_CHUNK + ), + # --- Ray / batch tuning --------------------------------------------- + # *_gpus_per_actor defaults are None (not 0.0) so we can distinguish + # "not set -> use heuristic" from "explicitly 0 -> no GPU". Other tuning + # defaults use 0/0.0 because those values are never valid explicit choices. 
+ ray_address: Optional[str] = typer.Option(None, "--ray-address", rich_help_panel=_PANEL_RAY), + ray_log_to_driver: bool = typer.Option( + True, "--ray-log-to-driver/--no-ray-log-to-driver", rich_help_panel=_PANEL_RAY + ), + ocr_actors: Optional[int] = typer.Option(0, "--ocr-actors", rich_help_panel=_PANEL_RAY), + ocr_batch_size: Optional[int] = typer.Option(0, "--ocr-batch-size", rich_help_panel=_PANEL_RAY), + ocr_cpus_per_actor: Optional[float] = typer.Option(0.0, "--ocr-cpus-per-actor", rich_help_panel=_PANEL_RAY), + ocr_gpus_per_actor: Optional[float] = typer.Option( + None, "--ocr-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_RAY + ), + page_elements_actors: Optional[int] = typer.Option(0, "--page-elements-actors", rich_help_panel=_PANEL_RAY), + page_elements_batch_size: Optional[int] = typer.Option(0, "--page-elements-batch-size", rich_help_panel=_PANEL_RAY), + page_elements_cpus_per_actor: Optional[float] = typer.Option( + 0.0, "--page-elements-cpus-per-actor", rich_help_panel=_PANEL_RAY + ), + page_elements_gpus_per_actor: Optional[float] = typer.Option( + None, "--page-elements-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_RAY + ), + embed_actors: Optional[int] = typer.Option(0, "--embed-actors", rich_help_panel=_PANEL_RAY), + embed_batch_size: Optional[int] = typer.Option(0, "--embed-batch-size", rich_help_panel=_PANEL_RAY), + embed_cpus_per_actor: Optional[float] = typer.Option(0.0, "--embed-cpus-per-actor", rich_help_panel=_PANEL_RAY), + embed_gpus_per_actor: Optional[float] = typer.Option( + None, "--embed-gpus-per-actor", max=1.0, rich_help_panel=_PANEL_RAY + ), + pdf_split_batch_size: int = typer.Option(1, "--pdf-split-batch-size", min=1, rich_help_panel=_PANEL_RAY), + pdf_extract_batch_size: Optional[int] = typer.Option(0, "--pdf-extract-batch-size", rich_help_panel=_PANEL_RAY), + pdf_extract_tasks: Optional[int] = typer.Option(0, "--pdf-extract-tasks", rich_help_panel=_PANEL_RAY), + pdf_extract_cpus_per_task: Optional[float] = typer.Option( 
+ 0.0, "--pdf-extract-cpus-per-task", rich_help_panel=_PANEL_RAY + ), + nemotron_parse_actors: Optional[int] = typer.Option(0, "--nemotron-parse-actors", rich_help_panel=_PANEL_RAY), + nemotron_parse_gpus_per_actor: Optional[float] = typer.Option( + None, + "--nemotron-parse-gpus-per-actor", + min=0.0, + max=1.0, + rich_help_panel=_PANEL_RAY, + ), + nemotron_parse_batch_size: Optional[int] = typer.Option( + 0, "--nemotron-parse-batch-size", rich_help_panel=_PANEL_RAY + ), + # --- Audio ---------------------------------------------------------- + segment_audio: bool = typer.Option(False, "--segment-audio/--no-segment-audio", rich_help_panel=_PANEL_AUDIO), + audio_split_type: str = typer.Option("size", "--audio-split-type", rich_help_panel=_PANEL_AUDIO), + audio_split_interval: int = typer.Option(500000, "--audio-split-interval", min=1, rich_help_panel=_PANEL_AUDIO), + audio_match_tolerance_secs: float = typer.Option( + 2.0, "--audio-match-tolerance-secs", min=0.0, rich_help_panel=_PANEL_AUDIO + ), + # --- LanceDB / outputs --------------------------------------------- + lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri", rich_help_panel=_PANEL_LANCEDB), + save_intermediate: Optional[Path] = typer.Option( + None, + "--save-intermediate", + help="Directory to write extraction results as Parquet (for full-page markdown / page index).", + path_type=Path, + file_okay=False, + dir_okay=True, + rich_help_panel=_PANEL_LANCEDB, + ), + hybrid: bool = typer.Option(False, "--hybrid/--no-hybrid", rich_help_panel=_PANEL_LANCEDB), + detection_summary_file: Optional[Path] = typer.Option( + None, "--detection-summary-file", path_type=Path, rich_help_panel=_PANEL_LANCEDB + ), + runtime_metrics_dir: Optional[Path] = typer.Option( + None, "--runtime-metrics-dir", path_type=Path, rich_help_panel=_PANEL_OBS + ), + runtime_metrics_prefix: Optional[str] = typer.Option(None, "--runtime-metrics-prefix", rich_help_panel=_PANEL_OBS), + # --- Evaluation 
----------------------------------------------------- + evaluation_mode: str = typer.Option("recall", "--evaluation-mode", rich_help_panel=_PANEL_EVAL), + query_csv: Path = typer.Option( + "./data/bo767_query_gt.csv", + "--query-csv", + path_type=Path, + rich_help_panel=_PANEL_EVAL, + ), + recall_match_mode: str = typer.Option("pdf_page", "--recall-match-mode", rich_help_panel=_PANEL_EVAL), + recall_details: bool = typer.Option(True, "--recall-details/--no-recall-details", rich_help_panel=_PANEL_EVAL), + reranker: Optional[bool] = typer.Option(False, "--reranker/--no-reranker", rich_help_panel=_PANEL_EVAL), + reranker_model_name: str = typer.Option(VL_RERANK_MODEL, "--reranker-model-name", rich_help_panel=_PANEL_EVAL), + beir_loader: Optional[str] = typer.Option(None, "--beir-loader", rich_help_panel=_PANEL_EVAL), + beir_dataset_name: Optional[str] = typer.Option(None, "--beir-dataset-name", rich_help_panel=_PANEL_EVAL), + beir_split: str = typer.Option("test", "--beir-split", rich_help_panel=_PANEL_EVAL), + beir_query_language: Optional[str] = typer.Option(None, "--beir-query-language", rich_help_panel=_PANEL_EVAL), + beir_doc_id_field: str = typer.Option("pdf_basename", "--beir-doc-id-field", rich_help_panel=_PANEL_EVAL), + beir_k: list[int] = typer.Option([], "--beir-k", rich_help_panel=_PANEL_EVAL), +) -> None: + """Run the end-to-end graph ingestion pipeline against ``INPUT_PATH``.""" + + _ = ctx + log_handle, original_stdout, original_stderr = _configure_logging(log_file, debug=bool(debug)) + try: + if run_mode not in {"batch", "inprocess"}: + raise ValueError(f"Unsupported --run-mode: {run_mode!r}") + if recall_match_mode not in {"pdf_page", "pdf_only", "audio_segment"}: + raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode!r}") + if audio_split_type not in {"size", "time", "frame"}: + raise ValueError(f"Unsupported --audio-split-type: {audio_split_type!r}") + if evaluation_mode not in {"recall", "beir"}: + raise ValueError(f"Unsupported 
--evaluation-mode: {evaluation_mode!r}") + + if run_mode == "batch": + os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0" + + lancedb_uri = str(Path(lancedb_uri).expanduser().resolve()) + _ensure_lancedb_table(lancedb_uri, LANCEDB_TABLE) + + remote_api_key = resolve_remote_api_key(api_key) + extract_remote_api_key = remote_api_key + embed_remote_api_key = remote_api_key + caption_remote_api_key = remote_api_key + + if ( + any( + ( + page_elements_invoke_url, + ocr_invoke_url, + graphic_elements_invoke_url, + table_structure_invoke_url, + embed_invoke_url, + ) + ) + and remote_api_key is None + ): + logger.warning("Remote endpoint URL(s) were configured without an API key.") + + # Zero out GPU fractions when a remote URL replaces the local model. + if page_elements_invoke_url and float(page_elements_gpus_per_actor or 0.0) != 0.0: + logger.warning("Forcing page-elements GPUs to 0.0 because --page-elements-invoke-url is set.") + page_elements_gpus_per_actor = 0.0 + if ocr_invoke_url and float(ocr_gpus_per_actor or 0.0) != 0.0: + logger.warning("Forcing OCR GPUs to 0.0 because --ocr-invoke-url is set.") + ocr_gpus_per_actor = 0.0 + if embed_invoke_url and float(embed_gpus_per_actor or 0.0) != 0.0: + logger.warning("Forcing embed GPUs to 0.0 because --embed-invoke-url is set.") + embed_gpus_per_actor = 0.0 + + file_patterns = _resolve_file_patterns(Path(input_path), input_type) + + extract_params = _build_extract_params( + method=method, + dpi=dpi, + extract_text=extract_text, + extract_tables=extract_tables, + extract_charts=extract_charts, + extract_infographics=extract_infographics, + extract_page_as_image=extract_page_as_image, + use_graphic_elements=use_graphic_elements, + use_table_structure=use_table_structure, + table_output_format=table_output_format, + extract_remote_api_key=extract_remote_api_key, + page_elements_invoke_url=page_elements_invoke_url, + ocr_invoke_url=ocr_invoke_url, + graphic_elements_invoke_url=graphic_elements_invoke_url, + 
table_structure_invoke_url=table_structure_invoke_url, + pdf_split_batch_size=pdf_split_batch_size, + pdf_extract_batch_size=pdf_extract_batch_size, + pdf_extract_tasks=pdf_extract_tasks, + pdf_extract_cpus_per_task=pdf_extract_cpus_per_task, + page_elements_actors=page_elements_actors, + page_elements_batch_size=page_elements_batch_size, + page_elements_cpus_per_actor=page_elements_cpus_per_actor, + page_elements_gpus_per_actor=page_elements_gpus_per_actor, + ocr_actors=ocr_actors, + ocr_batch_size=ocr_batch_size, + ocr_cpus_per_actor=ocr_cpus_per_actor, + ocr_gpus_per_actor=ocr_gpus_per_actor, + nemotron_parse_actors=nemotron_parse_actors, + nemotron_parse_batch_size=nemotron_parse_batch_size, + nemotron_parse_gpus_per_actor=nemotron_parse_gpus_per_actor, + ) + + embed_params = _build_embed_params( + embed_model_name=embed_model_name, + embed_invoke_url=embed_invoke_url, + embed_remote_api_key=embed_remote_api_key, + embed_modality=embed_modality, + text_elements_modality=text_elements_modality, + structured_elements_modality=structured_elements_modality, + embed_granularity=embed_granularity, + embed_actors=embed_actors, + embed_batch_size=embed_batch_size, + embed_cpus_per_actor=embed_cpus_per_actor, + embed_gpus_per_actor=embed_gpus_per_actor, + ) + + text_chunk_params = TextChunkParams( + max_tokens=text_chunk_max_tokens or 1024, + overlap_tokens=text_chunk_overlap_tokens if text_chunk_overlap_tokens is not None else 150, + ) + + enable_text_chunk = text_chunk or text_chunk_max_tokens is not None or text_chunk_overlap_tokens is not None + enable_caption = caption or caption_invoke_url is not None + enable_dedup = dedup if dedup is not None else enable_caption + + logger.info("Building graph pipeline (run_mode=%s) for %s ...", run_mode, input_path) + ingestor = _build_ingestor( + run_mode=run_mode, + ray_address=ray_address, + file_patterns=file_patterns, + input_type=input_type, + extract_params=extract_params, + embed_params=embed_params, + 
text_chunk_params=text_chunk_params, + enable_text_chunk=enable_text_chunk, + enable_dedup=enable_dedup, + enable_caption=enable_caption, + dedup_iou_threshold=dedup_iou_threshold, + caption_invoke_url=caption_invoke_url, + caption_remote_api_key=caption_remote_api_key, + caption_model_name=caption_model_name, + caption_device=caption_device, + caption_context_text_max_chars=caption_context_text_max_chars, + caption_gpu_memory_utilization=caption_gpu_memory_utilization, + caption_gpus_per_actor=caption_gpus_per_actor, + store_images_uri=store_images_uri, + store_text=store_text, + strip_base64=strip_base64, + segment_audio=segment_audio, + audio_split_type=audio_split_type, + audio_split_interval=audio_split_interval, + ) + + # --- Execute --------------------------------------------------- + logger.info("Starting ingestion of %s ...", input_path) + ingest_start = time.perf_counter() + raw_result = ingestor.ingest() + ingestion_only_total_time = time.perf_counter() - ingest_start + + ingest_local_results, result_df, ray_download_time, num_rows = _collect_results(run_mode, raw_result) + + if save_intermediate is not None: + out_dir = Path(save_intermediate).expanduser().resolve() + out_dir.mkdir(parents=True, exist_ok=True) + out_path = out_dir / "extraction.parquet" + result_df.to_parquet(out_path, index=False) + logger.info("Wrote extraction Parquet for intermediate use: %s", out_path) + + if detection_summary_file is not None: + from nemo_retriever.utils.detection_summary import ( + collect_detection_summary_from_df, + write_detection_summary, + ) + + write_detection_summary( + Path(detection_summary_file), + collect_detection_summary_from_df(result_df), + ) + + # --- Write to LanceDB ---------------------------------------- + lancedb_write_start = time.perf_counter() + handle_lancedb(ingest_local_results, lancedb_uri, LANCEDB_TABLE, hybrid=hybrid, mode="overwrite") + lancedb_write_time = time.perf_counter() - lancedb_write_start + + # --- Evaluation 
--------------------------------------------- + import lancedb as _lancedb_mod + + db = _lancedb_mod.connect(lancedb_uri) + table = db.open_table(LANCEDB_TABLE) + + if int(table.count_rows()) == 0: + logger.warning("LanceDB table is empty; skipping %s evaluation.", evaluation_mode) + _write_runtime_summary( + runtime_metrics_dir, + runtime_metrics_prefix, + { + "run_mode": run_mode, + "input_path": str(Path(input_path).resolve()), + "input_pages": int(num_rows), + "num_pages": int(num_rows), + "num_rows": int(len(result_df.index)), + "ingestion_only_secs": float(ingestion_only_total_time), + "ray_download_secs": float(ray_download_time), + "lancedb_write_secs": float(lancedb_write_time), + "evaluation_secs": 0.0, + "total_secs": float(time.perf_counter() - ingest_start), + "evaluation_mode": evaluation_mode, + "evaluation_metrics": {}, + "recall_details": bool(recall_details), + "lancedb_uri": str(lancedb_uri), + "lancedb_table": str(LANCEDB_TABLE), + }, + ) + if run_mode == "batch": + import ray + + ray.shutdown() + return + + evaluation_label, evaluation_total_time, evaluation_metrics, evaluation_query_count, ran = _run_evaluation( + evaluation_mode=evaluation_mode, + lancedb_uri=lancedb_uri, + embed_model_name=embed_model_name, + embed_invoke_url=embed_invoke_url, + embed_remote_api_key=embed_remote_api_key, + embed_modality=embed_modality, + query_csv=query_csv, + recall_match_mode=recall_match_mode, + audio_match_tolerance_secs=audio_match_tolerance_secs, + hybrid=hybrid, + reranker=reranker, + reranker_model_name=reranker_model_name, + beir_loader=beir_loader, + beir_dataset_name=beir_dataset_name, + beir_split=beir_split, + beir_query_language=beir_query_language, + beir_doc_id_field=beir_doc_id_field, + beir_k=beir_k, + ) + + if not ran: + _write_runtime_summary( + runtime_metrics_dir, + runtime_metrics_prefix, + { + "run_mode": run_mode, + "input_path": str(Path(input_path).resolve()), + "input_pages": int(num_rows), + "num_pages": int(num_rows), + 
"num_rows": int(len(result_df.index)), + "ingestion_only_secs": float(ingestion_only_total_time), + "ray_download_secs": float(ray_download_time), + "lancedb_write_secs": float(lancedb_write_time), + "evaluation_secs": 0.0, + "total_secs": float(time.perf_counter() - ingest_start), + "evaluation_mode": evaluation_mode, + "evaluation_metrics": {}, + "recall_details": bool(recall_details), + "lancedb_uri": str(lancedb_uri), + "lancedb_table": str(LANCEDB_TABLE), + }, + ) + if run_mode == "batch": + import ray + + ray.shutdown() + return + + total_time = time.perf_counter() - ingest_start + + _write_runtime_summary( + runtime_metrics_dir, + runtime_metrics_prefix, + { + "run_mode": run_mode, + "input_path": str(Path(input_path).resolve()), + "input_pages": int(num_rows), + "num_pages": int(num_rows), + "num_rows": int(len(result_df.index)), + "ingestion_only_secs": float(ingestion_only_total_time), + "ray_download_secs": float(ray_download_time), + "lancedb_write_secs": float(lancedb_write_time), + "evaluation_secs": float(evaluation_total_time), + "total_secs": float(total_time), + "evaluation_mode": evaluation_mode, + "evaluation_metrics": dict(evaluation_metrics), + "evaluation_count": evaluation_query_count, + "recall_details": bool(recall_details), + "lancedb_uri": str(lancedb_uri), + "lancedb_table": str(LANCEDB_TABLE), + }, + ) + + if run_mode == "batch": + import ray + + ray.shutdown() + + from nemo_retriever.utils.detection_summary import print_run_summary + + print_run_summary( + num_rows, + Path(input_path), + hybrid, + lancedb_uri, + LANCEDB_TABLE, + total_time, + ingestion_only_total_time, + ray_download_time, + lancedb_write_time, + evaluation_total_time, + evaluation_metrics, + evaluation_label=evaluation_label, + evaluation_count=evaluation_query_count, + ) + finally: + os.sys.stdout = original_stdout + os.sys.stderr = original_stderr + if log_handle is not None: + log_handle.close() + + +def main() -> None: + """Entrypoint for ``python -m 
nemo_retriever.pipeline``.""" + app() + + +if __name__ == "__main__": + main() diff --git a/nemo_retriever/tests/test_pipeline_helpers.py b/nemo_retriever/tests/test_pipeline_helpers.py new file mode 100644 index 000000000..4c1d404c0 --- /dev/null +++ b/nemo_retriever/tests/test_pipeline_helpers.py @@ -0,0 +1,642 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for the private helpers in :mod:`nemo_retriever.pipeline`. + +These tests target the non-trivial pieces of logic that the ``retriever +pipeline run`` command relies on: + +* ``_resolve_file_patterns`` — file/dir/glob resolution and input-type map. +* ``_build_extract_params`` — translation from CLI flags to ``ExtractParams`` + and the nested ``BatchTuningParams``. +* ``_build_embed_params`` — translation from CLI flags to ``EmbedParams``. +* ``_collect_results`` — Ray ``take_all`` vs. in-process DataFrame + handling + ``_count_input_units`` fallback. +* ``_ensure_lancedb_table`` — idempotent LanceDB table creation. + +They also exercise the lazy attribute access in +``nemo_retriever.pipeline.__init__`` so the package-level ``app`` / ``run`` +exports stay wired to ``__main__``. 
+""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pandas as pd +import pyarrow as pa +import pytest +import typer + +import nemo_retriever.pipeline as pipeline_pkg +from nemo_retriever.params import EmbedParams, ExtractParams +from nemo_retriever.pipeline.__main__ import ( + _build_embed_params, + _build_extract_params, + _collect_results, + _count_input_units, + _ensure_lancedb_table, + _resolve_file_patterns, +) + + +# ============================================================================= +# Package-level lazy exports (pipeline/__init__.py) +# ============================================================================= + + +class TestPipelinePackageExports: + """The ``nemo_retriever.pipeline`` package forwards ``app`` and ``run``.""" + + def test_app_is_forwarded_from_main(self): + from nemo_retriever.pipeline.__main__ import app as main_app + + assert pipeline_pkg.app is main_app + + def test_run_is_forwarded_from_main(self): + from nemo_retriever.pipeline.__main__ import run as main_run + + assert pipeline_pkg.run is main_run + + def test_unknown_attribute_raises_attribute_error(self): + with pytest.raises(AttributeError, match="no attribute 'does_not_exist'"): + _ = pipeline_pkg.does_not_exist # type: ignore[attr-defined] + + def test_dunder_all(self): + assert set(pipeline_pkg.__all__) == {"app", "run"} + + +# ============================================================================= +# _resolve_file_patterns +# ============================================================================= + + +class TestResolveFilePatterns: + """File / directory / input-type resolution.""" + + # --- single-file short-circuit ---------------------------------------- + def test_single_existing_file_returned_verbatim(self, tmp_path: Path) -> None: + pdf = tmp_path / "doc.pdf" + pdf.write_bytes(b"%PDF-1.4 test") + + # input_type does not matter when the path is an existing file. 
+ result = _resolve_file_patterns(pdf, "pdf") + assert result == [str(pdf)] + + def test_single_file_ignores_input_type_mismatch(self, tmp_path: Path) -> None: + """A single file path is returned as-is, even if the extension doesn't match.""" + f = tmp_path / "arbitrary.pdf" + f.write_bytes(b"not really a pdf") + + # We still get [str(file)] back even though input_type is wildly wrong. + result = _resolve_file_patterns(f, "audio") + assert result == [str(f)] + + # --- directory: per-input-type extension maps -------------------------- + @pytest.mark.parametrize( + "input_type,files,expected_patterns", + [ + ("pdf", ["a.pdf"], ["*.pdf"]), + ("txt", ["notes.txt"], ["*.txt"]), + ("html", ["page.html"], ["*.html"]), + ("image", ["a.jpg"], ["*.jpg"]), + ("image", ["b.jpeg"], ["*.jpeg"]), + ("image", ["c.png"], ["*.png"]), + ("image", ["d.tiff"], ["*.tiff"]), + ("image", ["e.bmp"], ["*.bmp"]), + ("audio", ["clip.mp3"], ["*.mp3"]), + ("audio", ["clip.wav"], ["*.wav"]), + ("audio", ["clip.m4a"], ["*.m4a"]), + ], + ) + def test_directory_match_by_input_type( + self, + tmp_path: Path, + input_type: str, + files: list[str], + expected_patterns: list[str], + ) -> None: + for name in files: + (tmp_path / name).write_bytes(b"x") + + result = _resolve_file_patterns(tmp_path, input_type) + + # Every returned pattern should match at least one expected suffix. 
+ assert result == [str(tmp_path / p) for p in expected_patterns] + + def test_doc_input_type_matches_both_docx_and_pptx(self, tmp_path: Path) -> None: + (tmp_path / "a.docx").write_bytes(b"x") + (tmp_path / "b.pptx").write_bytes(b"x") + + result = _resolve_file_patterns(tmp_path, "doc") + + assert result == [str(tmp_path / "*.docx"), str(tmp_path / "*.pptx")] + + def test_doc_input_type_only_one_family_present(self, tmp_path: Path) -> None: + """When only docx files are present, we still only get the docx glob back.""" + (tmp_path / "a.docx").write_bytes(b"x") + + result = _resolve_file_patterns(tmp_path, "doc") + + # Only the matching glob survives the non-empty filter. + assert result == [str(tmp_path / "*.docx")] + + def test_image_input_type_multiple_extensions_filtered(self, tmp_path: Path) -> None: + """Only extensions that actually match get returned; empty globs are filtered out.""" + (tmp_path / "a.jpg").write_bytes(b"x") + (tmp_path / "b.png").write_bytes(b"x") + + result = _resolve_file_patterns(tmp_path, "image") + + assert set(result) == {str(tmp_path / "*.jpg"), str(tmp_path / "*.png")} + # No unmatched patterns slip through. + assert not any(p.endswith("*.tiff") for p in result) + assert not any(p.endswith("*.bmp") for p in result) + assert not any(p.endswith("*.jpeg") for p in result) + + # --- error paths ------------------------------------------------------- + def test_nonexistent_path_raises_bad_parameter(self, tmp_path: Path) -> None: + missing = tmp_path / "does_not_exist" + with pytest.raises(typer.BadParameter, match="Path does not exist"): + _resolve_file_patterns(missing, "pdf") + + def test_unsupported_input_type_raises_bad_parameter(self, tmp_path: Path) -> None: + with pytest.raises(typer.BadParameter, match="Unsupported --input-type"): + _resolve_file_patterns(tmp_path, "parquet") + + def test_directory_with_no_matches_raises_bad_parameter(self, tmp_path: Path) -> None: + # Directory exists but contains nothing matching the pdf glob. 
+ (tmp_path / "sidecar.json").write_bytes(b"{}") + + with pytest.raises(typer.BadParameter, match="No files found"): + _resolve_file_patterns(tmp_path, "pdf") + + def test_accepts_string_path_not_just_pathlib(self, tmp_path: Path) -> None: + (tmp_path / "sample.txt").write_bytes(b"x") + + # Upstream callers sometimes pass a ``str``; the helper coerces to Path. + result = _resolve_file_patterns(str(tmp_path), "txt") # type: ignore[arg-type] + + assert result == [str(Path(tmp_path) / "*.txt")] + + +# ============================================================================= +# _build_extract_params +# ============================================================================= + + +_EXTRACT_BASE = dict( + method="pdfium", + dpi=300, + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=False, + extract_page_as_image=True, + use_graphic_elements=False, + use_table_structure=False, + table_output_format=None, + extract_remote_api_key=None, + page_elements_invoke_url=None, + ocr_invoke_url=None, + graphic_elements_invoke_url=None, + table_structure_invoke_url=None, + pdf_split_batch_size=1, + pdf_extract_batch_size=0, + pdf_extract_tasks=0, + pdf_extract_cpus_per_task=0.0, + page_elements_actors=0, + page_elements_batch_size=0, + page_elements_cpus_per_actor=0.0, + page_elements_gpus_per_actor=None, + ocr_actors=0, + ocr_batch_size=0, + ocr_cpus_per_actor=0.0, + ocr_gpus_per_actor=None, + nemotron_parse_actors=0, + nemotron_parse_batch_size=0, + nemotron_parse_gpus_per_actor=None, +) + + +class TestBuildExtractParams: + """Translation from CLI flags to ``ExtractParams`` + ``BatchTuningParams``.""" + + def test_returns_extract_params(self): + params = _build_extract_params(**_EXTRACT_BASE) + assert isinstance(params, ExtractParams) + assert params.method == "pdfium" + assert params.dpi == 300 + assert params.extract_text is True + assert params.extract_tables is True + assert params.extract_charts is True + assert 
params.extract_infographics is False + assert params.extract_page_as_image is True + assert params.use_graphic_elements is False + assert params.use_table_structure is False + + def test_dpi_coerced_to_int(self): + # CLI passes an int, but the helper explicitly coerces — cover the branch. + overrides = {**_EXTRACT_BASE, "dpi": 150.7} # type: ignore[dict-item] + params = _build_extract_params(**overrides) + assert params.dpi == 150 + assert isinstance(params.dpi, int) + + def test_zero_batch_sizes_are_dropped(self): + """``... or None`` should drop CLI ``0`` defaults so pydantic defaults win.""" + params = _build_extract_params(**_EXTRACT_BASE) + + tuning = params.batch_tuning + # The BatchTuningParams defaults (non-None) should remain. + assert tuning.pdf_extract_batch_size == 4 # pydantic default, not 0 + assert tuning.page_elements_batch_size == 24 + # Workers stay None (CLI 0 -> None -> pydantic Optional default None). + assert tuning.pdf_extract_workers is None + assert tuning.page_elements_workers is None + assert tuning.ocr_workers is None + + def test_pdf_split_batch_size_is_always_applied(self): + # pdf_split_batch_size lacks ``or None`` so ``1`` gets through verbatim. 
+ params = _build_extract_params(**_EXTRACT_BASE) + assert params.batch_tuning.pdf_split_batch_size == 1 + + overrides = {**_EXTRACT_BASE, "pdf_split_batch_size": 8} + params = _build_extract_params(**overrides) + assert params.batch_tuning.pdf_split_batch_size == 8 + + def test_explicit_worker_counts_flow_through(self): + overrides = { + **_EXTRACT_BASE, + "pdf_extract_tasks": 3, + "pdf_extract_batch_size": 16, + "pdf_extract_cpus_per_task": 2.5, + "page_elements_actors": 4, + "page_elements_batch_size": 32, + "page_elements_cpus_per_actor": 0.75, + "ocr_actors": 2, + "ocr_batch_size": 12, + "ocr_cpus_per_actor": 1.25, + "nemotron_parse_actors": 1, + "nemotron_parse_batch_size": 6, + } + params = _build_extract_params(**overrides) + + tuning = params.batch_tuning + assert tuning.pdf_extract_workers == 3 + assert tuning.pdf_extract_batch_size == 16 + assert tuning.pdf_extract_num_cpus == 2.5 + assert tuning.page_elements_workers == 4 + assert tuning.page_elements_batch_size == 32 + assert tuning.page_elements_cpus_per_actor == 0.75 + assert tuning.ocr_workers == 2 + assert tuning.ocr_inference_batch_size == 12 + assert tuning.ocr_cpus_per_actor == 1.25 + assert tuning.nemotron_parse_workers == 1 + assert tuning.nemotron_parse_batch_size == 6 + + # --- GPU gating: remote URL forces gpu_* = 0.0 ----------------------- + def test_page_elements_invoke_url_forces_zero_gpu(self): + overrides = { + **_EXTRACT_BASE, + "page_elements_invoke_url": "http://pe.example/v1", + "page_elements_gpus_per_actor": 0.5, # would be 0.5 without URL + } + params = _build_extract_params(**overrides) + assert params.batch_tuning.gpu_page_elements == 0.0 + assert params.page_elements_invoke_url == "http://pe.example/v1" + + def test_ocr_invoke_url_forces_zero_gpu(self): + overrides = { + **_EXTRACT_BASE, + "ocr_invoke_url": "http://ocr.example/v1", + "ocr_gpus_per_actor": 0.75, + } + params = _build_extract_params(**overrides) + assert params.batch_tuning.gpu_ocr == 0.0 + assert 
params.ocr_invoke_url == "http://ocr.example/v1" + + def test_local_gpu_values_flow_through_when_no_remote_url(self): + overrides = { + **_EXTRACT_BASE, + "page_elements_gpus_per_actor": 0.25, + "ocr_gpus_per_actor": 0.5, + "nemotron_parse_gpus_per_actor": 0.33, + } + params = _build_extract_params(**overrides) + assert params.batch_tuning.gpu_page_elements == 0.25 + assert params.batch_tuning.gpu_ocr == 0.5 + assert params.batch_tuning.gpu_nemotron_parse == 0.33 + + def test_gpu_none_values_dropped_when_no_remote_url(self): + """With no remote URL and gpu=None, the key is dropped and pydantic default applies.""" + overrides = { + **_EXTRACT_BASE, + "page_elements_gpus_per_actor": None, + "ocr_gpus_per_actor": None, + } + params = _build_extract_params(**overrides) + # BatchTuningParams declares these as Optional[float] with default None. + assert params.batch_tuning.gpu_page_elements is None + assert params.batch_tuning.gpu_ocr is None + + # --- api_key + endpoints pass through -------------------------------- + def test_api_key_and_endpoints_pass_through(self): + overrides = { + **_EXTRACT_BASE, + "extract_remote_api_key": "nvapi-secret", + "page_elements_invoke_url": "http://pe/v1", + "ocr_invoke_url": "http://ocr/v1", + "graphic_elements_invoke_url": "http://ge/v1", + "table_structure_invoke_url": "http://ts/v1", + "table_output_format": "markdown", + } + params = _build_extract_params(**overrides) + assert params.api_key == "nvapi-secret" + assert params.page_elements_invoke_url == "http://pe/v1" + assert params.ocr_invoke_url == "http://ocr/v1" + assert params.graphic_elements_invoke_url == "http://ge/v1" + assert params.table_structure_invoke_url == "http://ts/v1" + # The model validator auto-enables feature flags when URLs are set. 
+ assert params.use_graphic_elements is True + assert params.use_table_structure is True + assert params.table_output_format == "markdown" + + def test_none_values_are_filtered_before_pydantic(self): + """Keys whose CLI value is ``None`` must be omitted, not forwarded as ``None``.""" + overrides = { + **_EXTRACT_BASE, + "table_output_format": None, + "extract_remote_api_key": None, + } + params = _build_extract_params(**overrides) + # ``table_output_format=None`` gets dropped and the model validator fills + # it in based on ``use_table_structure`` (False → "pseudo_markdown"). + assert params.table_output_format == "pseudo_markdown" + # api_key is truly None since it wasn't forced. + assert params.api_key is None + + +# ============================================================================= +# _build_embed_params +# ============================================================================= + + +_EMBED_BASE = dict( + embed_model_name="nvidia/test-embed", + embed_invoke_url=None, + embed_remote_api_key=None, + embed_modality="text", + text_elements_modality=None, + structured_elements_modality=None, + embed_granularity="element", + embed_actors=0, + embed_batch_size=0, + embed_cpus_per_actor=0.0, + embed_gpus_per_actor=None, +) + + +class TestBuildEmbedParams: + """Translation from CLI flags to ``EmbedParams`` + ``BatchTuningParams``.""" + + def test_returns_embed_params(self): + params = _build_embed_params(**_EMBED_BASE) + assert isinstance(params, EmbedParams) + assert params.model_name == "nvidia/test-embed" + assert params.embed_modality == "text" + assert params.embed_granularity == "element" + + def test_zero_batch_size_dropped_to_pydantic_default(self): + params = _build_embed_params(**_EMBED_BASE) + # ``embed_batch_size or None`` drops 0; pydantic default (256) wins. + assert params.batch_tuning.embed_batch_size == 256 + # Same for embed_workers (Optional, default None). 
+ assert params.batch_tuning.embed_workers is None + # inference_batch_size comes from the same ``or None`` pattern; + # pydantic default is 32. + assert params.inference_batch_size == 32 + + def test_explicit_batch_params_flow_through(self): + overrides = { + **_EMBED_BASE, + "embed_actors": 3, + "embed_batch_size": 64, + "embed_cpus_per_actor": 1.5, + } + params = _build_embed_params(**overrides) + assert params.batch_tuning.embed_workers == 3 + assert params.batch_tuning.embed_batch_size == 64 + assert params.batch_tuning.embed_cpus_per_actor == 1.5 + # ``inference_batch_size`` is also pinned from ``embed_batch_size``. + assert params.inference_batch_size == 64 + + def test_embed_invoke_url_forces_zero_gpu(self): + overrides = { + **_EMBED_BASE, + "embed_invoke_url": "http://embed.example/v1", + "embed_gpus_per_actor": 0.5, + "embed_remote_api_key": "nvapi-xyz", + } + params = _build_embed_params(**overrides) + assert params.batch_tuning.gpu_embed == 0.0 + assert params.embed_invoke_url == "http://embed.example/v1" + assert params.api_key == "nvapi-xyz" + + def test_local_embed_gpu_value_flows_through(self): + overrides = {**_EMBED_BASE, "embed_gpus_per_actor": 0.25} + params = _build_embed_params(**overrides) + assert params.batch_tuning.gpu_embed == 0.25 + + def test_local_embed_gpu_none_dropped(self): + overrides = {**_EMBED_BASE, "embed_gpus_per_actor": None} + params = _build_embed_params(**overrides) + assert params.batch_tuning.gpu_embed is None + + def test_modality_overrides_pass_through(self): + overrides = { + **_EMBED_BASE, + "embed_modality": "text_image", + "text_elements_modality": "text", + "structured_elements_modality": "image", + "embed_granularity": "element", + } + params = _build_embed_params(**overrides) + assert params.embed_modality == "text_image" + assert params.text_elements_modality == "text" + assert params.structured_elements_modality == "image" + + def test_invalid_modality_raises_pydantic_validation_error(self): + overrides = 
{**_EMBED_BASE, "embed_modality": "image_text"} + with pytest.raises(Exception, match="text_image"): + _build_embed_params(**overrides) + + +# ============================================================================= +# _collect_results (and _count_input_units) +# ============================================================================= + + +class _FakeRayDataset: + """Minimal stand-in for a Ray Data dataset produced by ``ingestor.ingest()``.""" + + def __init__(self, records: list[dict[str, Any]]) -> None: + self._records = records + self.take_all_calls = 0 + + def take_all(self) -> list[dict[str, Any]]: + self.take_all_calls += 1 + return list(self._records) + + +class TestCollectResults: + """Ray (batch) and pandas (inprocess) result materialization.""" + + def test_batch_mode_calls_take_all_and_builds_dataframe(self): + rows = [ + {"source_id": "a", "text": "hello"}, + {"source_id": "a", "text": "world"}, + {"source_id": "b", "text": "!"}, + ] + fake = _FakeRayDataset(rows) + + records, df, download_time, num_units = _collect_results("batch", fake) + + assert fake.take_all_calls == 1 + assert records == rows + assert isinstance(df, pd.DataFrame) + assert list(df.columns) == ["source_id", "text"] + assert len(df) == 3 + # ``source_id`` has two distinct values → that is the unit count. + assert num_units == 2 + # Elapsed time is measured with ``perf_counter``; it must be non-negative. + assert download_time >= 0.0 + + def test_batch_mode_handles_empty_result(self): + fake = _FakeRayDataset([]) + records, df, download_time, num_units = _collect_results("batch", fake) + assert records == [] + assert df.empty + # Empty DataFrame has no columns → falls through to len(df.index) == 0. 
+ assert num_units == 0 + assert download_time >= 0.0 + + def test_inprocess_mode_accepts_dataframe_directly(self): + rows = [ + {"source_id": "a", "text": "x"}, + {"source_id": "b", "text": "y"}, + ] + df_in = pd.DataFrame(rows) + + records, df_out, download_time, num_units = _collect_results("inprocess", df_in) + + # The DataFrame is passed through unchanged (same object). + assert df_out is df_in + assert records == rows + # inprocess mode never incurs Ray download time. + assert download_time == 0.0 + assert num_units == 2 + + +class TestCountInputUnits: + """``_count_input_units`` fallback chain: source_id -> source_path -> len.""" + + def test_prefers_source_id(self): + df = pd.DataFrame( + { + "source_id": ["a", "a", "b"], + "source_path": ["/p1", "/p1", "/p2"], + "text": ["x", "y", "z"], + } + ) + assert _count_input_units(df) == 2 + + def test_falls_back_to_source_path(self): + df = pd.DataFrame( + { + "source_path": ["/p1", "/p2", "/p2", "/p3"], + "text": ["x", "y", "z", "w"], + } + ) + assert _count_input_units(df) == 3 + + def test_falls_back_to_len_when_no_source_columns(self): + df = pd.DataFrame({"text": ["x", "y", "z", "w"]}) + assert _count_input_units(df) == 4 + + def test_empty_dataframe_without_columns(self): + df = pd.DataFrame() + assert _count_input_units(df) == 0 + + +# ============================================================================= +# _ensure_lancedb_table +# ============================================================================= + + +class TestEnsureLancedbTable: + """Idempotent creation of the LanceDB table used by ``pipeline run``.""" + + def test_creates_uri_directory_when_missing(self, tmp_path: Path): + uri = tmp_path / "new_lancedb" + assert not uri.exists() + + _ensure_lancedb_table(str(uri), "nv-ingest") + + assert uri.exists() and uri.is_dir() + + def test_creates_table_with_lancedb_schema(self, tmp_path: Path): + import lancedb + from nemo_retriever.vector_store.lancedb_utils import lancedb_schema + + uri = 
tmp_path / "lancedb" + _ensure_lancedb_table(str(uri), "nv-ingest") + + db = lancedb.connect(str(uri)) + # Table exists and is empty. + tbl = db.open_table("nv-ingest") + assert tbl.count_rows() == 0 + + # The schema matches the canonical lancedb_schema() pa schema. + expected = lancedb_schema() + actual = tbl.schema + assert actual.names == expected.names + for name in expected.names: + assert actual.field(name).type == expected.field(name).type + + def test_is_idempotent_when_table_already_exists(self, tmp_path: Path): + import lancedb + + uri = tmp_path / "lancedb" + _ensure_lancedb_table(str(uri), "nv-ingest") + + # Seed one row so we can confirm the table was not recreated/emptied. + db = lancedb.connect(str(uri)) + tbl = db.open_table("nv-ingest") + schema = tbl.schema + payload = {f.name: [None] for f in schema} + payload["page_number"] = [1] + payload["text"] = ["seed"] + row = pa.table(payload, schema=schema) + tbl.add(row) + assert tbl.count_rows() == 1 + + # Second call must be a no-op — the existing data must survive. + _ensure_lancedb_table(str(uri), "nv-ingest") + tbl = lancedb.connect(str(uri)).open_table("nv-ingest") + assert tbl.count_rows() == 1 + + def test_respects_custom_table_name(self, tmp_path: Path): + import lancedb + + uri = tmp_path / "lancedb" + _ensure_lancedb_table(str(uri), "custom-name") + + db = lancedb.connect(str(uri)) + # Prefer the non-deprecated ``list_tables`` API; fall back to + # ``table_names`` on older LanceDB releases. Both return either a flat + # sequence of names or a pydantic-style response with a ``tables`` attr. + raw = db.list_tables() if hasattr(db, "list_tables") else db.table_names() + names = getattr(raw, "tables", None) or list(raw) + assert "custom-name" in names
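
---

For readers skimming the diff, the fallback chain that `TestCountInputUnits` pins down (distinct `source_id` → distinct `source_path` → raw row count) can be sketched as a standalone function. This is an illustrative re-implementation consistent with those tests, not the `_count_input_units` helper from the diff itself:

```python
import pandas as pd


def count_input_units(df: pd.DataFrame) -> int:
    """Count input documents, preferring source columns over raw rows."""
    if "source_id" in df.columns:
        # One unit per distinct source document.
        return int(df["source_id"].nunique())
    if "source_path" in df.columns:
        # Fallback: distinct source paths.
        return int(df["source_path"].nunique())
    # No source columns at all (including an empty DataFrame): row count.
    return int(len(df.index))
```

Because an empty `pd.DataFrame()` has no columns, it falls straight through to the row-count branch and yields `0`, matching `test_empty_dataframe_without_columns`.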
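Similarly, the resolution rules exercised by `TestResolveFilePatterns` (single file wins verbatim, directories expand to per-input-type globs with empty globs filtered out) can be sketched as follows. The extension map and error type here are illustrative assumptions inferred from the tests; the real `_resolve_file_patterns` raises `typer.BadParameter` rather than `ValueError`:

```python
from pathlib import Path

# Per-input-type glob map implied by the parametrized directory tests.
_PATTERNS = {
    "pdf": ["*.pdf"],
    "txt": ["*.txt"],
    "html": ["*.html"],
    "image": ["*.jpg", "*.jpeg", "*.png", "*.tiff", "*.bmp"],
    "audio": ["*.mp3", "*.wav", "*.m4a"],
    "doc": ["*.docx", "*.pptx"],
}


def resolve_file_patterns(input_path, input_type: str) -> list[str]:
    path = Path(input_path)  # callers may pass a plain str
    if not path.exists():
        raise ValueError(f"Path does not exist: {path}")
    if path.is_file():
        # A single existing file short-circuits; input_type is ignored.
        return [str(path)]
    if input_type not in _PATTERNS:
        raise ValueError(f"Unsupported --input-type: {input_type!r}")
    # Keep only globs that actually match at least one file in the directory.
    matched = [str(path / p) for p in _PATTERNS[input_type] if any(path.glob(p))]
    if not matched:
        raise ValueError(f"No files found in {path} for input type {input_type!r}")
    return matched
```

Filtering out non-matching globs is what makes `test_doc_input_type_only_one_family_present` hold: a directory with only `.docx` files returns just the `*.docx` glob even though `doc` also covers `*.pptx`.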