From 10325d5e9e817f85eb2c8d18902519b6a10974f6 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 10 Sep 2019 10:52:43 -0500 Subject: [PATCH] Throw different exception for cancelled tasks --- lib/Worker/Internal/TaskCancelled.php | 47 +++++++++++++++++++++++++++ lib/Worker/Internal/TaskFailure.php | 2 +- lib/Worker/TaskCancelledException.php | 47 +++++++++++++++++++++++++++ lib/Worker/TaskRunner.php | 8 ++++- test/Worker/AbstractWorkerTest.php | 8 ++--- 5 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 lib/Worker/Internal/TaskCancelled.php create mode 100644 lib/Worker/TaskCancelledException.php diff --git a/lib/Worker/Internal/TaskCancelled.php b/lib/Worker/Internal/TaskCancelled.php new file mode 100644 index 0000000..cd0c95b --- /dev/null +++ b/lib/Worker/Internal/TaskCancelled.php @@ -0,0 +1,47 @@ +type = \get_class($exception); + $this->code = $exception->getCode(); + $this->trace = $exception->getTraceAsString(); + + if ($previous = $exception->getPrevious()) { + $this->previous = new TaskFailure($id, $previous); + } + } + + public function promise(): Promise + { + $previous = $this->previous ? $this->previous->createException() : null; + + return new Failure(new TaskCancelledException( + $this->type, + $this->trace, + $previous + )); + } +} diff --git a/lib/Worker/Internal/TaskFailure.php b/lib/Worker/Internal/TaskFailure.php index 5d4091e..b746624 100644 --- a/lib/Worker/Internal/TaskFailure.php +++ b/lib/Worker/Internal/TaskFailure.php @@ -50,7 +50,7 @@ final class TaskFailure extends TaskResult return new Failure($this->createException()); } - private function createException(): \Throwable + public function createException(): \Throwable { $previous = $this->previous ? $this->previous->createException() : null; diff --git a/lib/Worker/TaskCancelledException.php b/lib/Worker/TaskCancelledException.php new file mode 100644 index 0000000..9e7d2e2 --- /dev/null +++ b/lib/Worker/TaskCancelledException.php @@ -0,0 +1,47 @@ +name = $name; + $this->trace = $trace; + } + + /** + * Returns the class name of the exception thrown from the task. + * + * @return string + */ + public function getName(): string + { + return $this->name; + } + + /** + * Gets the stack trace at the point the exception was thrown in the task. + * + * @return string + */ + public function getWorkerTrace(): string + { + return $this->trace; + } +} diff --git a/lib/Worker/TaskRunner.php b/lib/Worker/TaskRunner.php index 69e78bf..f37b6c3 100644 --- a/lib/Worker/TaskRunner.php +++ b/lib/Worker/TaskRunner.php @@ -3,6 +3,7 @@ namespace Amp\Parallel\Worker; use Amp\CancellationTokenSource; +use Amp\CancelledException; use Amp\Coroutine; use Amp\Parallel\Sync\Channel; use Amp\Parallel\Sync\SerializationException; @@ -57,7 +58,11 @@ final class TaskRunner yield call([$job->getTask(), "run"], $this->environment, $source->getToken()) ); } catch (\Throwable $exception) { - $result = new Internal\TaskFailure($job->getId(), $exception); + if ($exception instanceof CancelledException && $source->getToken()->isRequested()) { + $result = new Internal\TaskCancelled($job->getId(), $exception); + } else { + $result = new Internal\TaskFailure($job->getId(), $exception); + } } finally { $resolved = true; } @@ -74,6 +79,7 @@ final class TaskRunner $result = null; // Free memory from last result. while (!($job = yield $receive) instanceof Internal\Job && $job !== null) { + // Ignore possible cancellation request received after task resolved. $receive = $this->channel->receive(); } } diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index dfd0ab2..562194f 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -9,6 +9,7 @@ use Amp\Parallel\Sync\SerializationException; use Amp\Parallel\Worker\BasicEnvironment; use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Task; +use Amp\Parallel\Worker\TaskCancelledException; use Amp\Parallel\Worker\TaskError; use Amp\Parallel\Worker\TaskException; use Amp\Parallel\Worker\WorkerException; @@ -310,8 +311,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase public function testCancellableTask() { - $this->expectException(TaskException::class); - $this->expectExceptionMessage('Uncaught Amp\CancelledException in worker with message "The operation was cancelled" and code "0"'); + $this->expectException(TaskCancelledException::class); $worker = $this->createWorker(); @@ -326,8 +326,8 @@ abstract class AbstractWorkerTest extends AsyncTestCase try { yield $worker->enqueue(new Fixtures\CancellingTask, new TimeoutCancellationToken(100)); - $this->fail(TaskException::class . ' did not fail enqueue promise'); - } catch (TaskException $exception) { + $this->fail(TaskCancelledException::class . ' did not fail enqueue promise'); + } catch (TaskCancelledException $exception) { // Task should be cancelled, ignore this exception. }