
fix(inference-logging-client): avoid arrow 2GB overflow in decode_mplog_dataframe #372

Open

dheerajchouhan08 wants to merge 2 commits into develop from fix/inference-logging-arrow-batch-overflow

Conversation

@dheerajchouhan08 (Contributor) commented May 11, 2026

Summary

  • Lower default max_records_per_batch in decode_mplog_dataframe from 200 → 50 so multi-MB feature rows can't aggregate past Arrow's 2 GiB per-column-per-batch limit (which was crashing the Python worker before _decode_batch even ran).
  • Project the input DataFrame to only the columns _decode_batch actually reads (features, metadata, mp_config_id, entities, and row-metadata cols) before mapInPandas, so unused columns don't bloat Arrow batches.
  • Bump version 0.3.1 → 0.3.2.

Context

Decoding via decode_mplog_dataframe was failing on Databricks with a PythonException whose traceback bottomed out inside Spark's own Arrow→pandas conversion (pyarrow.lib.check_status), before any user-level decode code ran. Root cause: with max_records_per_batch=200 and multi-MB binary payloads per row, a single Arrow batch's binary column exceeded the 2 GiB 32-bit-offset limit.
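
To make the mechanics concrete, here is a minimal sketch of how the two changes combine in a mapInPandas decode path. It is illustrative only: decode_projected, decode_fn, out_schema, entity_cols, and row_metadata_cols are hypothetical names, and the SDK's actual decode_mplog_dataframe differs in signature and details.

```python
# Illustrative sketch only; not the SDK's real code.
from pyspark.sql import DataFrame, SparkSession


def decode_projected(spark: SparkSession, df: DataFrame, decode_fn, out_schema,
                     max_records_per_batch=None, entity_cols=(), row_metadata_cols=()):
    # Keep only the columns the per-batch decoder reads, so unrelated wide columns
    # never inflate the Arrow batches shipped to the Python workers.
    wanted = ["features", "metadata", "mp_config_id", *entity_cols, *row_metadata_cols]
    projected = df.select([c for c in dict.fromkeys(wanted) if c in df.columns])

    # Cap records per Arrow batch: with multi-MB binary payloads per row, 50 rows
    # keeps a single binary column comfortably under the 2 GiB 32-bit-offset limit.
    batch_limit = max_records_per_batch if max_records_per_batch is not None else 50
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(batch_limit))

    # Run the per-batch decoder (the PR's _decode_batch) over the projected rows only.
    return projected.mapInPandas(decode_fn, schema=out_schema)
```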

Test plan

  • Re-run the failing Databricks job against the same input and confirm decode_mplog_dataframe completes.
  • Spot-check that decoded row counts and a few feature values match a known-good baseline.
  • Verify caller-supplied max_records_per_batch still overrides the new default.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Chores

    • Package version bumped to 0.3.3.
  • Bug Fixes

    • Reduced default batch size to lower memory usage during batch decoding.
    • Distributed decoding now processes only required columns and repartitions the smaller dataset for efficiency.
    • Schema fetch failures (HTTP 400) are now cached to avoid repeated network calls, error objects now include status information, and clearing the cache removes both positive and negative entries.

Review Change Stack

… overflow

- Lower default max_records_per_batch from 200 to 50 so multi-MB feature
  rows can't aggregate past Arrow's 2 GiB per-column-per-batch limit,
  which was crashing the Python worker before _decode_batch ran.
- Project input DataFrame to only the columns _decode_batch reads before
  mapInPandas, so unused columns don't bloat Arrow batches.
- Bump version to 0.3.2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai Bot commented May 11, 2026

Walkthrough

Version bumped to 0.3.3. decode_mplog_dataframe now defaults the Arrow records-per-batch limit to 50, projects the input DataFrame to the required columns, and repartitions the projected DataFrame. SchemaFetchError gains an optional status_code; get_feature_schema adds a thread-safe negative LRU cache for HTTP 400 errors, and clear_schema_cache now clears both caches.

Changes

DataFrame Projection and Batch Configuration Optimization

  • Schema Exception (py-sdk/inference_logging_client/inference_logging_client/exceptions.py):
    SchemaFetchError now accepts and stores an optional status_code (adds Optional import).
  • Schema Fetch Negative Cache (py-sdk/inference_logging_client/inference_logging_client/io.py):
    Adds a thread-safe negative LRU cache for HTTP 400 schema fetch failures, updates the retry helper to raise SchemaFetchError with status_code, and preserves final non-retryable errors.
  • Clear Schema Cache Update (py-sdk/inference_logging_client/inference_logging_client/io.py):
    clear_schema_cache() now clears both the positive schema cache and the new negative (HTTP 400) cache.
  • DataFrame Column Projection (py-sdk/inference_logging_client/inference_logging_client/__init__.py):
    The distributed decoding path projects the input DataFrame to only the columns required by _decode_batch: features, metadata, mp_config_id, optional entities, and existing row metadata columns; column order is preserved and duplicates are removed.
  • Repartition & Batch Limit Configuration (py-sdk/inference_logging_client/inference_logging_client/__init__.py):
    Repartition is applied to the projected DataFrame; the batch_limit default changes to 50 when max_records_per_batch is not provided; docstring updated.
  • Version Updates (py-sdk/inference_logging_client/inference_logging_client/__init__.py, py-sdk/inference_logging_client/pyproject.toml):
    Module __version__ and project version bumped to 0.3.3.
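
As a rough illustration of the exceptions.py entry above, the change amounts to carrying an optional HTTP status on the error. The constructor below is a hedged sketch inferred from this summary, not the file's exact code.

```python
# Sketch only: inferred from the change summary, not copied from exceptions.py.
from typing import Optional


class SchemaFetchError(Exception):
    """Raised when a feature schema cannot be fetched from the inference service."""

    def __init__(self, message: str, status_code: Optional[int] = None):
        super().__init__(message)
        # Lets callers distinguish a permanent 400 (unknown/unregistered schema)
        # from transient 5xx or network failures that are worth retrying.
        self.status_code = status_code
```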

Suggested labels

coderabbit-bugfix

🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
  • Dynamic Configuration Validation: ✅ Passed. No changes to application-dyn-*.yml files; the PR only modifies Python files under py-sdk/inference_logging_client, so the check does not apply.



@coderabbitai Bot left a comment

Actionable comments posted: 1


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a7fe3cce-2478-491e-b98e-c0659c22f269

📥 Commits

Reviewing files that changed from the base of the PR and between 37b045d and 31ccf36.

📒 Files selected for processing (2)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/pyproject.toml

Comment on lines +544 to 546
batch_limit = max_records_per_batch if max_records_per_batch is not None else 50
prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(batch_limit))

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate max_records_per_batch bounds before setting Spark conf.

Line 544 accepts non-positive values from callers; this can fail later with a less clear Spark/Arrow error. Add an early validation and fail fast with ValueError.

Suggested patch
-    batch_limit = max_records_per_batch if max_records_per_batch is not None else 50
+    batch_limit = max_records_per_batch if max_records_per_batch is not None else 50
+    if batch_limit <= 0:
+        raise ValueError("max_records_per_batch must be a positive integer")
     prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    batch_limit = max_records_per_batch if max_records_per_batch is not None else 50
-    prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch")
-    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(batch_limit))
+    batch_limit = max_records_per_batch if max_records_per_batch is not None else 50
+    if batch_limit <= 0:
+        raise ValueError("max_records_per_batch must be a positive integer")
+    prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch")
+    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(batch_limit))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 544 - 546, The code sets batch_limit from max_records_per_batch and then
configures Spark without validating input; add an early check that if
max_records_per_batch is not None and is <= 0 you raise a ValueError (clear
message like "max_records_per_batch must be a positive integer"), otherwise
compute batch_limit = max_records_per_batch if provided else 50 and then call
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch",
str(batch_limit)); reference max_records_per_batch, batch_limit and the
spark.conf.set/get calls when making the change.

- Add SchemaFetchError.status_code so callers can distinguish HTTP failure
  modes (400 bad request vs 5xx vs network errors).
- Add a thread-safe LRU negative cache in get_feature_schema, mirroring
  the positive cache shape (OrderedDict, max 100 entries, no TTL). Only
  HTTP 400 responses are negative-cached; transient errors (5xx, network,
  401/403/429) are intentionally excluded so they can be retried.
- clear_schema_cache() now clears both caches.
- Bump version to 0.3.3.

Prevents worker tasks from hammering the inference service with the same
400 for every row when a (mp_config_id, version) schema is not registered.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
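
To make the cache shape concrete, the following is a hedged sketch along the lines this commit describes, not the module's actual code; _fetch_schema_http, the positive cache handling, and the exact import path are assumptions.

```python
# Hedged sketch of the negative-cache pattern described above.
import threading
from collections import OrderedDict

# Import path assumed from the file layout shown in this PR.
from inference_logging_client.exceptions import SchemaFetchError

_NEG_CACHE_MAX = 100
_schema_neg_cache: "OrderedDict[tuple, str]" = OrderedDict()
_schema_neg_cache_lock = threading.Lock()


def get_feature_schema(model_config_id: str, version: int):
    key = (model_config_id, version)

    # Replay a previously seen HTTP 400 for this (model_config_id, version)
    # without another network call.
    with _schema_neg_cache_lock:
        if key in _schema_neg_cache:
            _schema_neg_cache.move_to_end(key)
            raise SchemaFetchError(_schema_neg_cache[key], status_code=400)

    try:
        return _fetch_schema_http(model_config_id, version)  # hypothetical fetch helper
    except SchemaFetchError as e:
        # Only permanent 400s are negative-cached; 5xx, network, and auth errors
        # stay retryable, matching the commit's intent.
        if e.status_code == 400:
            with _schema_neg_cache_lock:
                _schema_neg_cache[key] = str(e)
                _schema_neg_cache.move_to_end(key)
                if len(_schema_neg_cache) > _NEG_CACHE_MAX:
                    _schema_neg_cache.popitem(last=False)  # drop least-recently-used
        raise


def clear_schema_cache() -> None:
    # The real function also clears the positive schema cache (not shown here).
    with _schema_neg_cache_lock:
        _schema_neg_cache.clear()
```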

@coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
py-sdk/inference_logging_client/inference_logging_client/io.py (1)

129-146: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Negative cache key can poison valid fetches across different endpoints.

_schema_neg_cache is keyed only by (model_config_id, version). A 400 from one inference_host/api_path (misconfig, rollout mismatch, bad gateway behavior) will be replayed for other hosts/paths without a network call.

Suggested fix
- _schema_neg_cache: OrderedDict[tuple[str, int], str] = OrderedDict()
+ _schema_neg_cache: OrderedDict[tuple[str, str, str, int], str] = OrderedDict()
...
- cache_key = (model_config_id, version)
+ cache_key = (model_config_id, version)
+ neg_cache_key = (inference_host, api_path, model_config_id, version)
...
- with _schema_neg_cache_lock:
-     if cache_key in _schema_neg_cache:
-         _schema_neg_cache.move_to_end(cache_key)
-         raise SchemaFetchError(_schema_neg_cache[cache_key], status_code=400)
+ with _schema_neg_cache_lock:
+     if neg_cache_key in _schema_neg_cache:
+         _schema_neg_cache.move_to_end(neg_cache_key)
+         raise SchemaFetchError(_schema_neg_cache[neg_cache_key], status_code=400)
...
- _schema_neg_cache[cache_key] = str(e)
+ _schema_neg_cache[neg_cache_key] = str(e)

Also applies to: 151-160

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@py-sdk/inference_logging_client/inference_logging_client/io.py` around lines
129 - 146, The negative cache currently uses cache_key = (model_config_id,
version) and thus replays a 400 from one endpoint across different
inference_host/api_path; change the negative-cache key to include the endpoint
identifiers (e.g., cache_key_neg = (model_config_id, version, inference_host,
api_path)) and use that new key for both lookups and inserts in
_schema_neg_cache (where you check membership, move_to_end, and when storing the
error); ensure the positive cache behavior remains the same and update all
places that reference _schema_neg_cache so lookups and raises replay the
endpoint-specific error only.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d249088e-19a6-414c-839f-d49712fb5fe2

📥 Commits

Reviewing files that changed from the base of the PR and between 31ccf36 and db4cd7d.

📒 Files selected for processing (4)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/inference_logging_client/exceptions.py
  • py-sdk/inference_logging_client/inference_logging_client/io.py
  • py-sdk/inference_logging_client/pyproject.toml
✅ Files skipped from review due to trivial changes (1)
  • py-sdk/inference_logging_client/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (1)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
