[SPARK-55689] Skip unsupported column changes during schema evolution #54658
johanl-db wants to merge 9 commits into apache:master
Conversation
gengliangwang
left a comment
Overall Summary
This PR introduces a SupportsTypeEvolution DSv2 trait so connectors can control which type changes are allowed during schema evolution, and extends INSERT operations with schema evolution support (previously only MERGE INTO). The schema change logic is refactored from MergeIntoTable/ResolveMergeIntoSchemaEvolution into a shared ResolveSchemaEvolution utility. The design direction is sound.
General comments:
- The PR description still says "TODO" under user-facing changes. This should be filled in before merging.
- The `changesForSchemaEvolution` lazy val on `V2WriteCommand` computes `schemaChanges` even when `schemaEvolutionEnabled` is `false` (since it's a separate lazy val). While `needSchemaEvolution` properly guards the combination, consider making `changesForSchemaEvolution` short-circuit when schema evolution is disabled, to avoid calling into `ResolveSchemaEvolution.schemaChanges` unnecessarily.
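The suggestion above can be sketched in isolation. This is a minimal illustrative model, not the actual `V2WriteCommand` code: the class, field, and counter names are hypothetical stand-ins, and the counter only exists to show that the expensive computation is skipped when the flag is off.

```scala
// Hypothetical sketch of the suggested guard: the expensive computation
// (standing in for ResolveSchemaEvolution.schemaChanges) short-circuits
// when schema evolution is disabled, instead of relying on every caller
// to check needSchemaEvolution first.
object LazyGuardSketch {
  var computeCount = 0 // tracks how often the expensive path runs

  class WriteCommand(val schemaEvolutionEnabled: Boolean) {
    // Stand-in for the costly schema-change computation.
    private def schemaChanges: Seq[String] = {
      computeCount += 1
      Seq("update_column_type")
    }

    // Short-circuits: schemaChanges is never evaluated when disabled.
    lazy val changesForSchemaEvolution: Seq[String] =
      if (schemaEvolutionEnabled) schemaChanges else Nil
  }
}
```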
```scala
    isByName: Boolean): Array[TableChange] = {
  computeSchemaChanges(
    originalTarget,
```
A question for a follow-up PR: does it make sense to return Seq instead of Array here?
cloud-fan
left a comment
Summary
The fundamental problem. When DML (INSERT or MERGE) encounters a type mismatch between source data and target table, Spark has two mutually exclusive strategies per column: evolve the target schema to match the source, or cast the source data to match the target. On master, enabling schema evolution commits to evolve-everything — all type changes are passed to catalog.alterTable, and if the connector can't apply one, the query fails. There is no fallback to casting. This means schema evolution is strictly worse than no schema evolution for type combinations where casting would work (e.g., inserting INT into a STRING column).
The solution. This PR introduces SupportsSchemaEvolution, a DSv2 mix-in interface where connectors declare which column changes they can physically apply. computeSupportedSchemaChanges filters candidate changes through supportsColumnChange before sending them to alterTable. Unsupported changes are silently dropped from the evolution set — the remaining type mismatches then resolve through standard TableOutputResolver cast insertion, just as if schema evolution were disabled for those columns. This lets connectors evolve what they can and cast the rest, instead of all-or-nothing.
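The filtering described above can be sketched with simplified stand-in types. The names below mirror the PR (`SupportsSchemaEvolution`, `supportsColumnChange`, `computeSupportedSchemaChanges`) but the signatures and types are illustrative, not the actual DSv2 API:

```scala
// Illustrative model of the filtering step: candidate table changes pass
// through a connector-supplied predicate; changes the connector cannot
// apply are dropped from the evolution set, and those columns fall back
// to ordinary cast insertion instead of failing the query.
object FilterSketch {
  sealed trait TableChange
  case class UpdateColumnType(column: String, newType: String) extends TableChange

  // Stand-in for the SupportsSchemaEvolution mix-in interface.
  trait SupportsSchemaEvolution {
    def supportsColumnChange(change: TableChange): Boolean
  }

  // Stand-in for computeSupportedSchemaChanges: keep only what the
  // connector declares it can physically apply.
  def computeSupportedSchemaChanges(
      table: SupportsSchemaEvolution,
      candidates: Seq[TableChange]): Seq[TableChange] =
    candidates.filter(table.supportsColumnChange)
}
```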
Scope. The interface applies uniformly to all DML with schema evolution — both INSERT (V2WriteCommand.pendingSchemaChanges) and MERGE (MergeIntoTable.pendingSchemaChanges) call computeSupportedSchemaChanges.
Backward compatibility. Tables that have AUTOMATIC_SCHEMA_EVOLUTION but don't implement SupportsSchemaEvolution get the old unfiltered behavior (all changes attempted).
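The compatibility rule reduces to a type test: only tables that opt into the new interface get filtered. A hedged sketch with hypothetical types (not the real catalog API):

```scala
// Illustrative model of the backward-compatibility rule: a table that
// does not mix in SupportsSchemaEvolution keeps the legacy behavior,
// i.e. every candidate change is attempted unfiltered.
object CompatSketch {
  trait Table
  trait SupportsSchemaEvolution extends Table {
    def supportsColumnChange(change: String): Boolean
  }

  def changesToApply(table: Table, candidates: Seq[String]): Seq[String] =
    table match {
      case s: SupportsSchemaEvolution => candidates.filter(s.supportsColumnChange)
      case _ => candidates // legacy: attempt everything
    }
}
```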
all minor, LGTM
…ntoTests.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…alog/InMemoryBaseTable.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…ysis/ResolveSchemaEvolution.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…ysis/ResolveSchemaEvolution.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…log/SupportsSchemaEvolution.java Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
What changes were proposed in this pull request?
The initial implementation of schema evolution in MERGE/INSERT is too aggressive when trying to automatically apply schema evolution: any type mismatch between the source data and target table triggers an attempt to change the target data type, even though the table may not support it.
This change adds a new DSv2 trait `SupportsSchemaEvolution` that lets connectors indicate whether a given column change should be applied or not.

Why are the changes needed?
When schema evolution is enabled, the following write currently fails if the connector doesn't support changing the type of `value` from STRING to INT:

On the other hand, the write succeeds without schema evolution: a cast from INT to STRING is added, which is valid.
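The concrete statements were elided from the description; a hypothetical reproduction of the scenario might look like the following (table, column, and connector names are illustrative, not taken from the patch):

```sql
-- Target table with a STRING column, backed by a connector that cannot
-- change column types (illustrative names).
CREATE TABLE target (id INT, value STRING) USING some_connector;

-- The source produces an INT for `value`. With schema evolution enabled,
-- master attempts to alter `value` to INT and fails when the connector
-- rejects the change; with this PR the unsupported change is skipped and
-- an implicit CAST to STRING is inserted instead.
INSERT INTO target SELECT 1 AS id, 42 AS value;
```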
Does this PR introduce any user-facing change?
Yes, the following query now succeeds instead of trying - and failing - to change the data type of `value` to INT:

How was this patch tested?
Added tests for type evolution in INSERT and MERGE INTO