getToken(); * * $response = yield $httpClient->request("https://example.com/pipeline", $token); * $responseBody = $response->getBody(); * * while (($chunk = yield $response->read()) !== null) { * // consume $chunk * * if ($noLongerInterested) { * $cancellationTokenSource->cancel(); * break; * } * } * ``` * * @see CancellationToken * @see CancelledException */ final class CancellationTokenSource { private CancellationToken $token; /** @var callable|null */ private $onCancel; public function __construct() { $onCancel = &$this->onCancel; $this->token = new class($onCancel) implements CancellationToken { private string $nextId = "a"; /** @var callable[] */ private array $callbacks = []; /** @var \Throwable|null */ private ?\Throwable $exception = null; /** * @param callable|null $onCancel * @param-out callable $onCancel */ public function __construct(?callable &$onCancel) { $onCancel = function (\Throwable $exception): void { $this->exception = $exception; $callbacks = $this->callbacks; $this->callbacks = []; foreach ($callbacks as $callback) { defer($callback, $this->exception); } }; } public function subscribe(callable $callback): string { $id = $this->nextId++; if ($this->exception) { defer($callback, $this->exception); } else { $this->callbacks[$id] = $callback; } return $id; } public function unsubscribe(string $id): void { unset($this->callbacks[$id]); } public function isRequested(): bool { return isset($this->exception); } public function throwIfRequested(): void { if (isset($this->exception)) { throw $this->exception; } } }; } public function getToken(): CancellationToken { return $this->token; } /** * @param \Throwable|null $previous Exception to be used as the previous exception to CancelledException. */ public function cancel(\Throwable $previous = null): void { if ($this->onCancel === null) { return; } $onCancel = $this->onCancel; $this->onCancel = null; $onCancel(new CancelledException($previous)); } }