2015-05-18 20:25:33 +02:00
|
|
|
<?php
|
|
|
|
|
|
|
|
namespace Amp\Test;
|
|
|
|
|
|
|
|
use Amp\PromiseStream;
|
|
|
|
use Amp\NativeReactor;
|
2015-05-23 22:44:31 +02:00
|
|
|
use Amp\Deferred;
|
2015-05-18 20:25:33 +02:00
|
|
|
|
|
|
|
/**
 * Tests for PromiseStream, run inside a NativeReactor.
 *
 * Every test hands a generator callback to NativeReactor::run(), iterates
 * PromiseStream::stream() and yields each element to obtain the value that
 * element resolves to.
 */
class PromiseStreamTest extends \PHPUnit_Framework_TestCase {
    /**
     * Updates pushed by a repeating watcher are expected to come out of the
     * stream in emission order, followed by a final null element once the
     * Deferred has been succeeded without a value.
     */
    public function testStream() {
        // Flipped at the very end of the callback so the test can prove the
        // callback ran to completion rather than silently stopping early.
        $endReached = false;

        (new NativeReactor)->run(function($reactor) use (&$endReached) {
            $def = new Deferred;
            $msg = new PromiseStream($def->promise());

            $i = 0;
            // $i is shared by reference so the count survives across invocations.
            $reactor->repeat(function($reactor, $watcherId) use ($def, &$i) {
                $i++;
                $def->update("test{$i}");

                if ($i === 3) {
                    // Third update sent: resolve the promise and stop repeating.
                    $def->succeed();
                    $reactor->cancel($watcherId);
                }
            }, 100); // repeat interval; presumably milliseconds -- TODO confirm

            $results = [];
            foreach ($msg->stream() as $msgElement) {
                // Yielding hands the element back to the reactor; the value sent
                // back into the generator is what gets recorded.
                $results[] = (yield $msgElement);
            }

            $this->assertSame(["test1", "test2", "test3", null], $results);
            $endReached = true;
        });

        $this->assertTrue($endReached);
    }

    /**
     * Updates sent (and the promise succeeded) before stream() is first
     * iterated are expected to still be delivered, in order, followed by the
     * final null element.
     */
    public function testStreamRetainsUpdatesUntilInitialized() {
        $endReached = false;

        (new NativeReactor)->run(function($reactor) use (&$endReached) {
            $def = new Deferred;
            $msg = new PromiseStream($def->promise());

            // Everything is emitted up front, before the stream is consumed.
            $def->update("foo");
            $def->update("bar");
            $def->update("baz");
            $def->succeed();

            $results = [];
            foreach ($msg->stream() as $msgElement) {
                $results[] = (yield $msgElement);
            }

            $this->assertSame(["foo", "bar", "baz", null], $results);
            $endReached = true;
        });

        $this->assertTrue($endReached);
    }

    /**
     * When the watched promise is failed, the exception given to fail() is
     * expected to propagate out of the reactor run while the stream is being
     * consumed.
     *
     * @expectedException \Exception
     * @expectedExceptionMessage test
     */
    public function testStreamThrowsIfPromiseFails() {
        (new NativeReactor)->run(function($reactor) {
            // PromisorPrivateImpl is not imported, so it resolves relative to
            // this file's namespace; it is defined outside this file.
            $promisor = new PromisorPrivateImpl;

            // Initialised explicitly (as in testStream) instead of relying on
            // incrementing an unset by-reference variable.
            $i = 0;
            $reactor->repeat(function($reactor, $watcherId) use (&$i, $promisor) {
                $i++;
                $promisor->update($i);

                if ($i === 3) {
                    // After the third update, stop repeating and fail the promise.
                    $reactor->cancel($watcherId);
                    $promisor->fail(new \Exception(
                        "test"
                    ));
                }
            }, 10); // repeat interval; presumably milliseconds -- TODO confirm

            $msg = new PromiseStream($promisor->promise());

            foreach ($msg->stream() as $msgElement) {
                // The resolved values are irrelevant here; each element is
                // yielded only so that the eventual failure can surface.
                yield $msgElement;
            }
        });
    }
}
|