1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 13:21:16 +01:00
amp/lib/CancellationTokenSource.php

143 lines
4.1 KiB
PHP
Raw Normal View History

<?php
namespace Amp;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\Promise\rethrow;
/**
* A cancellation token source provides a mechanism to cancel operations.
*
* Cancellation of operation works by creating a cancellation token source and passing the corresponding token when
* starting the operation. To cancel the operation, invoke `CancellationTokenSource::cancel()`.
*
* Any operation can decide what to do on a cancellation request, it has "don't care" semantics. An operation SHOULD be
* aborted, but MAY continue. Example: A DNS client might continue to receive and cache the response, as the query has
* been sent anyway. An HTTP client would usually close a connection, but might not do so in case a response is close to
* be fully received to reuse the connection.
*
* **Example**
*
* ```php
2017-05-16 21:46:52 +02:00
* $tokenSource = new CancellationTokenSource;
* $token = $tokenSource->getToken();
*
2017-05-16 21:46:52 +02:00
* $response = yield $httpClient->request("https://example.com/stream", $token);
* $responseBody = $response->getBody();
*
* while (($chunk = yield $response->read()) !== null) {
* // consume $chunk
*
* if ($noLongerInterested) {
* $cancellationTokenSource->cancel();
* break;
* }
* }
* ```
*
* @see CancellationToken
* @see CancelledException
*/
2018-06-18 20:00:01 +02:00
final class CancellationTokenSource
{
private $token;
private $onCancel;
2018-06-18 20:00:01 +02:00
public function __construct()
{
$this->token = new class($this->onCancel) implements CancellationToken {
/** @var string */
private $nextId = "a";
/** @var callable[] */
private $callbacks = [];
/** @var \Throwable|null */
private $exception = null;
2018-06-18 20:00:01 +02:00
public function __construct(&$onCancel)
{
$onCancel = function (\Throwable $exception) {
$this->exception = $exception;
$callbacks = $this->callbacks;
$this->callbacks = [];
foreach ($callbacks as $callback) {
$this->invokeCallback($callback);
}
};
}
2018-06-18 20:00:01 +02:00
private function invokeCallback($callback)
{
// No type declaration to prevent exception outside the try!
try {
$result = $callback($this->exception);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
Loop::defer(static function () use ($exception) {
throw $exception;
});
}
}
2018-06-18 20:00:01 +02:00
public function subscribe(callable $callback): string
{
$id = $this->nextId++;
if ($this->exception) {
$this->invokeCallback($callback);
} else {
$this->callbacks[$id] = $callback;
}
return $id;
}
2018-06-18 20:00:01 +02:00
public function unsubscribe(string $id)
{
unset($this->callbacks[$id]);
}
2018-06-18 20:00:01 +02:00
public function isRequested(): bool
{
return isset($this->exception);
}
2018-06-18 20:00:01 +02:00
public function throwIfRequested()
{
if (isset($this->exception)) {
throw $this->exception;
}
}
};
}
2018-06-18 20:00:01 +02:00
public function getToken(): CancellationToken
{
return $this->token;
}
2017-06-13 12:41:47 -05:00
/**
* @param \Throwable|null $previous Exception to be used as the previous exception to CancelledException.
*/
2018-06-18 20:00:01 +02:00
public function cancel(\Throwable $previous = null)
{
if ($this->onCancel === null) {
return;
}
$onCancel = $this->onCancel;
$this->onCancel = null;
2017-06-13 12:41:47 -05:00
$onCancel(new CancelledException($previous));
}
}