Commit 2bd4cf3

[SPARK-53807][CORE][FOLLOWUP] Fix a race condition issue between unlock and releaseAllLocksForTask for write lock in BlockInfoManager
### What changes were proposed in this pull request?

This PR fixes a race condition between `unlock` and `releaseAllLocksForTask` for write locks in `BlockInfoManager`. #52524 tried to fix this issue, but the race still remains.

The issue can be reproduced by inserting `Thread.sleep()` calls into `BlockInfoManager` as follows.

```
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -397,6 +397,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != BlockInfo.NO_WRITER) {
         info.writerTask = BlockInfo.NO_WRITER
+        Thread.sleep(100)
         val blockIds = writeLocksByTask.get(taskAttemptId)
         if (blockIds != null) {
           blockIds.remove(blockId)
@@ -490,12 +491,14 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
     val writeLocks = Option(writeLocksByTask.remove(taskAttemptId)).getOrElse(util.Set.of())
+    Thread.sleep(100)
     writeLocks.forEach { blockId =>
       blockInfo(blockId) { (info, condition) =>
         // Check the existence of `blockId` because `unlock` may have already removed it
         // concurrently.
         if (writeLocks.contains(blockId)) {
           blocksWithReleasedLocks += blockId
+          Thread.sleep(100)
           assert(info.writerTask == taskAttemptId)
           info.writerTask = BlockInfo.NO_WRITER
           condition.signalAll()
```

Then run the test `SPARK-53807 - concurrent unlock and releaseAllLocksForTask for write should not fail` in `BlockInfoManagerSuite`.

```
$ build/sbt 'testOnly org.apache.spark.storage.BlockInfoManagerSuite -- -z "SPARK-53807"'
```

### Why are the changes needed?

To fix a race condition.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`BlockInfoManagerSuite` passes even with the `Thread.sleep` calls inserted as described above.

### Was this patch authored or co-authored using generative AI tooling?

No.
Closes #53940 from sarutak/fix-blockinfo-manager-issue2.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
1 parent 0882317 commit 2bd4cf3

1 file changed

+1
-1
lines changed

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala

Lines changed: 1 addition & 1 deletion
@@ -396,9 +396,9 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     logTrace(s"Task $taskAttemptId releasing lock for $blockId")
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != BlockInfo.NO_WRITER) {
-        info.writerTask = BlockInfo.NO_WRITER
         val blockIds = writeLocksByTask.get(taskAttemptId)
         if (blockIds != null) {
+          info.writerTask = BlockInfo.NO_WRITER
           blockIds.remove(blockId)
         }
       } else {
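
The effect of moving the assignment can be illustrated by replaying one interleaving by hand. The commit message does not spell out the exact schedule, so the sketch below assumes this one: `releaseAllLocksForTask` detaches the task's write-lock set first, `unlock` then runs on the block, and `releaseAllLocksForTask` finally reaches the same block. `WriteLockRaceSketch`, `Info`, `NoWriter`, and `replay` are hypothetical names for a simplified single-threaded model. This is not Spark's implementation, and it omits the per-block lock and condition.

```scala
import java.util.{HashMap => JHashMap, HashSet => JHashSet, Set => JSet}

// Hypothetical, simplified model of the state touched by `unlock` and
// `releaseAllLocksForTask`. The steps run sequentially to mimic one
// assumed thread interleaving; no real locking is involved.
object WriteLockRaceSketch {
  val NoWriter: Long = -1L

  final class Info { var writerTask: Long = NoWriter }

  /** Replays the interleaving. Returns true if the assertion
    * `info.writerTask == taskAttemptId` in the release path would hold. */
  def replay(fixed: Boolean): Boolean = {
    val taskAttemptId = 1L
    val blockId = "block-0"

    // Initial state: the task holds the write lock on the block.
    val info = new Info
    info.writerTask = taskAttemptId
    val writeLocksByTask = new JHashMap[Long, JSet[String]]()
    val held = new JHashSet[String]()
    held.add(blockId)
    writeLocksByTask.put(taskAttemptId, held)

    // Step 1 (release path): detach the task's write-lock set.
    val writeLocks = writeLocksByTask.remove(taskAttemptId)

    // Step 2 (unlock path): runs before the release path visits the block.
    if (info.writerTask != NoWriter) {
      if (!fixed) info.writerTask = NoWriter // old position of the assignment
      val blockIds = writeLocksByTask.get(taskAttemptId)
      if (blockIds != null) {
        if (fixed) info.writerTask = NoWriter // new position of the assignment
        blockIds.remove(blockId)
      }
    }

    // Step 3 (release path): the block is still in the detached set, so the
    // writer is expected to be this task.
    val assertionHolds = !writeLocks.contains(blockId) || info.writerTask == taskAttemptId
    if (writeLocks.contains(blockId)) info.writerTask = NoWriter
    assertionHolds
  }
}
```

In this model, `replay(fixed = false)` reports a violated assertion because `unlock` clears `writerTask` without being able to remove the block from the already detached set. With `replay(fixed = true)`, `unlock` leaves `writerTask` untouched when the set is gone, so the release path finds the writer it expects and clears it itself.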
