Skip to content

Commit ce646b3

Browse files
szehon-ho authored and cloud-fan committed
[SPARK-52991][SQL] Implement MERGE INTO with SCHEMA EVOLUTION for V2 Data Source
### What changes were proposed in this pull request? Add support for schema evolution for data sources that support MERGE INTO, currently V2 DataSources. This means that if the SOURCE table of the merge has a different schema than the TARGET table, the TARGET table can automatically update to take into account the new or different fields. The basic idea is to add - TableCapability.AUTOMATIC_SCHEMA_EVOLUTION to indicate a DSV2 table wants Spark to handle schema evolution for MERGE - ResolveMergeIntoSchemaEvolution rule, which will generate DSV2 TableChanges and call Catalog.alterTable For any new field in the top level or in a nested struct, Spark will add the field to the end. TODOs: 1. This currently does not support the case where SOURCE has a nested field missing from TARGET, and there is an UPDATE or INSERT star. Example: ``` MERGE INTO TARGET t USING SOURCE s // s=struct('a', struct('b': Int)) // t = struct('a', struct('c', int)) ``` will only work if the user specifies a value explicitly for the new nested field t.b for INSERT and UPDATE, i.e. ``` INSERT (s) VALUES (nested_struct('a', nested_struct('b', 1, 'c', 2))) UPDATE SET a.b = 2 ``` and not if they use INSERT * or UPDATE SET *. 2. Type widening is not allowed for the moment, as we need to decide what widenings to allow. We can take this in a follow-up PR. ### Why are the changes needed? #45748 added the syntax 'WITH SCHEMA EVOLUTION' to 'MERGE INTO'. However, this requires some external Spark extension to resolve Merge, and doesn't do anything in Spark's native MERGE implementation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added many tests to MergeIntoTableSuiteBase ### Was this patch authored or co-authored using generative AI tooling? No Closes #51698 from szehon-ho/merge_schema_evolution. Authored-by: Szehon Ho <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 1d84810 commit ce646b3

File tree

12 files changed

+868
-33
lines changed

12 files changed

+868
-33
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6731,6 +6731,12 @@
67316731
},
67326732
"sqlState" : "0A000"
67336733
},
6734+
"UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION" : {
6735+
"message" : [
6736+
"The table changes <changes> are not supported by the catalog on table <tableName>."
6737+
],
6738+
"sqlState" : "42000"
6739+
},
67346740
"UNSUPPORTED_TABLE_CHANGE_IN_JDBC_CATALOG" : {
67356741
"message" : [
67366742
"The table change <change> is not supported for the JDBC catalog on table <tableName>. Supported changes include: AddColumn, RenameColumn, DeleteColumn, UpdateColumnType, UpdateColumnNullability."

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public enum TableCapability {
9393
*/
9494
ACCEPT_ANY_SCHEMA,
9595

96+
/**
97+
* Signals that the table supports Spark altering its schema if necessary
98+
* as part of an operation.
99+
*/
100+
AUTOMATIC_SCHEMA_EVOLUTION,
101+
96102
/**
97103
* Signals that the table supports append writes using the V1 InsertableRelation interface.
98104
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
444444
AddMetadataColumns ::
445445
DeduplicateRelations ::
446446
ResolveCollationName ::
447+
ResolveMergeIntoSchemaEvolution ::
447448
new ResolveReferences(catalogManager) ::
448449
// Please do not insert any other rules in between. See the TODO comments in rule
449450
// ResolveLateralColumnAliasReference for more details.
@@ -1669,7 +1670,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
16691670
case u: UpdateTable => resolveReferencesInUpdate(u)
16701671

16711672
case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _)
1672-
if !m.resolved && targetTable.resolved && sourceTable.resolved =>
1673+
if !m.resolved && targetTable.resolved && sourceTable.resolved && !m.needSchemaEvolution =>
16731674

16741675
EliminateSubqueryAliases(targetTable) match {
16751676
case r: NamedRelation if r.skipSchemaResolution =>
@@ -1692,9 +1693,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
16921693
// The update value can access columns from both target and source tables.
16931694
resolveAssignments(assignments, m, MergeResolvePolicy.BOTH))
16941695
case UpdateStarAction(updateCondition) =>
1695-
val assignments = targetTable.output.map { attr =>
1696-
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
1697-
}
1696+
// Use only source columns. Missing columns in target will be handled in
1697+
// ResolveRowLevelCommandAssignments.
1698+
val assignments = targetTable.output.flatMap{ targetAttr =>
1699+
sourceTable.output.find(
1700+
sourceCol => conf.resolver(sourceCol.name, targetAttr.name))
1701+
.map(Assignment(targetAttr, _))}
16981702
UpdateAction(
16991703
updateCondition.map(resolveExpressionByPlanChildren(_, m)),
17001704
// For UPDATE *, the value must be from source table.
@@ -1715,9 +1719,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
17151719
// access columns from the source table.
17161720
val resolvedInsertCondition = insertCondition.map(
17171721
resolveExpressionByPlanOutput(_, m.sourceTable))
1718-
val assignments = targetTable.output.map { attr =>
1719-
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
1720-
}
1722+
// Use only source columns. Missing columns in target will be handled in
1723+
// ResolveRowLevelCommandAssignments.
1724+
val assignments = targetTable.output.flatMap{ targetAttr =>
1725+
sourceTable.output.find(
1726+
sourceCol => conf.resolver(sourceCol.name, targetAttr.name))
1727+
.map(Assignment(targetAttr, _))}
17211728
InsertAction(
17221729
resolvedInsertCondition,
17231730
resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.plans.logical._
21+
import org.apache.spark.sql.catalyst.rules.Rule
22+
import org.apache.spark.sql.catalyst.types.DataTypeUtils
23+
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
24+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
25+
import org.apache.spark.sql.errors.QueryCompilationErrors
26+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
27+
28+
29+
/**
30+
* A rule that resolves schema evolution for MERGE INTO.
31+
*
32+
* This rule will call the DSV2 Catalog to update the schema of the target table.
33+
*/
34+
object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {
35+
36+
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
37+
case m @ MergeIntoTable(_, _, _, _, _, _, _)
38+
if m.needSchemaEvolution =>
39+
val newTarget = m.targetTable.transform {
40+
case r : DataSourceV2Relation => performSchemaEvolution(r, m.sourceTable)
41+
}
42+
m.copy(targetTable = newTarget)
43+
}
44+
45+
private def performSchemaEvolution(relation: DataSourceV2Relation, source: LogicalPlan)
46+
: DataSourceV2Relation = {
47+
(relation.catalog, relation.identifier) match {
48+
case (Some(c: TableCatalog), Some(i)) =>
49+
val changes = MergeIntoTable.schemaChanges(relation.schema, source.schema)
50+
c.alterTable(i, changes: _*)
51+
val newTable = c.loadTable(i)
52+
val newSchema = CatalogV2Util.v2ColumnsToStructType(newTable.columns())
53+
// Check if there are any remaining changes not applied.
54+
val remainingChanges = MergeIntoTable.schemaChanges(newSchema, source.schema)
55+
if (remainingChanges.nonEmpty) {
56+
throw QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError(
57+
remainingChanges, i.toQualifiedNameParts(c))
58+
}
59+
relation.copy(table = newTable, output = DataTypeUtils.toAttributes(newSchema))
60+
case _ => logWarning(s"Schema Evolution enabled but data source $relation " +
61+
s"does not support it, skipping.")
62+
relation
63+
}
64+
}
65+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,17 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
4848
case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned =>
4949
resolveAssignments(u)
5050

51-
case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned =>
51+
case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned &&
52+
!m.needSchemaEvolution =>
5253
validateStoreAssignmentPolicy()
5354
m.copy(
5455
targetTable = cleanAttrMetadata(m.targetTable),
5556
matchedActions = alignActions(m.targetTable.output, m.matchedActions),
5657
notMatchedActions = alignActions(m.targetTable.output, m.notMatchedActions),
5758
notMatchedBySourceActions = alignActions(m.targetTable.output, m.notMatchedBySourceActions))
5859

59-
case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && !m.aligned =>
60+
case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && !m.aligned
61+
&& !m.needSchemaEvolution =>
6062
resolveAssignments(m)
6163
}
6264

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
4545

4646
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
4747
case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
48-
notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
49-
matchedActions.isEmpty && notMatchedActions.size == 1 &&
48+
notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
49+
!m.needSchemaEvolution && matchedActions.isEmpty && notMatchedActions.size == 1 &&
5050
notMatchedBySourceActions.isEmpty =>
5151

5252
EliminateSubqueryAliases(aliasedTable) match {
@@ -79,7 +79,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
7979
}
8080

8181
case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
82-
notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
82+
notMatchedBySourceActions, _)
83+
if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution &&
8384
matchedActions.isEmpty && notMatchedBySourceActions.isEmpty =>
8485

8586
EliminateSubqueryAliases(aliasedTable) match {
@@ -120,7 +121,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
120121
}
121122

122123
case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
123-
notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned =>
124+
notMatchedBySourceActions, _)
125+
if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>
124126

125127
EliminateSubqueryAliases(aliasedTable) match {
126128
case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate
3838
import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write}
3939
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE}
4040
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
41+
import org.apache.spark.sql.errors.QueryExecutionErrors
4142
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
42-
import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructType}
43+
import org.apache.spark.sql.internal.SQLConf
44+
import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
4345
import org.apache.spark.util.ArrayImplicits._
4446
import org.apache.spark.util.Utils
4547

@@ -894,6 +896,17 @@ case class MergeIntoTable(
894896
override protected def withNewChildrenInternal(
895897
newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable =
896898
copy(targetTable = newLeft, sourceTable = newRight)
899+
900+
def needSchemaEvolution: Boolean =
901+
schemaEvolutionEnabled &&
902+
MergeIntoTable.schemaChanges(targetTable.schema, sourceTable.schema).nonEmpty
903+
904+
private def schemaEvolutionEnabled: Boolean = withSchemaEvolution && {
905+
EliminateSubqueryAliases(targetTable) match {
906+
case r: DataSourceV2Relation if r.autoSchemaEvolution() => true
907+
case _ => false
908+
}
909+
}
897910
}
898911

899912
object MergeIntoTable {
@@ -909,6 +922,69 @@ object MergeIntoTable {
909922
}
910923
privileges.toSeq
911924
}
925+
926+
def schemaChanges(
927+
originalTarget: StructType,
928+
originalSource: StructType,
929+
fieldPath: Array[String] = Array()): Array[TableChange] = {
930+
schemaChanges(originalTarget, originalSource, originalTarget, originalSource, fieldPath)
931+
}
932+
933+
private def schemaChanges(
934+
current: DataType,
935+
newType: DataType,
936+
originalTarget: StructType,
937+
originalSource: StructType,
938+
fieldPath: Array[String]): Array[TableChange] = {
939+
(current, newType) match {
940+
case (StructType(currentFields), StructType(newFields)) =>
941+
val newFieldMap = toFieldMap(newFields)
942+
943+
// Update existing field types
944+
val updates = {
945+
currentFields collect {
946+
case currentField: StructField if newFieldMap.contains(currentField.name) =>
947+
schemaChanges(currentField.dataType, newFieldMap(currentField.name).dataType,
948+
originalTarget, originalSource, fieldPath ++ Seq(currentField.name))
949+
}}.flatten
950+
951+
// Identify the newly added fields and append to the end
952+
val currentFieldMap = toFieldMap(currentFields)
953+
val adds = newFields.filterNot (f => currentFieldMap.contains (f.name))
954+
.map(f => TableChange.addColumn(fieldPath ++ Set(f.name), f.dataType))
955+
956+
updates ++ adds
957+
958+
case (ArrayType(currentElementType, _), ArrayType(newElementType, _)) =>
959+
schemaChanges(currentElementType, newElementType,
960+
originalTarget, originalSource, fieldPath ++ Seq("element"))
961+
962+
case (MapType(currentKeyType, currentElementType, _),
963+
MapType(updateKeyType, updateElementType, _)) =>
964+
schemaChanges(currentKeyType, updateKeyType, originalTarget, originalSource,
965+
fieldPath ++ Seq("key")) ++
966+
schemaChanges(currentElementType, updateElementType,
967+
originalTarget, originalSource, fieldPath ++ Seq("value"))
968+
969+
case (currentType, newType) if currentType == newType =>
970+
// No change needed
971+
Array.empty[TableChange]
972+
973+
case _ =>
974+
// For now do not support type widening
975+
throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(
976+
originalTarget, originalSource, null)
977+
}
978+
}
979+
980+
def toFieldMap(fields: Array[StructField]): Map[String, StructField] = {
981+
val fieldMap = fields.map(field => field.name -> field).toMap
982+
if (SQLConf.get.caseSensitiveAnalysis) {
983+
fieldMap
984+
} else {
985+
CaseInsensitiveMap(fieldMap)
986+
}
987+
}
912988
}
913989

914990
sealed abstract class MergeAction extends Expression with Unevaluable {

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3348,6 +3348,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
33483348
"change" -> change.toString, "tableName" -> toSQLId(sanitizedTableName)))
33493349
}
33503350

3351+
def unsupportedTableChangesInAutoSchemaEvolutionError(
3352+
changes: Array[TableChange], tableName: Seq[String]): Throwable = {
3353+
val sanitizedTableName = tableName.map(_.replaceAll("\"", ""))
3354+
new AnalysisException(
3355+
errorClass = "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
3356+
messageParameters = Map(
3357+
"changes" -> changes.mkString(","), "tableName" -> toSQLId(sanitizedTableName)))
3358+
}
3359+
33513360
def pathOptionNotSetCorrectlyWhenReadingError(): Throwable = {
33523361
new AnalysisException(
33533362
errorClass = "_LEGACY_ERROR_TEMP_1306",

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ case class DataSourceV2Relation(
126126
this
127127
}
128128
}
129+
130+
def autoSchemaEvolution(): Boolean =
131+
table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
129132
}
130133

131134
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ abstract class InMemoryBaseTable(
134134
properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
135135

136136
private val acceptAnySchema = properties.getOrDefault("accept-any-schema", "false").toBoolean
137+
private val autoSchemaEvolution = properties.getOrDefault("auto-schema-evolution", "true")
138+
.toBoolean
137139

138140
partitioning.foreach {
139141
case _: IdentityTransform =>
@@ -349,13 +351,11 @@ abstract class InMemoryBaseTable(
349351
TableCapability.OVERWRITE_DYNAMIC,
350352
TableCapability.TRUNCATE)
351353

352-
override def capabilities(): util.Set[TableCapability] = {
353-
if (acceptAnySchema) {
354-
(baseCapabiilities ++ Set(TableCapability.ACCEPT_ANY_SCHEMA)).asJava
355-
} else {
356-
baseCapabiilities.asJava
357-
}
358-
}
354+
override def capabilities(): util.Set[TableCapability] =
355+
(baseCapabiilities ++
356+
(if (acceptAnySchema) Seq(TableCapability.ACCEPT_ANY_SCHEMA) else Seq.empty) ++
357+
(if (autoSchemaEvolution) Seq(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION) else Seq.empty))
358+
.asJava
359359

360360
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
361361
new InMemoryScanBuilder(schema, options)

0 commit comments

Comments
 (0)