Thread and ThreadManager
The Thread and ThreadManager classes provide concurrent execution in PHP via proc_open. ThreadManager manages a pool of threads with configurable maximum concurrency, automatically queuing tasks when the limit is reached. It supports inline (synchronous) execution and process-based (asynchronous) execution of PHP code, PHP files, and arbitrary shell commands.
$tm = new ThreadManager();

Creates a new thread manager with a default maxConcurrency of 4.
Set the maximum number of processes that can run simultaneously. Excess tasks are queued and started automatically when a running thread finishes.
$tm->setMaxConcurrency(8);

Execute a callable. In inline mode (no command option), the callable runs synchronously, so the returned thread already holds the result. If options['command'] is provided, the call is delegated to spawnProcessCommand() for asynchronous execution.
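As a rough illustration of what inline mode amounts to, the sketch below runs a callable immediately and records either its return value or the exception it threw. `runInline()` and the array keys are hypothetical names chosen for this example; they are not part of the library's API, and the library's internals may differ.

```php
<?php
// Hypothetical helper (not the library's API): run a callable synchronously
// and capture its outcome in a small result array.
function runInline(callable $fn): array
{
    try {
        // The callable finishes before this function returns.
        return ['status' => 'completed', 'result' => $fn(), 'error' => null];
    } catch (Throwable $e) {
        // Any exception marks the task as failed and is kept for inspection.
        return ['status' => 'failed', 'result' => null, 'error' => $e];
    }
}

$ok = runInline(function () {
    return array_sum(range(1, 100));
});
$bad = runInline(function () {
    throw new RuntimeException('boom');
});

echo $ok['status'], ' ', $ok['result'], "\n";                // completed 5050
echo $bad['status'], ' ', $bad['error']->getMessage(), "\n"; // failed boom
```

The two status strings mirror the 'completed' and 'failed' values listed in the status constants table.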
// Inline mode → synchronous
$thread = $tm->spawn(function() {
    return array_sum(range(1, 100));
});
echo $thread->getResult(); // 5050

Run arbitrary PHP code in a background process. The code is base64-encoded to avoid shell escaping issues (especially on Windows). Auto-detects the PHP binary path if not specified.
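The base64 idea can be sketched with plain `proc_open()`. This is a minimal illustration, not the library's actual command line: `buildBase64Command()` is a hypothetical helper, and falling back to `PHP_BINARY` is an assumption about how binary auto-detection might work.

```php
<?php
// Hypothetical helper (not the library's API): wrap $code so the shell only
// ever sees base64 characters plus a fixed bootstrap snippet.
function buildBase64Command(string $code, ?string $phpBinary = null): string
{
    // Assumption: fall back to the interpreter running this script.
    $php = $phpBinary ?? PHP_BINARY;

    // Base64 output contains only A-Z, a-z, 0-9, "+", "/" and "=".
    $payload = base64_encode($code);

    // Single quotes are used inside the bootstrap because escapeshellarg()
    // on Windows replaces double quotes with spaces.
    $bootstrap = "eval(base64_decode('" . $payload . "'));";

    return escapeshellarg($php) . ' -r ' . escapeshellarg($bootstrap);
}

$command = buildBase64Command('echo json_encode(["pid" => getmypid()]);');

$process = proc_open($command, [1 => ['pipe', 'w'], 2 => ['pipe', 'w']], $pipes);
if (!is_resource($process)) {
    throw new RuntimeException('Could not start the child process');
}

// The output here is tiny, so reading the pipes one after another is safe.
$stdout = stream_get_contents($pipes[1]);
$stderr = stream_get_contents($pipes[2]);
fclose($pipes[1]);
fclose($pipes[2]);
$exitCode = proc_close($process);

echo $stdout, "\n"; // JSON object containing the child's process ID
```

Because the user code travels as base64, quotes, dollar signs, and newlines in it never reach the shell.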
$thread = $tm->spawnPHPCode('echo json_encode(["pid" => getmypid()]);');
$result = $tm->await($thread->getId());
echo $result['stdout']; // {"pid":12345}

Writes PHP code to a temporary file and executes it as a separate process. The temp file is automatically cleaned up on shutdown. Useful for very long scripts where base64 encoding overhead is a concern.
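A minimal sketch of the temp-file approach, using only built-in functions, might look like the following. `writeTempScript()` is a hypothetical helper, and both the `<?php` prefix and the use of `register_shutdown_function()` for cleanup are assumptions about how such a feature could be built, not a description of the library's code.

```php
<?php
// Hypothetical helper (not the library's API): store $code in a temp file
// and schedule the file for deletion when the parent script shuts down.
function writeTempScript(string $code): string
{
    $path = tempnam(sys_get_temp_dir(), 'thr');
    if ($path === false) {
        throw new RuntimeException('Could not create a temporary file');
    }

    // Assumption: the caller passes code without an opening tag.
    file_put_contents($path, "<?php\n" . $code);

    register_shutdown_function(static function () use ($path): void {
        if (is_file($path)) {
            unlink($path);
        }
    });

    return $path;
}

$path = writeTempScript('echo strlen(str_repeat("x", 1000));');

// Only the file path crosses the shell, so the script length does not matter.
$command = escapeshellarg(PHP_BINARY) . ' ' . escapeshellarg($path);
$process = proc_open($command, [1 => ['pipe', 'w'], 2 => ['pipe', 'w']], $pipes);
if (!is_resource($process)) {
    throw new RuntimeException('Could not start the child process');
}

$stdout = stream_get_contents($pipes[1]);
$stderr = stream_get_contents($pipes[2]);
fclose($pipes[1]);
fclose($pipes[2]);
$exitCode = proc_close($process);

echo $stdout, "\n"; // 1000
```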
$thread = $tm->spawnPHPFile('
    $data = file_get_contents("/data/large-file.json");
    $parsed = json_decode($data, true);
    echo count($parsed);
');
$result = $tm->await($thread->getId());

Spawn an arbitrary process. Arguments are escaped via escapeshellarg(). Supports cwd and env options.
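For orientation, here is a sketch of how a command string, working directory, and environment can be handed to `proc_open()`. `buildCommand()` is a hypothetical helper; escaping the binary name as well as the arguments, and merging the extra variables into the current environment, are choices made for this example rather than documented library behavior.

```php
<?php
// Hypothetical helper (not the library's API): escape the binary and every
// argument separately, then join them into one command string.
function buildCommand(string $binary, array $args): string
{
    $parts = [escapeshellarg($binary)];
    foreach ($args as $arg) {
        $parts[] = escapeshellarg((string) $arg);
    }

    return implode(' ', $parts);
}

// The child prints the variable it received and the name of its working directory.
$script = "echo getenv('APP_MODE'), ':', basename(getcwd());";
$command = buildCommand(PHP_BINARY, ['-r', $script]);

$cwd = sys_get_temp_dir();

// proc_open() replaces the whole environment when an env array is given,
// so this sketch merges the extra variable into the current environment.
$env = array_merge(getenv(), ['APP_MODE' => 'production']);

$process = proc_open(
    $command,
    [1 => ['pipe', 'w'], 2 => ['pipe', 'w']],
    $pipes,
    $cwd,
    $env
);
if (!is_resource($process)) {
    throw new RuntimeException('Could not start the child process');
}

$stdout = stream_get_contents($pipes[1]);
$stderr = stream_get_contents($pipes[2]);
fclose($pipes[1]);
fclose($pipes[2]);
$exitCode = proc_close($process);

echo $stdout, "\n"; // "production:" followed by the temp directory's name
```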
$thread = $tm->spawnProcessCommand('node', ['script.js', '--verbose'], [
    'cwd' => '/app/scripts',
    'env' => ['NODE_ENV' => 'production'],
]);

Block until the thread finishes (or times out). Returns the thread result, or null if the thread failed. The timeout is in milliseconds.
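A blocking wait with a millisecond timeout can be approximated by polling `proc_get_status()`. The sketch below is an illustration under several assumptions: `startPhp()` and `awaitProcess()` are hypothetical helpers, the array form of `proc_open()` requires PHP 7.4 or later, non-blocking pipe reads are assumed to work (they generally do on POSIX systems), and returning null on timeout is this sketch's choice rather than confirmed library behavior.

```php
<?php
// Hypothetical helper: start a child PHP process without going through a shell.
function startPhp(string $code): array
{
    $process = proc_open([PHP_BINARY, '-r', $code], [1 => ['pipe', 'w'], 2 => ['pipe', 'w']], $pipes);
    if (!is_resource($process)) {
        throw new RuntimeException('Could not start the child process');
    }

    return [$process, $pipes];
}

// Hypothetical stand-in for await(): poll until exit or until $timeoutMs elapses.
function awaitProcess($process, array $pipes, int $timeoutMs): ?array
{
    stream_set_blocking($pipes[1], false);
    stream_set_blocking($pipes[2], false);

    $deadline = microtime(true) + $timeoutMs / 1000; // milliseconds to seconds
    $stdout = '';
    $stderr = '';

    while (true) {
        // Check the state first, then drain, so no trailing output is lost.
        $status = proc_get_status($process);
        $stdout .= (string) stream_get_contents($pipes[1]);
        $stderr .= (string) stream_get_contents($pipes[2]);

        if (!$status['running']) {
            fclose($pipes[1]);
            fclose($pipes[2]);
            proc_close($process);
            return ['stdout' => $stdout, 'stderr' => $stderr, 'exit_code' => $status['exitcode']];
        }

        if (microtime(true) >= $deadline) {
            proc_terminate($process); // give up on the child
            fclose($pipes[1]);
            fclose($pipes[2]);
            proc_close($process);
            return null;
        }

        usleep(10000); // poll every 10 ms
    }
}

[$process, $pipes] = startPhp('usleep(50000); echo "done";');
$fast = awaitProcess($process, $pipes, 5000);

[$process, $pipes] = startPhp('sleep(5);');
$slow = awaitProcess($process, $pipes, 200);

var_dump($fast['stdout']); // string(4) "done"
var_dump($slow);           // NULL, because the 200 ms timeout expires first
```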
$result = $tm->await($thread->getId(), 5000); // 5 second timeout

Wait for multiple threads to complete. Returns an associative array of [id => result].
$results = $tm->joinAll([$thread1, $thread2, $thread3], 10000);

status() returns the current status string of a thread (polls process state first). getThread() returns the Thread object or null.
$status = $tm->status($id); // 'pending' | 'running' | 'completed' | 'failed'
$thread = $tm->getThread($id); // Thread | null

| Constant | Value | Description |
| --- | --- | --- |
| STATUS_PENDING | 'pending' | Thread created but not yet started |
| STATUS_RUNNING | 'running' | Thread is currently executing |
| STATUS_COMPLETED | 'completed' | Thread finished successfully |
| STATUS_FAILED | 'failed' | Thread failed with an error or non-zero exit code |

| Constant | Value | Description |
| --- | --- | --- |
| MODE_INLINE | 'inline' | Synchronous callable execution |
| MODE_PROCESS | 'process' | Asynchronous process execution via proc_open |

| Method | Return | Description |
| --- | --- | --- |
| getId() | string | Unique thread identifier |
| getStatus() | string | Current status constant |
| getResult() | mixed | Return value (inline) or result array (process) |
| getError() | ?Throwable | Exception if the thread failed |
| getStdout() | string | Captured stdout (process mode) |
| getStderr() | string | Captured stderr (process mode) |
| getExitCode() | ?int | Process exit code (null if timeout or inline) |
| isFinished() | bool | true if completed or failed |
| getMode() | string | 'inline' or 'process' |
| getCommand() | string | Full command string (process mode) |

use Razy\ThreadManager;
$tm = new ThreadManager();
// Run a heavy computation in the background
$thread = $tm->spawnPHPCode('
    $sum = 0;
    for ($i = 0; $i < 1000000; $i++) {
        $sum += $i;
    }
    echo json_encode(["sum" => $sum]);
');
// Do other work while it runs...
echo 'Working on other tasks...';
// Collect the result
$result = $tm->await($thread->getId());
echo $result['stdout']; // {"sum":499999500000}

use Razy\ThreadManager;
$tm = new ThreadManager();
// Run a shell command
$thread = $tm->spawnProcessCommand('git', ['log', '--oneline', '-5']);
$result = $tm->await($thread->getId());
echo $result['stdout']; // Last 5 commit messages
echo $result['exit_code']; // 0

use Razy\ThreadManager;
$tm = new ThreadManager();
$tm->setMaxConcurrency(4);
$urls = [
    'https://api.example.com/users',
    'https://api.example.com/orders',
    'https://api.example.com/products',
    'https://api.example.com/analytics',
];
// Spawn a thread for each URL
$threads = [];
foreach ($urls as $url) {
    // var_export() emits a safely quoted PHP string literal for the URL
    $code = 'echo file_get_contents(' . var_export($url, true) . ');';
    $threads[] = $tm->spawnPHPCode($code);
}
// Wait for all to finish (10 second timeout)
$results = $tm->joinAll($threads, 10000);
// Process results
foreach ($results as $id => $result) {
    if ($result !== null) {
        $data = json_decode($result['stdout'], true);
        // json_decode() returns null on invalid JSON, so guard before counting
        $count = is_array($data) ? count($data) : 0;
        echo "Thread $id: $count records\n";
    }
}

The ThreadManager uses a queue system to limit the number of simultaneously running processes. When a new thread is spawned and the running count has reached maxConcurrency, the thread is placed in a pending queue with STATUS_PENDING. As running threads complete and are polled (via await() or joinAll()), the manager drains the queue and starts pending threads automatically.
$tm = new ThreadManager();
$tm->setMaxConcurrency(2); // Only 2 processes at a time
// Spawn 5 tasks → first 2 start immediately, rest are queued
$threads = [];
for ($i = 0; $i < 5; $i++) {
    $threads[] = $tm->spawnPHPCode('sleep(1); echo "Task ' . $i . ' done";');
}
// joinAll drains the queue as threads complete
$results = $tm->joinAll($threads);
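
The queueing behavior described above can be sketched as a small polling loop. `runPool()` is a hypothetical function written for this illustration, not the library's implementation. It assumes PHP 7.4 or later for the array form of `proc_open()`, and it assumes each child produces only a small amount of output, since stdout is read only after the child exits.

```php
<?php
// Hypothetical pool (not the library's API): run PHP snippets with at most
// $maxConcurrency child processes alive at any time.
function runPool(array $codes, int $maxConcurrency): array
{
    $pending = $codes; // id => code, waiting to start
    $running = [];     // id => ['process' => ..., 'stdout' => ...]
    $results = [];     // id => captured stdout
    $peak = 0;         // highest number of simultaneous children observed

    while ($pending !== [] || $running !== []) {
        // Drain the queue while there is a free slot.
        while ($pending !== [] && count($running) < $maxConcurrency) {
            $id = array_key_first($pending);
            $code = $pending[$id];
            unset($pending[$id]);

            $process = proc_open([PHP_BINARY, '-r', $code], [1 => ['pipe', 'w']], $pipes);
            if (!is_resource($process)) {
                throw new RuntimeException('Could not start task ' . $id);
            }
            $running[$id] = ['process' => $process, 'stdout' => $pipes[1]];
        }
        $peak = max($peak, count($running));

        // Poll running children and collect the ones that have exited.
        foreach ($running as $id => $task) {
            if (proc_get_status($task['process'])['running']) {
                continue;
            }
            $results[$id] = stream_get_contents($task['stdout']);
            fclose($task['stdout']);
            proc_close($task['process']);
            unset($running[$id]);
        }

        usleep(10000); // poll every 10 ms
    }

    return ['results' => $results, 'peak' => $peak];
}

$codes = [];
for ($i = 0; $i < 5; $i++) {
    $codes[] = 'usleep(100000); echo "Task ' . $i . ' done";';
}

$pool = runPool($codes, 2);
ksort($pool['results']);

echo implode("\n", $pool['results']), "\n";
echo 'Peak concurrency: ', $pool['peak'], "\n"; // Peak concurrency: 2
```

Freed slots are refilled on the next pass of the outer loop, which corresponds to pending threads starting as finished ones are polled.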