diff --git a/examples/fork.php b/examples/fork.php old mode 100644 new mode 100755 index 90387d2..5ce3c88 --- a/examples/fork.php +++ b/examples/fork.php @@ -1,3 +1,4 @@ +#!/usr/bin/env php start(); @@ -22,8 +25,7 @@ Coroutine\create(function () { }); try { - yield $context->join(); - print "Context done!\n"; + printf("Child ended with value %d!\n", (yield $context->join())); } catch (Exception $e) { print "Error from child!\n"; print $e."\n"; diff --git a/examples/thread.php b/examples/thread.php old mode 100644 new mode 100755 index 11b7c3f..b434f1f --- a/examples/thread.php +++ b/examples/thread.php @@ -1,3 +1,4 @@ +#!/usr/bin/env php start(); printf("Thread ended with value %d!\n", (yield $test->join())); -})->done(function () { - Loop\stop(); -}); +})->done([$timer, 'stop']); Loop\run(); diff --git a/src/ContextInterface.php b/src/ContextInterface.php index 960e4be..36315dc 100644 --- a/src/ContextInterface.php +++ b/src/ContextInterface.php @@ -6,13 +6,6 @@ namespace Icicle\Concurrent; */ interface ContextInterface extends SynchronizableInterface { - /** - * Creates a new context with a given function to run. - * - * @return ContextInterface A context instance. - */ - public static function create(callable $function); - /** * Checks if the context is running. * @@ -31,12 +24,20 @@ interface ContextInterface extends SynchronizableInterface public function kill(); /** - * Causes the context to immediately panic. + * @return \Generator * - * @param string $message A panic message. - * @param int $code A panic code. + * @resolve mixed */ - public function panic($message = '', $code = 0); + public function receive(); + + /** + * @param mixed $data + * + * @return \Generator + * + * @resolve int + */ + public function send($data); /** * Gets a promise that resolves when the context ends and joins with the diff --git a/src/Exception/PanicError.php b/src/Exception/PanicError.php index fcbbe34..eb8f2e4 100644 --- a/src/Exception/PanicError.php +++ b/src/Exception/PanicError.php @@ -4,9 +4,9 @@ namespace Icicle\Concurrent\Exception; class PanicError extends Error { /** - * @var array Stack trace of the panic. + * @var string Stack trace of the panic. */ - private $panicTrace; + private $trace; /** * Creates a new panic error. @@ -15,39 +15,19 @@ class PanicError extends Error * @param int $code The panic code. * @param array $trace The panic stack trace. */ - public function __construct($message = '', $code = 0, array $trace = []) + public function __construct($message = '', $code = 0, $trace = '') { parent::__construct($message, $code); - $this->panicTrace = $trace; + $this->trace = $trace; } /** * Gets the stack trace at the point the panic occurred. * - * @return array + * @return string */ public function getPanicTrace() { - return $this->panicTrace; - } - - /** - * Gets the panic stack trace as a string. - * - * @return string - */ - public function getPanicTraceAsString() - { - foreach ($this->panicTrace as $id => $scope) { - $string .= sprintf("%d# %s(%d): %s%s%s()\n", - $id, - $scope['file'], - $scope['line'], - isset($scope['class']) ? $scope['class'] : '', - isset($scope['type']) ? $scope['type'] : '', - $scope['function']); - } - - return $string; + return $this->trace; } } diff --git a/src/Exception/SynchronizationError.php b/src/Exception/SynchronizationError.php new file mode 100644 index 0000000..2802661 --- /dev/null +++ b/src/Exception/SynchronizationError.php @@ -0,0 +1,6 @@ +function = $function; - $instance->deferred = new Deferred(function (\Exception $exception) use ($instance) { - $instance->stop(); - }); - - return $instance; + $this->function = $function; } /** @@ -55,11 +56,6 @@ class ForkContext extends Synchronized implements ContextInterface */ public function isRunning() { - // If we are the child process, then we must be running, don't you think? - if ($this->isChild) { - return true; - } - return posix_getpgid($this->pid) !== false; } @@ -68,37 +64,24 @@ class ForkContext extends Synchronized implements ContextInterface */ public function start() { - $channels = Channel::create(); + list($parent, $child) = Channel::createSocketPair(); - $this->parentSocket = $channels[0]; - $this->childSocket = $channels[1]; - - $parentPid = getmypid(); if (($pid = pcntl_fork()) === -1) { throw new \Exception(); } // We are the parent inside this block. if ($pid !== 0) { - $this->pid = $pid; + $this->channel = new Channel($parent); + fclose($child); - // Wait for the child process to send us a byte over the socket pair - // to discover immediately when the process has completed. - // @TODO error checking, check message type received - $receive = new Coroutine($this->parentSocket->receive()); - $receive->then(function ($data) { - $this->deferred->resolve(); - }, function (\Exception $exception) { - $this->deferred->reject($exception); - }); + $this->pid = $pid; return; } - // We are the child, so close the parent socket and initialize child values. - $this->isChild = true; - $this->pid = getmypid(); - $this->parentSocket->close(); + $channel = new Channel($child); + fclose($parent); // We will have a cloned event loop from the parent after forking. The // child context by default is synchronous and uses the parent event @@ -109,9 +92,33 @@ class ForkContext extends Synchronized implements ContextInterface Loop\clear(); // Execute the context runnable and send the parent context the result. - $this->run(); + Promise\wait(new Coroutine($this->execute($channel))); + + exit(0); } + /** + * @param Channel $channel + * + * @return \Generator + */ + private function execute(Channel $channel) + { + try { + $function = $this->function; + $result = new ExitSuccess(yield $function($channel)); + } catch (\Exception $exception) { + $result = new ExitFailure($exception); + } + + yield $channel->send($result); + + $channel->close(); + } + + /** + * {@inheritdoc} + */ public function kill() { if ($this->isRunning()) { @@ -125,50 +132,38 @@ class ForkContext extends Synchronized implements ContextInterface */ public function join() { - if ($this->isChild) { - throw new \Exception(); - } + try { + $response = (yield $this->channel->receive()); - return $this->deferred->getPromise(); + if (!$response instanceof ExitInterface) { + throw new SynchronizationError('Did not receive an exit status from fork.'); + } + + yield $response->getResult(); + } finally { + $this->kill(); + } } /** * {@inheritdoc} */ - public function panic($message = '', $code = 0) + public function receive() { - if ($this->isThread) { - throw new PanicError($message, $code); + $data = (yield $this->channel->receive()); + + if ($data instanceof ExitInterface) { + throw new SynchronizationError(sprintf('Fork exited with result of type: %s', $data->getResult())); } + + yield $data; } - public function __destruct() + /** + * {@inheritdoc} + */ + public function send($data) { - parent::__destruct(); - - // The parent process outlives the child process, so don't destroy the - // semaphore until the parent exits. - if (!$this->isChild) { - //$this->semaphore->destroy(); - } - } - - private function run() - { - try { - $generator = call_user_func($this->function); - if ($generator instanceof \Generator) { - $coroutine = new Coroutine($generator); - } - Loop\run(); - /*} catch (\Exception $exception) { - fwrite($this->childSocket, chr(self::MSG_ERROR)); - $serialized = serialize($exception); - $length = strlen($serialized); - fwrite($this->childSocket, pack('S', $length).$serialized);*/ - } finally { - $this->childSocket->close(); - exit(0); - } + return $this->channel->send($data); } } diff --git a/src/Sync/Channel.php b/src/Sync/Channel.php index a5099b9..1cd85d0 100644 --- a/src/Sync/Channel.php +++ b/src/Sync/Channel.php @@ -21,14 +21,19 @@ class Channel const HEADER_LENGTH = 5; /** - * @var DuplexStream An asynchronous socket stream. + * @var \Icicle\Socket\Stream\DuplexStream An asynchronous socket stream. */ - private $socket; + private $stream; /** - * @var resource A synchronous socket stream. + * Creates a new channel instance. + * + * @param resource $socket */ - private $socketResource; + public function __construct($socket) + { + $this->stream = new DuplexStream($socket); + } /** * Creates a new channel and returns a pair of connections. @@ -37,9 +42,11 @@ class Channel * channel. Each connection is a peer and interacts with the other, even * across threads or processes. * - * @return [Channel, Channel] A pair of channels. + * @return resource[] Pair of socket resources. + * + * @throws \Icicle\Concurrent\Exception\ChannelException */ - public static function create() + public static function createSocketPair() { // Create a socket pair. if (($sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) { @@ -54,7 +61,7 @@ class Channel * * @param mixed $data The data to send. * - * @return Generator + * @return \Generator */ public function send($data) { @@ -68,15 +75,15 @@ class Channel $length = strlen($serialized); $header = pack('CL', self::MESSAGE_DATA, $length); - $message = $header.$serialized; + $message = $header . $serialized; - yield $this->getSocket()->write($message); + yield $this->stream->write($message); } /** * Waits asynchronously for a message from the peer. * - * @return Generator + * @return \Generator */ public function receive() { @@ -84,7 +91,7 @@ class Channel $buffer = ''; $length = self::HEADER_LENGTH; do { - $buffer .= (yield $this->getSocket()->read($length)); + $buffer .= (yield $this->stream->read($length)); } while (($length -= strlen($buffer)) > 0); $header = unpack('Ctype/Llength', $buffer); @@ -92,7 +99,7 @@ class Channel // If the message type is MESSAGE_CLOSE, the peer was closed and the channel // is done. if ($header['type'] === self::MESSAGE_CLOSE) { - $this->getSocket()->close(); + $this->stream->close(); yield null; return; } @@ -102,7 +109,7 @@ class Channel $buffer = ''; $length = $header['length']; do { - $buffer .= (yield $this->getSocket()->read($length)); + $buffer .= (yield $this->stream->read($length)); } while (($length -= strlen($buffer)) > 0); // Attempt to unserialize the received data. @@ -120,14 +127,14 @@ class Channel * This method closes the connection to the peer and sends a message to the * peer notifying that the connection has been closed. * - * @return PromiseInterface + * @return \Icicle\Promise\PromiseInterface */ public function close() { // Create a message with just a DONE header and zero data. $message = pack('Cx4', self::MESSAGE_CLOSE); - return $this->getSocket()->end($message); + return $this->stream->end($message); } /** @@ -137,30 +144,6 @@ class Channel */ public function isOpen() { - return $this->getSocket()->isOpen(); - } - - /** - * Creates a new channel instance. - * - * @param resource $socketResource - */ - public function __construct($socketResource) - { - $this->socketResource = $socketResource; - } - - /** - * Gets an asynchronous socket instance. - * - * @return DuplexStream - */ - private function getSocket() - { - if ($this->socket === null) { - $this->socket = new DuplexStream($this->socketResource); - } - - return $this->socket; + return $this->stream->isOpen(); } } diff --git a/src/Sync/ExitFailure.php b/src/Sync/ExitFailure.php new file mode 100644 index 0000000..d79413f --- /dev/null +++ b/src/Sync/ExitFailure.php @@ -0,0 +1,47 @@ +type = get_class($exception); + $this->message = $exception->getMessage(); + $this->code = $exception->getCode(); + $this->trace = $exception->getTraceAsString(); + } + + /** + * {@inheritdoc} + */ + public function getResult() + { + throw new PanicError( + sprintf('Uncaught exception in context of type "%s" with message "%s"', $this->type, $this->message), + $this->code, + $this->trace + ); + } +} \ No newline at end of file diff --git a/src/Sync/ExitInterface.php b/src/Sync/ExitInterface.php new file mode 100644 index 0000000..50ab5c6 --- /dev/null +++ b/src/Sync/ExitInterface.php @@ -0,0 +1,12 @@ +result = $result; + } + + /** + * {@inheritdoc} + */ + public function getResult() + { + return $this->result; + } +} \ No newline at end of file diff --git a/src/Threading/Thread.php b/src/Threading/Thread.php index e4ee307..ba3076e 100644 --- a/src/Threading/Thread.php +++ b/src/Threading/Thread.php @@ -2,8 +2,10 @@ namespace Icicle\Concurrent\Threading; use Icicle\Concurrent\Sync\Channel; +use Icicle\Concurrent\Sync\ExitFailure; +use Icicle\Concurrent\Sync\ExitSuccess; use Icicle\Coroutine\Coroutine; -use Icicle\Loop; +use Icicle\Promise; /** * An internal thread that executes a given function concurrently. @@ -11,12 +13,7 @@ use Icicle\Loop; class Thread extends \Thread { /** - * @var ThreadContext An instance of the context local to this thread. - */ - public $context; - - /** - * @var string|null Path to an autoloader to include. + * @var string Path to an autoloader to include. */ public $autoloaderPath; @@ -25,10 +22,12 @@ class Thread extends \Thread */ private $function; - public $prepared = false; - public $initialized = false; + private $prepared = false; + private $initialized = false; - private $channel; + /** + * @var resource + */ private $socket; /** @@ -36,9 +35,9 @@ class Thread extends \Thread * * @param callable $function The function to execute in the thread. */ - public function __construct(callable $function) + public function __construct(callable $function, $autoloaderPath = '') { - $this->context = new ThreadContext($this); + $this->autoloaderPath = $autoloaderPath; $this->function = $function; } @@ -53,6 +52,16 @@ class Thread extends \Thread $this->initialized = true; } + /** + * Determines if the thread has successfully been prepared. + * + * @return bool + */ + public function isPrepared() + { + return $this->prepared; + } + /** * Runs the thread code and the initialized function. */ @@ -68,13 +77,10 @@ class Thread extends \Thread // don't do this first, objects we receive from other threads will just // be garbage data and unserializable values (like resources) will be // lost. This happens even with thread-safe objects. - if (file_exists($this->autoloaderPath)) { + if ('' !== $this->autoloaderPath && file_exists($this->autoloaderPath)) { require $this->autoloaderPath; } - // Initialize the thread-local global event loop. - Loop\loop(); - // Register a shutdown handler to deal with errors smoothly. //register_shutdown_function([$this, 'handleShutdown']); @@ -94,73 +100,27 @@ class Thread extends \Thread // At this point, the thread environment has been prepared, and the // parent has finished injecting values into our memory, so begin using // the channel. - $this->channel = new LocalObject(new Channel($this->socket)); + $channel = new Channel($this->socket); - // Now that everything is finally ready, invoke the function, closure, - // or coroutine passed in from the user. + Promise\wait(new Coroutine($this->execute($channel))); + } + + /** + * @param Channel $channel + * + * @return \Generator + */ + private function execute(Channel $channel) + { try { - if ($this->function instanceof \Closure) { - $generator = $this->function->bindTo($this->context)->__invoke(); - } else { - $generator = call_user_func($this->function); - } - - if ($generator instanceof \Generator) { - $coroutine = new Coroutine($generator); - } else { - $returnValue = $generator; - } - - // Send the return value back to the parent thread. - $response = [ - 'ok' => true, - 'value' => $returnValue, - ]; - new Coroutine($this->channel->deref()->send($response)); - - Loop\run(); + $function = $this->function; + $result = new ExitSuccess(yield $function($channel)); } catch (\Exception $exception) { - // If normal execution failed and caused an error, catch it and send - // it to the parent context so the error can bubble up. - $response = [ - 'ok' => false, - 'panic' => [ - 'message' => $exception->getMessage(), - 'code' => $exception->getCode(), - 'trace' => array_map([$this, 'removeTraceArgs'], $exception->getTrace()), - ], - ]; - - new Coroutine($this->channel->deref()->send($response)); - } finally { - $this->channel->deref()->close(); + $result = new ExitFailure($exception); } - // We don't really need to do this, but let's be explicit about freeing - // our resources. - $this->channel->free(); - } + yield $channel->send($result); - public function handleShutdown() - { - if ($error = error_get_last()) { - $panic = [ - 'message' => $error['message'], - 'code' => 0, - 'trace' => array_map([$this, 'removeTraceArgs'], debug_backtrace()), - ]; - - $this->sendMessage(self::MSG_ERROR); - $serialized = serialize($panic); - $length = strlen($serialized); - fwrite($this->socket, pack('S', $length).$serialized); - fclose($this->socket); - } - } - - public function removeTraceArgs($trace) - { - unset($trace['args']); - return $trace; + $channel->close(); } } diff --git a/src/Threading/ThreadContext.php b/src/Threading/ThreadContext.php index 4141ccc..d41da45 100644 --- a/src/Threading/ThreadContext.php +++ b/src/Threading/ThreadContext.php @@ -2,8 +2,9 @@ namespace Icicle\Concurrent\Threading; use Icicle\Concurrent\ContextInterface; -use Icicle\Concurrent\Exception\PanicError; +use Icicle\Concurrent\Exception\SynchronizationError; use Icicle\Concurrent\Sync\Channel; +use Icicle\Concurrent\Sync\ExitInterface; use Icicle\Promise; /** @@ -16,45 +17,23 @@ use Icicle\Promise; class ThreadContext implements ContextInterface { /** - * @var Thread A thread instance. + * @var \Icicle\Concurrent\Threading\Thread A thread instance. */ - public $thread; + private $thread; /** - * @var Channel A channel for communicating with the thread. + * @var \Icicle\Concurrent\Sync\Channel A channel for communicating with the thread. */ private $channel; - /** - * @var bool Indicates if this context instance belongs to the thread. - */ - private $isThread = true; - - /** - * {@inheritdoc} - */ - public static function create(callable $function) - { - $thread = new Thread($function); - $thread->autoloaderPath = static::getComposerAutoloader(); - - $context = new static($thread); - $context->isThread = false; - $context->deferredJoin = new Promise\Deferred(function () use ($context) { - $context->kill(); - }); - - return $context; - } - /** * Creates a new thread context from a thread. * - * @param Thread $thread The thread object. + * @param callable $function */ - public function __construct(Thread $thread) + public function __construct(callable $function) { - $this->thread = $thread; + $this->thread = new Thread($function, $this->getComposerAutoloader()); } /** @@ -70,20 +49,20 @@ class ThreadContext implements ContextInterface */ public function start() { - $channels = Channel::create(); - $this->channel = new Channel($channels[1]); + list($threadSocket, $parentSocket) = Channel::createSocketPair(); + $this->channel = new Channel($parentSocket); // Start the thread first. The thread will prepare the autoloader and // the event loop, and then notify us when the thread environment is // ready. If we don't do this first, objects will break when passed // to the thread, since the classes are not yet defined. - $this->thread->start(PTHREADS_INHERIT_INI | PTHREADS_ALLOW_GLOBALS); + $this->thread->start(PTHREADS_INHERIT_INI); // The thread must prepare itself first, so wait until the thread has // done so. We need to unlock ourselves while waiting to prevent // deadlocks if we somehow acquired the lock before the thread did. $this->thread->synchronized(function () { - if (!$this->thread->prepared) { + if (!$this->thread->isPrepared()) { $this->thread->wait(); } }); @@ -91,8 +70,8 @@ class ThreadContext implements ContextInterface // At this stage, the thread environment has been prepared, and we kept // the lock from above, so initialize the thread with the necessary // values to be copied over. - $this->thread->synchronized(function () use ($channels) { - $this->thread->init($channels[0]); + $this->thread->synchronized(function () use ($threadSocket) { + $this->thread->init($threadSocket); $this->thread->notify(); }); } @@ -105,35 +84,23 @@ class ThreadContext implements ContextInterface $this->thread->kill(); } - /** - * {@inheritdoc} - */ - public function panic($message = '', $code = 0) - { - if ($this->isThread) { - throw new PanicError($message, $code); - } else { - $this->kill(); - } - } - /** * {@inheritdoc} */ public function join() { - // Get an array of completion data from the thread when it finishes. - $response = (yield $this->channel->receive()); + try { + $response = (yield $this->channel->receive()); - // If the status is not OK, bubble the problem up. - if (!$response['ok']) { - throw new PanicError($response['panic']['message'], $response['panic']['code'], $response['panic']['trace']); + if (!$response instanceof ExitInterface) { + throw new SynchronizationError('Did not receive an exit status from thread.'); + } + + yield $response->getResult(); + } finally { + $this->channel->close(); + $this->thread->join(); } - - $this->channel->close(); - $this->thread->join(); - - yield $response['value']; } /** @@ -168,32 +135,43 @@ class ThreadContext implements ContextInterface return $returnValue; } + /** + * {@inheritdoc} + */ + public function receive() + { + $data = (yield $this->channel->receive()); + + if ($data instanceof ExitInterface) { + throw new SynchronizationError(sprintf('Thread exited with result of type: %s', $data->getResult())); + } + + yield $data; + } + + /** + * {@inheritdoc} + */ + public function send($data) + { + return $this->channel->send($data); + } + /** * Gets the full path to the Composer autoloader. * * If no Composer autoloader is being used, `null` is returned. * - * @return \Composer\Autoload\ClassLoader|null + * @return string */ - private static function getComposerAutoloader() + private function getComposerAutoloader() { foreach (get_included_files() as $path) { - if (strpos($path, 'vendor/autoload.php') !== false) { - $source = file_get_contents($path); - if (strpos($source, '@generated by Composer') !== false) { - return $path; - } + if (preg_match('/vendor\/autoload.php$/i', $path)) { + return $path; } } - // Find the Composer autoloader initializer class, and use it to fetch - // the autoloader instance. - /*foreach (get_declared_classes() as $name) { - if (strpos($name, 'ComposerAutoloaderInit') === 0) { - return $name::getLoader(); - } - }*/ - - return; + return ''; } }