From 4ed05f6aac4eece061ac20a739a7b6834a82e5a5 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 10 Feb 2020 12:11:45 -0600 Subject: [PATCH 1/5] Improve error handling when sending and receiving --- lib/Context/Parallel.php | 34 +++++++++++++++--- lib/Context/Process.php | 27 +++++++++++++-- test/Context/AbstractContextTest.php | 40 ++++++++++++++++++++++ test/Context/Fixtures/throwing-process.php | 8 +++++ 4 files changed, 103 insertions(+), 6 deletions(-) create mode 100644 test/Context/Fixtures/throwing-process.php diff --git a/lib/Context/Parallel.php b/lib/Context/Parallel.php index 4740cb3..447b1d5 100644 --- a/lib/Context/Parallel.php +++ b/lib/Context/Parallel.php @@ -3,6 +3,7 @@ namespace Amp\Parallel\Context; use Amp\Loop; +use Amp\Parallel\Sync\ChannelException; use Amp\Parallel\Sync\ChannelledSocket; use Amp\Parallel\Sync\ExitFailure; use Amp\Parallel\Sync\ExitResult; @@ -10,6 +11,7 @@ use Amp\Parallel\Sync\ExitSuccess; use Amp\Parallel\Sync\SerializationException; use Amp\Parallel\Sync\SynchronizationError; use Amp\Promise; +use Amp\TimeoutException; use parallel\Runtime; use function Amp\call; @@ -341,16 +343,20 @@ final class Parallel implements Context public function receive(): Promise { if ($this->channel === null) { - throw new StatusError('The process has not been started.'); + throw new StatusError('The thread has not been started.'); } return call(function (): \Generator { - $data = yield $this->channel->receive(); + try { + $data = yield $this->channel->receive(); + } catch (ChannelException $e) { + throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } if ($data instanceof ExitResult) { $data = $data->getResult(); throw new SynchronizationError(\sprintf( - 'Thread process unexpectedly exited with result of type: %s', + 'Thread unexpectedly exited with result of type: %s', \is_object($data) ? \get_class($data) : \gettype($data) )); } @@ -372,7 +378,27 @@ final class Parallel implements Context throw new \Error('Cannot send exit result objects.'); } - return $this->channel->send($data); + return call(function () use ($data): \Generator { + try { + return yield $this->channel->send($data); + } catch (ChannelException $e) { + if ($this->channel === null) { + throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + try { + $data = yield Promise\timeout($this->join(), 100); + } catch (ContextException | ChannelException | TimeoutException $ex) { + $this->kill(); + throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + throw new SynchronizationError(\sprintf( + 'Thread unexpectedly exited with result of type: %s', + \is_object($data) ? \get_class($data) : \gettype($data) + ), 0, $e); + } + }); } /** diff --git a/lib/Context/Process.php b/lib/Context/Process.php index 55fdbb4..469ddca 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -10,6 +10,7 @@ use Amp\Process\Process as BaseProcess; use Amp\Process\ProcessInputStream; use Amp\Process\ProcessOutputStream; use Amp\Promise; +use Amp\TimeoutException; use function Amp\call; final class Process implements Context @@ -221,7 +222,7 @@ final class Process implements Context try { $data = yield $this->channel->receive(); } catch (ChannelException $e) { - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); + throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e); } if ($data instanceof ExitResult) { @@ -249,7 +250,29 @@ final class Process implements Context throw new \Error("Cannot send exit result objects"); } - return $this->channel->send($data); + return call(function () use ($data): \Generator { + try { + return yield $this->channel->send($data); + } catch (ChannelException $e) { + if ($this->channel === null) { + throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + try { + $data = yield Promise\timeout($this->join(), 100); + } catch (ContextException | ChannelException | TimeoutException $ex) { + if ($this->isRunning()) { + $this->kill(); + } + throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + throw new SynchronizationError(\sprintf( + 'Process unexpectedly exited with result of type: %s', + \is_object($data) ? \get_class($data) : \gettype($data) + ), 0, $e); + } + }); } /** diff --git a/test/Context/AbstractContextTest.php b/test/Context/AbstractContextTest.php index bca578e..094a2ac 100644 --- a/test/Context/AbstractContextTest.php +++ b/test/Context/AbstractContextTest.php @@ -32,6 +32,26 @@ abstract class AbstractContextTest extends AsyncTestCase yield $context->join(); } + public function testThrowingProcessOnReceive() + { + $this->expectException(PanicError::class); + $this->expectExceptionMessage('Test message'); + + $context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php"); + yield $context->start(); + yield $context->receive(); + } + + public function testThrowingProcessOnSend() + { + $this->expectException(PanicError::class); + $this->expectExceptionMessage('Test message'); + + $context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php"); + yield $context->start(); + yield $context->send(1); + } + public function testInvalidScriptPath() { $this->expectException(PanicError::class); @@ -118,4 +138,24 @@ abstract class AbstractContextTest extends AsyncTestCase yield $context->start(); yield $context->join(); } + + public function testExitingProcessOnReceive() + { + $this->expectException(ContextException::class); + $this->expectExceptionMessage('stopped responding'); + + $context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php"); + yield $context->start(); + yield $context->receive(); + } + + public function testExitingProcessOnSend() + { + $this->expectException(ContextException::class); + $this->expectExceptionMessage('stopped responding'); + + $context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php"); + yield $context->start(); + yield $context->send(1); + } } diff --git a/test/Context/Fixtures/throwing-process.php b/test/Context/Fixtures/throwing-process.php new file mode 100644 index 0000000..4aad57d --- /dev/null +++ b/test/Context/Fixtures/throwing-process.php @@ -0,0 +1,8 @@ + Date: Mon, 10 Feb 2020 12:33:55 -0600 Subject: [PATCH 2/5] Serialize exception trace arguments in child processes and threads --- composer.json | 2 + lib/Context/functions.php | 33 +++++++++++++ lib/Sync/ExitFailure.php | 6 +-- lib/Sync/functions.php | 75 +++++++++++++++++++++++++++++ lib/Worker/Internal/TaskFailure.php | 9 ++-- 5 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 lib/Context/functions.php create mode 100644 lib/Sync/functions.php diff --git a/composer.json b/composer.json index 9676b62..f7bf817 100755 --- a/composer.json +++ b/composer.json @@ -38,6 +38,8 @@ "Amp\\Parallel\\": "lib" }, "files": [ + "lib/Context/functions.php", + "lib/Sync/functions.php", "lib/Worker/functions.php" ] }, diff --git a/lib/Context/functions.php b/lib/Context/functions.php new file mode 100644 index 0000000..7344a53 --- /dev/null +++ b/lib/Context/functions.php @@ -0,0 +1,33 @@ + + */ +function run($script): Promise +{ + if (Parallel::isSupported()) { + return Parallel::run($script); + } + + return Process::run($script); +} diff --git a/lib/Sync/ExitFailure.php b/lib/Sync/ExitFailure.php index f067bac..eb78b19 100644 --- a/lib/Sync/ExitFailure.php +++ b/lib/Sync/ExitFailure.php @@ -13,7 +13,7 @@ final class ExitFailure implements ExitResult /** @var int|string */ private $code; - /** @var array */ + /** @var string[] */ private $trace; /** @var self|null */ @@ -24,7 +24,7 @@ final class ExitFailure implements ExitResult $this->type = \get_class($exception); $this->message = $exception->getMessage(); $this->code = $exception->getCode(); - $this->trace = $exception->getTraceAsString(); + $this->trace = flattenThrowableBacktrace($exception); if ($previous = $exception->getPrevious()) { $this->previous = new self($previous); @@ -53,7 +53,7 @@ final class ExitFailure implements ExitResult $this->code, PanicError::class ), - $this->trace, + \implode("\n", $this->trace), $previous ); } diff --git a/lib/Sync/functions.php b/lib/Sync/functions.php new file mode 100644 index 0000000..e113f58 --- /dev/null +++ b/lib/Sync/functions.php @@ -0,0 +1,75 @@ +getTrace(); + + foreach ($trace as $call) { + if (isset($call['class'])) { + $name = $call['class'] . $call['type'] . $call['function']; + } else { + $name = $call['function']; + } + + $args = \implode(', ', \array_map(__NAMESPACE__ . '\\flattenArgument', $call['args'])); + + $output[] = \sprintf( + '#%d %s(%d): %s(%s)', + $counter++, + $call['file'] ?? '[internal function]', + $call['line'] ?? 0, + $name, + $args + ); + } + + return $output; +} + +/** + * @param mixed $value + * + * @return string Serializable string representation of $value for backtraces. + */ +function flattenArgument($value): string +{ + if ($value instanceof \Closure) { + $closureReflection = new \ReflectionFunction($value); + return \sprintf( + 'Closure(%s:%s)', + $closureReflection->getFileName(), + $closureReflection->getStartLine() + ); + } + + if (\is_object($value)) { + return \sprintf('Object(%s)', \get_class($value)); + } + + if (\is_array($value)) { + return 'Array([' . \implode(', ', \array_map(__FUNCTION__, $value)) . '])'; + } + + if (\is_resource($value)) { + return \sprintf('Resource(%s)', \get_resource_type($value)); + } + + if (\is_string($value)) { + return '"' . $value . '"'; + } + + if (\is_null($value)) { + return 'null'; + } + + return (string) $value; +} diff --git a/lib/Worker/Internal/TaskFailure.php b/lib/Worker/Internal/TaskFailure.php index 5d4091e..32e2fbf 100644 --- a/lib/Worker/Internal/TaskFailure.php +++ b/lib/Worker/Internal/TaskFailure.php @@ -3,6 +3,7 @@ namespace Amp\Parallel\Worker\Internal; use Amp\Failure; +use Amp\Parallel\Sync; use Amp\Parallel\Worker\TaskError; use Amp\Parallel\Worker\TaskException; use Amp\Promise; @@ -25,7 +26,7 @@ final class TaskFailure extends TaskResult /** @var int|string */ private $code; - /** @var array */ + /** @var string[] */ private $trace; /** @var self|null */ @@ -38,7 +39,7 @@ final class TaskFailure extends TaskResult $this->parent = $exception instanceof \Error ? self::PARENT_ERROR : self::PARENT_EXCEPTION; $this->message = $exception->getMessage(); $this->code = $exception->getCode(); - $this->trace = $exception->getTraceAsString(); + $this->trace = Sync\flattenThrowableBacktrace($exception); if ($previous = $exception->getPrevious()) { $this->previous = new self($id, $previous); @@ -61,7 +62,7 @@ final class TaskFailure extends TaskResult return new TaskError( $this->type, \sprintf($format, $this->type, $this->message, $this->code, TaskError::class), - $this->trace, + \implode("\n", $this->trace), $previous ); } @@ -69,7 +70,7 @@ final class TaskFailure extends TaskResult return new TaskException( $this->type, \sprintf($format, $this->type, $this->message, $this->code, TaskException::class), - $this->trace, + \implode("\n", $this->trace), $previous ); } From fc7dafe7a53a1b8615dbcc900f166f815d4aae9c Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 10 Feb 2020 17:30:50 -0600 Subject: [PATCH 3/5] Update travis build --- .travis.yml | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1cced46..ced3458 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,14 +4,13 @@ php: - 7.1 - 7.2 - 7.3 - - 7.4snapshot + - 7.4 - nightly sudo: false matrix: allow_failures: - - php: 7.4snapshot - php: nightly fast_finish: true @@ -19,15 +18,21 @@ env: - AMP_DEBUG=true before_install: - - phpenv config-rm xdebug.ini || echo "No xdebug config." + # xdebug causes hangs on PHP 7.1 and 7.2 + - if [ "$TRAVIS_PHP_VERSION" == "7.1" ] || [ "$TRAVIS_PHP_VERSION" == "7.2" ]; then + phpenv config-rm xdebug.ini || echo "No xdebug config."; + fi install: - composer update -n --prefer-dist - # pthreads is now only supported on PHP 7.2+ - - if [ "$TRAVIS_PHP_VERSION" != "7.1" ]; then + # ext-pthreads is only supported on PHP 7.2 and 7.3 + - if [ "$TRAVIS_PHP_VERSION" == "7.2" ] || [ "$TRAVIS_PHP_VERSION" == "7.3" ]; then travis/install-pthreads.sh; fi - - travis/install-parallel.sh; + # ext-parallel is only supported on PHP 7.2+ + - if [ "$TRAVIS_PHP_VERSION" != "7.1" ]; then + travis/install-parallel.sh; + fi - wget https://github.com/php-coveralls/php-coveralls/releases/download/v1.0.2/coveralls.phar - chmod +x coveralls.phar From dfb7b0149b4bd543deb1755ec9514e2800c18827 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 10 Feb 2020 17:40:34 -0600 Subject: [PATCH 4/5] Add test delay to allow process to crash Sending to fast can succeed before process crashes. --- test/Context/AbstractContextTest.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/Context/AbstractContextTest.php b/test/Context/AbstractContextTest.php index 094a2ac..6d210c8 100644 --- a/test/Context/AbstractContextTest.php +++ b/test/Context/AbstractContextTest.php @@ -49,6 +49,7 @@ abstract class AbstractContextTest extends AsyncTestCase $context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php"); yield $context->start(); + yield new Delayed(100); yield $context->send(1); } @@ -156,6 +157,7 @@ abstract class AbstractContextTest extends AsyncTestCase $context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php"); yield $context->start(); + yield new Delayed(500); yield $context->send(1); } } From 09ab72333ba830a24fb4ab1aed3696e2db3d821a Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 10 Feb 2020 17:40:52 -0600 Subject: [PATCH 5/5] Increase test timeout Parallel threads only check for kills every 250 ms, so this can fail. --- test/Worker/AbstractWorkerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index 0328a77..8246090 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -154,7 +154,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase public function testKill() { - $this->setTimeout(250); + $this->setTimeout(500); $worker = $this->createWorker();