Skip to content

Commit d41bfb2

Browse files
alamb and haohuaijin
authored and committed
[branch-52] fix: interval analysis error when two FilterExecs are stacked and the inner filter proves zero selectivity (apache#20743) (apache#20880)
- Part of apache#20855
- Closes apache#20742 on branch-52

This PR:
- Backports apache#20743 from @haohuaijin to the branch-52 line

Co-authored-by: Huaijin <haohuaijin@gmail.com>
1 parent 679d4df commit d41bfb2

2 files changed

Lines changed: 93 additions & 22 deletions

File tree

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -387,17 +387,17 @@ mod test {
387387
column_statistics: vec![
388388
ColumnStatistics {
389389
null_count: Precision::Exact(0),
390-
max_value: Precision::Exact(ScalarValue::Null),
391-
min_value: Precision::Exact(ScalarValue::Null),
392-
sum_value: Precision::Exact(ScalarValue::Null),
390+
max_value: Precision::Exact(ScalarValue::Int32(None)),
391+
min_value: Precision::Exact(ScalarValue::Int32(None)),
392+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
393393
distinct_count: Precision::Exact(0),
394394
byte_size: Precision::Exact(16),
395395
},
396396
ColumnStatistics {
397397
null_count: Precision::Exact(0),
398-
max_value: Precision::Exact(ScalarValue::Null),
399-
min_value: Precision::Exact(ScalarValue::Null),
400-
sum_value: Precision::Exact(ScalarValue::Null),
398+
max_value: Precision::Exact(ScalarValue::Date32(None)),
399+
min_value: Precision::Exact(ScalarValue::Date32(None)),
400+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
401401
distinct_count: Precision::Exact(0),
402402
byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
403403
},
@@ -416,17 +416,17 @@ mod test {
416416
column_statistics: vec![
417417
ColumnStatistics {
418418
null_count: Precision::Exact(0),
419-
max_value: Precision::Exact(ScalarValue::Null),
420-
min_value: Precision::Exact(ScalarValue::Null),
421-
sum_value: Precision::Exact(ScalarValue::Null),
419+
max_value: Precision::Exact(ScalarValue::Int32(None)),
420+
min_value: Precision::Exact(ScalarValue::Int32(None)),
421+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
422422
distinct_count: Precision::Exact(0),
423423
byte_size: Precision::Exact(8),
424424
},
425425
ColumnStatistics {
426426
null_count: Precision::Exact(0),
427-
max_value: Precision::Exact(ScalarValue::Null),
428-
min_value: Precision::Exact(ScalarValue::Null),
429-
sum_value: Precision::Exact(ScalarValue::Null),
427+
max_value: Precision::Exact(ScalarValue::Date32(None)),
428+
min_value: Precision::Exact(ScalarValue::Date32(None)),
429+
sum_value: Precision::Exact(ScalarValue::Date32(None)),
430430
distinct_count: Precision::Exact(0),
431431
byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
432432
},

datafusion/physical-plan/src/filter.rs

Lines changed: 81 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -232,6 +232,7 @@ impl FilterExec {
232232
let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
233233

234234
let column_statistics = collect_new_statistics(
235+
schema,
235236
&input_stats.column_statistics,
236237
analysis_ctx.boundaries,
237238
);
@@ -637,6 +638,7 @@ impl EmbeddedProjection for FilterExec {
637638
/// is adjusted by using the next/previous value for its data type to convert
638639
/// it into a closed bound.
639640
fn collect_new_statistics(
641+
schema: &SchemaRef,
640642
input_column_stats: &[ColumnStatistics],
641643
analysis_boundaries: Vec<ExprBoundaries>,
642644
) -> Vec<ColumnStatistics> {
@@ -653,12 +655,17 @@ fn collect_new_statistics(
653655
},
654656
)| {
655657
let Some(interval) = interval else {
656-
// If the interval is `None`, we can say that there are no rows:
658+
// If the interval is `None`, we can say that there are no rows.
659+
// Use a typed null to preserve the column's data type, so that
660+
// downstream interval analysis can still intersect intervals
661+
// of the same type.
662+
let typed_null = ScalarValue::try_from(schema.field(idx).data_type())
663+
.unwrap_or(ScalarValue::Null);
657664
return ColumnStatistics {
658665
null_count: Precision::Exact(0),
659-
max_value: Precision::Exact(ScalarValue::Null),
660-
min_value: Precision::Exact(ScalarValue::Null),
661-
sum_value: Precision::Exact(ScalarValue::Null),
666+
max_value: Precision::Exact(typed_null.clone()),
667+
min_value: Precision::Exact(typed_null.clone()),
668+
sum_value: Precision::Exact(typed_null),
662669
distinct_count: Precision::Exact(0),
663670
byte_size: input_column_stats[idx].byte_size,
664671
};
@@ -1351,17 +1358,17 @@ mod tests {
13511358
statistics.column_statistics,
13521359
vec![
13531360
ColumnStatistics {
1354-
min_value: Precision::Exact(ScalarValue::Null),
1355-
max_value: Precision::Exact(ScalarValue::Null),
1356-
sum_value: Precision::Exact(ScalarValue::Null),
1361+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1362+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1363+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
13571364
distinct_count: Precision::Exact(0),
13581365
null_count: Precision::Exact(0),
13591366
byte_size: Precision::Absent,
13601367
},
13611368
ColumnStatistics {
1362-
min_value: Precision::Exact(ScalarValue::Null),
1363-
max_value: Precision::Exact(ScalarValue::Null),
1364-
sum_value: Precision::Exact(ScalarValue::Null),
1369+
min_value: Precision::Exact(ScalarValue::Int32(None)),
1370+
max_value: Precision::Exact(ScalarValue::Int32(None)),
1371+
sum_value: Precision::Exact(ScalarValue::Int32(None)),
13651372
distinct_count: Precision::Exact(0),
13661373
null_count: Precision::Exact(0),
13671374
byte_size: Precision::Absent,
@@ -1372,6 +1379,70 @@ mod tests {
13721379
Ok(())
13731380
}
13741381

1382+
/// Regression test: stacking two FilterExecs where the inner filter
1383+
/// proves zero selectivity should not panic with a type mismatch
1384+
/// during interval intersection.
1385+
///
1386+
/// Previously, when a filter proved no rows could match, the column
1387+
/// statistics used untyped `ScalarValue::Null` (data type `Null`).
1388+
/// If an outer FilterExec then tried to analyze its own predicate
1389+
/// against those statistics, `Interval::intersect` would fail with:
1390+
/// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32"
1391+
#[tokio::test]
1392+
async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> {
1393+
// Inner table: a: [1, 100], b: [1, 3]
1394+
let schema = Schema::new(vec![
1395+
Field::new("a", DataType::Int32, false),
1396+
Field::new("b", DataType::Int32, false),
1397+
]);
1398+
let input = Arc::new(StatisticsExec::new(
1399+
Statistics {
1400+
num_rows: Precision::Inexact(1000),
1401+
total_byte_size: Precision::Inexact(4000),
1402+
column_statistics: vec![
1403+
ColumnStatistics {
1404+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1405+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1406+
..Default::default()
1407+
},
1408+
ColumnStatistics {
1409+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1410+
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
1411+
..Default::default()
1412+
},
1413+
],
1414+
},
1415+
schema,
1416+
));
1417+
1418+
// Inner filter: a > 200 (impossible given a max=100 → zero selectivity)
1419+
let inner_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1420+
Arc::new(Column::new("a", 0)),
1421+
Operator::Gt,
1422+
Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
1423+
));
1424+
let inner_filter: Arc<dyn ExecutionPlan> =
1425+
Arc::new(FilterExec::try_new(inner_predicate, input)?);
1426+
1427+
// Outer filter: a = 50
1428+
// Before the fix, this would panic because the inner filter's
1429+
// zero-selectivity statistics produced Null-typed intervals for
1430+
// column `a`, which couldn't intersect with the Int32 literal.
1431+
let outer_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1432+
Arc::new(Column::new("a", 0)),
1433+
Operator::Eq,
1434+
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
1435+
));
1436+
let outer_filter: Arc<dyn ExecutionPlan> =
1437+
Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?);
1438+
1439+
// Should succeed without error
1440+
let statistics = outer_filter.partition_statistics(None)?;
1441+
assert_eq!(statistics.num_rows, Precision::Inexact(0));
1442+
1443+
Ok(())
1444+
}
1445+
13751446
#[tokio::test]
13761447
async fn test_filter_statistics_more_inputs() -> Result<()> {
13771448
let schema = Schema::new(vec![

0 commit comments

Comments
 (0)