1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Refactor contexts and exiting

This commit is contained in:
Aaron Piotrowski 2015-08-06 18:59:25 -05:00
parent d0ec5d677e
commit c52ec19a38
12 changed files with 303 additions and 316 deletions

8
examples/fork.php Normal file → Executable file
View File

@ -1,3 +1,4 @@
#!/usr/bin/env php
<?php <?php
require dirname(__DIR__).'/vendor/autoload.php'; require dirname(__DIR__).'/vendor/autoload.php';
@ -6,12 +7,14 @@ use Icicle\Coroutine;
use Icicle\Loop; use Icicle\Loop;
Coroutine\create(function () { Coroutine\create(function () {
$context = ForkContext::create(function () { $context = new ForkContext(function () {
print "Child sleeping for 4 seconds...\n"; print "Child sleeping for 4 seconds...\n";
sleep(4); sleep(4);
print "Child sleeping for 2 seconds...\n"; print "Child sleeping for 2 seconds...\n";
sleep(2); sleep(2);
return 42;
}); });
$context->start(); $context->start();
@ -22,8 +25,7 @@ Coroutine\create(function () {
}); });
try { try {
yield $context->join(); printf("Child ended with value %d!\n", (yield $context->join()));
print "Context done!\n";
} catch (Exception $e) { } catch (Exception $e) {
print "Error from child!\n"; print "Error from child!\n";
print $e."\n"; print $e."\n";

14
examples/thread.php Normal file → Executable file
View File

@ -1,3 +1,4 @@
#!/usr/bin/env php
<?php <?php
require dirname(__DIR__).'/vendor/autoload.php'; require dirname(__DIR__).'/vendor/autoload.php';
@ -5,14 +6,15 @@ use Icicle\Concurrent\Threading\ThreadContext;
use Icicle\Coroutine; use Icicle\Coroutine;
use Icicle\Loop; use Icicle\Loop;
$timer = Loop\periodic(1, function () {
print "Demonstrating how alive the parent is.\n";
});
Coroutine\create(function () { Coroutine\create(function () {
// Create a periodic message in the main thread. // Create a periodic message in the main thread.
$timer = Loop\periodic(1, function () {
print "Demonstrating how alive the parent is.\n";
});
// Create a new child thread that does some blocking stuff. // Create a new child thread that does some blocking stuff.
$test = ThreadContext::create(function () { $test = new ThreadContext(function () {
print "Sleeping for 5 seconds...\n"; print "Sleeping for 5 seconds...\n";
sleep(5); sleep(5);
return 42; return 42;
@ -21,8 +23,6 @@ Coroutine\create(function () {
// Run the thread and wait asynchronously for it to finish. // Run the thread and wait asynchronously for it to finish.
$test->start(); $test->start();
printf("Thread ended with value %d!\n", (yield $test->join())); printf("Thread ended with value %d!\n", (yield $test->join()));
})->done(function () { })->done([$timer, 'stop']);
Loop\stop();
});
Loop\run(); Loop\run();

View File

@ -6,13 +6,6 @@ namespace Icicle\Concurrent;
*/ */
interface ContextInterface extends SynchronizableInterface interface ContextInterface extends SynchronizableInterface
{ {
/**
* Creates a new context with a given function to run.
*
* @return ContextInterface A context instance.
*/
public static function create(callable $function);
/** /**
* Checks if the context is running. * Checks if the context is running.
* *
@ -31,12 +24,20 @@ interface ContextInterface extends SynchronizableInterface
public function kill(); public function kill();
/** /**
* Causes the context to immediately panic. * @return \Generator
* *
* @param string $message A panic message. * @resolve mixed
* @param int $code A panic code.
*/ */
public function panic($message = '', $code = 0); public function receive();
/**
* @param mixed $data
*
* @return \Generator
*
* @resolve int
*/
public function send($data);
/** /**
* Gets a promise that resolves when the context ends and joins with the * Gets a promise that resolves when the context ends and joins with the

View File

@ -4,9 +4,9 @@ namespace Icicle\Concurrent\Exception;
class PanicError extends Error class PanicError extends Error
{ {
/** /**
* @var array Stack trace of the panic. * @var string Stack trace of the panic.
*/ */
private $panicTrace; private $trace;
/** /**
* Creates a new panic error. * Creates a new panic error.
@ -15,39 +15,19 @@ class PanicError extends Error
* @param int $code The panic code. * @param int $code The panic code.
* @param array $trace The panic stack trace. * @param array $trace The panic stack trace.
*/ */
public function __construct($message = '', $code = 0, array $trace = []) public function __construct($message = '', $code = 0, $trace = '')
{ {
parent::__construct($message, $code); parent::__construct($message, $code);
$this->panicTrace = $trace; $this->trace = $trace;
} }
/** /**
* Gets the stack trace at the point the panic occurred. * Gets the stack trace at the point the panic occurred.
* *
* @return array * @return string
*/ */
public function getPanicTrace() public function getPanicTrace()
{ {
return $this->panicTrace; return $this->trace;
}
/**
* Gets the panic stack trace as a string.
*
* @return string
*/
public function getPanicTraceAsString()
{
foreach ($this->panicTrace as $id => $scope) {
$string .= sprintf("%d# %s(%d): %s%s%s()\n",
$id,
$scope['file'],
$scope['line'],
isset($scope['class']) ? $scope['class'] : '',
isset($scope['type']) ? $scope['type'] : '',
$scope['function']);
}
return $string;
} }
} }

View File

@ -0,0 +1,6 @@
<?php
namespace Icicle\Concurrent\Exception;
class SynchronizationError extends Error
{
}

View File

@ -2,11 +2,14 @@
namespace Icicle\Concurrent\Forking; namespace Icicle\Concurrent\Forking;
use Icicle\Concurrent\ContextInterface; use Icicle\Concurrent\ContextInterface;
use Icicle\Concurrent\Exception\PanicError; use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\Sync\Channel; use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ExitFailure;
use Icicle\Concurrent\Sync\ExitInterface;
use Icicle\Concurrent\Sync\ExitSuccess;
use Icicle\Coroutine\Coroutine; use Icicle\Coroutine\Coroutine;
use Icicle\Loop; use Icicle\Loop;
use Icicle\Promise\Deferred; use Icicle\Promise;
/** /**
* Implements a UNIX-compatible context using forked processes. * Implements a UNIX-compatible context using forked processes.
@ -16,28 +19,26 @@ class ForkContext extends Synchronized implements ContextInterface
const MSG_DONE = 1; const MSG_DONE = 1;
const MSG_ERROR = 2; const MSG_ERROR = 2;
private $parentSocket; /**
private $childSocket; * @var \Icicle\Concurrent\Sync\Channel A channel for communicating with the child.
private $pid = 0; */
private $isChild = false; private $channel;
private $deferred;
private $function;
/** /**
* Creates a new fork context. * @var int
*
* @param callable $function The function to run in the context.
*/ */
public static function create(callable $function) private $pid = 0;
/**
* @var callable
*/
private $function;
public function __construct(callable $function)
{ {
$instance = new static(); parent::__construct();
$instance->function = $function; $this->function = $function;
$instance->deferred = new Deferred(function (\Exception $exception) use ($instance) {
$instance->stop();
});
return $instance;
} }
/** /**
@ -55,11 +56,6 @@ class ForkContext extends Synchronized implements ContextInterface
*/ */
public function isRunning() public function isRunning()
{ {
// If we are the child process, then we must be running, don't you think?
if ($this->isChild) {
return true;
}
return posix_getpgid($this->pid) !== false; return posix_getpgid($this->pid) !== false;
} }
@ -68,37 +64,24 @@ class ForkContext extends Synchronized implements ContextInterface
*/ */
public function start() public function start()
{ {
$channels = Channel::create(); list($parent, $child) = Channel::createSocketPair();
$this->parentSocket = $channels[0];
$this->childSocket = $channels[1];
$parentPid = getmypid();
if (($pid = pcntl_fork()) === -1) { if (($pid = pcntl_fork()) === -1) {
throw new \Exception(); throw new \Exception();
} }
// We are the parent inside this block. // We are the parent inside this block.
if ($pid !== 0) { if ($pid !== 0) {
$this->pid = $pid; $this->channel = new Channel($parent);
fclose($child);
// Wait for the child process to send us a byte over the socket pair $this->pid = $pid;
// to discover immediately when the process has completed.
// @TODO error checking, check message type received
$receive = new Coroutine($this->parentSocket->receive());
$receive->then(function ($data) {
$this->deferred->resolve();
}, function (\Exception $exception) {
$this->deferred->reject($exception);
});
return; return;
} }
// We are the child, so close the parent socket and initialize child values. $channel = new Channel($child);
$this->isChild = true; fclose($parent);
$this->pid = getmypid();
$this->parentSocket->close();
// We will have a cloned event loop from the parent after forking. The // We will have a cloned event loop from the parent after forking. The
// child context by default is synchronous and uses the parent event // child context by default is synchronous and uses the parent event
@ -109,9 +92,33 @@ class ForkContext extends Synchronized implements ContextInterface
Loop\clear(); Loop\clear();
// Execute the context runnable and send the parent context the result. // Execute the context runnable and send the parent context the result.
$this->run(); Promise\wait(new Coroutine($this->execute($channel)));
exit(0);
} }
/**
* @param Channel $channel
*
* @return \Generator
*/
private function execute(Channel $channel)
{
try {
$function = $this->function;
$result = new ExitSuccess(yield $function($channel));
} catch (\Exception $exception) {
$result = new ExitFailure($exception);
}
yield $channel->send($result);
$channel->close();
}
/**
* {@inheritdoc}
*/
public function kill() public function kill()
{ {
if ($this->isRunning()) { if ($this->isRunning()) {
@ -125,50 +132,38 @@ class ForkContext extends Synchronized implements ContextInterface
*/ */
public function join() public function join()
{ {
if ($this->isChild) { try {
throw new \Exception(); $response = (yield $this->channel->receive());
}
return $this->deferred->getPromise(); if (!$response instanceof ExitInterface) {
throw new SynchronizationError('Did not receive an exit status from fork.');
}
yield $response->getResult();
} finally {
$this->kill();
}
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function panic($message = '', $code = 0) public function receive()
{ {
if ($this->isThread) { $data = (yield $this->channel->receive());
throw new PanicError($message, $code);
if ($data instanceof ExitInterface) {
throw new SynchronizationError(sprintf('Fork exited with result of type: %s', $data->getResult()));
} }
yield $data;
} }
public function __destruct() /**
* {@inheritdoc}
*/
public function send($data)
{ {
parent::__destruct(); return $this->channel->send($data);
// The parent process outlives the child process, so don't destroy the
// semaphore until the parent exits.
if (!$this->isChild) {
//$this->semaphore->destroy();
}
}
private function run()
{
try {
$generator = call_user_func($this->function);
if ($generator instanceof \Generator) {
$coroutine = new Coroutine($generator);
}
Loop\run();
/*} catch (\Exception $exception) {
fwrite($this->childSocket, chr(self::MSG_ERROR));
$serialized = serialize($exception);
$length = strlen($serialized);
fwrite($this->childSocket, pack('S', $length).$serialized);*/
} finally {
$this->childSocket->close();
exit(0);
}
} }
} }

View File

@ -21,14 +21,19 @@ class Channel
const HEADER_LENGTH = 5; const HEADER_LENGTH = 5;
/** /**
* @var DuplexStream An asynchronous socket stream. * @var \Icicle\Socket\Stream\DuplexStream An asynchronous socket stream.
*/ */
private $socket; private $stream;
/** /**
* @var resource A synchronous socket stream. * Creates a new channel instance.
*
* @param resource $socket
*/ */
private $socketResource; public function __construct($socket)
{
$this->stream = new DuplexStream($socket);
}
/** /**
* Creates a new channel and returns a pair of connections. * Creates a new channel and returns a pair of connections.
@ -37,9 +42,11 @@ class Channel
* channel. Each connection is a peer and interacts with the other, even * channel. Each connection is a peer and interacts with the other, even
* across threads or processes. * across threads or processes.
* *
* @return [Channel, Channel] A pair of channels. * @return resource[] Pair of socket resources.
*
* @throws \Icicle\Concurrent\Exception\ChannelException
*/ */
public static function create() public static function createSocketPair()
{ {
// Create a socket pair. // Create a socket pair.
if (($sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) { if (($sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) {
@ -54,7 +61,7 @@ class Channel
* *
* @param mixed $data The data to send. * @param mixed $data The data to send.
* *
* @return Generator * @return \Generator
*/ */
public function send($data) public function send($data)
{ {
@ -68,15 +75,15 @@ class Channel
$length = strlen($serialized); $length = strlen($serialized);
$header = pack('CL', self::MESSAGE_DATA, $length); $header = pack('CL', self::MESSAGE_DATA, $length);
$message = $header.$serialized; $message = $header . $serialized;
yield $this->getSocket()->write($message); yield $this->stream->write($message);
} }
/** /**
* Waits asynchronously for a message from the peer. * Waits asynchronously for a message from the peer.
* *
* @return Generator * @return \Generator
*/ */
public function receive() public function receive()
{ {
@ -84,7 +91,7 @@ class Channel
$buffer = ''; $buffer = '';
$length = self::HEADER_LENGTH; $length = self::HEADER_LENGTH;
do { do {
$buffer .= (yield $this->getSocket()->read($length)); $buffer .= (yield $this->stream->read($length));
} while (($length -= strlen($buffer)) > 0); } while (($length -= strlen($buffer)) > 0);
$header = unpack('Ctype/Llength', $buffer); $header = unpack('Ctype/Llength', $buffer);
@ -92,7 +99,7 @@ class Channel
// If the message type is MESSAGE_CLOSE, the peer was closed and the channel // If the message type is MESSAGE_CLOSE, the peer was closed and the channel
// is done. // is done.
if ($header['type'] === self::MESSAGE_CLOSE) { if ($header['type'] === self::MESSAGE_CLOSE) {
$this->getSocket()->close(); $this->stream->close();
yield null; yield null;
return; return;
} }
@ -102,7 +109,7 @@ class Channel
$buffer = ''; $buffer = '';
$length = $header['length']; $length = $header['length'];
do { do {
$buffer .= (yield $this->getSocket()->read($length)); $buffer .= (yield $this->stream->read($length));
} while (($length -= strlen($buffer)) > 0); } while (($length -= strlen($buffer)) > 0);
// Attempt to unserialize the received data. // Attempt to unserialize the received data.
@ -120,14 +127,14 @@ class Channel
* This method closes the connection to the peer and sends a message to the * This method closes the connection to the peer and sends a message to the
* peer notifying that the connection has been closed. * peer notifying that the connection has been closed.
* *
* @return PromiseInterface * @return \Icicle\Promise\PromiseInterface
*/ */
public function close() public function close()
{ {
// Create a message with just a DONE header and zero data. // Create a message with just a DONE header and zero data.
$message = pack('Cx4', self::MESSAGE_CLOSE); $message = pack('Cx4', self::MESSAGE_CLOSE);
return $this->getSocket()->end($message); return $this->stream->end($message);
} }
/** /**
@ -137,30 +144,6 @@ class Channel
*/ */
public function isOpen() public function isOpen()
{ {
return $this->getSocket()->isOpen(); return $this->stream->isOpen();
}
/**
* Creates a new channel instance.
*
* @param resource $socketResource
*/
public function __construct($socketResource)
{
$this->socketResource = $socketResource;
}
/**
* Gets an asynchronous socket instance.
*
* @return DuplexStream
*/
private function getSocket()
{
if ($this->socket === null) {
$this->socket = new DuplexStream($this->socketResource);
}
return $this->socket;
} }
} }

47
src/Sync/ExitFailure.php Normal file
View File

@ -0,0 +1,47 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Concurrent\Exception\PanicError;
class ExitFailure implements ExitInterface
{
/**
* @var string
*/
private $type;
/**
* @var string
*/
private $message;
/**
* @var int
*/
private $code;
/**
* @var array
*/
private $trace;
public function __construct(\Exception $exception)
{
$this->type = get_class($exception);
$this->message = $exception->getMessage();
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
}
/**
* {@inheritdoc}
*/
public function getResult()
{
throw new PanicError(
sprintf('Uncaught exception in context of type "%s" with message "%s"', $this->type, $this->message),
$this->code,
$this->trace
);
}
}

View File

@ -0,0 +1,12 @@
<?php
namespace Icicle\Concurrent\Sync;
interface ExitInterface
{
/**
* @return mixed Return value of the callable given to the execution context.
*
* @throws \Icicle\Concurrent\Exception\PanicError If the context exited with an uncaught exception.
*/
public function getResult();
}

23
src/Sync/ExitSuccess.php Normal file
View File

@ -0,0 +1,23 @@
<?php
namespace Icicle\Concurrent\Sync;
class ExitSuccess implements ExitInterface
{
/**
* @var mixed
*/
private $result;
public function __construct($result)
{
$this->result = $result;
}
/**
* {@inheritdoc}
*/
public function getResult()
{
return $this->result;
}
}

View File

@ -2,8 +2,10 @@
namespace Icicle\Concurrent\Threading; namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\Sync\Channel; use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ExitFailure;
use Icicle\Concurrent\Sync\ExitSuccess;
use Icicle\Coroutine\Coroutine; use Icicle\Coroutine\Coroutine;
use Icicle\Loop; use Icicle\Promise;
/** /**
* An internal thread that executes a given function concurrently. * An internal thread that executes a given function concurrently.
@ -11,12 +13,7 @@ use Icicle\Loop;
class Thread extends \Thread class Thread extends \Thread
{ {
/** /**
* @var ThreadContext An instance of the context local to this thread. * @var string Path to an autoloader to include.
*/
public $context;
/**
* @var string|null Path to an autoloader to include.
*/ */
public $autoloaderPath; public $autoloaderPath;
@ -25,10 +22,12 @@ class Thread extends \Thread
*/ */
private $function; private $function;
public $prepared = false; private $prepared = false;
public $initialized = false; private $initialized = false;
private $channel; /**
* @var resource
*/
private $socket; private $socket;
/** /**
@ -36,9 +35,9 @@ class Thread extends \Thread
* *
* @param callable $function The function to execute in the thread. * @param callable $function The function to execute in the thread.
*/ */
public function __construct(callable $function) public function __construct(callable $function, $autoloaderPath = '')
{ {
$this->context = new ThreadContext($this); $this->autoloaderPath = $autoloaderPath;
$this->function = $function; $this->function = $function;
} }
@ -53,6 +52,16 @@ class Thread extends \Thread
$this->initialized = true; $this->initialized = true;
} }
/**
* Determines if the thread has successfully been prepared.
*
* @return bool
*/
public function isPrepared()
{
return $this->prepared;
}
/** /**
* Runs the thread code and the initialized function. * Runs the thread code and the initialized function.
*/ */
@ -68,13 +77,10 @@ class Thread extends \Thread
// don't do this first, objects we receive from other threads will just // don't do this first, objects we receive from other threads will just
// be garbage data and unserializable values (like resources) will be // be garbage data and unserializable values (like resources) will be
// lost. This happens even with thread-safe objects. // lost. This happens even with thread-safe objects.
if (file_exists($this->autoloaderPath)) { if ('' !== $this->autoloaderPath && file_exists($this->autoloaderPath)) {
require $this->autoloaderPath; require $this->autoloaderPath;
} }
// Initialize the thread-local global event loop.
Loop\loop();
// Register a shutdown handler to deal with errors smoothly. // Register a shutdown handler to deal with errors smoothly.
//register_shutdown_function([$this, 'handleShutdown']); //register_shutdown_function([$this, 'handleShutdown']);
@ -94,73 +100,27 @@ class Thread extends \Thread
// At this point, the thread environment has been prepared, and the // At this point, the thread environment has been prepared, and the
// parent has finished injecting values into our memory, so begin using // parent has finished injecting values into our memory, so begin using
// the channel. // the channel.
$this->channel = new LocalObject(new Channel($this->socket)); $channel = new Channel($this->socket);
// Now that everything is finally ready, invoke the function, closure, Promise\wait(new Coroutine($this->execute($channel)));
// or coroutine passed in from the user. }
/**
* @param Channel $channel
*
* @return \Generator
*/
private function execute(Channel $channel)
{
try { try {
if ($this->function instanceof \Closure) { $function = $this->function;
$generator = $this->function->bindTo($this->context)->__invoke(); $result = new ExitSuccess(yield $function($channel));
} else {
$generator = call_user_func($this->function);
}
if ($generator instanceof \Generator) {
$coroutine = new Coroutine($generator);
} else {
$returnValue = $generator;
}
// Send the return value back to the parent thread.
$response = [
'ok' => true,
'value' => $returnValue,
];
new Coroutine($this->channel->deref()->send($response));
Loop\run();
} catch (\Exception $exception) { } catch (\Exception $exception) {
// If normal execution failed and caused an error, catch it and send $result = new ExitFailure($exception);
// it to the parent context so the error can bubble up.
$response = [
'ok' => false,
'panic' => [
'message' => $exception->getMessage(),
'code' => $exception->getCode(),
'trace' => array_map([$this, 'removeTraceArgs'], $exception->getTrace()),
],
];
new Coroutine($this->channel->deref()->send($response));
} finally {
$this->channel->deref()->close();
} }
// We don't really need to do this, but let's be explicit about freeing yield $channel->send($result);
// our resources.
$this->channel->free();
}
public function handleShutdown() $channel->close();
{
if ($error = error_get_last()) {
$panic = [
'message' => $error['message'],
'code' => 0,
'trace' => array_map([$this, 'removeTraceArgs'], debug_backtrace()),
];
$this->sendMessage(self::MSG_ERROR);
$serialized = serialize($panic);
$length = strlen($serialized);
fwrite($this->socket, pack('S', $length).$serialized);
fclose($this->socket);
}
}
public function removeTraceArgs($trace)
{
unset($trace['args']);
return $trace;
} }
} }

View File

@ -2,8 +2,9 @@
namespace Icicle\Concurrent\Threading; namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\ContextInterface; use Icicle\Concurrent\ContextInterface;
use Icicle\Concurrent\Exception\PanicError; use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\Sync\Channel; use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ExitInterface;
use Icicle\Promise; use Icicle\Promise;
/** /**
@ -16,45 +17,23 @@ use Icicle\Promise;
class ThreadContext implements ContextInterface class ThreadContext implements ContextInterface
{ {
/** /**
* @var Thread A thread instance. * @var \Icicle\Concurrent\Threading\Thread A thread instance.
*/ */
public $thread; private $thread;
/** /**
* @var Channel A channel for communicating with the thread. * @var \Icicle\Concurrent\Sync\Channel A channel for communicating with the thread.
*/ */
private $channel; private $channel;
/**
* @var bool Indicates if this context instance belongs to the thread.
*/
private $isThread = true;
/**
* {@inheritdoc}
*/
public static function create(callable $function)
{
$thread = new Thread($function);
$thread->autoloaderPath = static::getComposerAutoloader();
$context = new static($thread);
$context->isThread = false;
$context->deferredJoin = new Promise\Deferred(function () use ($context) {
$context->kill();
});
return $context;
}
/** /**
* Creates a new thread context from a thread. * Creates a new thread context from a thread.
* *
* @param Thread $thread The thread object. * @param callable $function
*/ */
public function __construct(Thread $thread) public function __construct(callable $function)
{ {
$this->thread = $thread; $this->thread = new Thread($function, $this->getComposerAutoloader());
} }
/** /**
@ -70,20 +49,20 @@ class ThreadContext implements ContextInterface
*/ */
public function start() public function start()
{ {
$channels = Channel::create(); list($threadSocket, $parentSocket) = Channel::createSocketPair();
$this->channel = new Channel($channels[1]); $this->channel = new Channel($parentSocket);
// Start the thread first. The thread will prepare the autoloader and // Start the thread first. The thread will prepare the autoloader and
// the event loop, and then notify us when the thread environment is // the event loop, and then notify us when the thread environment is
// ready. If we don't do this first, objects will break when passed // ready. If we don't do this first, objects will break when passed
// to the thread, since the classes are not yet defined. // to the thread, since the classes are not yet defined.
$this->thread->start(PTHREADS_INHERIT_INI | PTHREADS_ALLOW_GLOBALS); $this->thread->start(PTHREADS_INHERIT_INI);
// The thread must prepare itself first, so wait until the thread has // The thread must prepare itself first, so wait until the thread has
// done so. We need to unlock ourselves while waiting to prevent // done so. We need to unlock ourselves while waiting to prevent
// deadlocks if we somehow acquired the lock before the thread did. // deadlocks if we somehow acquired the lock before the thread did.
$this->thread->synchronized(function () { $this->thread->synchronized(function () {
if (!$this->thread->prepared) { if (!$this->thread->isPrepared()) {
$this->thread->wait(); $this->thread->wait();
} }
}); });
@ -91,8 +70,8 @@ class ThreadContext implements ContextInterface
// At this stage, the thread environment has been prepared, and we kept // At this stage, the thread environment has been prepared, and we kept
// the lock from above, so initialize the thread with the necessary // the lock from above, so initialize the thread with the necessary
// values to be copied over. // values to be copied over.
$this->thread->synchronized(function () use ($channels) { $this->thread->synchronized(function () use ($threadSocket) {
$this->thread->init($channels[0]); $this->thread->init($threadSocket);
$this->thread->notify(); $this->thread->notify();
}); });
} }
@ -105,35 +84,23 @@ class ThreadContext implements ContextInterface
$this->thread->kill(); $this->thread->kill();
} }
/**
* {@inheritdoc}
*/
public function panic($message = '', $code = 0)
{
if ($this->isThread) {
throw new PanicError($message, $code);
} else {
$this->kill();
}
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function join() public function join()
{ {
// Get an array of completion data from the thread when it finishes. try {
$response = (yield $this->channel->receive()); $response = (yield $this->channel->receive());
// If the status is not OK, bubble the problem up. if (!$response instanceof ExitInterface) {
if (!$response['ok']) { throw new SynchronizationError('Did not receive an exit status from thread.');
throw new PanicError($response['panic']['message'], $response['panic']['code'], $response['panic']['trace']); }
yield $response->getResult();
} finally {
$this->channel->close();
$this->thread->join();
} }
$this->channel->close();
$this->thread->join();
yield $response['value'];
} }
/** /**
@ -168,32 +135,43 @@ class ThreadContext implements ContextInterface
return $returnValue; return $returnValue;
} }
/**
* {@inheritdoc}
*/
public function receive()
{
$data = (yield $this->channel->receive());
if ($data instanceof ExitInterface) {
throw new SynchronizationError(sprintf('Thread exited with result of type: %s', $data->getResult()));
}
yield $data;
}
/**
* {@inheritdoc}
*/
public function send($data)
{
return $this->channel->send($data);
}
/** /**
* Gets the full path to the Composer autoloader. * Gets the full path to the Composer autoloader.
* *
* If no Composer autoloader is being used, `null` is returned. * If no Composer autoloader is being used, `null` is returned.
* *
* @return \Composer\Autoload\ClassLoader|null * @return string
*/ */
private static function getComposerAutoloader() private function getComposerAutoloader()
{ {
foreach (get_included_files() as $path) { foreach (get_included_files() as $path) {
if (strpos($path, 'vendor/autoload.php') !== false) { if (preg_match('/vendor\/autoload.php$/i', $path)) {
$source = file_get_contents($path); return $path;
if (strpos($source, '@generated by Composer') !== false) {
return $path;
}
} }
} }
// Find the Composer autoloader initializer class, and use it to fetch return '';
// the autoloader instance.
/*foreach (get_declared_classes() as $name) {
if (strpos($name, 'ComposerAutoloaderInit') === 0) {
return $name::getLoader();
}
}*/
return;
} }
} }