Skip to content

Commit 76751f1

Browse files
alambhaohuaijin
andauthored
[branch-53] fix: interval analysis error when have two filterexec that inner filter proves zero selectivity (#20743) (#20882)
- Part of #19692 - Closes #20742 on branch-53 This PR: - Backports #20743 from @haohuaijin to the branch-53 line Co-authored-by: Huaijin <haohuaijin@gmail.com>
1 parent 519866c commit 76751f1

2 files changed

Lines changed: 93 additions & 22 deletions

File tree

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -386,17 +386,17 @@ mod test {
386386
column_statistics: vec![
387387
ColumnStatistics {
388388
null_count: Precision::Exact(0),
389-
max_value: Precision::Exact(ScalarValue::Null),
390-
min_value: Precision::Exact(ScalarValue::Null),
391-
sum_value: Precision::Exact(ScalarValue::Null),
389+
max_value: Precision::Exact(ScalarValue::Int32(None)),
390+
min_value: Precision::Exact(ScalarValue::Int32(None)),
391+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
392392
distinct_count: Precision::Exact(0),
393393
byte_size: Precision::Exact(16),
394394
},
395395
ColumnStatistics {
396396
null_count: Precision::Exact(0),
397-
max_value: Precision::Exact(ScalarValue::Null),
398-
min_value: Precision::Exact(ScalarValue::Null),
399-
sum_value: Precision::Exact(ScalarValue::Null),
397+
max_value: Precision::Exact(ScalarValue::Date32(None)),
398+
min_value: Precision::Exact(ScalarValue::Date32(None)),
399+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
400400
distinct_count: Precision::Exact(0),
401401
byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
402402
},
@@ -415,17 +415,17 @@ mod test {
415415
column_statistics: vec![
416416
ColumnStatistics {
417417
null_count: Precision::Exact(0),
418-
max_value: Precision::Exact(ScalarValue::Null),
419-
min_value: Precision::Exact(ScalarValue::Null),
420-
sum_value: Precision::Exact(ScalarValue::Null),
418+
max_value: Precision::Exact(ScalarValue::Int32(None)),
419+
min_value: Precision::Exact(ScalarValue::Int32(None)),
420+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
421421
distinct_count: Precision::Exact(0),
422422
byte_size: Precision::Exact(8),
423423
},
424424
ColumnStatistics {
425425
null_count: Precision::Exact(0),
426-
max_value: Precision::Exact(ScalarValue::Null),
427-
min_value: Precision::Exact(ScalarValue::Null),
428-
sum_value: Precision::Exact(ScalarValue::Null),
426+
max_value: Precision::Exact(ScalarValue::Date32(None)),
427+
min_value: Precision::Exact(ScalarValue::Date32(None)),
428+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
429429
distinct_count: Precision::Exact(0),
430430
byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
431431
},

datafusion/physical-plan/src/filter.rs

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ impl FilterExec {
337337
let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
338338

339339
let column_statistics = collect_new_statistics(
340+
schema,
340341
&input_stats.column_statistics,
341342
analysis_ctx.boundaries,
342343
);
@@ -757,6 +758,7 @@ impl EmbeddedProjection for FilterExec {
757758
/// is adjusted by using the next/previous value for its data type to convert
758759
/// it into a closed bound.
759760
fn collect_new_statistics(
761+
schema: &SchemaRef,
760762
input_column_stats: &[ColumnStatistics],
761763
analysis_boundaries: Vec<ExprBoundaries>,
762764
) -> Vec<ColumnStatistics> {
@@ -773,12 +775,17 @@ fn collect_new_statistics(
773775
},
774776
)| {
775777
let Some(interval) = interval else {
776-
// If the interval is `None`, we can say that there are no rows:
778+
// If the interval is `None`, we can say that there are no rows.
779+
// Use a typed null to preserve the column's data type, so that
780+
// downstream interval analysis can still intersect intervals
781+
// of the same type.
782+
let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
783+
.unwrap_or(ScalarValue::Null);
777784
return ColumnStatistics {
778785
null_count: Precision::Exact(0),
779-
max_value: Precision::Exact(ScalarValue::Null),
780-
min_value: Precision::Exact(ScalarValue::Null),
781-
sum_value: Precision::Exact(ScalarValue::Null),
786+
max_value: Precision::Exact(typed_null.clone()),
787+
min_value: Precision::Exact(typed_null.clone()),
788+
sum_value: Precision::Exact(typed_null),
782789
distinct_count: Precision::Exact(0),
783790
byte_size: input_column_stats[idx].byte_size,
784791
};
@@ -1471,17 +1478,17 @@ mod tests {
14711478
statistics.column_statistics,
14721479
vec![
14731480
ColumnStatistics {
1474-
min_value: Precision::Exact(ScalarValue::Null),
1475-
max_value: Precision::Exact(ScalarValue::Null),
1476-
sum_value: Precision::Exact(ScalarValue::Null),
1481+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1482+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1483+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
14771484
distinct_count: Precision::Exact(0),
14781485
null_count: Precision::Exact(0),
14791486
byte_size: Precision::Absent,
14801487
},
14811488
ColumnStatistics {
1482-
min_value: Precision::Exact(ScalarValue::Null),
1483-
max_value: Precision::Exact(ScalarValue::Null),
1484-
sum_value: Precision::Exact(ScalarValue::Null),
1489+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1490+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1491+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
14851492
distinct_count: Precision::Exact(0),
14861493
null_count: Precision::Exact(0),
14871494
byte_size: Precision::Absent,
@@ -1492,6 +1499,70 @@ mod tests {
14921499
Ok(())
14931500
}
14941501

1502+
/// Regression test: stacking two FilterExecs where the inner filter
1503+
/// proves zero selectivity should not panic with a type mismatch
1504+
/// during interval intersection.
1505+
///
1506+
/// Previously, when a filter proved no rows could match, the column
1507+
/// statistics used untyped `ScalarValue::Null` (data type `Null`).
1508+
/// If an outer FilterExec then tried to analyze its own predicate
1509+
/// against those statistics, `Interval::intersect` would fail with:
1510+
/// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
1511+
#[tokio::test]
1512+
async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1513+
// Inner table: a: [1, 100], b: [1, 3]
1514+
let schema = Schema::new(vec![
1515+
Field::new("a", DataType::Int32, false),
1516+
Field::new("b", DataType::Int32, false),
1517+
]);
1518+
let input = Arc::new(StatisticsExec::new(
1519+
Statistics {
1520+
num_rows: Precision::Inexact(1000),
1521+
total_byte_size: Precision::Inexact(4000),
1522+
column_statistics: vec![
1523+
ColumnStatistics {
1524+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1525+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1526+
..Default::default()
1527+
},
1528+
ColumnStatistics {
1529+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1530+
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1531+
..Default::default()
1532+
},
1533+
],
1534+
},
1535+
schema,
1536+
));
1537+
1538+
// Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
1539+
let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1540+
Arc::new(Column::new("a", 0)),
1541+
Operator::Gt,
1542+
Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1543+
));
1544+
let inner_filter: Arc<dyn ExecutionPlan> =
1545+
Arc::new(FilterExec::try_new(inner_predicate, input)?);
1546+
1547+
// Outer filter: a = 50
1548+
// Before the fix, this would panic because the inner filter's
1549+
// zero-selectivity statistics produced Null-typed intervals for
1550+
// column `a`, which couldn't intersect with the Int32 literal.
1551+
let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1552+
Arc::new(Column::new("a", 0)),
1553+
Operator::Eq,
1554+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1555+
));
1556+
let outer_filter: Arc<dyn ExecutionPlan> =
1557+
Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1558+
1559+
// Should succeed without error
1560+
let statistics = outer_filter.partition_statistics(None)?;
1561+
assert_eq!(statistics.num_rows, Precision::Inexact(0));
1562+
1563+
Ok(())
1564+
}
1565+
14951566
#[tokio::test]
14961567
async fn test_filter_statistics_more_inputs() -> Result<()> {
14971568
let schema = Schema::new(vec![

0 commit comments

Comments
 (0)