From f48066eb1be194e1902e4909826d8f3d4a8291e7 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 10 Nov 2017 09:58:42 -0600 Subject: [PATCH] Drop Fork along with Process and Strand interfaces Forking is just too dangerous for virtually no gain over Process. Context now extends Sync\Channel. --- lib/Context.php | 2 +- lib/Forking/Fork.php | 354 ------------------------------ lib/Process.php | 17 -- lib/Process/ChannelledProcess.php | 19 +- lib/Strand.php | 6 - lib/Worker/AbstractWorker.php | 10 +- test/Forking/ForkTest.php | 29 --- test/Sync/PosixSemaphoreTest.php | 39 ---- 8 files changed, 8 insertions(+), 468 deletions(-) delete mode 100644 lib/Forking/Fork.php delete mode 100644 lib/Process.php delete mode 100644 lib/Strand.php delete mode 100644 test/Forking/ForkTest.php diff --git a/lib/Context.php b/lib/Context.php index 0b40d7b..3e2209c 100644 --- a/lib/Context.php +++ b/lib/Context.php @@ -4,7 +4,7 @@ namespace Amp\Parallel; use Amp\Promise; -interface Context { +interface Context extends Sync\Channel { /** * @return bool */ diff --git a/lib/Forking/Fork.php b/lib/Forking/Fork.php deleted file mode 100644 index 3da7d95..0000000 --- a/lib/Forking/Fork.php +++ /dev/null @@ -1,354 +0,0 @@ -start(); - return $fork; - } - - public function __construct(callable $function, ...$args) { - if (!self::supported()) { - throw new \Error("The pcntl extension is required to create forks."); - } - - $this->function = $function; - $this->args = $args; - } - - public function __clone() { - $this->pid = 0; - $this->oid = 0; - $this->channel = null; - } - - public function __destruct() { - if (0 !== $this->pid && \posix_getpid() === $this->oid) { // Only kill in owner process. - $this->kill(); // Will only terminate if the process is still running. - } - } - - /** - * Checks if the context is running. - * - * @return bool True if the context is running, otherwise false. - */ - public function isRunning(): bool { - return 0 !== $this->pid && false !== \posix_getpgid($this->pid); - } - - /** - * Gets the forked process's process ID. - * - * @return int The process ID. - */ - public function getPid(): int { - return $this->pid; - } - - /** - * Gets the fork's scheduling priority as a percentage. - * - * The priority is a float between 0 and 1 that indicates the relative priority for the forked process, where 0 is - * very low priority, 1 is very high priority, and 0.5 is considered a "normal" priority. The value is based on the - * forked process's "nice" value. The priority affects the operating system's scheduling of processes. How much the - * priority actually affects the amount of CPU time the process gets is ultimately system-specific. - * - * @return float A priority value between 0 and 1. - * - * @throws ContextException If the operation failed. - * - * @see Fork::setPriority() - * @see http://linux.die.net/man/2/getpriority - */ - public function getPriority(): float { - if (($nice = \pcntl_getpriority($this->pid)) === false) { - throw new ContextException('Failed to get the fork\'s priority.'); - } - - return (19 - $nice) / 39; - } - - /** - * Sets the fork's scheduling priority as a percentage. - * - * Note that on many systems, only the superuser can increase the priority of a process. - * - * @param float $priority A priority value between 0 and 1. - * - * @throws \Error If the given priority is an invalid value. - * @throws ContextException If the operation failed. - * - * @see Fork::getPriority() - */ - public function setPriority(float $priority) { - if ($priority < 0 || $priority > 1) { - throw new \Error('Priority value must be between 0.0 and 1.0.'); - } - - $nice = (int) \round(19 - ($priority * 39)); - - if (!\pcntl_setpriority($nice, $this->pid, \PRIO_PROCESS)) { - throw new ContextException('Failed to set the fork\'s priority.'); - } - } - - /** - * Starts the context execution. - * - * @throws \Amp\Parallel\ContextException If forking fails. - */ - public function start() { - if (0 !== $this->oid) { - throw new StatusError('The context has already been started.'); - } - - $sockets = @\stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); - - if ($sockets === false) { - $message = "Failed to create socket pair"; - if ($error = \error_get_last()) { - $message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]); - } - throw new ContextException($message); - } - - list($parent, $child) = $sockets; - - switch ($pid = \pcntl_fork()) { - case -1: // Failure - throw new ContextException('Could not fork process!'); - - case 0: // Child - // @codeCoverageIgnoreStart - \fclose($child); - - Loop::set((new Loop\DriverFactory)->create()); // Replace loop instance inherited from parent. - Loop::run(function () use ($parent) { - return $this->execute(new ChannelledSocket($parent, $parent)); - }); - - exit(0); - // @codeCoverageIgnoreEnd - default: // Parent - $this->pid = $pid; - $this->oid = \posix_getpid(); - $this->channel = new ChannelledSocket($child, $child); - \fclose($parent); - } - } - - /** - * @coroutine - * - * This method is run only on the child. - * - * @param \Amp\Parallel\Sync\Channel $channel - * - * @return \Generator - * - * @codeCoverageIgnore Only executed in the child. - */ - private function execute(Channel $channel): \Generator { - try { - if ($this->function instanceof \Closure) { - $result = call($this->function->bindTo($channel, null), ...$this->args); - } else { - $result = call($this->function, ...$this->args); - } - - $result = new ExitSuccess(yield $result); - } catch (\Throwable $exception) { - $result = new ExitFailure($exception); - } - - // Attempt to return the result. - try { - try { - yield $channel->send($result); - } catch (SerializationException $exception) { - // Serializing the result failed. Send the reason why. - yield $channel->send(new ExitFailure($exception)); - } - } catch (ChannelException $exception) { - // The result was not sendable! The parent context must have died or killed the context. - } - } - - /** - * {@inheritdoc} - */ - public function kill() { - if ($this->isRunning()) { - // Forcefully kill the process using SIGKILL. - \posix_kill($this->pid, \SIGKILL); - } - - if ($this->channel !== null) { - $this->channel->close(); - } - - // "Detach" from the process and let it die asynchronously. - $this->pid = 0; - $this->channel = null; - } - - /** - * @param int $signo - * - * @throws \Amp\Parallel\StatusError - */ - public function signal(int $signo) { - if (0 === $this->pid) { - throw new StatusError('The fork has not been started or has already finished.'); - } - - \posix_kill($this->pid, $signo); - } - - /** - * Gets a promise that resolves when the context ends and joins with the - * parent context. - * - * @return \Amp\Promise - * - * @throws \Amp\Parallel\StatusError Thrown if the context has not been started. - * @throws \Amp\Parallel\SynchronizationError Thrown if an exit status object is not received. - */ - public function join(): Promise { - if (null === $this->channel) { - throw new StatusError('The fork has not been started or has already finished.'); - } - - return new Coroutine($this->doJoin()); - } - - /** - * @coroutine - * - * @return \Generator - * - * @throws \Amp\Parallel\SynchronizationError - */ - private function doJoin(): \Generator { - try { - $response = yield $this->channel->receive(); - - if (!$response instanceof ExitResult) { - throw new SynchronizationError(\sprintf( - 'Did not receive an exit result from fork. Instead received data of type %s', - \is_object($response) ? \get_class($response) : \gettype($response) - )); - } - } catch (ChannelException $e) { - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); - } finally { - $this->kill(); - } - - return $response->getResult(); - } - - /** - * {@inheritdoc} - */ - public function receive(): Promise { - if (null === $this->channel) { - throw new StatusError('The process has not been started.'); - } - - return new Coroutine($this->doReceive()); - } - - private function doReceive() { - try { - $data = yield $this->channel->receive(); - } catch (ChannelException $e) { - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); - } - - if ($data instanceof ExitResult) { - $data = $data->getResult(); - throw new SynchronizationError(\sprintf( - 'Forked process unexpectedly exited with result of type: %s', - \is_object($data) ? \get_class($data) : \gettype($data) - )); - } - - return $data; - } - - /** - * {@inheritdoc} - */ - public function send($data): Promise { - if (null === $this->channel) { - throw new StatusError('The fork has not been started or has already finished.'); - } - - if ($data instanceof ExitResult) { - throw new \Error('Cannot send exit result objects.'); - } - - return call(function () use ($data) { - try { - yield $this->channel->send($data); - } catch (ChannelException $e) { - throw new ContextException("The context went away, potentially due to a fatal error or calling exit", 0, $e); - } - }); - } -} diff --git a/lib/Process.php b/lib/Process.php deleted file mode 100644 index f488656..0000000 --- a/lib/Process.php +++ /dev/null @@ -1,17 +0,0 @@ -process->kill(); } - - /** - * {@inheritdoc} - */ - public function getPid(): int { - return $this->process->getPid(); - } - - /** - * {@inheritdoc} - */ - public function signal(int $signo) { - $this->process->signal($signo); - } } diff --git a/lib/Strand.php b/lib/Strand.php deleted file mode 100644 index efd6b6b..0000000 --- a/lib/Strand.php +++ /dev/null @@ -1,6 +0,0 @@ -context = $strand; + public function __construct(Context $context) { + $this->context = $context; $this->onResolve = function ($exception, $data) { if ($exception) { diff --git a/test/Forking/ForkTest.php b/test/Forking/ForkTest.php deleted file mode 100644 index 4e72035..0000000 --- a/test/Forking/ForkTest.php +++ /dev/null @@ -1,29 +0,0 @@ -assertTrue($fork->isRunning()); - - return yield $fork->join(); - }); - } -} diff --git a/test/Sync/PosixSemaphoreTest.php b/test/Sync/PosixSemaphoreTest.php index fcf05fd..041292a 100644 --- a/test/Sync/PosixSemaphoreTest.php +++ b/test/Sync/PosixSemaphoreTest.php @@ -52,43 +52,4 @@ class PosixSemaphoreTest extends AbstractSemaphoreTest { $this->assertTrue($this->semaphore->isFreed()); } - - /** - * @requires extension pcntl - */ - public function testAcquireInMultipleForks() { - Loop::run(function () { - $this->semaphore = $this->createSemaphore(1); - - $fork1 = new Fork(function (Semaphore $semaphore) { - $lock = yield $semaphore->acquire(); - - usleep(100000); - - $lock->release(); - - return 0; - }, $this->semaphore); - - $fork2 = new Fork(function (Semaphore $semaphore) { - $lock = yield $semaphore->acquire(); - - usleep(100000); - - $lock->release(); - - return 1; - }, $this->semaphore); - - $start = microtime(true); - - $fork1->start(); - $fork2->start(); - - yield $fork1->join(); - yield $fork2->join(); - - $this->assertGreaterThan(0.1, microtime(true) - $start); - }); - } }