buffer = new Buffer($data);
        $this->reads = new \SplQueue;
    }

    /**
     * {@inheritdoc}
     *
     * @return bool Current value of the readable flag; close() sets it to false.
     */
    public function isReadable() {
        return $this->readable;
    }

    /**
     * {@inheritdoc}
     *
     * @return bool Current value of the writable flag; close() sets it to false.
     */
    public function isWritable() {
        return $this->writable;
    }

    /**
     * {@inheritdoc}
     *
     * Marks the stream as neither readable nor writable and fails every read that is still
     * queued with a single shared ClosedException.
     */
    public function close() {
        $this->readable = false;
        $this->writable = false;

        // Nothing is waiting: avoid building an exception that nobody would receive.
        if ($this->reads->isEmpty()) {
            return;
        }

        $reason = new ClosedException("The stream was unexpectedly closed");

        while (!$this->reads->isEmpty()) {
            // Queue entries are [bytes, delimiter, deferred]; only the deferred matters here.
            $entry = $this->reads->shift();

            /** @var \Amp\Deferred $pending */
            $pending = $entry[2];
            $pending->fail($reason);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Queues a read request and immediately tries to satisfy it from the buffer.
     *
     * @param int|null $bytes Maximum number of bytes wanted, or null for no length limit.
     * @param string|null $delimiter Optional delimiter at which the read may stop.
     *
     * @return mixed A Failure wrapping a \LogicException when the stream is no longer readable,
     *     otherwise the awaitable of the Deferred queued for this read.
     *
     * @throws \InvalidArgumentException If $bytes is not null and is not positive after the int cast.
     */
    public function read($bytes = null, $delimiter = null) {
        $limit = $bytes === null ? null : (int) $bytes;

        if ($limit !== null && $limit <= 0) {
            throw new \InvalidArgumentException("The number of bytes to read should be a positive integer or null");
        }

        if (!$this->readable) {
            $closed = new \LogicException("The stream has been closed");

            return new Failure($closed);
        }

        $pending = new Deferred;
        $request = [$limit, $delimiter, $pending];

        $this->reads->push($request);
        $this->checkPendingReads();

        return $pending->getAwaitable();
    }

    /**
     * Resolves queued reads from the buffer based on each read's requested length or delimiter.
*/
    private function checkPendingReads() {
        // Keep going only while there is both buffered data and a read waiting for it.
        while (!($this->buffer->isEmpty() || $this->reads->isEmpty())) {
            $request = $this->reads->shift();

            /** @var int|null $limit */
            $limit = $request[0];
            /** @var string|null $needle */
            $needle = $request[1];
            /** @var \Amp\Deferred $pending */
            $pending = $request[2];

            if ($needle !== null) {
                $offset = $this->buffer->search($needle);

                if ($offset !== false) {
                    // Length of the chunk up to and including the delimiter.
                    $span = $offset + \strlen($needle);

                    if ($limit === null || $span < $limit) {
                        $chunk = $this->buffer->shift($span);
                        $pending->resolve($chunk);
                        continue;
                    }
                }
            }

            if ($limit === null) {
                // No length limit and no usable delimiter match: hand over everything buffered.
                $chunk = $this->buffer->drain();
                $pending->resolve($chunk);
                continue;
            }

            if ($this->buffer->getLength() >= $limit) {
                $chunk = $this->buffer->shift($limit);
                $pending->resolve($chunk);
                continue;
            }

            // Not enough data yet: put the request back at the front and wait for more input.
            $this->reads->unshift([$limit, $needle, $pending]);

            return;
        }
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to send() without ending the stream.
     */
    public function write($data) {
        $keepOpen = false;

        return $this->send($data, $keepOpen);
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to send() with the end flag set, so the stream stops being writable.
     */
    public function end($data = '') {
        $finish = true;

        return $this->send($data, $finish);
    }

    /**
     * Appends data to the buffer and then tries to satisfy queued reads.
     *
     * @param string $data Data to append to the buffer.
     * @param bool $end When true, the stream is marked as not writable before the data is buffered.
     *
     * @return \Interop\Async\Awaitable A Failure wrapping a \LogicException if the stream is not
     *     writable, otherwise a Success wrapping the byte length of $data.
     */
    protected function send($data, $end = false) {
        if (!$this->writable) {
            $refused = new \LogicException("The stream is not writable");

            return new Failure($refused);
        }

        if ($end) {
            // Further writes are rejected, but this final payload is still buffered below.
            $this->writable = false;
        }

        $this->buffer->push($data);
        $this->checkPendingReads();

        $written = \strlen($data);

        return new Success($written);
    }
}