[SPARK-55903][SQL] Simplify MERGE Schema Evolution and Check Write Privileges #54704
szehon-ho wants to merge 4 commits into apache:master
Conversation
    val writePrivileges = MergeIntoTable.getWritePrivileges(merge)
    catalog.loadTable(ident, writePrivileges.toSet.asJava)
  } catch {
    case e: IllegalArgumentException if !e.isInstanceOf[SparkThrowable] =>
This is to keep in line with AlterTableExec's exception handling, but let me know if it is unnecessary.
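To illustrate the idea behind the snippet above, here is a hedged, self-contained sketch of how MERGE actions might map to DSV2 write privileges. The types (`MergeAction`, `WritePrivilege`) and the `getWritePrivileges` helper are simplified stand-ins, not Spark's actual `MergeIntoTable.getWritePrivileges` implementation: each kind of action contributes the corresponding privilege, and the union is passed to `catalog.loadTable` so the data source can enforce access up front.

```scala
// Hypothetical simplified model, NOT Spark's actual classes.
sealed trait MergeAction
case object InsertAction extends MergeAction
case object UpdateAction extends MergeAction
case object DeleteAction extends MergeAction

sealed trait WritePrivilege
case object Insert extends WritePrivilege
case object Update extends WritePrivilege
case object Delete extends WritePrivilege

// Union of privileges implied by the MERGE's actions.
def getWritePrivileges(actions: Seq[MergeAction]): Set[WritePrivilege] =
  actions.map {
    case InsertAction => Insert
    case UpdateAction => Update
    case DeleteAction => Delete
  }.toSet

// A MERGE with only UPDATE and INSERT actions should not request DELETE.
val privileges = getWritePrivileges(Seq(UpdateAction, InsertAction))
```

The point of requesting the union at `loadTable` time is that a privilege failure surfaces during analysis rather than mid-write.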
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
        notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
      !m.needSchemaEvolution && matchedActions.isEmpty && notMatchedActions.size == 1 &&
I know we talked about this. Just to confirm: removing `!m.needSchemaEvolution` is safe because `aligned` would be false otherwise?
Yes, I also checked with AI here:
Yes. It's safe even without assuming rule order.
Reasoning:
- aligned is defined as: every action’s assignments align with the current targetTable.output (same length, matching attribute names, compatible types).
- Pending schema evolution means the catalog target hasn’t been evolved yet, so in the plan targetTable.output is still the pre‑evolution schema.
- Assignments that refer to new columns or evolved types therefore cannot align with that current target (e.g. different length or incompatible types), so aligned is false whenever there are pending schema changes.
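The reasoning above can be made concrete with a small sketch. The `Assignment` type and `aligned` function below are hypothetical simplifications, not Spark's actual alignment rule: alignment compares each action's assignments against the *current* `targetTable.output`, which is still the pre-evolution schema while evolution is pending, so an assignment to a new column cannot align.

```scala
// Hypothetical simplified model of alignment, NOT Spark's actual rule.
case class Assignment(targetCol: String, valueType: String)

// Aligned: same number of assignments as target columns, each matching
// its column's name and type positionally.
def aligned(targetSchema: Seq[(String, String)], assignments: Seq[Assignment]): Boolean =
  assignments.length == targetSchema.length &&
    targetSchema.zip(assignments).forall { case ((name, tpe), a) =>
      a.targetCol == name && a.valueType == tpe
    }

// Pre-evolution target has only (id, name); the MERGE also assigns a new
// column "age", so lengths differ and aligned is false.
val preEvolution = Seq("id" -> "int", "name" -> "string")
val assignments =
  Seq(Assignment("id", "int"), Assignment("name", "string"), Assignment("age", "int"))

val alignedBefore = aligned(preEvolution, assignments)                     // false
val alignedAfter  = aligned(preEvolution :+ ("age" -> "int"), assignments) // true
```

Under this model, `aligned` only becomes true after the catalog table has actually been evolved, which is exactly why `!m.needSchemaEvolution` is redundant in the guard.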
Thanks for confirming!
    val keyPath = extractFieldPath(assignment.key, allowUnresolved = true)
    // value should always be resolved (from source)
    val valuePath = extractFieldPath(assignment.value, allowUnresolved = false)
    keyPath == valuePath && assignment.value.references.subsetOf(source.outputSet)
Just to double check: assignment.value.references.subsetOf(source.outputSet) works for nested?
Yes. Also, from an AI check, here is how it works:
MERGE ... UPDATE SET addr.city = source.addr.city
- Key: nested path addr.city, e.g. GetStructField(GetStructField(target.addr, ...), ...)
- Value: same path on source, e.g. GetStructField(GetStructField(source.addr, 0), 1)
This may be possible to improve; we'll have to explore it in the next PR.
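To show why the `references.subsetOf(source.outputSet)` check covers nested fields, here is a hedged sketch with a toy expression tree. `Attr` and `GetField` are hypothetical stand-ins for Spark's `AttributeReference` and `GetStructField`: a chain of field extractions bubbles its references up to the base attribute, so a nested value like `source.addr.city` still reduces to the top-level attribute `addr`.

```scala
// Hypothetical simplified expression tree, NOT Spark's Expression API.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr {
  def references: Set[String] = Set(name)
}
case class GetField(child: Expr, field: String) extends Expr {
  // A struct-field access refers to whatever its child refers to.
  def references: Set[String] = child.references
}

// Recover the dotted path (e.g. addr.city) by walking down GetField wrappers,
// analogous to the extractFieldPath helper in the diff above.
def fieldPath(e: Expr): Seq[String] = e match {
  case Attr(n)        => Seq(n)
  case GetField(c, f) => fieldPath(c) :+ f
}

val sourceOutput = Set("addr", "name")     // source's top-level attributes
val value = GetField(Attr("addr"), "city") // value side of addr.city = source.addr.city

val path = fieldPath(value)                        // Seq("addr", "city")
val nestedOk = value.references.subsetOf(sourceOutput)
```

Because `references` collapses the nested access to its base attribute, the subset check against `source.outputSet` behaves the same for `addr.city` as it would for a top-level column.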
Thanks, @szehon-ho! Merged to master.
What changes were proposed in this pull request?
Some simplifications for MERGE INTO schema evolution, plus minor bug fixes.
Why are the changes needed?
Discussed with @aokolnychyi on the state of Spark 4.1 schema evolution, and he suggested these changes, as the code is currently confusing. Not using the write privileges is also wrong.
Does this PR introduce any user-facing change?
Write privileges are now enforced (for any system that uses DSV2 privileges). Error messages are changed.
How was this patch tested?
Ran existing unit tests.
Was this patch authored or co-authored using generative AI tooling?
No