[DataLoader] Add pyiceberg ArrivalOrder support via upstream PR#491
cbb330 wants to merge 7 commits into linkedin:main
Conversation
Force-pushed from 75bce14 to 3d93be9
```toml
ignore_missing_imports = true

[tool.uv.sources]
pyiceberg = { git = "https://github.com/sumedhsakdeo/iceberg-python", branch = "fix/arrow-scan-benchmark-3036" }
```
`rev = "<your-commit-sha>"`
We should pin this to a SHA instead of a branch. I'll be careful not to force-rebase it.
Done: switched to `rev = "75ba28bf..."` pinned to the fork. I tried pointing at apache/iceberg-python directly, but uv's git resolver doesn't fetch PR refs (`refs/pull/3046/head`), so the SHA isn't reachable via a normal clone of upstream. The fork works since the commit is on a regular branch there.
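For reference, a minimal sketch of what the pinned dependency could look like in `pyproject.toml` (the full SHA is elided here as in the thread; the `name`/`version` fields are placeholders, and `allow-direct-references` is the hatchling switch this PR uses):

```toml
[project]
name = "openhouse-dataloader"
version = "0.0.0"
# PEP 508 direct reference: the fork SHA is baked into the wheel's
# Requires-Dist metadata, unlike a [tool.uv.sources] override.
dependencies = [
    "pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bf...",
]

[tool.hatch.metadata]
# hatchling rejects direct references by default
allow-direct-references = true
```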
sumedhsakdeo
left a comment
Can you build this package locally as a wheel, pip install the wheel in a virtual env, and check in an ipython shell that the imports run fine?
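A sketch of that check, assuming a hatchling project built with uv (the venv path is arbitrary; adjust the project directory to this repo's layout):

```shell
# Build the wheel from the dataloader project directory
cd integrations/python/dataloader
uv build   # or: python -m build --wheel

# Install the wheel into a throwaway virtual env
python -m venv /tmp/dl-venv
/tmp/dl-venv/bin/pip install dist/*.whl ipython

# Smoke-test the imports in an ipython shell
/tmp/dl-venv/bin/ipython -c "from pyiceberg.table import ArrivalOrder; import openhouse.dataloader; print('imports ok')"
```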
sumedhsakdeo
left a comment
lgtm. @ShreyeshArangath / @robreeves - thoughts on unblocking?
@sumedhsakdeo
I executed this test e2e and note the steps below:
Right now, with the way the DataLoader is set up, users cannot use ArrivalOrder since the ArrowScan implementation is not exposed. Should we also expose the required configuration properties so users can configure and tune the arrival-order behavior? A separate PR works for this too, but it seemed like a simple change to me.
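One way this could look, sketched with a stand-in dataclass since the real `ArrivalOrder` ships with pyiceberg PR #3046; the default values and the `make_scan_order` helper below are illustrative assumptions, not the PR's actual API:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ArrivalOrder:
    """Stand-in for pyiceberg's proposed ArrivalOrder; defaults here are
    illustrative assumptions, not the upstream values."""
    concurrent_streams: int = 4
    max_buffered_batches: int = 16

    def __post_init__(self) -> None:
        # Mirrors the validation the PR's tests describe (>= 1 on both knobs)
        if self.concurrent_streams < 1:
            raise ValueError("concurrent_streams must be >= 1")
        if self.max_buffered_batches < 1:
            raise ValueError("max_buffered_batches must be >= 1")


def make_scan_order(
    concurrent_streams: Optional[int] = None,
    max_buffered_batches: Optional[int] = None,
) -> ArrivalOrder:
    """Hypothetical DataLoader-facing helper: turn user config knobs into
    an ArrivalOrder, falling back to library defaults when a knob is unset."""
    kwargs = {}
    if concurrent_streams is not None:
        kwargs["concurrent_streams"] = concurrent_streams
    if max_buffered_batches is not None:
        kwargs["max_buffered_batches"] = max_buffered_batches
    return ArrivalOrder(**kwargs)
```

The point of the helper is that the DataLoader config surface stays flat (two optional integers) while validation lives in one place.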
integrations/python/dataloader/src/openhouse/dataloader/data_loader.py (outdated; resolved)
ShreyeshArangath
left a comment
LGTM, I'll let @robreeves take a final look
integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py (outdated; resolved)
```python
ArrivalOrder(max_buffered_batches=0)


class TestToRecordBatchesOrder:
```
These tests directly test PyArrow. IMO they should exist in the PyArrow project, not here.
These tests guard against accidentally building against iceberg-python/main (or a released version that doesn't include the ArrivalOrder API): in that case they fail with an ImportError on the ScanOrder/ArrivalOrder imports alone.
They also pin the expected defaults and public API surface. When we eventually point to a new fork or upstream release, these tests catch any accidental changes to default values or parameter validation.
Given that we're depending on an unreleased API from a pre-merge PR, I think the tests are worth keeping.
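The guard itself can be as small as a capability probe. A sketch (checking `pyiceberg.table` via `hasattr` is an assumption based on where this PR says the API lands):

```python
import importlib
import importlib.util


def has_arrival_order() -> bool:
    """Return True only if pyiceberg is installed AND exposes the
    ScanOrder/TaskOrder/ArrivalOrder API from the pre-merge PR;
    False for main or any released version without it."""
    if importlib.util.find_spec("pyiceberg") is None:
        return False
    table = importlib.import_module("pyiceberg.table")
    return all(
        hasattr(table, name) for name in ("ScanOrder", "TaskOrder", "ArrivalOrder")
    )
```

A test module can call this at import time and fail fast with a clear message instead of an opaque ImportError mid-suite.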
- Point pyiceberg dependency at sumedhsakdeo/iceberg-python#3046, which adds ArrivalOrder (bounded-memory concurrent record batch streaming) to `ArrowScan.to_record_batches()`. Co-Authored-By: Sumedh Sakdeo <sumedhsakdeo@gmail.com>
- Replace `[tool.uv.sources]` override with a PEP 508 direct reference in `dependencies` so the fork SHA is baked into the published wheel metadata, not just the dev environment. This ensures both openhouse-dataloader and lipy-openhouse resolve pyiceberg from the same fork commit. Co-Authored-By: Sumedh Sakdeo <sumedhsakdeo@gmail.com>
- Use `ArrivalOrder(concurrent_streams=1)` from pyiceberg PR #3046 to stream RecordBatches incrementally instead of materializing entire files into memory. The new `batch_size` parameter controls rows per batch, preventing OOM on large files in distributed workers.
- Rename `_make_large_table` → `_make_table` in test_data_loader_split.py; split `_make_arrow_scan_and_task` into `_make_arrow_scan` and `_make_file_scan_task` in test_arrival_order.py.
- Remove ArrivalOrder from the `__iter__` docstring and use the reviewer's suggested wording about bounded memory (data_loader_split.py); replace data-verification `batch_size` tests in test_data_loader.py with passthrough tests that only check `batch_size` is forwarded to splits; add `batch_size` to integration tests with batch count assertions.
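The bounded-memory idea behind those commits can be modeled without pyiceberg at all: concurrent readers feed a fixed-size buffer, and the consumer drains batches in whatever order they arrive. The knob names below mirror the PR's `concurrent_streams` / `max_buffered_batches`, but this is a toy model, not the upstream implementation:

```python
import queue
import threading
from typing import Iterator, List


def stream_in_arrival_order(
    files: List[List[int]],        # stand-ins for data files; each yields "batches"
    concurrent_streams: int = 2,   # cap on readers running at once
    max_buffered_batches: int = 4, # bounds memory: readers block when full
) -> Iterator[int]:
    """Toy model of arrival-order streaming: batches are yielded as they
    arrive from concurrent readers through a bounded buffer."""
    buf: "queue.Queue" = queue.Queue(maxsize=max_buffered_batches)
    sem = threading.Semaphore(concurrent_streams)
    done = object()                # per-file end-of-stream sentinel
    remaining = len(files)

    def read(file: List[int]) -> None:
        with sem:                  # at most concurrent_streams active readers
            for batch in file:
                buf.put(batch)     # blocks when the buffer is full
        buf.put(done)

    for f in files:
        threading.Thread(target=read, args=(f,), daemon=True).start()

    while remaining:
        item = buf.get()
        if item is done:
            remaining -= 1
        else:
            yield item
```

Because the queue is bounded, peak memory is roughly `max_buffered_batches` batches regardless of file sizes, which is the property the commit messages describe.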
Force-pushed from e021ba9 to 4ff2172
Summary
Point the dataloader's pyiceberg dependency at apache/iceberg-python#3046, which adds `ArrivalOrder` (bounded-memory concurrent record batch streaming) to `ArrowScan.to_record_batches()`. This is a prerequisite for the dataloader to leverage concurrent file reads in a future PR. Thanks to @sumedhsakdeo for the upstream pyiceberg contribution that this PR depends on and the guidance toward this solution.
Dependency pinning approach
We use a PEP 508 direct reference to pin pyiceberg to the fork's commit SHA:
`"pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bf..."`

This gets baked into the published wheel's `Requires-Dist` metadata, so any consumer (including lipy-openhouse) that installs openhouse-dataloader will resolve pyiceberg from the pinned fork commit, not from PyPI or lerna. Both dev/CI (`uv sync`) and deployed artifacts use the same fork.

Options evaluated

- `[tool.uv.sources]` override: only applies locally (`uv sync`); does not change the published dependency specifier (`pyiceberg~=0.11.0`)
- Fork under `linkedin/iceberg-python`
- Pointing directly at the `apache/` repo: it doesn't expose PR commits to uv's git resolver
- PEP 508 direct reference to the fork SHA (chosen): must manually bump the SHA if the upstream PR updates; requires `allow-direct-references = true` in hatchling config

Once the upstream PR merges and a new pyiceberg release includes ArrivalOrder, we revert to a standard version specifier (e.g. `pyiceberg~=0.12.0`) and remove `allow-direct-references`.

Changes
- `integrations/python/dataloader/pyproject.toml`: changed pyiceberg from the `~=0.11.0` version specifier to a PEP 508 direct reference pinned at commit `75ba28bf` on `sumedhsakdeo/iceberg-python`. Added `[tool.hatch.metadata] allow-direct-references = true` for hatchling.
- `integrations/python/dataloader/tests/test_arrival_order.py`: new test file verifying:
  - `ScanOrder`, `TaskOrder`, `ArrivalOrder` are importable from `pyiceberg.table`
  - `ArrivalOrder` dataclass defaults and custom parameters work correctly
  - `ArrivalOrder` validates `concurrent_streams >= 1` and `max_buffered_batches >= 1`
  - `ArrowScan.to_record_batches()` accepts `order=TaskOrder()` and `order=ArrivalOrder()` and returns correct data
- `integrations/python/dataloader/uv.lock`: updated to resolve pyiceberg from git.

Testing Done
`make verify` passes: all 102 tests pass (10 new + 92 existing); lint, format, and mypy checks are all green. Built the wheel and confirmed `Requires-Dist: pyiceberg @ git+https://...@75ba28bf...` is in the published metadata.

Additional Information
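That `Requires-Dist` check can be scripted rather than eyeballed. A small sketch using only the standard library (the wheel path is whatever the build produced under `dist/`; the function name is ours):

```python
import zipfile
from email.parser import Parser
from typing import List


def requires_dist(wheel_path: str) -> List[str]:
    """Return the Requires-Dist entries from a built wheel's METADATA,
    so a check can assert the pinned 'pyiceberg @ git+...' specifier
    actually reached the published metadata."""
    with zipfile.ZipFile(wheel_path) as wheel:
        # Wheel metadata lives at <name>-<version>.dist-info/METADATA
        meta_name = next(
            n for n in wheel.namelist() if n.endswith(".dist-info/METADATA")
        )
        metadata = Parser().parsestr(wheel.read(meta_name).decode("utf-8"))
    return metadata.get_all("Requires-Dist") or []
```

Usage would be along the lines of `assert any(r.startswith("pyiceberg @ git+") for r in requires_dist("dist/<wheel>.whl"))`.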
No breaking changes. The existing `to_record_batches()` call in `DataLoaderSplit.__iter__` continues to use the default `TaskOrder()` behavior. A follow-up PR will use `ArrivalOrder` to enable concurrent reads.