Skip to content

Commit b682bfe

Browse files
LiaCastaneda and claude committed
Fix cherry-pick of apache#21068: remove null-aware code and fix missing helpers
The cherry-pick of apache PR apache#21068 incorrectly included null-aware anti-join code (referencing nonexistent fields `null_aware`, `probe_side_non_empty`, `probe_side_has_null` on `HashJoinStream`/`JoinLeftData`) from a different PR.

Also fixes:
- `.map()` -> `.hash_map()` to match this branch's `JoinLeftData` API
- Replace `new_empty_schema_batch()` (undefined in this branch) with an inline `RecordBatch::try_new_with_options` equivalent

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent d89e9d0 commit b682bfe

2 files changed

Lines changed: 8 additions & 41 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ impl HashJoinStream {
410410
join_type: JoinType,
411411
left_data: &JoinLeftData,
412412
) -> HashJoinStreamState {
413-
if left_data.map().is_empty()
413+
if left_data.hash_map().is_empty()
414414
&& join_type.empty_build_side_produces_empty_result()
415415
{
416416
HashJoinStreamState::Completed
@@ -606,47 +606,9 @@ impl HashJoinStream {
606606

607607
let timer = self.join_metrics.join_time.timer();
608608

609-
// Null-aware anti join semantics:
610-
// For LeftAnti: output LEFT (build) rows where LEFT.key NOT IN RIGHT.key
611-
// 1. If RIGHT (probe) contains NULL in any batch, no LEFT rows should be output
612-
// 2. LEFT rows with NULL keys should not be output (handled in final stage)
613-
if self.null_aware {
614-
// Mark that we've seen a probe batch with actual rows (probe side is non-empty)
615-
// Only set this if batch has rows - empty batches don't count
616-
// Use shared atomic state so all partitions can see this global information
617-
if state.batch.num_rows() > 0 {
618-
build_side
619-
.left_data
620-
.probe_side_non_empty
621-
.store(true, Ordering::Relaxed);
622-
}
623-
624-
// Check if probe side (RIGHT) contains NULL
625-
// Since null_aware validation ensures single column join, we only check the first column
626-
let probe_key_column = &state.values[0];
627-
if probe_key_column.null_count() > 0 {
628-
// Found NULL in probe side - set shared flag to prevent any output
629-
build_side
630-
.left_data
631-
.probe_side_has_null
632-
.store(true, Ordering::Relaxed);
633-
}
634-
635-
// If probe side has NULL (detected in this or any other partition), return empty result
636-
if build_side
637-
.left_data
638-
.probe_side_has_null
639-
.load(Ordering::Relaxed)
640-
{
641-
timer.done();
642-
self.state = HashJoinStreamState::FetchProbeBatch;
643-
return Ok(StatefulStreamResult::Continue);
644-
}
645-
}
646-
647609
// If the build side is empty, this stream only reaches ProcessProbeBatch for
648610
// join types whose output still depends on probe rows.
649-
let is_empty = build_side.left_data.map().is_empty();
611+
let is_empty = build_side.left_data.hash_map().is_empty();
650612

651613
if is_empty {
652614
// Invariant: state_after_build_ready should have already completed

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,12 @@ pub(crate) fn build_batch_empty_build_side(
10651065
// The remaining joins return right-side rows and nulls for the left side.
10661066
let num_rows = probe_batch.num_rows();
10671067
if schema.fields().is_empty() {
1068-
return new_empty_schema_batch(schema, num_rows);
1068+
return RecordBatch::try_new_with_options(
1069+
Arc::new(schema.clone()),
1070+
vec![],
1071+
&RecordBatchOptions::new().with_row_count(Some(num_rows)),
1072+
)
1073+
.map_err(Into::into);
10691074
}
10701075

10711076
let columns = column_indices

0 commit comments

Comments
 (0)