1
0
mirror of https://github.com/danog/amp.git synced 2024-11-27 04:24:42 +01:00

Fix buggy PromiseStream behavior

This commit is contained in:
Daniel Lowrey 2015-05-23 16:44:31 -04:00
parent a8322c8446
commit f024061d22
2 changed files with 40 additions and 48 deletions

View File

@ -3,32 +3,22 @@
namespace Amp;
class PromiseStream implements Streamable {
const NOTIFY = 0;
const WAIT = 1;
const ERROR = 2;
const DONE = 3;
private $promisors;
private $index = 0;
private $state;
/**
* @param \Amp\Promise $watchedPromise
*/
public function __construct(Promise $watchedPromise) {
$this->state = self::WAIT;
$this->promisors[] = new Deferred;
$watchedPromise->watch(function($data) {
$this->state = self::NOTIFY;
$this->promisors[$this->index + 1] = new Deferred;
$this->promisors[$this->index++]->succeed($data);
});
$watchedPromise->when(function($error, $result) {
if ($error) {
$this->state = self::ERROR;
$this->promisors[$this->index]->fail($error);
} else {
$this->state = self::DONE;
$this->promisors[$this->index]->succeed();
}
});
@ -48,23 +38,7 @@ class PromiseStream implements Streamable {
while ($this->promisors) {
$key = key($this->promisors);
yield $this->promisors[$key]->promise();
switch ($this->state) {
case self::NOTIFY:
$this->state = self::WAIT;
unset($this->promisors[$key]);
break;
case self::WAIT:
throw new \LogicException(
"Cannot advance stream: previous Promise not yet resolved"
);
break;
case self::DONE:
return;
case self::ERROR:
throw new \LogicException(
"Cannot advance stream: subject Promise failed"
);
}
unset($this->promisors[$key]);
}
}
@ -79,6 +53,7 @@ class PromiseStream implements Streamable {
$buffer[] = (yield $promise);
}
array_pop($buffer);
yield "return" => $buffer;
}
}

View File

@ -4,9 +4,47 @@ namespace Amp\Test;
use Amp\PromiseStream;
use Amp\NativeReactor;
use Amp\Deferred;
class PromiseStreamTest extends \PHPUnit_Framework_TestCase {
public function testStream() {
$endReached = false;
(new NativeReactor)->run(function($reactor) use (&$endReached) {
$def = new Deferred;
$msg = new PromiseStream($def->promise());
$i = 0;
$reactor->repeat(function($reactor, $watcherId) use ($def, &$i) {
$i++;
$def->update("test");
if ($i === 3) {
$def->succeed();
$reactor->cancel($watcherId);
}
}, 100);
$result = (yield $msg);
$this->assertSame(["test", "test", "test"], $result);
$endReached = true;
});
$this->assertTrue($endReached);
}
public function testStreamRetainsUpdatesUntilInitialized() {
$endReached = false;
(new NativeReactor)->run(function($reactor) use (&$endReached) {
$def = new Deferred;
$msg = new PromiseStream($def->promise());
$def->update("foo");
$def->update("bar");
$def->update("baz");
$def->succeed();
$result = (yield $msg);
$this->assertSame(["foo", "bar", "baz"], $result);
$endReached = true;
});
$this->assertTrue($endReached);
}
/**
* @expectedException \Exception
* @expectedExceptionMessage test
@ -28,25 +66,4 @@ class PromiseStreamTest extends \PHPUnit_Framework_TestCase {
$result = (yield new PromiseStream($promisor->promise()));
});
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot advance stream: previous Promise not yet resolved
*/
public function testStreamThrowsIfPrematurelyIterated() {
$promisor = new PromisorPrivateImpl;
$stream = (new PromiseStream($promisor->promise()))->stream();
$stream->next();
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot advance stream: subject Promise failed
*/
public function testStreamThrowsIfIteratedAfterFailure() {
$promisor = new PromisorPrivateImpl;
$promisor->fail(new \Exception("test"));
$stream = (new PromiseStream($promisor->promise()))->stream();
$stream->next();
}
}