Conversation
Greptile Summary

This PR converts the Ray operator pipeline from synchronous to asynchronous execution.
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/graph/abstract_operator.py | Replaced invasive DefaultEventLoopPolicy reset with a safe _ensure_event_loop() helper; call is now async (breaking change already flagged in prior thread) |
| nemo_retriever/src/nemo_retriever/graph/operator_archetype.py | Added aprocess/arun/async call delegation to resolved delegate; clean implementation |
| nemo_retriever/src/nemo_retriever/text_embed/operators.py | BatchEmbedActor is now a proper top-level class (not _BatchEmbedActor alias), fixing name for node lookups; backward-compat getattr shim retained |
| nemo_retriever/src/nemo_retriever/ocr/shared.py | New aocr_page_elements and async nemotron parse functions use print() for errors (should be logger.warning); use_table_structure silently dropped in remote async path (flagged in prior thread) |
| nemo_retriever/src/nemo_retriever/chart/shared.py | agraphic_elements_ocr_page_elements still uses print() for all three error paths (flagged in prior thread) |
| nemo_retriever/tests/testing_utils.py | New _run() helper cleanly handles both coroutines and plain values using asyncio.new_event_loop(); fixes deprecated asyncio.get_event_loop() pattern across all test files |
| nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py | build_graph / build_inprocess_graph alias unchanged; BatchEmbedActor reference now correct after rename |
| nemo_retriever/src/nemo_retriever/operators/embedding.py | Thin re-export module; correctly delegates to text_embed.operators.BatchEmbedActor |
Class Diagram
```mermaid
%%{init: {'theme': 'neutral'}}%%
classDiagram
    class AbstractOperator {
        +preprocess(data) Any
        +process(data) Any
        +postprocess(data) Any
        +run(data) Any
        +aprocess(data) coroutine
        +arun(data) coroutine
        +__call__(data) coroutine
        +get_constructor_kwargs() dict
    }
    class ArchetypeOperator {
        +_cpu_variant_class
        +_gpu_variant_class
        +resolve_operator_class() type
        +aprocess(data) coroutine
        +arun(data) coroutine
        +__call__(data) coroutine
        -_resolve_delegate() AbstractOperator
    }
    class BatchEmbedActor {
        +prefers_cpu_variant() bool
        +cpu_variant_class() type
        +gpu_variant_class() type
    }
    class BatchEmbedCPUActor
    class BatchEmbedGPUActor
    AbstractOperator <|-- ArchetypeOperator
    ArchetypeOperator <|-- BatchEmbedActor
    AbstractOperator <|-- BatchEmbedCPUActor
    AbstractOperator <|-- BatchEmbedGPUActor
    BatchEmbedActor ..> BatchEmbedCPUActor : resolves to
    BatchEmbedActor ..> BatchEmbedGPUActor : resolves to
```
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ocr/shared.py
Line: 1222-1223
Comment:
**`print()` in new async OCR and parse functions — violates no-print-statements rule**
The new `aocr_page_elements` function (and the async version of `anemotron_parse_page_elements` at line ~1411) both use `print()` for error reporting. All surrounding sync functions and other actors use `logger.warning()`. Since `aocr_page_elements` is called from Ray actor workers, these messages will bypass structured log handlers and cannot be filtered, captured, or routed to observability tooling.
```suggestion
except BaseException as e:
logger.warning("OCR failed: %s: %s", type(e).__name__, e)
```
**Rule Used:** Use the Python logging module (import logging; log... ([source](https://app.greptile.com/review/custom-context?memory=no-print-statements))
How can I resolve this? If you propose a fix, please make it concise.

Reviews (5). Last reviewed commit: "Address review comments".
> ``asyncio.to_thread`` would allow Ray Data to run multiple batches in parallel threads inside the same actor, causing memory corruption.
Is this true? Claude flagged that we're using `asyncio.to_thread` in our local GPU page-elements implementation, which suggests the memory-corruption claim is dubious.
Do we know for sure asyncio.to_thread is not causing those types of memory issues in the GPU page-elements implementation?
Should we also apply these changes to …?
```python
# Internal cache for local HF embedders, keyed by model name.
_embedder_cache: dict = field(default_factory=dict, init=False, repr=False, compare=False)

def __str__(self) -> str:
```
where and how is this used?
It's just the overloaded Python `__str__` method, giving the string representation of the object itself. I use it in tooling external to this codebase.
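For readers unfamiliar with the hook: Python calls `__str__` whenever the object is stringified, e.g. via `str(obj)`, `print(obj)`, or an f-string. A minimal illustration with assumed class and field names (not the actual class under review):

```python
from dataclasses import dataclass, field


@dataclass
class EmbedderConfig:
    # Illustrative stand-in for the reviewed class; field names are assumed.
    model_name: str = "example-model"
    _embedder_cache: dict = field(default_factory=dict, init=False, repr=False, compare=False)

    def __str__(self) -> str:
        # Deliberately omits the cache, unlike the auto-generated repr.
        return f"EmbedderConfig(model_name={self.model_name!r})"
```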
```python
blocks = _parse_ocr_result(preds)
if label_name == "table":
    crop_hw_table: Tuple[int, int] = (0, 0)
    try:
        _raw = base64.b64decode(crop_b64s[i])
        with Image.open(io.BytesIO(_raw)) as _cim:
            _cw, _ch = _cim.size
            crop_hw_table = (_ch, _cw)
    except Exception:
        pass
    text = _blocks_to_pseudo_markdown(blocks, crop_hw=crop_hw_table) or _blocks_to_text(blocks)
```
**`use_table_structure` silently dropped in async remote OCR path**

`aocr_page_elements` has no `use_table_structure` parameter, and its remote-path table handling (lines 1199–1209) goes directly to `_blocks_to_pseudo_markdown`, skipping the `_find_ts_detections_for_bbox` / `join_table_structure_and_ocr_output` logic that the sync counterpart executes when `use_table_structure=True`. `OCRCPUActor` stores `use_table_structure` in `self.ocr_kwargs` and passes it via `**kwargs` to `aocr_page_elements`, but the async remote branch never reads it. Result: table-structure enrichment is silently absent for all remote-mode async invocations, even though the actor's config requests it.

The non-remote branch correctly delegates to `asyncio.to_thread(ocr_page_elements, ...)`, which preserves the logic, so behavior is inconsistent depending on whether `invoke_url` is set.
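A concise fix would thread the flag into the remote branch so it mirrors the sync path. The sketch below uses the helper names from this comment, but their bodies are stubs standing in for the real logic, and the function signature is an assumption:

```python
import asyncio


def _find_ts_detections_for_bbox(ts_detections, bbox):
    # Stub: the real helper matches table-structure detections to the crop.
    return [d for d in ts_detections if d.get("bbox") == bbox]


def join_table_structure_and_ocr_output(matched, blocks):
    # Stub: the real helper merges structure cells with OCR text.
    return {"mode": "table-structure", "cells": matched, "blocks": blocks}


def _blocks_to_pseudo_markdown(blocks):
    # Stub: the current remote-path fallback.
    return {"mode": "pseudo-markdown", "blocks": blocks}


async def aocr_table_text(blocks, bbox, ts_detections, *, use_table_structure=False):
    # Branch exactly as the sync counterpart does, instead of always
    # falling through to _blocks_to_pseudo_markdown.
    if use_table_structure:
        matched = _find_ts_detections_for_bbox(ts_detections, bbox)
        if matched:
            return join_table_structure_and_ocr_output(matched, blocks)
    return _blocks_to_pseudo_markdown(blocks)
```

With `use_table_structure` accepted explicitly, the value `OCRCPUActor` already forwards via `**kwargs` stops being silently ignored.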
Please review and merge #1860 first
Checklist