From d7353d4d8f5788af11992833f3da2362b7bfd031 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 14 Mar 2017 00:20:05 -0500 Subject: [PATCH] Catch watcher callback exceptions individually Exceptions thrown from watcher callbacks are now forwarded to the loop error handler without ending the current tick. Fixes #74. --- lib/Loop/Driver.php | 49 ++++++------ lib/Loop/EvDriver.php | 76 ++++++++++--------- lib/Loop/EventDriver.php | 78 +++++++++---------- lib/Loop/NativeDriver.php | 153 ++++++++++++++++++++------------------ lib/Loop/UvDriver.php | 76 ++++++++++--------- 5 files changed, 225 insertions(+), 207 deletions(-) diff --git a/lib/Loop/Driver.php b/lib/Loop/Driver.php index 9d174ad..5dbcedf 100644 --- a/lib/Loop/Driver.php +++ b/lib/Loop/Driver.php @@ -6,7 +6,6 @@ use Amp\Coroutine; use Amp\Promise; use Amp\Internal\Watcher; use React\Promise\PromiseInterface as ReactPromise; -use function Amp\adapt; use function Amp\rethrow; /** @@ -100,16 +99,15 @@ abstract class Driver { $this->activate($this->enableQueue); $this->enableQueue = []; - try { - foreach ($this->deferQueue as $watcher) { - if (!isset($this->deferQueue[$watcher->id])) { - continue; // Watcher disabled by another defer watcher. - } + foreach ($this->deferQueue as $watcher) { + if (!isset($this->deferQueue[$watcher->id])) { + continue; // Watcher disabled by another defer watcher. + } - unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]); + unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]); - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $watcher->data); if ($result === null) { continue; @@ -117,25 +115,17 @@ abstract class Driver { if ($result instanceof \Generator) { $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); } - if ($result instanceof Promise) { + if ($result instanceof Promise || $result instanceof ReactPromise) { rethrow($result); } + } catch (\Throwable $exception) { + $this->error($exception); } - - $this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running); - - } catch (\Throwable $exception) { - if (null === $this->errorHandler) { - throw $exception; - } - - $errorHandler = $this->errorHandler; - $errorHandler($exception); } + + $this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running); } /** @@ -559,6 +549,21 @@ abstract class Driver { return $previous; } + /** + * Invokes the error handler with the given exception. + * + * @param \Throwable $exception The exception thrown from a watcher callback. + * + * @throws \Throwable If no error handler has been set. + */ + protected function error(\Throwable $exception) { + if ($this->errorHandler === null) { + throw $exception; + } + + ($this->errorHandler)($exception); + } + /** * Get the underlying loop handle. * diff --git a/lib/Loop/EvDriver.php b/lib/Loop/EvDriver.php index 9e97226..78f66eb 100644 --- a/lib/Loop/EvDriver.php +++ b/lib/Loop/EvDriver.php @@ -6,7 +6,6 @@ use Amp\Coroutine; use Amp\Promise; use Amp\Internal\Watcher; use React\Promise\PromiseInterface as ReactPromise; -use function Amp\adapt; use function Amp\rethrow; class EvDriver extends Driver { @@ -36,21 +35,22 @@ class EvDriver extends Driver { /** @var \Amp\Internal\Watcher $watcher */ $watcher = $event->data; - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->value, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data); - if ($result === null) { - return; - } + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; @@ -62,21 +62,22 @@ class EvDriver extends Driver { $this->cancel($watcher->id); } - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $watcher->data); - if ($result === null) { - return; - } + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; @@ -84,21 +85,22 @@ class EvDriver extends Driver { /** @var \Amp\Internal\Watcher $watcher */ $watcher = $event->data; - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->value, $watcher->data); + $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data); - if ($result === null) { - return; - } + try { + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; } diff --git a/lib/Loop/EventDriver.php b/lib/Loop/EventDriver.php index 3732898..de551ef 100644 --- a/lib/Loop/EventDriver.php +++ b/lib/Loop/EventDriver.php @@ -6,7 +6,6 @@ use Amp\Coroutine; use Amp\Promise; use Amp\Internal\Watcher; use React\Promise\PromiseInterface as ReactPromise; -use function Amp\adapt; use function Amp\rethrow; class EventDriver extends Driver { @@ -33,21 +32,22 @@ class EventDriver extends Driver { } $this->ioCallback = function ($resource, $what, Watcher $watcher) { - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->value, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data); - if ($result === null) { - return; - } + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; @@ -55,41 +55,43 @@ class EventDriver extends Driver { if ($watcher->type & Watcher::DELAY) { $this->cancel($watcher->id); } - - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->data); - if ($result === null) { - return; - } + try { + $result = ($watcher->callback)($watcher->id, $watcher->data); - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result === null) { + return; + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; $this->signalCallback = function ($signum, $what, Watcher $watcher) { - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->value, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data); - if ($result === null) { - return; - } + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; } diff --git a/lib/Loop/NativeDriver.php b/lib/Loop/NativeDriver.php index b873e0d..49b7bcf 100644 --- a/lib/Loop/NativeDriver.php +++ b/lib/Loop/NativeDriver.php @@ -6,7 +6,6 @@ use Amp\Coroutine; use Amp\Promise; use Amp\Internal\Watcher; use React\Promise\PromiseInterface as ReactPromise; -use function Amp\adapt; use function Amp\rethrow; class NativeDriver extends Driver { @@ -92,18 +91,19 @@ class NativeDriver extends Driver { $this->cancel($id); } - // Execute the timer. - $callback = $watcher->callback; - $result = $callback($id, $watcher->data); + try { + // Execute the timer. + $result = ($watcher->callback)($id, $watcher->data); - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } } } @@ -133,62 +133,68 @@ class NativeDriver extends Driver { $except = null; // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal. - $count = @\stream_select($read, $write, $except, $seconds, $microseconds); + if (!@\stream_select($read, $write, $except, $seconds, $microseconds)) { + return; + } - if ($count) { - foreach ($read as $stream) { - $streamId = (int) $stream; - if (isset($this->readWatchers[$streamId])) { - foreach ($this->readWatchers[$streamId] as $watcher) { - if (!isset($this->readWatchers[$streamId][$watcher->id])) { - continue; // Watcher disabled by another IO watcher. - } - - $callback = $watcher->callback; - $result = $callback($watcher->id, $stream, $watcher->data); - - if ($result === null) { - continue; - } - - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } - - if ($result instanceof Promise) { - rethrow($result); - } - } - } + foreach ($read as $stream) { + $streamId = (int) $stream; + if (!isset($this->readWatchers[$streamId])) { + continue; // All read watchers disabled. } - foreach ($write as $stream) { - $streamId = (int) $stream; - if (isset($this->writeWatchers[$streamId])) { - foreach ($this->writeWatchers[$streamId] as $watcher) { - if (!isset($this->writeWatchers[$streamId][$watcher->id])) { - continue; // Watcher disabled by another IO watcher. - } + foreach ($this->readWatchers[$streamId] as $watcher) { + if (!isset($this->readWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } - $callback = $watcher->callback; - $result = $callback($watcher->id, $stream, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $stream, $watcher->data); - if ($result === null) { - continue; - } - - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } - - if ($result instanceof Promise) { - rethrow($result); - } + if ($result === null) { + continue; } + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); + } + } + } + + foreach ($write as $stream) { + $streamId = (int) $stream; + if (!isset($this->writeWatchers[$streamId])) { + continue; // All write watchers disabled. + } + + foreach ($this->writeWatchers[$streamId] as $watcher) { + if (!isset($this->writeWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } + + try { + $result = ($watcher->callback)($watcher->id, $stream, $watcher->data); + + if ($result === null) { + continue; + } + + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } + + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } } } @@ -319,21 +325,22 @@ class NativeDriver extends Driver { continue; } - $callback = $watcher->callback; - $result = $callback($watcher->id, $signo, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $signo, $watcher->data); - if ($result === null) { - continue; - } + if ($result === null) { + continue; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } } } diff --git a/lib/Loop/UvDriver.php b/lib/Loop/UvDriver.php index 8a63f88..b1f00f6 100644 --- a/lib/Loop/UvDriver.php +++ b/lib/Loop/UvDriver.php @@ -6,7 +6,6 @@ use Amp\Coroutine; use Amp\Promise; use Amp\Internal\Watcher; use React\Promise\PromiseInterface as ReactPromise; -use function Amp\adapt; use function Amp\rethrow; class UvDriver extends Driver { @@ -58,21 +57,22 @@ class UvDriver extends Driver { $watchers = $this->watchers[(int) $event]; foreach ($watchers as $watcher) { - $callback = $watcher->callback; - $result = $callback($watcher->id, $resource, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $resource, $watcher->data); - if ($result === null) { - return; - } + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } } }; @@ -84,42 +84,44 @@ class UvDriver extends Driver { $this->cancel($watcher->id); } - $callback = $watcher->callback; - $result = $callback($watcher->id, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $watcher->data); - if ($result === null) { - return; - } + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; $this->signalCallback = function ($event, $signo) { $watcher = $this->watchers[(int) $event]; - $callback = $watcher->callback; - $result = $callback($watcher->id, $signo, $watcher->data); + try { + $result = ($watcher->callback)($watcher->id, $signo, $watcher->data); - if ($result === null) { - return; - } + if ($result === null) { + return; + } - if ($result instanceof \Generator) { - $result = new Coroutine($result); - } elseif ($result instanceof ReactPromise) { - $result = adapt($result); - } + if ($result instanceof \Generator) { + $result = new Coroutine($result); + } - if ($result instanceof Promise) { - rethrow($result); + if ($result instanceof Promise || $result instanceof ReactPromise) { + rethrow($result); + } + } catch (\Throwable $exception) { + $this->error($exception); } }; }