1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Improve process communication error handling

This commit is contained in:
Aaron Piotrowski 2018-10-23 22:14:51 -05:00
parent 14def89bff
commit fe293c09bb
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
4 changed files with 29 additions and 15 deletions

View File

@ -65,10 +65,12 @@ final class ProcessHub
try { try {
$received = yield Promise\timeout($channel->receive(), self::KEY_RECEIVE_TIMEOUT); $received = yield Promise\timeout($channel->receive(), self::KEY_RECEIVE_TIMEOUT);
} catch (TimeoutException $exception) { } catch (TimeoutException $exception) {
$channel->close();
return; // Ignore possible foreign connection attempt. return; // Ignore possible foreign connection attempt.
} }
if (!\is_string($received) || !isset($keys[$received])) { if (!\is_string($received) || !isset($keys[$received])) {
$channel->close();
return; // Ignore possible foreign connection attempt. return; // Ignore possible foreign connection attempt.
} }
@ -77,10 +79,6 @@ final class ProcessHub
$deferred = $acceptor[$pid]; $deferred = $acceptor[$pid];
unset($acceptor[$pid], $keys[$received]); unset($acceptor[$pid], $keys[$received]);
$deferred->resolve($channel); $deferred->resolve($channel);
if (empty($acceptor)) {
Loop::disable($watcher);
}
}); });
Loop::disable($this->watcher); Loop::disable($this->watcher);
@ -112,17 +110,19 @@ final class ProcessHub
Loop::enable($this->watcher); Loop::enable($this->watcher);
try { try {
return yield Promise\timeout($this->acceptor[$pid]->promise(), self::PROCESS_START_TIMEOUT); $channel = yield Promise\timeout($this->acceptor[$pid]->promise(), self::PROCESS_START_TIMEOUT);
} catch (TimeoutException $exception) { } catch (TimeoutException $exception) {
$key = \array_search($pid, $this->keys, true); $key = \array_search($pid, $this->keys, true);
\assert(\is_string($key), "Key for {$pid} not found");
unset($this->acceptor[$pid], $this->keys[$key]); unset($this->acceptor[$pid], $this->keys[$key]);
throw new ContextException("Starting the process timed out", 0, $exception);
} finally {
if (empty($this->acceptor)) { if (empty($this->acceptor)) {
Loop::disable($this->watcher); Loop::disable($this->watcher);
} }
throw new ContextException("Starting the process timed out", 0, $exception);
} }
return $channel;
}); });
} }
} }

View File

@ -184,11 +184,16 @@ final class Process implements Context
public function start(): Promise public function start(): Promise
{ {
return call(function () { return call(function () {
try {
$pid = yield $this->process->start(); $pid = yield $this->process->start();
yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH)); yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH));
$this->channel = yield $this->hub->accept($pid); $this->channel = yield $this->hub->accept($pid);
} catch (\Throwable $exception) {
$this->process->kill();
throw new ContextException("Staring the process failed", 0, $exception);
}
}); });
} }

View File

@ -2,6 +2,10 @@
namespace Amp\Parallel\Sync; namespace Amp\Parallel\Sync;
class SerializationException extends ChannelException class SerializationException extends \Exception
{ {
public function __construct(string $message, \Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
}
} }

View File

@ -4,6 +4,7 @@ namespace Amp\Parallel\Worker;
use Amp\Parallel\Context\Context; use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\StatusError; use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Sync\ChannelException;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
use function Amp\call; use function Amp\call;
@ -23,7 +24,7 @@ abstract class AbstractWorker implements Worker
private $pending; private $pending;
/** /**
* @param \Amp\Parallel\Context\Context $context * @param \Amp\Parallel\Context\Context $context A context running an instance of TaskRunner.
*/ */
public function __construct(Context $context) public function __construct(Context $context)
{ {
@ -78,8 +79,12 @@ abstract class AbstractWorker implements Worker
$job = new Internal\Job($task); $job = new Internal\Job($task);
try {
yield $this->context->send($job); yield $this->context->send($job);
$result = yield $this->context->receive(); $result = yield $this->context->receive();
} catch (ChannelException $exception) {
throw new WorkerException("Communicating with the worker failed", $exception);
}
if (!$result instanceof Internal\TaskResult) { if (!$result instanceof Internal\TaskResult) {
$this->kill(); $this->kill();