1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-13 01:27:24 +01:00
parallel/src/Threading/Thread.php

147 lines
3.5 KiB
PHP
Raw Normal View History

2015-07-14 00:30:59 +02:00
<?php
namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\Sync\Channel;
2015-08-19 02:12:58 +02:00
use Icicle\Concurrent\Sync\ChannelInterface;
2015-08-07 01:59:25 +02:00
use Icicle\Concurrent\Sync\ExitFailure;
use Icicle\Concurrent\Sync\ExitSuccess;
2015-07-27 00:53:00 +02:00
use Icicle\Coroutine\Coroutine;
2015-08-11 00:38:58 +02:00
use Icicle\Loop;
2015-07-27 00:53:00 +02:00
/**
2015-07-27 00:53:00 +02:00
* An internal thread that executes a given function concurrently.
2015-08-11 00:38:58 +02:00
*
* @internal
*/
2015-07-14 00:30:59 +02:00
class Thread extends \Thread
{
2015-07-27 00:53:00 +02:00
/**
* @var callable The function to execute in the thread.
*/
private $function;
/**
* @var
*/
private $args;
2015-08-07 01:59:25 +02:00
/**
* @var resource
*/
private $socket;
2015-08-18 17:12:06 +02:00
/**
* @var bool
*/
private $lock = true;
2015-07-27 00:53:00 +02:00
/**
* Creates a new thread object.
*
* @param resource $socket IPC communication socket.
* @param callable $function The function to execute in the thread.
* @param mixed[] $args Arguments to pass to the function.
2015-07-27 00:53:00 +02:00
*/
public function __construct($socket, callable $function, array $args = [])
{
$this->function = $function;
$this->args = $args;
2015-07-14 00:30:59 +02:00
$this->socket = $socket;
2015-08-07 01:59:25 +02:00
}
/**
* Runs the thread code and the initialized function.
*/
2015-07-14 00:30:59 +02:00
public function run()
{
/* First thing we need to do is re-initialize the class autoloader. If
* we don't do this first, any object of a class that was loaded after
* the thread started will just be garbage data and unserializable
* values (like resources) will be lost. This happens even with
* thread-safe objects.
2015-08-11 00:38:58 +02:00
*/
foreach (get_declared_classes() as $className) {
if (strpos($className, 'ComposerAutoloaderInit') === 0) {
// Calling getLoader() will register the class loader for us
$className::getLoader();
break;
}
}
// Erase the old event loop inherited from the parent thread and create
// a new one.
Loop\loop(Loop\create());
2015-08-11 00:38:58 +02:00
// At this point, the thread environment has been prepared so begin using the thread.
2015-08-19 02:12:58 +02:00
$channel = new Channel($this->socket);
2015-08-19 02:12:58 +02:00
$coroutine = new Coroutine($this->execute($channel));
2015-08-11 00:38:58 +02:00
$coroutine->done();
Loop\run();
}
2015-08-18 17:12:06 +02:00
/**
* Attempts to obtain the lock. Returns true if the lock was obtained.
*
* @return bool
*/
public function tsl()
{
if (!$this->lock) {
return false;
}
$this->lock();
try {
if ($this->lock) {
$this->lock = false;
return true;
}
return false;
} finally {
$this->unlock();
}
}
/**
* Releases the lock.
*/
public function release()
{
$this->lock = true;
}
2015-08-07 01:59:25 +02:00
/**
2015-08-19 02:12:58 +02:00
* @coroutine
*
* @param \Icicle\Concurrent\Sync\ChannelInterface $channel
2015-08-07 01:59:25 +02:00
*
* @return \Generator
2015-08-19 02:12:58 +02:00
*
* @resolve int
2015-08-07 01:59:25 +02:00
*/
2015-08-19 02:12:58 +02:00
private function execute(ChannelInterface $channel)
{
2015-08-19 02:12:58 +02:00
$executor = new ThreadExecutor($this, $channel);
2015-08-07 01:59:25 +02:00
try {
2015-08-07 07:07:53 +02:00
$function = $this->function;
if ($function instanceof \Closure) {
$function = $function->bindTo($executor, ThreadExecutor::class);
2015-08-07 07:07:53 +02:00
}
$result = new ExitSuccess(yield call_user_func_array($function, $this->args));
2015-08-07 01:59:25 +02:00
} catch (\Exception $exception) {
$result = new ExitFailure($exception);
2015-07-27 00:53:00 +02:00
}
2015-07-14 00:30:59 +02:00
2015-08-19 02:12:58 +02:00
try {
yield $channel->send($result);
} finally {
$channel->close();
}
}
2015-07-14 00:30:59 +02:00
}