From aa7b76e842619d348d6b8ae157885ef5053da79c Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 2 Aug 2016 12:21:21 -0500 Subject: [PATCH] Add awaitable/coroutine-aware loop functions These functions mirror the standard loop methods, but run returned Generators as coroutines and invoke rethrow() on returned awaitables. --- example/backpressure.php | 5 +- example/emitter.php | 5 +- example/postponed.php | 7 +- lib/functions.php | 289 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 296 insertions(+), 10 deletions(-) diff --git a/example/backpressure.php b/example/backpressure.php index ca77d62..f439202 100644 --- a/example/backpressure.php +++ b/example/backpressure.php @@ -7,9 +7,8 @@ use Amp\Coroutine; use Amp\Pause; use Amp\Postponed; use Amp\Loop\NativeLoop; -use Interop\Async\Loop; -Loop::execute(Amp\coroutine(function () { +Amp\execute(function () { try { $postponed = new Postponed; @@ -48,4 +47,4 @@ Loop::execute(Amp\coroutine(function () { } catch (\Exception $exception) { printf("Exception: %s\n", $exception); } -}), $loop = new NativeLoop()); +}, $loop = new NativeLoop()); diff --git a/example/emitter.php b/example/emitter.php index 09f7337..648a169 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -9,9 +9,8 @@ use Amp\Observable; use Amp\Observer; use Amp\Pause; use Amp\Loop\NativeLoop; -use Interop\Async\Loop; -Loop::execute(Amp\coroutine(function () { +Amp\execute(function () { try { $emitter = new Emitter(function (callable $emit) { yield $emit(1); @@ -43,4 +42,4 @@ Loop::execute(Amp\coroutine(function () { } catch (\Exception $exception) { printf("Exception: %s\n", $exception); } -}), $loop = new NativeLoop()); +}, $loop = new NativeLoop()); diff --git a/example/postponed.php b/example/postponed.php index 5677304..fd45b7a 100644 --- a/example/postponed.php +++ b/example/postponed.php @@ -9,13 +9,12 @@ use Amp\Observer; use Amp\Pause; use Amp\Postponed; use Amp\Loop\NativeLoop; -use Interop\Async\Loop; -Loop::execute(Amp\coroutine(function () { +Amp\execute(function () { try { $postponed = new Postponed; - Loop::defer(function () use ($postponed) { + Amp\defer(function () use ($postponed) { // Observer emits all values at once. $postponed->emit(1); $postponed->emit(2); @@ -48,4 +47,4 @@ Loop::execute(Amp\coroutine(function () { } catch (\Exception $exception) { printf("Exception: %s\n", $exception); } -}), $loop = new NativeLoop()); +}, $loop = new NativeLoop()); diff --git a/lib/functions.php b/lib/functions.php index 16a18e3..65482d0 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -4,6 +4,295 @@ namespace Amp; use Interop\Async\Awaitable; use Interop\Async\Loop; +use Interop\Async\Loop\Driver; + +/** + * Execute a callback within the event loop scope. + * If an awaitable is returned, failure reasons are forwarded to the loop error callback. + * Returned Generators are run as coroutines and handled the same as a returned awaitable. + * + * @see \Interop\Async\Loop::execute() + * + * @param callable $callback + * @param \Interop\Async\Loop\Driver|null $driver + */ +function execute(callable $callback, Driver $driver = null) { + Loop::execute(function () use ($callback) { + $result = $callback(); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }, $driver); +} + +/** + * Stops the event loop. + * + * @see \Interop\Async\Loop::stop() + */ +function stop() { + Loop::stop(); +} + +/** + * Execute a callback when a stream resource becomes readable. + * If an awaitable is returned, failure reasons are forwarded to the loop error callback. + * Returned Generators are run as coroutines and handled the same as a returned awaitable. + * + * @see \Interop\Async\Loop::onReadable() + * + * @param resource $stream The stream to monitor. + * @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute. + * @param mixed $data + * + * @return string Watcher identifier. + */ +function onReadable($stream, callable $callback, $data = null) { + return Loop::onReadable($stream, function ($watcherId, $stream, $data) use ($callback) { + $result = $callback($watcherId, $stream, $data); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }, $data); +} + +/** + * Execute a callback when a stream resource becomes writable. + * If an awaitable is returned, failure reasons are forwarded to the loop error callback. + * Returned Generators are run as coroutines and handled the same as a returned awaitable. + * + * @see \Interop\Async\Loop::onWritable() + * + * @param resource $stream The stream to monitor. + * @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute. + * @param mixed $data + * + * @return string Watcher identifier. + */ +function onWritable($stream, callable $callback, $data = null) { + return Loop::onWritable($stream, function ($watcherId, $stream, $data) use ($callback) { + $result = $callback($watcherId, $stream, $data); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }, $data); +} + +/** + * Execute a callback when a signal is received. + * + * @see \Interop\Async\Loop::onSignal() + * + * @param int $signo The signal number to monitor. + * @param callable(string $watcherId, int $signo, mixed $data) $callback The callback to execute. + * @param mixed $data + * + * @return string Watcher identifier. + */ +function onSignal($signo, callable $callback, $data = null) { + return Loop::onSignal($signo, function ($watcherId, $signo, $data) use ($callback) { + $result = $callback($watcherId, $signo, $data); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }, $data); +} + +/** + * Defer the execution of a callback. + * If an awaitable is returned, failure reasons are forwarded to the loop error callback. + * Returned Generators are run as coroutines and handled the same as a returned awaitable. + * + * @see \Interop\Async\Loop::defer() + * + * @param callable(string $watcherId, mixed $data) $callback The callback to delay. + * @param mixed $data + * + * @return string Watcher identifier. + */ +function defer(callable $callback, $data = null) { + return Loop::defer(function ($watcherId, $data) use ($callback) { + $result = $callback($watcherId, $data); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }, $data); +} + +/** + * Delay the execution of a callback. + * If an awaitable is returned, failure reasons are forwarded to the loop error callback. + * Returned Generators are run as coroutines and handled the same as a returned awaitable. + * + * @see \Interop\Async\Loop::delay() + * + * @param int $time + * @param callable(string $watcherId, mixed $data) $callback The callback to delay. + * @param mixed $data + * + * @return string Watcher identifier. + */ +function delay($time, callable $callback, $data = null) { + return Loop::delay($time, function ($watcherId, $data) use ($callback) { + $result = $callback($watcherId, $data); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }, $data); +} + +/** + * Repeatedly execute a callback. + * If an awaitable is returned, failure reasons are forwarded to the loop error callback. + * Returned Generators are run as coroutines and handled the same as a returned awaitable. + * + * @see \Interop\Async\Loop::repeat() + * + * @param int $time + * @param callable(string $watcherId, mixed $data) $callback The callback to delay. + * @param mixed $data + * + * @return string Watcher identifier. + */ +function repeat($time, callable $callback, $data = null) { + return Loop::repeat($time, function ($watcherId, $data) use ($callback) { + $result = $callback($watcherId, $data); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }, $data); +} + +/** + * Enable a watcher. + * + * @see \Interop\Async\Loop::enable() + * + * @param string $watcherId + */ +function enable($watcherId) { + Loop::enable($watcherId); +} + +/** + * Disable a watcher. + * + * @see \Interop\Async\Loop::disable() + * + * @param string $watcherId + */ +function disable($watcherId) { + Loop::disable($watcherId); +} + +/** + * Cancel a watcher. + * + * @see \Interop\Async\Loop::cancel() + * + * @param string $watcherId + */ +function cancel($watcherId) { + Loop::cancel($watcherId); +} + +/** + * Reference a watcher. + * + * @see \Interop\Async\Loop::reference() + * + * @param string $watcherId + */ +function reference($watcherId) { + Loop::reference($watcherId); +} + +/** + * Unreference a watcher. + * + * @see \Interop\Async\Loop::unreference() + * + * @param string $watcherId + */ +function unreference($watcherId) { + Loop::unreference($watcherId); +} + +/** + * @see \Interop\Async\Loop::setErrorHandler() + * + * @param callable $callback + */ +function setErrorHandler(callable $callback) { + Loop::setErrorHandler(function ($exception) use ($callback) { + $result = $callback($exception); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }); +} + +/** + * Wraps the callback in an awaiatable/coroutine-aware function that automatically upgrades Generators to coroutines and + * calls rethrow() on returned awaitables (including coroutines created from returned Generators). + * + * @param callable(...$args): \Generator|\Interop\Async\Awaitable|mixed $callback + * + * @return callable(...$args): void + */ +function wrap(callable $callback) { + return function (/* ...$args */) use ($callback) { + $result = \call_user_func_array($callback, \func_get_args()); + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Awaitable) { + rethrow($result); + } + }; +} /** * Returns a new function that when invoked runs the Generator returned by $worker as a coroutine.