1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 21:31:18 +01:00

Move loop fiber storage to Loop

This commit is contained in:
Aaron Piotrowski 2021-02-19 11:38:59 -06:00
parent 99e765a8be
commit 41b9af525d
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 36 additions and 23 deletions

View File

@ -18,6 +18,8 @@ final class Loop
{
private static Driver $driver;
private static \Fiber $fiber;
/**
* Disable construction as this is a static class.
*/
@ -35,8 +37,8 @@ final class Loop
*/
public static function setDriver(Driver $driver): void
{
if (isset(self::$driver) && self::$driver->isRunning()) {
throw new \Error("Can't swap the event loop while it is running");
if (isset(self::$fiber)) {
throw new \Error("Can't swap the event loop once an associated fiber has been created");
}
try {
@ -388,6 +390,26 @@ final class Loop
{
return self::$driver;
}
/**
* Retrieve the fiber instance associated with the event loop driver.
*
* @return \Fiber
*/
public static function getFiber(): \Fiber
{
if (!isset(self::$fiber) || self::$fiber->isTerminated()) {
self::$fiber = $fiber = new \Fiber(static fn() => self::$driver->run());
// Run event loop to completion on shutdown.
\register_shutdown_function(static function () use ($fiber): void {
if ($fiber->isSuspended()) {
$fiber->resume();
}
});
}
return self::$fiber;
}
}
// Default factory, don't move this to a file loaded by the composer "files" autoload mechanism, otherwise custom

View File

@ -21,8 +21,6 @@ namespace Amp
*/
function await(Promise|array $promise): mixed
{
static $loop;
if (!$promise instanceof Promise) {
$promise = Promise\all($promise);
}
@ -31,7 +29,7 @@ namespace Amp
$resolved = false;
if ($fiber) { // Awaiting from within a fiber.
if ($fiber === $loop) {
if ($fiber === Loop::getFiber()) {
throw new \Error(\sprintf('Cannot call %s() within an event loop callback', __FUNCTION__));
}
@ -46,28 +44,21 @@ namespace Amp
$fiber->resume($value);
});
// Suspend the current fiber until the promise is resolved.
$value = \Fiber::suspend();
if (!$resolved) {
// $resolved should only be false if the fiber was manually resumed outside of the callback above.
throw new \Error('Fiber resumed before promise was resolved');
try {
// Suspend the current fiber until the promise is resolved.
$value = \Fiber::suspend();
} finally {
if (!$resolved) {
// $resolved should only be false if the fiber was manually resumed outside of the callback above.
throw new \Error('Fiber resumed before promise was resolved');
}
}
return $value;
}
// Awaiting from {main}.
if (!$loop || $loop->isTerminated()) {
$loop = new \Fiber(static fn() => Loop::getDriver()->run());
// Run event loop to completion on shutdown.
\register_shutdown_function(static function () use ($loop): void {
if ($loop->isSuspended()) {
$loop->resume();
}
});
}
$fiber = Loop::getFiber();
$promise->onResolve(static function (?\Throwable $exception, mixed $value) use (&$resolved): void {
$resolved = true;
@ -82,7 +73,7 @@ namespace Amp
});
try {
$lambda = $loop->isStarted() ? $loop->resume() : $loop->start();
$lambda = $fiber->isStarted() ? $fiber->resume() : $fiber->start();
} catch (\Throwable $exception) {
throw new \Error('Exception unexpectedly thrown from event loop', 0, $exception);
}
@ -119,7 +110,7 @@ namespace Amp
}
});
$fiber->start();
Loop::defer(static fn() => $fiber->start());
return new Internal\PrivatePromise($placeholder);
}