Skip to content

Commit 016e2ae

Browse files
authored
[branch-52] FilterExec should remap indices of parent dynamic filters (apache#20286) (apache#20347)
- part of apache#20287 - related to apache#20285 - backports 0f74dbf / apache#20286 from @jackkleeman to branch-52
1 parent 8aaa274 commit 016e2ae

1 file changed

Lines changed: 56 additions & 10 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::common::can_project;
3131
use crate::execution_plan::CardinalityEffect;
3232
use crate::filter_pushdown::{
3333
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
34-
FilterPushdownPropagation, PushedDown, PushedDownPredicate,
34+
FilterPushdownPropagation, PushedDown,
3535
};
3636
use crate::metrics::{MetricBuilder, MetricType};
3737
use crate::projection::{
@@ -494,15 +494,9 @@ impl ExecutionPlan for FilterExec {
494494
) -> Result<FilterDescription> {
495495
if !matches!(phase, FilterPushdownPhase::Pre) {
496496
// For non-pre phase, filters pass through unchanged
497-
let filter_supports = parent_filters
498-
.into_iter()
499-
.map(PushedDownPredicate::supported)
500-
.collect();
501-
502-
return Ok(FilterDescription::new().with_child(ChildFilterDescription {
503-
parent_filters: filter_supports,
504-
self_filters: vec![],
505-
}));
497+
let child =
498+
ChildFilterDescription::from_child(&parent_filters, self.input())?;
499+
return Ok(FilterDescription::new().with_child(child));
506500
}
507501

508502
let child = ChildFilterDescription::from_child(&parent_filters, self.input())?
@@ -1585,4 +1579,56 @@ mod tests {
15851579

15861580
Ok(())
15871581
}
1582+
1583+
#[test]
1584+
fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
1585+
// Test that FilterExec with a projection must remap parent dynamic
1586+
// filter column indices from its output schema to the input schema
1587+
// before passing them to the child.
1588+
let input_schema = Arc::new(Schema::new(vec![
1589+
Field::new("a", DataType::Int32, false),
1590+
Field::new("b", DataType::Utf8, false),
1591+
Field::new("c", DataType::Float64, false),
1592+
]));
1593+
let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));
1594+
1595+
// FilterExec: a > 0, projection=[c@2]
1596+
let predicate = Arc::new(BinaryExpr::new(
1597+
Arc::new(Column::new("a", 0)),
1598+
Operator::Gt,
1599+
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1600+
));
1601+
let filter =
1602+
FilterExec::try_new(predicate, input)?.with_projection(Some(vec![2]))?;
1603+
1604+
// Output schema should be [c:Float64]
1605+
let output_schema = filter.schema();
1606+
assert_eq!(output_schema.fields().len(), 1);
1607+
assert_eq!(output_schema.field(0).name(), "c");
1608+
1609+
// Simulate a parent dynamic filter referencing output column c@0
1610+
let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
1611+
1612+
let config = ConfigOptions::new();
1613+
let desc = filter.gather_filters_for_pushdown(
1614+
FilterPushdownPhase::Post,
1615+
vec![parent_filter],
1616+
&config,
1617+
)?;
1618+
1619+
// The filter pushed to the child must reference c@2 (input schema),
1620+
// not c@0 (output schema).
1621+
let parent_filters = desc.parent_filters();
1622+
assert_eq!(parent_filters.len(), 1); // one child
1623+
assert_eq!(parent_filters[0].len(), 1); // one filter
1624+
let remapped = &parent_filters[0][0].predicate;
1625+
let display = format!("{remapped}");
1626+
assert_eq!(
1627+
display, "c@2",
1628+
"Post-phase parent filter column index must be remapped \
1629+
from output schema (c@0) to input schema (c@2)"
1630+
);
1631+
1632+
Ok(())
1633+
}
15881634
}

0 commit comments

Comments
 (0)