1
0
mirror of https://github.com/danog/byte-stream.git synced 2024-12-02 09:17:50 +01:00
byte-stream/test/MessageTest.php

282 lines
7.4 KiB
PHP
Raw Permalink Normal View History

2017-04-13 16:05:37 +02:00
<?php
namespace Amp\ByteStream\Test;
use Amp\ByteStream\InMemoryStream;
use Amp\ByteStream\IteratorStream;
2017-05-11 01:20:34 +02:00
use Amp\ByteStream\Message;
use Amp\ByteStream\PendingReadError;
use Amp\Emitter;
use Amp\Loop;
2017-04-13 16:05:37 +02:00
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\TestException;
2017-04-13 16:05:37 +02:00
2018-09-21 22:45:13 +02:00
class MessageTest extends TestCase
{
public function testBufferingAll()
{
2017-04-13 16:05:37 +02:00
Loop::run(function () {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-04-13 16:05:37 +02:00
foreach ($values as $value) {
$emitter->emit($value);
}
2017-05-05 16:55:43 +02:00
$emitter->complete();
2017-04-13 16:05:37 +02:00
2017-05-11 01:04:10 +02:00
$this->assertSame(\implode($values), yield $stream);
2017-04-13 16:05:37 +02:00
});
}
2018-09-21 22:45:13 +02:00
public function testFullStreamConsumption()
{
2017-04-13 16:05:37 +02:00
Loop::run(function () use (&$invoked) {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-04-13 16:05:37 +02:00
foreach ($values as $value) {
$emitter->emit($value);
}
Loop::delay(5, function () use ($emitter) {
2017-05-05 16:55:43 +02:00
$emitter->complete();
2017-04-13 16:05:37 +02:00
});
$buffer = "";
2017-05-05 16:55:43 +02:00
while (($chunk = yield $stream->read()) !== null) {
$buffer .= $chunk;
2017-04-13 16:05:37 +02:00
}
$this->assertSame(\implode($values), $buffer);
2017-05-05 16:55:43 +02:00
$this->assertSame("", yield $stream);
2017-04-13 16:05:37 +02:00
});
}
2018-09-21 22:45:13 +02:00
public function testFastResolvingStream()
{
2017-04-13 16:05:37 +02:00
Loop::run(function () {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-04-13 16:05:37 +02:00
foreach ($values as $value) {
$emitter->emit($value);
}
2017-05-05 16:55:43 +02:00
$emitter->complete();
2017-04-13 16:05:37 +02:00
$emitted = [];
2017-05-05 16:55:43 +02:00
while (($chunk = yield $stream->read()) !== null) {
$emitted[] = $chunk;
2017-04-13 16:05:37 +02:00
}
2017-05-11 01:04:10 +02:00
$this->assertSame($values, $emitted);
$this->assertSame("", yield $stream);
});
}
2018-09-21 22:45:13 +02:00
public function testFastResolvingStreamBufferingOnly()
{
2017-05-11 01:04:10 +02:00
Loop::run(function () {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-05-11 01:04:10 +02:00
foreach ($values as $value) {
$emitter->emit($value);
}
$emitter->complete();
2017-05-05 16:55:43 +02:00
$this->assertSame(\implode($values), yield $stream);
2017-04-13 16:05:37 +02:00
});
}
2017-05-11 01:04:10 +02:00
2018-09-21 22:45:13 +02:00
public function testPartialStreamConsumption()
{
2017-04-13 16:05:37 +02:00
Loop::run(function () {
$values = ["abc", "def", "ghi"];
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-05-05 16:55:43 +02:00
$emitter->emit($values[0]);
$chunk = yield $stream->read();
$this->assertSame(\array_shift($values), $chunk);
2017-04-13 16:05:37 +02:00
foreach ($values as $value) {
$emitter->emit($value);
}
2017-05-05 16:55:43 +02:00
$emitter->complete();
2017-04-13 16:05:37 +02:00
2017-05-05 16:55:43 +02:00
$this->assertSame(\implode($values), yield $stream);
2017-04-13 16:05:37 +02:00
});
}
2018-09-21 22:45:13 +02:00
public function testFailingStream()
{
2017-04-13 16:05:37 +02:00
Loop::run(function () {
$exception = new TestException;
2017-04-13 16:05:37 +02:00
$value = "abc";
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-04-13 16:05:37 +02:00
$emitter->emit($value);
$emitter->fail($exception);
$callable = $this->createCallback(1);
2017-04-13 16:05:37 +02:00
try {
2017-05-05 16:55:43 +02:00
while (($chunk = yield $stream->read()) !== null) {
$this->assertSame($value, $chunk);
2017-04-13 16:05:37 +02:00
}
$this->fail("No exception has been thrown");
} catch (TestException $reason) {
2017-04-13 16:05:37 +02:00
$this->assertSame($exception, $reason);
$callable(); // <-- ensure this point is reached
2017-04-13 16:05:37 +02:00
}
});
}
2018-09-21 22:45:13 +02:00
public function testFailingStreamWithPendingRead()
{
Loop::run(function () {
$exception = new TestException;
$value = "abc";
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
$readPromise = $stream->read();
$emitter->fail($exception);
$callable = $this->createCallback(1);
try {
yield $readPromise;
$this->fail("No exception has been thrown");
} catch (TestException $reason) {
$this->assertSame($exception, $reason);
$callable(); // <-- ensure this point is reached
}
});
}
2018-09-21 22:45:13 +02:00
public function testEmptyStream()
{
2017-04-13 16:05:37 +02:00
Loop::run(function () {
$emitter = new Emitter;
2017-05-05 16:55:43 +02:00
$emitter->complete();
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-04-13 16:05:37 +02:00
2017-05-05 16:55:43 +02:00
$this->assertNull(yield $stream->read());
2017-04-13 16:05:37 +02:00
});
}
2018-09-21 22:45:13 +02:00
public function testEmptyStringStream()
{
2017-10-06 22:25:55 +02:00
Loop::run(function () {
$value = "";
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
$emitter->emit($value);
$emitter->complete();
$this->assertSame("", yield $stream);
});
}
2018-09-21 22:45:13 +02:00
public function testReadAfterCompletion()
{
2017-04-13 16:05:37 +02:00
Loop::run(function () {
$value = "abc";
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
2017-04-13 16:05:37 +02:00
$emitter->emit($value);
2017-05-05 16:55:43 +02:00
$emitter->complete();
2017-04-13 16:05:37 +02:00
2017-05-05 16:55:43 +02:00
$this->assertSame($value, yield $stream->read());
$this->assertNull(yield $stream->read());
2017-04-13 16:05:37 +02:00
});
}
2018-09-21 22:45:13 +02:00
public function testGetInputStream()
{
Loop::run(function () {
$inputStream = new InMemoryStream("");
$message = new Message($inputStream);
$this->assertSame($inputStream, $message->getInputStream());
$this->assertSame("", yield $message->getInputStream()->read());
});
}
2018-09-21 22:45:13 +02:00
public function testPendingRead()
{
Loop::run(function () {
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
Loop::delay(0, function () use ($emitter) {
$emitter->emit("test");
});
$this->assertSame("test", yield $stream->read());
});
}
2018-09-21 22:45:13 +02:00
public function testPendingReadError()
{
Loop::run(function () {
$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));
$stream->read();
$this->expectException(PendingReadError::class);
$stream->read();
});
}
2018-09-23 21:30:08 +02:00
public function testFalsyValueInStreamWhenBuffering()
{
Loop::run(function () {
$emitter = new Emitter;
$emitter->emit("0");
$emitter->complete();
$message = new Message(new IteratorStream($emitter->iterate()));
$this->assertSame("0", yield $message);
});
}
public function testFalsyValueInStreamWhenStreaming()
{
Loop::run(function () {
$emitter = new Emitter;
$emitter->emit("0");
$message = new Message(new IteratorStream($emitter->iterate()));
$this->assertSame("0", yield $message->read());
});
}
2017-04-13 16:05:37 +02:00
}