1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-03 10:07:49 +01:00
parallel/src/Threading/Internal/Thread.php

136 lines
3.7 KiB
PHP
Raw Normal View History

<?php
namespace Icicle\Concurrent\Threading\Internal;
2016-01-23 07:00:56 +01:00
use Icicle\Concurrent\Sync\{Channel, ChannelledStream, Internal\ExitFailure, Internal\ExitSuccess};
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
2015-10-17 01:20:20 +02:00
use Icicle\Stream\Pipe\DuplexPipe;
/**
 * An internal thread that executes a given function concurrently.
 *
 * Inside the thread, run() re-registers the Composer class loader, creates a
 * dedicated event loop, runs the function as a coroutine and sends the wrapped
 * outcome (ExitSuccess or ExitFailure) over a channel built on the socket that
 * was handed to the constructor.
 *
 * @internal
 */
class Thread extends \Thread
{
    /**
     * Interval handed to the loop timer that polls the kill flag.
     * NOTE(review): presumably expressed in seconds; confirm against the loop timer API.
     */
    const KILL_CHECK_FREQUENCY = 0.25;

    /**
     * @var callable The function to execute in the thread.
     */
    private $function;

    /**
     * @var mixed[] Arguments to pass to the function.
     */
    private $args;

    /**
     * @var resource IPC communication socket wrapped into a channel by run().
     */
    private $socket;

    /**
     * @var bool Set to true by kill(); polled by the timer created in run().
     */
    private $killed = false;

    /**
     * Creates a new thread object.
     *
     * @param resource $socket IPC communication socket.
     * @param callable $function The function to execute in the thread.
     * @param mixed[] $args Arguments to pass to the function.
     */
    public function __construct($socket, callable $function, array $args = [])
    {
        $this->function = $function;
        $this->args = $args;
        $this->socket = $socket;
    }

    /**
     * Runs the thread code and the initialized function.
     *
     * @codeCoverageIgnore Only executed in thread.
     */
    public function run()
    {
        /* First thing we need to do is re-initialize the class autoloader. If
         * we don't do this first, any object of a class that was loaded after
         * the thread started will just be garbage data and unserializable
         * values (like resources) will be lost. This happens even with
         * thread-safe objects.
         */
        foreach (get_declared_classes() as $className) {
            if (strpos($className, 'ComposerAutoloaderInit') === 0) {
                // Calling getLoader() will register the class loader for us.
                $className::getLoader();
                break;
            }
        }

        Loop\loop($loop = Loop\create(false)); // Disable signals in thread.

        // At this point, the thread environment has been prepared so begin using the thread.
        $channel = new ChannelledStream(new DuplexPipe($this->socket, false));

        $coroutine = new Coroutine($this->execute($channel));
        // NOTE(review): done() is called without callbacks; presumably this lets an
        // unhandled coroutine failure surface in the loop instead of being dropped. Confirm.
        $coroutine->done();

        // Poll the kill flag on a timer so that kill() can stop the running loop.
        $timer = $loop->timer(self::KILL_CHECK_FREQUENCY, true, function () use ($loop) {
            if ($this->killed) {
                $loop->stop();
            }
        });

        // NOTE(review): presumably prevents this timer alone from keeping the loop alive. Confirm.
        $timer->unreference();

        $loop->run();
    }

    /**
     * Raises the kill flag so the running event loop can notice the kill request,
     * then delegates to the parent implementation.
     *
     * @return mixed Whatever \Thread::kill() returns.
     *     NOTE(review): presumably a bool success flag; confirm against the pthreads version in use.
     */
    public function kill()
    {
        $this->killed = true;

        return parent::kill();
    }

    /**
     * Invokes the function, wraps its outcome and sends it through the channel.
     *
     * @coroutine
     *
     * @param \Icicle\Concurrent\Sync\Channel $channel
     *
     * @return \Generator
     *
     * @resolve int
     *
     * @codeCoverageIgnore Only executed in thread.
     */
    private function execute(Channel $channel): \Generator
    {
        try {
            $function = $this->function;

            if ($function instanceof \Closure) {
                // Bind closures to the channel so they can use it through $this.
                $bound = $function->bindTo($channel, Channel::class);

                // If binding did not yield a closure, fall back to the unbound function.
                if ($bound) {
                    $function = $bound;
                }
            }

            $result = new ExitSuccess(yield $function(...$this->args));
        } catch (\Throwable $exception) {
            $result = new ExitFailure($exception);
        }

        // Attempt to return the result.
        try {
            return yield from $channel->send($result);
        } catch (\Throwable $exception) {
            // The result was not sendable! Try sending the reason why instead.
            return yield from $channel->send(new ExitFailure($exception));
        }
    }
}