mirror of
https://github.com/danog/amp.git
synced 2024-11-26 20:15:00 +01:00
Add Flow
This commit is contained in:
parent
5889f4e0fd
commit
41b18fe6cc
59
lib/AsyncGenerator.php
Normal file
59
lib/AsyncGenerator.php
Normal file
@ -0,0 +1,59 @@
|
||||
<?php
|
||||
|
||||
namespace Amp;
|
||||
|
||||
final class AsyncGenerator implements Flow
|
||||
{
|
||||
/** @var \Amp\Flow */
|
||||
private $flow;
|
||||
|
||||
/**
|
||||
* @param callable(callable(mixed $value, mixed $key = null): Promise $yield): \Generator $callable
|
||||
*
|
||||
* @throws \Error Thrown if the callable does not return a Generator.
|
||||
*/
|
||||
public function __construct(callable $callable)
|
||||
{
|
||||
$generator = new class {
|
||||
use Internal\Generator {
|
||||
yield as public;
|
||||
complete as public;
|
||||
fail as public;
|
||||
}
|
||||
};
|
||||
|
||||
if (\PHP_VERSION_ID < 70100) {
|
||||
$yield = static function ($value, $key = null) use ($generator): Promise {
|
||||
return $generator->yield($value, $key);
|
||||
};
|
||||
} else {
|
||||
$yield = \Closure::fromCallable([$generator, "yield"]);
|
||||
}
|
||||
|
||||
$result = $callable($yield);
|
||||
|
||||
if (!$result instanceof \Generator) {
|
||||
throw new \Error("The callable did not return a Generator");
|
||||
}
|
||||
|
||||
$coroutine = new Coroutine($result);
|
||||
$coroutine->onResolve(static function ($exception) use ($generator) {
|
||||
if ($exception) {
|
||||
$generator->fail($exception);
|
||||
return;
|
||||
}
|
||||
|
||||
$generator->complete();
|
||||
});
|
||||
|
||||
$this->flow = $generator->iterate();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function continue(): Promise
|
||||
{
|
||||
return $this->flow->continue();
|
||||
}
|
||||
}
|
17
lib/DisposedException.php
Normal file
17
lib/DisposedException.php
Normal file
@ -0,0 +1,17 @@
|
||||
<?php
|
||||
|
||||
namespace Amp;
|
||||
|
||||
/**
|
||||
* Will be thrown in case an operation is cancelled.
|
||||
*
|
||||
* @see CancellationToken
|
||||
* @see CancellationTokenSource
|
||||
*/
|
||||
class DisposedException extends \Exception
|
||||
{
|
||||
public function __construct(\Throwable $previous = null)
|
||||
{
|
||||
parent::__construct("The flow has been disposed", 0, $previous);
|
||||
}
|
||||
}
|
20
lib/Flow.php
Normal file
20
lib/Flow.php
Normal file
@ -0,0 +1,20 @@
|
||||
<?php
|
||||
|
||||
namespace Amp;
|
||||
|
||||
/**
|
||||
* Defines a flow, an asynchronous generator that yields key/value pairs as data is available.
|
||||
*/
|
||||
interface Flow
|
||||
{
|
||||
/**
|
||||
* Succeeds with a [value, key] pair or null if no more values are available. If the flow fails, the returned promise
|
||||
* will fail with the same exception.
|
||||
*
|
||||
* @return \Amp\Promise<[mixed $value, mixed $key]|null>
|
||||
*
|
||||
* @throws \Error If the prior promise returned from this method has not resolved.
|
||||
* @throws \Throwable The exception used to fail the flow.
|
||||
*/
|
||||
public function continue(): Promise;
|
||||
}
|
64
lib/Generator.php
Normal file
64
lib/Generator.php
Normal file
@ -0,0 +1,64 @@
|
||||
<?php
|
||||
|
||||
namespace Amp;
|
||||
|
||||
/**
|
||||
* Generator is a container for a Flow that can yield values using the yield() method and completed using the
|
||||
* complete() and fail() methods of this object. The contained Flow may be accessed using the iterate()
|
||||
* method. This object should not be part of a public API, but used internally to create and yield values to a Flow.
|
||||
*/
|
||||
final class Generator
|
||||
{
|
||||
/** @var object Has public yield, complete, and fail methods. */
|
||||
private $generator;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$this->generator = new class {
|
||||
use Internal\Generator {
|
||||
yield as public;
|
||||
complete as public;
|
||||
fail as public;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Amp\Flow
|
||||
*/
|
||||
public function iterate(): Flow
|
||||
{
|
||||
return $this->generator->flow();
|
||||
}
|
||||
|
||||
/**
|
||||
* Yields a value to the flow.
|
||||
*
|
||||
* @param mixed $value
|
||||
* @param mixed $key Using null auto-generates an incremental integer key.
|
||||
*
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
public function yield($value, $key = null): Promise
|
||||
{
|
||||
return $this->generator->yield($value, $key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Completes the flow.
|
||||
*/
|
||||
public function complete()
|
||||
{
|
||||
$this->generator->complete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Fails the flow with the given reason.
|
||||
*
|
||||
* @param \Throwable $reason
|
||||
*/
|
||||
public function fail(\Throwable $reason)
|
||||
{
|
||||
$this->generator->fail($reason);
|
||||
}
|
||||
}
|
222
lib/Internal/Generator.php
Normal file
222
lib/Internal/Generator.php
Normal file
@ -0,0 +1,222 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Internal;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\DisposedException;
|
||||
use Amp\Failure;
|
||||
use Amp\Flow;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
|
||||
/**
|
||||
* Trait used by Iterator implementations. Do not use this trait in your code, instead compose your class from one of
|
||||
* the available classes implementing \Amp\Iterator.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
trait Generator
|
||||
{
|
||||
/** @var \Amp\Promise|null */
|
||||
private $complete;
|
||||
|
||||
/** @var mixed[] */
|
||||
private $values = [];
|
||||
|
||||
/** @var \Amp\Deferred[] */
|
||||
private $backPressure = [];
|
||||
|
||||
/** @var \Amp\Deferred|null */
|
||||
private $waiting;
|
||||
|
||||
/** @var bool */
|
||||
private $disposed = false;
|
||||
|
||||
/** @var null|array */
|
||||
private $resolutionTrace;
|
||||
|
||||
/** @var int. */
|
||||
private $nextKey = 0;
|
||||
|
||||
/**
|
||||
* Returns an flow instance that when destroyed fails further calls to yield() with an instance of \Amp\DisposedException.
|
||||
*
|
||||
* @return \Amp\Flow
|
||||
*/
|
||||
public function iterate(): Flow
|
||||
{
|
||||
$values = &$this->values;
|
||||
$backPressure = &$this->backPressure;
|
||||
$complete = &$this->complete;
|
||||
$waiting = &$this->waiting;
|
||||
$disposed = &$this->disposed;
|
||||
|
||||
return new class($values, $backPressure, $disposed, $waiting, $complete) implements Flow {
|
||||
/** @var \Amp\Promise|null */
|
||||
private $complete;
|
||||
|
||||
/** @var mixed[] */
|
||||
private $values = [];
|
||||
|
||||
/** @var \Amp\Deferred[] */
|
||||
private $backPressure = [];
|
||||
|
||||
/** @var \Amp\Deferred|null */
|
||||
private $waiting;
|
||||
|
||||
/** @var bool */
|
||||
private $disposed = false;
|
||||
|
||||
/** @var int */
|
||||
private $position = -1;
|
||||
|
||||
public function __construct(
|
||||
array &$values,
|
||||
array &$backpressure,
|
||||
bool &$disposed,
|
||||
Promise &$waiting = null,
|
||||
Promise &$complete = null
|
||||
) {
|
||||
$this->values = &$values;
|
||||
$this->backPressure = &$backpressure;
|
||||
$this->disposed = &$disposed;
|
||||
$this->waiting = &$waiting;
|
||||
$this->complete = &$complete;
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
if (!empty($this->backPressure)) {
|
||||
for ($key = \key($this->backPressure); isset($this->backPressure[$key]); $key++) {
|
||||
$deferred = $this->backPressure[$key];
|
||||
unset($this->values[$key], $this->backPressure[$key]);
|
||||
$deferred->resolve();
|
||||
}
|
||||
}
|
||||
|
||||
$this->disposed = true;
|
||||
}
|
||||
|
||||
public function continue(): Promise
|
||||
{
|
||||
if ($this->waiting !== null) {
|
||||
throw new \Error("The prior promise returned must resolve before invoking this method again");
|
||||
}
|
||||
|
||||
if (isset($this->backPressure[$this->position])) {
|
||||
$deferred = $this->backPressure[$this->position];
|
||||
unset($this->backPressure[$this->position]);
|
||||
$deferred->resolve();
|
||||
}
|
||||
|
||||
unset($this->values[$this->position]);
|
||||
|
||||
++$this->position;
|
||||
|
||||
if (isset($this->values[$this->position])) {
|
||||
return new Success($this->values[$this->position]);
|
||||
}
|
||||
|
||||
if ($this->complete) {
|
||||
return $this->complete;
|
||||
}
|
||||
|
||||
$this->waiting = new Deferred;
|
||||
return $this->waiting->promise();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Yields a value from the flow. The returned promise is resolved with the yielded value once all disposed
|
||||
* have been invoked.
|
||||
*
|
||||
* @param mixed $value
|
||||
* @param mixed $key Using null auto-generates an incremental integer key.
|
||||
*
|
||||
* @return \Amp\Promise
|
||||
*
|
||||
* @throws \Error If the iterator has completed.
|
||||
*/
|
||||
private function yield($value, $key = null): Promise
|
||||
{
|
||||
if ($this->complete) {
|
||||
throw new \Error("Flows cannot yield values after calling complete");
|
||||
}
|
||||
|
||||
if ($this->disposed) {
|
||||
return new Failure(new DisposedException);
|
||||
}
|
||||
|
||||
if ($key === null) {
|
||||
$key = $this->nextKey++;
|
||||
} elseif (\is_int($key) && $key > $this->nextKey) {
|
||||
$this->nextKey = $key + 1;
|
||||
}
|
||||
|
||||
$this->values[] = $yielded = [$value, $key];
|
||||
$this->backPressure[] = $pressure = new Deferred;
|
||||
|
||||
if ($this->waiting !== null) {
|
||||
$waiting = $this->waiting;
|
||||
$this->waiting = null;
|
||||
$waiting->resolve($yielded);
|
||||
}
|
||||
|
||||
return $pressure->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Completes the flow.
|
||||
*
|
||||
* @throws \Error If the flow has already been completed.
|
||||
*/
|
||||
private function complete()
|
||||
{
|
||||
if ($this->complete) {
|
||||
$message = "Flow has already been completed";
|
||||
|
||||
if (isset($this->resolutionTrace)) {
|
||||
$trace = formatStacktrace($this->resolutionTrace);
|
||||
$message .= ". Previous completion trace:\n\n{$trace}\n\n";
|
||||
} else {
|
||||
// @codeCoverageIgnoreStart
|
||||
$message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions "
|
||||
. "for a stacktrace of the previous resolution.";
|
||||
// @codeCoverageIgnoreEnd
|
||||
}
|
||||
|
||||
throw new \Error($message);
|
||||
}
|
||||
|
||||
\assert((function () {
|
||||
$env = \getenv("AMP_DEBUG");
|
||||
if (($env !== "0" && $env !== "false") || (\defined("AMP_DEBUG") && \AMP_DEBUG)) {
|
||||
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
|
||||
\array_shift($trace); // remove current closure
|
||||
$this->resolutionTrace = $trace;
|
||||
}
|
||||
|
||||
return true;
|
||||
})());
|
||||
|
||||
$this->complete = new Success;
|
||||
|
||||
if ($this->waiting !== null) {
|
||||
$waiting = $this->waiting;
|
||||
$this->waiting = null;
|
||||
$waiting->resolve($this->complete);
|
||||
}
|
||||
}
|
||||
|
||||
private function fail(\Throwable $exception)
|
||||
{
|
||||
$this->complete = new Failure($exception);
|
||||
|
||||
if ($this->waiting !== null) {
|
||||
$waiting = $this->waiting;
|
||||
$this->waiting = null;
|
||||
$waiting->resolve($this->complete);
|
||||
}
|
||||
}
|
||||
}
|
126
test/GeneratorTraitTest.php
Normal file
126
test/GeneratorTraitTest.php
Normal file
@ -0,0 +1,126 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Test;
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
class AsyncGenerator
|
||||
{
|
||||
use \Amp\Internal\Generator {
|
||||
yield as public;
|
||||
complete as public;
|
||||
fail as public;
|
||||
}
|
||||
}
|
||||
|
||||
class GeneratorTraitTest extends TestCase
|
||||
{
|
||||
/** @var \Amp\Test\AsyncGenerator */
|
||||
private $generator;
|
||||
|
||||
public function setUp()
|
||||
{
|
||||
$this->generator = new AsyncGenerator;
|
||||
}
|
||||
|
||||
public function testYield()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$value = 'Emitted Value';
|
||||
|
||||
$promise = $this->generator->yield($value);
|
||||
$iterator = $this->generator->iterate();
|
||||
|
||||
$this->assertSame([$value, 0], yield $iterator->continue());
|
||||
|
||||
$this->assertInstanceOf(Promise::class, $promise);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @depends testYield
|
||||
*/
|
||||
public function testYieldWithKey()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$value = 'Emitted value';
|
||||
|
||||
$promise = $this->generator->yield($value);
|
||||
$iterator = $this->generator->iterate();
|
||||
|
||||
$this->assertSame([$value, 0], yield $iterator->continue());
|
||||
|
||||
$this->assertInstanceOf(Promise::class, $promise);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @depends testYield
|
||||
* @expectedException \Error
|
||||
* @expectedExceptionMessage Flows cannot yield values after calling complete
|
||||
*/
|
||||
public function testYieldAfterComplete()
|
||||
{
|
||||
$this->generator->complete();
|
||||
$this->generator->yield(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Error
|
||||
* @expectedExceptionMessage The prior promise returned must resolve before invoking this method again
|
||||
*/
|
||||
public function testDoubleAdvance()
|
||||
{
|
||||
$iterator = $this->generator->iterate();
|
||||
$iterator->continue();
|
||||
$iterator->continue();
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Error
|
||||
* @expectedExceptionMessage Flow has already been completed
|
||||
*/
|
||||
public function testDoubleComplete()
|
||||
{
|
||||
$this->generator->complete();
|
||||
$this->generator->complete();
|
||||
}
|
||||
|
||||
public function testDestroyingIteratorRelievesBackPressure()
|
||||
{
|
||||
$iterator = $this->generator->iterate();
|
||||
|
||||
$invoked = 0;
|
||||
$onResolved = function () use (&$invoked) {
|
||||
$invoked++;
|
||||
};
|
||||
|
||||
foreach (\range(1, 5) as $value) {
|
||||
$promise = $this->generator->yield($value);
|
||||
$promise->onResolve($onResolved);
|
||||
}
|
||||
|
||||
$this->assertSame(0, $invoked);
|
||||
|
||||
unset($iterator);
|
||||
|
||||
$this->assertSame(5, $invoked);
|
||||
}
|
||||
|
||||
/**
|
||||
* @depends testDestroyingIteratorRelievesBackPressure
|
||||
* @expectedException \Amp\DisposedException
|
||||
* @expectedExceptionMessage The flow has been disposed
|
||||
*/
|
||||
public function testYieldAfterDisposal()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$iterator = $this->generator->iterate();
|
||||
unset($iterator);
|
||||
yield $this->generator->yield(1);
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user