[SPARK-52991][SQL] Implement MERGE INTO with SCHEMA EVOLUTION for V2 Data Source #51698
szehon-ho wants to merge 16 commits into apache:master
Conversation
This is some code needed to handle the UpdateStar and InsertStar cases.
The star is expanded to the whole target schema (mergedSchema), which would not work when the source has fewer fields than the target.
For example, MERGE INTO target USING source where target = (a int) and source = (b int).
The merged target schema should be (a, b), but UPDATE * should be expanded to just one assignment for s.b, and not s.a.
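A minimal sketch of this resolution (plain Python for illustration, not Spark internals): UPDATE * generates assignments only for target columns that actually exist in the source; columns missing from the source are left for later rules to fill.

```python
# Illustrative sketch: expand UPDATE * against the merged target schema,
# skipping target columns that have no matching source column.
def expand_update_star(target_cols, source_cols):
    # Case-insensitive lookup, mimicking the default SQL resolver.
    lowered = {c.lower(): c for c in source_cols}
    return [(t, f"source.{lowered[t.lower()]}")
            for t in target_cols if t.lower() in lowered]

# Merged target schema is (a, b) but the source only has b:
assignments = expand_update_star(["a", "b"], ["b"])
# only one assignment is produced, for column b
```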
shall we issue an alterTable request to change the table schema?
Changed to Spark generating tableChanges and calling alterTable.
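Conceptually, the rule diffs the source schema against the target schema and turns each missing column into an "add column" change to pass to the catalog. A hypothetical sketch (names are illustrative, not Spark's TableChange API):

```python
# Sketch: compute the add-column changes that schema evolution would
# send to catalog.alterTable(), given simple name -> type schemas.
def add_column_changes(target_schema, source_schema):
    return [(name, dtype) for name, dtype in source_schema.items()
            if name not in target_schema]

changes = add_column_changes({"a": "int"}, {"a": "int", "b": "string"})
# one change: add column b of type string
```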
I thought UPDATE * should assign a value to each column in the target table. What are the semantics if some columns are omitted? Do they remain unchanged?
Yeah, it's a bit of a new feature for Spark MERGE INTO. Delta Lake MERGE INTO supports this case and sets the new value to null.
https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution
Actually, I am adding this support for top-level field assignments here; we still don't have support for struct types where the source has fewer struct fields (as I mention in the PR description).
Do you think I should do it in a separate PR?
For columns not present in the source table, the value is null?
Yes, inspired by the current behavior of Delta Lake: https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution
Addressed @cloud-fan's comments here and offline.
nit: given we are already resolving column names here, we can skip the use of UnresolvedAttribute:

```scala
val assignments = targetTable.output.flatMap { targetAttr =>
  sourceTable.output
    .find(sourceAttr => conf.resolver(sourceAttr.name, targetAttr.name))
    .map(Assignment(targetAttr, _))
}
```
Can we add a comment to mention where we will handle the missing columns?
nit: to make the code more extensible, we shouldn't do the check here, but put it in the default case match:

```scala
case _ if current == newType => Array.empty
case _ => throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError ...
```
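The shape of that restructure can be sketched in plain Python: the equality check and the incompatibility error both live in the default branches, so any future compatible-change cases (the widening case and error message below are assumptions for illustration) can be added above them without touching the fallthrough logic.

```python
# Sketch of the suggested match structure: compatible-change cases first,
# then "no change needed", then the incompatible-schema error as the default.
def type_changes(current, new):
    if current == "int" and new == "long":
        return [f"widen {current} -> {new}"]  # hypothetical compatible change
    if current == new:
        return []                             # identical types: no change
    raise ValueError(f"failed to merge incompatible types: {current} vs {new}")
```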
We should pass the user-specified table name instead of the v2-source-defined name. CatalogV2Implicits.IdentifierHelper can help here: i.toQualifiedNameParts(c.name)
@cloud-fan I was a bit back and forth on whether to do this by default. The change would make more cases pass that used to fail.
I was thinking some data sources already depend on the old behavior: https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution
The behavior without schema evolution is to throw the error.
Let me know, though, if I should do it in all cases.
This is a good point. I think it's clearer to always do it, but this means v2 sources need to handle more cases now, so the error message may get worse if the behavior is undefined for those existing v2 sources.
Oh, actually it doesn't matter. V2 sources that support custom schema evolution will likely use the ACCEPT_ANY_SCHEMA capability, and this whole MERGE clause resolution code path is skipped for ACCEPT_ANY_SCHEMA.
So the question is: does the row-level operation framework support missing columns in MERGE INSERT/UPDATE clauses?
Yes, the later rule ResolveRowLevelCommandAssignment will already add the missing columns for INSERT/UPDATE. It already does that to handle the non-STAR case (insert or update only a subset of columns).
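The division of labor described here can be sketched (plain Python, not the actual rule): after star expansion drops columns missing from the source, ResolveRowLevelCommandAssignment-style logic pads the assignment list back out to the full target schema, filling the gaps with NULL.

```python
# Sketch: fill target columns that received no assignment with NULL,
# as the later resolution rule does for partial INSERT/UPDATE clauses.
def fill_missing(target_cols, assignments):
    assigned = {c.lower(): v for c, v in assignments}
    return [(c, assigned.get(c.lower(), "NULL")) for c in target_cols]

# Star expansion only produced an assignment for b; a is padded with NULL.
filled = fill_missing(["a", "b"], [("b", "source.b")])
```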
Then I feel it's better to resolve * to available source columns only, for all cases.
Sounds good, let's do it. Updated.
Thanks, merging to master!
…l` and add a new test

### What changes were proposed in this pull request?
Minor follow up for #51698
1. Small optimization for the MergeIntoTable node (aokolnychyi noticed post-commit that the new state can be calculated once using `lazy val`, as each rule copies the node, so the state never changes).
2. Relocate `left`, `right`, `withNewChildrenInternal`.
3. Add a test to validate that default values work in the schema evolution case.

### Why are the changes needed?
Small analysis performance improvement and increased test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Run existing tests; the patch also adds another test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52362 from szehon-ho/merge_schema_evolution_follow.
Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… field than target

### What changes were proposed in this pull request?
Support MERGE INTO where the source has fewer fields than the target. This is already partially supported as part of #51698, but only for top-level fields. This supports it even for nested fields (structs, including within other structs, arrays, and maps).

This patch modifies the MERGE INTO assignment to reuse existing logic in TableOutputResolver to resolve empty values in structs to null or default. UPDATE can also benefit from this, but we can do it in a subsequent PR.

### Why are the changes needed?
For cases where the source has fewer fields than the target in MERGE INTO, it should behave more gracefully (inserting null values where the source field does not exist).

### Does this PR introduce _any_ user-facing change?
No, only that this scenario used to fail and will now pass. This is gated on a new flag, "spark.sql.merge.source.nested.type.coercion.enabled", enabled by default.

### How was this patch tested?
Add unit test to MergeIntoTableSuiteBase

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52347 from szehon-ho/nested_merge_round_3.
Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nced columns

### What changes were proposed in this pull request?
Change MERGE INTO schema evolution scope. Limit the scope of schema evolution to only add columns/nested fields that exist in the source and which are directly assigned from the source column without transformation, i.e.:
```
UPDATE SET new_col = source.new_col
UPDATE SET struct.new_field = source.struct.new_field
INSERT (old_col, new_col) VALUES (s.old_col, s.new_col)
```

### Why are the changes needed?
#51698 added schema evolution support for MERGE INTO statements. However, it is a bit too broad. In some instances, the source table may have many more fields than the target table, but the user may only need a few new ones to be added to the target for the MERGE INTO statement.

### Does this PR introduce _any_ user-facing change?
No, MERGE INTO schema evolution is not yet released in Spark 4.1.

### How was this patch tested?
Added many unit tests in MergeIntoTableSuiteBase

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52866 from szehon-ho/merge_schema_evolution_limit_cols.
Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ion, OverwritePartitionsDynamic

### What changes were proposed in this pull request?
Adds support for schema evolution during INSERT operations (AppendData, OverwriteByExpression, OverwritePartitionsDynamic).

When the table reports the capability `AUTOMATIC_SCHEMA_EVOLUTION`, a new analyzer rule `ResolveInsertSchemaEvolution` collects new columns and nested fields present in the source query but not in the table schema, and adds them to the target table by calling `catalog.alterTable()`.

Identifying new columns/fields respects the resolution semantics of INSERT operations: matching fields by name vs. by position.

This builds on previous work from szehon-ho, in particular #51698. The first two commits move this previous code around to reuse it; the core of the implementation is in the [third commit](7be9d2a).

### Why are the changes needed?
The `WITH SCHEMA EVOLUTION` syntax for SQL inserts was added recently: #53732. This actually implements schema evolution behind that syntax.

### Does this PR introduce _any_ user-facing change?
Yes, when the `WITH SCHEMA EVOLUTION` clause is specified in SQL INSERT operations, new columns and nested fields in the source data will be added to the target table, assuming the data source supports schema evolution (capability AUTOMATIC_SCHEMA_EVOLUTION):
```
CREATE TABLE target (id INT);
INSERT INTO target VALUES (1);
INSERT WITH SCHEMA EVOLUTION INTO target SELECT 2 AS id, "two" AS value;
SELECT * FROM target;
| id | value |
|----|-------|
| 1  | null  |
| 2  | "two" |
```

### How was this patch tested?
Added basic testing in `DataSourceV2SQLSuite`. Integrated with Delta and ran the extensive Delta test harness for schema evolution against this implementation; see delta-io/delta#6140. A number of expected failures are for tests that would need to be updated on the Delta side (different error class returned, negative tests checking that something specifically doesn't work if a fix is disabled, ...).

Closes #54488 from johanl-db/dsv2-schema-evolution-insert.
Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Anton Okolnychyi <aokolnychyi@apache.org>
…tion in MERGE

### What changes were proposed in this pull request?
Fixes a small bug in the [initial implementation of schema evolution in MERGE](#51698): when a nested struct field present in the source is used aliased in a direct assignment clause in MERGE, it is not correctly considered for schema evolution. Example:
```
source.mergeInto("target", condition)
  .whenMatched()
  .update(Map("info" -> col("source.info").as("info")))
  .withSchemaEvolution()
  .merge()
```
where `info` is a struct that contains an extra field in the source compared to the target. Without this fix, the extra field is ignored during schema evolution and isn't added to the target table. With the fix, it is correctly added to the target schema.

### How was this patch tested?
Added a test that reproduces the issue.

Closes #54891 from johanl-db/dsv2-schema-evolution-merge-alias.
Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Add support for schema evolution for data sources that support MERGE INTO, currently V2 DataSources. This means that if the SOURCE table of the merge has a different schema than the TARGET table, the TARGET table can automatically be updated to take into account the new or different fields.
The basic idea: for any new field at the top level or in a nested struct, Spark will add the field to the end of the target schema.
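A simplified sketch of this append-at-the-end merge (plain Python with a (name, type, children) tuple standing in for StructType; not Spark's actual implementation):

```python
# Sketch: merge the source schema into the target schema, appending
# fields that exist only in the source, recursing into nested structs.
def merge_schemas(target, source):
    src = {name.lower(): (name, dtype, kids) for name, dtype, kids in source}
    merged = []
    for name, dtype, kids in target:
        match = src.get(name.lower())
        if match and dtype == "struct" and match[1] == "struct":
            # New nested fields are appended inside the struct, too.
            merged.append((name, dtype, merge_schemas(kids, match[2])))
        else:
            merged.append((name, dtype, kids))
    tgt_names = {name.lower() for name, _, _ in target}
    # New fields from the source go to the end of the target schema.
    merged += [f for f in source if f[0].lower() not in tgt_names]
    return merged

target = [("id", "int", []), ("info", "struct", [("x", "int", [])])]
source = [("info", "struct", [("x", "int", []), ("y", "int", [])]),
          ("extra", "string", [])]
evolved = merge_schemas(target, source)
# id is kept, info gains nested field y, and extra is appended at the end
```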
TODOS:
Example: this will only work if the user specifies a value explicitly for the new nested field t.b for INSERT and UPDATE, and not if they use INSERT * or UPDATE SET *.
We can take this in a follow-on PR.
Why are the changes needed?
#45748 added the syntax 'WITH SCHEMA EVOLUTION' to 'MERGE INTO'. However, it requires an external Spark extension to resolve the merge, and doesn't do anything in Spark's native MERGE implementation.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added many tests to MergeIntoTableSuiteBase
Was this patch authored or co-authored using generative AI tooling?
No