Skip to content

Commit a2f8e54

Browse files
authored
feat: Drop native_comet as a valid option for COMET_NATIVE_SCAN_IMPL config (#3358)
1 parent 14cd6c9 commit a2f8e54

12 files changed

Lines changed: 56 additions & 141 deletions

File tree

.github/actions/java-test/action.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ inputs:
3232
scan_impl:
3333
description: 'The default Parquet scan implementation'
3434
required: false
35-
default: 'native_comet'
35+
default: 'auto'
3636
upload-test-reports:
3737
description: 'Whether to upload test results including coverage to GitHub'
3838
required: false

.github/workflows/pr_build_linux.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ jobs:
164164
- name: "Spark 3.4, JDK 11, Scala 2.12"
165165
java_version: "11"
166166
maven_opts: "-Pspark-3.4 -Pscala-2.12"
167-
scan_impl: "native_comet"
167+
scan_impl: "auto"
168168

169169
- name: "Spark 3.5.5, JDK 17, Scala 2.13"
170170
java_version: "17"
@@ -174,7 +174,7 @@ jobs:
174174
- name: "Spark 3.5.6, JDK 17, Scala 2.13"
175175
java_version: "17"
176176
maven_opts: "-Pspark-3.5 -Dspark.version=3.5.6 -Pscala-2.13"
177-
scan_impl: "native_comet"
177+
scan_impl: "auto"
178178

179179
- name: "Spark 3.5, JDK 17, Scala 2.12"
180180
java_version: "17"

.github/workflows/spark_sql_test.yml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,12 @@ jobs:
116116
- {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
117117
# Test combinations:
118118
# - auto scan: all Spark versions (3.4, 3.5, 4.0)
119-
# - native_comet: Spark 3.4, 3.5
120119
# - native_iceberg_compat: Spark 3.5 only
121120
config:
122121
- {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto', scan-env: ''}
123122
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto', scan-env: ''}
124-
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
125-
- {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'native_comet', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_comet'}
126-
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_comet', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_comet'}
127123
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_iceberg_compat', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_iceberg_compat'}
124+
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
128125
# Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
129126
exclude:
130127
- config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,8 @@ object CometConf extends ShimCometConf {
124124
val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
125125
.category(CATEGORY_SCAN)
126126
.doc(
127-
s"The implementation of Comet Native Scan to use. Available modes are `$SCAN_NATIVE_COMET`," +
127+
"The implementation of Comet Native Scan to use. Available modes are " +
128128
s"`$SCAN_NATIVE_DATAFUSION`, and `$SCAN_NATIVE_ICEBERG_COMPAT`. " +
129-
s"`$SCAN_NATIVE_COMET` (DEPRECATED - will be removed in a future release) is for the " +
130-
"original Comet native scan which uses a jvm based parquet file reader and native " +
131-
"column decoding. Supports simple types only. " +
132129
s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation of scan based on " +
133130
"DataFusion. " +
134131
s"`$SCAN_NATIVE_ICEBERG_COMPAT` is the recommended native implementation that " +
@@ -137,8 +134,7 @@ object CometConf extends ShimCometConf {
137134
.internal()
138135
.stringConf
139136
.transform(_.toLowerCase(Locale.ROOT))
140-
.checkValues(
141-
Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
137+
.checkValues(Set(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
142138
.createWithEnvVarOrDefault("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
143139

144140
val COMET_ICEBERG_NATIVE_ENABLED: ConfigEntry[Boolean] =

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
185185
}
186186
}
187187

188-
test("basic data type support") {
188+
// ignored: native_comet scan is no longer supported
189+
ignore("basic data type support") {
189190
// this test requires native_comet scan due to unsigned u8/u16 issue
190191
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
191192
Seq(true, false).foreach { dictionaryEnabled =>
@@ -216,7 +217,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
216217
}
217218
}
218219

219-
test("uint data type support") {
220+
// ignored: native_comet scan is no longer supported
221+
ignore("uint data type support") {
220222
// this test requires native_comet scan due to unsigned u8/u16 issue
221223
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
222224
Seq(true, false).foreach { dictionaryEnabled =>
@@ -1503,7 +1505,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
15031505
}
15041506
}
15051507

1506-
test("round") {
1508+
// ignored: native_comet scan is no longer supported
1509+
ignore("round") {
15071510
// https://github.com/apache/datafusion-comet/issues/1441
15081511
assume(usingLegacyNativeCometScan)
15091512
Seq(true, false).foreach { dictionaryEnabled =>
@@ -1567,7 +1570,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
15671570
}
15681571
}
15691572

1570-
test("hex") {
1573+
// ignored: native_comet scan is no longer supported
1574+
ignore("hex") {
15711575
// https://github.com/apache/datafusion-comet/issues/1441
15721576
assume(usingLegacyNativeCometScan)
15731577
Seq(true, false).foreach { dictionaryEnabled =>
@@ -2781,7 +2785,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
27812785
}
27822786
}
27832787

2784-
test("test integral divide") {
2788+
// ignored: native_comet scan is no longer supported
2789+
ignore("test integral divide") {
27852790
// this test requires native_comet scan due to unsigned u8/u16 issue
27862791
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
27872792
Seq(true, false).foreach { dictionaryEnabled =>

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,8 @@ class CometExecSuite extends CometTestBase {
382382
}
383383
}
384384

385-
test("ReusedExchangeExec should work on CometBroadcastExchangeExec with V2 scan") {
385+
// ignored: native_comet scan is no longer supported
386+
ignore("ReusedExchangeExec should work on CometBroadcastExchangeExec with V2 scan") {
386387
withSQLConf(
387388
CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
388389
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ class CometParquetWriterSuite extends CometTestBase {
310310
}
311311
}
312312

313-
test("native write falls back when scan produces non-Arrow data") {
313+
// ignored: native_comet scan is no longer supported
314+
ignore("native write falls back when scan produces non-Arrow data") {
314315
// This test verifies that when a native scan (like native_comet) doesn't support
315316
// certain data types (complex types), the native write correctly falls back to Spark
316317
// instead of failing at runtime with "Comet execution only takes Arrow Arrays" error.

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ abstract class ParquetReadSuite extends CometTestBase {
8585
}
8686
}
8787

88-
test("unsupported Spark types") {
88+
// ignored: native_comet scan is no longer supported
89+
ignore("unsupported Spark types") {
8990
// TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET
9091
// https://github.com/apache/datafusion-comet/issues/2188
9192
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
@@ -130,7 +131,8 @@ abstract class ParquetReadSuite extends CometTestBase {
130131
}
131132
}
132133

133-
test("unsupported Spark schema") {
134+
// ignored: native_comet scan is no longer supported
135+
ignore("unsupported Spark schema") {
134136
// TODO this test is not correctly implemented for scan implementations other than SCAN_NATIVE_COMET
135137
// https://github.com/apache/datafusion-comet/issues/2188
136138
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
@@ -368,7 +370,8 @@ abstract class ParquetReadSuite extends CometTestBase {
368370
checkParquetFile(data)
369371
}
370372

371-
test("test multiple pages with different sizes and nulls") {
373+
// ignored: native_comet scan is no longer supported
374+
ignore("test multiple pages with different sizes and nulls") {
372375
def makeRawParquetFile(
373376
path: Path,
374377
dictionaryEnabled: Boolean,
@@ -1344,7 +1347,8 @@ abstract class ParquetReadSuite extends CometTestBase {
13441347
}
13451348
}
13461349

1347-
test("scan metrics") {
1350+
// ignored: native_comet scan is no longer supported
1351+
ignore("scan metrics") {
13481352

13491353
val cometScanMetricNames = Seq(
13501354
"ParquetRowGroups",
@@ -1866,8 +1870,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
18661870

18671871
test("Test V1 parquet scan uses respective scanner") {
18681872
Seq(
1869-
("false", CometConf.SCAN_NATIVE_COMET, "FileScan parquet"),
1870-
("true", CometConf.SCAN_NATIVE_COMET, "CometScan [native_comet] parquet"),
1873+
("false", CometConf.SCAN_NATIVE_DATAFUSION, "FileScan parquet"),
18711874
("true", CometConf.SCAN_NATIVE_DATAFUSION, "CometNativeScan"),
18721875
("true", CometConf.SCAN_NATIVE_ICEBERG_COMPAT, "CometScan [native_iceberg_compat] parquet"))
18731876
.foreach { case (cometEnabled, cometNativeScanImpl, expectedScanner) =>
@@ -2014,10 +2017,11 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
20142017

20152018
}
20162019

2020+
// ignored: native_comet scan is no longer supported
20172021
class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
20182022
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
20192023
pos: Position): Unit = {
2020-
super.test(testName, testTags: _*)(
2024+
super.ignore(testName, testTags: _*)(
20212025
withSQLConf(
20222026
SQLConf.USE_V1_SOURCE_LIST.key -> "",
20232027
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
@@ -2040,7 +2044,8 @@ class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
20402044
}
20412045
}
20422046

2043-
test("Test V2 parquet scan uses respective scanner") {
2047+
// ignored: native_comet scan is no longer supported
2048+
ignore("Test V2 parquet scan uses respective scanner") {
20442049
Seq(("false", "BatchScan"), ("true", "CometBatchScan")).foreach {
20452050
case (cometEnabled, expectedScanner) =>
20462051
testScanner(

spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ class CometScanRuleSuite extends CometTestBase {
101101
}
102102
}
103103

104-
test("CometScanRule should replace V2 BatchScanExec, but only when Comet is enabled") {
104+
// ignored: native_comet scan is no longer supported
105+
ignore("CometScanRule should replace V2 BatchScanExec, but only when Comet is enabled") {
105106
withTempPath { path =>
106107
createTestDataFrame.write.parquet(path.toString)
107108
withTempView("test_data") {

spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala

Lines changed: 1 addition & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types._
3838
import org.apache.spark.sql.vectorized.ColumnVector
3939

4040
import org.apache.comet.{CometConf, WithHdfsCluster}
41-
import org.apache.comet.CometConf.{SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
41+
import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
4242
import org.apache.comet.parquet.BatchReader
4343

4444
/**
@@ -67,14 +67,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
6767
spark.sql(s"select $query from parquetV1Table").noop()
6868
}
6969

70-
sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
71-
withSQLConf(
72-
CometConf.COMET_ENABLED.key -> "true",
73-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
74-
spark.sql(s"select $query from parquetV1Table").noop()
75-
}
76-
}
77-
7870
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
7971
withSQLConf(
8072
CometConf.COMET_ENABLED.key -> "true",
@@ -175,21 +167,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
175167
}
176168
}
177169

178-
sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
179-
withSQLConf(
180-
"spark.memory.offHeap.enabled" -> "true",
181-
"spark.memory.offHeap.size" -> "10g",
182-
CometConf.COMET_ENABLED.key -> "true",
183-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET,
184-
DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
185-
KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
186-
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
187-
InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
188-
s"footerKey: ${footerKey}, key1: ${key1}") {
189-
spark.sql(s"select $query from parquetV1Table").noop()
190-
}
191-
}
192-
193170
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
194171
withSQLConf(
195172
"spark.memory.offHeap.enabled" -> "true",
@@ -245,14 +222,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
245222
spark.sql("select sum(id) from parquetV1Table").noop()
246223
}
247224

248-
sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
249-
withSQLConf(
250-
CometConf.COMET_ENABLED.key -> "true",
251-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
252-
spark.sql("select sum(id) from parquetV1Table").noop()
253-
}
254-
}
255-
256225
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
257226
withSQLConf(
258227
CometConf.COMET_ENABLED.key -> "true",
@@ -373,14 +342,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
373342
spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
374343
}
375344

376-
benchmark.addCase("SQL Parquet - Comet") { _ =>
377-
withSQLConf(
378-
CometConf.COMET_ENABLED.key -> "true",
379-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
380-
spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
381-
}
382-
}
383-
384345
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
385346
withSQLConf(
386347
CometConf.COMET_ENABLED.key -> "true",
@@ -431,14 +392,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
431392
spark.sql("select sum(length(id)) from parquetV1Table").noop()
432393
}
433394

434-
sqlBenchmark.addCase("SQL Parquet - Comet") { _ =>
435-
withSQLConf(
436-
CometConf.COMET_ENABLED.key -> "true",
437-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
438-
spark.sql("select sum(length(id)) from parquetV1Table").noop()
439-
}
440-
}
441-
442395
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
443396
withSQLConf(
444397
CometConf.COMET_ENABLED.key -> "true",
@@ -482,17 +435,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
482435
.noop()
483436
}
484437

485-
benchmark.addCase("SQL Parquet - Comet") { _ =>
486-
withSQLConf(
487-
CometConf.COMET_ENABLED.key -> "true",
488-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
489-
spark
490-
.sql("select sum(length(c2)) from parquetV1Table where c1 is " +
491-
"not NULL and c2 is not NULL")
492-
.noop()
493-
}
494-
}
495-
496438
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
497439
withSQLConf(
498440
CometConf.COMET_ENABLED.key -> "true",
@@ -538,14 +480,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
538480
spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
539481
}
540482

541-
benchmark.addCase("SQL Parquet - Comet") { _ =>
542-
withSQLConf(
543-
CometConf.COMET_ENABLED.key -> "true",
544-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
545-
spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
546-
}
547-
}
548-
549483
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
550484
withSQLConf(
551485
CometConf.COMET_ENABLED.key -> "true",
@@ -589,14 +523,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
589523
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
590524
}
591525

592-
benchmark.addCase("SQL Parquet - Comet") { _ =>
593-
withSQLConf(
594-
CometConf.COMET_ENABLED.key -> "true",
595-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
596-
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
597-
}
598-
}
599-
600526
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
601527
withSQLConf(
602528
CometConf.COMET_ENABLED.key -> "true",
@@ -640,14 +566,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
640566
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
641567
}
642568

643-
benchmark.addCase("SQL Parquet - Comet") { _ =>
644-
withSQLConf(
645-
CometConf.COMET_ENABLED.key -> "true",
646-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) {
647-
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
648-
}
649-
}
650-
651569
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
652570
withSQLConf(
653571
CometConf.COMET_ENABLED.key -> "true",

0 commit comments

Comments
 (0)