From 71745c6105c5c5b00d74eaa2a0a8493ea277e70a Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 14 May 2017 16:40:42 +0200 Subject: [PATCH] Make Message accept an InputStream instead of an Iterator The purpose of Message is to allow streaming and buffering of an InputStream with a simple API. Before this commit, Message served a second purpose: Converting an Amp\Iterator to an InputStream. This has been separated to allow ZlibInputStream and other InputStreams to be used as Message. Converting an Amp\Iterator to an InputStream is now possible using the new IteratorStream class. --- lib/IteratorStream.php | 32 ++++++++++++++++++++++++++++ lib/Message.php | 48 ++++++++++++++++++++++++++++++------------ test/MessageTest.php | 17 ++++++++------- 3 files changed, 76 insertions(+), 21 deletions(-) create mode 100644 lib/IteratorStream.php diff --git a/lib/IteratorStream.php b/lib/IteratorStream.php new file mode 100644 index 0000000..791e4b2 --- /dev/null +++ b/lib/IteratorStream.php @@ -0,0 +1,32 @@ +iterator = $iterator; + } + + /** @inheritdoc */ + public function read(): Promise { + $deferred = new Deferred; + + $this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) { + if ($error) { + $deferred->fail($error); + } elseif ($hasNextElement) { + $deferred->resolve($this->iterator->getCurrent()); + } else { + $deferred->resolve(null); + } + }); + + return $deferred->promise(); + } +} diff --git a/lib/Message.php b/lib/Message.php index 016f340..5175578 100644 --- a/lib/Message.php +++ b/lib/Message.php @@ -4,28 +4,30 @@ namespace Amp\ByteStream; use Amp\Coroutine; use Amp\Deferred; -use Amp\Iterator; use Amp\Promise; use Amp\Success; /** - * Creates a buffered message from an Iterator. The message can be consumed in chunks using the read() API or it may be - * buffered and accessed in its entirety by waiting for the promise to resolve. + * Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may + * be buffered and accessed in its entirety by waiting for the promise to resolve. * * Buffering Example: * - * $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings. + * $stream = new Message($inputStream); * $content = yield $stream; * * Streaming Example: * - * $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings. + * $stream = new Message($inputStream); * * while (($chunk = yield $stream->read()) !== null) { * // Immediately use $chunk, reducing memory consumption since the entire message is never buffered. * } */ class Message implements InputStream, Promise { + /** @var InputStream */ + private $source; + /** @var string */ private $buffer = ""; @@ -45,15 +47,15 @@ class Message implements InputStream, Promise { private $complete = false; /** - * @param \Amp\Iterator $iterator An iterator that only emits strings. + * @param InputStream $source An iterator that only emits strings. */ - public function __construct(Iterator $iterator) { - $this->coroutine = new Coroutine($this->iterate($iterator)); + public function __construct(InputStream $source) { + $this->source = $source; } - private function iterate(Iterator $iterator): \Generator { - while (yield $iterator->advance()) { - $buffer = $this->buffer .= $iterator->getCurrent(); + private function consume(): \Generator { + while (($chunk = yield $this->source->read()) !== null) { + $buffer = $this->buffer .= $chunk; if ($buffer === "") { continue; // Do not succeed reads with empty string. @@ -62,12 +64,12 @@ class Message implements InputStream, Promise { $this->pendingRead = null; $this->buffer = ""; $deferred->resolve($buffer); + $buffer = ""; // Destroy last emitted chunk to free memory. } elseif (!$this->buffering) { + $buffer = ""; // Destroy last emitted chunk to free memory. $this->backpressure = new Deferred; yield $this->backpressure->promise(); } - - $buffer = ""; // Destroy last emitted chunk to free memory. } $this->complete = true; @@ -87,6 +89,10 @@ class Message implements InputStream, Promise { throw new PendingReadError; } + if ($this->coroutine === null) { + $this->coroutine = new Coroutine($this->consume()); + } + if ($this->buffer !== "") { $buffer = $this->buffer; $this->buffer = ""; @@ -114,6 +120,10 @@ class Message implements InputStream, Promise { public function onResolve(callable $onResolved) { $this->buffering = true; + if ($this->coroutine === null) { + $this->coroutine = new Coroutine($this->consume()); + } + if ($this->backpressure) { $backpressure = $this->backpressure; $this->backpressure = null; @@ -122,4 +132,16 @@ class Message implements InputStream, Promise { $this->coroutine->onResolve($onResolved); } + + /** + * Exposes the source input stream. + * + * This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with + * other promises. + * + * @return InputStream + */ + public function getInputStream(): InputStream { + return $this->source; + } } diff --git a/test/MessageTest.php b/test/MessageTest.php index 1366b65..04de2df 100644 --- a/test/MessageTest.php +++ b/test/MessageTest.php @@ -2,6 +2,7 @@ namespace Amp\ByteStream\Test; +use Amp\ByteStream\IteratorStream; use Amp\ByteStream\Message; use Amp\Emitter; use Amp\Loop; @@ -13,7 +14,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -30,7 +31,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -55,7 +56,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -78,7 +79,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); foreach ($values as $value) { $emitter->emit($value); @@ -95,7 +96,7 @@ class MessageTest extends TestCase { $values = ["abc", "def", "ghi"]; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $emitter->emit($values[0]); @@ -119,7 +120,7 @@ class MessageTest extends TestCase { $value = "abc"; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $emitter->emit($value); $emitter->fail($exception); @@ -138,7 +139,7 @@ class MessageTest extends TestCase { Loop::run(function () { $emitter = new Emitter; $emitter->complete(); - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $this->assertNull(yield $stream->read()); }); @@ -149,7 +150,7 @@ class MessageTest extends TestCase { $value = "abc"; $emitter = new Emitter; - $stream = new Message($emitter->iterate()); + $stream = new Message(new IteratorStream($emitter->iterate())); $emitter->emit($value); $emitter->complete();