[DataLoader] Add batch_size for intra-file streaming#493
Closed
cbb330 wants to merge 2 commits into linkedin:main from
Conversation
Tests that validate how Iceberg snapshot expiration interacts with tags and branches, demonstrating that refs protect snapshots from expiration and that the `history.expire.max-ref-age-ms` table property can be used to automatically expire refs and their protected snapshots.
Use ArrivalOrder(concurrent_streams=1) from pyiceberg PR #3046 to stream RecordBatches incrementally instead of materializing entire files into memory. The new batch_size parameter controls rows per batch, preventing OOM on large files in distributed workers.
Folded into #491.
Summary
- Add a `batch_size: int | None` parameter to `OpenHouseDataLoader` and `DataLoaderSplit`
- Switch from `TaskOrder` (materializes the entire file via `list()`) to `ArrivalOrder(concurrent_streams=1)` for intra-file streaming, preventing OOM on large files in distributed workers
- `batch_size` controls rows per RecordBatch; `None` (default) uses the PyArrow default (~131K rows)
- Stacked on #491; merge that first.
Test plan
- `test_batch_size_default_returns_all_data`: backwards compatibility
- `test_batch_size_limits_rows_per_batch`: 100 rows with `batch_size=10` produces ≤10-row batches
- `test_batch_size_returns_correct_data`: data integrity preserved
- `test_batch_size_with_columns_and_filters`: works with projection + filters
- `test_batch_size_with_empty_table`: no crash on an empty table
- `test_split_batch_size_limits_rows_per_batch`: split-level enforcement
- `test_split_batch_size_none_returns_all_rows`: default preserves all data
- `test_split_batch_size_preserves_data`: non-even row counts handled correctly