mirror of
https://github.com/danog/amp.git
synced 2024-11-27 12:35:02 +01:00
44 lines
1.3 KiB
PHP
44 lines
1.3 KiB
PHP
<?php
|
|
|
|
namespace Amp;
|
|
|
|
class PromiseStream {
|
|
private $promisors;
|
|
private $index = 0;
|
|
|
|
/**
|
|
* @param \Amp\Promise $watchedPromise
|
|
*/
|
|
public function __construct(Promise $watchedPromise) {
|
|
$this->promisors[] = new Deferred;
|
|
$watchedPromise->watch(function($data) {
|
|
$this->promisors[$this->index + 1] = new Deferred;
|
|
$this->promisors[$this->index++]->succeed($data);
|
|
});
|
|
$watchedPromise->when(function($error, $result) {
|
|
if ($error) {
|
|
$this->promisors[$this->index]->fail($error);
|
|
} else {
|
|
$this->promisors[$this->index]->succeed();
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Generate a stream of promises that may be iteratively yielded to await resolution
|
|
*
|
|
* NOTE: Only values sent to Promise::update() will be streamed. The final resolution
|
|
* value of the promise is not sent to the stream -- instead, the final promise value
|
|
* is NULL. If the Promise is failed that failure will resolve the stream's current Promise.
|
|
*
|
|
* @return \Generator
|
|
*/
|
|
public function stream() {
|
|
while ($this->promisors) {
|
|
$key = key($this->promisors);
|
|
yield $this->promisors[$key]->promise();
|
|
unset($this->promisors[$key]);
|
|
}
|
|
}
|
|
}
|