Skip to content

Commit f9bb48c

Browse files
AngersZhuuuupan3793
authored andcommitted
[SPARK-54774][CORE] Submit failed should keep same exit code with app exit code in K8s mode
### What changes were proposed in this pull request? When submit failed, k8s case will stop SparkContext. Here we already know the exit code, so here we stop SC should also pass the exitcode too. ### Why are the changes needed? keep same exit code for SC and whole app, help when checking the log ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53539 from AngersZhuuuu/SPARK-54774. Authored-by: Angerszhuuuu <[email protected]> Signed-off-by: Cheng Pan <[email protected]>
1 parent f00ea04 commit f9bb48c

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1050,7 +1050,7 @@ private[spark] class SparkSubmit extends Logging {
10501050
!isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) &&
10511051
!isConnectServer(args.mainClass)) {
10521052
try {
1053-
SparkContext.getActive.foreach(_.stop())
1053+
SparkContext.getActive.foreach(_.stop(exitCode))
10541054
} catch {
10551055
case e: Throwable => logError("Failed to close SparkContext", e)
10561056
}

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1632,6 +1632,38 @@ class SparkSubmitSuite
16321632
assertResult(3)(runSparkSubmit(args, expectFailure = true))
16331633
}
16341634

1635+
test("SPARK-54774: k8s submit failed should keep same exit code with user code") {
1636+
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
1637+
val fileSystem = Utils.getHadoopFileSystem("/",
1638+
SparkHadoopUtil.get.newConfiguration(new SparkConf()))
1639+
withTempDir { testDir =>
1640+
val testDirPath = new Path(testDir.getAbsolutePath())
1641+
val args = Seq(
1642+
"--class", K8sExitCodeTestApplication.getClass.getName.stripSuffix("$"),
1643+
"--name", "testApp",
1644+
"--master", "k8s://host:port",
1645+
"--conf", "spark.ui.enabled=false",
1646+
"--conf", "spark.master.rest.enabled=false",
1647+
"--conf", "spark.kubernetes.authenticate.driver.serviceAccountName=default",
1648+
"--conf", "spark.eventLog.enabled=true",
1649+
"--conf", "spark.eventLog.rolling.enabled=false",
1650+
"--conf", "spark.eventLog.testing=true",
1651+
"--conf", s"spark.eventLog.dir=${testDirPath.toUri.toString}",
1652+
unusedJar.toString
1653+
)
1654+
// The test application throws SparkUserAppException with exit code 42,
1655+
// so SparkContext.stop(42) should be called in k8s mode
1656+
runSparkSubmit(args, expectFailure = true)
1657+
val listStatus = fileSystem.listStatus(testDirPath)
1658+
val logData = EventLogFileReader.openEventLog(listStatus.last.getPath, fileSystem)
1659+
Source.fromInputStream(logData)(Codec.UTF8).getLines().filter { line =>
1660+
line.contains("SparkListenerApplicationEnd")
1661+
}.foreach { line =>
1662+
assert(line.contains("\"ExitCode\":42"))
1663+
}
1664+
}
1665+
}
1666+
16351667
private def testRemoteResources(
16361668
enableHttpFs: Boolean,
16371669
forceDownloadSchemes: Seq[String] = Nil): Unit = {
@@ -2120,3 +2152,23 @@ class TestSparkApplication extends SparkApplication with Matchers {
21202152
}
21212153

21222154
}
2155+
2156+
object K8sExitCodeTestApplication {
2157+
def main(args: Array[String]): Unit = {
2158+
TestUtils.configTestLog4j2("INFO")
2159+
// Use local master to ensure SparkContext can be created in test environment
2160+
// The k8s master is set in SparkSubmit args, which triggers the finally block logic
2161+
val conf = new SparkConf().setMaster("local[2]")
2162+
val sc = new SparkContext(conf)
2163+
try {
2164+
// Create a simple RDD to ensure SparkContext is active
2165+
sc.parallelize(1 to 10).count()
2166+
// Throw SparkUserAppException with a specific exit code
2167+
// This simulates a user application failure
2168+
throw new SparkUserAppException(42)
2169+
} finally {
2170+
// Note: In k8s mode, SparkSubmit should call sc.stop(42) in the finally block
2171+
// We don't call stop() here to let SparkSubmit handle it
2172+
}
2173+
}
2174+
}

0 commit comments

Comments
 (0)