1
0
mirror of https://github.com/danog/amp.git synced 2024-11-26 20:15:00 +01:00

Merge pull request #130 from amphp/cancellation

Implement cancellation primitives, resolves #125
This commit is contained in:
Aaron Piotrowski 2017-05-16 11:32:42 -05:00 committed by GitHub
commit be2a9f8c07
4 changed files with 229 additions and 0 deletions

32
lib/CancellationToken.php Normal file
View File

@ -0,0 +1,32 @@
<?php
namespace Amp;
/**
* Cancellation tokens are simple objects that allow registering handlers to subscribe to cancellation requests.
*/
interface CancellationToken {
/**
* Subscribes a new handler to be invoked on a cancellation request.
*
* This handler might be invoked immediately in case the token has already been cancelled. Returned generators will
* automatically be run as coroutines. Any unhandled exceptions will be throw into the event loop.
*
* @param callable $callback Callback to be invoked on a cancellation request. Will receive a `CancelledException`
* as first argument that may be used to fail the operation's promise.
*
* @return string Identifier that can be used to cancel the subscription.
*/
public function subscribe(callable $callback): string;
/**
* Unsubscribes a previously registered handler.
*
* The handler will no longer be called as long as this method isn't invoked from a subscribed callback.
*
* @param string $id
*
* @return void
*/
public function unsubscribe(string $id);
}

View File

@ -0,0 +1,113 @@
<?php
namespace Amp;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\Promise\rethrow;
/**
* A cancellation token source provides a mechanism to cancel operations.
*
* Cancellation of operation works by creating a cancellation token source and passing the corresponding token when
* starting the operation. To cancel the operation, invoke `CancellationTokenSource::cancel()`.
*
* Any operation can decide what to do on a cancellation request, it has "don't care" semantics. An operation SHOULD be
* aborted, but MAY continue. Example: A DNS client might continue to receive and cache the response, as the query has
* been sent anyway. An HTTP client would usually close a connection, but might not do so in case a response is close to
* be fully received to reuse the connection.
*
* **Example**
*
* ```php
* $cancellationTokenSource = new CancellationTokenSource;
*
* $response = yield $httpClient->request("https://example.com/stream");
* $responseBody = $response->getBody();
*
* while (($chunk = yield $response->read()) !== null) {
* // consume $chunk
*
* if ($noLongerInterested) {
* $cancellationTokenSource->cancel();
* break;
* }
* }
* ```
*
* @see CancellationToken
* @see CancelledException
*/
final class CancellationTokenSource {
private $token;
private $onCancel;
public function __construct() {
$this->token = new class($this->onCancel) implements CancellationToken {
private $nextId = "a";
private $callbacks = [];
private $exception = null;
public function __construct(&$onCancel) {
$onCancel = function (\Throwable $exception) {
$this->exception = $exception;
$callbacks = $this->callbacks;
$this->callbacks = [];
foreach ($callbacks as $callback) {
$this->invokeCallback($callback);
}
};
}
private function invokeCallback($callback) {
// No type declaration to prevent exception outside the try!
try {
$result = $callback($this->exception);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
Loop::defer(static function () use ($exception) {
throw $exception;
});
}
}
public function subscribe(callable $callback): string {
$id = $this->nextId++;
if ($this->exception) {
$this->invokeCallback($callback);
} else {
$this->callbacks[$id] = $callback;
}
return $id;
}
public function unsubscribe(string $id) {
unset($this->callbacks[$id]);
}
};
}
public function getToken(): CancellationToken {
return $this->token;
}
public function cancel() {
if ($this->onCancel === null) {
return;
}
$onCancel = $this->onCancel;
$this->onCancel = null;
$onCancel(new CancelledException);
}
}

View File

@ -0,0 +1,12 @@
<?php
namespace Amp;
/**
* Will be thrown in case an operation is cancelled.
*
* @see CancellationToken
* @see CancellationTokenSource
*/
class CancelledException extends \Exception {
}

72
test/CancellationTest.php Normal file
View File

@ -0,0 +1,72 @@
<?php
namespace Amp\Test;
use Amp\CancellationToken;
use Amp\CancellationTokenSource;
use Amp\Emitter;
use Amp\Loop;
use PHPUnit\Framework\TestCase;
use function Amp\asyncCall;
class CancellationTest extends TestCase {
private function createAsyncIterator(CancellationToken $cancellationToken) {
$emitter = new Emitter;
asyncCall(function () use ($emitter, $cancellationToken) {
$running = true;
$cancellationToken->subscribe(function () use (&$running) {
$running = false;
});
$i = 0;
while ($running) {
yield $emitter->emit($i++);
}
});
return $emitter->iterate();
}
public function testCancellationCancelsIterator() {
Loop::run(function () {
$cancellationSource = new CancellationTokenSource;
$iterator = $this->createAsyncIterator($cancellationSource->getToken());
$current = null;
while (yield $iterator->advance()) {
$current = $iterator->getCurrent();
$this->assertInternalType("int", $current);
if ($current === 3) {
$cancellationSource->cancel();
}
}
$this->assertSame(3, $current);
});
}
public function testUnsubscribeWorks() {
Loop::run(function () {
$cancellationSource = new CancellationTokenSource;
$id = $cancellationSource->getToken()->subscribe(function () {
$this->fail("Callback has been called");
});
$cancellationSource->getToken()->subscribe(function () {
$this->assertTrue(true);
});
$cancellationSource->getToken()->unsubscribe($id);
$cancellationSource->cancel();
});
}
}