1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Throw different exception for cancelled tasks

This commit is contained in:
Aaron Piotrowski 2019-09-10 10:52:43 -05:00
parent 39442be6ca
commit 10325d5e9e
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
5 changed files with 106 additions and 6 deletions

View File

@ -0,0 +1,47 @@
<?php
namespace Amp\Parallel\Worker\Internal;
use Amp\CancelledException;
use Amp\Failure;
use Amp\Parallel\Worker\TaskCancelledException;
use Amp\Promise;
/** @internal */
final class TaskCancelled extends TaskResult
{
/** @var string */
private $type;
/** @var int|string */
private $code;
/** @var array */
private $trace;
/** @var TaskFailure|null */
private $previous;
public function __construct(string $id, CancelledException $exception)
{
parent::__construct($id);
$this->type = \get_class($exception);
$this->code = $exception->getCode();
$this->trace = $exception->getTraceAsString();
if ($previous = $exception->getPrevious()) {
$this->previous = new TaskFailure($id, $previous);
}
}
public function promise(): Promise
{
$previous = $this->previous ? $this->previous->createException() : null;
return new Failure(new TaskCancelledException(
$this->type,
$this->trace,
$previous
));
}
}

View File

@ -50,7 +50,7 @@ final class TaskFailure extends TaskResult
return new Failure($this->createException()); return new Failure($this->createException());
} }
private function createException(): \Throwable public function createException(): \Throwable
{ {
$previous = $this->previous ? $this->previous->createException() : null; $previous = $this->previous ? $this->previous->createException() : null;

View File

@ -0,0 +1,47 @@
<?php
namespace Amp\Parallel\Worker;
use Amp\CancelledException;
final class TaskCancelledException extends CancelledException
{
/** @var string Class name of exception thrown from task. */
private $name;
/** @var string Stack trace of the exception thrown from task. */
private $trace;
/**
* @param string $name The exception class name.
* @param string $trace The exception stack trace.
* @param \Throwable|null $previous Previous exception.
*/
public function __construct(string $name, string $trace = '', \Throwable $previous = null)
{
parent::__construct($previous);
$this->name = $name;
$this->trace = $trace;
}
/**
* Returns the class name of the exception thrown from the task.
*
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* Gets the stack trace at the point the exception was thrown in the task.
*
* @return string
*/
public function getWorkerTrace(): string
{
return $this->trace;
}
}

View File

@ -3,6 +3,7 @@
namespace Amp\Parallel\Worker; namespace Amp\Parallel\Worker;
use Amp\CancellationTokenSource; use Amp\CancellationTokenSource;
use Amp\CancelledException;
use Amp\Coroutine; use Amp\Coroutine;
use Amp\Parallel\Sync\Channel; use Amp\Parallel\Sync\Channel;
use Amp\Parallel\Sync\SerializationException; use Amp\Parallel\Sync\SerializationException;
@ -57,7 +58,11 @@ final class TaskRunner
yield call([$job->getTask(), "run"], $this->environment, $source->getToken()) yield call([$job->getTask(), "run"], $this->environment, $source->getToken())
); );
} catch (\Throwable $exception) { } catch (\Throwable $exception) {
$result = new Internal\TaskFailure($job->getId(), $exception); if ($exception instanceof CancelledException && $source->getToken()->isRequested()) {
$result = new Internal\TaskCancelled($job->getId(), $exception);
} else {
$result = new Internal\TaskFailure($job->getId(), $exception);
}
} finally { } finally {
$resolved = true; $resolved = true;
} }
@ -74,6 +79,7 @@ final class TaskRunner
$result = null; // Free memory from last result. $result = null; // Free memory from last result.
while (!($job = yield $receive) instanceof Internal\Job && $job !== null) { while (!($job = yield $receive) instanceof Internal\Job && $job !== null) {
// Ignore possible cancellation request received after task resolved.
$receive = $this->channel->receive(); $receive = $this->channel->receive();
} }
} }

View File

@ -9,6 +9,7 @@ use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Worker\BasicEnvironment; use Amp\Parallel\Worker\BasicEnvironment;
use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task; use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\TaskCancelledException;
use Amp\Parallel\Worker\TaskError; use Amp\Parallel\Worker\TaskError;
use Amp\Parallel\Worker\TaskException; use Amp\Parallel\Worker\TaskException;
use Amp\Parallel\Worker\WorkerException; use Amp\Parallel\Worker\WorkerException;
@ -310,8 +311,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase
public function testCancellableTask() public function testCancellableTask()
{ {
$this->expectException(TaskException::class); $this->expectException(TaskCancelledException::class);
$this->expectExceptionMessage('Uncaught Amp\CancelledException in worker with message "The operation was cancelled" and code "0"');
$worker = $this->createWorker(); $worker = $this->createWorker();
@ -326,8 +326,8 @@ abstract class AbstractWorkerTest extends AsyncTestCase
try { try {
yield $worker->enqueue(new Fixtures\CancellingTask, new TimeoutCancellationToken(100)); yield $worker->enqueue(new Fixtures\CancellingTask, new TimeoutCancellationToken(100));
$this->fail(TaskException::class . ' did not fail enqueue promise'); $this->fail(TaskCancelledException::class . ' did not fail enqueue promise');
} catch (TaskException $exception) { } catch (TaskCancelledException $exception) {
// Task should be cancelled, ignore this exception. // Task should be cancelled, ignore this exception.
} }