feat: pushdown OFFSET to parquet for RG-level skipping #21828
zhuqi-lucas wants to merge 1 commit into apache:main
Conversation
Push OFFSET from `GlobalLimitExec` down to `DataSourceExec`/`ParquetOpener`. Skips entire row groups and uses `RowSelection` for partial RG skip. Two modes based on filter presence:

- No WHERE: offset fully handled, `GlobalLimitExec` eliminated (RG prune + `RowSelection` + `effective_limit` adjustment)
- With WHERE: offset as a hint, `GlobalLimitExec` kept for correctness (only fully-matched RGs skipped, remaining offset handled by `GlobalLimitExec`)

Implementation:

- `FileSource::supports_offset()` trait method (parquet=true)
- `ExecutionPlan::offset_fully_handled()` for the optimizer decision
- `FileScanConfig`: `offset` field + `with_offset`
- `LimitPushdown`: push offset, eliminate `GlobalLimitExec` when fully handled
- `prune_by_offset`: skip leading fully-matched RGs by cumulative row count
- `RowSelection` for remaining offset (no-filter case only)
- `effective_limit = limit - offset` (no-filter case only)

Benchmark (60M rows, 1.5GB parquet, single partition):

- OFFSET 0: 3ms → 2ms
- OFFSET 1M: 4ms → 1ms (4x)
- OFFSET 30M: 98ms → 1ms (98x)
- OFFSET 59M: 182ms → <1ms (>182x)
Pull request overview
This PR improves performance for LIMIT .. OFFSET .. queries on Parquet by pushing OFFSET down into the Parquet scan so it can skip entire row groups (and partially skip within a row group via RowSelection), avoiding reading and discarding large numbers of rows in GlobalLimitExec.
Changes:
- Introduces offset pushdown plumbing (`with_offset`, `offset`, `offset_fully_handled`) across `ExecutionPlan`, `DataSource`, `FileSource`, and `FileScanConfig`.
- Implements Parquet-side offset handling (row-group pruning + optional `RowSelection`) and adds an `offset_pruned_row_groups` metric.
- Adds/updates SLT coverage and expected EXPLAIN/metrics output to reflect offset pushdown behavior.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Adds SLT “Test N” covering OFFSET pushdown behavior and correctness checks. |
| datafusion/sqllogictest/test_files/push_down_filter_parquet.slt | Updates expected EXPLAIN ANALYZE metrics to include offset_pruned_row_groups. |
| datafusion/sqllogictest/test_files/limit_pruning.slt | Updates expected metrics to include offset_pruned_row_groups. |
| datafusion/sqllogictest/test_files/explain_analyze.slt | Updates expected metrics to include offset_pruned_row_groups. |
| datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt | Updates expected metrics to include offset_pruned_row_groups. |
| datafusion/physical-plan/src/execution_plan.rs | Adds offset-related extension points to ExecutionPlan. |
| datafusion/physical-optimizer/src/limit_pushdown.rs | Extends limit pushdown rule to attempt offset pushdown and (sometimes) remove GlobalLimitExec. |
| datafusion/datasource/src/source.rs | Wires offset methods through DataSource and DataSourceExec. |
| datafusion/datasource/src/file_scan_config/mod.rs | Adds offset to FileScanConfig + builder and implements offset pushdown behavior. |
| datafusion/datasource/src/file.rs | Adds supports_offset() to FileSource. |
| datafusion/datasource-parquet/src/source.rs | Marks Parquet as supporting offset pushdown and propagates offset into morselizer config. |
| datafusion/datasource-parquet/src/row_group_filter.rs | Adds prune_by_offset and unit tests for row-group pruning by offset. |
| datafusion/datasource-parquet/src/opener.rs | Applies offset pruning and row selection during Parquet open; adjusts effective limit when offset is fully handled. |
| datafusion/datasource-parquet/src/metrics.rs | Adds offset_pruned_row_groups metric. |
```diff
 if global_skip > 0 {
-    add_global_limit(
-        plan_with_preserve_order,
-        global_skip,
-        Some(global_fetch),
-    )
+    // Push offset to the plan. If the plan fully handles
+    // offset (e.g. parquet without WHERE), eliminate
+    // GlobalLimitExec. Otherwise keep it for remaining skip.
+    if let Some(plan_with_offset) =
+        plan_with_preserve_order.with_offset(global_skip)
+    {
+        if plan_with_offset.offset_fully_handled() {
+            plan_with_offset
+        } else {
+            add_global_limit(
+                plan_with_offset,
+                global_skip,
+                Some(global_fetch),
+            )
+        }
+    }
```
When `with_offset(global_skip)` returns `Some` but `offset_fully_handled()` is `false`, this code still keeps `GlobalLimitExec` with `skip = global_skip` while also pushing the same offset into the child plan. If the child actually skips any rows (e.g. parquet skipping fully-matched row groups), the offset will be applied twice and results will be incorrect. Consider calling `with_offset` only when the plan can fully handle the offset (and otherwise leaving the child unchanged), or alternatively adjust the `GlobalLimitExec` skip to reflect only the remaining offset not actually handled by the child (which requires a way to compute/report it).
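One way to realize the reviewer's second alternative is for the child to report how much of the pushed-down offset it actually consumed; the function and its name below are hypothetical, not part of the PR:

```rust
// Hypothetical sketch of the reviewer's suggested fix: keep GlobalLimitExec,
// but only with the portion of the offset the child did NOT already skip.
fn remaining_global_skip(global_skip: u64, handled_by_child: u64) -> u64 {
    // A well-behaved child never skips more rows than were pushed down.
    debug_assert!(handled_by_child <= global_skip);
    global_skip - handled_by_child
}

fn main() {
    // Parquet skipped one fully-matched 5-row row group of an OFFSET 10:
    // GlobalLimitExec should skip only the 5 remaining rows.
    assert_eq!(remaining_global_skip(10, 5), 5);
    println!("ok");
}
```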
```rust
// Offset is fully handled when set AND no filter —
// raw row counts are accurate for offset calculation.
// With filters, only fully-matched RGs can be skipped,
// GlobalLimitExec handles the rest.
self.offset.is_some() && self.file_source.filter().is_none()
```
`offset_fully_handled()` currently returns `true` whenever the offset is set and there is no filter, but it does not account for multi-partition scans. `FileStream` applies the limit per output partition, so removing `GlobalLimitExec` when `file_groups.len() > 1` can yield more than `fetch` rows (and make OFFSET/LIMIT semantics depend on partitioning). Consider requiring a single output partition (e.g. `self.file_groups.len() == 1` / `output_partitioning().partition_count() == 1`) before reporting the offset as fully handled.
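A toy model of the per-partition limit behavior the comment describes (all numbers illustrative, and the function is a stand-in, not DataFusion code):

```rust
// Toy model: if each output partition enforces the fetch independently,
// a multi-partition scan can emit up to partitions * fetch rows, so the
// plan must keep global limit enforcement unless there is one partition.
fn rows_emitted(partition_row_counts: &[u64], fetch: u64) -> u64 {
    partition_row_counts.iter().map(|&n| n.min(fetch)).sum()
}

fn main() {
    // Two partitions, LIMIT 5 applied per partition: 10 rows come out.
    assert_eq!(rows_emitted(&[100, 100], 5), 10);
    // A single partition emits exactly 5 rows, as SQL semantics require.
    assert_eq!(rows_emitted(&[200], 5), 5);
    println!("ok");
}
```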
Suggested change:

```diff
-// Offset is fully handled when set AND no filter —
-// raw row counts are accurate for offset calculation.
-// With filters, only fully-matched RGs can be skipped,
-// GlobalLimitExec handles the rest.
-self.offset.is_some() && self.file_source.filter().is_none()
+// Offset is fully handled only when set, no filter is present,
+// and the scan has a single output partition.
+//
+// With filters, only fully-matched RGs can be skipped and
+// GlobalLimitExec handles the rest. Likewise, multi-partition
+// scans must retain global limit enforcement because FileStream
+// applies limits per output partition.
+self.offset.is_some()
+    && self.file_source.filter().is_none()
+    && self.file_groups.len() == 1
```
```rust
// Prune by offset: skip leading fully-matched row groups that fall
// entirely within the offset, so they are never read from disk.
let remaining_offset = if let Some(offset) = prepared.offset {
    if offset > 0 {
        Some(row_groups.prune_by_offset(
            offset,
            prepared.predicate.is_some(),
            rg_metadata,
            &prepared.file_metrics,
        ))
    } else {
        None
    }
} else {
    None
```
Offset pruning is applied even when `prepared.predicate.is_some()` (via `prune_by_offset(..., prepared.predicate.is_some(), ...)`). Since the optimizer keeps `GlobalLimitExec` when predicates exist (offset not fully handled), skipping any fully-matched leading row groups here will change the stream output and can cause the global skip to be applied twice (wrong results when the offset spans one or more fully-matched row groups). Consider performing offset-based row-group pruning/selection only when the offset is fully handled within parquet (currently the `predicate.is_none()` case), or otherwise ensure the remaining global skip is reduced accordingly.
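Numerically, the double-application this comment warns about looks like the following (values and function are illustrative, not DataFusion code):

```rust
// Illustration of the bug scenario: the scan prunes `pruned_rows_in_scan`
// rows for the offset, and GlobalLimitExec then skips the full offset
// again, so the first row returned is offset + pruned rows, not offset.
fn first_returned_row(offset: u64, pruned_rows_in_scan: u64) -> u64 {
    pruned_rows_in_scan + offset
}

fn main() {
    // OFFSET 10 where one fully-matched 5-row row group was pruned in the
    // scan: the query starts at row 15 rather than row 10 — wrong results.
    assert_eq!(first_returned_row(10, 5), 15);
    println!("ok");
}
```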
Suggested change:

```diff
-// Prune by offset: skip leading fully-matched row groups that fall
-// entirely within the offset, so they are never read from disk.
-let remaining_offset = if let Some(offset) = prepared.offset {
-    if offset > 0 {
-        Some(row_groups.prune_by_offset(
-            offset,
-            prepared.predicate.is_some(),
-            rg_metadata,
-            &prepared.file_metrics,
-        ))
-    } else {
-        None
-    }
-} else {
-    None
+// Prune by offset only when parquet can fully handle it. When a
+// predicate exists, the optimizer keeps a global skip/offset above
+// parquet, so pruning fully-matched leading row groups here would
+// change the stream seen by that operator and can apply the offset
+// twice.
+let remaining_offset = match (prepared.offset, prepared.predicate.is_none()) {
+    (Some(offset), true) if offset > 0 => Some(row_groups.prune_by_offset(
+        offset,
+        false,
+        rg_metadata,
+        &prepared.file_metrics,
+    )),
+    _ => None,
```
```
8 80
9 90
10 100
```
The WHERE-clause coverage here uses OFFSET 2, which does not exercise the case where the offset spans at least one fully-matched row group under a predicate (the scenario that can break if the offset is both pushed into parquet and also applied by `GlobalLimitExec`). Consider adding a variant like `WHERE value > 50 LIMIT 3 OFFSET 7` (with 5-row row groups) to ensure correctness when the offset crosses a fully-matched row-group boundary under filtering.
Suggested change:

```
# Test N.9b: OFFSET with WHERE clause crossing a fully-matched row-group boundary
# Ensures correctness when OFFSET may be pushed into parquet and also applied by GlobalLimitExec
query II
SELECT * FROM tn_offset WHERE value > 50 LIMIT 3 OFFSET 7;
----
13 130
14 140
15 150
```
Which issue does this PR close?
Closes #19654
Rationale for this change
`SELECT * FROM table LIMIT 5 OFFSET 59000000` on a 60M-row parquet file takes 182ms because DataFusion reads 59M+ rows and then discards them in `GlobalLimitExec`. The parquet reader has no knowledge of the offset.

What changes are included in this PR?
Push OFFSET from `GlobalLimitExec` down to the parquet reader, enabling RG-level skipping.

Architecture
Two modes based on filter presence:

- No WHERE: offset fully handled in the scan, `GlobalLimitExec` eliminated (RG prune + `RowSelection` + `effective_limit` adjustment)
- With WHERE: offset used only as a hint, `GlobalLimitExec` kept for correctness (only fully-matched RGs skipped, remaining offset handled by `GlobalLimitExec`)
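The no-WHERE mode's limit arithmetic can be sketched as follows. This is a hedged illustration, not the PR's code; it assumes `limit` denotes the end of the window, i.e. offset + fetch, consistent with the `effective_limit = limit - offset` description:

```rust
// Sketch of the no-filter mode: once the scan fully handles the offset,
// GlobalLimitExec is dropped and the decoder only needs to read the
// `fetch` rows that follow the skipped ones.
fn effective_limit(limit: u64, offset: u64) -> u64 {
    limit.saturating_sub(offset)
}

fn main() {
    // LIMIT window ending at row 59_000_005 with OFFSET 59_000_000:
    // only 5 rows are actually decoded after the skip.
    assert_eq!(effective_limit(59_000_005, 59_000_000), 5);
    println!("ok");
}
```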
Implementation details
- `FileSource::supports_offset()` — trait method; parquet returns `true`, CSV/JSON return `false`
- `ExecutionPlan::offset_fully_handled()` — returns `true` when the offset is fully handled (parquet + no WHERE); the optimizer uses this to decide whether to eliminate `GlobalLimitExec`
- `FileScanConfig.offset` — new field, set via `with_offset()` (only accepted when `supports_offset()`)
- `LimitPushdown` optimizer — pushes the offset to `DataSourceExec`; if `offset_fully_handled()`, eliminates `GlobalLimitExec`, otherwise keeps it
- `prune_by_offset()` — skips leading fully-matched RGs whose cumulative rows fall within the offset
- `RowSelection` — for partial RG skip (remaining offset within the first surviving RG, no-filter case only)
- `effective_limit` — decoder reads only `fetch` rows (limit - offset, no-filter case only)

Benchmark (60M rows, 1.5GB parquet, single partition)

| OFFSET | Before | After | Speedup |
|---|---|---|---|
| 0 | 3ms | 2ms | — |
| 1M | 4ms | 1ms | 4x |
| 30M | 98ms | 1ms | 98x |
| 59M | 182ms | <1ms | >182x |
Are these changes tested?
- Unit tests for `prune_by_offset` (boundary, partial, non-fully-matched, zero, exceeds total, exact)
- Updated `explain_analyze`, `push_down_filter_parquet`, `limit_pruning` SLTs for the new `offset_pruned_row_groups` metric
- `encrypted_parquet` (upstream bug)
- Existing `limit.slt` tests pass (CSV/JSON offset remains handled by `GlobalLimitExec`; `supports_offset()` returns false)

Are there any user-facing changes?
Performance only — faster OFFSET queries on parquet. No API changes visible to SQL users.