@@ -450,6 +450,7 @@ class Analyzer(
     DeduplicateRelations ::
     ResolveCollationName ::
     ResolveMergeIntoSchemaEvolution ::
+    ResolveInsertSchemaEvolution ::
     ValidateEventTimeWatermarkColumn ::
     new ResolveReferences(catalogManager) ::
     // Please do not insert any other rules in between. See the TODO comments in rule
@@ -1178,38 +1179,44 @@ class Analyzer(
           AppendData.byName(
             r,
             query,
-            writeOptions = schemaEvolutionWriteOption)
+            writeOptions = schemaEvolutionWriteOption,
+            withSchemaEvolution = i.withSchemaEvolution)
         } else {
           AppendData.byPosition(
             r,
             query,
-            writeOptions = schemaEvolutionWriteOption)
+            writeOptions = schemaEvolutionWriteOption,
+            withSchemaEvolution = i.withSchemaEvolution)
         }
       } else if (conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC) {
         if (isByName) {
           OverwritePartitionsDynamic.byName(
             r,
             query,
-            writeOptions = schemaEvolutionWriteOption)
+            writeOptions = schemaEvolutionWriteOption,
+            withSchemaEvolution = i.withSchemaEvolution)
         } else {
           OverwritePartitionsDynamic.byPosition(
             r,
             query,
-            writeOptions = schemaEvolutionWriteOption)
+            writeOptions = schemaEvolutionWriteOption,
+            withSchemaEvolution = i.withSchemaEvolution)
         }
       } else {
         if (isByName) {
           OverwriteByExpression.byName(
             table = r,
             df = query,
             deleteExpr = staticDeleteExpression(r, staticPartitions),
-            writeOptions = schemaEvolutionWriteOption)
+            writeOptions = schemaEvolutionWriteOption,
+            withSchemaEvolution = i.withSchemaEvolution)
         } else {
           OverwriteByExpression.byPosition(
             table = r,
             query = query,
             deleteExpr = staticDeleteExpression(r, staticPartitions),
-            writeOptions = schemaEvolutionWriteOption)
+            writeOptions = schemaEvolutionWriteOption,
+            withSchemaEvolution = i.withSchemaEvolution)
         }
       }
     }
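The branching above can be hard to follow in diff form. As a condensed illustration of the dispatch (names like `planInsert`, `Append`, `OverwriteDynamic`, and `OverwriteByExpr` are hypothetical stand-ins, not Spark's actual classes), every branch now threads the same `withSchemaEvolution` flag into the chosen V2 write plan:

```scala
// Hypothetical sketch of how an INSERT is planned as a V2 write command.
// The real code builds AppendData / OverwritePartitionsDynamic /
// OverwriteByExpression; here each branch is modeled as a simple case class.
sealed trait V2Write
case class Append(byName: Boolean, withSchemaEvolution: Boolean) extends V2Write
case class OverwriteDynamic(byName: Boolean, withSchemaEvolution: Boolean) extends V2Write
case class OverwriteByExpr(byName: Boolean, withSchemaEvolution: Boolean) extends V2Write

def planInsert(
    overwrite: Boolean,
    dynamicPartitionOverwrite: Boolean, // conf.partitionOverwriteMode == DYNAMIC
    isByName: Boolean,
    withSchemaEvolution: Boolean): V2Write =
  if (!overwrite) Append(isByName, withSchemaEvolution)
  else if (dynamicPartitionOverwrite) OverwriteDynamic(isByName, withSchemaEvolution)
  else OverwriteByExpr(isByName, withSchemaEvolution)
```

The point of the change is that the flag is propagated uniformly, so the later `ResolveInsertSchemaEvolution` rule can act on any of the three plan shapes.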
@@ -3594,7 +3601,8 @@ class Analyzer(
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
       _.containsPattern(COMMAND), ruleId) {
       case v2Write: V2WriteCommand
Contributor:
@johanl-db @szehon-ho, can you folks explain the relationship between skipping schema evolution via ACCEPT_ANY_SCHEMA and automatic schema evolution via AUTOMATIC_SCHEMA_EVOLUTION? Are these two mutually exclusive, or can they co-exist? MERGE vs INSERT?

Member:

MERGE with ACCEPT_ANY_SCHEMA on a normal DSv2 data source breaks today, as it relies on an external rule to resolve the merge.

I think INSERT already works with ACCEPT_ANY_SCHEMA, and this would be another mode. They should probably be mutually exclusive?

Contributor (author):

Discussed with @aokolnychyi this morning: AUTOMATIC_SCHEMA_EVOLUTION and ACCEPT_ANY_SCHEMA are not exclusive:

  • AUTOMATIC_SCHEMA_EVOLUTION allows the rule ResolveSchemaEvolution to trigger.
  • ACCEPT_ANY_SCHEMA skips some resolution steps in Spark, under the assumption that the connector will handle them. In particular:
    • For INSERT: skips schema alignment in ResolveOutputRelation.
    • For MERGE: skips clause resolution in the Analyzer, and skips schema alignment in ResolveRowLevelCommandAssignments.

At least, that's how Spark applies these capabilities today, even though the name ACCEPT_ANY_SCHEMA suggests more.

The connector can choose to set either, depending on which resolution flow suits it.
For example, Delta today always handles schema evolution itself (it doesn't set AUTOMATIC_SCHEMA_EVOLUTION) and does its own resolution / schema alignment (it sets ACCEPT_ANY_SCHEMA).

As Delta moves to DSv2, my plan is to have two phases:

  1. Delta sets both AUTOMATIC_SCHEMA_EVOLUTION and ACCEPT_ANY_SCHEMA: Spark handles schema evolution, but Delta initially takes over the resolution of MERGE clauses and then does schema alignment for both INSERT and MERGE.
  2. Delta sets only AUTOMATIC_SCHEMA_EVOLUTION: once we've reconciled all behavior differences between how Delta and Spark do schema alignment today, we hand schema alignment over to Spark. This will require substantial effort, and careful breaking changes (if possible at all) in Delta.
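The division of labor described in this thread can be sketched as a small decision table. This is an illustrative model only (the function and step names are assumptions for the sketch, not Spark APIs): ACCEPT_ANY_SCHEMA removes alignment/resolution steps from Spark's side, while AUTOMATIC_SCHEMA_EVOLUTION independently adds the evolution rule.

```scala
// Illustrative model of which analyzer-side steps Spark performs for a write,
// given the two table capabilities discussed above. Step names are informal.
case class Capabilities(acceptAnySchema: Boolean, automaticSchemaEvolution: Boolean)

def sparkResolutionSteps(caps: Capabilities, isMerge: Boolean): Set[String] = {
  // AUTOMATIC_SCHEMA_EVOLUTION lets ResolveSchemaEvolution trigger.
  val evolution =
    if (caps.automaticSchemaEvolution) Set("ResolveSchemaEvolution")
    else Set.empty[String]
  // ACCEPT_ANY_SCHEMA delegates alignment/clause resolution to the connector.
  val alignment =
    if (caps.acceptAnySchema) Set.empty[String]
    else if (isMerge)
      Set("merge clause resolution", "ResolveRowLevelCommandAssignments alignment")
    else Set("ResolveOutputRelation alignment")
  evolution ++ alignment
}
```

Under this model, Delta's phase 1 (both capabilities set) leaves Spark doing only evolution, and phase 2 (only AUTOMATIC_SCHEMA_EVOLUTION) restores Spark-side alignment on top of it.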

-          if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved =>
+          if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved &&
+            !v2Write.needSchemaEvolution =>
         validateStoreAssignmentPolicy()
         TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
           expected = v2Write.table.output, queryOutput = v2Write.query.output)
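The guard change above is small but load-bearing: output resolution must now skip writes that still need schema evolution, so the evolution rule gets to run first. A minimal standalone sketch of the predicate (with a hypothetical `WriteState` record standing in for `V2WriteCommand`):

```scala
// Sketch of the extended guard on the output-resolution rule.
// A write is picked up only when table and query are resolved, its output is
// not yet resolved, AND it no longer needs schema evolution.
case class WriteState(
    tableResolved: Boolean,
    queryResolved: Boolean,
    outputResolved: Boolean,
    needSchemaEvolution: Boolean)

def resolveOutputApplies(w: WriteState): Boolean =
  w.tableResolved && w.queryResolved && !w.outputResolved && !w.needSchemaEvolution
```

Without the new `!needSchemaEvolution` conjunct, output resolution could fire against the pre-evolution table schema and reject columns that schema evolution was about to add.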

This file was deleted.