1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-27 04:44:56 +01:00

Drop Fork along with Process and Strand interfaces

Forking is just too dangerous for virtually no gain over Process. Context now extends Sync\Channel.
This commit is contained in:
Aaron Piotrowski 2017-11-10 09:58:42 -06:00
parent 0ef74413af
commit f48066eb1b
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
8 changed files with 8 additions and 468 deletions

View File

@ -4,7 +4,7 @@ namespace Amp\Parallel;
use Amp\Promise; use Amp\Promise;
interface Context { interface Context extends Sync\Channel {
/** /**
* @return bool * @return bool
*/ */

View File

@ -1,354 +0,0 @@
<?php
namespace Amp\Parallel\Forking;
use Amp\Coroutine;
use Amp\Loop;
use Amp\Parallel\ContextException;
use Amp\Parallel\Process;
use Amp\Parallel\StatusError;
use Amp\Parallel\Strand;
use Amp\Parallel\Sync\Channel;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitFailure;
use Amp\Parallel\Sync\ExitResult;
use Amp\Parallel\Sync\ExitSuccess;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\SynchronizationError;
use Amp\Promise;
use function Amp\call;
/**
* Implements a UNIX-compatible context using forked processes.
*/
class Fork implements Process, Strand {
/** @var \Amp\Parallel\Sync\ChannelledSocket A channel for communicating with the child. */
private $channel;
/** @var int */
private $pid = 0;
/** @var callable */
private $function;
/** @var mixed[] */
private $args;
/** @var int */
private $oid = 0;
/**
* Checks if forking is enabled.
*
* @return bool True if forking is enabled, otherwise false.
*/
public static function supported(): bool {
return \extension_loaded('pcntl');
}
/**
* Spawns a new forked process and runs it.
*
* @param callable $function A callable to invoke in the process.
*
* @return \Amp\Parallel\Forking\Fork The process object that was spawned.
*/
public static function spawn(callable $function, ...$args): self {
$fork = new self($function, ...$args);
$fork->start();
return $fork;
}
public function __construct(callable $function, ...$args) {
if (!self::supported()) {
throw new \Error("The pcntl extension is required to create forks.");
}
$this->function = $function;
$this->args = $args;
}
public function __clone() {
$this->pid = 0;
$this->oid = 0;
$this->channel = null;
}
public function __destruct() {
if (0 !== $this->pid && \posix_getpid() === $this->oid) { // Only kill in owner process.
$this->kill(); // Will only terminate if the process is still running.
}
}
/**
* Checks if the context is running.
*
* @return bool True if the context is running, otherwise false.
*/
public function isRunning(): bool {
return 0 !== $this->pid && false !== \posix_getpgid($this->pid);
}
/**
* Gets the forked process's process ID.
*
* @return int The process ID.
*/
public function getPid(): int {
return $this->pid;
}
/**
* Gets the fork's scheduling priority as a percentage.
*
* The priority is a float between 0 and 1 that indicates the relative priority for the forked process, where 0 is
* very low priority, 1 is very high priority, and 0.5 is considered a "normal" priority. The value is based on the
* forked process's "nice" value. The priority affects the operating system's scheduling of processes. How much the
* priority actually affects the amount of CPU time the process gets is ultimately system-specific.
*
* @return float A priority value between 0 and 1.
*
* @throws ContextException If the operation failed.
*
* @see Fork::setPriority()
* @see http://linux.die.net/man/2/getpriority
*/
public function getPriority(): float {
if (($nice = \pcntl_getpriority($this->pid)) === false) {
throw new ContextException('Failed to get the fork\'s priority.');
}
return (19 - $nice) / 39;
}
/**
* Sets the fork's scheduling priority as a percentage.
*
* Note that on many systems, only the superuser can increase the priority of a process.
*
* @param float $priority A priority value between 0 and 1.
*
* @throws \Error If the given priority is an invalid value.
* @throws ContextException If the operation failed.
*
* @see Fork::getPriority()
*/
public function setPriority(float $priority) {
if ($priority < 0 || $priority > 1) {
throw new \Error('Priority value must be between 0.0 and 1.0.');
}
$nice = (int) \round(19 - ($priority * 39));
if (!\pcntl_setpriority($nice, $this->pid, \PRIO_PROCESS)) {
throw new ContextException('Failed to set the fork\'s priority.');
}
}
/**
* Starts the context execution.
*
* @throws \Amp\Parallel\ContextException If forking fails.
*/
public function start() {
if (0 !== $this->oid) {
throw new StatusError('The context has already been started.');
}
$sockets = @\stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($sockets === false) {
$message = "Failed to create socket pair";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
throw new ContextException($message);
}
list($parent, $child) = $sockets;
switch ($pid = \pcntl_fork()) {
case -1: // Failure
throw new ContextException('Could not fork process!');
case 0: // Child
// @codeCoverageIgnoreStart
\fclose($child);
Loop::set((new Loop\DriverFactory)->create()); // Replace loop instance inherited from parent.
Loop::run(function () use ($parent) {
return $this->execute(new ChannelledSocket($parent, $parent));
});
exit(0);
// @codeCoverageIgnoreEnd
default: // Parent
$this->pid = $pid;
$this->oid = \posix_getpid();
$this->channel = new ChannelledSocket($child, $child);
\fclose($parent);
}
}
/**
* @coroutine
*
* This method is run only on the child.
*
* @param \Amp\Parallel\Sync\Channel $channel
*
* @return \Generator
*
* @codeCoverageIgnore Only executed in the child.
*/
private function execute(Channel $channel): \Generator {
try {
if ($this->function instanceof \Closure) {
$result = call($this->function->bindTo($channel, null), ...$this->args);
} else {
$result = call($this->function, ...$this->args);
}
$result = new ExitSuccess(yield $result);
} catch (\Throwable $exception) {
$result = new ExitFailure($exception);
}
// Attempt to return the result.
try {
try {
yield $channel->send($result);
} catch (SerializationException $exception) {
// Serializing the result failed. Send the reason why.
yield $channel->send(new ExitFailure($exception));
}
} catch (ChannelException $exception) {
// The result was not sendable! The parent context must have died or killed the context.
}
}
/**
* {@inheritdoc}
*/
public function kill() {
if ($this->isRunning()) {
// Forcefully kill the process using SIGKILL.
\posix_kill($this->pid, \SIGKILL);
}
if ($this->channel !== null) {
$this->channel->close();
}
// "Detach" from the process and let it die asynchronously.
$this->pid = 0;
$this->channel = null;
}
/**
* @param int $signo
*
* @throws \Amp\Parallel\StatusError
*/
public function signal(int $signo) {
if (0 === $this->pid) {
throw new StatusError('The fork has not been started or has already finished.');
}
\posix_kill($this->pid, $signo);
}
/**
* Gets a promise that resolves when the context ends and joins with the
* parent context.
*
* @return \Amp\Promise<int>
*
* @throws \Amp\Parallel\StatusError Thrown if the context has not been started.
* @throws \Amp\Parallel\SynchronizationError Thrown if an exit status object is not received.
*/
public function join(): Promise {
if (null === $this->channel) {
throw new StatusError('The fork has not been started or has already finished.');
}
return new Coroutine($this->doJoin());
}
/**
* @coroutine
*
* @return \Generator
*
* @throws \Amp\Parallel\SynchronizationError
*/
private function doJoin(): \Generator {
try {
$response = yield $this->channel->receive();
if (!$response instanceof ExitResult) {
throw new SynchronizationError(\sprintf(
'Did not receive an exit result from fork. Instead received data of type %s',
\is_object($response) ? \get_class($response) : \gettype($response)
));
}
} catch (ChannelException $e) {
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
} finally {
$this->kill();
}
return $response->getResult();
}
/**
* {@inheritdoc}
*/
public function receive(): Promise {
if (null === $this->channel) {
throw new StatusError('The process has not been started.');
}
return new Coroutine($this->doReceive());
}
private function doReceive() {
try {
$data = yield $this->channel->receive();
} catch (ChannelException $e) {
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
if ($data instanceof ExitResult) {
$data = $data->getResult();
throw new SynchronizationError(\sprintf(
'Forked process unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
));
}
return $data;
}
/**
* {@inheritdoc}
*/
public function send($data): Promise {
if (null === $this->channel) {
throw new StatusError('The fork has not been started or has already finished.');
}
if ($data instanceof ExitResult) {
throw new \Error('Cannot send exit result objects.');
}
return call(function () use ($data) {
try {
yield $this->channel->send($data);
} catch (ChannelException $e) {
throw new ContextException("The context went away, potentially due to a fatal error or calling exit", 0, $e);
}
});
}
}

View File

@ -1,17 +0,0 @@
<?php
namespace Amp\Parallel;
interface Process extends Context {
/**
* @return int PID of process.
*/
public function getPid(): int;
/**
* @param int $signo
*
* @throws \Amp\Parallel\StatusError
*/
public function signal(int $signo);
}

View File

@ -4,10 +4,9 @@ namespace Amp\Parallel\Process;
use Amp\ByteStream; use Amp\ByteStream;
use Amp\Coroutine; use Amp\Coroutine;
use Amp\Parallel\Context;
use Amp\Parallel\ContextException; use Amp\Parallel\ContextException;
use Amp\Parallel\Process as ProcessContext;
use Amp\Parallel\StatusError; use Amp\Parallel\StatusError;
use Amp\Parallel\Strand;
use Amp\Parallel\Sync\ChannelException; use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelledStream; use Amp\Parallel\Sync\ChannelledStream;
use Amp\Parallel\Sync\ExitResult; use Amp\Parallel\Sync\ExitResult;
@ -17,7 +16,7 @@ use Amp\Promise;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\call; use function Amp\call;
class ChannelledProcess implements ProcessContext, Strand { class ChannelledProcess implements Context {
/** @var \Amp\Process\Process */ /** @var \Amp\Process\Process */
private $process; private $process;
@ -187,18 +186,4 @@ class ChannelledProcess implements ProcessContext, Strand {
public function kill() { public function kill() {
$this->process->kill(); $this->process->kill();
} }
/**
* {@inheritdoc}
*/
public function getPid(): int {
return $this->process->getPid();
}
/**
* {@inheritdoc}
*/
public function signal(int $signo) {
$this->process->signal($signo);
}
} }

View File

@ -1,6 +0,0 @@
<?php
namespace Amp\Parallel;
interface Strand extends Context, Sync\Channel {
}

View File

@ -4,16 +4,16 @@ namespace Amp\Parallel\Worker;
use Amp\Coroutine; use Amp\Coroutine;
use Amp\Deferred; use Amp\Deferred;
use Amp\Parallel\Context;
use Amp\Parallel\ContextException; use Amp\Parallel\ContextException;
use Amp\Parallel\StatusError; use Amp\Parallel\StatusError;
use Amp\Parallel\Strand;
use Amp\Promise; use Amp\Promise;
/** /**
* Base class for most common types of task workers. * Base class for most common types of task workers.
*/ */
abstract class AbstractWorker implements Worker { abstract class AbstractWorker implements Worker {
/** @var \Amp\Parallel\Strand */ /** @var \Amp\Parallel\Context */
private $context; private $context;
/** @var bool */ /** @var bool */
@ -26,10 +26,10 @@ abstract class AbstractWorker implements Worker {
private $onResolve; private $onResolve;
/** /**
* @param \Amp\Parallel\Strand $strand * @param \Amp\Parallel\Context $context
*/ */
public function __construct(Strand $strand) { public function __construct(Context $context) {
$this->context = $strand; $this->context = $context;
$this->onResolve = function ($exception, $data) { $this->onResolve = function ($exception, $data) {
if ($exception) { if ($exception) {

View File

@ -1,29 +0,0 @@
<?php
namespace Amp\Parallel\Test\Forking;
use Amp\Loop;
use Amp\Parallel\Forking\Fork;
use Amp\Parallel\Test\AbstractContextTest;
/**
* @group forking
* @requires extension pcntl
*/
class ForkTest extends AbstractContextTest {
public function createContext(callable $function) {
return new Fork($function);
}
public function testSpawnStartsFork() {
Loop::run(function () {
$fork = Fork::spawn(function () {
usleep(100);
});
$this->assertTrue($fork->isRunning());
return yield $fork->join();
});
}
}

View File

@ -52,43 +52,4 @@ class PosixSemaphoreTest extends AbstractSemaphoreTest {
$this->assertTrue($this->semaphore->isFreed()); $this->assertTrue($this->semaphore->isFreed());
} }
/**
* @requires extension pcntl
*/
public function testAcquireInMultipleForks() {
Loop::run(function () {
$this->semaphore = $this->createSemaphore(1);
$fork1 = new Fork(function (Semaphore $semaphore) {
$lock = yield $semaphore->acquire();
usleep(100000);
$lock->release();
return 0;
}, $this->semaphore);
$fork2 = new Fork(function (Semaphore $semaphore) {
$lock = yield $semaphore->acquire();
usleep(100000);
$lock->release();
return 1;
}, $this->semaphore);
$start = microtime(true);
$fork1->start();
$fork2->start();
yield $fork1->join();
yield $fork2->join();
$this->assertGreaterThan(0.1, microtime(true) - $start);
});
}
} }