Skip to content

Commit b06f470

Browse files
rosaclaude
andcommitted
Fix race condition between job enqueue and concurrency unblock
This addresses #456. There is a race condition in the concurrency control mechanism where a job that finishes and tries to unblock the next blocked execution can miss a `BlockedExecution` that is being created concurrently. This causes the blocked job to remain stuck until the `ConcurrencyMaintenance` periodic task runs (potentially minutes later). It happens as follows: 1. Job A is running (semaphore value=0) 2. Job B enqueue starts: reads semaphore (value=0, no row lock) → decides to block 3. Job A finishes: `Semaphore.signal` → `UPDATE` value to 1 (succeeds immediately since no lock held) 4. Job A: `BlockedExecution.release_one` → `SELECT` finds nothing (Job B's `BlockedExecution` not committed yet) 5. Job B enqueue commits: `BlockedExecution` now exists but nobody will unblock it The root cause is that `Semaphore::Proxy#wait` doesn't lock the semaphore row when checking the semaphore. This allows the concurrent `signal` to complete before the enqueue transaction commits, creating a window where the `BlockedExecution` is invisible. To fix, we lock the semaphore row with `FOR UPDATE` during the wait check so that the enqueue transaction holds the lock from the check through `BlockedExecution` creation and commit. This forces a concurrent signal `UPDATE` to wait, guaranteeing the `BlockedExecution` is visible when release_one runs. This shouldn't introduce any dead locks, as there's no new circular dependencies introduced by these two: - Enqueue path: locks `Semaphore` row → `INSERT`s `BlockedExecution` (no lock on existing rows) - `release_one` path: locks `BlockedExecution` row (`SKIP LOCKED`) → locks `Semaphore` row (via wait in release) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 552f7d0 commit b06f470

2 files changed

Lines changed: 120 additions & 1 deletion

File tree

app/models/solid_queue/semaphore.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def initialize(job)
4040
end
4141

4242
def wait
43-
if semaphore = Semaphore.find_by(key: key)
43+
if semaphore = Semaphore.lock.find_by(key: key)
4444
semaphore.value > 0 && attempt_decrement
4545
else
4646
attempt_creation
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
5+
class SolidQueue::SemaphoreTest < ActiveSupport::TestCase
6+
self.use_transactional_tests = false
7+
8+
setup do
9+
@result = JobResult.create!(queue_name: "default")
10+
end
11+
12+
test "wait acquires a row lock that blocks concurrent signal" do
13+
skip_on_sqlite
14+
15+
# Enqueue first job to create semaphore with value=0
16+
NonOverlappingUpdateResultJob.perform_later(@result)
17+
concurrency_key = SolidQueue::Job.last.concurrency_key
18+
assert_equal 0, SolidQueue::Semaphore.find_by(key: concurrency_key).value
19+
20+
lock_held = Concurrent::Event.new
21+
22+
# Background thread: holds a FOR UPDATE lock on the semaphore row
23+
locker = Thread.new do
24+
SolidQueue::Record.connection_pool.with_connection do
25+
SolidQueue::Record.transaction do
26+
SolidQueue::Semaphore.where(key: concurrency_key).lock.first
27+
lock_held.set
28+
sleep 1
29+
end
30+
end
31+
end
32+
33+
lock_held.wait(5)
34+
sleep 0.1
35+
36+
# Main thread: this UPDATE should block until the locker's transaction commits
37+
start = monotonic_now
38+
SolidQueue::Semaphore.where(key: concurrency_key).update_all("value = value + 1")
39+
elapsed = monotonic_now - start
40+
41+
locker.join(5)
42+
43+
assert elapsed >= 0.5, "UPDATE should have been blocked by FOR UPDATE lock (took #{elapsed.round(3)}s)"
44+
assert_equal 1, SolidQueue::Semaphore.find_by(key: concurrency_key).value
45+
end
46+
47+
test "blocked execution created during enqueue is visible to release_one after signal" do
48+
skip_on_sqlite
49+
50+
# Enqueue first job to create semaphore with value=0
51+
NonOverlappingUpdateResultJob.perform_later(@result)
52+
job_a = SolidQueue::Job.last
53+
concurrency_key = job_a.concurrency_key
54+
assert_equal 0, SolidQueue::Semaphore.find_by(key: concurrency_key).value
55+
56+
lock_held = Concurrent::Event.new
57+
58+
# Background thread: simulates the enqueue path for a second job.
59+
# Locks the semaphore row (as the code does), creates a BlockedExecution,
60+
# then holds the transaction open to simulate the window where the race occurs.
61+
enqueue_thread = Thread.new do
62+
SolidQueue::Record.connection_pool.with_connection do
63+
SolidQueue::Record.transaction do
64+
# Lock the semaphore (same as Semaphore::Proxy#wait)
65+
SolidQueue::Semaphore.where(key: concurrency_key).lock.first
66+
67+
# Create job and blocked execution bypassing after_create callbacks
68+
# to avoid re-entering Semaphore.wait
69+
uuid = SecureRandom.uuid
70+
SolidQueue::Job.insert({
71+
queue_name: "default",
72+
class_name: "NonOverlappingUpdateResultJob",
73+
concurrency_key: concurrency_key,
74+
active_job_id: uuid,
75+
arguments: "{}",
76+
scheduled_at: Time.current
77+
})
78+
job_b_id = SolidQueue::Job.where(active_job_id: uuid).pick(:id)
79+
80+
SolidQueue::BlockedExecution.insert({
81+
job_id: job_b_id,
82+
queue_name: "default",
83+
concurrency_key: concurrency_key,
84+
expires_at: SolidQueue.default_concurrency_control_period.from_now,
85+
priority: 0
86+
})
87+
88+
lock_held.set
89+
90+
# Hold the transaction open so the signal path must wait
91+
sleep 1
92+
end
93+
end
94+
end
95+
96+
lock_held.wait(5)
97+
sleep 0.1
98+
99+
# Main thread: simulates job_a finishing — signal + release_one.
100+
# The signal UPDATE will block until the enqueue transaction commits,
101+
# guaranteeing the BlockedExecution is visible to release_one.
102+
assert SolidQueue::Semaphore.signal(job_a)
103+
assert SolidQueue::BlockedExecution.release_one(concurrency_key),
104+
"release_one should find the BlockedExecution created by the concurrent enqueue"
105+
106+
enqueue_thread.join(5)
107+
108+
assert_equal 0, SolidQueue::BlockedExecution.where(concurrency_key: concurrency_key).count
109+
end
110+
111+
private
112+
def skip_on_sqlite
113+
skip "Row-level locking not supported on SQLite" if SolidQueue::Record.connection.adapter_name.downcase.include?("sqlite")
114+
end
115+
116+
def monotonic_now
117+
Process.clock_gettime(Process::CLOCK_MONOTONIC)
118+
end
119+
end

0 commit comments

Comments
 (0)