mirror of
https://github.com/danog/amp.git
synced 2025-01-23 05:41:25 +01:00
Rename Amp\Future\spawn() to Amp\coroutine()
Added delay() and trap() functions.
This commit is contained in:
parent
0ce29b3b22
commit
c98c9a40d2
@ -57,6 +57,7 @@
|
|||||||
"Amp\\": "lib"
|
"Amp\\": "lib"
|
||||||
},
|
},
|
||||||
"files": [
|
"files": [
|
||||||
|
"lib/functions.php",
|
||||||
"lib/Future/functions.php",
|
"lib/Future/functions.php",
|
||||||
"lib/Internal/functions.php"
|
"lib/Internal/functions.php"
|
||||||
]
|
]
|
||||||
|
@ -5,27 +5,6 @@ namespace Amp\Future;
|
|||||||
use Amp\CancellationToken;
|
use Amp\CancellationToken;
|
||||||
use Amp\CompositeException;
|
use Amp\CompositeException;
|
||||||
use Amp\Future;
|
use Amp\Future;
|
||||||
use Amp\Internal;
|
|
||||||
use Revolt\EventLoop\Loop;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Spawns a new fiber asynchronously using the given callable and argument list.
|
|
||||||
*
|
|
||||||
* @template T
|
|
||||||
*
|
|
||||||
* @param callable():T $callback
|
|
||||||
*
|
|
||||||
* @return Future<T>
|
|
||||||
*/
|
|
||||||
function spawn(callable $callback): Future
|
|
||||||
{
|
|
||||||
$state = new Internal\FutureState;
|
|
||||||
|
|
||||||
$fiber = new \Fiber('Amp\\Internal\\run');
|
|
||||||
Loop::queue([$fiber, 'start'], $state, $callback);
|
|
||||||
|
|
||||||
return new Future($state);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unwraps the first completed future.
|
* Unwraps the first completed future.
|
||||||
|
90
lib/functions.php
Normal file
90
lib/functions.php
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Amp;
|
||||||
|
|
||||||
|
use Revolt\EventLoop\Loop;
|
||||||
|
use Revolt\EventLoop\UnsupportedFeatureException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new fiber asynchronously using the given callable, returning a Future that is completed with the
|
||||||
|
* eventual return value of the passed function or will fail if the callback throws an exception.
|
||||||
|
*
|
||||||
|
* @template T
|
||||||
|
*
|
||||||
|
* @param callable():T $callback
|
||||||
|
*
|
||||||
|
* @return Future<T>
|
||||||
|
*/
|
||||||
|
function coroutine(callable $callback): Future
|
||||||
|
{
|
||||||
|
$state = new Internal\FutureState;
|
||||||
|
|
||||||
|
$fiber = new \Fiber('Amp\\Internal\\run');
|
||||||
|
Loop::queue([$fiber, 'start'], $state, $callback);
|
||||||
|
|
||||||
|
return new Future($state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Non-blocking sleep for the specified number of seconds.
|
||||||
|
*
|
||||||
|
* @param float $timeout Number of seconds to wait.
|
||||||
|
* @param bool $reference If false, unreference the underlying watcher.
|
||||||
|
* @param CancellationToken|null $token Cancel waiting if cancellation is requested.
|
||||||
|
*/
|
||||||
|
function delay(float $timeout, bool $reference = true, ?CancellationToken $token = null): void
|
||||||
|
{
|
||||||
|
$suspension = Loop::createSuspension();
|
||||||
|
$watcher = Loop::delay($timeout, static fn () => $suspension->resume(null));
|
||||||
|
$id = $token?->subscribe(static fn (CancelledException $exception) => $suspension->throw($exception));
|
||||||
|
|
||||||
|
if (!$reference) {
|
||||||
|
Loop::unreference($watcher);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$suspension->suspend();
|
||||||
|
} finally {
|
||||||
|
$token?->unsubscribe($id);
|
||||||
|
Loop::cancel($watcher);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for signal(s) in a non-blocking way.
|
||||||
|
*
|
||||||
|
* @param int|array $signals Signal number or array of signal numbers.
|
||||||
|
* @param bool $reference If false, unreference the underlying watcher.
|
||||||
|
* @param CancellationToken|null $token Cancel waiting if cancellation is requested.
|
||||||
|
*
|
||||||
|
* @return int Caught signal number.
|
||||||
|
* @throws UnsupportedFeatureException
|
||||||
|
*/
|
||||||
|
function trap(int|array $signals, bool $reference = true, ?CancellationToken $token = null): int
|
||||||
|
{
|
||||||
|
$suspension = Loop::createSuspension();
|
||||||
|
$callback = static fn (string $watcher, int $signo) => $suspension->resume($signo);
|
||||||
|
$id = $token?->subscribe(static fn (CancelledException $exception) => $suspension->throw($exception));
|
||||||
|
|
||||||
|
$watchers = [];
|
||||||
|
|
||||||
|
if (\is_int($signals)) {
|
||||||
|
$signals = [$signals];
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($signals as $signo) {
|
||||||
|
$watchers[] = $watcher = Loop::onSignal($signo, $callback);
|
||||||
|
if (!$reference) {
|
||||||
|
Loop::unreference($watcher);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return $suspension->suspend();
|
||||||
|
} finally {
|
||||||
|
$token?->unsubscribe($id);
|
||||||
|
foreach ($watchers as $watcher) {
|
||||||
|
Loop::cancel($watcher);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -6,7 +6,7 @@ use Amp\CancellationTokenSource;
|
|||||||
use Amp\PHPUnit\AsyncTestCase;
|
use Amp\PHPUnit\AsyncTestCase;
|
||||||
use Amp\PHPUnit\TestException;
|
use Amp\PHPUnit\TestException;
|
||||||
use Revolt\EventLoop\Loop;
|
use Revolt\EventLoop\Loop;
|
||||||
use function Revolt\EventLoop\delay;
|
use function Amp\delay;
|
||||||
|
|
||||||
class CancellationTest extends AsyncTestCase
|
class CancellationTest extends AsyncTestCase
|
||||||
{
|
{
|
||||||
|
@ -7,7 +7,7 @@ use Amp\PHPUnit\AsyncTestCase;
|
|||||||
use Amp\TimeoutCancellationToken;
|
use Amp\TimeoutCancellationToken;
|
||||||
use Amp\TimeoutException;
|
use Amp\TimeoutException;
|
||||||
use Revolt\EventLoop\Loop;
|
use Revolt\EventLoop\Loop;
|
||||||
use function Revolt\EventLoop\delay;
|
use function Amp\delay;
|
||||||
|
|
||||||
class TimeoutCancellationTokenTest extends AsyncTestCase
|
class TimeoutCancellationTokenTest extends AsyncTestCase
|
||||||
{
|
{
|
||||||
|
@ -9,9 +9,9 @@ use Amp\Future;
|
|||||||
use Amp\TimeoutCancellationToken;
|
use Amp\TimeoutCancellationToken;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Revolt\EventLoop\Loop;
|
use Revolt\EventLoop\Loop;
|
||||||
use function Amp\Future\spawn;
|
use function Amp\coroutine;
|
||||||
|
use function Amp\delay;
|
||||||
use function Revolt\EventLoop\defer;
|
use function Revolt\EventLoop\defer;
|
||||||
use function Revolt\EventLoop\delay;
|
|
||||||
|
|
||||||
class FutureTest extends TestCase
|
class FutureTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -168,13 +168,12 @@ class FutureTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
private function delay(float $seconds, mixed $value): Future
|
private function delay(float $seconds, mixed $value): Future
|
||||||
{
|
{
|
||||||
return spawn(
|
return coroutine(
|
||||||
/**
|
/**
|
||||||
* @return T
|
* @return T
|
||||||
*/
|
*/
|
||||||
static function () use ($seconds, $value) {
|
static function () use ($seconds, $value): mixed {
|
||||||
delay($seconds);
|
delay($seconds);
|
||||||
|
|
||||||
return $value;
|
return $value;
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user