Skip to content

Commit 8aaa274

Browse files
haohuaijin and adriangb
authored
[branch-52] fix: filter pushdown when merge filter (apache#20110) (apache#20289)
## Which issue does this PR close? - related to apache#20287 ## Rationale for this change see issue apache#20109 ## What changes are included in this PR? 1. Remap parent filter expressions: When a FilterExec has a projection, remap unsupported parent filter expressions from output schema coordinates to input schema coordinates using `reassign_expr_columns()` before combining them with the current filter's predicates. 2. Preserve projection: When creating the merged FilterExec, preserve the original projection instead of discarding it. ## Are these changes tested? yes, added some test cases ## Are there any user-facing changes? --------- ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. 
--> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
1 parent 9f3ddce commit 8aaa274

2 files changed

Lines changed: 108 additions & 7 deletions

File tree

  • datafusion
    • core/tests/physical_optimizer/filter_pushdown
    • physical-plan/src

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3600,3 +3600,88 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
36003600
);
36013601
}
36023602
}
3603+
3604+
/// Regression test for https://github.com/apache/datafusion/issues/20109
3605+
#[tokio::test]
3606+
async fn test_filter_with_projection_pushdown() {
3607+
use arrow::array::{Int64Array, RecordBatch, StringArray};
3608+
use datafusion_physical_plan::collect;
3609+
use datafusion_physical_plan::filter::FilterExec;
3610+
3611+
// Create schema: [time, event, size]
3612+
let schema = Arc::new(Schema::new(vec![
3613+
Field::new("time", DataType::Int64, false),
3614+
Field::new("event", DataType::Utf8, false),
3615+
Field::new("size", DataType::Int64, false),
3616+
]));
3617+
3618+
// Create sample data
3619+
let timestamps = vec![100i64, 200, 300, 400, 500];
3620+
let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"];
3621+
let sizes = vec![10i64, 20, 30, 40, 50];
3622+
3623+
let batch = RecordBatch::try_new(
3624+
schema.clone(),
3625+
vec![
3626+
Arc::new(Int64Array::from(timestamps)),
3627+
Arc::new(StringArray::from(events)),
3628+
Arc::new(Int64Array::from(sizes)),
3629+
],
3630+
)
3631+
.unwrap();
3632+
3633+
// Create data source
3634+
let memory_exec = datafusion_datasource::memory::MemorySourceConfig::try_new_exec(
3635+
&[vec![batch]],
3636+
schema.clone(),
3637+
None,
3638+
)
3639+
.unwrap();
3640+
3641+
// First FilterExec: time < 350 with projection=[event@1, size@2]
3642+
let time_col = col("time", &memory_exec.schema()).unwrap();
3643+
let time_filter = Arc::new(BinaryExpr::new(
3644+
time_col,
3645+
Operator::Lt,
3646+
Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
3647+
));
3648+
let filter1 = Arc::new(
3649+
FilterExec::try_new(time_filter, memory_exec)
3650+
.unwrap()
3651+
.with_projection(Some(vec![1, 2]))
3652+
.unwrap(),
3653+
);
3654+
3655+
// Second FilterExec: event = 'Ingestion' with projection=[size@1]
3656+
let event_col = col("event", &filter1.schema()).unwrap();
3657+
let event_filter = Arc::new(BinaryExpr::new(
3658+
event_col,
3659+
Operator::Eq,
3660+
Arc::new(Literal::new(ScalarValue::Utf8(Some(
3661+
"Ingestion".to_string(),
3662+
)))),
3663+
));
3664+
let filter2 = Arc::new(
3665+
FilterExec::try_new(event_filter, filter1)
3666+
.unwrap()
3667+
.with_projection(Some(vec![1]))
3668+
.unwrap(),
3669+
);
3670+
3671+
// Apply filter pushdown optimization
3672+
let config = ConfigOptions::default();
3673+
let optimized_plan = FilterPushdown::new()
3674+
.optimize(Arc::clone(&filter2) as Arc<dyn ExecutionPlan>, &config)
3675+
.unwrap();
3676+
3677+
// Execute the optimized plan - this should not error
3678+
let ctx = SessionContext::new();
3679+
let result = collect(optimized_plan, ctx.task_ctx()).await.unwrap();
3680+
3681+
// Verify results: should return rows where time < 350 AND event = 'Ingestion'
3682+
// That's rows with time=100,200 (both have event='Ingestion'), so sizes 10,20
3683+
let expected = [
3684+
"+------+", "| size |", "+------+", "| 10 |", "| 20 |", "+------+",
3685+
];
3686+
assert_batches_eq!(expected, &result);
3687+
}

datafusion/physical-plan/src/filter.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use datafusion_expr::Operator;
5757
use datafusion_physical_expr::equivalence::ProjectionMapping;
5858
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
5959
use datafusion_physical_expr::intervals::utils::check_support;
60-
use datafusion_physical_expr::utils::collect_columns;
60+
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
6161
use datafusion_physical_expr::{
6262
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
6363
conjunction, split_conjunction,
@@ -526,10 +526,26 @@ impl ExecutionPlan for FilterExec {
526526
return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
527527
}
528528
// We absorb any parent filters that were not handled by our children
529-
let unsupported_parent_filters =
530-
child_pushdown_result.parent_filters.iter().filter_map(|f| {
531-
matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
532-
});
529+
let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
530+
child_pushdown_result
531+
.parent_filters
532+
.iter()
533+
.filter_map(|f| {
534+
matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
535+
})
536+
.collect();
537+
538+
// If this FilterExec has a projection, the unsupported parent filters
539+
// are in the output schema (after projection) coordinates. We need to
540+
// remap them to the input schema coordinates before combining with self filters.
541+
if self.projection.is_some() {
542+
let input_schema = self.input().schema();
543+
unsupported_parent_filters = unsupported_parent_filters
544+
.into_iter()
545+
.map(|expr| reassign_expr_columns(expr, &input_schema))
546+
.collect::<Result<Vec<_>>>()?;
547+
}
548+
533549
let unsupported_self_filters = child_pushdown_result
534550
.self_filters
535551
.first()
@@ -577,7 +593,7 @@ impl ExecutionPlan for FilterExec {
577593
// The new predicate is the same as our current predicate
578594
None
579595
} else {
580-
// Create a new FilterExec with the new predicate
596+
// Create a new FilterExec with the new predicate, preserving the projection
581597
let new = FilterExec {
582598
predicate: Arc::clone(&new_predicate),
583599
input: Arc::clone(&filter_input),
@@ -589,7 +605,7 @@ impl ExecutionPlan for FilterExec {
589605
self.default_selectivity,
590606
self.projection.as_ref(),
591607
)?,
592-
projection: None,
608+
projection: self.projection.clone(),
593609
batch_size: self.batch_size,
594610
fetch: self.fetch,
595611
};

0 commit comments

Comments
 (0)