1
0
mirror of https://github.com/danog/amp.git synced 2024-11-27 04:24:42 +01:00

Add awaitable/coroutine-aware loop functions

These functions mirror the standard loop methods, but run returned Generators as coroutines and invoke rethrow() on returned awaitables.
This commit is contained in:
Aaron Piotrowski 2016-08-02 12:21:21 -05:00
parent 13908fe2e5
commit aa7b76e842
4 changed files with 296 additions and 10 deletions

View File

@ -7,9 +7,8 @@ use Amp\Coroutine;
use Amp\Pause;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
Amp\execute(function () {
try {
$postponed = new Postponed;
@ -48,4 +47,4 @@ Loop::execute(Amp\coroutine(function () {
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());
}, $loop = new NativeLoop());

View File

@ -9,9 +9,8 @@ use Amp\Observable;
use Amp\Observer;
use Amp\Pause;
use Amp\Loop\NativeLoop;
use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
Amp\execute(function () {
try {
$emitter = new Emitter(function (callable $emit) {
yield $emit(1);
@ -43,4 +42,4 @@ Loop::execute(Amp\coroutine(function () {
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());
}, $loop = new NativeLoop());

View File

@ -9,13 +9,12 @@ use Amp\Observer;
use Amp\Pause;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
Amp\execute(function () {
try {
$postponed = new Postponed;
Loop::defer(function () use ($postponed) {
Amp\defer(function () use ($postponed) {
// Observer emits all values at once.
$postponed->emit(1);
$postponed->emit(2);
@ -48,4 +47,4 @@ Loop::execute(Amp\coroutine(function () {
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());
}, $loop = new NativeLoop());

View File

@ -4,6 +4,295 @@ namespace Amp;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
use Interop\Async\Loop\Driver;
/**
* Execute a callback within the event loop scope.
* If an awaitable is returned, failure reasons are forwarded to the loop error callback.
* Returned Generators are run as coroutines and handled the same as a returned awaitable.
*
* @see \Interop\Async\Loop::execute()
*
* @param callable $callback
* @param \Interop\Async\Loop\Driver|null $driver
*/
function execute(callable $callback, Driver $driver = null) {
Loop::execute(function () use ($callback) {
$result = $callback();
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
}, $driver);
}
/**
* Stops the event loop.
*
* @see \Interop\Async\Loop::stop()
*/
function stop() {
Loop::stop();
}
/**
* Execute a callback when a stream resource becomes readable.
* If an awaitable is returned, failure reasons are forwarded to the loop error callback.
* Returned Generators are run as coroutines and handled the same as a returned awaitable.
*
* @see \Interop\Async\Loop::onReadable()
*
* @param resource $stream The stream to monitor.
* @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function onReadable($stream, callable $callback, $data = null) {
return Loop::onReadable($stream, function ($watcherId, $stream, $data) use ($callback) {
$result = $callback($watcherId, $stream, $data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
}, $data);
}
/**
* Execute a callback when a stream resource becomes writable.
* If an awaitable is returned, failure reasons are forwarded to the loop error callback.
* Returned Generators are run as coroutines and handled the same as a returned awaitable.
*
* @see \Interop\Async\Loop::onWritable()
*
* @param resource $stream The stream to monitor.
* @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function onWritable($stream, callable $callback, $data = null) {
return Loop::onWritable($stream, function ($watcherId, $stream, $data) use ($callback) {
$result = $callback($watcherId, $stream, $data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
}, $data);
}
/**
* Execute a callback when a signal is received.
*
* @see \Interop\Async\Loop::onSignal()
*
* @param int $signo The signal number to monitor.
* @param callable(string $watcherId, int $signo, mixed $data) $callback The callback to execute.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function onSignal($signo, callable $callback, $data = null) {
return Loop::onSignal($signo, function ($watcherId, $signo, $data) use ($callback) {
$result = $callback($watcherId, $signo, $data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
}, $data);
}
/**
* Defer the execution of a callback.
* If an awaitable is returned, failure reasons are forwarded to the loop error callback.
* Returned Generators are run as coroutines and handled the same as a returned awaitable.
*
* @see \Interop\Async\Loop::defer()
*
* @param callable(string $watcherId, mixed $data) $callback The callback to delay.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function defer(callable $callback, $data = null) {
return Loop::defer(function ($watcherId, $data) use ($callback) {
$result = $callback($watcherId, $data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
}, $data);
}
/**
* Delay the execution of a callback.
* If an awaitable is returned, failure reasons are forwarded to the loop error callback.
* Returned Generators are run as coroutines and handled the same as a returned awaitable.
*
* @see \Interop\Async\Loop::delay()
*
* @param int $time
* @param callable(string $watcherId, mixed $data) $callback The callback to delay.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function delay($time, callable $callback, $data = null) {
return Loop::delay($time, function ($watcherId, $data) use ($callback) {
$result = $callback($watcherId, $data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
}, $data);
}
/**
* Repeatedly execute a callback.
* If an awaitable is returned, failure reasons are forwarded to the loop error callback.
* Returned Generators are run as coroutines and handled the same as a returned awaitable.
*
* @see \Interop\Async\Loop::repeat()
*
* @param int $time
* @param callable(string $watcherId, mixed $data) $callback The callback to delay.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function repeat($time, callable $callback, $data = null) {
return Loop::repeat($time, function ($watcherId, $data) use ($callback) {
$result = $callback($watcherId, $data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
}, $data);
}
/**
* Enable a watcher.
*
* @see \Interop\Async\Loop::enable()
*
* @param string $watcherId
*/
function enable($watcherId) {
Loop::enable($watcherId);
}
/**
* Disable a watcher.
*
* @see \Interop\Async\Loop::disable()
*
* @param string $watcherId
*/
function disable($watcherId) {
Loop::disable($watcherId);
}
/**
* Cancel a watcher.
*
* @see \Interop\Async\Loop::cancel()
*
* @param string $watcherId
*/
function cancel($watcherId) {
Loop::cancel($watcherId);
}
/**
* Reference a watcher.
*
* @see \Interop\Async\Loop::reference()
*
* @param string $watcherId
*/
function reference($watcherId) {
Loop::reference($watcherId);
}
/**
* Unreference a watcher.
*
* @see \Interop\Async\Loop::unreference()
*
* @param string $watcherId
*/
function unreference($watcherId) {
Loop::unreference($watcherId);
}
/**
* @see \Interop\Async\Loop::setErrorHandler()
*
* @param callable $callback
*/
function setErrorHandler(callable $callback) {
Loop::setErrorHandler(function ($exception) use ($callback) {
$result = $callback($exception);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
});
}
/**
* Wraps the callback in an awaiatable/coroutine-aware function that automatically upgrades Generators to coroutines and
* calls rethrow() on returned awaitables (including coroutines created from returned Generators).
*
* @param callable(...$args): \Generator|\Interop\Async\Awaitable|mixed $callback
*
* @return callable(...$args): void
*/
function wrap(callable $callback) {
return function (/* ...$args */) use ($callback) {
$result = \call_user_func_array($callback, \func_get_args());
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Awaitable) {
rethrow($result);
}
};
}
/**
* Returns a new function that when invoked runs the Generator returned by $worker as a coroutine.