Skip to content

Commit 73dd6ed

Browse files
johanl-db, aokolnychyi
authored and committed
[SPARK-55690] Schema evolution in DSv2 AppendData, OverwriteByExpression, OverwritePartitionsDynamic
### What changes were proposed in this pull request? Adds support for schema evolution during INSERT operations (AppendData, OverwriteByExpression, OverwritePartitionsDynamic). When the table reports capability `AUTOMATIC_SCHEMA_EVOLUTION`, a new analyzer rule `ResolveInsertSchemaEvolution` collects new columns and nested fields present in the source query but not in the table schema, and adds them to the target table by calling `catalog.alterTable()`. Identifying new columns/fields respects the resolution semantics of INSERT operations: matching fields by-name vs by-position. This builds on previous work from szehon-ho, in particular #51698. The first two commits move this previous code around to reuse it; the core of the implementation is in the [third commit](7be9d2a). ### Why are the changes needed? The `WITH SCHEMA EVOLUTION` syntax for SQL inserts was added recently: #53732. This actually implements schema evolution behind this syntax. ### Does this PR introduce _any_ user-facing change? Yes, when the `WITH SCHEMA EVOLUTION` clause is specified in SQL INSERT operations, new columns and nested fields in the source data will be added to the target table - assuming the data source supports schema evolution (capability `AUTOMATIC_SCHEMA_EVOLUTION`): ``` CREATE TABLE target (id INT); INSERT INTO target VALUES (1); INSERT WITH SCHEMA EVOLUTION INTO target SELECT 2 AS id, "two" AS value; SELECT * FROM target; | id | value | |----|-------| | 1 | null | | 2 | "two" | ``` ### How was this patch tested? Added basic testing in `DataSourceV2SQLSuite`. Integrated with Delta and ran extensive Delta test harness for schema evolution against this implementation. See delta-io/delta#6140. There are a number of expected failures for tests that would need to be updated on the Delta side (different error class returned, negative tests checking that something specifically doesn't work if a fix is disabled, ...). Closes #54488 from johanl-db/dsv2-schema-evolution-insert.
Authored-by: Johan Lasperas <[email protected]> Signed-off-by: Anton Okolnychyi <[email protected]>
1 parent cbbbd41 commit 73dd6ed

File tree

16 files changed

+1041
-322
lines changed

16 files changed

+1041
-322
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7126,6 +7126,11 @@
71267126
"message" : [
71277127
"The catalog did not support or only partially applied the following changes: <changes>."
71287128
]
7129+
},
7130+
"TABLE_NOT_SUPPORTED" : {
7131+
"message" : [
7132+
"The write target does not support automatic schema evolution."
7133+
]
71297134
}
71307135
},
71317136
"sqlState" : "42000"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ class Analyzer(
450450
AddMetadataColumns ::
451451
DeduplicateRelations ::
452452
ResolveCollationName ::
453-
ResolveMergeIntoSchemaEvolution ::
453+
ResolveSchemaEvolution ::
454454
ValidateEventTimeWatermarkColumn ::
455455
new ResolveReferences(catalogManager) ::
456456
// Please do not insert any other rules in between. See the TODO comments in rule
@@ -1167,9 +1167,6 @@ class Analyzer(
11671167
val partCols = partitionColumnNames(r.table)
11681168
validatePartitionSpec(partCols, i.partitionSpec)
11691169

1170-
val schemaEvolutionWriteOption: Map[String, String] =
1171-
if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty
1172-
11731170
val staticPartitions = i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
11741171
val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query), staticPartitions,
11751172
isByName)
@@ -1179,38 +1176,38 @@ class Analyzer(
11791176
AppendData.byName(
11801177
r,
11811178
query,
1182-
writeOptions = schemaEvolutionWriteOption)
1179+
withSchemaEvolution = i.withSchemaEvolution)
11831180
} else {
11841181
AppendData.byPosition(
11851182
r,
11861183
query,
1187-
writeOptions = schemaEvolutionWriteOption)
1184+
withSchemaEvolution = i.withSchemaEvolution)
11881185
}
11891186
} else if (conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC) {
11901187
if (isByName) {
11911188
OverwritePartitionsDynamic.byName(
11921189
r,
11931190
query,
1194-
writeOptions = schemaEvolutionWriteOption)
1191+
withSchemaEvolution = i.withSchemaEvolution)
11951192
} else {
11961193
OverwritePartitionsDynamic.byPosition(
11971194
r,
11981195
query,
1199-
writeOptions = schemaEvolutionWriteOption)
1196+
withSchemaEvolution = i.withSchemaEvolution)
12001197
}
12011198
} else {
12021199
if (isByName) {
12031200
OverwriteByExpression.byName(
12041201
table = r,
12051202
df = query,
12061203
deleteExpr = staticDeleteExpression(r, staticPartitions),
1207-
writeOptions = schemaEvolutionWriteOption)
1204+
withSchemaEvolution = i.withSchemaEvolution)
12081205
} else {
12091206
OverwriteByExpression.byPosition(
12101207
table = r,
12111208
query = query,
12121209
deleteExpr = staticDeleteExpression(r, staticPartitions),
1213-
writeOptions = schemaEvolutionWriteOption)
1210+
withSchemaEvolution = i.withSchemaEvolution)
12141211
}
12151212
}
12161213
}
@@ -3637,7 +3634,8 @@ class Analyzer(
36373634
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
36383635
_.containsPattern(COMMAND), ruleId) {
36393636
case v2Write: V2WriteCommand
3640-
if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved =>
3637+
if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved &&
3638+
v2Write.pendingSchemaChanges.isEmpty =>
36413639
validateStoreAssignmentPolicy()
36423640
TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
36433641
expected = v2Write.table.output, queryOutput = v2Write.query.output)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala

Lines changed: 0 additions & 98 deletions
This file was deleted.

0 commit comments

Comments
 (0)