mirror of
https://github.com/danog/byte-stream.git
synced 2024-11-30 04:19:23 +01:00
Merge pull request #9 from amphp/message-input-stream
Make Message accept an InputStream instead of an Iterator
This commit is contained in:
commit
ad249a8fc8
32
lib/IteratorStream.php
Normal file
32
lib/IteratorStream.php
Normal file
@ -0,0 +1,32 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\ByteStream;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Iterator;
|
||||
use Amp\Promise;
|
||||
|
||||
class IteratorStream implements InputStream {
|
||||
private $iterator;
|
||||
|
||||
public function __construct(Iterator $iterator) {
|
||||
$this->iterator = $iterator;
|
||||
}
|
||||
|
||||
/** @inheritdoc */
|
||||
public function read(): Promise {
|
||||
$deferred = new Deferred;
|
||||
|
||||
$this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) {
|
||||
if ($error) {
|
||||
$deferred->fail($error);
|
||||
} elseif ($hasNextElement) {
|
||||
$deferred->resolve($this->iterator->getCurrent());
|
||||
} else {
|
||||
$deferred->resolve(null);
|
||||
}
|
||||
});
|
||||
|
||||
return $deferred->promise();
|
||||
}
|
||||
}
|
@ -4,28 +4,30 @@ namespace Amp\ByteStream;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Deferred;
|
||||
use Amp\Iterator;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
|
||||
/**
|
||||
* Creates a buffered message from an Iterator. The message can be consumed in chunks using the read() API or it may be
|
||||
* buffered and accessed in its entirety by waiting for the promise to resolve.
|
||||
* Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may
|
||||
* be buffered and accessed in its entirety by waiting for the promise to resolve.
|
||||
*
|
||||
* Buffering Example:
|
||||
*
|
||||
* $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings.
|
||||
* $stream = new Message($inputStream);
|
||||
* $content = yield $stream;
|
||||
*
|
||||
* Streaming Example:
|
||||
*
|
||||
* $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings.
|
||||
* $stream = new Message($inputStream);
|
||||
*
|
||||
* while (($chunk = yield $stream->read()) !== null) {
|
||||
* // Immediately use $chunk, reducing memory consumption since the entire message is never buffered.
|
||||
* }
|
||||
*/
|
||||
class Message implements InputStream, Promise {
|
||||
/** @var InputStream */
|
||||
private $source;
|
||||
|
||||
/** @var string */
|
||||
private $buffer = "";
|
||||
|
||||
@ -45,15 +47,15 @@ class Message implements InputStream, Promise {
|
||||
private $complete = false;
|
||||
|
||||
/**
|
||||
* @param \Amp\Iterator $iterator An iterator that only emits strings.
|
||||
* @param InputStream $source An iterator that only emits strings.
|
||||
*/
|
||||
public function __construct(Iterator $iterator) {
|
||||
$this->coroutine = new Coroutine($this->iterate($iterator));
|
||||
public function __construct(InputStream $source) {
|
||||
$this->source = $source;
|
||||
}
|
||||
|
||||
private function iterate(Iterator $iterator): \Generator {
|
||||
while (yield $iterator->advance()) {
|
||||
$buffer = $this->buffer .= $iterator->getCurrent();
|
||||
private function consume(): \Generator {
|
||||
while (($chunk = yield $this->source->read()) !== null) {
|
||||
$buffer = $this->buffer .= $chunk;
|
||||
|
||||
if ($buffer === "") {
|
||||
continue; // Do not succeed reads with empty string.
|
||||
@ -62,12 +64,12 @@ class Message implements InputStream, Promise {
|
||||
$this->pendingRead = null;
|
||||
$this->buffer = "";
|
||||
$deferred->resolve($buffer);
|
||||
$buffer = ""; // Destroy last emitted chunk to free memory.
|
||||
} elseif (!$this->buffering) {
|
||||
$buffer = ""; // Destroy last emitted chunk to free memory.
|
||||
$this->backpressure = new Deferred;
|
||||
yield $this->backpressure->promise();
|
||||
}
|
||||
|
||||
$buffer = ""; // Destroy last emitted chunk to free memory.
|
||||
}
|
||||
|
||||
$this->complete = true;
|
||||
@ -87,6 +89,10 @@ class Message implements InputStream, Promise {
|
||||
throw new PendingReadError;
|
||||
}
|
||||
|
||||
if ($this->coroutine === null) {
|
||||
$this->coroutine = new Coroutine($this->consume());
|
||||
}
|
||||
|
||||
if ($this->buffer !== "") {
|
||||
$buffer = $this->buffer;
|
||||
$this->buffer = "";
|
||||
@ -114,6 +120,10 @@ class Message implements InputStream, Promise {
|
||||
public function onResolve(callable $onResolved) {
|
||||
$this->buffering = true;
|
||||
|
||||
if ($this->coroutine === null) {
|
||||
$this->coroutine = new Coroutine($this->consume());
|
||||
}
|
||||
|
||||
if ($this->backpressure) {
|
||||
$backpressure = $this->backpressure;
|
||||
$this->backpressure = null;
|
||||
@ -122,4 +132,16 @@ class Message implements InputStream, Promise {
|
||||
|
||||
$this->coroutine->onResolve($onResolved);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exposes the source input stream.
|
||||
*
|
||||
* This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with
|
||||
* other promises.
|
||||
*
|
||||
* @return InputStream
|
||||
*/
|
||||
public function getInputStream(): InputStream {
|
||||
return $this->source;
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
namespace Amp\ByteStream\Test;
|
||||
|
||||
use Amp\ByteStream\IteratorStream;
|
||||
use Amp\ByteStream\Message;
|
||||
use Amp\Emitter;
|
||||
use Amp\Loop;
|
||||
@ -13,7 +14,7 @@ class MessageTest extends TestCase {
|
||||
$values = ["abc", "def", "ghi"];
|
||||
|
||||
$emitter = new Emitter;
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
foreach ($values as $value) {
|
||||
$emitter->emit($value);
|
||||
@ -30,7 +31,7 @@ class MessageTest extends TestCase {
|
||||
$values = ["abc", "def", "ghi"];
|
||||
|
||||
$emitter = new Emitter;
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
foreach ($values as $value) {
|
||||
$emitter->emit($value);
|
||||
@ -55,7 +56,7 @@ class MessageTest extends TestCase {
|
||||
$values = ["abc", "def", "ghi"];
|
||||
|
||||
$emitter = new Emitter;
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
foreach ($values as $value) {
|
||||
$emitter->emit($value);
|
||||
@ -78,7 +79,7 @@ class MessageTest extends TestCase {
|
||||
$values = ["abc", "def", "ghi"];
|
||||
|
||||
$emitter = new Emitter;
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
foreach ($values as $value) {
|
||||
$emitter->emit($value);
|
||||
@ -95,7 +96,7 @@ class MessageTest extends TestCase {
|
||||
$values = ["abc", "def", "ghi"];
|
||||
|
||||
$emitter = new Emitter;
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
$emitter->emit($values[0]);
|
||||
|
||||
@ -119,7 +120,7 @@ class MessageTest extends TestCase {
|
||||
$value = "abc";
|
||||
|
||||
$emitter = new Emitter;
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
$emitter->emit($value);
|
||||
$emitter->fail($exception);
|
||||
@ -138,7 +139,7 @@ class MessageTest extends TestCase {
|
||||
Loop::run(function () {
|
||||
$emitter = new Emitter;
|
||||
$emitter->complete();
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
$this->assertNull(yield $stream->read());
|
||||
});
|
||||
@ -149,7 +150,7 @@ class MessageTest extends TestCase {
|
||||
$value = "abc";
|
||||
|
||||
$emitter = new Emitter;
|
||||
$stream = new Message($emitter->iterate());
|
||||
$stream = new Message(new IteratorStream($emitter->iterate()));
|
||||
|
||||
$emitter->emit($value);
|
||||
$emitter->complete();
|
||||
|
Loading…
Reference in New Issue
Block a user