Skip to content

Commit 6b276b1

Browse files
schenksj and claude committed
fix(delta): code review hardening + CI workflow registration
Code review follow-ups (#27-#35):
- Replace unsafe `.get` with safe `flatMap(find)` in row-tracking Project builder
- Replace catch-all `Throwable` with `NonFatal` in Delta reflection paths
- Guard synthetic column names against user-table collisions
- Short-circuit row-tracking rewrite when enableRowTracking=false
- Document why per-file PD splitting is correctness-required
- Centralise `PhysicalNameMetadataKey` constant in DeltaReflection
- Use `pd.copy` for Spark 3.x/4.0 cross-compilation
- Make dev scripts honor DELTA_DIR/DELTA_WORKDIR/TMPDIR env vars

Operational improvements:
- Fall back on Parquet encryption (not wired through Delta exec path)
- Recompute DPP pruning at execution time so AQE broadcast results actually prune partitions instead of reading all files
- Add CometDeltaRowTrackingSuite (unmaterialised + post-MERGE tests)

CI/DevOps (per PR feedback):
- Register all Delta test suites in pr_build_linux.yml and pr_build_macos.yml
- Add `permissions: contents: read` to delta_regression_test.yml
- Add DeltaReadFromS3Suite to check-suites.py ignore list
- Fix scalastyle locale violations

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9e88ac0 commit 6b276b1

13 files changed

Lines changed: 233 additions & 40 deletions

.github/workflows/delta_regression_test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ on:
4545
# manual trigger
4646
workflow_dispatch:
4747

48+
permissions:
49+
contents: read
50+
4851
env:
4952
RUST_VERSION: stable
5053
RUST_BACKTRACE: 1

.github/workflows/pr_build_linux.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ jobs:
290290
org.apache.spark.sql.comet.ParquetEncryptionITCase
291291
org.apache.comet.exec.CometNativeReaderSuite
292292
org.apache.comet.CometIcebergNativeSuite
293+
org.apache.comet.CometDeltaNativeSuite
294+
org.apache.comet.CometDeltaColumnMappingSuite
295+
org.apache.comet.CometDeltaAdvancedSuite
296+
org.apache.comet.CometDeltaRowTrackingSuite
297+
org.apache.comet.CometFuzzDeltaSuite
293298
- name: "csv"
294299
value: |
295300
org.apache.comet.csv.CometCsvNativeReadSuite

.github/workflows/pr_build_macos.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,11 @@ jobs:
171171
org.apache.spark.sql.comet.ParquetEncryptionITCase
172172
org.apache.comet.exec.CometNativeReaderSuite
173173
org.apache.comet.CometIcebergNativeSuite
174+
org.apache.comet.CometDeltaNativeSuite
175+
org.apache.comet.CometDeltaColumnMappingSuite
176+
org.apache.comet.CometDeltaAdvancedSuite
177+
org.apache.comet.CometDeltaRowTrackingSuite
178+
org.apache.comet.CometFuzzDeltaSuite
174179
- name: "csv"
175180
value: |
176181
org.apache.comet.csv.CometCsvNativeReadSuite

dev/ci/check-suites.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def file_to_class_name(path: Path) -> str | None:
3535
"org.apache.comet.parquet.ParquetReadSuite", # abstract
3636
"org.apache.comet.parquet.ParquetReadFromS3Suite", # manual test suite
3737
"org.apache.comet.IcebergReadFromS3Suite", # manual test suite
38+
"org.apache.comet.DeltaReadFromS3Suite", # manual test suite
3839
"org.apache.spark.sql.comet.CometPlanStabilitySuite", # abstract
3940
"org.apache.spark.sql.comet.ParquetDatetimeRebaseSuite", # abstract
4041
"org.apache.comet.exec.CometColumnarShuffleSuite" # abstract

dev/run-delta-regression-targeted.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
set -uo pipefail
3131

3232
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
33-
DELTA_DIR="/private/var/folders/80/_42csh0j4gqgbny139xqwpym0000gn/T/delta-regression-3.3.2"
33+
DELTA_VERSION="${DELTA_VERSION:-3.3.2}"
34+
DELTA_DIR="${DELTA_DIR:-${DELTA_WORKDIR:-${TMPDIR:-/tmp}/delta-regression-${DELTA_VERSION}}}"
3435
export JAVA_HOME="${JAVA_HOME:-$HOME/jdks/jdk-17.0.18+8/Contents/Home}"
3536
export SPARK_LOCAL_IP=127.0.0.1
3637
export RUST_BACKTRACE=1

dev/run-delta-test.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
# target/delta-regression-logs/test-<timestamp>.log (relative to this repo).
2727
set -euo pipefail
2828
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
29-
DELTA_DIR="/private/var/folders/80/_42csh0j4gqgbny139xqwpym0000gn/T/delta-regression-3.3.2"
29+
DELTA_VERSION="${DELTA_VERSION:-3.3.2}"
30+
DELTA_DIR="${DELTA_DIR:-${DELTA_WORKDIR:-${TMPDIR:-/tmp}/delta-regression-${DELTA_VERSION}}}"
3031
export JAVA_HOME="${JAVA_HOME:-$HOME/jdks/jdk-17.0.18+8/Contents/Home}"
3132
export SPARK_LOCAL_IP=127.0.0.1
3233
export RUST_BACKTRACE=1

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class CometSparkSessionExtensions
8484
"false")
8585
}
8686
} catch {
87-
case _: Throwable => // delta-spark not on classpath; ignore
87+
case scala.util.control.NonFatal(_) => // delta-spark not on classpath; ignore
8888
}
8989
plan
9090
}

spark/src/main/scala/org/apache/comet/delta/DeltaReflection.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ object DeltaReflection extends Logging {
161161
}
162162
}
163163

164+
/** StructField metadata key under which Delta stores the column-mapping physical name. */
165+
val PhysicalNameMetadataKey: String = "delta.columnMapping.physicalName"
166+
164167
/** Property key for the physical column name Delta materialises row IDs into. */
165168
val MaterializedRowIdColumnProp: String =
166169
"delta.rowTracking.materializedRowIdColumnName"
@@ -379,11 +382,11 @@ object DeltaReflection extends Logging {
379382
val nil = scala.collection.immutable.Nil
380383
try Option(m.invoke(location, nil, nil))
381384
catch {
382-
case _: Throwable => None
385+
case scala.util.control.NonFatal(_) => None
383386
}
384387
}
385388
} catch {
386-
case _: Throwable => None
389+
case scala.util.control.NonFatal(_) => None
387390
}
388391
}
389392

spark/src/main/scala/org/apache/comet/delta/RowTrackingAugmentedFileIndex.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ class RowTrackingAugmentedFileIndex(
7070
* Delegates listing to the underlying FileIndex, then splits each returned `PartitionDirectory`
7171
* into one-per-file directories, each carrying the original partition values PLUS the per-file
7272
* baseRowId and defaultRowCommitVersion.
73+
*
74+
* The per-file split is unavoidable for correctness: `AddFile.baseRowId` is unique per file, so
75+
* two files that share a Delta partition cannot share a `PartitionDirectory` once we inject the
76+
* per-file synthetic columns. Scheduling parallelism is unaffected -- `FileSourceScanExec`
77+
* flattens all PDs' files into `PartitionedFile`s and bin-packs them by `maxSplitBytes`, so PD
78+
* granularity only governs how partition values get serialised with each file, not the number
79+
* of tasks.
7380
*/
7481
override def listFiles(
7582
partitionFilters: scala.collection.Seq[Expression],
@@ -80,7 +87,11 @@ class RowTrackingAugmentedFileIndex(
8087
val info = infoByFileName.getOrElse(
8188
fileStatus.getPath.getName,
8289
DeltaReflection.RowTrackingFileInfo(None, None))
83-
PartitionDirectory(augmentPartitionValues(pd.values, info), Seq(fileStatus))
90+
// Use `pd.copy(...)` rather than `PartitionDirectory.apply(...)` so this
91+
// compiles against both Spark 3.x (files: Seq[FileStatus]) and Spark 4.0
92+
// (files: Seq[FileStatusWithMetadata]) without a per-version shim -- we
93+
// round-trip the same element type we got from `pd.files`.
94+
pd.copy(values = augmentPartitionValues(pd.values, info), files = Seq(fileStatus))
8495
}
8596
}
8697
}

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,15 @@ case class CometScanRule(session: SparkSession)
404404
}
405405
val hadoopConf = Option(hadoopConfOrNull).getOrElse(
406406
r.sparkSession.sessionState.newHadoopConfWithOptions(r.options))
407-
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
408-
withInfo(scanExec, s"Native Delta scan does not support encryption")
407+
// Parquet encryption: the Delta native scan path does not currently thread
408+
// `broadcastedHadoopConfForEncryption` / `encryptedFilePaths` through to
409+
// `CometExecRDD` the way `CometNativeScanExec` does, so any encrypted read
410+
// would fail at decrypt time. Fall back to Spark+Delta whenever encryption
411+
// is enabled, regardless of whether the config is otherwise supported.
412+
if (encryptionEnabled(hadoopConf)) {
413+
withInfo(
414+
scanExec,
415+
"Native Delta scan does not yet wire Parquet encryption through to executors")
409416
return None
410417
}
411418
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DELTA_COMPAT, r)) {
@@ -493,6 +500,16 @@ case class CometScanRule(session: SparkSession)
493500
if (!hasRowIdField) return None
494501

495502
val cfg = DeltaReflection.extractMetadataConfiguration(r).getOrElse(Map.empty)
503+
// If the table explicitly has row tracking disabled, there is nothing to rewrite -- the
504+
// scan references `row_id` / `row_commit_version` but the table isn't producing them.
505+
// Fall back rather than spending reflection effort on per-file baseRowId lookups.
506+
if (cfg.get("delta.enableRowTracking").exists(_.equalsIgnoreCase("false"))) {
507+
withInfo(
508+
scanExec,
509+
"Native Delta scan: row-tracking columns requested but table has " +
510+
"delta.enableRowTracking=false; falling back.")
511+
return Some(None)
512+
}
496513
val rowIdPhysical = cfg.get(DeltaReflection.MaterializedRowIdColumnProp)
497514
val rowVerPhysical = cfg.get(DeltaReflection.MaterializedRowCommitVersionColumnProp)
498515

@@ -559,6 +576,27 @@ case class CometScanRule(session: SparkSession)
559576
}
560577
val needSynth = includeRowIdSynth || includeRowVerSynth
561578

579+
// Guard against collisions: if the underlying table already has columns named
580+
// the same as any synthetic we are about to introduce, decline native
581+
// acceleration rather than silently shadow the user's data.
582+
if (needSynth) {
583+
import java.util.Locale
584+
val existingNames =
585+
(r.dataSchema.fieldNames ++ r.partitionSchema.fieldNames)
586+
.map(_.toLowerCase(Locale.ROOT))
587+
.toSet
588+
val syntheticNames = Seq(RowIndexColName, BaseRowIdColName, DefaultRowCommitVersionColName)
589+
val collisions =
590+
syntheticNames.filter(n => existingNames.contains(n.toLowerCase(Locale.ROOT)))
591+
if (collisions.nonEmpty) {
592+
withInfo(
593+
scanExec,
594+
s"Native Delta scan: table has columns that collide with Comet row-tracking " +
595+
s"synthetic columns (${collisions.mkString(", ")}); falling back.")
596+
return Some(None)
597+
}
598+
}
599+
562600
val infoByFileName: Map[String, DeltaReflection.RowTrackingFileInfo] =
563601
if (needSynth) DeltaReflection.extractRowTrackingInfoByFileName(r.location)
564602
else Map.empty
@@ -633,22 +671,18 @@ case class CometScanRule(session: SparkSession)
633671
val cometScan = CometScanExec(newScan, session, SCAN_NATIVE_DELTA_COMPAT)
634672

635673
val projectExprs = origOutput.map { a =>
636-
renameMap.get(a.name) match {
637-
case Some(phys) if a.name.equalsIgnoreCase(RowIdName) && includeRowIdSynth =>
638-
val physAttr = baseNewOutput.find(_.name == phys).get
639-
// row_id = coalesce(materialized, base_row_id + row_index)
674+
renameMap.get(a.name).flatMap(phys => baseNewOutput.find(_.name == phys)) match {
675+
case Some(physAttr) if a.name.equalsIgnoreCase(RowIdName) && includeRowIdSynth =>
640676
val synth = Add(baseRowIdAttr, rowIndexAttr)
641677
Alias(Coalesce(Seq(physAttr, synth)), a.name)(
642678
exprId = a.exprId,
643679
qualifier = a.qualifier)
644-
case Some(phys) if a.name.equalsIgnoreCase(RowCommitVersionName) && includeRowVerSynth =>
645-
val physAttr = baseNewOutput.find(_.name == phys).get
646-
// row_commit_version = coalesce(materialized, default_row_commit_version)
680+
case Some(physAttr)
681+
if a.name.equalsIgnoreCase(RowCommitVersionName) && includeRowVerSynth =>
647682
Alias(Coalesce(Seq(physAttr, defaultVerAttr)), a.name)(
648683
exprId = a.exprId,
649684
qualifier = a.qualifier)
650-
case Some(phys) =>
651-
val physAttr = baseNewOutput.find(_.name == phys).get
685+
case Some(physAttr) =>
652686
Alias(physAttr, a.name)(exprId = a.exprId, qualifier = a.qualifier)
653687
case None => a
654688
}

0 commit comments

Comments (0)