Skip to content

Commit 4035550

Browse files
authored
Merge pull request #10 from YanGusik/feature/horizon-integration
[#9] Add experimental Horizon integration and refactor queue key handling
2 parents a70cbd2 + c927fdd commit 4035550

File tree

11 files changed

+477
-180
lines changed

11 files changed

+477
-180
lines changed

README.md

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,45 @@ Add a supervisor for balanced queue in `config/horizon.php`:
227227
| Failed jobs list | Works | Failed jobs appear in Horizon |
228228
| Worker metrics | Works | CPU, memory, throughput visible |
229229
| **Pending jobs count** | **Doesn't work** | Horizon shows 0 pending |
230-
| **Completed jobs list** | **Doesn't work** | Use Prometheus/Grafana for metrics |
230+
| **Completed jobs list** | **Experimental** | Enable with `horizon.enabled` config |
231+
| **Recent jobs list** | **Experimental** | Enable with `horizon.enabled` config |
231232
| **horizon:clear** | **Doesn't work** | Use `balanced-queue:clear` instead |
232233

233-
**Why?** Balanced Queue uses a different Redis key structure (partitioned queues) than standard Laravel queues. Horizon expects jobs in `queues:{name}` but we store them in `balanced-queue:{queue}:{partition}`.
234+
**Why?** Balanced Queue uses a different Redis key structure (partitioned queues) than standard Laravel queues. Horizon expects jobs in `queues:{name}` but we store them in `balanced-queue:queues:{name}:{partition}`.
235+
236+
### Experimental: Horizon Dashboard Integration
237+
238+
You can enable experimental Horizon events integration to see completed/recent jobs in the Horizon dashboard:
239+
240+
```php
241+
// config/balanced-queue.php
242+
'horizon' => [
243+
'enabled' => 'auto', // 'auto', true, or false
244+
],
245+
```
246+
247+
| Value | Behavior |
248+
|-------|----------|
249+
| `'auto'` | Enable if `laravel/horizon` is installed (default) |
250+
| `true` | Always enable (requires Horizon) |
251+
| `false` | Disable Horizon events |
252+
253+
Or via environment variable:
254+
255+
```env
256+
BALANCED_QUEUE_HORIZON_ENABLED=auto
257+
```
258+
259+
**Warning:** This feature is experimental and adds a small overhead per job (writing to Horizon's Redis keys). Test thoroughly in your environment before using in production.
260+
261+
**What this enables:**
262+
- Completed jobs appear in Horizon dashboard
263+
- Recent jobs list works
264+
- Job metrics (throughput, runtime) are tracked
265+
266+
**What still doesn't work:**
267+
- Pending jobs count (architectural limitation)
268+
- `horizon:clear` command (use `balanced-queue:clear`)
234269

235270
### Monitoring Commands
236271

config/balanced-queue.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,29 @@
142142
*/
143143
'partition_resolver' => null,
144144

145+
/*
146+
|--------------------------------------------------------------------------
147+
| Horizon Integration (Experimental)
148+
|--------------------------------------------------------------------------
149+
|
150+
| WARNING: This feature is experimental and may have performance impact
151+
| on high-throughput systems. Test thoroughly before using in production.
152+
|
153+
| When enabled, balanced queue will fire Horizon events so jobs appear
154+
| in the Horizon dashboard (pending, completed, recent jobs).
155+
|
156+
| Options:
157+
| - true: Always fire Horizon events (requires laravel/horizon)
158+
| - false: Never fire Horizon events
159+
| - 'auto': Fire events only if Horizon is installed (default)
160+
|
161+
| Detection: Uses class_exists(\Laravel\Horizon\Horizon::class)
162+
|
163+
*/
164+
'horizon' => [
165+
'enabled' => env('BALANCED_QUEUE_HORIZON_ENABLED', 'auto'),
166+
],
167+
145168
/*
146169
|--------------------------------------------------------------------------
147170
| Redis Configuration

src/BalancedQueueManager.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ protected function createSmartFairStrategy(array $config): SmartFairStrategy
182182
$config['boost_small_queues'] ?? true,
183183
$config['small_queue_threshold'] ?? 5,
184184
$config['boost_multiplier'] ?? 1.5,
185-
$config['metrics_key_prefix'] ?? 'balanced-queue:metrics'
185+
$this->getPrefix()
186186
);
187187
}
188188

src/Console/BalancedQueueClearCommand.php

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Illuminate\Console\Command;
88
use Illuminate\Support\Facades\Redis;
9+
use YanGusik\BalancedQueue\Support\RedisKeys;
910

1011
class BalancedQueueClearCommand extends Command
1112
{
@@ -16,31 +17,30 @@ class BalancedQueueClearCommand extends Command
1617

1718
protected $description = 'Clear balanced queue jobs';
1819

19-
protected string $prefix;
20+
protected RedisKeys $keys;
2021

2122
public function handle(): int
2223
{
23-
$this->prefix = config('balanced-queue.redis.prefix', 'balanced-queue');
24-
$queue = $this->argument('queue');
24+
$this->keys = new RedisKeys(config('balanced-queue.redis.prefix', 'balanced-queue'));
25+
$queueName = $this->argument('queue');
2526
$partition = $this->option('partition');
2627
$force = $this->option('force');
2728

2829
$redis = Redis::connection(config('balanced-queue.redis.connection'));
29-
$queueKey = "queues:{$queue}";
3030

3131
if ($partition) {
32-
return $this->clearPartition($redis, $queueKey, $partition, $force);
32+
return $this->clearPartition($redis, $queueName, $partition, $force);
3333
}
3434

35-
return $this->clearAll($redis, $queueKey, $force);
35+
return $this->clearAll($redis, $queueName, $force);
3636
}
3737

38-
protected function clearPartition($redis, string $queueKey, string $partition, bool $force): int
38+
protected function clearPartition($redis, string $queueName, string $partition, bool $force): int
3939
{
40-
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
41-
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
42-
$metricsKey = "{$this->prefix}:metrics:{$queueKey}:{$partition}";
43-
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
40+
$queueListKey = $this->keys->partitionQueue($queueName, $partition);
41+
$activeKey = $this->keys->active($queueName, $partition);
42+
$metricsKey = $this->keys->metrics($queueName, $partition);
43+
$partitionsKey = $this->keys->partitions($queueName);
4444

4545
$pending = (int) $redis->llen($queueListKey);
4646
$active = (int) $redis->hlen($activeKey);
@@ -70,9 +70,9 @@ protected function clearPartition($redis, string $queueKey, string $partition, b
7070
return Command::SUCCESS;
7171
}
7272

73-
protected function clearAll($redis, string $queueKey, bool $force): int
73+
protected function clearAll($redis, string $queueName, bool $force): int
7474
{
75-
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
75+
$partitionsKey = $this->keys->partitions($queueName);
7676
$partitions = $redis->smembers($partitionsKey);
7777

7878
if (empty($partitions)) {
@@ -85,8 +85,8 @@ protected function clearAll($redis, string $queueKey, bool $force): int
8585
$partitionStats = [];
8686

8787
foreach ($partitions as $partition) {
88-
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
89-
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
88+
$queueListKey = $this->keys->partitionQueue($queueName, $partition);
89+
$activeKey = $this->keys->active($queueName, $partition);
9090

9191
$pending = (int) $redis->llen($queueListKey);
9292
$active = (int) $redis->hlen($activeKey);
@@ -101,7 +101,7 @@ protected function clearAll($redis, string $queueKey, bool $force): int
101101
];
102102
}
103103

104-
$this->warn("Queue: {$queueKey}");
104+
$this->warn("Queue: {$queueName}");
105105
$this->table(
106106
['Partition', 'Pending', 'Active'],
107107
array_map(fn($s) => [$s['partition'], $s['pending'], $s['active']], $partitionStats)
@@ -117,17 +117,16 @@ protected function clearAll($redis, string $queueKey, bool $force): int
117117

118118
// Clear all
119119
foreach ($partitions as $partition) {
120-
$redis->del("{$this->prefix}:{$queueKey}:{$partition}");
121-
$redis->del("{$this->prefix}:{$queueKey}:{$partition}:active");
122-
$redis->del("{$this->prefix}:{$queueKey}:{$partition}:delayed");
123-
$redis->del("{$this->prefix}:metrics:{$queueKey}:{$partition}");
120+
$redis->del($this->keys->partitionQueue($queueName, $partition));
121+
$redis->del($this->keys->active($queueName, $partition));
122+
$redis->del($this->keys->delayed($queueName, $partition));
123+
$redis->del($this->keys->metrics($queueName, $partition));
124124
}
125125

126126
$redis->del($partitionsKey);
127127

128128
// Clear round-robin state
129-
$rrStateKey = "{$this->prefix}:rr-state:{$queueKey}";
130-
$redis->del($rrStateKey);
129+
$redis->del($this->keys->roundRobinState($queueName));
131130

132131
$this->info("✓ Cleared {$totalPending} pending and {$totalActive} active jobs from " . count($partitions) . " partitions.");
133132

src/Console/BalancedQueueTableCommand.php

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Illuminate\Console\Command;
88
use Illuminate\Support\Facades\Redis;
99
use YanGusik\BalancedQueue\Support\Metrics;
10+
use YanGusik\BalancedQueue\Support\RedisKeys;
1011

1112
class BalancedQueueTableCommand extends Command
1213
{
@@ -18,11 +19,11 @@ class BalancedQueueTableCommand extends Command
1819

1920
protected $description = 'Display balanced queue statistics';
2021

21-
protected string $prefix;
22+
protected RedisKeys $keys;
2223

2324
public function handle(): int
2425
{
25-
$this->prefix = config('balanced-queue.redis.prefix', 'balanced-queue');
26+
$this->keys = new RedisKeys(config('balanced-queue.redis.prefix', 'balanced-queue'));
2627
$showAll = $this->option('all');
2728
$watch = $this->option('watch');
2829
$interval = (int) $this->option('interval');
@@ -53,7 +54,7 @@ public function handle(): int
5354
*/
5455
protected function getAllQueues(): array
5556
{
56-
$metrics = new Metrics(Redis::connection(config('balanced-queue.redis.connection')), $this->prefix);
57+
$metrics = new Metrics(Redis::connection(config('balanced-queue.redis.connection')), $this->keys->getPrefix());
5758

5859
return $metrics->getAllQueues();
5960
}
@@ -65,15 +66,15 @@ protected function getAllQueues(): array
6566
*/
6667
protected function displayQueues(array $queues): void
6768
{
68-
foreach ($queues as $index => $queue) {
69+
foreach ($queues as $index => $queueName) {
6970
if ($index > 0) {
7071
$this->newLine();
7172
}
7273
if (count($queues) > 1) {
73-
$this->info("Queue: {$queue}");
74+
$this->info("Queue: {$queueName}");
7475
$this->line(str_repeat('-', 40));
7576
}
76-
$this->displayTable($queue);
77+
$this->displayTable($queueName);
7778
}
7879
}
7980

@@ -92,38 +93,37 @@ protected function watchQueues(array $queues, int $interval): void
9293
// Clear screen
9394
$this->output->write("\033[2J\033[H");
9495

95-
foreach ($queues as $index => $queue) {
96+
foreach ($queues as $index => $queueName) {
9697
if ($index > 0) {
9798
$this->newLine();
9899
}
99-
$this->displayHeader($queue);
100-
$this->displayTable($queue);
100+
$this->displayHeader($queueName);
101+
$this->displayTable($queueName);
101102
}
102103
$this->displayFooter();
103104

104105
sleep($interval);
105106
}
106107
}
107108

108-
protected function displayHeader(string $queue): void
109+
protected function displayHeader(string $queueName): void
109110
{
110111
$this->info("╔══════════════════════════════════════════════════════════════╗");
111-
$this->info("║ BALANCED QUEUE MONITOR - {$queue}");
112+
$this->info("║ BALANCED QUEUE MONITOR - {$queueName}");
112113
$this->info("╚══════════════════════════════════════════════════════════════╝");
113114
$this->newLine();
114115
}
115116

116-
protected function displayTable(string $queue): void
117+
protected function displayTable(string $queueName): void
117118
{
118119
$redis = Redis::connection(config('balanced-queue.redis.connection'));
119-
$queueKey = "queues:{$queue}";
120120

121121
// Get all partitions
122-
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
122+
$partitionsKey = $this->keys->partitions($queueName);
123123
$partitions = $redis->smembers($partitionsKey);
124124

125125
if (empty($partitions)) {
126-
$this->warn("No active partitions in queue '{$queue}'");
126+
$this->warn("No active partitions in queue '{$queueName}'");
127127
return;
128128
}
129129

@@ -132,9 +132,9 @@ protected function displayTable(string $queue): void
132132
$rows = [];
133133

134134
foreach ($partitions as $partition) {
135-
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
136-
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
137-
$metricsKey = "{$this->prefix}:metrics:{$queueKey}:{$partition}";
135+
$queueListKey = $this->keys->partitionQueue($queueName, $partition);
136+
$activeKey = $this->keys->active($queueName, $partition);
137+
$metricsKey = $this->keys->metrics($queueName, $partition);
138138

139139
$pending = (int) $redis->llen($queueListKey);
140140
$active = (int) $redis->hlen($activeKey);

src/Queue/BalancedRedisJob.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public function release($delay = 0): void
4848
$this->partition,
4949
$this->balancedJobId,
5050
$this->getRawBody(),
51-
$delay
51+
$delay,
52+
$this
5253
);
5354
}
5455

@@ -65,7 +66,8 @@ public function delete(): void
6566
$redis->deletePartitionJob(
6667
$this->queue,
6768
$this->partition,
68-
$this->balancedJobId
69+
$this->balancedJobId,
70+
$this
6971
);
7072
}
7173

0 commit comments

Comments
 (0)