diff --git a/lib/Context/Parallel.php b/lib/Context/Parallel.php index 4740cb3..447b1d5 100644 --- a/lib/Context/Parallel.php +++ b/lib/Context/Parallel.php @@ -3,6 +3,7 @@ namespace Amp\Parallel\Context; use Amp\Loop; +use Amp\Parallel\Sync\ChannelException; use Amp\Parallel\Sync\ChannelledSocket; use Amp\Parallel\Sync\ExitFailure; use Amp\Parallel\Sync\ExitResult; @@ -10,6 +11,7 @@ use Amp\Parallel\Sync\ExitSuccess; use Amp\Parallel\Sync\SerializationException; use Amp\Parallel\Sync\SynchronizationError; use Amp\Promise; +use Amp\TimeoutException; use parallel\Runtime; use function Amp\call; @@ -341,16 +343,20 @@ final class Parallel implements Context public function receive(): Promise { if ($this->channel === null) { - throw new StatusError('The process has not been started.'); + throw new StatusError('The thread has not been started.'); } return call(function (): \Generator { - $data = yield $this->channel->receive(); + try { + $data = yield $this->channel->receive(); + } catch (ChannelException $e) { + throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } if ($data instanceof ExitResult) { $data = $data->getResult(); throw new SynchronizationError(\sprintf( - 'Thread process unexpectedly exited with result of type: %s', + 'Thread unexpectedly exited with result of type: %s', \is_object($data) ? \get_class($data) : \gettype($data) )); } @@ -372,7 +378,27 @@ final class Parallel implements Context throw new \Error('Cannot send exit result objects.'); } - return $this->channel->send($data); + return call(function () use ($data): \Generator { + try { + return yield $this->channel->send($data); + } catch (ChannelException $e) { + if ($this->channel === null) { + throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + try { + $data = yield Promise\timeout($this->join(), 100); + } catch (ContextException | ChannelException | TimeoutException $ex) { + $this->kill(); + throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + throw new SynchronizationError(\sprintf( + 'Thread unexpectedly exited with result of type: %s', + \is_object($data) ? \get_class($data) : \gettype($data) + ), 0, $e); + } + }); } /** diff --git a/lib/Context/Process.php b/lib/Context/Process.php index 55fdbb4..469ddca 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -10,6 +10,7 @@ use Amp\Process\Process as BaseProcess; use Amp\Process\ProcessInputStream; use Amp\Process\ProcessOutputStream; use Amp\Promise; +use Amp\TimeoutException; use function Amp\call; final class Process implements Context @@ -221,7 +222,7 @@ final class Process implements Context try { $data = yield $this->channel->receive(); } catch (ChannelException $e) { - throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e); + throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e); } if ($data instanceof ExitResult) { @@ -249,7 +250,29 @@ final class Process implements Context throw new \Error("Cannot send exit result objects"); } - return $this->channel->send($data); + return call(function () use ($data): \Generator { + try { + return yield $this->channel->send($data); + } catch (ChannelException $e) { + if ($this->channel === null) { + throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + try { + $data = yield Promise\timeout($this->join(), 100); + } catch (ContextException | ChannelException | TimeoutException $ex) { + if ($this->isRunning()) { + $this->kill(); + } + throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e); + } + + throw new SynchronizationError(\sprintf( + 'Process unexpectedly exited with result of type: %s', + \is_object($data) ? \get_class($data) : \gettype($data) + ), 0, $e); + } + }); } /** diff --git a/test/Context/AbstractContextTest.php b/test/Context/AbstractContextTest.php index bca578e..094a2ac 100644 --- a/test/Context/AbstractContextTest.php +++ b/test/Context/AbstractContextTest.php @@ -32,6 +32,26 @@ abstract class AbstractContextTest extends AsyncTestCase yield $context->join(); } + public function testThrowingProcessOnReceive() + { + $this->expectException(PanicError::class); + $this->expectExceptionMessage('Test message'); + + $context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php"); + yield $context->start(); + yield $context->receive(); + } + + public function testThrowingProcessOnSend() + { + $this->expectException(PanicError::class); + $this->expectExceptionMessage('Test message'); + + $context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php"); + yield $context->start(); + yield $context->send(1); + } + public function testInvalidScriptPath() { $this->expectException(PanicError::class); @@ -118,4 +138,24 @@ abstract class AbstractContextTest extends AsyncTestCase yield $context->start(); yield $context->join(); } + + public function testExitingProcessOnReceive() + { + $this->expectException(ContextException::class); + $this->expectExceptionMessage('stopped responding'); + + $context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php"); + yield $context->start(); + yield $context->receive(); + } + + public function testExitingProcessOnSend() + { + $this->expectException(ContextException::class); + $this->expectExceptionMessage('stopped responding'); + + $context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php"); + yield $context->start(); + yield $context->send(1); + } } diff --git a/test/Context/Fixtures/throwing-process.php b/test/Context/Fixtures/throwing-process.php new file mode 100644 index 0000000..4aad57d --- /dev/null +++ b/test/Context/Fixtures/throwing-process.php @@ -0,0 +1,8 @@ +