request("https://example.com/stream");
 * $responseBody = $response->getBody();
 *
 * while (($chunk = yield $response->read()) !== null) {
 *     // consume $chunk
 *
 *     if ($noLongerInterested) {
 *         $cancellationTokenSource->cancel();
 *         break;
 *     }
 * }
 * ```
 *
 * @see CancellationToken
 * @see CancelledException
 */
final class CancellationTokenSource
{
    /** @var CancellationToken Token object created in the constructor and returned by getToken(). */
    private $token;

    /** @var callable|null Closure installed by the token's constructor; reset to null once cancel() has run. */
    private $onCancel;

    /**
     * Creates the token. The token's constructor receives $this->onCancel by reference and stores
     * in it the closure that cancel() later calls.
     */
    public function __construct()
    {
        $this->token = new class($this->onCancel) implements CancellationToken {
            /** @var string Identifier returned by the next subscribe() call; advanced with ++. */
            private $nextId = "a";

            /** @var callable[] Subscribed callbacks, keyed by their identifier. */
            private $callbacks = [];

            /** @var \Throwable|null Exception passed to the cancel closure, or null before cancellation. */
            private $exception = null;

            /**
             * Writes the cancel closure into the referenced variable.
             *
             * @param callable|null $onCancel Assigned by reference.
             */
            public function __construct(&$onCancel)
            {
                $onCancel = function (\Throwable $exception) {
                    $this->exception = $exception;

                    // Clear the registered list before invoking anything, then work on the copy.
                    $pending = $this->callbacks;
                    $this->callbacks = [];

                    foreach ($pending as $subscriber) {
                        $this->invokeCallback($subscriber);
                    }
                };
            }

            /**
             * Calls one subscriber with the stored exception.
             *
             * The parameter intentionally has no type declaration, to prevent an exception
             * being thrown outside the try block.
             *
             * @param callable $callback
             */
            private function invokeCallback($callback)
            {
                try {
                    $returned = $callback($this->exception);

                    if ($returned instanceof \Generator) {
                        $returned = new Coroutine($returned);
                    }

                    $isPromise = $returned instanceof Promise || $returned instanceof ReactPromise;

                    if ($isPromise) {
                        rethrow($returned);
                    }
                } catch (\Throwable $failure) {
                    // Anything thrown above is rethrown from a callback handed to Loop::defer().
                    Loop::defer(static function () use ($failure) {
                        throw $failure;
                    });
                }
            }

            /**
             * Registers a callback, or invokes it right away when an exception is already stored.
             *
             * @param callable $callback Called with the stored exception.
             *
             * @return string Identifier that can be passed to unsubscribe().
             */
            public function subscribe(callable $callback): string
            {
                $id = $this->nextId++;

                if ($this->exception === null) {
                    $this->callbacks[$id] = $callback;

                    return $id;
                }

                $this->invokeCallback($callback);

                return $id;
            }

            /**
             * Removes the callback stored under the given identifier, if any.
             *
             * @param string $id Identifier returned by subscribe().
             */
            public function unsubscribe(string $id)
            {
                unset($this->callbacks[$id]);
            }
        };
    }

    /**
     * @return CancellationToken The token created in the constructor.
     */
    public function getToken(): CancellationToken
    {
        return $this->token;
    }

    /**
     * Invokes the token's cancel closure with a new CancelledException.
     *
     * Only the first call has an effect; the closure is dropped before it is invoked.
     */
    public function cancel()
    {
        $handler = $this->onCancel;

        if ($handler === null) {
            return;
        }

        $this->onCancel = null;
        $handler(new CancelledException);
    }
}