Skip to content

[SPARK-56384][SS] Support stream-stream non-outer join in Update mode#55249

Open
HeartSaVioR wants to merge 4 commits intoapache:masterfrom
HeartSaVioR:SPARK-56384
Open

[SPARK-56384][SS] Support stream-stream non-outer join in Update mode#55249
HeartSaVioR wants to merge 4 commits intoapache:masterfrom
HeartSaVioR:SPARK-56384

Conversation

@HeartSaVioR
Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR commented Apr 7, 2026

What changes were proposed in this pull request?

This PR proposes to support stream-stream non-outer join (Inner/LeftSemi) in Update mode. The expected behavior of Inner/LeftSemi join in Update mode is simply the same with Append mode (since there is no point of early-firing), hence this is bound to the definition of Update mode, "if the query doesn't contain aggregations, it is equivalent to Append mode".

Why are the changes needed?

Previously stream-stream inner join cannot be used in update mode, where there could be some workload benefited by this. e.g. stream-stream inner join, followed by time window aggregation - this workload has different ability based on the output mode. On Update mode, we have early-firing result of time window aggregation from the input of stream-stream join. This PR unblocks the case.

Does this PR introduce any user-facing change?

Yes, stream-stream non-outer join will be supported with Update mode.

How was this patch tested?

New tests.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude 4.6 Opus

throwError(s"$joinType join between two streaming DataFrames/Datasets" +
s" is not supported in ${outputMode} output mode, only in Append output mode")
}
case _ =>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we list the supported join types explicitly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw we have pattern match at the very next which will handle the failure based on join types. This pattern match is mostly about checking compatibility with output mode.

condition = Some(attributeWithWatermark === attribute)),
OutputMode.Update())

// Left outer, right outer, full outer, left semi joins
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems a little redundant.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me see if this is the consistent pattern within file or we just added this only here. If it's consistent over the file, I guess we can just leave it as it is. If not we can remove it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the pattern is consistent in the file (that's what Claude said), let's leave it as it is.

Comment on lines +965 to +967
// dedup1: filters 1, 2 (already seen), passes only 4
// dedup2: filters 2, 3 (already seen), passes only 4
// join: only (4, 4) matches
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I feel like we don't need to have such verbose comments. The query is pretty short, and the test output should be verified to match the query anyway.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For stateful operators we tend to be very reader friendly, but for the case when watermark doesn't come into play, maybe it's very obvious to think through without guidance. We can remove comments for this test.

Comment on lines +940 to +941
.withColumnRenamed("value", "value1")
.withColumn("eventTime1", timestamp_seconds($"value1"))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: select?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withColumn is a syntax sugar to append a new column - it has been a best practice.

}

// Stream-stream non-outer join produces the same behavior between Append mode and Update mode.
// We only run a sanity test here rather than replicating the full Append mode test suite.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we run a loop over the existing append-mode test suite so all the test cases run for both append and update mode?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK that works for me. I'll see whether it is running too long (so that I need to have separate suite).

)
}

test("dedup on both sides -> stream-stream inner join, update mode") {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as below: if the behavior is the same, could we run a loop over test case with different output modes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes let's do that.

// Left outer, right outer, full outer, left semi joins
Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType =>
// Update mode not allowed
// Left outer, right outer, full outer joins: Update mode not allowed
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Left outer, right outer, full outer joins: Update mode not allowed
// The behavior for unmatched rows in outer joins with update mode hasn't been defined yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good for explaining rationale, but this is a test. If we want to have this comment I'd say we should have it in the place where we block the operation. I'll do that in UnsupportedOperationChecker and keep this comment as it is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added that comment in UnsupportedOperationChecker.

import testImplicits._

test("windowed left semi join") {
testWithAppendAndUpdate("windowed left semi join") { outputMode =>
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to be confused: we do not apply time window aggregation to come up with time window. It's just a projection. That's how this works in Update mode.

@HeartSaVioR HeartSaVioR requested a review from funrollloops April 9, 2026 01:59
@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

@funrollloops Would you mind taking a second look? Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants