Skip to content

Async ray executor#1877

Open
jdye64 wants to merge 10 commits intoNVIDIA:mainfrom
jdye64:async-ray-executor
Open

Async ray executor#1877
jdye64 wants to merge 10 commits intoNVIDIA:mainfrom
jdye64:async-ray-executor

Conversation

@jdye64
Copy link
Copy Markdown
Collaborator

@jdye64 jdye64 commented Apr 20, 2026

Please review and merge #1860 first

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.
  • If adjusting docker-compose.yaml environment variables have you ensured those are mimicked in the Helm values.yaml file.

@jdye64 jdye64 marked this pull request as ready for review April 20, 2026 17:49
@jdye64 jdye64 requested review from a team as code owners April 20, 2026 17:49
@jdye64 jdye64 requested a review from drobison00 April 20, 2026 17:49
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 20, 2026

Greptile Summary

This PR converts the Ray operator pipeline from synchronous __call__ to async by adding aprocess/arun/async __call__ through the AbstractOperator and ArchetypeOperator hierarchy, along with async variants of the core OCR and chart inference functions (aocr_page_elements, agraphic_elements_ocr_page_elements, anemotron_parse_page_elements). Several issues from the prior review round have been addressed: the invasive DefaultEventLoopPolicy reset is replaced with the safer _ensure_event_loop(), the _BatchEmbedActor alias is replaced with a proper class BatchEmbedActor, and all test helpers are unified behind the _run() utility in testing_utils.py.

Confidence Score: 4/5

Safe to merge once the remaining print() calls in new async functions are replaced with logger.warning()

The prior round P0/P1 concerns (policy reset, _BatchEmbedActor naming, get_event_loop deprecation, use_table_structure drop) have all been addressed or were already flagged. The only new finding is P2: print() in aocr_page_elements and async nemotron parse in ocr/shared.py, plus the still-open chart/shared.py print() from the prior thread. All remaining issues are style-level and do not affect runtime correctness.

nemo_retriever/src/nemo_retriever/ocr/shared.py (print() in new async functions); nemo_retriever/src/nemo_retriever/chart/shared.py (print() in agraphic_elements_ocr_page_elements — noted in prior thread)

Important Files Changed

Filename Overview
nemo_retriever/src/nemo_retriever/graph/abstract_operator.py Replaced invasive DefaultEventLoopPolicy reset with a safe _ensure_event_loop() helper; call is now async (breaking change already flagged in prior thread)
nemo_retriever/src/nemo_retriever/graph/operator_archetype.py Added aprocess/arun/async call delegation to resolved delegate; clean implementation
nemo_retriever/src/nemo_retriever/text_embed/operators.py BatchEmbedActor is now a proper top-level class (not _BatchEmbedActor alias), fixing name for node lookups; backward-compat getattr shim retained
nemo_retriever/src/nemo_retriever/ocr/shared.py New aocr_page_elements and async nemotron parse functions use print() for errors (should be logger.warning); use_table_structure silently dropped in remote async path (flagged in prior thread)
nemo_retriever/src/nemo_retriever/chart/shared.py agraphic_elements_ocr_page_elements still uses print() for all three error paths (flagged in prior thread)
nemo_retriever/tests/testing_utils.py New _run() helper cleanly handles both coroutines and plain values using asyncio.new_event_loop(); fixes deprecated asyncio.get_event_loop() pattern across all test files
nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py build_graph / build_inprocess_graph alias unchanged; BatchEmbedActor reference now correct after rename
nemo_retriever/src/nemo_retriever/operators/embedding.py Thin re-export module; correctly delegates to text_embed.operators.BatchEmbedActor

Class Diagram

%%{init: {'theme': 'neutral'}}%%
classDiagram
    class AbstractOperator {
        +preprocess(data) Any
        +process(data) Any
        +postprocess(data) Any
        +run(data) Any
        +aprocess(data) coroutine
        +arun(data) coroutine
        +__call__(data) coroutine
        +get_constructor_kwargs() dict
    }
    class ArchetypeOperator {
        +_cpu_variant_class
        +_gpu_variant_class
        +resolve_operator_class() type
        +aprocess(data) coroutine
        +arun(data) coroutine
        +__call__(data) coroutine
        -_resolve_delegate() AbstractOperator
    }
    class BatchEmbedActor {
        +prefers_cpu_variant() bool
        +cpu_variant_class() type
        +gpu_variant_class() type
    }
    class BatchEmbedCPUActor
    class BatchEmbedGPUActor

    AbstractOperator <|-- ArchetypeOperator
    ArchetypeOperator <|-- BatchEmbedActor
    AbstractOperator <|-- BatchEmbedCPUActor
    AbstractOperator <|-- BatchEmbedGPUActor
    BatchEmbedActor ..> BatchEmbedCPUActor : resolves to
    BatchEmbedActor ..> BatchEmbedGPUActor : resolves to
Loading
Prompt To Fix All With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ocr/shared.py
Line: 1222-1223

Comment:
**`print()` in new async OCR and parse functions — violates no-print-statements rule**

The new `aocr_page_elements` function (and the async version of `anemotron_parse_page_elements` at line ~1411) both use `print()` for error reporting. All surrounding sync functions and other actors use `logger.warning()`. Since `aocr_page_elements` is called from Ray actor workers, these messages will bypass structured log handlers and cannot be filtered, captured, or routed to observability tooling.

```suggestion
        except BaseException as e:
            logger.warning("OCR failed: %s: %s", type(e).__name__, e)
```

**Rule Used:** Use the Python logging module (import logging; log... ([source](https://app.greptile.com/review/custom-context?memory=no-print-statements))

How can I resolve this? If you propose a fix, please make it concise.

Reviews (5): Last reviewed commit: "Address review comments" | Re-trigger Greptile

Comment thread nemo_retriever/tests/test_ingest_plans.py
Comment thread nemo_retriever/src/nemo_retriever/chart/shared.py Outdated
Comment thread nemo_retriever/src/nemo_retriever/graph/abstract_operator.py
Comment thread nemo_retriever/src/nemo_retriever/graph/abstract_operator.py Outdated
Comment thread nemo_retriever/tests/test_asr_actor.py Outdated
Comment thread nemo_retriever/src/nemo_retriever/graph/abstract_operator.py Outdated
Comment on lines +81 to +83
``asyncio.to_thread`` would allow Ray Data to run multiple
batches in parallel threads inside the same actor, causing
memory corruption.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true? Claude flagged that we're using asyncio.to_thread in our local GPU page-elements implementation, which implies this memory corruption claim is dubious

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know for sure asyncio.to_thread is not causing those types of memory issues in the GPU page-elements implementation?

Comment thread nemo_retriever/src/nemo_retriever/text_embed/operators.py
Comment thread nemo_retriever/tests/test_asr_actor.py Outdated
@charlesbluca
Copy link
Copy Markdown
Collaborator

Should we also apply these changes to NemotronParseCPUActor?

Comment thread nemo_retriever/src/nemo_retriever/graph/abstract_operator.py Outdated
Comment thread nemo_retriever/src/nemo_retriever/recall/core.py Outdated
# Internal cache for local HF embedders, keyed by model name.
_embedder_cache: dict = field(default_factory=dict, init=False, repr=False, compare=False)

def __str__(self) -> str:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where and how is this used?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an overloaded Python str function to get the string representation of the object itself. I use it in other things external to this codebase itself

Comment thread nemo_retriever/tests/test_asr_actor.py Outdated
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 22, 2026

Want your agent to iterate on Greptile's feedback? Try greploops.

Comment on lines +1199 to +1209
blocks = _parse_ocr_result(preds)
if label_name == "table":
crop_hw_table: Tuple[int, int] = (0, 0)
try:
_raw = base64.b64decode(crop_b64s[i])
with Image.open(io.BytesIO(_raw)) as _cim:
_cw, _ch = _cim.size
crop_hw_table = (_ch, _cw)
except Exception:
pass
text = _blocks_to_pseudo_markdown(blocks, crop_hw=crop_hw_table) or _blocks_to_text(blocks)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 use_table_structure silently dropped in async remote OCR path

aocr_page_elements has no use_table_structure parameter and its remote-path table handling (line 1199–1209) goes directly to _blocks_to_pseudo_markdown, skipping the _find_ts_detections_for_bbox / join_table_structure_and_ocr_output logic that the sync counterpart executes when use_table_structure=True. OCRCPUActor stores use_table_structure in self.ocr_kwargs and passes it via **kwargs to aocr_page_elements, but the async remote branch never reads it. Result: table-structure enrichment is silently absent for all remote-mode async invocations even though it is requested via the actor's config.

The non-remote branch correctly delegates to asyncio.to_thread(ocr_page_elements, ...) which preserves the logic, making the behavior inconsistent depending on whether invoke_url is set.

Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/ocr/shared.py
Line: 1199-1209

Comment:
**`use_table_structure` silently dropped in async remote OCR path**

`aocr_page_elements` has no `use_table_structure` parameter and its remote-path table handling (line 1199–1209) goes directly to `_blocks_to_pseudo_markdown`, skipping the `_find_ts_detections_for_bbox` / `join_table_structure_and_ocr_output` logic that the sync counterpart executes when `use_table_structure=True`. `OCRCPUActor` stores `use_table_structure` in `self.ocr_kwargs` and passes it via `**kwargs` to `aocr_page_elements`, but the async remote branch never reads it. Result: table-structure enrichment is silently absent for all remote-mode async invocations even though it is requested via the actor's config.

The non-remote branch correctly delegates to `asyncio.to_thread(ocr_page_elements, ...)` which preserves the logic, making the behavior inconsistent depending on whether `invoke_url` is set.

How can I resolve this? If you propose a fix, please make it concise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants