common/utils/src/main/resources/error/error-conditions.json (6 additions, 0 deletions)

@@ -7117,6 +7117,12 @@
},
"sqlState" : "42809"
},
"UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION" : {
"message" : [
"INSERT WITH SCHEMA EVOLUTION ... is unsupported for this table relation."
],
"sqlState" : "0A000"
},
"UNSUPPORTED_JOIN_TYPE" : {
"message" : [
"Unsupported join type '<typ>'. Supported join types include: <supported>."

@@ -560,9 +560,9 @@ query
;

insertInto
-    : INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
-    | INSERT INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
+    : INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
+    | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
+    | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;
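
For illustration, a few statements the extended rule now accepts, issued through the Scala API (table names t and s are hypothetical):

    // Each call exercises one of the (WITH SCHEMA EVOLUTION)? alternatives above.
    spark.sql("INSERT WITH SCHEMA EVOLUTION INTO t SELECT * FROM s")
    spark.sql("INSERT WITH SCHEMA EVOLUTION INTO t BY NAME SELECT * FROM s")
    spark.sql("INSERT WITH SCHEMA EVOLUTION OVERWRITE t SELECT * FROM s")
    spark.sql("INSERT WITH SCHEMA EVOLUTION INTO t REPLACE WHERE id > 0 SELECT * FROM s")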

@@ -1096,7 +1096,7 @@ class Analyzer(

def apply(plan: LogicalPlan)
: LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _, _, _) =>
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).getOrElse(u)
@@ -1231,7 +1231,7 @@
object ResolveInsertInto extends ResolveInsertionBase {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
AlwaysProcess.fn, ruleId) {
-    case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _)
+    case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _, _)
if i.query.resolved =>
// ifPartitionNotExists is append with validation, but validation is not supported
if (i.ifPartitionNotExists) {
@@ -1249,27 +1249,50 @@
val partCols = partitionColumnNames(r.table)
validatePartitionSpec(partCols, i.partitionSpec)

+      val schemaEvolutionWriteOption: Map[String, String] =
+        if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty

  [Review thread]
  cloud-fan (Contributor): why not add a new bool field to AppendData, like
  what we did for InsertIntoStatement? The MergeIntoTable also has a
  withSchemaEvolution flag.

  longvu-db (Author): Hey @cloud-fan, thank you very much for your review! We
  also raised this point; this was our discussion on it: #53732 (comment)

val staticPartitions = i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query), staticPartitions,
isByName)

if (!i.overwrite) {
if (isByName) {
-          AppendData.byName(r, query)
+          AppendData.byName(
+            r,
+            query,
+            writeOptions = schemaEvolutionWriteOption)

  [Review thread]
  szehon-ho (Member): why not properly model the schemaEvolution flag in
  AppendData / etc.?

  longvu-db (Author, Jan 9, 2026): @szehon-ho You mean we should add a
  schemaEvolution flag in place of writeOptions = schemaEvolutionWriteOption in
  the v2 Write nodes, and add the schemaEvolutionWriteOption in
  https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L61
  if the schemaEvolution flag is enabled? Do you have an example?

  Contributor: Having a dedicated flag would be cleaner: mergeSchema is fairly
  overloaded, e.g. it can also be a read option in Parquet that means inferring
  a superset schema from multiple Parquet files being read. But DataFrame
  operations have always used mergeSchema to enable schema evolution in Delta
  and Iceberg, so by reusing the mergeSchema option we automatically get schema
  evolution working there. If we introduce a new field, then until Delta and
  Iceberg pick it up, WITH SCHEMA EVOLUTION would essentially be ignored - not
  good. I would use mergeSchema for now; we can still introduce a dedicated
  field later if we want to.

  Member: Makes sense, thanks.
} else {
-          AppendData.byPosition(r, query)
+          AppendData.byPosition(
+            r,
+            query,
+            writeOptions = schemaEvolutionWriteOption)
}
} else if (conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC) {
if (isByName) {
-          OverwritePartitionsDynamic.byName(r, query)
+          OverwritePartitionsDynamic.byName(
+            r,
+            query,
+            writeOptions = schemaEvolutionWriteOption)
} else {
-          OverwritePartitionsDynamic.byPosition(r, query)
+          OverwritePartitionsDynamic.byPosition(
+            r,
+            query,
+            writeOptions = schemaEvolutionWriteOption)
}
} else {
if (isByName) {
-          OverwriteByExpression.byName(r, query, staticDeleteExpression(r, staticPartitions))
+          OverwriteByExpression.byName(
+            table = r,
+            df = query,
+            deleteExpr = staticDeleteExpression(r, staticPartitions),
+            writeOptions = schemaEvolutionWriteOption)
} else {
-          OverwriteByExpression.byPosition(r, query, staticDeleteExpression(r, staticPartitions))
+          OverwriteByExpression.byPosition(
+            table = r,
+            query = query,
+            deleteExpr = staticDeleteExpression(r, staticPartitions),
+            writeOptions = schemaEvolutionWriteOption)
}
}
}
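
As context for the thread above, a minimal sketch of the DataFrame write path the reviewers reference, assuming a Delta table (table names are hypothetical):

    // DataFrame writes have long passed "mergeSchema" as a write option, which
    // Delta (and Iceberg) interpret as "evolve the target schema on write".
    // The SQL clause now injects the same option, so those sources pick it up
    // without any change on their side.
    spark.table("source")
      .write
      .format("delta")
      .option("mergeSchema", "true")
      .mode("append")
      .saveAsTable("target")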

@@ -254,7 +254,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
// not found first, instead of errors in the input query of the insert command, by doing a
// top-down traversal.
plan.foreach {
-      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
+      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _, _) =>
u.tableNotFound(u.multipartIdentifier)

// TODO (SPARK-27484): handle streaming write commands when we have them.

@@ -875,9 +875,12 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Add an
* {{{
- * INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
- * INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
- * INSERT INTO [TABLE] tableIdentifier REPLACE whereClause
+ * INSERT [WITH SCHEMA EVOLUTION] OVERWRITE
+ *   TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
+ * INSERT [WITH SCHEMA EVOLUTION] INTO
+ *   [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
+ * INSERT [WITH SCHEMA EVOLUTION] INTO
+ *   [TABLE] tableIdentifier REPLACE whereClause
 * INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
 * INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
 * }}}
@@ -906,7 +909,8 @@
query = otherPlans.head,
overwrite = false,
ifPartitionNotExists = insertParams.ifPartitionNotExists,
-          byName = insertParams.byName)
+          byName = insertParams.byName,
+          withSchemaEvolution = table.EVOLUTION() != null)
})
case table: InsertOverwriteTableContext =>
val insertParams = visitInsertOverwriteTable(table)
@@ -923,7 +927,8 @@
query = otherPlans.head,
overwrite = true,
ifPartitionNotExists = insertParams.ifPartitionNotExists,
-          byName = insertParams.byName)
+          byName = insertParams.byName,
+          withSchemaEvolution = table.EVOLUTION() != null)
})
case ctx: InsertIntoReplaceWhereContext =>
val options = Option(ctx.optionsClause())
Expand All @@ -932,10 +937,20 @@ class AstBuilder extends DataTypeAstBuilder
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false)
val deleteExpr = expression(ctx.whereClause().booleanExpression())
val isByName = ctx.NAME() != null
+        val schemaEvolutionWriteOption =
+          if (ctx.EVOLUTION() != null) Map("mergeSchema" -> "true") else Map.empty
if (isByName) {
-          OverwriteByExpression.byName(table, otherPlans.head, deleteExpr)
+          OverwriteByExpression.byName(
+            table,
+            df = otherPlans.head,
+            deleteExpr = deleteExpr,
+            writeOptions = schemaEvolutionWriteOption)
} else {
-          OverwriteByExpression.byPosition(
+            table,
+            query = otherPlans.head,
+            deleteExpr = deleteExpr,
+            writeOptions = schemaEvolutionWriteOption)
}
})
case dir: InsertOverwriteDirContext =>

@@ -173,6 +173,7 @@ case class QualifiedColType(
* Only valid for static partitions.
* @param byName If true, reorder the data columns to match the column names of the
* target table.
+ * @param withSchemaEvolution If true, enables automatic schema evolution for the operation.
*/
case class InsertIntoStatement(
table: LogicalPlan,
@@ -181,7 +182,8 @@ case class InsertIntoStatement(
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
-    byName: Boolean = false) extends UnaryParsedStatement {
+    byName: Boolean = false,
+    withSchemaEvolution: Boolean = false) extends UnaryParsedStatement {

require(overwrite || !ifPartitionNotExists,
"IF NOT EXISTS is only valid in INSERT OVERWRITE")
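
Because the new field defaults to false (like byName), existing call sites compile unchanged; a minimal sketch constructing the node as before (plan and names are hypothetical):

    import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedStar}
    import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Project}

    // Omitting the two trailing parameters still compiles;
    // insert.byName and insert.withSchemaEvolution are both false.
    val insert = InsertIntoStatement(
      table = UnresolvedRelation(Seq("t")),
      partitionSpec = Map.empty,
      userSpecifiedCols = Seq.empty,
      query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("s"))),
      overwrite = false,
      ifPartitionNotExists = false)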

@@ -576,6 +576,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
origin = t.origin)
}

+  def unsupportedInsertWithSchemaEvolution(): Throwable = {
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION",
+      messageParameters = Map.empty)
+  }

def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = {
new AnalysisException(
errorClass = "VIEW_WRITE_NOT_ALLOWED",

@@ -1790,6 +1790,90 @@ class DDLParserSuite extends AnalysisTest {
Literal(5))))
}

+  for {
+    isByName <- Seq(true, false)
+    userSpecifiedCols <- if (!isByName) {
+      Seq(Seq("a", "b"), Seq.empty)
+    } else {
+      Seq(Seq.empty)
+    }
+  } {
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val sourceQuery = "SELECT * FROM source"
+    val userSpecifiedColsClause =
+      if (userSpecifiedCols.isEmpty) "" else userSpecifiedCols.mkString("(", ", ", ")")
+    val testMsg = s"isByName=$isByName, userSpecifiedColsClause=$userSpecifiedColsClause"
+
+    test(s"INSERT INTO with WITH SCHEMA EVOLUTION - $testMsg") {
+      val table = "testcat.ns1.ns2.tbl"
+      val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION INTO $table " +
+        s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+      parseCompare(
+        sql = insertSQLStmt,
+        expected = InsertIntoStatement(
+          table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+          partitionSpec = Map.empty,
+          userSpecifiedCols = userSpecifiedCols,
+          query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
+          overwrite = false,
+          ifPartitionNotExists = false,
+          byName = isByName,
+          withSchemaEvolution = true)
+      )
+    }
+
+    test(s"INSERT OVERWRITE (static) with WITH SCHEMA EVOLUTION - $testMsg") {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+          SQLConf.PartitionOverwriteMode.STATIC.toString) {
+        val table = "testcat.ns1.ns2.tbl"
+        val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+          s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+        parseCompare(
+          sql = insertSQLStmt,
+          expected = InsertIntoStatement(
+            table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+            partitionSpec = Map.empty,
+            userSpecifiedCols = Seq.empty,
+            query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
+            overwrite = true,
+            ifPartitionNotExists = false,
+            byName = isByName,
+            withSchemaEvolution = true)
+        )
+      }
+    }
+  }
+
+  for (isByName <- Seq(true, false)) {
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val sourceQuery = "SELECT * FROM source"
+    val testMsg = s"isByName=$isByName"
+
+    test(s"INSERT OVERWRITE (dynamic) with WITH SCHEMA EVOLUTION - $testMsg") {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+          SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
+        val table = "testcat.ns1.ns2.tbl"
+        val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+          s"${byNameClause}${sourceQuery}"
+
+        parseCompare(
+          sql = insertSQLStmt,
+          expected = InsertIntoStatement(
+            table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+            partitionSpec = Map.empty,
+            userSpecifiedCols = Seq.empty,
+            query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
+            overwrite = true,
+            ifPartitionNotExists = false,
+            byName = isByName,
+            withSchemaEvolution = true)
+        )
+      }
+    }
+  }

test("delete from table: delete all") {
parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
DeleteFromTable(

@@ -162,7 +162,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))

case InsertIntoStatement(l @ LogicalRelationWithTable(_: InsertableRelation, _),
-        parts, _, query, overwrite, false, _) if parts.isEmpty =>
+        parts, _, query, overwrite, false, _, withSchemaEvolution)
+        if parts.isEmpty && !withSchemaEvolution =>
InsertIntoDataSourceCommand(l, query, overwrite)

case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -173,9 +174,9 @@

InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)

-    case i @ InsertIntoStatement(
-        l @ LogicalRelationWithTable(t: HadoopFsRelation, table), parts, _, query, overwrite, _, _)
-        if query.resolved =>
+    case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t: HadoopFsRelation, table),
+        parts, _, query, overwrite, _, _, withSchemaEvolution)
+        if query.resolved && !withSchemaEvolution =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
// the user has specified static partitions, we add a Project operator on top of the query
// to include those constant column values in the query result.
@@ -307,11 +308,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false),
_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
_, _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta, options))

case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),
_, _, _, _, _, _) =>
_, _, _, _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case append @ AppendData(

@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(
-        d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _) =>
+        d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
val relation = HadoopFsRelation(
table.fileIndex,

@@ -462,6 +462,10 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
partColNames: StructType,
catalogTable: Option[CatalogTable]): InsertIntoStatement = {

+    if (insert.withSchemaEvolution) {
+      throw QueryCompilationErrors.unsupportedInsertWithSchemaEvolution()
+    }

val normalizedPartSpec = normalizePartitionSpec(
insert.partitionSpec, partColNames, tblName, conf.resolver)

@@ -526,7 +530,8 @@
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved && query.resolved =>
+    case i @ InsertIntoStatement(table, _, _, query, _, _, _, _)
+        if table.resolved && query.resolved =>
table match {
case relation: HiveTableRelation =>
val metadata = relation.tableMeta
@@ -606,7 +611,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case InsertIntoStatement(LogicalRelationWithTable(relation, _), partition,
-          _, query, _, _, _) =>
+          _, query, _, _, _, _) =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
case l: LogicalRelation => l.relation
@@ -635,7 +640,7 @@
messageParameters = Map("relationId" -> toSQLId(relation.toString)))
}

-      case InsertIntoStatement(t, _, _, _, _, _, _)
+      case InsertIntoStatement(t, _, _, _, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
t.isInstanceOf[Range] ||
t.isInstanceOf[OneRowRelation] ||
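
With this check in place, a hypothetical v1 round-trip illustrates the intended behavior: the clause is rejected during analysis rather than silently ignored (table and column names are made up; the error text comes from error-conditions.json above):

    spark.sql("CREATE TABLE v1_target (id INT) USING parquet")
    spark.sql("INSERT WITH SCHEMA EVOLUTION INTO v1_target SELECT 1 AS id")
    // => AnalysisException [UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION]:
    //    INSERT WITH SCHEMA EVOLUTION ... is unsupported for this table relation.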

@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val))], Formatted
-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false, ExtendedMode


-- !query

@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val))], Formatted
-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false, ExtendedMode


-- !query