Skip to content

Commit b976e7b

Browse files
committed
[SPARK-55603][K8S] Improve removeExecutorFromK8s to use patch instead of edit API
### What changes were proposed in this pull request?

This PR aims to improve `removeExecutorFromK8s` to use the `patch` instead of the `edit` API.

### Why are the changes needed?

**Network Efficiency**
- `edit` requires fetching the entire resource and sending the full updated resource back.
- `patch` only transmits the specific changes, making it much more network-efficient.

**Concurrency & Conflict Resolution**
- `edit` typically follows a Get -> Modify -> Update (PUT) pattern. Using this pattern creates a race condition where, if another client modifies the resource in between, a 409 Conflict error occurs due to a mismatched `resourceVersion`.
- `patch` sends only the changes (delta) to the server, where the merge is handled server-side. This significantly reduces the risk of conflicts, especially for simple operations like adding an annotation.

### Does this PR introduce _any_ user-facing change?

This will reduce the overhead of the K8s control plane and the chance of 409 errors.

### How was this patch tested?

Passed the CIs with the newly updated test case.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: `Gemini 3 Pro (High)` on `Antigravity`

Closes #54376 from dongjoon-hyun/SPARK-55603.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 4ee63ac commit b976e7b

File tree

2 files changed

+14
-18
lines changed

2 files changed

+14
-18
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,14 +17,14 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import java.util.concurrent.TimeUnit
20-
import java.util.function.UnaryOperator
2120

2221
import scala.collection.mutable
2322
import scala.jdk.CollectionConverters._
2423

2524
import com.google.common.cache.CacheBuilder
2625
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
2726
import io.fabric8.kubernetes.client.KubernetesClient
27+
import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
2828

2929
import org.apache.spark.SparkConf
3030
import org.apache.spark.deploy.ExecutorFailureTracker
@@ -64,6 +64,8 @@ private[spark] class ExecutorPodsLifecycleManager(
6464

6565
private val namespace = conf.get(KUBERNETES_NAMESPACE)
6666

67+
private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE)
68+
6769
private val sparkContainerName = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)
6870
.getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)
6971

@@ -231,7 +233,11 @@ private[spark] class ExecutorPodsLifecycleManager(
231233
.pods()
232234
.inNamespace(namespace)
233235
.withName(updatedPod.getMetadata.getName)
234-
.edit(executorInactivationFn)
236+
.patch(PATCH_CONTEXT, new PodBuilder()
237+
.editOrNewMetadata()
238+
.addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
239+
.endMetadata()
240+
.build())
235241

236242
inactivatedPods += execId
237243
}
@@ -321,10 +327,4 @@ private object ExecutorPodsLifecycleManager {
321327
}
322328
s"${code}${humanStr}"
323329
}
324-
325-
def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
326-
.editOrNewMetadata()
327-
.addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
328-
.endMetadata()
329-
.build()
330330
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala

Lines changed: 6 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,14 +16,13 @@
1616
*/
1717
package org.apache.spark.scheduler.cluster.k8s
1818

19-
import java.util.function.UnaryOperator
20-
2119
import scala.collection.mutable
2220

2321
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
2422
import io.fabric8.kubernetes.client.KubernetesClient
2523
import io.fabric8.kubernetes.client.dsl.PodResource
26-
import org.mockito.{Mock, MockitoAnnotations}
24+
import io.fabric8.kubernetes.client.dsl.base.PatchContext
25+
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
2726
import org.mockito.ArgumentMatchers.any
2827
import org.mockito.ArgumentMatchers.anyString
2928
import org.mockito.Mockito.{mock, never, times, verify, when}
@@ -191,12 +190,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
191190
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
192191
}
193192

194-
test("SPARK-40458: test executor inactivation function") {
195-
val failedPod = failedExecutorWithoutDeletion(1)
196-
val inactivated = ExecutorPodsLifecycleManager.executorInactivationFn(failedPod)
197-
assert(inactivated.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
198-
}
199-
200193
test("Keep executor pods in k8s if configured.") {
201194
val failedPod = failedExecutorWithoutDeletion(1)
202195
eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
@@ -206,8 +199,11 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
206199
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
207200
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
208201
verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
202+
203+
val patchCaptor = ArgumentCaptor.forClass(classOf[Pod])
209204
verify(namedExecutorPods(failedPod.getMetadata.getName))
210-
.edit(any[UnaryOperator[Pod]]())
205+
.patch(any[PatchContext], patchCaptor.capture())
206+
assert(patchCaptor.getValue.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
211207
}
212208

213209
test("SPARK-49804: Use the exit code of executor container always") {

0 commit comments

Comments
 (0)