
Commit 3851cb5

ivoson authored and LuciferYang committed
[SPARK-56302][CORE] Free task result memory eagerly during serialization on executor
### What changes were proposed in this pull request?

Eagerly null intermediate objects during task result serialization in `Executor` to reduce peak heap memory usage.

During result serialization in `TaskRunner.run()`, three representations of the result coexist on the heap simultaneously:

1. `value` — the raw task result object from `task.run()`
2. `valueByteBuffer` — first serialization of the result
3. `serializedDirectResult` — second serialization wrapping the above into a `DirectTaskResult`

Each becomes dead as soon as the next is produced, but none were released. This PR nulls each reference as soon as it is no longer needed:

- `value = null` after serializing into `valueByteBuffer`
- `valueByteBuffer = null` and `directResult = null` after re-serializing into `serializedDirectResult`

All changes are confined to the executor side within `TaskRunner.run()`, where the variables are local and not exposed to other components.

### Why are the changes needed?

For tasks returning large results (e.g. `collect()` on large datasets), the redundant copies can roughly triple peak memory during serialization, increasing GC pressure or causing executor OOM. Eagerly freeing dead references lets the GC reclaim memory sooner.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v2.1.88

Closes apache#55110 from ivoson/free-result-memory-asap.

Lead-authored-by: Tengfei Huang <[email protected]>
Co-authored-by: Tengfei Huang <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
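The eager-release pattern the PR describes can be sketched outside Spark. This is an illustrative stand-alone sketch, not Spark's actual code: `EagerReleaseSketch` and the byte-array "serializations" are hypothetical stand-ins for the task result and its two serialized forms. The key point is that the intermediates must be declared as `var` so each reference can be dropped as soon as the next representation exists, shrinking the window in which all three copies of a large result are live at once.

```scala
// Minimal sketch (illustrative, not Spark's code) of the eager-release pattern:
// null each intermediate reference the moment the next representation exists.
object EagerReleaseSketch {
  def run(resultSize: Int): Long = {
    var value: Array[Byte] = Array.fill[Byte](resultSize)(1) // 1. raw task result
    var firstBuffer: Array[Byte] = value.clone()             // 2. first serialization
    value = null        // raw result is now dead; allow GC to reclaim it
    val finalBuffer: Array[Byte] = firstBuffer.clone()       // 3. second serialization
    firstBuffer = null  // first buffer is dead once re-serialized
    finalBuffer.length.toLong                                // only the final copy survives
  }
}
```

Without the two `null` assignments, all three arrays remain reachable from the stack frame until the method returns, which is exactly the peak-memory problem the PR targets.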
1 parent: 4018cc7 · commit: 3851cb5

File tree

1 file changed (+11 −3 lines changed)

1 file changed

+11
-3
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 11 additions & 3 deletions
```diff
@@ -57,6 +57,7 @@ import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 private[spark] object IsolatedSessionState {
   // Authoritative store for all isolated sessions. Sessions are put here when created
@@ -883,7 +884,7 @@ private[spark] class Executor(
       val resources = taskDescription.resources.map { case (rName, addressesAmounts) =>
         rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
       }
-      val value = Utils.tryWithSafeFinally {
+      var value: Any = Utils.tryWithSafeFinally {
        val res = task.run(
          taskAttemptId = taskId,
          attemptNumber = taskDescription.attemptNumber,
@@ -938,7 +939,9 @@
 
       val resultSer = env.serializer.newInstance()
       val beforeSerializationNs = System.nanoTime()
-      val valueByteBuffer = SerializerHelper.serializeToChunkedBuffer(resultSer, value)
+      var valueByteBuffer: ChunkedByteBuffer = SerializerHelper.serializeToChunkedBuffer(
+        resultSer, value)
+      value = null // Allow GC to reclaim the raw task result
       val afterSerializationNs = System.nanoTime()
 
       // Deserialization happens in two parts: first, we deserialize a Task object, which
@@ -982,10 +985,15 @@
       val accumUpdates = task.collectAccumulatorUpdates()
       val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
       // TODO: do not serialize value twice
-      val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
+      var directResult: DirectTaskResult[Any] = new DirectTaskResult(
+        valueByteBuffer, accumUpdates, metricPeaks)
       // try to estimate a reasonable upper bound of DirectTaskResult serialization
       val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult,
         valueByteBuffer.size + accumUpdates.size * 32 + metricPeaks.length * 8)
+      // Allow GC to reclaim the first serialization buffer. Both references must be
+      // nulled: the local var and the field inside directResult point to the same object.
+      valueByteBuffer = null
+      directResult = null
       val resultSize = serializedDirectResult.size
       executorSource.METRIC_RESULT_SIZE.inc(resultSize)
```
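The last hunk's comment notes that both `valueByteBuffer` and `directResult` must be dropped, because the `DirectTaskResult` holds the same buffer internally. That reachability point can be demonstrated with a small sketch; `Wrapper` and `ReachabilitySketch` are illustrative names, not Spark's classes:

```scala
// Sketch (illustrative, not Spark's code): an object is only collectible once
// *every* live reference to it is gone. Nulling just the local buffer variable
// is not enough while a wrapper object still points at the same array.
final class Wrapper(val buffer: Array[Byte])

object ReachabilitySketch {
  def run(): Int = {
    var buffer: Array[Byte] = Array.fill[Byte](16)(7)
    var wrapper: Wrapper = new Wrapper(buffer)  // second reference to the same array
    val serialized = wrapper.buffer.clone()     // stand-in for the second serialization
    buffer = null   // array is still reachable through wrapper.buffer
    wrapper = null  // now the array itself finally becomes unreachable
    serialized.length
  }
}
```

In the actual patch the same reasoning applies: after `serializedDirectResult` is produced, nulling only `valueByteBuffer` would leave the buffer pinned by `directResult`, so both local references are cleared.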