Skip to content

Commit 33167c4

Browse files
committed
Move all SparkFunSuite-dependent methods from SQLTestUtils to QueryTest
The SQLTestUtils trait body is now empty; methods such as withTempDir, testQuietly, and testWithWholeStageCodegenOnAndOff now live in QueryTest. SparkPlanTest, PlannerSuite, and HDFSMetadataLogSuite have been updated to extend QueryTest. Co-authored-by: Isaac
1 parent 39cdb9c commit 33167c4

File tree

5 files changed

+126
-133
lines changed

5 files changed

+126
-133
lines changed

sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import scala.jdk.CollectionConverters._
2828
import scala.language.implicitConversions
2929

3030
import org.apache.hadoop.fs.Path
31-
import org.scalatest.{Assertions, BeforeAndAfterAll, Suite}
31+
import org.scalactic.source.Position
32+
import org.scalatest.{Assertions, BeforeAndAfterAll, Suite, Tag}
3233
import org.scalatest.concurrent.Eventually
3334

3435
import org.apache.spark.SparkFunSuite
@@ -43,12 +44,14 @@ import org.apache.spark.sql.catalyst.util._
4344
import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, SQLImplicits}
4445
import org.apache.spark.sql.classic.ClassicConversions._
4546
import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution}
47+
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
4648
import org.apache.spark.sql.execution.columnar.InMemoryRelation
4749
import org.apache.spark.sql.execution.datasources.DataSourceUtils
50+
import org.apache.spark.sql.internal.SQLConf
4851
import org.apache.spark.sql.util.QueryExecutionListener
4952
import org.apache.spark.storage.StorageLevel
53+
import org.apache.spark.util.{UninterruptibleThread, Utils}
5054
import org.apache.spark.util.ArrayImplicits._
51-
import org.apache.spark.util.Utils
5255

5356

5457
/**
@@ -602,7 +605,118 @@ trait QueryTestBase extends PlanTestBase
602605

603606
}
604607

605-
abstract class QueryTest extends SparkFunSuite with QueryTestBase with PlanTest
608+
abstract class QueryTest extends SparkFunSuite with QueryTestBase with PlanTest {
609+
import scala.util.control.NonFatal
610+
611+
/**
612+
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
613+
* returns.
614+
*/
615+
protected override def withTempDir(f: File => Unit): Unit = {
616+
super.withTempDir { dir =>
617+
f(dir)
618+
waitForTasksToFinish()
619+
}
620+
}
621+
622+
/**
623+
* A helper function for turning off/on codegen.
624+
*/
625+
protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = {
626+
Seq("false", "true").foreach { codegenEnabled =>
627+
val isTurnOn = if (codegenEnabled == "true") "on" else "off"
628+
test(s"$testName (whole-stage-codegen ${isTurnOn})") {
629+
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) {
630+
f(codegenEnabled)
631+
}
632+
}
633+
}
634+
}
635+
636+
/**
637+
* Disable stdout and stderr when running the test. To not output the logs to the console,
638+
* ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of
639+
* System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if
640+
* we change System.out and System.err.
641+
*/
642+
protected def testQuietly(name: String)(f: => Unit): Unit = {
643+
test(name) {
644+
quietly {
645+
f
646+
}
647+
}
648+
}
649+
650+
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
651+
(implicit pos: Position): Unit = {
652+
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
653+
super.test(testName, testTags: _*) {
654+
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
655+
testFun
656+
}
657+
}
658+
} else {
659+
super.test(testName, testTags: _*)(testFun)
660+
}
661+
}
662+
663+
/**
664+
* Run a test on a separate `UninterruptibleThread`.
665+
*/
666+
protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false)
667+
(body: => Unit): Unit = {
668+
val timeoutMillis = 10000
669+
@transient var ex: Throwable = null
670+
671+
def runOnThread(): Unit = {
672+
val thread = new UninterruptibleThread(s"Testing thread for test $name") {
673+
override def run(): Unit = {
674+
try {
675+
body
676+
} catch {
677+
case NonFatal(e) =>
678+
ex = e
679+
}
680+
}
681+
}
682+
thread.setDaemon(true)
683+
thread.start()
684+
thread.join(timeoutMillis)
685+
if (thread.isAlive) {
686+
thread.interrupt()
687+
// If this interrupt does not work, then this thread is most likely running something that
688+
// is not interruptible. There is not much point to wait for the thread to terminate, and
689+
// we rather let the JVM terminate the thread on exit.
690+
fail(
691+
s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" +
692+
s" $timeoutMillis ms")
693+
} else if (ex != null) {
694+
throw ex
695+
}
696+
}
697+
698+
if (quietly) {
699+
testQuietly(name) { runOnThread() }
700+
} else {
701+
test(name) { runOnThread() }
702+
}
703+
}
704+
705+
/**
706+
* Copy file in jar's resource to a temp file, then pass it to `f`.
707+
* This function is used to make `f` can use the path of temp file(e.g. file:/), instead of
708+
* path of jar's resource which starts with 'jar:file:/'
709+
*/
710+
protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = {
711+
val inputStream =
712+
Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath)
713+
withTempDir { dir =>
714+
val tmpFile = new File(dir, "tmp")
715+
Files.copy(inputStream, tmpFile.toPath)
716+
f(tmpFile)
717+
}
718+
}
719+
}
606720

607721
object QueryTest extends Assertions {
608722
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.SparkUnsupportedOperationException
2121
import org.apache.spark.rdd.RDD
22-
import org.apache.spark.sql.{execution, DataFrame, Row}
22+
import org.apache.spark.sql.{execution, DataFrame, QueryTest, Row}
2323
import org.apache.spark.sql.AnalysisException
2424
import org.apache.spark.sql.catalyst.InternalRow
2525
import org.apache.spark.sql.catalyst.expressions._
@@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
3737
import org.apache.spark.sql.test.SharedSparkSession
3838
import org.apache.spark.sql.types._
3939

40-
class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
40+
class PlannerSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper {
4141
import testImplicits._
4242

4343
setupTestData()

sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package org.apache.spark.sql.execution
1919

2020
import scala.util.control.NonFatal
2121

22-
import org.apache.spark.SparkFunSuite
23-
import org.apache.spark.sql.{classic, DataFrame, Row, SparkSessionProvider, SQLContext}
22+
import org.apache.spark.sql.{classic, DataFrame, QueryTest, Row, SQLContext}
2423
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
2524
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
2625
import org.apache.spark.sql.classic.ClassicConversions._
@@ -30,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils
3029
* Base class for writing tests for individual physical operators. For an example of how this
3130
* class's test helper methods can be used, see [[SortSuite]].
3231
*/
33-
private[sql] abstract class SparkPlanTest extends SparkFunSuite with SparkSessionProvider {
32+
private[sql] abstract class SparkPlanTest extends QueryTest {
3433
override protected def spark: classic.SparkSession
3534

3635
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ import scala.language.implicitConversions
2525
import org.scalatest.concurrent.Waiters._
2626
import org.scalatest.time.SpanSugar._
2727

28+
import org.apache.spark.sql.QueryTest
2829
import org.apache.spark.sql.execution.streaming.checkpointing.{FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager, HDFSMetadataLog}
2930
import org.apache.spark.sql.internal.SQLConf
3031
import org.apache.spark.sql.test.SharedSparkSession
3132
import org.apache.spark.util.UninterruptibleThread
3233

33-
class HDFSMetadataLogSuite extends SharedSparkSession {
34+
class HDFSMetadataLogSuite extends QueryTest with SharedSparkSession {
3435

3536
private implicit def toOption[A](a: A): Option[A] = Option(a)
3637

sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 3 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,11 @@
1717

1818
package org.apache.spark.sql.test
1919

20-
import java.io.File
21-
import java.nio.file.Files
22-
23-
import scala.util.control.NonFatal
24-
25-
import org.scalactic.source.Position
26-
import org.scalatest.{Suite, Tag}
20+
import org.scalatest.Suite
2721

2822
import org.apache.spark.SparkFunSuite
2923
import org.apache.spark.sql.{QueryTest, QueryTestBase, Row}
3024
import org.apache.spark.sql.catalyst.plans.PlanTest
31-
import org.apache.spark.sql.catalyst.util.quietly
32-
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
33-
import org.apache.spark.sql.internal.SQLConf
34-
import org.apache.spark.util.UninterruptibleThread
3525

3626
/**
3727
* Helper trait that can be extended by all external SQL test suites.
@@ -44,120 +34,9 @@ private[sql] trait SQLTestUtilsBase extends QueryTestBase { self: Suite => }
4434
* Helper trait that should be extended by all SQL test suites within the Spark
4535
* code base.
4636
*
47-
* This extends [[SparkFunSuite]] with [[QueryTestBase]] and adds test helpers that
48-
* depend on [[SparkFunSuite]] (e.g. `test()` overrides, `withTempDir`).
37+
* This is now an alias for [[QueryTest]].
4938
*/
50-
private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with PlanTest {
51-
52-
/**
53-
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
54-
* returns.
55-
*/
56-
protected override def withTempDir(f: File => Unit): Unit = {
57-
super.withTempDir { dir =>
58-
f(dir)
59-
waitForTasksToFinish()
60-
}
61-
}
62-
63-
/**
64-
* A helper function for turning off/on codegen.
65-
*/
66-
protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = {
67-
Seq("false", "true").foreach { codegenEnabled =>
68-
val isTurnOn = if (codegenEnabled == "true") "on" else "off"
69-
test(s"$testName (whole-stage-codegen ${isTurnOn})") {
70-
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) {
71-
f(codegenEnabled)
72-
}
73-
}
74-
}
75-
}
76-
77-
/**
78-
* Disable stdout and stderr when running the test. To not output the logs to the console,
79-
* ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of
80-
* System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if
81-
* we change System.out and System.err.
82-
*/
83-
protected def testQuietly(name: String)(f: => Unit): Unit = {
84-
test(name) {
85-
quietly {
86-
f
87-
}
88-
}
89-
}
90-
91-
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
92-
(implicit pos: Position): Unit = {
93-
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
94-
super.test(testName, testTags: _*) {
95-
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
96-
testFun
97-
}
98-
}
99-
} else {
100-
super.test(testName, testTags: _*)(testFun)
101-
}
102-
}
103-
104-
/**
105-
* Run a test on a separate `UninterruptibleThread`.
106-
*/
107-
protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false)
108-
(body: => Unit): Unit = {
109-
val timeoutMillis = 10000
110-
@transient var ex: Throwable = null
111-
112-
def runOnThread(): Unit = {
113-
val thread = new UninterruptibleThread(s"Testing thread for test $name") {
114-
override def run(): Unit = {
115-
try {
116-
body
117-
} catch {
118-
case NonFatal(e) =>
119-
ex = e
120-
}
121-
}
122-
}
123-
thread.setDaemon(true)
124-
thread.start()
125-
thread.join(timeoutMillis)
126-
if (thread.isAlive) {
127-
thread.interrupt()
128-
// If this interrupt does not work, then this thread is most likely running something that
129-
// is not interruptible. There is not much point to wait for the thread to terminate, and
130-
// we rather let the JVM terminate the thread on exit.
131-
fail(
132-
s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" +
133-
s" $timeoutMillis ms")
134-
} else if (ex != null) {
135-
throw ex
136-
}
137-
}
138-
139-
if (quietly) {
140-
testQuietly(name) { runOnThread() }
141-
} else {
142-
test(name) { runOnThread() }
143-
}
144-
}
145-
146-
/**
147-
* Copy file in jar's resource to a temp file, then pass it to `f`.
148-
* This function is used to make `f` can use the path of temp file(e.g. file:/), instead of
149-
* path of jar's resource which starts with 'jar:file:/'
150-
*/
151-
protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = {
152-
val inputStream =
153-
Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath)
154-
withTempDir { dir =>
155-
val tmpFile = new File(dir, "tmp")
156-
Files.copy(inputStream, tmpFile.toPath)
157-
f(tmpFile)
158-
}
159-
}
160-
}
39+
private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with PlanTest
16140

16241
private[sql] object SQLTestUtils {
16342

0 commit comments

Comments (0)