Add support for end-to-end in-memory QA evaluation #1865

KyleZheng1284 wants to merge 7 commits into NVIDIA:main from
Conversation
Greptile Summary

This PR adds end-to-end in-memory QA evaluation to graph_pipeline.
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/evaluation/orchestrator.py | New QAEvalPipeline orchestrator — P1 issue: pair["answer"] KeyError on the retrieval success path when the answer key is absent, vs pair.get("answer", "") on the error path. |
| nemo_retriever/src/nemo_retriever/retriever.py | Live RAG SDK (retrieve, retrieve_batch, answer, pipeline) — thread-safety fixed (local top_k), top_p correctly forwarded, concurrent scoring+judge via ThreadPoolExecutor. |
| nemo_retriever/src/nemo_retriever/evaluation/config.py | Now raises ValueError when heterogeneous judges are used with build_eval_chain/build_eval_pipeline; build_eval_chain also raises for non-file retrieval types. |
| nemo_retriever/src/nemo_retriever/evaluation/retrievers.py | FileRetriever._from_dict factory for in-memory QA path with shared _initialize_index; well-tested with roundtrip, state-parity, and from_lancedb path. |
| nemo_retriever/src/nemo_retriever/evaluation/runner.py | run_eval_sweep correctly iterates evaluations with per-combo judge/client; P2: result JSON written without explicit encoding="utf-8". |
| nemo_retriever/src/nemo_retriever/io/markdown.py | Large-column filtering now applied on dataframe= path, mirroring the Parquet path's _MARKDOWN_PARQUET_COLUMNS pruning to prevent multi-GB memory spikes. |
| nemo_retriever/src/nemo_retriever/llm/clients/__init__.py | New clients package with lazy-load registry; P2: private helpers _build_rag_prompt/_parse_judge_response included in `__all__`, contradicting their underscore-prefixed privacy intent. |
| nemo_retriever/tests/test_live_rag.py | Comprehensive tests covering protocol compliance, all answer tiers, concurrent scoring, builder composition, top_p forwarding, and the thread-safety fix for retrieve_batch. |
| nemo_retriever/tests/test_file_retriever.py | Covers file-based and in-memory construction paths, state-parity invariant guard, normalization, and from_lancedb mock tests. |
| nemo_retriever/tests/test_llm_params.py | Validates LLMRemoteClientParams/LLMInferenceParams validation, both LiteLLMClient constructor paths, api_key redaction, and LLMJudge defaults. |
Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as graph_pipeline.py
    participant FR as FileRetriever.from_lancedb()
    participant LDB as LanceDB
    participant Sweep as run_eval_sweep()
    participant Pipeline as QAEvalPipeline
    participant LLM as LiteLLMClient
    participant Judge as LLMJudge
    CLI->>FR: from_lancedb(qa_pairs, lancedb_uri, page_index)
    FR->>LDB: query_lancedb(batched)
    LDB-->>FR: retrieval results dict
    FR-->>CLI: FileRetriever (in-memory)
    CLI->>Sweep: run_eval_sweep(config, qa_pairs, retriever)
    loop per evaluation combo
        Sweep->>Pipeline: QAEvalPipeline(retriever, llm_clients, judge)
        Pipeline->>FR: retrieve(query, top_k) per row [threaded]
        FR-->>Pipeline: RetrievalResult(chunks, metadata)
        Pipeline->>LLM: generate(query, chunks) [ThreadPoolExecutor]
        LLM-->>Pipeline: GenerationResult(answer)
        Pipeline->>Judge: judge(query, reference, answer)
        Judge-->>Pipeline: JudgeResult(score, reasoning)
        Pipeline-->>Sweep: eval_results dict
        Sweep->>Sweep: write JSON to results_dir
    end
    Sweep-->>CLI: sweep_results list
    CLI->>CLI: raise typer.Exit(1) if any FAIL
```
Comments Outside Diff (1)
nemo_retriever/src/nemo_retriever/evaluation/orchestrator.py, line 268 (link)

**`pair["answer"]` raises `KeyError` when neither key is present**

`pair.get("reference_answer") or pair["answer"]` will propagate a `KeyError` into the `ThreadPoolExecutor` when a QA-pair has no `answer` key (e.g. a CSV with only a `query` column). The exception is caught by the `as_completed` handler, logged as `"Retrieval for query [N] failed: … 'answer'"`, and the row is silently replaced with an empty reference — producing misleading evaluation metrics with no actionable error.

The fallback path at line 294 already does the right thing with `pair.get("answer", "")`. Apply the same safe access here:

```suggestion
reference = pair.get("reference_answer") or pair.get("answer", "")
```
nemo_retriever/src/nemo_retriever/evaluation/runner.py, line 157
**Missing `encoding="utf-8"` on result JSON write**
The result file is opened without an explicit encoding, so the write uses the platform default (e.g. Windows defaults to `cp1252`). QA results that contain non-ASCII text (judge reasoning, source paths with accented characters) may be written and read back incorrectly on some platforms. All other file-opens in this PR use `encoding="utf-8"`.
```suggestion
with open(out_path, "w", encoding="utf-8") as f:
```
nemo_retriever/src/nemo_retriever/llm/clients/__init__.py, lines 57-64
**Private helpers `_build_rag_prompt` and `_parse_judge_response` in `__all__`**
Including underscore-prefixed names in `__all__` is contradictory: the leading `_` signals "internal, not for external use" while `__all__` makes them visible to `from nemo_retriever.llm.clients import *`. The docstring explains the backward-compat motivation, but for the public API contract these helpers should be re-exported only from submodule paths when explicitly imported rather than published as top-level names. Consider removing them from `__all__` and documenting a specific import path for any downstream code that needs them.
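A minimal sketch of that cleanup, assuming the package's other public names (only `LiteLLMClient` and `LLMJudge` are confirmed elsewhere in this PR; the rest is illustrative):

```python
# nemo_retriever/llm/clients/__init__.py (sketch, not the PR's actual file)
__all__ = [
    "LiteLLMClient",
    "LLMJudge",
    # _build_rag_prompt / _parse_judge_response intentionally omitted:
    # underscore-prefixed helpers stay importable from their submodule,
    # e.g. `from nemo_retriever.llm.clients.litellm import _build_rag_prompt`
    # (submodule path assumed), but drop out of the star-import surface.
]
```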
Reviews (8): Last reviewed commit: "resolve greptile issues + add tests case..."
| if dataframe is not None: | ||
| df = dataframe | ||
| else: | ||
| parquet_path = Path(parquet_dir) | ||
| if not parquet_path.is_dir(): | ||
| raise FileNotFoundError(f"Parquet directory not found: {parquet_path}") | ||
| single = parquet_path / "extraction.parquet" | ||
| if single.is_file(): | ||
| parquet_files = [single] | ||
| else: | ||
| parquet_files = sorted(f for f in parquet_path.rglob("*.parquet") if f.name != "extraction.parquet") | ||
| if not parquet_files: | ||
| raise FileNotFoundError(f"No .parquet files in {parquet_path}") | ||
| | ||
| dfs = [_read_parquet_for_markdown(f) for f in parquet_files] | ||
| df = pd.concat(dfs, ignore_index=True) | ||
| | ||
| path_col = "path" if "path" in df.columns else "source_id" | ||
| if path_col not in df.columns: | ||
| raise KeyError(f"Neither 'path' nor 'source_id' found in columns: {list(df.columns)}") | ||
| | ||
| list_keys = ("tables", "table", "charts", "chart", "infographics", "infographic") | ||
| | ||
| docs_grouped: dict[str, list[dict]] = defaultdict(list) | ||
| for _, row in df.iterrows(): | ||
| source = str(row.get(path_col, "")) | ||
| if not source: | ||
| continue | ||
| record = row.to_dict() | ||
| for key in list_keys: | ||
| val = record.get(key) | ||
| if isinstance(val, np.ndarray): | ||
| record[key] = val.tolist() | ||
| docs_grouped[source].append(record) |
**Large-column filtering not applied on the `dataframe=` path**

`_read_parquet_for_markdown` explicitly restricts columns to `_MARKDOWN_PARQUET_COLUMNS` and its docstring says this "avoids multi-GB memory spikes" from embedding vectors and page images. When `build_page_index` is called with `dataframe=result_df` from `graph_pipeline.py`, the full ingestion DataFrame (with embeddings, base64 page images, etc.) flows through unchanged — every `row.to_dict()` creates a record dict carrying all columns.

For the 767-PDF use case (~84k rows), applying the same column selection before iterating would keep memory consistent with the Parquet path:

```python
if dataframe is not None:
    available = set(dataframe.columns)
    cols = sorted(_MARKDOWN_PARQUET_COLUMNS & available)
    df = dataframe[cols] if cols else dataframe
else:
    ...
```
| "clients for heterogeneous judges.", | ||
| len(distinct_judges), | ||
| sorted(distinct_judges), | ||
| first_judge_key, | ||
| ) | ||
| config.setdefault("judge", models[first_judge_key]) | ||
| | ||
| elif "generators" in config and "judge" in config: | ||
| generators = config["generators"] | ||
| judge_cfg = config["judge"] | ||
| judge_name = judge_cfg.get("name", "judge") | ||
| config.setdefault("models", {}) |
**Heterogeneous judges silently collapsed to first judge for `build_eval_chain`/`build_eval_pipeline`**

When the new `models`/`evaluations` format specifies different judges for different evaluations, `config["judge"]` is set to only the first evaluation's judge. `run_eval_sweep` is unaffected (it reads `models[judge_name]` per evaluation), but `build_eval_chain` and `build_eval_pipeline` — which both read `config["judge"]` — will silently apply only the first judge across all evaluations. The `logger.warning` fires, but the result is wrong rather than an error.

Consider raising a `ValueError` instead of a warning when `build_eval_chain`/`build_eval_pipeline` are called with a config that has multiple distinct judges, so callers know to switch to `run_eval_sweep`.
This seems like a valid issue, or not?
Yes, it's valid. Confirmed by reading `build_eval_chain` (~line 240) and `build_eval_pipeline` (~line 300) — both do `judge_cfg = config["judge"]`, which is the single collapsed value set during `_normalize_config`.

So if a user writes:

```yaml
evaluations:
  - generator: "model-a"
    judge: "judge-x"
  - generator: "model-b"
    judge: "judge-y"   # silently ignored
```

Both evaluations end up scored by judge-x. The warning fires in `_normalize_config`, but by then the damage is done — there's no guard at the call site of `build_eval_chain`/`build_eval_pipeline` to stop execution.
The minimal fix is to raise in _normalize_config when distinct_judges > 1:
| "clients for heterogeneous judges.", | |
| len(distinct_judges), | |
| sorted(distinct_judges), | |
| first_judge_key, | |
| ) | |
| config.setdefault("judge", models[first_judge_key]) | |
| elif "generators" in config and "judge" in config: | |
| generators = config["generators"] | |
| judge_cfg = config["judge"] | |
| judge_name = judge_cfg.get("name", "judge") | |
| config.setdefault("models", {}) | |
| if len(distinct_judges) > 1: | |
| raise ValueError( | |
| f"Config has {len(distinct_judges)} distinct judges {sorted(distinct_judges)}. " | |
| "build_eval_chain() and build_eval_pipeline() support only a single judge. " | |
| "Use run_eval_sweep() for heterogeneous judges." | |
| ) | |
| config.setdefault("judge", models[first_judge_key]) |
This fails fast with a clear message instead of silently producing wrong scores.
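For completeness, a hedged regression-test sketch for that fail-fast path (the error message comes from the suggestion above; the import path and config shape are assumptions):

```python
import pytest

from nemo_retriever.evaluation.config import _normalize_config  # path assumed


def test_normalize_config_rejects_heterogeneous_judges():
    config = {
        "models": {
            "model-a": {"name": "model-a"},
            "model-b": {"name": "model-b"},
            "judge-x": {"name": "judge-x"},
            "judge-y": {"name": "judge-y"},
        },
        "evaluations": [
            {"generator": "model-a", "judge": "judge-x"},
            {"generator": "model-b", "judge": "judge-y"},
        ],
    }
    # Two distinct judges should now fail fast instead of warning
    with pytest.raises(ValueError, match="distinct judges"):
        _normalize_config(config)
```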
Force-pushed from 7ce7eba to 37a4d53
| @classmethod | ||
| def _from_dict(cls, queries: dict[str, dict]) -> "FileRetriever": | ||
| """Build a FileRetriever from an in-memory queries dict. | ||
| | ||
| Bypasses file I/O while reusing the same normalized index that | ||
| ``__init__`` builds from JSON. All instance methods (``retrieve``, | ||
| ``check_coverage``) work identically afterwards. | ||
| | ||
| Parameters | ||
| ---------- | ||
| queries : dict | ||
| ``{query_text: {"chunks": [...], "metadata": [...]}}`` -- | ||
| the same shape as the ``"queries"`` value in a retrieval JSON. | ||
| """ | ||
| if not queries: | ||
| raise ValueError("FileRetriever._from_dict: queries dict is empty") | ||
| sample = next(iter(queries.values()), {}) | ||
| if not isinstance(sample.get("chunks"), list): | ||
| raise ValueError( | ||
| "FileRetriever._from_dict: first entry is missing a 'chunks' list. " | ||
| 'Expected: {"query": {"chunks": ["..."]}}' | ||
| ) | ||
| | ||
| instance = object.__new__(cls) | ||
| instance.file_path = "<in-memory>" | ||
| instance._norm_index = {} | ||
| instance._raw_keys = {} | ||
| instance._miss_count = 0 | ||
| instance._miss_lock = threading.Lock() | ||
| for raw_key, value in queries.items(): | ||
| norm = _normalize_query(raw_key) | ||
| instance._norm_index[norm] = value | ||
| instance._raw_keys[norm] = raw_key | ||
| return instance |
**No unit tests for the new in-memory construction path**

`_from_dict()` is the core factory for the in-memory QA path and is also called by the public `from_lancedb()`. Neither path has a corresponding test. If the normalisation logic in `_from_dict` diverges from `__init__` (e.g. a future attribute is added to `__init__` but forgotten in `_from_dict`), the divergence will only surface at runtime. The `test-coverage-new-code` rule requires tests for new business logic.

Suggested additions to `nemo_retriever/tests/test_retriever_queries.py` (or a new `test_qa_eval.py`):

```python
def test_from_dict_roundtrip():
    queries = {"What is X?": {"chunks": ["chunk1", "chunk2"], "metadata": []}}
    retriever = FileRetriever._from_dict(queries)
    assert retriever.file_path == "<in-memory>"
    result = retriever.retrieve("What is X?", top_k=5)
    assert result.chunks == ["chunk1", "chunk2"]

def test_from_dict_empty_raises():
    with pytest.raises(ValueError, match="empty"):
        FileRetriever._from_dict({})

def test_from_dict_missing_chunks_raises():
    with pytest.raises(ValueError):
        FileRetriever._from_dict({"Q": {"metadata": []}})
```

**Rule Used:** New functionality must include corresponding unit ... ([source](https://app.greptile.com/review/custom-context?memory=test-coverage-new-code))
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/evaluation/retrievers.py
Line: 92-125
Comment:
**No unit tests for the new in-memory construction path**
`_from_dict()` is the core factory for the in-memory QA path and is also called by the public `from_lancedb()`. Neither path has a corresponding test. If the normalisation logic in `_from_dict` diverges from `__init__` (e.g. a future attribute is added to `__init__` but forgotten in `_from_dict`), the divergence will only surface at runtime. The `test-coverage-new-code` rule requires tests for new business logic.
Suggested additions to `nemo_retriever/tests/test_retriever_queries.py` (or a new `test_qa_eval.py`):
```python
def test_from_dict_roundtrip():
queries = {"What is X?": {"chunks": ["chunk1", "chunk2"], "metadata": []}}
retriever = FileRetriever._from_dict(queries)
assert retriever.file_path == "<in-memory>"
result = retriever.retrieve("What is X?", top_k=5)
assert result.chunks == ["chunk1", "chunk2"]
def test_from_dict_empty_raises():
with pytest.raises(ValueError, match="empty"):
FileRetriever._from_dict({})
def test_from_dict_missing_chunks_raises():
with pytest.raises(ValueError):
FileRetriever._from_dict({"Q": {"metadata": []}})
```
**Rule Used:** New functionality must include corresponding unit ... ([source](https://app.greptile.com/review/custom-context?memory=test-coverage-new-code))
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
This seems like something you should add. That divergence will cause issues later down the line.
The eval CLI (cli.py) already aborts when retrieval coverage falls below the configured min_coverage. The graph_pipeline QA path logged coverage but never enforced the threshold, creating inconsistent behavior between the two entry points.

Made-with: Cursor
- build_eval_chain() now raises a clear ValueError when retrieval.type is not 'file', matching build_eval_pipeline().
- graph_pipeline validates eval_config and query_csv file existence before starting ingestion so a typo does not waste a full run.

Made-with: Cursor
Force-pushed from add144b to b04e4ab
| "clients for heterogeneous judges.", | ||
| len(distinct_judges), | ||
| sorted(distinct_judges), | ||
| first_judge_key, | ||
| ) | ||
| config.setdefault("judge", models[first_judge_key]) | ||
|
|
||
| elif "generators" in config and "judge" in config: | ||
| generators = config["generators"] | ||
| judge_cfg = config["judge"] | ||
| judge_name = judge_cfg.get("name", "judge") | ||
| config.setdefault("models", {}) |
There was a problem hiding this comment.
This seems like a valid issue, or not?
| # Live RAG API (structured retrieval + generation) | ||
| # ------------------------------------------------------------------ | ||
| | ||
| def retrieve( |
what is the difference between what you get from retriever and what you get from query/queries?
retrieve() will reshape .query()'s raw LanceDB hits into RetrievalResult, which is the return type required by the RetrieverStrategy Protocol, so Retriever and FileRetriever become interchangeable at each Protocol call site.
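A rough sketch of the shape being described, using names from this PR (`RetrievalResult`, `RetrieverStrategy`); the exact fields and method signature are assumptions:

```python
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class RetrievalResult:
    chunks: list[str]                                    # ranked chunk texts
    metadata: list[dict] = field(default_factory=list)   # per-chunk provenance


class RetrieverStrategy(Protocol):
    def retrieve(self, query: str, top_k: int = 5) -> RetrievalResult:
        ...


# Both Retriever (live LanceDB) and FileRetriever (precomputed/in-memory)
# satisfy the Protocol structurally, so eval code can accept either.
```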
| results.append(RetrievalResult(chunks=chunks, metadata=metadata)) | ||
| return results | ||
| | ||
| def answer( |
This should all fall within the query/queries APIs. When I have a retriever graph, query or queries runs that full retriever graph. When I call query, it should execute the entire graph for that query. Answer (LLM), score (F1), retrieve (LanceDB) are all individual operators within the graph. All you should have to do here is change the graph that is executed. The user-facing API should not change. Or is there something I overlooked that makes these API calls necessary?
These APIs exist so that a single query can be a fast path (no graph or Ray setup cost) used by the CLI and MCP. I don't think the pipeline would be the right tool for this use case.
| raise ValueError("reference length must match queries length") | ||
| df["reference_answer"] = refs | ||
| elif isinstance(queries, pd.DataFrame): | ||
| df = queries.copy() |
why do you need to copy the df here?
LiveRetrievalOperator and QAGenerationOperator will mutate the DataFrame in place (adding context, answer, etc.), so the copy prevents those side-effects from leaking back to the caller's df
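A tiny self-contained illustration of that point (not the PR's actual code):

```python
import pandas as pd

caller_df = pd.DataFrame({"query": ["What is RAG?"]})

df = caller_df.copy()    # detach the working frame from the caller's
df["answer"] = ["..."]   # downstream operators add columns in place

assert "answer" not in caller_df.columns  # caller's DataFrame is unchanged
```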
| ... .generate(llm) | ||
| ... .score() | ||
| ... .judge(judge) | ||
| ... .run(queries=["What is RAG?"], reference=["Retrieval-augmented generation..."]) |
I think having a simple graph param in the Retriever object that consumes the graph built by RetrieverPipelineBuilder would be sufficient. I don't think these commands that actually execute actions belong here.
pipeline() returns a builder that wraps LiveRetrievalOperator/QAGenerationOperator/ScoringOperator/JudgingOperator under the hood, so folks using this get a short fluent option instead of the four imports and >> syntax. The raw-graph form should still work identically, and I'd be happy to split RetrieverPipelineBuilder into evaluation/builder.py if you'd rather it live outside retriever.py.
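For contrast, a hypothetical sketch of the raw-graph form the builder wraps; the operator names and the `>>` composition come from this thread, while the constructor arguments and the `run()` call are assumptions:

```python
# Hypothetical equivalent of retriever.pipeline().generate(llm).score().judge(judge)
graph = (
    LiveRetrievalOperator(retriever)
    >> QAGenerationOperator(llm)
    >> ScoringOperator()
    >> JudgingOperator(judge)
)
results = graph.run(queries=["What is RAG?"])
```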
Description
E2E in-memory QA mode on graph_pipeline (d9af7e4, 31d4e88, 632cd36) - one command that ingests, builds the page-markdown index in-memory, queries LanceDB, and runs the full QA sweep. Closes the gap that previously forced Ingest -> Export JSON -> Eval as three separate invocations.
Live-RAG SDK on Retriever (b04e4ab) - the much larger commit. Turns Retriever from a raw-hits wrapper into a first-class RAG SDK with .retrieve(), .retrieve_batch(), .answer() (single-query Tier 1+2+3 scoring), and a fluent .pipeline().generate(...).score().judge(...).run(queries) batch builder. Plus a ground-up nemo_retriever/llm/ package (shared types.py, Pydantic LLMInferenceParams + new LLMRemoteClientParams, clients/litellm.py, clients/judge.py, lazy-loaded __init__.py) that the eval framework was refactored to import from.
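A hedged usage sketch of that surface (method names and the `run(queries=..., reference=...)` signature come from this PR; construction details and the `llm`/`judge` clients are assumed to be preconfigured):

```python
retriever = Retriever(...)  # construction elided; details are in the PR

# Structured retrieval, single and batch
hit = retriever.retrieve("What is RAG?", top_k=5)
hits = retriever.retrieve_batch(["q1", "q2"], top_k=5)

# Single-query answer with Tier 1+2+3 scoring
result = retriever.answer("What is RAG?")

# Fluent batch pipeline
scores = (
    retriever.pipeline()
    .generate(llm)    # llm: a preconfigured LiteLLMClient (assumed)
    .score()
    .judge(judge)     # judge: a preconfigured LLMJudge (assumed)
    .run(queries=["What is RAG?"],
         reference=["Retrieval-augmented generation..."])
)
```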
Built off of this existing PR #1754
Checklist