
Commit e7db848

andygrove and claude committed
fix: Add Spark-compatible schema validation for native_datafusion scan (#3311)
Add schema validation in the native schema adapter that rejects type coercions and column resolutions that Spark's vectorized Parquet reader would reject, gated behind a new config `spark.comet.parquet.schemaValidation.enabled` (default: true). When enabled, the native scan rejects:

- TimestampLTZ <-> TimestampNTZ conversions
- Integer/float widening (Int32 -> Int64, Float32 -> Float64) unless schema evolution is enabled
- String/binary to timestamp or numeric conversions
- Scalar to complex type conversions
- Duplicate fields in case-insensitive mode

This allows 5 previously-ignored Spark SQL tests to pass with native_datafusion enabled.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 2f64b60 commit e7db848
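
For illustration, here is a minimal sketch of the kind of Spark-compatible rejection rules described above, written over Arrow types. The helper name `is_rejected_coercion` and the exact rule set are assumptions for this sketch, not the schema adapter code added by the commit:

```rust
// Hypothetical helper, not the actual Comet schema adapter: given the type
// found in the Parquet file and the type requested by Spark's read schema,
// decide whether the coercion should be rejected the way Spark's vectorized
// Parquet reader rejects it.
use arrow::datatypes::DataType;

fn is_rejected_coercion(
    file_type: &DataType,
    requested_type: &DataType,
    schema_evolution_enabled: bool,
) -> bool {
    use DataType::*;
    match (file_type, requested_type) {
        // TimestampLTZ <-> TimestampNTZ: timezone-awareness must match
        (Timestamp(_, Some(_)), Timestamp(_, None))
        | (Timestamp(_, None), Timestamp(_, Some(_))) => true,
        // Integer/float widening is only allowed when schema evolution is enabled
        (Int32, Int64) | (Float32, Float64) => !schema_evolution_enabled,
        // String/binary columns cannot be read as timestamp or numeric types
        (Utf8 | Binary, Timestamp(_, _) | Int32 | Int64 | Float32 | Float64) => true,
        // A scalar column cannot be resolved as a complex type
        (Int32 | Int64 | Float32 | Float64 | Utf8 | Binary, Struct(_) | List(_) | Map(_, _)) => {
            true
        }
        _ => false,
    }
}

fn main() {
    // With schema evolution disabled, Int32 -> Int64 widening is rejected,
    // matching Spark's vectorized Parquet reader.
    assert!(is_rejected_coercion(&DataType::Int32, &DataType::Int64, false));
    assert!(!is_rejected_coercion(&DataType::Int32, &DataType::Int64, true));
}
```

The real adapter also has to handle nested fields, decimals, and duplicate-field detection in case-insensitive mode, which this sketch omits.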

12 files changed

Lines changed: 244 additions & 59 deletions


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions
@@ -157,6 +157,17 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_PARQUET_SCHEMA_VALIDATION_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.schemaValidation.enabled")
+      .category(CATEGORY_PARQUET)
+      .doc(
+        "Whether to enable Spark-compatible schema validation when reading Parquet files " +
+          "with native_datafusion scan. When enabled, type coercions and column resolutions " +
+          "that Spark's vectorized reader would reject will also be rejected by Comet, " +
+          "throwing SparkException with compatible error messages.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.respectFilterPushdown")
       .category(CATEGORY_PARQUET)

dev/diffs/3.5.8.diff

Lines changed: 4 additions & 54 deletions
@@ -2271,17 +2271,7 @@ index 8e88049f51e..49f2001dc6b 100644
       val schema = StructType(Seq(
         StructField("a", IntegerType, nullable = false)
       ))
-@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
-     }
-   }
- 
--  test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
-+  test("SPARK-25207: exception when duplicate fields in case-insensitive mode",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
-     withTempPath { dir =>
-       val count = 10
-       val tableName = "spark_25207"
-@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+@@ -1984,7 +2000,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       }
     }
 
@@ -2331,27 +2321,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/
 index 8ed9ef1630e..f312174b182 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
-     }
-   }
- 
--  test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
-+  test("SPARK-35640: read binary as timestamp should throw schema incompatible error",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
-     val data = (1 to 4).map(i => Tuple1(i.toString))
-     val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))
- 
-@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
-     }
-   }
- 
--  test("SPARK-35640: int as long should throw schema incompatible error") {
-+  test("SPARK-35640: int as long should throw schema incompatible error",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
-     val data = (1 to 4).map(i => Tuple1(i))
-     val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
- 
-@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
 
@@ -2365,17 +2335,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/
 index f6472ba3d9d..ce39ebb52e6 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
-     }
-   }
- 
--  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
-+  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
-     val data = (1 to 1000).map { i =>
-       val ts = new java.sql.Timestamp(i)
-       Row(ts)
-@@ -998,7 +999,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -998,7 +998,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
       }
     }
 
@@ -2415,17 +2375,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
       checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
       checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
       checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
-@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
-     }
-   }
- 
--  test("row group skipping doesn't overflow when reading into larger type") {
-+  test("row group skipping doesn't overflow when reading into larger type",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
-     withTempPath { path =>
-       Seq(0).toDF("a").write.parquet(path.toString)
-       // The vectorized and non-vectorized readers will produce different exceptions, we don't need
-@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1148,7 +1152,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
       .where(s"a < ${Long.MaxValue}")
       .collect()
     }

native/core/src/execution/planner.rs

Lines changed: 2 additions & 0 deletions
@@ -1053,6 +1053,8 @@ impl PhysicalPlanner {
                 default_values,
                 scan.session_timezone.as_str(),
                 scan.case_sensitive,
+                scan.schema_validation_enabled,
+                scan.schema_evolution_enabled,
                 self.session_ctx(),
                 scan.encryption_enabled,
             )?;

native/core/src/execution/spark_config.rs

Lines changed: 3 additions & 0 deletions
@@ -21,6 +21,9 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
 pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
 pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
 pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
+#[allow(dead_code)]
+pub(crate) const COMET_PARQUET_SCHEMA_VALIDATION_ENABLED: &str =
+    "spark.comet.parquet.schemaValidation.enabled";
 
 pub(crate) trait SparkConfig {
     fn get_bool(&self, name: &str) -> bool;
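
For context, the `SparkConfig` trait above is how native code looks up Spark session options by key. A toy illustration follows; the map-backed impl is an assumption made only for this sketch, and the commit itself threads the flag through the scan plan instead (which is why the new constant is marked `#[allow(dead_code)]`):

```rust
use std::collections::HashMap;

pub(crate) const COMET_PARQUET_SCHEMA_VALIDATION_ENABLED: &str =
    "spark.comet.parquet.schemaValidation.enabled";

pub(crate) trait SparkConfig {
    fn get_bool(&self, name: &str) -> bool;
}

// Toy implementation backed by a string map, standing in for the real config
// source; this impl exists only for this sketch.
impl SparkConfig for HashMap<String, String> {
    fn get_bool(&self, name: &str) -> bool {
        self.get(name).map(|v| v == "true").unwrap_or(false)
    }
}

fn main() {
    let mut conf = HashMap::new();
    conf.insert(
        COMET_PARQUET_SCHEMA_VALIDATION_ENABLED.to_string(),
        "true".to_string(),
    );
    // Look the flag up by its Spark key, defaulting to false when unset.
    assert!(conf.get_bool(COMET_PARQUET_SCHEMA_VALIDATION_ENABLED));
}
```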

native/core/src/parquet/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -773,6 +773,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         None,
         session_timezone.as_str(),
         case_sensitive != JNI_FALSE,
+        false, // schema_validation_enabled - validation is done on the Java side
+        false, // schema_evolution_enabled
         session_ctx,
         encryption_enabled,
     )?;

native/core/src/parquet/parquet_exec.rs

Lines changed: 8 additions & 0 deletions
@@ -68,12 +68,16 @@ pub(crate) fn init_datasource_exec(
     default_values: Option<HashMap<usize, ScalarValue>>,
     session_timezone: &str,
     case_sensitive: bool,
+    schema_validation_enabled: bool,
+    schema_evolution_enabled: bool,
     session_ctx: &Arc<SessionContext>,
     encryption_enabled: bool,
 ) -> Result<Arc<DataSourceExec>, ExecutionError> {
     let (table_parquet_options, spark_parquet_options) = get_options(
         session_timezone,
         case_sensitive,
+        schema_validation_enabled,
+        schema_evolution_enabled,
         &object_store_url,
         encryption_enabled,
     );
@@ -142,6 +146,8 @@ pub(crate) fn init_datasource_exec(
 fn get_options(
     session_timezone: &str,
     case_sensitive: bool,
+    schema_validation_enabled: bool,
+    schema_evolution_enabled: bool,
     object_store_url: &ObjectStoreUrl,
     encryption_enabled: bool,
 ) -> (TableParquetOptions, SparkParquetOptions) {
@@ -153,6 +159,8 @@ fn get_options(
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
     spark_parquet_options.case_sensitive = case_sensitive;
+    spark_parquet_options.schema_validation_enabled = schema_validation_enabled;
+    spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;
 
     if encryption_enabled {
         table_parquet_options.crypto.configure_factory(

native/core/src/parquet/parquet_support.rs

Lines changed: 8 additions & 0 deletions
@@ -76,6 +76,10 @@ pub struct SparkParquetOptions {
     pub use_legacy_date_timestamp_or_ntz: bool,
     // Whether schema field names are case sensitive
     pub case_sensitive: bool,
+    /// Whether to validate schema compatibility (type coercions) in a Spark-compatible way
+    pub schema_validation_enabled: bool,
+    /// Whether schema evolution (type widening) is enabled
+    pub schema_evolution_enabled: bool,
 }
 
 impl SparkParquetOptions {
@@ -88,6 +92,8 @@ impl SparkParquetOptions {
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
+            schema_validation_enabled: true,
+            schema_evolution_enabled: false,
         }
     }
@@ -100,6 +106,8 @@ impl SparkParquetOptions {
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
+            schema_validation_enabled: true,
+            schema_evolution_enabled: false,
         }
     }
 }
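
A minimal sketch of how these fields get wired together, mirroring the `get_options` change in `parquet_exec.rs` above; the three-argument `SparkParquetOptions::new` and `EvalMode::Legacy` come from the diff, while the import paths and the wrapper function are assumptions:

```rust
use datafusion_comet_spark_expr::EvalMode; // assumed crate path
use crate::parquet::parquet_support::SparkParquetOptions; // assumed module path

fn build_options(
    session_timezone: &str,
    case_sensitive: bool,
    schema_validation_enabled: bool,
    schema_evolution_enabled: bool,
) -> SparkParquetOptions {
    // new() defaults schema_validation_enabled to true and
    // schema_evolution_enabled to false, matching the struct fields above;
    // the flags passed in by the caller override those defaults.
    let mut opts = SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
    opts.case_sensitive = case_sensitive;
    opts.schema_validation_enabled = schema_validation_enabled;
    opts.schema_evolution_enabled = schema_evolution_enabled;
    opts
}
```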
