From 20bb51e926b01b42da52c320da5d37971f0067f5 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 17 Jun 2017 23:57:12 -0500 Subject: [PATCH] Improve worker error reporting --- lib/Worker/AbstractWorker.php | 46 ++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/lib/Worker/AbstractWorker.php b/lib/Worker/AbstractWorker.php index eef7805..c177b12 100644 --- a/lib/Worker/AbstractWorker.php +++ b/lib/Worker/AbstractWorker.php @@ -4,11 +4,9 @@ namespace Amp\Parallel\Worker; use Amp\Coroutine; use Amp\Deferred; +use Amp\Parallel\ContextException; use Amp\Parallel\StatusError; use Amp\Parallel\Strand; -use Amp\Parallel\Worker\Internal\Job; - -use Amp\Parallel\Worker\Internal\TaskResult; use Amp\Promise; /** @@ -25,7 +23,7 @@ abstract class AbstractWorker implements Worker { private $jobQueue = []; /** @var callable */ - private $when; + private $onResolve; /** * @param \Amp\Parallel\Strand $strand @@ -33,21 +31,21 @@ abstract class AbstractWorker implements Worker { public function __construct(Strand $strand) { $this->context = $strand; - $this->when = function ($exception, $data) { + $this->onResolve = function ($exception, $data) { if ($exception) { - $this->kill(); + $this->cancel($exception); return; } - if (!$data instanceof TaskResult) { - $this->kill(); + if (!$data instanceof Internal\TaskResult) { + $this->cancel(new ContextException("Context did not return a task result")); return; } $id = $data->getId(); if (!isset($this->jobQueue[$id])) { - $this->kill(); + $this->cancel(new ContextException("Job ID returned by context does not exist")); return; } @@ -55,7 +53,7 @@ abstract class AbstractWorker implements Worker { unset($this->jobQueue[$id]); if (!empty($this->jobQueue)) { - $this->context->receive()->onResolve($this->when); + $this->context->receive()->onResolve($this->onResolve); } $deferred->resolve($data->promise()); @@ -88,11 +86,11 @@ abstract class AbstractWorker implements Worker { */ public function enqueue(Task $task): Promise { if (!$this->context->isRunning()) { - throw new StatusError('The worker has not been started.'); + throw new StatusError("The worker has not been started"); } if ($this->shutdown) { - throw new StatusError('The worker has been shut down.'); + throw new StatusError("The worker has been shut down"); } return new Coroutine($this->doEnqueue($task)); @@ -111,16 +109,17 @@ abstract class AbstractWorker implements Worker { */ private function doEnqueue(Task $task): \Generator { if (empty($this->jobQueue)) { - $this->context->receive()->onResolve($this->when); + $this->context->receive()->onResolve($this->onResolve); } try { - $job = new Job($task); + $job = new Internal\Job($task); $this->jobQueue[$job->getId()] = $deferred = new Deferred; yield $this->context->send($job); } catch (\Throwable $exception) { - $this->kill(); - throw new WorkerException('Sending the task to the worker failed.', $exception); + $exception = new WorkerException("Sending the task to the worker failed", $exception); + $this->cancel($exception); + throw $exception; } return yield $deferred->promise(); @@ -131,7 +130,7 @@ abstract class AbstractWorker implements Worker { */ public function shutdown(): Promise { if (!$this->context->isRunning() || $this->shutdown) { - throw new StatusError('The worker is not running.'); + throw new StatusError("The worker is not running"); } return new Coroutine($this->doShutdown()); @@ -156,16 +155,17 @@ abstract class AbstractWorker implements Worker { * {@inheritdoc} */ public function kill() { - $this->cancelPending(); - $this->context->kill(); + $this->cancel(); } /** - * Cancels all pending tasks. + * Cancels all pending tasks and kills the context. + * + * @param \Throwable|null $exception Optional exception to be used as the previous exception. */ - private function cancelPending() { + protected function cancel(\Throwable $exception = null) { if (!empty($this->jobQueue)) { - $exception = new WorkerException('Worker was shut down.'); + $exception = new WorkerException('Worker was shut down', $exception); foreach ($this->jobQueue as $job) { $job->fail($exception); @@ -173,5 +173,7 @@ abstract class AbstractWorker implements Worker { $this->jobQueue = []; } + + $this->context->kill(); } }