Skip to content

Commit 0f74dbf

Browse files
authored
FilterExec should remap indices of parent dynamic filters (apache#20286)
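## Which issue does this PR close? - Closes apache#20285. ## Rationale for this change Parent dynamic filters reference FilterExec's output schema, so they must be remapped from the output schema to the input schema before being passed to the child; otherwise the column indices can be wrong (for example, when FilterExec carries a projection). ## What changes are included in this PR? - Use `ChildFilterDescription::from_child` to remap parent filters in the non-pre pushdown phase as well, instead of passing them through unchanged via `PushedDownPredicate::supported` - Add a unit test that catches this issue ## Are these changes tested? Yes ## Are there any user-facing changes? No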
1 parent 49776a6 commit 0f74dbf

1 file changed

Lines changed: 57 additions & 11 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::common::can_project;
3232
use crate::execution_plan::CardinalityEffect;
3333
use crate::filter_pushdown::{
3434
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
35-
FilterPushdownPropagation, PushedDown, PushedDownPredicate,
35+
FilterPushdownPropagation, PushedDown,
3636
};
3737
use crate::metrics::{MetricBuilder, MetricType};
3838
use crate::projection::{
@@ -590,16 +590,9 @@ impl ExecutionPlan for FilterExec {
590590
_config: &ConfigOptions,
591591
) -> Result<FilterDescription> {
592592
if !matches!(phase, FilterPushdownPhase::Pre) {
593-
// For non-pre phase, filters pass through unchanged
594-
let filter_supports = parent_filters
595-
.into_iter()
596-
.map(PushedDownPredicate::supported)
597-
.collect();
598-
599-
return Ok(FilterDescription::new().with_child(ChildFilterDescription {
600-
parent_filters: filter_supports,
601-
self_filters: vec![],
602-
}));
593+
let child =
594+
ChildFilterDescription::from_child(&parent_filters, self.input())?;
595+
return Ok(FilterDescription::new().with_child(child));
603596
}
604597

605598
let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
@@ -2011,4 +2004,57 @@ mod tests {
20112004

20122005
Ok(())
20132006
}
2007+
2008+
#[test]
2009+
fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
2010+
// Test that FilterExec with a projection must remap parent dynamic
2011+
// filter column indices from its output schema to the input schema
2012+
// before passing them to the child.
2013+
let input_schema = Arc::new(Schema::new(vec![
2014+
Field::new("a", DataType::Int32, false),
2015+
Field::new("b", DataType::Utf8, false),
2016+
Field::new("c", DataType::Float64, false),
2017+
]));
2018+
let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
2019+
2020+
// FilterExec: a > 0, projection=[c@2]
2021+
let predicate = Arc::new(BinaryExpr::new(
2022+
Arc::new(Column::new("a", 0)),
2023+
Operator::Gt,
2024+
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
2025+
));
2026+
let filter = FilterExecBuilder::new(predicate, input)
2027+
.apply_projection(Some(vec![2]))?
2028+
.build()?;
2029+
2030+
// Output schema should be [c:Float64]
2031+
let output_schema = filter.schema();
2032+
assert_eq!(output_schema.fields().len(), 1);
2033+
assert_eq!(output_schema.field(0).name(), "c");
2034+
2035+
// Simulate a parent dynamic filter referencing output column c@0
2036+
let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
2037+
2038+
let config = ConfigOptions::new();
2039+
let desc = filter.gather_filters_for_pushdown(
2040+
FilterPushdownPhase::Post,
2041+
vec![parent_filter],
2042+
&config,
2043+
)?;
2044+
2045+
// The filter pushed to the child must reference c@2 (input schema),
2046+
// not c@0 (output schema).
2047+
let parent_filters = desc.parent_filters();
2048+
assert_eq!(parent_filters.len(), 1); // one child
2049+
assert_eq!(parent_filters[0].len(), 1); // one filter
2050+
let remapped = &parent_filters[0][0].predicate;
2051+
let display = format!("{remapped}");
2052+
assert_eq!(
2053+
display, "c@2",
2054+
"Post-phase parent filter column index must be remapped \
2055+
from output schema (c@0) to input schema (c@2)"
2056+
);
2057+
2058+
Ok(())
2059+
}
20142060
}

0 commit comments

Comments
 (0)