Skip to content

Commit ba0ebf2

Browse files
andygrove and claude committed
fix: fall back to native_iceberg_compat for row index columns in native_datafusion scan
When the native_datafusion scan encounters row index metadata columns (_tmp_metadata_row_index), fall back to native_iceberg_compat instead of falling back to Spark. This fixes the issue where row index columns were not being populated when using the native_datafusion scan mode.

Closes #3317

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 2f64b60 commit ba0ebf2

2 files changed

Lines changed: 5 additions & 45 deletions

File tree

dev/diffs/3.5.8.diff

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2138,49 +2138,6 @@ index 5e01d3f447c..284d6657d4f 100644
21382138
withTempDir { dir =>
21392139
val readSchema =
21402140
new StructType()
2141-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
2142-
index c10e1799702..ba6629abfd9 100644
2143-
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
2144-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
2145-
@@ -16,7 +16,7 @@
2146-
*/
2147-
package org.apache.spark.sql.execution.datasources.parquet
2148-
2149-
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
2150-
+import org.apache.spark.sql.{AnalysisException, DataFrame, IgnoreCometNativeDataFusion, QueryTest}
2151-
import org.apache.spark.sql.execution.datasources.FileFormat
2152-
import org.apache.spark.sql.functions.{col, lit}
2153-
import org.apache.spark.sql.internal.SQLConf
2154-
@@ -154,7 +154,8 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS
2155-
}
2156-
}
2157-
2158-
- test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present in a table") {
2159-
+ test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present in a table",
2160-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317")) {
2161-
// File format supporting row index generation populates the column with row indexes.
2162-
withReadDataFrame("parquet", extraSchemaFields =
2163-
Seq(StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType))) { df =>
2164-
@@ -172,7 +173,8 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS
2165-
}
2166-
}
2167-
2168-
- test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table") {
2169-
+ test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table",
2170-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317")) {
2171-
withReadDataFrame("parquet", extraCol = ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
2172-
// Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
2173-
// generated row indexes, rather than read from the file.
2174-
@@ -189,7 +191,8 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS
2175-
}
2176-
}
2177-
2178-
- test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition col") {
2179-
+ test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition col",
2180-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317")) {
2181-
withReadDataFrame("parquet", partitionCol = ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
2182-
// Column values are set for each partition, rather than populated with generated row indexes.
2183-
assert(df
21842141
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
21852142
index 8e88049f51e..49f2001dc6b 100644
21862143
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
198198
return None
199199
}
200200
if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
201-
withInfo(scanExec, "Native DataFusion scan does not support row index generation")
202-
return None
201+
withInfo(
202+
scanExec,
203+
"Native DataFusion scan does not support row index generation," +
204+
" falling back to native_iceberg_compat")
205+
return nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
203206
}
204207
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
205208
return None

0 commit comments

Comments (0)