Skip to content

Commit 9ecf53f

Browse files
andygrove and claude authored
fix: preserve partitioning in CometNativeScanExec for bucketed scans (#3392)
This fixes test failures when `native_datafusion` is enabled (issue #3315): 1. CometNativeScanExec now preserves the original outputPartitioning for bucketed scans, matching the pattern used by CometScanExec. Previously it always returned UnknownPartitioning, causing BroadcastJoinSuite tests to fail when they expected PartitioningCollection. 2. Updated diff files to accept CometNativeScanExec in the FileDataSourceV2FallBackSuite "Fallback Parquet V2 to V1" test, which checks for FileSourceScanExec or CometScanExec in the plan. Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 7886a3d commit 9ecf53f

4 files changed

Lines changed: 16 additions & 8 deletions

File tree

dev/diffs/3.4.3.diff

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,7 +1157,7 @@ index cfc8b2cc845..c6fcfd7bd08 100644
11571157
import org.apache.spark.SparkConf
11581158
import org.apache.spark.sql.{AnalysisException, QueryTest}
11591159
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
1160-
+import org.apache.spark.sql.comet.CometScanExec
1160+
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
11611161
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
11621162
import org.apache.spark.sql.connector.read.ScanBuilder
11631163
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
@@ -1167,7 +1167,7 @@ index cfc8b2cc845..c6fcfd7bd08 100644
11671167
assert(
11681168
- df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
11691169
+ df.queryExecution.executedPlan.exists {
1170-
+ case _: FileSourceScanExec | _: CometScanExec => true
1170+
+ case _: FileSourceScanExec | _: CometScanExec | _: CometNativeScanExec => true
11711171
+ case _ => false
11721172
+ }
11731173
+ )

dev/diffs/3.5.8.diff

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ index cfc8b2cc845..c6fcfd7bd08 100644
11111111
import org.apache.spark.SparkConf
11121112
import org.apache.spark.sql.{AnalysisException, QueryTest}
11131113
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
1114-
+import org.apache.spark.sql.comet.CometScanExec
1114+
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
11151115
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
11161116
import org.apache.spark.sql.connector.read.ScanBuilder
11171117
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
@@ -1121,7 +1121,7 @@ index cfc8b2cc845..c6fcfd7bd08 100644
11211121
assert(
11221122
- df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
11231123
+ df.queryExecution.executedPlan.exists {
1124-
+ case _: FileSourceScanExec | _: CometScanExec => true
1124+
+ case _: FileSourceScanExec | _: CometScanExec | _: CometNativeScanExec => true
11251125
+ case _ => false
11261126
+ }
11271127
+ )

dev/diffs/4.0.1.diff

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1443,7 +1443,7 @@ index 2a0ab21ddb0..e8a5a891105 100644
14431443
import org.apache.spark.{SparkConf, SparkException}
14441444
import org.apache.spark.sql.QueryTest
14451445
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
1446-
+import org.apache.spark.sql.comet.CometScanExec
1446+
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
14471447
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
14481448
import org.apache.spark.sql.connector.read.ScanBuilder
14491449
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
@@ -1453,7 +1453,7 @@ index 2a0ab21ddb0..e8a5a891105 100644
14531453
assert(
14541454
- df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
14551455
+ df.queryExecution.executedPlan.exists {
1456-
+ case _: FileSourceScanExec | _: CometScanExec => true
1456+
+ case _: FileSourceScanExec | _: CometScanExec | _: CometNativeScanExec => true
14571457
+ case _ => false
14581458
+ }
14591459
+ )

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,16 @@ case class CometNativeScanExec(
6565
override val nodeName: String =
6666
s"CometNativeScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
6767

68-
override lazy val outputPartitioning: Partitioning =
69-
UnknownPartitioning(originalPlan.inputRDD.getNumPartitions)
68+
// exposed for testing
69+
lazy val bucketedScan: Boolean = originalPlan.bucketedScan && !disableBucketedScan
70+
71+
override lazy val outputPartitioning: Partitioning = {
72+
if (bucketedScan) {
73+
originalPlan.outputPartitioning
74+
} else {
75+
UnknownPartitioning(originalPlan.inputRDD.getNumPartitions)
76+
}
77+
}
7078

7179
override lazy val outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
7280

0 commit comments

Comments
 (0)