1
0
mirror of https://github.com/danog/amp.git synced 2024-12-11 17:09:40 +01:00

Allow tested calls to Amp\Promise\wait

This commit is contained in:
Niklas Keller 2020-04-15 22:46:31 +02:00
parent b6f99cd534
commit 5b4d019753
4 changed files with 179 additions and 103 deletions

View File

@ -77,85 +77,31 @@ abstract class Driver
} }
/** /**
* @return bool True if no enabled and referenced watchers remain in the loop. * Run the event loop with an explicit stop handle.
*
* This method is intended for Amp\Promise\wait only and NOT exposed as method in Amp\Loop.
*
* @return void
* @see Driver::run()
*
*/ */
private function isEmpty(): bool public function execute(callable $callback)
{ {
foreach ($this->watchers as $watcher) { $running = true;
if ($watcher->enabled && $watcher->referenced) {
return false;
}
}
return true; $callback(static function () use (&$running) {
$running = false;
});
while ($running) {
if ($this->isEmpty()) {
return;
}
$this->tick();
}
} }
/**
* Executes a single tick of the event loop.
*
* @return void
*/
private function tick()
{
if (empty($this->deferQueue)) {
$this->deferQueue = $this->nextTickQueue;
} else {
$this->deferQueue = \array_merge($this->deferQueue, $this->nextTickQueue);
}
$this->nextTickQueue = [];
$this->activate($this->enableQueue);
$this->enableQueue = [];
foreach ($this->deferQueue as $watcher) {
if (!isset($this->deferQueue[$watcher->id])) {
continue; // Watcher disabled by another defer watcher.
}
unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]);
try {
/** @var mixed $result */
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
/** @psalm-suppress RedundantCondition */
$this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running && !$this->isEmpty());
}
/**
* Activates (enables) all the given watchers.
*
* @param Watcher[] $watchers
*
* @return void
*/
abstract protected function activate(array $watchers);
/**
* Dispatches any pending read/write, timer, and signal events.
*
* @param bool $blocking
*
* @return void
*/
abstract protected function dispatch(bool $blocking);
/** /**
* Stop the event loop. * Stop the event loop.
* *
@ -479,15 +425,6 @@ abstract class Driver
} }
} }
/**
* Deactivates (disables) the given watcher.
*
* @param Watcher $watcher
*
* @return void
*/
abstract protected function deactivate(Watcher $watcher);
/** /**
* Reference a watcher. * Reference a watcher.
* *
@ -590,23 +527,6 @@ abstract class Driver
return $previous; return $previous;
} }
/**
* Invokes the error handler with the given exception.
*
* @param \Throwable $exception The exception thrown from a watcher callback.
*
* @return void
* @throws \Throwable If no error handler has been set.
*/
protected function error(\Throwable $exception)
{
if ($this->errorHandler === null) {
throw $exception;
}
($this->errorHandler)($exception);
}
/** /**
* Returns the current loop time in millisecond increments. Note this value does not necessarily correlate to * Returns the current loop time in millisecond increments. Note this value does not necessarily correlate to
* wall-clock time, rather the value returned is meant to be used in relative comparisons to prior values returned * wall-clock time, rather the value returned is meant to be used in relative comparisons to prior values returned
@ -729,4 +649,110 @@ abstract class Driver
"running" => (bool) $this->running, "running" => (bool) $this->running,
]; ];
} }
/**
* Activates (enables) all the given watchers.
*
* @param Watcher[] $watchers
*
* @return void
*/
abstract protected function activate(array $watchers);
/**
* Dispatches any pending read/write, timer, and signal events.
*
* @param bool $blocking
*
* @return void
*/
abstract protected function dispatch(bool $blocking);
/**
* Deactivates (disables) the given watcher.
*
* @param Watcher $watcher
*
* @return void
*/
abstract protected function deactivate(Watcher $watcher);
/**
* Invokes the error handler with the given exception.
*
* @param \Throwable $exception The exception thrown from a watcher callback.
*
* @return void
* @throws \Throwable If no error handler has been set.
*/
protected function error(\Throwable $exception)
{
if ($this->errorHandler === null) {
throw $exception;
}
($this->errorHandler)($exception);
}
/**
* @return bool True if no enabled and referenced watchers remain in the loop.
*/
private function isEmpty(): bool
{
foreach ($this->watchers as $watcher) {
if ($watcher->enabled && $watcher->referenced) {
return false;
}
}
return true;
}
/**
* Executes a single tick of the event loop.
*
* @return void
*/
private function tick()
{
if (empty($this->deferQueue)) {
$this->deferQueue = $this->nextTickQueue;
} else {
$this->deferQueue = \array_merge($this->deferQueue, $this->nextTickQueue);
}
$this->nextTickQueue = [];
$this->activate($this->enableQueue);
$this->enableQueue = [];
foreach ($this->deferQueue as $watcher) {
if (!isset($this->deferQueue[$watcher->id])) {
continue; // Watcher disabled by another defer watcher.
}
unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]);
try {
/** @var mixed $result */
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
/** @psalm-suppress RedundantCondition */
$this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running && !$this->isEmpty());
}
} }

View File

@ -27,6 +27,11 @@ final class TracingDriver extends Driver
$this->driver->run(); $this->driver->run();
} }
public function execute(callable $callback)
{
$this->driver->execute($callback);
}
public function stop() public function stop()
{ {
$this->driver->stop(); $this->driver->stop();

View File

@ -196,9 +196,11 @@ namespace Amp\Promise
$resolved = false; $resolved = false;
try { try {
Loop::run(function () use (&$resolved, &$value, &$exception, $promise) { $driver = Loop::get();
$promise->onResolve(function ($e, $v) use (&$resolved, &$value, &$exception) { $driver->execute(static function (callable $stop) use (&$resolved, &$value, &$exception, $promise) {
Loop::stop(); $promise->onResolve(static function ($e, $v) use (&$resolved, &$value, &$exception, $stop) {
$stop();
$resolved = true; $resolved = true;
$exception = $e; $exception = $e;
$value = $v; $value = $v;

View File

@ -9,6 +9,8 @@ use Amp\Loop;
use Amp\PHPUnit\TestException; use Amp\PHPUnit\TestException;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
use function Amp\call;
use function Amp\delay;
use function React\Promise\resolve; use function React\Promise\resolve;
class WaitTest extends BaseTest class WaitTest extends BaseTest
@ -97,4 +99,45 @@ class WaitTest extends BaseTest
$this->expectException(\TypeError::class); $this->expectException(\TypeError::class);
Promise\wait(42); Promise\wait(42);
} }
public function testWaitNested()
{
$promise = call(static function () {
yield delay(10);
return Promise\wait(new Delayed(10, 1));
});
$result = Promise\wait($promise);
$this->assertSame(1, $result);
}
public function testWaitNestedDelayed()
{
$promise = call(static function () {
yield delay(10);
$result = Promise\wait(new Delayed(10, 1));
yield delay(0);
return $result;
});
$result = Promise\wait($promise);
$this->assertSame(1, $result);
}
public function testWaitNestedConcurrent()
{
Loop::defer(function () {
Promise\wait(new Delayed(100));
});
$result = Promise\wait(new Delayed(10, 1));
$this->assertSame(1, $result);
}
} }