1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-03 10:07:49 +01:00
parallel/src/Threading/Thread.php

79 lines
1.8 KiB
PHP
Raw Normal View History

2015-07-14 00:30:59 +02:00
<?php
namespace Icicle\Concurrent\Threading;
2015-07-27 00:53:00 +02:00
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
/**
 * An internal thread that executes a given function concurrently.
 *
 * The thread reports its outcome over the socket handed to initialize():
 * a single status byte (MSG_DONE or MSG_ERROR), followed on error by a
 * 16-bit length prefix and the serialized exception.
 */
class Thread extends \Thread
{
    /** Status byte written to the socket when the function and loop finish without throwing. */
    const MSG_DONE = 1;

    /** Status byte written to the socket when an exception was caught; an exception payload follows. */
    const MSG_ERROR = 2;

    /**
     * @var resource Stream the thread writes its status messages to; set by initialize().
     */
    private $socket;

    /**
     * @var ThreadContext An instance of the context local to this thread.
     */
    public $context;

    /**
     * @var string|null Path to an autoloader to include.
     */
    public $autoloaderPath;

    /**
     * @var callable The function to execute in the thread.
     */
    private $function;

    /**
     * Creates a new thread object.
     *
     * @param callable $function The function to execute in the thread.
     */
    public function __construct(callable $function)
    {
        $this->function = $function;
        $this->context = ThreadContext::createLocalInstance($this);
    }

    /**
     * Stores the socket that run() uses to report the thread's outcome.
     *
     * @param resource $socket Writable stream; it is closed when run() finishes.
     */
    public function initialize($socket)
    {
        $this->socket = $socket;
    }

    /**
     * Thread entry point: includes the autoloader (if any), invokes the function,
     * runs the event loop and reports the outcome over the socket.
     *
     * On success a single MSG_DONE byte is written. On failure the exception is
     * printed, then MSG_ERROR, pack('S', length) and the serialized exception are
     * written. The socket is closed in every case.
     */
    public function run()
    {
        try {
            // Only include the autoloader when a path was provided and the file exists.
            if ($this->autoloaderPath !== null && file_exists($this->autoloaderPath)) {
                require $this->autoloaderPath;
            }

            $generator = call_user_func($this->function);

            if ($generator instanceof \Generator) {
                // NOTE(review): presumably the coroutine registers itself with the event
                // loop on construction; the local keeps it referenced while the loop runs.
                $coroutine = new Coroutine($generator);
            }

            Loop\run();

            $this->sendMessage(self::MSG_DONE);
        } catch (\Exception $exception) {
            print $exception . PHP_EOL;

            // Serialize BEFORE announcing the error: serialize() itself throws when the
            // exception graph holds something unserializable (e.g. a Closure in the
            // trace arguments), and the peer must never see MSG_ERROR without a payload.
            try {
                $serialized = serialize($exception);
            } catch (\Exception $failure) {
                $serialized = null;
            }

            // pack('S') encodes only 16 bits, so a longer payload would be sent with a
            // wrong length prefix. In both failure modes send a compact substitute.
            // It is created right here (not in a helper) so that its stack trace does
            // not capture the original exception as a call argument.
            if ($serialized === null || strlen($serialized) > 0xFFFF) {
                $summary = get_class($exception) . ': ' . $exception->getMessage();
                $serialized = serialize(
                    new \Exception(substr($summary, 0, 1024), (int) $exception->getCode())
                );
            }

            $this->sendMessage(self::MSG_ERROR);
            fwrite($this->socket, pack('S', strlen($serialized)) . $serialized);
        } finally {
            fclose($this->socket);
        }
    }

    /**
     * Writes a one-byte status message to the socket.
     *
     * @param int $message One of the MSG_* constants; encoded with chr().
     */
    private function sendMessage($message)
    {
        fwrite($this->socket, chr($message));
    }
}