1
0
mirror of https://github.com/danog/amp.git synced 2024-11-27 04:24:42 +01:00

Catch watcher callback exceptions individually

Exceptions thrown from watcher callbacks are now forwarded to the loop error handler without ending the current tick. Fixes #74.
This commit is contained in:
Aaron Piotrowski 2017-03-14 00:20:05 -05:00
parent fd7587b0fd
commit d7353d4d8f
5 changed files with 225 additions and 207 deletions

View File

@ -6,7 +6,6 @@ use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\adapt;
use function Amp\rethrow;
/**
@ -100,16 +99,15 @@ abstract class Driver {
$this->activate($this->enableQueue);
$this->enableQueue = [];
try {
foreach ($this->deferQueue as $watcher) {
if (!isset($this->deferQueue[$watcher->id])) {
continue; // Watcher disabled by another defer watcher.
}
foreach ($this->deferQueue as $watcher) {
if (!isset($this->deferQueue[$watcher->id])) {
continue; // Watcher disabled by another defer watcher.
}
unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]);
unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]);
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
continue;
@ -117,25 +115,17 @@ abstract class Driver {
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof Promise) {
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
$this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running);
} catch (\Throwable $exception) {
if (null === $this->errorHandler) {
throw $exception;
}
$errorHandler = $this->errorHandler;
$errorHandler($exception);
}
$this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running);
}
/**
@ -559,6 +549,21 @@ abstract class Driver {
return $previous;
}
/**
* Invokes the error handler with the given exception.
*
* @param \Throwable $exception The exception thrown from a watcher callback.
*
* @throws \Throwable If no error handler has been set.
*/
protected function error(\Throwable $exception) {
if ($this->errorHandler === null) {
throw $exception;
}
($this->errorHandler)($exception);
}
/**
* Get the underlying loop handle.
*

View File

@ -6,7 +6,6 @@ use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\adapt;
use function Amp\rethrow;
class EvDriver extends Driver {
@ -36,21 +35,22 @@ class EvDriver extends Driver {
/** @var \Amp\Internal\Watcher $watcher */
$watcher = $event->data;
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->value, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
@ -62,21 +62,22 @@ class EvDriver extends Driver {
$this->cancel($watcher->id);
}
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
@ -84,21 +85,22 @@ class EvDriver extends Driver {
/** @var \Amp\Internal\Watcher $watcher */
$watcher = $event->data;
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->value, $watcher->data);
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
try {
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
}

View File

@ -6,7 +6,6 @@ use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\adapt;
use function Amp\rethrow;
class EventDriver extends Driver {
@ -33,21 +32,22 @@ class EventDriver extends Driver {
}
$this->ioCallback = function ($resource, $what, Watcher $watcher) {
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->value, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
@ -56,40 +56,42 @@ class EventDriver extends Driver {
$this->cancel($watcher->id);
}
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->signalCallback = function ($signum, $what, Watcher $watcher) {
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->value, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
}

View File

@ -6,7 +6,6 @@ use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\adapt;
use function Amp\rethrow;
class NativeDriver extends Driver {
@ -92,18 +91,19 @@ class NativeDriver extends Driver {
$this->cancel($id);
}
// Execute the timer.
$callback = $watcher->callback;
$result = $callback($id, $watcher->data);
try {
// Execute the timer.
$result = ($watcher->callback)($id, $watcher->data);
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
}
@ -133,62 +133,68 @@ class NativeDriver extends Driver {
$except = null;
// Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
$count = @\stream_select($read, $write, $except, $seconds, $microseconds);
if (!@\stream_select($read, $write, $except, $seconds, $microseconds)) {
return;
}
if ($count) {
foreach ($read as $stream) {
$streamId = (int) $stream;
if (isset($this->readWatchers[$streamId])) {
foreach ($this->readWatchers[$streamId] as $watcher) {
if (!isset($this->readWatchers[$streamId][$watcher->id])) {
continue; // Watcher disabled by another IO watcher.
}
$callback = $watcher->callback;
$result = $callback($watcher->id, $stream, $watcher->data);
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
}
}
foreach ($read as $stream) {
$streamId = (int) $stream;
if (!isset($this->readWatchers[$streamId])) {
continue; // All read watchers disabled.
}
foreach ($write as $stream) {
$streamId = (int) $stream;
if (isset($this->writeWatchers[$streamId])) {
foreach ($this->writeWatchers[$streamId] as $watcher) {
if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
continue; // Watcher disabled by another IO watcher.
}
foreach ($this->readWatchers[$streamId] as $watcher) {
if (!isset($this->readWatchers[$streamId][$watcher->id])) {
continue; // Watcher disabled by another IO watcher.
}
$callback = $watcher->callback;
$result = $callback($watcher->id, $stream, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $stream, $watcher->data);
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof Promise) {
rethrow($result);
}
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
}
foreach ($write as $stream) {
$streamId = (int) $stream;
if (!isset($this->writeWatchers[$streamId])) {
continue; // All write watchers disabled.
}
foreach ($this->writeWatchers[$streamId] as $watcher) {
if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
continue; // Watcher disabled by another IO watcher.
}
try {
$result = ($watcher->callback)($watcher->id, $stream, $watcher->data);
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
}
@ -319,21 +325,22 @@ class NativeDriver extends Driver {
continue;
}
$callback = $watcher->callback;
$result = $callback($watcher->id, $signo, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $signo, $watcher->data);
if ($result === null) {
continue;
}
if ($result === null) {
continue;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
}

View File

@ -6,7 +6,6 @@ use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\adapt;
use function Amp\rethrow;
class UvDriver extends Driver {
@ -58,21 +57,22 @@ class UvDriver extends Driver {
$watchers = $this->watchers[(int) $event];
foreach ($watchers as $watcher) {
$callback = $watcher->callback;
$result = $callback($watcher->id, $resource, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $resource, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
}
};
@ -84,42 +84,44 @@ class UvDriver extends Driver {
$this->cancel($watcher->id);
}
$callback = $watcher->callback;
$result = $callback($watcher->id, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->signalCallback = function ($event, $signo) {
$watcher = $this->watchers[(int) $event];
$callback = $watcher->callback;
$result = $callback($watcher->id, $signo, $watcher->data);
try {
$result = ($watcher->callback)($watcher->id, $signo, $watcher->data);
if ($result === null) {
return;
}
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
$result = adapt($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
}