Skip to content

Commit 8995571

Browse files
longvu-db and claude committed
[SPARK-XXXXX][SQL] Add INSERT INTO ... REPLACE ON/USING syntax
This PR introduces two new SQL syntaxes for the `INSERT` command: - `INSERT INTO ... REPLACE ON <condition>` — replaces rows matching a condition - `INSERT INTO ... REPLACE USING (<columns>)` — replaces rows based on matching column values Similar to the [INSERT WITH SCHEMA EVOLUTION PR](apache#53732), Spark is only responsible for recognizing these syntaxes. Since no table format in open-source Spark implements these operations yet, users will receive an unsupported error if they try to use them. Data sources (e.g., Delta Lake) can implement support for these syntaxes by handling the `replaceCriteriaOpt` field in `InsertIntoStatement`. `INSERT INTO ... REPLACE ON/USING` provides SQL syntax for atomically replacing a subset of rows in a table — a common pattern for incremental data loading. This builds on the existing `INSERT INTO ... REPLACE WHERE` syntax (SPARK-40956) and extends it with more flexible matching semantics: - `REPLACE ON` allows matching via arbitrary boolean expressions (e.g., `t.id = s.id`) - `REPLACE USING` allows matching via a list of column names Yes. Two new SQL syntaxes are recognized by the parser: - `INSERT [WITH SCHEMA EVOLUTION] INTO table AS alias [BY NAME] REPLACE ON condition query` - `INSERT [WITH SCHEMA EVOLUTION] INTO table AS alias [BY NAME] REPLACE USING (col1, col2) query` Both currently throw `UNSUPPORTED_INSERT_REPLACE_ON` / `UNSUPPORTED_INSERT_REPLACE_USING`. - DDLParserSuite: Parser tests for REPLACE USING, REPLACE ON, and combined WITH SCHEMA EVOLUTION - PlanResolutionSuite: V2 table unsupported error tests - InsertSuite (core): V1 table unsupported error tests - InsertSuite (hive): Hive table unsupported error tests Yes. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
1 parent 75757e1 commit 8995571

File tree

15 files changed

+299
-19
lines changed

15 files changed

+299
-19
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7847,6 +7847,18 @@
78477847
},
78487848
"sqlState" : "42809"
78497849
},
7850+
"UNSUPPORTED_INSERT_REPLACE_ON" : {
7851+
"message" : [
7852+
"INSERT INTO ... REPLACE ON is not supported for this table format."
7853+
],
7854+
"sqlState" : "0A000"
7855+
},
7856+
"UNSUPPORTED_INSERT_REPLACE_USING" : {
7857+
"message" : [
7858+
"INSERT INTO ... REPLACE USING is not supported for this table format."
7859+
],
7860+
"sqlState" : "0A000"
7861+
},
78507862
"UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION" : {
78517863
"message" : [
78527864
"INSERT WITH SCHEMA EVOLUTION ... is unsupported for this table format."

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,8 @@ insertInto
575575
: INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
576576
| INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
577577
| INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
578+
| INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference tableAlias optionsClause? (BY NAME)? REPLACE USING identifierList #insertIntoReplaceUsing
579+
| INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference tableAlias optionsClause? (BY NAME)? REPLACE ON replaceOnCondition=booleanExpression #insertIntoReplaceOn
578580
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
579581
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
580582
;

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,7 +1016,7 @@ class Analyzer(
10161016

10171017
def apply(plan: LogicalPlan)
10181018
: LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
1019-
case i @ InsertIntoStatement(table, _, _, _, _, _, _, _) =>
1019+
case i @ InsertIntoStatement(table, _, _, _, _, _, _, _, _) =>
10201020
val relation = table match {
10211021
case u: UnresolvedRelation if !u.isStreaming =>
10221022
resolveRelation(u).getOrElse(u)
@@ -1152,13 +1152,20 @@ class Analyzer(
11521152
object ResolveInsertInto extends ResolveInsertionBase {
11531153
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
11541154
AlwaysProcess.fn, ruleId) {
1155-
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _, _)
1155+
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _, _, _)
11561156
if i.query.resolved =>
11571157
// ifPartitionNotExists is append with validation, but validation is not supported
11581158
if (i.ifPartitionNotExists) {
11591159
throw QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name)
11601160
}
11611161

1162+
i.replaceCriteriaOpt.foreach {
1163+
case _: InsertReplaceOn =>
1164+
throw QueryCompilationErrors.unsupportedInsertReplaceOn()
1165+
case _: InsertReplaceUsing =>
1166+
throw QueryCompilationErrors.unsupportedInsertReplaceUsing()
1167+
}
1168+
11621169
// Create a project if this is an INSERT INTO BY NAME query.
11631170
val projectByName = if (i.userSpecifiedCols.nonEmpty) {
11641171
Some(createProjectForByNameQuery(r.table.name, i))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
254254
// not found first, instead of errors in the input query of the insert command, by doing a
255255
// top-down traversal.
256256
plan.foreach {
257-
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _, _) =>
257+
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _, _, _) =>
258258
u.tableNotFound(u.multipartIdentifier)
259259

260260
// TODO (SPARK-27484): handle streaming write commands when we have them.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,10 @@ class AstBuilder extends DataTypeAstBuilder
891891
* [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
892892
* INSERT [WITH SCHEMA EVOLUTION] INTO
893893
* [TABLE] tableIdentifier REPLACE whereClause
894+
* INSERT [WITH SCHEMA EVOLUTION] INTO
895+
* [TABLE] tableIdentifier tableAlias [BY NAME] REPLACE USING identifierList
896+
* INSERT [WITH SCHEMA EVOLUTION] INTO
897+
* [TABLE] tableIdentifier tableAlias [BY NAME] REPLACE ON booleanExpression
894898
* INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
895899
* INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
896900
* }}}
@@ -961,6 +965,44 @@ class AstBuilder extends DataTypeAstBuilder
961965
withSchemaEvolution = ctx.EVOLUTION() != null)
962966
}
963967
})
968+
case ctx: InsertIntoReplaceUsingContext =>
969+
val options = Option(ctx.optionsClause())
970+
val byName = ctx.NAME() != null
971+
val replaceUsingCols =
972+
Option(ctx.identifierList()).map(visitIdentifierList).getOrElse(Nil)
973+
withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
974+
InsertIntoStatement(
975+
table = createUnresolvedRelation(ctx.identifierReference, ident, options,
976+
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false),
977+
partitionSpec = Map.empty,
978+
userSpecifiedCols = Seq.empty,
979+
query = otherPlans.head,
980+
overwrite = true,
981+
ifPartitionNotExists = false,
982+
byName = byName,
983+
withSchemaEvolution = ctx.EVOLUTION() != null,
984+
replaceCriteriaOpt = Some(InsertReplaceUsing(replaceUsingCols)))
985+
})
986+
case ctx: InsertIntoReplaceOnContext =>
987+
val options = Option(ctx.optionsClause())
988+
val byName = ctx.NAME() != null
989+
val replaceOnCond = expression(ctx.replaceOnCondition)
990+
val tableAliasOpt =
991+
getTableAliasWithoutColumnAlias(ctx.tableAlias(), "INSERT REPLACE ON")
992+
withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
993+
val queryWithAlias = otherPlans.head
994+
InsertIntoStatement(
995+
table = createUnresolvedRelation(ctx.identifierReference, ident, options,
996+
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false),
997+
partitionSpec = Map.empty,
998+
userSpecifiedCols = Seq.empty,
999+
query = queryWithAlias,
1000+
overwrite = true,
1001+
ifPartitionNotExists = false,
1002+
byName = byName,
1003+
withSchemaEvolution = ctx.EVOLUTION() != null,
1004+
replaceCriteriaOpt = Some(InsertReplaceOn(replaceOnCond, tableAliasOpt)))
1005+
})
9641006
case dir: InsertOverwriteDirContext =>
9651007
val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
9661008
InsertIntoDir(isLocal, storage, provider, query, overwrite = true)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ case class QualifiedColType(
174174
* @param byName If true, reorder the data columns to match the column names of the
175175
* target table.
176176
* @param withSchemaEvolution If true, enables automatic schema evolution for the operation.
177+
* @param replaceCriteriaOpt Optional replace criteria for INSERT REPLACE ON/USING operations.
177178
*/
178179
case class InsertIntoStatement(
179180
table: LogicalPlan,
@@ -183,7 +184,8 @@ case class InsertIntoStatement(
183184
overwrite: Boolean,
184185
ifPartitionNotExists: Boolean,
185186
byName: Boolean = false,
186-
withSchemaEvolution: Boolean = false) extends UnaryParsedStatement {
187+
withSchemaEvolution: Boolean = false,
188+
replaceCriteriaOpt: Option[InsertReplaceCriteria] = None) extends UnaryParsedStatement {
187189

188190
require(overwrite || !ifPartitionNotExists,
189191
"IF NOT EXISTS is only valid in INSERT OVERWRITE")
@@ -196,3 +198,27 @@ case class InsertIntoStatement(
196198
override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement =
197199
copy(query = newChild)
198200
}
201+
202+
/**
203+
* Sealed trait representing the replace criteria for INSERT REPLACE ON/USING operations.
204+
*/
205+
sealed trait InsertReplaceCriteria
206+
207+
/**
208+
* Replace criteria for INSERT INTO ... REPLACE ON <condition>.
209+
* Rows matching the condition in the target table are replaced by rows from the source query.
210+
*
211+
* @param condition The boolean expression used to match rows for replacement.
212+
* @param tableAliasOpt Optional alias for the target table used in the condition.
213+
*/
214+
case class InsertReplaceOn(
215+
condition: Expression,
216+
tableAliasOpt: Option[String]) extends InsertReplaceCriteria
217+
218+
/**
219+
* Replace criteria for INSERT INTO ... REPLACE USING (<columns>).
220+
* Rows are replaced based on matching values in the specified columns.
221+
*
222+
* @param columns The list of column names used for matching.
223+
*/
224+
case class InsertReplaceUsing(columns: Seq[String]) extends InsertReplaceCriteria

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
591591
messageParameters = Map.empty)
592592
}
593593

594+
def unsupportedInsertReplaceOn(): Throwable = {
595+
new AnalysisException(
596+
errorClass = "UNSUPPORTED_INSERT_REPLACE_ON",
597+
messageParameters = Map.empty)
598+
}
599+
600+
def unsupportedInsertReplaceUsing(): Throwable = {
601+
new AnalysisException(
602+
errorClass = "UNSUPPORTED_INSERT_REPLACE_USING",
603+
messageParameters = Map.empty)
604+
}
605+
594606
def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = {
595607
new AnalysisException(
596608
errorClass = "VIEW_WRITE_NOT_ALLOWED",

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,6 +1874,74 @@ class DDLParserSuite extends AnalysisTest {
18741874
}
18751875
}
18761876

1877+
for (isByName <- Seq(true, false)) {
1878+
val byNameClause = if (isByName) "BY NAME " else ""
1879+
val sourceQuery = "SELECT * FROM source"
1880+
val testMsg = s"isByName=$isByName"
1881+
1882+
test(s"INSERT INTO REPLACE USING - $testMsg") {
1883+
val table = "testcat.ns1.ns2.tbl"
1884+
val insertSQLStmt = s"INSERT INTO $table AS t " +
1885+
s"${byNameClause}REPLACE USING (col1, col2) ${sourceQuery}"
1886+
1887+
parseCompare(
1888+
sql = insertSQLStmt,
1889+
expected = InsertIntoStatement(
1890+
table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
1891+
partitionSpec = Map.empty,
1892+
userSpecifiedCols = Seq.empty,
1893+
query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
1894+
overwrite = true,
1895+
ifPartitionNotExists = false,
1896+
byName = isByName,
1897+
withSchemaEvolution = false,
1898+
replaceCriteriaOpt = Some(InsertReplaceUsing(Seq("col1", "col2"))))
1899+
)
1900+
}
1901+
1902+
test(s"INSERT INTO REPLACE ON - $testMsg") {
1903+
val table = "testcat.ns1.ns2.tbl"
1904+
val insertSQLStmt = s"INSERT INTO $table AS t " +
1905+
s"${byNameClause}REPLACE ON t.col1 = col2 ${sourceQuery}"
1906+
1907+
parseCompare(
1908+
sql = insertSQLStmt,
1909+
expected = InsertIntoStatement(
1910+
table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
1911+
partitionSpec = Map.empty,
1912+
userSpecifiedCols = Seq.empty,
1913+
query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
1914+
overwrite = true,
1915+
ifPartitionNotExists = false,
1916+
byName = isByName,
1917+
withSchemaEvolution = false,
1918+
replaceCriteriaOpt = Some(InsertReplaceOn(
1919+
EqualTo(UnresolvedAttribute(Seq("t", "col1")), UnresolvedAttribute("col2")),
1920+
Some("t"))))
1921+
)
1922+
}
1923+
1924+
test(s"INSERT WITH SCHEMA EVOLUTION INTO REPLACE USING - $testMsg") {
1925+
val table = "testcat.ns1.ns2.tbl"
1926+
val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION INTO $table AS t " +
1927+
s"${byNameClause}REPLACE USING (col1) ${sourceQuery}"
1928+
1929+
parseCompare(
1930+
sql = insertSQLStmt,
1931+
expected = InsertIntoStatement(
1932+
table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
1933+
partitionSpec = Map.empty,
1934+
userSpecifiedCols = Seq.empty,
1935+
query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
1936+
overwrite = true,
1937+
ifPartitionNotExists = false,
1938+
byName = isByName,
1939+
withSchemaEvolution = true,
1940+
replaceCriteriaOpt = Some(InsertReplaceUsing(Seq("col1"))))
1941+
)
1942+
}
1943+
}
1944+
18771945
test("delete from table: delete all") {
18781946
parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
18791947
DeleteFromTable(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
163163
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
164164

165165
case InsertIntoStatement(l @ LogicalRelationWithTable(_: InsertableRelation, _),
166-
parts, _, query, overwrite, false, _, _)
166+
parts, _, query, overwrite, false, _, _, _)
167167
if parts.isEmpty =>
168168
InsertIntoDataSourceCommand(l, query, overwrite)
169169

@@ -176,7 +176,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
176176
InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
177177

178178
case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t: HadoopFsRelation, table),
179-
parts, _, query, overwrite, _, _, _)
179+
parts, _, query, overwrite, _, _, _, _)
180180
if query.resolved =>
181181
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
182182
// the user has specified static partitions, we add a Project operator on top of the query
@@ -312,11 +312,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
312312

313313
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
314314
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false),
315-
_, _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
315+
_, _, _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
316316
i.copy(table = readDataSourceTable(tableMeta, options))
317317

318318
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),
319-
_, _, _, _, _, _, _) =>
319+
_, _, _, _, _, _, _, _) =>
320320
i.copy(table = DDLUtils.readHiveTable(tableMeta))
321321

322322
case append @ AppendData(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
3535
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
3636
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
3737
case i @ InsertIntoStatement(
38-
d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
38+
d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _, _) =>
3939
val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
4040
val relation = HadoopFsRelation(
4141
table.fileIndex,

0 commit comments

Comments
 (0)