1
0
mirror of https://github.com/danog/amp.git synced 2025-01-23 05:41:25 +01:00

Reduce overhead for timeout cancellation tokens

This commit is contained in:
Aaron Piotrowski 2021-08-30 00:07:10 -05:00
parent eab76ca303
commit 73fb73614e
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
6 changed files with 127 additions and 81 deletions

View File

@ -2,9 +2,6 @@
namespace Amp; namespace Amp;
use Revolt\EventLoop\Loop;
use function Revolt\EventLoop\defer;
/** /**
* A cancellation token source provides a mechanism to cancel operations. * A cancellation token source provides a mechanism to cancel operations.
* *
@ -40,72 +37,13 @@ use function Revolt\EventLoop\defer;
*/ */
final class CancellationTokenSource final class CancellationTokenSource
{ {
private Internal\CancellableToken $source;
private CancellationToken $token; private CancellationToken $token;
/** @var callable|null */
private $onCancel;
public function __construct() public function __construct()
{ {
$onCancel = &$this->onCancel; $this->source = new Internal\CancellableToken;
$this->token = new Internal\WrappedCancellationToken($this->source);
$this->token = new class($onCancel) implements CancellationToken {
private string $nextId = "a";
/** @var callable[] */
private array $callbacks = [];
/** @var \Throwable|null */
private ?\Throwable $exception = null;
/**
* @param callable|null $onCancel
* @param-out callable $onCancel
*/
public function __construct(?callable &$onCancel)
{
$onCancel = function (\Throwable $exception): void {
$this->exception = $exception;
$callbacks = $this->callbacks;
$this->callbacks = [];
foreach ($callbacks as $callback) {
Loop::queue($callback, $this->exception);
}
};
}
public function subscribe(callable $callback): string
{
$id = $this->nextId++;
if ($this->exception) {
Loop::queue($callback, $this->exception);
} else {
$this->callbacks[$id] = $callback;
}
return $id;
}
public function unsubscribe(string $id): void
{
unset($this->callbacks[$id]);
}
public function isRequested(): bool
{
return isset($this->exception);
}
public function throwIfRequested(): void
{
if (isset($this->exception)) {
throw $this->exception;
}
}
};
} }
public function getToken(): CancellationToken public function getToken(): CancellationToken
@ -116,14 +54,8 @@ final class CancellationTokenSource
/** /**
* @param \Throwable|null $previous Exception to be used as the previous exception to CancelledException. * @param \Throwable|null $previous Exception to be used as the previous exception to CancelledException.
*/ */
public function cancel(\Throwable $previous = null): void public function cancel(?\Throwable $previous = null): void
{ {
if ($this->onCancel === null) { $this->source->cancel($previous);
return;
}
$onCancel = $this->onCancel;
$this->onCancel = null;
$onCancel(new CancelledException($previous));
} }
} }

View File

@ -10,7 +10,7 @@ namespace Amp;
*/ */
class CancelledException extends \Exception class CancelledException extends \Exception
{ {
public function __construct(\Throwable $previous = null) public function __construct(?\Throwable $previous = null)
{ {
parent::__construct("The operation was cancelled", 0, $previous); parent::__construct("The operation was cancelled", 0, $previous);
} }

View File

@ -0,0 +1,69 @@
<?php
namespace Amp\Internal;
use Amp\CancellationToken;
use Amp\CancelledException;
use Revolt\EventLoop\Loop;
/**
* Cancellation Token with public cancellation method.
*
* @internal
*/
final class CancellableToken implements CancellationToken
{
private string $nextId = "a";
/** @var callable[] */
private array $callbacks = [];
/** @var \Throwable|null */
private ?\Throwable $exception = null;
public function cancel(?\Throwable $previous = null): void
{
if (isset($this->exception)) {
return;
}
$this->exception = new CancelledException($previous);
$callbacks = $this->callbacks;
$this->callbacks = [];
foreach ($callbacks as $callback) {
Loop::queue($callback, $this->exception);
}
}
public function subscribe(callable $callback): string
{
$id = $this->nextId++;
if ($this->exception) {
Loop::queue($callback, $this->exception);
} else {
$this->callbacks[$id] = $callback;
}
return $id;
}
public function unsubscribe(string $id): void
{
unset($this->callbacks[$id]);
}
public function isRequested(): bool
{
return isset($this->exception);
}
public function throwIfRequested(): void
{
if (isset($this->exception)) {
throw $this->exception;
}
}
}

View File

@ -0,0 +1,36 @@
<?php
namespace Amp\Internal;
use Amp\CancellationToken;
/**
* @internal
*/
final class WrappedCancellationToken implements CancellationToken
{
public function __construct(
private CancellationToken $token
) {
}
public function subscribe(callable $callback): string
{
return $this->token->subscribe($callback);
}
public function unsubscribe(string $id): void
{
$this->token->unsubscribe($id);
}
public function isRequested(): bool
{
return $this->token->isRequested();
}
public function throwIfRequested(): void
{
$this->token->throwIfRequested();
}
}

View File

@ -19,13 +19,19 @@ final class TimeoutCancellationToken implements CancellationToken
*/ */
public function __construct(float $timeout, string $message = "Operation timed out") public function __construct(float $timeout, string $message = "Operation timed out")
{ {
$source = new CancellationTokenSource; $this->token = $source = new Internal\CancellableToken;
$this->token = $source->getToken();
$trace = null; // Defined in case assertions are disabled.
\assert($trace = \debug_backtrace());
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
$this->watcher = Loop::delay($timeout, static function () use ($source, $message, $trace): void { $this->watcher = Loop::delay($timeout, static function () use ($source, $message, $trace): void {
$trace = Internal\formatStacktrace($trace); if ($trace) {
$source->cancel(new TimeoutException("$message\r\nTimeoutCancellationToken was created here:\r\n$trace")); $message .= \sprintf("\r\n%s was created here: %s", self::class, Internal\formatStacktrace($trace));
} else {
$message .= \sprintf(" (Enable assertions for a backtrace of the %s creation)", self::class);
}
$source->cancel(new TimeoutException($message));
}); });
Loop::unreference($this->watcher); Loop::unreference($this->watcher);

View File

@ -26,10 +26,13 @@ class TimeoutCancellationTokenTest extends AsyncTestCase
self::assertInstanceOf(TimeoutException::class, $exception->getPrevious()); self::assertInstanceOf(TimeoutException::class, $exception->getPrevious());
$message = $exception->getPrevious()->getMessage(); $message = $exception->getPrevious()->getMessage();
if ((int)ini_get('zend.assertions') > 0) {
self::assertStringContainsString('TimeoutCancellationToken was created here', $message); self::assertStringContainsString('TimeoutCancellationToken was created here', $message);
self::assertStringContainsString('TimeoutCancellationTokenTest.php:' . $line, $message); self::assertStringContainsString('TimeoutCancellationTokenTest.php:' . $line, $message);
} }
} }
}
public function testWatcherCancellation(): void public function testWatcherCancellation(): void
{ {