Skip to content

Commit accf77d

Browse files
zhuqi-lucasclaude
authored andcommitted
feat: sort file groups by statistics during sort pushdown (Sort pushdown phase 2) (apache#21182)
- Closes apache#17348 - Closes apache#19329 When a partition (file group) contains multiple files in wrong order, `validated_output_ordering()` strips the ordering and `EnforceSorting` inserts an unnecessary `SortExec` — even though the files are non-overlapping and internally sorted. This PR fixes it by **sorting files within each group by min/max statistics** during sort pushdown. After sorting, the file order matches the sort key order, the ordering becomes valid, and `SortExec` can be eliminated. This works for both single-partition and multi-partition plans with multi-file groups. ```text Files in wrong order within a partition: After statistics-based sorting: [a_high(400k+), b_mid(200k), c_low(1)] [c_low(1), b_mid(200k), a_high(400k+)] → ordering stripped → ordering valid, non-overlapping → SortExec stays → SortExec eliminated ``` When `PushdownSort` finds a `SortExec` above a file-based `DataSourceExec`: 1. **FileSource returns Exact** (natural ordering satisfies request): - Sort files within each group by statistics, verify non-overlapping - SortExec removed, fetch (LIMIT) pushed to DataSourceExec 2. **FileSource returns Unsupported** (ordering stripped due to wrong file order): - Sort files within each group by statistics - Re-check: if files are now non-overlapping and ordering is valid → upgrade to Exact - SortExec eliminated + fetch pushed down 3. **FileSource returns Inexact** (reverse scan): - SortExec kept, scan optimized with reverse_row_groups | File | Change | |------|--------| | `datasource-parquet/src/source.rs` | ParquetSource returns `Exact` when natural ordering satisfies request | | `datasource/src/file_scan_config.rs` | Statistics-based file sorting, non-overlapping re-check, Unsupported→Exact upgrade | | `physical-optimizer/src/pushdown_sort.rs` | Preserve fetch (LIMIT) when eliminating SortExec, module doc update | | `core/tests/physical_optimizer/pushdown_sort.rs` | Updated prefix match test | | `sqllogictest/test_files/sort_pushdown.slt` | Updated existing tests + 5 new test groups (A-E) | Local release build, `--partitions 1`, 3 non-overlapping files with reversed naming (6M rows): | Query | Description | Main (ms) | PR (ms) | Speedup | |-------|-------------|-----------|---------|---------| | Q1 | `ORDER BY ASC` (full scan) | 259 | 122 | **2.1x faster** | | Q2 | `ORDER BY ASC LIMIT 100` | 80 | 3 | **27x faster** | | Q3 | `SELECT * ORDER BY ASC` | 700 | 313 | **2.2x faster** | | Q4 | `SELECT * LIMIT 100` | 342 | 7 | **49x faster** | LIMIT queries benefit most because sort elimination + limit pushdown means only the first ~100 rows are read. - 13 new unit tests covering all sort pushdown paths - 5 new SLT integration test groups (sort elimination, overlapping files, LIMIT, multi-partition, inferred ordering) - All existing tests pass with no regressions - [x] `cargo test -p datafusion-datasource` — all tests pass - [x] `cargo test -p datafusion-datasource-parquet` — all tests pass - [x] `cargo test -p datafusion-physical-optimizer` — all tests pass - [x] `cargo test -p datafusion --test core_integration` — all tests pass - [x] SLT sort/order/topk/window/union/joins tests pass (no regressions) - [x] `cargo clippy` — 0 warnings 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 708eb43 commit accf77d

6 files changed

Lines changed: 1460 additions & 113 deletions

File tree

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,9 @@ fn test_prefix_match_through_transparent_nodes() {
258258
}
259259

260260
#[test]
261-
fn test_no_prefix_match_wrong_direction() {
262-
// Test that prefix matching does NOT work if the direction is wrong
261+
fn test_exact_prefix_match_same_direction() {
262+
// Test that when the requested sort [a DESC] matches a prefix of the source's
263+
// natural ordering [a DESC, b ASC], the Sort is eliminated (Exact pushdown).
263264
let schema = schema();
264265

265266
// Source has [a DESC, b ASC] ordering
@@ -268,7 +269,7 @@ fn test_no_prefix_match_wrong_direction() {
268269
let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap();
269270
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);
270271

271-
// Request [a DESC] - same direction as source, NOT a reverse prefix
272+
// Request [a DESC] - same direction as source prefix, Sort should be eliminated
272273
let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap();
273274
let plan = sort_exec(same_direction, source);
274275

@@ -281,8 +282,7 @@ fn test_no_prefix_match_wrong_direction() {
281282
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
282283
output:
283284
Ok:
284-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
285-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
285+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
286286
"
287287
);
288288
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -741,19 +741,17 @@ impl FileSource for ParquetSource {
741741
///
742742
/// With both pieces of information, ParquetSource can decide what optimizations to apply.
743743
///
744-
/// # Phase 1 Behavior (Current)
745-
/// Returns `Inexact` when reversing the row group scan order would help satisfy the
746-
/// requested ordering. We still need a Sort operator at a higher level because:
747-
/// - We only reverse row group read order, not rows within row groups
748-
/// - This provides approximate ordering that benefits limit pushdown
749-
///
750-
/// # Phase 2 (Future)
751-
/// Could return `Exact` when we can guarantee perfect ordering through techniques like:
752-
/// - File reordering based on statistics
753-
/// - Detecting already-sorted data
754-
/// This would allow removing the Sort operator entirely.
744+
/// # Behavior
745+
/// - Returns `Exact` when the file's natural ordering (from Parquet metadata) already
746+
/// satisfies the requested ordering. This allows the Sort operator to be eliminated
747+
/// if the files within each group are also non-overlapping (checked by FileScanConfig).
748+
/// - Returns `Inexact` when reversing the row group scan order would help satisfy the
749+
/// requested ordering. We still need a Sort operator at a higher level because:
750+
/// - We only reverse row group read order, not rows within row groups
751+
/// - This provides approximate ordering that benefits limit pushdown
755752
///
756753
/// # Returns
754+
/// - `Exact`: The file's natural ordering satisfies the request (within-file ordering guaranteed)
757755
/// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order
758756
/// - `Unsupported`: Cannot optimize for this ordering
759757
fn try_reverse_output(
@@ -765,6 +763,16 @@ impl FileSource for ParquetSource {
765763
return Ok(SortOrderPushdownResult::Unsupported);
766764
}
767765

766+
// Check if the natural (non-reversed) ordering already satisfies the request.
767+
// Parquet metadata guarantees within-file ordering, so if the ordering matches
768+
// we can return Exact. FileScanConfig will verify that files within each group
769+
// are non-overlapping before declaring the entire scan as Exact.
770+
if eq_properties.ordering_satisfy(order.iter().cloned())? {
771+
return Ok(SortOrderPushdownResult::Exact {
772+
inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
773+
});
774+
}
775+
768776
// Build new equivalence properties with the reversed ordering.
769777
// This allows us to check if the reversed ordering satisfies the request
770778
// by leveraging:
@@ -809,11 +817,6 @@ impl FileSource for ParquetSource {
809817
Ok(SortOrderPushdownResult::Inexact {
810818
inner: Arc::new(new_source) as Arc<dyn FileSource>,
811819
})
812-
813-
// TODO Phase 2: Add support for other optimizations:
814-
// - File reordering based on min/max statistics
815-
// - Detection of exact ordering (return Exact to remove Sort operator)
816-
// - Partial sort pushdown for prefix matches
817820
}
818821
}
819822

0 commit comments

Comments
 (0)