Skip to content

Commit dbc2b13

Browse files
authored
chore: Register task completion listener to ensure CometExecIterator is always closed (#3959)
1 parent 61ea6ad commit dbc2b13

3 files changed

Lines changed: 6 additions & 2 deletions

File tree

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ class CometExecIterator(
135135
private var currentBatch: ColumnarBatch = null
136136
private var closed: Boolean = false
137137

138+
// Register a task completion listener to ensure native resources are released
139+
// when the task is done.
140+
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
141+
this.close()
142+
}
143+
138144
private def getNextBatch: Option[ColumnarBatch] = {
139145
assert(partitionIndex >= 0 && partitionIndex < numParts)
140146

spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ private[spark] class CometExecRDD(
137137

138138
Option(context).foreach { ctx =>
139139
ctx.addTaskCompletionListener[Unit] { _ =>
140-
it.close()
141140
subqueries.foreach(sub => CometScalarSubquery.removeSubquery(it.id, sub))
142141
}
143142
}

spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ case class CometTakeOrderedAndProjectExec(
174174

175175
Option(TaskContext.get()).foreach { context =>
176176
context.addTaskCompletionListener[Unit] { _ =>
177-
it.close()
178177
cleanSubqueries(it.id, this)
179178
}
180179
}

0 commit comments

Comments
 (0)