Skip to content

Commit 9286563

Browse files
adriangbclaude
authored andcommitted
fix: validate inter-file ordering in eq_properties() (apache#20329)
## Summary Discovered this bug while working on apache#19724. TLDR: just because the files themselves are sorted doesn't mean the partition streams are sorted. - **`eq_properties()` in `FileScanConfig` blindly trusted `output_ordering`** (set from Parquet `sorting_columns` metadata) without verifying that files within a group are in the correct inter-file order - `EnforceSorting` then removed `SortExec` based on this unvalidated ordering, producing **wrong results** when filesystem order didn't match data order - Added `validated_output_ordering()` that filters orderings using `MinMaxStatistics::new_from_files()` + `is_sorted()` to verify inter-file sort order before reporting them to the optimizer ## Changes ### `datafusion/datasource/src/file_scan_config.rs` - Added `validated_output_ordering()` method on `FileScanConfig` that validates each output ordering against actual file group statistics - Changed `eq_properties()` to call `self.validated_output_ordering()` instead of `self.output_ordering.clone()` ### `datafusion/sqllogictest/test_files/sort_pushdown.slt` Added 8 new regression tests (Tests 4-11): | Test | Scenario | Key assertion | |------|----------|---------------| | **4** | Reversed filesystem order (inferred ordering) | SortExec retained — wrong inter-file order detected | | **5** | Overlapping file ranges (inferred ordering) | SortExec retained — overlapping ranges detected | | **6** | `WITH ORDER` + reversed filesystem order | SortExec retained despite explicit ordering | | **7** | Correctly ordered multi-file group (positive) | SortExec eliminated — validation passes | | **8** | DESC ordering with wrong inter-file DESC order | SortExec retained for DESC direction | | **9** | Multi-column sort key (overlapping vs non-overlapping) | Conservative rejection with overlapping stats; passes with clean boundaries | | **10** | Correctly ordered + `WITH ORDER` (positive) | SortExec eliminated — both ordering and stats agree | | **11** | Multiple partitions (one file per group) | `SortPreservingMergeExec` merges; no per-partition sort needed | ## Test plan - [x] `cargo test --test sqllogictests -- sort_pushdown` — all new + existing tests pass - [x] `cargo test -p datafusion-datasource` — 97 unit tests + 6 doc tests pass - [x] Existing Test 1 (single-file sort pushdown with `WITH ORDER`) still eliminates SortExec (no regression) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 016e2ae commit 9286563

5 files changed

Lines changed: 660 additions & 47 deletions

File tree

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ mod test {
864864
let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
865865
assert_snapshot!(
866866
plan_string,
867-
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
867+
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
868868
);
869869

870870
let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;

datafusion/datasource/src/file_scan_config.rs

Lines changed: 118 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ impl DataSource for FileScanConfig {
665665
let schema = self.file_source.table_schema().table_schema();
666666
let mut eq_properties = EquivalenceProperties::new_with_orderings(
667667
Arc::clone(schema),
668-
self.output_ordering.clone(),
668+
self.validated_output_ordering(),
669669
)
670670
.with_constraints(self.constraints.clone());
671671

@@ -853,6 +853,40 @@ impl DataSource for FileScanConfig {
853853
}
854854

855855
impl FileScanConfig {
856+
/// Returns only the output orderings that are validated against actual
857+
/// file group statistics.
858+
///
859+
/// For example, individual files may be ordered by `col1 ASC`,
860+
/// but if we have files with these min/max statistics in a single partition / file group:
861+
///
862+
/// - file1: min(col1) = 10, max(col1) = 20
863+
/// - file2: min(col1) = 5, max(col1) = 15
864+
///
865+
/// Because reading file1 followed by file2 would produce out-of-order output (there is overlap
866+
/// in the ranges), we cannot retain `col1 ASC` as a valid output ordering.
867+
///
868+
/// Similarly this would not be a valid order (non-overlapping ranges but not ordered):
869+
///
870+
/// - file1: min(col1) = 20, max(col1) = 30
871+
/// - file2: min(col1) = 10, max(col1) = 15
872+
///
873+
/// On the other hand if we had:
874+
///
875+
/// - file1: min(col1) = 5, max(col1) = 15
876+
/// - file2: min(col1) = 16, max(col1) = 25
877+
///
878+
/// Then we know that reading file1 followed by file2 will produce ordered output,
879+
/// so `col1 ASC` would be retained.
880+
///
881+
/// Note that we are checking for ordering *within* *each* file group / partition,
882+
/// files in different partitions are read independently and do not affect each other's ordering.
883+
/// Merging of the multiple partition streams into a single ordered stream is handled
884+
/// upstream e.g. by `SortPreservingMergeExec`.
885+
fn validated_output_ordering(&self) -> Vec<LexOrdering> {
886+
let schema = self.file_source.table_schema().table_schema();
887+
validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
888+
}
889+
856890
/// Get the file schema (schema of the files without partition columns)
857891
pub fn file_schema(&self) -> &SchemaRef {
858892
self.file_source.table_schema().file_schema()
@@ -1202,6 +1236,51 @@ fn ordered_column_indices_from_projection(
12021236
.collect::<Option<Vec<usize>>>()
12031237
}
12041238

1239+
/// Check whether a given ordering is valid for all file groups by verifying
1240+
/// that files within each group are sorted according to their min/max statistics.
1241+
///
1242+
/// For single-file (or empty) groups, the ordering is trivially valid.
1243+
/// For multi-file groups, we check that the min/max statistics for the sort
1244+
/// columns are in order and non-overlapping (or touching at boundaries).
1245+
///
1246+
/// `projection` maps projected column indices back to table-schema indices
1247+
/// when validating after projection; pass `None` when validating at
1248+
/// table-schema level.
1249+
fn is_ordering_valid_for_file_groups(
1250+
file_groups: &[FileGroup],
1251+
ordering: &LexOrdering,
1252+
schema: &SchemaRef,
1253+
projection: Option<&[usize]>,
1254+
) -> bool {
1255+
file_groups.iter().all(|group| {
1256+
if group.len() <= 1 {
1257+
return true; // single-file groups are trivially sorted
1258+
}
1259+
match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
1260+
{
1261+
Ok(stats) => stats.is_sorted(),
1262+
Err(_) => false, // can't prove sorted → reject
1263+
}
1264+
})
1265+
}
1266+
1267+
/// Filters orderings to retain only those valid for all file groups,
1268+
/// verified via min/max statistics.
1269+
fn validate_orderings(
1270+
orderings: &[LexOrdering],
1271+
schema: &SchemaRef,
1272+
file_groups: &[FileGroup],
1273+
projection: Option<&[usize]>,
1274+
) -> Vec<LexOrdering> {
1275+
orderings
1276+
.iter()
1277+
.filter(|ordering| {
1278+
is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
1279+
})
1280+
.cloned()
1281+
.collect()
1282+
}
1283+
12051284
/// The various listing tables does not attempt to read all files
12061285
/// concurrently, instead they will read files in sequence within a
12071286
/// partition. This is an important property as it allows plans to
@@ -1268,52 +1347,47 @@ fn get_projected_output_ordering(
12681347
let projected_orderings =
12691348
project_orderings(&base_config.output_ordering, projected_schema);
12701349

1271-
let mut all_orderings = vec![];
1272-
for new_ordering in projected_orderings {
1273-
// Check if any file groups are not sorted
1274-
if base_config.file_groups.iter().any(|group| {
1275-
if group.len() <= 1 {
1276-
// File groups with <= 1 files are always sorted
1277-
return false;
1278-
}
1279-
1280-
let Some(indices) = base_config
1281-
.file_source
1282-
.projection()
1283-
.as_ref()
1284-
.map(|p| ordered_column_indices_from_projection(p))
1285-
else {
1286-
// Can't determine if ordered without a simple projection
1287-
return true;
1288-
};
1289-
1290-
let statistics = match MinMaxStatistics::new_from_files(
1291-
&new_ordering,
1350+
let indices = base_config
1351+
.file_source
1352+
.projection()
1353+
.as_ref()
1354+
.map(|p| ordered_column_indices_from_projection(p));
1355+
1356+
match indices {
1357+
Some(Some(indices)) => {
1358+
// Simple column projection — validate with statistics
1359+
validate_orderings(
1360+
&projected_orderings,
12921361
projected_schema,
1293-
indices.as_deref(),
1294-
group.iter(),
1295-
) {
1296-
Ok(statistics) => statistics,
1297-
Err(e) => {
1298-
log::trace!("Error fetching statistics for file group: {e}");
1299-
// we can't prove that it's ordered, so we have to reject it
1300-
return true;
1301-
}
1302-
};
1303-
1304-
!statistics.is_sorted()
1305-
}) {
1306-
debug!(
1307-
"Skipping specified output ordering {:?}. \
1308-
Some file groups couldn't be determined to be sorted: {:?}",
1309-
base_config.output_ordering[0], base_config.file_groups
1310-
);
1311-
continue;
1362+
&base_config.file_groups,
1363+
Some(indices.as_slice()),
1364+
)
1365+
}
1366+
None => {
1367+
// No projection — validate with statistics (no remapping needed)
1368+
validate_orderings(
1369+
&projected_orderings,
1370+
projected_schema,
1371+
&base_config.file_groups,
1372+
None,
1373+
)
1374+
}
1375+
Some(None) => {
1376+
// Complex projection (expressions, not simple columns) — can't
1377+
// determine column indices for statistics. Still valid if all
1378+
// file groups have at most one file.
1379+
if base_config.file_groups.iter().all(|g| g.len() <= 1) {
1380+
projected_orderings
1381+
} else {
1382+
debug!(
1383+
"Skipping specified output orderings. \
1384+
Some file groups couldn't be determined to be sorted: {:?}",
1385+
base_config.file_groups
1386+
);
1387+
vec![]
1388+
}
13121389
}
1313-
1314-
all_orderings.push(new_ordering);
13151390
}
1316-
all_orderings
13171391
}
13181392

13191393
/// Convert type to a type suitable for use as a `ListingTable`

datafusion/datasource/src/statistics.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,12 @@ impl MinMaxStatistics {
266266
}
267267

268268
/// Check if the min/max statistics are in order and non-overlapping
269+
/// (or touching at boundaries)
269270
pub fn is_sorted(&self) -> bool {
270271
self.max_by_sort_order
271272
.iter()
272273
.zip(self.min_by_sort_order.iter().skip(1))
273-
.all(|(max, next_min)| max < next_min)
274+
.all(|(max, next_min)| max <= next_min)
274275
}
275276
}
276277

datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,4 +274,4 @@ logical_plan
274274
02)--TableScan: test_table projection=[constant_col]
275275
physical_plan
276276
01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST]
277-
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], file_type=parquet
277+
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], output_ordering=[constant_col@0 ASC NULLS LAST], file_type=parquet

0 commit comments

Comments
 (0)