Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 31 additions & 51 deletions dev/diffs/4.0.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ index 81713c777bc..b5f92ed9742 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 2c24cc7d570..63047ef482e 100644
index 2c24cc7d570..753737a1057 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
Expand Down Expand Up @@ -640,17 +640,7 @@ index 2c24cc7d570..63047ef482e 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1151,7 +1162,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

- test("join key with multiple references on the filtering plan") {
+ test("join key with multiple references on the filtering plan",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName,
SQLConf.ANSI_ENABLED.key -> "false" // ANSI mode doesn't support "String + String"
@@ -1204,10 +1216,16 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase

val plan = df.queryExecution.executedPlan
val countSubqueryBroadcasts =
Expand All @@ -669,7 +659,7 @@ index 2c24cc7d570..63047ef482e 100644

assert(countSubqueryBroadcasts == 1)
assert(countReusedSubqueryBroadcasts == 1)
@@ -1215,7 +1233,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
Expand All @@ -679,15 +669,15 @@ index 2c24cc7d570..63047ef482e 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1330,6 +1349,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1330,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("Subquery reuse across the whole plan",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321"),
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
@@ -1424,7 +1444,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1424,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

Expand All @@ -697,15 +687,15 @@ index 2c24cc7d570..63047ef482e 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1578,6 +1599,7 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1578,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase

val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
case s: SubqueryBroadcastExec => s
+ case s: CometSubqueryBroadcastExec => s
}
assert(subqueryBroadcastExecs.size === 1)
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
@@ -1730,6 +1752,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1730,6 +1751,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
Expand Down Expand Up @@ -742,7 +732,7 @@ index 9c90e0105a4..fadf2f0f698 100644

test("SPARK-35884: Explain Formatted") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 9c529d14221..6d5db65b5d8 100644
index 9c529d14221..a046f1ed1ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
Expand All @@ -761,7 +751,7 @@ index 9c529d14221..6d5db65b5d8 100644
- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
+ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
+ Seq(IgnoreCometNativeDataFusion(
+ "https://github.com/apache/datafusion-comet/issues/3321"))
+ "https://github.com/apache/datafusion-comet/issues/3314"))
+ } else Seq.empty
+ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly {
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
Expand Down Expand Up @@ -1329,10 +1319,10 @@ index 0df7f806272..92390bd819f 100644

test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 2e33f6505ab..6d4a75d02ff 100644
index 2e33f6505ab..3a8b154b565 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -23,12 +23,14 @@ import org.apache.spark.SparkRuntimeException
@@ -23,11 +23,13 @@ import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union}
Expand All @@ -1343,11 +1333,10 @@ index 2e33f6505ab..6d4a75d02ff 100644
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class SubquerySuite extends QueryTest
@@ -1529,6 +1531,18 @@ class SubquerySuite extends QueryTest
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
Expand Down Expand Up @@ -1382,7 +1371,7 @@ index 2e33f6505ab..6d4a75d02ff 100644

- test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") {
+ test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
val df = sql(query)
checkAnswer(df, answer)
Expand Down Expand Up @@ -1902,7 +1891,7 @@ index 47679ed7865..9ffbaecb98e 100644
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 77a988f340e..e4deeb6b1d8 100644
index 77a988f340e..263208a67d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
Expand All @@ -1911,7 +1900,7 @@ index 77a988f340e..e4deeb6b1d8 100644

- test("alter temporary view should follow current storeAnalyzedPlanForView config") {
+ test("alter temporary view should follow current storeAnalyzedPlanForView config",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
withTable("t") {
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
withView("v1") {
Expand Down Expand Up @@ -2898,7 +2887,7 @@ index 4474ec1fd42..05fa0257c82 100644
checkAnswer(
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index bba71f1c48d..e1b0c25a354 100644
index bba71f1c48d..faee9b4ce83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
Expand All @@ -2925,7 +2914,7 @@ index bba71f1c48d..e1b0c25a354 100644

- test("Enabling/disabling ignoreCorruptFiles") {
+ test("Enabling/disabling ignoreCorruptFiles",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
Expand Down Expand Up @@ -3122,7 +3111,7 @@ index 0acb21f3e6f..1f9c3fd13fc 100644

withTempPath { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
index 09ed6955a51..98e313cddd4 100644
index 09ed6955a51..5cd856ff7b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
Expand Down Expand Up @@ -3154,72 +3143,63 @@ index 09ed6955a51..98e313cddd4 100644
}
}

@@ -190,10 +192,16 @@ class ParquetTypeWideningSuite
@@ -190,7 +192,8 @@ class ParquetTypeWideningSuite
(Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType),
(Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType),
(Seq("1.23", "10.34"), FloatType, DoubleType),
- (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampNTZType)
+ // TODO: Comet cannot handle older than "1582-10-15"
+ (Seq("2020-01-01", "2020-01-02"/* , "1312-02-27" */), DateType, TimestampNTZType)
)
+ wideningTags: Seq[org.scalatest.Tag] =
+ if (fromType == DateType && toType == TimestampNTZType) {
+ Seq(IgnoreCometNativeDataFusion(
+ "https://github.com/apache/datafusion-comet/issues/3321"))
+ } else Seq.empty
}
- test(s"parquet widening conversion $fromType -> $toType") {
+ test(s"parquet widening conversion $fromType -> $toType", wideningTags: _*) {
checkAllParquetReaders(values, fromType, toType, expectError = false)
}

@@ -231,7 +239,8 @@ class ParquetTypeWideningSuite
test(s"parquet widening conversion $fromType -> $toType") {
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
)
}
- test(s"unsupported parquet conversion $fromType -> $toType") {
+ test(s"unsupported parquet conversion $fromType -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
checkAllParquetReaders(values, fromType, toType, expectError = true)
}

@@ -257,7 +266,8 @@ class ParquetTypeWideningSuite
@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
)
}
- test(s"unsupported parquet conversion $fromType -> $toType") {
+ test(s"unsupported parquet conversion $fromType -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
checkAllParquetReaders(values, fromType, toType,
expectError =
// parquet-mr allows reading decimals into a smaller precision decimal type without
@@ -271,7 +281,8 @@ class ParquetTypeWideningSuite
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
outputTimestampType <- ParquetOutputTimestampType.values
}
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
withSQLConf(
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
@@ -291,7 +302,8 @@ class ParquetTypeWideningSuite
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
}
test(
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
checkAllParquetReaders(
values = Seq("1.23", "10.34"),
fromType = DecimalType(fromPrecision, 2),
@@ -322,7 +334,8 @@ class ParquetTypeWideningSuite
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
}
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
- s"Decimal($toPrecision, $toScale)"
+ s"Decimal($toPrecision, $toScale)",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3321")
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")
) {
checkAllParquetReaders(
values = Seq("1.23", "10.34"),
Expand Down
Loading