diff --git a/examples/fork.php b/examples/fork.php index 9af2108..89cf923 100644 --- a/examples/fork.php +++ b/examples/fork.php @@ -1,30 +1,40 @@ synchronized(function () { + $this->data = 'progress'; + }); + + sleep(2); } } $context = new Test(); -$context->start()->then(function () { - print "Context finished!\n"; - Icicle\Loop\stop(); +$context->data = 'blank'; +$context->start(); +$context->join()->then(function () { + print "Context done!\n"; + Loop\stop(); }); -print "Context started.\n"; - -Icicle\Loop\periodic(1, function () { +Loop\periodic(1, function () use ($context) { static $i; $i = $i + 1 ?: 1; print "Demonstrating how alive the parent is for the {$i}th time.\n"; + + $context->synchronized(function ($context) { + printf("Context data: '%s'\n", $context->data); + }); }); -Icicle\Loop\run(); +Loop\run(); diff --git a/src/Forking/ForkContext.php b/src/Forking/ForkContext.php index de5f41e..fbb1574 100644 --- a/src/Forking/ForkContext.php +++ b/src/Forking/ForkContext.php @@ -1,23 +1,54 @@ deferred = new Deferred(function (\Exception $exception) { + $this->stop(); + }); + $this->semaphore = new Semaphore(); + } + + /** + * Gets the forked process's process ID. + * + * @return int The process ID. + */ public function getPid() { return $this->pid; } + /** + * {@inheritdoc} + */ public function isRunning() { if (!$this->isChild) { @@ -27,17 +58,11 @@ abstract class ForkContext implements Context return true; } - public function join() - { - pcntl_waitpid($this->pid, $status); - } - + /** + * {@inheritdoc} + */ public function start() { - $deferred = new Deferred(function (\Exception $exception) { - $this->stop(); - }); - if (($fd = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) { throw new \Exception(); } @@ -51,19 +76,42 @@ abstract class ForkContext implements Context Loop\reInit(); - // We are the parent, so create a server socket. if ($pid !== 0) { + // We are the parent, so close the child socket. $this->pid = $pid; - $this->parentSocket->read(0, "\n")->then(function ($data) use ($deferred) { - print "Got data from worker: $data\n"; - $deferred->resolve(); - }, function (\Exception $exception) use ($deferred) { - $deferred->reject($exception); + fclose($this->childSocket); + + // Wait for the child process to send us a byte over the socket pair + // to discover immediately when the process has completed. + $this->parentSocket->read(1)->then(function ($data) { + $message = ord($data); + if ($message === self::MSG_DONE) { + $this->deferred->resolve(); + return; + } + + // Get the fatal exception from the process. + return $this->parentSocket->read(2)->then(function ($data) { + list($serializedLength) = unpack('S', $data); + return $this->parentSocket->read($serializedLength); + })->then(function ($data) { + $previous = unserialize($data); + $exception = new ContextAbortException('The context encountered an error.', 0, $previous); + $this->deferred->reject($exception); + $this->parentSocket->close(); + }); + }, function (\Exception $exception) { + $this->deferred->reject($exception); }); - return $deferred->getPromise(); + return; } + // We are the child, so close the parent socket and initialize child values. + $this->isChild = true; + $this->pid = getmypid(); + $this->parentSocket->close(); + // We will have a cloned event loop from the parent after forking. The // child context by default is synchronous and uses the parent event // loop, so we need to stop the clone before doing any work in case it @@ -71,21 +119,19 @@ abstract class ForkContext implements Context Loop\clear(); Loop\stop(); - $this->pid = getmypid(); - + // Execute the context runnable and send the parent context the result. try { - // We are the child, so begin working. $this->run(); - - // Let the parent context now that we are done by sending some data. - fwrite($this->childSocket, 'done'); - } catch (\Throwable $e) { - fwrite($this->childSocket, 'error'); + fwrite($this->childSocket, chr(self::MSG_DONE)); + } catch (\Exception $exception) { + fwrite($this->childSocket, chr(self::MSG_ERROR)); + $serialized = serialize($exception); + $length = strlen($serialized); + fwrite($this->childSocket, pack('S', $length).$serialized); + } finally { + fclose($this->childSocket); + exit(0); } - - fwrite($this->childSocket, 'done'); - fclose($this->childSocket); - exit(0); } public function stop() @@ -104,17 +150,46 @@ abstract class ForkContext implements Context } } + /** + * {@inheritdoc} + */ + public function join() + { + if ($this->isChild) { + throw new \Exception(); + } + + return $this->deferred->getPromise(); + } + + /** + * {@inheritdoc} + */ public function lock() { + $this->semaphore->lock(); } + /** + * {@inheritdoc} + */ public function unlock() { + $this->semaphore->unlock(); } - public function synchronize(callable $callback) + /** + * {@inheritdoc} + */ + public function synchronized(callable $callback) { + $this->lock(); + $callback($this); + $this->unlock(); } + /** + * {@inheritdoc} + */ abstract public function run(); } diff --git a/src/Forking/Synchronizable.php b/src/Forking/Synchronizable.php new file mode 100644 index 0000000..1c47d87 --- /dev/null +++ b/src/Forking/Synchronizable.php @@ -0,0 +1,57 @@ +memoryKey = abs(crc32(spl_object_hash($this))); + $this->memoryBlock = shm_attach($this->memoryKey, 8192); + if (!is_resource($this->memoryBlock)) { + throw new \Exception(); + } + } + + public function __isset($name) + { + $key = abs(crc32($name)); + return shm_has_var($this->memoryBlock, $key); + } + + public function __get($name) + { + $key = abs(crc32($name)); + if (shm_has_var($this->memoryBlock, $key)) { + $serialized = shm_get_var($this->memoryBlock, $key); + return unserialize($serialized); + } + } + + public function __set($name, $value) + { + $key = abs(crc32($name)); + if (!shm_put_var($this->memoryBlock, $key, serialize($value))) { + throw new \Exception(); + } + } + + public function __unset($name) + { + $key = abs(crc32($name)); + if (!shm_remove_var($this->memoryBlock, $key)) { + throw new \Exception(); + } + } + + public function __destruct() + { + if ($this->memoryBlock) { + if (!shm_remove($this->memoryBlock)) { + throw new \Exception(); + } + } + } +}