1
0
mirror of https://github.com/danog/amp.git synced 2024-11-27 04:24:42 +01:00

Add PromiseStream to stream Promise updates without callback hell

This commit is contained in:
Daniel Lowrey 2015-05-18 14:25:33 -04:00
parent 02e85b29c6
commit 0973bba5ef
4 changed files with 163 additions and 0 deletions

64
lib/PromiseStream.php Normal file
View File

@ -0,0 +1,64 @@
<?php
namespace Amp;
class PromiseStream {
const NOTIFY = 0b00;
const WAIT = 0b01;
const ERROR = 0b10;
private $promisors;
private $index = 0;
private $state;
/**
* @param \Amp\Promise $watchedPromise
*/
public function __construct(Promise $watchedPromise) {
$this->state = self::WAIT;
$this->promisors[] = new PrivateFuture;
$watchedPromise->watch(function($data) {
$this->state = self::NOTIFY;
$this->promisors[$this->index + 1] = new PrivateFuture;
$this->promisors[$this->index++]->succeed($data);
});
$watchedPromise->when(function($error, $result) {
if ($error) {
$this->state = self::ERROR;
$this->promisors[$this->index]->fail($error);
}
});
}
/**
* Generate a stream of promises that may be iteratively yielded to await resolution
*
* NOTE: Only values sent to Promise::update() will be streamed. The final resolution
* value of the promise is not sent to the stream. If the Promise is failed that failure
* will resolve the stream's current Promise instance.
*
* @throws \LogicException if stream is in an un-iterable state
* @return \Generator
*/
public function stream() {
while ($this->promisors) {
$key = key($this->promisors);
yield $this->promisors[$key]->promise();
switch ($this->state) {
case self::NOTIFY:
$this->state = self::WAIT;
unset($this->promisors[$key]);
break;
case self::WAIT:
throw new \LogicException(
"Cannot advance stream: previous Promise not yet resolved"
);
break;
case self::ERROR:
throw new \LogicException(
"Cannot advance stream: subject Promise failed"
);
}
}
}
}

View File

@ -543,6 +543,10 @@ function __coroutinePromisify($cs) {
return $yielded;
}
if ($yielded instanceof PromiseStream) {
return resolve(__bufferPromiseStream($yielded), $cs->reactor);
}
// Allow custom promisifier callables to create Promise from
// the yielded key/value for extension use-cases
if ($cs->promisifier) {
@ -559,3 +563,11 @@ function __coroutinePromisify($cs) {
)
));
}
function __bufferPromiseStream(PromiseStream $stream) {
$buffer = [];
foreach ($stream->stream() as $promise) {
$buffer[] = (yield $promise);
}
yield "return" => $buffer;
}

View File

@ -5,6 +5,8 @@ namespace Amp\Test;
use Amp\NativeReactor;
use Amp\Success;
use Amp\Failure;
use Amp\PrivateFuture;
use Amp\PromiseStream;
class FunctionsTest extends \PHPUnit_Framework_TestCase {
@ -205,4 +207,36 @@ class FunctionsTest extends \PHPUnit_Framework_TestCase {
$this->assertSame(42, $result);
});
}
public function testCoroutineResolutionBuffersYieldedPromiseStream() {
(new NativeReactor)->run(function($reactor) {
$promisor = new PrivateFuture;
$reactor->repeat(function($reactor, $watcherId) use (&$i, $promisor) {
$i++;
$promisor->update($i);
if ($i === 3) {
$reactor->cancel($watcherId);
$promisor->succeed();
}
}, 10);
$result = (yield new PromiseStream($promisor->promise()));
$this->assertSame([1, 2, 3], $result);
});
}
/**
* @expectedException \Exception
* @expectedExceptionMessage test
*/
public function testCoroutineResolutionThrowsOnPromiseStreamBufferFailure() {
(new NativeReactor)->run(function($reactor) {
$promisor = new PrivateFuture;
$reactor->repeat(function($reactor, $watcherId) use (&$i, $promisor) {
$promisor->fail(new \Exception("test"));
}, 10);
$result = (yield new PromiseStream($promisor->promise()));
});
}
}

View File

@ -0,0 +1,53 @@
<?php
namespace Amp\Test;
use Amp\PromiseStream;
use Amp\NativeReactor;
use Amp\PrivateFuture;
class PromiseStreamTest extends \PHPUnit_Framework_TestCase {
/**
* @expectedException \Exception
* @expectedExceptionMessage test
*/
public function testStreamThrowsIfPromiseFails() {
(new NativeReactor)->run(function($reactor) {
$promisor = new PrivateFuture;
$reactor->repeat(function($reactor, $watcherId) use (&$i, $promisor) {
$i++;
$promisor->update($i);
if ($i === 3) {
$reactor->cancel($watcherId);
$promisor->fail(new \Exception(
"test"
));
}
}, 10);
$result = (yield new PromiseStream($promisor->promise()));
});
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot advance stream: previous Promise not yet resolved
*/
public function testStreamThrowsIfPrematurelyIterated() {
$promisor = new PrivateFuture;
$stream = (new PromiseStream($promisor->promise()))->stream();
$stream->next();
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot advance stream: subject Promise failed
*/
public function testStreamThrowsIfIteratedAfterFailure() {
$promisor = new PrivateFuture;
$promisor->fail(new \Exception("test"));
$stream = (new PromiseStream($promisor->promise()))->stream();
$stream->next();
}
}