read()) !== null) {
 *     // Immediately use $chunk, reducing memory consumption since the entire message is never buffered.
 * }
 */
class Message implements InputStream, Promise {
    /** @var InputStream */
    private $source;

    /** @var string */
    private $buffer = "";

    /** @var \Amp\Deferred|null */
    private $pendingRead;

    /** @var \Amp\Coroutine */
    private $coroutine;

    /** @var bool True if onResolve() has been called. */
    private $buffering = false;

    /** @var \Amp\Deferred|null */
    private $backpressure;

    /** @var bool True if the iterator has completed. */
    private $complete = false;

    /**
     * @param InputStream $source An iterator that only emits strings.
     */
    public function __construct(InputStream $source) {
        $this->source = $source;
    }

    private function consume(): \Generator {
        while (($chunk = yield $this->source->read()) !== null) {
            $buffer = $this->buffer .= $chunk;

            if ($buffer === "") {
                continue; // Do not succeed reads with empty string.
            } elseif ($this->pendingRead) {
                $deferred = $this->pendingRead;
                $this->pendingRead = null;
                $this->buffer = "";
                $deferred->resolve($buffer);
                $buffer = ""; // Destroy last emitted chunk to free memory.
            } elseif (!$this->buffering) {
                $buffer = ""; // Destroy last emitted chunk to free memory.
                $this->backpressure = new Deferred;
                yield $this->backpressure->promise();
            }
        }

        $this->complete = true;

        if ($this->pendingRead) {
            $deferred = $this->pendingRead;
            $this->pendingRead = null;
            $deferred->resolve($this->buffer !== "" ? $this->buffer : null);
            $this->buffer = "";
        }

        return $this->buffer;
    }

    final public function read(): Promise {
        if ($this->pendingRead) {
            throw new PendingReadError;
        }

        if ($this->coroutine === null) {
            $this->coroutine = new Coroutine($this->consume());
        }

        if ($this->buffer !== "") {
            $buffer = $this->buffer;
            $this->buffer = "";

            if ($this->backpressure) {
                $backpressure = $this->backpressure;
                $this->backpressure = null;
                $backpressure->resolve();
            }

            return new Success($buffer);
        }

        if ($this->complete) {
            return new Success;
        }

        $this->pendingRead = new Deferred;
        return $this->pendingRead->promise();
    }

    /**
     * {@inheritdoc}
     */
    final public function onResolve(callable $onResolved) {
        $this->buffering = true;

        if ($this->coroutine === null) {
            $this->coroutine = new Coroutine($this->consume());
        }

        if ($this->backpressure) {
            $backpressure = $this->backpressure;
            $this->backpressure = null;
            $backpressure->resolve();
        }

        $this->coroutine->onResolve($onResolved);
    }

    /**
     * Exposes the source input stream.
     *
     * This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with
     * other promises.
     *
     * @return InputStream
     */
    final public function getInputStream(): InputStream {
        return $this->source;
    }
}