resource = $stream; $deferred = &$this->deferred; $readable = &$this->readable; $resource = &$this->resource; $this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use ( &$deferred, &$readable, &$resource, $chunkSize ) { if ($deferred === null) { return; } // Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes. $data = @\fread($stream, $chunkSize); if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) { $readable = false; $resource = null; Loop::cancel($watcher); $data = null; // Stream closed, resolve read with null. } $temp = $deferred; $deferred = null; $temp->resolve($data); if ($deferred === null) { // Only disable watcher if no further read was requested. Loop::disable($watcher); } }); Loop::disable($this->watcher); } /** * Reads data from the stream. * * @return Promise Resolves with a string when new data is available or `null` if the stream has closed. * * @throws PendingReadError Thrown if another read operation is still pending. */ public function read(): Promise { if ($this->deferred !== null) { throw new PendingReadError; } if (!$this->readable) { return new Success; // Resolve with null on closed stream. } $this->deferred = new Deferred; Loop::enable($this->watcher); return $this->deferred->promise(); } /** * Closes the stream forcefully. Multiple `close()` calls are ignored. * * @return void */ public function close() { $this->resource = null; $this->readable = false; if ($this->deferred !== null) { $deferred = $this->deferred; $this->deferred = null; $deferred->resolve(null); } Loop::cancel($this->watcher); } /** * @return resource|null The stream resource or null if the stream has closed. */ public function getResource() { return $this->resource; } public function __destruct() { if ($this->resource !== null) { $this->close(); } } }