feat: pushdown OFFSET to parquet for RG-level skipping #21828

zhuqi-lucas wants to merge 3 commits into apache:main
Conversation
zhuqi-lucas force-pushed from 39621f4 to 60c508d
Pull request overview
This PR improves performance for LIMIT .. OFFSET .. queries on Parquet by pushing OFFSET down into the Parquet scan so it can skip entire row groups (and partially skip within a row group via RowSelection), avoiding reading and discarding large numbers of rows in GlobalLimitExec.
Changes:
- Introduces offset pushdown plumbing (`with_offset`, `offset`, `offset_fully_handled`) across `ExecutionPlan`, `DataSource`, `FileSource`, and `FileScanConfig`.
- Implements Parquet-side offset handling (row-group pruning + optional `RowSelection`) and adds an `offset_pruned_row_groups` metric.
- Adds/updates SLT coverage and expected EXPLAIN/metrics output to reflect offset pushdown behavior.
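For readers unfamiliar with the parquet crate, the "partially skip within a row group" part maps onto its `RowSelection` API. A minimal illustrative sketch (not the PR's code; the row counts are invented):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Build a selection that skips the first `offset` rows of the
/// surviving row groups and reads the rest.
fn offset_selection(offset: usize, total_rows: usize) -> RowSelection {
    let offset = offset.min(total_rows);
    RowSelection::from(vec![
        RowSelector::skip(offset),                // rows consumed by OFFSET
        RowSelector::select(total_rows - offset), // rows still readable
    ])
}
```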
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Adds SLT “Test N” covering OFFSET pushdown behavior and correctness checks. |
| datafusion/sqllogictest/test_files/push_down_filter_parquet.slt | Updates expected EXPLAIN ANALYZE metrics to include offset_pruned_row_groups. |
| datafusion/sqllogictest/test_files/limit_pruning.slt | Updates expected metrics to include offset_pruned_row_groups. |
| datafusion/sqllogictest/test_files/explain_analyze.slt | Updates expected metrics to include offset_pruned_row_groups. |
| datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt | Updates expected metrics to include offset_pruned_row_groups. |
| datafusion/physical-plan/src/execution_plan.rs | Adds offset-related extension points to ExecutionPlan. |
| datafusion/physical-optimizer/src/limit_pushdown.rs | Extends limit pushdown rule to attempt offset pushdown and (sometimes) remove GlobalLimitExec. |
| datafusion/datasource/src/source.rs | Wires offset methods through DataSource and DataSourceExec. |
| datafusion/datasource/src/file_scan_config/mod.rs | Adds offset to FileScanConfig + builder and implements offset pushdown behavior. |
| datafusion/datasource/src/file.rs | Adds supports_offset() to FileSource. |
| datafusion/datasource-parquet/src/source.rs | Marks Parquet as supporting offset pushdown and propagates offset into morselizer config. |
| datafusion/datasource-parquet/src/row_group_filter.rs | Adds prune_by_offset and unit tests for row-group pruning by offset. |
| datafusion/datasource-parquet/src/opener.rs | Applies offset pruning and row selection during Parquet open; adjusts effective limit when offset is fully handled. |
| datafusion/datasource-parquet/src/metrics.rs | Adds offset_pruned_row_groups metric. |
if global_skip > 0 {
-    add_global_limit(
-        plan_with_preserve_order,
-        global_skip,
-        Some(global_fetch),
-    )
+    // Push offset to the plan. If the plan fully handles
+    // offset (e.g. parquet without WHERE), eliminate
+    // GlobalLimitExec. Otherwise keep it for remaining skip.
+    if let Some(plan_with_offset) =
+        plan_with_preserve_order.with_offset(global_skip)
+    {
+        if plan_with_offset.offset_fully_handled() {
+            plan_with_offset
+        } else {
+            add_global_limit(
+                plan_with_offset,
+                global_skip,
+                Some(global_fetch),
+            )
+        }
+    }
When with_offset(global_skip) returns Some but offset_fully_handled() is false, this code still keeps GlobalLimitExec with skip=global_skip while also pushing the same offset into the child plan. If the child actually skips any rows (e.g. parquet skipping fully-matched row groups), the offset will be applied twice and results will be incorrect. Consider only calling/using with_offset when the plan can fully handle the offset (and otherwise leave the child unchanged), or alternatively adjust the GlobalLimitExec skip to reflect only the remaining offset actually not handled by the child (requires a way to compute/report it).
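To make the suggested fix concrete, here is a sketch of the "only use the rewritten child when it fully absorbs the offset" shape. The names `with_offset`, `offset_fully_handled`, and `add_global_limit` come from the diff above; the trait is a simplified stand-in for DataFusion's `ExecutionPlan`, not the real API:

```rust
use std::sync::Arc;

// Simplified stand-in for the PR's ExecutionPlan machinery.
trait ExecutionPlan {
    fn with_offset(self: Arc<Self>, skip: usize) -> Option<Arc<dyn ExecutionPlan>>;
    fn offset_fully_handled(&self) -> bool;
}

fn add_global_limit(
    plan: Arc<dyn ExecutionPlan>,
    _skip: usize,
    _fetch: Option<usize>,
) -> Arc<dyn ExecutionPlan> {
    plan // placeholder: would wrap `plan` in a GlobalLimitExec
}

fn push_offset(
    plan: Arc<dyn ExecutionPlan>,
    skip: usize,
    fetch: usize,
) -> Arc<dyn ExecutionPlan> {
    match plan.clone().with_offset(skip) {
        // Only use the rewritten child when it absorbs the WHOLE offset;
        // otherwise keep the original child, so the skip is applied
        // exactly once, by GlobalLimitExec alone.
        Some(new_plan) if new_plan.offset_fully_handled() => new_plan,
        _ => add_global_limit(plan, skip, Some(fetch)),
    }
}
```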
// Offset is fully handled when set AND no filter —
// raw row counts are accurate for offset calculation.
// With filters, only fully-matched RGs can be skipped,
// GlobalLimitExec handles the rest.
self.offset.is_some() && self.file_source.filter().is_none()
offset_fully_handled() currently returns true whenever offset is set and there is no filter, but it does not account for multi-partition scans. FileStream applies limit per output partition, so removing GlobalLimitExec when file_groups.len() > 1 can yield more than fetch rows (and make OFFSET/LIMIT semantics depend on partitioning). Consider requiring a single output partition (e.g. self.file_groups.len() == 1 / output_partitioning().partition_count() == 1) before reporting the offset as fully handled.
Suggested change:
-// Offset is fully handled when set AND no filter —
-// raw row counts are accurate for offset calculation.
-// With filters, only fully-matched RGs can be skipped,
-// GlobalLimitExec handles the rest.
-self.offset.is_some() && self.file_source.filter().is_none()
+// Offset is fully handled only when set, no filter is present,
+// and the scan has a single output partition.
+//
+// With filters, only fully-matched RGs can be skipped and
+// GlobalLimitExec handles the rest. Likewise, multi-partition
+// scans must retain global limit enforcement because FileStream
+// applies limits per output partition.
+self.offset.is_some()
+    && self.file_source.filter().is_none()
+    && self.file_groups.len() == 1
8 80
9 90
10 100
The WHERE-clause coverage here uses OFFSET 2, which does not exercise the case where the offset spans at least one fully-matched row group under a predicate (the scenario that can break if offset is both pushed into parquet and also applied by GlobalLimitExec). Consider adding a variant like WHERE value > 50 LIMIT 3 OFFSET 7 (with 5-row row groups) to ensure correctness when the offset crosses a fully-matched row group boundary under filtering.
Suggested change:
+# Test N.9b: OFFSET with WHERE clause crossing a fully-matched row-group boundary
+# Ensures correctness when OFFSET may be pushed into parquet and also applied by GlobalLimitExec
+query II
+SELECT * FROM tn_offset WHERE value > 50 LIMIT 3 OFFSET 7;
+----
+13 130
+14 140
+15 150
alamb left a comment:
Thanks @zhuqi-lucas -- I took a look and this looks pretty nice. I think we can potentially simplify the APIs a bit though and I left some suggestions on how to do so
let me know what you think
/// The number of rows to skip before returning results.
/// When combined with `limit`, this enables efficient OFFSET handling
/// at the file scan level by skipping entire row groups when possible.
pub offset: Option<usize>,
I think technically this is a public API and thus an API change (so we should mark the PR thusly)
self.offset
}

fn offset_fully_handled(&self) -> bool {
It would help me if you could add a comment explaining what this method is for, and maybe give it a more descriptive name. I found "handled" a bit generic; maybe the name "can_apply_offset" or something similar would be more descriptive.
}

/// Whether this source fully handles offset at the scan level.
/// When true, the optimizer can eliminate GlobalLimitExec's skip.
In general I think it would be easier to understand if this documentation doesn't refer to GlobalLimitExec... and instead simply says "can be efficiently implemented by the file source" or something like that
None
}

/// Whether offset is fully handled (no need for GlobalLimitExec skip).
I am confused by this API -- if the datasource returns Some(..) for offset then I expect that the DataSource will correctly implement the offset handling. It seems error prone if callers ALSO have to remember to check offset_fully_handled
Perhaps we could change FileScanConfig so that if a filter is ever set it clears out the offset (or better yet has an enum so it is not possible to set the offset and a filter at the same time)
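A minimal sketch of that enum idea, with hypothetical names (nothing here exists in the PR), making it impossible to set an offset and a filter at the same time:

```rust
use std::sync::Arc;

// Placeholder for DataFusion's physical filter expression type.
struct PhysicalExpr;

/// Hypothetical: a scan either skips a raw row offset (exact row
/// counts, no predicate) or evaluates a filter — never both.
enum ScanRestriction {
    /// No predicate: row-group row counts are exact, so the scan
    /// can consume the offset itself.
    Offset(usize),
    /// A predicate is present: the scan cannot know how many rows
    /// survive it, so offset handling stays in the limit operator.
    Filter(Arc<PhysicalExpr>),
}
```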
Thanks for the review @alamb! I agree the current API is confusing. Let me explain the design intent. There are two cases for offset + parquet:

- Case 1: No filter → offset is fully handled (raw row counts are exact). Safe to eliminate GlobalLimitExec.
- Case 2: Filter present → only fully-matched RGs can be skipped; GlobalLimitExec must handle the remaining skip.

How about a cleaner API instead? Or alternatively: split it into two methods. WDYT? I'll also address the other comments (move the RowSelection logic to a PreparedAccessPlan method, update docs/naming, mark as API change).
I think we should take a step back and maybe reconsider what we are trying to do -- maybe by defining and documenting somewhere what the intended semantics for OFFSET are.

For example, for a query like

SELECT * FROM `single_file.parquet` LIMIT 5 OFFSET 600000000

I think DataFusion would probably be "correct" to return any 5 rows (as the query doesn't specify any ORDER BY). However, that is probably not what the user wanted / expected. The user probably wants rows starting at logical offset 600000000 of the file.

Likewise, what rows should be returned from this query (where there are multiple files)?

SELECT * FROM directory_with_multiple_files LIMIT 5 OFFSET 600000000

That is not clear to me - is there some sort of implied global order of rows within the file that users expect?

Another question: do we expect DataFusion to always produce the same values for a given LIMIT / OFFSET combination? (Note that today it will potentially return different values for the same query.)
I think this makes sense and it is the case that @AntoinePrv gives in #19654
I am confused about how this is intended to work in your example. Also, I think the GlobalLimitExec needs to be updated.

Given the potential complexity of handling / optimizing the general case, I wonder if there is some way to start simple (maybe a PR that only implements case 1, when there are no predicates and there is a single file), so it is clearer what the expected results should be.
From a user perspective, I very much expect to be returned the rows in the logical order of the file (alternatively, is there a clause to express it as an ORDER BY?).
I do not have as strong an expectation here. Best case scenario I would say logical order is determined by something like lexicographic order of the file paths, but it does feel a bit more arbitrary. Though I would expect that there is some arbitrary order (possibly hidden from me) and that changing the offset changes the rows I am viewing in that hidden order.
Perhaps I can say a bit more on my use case. Basically I want to paginate rows from a dataset to show some user. I think "efficiently paginate rows" can be considered a reasonable feature beyond my own use case.
I understand where the need comes from, but there is a good reason why databases treat scans without ORDER BY as unordered: a lot of logical/physical planning optimizations depend on this assumption, and they can only rely on metadata to tell whether the plan changes they want to make are safe.

If the underlying data is truly sorted on something that can be encoded similarly to what you can write with an ORDER BY (or at least producing the same metadata DataFusion uses), that could be fine. But if the order is just the order rows happen to have in the files, and we can't encode this promise anywhere, then it gets complex. At that point, the planner would need a mode disabling every optimization that can change the result-set order without an ORDER BY, which is non-trivial scrutiny that every future contribution to the planner would have to go through; and since it's a major deviation from the SQL standard, everything must be re-checked for safety.
Thanks @alamb for the thoughtful feedback! I agree with your suggestion to simplify.

Plan: start with Case 1 only (no filter, single file). This PR already works for Case 1 when there's no predicate. I'll simplify by:
Re: the semantics discussion from @AntoinePrv and @asolimando — great points on both sides. This PR doesn't change any OFFSET semantics; it produces the exact same rows as today's GlobalLimitExec path, just without decoding and discarding them first.

Will push the simplified version shortly.
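As a sketch of what the Case 1 guard might look like: field and method names follow the PR description (`FileScanConfig`, `with_offset`), but the body below is illustrative, not the actual patch:

```rust
/// Simplified stand-in for the PR's FileScanConfig.
#[derive(Clone)]
struct FileScanConfig {
    file_groups: Vec<Vec<String>>, // partitions of file paths
    filter: Option<String>,        // stand-in for a pushed-down predicate
    offset: Option<usize>,
}

impl FileScanConfig {
    /// Accept the offset only in Case 1: a single file and no
    /// predicate, so raw row-group row counts are exact.
    fn with_offset(&self, offset: usize) -> Option<FileScanConfig> {
        let single_file =
            self.file_groups.len() == 1 && self.file_groups[0].len() == 1;
        if single_file && self.filter.is_none() {
            let mut config = self.clone();
            config.offset = Some(offset);
            Some(config)
        } else {
            None // caller keeps GlobalLimitExec's skip unchanged
        }
    }
}
```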
zhuqi-lucas force-pushed from 0b5d7fa to 07d54c4
Updated per review feedback — all items addressed:

Simplified scope (per @alamb suggestion):

Code changes:

Follow-up issues created:

Ready for re-review.
zhuqi-lucas force-pushed from f67a13d to fba644f
I asked about the Detect breaking changes CI failure here: #21499 (comment)
I agree with @asolimando on this -- and I think DataFusion should not be breaking new ground on what semantics we implement (we should follow other DB implementations as much as possible)
We did recently add the ability to emit row id from the parquet reader 🤔 -- maybe we could make that work and then treat row group skipping as an optimization when the data is explicitly |
Thank you @zhuqi-lucas -- I am not sure this change actually solves @AntoinePrv's use case - I think the conservative checks will not trigger on a large file.

For example, I tested using a 14 GB clickbench parquet file:

cd benchmarks
./bench.sh data clickbench_1
cd data

And then ran datafusion-cli from this branch:

select * from 'hits.parquet' OFFSET 99000000 LIMIT 5;

It took 4 seconds on my laptop (to return 5 rows), which I think means this branch is not triggered:

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion/benchmarks/data$ ~/Downloads/datafusion-cli-feat_offset-pushdown
DataFusion CLI v53.1.0
> select * from 'hits.parquet' OFFSET 99000000 LIMIT 5;
+---------------------+------------+-------------------------------------------------------------------------------------+-----------+------------+-----------+-----------+-------------+----------+--------------------+--------------+-----+-----------+--------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+-----------+-------------------+-----------------+---------------+-------------+-----------------+------------------+-----------------+------------+------------+-------------+----------+----------+----------------+----------------+--------------+------------------+----------+-------------+------------------+------------------------+-------------+----------------+----------------+--------------+-------------+-------------+-------------------+--------------------+----------------+-----------------+---------------------+---------------------+---------------------+---------------------+-------------+-------------+--------+------------+-------------+---------------------+-------------+-----------+--------------+---------+-------------+---------------+----------+----------+----------------+-----+-----+--------+-----------+-----------+------------+------------+------------+---------------+-----------------+----------------+---------------+--------------+-----------+------------+-----------+---------------+---------------------+-------------------+-------------+-----------------------+------------------+------------+--------------+---------------+-----------------+---------------------+--------------------+--------------+------------------+-----------+-----------+-------------+------------+---------+---------+----------+----------------------+---------------------+------+
| WatchID | JavaEnable | Title | GoodEvent | EventTime | EventDate | CounterID | ClientIP | RegionID | UserID | CounterClass | OS | UserAgent | URL | Referer | IsRefresh | RefererCategoryID | RefererRegionID | URLCategoryID | URLRegionID | ResolutionWidth | ResolutionHeight | ResolutionDepth | FlashMajor | FlashMinor | FlashMinor2 | NetMajor | NetMinor | UserAgentMajor | UserAgentMinor | CookieEnable | JavascriptEnable | IsMobile | MobilePhone | MobilePhoneModel | Params | IPNetworkID | TraficSourceID | SearchEngineID | SearchPhrase | AdvEngineID | IsArtifical | WindowClientWidth | WindowClientHeight | ClientTimeZone | ClientEventTime | SilverlightVersion1 | SilverlightVersion2 | SilverlightVersion3 | SilverlightVersion4 | PageCharset | CodeVersion | IsLink | IsDownload | IsNotBounce | FUniqID | OriginalURL | HID | IsOldCounter | IsEvent | IsParameter | DontCountHits | WithHash | HitColor | LocalEventTime | Age | Sex | Income | Interests | Robotness | RemoteIP | WindowName | OpenerName | HistoryLength | BrowserLanguage | BrowserCountry | SocialNetwork | SocialAction | HTTPError | SendTiming | DNSTiming | ConnectTiming | ResponseStartTiming | ResponseEndTiming | FetchTiming | SocialSourceNetworkID | SocialSourcePage | ParamPrice | ParamOrderID | ParamCurrency | ParamCurrencyID | OpenstatServiceName | OpenstatCampaignID | OpenstatAdID | OpenstatSourceID | UTMSource | UTMMedium | UTMCampaign | UTMContent | UTMTerm | FromTag | HasGCLID | RefererHash | URLHash | CLID |
+---------------------+------------+-------------------------------------------------------------------------------------+-----------+------------+-----------+-----------+-------------+----------+--------------------+--------------+-----+-----------+--------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+-----------+-------------------+-----------------+---------------+-------------+-----------------+------------------+-----------------+------------+------------+-------------+----------+----------+----------------+----------------+--------------+------------------+----------+-------------+------------------+------------------------+-------------+----------------+----------------+--------------+-------------+-------------+-------------------+--------------------+----------------+-----------------+---------------------+---------------------+---------------------+---------------------+-------------+-------------+--------+------------+-------------+---------------------+-------------+-----------+--------------+---------+-------------+---------------+----------+----------+----------------+-----+-----+--------+-----------+-----------+------------+------------+------------+---------------+-----------------+----------------+---------------+--------------+-----------+------------+-----------+---------------+---------------------+-------------------+-------------+-----------------------+------------------+------------+--------------+---------------+-----------------+---------------------+--------------------+--------------+------------------+-----------+-----------+-------------+------------+---------+---------+----------+----------------------+---------------------+------+
....
5 row(s) fetched.
Elapsed 3.192 seconds.

So I think we either need to
BTW here is the explain plan:

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion/benchmarks/data$ ~/Downloads/datafusion-cli-feat_offset-pushdown
DataFusion CLI v53.1.0
> explain select * from 'hits.parquet' OFFSET 99000000 LIMIT 5;
+---------------+-------------------------------+
| plan_type | plan |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
| | │ GlobalLimitExec │ |
| | │ -------------------- │ |
| | │ limit: 5 │ |
| | │ skip: 99000000 │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ CoalescePartitionsExec │ |
| | │ -------------------- │ |
| | │ limit: 99000005 │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ DataSourceExec │ |
| | │ -------------------- │ |
| | │ files: 16 │ |
| | │ format: parquet │ |
| | └───────────────────────────┘ |
| | |
+---------------+-------------------------------+
1 row(s) fetched.
Elapsed 0.017 seconds.
> explain format indent select * from 'hits.parquet' OFFSET 99000000 LIMIT 5;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=99000000, fetch=5 |
| | TableScan: hits.parquet projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], fetch=99000005 |
| physical_plan | GlobalLimitExec: skip=99000000, fetch=5 |
| | CoalescePartitionsExec: fetch=99000005 |
| | DataSourceExec: file_groups={16 groups: [[Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:0..923748528], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:923748528..1847497056], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:3694994112..4618742640], ...]}, projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], limit=99000005, file_type=parquet |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.009 seconds.
Thanks @alamb for testing! You found the key issue — the optimization doesn't trigger because the file is split into multiple byte-range partitions (16 groups in your plan).

The root cause: with multiple partitions, each partition reads different byte ranges of the same file independently. The offset needs cross-partition coordination because no single partition knows how many rows the others have already skipped.

Working on a fix that handles single-file multi-partition correctly. Will test locally with the ClickBench 14GB file before pushing.
zhuqi-lucas force-pushed from fba644f to eee57af
The fix for the Detect breaking changes CI job is now resolved, so I updated your branch with main. Sorry for the trouble!
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
Great, the Detect breaking changes check is working! Do you think we should add it?
Push OFFSET from GlobalLimitExec down to DataSourceExec/ParquetOpener.
Uses shared Arc&lt;AtomicUsize&gt; counter across partitions so multi-partition
single-file queries (byte-range partitioning) are handled correctly.

Design:
- with_offset accepted for parquet + no filter (any file count)
- SharedCount: each partition atomically consumes offset by skipping RGs
- RowSelection for partial RG skip (remaining offset within first RG)
- Optimizer eliminates GlobalLimitExec when offset is pushed
- effective_limit adjusted per partition based on consumed offset

Implementation:
- FileSource::supports_offset() (parquet=true, others=false)
- FileScanConfig: offset field, with_offset (no filter guard)
- LimitPushdown: push offset, eliminate GlobalLimitExec
- prune_by_offset: skip leading fully-matched RGs
- PreparedAccessPlan::apply_offset() for RowSelection
- Shared Arc&lt;AtomicUsize&gt; remaining_offset in ParquetMorselizer
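A sketch of the shared-counter design this commit message describes: each partition atomically claims as much of the remaining offset as the rows it is about to read can absorb. Simplified and hypothetical (the real code threads this through `ParquetMorselizer`), and it glosses over which partition's rows come first:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Atomically consume up to `row_count` rows of the shared remaining
/// offset; returns how many rows THIS partition should skip.
fn consume_offset(remaining: &Arc<AtomicUsize>, row_count: usize) -> usize {
    let mut current = remaining.load(Ordering::SeqCst);
    loop {
        let take = current.min(row_count);
        match remaining.compare_exchange(
            current,
            current - take,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => return take,
            Err(actual) => current = actual, // another partition raced us
        }
    }
}

fn main() {
    let remaining = Arc::new(AtomicUsize::new(1_000)); // OFFSET 1000
    // A partition whose row groups hold 600 rows skips all 600...
    assert_eq!(consume_offset(&remaining, 600), 600);
    // ...and the next partition only needs to skip the remaining 400.
    assert_eq!(consume_offset(&remaining, 900), 400);
    assert_eq!(remaining.load(Ordering::SeqCst), 0);
}
```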
zhuqi-lucas force-pushed from 121064f to fb717c5
When a single parquet file is split into multiple byte-range partitions,
CoalescePartitionsExec sits between GlobalLimitExec and DataSourceExec.
Previously, the optimizer would keep the GlobalLimitExec for the skip,
meaning offset wasn't pushed to the parquet reader.

Now we try to push the offset through the combining operator to its
DataSourceExec child, which uses a shared Arc&lt;AtomicUsize&gt; counter to
coordinate offset consumption across partitions. This eliminates the
GlobalLimitExec entirely for supported sources.

Result on 14GB hits.parquet: OFFSET 99M LIMIT 5 from 3.2s → 29ms.

Co-Authored-By: Claude Opus 4.6 (1M context) &lt;noreply@anthropic.com&gt;
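The "push through the combining operator" step could look roughly like the recursion below. All types and methods are simplified stand-ins; only the shape (try the node itself, else recurse through a single-child passthrough) reflects the commit message:

```rust
use std::sync::Arc;

// Simplified stand-ins for the plan nodes involved.
trait ExecutionPlan {
    /// True for pure combining operators (e.g. CoalescePartitionsExec)
    /// that neither filter nor drop rows from the scan.
    fn is_passthrough(&self) -> bool;
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Arc<dyn ExecutionPlan>;
    /// Source nodes return Some(rewritten self) when they accept the offset.
    fn with_offset(self: Arc<Self>, offset: usize) -> Option<Arc<dyn ExecutionPlan>>;
}

/// Try to sink `offset` through passthrough operators down to the source.
fn push_offset_down(
    plan: Arc<dyn ExecutionPlan>,
    offset: usize,
) -> Option<Arc<dyn ExecutionPlan>> {
    if let Some(direct) = plan.clone().with_offset(offset) {
        return Some(direct); // the node itself (e.g. DataSourceExec) took it
    }
    let children = plan.children();
    if plan.is_passthrough() && children.len() == 1 {
        // Recurse through the combining operator to its only child.
        let new_child = push_offset_down(children[0].clone(), offset)?;
        return Some(plan.with_new_children(vec![new_child]));
    }
    None // leave the plan unchanged; GlobalLimitExec keeps the skip
}
```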
zhuqi-lucas force-pushed from fb717c5 to 1691b07
Thanks @alamb for the great discussion and testing!

Update: multi-partition issue fixed. The optimization now works on large files with byte-range partitioning. The key fix was pushing offset through CoalescePartitionsExec to the DataSourceExec child, with a shared counter coordinating the partitions.

Testing on the same 14GB hits.parquet: OFFSET 99M LIMIT 5 went from 3.2s to 29ms.

On semantics: I agree with you and @asolimando that we shouldn't break new ground here; this PR produces the exact same rows as today's GlobalLimitExec path, just faster.
Apologies if my comment read as referring to the PR, I only meant to address @AntoinePrv's comment, as I was afraid of the implications, although I definitely see where the ask is coming from, having worked closely with data scientists and data engineers in the past.

This is very exciting work, and @alamb's proposal around using row ids from the parquet reader sounds promising.
Which issue does this PR close?
Closes #19654
Rationale for this change
`SELECT * FROM table LIMIT 5 OFFSET 59000000` on a 60M-row parquet file takes 182ms because DataFusion reads 59M+ rows then discards them in `GlobalLimitExec`. The parquet reader has no knowledge of the offset.

What changes are included in this PR?
Push OFFSET from `GlobalLimitExec` down to the parquet reader for single-file, no-filter queries.

Architecture
Scope
Implementation
- `FileSource::supports_offset()` — trait method, parquet returns `true`, others `false`
- `FileScanConfig.offset` — new field, `with_offset()` returns `Some` only for single-file + no-filter parquet
- `LimitPushdown` optimizer — pushes offset, eliminates `GlobalLimitExec` when `with_offset()` returns `Some`
- `prune_by_offset()` — skips leading fully-matched RGs whose cumulative rows fall within offset
- `PreparedAccessPlan::apply_offset()` — creates `RowSelection` for remaining offset within first surviving RG
- `effective_limit` — decoder reads only `fetch` rows (limit - offset)

Benchmark (60M rows, 1.5GB single parquet file)
Are these changes tested?
- `prune_by_offset` unit tests (boundary, partial, non-fully-matched, zero, exceeds, exact)
- `explain_analyze`, `push_down_filter_parquet`, `limit_pruning` SLTs updated for the new metric
- `limit.slt` tests pass (CSV/JSON offset still handled by `GlobalLimitExec`)
API change:
`FileSource::supports_offset()` added (default `false`). `FileScanConfig` gains an `offset` field. `ExecutionPlan` gains `with_offset()` and `offset()` methods.

Performance: faster OFFSET queries on single-file parquet without filters.
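Roughly the shape of the new extension points listed above, reconstructed from their names; the actual signatures in the PR may differ:

```rust
use std::sync::Arc;

// Hedged reconstruction of the new extension points; the real traits
// carry many more methods and different bounds.
trait FileSource {
    /// New in this PR: whether the format can consume OFFSET itself.
    /// Default is false; only the parquet source overrides it to true.
    fn supports_offset(&self) -> bool {
        false
    }
}

trait ExecutionPlan {
    /// New in this PR: the offset currently pushed into this node, if any.
    fn offset(&self) -> Option<usize> {
        None
    }

    /// New in this PR: return a copy of this node with `offset` pushed
    /// down, or None if this node cannot absorb it.
    fn with_offset(self: Arc<Self>, offset: usize) -> Option<Arc<dyn ExecutionPlan>> {
        let _ = offset;
        None
    }
}
```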
Follow-up