mirror of
https://github.com/danog/parallel.git
synced 2024-11-30 04:39:01 +01:00
First working, async threads using channels for messages
Channel was changed to return resources on creation. Also, much better setting up synchronization at the beginning of a new thread.
This commit is contained in:
parent
ced3ddc8ff
commit
ff413c904c
@ -45,9 +45,7 @@ class Channel
|
||||
throw new ChannelException('Failed to create channel sockets.');
|
||||
}
|
||||
|
||||
return array_map(static function ($socket) {
|
||||
return new static($socket);
|
||||
}, $sockets);
|
||||
return $sockets;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -147,7 +145,7 @@ class Channel
|
||||
*
|
||||
* @param resource $socketResource
|
||||
*/
|
||||
private function __construct($socketResource)
|
||||
public function __construct($socketResource)
|
||||
{
|
||||
$this->socketResource = $socketResource;
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
<?php
|
||||
namespace Icicle\Concurrent\Threading;
|
||||
|
||||
use Icicle\Concurrent\Sync\Channel;
|
||||
use Icicle\Coroutine\Coroutine;
|
||||
use Icicle\Loop;
|
||||
|
||||
@ -9,11 +10,6 @@ use Icicle\Loop;
|
||||
*/
|
||||
class Thread extends \Thread
|
||||
{
|
||||
const MSG_DONE = 1;
|
||||
const MSG_ERROR = 2;
|
||||
|
||||
private $socket;
|
||||
|
||||
/**
|
||||
* @var ThreadContext An instance of the context local to this thread.
|
||||
*/
|
||||
@ -29,6 +25,12 @@ class Thread extends \Thread
|
||||
*/
|
||||
private $function;
|
||||
|
||||
public $prepared = false;
|
||||
public $initialized = false;
|
||||
|
||||
private $channel;
|
||||
private $socket;
|
||||
|
||||
/**
|
||||
* Creates a new thread object.
|
||||
*
|
||||
@ -36,37 +38,108 @@ class Thread extends \Thread
|
||||
*/
|
||||
public function __construct(callable $function)
|
||||
{
|
||||
$this->function = $function;
|
||||
$this->context = ThreadContext::createLocalInstance($this);
|
||||
$this->function = $function;
|
||||
}
|
||||
|
||||
public function initialize($socket)
|
||||
/**
|
||||
* Initializes the thread by injecting values from the parent into threaded memory.
|
||||
*
|
||||
* @param resource $socket The channel socket to communicate to the parent with.
|
||||
*/
|
||||
public function init($socket)
|
||||
{
|
||||
$this->socket = $socket;
|
||||
$this->initialized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the thread code and the initialized function.
|
||||
*/
|
||||
public function run()
|
||||
{
|
||||
try {
|
||||
// First thing we need to do is prepare the thread environment to make
|
||||
// it usable, so lock the thread while we do it. Hopefully we get the
|
||||
// lock first, but if we don't the parent will release and give us a
|
||||
// chance before continuing.
|
||||
$this->lock();
|
||||
|
||||
// First thing we need to do is initialize the class autoloader. If we
|
||||
// don't do this first, objects we receive from other threads will just
|
||||
// be garbage data and unserializable values (like resources) will be
|
||||
// lost. This happens even with thread-safe objects.
|
||||
if (file_exists($this->autoloaderPath)) {
|
||||
require $this->autoloaderPath;
|
||||
}
|
||||
|
||||
// Initialize the thread-local global event loop.
|
||||
Loop\loop();
|
||||
|
||||
// Now let the parent thread know that we are done preparing the
|
||||
// thread environment and are ready to accept data.
|
||||
$this->prepared = true;
|
||||
$this->notify();
|
||||
$this->unlock();
|
||||
|
||||
// Wait for objects to be injected by the context wrapper object.
|
||||
$this->lock();
|
||||
if (!$this->initialized) {
|
||||
$this->wait();
|
||||
}
|
||||
$this->unlock();
|
||||
|
||||
// At this point, the thread environment has been prepared, and the
|
||||
// parent has finished injecting values into our memory.
|
||||
|
||||
$this->channel = new LocalObject(new Channel($this->socket));
|
||||
//$this->socket = null;
|
||||
|
||||
//register_shutdown_function([$this, 'handleShutdown']);
|
||||
try {
|
||||
if ($this->function instanceof \Closure) {
|
||||
$generator = $this->function->bindTo($this->context)->__invoke();
|
||||
} else {
|
||||
$generator = call_user_func($this->function);
|
||||
}
|
||||
|
||||
if ($generator instanceof \Generator) {
|
||||
$coroutine = new Coroutine($generator);
|
||||
} else {
|
||||
$returnValue = $generator;
|
||||
}
|
||||
} catch (\Exception $exception) {
|
||||
print $exception;
|
||||
|
||||
$panic = [
|
||||
'panic' => [
|
||||
'message' => $exception->getMessage(),
|
||||
'code' => $exception->getCode(),
|
||||
'trace' => array_map([$this, 'removeTraceArgs'], $exception->getTrace()),
|
||||
],
|
||||
];
|
||||
|
||||
$this->channel->deref()->send($panic);
|
||||
} finally {
|
||||
$this->channel->deref()->close();
|
||||
}
|
||||
|
||||
Loop\run();
|
||||
$this->channel->free();
|
||||
}
|
||||
|
||||
public function handleShutdown()
|
||||
{
|
||||
if ($error = error_get_last()) {
|
||||
$panic = [
|
||||
'message' => $error['message'],
|
||||
'code' => 0,
|
||||
'trace' => array_map([$this, 'removeTraceArgs'], debug_backtrace()),
|
||||
];
|
||||
|
||||
$this->sendMessage(self::MSG_DONE);
|
||||
} catch (\Exception $exception) {
|
||||
print $exception . PHP_EOL;
|
||||
$this->sendMessage(self::MSG_ERROR);
|
||||
$serialized = serialize($exception);
|
||||
$serialized = serialize($panic);
|
||||
$length = strlen($serialized);
|
||||
fwrite($this->socket, pack('S', $length).$serialized);
|
||||
} finally {
|
||||
fclose($this->socket);
|
||||
}
|
||||
}
|
||||
@ -75,4 +148,10 @@ class Thread extends \Thread
|
||||
{
|
||||
fwrite($this->socket, chr($message));
|
||||
}
|
||||
|
||||
public function removeTraceArgs($trace)
|
||||
{
|
||||
unset($trace['args']);
|
||||
return $trace;
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,8 @@
|
||||
namespace Icicle\Concurrent\Threading;
|
||||
|
||||
use Icicle\Concurrent\ContextInterface;
|
||||
use Icicle\Concurrent\Exception\ContextAbortException;
|
||||
use Icicle\Concurrent\Exception\PanicError;
|
||||
use Icicle\Concurrent\Sync\Channel;
|
||||
use Icicle\Promise;
|
||||
use Icicle\Socket\Stream\DuplexStream;
|
||||
|
||||
@ -20,11 +21,6 @@ class ThreadContext implements ContextInterface
|
||||
*/
|
||||
public $thread;
|
||||
|
||||
/**
|
||||
* @var Promise\Deferred A deferred object that resolves when the context ends.
|
||||
*/
|
||||
private $deferredJoin;
|
||||
|
||||
/**
|
||||
* @var DuplexStream An active socket connection to the thread's socket.
|
||||
*/
|
||||
@ -35,18 +31,26 @@ class ThreadContext implements ContextInterface
|
||||
*/
|
||||
private $invoker;
|
||||
|
||||
/**
|
||||
* @var Channel A channel for communicating with the thread.
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
private $isThread = false;
|
||||
|
||||
/**
|
||||
* Creates an instance of the current context class for the local thread.
|
||||
*
|
||||
* @return self
|
||||
*
|
||||
* @internal
|
||||
*
|
||||
* @return self
|
||||
*/
|
||||
final public static function createLocalInstance(Thread $thread)
|
||||
{
|
||||
$class = new \ReflectionClass(static::class);
|
||||
$instance = $class->newInstanceWithoutConstructor();
|
||||
$instance->thread = $thread;
|
||||
$instance->isThread = true;
|
||||
return $instance;
|
||||
}
|
||||
|
||||
@ -78,16 +82,33 @@ class ThreadContext implements ContextInterface
|
||||
*/
|
||||
public function start()
|
||||
{
|
||||
if (($sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) {
|
||||
throw new \Exception();
|
||||
}
|
||||
$channels = Channel::create();
|
||||
$this->channel = new Channel($channels[1]);
|
||||
|
||||
$this->thread->initialize($sockets[1]);
|
||||
// Start the thread first. The thread will prepare the autoloader and
|
||||
// the event loop, and then notify us when the thread environment is
|
||||
// ready. If we don't do this first, objects will break when passed
|
||||
// to the thread, since the classes are not yet defined.
|
||||
$this->thread->start(PTHREADS_INHERIT_INI);
|
||||
|
||||
$this->socket = new DuplexStream($sockets[0]);
|
||||
// The thread must prepare itself first, so wait until the thread has
|
||||
// done so. We need to unlock ourselves while waiting to prevent
|
||||
// deadlocks if we somehow acquired the lock before the thread did.
|
||||
$this->thread->synchronized(function () {
|
||||
if (!$this->thread->prepared) {
|
||||
$this->thread->wait();
|
||||
}
|
||||
});
|
||||
|
||||
$this->socket->read(1)->then(function ($data) {
|
||||
// At this stage, the thread environment has been prepared, and we kept
|
||||
// the lock from above, so initialize the thread with the necessary
|
||||
// values to be copied over.
|
||||
$this->thread->synchronized(function () use ($channels) {
|
||||
$this->thread->init($channels[0]);
|
||||
$this->thread->notify();
|
||||
});
|
||||
|
||||
/*$this->socket->read(1)->then(function ($data) {
|
||||
$message = ord($data);
|
||||
if ($message === Thread::MSG_DONE) {
|
||||
$this->deferredJoin->resolve();
|
||||
@ -101,14 +122,14 @@ class ThreadContext implements ContextInterface
|
||||
$serializedLength = $serializedLength[1];
|
||||
return $this->socket->read($serializedLength);
|
||||
})->then(function ($data) {
|
||||
$previous = unserialize($data);
|
||||
$exception = new ContextAbortException('The context encountered an error.', 0, $previous);
|
||||
$panic = unserialize($data);
|
||||
$exception = new PanicError($panic['message'], $panic['code'], $panic['trace']);
|
||||
$this->deferredJoin->reject($exception);
|
||||
$this->socket->close();
|
||||
});
|
||||
}, function (\Exception $exception) {
|
||||
$this->deferredJoin->reject($exception);
|
||||
});
|
||||
});*/
|
||||
}
|
||||
|
||||
/**
|
||||
@ -127,12 +148,23 @@ class ThreadContext implements ContextInterface
|
||||
$this->thread->kill();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function panic($message = '', $code = 0)
|
||||
{
|
||||
if ($this->isThread) {
|
||||
throw new PanicError($message, $code);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function join()
|
||||
{
|
||||
return $this->deferredJoin->getPromise();
|
||||
yield $this->channel->receive();
|
||||
$this->thread->join();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -185,6 +217,14 @@ class ThreadContext implements ContextInterface
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
// Find the Composer autoloader initializer class, and use it to fetch
|
||||
// the autoloader instance.
|
||||
/*foreach (get_declared_classes() as $name) {
|
||||
if (strpos($name, 'ComposerAutoloaderInit') === 0) {
|
||||
return $name::getLoader();
|
||||
}
|
||||
}*/
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -2,22 +2,25 @@
|
||||
namespace Icicle\Tests\Concurrent\Sync;
|
||||
|
||||
use Icicle\Concurrent\Sync\Channel;
|
||||
use Icicle\Loop;
|
||||
use Icicle\Coroutine;
|
||||
use Icicle\Loop;
|
||||
use Icicle\Tests\Concurrent\TestCase;
|
||||
|
||||
class ChannelTest extends \PHPUnit_Framework_TestCase
|
||||
class ChannelTest extends TestCase
|
||||
{
|
||||
public function testCreate()
|
||||
{
|
||||
list($a, $b) = Channel::create();
|
||||
|
||||
$this->assertInstanceOf(Channel::class, $a);
|
||||
$this->assertInstanceOf(Channel::class, $b);
|
||||
$this->assertInternalType('resource', $a);
|
||||
$this->assertInternalType('resource', $b);
|
||||
}
|
||||
|
||||
public function testClose()
|
||||
{
|
||||
list($a, $b) = Channel::create();
|
||||
$a = new Channel($a);
|
||||
$b = new Channel($b);
|
||||
|
||||
// Close $a. $b should close on next read...
|
||||
$a->close();
|
||||
@ -33,6 +36,8 @@ class ChannelTest extends \PHPUnit_Framework_TestCase
|
||||
{
|
||||
Coroutine\create(function () {
|
||||
list($a, $b) = Channel::create();
|
||||
$a = new Channel($a);
|
||||
$b = new Channel($b);
|
||||
|
||||
yield $a->send('hello');
|
||||
$data = (yield $b->receive());
|
||||
@ -41,4 +46,22 @@ class ChannelTest extends \PHPUnit_Framework_TestCase
|
||||
|
||||
Loop\run();
|
||||
}
|
||||
|
||||
/**
|
||||
* @group threading
|
||||
*/
|
||||
public function testThreadTransfer()
|
||||
{
|
||||
list($a, $b) = Channel::create();
|
||||
$a = new Channel($a);
|
||||
$b = new Channel($b);
|
||||
|
||||
$thread = \Thread::from(function () {
|
||||
$a = $this->a;
|
||||
});
|
||||
|
||||
$thread->a; // <-- Transfer channel $a to the thread
|
||||
$thread->start(PTHREADS_INHERIT_INI);
|
||||
$thread->join();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user