Skip to content

Commit 92a9ccf

Browse files
committed
Fix async pool compatibility on Ruby 3.1
1 parent 9ee9f75 commit 92a9ccf

2 files changed

Lines changed: 35 additions & 46 deletions

File tree

lib/solid_queue/execution_pools/async_pool.rb

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module ExecutionPools
55
class AsyncPool
66
include AppExecutor
77

8-
WAKEUP_SIGNAL = ".".b
8+
IDLE_WAIT_INTERVAL = 0.01
99

1010
class MissingDependencyError < LoadError
1111
def initialize(error)
@@ -59,7 +59,6 @@ def initialize(size, on_state_change: nil)
5959
@fatal_error = nil
6060
@boot_queue = Thread::Queue.new
6161
@pending_executions = Thread::Queue.new
62-
@wakeup_reader, @wakeup_writer = IO.pipe
6362

6463
self.class.ensure_dependency!
6564
self.class.ensure_supported_isolation_level!
@@ -78,7 +77,6 @@ def post(execution)
7877
reserve_capacity!
7978
reserved = true
8079
pending_executions << execution
81-
signal_reactor
8280
rescue Exception
8381
restore_capacity if reserved
8482
raise
@@ -94,13 +92,11 @@ def idle?
9492
end
9593

9694
def shutdown
97-
should_shutdown = state_mutex.synchronize do
95+
state_mutex.synchronize do
9896
next false if @shutdown
9997

10098
@shutdown = true
10199
end
102-
103-
signal_reactor if should_shutdown
104100
end
105101

106102
def shutdown?
@@ -120,7 +116,7 @@ def metadata
120116
end
121117

122118
private
123-
attr_reader :boot_queue, :mutex, :on_state_change, :pending_executions, :reactor_thread, :state_mutex, :wakeup_reader, :wakeup_writer
119+
attr_reader :boot_queue, :mutex, :on_state_change, :pending_executions, :reactor_thread, :state_mutex
124120

125121
def name
126122
@name ||= "solid_queue-async-pool-#{object_id}"
@@ -133,31 +129,23 @@ def start_reactor
133129
boot_queue << :ready
134130

135131
wait_for_executions(semaphore)
136-
wait_for_child_tasks(task)
132+
wait_for_inflight_executions
137133
end
138134
rescue Exception => error
139135
register_fatal_error(error)
140136
raise
141-
ensure
142-
close_wakeup_pipe
143137
end
144138
end
145139

146140
def wait_for_executions(semaphore)
147141
loop do
148-
wakeup_reader.wait_readable
149-
clear_wakeup_signal
150142
schedule_pending_executions(semaphore)
151143

152144
break if shutdown? && pending_executions.empty?
153-
end
154-
end
155145

156-
def clear_wakeup_signal
157-
loop do
158-
wakeup_reader.read_nonblock(1024)
159-
rescue IO::WaitReadable, EOFError
160-
break
146+
# Older async releases don't support waking the reactor from another
147+
# thread reliably, so we cooperatively poll for newly posted work.
148+
sleep(IDLE_WAIT_INTERVAL) if pending_executions.empty?
161149
end
162150
end
163151

@@ -217,25 +205,12 @@ def raise_if_fatal_error!
217205
raise error if error
218206
end
219207

220-
def signal_reactor
221-
wakeup_writer.write_nonblock(WAKEUP_SIGNAL)
222-
rescue IO::WaitWritable, Errno::EPIPE, IOError
223-
nil
208+
def wait_for_inflight_executions
209+
sleep(IDLE_WAIT_INTERVAL) while executions_in_flight?
224210
end
225211

226-
def wait_for_child_tasks(task)
227-
if task.respond_to?(:wait_all)
228-
task.wait_all
229-
else
230-
task.children&.each(&:wait)
231-
end
232-
end
233-
234-
def close_wakeup_pipe
235-
wakeup_reader.close unless wakeup_reader.closed?
236-
wakeup_writer.close unless wakeup_writer.closed?
237-
rescue IOError
238-
nil
212+
def executions_in_flight?
213+
mutex.synchronize { @available_capacity < size }
239214
end
240215
end
241216
end

test/unit/worker_test.rb

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -269,28 +269,42 @@ class WorkerTest < ActiveSupport::TestCase
269269
end
270270

271271
test "sleeps `10.minutes` if at capacity" do
272-
3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }
272+
3.times { |i| StoreResultJob.perform_later(i, pause: 5.seconds) }
273273

274-
@worker.expects(:interruptible_sleep).with(10.minutes).at_least_once
275-
@worker.expects(:interruptible_sleep).with(@worker.polling_interval).never
276-
@worker.expects(:handle_thread_error).never
274+
delays = stub_interruptible_sleep(@worker)
277275

278276
@worker.start
279-
sleep 1.second
277+
278+
first_delay = Timeout.timeout(1.second) { delays.pop }
279+
280+
assert_equal 10.minutes, first_delay
280281
end
281282

282283
test "sleeps `polling_interval` if worker not at capacity" do
283-
2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }
284+
2.times { |i| StoreResultJob.perform_later(i, pause: 5.seconds) }
284285

285-
@worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once
286-
@worker.expects(:interruptible_sleep).with(10.minutes).never
287-
@worker.expects(:handle_thread_error).never
286+
delays = stub_interruptible_sleep(@worker)
288287

289288
@worker.start
290-
sleep 1.second
289+
290+
first_delay = Timeout.timeout(1.second) { delays.pop }
291+
292+
assert_equal @worker.polling_interval, first_delay
291293
end
292294

293295
private
296+
def stub_interruptible_sleep(worker)
297+
delays = Thread::Queue.new
298+
299+
worker.stubs(:handle_thread_error)
300+
worker.define_singleton_method(:interruptible_sleep) do |delay|
301+
delays << delay
302+
sleep 0.01
303+
end
304+
305+
delays
306+
end
307+
294308
def with_worker_execution_support(options, &block)
295309
if options[:execution_mode] == :async
296310
with_execution_isolation(:fiber, &block)

0 commit comments

Comments
 (0)