Skip to content

Commit caf48ae

Browse files
rluvatonmbutrovich
authored andcommitted
fix: SortMergeJoin don't wait for all input before emitting (apache#20482)
N/A I noticed while playing around with local tests and debugging memory issue, that `SortMergeJoinStream` wait for all input before start emitting, which shouldn't be the case as we can emit early when we have enough data. also, this cause huge memory pressure Trying to fix the issue, not sure yet Yes ----- - [x] update docs - [x] finish fix
1 parent da4014d commit caf48ae

3 files changed

Lines changed: 554 additions & 23 deletions

File tree

datafusion/physical-plan/src/joins/sort_merge_join/stream.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ pub(super) enum SortMergeJoinState {
6868
Polling,
6969
/// Joining polled data and making output
7070
JoinOutput,
71+
/// Emit ready data if have any and then go back to [`Self::Init`] state
72+
EmitReadyThenInit,
7173
/// No more output
7274
Exhausted,
7375
}
@@ -830,13 +832,45 @@ impl Stream for SortMergeJoinStream {
830832
self.current_ordering = self.compare_streamed_buffered()?;
831833
self.state = SortMergeJoinState::JoinOutput;
832834
}
835+
SortMergeJoinState::EmitReadyThenInit => {
836+
// If have data to emit, emit it and if no more, change to next
837+
838+
// Verify metadata alignment before checking if we have batches to output
839+
self.joined_record_batches
840+
.filter_metadata
841+
.debug_assert_metadata_aligned();
842+
843+
// For filtered joins, skip output and let Init state handle it
844+
if needs_deferred_filtering(&self.filter, self.join_type) {
845+
self.state = SortMergeJoinState::Init;
846+
continue;
847+
}
848+
849+
// For non-filtered joins, only output if we have a completed batch
850+
// (opportunistic output when target batch size is reached)
851+
if self
852+
.joined_record_batches
853+
.joined_batches
854+
.has_completed_batch()
855+
{
856+
let record_batch = self
857+
.joined_record_batches
858+
.joined_batches
859+
.next_completed_batch()
860+
.expect("has_completed_batch was true");
861+
(&record_batch)
862+
.record_output(&self.join_metrics.baseline_metrics());
863+
return Poll::Ready(Some(Ok(record_batch)));
864+
}
865+
self.state = SortMergeJoinState::Init;
866+
}
833867
SortMergeJoinState::JoinOutput => {
834868
self.join_partial()?;
835869

836870
if self.num_unfrozen_pairs() < self.batch_size {
837871
if self.buffered_data.scanning_finished() {
838872
self.buffered_data.scanning_reset();
839-
self.state = SortMergeJoinState::Init;
873+
self.state = SortMergeJoinState::EmitReadyThenInit;
840874
}
841875
} else {
842876
self.freeze_all()?;

0 commit comments

Comments
 (0)