vector_store: stream VDB uploads in graph pipeline via client VDB wrapper #1847
Draft
jioffe502 wants to merge 7 commits into NVIDIA:main
Conversation
…pper

- VDBUploadOperator wraps client VDB classes (LanceDB, Milvus) as a streaming graph stage with concurrency=1 and batch_size=64
- Preprocess extracts canonical records; process writes per-backend
- Finalization delegates to client LanceDB.write_to_index for indexing
- jp20 recall@5=0.8783 (parity), PPS=21.79 (parity with baseline 21.50)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
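The preprocess/process split described in this commit can be sketched roughly as below. The class and method bodies are illustrative stand-ins, not the actual nv_ingest operator; only the shape (backend-agnostic record extraction, then a batched per-backend write) comes from the commit.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class VDBUploadSketch:
    """Illustrative sketch: preprocess extracts canonical records,
    process writes them per-backend in fixed-size batches."""
    backend: str = "lancedb"
    batch_size: int = 64
    written: list = field(default_factory=list)  # stands in for the real table/client

    def preprocess(self, batch: list[dict[str, Any]]) -> list[dict[str, Any]]:
        # Canonical record: keep only the fields every backend needs.
        return [{"id": r["id"], "text": r["text"], "vector": r["vector"]} for r in batch]

    def process(self, records: list[dict[str, Any]]) -> int:
        # Per-backend write; the real LanceDB path would call table.add(records),
        # the Milvus path client.write_to_index(records).
        for start in range(0, len(records), self.batch_size):
            self.written.extend(records[start:start + self.batch_size])
        return len(records)


op = VDBUploadSketch()
n = op.process(op.preprocess([{"id": 1, "text": "a", "vector": [0.1], "extra": "x"}]))
```

Note that preprocess drops fields the backends do not need (`extra` above), so the write path only ever sees canonical records.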
…atch

- Fix _write_via_client to use get_connection_params/get_write_params, matching the client Milvus.run() dispatch pattern
- Add integration_test_milvus_vdb.py (writes + search verification)
- Add integration_test_milvus_recall.py (full jp20 pipeline + recall)
- Milvus recall: @1=0.6435, @5=0.8783, @10=0.9217 (matches LanceDB)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
- Remove monkeypatches for _ensure_lancedb_table and handle_lancedb
- Add vdb_upload/store/caption/dedup stubs to _FakeIngestor
- Add count() to _FakeDataset for the no-collect code path
- Delete dead handle_lancedb function and its tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
- black reformatting on test files
- Remove unused imports flagged by flake8
- end-of-file-fixer on lancedb_store.py

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
Lets callers pass a ready-made LanceDB/Milvus instance from nv_ingest_client.util.vdb directly into GraphIngestor.vdb_upload(vdb_op=...), so the graph wraps it instead of rebuilding one from VdbUploadParams. The operator captures vdb_op in get_constructor_kwargs() so it round-trips to Ray workers unchanged.

- VDBUploadOperator: new vdb_op kwarg; backend derived from its class name
- GraphIngestor.vdb_upload + build_graph + _append_ordered_transform_stages thread vdb_op through to the operator
- _finalize_vdb reuses the passed LanceDB instance for post-run indexing
- Extracted build_client_lancedb helper; removed dead build_lancedb_rows / build_vdb_records_from_dicts; fixed latent missing logger in graph_ingestor
- Added 3 unit tests + an end-to-end integration test covering both backends (bo20 ingested 831 rows to both LanceDB and Milvus via passthrough)

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
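The vdb_op round-trip this commit describes can be sketched as follows. The class names and attribute handling here are hypothetical; only the contract (the wrapped instance is captured in constructor kwargs and the backend name is derived from its class) comes from the commit text.

```python
# Sketch of the vdb_op passthrough: the operator captures the client VDB
# instance in its constructor kwargs so the graph runtime can rebuild the
# operator elsewhere with the same instance. FakeLanceDB is a stand-in.

class FakeLanceDB:
    def __init__(self, uri: str):
        self.uri = uri


class OperatorSketch:
    def __init__(self, vdb_op=None):
        self.vdb_op = vdb_op
        # Backend string derived from the wrapped class name, per the commit.
        self.backend = (
            type(vdb_op).__name__.lower().replace("fake", "") if vdb_op else None
        )

    def get_constructor_kwargs(self):
        # Round-trips vdb_op to workers unchanged.
        return {"vdb_op": self.vdb_op}


vdb = FakeLanceDB(uri="/tmp/lancedb")
op = OperatorSketch(vdb_op=vdb)
rebuilt = OperatorSketch(**op.get_constructor_kwargs())
```

The point of routing everything through `get_constructor_kwargs()` is that the rebuilt operator sees the identical client object, so per-backend configuration never has to be re-derived from params.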
…ring

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>

# Conflicts:
#	nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
#	nemo_retriever/src/nemo_retriever/graph_ingestor.py
#	nemo_retriever/src/nemo_retriever/vector_store/lancedb_store.py
Preparatory checkpoint for the Milvus streaming-write fix. Adds a no-op `VDBUploadOperator.finalize()` that the driver calls once after `executor.ingest()` returns (via a new `vdb_upload_ops_out` handle threaded through `build_graph`), so one-shot flush/wait-for-index work can live outside the per-batch lifecycle that `AbstractOperator.run()` fires. LanceDB and the bulk-fallback write path remain no-ops. Also reverts the Milvus CLI additions from `graph_pipeline.py` (backend selection moves back to the caller passing `vdb_op=`) and sanitizes the bo20 dataset path in `test_configs.yaml`.
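The two-phase lifecycle this checkpoint sets up (per-batch run calls during ingest, then one driver-side finalize call) can be sketched like this. The executor shape and names other than `finalize` are illustrative.

```python
# Sketch of the finalize lifecycle: the graph fires run() per batch,
# and the driver invokes finalize() exactly once after ingest returns.

class StreamingWriteOp:
    def __init__(self):
        self.batches = 0
        self.finalized = False

    def run(self, batch):
        self.batches += 1  # per-batch lifecycle fired by the graph
        return batch

    def finalize(self):
        # One-shot flush / wait-for-index hook; a no-op for LanceDB here.
        self.finalized = True


def ingest(op, batches):
    for b in batches:
        op.run(b)
    # Driver-side call after ingest completes, analogous to walking the
    # vdb_upload_ops_out handle in the real build_graph wiring.
    op.finalize()


op = StreamingWriteOp()
ingest(op, [[1], [2], [3]])
```

Keeping finalize off the per-batch path matters for backends like Milvus, where waiting for an index build on every batch would stall the stream.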
TL;DR
VDBUploadOperator wraps existing client VDB classes (LanceDB, Milvus) as an in-graph streaming stage, eliminating the collect-then-write pattern. Same operator, swap the backend string — both produce matching recall.
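The "same operator, swap the backend string" idea can be sketched as a single write entry point that dispatches on the backend name. The write targets below are stand-in lists, not the real LanceDB/Milvus clients.

```python
# Minimal sketch of backend dispatch by string: one write function,
# two interchangeable sinks. Real code would call table.add(...) for
# LanceDB and client.write_to_index(...) for Milvus.

def make_writer(backend: str):
    sinks = {"lancedb": [], "milvus": []}

    def write(records):
        if backend not in sinks:
            raise ValueError(f"unknown backend: {backend}")
        sinks[backend].extend(records)
        return len(sinks[backend])  # total rows written so far

    return write


lance_write = make_writer("lancedb")
milvus_write = make_writer("milvus")
```

Because the record schema is canonical before the write, the two writers are interchangeable, which is what makes the matching-recall comparison in this PR meaningful.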
What changed
- `VDBABC.preprocess` extracts canonical records (backend-agnostic); `process` writes per-backend (LanceDB streaming via `table.add()`, Milvus via `client.write_to_index()`). Runs with `concurrency=1` + `batch_size=64`.
- `_finalize_vdb()` delegates post-pipeline index creation to the client `LanceDB.write_to_index(table=table)` instead of a custom backend.
- `graph_pipeline.py` replaces `take_all() + handle_lancedb()` with `.vdb_upload()` in the fluent chain. VDB writes stream during the pipeline — the driver never collects the full dataset for VDB purposes. (`take_all` remains only for the optional detection summary path.)

Quantitative evidence
- LanceDB (harness validated, jp20): recall@5=0.8783, PPS=21.79 (baseline 21.50)
- Milvus (standalone integration test, jp20): recall@1=0.6435, recall@5=0.8783, recall@10=0.9217
- The recall@10 delta is from Milvus HNSW approximate search vs LanceDB brute-force at query time.
Known follow-ups
- The graph still calls `take_all()` for the detection summary by default — decoupling that is a follow-up
- `write_embeddings_to_lancedb()` in `text_embed/processor.py` is a pre-graph entry point that bypasses the operator — candidate for removal when the stage registry is retired

Test plan
- Unit tests (`test_vdb_upload_operator.py`, `test_vdb_record_contract.py`) cover the `create_index` + `write_to_index` contract
- `retriever harness run --dataset bo20` — smoke test PASS
- `retriever harness run --dataset jp20 --override embed_batch_size=32` — recall + PPS parity
- Milvus writes + search verification (`integration_test_milvus_vdb.py`)
- Full jp20 pipeline + recall (`integration_test_milvus_recall.py`)

🤖 Generated with Claude Code