Skip to content

Commit 261c92b

Browse files
committed
Support pushing down empty projections into joins
1 parent 1799c31 commit 261c92b

3 files changed

Lines changed: 92 additions & 0 deletions

File tree

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,3 +1723,47 @@ fn test_cooperative_exec_after_projection() -> Result<()> {
17231723

17241724
Ok(())
17251725
}
1726+
1727+
/// Checks that `ProjectionPushdown` folds an empty `ProjectionExec` (one with
/// no expressions) into the `HashJoinExec` beneath it: the optimized plan is
/// expected to print `projection=[]` on the join and contain no
/// `ProjectionExec` node above it.
#[test]
1728+
fn test_hash_join_empty_projection_embeds() -> Result<()> {
1729+
let left_csv = create_simple_csv_exec(); // both join inputs come from the same helper
1730+
let right_csv = create_simple_csv_exec();
1731+
1732+
let join = Arc::new(HashJoinExec::try_new(
1733+
left_csv,
1734+
right_csv,
1735+
vec![(Arc::new(Column::new("a", 0)), Arc::new(Column::new("a", 0)))], // join key: column "a" at index 0 on both inputs
1736+
None,
1737+
&JoinType::Right,
1738+
None,
1739+
PartitionMode::CollectLeft,
1740+
NullEquality::NullEqualsNothing,
1741+
false,
1742+
)?);
1743+
1744+
// Empty projection: no columns needed from the join output
1745+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1746+
vec![] as Vec<ProjectionExpr>,
1747+
join,
1748+
)?);
1749+
1750+
let after_optimize =
1751+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; // run the rule with default config options
1752+
let after_optimize_string = displayable(after_optimize.as_ref())
1753+
.indent(true)
1754+
.to_string();
1755+
let actual = after_optimize_string.trim(); // strip surrounding whitespace before comparing with the inline snapshot
1756+
1757+
// The empty projection should be embedded into the HashJoinExec,
1758+
// resulting in projection=[] on the join and no ProjectionExec wrapper.
1759+
assert_snapshot!(
1760+
actual,
1761+
@r"
1762+
HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, a@0)], projection=[]
1763+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1764+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1765+
"
1766+
);
1767+
1768+
Ok(())
1769+
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,6 +1057,14 @@ pub(crate) fn build_batch_empty_build_side(
10571057
// the remaining joins will return data for the right columns and null for the left ones
10581058
JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
10591059
let num_rows = probe_batch.num_rows();
1060+
if schema.fields().is_empty() {
1061+
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
1062+
return Ok(RecordBatch::try_new_with_options(
1063+
Arc::new(schema.clone()),
1064+
vec![],
1065+
&options,
1066+
)?);
1067+
}
10601068
let mut columns: Vec<Arc<dyn Array>> =
10611069
Vec::with_capacity(schema.fields().len());
10621070

@@ -2889,4 +2897,35 @@ mod tests {
28892897

28902898
Ok(())
28912899
}
2900+
2901+
/// Calls `build_batch_empty_build_side` with a field-less output schema and
/// `JoinType::Right`, then asserts the returned batch has as many rows as the
/// probe batch (4) and zero columns.
#[test]
2902+
fn test_build_batch_empty_build_side_empty_schema() -> Result<()> {
2903+
// When the output schema has no fields (empty projection pushed into
2904+
// the join), build_batch_empty_build_side should return a RecordBatch
2905+
// with the correct row count but no columns.
2906+
let empty_schema = Schema::empty();
2907+
2908+
let build_batch = RecordBatch::try_new(
2909+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
2910+
vec![Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3]))], // 3 build-side rows
2911+
)?;
2912+
2913+
let probe_batch = RecordBatch::try_new(
2914+
Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)])),
2915+
vec![Arc::new(arrow::array::Int32Array::from(vec![4, 5, 6, 7]))], // 4 probe-side rows
2916+
)?;
2917+
2918+
let result = build_batch_empty_build_side(
2919+
&empty_schema,
2920+
&build_batch,
2921+
&probe_batch,
2922+
&[], // no column indices with empty projection
2923+
JoinType::Right,
2924+
)?;
2925+
2926+
assert_eq!(result.num_rows(), 4); // matches the probe batch's row count, not the build batch's 3
2927+
assert_eq!(result.num_columns(), 0);
2928+
2929+
Ok(())
2930+
}
28922931
}

datafusion/physical-plan/src/projection.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,15 @@ pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
563563
projection: &ProjectionExec,
564564
execution_plan: &Exec,
565565
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
566+
// If the projection has no expressions at all (e.g., ProjectionExec: expr=[]),
567+
// embed an empty projection into the execution plan so it outputs zero columns.
568+
// This avoids allocating throwaway null arrays for build-side columns
569+
// when no output columns are actually needed (e.g., count(1) over a right join).
570+
if projection.expr().is_empty() {
571+
let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
572+
return Ok(Some(new_execution_plan));
573+
}
574+
566575
// Collect all column indices from the given projection expressions.
567576
let projection_index = collect_column_indices(projection.expr());
568577

0 commit comments

Comments
 (0)