Skip to content

Commit 4a36e6b

Browse files
LiaCastanedakosiewclaude
authored
Cherry-pick apache#21068 (#99)
* Skip probe-side consumption when hash join build side is empty (apache#21068) * Closes apache#20492. `HashJoinExec` currently continues polling and consuming the probe side even after the build side has completed with zero rows. For join types whose output is guaranteed to be empty when the build side is empty, this work is unnecessary. In practice, it can trigger large avoidable scans and extra compute despite producing no output. This is especially costly for cases such as INNER, LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, and RIGHT SEMI joins. This change makes the stream state machine aware of that condition so execution can terminate as soon as the build side is known to be empty and no probe rows are needed to determine the final result. The change also preserves the existing behavior for join types that still require probe-side rows even when the build side is empty, such as RIGHT, FULL, RIGHT ANTI, and RIGHT MARK joins. * Added `JoinType::empty_build_side_produces_empty_result` to centralize logic determining when an empty build side guarantees empty output. * Updated `HashJoinStream` state transitions to: * Skip transitioning to `FetchProbeBatch` when the build side is empty and output is deterministically empty. * Immediately complete the stream in such cases. * Refactored logic in `build_batch_empty_build_side` to reuse the new helper method and simplify match branches. * Ensured probe-side consumption still occurs for join types that require probe rows (e.g., RIGHT, FULL). * Added helper `state_after_build_ready` to unify post-build decision logic. * Introduced reusable helper for constructing hash joins with dynamic filters in tests. Yes, comprehensive tests have been added: * Verified that probe side is **not consumed** when: * Build side is empty * Join type guarantees empty output * Verified that probe side **is still consumed** when required by join semantics (e.g., RIGHT, FULL joins) * Covered both filtered and non-filtered joins * Added tests ensuring correct behavior with dynamic filters * Added regression test ensuring correct behavior after partition bounds reporting These tests validate both correctness and the intended optimization behavior. No API changes. However, this introduces a performance optimization: * Queries involving joins with empty build sides may complete significantly faster * Reduced unnecessary IO and compute No behavioral changes in query results. This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested. (cherry picked from commit 6c5e241) * Fix cherry-pick of apache#21068: remove null-aware code and fix missing helpers The cherry-pick of apache PR apache#21068 incorrectly included null-aware anti-join code (referencing nonexistent fields `null_aware`, `probe_side_non_empty`, `probe_side_has_null` on `HashJoinStream`/ `JoinLeftData`) from a different PR. Also fixes: - `.map()` -> `.hash_map()` to match this branch's `JoinLeftData` API - Replace `new_empty_schema_batch()` (undefined in this branch) with an inline `RecordBatch::try_new_with_options` equivalent Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: kosiew <kosiew@gmail.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 1a1fb91 commit 4a36e6b

4 files changed

Lines changed: 267 additions & 79 deletions

File tree

datafusion/common/src/join_type.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@ impl JoinType {
113113
| JoinType::RightMark
114114
)
115115
}
116+
117+
/// Returns true when an empty build side necessarily produces an empty
118+
/// result for this join type.
119+
pub fn empty_build_side_produces_empty_result(self) -> bool {
120+
matches!(
121+
self,
122+
JoinType::Inner
123+
| JoinType::Left
124+
| JoinType::LeftSemi
125+
| JoinType::LeftAnti
126+
| JoinType::LeftMark
127+
| JoinType::RightSemi
128+
)
129+
}
116130
}
117131

118132
impl Display for JoinType {

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 194 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,6 +1655,109 @@ mod tests {
16551655
)
16561656
}
16571657

1658+
fn empty_build_with_probe_error_inputs()
1659+
-> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
1660+
let left_batch =
1661+
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
1662+
let left_schema = left_batch.schema();
1663+
let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
1664+
&[vec![left_batch]],
1665+
Arc::clone(&left_schema),
1666+
None,
1667+
)
1668+
.unwrap();
1669+
1670+
let err = exec_err!("bad data error");
1671+
let right_batch =
1672+
build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
1673+
let right_schema = right_batch.schema();
1674+
let on = vec![(
1675+
Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
1676+
Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _,
1677+
)];
1678+
let right: Arc<dyn ExecutionPlan> = Arc::new(
1679+
MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false),
1680+
);
1681+
1682+
(left, right, on)
1683+
}
1684+
1685+
async fn assert_empty_build_probe_behavior(
1686+
join_types: &[JoinType],
1687+
expect_probe_error: bool,
1688+
with_filter: bool,
1689+
) {
1690+
let (left, right, on) = empty_build_with_probe_error_inputs();
1691+
let filter = prepare_join_filter();
1692+
1693+
for join_type in join_types {
1694+
let join = if with_filter {
1695+
join_with_filter(
1696+
Arc::clone(&left),
1697+
Arc::clone(&right),
1698+
on.clone(),
1699+
filter.clone(),
1700+
join_type,
1701+
NullEquality::NullEqualsNothing,
1702+
)
1703+
.unwrap()
1704+
} else {
1705+
join(
1706+
Arc::clone(&left),
1707+
Arc::clone(&right),
1708+
on.clone(),
1709+
join_type,
1710+
NullEquality::NullEqualsNothing,
1711+
)
1712+
.unwrap()
1713+
};
1714+
1715+
let result = common::collect(
1716+
join.execute(0, Arc::new(TaskContext::default())).unwrap(),
1717+
)
1718+
.await;
1719+
1720+
if expect_probe_error {
1721+
let result_string = result.unwrap_err().to_string();
1722+
assert!(
1723+
result_string.contains("bad data error"),
1724+
"actual: {result_string}"
1725+
);
1726+
} else {
1727+
let batches = result.unwrap();
1728+
assert!(
1729+
batches.is_empty(),
1730+
"expected no output batches for {join_type}, got {batches:?}"
1731+
);
1732+
}
1733+
}
1734+
}
1735+
1736+
fn hash_join_with_dynamic_filter(
1737+
left: Arc<dyn ExecutionPlan>,
1738+
right: Arc<dyn ExecutionPlan>,
1739+
on: JoinOn,
1740+
join_type: JoinType,
1741+
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
1742+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
1743+
let mut join = HashJoinExec::try_new(
1744+
left,
1745+
right,
1746+
on,
1747+
None,
1748+
&join_type,
1749+
None,
1750+
PartitionMode::CollectLeft,
1751+
NullEquality::NullEqualsNothing,
1752+
)?;
1753+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
1754+
filter: Arc::clone(&dynamic_filter),
1755+
build_accumulator: OnceLock::new(),
1756+
});
1757+
1758+
Ok((join, dynamic_filter))
1759+
}
1760+
16581761
async fn join_collect(
16591762
left: Arc<dyn ExecutionPlan>,
16601763
right: Arc<dyn ExecutionPlan>,
@@ -4187,6 +4290,70 @@ mod tests {
41874290
}
41884291
}
41894292

4293+
#[tokio::test]
4294+
async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
4295+
assert_empty_build_probe_behavior(
4296+
&[
4297+
JoinType::Inner,
4298+
JoinType::Left,
4299+
JoinType::LeftSemi,
4300+
JoinType::LeftAnti,
4301+
JoinType::LeftMark,
4302+
JoinType::RightSemi,
4303+
],
4304+
false,
4305+
false,
4306+
)
4307+
.await;
4308+
}
4309+
4310+
#[tokio::test]
4311+
async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() {
4312+
assert_empty_build_probe_behavior(
4313+
&[
4314+
JoinType::Inner,
4315+
JoinType::Left,
4316+
JoinType::LeftSemi,
4317+
JoinType::LeftAnti,
4318+
JoinType::LeftMark,
4319+
JoinType::RightSemi,
4320+
],
4321+
false,
4322+
true,
4323+
)
4324+
.await;
4325+
}
4326+
4327+
#[tokio::test]
4328+
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
4329+
assert_empty_build_probe_behavior(
4330+
&[
4331+
JoinType::Right,
4332+
JoinType::Full,
4333+
JoinType::RightAnti,
4334+
JoinType::RightMark,
4335+
],
4336+
true,
4337+
false,
4338+
)
4339+
.await;
4340+
}
4341+
4342+
#[tokio::test]
4343+
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() {
4344+
assert_empty_build_probe_behavior(
4345+
&[
4346+
JoinType::Right,
4347+
JoinType::Full,
4348+
JoinType::RightAnti,
4349+
JoinType::RightMark,
4350+
],
4351+
true,
4352+
true,
4353+
)
4354+
.await;
4355+
}
4356+
41904357
#[tokio::test]
41914358
async fn join_split_batch() {
41924359
let left = build_table(
@@ -4629,33 +4796,16 @@ mod tests {
46294796
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
46304797
)];
46314798

4632-
// Create a dynamic filter manually
4633-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4634-
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4635-
4636-
// Create HashJoinExec with the dynamic filter
4637-
let mut join = HashJoinExec::try_new(
4638-
left,
4639-
right,
4640-
on,
4641-
None,
4642-
&JoinType::Inner,
4643-
None,
4644-
PartitionMode::CollectLeft,
4645-
NullEquality::NullEqualsNothing,
4646-
)?;
4647-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4648-
filter: dynamic_filter,
4649-
build_accumulator: OnceLock::new(),
4650-
});
4799+
let (join, dynamic_filter) =
4800+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
46514801

46524802
// Execute the join
46534803
let stream = join.execute(0, task_ctx)?;
46544804
let _batches = common::collect(stream).await?;
46554805

46564806
// After the join completes, the dynamic filter should be marked as complete
46574807
// wait_complete() should return immediately
4658-
dynamic_filter_clone.wait_complete().await;
4808+
dynamic_filter.wait_complete().await;
46594809

46604810
Ok(())
46614811
}
@@ -4677,33 +4827,37 @@ mod tests {
46774827
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
46784828
)];
46794829

4680-
// Create a dynamic filter manually
4681-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4682-
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4683-
4684-
// Create HashJoinExec with the dynamic filter
4685-
let mut join = HashJoinExec::try_new(
4686-
left,
4687-
right,
4688-
on,
4689-
None,
4690-
&JoinType::Inner,
4691-
None,
4692-
PartitionMode::CollectLeft,
4693-
NullEquality::NullEqualsNothing,
4694-
)?;
4695-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4696-
filter: dynamic_filter,
4697-
build_accumulator: OnceLock::new(),
4698-
});
4830+
let (join, dynamic_filter) =
4831+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
46994832

47004833
// Execute the join
47014834
let stream = join.execute(0, task_ctx)?;
47024835
let _batches = common::collect(stream).await?;
47034836

47044837
// Even with empty build side, the dynamic filter should be marked as complete
47054838
// wait_complete() should return immediately
4706-
dynamic_filter_clone.wait_complete().await;
4839+
dynamic_filter.wait_complete().await;
4840+
4841+
Ok(())
4842+
}
4843+
4844+
#[tokio::test]
4845+
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
4846+
-> Result<()> {
4847+
let task_ctx = Arc::new(TaskContext::default());
4848+
let (left, right, on) = empty_build_with_probe_error_inputs();
4849+
4850+
// Keep an extra consumer reference so execute() enables dynamic filter pushdown
4851+
// and enters the WaitPartitionBoundsReport path before deciding whether to poll
4852+
// the probe side.
4853+
let (join, dynamic_filter) =
4854+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
4855+
4856+
let stream = join.execute(0, task_ctx)?;
4857+
let batches = common::collect(stream).await?;
4858+
assert!(batches.is_empty());
4859+
4860+
dynamic_filter.wait_complete().await;
47074861

47084862
Ok(())
47094863
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,21 @@ impl HashJoinStream {
404404
}
405405
}
406406

407+
/// Returns the next state after the build side has been fully collected
408+
/// and any required build-side coordination has completed.
409+
fn state_after_build_ready(
410+
join_type: JoinType,
411+
left_data: &JoinLeftData,
412+
) -> HashJoinStreamState {
413+
if left_data.hash_map().is_empty()
414+
&& join_type.empty_build_side_produces_empty_result()
415+
{
416+
HashJoinStreamState::Completed
417+
} else {
418+
HashJoinStreamState::FetchProbeBatch
419+
}
420+
}
421+
407422
/// Separate implementation function that unpins the [`HashJoinStream`] so
408423
/// that partial borrows work correctly
409424
fn poll_next_impl(
@@ -462,7 +477,9 @@ impl HashJoinStream {
462477
if let Some(ref mut fut) = self.build_waiter {
463478
ready!(fut.get_shared(cx))?;
464479
}
465-
self.state = HashJoinStreamState::FetchProbeBatch;
480+
let build_side = self.build_side.try_as_ready()?;
481+
self.state =
482+
Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
466483
Poll::Ready(Ok(StatefulStreamResult::Continue))
467484
}
468485

@@ -529,7 +546,8 @@ impl HashJoinStream {
529546
}));
530547
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
531548
} else {
532-
self.state = HashJoinStreamState::FetchProbeBatch;
549+
self.state =
550+
Self::state_after_build_ready(self.join_type, left_data.as_ref());
533551
}
534552

535553
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
@@ -588,8 +606,14 @@ impl HashJoinStream {
588606

589607
let timer = self.join_metrics.join_time.timer();
590608

591-
// if the left side is empty, we can skip the (potentially expensive) join operation
592-
if build_side.left_data.hash_map.is_empty() && self.filter.is_none() {
609+
// If the build side is empty, this stream only reaches ProcessProbeBatch for
610+
// join types whose output still depends on probe rows.
611+
let is_empty = build_side.left_data.hash_map().is_empty();
612+
613+
if is_empty {
614+
// Invariant: state_after_build_ready should have already completed
615+
// join types whose result is fixed to empty when the build side is empty.
616+
debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
593617
let result = build_batch_empty_build_side(
594618
&self.schema,
595619
build_side.left_data.batch(),

0 commit comments

Comments
 (0)