vector_store: stream VDB uploads in graph pipeline via client VDB wrapper#1847

Draft
jioffe502 wants to merge 7 commits into NVIDIA:main from jioffe502:jioffe/vdb-graph-wiring

Conversation

@jioffe502
Collaborator

TLDR

VDBUploadOperator wraps the existing client VDB classes (LanceDB, Milvus) as an in-graph streaming stage, eliminating the collect-then-write pattern. The same operator serves both backends: swap the backend string and both produce matching recall.

What changed

  • VDBUploadOperator wraps any client VDB class implementing the VDB ABC. preprocess extracts canonical records (backend-agnostic); process writes per-backend (LanceDB streaming via table.add(), Milvus via client.write_to_index()). Runs with concurrency=1 + batch_size=64.
  • _finalize_vdb() delegates post-pipeline index creation to the client LanceDB.write_to_index(table=table) instead of a custom backend.
  • graph_pipeline.py replaces take_all() + handle_lancedb() with .vdb_upload() in the fluent chain. VDB writes stream during the pipeline — the driver never collects the full dataset for VDB purposes. (take_all remains only for the optional detection summary path.)
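A minimal sketch of the operator shape described above (the VDB ABC, the exact method names, and the record schema are assumptions based on this description, not the actual nv_ingest source):

```python
# Sketch of the streaming VDB upload stage described in the bullets above.
# The wrapped client, method names, and record fields are assumptions.

class VDBUploadOperator:
    """Wraps a client VDB instance as an in-graph streaming stage."""

    def __init__(self, vdb_op, batch_size=64):
        self.vdb_op = vdb_op                # e.g. a client LanceDB or Milvus wrapper
        self.batch_size = batch_size
        # Backend is derived from the wrapped client's class name.
        self.backend = type(vdb_op).__name__.lower()

    def preprocess(self, rows):
        # Backend-agnostic: extract canonical records from pipeline rows.
        return [
            {"id": r["id"], "vector": r["embedding"], "metadata": r.get("metadata", {})}
            for r in rows
        ]

    def process(self, records):
        # Per-backend write: LanceDB streams via table.add(),
        # Milvus goes through the client's write_to_index().
        if self.backend == "lancedb":
            self.vdb_op.table.add(records)
        else:
            self.vdb_op.write_to_index(records)
        return records
```

The preprocess/process split keeps record extraction identical across backends, so only the final write call differs per backend.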

Quantitative evidence

LanceDB (harness validated, jp20)

| Metric | Baseline (no VDB) | With VDB |
|---|---|---|
| PPS | 21.50 | 21.79 |
| recall@5 | 0.8783 | 0.8783 |
| recall@10 | 0.9304 | 0.9304 |

Milvus (standalone integration test, jp20)

| Metric | LanceDB | Milvus |
|---|---|---|
| recall@1 | 0.6435 | 0.6435 |
| recall@5 | 0.8783 | 0.8783 |
| recall@10 | 0.9304 | 0.9217 |

The recall@10 delta comes from Milvus's HNSW approximate search versus LanceDB's brute-force scan at query time.
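For reference, the recall@k numbers in these tables can be computed with a metric like the following (a minimal sketch; the harness's actual evaluation code is not part of this PR):

```python
def recall_at_k(retrieved, relevant, k):
    """Fraction of queries whose ground-truth doc appears in the top-k results.

    retrieved: list of ranked result-id lists, one per query
    relevant:  list of the single ground-truth id per query
    """
    hits = sum(1 for ranked, gold in zip(retrieved, relevant) if gold in ranked[:k])
    return hits / len(relevant)
```

Under this metric an exact (brute-force) backend and an approximate (HNSW) backend can agree at small k but diverge slightly at larger k, which is the pattern seen at recall@10 above.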

Known follow-ups

  • Harness still triggers take_all() for detection summary by default — decoupling that is a follow-up
  • write_embeddings_to_lancedb() in text_embed/processor.py is a pre-graph entry point that bypasses the operator — candidate for removal when the stage registry is retired
  • Harness backend switching (run recall evaluation against Milvus) is a follow-up wiring task

Test plan

  • 32 unit tests pass (test_vdb_upload_operator.py, test_vdb_record_contract.py)
  • Milvus mock tests prove create_index + write_to_index contract
  • retriever harness run --dataset bo20 — smoke test PASS
  • retriever harness run --dataset jp20 --override embed_batch_size=32 — recall + PPS parity
  • Milvus integration test — writes + vector search verified (integration_test_milvus_vdb.py)
  • Milvus full recall test — jp20 pipeline through Milvus, recall matches LanceDB (integration_test_milvus_recall.py)

🤖 Generated with Claude Code

jioffe502 and others added 4 commits April 14, 2026 16:10
…pper

- VDBUploadOperator wraps client VDB classes (LanceDB, Milvus) as a
  streaming graph stage with concurrency=1 and batch_size=64
- Preprocess extracts canonical records; process writes per-backend
- Finalization delegates to client LanceDB.write_to_index for indexing
- jp20 recall@5=0.8783 (parity), PPS=21.79 (parity with baseline 21.50)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
…atch

- Fix _write_via_client to use get_connection_params/get_write_params
  matching the client Milvus.run() dispatch pattern
- Add integration_test_milvus_vdb.py (writes + search verification)
- Add integration_test_milvus_recall.py (full jp20 pipeline + recall)
- Milvus recall: @1=0.6435 @5=0.8783 @10=0.9217 (matches LanceDB)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
- Remove monkeypatches for _ensure_lancedb_table and handle_lancedb
- Add vdb_upload/store/caption/dedup stubs to _FakeIngestor
- Add count() to _FakeDataset for no-collect code path
- Delete dead handle_lancedb function and its tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
- black reformatting on test files
- remove unused imports flagged by flake8
- end-of-file-fixer on lancedb_store.py

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
Lets callers pass a ready-made LanceDB/Milvus instance from
nv_ingest_client.util.vdb directly into GraphIngestor.vdb_upload(vdb_op=...)
so the graph wraps it instead of rebuilding one from VdbUploadParams. The
operator captures vdb_op in get_constructor_kwargs() so it round-trips to
Ray workers unchanged.

- VDBUploadOperator: new vdb_op kwarg, backend derived from its class name
- GraphIngestor.vdb_upload + build_graph + _append_ordered_transform_stages
  thread vdb_op through to the operator
- _finalize_vdb reuses the passed LanceDB instance for post-run indexing
- Extracted build_client_lancedb helper; removed dead build_lancedb_rows /
  build_vdb_records_from_dicts; fixed latent missing logger in graph_ingestor
- Added 3 unit tests + end-to-end integration test covering both backends
  (bo20 ingested 831 rows to both LanceDB and Milvus via passthrough)

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
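A sketch of the round-trip mechanism this commit describes: the operator reports its constructor kwargs so an identical instance can be rebuilt on a Ray worker (the method name get_constructor_kwargs comes from the commit text; everything else is illustrative):

```python
# Sketch only: capturing vdb_op in the reported constructor kwargs is what
# lets a caller-supplied LanceDB/Milvus instance travel to Ray workers
# unchanged. Field names beyond get_constructor_kwargs are assumptions.

class VDBUploadOperator:
    def __init__(self, vdb_op=None, batch_size=64):
        self.vdb_op = vdb_op
        self.batch_size = batch_size

    def get_constructor_kwargs(self):
        # The scheduler serializes these kwargs and re-invokes __init__
        # remotely, so the wrapped client instance round-trips intact.
        return {"vdb_op": self.vdb_op, "batch_size": self.batch_size}
```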
…ring

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>

# Conflicts:
#	nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
#	nemo_retriever/src/nemo_retriever/graph_ingestor.py
#	nemo_retriever/src/nemo_retriever/vector_store/lancedb_store.py
Preparatory checkpoint for the Milvus streaming-write fix. Adds a no-op
`VDBUploadOperator.finalize()` that the driver calls once after
`executor.ingest()` returns (via a new `vdb_upload_ops_out` handle threaded
through `build_graph`), so one-shot flush/wait-for-index work can live
off the per-batch lifecycle that `AbstractOperator.run()` fires. LanceDB
and the bulk-fallback write path remain no-ops. Also reverts the Milvus
CLI additions from `graph_pipeline.py` (backend selection moves back to
the caller passing `vdb_op=`) and sanitizes the bo20 dataset path in
`test_configs.yaml`.

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
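The driver-side lifecycle this checkpoint sets up can be sketched as follows (names mirror the commit message; exact signatures are assumptions):

```python
# Sketch of the finalize hook described above: per-batch writes stream
# during ingest, then the driver invokes finalize() exactly once per
# upload operator via the vdb_upload_ops_out handle from build_graph.

def run_pipeline(executor, vdb_upload_ops_out):
    executor.ingest()              # streaming per-batch writes happen here
    for op in vdb_upload_ops_out:  # handle threaded through build_graph
        op.finalize()              # one-shot flush / wait-for-index work
```

This keeps one-shot post-run work (e.g. waiting for a Milvus index build) out of the per-batch lifecycle that AbstractOperator.run() fires; for LanceDB the hook stays a no-op.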