Skip to content

Commit 33e40b7

Browse files
szehon-hodongjoon-hyun
authored andcommitted
[SPARK-52991][SQL][FOLLOW-UP] Revise MergeIntoTable to use lazy val and add a new test
### What changes were proposed in this pull request? Minor follow up for #51698 1. Small optimization for MergeIntoTable node (aokolnychyi noticed post-commit that the new states can be calculated once using `lazy val`, as each time rules will copy the node so the state never changes). 2. Relocate `left`, `right`, `withNewChildrenInternal` 3. Add test to validate that default values work in schema evolution case. ### Why are the changes needed? Small analysis performance improvement and increase test coverage ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Run existing tests, and the patch adds another test ### Was this patch authored or co-authored using generative AI tooling? No Closes #52362 from szehon-ho/merge_schema_evolution_follow. Authored-by: Szehon Ho <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 067f295 commit 33e40b7

File tree

2 files changed

+69
-9
lines changed

2 files changed

+69
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -883,21 +883,16 @@ case class MergeIntoTable(
883883
}
884884
}
885885

886-
def duplicateResolved: Boolean = targetTable.outputSet.intersect(sourceTable.outputSet).isEmpty
886+
lazy val duplicateResolved: Boolean =
887+
targetTable.outputSet.intersect(sourceTable.outputSet).isEmpty
887888

888-
def skipSchemaResolution: Boolean = targetTable match {
889+
lazy val skipSchemaResolution: Boolean = targetTable match {
889890
case r: NamedRelation => r.skipSchemaResolution
890891
case SubqueryAlias(_, r: NamedRelation) => r.skipSchemaResolution
891892
case _ => false
892893
}
893894

894-
override def left: LogicalPlan = targetTable
895-
override def right: LogicalPlan = sourceTable
896-
override protected def withNewChildrenInternal(
897-
newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable =
898-
copy(targetTable = newLeft, sourceTable = newRight)
899-
900-
def needSchemaEvolution: Boolean =
895+
lazy val needSchemaEvolution: Boolean =
901896
schemaEvolutionEnabled &&
902897
MergeIntoTable.schemaChanges(targetTable.schema, sourceTable.schema).nonEmpty
903898

@@ -907,6 +902,12 @@ case class MergeIntoTable(
907902
case _ => false
908903
}
909904
}
905+
906+
override def left: LogicalPlan = targetTable
907+
override def right: LogicalPlan = sourceTable
908+
override protected def withNewChildrenInternal(
909+
newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable =
910+
copy(targetTable = newLeft, sourceTable = newRight)
910911
}
911912

912913
object MergeIntoTable {

sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2332,6 +2332,65 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
23322332
}
23332333
}
23342334

2335+
test("Merge schema evolution replacing column with default value and set all column") {
2336+
Seq((true, true), (false, true), (true, false)).foreach {
2337+
case (withSchemaEvolution, schemaEvolutionEnabled) =>
2338+
withTempView("source") {
2339+
createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
2340+
"""{ "pk": 1, "salary": 100, "dep": "hr" }
2341+
|{ "pk": 2, "salary": 200, "dep": "software" }
2342+
|{ "pk": 3, "salary": 300, "dep": "hr" }
2343+
|{ "pk": 4, "salary": 400, "dep": "marketing" }
2344+
|{ "pk": 5, "salary": 500, "dep": "executive" }
2345+
|""".stripMargin)
2346+
2347+
if (!schemaEvolutionEnabled) {
2348+
sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES
2349+
| ('auto-schema-evolution' = 'false')""".stripMargin)
2350+
}
2351+
sql(s"""ALTER TABLE $tableNameAsString ALTER COLUMN dep SET DEFAULT 'unknown'""")
2352+
2353+
val sourceDF = Seq((4, 150, true),
2354+
(5, 250, true),
2355+
(6, 350, false)).toDF("pk", "salary", "active")
2356+
sourceDF.createOrReplaceTempView("source")
2357+
2358+
val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
2359+
sql(s"""MERGE $schemaEvolutionClause
2360+
|INTO $tableNameAsString t
2361+
|USING source s
2362+
|ON t.pk = s.pk
2363+
|WHEN MATCHED THEN
2364+
| UPDATE SET *
2365+
|WHEN NOT MATCHED THEN
2366+
| INSERT *
2367+
|""".stripMargin)
2368+
if (withSchemaEvolution && schemaEvolutionEnabled) {
2369+
checkAnswer(
2370+
sql(s"SELECT * FROM $tableNameAsString"),
2371+
Seq(
2372+
Row(1, 100, "hr", null),
2373+
Row(2, 200, "software", null),
2374+
Row(3, 300, "hr", null),
2375+
Row(4, 150, "marketing", true),
2376+
Row(5, 250, "executive", true),
2377+
Row(6, 350, "unknown", false)))
2378+
} else {
2379+
checkAnswer(
2380+
sql(s"SELECT * FROM $tableNameAsString"),
2381+
Seq(
2382+
Row(1, 100, "hr"),
2383+
Row(2, 200, "software"),
2384+
Row(3, 300, "hr"),
2385+
Row(4, 150, "marketing"),
2386+
Row(5, 250, "executive"),
2387+
Row(6, 350, "unknown")))
2388+
}
2389+
sql(s"DROP TABLE $tableNameAsString")
2390+
}
2391+
}
2392+
}
2393+
23352394
test("Merge schema evolution replacing column with set explicit column") {
23362395
Seq((true, true), (false, true), (true, false)).foreach {
23372396
case (withSchemaEvolution, schemaEvolutionEnabled) =>

0 commit comments

Comments
 (0)