mirror of
https://github.com/danog/byte-stream.git
synced 2024-11-30 04:19:23 +01:00
parent
704adf70cf
commit
4777508637
@ -4,6 +4,7 @@ namespace Amp\ByteStream;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Deferred;
|
||||
use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
|
||||
@ -49,6 +50,9 @@ class Message implements InputStream, Promise {
|
||||
/** @var bool True if the iterator has completed. */
|
||||
private $complete = false;
|
||||
|
||||
/** @var \Throwable Used to fail future reads on failure. */
|
||||
private $error;
|
||||
|
||||
/**
|
||||
* @param InputStream $source An iterator that only emits strings.
|
||||
*/
|
||||
@ -95,6 +99,15 @@ class Message implements InputStream, Promise {
|
||||
|
||||
if ($this->coroutine === null) {
|
||||
$this->coroutine = new Coroutine($this->consume());
|
||||
$this->coroutine->onResolve(function ($error) {
|
||||
if ($error) {
|
||||
$this->error = $error;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if ($this->error) {
|
||||
return new Failure($this->error);
|
||||
}
|
||||
|
||||
if ($this->buffer !== "") {
|
||||
|
@ -9,6 +9,7 @@ use Amp\ByteStream\PendingReadError;
|
||||
use Amp\Emitter;
|
||||
use Amp\Loop;
|
||||
use Amp\PHPUnit\TestCase;
|
||||
use Amp\PHPUnit\TestException;
|
||||
|
||||
class MessageTest extends TestCase {
|
||||
public function testBufferingAll() {
|
||||
@ -118,7 +119,7 @@ class MessageTest extends TestCase {
|
||||
|
||||
public function testFailingStream() {
|
||||
Loop::run(function () {
|
||||
$exception = new \Exception;
|
||||
$exception = new TestException;
|
||||
$value = "abc";
|
||||
|
||||
$emitter = new Emitter;
|
||||
@ -127,12 +128,17 @@ class MessageTest extends TestCase {
|
||||
$emitter->emit($value);
|
||||
$emitter->fail($exception);
|
||||
|
||||
$callable = $this->createCallback(1);
|
||||
|
||||
try {
|
||||
while (($chunk = yield $stream->read()) !== null) {
|
||||
$this->assertSame($value, $chunk);
|
||||
}
|
||||
} catch (\Exception $reason) {
|
||||
|
||||
$this->fail("No exception has been thrown");
|
||||
} catch (TestException $reason) {
|
||||
$this->assertSame($exception, $reason);
|
||||
$callable(); // <-- ensure this point is reached
|
||||
}
|
||||
});
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user