Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,45 @@ Add a supervisor for balanced queue in `config/horizon.php`:
| Failed jobs list | Works | Failed jobs appear in Horizon |
| Worker metrics | Works | CPU, memory, throughput visible |
| **Pending jobs count** | **Doesn't work** | Horizon shows 0 pending |
| **Completed jobs list** | **Doesn't work** | Use Prometheus/Grafana for metrics |
| **Completed jobs list** | **Experimental** | Enable with `horizon.enabled` config |
| **Recent jobs list** | **Experimental** | Enable with `horizon.enabled` config |
| **horizon:clear** | **Doesn't work** | Use `balanced-queue:clear` instead |

**Why?** Balanced Queue uses a different Redis key structure (partitioned queues) than standard Laravel queues. Horizon expects jobs in `queues:{name}` but we store them in `balanced-queue:{queue}:{partition}`.
**Why?** Balanced Queue uses a different Redis key structure (partitioned queues) than standard Laravel queues. Horizon expects jobs in `queues:{name}` but we store them in `balanced-queue:queues:{name}:{partition}`.

### Experimental: Horizon Dashboard Integration

You can enable experimental Horizon events integration to see completed/recent jobs in the Horizon dashboard:

```php
// config/balanced-queue.php
'horizon' => [
'enabled' => 'auto', // 'auto', true, or false
],
```

| Value | Behavior |
|-------|----------|
| `'auto'` | Enable if `laravel/horizon` is installed (default) |
| `true` | Always enable (requires Horizon) |
| `false` | Disable Horizon events |

Or via environment variable:

```env
BALANCED_QUEUE_HORIZON_ENABLED=auto
```

**Warning:** This feature is experimental and adds a small overhead per job (writing to Horizon's Redis keys). Test thoroughly in your environment before using in production.

**What this enables:**
- Completed jobs appear in Horizon dashboard
- Recent jobs list works
- Job metrics (throughput, runtime) are tracked

**What still doesn't work:**
- Pending jobs count (architectural limitation)
- `horizon:clear` command (use `balanced-queue:clear`)

### Monitoring Commands

Expand Down
23 changes: 23 additions & 0 deletions config/balanced-queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,29 @@
*/
'partition_resolver' => null,

/*
|--------------------------------------------------------------------------
| Horizon Integration (Experimental)
|--------------------------------------------------------------------------
|
| WARNING: This feature is experimental and may have performance impact
| on high-throughput systems. Test thoroughly before using in production.
|
| When enabled, balanced queue will fire Horizon events so jobs appear
| in the Horizon dashboard (pending, completed, recent jobs).
|
| Options:
| - true: Always fire Horizon events (requires laravel/horizon)
| - false: Never fire Horizon events
| - 'auto': Fire events only if Horizon is installed (default)
|
| Detection: Uses class_exists(\Laravel\Horizon\Horizon::class)
|
*/
'horizon' => [
'enabled' => env('BALANCED_QUEUE_HORIZON_ENABLED', 'auto'),
],

/*
|--------------------------------------------------------------------------
| Redis Configuration
Expand Down
2 changes: 1 addition & 1 deletion src/BalancedQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ protected function createSmartFairStrategy(array $config): SmartFairStrategy
$config['boost_small_queues'] ?? true,
$config['small_queue_threshold'] ?? 5,
$config['boost_multiplier'] ?? 1.5,
$config['metrics_key_prefix'] ?? 'balanced-queue:metrics'
$this->getPrefix()
);
}

Expand Down
43 changes: 21 additions & 22 deletions src/Console/BalancedQueueClearCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use YanGusik\BalancedQueue\Support\RedisKeys;

class BalancedQueueClearCommand extends Command
{
Expand All @@ -16,31 +17,30 @@ class BalancedQueueClearCommand extends Command

protected $description = 'Clear balanced queue jobs';

protected string $prefix;
protected RedisKeys $keys;

public function handle(): int
{
$this->prefix = config('balanced-queue.redis.prefix', 'balanced-queue');
$queue = $this->argument('queue');
$this->keys = new RedisKeys(config('balanced-queue.redis.prefix', 'balanced-queue'));
$queueName = $this->argument('queue');
$partition = $this->option('partition');
$force = $this->option('force');

$redis = Redis::connection(config('balanced-queue.redis.connection'));
$queueKey = "queues:{$queue}";

if ($partition) {
return $this->clearPartition($redis, $queueKey, $partition, $force);
return $this->clearPartition($redis, $queueName, $partition, $force);
}

return $this->clearAll($redis, $queueKey, $force);
return $this->clearAll($redis, $queueName, $force);
}

protected function clearPartition($redis, string $queueKey, string $partition, bool $force): int
protected function clearPartition($redis, string $queueName, string $partition, bool $force): int
{
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
$metricsKey = "{$this->prefix}:metrics:{$queueKey}:{$partition}";
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
$queueListKey = $this->keys->partitionQueue($queueName, $partition);
$activeKey = $this->keys->active($queueName, $partition);
$metricsKey = $this->keys->metrics($queueName, $partition);
$partitionsKey = $this->keys->partitions($queueName);

$pending = (int) $redis->llen($queueListKey);
$active = (int) $redis->hlen($activeKey);
Expand Down Expand Up @@ -70,9 +70,9 @@ protected function clearPartition($redis, string $queueKey, string $partition, b
return Command::SUCCESS;
}

protected function clearAll($redis, string $queueKey, bool $force): int
protected function clearAll($redis, string $queueName, bool $force): int
{
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
$partitionsKey = $this->keys->partitions($queueName);
$partitions = $redis->smembers($partitionsKey);

if (empty($partitions)) {
Expand All @@ -85,8 +85,8 @@ protected function clearAll($redis, string $queueKey, bool $force): int
$partitionStats = [];

foreach ($partitions as $partition) {
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
$queueListKey = $this->keys->partitionQueue($queueName, $partition);
$activeKey = $this->keys->active($queueName, $partition);

$pending = (int) $redis->llen($queueListKey);
$active = (int) $redis->hlen($activeKey);
Expand All @@ -101,7 +101,7 @@ protected function clearAll($redis, string $queueKey, bool $force): int
];
}

$this->warn("Queue: {$queueKey}");
$this->warn("Queue: {$queueName}");
$this->table(
['Partition', 'Pending', 'Active'],
array_map(fn($s) => [$s['partition'], $s['pending'], $s['active']], $partitionStats)
Expand All @@ -117,17 +117,16 @@ protected function clearAll($redis, string $queueKey, bool $force): int

// Clear all
foreach ($partitions as $partition) {
$redis->del("{$this->prefix}:{$queueKey}:{$partition}");
$redis->del("{$this->prefix}:{$queueKey}:{$partition}:active");
$redis->del("{$this->prefix}:{$queueKey}:{$partition}:delayed");
$redis->del("{$this->prefix}:metrics:{$queueKey}:{$partition}");
$redis->del($this->keys->partitionQueue($queueName, $partition));
$redis->del($this->keys->active($queueName, $partition));
$redis->del($this->keys->delayed($queueName, $partition));
$redis->del($this->keys->metrics($queueName, $partition));
}

$redis->del($partitionsKey);

// Clear round-robin state
$rrStateKey = "{$this->prefix}:rr-state:{$queueKey}";
$redis->del($rrStateKey);
$redis->del($this->keys->roundRobinState($queueName));

$this->info("✓ Cleared {$totalPending} pending and {$totalActive} active jobs from " . count($partitions) . " partitions.");

Expand Down
36 changes: 18 additions & 18 deletions src/Console/BalancedQueueTableCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use YanGusik\BalancedQueue\Support\Metrics;
use YanGusik\BalancedQueue\Support\RedisKeys;

class BalancedQueueTableCommand extends Command
{
Expand All @@ -18,11 +19,11 @@ class BalancedQueueTableCommand extends Command

protected $description = 'Display balanced queue statistics';

protected string $prefix;
protected RedisKeys $keys;

public function handle(): int
{
$this->prefix = config('balanced-queue.redis.prefix', 'balanced-queue');
$this->keys = new RedisKeys(config('balanced-queue.redis.prefix', 'balanced-queue'));
$showAll = $this->option('all');
$watch = $this->option('watch');
$interval = (int) $this->option('interval');
Expand Down Expand Up @@ -53,7 +54,7 @@ public function handle(): int
*/
protected function getAllQueues(): array
{
$metrics = new Metrics(Redis::connection(config('balanced-queue.redis.connection')), $this->prefix);
$metrics = new Metrics(Redis::connection(config('balanced-queue.redis.connection')), $this->keys->getPrefix());

return $metrics->getAllQueues();
}
Expand All @@ -65,15 +66,15 @@ protected function getAllQueues(): array
*/
protected function displayQueues(array $queues): void
{
foreach ($queues as $index => $queue) {
foreach ($queues as $index => $queueName) {
if ($index > 0) {
$this->newLine();
}
if (count($queues) > 1) {
$this->info("Queue: {$queue}");
$this->info("Queue: {$queueName}");
$this->line(str_repeat('-', 40));
}
$this->displayTable($queue);
$this->displayTable($queueName);
}
}

Expand All @@ -92,38 +93,37 @@ protected function watchQueues(array $queues, int $interval): void
// Clear screen
$this->output->write("\033[2J\033[H");

foreach ($queues as $index => $queue) {
foreach ($queues as $index => $queueName) {
if ($index > 0) {
$this->newLine();
}
$this->displayHeader($queue);
$this->displayTable($queue);
$this->displayHeader($queueName);
$this->displayTable($queueName);
}
$this->displayFooter();

sleep($interval);
}
}

protected function displayHeader(string $queue): void
protected function displayHeader(string $queueName): void
{
$this->info("╔══════════════════════════════════════════════════════════════╗");
$this->info("║ BALANCED QUEUE MONITOR - {$queue}");
$this->info("║ BALANCED QUEUE MONITOR - {$queueName}");
$this->info("╚══════════════════════════════════════════════════════════════╝");
$this->newLine();
}

protected function displayTable(string $queue): void
protected function displayTable(string $queueName): void
{
$redis = Redis::connection(config('balanced-queue.redis.connection'));
$queueKey = "queues:{$queue}";

// Get all partitions
$partitionsKey = "{$this->prefix}:{$queueKey}:partitions";
$partitionsKey = $this->keys->partitions($queueName);
$partitions = $redis->smembers($partitionsKey);

if (empty($partitions)) {
$this->warn("No active partitions in queue '{$queue}'");
$this->warn("No active partitions in queue '{$queueName}'");
return;
}

Expand All @@ -132,9 +132,9 @@ protected function displayTable(string $queue): void
$rows = [];

foreach ($partitions as $partition) {
$queueListKey = "{$this->prefix}:{$queueKey}:{$partition}";
$activeKey = "{$this->prefix}:{$queueKey}:{$partition}:active";
$metricsKey = "{$this->prefix}:metrics:{$queueKey}:{$partition}";
$queueListKey = $this->keys->partitionQueue($queueName, $partition);
$activeKey = $this->keys->active($queueName, $partition);
$metricsKey = $this->keys->metrics($queueName, $partition);

$pending = (int) $redis->llen($queueListKey);
$active = (int) $redis->hlen($activeKey);
Expand Down
6 changes: 4 additions & 2 deletions src/Queue/BalancedRedisJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public function release($delay = 0): void
$this->partition,
$this->balancedJobId,
$this->getRawBody(),
$delay
$delay,
$this
);
}

Expand All @@ -65,7 +66,8 @@ public function delete(): void
$redis->deletePartitionJob(
$this->queue,
$this->partition,
$this->balancedJobId
$this->balancedJobId,
$this
);
}

Expand Down
Loading