Skip to content

Commit 085a1da

Browse files
jackkleemande-bgunter
authored andcommitted
Support pushing down empty projections into joins (apache#20191)
## Which issue does this PR close? - Closes apache#20190. ## Rationale for this change We should push down empty projections into HashJoinExec ## What changes are included in this PR? 1. try_embed_projection should embed empty projections 2. build_batch_empty_build_side should support empty schemas ## Are these changes tested? Yes ## Are there any user-facing changes? No
1 parent 8c58d44 commit 085a1da

6 files changed

Lines changed: 154 additions & 55 deletions

File tree

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,3 +1723,47 @@ fn test_cooperative_exec_after_projection() -> Result<()> {
17231723

17241724
Ok(())
17251725
}
1726+
1727+
#[test]
1728+
fn test_hash_join_empty_projection_embeds() -> Result<()> {
1729+
let left_csv = create_simple_csv_exec();
1730+
let right_csv = create_simple_csv_exec();
1731+
1732+
let join = Arc::new(HashJoinExec::try_new(
1733+
left_csv,
1734+
right_csv,
1735+
vec![(Arc::new(Column::new("a", 0)), Arc::new(Column::new("a", 0)))],
1736+
None,
1737+
&JoinType::Right,
1738+
None,
1739+
PartitionMode::CollectLeft,
1740+
NullEquality::NullEqualsNothing,
1741+
false,
1742+
)?);
1743+
1744+
// Empty projection: no columns needed from the join output
1745+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1746+
vec![] as Vec<ProjectionExpr>,
1747+
join,
1748+
)?);
1749+
1750+
let after_optimize =
1751+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1752+
let after_optimize_string = displayable(after_optimize.as_ref())
1753+
.indent(true)
1754+
.to_string();
1755+
let actual = after_optimize_string.trim();
1756+
1757+
// The empty projection should be embedded into the HashJoinExec,
1758+
// resulting in projection=[] on the join and no ProjectionExec wrapper.
1759+
assert_snapshot!(
1760+
actual,
1761+
@r"
1762+
HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, a@0)], projection=[]
1763+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1764+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1765+
"
1766+
);
1767+
1768+
Ok(())
1769+
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,17 @@ pub(crate) fn apply_join_filter_to_indices(
981981
))
982982
}
983983

984+
/// Creates a [RecordBatch] with zero columns but the given row count.
985+
/// Used when a join has an empty projection (e.g. `SELECT count(1) ...`).
986+
fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result<RecordBatch> {
987+
let options = RecordBatchOptions::new().with_row_count(Some(row_count));
988+
Ok(RecordBatch::try_new_with_options(
989+
Arc::new(schema.clone()),
990+
vec![],
991+
&options,
992+
)?)
993+
}
994+
984995
/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
985996
/// The resulting batch has [Schema] `schema`.
986997
#[expect(clippy::too_many_arguments)]
@@ -1070,6 +1081,9 @@ pub(crate) fn build_batch_empty_build_side(
10701081
// the remaining joins will return data for the right columns and null for the left ones
10711082
JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
10721083
let num_rows = probe_batch.num_rows();
1084+
if schema.fields().is_empty() {
1085+
return new_empty_schema_batch(schema, num_rows);
1086+
}
10731087
let mut columns: Vec<Arc<dyn Array>> =
10741088
Vec::with_capacity(schema.fields().len());
10751089

@@ -2902,4 +2916,35 @@ mod tests {
29022916

29032917
Ok(())
29042918
}
2919+
2920+
#[test]
2921+
fn test_build_batch_empty_build_side_empty_schema() -> Result<()> {
2922+
// When the output schema has no fields (empty projection pushed into
2923+
// the join), build_batch_empty_build_side should return a RecordBatch
2924+
// with the correct row count but no columns.
2925+
let empty_schema = Schema::empty();
2926+
2927+
let build_batch = RecordBatch::try_new(
2928+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
2929+
vec![Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]))],
2930+
)?;
2931+
2932+
let probe_batch = RecordBatch::try_new(
2933+
Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)])),
2934+
vec![Arc::new(arrow::array::Int32Array::from(vec![4, 5, 6, 7]))],
2935+
)?;
2936+
2937+
let result = build_batch_empty_build_side(
2938+
&empty_schema,
2939+
&build_batch,
2940+
&probe_batch,
2941+
&[], // no column indices with empty projection
2942+
JoinType::Right,
2943+
)?;
2944+
2945+
assert_eq!(result.num_rows(), 4);
2946+
assert_eq!(result.num_columns(), 0);
2947+
2948+
Ok(())
2949+
}
29052950
}

datafusion/physical-plan/src/projection.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,15 @@ impl RecordBatchStream for ProjectionStream {
553553
}
554554
}
555555

556+
/// Trait for execution plans that can embed a projection, avoiding a separate
557+
/// [`ProjectionExec`] wrapper.
558+
///
559+
/// # Empty projections
560+
///
561+
/// `Some(vec![])` is a valid projection that produces zero output columns while
562+
/// preserving the correct row count. Implementors must ensure that runtime batch
563+
/// construction still returns batches with the right number of rows even when no
564+
/// columns are selected (e.g. for `SELECT count(1) … JOIN …`).
556565
pub trait EmbeddedProjection: ExecutionPlan + Sized {
557566
fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
558567
}
@@ -563,6 +572,15 @@ pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
563572
projection: &ProjectionExec,
564573
execution_plan: &Exec,
565574
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
575+
// If the projection has no expressions at all (e.g., ProjectionExec: expr=[]),
576+
// embed an empty projection into the execution plan so it outputs zero columns.
577+
// This avoids allocating throwaway null arrays for build-side columns
578+
// when no output columns are actually needed (e.g., count(1) over a right join).
579+
if projection.expr().is_empty() {
580+
let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
581+
return Ok(Some(new_execution_plan));
582+
}
583+
566584
// Collect all column indices from the given projection expressions.
567585
let projection_index = collect_column_indices(projection.expr());
568586

datafusion/sqllogictest/test_files/array.slt

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6606,10 +6606,9 @@ physical_plan
66066606
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66076607
03)----CoalescePartitionsExec
66086608
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6609-
05)--------ProjectionExec: expr=[]
6610-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6611-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6612-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6609+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6610+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6611+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66136612

66146613
query I
66156614
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6634,10 +6633,9 @@ physical_plan
66346633
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66356634
03)----CoalescePartitionsExec
66366635
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6637-
05)--------ProjectionExec: expr=[]
6638-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6639-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6640-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6636+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6637+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6638+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66416639

66426640
query I
66436641
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6662,10 +6660,9 @@ physical_plan
66626660
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66636661
03)----CoalescePartitionsExec
66646662
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6665-
05)--------ProjectionExec: expr=[]
6666-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6667-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6668-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6663+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6664+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6665+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66696666

66706667
query I
66716668
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6690,10 +6687,9 @@ physical_plan
66906687
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
66916688
03)----CoalescePartitionsExec
66926689
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6693-
05)--------ProjectionExec: expr=[]
6694-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6695-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6696-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6690+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6691+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6692+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
66976693

66986694
query I
66996695
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6718,10 +6714,9 @@ physical_plan
67186714
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
67196715
03)----CoalescePartitionsExec
67206716
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6721-
05)--------ProjectionExec: expr=[]
6722-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
6723-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6724-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6717+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), projection=[]
6718+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6719+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
67256720

67266721
query I
67276722
with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i))
@@ -6748,10 +6743,9 @@ physical_plan
67486743
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
67496744
03)----CoalescePartitionsExec
67506745
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
6751-
05)--------ProjectionExec: expr=[]
6752-
06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL
6753-
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6754-
08)--------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
6746+
05)--------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IS NOT NULL OR NULL, projection=[]
6747+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
6748+
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
67556749

67566750
# any operator
67576751
query ?

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2085,7 +2085,7 @@ SELECT join_t1.t1_id, join_t2.t2_id
20852085
FROM (select t1_id from join_t1 where join_t1.t1_id > 22) as join_t1
20862086
RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2
20872087
ON join_t1.t1_id < join_t2.t2_id
2088-
ORDER BY 1, 2
2088+
ORDER BY 1, 2
20892089
----
20902090
33 44
20912091
33 55
@@ -3954,7 +3954,7 @@ query TT
39543954
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
39553955
----
39563956
logical_plan
3957-
01)Cross Join:
3957+
01)Cross Join:
39583958
02)--SubqueryAlias: t1
39593959
03)----TableScan: join_t1 projection=[t1_id, t1_name]
39603960
04)--SubqueryAlias: series
@@ -4367,10 +4367,9 @@ logical_plan
43674367
physical_plan
43684368
01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
43694369
02)--AggregateExec: mode=Single, gby=[], aggr=[count(Int64(1))]
4370-
03)----ProjectionExec: expr=[]
4371-
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, binary_col@0)]
4372-
05)--------DataSourceExec: partitions=1, partition_sizes=[1]
4373-
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
4370+
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, binary_col@0)], projection=[]
4371+
04)------DataSourceExec: partitions=1, partition_sizes=[1]
4372+
05)------DataSourceExec: partitions=1, partition_sizes=[1]
43744373

43754374
# Test hash join sort push down
43764375
# Issue: https://github.com/apache/datafusion/issues/13559
@@ -4532,7 +4531,7 @@ query TT
45324531
explain SELECT * FROM person a NATURAL JOIN lineitem b;
45334532
----
45344533
logical_plan
4535-
01)Cross Join:
4534+
01)Cross Join:
45364535
02)--SubqueryAlias: a
45374536
03)----TableScan: person projection=[id, age, state]
45384537
04)--SubqueryAlias: b
@@ -4578,7 +4577,7 @@ query TT
45784577
explain SELECT j1_string, j2_string FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2;
45794578
----
45804579
logical_plan
4581-
01)Cross Join:
4580+
01)Cross Join:
45824581
02)--TableScan: j1 projection=[j1_string]
45834582
03)--SubqueryAlias: j2
45844583
04)----Projection: j2.j2_string
@@ -4591,7 +4590,7 @@ query TT
45914590
explain SELECT * FROM j1 JOIN (j2 JOIN j3 ON(j2_id = j3_id - 2)) ON(j1_id = j2_id), LATERAL (SELECT * FROM j3 WHERE j3_string = j2_string) as j4
45924591
----
45934592
logical_plan
4594-
01)Cross Join:
4593+
01)Cross Join:
45954594
02)--Inner Join: CAST(j2.j2_id AS Int64) = CAST(j3.j3_id AS Int64) - Int64(2)
45964595
03)----Inner Join: j1.j1_id = j2.j2_id
45974596
04)------TableScan: j1 projection=[j1_string, j1_id]
@@ -4607,11 +4606,11 @@ query TT
46074606
explain SELECT * FROM j1, LATERAL (SELECT * FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id = j2_id) as j2) as j2;
46084607
----
46094608
logical_plan
4610-
01)Cross Join:
4609+
01)Cross Join:
46114610
02)--TableScan: j1 projection=[j1_string, j1_id]
46124611
03)--SubqueryAlias: j2
46134612
04)----Subquery:
4614-
05)------Cross Join:
4613+
05)------Cross Join:
46154614
06)--------TableScan: j1 projection=[j1_string, j1_id]
46164615
07)--------SubqueryAlias: j2
46174616
08)----------Subquery:
@@ -4623,7 +4622,7 @@ query TT
46234622
explain SELECT j1_string, j2_string FROM j1 LEFT JOIN LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2 ON(true);
46244623
----
46254624
logical_plan
4626-
01)Left Join:
4625+
01)Left Join:
46274626
02)--TableScan: j1 projection=[j1_string]
46284627
03)--SubqueryAlias: j2
46294628
04)----Projection: j2.j2_string
@@ -4636,9 +4635,9 @@ query TT
46364635
explain SELECT * FROM j1, (j2 LEFT JOIN LATERAL (SELECT * FROM j3 WHERE j1_id + j2_id = j3_id) AS j3 ON(true));
46374636
----
46384637
logical_plan
4639-
01)Cross Join:
4638+
01)Cross Join:
46404639
02)--TableScan: j1 projection=[j1_string, j1_id]
4641-
03)--Left Join:
4640+
03)--Left Join:
46424641
04)----TableScan: j2 projection=[j2_string, j2_id]
46434642
05)----SubqueryAlias: j3
46444643
06)------Subquery:
@@ -4650,7 +4649,7 @@ query TT
46504649
explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2;
46514650
----
46524651
logical_plan
4653-
01)Cross Join:
4652+
01)Cross Join:
46544653
02)--TableScan: j1 projection=[j1_string, j1_id]
46554654
03)--SubqueryAlias: j2
46564655
04)----Projection: Int64(1)
@@ -4992,7 +4991,7 @@ FULL JOIN t2 ON k1 = k2
49924991

49934992
# LEFT MARK JOIN
49944993
query TT
4995-
EXPLAIN
4994+
EXPLAIN
49964995
SELECT *
49974996
FROM t2
49984997
WHERE k2 > 0
@@ -5067,14 +5066,14 @@ CREATE OR REPLACE TABLE t1(b INT, c INT, d INT);
50675066

50685067
statement ok
50695068
INSERT INTO t1 VALUES
5070-
(10, 5, 3),
5071-
( 1, 7, 8),
5072-
( 2, 9, 7),
5073-
( 3, 8,10),
5074-
( 5, 6, 6),
5075-
( 0, 4, 9),
5076-
( 4, 8, 7),
5077-
(100,6, 5);
5069+
(10, 5, 3),
5070+
( 1, 7, 8),
5071+
( 2, 9, 7),
5072+
( 3, 8,10),
5073+
( 5, 6, 6),
5074+
( 0, 4, 9),
5075+
( 4, 8, 7),
5076+
(100,6, 5);
50785077

50795078
query I rowsort
50805079
SELECT c

datafusion/sqllogictest/test_files/projection.slt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,12 @@ set datafusion.explain.logical_plan_only = false
167167

168168
# project cast dictionary
169169
query T
170-
SELECT
171-
CASE
170+
SELECT
171+
CASE
172172
WHEN cpu_load_short.host IS NULL THEN ''
173173
ELSE cpu_load_short.host
174174
END AS host
175-
FROM
175+
FROM
176176
cpu_load_short;
177177
----
178178
host1
@@ -275,7 +275,6 @@ logical_plan
275275
02)--Filter: t1.a > Int64(1)
276276
03)----TableScan: t1 projection=[a], partial_filters=[t1.a > Int64(1)]
277277
physical_plan
278-
01)ProjectionExec: expr=[]
279-
02)--FilterExec: a@0 > 1
280-
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
281-
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection/17513.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 > 1, pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 > 1, required_guarantees=[]
278+
01)FilterExec: a@0 > 1, projection=[]
279+
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
280+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection/17513.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 > 1, pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 > 1, required_guarantees=[]

0 commit comments

Comments
 (0)