mirror of
https://github.com/danog/amp.git
synced 2024-12-11 17:09:40 +01:00
More direct use of fiber
Avoids creating unnecessary promise objects. delay(0) ticking the loop only once required using delay(x) instead of delay(0) in some tests.
This commit is contained in:
parent
f3b189f33f
commit
6d5e0f5ff7
@ -13,21 +13,20 @@ final class Signal implements Promise
|
|||||||
{
|
{
|
||||||
$this->placeholder = $placeholder = new Internal\Placeholder;
|
$this->placeholder = $placeholder = new Internal\Placeholder;
|
||||||
|
|
||||||
\array_unshift($signals, $signal);
|
$signals[] = $signal;
|
||||||
|
|
||||||
$watchers = &$this->watchers;
|
$watchers = &$this->watchers;
|
||||||
foreach ($signals as $signal) {
|
$callback = static function (string $id, int $signo) use (&$watchers, $placeholder): void {
|
||||||
$this->watchers[] = Loop::onSignal($signal, static function (string $id, int $signo) use (
|
foreach ($watchers as $watcher) {
|
||||||
&$watchers,
|
Loop::cancel($watcher);
|
||||||
$placeholder
|
}
|
||||||
): void {
|
$watchers = [];
|
||||||
foreach ($watchers as $watcher) {
|
|
||||||
Loop::cancel($watcher);
|
|
||||||
}
|
|
||||||
$watchers = [];
|
|
||||||
|
|
||||||
$placeholder->resolve($signo);
|
$placeholder->resolve($signo);
|
||||||
});
|
};
|
||||||
|
|
||||||
|
foreach ($signals as $signal) {
|
||||||
|
$this->watchers[] = Loop::onSignal($signal, $callback);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,9 +26,10 @@ namespace Amp
|
|||||||
}
|
}
|
||||||
|
|
||||||
return \Fiber::suspend(static fn(\Continuation $continuation) => $promise->onResolve(
|
return \Fiber::suspend(static fn(\Continuation $continuation) => $promise->onResolve(
|
||||||
static fn(?\Throwable $exception, mixed $value) => $exception
|
static fn(?\Throwable $exception, mixed $value) => match ($exception) {
|
||||||
? $continuation->throw($exception)
|
null => $continuation->resume($value),
|
||||||
: $continuation->resume($value)
|
default => $continuation->throw($exception),
|
||||||
|
}
|
||||||
), Loop::get());
|
), Loop::get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,11 +205,14 @@ namespace Amp
|
|||||||
/**
|
/**
|
||||||
* Async sleep for the specified number of milliseconds.
|
* Async sleep for the specified number of milliseconds.
|
||||||
*
|
*
|
||||||
* @param int $milliseconds Numberr of milliseconds to sleep.
|
* @param int $milliseconds Number of milliseconds to sleep.
|
||||||
*/
|
*/
|
||||||
function delay(int $milliseconds): void
|
function delay(int $milliseconds): void
|
||||||
{
|
{
|
||||||
await(new Delayed($milliseconds));
|
\Fiber::suspend(static fn(\Continuation $continuation) => Loop::delay(
|
||||||
|
$milliseconds,
|
||||||
|
static fn() => $continuation->resume()
|
||||||
|
), Loop::get());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -221,7 +225,21 @@ namespace Amp
|
|||||||
*/
|
*/
|
||||||
function signal(int $signal, int ...$signals): int
|
function signal(int $signal, int ...$signals): int
|
||||||
{
|
{
|
||||||
return await(new Signal($signal, ...$signals));
|
$signals[] = $signal;
|
||||||
|
|
||||||
|
return \Fiber::suspend(static function (\Continuation $continuation) use ($signals): void {
|
||||||
|
$watchers = [];
|
||||||
|
$callback = static function (string $id, int $signo) use (&$watchers, $continuation): void {
|
||||||
|
foreach ($watchers as $watcher) {
|
||||||
|
Loop::cancel($watcher);
|
||||||
|
}
|
||||||
|
$continuation->resume($signo);
|
||||||
|
};
|
||||||
|
|
||||||
|
foreach ($signals as $signal) {
|
||||||
|
$watchers[] = Loop::onSignal($signal, $callback);
|
||||||
|
}
|
||||||
|
}, Loop::get());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -244,8 +262,8 @@ namespace Amp\Promise
|
|||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Success;
|
use Amp\Success;
|
||||||
use Amp\TimeoutException;
|
use Amp\TimeoutException;
|
||||||
|
use function Amp\async;
|
||||||
use function Amp\await;
|
use function Amp\await;
|
||||||
use function Amp\call;
|
|
||||||
use function Amp\Internal\createTypeError;
|
use function Amp\Internal\createTypeError;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -262,10 +280,6 @@ namespace Amp\Promise
|
|||||||
*/
|
*/
|
||||||
function rethrow(Promise $promise): void
|
function rethrow(Promise $promise): void
|
||||||
{
|
{
|
||||||
if (!$promise instanceof Promise) {
|
|
||||||
$promise = adapt($promise);
|
|
||||||
}
|
|
||||||
|
|
||||||
$promise->onResolve(static function (?\Throwable $exception): void {
|
$promise->onResolve(static function (?\Throwable $exception): void {
|
||||||
if ($exception) {
|
if ($exception) {
|
||||||
throw $exception;
|
throw $exception;
|
||||||
@ -310,10 +324,6 @@ namespace Amp\Promise
|
|||||||
*/
|
*/
|
||||||
function timeout(Promise $promise, int $timeout): Promise
|
function timeout(Promise $promise, int $timeout): Promise
|
||||||
{
|
{
|
||||||
if (!$promise instanceof Promise) {
|
|
||||||
$promise = adapt($promise);
|
|
||||||
}
|
|
||||||
|
|
||||||
$deferred = new Deferred;
|
$deferred = new Deferred;
|
||||||
|
|
||||||
$watcher = Loop::delay($timeout, static function () use (&$deferred) {
|
$watcher = Loop::delay($timeout, static function () use (&$deferred) {
|
||||||
@ -354,9 +364,9 @@ namespace Amp\Promise
|
|||||||
{
|
{
|
||||||
$promise = timeout($promise, $timeout);
|
$promise = timeout($promise, $timeout);
|
||||||
|
|
||||||
return call(static function () use ($promise, $default) {
|
return async(static function () use ($promise, $default) {
|
||||||
try {
|
try {
|
||||||
return yield $promise;
|
return await($promise);
|
||||||
} catch (TimeoutException $exception) {
|
} catch (TimeoutException $exception) {
|
||||||
return $default;
|
return $default;
|
||||||
}
|
}
|
||||||
@ -590,24 +600,18 @@ namespace Amp\Promise
|
|||||||
* @param callable $callback
|
* @param callable $callback
|
||||||
*
|
*
|
||||||
* @return Promise
|
* @return Promise
|
||||||
|
*
|
||||||
|
* @deprecated Use {@see await()} instead.
|
||||||
*/
|
*/
|
||||||
function wrap(Promise $promise, callable $callback): Promise
|
function wrap(Promise $promise, callable $callback): Promise
|
||||||
{
|
{
|
||||||
$deferred = new Deferred;
|
return async(function () use ($promise, $callback) {
|
||||||
|
|
||||||
$promise->onResolve(static function (?\Throwable $exception, mixed $result) use ($deferred, $callback): void {
|
|
||||||
try {
|
try {
|
||||||
$result = $callback($exception, $result);
|
return $callback(null, await($promise));
|
||||||
} catch (\Throwable $exception) {
|
} catch (\Throwable $exception) {
|
||||||
$deferred->fail($exception);
|
return $callback($exception, null);
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$deferred->resolve($result);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
return $deferred->promise();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ class CancellationTest extends AsyncTestCase
|
|||||||
|
|
||||||
$cancellationSource->cancel();
|
$cancellationSource->cancel();
|
||||||
|
|
||||||
delay(0); // Tick event loop to invoke callbacks.
|
delay(1); // Tick event loop to invoke callbacks.
|
||||||
|
|
||||||
$this->assertInstanceOf(TestException::class, $reason);
|
$this->assertInstanceOf(TestException::class, $reason);
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ class CoroutineTest extends AsyncTestCase
|
|||||||
|
|
||||||
$coroutine = new Coroutine($generator());
|
$coroutine = new Coroutine($generator());
|
||||||
|
|
||||||
delay(0); // Force loop to tick once.
|
delay(1); // Force loop to tick once.
|
||||||
|
|
||||||
$this->assertNull($yielded);
|
$this->assertNull($yielded);
|
||||||
|
|
||||||
@ -49,7 +49,7 @@ class CoroutineTest extends AsyncTestCase
|
|||||||
$reason = $exception;
|
$reason = $exception;
|
||||||
});
|
});
|
||||||
|
|
||||||
delay(0); // Force loop to tick once.
|
delay(1); // Force loop to tick once.
|
||||||
|
|
||||||
$this->assertSame($exception, $reason);
|
$this->assertSame($exception, $reason);
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@ class DeferTest extends AsyncTestCase
|
|||||||
throw $exception;
|
throw $exception;
|
||||||
});
|
});
|
||||||
|
|
||||||
delay(0); // Tick event loop.
|
delay(1); // Tick event loop.
|
||||||
|
|
||||||
$this->assertSame($exception, $reason);
|
$this->assertSame($exception, $reason);
|
||||||
}
|
}
|
||||||
|
@ -193,7 +193,7 @@ class PipelineSourceTest extends AsyncTestCase
|
|||||||
|
|
||||||
unset($pipeline); // Should relieve all back-pressure.
|
unset($pipeline); // Should relieve all back-pressure.
|
||||||
|
|
||||||
delay(0); // Tick event loop to invoke promise callbacks.
|
delay(1); // Tick event loop to invoke promise callbacks.
|
||||||
|
|
||||||
$this->assertSame(5, $invoked);
|
$this->assertSame(5, $invoked);
|
||||||
|
|
||||||
|
@ -296,7 +296,7 @@ class PromiseTest extends AsyncTestCase
|
|||||||
});
|
});
|
||||||
$succeeder(true);
|
$succeeder(true);
|
||||||
|
|
||||||
delay(0); // Tick event loop to invoke onResolve callback.
|
delay(5); // Tick event loop a few times to invoke onResolve callback.
|
||||||
|
|
||||||
$this->assertSame(3, $invoked);
|
$this->assertSame(3, $invoked);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user