1
0
mirror of https://github.com/danog/amp.git synced 2024-11-26 20:15:00 +01:00

Run watcher callbacks as coroutines within drivers

This commit is contained in:
Aaron Piotrowski 2017-03-10 16:03:41 -06:00
parent ca30af4d22
commit 1fea860a05
6 changed files with 147 additions and 20 deletions

View File

@ -81,7 +81,7 @@ final class Loop {
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function defer(callable $callback, $data = null): string {
return self::$driver->defer(wrap($callback), $data);
return self::$driver->defer($callback, $data);
}
/**
@ -101,7 +101,7 @@ final class Loop {
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function delay(int $delay, callable $callback, $data = null): string {
return self::$driver->delay($delay, wrap($callback), $data);
return self::$driver->delay($delay, $callback, $data);
}
/**
@ -121,7 +121,7 @@ final class Loop {
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function repeat(int $interval, callable $callback, $data = null): string {
return self::$driver->repeat($interval, wrap($callback), $data);
return self::$driver->repeat($interval, $callback, $data);
}
/**
@ -144,7 +144,7 @@ final class Loop {
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function onReadable($stream, callable $callback, $data = null): string {
return self::$driver->onReadable($stream, wrap($callback), $data);
return self::$driver->onReadable($stream, $callback, $data);
}
/**
@ -167,7 +167,7 @@ final class Loop {
* @return string An unique identifier that can be used to cancel, enable or disable the watcher.
*/
public static function onWritable($stream, callable $callback, $data = null): string {
return self::$driver->onWritable($stream, wrap($callback), $data);
return self::$driver->onWritable($stream, $callback, $data);
}
/**
@ -191,7 +191,7 @@ final class Loop {
* @throws UnsupportedFeatureException If signal handling is not supported.
*/
public static function onSignal(int $signo, callable $callback, $data = null): string {
return self::$driver->onSignal($signo, wrap($callback), $data);
return self::$driver->onSignal($signo, $callback, $data);
}
/**

View File

@ -2,7 +2,10 @@
namespace Amp\Loop;
use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use function Amp\rethrow;
/**
* Event loop driver which implements all basic operations to allow interoperability.
@ -104,7 +107,15 @@ abstract class Driver {
unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]);
$callback = $watcher->callback;
$callback($watcher->id, $watcher->data);
$result = $callback($watcher->id, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
}
$this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running);

View File

@ -2,7 +2,10 @@
namespace Amp\Loop;
use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use function Amp\rethrow;
class EvLoop extends Driver {
/** @var \EvSignal[]|null */
@ -32,7 +35,15 @@ class EvLoop extends Driver {
$watcher = $event->data;
$callback = $watcher->callback;
$callback($watcher->id, $watcher->value, $watcher->data);
$result = $callback($watcher->id, $watcher->value, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
$this->timerCallback = function (\EvTimer $event) {
@ -44,7 +55,15 @@ class EvLoop extends Driver {
}
$callback = $watcher->callback;
$callback($watcher->id, $watcher->data);
$result = $callback($watcher->id, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
$this->signalCallback = function (\EvSignal $event) {
@ -52,7 +71,15 @@ class EvLoop extends Driver {
$watcher = $event->data;
$callback = $watcher->callback;
$callback($watcher->id, $watcher->value, $watcher->data);
$result = $callback($watcher->id, $watcher->value, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
}

View File

@ -2,7 +2,10 @@
namespace Amp\Loop;
use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use function Amp\rethrow;
class EventLoop extends Driver {
/** @var \Event[]|null */
@ -29,7 +32,15 @@ class EventLoop extends Driver {
$this->ioCallback = function ($resource, $what, Watcher $watcher) {
$callback = $watcher->callback;
$callback($watcher->id, $watcher->value, $watcher->data);
$result = $callback($watcher->id, $watcher->value, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
$this->timerCallback = function ($resource, $what, Watcher $watcher) {
@ -38,12 +49,28 @@ class EventLoop extends Driver {
}
$callback = $watcher->callback;
$callback($watcher->id, $watcher->data);
$result = $callback($watcher->id, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
$this->signalCallback = function ($signum, $what, Watcher $watcher) {
$callback = $watcher->callback;
$callback($watcher->id, $watcher->value, $watcher->data);
$result = $callback($watcher->id, $watcher->value, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
}

View File

@ -2,7 +2,10 @@
namespace Amp\Loop;
use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use function Amp\rethrow;
class NativeLoop extends Driver {
/** @var resource[] */
@ -89,7 +92,15 @@ class NativeLoop extends Driver {
// Execute the timer.
$callback = $watcher->callback;
$callback($id, $watcher->data);
$result = $callback($id, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
}
}
@ -130,7 +141,15 @@ class NativeLoop extends Driver {
}
$callback = $watcher->callback;
$callback($watcher->id, $stream, $watcher->data);
$result = $callback($watcher->id, $stream, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
}
}
}
@ -144,7 +163,15 @@ class NativeLoop extends Driver {
}
$callback = $watcher->callback;
$callback($watcher->id, $stream, $watcher->data);
$result = $callback($watcher->id, $stream, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
}
}
}
@ -277,7 +304,15 @@ class NativeLoop extends Driver {
}
$callback = $watcher->callback;
$callback($watcher->id, $signo, $watcher->data);
$result = $callback($watcher->id, $signo, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
}
}
}

View File

@ -2,7 +2,10 @@
namespace Amp\Loop;
use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use function Amp\rethrow;
class UvLoop extends Driver {
/** @var resource A uv_loop resource created with uv_loop_new() */
@ -54,7 +57,15 @@ class UvLoop extends Driver {
foreach ($watchers as $watcher) {
$callback = $watcher->callback;
$callback($watcher->id, $resource, $watcher->data);
$result = $callback($watcher->id, $resource, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
}
};
@ -66,14 +77,30 @@ class UvLoop extends Driver {
}
$callback = $watcher->callback;
$callback($watcher->id, $watcher->data);
$result = $callback($watcher->id, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
$this->signalCallback = function ($event, $signo) {
$watcher = $this->watchers[(int) $event];
$callback = $watcher->callback;
$callback($watcher->id, $signo, $watcher->data);
$result = $callback($watcher->id, $signo, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
};
}