[SPARK-54971] Add WITH SCHEMA EVOLUTION syntax for SQL INSERT#53732
[SPARK-54971] Add WITH SCHEMA EVOLUTION syntax for SQL INSERT#53732longvu-db wants to merge 7 commits intoapache:masterfrom
Conversation
JIRA Issue Information=== Improvement SPARK-54971 === This comment was automatically generated by GitHub Actions |
| if (!i.overwrite) { | ||
| if (isByName) { | ||
| AppendData.byName(r, query) | ||
| AppendData.byName( |
There was a problem hiding this comment.
why not properly model the schemaEvolution flag in AppendData/ etc?
There was a problem hiding this comment.
@szehon-ho You mean we should add a flag schemaEvolution in place of writeOptions = schemaEvolutionWriteOption in v2 Write nodes, and add the schemaEvolutionWriteOption in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L61 if the schemaEvolution flag is enabled? Do you have an example?
There was a problem hiding this comment.
Having a dedicated flag would be cleaner: mergeSchema is fairly overloaded, e.g. it can also be a read option in Parquet that means inferring a superset schema from multiple Parquet files being read.
But: dataframe operations have always been using mergeSchema to enable schema evolution in Delta and Iceberg. By reusing the mergeSchema option, we automatically get schema evolution working there.
If we introduce a new field, then until Delta/Iceberg pick it up, WITH SCHEMA EVOLUTION will essentially be ignored - not good.
I would use mergeSchema for now, we can still introduce a dedicated field later if we want to
|
I was thinking it can be interesting to have Spark optionally call alterTable , if the V2 data source has TableCapability.AUTOMATIC_SCHEMA_EVOLUTION (which we introduced when doing MERGE INTO schema evolution implementation in DSV2). That will ease the burden on the data sources. But it can be a future enhancement. |
| if (!i.overwrite) { | ||
| if (isByName) { | ||
| AppendData.byName(r, query) | ||
| AppendData.byName( |
There was a problem hiding this comment.
Having a dedicated flag would be cleaner: mergeSchema is fairly overloaded, e.g. it can also be a read option in Parquet that means inferring a superset schema from multiple Parquet files being read.
But: dataframe operations have always been using mergeSchema to enable schema evolution in Delta and Iceberg. By reusing the mergeSchema option, we automatically get schema evolution working there.
If we introduce a new field, then until Delta/Iceberg pick it up, WITH SCHEMA EVOLUTION will essentially be ignored - not good.
I would use mergeSchema for now, we can still introduce a dedicated field later if we want to
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
Show resolved
Hide resolved
| } | ||
| } | ||
|
|
||
| test("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") { |
There was a problem hiding this comment.
To cover the first
case InsertIntoStatement(l @ LogicalRelationWithTable(_: InsertableRelation, _),
parts, _, query, overwrite, false, _) if parts.isEmpty =>
parts, _, query, overwrite, false, _, withSchemaEvolution)
if parts.isEmpty && !withSchemaEvolution =>
| } | ||
| } | ||
|
|
||
| testPartitionedTable("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") { |
There was a problem hiding this comment.
To cover the 2nd case
case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t: HadoopFsRelation, table),
parts, _, query, overwrite, _, _, withSchemaEvolution)
if query.resolved && !withSchemaEvolution =>
| validatePartitionSpec(partCols, i.partitionSpec) | ||
|
|
||
| val schemaEvolutionWriteOption: Map[String, String] = | ||
| if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty |
There was a problem hiding this comment.
why not add a new bool field to AppendData, like what we did for InsertIntoStatement? The MergeIntoTable also has a withSchemaEvolution flag.
There was a problem hiding this comment.
Hey @cloud-fan, thank you very much for your review! We also raised this point and this was our discussion on it
|
@johanl-db : A big question is where we should apply schema evolution. For MERGE INTO, @szehon-ho did a lot of work to do schema evolution within Spark, so that the behavior is controlled by Spark and all data source will have the same behavior. From this PR, it seems the data source need to implement INSERT schema evolution by its own? |
DSv1 data sources still need to do schema evolution themselves, this doesn't change with this PR. This is somewhat orthogonal to this PR that focuses on adding dedicated SQL syntax, but doesn't intend to extend schema evolution support beyond what exists today (i.e. mostly provided by DSv1 data source implementation themselves) |
|
thanks for the explanation, merging to master! |
### What changes were proposed in this pull request? This PR introduces two new SQL syntaxes for the `INSERT` command: - `INSERT INTO ... REPLACE ON <condition>` — replaces rows matching a condition - `INSERT INTO ... REPLACE USING (<columns>)` — replaces rows based on matching column values Similar to the [INSERT WITH SCHEMA EVOLUTION PR](apache#53732), Spark is only responsible for recognizing these syntaxes. Since no table format in open-source Spark implements these operations yet, users will receive an unsupported error if they try to use them. Data sources (e.g., Delta Lake) can implement support for these syntaxes by handling the `replaceCriteriaOpt` field in `InsertIntoStatement`. ### Why are the changes needed? `INSERT INTO ... REPLACE ON/USING` provides SQL syntax for atomically replacing a subset of rows in a table — a common pattern for incremental data loading. This builds on the existing `INSERT INTO ... REPLACE WHERE` syntax (SPARK-40956) and extends it with more flexible matching semantics: - `REPLACE ON` allows matching via arbitrary boolean expressions (e.g., `t.id = s.id`) - `REPLACE USING` allows matching via a list of column names ### Does this PR introduce _any_ user-facing change? Yes. Two new SQL syntaxes are recognized by the parser: - `INSERT [WITH SCHEMA EVOLUTION] INTO table AS alias [BY NAME] REPLACE ON condition query` - `INSERT [WITH SCHEMA EVOLUTION] INTO table AS alias [BY NAME] REPLACE USING (col1, col2) query` Both currently throw `UNSUPPORTED_INSERT_REPLACE_ON` / `UNSUPPORTED_INSERT_REPLACE_USING`. ### How was this patch tested? - DDLParserSuite: Parser tests for REPLACE USING, REPLACE ON, and combined WITH SCHEMA EVOLUTION - PlanResolutionSuite: V2 table unsupported error tests - InsertSuite (core): V1 table unsupported error tests - InsertSuite (hive): Hive table unsupported error tests ### Was this patch authored or co-authored using generative AI tooling? Yes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ion, OverwritePartitionsDynamic ### What changes were proposed in this pull request? Adds support for schema evolution during INSERT operations (AppendData, OverwriteByExpression, OverwritePartitionsDynamic) When the table reports capability `AUTOMATIC_SCHEMA_EVOLUTION`, a new analyzer rule `ResolveInsertSchemaEvolution` collects new columns and nested fields present in the source query but not in the table schema, and adds them to the target table by calling `catalog.alterTable()` Identifying new columns/fields respects the resolution semantics of INSERT operations: matching fields by-name vs by-position. This builds on previous from szehon-ho , in particular #51698. The first two commits move this previous code around to reuse it, the core of the implementation is in the [third commit](7be9d2a). ### Why are the changes needed? The `WITH SCHEMA EVOLUTION` syntax for SQL inserts was added recently: #53732. This actually implements schema evolution behind this syntax. ### Does this PR introduce _any_ user-facing change? Yes, when the `WITH SCHEMA EVOLUTION` clause is specified in SQL INSERT operations, new columns and nested fields in the source data will be added to the target table - assuming the data source supports schema evolution (capability AUTOMATIC_SCHEMA_EVOLUTION): ``` CREATE TABLE target (id INT); INSERT INTO target VALUES (1); INSERT WITH SCHEMA EVOLUTION INTO target SELECT 2 AS id, "two" AS value; SELECT * FROM target; | id | value | |----|-------| | 1 | null | | 2 | "two" | ``` ### How was this patch tested? Added basic testing in `DataSourceV2SQLSuite`. Integrated with Delta and ran extensive Delta test harness for schema evolution against this implementation. See delta-io/delta#6140. A number of expected failures for tests that would need to be updated on Delta side (different error class returned, negative tests checking something specifically doesn't work if a fix is disabled, ..) Closes #54488 from johanl-db/dsv2-schema-evolution-insert. Authored-by: Johan Lasperas <johan.lasperas@databricks.com> Signed-off-by: Anton Okolnychyi <aokolnychyi@apache.org>
This PR introduces two new SQL syntaxes for the `INSERT` command: - `INSERT INTO ... REPLACE ON <condition>` — replaces rows matching a condition - `INSERT INTO ... REPLACE USING (<columns>)` — replaces rows based on matching column values Similar to the [INSERT WITH SCHEMA EVOLUTION PR](apache#53732), Spark is only responsible for recognizing these syntaxes. Since no table format in open-source Spark implements these operations yet, users will receive an unsupported error if they try to use them. Data sources (e.g., Delta Lake) can implement support for these syntaxes by handling the `replaceCriteriaOpt` field in `InsertIntoStatement`. `INSERT INTO ... REPLACE ON/USING` provides SQL syntax for atomically replacing a subset of rows in a table — a common pattern for incremental data loading. This builds on the existing `INSERT INTO ... REPLACE WHERE` syntax (SPARK-40956) and extends it with more flexible matching semantics: - `REPLACE ON` allows matching via arbitrary boolean expressions (e.g., `t.id = s.id`) - `REPLACE USING` allows matching via a list of column names Yes. Two new SQL syntaxes are recognized by the parser: - `INSERT [WITH SCHEMA EVOLUTION] INTO table AS alias [BY NAME] REPLACE ON condition query` - `INSERT [WITH SCHEMA EVOLUTION] INTO table AS alias [BY NAME] REPLACE USING (col1, col2) query` Both currently throw `UNSUPPORTED_INSERT_REPLACE_ON` / `UNSUPPORTED_INSERT_REPLACE_USING`. - DDLParserSuite: Parser tests for REPLACE USING, REPLACE ON, and combined WITH SCHEMA EVOLUTION - PlanResolutionSuite: V2 table unsupported error tests - InsertSuite (core): V1 table unsupported error tests - InsertSuite (hive): Hive table unsupported error tests Yes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
### What changes were proposed in this pull request? This PR introduces two new SQL syntaxes for the `INSERT` command (think `JOIN ON/USING`and `INSERT REPLACE WHERE`): - `INSERT INTO ... REPLACE ON <condition>` — replaces rows matching a condition - `INSERT INTO ... REPLACE USING (<columns>)` — replaces rows based on matching column values Similar to the [INSERT WITH SCHEMA EVOLUTION PR](#53732), Spark is only responsible for recognizing these syntaxes. Since no table format in open-source Spark implements these operations yet, users will receive an unsupported error if they try to use them. ### Why are the changes needed? `INSERT INTO ... REPLACE ON/USING` provides SQL syntax for atomically replacing a subset of rows in a table. This builds on the existing `INSERT INTO ... REPLACE WHERE` syntax ([SPARK-40956](https://issues.apache.org/jira/browse/SPARK-40956) and extends it with more flexible matching semantics: - `REPLACE ON` allows matching via arbitrary boolean expressions (e.g., `t.id = s.id`) - `REPLACE USING` allows matching via a list of column names ### Does this PR introduce _any_ user-facing change? Yes. Two new SQL syntaxes are recognized by the parser: - `INSERT [WITH SCHEMA EVOLUTION] INTO table AS alias [BY NAME] REPLACE ON condition query` - `INSERT [WITH SCHEMA EVOLUTION] INTO table [BY NAME] REPLACE USING (column_list) query` Both currently throw `UNSUPPORTED_INSERT_REPLACE_ON_OR_USING`. ### How was this patch tested? - DDLParserSuite: Parser tests for REPLACE USING, REPLACE ON, and combined WITH SCHEMA EVOLUTION - PlanResolutionSuite: V2 table unsupported error tests - InsertSuite (core): V1 table unsupported error tests - InsertSuite (hive): Hive table unsupported error tests ### Was this patch authored or co-authored using generative AI tooling? Yes. Closes #54722 from longvu-db/insert-replace-on-using. Authored-by: Thang Long VU <long.vu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Similar to the MERGE WITH SCHEMA EVOLUTION PR, this PR introduces a syntax
WITH SCHEMA EVOLUTIONto the SQLINSERTcommand. Since this syntax is not fully implemented for any table formats yet, users will receive an exception if they try to use it.When
WITH SCHEMA EVOLUTIONis specified, schema evolution-related features must be turned on for this single statement and only in this statement.In this PR, Spark is only responsible for recognizing the existence or absence of the syntax WITH SCHEMA EVOLUTION, and the recognition info is passed down from the
Analyzer. WhenWITH SCHEMA EVOLUTIONis detected, Spark sets themergeSchemawrite option totruein the respective V2 Insert Command nodes.Data sources must respect the syntax and give appropriate reactions: Turn on features that are categorised as "schema evolution" when the
WITH SCHEMA EVOLUTIONSyntax exists.Why are the changes needed?
This intuitive SQL Syntax allows the user to specify Automatic Schema Evolution for a specific
INSERToperation.Some users would like Schema Evolution for DML commands like
MERGE,INSERT,... where the schema between the table and query relations can mismatch.Does this PR introduce any user-facing change?
Yes, Introducing the SQL Syntax
WITH SCHEMA EVOLUTIONto SQLINSERT.How was this patch tested?
Added UTs.
Was this patch authored or co-authored using generative AI tooling?
No.