diff --git a/lib/IteratorStream.php b/lib/IteratorStream.php new file mode 100644 index 0000000..791e4b2 --- /dev/null +++ b/lib/IteratorStream.php @@ -0,0 +1,32 @@ +iterator = $iterator; + } + + /** @inheritdoc */ + public function read(): Promise { + $deferred = new Deferred; + + $this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) { + if ($error) { + $deferred->fail($error); + } elseif ($hasNextElement) { + $deferred->resolve($this->iterator->getCurrent()); + } else { + $deferred->resolve(null); + } + }); + + return $deferred->promise(); + } +} diff --git a/lib/Message.php b/lib/Message.php index 016f340..5175578 100644 --- a/lib/Message.php +++ b/lib/Message.php @@ -4,28 +4,30 @@ namespace Amp\ByteStream; use Amp\Coroutine; use Amp\Deferred; -use Amp\Iterator; use Amp\Promise; use Amp\Success; /** - * Creates a buffered message from an Iterator. The message can be consumed in chunks using the read() API or it may be - * buffered and accessed in its entirety by waiting for the promise to resolve. + * Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may + * be buffered and accessed in its entirety by waiting for the promise to resolve. * * Buffering Example: * - * $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings. + * $stream = new Message($inputStream); * $content = yield $stream; * * Streaming Example: * - * $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings. + * $stream = new Message($inputStream); * * while (($chunk = yield $stream->read()) !== null) { * // Immediately use $chunk, reducing memory consumption since the entire message is never buffered. * } */ class Message implements InputStream, Promise { + /** @var InputStream */ + private $source; + /** @var string */ private $buffer = ""; @@ -45,15 +47,15 @@ class Message implements InputStream, Promise { private $complete = false; /** - * @param \Amp\Iterator $iterator An iterator that only emits strings. + * @param InputStream $source An iterator that only emits strings. */ - public function __construct(Iterator $iterator) { - $this->coroutine = new Coroutine($this->iterate($iterator)); + public function __construct(InputStream $source) { + $this->source = $source; } - private function iterate(Iterator $iterator): \Generator { - while (yield $iterator->advance()) { - $buffer = $this->buffer .= $iterator->getCurrent(); + private function consume(): \Generator { + while (($chunk = yield $this->source->read()) !== null) { + $buffer = $this->buffer .= $chunk; if ($buffer === "") { continue; // Do not succeed reads with empty string. @@ -62,12 +64,12 @@ class Message implements InputStream, Promise { $this->pendingRead = null; $this->buffer = ""; $deferred->resolve($buffer); + $buffer = ""; // Destroy last emitted chunk to free memory. } elseif (!$this->buffering) { + $buffer = ""; // Destroy last emitted chunk to free memory. $this->backpressure = new Deferred; yield $this->backpressure->promise(); } - - $buffer = ""; // Destroy last emitted chunk to free memory. } $this->complete = true; @@ -87,6 +89,10 @@ class Message implements InputStream, Promise { throw new PendingReadError; } + if ($this->coroutine === null) { + $this->coroutine = new Coroutine($this->consume()); + } + if ($this->buffer !== "") { $buffer = $this->buffer; $this->buffer = ""; @@ -114,6 +120,10 @@ class Message implements InputStream, Promise { public function onResolve(callable $onResolved) { $this->buffering = true; + if ($this->coroutine === null) { + $this->coroutine = new Coroutine($this->consume()); + } + if ($this->backpressure) { $backpressure = $this->backpressure; $this->backpressure = null; @@ -122,4 +132,16 @@ class Message implements InputStream, Promise { $this->coroutine->onResolve($onResolved); } + + /** + * Exposes the source input stream. + * + * This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with + * other promises. + * + * @return InputStream + */ + public function getInputStream(): InputStream { + return $this->source; + } } diff --git a/test/MessageTest.php b/test/MessageTest.php index 1366b65..04de2df 100644 --- a/test/MessageTest.php +++ b/test/MessageTest.php @@ -2,6 +2,7 @@ namespace Amp\ByteStream\Test; +use Amp\ByteStream\IteratorStream; use Amp\ByteStream\Message; use Amp\Emitter; use Amp\Loop; @@ -13,7 +14,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -30,7 +31,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -55,7 +56,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -78,7 +79,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -95,7 +96,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $emitter->emit($values[0]); @@ -119,7 +120,7 @@ class MessageTest extends TestCase { $value = "abc"; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $emitter->emit($value); $emitter->fail($exception); @@ -138,7 +139,7 @@ class MessageTest extends TestCase { Loop::run(function () { $emitter = new Emitter; $emitter->complete(); - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $this->assertNull(yield $stream->read()); }); @@ -149,7 +150,7 @@ class MessageTest extends TestCase { $value = "abc"; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $emitter->emit($value); $emitter->complete();