-
Notifications
You must be signed in to change notification settings - Fork 2.1k
fix: SortMergeJoin don't wait for all input before emitting #20482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
ba87c51
d028236
8104d55
814afab
4ca5ea3
bec4f21
86d0578
0a5cd16
7af653b
6bc1e0b
93be13f
869264e
3205c1a
d87058d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,14 +62,16 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; | |
| use futures::{Stream, StreamExt}; | ||
|
|
||
| /// State of SMJ stream | ||
| #[derive(Debug, PartialEq, Eq)] | ||
| #[derive(Debug, PartialEq, Eq, Clone)] | ||
| pub(super) enum SortMergeJoinState { | ||
| /// Init joining with a new streamed row or a new buffered batches | ||
| Init, | ||
| /// Polling one streamed row or one buffered batch, or both | ||
| Polling, | ||
| /// Joining polled data and making output | ||
| JoinOutput, | ||
| /// Emit ready data if have any | ||
| EmitReady { next_state: Box<SortMergeJoinState> }, | ||
| /// No more output | ||
| Exhausted, | ||
| } | ||
|
|
@@ -598,13 +600,49 @@ impl Stream for SortMergeJoinStream { | |
| self.current_ordering = self.compare_streamed_buffered()?; | ||
| self.state = SortMergeJoinState::JoinOutput; | ||
| } | ||
| SortMergeJoinState::EmitReady { next_state } => { | ||
| // If have data to emit, emit it and if no more, change to next | ||
|
|
||
| // Verify metadata alignment before checking if we have batches to output | ||
| self.joined_record_batches | ||
| .filter_metadata | ||
| .debug_assert_metadata_aligned(); | ||
|
|
||
| // For filtered joins, skip output and let Init state handle it | ||
| if needs_deferred_filtering(&self.filter, self.join_type) { | ||
| self.state = next_state.as_ref().clone(); | ||
| continue; | ||
| } | ||
|
|
||
| let maybe_next = next_state.as_ref().clone(); | ||
|
|
||
| // For non-filtered joins, only output if we have a completed batch | ||
| // (opportunistic output when target batch size is reached) | ||
| if self | ||
| .joined_record_batches | ||
| .joined_batches | ||
| .has_completed_batch() | ||
| { | ||
| let record_batch = self | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a point in not doing a let Some() on next_completed_batch()?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. next refactor I have already |
||
| .joined_record_batches | ||
| .joined_batches | ||
| .next_completed_batch() | ||
| .expect("has_completed_batch was true"); | ||
| (&record_batch) | ||
| .record_output(&self.join_metrics.baseline_metrics()); | ||
| return Poll::Ready(Some(Ok(record_batch))); | ||
| } | ||
| self.state = maybe_next; | ||
| } | ||
| SortMergeJoinState::JoinOutput => { | ||
| self.join_partial()?; | ||
|
|
||
| if self.num_unfrozen_pairs() < self.batch_size { | ||
| if self.buffered_data.scanning_finished() { | ||
| self.buffered_data.scanning_reset(); | ||
| self.state = SortMergeJoinState::Init; | ||
| self.state = SortMergeJoinState::EmitReady { | ||
| next_state: Box::new(SortMergeJoinState::Init), | ||
| }; | ||
| } | ||
| } else { | ||
| self.freeze_all()?; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the
EmitReadysomewhat repeatsJoinOutput.JoinOutputslightly tricky as it includes 2 sub states internally -> partial join and then output.btw from this comment
looks like the idea was correct releasing batches for nonfiltered SMJ as soon as they matched but somehow it didn't work.
Introducing new state would make the entire complicated mechanism even more complicated, however memory leak is important thing to consider. @rluvaton do you feel we can reuse
JoinOutputstate and see what is holding batches to be released?If we can do it fast then nice, if no, we can go with this PR and refactor this later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried to find why it did not emit early but it was hard to track
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found the name
EmitReadya bit vague. Could we make it more specific, e.g.EmitReadyThenInit?From the current usage, this state is only entered to opportunistically emit a completed batch and then continue with
Init(afterbuffered_data.scanning_finished()/scanning_reset()).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed