1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Add support for cancelling tasks

This commit is contained in:
Aaron Piotrowski 2019-09-04 18:13:05 -05:00
parent 2b418eb71d
commit 39442be6ca
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
17 changed files with 151 additions and 48 deletions

View File

@ -2,6 +2,8 @@
namespace Amp\Parallel\Worker;
use Amp\CancellationToken;
/**
* Task implementation dispatching a simple callable.
*/
@ -23,7 +25,7 @@ final class CallableTask implements Task
$this->args = $args;
}
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
if ($this->callable instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance as a callable, the class must be autoloadable');

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Worker;
use Amp\CancellationToken;
use Amp\Parallel\Context\StatusError;
use Amp\Promise;
@ -145,11 +146,11 @@ final class DefaultPool implements Pool
* @throws \Amp\Parallel\Context\StatusError If the pool has been shutdown.
* @throws \Amp\Parallel\Worker\TaskException If the task throws an exception.
*/
public function enqueue(Task $task): Promise
public function enqueue(Task $task, ?CancellationToken $token = null): Promise
{
$worker = $this->pull();
$promise = $worker->enqueue($task);
$promise = $worker->enqueue($task, $token);
$promise->onResolve(function () use ($worker): void {
($this->push)($worker);
});

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Worker\Internal;
use Amp\CancellationToken;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\Worker;
use Amp\Promise;
@ -52,9 +53,9 @@ final class PooledWorker implements Worker
/**
* {@inheritdoc}
*/
public function enqueue(Task $task): Promise
public function enqueue(Task $task, ?CancellationToken $token = null): Promise
{
return $this->worker->enqueue($task);
return $this->worker->enqueue($task, $token);
}
/**

View File

@ -2,6 +2,8 @@
namespace Amp\Parallel\Worker;
use Amp\CancellationToken;
/**
* A runnable unit of execution.
*/
@ -10,11 +12,12 @@ interface Task
/**
* Runs the task inside the caller's context.
*
* Does not have to be a coroutine, can also be a regular function returning a value.
* Does not have to be a coroutine or return a promise, can also be a regular function returning a value.
*
* @param \Amp\Parallel\Worker\Environment
* @param Environment $environment
* @param CancellationToken $token
*
* @return mixed|\Amp\Promise|\Generator
*/
public function run(Environment $environment);
public function run(Environment $environment, CancellationToken $token);
}

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Worker;
use Amp\CancellationTokenSource;
use Amp\Coroutine;
use Amp\Parallel\Sync\Channel;
use Amp\Parallel\Sync\SerializationException;
@ -10,10 +11,10 @@ use function Amp\call;
final class TaskRunner
{
/** @var \Amp\Parallel\Sync\Channel */
/** @var Channel */
private $channel;
/** @var \Amp\Parallel\Worker\Environment */
/** @var Environment */
private $environment;
public function __construct(Channel $channel, Environment $environment)
@ -25,7 +26,7 @@ final class TaskRunner
/**
* Runs the task runner, receiving tasks from the parent and sending the result of those tasks.
*
* @return \Amp\Promise
* @return \Amp\Promise<null>
*/
public function run(): Promise
{
@ -33,8 +34,6 @@ final class TaskRunner
}
/**
* @coroutine
*
* @return \Generator
*/
private function execute(): \Generator
@ -42,14 +41,28 @@ final class TaskRunner
$job = yield $this->channel->receive();
while ($job instanceof Internal\Job) {
$receive = $this->channel->receive();
$source = new CancellationTokenSource;
$resolved = false;
try {
$result = yield call([$job->getTask(), "run"], $this->environment);
$result = new Internal\TaskSuccess($job->getId(), $result);
$receive->onResolve(static function (?\Throwable $exception) use (&$resolved, $source): void {
if (!$resolved) {
$source->cancel($exception);
}
});
$result = new Internal\TaskSuccess(
$job->getId(),
yield call([$job->getTask(), "run"], $this->environment, $source->getToken())
);
} catch (\Throwable $exception) {
$result = new Internal\TaskFailure($job->getId(), $exception);
} finally {
$resolved = true;
}
$job = null; // Free memory from last job.
$job = $source = null; // Free memory from last job.
try {
yield $this->channel->send($result);
@ -60,9 +73,9 @@ final class TaskRunner
$result = null; // Free memory from last result.
$job = yield $this->channel->receive();
while (!($job = yield $receive) instanceof Internal\Job && $job !== null) {
$receive = $this->channel->receive();
}
}
return $job;
}
}

View File

@ -2,7 +2,9 @@
namespace Amp\Parallel\Worker;
use Amp\CancellationToken;
use Amp\Failure;
use Amp\NullCancellationToken;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Sync\ChannelException;
@ -52,7 +54,7 @@ abstract class TaskWorker implements Worker
yield $pending;
}
yield $context->send(0);
yield $context->send(null);
return yield $context->join();
}), self::SHUTDOWN_TIMEOUT));
} catch (\Throwable $exception) {
@ -82,13 +84,15 @@ abstract class TaskWorker implements Worker
/**
* {@inheritdoc}
*/
public function enqueue(Task $task): Promise
public function enqueue(Task $task, ?CancellationToken $token = null): Promise
{
if ($this->exitStatus) {
throw new StatusError("The worker has been shut down");
}
$promise = $this->pending = call(function () use ($task): \Generator {
$token = $token ?? new NullCancellationToken;
$promise = $this->pending = call(function () use ($task, $token): \Generator {
if ($this->pending) {
try {
yield $this->pending;
@ -109,7 +113,16 @@ abstract class TaskWorker implements Worker
try {
yield $this->context->send($job);
$result = yield $this->context->receive();
$id = $token->subscribe(function () use ($job) {
$this->context->send($job->getId());
});
try {
$result = yield $this->context->receive();
} finally {
$token->unsubscribe($id);
}
} catch (ChannelException $exception) {
try {
yield Promise\timeout($this->context->join(), self::ERROR_TIMEOUT);
@ -162,7 +175,7 @@ abstract class TaskWorker implements Worker
yield Promise\any([$this->pending]);
}
yield $this->context->send(0);
yield $this->context->send(null);
try {
return yield Promise\timeout($this->context->join(), self::SHUTDOWN_TIMEOUT);

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Worker;
use Amp\CancellationToken;
use Amp\Promise;
/**
@ -27,10 +28,11 @@ interface Worker
* Enqueues a task to be executed by the worker.
*
* @param Task $task The task to enqueue.
* @param CancellationToken|null $token
*
* @return \Amp\Promise<mixed> Resolves with the return value of Task::run().
*/
public function enqueue(Task $task): Promise;
public function enqueue(Task $task, ?CancellationToken $token = null): Promise;
/**
* @return \Amp\Promise<int> Exit code.

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Worker;
use Amp\CancellationToken;
use Amp\Loop;
use Amp\Promise;
@ -11,9 +12,9 @@ const LOOP_FACTORY_IDENTIFIER = WorkerFactory::class;
/**
* Gets or sets the global worker pool.
*
* @param \Amp\Parallel\Worker\Pool|null $pool A worker pool instance.
* @param Pool|null $pool A worker pool instance.
*
* @return \Amp\Parallel\Worker\Pool The global worker pool instance.
* @return Pool The global worker pool instance.
*/
function pool(Pool $pool = null): Pool
{
@ -33,20 +34,21 @@ function pool(Pool $pool = null): Pool
/**
* Enqueues a task to be executed by the global worker pool.
*
* @param Task $task The task to enqueue.
* @param Task $task The task to enqueue.
* @param CancellationToken|null $token
*
* @return Promise<mixed>
*/
function enqueue(Task $task): Promise
function enqueue(Task $task, ?CancellationToken $token = null): Promise
{
return pool()->enqueue($task);
return pool()->enqueue($task, $token);
}
/**
* Enqueues a callable to be executed by the global worker pool.
*
* @param callable $callable Callable needs to be serializable.
* @param mixed ...$args Arguments have to be serializable.
* @param mixed ...$args Arguments have to be serializable.
*
* @return Promise<mixed>
*/
@ -58,7 +60,7 @@ function enqueueCallable(callable $callable, ...$args)
/**
* Gets a worker from the global worker pool.
*
* @return \Amp\Parallel\Worker\Worker
* @return Worker
*/
function worker(): Worker
{
@ -68,7 +70,7 @@ function worker(): Worker
/**
* Creates a worker using the global worker factory.
*
* @return \Amp\Parallel\Worker\Worker
* @return Worker
*/
function create(): Worker
{
@ -78,9 +80,9 @@ function create(): Worker
/**
* Gets or sets the global worker factory.
*
* @param \Amp\Parallel\Worker\WorkerFactory|null $factory
* @param WorkerFactory|null $factory
*
* @return \Amp\Parallel\Worker\WorkerFactory
* @return WorkerFactory
*/
function factory(WorkerFactory $factory = null): WorkerFactory
{

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Test\Worker;
use Amp\CancellationToken;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Sync\PanicError;
use Amp\Parallel\Sync\SerializationException;
@ -12,10 +13,11 @@ use Amp\Parallel\Worker\TaskError;
use Amp\Parallel\Worker\TaskException;
use Amp\Parallel\Worker\WorkerException;
use Amp\PHPUnit\AsyncTestCase;
use Amp\TimeoutCancellationToken;
class NonAutoloadableTask implements Task
{
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
return 1;
}
@ -229,7 +231,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase
try {
yield $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
}
});
@ -274,7 +276,7 @@ abstract class AbstractWorkerTest extends AsyncTestCase
$worker = $this->createWorker();
$promise1 = $worker->enqueue(new class implements Task { // Anonymous classes are not serializable.
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
}
});
@ -301,7 +303,44 @@ abstract class AbstractWorkerTest extends AsyncTestCase
$worker = $this->createWorker(BasicEnvironment::class, __DIR__ . '/Fixtures/not-found.php');
$this->assertTrue(yield $worker->enqueue(new Fixtures\AutoloadTestTask));
yield $worker->enqueue(new Fixtures\AutoloadTestTask);
yield $worker->shutdown();
}
public function testCancellableTask()
{
$this->expectException(TaskException::class);
$this->expectExceptionMessage('Uncaught Amp\CancelledException in worker with message "The operation was cancelled" and code "0"');
$worker = $this->createWorker();
yield $worker->enqueue(new Fixtures\CancellingTask, new TimeoutCancellationToken(100));
yield $worker->shutdown();
}
public function testEnqueueAfterCancelledTask()
{
$worker = $this->createWorker();
try {
yield $worker->enqueue(new Fixtures\CancellingTask, new TimeoutCancellationToken(100));
$this->fail(TaskException::class . ' did not fail enqueue promise');
} catch (TaskException $exception) {
// Task should be cancelled, ignore this exception.
}
$this->assertTrue(yield $worker->enqueue(new Fixtures\ConstantTask));
yield $worker->shutdown();
}
public function testCancellingCompletedTask()
{
$worker = $this->createWorker();
$this->assertTrue(yield $worker->enqueue(new Fixtures\ConstantTask(), new TimeoutCancellationToken(100)));
yield $worker->shutdown();
}

View File

@ -2,12 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\CancellationToken;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class AutoloadTestTask implements Task
{
public function run(Environment $environment): bool
public function run(Environment $environment, CancellationToken $token): bool
{
return \class_exists('CustomAutoloadClass', true);
}

View File

@ -0,0 +1,19 @@
<?php
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\CancellationToken;
use Amp\Deferred;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
use Amp\Promise;
class CancellingTask implements Task
{
public function run(Environment $environment, CancellationToken $token): Promise
{
$deferred = new Deferred;
$token->subscribe([$deferred, 'fail']);
return $deferred->promise();
}
}

View File

@ -2,12 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\CancellationToken;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class ConstantTask implements Task
{
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token): bool
{
return \defined("AMP_WORKER");
}

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\CancellationToken;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
@ -23,11 +24,12 @@ class FailingTask implements Task
* Runs the task inside the caller's context.
* Does not have to be a coroutine, can also be a regular function returning a value.
*
* @param \Amp\Parallel\Worker\Environment
* @param Environment $environment
* @param CancellationToken $token
*
* @return mixed|\Amp\Promise|\Generator
*/
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
$previous = $this->previousExceptionType ? new $this->previousExceptionType : null;
throw new $this->exceptionType('Test', 0, $previous);

View File

@ -2,12 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\CancellationToken;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class NonAutoloadableResultTask implements Task
{
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
require __DIR__ . "/non-autoloadable-class.php";
return new NonAutoloadableClass;

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\CancellationToken;
use Amp\Delayed;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
@ -17,7 +18,7 @@ class TestTask implements Task
$this->delay = $delay;
}
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
if ($this->delay) {
return new Delayed($this->delay, $this->returnValue);

View File

@ -2,12 +2,13 @@
namespace Amp\Parallel\Test\Worker\Fixtures;
use Amp\CancellationToken;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class UnserializableResultTask implements Task
{
public function run(Environment $environment)
public function run(Environment $environment, CancellationToken $token)
{
return function () {}; // Anonymous functions are not serializable.
}

View File

@ -2,6 +2,7 @@
namespace Amp\Parallel\Test\Worker;
use Amp\CancellationToken;
use Amp\Parallel\Worker;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Pool;
@ -35,7 +36,7 @@ class FunctionsTest extends AsyncTestCase
$pool = $this->createMock(Pool::class);
$pool->method('enqueue')
->will($this->returnCallback(function (Task $task): Promise {
return new Success($task->run($this->createMock(Environment::class)));
return new Success($task->run($this->createMock(Environment::class), $this->createMock(CancellationToken::class)));
}));
Worker\pool($pool);
@ -55,7 +56,7 @@ class FunctionsTest extends AsyncTestCase
$pool = $this->createMock(Pool::class);
$pool->method('enqueue')
->will($this->returnCallback(function (Task $task): Promise {
return new Success($task->run($this->createMock(Environment::class)));
return new Success($task->run($this->createMock(Environment::class), $this->createMock(CancellationToken::class)));
}));
Worker\pool($pool);