Skip to content

Commit 275a1f9

Browse files
committed
Add support for delayed jobs and fix release with delay
- Override later() to push delayed jobs to partitioned delayed ZSET - Add migratePartitionDelayed() called on every pop() to move ready jobs - Add migrateDelayed() Lua script for atomic ZSET → LIST migration - Update popWithLimit() Lua to check delayed ZSET before removing partition from SET - Add integration tests for delayed job behavior Fixes #11
1 parent 4035550 commit 275a1f9

File tree

3 files changed

+206
-4
lines changed

3 files changed

+206
-4
lines changed

src/Queue/BalancedRedisQueue.php

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,30 @@ protected function pushToPartition(string $queueName, string $partition, string
182182
return $result;
183183
}
184184

185+
/**
186+
* Push a job onto the queue after a delay.
187+
*/
188+
public function later($delay, $job, $data = '', $queue = null): mixed
189+
{
190+
$queueName = $this->getCleanQueueName($queue);
191+
$partition = $this->resolvePartition($job);
192+
$payload = $this->createPayload($job, $this->getQueue($queue), $data);
193+
194+
$redis = $this->getConnection();
195+
196+
// Add partition to SET so strategy visits it
197+
$redis->sadd($this->keys->partitions($queueName), $partition);
198+
199+
// Push to delayed ZSET with score = availableAt timestamp
200+
$redis->zadd(
201+
$this->keys->delayed($queueName, $partition),
202+
$this->availableAt($delay),
203+
$payload
204+
);
205+
206+
return 0;
207+
}
208+
185209
/**
186210
* Pop the next job from the queue.
187211
*/
@@ -198,10 +222,29 @@ public function pop($queue = null, $index = 0): ?Job
198222
return null;
199223
}
200224

225+
// Migrate any delayed jobs that are ready for this partition
226+
$this->migratePartitionDelayed($queueName, $partition);
227+
201228
// Try to pop a job with concurrency limit
202229
return $this->popFromPartition($queueName, $partition);
203230
}
204231

232+
/**
233+
* Migrate delayed jobs that are ready into the partition queue.
234+
*/
235+
protected function migratePartitionDelayed(string $queueName, string $partition): void
236+
{
237+
$this->getConnection()->eval(
238+
LuaScripts::migrateDelayed(),
239+
3,
240+
$this->keys->delayed($queueName, $partition),
241+
$this->keys->partitionQueue($queueName, $partition),
242+
$this->keys->partitions($queueName),
243+
$partition,
244+
time()
245+
);
246+
}
247+
205248
/**
206249
* Pop a job from a specific partition.
207250
*
@@ -231,11 +274,12 @@ protected function popFromPartition(string $queueName, string $partition): ?Job
231274
// Pop with limit check
232275
$payload = $redis->eval(
233276
LuaScripts::popWithLimit(),
234-
4,
277+
5,
235278
$queueKey,
236279
$partitionsKey,
237280
$activeKey,
238281
$metricsKey,
282+
$this->keys->delayed($queueName, $partition),
239283
$partition,
240284
$jobId,
241285
$this->getLimiterMaxConcurrent(),

src/Queue/LuaScripts.php

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public static function pop(): string
9090
* KEYS[2] - partitions set key
9191
* KEYS[3] - active jobs key
9292
* KEYS[4] - metrics key
93+
* KEYS[5] - delayed jobs key
9394
* ARGV[1] - partition identifier
9495
* ARGV[2] - job id
9596
* ARGV[3] - max concurrent
@@ -103,6 +104,7 @@ public static function popWithLimit(): string
103104
local partitions_key = KEYS[2]
104105
local active_key = KEYS[3]
105106
local metrics_key = KEYS[4]
107+
local delayed_key = KEYS[5]
106108
local partition = ARGV[1]
107109
local job_id = ARGV[2]
108110
local max_concurrent = tonumber(ARGV[3])
@@ -126,18 +128,61 @@ public static function popWithLimit(): string
126128
-- Update metrics
127129
redis.call('HINCRBY', metrics_key, 'total_popped', 1)
128130
129-
-- Check if queue is now empty
131+
-- Remove partition from set only if both LIST and delayed ZSET are empty
130132
local remaining = redis.call('LLEN', queue_key)
131133
if remaining == 0 then
132-
redis.call('SREM', partitions_key, partition)
133-
redis.call('HDEL', metrics_key, 'first_job_time')
134+
local delayed_count = redis.call('ZCARD', delayed_key)
135+
if delayed_count == 0 then
136+
redis.call('SREM', partitions_key, partition)
137+
redis.call('HDEL', metrics_key, 'first_job_time')
138+
end
134139
end
135140
end
136141
137142
return job
138143
LUA;
139144
}
140145

146+
/**
147+
* Migrate delayed jobs that are ready into the partition queue.
148+
* Removes partition from set if both LIST and delayed ZSET are empty.
149+
*
150+
* KEYS[1] - delayed jobs key (ZSET)
151+
* KEYS[2] - partition queue key (LIST)
152+
* KEYS[3] - partitions set key
153+
* ARGV[1] - partition identifier
154+
* ARGV[2] - current timestamp
155+
*/
156+
public static function migrateDelayed(): string
157+
{
158+
return <<<'LUA'
159+
local delayed_key = KEYS[1]
160+
local queue_key = KEYS[2]
161+
local partitions_key = KEYS[3]
162+
local partition = ARGV[1]
163+
local current_time = tonumber(ARGV[2])
164+
165+
-- Get all jobs with score <= current_time (ready to process)
166+
local jobs = redis.call('ZRANGEBYSCORE', delayed_key, '-inf', current_time)
167+
168+
if #jobs > 0 then
169+
redis.call('ZREMRANGEBYSCORE', delayed_key, '-inf', current_time)
170+
for _, job in ipairs(jobs) do
171+
redis.call('RPUSH', queue_key, job)
172+
end
173+
end
174+
175+
-- Remove partition from set only if both LIST and delayed ZSET are empty
176+
local list_len = redis.call('LLEN', queue_key)
177+
local delayed_count = redis.call('ZCARD', delayed_key)
178+
if list_len == 0 and delayed_count == 0 then
179+
redis.call('SREM', partitions_key, partition)
180+
end
181+
182+
return #jobs
183+
LUA;
184+
}
185+
141186
/**
142187
* Get partition stats.
143188
*

tests/Integration/BalancedQueueIntegrationTest.php

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,119 @@ public function test_numeric_partition_keys_work_correctly(): void
106106
$this->assertIsString($partitions[0]);
107107
}
108108

109+
public function test_later_adds_partition_to_set(): void
110+
{
111+
$job = new TestJob(['user_id' => 555]);
112+
113+
$this->queue->later(3600, $job, '', 'default');
114+
115+
$partitions = $this->metrics->getPartitions('default');
116+
$this->assertContains('user:555', $partitions);
117+
}
118+
119+
public function test_later_job_not_available_before_delay(): void
120+
{
121+
$job = new TestJob(['user_id' => 555]);
122+
123+
$this->queue->later(3600, $job, '', 'default');
124+
125+
// Job should not be available yet (delay = 1 hour)
126+
$poppedJob = $this->queue->pop('default');
127+
$this->assertNull($poppedJob);
128+
}
129+
130+
public function test_later_job_available_after_delay_expires(): void
131+
{
132+
$job = new TestJob(['user_id' => 555]);
133+
134+
// Delay of 0 seconds — immediately available
135+
$this->queue->later(0, $job, '', 'default');
136+
137+
$poppedJob = $this->queue->pop('default');
138+
$this->assertNotNull($poppedJob);
139+
}
140+
141+
public function test_later_partition_removed_from_set_after_job_processed(): void
142+
{
143+
$job = new TestJob(['user_id' => 555]);
144+
145+
$this->queue->later(0, $job, '', 'default');
146+
147+
$poppedJob = $this->queue->pop('default');
148+
$this->assertNotNull($poppedJob);
149+
150+
$poppedJob->delete();
151+
152+
// After processing, partition should be removed from set
153+
$partitions = $this->metrics->getPartitions('default');
154+
$this->assertNotContains('user:555', $partitions);
155+
}
156+
157+
public function test_later_partition_stays_in_set_while_delayed_jobs_pending(): void
158+
{
159+
$job = new TestJob(['user_id' => 555]);
160+
161+
// Push with 1 hour delay — partition should stay in set
162+
$this->queue->later(3600, $job, '', 'default');
163+
164+
// Pop returns null (nothing ready), but partition stays because delayed ZSET is not empty
165+
$this->queue->pop('default');
166+
167+
$partitions = $this->metrics->getPartitions('default');
168+
$this->assertContains('user:555', $partitions);
169+
}
170+
171+
// public function test_later_job_becomes_available_after_delay_with_sleep(): void
172+
// {
173+
// $job = new TestJob(['user_id' => 555]);
174+
//
175+
// $this->queue->later(2, $job, '', 'default');
176+
//
177+
// // Before delay expires — not available
178+
// $before = $this->queue->pop('default');
179+
// $this->assertNull($before, 'Job should not be available before delay expires');
180+
//
181+
// sleep(3);
182+
//
183+
// // After delay expires — available
184+
// $after = $this->queue->pop('default');
185+
// $this->assertNotNull($after, 'Job should be available after delay expires');
186+
// }
187+
188+
public function test_release_with_delay_requeues_job(): void
189+
{
190+
$job = new TestJob(['user_id' => 777]);
191+
192+
$this->queue->push($job, '', 'default');
193+
194+
$poppedJob = $this->queue->pop('default');
195+
$this->assertNotNull($poppedJob);
196+
197+
// Release back with delay of 0 (immediately available)
198+
$poppedJob->release(0);
199+
200+
// Job should be available again
201+
$poppedJob2 = $this->queue->pop('default');
202+
$this->assertNotNull($poppedJob2);
203+
}
204+
205+
public function test_release_with_future_delay_not_available_immediately(): void
206+
{
207+
$job = new TestJob(['user_id' => 777]);
208+
209+
$this->queue->push($job, '', 'default');
210+
211+
$poppedJob = $this->queue->pop('default');
212+
$this->assertNotNull($poppedJob);
213+
214+
// Release back with 1 hour delay
215+
$poppedJob->release(3600);
216+
217+
// Job should not be available yet
218+
$poppedJob2 = $this->queue->pop('default');
219+
$this->assertNull($poppedJob2);
220+
}
221+
109222
public function test_concurrent_limit_respected(): void
110223
{
111224
// Default max_concurrent is 2, push 3 jobs to same partition

0 commit comments

Comments
 (0)