
[DataLoader] Add pyiceberg ArrivalOrder support via upstream PR#491

Open
cbb330 wants to merge 7 commits into linkedin:main from cbb330:chbush/pyiceberg-arrival-order

Conversation


@cbb330 cbb330 commented Mar 9, 2026

Summary

Point the dataloader's pyiceberg dependency at apache/iceberg-python#3046, which adds ArrivalOrder — bounded-memory concurrent record batch streaming — to ArrowScan.to_record_batches(). This is a prerequisite for the dataloader to leverage concurrent file reads in a future PR.

Thanks to @sumedhsakdeo for the upstream pyiceberg contribution that this PR depends on and the guidance towards this solution.

Dependency pinning approach

We use a PEP 508 direct reference to pin pyiceberg to the fork's commit SHA:

"pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bf..."

This gets baked into the published wheel's Requires-Dist metadata, so any consumer (including lipy-openhouse) that installs openhouse-dataloader will resolve pyiceberg from the pinned fork commit — not from PyPI or lerna. Both dev/CI (uv sync) and deployed artifacts use the same fork.
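Concretely, the relevant pyproject.toml fragments look roughly like this (a sketch with surrounding fields elided; the pinned SHA is the one referenced throughout this PR):

```toml
[project]
dependencies = [
    # PEP 508 direct reference: the fork URL and commit SHA are baked into
    # the published wheel's Requires-Dist metadata
    "pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bfc6d8bbeac398357c6db80327632a2dc8",
]

[tool.hatch.metadata]
# hatchling rejects direct references in dependencies unless this is enabled
allow-direct-references = true
```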

Options evaluated

| Option | Pros | Cons | Verdict |
| --- | --- | --- | --- |
| [tool.uv.sources] override | Simple to configure; works for local dev and CI (uv sync); does not change the published dependency specifier (pyiceberg~=0.11.0) | uv-only — not part of PEP 621; not baked into wheel metadata, so consumers (including lipy-openhouse) would resolve stock pyiceberg from PyPI/Artifactory, missing the ArrivalOrder API; dev and deploy diverge silently | Insufficient — only covers dev/CI, not deployed artifacts |
| JFrog Artifactory upload | Immutable artifact; deterministic resolution; works with any Python tooling (pip, uv, poetry) | Must manually build and upload a wheel for each PR revision; artifact has no traceability back to source; stale the moment the upstream PR gets a new commit; the custom artifact must be cleaned up once the upstream PR merges and a release is cut; unclear whether ELR would need updating for a custom-built version of an already-approved library, adding process risk | Overkill for tracking a pre-merge upstream PR that is still evolving |
| LinkedIn fork (linkedin/iceberg-python) | Same pros as the PEP 508 direct reference; a LinkedIn-owned repo gives organizational control over the fork | Blocked on internal process to open a new OSS project; CI is more complicated and requires new repo setup and the ELR process; adds ongoing maintenance burden for a temporary dependency that will revert to upstream once the PR merges | Too slow for the temporary pin we need now. We will require more patches in future, so the process should be started in parallel |
| PEP 508 direct reference | Clear source provenance; pinned commit SHA in both pyproject.toml and wheel metadata; reproducible across dev, CI, and production; no ELR ambiguity since it's the same OSS project at an unreleased commit | Pinned to a fork (the upstream apache/ repo doesn't expose PR commits to uv's git resolver); must manually bump the SHA if the upstream PR updates; requires allow-direct-references = true in the hatchling config | Best fit — portable, reproducible, and explicitly pinned for both dev and deploy |

Once the upstream PR merges and a new pyiceberg release includes ArrivalOrder, we revert to a standard version specifier (e.g. pyiceberg~=0.12.0) and remove allow-direct-references.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

integrations/python/dataloader/pyproject.toml — Changed pyiceberg from the ~=0.11.0 version specifier to a PEP 508 direct reference pinned at commit 75ba28bf on sumedhsakdeo/iceberg-python. Added [tool.hatch.metadata] allow-direct-references = true for hatchling.

integrations/python/dataloader/tests/test_arrival_order.py — New test file verifying:

  • ScanOrder, TaskOrder, ArrivalOrder are importable from pyiceberg.table
  • ArrivalOrder dataclass defaults and custom parameters work correctly
  • ArrivalOrder validates concurrent_streams >= 1 and max_buffered_batches >= 1
  • ArrowScan.to_record_batches() accepts order=TaskOrder() and order=ArrivalOrder() and returns correct data
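As a rough illustration of the defaults and validation these tests exercise, here is a self-contained sketch using a stand-in dataclass (the fork's real ArrivalOrder lives in pyiceberg.table and is not installable from PyPI; the stub below only mirrors the behavior the tests assert on):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ArrivalOrderStub:
    """Stand-in mirroring the defaults and validation asserted in test_arrival_order.py."""

    concurrent_streams: int = 8
    max_buffered_batches: int = 16

    def __post_init__(self) -> None:
        # The tests verify both parameters must be >= 1
        if self.concurrent_streams < 1:
            raise ValueError("concurrent_streams must be >= 1")
        if self.max_buffered_batches < 1:
            raise ValueError("max_buffered_batches must be >= 1")


# Defaults match what the new tests expect
order = ArrivalOrderStub()
assert (order.concurrent_streams, order.max_buffered_batches) == (8, 16)

# Custom parameters stick
custom = ArrivalOrderStub(concurrent_streams=2, max_buffered_batches=4)
assert (custom.concurrent_streams, custom.max_buffered_batches) == (2, 4)

# Invalid values are rejected
try:
    ArrivalOrderStub(max_buffered_batches=0)
except ValueError as exc:
    print(f"rejected: {exc}")
```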

integrations/python/dataloader/uv.lock — Updated to resolve pyiceberg from git.

Testing Done

  • Manually Tested on local docker setup. Please include the commands run and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

make verify passes — all 102 tests pass (10 new + 92 existing), lint, format, and mypy checks all green. Built wheel and confirmed Requires-Dist: pyiceberg @ git+https://...@75ba28bf... is in the published metadata.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

No breaking changes. The existing to_record_batches() call in DataLoaderSplit.__iter__ continues to use the default TaskOrder() behavior. A follow-up PR will use ArrivalOrder to enable concurrent reads.

@cbb330 cbb330 force-pushed the chbush/pyiceberg-arrival-order branch from 75bce14 to 3d93be9 (March 9, 2026 20:34)
ignore_missing_imports = true

[tool.uv.sources]
pyiceberg = { git = "https://github.com/sumedhsakdeo/iceberg-python", branch = "fix/arrow-scan-benchmark-3036" }

@sumedhsakdeo sumedhsakdeo Mar 9, 2026


rev = "<your-commit-sha>"

we should pin it to a SHA instead of a branch. I will be careful not to force-rebase it.


@cbb330 cbb330 Mar 9, 2026


Done. Switched to rev = "75ba28bf..." pinned to the fork. I tried pointing at apache/iceberg-python directly, but uv's git resolver doesn't fetch PR refs (refs/pull/3046/head), so the SHA isn't reachable via a normal clone of upstream. The fork works since the commit is on a regular branch there.


@sumedhsakdeo sumedhsakdeo left a comment


Can you build this package locally as a whl, and then pip install the whl in a virtual env, and test in an ipython shell if you can run the imports fine?


@sumedhsakdeo sumedhsakdeo left a comment


lgtm. @ShreyeshArangath / @robreeves - thoughts on unblocking?


cbb330 commented Mar 10, 2026

@sumedhsakdeo
regarding

Can you build this package locally as a whl, and then pip install the whl in a virtual env, and test in an ipython shell if you can run the imports fine?

I executed this test e2e and note the steps below:

  1. Build the wheel as CD would
  • Ran uv build with SETUPTOOLS_SCM_PRETEND_VERSION=0.5.999 to simulate what the CI/CD pipeline does (it injects a real semver version via that env var). This produced openhouse_dataloader-0.5.999-py3-none-any.whl — the same artifact format that gets published to Artifactory.
  • I then inspected the wheel's METADATA file (the packaging equivalent of a manifest) and confirmed it contains: Requires-Dist: pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bfc6d8bbeac398357c6db80327632a2dc8. This is the key: the fork URL and SHA are embedded in the artifact itself, not in any external config.
  2. Install in a completely isolated venv
  • Created a brand-new virtual environment at /tmp/oh-test-venv with no prior packages, then installed the wheel using uv pip install, exactly how a downstream consumer would install it. No pyproject.toml, no uv.lock, no [tool.uv.sources], just the wheel file.
  • uv read the wheel's Requires-Dist metadata, saw the direct reference for pyiceberg, and cloned it from the fork at the pinned SHA. The install log confirmed: + pyiceberg==0.11.0 (from git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bf...)
  3. Verify the ArrivalOrder API is actually available
  • Ran a Python script in that isolated venv that:
    • Imports ScanOrder, TaskOrder, ArrivalOrder from pyiceberg.table
    • Creates an ArrivalOrder() with defaults and verifies concurrent_streams=8, max_buffered_batches=16
    • Creates one with custom params and verifies they stick
    • Asserts both TaskOrder and ArrivalOrder are subclasses of ScanOrder
  • All passed. This proves that a consumer who installs the published openhouse-dataloader wheel, with no knowledge of the fork and no special config, automatically gets the correct pyiceberg with ArrivalOrder available.
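The METADATA check in step 1 can be scripted. A minimal sketch (the helper name and the in-memory toy wheel are illustrative, not from the PR; in real usage you would open the built .whl file):

```python
import io
import zipfile


def requires_dist(wheel_bytes: bytes) -> list[str]:
    """Extract the Requires-Dist lines from a wheel's METADATA file.

    A wheel is a zip archive; its dependency metadata lives in
    <name>-<version>.dist-info/METADATA as RFC 822-style headers.
    """
    with zipfile.ZipFile(io.BytesIO(wheel_bytes)) as wheel:
        meta_name = next(
            n for n in wheel.namelist() if n.endswith(".dist-info/METADATA")
        )
        metadata = wheel.read(meta_name).decode("utf-8")
    return [l for l in metadata.splitlines() if l.startswith("Requires-Dist:")]


# Build a toy wheel in memory just to demonstrate the helper
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as wheel:
    wheel.writestr(
        "openhouse_dataloader-0.5.999.dist-info/METADATA",
        "Metadata-Version: 2.1\n"
        "Name: openhouse-dataloader\n"
        "Requires-Dist: pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bf\n",
    )

for line in requires_dist(buf.getvalue()):
    print(line)  # the direct reference is visible in the artifact itself
```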


@ShreyeshArangath ShreyeshArangath left a comment


Right now, with the way the Dataloader is set up, users cannot use ArrivalOrder since the ArrowScan implementation is not exposed. Should we also expose the required configuration properties so users can configure and tune the arrival order behavior? A separate PR works for this too but seemed like a simple change to me.


@ShreyeshArangath ShreyeshArangath left a comment


LGTM, I'll let @robreeves take a final look

ArrivalOrder(max_buffered_batches=0)


class TestToRecordBatchesOrder:

These tests directly test PyArrow. IMO they should exist in the PyArrow project, not here.


These tests guard against accidentally building against iceberg-python/main (or a released version that doesn't include the ArrivalOrder API); in that case they'd fail with an ImportError on the ScanOrder/ArrivalOrder imports alone.

They also pin the expected defaults and public API surface. When we eventually point to a new fork or upstream release, these tests catch any accidental changes to default values or parameter validation.

Given that we're depending on an unreleased API from a pre-merge PR, I think the tests are worth keeping.

cbb330 and others added 7 commits March 10, 2026 20:20
Point pyiceberg dependency at sumedhsakdeo/iceberg-python#3046 which adds
ArrivalOrder — bounded-memory concurrent record batch streaming — to
ArrowScan.to_record_batches().

Co-Authored-By: Sumedh Sakdeo <sumedhsakdeo@gmail.com>
Co-Authored-By: Sumedh Sakdeo <sumedhsakdeo@gmail.com>
Replace [tool.uv.sources] override with a PEP 508 direct reference in
dependencies so the fork SHA is baked into the published wheel metadata,
not just the dev environment. This ensures both openhouse-dataloader and
lipy-openhouse resolve pyiceberg from the same fork commit.

Co-Authored-By: Sumedh Sakdeo <sumedhsakdeo@gmail.com>
Use ArrivalOrder(concurrent_streams=1) from pyiceberg PR #3046 to
stream RecordBatches incrementally instead of materializing entire
files into memory. The new batch_size parameter controls rows per
batch, preventing OOM on large files in distributed workers.
- Rename _make_large_table → _make_table in test_data_loader_split.py
- Split _make_arrow_scan_and_task into _make_arrow_scan and
  _make_file_scan_task in test_arrival_order.py
- Remove ArrivalOrder from __iter__ docstring, use reviewer's suggested
  wording about bounded memory (data_loader_split.py)
- Replace data-verification batch_size tests in test_data_loader.py with
  passthrough tests that only check batch_size is forwarded to splits
- Add batch_size to integration tests with batch count assertions
@cbb330 cbb330 force-pushed the chbush/pyiceberg-arrival-order branch from e021ba9 to 4ff2172 (March 11, 2026 03:24)