Commit 9e88ac0

schenksj and claude committed
fix(delta): translate column-mapping names on the pre-materialised-index path
`buildTaskListFromAddFiles` (TahoeBatchFileIndex / CdcAddFileIndex / TahoeRemoveFileIndex / TahoeChangeFileIndex) was emitting AddFile partition values keyed by the PHYSICAL column name and skipping `column_mappings` entirely -- both of which the kernel path already handles. For tables using column mapping id / name mode, CDC reads therefore lost both the real partition column value and all non-partition data columns (read as null under logical names the parquet file didn't have).

Two companion changes:

1. `buildTaskListFromAddFiles` takes an optional `physicalToLogical` partition-name map; the TahoeBatchFileIndex callsite derives it from `relation.partitionSchema` field metadata (`delta.columnMapping.physicalName`) before dispatch.
2. Where kernel didn't populate `column_mappings` (i.e. the matchingFiles path), re-derive the logical->physical list from `relation.dataSchema ++ relation.partitionSchema` field metadata and inject it into the task list before the proto is finalised.

The native-side column-mapping rewrite in planner.rs already consumes that list.

Validated by the `decoded-objects` representative test (DeltaCDCSQLIdColumnMappingSuite "batch write: append, dynamic partition overwrite + CDF - column mapping id mode"), which now returns the correct CDC records instead of (null, null) data columns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
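The partition-key translation described in change 1 can be illustrated without Spark on the classpath. The sketch below is not the committed code: `Field`, `PartitionKeyTranslation`, and the sample physical name are hypothetical stand-ins, and a plain `Map[String, String]` replaces Spark's `Metadata`. Only the metadata key `delta.columnMapping.physicalName` and the `getOrElse(k, k)` pass-through come from the commit itself.

```scala
// Hypothetical stand-in for Spark's StructField: a logical name plus string metadata.
final case class Field(name: String, metadata: Map[String, String])

object PartitionKeyTranslation {
  // Metadata key named in the commit message.
  val PhysicalNameKey = "delta.columnMapping.physicalName"

  // Build physical -> logical, skipping fields that carry no physical name.
  def physicalToLogical(fields: Seq[Field]): Map[String, String] =
    fields.flatMap(f => f.metadata.get(PhysicalNameKey).map(_ -> f.name)).toMap

  // Re-key partition values by logical name; keys without a mapping pass
  // through unchanged, so tables without column mapping are unaffected.
  def translateKeys(
      values: Map[String, String],
      physToLogical: Map[String, String]): Map[String, String] =
    values.map { case (k, v) => physToLogical.getOrElse(k, k) -> v }
}
```

Defaulting to the original key is what lets the new `physicalToLogicalPartitionNames` parameter default to `Map.empty` without changing behaviour for existing callers.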
1 parent ff352e4 commit 9e88ac0

1 file changed

Lines changed: 62 additions & 4 deletions

spark/src/main/scala/org/apache/comet/serde/operator/CometDeltaNativeScan.scala

@@ -177,12 +177,26 @@ object CometDeltaNativeScan extends CometOperatorSerde[CometScanExec] with Loggi
         if (DeltaReflection.isBatchFileIndex(relation.location)) {
           DeltaReflection.extractBatchAddFiles(relation.location) match {
             case Some(addFiles) if addFiles.forall(!_.hasDeletionVector) =>
+              // Under column mapping, Delta stores partition values in AddFile
+              // keyed by the physical column name. Build the physical->logical
+              // map from the relation's partition schema (Delta stashes
+              // `delta.columnMapping.physicalName` in each StructField's
+              // metadata) so `buildTaskListFromAddFiles` can translate keys
+              // before they reach the proto.
+              val physToLogical = relation.partitionSchema.fields.flatMap { f =>
+                if (f.metadata.contains("delta.columnMapping.physicalName")) {
+                  Some(f.metadata.getString("delta.columnMapping.physicalName") -> f.name)
+                } else {
+                  None
+                }
+              }.toMap
               buildTaskListFromAddFiles(
                 tableRoot,
                 snapshotVersion,
                 addFiles,
                 nativeOp = null,
-                columnNames).toByteArray
+                columnNames,
+                physicalToLogicalPartitionNames = physToLogical).toByteArray
             case Some(_) =>
               // Phase 1 of the pre-materialized-index path: fall back when any
               // AddFile carries a DeletionVectorDescriptor. Phase 2 can apply the
@@ -218,7 +232,41 @@ object CometDeltaNativeScan extends CometOperatorSerde[CometScanExec] with Loggi
           return None
         }
       }
-    val taskList = DeltaScanTaskList.parseFrom(taskListBytes)
+    val taskList0 = DeltaScanTaskList.parseFrom(taskListBytes)
+    // The kernel path populates `column_mappings` from kernel's schema metadata.
+    // The pre-materialised-index path (`buildTaskListFromAddFiles`) doesn't have
+    // that information yet, so re-derive the mapping from the relation's data
+    // + partition schema -- each StructField carries
+    // `delta.columnMapping.physicalName` in its metadata when the table uses
+    // column mapping. Without this the native scan can't translate logical
+    // column references to physical parquet column names and returns nulls.
+    val taskList =
+      if (!taskList0.getColumnMappingsList.isEmpty) {
+        taskList0
+      } else {
+        val allFields = relation.dataSchema.fields ++ relation.partitionSchema.fields
+        val logicalToPhysical = allFields.flatMap { f =>
+          if (f.metadata.contains("delta.columnMapping.physicalName")) {
+            Some(f.name -> f.metadata.getString("delta.columnMapping.physicalName"))
+          } else {
+            None
+          }
+        }
+        if (logicalToPhysical.isEmpty) {
+          taskList0
+        } else {
+          val b = DeltaScanTaskList.newBuilder(taskList0)
+          logicalToPhysical.foreach { case (logical, physical) =>
+            b.addColumnMappings(
+              OperatorOuterClass.DeltaColumnMapping
+                .newBuilder()
+                .setLogicalName(logical)
+                .setPhysicalName(physical)
+                .build())
+          }
+          b.build()
+        }
+      }
 
     // Phase 6 reader-feature gate. Kernel reports any Delta reader features that
     // are currently in use in this snapshot and that Comet's native path does NOT
@@ -445,7 +493,9 @@ object CometDeltaNativeScan extends CometOperatorSerde[CometScanExec] with Loggi
       snapshotVersion: Long,
       addFiles: Seq[DeltaReflection.ExtractedAddFile],
       nativeOp: AnyRef,
-      columnNames: Array[String]): OperatorOuterClass.DeltaScanTaskList = {
+      columnNames: Array[String],
+      physicalToLogicalPartitionNames: Map[String, String] = Map.empty)
+      : OperatorOuterClass.DeltaScanTaskList = {
     val tlBuilder = OperatorOuterClass.DeltaScanTaskList.newBuilder()
     tlBuilder.setTableRoot(tableRoot)
     if (snapshotVersion >= 0) tlBuilder.setSnapshotVersion(snapshotVersion)
@@ -462,7 +512,15 @@ object CometDeltaNativeScan extends CometOperatorSerde[CometScanExec] with Loggi
       taskBuilder.setFileSize(af.size)
       DeltaReflection.parseNumRecords(af.statsJson).foreach(taskBuilder.setRecordCount)
       af.partitionValues.foreach { case (k, v) =>
-        val pvBuilder = OperatorOuterClass.DeltaPartitionValue.newBuilder().setName(k)
+        // Under column mapping, Delta stores partition values keyed by the
+        // PHYSICAL column name (e.g. `col-<uuid>-part`). Our partition_schema
+        // on the wire uses LOGICAL names, and `build_delta_partitioned_files`
+        // native-side matches by name. Translate when we have a physical
+        // ->logical map (the kernel-path jni.rs already performs the same
+        // translation for its own extraction).
+        val logicalName = physicalToLogicalPartitionNames.getOrElse(k, k)
+        val pvBuilder =
+          OperatorOuterClass.DeltaPartitionValue.newBuilder().setName(logicalName)
         if (v != null) pvBuilder.setValue(v)
         taskBuilder.addPartitionValues(pvBuilder.build())
       }
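
The `column_mappings` fallback from the second hunk has a simple shape: keep an already-populated list untouched, otherwise derive logical->physical pairs from data fields followed by partition fields. The sketch below models that shape only and makes some assumptions: `SchemaField`, `ColumnMapping`, and `ColumnMappingFallback` are hypothetical stand-ins for Spark's `StructField` and the generated `DeltaColumnMapping` proto, and metadata is a plain `Map[String, String]`.

```scala
// Hypothetical stand-ins; the real code uses StructField and a protobuf builder.
final case class SchemaField(name: String, metadata: Map[String, String])
final case class ColumnMapping(logicalName: String, physicalName: String)

object ColumnMappingFallback {
  val PhysicalNameKey = "delta.columnMapping.physicalName"

  // If the mapping list is already populated (the kernel path), return it as is.
  // Otherwise derive logical -> physical pairs from data fields followed by
  // partition fields, skipping fields without a physical name.
  def resolve(
      existing: Seq[ColumnMapping],
      dataFields: Seq[SchemaField],
      partitionFields: Seq[SchemaField]): Seq[ColumnMapping] =
    if (existing.nonEmpty) existing
    else
      (dataFields ++ partitionFields).flatMap { f =>
        f.metadata.get(PhysicalNameKey).map(p => ColumnMapping(f.name, p))
      }
}
```

An empty result for a table without column mapping corresponds to the `logicalToPhysical.isEmpty` branch in the diff, where the parsed task list is reused unchanged.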
