Skip to content

Commit d908cf0

Browse files
committed
Add experimental Horizon integration and refactor queue key handling
This took me about 1.5 days to get right, but I think it was worth it. The main issue was that Horizon dashboard showed "queues:default" instead of "default" because getQueue() was returning the prefixed key. Instead of adding workarounds, I decided to properly refactor the entire key handling system. Key changes: - Added experimental Horizon integration (fires JobPushed, JobReserved, etc.) - Introduced getCleanQueueName() to separate queue name from Redis keys - All helper methods now consistently add 'queues:' prefix internally - Unified key generation across BalancedRedisQueue, Metrics, SmartFairStrategy, and console commands This maintains backwards compatibility with existing Redis data structure while fixing the Horizon display issue. The integration is disabled by default and can be enabled via BALANCED_QUEUE_HORIZON_ENABLED=true. Hopefully everything works as expected - all tests pass (59 unit + 22 integration). Closes #9
1 parent a70cbd2 commit d908cf0

File tree

10 files changed

+525
-137
lines changed

10 files changed

+525
-137
lines changed

README.md

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,45 @@ Add a supervisor for balanced queue in `config/horizon.php`:
227227
| Failed jobs list | Works | Failed jobs appear in Horizon |
228228
| Worker metrics | Works | CPU, memory, throughput visible |
229229
| **Pending jobs count** | **Doesn't work** | Horizon shows 0 pending |
230-
| **Completed jobs list** | **Doesn't work** | Use Prometheus/Grafana for metrics |
230+
| **Completed jobs list** | **Experimental** | Enable with `horizon.enabled` config |
231+
| **Recent jobs list** | **Experimental** | Enable with `horizon.enabled` config |
231232
| **horizon:clear** | **Doesn't work** | Use `balanced-queue:clear` instead |
232233

233-
**Why?** Balanced Queue uses a different Redis key structure (partitioned queues) than standard Laravel queues. Horizon expects jobs in `queues:{name}` but we store them in `balanced-queue:{queue}:{partition}`.
234+
**Why?** Balanced Queue uses a different Redis key structure (partitioned queues) than standard Laravel queues. Horizon expects jobs in `queues:{name}` but we store them in `balanced-queue:queues:{name}:{partition}`.
235+
236+
### Experimental: Horizon Dashboard Integration
237+
238+
You can enable experimental Horizon events integration to see completed/recent jobs in the Horizon dashboard:
239+
240+
```php
241+
// config/balanced-queue.php
242+
'horizon' => [
243+
'enabled' => 'auto', // 'auto', true, or false
244+
],
245+
```
246+
247+
| Value | Behavior |
248+
|-------|----------|
249+
| `'auto'` | Enable if `laravel/horizon` is installed (default) |
250+
| `true` | Always enable (requires Horizon) |
251+
| `false` | Disable Horizon events |
252+
253+
Or via environment variable:
254+
255+
```env
256+
BALANCED_QUEUE_HORIZON_ENABLED=auto
257+
```
258+
259+
**Warning:** This feature is experimental and adds a small overhead per job (writing to Horizon's Redis keys). Test thoroughly in your environment before using in production.
260+
261+
**What this enables:**
262+
- Completed jobs appear in Horizon dashboard
263+
- Recent jobs list works
264+
- Job metrics (throughput, runtime) are tracked
265+
266+
**What still doesn't work:**
267+
- Pending jobs count (architectural limitation)
268+
- `horizon:clear` command (use `balanced-queue:clear`)
234269

235270
### Monitoring Commands
236271

config/balanced-queue.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,29 @@
142142
*/
143143
'partition_resolver' => null,
144144

145+
/*
146+
|--------------------------------------------------------------------------
147+
| Horizon Integration (Experimental)
148+
|--------------------------------------------------------------------------
149+
|
150+
| WARNING: This feature is experimental and may have performance impact
151+
| on high-throughput systems. Test thoroughly before using in production.
152+
|
153+
| When enabled, balanced queue will fire Horizon events so jobs appear
154+
| in the Horizon dashboard (pending, completed, recent jobs).
155+
|
156+
| Options:
157+
| - true: Always fire Horizon events (requires laravel/horizon)
158+
| - false: Never fire Horizon events
159+
| - 'auto': Fire events only if Horizon is installed (default)
160+
|
161+
| Detection: Uses class_exists(\Laravel\Horizon\Horizon::class)
162+
|
163+
*/
164+
'horizon' => [
165+
'enabled' => env('BALANCED_QUEUE_HORIZON_ENABLED', 'auto'),
166+
],
167+
145168
/*
146169
|--------------------------------------------------------------------------
147170
| Redis Configuration

src/BalancedQueueManager.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ protected function createSmartFairStrategy(array $config): SmartFairStrategy
182182
$config['boost_small_queues'] ?? true,
183183
$config['small_queue_threshold'] ?? 5,
184184
$config['boost_multiplier'] ?? 1.5,
185+
$this->getPrefix(),
185186
$config['metrics_key_prefix'] ?? 'balanced-queue:metrics'
186187
);
187188
}

src/Console/BalancedQueueClearCommand.php

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,25 @@ class BalancedQueueClearCommand extends Command
2121
public function handle(): int
2222
{
2323
$this->prefix = config('balanced-queue.redis.prefix', 'balanced-queue');
24-
$queue = $this->argument('queue');
24+
$queueName = $this->argument('queue');
2525
$partition = $this->option('partition');
2626
$force = $this->option('force');
2727

2828
$redis = Redis::connection(config('balanced-queue.redis.connection'));
29-
$queueKey = "queues:{$queue}";
3029

3130
if ($partition) {
32-
return $this->clearPartition($redis, $queueKey, $partition, $force);
31+
return $this->clearPartition($redis, $queueName, $partition, $force);
3332
}
3433

35-
return $this->clearAll($redis, $queueKey, $force);
34+
return $this->clearAll($redis, $queueName, $force);
3635
}
3736

38-
protected function clearPartition($redis, string $queueKey, string $partition, bool $force): int
37+
protected function clearPartition($redis, string $queueName, string $partition, bool $force): int
3938
{
40-
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
41-
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
42-
$metricsKey = "{$this->prefix}:metrics:{$queueKey}:{$partition}";
43-
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
39+
$queueListKey = $this->getPartitionQueueKey($queueName, $partition);
40+
$activeKey = $this->getActiveKey($queueName, $partition);
41+
$metricsKey = $this->getMetricsKey($queueName, $partition);
42+
$partitionsKey = $this->getPartitionsKey($queueName);
4443

4544
$pending = (int) $redis->llen($queueListKey);
4645
$active = (int) $redis->hlen($activeKey);
@@ -70,9 +69,9 @@ protected function clearPartition($redis, string $queueKey, string $partition, b
7069
return Command::SUCCESS;
7170
}
7271

73-
protected function clearAll($redis, string $queueKey, bool $force): int
72+
protected function clearAll($redis, string $queueName, bool $force): int
7473
{
75-
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
74+
$partitionsKey = $this->getPartitionsKey($queueName);
7675
$partitions = $redis->smembers($partitionsKey);
7776

7877
if (empty($partitions)) {
@@ -85,8 +84,8 @@ protected function clearAll($redis, string $queueKey, bool $force): int
8584
$partitionStats = [];
8685

8786
foreach ($partitions as $partition) {
88-
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
89-
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
87+
$queueListKey = $this->getPartitionQueueKey($queueName, $partition);
88+
$activeKey = $this->getActiveKey($queueName, $partition);
9089

9190
$pending = (int) $redis->llen($queueListKey);
9291
$active = (int) $redis->hlen($activeKey);
@@ -101,7 +100,7 @@ protected function clearAll($redis, string $queueKey, bool $force): int
101100
];
102101
}
103102

104-
$this->warn("Queue: {$queueKey}");
103+
$this->warn("Queue: {$queueName}");
105104
$this->table(
106105
['Partition', 'Pending', 'Active'],
107106
array_map(fn($s) => [$s['partition'], $s['pending'], $s['active']], $partitionStats)
@@ -117,20 +116,49 @@ protected function clearAll($redis, string $queueKey, bool $force): int
117116

118117
// Clear all
119118
foreach ($partitions as $partition) {
120-
$redis->del("{$this->prefix}:{$queueKey}:{$partition}");
121-
$redis->del("{$this->prefix}:{$queueKey}:{$partition}:active");
122-
$redis->del("{$this->prefix}:{$queueKey}:{$partition}:delayed");
123-
$redis->del("{$this->prefix}:metrics:{$queueKey}:{$partition}");
119+
$redis->del($this->getPartitionQueueKey($queueName, $partition));
120+
$redis->del($this->getActiveKey($queueName, $partition));
121+
$redis->del($this->getDelayedKey($queueName, $partition));
122+
$redis->del($this->getMetricsKey($queueName, $partition));
124123
}
125124

126125
$redis->del($partitionsKey);
127126

128-
// Clear round-robin state
129-
$rrStateKey = "{$this->prefix}:rr-state:{$queueKey}";
127+
// Clear round-robin state (uses queueName without 'queues:' prefix)
128+
$rrStateKey = "{$this->prefix}:rr-state:{$queueName}";
130129
$redis->del($rrStateKey);
131130

132131
$this->info("✓ Cleared {$totalPending} pending and {$totalActive} active jobs from " . count($partitions) . " partitions.");
133132

134133
return Command::SUCCESS;
135134
}
135+
136+
// =========================================================================
137+
// Redis Key Helpers (mirrors BalancedRedisQueue)
138+
// =========================================================================
139+
140+
protected function getPartitionsKey(string $queueName): string
141+
{
142+
return "{$this->prefix}:queues:{$queueName}:partitions";
143+
}
144+
145+
protected function getPartitionQueueKey(string $queueName, string $partition): string
146+
{
147+
return "{$this->prefix}:queues:{$queueName}:{$partition}";
148+
}
149+
150+
protected function getActiveKey(string $queueName, string $partition): string
151+
{
152+
return "{$this->prefix}:queues:{$queueName}:{$partition}:active";
153+
}
154+
155+
protected function getMetricsKey(string $queueName, string $partition): string
156+
{
157+
return "{$this->prefix}:metrics:{$queueName}:{$partition}";
158+
}
159+
160+
protected function getDelayedKey(string $queueName, string $partition): string
161+
{
162+
return "{$this->prefix}:queues:{$queueName}:{$partition}:delayed";
163+
}
136164
}

src/Console/BalancedQueueTableCommand.php

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ protected function getAllQueues(): array
6565
*/
6666
protected function displayQueues(array $queues): void
6767
{
68-
foreach ($queues as $index => $queue) {
68+
foreach ($queues as $index => $queueName) {
6969
if ($index > 0) {
7070
$this->newLine();
7171
}
7272
if (count($queues) > 1) {
73-
$this->info("Queue: {$queue}");
73+
$this->info("Queue: {$queueName}");
7474
$this->line(str_repeat('-', 40));
7575
}
76-
$this->displayTable($queue);
76+
$this->displayTable($queueName);
7777
}
7878
}
7979

@@ -92,38 +92,37 @@ protected function watchQueues(array $queues, int $interval): void
9292
// Clear screen
9393
$this->output->write("\033[2J\033[H");
9494

95-
foreach ($queues as $index => $queue) {
95+
foreach ($queues as $index => $queueName) {
9696
if ($index > 0) {
9797
$this->newLine();
9898
}
99-
$this->displayHeader($queue);
100-
$this->displayTable($queue);
99+
$this->displayHeader($queueName);
100+
$this->displayTable($queueName);
101101
}
102102
$this->displayFooter();
103103

104104
sleep($interval);
105105
}
106106
}
107107

108-
protected function displayHeader(string $queue): void
108+
protected function displayHeader(string $queueName): void
109109
{
110110
$this->info("╔══════════════════════════════════════════════════════════════╗");
111-
$this->info("║ BALANCED QUEUE MONITOR - {$queue}");
111+
$this->info("║ BALANCED QUEUE MONITOR - {$queueName}");
112112
$this->info("╚══════════════════════════════════════════════════════════════╝");
113113
$this->newLine();
114114
}
115115

116-
protected function displayTable(string $queue): void
116+
protected function displayTable(string $queueName): void
117117
{
118118
$redis = Redis::connection(config('balanced-queue.redis.connection'));
119-
$queueKey = "queues:{$queue}";
120119

121120
// Get all partitions
122-
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
121+
$partitionsKey = $this->getPartitionsKey($queueName);
123122
$partitions = $redis->smembers($partitionsKey);
124123

125124
if (empty($partitions)) {
126-
$this->warn("No active partitions in queue '{$queue}'");
125+
$this->warn("No active partitions in queue '{$queueName}'");
127126
return;
128127
}
129128

@@ -132,9 +131,9 @@ protected function displayTable(string $queue): void
132131
$rows = [];
133132

134133
foreach ($partitions as $partition) {
135-
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
136-
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
137-
$metricsKey = "{$this->prefix}:metrics:{$queueKey}:{$partition}";
134+
$queueListKey = $this->getPartitionQueueKey($queueName, $partition);
135+
$activeKey = $this->getActiveKey($queueName, $partition);
136+
$metricsKey = $this->getMetricsKey($queueName, $partition);
138137

139138
$pending = (int) $redis->llen($queueListKey);
140139
$active = (int) $redis->hlen($activeKey);
@@ -209,4 +208,28 @@ protected function formatActive(int $num): string
209208

210209
return "<fg=green>{$num}</>";
211210
}
211+
212+
// =========================================================================
213+
// Redis Key Helpers (mirrors BalancedRedisQueue)
214+
// =========================================================================
215+
216+
protected function getPartitionsKey(string $queueName): string
217+
{
218+
return "{$this->prefix}:queues:{$queueName}:partitions";
219+
}
220+
221+
protected function getPartitionQueueKey(string $queueName, string $partition): string
222+
{
223+
return "{$this->prefix}:queues:{$queueName}:{$partition}";
224+
}
225+
226+
protected function getActiveKey(string $queueName, string $partition): string
227+
{
228+
return "{$this->prefix}:queues:{$queueName}:{$partition}:active";
229+
}
230+
231+
protected function getMetricsKey(string $queueName, string $partition): string
232+
{
233+
return "{$this->prefix}:metrics:{$queueName}:{$partition}";
234+
}
212235
}

src/Queue/BalancedRedisJob.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public function release($delay = 0): void
4848
$this->partition,
4949
$this->balancedJobId,
5050
$this->getRawBody(),
51-
$delay
51+
$delay,
52+
$this
5253
);
5354
}
5455

@@ -65,7 +66,8 @@ public function delete(): void
6566
$redis->deletePartitionJob(
6667
$this->queue,
6768
$this->partition,
68-
$this->balancedJobId
69+
$this->balancedJobId,
70+
$this
6971
);
7072
}
7173

0 commit comments

Comments
 (0)