1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-29 20:29:00 +01:00

Improve error handling when sending and receiving

This commit is contained in:
Aaron Piotrowski 2020-02-10 12:11:45 -06:00
parent 72dd3a495f
commit 4ed05f6aac
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
4 changed files with 103 additions and 6 deletions

View File

@ -3,6 +3,7 @@
namespace Amp\Parallel\Context;
use Amp\Loop;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitFailure;
use Amp\Parallel\Sync\ExitResult;
@ -10,6 +11,7 @@ use Amp\Parallel\Sync\ExitSuccess;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Sync\SynchronizationError;
use Amp\Promise;
use Amp\TimeoutException;
use parallel\Runtime;
use function Amp\call;
@ -341,16 +343,20 @@ final class Parallel implements Context
public function receive(): Promise
{
if ($this->channel === null) {
throw new StatusError('The process has not been started.');
throw new StatusError('The thread has not been started.');
}
return call(function (): \Generator {
$data = yield $this->channel->receive();
try {
$data = yield $this->channel->receive();
} catch (ChannelException $e) {
throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
if ($data instanceof ExitResult) {
$data = $data->getResult();
throw new SynchronizationError(\sprintf(
'Thread process unexpectedly exited with result of type: %s',
'Thread unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
));
}
@ -372,7 +378,27 @@ final class Parallel implements Context
throw new \Error('Cannot send exit result objects.');
}
return $this->channel->send($data);
return call(function () use ($data): \Generator {
try {
return yield $this->channel->send($data);
} catch (ChannelException $e) {
if ($this->channel === null) {
throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
try {
$data = yield Promise\timeout($this->join(), 100);
} catch (ContextException | ChannelException | TimeoutException $ex) {
$this->kill();
throw new ContextException("The thread stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
throw new SynchronizationError(\sprintf(
'Thread unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
), 0, $e);
}
});
}
/**

View File

@ -10,6 +10,7 @@ use Amp\Process\Process as BaseProcess;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Promise;
use Amp\TimeoutException;
use function Amp\call;
final class Process implements Context
@ -221,7 +222,7 @@ final class Process implements Context
try {
$data = yield $this->channel->receive();
} catch (ChannelException $e) {
throw new ContextException("The context stopped responding, potentially due to a fatal error or calling exit", 0, $e);
throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
if ($data instanceof ExitResult) {
@ -249,7 +250,29 @@ final class Process implements Context
throw new \Error("Cannot send exit result objects");
}
return $this->channel->send($data);
return call(function () use ($data): \Generator {
try {
return yield $this->channel->send($data);
} catch (ChannelException $e) {
if ($this->channel === null) {
throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
try {
$data = yield Promise\timeout($this->join(), 100);
} catch (ContextException | ChannelException | TimeoutException $ex) {
if ($this->isRunning()) {
$this->kill();
}
throw new ContextException("The process stopped responding, potentially due to a fatal error or calling exit", 0, $e);
}
throw new SynchronizationError(\sprintf(
'Process unexpectedly exited with result of type: %s',
\is_object($data) ? \get_class($data) : \gettype($data)
), 0, $e);
}
});
}
/**

View File

@ -32,6 +32,26 @@ abstract class AbstractContextTest extends AsyncTestCase
yield $context->join();
}
public function testThrowingProcessOnReceive()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('Test message');
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
yield $context->start();
yield $context->receive();
}
public function testThrowingProcessOnSend()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('Test message');
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
yield $context->start();
yield $context->send(1);
}
public function testInvalidScriptPath()
{
$this->expectException(PanicError::class);
@ -118,4 +138,24 @@ abstract class AbstractContextTest extends AsyncTestCase
yield $context->start();
yield $context->join();
}
public function testExitingProcessOnReceive()
{
$this->expectException(ContextException::class);
$this->expectExceptionMessage('stopped responding');
$context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php");
yield $context->start();
yield $context->receive();
}
public function testExitingProcessOnSend()
{
$this->expectException(ContextException::class);
$this->expectExceptionMessage('stopped responding');
$context = $this->createContext(__DIR__ . "/Fixtures/exiting-process.php");
yield $context->start();
yield $context->send(1);
}
}

View File

@ -0,0 +1,8 @@
<?php
use Amp\Parallel\Sync\Channel;
use Amp\PHPUnit\TestException;
return function (Channel $channel) use ($argv) {
throw new TestException('Test message');
};