[SPARK-55690] Schema evolution in DSv2 AppendData, OverwriteByExpression, OverwritePartitionsDynamic#54488
johanl-db wants to merge 16 commits into apache:master
Conversation
szehon-ho
left a comment
Thanks, I think this is a great PR! Test coverage can be improved for various cases, but functionally it's a good change. For example:
- INSERT OVERWRITE with PartitionOverwriteMode.DYNAMIC + schema evolution
- Case-insensitive column name matching
- Static partition overwrite with schema evolution
- Table without AUTOMATIC_SCHEMA_EVOLUTION capability should no-op
etc
szehon-ho
left a comment
This looks good to me!
Suggestion: add tests like:
- type evolution
- 2-level structs
- non-partitioned table
- constraints

Also, do we run the same tests via the DataFrame API? (I think we only test with SQL?)
szehon-ho
left a comment
lgtm! cc @aokolnychyi @cloud-fan
I meant to reply earlier to this:

> For the DataFrame API: Spark doesn't actually provide a way to enable schema evolution via the DataFrame API, so I've left that out for now. Adding it would require more discussion: Delta (and Iceberg) do it via a writer option

No worries, I took another look and the test coverage looks good in the latest PR.
Yeah, makes sense, I think it can be a follow-up.
Sorry, maybe we can ignore that; I was thinking of the other case, which was more applicable when the source has fewer columns (checking whether filling in NULLs violates the constraints), but you are right, it doesn't apply here.
Yes, I also realized after I typed it; I forgot that there is no mergeSchema option for normal inserts. It'd be nice at some point, but definitely a follow-up.
@szehon-ho @johanl-db I'd say we should get #54704 in first to reduce the scope of this PR and simplify review.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
```diff
@@ -3642,7 +3648,8 @@ class Analyzer(
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
     _.containsPattern(COMMAND), ruleId) {
   case v2Write: V2WriteCommand
```
@johanl-db @szehon-ho, can you folks explain the relation between skipSchemaEvolution via ACCEPT_ANY_SCHEMA and automatic schema evolution via AUTOMATIC_SCHEMA_EVOLUTION? Are these two mutually exclusive? Or can they co-exist? MERGE vs INSERT?
MERGE with `ACCEPT_ANY_SCHEMA` on a normal DSv2 data source breaks today, as it relies on an external rule to resolve the merge.
I think INSERT already works with `ACCEPT_ANY_SCHEMA`, and this would be another mode. Probably should be mutually exclusive?
Discussed with @aokolnychyi this morning: `AUTOMATIC_SCHEMA_EVOLUTION` and `ACCEPT_ANY_SCHEMA` are not exclusive:
- `AUTOMATIC_SCHEMA_EVOLUTION` allows the rule `ResolveSchemaEvolution` to trigger.
- `ACCEPT_ANY_SCHEMA` skips some resolution steps in Spark, under the assumption that the connector will handle them. In particular:
  - For INSERT: skips schema alignment in `ResolveOutputRelation`
  - For MERGE: skips clause resolution in the Analyzer, and skips schema alignment in `ResolveRowLevelCommandAssignments`

At least, that's how Spark applies these capabilities today, even though the name `ACCEPT_ANY_SCHEMA` suggests more.
The connector can choose to set either depending on the resolution flow that suits it.
For example, Delta today always handles schema evolution itself (doesn't set `AUTOMATIC_SCHEMA_EVOLUTION`) and does resolution / schema alignment (sets `ACCEPT_ANY_SCHEMA`).
As Delta moves to DSv2, my plan is to have two phases:
1. Delta sets both `AUTOMATIC_SCHEMA_EVOLUTION` and `ACCEPT_ANY_SCHEMA`: Spark handles schema evolution, but Delta takes over to do the resolution of MERGE clauses initially, and then do schema alignment for both INSERT and MERGE.
2. Delta sets only `AUTOMATIC_SCHEMA_EVOLUTION`: once we've reconciled all behavior differences between how Delta and Spark do schema alignment today, we hand over schema alignment to Spark. This will require substantial effort, and careful breaking changes (if at all possible) in Delta.
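To make the split concrete, here is a minimal self-contained Scala sketch of the two capabilities answering two independent questions. This is not Spark's actual API: `Table`, `Capability`, and the two decision functions are hypothetical stand-ins; only the capability names come from the discussion above.

```scala
// Simplified model: the two capabilities are orthogonal switches, not modes.
sealed trait Capability
case object AutomaticSchemaEvolution extends Capability // models AUTOMATIC_SCHEMA_EVOLUTION
case object AcceptAnySchema extends Capability          // models ACCEPT_ANY_SCHEMA

final case class Table(name: String, capabilities: Set[Capability])

// Question 1: may Spark evolve the table schema (add new columns) for this write?
def schemaEvolutionAllowed(t: Table): Boolean =
  t.capabilities.contains(AutomaticSchemaEvolution)

// Question 2: should Spark skip schema alignment, trusting the connector to handle it?
def skipSchemaAlignment(t: Table): Boolean =
  t.capabilities.contains(AcceptAnySchema)

// Phase 1 of the Delta plan: both capabilities set.
val phase1 = Table("delta_phase1", Set(AutomaticSchemaEvolution, AcceptAnySchema))
// Phase 2: only AUTOMATIC_SCHEMA_EVOLUTION; Spark does schema alignment itself.
val phase2 = Table("delta_phase2", Set(AutomaticSchemaEvolution))

assert(schemaEvolutionAllowed(phase1) && skipSchemaAlignment(phase1))
assert(schemaEvolutionAllowed(phase2) && !skipSchemaAlignment(phase2))
```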
```scala
object ResolveSchemaEvolution extends Rule[LogicalPlan] with Logging {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
    _.containsPattern(COMMAND), ruleId) {
```
I do agree about `_.containsPattern`, but what about `ruleId`? Does it mean we will now mark this rule as ineffective after the first pass? Will this potentially break MERGE, which relies on some, but not full, resolution before the schema evolution kicks in?
@szehon-ho, can you check?
Removed `ruleId`, this was very likely wrong.
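To illustrate the concern, here is a simplified, self-contained model of fixed-point rule execution with effectiveness tracking. It is not Spark's actual `RuleExecutor`; `Plan`, `evolveRule`, and the driver loop are hypothetical stand-ins showing how disabling a rule after one no-op pass can starve a rule whose prerequisites are resolved later.

```scala
// A toy plan: schema evolution can only fire once inputs are resolved,
// like the rule above waiting on partial MERGE resolution.
final case class Plan(resolvedInputs: Boolean, evolved: Boolean)

def evolveRule(p: Plan): Plan =
  if (p.resolvedInputs && !p.evolved) p.copy(evolved = true) else p

// With a ruleId, the driver can mark a rule "ineffective" after a pass
// where it changed nothing, and then skip it -- even though a later pass
// (after other rules made progress) would have changed the plan.
def runWithIneffectiveTracking(start: Plan): Plan = {
  var plan = start
  var evolveDisabled = false
  for (pass <- 1 to 3) {
    if (!evolveDisabled) {
      val next = evolveRule(plan)
      if (next == plan) evolveDisabled = true // marked ineffective too early
      plan = next
    }
    // Some other rule resolves the inputs, but only after the first attempt.
    plan = plan.copy(resolvedInputs = true)
  }
  plan
}

val result = runWithIneffectiveTracking(Plan(resolvedInputs = false, evolved = false))
// evolveRule was disabled on pass 1, before its prerequisite held:
assert(!result.evolved)
```

Dropping the rule ID keeps the rule eligible on every pass, at the cost of re-running it, which is the safe choice here.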
This is getting very close, primarily minor suggestions. Worried about the rule ID.
aokolnychyi
left a comment
Looks good to me except some minor points.
Thanks, @johanl-db! Merged to master.
### What changes were proposed in this pull request?
Follow-up cleanup after [merge / V2 write schema evolution refactors](#54488):
- Remove unused `DataTypeUtils.extractAllFieldPaths` and `extractLeafFieldPaths` (no call sites).
- Update `Analyzer` comments that still referred to the removed rule name `ResolveMergeIntoTableSchemaEvolution`; the rule is now `ResolveSchemaEvolution`.
- Clarify the `MergeIntoTable.schemaEvolutionReady` comment: `isSchemaEvolutionCandidate` lives on the companion object and is private.

### Why are the changes needed?
Avoid misleading references to deleted APIs and dead utility code.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests; change is comment + unused API removal only.

Closes #54930 from szehon-ho/SPARK-55690-fix-dead-refs.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…6319)

## Description
Support for schema evolution in INSERT landed in Spark master: apache/spark#54488

A few changes are required to maintain compatibility in Delta:
- V2Write commands (AppendData, OverwriteByExpression, ...) now have an extra parameter `withSchemaEvolution` and introduce a few methods to implement for schema evolution in V2Write commands (e.g. `writePrivileges`).
- Spark doesn't set Delta's writer option "mergeSchema" anymore, which was used as a workaround until `withSchemaEvolution` was introduced on write plan nodes. Instead, Spark now sets `withSchemaEvolution` directly, and a pre-resolution rule `PropagateSchemaEvolutionWriteOption` is added in Delta to set the writer option `mergeSchema` when `withSchemaEvolution=true`, if it's not already explicitly set by the user.

## How was this patch tested?
Existing tests

## Does this PR introduce _any_ user-facing changes?
No
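The option-propagation step described above can be sketched as follows. This is a simplified, self-contained model, not Delta's actual `PropagateSchemaEvolutionWriteOption` rule: `WriteNode` is a hypothetical stand-in for the real V2 write plan nodes.

```scala
// Toy write node carrying the plan-level flag and connector writer options.
final case class WriteNode(withSchemaEvolution: Boolean, options: Map[String, String])

// Set mergeSchema=true when the plan requests schema evolution,
// unless the user already set mergeSchema explicitly.
def propagateSchemaEvolutionOption(w: WriteNode): WriteNode =
  if (w.withSchemaEvolution && !w.options.contains("mergeSchema"))
    w.copy(options = w.options + ("mergeSchema" -> "true"))
  else w

// WITH SCHEMA EVOLUTION requested, no explicit user option: option is injected.
val injected = propagateSchemaEvolutionOption(WriteNode(withSchemaEvolution = true, Map.empty))
assert(injected.options("mergeSchema") == "true")

// An explicit user setting wins over the plan flag.
val userSet = propagateSchemaEvolutionOption(
  WriteNode(withSchemaEvolution = true, Map("mergeSchema" -> "false")))
assert(userSet.options("mergeSchema") == "false")
```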
What changes were proposed in this pull request?
Adds support for schema evolution during INSERT operations (AppendData, OverwriteByExpression, OverwritePartitionsDynamic)
When the table reports capability `AUTOMATIC_SCHEMA_EVOLUTION`, a new analyzer rule `ResolveInsertSchemaEvolution` collects new columns and nested fields present in the source query but not in the table schema, and adds them to the target table by calling `catalog.alterTable()`. Identifying new columns/fields respects the resolution semantics of INSERT operations: matching fields by-name vs. by-position.
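As an illustration of the by-name collection step, here is a self-contained Scala sketch. It is a simplified model, not the actual rule: `Field`/`Struct` are hypothetical stand-ins for Spark's `StructType`, and case-insensitive matching is assumed for the example.

```scala
// Minimal schema model: a type is either atomic or a struct of named fields.
sealed trait DType
case object Atomic extends DType
final case class Struct(fields: List[Field]) extends DType
final case class Field(name: String, dataType: DType)

// By-name matching (case-insensitive here): return the paths of source
// fields absent from the target schema, recursing into nested structs.
def newFieldPaths(source: Struct, target: Struct,
                  prefix: List[String] = Nil): List[List[String]] =
  source.fields.flatMap { sf =>
    target.fields.find(_.name.equalsIgnoreCase(sf.name)) match {
      case None => List(prefix :+ sf.name) // brand-new column / nested field
      case Some(tf) => (sf.dataType, tf.dataType) match {
        case (s: Struct, t: Struct) => newFieldPaths(s, t, prefix :+ sf.name)
        case _ => Nil // existing leaf field: nothing to add
      }
    }
  }

val target = Struct(List(
  Field("id", Atomic),
  Field("info", Struct(List(Field("a", Atomic))))))
val source = Struct(List(
  Field("id", Atomic),
  Field("info", Struct(List(Field("a", Atomic), Field("b", Atomic)))),
  Field("extra", Atomic)))

// The rule would then add these fields via catalog.alterTable():
assert(newFieldPaths(source, target) == List(List("info", "b"), List("extra")))
```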
This builds on previous work from @szehon-ho, in particular #51698.
The first two commits move this previous code around to reuse it; the core of the implementation is in the third commit.
Why are the changes needed?
The `WITH SCHEMA EVOLUTION` syntax for SQL inserts was added recently: #53732. This PR actually implements schema evolution behind that syntax.

Does this PR introduce any user-facing change?

Yes, when the `WITH SCHEMA EVOLUTION` clause is specified in SQL INSERT operations, new columns and nested fields in the source data will be added to the target table, assuming the data source supports schema evolution (capability `AUTOMATIC_SCHEMA_EVOLUTION`).
Added basic testing in `DataSourceV2SQLSuite`.

Integrated with Delta and ran the extensive Delta test harness for schema evolution against this implementation.
See delta-io/delta#6140. There are a number of expected failures for tests that would need to be updated on the Delta side (different error class returned, negative tests checking something specifically doesn't work when a fix is disabled, ...).