1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Improve worker error reporting

This commit is contained in:
Aaron Piotrowski 2017-06-17 23:57:12 -05:00
parent 4546ef8b1d
commit 20bb51e926

View File

@ -4,11 +4,9 @@ namespace Amp\Parallel\Worker;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Parallel\ContextException;
use Amp\Parallel\StatusError;
use Amp\Parallel\Strand;
use Amp\Parallel\Worker\Internal\Job;
use Amp\Parallel\Worker\Internal\TaskResult;
use Amp\Promise;
/**
@ -25,7 +23,7 @@ abstract class AbstractWorker implements Worker {
private $jobQueue = [];
/** @var callable */
private $when;
private $onResolve;
/**
* @param \Amp\Parallel\Strand $strand
@ -33,21 +31,21 @@ abstract class AbstractWorker implements Worker {
public function __construct(Strand $strand) {
$this->context = $strand;
$this->when = function ($exception, $data) {
$this->onResolve = function ($exception, $data) {
if ($exception) {
$this->kill();
$this->cancel($exception);
return;
}
if (!$data instanceof TaskResult) {
$this->kill();
if (!$data instanceof Internal\TaskResult) {
$this->cancel(new ContextException("Context did not return a task result"));
return;
}
$id = $data->getId();
if (!isset($this->jobQueue[$id])) {
$this->kill();
$this->cancel(new ContextException("Job ID returned by context does not exist"));
return;
}
@ -55,7 +53,7 @@ abstract class AbstractWorker implements Worker {
unset($this->jobQueue[$id]);
if (!empty($this->jobQueue)) {
$this->context->receive()->onResolve($this->when);
$this->context->receive()->onResolve($this->onResolve);
}
$deferred->resolve($data->promise());
@ -88,11 +86,11 @@ abstract class AbstractWorker implements Worker {
*/
public function enqueue(Task $task): Promise {
if (!$this->context->isRunning()) {
throw new StatusError('The worker has not been started.');
throw new StatusError("The worker has not been started");
}
if ($this->shutdown) {
throw new StatusError('The worker has been shut down.');
throw new StatusError("The worker has been shut down");
}
return new Coroutine($this->doEnqueue($task));
@ -111,16 +109,17 @@ abstract class AbstractWorker implements Worker {
*/
private function doEnqueue(Task $task): \Generator {
if (empty($this->jobQueue)) {
$this->context->receive()->onResolve($this->when);
$this->context->receive()->onResolve($this->onResolve);
}
try {
$job = new Job($task);
$job = new Internal\Job($task);
$this->jobQueue[$job->getId()] = $deferred = new Deferred;
yield $this->context->send($job);
} catch (\Throwable $exception) {
$this->kill();
throw new WorkerException('Sending the task to the worker failed.', $exception);
$exception = new WorkerException("Sending the task to the worker failed", $exception);
$this->cancel($exception);
throw $exception;
}
return yield $deferred->promise();
@ -131,7 +130,7 @@ abstract class AbstractWorker implements Worker {
*/
public function shutdown(): Promise {
if (!$this->context->isRunning() || $this->shutdown) {
throw new StatusError('The worker is not running.');
throw new StatusError("The worker is not running");
}
return new Coroutine($this->doShutdown());
@ -156,16 +155,17 @@ abstract class AbstractWorker implements Worker {
* {@inheritdoc}
*/
public function kill() {
$this->cancelPending();
$this->context->kill();
$this->cancel();
}
/**
* Cancels all pending tasks.
* Cancels all pending tasks and kills the context.
*
* @param \Throwable|null $exception Optional exception to be used as the previous exception.
*/
private function cancelPending() {
protected function cancel(\Throwable $exception = null) {
if (!empty($this->jobQueue)) {
$exception = new WorkerException('Worker was shut down.');
$exception = new WorkerException('Worker was shut down', $exception);
foreach ($this->jobQueue as $job) {
$job->fail($exception);
@ -173,5 +173,7 @@ abstract class AbstractWorker implements Worker {
$this->jobQueue = [];
}
$this->context->kill();
}
}