feat: Webhook operator for posting intermediate results back to webserver layer #1889
Greptile Summary
This PR adds
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/graph/webhook_operator.py | New side-effect-only operator that POSTs batch results to a webhook URL; session is lazily created and reused; exception handling correctly logs network failures and passes data through |
| nemo_retriever/src/nemo_retriever/params/models.py | Adds WebhookParams with no api_key field (addressing previous security concern); fields lack Field(description=...) but that pattern is inconsistent across existing models |
| nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py | Threads webhook_params through resolve/append helpers as keyword-only args (safe after *); webhook stage is always appended last after all pending_stages, consistent with documented behaviour |
| nemo_retriever/tests/test_webhook_operator.py | Good coverage of serialize helpers, column-subset logic, no-op paths, happy path, error handling, session reuse, and retry adapter; one issue: mocks _get_session (internal) rather than requests.Session at the boundary |
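The ordering rule noted for `ingestor_runtime.py` (keyword-only `webhook_params`, webhook stage appended after all pending stages, skipped when no endpoint is set) can be sketched roughly as below. This is an illustration only: `WebhookParamsSketch`, `append_ordered_stages`, and the string stage names are hypothetical stand-ins, not the PR's actual helpers.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class WebhookParamsSketch:
    """Hypothetical stand-in for the PR's WebhookParams model."""

    endpoint_url: Optional[str] = None


def append_ordered_stages(
    stages: List[str],
    pending_stages: List[str],
    *,  # webhook_params is keyword-only, so existing positional callers are unaffected
    webhook_params: Optional[WebhookParamsSketch] = None,
) -> List[str]:
    """Return the combined stage list with the webhook stage (if any) last."""
    ordered = list(stages) + list(pending_stages)
    # Append only when params exist AND the endpoint URL is truthy;
    # otherwise the webhook is a no-op and no stage is added.
    if webhook_params is not None and webhook_params.endpoint_url:
        ordered.append("webhook_notify")
    return ordered
```

Keeping the check in one place means an empty `endpoint_url` and a missing `webhook_params` take the same no-op path.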
Flowchart
```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[GraphIngestor.webhook\nrecords stage + params] --> B[build_graph]
B --> C[_resolve_execution_inputs\nextracts webhook_params from plan]
C --> D[_append_ordered_transform_stages]
D --> E{webhook_params set\nAND endpoint_url truthy?}
E -- No --> F[Skip: no-op]
E -- Yes --> G[Append WebhookNotifyOperator\nalways AFTER all other stages]
G --> H[WebhookNotifyOperator.process]
H --> I{data is empty?}
I -- Yes --> J[Debug log, return data]
I -- No --> K[_dataframe_to_records\nserialise to JSON-safe dicts]
K --> L[_get_session\nlazy-init, reuse across batches]
L --> M[session.post with Retry adapter]
M --> N{HTTP success?}
N -- Yes --> O[Log INFO, return data unchanged]
N -- No --> P[logger.exception, return data unchanged]
```
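The "lazy-init, reuse across batches" and "Retry adapter" steps in the flowchart could look something like the sketch below. The class name `SessionHolder`, the retry count, the backoff factor, and the status codes are all assumptions chosen for illustration; the PR's real `_get_session` may differ. The `allowed_methods` argument assumes urllib3 1.26 or newer.

```python
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class SessionHolder:
    """Hypothetical helper showing lazy creation and reuse of one session."""

    def __init__(self, max_retries: int = 3, backoff_factor: float = 0.5) -> None:
        self._max_retries = max_retries
        self._backoff_factor = backoff_factor
        self._session: Optional[requests.Session] = None

    def get_session(self) -> requests.Session:
        # Build the session on first use only; later calls return the same object.
        if self._session is None:
            retry = Retry(
                total=self._max_retries,
                backoff_factor=self._backoff_factor,
                status_forcelist=(502, 503, 504),
                # POST is not retried by default, so it is opted in explicitly.
                allowed_methods=frozenset({"POST"}),
            )
            adapter = HTTPAdapter(max_retries=retry)
            session = requests.Session()
            session.mount("http://", adapter)
            session.mount("https://", adapter)
            self._session = session
        return self._session
```

Reusing one session keeps the connection pool warm across batches. Retrying POSTs is only safe if the receiving endpoint tolerates duplicate deliveries.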
Prompt To Fix All With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/graph/webhook_operator.py
Line: 108-116
Comment:
**`_dataframe_to_records` called outside the try/except block**
`_dataframe_to_records(data, ...)` calls `df.to_dict()` and accesses `df.columns`. If an upstream operator ever passes something other than a `pd.DataFrame` (e.g., a `ControlMessage`, a list, or `None`), the resulting `AttributeError` propagates uncaught and halts the pipeline, whereas a network failure on the same path is silently logged and swallowed. Since this is a side-effect-only operator that is meant to be non-fatal, consider wrapping the entire notification block (including serialisation) in the try/except, or add an explicit `isinstance(data, pd.DataFrame)` guard before the serialisation call.
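One possible shape for the fix, combining both suggestions, is sketched below. It is not the PR's code: `notify_safely` and the `send` callback are hypothetical names, and `DataFrame.to_dict(orient="records")` stands in for whatever `_dataframe_to_records` actually does.

```python
import logging
from typing import Any, Callable, Dict, List

import pandas as pd

logger = logging.getLogger(__name__)


def notify_safely(data: Any, send: Callable[[List[Dict[str, Any]]], None]) -> Any:
    """Pass records for `data` to `send` without ever raising; always return `data`."""
    # Explicit type guard: non-DataFrame inputs are logged and passed through.
    if not isinstance(data, pd.DataFrame):
        logger.warning("Webhook skipped: expected DataFrame, got %s", type(data).__name__)
        return data
    if data.empty:
        logger.debug("Webhook skipped: empty batch")
        return data
    try:
        # Serialisation sits inside the try block, so a serialisation error
        # is handled the same way as a network failure.
        records = data.to_dict(orient="records")
        send(records)
    except Exception:
        logger.exception("Webhook notification failed; continuing pipeline")
    return data
```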
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: nemo_retriever/tests/test_webhook_operator.py
Line: 160-175
Comment:
**Mocking at internal boundary rather than the `requests` adapter**
`@patch("...WebhookNotifyOperator._get_session")` mocks a private implementation method rather than the external I/O boundary (`requests.Session.post`). Per the project's mocking discipline rules, mocks should be placed at the boundary so that internal refactors (e.g., renaming `_get_session`) don't silently stop exercising the real path. Patching `requests.Session` (or its `post` method) directly, and letting `_get_session` run, would give stronger assurance that the adapter/retry configuration is actually applied during the call.
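A rough sketch of a boundary-level test follows. `TinyNotifier` is a hypothetical stand-in for the operator (the real class and its `process` signature are not shown in this review); only `requests.Session.post` is patched, so the session-creation path still runs.

```python
from unittest.mock import MagicMock, patch

import requests


class TinyNotifier:
    """Hypothetical stand-in for the operator under test."""

    def __init__(self, url: str) -> None:
        self._url = url
        self._session = None

    def _get_session(self) -> requests.Session:
        if self._session is None:
            self._session = requests.Session()
        return self._session

    def notify(self, payload: dict) -> int:
        response = self._get_session().post(self._url, json=payload, timeout=5)
        response.raise_for_status()
        return response.status_code


def test_notify_posts_through_real_session() -> None:
    fake_response = MagicMock(status_code=200)
    # Patch the external I/O boundary; _get_session itself runs unmodified.
    with patch.object(requests.Session, "post", return_value=fake_response) as mock_post:
        notifier = TinyNotifier("https://example.invalid/hook")
        assert notifier.notify({"rows": 1}) == 200
        assert notifier.notify({"rows": 2}) == 200
    assert mock_post.call_count == 2
    # A real Session was built by the internal helper and reused.
    assert isinstance(notifier._session, requests.Session)
```

If `_get_session` were renamed, this test would keep exercising the real path, whereas a patch targeting the private method by name would fail or go stale.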
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/params/models.py
Line: 420-424
Comment:
**No validation that `endpoint_url` is an HTTP/HTTPS URL**
Any arbitrary string (e.g., `"file:///etc/passwd"`, `"ftp://..."`, or a bare hostname) is accepted and only rejected at HTTP-request time. Because the exception is swallowed and logged, a misconfigured or malicious URL fails silently. A simple Pydantic `@field_validator` (or `AnyHttpUrl` type) on `endpoint_url` would surface mistakes early and prevent the operator from being inadvertently pointed at non-HTTP schemes or internal services.
```python
from typing import Optional
from pydantic import AnyHttpUrl
# Field declaration inside WebhookParams:
endpoint_url: Optional[AnyHttpUrl] = None
```
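If the field should remain a plain `str` (in Pydantic v2, `AnyHttpUrl` yields a URL object rather than a string), the `@field_validator` route might look like the sketch below. `WebhookParamsSketch` is a hypothetical model with only this one field; the real `WebhookParams` has other fields not shown here. The choice to let an empty string through is an assumption, made so the operator's existing "endpoint_url truthy" check can still treat it as a no-op.

```python
from typing import Optional
from urllib.parse import urlparse

from pydantic import BaseModel, field_validator


class WebhookParamsSketch(BaseModel):
    """Hypothetical one-field model; assumes Pydantic v2."""

    endpoint_url: Optional[str] = None

    @field_validator("endpoint_url")
    @classmethod
    def _require_http_scheme(cls, value: Optional[str]) -> Optional[str]:
        # None and "" mean "webhook disabled" and are left untouched.
        if not value:
            return value
        parsed = urlparse(value)
        if parsed.scheme not in ("http", "https") or not parsed.netloc:
            raise ValueError("endpoint_url must be an absolute http:// or https:// URL")
        return value
```

Scheme validation catches typos and non-HTTP schemes at construction time. It does not by itself stop the URL from pointing at an internal service; that would need a separate host allow-list.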
How can I resolve this? If you propose a fix, please make it concise.
Reviews (2): Last reviewed commit: "Merge branch 'main' into feat/post-per-f..."
…at is global to the object and used for each invocation
When the system runs in "online" mode, it needs a way to report the current processing status back to the client. Those details must first reach the web server layer, which then relays them to the client. This operator helps deliver those results to the web server before the entire job has completed.
Checklist