Skip to content

Commit dcaaffa

Browse files
authored
test: fix SparkToColumnar plan-shape assertions on Spark 4 (#4032)
1 parent 2bd01af commit dcaaffa

1 file changed

Lines changed: 6 additions & 6 deletions

File tree

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2388,8 +2388,6 @@ class CometExecSuite extends CometTestBase {
23882388
}
23892389

23902390
test("SparkToColumnar eliminate redundant in AQE") {
2391-
// TODO fix for Spark 4.0.0
2392-
assume(!isSpark40Plus)
23932391
withSQLConf(
23942392
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
23952393
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
@@ -2404,7 +2402,10 @@ class CometExecSuite extends CometTestBase {
24042402
val planAfter = df.queryExecution.executedPlan
24052403
assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
24062404
val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
2407-
val numOperators = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
2405+
// Use AdaptiveSparkPlanHelper.collect so traversal descends through QueryStageExec.plan;
2406+
// Spark 4 wraps the final plan in ResultQueryStageExec (a LeafExecNode) that
2407+
// SparkPlan.collect would otherwise stop at.
2408+
val numOperators = collect(adaptivePlan) { case c: CometSparkToColumnarExec =>
24082409
c
24092410
}
24102411
assert(numOperators.length == 1)
@@ -2478,8 +2479,6 @@ class CometExecSuite extends CometTestBase {
24782479
}
24792480

24802481
test("SparkToColumnar override node name for row input") {
2481-
// TODO fix for Spark 4.0.0
2482-
assume(!isSpark40Plus)
24832482
withSQLConf(
24842483
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
24852484
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
@@ -2494,7 +2493,8 @@ class CometExecSuite extends CometTestBase {
24942493
val planAfter = df.queryExecution.executedPlan
24952494
assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
24962495
val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
2497-
val nodeNames = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
2496+
// See comment in the "eliminate redundant in AQE" test about AdaptiveSparkPlanHelper.collect.
2497+
val nodeNames = collect(adaptivePlan) { case c: CometSparkToColumnarExec =>
24982498
c.nodeName
24992499
}
25002500
assert(nodeNames.length == 1)

0 commit comments

Comments
 (0)