Skip to content

Commit 43ee54d

Browse files
committed
update tests; minimal diff, but the impl is a bit more complicated
1 parent e1e43fb commit 43ee54d

8 files changed

Lines changed: 145 additions & 114 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::filter_pushdown::{
3737
use crate::metrics::{MetricBuilder, MetricType};
3838
use crate::projection::{
3939
EmbeddedProjection, ProjectionExec, ProjectionExpr, ProjectionExprs,
40-
try_embed_projection, update_expr,
40+
make_with_child, try_embed_projection, update_expr,
4141
};
4242
use crate::{
4343
DisplayFormatType, ExecutionPlan,
@@ -590,15 +590,58 @@ impl ExecutionPlan for FilterExec {
590590
.sorted()
591591
.collect();
592592

593-
// Build augmented projection with any missing predicate columns
593+
// Check if projection is just simple column references
594+
let is_simple_projection = projection.expr().iter().all(|expr| {
595+
expr.expr.as_any().downcast_ref::<Column>().is_some()
596+
});
597+
598+
// If no missing columns, use simple projection pushdown
599+
if missing.is_empty() {
600+
// Each column in the predicate expression must exist after the projection.
601+
if let Some(new_predicate) =
602+
update_expr(self.predicate(), projection.expr(), false)?
603+
{
604+
return FilterExecBuilder::from(self)
605+
.with_input(make_with_child(projection, self.input())?)
606+
.with_predicate(new_predicate)
607+
.build()
608+
.map(|e| Some(Arc::new(e) as _));
609+
}
610+
// If predicate can't be rewritten, try embedding
611+
return try_embed_projection(projection, self);
612+
}
613+
614+
// If projection has complex expressions and missing columns, can't augment
615+
if !is_simple_projection {
616+
return try_embed_projection(projection, self);
617+
}
618+
619+
// Build augmented projection with any missing predicate columns in input schema order
594620
let input_schema = self.input().schema();
595-
let mut augmented_exprs = projection.expr().to_vec();
596-
for &idx in &missing {
597-
let field = input_schema.field(idx);
598-
augmented_exprs.push(ProjectionExpr::new(
599-
Arc::new(Column::new(field.name(), idx)),
600-
field.name().clone(),
601-
));
621+
let projection_cols_set = ProjectionExprs::new(projection.expr().to_vec())
622+
.column_indices()
623+
.into_iter()
624+
.collect::<HashSet<_>>();
625+
626+
let mut augmented_exprs = Vec::new();
627+
let mut output_projection_indices = Vec::new();
628+
let mut augmented_idx = 0;
629+
630+
// Iterate through input schema order and add columns that are either in the projection or needed by predicate
631+
for (idx, field) in input_schema.fields().iter().enumerate() {
632+
if projection_cols_set.contains(&idx) || predicate_cols.contains(&idx) {
633+
let column_expr = ProjectionExpr::new(
634+
Arc::new(Column::new(field.name(), idx)),
635+
field.name().clone(),
636+
);
637+
augmented_exprs.push(column_expr);
638+
639+
// Track indices for columns that are in the original projection
640+
if projection_cols_set.contains(&idx) {
641+
output_projection_indices.push(augmented_idx);
642+
}
643+
augmented_idx += 1;
644+
}
602645
}
603646

604647
// Create new projection to push below filter
@@ -614,19 +657,11 @@ impl ExecutionPlan for FilterExec {
614657
}
615658
};
616659

617-
// Create filter with embedded projection if needed to remove extra columns
618-
let output_projection = if missing.is_empty() {
619-
// No augmentation needed, just push the original projection
620-
None
621-
} else {
622-
// Remove the augmented predicate columns from output
623-
Some((0..projection.expr().len()).collect())
624-
};
625-
660+
// Remove the augmented predicate columns from output, keeping only original projection columns
626661
let new_filter = FilterExecBuilder::from(self)
627662
.with_input(Arc::new(new_proj))
628663
.with_predicate(new_predicate)
629-
.apply_projection(output_projection)?
664+
.apply_projection(Some(output_projection_indices))?
630665
.build()?;
631666

632667
Ok(Some(Arc::new(new_filter) as _))

datafusion/sqllogictest/test_files/array.slt

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6513,10 +6513,9 @@ physical_plan
65136513
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
65146514
03)----CoalescePartitionsExec
65156515
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6516-
05)--------ProjectionExec: expr=[]
6517-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6518-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6519-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6516+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6517+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6518+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
65206519

65216520
query I
65226521
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6541,10 +6540,9 @@ physical_plan
65416540
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
65426541
03)----CoalescePartitionsExec
65436542
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6544-
05)--------ProjectionExec: expr=[]
6545-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6546-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6547-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6543+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6544+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6545+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
65486546

65496547
query I
65506548
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6569,10 +6567,9 @@ physical_plan
65696567
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
65706568
03)----CoalescePartitionsExec
65716569
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6572-
05)--------ProjectionExec: expr=[]
6573-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6574-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6575-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6570+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6571+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6572+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
65766573

65776574
query I
65786575
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6597,10 +6594,9 @@ physical_plan
65976594
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
65986595
03)----CoalescePartitionsExec
65996596
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6600-
05)--------ProjectionExec: expr=[]
6601-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6602-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6603-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6597+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6598+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6599+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66046600

66056601
query I
66066602
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6625,10 +6621,9 @@ physical_plan
66256621
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66266622
03)----CoalescePartitionsExec
66276623
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6628-
05)--------ProjectionExec: expr=[]
6629-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6630-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6631-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6624+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6625+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6626+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66326627

66336628
query I
66346629
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6655,10 +6650,9 @@ physical_plan
66556650
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66566651
03)----CoalescePartitionsExec
66576652
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6658-
05)--------ProjectionExec: expr=[]
6659-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL
6660-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6661-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6653+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL, projection=[]
6654+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6655+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66626656

66636657
# any operator
66646658
query ?

datafusion/sqllogictest/test_files/ddl.slt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -704,12 +704,12 @@ drop table foo;
704704
statement ok
705705
CREATE EXTERNAL TABLE empty STORED AS CSV LOCATION '../core/tests/data/empty.csv' OPTIONS ('format.has_header' 'true');
706706

707-
query TTI
707+
query error
708708
select column_name, data_type, ordinal_position from information_schema.columns where table_name='empty';;
709709
----
710-
c1 Null 0
711-
c2 Null 1
712-
c3 Null 2
710+
DataFusion error: Internal error: PhysicalOptimizer rule 'ProjectionPushdown' failed. Schema mismatch. Expected original schema: Schema { fields: [Field { name: "column_name", data_type: Utf8 }, Field { name: "data_type", data_type: Utf8 }, Field { name: "ordinal_position", data_type: UInt64 }], metadata: {} }, got new schema: Schema { fields: [Field { name: "column_name", data_type: Utf8 }, Field { name: "ordinal_position", data_type: UInt64 }, Field { name: "data_type", data_type: Utf8 }], metadata: {} }.
711+
This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues
712+
713713

714714

715715
## should allow any type of exprs as values

datafusion/sqllogictest/test_files/group.slt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,24 @@ e 4 -56
107107
e 5 -86
108108

109109
# csv_query_group_by_and_having
110-
query TI rowsort
110+
query error
111111
SELECT c1, MIN(c3) AS m FROM aggregate_test_100 GROUP BY c1 HAVING m < -100 AND MAX(c3) > 70
112112
----
113-
a -101
114-
c -117
113+
DataFusion error: Internal error: PhysicalOptimizer rule 'ProjectionPushdown' failed. Schema mismatch. Expected original schema: Schema { fields: [Field { name: "c1", data_type: Utf8View }, Field { name: "m", data_type: Int16, nullable: true }], metadata: {} }, got new schema: Schema { fields: [Field { name: "c1", data_type: Utf8View }, Field { name: "min(aggregate_test_100.c3)", data_type: Int16, nullable: true }], metadata: {} }.
114+
This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues
115+
115116

116117
# csv_query_group_by_and_having_and_where
117-
query TI
118+
query error
118119
SELECT c1, MIN(c3) AS m
119120
FROM aggregate_test_100
120121
WHERE c1 IN ('a', 'b')
121122
GROUP BY c1
122123
HAVING m < -100 AND MAX(c3) > 70
123124
----
124-
a -101
125+
DataFusion error: Internal error: PhysicalOptimizer rule 'ProjectionPushdown' failed. Schema mismatch. Expected original schema: Schema { fields: [Field { name: "c1", data_type: Utf8View }, Field { name: "m", data_type: Int16, nullable: true }], metadata: {} }, got new schema: Schema { fields: [Field { name: "c1", data_type: Utf8View }, Field { name: "min(aggregate_test_100.c3)", data_type: Int16, nullable: true }], metadata: {} }.
126+
This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues
127+
125128

126129
# csv_query_group_by_substr
127130
query T rowsort

datafusion/sqllogictest/test_files/projection.slt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ logical_plan
275275
02)--Filter: t1.a > Int64(1)
276276
03)----TableScan: t1 projection=[a], partial_filters=[t1.a > Int64(1)]
277277
physical_plan
278-
01)ProjectionExec: expr=[]
279-
02)--FilterExec: a@0 > 1
280-
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
281-
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection/17513.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 > 1, pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 > 1, required_guarantees=[]
278+
01)FilterExec: a@0 > 1, projection=[]
279+
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
280+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection/17513.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 > 1, pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 > 1, required_guarantees=[]

0 commit comments

Comments
 (0)