1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Implement thread killing

Will not interrupt blocking calls such as sleep, but in practice this shouldn't be much of a problem.
This commit is contained in:
Aaron Piotrowski 2019-02-14 18:12:51 -06:00
parent 23669b9572
commit 638a1bb638
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 120 additions and 55 deletions

View File

@ -13,15 +13,11 @@ use function Amp\call;
final class ParallelRunner final class ParallelRunner
{ {
const EXIT_CHECK_FREQUENCY = 250; const KILL = 9;
const TERMINATE = 15;
public static function execute(Channel $channel, string $path, string $arguments): int public static function unserializeArguments(string $arguments): array
{ {
Loop::unreference(Loop::repeat(self::EXIT_CHECK_FREQUENCY, function () {
// Empty function. This timer exists to provide a breakpoint for the thread to be killed.
}));
try {
\set_error_handler(function ($errno, $errstr, $errfile, $errline) { \set_error_handler(function ($errno, $errstr, $errfile, $errline) {
if ($errno & \error_reporting()) { if ($errno & \error_reporting()) {
throw new ChannelException(\sprintf( throw new ChannelException(\sprintf(
@ -43,10 +39,36 @@ final class ParallelRunner
\restore_error_handler(); \restore_error_handler();
} }
if (!\is_array($arguments)) { // This *should not* be able to happen. if (!\is_array($arguments)) {
throw new \Error("Arguments did not unserialize to an array"); throw new SerializationException("Argument list did not unserialize to an array");
} }
return \array_values($arguments);
}
public static function handleSignals(Channel $channel): Promise
{
return call(function () use ($channel) {
try {
$signal = yield $channel->receive();
switch ($signal) {
case self::TERMINATE:
Loop::stop();
return;
case self::KILL:
exit(1);
}
} catch (ChannelException $exception) {
// Channel closed unexpectedly, ignore.
}
});
}
public static function execute(Channel $channel, string $path, string $arguments): int
{
try {
if (!\is_file($path)) { if (!\is_file($path)) {
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path)); throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
} }
@ -62,6 +84,8 @@ final class ParallelRunner
throw new \Error(\sprintf("Script '%s' contains a parse error", $path), 0, $exception); throw new \Error(\sprintf("Script '%s' contains a parse error", $path), 0, $exception);
} }
$arguments = self::unserializeArguments($arguments);
$result = new ExitSuccess(Promise\wait(call($callable, $channel, ...$arguments))); $result = new ExitSuccess(Promise\wait(call($callable, $channel, ...$arguments)));
} catch (\Throwable $exception) { } catch (\Throwable $exception) {
$result = new ExitFailure($exception); $result = new ExitFailure($exception);

View File

@ -10,7 +10,6 @@ use Amp\Parallel\Sync\ExitResult;
use Amp\Parallel\Sync\SerializationException; use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Sync\SynchronizationError; use Amp\Parallel\Sync\SynchronizationError;
use Amp\Promise; use Amp\Promise;
use parallel\Exception as ParallelException;
use parallel\Runtime; use parallel\Runtime;
use function Amp\call; use function Amp\call;
@ -35,7 +34,10 @@ final class Parallel implements Context
private $runtime; private $runtime;
/** @var ChannelledSocket|null A channel for communicating with the parallel thread. */ /** @var ChannelledSocket|null A channel for communicating with the parallel thread. */
private $channel; private $communicationChannel;
/** @var ChannelledSocket|null */
private $signalChannel;
/** @var string Script path. */ /** @var string Script path. */
private $script; private $script;
@ -46,6 +48,9 @@ final class Parallel implements Context
/** @var int */ /** @var int */
private $oid = 0; private $oid = 0;
/** @var bool */
private $killed = false;
/** @var \parallel\Future|null */ /** @var \parallel\Future|null */
private $future; private $future;
@ -127,8 +132,11 @@ final class Parallel implements Context
public function __clone() public function __clone()
{ {
$this->runtime = null; $this->runtime = null;
$this->channel = null; $this->future = null;
$this->communicationChannel = null;
$this->signalChannel = null;
$this->oid = 0; $this->oid = 0;
$this->killed = false;
} }
/** /**
@ -150,7 +158,7 @@ final class Parallel implements Context
*/ */
public function isRunning(): bool public function isRunning(): bool
{ {
return $this->channel !== null; return $this->communicationChannel !== null && $this->signalChannel !== null;
} }
/** /**
@ -177,9 +185,13 @@ final class Parallel implements Context
$this->runtime = new Runtime(self::$autoloadPath); $this->runtime = new Runtime(self::$autoloadPath);
$id = \random_int(0, \PHP_INT_MAX); $cid = \random_int(\PHP_INT_MIN, \PHP_INT_MAX);
$this->future = $this->runtime->run(static function (string $uri, string $key, string $path, string $arguments): int { do {
$sid = \random_int(\PHP_INT_MIN, \PHP_INT_MAX);
} while ($sid === $cid);
$this->future = $this->runtime->run(static function (string $uri, string $channelKey, string $signalKey, string $path, string $arguments): int {
\define("AMP_CONTEXT", "parallel"); \define("AMP_CONTEXT", "parallel");
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) { if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
@ -187,30 +199,53 @@ final class Parallel implements Context
return 1; return 1;
} }
$channel = new ChannelledSocket($socket, $socket); $communicationChannel = new ChannelledSocket($socket, $socket);
try { try {
Promise\wait($channel->send($key)); Promise\wait($communicationChannel->send($channelKey));
} catch (\Throwable $exception) { } catch (\Throwable $exception) {
\trigger_error("Could not send key to parent", E_USER_ERROR); \trigger_error("Could not send key to parent", E_USER_ERROR);
return 1; return 1;
} }
return Internal\ParallelRunner::execute($channel, $path, $arguments); if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
\trigger_error("Could not connect to IPC socket", E_USER_ERROR);
return 1;
}
$signalChannel = new ChannelledSocket($socket, $socket);
$signalChannel->unreference();
try {
Promise\wait($signalChannel->send($signalKey));
} catch (\Throwable $exception) {
\trigger_error("Could not send key to parent", E_USER_ERROR);
return 1;
}
Promise\rethrow(Internal\ParallelRunner::handleSignals($signalChannel));
return Internal\ParallelRunner::execute($communicationChannel, $path, $arguments);
}, [ }, [
$this->hub->getUri(), $this->hub->getUri(),
$this->hub->generateKey($id, self::KEY_LENGTH), $this->hub->generateKey($cid, self::KEY_LENGTH),
$this->hub->generateKey($sid, self::KEY_LENGTH),
$this->script, $this->script,
$arguments $arguments
]); ]);
return call(function () use ($id) { return call(function () use ($cid, $sid) {
try { try {
$this->channel = yield $this->hub->accept($id); $this->communicationChannel = yield $this->hub->accept($cid);
$this->signalChannel = yield $this->hub->accept($sid);
} catch (\Throwable $exception) { } catch (\Throwable $exception) {
$this->kill(); $this->kill();
throw new ContextException("Starting the parallel runtime failed", 0, $exception); throw new ContextException("Starting the parallel runtime failed", 0, $exception);
} }
if ($this->killed) {
$this->kill();
}
}); });
} }
@ -219,11 +254,12 @@ final class Parallel implements Context
*/ */
public function kill() public function kill()
{ {
if ($this->runtime !== null) { $this->killed = true;
if ($this->signalChannel !== null) {
try { try {
$this->runtime->kill(); $this->signalChannel->send(Internal\ParallelRunner::KILL);
} catch (ParallelException $exception) { $this->signalChannel->close();
// Ignore runtime being unusable since we're killing it anyway.
} finally { } finally {
$this->close(); $this->close();
} }
@ -235,11 +271,16 @@ final class Parallel implements Context
*/ */
private function close() private function close()
{ {
if ($this->channel !== null) { if ($this->communicationChannel !== null) {
$this->channel->close(); $this->communicationChannel->close();
} }
$this->channel = null; if ($this->signalChannel !== null) {
$this->signalChannel->close();
}
$this->communicationChannel = null;
$this->signalChannel = null;
} }
/** /**
@ -254,13 +295,13 @@ final class Parallel implements Context
*/ */
public function join(): Promise public function join(): Promise
{ {
if ($this->channel == null || $this->runtime === null || $this->future === null) { if ($this->communicationChannel === null) {
throw new StatusError('The thread has not been started or has already finished.'); throw new StatusError('The thread has not been started or has already finished.');
} }
return call(function () { return call(function () {
try { try {
$response = yield $this->channel->receive(); $response = yield $this->communicationChannel->receive();
if (!$response instanceof ExitResult) { if (!$response instanceof ExitResult) {
throw new SynchronizationError('Did not receive an exit result from thread.'); throw new SynchronizationError('Did not receive an exit result from thread.');
@ -288,12 +329,12 @@ final class Parallel implements Context
*/ */
public function receive(): Promise public function receive(): Promise
{ {
if ($this->channel === null) { if ($this->communicationChannel === null) {
throw new StatusError('The process has not been started.'); throw new StatusError('The process has not been started.');
} }
return call(function () { return call(function () {
$data = yield $this->channel->receive(); $data = yield $this->communicationChannel->receive();
if ($data instanceof ExitResult) { if ($data instanceof ExitResult) {
$data = $data->getResult(); $data = $data->getResult();
@ -312,7 +353,7 @@ final class Parallel implements Context
*/ */
public function send($data): Promise public function send($data): Promise
{ {
if ($this->channel === null) { if ($this->communicationChannel === null) {
throw new StatusError('The thread has not been started or has already finished.'); throw new StatusError('The thread has not been started or has already finished.');
} }
@ -320,6 +361,6 @@ final class Parallel implements Context
throw new \Error('Cannot send exit result objects.'); throw new \Error('Cannot send exit result objects.');
} }
return $this->channel->send($data); return $this->communicationChannel->send($data);
} }
} }