1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-11-26 20:04:51 +01:00
byte-stream/test/IteratorStreamTest.php
2017-05-07 22:10:16 +02:00

146 lines
3.8 KiB
PHP

<?php
namespace Amp\ByteStream\Test;
use Amp\ByteStream\IteratorStream;
use Amp\Emitter;
use Amp\Loop;
use Amp\PHPUnit\TestCase;
class IteratorStreamTest extends TestCase {
public function testBufferingAll() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());
foreach ($values as $value) {
$emitter->emit($value);
}
$emitter->complete();
$result = yield $stream;
$this->assertSame(\implode($values), $result);
});
}
public function testFullStreamConsumption() {
Loop::run(function () use (&$invoked) {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());
foreach ($values as $value) {
$emitter->emit($value);
}
Loop::delay(5, function () use ($emitter) {
$emitter->complete();
});
$buffer = "";
while (($chunk = yield $stream->read()) !== null) {
$buffer .= $chunk;
}
$this->assertSame(\implode($values), $buffer);
$this->assertSame("", yield $stream);
});
}
public function testFastResolvingStream() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());
foreach ($values as $value) {
$emitter->emit($value);
}
$emitter->complete();
$emitted = [];
while (($chunk = yield $stream->read()) !== null) {
$emitted[] = $chunk;
}
$this->assertSame([\implode($values)], $emitted);
$this->assertSame(\implode($values), yield $stream);
});
}
public function testPartialStreamConsumption() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());
$emitter->emit($values[0]);
$chunk = yield $stream->read();
$this->assertSame(\array_shift($values), $chunk);
foreach ($values as $value) {
$emitter->emit($value);
}
$emitter->complete();
$this->assertSame(\implode($values), yield $stream);
});
}
public function testFailingStream() {
Loop::run(function () {
$exception = new \Exception;
$value = "abc";
$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());
$emitter->emit($value);
$emitter->fail($exception);
try {
while (($chunk = yield $stream->read()) !== null) {
$this->assertSame($value, $chunk);
}
} catch (\Exception $reason) {
$this->assertSame($exception, $reason);
}
});
}
public function testEmptyStream() {
Loop::run(function () {
$emitter = new Emitter;
$emitter->complete();
$stream = new IteratorStream($emitter->iterate());
$this->assertNull(yield $stream->read());
});
}
public function testReadAfterCompletion() {
Loop::run(function () {
$value = "abc";
$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());
$emitter->emit($value);
$emitter->complete();
$this->assertSame($value, yield $stream->read());
$this->assertNull(yield $stream->read());
});
}
}